ReliabilityTest.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.File;
  20. import java.io.FileOutputStream;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.Collection;
  24. import java.util.Collections;
  25. import java.util.Date;
  26. import java.util.HashMap;
  27. import java.util.Map;
  28. import java.util.StringTokenizer;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.commons.logging.LogFactory;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.conf.Configured;
  33. import org.apache.hadoop.fs.FileSystem;
  34. import org.apache.hadoop.fs.Path;
  35. import org.apache.hadoop.util.GenericOptionsParser;
  36. import org.apache.hadoop.util.Shell;
  37. import org.apache.hadoop.util.StringUtils;
  38. import org.apache.hadoop.util.Tool;
  39. import org.apache.hadoop.util.ToolRunner;
  40. /**
  41.  * This class tests reliability of the framework in the face of failures of 
  42.  * both tasks and tasktrackers. Steps:
  43.  * 1) Get the cluster status
  44.  * 2) Get the number of slots in the cluster
  45.  * 3) Spawn a sleepjob that occupies the entire cluster (with two waves of maps)
  46.  * 4) Get the list of running attempts for the job
  47.  * 5) Fail a few of them
  48.  * 6) Now fail a few trackers (ssh)
  49.  * 7) Job should run to completion
  50.  * 8) The above is repeated for the Sort suite of job (randomwriter, sort,
  51.  *    validator). All jobs must complete, and finally, the sort validation
  52.  *    should succeed.
  53.  * To run the test:
  54.  * ./bin/hadoop --config <config> jar
  55.  *   build/hadoop-<version>-test.jar MRReliabilityTest -libjars
  56.  *   build/hadoop-<version>-examples.jar [-scratchdir <dir>]"
  57.  *   
  58.  *   The scratchdir is optional and by default the current directory on the client
  59.  *   will be used as the scratch space. Note that password-less SSH must be set up 
  60.  *   between the client machine from where the test is submitted, and the cluster 
  61.  *   nodes where the test runs.
  62.  */
  63. public class ReliabilityTest extends Configured implements Tool {
  64.   private String dir;
  65.   private static final Log LOG = LogFactory.getLog(ReliabilityTest.class); 
  66.   private void displayUsage() {
  67.     LOG.info("This must be run in only the distributed mode " +
  68.      "(LocalJobRunner not supported).ntUsage: MRReliabilityTest " +
  69.      "-libjars <path to hadoop-examples.jar> [-scratchdir <dir>]" +
  70.      "n[-scratchdir] points to a scratch space on this host where temp" +
  71.      " files for this test will be created. Defaults to current working" +
  72.      " dir. nPasswordless SSH must be set up between this host and the" +
  73.      " nodes which the test is going to use");
  74.     System.exit(-1);
  75.   }
  76.   
  77.   public int run(String[] args) throws Exception {
  78.     Configuration conf = getConf();
  79.     if ("local".equals(conf.get("mapred.job.tracker", "local"))) {
  80.       displayUsage();
  81.     }
  82.     String[] otherArgs = 
  83.       new GenericOptionsParser(conf, args).getRemainingArgs();
  84.     if (otherArgs.length == 2) {
  85.       if (otherArgs[0].equals("-scratchdir")) {
  86.         dir = otherArgs[1];
  87.       } else {
  88.         displayUsage();
  89.       }
  90.     }
  91.     else if (otherArgs.length == 0) {
  92.       dir = System.getProperty("user.dir");
  93.     } else {
  94.       displayUsage();
  95.     }
  96.     
  97.     //to protect against the case of jobs failing even when multiple attempts
  98.     //fail, set some high values for the max attempts
  99.     conf.setInt("mapred.map.max.attempts", 10);
  100.     conf.setInt("mapred.reduce.max.attempts", 10);
  101.     runSleepJobTest(new JobClient(new JobConf(conf)), conf);
  102.     runSortJobTests(new JobClient(new JobConf(conf)), conf);
  103.     return 0;
  104.   }
  105.   
  106.   private void runSleepJobTest(final JobClient jc, final Configuration conf) 
  107.   throws Exception {
  108.     ClusterStatus c = jc.getClusterStatus();
  109.     int maxMaps = c.getMaxMapTasks() * 2;
  110.     int maxReduces = maxMaps;
  111.     int mapSleepTime = (int)c.getTTExpiryInterval();
  112.     int reduceSleepTime = mapSleepTime;
  113.     String[] sleepJobArgs = new String[] {     
  114.         "-m", Integer.toString(maxMaps), 
  115.         "-r", Integer.toString(maxReduces),
  116.         "-mt", Integer.toString(mapSleepTime),
  117.         "-rt", Integer.toString(reduceSleepTime)};
  118.     runTest(jc, conf, "org.apache.hadoop.examples.SleepJob", sleepJobArgs, 
  119.         new KillTaskThread(jc, 2, 0.2f, false, 2),
  120.         new KillTrackerThread(jc, 2, 0.4f, false, 1));
  121.     LOG.info("SleepJob done");
  122.   }
  123.   
  124.   private void runSortJobTests(final JobClient jc, final Configuration conf) 
  125.   throws Exception {
  126.     String inputPath = "my_reliability_test_input";
  127.     String outputPath = "my_reliability_test_output";
  128.     FileSystem fs = jc.getFs();
  129.     fs.delete(new Path(inputPath), true);
  130.     fs.delete(new Path(outputPath), true);
  131.     runRandomWriterTest(jc, conf, inputPath);
  132.     runSortTest(jc, conf, inputPath, outputPath);
  133.     runSortValidatorTest(jc, conf, inputPath, outputPath);
  134.   }
  135.   
  136.   private void runRandomWriterTest(final JobClient jc, 
  137.       final Configuration conf, final String inputPath) 
  138.   throws Exception {
  139.     runTest(jc, conf, "org.apache.hadoop.examples.RandomWriter", 
  140.         new String[]{inputPath}, 
  141.         null, new KillTrackerThread(jc, 0, 0.4f, false, 1));
  142.     LOG.info("RandomWriter job done");
  143.   }
  144.   
  145.   private void runSortTest(final JobClient jc, final Configuration conf,
  146.       final String inputPath, final String outputPath) 
  147.   throws Exception {
  148.     runTest(jc, conf, "org.apache.hadoop.examples.Sort", 
  149.         new String[]{inputPath, outputPath},
  150.         new KillTaskThread(jc, 2, 0.2f, false, 2),
  151.         new KillTrackerThread(jc, 2, 0.8f, false, 1));
  152.     LOG.info("Sort job done");
  153.   }
  154.   
  155.   private void runSortValidatorTest(final JobClient jc, 
  156.       final Configuration conf, final String inputPath, final String outputPath)
  157.   throws Exception {
  158.     runTest(jc, conf, "org.apache.hadoop.mapred.SortValidator", new String[] {
  159.         "-sortInput", inputPath, "-sortOutput", outputPath},
  160.         new KillTaskThread(jc, 2, 0.2f, false, 1),
  161.         new KillTrackerThread(jc, 2, 0.8f, false, 1));  
  162.     LOG.info("SortValidator job done");    
  163.   }
  164.   
  165.   private String normalizeCommandPath(String command) {
  166.     final String hadoopHome;
  167.     if ((hadoopHome = System.getenv("HADOOP_HOME")) != null) {
  168.       command = hadoopHome + "/" + command;
  169.     }
  170.     return command;
  171.   }
  172.   
  173.   private void checkJobExitStatus(int status, String jobName) {
  174.     if (status != 0) {
  175.       LOG.info(jobName + " job failed with status: " + status);
  176.       System.exit(status);
  177.     } else {
  178.       LOG.info(jobName + " done.");
  179.     }
  180.   }
  181.   //Starts the job in a thread. It also starts the taskKill/tasktrackerKill
  182.   //threads.
  183.   private void runTest(final JobClient jc, final Configuration conf,
  184.       final String jobClass, final String[] args, KillTaskThread killTaskThread,
  185.       KillTrackerThread killTrackerThread) throws Exception {
  186.     int prevJobsNum = jc.getAllJobs().length;
  187.     Thread t = new Thread("Job Test") {
  188.       public void run() {
  189.         try {
  190.           Class<?> jobClassObj = conf.getClassByName(jobClass);
  191.           int status = ToolRunner.run(conf, (Tool)(jobClassObj.newInstance()), 
  192.               args);
  193.           checkJobExitStatus(status, jobClass);
  194.         } catch (Exception e) {
  195.           LOG.fatal("JOB " + jobClass + " failed to run");
  196.           System.exit(-1);
  197.         }
  198.       }
  199.     };
  200.     t.setDaemon(true);
  201.     t.start();
  202.     JobStatus[] jobs;
  203.     //get the job ID. This is the job that we just submitted
  204.     while ((jobs = jc.getAllJobs()).length - prevJobsNum == 0) {
  205.       LOG.info("Waiting for the job " + jobClass +" to start");
  206.       Thread.sleep(1000);
  207.     }
  208.     JobID jobId = jobs[jobs.length - 1].getJobID();
  209.     RunningJob rJob = jc.getJob(jobId);
  210.     while (rJob.getJobState() == JobStatus.PREP) {
  211.       LOG.info("JobID : " + jobId + " not started RUNNING yet");
  212.       Thread.sleep(1000);
  213.       rJob = jc.getJob(jobId);
  214.     }
  215.     if (killTaskThread != null) {
  216.       killTaskThread.setRunningJob(rJob);
  217.       killTaskThread.start();
  218.       killTaskThread.join();
  219.       LOG.info("DONE WITH THE TASK KILL/FAIL TESTS");
  220.     }
  221.     if (killTrackerThread != null) {
  222.       killTrackerThread.setRunningJob(rJob);
  223.       killTrackerThread.start();
  224.       killTrackerThread.join();
  225.       LOG.info("DONE WITH THE TESTS TO DO WITH LOST TASKTRACKERS");
  226.     }
  227.     t.join();
  228.   }
  229.   
  230.   private class KillTrackerThread extends Thread {
  231.     private volatile boolean killed = false;
  232.     private JobClient jc;
  233.     private RunningJob rJob;
  234.     final private int thresholdMultiplier;
  235.     private float threshold = 0.2f;
  236.     private boolean onlyMapsProgress;
  237.     private int numIterations;
  238.     final private String slavesFile = dir + "/_reliability_test_slaves_file_";
  239.     final String shellCommand = normalizeCommandPath("bin/slaves.sh");
  240.     final private String STOP_COMMAND = "ps uwwx | grep java | grep " + 
  241.     "org.apache.hadoop.mapred.TaskTracker"+ " |" + 
  242.     " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s STOP";
  243.     final private String RESUME_COMMAND = "ps uwwx | grep java | grep " + 
  244.     "org.apache.hadoop.mapred.TaskTracker"+ " |" + 
  245.     " grep -v grep | tr -s ' ' | cut -d ' ' -f2 | xargs kill -s CONT";
  246.     //Only one instance must be active at any point
  247.     public KillTrackerThread(JobClient jc, int threshaldMultiplier,
  248.         float threshold, boolean onlyMapsProgress, int numIterations) {
  249.       this.jc = jc;
  250.       this.thresholdMultiplier = threshaldMultiplier;
  251.       this.threshold = threshold;
  252.       this.onlyMapsProgress = onlyMapsProgress;
  253.       this.numIterations = numIterations;
  254.       setDaemon(true);
  255.     }
  256.     public void setRunningJob(RunningJob rJob) {
  257.       this.rJob = rJob;
  258.     }
  259.     public void kill() {
  260.       killed = true;
  261.     }
  262.     public void run() {
  263.       stopStartTrackers(true);
  264.       if (!onlyMapsProgress) {
  265.         stopStartTrackers(false);
  266.       }
  267.     }
  268.     private void stopStartTrackers(boolean considerMaps) {
  269.       if (considerMaps) {
  270.         LOG.info("Will STOP/RESUME tasktrackers based on Maps'" +
  271.                 " progress");
  272.       } else {
  273.         LOG.info("Will STOP/RESUME tasktrackers based on " +
  274.                 "Reduces' progress");
  275.       }
  276.       LOG.info("Initial progress threshold: " + threshold + 
  277.           ". Threshold Multiplier: " + thresholdMultiplier + 
  278.           ". Number of iterations: " + numIterations);
  279.       float thresholdVal = threshold;
  280.       int numIterationsDone = 0;
  281.       while (!killed) {
  282.         try {
  283.           float progress;
  284.           if (jc.getJob(rJob.getID()).isComplete() ||
  285.               numIterationsDone == numIterations) {
  286.             break;
  287.           }
  288.           if (considerMaps) {
  289.             progress = jc.getJob(rJob.getID()).mapProgress();
  290.           } else {
  291.             progress = jc.getJob(rJob.getID()).reduceProgress();
  292.           }
  293.           if (progress >= thresholdVal) {
  294.             numIterationsDone++;
  295.             ClusterStatus c;
  296.             stopTaskTrackers((c = jc.getClusterStatus(true)));
  297.             Thread.sleep((int)Math.ceil(1.5 * c.getTTExpiryInterval()));
  298.             startTaskTrackers();
  299.             thresholdVal = thresholdVal * thresholdMultiplier;
  300.           }
  301.           Thread.sleep(5000);
  302.         } catch (InterruptedException ie) {
  303.           killed = true;
  304.           return;
  305.         } catch (Exception e) {
  306.           LOG.fatal(StringUtils.stringifyException(e));
  307.         }
  308.       }
  309.     }
  310.     private void stopTaskTrackers(ClusterStatus c) throws Exception {
  311.       Collection <String> trackerNames = c.getActiveTrackerNames();
  312.       ArrayList<String> trackerNamesList = new ArrayList<String>(trackerNames);
  313.       Collections.shuffle(trackerNamesList);
  314.       int count = 0;
  315.       FileOutputStream fos = new FileOutputStream(new File(slavesFile));
  316.       LOG.info(new Date() + " Stopping a few trackers");
  317.       for (String tracker : trackerNamesList) {
  318.         String host = convertTrackerNameToHostName(tracker);
  319.         LOG.info(new Date() + " Marking tracker on host: " + host);
  320.         fos.write((host + "n").getBytes());
  321.         if (count++ >= trackerNamesList.size()/2) {
  322.           break;
  323.         }
  324.       }
  325.       fos.close();
  326.       runOperationOnTT("suspend");
  327.     }
  328.     private void startTaskTrackers() throws Exception {
  329.       LOG.info(new Date() + " Resuming the stopped trackers");
  330.       runOperationOnTT("resume");
  331.       new File(slavesFile).delete();
  332.     }
  333.     
  334.     private void runOperationOnTT(String operation) throws IOException {
  335.       Map<String,String> hMap = new HashMap<String,String>();
  336.       hMap.put("HADOOP_SLAVES", slavesFile);
  337.       StringTokenizer strToken;
  338.       if (operation.equals("suspend")) {
  339.         strToken = new StringTokenizer(STOP_COMMAND, " ");
  340.       } else {
  341.         strToken = new StringTokenizer(RESUME_COMMAND, " ");
  342.       }
  343.       String commandArgs[] = new String[strToken.countTokens() + 1];
  344.       int i = 0;
  345.       commandArgs[i++] = shellCommand;
  346.       while (strToken.hasMoreTokens()) {
  347.         commandArgs[i++] = strToken.nextToken();
  348.       }
  349.       String output = Shell.execCommand(hMap, commandArgs);
  350.       if (output != null && !output.equals("")) {
  351.         LOG.info(output);
  352.       }
  353.     }
  354.     private String convertTrackerNameToHostName(String trackerName) {
  355.       // Convert the trackerName to it's host name
  356.       int indexOfColon = trackerName.indexOf(":");
  357.       String trackerHostName = (indexOfColon == -1) ? 
  358.           trackerName : 
  359.             trackerName.substring(0, indexOfColon);
  360.       return trackerHostName.substring("tracker_".length());
  361.     }
  362.   }
  363.   
  364.   private class KillTaskThread extends Thread {
  365.     private volatile boolean killed = false;
  366.     private RunningJob rJob;
  367.     private JobClient jc;
  368.     final private int thresholdMultiplier;
  369.     private float threshold = 0.2f;
  370.     private boolean onlyMapsProgress;
  371.     private int numIterations;
  372.     public KillTaskThread(JobClient jc, int thresholdMultiplier, 
  373.         float threshold, boolean onlyMapsProgress, int numIterations) {
  374.       this.jc = jc;
  375.       this.thresholdMultiplier = thresholdMultiplier;
  376.       this.threshold = threshold;
  377.       this.onlyMapsProgress = onlyMapsProgress;
  378.       this.numIterations = numIterations;
  379.       setDaemon(true);
  380.     }
  381.     public void setRunningJob(RunningJob rJob) {
  382.       this.rJob = rJob;
  383.     }
  384.     public void kill() {
  385.       killed = true;
  386.     }
  387.     public void run() {
  388.       killBasedOnProgress(true);
  389.       if (!onlyMapsProgress) {
  390.         killBasedOnProgress(false);
  391.       }
  392.     }
  393.     private void killBasedOnProgress(boolean considerMaps) {
  394.       boolean fail = false;
  395.       if (considerMaps) {
  396.         LOG.info("Will kill tasks based on Maps' progress");
  397.       } else {
  398.         LOG.info("Will kill tasks based on Reduces' progress");
  399.       }
  400.       LOG.info("Initial progress threshold: " + threshold + 
  401.           ". Threshold Multiplier: " + thresholdMultiplier + 
  402.           ". Number of iterations: " + numIterations);
  403.       float thresholdVal = threshold;
  404.       int numIterationsDone = 0;
  405.       while (!killed) {
  406.         try {
  407.           float progress;
  408.           if (jc.getJob(rJob.getID()).isComplete() || 
  409.               numIterationsDone == numIterations) {
  410.             break;
  411.           }
  412.           if (considerMaps) {
  413.             progress = jc.getJob(rJob.getID()).mapProgress();
  414.           } else {
  415.             progress = jc.getJob(rJob.getID()).reduceProgress();
  416.           }
  417.           if (progress >= thresholdVal) {
  418.             numIterationsDone++;
  419.             if (numIterationsDone > 0 && numIterationsDone % 2 == 0) {
  420.               fail = true; //fail tasks instead of kill
  421.             }
  422.             ClusterStatus c = jc.getClusterStatus();
  423.             LOG.info(new Date() + " Killing a few tasks");
  424.             Collection<TaskAttemptID> runningTasks =
  425.               new ArrayList<TaskAttemptID>();
  426.             TaskReport mapReports[] = jc.getMapTaskReports(rJob.getID());
  427.             for (TaskReport mapReport : mapReports) {
  428.               if (mapReport.getCurrentStatus() == TIPStatus.RUNNING) {
  429.                 runningTasks.addAll(mapReport.getRunningTaskAttempts());
  430.               }
  431.             }
  432.             if (runningTasks.size() > c.getTaskTrackers()/2) {
  433.               int count = 0;
  434.               for (TaskAttemptID t : runningTasks) {
  435.                 LOG.info(new Date() + " Killed task : " + t);
  436.                 rJob.killTask(t, fail);
  437.                 if (count++ > runningTasks.size()/2) { //kill 50%
  438.                   break;
  439.                 }
  440.               }
  441.             }
  442.             runningTasks.clear();
  443.             TaskReport reduceReports[] = jc.getReduceTaskReports(rJob.getID());
  444.             for (TaskReport reduceReport : reduceReports) {
  445.               if (reduceReport.getCurrentStatus() == TIPStatus.RUNNING) {
  446.                 runningTasks.addAll(reduceReport.getRunningTaskAttempts());
  447.               }
  448.             }
  449.             if (runningTasks.size() > c.getTaskTrackers()/2) {
  450.               int count = 0;
  451.               for (TaskAttemptID t : runningTasks) {
  452.                 LOG.info(new Date() + " Killed task : " + t);
  453.                 rJob.killTask(t, fail);
  454.                 if (count++ > runningTasks.size()/2) { //kill 50%
  455.                   break;
  456.                 }
  457.               }
  458.             }
  459.             thresholdVal = thresholdVal * thresholdMultiplier;
  460.           }
  461.           Thread.sleep(5000);
  462.         } catch (InterruptedException ie) {
  463.           killed = true;
  464.         } catch (Exception e) {
  465.           LOG.fatal(StringUtils.stringifyException(e));
  466.         }
  467.       }
  468.     }
  469.   }
  470.   
  471.   public static void main(String args[]) throws Exception {
  472.     int res = ToolRunner.run(new Configuration(), new ReliabilityTest(), args);
  473.     System.exit(res);
  474.   }
  475. }