ControlledMapReduceJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.Iterator;
  24. import java.util.Random;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.apache.hadoop.conf.Configuration;
  28. import org.apache.hadoop.conf.Configured;
  29. import org.apache.hadoop.fs.FileStatus;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.fs.PathFilter;
  33. import org.apache.hadoop.io.BytesWritable;
  34. import org.apache.hadoop.io.IntWritable;
  35. import org.apache.hadoop.io.NullWritable;
  36. import org.apache.hadoop.io.SequenceFile;
  37. import org.apache.hadoop.io.SequenceFile.CompressionType;
  38. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  39. import org.apache.hadoop.util.StringUtils;
  40. import org.apache.hadoop.util.Tool;
  41. import org.apache.hadoop.util.ToolRunner;
  42. /**
  43.  * A Controlled Map/Reduce Job. The tasks are controlled by the presence of
  44.  * particularly named files in the directory signalFileDir on the file-system
  45.  * that the job is configured to work with. Tasks get scheduled by the
  46.  * scheduler, occupy the slots on the TaskTrackers and keep running till the
  47.  * user gives a signal via files whose names are of the form MAPS_[0-9]* and
  48.  * REDUCES_[0-9]*. For e.g., whenever the map tasks see that a file name MAPS_5
  49.  * is created in the singalFileDir, all the maps whose TaskAttemptIDs are below
  50.  * 4 get finished. At any time, there should be only one MAPS_[0-9]* file and
  51.  * only one REDUCES_[0-9]* file in the singnalFileDir. In the beginning MAPS_0
  52.  * and REDUCE_0 files are present, and further signals are given by renaming
  53.  * these files.
  54.  * 
  55.  */
  56. class ControlledMapReduceJob extends Configured implements Tool,
  57.     Mapper<NullWritable, NullWritable, IntWritable, NullWritable>,
  58.     Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
  59.     Partitioner<IntWritable, NullWritable>,
  60.     InputFormat<NullWritable, NullWritable> {
  61.   static final Log LOG = LogFactory.getLog(ControlledMapReduceJob.class);
  62.   private FileSystem fs = null;
  63.   private int taskNumber;
  64.   private static ArrayList<Path> signalFileDirCache = new ArrayList<Path>();
  65.   private Path signalFileDir;
  66.   {
  67.     Random random = new Random();
  68.     signalFileDir = new Path("signalFileDir-" + random.nextLong());
  69.     while (signalFileDirCache.contains(signalFileDir)) {
  70.       signalFileDir = new Path("signalFileDir-" + random.nextLong());
  71.     }
  72.     signalFileDirCache.add(signalFileDir);
  73.   }
  74.   private long mapsFinished = 0;
  75.   private long reducesFinished = 0;
  76.   private RunningJob rJob = null;
  77.   private int numMappers;
  78.   private int numReducers;
  79.   private final String MAP_SIGFILE_PREFIX = "MAPS_";
  80.   private final String REDUCE_SIGFILE_PREFIX = "REDUCES_";
  81.   private void initialize()
  82.       throws IOException {
  83.     fs = FileSystem.get(getConf());
  84.     fs.mkdirs(signalFileDir);
  85.     writeFile(new Path(signalFileDir, MAP_SIGFILE_PREFIX + mapsFinished));
  86.     writeFile(new Path(signalFileDir, REDUCE_SIGFILE_PREFIX + reducesFinished));
  87.   }
  88.   /**
  89.    * Finish N number of maps/reduces.
  90.    * 
  91.    * @param isMap
  92.    * @param noOfTasksToFinish
  93.    * @throws IOException
  94.    */
  95.   public void finishNTasks(boolean isMap, int noOfTasksToFinish)
  96.       throws IOException {
  97.     if (noOfTasksToFinish < 0) {
  98.       throw new IOException(
  99.           "Negative values for noOfTasksToFinish not acceptable");
  100.     }
  101.     if (noOfTasksToFinish == 0) {
  102.       return;
  103.     }
  104.     LOG.info("Going to finish off " + noOfTasksToFinish);
  105.     String PREFIX = isMap ? MAP_SIGFILE_PREFIX : REDUCE_SIGFILE_PREFIX;
  106.     long tasksFinished = isMap ? mapsFinished : reducesFinished;
  107.     Path oldSignalFile =
  108.         new Path(signalFileDir, PREFIX + String.valueOf(tasksFinished));
  109.     Path newSignalFile =
  110.         new Path(signalFileDir, PREFIX
  111.             + String.valueOf(tasksFinished + noOfTasksToFinish));
  112.     fs.rename(oldSignalFile, newSignalFile);
  113.     if (isMap) {
  114.       mapsFinished += noOfTasksToFinish;
  115.     } else {
  116.       reducesFinished += noOfTasksToFinish;
  117.     }
  118.     LOG.info("Successfully sent signal to finish off " + noOfTasksToFinish);
  119.   }
  120.   /**
  121.    * Finished all tasks of type determined by isMap
  122.    * 
  123.    * @param isMap
  124.    * @throws IOException
  125.    */
  126.   public void finishAllTasks(boolean isMap)
  127.       throws IOException {
  128.     finishNTasks(isMap, (isMap ? numMappers : numReducers));
  129.   }
  130.   /**
  131.    * Finish the job
  132.    * 
  133.    * @throws IOException
  134.    */
  135.   public void finishJob()
  136.       throws IOException {
  137.     finishAllTasks(true);
  138.     finishAllTasks(false);
  139.   }
  140.   /**
  141.    * Wait till noOfTasksToBeRunning number of tasks of type specified by isMap
  142.    * started running. This currently uses a jip object and directly uses its api
  143.    * to determine the number of tasks running.
  144.    * 
  145.    * <p>
  146.    * 
  147.    * TODO: It should eventually use a JobID and then get the information from
  148.    * the JT to check the number of running tasks.
  149.    * 
  150.    * @param jip
  151.    * @param isMap
  152.    * @param noOfTasksToBeRunning
  153.    */
  154.   static void waitTillNTasksStartRunning(JobInProgress jip, boolean isMap,
  155.       int noOfTasksToBeRunning)
  156.       throws InterruptedException {
  157.     int numTasks = 0;
  158.     while (numTasks != noOfTasksToBeRunning) {
  159.       Thread.sleep(1000);
  160.       numTasks = isMap ? jip.runningMaps() : jip.runningReduces();
  161.       LOG.info("Waiting till " + noOfTasksToBeRunning
  162.           + (isMap ? " map" : " reduce") + " tasks of the job "
  163.           + jip.getJobID() + " start running. " + numTasks
  164.           + " tasks already started running.");
  165.     }
  166.   }
  167.   /**
  168.    * Make sure that the number of tasks of type specified by isMap running in
  169.    * the given job is the same as noOfTasksToBeRunning
  170.    * 
  171.    * <p>
  172.    * 
  173.    * TODO: It should eventually use a JobID and then get the information from
  174.    * the JT to check the number of running tasks.
  175.    * 
  176.    * @param jip
  177.    * @param isMap
  178.    * @param noOfTasksToBeRunning
  179.    */
  180.   static void assertNumTasksRunning(JobInProgress jip, boolean isMap,
  181.       int noOfTasksToBeRunning)
  182.       throws Exception {
  183.     if ((isMap ? jip.runningMaps() : jip.runningReduces()) != noOfTasksToBeRunning) {
  184.       throw new Exception("Number of tasks running is not "
  185.           + noOfTasksToBeRunning);
  186.     }
  187.   }
  188.   /**
  189.    * Wait till noOfTasksToFinish number of tasks of type specified by isMap
  190.    * are finished. This currently uses a jip object and directly uses its api to
  191.    * determine the number of tasks finished.
  192.    * 
  193.    * <p>
  194.    * 
  195.    * TODO: It should eventually use a JobID and then get the information from
  196.    * the JT to check the number of finished tasks.
  197.    * 
  198.    * @param jip
  199.    * @param isMap
  200.    * @param noOfTasksToFinish
  201.    * @throws InterruptedException
  202.    */
  203.   static void waitTillNTotalTasksFinish(JobInProgress jip, boolean isMap,
  204.       int noOfTasksToFinish)
  205.       throws InterruptedException {
  206.     int noOfTasksAlreadyFinished = 0;
  207.     while (noOfTasksAlreadyFinished < noOfTasksToFinish) {
  208.       Thread.sleep(1000);
  209.       noOfTasksAlreadyFinished =
  210.           (isMap ? jip.finishedMaps() : jip.finishedReduces());
  211.       LOG.info("Waiting till " + noOfTasksToFinish
  212.           + (isMap ? " map" : " reduce") + " tasks of the job "
  213.           + jip.getJobID() + " finish. " + noOfTasksAlreadyFinished
  214.           + " tasks already got finished.");
  215.     }
  216.   }
  217.   /**
  218.    * Have all the tasks of type specified by isMap finished in this job?
  219.    * 
  220.    * @param jip
  221.    * @param isMap
  222.    * @return true if finished, false otherwise
  223.    */
  224.   static boolean haveAllTasksFinished(JobInProgress jip, boolean isMap) {
  225.     return ((isMap ? jip.runningMaps() : jip.runningReduces()) == 0);
  226.   }
  227.   private void writeFile(Path name)
  228.       throws IOException {
  229.     Configuration conf = new Configuration(false);
  230.     SequenceFile.Writer writer =
  231.         SequenceFile.createWriter(fs, conf, name, BytesWritable.class,
  232.             BytesWritable.class, CompressionType.NONE);
  233.     writer.append(new BytesWritable(), new BytesWritable());
  234.     writer.close();
  235.   }
  236.   @Override
  237.   public void configure(JobConf conf) {
  238.     try {
  239.       signalFileDir = new Path(conf.get("signal.dir.path"));
  240.       numReducers = conf.getNumReduceTasks();
  241.       fs = FileSystem.get(conf);
  242.       String taskAttemptId = conf.get("mapred.task.id");
  243.       if (taskAttemptId != null) {
  244.         TaskAttemptID taskAttemptID = TaskAttemptID.forName(taskAttemptId);
  245.         taskNumber = taskAttemptID.getTaskID().getId();
  246.       }
  247.     } catch (IOException ioe) {
  248.       LOG.warn("Caught exception " + ioe);
  249.     }
  250.   }
  251.   private FileStatus[] listSignalFiles(FileSystem fileSys, final boolean isMap)
  252.       throws IOException {
  253.     return fileSys.globStatus(new Path(signalFileDir.toString() + "/*"),
  254.         new PathFilter() {
  255.           @Override
  256.           public boolean accept(Path path) {
  257.             if (isMap && path.getName().startsWith(MAP_SIGFILE_PREFIX)) {
  258.               LOG.debug("Found signal file : " + path.getName());
  259.               return true;
  260.             } else if (!isMap
  261.                 && path.getName().startsWith(REDUCE_SIGFILE_PREFIX)) {
  262.               LOG.debug("Found signal file : " + path.getName());
  263.               return true;
  264.             }
  265.             LOG.info("Didn't find any relevant signal files.");
  266.             return false;
  267.           }
  268.         });
  269.   }
  270.   @Override
  271.   public void map(NullWritable key, NullWritable value,
  272.       OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
  273.       throws IOException {
  274.     LOG.info(taskNumber + " has started.");
  275.     FileStatus[] files = listSignalFiles(fs, true);
  276.     String[] sigFileComps = files[0].getPath().getName().split("_");
  277.     String signalType = sigFileComps[0];
  278.     int noOfTasks = Integer.parseInt(sigFileComps[1]);
  279.     while (!signalType.equals("MAPS") || taskNumber + 1 > noOfTasks) {
  280.       LOG.info("Signal type found : " + signalType
  281.           + " .Number of tasks to be finished by this signal : " + noOfTasks
  282.           + " . My id : " + taskNumber);
  283.       LOG.info(taskNumber + " is still alive.");
  284.       try {
  285.         reporter.progress();
  286.         Thread.sleep(1000);
  287.       } catch (InterruptedException ie) {
  288.         LOG.info(taskNumber + " is still alive.");
  289.         break;
  290.       }
  291.       files = listSignalFiles(fs, true);
  292.       sigFileComps = files[0].getPath().getName().split("_");
  293.       signalType = sigFileComps[0];
  294.       noOfTasks = Integer.parseInt(sigFileComps[1]);
  295.     }
  296.     LOG.info("Signal type found : " + signalType
  297.         + " .Number of tasks to be finished by this signal : " + noOfTasks
  298.         + " . My id : " + taskNumber);
  299.     // output numReduce number of random values, so that
  300.     // each reducer will get one key each.
  301.     for (int i = 0; i < numReducers; i++) {
  302.       output.collect(new IntWritable(i), NullWritable.get());
  303.     }
  304.     LOG.info(taskNumber + " is finished.");
  305.   }
  306.   @Override
  307.   public void reduce(IntWritable key, Iterator<NullWritable> values,
  308.       OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
  309.       throws IOException {
  310.     LOG.info(taskNumber + " has started.");
  311.     FileStatus[] files = listSignalFiles(fs, false);
  312.     String[] sigFileComps = files[0].getPath().getName().split("_");
  313.     String signalType = sigFileComps[0];
  314.     int noOfTasks = Integer.parseInt(sigFileComps[1]);
  315.     while (!signalType.equals("REDUCES") || taskNumber + 1 > noOfTasks) {
  316.       LOG.info("Signal type found : " + signalType
  317.           + " .Number of tasks to be finished by this signal : " + noOfTasks
  318.           + " . My id : " + taskNumber);
  319.       LOG.info(taskNumber + " is still alive.");
  320.       try {
  321.         reporter.progress();
  322.         Thread.sleep(1000);
  323.       } catch (InterruptedException ie) {
  324.         LOG.info(taskNumber + " is still alive.");
  325.         break;
  326.       }
  327.       files = listSignalFiles(fs, false);
  328.       sigFileComps = files[0].getPath().getName().split("_");
  329.       signalType = sigFileComps[0];
  330.       noOfTasks = Integer.parseInt(sigFileComps[1]);
  331.     }
  332.     LOG.info("Signal type found : " + signalType
  333.         + " .Number of tasks to be finished by this signal : " + noOfTasks
  334.         + " . My id : " + taskNumber);
  335.     LOG.info(taskNumber + " is finished.");
  336.   }
  337.   @Override
  338.   public void close()
  339.       throws IOException {
  340.     // nothing
  341.   }
  342.   public JobID getJobId() {
  343.     if (rJob == null) {
  344.       return null;
  345.     }
  346.     return rJob.getID();
  347.   }
  348.   public int run(int numMapper, int numReducer)
  349.       throws IOException {
  350.     JobConf conf =
  351.         getControlledMapReduceJobConf(getConf(), numMapper, numReducer);
  352.     JobClient client = new JobClient(conf);
  353.     rJob = client.submitJob(conf);
  354.     while (!rJob.isComplete()) {
  355.       try {
  356.         Thread.sleep(1000);
  357.       } catch (InterruptedException ie) {
  358.         break;
  359.       }
  360.     }
  361.     if (rJob.isSuccessful()) {
  362.       return 0;
  363.     }
  364.     return 1;
  365.   }
  366.   private JobConf getControlledMapReduceJobConf(Configuration clusterConf,
  367.       int numMapper, int numReducer)
  368.       throws IOException {
  369.     setConf(clusterConf);
  370.     initialize();
  371.     JobConf conf = new JobConf(getConf(), ControlledMapReduceJob.class);
  372.     conf.setJobName("ControlledJob");
  373.     conf.set("signal.dir.path", signalFileDir.toString());
  374.     conf.setNumMapTasks(numMapper);
  375.     conf.setNumReduceTasks(numReducer);
  376.     conf.setMapperClass(ControlledMapReduceJob.class);
  377.     conf.setMapOutputKeyClass(IntWritable.class);
  378.     conf.setMapOutputValueClass(NullWritable.class);
  379.     conf.setReducerClass(ControlledMapReduceJob.class);
  380.     conf.setOutputKeyClass(NullWritable.class);
  381.     conf.setOutputValueClass(NullWritable.class);
  382.     conf.setInputFormat(ControlledMapReduceJob.class);
  383.     FileInputFormat.addInputPath(conf, new Path("ignored"));
  384.     conf.setOutputFormat(NullOutputFormat.class);
  385.     // Set the following for reduce tasks to be able to be started running
  386.     // immediately along with maps.
  387.     conf.set("mapred.reduce.slowstart.completed.maps", String.valueOf(0));
  388.     return conf;
  389.   }
  390.   @Override
  391.   public int run(String[] args)
  392.       throws Exception {
  393.     numMappers = Integer.parseInt(args[0]);
  394.     numReducers = Integer.parseInt(args[1]);
  395.     return run(numMappers, numReducers);
  396.   }
  397.   @Override
  398.   public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
  399.     return k.get() % numPartitions;
  400.   }
  401.   @Override
  402.   public RecordReader<NullWritable, NullWritable> getRecordReader(
  403.       InputSplit split, JobConf job, Reporter reporter) {
  404.     LOG.debug("Inside RecordReader.getRecordReader");
  405.     return new RecordReader<NullWritable, NullWritable>() {
  406.       private int pos = 0;
  407.       public void close() {
  408.         // nothing
  409.       }
  410.       public NullWritable createKey() {
  411.         return NullWritable.get();
  412.       }
  413.       public NullWritable createValue() {
  414.         return NullWritable.get();
  415.       }
  416.       public long getPos() {
  417.         return pos;
  418.       }
  419.       public float getProgress() {
  420.         return pos * 100;
  421.       }
  422.       public boolean next(NullWritable key, NullWritable value) {
  423.         if (pos++ == 0) {
  424.           LOG.debug("Returning the next record");
  425.           return true;
  426.         }
  427.         LOG.debug("No more records. Returning none.");
  428.         return false;
  429.       }
  430.     };
  431.   }
  432.   @Override
  433.   public InputSplit[] getSplits(JobConf job, int numSplits) {
  434.     LOG.debug("Inside InputSplit.getSplits");
  435.     InputSplit[] ret = new InputSplit[numSplits];
  436.     for (int i = 0; i < numSplits; ++i) {
  437.       ret[i] = new EmptySplit();
  438.     }
  439.     return ret;
  440.   }
  441.   public static class EmptySplit implements InputSplit {
  442.     public void write(DataOutput out)
  443.         throws IOException {
  444.     }
  445.     public void readFields(DataInput in)
  446.         throws IOException {
  447.     }
  448.     public long getLength() {
  449.       return 0L;
  450.     }
  451.     public String[] getLocations() {
  452.       return new String[0];
  453.     }
  454.   }
  455.   static class ControlledMapReduceJobRunner extends Thread {
  456.     private JobConf conf;
  457.     private ControlledMapReduceJob job;
  458.     private JobID jobID;
  459.     private int numMappers;
  460.     private int numReducers;
  461.     public ControlledMapReduceJobRunner() {
  462.       this(new JobConf(), 5, 5);
  463.     }
  464.     public ControlledMapReduceJobRunner(JobConf cnf, int numMap, int numRed) {
  465.       this.conf = cnf;
  466.       this.numMappers = numMap;
  467.       this.numReducers = numRed;
  468.     }
  469.     public ControlledMapReduceJob getJob() {
  470.       while (job == null) {
  471.         try {
  472.           Thread.sleep(1000);
  473.         } catch (InterruptedException ie) {
  474.           LOG.info(ControlledMapReduceJobRunner.class.getName()
  475.               + " is interrupted.");
  476.           break;
  477.         }
  478.       }
  479.       return job;
  480.     }
  481.     public JobID getJobID()
  482.         throws IOException {
  483.       ControlledMapReduceJob job = getJob();
  484.       JobID id = job.getJobId();
  485.       while (id == null) {
  486.         id = job.getJobId();
  487.         try {
  488.           Thread.sleep(1000);
  489.         } catch (InterruptedException ie) {
  490.           LOG.info(ControlledMapReduceJobRunner.class.getName()
  491.               + " is interrupted.");
  492.           break;
  493.         }
  494.       }
  495.       return id;
  496.     }
  497.     @Override
  498.     public void run() {
  499.       if (job != null) {
  500.         LOG.warn("Job is already running.");
  501.         return;
  502.       }
  503.       try {
  504.         job = new ControlledMapReduceJob();
  505.         int ret =
  506.             ToolRunner.run(this.conf, job, new String[] {
  507.                 String.valueOf(numMappers), String.valueOf(numReducers) });
  508.         LOG.info("Return value for the job : " + ret);
  509.       } catch (Exception e) {
  510.         LOG.warn("Caught exception : " + StringUtils.stringifyException(e));
  511.       }
  512.     }
  513.     static ControlledMapReduceJobRunner getControlledMapReduceJobRunner(
  514.         JobConf conf, int numMappers, int numReducers) {
  515.       return new ControlledMapReduceJobRunner(conf, numMappers, numReducers);
  516.     }
  517.   }
  518. }