TestDistributionPolicy.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.mapred;
  19. import java.io.File;
  20. import java.io.IOException;
  21. import java.text.NumberFormat;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.contrib.index.example.HashingDistributionPolicy;
  24. import org.apache.hadoop.contrib.index.example.RoundRobinDistributionPolicy;
  25. import org.apache.hadoop.contrib.index.lucene.FileSystemDirectory;
  26. import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.mapred.MiniMRCluster;
  29. import org.apache.lucene.document.Document;
  30. import org.apache.lucene.index.IndexReader;
  31. import org.apache.lucene.index.MultiReader;
  32. import org.apache.lucene.index.Term;
  33. import org.apache.lucene.search.Hits;
  34. import org.apache.lucene.search.IndexSearcher;
  35. import org.apache.lucene.search.TermQuery;
  36. import org.apache.lucene.store.Directory;
  37. import junit.framework.TestCase;
  38. public class TestDistributionPolicy extends TestCase {
  39.   private static final NumberFormat NUMBER_FORMAT = NumberFormat.getInstance();
  40.   static {
  41.     NUMBER_FORMAT.setMinimumIntegerDigits(5);
  42.     NUMBER_FORMAT.setGroupingUsed(false);
  43.   }
  44.   // however, "we only allow 0 or 1 reducer in local mode" - from
  45.   // LocalJobRunner
  46.   private Configuration conf;
  47.   private Path localInputPath = new Path(System.getProperty("build.test") + "/sample/data.txt");
  48.   private Path localUpdatePath =
  49.       new Path(System.getProperty("build.test") + "/sample/data2.txt");
  50.   private Path inputPath = new Path("/myexample/data.txt");
  51.   private Path updatePath = new Path("/myexample/data2.txt");
  52.   private Path outputPath = new Path("/myoutput");
  53.   private Path indexPath = new Path("/myindex");
  54.   private int numShards = 3;
  55.   private int numMapTasks = 5;
  56.   private int numDataNodes = 3;
  57.   private int numTaskTrackers = 3;
  58.   private int numDocsPerRun = 10; // num of docs in local input path
  59.   private FileSystem fs;
  60.   private MiniDFSCluster dfsCluster;
  61.   private MiniMRCluster mrCluster;
  62.   public TestDistributionPolicy() throws IOException {
  63.     super();
  64.     if (System.getProperty("hadoop.log.dir") == null) {
  65.       String base = new File(".").getPath(); // getAbsolutePath();
  66.       System.setProperty("hadoop.log.dir", new Path(base).toString() + "/logs");
  67.     }
  68.     conf = new Configuration();
  69.   }
  70.   protected void setUp() throws Exception {
  71.     super.setUp();
  72.     try {
  73.       dfsCluster =
  74.           new MiniDFSCluster(conf, numDataNodes, true, (String[]) null);
  75.       fs = dfsCluster.getFileSystem();
  76.       if (fs.exists(inputPath)) {
  77.         fs.delete(inputPath);
  78.       }
  79.       fs.copyFromLocalFile(localInputPath, inputPath);
  80.       if (fs.exists(updatePath)) {
  81.         fs.delete(updatePath);
  82.       }
  83.       fs.copyFromLocalFile(localUpdatePath, updatePath);
  84.       if (fs.exists(outputPath)) {
  85.         // do not create, mapred will create
  86.         fs.delete(outputPath);
  87.       }
  88.       if (fs.exists(indexPath)) {
  89.         fs.delete(indexPath);
  90.       }
  91.       mrCluster =
  92.           new MiniMRCluster(numTaskTrackers, fs.getUri().toString(), 1);
  93.     } catch (IOException e) {
  94.       if (dfsCluster != null) {
  95.         dfsCluster.shutdown();
  96.         dfsCluster = null;
  97.       }
  98.       if (fs != null) {
  99.         fs.close();
  100.         fs = null;
  101.       }
  102.       if (mrCluster != null) {
  103.         mrCluster.shutdown();
  104.         mrCluster = null;
  105.       }
  106.       throw e;
  107.     }
  108.   }
  109.   protected void tearDown() throws Exception {
  110.     if (dfsCluster != null) {
  111.       dfsCluster.shutdown();
  112.       dfsCluster = null;
  113.     }
  114.     if (fs != null) {
  115.       fs.close();
  116.       fs = null;
  117.     }
  118.     if (mrCluster != null) {
  119.       mrCluster.shutdown();
  120.       mrCluster = null;
  121.     }
  122.     super.tearDown();
  123.   }
  124.   public void testDistributionPolicy() throws IOException {
  125.     IndexUpdateConfiguration iconf = new IndexUpdateConfiguration(conf);
  126.     // test hashing distribution policy
  127.     iconf.setDistributionPolicyClass(HashingDistributionPolicy.class);
  128.     onetest();
  129.     if (fs.exists(indexPath)) {
  130.       fs.delete(indexPath);
  131.     }
  132.     // test round-robin distribution policy
  133.     iconf.setDistributionPolicyClass(RoundRobinDistributionPolicy.class);
  134.     onetest();
  135.   }
  136.   private void onetest() throws IOException {
  137.     long versionNumber = -1;
  138.     long generation = -1;
  139.     Shard[] shards = new Shard[numShards];
  140.     for (int j = 0; j < shards.length; j++) {
  141.       shards[j] =
  142.           new Shard(versionNumber,
  143.               new Path(indexPath, NUMBER_FORMAT.format(j)).toString(),
  144.               generation);
  145.     }
  146.     if (fs.exists(outputPath)) {
  147.       fs.delete(outputPath);
  148.     }
  149.     IIndexUpdater updater = new IndexUpdater();
  150.     updater.run(conf, new Path[] { inputPath }, outputPath, numMapTasks,
  151.         shards);
  152.     if (fs.exists(outputPath)) {
  153.       fs.delete(outputPath);
  154.     }
  155.     // delete docs w/ even docids, update docs w/ odd docids
  156.     updater.run(conf, new Path[] { updatePath }, outputPath, numMapTasks,
  157.         shards);
  158.     verify(shards);
  159.   }
  160.   private void verify(Shard[] shards) throws IOException {
  161.     // verify the index
  162.     IndexReader[] readers = new IndexReader[shards.length];
  163.     for (int i = 0; i < shards.length; i++) {
  164.       Directory dir =
  165.           new FileSystemDirectory(fs, new Path(shards[i].getDirectory()),
  166.               false, conf);
  167.       readers[i] = IndexReader.open(dir);
  168.     }
  169.     IndexReader reader = new MultiReader(readers);
  170.     IndexSearcher searcher = new IndexSearcher(reader);
  171.     Hits hits = searcher.search(new TermQuery(new Term("content", "apache")));
  172.     assertEquals(0, hits.length());
  173.     hits = searcher.search(new TermQuery(new Term("content", "hadoop")));
  174.     assertEquals(numDocsPerRun / 2, hits.length());
  175.     int[] counts = new int[numDocsPerRun];
  176.     for (int i = 0; i < hits.length(); i++) {
  177.       Document doc = hits.doc(i);
  178.       counts[Integer.parseInt(doc.get("id"))]++;
  179.     }
  180.     for (int i = 0; i < numDocsPerRun; i++) {
  181.       if (i % 2 == 0) {
  182.         assertEquals(0, counts[i]);
  183.       } else {
  184.         assertEquals(1, counts[i]);
  185.       }
  186.     }
  187.     searcher.close();
  188.     reader.close();
  189.   }
  190. }