Mapper.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.io.RawComparator;
  22. import org.apache.hadoop.io.compress.CompressionCodec;
  23. /** 
  24.  * Maps input key/value pairs to a set of intermediate key/value pairs.  
  25.  * 
  26.  * <p>Maps are the individual tasks which transform input records into a 
  27.  * intermediate records. The transformed intermediate records need not be of 
  28.  * the same type as the input records. A given input pair may map to zero or 
  29.  * many output pairs.</p> 
  30.  * 
  31.  * <p>The Hadoop Map-Reduce framework spawns one map task for each 
  32.  * {@link InputSplit} generated by the {@link InputFormat} for the job.
  33.  * <code>Mapper</code> implementations can access the {@link Configuration} for 
  34.  * the job via the {@link JobContext#getConfiguration()}.
  35.  * 
  36.  * <p>The framework first calls 
  37.  * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
  38.  * {@link #map(Object, Object, Context)} 
  39.  * for each key/value pair in the <code>InputSplit</code>. Finally 
  40.  * {@link #cleanup(Context)} is called.</p>
  41.  * 
  42.  * <p>All intermediate values associated with a given output key are 
  43.  * subsequently grouped by the framework, and passed to a {@link Reducer} to  
  44.  * determine the final output. Users can control the sorting and grouping by 
  45.  * specifying two key {@link RawComparator} classes.</p>
  46.  *
  47.  * <p>The <code>Mapper</code> outputs are partitioned per 
  48.  * <code>Reducer</code>. Users can control which keys (and hence records) go to 
  49.  * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
  50.  * 
  51.  * <p>Users can optionally specify a <code>combiner</code>, via 
  52.  * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
  53.  * intermediate outputs, which helps to cut down the amount of data transferred 
  54.  * from the <code>Mapper</code> to the <code>Reducer</code>.
  55.  * 
  56.  * <p>Applications can specify if and how the intermediate
  57.  * outputs are to be compressed and which {@link CompressionCodec}s are to be
  58.  * used via the <code>Configuration</code>.</p>
  59.  *  
  60.  * <p>If the job has zero
  61.  * reduces then the output of the <code>Mapper</code> is directly written
  62.  * to the {@link OutputFormat} without sorting by keys.</p>
  63.  * 
  64.  * <p>Example:</p>
  65.  * <p><blockquote><pre>
  66.  * public class TokenCounterMapper 
  67.  *     extends Mapper<Object, Text, Text, IntWritable>{
  68.  *    
  69.  *   private final static IntWritable one = new IntWritable(1);
  70.  *   private Text word = new Text();
  71.  *   
  72.  *   public void map(Object key, Text value, Context context) throws IOException {
  73.  *     StringTokenizer itr = new StringTokenizer(value.toString());
  74.  *     while (itr.hasMoreTokens()) {
  75.  *       word.set(itr.nextToken());
  76.  *       context.collect(word, one);
  77.  *     }
  78.  *   }
  79.  * }
  80.  * </pre></blockquote></p>
  81.  *
  82.  * <p>Applications may override the {@link #run(Context)} method to exert 
  83.  * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
  84.  * etc.</p>
  85.  * 
  86.  * @see InputFormat
  87.  * @see JobContext
  88.  * @see Partitioner  
  89.  * @see Reducer
  90.  */
  91. public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
  92.   public class Context 
  93.     extends MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
  94.     public Context(Configuration conf, TaskAttemptID taskid,
  95.                    RecordReader<KEYIN,VALUEIN> reader,
  96.                    RecordWriter<KEYOUT,VALUEOUT> writer,
  97.                    OutputCommitter committer,
  98.                    StatusReporter reporter,
  99.                    InputSplit split) throws IOException, InterruptedException {
  100.       super(conf, taskid, reader, writer, committer, reporter, split);
  101.     }
  102.   }
  103.   
  104.   /**
  105.    * Called once at the beginning of the task.
  106.    */
  107.   protected void setup(Context context
  108.                        ) throws IOException, InterruptedException {
  109.     // NOTHING
  110.   }
  111.   /**
  112.    * Called once for each key/value pair in the input split. Most applications
  113.    * should override this, but the default is the identity function.
  114.    */
  115.   @SuppressWarnings("unchecked")
  116.   protected void map(KEYIN key, VALUEIN value, 
  117.                      Context context) throws IOException, InterruptedException {
  118.     context.write((KEYOUT) key, (VALUEOUT) value);
  119.   }
  120.   /**
  121.    * Called once at the end of the task.
  122.    */
  123.   protected void cleanup(Context context
  124.                          ) throws IOException, InterruptedException {
  125.     // NOTHING
  126.   }
  127.   
  128.   /**
  129.    * Expert users can override this method for more complete control over the
  130.    * execution of the Mapper.
  131.    * @param context
  132.    * @throws IOException
  133.    */
  134.   public void run(Context context) throws IOException, InterruptedException {
  135.     setup(context);
  136.     while (context.nextKeyValue()) {
  137.       map(context.getCurrentKey(), context.getCurrentValue(), context);
  138.     }
  139.     cleanup(context);
  140.   }
  141. }