PipesMapRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.IOException;
  20. import org.apache.hadoop.io.FloatWritable;
  21. import org.apache.hadoop.io.NullWritable;
  22. import org.apache.hadoop.io.Writable;
  23. import org.apache.hadoop.io.WritableComparable;
  24. import org.apache.hadoop.mapred.JobConf;
  25. import org.apache.hadoop.mapred.MapRunner;
  26. import org.apache.hadoop.mapred.OutputCollector;
  27. import org.apache.hadoop.mapred.RecordReader;
  28. import org.apache.hadoop.mapred.Reporter;
  29. import org.apache.hadoop.mapred.SkipBadRecords;
  30. /**
  31.  * An adaptor to run a C++ mapper.
  32.  */
  33. class PipesMapRunner<K1 extends WritableComparable, V1 extends Writable,
  34.     K2 extends WritableComparable, V2 extends Writable>
  35.     extends MapRunner<K1, V1, K2, V2> {
  36.   private JobConf job;
  37.   /**
  38.    * Get the new configuration.
  39.    * @param job the job's configuration
  40.    */
  41.   public void configure(JobConf job) {
  42.     this.job = job;
  43.     //disable the auto increment of the counter. For pipes, no of processed 
  44.     //records could be different(equal or less) than the no of records input.
  45.     SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  46.   }
  47.   /**
  48.    * Run the map task.
  49.    * @param input the set of inputs
  50.    * @param output the object to collect the outputs of the map
  51.    * @param reporter the object to update with status
  52.    */
  53.   @SuppressWarnings("unchecked")
  54.   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  55.                   Reporter reporter) throws IOException {
  56.     Application<K1, V1, K2, V2> application = null;
  57.     try {
  58.       RecordReader<FloatWritable, NullWritable> fakeInput = 
  59.         (!Submitter.getIsJavaRecordReader(job) && 
  60.          !Submitter.getIsJavaMapper(job)) ? 
  61.   (RecordReader<FloatWritable, NullWritable>) input : null;
  62.       application = new Application<K1, V1, K2, V2>(job, fakeInput, output, 
  63.                                                     reporter,
  64.           (Class<? extends K2>) job.getOutputKeyClass(), 
  65.           (Class<? extends V2>) job.getOutputValueClass());
  66.     } catch (InterruptedException ie) {
  67.       throw new RuntimeException("interrupted", ie);
  68.     }
  69.     DownwardProtocol<K1, V1> downlink = application.getDownlink();
  70.     boolean isJavaInput = Submitter.getIsJavaRecordReader(job);
  71.     downlink.runMap(reporter.getInputSplit(), 
  72.                     job.getNumReduceTasks(), isJavaInput);
  73.     boolean skipping = job.getBoolean("mapred.skip.on", false);
  74.     try {
  75.       if (isJavaInput) {
  76.         // allocate key & value instances that are re-used for all entries
  77.         K1 key = input.createKey();
  78.         V1 value = input.createValue();
  79.         downlink.setInputTypes(key.getClass().getName(),
  80.                                value.getClass().getName());
  81.         
  82.         while (input.next(key, value)) {
  83.           // map pair to output
  84.           downlink.mapItem(key, value);
  85.           if(skipping) {
  86.             //flush the streams on every record input if running in skip mode
  87.             //so that we don't buffer other records surrounding a bad record.
  88.             downlink.flush();
  89.           }
  90.         }
  91.         downlink.endOfInput();
  92.       }
  93.       application.waitForFinish();
  94.     } catch (Throwable t) {
  95.       application.abort(t);
  96.     } finally {
  97.       application.cleanup();
  98.     }
  99.   }
  100.   
  101. }