TestLostTracker.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.conf.Configuration;
  20. import org.apache.hadoop.fs.*;
  21. import org.apache.hadoop.hdfs.MiniDFSCluster;
  22. import junit.framework.TestCase;
  23. import java.io.*;
  24. public class TestLostTracker extends TestCase {
  25.   final Path testDir = new Path("/jt-lost-tt");
  26.   final Path inDir = new Path(testDir, "input");
  27.   final Path shareDir = new Path(testDir, "share");
  28.   final Path outputDir = new Path(testDir, "output");
  29.   
  30.   private JobConf configureJob(JobConf conf, int maps, int reduces,
  31.                                String mapSignal, String redSignal) 
  32.   throws IOException {
  33.     UtilsForTests.configureWaitingJobConf(conf, inDir, outputDir, 
  34.         maps, reduces, "test-lost-tt", 
  35.         mapSignal, redSignal);
  36.     return conf;
  37.   }
  38.   
  39.   public void testLostTracker(MiniDFSCluster dfs,
  40.                               MiniMRCluster mr) 
  41.   throws IOException {
  42.     FileSystem fileSys = dfs.getFileSystem();
  43.     JobConf jobConf = mr.createJobConf();
  44.     int numMaps = 10;
  45.     int numReds = 1;
  46.     String mapSignalFile = UtilsForTests.getMapSignalFile(shareDir);
  47.     String redSignalFile = UtilsForTests.getReduceSignalFile(shareDir);
  48.     
  49.     // Configure the job
  50.     JobConf job = configureJob(jobConf, numMaps, numReds, 
  51.                                mapSignalFile, redSignalFile);
  52.       
  53.     fileSys.delete(shareDir, true);
  54.     
  55.     // Submit the job   
  56.     JobClient jobClient = new JobClient(job);
  57.     RunningJob rJob = jobClient.submitJob(job);
  58.     JobID id = rJob.getID();
  59.     
  60.     // wait for the job to be inited
  61.     mr.initializeJob(id);
  62.     
  63.     // Make sure that the master job is 50% completed
  64.     while (UtilsForTests.getJobStatus(jobClient, id).mapProgress() 
  65.            < 0.5f) {
  66.       UtilsForTests.waitFor(10);
  67.     }
  68.     // get a completed task on 1st tracker 
  69.     TaskAttemptID taskid = mr.getTaskTrackerRunner(0).getTaskTracker().
  70.                               getNonRunningTasks().get(0).getTaskID();
  71.     // Kill the 1st tasktracker
  72.     mr.stopTaskTracker(0);
  73.     // Signal all the maps to complete
  74.     UtilsForTests.signalTasks(dfs, fileSys, true, mapSignalFile, redSignalFile);
  75.     
  76.     // Signal the reducers to complete
  77.     UtilsForTests.signalTasks(dfs, fileSys, false, mapSignalFile, 
  78.                               redSignalFile);
  79.     // wait till the job is done
  80.     UtilsForTests.waitTillDone(jobClient);
  81.     // Check if the tasks on the lost tracker got killed and re-executed
  82.     assertEquals(jobClient.getClusterStatus().getTaskTrackers(), 1);
  83.     assertEquals(JobStatus.SUCCEEDED, rJob.getJobState());
  84.     TaskInProgress tip = mr.getJobTrackerRunner().getJobTracker().
  85.                          getTip(taskid.getTaskID());
  86.     assertTrue(tip.isComplete());
  87.     assertEquals(tip.numKilledTasks(), 1);
  88.     
  89.     // check if the task statuses for the tasks are sane
  90.     JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
  91.     for (TaskInProgress taskInProgress : jt.getJob(id).getMapTasks()) {
  92.       testTaskStatuses(taskInProgress.getTaskStatuses());
  93.     }
  94.     
  95.     // validate the history file
  96.     TestJobHistory.validateJobHistoryFileFormat(id, job, "SUCCESS", true);
  97.     TestJobHistory.validateJobHistoryFileContent(mr, rJob, job);
  98.   }
  99.   
  100.   private void testTaskStatuses(TaskStatus[] tasks) {
  101.     for (TaskStatus status : tasks) {
  102.       assertTrue("Invalid start time " + status.getStartTime(), 
  103.                  status.getStartTime() > 0);
  104.       assertTrue("Invalid finish time " + status.getFinishTime(), 
  105.                  status.getFinishTime() > 0);
  106.       assertTrue("Start time (" + status.getStartTime() + ") is greater than " 
  107.                  + "the finish time (" + status.getFinishTime() + ")", 
  108.                  status.getStartTime() <= status.getFinishTime());
  109.       assertNotNull("Task phase information is null", status.getPhase());
  110.       assertNotNull("Task run-state information is null", status.getRunState());
  111.       assertNotNull("TaskTracker information is null", status.getTaskTracker());
  112.     }
  113.   }
  114.   public void testLostTracker() throws IOException {
  115.     String namenode = null;
  116.     MiniDFSCluster dfs = null;
  117.     MiniMRCluster mr = null;
  118.     FileSystem fileSys = null;
  119.     try {
  120.       Configuration conf = new Configuration();
  121.       conf.setBoolean("dfs.replication.considerLoad", false);
  122.       dfs = new MiniDFSCluster(conf, 1, true, null, null);
  123.       dfs.waitActive();
  124.       fileSys = dfs.getFileSystem();
  125.       
  126.       // clean up
  127.       fileSys.delete(testDir, true);
  128.       
  129.       if (!fileSys.mkdirs(inDir)) {
  130.         throw new IOException("Mkdirs failed to create " + inDir.toString());
  131.       }
  132.       // Write the input file
  133.       UtilsForTests.writeFile(dfs.getNameNode(), conf, 
  134.                               new Path(inDir + "/file"), (short)1);
  135.       dfs.startDataNodes(conf, 1, true, null, null, null, null);
  136.       dfs.waitActive();
  137.       namenode = (dfs.getFileSystem()).getUri().getHost() + ":" 
  138.                  + (dfs.getFileSystem()).getUri().getPort();
  139.       JobConf jtConf = new JobConf();
  140.       jtConf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
  141.       jtConf.setInt("mapred.tasktracker.reduce.tasks.maximum", 1);
  142.       jtConf.setLong("mapred.tasktracker.expiry.interval", 10 * 1000);
  143.       jtConf.setInt("mapred.reduce.copy.backoff", 4);
  144.       
  145.       mr = new MiniMRCluster(2, namenode, 1, null, null, jtConf);
  146.       
  147.       // Test Lost tracker case
  148.       testLostTracker(dfs, mr);
  149.     } finally {
  150.       if (mr != null) {
  151.         try {
  152.           mr.shutdown();
  153.         } catch (Exception e) {}
  154.       }
  155.       if (dfs != null) {
  156.         try {
  157.           dfs.shutdown();
  158.         } catch (Exception e) {}
  159.       }
  160.     }
  161.   }
  162.   public static void main(String[] args) throws IOException {
  163.     new TestLostTracker().testLostTracker();
  164.   }
  165. }