IndexUpdater.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.mapred;
  19. import java.io.IOException;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
  24. import org.apache.hadoop.contrib.index.lucene.LuceneUtil;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.mapred.FileInputFormat;
  28. import org.apache.hadoop.mapred.FileOutputFormat;
  29. import org.apache.hadoop.mapred.JobClient;
  30. import org.apache.hadoop.mapred.JobConf;
  31. /**
  32.  * An implementation of an index updater interface which creates a Map/Reduce
  33.  * job configuration and run the Map/Reduce job to analyze documents and update
  34.  * Lucene instances in parallel.
  35.  */
  36. public class IndexUpdater implements IIndexUpdater {
  37.   public static final Log LOG = LogFactory.getLog(IndexUpdater.class);
  38.   public IndexUpdater() {
  39.   }
  40.   /* (non-Javadoc)
  41.    * @see org.apache.hadoop.contrib.index.mapred.IIndexUpdater#run(org.apache.hadoop.conf.Configuration, org.apache.hadoop.fs.Path[], org.apache.hadoop.fs.Path, int, org.apache.hadoop.contrib.index.mapred.Shard[])
  42.    */
  43.   public void run(Configuration conf, Path[] inputPaths, Path outputPath,
  44.       int numMapTasks, Shard[] shards) throws IOException {
  45.     JobConf jobConf =
  46.         createJob(conf, inputPaths, outputPath, numMapTasks, shards);
  47.     JobClient.runJob(jobConf);
  48.   }
  49.   JobConf createJob(Configuration conf, Path[] inputPaths, Path outputPath,
  50.       int numMapTasks, Shard[] shards) throws IOException {
  51.     // set the starting generation for each shard
  52.     // when a reduce task fails, a new reduce task
  53.     // has to know where to re-start
  54.     setShardGeneration(conf, shards);
  55.     // iconf.set sets properties in conf
  56.     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
  57.     Shard.setIndexShards(iconf, shards);
  58.     // MapTask.MapOutputBuffer uses "io.sort.mb" to decide its max buffer size
  59.     // (max buffer size = 1/2 * "io.sort.mb").
  60.     // Here we half-en "io.sort.mb" because we use the other half memory to
  61.     // build an intermediate form/index in Combiner.
  62.     iconf.setIOSortMB(iconf.getIOSortMB() / 2);
  63.     // create the job configuration
  64.     JobConf jobConf = new JobConf(conf, IndexUpdater.class);
  65.     jobConf.setJobName(this.getClass().getName() + "_"
  66.         + System.currentTimeMillis());
  67.     // provided by application
  68.     FileInputFormat.setInputPaths(jobConf, inputPaths);
  69.     FileOutputFormat.setOutputPath(jobConf, outputPath);
  70.     jobConf.setNumMapTasks(numMapTasks);
  71.     // already set shards
  72.     jobConf.setNumReduceTasks(shards.length);
  73.     jobConf.setInputFormat(iconf.getIndexInputFormatClass());
  74.     Path[] inputs = FileInputFormat.getInputPaths(jobConf);
  75.     StringBuilder buffer = new StringBuilder(inputs[0].toString());
  76.     for (int i = 1; i < inputs.length; i++) {
  77.       buffer.append(",");
  78.       buffer.append(inputs[i].toString());
  79.     }
  80.     LOG.info("mapred.input.dir = " + buffer.toString());
  81.     LOG.info("mapred.output.dir = " + 
  82.              FileOutputFormat.getOutputPath(jobConf).toString());
  83.     LOG.info("mapred.map.tasks = " + jobConf.getNumMapTasks());
  84.     LOG.info("mapred.reduce.tasks = " + jobConf.getNumReduceTasks());
  85.     LOG.info(shards.length + " shards = " + iconf.getIndexShards());
  86.     // better if we don't create the input format instance
  87.     LOG.info("mapred.input.format.class = "
  88.         + jobConf.getInputFormat().getClass().getName());
  89.     // set by the system
  90.     jobConf.setMapOutputKeyClass(IndexUpdateMapper.getMapOutputKeyClass());
  91.     jobConf.setMapOutputValueClass(IndexUpdateMapper.getMapOutputValueClass());
  92.     jobConf.setOutputKeyClass(IndexUpdateReducer.getOutputKeyClass());
  93.     jobConf.setOutputValueClass(IndexUpdateReducer.getOutputValueClass());
  94.     jobConf.setMapperClass(IndexUpdateMapper.class);
  95.     jobConf.setPartitionerClass(IndexUpdatePartitioner.class);
  96.     jobConf.setCombinerClass(IndexUpdateCombiner.class);
  97.     jobConf.setReducerClass(IndexUpdateReducer.class);
  98.     jobConf.setOutputFormat(IndexUpdateOutputFormat.class);
  99.     return jobConf;
  100.   }
  101.   void setShardGeneration(Configuration conf, Shard[] shards)
  102.       throws IOException {
  103.     FileSystem fs = FileSystem.get(conf);
  104.     for (int i = 0; i < shards.length; i++) {
  105.       Path path = new Path(shards[i].getDirectory());
  106.       long generation = -1;
  107.       if (fs.exists(path)) {
  108.         FileSystemDirectory dir = null;
  109.         try {
  110.           dir = new FileSystemDirectory(fs, path, false, conf);
  111.           generation = LuceneUtil.getCurrentSegmentGeneration(dir);
  112.         } finally {
  113.           if (dir != null) {
  114.             dir.close();
  115.           }
  116.         }
  117.       }
  118.       if (generation != shards[i].getGeneration()) {
  119.         // set the starting generation for the shard
  120.         shards[i] =
  121.             new Shard(shards[i].getVersion(), shards[i].getDirectory(),
  122.                 generation);
  123.       }
  124.     }
  125.   }
  126. }