ShardWriter.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.lucene;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.contrib.index.mapred.IndexUpdateConfiguration;
  24. import org.apache.hadoop.contrib.index.mapred.IntermediateForm;
  25. import org.apache.hadoop.contrib.index.mapred.Shard;
  26. import org.apache.hadoop.fs.FileStatus;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.fs.PathFilter;
  30. import org.apache.lucene.index.IndexWriter;
  31. import org.apache.lucene.index.KeepOnlyLastCommitDeletionPolicy;
  32. import org.apache.lucene.index.Term;
  33. import org.apache.lucene.store.Directory;
  34. /**
  35.  * The initial version of an index is stored in the perm dir. Index files
  36.  * created by newer versions are written to a temp dir on the local FS. After
  37.  * successfully creating the new version in the temp dir, the shard writer
  38.  * moves the new files to the perm dir and deletes the temp dir in close().
  39.  */
  40. public class ShardWriter {
  41.   static final Log LOG = LogFactory.getLog(ShardWriter.class);
  42.   private final FileSystem fs;
  43.   private final FileSystem localFs;
  44.   private final Path perm;
  45.   private final Path temp;
  46.   private final Directory dir;
  47.   private final IndexWriter writer;
  48.   private int maxNumSegments;
  49.   private long numForms = 0;
  50.   /**
  51.    * Constructor
  52.    * @param fs
  53.    * @param shard
  54.    * @param tempDir
  55.    * @param iconf
  56.    * @throws IOException
  57.    */
  58.   public ShardWriter(FileSystem fs, Shard shard, String tempDir,
  59.       IndexUpdateConfiguration iconf) throws IOException {
  60.     LOG.info("Construct a shard writer");
  61.     this.fs = fs;
  62.     localFs = FileSystem.getLocal(iconf.getConfiguration());
  63.     perm = new Path(shard.getDirectory());
  64.     temp = new Path(tempDir);
  65.     long initGeneration = shard.getGeneration();
  66.     if (!fs.exists(perm)) {
  67.       assert (initGeneration < 0);
  68.       fs.mkdirs(perm);
  69.     } else {
  70.       restoreGeneration(fs, perm, initGeneration);
  71.     }
  72.     dir =
  73.         new MixedDirectory(fs, perm, localFs, fs.startLocalOutput(perm, temp),
  74.             iconf.getConfiguration());
  75.     // analyzer is null because we only use addIndexes, not addDocument
  76.     writer =
  77.         new IndexWriter(dir, false, null,
  78.             initGeneration < 0 ? new KeepOnlyLastCommitDeletionPolicy()
  79.                 : new MixedDeletionPolicy());
  80.     setParameters(iconf);
  81.   }
  82.   /**
  83.    * Process an intermediate form by carrying out, on the Lucene instance of
  84.    * the shard, the deletes and the inserts (a ram index) in the form. 
  85.    * @param form  the intermediate form containing deletes and a ram index
  86.    * @throws IOException
  87.    */
  88.   public void process(IntermediateForm form) throws IOException {
  89.     // first delete
  90.     Iterator<Term> iter = form.deleteTermIterator();
  91.     while (iter.hasNext()) {
  92.       writer.deleteDocuments(iter.next());
  93.     }
  94.     // then insert
  95.     writer.addIndexesNoOptimize(new Directory[] { form.getDirectory() });
  96.     numForms++;
  97.   }
  98.   /**
  99.    * Close the shard writer. Optimize the Lucene instance of the shard before
  100.    * closing if necessary, and copy the files created in the temp directory
  101.    * to the permanent directory after closing.
  102.    * @throws IOException
  103.    */
  104.   public void close() throws IOException {
  105.     LOG.info("Closing the shard writer, processed " + numForms + " forms");
  106.     try {
  107.       try {
  108.         if (maxNumSegments > 0) {
  109.           writer.optimize(maxNumSegments);
  110.           LOG.info("Optimized the shard into at most " + maxNumSegments
  111.               + " segments");
  112.         }
  113.       } finally {
  114.         writer.close();
  115.         LOG.info("Closed Lucene index writer");
  116.       }
  117.       moveFromTempToPerm();
  118.       LOG.info("Moved new index files to " + perm);
  119.     } finally {
  120.       dir.close();
  121.       LOG.info("Closed the shard writer");
  122.     }
  123.   }
  124.   /* (non-Javadoc)
  125.    * @see java.lang.Object#toString()
  126.    */
  127.   public String toString() {
  128.     return this.getClass().getName() + "@" + perm + "&" + temp;
  129.   }
  130.   private void setParameters(IndexUpdateConfiguration iconf) {
  131.     int maxFieldLength = iconf.getIndexMaxFieldLength();
  132.     if (maxFieldLength > 0) {
  133.       writer.setMaxFieldLength(maxFieldLength);
  134.     }
  135.     writer.setUseCompoundFile(iconf.getIndexUseCompoundFile());
  136.     maxNumSegments = iconf.getIndexMaxNumSegments();
  137.     if (maxFieldLength > 0) {
  138.       LOG.info("sea.max.field.length = " + writer.getMaxFieldLength());
  139.     }
  140.     LOG.info("sea.use.compound.file = " + writer.getUseCompoundFile());
  141.     LOG.info("sea.max.num.segments = " + maxNumSegments);
  142.   }
  143.   // in case a previous reduce task fails, restore the generation to
  144.   // the original starting point by deleting the segments.gen file
  145.   // and the segments_N files whose generations are greater than the
  146.   // starting generation; rest of the unwanted files will be deleted
  147.   // once the unwanted segments_N files are deleted
  148.   private void restoreGeneration(FileSystem fs, Path perm, long startGen)
  149.       throws IOException {
  150.     FileStatus[] fileStatus = fs.listStatus(perm, new PathFilter() {
  151.       public boolean accept(Path path) {
  152.         return LuceneUtil.isSegmentsFile(path.getName());
  153.       }
  154.     });
  155.     // remove the segments_N files whose generation are greater than
  156.     // the starting generation
  157.     for (int i = 0; i < fileStatus.length; i++) {
  158.       Path path = fileStatus[i].getPath();
  159.       if (startGen < LuceneUtil.generationFromSegmentsFileName(path.getName())) {
  160.         fs.delete(path);
  161.       }
  162.     }
  163.     // always remove segments.gen in case last failed try removed segments_N
  164.     // but not segments.gen, and segments.gen will be overwritten anyway.
  165.     Path segmentsGenFile = new Path(LuceneUtil.IndexFileNames.SEGMENTS_GEN);
  166.     if (fs.exists(segmentsGenFile)) {
  167.       fs.delete(segmentsGenFile);
  168.     }
  169.   }
  170.   // move the files created in the temp dir into the perm dir
  171.   // and then delete the temp dir from the local FS
  172.   private void moveFromTempToPerm() throws IOException {
  173.     try {
  174.       FileStatus[] fileStatus =
  175.           localFs.listStatus(temp, LuceneIndexFileNameFilter.getFilter());
  176.       Path segmentsPath = null;
  177.       Path segmentsGenPath = null;
  178.       // move the files created in temp dir except segments_N and segments.gen
  179.       for (int i = 0; i < fileStatus.length; i++) {
  180.         Path path = fileStatus[i].getPath();
  181.         String name = path.getName();
  182.         if (LuceneUtil.isSegmentsGenFile(name)) {
  183.           assert (segmentsGenPath == null);
  184.           segmentsGenPath = path;
  185.         } else if (LuceneUtil.isSegmentsFile(name)) {
  186.           assert (segmentsPath == null);
  187.           segmentsPath = path;
  188.         } else {
  189.           fs.completeLocalOutput(new Path(perm, name), path);
  190.         }
  191.       }
  192.       // move the segments_N file
  193.       if (segmentsPath != null) {
  194.         fs.completeLocalOutput(new Path(perm, segmentsPath.getName()),
  195.             segmentsPath);
  196.       }
  197.       // move the segments.gen file
  198.       if (segmentsGenPath != null) {
  199.         fs.completeLocalOutput(new Path(perm, segmentsGenPath.getName()),
  200.             segmentsGenPath);
  201.       }
  202.     } finally {
  203.       // finally delete the temp dir (files should have been deleted)
  204.       localFs.delete(temp);
  205.     }
  206.   }
  207. }