DistributedSearch.java
上传用户:cctqzzy
上传日期:2022-03-14
资源大小:12198k
文件大小:14k
源码类别:

搜索引擎

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one or more
  3.  * contributor license agreements.  See the NOTICE file distributed with
  4.  * this work for additional information regarding copyright ownership.
  5.  * The ASF licenses this file to You under the Apache License, Version 2.0
  6.  * (the "License"); you may not use this file except in compliance with
  7.  * the License.  You may obtain a copy of the License at
  8.  *
  9.  *     http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package chapter10;
  18. import org.apache.nutch.searcher;
  19. import java.net.InetSocketAddress;
  20. import java.io.*;
  21. import java.util.*;
  22. import java.lang.reflect.Method;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.nutch.parse.ParseData;
  26. import org.apache.nutch.parse.ParseText;
  27. import org.apache.nutch.crawl.Inlinks;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.ipc.RPC;
  30. import org.apache.hadoop.ipc.VersionedProtocol;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.fs.FileSystem;
  33. import org.apache.nutch.util.NutchConfiguration;
  34. /** Implements the search API over IPC connnections. */
  35. public class DistributedSearch {
  36.   public static final Log LOG = LogFactory.getLog(DistributedSearch.class);
  37.   private DistributedSearch() {}                  // no public ctor
  38.   /** The distributed search protocol. */
  39.   public static interface Protocol
  40.     extends Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks, VersionedProtocol {
  41.     /** The name of the segments searched by this node. */
  42.     String[] getSegmentNames();
  43.   }
  44.   /** The search server. */
  45.   public static class Server  {
  46.     private Server() {}
  47.     /** Runs a search server. */
  48.     public static void main(String[] args) throws Exception {
  49.       String usage = "DistributedSearch$Server <port> <index dir>";
  50.       if (args.length == 0 || args.length > 2) {
  51.         System.err.println(usage);
  52.         System.exit(-1);
  53.       }
  54.       int port = Integer.parseInt(args[0]);
  55.       Path directory = new Path(args[1]);
  56.       Configuration conf = NutchConfiguration.create();
  57.       org.apache.hadoop.ipc.Server server = getServer(conf, directory, port);
  58.       server.start();
  59.       server.join();
  60.     }
  61.     
  62.     static org.apache.hadoop.ipc.Server getServer(Configuration conf, Path directory, int port) throws IOException{
  63.       NutchBean bean = new NutchBean(conf, directory);
  64.       return RPC.getServer(bean, "0.0.0.0", port, 10, true, conf);
  65.     }
  66.   }
  67.   /** The search client. */
  68.   public static class Client extends Thread
  69.     implements Searcher, HitDetailer, HitSummarizer, HitContent, HitInlinks,
  70.                Runnable {
  71.     private InetSocketAddress[] defaultAddresses;
  72.     private boolean[] liveServer;
  73.     private HashMap segmentToAddress = new HashMap();
  74.     
  75.     private boolean running = true;
  76.     private Configuration conf;
  77.     /** Construct a client talking to servers listed in the named file.
  78.      * Each line in the file lists a server hostname and port, separated by
  79.      * whitespace. 
  80.      */
  81.     public Client(Path file, Configuration conf) throws IOException {
  82.       this(readConfig(file, conf), conf);
  83.     }
  84.     private static InetSocketAddress[] readConfig(Path path, Configuration conf)
  85.       throws IOException {
  86.       FileSystem fs = FileSystem.get(conf);
  87.       BufferedReader reader =
  88.         new BufferedReader(new InputStreamReader(fs.open(path)));
  89.       try {
  90.         ArrayList addrs = new ArrayList();
  91.         String line;
  92.         while ((line = reader.readLine()) != null) {
  93.           StringTokenizer tokens = new StringTokenizer(line);
  94.           if (tokens.hasMoreTokens()) {
  95.             String host = tokens.nextToken();
  96.             if (tokens.hasMoreTokens()) {
  97.               String port = tokens.nextToken();
  98.               addrs.add(new InetSocketAddress(host, Integer.parseInt(port)));
  99.               if (LOG.isInfoEnabled()) {
  100.                 LOG.info("Client adding server "  + host + ":" + port);
  101.               }
  102.             }
  103.           }
  104.         }
  105.         return (InetSocketAddress[])
  106.           addrs.toArray(new InetSocketAddress[addrs.size()]);
  107.       } finally {
  108.         reader.close();
  109.       }
  110.     }
  111.     /** Construct a client talking to the named servers. */
  112.     public Client(InetSocketAddress[] addresses, Configuration conf) throws IOException {
  113.       this.conf = conf;
  114.       this.defaultAddresses = addresses;
  115.       this.liveServer = new boolean[addresses.length];
  116.       updateSegments();
  117.       setDaemon(true);
  118.       start();
  119.     }
  120.     
  121.     private static final Method GET_SEGMENTS;
  122.     private static final Method SEARCH;
  123.     private static final Method DETAILS;
  124.     private static final Method SUMMARY;
  125.     static {
  126.       try {
  127.         GET_SEGMENTS = Protocol.class.getMethod
  128.           ("getSegmentNames", new Class[] {});
  129.         SEARCH = Protocol.class.getMethod
  130.           ("search", new Class[] { Query.class, Integer.TYPE, String.class,
  131.                                    String.class, Boolean.TYPE});
  132.         DETAILS = Protocol.class.getMethod
  133.           ("getDetails", new Class[] { Hit.class});
  134.         SUMMARY = Protocol.class.getMethod
  135.           ("getSummary", new Class[] { HitDetails.class, Query.class});
  136.       } catch (NoSuchMethodException e) {
  137.         throw new RuntimeException(e);
  138.       }
  139.     }
  140.     /** Updates segment names.
  141.      * 
  142.      * @throws IOException
  143.      */
  144.     public void updateSegments() throws IOException {
  145.       
  146.       int liveServers=0;
  147.       int liveSegments=0;
  148.       
  149.       // Create new array of flags so they can all be updated at once.
  150.       boolean[] updatedLiveServer = new boolean[defaultAddresses.length];
  151.       
  152.       // build segmentToAddress map
  153.       Object[][] params = new Object[defaultAddresses.length][0];
  154.       String[][] results =
  155.         (String[][])RPC.call(GET_SEGMENTS, params, defaultAddresses, this.conf);
  156.       for (int i = 0; i < results.length; i++) {  // process results of call
  157.         InetSocketAddress addr = defaultAddresses[i];
  158.         String[] segments = results[i];
  159.         if (segments == null) {
  160.           updatedLiveServer[i] = false;
  161.           if (LOG.isWarnEnabled()) {
  162.             LOG.warn("Client: no segments from: " + addr);
  163.           }
  164.           continue;
  165.         }
  166.         for (int j = 0; j < segments.length; j++) {
  167.           if (LOG.isTraceEnabled()) {
  168.             LOG.trace("Client: segment "+segments[j]+" at "+addr);
  169.           }
  170.           segmentToAddress.put(segments[j], addr);
  171.         }
  172.         updatedLiveServer[i] = true;
  173.         liveServers++;
  174.         liveSegments+=segments.length;
  175.       }
  176.       // Now update live server flags.
  177.       this.liveServer = updatedLiveServer;
  178.       if (LOG.isInfoEnabled()) {
  179.         LOG.info("STATS: "+liveServers+" servers, "+liveSegments+" segments.");
  180.       }
  181.     }
  182.     /** Return the names of segments searched. */
  183.     public String[] getSegmentNames() {
  184.       return (String[])
  185.         segmentToAddress.keySet().toArray(new String[segmentToAddress.size()]);
  186.     }
  187.     public Hits search(final Query query, final int numHits,
  188.                        final String dedupField, final String sortField,
  189.                        final boolean reverse) throws IOException {
  190.       // Get the list of live servers.  It would be nice to build this
  191.       // list in updateSegments(), but that would create concurrency issues.
  192.       // We grab a local reference to the live server flags in case it
  193.       // is updated while we are building our list of liveAddresses.
  194.       boolean[] savedLiveServer = this.liveServer;
  195.       int numLive = 0;
  196.       for (int i = 0; i < savedLiveServer.length; i++) {
  197.         if (savedLiveServer[i])
  198.           numLive++;
  199.       }
  200.       InetSocketAddress[] liveAddresses = new InetSocketAddress[numLive];
  201.       int[] liveIndexNos = new int[numLive];
  202.       int k = 0;
  203.       for (int i = 0; i < savedLiveServer.length; i++) {
  204.         if (savedLiveServer[i]) {
  205.           liveAddresses[k] = defaultAddresses[i];
  206.           liveIndexNos[k] = i;
  207.           k++;
  208.         }
  209.       }
  210.       Object[][] params = new Object[liveAddresses.length][5];
  211.       for (int i = 0; i < params.length; i++) {
  212.         params[i][0] = query;
  213.         params[i][1] = new Integer(numHits);
  214.         params[i][2] = dedupField;
  215.         params[i][3] = sortField;
  216.         params[i][4] = Boolean.valueOf(reverse);
  217.       }
  218.       Hits[] results = (Hits[])RPC.call(SEARCH, params, liveAddresses, this.conf);
  219.       TreeSet queue;                              // cull top hits from results
  220.       if (sortField == null || reverse) {
  221.         queue = new TreeSet(new Comparator() {
  222.             public int compare(Object o1, Object o2) {
  223.               return ((Comparable)o2).compareTo(o1); // reverse natural order
  224.             }
  225.           });
  226.       } else {
  227.         queue = new TreeSet();
  228.       }
  229.       
  230.       long totalHits = 0;
  231.       Comparable maxValue = null;
  232.       for (int i = 0; i < results.length; i++) {
  233.         Hits hits = results[i];
  234.         if (hits == null) continue;
  235.         totalHits += hits.getTotal();
  236.         for (int j = 0; j < hits.getLength(); j++) {
  237.           Hit h = hits.getHit(j);
  238.           if (maxValue == null ||
  239.               ((reverse || sortField == null)
  240.                ? h.getSortValue().compareTo(maxValue) >= 0
  241.                : h.getSortValue().compareTo(maxValue) <= 0)) {
  242.             queue.add(new Hit(liveIndexNos[i], h.getIndexDocNo(),
  243.                               h.getSortValue(), h.getDedupValue()));
  244.             if (queue.size() > numHits) {         // if hit queue overfull
  245.               queue.remove(queue.last());         // remove lowest in hit queue
  246.               maxValue = ((Hit)queue.last()).getSortValue(); // reset maxValue
  247.             }
  248.           }
  249.         }
  250.       }
  251.       return new Hits(totalHits, (Hit[])queue.toArray(new Hit[queue.size()]));
  252.     }
  253.     
  254.     // version for hadoop-0.5.0.jar
  255.     public static final long versionID = 1L;
  256.     
  257.     private Protocol getRemote(Hit hit) throws IOException {
  258.       return (Protocol)
  259.         RPC.getProxy(Protocol.class, versionID, defaultAddresses[hit.getIndexNo()], conf);
  260.     }
  261.     private Protocol getRemote(HitDetails hit) throws IOException {
  262.       InetSocketAddress address =
  263.         (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
  264.       return (Protocol)RPC.getProxy(Protocol.class, versionID, address, conf);
  265.     }
  266.     public String getExplanation(Query query, Hit hit) throws IOException {
  267.       return getRemote(hit).getExplanation(query, hit);
  268.     }
  269.     
  270.     public HitDetails getDetails(Hit hit) throws IOException {
  271.       return getRemote(hit).getDetails(hit);
  272.     }
  273.     
  274.     public HitDetails[] getDetails(Hit[] hits) throws IOException {
  275.       InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
  276.       Object[][] params = new Object[hits.length][1];
  277.       for (int i = 0; i < hits.length; i++) {
  278.         addrs[i] = defaultAddresses[hits[i].getIndexNo()];
  279.         params[i][0] = hits[i];
  280.       }
  281.       return (HitDetails[])RPC.call(DETAILS, params, addrs, conf);
  282.     }
  283.     public Summary getSummary(HitDetails hit, Query query) throws IOException {
  284.       return getRemote(hit).getSummary(hit, query);
  285.     }
  286.     public Summary[] getSummary(HitDetails[] hits, Query query)
  287.       throws IOException {
  288.       InetSocketAddress[] addrs = new InetSocketAddress[hits.length];
  289.       Object[][] params = new Object[hits.length][2];
  290.       for (int i = 0; i < hits.length; i++) {
  291.         HitDetails hit = hits[i];
  292.         addrs[i] =
  293.           (InetSocketAddress)segmentToAddress.get(hit.getValue("segment"));
  294.         params[i][0] = hit;
  295.         params[i][1] = query;
  296.       }
  297.       return (Summary[])RPC.call(SUMMARY, params, addrs, conf);
  298.     }
  299.     
  300.     public byte[] getContent(HitDetails hit) throws IOException {
  301.       return getRemote(hit).getContent(hit);
  302.     }
  303.     
  304.     public ParseData getParseData(HitDetails hit) throws IOException {
  305.       return getRemote(hit).getParseData(hit);
  306.     }
  307.       
  308.     public ParseText getParseText(HitDetails hit) throws IOException {
  309.       return getRemote(hit).getParseText(hit);
  310.     }
  311.       
  312.     public String[] getAnchors(HitDetails hit) throws IOException {
  313.       return getRemote(hit).getAnchors(hit);
  314.     }
  315.     public Inlinks getInlinks(HitDetails hit) throws IOException {
  316.       return getRemote(hit).getInlinks(hit);
  317.     }
  318.     public long getFetchDate(HitDetails hit) throws IOException {
  319.       return getRemote(hit).getFetchDate(hit);
  320.     }
  321.       
  322.     public static void main(String[] args) throws Exception {
  323.       String usage = "DistributedSearch$Client query <host> <port> ...";
  324.       if (args.length == 0) {
  325.         System.err.println(usage);
  326.         System.exit(-1);
  327.       }
  328.       Query query = Query.parse(args[0], NutchConfiguration.create());
  329.       
  330.       InetSocketAddress[] addresses = new InetSocketAddress[(args.length-1)/2];
  331.       for (int i = 0; i < (args.length-1)/2; i++) {
  332.         addresses[i] =
  333.           new InetSocketAddress(args[i*2+1], Integer.parseInt(args[i*2+2]));
  334.       }
  335.       Client client = new Client(addresses, NutchConfiguration.create());
  336.       //client.setTimeout(Integer.MAX_VALUE);
  337.       Hits hits = client.search(query, 10, null, null, false);
  338.       System.out.println("Total hits: " + hits.getTotal());
  339.       for (int i = 0; i < hits.getLength(); i++) {
  340.         System.out.println(" "+i+" "+ client.getDetails(hits.getHit(i)));
  341.       }
  342.     }
  343.     public void run() {
  344.       while (running){
  345.         try{
  346.           Thread.sleep(10000);
  347.         } catch (InterruptedException ie){
  348.           if (LOG.isInfoEnabled()) {
  349.             LOG.info("Thread sleep interrupted.");
  350.           }
  351.         }
  352.         try{
  353.           if (LOG.isInfoEnabled()) {
  354.             LOG.info("Querying segments from search servers...");
  355.           }
  356.           updateSegments();
  357.         } catch (IOException ioe) {
  358.           if (LOG.isWarnEnabled()) { LOG.warn("No search servers available!"); }
  359.           liveServer = new boolean[defaultAddresses.length];
  360.         }
  361.       }
  362.     }
  363.     
  364.     /**
  365.      * Stops the watchdog thread.
  366.      */
  367.     public void close() {
  368.       running = false;
  369.       interrupt();
  370.     }
  371.   }
  372. }