MapTask.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:53k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
  20. import static org.apache.hadoop.mapred.Task.Counter.COMBINE_OUTPUT_RECORDS;
  21. import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
  22. import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
  23. import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
  24. import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
  25. import java.io.DataInput;
  26. import java.io.DataOutput;
  27. import java.io.DataOutputStream;
  28. import java.io.IOException;
  29. import java.io.OutputStream;
  30. import java.lang.reflect.Constructor;
  31. import java.lang.reflect.InvocationTargetException;
  32. import java.util.ArrayList;
  33. import java.util.List;
  34. import java.util.concurrent.locks.Condition;
  35. import java.util.concurrent.locks.ReentrantLock;
  36. import org.apache.commons.logging.Log;
  37. import org.apache.commons.logging.LogFactory;
  38. import org.apache.hadoop.conf.Configuration;
  39. import org.apache.hadoop.fs.FSDataOutputStream;
  40. import org.apache.hadoop.fs.FileSystem;
  41. import org.apache.hadoop.fs.LocalFileSystem;
  42. import org.apache.hadoop.fs.Path;
  43. import org.apache.hadoop.io.BytesWritable;
  44. import org.apache.hadoop.io.DataInputBuffer;
  45. import org.apache.hadoop.io.RawComparator;
  46. import org.apache.hadoop.io.SequenceFile;
  47. import org.apache.hadoop.io.Text;
  48. import org.apache.hadoop.io.SequenceFile.CompressionType;
  49. import org.apache.hadoop.io.compress.CompressionCodec;
  50. import org.apache.hadoop.io.compress.DefaultCodec;
  51. import org.apache.hadoop.io.serializer.Deserializer;
  52. import org.apache.hadoop.io.serializer.SerializationFactory;
  53. import org.apache.hadoop.io.serializer.Serializer;
  54. import org.apache.hadoop.mapred.IFile.Writer;
  55. import org.apache.hadoop.mapred.Merger.Segment;
  56. import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
  57. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  58. import org.apache.hadoop.util.IndexedSortable;
  59. import org.apache.hadoop.util.IndexedSorter;
  60. import org.apache.hadoop.util.Progress;
  61. import org.apache.hadoop.util.QuickSort;
  62. import org.apache.hadoop.util.ReflectionUtils;
  63. /** A Map task. */
  64. class MapTask extends Task {
  65.   /**
  66.    * The size of each record in the index file for the map-outputs.
  67.    */
  68.   public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
  69.   
  70.   private BytesWritable split = new BytesWritable();
  71.   private String splitClass;
  72.   private final static int APPROX_HEADER_LENGTH = 150;
  73.   private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
  74.   {   // set phase for this task
  75.     setPhase(TaskStatus.Phase.MAP); 
  76.   }
  77.   public MapTask() {
  78.     super();
  79.   }
  80.   public MapTask(String jobFile, TaskAttemptID taskId, 
  81.                  int partition, String splitClass, BytesWritable split
  82.                  ) {
  83.     super(jobFile, taskId, partition);
  84.     this.splitClass = splitClass;
  85.     this.split = split;
  86.   }
  87.   @Override
  88.   public boolean isMapTask() {
  89.     return true;
  90.   }
  91.   @Override
  92.   public void localizeConfiguration(JobConf conf) throws IOException {
  93.     super.localizeConfiguration(conf);
  94.     if (isMapOrReduce()) {
  95.       Path localSplit = new Path(new Path(getJobFile()).getParent(), 
  96.                                  "split.dta");
  97.       LOG.debug("Writing local split to " + localSplit);
  98.       DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
  99.       Text.writeString(out, splitClass);
  100.       split.write(out);
  101.       out.close();
  102.     }
  103.   }
  104.   
  105.   @Override
  106.   public TaskRunner createRunner(TaskTracker tracker, 
  107.       TaskTracker.TaskInProgress tip) {
  108.     return new MapTaskRunner(tip, tracker, this.conf);
  109.   }
  110.   @Override
  111.   public void write(DataOutput out) throws IOException {
  112.     super.write(out);
  113.     if (isMapOrReduce()) {
  114.       Text.writeString(out, splitClass);
  115.       split.write(out);
  116.       split = null;
  117.     }
  118.   }
  119.   
  120.   @Override
  121.   public void readFields(DataInput in) throws IOException {
  122.     super.readFields(in);
  123.     if (isMapOrReduce()) {
  124.       splitClass = Text.readString(in);
  125.       split.readFields(in);
  126.     }
  127.   }
  128.   /**
  129.    * This class wraps the user's record reader to update the counters and progress
  130.    * as records are read.
  131.    * @param <K>
  132.    * @param <V>
  133.    */
  134.   class TrackedRecordReader<K, V> 
  135.       implements RecordReader<K,V> {
  136.     private RecordReader<K,V> rawIn;
  137.     private Counters.Counter inputByteCounter;
  138.     private Counters.Counter inputRecordCounter;
  139.     private TaskReporter reporter;
  140.     private long beforePos = -1;
  141.     private long afterPos = -1;
  142.     
  143.     TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter) 
  144.       throws IOException{
  145.       rawIn = raw;
  146.       inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
  147.       inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
  148.       this.reporter = reporter;
  149.     }
  150.     public K createKey() {
  151.       return rawIn.createKey();
  152.     }
  153.       
  154.     public V createValue() {
  155.       return rawIn.createValue();
  156.     }
  157.      
  158.     public synchronized boolean next(K key, V value)
  159.     throws IOException {
  160.       boolean ret = moveToNext(key, value);
  161.       if (ret) {
  162.         incrCounters();
  163.       }
  164.       return ret;
  165.     }
  166.     
  167.     protected void incrCounters() {
  168.       inputRecordCounter.increment(1);
  169.       inputByteCounter.increment(afterPos - beforePos);
  170.     }
  171.      
  172.     protected synchronized boolean moveToNext(K key, V value)
  173.       throws IOException {
  174.       reporter.setProgress(getProgress());
  175.       beforePos = getPos();
  176.       boolean ret = rawIn.next(key, value);
  177.       afterPos = getPos();
  178.       return ret;
  179.     }
  180.     
  181.     public long getPos() throws IOException { return rawIn.getPos(); }
  182.     public void close() throws IOException { rawIn.close(); }
  183.     public float getProgress() throws IOException {
  184.       return rawIn.getProgress();
  185.     }
  186.     TaskReporter getTaskReporter() {
  187.       return reporter;
  188.     }
  189.   }
  190.   /**
  191.    * This class skips the records based on the failed ranges from previous 
  192.    * attempts.
  193.    */
  194.   class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
  195.     private SkipRangeIterator skipIt;
  196.     private SequenceFile.Writer skipWriter;
  197.     private boolean toWriteSkipRecs;
  198.     private TaskUmbilicalProtocol umbilical;
  199.     private Counters.Counter skipRecCounter;
  200.     private long recIndex = -1;
  201.     
  202.     SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
  203.                          TaskReporter reporter) throws IOException{
  204.       super(raw, reporter);
  205.       this.umbilical = umbilical;
  206.       this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
  207.       this.toWriteSkipRecs = toWriteSkipRecs() &&  
  208.         SkipBadRecords.getSkipOutputPath(conf)!=null;
  209.       skipIt = getSkipRanges().skipRangeIterator();
  210.     }
  211.     
  212.     public synchronized boolean next(K key, V value)
  213.     throws IOException {
  214.       if(!skipIt.hasNext()) {
  215.         LOG.warn("Further records got skipped.");
  216.         return false;
  217.       }
  218.       boolean ret = moveToNext(key, value);
  219.       long nextRecIndex = skipIt.next();
  220.       long skip = 0;
  221.       while(recIndex<nextRecIndex && ret) {
  222.         if(toWriteSkipRecs) {
  223.           writeSkippedRec(key, value);
  224.         }
  225.        ret = moveToNext(key, value);
  226.         skip++;
  227.       }
  228.       //close the skip writer once all the ranges are skipped
  229.       if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
  230.         skipWriter.close();
  231.       }
  232.       skipRecCounter.increment(skip);
  233.       reportNextRecordRange(umbilical, recIndex);
  234.       if (ret) {
  235.         incrCounters();
  236.       }
  237.       return ret;
  238.     }
  239.     
  240.     protected synchronized boolean moveToNext(K key, V value)
  241.     throws IOException {
  242.     recIndex++;
  243.       return super.moveToNext(key, value);
  244.     }
  245.     
  246.     @SuppressWarnings("unchecked")
  247.     private void writeSkippedRec(K key, V value) throws IOException{
  248.       if(skipWriter==null) {
  249.         Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
  250.         Path skipFile = new Path(skipDir, getTaskID().toString());
  251.         skipWriter = 
  252.           SequenceFile.createWriter(
  253.               skipFile.getFileSystem(conf), conf, skipFile,
  254.               (Class<K>) createKey().getClass(),
  255.               (Class<V>) createValue().getClass(), 
  256.               CompressionType.BLOCK, getTaskReporter());
  257.       }
  258.       skipWriter.append(key, value);
  259.     }
  260.   }
  261.   @Override
  262.   public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
  263.     throws IOException, ClassNotFoundException, InterruptedException {
  264.     // start thread that will handle communication with parent
  265.     TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
  266.     reporter.startCommunicationThread();
  267.     boolean useNewApi = job.getUseNewMapper();
  268.     initialize(job, getJobID(), reporter, useNewApi);
  269.     // check if it is a cleanupJobTask
  270.     if (jobCleanup) {
  271.       runJobCleanupTask(umbilical, reporter);
  272.       return;
  273.     }
  274.     if (jobSetup) {
  275.       runJobSetupTask(umbilical, reporter);
  276.       return;
  277.     }
  278.     if (taskCleanup) {
  279.       runTaskCleanupTask(umbilical, reporter);
  280.       return;
  281.     }
  282.     if (useNewApi) {
  283.       runNewMapper(job, split, umbilical, reporter);
  284.     } else {
  285.       runOldMapper(job, split, umbilical, reporter);
  286.     }
  287.     done(umbilical, reporter);
  288.   }
  289.   @SuppressWarnings("unchecked")
  290.   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  291.   void runOldMapper(final JobConf job,
  292.                     final BytesWritable rawSplit,
  293.                     final TaskUmbilicalProtocol umbilical,
  294.                     TaskReporter reporter
  295.                     ) throws IOException, InterruptedException,
  296.                              ClassNotFoundException {
  297.     InputSplit inputSplit = null;
  298.     // reinstantiate the split
  299.     try {
  300.       inputSplit = (InputSplit) 
  301.         ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
  302.     } catch (ClassNotFoundException exp) {
  303.       IOException wrap = new IOException("Split class " + splitClass + 
  304.                                          " not found");
  305.       wrap.initCause(exp);
  306.       throw wrap;
  307.     }
  308.     DataInputBuffer splitBuffer = new DataInputBuffer();
  309.     splitBuffer.reset(split.getBytes(), 0, split.getLength());
  310.     inputSplit.readFields(splitBuffer);
  311.     
  312.     updateJobWithSplit(job, inputSplit);
  313.     reporter.setInputSplit(inputSplit);
  314.     RecordReader<INKEY,INVALUE> rawIn =                  // open input
  315.       job.getInputFormat().getRecordReader(inputSplit, job, reporter);
  316.     RecordReader<INKEY,INVALUE> in = isSkipping() ? 
  317.         new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
  318.         new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
  319.     job.setBoolean("mapred.skip.on", isSkipping());
  320.     int numReduceTasks = conf.getNumReduceTasks();
  321.     LOG.info("numReduceTasks: " + numReduceTasks);
  322.     MapOutputCollector collector = null;
  323.     if (numReduceTasks > 0) {
  324.       collector = new MapOutputBuffer(umbilical, job, reporter);
  325.     } else { 
  326.       collector = new DirectMapOutputCollector(umbilical, job, reporter);
  327.     }
  328.     MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
  329.       ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
  330.     try {
  331.       runner.run(in, collector, reporter);      
  332.       collector.flush();
  333.     } finally {
  334.       //close
  335.       in.close();                               // close input
  336.       collector.close();
  337.     }
  338.   }
  339.   /**
  340.    * Update the job with details about the file split
  341.    * @param job the job configuration to update
  342.    * @param inputSplit the file split
  343.    */
  344.   private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
  345.     if (inputSplit instanceof FileSplit) {
  346.       FileSplit fileSplit = (FileSplit) inputSplit;
  347.       job.set("map.input.file", fileSplit.getPath().toString());
  348.       job.setLong("map.input.start", fileSplit.getStart());
  349.       job.setLong("map.input.length", fileSplit.getLength());
  350.     }
  351.   }
  352.   static class NewTrackingRecordReader<K,V> 
  353.     extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
  354.     private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
  355.     private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
  356.     
  357.     NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
  358.                             TaskReporter reporter) {
  359.       this.real = real;
  360.       this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
  361.     }
  362.     @Override
  363.     public void close() throws IOException {
  364.       real.close();
  365.     }
  366.     @Override
  367.     public K getCurrentKey() throws IOException, InterruptedException {
  368.       return real.getCurrentKey();
  369.     }
  370.     @Override
  371.     public V getCurrentValue() throws IOException, InterruptedException {
  372.       return real.getCurrentValue();
  373.     }
  374.     @Override
  375.     public float getProgress() throws IOException, InterruptedException {
  376.       return real.getProgress();
  377.     }
  378.     @Override
  379.     public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
  380.                            org.apache.hadoop.mapreduce.TaskAttemptContext context
  381.                            ) throws IOException, InterruptedException {
  382.       real.initialize(split, context);
  383.     }
  384.     @Override
  385.     public boolean nextKeyValue() throws IOException, InterruptedException {
  386.       boolean result = real.nextKeyValue();
  387.       if (result) {
  388.         inputRecordCounter.increment(1);
  389.       }
  390.       return result;
  391.     }
  392.   }
  393.   private class NewOutputCollector<K,V>
  394.     extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
  395.     private MapOutputCollector<K,V> collector;
  396.     NewOutputCollector(JobConf job, 
  397.                        TaskUmbilicalProtocol umbilical,
  398.                        TaskReporter reporter
  399.                        ) throws IOException, ClassNotFoundException {
  400.       collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
  401.     }
  402.     @Override
  403.     public void write(K key, V value) throws IOException {
  404.       collector.collect(key, value);
  405.     }
  406.     @Override
  407.     public void close(TaskAttemptContext context
  408.                       ) throws IOException,InterruptedException {
  409.       try {
  410.         collector.flush();
  411.       } catch (ClassNotFoundException cnf) {
  412.         throw new IOException("can't find class ", cnf);
  413.       }
  414.       collector.close();
  415.     }
  416.   }
  417.   @SuppressWarnings("unchecked")
  418.   private <INKEY,INVALUE,OUTKEY,OUTVALUE>
  419.   void runNewMapper(final JobConf job,
  420.                     final BytesWritable rawSplit,
  421.                     final TaskUmbilicalProtocol umbilical,
  422.                     TaskReporter reporter
  423.                     ) throws IOException, ClassNotFoundException,
  424.                              InterruptedException {
  425.     // make a task context so we can get the classes
  426.     org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
  427.       new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
  428.     // make a mapper
  429.     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
  430.       (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
  431.         ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
  432.     // make the input format
  433.     org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
  434.       (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
  435.         ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
  436.     // rebuild the input split
  437.     org.apache.hadoop.mapreduce.InputSplit split = null;
  438.     DataInputBuffer splitBuffer = new DataInputBuffer();
  439.     splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
  440.     SerializationFactory factory = new SerializationFactory(job);
  441.     Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
  442.       deserializer = 
  443.         (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>) 
  444.         factory.getDeserializer(job.getClassByName(splitClass));
  445.     deserializer.open(splitBuffer);
  446.     split = deserializer.deserialize(null);
  447.     org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
  448.       new NewTrackingRecordReader<INKEY,INVALUE>
  449.           (inputFormat.createRecordReader(split, taskContext), reporter);
  450.     
  451.     job.setBoolean("mapred.skip.on", isSkipping());
  452.     org.apache.hadoop.mapreduce.RecordWriter output = null;
  453.     org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context 
  454.          mapperContext = null;
  455.     try {
  456.       Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
  457.         org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
  458.         (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
  459.                      Configuration.class,
  460.                      org.apache.hadoop.mapreduce.TaskAttemptID.class,
  461.                      org.apache.hadoop.mapreduce.RecordReader.class,
  462.                      org.apache.hadoop.mapreduce.RecordWriter.class,
  463.                      org.apache.hadoop.mapreduce.OutputCommitter.class,
  464.                      org.apache.hadoop.mapreduce.StatusReporter.class,
  465.                      org.apache.hadoop.mapreduce.InputSplit.class});
  466.       // get an output object
  467.       if (job.getNumReduceTasks() == 0) {
  468.         output = outputFormat.getRecordWriter(taskContext);
  469.       } else {
  470.         output = new NewOutputCollector(job, umbilical, reporter);
  471.       }
  472.       mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
  473.                                                      input, output, committer,
  474.                                                      reporter, split);
  475.       input.initialize(split, mapperContext);
  476.       mapper.run(mapperContext);
  477.       input.close();
  478.       output.close(mapperContext);
  479.     } catch (NoSuchMethodException e) {
  480.       throw new IOException("Can't find Context constructor", e);
  481.     } catch (InstantiationException e) {
  482.       throw new IOException("Can't create Context", e);
  483.     } catch (InvocationTargetException e) {
  484.       throw new IOException("Can't invoke Context constructor", e);
  485.     } catch (IllegalAccessException e) {
  486.       throw new IOException("Can't invoke Context constructor", e);
  487.     }
  488.   }
  489.   interface MapOutputCollector<K, V>
  490.     extends OutputCollector<K, V> {
  491.     public void close() throws IOException, InterruptedException;
  492.     
  493.     public void flush() throws IOException, InterruptedException, 
  494.                                ClassNotFoundException;
  495.         
  496.   }
  497.   class DirectMapOutputCollector<K, V>
  498.     implements MapOutputCollector<K, V> {
  499.  
  500.     private RecordWriter<K, V> out = null;
  501.     private TaskReporter reporter = null;
  502.     private final Counters.Counter mapOutputRecordCounter;
  503.     @SuppressWarnings("unchecked")
  504.     public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
  505.         JobConf job, TaskReporter reporter) throws IOException {
  506.       this.reporter = reporter;
  507.       String finalName = getOutputName(getPartition());
  508.       FileSystem fs = FileSystem.get(job);
  509.       out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
  510.       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  511.     }
  512.     public void close() throws IOException {
  513.       if (this.out != null) {
  514.         out.close(this.reporter);
  515.       }
  516.     }
  517.     public void flush() throws IOException, InterruptedException, 
  518.                                ClassNotFoundException {
  519.     }
  520.     public void collect(K key, V value) throws IOException {
  521.       reporter.progress();
  522.       out.write(key, value);
  523.       mapOutputRecordCounter.increment(1);
  524.     }
  525.     
  526.   }
  527.   class MapOutputBuffer<K extends Object, V extends Object> 
  528.   implements MapOutputCollector<K, V>, IndexedSortable {
  529.     private final int partitions;
  530.     private final Partitioner<K, V> partitioner;
  531.     private final JobConf job;
  532.     private final TaskReporter reporter;
  533.     private final Class<K> keyClass;
  534.     private final Class<V> valClass;
  535.     private final RawComparator<K> comparator;
  536.     private final SerializationFactory serializationFactory;
  537.     private final Serializer<K> keySerializer;
  538.     private final Serializer<V> valSerializer;
  539.     private final CombinerRunner<K,V> combinerRunner;
  540.     private final CombineOutputCollector<K, V> combineCollector;
  541.     
  542.     // Compression for map-outputs
  543.     private CompressionCodec codec = null;
  544.     // k/v accounting
  545.     private volatile int kvstart = 0;  // marks beginning of spill
  546.     private volatile int kvend = 0;    // marks beginning of collectable
  547.     private int kvindex = 0;           // marks end of collected
  548.     private final int[] kvoffsets;     // indices into kvindices
  549.     private final int[] kvindices;     // partition, k/v offsets into kvbuffer
  550.     private volatile int bufstart = 0; // marks beginning of spill
  551.     private volatile int bufend = 0;   // marks beginning of collectable
  552.     private volatile int bufvoid = 0;  // marks the point where we should stop
  553.                                        // reading at the end of the buffer
  554.     private int bufindex = 0;          // marks end of collected
  555.     private int bufmark = 0;           // marks end of record
  556.     private byte[] kvbuffer;           // main output buffer
  557.     private static final int PARTITION = 0; // partition offset in acct
  558.     private static final int KEYSTART = 1;  // key offset in acct
  559.     private static final int VALSTART = 2;  // val offset in acct
  560.     private static final int ACCTSIZE = 3;  // total #fields in acct
  561.     private static final int RECSIZE =
  562.                        (ACCTSIZE + 1) * 4;  // acct bytes per record
  563.     // spill accounting
  564.     private volatile int numSpills = 0;
  565.     private volatile Throwable sortSpillException = null;
  566.     private final int softRecordLimit;
  567.     private final int softBufferLimit;
  568.     private final int minSpillsForCombine;
  569.     private final IndexedSorter sorter;
  570.     private final ReentrantLock spillLock = new ReentrantLock();
  571.     private final Condition spillDone = spillLock.newCondition();
  572.     private final Condition spillReady = spillLock.newCondition();
  573.     private final BlockingBuffer bb = new BlockingBuffer();
  574.     private volatile boolean spillThreadRunning = false;
  575.     private final SpillThread spillThread = new SpillThread();
  576.     private final FileSystem localFs;
  577.     private final FileSystem rfs;
  578.    
  579.     private final Counters.Counter mapOutputByteCounter;
  580.     private final Counters.Counter mapOutputRecordCounter;
  581.     private final Counters.Counter combineOutputCounter;
  582.     
  583.     private ArrayList<SpillRecord> indexCacheList;
  584.     private int totalIndexCacheMemory;
  585.     private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
  586.     @SuppressWarnings("unchecked")
  587.     public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
  588.                            TaskReporter reporter
  589.                            ) throws IOException, ClassNotFoundException {
  590.       this.job = job;
  591.       this.reporter = reporter;
  592.       localFs = FileSystem.getLocal(job);
  593.       partitions = job.getNumReduceTasks();
  594.       partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
  595.        
  596.       rfs = ((LocalFileSystem)localFs).getRaw();
  597.       indexCacheList = new ArrayList<SpillRecord>();
  598.       
  599.       //sanity checks
  600.       final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
  601.       final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
  602.       final int sortmb = job.getInt("io.sort.mb", 100);
  603.       if (spillper > (float)1.0 || spillper < (float)0.0) {
  604.         throw new IOException("Invalid "io.sort.spill.percent": " + spillper);
  605.       }
  606.       if (recper > (float)1.0 || recper < (float)0.01) {
  607.         throw new IOException("Invalid "io.sort.record.percent": " + recper);
  608.       }
  609.       if ((sortmb & 0x7FF) != sortmb) {
  610.         throw new IOException("Invalid "io.sort.mb": " + sortmb);
  611.       }
  612.       sorter = ReflectionUtils.newInstance(
  613.             job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
  614.       LOG.info("io.sort.mb = " + sortmb);
  615.       // buffers and accounting
  616.       int maxMemUsage = sortmb << 20;
  617.       int recordCapacity = (int)(maxMemUsage * recper);
  618.       recordCapacity -= recordCapacity % RECSIZE;
  619.       kvbuffer = new byte[maxMemUsage - recordCapacity];
  620.       bufvoid = kvbuffer.length;
  621.       recordCapacity /= RECSIZE;
  622.       kvoffsets = new int[recordCapacity];
  623.       kvindices = new int[recordCapacity * ACCTSIZE];
  624.       softBufferLimit = (int)(kvbuffer.length * spillper);
  625.       softRecordLimit = (int)(kvoffsets.length * spillper);
  626.       LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
  627.       LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
  628.       // k/v serialization
  629.       comparator = job.getOutputKeyComparator();
  630.       keyClass = (Class<K>)job.getMapOutputKeyClass();
  631.       valClass = (Class<V>)job.getMapOutputValueClass();
  632.       serializationFactory = new SerializationFactory(job);
  633.       keySerializer = serializationFactory.getSerializer(keyClass);
  634.       keySerializer.open(bb);
  635.       valSerializer = serializationFactory.getSerializer(valClass);
  636.       valSerializer.open(bb);
  637.       // counters
  638.       mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
  639.       mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
  640.       Counters.Counter combineInputCounter = 
  641.         reporter.getCounter(COMBINE_INPUT_RECORDS);
  642.       combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
  643.       // compression
  644.       if (job.getCompressMapOutput()) {
  645.         Class<? extends CompressionCodec> codecClass =
  646.           job.getMapOutputCompressorClass(DefaultCodec.class);
  647.         codec = ReflectionUtils.newInstance(codecClass, job);
  648.       }
  649.       // combiner
  650.       combinerRunner = CombinerRunner.create(job, getTaskID(), 
  651.                                              combineInputCounter,
  652.                                              reporter, null);
  653.       if (combinerRunner != null) {
  654.         combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
  655.       } else {
  656.         combineCollector = null;
  657.       }
  658.       minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
  659.       spillThread.setDaemon(true);
  660.       spillThread.setName("SpillThread");
  661.       spillLock.lock();
  662.       try {
  663.         spillThread.start();
  664.         while (!spillThreadRunning) {
  665.           spillDone.await();
  666.         }
  667.       } catch (InterruptedException e) {
  668.         throw (IOException)new IOException("Spill thread failed to initialize"
  669.             ).initCause(sortSpillException);
  670.       } finally {
  671.         spillLock.unlock();
  672.       }
  673.       if (sortSpillException != null) {
  674.         throw (IOException)new IOException("Spill thread failed to initialize"
  675.             ).initCause(sortSpillException);
  676.       }
  677.     }
  678.     public synchronized void collect(K key, V value)
  679.         throws IOException {
  680.       reporter.progress();
  681.       if (key.getClass() != keyClass) {
  682.         throw new IOException("Type mismatch in key from map: expected "
  683.                               + keyClass.getName() + ", recieved "
  684.                               + key.getClass().getName());
  685.       }
  686.       if (value.getClass() != valClass) {
  687.         throw new IOException("Type mismatch in value from map: expected "
  688.                               + valClass.getName() + ", recieved "
  689.                               + value.getClass().getName());
  690.       }
  691.       final int kvnext = (kvindex + 1) % kvoffsets.length;
  692.       spillLock.lock();
  693.       try {
  694.         boolean kvfull;
  695.         do {
  696.           if (sortSpillException != null) {
  697.             throw (IOException)new IOException("Spill failed"
  698.                 ).initCause(sortSpillException);
  699.           }
  700.           // sufficient acct space
  701.           kvfull = kvnext == kvstart;
  702.           final boolean kvsoftlimit = ((kvnext > kvend)
  703.               ? kvnext - kvend > softRecordLimit
  704.               : kvend - kvnext <= kvoffsets.length - softRecordLimit);
  705.           if (kvstart == kvend && kvsoftlimit) {
  706.             LOG.info("Spilling map output: record full = " + kvsoftlimit);
  707.             startSpill();
  708.           }
  709.           if (kvfull) {
  710.             try {
  711.               while (kvstart != kvend) {
  712.                 reporter.progress();
  713.                 spillDone.await();
  714.               }
  715.             } catch (InterruptedException e) {
  716.               throw (IOException)new IOException(
  717.                   "Collector interrupted while waiting for the writer"
  718.                   ).initCause(e);
  719.             }
  720.           }
  721.         } while (kvfull);
  722.       } finally {
  723.         spillLock.unlock();
  724.       }
  725.       try {
  726.         // serialize key bytes into buffer
  727.         int keystart = bufindex;
  728.         keySerializer.serialize(key);
  729.         if (bufindex < keystart) {
  730.           // wrapped the key; reset required
  731.           bb.reset();
  732.           keystart = 0;
  733.         }
  734.         // serialize value bytes into buffer
  735.         final int valstart = bufindex;
  736.         valSerializer.serialize(value);
  737.         int valend = bb.markRecord();
  738.         final int partition = partitioner.getPartition(key, value, partitions);
  739.         if (partition < 0 || partition >= partitions) {
  740.           throw new IOException("Illegal partition for " + key + " (" +
  741.               partition + ")");
  742.         }
  743.         mapOutputRecordCounter.increment(1);
  744.         mapOutputByteCounter.increment(valend >= keystart
  745.             ? valend - keystart
  746.             : (bufvoid - keystart) + valend);
  747.         // update accounting info
  748.         int ind = kvindex * ACCTSIZE;
  749.         kvoffsets[kvindex] = ind;
  750.         kvindices[ind + PARTITION] = partition;
  751.         kvindices[ind + KEYSTART] = keystart;
  752.         kvindices[ind + VALSTART] = valstart;
  753.         kvindex = kvnext;
  754.       } catch (MapBufferTooSmallException e) {
  755.         LOG.info("Record too large for in-memory buffer: " + e.getMessage());
  756.         spillSingleRecord(key, value);
  757.         mapOutputRecordCounter.increment(1);
  758.         return;
  759.       }
  760.     }
  761.     /**
  762.      * Compare logical range, st i, j MOD offset capacity.
  763.      * Compare by partition, then by key.
  764.      * @see IndexedSortable#compare
  765.      */
  766.     public int compare(int i, int j) {
  767.       final int ii = kvoffsets[i % kvoffsets.length];
  768.       final int ij = kvoffsets[j % kvoffsets.length];
  769.       // sort by partition
  770.       if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
  771.         return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
  772.       }
  773.       // sort by key
  774.       return comparator.compare(kvbuffer,
  775.           kvindices[ii + KEYSTART],
  776.           kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
  777.           kvbuffer,
  778.           kvindices[ij + KEYSTART],
  779.           kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
  780.     }
  781.     /**
  782.      * Swap logical indices st i, j MOD offset capacity.
  783.      * @see IndexedSortable#swap
  784.      */
  785.     public void swap(int i, int j) {
  786.       i %= kvoffsets.length;
  787.       j %= kvoffsets.length;
  788.       int tmp = kvoffsets[i];
  789.       kvoffsets[i] = kvoffsets[j];
  790.       kvoffsets[j] = tmp;
  791.     }
  792.     /**
  793.      * Inner class managing the spill of serialized records to disk.
  794.      */
  795.     protected class BlockingBuffer extends DataOutputStream {
  796.       public BlockingBuffer() {
  797.         this(new Buffer());
  798.       }
  799.       private BlockingBuffer(OutputStream out) {
  800.         super(out);
  801.       }
  802.       /**
  803.        * Mark end of record. Note that this is required if the buffer is to
  804.        * cut the spill in the proper place.
  805.        */
  806.       public int markRecord() {
  807.         bufmark = bufindex;
  808.         return bufindex;
  809.       }
  810.       /**
  811.        * Set position from last mark to end of writable buffer, then rewrite
  812.        * the data between last mark and kvindex.
  813.        * This handles a special case where the key wraps around the buffer.
  814.        * If the key is to be passed to a RawComparator, then it must be
  815.        * contiguous in the buffer. This recopies the data in the buffer back
  816.        * into itself, but starting at the beginning of the buffer. Note that
  817.        * reset() should <b>only</b> be called immediately after detecting
  818.        * this condition. To call it at any other time is undefined and would
  819.        * likely result in data loss or corruption.
  820.        * @see #markRecord()
  821.        */
  822.       protected synchronized void reset() throws IOException {
  823.         // spillLock unnecessary; If spill wraps, then
  824.         // bufindex < bufstart < bufend so contention is impossible
  825.         // a stale value for bufstart does not affect correctness, since
  826.         // we can only get false negatives that force the more
  827.         // conservative path
  828.         int headbytelen = bufvoid - bufmark;
  829.         bufvoid = bufmark;
  830.         if (bufindex + headbytelen < bufstart) {
  831.           System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
  832.           System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
  833.           bufindex += headbytelen;
  834.         } else {
  835.           byte[] keytmp = new byte[bufindex];
  836.           System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
  837.           bufindex = 0;
  838.           out.write(kvbuffer, bufmark, headbytelen);
  839.           out.write(keytmp);
  840.         }
  841.       }
  842.     }
  843.     public class Buffer extends OutputStream {
  844.       private final byte[] scratch = new byte[1];
  845.       @Override
  846.       public synchronized void write(int v)
  847.           throws IOException {
  848.         scratch[0] = (byte)v;
  849.         write(scratch, 0, 1);
  850.       }
  851.       /**
  852.        * Attempt to write a sequence of bytes to the collection buffer.
  853.        * This method will block if the spill thread is running and it
  854.        * cannot write.
  855.        * @throws MapBufferTooSmallException if record is too large to
  856.        *    deserialize into the collection buffer.
  857.        */
  858.       @Override
  859.       public synchronized void write(byte b[], int off, int len)
  860.           throws IOException {
  861.         boolean buffull = false;
  862.         boolean wrap = false;
  863.         spillLock.lock();
  864.         try {
  865.           do {
  866.             if (sortSpillException != null) {
  867.               throw (IOException)new IOException("Spill failed"
  868.                   ).initCause(sortSpillException);
  869.             }
  870.             // sufficient buffer space?
  871.             if (bufstart <= bufend && bufend <= bufindex) {
  872.               buffull = bufindex + len > bufvoid;
  873.               wrap = (bufvoid - bufindex) + bufstart > len;
  874.             } else {
  875.               // bufindex <= bufstart <= bufend
  876.               // bufend <= bufindex <= bufstart
  877.               wrap = false;
  878.               buffull = bufindex + len > bufstart;
  879.             }
  880.             if (kvstart == kvend) {
  881.               // spill thread not running
  882.               if (kvend != kvindex) {
  883.                 // we have records we can spill
  884.                 final boolean bufsoftlimit = (bufindex > bufend)
  885.                   ? bufindex - bufend > softBufferLimit
  886.                   : bufend - bufindex < bufvoid - softBufferLimit;
  887.                 if (bufsoftlimit || (buffull && !wrap)) {
  888.                   LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
  889.                   startSpill();
  890.                 }
  891.               } else if (buffull && !wrap) {
  892.                 // We have no buffered records, and this record is too large
  893.                 // to write into kvbuffer. We must spill it directly from
  894.                 // collect
  895.                 final int size = ((bufend <= bufindex)
  896.                   ? bufindex - bufend
  897.                   : (bufvoid - bufend) + bufindex) + len;
  898.                 bufstart = bufend = bufindex = bufmark = 0;
  899.                 kvstart = kvend = kvindex = 0;
  900.                 bufvoid = kvbuffer.length;
  901.                 throw new MapBufferTooSmallException(size + " bytes");
  902.               }
  903.             }
  904.             if (buffull && !wrap) {
  905.               try {
  906.                 while (kvstart != kvend) {
  907.                   reporter.progress();
  908.                   spillDone.await();
  909.                 }
  910.               } catch (InterruptedException e) {
  911.                   throw (IOException)new IOException(
  912.                       "Buffer interrupted while waiting for the writer"
  913.                       ).initCause(e);
  914.               }
  915.             }
  916.           } while (buffull && !wrap);
  917.         } finally {
  918.           spillLock.unlock();
  919.         }
  920.         // here, we know that we have sufficient space to write
  921.         if (buffull) {
  922.           final int gaplen = bufvoid - bufindex;
  923.           System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
  924.           len -= gaplen;
  925.           off += gaplen;
  926.           bufindex = 0;
  927.         }
  928.         System.arraycopy(b, off, kvbuffer, bufindex, len);
  929.         bufindex += len;
  930.       }
  931.     }
  932.     public synchronized void flush() throws IOException, ClassNotFoundException,
  933.                                             InterruptedException {
  934.       LOG.info("Starting flush of map output");
  935.       spillLock.lock();
  936.       try {
  937.         while (kvstart != kvend) {
  938.           reporter.progress();
  939.           spillDone.await();
  940.         }
  941.         if (sortSpillException != null) {
  942.           throw (IOException)new IOException("Spill failed"
  943.               ).initCause(sortSpillException);
  944.         }
  945.         if (kvend != kvindex) {
  946.           kvend = kvindex;
  947.           bufend = bufmark;
  948.           sortAndSpill();
  949.         }
  950.       } catch (InterruptedException e) {
  951.         throw (IOException)new IOException(
  952.             "Buffer interrupted while waiting for the writer"
  953.             ).initCause(e);
  954.       } finally {
  955.         spillLock.unlock();
  956.       }
  957.       assert !spillLock.isHeldByCurrentThread();
  958.       // shut down spill thread and wait for it to exit. Since the preceding
  959.       // ensures that it is finished with its work (and sortAndSpill did not
  960.       // throw), we elect to use an interrupt instead of setting a flag.
  961.       // Spilling simultaneously from this thread while the spill thread
  962.       // finishes its work might be both a useful way to extend this and also
  963.       // sufficient motivation for the latter approach.
  964.       try {
  965.         spillThread.interrupt();
  966.         spillThread.join();
  967.       } catch (InterruptedException e) {
  968.         throw (IOException)new IOException("Spill failed"
  969.             ).initCause(e);
  970.       }
  971.       // release sort buffer before the merge
  972.       kvbuffer = null;
  973.       mergeParts();
  974.     }
  975.     public void close() { }
  976.     protected class SpillThread extends Thread {
  977.       @Override
  978.       public void run() {
  979.         spillLock.lock();
  980.         spillThreadRunning = true;
  981.         try {
  982.           while (true) {
  983.             spillDone.signal();
  984.             while (kvstart == kvend) {
  985.               spillReady.await();
  986.             }
  987.             try {
  988.               spillLock.unlock();
  989.               sortAndSpill();
  990.             } catch (Throwable e) {
  991.               sortSpillException = e;
  992.             } finally {
  993.               spillLock.lock();
  994.               if (bufend < bufindex && bufindex < bufstart) {
  995.                 bufvoid = kvbuffer.length;
  996.               }
  997.               kvstart = kvend;
  998.               bufstart = bufend;
  999.             }
  1000.           }
  1001.         } catch (InterruptedException e) {
  1002.           Thread.currentThread().interrupt();
  1003.         } finally {
  1004.           spillLock.unlock();
  1005.           spillThreadRunning = false;
  1006.         }
  1007.       }
  1008.     }
  1009.     private synchronized void startSpill() {
  1010.       LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
  1011.                "; bufvoid = " + bufvoid);
  1012.       LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
  1013.                "; length = " + kvoffsets.length);
  1014.       kvend = kvindex;
  1015.       bufend = bufmark;
  1016.       spillReady.signal();
  1017.     }
  1018.     private void sortAndSpill() throws IOException, ClassNotFoundException,
  1019.                                        InterruptedException {
  1020.       //approximate the length of the output file to be the length of the
  1021.       //buffer + header lengths for the partitions
  1022.       long size = (bufend >= bufstart
  1023.           ? bufend - bufstart
  1024.           : (bufvoid - bufend) + bufstart) +
  1025.                   partitions * APPROX_HEADER_LENGTH;
  1026.       FSDataOutputStream out = null;
  1027.       try {
  1028.         // create spill file
  1029.         final SpillRecord spillRec = new SpillRecord(partitions);
  1030.         final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  1031.             numSpills, size);
  1032.         out = rfs.create(filename);
  1033.         final int endPosition = (kvend > kvstart)
  1034.           ? kvend
  1035.           : kvoffsets.length + kvend;
  1036.         sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
  1037.         int spindex = kvstart;
  1038.         IndexRecord rec = new IndexRecord();
  1039.         InMemValBytes value = new InMemValBytes();
  1040.         for (int i = 0; i < partitions; ++i) {
  1041.           IFile.Writer<K, V> writer = null;
  1042.           try {
  1043.             long segmentStart = out.getPos();
  1044.             writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
  1045.                                       spilledRecordsCounter);
  1046.             if (combinerRunner == null) {
  1047.               // spill directly
  1048.               DataInputBuffer key = new DataInputBuffer();
  1049.               while (spindex < endPosition &&
  1050.                   kvindices[kvoffsets[spindex % kvoffsets.length]
  1051.                             + PARTITION] == i) {
  1052.                 final int kvoff = kvoffsets[spindex % kvoffsets.length];
  1053.                 getVBytesForOffset(kvoff, value);
  1054.                 key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
  1055.                           (kvindices[kvoff + VALSTART] - 
  1056.                            kvindices[kvoff + KEYSTART]));
  1057.                 writer.append(key, value);
  1058.                 ++spindex;
  1059.               }
  1060.             } else {
  1061.               int spstart = spindex;
  1062.               while (spindex < endPosition &&
  1063.                   kvindices[kvoffsets[spindex % kvoffsets.length]
  1064.                             + PARTITION] == i) {
  1065.                 ++spindex;
  1066.               }
  1067.               // Note: we would like to avoid the combiner if we've fewer
  1068.               // than some threshold of records for a partition
  1069.               if (spstart != spindex) {
  1070.                 combineCollector.setWriter(writer);
  1071.                 RawKeyValueIterator kvIter =
  1072.                   new MRResultIterator(spstart, spindex);
  1073.                 combinerRunner.combine(kvIter, combineCollector);
  1074.               }
  1075.             }
  1076.             // close the writer
  1077.             writer.close();
  1078.             // record offsets
  1079.             rec.startOffset = segmentStart;
  1080.             rec.rawLength = writer.getRawLength();
  1081.             rec.partLength = writer.getCompressedLength();
  1082.             spillRec.putIndex(rec, i);
  1083.             writer = null;
  1084.           } finally {
  1085.             if (null != writer) writer.close();
  1086.           }
  1087.         }
  1088.         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
  1089.           // create spill index file
  1090.           Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  1091.               getTaskID(), numSpills,
  1092.               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  1093.           spillRec.writeToFile(indexFilename, job);
  1094.         } else {
  1095.           indexCacheList.add(spillRec);
  1096.           totalIndexCacheMemory +=
  1097.             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  1098.         }
  1099.         LOG.info("Finished spill " + numSpills);
  1100.         ++numSpills;
  1101.       } finally {
  1102.         if (out != null) out.close();
  1103.       }
  1104.     }
  1105.     /**
  1106.      * Handles the degenerate case where serialization fails to fit in
  1107.      * the in-memory buffer, so we must spill the record from collect
  1108.      * directly to a spill file. Consider this "losing".
  1109.      */
  1110.     private void spillSingleRecord(final K key, final V value) 
  1111.         throws IOException {
  1112.       long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
  1113.       FSDataOutputStream out = null;
  1114.       final int partition = partitioner.getPartition(key, value, partitions);
  1115.       try {
  1116.         // create spill file
  1117.         final SpillRecord spillRec = new SpillRecord(partitions);
  1118.         final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
  1119.             numSpills, size);
  1120.         out = rfs.create(filename);
  1121.         
  1122.         // we don't run the combiner for a single record
  1123.         IndexRecord rec = new IndexRecord();
  1124.         for (int i = 0; i < partitions; ++i) {
  1125.           IFile.Writer<K, V> writer = null;
  1126.           try {
  1127.             long segmentStart = out.getPos();
  1128.             // Create a new codec, don't care!
  1129.             writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
  1130.                                             spilledRecordsCounter);
  1131.             if (i == partition) {
  1132.               final long recordStart = out.getPos();
  1133.               writer.append(key, value);
  1134.               // Note that our map byte count will not be accurate with
  1135.               // compression
  1136.               mapOutputByteCounter.increment(out.getPos() - recordStart);
  1137.             }
  1138.             writer.close();
  1139.             // record offsets
  1140.             rec.startOffset = segmentStart;
  1141.             rec.rawLength = writer.getRawLength();
  1142.             rec.partLength = writer.getCompressedLength();
  1143.             spillRec.putIndex(rec, i);
  1144.             writer = null;
  1145.           } catch (IOException e) {
  1146.             if (null != writer) writer.close();
  1147.             throw e;
  1148.           }
  1149.         }
  1150.         if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
  1151.           // create spill index file
  1152.           Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
  1153.               getTaskID(), numSpills,
  1154.               partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
  1155.           spillRec.writeToFile(indexFilename, job);
  1156.         } else {
  1157.           indexCacheList.add(spillRec);
  1158.           totalIndexCacheMemory +=
  1159.             spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  1160.         }
  1161.         ++numSpills;
  1162.       } finally {
  1163.         if (out != null) out.close();
  1164.       }
  1165.     }
  1166.     /**
  1167.      * Given an offset, populate vbytes with the associated set of
  1168.      * deserialized value bytes. Should only be called during a spill.
  1169.      */
  1170.     private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
  1171.       final int nextindex = (kvoff / ACCTSIZE ==
  1172.                             (kvend - 1 + kvoffsets.length) % kvoffsets.length)
  1173.         ? bufend
  1174.         : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
  1175.       int vallen = (nextindex >= kvindices[kvoff + VALSTART])
  1176.         ? nextindex - kvindices[kvoff + VALSTART]
  1177.         : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
  1178.       vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
  1179.     }
  1180.     /**
  1181.      * Inner class wrapping valuebytes, used for appendRaw.
  1182.      */
  1183.     protected class InMemValBytes extends DataInputBuffer {
  1184.       private byte[] buffer;
  1185.       private int start;
  1186.       private int length;
  1187.             
  1188.       public void reset(byte[] buffer, int start, int length) {
  1189.         this.buffer = buffer;
  1190.         this.start = start;
  1191.         this.length = length;
  1192.         
  1193.         if (start + length > bufvoid) {
  1194.           this.buffer = new byte[this.length];
  1195.           final int taillen = bufvoid - start;
  1196.           System.arraycopy(buffer, start, this.buffer, 0, taillen);
  1197.           System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
  1198.           this.start = 0;
  1199.         }
  1200.         
  1201.         super.reset(this.buffer, this.start, this.length);
  1202.       }
  1203.     }
  1204.     protected class MRResultIterator implements RawKeyValueIterator {
  1205.       private final DataInputBuffer keybuf = new DataInputBuffer();
  1206.       private final InMemValBytes vbytes = new InMemValBytes();
  1207.       private final int end;
  1208.       private int current;
  1209.       public MRResultIterator(int start, int end) {
  1210.         this.end = end;
  1211.         current = start - 1;
  1212.       }
  1213.       public boolean next() throws IOException {
  1214.         return ++current < end;
  1215.       }
  1216.       public DataInputBuffer getKey() throws IOException {
  1217.         final int kvoff = kvoffsets[current % kvoffsets.length];
  1218.         keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
  1219.                      kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
  1220.         return keybuf;
  1221.       }
  1222.       public DataInputBuffer getValue() throws IOException {
  1223.         getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
  1224.         return vbytes;
  1225.       }
  1226.       public Progress getProgress() {
  1227.         return null;
  1228.       }
  1229.       public void close() { }
  1230.     }
  1231.     private void mergeParts() throws IOException, InterruptedException, 
  1232.                                      ClassNotFoundException {
  1233.       // get the approximate size of the final output/index files
  1234.       long finalOutFileSize = 0;
  1235.       long finalIndexFileSize = 0;
  1236.       final Path[] filename = new Path[numSpills];
  1237.       final TaskAttemptID mapId = getTaskID();
  1238.       for(int i = 0; i < numSpills; i++) {
  1239.         filename[i] = mapOutputFile.getSpillFile(mapId, i);
  1240.         finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
  1241.       }
  1242.       if (numSpills == 1) { //the spill is the final output
  1243.         rfs.rename(filename[0],
  1244.             new Path(filename[0].getParent(), "file.out"));
  1245.         if (indexCacheList.size() == 0) {
  1246.           rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
  1247.               new Path(filename[0].getParent(),"file.out.index"));
  1248.         } else {
  1249.           indexCacheList.get(0).writeToFile(
  1250.                 new Path(filename[0].getParent(),"file.out.index"), job);
  1251.         }
  1252.         return;
  1253.       }
  1254.       // read in paged indices
  1255.       for (int i = indexCacheList.size(); i < numSpills; ++i) {
  1256.         Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
  1257.         indexCacheList.add(new SpillRecord(indexFileName, job));
  1258.       }
  1259.       //make correction in the length to include the sequence file header
  1260.       //lengths for each partition
  1261.       finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
  1262.       finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
  1263.       Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
  1264.                              finalOutFileSize);
  1265.       Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
  1266.                             mapId, finalIndexFileSize);
  1267.       //The output stream for the final single output file
  1268.       FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
  1269.       if (numSpills == 0) {
  1270.         //create dummy files
  1271.         IndexRecord rec = new IndexRecord();
  1272.         SpillRecord sr = new SpillRecord(partitions);
  1273.         try {
  1274.           for (int i = 0; i < partitions; i++) {
  1275.             long segmentStart = finalOut.getPos();
  1276.             Writer<K, V> writer =
  1277.               new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
  1278.             writer.close();
  1279.             rec.startOffset = segmentStart;
  1280.             rec.rawLength = writer.getRawLength();
  1281.             rec.partLength = writer.getCompressedLength();
  1282.             sr.putIndex(rec, i);
  1283.           }
  1284.           sr.writeToFile(finalIndexFile, job);
  1285.         } finally {
  1286.           finalOut.close();
  1287.         }
  1288.         return;
  1289.       }
  1290.       {
  1291.         IndexRecord rec = new IndexRecord();
  1292.         final SpillRecord spillRec = new SpillRecord(partitions);
  1293.         for (int parts = 0; parts < partitions; parts++) {
  1294.           //create the segments to be merged
  1295.           List<Segment<K,V>> segmentList =
  1296.             new ArrayList<Segment<K, V>>(numSpills);
  1297.           for(int i = 0; i < numSpills; i++) {
  1298.             IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
  1299.             Segment<K,V> s =
  1300.               new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
  1301.                                indexRecord.partLength, codec, true);
  1302.             segmentList.add(i, s);
  1303.             if (LOG.isDebugEnabled()) {
  1304.               LOG.debug("MapId=" + mapId + " Reducer=" + parts +
  1305.                   "Spill =" + i + "(" + indexRecord.startOffset + "," +
  1306.                   indexRecord.rawLength + ", " + indexRecord.partLength + ")");
  1307.             }
  1308.           }
  1309.           //merge
  1310.           @SuppressWarnings("unchecked")
  1311.           RawKeyValueIterator kvIter = Merger.merge(job, rfs,
  1312.                          keyClass, valClass,
  1313.                          segmentList, job.getInt("io.sort.factor", 100),
  1314.                          new Path(mapId.toString()),
  1315.                          job.getOutputKeyComparator(), reporter,
  1316.                          null, spilledRecordsCounter);
  1317.           //write merged output to disk
  1318.           long segmentStart = finalOut.getPos();
  1319.           Writer<K, V> writer =
  1320.               new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
  1321.                                spilledRecordsCounter);
  1322.           if (combinerRunner == null || numSpills < minSpillsForCombine) {
  1323.             Merger.writeFile(kvIter, writer, reporter, job);
  1324.           } else {
  1325.             combineCollector.setWriter(writer);
  1326.             combinerRunner.combine(kvIter, combineCollector);
  1327.           }
  1328.           //close
  1329.           writer.close();
  1330.           // record offsets
  1331.           rec.startOffset = segmentStart;
  1332.           rec.rawLength = writer.getRawLength();
  1333.           rec.partLength = writer.getCompressedLength();
  1334.           spillRec.putIndex(rec, parts);
  1335.         }
  1336.         spillRec.writeToFile(finalIndexFile, job);
  1337.         finalOut.close();
  1338.         for(int i = 0; i < numSpills; i++) {
  1339.           rfs.delete(filename[i],true);
  1340.         }
  1341.       }
  1342.     }
  1343.   } // MapOutputBuffer
  1344.   
  1345.   /**
  1346.    * Exception indicating that the allocated sort buffer is insufficient
  1347.    * to hold the current record.
  1348.    */
  1349.   @SuppressWarnings("serial")
  1350.   private static class MapBufferTooSmallException extends IOException {
  1351.     public MapBufferTooSmallException(String s) {
  1352.       super(s);
  1353.     }
  1354.   }
  1355. }