Submitter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.IOException;
  20. import java.net.URI;
  21. import java.net.URISyntaxException;
  22. import java.net.URL;
  23. import java.net.URLClassLoader;
  24. import java.security.AccessController;
  25. import java.security.PrivilegedAction;
  26. import java.util.StringTokenizer;
  27. import org.apache.commons.cli2.CommandLine;
  28. import org.apache.commons.cli2.OptionException;
  29. import org.apache.commons.cli2.builder.ArgumentBuilder;
  30. import org.apache.commons.cli2.builder.DefaultOptionBuilder;
  31. import org.apache.commons.cli2.builder.GroupBuilder;
  32. import org.apache.commons.cli2.commandline.Parser;
  33. import org.apache.commons.logging.Log;
  34. import org.apache.commons.logging.LogFactory;
  35. import org.apache.hadoop.conf.Configuration;
  36. import org.apache.hadoop.conf.Configured;
  37. import org.apache.hadoop.filecache.DistributedCache;
  38. import org.apache.hadoop.fs.FileSystem;
  39. import org.apache.hadoop.fs.Path;
  40. import org.apache.hadoop.io.Text;
  41. import org.apache.hadoop.mapred.FileInputFormat;
  42. import org.apache.hadoop.mapred.FileOutputFormat;
  43. import org.apache.hadoop.mapred.InputFormat;
  44. import org.apache.hadoop.mapred.JobClient;
  45. import org.apache.hadoop.mapred.JobConf;
  46. import org.apache.hadoop.mapred.Mapper;
  47. import org.apache.hadoop.mapred.OutputFormat;
  48. import org.apache.hadoop.mapred.Partitioner;
  49. import org.apache.hadoop.mapred.Reducer;
  50. import org.apache.hadoop.mapred.RunningJob;
  51. import org.apache.hadoop.mapred.lib.HashPartitioner;
  52. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  53. import org.apache.hadoop.util.GenericOptionsParser;
  54. import org.apache.hadoop.util.Tool;
  55. /**
  56.  * The main entry point and job submitter. It may either be used as a command
  57.  * line-based or API-based method to launch Pipes jobs.
  58.  */
  59. public class Submitter extends Configured implements Tool {
  60.   protected static final Log LOG = LogFactory.getLog(Submitter.class);
  61.   
  62.   public Submitter() {
  63.     this(new Configuration());
  64.   }
  65.   
  66.   public Submitter(Configuration conf) {
  67.     setConf(conf);
  68.   }
  69.   
  70.   /**
  71.    * Get the URI of the application's executable.
  72.    * @param conf
  73.    * @return the URI where the application's executable is located
  74.    */
  75.   public static String getExecutable(JobConf conf) {
  76.     return conf.get("hadoop.pipes.executable");
  77.   }
  78.   
  79.   /**
  80.    * Set the URI for the application's executable. Normally this is a hdfs: 
  81.    * location.
  82.    * @param conf
  83.    * @param executable The URI of the application's executable.
  84.    */
  85.   public static void setExecutable(JobConf conf, String executable) {
  86.     conf.set("hadoop.pipes.executable", executable);
  87.   }
  88.   /**
  89.    * Set whether the job is using a Java RecordReader.
  90.    * @param conf the configuration to modify
  91.    * @param value the new value
  92.    */
  93.   public static void setIsJavaRecordReader(JobConf conf, boolean value) {
  94.     conf.setBoolean("hadoop.pipes.java.recordreader", value);
  95.   }
  96.   /**
  97.    * Check whether the job is using a Java RecordReader
  98.    * @param conf the configuration to check
  99.    * @return is it a Java RecordReader?
  100.    */
  101.   public static boolean getIsJavaRecordReader(JobConf conf) {
  102.     return conf.getBoolean("hadoop.pipes.java.recordreader", false);
  103.   }
  104.   /**
  105.    * Set whether the Mapper is written in Java.
  106.    * @param conf the configuration to modify
  107.    * @param value the new value
  108.    */
  109.   public static void setIsJavaMapper(JobConf conf, boolean value) {
  110.     conf.setBoolean("hadoop.pipes.java.mapper", value);
  111.   }
  112.   /**
  113.    * Check whether the job is using a Java Mapper.
  114.    * @param conf the configuration to check
  115.    * @return is it a Java Mapper?
  116.    */
  117.   public static boolean getIsJavaMapper(JobConf conf) {
  118.     return conf.getBoolean("hadoop.pipes.java.mapper", false);
  119.   }
  120.   /**
  121.    * Set whether the Reducer is written in Java.
  122.    * @param conf the configuration to modify
  123.    * @param value the new value
  124.    */
  125.   public static void setIsJavaReducer(JobConf conf, boolean value) {
  126.     conf.setBoolean("hadoop.pipes.java.reducer", value);
  127.   }
  128.   /**
  129.    * Check whether the job is using a Java Reducer.
  130.    * @param conf the configuration to check
  131.    * @return is it a Java Reducer?
  132.    */
  133.   public static boolean getIsJavaReducer(JobConf conf) {
  134.     return conf.getBoolean("hadoop.pipes.java.reducer", false);
  135.   }
  136.   /**
  137.    * Set whether the job will use a Java RecordWriter.
  138.    * @param conf the configuration to modify
  139.    * @param value the new value to set
  140.    */
  141.   public static void setIsJavaRecordWriter(JobConf conf, boolean value) {
  142.     conf.setBoolean("hadoop.pipes.java.recordwriter", value);
  143.   }
  144.   /**
  145.    * Will the reduce use a Java RecordWriter?
  146.    * @param conf the configuration to check
  147.    * @return true, if the output of the job will be written by Java
  148.    */
  149.   public static boolean getIsJavaRecordWriter(JobConf conf) {
  150.     return conf.getBoolean("hadoop.pipes.java.recordwriter", false);
  151.   }
  152.   /**
  153.    * Set the configuration, if it doesn't already have a value for the given
  154.    * key.
  155.    * @param conf the configuration to modify
  156.    * @param key the key to set
  157.    * @param value the new "default" value to set
  158.    */
  159.   private static void setIfUnset(JobConf conf, String key, String value) {
  160.     if (conf.get(key) == null) {
  161.       conf.set(key, value);
  162.     }
  163.   }
  164.   /**
  165.    * Save away the user's original partitioner before we override it.
  166.    * @param conf the configuration to modify
  167.    * @param cls the user's partitioner class
  168.    */
  169.   static void setJavaPartitioner(JobConf conf, Class cls) {
  170.     conf.set("hadoop.pipes.partitioner", cls.getName());
  171.   }
  172.   
  173.   /**
  174.    * Get the user's original partitioner.
  175.    * @param conf the configuration to look in
  176.    * @return the class that the user submitted
  177.    */
  178.   static Class<? extends Partitioner> getJavaPartitioner(JobConf conf) {
  179.     return conf.getClass("hadoop.pipes.partitioner", 
  180.                          HashPartitioner.class,
  181.                          Partitioner.class);
  182.   }
  183.   /**
  184.    * Does the user want to keep the command file for debugging? If this is
  185.    * true, pipes will write a copy of the command data to a file in the
  186.    * task directory named "downlink.data", which may be used to run the C++
  187.    * program under the debugger. You probably also want to set 
  188.    * JobConf.setKeepFailedTaskFiles(true) to keep the entire directory from
  189.    * being deleted.
  190.    * To run using the data file, set the environment variable 
  191.    * "hadoop.pipes.command.file" to point to the file.
  192.    * @param conf the configuration to check
  193.    * @return will the framework save the command file?
  194.    */
  195.   public static boolean getKeepCommandFile(JobConf conf) {
  196.     return conf.getBoolean("hadoop.pipes.command-file.keep", false);
  197.   }
  198.   /**
  199.    * Set whether to keep the command file for debugging
  200.    * @param conf the configuration to modify
  201.    * @param keep the new value
  202.    */
  203.   public static void setKeepCommandFile(JobConf conf, boolean keep) {
  204.     conf.setBoolean("hadoop.pipes.command-file.keep", keep);
  205.   }
  206.   /**
  207.    * Submit a job to the map/reduce cluster. All of the necessary modifications
  208.    * to the job to run under pipes are made to the configuration.
  209.    * @param conf the job to submit to the cluster (MODIFIED)
  210.    * @throws IOException
  211.    * @deprecated Use {@link Submitter#runJob(JobConf)}
  212.    */
  213.   @Deprecated
  214.   public static RunningJob submitJob(JobConf conf) throws IOException {
  215.     return runJob(conf);
  216.   }
  217.   /**
  218.    * Submit a job to the map/reduce cluster. All of the necessary modifications
  219.    * to the job to run under pipes are made to the configuration.
  220.    * @param conf the job to submit to the cluster (MODIFIED)
  221.    * @throws IOException
  222.    */
  223.   public static RunningJob runJob(JobConf conf) throws IOException {
  224.     setupPipesJob(conf);
  225.     return JobClient.runJob(conf);
  226.   }
  227.   /**
  228.    * Submit a job to the Map-Reduce framework.
  229.    * This returns a handle to the {@link RunningJob} which can be used to track
  230.    * the running-job.
  231.    * 
  232.    * @param conf the job configuration.
  233.    * @return a handle to the {@link RunningJob} which can be used to track the
  234.    *         running-job.
  235.    * @throws IOException
  236.    */
  237.   public static RunningJob jobSubmit(JobConf conf) throws IOException {
  238.     setupPipesJob(conf);
  239.     return new JobClient(conf).submitJob(conf);
  240.   }
  241.   
  242.   private static void setupPipesJob(JobConf conf) throws IOException {
  243.     // default map output types to Text
  244.     if (!getIsJavaMapper(conf)) {
  245.       conf.setMapRunnerClass(PipesMapRunner.class);
  246.       // Save the user's partitioner and hook in our's.
  247.       setJavaPartitioner(conf, conf.getPartitionerClass());
  248.       conf.setPartitionerClass(PipesPartitioner.class);
  249.     }
  250.     if (!getIsJavaReducer(conf)) {
  251.       conf.setReducerClass(PipesReducer.class);
  252.       if (!getIsJavaRecordWriter(conf)) {
  253.         conf.setOutputFormat(NullOutputFormat.class);
  254.       }
  255.     }
  256.     String textClassname = Text.class.getName();
  257.     setIfUnset(conf, "mapred.mapoutput.key.class", textClassname);
  258.     setIfUnset(conf, "mapred.mapoutput.value.class", textClassname);
  259.     setIfUnset(conf, "mapred.output.key.class", textClassname);
  260.     setIfUnset(conf, "mapred.output.value.class", textClassname);
  261.     
  262.     // Use PipesNonJavaInputFormat if necessary to handle progress reporting
  263.     // from C++ RecordReaders ...
  264.     if (!getIsJavaRecordReader(conf) && !getIsJavaMapper(conf)) {
  265.       conf.setClass("mapred.pipes.user.inputformat", 
  266.                     conf.getInputFormat().getClass(), InputFormat.class);
  267.       conf.setInputFormat(PipesNonJavaInputFormat.class);
  268.     }
  269.     
  270.     String exec = getExecutable(conf);
  271.     if (exec == null) {
  272.       throw new IllegalArgumentException("No application program defined.");
  273.     }
  274.     // add default debug script only when executable is expressed as
  275.     // <path>#<executable>
  276.     if (exec.contains("#")) {
  277.       DistributedCache.createSymlink(conf);
  278.       // set default gdb commands for map and reduce task 
  279.       String defScript = "$HADOOP_HOME/src/c++/pipes/debug/pipes-default-script";
  280.       setIfUnset(conf,"mapred.map.task.debug.script",defScript);
  281.       setIfUnset(conf,"mapred.reduce.task.debug.script",defScript);
  282.     }
  283.     URI[] fileCache = DistributedCache.getCacheFiles(conf);
  284.     if (fileCache == null) {
  285.       fileCache = new URI[1];
  286.     } else {
  287.       URI[] tmp = new URI[fileCache.length+1];
  288.       System.arraycopy(fileCache, 0, tmp, 1, fileCache.length);
  289.       fileCache = tmp;
  290.     }
  291.     try {
  292.       fileCache[0] = new URI(exec);
  293.     } catch (URISyntaxException e) {
  294.       IOException ie = new IOException("Problem parsing execable URI " + exec);
  295.       ie.initCause(e);
  296.       throw ie;
  297.     }
  298.     DistributedCache.setCacheFiles(fileCache, conf);
  299.   }
  300.   /**
  301.    * A command line parser for the CLI-based Pipes job submitter.
  302.    */
  303.   static class CommandLineParser {
  304.     private DefaultOptionBuilder option = 
  305.       new DefaultOptionBuilder("-","-", false);
  306.     private ArgumentBuilder arg = new ArgumentBuilder();
  307.     private GroupBuilder optionList = new GroupBuilder();
  308.     
  309.     void addOption(String longName, boolean required, String description, 
  310.                    String paramName) {
  311.       arg.withName(paramName).withMinimum(1).withMaximum(1);
  312.       optionList.withOption(option.withLongName(longName).
  313.                                    withArgument(arg.create()).
  314.                                    withDescription(description).
  315.                                    withRequired(required).create());
  316.     }
  317.     
  318.     void addArgument(String name, boolean required, String description) {
  319.       arg.withName(name).withMinimum(1).withMaximum(1);
  320.       optionList.withOption(arg.create());
  321.     }
  322.     Parser createParser() {
  323.       Parser result = new Parser();
  324.       result.setGroup(optionList.create());
  325.       return result;
  326.     }
  327.     
  328.     void printUsage() {
  329.       // The CLI package should do this for us, but I can't figure out how
  330.       // to make it print something reasonable.
  331.       System.out.println("bin/hadoop pipes");
  332.       System.out.println("  [-input <path>] // Input directory");
  333.       System.out.println("  [-output <path>] // Output directory");
  334.       System.out.println("  [-jar <jar file> // jar filename");
  335.       System.out.println("  [-inputformat <class>] // InputFormat class");
  336.       System.out.println("  [-map <class>] // Java Map class");
  337.       System.out.println("  [-partitioner <class>] // Java Partitioner");
  338.       System.out.println("  [-reduce <class>] // Java Reduce class");
  339.       System.out.println("  [-writer <class>] // Java RecordWriter");
  340.       System.out.println("  [-program <executable>] // executable URI");
  341.       System.out.println("  [-reduces <num>] // number of reduces");
  342.       System.out.println();
  343.       GenericOptionsParser.printGenericCommandUsage(System.out);
  344.     }
  345.   }
  346.   
  347.   private static <InterfaceType> 
  348.   Class<? extends InterfaceType> getClass(CommandLine cl, String key, 
  349.                                           JobConf conf, 
  350.                                           Class<InterfaceType> cls
  351.                                          ) throws ClassNotFoundException {
  352.     return conf.getClassByName((String) cl.getValue(key)).asSubclass(cls);
  353.   }
  354.   @Override
  355.   public int run(String[] args) throws Exception {
  356.     CommandLineParser cli = new CommandLineParser();
  357.     if (args.length == 0) {
  358.       cli.printUsage();
  359.       return 1;
  360.     }
  361.     cli.addOption("input", false, "input path to the maps", "path");
  362.     cli.addOption("output", false, "output path from the reduces", "path");
  363.     
  364.     cli.addOption("jar", false, "job jar file", "path");
  365.     cli.addOption("inputformat", false, "java classname of InputFormat", 
  366.                   "class");
  367.     //cli.addArgument("javareader", false, "is the RecordReader in Java");
  368.     cli.addOption("map", false, "java classname of Mapper", "class");
  369.     cli.addOption("partitioner", false, "java classname of Partitioner", 
  370.                   "class");
  371.     cli.addOption("reduce", false, "java classname of Reducer", "class");
  372.     cli.addOption("writer", false, "java classname of OutputFormat", "class");
  373.     cli.addOption("program", false, "URI to application executable", "class");
  374.     cli.addOption("reduces", false, "number of reduces", "num");
  375.     cli.addOption("jobconf", false, 
  376.         ""n1=v1,n2=v2,.." (Deprecated) Optional. Add or override a JobConf property.",
  377.         "key=val");
  378.     Parser parser = cli.createParser();
  379.     try {
  380.       
  381.       GenericOptionsParser genericParser = new GenericOptionsParser(getConf(), args);
  382.       CommandLine results = parser.parse(genericParser.getRemainingArgs());
  383.       
  384.       JobConf job = new JobConf(getConf());
  385.       
  386.       if (results.hasOption("-input")) {
  387.         FileInputFormat.setInputPaths(job, 
  388.                           (String) results.getValue("-input"));
  389.       }
  390.       if (results.hasOption("-output")) {
  391.         FileOutputFormat.setOutputPath(job, 
  392.           new Path((String) results.getValue("-output")));
  393.       }
  394.       if (results.hasOption("-jar")) {
  395.         job.setJar((String) results.getValue("-jar"));
  396.       }
  397.       if (results.hasOption("-inputformat")) {
  398.         setIsJavaRecordReader(job, true);
  399.         job.setInputFormat(getClass(results, "-inputformat", job,
  400.                                      InputFormat.class));
  401.       }
  402.       if (results.hasOption("-javareader")) {
  403.         setIsJavaRecordReader(job, true);
  404.       }
  405.       if (results.hasOption("-map")) {
  406.         setIsJavaMapper(job, true);
  407.         job.setMapperClass(getClass(results, "-map", job, Mapper.class));
  408.       }
  409.       if (results.hasOption("-partitioner")) {
  410.         job.setPartitionerClass(getClass(results, "-partitioner", job,
  411.                                           Partitioner.class));
  412.       }
  413.       if (results.hasOption("-reduce")) {
  414.         setIsJavaReducer(job, true);
  415.         job.setReducerClass(getClass(results, "-reduce", job, Reducer.class));
  416.       }
  417.       if (results.hasOption("-reduces")) {
  418.         job.setNumReduceTasks(Integer.parseInt((String) 
  419.                                                 results.getValue("-reduces")));
  420.       }
  421.       if (results.hasOption("-writer")) {
  422.         setIsJavaRecordWriter(job, true);
  423.         job.setOutputFormat(getClass(results, "-writer", job, 
  424.                                       OutputFormat.class));
  425.       }
  426.       if (results.hasOption("-program")) {
  427.         setExecutable(job, (String) results.getValue("-program"));
  428.       }
  429.       if (results.hasOption("-jobconf")) {
  430.         LOG.warn("-jobconf option is deprecated, please use -D instead.");
  431.         String options = (String)results.getValue("-jobconf");
  432.         StringTokenizer tokenizer = new StringTokenizer(options, ",");
  433.         while (tokenizer.hasMoreTokens()) {
  434.           String keyVal = tokenizer.nextToken().trim();
  435.           String[] keyValSplit = keyVal.split("=");
  436.           job.set(keyValSplit[0], keyValSplit[1]);
  437.         }
  438.       }
  439.       // if they gave us a jar file, include it into the class path
  440.       String jarFile = job.getJar();
  441.       if (jarFile != null) {
  442.         final URL[] urls = new URL[]{ FileSystem.getLocal(job).
  443.             pathToFile(new Path(jarFile)).toURL()};
  444.         //FindBugs complains that creating a URLClassLoader should be
  445.         //in a doPrivileged() block. 
  446.         ClassLoader loader =
  447.           AccessController.doPrivileged(
  448.               new PrivilegedAction<ClassLoader>() {
  449.                 public ClassLoader run() {
  450.                   return new URLClassLoader(urls);
  451.                 }
  452.               }
  453.             );
  454.         job.setClassLoader(loader);
  455.       }
  456.       
  457.       runJob(job);
  458.       return 0;
  459.     } catch (OptionException oe) {
  460.       cli.printUsage();
  461.       return 1;
  462.     }
  463.     
  464.   }
  465.   
  466.   /**
  467.    * Submit a pipes job based on the command line arguments.
  468.    * @param args
  469.    */
  470.   public static void main(String[] args) throws Exception {
  471.     new Submitter().run(args);
  472.   }
  473. }