IndexUpdateMapper.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:7k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.index.mapred;
  19. import java.io.IOException;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;
  22. import org.apache.hadoop.io.Writable;
  23. import org.apache.hadoop.io.WritableComparable;
  24. import org.apache.hadoop.mapred.JobConf;
  25. import org.apache.hadoop.mapred.MapReduceBase;
  26. import org.apache.hadoop.mapred.Mapper;
  27. import org.apache.hadoop.mapred.OutputCollector;
  28. import org.apache.hadoop.mapred.Reporter;
  29. import org.apache.hadoop.util.ReflectionUtils;
  30. import org.apache.lucene.analysis.Analyzer;
  31. /**
  32.  * This class applies local analysis on a key-value pair and then convert the
  33.  * result docid-operation pair to a shard-and-intermediate form pair.
  34.  */
  35. public class IndexUpdateMapper<K extends WritableComparable, V extends Writable>
  36.     extends MapReduceBase implements Mapper<K, V, Shard, IntermediateForm> {
  37.   static final Log LOG = LogFactory.getLog(IndexUpdateMapper.class);
  38.   /**
  39.    * Get the map output key class.
  40.    * @return the map output key class
  41.    */
  42.   public static Class<? extends WritableComparable> getMapOutputKeyClass() {
  43.     return Shard.class;
  44.   }
  45.   /**
  46.    * Get the map output value class.
  47.    * @return the map output value class
  48.    */
  49.   public static Class<? extends Writable> getMapOutputValueClass() {
  50.     return IntermediateForm.class;
  51.   }
  52.   IndexUpdateConfiguration iconf;
  53.   private Analyzer analyzer;
  54.   private Shard[] shards;
  55.   private IDistributionPolicy distributionPolicy;
  56.   private ILocalAnalysis<K, V> localAnalysis;
  57.   private DocumentID tmpKey;
  58.   private DocumentAndOp tmpValue;
  59.   private OutputCollector<DocumentID, DocumentAndOp> tmpCollector =
  60.       new OutputCollector<DocumentID, DocumentAndOp>() {
  61.         public void collect(DocumentID key, DocumentAndOp value)
  62.             throws IOException {
  63.           tmpKey = key;
  64.           tmpValue = value;
  65.         }
  66.       };
  67.   /**
  68.    * Map a key-value pair to a shard-and-intermediate form pair. Internally,
  69.    * the local analysis is first applied to map the key-value pair to a
  70.    * document id-and-operation pair, then the docid-and-operation pair is
  71.    * mapped to a shard-intermediate form pair. The intermediate form is of the
  72.    * form of a single-document ram index and/or a single delete term.
  73.    */
  74.   public void map(K key, V value,
  75.       OutputCollector<Shard, IntermediateForm> output, Reporter reporter)
  76.       throws IOException {
  77.     synchronized (this) {
  78.       localAnalysis.map(key, value, tmpCollector, reporter);
  79.       if (tmpKey != null && tmpValue != null) {
  80.         DocumentAndOp doc = tmpValue;
  81.         IntermediateForm form = new IntermediateForm();
  82.         form.configure(iconf);
  83.         form.process(doc, analyzer);
  84.         form.closeWriter();
  85.         if (doc.getOp() == DocumentAndOp.Op.INSERT) {
  86.           int chosenShard = distributionPolicy.chooseShardForInsert(tmpKey);
  87.           if (chosenShard >= 0) {
  88.             // insert into one shard
  89.             output.collect(shards[chosenShard], form);
  90.           } else {
  91.             throw new IOException("Chosen shard for insert must be >= 0");
  92.           }
  93.         } else if (doc.getOp() == DocumentAndOp.Op.DELETE) {
  94.           int chosenShard = distributionPolicy.chooseShardForDelete(tmpKey);
  95.           if (chosenShard >= 0) {
  96.             // delete from one shard
  97.             output.collect(shards[chosenShard], form);
  98.           } else {
  99.             // broadcast delete to all shards
  100.             for (int i = 0; i < shards.length; i++) {
  101.               output.collect(shards[i], form);
  102.             }
  103.           }
  104.         } else { // UPDATE
  105.           int insertToShard = distributionPolicy.chooseShardForInsert(tmpKey);
  106.           int deleteFromShard =
  107.               distributionPolicy.chooseShardForDelete(tmpKey);
  108.           if (insertToShard >= 0) {
  109.             if (insertToShard == deleteFromShard) {
  110.               // update into one shard
  111.               output.collect(shards[insertToShard], form);
  112.             } else {
  113.               // prepare a deletion form
  114.               IntermediateForm deletionForm = new IntermediateForm();
  115.               deletionForm.configure(iconf);
  116.               deletionForm.process(new DocumentAndOp(DocumentAndOp.Op.DELETE,
  117.                   doc.getTerm()), analyzer);
  118.               deletionForm.closeWriter();
  119.               if (deleteFromShard >= 0) {
  120.                 // delete from one shard
  121.                 output.collect(shards[deleteFromShard], deletionForm);
  122.               } else {
  123.                 // broadcast delete to all shards
  124.                 for (int i = 0; i < shards.length; i++) {
  125.                   output.collect(shards[i], deletionForm);
  126.                 }
  127.               }
  128.               // prepare an insertion form
  129.               IntermediateForm insertionForm = new IntermediateForm();
  130.               insertionForm.configure(iconf);
  131.               insertionForm.process(new DocumentAndOp(DocumentAndOp.Op.INSERT,
  132.                   doc.getDocument()), analyzer);
  133.               insertionForm.closeWriter();
  134.               // insert into one shard
  135.               output.collect(shards[insertToShard], insertionForm);
  136.             }
  137.           } else {
  138.             throw new IOException("Chosen shard for insert must be >= 0");
  139.           }
  140.         }
  141.       }
  142.     }
  143.   }
  144.   /* (non-Javadoc)
  145.    * @see org.apache.hadoop.mapred.MapReduceBase#configure(org.apache.hadoop.mapred.JobConf)
  146.    */
  147.   public void configure(JobConf job) {
  148.     iconf = new IndexUpdateConfiguration(job);
  149.     analyzer =
  150.         (Analyzer) ReflectionUtils.newInstance(
  151.             iconf.getDocumentAnalyzerClass(), job);
  152.     localAnalysis =
  153.         (ILocalAnalysis) ReflectionUtils.newInstance(
  154.             iconf.getLocalAnalysisClass(), job);
  155.     localAnalysis.configure(job);
  156.     shards = Shard.getIndexShards(iconf);
  157.     distributionPolicy =
  158.         (IDistributionPolicy) ReflectionUtils.newInstance(
  159.             iconf.getDistributionPolicyClass(), job);
  160.     distributionPolicy.init(shards);
  161.     LOG.info("sea.document.analyzer = " + analyzer.getClass().getName());
  162.     LOG.info("sea.local.analysis = " + localAnalysis.getClass().getName());
  163.     LOG.info(shards.length + " shards = " + iconf.getIndexShards());
  164.     LOG.info("sea.distribution.policy = "
  165.         + distributionPolicy.getClass().getName());
  166.   }
  167.   /* (non-Javadoc)
  168.    * @see org.apache.hadoop.mapred.MapReduceBase#close()
  169.    */
  170.   public void close() throws IOException {
  171.     localAnalysis.close();
  172.   }
  173. }