ValueAggregatorJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib.aggregate;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.Text;
  24. import org.apache.hadoop.mapred.FileInputFormat;
  25. import org.apache.hadoop.mapred.FileOutputFormat;
  26. import org.apache.hadoop.mapred.InputFormat;
  27. import org.apache.hadoop.mapred.JobClient;
  28. import org.apache.hadoop.mapred.JobConf;
  29. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  30. import org.apache.hadoop.mapred.TextInputFormat;
  31. import org.apache.hadoop.mapred.TextOutputFormat;
  32. import org.apache.hadoop.mapred.jobcontrol.Job;
  33. import org.apache.hadoop.mapred.jobcontrol.JobControl;
  34. import org.apache.hadoop.util.GenericOptionsParser;
  35. /**
  36.  * This is the main class for creating a map/reduce job using Aggregate
  37.  * framework. The Aggregate is a specialization of map/reduce framework,
  38.  * specilizing for performing various simple aggregations.
  39.  * 
  40.  * Generally speaking, in order to implement an application using Map/Reduce
  41.  * model, the developer is to implement Map and Reduce functions (and possibly
  42.  * combine function). However, a lot of applications related to counting and
  43.  * statistics computing have very similar characteristics. Aggregate abstracts
  44.  * out the general patterns of these functions and implementing those patterns.
  45.  * In particular, the package provides generic mapper/redducer/combiner classes,
  46.  * and a set of built-in value aggregators, and a generic utility class that
  47.  * helps user create map/reduce jobs using the generic class. The built-in
  48.  * aggregators include:
  49.  * 
  50.  * sum over numeric values count the number of distinct values compute the
  51.  * histogram of values compute the minimum, maximum, media,average, standard
  52.  * deviation of numeric values
  53.  * 
  54.  * The developer using Aggregate will need only to provide a plugin class
  55.  * conforming to the following interface:
  56.  * 
  57.  * public interface ValueAggregatorDescriptor { public ArrayList<Entry>
  58.  * generateKeyValPairs(Object key, Object value); public void
  59.  * configure(JobConfjob); }
  60.  * 
  61.  * The package also provides a base class, ValueAggregatorBaseDescriptor,
  62.  * implementing the above interface. The user can extend the base class and
  63.  * implement generateKeyValPairs accordingly.
  64.  * 
  65.  * The primary work of generateKeyValPairs is to emit one or more key/value
  66.  * pairs based on the input key/value pair. The key in an output key/value pair
  67.  * encode two pieces of information: aggregation type and aggregation id. The
  68.  * value will be aggregated onto the aggregation id according the aggregation
  69.  * type.
  70.  * 
  71.  * This class offers a function to generate a map/reduce job using Aggregate
  72.  * framework. The function takes the following parameters: input directory spec
  73.  * input format (text or sequence file) output directory a file specifying the
  74.  * user plugin class
  75.  * 
  76.  */
  77. public class ValueAggregatorJob {
  78.   public static JobControl createValueAggregatorJobs(String args[]
  79.     , Class<? extends ValueAggregatorDescriptor>[] descriptors) throws IOException {
  80.     
  81.     JobControl theControl = new JobControl("ValueAggregatorJobs");
  82.     ArrayList<Job> dependingJobs = new ArrayList<Job>();
  83.     JobConf aJobConf = createValueAggregatorJob(args);
  84.     if(descriptors != null)
  85.       setAggregatorDescriptors(aJobConf, descriptors);
  86.     Job aJob = new Job(aJobConf, dependingJobs);
  87.     theControl.addJob(aJob);
  88.     return theControl;
  89.   }
  90.   public static JobControl createValueAggregatorJobs(String args[]) throws IOException {
  91.     return createValueAggregatorJobs(args, null);
  92.   }
  93.   
  94.   /**
  95.    * Create an Aggregate based map/reduce job.
  96.    * 
  97.    * @param args the arguments used for job creation. Generic hadoop
  98.    * arguments are accepted.
  99.    * @return a JobConf object ready for submission.
  100.    * 
  101.    * @throws IOException
  102.    * @see GenericOptionsParser
  103.    */
  104.   public static JobConf createValueAggregatorJob(String args[])
  105.     throws IOException {
  106.     Configuration conf = new Configuration();
  107.     
  108.     GenericOptionsParser genericParser 
  109.       = new GenericOptionsParser(conf, args);
  110.     args = genericParser.getRemainingArgs();
  111.     
  112.     if (args.length < 2) {
  113.       System.out.println("usage: inputDirs outDir "
  114.           + "[numOfReducer [textinputformat|seq [specfile [jobName]]]]");
  115.       GenericOptionsParser.printGenericCommandUsage(System.out);
  116.       System.exit(1);
  117.     }
  118.     String inputDir = args[0];
  119.     String outputDir = args[1];
  120.     int numOfReducers = 1;
  121.     if (args.length > 2) {
  122.       numOfReducers = Integer.parseInt(args[2]);
  123.     }
  124.     Class<? extends InputFormat> theInputFormat =
  125.       TextInputFormat.class;
  126.     if (args.length > 3 && 
  127.         args[3].compareToIgnoreCase("textinputformat") == 0) {
  128.       theInputFormat = TextInputFormat.class;
  129.     } else {
  130.       theInputFormat = SequenceFileInputFormat.class;
  131.     }
  132.     Path specFile = null;
  133.     if (args.length > 4) {
  134.       specFile = new Path(args[4]);
  135.     }
  136.     String jobName = "";
  137.     
  138.     if (args.length > 5) {
  139.       jobName = args[5];
  140.     }
  141.     
  142.     JobConf theJob = new JobConf(conf);
  143.     if (specFile != null) {
  144.       theJob.addResource(specFile);
  145.     }
  146.     String userJarFile = theJob.get("user.jar.file");
  147.     if (userJarFile == null) {
  148.       theJob.setJarByClass(ValueAggregator.class);
  149.     } else {
  150.       theJob.setJar(userJarFile);
  151.     }
  152.     theJob.setJobName("ValueAggregatorJob: " + jobName);
  153.     FileInputFormat.addInputPaths(theJob, inputDir);
  154.     theJob.setInputFormat(theInputFormat);
  155.     
  156.     theJob.setMapperClass(ValueAggregatorMapper.class);
  157.     FileOutputFormat.setOutputPath(theJob, new Path(outputDir));
  158.     theJob.setOutputFormat(TextOutputFormat.class);
  159.     theJob.setMapOutputKeyClass(Text.class);
  160.     theJob.setMapOutputValueClass(Text.class);
  161.     theJob.setOutputKeyClass(Text.class);
  162.     theJob.setOutputValueClass(Text.class);
  163.     theJob.setReducerClass(ValueAggregatorReducer.class);
  164.     theJob.setCombinerClass(ValueAggregatorCombiner.class);
  165.     theJob.setNumMapTasks(1);
  166.     theJob.setNumReduceTasks(numOfReducers);
  167.     return theJob;
  168.   }
  169.   public static JobConf createValueAggregatorJob(String args[]
  170.     , Class<? extends ValueAggregatorDescriptor>[] descriptors)
  171.   throws IOException {
  172.     JobConf job = createValueAggregatorJob(args);
  173.     setAggregatorDescriptors(job, descriptors);
  174.     return job;
  175.   }
  176.   
  177.   public static void setAggregatorDescriptors(JobConf job
  178.       , Class<? extends ValueAggregatorDescriptor>[] descriptors) {
  179.     job.setInt("aggregator.descriptor.num", descriptors.length);
  180.     //specify the aggregator descriptors
  181.     for(int i=0; i< descriptors.length; i++) {
  182.       job.set("aggregator.descriptor." + i, "UserDefined," + descriptors[i].getName());
  183.     }    
  184.   }
  185.   
  186.   /**
  187.    * create and run an Aggregate based map/reduce job.
  188.    * 
  189.    * @param args the arguments used for job creation
  190.    * @throws IOException
  191.    */
  192.   public static void main(String args[]) throws IOException {
  193.     JobConf job = ValueAggregatorJob.createValueAggregatorJob(args);
  194.     JobClient.runJob(job);
  195.   }
  196. }