PipeReducer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.IOException;
  20. import java.io.UnsupportedEncodingException;
  21. import java.util.Iterator;
  22. import java.net.URLDecoder;
  23. import org.apache.hadoop.mapred.JobConf;
  24. import org.apache.hadoop.mapred.Reducer;
  25. import org.apache.hadoop.mapred.Reporter;
  26. import org.apache.hadoop.mapred.OutputCollector;
  27. import org.apache.hadoop.mapred.SkipBadRecords;
  28. import org.apache.hadoop.util.StringUtils;
  29. import org.apache.hadoop.io.Writable;
  30. /** A generic Reducer bridge.
  31.  *  It delegates operations to an external program via stdin and stdout.
  32.  */
  33. public class PipeReducer extends PipeMapRed implements Reducer {
  34.   private byte[] reduceOutFieldSeparator;
  35.   private byte[] reduceInputFieldSeparator;
  36.   private int numOfReduceOutputKeyFields = 1;
  37.   private boolean skipping = false;
  38.   
  39.   String getPipeCommand(JobConf job) {
  40.     String str = job.get("stream.reduce.streamprocessor");
  41.     if (str == null) {
  42.       return str;
  43.     }
  44.     try {
  45.       return URLDecoder.decode(str, "UTF-8");
  46.     } catch (UnsupportedEncodingException e) {
  47.       System.err.println("stream.reduce.streamprocessor in jobconf not found");
  48.       return null;
  49.     }
  50.   }
  51.   boolean getDoPipe() {
  52.     String argv = getPipeCommand(job_);
  53.     // Currently: null is identity reduce. REDUCE_NONE is no-map-outputs.
  54.     return (argv != null) && !StreamJob.REDUCE_NONE.equals(argv);
  55.   }
  56.   public void configure(JobConf job) {
  57.     super.configure(job);
  58.     //disable the auto increment of the counter. For streaming, no of 
  59.     //processed records could be different(equal or less) than the no of 
  60.     //records input.
  61.     SkipBadRecords.setAutoIncrReducerProcCount(job, false);
  62.     skipping = job.getBoolean("mapred.skip.on", false);
  63.     try {
  64.       reduceOutFieldSeparator = job_.get("stream.reduce.output.field.separator", "t").getBytes("UTF-8");
  65.       reduceInputFieldSeparator = job_.get("stream.reduce.input.field.separator", "t").getBytes("UTF-8");
  66.       this.numOfReduceOutputKeyFields = job_.getInt("stream.num.reduce.output.key.fields", 1);
  67.     } catch (UnsupportedEncodingException e) {
  68.       throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  69.     }
  70.   }
  71.   public void reduce(Object key, Iterator values, OutputCollector output,
  72.                      Reporter reporter) throws IOException {
  73.     // init
  74.     if (doPipe_ && outThread_ == null) {
  75.       startOutputThreads(output, reporter);
  76.     }
  77.     try {
  78.       while (values.hasNext()) {
  79.         Writable val = (Writable) values.next();
  80.         numRecRead_++;
  81.         maybeLogRecord();
  82.         if (doPipe_) {
  83.           if (outerrThreadsThrowable != null) {
  84.             mapRedFinished();
  85.             throw new IOException ("MROutput/MRErrThread failed:"
  86.                                    + StringUtils.stringifyException(
  87.                                                                     outerrThreadsThrowable));
  88.           }
  89.           write(key);
  90.           clientOut_.write(getInputSeparator());
  91.           write(val);
  92.           clientOut_.write('n');
  93.         } else {
  94.           // "identity reduce"
  95.           output.collect(key, val);
  96.         }
  97.       }
  98.       if(doPipe_ && skipping) {
  99.         //flush the streams on every record input if running in skip mode
  100.         //so that we don't buffer other records surrounding a bad record. 
  101.         clientOut_.flush();
  102.       }
  103.     } catch (IOException io) {
  104.       // a common reason to get here is failure of the subprocess.
  105.       // Document that fact, if possible.
  106.       String extraInfo = "";
  107.       try {
  108.         int exitVal = sim.exitValue();
  109. if (exitVal == 0) {
  110.   extraInfo = "subprocess exited successfullyn";
  111. } else {
  112.   extraInfo = "subprocess exited with error code " + exitVal + "n";
  113. };
  114.       } catch (IllegalThreadStateException e) {
  115.         // hmm, but child is still running.  go figure.
  116. extraInfo = "subprocess still runningn";
  117.       };
  118.       appendLogToJobLog("failure");
  119.       mapRedFinished();
  120.       throw new IOException(extraInfo + getContext() + io.getMessage());
  121.     }
  122.   }
  123.   public void close() {
  124.     appendLogToJobLog("success");
  125.     mapRedFinished();
  126.   }
  127.   byte[] getInputSeparator() {
  128.     return reduceInputFieldSeparator;
  129.   }
  130.   @Override
  131.   byte[] getFieldSeparator() {
  132.     return reduceOutFieldSeparator;
  133.   }
  134.   
  135.   @Override
  136.   int getNumOfKeyFields() {
  137.     return numOfReduceOutputKeyFields;
  138.   }
  139. }