MultithreadedMapRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import org.apache.hadoop.util.ReflectionUtils;
  20. import org.apache.hadoop.mapred.MapRunnable;
  21. import org.apache.hadoop.mapred.JobConf;
  22. import org.apache.hadoop.mapred.Mapper;
  23. import org.apache.hadoop.mapred.RecordReader;
  24. import org.apache.hadoop.mapred.OutputCollector;
  25. import org.apache.hadoop.mapred.Reporter;
  26. import org.apache.hadoop.mapred.SkipBadRecords;
  27. import org.apache.commons.logging.Log;
  28. import org.apache.commons.logging.LogFactory;
  29. import java.io.IOException;
  30. import java.util.concurrent.*;
  31. /**
  32.  * Multithreaded implementation for @link org.apache.hadoop.mapred.MapRunnable.
  33.  * <p>
  34.  * It can be used instead of the default implementation,
  35.  * @link org.apache.hadoop.mapred.MapRunner, when the Map operation is not CPU
  36.  * bound in order to improve throughput.
  37.  * <p>
  38.  * Map implementations using this MapRunnable must be thread-safe.
  39.  * <p>
  40.  * The Map-Reduce job has to be configured to use this MapRunnable class (using
  41.  * the JobConf.setMapRunnerClass method) and
  42.  * the number of thread the thread-pool can use with the
  43.  * <code>mapred.map.multithreadedrunner.threads</code> property, its default
  44.  * value is 10 threads.
  45.  * <p>
  46.  */
  47. public class MultithreadedMapRunner<K1, V1, K2, V2>
  48.     implements MapRunnable<K1, V1, K2, V2> {
  49.   private static final Log LOG =
  50.     LogFactory.getLog(MultithreadedMapRunner.class.getName());
  51.   private JobConf job;
  52.   private Mapper<K1, V1, K2, V2> mapper;
  53.   private ExecutorService executorService;
  54.   private volatile IOException ioException;
  55.   private volatile RuntimeException runtimeException;
  56.   private boolean incrProcCount;
  57.   @SuppressWarnings("unchecked")
  58.   public void configure(JobConf jobConf) {
  59.     int numberOfThreads =
  60.       jobConf.getInt("mapred.map.multithreadedrunner.threads", 10);
  61.     if (LOG.isDebugEnabled()) {
  62.       LOG.debug("Configuring jobConf " + jobConf.getJobName() +
  63.                 " to use " + numberOfThreads + " threads");
  64.     }
  65.     this.job = jobConf;
  66.     //increment processed counter only if skipping feature is enabled
  67.     this.incrProcCount = SkipBadRecords.getMapperMaxSkipRecords(job)>0 && 
  68.       SkipBadRecords.getAutoIncrMapperProcCount(job);
  69.     this.mapper = ReflectionUtils.newInstance(jobConf.getMapperClass(),
  70.         jobConf);
  71.     // Creating a threadpool of the configured size to execute the Mapper
  72.     // map method in parallel.
  73.     executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
  74.                                              0L, TimeUnit.MILLISECONDS,
  75.                                              new BlockingArrayQueue
  76.                                                (numberOfThreads));
  77.   }
  78.   /**
  79.    * A blocking array queue that replaces offer and add, which throws on a full
  80.    * queue, to a put, which waits on a full queue.
  81.    */
  82.   private static class BlockingArrayQueue extends ArrayBlockingQueue<Runnable> {
  83.     public BlockingArrayQueue(int capacity) {
  84.       super(capacity);
  85.     }
  86.     public boolean offer(Runnable r) {
  87.       return add(r);
  88.     }
  89.     public boolean add(Runnable r) {
  90.       try {
  91.         put(r);
  92.       } catch (InterruptedException ie) {
  93.         Thread.currentThread().interrupt();
  94.       }
  95.       return true;
  96.     }
  97.   }
  98.   private void checkForExceptionsFromProcessingThreads()
  99.       throws IOException, RuntimeException {
  100.     // Checking if a Mapper.map within a Runnable has generated an
  101.     // IOException. If so we rethrow it to force an abort of the Map
  102.     // operation thus keeping the semantics of the default
  103.     // implementation.
  104.     if (ioException != null) {
  105.       throw ioException;
  106.     }
  107.     // Checking if a Mapper.map within a Runnable has generated a
  108.     // RuntimeException. If so we rethrow it to force an abort of the Map
  109.     // operation thus keeping the semantics of the default
  110.     // implementation.
  111.     if (runtimeException != null) {
  112.       throw runtimeException;
  113.     }
  114.   }
  115.   public void run(RecordReader<K1, V1> input, OutputCollector<K2, V2> output,
  116.                   Reporter reporter)
  117.     throws IOException {
  118.     try {
  119.       // allocate key & value instances these objects will not be reused
  120.       // because execution of Mapper.map is not serialized.
  121.       K1 key = input.createKey();
  122.       V1 value = input.createValue();
  123.       while (input.next(key, value)) {
  124.         executorService.execute(new MapperInvokeRunable(key, value, output,
  125.                                 reporter));
  126.         checkForExceptionsFromProcessingThreads();
  127.         // Allocate new key & value instances as mapper is running in parallel
  128.         key = input.createKey();
  129.         value = input.createValue();
  130.       }
  131.       if (LOG.isDebugEnabled()) {
  132.         LOG.debug("Finished dispatching all Mappper.map calls, job "
  133.                   + job.getJobName());
  134.       }
  135.       // Graceful shutdown of the Threadpool, it will let all scheduled
  136.       // Runnables to end.
  137.       executorService.shutdown();
  138.       try {
  139.         // Now waiting for all Runnables to end.
  140.         while (!executorService.awaitTermination(100, TimeUnit.MILLISECONDS)) {
  141.           if (LOG.isDebugEnabled()) {
  142.             LOG.debug("Awaiting all running Mappper.map calls to finish, job "
  143.                       + job.getJobName());
  144.           }
  145.           // NOTE: while Mapper.map dispatching has concluded there are still
  146.           // map calls in progress and exceptions would be thrown.
  147.           checkForExceptionsFromProcessingThreads();
  148.         }
  149.         // NOTE: it could be that a map call has had an exception after the
  150.         // call for awaitTermination() returing true. And edge case but it
  151.         // could happen.
  152.         checkForExceptionsFromProcessingThreads();
  153.       } catch (IOException ioEx) {
  154.         // Forcing a shutdown of all thread of the threadpool and rethrowing
  155.         // the IOException
  156.         executorService.shutdownNow();
  157.         throw ioEx;
  158.       } catch (InterruptedException iEx) {
  159.         throw new RuntimeException(iEx);
  160.       }
  161.     } finally {
  162.       mapper.close();
  163.     }
  164.   }
  165.   /**
  166.    * Runnable to execute a single Mapper.map call from a forked thread.
  167.    */
  168.   private class MapperInvokeRunable implements Runnable {
  169.     private K1 key;
  170.     private V1 value;
  171.     private OutputCollector<K2, V2> output;
  172.     private Reporter reporter;
  173.     /**
  174.      * Collecting all required parameters to execute a Mapper.map call.
  175.      * <p>
  176.      *
  177.      * @param key
  178.      * @param value
  179.      * @param output
  180.      * @param reporter
  181.      */
  182.     public MapperInvokeRunable(K1 key, V1 value,
  183.                                OutputCollector<K2, V2> output,
  184.                                Reporter reporter) {
  185.       this.key = key;
  186.       this.value = value;
  187.       this.output = output;
  188.       this.reporter = reporter;
  189.     }
  190.     /**
  191.      * Executes a Mapper.map call with the given Mapper and parameters.
  192.      * <p>
  193.      * This method is called from the thread-pool thread.
  194.      *
  195.      */
  196.     public void run() {
  197.       try {
  198.         // map pair to output
  199.         MultithreadedMapRunner.this.mapper.map(key, value, output, reporter);
  200.         if(incrProcCount) {
  201.           reporter.incrCounter(SkipBadRecords.COUNTER_GROUP, 
  202.               SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS, 1);
  203.         }
  204.       } catch (IOException ex) {
  205.         // If there is an IOException during the call it is set in an instance
  206.         // variable of the MultithreadedMapRunner from where it will be
  207.         // rethrown.
  208.         synchronized (MultithreadedMapRunner.this) {
  209.           if (MultithreadedMapRunner.this.ioException == null) {
  210.             MultithreadedMapRunner.this.ioException = ex;
  211.           }
  212.         }
  213.       } catch (RuntimeException ex) {
  214.         // If there is a RuntimeException during the call it is set in an
  215.         // instance variable of the MultithreadedMapRunner from where it will be
  216.         // rethrown.
  217.         synchronized (MultithreadedMapRunner.this) {
  218.           if (MultithreadedMapRunner.this.runtimeException == null) {
  219.             MultithreadedMapRunner.this.runtimeException = ex;
  220.           }
  221.         }
  222.       }
  223.     }
  224.   }
  225. }