Sort.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.IOException;
  20. import java.net.URI;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.conf.Configured;
  24. import org.apache.hadoop.filecache.DistributedCache;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.io.BytesWritable;
  27. import org.apache.hadoop.io.Writable;
  28. import org.apache.hadoop.io.WritableComparable;
  29. import org.apache.hadoop.mapred.*;
  30. import org.apache.hadoop.mapred.lib.IdentityMapper;
  31. import org.apache.hadoop.mapred.lib.IdentityReducer;
  32. import org.apache.hadoop.mapred.lib.InputSampler;
  33. import org.apache.hadoop.mapred.lib.TotalOrderPartitioner;
  34. import org.apache.hadoop.util.Tool;
  35. import org.apache.hadoop.util.ToolRunner;
  36. /**
  37.  * This is the trivial map/reduce program that does absolutely nothing
  38.  * other than use the framework to fragment and sort the input values.
  39.  *
  40.  * To run: bin/hadoop jar build/hadoop-examples.jar sort
  41.  *            [-m <i>maps</i>] [-r <i>reduces</i>]
  42.  *            [-inFormat <i>input format class</i>] 
  43.  *            [-outFormat <i>output format class</i>] 
  44.  *            [-outKey <i>output key class</i>] 
  45.  *            [-outValue <i>output value class</i>] 
  46.  *            [-totalOrder <i>pcnt</i> <i>num samples</i> <i>max splits</i>]
  47.  *            <i>in-dir</i> <i>out-dir</i> 
  48.  */
  49. public class Sort<K,V> extends Configured implements Tool {
  50.   private RunningJob jobResult = null;
  51.   static int printUsage() {
  52.     System.out.println("sort [-m <maps>] [-r <reduces>] " +
  53.                        "[-inFormat <input format class>] " +
  54.                        "[-outFormat <output format class>] " + 
  55.                        "[-outKey <output key class>] " +
  56.                        "[-outValue <output value class>] " +
  57.                        "[-totalOrder <pcnt> <num samples> <max splits>] " +
  58.                        "<input> <output>");
  59.     ToolRunner.printGenericCommandUsage(System.out);
  60.     return -1;
  61.   }
  62.   /**
  63.    * The main driver for sort program.
  64.    * Invoke this method to submit the map/reduce job.
  65.    * @throws IOException When there is communication problems with the 
  66.    *                     job tracker.
  67.    */
  68.   public int run(String[] args) throws Exception {
  69.     JobConf jobConf = new JobConf(getConf(), Sort.class);
  70.     jobConf.setJobName("sorter");
  71.     jobConf.setMapperClass(IdentityMapper.class);        
  72.     jobConf.setReducerClass(IdentityReducer.class);
  73.     JobClient client = new JobClient(jobConf);
  74.     ClusterStatus cluster = client.getClusterStatus();
  75.     int num_reduces = (int) (cluster.getMaxReduceTasks() * 0.9);
  76.     String sort_reduces = jobConf.get("test.sort.reduces_per_host");
  77.     if (sort_reduces != null) {
  78.        num_reduces = cluster.getTaskTrackers() * 
  79.                        Integer.parseInt(sort_reduces);
  80.     }
  81.     Class<? extends InputFormat> inputFormatClass = 
  82.       SequenceFileInputFormat.class;
  83.     Class<? extends OutputFormat> outputFormatClass = 
  84.       SequenceFileOutputFormat.class;
  85.     Class<? extends WritableComparable> outputKeyClass = BytesWritable.class;
  86.     Class<? extends Writable> outputValueClass = BytesWritable.class;
  87.     List<String> otherArgs = new ArrayList<String>();
  88.     InputSampler.Sampler<K,V> sampler = null;
  89.     for(int i=0; i < args.length; ++i) {
  90.       try {
  91.         if ("-m".equals(args[i])) {
  92.           jobConf.setNumMapTasks(Integer.parseInt(args[++i]));
  93.         } else if ("-r".equals(args[i])) {
  94.           num_reduces = Integer.parseInt(args[++i]);
  95.         } else if ("-inFormat".equals(args[i])) {
  96.           inputFormatClass = 
  97.             Class.forName(args[++i]).asSubclass(InputFormat.class);
  98.         } else if ("-outFormat".equals(args[i])) {
  99.           outputFormatClass = 
  100.             Class.forName(args[++i]).asSubclass(OutputFormat.class);
  101.         } else if ("-outKey".equals(args[i])) {
  102.           outputKeyClass = 
  103.             Class.forName(args[++i]).asSubclass(WritableComparable.class);
  104.         } else if ("-outValue".equals(args[i])) {
  105.           outputValueClass = 
  106.             Class.forName(args[++i]).asSubclass(Writable.class);
  107.         } else if ("-totalOrder".equals(args[i])) {
  108.           double pcnt = Double.parseDouble(args[++i]);
  109.           int numSamples = Integer.parseInt(args[++i]);
  110.           int maxSplits = Integer.parseInt(args[++i]);
  111.           if (0 >= maxSplits) maxSplits = Integer.MAX_VALUE;
  112.           sampler =
  113.             new InputSampler.RandomSampler<K,V>(pcnt, numSamples, maxSplits);
  114.         } else {
  115.           otherArgs.add(args[i]);
  116.         }
  117.       } catch (NumberFormatException except) {
  118.         System.out.println("ERROR: Integer expected instead of " + args[i]);
  119.         return printUsage();
  120.       } catch (ArrayIndexOutOfBoundsException except) {
  121.         System.out.println("ERROR: Required parameter missing from " +
  122.             args[i-1]);
  123.         return printUsage(); // exits
  124.       }
  125.     }
  126.     // Set user-supplied (possibly default) job configs
  127.     jobConf.setNumReduceTasks(num_reduces);
  128.     jobConf.setInputFormat(inputFormatClass);
  129.     jobConf.setOutputFormat(outputFormatClass);
  130.     jobConf.setOutputKeyClass(outputKeyClass);
  131.     jobConf.setOutputValueClass(outputValueClass);
  132.     // Make sure there are exactly 2 parameters left.
  133.     if (otherArgs.size() != 2) {
  134.       System.out.println("ERROR: Wrong number of parameters: " +
  135.           otherArgs.size() + " instead of 2.");
  136.       return printUsage();
  137.     }
  138.     FileInputFormat.setInputPaths(jobConf, otherArgs.get(0));
  139.     FileOutputFormat.setOutputPath(jobConf, new Path(otherArgs.get(1)));
  140.     if (sampler != null) {
  141.       System.out.println("Sampling input to effect total-order sort...");
  142.       jobConf.setPartitionerClass(TotalOrderPartitioner.class);
  143.       Path inputDir = FileInputFormat.getInputPaths(jobConf)[0];
  144.       inputDir = inputDir.makeQualified(inputDir.getFileSystem(jobConf));
  145.       Path partitionFile = new Path(inputDir, "_sortPartitioning");
  146.       TotalOrderPartitioner.setPartitionFile(jobConf, partitionFile);
  147.       InputSampler.<K,V>writePartitionFile(jobConf, sampler);
  148.       URI partitionUri = new URI(partitionFile.toString() +
  149.                                  "#" + "_sortPartitioning");
  150.       DistributedCache.addCacheFile(partitionUri, jobConf);
  151.       DistributedCache.createSymlink(jobConf);
  152.     }
  153.     System.out.println("Running on " +
  154.         cluster.getTaskTrackers() +
  155.         " nodes to sort from " + 
  156.         FileInputFormat.getInputPaths(jobConf)[0] + " into " +
  157.         FileOutputFormat.getOutputPath(jobConf) +
  158.         " with " + num_reduces + " reduces.");
  159.     Date startTime = new Date();
  160.     System.out.println("Job started: " + startTime);
  161.     jobResult = JobClient.runJob(jobConf);
  162.     Date end_time = new Date();
  163.     System.out.println("Job ended: " + end_time);
  164.     System.out.println("The job took " + 
  165.         (end_time.getTime() - startTime.getTime()) /1000 + " seconds.");
  166.     return 0;
  167.   }
  168.   public static void main(String[] args) throws Exception {
  169.     int res = ToolRunner.run(new Configuration(), new Sort(), args);
  170.     System.exit(res);
  171.   }
  172.   /**
  173.    * Get the last job that was run using this instance.
  174.    * @return the results of the last job that was run
  175.    */
  176.   public RunningJob getResult() {
  177.     return jobResult;
  178.   }
  179. }