资源说明:Python Index - Full text search search engine built on top of Python/MPI. DEPRECATED. Use PomeGranate instead.
# NOTE PLEASE READ IT HERE * All the code present here is actually have been moved and integrated inside [PomeGranate](http://github.com/nopper/PomeGranate) # Pydex - A simple case study of inverted index creation ## What is an Inverted Index? > In computer science, an inverted index (also referred to as postings > file or inverted file) is an index data structure storing a mapping from > content, such as words or numbers, to its locations in a database file, > or in a document or a set of documents. The purpose of an inverted index > is to allow fast full text searches, at a cost of increased processing > when a document is added to the database > -- [Wikipedia][1] [1]:http://en.wikipedia.org/wiki/Inverted_index ## Requirements In order to have a proper working copy you need *at least* [Python 2.7](http://) and [mpi4py](http://) package compiled from the source. ## Requirements on Ubuntu You need to install the following packages on ubuntu in order to run our solution: $ sudo apt-get install mpich2 libcr-dev g++ python-dev ## Installing requirements For the tests we have used MPICHv2, but note that any implementation of MPI should be fine to get pydex working. So you choose your favourite implementation from the package management system of your distribution. Regarding the python requirements I really suggest to use a virtual environment for installing all the dependences. Therefore we strongly recommend you to use virtualenv. You can successively use the `requirements.txt` file with pip to install all the missing packages. ## How to run? We have used pydex to run a reverse index creation on top of a wikipedia dump. In order to successfully reiterate our experiment you have to download the latest dump of wikipedia database from [this page](http://dumps.wikimedia.org/enwiki/latest/). The file you have to download is called `enwiki-latest-pages-articles.xml.bz2` and its size is about 7.0GB. After you download it, you have to pass it through a preliminary phase by using `wiki-extractor` script which resides in `src/utils` directory. What you have to do is to specify an output directory and the size in bytes of the partitions you want to create: $ bzcat enwiki-latest-pages-articles.xml.bz2 | \ python src/utils/wiki-extractor.py /mnt/root/collection 66060288 This command will extract on the fly the dump file of wikipedia and pipe the contents to our script. The script is in charge of filtering out and in converting articles contents and to recompress them in tgz archives of about 64MB each. This preliminary phase of conversion is needed to feed our indexer. The extraction and recompression takes about 3 hours and half. After having completed this step you are ready to launch the real indexing phase. Take a look at the configuration file `pydex.json` in order to specify input/output directories and other settings. Then you can spawn your command by simply running: $ mpiexec -np 6 python src/main.py The default configuration has 2 mappers, 2 reducers. You have to take in consideration that 1 master and 1 combiner node will be needed. That is the reason why you have to specify 6 as number of processing elements. Take also in consideration that both directories you specify in the configuration file should be accessible by the remote Python MPI interpreters. Therefore a mechanism like NFS or a distributed/network filesystem should be used. This is not required if you are testing pydex on a single machine. ## How it works? The construction of the reverse index based on the tf-idf model is really straightforward. It consist in three phases of map-reduce-combine optimized for massively parallel architecture like a farm of cluster. The code should therefore be run on top of a distributed filesystem like kosmosfs, kobold of tahoe-fs in order to further optimize the distribution of the workers. Now we briefly describe the work assigned to each subcomponent: - **MAP**: The *map* phase simply consist in reading a portion of file and producing a tuple (key, value) that should be passed to the reducer. - **REDUCER**: The *reducer* is in charge of collecting multiple (key, value) tuples from a huge number of mappers. The aim is to produce a single value (key, sum-of-values) whenever multiple tuples contains the same key. These partial results are further forwarded to the *combiner* - **COMBINER**: The *combiner* goal is to just sort results coming from reducers and to collate in various output files. Now we further explain the goal of each phase in order to derive the correct tf-idf reverse index. ### Inputs Assumptions: 3 mappers, 1 reducer, 1 combiner Document #1: foo python hello world foo Document #2: python world Document #3: hello world bar bar ### First phase Mapper #1: (, 1), ( , 1), ( , 1), ( , 1), ( , 1) Mapper #2: ( , 1), ( , 1) Mapper #3: ( , 1), ( , 1), ( , 1), ( , 1) Assuming 2 by 2 round robin: Reducer: ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 2), ( , 1) After sorting (by the way this is done in place) and assuming the heap is of static size of 5 tuples max we have: Chunk #1: ( , 1), ( , 1), ( , 1), ( , 1), ( , 1) Chunk #2: ( , 2), ( , 1), ( , 1), ( , 1), ( , 1) Combiner: ( , 2), ( , 2), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1), ( , 1) ### Second phase In the second phase the final goal is to have the word-count per document. In this case each mapper executes: {[word, docId] => wordCount} -> {docId => [word, wordCount]} The reducer on the other hand: {docId => [[word-1, wordCount], [word-2, wordCount], ...]} => {[word, docId] => [wordCount, wordsPerDoc]} In the specific example we have: Mapper #1: (<#3>, bar, 2), (<#1>, foo, 2), (<#1>, hello, 1) Mapper #2: (<#3>, hello, 1), (<#1>, python, 1), (<#2>, python, 1) Mapper #3: (<#1>, world, 1), (<#2>, world, 1), (<#3>, world, 1) Reducer: (<#3>, bar, 2, 0), (<#1>, foo, 2, 0), (<#3>, hello, 1, 0), (<#1>, python, 1, 0), (<#1>, world, 1, 0), (<#2>, world, 1, 0), (<#1>, hello, 1, 0), (<#2>, python, 1, 0), (<#3>, world, 1, 0) After sorting (by the way this is done in place) and assuming the heap is of static size of 5 tuples max we have: Chunk #1: (<#1>, foo, 2, 4), (<#1>, python, 1, 4), (<#1>, world, 1, 4), (<#3>, hello, 1, 3), (<#3>, bar, 2, 3) Chunk #2: (<#1>, hello, 1, 1), (<#2>, python, 1, 2), (<#2>, world, 1, 2), (<#3>, world, 1, 1) The combiner at this point will keep up a counter and stop writing to disk until the document id of the iterator is different from the previous one. The output at this point will be: They might be not in order (<#1>, hello, 1, 5), (<#1>, foo, 2, 5), (<#1>, python, 1, 5), (<#1>, world, 1, 5), (<#2>, python, 1, 2), (<#2>, world, 1, 2), (<#3>, hello, 1, 4), (<#3>, bar, 2, 4), (<#3>, world, 1, 4) ### Third phase The mapper in this phase shall produce something like: {[word, docId] => [wordCount, wordsPerDoc]} => {word => [docId, wordCount, wordsPerDoc]} While the reducer: {word => [[docId-1, wordCount-1, wordsPerDoc-1], [docId-2, wordCount-2, wordsPerDoc-2], ...]} => {[word, docId] => [wordCount, wordsPerDoc, docsPerWord]} => {[word, docId] => tfidf} So in our case: Mapper #1: ( , #1, 1, 5), ( , #1, 2, 5), ( , #1, 1, 5) Mapper #2: ( , #1, 1, 5), ( , #2, 1, 2), ( , #2, 1, 2) Mapper #3: ( , #3, 1, 4), ( , #3, 2, 4), ( , #3, 1, 4) Reducer: ( , #1, 1, 5, 0), ( , #1, 2, 5, 0), ( , #1, 1, 5, 0), ( , #2, 1, 2, 0), ( , #3, 1, 4, 0) ( , #3, 2, 4, 0), ( , #1, 1, 5, 0), ( , #2, 1, 2, 0), ( , #3, 1, 4, 0) Reducer after sorting: Reducer: ( , 2, 5, 1), ( , 1, 5, 2), ( , 1, 4, 2), ( , 1, 2, 1), ( , 1, 5, 1) ( , 2, 4, 1), ( , 1, 5, 1), ( , 1, 2, 2), ( , 1, 4, 2) Combiner: ( , 2, 4, 1), ( , 2, 5, 1), ( , 1, 5, 2), ( , 1, 4, 2), ( , 1, 5, 2), // after computing ( , 1, 2, 2), ( , 1, 5, 3), // after computing ( , 1, 2, 3), ( , 1, 4, 3) Please note that the only feasible approach here is to have a multiway merge sort that outputs ( , wordCount, wordPerDoc) in sorted ascending order and a counter keeping track of the docsPerWord flushed on a secondary file buffer. ### Final phase Mapper #1: => (2 / 4) * log(3 / 1) = 0.79 => (2 / 5) * log(3 / 1) = 0.63 => (1 / 5) * log(3 / 2) = 0.12 Mapper #2: => (1 / 4) * log(3 / 2) = 0.15 => (1 / 5) * log(3 / 2) = 0.12 => (1 / 2) * log(3 / 2) = 0.29 Mapper #3: => (1 / 5) * log(3 / 3) = 0 => (1 / 2) * log(3 / 3) = 0 => (1 / 4) * log(3 / 3) = 0 Final matrix ------------+------+------+------+------+ | foo | bar | hello|python| ------------+------+------+------+------+ Document #1 | 0.63 | - | 0.12 | 0.12 | Document #2 | - | - | - | 0.29 | Document #3 | - | 0.39 | 0.15 | - | ------------+------+------+------+------+ Implementation ============== Instead of counting the number of termination messages from the mappers the reducer can simply wait the termination message from the master. This would simplify the code a lot and make it possible to introduce dynamic process feature in the code (varying at runtime the number of mappers involved).