StreamJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:39k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.File;
  20. import java.io.FileNotFoundException;
  21. import java.io.IOException;
  22. import java.net.URI;
  23. import java.net.URLEncoder;
  24. import java.util.ArrayList;
  25. import java.util.Arrays;
  26. import java.util.Iterator;
  27. import java.util.List;
  28. import java.util.ListIterator;
  29. import java.util.Map;
  30. import java.util.TreeMap;
  31. import java.util.TreeSet;
  32. import org.apache.commons.cli2.Argument;
  33. import org.apache.commons.cli2.CommandLine;
  34. import org.apache.commons.cli2.Group;
  35. import org.apache.commons.cli2.Option;
  36. import org.apache.commons.cli2.OptionException;
  37. import org.apache.commons.cli2.WriteableCommandLine;
  38. import org.apache.commons.cli2.builder.ArgumentBuilder;
  39. import org.apache.commons.cli2.builder.DefaultOptionBuilder;
  40. import org.apache.commons.cli2.builder.GroupBuilder;
  41. import org.apache.commons.cli2.commandline.Parser;
  42. import org.apache.commons.cli2.option.PropertyOption;
  43. import org.apache.commons.cli2.resource.ResourceConstants;
  44. import org.apache.commons.cli2.util.HelpFormatter;
  45. import org.apache.commons.cli2.validation.InvalidArgumentException;
  46. import org.apache.commons.cli2.validation.Validator;
  47. import org.apache.commons.logging.Log;
  48. import org.apache.commons.logging.LogFactory;
  49. import org.apache.hadoop.conf.Configuration;
  50. import org.apache.hadoop.filecache.DistributedCache;
  51. import org.apache.hadoop.fs.Path;
  52. import org.apache.hadoop.io.Text;
  53. import org.apache.hadoop.mapred.FileAlreadyExistsException;
  54. import org.apache.hadoop.mapred.FileInputFormat;
  55. import org.apache.hadoop.mapred.FileOutputFormat;
  56. import org.apache.hadoop.mapred.InvalidJobConfException;
  57. import org.apache.hadoop.mapred.JobClient;
  58. import org.apache.hadoop.mapred.JobConf;
  59. import org.apache.hadoop.mapred.JobID;
  60. import org.apache.hadoop.mapred.KeyValueTextInputFormat;
  61. import org.apache.hadoop.mapred.RunningJob;
  62. import org.apache.hadoop.mapred.SequenceFileAsTextInputFormat;
  63. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  64. import org.apache.hadoop.mapred.TextInputFormat;
  65. import org.apache.hadoop.mapred.TextOutputFormat;
  66. import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorCombiner;
  67. import org.apache.hadoop.mapred.lib.aggregate.ValueAggregatorReducer;
  68. import org.apache.hadoop.util.GenericOptionsParser;
  69. import org.apache.hadoop.util.StringUtils;
  70. import org.apache.hadoop.util.Tool;
  71. /** All the client-side work happens here.
  72.  * (Jar packaging, MapRed job submission and monitoring)
  73.  */
  74. public class StreamJob implements Tool {
  75.   protected static final Log LOG = LogFactory.getLog(StreamJob.class.getName());
  76.   final static String REDUCE_NONE = "NONE";
  77.     
  78.   /** -----------Streaming CLI Implementation  **/
  79.   private DefaultOptionBuilder builder = 
  80.     new DefaultOptionBuilder("-","-", false);
  81.   private ArgumentBuilder argBuilder = new ArgumentBuilder(); 
  82.   private Parser parser = new Parser(); 
  83.   private Group allOptions; 
  84.   HelpFormatter helpFormatter = new HelpFormatter("  ", "  ", "  ", 900);
  85.   // need these two at class level to extract values later from 
  86.   // commons-cli command line
  87.   private MultiPropertyOption jobconf = new MultiPropertyOption(
  88.                                                                 "-jobconf", "(n=v) Optional. Add or override a JobConf property.", 'D'); 
  89.   private MultiPropertyOption cmdenv = new MultiPropertyOption(
  90.                                                                "-cmdenv", "(n=v) Pass env.var to streaming commands.", 'E');  
  91.   /**@deprecated use StreamJob() with ToolRunner or set the 
  92.    * Configuration using {@link #setConf(Configuration)} and 
  93.    * run with {@link #run(String[])}.  
  94.    */
  95.   @Deprecated
  96.   public StreamJob(String[] argv, boolean mayExit) {
  97.     this();
  98.     argv_ = argv;
  99.     this.config_ = new Configuration();
  100.   }
  101.   
  102.   public StreamJob() {
  103.     setupOptions();
  104.     this.config_ = new Configuration();
  105.   }
  106.   
  107.   @Override
  108.   public Configuration getConf() {
  109.     return config_;
  110.   }
  111.   @Override
  112.   public void setConf(Configuration conf) {
  113.     this.config_ = conf;
  114.   }
  115.   
  116.   @Override
  117.   public int run(String[] args) throws Exception {
  118.     try {
  119.       this.argv_ = args;
  120.       init();
  121.   
  122.       preProcessArgs();
  123.       parseArgv();
  124.       postProcessArgs();
  125.   
  126.       setJobConf();
  127.       return submitAndMonitorJob();
  128.     }catch (IllegalArgumentException ex) {
  129.       //ignore, since log will already be printed
  130.       return 1;
  131.     }
  132.   }
  133.   
  134.   /**
  135.    * This method creates a streaming job from the given argument list.
  136.    * The created object can be used and/or submitted to a jobtracker for 
  137.    * execution by a job agent such as JobControl
  138.    * @param argv the list args for creating a streaming job
  139.    * @return the created JobConf object 
  140.    * @throws IOException
  141.    */
  142.   static public JobConf createJob(String[] argv) throws IOException {
  143.     StreamJob job = new StreamJob();
  144.     job.argv_ = argv;
  145.     job.init();
  146.     job.preProcessArgs();
  147.     job.parseArgv();
  148.     job.postProcessArgs();
  149.     job.setJobConf();
  150.     return job.jobConf_;
  151.   }
  152.   /**
  153.    * This is the method that actually 
  154.    * intializes the job conf and submits the job
  155.    * to the jobtracker
  156.    * @throws IOException
  157.    * @deprecated use {@link #run(String[])} instead.
  158.    */
  159.   @Deprecated
  160.   public int go() throws IOException {
  161.     try {
  162.       return run(argv_);
  163.     }
  164.     catch (Exception ex) {
  165.       throw new IOException(ex.getMessage());
  166.     }
  167.   }
  168.   
  169.   protected void init() {
  170.     try {
  171.       env_ = new Environment();
  172.     } catch (IOException io) {
  173.       throw new RuntimeException(io);
  174.     }
  175.   }
  176.   void preProcessArgs() {
  177.     verbose_ = false;
  178.     addTaskEnvironment_ = "";
  179.   }
  180.   void postProcessArgs() throws IOException {
  181.     
  182.     if (inputSpecs_.size() == 0) {
  183.       fail("Required argument: -input <name>");
  184.     }
  185.     if (output_ == null) {
  186.       fail("Required argument: -output ");
  187.     }
  188.     msg("addTaskEnvironment=" + addTaskEnvironment_);
  189.     Iterator it = packageFiles_.iterator();
  190.     while (it.hasNext()) {
  191.       File f = new File((String) it.next());
  192.       if (f.isFile()) {
  193.         shippedCanonFiles_.add(f.getCanonicalPath());
  194.       }
  195.     }
  196.     msg("shippedCanonFiles_=" + shippedCanonFiles_);
  197.     // careful with class names..
  198.     mapCmd_ = unqualifyIfLocalPath(mapCmd_);
  199.     comCmd_ = unqualifyIfLocalPath(comCmd_);
  200.     redCmd_ = unqualifyIfLocalPath(redCmd_);
  201.   }
  202.   String unqualifyIfLocalPath(String cmd) throws IOException {
  203.     if (cmd == null) {
  204.       //
  205.     } else {
  206.       String prog = cmd;
  207.       String args = "";
  208.       int s = cmd.indexOf(" ");
  209.       if (s != -1) {
  210.         prog = cmd.substring(0, s);
  211.         args = cmd.substring(s + 1);
  212.       }
  213.       String progCanon;
  214.       try {
  215.         progCanon = new File(prog).getCanonicalPath();
  216.       } catch (IOException io) {
  217.         progCanon = prog;
  218.       }
  219.       boolean shipped = shippedCanonFiles_.contains(progCanon);
  220.       msg("shipped: " + shipped + " " + progCanon);
  221.       if (shipped) {
  222.         // Change path to simple filename.
  223.         // That way when PipeMapRed calls Runtime.exec(),
  224.         // it will look for the excutable in Task's working dir.
  225.         // And this is where TaskRunner unjars our job jar.
  226.         prog = new File(prog).getName();
  227.         if (args.length() > 0) {
  228.           cmd = prog + " " + args;
  229.         } else {
  230.           cmd = prog;
  231.         }
  232.       }
  233.     }
  234.     msg("cmd=" + cmd);
  235.     return cmd;
  236.   }
  237.   void parseArgv(){
  238.     CommandLine cmdLine = null; 
  239.     try{
  240.       cmdLine = parser.parse(argv_);
  241.     }catch(Exception oe){
  242.       LOG.error(oe.getMessage());
  243.       exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
  244.     }
  245.     
  246.     if (cmdLine != null){
  247.       verbose_ =  cmdLine.hasOption("-verbose");
  248.       detailedUsage_ = cmdLine.hasOption("-info");
  249.       debug_ = cmdLine.hasOption("-debug")? debug_ + 1 : debug_;
  250.       
  251.       inputSpecs_.addAll(cmdLine.getValues("-input"));
  252.       output_ = (String) cmdLine.getValue("-output"); 
  253.       
  254.       mapCmd_ = (String)cmdLine.getValue("-mapper"); 
  255.       comCmd_ = (String)cmdLine.getValue("-combiner"); 
  256.       redCmd_ = (String)cmdLine.getValue("-reducer"); 
  257.       
  258.       if(!cmdLine.getValues("-file").isEmpty()) {
  259.         packageFiles_.addAll(cmdLine.getValues("-file"));
  260.       }
  261.          
  262.       String fsName = (String)cmdLine.getValue("-dfs");
  263.       if (null != fsName){
  264.         LOG.warn("-dfs option is deprecated, please use -fs instead.");
  265.         config_.set("fs.default.name", fsName);
  266.       }
  267.       
  268.       additionalConfSpec_ = (String)cmdLine.getValue("-additionalconfspec"); 
  269.       inputFormatSpec_ = (String)cmdLine.getValue("-inputformat"); 
  270.       outputFormatSpec_ = (String)cmdLine.getValue("-outputformat");
  271.       numReduceTasksSpec_ = (String)cmdLine.getValue("-numReduceTasks"); 
  272.       partitionerSpec_ = (String)cmdLine.getValue("-partitioner");
  273.       inReaderSpec_ = (String)cmdLine.getValue("-inputreader"); 
  274.       mapDebugSpec_ = (String)cmdLine.getValue("-mapdebug");    
  275.       reduceDebugSpec_ = (String)cmdLine.getValue("-reducedebug");
  276.       
  277.       List<String> car = cmdLine.getValues("-cacheArchive"); 
  278.       if (null != car && !car.isEmpty()){
  279.         LOG.warn("-cacheArchive option is deprecated, please use -archives instead.");
  280.         for(String s : car){
  281.           cacheArchives = (cacheArchives == null)?s :cacheArchives + "," + s;  
  282.         }
  283.       }
  284.       List<String> caf = cmdLine.getValues("-cacheFile"); 
  285.       if (null != caf && !caf.isEmpty()){
  286.         LOG.warn("-cacheFile option is deprecated, please use -files instead.");
  287.         for(String s : caf){
  288.           cacheFiles = (cacheFiles == null)?s :cacheFiles + "," + s;  
  289.         }
  290.       }
  291.       
  292.       List<String> jobConfArgs = (List<String>)cmdLine.getValue(jobconf); 
  293.       List<String> envArgs = (List<String>)cmdLine.getValue(cmdenv); 
  294.       
  295.       if (null != jobConfArgs && !jobConfArgs.isEmpty()){
  296.         LOG.warn("-jobconf option is deprecated, please use -D instead.");
  297.         for(String s : jobConfArgs){
  298.           String []parts = s.split("=", 2); 
  299.           config_.set(parts[0], parts[1]);
  300.         }
  301.       }
  302.       if (null != envArgs){
  303.         for(String s : envArgs){
  304.           if (addTaskEnvironment_.length() > 0) {
  305.             addTaskEnvironment_ += " ";
  306.           }
  307.           addTaskEnvironment_ += s;
  308.         }
  309.       }
  310.     }else {
  311.       exitUsage(argv_.length > 0 && "-info".equals(argv_[0]));
  312.     }
  313.   }
  314.   protected void msg(String msg) {
  315.     if (verbose_) {
  316.       System.out.println("STREAM: " + msg);
  317.     }
  318.   }
  319.   
  320.   private Option createOption(String name, String desc, 
  321.                               String argName, int max, boolean required){
  322.     Argument argument = argBuilder.
  323.       withName(argName).
  324.       withMinimum(1).
  325.       withMaximum(max).
  326.       create();
  327.     return builder.
  328.       withLongName(name).
  329.       withArgument(argument).
  330.       withDescription(desc).
  331.       withRequired(required).
  332.       create();
  333.   }
  334.   
  335.   private Option createOption(String name, String desc, 
  336.                               String argName, int max, boolean required, Validator validator){
  337.     
  338.     Argument argument = argBuilder.
  339.       withName(argName).
  340.       withMinimum(1).
  341.       withMaximum(max).
  342.       withValidator(validator).
  343.       create();
  344.    
  345.     return builder.
  346.       withLongName(name).
  347.       withArgument(argument).
  348.       withDescription(desc).
  349.       withRequired(required).
  350.       create();
  351.   }  
  352.   
  353.   private Option createBoolOption(String name, String desc){
  354.     return builder.withLongName(name).withDescription(desc).create();
  355.   }
  356.   
  357.   private void setupOptions(){
  358.     final Validator fileValidator = new Validator(){
  359.         public void validate(final List values) throws InvalidArgumentException {
  360.           // Note : This code doesnt belong here, it should be changed to 
  361.           // an can exec check in java 6
  362.           for (String file : (List<String>)values) {
  363.             File f = new File(file);  
  364.             if (!f.exists()) {
  365.               throw new InvalidArgumentException("Argument : " + 
  366.                                                  f.getAbsolutePath() + " doesn't exist."); 
  367.             }
  368.             if (!f.isFile()) {
  369.               throw new InvalidArgumentException("Argument : " + 
  370.                                                  f.getAbsolutePath() + " is not a file."); 
  371.             }
  372.             if (!f.canRead()) {
  373.               throw new InvalidArgumentException("Argument : " + 
  374.                                                  f.getAbsolutePath() + " is not accessible"); 
  375.             }
  376.           }
  377.         }      
  378.       }; 
  379.     // Note: not extending CLI2's FileValidator, that overwrites 
  380.     // the String arg into File and causes ClassCastException 
  381.     // in inheritance tree. 
  382.     final Validator execValidator = new Validator(){
  383.         public void validate(final List values) throws InvalidArgumentException {
  384.           // Note : This code doesnt belong here, it should be changed to 
  385.           // an can exec check in java 6
  386.           for (String file : (List<String>)values) {
  387.             try{
  388.               Runtime.getRuntime().exec("chmod 0777 " + (new File(file)).getAbsolutePath());
  389.             }catch(IOException ioe){
  390.               // ignore 
  391.             }
  392.           }
  393.           fileValidator.validate(values);
  394.         }      
  395.       }; 
  396.     Option input   = createOption("input", 
  397.                                   "DFS input file(s) for the Map step", 
  398.                                   "path", 
  399.                                   Integer.MAX_VALUE, 
  400.                                   true);  
  401.     
  402.     Option output  = createOption("output", 
  403.                                   "DFS output directory for the Reduce step", 
  404.                                   "path", 1, true); 
  405.     Option mapper  = createOption("mapper", 
  406.                                   "The streaming command to run", "cmd", 1, false);
  407.     Option combiner = createOption("combiner", 
  408.                                    "The streaming command to run", "cmd", 1, false);
  409.     // reducer could be NONE 
  410.     Option reducer = createOption("reducer", 
  411.                                   "The streaming command to run", "cmd", 1, false); 
  412.     Option file = createOption("file", 
  413.                                "File/dir to be shipped in the Job jar file", 
  414.                                "file", Integer.MAX_VALUE, false, execValidator); 
  415.     Option dfs = createOption("dfs", 
  416.                               "Optional. Override DFS configuration", "<h:p>|local", 1, false); 
  417.     Option jt = createOption("jt", 
  418.                              "Optional. Override JobTracker configuration", "<h:p>|local", 1, false);
  419.     Option additionalconfspec = createOption("additionalconfspec", 
  420.                                              "Optional.", "spec", 1, false);
  421.     Option inputformat = createOption("inputformat", 
  422.                                       "Optional.", "spec", 1, false);
  423.     Option outputformat = createOption("outputformat", 
  424.                                        "Optional.", "spec", 1, false);
  425.     Option partitioner = createOption("partitioner", 
  426.                                       "Optional.", "spec", 1, false);
  427.     Option numReduceTasks = createOption("numReduceTasks", 
  428.         "Optional.", "spec",1, false );
  429.     Option inputreader = createOption("inputreader", 
  430.                                       "Optional.", "spec", 1, false);
  431.     Option mapDebug = createOption("mapdebug",
  432.                                    "Optional.", "spec", 1, false);
  433.     Option reduceDebug = createOption("reducedebug",
  434.                                       "Optional", "spec",1, false);
  435.     Option cacheFile = createOption("cacheFile", 
  436.                                     "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
  437.     Option cacheArchive = createOption("cacheArchive", 
  438.                                        "File name URI", "fileNameURI", Integer.MAX_VALUE, false);
  439.     
  440.     // boolean properties
  441.     
  442.     Option verbose = createBoolOption("verbose", "print verbose output"); 
  443.     Option info = createBoolOption("info", "print verbose output"); 
  444.     Option help = createBoolOption("help", "print this help message"); 
  445.     Option debug = createBoolOption("debug", "print debug output"); 
  446.     Option inputtagged = createBoolOption("inputtagged", "inputtagged"); 
  447.     
  448.     allOptions = new GroupBuilder().
  449.       withOption(input).
  450.       withOption(output).
  451.       withOption(mapper).
  452.       withOption(combiner).
  453.       withOption(reducer).
  454.       withOption(file).
  455.       withOption(dfs).
  456.       withOption(jt).
  457.       withOption(additionalconfspec).
  458.       withOption(inputformat).
  459.       withOption(outputformat).
  460.       withOption(partitioner).
  461.       withOption(numReduceTasks).
  462.       withOption(inputreader).
  463.       withOption(mapDebug).
  464.       withOption(reduceDebug).
  465.       withOption(jobconf).
  466.       withOption(cmdenv).
  467.       withOption(cacheFile).
  468.       withOption(cacheArchive).
  469.       withOption(verbose).
  470.       withOption(info).
  471.       withOption(debug).
  472.       withOption(inputtagged).
  473.       withOption(help).
  474.       create();
  475.     parser.setGroup(allOptions);
  476.     
  477.   }
  478.   public void exitUsage(boolean detailed) {
  479.     //         1         2         3         4         5         6         7
  480.     //1234567890123456789012345678901234567890123456789012345678901234567890123456789
  481.     
  482.     System.out.println("Usage: $HADOOP_HOME/bin/hadoop jar \");
  483.     System.out.println("          $HADOOP_HOME/hadoop-streaming.jar [options]");
  484.     System.out.println("Options:");
  485.     System.out.println("  -input    <path>     DFS input file(s) for the Map step");
  486.     System.out.println("  -output   <path>     DFS output directory for the Reduce step");
  487.     System.out.println("  -mapper   <cmd|JavaClassName>      The streaming command to run");
  488.     System.out.println("  -combiner <JavaClassName> Combiner has to be a Java class");
  489.     System.out.println("  -reducer  <cmd|JavaClassName>      The streaming command to run");
  490.     System.out.println("  -file     <file>     File/dir to be shipped in the Job jar file");
  491.     System.out.println("  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName Optional.");
  492.     System.out.println("  -outputformat TextOutputFormat(default)|JavaClassName  Optional.");
  493.     System.out.println("  -partitioner JavaClassName  Optional.");
  494.     System.out.println("  -numReduceTasks <num>  Optional.");
  495.     System.out.println("  -inputreader <spec>  Optional.");
  496.     System.out.println("  -cmdenv   <n>=<v>    Optional. Pass env.var to streaming commands");
  497.     System.out.println("  -mapdebug <path>  Optional. " +
  498.     "To run this script when a map task fails ");
  499.     System.out.println("  -reducedebug <path>  Optional." +
  500.     " To run this script when a reduce task fails ");
  501.     System.out.println("  -verbose");
  502.     System.out.println();
  503.     GenericOptionsParser.printGenericCommandUsage(System.out);
  504.     if (!detailed) {
  505.       System.out.println();      
  506.       System.out.println("For more details about these options:");
  507.       System.out.println("Use $HADOOP_HOME/bin/hadoop jar build/hadoop-streaming.jar -info");
  508.       fail("");
  509.     }
  510.     System.out.println();
  511.     System.out.println("In -input: globbing on <path> is supported and can have multiple -input");
  512.     System.out.println("Default Map input format: a line is a record in UTF-8");
  513.     System.out.println("  the key part ends at first TAB, the rest of the line is the value");
  514.     System.out.println("Custom input format: -inputformat package.MyInputFormat ");
  515.     System.out.println("Map output format, reduce input/output format:");
  516.     System.out.println("  Format defined by what the mapper command outputs. Line-oriented");
  517.     System.out.println();
  518.     System.out.println("The files or directories named in the -file argument[s] end up in the");
  519.     System.out.println("  working directory when the mapper and reducer are run.");
  520.     System.out.println("  The location of this working directory is unspecified.");
  521.     System.out.println();
  522.     System.out.println("To set the number of reduce tasks (num. of output files):");
  523.     System.out.println("  -D mapred.reduce.tasks=10");
  524.     System.out.println("To skip the sort/combine/shuffle/sort/reduce step:");
  525.     System.out.println("  Use -numReduceTasks 0");
  526.     System.out
  527.       .println("  A Task's Map output then becomes a 'side-effect output' rather than a reduce input");
  528.     System.out
  529.       .println("  This speeds up processing, This also feels more like "in-place" processing");
  530.     System.out.println("  because the input filename and the map input order are preserved");
  531.     System.out.println("  This equivalent -reducer NONE");
  532.     System.out.println();
  533.     System.out.println("To speed up the last maps:");
  534.     System.out.println("  -D mapred.map.tasks.speculative.execution=true");
  535.     System.out.println("To speed up the last reduces:");
  536.     System.out.println("  -D mapred.reduce.tasks.speculative.execution=true");
  537.     System.out.println("To name the job (appears in the JobTracker Web UI):");
  538.     System.out.println("  -D mapred.job.name='My Job' ");
  539.     System.out.println("To change the local temp directory:");
  540.     System.out.println("  -D dfs.data.dir=/tmp/dfs");
  541.     System.out.println("  -D stream.tmpdir=/tmp/streaming");
  542.     System.out.println("Additional local temp directories with -cluster local:");
  543.     System.out.println("  -D mapred.local.dir=/tmp/local");
  544.     System.out.println("  -D mapred.system.dir=/tmp/system");
  545.     System.out.println("  -D mapred.temp.dir=/tmp/temp");
  546.     System.out.println("To treat tasks with non-zero exit status as SUCCEDED:");    
  547.     System.out.println("  -D stream.non.zero.exit.is.failure=false");
  548.     System.out.println("Use a custom hadoopStreaming build along a standard hadoop install:");
  549.     System.out.println("  $HADOOP_HOME/bin/hadoop jar /path/my-hadoop-streaming.jar [...]\");
  550.     System.out
  551.       .println("    [...] -D stream.shipped.hadoopstreaming=/path/my-hadoop-streaming.jar");
  552.     System.out.println("For more details about jobconf parameters see:");
  553.     System.out.println("  http://wiki.apache.org/hadoop/JobConfFile");
  554.     System.out.println("To set an environement variable in a streaming command:");
  555.     System.out.println("   -cmdenv EXAMPLE_DIR=/home/example/dictionaries/");
  556.     System.out.println();
  557.     System.out.println("Shortcut:");
  558.     System.out
  559.       .println("   setenv HSTREAMING "$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar"");
  560.     System.out.println();
  561.     System.out.println("Example: $HSTREAMING -mapper "/usr/local/bin/perl5 filter.pl"");
  562.     System.out.println("           -file /local/filter.pl -input "/logs/0604*/*" [...]");
  563.     System.out.println("  Ships a script, invokes the non-shipped perl interpreter");
  564.     System.out.println("  Shipped files go to the working directory so filter.pl is found by perl");
  565.     System.out.println("  Input files are all the daily logs for days in month 2006-04");
  566.     fail("");
  567.   }
  568.   public void fail(String message) {    
  569.     System.err.println(message);
  570.     throw new IllegalArgumentException(message);
  571.   }
  572.   // --------------------------------------------
  573.   protected String getHadoopClientHome() {
  574.     String h = env_.getProperty("HADOOP_HOME"); // standard Hadoop
  575.     if (h == null) {
  576.       //fail("Missing required environment variable: HADOOP_HOME");
  577.       h = "UNDEF";
  578.     }
  579.     return h;
  580.   }
  581.   protected boolean isLocalHadoop() {
  582.     return StreamUtil.isLocalJobTracker(jobConf_);
  583.   }
  584.   @Deprecated
  585.   protected String getClusterNick() {
  586.     return "default";
  587.   }
  588.   /** @return path to the created Jar file or null if no files are necessary.
  589.    */
  590.   protected String packageJobJar() throws IOException {
  591.     ArrayList unjarFiles = new ArrayList();
  592.     // Runtime code: ship same version of code as self (job submitter code)
  593.     // usually found in: build/contrib or build/hadoop-<version>-dev-streaming.jar
  594.     // First try an explicit spec: it's too hard to find our own location in this case:
  595.     // $HADOOP_HOME/bin/hadoop jar /not/first/on/classpath/custom-hadoop-streaming.jar
  596.     // where findInClasspath() would find the version of hadoop-streaming.jar in $HADOOP_HOME
  597.     String runtimeClasses = config_.get("stream.shipped.hadoopstreaming"); // jar or class dir
  598.     
  599.     if (runtimeClasses == null) {
  600.       runtimeClasses = StreamUtil.findInClasspath(StreamJob.class.getName());
  601.     }
  602.     if (runtimeClasses == null) {
  603.       throw new IOException("runtime classes not found: " + getClass().getPackage());
  604.     } else {
  605.       msg("Found runtime classes in: " + runtimeClasses);
  606.     }
  607.     if (isLocalHadoop()) {
  608.       // don't package class files (they might get unpackaged in "." and then
  609.       //  hide the intended CLASSPATH entry)
  610.       // we still package everything else (so that scripts and executable are found in
  611.       //  Task workdir like distributed Hadoop)
  612.     } else {
  613.       if (new File(runtimeClasses).isDirectory()) {
  614.         packageFiles_.add(runtimeClasses);
  615.       } else {
  616.         unjarFiles.add(runtimeClasses);
  617.       }
  618.     }
  619.     if (packageFiles_.size() + unjarFiles.size() == 0) {
  620.       return null;
  621.     }
  622.     String tmp = jobConf_.get("stream.tmpdir"); //, "/tmp/${user.name}/"
  623.     File tmpDir = (tmp == null) ? null : new File(tmp);
  624.     // tmpDir=null means OS default tmp dir
  625.     File jobJar = File.createTempFile("streamjob", ".jar", tmpDir);
  626.     System.out.println("packageJobJar: " + packageFiles_ + " " + unjarFiles + " " + jobJar
  627.                        + " tmpDir=" + tmpDir);
  628.     if (debug_ == 0) {
  629.       jobJar.deleteOnExit();
  630.     }
  631.     JarBuilder builder = new JarBuilder();
  632.     if (verbose_) {
  633.       builder.setVerbose(true);
  634.     }
  635.     String jobJarName = jobJar.getAbsolutePath();
  636.     builder.merge(packageFiles_, unjarFiles, jobJarName);
  637.     return jobJarName;
  638.   }
  639.   
  640.   /**
  641.    * get the uris of all the files/caches
  642.    */
  643.   protected void getURIs(String lcacheArchives, String lcacheFiles) {
  644.     String archives[] = StringUtils.getStrings(lcacheArchives);
  645.     String files[] = StringUtils.getStrings(lcacheFiles);
  646.     fileURIs = StringUtils.stringToURI(files);
  647.     archiveURIs = StringUtils.stringToURI(archives);
  648.   }
  649.   
  650.   protected void setJobConf() throws IOException {
  651.     if (additionalConfSpec_ != null) {
  652.       LOG.warn("-additionalconfspec option is deprecated, please use -conf instead.");
  653.       config_.addResource(new Path(additionalConfSpec_));
  654.     }
  655.     // general MapRed job properties
  656.     jobConf_ = new JobConf(config_);
  657.     
  658.     // All streaming jobs get the task timeout value
  659.     // from the configuration settings.
  660.     // The correct FS must be set before this is called!
  661.     // (to resolve local vs. dfs drive letter differences) 
  662.     // (mapred.working.dir will be lazily initialized ONCE and depends on FS)
  663.     for (int i = 0; i < inputSpecs_.size(); i++) {
  664.       FileInputFormat.addInputPaths(jobConf_, 
  665.                         (String) inputSpecs_.get(i));
  666.     }
  667.     jobConf_.set("stream.numinputspecs", "" + inputSpecs_.size());
  668.     String defaultPackage = this.getClass().getPackage().getName();
  669.     Class c;
  670.     Class fmt = null;
  671.     if (inReaderSpec_ == null && inputFormatSpec_ == null) {
  672.       fmt = TextInputFormat.class;
  673.     } else if (inputFormatSpec_ != null) {
  674.       if (inputFormatSpec_.equals(TextInputFormat.class.getName())
  675.           || inputFormatSpec_.equals(TextInputFormat.class.getCanonicalName())
  676.           || inputFormatSpec_.equals(TextInputFormat.class.getSimpleName())) {
  677.         fmt = TextInputFormat.class;
  678.       } else if (inputFormatSpec_.equals(KeyValueTextInputFormat.class
  679.           .getName())
  680.           || inputFormatSpec_.equals(KeyValueTextInputFormat.class
  681.               .getCanonicalName())
  682.           || inputFormatSpec_.equals(KeyValueTextInputFormat.class.getSimpleName())) {
  683.       } else if (inputFormatSpec_.equals(SequenceFileInputFormat.class
  684.           .getName())
  685.           || inputFormatSpec_
  686.               .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class
  687.                   .getCanonicalName())
  688.           || inputFormatSpec_
  689.               .equals(org.apache.hadoop.mapred.SequenceFileInputFormat.class.getSimpleName())) {
  690.       } else if (inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
  691.           .getName())
  692.           || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class
  693.               .getCanonicalName())
  694.           || inputFormatSpec_.equals(SequenceFileAsTextInputFormat.class.getSimpleName())) {
  695.         fmt = SequenceFileAsTextInputFormat.class;
  696.       } else {
  697.         c = StreamUtil.goodClassOrNull(jobConf_, inputFormatSpec_, defaultPackage);
  698.         if (c != null) {
  699.           fmt = c;
  700.         } else {
  701.           fail("-inputformat : class not found : " + inputFormatSpec_);
  702.         }
  703.       }
  704.     } 
  705.     if (fmt == null) {
  706.       fmt = StreamInputFormat.class;
  707.     }
  708.     jobConf_.setInputFormat(fmt);
  709.     jobConf_.setOutputKeyClass(Text.class);
  710.     jobConf_.setOutputValueClass(Text.class);
  711.     jobConf_.set("stream.addenvironment", addTaskEnvironment_);
  712.     if (mapCmd_ != null) {
  713.       c = StreamUtil.goodClassOrNull(jobConf_, mapCmd_, defaultPackage);
  714.       if (c != null) {
  715.         jobConf_.setMapperClass(c);
  716.       } else {
  717.         jobConf_.setMapperClass(PipeMapper.class);
  718.         jobConf_.setMapRunnerClass(PipeMapRunner.class);
  719.         jobConf_.set("stream.map.streamprocessor", 
  720.                      URLEncoder.encode(mapCmd_, "UTF-8"));
  721.       }
  722.     }
  723.     if (comCmd_ != null) {
  724.       c = StreamUtil.goodClassOrNull(jobConf_, comCmd_, defaultPackage);
  725.       if (c != null) {
  726.         jobConf_.setCombinerClass(c);
  727.       } else {
  728.         fail("-combiner : class not found : " + comCmd_);
  729.       }
  730.     }
  731.     boolean reducerNone_ = false;
  732.     if (redCmd_ != null) {
  733.       reducerNone_ = redCmd_.equals(REDUCE_NONE);
  734.       if (redCmd_.compareToIgnoreCase("aggregate") == 0) {
  735.         jobConf_.setReducerClass(ValueAggregatorReducer.class);
  736.         jobConf_.setCombinerClass(ValueAggregatorCombiner.class);
  737.       } else {
  738.         c = StreamUtil.goodClassOrNull(jobConf_, redCmd_, defaultPackage);
  739.         if (c != null) {
  740.           jobConf_.setReducerClass(c);
  741.         } else {
  742.           jobConf_.setReducerClass(PipeReducer.class);
  743.           jobConf_.set("stream.reduce.streamprocessor", URLEncoder.encode(
  744.               redCmd_, "UTF-8"));
  745.         }
  746.       }
  747.     }
  748.     if (inReaderSpec_ != null) {
  749.       String[] args = inReaderSpec_.split(",");
  750.       String readerClass = args[0];
  751.       // this argument can only be a Java class
  752.       c = StreamUtil.goodClassOrNull(jobConf_, readerClass, defaultPackage);
  753.       if (c != null) {
  754.         jobConf_.set("stream.recordreader.class", c.getName());
  755.       } else {
  756.         fail("-inputreader: class not found: " + readerClass);
  757.       }
  758.       for (int i = 1; i < args.length; i++) {
  759.         String[] nv = args[i].split("=", 2);
  760.         String k = "stream.recordreader." + nv[0];
  761.         String v = (nv.length > 1) ? nv[1] : "";
  762.         jobConf_.set(k, v);
  763.       }
  764.     }
  765.     
  766.     FileOutputFormat.setOutputPath(jobConf_, new Path(output_));
  767.     fmt = null;
  768.     if (outputFormatSpec_!= null) {
  769.       c = StreamUtil.goodClassOrNull(jobConf_, outputFormatSpec_, defaultPackage);
  770.       if (c != null) {
  771.         fmt = c;
  772.       } else {
  773.         fail("-outputformat : class not found : " + outputFormatSpec_);
  774.       }
  775.     }
  776.     if (fmt == null) {
  777.       fmt = TextOutputFormat.class;
  778.     }
  779.     jobConf_.setOutputFormat(fmt);
  780.     if (partitionerSpec_!= null) {
  781.       c = StreamUtil.goodClassOrNull(jobConf_, partitionerSpec_, defaultPackage);
  782.       if (c != null) {
  783.         jobConf_.setPartitionerClass(c);
  784.       } else {
  785.         fail("-partitioner : class not found : " + partitionerSpec_);
  786.       }
  787.     }
  788.     
  789.     if (numReduceTasksSpec_!= null) {
  790.       int numReduceTasks = Integer.parseInt(numReduceTasksSpec_);
  791.       jobConf_.setNumReduceTasks(numReduceTasks);
  792.     }
  793.     if (reducerNone_) {
  794.       jobConf_.setNumReduceTasks(0);
  795.     }
  796.     
  797.     if(mapDebugSpec_ != null){
  798.      jobConf_.setMapDebugScript(mapDebugSpec_);
  799.     }
  800.     if(reduceDebugSpec_ != null){
  801.      jobConf_.setReduceDebugScript(reduceDebugSpec_);
  802.     }
  803.     // last, allow user to override anything
  804.     // (although typically used with properties we didn't touch)
  805.     jar_ = packageJobJar();
  806.     if (jar_ != null) {
  807.       jobConf_.setJar(jar_);
  808.     }
  809.     
  810.     if ((cacheArchives != null) || (cacheFiles != null)){
  811.       getURIs(cacheArchives, cacheFiles);
  812.       boolean b = DistributedCache.checkURIs(fileURIs, archiveURIs);
  813.       if (!b)
  814.         fail(LINK_URI);
  815.     }
  816.     DistributedCache.createSymlink(jobConf_);
  817.     // set the jobconf for the caching parameters
  818.     if (cacheArchives != null)
  819.       DistributedCache.setCacheArchives(archiveURIs, jobConf_);
  820.     if (cacheFiles != null)
  821.       DistributedCache.setCacheFiles(fileURIs, jobConf_);
  822.     
  823.     if (verbose_) {
  824.       listJobConfProperties();
  825.     }
  826.    
  827.     msg("submitting to jobconf: " + getJobTrackerHostPort());
  828.   }
  829.   /**
  830.    * Prints out the jobconf properties on stdout
  831.    * when verbose is specified.
  832.    */
  833.   protected void listJobConfProperties()
  834.   {
  835.     msg("==== JobConf properties:");
  836.     Iterator it = jobConf_.iterator();
  837.     TreeMap sorted = new TreeMap();
  838.     while(it.hasNext()) {
  839.       Map.Entry en = (Map.Entry)it.next();
  840.       sorted.put(en.getKey(), en.getValue());
  841.     }
  842.     it = sorted.entrySet().iterator();
  843.     while(it.hasNext()) {
  844.       Map.Entry en = (Map.Entry)it.next();
  845.       msg(en.getKey() + "=" + en.getValue());
  846.     }
  847.     msg("====");
  848.   }
  849.   protected String getJobTrackerHostPort() {
  850.     return jobConf_.get("mapred.job.tracker");
  851.   }
  852.   protected void jobInfo() {
  853.     if (isLocalHadoop()) {
  854.       LOG.info("Job running in-process (local Hadoop)");
  855.     } else {
  856.       String hp = getJobTrackerHostPort();
  857.       LOG.info("To kill this job, run:");
  858.       LOG.info(getHadoopClientHome() + "/bin/hadoop job  -Dmapred.job.tracker=" + hp + " -kill "
  859.                + jobId_);
  860.       //LOG.info("Job file: " + running_.getJobFile());
  861.       LOG.info("Tracking URL: " + StreamUtil.qualifyHost(running_.getTrackingURL()));
  862.     }
  863.   }
  864.   // Based on JobClient
  865.   public int submitAndMonitorJob() throws IOException {
  866.     if (jar_ != null && isLocalHadoop()) {
  867.       // getAbs became required when shell and subvm have different working dirs...
  868.       File wd = new File(".").getAbsoluteFile();
  869.       StreamUtil.unJar(new File(jar_), wd);
  870.     }
  871.     // if jobConf_ changes must recreate a JobClient
  872.     jc_ = new JobClient(jobConf_);
  873.     boolean error = true;
  874.     running_ = null;
  875.     String lastReport = null;
  876.     try {
  877.       running_ = jc_.submitJob(jobConf_);
  878.       jobId_ = running_.getID();
  879.       LOG.info("getLocalDirs(): " + Arrays.asList(jobConf_.getLocalDirs()));
  880.       LOG.info("Running job: " + jobId_);
  881.       jobInfo();
  882.       while (!running_.isComplete()) {
  883.         try {
  884.           Thread.sleep(1000);
  885.         } catch (InterruptedException e) {
  886.         }
  887.         running_ = jc_.getJob(jobId_);
  888.         String report = null;
  889.         report = " map " + Math.round(running_.mapProgress() * 100) + "%  reduce "
  890.           + Math.round(running_.reduceProgress() * 100) + "%";
  891.         if (!report.equals(lastReport)) {
  892.           LOG.info(report);
  893.           lastReport = report;
  894.         }
  895.       }
  896.       if (!running_.isSuccessful()) {
  897.         jobInfo();
  898. LOG.error("Job not Successful!");
  899. return 1;
  900.       }
  901.       LOG.info("Job complete: " + jobId_);
  902.       LOG.info("Output: " + output_);
  903.       error = false;
  904.     } catch(FileNotFoundException fe) {
  905.       LOG.error("Error launching job , bad input path : " + fe.getMessage());
  906.       return 2;
  907.     } catch(InvalidJobConfException je) {
  908.       LOG.error("Error launching job , Invalid job conf : " + je.getMessage());
  909.       return 3;
  910.     } catch(FileAlreadyExistsException fae) {
  911.       LOG.error("Error launching job , Output path already exists : " 
  912.                 + fae.getMessage());
  913.       return 4;
  914.     } catch(IOException ioe) {
  915.       LOG.error("Error Launching job : " + ioe.getMessage());
  916.       return 5;
  917.     } finally {
  918.       if (error && (running_ != null)) {
  919.         LOG.info("killJob...");
  920.         running_.killJob();
  921.       }
  922.       jc_.close();
  923.     }
  924.     return 0;
  925.   }
  926.   /** Support -jobconf x=y x1=y1 type options **/
  927.   static class MultiPropertyOption extends PropertyOption{
  928.     private String optionString; 
  929.     MultiPropertyOption(){
  930.       super(); 
  931.     }
  932.     
  933.     MultiPropertyOption(final String optionString,
  934.                         final String description,
  935.                         final int id){
  936.       super(optionString, description, id); 
  937.       this.optionString = optionString;
  938.     }
  939.     @Override
  940.     public boolean canProcess(final WriteableCommandLine commandLine,
  941.                               final String argument) {
  942.       boolean ret = (argument != null) && argument.startsWith(optionString);
  943.         
  944.       return ret;
  945.     }    
  946.     @Override
  947.     public void process(final WriteableCommandLine commandLine,
  948.                         final ListIterator arguments) throws OptionException {
  949.       final String arg = (String) arguments.next();
  950.       if (!canProcess(commandLine, arg)) {
  951.         throw new OptionException(this, 
  952.                                   ResourceConstants.UNEXPECTED_TOKEN, arg);
  953.       }
  954.       
  955.       ArrayList properties = new ArrayList(); 
  956.       String next = ""; 
  957.       while(arguments.hasNext()){
  958.         next = (String) arguments.next();
  959.         if (!next.startsWith("-")){
  960.           properties.add(next);
  961.         }else{
  962.           arguments.previous();
  963.           break; 
  964.         }
  965.       } 
  966.       // add to any existing values (support specifying args multiple times)
  967.       List<String> oldVal = (List<String>)commandLine.getValue(this); 
  968.       if (oldVal == null){
  969.         commandLine.addValue(this, properties);
  970.       }else{
  971.         oldVal.addAll(properties); 
  972.       }
  973.     }
  974.   }
  975.   protected String[] argv_;
  976.   protected boolean verbose_;
  977.   protected boolean detailedUsage_;
  978.   protected int debug_;
  979.   protected Environment env_;
  980.   protected String jar_;
  981.   protected boolean localHadoop_;
  982.   protected Configuration config_;
  983.   protected JobConf jobConf_;
  984.   protected JobClient jc_;
  985.   // command-line arguments
  986.   protected ArrayList inputSpecs_ = new ArrayList(); // <String>
  987.   protected TreeSet seenPrimary_ = new TreeSet(); // <String>
  988.   protected boolean hasSimpleInputSpecs_;
  989.   protected ArrayList packageFiles_ = new ArrayList(); // <String>
  990.   protected ArrayList shippedCanonFiles_ = new ArrayList(); // <String>
  991.   //protected TreeMap<String, String> userJobConfProps_ = new TreeMap<String, String>(); 
  992.   protected String output_;
  993.   protected String mapCmd_;
  994.   protected String comCmd_;
  995.   protected String redCmd_;
  996.   protected String cacheFiles;
  997.   protected String cacheArchives;
  998.   protected URI[] fileURIs;
  999.   protected URI[] archiveURIs;
  1000.   protected String inReaderSpec_;
  1001.   protected String inputFormatSpec_;
  1002.   protected String outputFormatSpec_;
  1003.   protected String partitionerSpec_;
  1004.   protected String numReduceTasksSpec_;
  1005.   protected String additionalConfSpec_;
  1006.   protected String mapDebugSpec_;
  1007.   protected String reduceDebugSpec_;
  1008.   // Use to communicate config to the external processes (ex env.var.HADOOP_USER)
  1009.   // encoding "a=b c=d"
  1010.   protected String addTaskEnvironment_;
  1011.   protected boolean outputSingleNode_;
  1012.   protected long minRecWrittenToEnableSkip_;
  1013.   protected RunningJob running_;
  1014.   protected JobID jobId_;
  1015.   protected static final String LINK_URI = "You need to specify the uris as hdfs://host:port/#linkname," +
  1016.     "Please specify a different link name for all of your caching URIs";
  1017. }