PipeMapper.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.io.*;
  20. import java.net.URLDecoder;
  21. import org.apache.hadoop.mapred.JobConf;
  22. import org.apache.hadoop.mapred.Mapper;
  23. import org.apache.hadoop.mapred.Reporter;
  24. import org.apache.hadoop.mapred.OutputCollector;
  25. import org.apache.hadoop.mapred.SkipBadRecords;
  26. import org.apache.hadoop.mapred.TextInputFormat;
  27. import org.apache.hadoop.util.StringUtils;
  28. /** A generic Mapper bridge.
  29.  *  It delegates operations to an external program via stdin and stdout.
  30.  */
  31. public class PipeMapper extends PipeMapRed implements Mapper {
  32.   private boolean ignoreKey = false;
  33.   private boolean skipping = false;
  34.   private byte[] mapOutputFieldSeparator;
  35.   private byte[] mapInputFieldSeparator;
  36.   private int numOfMapOutputKeyFields = 1;
  37.   
  38.   String getPipeCommand(JobConf job) {
  39.     String str = job.get("stream.map.streamprocessor");
  40.     if (str == null) {
  41.       return str;
  42.     }
  43.     try {
  44.       return URLDecoder.decode(str, "UTF-8");
  45.     }
  46.     catch (UnsupportedEncodingException e) {
  47.       System.err.println("stream.map.streamprocessor in jobconf not found");
  48.       return null;
  49.     }
  50.   }
  51.   boolean getDoPipe() {
  52.     return true;
  53.   }
  54.   
  55.   public void configure(JobConf job) {
  56.     super.configure(job);
  57.     //disable the auto increment of the counter. For streaming, no of 
  58.     //processed records could be different(equal or less) than the no of 
  59.     //records input.
  60.     SkipBadRecords.setAutoIncrMapperProcCount(job, false);
  61.     skipping = job.getBoolean("mapred.skip.on", false);
  62.     String inputFormatClassName = job.getClass("mapred.input.format.class", TextInputFormat.class).getCanonicalName();
  63.     ignoreKey = inputFormatClassName.equals(TextInputFormat.class.getCanonicalName());
  64.     try {
  65.       mapOutputFieldSeparator = job.get("stream.map.output.field.separator", "t").getBytes("UTF-8");
  66.       mapInputFieldSeparator = job.get("stream.map.input.field.separator", "t").getBytes("UTF-8");
  67.       numOfMapOutputKeyFields = job.getInt("stream.num.map.output.key.fields", 1);
  68.     } catch (UnsupportedEncodingException e) {
  69.       throw new RuntimeException("The current system does not support UTF-8 encoding!", e);
  70.     }
  71.   }
  72.   // Do NOT declare default constructor
  73.   // (MapRed creates it reflectively)
  74.   public void map(Object key, Object value, OutputCollector output, Reporter reporter) throws IOException {
  75.     if (outerrThreadsThrowable != null) {
  76.       mapRedFinished();
  77.       throw new IOException ("MROutput/MRErrThread failed:"
  78.                              + StringUtils.stringifyException(
  79.                                                               outerrThreadsThrowable));
  80.     }
  81.     try {
  82.       // 1/4 Hadoop in
  83.       numRecRead_++;
  84.       maybeLogRecord();
  85.       if (debugFailDuring_ && numRecRead_ == 3) {
  86.         throw new IOException("debugFailDuring_");
  87.       }
  88.       // 2/4 Hadoop to Tool
  89.       if (numExceptions_ == 0) {
  90.         if (!this.ignoreKey) {
  91.           write(key);
  92.           clientOut_.write(getInputSeparator());
  93.         }
  94.         write(value);
  95.         clientOut_.write('n');
  96.         if(skipping) {
  97.           //flush the streams on every record input if running in skip mode
  98.           //so that we don't buffer other records surrounding a bad record. 
  99.           clientOut_.flush();
  100.         }
  101.       } else {
  102.         numRecSkipped_++;
  103.       }
  104.     } catch (IOException io) {
  105.       numExceptions_++;
  106.       if (numExceptions_ > 1 || numRecWritten_ < minRecWrittenToEnableSkip_) {
  107.         // terminate with failure
  108.         String msg = logFailure(io);
  109.         appendLogToJobLog("failure");
  110.         mapRedFinished();
  111.         throw new IOException(msg);
  112.       } else {
  113.         // terminate with success:
  114.         // swallow input records although the stream processor failed/closed
  115.       }
  116.     }
  117.   }
  118.   public void close() {
  119.     appendLogToJobLog("success");
  120.     mapRedFinished();
  121.   }
  122.   byte[] getInputSeparator() {
  123.     return mapInputFieldSeparator;
  124.   }
  125.   @Override
  126.   byte[] getFieldSeparator() {
  127.     return mapOutputFieldSeparator;
  128.   }
  129.   @Override
  130.   int getNumOfKeyFields() {
  131.     return numOfMapOutputKeyFields;
  132.   }
  133. }