FieldSelectionMapReduce.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:12k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Iterator;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.mapred.JobConf;
  26. import org.apache.hadoop.mapred.Mapper;
  27. import org.apache.hadoop.mapred.OutputCollector;
  28. import org.apache.hadoop.mapred.Reducer;
  29. import org.apache.hadoop.mapred.Reporter;
  30. import org.apache.hadoop.mapred.TextInputFormat;
  31. /**
  32.  * This class implements a mapper/reducer class that can be used to perform
  33.  * field selections in a manner similar to unix cut. The input data is treated
  34.  * as fields separated by a user specified separator (the default value is
  35.  * "t"). The user can specify a list of fields that form the map output keys,
  36.  * and a list of fields that form the map output values. If the inputformat is
  37.  * TextInputFormat, the mapper will ignore the key to the map function. and the
  38.  * fields are from the value only. Otherwise, the fields are the union of those
  39.  * from the key and those from the value.
  40.  * 
  41.  * The field separator is under attribute "mapred.data.field.separator"
  42.  * 
  43.  * The map output field list spec is under attribute "map.output.key.value.fields.spec".
  44.  * The value is expected to be like "keyFieldsSpec:valueFieldsSpec"
  45.  * key/valueFieldsSpec are comma (,) separated field spec: fieldSpec,fieldSpec,fieldSpec ...
  46.  * Each field spec can be a simple number (e.g. 5) specifying a specific field, or a range
  47.  * (like 2-5) to specify a range of fields, or an open range (like 3-) specifying all 
  48.  * the fields starting from field 3. The open range field spec applies value fields only.
  49.  * They have no effect on the key fields.
  50.  * 
  51.  * Here is an example: "4,3,0,1:6,5,1-3,7-". It specifies to use fields 4,3,0 and 1 for keys,
  52.  * and use fields 6,5,1,2,3,7 and above for values.
  53.  * 
  54.  * The reduce output field list spec is under attribute "reduce.output.key.value.fields.spec".
  55.  * 
  56.  * The reducer extracts output key/value pairs in a similar manner, except that
  57.  * the key is never ignored.
  58.  * 
  59.  */
  60. public class FieldSelectionMapReduce<K, V>
  61.     implements Mapper<K, V, Text, Text>, Reducer<Text, Text, Text, Text> {
  62.   private String mapOutputKeyValueSpec;
  63.   private boolean ignoreInputKey;
  64.   private String fieldSeparator = "t";
  65.   private int[] mapOutputKeyFieldList = null;
  66.   private int[] mapOutputValueFieldList = null;
  67.   private int allMapValueFieldsFrom = -1;
  68.   private String reduceOutputKeyValueSpec;
  69.   private int[] reduceOutputKeyFieldList = null;
  70.   private int[] reduceOutputValueFieldList = null;
  71.   private int allReduceValueFieldsFrom = -1;
  72.   private static Text emptyText = new Text("");
  73.   public static final Log LOG = LogFactory.getLog("FieldSelectionMapReduce");
  74.   private String specToString() {
  75.     StringBuffer sb = new StringBuffer();
  76.     sb.append("fieldSeparator: ").append(fieldSeparator).append("n");
  77.     sb.append("mapOutputKeyValueSpec: ").append(mapOutputKeyValueSpec).append(
  78.         "n");
  79.     sb.append("reduceOutputKeyValueSpec: ").append(reduceOutputKeyValueSpec)
  80.         .append("n");
  81.     sb.append("allMapValueFieldsFrom: ").append(allMapValueFieldsFrom).append(
  82.         "n");
  83.     sb.append("allReduceValueFieldsFrom: ").append(allReduceValueFieldsFrom)
  84.         .append("n");
  85.     int i = 0;
  86.     sb.append("mapOutputKeyFieldList.length: ").append(
  87.         mapOutputKeyFieldList.length).append("n");
  88.     for (i = 0; i < mapOutputKeyFieldList.length; i++) {
  89.       sb.append("t").append(mapOutputKeyFieldList[i]).append("n");
  90.     }
  91.     sb.append("mapOutputValueFieldList.length: ").append(
  92.         mapOutputValueFieldList.length).append("n");
  93.     for (i = 0; i < mapOutputValueFieldList.length; i++) {
  94.       sb.append("t").append(mapOutputValueFieldList[i]).append("n");
  95.     }
  96.     sb.append("reduceOutputKeyFieldList.length: ").append(
  97.         reduceOutputKeyFieldList.length).append("n");
  98.     for (i = 0; i < reduceOutputKeyFieldList.length; i++) {
  99.       sb.append("t").append(reduceOutputKeyFieldList[i]).append("n");
  100.     }
  101.     sb.append("reduceOutputValueFieldList.length: ").append(
  102.         reduceOutputValueFieldList.length).append("n");
  103.     for (i = 0; i < reduceOutputValueFieldList.length; i++) {
  104.       sb.append("t").append(reduceOutputValueFieldList[i]).append("n");
  105.     }
  106.     return sb.toString();
  107.   }
  108.   /**
  109.    * The identify function. Input key/value pair is written directly to output.
  110.    */
  111.   public void map(K key, V val,
  112.                   OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
  113.     String valStr = val.toString();
  114.     String[] inputValFields = valStr.split(this.fieldSeparator);
  115.     String[] inputKeyFields = null;
  116.     String[] fields = null;
  117.     if (this.ignoreInputKey) {
  118.       fields = inputValFields;
  119.     } else {
  120.       inputKeyFields = key.toString().split(this.fieldSeparator);
  121.       fields = new String[inputKeyFields.length + inputValFields.length];
  122.       int i = 0;
  123.       for (i = 0; i < inputKeyFields.length; i++) {
  124.         fields[i] = inputKeyFields[i];
  125.       }
  126.       for (i = 0; i < inputValFields.length; i++) {
  127.         fields[inputKeyFields.length + i] = inputValFields[i];
  128.       }
  129.     }
  130.     String newKey = selectFields(fields, mapOutputKeyFieldList, -1,
  131.         fieldSeparator);
  132.     String newVal = selectFields(fields, mapOutputValueFieldList,
  133.         allMapValueFieldsFrom, fieldSeparator);
  134.     if (newKey == null) {
  135.       newKey = newVal;
  136.       newVal = null;
  137.     }
  138.     Text newTextKey = emptyText;
  139.     if (newKey != null) {
  140.       newTextKey = new Text(newKey);
  141.     }
  142.     Text newTextVal = emptyText;
  143.     if (newTextVal != null) {
  144.       newTextVal = new Text(newVal);
  145.     }
  146.     output.collect(newTextKey, newTextVal);
  147.   }
  148.   /**
  149.    * Extract the actual field numbers from the given field specs.
  150.    * If a field spec is in the form of "n-" (like 3-), then n will be the 
  151.    * return value. Otherwise, -1 will be returned.  
  152.    * @param fieldListSpec an array of field specs
  153.    * @param fieldList an array of field numbers extracted from the specs.
  154.    * @return number n if some field spec is in the form of "n-", -1 otherwise.
  155.    */
  156.   private int extractFields(String[] fieldListSpec,
  157.                             ArrayList<Integer> fieldList) {
  158.     int allFieldsFrom = -1;
  159.     int i = 0;
  160.     int j = 0;
  161.     int pos = -1;
  162.     String fieldSpec = null;
  163.     for (i = 0; i < fieldListSpec.length; i++) {
  164.       fieldSpec = fieldListSpec[i];
  165.       if (fieldSpec.length() == 0) {
  166.         continue;
  167.       }
  168.       pos = fieldSpec.indexOf('-');
  169.       if (pos < 0) {
  170.         Integer fn = new Integer(fieldSpec);
  171.         fieldList.add(fn);
  172.       } else {
  173.         String start = fieldSpec.substring(0, pos);
  174.         String end = fieldSpec.substring(pos + 1);
  175.         if (start.length() == 0) {
  176.           start = "0";
  177.         }
  178.         if (end.length() == 0) {
  179.           allFieldsFrom = Integer.parseInt(start);
  180.           continue;
  181.         }
  182.         int startPos = Integer.parseInt(start);
  183.         int endPos = Integer.parseInt(end);
  184.         for (j = startPos; j <= endPos; j++) {
  185.           fieldList.add(j);
  186.         }
  187.       }
  188.     }
  189.     return allFieldsFrom;
  190.   }
  191.   private void parseOutputKeyValueSpec() {
  192.     String[] mapKeyValSpecs = mapOutputKeyValueSpec.split(":", -1);
  193.     String[] mapKeySpec = mapKeyValSpecs[0].split(",");
  194.     String[] mapValSpec = new String[0];
  195.     if (mapKeyValSpecs.length > 1) {
  196.       mapValSpec = mapKeyValSpecs[1].split(",");
  197.     }
  198.     int i = 0;
  199.     ArrayList<Integer> fieldList = new ArrayList<Integer>();
  200.     extractFields(mapKeySpec, fieldList);
  201.     this.mapOutputKeyFieldList = new int[fieldList.size()];
  202.     for (i = 0; i < fieldList.size(); i++) {
  203.       this.mapOutputKeyFieldList[i] = fieldList.get(i).intValue();
  204.     }
  205.     fieldList = new ArrayList<Integer>();
  206.     allMapValueFieldsFrom = extractFields(mapValSpec, fieldList);
  207.     this.mapOutputValueFieldList = new int[fieldList.size()];
  208.     for (i = 0; i < fieldList.size(); i++) {
  209.       this.mapOutputValueFieldList[i] = fieldList.get(i).intValue();
  210.     }
  211.     String[] reduceKeyValSpecs = reduceOutputKeyValueSpec.split(":", -1);
  212.     String[] reduceKeySpec = reduceKeyValSpecs[0].split(",");
  213.     String[] reduceValSpec = new String[0];
  214.     if (reduceKeyValSpecs.length > 1) {
  215.       reduceValSpec = reduceKeyValSpecs[1].split(",");
  216.     }
  217.     fieldList = new ArrayList<Integer>();
  218.     extractFields(reduceKeySpec, fieldList);
  219.     this.reduceOutputKeyFieldList = new int[fieldList.size()];
  220.     for (i = 0; i < fieldList.size(); i++) {
  221.       this.reduceOutputKeyFieldList[i] = fieldList.get(i).intValue();
  222.     }
  223.     fieldList = new ArrayList<Integer>();
  224.     allReduceValueFieldsFrom = extractFields(reduceValSpec, fieldList);
  225.     this.reduceOutputValueFieldList = new int[fieldList.size()];
  226.     for (i = 0; i < fieldList.size(); i++) {
  227.       this.reduceOutputValueFieldList[i] = fieldList.get(i).intValue();
  228.     }
  229.   }
  230.   public void configure(JobConf job) {
  231.     this.fieldSeparator = job.get("mapred.data.field.separator", "t");
  232.     this.mapOutputKeyValueSpec = job.get("map.output.key.value.fields.spec",
  233.         "0-:");
  234.     this.ignoreInputKey = TextInputFormat.class.getCanonicalName().equals(
  235.         job.getInputFormat().getClass().getCanonicalName());
  236.     this.reduceOutputKeyValueSpec = job.get(
  237.         "reduce.output.key.value.fields.spec", "0-:");
  238.     parseOutputKeyValueSpec();
  239.     LOG.info(specToString());
  240.   }
  241.   public void close() throws IOException {
  242.     // TODO Auto-generated method stub
  243.   }
  244.   private static String selectFields(String[] fields, int[] fieldList,
  245.       int allFieldsFrom, String separator) {
  246.     String retv = null;
  247.     int i = 0;
  248.     StringBuffer sb = null;
  249.     if (fieldList != null && fieldList.length > 0) {
  250.       if (sb == null) {
  251.         sb = new StringBuffer();
  252.       }
  253.       for (i = 0; i < fieldList.length; i++) {
  254.         if (fieldList[i] < fields.length) {
  255.           sb.append(fields[fieldList[i]]);
  256.         }
  257.         sb.append(separator);
  258.       }
  259.     }
  260.     if (allFieldsFrom >= 0) {
  261.       if (sb == null) {
  262.         sb = new StringBuffer();
  263.       }
  264.       for (i = allFieldsFrom; i < fields.length; i++) {
  265.         sb.append(fields[i]).append(separator);
  266.       }
  267.     }
  268.     if (sb != null) {
  269.       retv = sb.toString();
  270.       if (retv.length() > 0) {
  271.         retv = retv.substring(0, retv.length() - 1);
  272.       }
  273.     }
  274.     return retv;
  275.   }
  276.   public void reduce(Text key, Iterator<Text> values,
  277.                      OutputCollector<Text, Text> output, Reporter reporter)
  278.     throws IOException {
  279.     String keyStr = key.toString() + this.fieldSeparator;
  280.     while (values.hasNext()) {
  281.       String valStr = values.next().toString();
  282.       valStr = keyStr + valStr;
  283.       String[] fields = valStr.split(this.fieldSeparator);
  284.       String newKey = selectFields(fields, reduceOutputKeyFieldList, -1,
  285.           fieldSeparator);
  286.       String newVal = selectFields(fields, reduceOutputValueFieldList,
  287.           allReduceValueFieldsFrom, fieldSeparator);
  288.       Text newTextKey = null;
  289.       if (newKey != null) {
  290.         newTextKey = new Text(newKey);
  291.       }
  292.       Text newTextVal = null;
  293.       if (newVal != null) {
  294.         newTextVal = new Text(newVal);
  295.       }
  296.       output.collect(newTextKey, newTextVal);
  297.     }
  298.   }
  299. }