TestIndexUpdater.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.text.NumberFormat;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
  24. import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileStatus;
  25. import org.apache.hadoop.fs.FileSystem;
  26. import org.apache.hadoop.fs.Path;
  27. import org.apache.hadoop.fs.PathFilter;
  28. import org.apache.hadoop.mapred.MiniMRCluster;
  29. import org.apache.lucene.document.Document;
  30. import org.apache.lucene.index.IndexReader;
  31. import org.apache.lucene.index.IndexWriter;
  32. import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
  33. import org.apache.lucene.index.MultiReader;
  34. import org.apache.lucene.index.Term;
  35. import org.apache.lucene.search.Hits;
  36. import org.apache.lucene.search.IndexSearcher;
  37. import org.apache.lucene.search.TermQuery;
  38. import org.apache.lucene.store.Directory;
  39. import junit.framework.TestCase;
  40. public class TestIndexUpdater extends TestCase {
  41.   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  42.   static {
  43.     NUMBER_FORMAT.setMinimumIntegerDigits(5);
  44.     NUMBER_FORMAT.setGroupingUsed(false);
  45.   }
  46.   // however, "we only allow 0 or 1 reducer in local mode" - from
  47.   // LocalJobRunner
  48.   private Configuration conf;
  49.   private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
  50.   private Path inputPath = new Path("/myexample/data.txt");
  51.   private Path outputPath = new Path("/myoutput");
  52.   private Path indexPath = new Path("/myindex");
  53.   private int initNumShards = 3;
  54.   private int numMapTasks = 5;
  55.   private int numDataNodes = 3;
  56.   private int numTaskTrackers = 3;
  57.   private int numRuns = 3;
  58.   private int numDocsPerRun = 10; // num of docs in local input path
  59.   private FileSystem fs;
  60.   private MiniDFSCluster dfsCluster;
  61.   private MiniMRCluster mrCluster;
  62.   public TestIndexUpdater() throws IOException {
  63.     super();
  64.     if (System.getProperty("hadoop.log.dir") == null) {
  65.       String base = new File(".").getPath(); // getAbsolutePath();
  66.       System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
  67.     }
  68.     conf = new Configuration();
  69.   }
  70.   protected void setUp() throws Exception {
  71.     super.setUp();
  72.     try {
  73.       dfsCluster =
  74.           new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
  75.       fs = dfsCluster.getFileSystem();
  76.       if (fs.exists(inputPath)) {
  77.         fs.delete(inputPath);
  78.       }
  79.       fs.copyFromLocalFile(localInputPath, inputPath);
  80.       if (fs.exists(outputPath)) {
  81.         // do not create, mapred will create
  82.         fs.delete(outputPath);
  83.       }
  84.       if (fs.exists(indexPath)) {
  85.         fs.delete(indexPath);
  86.       }
  87.       mrCluster =
  88.           new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
  89.     } catch (IOException e) {
  90.       if (dfsCluster != null) {
  91.         dfsCluster.shutdown();
  92.         dfsCluster = null;
  93.       }
  94.       if (fs != null) {
  95.         fs.close();
  96.         fs = null;
  97.       }
  98.       if (mrCluster != null) {
  99.         mrCluster.shutdown();
  100.         mrCluster = null;
  101.       }
  102.       throw e;
  103.     }
  104.   }
  105.   protected void tearDown() throws Exception {
  106.     if (dfsCluster != null) {
  107.       dfsCluster.shutdown();
  108.       dfsCluster = null;
  109.     }
  110.     if (fs != null) {
  111.       fs.close();
  112.       fs = null;
  113.     }
  114.     if (mrCluster != null) {
  115.       mrCluster.shutdown();
  116.       mrCluster = null;
  117.     }
  118.     super.tearDown();
  119.   }
  120.   public void testIndexUpdater() throws IOException {
  121.     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
  122.     // max field length, compound file and number of segments will be checked
  123.     // later
  124.     iconf.setIndexMaxFieldLength(2);
  125.     iconf.setIndexUseCompoundFile(true);
  126.     iconf.setIndexMaxNumSegments(1);
  127.     long versionNumber = -1;
  128.     long generation = -1;
  129.     for (int i = 0; i < numRuns; i++) {
  130.       if (fs.exists(outputPath)) {
  131.         fs.delete(outputPath);
  132.       }
  133.       Shard[] shards = new Shard[initNumShards + i];
  134.       for (int j = 0; j < shards.length; j++) {
  135.         shards[j] =
  136.             new Shard(versionNumber, new Path(indexPath,
  137.                 NUMBER_FORMAT.format(j)).toString(), generation);
  138.       }
  139.       run(i + 1, shards);
  140.     }
  141.   }
  142.   private void run(int numRuns, Shard[] shards) throws IOException {
  143.     IIndexUpdater updater = new IndexUpdater();
  144.     updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
  145.         shards);
  146.     // verify the done files
  147.     Path[] doneFileNames = new Path[shards.length];
  148.     int count = 0;
  149.     FileStatus[] fileStatus = fs.listStatus(outputPath);
  150.     for (int i = 0; i < fileStatus.length; i++) {
  151.       FileStatus[] doneFiles = fs.listStatus(fileStatus[i].getPath());
  152.       for (int j = 0; j < doneFiles.length; j++) {
  153.         doneFileNames[count++] = doneFiles[j].getPath();
  154.       }
  155.     }
  156.     assertEquals(shards.length, count);
  157.     for (int i = 0; i < count; i++) {
  158.       assertTrue(doneFileNames[i].getName().startsWith(
  159.           IndexUpdateReducer.DONE.toString()));
  160.     }
  161.     // verify the index
  162.     IndexReader[] readers = new IndexReader[shards.length];
  163.     for (int i = 0; i < shards.length; i++) {
  164.       Directory dir =
  165.           new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
  166.               false, conf);
  167.       readers[i] = IndexReader.open(dir);
  168.     }
  169.     IndexReader reader = new MultiReader(readers);
  170.     IndexSearcher searcher = new IndexSearcher(reader);
  171.     Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
  172.     assertEquals(numRuns * numDocsPerRun, hits.length());
  173.     int[] counts = new int[numDocsPerRun];
  174.     for (int i = 0; i < hits.length(); i++) {
  175.       Document doc = hits.doc(i);
  176.       counts[Integer.parseInt(doc.get("id"))]++;
  177.     }
  178.     for (int i = 0; i < numDocsPerRun; i++) {
  179.       assertEquals(numRuns, counts[i]);
  180.     }
  181.     // max field length is 2, so "dot" is also indexed but not "org"
  182.     hits = searcher.search(new TermQuery(new Term("content", "dot")));
  183.     assertEquals(numRuns, hits.length());
  184.     hits = searcher.search(new TermQuery(new Term("content", "org")));
  185.     assertEquals(0, hits.length());
  186.     searcher.close();
  187.     reader.close();
  188.     // open and close an index writer with KeepOnlyLastCommitDeletionPolicy
  189.     // to remove earlier checkpoints
  190.     for (int i = 0; i < shards.length; i++) {
  191.       Directory dir =
  192.           new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
  193.               false, conf);
  194.       IndexWriter writer =
  195.           new IndexWriter(dir, false, null,
  196.               new KeepOnlyLastCommitDeletionPolicy());
  197.       writer.close();
  198.     }
  199.     // verify the number of segments, must be done after an writer with
  200.     // KeepOnlyLastCommitDeletionPolicy so that earlier checkpoints are removed
  201.     for (int i = 0; i < shards.length; i++) {
  202.       PathFilter cfsFilter = new PathFilter() {
  203.         public boolean accept(Path path) {
  204.           return path.getName().endsWith(".cfs");
  205.         }
  206.       };
  207.       FileStatus[] cfsFiles =
  208.           fs.listStatus(new Path(shards[i].getDirectory()), cfsFilter);
  209.       assertEquals(1, cfsFiles.length);
  210.     }
  211.   }
  212. }