Task.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:40k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.lang.reflect.Constructor;
  23. import java.lang.reflect.InvocationTargetException;
  24. import java.text.NumberFormat;
  25. import java.util.HashMap;
  26. import java.util.Iterator;
  27. import java.util.Map;
  28. import java.util.NoSuchElementException;
  29. import java.util.concurrent.atomic.AtomicBoolean;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.commons.logging.LogFactory;
  32. import org.apache.hadoop.conf.Configurable;
  33. import org.apache.hadoop.conf.Configuration;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.fs.LocalDirAllocator;
  36. import org.apache.hadoop.fs.Path;
  37. import org.apache.hadoop.fs.FileSystem.Statistics;
  38. import org.apache.hadoop.io.DataInputBuffer;
  39. import org.apache.hadoop.io.RawComparator;
  40. import org.apache.hadoop.io.Text;
  41. import org.apache.hadoop.io.Writable;
  42. import org.apache.hadoop.io.serializer.Deserializer;
  43. import org.apache.hadoop.io.serializer.SerializationFactory;
  44. import org.apache.hadoop.mapred.Counters.Counter;
  45. import org.apache.hadoop.mapred.IFile.Writer;
  46. import org.apache.hadoop.net.NetUtils;
  47. import org.apache.hadoop.util.Progress;
  48. import org.apache.hadoop.util.Progressable;
  49. import org.apache.hadoop.util.ReflectionUtils;
  50. import org.apache.hadoop.util.StringUtils;
  51. /** Base class for tasks. */
  52. abstract class Task implements Writable, Configurable {
  53.   private static final Log LOG =
  54.     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
  55.   // Counters used by Task subclasses
  56.   protected static enum Counter { 
  57.     MAP_INPUT_RECORDS, 
  58.     MAP_OUTPUT_RECORDS,
  59.     MAP_SKIPPED_RECORDS,
  60.     MAP_INPUT_BYTES, 
  61.     MAP_OUTPUT_BYTES,
  62.     COMBINE_INPUT_RECORDS,
  63.     COMBINE_OUTPUT_RECORDS,
  64.     REDUCE_INPUT_GROUPS,
  65.     REDUCE_SHUFFLE_BYTES,
  66.     REDUCE_INPUT_RECORDS,
  67.     REDUCE_OUTPUT_RECORDS,
  68.     REDUCE_SKIPPED_GROUPS,
  69.     REDUCE_SKIPPED_RECORDS,
  70.     SPILLED_RECORDS
  71.   }
  72.   
  73.   /**
  74.    * Counters to measure the usage of the different file systems.
  75.    * Always return the String array with two elements. First one is the name of  
  76.    * BYTES_READ counter and second one is of the BYTES_WRITTEN counter.
  77.    */
  78.   protected static String[] getFileSystemCounterNames(String uriScheme) {
  79.     String scheme = uriScheme.toUpperCase();
  80.     return new String[]{scheme+"_BYTES_READ", scheme+"_BYTES_WRITTEN"};
  81.   }
  82.   
  83.   /**
  84.    * Name of the FileSystem counters' group
  85.    */
  86.   protected static final String FILESYSTEM_COUNTER_GROUP = "FileSystemCounters";
  87.   ///////////////////////////////////////////////////////////
  88.   // Helper methods to construct task-output paths
  89.   ///////////////////////////////////////////////////////////
  90.   
  91.   /** Construct output file names so that, when an output directory listing is
  92.    * sorted lexicographically, positions correspond to output partitions.*/
  93.   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  94.   static {
  95.     NUMBER_FORMAT.setMinimumIntegerDigits(5);
  96.     NUMBER_FORMAT.setGroupingUsed(false);
  97.   }
  98.   static synchronized String getOutputName(int partition) {
  99.     return "part-" + NUMBER_FORMAT.format(partition);
  100.   }
  101.   ////////////////////////////////////////////
  102.   // Fields
  103.   ////////////////////////////////////////////
  104.   private String jobFile;                         // job configuration file
  105.   private TaskAttemptID taskId;                   // unique, includes job id
  106.   private int partition;                          // id within job
  107.   TaskStatus taskStatus;                          // current status of the task
  108.   protected boolean jobCleanup = false;
  109.   protected boolean jobSetup = false;
  110.   protected boolean taskCleanup = false;
  111.   
  112.   //skip ranges based on failed ranges from previous attempts
  113.   private SortedRanges skipRanges = new SortedRanges();
  114.   private boolean skipping = false;
  115.   private boolean writeSkipRecs = true;
  116.   
  117.   //currently processing record start index
  118.   private volatile long currentRecStartIndex; 
  119.   private Iterator<Long> currentRecIndexIterator = 
  120.     skipRanges.skipRangeIterator();
  121.   
  122.   protected JobConf conf;
  123.   protected MapOutputFile mapOutputFile = new MapOutputFile();
  124.   protected LocalDirAllocator lDirAlloc;
  125.   private final static int MAX_RETRIES = 10;
  126.   protected JobContext jobContext;
  127.   protected TaskAttemptContext taskContext;
  128.   protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
  129.   protected org.apache.hadoop.mapreduce.OutputCommitter committer;
  130.   protected final Counters.Counter spilledRecordsCounter;
  131.   private String pidFile = "";
  132.   ////////////////////////////////////////////
  133.   // Constructors
  134.   ////////////////////////////////////////////
  135.   public Task() {
  136.     taskStatus = TaskStatus.createTaskStatus(isMapTask());
  137.     taskId = new TaskAttemptID();
  138.     spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
  139.   }
  140.   public Task(String jobFile, TaskAttemptID taskId, int partition) {
  141.     this.jobFile = jobFile;
  142.     this.taskId = taskId;
  143.      
  144.     this.partition = partition;
  145.     this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
  146.                                                   0.0f, 
  147.                                                   TaskStatus.State.UNASSIGNED, 
  148.                                                   "", "", "", 
  149.                                                   isMapTask() ? 
  150.                                                     TaskStatus.Phase.MAP : 
  151.                                                     TaskStatus.Phase.SHUFFLE, 
  152.                                                   counters);
  153.     this.mapOutputFile.setJobId(taskId.getJobID());
  154.     spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
  155.   }
  156.   ////////////////////////////////////////////
  157.   // Accessors
  158.   ////////////////////////////////////////////
  159.   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
  160.   public String getJobFile() { return jobFile; }
  161.   public TaskAttemptID getTaskID() { return taskId; }
  162.   Counters getCounters() { return counters; }
  163.   public void setPidFile(String pidFile) { 
  164.     this.pidFile = pidFile; 
  165.   }
  166.   public String getPidFile() { 
  167.     return pidFile; 
  168.   }
  169.   
  170.   /**
  171.    * Get the job name for this task.
  172.    * @return the job name
  173.    */
  174.   public JobID getJobID() {
  175.     return taskId.getJobID();
  176.   }
  177.   
  178.   /**
  179.    * Get the index of this task within the job.
  180.    * @return the integer part of the task id
  181.    */
  182.   public int getPartition() {
  183.     return partition;
  184.   }
  185.   /**
  186.    * Return current phase of the task. 
  187.    * needs to be synchronized as communication thread sends the phase every second
  188.    * @return
  189.    */
  190.   public synchronized TaskStatus.Phase getPhase(){
  191.     return this.taskStatus.getPhase(); 
  192.   }
  193.   /**
  194.    * Set current phase of the task. 
  195.    * @param p
  196.    */
  197.   protected synchronized void setPhase(TaskStatus.Phase phase){
  198.     this.taskStatus.setPhase(phase); 
  199.   }
  200.   
  201.   /**
  202.    * Get whether to write skip records.
  203.    */
  204.   protected boolean toWriteSkipRecs() {
  205.     return writeSkipRecs;
  206.   }
  207.       
  208.   /**
  209.    * Set whether to write skip records.
  210.    */
  211.   protected void setWriteSkipRecs(boolean writeSkipRecs) {
  212.     this.writeSkipRecs = writeSkipRecs;
  213.   }
  214.   
  215.   /**
  216.    * Get skipRanges.
  217.    */
  218.   public SortedRanges getSkipRanges() {
  219.     return skipRanges;
  220.   }
  221.   /**
  222.    * Set skipRanges.
  223.    */
  224.   public void setSkipRanges(SortedRanges skipRanges) {
  225.     this.skipRanges = skipRanges;
  226.   }
  227.   /**
  228.    * Is Task in skipping mode.
  229.    */
  230.   public boolean isSkipping() {
  231.     return skipping;
  232.   }
  233.   /**
  234.    * Sets whether to run Task in skipping mode.
  235.    * @param skipping
  236.    */
  237.   public void setSkipping(boolean skipping) {
  238.     this.skipping = skipping;
  239.   }
  240.   /**
  241.    * Return current state of the task. 
  242.    * needs to be synchronized as communication thread 
  243.    * sends the state every second
  244.    * @return
  245.    */
  246.   synchronized TaskStatus.State getState(){
  247.     return this.taskStatus.getRunState(); 
  248.   }
  249.   /**
  250.    * Set current state of the task. 
  251.    * @param state
  252.    */
  253.   synchronized void setState(TaskStatus.State state){
  254.     this.taskStatus.setRunState(state); 
  255.   }
  256.   void setTaskCleanupTask() {
  257.     taskCleanup = true;
  258.   }
  259.    
  260.   boolean isTaskCleanupTask() {
  261.     return taskCleanup;
  262.   }
  263.   boolean isJobCleanupTask() {
  264.     return jobCleanup;
  265.   }
  266.   boolean isJobSetupTask() {
  267.     return jobSetup;
  268.   }
  269.   void setJobSetupTask() {
  270.     jobSetup = true; 
  271.   }
  272.   void setJobCleanupTask() {
  273.     jobCleanup = true; 
  274.   }
  275.   boolean isMapOrReduce() {
  276.     return !jobSetup && !jobCleanup && !taskCleanup;
  277.   }
  278.   
  279.   ////////////////////////////////////////////
  280.   // Writable methods
  281.   ////////////////////////////////////////////
  282.   public void write(DataOutput out) throws IOException {
  283.     Text.writeString(out, jobFile);
  284.     taskId.write(out);
  285.     out.writeInt(partition);
  286.     taskStatus.write(out);
  287.     skipRanges.write(out);
  288.     out.writeBoolean(skipping);
  289.     out.writeBoolean(jobCleanup);
  290.     out.writeBoolean(jobSetup);
  291.     out.writeBoolean(writeSkipRecs);
  292.     out.writeBoolean(taskCleanup);  
  293.     Text.writeString(out, pidFile);
  294.   }
  295.   
  296.   public void readFields(DataInput in) throws IOException {
  297.     jobFile = Text.readString(in);
  298.     taskId = TaskAttemptID.read(in);
  299.     partition = in.readInt();
  300.     taskStatus.readFields(in);
  301.     this.mapOutputFile.setJobId(taskId.getJobID()); 
  302.     skipRanges.readFields(in);
  303.     currentRecIndexIterator = skipRanges.skipRangeIterator();
  304.     currentRecStartIndex = currentRecIndexIterator.next();
  305.     skipping = in.readBoolean();
  306.     jobCleanup = in.readBoolean();
  307.     jobSetup = in.readBoolean();
  308.     writeSkipRecs = in.readBoolean();
  309.     taskCleanup = in.readBoolean();
  310.     if (taskCleanup) {
  311.       setPhase(TaskStatus.Phase.CLEANUP);
  312.     }
  313.     pidFile = Text.readString(in);
  314.   }
  315.   @Override
  316.   public String toString() { return taskId.toString(); }
  317.   /**
  318.    * Localize the given JobConf to be specific for this task.
  319.    */
  320.   public void localizeConfiguration(JobConf conf) throws IOException {
  321.     conf.set("mapred.tip.id", taskId.getTaskID().toString()); 
  322.     conf.set("mapred.task.id", taskId.toString());
  323.     conf.setBoolean("mapred.task.is.map", isMapTask());
  324.     conf.setInt("mapred.task.partition", partition);
  325.     conf.set("mapred.job.id", taskId.getJobID().toString());
  326.   }
  327.   
  328.   /** Run this task as a part of the named job.  This method is executed in the
  329.    * child process and is what invokes user-supplied map, reduce, etc. methods.
  330.    * @param umbilical for progress reports
  331.    */
  332.   public abstract void run(JobConf job, TaskUmbilicalProtocol umbilical)
  333.     throws IOException, ClassNotFoundException, InterruptedException;
  334.   /** Return an approprate thread runner for this task. 
  335.    * @param tip TODO*/
  336.   public abstract TaskRunner createRunner(TaskTracker tracker, 
  337.       TaskTracker.TaskInProgress tip) throws IOException;
  338.   /** The number of milliseconds between progress reports. */
  339.   public static final int PROGRESS_INTERVAL = 3000;
  340.   private transient Progress taskProgress = new Progress();
  341.   // Current counters
  342.   private transient Counters counters = new Counters();
  343.   /* flag to track whether task is done */
  344.   private AtomicBoolean taskDone = new AtomicBoolean(false);
  345.   
  346.   public abstract boolean isMapTask();
  347.   public Progress getProgress() { return taskProgress; }
  348.   public void initialize(JobConf job, JobID id, 
  349.                          Reporter reporter,
  350.                          boolean useNewApi) throws IOException, 
  351.                                                    ClassNotFoundException,
  352.                                                    InterruptedException {
  353.     jobContext = new JobContext(job, id, reporter);
  354.     taskContext = new TaskAttemptContext(job, taskId, reporter);
  355.     if (getState() == TaskStatus.State.UNASSIGNED) {
  356.       setState(TaskStatus.State.RUNNING);
  357.     }
  358.     if (useNewApi) {
  359.       LOG.debug("using new api for output committer");
  360.       outputFormat =
  361.         ReflectionUtils.newInstance(taskContext.getOutputFormatClass(), job);
  362.       committer = outputFormat.getOutputCommitter(taskContext);
  363.     } else {
  364.       committer = conf.getOutputCommitter();
  365.     }
  366.     Path outputPath = FileOutputFormat.getOutputPath(conf);
  367.     if (outputPath != null) {
  368.       if ((committer instanceof FileOutputCommitter)) {
  369.         FileOutputFormat.setWorkOutputPath(conf, 
  370.           ((FileOutputCommitter)committer).getTempTaskOutputPath(taskContext));
  371.       } else {
  372.         FileOutputFormat.setWorkOutputPath(conf, outputPath);
  373.       }
  374.     }
  375.     committer.setupTask(taskContext);
  376.   }
  377.   
  378.   protected class TaskReporter 
  379.       extends org.apache.hadoop.mapreduce.StatusReporter
  380.       implements Runnable, Reporter {
  381.     private TaskUmbilicalProtocol umbilical;
  382.     private InputSplit split = null;
  383.     private Progress taskProgress;
  384.     private Thread pingThread = null;
  385.     /**
  386.      * flag that indicates whether progress update needs to be sent to parent.
  387.      * If true, it has been set. If false, it has been reset. 
  388.      * Using AtomicBoolean since we need an atomic read & reset method. 
  389.      */  
  390.     private AtomicBoolean progressFlag = new AtomicBoolean(false);
  391.     
  392.     TaskReporter(Progress taskProgress,
  393.                  TaskUmbilicalProtocol umbilical) {
  394.       this.umbilical = umbilical;
  395.       this.taskProgress = taskProgress;
  396.     }
  397.     // getters and setters for flag
  398.     void setProgressFlag() {
  399.       progressFlag.set(true);
  400.     }
  401.     boolean resetProgressFlag() {
  402.       return progressFlag.getAndSet(false);
  403.     }
  404.     public void setStatus(String status) {
  405.       taskProgress.setStatus(status);
  406.       // indicate that progress update needs to be sent
  407.       setProgressFlag();
  408.     }
  409.     public void setProgress(float progress) {
  410.       taskProgress.set(progress);
  411.       // indicate that progress update needs to be sent
  412.       setProgressFlag();
  413.     }
  414.     public void progress() {
  415.       // indicate that progress update needs to be sent
  416.       setProgressFlag();
  417.     }
  418.     public Counters.Counter getCounter(String group, String name) {
  419.       Counters.Counter counter = null;
  420.       if (counters != null) {
  421.         counter = counters.findCounter(group, name);
  422.       }
  423.       return counter;
  424.     }
  425.     public Counters.Counter getCounter(Enum<?> name) {
  426.       return counters == null ? null : counters.findCounter(name);
  427.     }
  428.     public void incrCounter(Enum key, long amount) {
  429.       if (counters != null) {
  430.         counters.incrCounter(key, amount);
  431.       }
  432.       setProgressFlag();
  433.     }
  434.     public void incrCounter(String group, String counter, long amount) {
  435.       if (counters != null) {
  436.         counters.incrCounter(group, counter, amount);
  437.       }
  438.       if(skipping && SkipBadRecords.COUNTER_GROUP.equals(group) && (
  439.           SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS.equals(counter) ||
  440.           SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS.equals(counter))) {
  441.         //if application reports the processed records, move the 
  442.         //currentRecStartIndex to the next.
  443.         //currentRecStartIndex is the start index which has not yet been 
  444.         //finished and is still in task's stomach.
  445.         for(int i=0;i<amount;i++) {
  446.           currentRecStartIndex = currentRecIndexIterator.next();
  447.         }
  448.       }
  449.       setProgressFlag();
  450.     }
  451.     public void setInputSplit(InputSplit split) {
  452.       this.split = split;
  453.     }
  454.     public InputSplit getInputSplit() throws UnsupportedOperationException {
  455.       if (split == null) {
  456.         throw new UnsupportedOperationException("Input only available on map");
  457.       } else {
  458.         return split;
  459.       }
  460.     }    
  461.     /** 
  462.      * The communication thread handles communication with the parent (Task Tracker). 
  463.      * It sends progress updates if progress has been made or if the task needs to 
  464.      * let the parent know that it's alive. It also pings the parent to see if it's alive. 
  465.      */
  466.     public void run() {
  467.       final int MAX_RETRIES = 3;
  468.       int remainingRetries = MAX_RETRIES;
  469.       // get current flag value and reset it as well
  470.       boolean sendProgress = resetProgressFlag();
  471.       while (!taskDone.get()) {
  472.         try {
  473.           boolean taskFound = true; // whether TT knows about this task
  474.           // sleep for a bit
  475.           try {
  476.             Thread.sleep(PROGRESS_INTERVAL);
  477.           } 
  478.           catch (InterruptedException e) {
  479.             LOG.debug(getTaskID() + " Progress/ping thread exiting " +
  480.             "since it got interrupted");
  481.             break;
  482.           }
  483.           if (sendProgress) {
  484.             // we need to send progress update
  485.             updateCounters();
  486.             taskStatus.statusUpdate(taskProgress.get(),
  487.                                     taskProgress.toString(), 
  488.                                     counters);
  489.             taskFound = umbilical.statusUpdate(taskId, taskStatus);
  490.             taskStatus.clearStatus();
  491.           }
  492.           else {
  493.             // send ping 
  494.             taskFound = umbilical.ping(taskId);
  495.           }
  496.           // if Task Tracker is not aware of our task ID (probably because it died and 
  497.           // came back up), kill ourselves
  498.           if (!taskFound) {
  499.             LOG.warn("Parent died.  Exiting "+taskId);
  500.             System.exit(66);
  501.           }
  502.           sendProgress = resetProgressFlag(); 
  503.           remainingRetries = MAX_RETRIES;
  504.         } 
  505.         catch (Throwable t) {
  506.           LOG.info("Communication exception: " + StringUtils.stringifyException(t));
  507.           remainingRetries -=1;
  508.           if (remainingRetries == 0) {
  509.             ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0);
  510.             LOG.warn("Last retry, killing "+taskId);
  511.             System.exit(65);
  512.           }
  513.         }
  514.       }
  515.     }
  516.     public void startCommunicationThread() {
  517.       if (pingThread == null) {
  518.         pingThread = new Thread(this, "communication thread");
  519.         pingThread.setDaemon(true);
  520.         pingThread.start();
  521.       }
  522.     }
  523.     public void stopCommunicationThread() throws InterruptedException {
  524.       if (pingThread != null) {
  525.         pingThread.interrupt();
  526.         pingThread.join();
  527.       }
  528.     }
  529.   }
  530.   
  531.   /**
  532.    *  Reports the next executing record range to TaskTracker.
  533.    *  
  534.    * @param umbilical
  535.    * @param nextRecIndex the record index which would be fed next.
  536.    * @throws IOException
  537.    */
  538.   protected void reportNextRecordRange(final TaskUmbilicalProtocol umbilical, 
  539.       long nextRecIndex) throws IOException{
  540.     //currentRecStartIndex is the start index which has not yet been finished 
  541.     //and is still in task's stomach.
  542.     long len = nextRecIndex - currentRecStartIndex +1;
  543.     SortedRanges.Range range = 
  544.       new SortedRanges.Range(currentRecStartIndex, len);
  545.     taskStatus.setNextRecordRange(range);
  546.     LOG.debug("sending reportNextRecordRange " + range);
  547.     umbilical.reportNextRecordRange(taskId, range);
  548.   }
  549.   /**
  550.    * An updater that tracks the last number reported for a given file
  551.    * system and only creates the counters when they are needed.
  552.    */
  553.   class FileSystemStatisticUpdater {
  554.     private long prevReadBytes = 0;
  555.     private long prevWriteBytes = 0;
  556.     private FileSystem.Statistics stats;
  557.     private Counters.Counter readCounter = null;
  558.     private Counters.Counter writeCounter = null;
  559.     private String[] counterNames;
  560.     
  561.     FileSystemStatisticUpdater(String uriScheme, FileSystem.Statistics stats) {
  562.       this.stats = stats;
  563.       this.counterNames = getFileSystemCounterNames(uriScheme);
  564.     }
  565.     void updateCounters() {
  566.       long newReadBytes = stats.getBytesRead();
  567.       long newWriteBytes = stats.getBytesWritten();
  568.       if (prevReadBytes != newReadBytes) {
  569.         if (readCounter == null) {
  570.           readCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
  571.               counterNames[0]);
  572.         }
  573.         readCounter.increment(newReadBytes - prevReadBytes);
  574.         prevReadBytes = newReadBytes;
  575.       }
  576.       if (prevWriteBytes != newWriteBytes) {
  577.         if (writeCounter == null) {
  578.           writeCounter = counters.findCounter(FILESYSTEM_COUNTER_GROUP, 
  579.               counterNames[1]);
  580.         }
  581.         writeCounter.increment(newWriteBytes - prevWriteBytes);
  582.         prevWriteBytes = newWriteBytes;
  583.       }
  584.     }
  585.   }
  586.   
  587.   /**
  588.    * A Map where Key-> URIScheme and value->FileSystemStatisticUpdater
  589.    */
  590.   private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
  591.      new HashMap<String, FileSystemStatisticUpdater>();
  592.   
  593.   private synchronized void updateCounters() {
  594.     for(Statistics stat: FileSystem.getAllStatistics()) {
  595.       String uriScheme = stat.getScheme();
  596.       FileSystemStatisticUpdater updater = statisticUpdaters.get(uriScheme);
  597.       if(updater==null) {//new FileSystem has been found in the cache
  598.         updater = new FileSystemStatisticUpdater(uriScheme, stat);
  599.         statisticUpdaters.put(uriScheme, updater);
  600.       }
  601.       updater.updateCounters();      
  602.     }
  603.   }
  604.   public void done(TaskUmbilicalProtocol umbilical,
  605.                    TaskReporter reporter
  606.                    ) throws IOException, InterruptedException {
  607.     LOG.info("Task:" + taskId + " is done."
  608.              + " And is in the process of commiting");
  609.     updateCounters();
  610.     // check whether the commit is required.
  611.     boolean commitRequired = committer.needsTaskCommit(taskContext);
  612.     if (commitRequired) {
  613.       int retries = MAX_RETRIES;
  614.       setState(TaskStatus.State.COMMIT_PENDING);
  615.       // say the task tracker that task is commit pending
  616.       while (true) {
  617.         try {
  618.           umbilical.commitPending(taskId, taskStatus);
  619.           break;
  620.         } catch (InterruptedException ie) {
  621.           // ignore
  622.         } catch (IOException ie) {
  623.           LOG.warn("Failure sending commit pending: " + 
  624.                     StringUtils.stringifyException(ie));
  625.           if (--retries == 0) {
  626.             System.exit(67);
  627.           }
  628.         }
  629.       }
  630.       //wait for commit approval and commit
  631.       commit(umbilical, reporter, committer);
  632.     }
  633.     taskDone.set(true);
  634.     reporter.stopCommunicationThread();
  635.     sendLastUpdate(umbilical);
  636.     //signal the tasktracker that we are done
  637.     sendDone(umbilical);
  638.   }
  639.   protected void statusUpdate(TaskUmbilicalProtocol umbilical) 
  640.   throws IOException {
  641.     int retries = MAX_RETRIES;
  642.     while (true) {
  643.       try {
  644.         if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
  645.           LOG.warn("Parent died.  Exiting "+taskId);
  646.           System.exit(66);
  647.         }
  648.         taskStatus.clearStatus();
  649.         return;
  650.       } catch (InterruptedException ie) {
  651.         Thread.currentThread().interrupt(); // interrupt ourself
  652.       } catch (IOException ie) {
  653.         LOG.warn("Failure sending status update: " + 
  654.                   StringUtils.stringifyException(ie));
  655.         if (--retries == 0) {
  656.           throw ie;
  657.         }
  658.       }
  659.     }
  660.   }
  661.   
  662.   private void sendLastUpdate(TaskUmbilicalProtocol umbilical) 
  663.   throws IOException {
  664.     // send a final status report
  665.     taskStatus.statusUpdate(taskProgress.get(),
  666.                             taskProgress.toString(), 
  667.                             counters);
  668.     statusUpdate(umbilical);
  669.   }
  670.   private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
  671.     int retries = MAX_RETRIES;
  672.     while (true) {
  673.       try {
  674.         umbilical.done(getTaskID());
  675.         LOG.info("Task '" + taskId + "' done.");
  676.         return;
  677.       } catch (IOException ie) {
  678.         LOG.warn("Failure signalling completion: " + 
  679.                  StringUtils.stringifyException(ie));
  680.         if (--retries == 0) {
  681.           throw ie;
  682.         }
  683.       }
  684.     }
  685.   }
  686.   private void commit(TaskUmbilicalProtocol umbilical,
  687.                       TaskReporter reporter,
  688.                       org.apache.hadoop.mapreduce.OutputCommitter committer
  689.                       ) throws IOException {
  690.     int retries = MAX_RETRIES;
  691.     while (true) {
  692.       try {
  693.         while (!umbilical.canCommit(taskId)) {
  694.           try {
  695.             Thread.sleep(1000);
  696.           } catch(InterruptedException ie) {
  697.             //ignore
  698.           }
  699.           reporter.setProgressFlag();
  700.         }
  701.         // task can Commit now  
  702.         try {
  703.           LOG.info("Task " + taskId + " is allowed to commit now");
  704.           committer.commitTask(taskContext);
  705.           return;
  706.         } catch (IOException iee) {
  707.           LOG.warn("Failure committing: " + 
  708.                     StringUtils.stringifyException(iee));
  709.           discardOutput(taskContext);
  710.           throw iee;
  711.         }
  712.       } catch (IOException ie) {
  713.         LOG.warn("Failure asking whether task can commit: " + 
  714.             StringUtils.stringifyException(ie));
  715.         if (--retries == 0) {
  716.           //if it couldn't commit a successfully then delete the output
  717.           discardOutput(taskContext);
  718.           System.exit(68);
  719.         }
  720.       }
  721.     }
  722.   }
  723.   private 
  724.   void discardOutput(TaskAttemptContext taskContext) {
  725.     try {
  726.       committer.abortTask(taskContext);
  727.     } catch (IOException ioe)  {
  728.       LOG.warn("Failure cleaning up: " + 
  729.                StringUtils.stringifyException(ioe));
  730.     }
  731.   }
  732.   protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
  733.                                 TaskReporter reporter) 
  734.   throws IOException, InterruptedException {
  735.     taskCleanup(umbilical);
  736.     done(umbilical, reporter);
  737.   }
  738.   void taskCleanup(TaskUmbilicalProtocol umbilical) 
  739.   throws IOException {
  740.     // set phase for this task
  741.     setPhase(TaskStatus.Phase.CLEANUP);
  742.     getProgress().setStatus("cleanup");
  743.     statusUpdate(umbilical);
  744.     LOG.info("Runnning cleanup for the task");
  745.     // do the cleanup
  746.     discardOutput(taskContext);
  747.   }
  748.   protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
  749.                                TaskReporter reporter
  750.                               ) throws IOException, InterruptedException {
  751.     // set phase for this task
  752.     setPhase(TaskStatus.Phase.CLEANUP);
  753.     getProgress().setStatus("cleanup");
  754.     statusUpdate(umbilical);
  755.     // do the cleanup
  756.     committer.cleanupJob(jobContext);
  757.     done(umbilical, reporter);
  758.   }
  759.   protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
  760.                              TaskReporter reporter
  761.                              ) throws IOException, InterruptedException {
  762.     // do the setup
  763.     getProgress().setStatus("setup");
  764.     committer.setupJob(jobContext);
  765.     done(umbilical, reporter);
  766.   }
  767.   
  768.   public void setConf(Configuration conf) {
  769.     if (conf instanceof JobConf) {
  770.       this.conf = (JobConf) conf;
  771.     } else {
  772.       this.conf = new JobConf(conf);
  773.     }
  774.     this.mapOutputFile.setConf(this.conf);
  775.     this.lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  776.     // add the static resolutions (this is required for the junit to
  777.     // work on testcases that simulate multiple nodes on a single physical
  778.     // node.
  779.     String hostToResolved[] = conf.getStrings("hadoop.net.static.resolutions");
  780.     if (hostToResolved != null) {
  781.       for (String str : hostToResolved) {
  782.         String name = str.substring(0, str.indexOf('='));
  783.         String resolvedName = str.substring(str.indexOf('=') + 1);
  784.         NetUtils.addStaticResolution(name, resolvedName);
  785.       }
  786.     }
  787.   }
  788.   public Configuration getConf() {
  789.     return this.conf;
  790.   }
  791.   /**
  792.    * OutputCollector for the combiner.
  793.    */
  794.   protected static class CombineOutputCollector<K extends Object, V extends Object> 
  795.   implements OutputCollector<K, V> {
  796.     private Writer<K, V> writer;
  797.     private Counters.Counter outCounter;
  798.     public CombineOutputCollector(Counters.Counter outCounter) {
  799.       this.outCounter = outCounter;
  800.     }
  801.     public synchronized void setWriter(Writer<K, V> writer) {
  802.       this.writer = writer;
  803.     }
  804.     public synchronized void collect(K key, V value)
  805.         throws IOException {
  806.       outCounter.increment(1);
  807.       writer.append(key, value);
  808.     }
  809.   }
  810.   /** Iterates values while keys match in sorted input. */
  811.   static class ValuesIterator<KEY,VALUE> implements Iterator<VALUE> {
  812.     protected RawKeyValueIterator in; //input iterator
  813.     private KEY key;               // current key
  814.     private KEY nextKey;
  815.     private VALUE value;             // current value
  816.     private boolean hasNext;                      // more w/ this key
  817.     private boolean more;                         // more in file
  818.     private RawComparator<KEY> comparator;
  819.     protected Progressable reporter;
  820.     private Deserializer<KEY> keyDeserializer;
  821.     private Deserializer<VALUE> valDeserializer;
  822.     private DataInputBuffer keyIn = new DataInputBuffer();
  823.     private DataInputBuffer valueIn = new DataInputBuffer();
  824.     
  825.     public ValuesIterator (RawKeyValueIterator in, 
  826.                            RawComparator<KEY> comparator, 
  827.                            Class<KEY> keyClass,
  828.                            Class<VALUE> valClass, Configuration conf, 
  829.                            Progressable reporter)
  830.       throws IOException {
  831.       this.in = in;
  832.       this.comparator = comparator;
  833.       this.reporter = reporter;
  834.       SerializationFactory serializationFactory = new SerializationFactory(conf);
  835.       this.keyDeserializer = serializationFactory.getDeserializer(keyClass);
  836.       this.keyDeserializer.open(keyIn);
  837.       this.valDeserializer = serializationFactory.getDeserializer(valClass);
  838.       this.valDeserializer.open(this.valueIn);
  839.       readNextKey();
  840.       key = nextKey;
  841.       nextKey = null; // force new instance creation
  842.       hasNext = more;
  843.     }
  844.     RawKeyValueIterator getRawIterator() { return in; }
  845.     
  846.     /// Iterator methods
  847.     public boolean hasNext() { return hasNext; }
  848.     private int ctr = 0;
  849.     public VALUE next() {
  850.       if (!hasNext) {
  851.         throw new NoSuchElementException("iterate past last value");
  852.       }
  853.       try {
  854.         readNextValue();
  855.         readNextKey();
  856.       } catch (IOException ie) {
  857.         throw new RuntimeException("problem advancing post rec#"+ctr, ie);
  858.       }
  859.       reporter.progress();
  860.       return value;
  861.     }
  862.     public void remove() { throw new RuntimeException("not implemented"); }
  863.     /// Auxiliary methods
  864.     /** Start processing next unique key. */
  865.     void nextKey() throws IOException {
  866.       // read until we find a new key
  867.       while (hasNext) { 
  868.         readNextKey();
  869.       }
  870.       ++ctr;
  871.       
  872.       // move the next key to the current one
  873.       KEY tmpKey = key;
  874.       key = nextKey;
  875.       nextKey = tmpKey;
  876.       hasNext = more;
  877.     }
  878.     /** True iff more keys remain. */
  879.     boolean more() { 
  880.       return more; 
  881.     }
  882.     /** The current key. */
  883.     KEY getKey() { 
  884.       return key; 
  885.     }
  886.     /** 
  887.      * read the next key 
  888.      */
  889.     private void readNextKey() throws IOException {
  890.       more = in.next();
  891.       if (more) {
  892.         DataInputBuffer nextKeyBytes = in.getKey();
  893.         keyIn.reset(nextKeyBytes.getData(), nextKeyBytes.getPosition(), nextKeyBytes.getLength());
  894.         nextKey = keyDeserializer.deserialize(nextKey);
  895.         hasNext = key != null && (comparator.compare(key, nextKey) == 0);
  896.       } else {
  897.         hasNext = false;
  898.       }
  899.     }
  900.     /**
  901.      * Read the next value
  902.      * @throws IOException
  903.      */
  904.     private void readNextValue() throws IOException {
  905.       DataInputBuffer nextValueBytes = in.getValue();
  906.       valueIn.reset(nextValueBytes.getData(), nextValueBytes.getPosition(), nextValueBytes.getLength());
  907.       value = valDeserializer.deserialize(value);
  908.     }
  909.   }
  910.   protected static class CombineValuesIterator<KEY,VALUE>
  911.       extends ValuesIterator<KEY,VALUE> {
  912.     private final Counters.Counter combineInputCounter;
  913.     public CombineValuesIterator(RawKeyValueIterator in,
  914.         RawComparator<KEY> comparator, Class<KEY> keyClass,
  915.         Class<VALUE> valClass, Configuration conf, Reporter reporter,
  916.         Counters.Counter combineInputCounter) throws IOException {
  917.       super(in, comparator, keyClass, valClass, conf, reporter);
  918.       this.combineInputCounter = combineInputCounter;
  919.     }
  920.     public VALUE next() {
  921.       combineInputCounter.increment(1);
  922.       return super.next();
  923.     }
  924.   }
  925.   private static final Constructor<org.apache.hadoop.mapreduce.Reducer.Context> 
  926.   contextConstructor;
  927.   static {
  928.     try {
  929.       contextConstructor = 
  930.         org.apache.hadoop.mapreduce.Reducer.Context.class.getConstructor
  931.         (new Class[]{org.apache.hadoop.mapreduce.Reducer.class,
  932.             Configuration.class,
  933.             org.apache.hadoop.mapreduce.TaskAttemptID.class,
  934.             RawKeyValueIterator.class,
  935.             org.apache.hadoop.mapreduce.Counter.class,
  936.             org.apache.hadoop.mapreduce.RecordWriter.class,
  937.             org.apache.hadoop.mapreduce.OutputCommitter.class,
  938.             org.apache.hadoop.mapreduce.StatusReporter.class,
  939.             RawComparator.class,
  940.             Class.class,
  941.             Class.class});
  942.     } catch (NoSuchMethodException nme) {
  943.       throw new IllegalArgumentException("Can't find constructor");
  944.     }
  945.   }
  946.   @SuppressWarnings("unchecked")
  947.   protected static <INKEY,INVALUE,OUTKEY,OUTVALUE> 
  948.   org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
  949.   createReduceContext(org.apache.hadoop.mapreduce.Reducer
  950.                         <INKEY,INVALUE,OUTKEY,OUTVALUE> reducer,
  951.                       Configuration job,
  952.                       org.apache.hadoop.mapreduce.TaskAttemptID taskId, 
  953.                       RawKeyValueIterator rIter,
  954.                       org.apache.hadoop.mapreduce.Counter inputCounter,
  955.                       org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output, 
  956.                       org.apache.hadoop.mapreduce.OutputCommitter committer,
  957.                       org.apache.hadoop.mapreduce.StatusReporter reporter,
  958.                       RawComparator<INKEY> comparator,
  959.                       Class<INKEY> keyClass, Class<INVALUE> valueClass
  960.   ) throws IOException, ClassNotFoundException {
  961.     try {
  962.       return contextConstructor.newInstance(reducer, job, taskId,
  963.                                             rIter, inputCounter, output, 
  964.                                             committer, reporter, comparator, 
  965.                                             keyClass, valueClass);
  966.     } catch (InstantiationException e) {
  967.       throw new IOException("Can't create Context", e);
  968.     } catch (InvocationTargetException e) {
  969.       throw new IOException("Can't invoke Context constructor", e);
  970.     } catch (IllegalAccessException e) {
  971.       throw new IOException("Can't invoke Context constructor", e);
  972.     }
  973.   }
  974.   protected static abstract class CombinerRunner<K,V> {
  975.     protected final Counters.Counter inputCounter;
  976.     protected final JobConf job;
  977.     protected final TaskReporter reporter;
  978.     CombinerRunner(Counters.Counter inputCounter,
  979.                    JobConf job,
  980.                    TaskReporter reporter) {
  981.       this.inputCounter = inputCounter;
  982.       this.job = job;
  983.       this.reporter = reporter;
  984.     }
  985.     
  986.     /**
  987.      * Run the combiner over a set of inputs.
  988.      * @param iterator the key/value pairs to use as input
  989.      * @param collector the output collector
  990.      */
  991.     abstract void combine(RawKeyValueIterator iterator, 
  992.                           OutputCollector<K,V> collector
  993.                          ) throws IOException, InterruptedException, 
  994.                                   ClassNotFoundException;
  995.     static <K,V> 
  996.     CombinerRunner<K,V> create(JobConf job,
  997.                                TaskAttemptID taskId,
  998.                                Counters.Counter inputCounter,
  999.                                TaskReporter reporter,
  1000.                                org.apache.hadoop.mapreduce.OutputCommitter committer
  1001.                               ) throws ClassNotFoundException {
  1002.       Class<? extends Reducer<K,V,K,V>> cls = 
  1003.         (Class<? extends Reducer<K,V,K,V>>) job.getCombinerClass();
  1004.       if (cls != null) {
  1005.         return new OldCombinerRunner(cls, job, inputCounter, reporter);
  1006.       }
  1007.       // make a task context so we can get the classes
  1008.       org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
  1009.         new org.apache.hadoop.mapreduce.TaskAttemptContext(job, taskId);
  1010.       Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> newcls = 
  1011.         (Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>>)
  1012.            taskContext.getCombinerClass();
  1013.       if (newcls != null) {
  1014.         return new NewCombinerRunner<K,V>(newcls, job, taskId, taskContext, 
  1015.                                           inputCounter, reporter, committer);
  1016.       }
  1017.       
  1018.       return null;
  1019.     }
  1020.   }
  1021.   
  1022.   protected static class OldCombinerRunner<K,V> extends CombinerRunner<K,V> {
  1023.     private final Class<? extends Reducer<K,V,K,V>> combinerClass;
  1024.     private final Class<K> keyClass;
  1025.     private final Class<V> valueClass;
  1026.     private final RawComparator<K> comparator;
  1027.     protected OldCombinerRunner(Class<? extends Reducer<K,V,K,V>> cls,
  1028.                                 JobConf conf,
  1029.                                 Counters.Counter inputCounter,
  1030.                                 TaskReporter reporter) {
  1031.       super(inputCounter, conf, reporter);
  1032.       combinerClass = cls;
  1033.       keyClass = (Class<K>) job.getMapOutputKeyClass();
  1034.       valueClass = (Class<V>) job.getMapOutputValueClass();
  1035.       comparator = (RawComparator<K>) job.getOutputKeyComparator();
  1036.     }
  1037.     @SuppressWarnings("unchecked")
  1038.     protected void combine(RawKeyValueIterator kvIter,
  1039.                            OutputCollector<K,V> combineCollector
  1040.                            ) throws IOException {
  1041.       Reducer<K,V,K,V> combiner = 
  1042.         ReflectionUtils.newInstance(combinerClass, job);
  1043.       try {
  1044.         CombineValuesIterator<K,V> values = 
  1045.           new CombineValuesIterator<K,V>(kvIter, comparator, keyClass, 
  1046.                                          valueClass, job, Reporter.NULL,
  1047.                                          inputCounter);
  1048.         while (values.more()) {
  1049.           combiner.reduce(values.getKey(), values, combineCollector,
  1050.               Reporter.NULL);
  1051.           values.nextKey();
  1052.         }
  1053.       } finally {
  1054.         combiner.close();
  1055.       }
  1056.     }
  1057.   }
  1058.   
  1059.   protected static class NewCombinerRunner<K, V> extends CombinerRunner<K,V> {
  1060.     private final Class<? extends org.apache.hadoop.mapreduce.Reducer<K,V,K,V>> 
  1061.         reducerClass;
  1062.     private final org.apache.hadoop.mapreduce.TaskAttemptID taskId;
  1063.     private final RawComparator<K> comparator;
  1064.     private final Class<K> keyClass;
  1065.     private final Class<V> valueClass;
  1066.     private final org.apache.hadoop.mapreduce.OutputCommitter committer;
  1067.     NewCombinerRunner(Class reducerClass,
  1068.                       JobConf job,
  1069.                       org.apache.hadoop.mapreduce.TaskAttemptID taskId,
  1070.                       org.apache.hadoop.mapreduce.TaskAttemptContext context,
  1071.                       Counters.Counter inputCounter,
  1072.                       TaskReporter reporter,
  1073.                       org.apache.hadoop.mapreduce.OutputCommitter committer) {
  1074.       super(inputCounter, job, reporter);
  1075.       this.reducerClass = reducerClass;
  1076.       this.taskId = taskId;
  1077.       keyClass = (Class<K>) context.getMapOutputKeyClass();
  1078.       valueClass = (Class<V>) context.getMapOutputValueClass();
  1079.       comparator = (RawComparator<K>) context.getSortComparator();
  1080.       this.committer = committer;
  1081.     }
  1082.     private static class OutputConverter<K,V>
  1083.             extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  1084.       OutputCollector<K,V> output;
  1085.       OutputConverter(OutputCollector<K,V> output) {
  1086.         this.output = output;
  1087.       }
  1088.       @Override
  1089.       public void close(org.apache.hadoop.mapreduce.TaskAttemptContext context){
  1090.       }
  1091.       @Override
  1092.       public void write(K key, V value
  1093.                         ) throws IOException, InterruptedException {
  1094.         output.collect(key,value);
  1095.       }
  1096.     }
  1097.     @Override
  1098.     void combine(RawKeyValueIterator iterator, 
  1099.                  OutputCollector<K,V> collector
  1100.                  ) throws IOException, InterruptedException,
  1101.                           ClassNotFoundException {
  1102.       // make a reducer
  1103.       org.apache.hadoop.mapreduce.Reducer<K,V,K,V> reducer =
  1104.         (org.apache.hadoop.mapreduce.Reducer<K,V,K,V>)
  1105.           ReflectionUtils.newInstance(reducerClass, job);
  1106.       org.apache.hadoop.mapreduce.Reducer.Context 
  1107.            reducerContext = createReduceContext(reducer, job, taskId,
  1108.                                                 iterator, inputCounter, 
  1109.                                                 new OutputConverter(collector),
  1110.                                                 committer,
  1111.                                                 reporter, comparator, keyClass,
  1112.                                                 valueClass);
  1113.       reducer.run(reducerContext);
  1114.     }
  1115.     
  1116.   }
  1117. }