IOMapperBase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.fs;
  19. import java.io.IOException;
  20. import java.net.InetAddress;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.conf.Configured;
  23. import org.apache.hadoop.io.LongWritable;
  24. import org.apache.hadoop.io.UTF8;
  25. import org.apache.hadoop.io.Writable;
  26. import org.apache.hadoop.io.WritableComparable;
  27. import org.apache.hadoop.mapred.JobConf;
  28. import org.apache.hadoop.mapred.Mapper;
  29. import org.apache.hadoop.mapred.OutputCollector;
  30. import org.apache.hadoop.mapred.Reporter;
  31. /**
  32.  * Base mapper class for IO operations.
  33.  * <p>
  34.  * Two abstract method {@link #doIO(Reporter, String, long)} and 
  35.  * {@link #collectStats(OutputCollector,String,long,Object)} should be
  36.  * overloaded in derived classes to define the IO operation and the
  37.  * statistics data to be collected by subsequent reducers.
  38.  * 
  39.  */
  40. public abstract class IOMapperBase extends Configured
  41.     implements Mapper<UTF8, LongWritable, UTF8, UTF8> {
  42.   
  43.   protected byte[] buffer;
  44.   protected int bufferSize;
  45.   protected FileSystem fs;
  46.   protected String hostName;
  47.   public IOMapperBase(Configuration conf) { 
  48.     super(conf); 
  49.     try {
  50.       fs = FileSystem.get(conf);
  51.     } catch (Exception e) {
  52.       throw new RuntimeException("Cannot create file system.", e);
  53.     }
  54.     bufferSize = conf.getInt("test.io.file.buffer.size", 4096);
  55.     buffer = new byte[bufferSize];
  56.     try {
  57.       hostName = InetAddress.getLocalHost().getHostName();
  58.     } catch(Exception e) {
  59.       hostName = "localhost";
  60.     }
  61.   }
  62.   public void configure(JobConf job) {
  63.     setConf(job);
  64.   }
  65.   public void close() throws IOException {
  66.   }
  67.   
  68.   /**
  69.    * Perform io operation, usually read or write.
  70.    * 
  71.    * @param reporter
  72.    * @param name file name
  73.    * @param value offset within the file
  74.    * @return object that is passed as a parameter to 
  75.    *          {@link #collectStats(OutputCollector,String,long,Object)}
  76.    * @throws IOException
  77.    */
  78.   abstract Object doIO(Reporter reporter, 
  79.                        String name, 
  80.                        long value) throws IOException;
  81.   /**
  82.    * Collect stat data to be combined by a subsequent reducer.
  83.    * 
  84.    * @param output
  85.    * @param name file name
  86.    * @param execTime IO execution time
  87.    * @param doIOReturnValue value returned by {@link #doIO(Reporter,String,long)}
  88.    * @throws IOException
  89.    */
  90.   abstract void collectStats(OutputCollector<UTF8, UTF8> output, 
  91.                              String name, 
  92.                              long execTime, 
  93.                              Object doIOReturnValue) throws IOException;
  94.   
  95.   /**
  96.    * Map file name and offset into statistical data.
  97.    * <p>
  98.    * The map task is to get the 
  99.    * <tt>key</tt>, which contains the file name, and the 
  100.    * <tt>value</tt>, which is the offset within the file.
  101.    * 
  102.    * The parameters are passed to the abstract method 
  103.    * {@link #doIO(Reporter,String,long)}, which performs the io operation, 
  104.    * usually read or write data, and then 
  105.    * {@link #collectStats(OutputCollector,String,long,Object)} 
  106.    * is called to prepare stat data for a subsequent reducer.
  107.    */
  108.   public void map(UTF8 key, 
  109.                   LongWritable value,
  110.                   OutputCollector<UTF8, UTF8> output, 
  111.                   Reporter reporter) throws IOException {
  112.     String name = key.toString();
  113.     long longValue = value.get();
  114.     
  115.     reporter.setStatus("starting " + name + " ::host = " + hostName);
  116.     
  117.     long tStart = System.currentTimeMillis();
  118.     Object statValue = doIO(reporter, name, longValue);
  119.     long tEnd = System.currentTimeMillis();
  120.     long execTime = tEnd - tStart;
  121.     collectStats(output, name, execTime, statValue);
  122.     
  123.     reporter.setStatus("finished " + name + " ::host = " + hostName);
  124.   }
  125. }