KeyFieldBasedPartitioner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.mapred.lib;
- import java.io.UnsupportedEncodingException;
- import java.util.List;
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.Partitioner;
- import org.apache.hadoop.mapred.lib.KeyFieldHelper.KeyDescription;
- /**
- * Defines a way to partition keys based on certain key fields (also see
- * {@link KeyFieldBasedComparator}.
- * The key specification supported is of the form -k pos1[,pos2], where,
- * pos is of the form f[.c][opts], where f is the number
- * of the key field to use, and c is the number of the first character from
- * the beginning of the field. Fields and character posns are numbered
- * starting with 1; a character position of zero in pos2 indicates the
- * field's last character. If '.c' is omitted from pos1, it defaults to 1
- * (the beginning of the field); if omitted from pos2, it defaults to 0
- * (the end of the field).
- *
- */
- public class KeyFieldBasedPartitioner<K2, V2> implements Partitioner<K2, V2> {
- private static final Log LOG = LogFactory.getLog(KeyFieldBasedPartitioner.class.getName());
- private int numOfPartitionFields;
-
- private KeyFieldHelper keyFieldHelper = new KeyFieldHelper();
- public void configure(JobConf job) {
- String keyFieldSeparator = job.get("map.output.key.field.separator", "t");
- keyFieldHelper.setKeyFieldSeparator(keyFieldSeparator);
- if (job.get("num.key.fields.for.partition") != null) {
- LOG.warn("Using deprecated num.key.fields.for.partition. " +
- "Use mapred.text.key.partitioner.options instead");
- this.numOfPartitionFields = job.getInt("num.key.fields.for.partition",0);
- keyFieldHelper.setKeyFieldSpec(1,numOfPartitionFields);
- } else {
- String option = job.getKeyFieldPartitionerOption();
- keyFieldHelper.parseOption(option);
- }
- }
- public int getPartition(K2 key, V2 value,
- int numReduceTasks) {
- byte[] keyBytes;
- List <KeyDescription> allKeySpecs = keyFieldHelper.keySpecs();
- if (allKeySpecs.size() == 0) {
- return (key.toString().hashCode() & Integer.MAX_VALUE) % numReduceTasks;
- }
- try {
- keyBytes = key.toString().getBytes("UTF-8");
- } catch (UnsupportedEncodingException e) {
- throw new RuntimeException("The current system does not " +
- "support UTF-8 encoding!", e);
- }
- int []lengthIndicesFirst = keyFieldHelper.getWordLengths(keyBytes, 0,
- keyBytes.length);
- int currentHash = 0;
- for (KeyDescription keySpec : allKeySpecs) {
- int startChar = keyFieldHelper.getStartOffset(keyBytes, 0, keyBytes.length,
- lengthIndicesFirst, keySpec);
- int endChar = keyFieldHelper.getEndOffset(keyBytes, 0, keyBytes.length,
- lengthIndicesFirst, keySpec);
- currentHash = hashCode(keyBytes, startChar, endChar,
- currentHash);
- }
- return (currentHash & Integer.MAX_VALUE) % numReduceTasks;
- }
-
- protected int hashCode(byte[] b, int start, int end, int currentHash) {
- for (int i = start; i <= end; i++) {
- currentHash = 31*currentHash + b[i];
- }
- return currentHash;
- }
- }