MapTask.java
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred;
- import static org.apache.hadoop.mapred.Task.Counter.COMBINE_INPUT_RECORDS;
- import static org.apache.hadoop.mapred.Task.Counter.COMBINE_OUTPUT_RECORDS;
- import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
- import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
- import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
- import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_RECORDS;
- import java.io.DataInput;
- import java.io.DataOutput;
- import java.io.DataOutputStream;
- import java.io.IOException;
- import java.io.OutputStream;
- import java.lang.reflect.Constructor;
- import java.lang.reflect.InvocationTargetException;
- import java.util.ArrayList;
- import java.util.List;
- import java.util.concurrent.locks.Condition;
- import java.util.concurrent.locks.ReentrantLock;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataOutputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.LocalFileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.BytesWritable;
- import org.apache.hadoop.io.DataInputBuffer;
- import org.apache.hadoop.io.RawComparator;
- import org.apache.hadoop.io.SequenceFile;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.SequenceFile.CompressionType;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.DefaultCodec;
- import org.apache.hadoop.io.serializer.Deserializer;
- import org.apache.hadoop.io.serializer.SerializationFactory;
- import org.apache.hadoop.io.serializer.Serializer;
- import org.apache.hadoop.mapred.IFile.Writer;
- import org.apache.hadoop.mapred.Merger.Segment;
- import org.apache.hadoop.mapred.SortedRanges.SkipRangeIterator;
- import org.apache.hadoop.mapreduce.TaskAttemptContext;
- import org.apache.hadoop.util.IndexedSortable;
- import org.apache.hadoop.util.IndexedSorter;
- import org.apache.hadoop.util.Progress;
- import org.apache.hadoop.util.QuickSort;
- import org.apache.hadoop.util.ReflectionUtils;
- /** A Map task. */
- class MapTask extends Task {
- /**
- * The size of each record in the index file for the map-outputs.
- */
- public static final int MAP_OUTPUT_INDEX_RECORD_LENGTH = 24;
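- // i.e. one IndexRecord per partition: three longs (startOffset, rawLength,
- // partLength), 3 * 8 = 24 bytes; see sortAndSpill() and mergeParts() below.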
-
- private BytesWritable split = new BytesWritable();
- private String splitClass;
- private final static int APPROX_HEADER_LENGTH = 150;
- private static final Log LOG = LogFactory.getLog(MapTask.class.getName());
- { // set phase for this task
- setPhase(TaskStatus.Phase.MAP);
- }
- public MapTask() {
- super();
- }
- public MapTask(String jobFile, TaskAttemptID taskId,
- int partition, String splitClass, BytesWritable split
- ) {
- super(jobFile, taskId, partition);
- this.splitClass = splitClass;
- this.split = split;
- }
- @Override
- public boolean isMapTask() {
- return true;
- }
- @Override
- public void localizeConfiguration(JobConf conf) throws IOException {
- super.localizeConfiguration(conf);
- if (isMapOrReduce()) {
- Path localSplit = new Path(new Path(getJobFile()).getParent(),
- "split.dta");
- LOG.debug("Writing local split to " + localSplit);
- DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
- Text.writeString(out, splitClass);
- split.write(out);
- out.close();
- }
- }
-
- @Override
- public TaskRunner createRunner(TaskTracker tracker,
- TaskTracker.TaskInProgress tip) {
- return new MapTaskRunner(tip, tracker, this.conf);
- }
- @Override
- public void write(DataOutput out) throws IOException {
- super.write(out);
- if (isMapOrReduce()) {
- Text.writeString(out, splitClass);
- split.write(out);
- split = null;
- }
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- super.readFields(in);
- if (isMapOrReduce()) {
- splitClass = Text.readString(in);
- split.readFields(in);
- }
- }
- /**
- * This class wraps the user's record reader to update the counters and progress
- * as records are read.
- * @param <K>
- * @param <V>
- */
- class TrackedRecordReader<K, V>
- implements RecordReader<K,V> {
- private RecordReader<K,V> rawIn;
- private Counters.Counter inputByteCounter;
- private Counters.Counter inputRecordCounter;
- private TaskReporter reporter;
- private long beforePos = -1;
- private long afterPos = -1;
-
- TrackedRecordReader(RecordReader<K,V> raw, TaskReporter reporter)
- throws IOException{
- rawIn = raw;
- inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
- inputByteCounter = reporter.getCounter(MAP_INPUT_BYTES);
- this.reporter = reporter;
- }
- public K createKey() {
- return rawIn.createKey();
- }
-
- public V createValue() {
- return rawIn.createValue();
- }
-
- public synchronized boolean next(K key, V value)
- throws IOException {
- boolean ret = moveToNext(key, value);
- if (ret) {
- incrCounters();
- }
- return ret;
- }
-
- protected void incrCounters() {
- inputRecordCounter.increment(1);
- inputByteCounter.increment(afterPos - beforePos);
- }
-
- protected synchronized boolean moveToNext(K key, V value)
- throws IOException {
- reporter.setProgress(getProgress());
- beforePos = getPos();
- boolean ret = rawIn.next(key, value);
- afterPos = getPos();
- return ret;
- }
-
- public long getPos() throws IOException { return rawIn.getPos(); }
- public void close() throws IOException { rawIn.close(); }
- public float getProgress() throws IOException {
- return rawIn.getProgress();
- }
- TaskReporter getTaskReporter() {
- return reporter;
- }
- }
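- // MAP_INPUT_BYTES above is the difference between getPos() before and after
- // each next() call, so it counts whatever the wrapped reader reports as its
- // position; for compressed or non byte-oriented readers this is only an
- // approximation of the input actually consumed.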
- /**
- * This class skips the records based on the failed ranges from previous
- * attempts.
- */
- class SkippingRecordReader<K, V> extends TrackedRecordReader<K,V> {
- private SkipRangeIterator skipIt;
- private SequenceFile.Writer skipWriter;
- private boolean toWriteSkipRecs;
- private TaskUmbilicalProtocol umbilical;
- private Counters.Counter skipRecCounter;
- private long recIndex = -1;
-
- SkippingRecordReader(RecordReader<K,V> raw, TaskUmbilicalProtocol umbilical,
- TaskReporter reporter) throws IOException{
- super(raw, reporter);
- this.umbilical = umbilical;
- this.skipRecCounter = reporter.getCounter(Counter.MAP_SKIPPED_RECORDS);
- this.toWriteSkipRecs = toWriteSkipRecs() &&
- SkipBadRecords.getSkipOutputPath(conf)!=null;
- skipIt = getSkipRanges().skipRangeIterator();
- }
-
- public synchronized boolean next(K key, V value)
- throws IOException {
- if(!skipIt.hasNext()) {
- LOG.warn("Further records got skipped.");
- return false;
- }
- boolean ret = moveToNext(key, value);
- long nextRecIndex = skipIt.next();
- long skip = 0;
- while(recIndex<nextRecIndex && ret) {
- if(toWriteSkipRecs) {
- writeSkippedRec(key, value);
- }
- ret = moveToNext(key, value);
- skip++;
- }
- //close the skip writer once all the ranges are skipped
- if(skip>0 && skipIt.skippedAllRanges() && skipWriter!=null) {
- skipWriter.close();
- }
- skipRecCounter.increment(skip);
- reportNextRecordRange(umbilical, recIndex);
- if (ret) {
- incrCounters();
- }
- return ret;
- }
-
- protected synchronized boolean moveToNext(K key, V value)
- throws IOException {
- recIndex++;
- return super.moveToNext(key, value);
- }
-
- @SuppressWarnings("unchecked")
- private void writeSkippedRec(K key, V value) throws IOException{
- if(skipWriter==null) {
- Path skipDir = SkipBadRecords.getSkipOutputPath(conf);
- Path skipFile = new Path(skipDir, getTaskID().toString());
- skipWriter =
- SequenceFile.createWriter(
- skipFile.getFileSystem(conf), conf, skipFile,
- (Class<K>) createKey().getClass(),
- (Class<V>) createValue().getClass(),
- CompressionType.BLOCK, getTaskReporter());
- }
- skipWriter.append(key, value);
- }
- }
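- // How the skipping works: skipIt iterates over the record indices that lie
- // outside the failed ranges reported by previous attempts. Records inside a
- // failed range are read and discarded (optionally appended to a SequenceFile
- // under the skip output path) and counted as MAP_SKIPPED_RECORDS, while
- // reportNextRecordRange() tells the framework which record is currently
- // being processed so a further failure can be narrowed to a range.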
- @Override
- public void run(final JobConf job, final TaskUmbilicalProtocol umbilical)
- throws IOException, ClassNotFoundException, InterruptedException {
- // start thread that will handle communication with parent
- TaskReporter reporter = new TaskReporter(getProgress(), umbilical);
- reporter.startCommunicationThread();
- boolean useNewApi = job.getUseNewMapper();
- initialize(job, getJobID(), reporter, useNewApi);
- // check if it is a cleanupJobTask
- if (jobCleanup) {
- runJobCleanupTask(umbilical, reporter);
- return;
- }
- if (jobSetup) {
- runJobSetupTask(umbilical, reporter);
- return;
- }
- if (taskCleanup) {
- runTaskCleanupTask(umbilical, reporter);
- return;
- }
- if (useNewApi) {
- runNewMapper(job, split, umbilical, reporter);
- } else {
- runOldMapper(job, split, umbilical, reporter);
- }
- done(umbilical, reporter);
- }
- @SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runOldMapper(final JobConf job,
- final BytesWritable rawSplit,
- final TaskUmbilicalProtocol umbilical,
- TaskReporter reporter
- ) throws IOException, InterruptedException,
- ClassNotFoundException {
- InputSplit inputSplit = null;
- // reinstantiate the split
- try {
- inputSplit = (InputSplit)
- ReflectionUtils.newInstance(job.getClassByName(splitClass), job);
- } catch (ClassNotFoundException exp) {
- IOException wrap = new IOException("Split class " + splitClass +
- " not found");
- wrap.initCause(exp);
- throw wrap;
- }
- DataInputBuffer splitBuffer = new DataInputBuffer();
- splitBuffer.reset(split.getBytes(), 0, split.getLength());
- inputSplit.readFields(splitBuffer);
-
- updateJobWithSplit(job, inputSplit);
- reporter.setInputSplit(inputSplit);
- RecordReader<INKEY,INVALUE> rawIn = // open input
- job.getInputFormat().getRecordReader(inputSplit, job, reporter);
- RecordReader<INKEY,INVALUE> in = isSkipping() ?
- new SkippingRecordReader<INKEY,INVALUE>(rawIn, umbilical, reporter) :
- new TrackedRecordReader<INKEY,INVALUE>(rawIn, reporter);
- job.setBoolean("mapred.skip.on", isSkipping());
- int numReduceTasks = conf.getNumReduceTasks();
- LOG.info("numReduceTasks: " + numReduceTasks);
- MapOutputCollector collector = null;
- if (numReduceTasks > 0) {
- collector = new MapOutputBuffer(umbilical, job, reporter);
- } else {
- collector = new DirectMapOutputCollector(umbilical, job, reporter);
- }
- MapRunnable<INKEY,INVALUE,OUTKEY,OUTVALUE> runner =
- ReflectionUtils.newInstance(job.getMapRunnerClass(), job);
- try {
- runner.run(in, collector, reporter);
- collector.flush();
- } finally {
- //close
- in.close(); // close input
- collector.close();
- }
- }
- /**
- * Update the job with details about the file split
- * @param job the job configuration to update
- * @param inputSplit the file split
- */
- private void updateJobWithSplit(final JobConf job, InputSplit inputSplit) {
- if (inputSplit instanceof FileSplit) {
- FileSplit fileSplit = (FileSplit) inputSplit;
- job.set("map.input.file", fileSplit.getPath().toString());
- job.setLong("map.input.start", fileSplit.getStart());
- job.setLong("map.input.length", fileSplit.getLength());
- }
- }
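- // These keys are visible to old-API mappers through the JobConf they get in
- // configure(); for example, job.get("map.input.file") returns the path of
- // the file backing this task's split.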
- static class NewTrackingRecordReader<K,V>
- extends org.apache.hadoop.mapreduce.RecordReader<K,V> {
- private final org.apache.hadoop.mapreduce.RecordReader<K,V> real;
- private final org.apache.hadoop.mapreduce.Counter inputRecordCounter;
-
- NewTrackingRecordReader(org.apache.hadoop.mapreduce.RecordReader<K,V> real,
- TaskReporter reporter) {
- this.real = real;
- this.inputRecordCounter = reporter.getCounter(MAP_INPUT_RECORDS);
- }
- @Override
- public void close() throws IOException {
- real.close();
- }
- @Override
- public K getCurrentKey() throws IOException, InterruptedException {
- return real.getCurrentKey();
- }
- @Override
- public V getCurrentValue() throws IOException, InterruptedException {
- return real.getCurrentValue();
- }
- @Override
- public float getProgress() throws IOException, InterruptedException {
- return real.getProgress();
- }
- @Override
- public void initialize(org.apache.hadoop.mapreduce.InputSplit split,
- org.apache.hadoop.mapreduce.TaskAttemptContext context
- ) throws IOException, InterruptedException {
- real.initialize(split, context);
- }
- @Override
- public boolean nextKeyValue() throws IOException, InterruptedException {
- boolean result = real.nextKeyValue();
- if (result) {
- inputRecordCounter.increment(1);
- }
- return result;
- }
- }
- private class NewOutputCollector<K,V>
- extends org.apache.hadoop.mapreduce.RecordWriter<K,V> {
- private MapOutputCollector<K,V> collector;
- NewOutputCollector(JobConf job,
- TaskUmbilicalProtocol umbilical,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException {
- collector = new MapOutputBuffer<K,V>(umbilical, job, reporter);
- }
- @Override
- public void write(K key, V value) throws IOException {
- collector.collect(key, value);
- }
- @Override
- public void close(TaskAttemptContext context
- ) throws IOException,InterruptedException {
- try {
- collector.flush();
- } catch (ClassNotFoundException cnf) {
- throw new IOException("can't find class ", cnf);
- }
- collector.close();
- }
- }
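- // When the job has no reducers, runNewMapper() below writes map output
- // straight through the job's OutputFormat; otherwise NewOutputCollector
- // routes every write(key, value) into a MapOutputBuffer, which partitions,
- // sorts and spills the records for the shuffle.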
- @SuppressWarnings("unchecked")
- private <INKEY,INVALUE,OUTKEY,OUTVALUE>
- void runNewMapper(final JobConf job,
- final BytesWritable rawSplit,
- final TaskUmbilicalProtocol umbilical,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException,
- InterruptedException {
- // make a task context so we can get the classes
- org.apache.hadoop.mapreduce.TaskAttemptContext taskContext =
- new org.apache.hadoop.mapreduce.TaskAttemptContext(job, getTaskID());
- // make a mapper
- org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE> mapper =
- (org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>)
- ReflectionUtils.newInstance(taskContext.getMapperClass(), job);
- // make the input format
- org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE> inputFormat =
- (org.apache.hadoop.mapreduce.InputFormat<INKEY,INVALUE>)
- ReflectionUtils.newInstance(taskContext.getInputFormatClass(), job);
- // rebuild the input split
- org.apache.hadoop.mapreduce.InputSplit split = null;
- DataInputBuffer splitBuffer = new DataInputBuffer();
- splitBuffer.reset(rawSplit.getBytes(), 0, rawSplit.getLength());
- SerializationFactory factory = new SerializationFactory(job);
- Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>
- deserializer =
- (Deserializer<? extends org.apache.hadoop.mapreduce.InputSplit>)
- factory.getDeserializer(job.getClassByName(splitClass));
- deserializer.open(splitBuffer);
- split = deserializer.deserialize(null);
- org.apache.hadoop.mapreduce.RecordReader<INKEY,INVALUE> input =
- new NewTrackingRecordReader<INKEY,INVALUE>
- (inputFormat.createRecordReader(split, taskContext), reporter);
-
- job.setBoolean("mapred.skip.on", isSkipping());
- org.apache.hadoop.mapreduce.RecordWriter output = null;
- org.apache.hadoop.mapreduce.Mapper<INKEY,INVALUE,OUTKEY,OUTVALUE>.Context
- mapperContext = null;
- try {
- Constructor<org.apache.hadoop.mapreduce.Mapper.Context> contextConstructor =
- org.apache.hadoop.mapreduce.Mapper.Context.class.getConstructor
- (new Class[]{org.apache.hadoop.mapreduce.Mapper.class,
- Configuration.class,
- org.apache.hadoop.mapreduce.TaskAttemptID.class,
- org.apache.hadoop.mapreduce.RecordReader.class,
- org.apache.hadoop.mapreduce.RecordWriter.class,
- org.apache.hadoop.mapreduce.OutputCommitter.class,
- org.apache.hadoop.mapreduce.StatusReporter.class,
- org.apache.hadoop.mapreduce.InputSplit.class});
- // get an output object
- if (job.getNumReduceTasks() == 0) {
- output = outputFormat.getRecordWriter(taskContext);
- } else {
- output = new NewOutputCollector(job, umbilical, reporter);
- }
- mapperContext = contextConstructor.newInstance(mapper, job, getTaskID(),
- input, output, committer,
- reporter, split);
- input.initialize(split, mapperContext);
- mapper.run(mapperContext);
- input.close();
- output.close(mapperContext);
- } catch (NoSuchMethodException e) {
- throw new IOException("Can't find Context constructor", e);
- } catch (InstantiationException e) {
- throw new IOException("Can't create Context", e);
- } catch (InvocationTargetException e) {
- throw new IOException("Can't invoke Context constructor", e);
- } catch (IllegalAccessException e) {
- throw new IOException("Can't invoke Context constructor", e);
- }
- }
- interface MapOutputCollector<K, V>
- extends OutputCollector<K, V> {
- public void close() throws IOException, InterruptedException;
-
- public void flush() throws IOException, InterruptedException,
- ClassNotFoundException;
-
- }
- class DirectMapOutputCollector<K, V>
- implements MapOutputCollector<K, V> {
-
- private RecordWriter<K, V> out = null;
- private TaskReporter reporter = null;
- private final Counters.Counter mapOutputRecordCounter;
- @SuppressWarnings("unchecked")
- public DirectMapOutputCollector(TaskUmbilicalProtocol umbilical,
- JobConf job, TaskReporter reporter) throws IOException {
- this.reporter = reporter;
- String finalName = getOutputName(getPartition());
- FileSystem fs = FileSystem.get(job);
- out = job.getOutputFormat().getRecordWriter(fs, job, finalName, reporter);
- mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
- }
- public void close() throws IOException {
- if (this.out != null) {
- out.close(this.reporter);
- }
- }
- public void flush() throws IOException, InterruptedException,
- ClassNotFoundException {
- }
- public void collect(K key, V value) throws IOException {
- reporter.progress();
- out.write(key, value);
- mapOutputRecordCounter.increment(1);
- }
-
- }
- class MapOutputBuffer<K extends Object, V extends Object>
- implements MapOutputCollector<K, V>, IndexedSortable {
- private final int partitions;
- private final Partitioner<K, V> partitioner;
- private final JobConf job;
- private final TaskReporter reporter;
- private final Class<K> keyClass;
- private final Class<V> valClass;
- private final RawComparator<K> comparator;
- private final SerializationFactory serializationFactory;
- private final Serializer<K> keySerializer;
- private final Serializer<V> valSerializer;
- private final CombinerRunner<K,V> combinerRunner;
- private final CombineOutputCollector<K, V> combineCollector;
-
- // Compression for map-outputs
- private CompressionCodec codec = null;
- // k/v accounting
- private volatile int kvstart = 0; // marks beginning of spill
- private volatile int kvend = 0; // marks beginning of collectable
- private int kvindex = 0; // marks end of collected
- private final int[] kvoffsets; // indices into kvindices
- private final int[] kvindices; // partition, k/v offsets into kvbuffer
- private volatile int bufstart = 0; // marks beginning of spill
- private volatile int bufend = 0; // marks beginning of collectable
- private volatile int bufvoid = 0; // marks the point where we should stop
- // reading at the end of the buffer
- private int bufindex = 0; // marks end of collected
- private int bufmark = 0; // marks end of record
- private byte[] kvbuffer; // main output buffer
- private static final int PARTITION = 0; // partition offset in acct
- private static final int KEYSTART = 1; // key offset in acct
- private static final int VALSTART = 2; // val offset in acct
- private static final int ACCTSIZE = 3; // total #fields in acct
- private static final int RECSIZE =
- (ACCTSIZE + 1) * 4; // acct bytes per record
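- // Rough picture of the accounting used throughout this class:
- //   kvoffsets[i]               -> offset into kvindices for the i-th record
- //   kvindices[off + PARTITION] -> partition number of that record
- //   kvindices[off + KEYSTART]  -> start of its key bytes in kvbuffer
- //   kvindices[off + VALSTART]  -> start of its value bytes in kvbuffer
- // kvstart/kvend bracket the records handed to the spill thread and kvindex
- // is where collect() adds the next one; bufstart/bufend/bufindex play the
- // same roles for the raw bytes in kvbuffer, bufmark is the end of the last
- // complete record, and bufvoid marks where valid data stops when a record
- // wraps around the end of the buffer.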
- // spill accounting
- private volatile int numSpills = 0;
- private volatile Throwable sortSpillException = null;
- private final int softRecordLimit;
- private final int softBufferLimit;
- private final int minSpillsForCombine;
- private final IndexedSorter sorter;
- private final ReentrantLock spillLock = new ReentrantLock();
- private final Condition spillDone = spillLock.newCondition();
- private final Condition spillReady = spillLock.newCondition();
- private final BlockingBuffer bb = new BlockingBuffer();
- private volatile boolean spillThreadRunning = false;
- private final SpillThread spillThread = new SpillThread();
- private final FileSystem localFs;
- private final FileSystem rfs;
-
- private final Counters.Counter mapOutputByteCounter;
- private final Counters.Counter mapOutputRecordCounter;
- private final Counters.Counter combineOutputCounter;
-
- private ArrayList<SpillRecord> indexCacheList;
- private int totalIndexCacheMemory;
- private static final int INDEX_CACHE_MEMORY_LIMIT = 1024 * 1024;
- @SuppressWarnings("unchecked")
- public MapOutputBuffer(TaskUmbilicalProtocol umbilical, JobConf job,
- TaskReporter reporter
- ) throws IOException, ClassNotFoundException {
- this.job = job;
- this.reporter = reporter;
- localFs = FileSystem.getLocal(job);
- partitions = job.getNumReduceTasks();
- partitioner = ReflectionUtils.newInstance(job.getPartitionerClass(), job);
-
- rfs = ((LocalFileSystem)localFs).getRaw();
- indexCacheList = new ArrayList<SpillRecord>();
-
- //sanity checks
- final float spillper = job.getFloat("io.sort.spill.percent",(float)0.8);
- final float recper = job.getFloat("io.sort.record.percent",(float)0.05);
- final int sortmb = job.getInt("io.sort.mb", 100);
- if (spillper > (float)1.0 || spillper < (float)0.0) {
- throw new IOException("Invalid "io.sort.spill.percent": " + spillper);
- }
- if (recper > (float)1.0 || recper < (float)0.01) {
- throw new IOException("Invalid "io.sort.record.percent": " + recper);
- }
- if ((sortmb & 0x7FF) != sortmb) {
- throw new IOException("Invalid "io.sort.mb": " + sortmb);
- }
- sorter = ReflectionUtils.newInstance(
- job.getClass("map.sort.class", QuickSort.class, IndexedSorter.class), job);
- LOG.info("io.sort.mb = " + sortmb);
- // buffers and accounting
- int maxMemUsage = sortmb << 20;
- int recordCapacity = (int)(maxMemUsage * recper);
- recordCapacity -= recordCapacity % RECSIZE;
- kvbuffer = new byte[maxMemUsage - recordCapacity];
- bufvoid = kvbuffer.length;
- recordCapacity /= RECSIZE;
- kvoffsets = new int[recordCapacity];
- kvindices = new int[recordCapacity * ACCTSIZE];
- softBufferLimit = (int)(kvbuffer.length * spillper);
- softRecordLimit = (int)(kvoffsets.length * spillper);
- LOG.info("data buffer = " + softBufferLimit + "/" + kvbuffer.length);
- LOG.info("record buffer = " + softRecordLimit + "/" + kvoffsets.length);
- // k/v serialization
- comparator = job.getOutputKeyComparator();
- keyClass = (Class<K>)job.getMapOutputKeyClass();
- valClass = (Class<V>)job.getMapOutputValueClass();
- serializationFactory = new SerializationFactory(job);
- keySerializer = serializationFactory.getSerializer(keyClass);
- keySerializer.open(bb);
- valSerializer = serializationFactory.getSerializer(valClass);
- valSerializer.open(bb);
- // counters
- mapOutputByteCounter = reporter.getCounter(MAP_OUTPUT_BYTES);
- mapOutputRecordCounter = reporter.getCounter(MAP_OUTPUT_RECORDS);
- Counters.Counter combineInputCounter =
- reporter.getCounter(COMBINE_INPUT_RECORDS);
- combineOutputCounter = reporter.getCounter(COMBINE_OUTPUT_RECORDS);
- // compression
- if (job.getCompressMapOutput()) {
- Class<? extends CompressionCodec> codecClass =
- job.getMapOutputCompressorClass(DefaultCodec.class);
- codec = ReflectionUtils.newInstance(codecClass, job);
- }
- // combiner
- combinerRunner = CombinerRunner.create(job, getTaskID(),
- combineInputCounter,
- reporter, null);
- if (combinerRunner != null) {
- combineCollector= new CombineOutputCollector<K,V>(combineOutputCounter);
- } else {
- combineCollector = null;
- }
- minSpillsForCombine = job.getInt("min.num.spills.for.combine", 3);
- spillThread.setDaemon(true);
- spillThread.setName("SpillThread");
- spillLock.lock();
- try {
- spillThread.start();
- while (!spillThreadRunning) {
- spillDone.await();
- }
- } catch (InterruptedException e) {
- throw (IOException)new IOException("Spill thread failed to initialize"
- ).initCause(sortSpillException);
- } finally {
- spillLock.unlock();
- }
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill thread failed to initialize"
- ).initCause(sortSpillException);
- }
- }
- public synchronized void collect(K key, V value)
- throws IOException {
- reporter.progress();
- if (key.getClass() != keyClass) {
- throw new IOException("Type mismatch in key from map: expected "
- + keyClass.getName() + ", recieved "
- + key.getClass().getName());
- }
- if (value.getClass() != valClass) {
- throw new IOException("Type mismatch in value from map: expected "
- + valClass.getName() + ", recieved "
- + value.getClass().getName());
- }
- final int kvnext = (kvindex + 1) % kvoffsets.length;
- spillLock.lock();
- try {
- boolean kvfull;
- do {
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
- // sufficient acct space
- kvfull = kvnext == kvstart;
- final boolean kvsoftlimit = ((kvnext > kvend)
- ? kvnext - kvend > softRecordLimit
- : kvend - kvnext <= kvoffsets.length - softRecordLimit);
- if (kvstart == kvend && kvsoftlimit) {
- LOG.info("Spilling map output: record full = " + kvsoftlimit);
- startSpill();
- }
- if (kvfull) {
- try {
- while (kvstart != kvend) {
- reporter.progress();
- spillDone.await();
- }
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Collector interrupted while waiting for the writer"
- ).initCause(e);
- }
- }
- } while (kvfull);
- } finally {
- spillLock.unlock();
- }
- try {
- // serialize key bytes into buffer
- int keystart = bufindex;
- keySerializer.serialize(key);
- if (bufindex < keystart) {
- // wrapped the key; reset required
- bb.reset();
- keystart = 0;
- }
- // serialize value bytes into buffer
- final int valstart = bufindex;
- valSerializer.serialize(value);
- int valend = bb.markRecord();
- final int partition = partitioner.getPartition(key, value, partitions);
- if (partition < 0 || partition >= partitions) {
- throw new IOException("Illegal partition for " + key + " (" +
- partition + ")");
- }
- mapOutputRecordCounter.increment(1);
- mapOutputByteCounter.increment(valend >= keystart
- ? valend - keystart
- : (bufvoid - keystart) + valend);
- // update accounting info
- int ind = kvindex * ACCTSIZE;
- kvoffsets[kvindex] = ind;
- kvindices[ind + PARTITION] = partition;
- kvindices[ind + KEYSTART] = keystart;
- kvindices[ind + VALSTART] = valstart;
- kvindex = kvnext;
- } catch (MapBufferTooSmallException e) {
- LOG.info("Record too large for in-memory buffer: " + e.getMessage());
- spillSingleRecord(key, value);
- mapOutputRecordCounter.increment(1);
- return;
- }
- }
- /**
- * Compare logical range, st i, j MOD offset capacity.
- * Compare by partition, then by key.
- * @see IndexedSortable#compare
- */
- public int compare(int i, int j) {
- final int ii = kvoffsets[i % kvoffsets.length];
- final int ij = kvoffsets[j % kvoffsets.length];
- // sort by partition
- if (kvindices[ii + PARTITION] != kvindices[ij + PARTITION]) {
- return kvindices[ii + PARTITION] - kvindices[ij + PARTITION];
- }
- // sort by key
- return comparator.compare(kvbuffer,
- kvindices[ii + KEYSTART],
- kvindices[ii + VALSTART] - kvindices[ii + KEYSTART],
- kvbuffer,
- kvindices[ij + KEYSTART],
- kvindices[ij + VALSTART] - kvindices[ij + KEYSTART]);
- }
- /**
- * Swap logical indices st i, j MOD offset capacity.
- * @see IndexedSortable#swap
- */
- public void swap(int i, int j) {
- i %= kvoffsets.length;
- j %= kvoffsets.length;
- int tmp = kvoffsets[i];
- kvoffsets[i] = kvoffsets[j];
- kvoffsets[j] = tmp;
- }
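- // The sort permutes only the kvoffsets entries; the key/value bytes in
- // kvbuffer never move. That is why swap() is a plain int swap and compare()
- // reads the keys through kvoffsets -> kvindices -> kvbuffer.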
- /**
- * Inner class managing the spill of serialized records to disk.
- */
- protected class BlockingBuffer extends DataOutputStream {
- public BlockingBuffer() {
- this(new Buffer());
- }
- private BlockingBuffer(OutputStream out) {
- super(out);
- }
- /**
- * Mark end of record. Note that this is required if the buffer is to
- * cut the spill in the proper place.
- */
- public int markRecord() {
- bufmark = bufindex;
- return bufindex;
- }
- /**
- * Set position from last mark to end of writable buffer, then rewrite
- * the data between last mark and kvindex.
- * This handles a special case where the key wraps around the buffer.
- * If the key is to be passed to a RawComparator, then it must be
- * contiguous in the buffer. This recopies the data in the buffer back
- * into itself, but starting at the beginning of the buffer. Note that
- * reset() should <b>only</b> be called immediately after detecting
- * this condition. To call it at any other time is undefined and would
- * likely result in data loss or corruption.
- * @see #markRecord()
- */
- protected synchronized void reset() throws IOException {
- // spillLock unnecessary; If spill wraps, then
- // bufindex < bufstart < bufend so contention is impossible
- // a stale value for bufstart does not affect correctness, since
- // we can only get false negatives that force the more
- // conservative path
- int headbytelen = bufvoid - bufmark;
- bufvoid = bufmark;
- if (bufindex + headbytelen < bufstart) {
- System.arraycopy(kvbuffer, 0, kvbuffer, headbytelen, bufindex);
- System.arraycopy(kvbuffer, bufvoid, kvbuffer, 0, headbytelen);
- bufindex += headbytelen;
- } else {
- byte[] keytmp = new byte[bufindex];
- System.arraycopy(kvbuffer, 0, keytmp, 0, bufindex);
- bufindex = 0;
- out.write(kvbuffer, bufmark, headbytelen);
- out.write(keytmp);
- }
- }
- }
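- // A note on the conditions in Buffer.write() below: when the collected
- // region is contiguous (bufstart <= bufend <= bufindex), a write overruns
- // the end of the array if bufindex + len > bufvoid, and it may wrap to the
- // front only if the free space -- the tail past bufindex plus the head
- // before bufstart -- exceeds len. When the region already wraps, the write
- // simply must not run into bufstart. "buffull && !wrap" therefore means the
- // bytes cannot be placed without overwriting data that has not been spilled.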
- public class Buffer extends OutputStream {
- private final byte[] scratch = new byte[1];
- @Override
- public synchronized void write(int v)
- throws IOException {
- scratch[0] = (byte)v;
- write(scratch, 0, 1);
- }
- /**
- * Attempt to write a sequence of bytes to the collection buffer.
- * This method will block if the spill thread is running and it
- * cannot write.
- * @throws MapBufferTooSmallException if record is too large to
- * deserialize into the collection buffer.
- */
- @Override
- public synchronized void write(byte b[], int off, int len)
- throws IOException {
- boolean buffull = false;
- boolean wrap = false;
- spillLock.lock();
- try {
- do {
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
- // sufficient buffer space?
- if (bufstart <= bufend && bufend <= bufindex) {
- buffull = bufindex + len > bufvoid;
- wrap = (bufvoid - bufindex) + bufstart > len;
- } else {
- // bufindex <= bufstart <= bufend
- // bufend <= bufindex <= bufstart
- wrap = false;
- buffull = bufindex + len > bufstart;
- }
- if (kvstart == kvend) {
- // spill thread not running
- if (kvend != kvindex) {
- // we have records we can spill
- final boolean bufsoftlimit = (bufindex > bufend)
- ? bufindex - bufend > softBufferLimit
- : bufend - bufindex < bufvoid - softBufferLimit;
- if (bufsoftlimit || (buffull && !wrap)) {
- LOG.info("Spilling map output: buffer full= " + bufsoftlimit);
- startSpill();
- }
- } else if (buffull && !wrap) {
- // We have no buffered records, and this record is too large
- // to write into kvbuffer. We must spill it directly from
- // collect
- final int size = ((bufend <= bufindex)
- ? bufindex - bufend
- : (bufvoid - bufend) + bufindex) + len;
- bufstart = bufend = bufindex = bufmark = 0;
- kvstart = kvend = kvindex = 0;
- bufvoid = kvbuffer.length;
- throw new MapBufferTooSmallException(size + " bytes");
- }
- }
- if (buffull && !wrap) {
- try {
- while (kvstart != kvend) {
- reporter.progress();
- spillDone.await();
- }
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Buffer interrupted while waiting for the writer"
- ).initCause(e);
- }
- }
- } while (buffull && !wrap);
- } finally {
- spillLock.unlock();
- }
- // here, we know that we have sufficient space to write
- if (buffull) {
- final int gaplen = bufvoid - bufindex;
- System.arraycopy(b, off, kvbuffer, bufindex, gaplen);
- len -= gaplen;
- off += gaplen;
- bufindex = 0;
- }
- System.arraycopy(b, off, kvbuffer, bufindex, len);
- bufindex += len;
- }
- }
- public synchronized void flush() throws IOException, ClassNotFoundException,
- InterruptedException {
- LOG.info("Starting flush of map output");
- spillLock.lock();
- try {
- while (kvstart != kvend) {
- reporter.progress();
- spillDone.await();
- }
- if (sortSpillException != null) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(sortSpillException);
- }
- if (kvend != kvindex) {
- kvend = kvindex;
- bufend = bufmark;
- sortAndSpill();
- }
- } catch (InterruptedException e) {
- throw (IOException)new IOException(
- "Buffer interrupted while waiting for the writer"
- ).initCause(e);
- } finally {
- spillLock.unlock();
- }
- assert !spillLock.isHeldByCurrentThread();
- // shut down spill thread and wait for it to exit. Since the preceding
- // ensures that it is finished with its work (and sortAndSpill did not
- // throw), we elect to use an interrupt instead of setting a flag.
- // Spilling simultaneously from this thread while the spill thread
- // finishes its work might be both a useful way to extend this and also
- // sufficient motivation for the latter approach.
- try {
- spillThread.interrupt();
- spillThread.join();
- } catch (InterruptedException e) {
- throw (IOException)new IOException("Spill failed"
- ).initCause(e);
- }
- // release sort buffer before the merge
- kvbuffer = null;
- mergeParts();
- }
- public void close() { }
- protected class SpillThread extends Thread {
- @Override
- public void run() {
- spillLock.lock();
- spillThreadRunning = true;
- try {
- while (true) {
- spillDone.signal();
- while (kvstart == kvend) {
- spillReady.await();
- }
- try {
- spillLock.unlock();
- sortAndSpill();
- } catch (Throwable e) {
- sortSpillException = e;
- } finally {
- spillLock.lock();
- if (bufend < bufindex && bufindex < bufstart) {
- bufvoid = kvbuffer.length;
- }
- kvstart = kvend;
- bufstart = bufend;
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- } finally {
- spillLock.unlock();
- spillThreadRunning = false;
- }
- }
- }
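- // Hand-off between collect()/Buffer.write() and the spill thread:
- // startSpill(), called with spillLock held, freezes the collected region by
- // setting kvend/bufend and signals spillReady; SpillThread wakes, drops the
- // lock while sortAndSpill() runs, then retakes it, advances kvstart/bufstart
- // past the spilled region and signals spillDone, which is what the collector
- // waits on whenever the buffer is full (kvstart != kvend).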
- private synchronized void startSpill() {
- LOG.info("bufstart = " + bufstart + "; bufend = " + bufmark +
- "; bufvoid = " + bufvoid);
- LOG.info("kvstart = " + kvstart + "; kvend = " + kvindex +
- "; length = " + kvoffsets.length);
- kvend = kvindex;
- bufend = bufmark;
- spillReady.signal();
- }
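- // Each spill produces one file holding an IFile segment per partition,
- // written back to back, plus a 24-byte index record (start offset, raw
- // length, part length) per partition. Indices are cached in indexCacheList
- // until INDEX_CACHE_MEMORY_LIMIT (1 MB) is reached; indices of later spills
- // go to separate spill index files and are re-read in mergeParts().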
- private void sortAndSpill() throws IOException, ClassNotFoundException,
- InterruptedException {
- //approximate the length of the output file to be the length of the
- //buffer + header lengths for the partitions
- long size = (bufend >= bufstart
- ? bufend - bufstart
- : (bufvoid - bufend) + bufstart) +
- partitions * APPROX_HEADER_LENGTH;
- FSDataOutputStream out = null;
- try {
- // create spill file
- final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
- out = rfs.create(filename);
- final int endPosition = (kvend > kvstart)
- ? kvend
- : kvoffsets.length + kvend;
- sorter.sort(MapOutputBuffer.this, kvstart, endPosition, reporter);
- int spindex = kvstart;
- IndexRecord rec = new IndexRecord();
- InMemValBytes value = new InMemValBytes();
- for (int i = 0; i < partitions; ++i) {
- IFile.Writer<K, V> writer = null;
- try {
- long segmentStart = out.getPos();
- writer = new Writer<K, V>(job, out, keyClass, valClass, codec,
- spilledRecordsCounter);
- if (combinerRunner == null) {
- // spill directly
- DataInputBuffer key = new DataInputBuffer();
- while (spindex < endPosition &&
- kvindices[kvoffsets[spindex % kvoffsets.length]
- + PARTITION] == i) {
- final int kvoff = kvoffsets[spindex % kvoffsets.length];
- getVBytesForOffset(kvoff, value);
- key.reset(kvbuffer, kvindices[kvoff + KEYSTART],
- (kvindices[kvoff + VALSTART] -
- kvindices[kvoff + KEYSTART]));
- writer.append(key, value);
- ++spindex;
- }
- } else {
- int spstart = spindex;
- while (spindex < endPosition &&
- kvindices[kvoffsets[spindex % kvoffsets.length]
- + PARTITION] == i) {
- ++spindex;
- }
- // Note: we would like to avoid the combiner if we've fewer
- // than some threshold of records for a partition
- if (spstart != spindex) {
- combineCollector.setWriter(writer);
- RawKeyValueIterator kvIter =
- new MRResultIterator(spstart, spindex);
- combinerRunner.combine(kvIter, combineCollector);
- }
- }
- // close the writer
- writer.close();
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillRec.putIndex(rec, i);
- writer = null;
- } finally {
- if (null != writer) writer.close();
- }
- }
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
- } else {
- indexCacheList.add(spillRec);
- totalIndexCacheMemory +=
- spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
- LOG.info("Finished spill " + numSpills);
- ++numSpills;
- } finally {
- if (out != null) out.close();
- }
- }
- /**
- * Handles the degenerate case where serialization fails to fit in
- * the in-memory buffer, so we must spill the record from collect
- * directly to a spill file. Consider this "losing".
- */
- private void spillSingleRecord(final K key, final V value)
- throws IOException {
- long size = kvbuffer.length + partitions * APPROX_HEADER_LENGTH;
- FSDataOutputStream out = null;
- final int partition = partitioner.getPartition(key, value, partitions);
- try {
- // create spill file
- final SpillRecord spillRec = new SpillRecord(partitions);
- final Path filename = mapOutputFile.getSpillFileForWrite(getTaskID(),
- numSpills, size);
- out = rfs.create(filename);
-
- // we don't run the combiner for a single record
- IndexRecord rec = new IndexRecord();
- for (int i = 0; i < partitions; ++i) {
- IFile.Writer<K, V> writer = null;
- try {
- long segmentStart = out.getPos();
- // Create a new codec, don't care!
- writer = new IFile.Writer<K,V>(job, out, keyClass, valClass, codec,
- spilledRecordsCounter);
- if (i == partition) {
- final long recordStart = out.getPos();
- writer.append(key, value);
- // Note that our map byte count will not be accurate with
- // compression
- mapOutputByteCounter.increment(out.getPos() - recordStart);
- }
- writer.close();
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillRec.putIndex(rec, i);
- writer = null;
- } catch (IOException e) {
- if (null != writer) writer.close();
- throw e;
- }
- }
- if (totalIndexCacheMemory >= INDEX_CACHE_MEMORY_LIMIT) {
- // create spill index file
- Path indexFilename = mapOutputFile.getSpillIndexFileForWrite(
- getTaskID(), numSpills,
- partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH);
- spillRec.writeToFile(indexFilename, job);
- } else {
- indexCacheList.add(spillRec);
- totalIndexCacheMemory +=
- spillRec.size() * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- }
- ++numSpills;
- } finally {
- if (out != null) out.close();
- }
- }
- /**
- * Given an offset, populate vbytes with the associated set of
- * deserialized value bytes. Should only be called during a spill.
- */
- private void getVBytesForOffset(int kvoff, InMemValBytes vbytes) {
- final int nextindex = (kvoff / ACCTSIZE ==
- (kvend - 1 + kvoffsets.length) % kvoffsets.length)
- ? bufend
- : kvindices[(kvoff + ACCTSIZE + KEYSTART) % kvindices.length];
- int vallen = (nextindex >= kvindices[kvoff + VALSTART])
- ? nextindex - kvindices[kvoff + VALSTART]
- : (bufvoid - kvindices[kvoff + VALSTART]) + nextindex;
- vbytes.reset(kvbuffer, kvindices[kvoff + VALSTART], vallen);
- }
- /**
- * Inner class wrapping valuebytes, used for appendRaw.
- */
- protected class InMemValBytes extends DataInputBuffer {
- private byte[] buffer;
- private int start;
- private int length;
-
- public void reset(byte[] buffer, int start, int length) {
- this.buffer = buffer;
- this.start = start;
- this.length = length;
-
- if (start + length > bufvoid) {
- this.buffer = new byte[this.length];
- final int taillen = bufvoid - start;
- System.arraycopy(buffer, start, this.buffer, 0, taillen);
- System.arraycopy(buffer, 0, this.buffer, taillen, length-taillen);
- this.start = 0;
- }
-
- super.reset(this.buffer, this.start, this.length);
- }
- }
- protected class MRResultIterator implements RawKeyValueIterator {
- private final DataInputBuffer keybuf = new DataInputBuffer();
- private final InMemValBytes vbytes = new InMemValBytes();
- private final int end;
- private int current;
- public MRResultIterator(int start, int end) {
- this.end = end;
- current = start - 1;
- }
- public boolean next() throws IOException {
- return ++current < end;
- }
- public DataInputBuffer getKey() throws IOException {
- final int kvoff = kvoffsets[current % kvoffsets.length];
- keybuf.reset(kvbuffer, kvindices[kvoff + KEYSTART],
- kvindices[kvoff + VALSTART] - kvindices[kvoff + KEYSTART]);
- return keybuf;
- }
- public DataInputBuffer getValue() throws IOException {
- getVBytesForOffset(kvoffsets[current % kvoffsets.length], vbytes);
- return vbytes;
- }
- public Progress getProgress() {
- return null;
- }
- public void close() { }
- }
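- // mergeParts() builds the single output consumed by the reducers: for each
- // partition it merges that partition's segment from every spill file (an
- // io.sort.factor-way merge), re-running the combiner only when there are at
- // least min.num.spills.for.combine spills, and writes the result to file.out
- // with a matching file.out.index. A single spill is simply renamed into
- // place.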
- private void mergeParts() throws IOException, InterruptedException,
- ClassNotFoundException {
- // get the approximate size of the final output/index files
- long finalOutFileSize = 0;
- long finalIndexFileSize = 0;
- final Path[] filename = new Path[numSpills];
- final TaskAttemptID mapId = getTaskID();
- for(int i = 0; i < numSpills; i++) {
- filename[i] = mapOutputFile.getSpillFile(mapId, i);
- finalOutFileSize += rfs.getFileStatus(filename[i]).getLen();
- }
- if (numSpills == 1) { //the spill is the final output
- rfs.rename(filename[0],
- new Path(filename[0].getParent(), "file.out"));
- if (indexCacheList.size() == 0) {
- rfs.rename(mapOutputFile.getSpillIndexFile(mapId, 0),
- new Path(filename[0].getParent(),"file.out.index"));
- } else {
- indexCacheList.get(0).writeToFile(
- new Path(filename[0].getParent(),"file.out.index"), job);
- }
- return;
- }
- // read in paged indices
- for (int i = indexCacheList.size(); i < numSpills; ++i) {
- Path indexFileName = mapOutputFile.getSpillIndexFile(mapId, i);
- indexCacheList.add(new SpillRecord(indexFileName, job));
- }
- //make correction in the length to include the sequence file header
- //lengths for each partition
- finalOutFileSize += partitions * APPROX_HEADER_LENGTH;
- finalIndexFileSize = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
- Path finalOutputFile = mapOutputFile.getOutputFileForWrite(mapId,
- finalOutFileSize);
- Path finalIndexFile = mapOutputFile.getOutputIndexFileForWrite(
- mapId, finalIndexFileSize);
- //The output stream for the final single output file
- FSDataOutputStream finalOut = rfs.create(finalOutputFile, true, 4096);
- if (numSpills == 0) {
- //create dummy files
- IndexRecord rec = new IndexRecord();
- SpillRecord sr = new SpillRecord(partitions);
- try {
- for (int i = 0; i < partitions; i++) {
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec, null);
- writer.close();
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- sr.putIndex(rec, i);
- }
- sr.writeToFile(finalIndexFile, job);
- } finally {
- finalOut.close();
- }
- return;
- }
- {
- IndexRecord rec = new IndexRecord();
- final SpillRecord spillRec = new SpillRecord(partitions);
- for (int parts = 0; parts < partitions; parts++) {
- //create the segments to be merged
- List<Segment<K,V>> segmentList =
- new ArrayList<Segment<K, V>>(numSpills);
- for(int i = 0; i < numSpills; i++) {
- IndexRecord indexRecord = indexCacheList.get(i).getIndex(parts);
- Segment<K,V> s =
- new Segment<K,V>(job, rfs, filename[i], indexRecord.startOffset,
- indexRecord.partLength, codec, true);
- segmentList.add(i, s);
- if (LOG.isDebugEnabled()) {
- LOG.debug("MapId=" + mapId + " Reducer=" + parts +
- "Spill =" + i + "(" + indexRecord.startOffset + "," +
- indexRecord.rawLength + ", " + indexRecord.partLength + ")");
- }
- }
- //merge
- @SuppressWarnings("unchecked")
- RawKeyValueIterator kvIter = Merger.merge(job, rfs,
- keyClass, valClass,
- segmentList, job.getInt("io.sort.factor", 100),
- new Path(mapId.toString()),
- job.getOutputKeyComparator(), reporter,
- null, spilledRecordsCounter);
- //write merged output to disk
- long segmentStart = finalOut.getPos();
- Writer<K, V> writer =
- new Writer<K, V>(job, finalOut, keyClass, valClass, codec,
- spilledRecordsCounter);
- if (combinerRunner == null || numSpills < minSpillsForCombine) {
- Merger.writeFile(kvIter, writer, reporter, job);
- } else {
- combineCollector.setWriter(writer);
- combinerRunner.combine(kvIter, combineCollector);
- }
- //close
- writer.close();
- // record offsets
- rec.startOffset = segmentStart;
- rec.rawLength = writer.getRawLength();
- rec.partLength = writer.getCompressedLength();
- spillRec.putIndex(rec, parts);
- }
- spillRec.writeToFile(finalIndexFile, job);
- finalOut.close();
- for(int i = 0; i < numSpills; i++) {
- rfs.delete(filename[i],true);
- }
- }
- }
- } // MapOutputBuffer
-
- /**
- * Exception indicating that the allocated sort buffer is insufficient
- * to hold the current record.
- */
- @SuppressWarnings("serial")
- private static class MapBufferTooSmallException extends IOException {
- public MapBufferTooSmallException(String s) {
- super(s);
- }
- }
- }