ReduceTask.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:97k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.File;
  22. import java.io.IOException;
  23. import java.io.InputStream;
  24. import java.io.OutputStream;
  25. import java.lang.Math;
  26. import java.lang.reflect.Constructor;
  27. import java.lang.reflect.InvocationTargetException;
  28. import java.net.URI;
  29. import java.net.URL;
  30. import java.net.URLClassLoader;
  31. import java.net.URLConnection;
  32. import java.text.DecimalFormat;
  33. import java.util.ArrayList;
  34. import java.util.Collections;
  35. import java.util.Comparator;
  36. import java.util.HashMap;
  37. import java.util.HashSet;
  38. import java.util.LinkedHashMap;
  39. import java.util.Iterator;
  40. import java.util.LinkedList;
  41. import java.util.List;
  42. import java.util.Map;
  43. import java.util.Random;
  44. import java.util.Set;
  45. import java.util.SortedSet;
  46. import java.util.TreeSet;
  47. import java.util.concurrent.ConcurrentHashMap;
  48. import org.apache.commons.logging.Log;
  49. import org.apache.commons.logging.LogFactory;
  50. import org.apache.hadoop.conf.Configuration;
  51. import org.apache.hadoop.fs.ChecksumFileSystem;
  52. import org.apache.hadoop.fs.FSError;
  53. import org.apache.hadoop.fs.FileStatus;
  54. import org.apache.hadoop.fs.FileSystem;
  55. import org.apache.hadoop.fs.LocalFileSystem;
  56. import org.apache.hadoop.fs.Path;
  57. import org.apache.hadoop.io.DataInputBuffer;
  58. import org.apache.hadoop.io.IOUtils;
  59. import org.apache.hadoop.io.IntWritable;
  60. import org.apache.hadoop.io.RawComparator;
  61. import org.apache.hadoop.io.SequenceFile;
  62. import org.apache.hadoop.io.Writable;
  63. import org.apache.hadoop.io.WritableFactories;
  64. import org.apache.hadoop.io.WritableFactory;
  65. import org.apache.hadoop.io.WritableUtils;
  66. import org.apache.hadoop.io.SequenceFile.CompressionType;
  67. import org.apache.hadoop.io.compress.CodecPool;
  68. import org.apache.hadoop.io.compress.CompressionCodec;
  69. import org.apache.hadoop.io.compress.Decompressor;
  70. import org.apache.hadoop.io.compress.DefaultCodec;
  71. import org.apache.hadoop.mapred.IFile.*;
  72. import org.apache.hadoop.mapred.Merger.Segment;
  73. import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
  74. import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
  75. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  76. import org.apache.hadoop.metrics.MetricsContext;
  77. import org.apache.hadoop.metrics.MetricsRecord;
  78. import org.apache.hadoop.metrics.MetricsUtil;
  79. import org.apache.hadoop.metrics.Updater;
  80. import org.apache.hadoop.util.Progress;
  81. import org.apache.hadoop.util.Progressable;
  82. import org.apache.hadoop.util.ReflectionUtils;
  83. import org.apache.hadoop.util.StringUtils;
  84. /** A Reduce task. */
  85. class ReduceTask extends Task {
  86.   static {                                        // register a ctor
  87.     WritableFactories.setFactory
  88.       (ReduceTask.class,
  89.        new WritableFactory() {
  90.          public Writable newInstance() { return new ReduceTask(); }
  91.        });
  92.   }
  93.   
  94.   private static final Log LOG = LogFactory.getLog(ReduceTask.class.getName());
  95.   private int numMaps;
  96.   private ReduceCopier reduceCopier;
  97.   private CompressionCodec codec;
  98.   { 
  99.     getProgress().setStatus("reduce"); 
  100.     setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
  101.   }
  102.   private Progress copyPhase;
  103.   private Progress sortPhase;
  104.   private Progress reducePhase;
  105.   private Counters.Counter reduceShuffleBytes = 
  106.     getCounters().findCounter(Counter.REDUCE_SHUFFLE_BYTES);
  107.   private Counters.Counter reduceInputKeyCounter = 
  108.     getCounters().findCounter(Counter.REDUCE_INPUT_GROUPS);
  109.   private Counters.Counter reduceInputValueCounter = 
  110.     getCounters().findCounter(Counter.REDUCE_INPUT_RECORDS);
  111.   private Counters.Counter reduceOutputCounter = 
  112.     getCounters().findCounter(Counter.REDUCE_OUTPUT_RECORDS);
  113.   private Counters.Counter reduceCombineOutputCounter =
  114.     getCounters().findCounter(Counter.COMBINE_OUTPUT_RECORDS);
  115.   // A custom comparator for map output files. Here the ordering is determined
  116.   // by the file's size and path. In case of files with same size and different
  117.   // file paths, the first parameter is considered smaller than the second one.
  118.   // In case of files with same size and path are considered equal.
  119.   private Comparator<FileStatus> mapOutputFileComparator = 
  120.     new Comparator<FileStatus>() {
  121.       public int compare(FileStatus a, FileStatus b) {
  122.         if (a.getLen() < b.getLen())
  123.           return -1;
  124.         else if (a.getLen() == b.getLen())
  125.           if (a.getPath().toString().equals(b.getPath().toString()))
  126.             return 0;
  127.           else
  128.             return -1; 
  129.         else
  130.           return 1;
  131.       }
  132.   };
  133.   
  134.   // A sorted set for keeping a set of map output files on disk
  135.   private final SortedSet<FileStatus> mapOutputFilesOnDisk = 
  136.     new TreeSet<FileStatus>(mapOutputFileComparator);
  137.   public ReduceTask() {
  138.     super();
  139.   }
  140.   public ReduceTask(String jobFile, TaskAttemptID taskId,
  141.                     int partition, int numMaps) {
  142.     super(jobFile, taskId, partition);
  143.     this.numMaps = numMaps;
  144.   }
  145.   
  146.   private CompressionCodec initCodec() {
  147.     // check if map-outputs are to be compressed
  148.     if (conf.getCompressMapOutput()) {
  149.       Class<? extends CompressionCodec> codecClass =
  150.         conf.getMapOutputCompressorClass(DefaultCodec.class);
  151.       return ReflectionUtils.newInstance(codecClass, conf);
  152.     } 
  153.     return null;
  154.   }
  155.   @Override
  156.   public TaskRunner createRunner(TaskTracker tracker, TaskInProgress tip) 
  157.   throws IOException {
  158.     return new ReduceTaskRunner(tip, tracker, this.conf);
  159.   }
  160.   @Override
  161.   public boolean isMapTask() {
  162.     return false;
  163.   }
  164.   public int getNumMaps() { return numMaps; }
  165.   
  166.   /**
  167.    * Localize the given JobConf to be specific for this task.
  168.    */
  169.   @Override
  170.   public void localizeConfiguration(JobConf conf) throws IOException {
  171.     super.localizeConfiguration(conf);
  172.     conf.setNumMapTasks(numMaps);
  173.   }
  174.   @Override
  175.   public void write(DataOutput out) throws IOException {
  176.     super.write(out);
  177.     out.writeInt(numMaps);                        // write the number of maps
  178.   }
  179.   @Override
  180.   public void readFields(DataInput in) throws IOException {
  181.     super.readFields(in);
  182.     numMaps = in.readInt();
  183.   }
  184.   
  185.   // Get the input files for the reducer.
  186.   private Path[] getMapFiles(FileSystem fs, boolean isLocal) 
  187.   throws IOException {
  188.     List<Path> fileList = new ArrayList<Path>();
  189.     if (isLocal) {
  190.       // for local jobs
  191.       for(int i = 0; i < numMaps; ++i) {
  192.         fileList.add(mapOutputFile.getInputFile(i, getTaskID()));
  193.       }
  194.     } else {
  195.       // for non local jobs
  196.       for (FileStatus filestatus : mapOutputFilesOnDisk) {
  197.         fileList.add(filestatus.getPath());
  198.       }
  199.     }
  200.     return fileList.toArray(new Path[0]);
  201.   }
  202.   private class ReduceValuesIterator<KEY,VALUE> 
  203.           extends ValuesIterator<KEY,VALUE> {
  204.     public ReduceValuesIterator (RawKeyValueIterator in,
  205.                                  RawComparator<KEY> comparator, 
  206.                                  Class<KEY> keyClass,
  207.                                  Class<VALUE> valClass,
  208.                                  Configuration conf, Progressable reporter)
  209.       throws IOException {
  210.       super(in, comparator, keyClass, valClass, conf, reporter);
  211.     }
  212.     @Override
  213.     public VALUE next() {
  214.       reduceInputValueCounter.increment(1);
  215.       return moveToNext();
  216.     }
  217.     
  218.     protected VALUE moveToNext() {
  219.       return super.next();
  220.     }
  221.     
  222.     public void informReduceProgress() {
  223.       reducePhase.set(super.in.getProgress().get()); // update progress
  224.       reporter.progress();
  225.     }
  226.   }
  227.   private class SkippingReduceValuesIterator<KEY,VALUE> 
  228.      extends ReduceValuesIterator<KEY,VALUE> {
  229.      private SkipRangeIterator skipIt;
  230.      private TaskUmbilicalProtocol umbilical;
  231.      private Counters.Counter skipGroupCounter;
  232.      private Counters.Counter skipRecCounter;
  233.      private long grpIndex = -1;
  234.      private Class<KEY> keyClass;
  235.      private Class<VALUE> valClass;
  236.      private SequenceFile.Writer skipWriter;
  237.      private boolean toWriteSkipRecs;
  238.      private boolean hasNext;
  239.      private TaskReporter reporter;
  240.      
  241.      public SkippingReduceValuesIterator(RawKeyValueIterator in,
  242.          RawComparator<KEY> comparator, Class<KEY> keyClass,
  243.          Class<VALUE> valClass, Configuration conf, TaskReporter reporter,
  244.          TaskUmbilicalProtocol umbilical) throws IOException {
  245.        super(in, comparator, keyClass, valClass, conf, reporter);
  246.        this.umbilical = umbilical;
  247.        this.skipGroupCounter = 
  248.          reporter.getCounter(Counter.REDUCE_SKIPPED_GROUPS);
  249.        this.skipRecCounter = 
  250.          reporter.getCounter(Counter.REDUCE_SKIPPED_RECORDS);
  251.        this.toWriteSkipRecs = toWriteSkipRecs() &&  
  252.          SkipBadRecords.getSkipOutputPath(conf)!=null;
  253.        this.keyClass = keyClass;
  254.        this.valClass = valClass;
  255.        this.reporter = reporter;
  256.        skipIt = getSkipRanges().skipRangeIterator();
  257.        mayBeSkip();
  258.      }
  259.      
  260.      void nextKey() throws IOException {
  261.        super.nextKey();
  262.        mayBeSkip();
  263.      }
  264.      
  265.      boolean more() { 
  266.        return super.more() && hasNext; 
  267.      }
  268.      
  269.      private void mayBeSkip() throws IOException {
  270.        hasNext = skipIt.hasNext();
  271.        if(!hasNext) {
  272.          LOG.warn("Further groups got skipped.");
  273.          return;
  274.        }
  275.        grpIndex++;
  276.        long nextGrpIndex = skipIt.next();
  277.        long skip = 0;
  278.        long skipRec = 0;
  279.        while(grpIndex<nextGrpIndex && super.more()) {
  280.          while (hasNext()) {
  281.            VALUE value = moveToNext();
  282.            if(toWriteSkipRecs) {
  283.              writeSkippedRec(getKey(), value);
  284.            }
  285.            skipRec++;
  286.          }
  287.          super.nextKey();
  288.          grpIndex++;
  289.          skip++;
  290.        }
  291.        
  292.        //close the skip writer once all the ranges are skipped
  293.        if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
  294.          skipWriter.close();
  295.        }
  296.        skipGroupCounter.increment(skip);
  297.        skipRecCounter.increment(skipRec);
  298.        reportNextRecordRange(umbilical, grpIndex);
  299.      }
  300.      
  301.      @SuppressWarnings("unchecked")
  302.      private void writeSkippedRec(KEY key, VALUE value) throws IOException{
  303.        if(skipWriter==null) {
  304.          Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
  305.          Path skipFile = new Path(skipDir, getTaskID().toString());
  306.          skipWriter = SequenceFile.createWriter(
  307.                skipFile.getFileSystem(conf), conf, skipFile,
  308.                keyClass, valClass, 
  309.                CompressionType.BLOCK, reporter);
  310.        }
  311.        skipWriter.append(key, value);
  312.      }
  313.   }
  314.   @Override
  315.   @SuppressWarnings("unchecked")
  316.   public void run(JobConf job, final TaskUmbilicalProtocol umbilical)
  317.     throws IOException, InterruptedException, ClassNotFoundException {
  318.     job.setBoolean("mapred.skip.on", isSkipping());
  319.     if (isMapOrReduce()) {
  320.       copyPhase = getProgress().addPhase("copy");
  321.       sortPhase  = getProgress().addPhase("sort");
  322.       reducePhase = getProgress().addPhase("reduce");
  323.     }
  324.     // start thread that will handle communication with parent
  325.     TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  326.     reporter.startCommunicationThread();
  327.     boolean useNewApi = job.getUseNewReducer();
  328.     initialize(job, getJobID(), reporter, useNewApi);
  329.     // check if it is a cleanupJobTask
  330.     if (jobCleanup) {
  331.       runJobCleanupTask(umbilical, reporter);
  332.       return;
  333.     }
  334.     if (jobSetup) {
  335.       runJobSetupTask(umbilical, reporter);
  336.       return;
  337.     }
  338.     if (taskCleanup) {
  339.       runTaskCleanupTask(umbilical, reporter);
  340.       return;
  341.     }
  342.     
  343.     // Initialize the codec
  344.     codec = initCodec();
  345.     boolean isLocal = "local".equals(job.get("mapred.job.tracker", "local"));
  346.     if (!isLocal) {
  347.       reduceCopier = new ReduceCopier(umbilical, job, reporter);
  348.       if (!reduceCopier.fetchOutputs()) {
  349.         if(reduceCopier.mergeThrowable instanceof FSError) {
  350.           LOG.error("Task: " + getTaskID() + " - FSError: " + 
  351.               StringUtils.stringifyException(reduceCopier.mergeThrowable));
  352.           umbilical.fsError(getTaskID(), 
  353.               reduceCopier.mergeThrowable.getMessage());
  354.         }
  355.         throw new IOException("Task: " + getTaskID() + 
  356.             " - The reduce copier failed", reduceCopier.mergeThrowable);
  357.       }
  358.     }
  359.     copyPhase.complete();                         // copy is already complete
  360.     setPhase(TaskStatus.Phase.SORT);
  361.     statusUpdate(umbilical);
  362.     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
  363.     RawKeyValueIterator rIter = isLocal
  364.       ? Merger.merge(job, rfs, job.getMapOutputKeyClass(),
  365.           job.getMapOutputValueClass(), codec, getMapFiles(rfs, true),
  366.           !conf.getKeepFailedTaskFiles(), job.getInt("io.sort.factor", 100),
  367.           new Path(getTaskID().toString()), job.getOutputKeyComparator(),
  368.           reporter, spilledRecordsCounter, null)
  369.       : reduceCopier.createKVIterator(job, rfs, reporter);
  370.         
  371.     // free up the data structures
  372.     mapOutputFilesOnDisk.clear();
  373.     
  374.     sortPhase.complete();                         // sort is complete
  375.     setPhase(TaskStatus.Phase.REDUCE); 
  376.     statusUpdate(umbilical);
  377.     Class keyClass = job.getMapOutputKeyClass();
  378.     Class valueClass = job.getMapOutputValueClass();
  379.     RawComparator comparator = job.getOutputValueGroupingComparator();
  380.     if (useNewApi) {
  381.       runNewReducer(job, umbilical, reporter, rIter, comparator, 
  382.                     keyClass, valueClass);
  383.     } else {
  384.       runOldReducer(job, umbilical, reporter, rIter, comparator, 
  385.                     keyClass, valueClass);
  386.     }
  387.     done(umbilical, reporter);
  388.   }
  389.   @SuppressWarnings("unchecked")
  390.   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  391.   void runOldReducer(JobConf job,
  392.                      TaskUmbilicalProtocol umbilical,
  393.                      final TaskReporter reporter,
  394.                      RawKeyValueIterator rIter,
  395.                      RawComparator<INKEY> comparator,
  396.                      Class<INKEY> keyClass,
  397.                      Class<INVALUE> valueClass) throws IOException {
  398.     Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer = 
  399.       ReflectionUtils.newInstance(job.getReducerClass(), job);
  400.     // make output collector
  401.     String finalName = getOutputName(getPartition());
  402.     FileSystem fs = FileSystem.get(job);
  403.     final RecordWriter<OUTKEY,OUTVALUE> out = 
  404.       job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);  
  405.     
  406.     OutputCollector<OUTKEY,OUTVALUE> collector = 
  407.       new OutputCollector<OUTKEY,OUTVALUE>() {
  408.         public void collect(OUTKEY key, OUTVALUE value)
  409.           throws IOException {
  410.           out.write(key, value);
  411.           reduceOutputCounter.increment(1);
  412.           // indicate that progress update needs to be sent
  413.           reporter.progress();
  414.         }
  415.       };
  416.     
  417.     // apply reduce function
  418.     try {
  419.       //increment processed counter only if skipping feature is enabled
  420.       boolean incrProcCount = SkipBadRecords.getReducerMaxSkipGroups(job)>0 &&
  421.         SkipBadRecords.getAutoIncrReducerProcCount(job);
  422.       
  423.       ReduceValuesIterator<INKEY,INVALUE> values = isSkipping() ? 
  424.           new SkippingReduceValuesIterator<INKEY,INVALUE>(rIter, 
  425.               comparator, keyClass, valueClass, 
  426.               job, reporter, umbilical) :
  427.           new ReduceValuesIterator<INKEY,INVALUE>(rIter, 
  428.           job.getOutputValueGroupingComparator(), keyClass, valueClass, 
  429.           job, reporter);
  430.       values.informReduceProgress();
  431.       while (values.more()) {
  432.         reduceInputKeyCounter.increment(1);
  433.         reducer.reduce(values.getKey(), values, collector, reporter);
  434.         if(incrProcCount) {
  435.           reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
  436.               SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS, 1);
  437.         }
  438.         values.nextKey();
  439.         values.informReduceProgress();
  440.       }
  441.       //Clean up: repeated in catch block below
  442.       reducer.close();
  443.       out.close(reporter);
  444.       //End of clean up.
  445.     } catch (IOException ioe) {
  446.       try {
  447.         reducer.close();
  448.       } catch (IOException ignored) {}
  449.         
  450.       try {
  451.         out.close(reporter);
  452.       } catch (IOException ignored) {}
  453.       
  454.       throw ioe;
  455.     }
  456.   }
  457.   static class NewTrackingRecordWriter<K,V> 
  458.       extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  459.     private final org.apache.hadoop.mapreduce.RecordWriter<K,V> real;
  460.     private final org.apache.hadoop.mapreduce.Counter outputRecordCounter;
  461.   
  462.     NewTrackingRecordWriter(org.apache.hadoop.mapreduce.RecordWriter<K,V> real,
  463.                             org.apache.hadoop.mapreduce.Counter recordCounter) {
  464.       this.real = real;
  465.       this.outputRecordCounter = recordCounter;
  466.     }
  467.     @Override
  468.     public void close(TaskAttemptContext context) throws IOException,
  469.     InterruptedException {
  470.       real.close(context);
  471.     }
  472.     @Override
  473.     public void write(K key, V value) throws IOException, InterruptedException {
  474.       real.write(key,value);
  475.       outputRecordCounter.increment(1);
  476.     }
  477.   }
  478.   @SuppressWarnings("unchecked")
  479.   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  480.   void runNewReducer(JobConf job,
  481.                      final TaskUmbilicalProtocol umbilical,
  482.                      final TaskReporter reporter,
  483.                      RawKeyValueIterator rIter,
  484.                      RawComparator<INKEY> comparator,
  485.                      Class<INKEY> keyClass,
  486.                      Class<INVALUE> valueClass
  487.                      ) throws IOException,InterruptedException, 
  488.                               ClassNotFoundException {
  489.     // make a task context so we can get the classes
  490.     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
  491.       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  492.     // make a reducer
  493.     org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE> reducer =
  494.       (org.apache.hadoop.mapreduce.Reducer<INKEY,INVALUE,OUTKEY,OUTVALUE>)
  495.         ReflectionUtils.newInstance(taskContext.getReducerClass(), job);
  496.     org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE> output =
  497.       (org.apache.hadoop.mapreduce.RecordWriter<OUTKEY,OUTVALUE>)
  498.         outputFormat.getRecordWriter(taskContext);
  499.     job.setBoolean("mapred.skip.on", isSkipping());
  500.     org.apache.hadoop.mapreduce.Reducer.Context 
  501.          reducerContext = createReduceContext(reducer, job, getTaskID(),
  502.                                                rIter, reduceInputValueCounter, 
  503.                                                output, committer,
  504.                                                reporter, comparator, keyClass,
  505.                                                valueClass);
  506.     reducer.run(reducerContext);
  507.     output.close(reducerContext);
  508.   }
  509.   class ReduceCopier<K, V> implements MRConstants {
  510.     /** Reference to the umbilical object */
  511.     private TaskUmbilicalProtocol umbilical;
  512.     private final TaskReporter reporter;
  513.     
  514.     /** Reference to the task object */
  515.     
  516.     /** Number of ms before timing out a copy */
  517.     private static final int STALLED_COPY_TIMEOUT = 3 * 60 * 1000;
  518.     
  519.     /** Max events to fetch in one go from the tasktracker */
  520.     private static final int MAX_EVENTS_TO_FETCH = 10000;
  521.     /**
  522.      * our reduce task instance
  523.      */
  524.     private ReduceTask reduceTask;
  525.     
  526.     /**
  527.      * the list of map outputs currently being copied
  528.      */
  529.     private List<MapOutputLocation> scheduledCopies;
  530.     
  531.     /**
  532.      *  the results of dispatched copy attempts
  533.      */
  534.     private List<CopyResult> copyResults;
  535.     
  536.     /**
  537.      *  the number of outputs to copy in parallel
  538.      */
  539.     private int numCopiers;
  540.     
  541.     /**
  542.      *  a number that is set to the max #fetches we'd schedule and then
  543.      *  pause the schduling
  544.      */
  545.     private int maxInFlight;
  546.     
  547.     /**
  548.      * the amount of time spent on fetching one map output before considering 
  549.      * it as failed and notifying the jobtracker about it.
  550.      */
  551.     private int maxBackoff;
  552.     
  553.     /**
  554.      * busy hosts from which copies are being backed off
  555.      * Map of host -> next contact time
  556.      */
  557.     private Map<String, Long> penaltyBox;
  558.     
  559.     /**
  560.      * the set of unique hosts from which we are copying
  561.      */
  562.     private Set<String> uniqueHosts;
  563.     
  564.     /**
  565.      * A reference to the RamManager for writing the map outputs to.
  566.      */
  567.     
  568.     private ShuffleRamManager ramManager;
  569.     
  570.     /**
  571.      * A reference to the local file system for writing the map outputs to.
  572.      */
  573.     private FileSystem localFileSys;
  574.     private FileSystem rfs;
  575.     /**
  576.      * Number of files to merge at a time
  577.      */
  578.     private int ioSortFactor;
  579.     
  580.     /**
  581.      * A reference to the throwable object (if merge throws an exception)
  582.      */
  583.     private volatile Throwable mergeThrowable;
  584.     
  585.     /** 
  586.      * A flag to indicate when to exit localFS merge
  587.      */
  588.     private volatile boolean exitLocalFSMerge = false;
  589.     /** 
  590.      * A flag to indicate when to exit getMapEvents thread 
  591.      */
  592.     private volatile boolean exitGetMapEvents = false;
  593.     
  594.     /**
  595.      * When we accumulate maxInMemOutputs number of files in ram, we merge/spill
  596.      */
  597.     private final int maxInMemOutputs;
  598.     /**
  599.      * Usage threshold for in-memory output accumulation.
  600.      */
  601.     private final float maxInMemCopyPer;
  602.     /**
  603.      * Maximum memory usage of map outputs to merge from memory into
  604.      * the reduce, in bytes.
  605.      */
  606.     private final long maxInMemReduce;
  607.     /**
  608.      * The threads for fetching the files.
  609.      */
  610.     private List<MapOutputCopier> copiers = null;
  611.     
  612.     /**
  613.      * The object for metrics reporting.
  614.      */
  615.     private ShuffleClientMetrics shuffleClientMetrics = null;
  616.     
  617.     /**
  618.      * the minimum interval between tasktracker polls
  619.      */
  620.     private static final long MIN_POLL_INTERVAL = 1000;
  621.     
  622.     /**
  623.      * a list of map output locations for fetch retrials 
  624.      */
  625.     private List<MapOutputLocation> retryFetches =
  626.       new ArrayList<MapOutputLocation>();
  627.     
  628.     /** 
  629.      * The set of required map outputs
  630.      */
  631.     private Set <TaskID> copiedMapOutputs = 
  632.       Collections.synchronizedSet(new TreeSet<TaskID>());
  633.     
  634.     /** 
  635.      * The set of obsolete map taskids.
  636.      */
  637.     private Set <TaskAttemptID> obsoleteMapIds = 
  638.       Collections.synchronizedSet(new TreeSet<TaskAttemptID>());
  639.     
  640.     private Random random = null;
  641.     /**
  642.      * the max of all the map completion times
  643.      */
  644.     private int maxMapRuntime;
  645.     
  646.     /**
  647.      * Maximum number of fetch-retries per-map.
  648.      */
  649.     private volatile int maxFetchRetriesPerMap;
  650.     
  651.     /**
  652.      * Combiner runner, if a combiner is needed
  653.      */
  654.     private CombinerRunner combinerRunner;
  655.     /**
  656.      * Resettable collector used for combine.
  657.      */
  658.     private CombineOutputCollector combineCollector = null;
  659.     /**
  660.      * Maximum percent of failed fetch attempt before killing the reduce task.
  661.      */
  662.     private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
  663.     /**
  664.      * Minimum percent of progress required to keep the reduce alive.
  665.      */
  666.     private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
  667.     /**
  668.      * Maximum percent of shuffle execution time required to keep the reducer alive.
  669.      */
  670.     private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
  671.     
  672.     /**
  673.      * Minimum number of map fetch retries.
  674.      */
  675.     private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
  676.     /**
  677.      * Maximum no. of unique maps from which we failed to fetch map-outputs
  678.      * even after {@link #maxFetchRetriesPerMap} retries; after this the
  679.      * reduce task is failed.
  680.      */
  681.     private int maxFailedUniqueFetches = 5;
  682.     /**
  683.      * The maps from which we fail to fetch map-outputs 
  684.      * even after {@link #maxFetchRetriesPerMap} retries.
  685.      */
  686.     Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>(); 
  687.     
  688.     /**
  689.      * A map of taskId -> no. of failed fetches
  690.      */
  691.     Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = 
  692.       new HashMap<TaskAttemptID, Integer>();    
  693.     /**
  694.      * Initial backoff interval (milliseconds)
  695.      */
  696.     private static final int BACKOFF_INIT = 4000; 
  697.     
  698.     /**
  699.      * The interval for logging in the shuffle
  700.      */
  701.     private static final int MIN_LOG_TIME = 60000;
  702.     /** 
  703.      * List of in-memory map-outputs.
  704.      */
  705.     private final List<MapOutput> mapOutputsFilesInMemory =
  706.       Collections.synchronizedList(new LinkedList<MapOutput>());
  707.     
  708.     /**
  709.      * The map for (Hosts, List of MapIds from this Host) maintaining
  710.      * map output locations
  711.      */
  712.     private final Map<String, List<MapOutputLocation>> mapLocations = 
  713.       new ConcurrentHashMap<String, List<MapOutputLocation>>();
  714.     
  715.     /**
  716.      * This class contains the methods that should be used for metrics-reporting
  717.      * the specific metrics for shuffle. This class actually reports the
  718.      * metrics for the shuffle client (the ReduceTask), and hence the name
  719.      * ShuffleClientMetrics.
  720.      */
  721.     class ShuffleClientMetrics implements Updater {
  722.       private MetricsRecord shuffleMetrics = null;
  723.       private int numFailedFetches = 0;
  724.       private int numSuccessFetches = 0;
  725.       private long numBytes = 0;
  726.       private int numThreadsBusy = 0;
  727.       ShuffleClientMetrics(JobConf conf) {
  728.         MetricsContext metricsContext = MetricsUtil.getContext("mapred");
  729.         this.shuffleMetrics = 
  730.           MetricsUtil.createRecord(metricsContext, "shuffleInput");
  731.         this.shuffleMetrics.setTag("user", conf.getUser());
  732.         this.shuffleMetrics.setTag("jobName", conf.getJobName());
  733.         this.shuffleMetrics.setTag("jobId", ReduceTask.this.getJobID().toString());
  734.         this.shuffleMetrics.setTag("taskId", getTaskID().toString());
  735.         this.shuffleMetrics.setTag("sessionId", conf.getSessionId());
  736.         metricsContext.registerUpdater(this);
  737.       }
  738.       public synchronized void inputBytes(long numBytes) {
  739.         this.numBytes += numBytes;
  740.       }
  741.       public synchronized void failedFetch() {
  742.         ++numFailedFetches;
  743.       }
  744.       public synchronized void successFetch() {
  745.         ++numSuccessFetches;
  746.       }
  747.       public synchronized void threadBusy() {
  748.         ++numThreadsBusy;
  749.       }
  750.       public synchronized void threadFree() {
  751.         --numThreadsBusy;
  752.       }
  753.       public void doUpdates(MetricsContext unused) {
  754.         synchronized (this) {
  755.           shuffleMetrics.incrMetric("shuffle_input_bytes", numBytes);
  756.           shuffleMetrics.incrMetric("shuffle_failed_fetches", 
  757.                                     numFailedFetches);
  758.           shuffleMetrics.incrMetric("shuffle_success_fetches", 
  759.                                     numSuccessFetches);
  760.           if (numCopiers != 0) {
  761.             shuffleMetrics.setMetric("shuffle_fetchers_busy_percent",
  762.                 100*((float)numThreadsBusy/numCopiers));
  763.           } else {
  764.             shuffleMetrics.setMetric("shuffle_fetchers_busy_percent", 0);
  765.           }
  766.           numBytes = 0;
  767.           numSuccessFetches = 0;
  768.           numFailedFetches = 0;
  769.         }
  770.         shuffleMetrics.update();
  771.       }
  772.     }
  773.     /** Represents the result of an attempt to copy a map output */
  774.     private class CopyResult {
  775.       
  776.       // the map output location against which a copy attempt was made
  777.       private final MapOutputLocation loc;
  778.       
  779.       // the size of the file copied, -1 if the transfer failed
  780.       private final long size;
  781.       
  782.       //a flag signifying whether a copy result is obsolete
  783.       private static final int OBSOLETE = -2;
  784.       
  785.       CopyResult(MapOutputLocation loc, long size) {
  786.         this.loc = loc;
  787.         this.size = size;
  788.       }
  789.       
  790.       public boolean getSuccess() { return size >= 0; }
  791.       public boolean isObsolete() { 
  792.         return size == OBSOLETE;
  793.       }
  794.       public long getSize() { return size; }
  795.       public String getHost() { return loc.getHost(); }
  796.       public MapOutputLocation getLocation() { return loc; }
  797.     }
  798.     
  799.     private int nextMapOutputCopierId = 0;
  800.     
  801.     /**
  802.      * Abstraction to track a map-output.
  803.      */
  804.     private class MapOutputLocation {
  805.       TaskAttemptID taskAttemptId;
  806.       TaskID taskId;
  807.       String ttHost;
  808.       URL taskOutput;
  809.       
  810.       public MapOutputLocation(TaskAttemptID taskAttemptId, 
  811.                                String ttHost, URL taskOutput) {
  812.         this.taskAttemptId = taskAttemptId;
  813.         this.taskId = this.taskAttemptId.getTaskID();
  814.         this.ttHost = ttHost;
  815.         this.taskOutput = taskOutput;
  816.       }
  817.       
  818.       public TaskAttemptID getTaskAttemptId() {
  819.         return taskAttemptId;
  820.       }
  821.       
  822.       public TaskID getTaskId() {
  823.         return taskId;
  824.       }
  825.       
  826.       public String getHost() {
  827.         return ttHost;
  828.       }
  829.       
  830.       public URL getOutputLocation() {
  831.         return taskOutput;
  832.       }
  833.     }
  834.     
  835.     /** Describes the output of a map; could either be on disk or in-memory. */
  836.     private class MapOutput {
  837.       final TaskID mapId;
  838.       final TaskAttemptID mapAttemptId;
  839.       
  840.       final Path file;
  841.       final Configuration conf;
  842.       
  843.       byte[] data;
  844.       final boolean inMemory;
  845.       long compressedSize;
  846.       
  847.       public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, 
  848.                        Configuration conf, Path file, long size) {
  849.         this.mapId = mapId;
  850.         this.mapAttemptId = mapAttemptId;
  851.         
  852.         this.conf = conf;
  853.         this.file = file;
  854.         this.compressedSize = size;
  855.         
  856.         this.data = null;
  857.         
  858.         this.inMemory = false;
  859.       }
  860.       
  861.       public MapOutput(TaskID mapId, TaskAttemptID mapAttemptId, byte[] data, int compressedLength) {
  862.         this.mapId = mapId;
  863.         this.mapAttemptId = mapAttemptId;
  864.         
  865.         this.file = null;
  866.         this.conf = null;
  867.         
  868.         this.data = data;
  869.         this.compressedSize = compressedLength;
  870.         
  871.         this.inMemory = true;
  872.       }
  873.       
  874.       public void discard() throws IOException {
  875.         if (inMemory) {
  876.           data = null;
  877.         } else {
  878.           FileSystem fs = file.getFileSystem(conf);
  879.           fs.delete(file, true);
  880.         }
  881.       }
  882.     }
  883.     
  884.     class ShuffleRamManager implements RamManager {
  885.       /* Maximum percentage of the in-memory limit that a single shuffle can 
  886.        * consume*/ 
  887.       private static final float MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION = 0.25f;
  888.       
  889.       /* Maximum percentage of shuffle-threads which can be stalled 
  890.        * simultaneously after which a merge is triggered. */ 
  891.       private static final float MAX_STALLED_SHUFFLE_THREADS_FRACTION = 0.75f;
  892.       
  893.       private final int maxSize;
  894.       private final int maxSingleShuffleLimit;
  895.       
  896.       private int size = 0;
  897.       
  898.       private Object dataAvailable = new Object();
  899.       private int fullSize = 0;
  900.       private int numPendingRequests = 0;
  901.       private int numRequiredMapOutputs = 0;
  902.       private int numClosed = 0;
  903.       private boolean closed = false;
  904.       
  905.       public ShuffleRamManager(Configuration conf) throws IOException {
  906.         final float maxInMemCopyUse =
  907.           conf.getFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);
  908.         if (maxInMemCopyUse > 1.0 || maxInMemCopyUse < 0.0) {
  909.           throw new IOException("mapred.job.shuffle.input.buffer.percent" +
  910.                                 maxInMemCopyUse);
  911.         }
  912.         maxSize = (int)Math.min(
  913.             Runtime.getRuntime().maxMemory() * maxInMemCopyUse,
  914.             Integer.MAX_VALUE);
  915.         maxSingleShuffleLimit = (int)(maxSize * MAX_SINGLE_SHUFFLE_SEGMENT_FRACTION);
  916.         LOG.info("ShuffleRamManager: MemoryLimit=" + maxSize + 
  917.                  ", MaxSingleShuffleLimit=" + maxSingleShuffleLimit);
  918.       }
  919.       
  920.       public synchronized boolean reserve(int requestedSize, InputStream in) 
  921.       throws InterruptedException {
  922.         // Wait till the request can be fulfilled...
  923.         while ((size + requestedSize) > maxSize) {
  924.           
  925.           // Close the input...
  926.           if (in != null) {
  927.             try {
  928.               in.close();
  929.             } catch (IOException ie) {
  930.               LOG.info("Failed to close connection with: " + ie);
  931.             } finally {
  932.               in = null;
  933.             }
  934.           } 
  935.           // Track pending requests
  936.           synchronized (dataAvailable) {
  937.             ++numPendingRequests;
  938.             dataAvailable.notify();
  939.           }
  940.           // Wait for memory to free up
  941.           wait();
  942.           
  943.           // Track pending requests
  944.           synchronized (dataAvailable) {
  945.             --numPendingRequests;
  946.           }
  947.         }
  948.         
  949.         size += requestedSize;
  950.         
  951.         return (in != null);
  952.       }
  953.       
  954.       public synchronized void unreserve(int requestedSize) {
  955.         size -= requestedSize;
  956.         
  957.         synchronized (dataAvailable) {
  958.           fullSize -= requestedSize;
  959.           --numClosed;
  960.         }
  961.         
  962.         // Notify the threads blocked on RamManager.reserve
  963.         notifyAll();
  964.       }
  965.       
  966.       public boolean waitForDataToMerge() throws InterruptedException {
  967.         boolean done = false;
  968.         synchronized (dataAvailable) {
  969.                  // Start in-memory merge if manager has been closed or...
  970.           while (!closed
  971.                  &&
  972.                  // In-memory threshold exceeded and at least two segments
  973.                  // have been fetched
  974.                  (getPercentUsed() < maxInMemCopyPer || numClosed < 2)
  975.                  &&
  976.                  // More than "mapred.inmem.merge.threshold" map outputs
  977.                  // have been fetched into memory
  978.                  (maxInMemOutputs <= 0 || numClosed < maxInMemOutputs)
  979.                  && 
  980.                  // More than MAX... threads are blocked on the RamManager
  981.                  // or the blocked threads are the last map outputs to be
  982.                  // fetched. If numRequiredMapOutputs is zero, either
  983.                  // setNumCopiedMapOutputs has not been called (no map ouputs
  984.                  // have been fetched, so there is nothing to merge) or the
  985.                  // last map outputs being transferred without
  986.                  // contention, so a merge would be premature.
  987.                  (numPendingRequests < 
  988.                       numCopiers*MAX_STALLED_SHUFFLE_THREADS_FRACTION && 
  989.                   (0 == numRequiredMapOutputs ||
  990.                    numPendingRequests < numRequiredMapOutputs))) {
  991.             dataAvailable.wait();
  992.           }
  993.           done = closed;
  994.         }
  995.         return done;
  996.       }
  997.       
  998.       public void closeInMemoryFile(int requestedSize) {
  999.         synchronized (dataAvailable) {
  1000.           fullSize += requestedSize;
  1001.           ++numClosed;
  1002.           dataAvailable.notify();
  1003.         }
  1004.       }
  1005.       
  1006.       public void setNumCopiedMapOutputs(int numRequiredMapOutputs) {
  1007.         synchronized (dataAvailable) {
  1008.           this.numRequiredMapOutputs = numRequiredMapOutputs;
  1009.           dataAvailable.notify();
  1010.         }
  1011.       }
  1012.       
  1013.       public void close() {
  1014.         synchronized (dataAvailable) {
  1015.           closed = true;
  1016.           LOG.info("Closed ram manager");
  1017.           dataAvailable.notify();
  1018.         }
  1019.       }
  1020.       
  1021.       private float getPercentUsed() {
  1022.         return (float)fullSize/maxSize;
  1023.       }
  1024.       int getMemoryLimit() {
  1025.         return maxSize;
  1026.       }
  1027.       
  1028.       boolean canFitInMemory(long requestedSize) {
  1029.         return (requestedSize < Integer.MAX_VALUE && 
  1030.                 requestedSize < maxSingleShuffleLimit);
  1031.       }
  1032.     }
  1033.     /** Copies map outputs as they become available */
  1034.     private class MapOutputCopier extends Thread {
  1035.       // basic/unit connection timeout (in milliseconds)
  1036.       private final static int UNIT_CONNECT_TIMEOUT = 30 * 1000;
  1037.       // default read timeout (in milliseconds)
  1038.       private final static int DEFAULT_READ_TIMEOUT = 3 * 60 * 1000;
  1039.       private MapOutputLocation currentLocation = null;
  1040.       private int id = nextMapOutputCopierId++;
  1041.       private Reporter reporter;
  1042.       
  1043.       // Decompression of map-outputs
  1044.       private CompressionCodec codec = null;
  1045.       private Decompressor decompressor = null;
  1046.       
  1047.       public MapOutputCopier(JobConf job, Reporter reporter) {
  1048.         setName("MapOutputCopier " + reduceTask.getTaskID() + "." + id);
  1049.         LOG.debug(getName() + " created");
  1050.         this.reporter = reporter;
  1051.         
  1052.         if (job.getCompressMapOutput()) {
  1053.           Class<? extends CompressionCodec> codecClass =
  1054.             job.getMapOutputCompressorClass(DefaultCodec.class);
  1055.           codec = ReflectionUtils.newInstance(codecClass, job);
  1056.           decompressor = CodecPool.getDecompressor(codec);
  1057.         }
  1058.       }
  1059.       
  1060.       /**
  1061.        * Fail the current file that we are fetching
  1062.        * @return were we currently fetching?
  1063.        */
  1064.       public synchronized boolean fail() {
  1065.         if (currentLocation != null) {
  1066.           finish(-1);
  1067.           return true;
  1068.         } else {
  1069.           return false;
  1070.         }
  1071.       }
  1072.       
  1073.       /**
  1074.        * Get the current map output location.
  1075.        */
  1076.       public synchronized MapOutputLocation getLocation() {
  1077.         return currentLocation;
  1078.       }
  1079.       
  1080.       private synchronized void start(MapOutputLocation loc) {
  1081.         currentLocation = loc;
  1082.       }
  1083.       
  1084.       private synchronized void finish(long size) {
  1085.         if (currentLocation != null) {
  1086.           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
  1087.           synchronized (copyResults) {
  1088.             copyResults.add(new CopyResult(currentLocation, size));
  1089.             copyResults.notify();
  1090.           }
  1091.           currentLocation = null;
  1092.         }
  1093.       }
  1094.       
  1095.       /** Loop forever and fetch map outputs as they become available.
  1096.        * The thread exits when it is interrupted by {@link ReduceTaskRunner}
  1097.        */
  1098.       @Override
  1099.       public void run() {
  1100.         while (true) {        
  1101.           try {
  1102.             MapOutputLocation loc = null;
  1103.             long size = -1;
  1104.             
  1105.             synchronized (scheduledCopies) {
  1106.               while (scheduledCopies.isEmpty()) {
  1107.                 scheduledCopies.wait();
  1108.               }
  1109.               loc = scheduledCopies.remove(0);
  1110.             }
  1111.             
  1112.             try {
  1113.               shuffleClientMetrics.threadBusy();
  1114.               start(loc);
  1115.               size = copyOutput(loc);
  1116.               shuffleClientMetrics.successFetch();
  1117.             } catch (IOException e) {
  1118.               LOG.warn(reduceTask.getTaskID() + " copy failed: " +
  1119.                        loc.getTaskAttemptId() + " from " + loc.getHost());
  1120.               LOG.warn(StringUtils.stringifyException(e));
  1121.               shuffleClientMetrics.failedFetch();
  1122.               
  1123.               // Reset 
  1124.               size = -1;
  1125.             } finally {
  1126.               shuffleClientMetrics.threadFree();
  1127.               finish(size);
  1128.             }
  1129.           } catch (InterruptedException e) { 
  1130.             break; // ALL DONE
  1131.           } catch (FSError e) {
  1132.             LOG.error("Task: " + reduceTask.getTaskID() + " - FSError: " + 
  1133.                       StringUtils.stringifyException(e));
  1134.             try {
  1135.               umbilical.fsError(reduceTask.getTaskID(), e.getMessage());
  1136.             } catch (IOException io) {
  1137.               LOG.error("Could not notify TT of FSError: " + 
  1138.                       StringUtils.stringifyException(io));
  1139.             }
  1140.           } catch (Throwable th) {
  1141.             LOG.error("Map output copy failure: " + 
  1142.                       StringUtils.stringifyException(th));
  1143.           }
  1144.         }
  1145.         
  1146.         if (decompressor != null) {
  1147.           CodecPool.returnDecompressor(decompressor);
  1148.         }
  1149.           
  1150.       }
  1151.       
  1152.       /** Copies a a map output from a remote host, via HTTP. 
  1153.        * @param currentLocation the map output location to be copied
  1154.        * @return the path (fully qualified) of the copied file
  1155.        * @throws IOException if there is an error copying the file
  1156.        * @throws InterruptedException if the copier should give up
  1157.        */
  1158.       private long copyOutput(MapOutputLocation loc
  1159.                               ) throws IOException, InterruptedException {
  1160.         // check if we still need to copy the output from this location
  1161.         if (copiedMapOutputs.contains(loc.getTaskId()) || 
  1162.             obsoleteMapIds.contains(loc.getTaskAttemptId())) {
  1163.           return CopyResult.OBSOLETE;
  1164.         } 
  1165.  
  1166.         // a temp filename. If this file gets created in ramfs, we're fine,
  1167.         // else, we will check the localFS to find a suitable final location
  1168.         // for this path
  1169.         TaskAttemptID reduceId = reduceTask.getTaskID();
  1170.         Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
  1171.                                  reduceId.getJobID().toString(),
  1172.                                  reduceId.toString()) 
  1173.                                  + "/map_" +
  1174.                                  loc.getTaskId().getId() + ".out");
  1175.         
  1176.         // Copy the map output to a temp file whose name is unique to this attempt 
  1177.         Path tmpMapOutput = new Path(filename+"-"+id);
  1178.         
  1179.         // Copy the map output
  1180.         MapOutput mapOutput = getMapOutput(loc, tmpMapOutput);
  1181.         if (mapOutput == null) {
  1182.           throw new IOException("Failed to fetch map-output for " + 
  1183.                                 loc.getTaskAttemptId() + " from " + 
  1184.                                 loc.getHost());
  1185.         }
  1186.         
  1187.         // The size of the map-output
  1188.         long bytes = mapOutput.compressedSize;
  1189.         
  1190.         // lock the ReduceTask while we do the rename
  1191.         synchronized (ReduceTask.this) {
  1192.           if (copiedMapOutputs.contains(loc.getTaskId())) {
  1193.             mapOutput.discard();
  1194.             return CopyResult.OBSOLETE;
  1195.           }
  1196.           // Special case: discard empty map-outputs
  1197.           if (bytes == 0) {
  1198.             try {
  1199.               mapOutput.discard();
  1200.             } catch (IOException ioe) {
  1201.               LOG.info("Couldn't discard output of " + loc.getTaskId());
  1202.             }
  1203.             
  1204.             // Note that we successfully copied the map-output
  1205.             noteCopiedMapOutput(loc.getTaskId());
  1206.             
  1207.             return bytes;
  1208.           }
  1209.           
  1210.           // Process map-output
  1211.           if (mapOutput.inMemory) {
  1212.             // Save it in the synchronized list of map-outputs
  1213.             mapOutputsFilesInMemory.add(mapOutput);
  1214.           } else {
  1215.             // Rename the temporary file to the final file; 
  1216.             // ensure it is on the same partition
  1217.             tmpMapOutput = mapOutput.file;
  1218.             filename = new Path(tmpMapOutput.getParent(), filename.getName());
  1219.             if (!localFileSys.rename(tmpMapOutput, filename)) {
  1220.               localFileSys.delete(tmpMapOutput, true);
  1221.               bytes = -1;
  1222.               throw new IOException("Failed to rename map output " + 
  1223.                   tmpMapOutput + " to " + filename);
  1224.             }
  1225.             synchronized (mapOutputFilesOnDisk) {        
  1226.               addToMapOutputFilesOnDisk(localFileSys.getFileStatus(filename));
  1227.             }
  1228.           }
  1229.           // Note that we successfully copied the map-output
  1230.           noteCopiedMapOutput(loc.getTaskId());
  1231.         }
  1232.         
  1233.         return bytes;
  1234.       }
  1235.       
  1236.       /**
  1237.        * Save the map taskid whose output we just copied.
  1238.        * This function assumes that it has been synchronized on ReduceTask.this.
  1239.        * 
  1240.        * @param taskId map taskid
  1241.        */
  1242.       private void noteCopiedMapOutput(TaskID taskId) {
  1243.         copiedMapOutputs.add(taskId);
  1244.         ramManager.setNumCopiedMapOutputs(numMaps - copiedMapOutputs.size());
  1245.       }
  1246.       /**
  1247.        * Get the map output into a local file (either in the inmemory fs or on the 
  1248.        * local fs) from the remote server.
  1249.        * We use the file system so that we generate checksum files on the data.
  1250.        * @param mapOutputLoc map-output to be fetched
  1251.        * @param filename the filename to write the data into
  1252.        * @param connectionTimeout number of milliseconds for connection timeout
  1253.        * @param readTimeout number of milliseconds for read timeout
  1254.        * @return the path of the file that got created
  1255.        * @throws IOException when something goes wrong
  1256.        */
  1257.       private MapOutput getMapOutput(MapOutputLocation mapOutputLoc, 
  1258.                                      Path filename)
  1259.       throws IOException, InterruptedException {
  1260.         // Connect
  1261.         URLConnection connection = 
  1262.           mapOutputLoc.getOutputLocation().openConnection();
  1263.         InputStream input = getInputStream(connection, STALLED_COPY_TIMEOUT,
  1264.                                            DEFAULT_READ_TIMEOUT); 
  1265.         //We will put a file in memory if it meets certain criteria:
  1266.         //1. The size of the (decompressed) file should be less than 25% of 
  1267.         //    the total inmem fs
  1268.         //2. There is space available in the inmem fs
  1269.         long decompressedLength = 
  1270.           Long.parseLong(connection.getHeaderField(RAW_MAP_OUTPUT_LENGTH));  
  1271.         long compressedLength = 
  1272.           Long.parseLong(connection.getHeaderField(MAP_OUTPUT_LENGTH));
  1273.         // Check if this map-output can be saved in-memory
  1274.         boolean shuffleInMemory = ramManager.canFitInMemory(decompressedLength); 
  1275.         // Shuffle
  1276.         MapOutput mapOutput = null;
  1277.         if (shuffleInMemory) { 
  1278.           LOG.info("Shuffling " + decompressedLength + " bytes (" + 
  1279.               compressedLength + " raw bytes) " + 
  1280.               "into RAM from " + mapOutputLoc.getTaskAttemptId());
  1281.           mapOutput = shuffleInMemory(mapOutputLoc, connection, input,
  1282.                                       (int)decompressedLength,
  1283.                                       (int)compressedLength);
  1284.         } else {
  1285.           LOG.info("Shuffling " + decompressedLength + " bytes (" + 
  1286.               compressedLength + " raw bytes) " + 
  1287.               "into Local-FS from " + mapOutputLoc.getTaskAttemptId());
  1288.           mapOutput = shuffleToDisk(mapOutputLoc, input, filename, 
  1289.               compressedLength);
  1290.         }
  1291.             
  1292.         return mapOutput;
  1293.       }
  1294.       /** 
  1295.        * The connection establishment is attempted multiple times and is given up 
  1296.        * only on the last failure. Instead of connecting with a timeout of 
  1297.        * X, we try connecting with a timeout of x < X but multiple times. 
  1298.        */
  1299.       private InputStream getInputStream(URLConnection connection, 
  1300.                                          int connectionTimeout, 
  1301.                                          int readTimeout) 
  1302.       throws IOException {
  1303.         int unit = 0;
  1304.         if (connectionTimeout < 0) {
  1305.           throw new IOException("Invalid timeout "
  1306.                                 + "[timeout = " + connectionTimeout + " ms]");
  1307.         } else if (connectionTimeout > 0) {
  1308.           unit = (UNIT_CONNECT_TIMEOUT > connectionTimeout)
  1309.                  ? connectionTimeout
  1310.                  : UNIT_CONNECT_TIMEOUT;
  1311.         }
  1312.         // set the read timeout to the total timeout
  1313.         connection.setReadTimeout(readTimeout);
  1314.         // set the connect timeout to the unit-connect-timeout
  1315.         connection.setConnectTimeout(unit);
  1316.         while (true) {
  1317.           try {
  1318.             return connection.getInputStream();
  1319.           } catch (IOException ioe) {
  1320.             // update the total remaining connect-timeout
  1321.             connectionTimeout -= unit;
  1322.             // throw an exception if we have waited for timeout amount of time
  1323.             // note that the updated value if timeout is used here
  1324.             if (connectionTimeout == 0) {
  1325.               throw ioe;
  1326.             }
  1327.             // reset the connect timeout for the last try
  1328.             if (connectionTimeout < unit) {
  1329.               unit = connectionTimeout;
  1330.               // reset the connect time out for the final connect
  1331.               connection.setConnectTimeout(unit);
  1332.             }
  1333.           }
  1334.         }
  1335.       }
  1336.       private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
  1337.                                         URLConnection connection, 
  1338.                                         InputStream input,
  1339.                                         int mapOutputLength,
  1340.                                         int compressedLength)
  1341.       throws IOException, InterruptedException {
  1342.         // Reserve ram for the map-output
  1343.         boolean createdNow = ramManager.reserve(mapOutputLength, input);
  1344.       
  1345.         // Reconnect if we need to
  1346.         if (!createdNow) {
  1347.           // Reconnect
  1348.           try {
  1349.             connection = mapOutputLoc.getOutputLocation().openConnection();
  1350.             input = getInputStream(connection, STALLED_COPY_TIMEOUT, 
  1351.                                    DEFAULT_READ_TIMEOUT);
  1352.           } catch (IOException ioe) {
  1353.             LOG.info("Failed reopen connection to fetch map-output from " + 
  1354.                      mapOutputLoc.getHost());
  1355.             
  1356.             // Inform the ram-manager
  1357.             ramManager.closeInMemoryFile(mapOutputLength);
  1358.             ramManager.unreserve(mapOutputLength);
  1359.             
  1360.             throw ioe;
  1361.           }
  1362.         }
  1363.         IFileInputStream checksumIn = 
  1364.           new IFileInputStream(input,compressedLength);
  1365.         input = checksumIn;       
  1366.       
  1367.         // Are map-outputs compressed?
  1368.         if (codec != null) {
  1369.           decompressor.reset();
  1370.           input = codec.createInputStream(input, decompressor);
  1371.         }
  1372.       
  1373.         // Copy map-output into an in-memory buffer
  1374.         byte[] shuffleData = new byte[mapOutputLength];
  1375.         MapOutput mapOutput = 
  1376.           new MapOutput(mapOutputLoc.getTaskId(), 
  1377.                         mapOutputLoc.getTaskAttemptId(), shuffleData, compressedLength);
  1378.         
  1379.         int bytesRead = 0;
  1380.         try {
  1381.           int n = input.read(shuffleData, 0, shuffleData.length);
  1382.           while (n > 0) {
  1383.             bytesRead += n;
  1384.             shuffleClientMetrics.inputBytes(n);
  1385.             // indicate we're making progress
  1386.             reporter.progress();
  1387.             n = input.read(shuffleData, bytesRead, 
  1388.                            (shuffleData.length-bytesRead));
  1389.           }
  1390.           LOG.info("Read " + bytesRead + " bytes from map-output for " +
  1391.                    mapOutputLoc.getTaskAttemptId());
  1392.           input.close();
  1393.         } catch (IOException ioe) {
  1394.           LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
  1395.                    ioe);
  1396.           // Inform the ram-manager
  1397.           ramManager.closeInMemoryFile(mapOutputLength);
  1398.           ramManager.unreserve(mapOutputLength);
  1399.           
  1400.           // Discard the map-output
  1401.           try {
  1402.             mapOutput.discard();
  1403.           } catch (IOException ignored) {
  1404.             LOG.info("Failed to discard map-output from " + 
  1405.                      mapOutputLoc.getTaskAttemptId(), ignored);
  1406.           }
  1407.           mapOutput = null;
  1408.           
  1409.           // Close the streams
  1410.           IOUtils.cleanup(LOG, input);
  1411.           // Re-throw
  1412.           throw ioe;
  1413.         }
  1414.         // Close the in-memory file
  1415.         ramManager.closeInMemoryFile(mapOutputLength);
  1416.         // Sanity check
  1417.         if (bytesRead != mapOutputLength) {
  1418.           // Inform the ram-manager
  1419.           ramManager.unreserve(mapOutputLength);
  1420.           
  1421.           // Discard the map-output
  1422.           try {
  1423.             mapOutput.discard();
  1424.           } catch (IOException ignored) {
  1425.             // IGNORED because we are cleaning up
  1426.             LOG.info("Failed to discard map-output from " + 
  1427.                      mapOutputLoc.getTaskAttemptId(), ignored);
  1428.           }
  1429.           mapOutput = null;
  1430.           throw new IOException("Incomplete map output received for " +
  1431.                                 mapOutputLoc.getTaskAttemptId() + " from " +
  1432.                                 mapOutputLoc.getOutputLocation() + " (" + 
  1433.                                 bytesRead + " instead of " + 
  1434.                                 mapOutputLength + ")"
  1435.           );
  1436.         }
  1437.         // TODO: Remove this after a 'fix' for HADOOP-3647
  1438.         if (mapOutputLength > 0) {
  1439.           DataInputBuffer dib = new DataInputBuffer();
  1440.           dib.reset(shuffleData, 0, shuffleData.length);
  1441.           LOG.info("Rec #1 from " + mapOutputLoc.getTaskAttemptId() + " -> (" + 
  1442.                    WritableUtils.readVInt(dib) + ", " + 
  1443.                    WritableUtils.readVInt(dib) + ") from " + 
  1444.                    mapOutputLoc.getHost());
  1445.         }
  1446.         
  1447.         return mapOutput;
  1448.       }
  1449.       
  1450.       private MapOutput shuffleToDisk(MapOutputLocation mapOutputLoc,
  1451.                                       InputStream input,
  1452.                                       Path filename,
  1453.                                       long mapOutputLength) 
  1454.       throws IOException {
  1455.         // Find out a suitable location for the output on local-filesystem
  1456.         Path localFilename = 
  1457.           lDirAlloc.getLocalPathForWrite(filename.toUri().getPath(), 
  1458.                                          mapOutputLength, conf);
  1459.         MapOutput mapOutput = 
  1460.           new MapOutput(mapOutputLoc.getTaskId(), mapOutputLoc.getTaskAttemptId(), 
  1461.                         conf, localFileSys.makeQualified(localFilename), 
  1462.                         mapOutputLength);
  1463.         // Copy data to local-disk
  1464.         OutputStream output = null;
  1465.         long bytesRead = 0;
  1466.         try {
  1467.           output = rfs.create(localFilename);
  1468.           
  1469.           byte[] buf = new byte[64 * 1024];
  1470.           int n = input.read(buf, 0, buf.length);
  1471.           while (n > 0) {
  1472.             bytesRead += n;
  1473.             shuffleClientMetrics.inputBytes(n);
  1474.             output.write(buf, 0, n);
  1475.             // indicate we're making progress
  1476.             reporter.progress();
  1477.             n = input.read(buf, 0, buf.length);
  1478.           }
  1479.           LOG.info("Read " + bytesRead + " bytes from map-output for " +
  1480.               mapOutputLoc.getTaskAttemptId());
  1481.           output.close();
  1482.           input.close();
  1483.         } catch (IOException ioe) {
  1484.           LOG.info("Failed to shuffle from " + mapOutputLoc.getTaskAttemptId(), 
  1485.                    ioe);
  1486.           // Discard the map-output
  1487.           try {
  1488.             mapOutput.discard();
  1489.           } catch (IOException ignored) {
  1490.             LOG.info("Failed to discard map-output from " + 
  1491.                 mapOutputLoc.getTaskAttemptId(), ignored);
  1492.           }
  1493.           mapOutput = null;
  1494.           // Close the streams
  1495.           IOUtils.cleanup(LOG, input, output);
  1496.           // Re-throw
  1497.           throw ioe;
  1498.         }
  1499.         // Sanity check
  1500.         if (bytesRead != mapOutputLength) {
  1501.           try {
  1502.             mapOutput.discard();
  1503.           } catch (Throwable th) {
  1504.             // IGNORED because we are cleaning up
  1505.             LOG.info("Failed to discard map-output from " + 
  1506.                 mapOutputLoc.getTaskAttemptId(), th);
  1507.           }
  1508.           mapOutput = null;
  1509.           throw new IOException("Incomplete map output received for " +
  1510.                                 mapOutputLoc.getTaskAttemptId() + " from " +
  1511.                                 mapOutputLoc.getOutputLocation() + " (" + 
  1512.                                 bytesRead + " instead of " + 
  1513.                                 mapOutputLength + ")"
  1514.           );
  1515.         }
  1516.         return mapOutput;
  1517.       }
  1518.       
  1519.     } // MapOutputCopier
  1520.     
  1521.     private void configureClasspath(JobConf conf)
  1522.       throws IOException {
  1523.       
  1524.       // get the task and the current classloader which will become the parent
  1525.       Task task = ReduceTask.this;
  1526.       ClassLoader parent = conf.getClassLoader();   
  1527.       
  1528.       // get the work directory which holds the elements we are dynamically
  1529.       // adding to the classpath
  1530.       File workDir = new File(task.getJobFile()).getParentFile();
  1531.       ArrayList<URL> urllist = new ArrayList<URL>();
  1532.       
  1533.       // add the jars and directories to the classpath
  1534.       String jar = conf.getJar();
  1535.       if (jar != null) {      
  1536.         File jobCacheDir = new File(new Path(jar).getParent().toString());
  1537.         File[] libs = new File(jobCacheDir, "lib").listFiles();
  1538.         if (libs != null) {
  1539.           for (int i = 0; i < libs.length; i++) {
  1540.             urllist.add(libs[i].toURL());
  1541.           }
  1542.         }
  1543.         urllist.add(new File(jobCacheDir, "classes").toURL());
  1544.         urllist.add(jobCacheDir.toURL());
  1545.         
  1546.       }
  1547.       urllist.add(workDir.toURL());
  1548.       
  1549.       // create a new classloader with the old classloader as its parent
  1550.       // then set that classloader as the one used by the current jobconf
  1551.       URL[] urls = urllist.toArray(new URL[urllist.size()]);
  1552.       URLClassLoader loader = new URLClassLoader(urls, parent);
  1553.       conf.setClassLoader(loader);
  1554.     }
  1555.     
  1556.     public ReduceCopier(TaskUmbilicalProtocol umbilical, JobConf conf,
  1557.                         TaskReporter reporter
  1558.                         )throws ClassNotFoundException, IOException {
  1559.       
  1560.       configureClasspath(conf);
  1561.       this.reporter = reporter;
  1562.       this.shuffleClientMetrics = new ShuffleClientMetrics(conf);
  1563.       this.umbilical = umbilical;      
  1564.       this.reduceTask = ReduceTask.this;
  1565.       this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
  1566.       this.copyResults = new ArrayList<CopyResult>(100);    
  1567.       this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
  1568.       this.maxInFlight = 4 * numCopiers;
  1569.       this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
  1570.       Counters.Counter combineInputCounter = 
  1571.         reporter.getCounter(Task.Counter.COMBINE_INPUT_RECORDS);
  1572.       this.combinerRunner = CombinerRunner.create(conf, getTaskID(),
  1573.                                                   combineInputCounter,
  1574.                                                   reporter, null);
  1575.       if (combinerRunner != null) {
  1576.         combineCollector = 
  1577.           new CombineOutputCollector(reduceCombineOutputCounter);
  1578.       }
  1579.       
  1580.       this.ioSortFactor = conf.getInt("io.sort.factor", 10);
  1581.       // the exponential backoff formula
  1582.       //    backoff (t) = init * base^(t-1)
  1583.       // so for max retries we get
  1584.       //    backoff(1) + .... + backoff(max_fetch_retries) ~ max
  1585.       // solving which we get
  1586.       //    max_fetch_retries ~ log((max * (base - 1) / init) + 1) / log(base)
  1587.       // for the default value of max = 300 (5min) we get max_fetch_retries = 6
  1588.       // the order is 4,8,16,32,64,128. sum of which is 252 sec = 4.2 min
  1589.       
  1590.       // optimizing for the base 2
  1591.       this.maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
  1592.              getClosestPowerOf2((this.maxBackoff * 1000 / BACKOFF_INIT) + 1));
  1593.       this.maxFailedUniqueFetches = Math.min(numMaps, 
  1594.                                              this.maxFailedUniqueFetches);
  1595.       this.maxInMemOutputs = conf.getInt("mapred.inmem.merge.threshold", 1000);
  1596.       this.maxInMemCopyPer =
  1597.         conf.getFloat("mapred.job.shuffle.merge.percent", 0.66f);
  1598.       final float maxRedPer =
  1599.         conf.getFloat("mapred.job.reduce.input.buffer.percent", 0f);
  1600.       if (maxRedPer > 1.0 || maxRedPer < 0.0) {
  1601.         throw new IOException("mapred.job.reduce.input.buffer.percent" +
  1602.                               maxRedPer);
  1603.       }
  1604.       this.maxInMemReduce = (int)Math.min(
  1605.           Runtime.getRuntime().maxMemory() * maxRedPer, Integer.MAX_VALUE);
  1606.       // Setup the RamManager
  1607.       ramManager = new ShuffleRamManager(conf);
  1608.       localFileSys = FileSystem.getLocal(conf);
  1609.       rfs = ((LocalFileSystem)localFileSys).getRaw();
  1610.       // hosts -> next contact time
  1611.       this.penaltyBox = new LinkedHashMap<String, Long>();
  1612.       
  1613.       // hostnames
  1614.       this.uniqueHosts = new HashSet<String>();
  1615.       
  1616.       // Seed the random number generator with a reasonably globally unique seed
  1617.       long randomSeed = System.nanoTime() + 
  1618.                         (long)Math.pow(this.reduceTask.getPartition(),
  1619.                                        (this.reduceTask.getPartition()%10)
  1620.                                       );
  1621.       this.random = new Random(randomSeed);
  1622.       this.maxMapRuntime = 0;
  1623.     }
  1624.     
  1625.     private boolean busyEnough(int numInFlight) {
  1626.       return numInFlight > maxInFlight;
  1627.     }
  1628.     
  1629.     
  1630.     public boolean fetchOutputs() throws IOException {
  1631.       int totalFailures = 0;
  1632.       int            numInFlight = 0, numCopied = 0;
  1633.       DecimalFormat  mbpsFormat = new DecimalFormat("0.00");
  1634.       final Progress copyPhase = 
  1635.         reduceTask.getProgress().phase();
  1636.       LocalFSMerger localFSMergerThread = null;
  1637.       InMemFSMergeThread inMemFSMergeThread = null;
  1638.       GetMapEventsThread getMapEventsThread = null;
  1639.       
  1640.       for (int i = 0; i < numMaps; i++) {
  1641.         copyPhase.addPhase();       // add sub-phase per file
  1642.       }
  1643.       
  1644.       copiers = new ArrayList<MapOutputCopier>(numCopiers);
  1645.       
  1646.       // start all the copying threads
  1647.       for (int i=0; i < numCopiers; i++) {
  1648.         MapOutputCopier copier = new MapOutputCopier(conf, reporter);
  1649.         copiers.add(copier);
  1650.         copier.start();
  1651.       }
  1652.       
  1653.       //start the on-disk-merge thread
  1654.       localFSMergerThread = new LocalFSMerger((LocalFileSystem)localFileSys);
  1655.       //start the in memory merger thread
  1656.       inMemFSMergeThread = new InMemFSMergeThread();
  1657.       localFSMergerThread.start();
  1658.       inMemFSMergeThread.start();
  1659.       
  1660.       // start the map events thread
  1661.       getMapEventsThread = new GetMapEventsThread();
  1662.       getMapEventsThread.start();
  1663.       
  1664.       // start the clock for bandwidth measurement
  1665.       long startTime = System.currentTimeMillis();
  1666.       long currentTime = startTime;
  1667.       long lastProgressTime = startTime;
  1668.       long lastOutputTime = 0;
  1669.       
  1670.         // loop until we get all required outputs
  1671.         while (copiedMapOutputs.size() < numMaps && mergeThrowable == null) {
  1672.           
  1673.           currentTime = System.currentTimeMillis();
  1674.           boolean logNow = false;
  1675.           if (currentTime - lastOutputTime > MIN_LOG_TIME) {
  1676.             lastOutputTime = currentTime;
  1677.             logNow = true;
  1678.           }
  1679.           if (logNow) {
  1680.             LOG.info(reduceTask.getTaskID() + " Need another " 
  1681.                    + (numMaps - copiedMapOutputs.size()) + " map output(s) "
  1682.                    + "where " + numInFlight + " is already in progress");
  1683.           }
  1684.           // Put the hash entries for the failed fetches.
  1685.           Iterator<MapOutputLocation> locItr = retryFetches.iterator();
  1686.           while (locItr.hasNext()) {
  1687.             MapOutputLocation loc = locItr.next(); 
  1688.             List<MapOutputLocation> locList = 
  1689.               mapLocations.get(loc.getHost());
  1690.             
  1691.             // Check if the list exists. Map output location mapping is cleared 
  1692.             // once the jobtracker restarts and is rebuilt from scratch.
  1693.             // Note that map-output-location mapping will be recreated and hence
  1694.             // we continue with the hope that we might find some locations
  1695.             // from the rebuild map.
  1696.             if (locList != null) {
  1697.               // Add to the beginning of the list so that this map is 
  1698.               //tried again before the others and we can hasten the 
  1699.               //re-execution of this map should there be a problem
  1700.               locList.add(0, loc);
  1701.             }
  1702.           }
  1703.           if (retryFetches.size() > 0) {
  1704.             LOG.info(reduceTask.getTaskID() + ": " +  
  1705.                   "Got " + retryFetches.size() +
  1706.                   " map-outputs from previous failures");
  1707.           }
  1708.           // clear the "failed" fetches hashmap
  1709.           retryFetches.clear();
  1710.           // now walk through the cache and schedule what we can
  1711.           int numScheduled = 0;
  1712.           int numDups = 0;
  1713.           
  1714.           synchronized (scheduledCopies) {
  1715.   
  1716.             // Randomize the map output locations to prevent 
  1717.             // all reduce-tasks swamping the same tasktracker
  1718.             List<String> hostList = new ArrayList<String>();
  1719.             hostList.addAll(mapLocations.keySet()); 
  1720.             
  1721.             Collections.shuffle(hostList, this.random);
  1722.               
  1723.             Iterator<String> hostsItr = hostList.iterator();
  1724.             while (hostsItr.hasNext()) {
  1725.             
  1726.               String host = hostsItr.next();
  1727.               List<MapOutputLocation> knownOutputsByLoc = 
  1728.                 mapLocations.get(host);
  1729.               // Check if the list exists. Map output location mapping is 
  1730.               // cleared once the jobtracker restarts and is rebuilt from 
  1731.               // scratch.
  1732.               // Note that map-output-location mapping will be recreated and 
  1733.               // hence we continue with the hope that we might find some 
  1734.               // locations from the rebuild map and add then for fetching.
  1735.               if (knownOutputsByLoc == null || knownOutputsByLoc.size() == 0) {
  1736.                 continue;
  1737.               }
  1738.               
  1739.               //Identify duplicate hosts here
  1740.               if (uniqueHosts.contains(host)) {
  1741.                  numDups += knownOutputsByLoc.size(); 
  1742.                  continue;
  1743.               }
  1744.               Long penaltyEnd = penaltyBox.get(host);
  1745.               boolean penalized = false;
  1746.             
  1747.               if (penaltyEnd != null) {
  1748.                 if (currentTime < penaltyEnd.longValue()) {
  1749.                   penalized = true;
  1750.                 } else {
  1751.                   penaltyBox.remove(host);
  1752.                 }
  1753.               }
  1754.               
  1755.               if (penalized)
  1756.                 continue;
  1757.               synchronized (knownOutputsByLoc) {
  1758.               
  1759.                 locItr = knownOutputsByLoc.iterator();
  1760.             
  1761.                 while (locItr.hasNext()) {
  1762.               
  1763.                   MapOutputLocation loc = locItr.next();
  1764.               
  1765.                   // Do not schedule fetches from OBSOLETE maps
  1766.                   if (obsoleteMapIds.contains(loc.getTaskAttemptId())) {
  1767.                     locItr.remove();
  1768.                     continue;
  1769.                   }
  1770.                   uniqueHosts.add(host);
  1771.                   scheduledCopies.add(loc);
  1772.                   locItr.remove();  // remove from knownOutputs
  1773.                   numInFlight++; numScheduled++;
  1774.                   break; //we have a map from this host
  1775.                 }
  1776.               }
  1777.             }
  1778.             scheduledCopies.notifyAll();
  1779.           }
  1780.           if (numScheduled > 0 || logNow) {
  1781.             LOG.info(reduceTask.getTaskID() + " Scheduled " + numScheduled +
  1782.                    " outputs (" + penaltyBox.size() +
  1783.                    " slow hosts and" + numDups + " dup hosts)");
  1784.           }
  1785.           if (penaltyBox.size() > 0 && logNow) {
  1786.             LOG.info("Penalized(slow) Hosts: ");
  1787.             for (String host : penaltyBox.keySet()) {
  1788.               LOG.info(host + " Will be considered after: " + 
  1789.                   ((penaltyBox.get(host) - currentTime)/1000) + " seconds.");
  1790.             }
  1791.           }
  1792.           // if we have no copies in flight and we can't schedule anything
  1793.           // new, just wait for a bit
  1794.           try {
  1795.             if (numInFlight == 0 && numScheduled == 0) {
  1796.               // we should indicate progress as we don't want TT to think
  1797.               // we're stuck and kill us
  1798.               reporter.progress();
  1799.               Thread.sleep(5000);
  1800.             }
  1801.           } catch (InterruptedException e) { } // IGNORE
  1802.           
  1803.           while (numInFlight > 0 && mergeThrowable == null) {
  1804.             LOG.debug(reduceTask.getTaskID() + " numInFlight = " + 
  1805.                       numInFlight);
  1806.             //the call to getCopyResult will either 
  1807.             //1) return immediately with a null or a valid CopyResult object,
  1808.             //                 or
  1809.             //2) if the numInFlight is above maxInFlight, return with a 
  1810.             //   CopyResult object after getting a notification from a 
  1811.             //   fetcher thread, 
  1812.             //So, when getCopyResult returns null, we can be sure that
  1813.             //we aren't busy enough and we should go and get more mapcompletion
  1814.             //events from the tasktracker
  1815.             CopyResult cr = getCopyResult(numInFlight);
  1816.             if (cr == null) {
  1817.               break;
  1818.             }
  1819.             
  1820.             if (cr.getSuccess()) {  // a successful copy
  1821.               numCopied++;
  1822.               lastProgressTime = System.currentTimeMillis();
  1823.               reduceShuffleBytes.increment(cr.getSize());
  1824.                 
  1825.               long secsSinceStart = 
  1826.                 (System.currentTimeMillis()-startTime)/1000+1;
  1827.               float mbs = ((float)reduceShuffleBytes.getCounter())/(1024*1024);
  1828.               float transferRate = mbs/secsSinceStart;
  1829.                 
  1830.               copyPhase.startNextPhase();
  1831.               copyPhase.setStatus("copy (" + numCopied + " of " + numMaps 
  1832.                                   + " at " +
  1833.                                   mbpsFormat.format(transferRate) +  " MB/s)");
  1834.                 
  1835.               // Note successful fetch for this mapId to invalidate
  1836.               // (possibly) old fetch-failures
  1837.               fetchFailedMaps.remove(cr.getLocation().getTaskId());
  1838.             } else if (cr.isObsolete()) {
  1839.               //ignore
  1840.               LOG.info(reduceTask.getTaskID() + 
  1841.                        " Ignoring obsolete copy result for Map Task: " + 
  1842.                        cr.getLocation().getTaskAttemptId() + " from host: " + 
  1843.                        cr.getHost());
  1844.             } else {
  1845.               retryFetches.add(cr.getLocation());
  1846.               
  1847.               // note the failed-fetch
  1848.               TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
  1849.               TaskID mapId = cr.getLocation().getTaskId();
  1850.               
  1851.               totalFailures++;
  1852.               Integer noFailedFetches = 
  1853.                 mapTaskToFailedFetchesMap.get(mapTaskId);
  1854.               noFailedFetches = 
  1855.                 (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
  1856.               mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
  1857.               LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
  1858.                        noFailedFetches + " from " + mapTaskId);
  1859.               
  1860.               // did the fetch fail too many times?
  1861.               // using a hybrid technique for notifying the jobtracker.
  1862.               //   a. the first notification is sent after max-retries 
  1863.               //   b. subsequent notifications are sent after 2 retries.   
  1864.               if ((noFailedFetches >= maxFetchRetriesPerMap) 
  1865.                   && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
  1866.                 synchronized (ReduceTask.this) {
  1867.                   taskStatus.addFetchFailedMap(mapTaskId);
  1868.                   LOG.info("Failed to fetch map-output from " + mapTaskId + 
  1869.                            " even after MAX_FETCH_RETRIES_PER_MAP retries... "
  1870.                            + " reporting to the JobTracker");
  1871.                 }
  1872.               }
  1873.               // note unique failed-fetch maps
  1874.               if (noFailedFetches == maxFetchRetriesPerMap) {
  1875.                 fetchFailedMaps.add(mapId);
  1876.                   
  1877.                 // did we have too many unique failed-fetch maps?
  1878.                 // and did we fail on too many fetch attempts?
  1879.                 // and did we progress enough
  1880.                 //     or did we wait for too long without any progress?
  1881.                
  1882.                 // check if the reducer is healthy
  1883.                 boolean reducerHealthy = 
  1884.                     (((float)totalFailures / (totalFailures + numCopied)) 
  1885.                      < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
  1886.                 
  1887.                 // check if the reducer has progressed enough
  1888.                 boolean reducerProgressedEnough = 
  1889.                     (((float)numCopied / numMaps) 
  1890.                      >= MIN_REQUIRED_PROGRESS_PERCENT);
  1891.                 
  1892.                 // check if the reducer is stalled for a long time
  1893.                 // duration for which the reducer is stalled
  1894.                 int stallDuration = 
  1895.                     (int)(System.currentTimeMillis() - lastProgressTime);
  1896.                 // duration for which the reducer ran with progress
  1897.                 int shuffleProgressDuration = 
  1898.                     (int)(lastProgressTime - startTime);
  1899.                 // min time the reducer should run without getting killed
  1900.                 int minShuffleRunDuration = 
  1901.                     (shuffleProgressDuration > maxMapRuntime) 
  1902.                     ? shuffleProgressDuration 
  1903.                     : maxMapRuntime;
  1904.                 boolean reducerStalled = 
  1905.                     (((float)stallDuration / minShuffleRunDuration) 
  1906.                      >= MAX_ALLOWED_STALL_TIME_PERCENT);
  1907.                 
  1908.                 // kill if not healthy and has insufficient progress
  1909.                 if ((fetchFailedMaps.size() >= maxFailedUniqueFetches ||
  1910.                      fetchFailedMaps.size() == (numMaps - copiedMapOutputs.size()))
  1911.                     && !reducerHealthy 
  1912.                     && (!reducerProgressedEnough || reducerStalled)) { 
  1913.                   LOG.fatal("Shuffle failed with too many fetch failures " + 
  1914.                             "and insufficient progress!" +
  1915.                             "Killing task " + getTaskID() + ".");
  1916.                   umbilical.shuffleError(getTaskID(), 
  1917.                                          "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
  1918.                                          + " bailing-out.");
  1919.                 }
  1920.               }
  1921.                 
  1922.               // back off exponentially until num_retries <= max_retries
  1923.               // back off by max_backoff/2 on subsequent failed attempts
  1924.               currentTime = System.currentTimeMillis();
  1925.               int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
  1926.                                    ? BACKOFF_INIT 
  1927.                                      * (1 << (noFailedFetches - 1)) 
  1928.                                    : (this.maxBackoff * 1000 / 2);
  1929.               penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
  1930.               LOG.warn(reduceTask.getTaskID() + " adding host " +
  1931.                        cr.getHost() + " to penalty box, next contact in " +
  1932.                        (currentBackOff/1000) + " seconds");
  1933.             }
  1934.             uniqueHosts.remove(cr.getHost());
  1935.             numInFlight--;
  1936.           }
  1937.         }
  1938.         
  1939.         // all done, inform the copiers to exit
  1940.         exitGetMapEvents= true;
  1941.         try {
  1942.           getMapEventsThread.join();
  1943.           LOG.info("getMapsEventsThread joined.");
  1944.         } catch (Throwable t) {
  1945.           LOG.info("getMapsEventsThread threw an exception: " +
  1946.               StringUtils.stringifyException(t));
  1947.         }
  1948.         synchronized (copiers) {
  1949.           synchronized (scheduledCopies) {
  1950.             for (MapOutputCopier copier : copiers) {
  1951.               copier.interrupt();
  1952.             }
  1953.             copiers.clear();
  1954.           }
  1955.         }
  1956.         
  1957.         // copiers are done, exit and notify the waiting merge threads
  1958.         synchronized (mapOutputFilesOnDisk) {
  1959.           exitLocalFSMerge = true;
  1960.           mapOutputFilesOnDisk.notify();
  1961.         }
  1962.         
  1963.         ramManager.close();
  1964.         
  1965.         //Do a merge of in-memory files (if there are any)
  1966.         if (mergeThrowable == null) {
  1967.           try {
  1968.             // Wait for the on-disk merge to complete
  1969.             localFSMergerThread.join();
  1970.             LOG.info("Interleaved on-disk merge complete: " + 
  1971.                      mapOutputFilesOnDisk.size() + " files left.");
  1972.             
  1973.             //wait for an ongoing merge (if it is in flight) to complete
  1974.             inMemFSMergeThread.join();
  1975.             LOG.info("In-memory merge complete: " + 
  1976.                      mapOutputsFilesInMemory.size() + " files left.");
  1977.             } catch (Throwable t) {
  1978.             LOG.warn(reduceTask.getTaskID() +
  1979.                      " Final merge of the inmemory files threw an exception: " + 
  1980.                      StringUtils.stringifyException(t));
  1981.             // check if the last merge generated an error
  1982.             if (mergeThrowable != null) {
  1983.               mergeThrowable = t;
  1984.             }
  1985.             return false;
  1986.           }
  1987.         }
  1988.         return mergeThrowable == null && copiedMapOutputs.size() == numMaps;
  1989.     }
  1990.     
  1991.     private long createInMemorySegments(
  1992.         List<Segment<K, V>> inMemorySegments, long leaveBytes)
  1993.         throws IOException {
  1994.       long totalSize = 0L;
  1995.       synchronized (mapOutputsFilesInMemory) {
  1996.         // fullSize could come from the RamManager, but files can be
  1997.         // closed but not yet present in mapOutputsFilesInMemory
  1998.         long fullSize = 0L;
  1999.         for (MapOutput mo : mapOutputsFilesInMemory) {
  2000.           fullSize += mo.data.length;
  2001.         }
  2002.         while(fullSize > leaveBytes) {
  2003.           MapOutput mo = mapOutputsFilesInMemory.remove(0);
  2004.           totalSize += mo.data.length;
  2005.           fullSize -= mo.data.length;
  2006.           Reader<K, V> reader = 
  2007.             new InMemoryReader<K, V>(ramManager, mo.mapAttemptId,
  2008.                                      mo.data, 0, mo.data.length);
  2009.           Segment<K, V> segment = 
  2010.             new Segment<K, V>(reader, true);
  2011.           inMemorySegments.add(segment);
  2012.         }
  2013.       }
  2014.       return totalSize;
  2015.     }
  2016.     /**
  2017.      * Create a RawKeyValueIterator from copied map outputs. All copying
  2018.      * threads have exited, so all of the map outputs are available either in
  2019.      * memory or on disk. We also know that no merges are in progress, so
  2020.      * synchronization is more lax, here.
  2021.      *
  2022.      * The iterator returned must satisfy the following constraints:
  2023.      *   1. Fewer than io.sort.factor files may be sources
  2024.      *   2. No more than maxInMemReduce bytes of map outputs may be resident
  2025.      *      in memory when the reduce begins
  2026.      *
  2027.      * If we must perform an intermediate merge to satisfy (1), then we can
  2028.      * keep the excluded outputs from (2) in memory and include them in the
  2029.      * first merge pass. If not, then said outputs must be written to disk
  2030.      * first.
  2031.      */
  2032.     @SuppressWarnings("unchecked")
  2033.     private RawKeyValueIterator createKVIterator(
  2034.         JobConf job, FileSystem fs, Reporter reporter) throws IOException {
  2035.       // merge config params
  2036.       Class<K> keyClass = (Class<K>)job.getMapOutputKeyClass();
  2037.       Class<V> valueClass = (Class<V>)job.getMapOutputValueClass();
  2038.       boolean keepInputs = job.getKeepFailedTaskFiles();
  2039.       final Path tmpDir = new Path(getTaskID().toString());
  2040.       final RawComparator<K> comparator =
  2041.         (RawComparator<K>)job.getOutputKeyComparator();
  2042.       // segments required to vacate memory
  2043.       List<Segment<K,V>> memDiskSegments = new ArrayList<Segment<K,V>>();
  2044.       long inMemToDiskBytes = 0;
  2045.       if (mapOutputsFilesInMemory.size() > 0) {
  2046.         TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
  2047.         inMemToDiskBytes = createInMemorySegments(memDiskSegments,
  2048.             maxInMemReduce);
  2049.         final int numMemDiskSegments = memDiskSegments.size();
  2050.         if (numMemDiskSegments > 0 &&
  2051.               ioSortFactor > mapOutputFilesOnDisk.size()) {
  2052.           // must spill to disk, but can't retain in-mem for intermediate merge
  2053.           final Path outputPath = mapOutputFile.getInputFileForWrite(mapId,
  2054.                             reduceTask.getTaskID(), inMemToDiskBytes);
  2055.           final RawKeyValueIterator rIter = Merger.merge(job, fs,
  2056.               keyClass, valueClass, memDiskSegments, numMemDiskSegments,
  2057.               tmpDir, comparator, reporter, spilledRecordsCounter, null);
  2058.           final Writer writer = new Writer(job, fs, outputPath,
  2059.               keyClass, valueClass, codec, null);
  2060.           try {
  2061.             Merger.writeFile(rIter, writer, reporter, job);
  2062.             addToMapOutputFilesOnDisk(fs.getFileStatus(outputPath));
  2063.           } catch (Exception e) {
  2064.             if (null != outputPath) {
  2065.               fs.delete(outputPath, true);
  2066.             }
  2067.             throw new IOException("Final merge failed", e);
  2068.           } finally {
  2069.             if (null != writer) {
  2070.               writer.close();
  2071.             }
  2072.           }
  2073.           LOG.info("Merged " + numMemDiskSegments + " segments, " +
  2074.                    inMemToDiskBytes + " bytes to disk to satisfy " +
  2075.                    "reduce memory limit");
  2076.           inMemToDiskBytes = 0;
  2077.           memDiskSegments.clear();
  2078.         } else if (inMemToDiskBytes != 0) {
  2079.           LOG.info("Keeping " + numMemDiskSegments + " segments, " +
  2080.                    inMemToDiskBytes + " bytes in memory for " +
  2081.                    "intermediate, on-disk merge");
  2082.         }
  2083.       }
  2084.       // segments on disk
  2085.       List<Segment<K,V>> diskSegments = new ArrayList<Segment<K,V>>();
  2086.       long onDiskBytes = inMemToDiskBytes;
  2087.       Path[] onDisk = getMapFiles(fs, false);
  2088.       for (Path file : onDisk) {
  2089.         onDiskBytes += fs.getFileStatus(file).getLen();
  2090.         diskSegments.add(new Segment<K, V>(job, fs, file, codec, keepInputs));
  2091.       }
  2092.       LOG.info("Merging " + onDisk.length + " files, " +
  2093.                onDiskBytes + " bytes from disk");
  2094.       Collections.sort(diskSegments, new Comparator<Segment<K,V>>() {
  2095.         public int compare(Segment<K, V> o1, Segment<K, V> o2) {
  2096.           if (o1.getLength() == o2.getLength()) {
  2097.             return 0;
  2098.           }
  2099.           return o1.getLength() < o2.getLength() ? -1 : 1;
  2100.         }
  2101.       });
  2102.       // build final list of segments from merged backed by disk + in-mem
  2103.       List<Segment<K,V>> finalSegments = new ArrayList<Segment<K,V>>();
  2104.       long inMemBytes = createInMemorySegments(finalSegments, 0);
  2105.       LOG.info("Merging " + finalSegments.size() + " segments, " +
  2106.                inMemBytes + " bytes from memory into reduce");
  2107.       if (0 != onDiskBytes) {
  2108.         final int numInMemSegments = memDiskSegments.size();
  2109.         diskSegments.addAll(0, memDiskSegments);
  2110.         memDiskSegments.clear();
  2111.         RawKeyValueIterator diskMerge = Merger.merge(
  2112.             job, fs, keyClass, valueClass, diskSegments,
  2113.             ioSortFactor, numInMemSegments, tmpDir, comparator,
  2114.             reporter, false, spilledRecordsCounter, null);
  2115.         diskSegments.clear();
  2116.         if (0 == finalSegments.size()) {
  2117.           return diskMerge;
  2118.         }
  2119.         finalSegments.add(new Segment<K,V>(
  2120.               new RawKVIteratorReader(diskMerge, onDiskBytes), true));
  2121.       }
  2122.       return Merger.merge(job, fs, keyClass, valueClass,
  2123.                    finalSegments, finalSegments.size(), tmpDir,
  2124.                    comparator, reporter, spilledRecordsCounter, null);
  2125.     }
  2126.     class RawKVIteratorReader extends IFile.Reader<K,V> {
  2127.       private final RawKeyValueIterator kvIter;
  2128.       public RawKVIteratorReader(RawKeyValueIterator kvIter, long size)
  2129.           throws IOException {
  2130.         super(null, null, size, null, spilledRecordsCounter);
  2131.         this.kvIter = kvIter;
  2132.       }
  2133.       public boolean next(DataInputBuffer key, DataInputBuffer value)
  2134.           throws IOException {
  2135.         if (kvIter.next()) {
  2136.           final DataInputBuffer kb = kvIter.getKey();
  2137.           final DataInputBuffer vb = kvIter.getValue();
  2138.           final int kp = kb.getPosition();
  2139.           final int klen = kb.getLength() - kp;
  2140.           key.reset(kb.getData(), kp, klen);
  2141.           final int vp = vb.getPosition();
  2142.           final int vlen = vb.getLength() - vp;
  2143.           value.reset(vb.getData(), vp, vlen);
  2144.           bytesRead += klen + vlen;
  2145.           return true;
  2146.         }
  2147.         return false;
  2148.       }
  2149.       public long getPosition() throws IOException {
  2150.         return bytesRead;
  2151.       }
  2152.       public void close() throws IOException {
  2153.         kvIter.close();
  2154.       }
  2155.     }
  2156.     private CopyResult getCopyResult(int numInFlight) {  
  2157.       synchronized (copyResults) {
  2158.         while (copyResults.isEmpty()) {
  2159.           try {
  2160.             //The idea is that if we have scheduled enough, we can wait until
  2161.             //we hear from one of the copiers.
  2162.             if (busyEnough(numInFlight)) {
  2163.               copyResults.wait();
  2164.             } else {
  2165.               return null;
  2166.             }
  2167.           } catch (InterruptedException e) { }
  2168.         }
  2169.         return copyResults.remove(0);
  2170.       }    
  2171.     }
  2172.     
  2173.     private void addToMapOutputFilesOnDisk(FileStatus status) {
  2174.       synchronized (mapOutputFilesOnDisk) {
  2175.         mapOutputFilesOnDisk.add(status);
  2176.         mapOutputFilesOnDisk.notify();
  2177.       }
  2178.     }
  2179.     
  2180.     
  2181.     
  2182.     /** Starts merging the local copy (on disk) of the map's output so that
  2183.      * most of the reducer's input is sorted i.e overlapping shuffle
  2184.      * and merge phases.
  2185.      */
  2186.     private class LocalFSMerger extends Thread {
  2187.       private LocalFileSystem localFileSys;
  2188.       public LocalFSMerger(LocalFileSystem fs) {
  2189.         this.localFileSys = fs;
  2190.         setName("Thread for merging on-disk files");
  2191.         setDaemon(true);
  2192.       }
  2193.       @SuppressWarnings("unchecked")
  2194.       public void run() {
  2195.         try {
  2196.           LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
  2197.           while(!exitLocalFSMerge){
  2198.             synchronized (mapOutputFilesOnDisk) {
  2199.               while (!exitLocalFSMerge &&
  2200.                   mapOutputFilesOnDisk.size() < (2 * ioSortFactor - 1)) {
  2201.                 LOG.info(reduceTask.getTaskID() + " Thread waiting: " + getName());
  2202.                 mapOutputFilesOnDisk.wait();
  2203.               }
  2204.             }
  2205.             if(exitLocalFSMerge) {//to avoid running one extra time in the end
  2206.               break;
  2207.             }
  2208.             List<Path> mapFiles = new ArrayList<Path>();
  2209.             long approxOutputSize = 0;
  2210.             int bytesPerSum = 
  2211.               reduceTask.getConf().getInt("io.bytes.per.checksum", 512);
  2212.             LOG.info(reduceTask.getTaskID() + "We have  " + 
  2213.                 mapOutputFilesOnDisk.size() + " map outputs on disk. " +
  2214.                 "Triggering merge of " + ioSortFactor + " files");
  2215.             // 1. Prepare the list of files to be merged. This list is prepared
  2216.             // using a list of map output files on disk. Currently we merge
  2217.             // io.sort.factor files into 1.
  2218.             synchronized (mapOutputFilesOnDisk) {
  2219.               for (int i = 0; i < ioSortFactor; ++i) {
  2220.                 FileStatus filestatus = mapOutputFilesOnDisk.first();
  2221.                 mapOutputFilesOnDisk.remove(filestatus);
  2222.                 mapFiles.add(filestatus.getPath());
  2223.                 approxOutputSize += filestatus.getLen();
  2224.               }
  2225.             }
  2226.             
  2227.             // sanity check
  2228.             if (mapFiles.size() == 0) {
  2229.                 return;
  2230.             }
  2231.             
  2232.             // add the checksum length
  2233.             approxOutputSize += ChecksumFileSystem
  2234.                                 .getChecksumLength(approxOutputSize,
  2235.                                                    bytesPerSum);
  2236.   
  2237.             // 2. Start the on-disk merge process
  2238.             Path outputPath = 
  2239.               lDirAlloc.getLocalPathForWrite(mapFiles.get(0).toString(), 
  2240.                                              approxOutputSize, conf)
  2241.               .suffix(".merged");
  2242.             Writer writer = 
  2243.               new Writer(conf,rfs, outputPath, 
  2244.                          conf.getMapOutputKeyClass(), 
  2245.                          conf.getMapOutputValueClass(),
  2246.                          codec, null);
  2247.             RawKeyValueIterator iter  = null;
  2248.             Path tmpDir = new Path(reduceTask.getTaskID().toString());
  2249.             try {
  2250.               iter = Merger.merge(conf, rfs,
  2251.                                   conf.getMapOutputKeyClass(),
  2252.                                   conf.getMapOutputValueClass(),
  2253.                                   codec, mapFiles.toArray(new Path[mapFiles.size()]), 
  2254.                                   true, ioSortFactor, tmpDir, 
  2255.                                   conf.getOutputKeyComparator(), reporter,
  2256.                                   spilledRecordsCounter, null);
  2257.               
  2258.               Merger.writeFile(iter, writer, reporter, conf);
  2259.               writer.close();
  2260.             } catch (Exception e) {
  2261.               localFileSys.delete(outputPath, true);
  2262.               throw new IOException (StringUtils.stringifyException(e));
  2263.             }
  2264.             
  2265.             synchronized (mapOutputFilesOnDisk) {
  2266.               addToMapOutputFilesOnDisk(localFileSys.getFileStatus(outputPath));
  2267.             }
  2268.             
  2269.             LOG.info(reduceTask.getTaskID() +
  2270.                      " Finished merging " + mapFiles.size() + 
  2271.                      " map output files on disk of total-size " + 
  2272.                      approxOutputSize + "." + 
  2273.                      " Local output file is " + outputPath + " of size " +
  2274.                      localFileSys.getFileStatus(outputPath).getLen());
  2275.             }
  2276.         } catch (Throwable t) {
  2277.           LOG.warn(reduceTask.getTaskID()
  2278.                    + " Merging of the local FS files threw an exception: "
  2279.                    + StringUtils.stringifyException(t));
  2280.           if (mergeThrowable == null) {
  2281.             mergeThrowable = t;
  2282.           }
  2283.         } 
  2284.       }
  2285.     }
  2286.     private class InMemFSMergeThread extends Thread {
  2287.       
  2288.       public InMemFSMergeThread() {
  2289.         setName("Thread for merging in memory files");
  2290.         setDaemon(true);
  2291.       }
  2292.       
  2293.       public void run() {
  2294.         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
  2295.         try {
  2296.           boolean exit = false;
  2297.           do {
  2298.             exit = ramManager.waitForDataToMerge();
  2299.             if (!exit) {
  2300.               doInMemMerge();
  2301.             }
  2302.           } while (!exit);
  2303.         } catch (Throwable t) {
  2304.           LOG.warn(reduceTask.getTaskID() +
  2305.                    " Merge of the inmemory files threw an exception: "
  2306.                    + StringUtils.stringifyException(t));
  2307.           ReduceCopier.this.mergeThrowable = t;
  2308.         }
  2309.       }
  2310.       
  2311.       @SuppressWarnings("unchecked")
  2312.       private void doInMemMerge() throws IOException{
  2313.         if (mapOutputsFilesInMemory.size() == 0) {
  2314.           return;
  2315.         }
  2316.         
  2317.         //name this output file same as the name of the first file that is 
  2318.         //there in the current list of inmem files (this is guaranteed to
  2319.         //be absent on the disk currently. So we don't overwrite a prev. 
  2320.         //created spill). Also we need to create the output file now since
  2321.         //it is not guaranteed that this file will be present after merge
  2322.         //is called (we delete empty files as soon as we see them
  2323.         //in the merge method)
  2324.         //figure out the mapId 
  2325.         TaskID mapId = mapOutputsFilesInMemory.get(0).mapId;
  2326.         List<Segment<K, V>> inMemorySegments = new ArrayList<Segment<K,V>>();
  2327.         long mergeOutputSize = createInMemorySegments(inMemorySegments, 0);
  2328.         int noInMemorySegments = inMemorySegments.size();
  2329.         Path outputPath = mapOutputFile.getInputFileForWrite(mapId, 
  2330.                           reduceTask.getTaskID(), mergeOutputSize);
  2331.         Writer writer = 
  2332.           new Writer(conf, rfs, outputPath,
  2333.                      conf.getMapOutputKeyClass(),
  2334.                      conf.getMapOutputValueClass(),
  2335.                      codec, null);
  2336.         RawKeyValueIterator rIter = null;
  2337.         try {
  2338.           LOG.info("Initiating in-memory merge with " + noInMemorySegments + 
  2339.                    " segments...");
  2340.           
  2341.           rIter = Merger.merge(conf, rfs,
  2342.                                (Class<K>)conf.getMapOutputKeyClass(),
  2343.                                (Class<V>)conf.getMapOutputValueClass(),
  2344.                                inMemorySegments, inMemorySegments.size(),
  2345.                                new Path(reduceTask.getTaskID().toString()),
  2346.                                conf.getOutputKeyComparator(), reporter,
  2347.                                spilledRecordsCounter, null);
  2348.           
  2349.           if (combinerRunner == null) {
  2350.             Merger.writeFile(rIter, writer, reporter, conf);
  2351.           } else {
  2352.             combineCollector.setWriter(writer);
  2353.             combinerRunner.combine(rIter, combineCollector);
  2354.           }
  2355.           writer.close();
  2356.           LOG.info(reduceTask.getTaskID() + 
  2357.               " Merge of the " + noInMemorySegments +
  2358.               " files in-memory complete." +
  2359.               " Local file is " + outputPath + " of size " + 
  2360.               localFileSys.getFileStatus(outputPath).getLen());
  2361.         } catch (Exception e) { 
  2362.           //make sure that we delete the ondisk file that we created 
  2363.           //earlier when we invoked cloneFileAttributes
  2364.           localFileSys.delete(outputPath, true);
  2365.           throw (IOException)new IOException
  2366.                   ("Intermediate merge failed").initCause(e);
  2367.         }
  2368.         // Note the output of the merge
  2369.         FileStatus status = localFileSys.getFileStatus(outputPath);
  2370.         synchronized (mapOutputFilesOnDisk) {
  2371.           addToMapOutputFilesOnDisk(status);
  2372.         }
  2373.       }
  2374.     }
  2375.     private class GetMapEventsThread extends Thread {
  2376.       
  2377.       private IntWritable fromEventId = new IntWritable(0);
  2378.       private static final long SLEEP_TIME = 1000;
  2379.       
  2380.       public GetMapEventsThread() {
  2381.         setName("Thread for polling Map Completion Events");
  2382.         setDaemon(true);
  2383.       }
  2384.       
  2385.       @Override
  2386.       public void run() {
  2387.       
  2388.         LOG.info(reduceTask.getTaskID() + " Thread started: " + getName());
  2389.         
  2390.         do {
  2391.           try {
  2392.             int numNewMaps = getMapCompletionEvents();
  2393.             if (numNewMaps > 0) {
  2394.               LOG.info(reduceTask.getTaskID() + ": " +  
  2395.                   "Got " + numNewMaps + " new map-outputs"); 
  2396.             }
  2397.             Thread.sleep(SLEEP_TIME);
  2398.           } 
  2399.           catch (InterruptedException e) {
  2400.             LOG.warn(reduceTask.getTaskID() +
  2401.                 " GetMapEventsThread returning after an " +
  2402.                 " interrupted exception");
  2403.             return;
  2404.           }
  2405.           catch (Throwable t) {
  2406.             LOG.warn(reduceTask.getTaskID() +
  2407.                 " GetMapEventsThread Ignoring exception : " +
  2408.                 StringUtils.stringifyException(t));
  2409.           }
  2410.         } while (!exitGetMapEvents);
  2411.         LOG.info("GetMapEventsThread exiting");
  2412.       
  2413.       }
  2414.       
  2415.       /** 
  2416.        * Queries the {@link TaskTracker} for a set of map-completion events 
  2417.        * from a given event ID.
  2418.        * @throws IOException
  2419.        */  
  2420.       private int getMapCompletionEvents() throws IOException {
  2421.         
  2422.         int numNewMaps = 0;
  2423.         
  2424.         MapTaskCompletionEventsUpdate update = 
  2425.           umbilical.getMapCompletionEvents(reduceTask.getJobID(), 
  2426.                                            fromEventId.get(), 
  2427.                                            MAX_EVENTS_TO_FETCH,
  2428.                                            reduceTask.getTaskID());
  2429.         TaskCompletionEvent events[] = update.getMapTaskCompletionEvents();
  2430.           
  2431.         // Check if the reset is required.
  2432.         // Since there is no ordering of the task completion events at the 
  2433.         // reducer, the only option to sync with the new jobtracker is to reset 
  2434.         // the events index
  2435.         if (update.shouldReset()) {
  2436.           fromEventId.set(0);
  2437.           obsoleteMapIds.clear(); // clear the obsolete map
  2438.           mapLocations.clear(); // clear the map locations mapping
  2439.         }
  2440.         
  2441.         // Update the last seen event ID
  2442.         fromEventId.set(fromEventId.get() + events.length);
  2443.         
  2444.         // Process the TaskCompletionEvents:
  2445.         // 1. Save the SUCCEEDED maps in knownOutputs to fetch the outputs.
  2446.         // 2. Save the OBSOLETE/FAILED/KILLED maps in obsoleteOutputs to stop 
  2447.         //    fetching from those maps.
  2448.         // 3. Remove TIPFAILED maps from neededOutputs since we don't need their
  2449.         //    outputs at all.
  2450.         for (TaskCompletionEvent event : events) {
  2451.           switch (event.getTaskStatus()) {
  2452.             case SUCCEEDED:
  2453.             {
  2454.               URI u = URI.create(event.getTaskTrackerHttp());
  2455.               String host = u.getHost();
  2456.               TaskAttemptID taskId = event.getTaskAttemptId();
  2457.               int duration = event.getTaskRunTime();
  2458.               if (duration > maxMapRuntime) {
  2459.                 maxMapRuntime = duration; 
  2460.                 // adjust max-fetch-retries based on max-map-run-time
  2461.                 maxFetchRetriesPerMap = Math.max(MIN_FETCH_RETRIES_PER_MAP, 
  2462.                   getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1));
  2463.               }
  2464.               URL mapOutputLocation = new URL(event.getTaskTrackerHttp() + 
  2465.                                       "/mapOutput?job=" + taskId.getJobID() +
  2466.                                       "&map=" + taskId + 
  2467.                                       "&reduce=" + getPartition());
  2468.               List<MapOutputLocation> loc = mapLocations.get(host);
  2469.               if (loc == null) {
  2470.                 loc = Collections.synchronizedList
  2471.                   (new LinkedList<MapOutputLocation>());
  2472.                 mapLocations.put(host, loc);
  2473.                }
  2474.               loc.add(new MapOutputLocation(taskId, host, mapOutputLocation));
  2475.               numNewMaps ++;
  2476.             }
  2477.             break;
  2478.             case FAILED:
  2479.             case KILLED:
  2480.             case OBSOLETE:
  2481.             {
  2482.               obsoleteMapIds.add(event.getTaskAttemptId());
  2483.               LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 
  2484.                        " map-task: '" + event.getTaskAttemptId() + "'");
  2485.             }
  2486.             break;
  2487.             case TIPFAILED:
  2488.             {
  2489.               copiedMapOutputs.add(event.getTaskAttemptId().getTaskID());
  2490.               LOG.info("Ignoring output of failed map TIP: '" +  
  2491.                    event.getTaskAttemptId() + "'");
  2492.             }
  2493.             break;
  2494.           }
  2495.         }
  2496.         return numNewMaps;
  2497.       }
  2498.     }
  2499.   }
  2500.   /**
  2501.    * Return the exponent of the power of two closest to the given
  2502.    * positive value, or zero if value leq 0.
  2503.    * This follows the observation that the msb of a given value is
  2504.    * also the closest power of two, unless the bit following it is
  2505.    * set.
  2506.    */
  2507.   private static int getClosestPowerOf2(int value) {
  2508.     if (value <= 0)
  2509.       throw new IllegalArgumentException("Undefined for " + value);
  2510.     final int hob = Integer.highestOneBit(value);
  2511.     return Integer.numberOfTrailingZeros(hob) +
  2512.       (((hob >>> 1) & value) == 0 ? 0 : 1);
  2513.   }
  2514. }