PipesPartitioner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:2k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import org.apache.hadoop.io.Writable;
  20. import org.apache.hadoop.io.WritableComparable;
  21. import org.apache.hadoop.mapred.JobConf;
  22. import org.apache.hadoop.mapred.Partitioner;
  23. import org.apache.hadoop.util.ReflectionUtils;
  24. /**
  25.  * This partitioner is one that can either be set manually per a record or it
  26.  * can fall back onto a Java partitioner that was set by the user.
  27.  */
  28. class PipesPartitioner<K extends WritableComparable,
  29.                        V extends Writable>
  30.   implements Partitioner<K, V> {
  31.   
  32.   private static ThreadLocal<Integer> cache = new ThreadLocal<Integer>();
  33.   private Partitioner<K, V> part = null;
  34.   
  35.   @SuppressWarnings("unchecked")
  36.   public void configure(JobConf conf) {
  37.     part =
  38.       ReflectionUtils.newInstance(Submitter.getJavaPartitioner(conf), conf);
  39.   }
  40.   /**
  41.    * Set the next key to have the given partition.
  42.    * @param newValue the next partition value
  43.    */
  44.   static void setNextPartition(int newValue) {
  45.     cache.set(newValue);
  46.   }
  47.   /**
  48.    * If a partition result was set manually, return it. Otherwise, we call
  49.    * the Java partitioner.
  50.    * @param key the key to partition
  51.    * @param value the value to partition
  52.    * @param numPartitions the number of reduces
  53.    */
  54.   public int getPartition(K key, V value, 
  55.                           int numPartitions) {
  56.     Integer result = cache.get();
  57.     if (result == null) {
  58.       return part.getPartition(key, value, numPartitions);
  59.     } else {
  60.       return result;
  61.     }
  62.   }
  63. }