UpdateIndex.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:10k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.main;
  19. import java.io.IOException;
  20. import java.text.NumberFormat;
  21. import java.util.Arrays;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
  26. import org.apache.hadoop.contrib.index.mapred.IIndexUpdater;
  27. import org.apache.hadoop.contrib.index.mapred.Shard;
  28. import org.apache.hadoop.fs.FileStatus;
  29. import org.apache.hadoop.fs.FileSystem;
  30. import org.apache.hadoop.fs.Path;
  31. import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobConf;
  32. import org.apache.hadoop.mapred.FileInputFormat;
  33. import org.apache.hadoop.util.ReflectionUtils;
  34. /**
  35.  * A distributed "index" is partitioned into "shards". Each shard corresponds
  36.  * to a Lucene instance. This class contains the main() method which uses a
  37.  * Map/Reduce job to analyze documents and update Lucene instances in parallel.
  38.  * 
  39.  * The main() method in UpdateIndex requires the following information for
  40.  * updating the shards:
  41.  *   - Input formatter. This specifies how to format the input documents.
  42.  *   - Analysis. This defines the analyzer to use on the input. The analyzer
  43.  *     determines whether a document is being inserted, updated, or deleted.
  44.  *     For inserts or updates, the analyzer also converts each input document
  45.  *     into a Lucene document.
  46.  *   - Input paths. This provides the location(s) of updated documents,
  47.  *     e.g., HDFS files or directories, or HBase tables.
  48.  *   - Shard paths, or index path with the number of shards. Either specify
  49.  *     the path for each shard, or specify an index path and the shards are
  50.  *     the sub-directories of the index directory.
  51.  *   - Output path. When the update to a shard is done, a message is put here.
  52.  *   - Number of map tasks.
  53.  *
  54.  * All of the information can be specified in a configuration file. All but
  55.  * the first two can also be specified as command line options. Check out
  56.  * conf/index-config.xml.template for other configurable parameters.
  57.  *
  58.  * Note: Because of the parallel nature of Map/Reduce, the behaviour of
  59.  * multiple inserts, deletes or updates to the same document is undefined.
  60.  */
  61. public class UpdateIndex {
  62.   public static final Log LOG = LogFactory.getLog(UpdateIndex.class);
  63.   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  64.   static {
  65.     NUMBER_FORMAT.setMinimumIntegerDigits(5);
  66.     NUMBER_FORMAT.setGroupingUsed(false);
  67.   }
  68.   private static long now() {
  69.     return System.currentTimeMillis();
  70.   }
  71.   private static void printUsage(String cmd) {
  72.     System.err.println("Usage: java " + UpdateIndex.class.getName() + "n"
  73.         + "                        -inputPaths <inputPath,inputPath>n"
  74.         + "                        -outputPath <outputPath>n"
  75.         + "                        -shards     <shardDir,shardDir>n"
  76.         + "                        -indexPath  <indexPath>n"
  77.         + "                        -numShards  <num>n"
  78.         + "                        -numMapTasks <num>n"
  79.         + "                        -conf       <confPath>n"
  80.         + "Note: Do not use both -shards option and -indexPath option.");
  81.   }
  82.   private static String getIndexPath(Configuration conf) {
  83.     return conf.get("sea.index.path");
  84.   }
  85.   private static int getNumShards(Configuration conf) {
  86.     return conf.getInt("sea.num.shards", 1);
  87.   }
  88.   private static Shard[] createShards(String indexPath, int numShards,
  89.       Configuration conf) throws IOException {
  90.     String parent = Shard.normalizePath(indexPath) + Path.SEPARATOR;
  91.     long versionNumber = -1;
  92.     long generation = -1;
  93.     FileSystem fs = FileSystem.get(conf);
  94.     Path path = new Path(indexPath);
  95.     if (fs.exists(path)) {
  96.       FileStatus[] fileStatus = fs.listStatus(path);
  97.       String[] shardNames = new String[fileStatus.length];
  98.       int count = 0;
  99.       for (int i = 0; i < fileStatus.length; i++) {
  100.         if (fileStatus[i].isDir()) {
  101.           shardNames[count] = fileStatus[i].getPath().getName();
  102.           count++;
  103.         }
  104.       }
  105.       Arrays.sort(shardNames, 0, count);
  106.       Shard[] shards = new Shard[count >= numShards ? count : numShards];
  107.       for (int i = 0; i < count; i++) {
  108.         shards[i] =
  109.             new Shard(versionNumber, parent + shardNames[i], generation);
  110.       }
  111.       int number = count;
  112.       for (int i = count; i < numShards; i++) {
  113.         String shardPath;
  114.         while (true) {
  115.           shardPath = parent + NUMBER_FORMAT.format(number++);
  116.           if (!fs.exists(new Path(shardPath))) {
  117.             break;
  118.           }
  119.         }
  120.         shards[i] = new Shard(versionNumber, shardPath, generation);
  121.       }
  122.       return shards;
  123.     } else {
  124.       Shard[] shards = new Shard[numShards];
  125.       for (int i = 0; i < shards.length; i++) {
  126.         shards[i] =
  127.             new Shard(versionNumber, parent + NUMBER_FORMAT.format(i),
  128.                 generation);
  129.       }
  130.       return shards;
  131.     }
  132.   }
  133.   /**
  134.    * The main() method
  135.    * @param argv
  136.    */
  137.   public static void main(String[] argv) {
  138.     if (argv.length == 0) {
  139.       printUsage("");
  140.       System.exit(-1);
  141.     }
  142.     String inputPathsString = null;
  143.     Path outputPath = null;
  144.     String shardsString = null;
  145.     String indexPath = null;
  146.     int numShards = -1;
  147.     int numMapTasks = -1;
  148.     Configuration conf = new Configuration();
  149.     String confPath = null;
  150.     // parse the command line
  151.     for (int i = 0; i < argv.length; i++) { // parse command line
  152.       if (argv[i].equals("-inputPaths")) {
  153.         inputPathsString = argv[++i];
  154.       } else if (argv[i].equals("-outputPath")) {
  155.         outputPath = new Path(argv[++i]);
  156.       } else if (argv[i].equals("-shards")) {
  157.         shardsString = argv[++i];
  158.       } else if (argv[i].equals("-indexPath")) {
  159.         indexPath = argv[++i];
  160.       } else if (argv[i].equals("-numShards")) {
  161.         numShards = Integer.parseInt(argv[++i]);
  162.       } else if (argv[i].equals("-numMapTasks")) {
  163.         numMapTasks = Integer.parseInt(argv[++i]);
  164.       } else if (argv[i].equals("-conf")) {
  165.         // add as a local FS resource
  166.         confPath = argv[++i];
  167.         conf.addResource(new Path(confPath));
  168.       } else {
  169.         System.out.println("Unknown option " + argv[i] + " w/ value "
  170.             + argv[++i]);
  171.       }
  172.     }
  173.     LOG.info("inputPaths = " + inputPathsString);
  174.     LOG.info("outputPath = " + outputPath);
  175.     LOG.info("shards     = " + shardsString);
  176.     LOG.info("indexPath  = " + indexPath);
  177.     LOG.info("numShards  = " + numShards);
  178.     LOG.info("numMapTasks= " + numMapTasks);
  179.     LOG.info("confPath   = " + confPath);
  180.     Path[] inputPaths = null;
  181.     Shard[] shards = null;
  182.     JobConf jobConf = new JobConf(conf);
  183.     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(jobConf);
  184.     if (inputPathsString != null) {
  185.       jobConf.set("mapred.input.dir", inputPathsString);
  186.     }
  187.     inputPaths = FileInputFormat.getInputPaths(jobConf);
  188.     if (inputPaths.length == 0) {
  189.       inputPaths = null;
  190.     }
  191.     if (outputPath == null) {
  192.       outputPath = FileOutputFormat.getOutputPath(jobConf);     }
  193.     if (inputPaths == null || outputPath == null) {
  194.       System.err.println("InputPaths and outputPath must be specified.");
  195.       printUsage("");
  196.       System.exit(-1);
  197.     }
  198.     if (shardsString != null) {
  199.       iconf.setIndexShards(shardsString);
  200.     }
  201.     shards = Shard.getIndexShards(iconf);
  202.     if (shards != null && shards.length == 0) {
  203.       shards = null;
  204.     }
  205.     if (indexPath == null) {
  206.       indexPath = getIndexPath(conf);
  207.     }
  208.     if (numShards <= 0) {
  209.       numShards = getNumShards(conf);
  210.     }
  211.     if (shards == null && indexPath == null) {
  212.       System.err.println("Either shards or indexPath must be specified.");
  213.       printUsage("");
  214.       System.exit(-1);
  215.     }
  216.     if (numMapTasks <= 0) {
  217.       numMapTasks = jobConf.getNumMapTasks();
  218.     }
  219.     try {
  220.       // create shards and set their directories if necessary
  221.       if (shards == null) {
  222.         shards = createShards(indexPath, numShards, conf);
  223.       }
  224.       long startTime = now();
  225.       try {
  226.         IIndexUpdater updater =
  227.             (IIndexUpdater) ReflectionUtils.newInstance(
  228.                 iconf.getIndexUpdaterClass(), conf);
  229.         LOG.info("sea.index.updater = "
  230.             + iconf.getIndexUpdaterClass().getName());
  231.         updater.run(conf, inputPaths, outputPath, numMapTasks, shards);
  232.         LOG.info("Index update job is done");
  233.       } finally {
  234.         long elapsedTime = now() - startTime;
  235.         LOG.info("Elapsed time is  " + (elapsedTime / 1000) + "s");
  236.         System.out.println("Elapsed time is " + (elapsedTime / 1000) + "s");
  237.       }
  238.     } catch (Exception e) {
  239.       e.printStackTrace(System.err);
  240.     }
  241.   }
  242. }