PipeMapRed.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import java.nio.charset.CharacterCodingException;
  21. import java.io.IOException;
  22. import java.util.Date;
  23. import java.util.Map;
  24. import java.util.Iterator;
  25. import java.util.Arrays;
  26. import java.util.ArrayList;
  27. import java.util.Properties;
  28. import org.apache.commons.logging.*;
  29. import org.apache.hadoop.fs.FileUtil;
  30. import org.apache.hadoop.mapred.JobConf;
  31. import org.apache.hadoop.mapred.Reporter;
  32. import org.apache.hadoop.mapred.OutputCollector;
  33. import org.apache.hadoop.util.LineReader;
  34. import org.apache.hadoop.util.StringUtils;
  35. import org.apache.hadoop.util.UTF8ByteArrayUtils;
  36. import org.apache.hadoop.io.Text;
  37. import org.apache.hadoop.io.BytesWritable;
  38. import org.apache.hadoop.fs.FileSystem;
  39. /** Shared functionality for PipeMapper, PipeReducer.
  40.  */
  41. public abstract class PipeMapRed {
  42.   protected static final Log LOG = LogFactory.getLog(PipeMapRed.class.getName());
  43.   /** The command to be spawned as a subprocess.
  44.    * Mapper/Reducer operations will delegate to it
  45.    */
  46.   abstract String getPipeCommand(JobConf job);
  47.   abstract byte[] getFieldSeparator();
  48.   abstract int getNumOfKeyFields();
  49.   abstract boolean getDoPipe();
  50.   final static int OUTSIDE = 1;
  51.   final static int SINGLEQ = 2;
  52.   final static int DOUBLEQ = 3;
  53.   
  54.   private final static int BUFFER_SIZE = 128 * 1024;
  55.   static String[] splitArgs(String args) {
  56.     ArrayList argList = new ArrayList();
  57.     char[] ch = args.toCharArray();
  58.     int clen = ch.length;
  59.     int state = OUTSIDE;
  60.     int argstart = 0;
  61.     for (int c = 0; c <= clen; c++) {
  62.       boolean last = (c == clen);
  63.       int lastState = state;
  64.       boolean endToken = false;
  65.       if (!last) {
  66.         if (ch[c] == ''') {
  67.           if (state == OUTSIDE) {
  68.             state = SINGLEQ;
  69.           } else if (state == SINGLEQ) {
  70.             state = OUTSIDE;
  71.           }
  72.           endToken = (state != lastState);
  73.         } else if (ch[c] == '"') {
  74.           if (state == OUTSIDE) {
  75.             state = DOUBLEQ;
  76.           } else if (state == DOUBLEQ) {
  77.             state = OUTSIDE;
  78.           }
  79.           endToken = (state != lastState);
  80.         } else if (ch[c] == ' ') {
  81.           if (state == OUTSIDE) {
  82.             endToken = true;
  83.           }
  84.         }
  85.       }
  86.       if (last || endToken) {
  87.         if (c == argstart) {
  88.           // unquoted space
  89.         } else {
  90.           String a;
  91.           a = args.substring(argstart, c);
  92.           argList.add(a);
  93.         }
  94.         argstart = c + 1;
  95.         lastState = state;
  96.       }
  97.     }
  98.     return (String[]) argList.toArray(new String[0]);
  99.   }
  100.   public void configure(JobConf job) {
  101.     try {
  102.       String argv = getPipeCommand(job);
  103.       joinDelay_ = job.getLong("stream.joindelay.milli", 0);
  104.       job_ = job;
  105.       fs_ = FileSystem.get(job_);
  106.       nonZeroExitIsFailure_ = job_.getBoolean("stream.non.zero.exit.is.failure", true);
  107.       
  108.       doPipe_ = getDoPipe();
  109.       if (!doPipe_) return;
  110.       setStreamJobDetails(job);
  111.       
  112.       String[] argvSplit = splitArgs(argv);
  113.       String prog = argvSplit[0];
  114.       File currentDir = new File(".").getAbsoluteFile();
  115.       if (new File(prog).isAbsolute()) {
  116.         // we don't own it. Hope it is executable
  117.       } else {
  118.         FileUtil.chmod(new File(currentDir, prog).toString(), "a+x");
  119.       }
  120.       // 
  121.       // argvSplit[0]:
  122.       // An absolute path should be a preexisting valid path on all TaskTrackers
  123.       // A relative path is converted into an absolute pathname by looking
  124.       // up the PATH env variable. If it still fails, look it up in the
  125.       // tasktracker's local working directory
  126.       //
  127.       if (!new File(argvSplit[0]).isAbsolute()) {
  128.         PathFinder finder = new PathFinder("PATH");
  129.         finder.prependPathComponent(currentDir.toString());
  130.         File f = finder.getAbsolutePath(argvSplit[0]);
  131.         if (f != null) {
  132.           argvSplit[0] = f.getAbsolutePath();
  133.         }
  134.         f = null;
  135.       }
  136.       logprintln("PipeMapRed exec " + Arrays.asList(argvSplit));
  137.       Environment childEnv = (Environment) StreamUtil.env().clone();
  138.       addJobConfToEnvironment(job_, childEnv);
  139.       addEnvironment(childEnv, job_.get("stream.addenvironment"));
  140.       // add TMPDIR environment variable with the value of java.io.tmpdir
  141.       envPut(childEnv, "TMPDIR", System.getProperty("java.io.tmpdir"));
  142.       // Start the process
  143.       ProcessBuilder builder = new ProcessBuilder(argvSplit);
  144.       builder.environment().putAll(childEnv.toMap());
  145.       sim = builder.start();
  146.       clientOut_ = new DataOutputStream(new BufferedOutputStream(
  147.                                               sim.getOutputStream(),
  148.                                               BUFFER_SIZE));
  149.       clientIn_ = new DataInputStream(new BufferedInputStream(
  150.                                               sim.getInputStream(),
  151.                                               BUFFER_SIZE));
  152.       clientErr_ = new DataInputStream(new BufferedInputStream(sim.getErrorStream()));
  153.       startTime_ = System.currentTimeMillis();
  154.       errThread_ = new MRErrorThread();
  155.       errThread_.start();
  156.     } catch (Exception e) {
  157.       logStackTrace(e);
  158.       LOG.error("configuration exception", e);
  159.       throw new RuntimeException("configuration exception", e);
  160.     }
  161.   }
  162.   
  163.   void setStreamJobDetails(JobConf job) {
  164.     jobLog_ = job.get("stream.jobLog_");
  165.     String s = job.get("stream.minRecWrittenToEnableSkip_");
  166.     if (s != null) {
  167.       minRecWrittenToEnableSkip_ = Long.parseLong(s);
  168.       logprintln("JobConf set minRecWrittenToEnableSkip_ =" + minRecWrittenToEnableSkip_);
  169.     }
  170.     taskId_ = StreamUtil.getTaskInfo(job_);
  171.   }
  172.   void logStackTrace(Exception e) {
  173.     if (e == null) return;
  174.     e.printStackTrace();
  175.     if (log_ != null) {
  176.       e.printStackTrace(log_);
  177.     }
  178.   }
  179.   void logprintln(String s) {
  180.     if (log_ != null) {
  181.       log_.println(s);
  182.     } else {
  183.       LOG.info(s); // or LOG.info()
  184.     }
  185.   }
  186.   void logflush() {
  187.     if (log_ != null) {
  188.       log_.flush();
  189.     }
  190.   }
  191.   void addJobConfToEnvironment(JobConf conf, Properties env) {
  192.     if (debug_) {
  193.       logprintln("addJobConfToEnvironment: begin");
  194.     }
  195.     Iterator it = conf.iterator();
  196.     while (it.hasNext()) {
  197.       Map.Entry en = (Map.Entry) it.next();
  198.       String name = (String) en.getKey();
  199.       //String value = (String)en.getValue(); // does not apply variable expansion
  200.       String value = conf.get(name); // does variable expansion 
  201.       name = safeEnvVarName(name);
  202.       envPut(env, name, value);
  203.     }
  204.     if (debug_) {
  205.       logprintln("addJobConfToEnvironment: end");
  206.     }
  207.   }
  208.   String safeEnvVarName(String var) {
  209.     StringBuffer safe = new StringBuffer();
  210.     int len = var.length();
  211.     for (int i = 0; i < len; i++) {
  212.       char c = var.charAt(i);
  213.       char s;
  214.       if ((c >= '0' && c <= '9') || (c >= 'A' && c <= 'Z') || (c >= 'a' && c <= 'z')) {
  215.         s = c;
  216.       } else {
  217.         s = '_';
  218.       }
  219.       safe.append(s);
  220.     }
  221.     return safe.toString();
  222.   }
  223.   void addEnvironment(Properties env, String nameVals) {
  224.     // encoding "a=b c=d" from StreamJob
  225.     if (nameVals == null) return;
  226.     String[] nv = nameVals.split(" ");
  227.     for (int i = 0; i < nv.length; i++) {
  228.       String[] pair = nv[i].split("=", 2);
  229.       if (pair.length != 2) {
  230.         logprintln("Skip ev entry:" + nv[i]);
  231.       } else {
  232.         envPut(env, pair[0], pair[1]);
  233.       }
  234.     }
  235.   }
  236.   void envPut(Properties env, String name, String value) {
  237.     if (debug_) {
  238.       logprintln("Add  ev entry:" + name + "=" + value);
  239.     }
  240.     env.put(name, value);
  241.   }
  242.   /** .. and if successful: delete the task log */
  243.   void appendLogToJobLog(String status) {
  244.     if (jobLog_ == null) {
  245.       return; // not using a common joblog
  246.     }
  247.     if (log_ != null) {
  248.       StreamUtil.exec("/bin/rm " + LOGNAME, log_);
  249.     }
  250.   }
  251.   void startOutputThreads(OutputCollector output, Reporter reporter) {
  252.     outThread_ = new MROutputThread(output, reporter);
  253.     outThread_.start();
  254.     errThread_.setReporter(reporter);
  255.   }
  256.   void waitOutputThreads() {
  257.     try {
  258.       if (outThread_ == null) {
  259.         // This happens only when reducer has empty input(So reduce() is not
  260.         // called at all in this task). If reducer still generates output,
  261.         // which is very uncommon and we may not have to support this case.
  262.         // So we don't write this output to HDFS, but we consume/collect
  263.         // this output just to avoid reducer hanging forever.
  264.         OutputCollector collector = new OutputCollector() {
  265.           public void collect(Object key, Object value)
  266.             throws IOException {
  267.             //just consume it, no need to write the record anywhere
  268.           }
  269.         };
  270.         Reporter reporter = Reporter.NULL;//dummy reporter
  271.         startOutputThreads(collector, reporter);
  272.       }
  273.       int exitVal = sim.waitFor();
  274.       // how'd it go?
  275.       if (exitVal != 0) {
  276.         if (nonZeroExitIsFailure_) {
  277.           throw new RuntimeException("PipeMapRed.waitOutputThreads(): subprocess failed with code "
  278.                                      + exitVal);
  279.         } else {
  280.           logprintln("PipeMapRed.waitOutputThreads(): subprocess exited with code " + exitVal
  281.                      + " in " + PipeMapRed.class.getName());
  282.         }
  283.       }
  284.       if (outThread_ != null) {
  285.         outThread_.join(joinDelay_);
  286.       }
  287.       if (errThread_ != null) {
  288.         errThread_.join(joinDelay_);
  289.       }
  290.     } catch (InterruptedException e) {
  291.       //ignore
  292.     }
  293.   }
  294.   /**
  295.    * Split a line into key and value.
  296.    * @param line: a byte array of line containing UTF-8 bytes
  297.    * @param key: key of a record
  298.    * @param val: value of a record
  299.    * @throws IOException
  300.    */
  301.   void splitKeyVal(byte[] line, int length, Text key, Text val)
  302.   throws IOException {
  303.     int numKeyFields = getNumOfKeyFields();
  304.     byte[] separator = getFieldSeparator();
  305.     
  306.     // Need to find numKeyFields separators
  307.     int pos = UTF8ByteArrayUtils.findBytes(line, 0, length, separator);
  308.     for(int k=1; k<numKeyFields && pos!=-1; k++) {
  309.       pos = UTF8ByteArrayUtils.findBytes(line, pos + separator.length, 
  310.           length, separator);
  311.     }
  312.     try {
  313.       if (pos == -1) {
  314.         key.set(line, 0, length);
  315.         val.set("");
  316.       } else {
  317.         StreamKeyValUtil.splitKeyVal(line, 0, length, key, val, pos, separator.length);
  318.       }
  319.     } catch (CharacterCodingException e) {
  320.       LOG.warn(StringUtils.stringifyException(e));
  321.     }
  322.   }
  323.   class MROutputThread extends Thread {
  324.     MROutputThread(OutputCollector output, Reporter reporter) {
  325.       setDaemon(true);
  326.       this.output = output;
  327.       this.reporter = reporter;
  328.     }
  329.     public void run() {
  330.       LineReader lineReader = null;
  331.       try {
  332.         Text key = new Text();
  333.         Text val = new Text();
  334.         Text line = new Text();
  335.         lineReader = new LineReader((InputStream)clientIn_, job_);
  336.         // 3/4 Tool to Hadoop
  337.         while (lineReader.readLine(line) > 0) {
  338.           answer = line.getBytes();
  339.           splitKeyVal(answer, line.getLength(), key, val);
  340.           output.collect(key, val);
  341.           line.clear();
  342.           numRecWritten_++;
  343.           long now = System.currentTimeMillis();
  344.           if (now-lastStdoutReport > reporterOutDelay_) {
  345.             lastStdoutReport = now;
  346.             String hline = "Records R/W=" + numRecRead_ + "/" + numRecWritten_;
  347.             reporter.setStatus(hline);
  348.             logprintln(hline);
  349.             logflush();
  350.           }
  351.         }
  352.         if (lineReader != null) {
  353.           lineReader.close();
  354.         }
  355.         if (clientIn_ != null) {
  356.           clientIn_.close();
  357.           clientIn_ = null;
  358.           LOG.info("MROutputThread done");
  359.         }
  360.       } catch (Throwable th) {
  361.         outerrThreadsThrowable = th;
  362.         LOG.warn(StringUtils.stringifyException(th));
  363.         try {
  364.           if (lineReader != null) {
  365.             lineReader.close();
  366.           }
  367.           if (clientIn_ != null) {
  368.             clientIn_.close();
  369.             clientIn_ = null;
  370.           }
  371.         } catch (IOException io) {
  372.           LOG.info(StringUtils.stringifyException(io));
  373.         }
  374.       }
  375.     }
  376.     OutputCollector output;
  377.     Reporter reporter;
  378.     byte[] answer;
  379.     long lastStdoutReport = 0;
  380.     
  381.   }
  382.   class MRErrorThread extends Thread {
  383.     public MRErrorThread() {
  384.       this.reporterPrefix = job_.get("stream.stderr.reporter.prefix", "reporter:");
  385.       this.counterPrefix = reporterPrefix + "counter:";
  386.       this.statusPrefix = reporterPrefix + "status:";
  387.       setDaemon(true);
  388.     }
  389.     
  390.     public void setReporter(Reporter reporter) {
  391.       this.reporter = reporter;
  392.     }
  393.       
  394.     public void run() {
  395.       Text line = new Text();
  396.       LineReader lineReader = null;
  397.       try {
  398.         lineReader = new LineReader((InputStream)clientErr_, job_);
  399.         while (lineReader.readLine(line) > 0) {
  400.           String lineStr = line.toString();
  401.           if (matchesReporter(lineStr)) {
  402.             if (matchesCounter(lineStr)) {
  403.               incrCounter(lineStr);
  404.             } else if (matchesStatus(lineStr)) {
  405.               setStatus(lineStr);
  406.             } else {
  407.               LOG.warn("Cannot parse reporter line: " + lineStr);
  408.             }
  409.           } else {
  410.             System.err.println(lineStr);
  411.           }
  412.           long now = System.currentTimeMillis(); 
  413.           if (reporter != null && now-lastStderrReport > reporterErrDelay_) {
  414.             lastStderrReport = now;
  415.             reporter.progress();
  416.           }
  417.           line.clear();
  418.         }
  419.         if (lineReader != null) {
  420.           lineReader.close();
  421.         }
  422.         if (clientErr_ != null) {
  423.           clientErr_.close();
  424.           clientErr_ = null;
  425.           LOG.info("MRErrorThread done");
  426.         }
  427.       } catch (Throwable th) {
  428.         outerrThreadsThrowable = th;
  429.         LOG.warn(StringUtils.stringifyException(th));
  430.         try {
  431.           if (lineReader != null) {
  432.             lineReader.close();
  433.           }
  434.           if (clientErr_ != null) {
  435.             clientErr_.close();
  436.             clientErr_ = null;
  437.           }
  438.         } catch (IOException io) {
  439.           LOG.info(StringUtils.stringifyException(io));
  440.         }
  441.       }
  442.     }
  443.     
  444.     private boolean matchesReporter(String line) {
  445.       return line.startsWith(reporterPrefix);
  446.     }
  447.     private boolean matchesCounter(String line) {
  448.       return line.startsWith(counterPrefix);
  449.     }
  450.     private boolean matchesStatus(String line) {
  451.       return line.startsWith(statusPrefix);
  452.     }
  453.     private void incrCounter(String line) {
  454.       String trimmedLine = line.substring(counterPrefix.length()).trim();
  455.       String[] columns = trimmedLine.split(",");
  456.       if (columns.length == 3) {
  457.         try {
  458.           reporter.incrCounter(columns[0], columns[1],
  459.               Long.parseLong(columns[2]));
  460.         } catch (NumberFormatException e) {
  461.           LOG.warn("Cannot parse counter increment '" + columns[2] +
  462.               "' from line: " + line);
  463.         }
  464.       } else {
  465.         LOG.warn("Cannot parse counter line: " + line);
  466.       }
  467.     }
  468.     private void setStatus(String line) {
  469.       reporter.setStatus(line.substring(statusPrefix.length()).trim());
  470.     }
  471.     
  472.     long lastStderrReport = 0;
  473.     volatile Reporter reporter;
  474.     private final String reporterPrefix;
  475.     private final String counterPrefix;
  476.     private final String statusPrefix;
  477.   }
  478.   public void mapRedFinished() {
  479.     try {
  480.       if (!doPipe_) {
  481.         logprintln("mapRedFinished");
  482.         return;
  483.       }
  484.       try {
  485.         if (clientOut_ != null) {
  486.           clientOut_.flush();
  487.           clientOut_.close();
  488.         }
  489.       } catch (IOException io) {
  490.       }
  491.       waitOutputThreads();
  492.       if (sim != null) sim.destroy();
  493.       logprintln("mapRedFinished");
  494.     } catch (RuntimeException e) {
  495.       logStackTrace(e);
  496.       throw e;
  497.     }
  498.     if (debugFailLate_) {
  499.       throw new RuntimeException("debugFailLate_");
  500.     }
  501.   }
  502.   void maybeLogRecord() {
  503.     if (numRecRead_ >= nextRecReadLog_) {
  504.       String info = numRecInfo();
  505.       logprintln(info);
  506.       logflush();
  507.       if (nextRecReadLog_ < 100000) {
  508.   nextRecReadLog_ *= 10;
  509.       } else {
  510.   nextRecReadLog_ += 100000;
  511.       }
  512.     }
  513.   }
  514.   public String getContext() {
  515.     String s = numRecInfo() + "n";
  516.     s += "minRecWrittenToEnableSkip_=" + minRecWrittenToEnableSkip_ + " ";
  517.     s += "LOGNAME=" + LOGNAME + "n";
  518.     s += envline("HOST");
  519.     s += envline("USER");
  520.     s += envline("HADOOP_USER");
  521.     //s += envline("PWD"); // =/home/crawler/hadoop/trunk
  522.     s += "last Hadoop input: |" + mapredKey_ + "|n";
  523.     if (outThread_ != null) {
  524.       s += "last tool output: |" + outThread_.answer + "|n";
  525.     }
  526.     s += "Date: " + new Date() + "n";
  527.     // s += envline("HADOOP_HOME");
  528.     // s += envline("REMOTE_HOST");
  529.     return s;
  530.   }
  531.   String envline(String var) {
  532.     return var + "=" + StreamUtil.env().get(var) + "n";
  533.   }
  534.   String numRecInfo() {
  535.     long elapsed = (System.currentTimeMillis() - startTime_) / 1000;
  536.     return "R/W/S=" + numRecRead_ + "/" + numRecWritten_ + "/" + numRecSkipped_ + " in:"
  537.       + safeDiv(numRecRead_, elapsed) + " [rec/s]" + " out:" + safeDiv(numRecWritten_, elapsed)
  538.       + " [rec/s]";
  539.   }
  540.   String safeDiv(long n, long d) {
  541.     return (d == 0) ? "NA" : "" + n / d + "=" + n + "/" + d;
  542.   }
  543.   String logFailure(Exception e) {
  544.     StringWriter sw = new StringWriter();
  545.     PrintWriter pw = new PrintWriter(sw);
  546.     e.printStackTrace(pw);
  547.     String msg = "log:" + jobLog_ + "n" + getContext() + sw + "n";
  548.     logprintln(msg);
  549.     return msg;
  550.   }
  551.   /**
  552.    * Write a value to the output stream using UTF-8 encoding
  553.    * @param value output value
  554.    * @throws IOException
  555.    */
  556.   void write(Object value) throws IOException {
  557.     byte[] bval;
  558.     int valSize;
  559.     if (value instanceof BytesWritable) {
  560.       BytesWritable val = (BytesWritable) value;
  561.       bval = val.getBytes();
  562.       valSize = val.getLength();
  563.     } else if (value instanceof Text) {
  564.       Text val = (Text) value;
  565.       bval = val.getBytes();
  566.       valSize = val.getLength();
  567.     } else {
  568.       String sval = value.toString();
  569.       bval = sval.getBytes("UTF-8");
  570.       valSize = bval.length;
  571.     }
  572.     clientOut_.write(bval, 0, valSize);
  573.   }
  574.   long startTime_;
  575.   long numRecRead_ = 0;
  576.   long numRecWritten_ = 0;
  577.   long numRecSkipped_ = 0;
  578.   long nextRecReadLog_ = 1;
  579.   
  580.   long minRecWrittenToEnableSkip_ = Long.MAX_VALUE;
  581.   long reporterOutDelay_ = 10*1000L; 
  582.   long reporterErrDelay_ = 10*1000L; 
  583.   long joinDelay_;
  584.   JobConf job_;
  585.   FileSystem fs_;
  586.   boolean doPipe_;
  587.   boolean debug_;
  588.   boolean debugFailEarly_;
  589.   boolean debugFailDuring_;
  590.   boolean debugFailLate_;
  591.   boolean nonZeroExitIsFailure_;
  592.   
  593.   Process sim;
  594.   MROutputThread outThread_;
  595.   String jobLog_;
  596.   MRErrorThread errThread_;
  597.   DataOutputStream clientOut_;
  598.   DataInputStream clientErr_;
  599.   DataInputStream clientIn_;
  600.   // set in PipeMapper/PipeReducer subclasses
  601.   String mapredKey_;
  602.   int numExceptions_;
  603.   StreamUtil.TaskId taskId_;
  604.   protected volatile Throwable outerrThreadsThrowable;
  605.   String LOGNAME;
  606.   PrintStream log_;
  607. }