OutputHandler.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.pipes;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.HashMap;
  22. import java.util.List;
  23. import java.util.Map;
  24. import org.apache.hadoop.io.FloatWritable;
  25. import org.apache.hadoop.io.NullWritable;
  26. import org.apache.hadoop.io.Writable;
  27. import org.apache.hadoop.io.WritableComparable;
  28. import org.apache.hadoop.mapred.Counters;
  29. import org.apache.hadoop.mapred.OutputCollector;
  30. import org.apache.hadoop.mapred.RecordReader;
  31. import org.apache.hadoop.mapred.Reporter;
  32. /**
  33.  * Handles the upward (C++ to Java) messages from the application.
  34.  */
  35. class OutputHandler<K extends WritableComparable,
  36.                     V extends Writable>
  37.   implements UpwardProtocol<K, V> {
  38.   
  39.   private Reporter reporter;
  40.   private OutputCollector<K, V> collector;
  41.   private float progressValue = 0.0f;
  42.   private boolean done = false;
  43.   private Throwable exception = null;
  44.   RecordReader<FloatWritable,NullWritable> recordReader = null;
  45.   private Map<Integer, Counters.Counter> registeredCounters = 
  46.     new HashMap<Integer, Counters.Counter>();
  47.   /**
  48.    * Create a handler that will handle any records output from the application.
  49.    * @param collector the "real" collector that takes the output
  50.    * @param reporter the reporter for reporting progress
  51.    */
  52.   public OutputHandler(OutputCollector<K, V> collector, Reporter reporter, 
  53.                        RecordReader<FloatWritable,NullWritable> recordReader) {
  54.     this.reporter = reporter;
  55.     this.collector = collector;
  56.     this.recordReader = recordReader;
  57.   }
  58.   /**
  59.    * The task output a normal record.
  60.    */
  61.   public void output(K key, V value) throws IOException {
  62.     collector.collect(key, value);
  63.   }
  64.   /**
  65.    * The task output a record with a partition number attached.
  66.    */
  67.   public void partitionedOutput(int reduce, K key, 
  68.                                 V value) throws IOException {
  69.     PipesPartitioner.setNextPartition(reduce);
  70.     collector.collect(key, value);
  71.   }
  72.   /**
  73.    * Update the status message for the task.
  74.    */
  75.   public void status(String msg) {
  76.     reporter.setStatus(msg);
  77.   }
  78.   private FloatWritable progressKey = new FloatWritable(0.0f);
  79.   private NullWritable nullValue = NullWritable.get();
  80.   /**
  81.    * Update the amount done and call progress on the reporter.
  82.    */
  83.   public void progress(float progress) throws IOException {
  84.     progressValue = progress;
  85.     reporter.progress();
  86.     
  87.     if (recordReader != null) {
  88.       progressKey.set(progress);
  89.       recordReader.next(progressKey, nullValue);
  90.     }
  91.   }
  92.   /**
  93.    * The task finished successfully.
  94.    */
  95.   public void done() throws IOException {
  96.     synchronized (this) {
  97.       done = true;
  98.       notify();
  99.     }
  100.   }
  101.   /**
  102.    * Get the current amount done.
  103.    * @return a float between 0.0 and 1.0
  104.    */
  105.   public float getProgress() {
  106.     return progressValue;
  107.   }
  108.   /**
  109.    * The task failed with an exception.
  110.    */
  111.   public void failed(Throwable e) {
  112.     synchronized (this) {
  113.       exception = e;
  114.       notify();
  115.     }
  116.   }
  117.   /**
  118.    * Wait for the task to finish or abort.
  119.    * @return did the task finish correctly?
  120.    * @throws Throwable
  121.    */
  122.   public synchronized boolean waitForFinish() throws Throwable {
  123.     while (!done && exception == null) {
  124.       wait();
  125.     }
  126.     if (exception != null) {
  127.       throw exception;
  128.     }
  129.     return done;
  130.   }
  131.   public void registerCounter(int id, String group, String name) throws IOException {
  132.     Counters.Counter counter = reporter.getCounter(group, name);
  133.     registeredCounters.put(id, counter);
  134.   }
  135.   public void incrementCounter(int id, long amount) throws IOException {
  136.     if (id < registeredCounters.size()) {
  137.       Counters.Counter counter = registeredCounters.get(id);
  138.       counter.increment(amount);
  139.     } else {
  140.       throw new IOException("Invalid counter with id: " + id);
  141.     }
  142.   }
  143. }