MapFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:25k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.*;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;
  22. import org.apache.hadoop.fs.*;
  23. import org.apache.hadoop.conf.*;
  24. import org.apache.hadoop.util.Progressable;
  25. import org.apache.hadoop.util.ReflectionUtils;
  26. import org.apache.hadoop.io.SequenceFile.CompressionType;
  27. import org.apache.hadoop.io.compress.CompressionCodec;
  28. import org.apache.hadoop.io.compress.DefaultCodec;
  29. /** A file-based map from keys to values.
  30.  * 
  31.  * <p>A map is a directory containing two files, the <code>data</code> file,
  32.  * containing all keys and values in the map, and a smaller <code>index</code>
  33.  * file, containing a fraction of the keys.  The fraction is determined by
  34.  * {@link Writer#getIndexInterval()}.
  35.  *
  36.  * <p>The index file is read entirely into memory.  Thus key implementations
  37.  * should try to keep themselves small.
  38.  *
  39.  * <p>Map files are created by adding entries in-order.  To maintain a large
  40.  * database, perform updates by copying the previous version of a database and
  41.  * merging in a sorted change list, to create a new version of the database in
  42.  * a new file.  Sorting large change lists can be done with {@link
  43.  * SequenceFile.Sorter}.
  44.  */
  45. public class MapFile {
  46.   private static final Log LOG = LogFactory.getLog(MapFile.class);
  47.   /** The name of the index file. */
  48.   public static final String INDEX_FILE_NAME = "index";
  49.   /** The name of the data file. */
  50.   public static final String DATA_FILE_NAME = "data";
  51.   protected MapFile() {}                          // no public ctor
  52.   /** Writes a new map. */
  53.   public static class Writer implements java.io.Closeable {
  54.     private SequenceFile.Writer data;
  55.     private SequenceFile.Writer index;
  56.     final private static String INDEX_INTERVAL = "io.map.index.interval";
  57.     private int indexInterval = 128;
  58.     private long size;
  59.     private LongWritable position = new LongWritable();
  60.     // the following fields are used only for checking key order
  61.     private WritableComparator comparator;
  62.     private DataInputBuffer inBuf = new DataInputBuffer();
  63.     private DataOutputBuffer outBuf = new DataOutputBuffer();
  64.     private WritableComparable lastKey;
  65.     /** Create the named map for keys of the named class. */
  66.     public Writer(Configuration conf, FileSystem fs, String dirName,
  67.                   Class<? extends WritableComparable> keyClass, Class valClass)
  68.       throws IOException {
  69.       this(conf, fs, dirName,
  70.            WritableComparator.get(keyClass), valClass,
  71.            SequenceFile.getCompressionType(conf));
  72.     }
  73.     /** Create the named map for keys of the named class. */
  74.     public Writer(Configuration conf, FileSystem fs, String dirName,
  75.                   Class<? extends WritableComparable> keyClass, Class valClass,
  76.                   CompressionType compress, Progressable progress)
  77.       throws IOException {
  78.       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
  79.            compress, progress);
  80.     }
  81.     /** Create the named map for keys of the named class. */
  82.     public Writer(Configuration conf, FileSystem fs, String dirName,
  83.                   Class<? extends WritableComparable> keyClass, Class valClass,
  84.                   CompressionType compress, CompressionCodec codec,
  85.                   Progressable progress)
  86.       throws IOException {
  87.       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass,
  88.            compress, codec, progress);
  89.     }
  90.     /** Create the named map for keys of the named class. */
  91.     public Writer(Configuration conf, FileSystem fs, String dirName,
  92.                   Class<? extends WritableComparable> keyClass, Class valClass,
  93.                   CompressionType compress)
  94.       throws IOException {
  95.       this(conf, fs, dirName, WritableComparator.get(keyClass), valClass, compress);
  96.     }
  97.     /** Create the named map using the named key comparator. */
  98.     public Writer(Configuration conf, FileSystem fs, String dirName,
  99.                   WritableComparator comparator, Class valClass)
  100.       throws IOException {
  101.       this(conf, fs, dirName, comparator, valClass,
  102.            SequenceFile.getCompressionType(conf));
  103.     }
  104.     /** Create the named map using the named key comparator. */
  105.     public Writer(Configuration conf, FileSystem fs, String dirName,
  106.                   WritableComparator comparator, Class valClass,
  107.                   SequenceFile.CompressionType compress)
  108.       throws IOException {
  109.       this(conf, fs, dirName, comparator, valClass, compress, null);
  110.     }
  111.     /** Create the named map using the named key comparator. */
  112.     public Writer(Configuration conf, FileSystem fs, String dirName,
  113.                   WritableComparator comparator, Class valClass,
  114.                   SequenceFile.CompressionType compress,
  115.                   Progressable progress)
  116.       throws IOException {
  117.       this(conf, fs, dirName, comparator, valClass, 
  118.            compress, new DefaultCodec(), progress);
  119.     }
  120.     /** Create the named map using the named key comparator. */
  121.     public Writer(Configuration conf, FileSystem fs, String dirName,
  122.                   WritableComparator comparator, Class valClass,
  123.                   SequenceFile.CompressionType compress, CompressionCodec codec,
  124.                   Progressable progress)
  125.       throws IOException {
  126.       this.indexInterval = conf.getInt(INDEX_INTERVAL, this.indexInterval);
  127.       this.comparator = comparator;
  128.       this.lastKey = comparator.newKey();
  129.       Path dir = new Path(dirName);
  130.       if (!fs.mkdirs(dir)) {
  131.         throw new IOException("Mkdirs failed to create directory " + dir.toString());
  132.       }
  133.       Path dataFile = new Path(dir, DATA_FILE_NAME);
  134.       Path indexFile = new Path(dir, INDEX_FILE_NAME);
  135.       Class keyClass = comparator.getKeyClass();
  136.       this.data =
  137.         SequenceFile.createWriter
  138.         (fs, conf, dataFile, keyClass, valClass, compress, codec, progress);
  139.       this.index =
  140.         SequenceFile.createWriter
  141.         (fs, conf, indexFile, keyClass, LongWritable.class,
  142.          CompressionType.BLOCK, progress);
  143.     }
  144.     
  145.     /** The number of entries that are added before an index entry is added.*/
  146.     public int getIndexInterval() { return indexInterval; }
  147.     /** Sets the index interval.
  148.      * @see #getIndexInterval()
  149.      */
  150.     public void setIndexInterval(int interval) { indexInterval = interval; }
  151.     /** Sets the index interval and stores it in conf
  152.      * @see #getIndexInterval()
  153.      */
  154.     public static void setIndexInterval(Configuration conf, int interval) {
  155.       conf.setInt(INDEX_INTERVAL, interval);
  156.     }
  157.     /** Close the map. */
  158.     public synchronized void close() throws IOException {
  159.       data.close();
  160.       index.close();
  161.     }
  162.     /** Append a key/value pair to the map.  The key must be greater or equal
  163.      * to the previous key added to the map. */
  164.     public synchronized void append(WritableComparable key, Writable val)
  165.       throws IOException {
  166.       checkKey(key);
  167.       
  168.       if (size % indexInterval == 0) {            // add an index entry
  169.         position.set(data.getLength());           // point to current eof
  170.         index.append(key, position);
  171.       }
  172.       data.append(key, val);                      // append key/value to data
  173.       size++;
  174.     }
  175.     private void checkKey(WritableComparable key) throws IOException {
  176.       // check that keys are well-ordered
  177.       if (size != 0 && comparator.compare(lastKey, key) > 0)
  178.         throw new IOException("key out of order: "+key+" after "+lastKey);
  179.           
  180.       // update lastKey with a copy of key by writing and reading
  181.       outBuf.reset();
  182.       key.write(outBuf);                          // write new key
  183.       inBuf.reset(outBuf.getData(), outBuf.getLength());
  184.       lastKey.readFields(inBuf);                  // read into lastKey
  185.     }
  186.   }
  187.   
  188.   /** Provide access to an existing map. */
  189.   public static class Reader implements java.io.Closeable {
  190.       
  191.     /** Number of index entries to skip between each entry.  Zero by default.
  192.      * Setting this to values larger than zero can facilitate opening large map
  193.      * files using less memory. */
  194.     private int INDEX_SKIP = 0;
  195.       
  196.     private WritableComparator comparator;
  197.     private WritableComparable nextKey;
  198.     private long seekPosition = -1;
  199.     private int seekIndex = -1;
  200.     private long firstPosition;
  201.     // the data, on disk
  202.     private SequenceFile.Reader data;
  203.     private SequenceFile.Reader index;
  204.     // whether the index Reader was closed
  205.     private boolean indexClosed = false;
  206.     // the index, in memory
  207.     private int count = -1;
  208.     private WritableComparable[] keys;
  209.     private long[] positions;
  210.     /** Returns the class of keys in this file. */
  211.     public Class<?> getKeyClass() { return data.getKeyClass(); }
  212.     /** Returns the class of values in this file. */
  213.     public Class<?> getValueClass() { return data.getValueClass(); }
  214.     /** Construct a map reader for the named map.*/
  215.     public Reader(FileSystem fs, String dirName, Configuration conf) throws IOException {
  216.       this(fs, dirName, null, conf);
  217.       INDEX_SKIP = conf.getInt("io.map.index.skip", 0);
  218.     }
  219.     /** Construct a map reader for the named map using the named comparator.*/
  220.     public Reader(FileSystem fs, String dirName, WritableComparator comparator, Configuration conf)
  221.       throws IOException {
  222.       this(fs, dirName, comparator, conf, true);
  223.     }
  224.     
  225.     /**
  226.      * Hook to allow subclasses to defer opening streams until further
  227.      * initialization is complete.
  228.      * @see #createDataFileReader(FileSystem, Path, Configuration)
  229.      */
  230.     protected Reader(FileSystem fs, String dirName,
  231.         WritableComparator comparator, Configuration conf, boolean open)
  232.       throws IOException {
  233.       
  234.       if (open) {
  235.         open(fs, dirName, comparator, conf);
  236.       }
  237.     }
  238.     
  239.     protected synchronized void open(FileSystem fs, String dirName,
  240.         WritableComparator comparator, Configuration conf) throws IOException {
  241.       Path dir = new Path(dirName);
  242.       Path dataFile = new Path(dir, DATA_FILE_NAME);
  243.       Path indexFile = new Path(dir, INDEX_FILE_NAME);
  244.       // open the data
  245.       this.data = createDataFileReader(fs, dataFile, conf);
  246.       this.firstPosition = data.getPosition();
  247.       if (comparator == null)
  248.         this.comparator = WritableComparator.get(data.getKeyClass().asSubclass(WritableComparable.class));
  249.       else
  250.         this.comparator = comparator;
  251.       // open the index
  252.       this.index = new SequenceFile.Reader(fs, indexFile, conf);
  253.     }
  254.     /**
  255.      * Override this method to specialize the type of
  256.      * {@link SequenceFile.Reader} returned.
  257.      */
  258.     protected SequenceFile.Reader createDataFileReader(FileSystem fs,
  259.         Path dataFile, Configuration conf) throws IOException {
  260.       return new SequenceFile.Reader(fs, dataFile,  conf);
  261.     }
  262.     private void readIndex() throws IOException {
  263.       // read the index entirely into memory
  264.       if (this.keys != null)
  265.         return;
  266.       this.count = 0;
  267.       this.keys = new WritableComparable[1024];
  268.       this.positions = new long[1024];
  269.       try {
  270.         int skip = INDEX_SKIP;
  271.         LongWritable position = new LongWritable();
  272.         WritableComparable lastKey = null;
  273.         while (true) {
  274.           WritableComparable k = comparator.newKey();
  275.           if (!index.next(k, position))
  276.             break;
  277.           // check order to make sure comparator is compatible
  278.           if (lastKey != null && comparator.compare(lastKey, k) > 0)
  279.             throw new IOException("key out of order: "+k+" after "+lastKey);
  280.           lastKey = k;
  281.           
  282.           if (skip > 0) {
  283.             skip--;
  284.             continue;                             // skip this entry
  285.           } else {
  286.             skip = INDEX_SKIP;                    // reset skip
  287.           }
  288.           if (count == keys.length) {                // time to grow arrays
  289.             int newLength = (keys.length*3)/2;
  290.             WritableComparable[] newKeys = new WritableComparable[newLength];
  291.             long[] newPositions = new long[newLength];
  292.             System.arraycopy(keys, 0, newKeys, 0, count);
  293.             System.arraycopy(positions, 0, newPositions, 0, count);
  294.             keys = newKeys;
  295.             positions = newPositions;
  296.           }
  297.           keys[count] = k;
  298.           positions[count] = position.get();
  299.           count++;
  300.         }
  301.       } catch (EOFException e) {
  302.         LOG.warn("Unexpected EOF reading " + index +
  303.                               " at entry #" + count + ".  Ignoring.");
  304.       } finally {
  305. indexClosed = true;
  306.         index.close();
  307.       }
  308.     }
  309.     /** Re-positions the reader before its first key. */
  310.     public synchronized void reset() throws IOException {
  311.       data.seek(firstPosition);
  312.     }
  313.     /** Get the key at approximately the middle of the file.
  314.      * 
  315.      * @throws IOException
  316.      */
  317.     public synchronized WritableComparable midKey() throws IOException {
  318.       readIndex();
  319.       int pos = ((count - 1) / 2);              // middle of the index
  320.       if (pos < 0) {
  321.         throw new IOException("MapFile empty");
  322.       }
  323.       
  324.       return keys[pos];
  325.     }
  326.     
  327.     /** Reads the final key from the file.
  328.      *
  329.      * @param key key to read into
  330.      */
  331.     public synchronized void finalKey(WritableComparable key)
  332.       throws IOException {
  333.       long originalPosition = data.getPosition(); // save position
  334.       try {
  335.         readIndex();                              // make sure index is valid
  336.         if (count > 0) {
  337.           data.seek(positions[count-1]);          // skip to last indexed entry
  338.         } else {
  339.           reset();                                // start at the beginning
  340.         }
  341.         while (data.next(key)) {}                 // scan to eof
  342.       } finally {
  343.         data.seek(originalPosition);              // restore position
  344.       }
  345.     }
  346.     /** Positions the reader at the named key, or if none such exists, at the
  347.      * first entry after the named key.  Returns true iff the named key exists
  348.      * in this map.
  349.      */
  350.     public synchronized boolean seek(WritableComparable key) throws IOException {
  351.       return seekInternal(key) == 0;
  352.     }
  353.     /** 
  354.      * Positions the reader at the named key, or if none such exists, at the
  355.      * first entry after the named key.
  356.      *
  357.      * @return  0   - exact match found
  358.      *          < 0 - positioned at next record
  359.      *          1   - no more records in file
  360.      */
  361.     private synchronized int seekInternal(WritableComparable key)
  362.       throws IOException {
  363.       return seekInternal(key, false);
  364.     }
  365.     /** 
  366.      * Positions the reader at the named key, or if none such exists, at the
  367.      * key that falls just before or just after dependent on how the
  368.      * <code>before</code> parameter is set.
  369.      * 
  370.      * @param before - IF true, and <code>key</code> does not exist, position
  371.      * file at entry that falls just before <code>key</code>.  Otherwise,
  372.      * position file at record that sorts just after.
  373.      * @return  0   - exact match found
  374.      *          < 0 - positioned at next record
  375.      *          1   - no more records in file
  376.      */
  377.     private synchronized int seekInternal(WritableComparable key,
  378.         final boolean before)
  379.       throws IOException {
  380.       readIndex();                                // make sure index is read
  381.       if (seekIndex != -1                         // seeked before
  382.           && seekIndex+1 < count           
  383.           && comparator.compare(key, keys[seekIndex+1])<0 // before next indexed
  384.           && comparator.compare(key, nextKey)
  385.           >= 0) {                                 // but after last seeked
  386.         // do nothing
  387.       } else {
  388.         seekIndex = binarySearch(key);
  389.         if (seekIndex < 0)                        // decode insertion point
  390.           seekIndex = -seekIndex-2;
  391.         if (seekIndex == -1)                      // belongs before first entry
  392.           seekPosition = firstPosition;           // use beginning of file
  393.         else
  394.           seekPosition = positions[seekIndex];    // else use index
  395.       }
  396.       data.seek(seekPosition);
  397.       
  398.       if (nextKey == null)
  399.         nextKey = comparator.newKey();
  400.      
  401.       // If we're looking for the key before, we need to keep track
  402.       // of the position we got the current key as well as the position
  403.       // of the key before it.
  404.       long prevPosition = -1;
  405.       long curPosition = seekPosition;
  406.       while (data.next(nextKey)) {
  407.         int c = comparator.compare(key, nextKey);
  408.         if (c <= 0) {                             // at or beyond desired
  409.           if (before && c != 0) {
  410.             if (prevPosition == -1) {
  411.               // We're on the first record of this index block
  412.               // and we've already passed the search key. Therefore
  413.               // we must be at the beginning of the file, so seek
  414.               // to the beginning of this block and return c
  415.               data.seek(curPosition);
  416.             } else {
  417.               // We have a previous record to back up to
  418.               data.seek(prevPosition);
  419.               data.next(nextKey);
  420.               // now that we've rewound, the search key must be greater than this key
  421.               return 1;
  422.             }
  423.           }
  424.           return c;
  425.         }
  426.         if (before) {
  427.           prevPosition = curPosition;
  428.           curPosition = data.getPosition();
  429.         }
  430.       }
  431.       return 1;
  432.     }
  433.     private int binarySearch(WritableComparable key) {
  434.       int low = 0;
  435.       int high = count-1;
  436.       while (low <= high) {
  437.         int mid = (low + high) >>> 1;
  438.         WritableComparable midVal = keys[mid];
  439.         int cmp = comparator.compare(midVal, key);
  440.         if (cmp < 0)
  441.           low = mid + 1;
  442.         else if (cmp > 0)
  443.           high = mid - 1;
  444.         else
  445.           return mid;                             // key found
  446.       }
  447.       return -(low + 1);                          // key not found.
  448.     }
  449.     /** Read the next key/value pair in the map into <code>key</code> and
  450.      * <code>val</code>.  Returns true if such a pair exists and false when at
  451.      * the end of the map */
  452.     public synchronized boolean next(WritableComparable key, Writable val)
  453.       throws IOException {
  454.       return data.next(key, val);
  455.     }
  456.     /** Return the value for the named key, or null if none exists. */
  457.     public synchronized Writable get(WritableComparable key, Writable val)
  458.       throws IOException {
  459.       if (seek(key)) {
  460.         data.getCurrentValue(val);
  461.         return val;
  462.       } else
  463.         return null;
  464.     }
  465.     /** 
  466.      * Finds the record that is the closest match to the specified key.
  467.      * Returns <code>key</code> or if it does not exist, at the first entry
  468.      * after the named key.
  469.      * 
  470. -     * @param key       - key that we're trying to find
  471. -     * @param val       - data value if key is found
  472. -     * @return          - the key that was the closest match or null if eof.
  473.      */
  474.     public synchronized WritableComparable getClosest(WritableComparable key,
  475.       Writable val)
  476.     throws IOException {
  477.       return getClosest(key, val, false);
  478.     }
  479.     /** 
  480.      * Finds the record that is the closest match to the specified key.
  481.      * 
  482.      * @param key       - key that we're trying to find
  483.      * @param val       - data value if key is found
  484.      * @param before    - IF true, and <code>key</code> does not exist, return
  485.      * the first entry that falls just before the <code>key</code>.  Otherwise,
  486.      * return the record that sorts just after.
  487.      * @return          - the key that was the closest match or null if eof.
  488.      */
  489.     public synchronized WritableComparable getClosest(WritableComparable key,
  490.         Writable val, final boolean before)
  491.       throws IOException {
  492.      
  493.       int c = seekInternal(key, before);
  494.       // If we didn't get an exact match, and we ended up in the wrong
  495.       // direction relative to the query key, return null since we
  496.       // must be at the beginning or end of the file.
  497.       if ((!before && c > 0) ||
  498.           (before && c < 0)) {
  499.         return null;
  500.       }
  501.       data.getCurrentValue(val);
  502.       return nextKey;
  503.     }
  504.     /** Close the map. */
  505.     public synchronized void close() throws IOException {
  506.       if (!indexClosed) {
  507.         index.close();
  508.       }
  509.       data.close();
  510.     }
  511.   }
  512.   /** Renames an existing map directory. */
  513.   public static void rename(FileSystem fs, String oldName, String newName)
  514.     throws IOException {
  515.     Path oldDir = new Path(oldName);
  516.     Path newDir = new Path(newName);
  517.     if (!fs.rename(oldDir, newDir)) {
  518.       throw new IOException("Could not rename " + oldDir + " to " + newDir);
  519.     }
  520.   }
  521.   /** Deletes the named map file. */
  522.   public static void delete(FileSystem fs, String name) throws IOException {
  523.     Path dir = new Path(name);
  524.     Path data = new Path(dir, DATA_FILE_NAME);
  525.     Path index = new Path(dir, INDEX_FILE_NAME);
  526.     fs.delete(data, true);
  527.     fs.delete(index, true);
  528.     fs.delete(dir, true);
  529.   }
  530.   /**
  531.    * This method attempts to fix a corrupt MapFile by re-creating its index.
  532.    * @param fs filesystem
  533.    * @param dir directory containing the MapFile data and index
  534.    * @param keyClass key class (has to be a subclass of Writable)
  535.    * @param valueClass value class (has to be a subclass of Writable)
  536.    * @param dryrun do not perform any changes, just report what needs to be done
  537.    * @return number of valid entries in this MapFile, or -1 if no fixing was needed
  538.    * @throws Exception
  539.    */
  540.   public static long fix(FileSystem fs, Path dir,
  541.                          Class<? extends Writable> keyClass,
  542.                          Class<? extends Writable> valueClass, boolean dryrun,
  543.                          Configuration conf) throws Exception {
  544.     String dr = (dryrun ? "[DRY RUN ] " : "");
  545.     Path data = new Path(dir, DATA_FILE_NAME);
  546.     Path index = new Path(dir, INDEX_FILE_NAME);
  547.     int indexInterval = 128;
  548.     if (!fs.exists(data)) {
  549.       // there's nothing we can do to fix this!
  550.       throw new Exception(dr + "Missing data file in " + dir + ", impossible to fix this.");
  551.     }
  552.     if (fs.exists(index)) {
  553.       // no fixing needed
  554.       return -1;
  555.     }
  556.     SequenceFile.Reader dataReader = new SequenceFile.Reader(fs, data, conf);
  557.     if (!dataReader.getKeyClass().equals(keyClass)) {
  558.       throw new Exception(dr + "Wrong key class in " + dir + ", expected" + keyClass.getName() +
  559.                           ", got " + dataReader.getKeyClass().getName());
  560.     }
  561.     if (!dataReader.getValueClass().equals(valueClass)) {
  562.       throw new Exception(dr + "Wrong value class in " + dir + ", expected" + valueClass.getName() +
  563.                           ", got " + dataReader.getValueClass().getName());
  564.     }
  565.     long cnt = 0L;
  566.     Writable key = ReflectionUtils.newInstance(keyClass, conf);
  567.     Writable value = ReflectionUtils.newInstance(valueClass, conf);
  568.     SequenceFile.Writer indexWriter = null;
  569.     if (!dryrun) indexWriter = SequenceFile.createWriter(fs, conf, index, keyClass, LongWritable.class);
  570.     try {
  571.       long pos = 0L;
  572.       LongWritable position = new LongWritable();
  573.       while(dataReader.next(key, value)) {
  574.         cnt++;
  575.         if (cnt % indexInterval == 0) {
  576.           position.set(pos);
  577.           if (!dryrun) indexWriter.append(key, position);
  578.         }
  579.         pos = dataReader.getPosition();
  580.       }
  581.     } catch(Throwable t) {
  582.       // truncated data file. swallow it.
  583.     }
  584.     dataReader.close();
  585.     if (!dryrun) indexWriter.close();
  586.     return cnt;
  587.   }
  588.   public static void main(String[] args) throws Exception {
  589.     String usage = "Usage: MapFile inFile outFile";
  590.       
  591.     if (args.length != 2) {
  592.       System.err.println(usage);
  593.       System.exit(-1);
  594.     }
  595.       
  596.     String in = args[0];
  597.     String out = args[1];
  598.     Configuration conf = new Configuration();
  599.     FileSystem fs = FileSystem.getLocal(conf);
  600.     MapFile.Reader reader = new MapFile.Reader(fs, in, conf);
  601.     MapFile.Writer writer =
  602.       new MapFile.Writer(conf, fs, out,
  603.           reader.getKeyClass().asSubclass(WritableComparable.class),
  604.           reader.getValueClass());
  605.     WritableComparable key =
  606.       ReflectionUtils.newInstance(reader.getKeyClass().asSubclass(WritableComparable.class), conf);
  607.     Writable value =
  608.       ReflectionUtils.newInstance(reader.getValueClass().asSubclass(Writable.class), conf);
  609.     while (reader.next(key, value))               // copy all entries
  610.       writer.append(key, value);
  611.     writer.close();
  612.   }
  613. }