IndexUpdateReducer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.mapred;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.contrib.index.lucene.ShardWriter;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.io.Closeable;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.io.Writable;
  29. import org.apache.hadoop.io.WritableComparable;
  30. import org.apache.hadoop.mapred.JobConf;
  31. import org.apache.hadoop.mapred.MapReduceBase;
  32. import org.apache.hadoop.mapred.OutputCollector;
  33. import org.apache.hadoop.mapred.Reducer;
  34. import org.apache.hadoop.mapred.Reporter;
  35. /**
  36.  * This reducer applies to a shard the changes for it. A "new version" of
  37.  * a shard is created at the end of a reduce. It is important to note that
  38.  * the new version of the shard is not derived from scratch. By leveraging
  39.  * Lucene's update algorithm, the new version of each Lucene instance will
  40.  * share as many files as possible as the previous version. 
  41.  */
  42. public class IndexUpdateReducer extends MapReduceBase implements
  43.     Reducer<Shard, IntermediateForm, Shard, Text> {
  44.   static final Log LOG = LogFactory.getLog(IndexUpdateReducer.class);
  45.   static final Text DONE = new Text("done");
  46.   /**
  47.    * Get the reduce output key class.
  48.    * @return the reduce output key class
  49.    */
  50.   public static Class<? extends WritableComparable> getOutputKeyClass() {
  51.     return Shard.class;
  52.   }
  53.   /**
  54.    * Get the reduce output value class.
  55.    * @return the reduce output value class
  56.    */
  57.   public static Class<? extends Writable> getOutputValueClass() {
  58.     return Text.class;
  59.   }
  60.   private IndexUpdateConfiguration iconf;
  61.   private String mapredTempDir;
  62.   /* (non-Javadoc)
  63.    * @see org.apache.hadoop.mapred.Reducer#reduce(java.lang.Object, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
  64.    */
  65.   public void reduce(Shard key, Iterator<IntermediateForm> values,
  66.       OutputCollector<Shard, Text> output, Reporter reporter)
  67.       throws IOException {
  68.     LOG.info("Construct a shard writer for " + key);
  69.     FileSystem fs = FileSystem.get(iconf.getConfiguration());
  70.     String temp =
  71.         mapredTempDir + Path.SEPARATOR + "shard_" + System.currentTimeMillis();
  72.     final ShardWriter writer = new ShardWriter(fs, key, temp, iconf);
  73.     // update the shard
  74.     while (values.hasNext()) {
  75.       IntermediateForm form = values.next();
  76.       writer.process(form);
  77.       reporter.progress();
  78.     }
  79.     // close the shard
  80.     final Reporter fReporter = reporter;
  81.     new Closeable() {
  82.       volatile boolean closed = false;
  83.       public void close() throws IOException {
  84.         // spawn a thread to give progress heartbeats
  85.         Thread prog = new Thread() {
  86.           public void run() {
  87.             while (!closed) {
  88.               try {
  89.                 fReporter.setStatus("closing");
  90.                 Thread.sleep(1000);
  91.               } catch (InterruptedException e) {
  92.                 continue;
  93.               } catch (Throwable e) {
  94.                 return;
  95.               }
  96.             }
  97.           }
  98.         };
  99.         try {
  100.           prog.start();
  101.           if (writer != null) {
  102.             writer.close();
  103.           }
  104.         } finally {
  105.           closed = true;
  106.         }
  107.       }
  108.     }.close();
  109.     LOG.info("Closed the shard writer for " + key + ", writer = " + writer);
  110.     output.collect(key, DONE);
  111.   }
  112.   /* (non-Javadoc)
  113.    * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
  114.    */
  115.   public void configure(JobConf job) {
  116.     iconf = new IndexUpdateConfiguration(job);
  117.     mapredTempDir = iconf.getMapredTempDir();
  118.     mapredTempDir = Shard.normalizePath(mapredTempDir);
  119.   }
  120.   /* (non-Javadoc)
  121.    * @see org.apache.hadoop.mapred.MapReduceBase#close()
  122.    */
  123.   public void close() throws IOException {
  124.   }
  125. }