MultithreadedMapper.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce.lib.map;
  19. import org.apache.hadoop.util.ReflectionUtils;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.mapreduce.Counter;
  22. import org.apache.hadoop.mapreduce.InputSplit;
  23. import org.apache.hadoop.mapreduce.Job;
  24. import org.apache.hadoop.mapreduce.JobContext;
  25. import org.apache.hadoop.mapreduce.Mapper;
  26. import org.apache.hadoop.mapreduce.RecordReader;
  27. import org.apache.hadoop.mapreduce.RecordWriter;
  28. import org.apache.hadoop.mapreduce.StatusReporter;
  29. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.commons.logging.LogFactory;
  32. import java.io.IOException;
  33. import java.util.ArrayList;
  34. import java.util.List;
  35. /**
  36.  * Multithreaded implementation for @link org.apache.hadoop.mapreduce.Mapper.
  37.  * <p>
  38.  * It can be used instead of the default implementation,
  39.  * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
  40.  * bound in order to improve throughput.
  41.  * <p>
  42.  * Mapper implementations using this MapRunnable must be thread-safe.
  43.  * <p>
  44.  * The Map-Reduce job has to be configured with the mapper to use via 
  45.  * {@link #setMapperClass(Configuration, Class)} and
  46.  * the number of thread the thread-pool can use with the
  47.  * {@link #getNumberOfThreads(Configuration) method. The default
  48.  * value is 10 threads.
  49.  * <p>
  50.  */
  51. public class MultithreadedMapper<K1, V1, K2, V2> 
  52.   extends Mapper<K1, V1, K2, V2> {
  53.   private static final Log LOG = LogFactory.getLog(MultithreadedMapper.class);
  54.   private Class<Mapper<K1,V1,K2,V2>> mapClass;
  55.   private Context outer;
  56.   private List<MapRunner> runners;
  57.   /**
  58.    * The number of threads in the thread pool that will run the map function.
  59.    * @param job the job
  60.    * @return the number of threads
  61.    */
  62.   public static int getNumberOfThreads(JobContext job) {
  63.     return job.getConfiguration().
  64.             getInt("mapred.map.multithreadedrunner.threads", 10);
  65.   }
  66.   /**
  67.    * Set the number of threads in the pool for running maps.
  68.    * @param job the job to modify
  69.    * @param threads the new number of threads
  70.    */
  71.   public static void setNumberOfThreads(Job job, int threads) {
  72.     job.getConfiguration().setInt("mapred.map.multithreadedrunner.threads", 
  73.                                   threads);
  74.   }
  75.   /**
  76.    * Get the application's mapper class.
  77.    * @param <K1> the map's input key type
  78.    * @param <V1> the map's input value type
  79.    * @param <K2> the map's output key type
  80.    * @param <V2> the map's output value type
  81.    * @param job the job
  82.    * @return the mapper class to run
  83.    */
  84.   @SuppressWarnings("unchecked")
  85.   public static <K1,V1,K2,V2>
  86.   Class<Mapper<K1,V1,K2,V2>> getMapperClass(JobContext job) {
  87.     return (Class<Mapper<K1,V1,K2,V2>>) 
  88.          job.getConfiguration().getClass("mapred.map.multithreadedrunner.class",
  89.                                          Mapper.class);
  90.   }
  91.   
  92.   /**
  93.    * Set the application's mapper class.
  94.    * @param <K1> the map input key type
  95.    * @param <V1> the map input value type
  96.    * @param <K2> the map output key type
  97.    * @param <V2> the map output value type
  98.    * @param job the job to modify
  99.    * @param cls the class to use as the mapper
  100.    */
  101.   public static <K1,V1,K2,V2> 
  102.   void setMapperClass(Job job, 
  103.                       Class<Mapper<K1,V1,K2,V2>> cls) {
  104.     if (MultithreadedMapper.class.isAssignableFrom(cls)) {
  105.       throw new IllegalArgumentException("Can't have recursive " + 
  106.                                          "MultithreadedMapper instances.");
  107.     }
  108.     job.getConfiguration().setClass("mapred.map.multithreadedrunner.class",
  109.                                     cls, Mapper.class);
  110.   }
  111.   /**
  112.    * Run the application's maps using a thread pool.
  113.    */
  114.   @Override
  115.   public void run(Context context) throws IOException, InterruptedException {
  116.     outer = context;
  117.     int numberOfThreads = getNumberOfThreads(context);
  118.     mapClass = getMapperClass(context);
  119.     if (LOG.isDebugEnabled()) {
  120.       LOG.debug("Configuring multithread runner to use " + numberOfThreads + 
  121.                 " threads");
  122.     }
  123.     
  124.     runners =  new ArrayList<MapRunner>(numberOfThreads);
  125.     for(int i=0; i < numberOfThreads; ++i) {
  126.       MapRunner thread = new MapRunner(context);
  127.       thread.start();
  128.       runners.set(i, thread);
  129.     }
  130.     for(int i=0; i < numberOfThreads; ++i) {
  131.       MapRunner thread = runners.get(i);
  132.       thread.join();
  133.       Throwable th = thread.throwable;
  134.       if (th != null) {
  135.         if (th instanceof IOException) {
  136.           throw (IOException) th;
  137.         } else if (th instanceof InterruptedException) {
  138.           throw (InterruptedException) th;
  139.         } else {
  140.           throw (RuntimeException) th;
  141.         }
  142.       }
  143.     }
  144.   }
  145.   private class SubMapRecordReader extends RecordReader<K1,V1> {
  146.     private K1 key;
  147.     private V1 value;
  148.     private Configuration conf;
  149.     @Override
  150.     public void close() throws IOException {
  151.     }
  152.     @Override
  153.     public float getProgress() throws IOException, InterruptedException {
  154.       return 0;
  155.     }
  156.     @Override
  157.     public void initialize(InputSplit split, 
  158.                            TaskAttemptContext context
  159.                            ) throws IOException, InterruptedException {
  160.       conf = context.getConfiguration();
  161.     }
  162.     @Override
  163.     public boolean nextKeyValue() throws IOException, InterruptedException {
  164.       synchronized (outer) {
  165.         if (!outer.nextKeyValue()) {
  166.           return false;
  167.         }
  168.         key = ReflectionUtils.copy(outer.getConfiguration(),
  169.                                    outer.getCurrentKey(), key);
  170.         value = ReflectionUtils.copy(conf, outer.getCurrentValue(), value);
  171.         return true;
  172.       }
  173.     }
  174.     public K1 getCurrentKey() {
  175.       return key;
  176.     }
  177.     @Override
  178.     public V1 getCurrentValue() {
  179.       return value;
  180.     }
  181.   }
  182.   
  183.   private class SubMapRecordWriter extends RecordWriter<K2,V2> {
  184.     @Override
  185.     public void close(TaskAttemptContext context) throws IOException,
  186.                                                  InterruptedException {
  187.     }
  188.     @Override
  189.     public void write(K2 key, V2 value) throws IOException,
  190.                                                InterruptedException {
  191.       synchronized (outer) {
  192.         outer.write(key, value);
  193.       }
  194.     }  
  195.   }
  196.   private class SubMapStatusReporter extends StatusReporter {
  197.     @Override
  198.     public Counter getCounter(Enum<?> name) {
  199.       return outer.getCounter(name);
  200.     }
  201.     @Override
  202.     public Counter getCounter(String group, String name) {
  203.       return outer.getCounter(group, name);
  204.     }
  205.     @Override
  206.     public void progress() {
  207.       outer.progress();
  208.     }
  209.     @Override
  210.     public void setStatus(String status) {
  211.       outer.setStatus(status);
  212.     }
  213.     
  214.   }
  215.   private class MapRunner extends Thread {
  216.     private Mapper<K1,V1,K2,V2> mapper;
  217.     private Context subcontext;
  218.     private Throwable throwable;
  219.     MapRunner(Context context) throws IOException, InterruptedException {
  220.       mapper = ReflectionUtils.newInstance(mapClass, 
  221.                                            context.getConfiguration());
  222.       subcontext = new Context(outer.getConfiguration(), 
  223.                             outer.getTaskAttemptID(),
  224.                             new SubMapRecordReader(),
  225.                             new SubMapRecordWriter(), 
  226.                             context.getOutputCommitter(),
  227.                             new SubMapStatusReporter(),
  228.                             outer.getInputSplit());
  229.     }
  230.     public Throwable getThrowable() {
  231.       return throwable;
  232.     }
  233.     @Override
  234.     public void run() {
  235.       try {
  236.         mapper.run(subcontext);
  237.       } catch (Throwable ie) {
  238.         throwable = ie;
  239.       }
  240.     }
  241.   }
  242. }