TestJobInProgress.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. package org.apache.hadoop.mapred;
  2. import java.io.DataOutputStream;
  3. import java.io.IOException;
  4. import java.util.Iterator;
  5. import java.util.Map;
  6. import java.util.Set;
  7. import java.util.HashSet;
  8. import org.apache.commons.logging.Log;
  9. import org.apache.commons.logging.LogFactory;
  10. import org.apache.hadoop.conf.Configuration;
  11. import org.apache.hadoop.examples.RandomWriter;
  12. import org.apache.hadoop.fs.FileSystem;
  13. import org.apache.hadoop.fs.Path;
  14. import org.apache.hadoop.hdfs.MiniDFSCluster;
  15. import org.apache.hadoop.io.IntWritable;
  16. import org.apache.hadoop.io.LongWritable;
  17. import org.apache.hadoop.io.Text;
  18. import org.apache.hadoop.mapred.UtilsForTests;
  19. import org.apache.hadoop.mapred.lib.IdentityMapper;
  20. import org.apache.hadoop.mapred.lib.IdentityReducer;
  21. import org.apache.hadoop.net.Node;
  22. import junit.framework.TestCase;
  23. public class TestJobInProgress extends TestCase {
  24.   static final Log LOG = LogFactory.getLog(TestJobInProgress.class);
  25.   private MiniMRCluster mrCluster;
  26.   private MiniDFSCluster dfsCluster;
  27.   JobTracker jt;
  28.   private static Path TEST_DIR = 
  29.     new Path(System.getProperty("test.build.data","/tmp"), "jip-testing");
  30.   private static int numSlaves = 4;
  31.   public static class FailMapTaskJob extends MapReduceBase implements
  32.       Mapper<LongWritable, Text, Text, IntWritable> {
  33.     @Override
  34.     public void map(LongWritable key, Text value,
  35.         OutputCollector<Text, IntWritable> output, Reporter reporter)
  36.         throws IOException {
  37.       // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
  38.       try {
  39.         Thread.sleep(1000);
  40.       } catch (InterruptedException e) {
  41.         throw new IllegalArgumentException("Interrupted MAP task");
  42.       }
  43.       throw new IllegalArgumentException("Failing MAP task");
  44.     }
  45.   }
  46.   // Suppressing waring as we just need to write a failing reduce task job
  47.   // We don't need to bother about the actual key value pairs which are passed.
  48.   @SuppressWarnings("unchecked")
  49.   public static class FailReduceTaskJob extends MapReduceBase implements
  50.       Reducer {
  51.     @Override
  52.     public void reduce(Object key, Iterator values, OutputCollector output,
  53.         Reporter reporter) throws IOException {
  54.       // reporter.incrCounter(TaskCounts.LaunchedTask, 1);
  55.       try {
  56.         Thread.sleep(1000);
  57.       } catch (InterruptedException e) {
  58.         throw new IllegalArgumentException("Failing Reduce task");
  59.       }
  60.       throw new IllegalArgumentException("Failing Reduce task");
  61.     }
  62.   }
  63.   @Override
  64.   protected void setUp() throws Exception {
  65.     // TODO Auto-generated method stub
  66.     super.setUp();
  67.     Configuration conf = new Configuration();
  68.     dfsCluster = new MiniDFSCluster(conf, numSlaves, true, null);
  69.     mrCluster = new MiniMRCluster(numSlaves, dfsCluster.getFileSystem()
  70.         .getUri().toString(), 1);
  71.     jt = mrCluster.getJobTrackerRunner().getJobTracker();
  72.   }
  73.   public void testPendingMapTaskCount() throws Exception {
  74.     launchTask(FailMapTaskJob.class, IdentityReducer.class);
  75.     checkTaskCounts();
  76.   }
  77.   
  78.   public void testPendingReduceTaskCount() throws Exception {
  79.     launchTask(IdentityMapper.class, FailReduceTaskJob.class);
  80.     checkTaskCounts();
  81.   }
  82.   /**
  83.    * Test if running tasks are correctly maintained for various types of jobs
  84.    */
  85.   private void testRunningTaskCount(boolean speculation, boolean locality)
  86.   throws Exception {
  87.     LOG.info("Testing running jobs with speculation : " + speculation 
  88.              + ", locality : " + locality);
  89.     // cleanup
  90.     dfsCluster.getFileSystem().delete(TEST_DIR, true);
  91.     
  92.     final Path mapSignalFile = new Path(TEST_DIR, "map-signal");
  93.     final Path redSignalFile = new Path(TEST_DIR, "reduce-signal");
  94.     
  95.     // configure a waiting job with 2 maps and 2 reducers
  96.     JobConf job = 
  97.       configure(UtilsForTests.WaitingMapper.class, IdentityReducer.class, 1, 1,
  98.                 locality);
  99.     job.set(UtilsForTests.getTaskSignalParameter(true), mapSignalFile.toString());
  100.     job.set(UtilsForTests.getTaskSignalParameter(false), redSignalFile.toString());
  101.     
  102.     // Disable slow-start for reduces since this maps don't complete 
  103.     // in these test-cases...
  104.     job.setFloat("mapred.reduce.slowstart.completed.maps", 0.0f);
  105.     
  106.     // test jobs with speculation
  107.     job.setSpeculativeExecution(speculation);
  108.     JobClient jc = new JobClient(job);
  109.     RunningJob running = jc.submitJob(job);
  110.     JobTracker jobtracker = mrCluster.getJobTrackerRunner().getJobTracker();
  111.     JobInProgress jip = jobtracker.getJob(running.getID());
  112.     LOG.info("Running job " + jip.getJobID());
  113.     
  114.     // wait
  115.     LOG.info("Waiting for job " + jip.getJobID() + " to be ready");
  116.     waitTillReady(jip, job);
  117.     
  118.     // check if the running structures are populated
  119.     Set<TaskInProgress> uniqueTasks = new HashSet<TaskInProgress>();
  120.     for (Map.Entry<Node, Set<TaskInProgress>> s : 
  121.            jip.getRunningMapCache().entrySet()) {
  122.       uniqueTasks.addAll(s.getValue());
  123.     }
  124.     
  125.     // add non local map tasks
  126.     uniqueTasks.addAll(jip.getNonLocalRunningMaps());
  127.     
  128.     assertEquals("Running map count doesnt match for jobs with speculation " 
  129.                  + speculation + ", and locality " + locality,
  130.                  jip.runningMaps(), uniqueTasks.size());
  131.     assertEquals("Running reducer count doesnt match for jobs with speculation "
  132.                  + speculation + ", and locality " + locality,
  133.                  jip.runningReduces(), jip.getRunningReduces().size());
  134.     
  135.     // signal the tasks
  136.     LOG.info("Signaling the tasks");
  137.     UtilsForTests.signalTasks(dfsCluster, dfsCluster.getFileSystem(),
  138.                               mapSignalFile.toString(), 
  139.                               redSignalFile.toString(), numSlaves);
  140.     
  141.     // wait for the job to complete
  142.     LOG.info("Waiting for job " + jip.getJobID() + " to be complete");
  143.     UtilsForTests.waitTillDone(jc);
  144.     
  145.     // cleanup
  146.     dfsCluster.getFileSystem().delete(TEST_DIR, true);
  147.   }
  148.   
  149.   // wait for the job to start
  150.   private void waitTillReady(JobInProgress jip, JobConf job) {
  151.     // wait for all the maps to get scheduled
  152.     while (jip.runningMaps() < job.getNumMapTasks()) {
  153.       UtilsForTests.waitFor(10);
  154.     }
  155.     
  156.     // wait for all the reducers to get scheduled
  157.     while (jip.runningReduces() < job.getNumReduceTasks()) {
  158.       UtilsForTests.waitFor(10);
  159.     }
  160.   }
  161.   
  162.   public void testRunningTaskCount() throws Exception {
  163.     // test with spec = false and locality=true
  164.     testRunningTaskCount(false, true);
  165.     
  166.     // test with spec = true and locality=true
  167.     testRunningTaskCount(true, true);
  168.     
  169.     // test with spec = false and locality=false
  170.     testRunningTaskCount(false, false);
  171.     
  172.     // test with spec = true and locality=false
  173.     testRunningTaskCount(true, false);
  174.   }
  175.   
  176.   @Override
  177.   protected void tearDown() throws Exception {
  178.     mrCluster.shutdown();
  179.     dfsCluster.shutdown();
  180.     super.tearDown();
  181.   }
  182.   
  183.   void launchTask(Class MapClass,Class ReduceClass) throws Exception{
  184.     JobConf job = configure(MapClass, ReduceClass, 5, 10, true);
  185.     try {
  186.       JobClient.runJob(job);
  187.     } catch (IOException ioe) {}
  188.   }
  189.   
  190.   @SuppressWarnings("unchecked")
  191.   JobConf configure(Class MapClass,Class ReduceClass, int maps, int reducers,
  192.                     boolean locality) 
  193.   throws Exception {
  194.     JobConf jobConf = mrCluster.createJobConf();
  195.     final Path inDir = new Path("./failjob/input");
  196.     final Path outDir = new Path("./failjob/output");
  197.     String input = "Test failing job.n One more line";
  198.     FileSystem inFs = inDir.getFileSystem(jobConf);
  199.     FileSystem outFs = outDir.getFileSystem(jobConf);
  200.     outFs.delete(outDir, true);
  201.     if (!inFs.mkdirs(inDir)) {
  202.       throw new IOException("create directory failed" + inDir.toString());
  203.     }
  204.     DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
  205.     file.writeBytes(input);
  206.     file.close();
  207.     jobConf.setJobName("failmaptask");
  208.     if (locality) {
  209.       jobConf.setInputFormat(TextInputFormat.class);
  210.     } else {
  211.       jobConf.setInputFormat(UtilsForTests.RandomInputFormat.class);
  212.     }
  213.     jobConf.setOutputKeyClass(Text.class);
  214.     jobConf.setOutputValueClass(Text.class);
  215.     jobConf.setMapperClass(MapClass);
  216.     jobConf.setCombinerClass(ReduceClass);
  217.     jobConf.setReducerClass(ReduceClass);
  218.     FileInputFormat.setInputPaths(jobConf, inDir);
  219.     FileOutputFormat.setOutputPath(jobConf, outDir);
  220.     jobConf.setNumMapTasks(maps);
  221.     jobConf.setNumReduceTasks(reducers);
  222.     return jobConf; 
  223.   }
  224.   void checkTaskCounts() {
  225.     JobStatus[] status = jt.getAllJobs();
  226.     for (JobStatus js : status) {
  227.       JobInProgress jip = jt.getJob(js.getJobID());
  228.       Counters counter = jip.getJobCounters();
  229.       long totalTaskCount = counter
  230.           .getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_MAPS)
  231.           + counter.getCounter(JobInProgress.Counter.TOTAL_LAUNCHED_REDUCES);
  232.       while (jip.getNumTaskCompletionEvents() < totalTaskCount) {
  233.         assertEquals(true, (jip.runningMaps() >= 0));
  234.         assertEquals(true, (jip.pendingMaps() >= 0));
  235.         assertEquals(true, (jip.runningReduces() >= 0));
  236.         assertEquals(true, (jip.pendingReduces() >= 0));
  237.       }
  238.     }
  239.   }
  240.   
  241. }