SequenceFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:112k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.*;
  20. import java.util.*;
  21. import java.rmi.server.UID;
  22. import java.security.MessageDigest;
  23. import org.apache.commons.logging.*;
  24. import org.apache.hadoop.fs.*;
  25. import org.apache.hadoop.io.compress.CodecPool;
  26. import org.apache.hadoop.io.compress.CompressionCodec;
  27. import org.apache.hadoop.io.compress.CompressionInputStream;
  28. import org.apache.hadoop.io.compress.CompressionOutputStream;
  29. import org.apache.hadoop.io.compress.Compressor;
  30. import org.apache.hadoop.io.compress.Decompressor;
  31. import org.apache.hadoop.io.compress.DefaultCodec;
  32. import org.apache.hadoop.io.compress.GzipCodec;
  33. import org.apache.hadoop.io.compress.zlib.ZlibFactory;
  34. import org.apache.hadoop.io.serializer.Deserializer;
  35. import org.apache.hadoop.io.serializer.SerializationFactory;
  36. import org.apache.hadoop.io.serializer.Serializer;
  37. import org.apache.hadoop.conf.*;
  38. import org.apache.hadoop.util.Progressable;
  39. import org.apache.hadoop.util.Progress;
  40. import org.apache.hadoop.util.ReflectionUtils;
  41. import org.apache.hadoop.util.NativeCodeLoader;
  42. import org.apache.hadoop.util.MergeSort;
  43. import org.apache.hadoop.util.PriorityQueue;
  44. /** 
  45.  * <code>SequenceFile</code>s are flat files consisting of binary key/value 
  46.  * pairs.
  47.  * 
  48.  * <p><code>SequenceFile</code> provides {@link Writer}, {@link Reader} and
  49.  * {@link Sorter} classes for writing, reading and sorting respectively.</p>
  50.  * 
  51.  * There are three <code>SequenceFile</code> <code>Writer</code>s based on the 
  52.  * {@link CompressionType} used to compress key/value pairs:
  53.  * <ol>
  54.  *   <li>
  55.  *   <code>Writer</code> : Uncompressed records.
  56.  *   </li>
  57.  *   <li>
  58.  *   <code>RecordCompressWriter</code> : Record-compressed files, only compress 
  59.  *                                       values.
  60.  *   </li>
  61.  *   <li>
  62.  *   <code>BlockCompressWriter</code> : Block-compressed files, both keys & 
  63.  *                                      values are collected in 'blocks' 
  64.  *                                      separately and compressed. The size of 
  65.  *                                      the 'block' is configurable.
  66.  * </ol>
  67.  * 
  68.  * <p>The actual compression algorithm used to compress key and/or values can be
  69.  * specified by using the appropriate {@link CompressionCodec}.</p>
  70.  * 
  71.  * <p>The recommended way is to use the static <tt>createWriter</tt> methods
  72.  * provided by the <code>SequenceFile</code> to chose the preferred format.</p>
  73.  *
  74.  * <p>The {@link Reader} acts as the bridge and can read any of the above 
  75.  * <code>SequenceFile</code> formats.</p>
  76.  *
  77.  * <h4 id="Formats">SequenceFile Formats</h4>
  78.  * 
  79.  * <p>Essentially there are 3 different formats for <code>SequenceFile</code>s
  80.  * depending on the <code>CompressionType</code> specified. All of them share a
  81.  * <a href="#Header">common header</a> described below.
  82.  * 
  83.  * <h5 id="Header">SequenceFile Header</h5>
  84.  * <ul>
  85.  *   <li>
  86.  *   version - 3 bytes of magic header <b>SEQ</b>, followed by 1 byte of actual 
  87.  *             version number (e.g. SEQ4 or SEQ6)
  88.  *   </li>
  89.  *   <li>
  90.  *   keyClassName -key class
  91.  *   </li>
  92.  *   <li>
  93.  *   valueClassName - value class
  94.  *   </li>
  95.  *   <li>
  96.  *   compression - A boolean which specifies if compression is turned on for 
  97.  *                 keys/values in this file.
  98.  *   </li>
  99.  *   <li>
  100.  *   blockCompression - A boolean which specifies if block-compression is 
  101.  *                      turned on for keys/values in this file.
  102.  *   </li>
  103.  *   <li>
  104.  *   compression codec - <code>CompressionCodec</code> class which is used for  
  105.  *                       compression of keys and/or values (if compression is 
  106.  *                       enabled).
  107.  *   </li>
  108.  *   <li>
  109.  *   metadata - {@link Metadata} for this file.
  110.  *   </li>
  111.  *   <li>
  112.  *   sync - A sync marker to denote end of the header.
  113.  *   </li>
  114.  * </ul>
  115.  * 
  116.  * <h5 id="#UncompressedFormat">Uncompressed SequenceFile Format</h5>
  117.  * <ul>
  118.  * <li>
  119.  * <a href="#Header">Header</a>
  120.  * </li>
  121.  * <li>
  122.  * Record
  123.  *   <ul>
  124.  *     <li>Record length</li>
  125.  *     <li>Key length</li>
  126.  *     <li>Key</li>
  127.  *     <li>Value</li>
  128.  *   </ul>
  129.  * </li>
  130.  * <li>
  131.  * A sync-marker every few <code>100</code> bytes or so.
  132.  * </li>
  133.  * </ul>
  134.  *
  135.  * <h5 id="#RecordCompressedFormat">Record-Compressed SequenceFile Format</h5>
  136.  * <ul>
  137.  * <li>
  138.  * <a href="#Header">Header</a>
  139.  * </li>
  140.  * <li>
  141.  * Record
  142.  *   <ul>
  143.  *     <li>Record length</li>
  144.  *     <li>Key length</li>
  145.  *     <li>Key</li>
  146.  *     <li><i>Compressed</i> Value</li>
  147.  *   </ul>
  148.  * </li>
  149.  * <li>
  150.  * A sync-marker every few <code>100</code> bytes or so.
  151.  * </li>
  152.  * </ul>
  153.  * 
  154.  * <h5 id="#BlockCompressedFormat">Block-Compressed SequenceFile Format</h5>
  155.  * <ul>
  156.  * <li>
  157.  * <a href="#Header">Header</a>
  158.  * </li>
  159.  * <li>
  160.  * Record <i>Block</i>
  161.  *   <ul>
  162.  *     <li>Compressed key-lengths block-size</li>
  163.  *     <li>Compressed key-lengths block</li>
  164.  *     <li>Compressed keys block-size</li>
  165.  *     <li>Compressed keys block</li>
  166.  *     <li>Compressed value-lengths block-size</li>
  167.  *     <li>Compressed value-lengths block</li>
  168.  *     <li>Compressed values block-size</li>
  169.  *     <li>Compressed values block</li>
  170.  *   </ul>
  171.  * </li>
  172.  * <li>
  173.  * A sync-marker every few <code>100</code> bytes or so.
  174.  * </li>
  175.  * </ul>
  176.  * 
  177.  * <p>The compressed blocks of key lengths and value lengths consist of the 
  178.  * actual lengths of individual keys/values encoded in ZeroCompressedInteger 
  179.  * format.</p>
  180.  * 
  181.  * @see CompressionCodec
  182.  */
  183. public class SequenceFile {
  184.   private static final Log LOG = LogFactory.getLog(SequenceFile.class);
  185.   private SequenceFile() {}                         // no public ctor
  186.   private static final byte BLOCK_COMPRESS_VERSION = (byte)4;
  187.   private static final byte CUSTOM_COMPRESS_VERSION = (byte)5;
  188.   private static final byte VERSION_WITH_METADATA = (byte)6;
  189.   private static byte[] VERSION = new byte[] {
  190.     (byte)'S', (byte)'E', (byte)'Q', VERSION_WITH_METADATA
  191.   };
  192.   private static final int SYNC_ESCAPE = -1;      // "length" of sync entries
  193.   private static final int SYNC_HASH_SIZE = 16;   // number of bytes in hash 
  194.   private static final int SYNC_SIZE = 4+SYNC_HASH_SIZE; // escape + hash
  195.   /** The number of bytes between sync points.*/
  196.   public static final int SYNC_INTERVAL = 100*SYNC_SIZE; 
  197.   /** 
  198.    * The compression type used to compress key/value pairs in the 
  199.    * {@link SequenceFile}.
  200.    * 
  201.    * @see SequenceFile.Writer
  202.    */
  203.   public static enum CompressionType {
  204.     /** Do not compress records. */
  205.     NONE, 
  206.     /** Compress values only, each separately. */
  207.     RECORD,
  208.     /** Compress sequences of records together in blocks. */
  209.     BLOCK
  210.   }
  211.   /**
  212.    * Get the compression type for the reduce outputs
  213.    * @param job the job config to look in
  214.    * @return the kind of compression to use
  215.    * @deprecated Use 
  216.    *             {@link org.apache.hadoop.mapred.SequenceFileOutputFormat#getOutputCompressionType(org.apache.hadoop.mapred.JobConf)} 
  217.    *             to get {@link CompressionType} for job-outputs.
  218.    */
  219.   @Deprecated
  220.   static public CompressionType getCompressionType(Configuration job) {
  221.     String name = job.get("io.seqfile.compression.type");
  222.     return name == null ? CompressionType.RECORD : 
  223.       CompressionType.valueOf(name);
  224.   }
  225.   
  226.   /**
  227.    * Set the compression type for sequence files.
  228.    * @param job the configuration to modify
  229.    * @param val the new compression type (none, block, record)
  230.    * @deprecated Use the one of the many SequenceFile.createWriter methods to specify
  231.    *             the {@link CompressionType} while creating the {@link SequenceFile} or
  232.    *             {@link org.apache.hadoop.mapred.SequenceFileOutputFormat#setOutputCompressionType(org.apache.hadoop.mapred.JobConf, org.apache.hadoop.io.SequenceFile.CompressionType)}
  233.    *             to specify the {@link CompressionType} for job-outputs. 
  234.    * or 
  235.    */
  236.   @Deprecated
  237.   static public void setCompressionType(Configuration job, 
  238.                                         CompressionType val) {
  239.     job.set("io.seqfile.compression.type", val.toString());
  240.   }
  241.     
  242.   /**
  243.    * Construct the preferred type of SequenceFile Writer.
  244.    * @param fs The configured filesystem. 
  245.    * @param conf The configuration.
  246.    * @param name The name of the file. 
  247.    * @param keyClass The 'key' type.
  248.    * @param valClass The 'value' type.
  249.    * @return Returns the handle to the constructed SequenceFile Writer.
  250.    * @throws IOException
  251.    */
  252.   public static Writer 
  253.     createWriter(FileSystem fs, Configuration conf, Path name, 
  254.                  Class keyClass, Class valClass) 
  255.     throws IOException {
  256.     return createWriter(fs, conf, name, keyClass, valClass,
  257.                         getCompressionType(conf));
  258.   }
  259.   
  260.   /**
  261.    * Construct the preferred type of SequenceFile Writer.
  262.    * @param fs The configured filesystem. 
  263.    * @param conf The configuration.
  264.    * @param name The name of the file. 
  265.    * @param keyClass The 'key' type.
  266.    * @param valClass The 'value' type.
  267.    * @param compressionType The compression type.
  268.    * @return Returns the handle to the constructed SequenceFile Writer.
  269.    * @throws IOException
  270.    */
  271.   public static Writer 
  272.     createWriter(FileSystem fs, Configuration conf, Path name, 
  273.                  Class keyClass, Class valClass, CompressionType compressionType) 
  274.     throws IOException {
  275.     return createWriter(fs, conf, name, keyClass, valClass,
  276.             fs.getConf().getInt("io.file.buffer.size", 4096),
  277.             fs.getDefaultReplication(), fs.getDefaultBlockSize(),
  278.             compressionType, new DefaultCodec(), null, new Metadata());
  279.   }
  280.   
  281.   /**
  282.    * Construct the preferred type of SequenceFile Writer.
  283.    * @param fs The configured filesystem. 
  284.    * @param conf The configuration.
  285.    * @param name The name of the file. 
  286.    * @param keyClass The 'key' type.
  287.    * @param valClass The 'value' type.
  288.    * @param compressionType The compression type.
  289.    * @param progress The Progressable object to track progress.
  290.    * @return Returns the handle to the constructed SequenceFile Writer.
  291.    * @throws IOException
  292.    */
  293.   public static Writer
  294.     createWriter(FileSystem fs, Configuration conf, Path name, 
  295.                  Class keyClass, Class valClass, CompressionType compressionType,
  296.                  Progressable progress) throws IOException {
  297.     return createWriter(fs, conf, name, keyClass, valClass,
  298.             fs.getConf().getInt("io.file.buffer.size", 4096),
  299.             fs.getDefaultReplication(), fs.getDefaultBlockSize(),
  300.             compressionType, new DefaultCodec(), progress, new Metadata());
  301.   }
  302.   /**
  303.    * Construct the preferred type of SequenceFile Writer.
  304.    * @param fs The configured filesystem. 
  305.    * @param conf The configuration.
  306.    * @param name The name of the file. 
  307.    * @param keyClass The 'key' type.
  308.    * @param valClass The 'value' type.
  309.    * @param compressionType The compression type.
  310.    * @param codec The compression codec.
  311.    * @return Returns the handle to the constructed SequenceFile Writer.
  312.    * @throws IOException
  313.    */
  314.   public static Writer 
  315.     createWriter(FileSystem fs, Configuration conf, Path name, 
  316.                  Class keyClass, Class valClass, 
  317.                  CompressionType compressionType, CompressionCodec codec) 
  318.     throws IOException {
  319.     return createWriter(fs, conf, name, keyClass, valClass,
  320.             fs.getConf().getInt("io.file.buffer.size", 4096),
  321.             fs.getDefaultReplication(), fs.getDefaultBlockSize(),
  322.             compressionType, codec, null, new Metadata());
  323.   }
  324.   
  325.   /**
  326.    * Construct the preferred type of SequenceFile Writer.
  327.    * @param fs The configured filesystem. 
  328.    * @param conf The configuration.
  329.    * @param name The name of the file. 
  330.    * @param keyClass The 'key' type.
  331.    * @param valClass The 'value' type.
  332.    * @param compressionType The compression type.
  333.    * @param codec The compression codec.
  334.    * @param progress The Progressable object to track progress.
  335.    * @param metadata The metadata of the file.
  336.    * @return Returns the handle to the constructed SequenceFile Writer.
  337.    * @throws IOException
  338.    */
  339.   public static Writer
  340.     createWriter(FileSystem fs, Configuration conf, Path name, 
  341.                  Class keyClass, Class valClass, 
  342.                  CompressionType compressionType, CompressionCodec codec,
  343.                  Progressable progress, Metadata metadata) throws IOException {
  344.     return createWriter(fs, conf, name, keyClass, valClass,
  345.             fs.getConf().getInt("io.file.buffer.size", 4096),
  346.             fs.getDefaultReplication(), fs.getDefaultBlockSize(),
  347.             compressionType, codec, progress, metadata);
  348.   }
  349.   /**
  350.    * Construct the preferred type of SequenceFile Writer.
  351.    * @param fs The configured filesystem.
  352.    * @param conf The configuration.
  353.    * @param name The name of the file.
  354.    * @param keyClass The 'key' type.
  355.    * @param valClass The 'value' type.
  356.    * @param bufferSize buffer size for the underlaying outputstream.
  357.    * @param replication replication factor for the file.
  358.    * @param blockSize block size for the file.
  359.    * @param compressionType The compression type.
  360.    * @param codec The compression codec.
  361.    * @param progress The Progressable object to track progress.
  362.    * @param metadata The metadata of the file.
  363.    * @return Returns the handle to the constructed SequenceFile Writer.
  364.    * @throws IOException
  365.    */
  366.   public static Writer
  367.     createWriter(FileSystem fs, Configuration conf, Path name,
  368.                  Class keyClass, Class valClass, int bufferSize,
  369.                  short replication, long blockSize,
  370.                  CompressionType compressionType, CompressionCodec codec,
  371.                  Progressable progress, Metadata metadata) throws IOException {
  372.     if ((codec instanceof GzipCodec) &&
  373.         !NativeCodeLoader.isNativeCodeLoaded() &&
  374.         !ZlibFactory.isNativeZlibLoaded(conf)) {
  375.       throw new IllegalArgumentException("SequenceFile doesn't work with " +
  376.                                          "GzipCodec without native-hadoop code!");
  377.     }
  378.     Writer writer = null;
  379.     if (compressionType == CompressionType.NONE) {
  380.       writer = new Writer(fs, conf, name, keyClass, valClass,
  381.                           bufferSize, replication, blockSize,
  382.                           progress, metadata);
  383.     } else if (compressionType == CompressionType.RECORD) {
  384.       writer = new RecordCompressWriter(fs, conf, name, keyClass, valClass,
  385.                                         bufferSize, replication, blockSize,
  386.                                         codec, progress, metadata);
  387.     } else if (compressionType == CompressionType.BLOCK){
  388.       writer = new BlockCompressWriter(fs, conf, name, keyClass, valClass,
  389.                                        bufferSize, replication, blockSize,
  390.                                        codec, progress, metadata);
  391.     }
  392.     return writer;
  393.   }
  394.   /**
  395.    * Construct the preferred type of SequenceFile Writer.
  396.    * @param fs The configured filesystem. 
  397.    * @param conf The configuration.
  398.    * @param name The name of the file. 
  399.    * @param keyClass The 'key' type.
  400.    * @param valClass The 'value' type.
  401.    * @param compressionType The compression type.
  402.    * @param codec The compression codec.
  403.    * @param progress The Progressable object to track progress.
  404.    * @return Returns the handle to the constructed SequenceFile Writer.
  405.    * @throws IOException
  406.    */
  407.   public static Writer
  408.     createWriter(FileSystem fs, Configuration conf, Path name, 
  409.                  Class keyClass, Class valClass, 
  410.                  CompressionType compressionType, CompressionCodec codec,
  411.                  Progressable progress) throws IOException {
  412.     Writer writer = createWriter(fs, conf, name, keyClass, valClass, 
  413.                                  compressionType, codec, progress, new Metadata());
  414.     return writer;
  415.   }
  416.   /**
  417.    * Construct the preferred type of 'raw' SequenceFile Writer.
  418.    * @param out The stream on top which the writer is to be constructed.
  419.    * @param keyClass The 'key' type.
  420.    * @param valClass The 'value' type.
  421.    * @param compress Compress data?
  422.    * @param blockCompress Compress blocks?
  423.    * @param metadata The metadata of the file.
  424.    * @return Returns the handle to the constructed SequenceFile Writer.
  425.    * @throws IOException
  426.    */
  427.   private static Writer
  428.     createWriter(Configuration conf, FSDataOutputStream out, 
  429.                  Class keyClass, Class valClass, boolean compress, boolean blockCompress,
  430.                  CompressionCodec codec, Metadata metadata)
  431.     throws IOException {
  432.     if (codec != null && (codec instanceof GzipCodec) && 
  433.         !NativeCodeLoader.isNativeCodeLoaded() && 
  434.         !ZlibFactory.isNativeZlibLoaded(conf)) {
  435.       throw new IllegalArgumentException("SequenceFile doesn't work with " +
  436.                                          "GzipCodec without native-hadoop code!");
  437.     }
  438.     Writer writer = null;
  439.     if (!compress) {
  440.       writer = new Writer(conf, out, keyClass, valClass, metadata);
  441.     } else if (compress && !blockCompress) {
  442.       writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  443.     } else {
  444.       writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  445.     }
  446.     
  447.     return writer;
  448.   }
  449.   /**
  450.    * Construct the preferred type of 'raw' SequenceFile Writer.
  451.    * @param fs The configured filesystem. 
  452.    * @param conf The configuration.
  453.    * @param file The name of the file. 
  454.    * @param keyClass The 'key' type.
  455.    * @param valClass The 'value' type.
  456.    * @param compress Compress data?
  457.    * @param blockCompress Compress blocks?
  458.    * @param codec The compression codec.
  459.    * @param progress
  460.    * @param metadata The metadata of the file.
  461.    * @return Returns the handle to the constructed SequenceFile Writer.
  462.    * @throws IOException
  463.    */
  464.   private static Writer
  465.   createWriter(FileSystem fs, Configuration conf, Path file, 
  466.                Class keyClass, Class valClass, 
  467.                boolean compress, boolean blockCompress,
  468.                CompressionCodec codec, Progressable progress, Metadata metadata)
  469.   throws IOException {
  470.   if (codec != null && (codec instanceof GzipCodec) && 
  471.       !NativeCodeLoader.isNativeCodeLoaded() && 
  472.       !ZlibFactory.isNativeZlibLoaded(conf)) {
  473.     throw new IllegalArgumentException("SequenceFile doesn't work with " +
  474.                                        "GzipCodec without native-hadoop code!");
  475.   }
  476.   Writer writer = null;
  477.   if (!compress) {
  478.     writer = new Writer(fs, conf, file, keyClass, valClass, progress, metadata);
  479.   } else if (compress && !blockCompress) {
  480.     writer = new RecordCompressWriter(fs, conf, file, keyClass, valClass, 
  481.                                       codec, progress, metadata);
  482.   } else {
  483.     writer = new BlockCompressWriter(fs, conf, file, keyClass, valClass, 
  484.                                      codec, progress, metadata);
  485.   }
  486.   
  487.   return writer;
  488. }
  489.   /**
  490.    * Construct the preferred type of 'raw' SequenceFile Writer.
  491.    * @param conf The configuration.
  492.    * @param out The stream on top which the writer is to be constructed.
  493.    * @param keyClass The 'key' type.
  494.    * @param valClass The 'value' type.
  495.    * @param compressionType The compression type.
  496.    * @param codec The compression codec.
  497.    * @param metadata The metadata of the file.
  498.    * @return Returns the handle to the constructed SequenceFile Writer.
  499.    * @throws IOException
  500.    */
  501.   public static Writer
  502.     createWriter(Configuration conf, FSDataOutputStream out, 
  503.                  Class keyClass, Class valClass, CompressionType compressionType,
  504.                  CompressionCodec codec, Metadata metadata)
  505.     throws IOException {
  506.     if ((codec instanceof GzipCodec) && 
  507.         !NativeCodeLoader.isNativeCodeLoaded() && 
  508.         !ZlibFactory.isNativeZlibLoaded(conf)) {
  509.       throw new IllegalArgumentException("SequenceFile doesn't work with " +
  510.                                          "GzipCodec without native-hadoop code!");
  511.     }
  512.     Writer writer = null;
  513.     if (compressionType == CompressionType.NONE) {
  514.       writer = new Writer(conf, out, keyClass, valClass, metadata);
  515.     } else if (compressionType == CompressionType.RECORD) {
  516.       writer = new RecordCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  517.     } else if (compressionType == CompressionType.BLOCK){
  518.       writer = new BlockCompressWriter(conf, out, keyClass, valClass, codec, metadata);
  519.     }
  520.     
  521.     return writer;
  522.   }
  523.   
  524.   /**
  525.    * Construct the preferred type of 'raw' SequenceFile Writer.
  526.    * @param conf The configuration.
  527.    * @param out The stream on top which the writer is to be constructed.
  528.    * @param keyClass The 'key' type.
  529.    * @param valClass The 'value' type.
  530.    * @param compressionType The compression type.
  531.    * @param codec The compression codec.
  532.    * @return Returns the handle to the constructed SequenceFile Writer.
  533.    * @throws IOException
  534.    */
  535.   public static Writer
  536.     createWriter(Configuration conf, FSDataOutputStream out, 
  537.                  Class keyClass, Class valClass, CompressionType compressionType,
  538.                  CompressionCodec codec)
  539.     throws IOException {
  540.     Writer writer = createWriter(conf, out, keyClass, valClass, compressionType,
  541.                                  codec, new Metadata());
  542.     return writer;
  543.   }
  544.   
  545.   /** The interface to 'raw' values of SequenceFiles. */
  546.   public static interface ValueBytes {
  547.     /** Writes the uncompressed bytes to the outStream.
  548.      * @param outStream : Stream to write uncompressed bytes into.
  549.      * @throws IOException
  550.      */
  551.     public void writeUncompressedBytes(DataOutputStream outStream)
  552.       throws IOException;
  553.     /** Write compressed bytes to outStream. 
  554.      * Note: that it will NOT compress the bytes if they are not compressed.
  555.      * @param outStream : Stream to write compressed bytes into.
  556.      */
  557.     public void writeCompressedBytes(DataOutputStream outStream) 
  558.       throws IllegalArgumentException, IOException;
  559.     /**
  560.      * Size of stored data.
  561.      */
  562.     public int getSize();
  563.   }
  564.   
  565.   private static class UncompressedBytes implements ValueBytes {
  566.     private int dataSize;
  567.     private byte[] data;
  568.     
  569.     private UncompressedBytes() {
  570.       data = null;
  571.       dataSize = 0;
  572.     }
  573.     
  574.     private void reset(DataInputStream in, int length) throws IOException {
  575.       data = new byte[length];
  576.       dataSize = -1;
  577.       
  578.       in.readFully(data);
  579.       dataSize = data.length;
  580.     }
  581.     
  582.     public int getSize() {
  583.       return dataSize;
  584.     }
  585.     
  586.     public void writeUncompressedBytes(DataOutputStream outStream)
  587.       throws IOException {
  588.       outStream.write(data, 0, dataSize);
  589.     }
  590.     public void writeCompressedBytes(DataOutputStream outStream) 
  591.       throws IllegalArgumentException, IOException {
  592.       throw 
  593.         new IllegalArgumentException("UncompressedBytes cannot be compressed!");
  594.     }
  595.   } // UncompressedBytes
  596.   
  597.   private static class CompressedBytes implements ValueBytes {
  598.     private int dataSize;
  599.     private byte[] data;
  600.     DataInputBuffer rawData = null;
  601.     CompressionCodec codec = null;
  602.     CompressionInputStream decompressedStream = null;
  603.     private CompressedBytes(CompressionCodec codec) {
  604.       data = null;
  605.       dataSize = 0;
  606.       this.codec = codec;
  607.     }
  608.     private void reset(DataInputStream in, int length) throws IOException {
  609.       data = new byte[length];
  610.       dataSize = -1;
  611.       in.readFully(data);
  612.       dataSize = data.length;
  613.     }
  614.     
  615.     public int getSize() {
  616.       return dataSize;
  617.     }
  618.     
  619.     public void writeUncompressedBytes(DataOutputStream outStream)
  620.       throws IOException {
  621.       if (decompressedStream == null) {
  622.         rawData = new DataInputBuffer();
  623.         decompressedStream = codec.createInputStream(rawData);
  624.       } else {
  625.         decompressedStream.resetState();
  626.       }
  627.       rawData.reset(data, 0, dataSize);
  628.       byte[] buffer = new byte[8192];
  629.       int bytesRead = 0;
  630.       while ((bytesRead = decompressedStream.read(buffer, 0, 8192)) != -1) {
  631.         outStream.write(buffer, 0, bytesRead);
  632.       }
  633.     }
  634.     public void writeCompressedBytes(DataOutputStream outStream) 
  635.       throws IllegalArgumentException, IOException {
  636.       outStream.write(data, 0, dataSize);
  637.     }
  638.   } // CompressedBytes
  639.   
  640.   /**
  641.    * The class encapsulating with the metadata of a file.
  642.    * The metadata of a file is a list of attribute name/value
  643.    * pairs of Text type.
  644.    *
  645.    */
  646.   public static class Metadata implements Writable {
  647.     private TreeMap<Text, Text> theMetadata;
  648.     
  649.     public Metadata() {
  650.       this(new TreeMap<Text, Text>());
  651.     }
  652.     
  653.     public Metadata(TreeMap<Text, Text> arg) {
  654.       if (arg == null) {
  655.         this.theMetadata = new TreeMap<Text, Text>();
  656.       } else {
  657.         this.theMetadata = arg;
  658.       }
  659.     }
  660.     
  661.     public Text get(Text name) {
  662.       return this.theMetadata.get(name);
  663.     }
  664.     
  665.     public void set(Text name, Text value) {
  666.       this.theMetadata.put(name, value);
  667.     }
  668.     
  669.     public TreeMap<Text, Text> getMetadata() {
  670.       return new TreeMap<Text, Text>(this.theMetadata);
  671.     }
  672.     
  673.     public void write(DataOutput out) throws IOException {
  674.       out.writeInt(this.theMetadata.size());
  675.       Iterator<Map.Entry<Text, Text>> iter =
  676.         this.theMetadata.entrySet().iterator();
  677.       while (iter.hasNext()) {
  678.         Map.Entry<Text, Text> en = iter.next();
  679.         en.getKey().write(out);
  680.         en.getValue().write(out);
  681.       }
  682.     }
  683.     public void readFields(DataInput in) throws IOException {
  684.       int sz = in.readInt();
  685.       if (sz < 0) throw new IOException("Invalid size: " + sz + " for file metadata object");
  686.       this.theMetadata = new TreeMap<Text, Text>();
  687.       for (int i = 0; i < sz; i++) {
  688.         Text key = new Text();
  689.         Text val = new Text();
  690.         key.readFields(in);
  691.         val.readFields(in);
  692.         this.theMetadata.put(key, val);
  693.       }    
  694.     }
  695.     
  696.     public boolean equals(Metadata other) {
  697.       if (other == null) return false;
  698.       if (this.theMetadata.size() != other.theMetadata.size()) {
  699.         return false;
  700.       }
  701.       Iterator<Map.Entry<Text, Text>> iter1 =
  702.         this.theMetadata.entrySet().iterator();
  703.       Iterator<Map.Entry<Text, Text>> iter2 =
  704.         other.theMetadata.entrySet().iterator();
  705.       while (iter1.hasNext() && iter2.hasNext()) {
  706.         Map.Entry<Text, Text> en1 = iter1.next();
  707.         Map.Entry<Text, Text> en2 = iter2.next();
  708.         if (!en1.getKey().equals(en2.getKey())) {
  709.           return false;
  710.         }
  711.         if (!en1.getValue().equals(en2.getValue())) {
  712.           return false;
  713.         }
  714.       }
  715.       if (iter1.hasNext() || iter2.hasNext()) {
  716.         return false;
  717.       }
  718.       return true;
  719.     }
  720.     public int hashCode() {
  721.       assert false : "hashCode not designed";
  722.       return 42; // any arbitrary constant will do 
  723.     }
  724.     
  725.     public String toString() {
  726.       StringBuffer sb = new StringBuffer();
  727.       sb.append("size: ").append(this.theMetadata.size()).append("n");
  728.       Iterator<Map.Entry<Text, Text>> iter =
  729.         this.theMetadata.entrySet().iterator();
  730.       while (iter.hasNext()) {
  731.         Map.Entry<Text, Text> en = iter.next();
  732.         sb.append("t").append(en.getKey().toString()).append("t").append(en.getValue().toString());
  733.         sb.append("n");
  734.       }
  735.       return sb.toString();
  736.     }
  737.   }
  738.   
  739.   /** Write key/value pairs to a sequence-format file. */
  740.   public static class Writer implements java.io.Closeable {
  741.     Configuration conf;
  742.     FSDataOutputStream out;
  743.     boolean ownOutputStream = true;
  744.     DataOutputBuffer buffer = new DataOutputBuffer();
  745.     Class keyClass;
  746.     Class valClass;
  747.     private boolean compress;
  748.     CompressionCodec codec = null;
  749.     CompressionOutputStream deflateFilter = null;
  750.     DataOutputStream deflateOut = null;
  751.     Metadata metadata = null;
  752.     Compressor compressor = null;
  753.     
  754.     protected Serializer keySerializer;
  755.     protected Serializer uncompressedValSerializer;
  756.     protected Serializer compressedValSerializer;
  757.     
  758.     // Insert a globally unique 16-byte value every few entries, so that one
  759.     // can seek into the middle of a file and then synchronize with record
  760.     // starts and ends by scanning for this value.
  761.     long lastSyncPos;                     // position of last sync
  762.     byte[] sync;                          // 16 random bytes
  763.     {
  764.       try {                                       
  765.         MessageDigest digester = MessageDigest.getInstance("MD5");
  766.         long time = System.currentTimeMillis();
  767.         digester.update((new UID()+"@"+time).getBytes());
  768.         sync = digester.digest();
  769.       } catch (Exception e) {
  770.         throw new RuntimeException(e);
  771.       }
  772.     }
  773.     /** Implicit constructor: needed for the period of transition!*/
  774.     Writer()
  775.     {}
  776.     
  777.     /** Create the named file. */
  778.     public Writer(FileSystem fs, Configuration conf, Path name, 
  779.                   Class keyClass, Class valClass)
  780.       throws IOException {
  781.       this(fs, conf, name, keyClass, valClass, null, new Metadata());
  782.     }
  783.     
  784.     /** Create the named file with write-progress reporter. */
  785.     public Writer(FileSystem fs, Configuration conf, Path name, 
  786.                   Class keyClass, Class valClass,
  787.                   Progressable progress, Metadata metadata)
  788.       throws IOException {
  789.       this(fs, conf, name, keyClass, valClass,
  790.            fs.getConf().getInt("io.file.buffer.size", 4096),
  791.            fs.getDefaultReplication(), fs.getDefaultBlockSize(),
  792.            progress, metadata);
  793.     }
  794.     
  795.     /** Create the named file with write-progress reporter. */
  796.     public Writer(FileSystem fs, Configuration conf, Path name,
  797.                   Class keyClass, Class valClass,
  798.                   int bufferSize, short replication, long blockSize,
  799.                   Progressable progress, Metadata metadata)
  800.       throws IOException {
  801.       init(name, conf,
  802.            fs.create(name, true, bufferSize, replication, blockSize, progress),
  803.               keyClass, valClass, false, null, metadata);
  804.       initializeFileHeader();
  805.       writeFileHeader();
  806.       finalizeFileHeader();
  807.     }
  808.     /** Write to an arbitrary stream using a specified buffer size. */
  809.     private Writer(Configuration conf, FSDataOutputStream out, 
  810.                    Class keyClass, Class valClass, Metadata metadata)
  811.       throws IOException {
  812.       this.ownOutputStream = false;
  813.       init(null, conf, out, keyClass, valClass, false, null, metadata);
  814.       
  815.       initializeFileHeader();
  816.       writeFileHeader();
  817.       finalizeFileHeader();
  818.     }
  819.     /** Write the initial part of file header. */
  820.     void initializeFileHeader() 
  821.       throws IOException{
  822.       out.write(VERSION);
  823.     }
  824.     /** Write the final part of file header. */
  825.     void finalizeFileHeader() 
  826.       throws IOException{
  827.       out.write(sync);                       // write the sync bytes
  828.       out.flush();                           // flush header
  829.     }
  830.     
  831.     boolean isCompressed() { return compress; }
  832.     boolean isBlockCompressed() { return false; }
  833.     
  834.     /** Write and flush the file header. */
  835.     void writeFileHeader() 
  836.       throws IOException {
  837.       Text.writeString(out, keyClass.getName());
  838.       Text.writeString(out, valClass.getName());
  839.       
  840.       out.writeBoolean(this.isCompressed());
  841.       out.writeBoolean(this.isBlockCompressed());
  842.       
  843.       if (this.isCompressed()) {
  844.         Text.writeString(out, (codec.getClass()).getName());
  845.       }
  846.       this.metadata.write(out);
  847.     }
  848.     
  849.     /** Initialize. */
  850.     @SuppressWarnings("unchecked")
  851.     void init(Path name, Configuration conf, FSDataOutputStream out,
  852.               Class keyClass, Class valClass,
  853.               boolean compress, CompressionCodec codec, Metadata metadata) 
  854.       throws IOException {
  855.       this.conf = conf;
  856.       this.out = out;
  857.       this.keyClass = keyClass;
  858.       this.valClass = valClass;
  859.       this.compress = compress;
  860.       this.codec = codec;
  861.       this.metadata = metadata;
  862.       SerializationFactory serializationFactory = new SerializationFactory(conf);
  863.       this.keySerializer = serializationFactory.getSerializer(keyClass);
  864.       this.keySerializer.open(buffer);
  865.       this.uncompressedValSerializer = serializationFactory.getSerializer(valClass);
  866.       this.uncompressedValSerializer.open(buffer);
  867.       if (this.codec != null) {
  868.         ReflectionUtils.setConf(this.codec, this.conf);
  869.         this.compressor = CodecPool.getCompressor(this.codec);
  870.         this.deflateFilter = this.codec.createOutputStream(buffer, compressor);
  871.         this.deflateOut = 
  872.           new DataOutputStream(new BufferedOutputStream(deflateFilter));
  873.         this.compressedValSerializer = serializationFactory.getSerializer(valClass);
  874.         this.compressedValSerializer.open(deflateOut);
  875.       }
  876.     }
  877.     
  878.     /** Returns the class of keys in this file. */
  879.     public Class getKeyClass() { return keyClass; }
  880.     /** Returns the class of values in this file. */
  881.     public Class getValueClass() { return valClass; }
  882.     /** Returns the compression codec of data in this file. */
  883.     public CompressionCodec getCompressionCodec() { return codec; }
  884.     
  885.     /** create a sync point */
  886.     public void sync() throws IOException {
  887.       if (sync != null && lastSyncPos != out.getPos()) {
  888.         out.writeInt(SYNC_ESCAPE);                // mark the start of the sync
  889.         out.write(sync);                          // write sync
  890.         lastSyncPos = out.getPos();               // update lastSyncPos
  891.       }
  892.     }
  893.     /** Returns the configuration of this file. */
  894.     Configuration getConf() { return conf; }
  895.     
  896.     /** Close the file. */
  897.     public synchronized void close() throws IOException {
  898.       keySerializer.close();
  899.       uncompressedValSerializer.close();
  900.       if (compressedValSerializer != null) {
  901.         compressedValSerializer.close();
  902.       }
  903.       CodecPool.returnCompressor(compressor);
  904.       compressor = null;
  905.       
  906.       if (out != null) {
  907.         
  908.         // Close the underlying stream iff we own it...
  909.         if (ownOutputStream) {
  910.           out.close();
  911.         } else {
  912.           out.flush();
  913.         }
  914.         out = null;
  915.       }
  916.     }
  917.     synchronized void checkAndWriteSync() throws IOException {
  918.       if (sync != null &&
  919.           out.getPos() >= lastSyncPos+SYNC_INTERVAL) { // time to emit sync
  920.         sync();
  921.       }
  922.     }
  923.     /** Append a key/value pair. */
  924.     public synchronized void append(Writable key, Writable val)
  925.       throws IOException {
  926.       append((Object) key, (Object) val);
  927.     }
  928.     /** Append a key/value pair. */
  929.     @SuppressWarnings("unchecked")
  930.     public synchronized void append(Object key, Object val)
  931.       throws IOException {
  932.       if (key.getClass() != keyClass)
  933.         throw new IOException("wrong key class: "+key.getClass().getName()
  934.                               +" is not "+keyClass);
  935.       if (val.getClass() != valClass)
  936.         throw new IOException("wrong value class: "+val.getClass().getName()
  937.                               +" is not "+valClass);
  938.       buffer.reset();
  939.       // Append the 'key'
  940.       keySerializer.serialize(key);
  941.       int keyLength = buffer.getLength();
  942.       if (keyLength < 0)
  943.         throw new IOException("negative length keys not allowed: " + key);
  944.       // Append the 'value'
  945.       if (compress) {
  946.         deflateFilter.resetState();
  947.         compressedValSerializer.serialize(val);
  948.         deflateOut.flush();
  949.         deflateFilter.finish();
  950.       } else {
  951.         uncompressedValSerializer.serialize(val);
  952.       }
  953.       // Write the record out
  954.       checkAndWriteSync();                                // sync
  955.       out.writeInt(buffer.getLength());                   // total record length
  956.       out.writeInt(keyLength);                            // key portion length
  957.       out.write(buffer.getData(), 0, buffer.getLength()); // data
  958.     }
  959.     public synchronized void appendRaw(byte[] keyData, int keyOffset,
  960.         int keyLength, ValueBytes val) throws IOException {
  961.       if (keyLength < 0)
  962.         throw new IOException("negative length keys not allowed: " + keyLength);
  963.       int valLength = val.getSize();
  964.       checkAndWriteSync();
  965.       
  966.       out.writeInt(keyLength+valLength);          // total record length
  967.       out.writeInt(keyLength);                    // key portion length
  968.       out.write(keyData, keyOffset, keyLength);   // key
  969.       val.writeUncompressedBytes(out);            // value
  970.     }
  971.     /** Returns the current length of the output file.
  972.      *
  973.      * <p>This always returns a synchronized position.  In other words,
  974.      * immediately after calling {@link SequenceFile.Reader#seek(long)} with a position
  975.      * returned by this method, {@link SequenceFile.Reader#next(Writable)} may be called.  However
  976.      * the key may be earlier in the file than key last written when this
  977.      * method was called (e.g., with block-compression, it may be the first key
  978.      * in the block that was being written when this method was called).
  979.      */
  980.     public synchronized long getLength() throws IOException {
  981.       return out.getPos();
  982.     }
  983.   } // class Writer
  984.   /** Write key/compressed-value pairs to a sequence-format file. */
  985.   static class RecordCompressWriter extends Writer {
  986.     
  987.     /** Create the named file. */
  988.     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
  989.                                 Class keyClass, Class valClass, CompressionCodec codec) 
  990.       throws IOException {
  991.       this(conf, fs.create(name), keyClass, valClass, codec, new Metadata());
  992.     }
  993.     
  994.     /** Create the named file with write-progress reporter. */
  995.     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
  996.                                 Class keyClass, Class valClass, CompressionCodec codec,
  997.                                 Progressable progress, Metadata metadata)
  998.       throws IOException {
  999.       this(fs, conf, name, keyClass, valClass,
  1000.            fs.getConf().getInt("io.file.buffer.size", 4096),
  1001.            fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
  1002.            progress, metadata);
  1003.     }
  1004.     /** Create the named file with write-progress reporter. */
  1005.     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name,
  1006.                                 Class keyClass, Class valClass,
  1007.                                 int bufferSize, short replication, long blockSize,
  1008.                                 CompressionCodec codec,
  1009.                                 Progressable progress, Metadata metadata)
  1010.       throws IOException {
  1011.       super.init(name, conf,
  1012.                  fs.create(name, true, bufferSize, replication, blockSize, progress),
  1013.                  keyClass, valClass, true, codec, metadata);
  1014.       initializeFileHeader();
  1015.       writeFileHeader();
  1016.       finalizeFileHeader();
  1017.     }
  1018.     /** Create the named file with write-progress reporter. */
  1019.     public RecordCompressWriter(FileSystem fs, Configuration conf, Path name, 
  1020.                                 Class keyClass, Class valClass, CompressionCodec codec,
  1021.                                 Progressable progress)
  1022.       throws IOException {
  1023.       this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
  1024.     }
  1025.     
  1026.     /** Write to an arbitrary stream using a specified buffer size. */
  1027.     private RecordCompressWriter(Configuration conf, FSDataOutputStream out,
  1028.                                  Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
  1029.       throws IOException {
  1030.       this.ownOutputStream = false;
  1031.       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
  1032.       
  1033.       initializeFileHeader();
  1034.       writeFileHeader();
  1035.       finalizeFileHeader();
  1036.       
  1037.     }
  1038.     
  1039.     boolean isCompressed() { return true; }
  1040.     boolean isBlockCompressed() { return false; }
  1041.     /** Append a key/value pair. */
  1042.     @SuppressWarnings("unchecked")
  1043.     public synchronized void append(Object key, Object val)
  1044.       throws IOException {
  1045.       if (key.getClass() != keyClass)
  1046.         throw new IOException("wrong key class: "+key.getClass().getName()
  1047.                               +" is not "+keyClass);
  1048.       if (val.getClass() != valClass)
  1049.         throw new IOException("wrong value class: "+val.getClass().getName()
  1050.                               +" is not "+valClass);
  1051.       buffer.reset();
  1052.       // Append the 'key'
  1053.       keySerializer.serialize(key);
  1054.       int keyLength = buffer.getLength();
  1055.       if (keyLength < 0)
  1056.         throw new IOException("negative length keys not allowed: " + key);
  1057.       // Compress 'value' and append it
  1058.       deflateFilter.resetState();
  1059.       compressedValSerializer.serialize(val);
  1060.       deflateOut.flush();
  1061.       deflateFilter.finish();
  1062.       // Write the record out
  1063.       checkAndWriteSync();                                // sync
  1064.       out.writeInt(buffer.getLength());                   // total record length
  1065.       out.writeInt(keyLength);                            // key portion length
  1066.       out.write(buffer.getData(), 0, buffer.getLength()); // data
  1067.     }
  1068.     /** Append a key/value pair. */
  1069.     public synchronized void appendRaw(byte[] keyData, int keyOffset,
  1070.         int keyLength, ValueBytes val) throws IOException {
  1071.       if (keyLength < 0)
  1072.         throw new IOException("negative length keys not allowed: " + keyLength);
  1073.       int valLength = val.getSize();
  1074.       
  1075.       checkAndWriteSync();                        // sync
  1076.       out.writeInt(keyLength+valLength);          // total record length
  1077.       out.writeInt(keyLength);                    // key portion length
  1078.       out.write(keyData, keyOffset, keyLength);   // 'key' data
  1079.       val.writeCompressedBytes(out);              // 'value' data
  1080.     }
  1081.     
  1082.   } // RecordCompressionWriter
  1083.   /** Write compressed key/value blocks to a sequence-format file. */
  1084.   static class BlockCompressWriter extends Writer {
  1085.     
  1086.     private int noBufferedRecords = 0;
  1087.     
  1088.     private DataOutputBuffer keyLenBuffer = new DataOutputBuffer();
  1089.     private DataOutputBuffer keyBuffer = new DataOutputBuffer();
  1090.     private DataOutputBuffer valLenBuffer = new DataOutputBuffer();
  1091.     private DataOutputBuffer valBuffer = new DataOutputBuffer();
  1092.     private int compressionBlockSize;
  1093.     
  1094.     /** Create the named file. */
  1095.     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
  1096.                                Class keyClass, Class valClass, CompressionCodec codec) 
  1097.       throws IOException {
  1098.       this(fs, conf, name, keyClass, valClass,
  1099.            fs.getConf().getInt("io.file.buffer.size", 4096),
  1100.            fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
  1101.            null, new Metadata());
  1102.     }
  1103.     
  1104.     /** Create the named file with write-progress reporter. */
  1105.     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
  1106.                                Class keyClass, Class valClass, CompressionCodec codec,
  1107.                                Progressable progress, Metadata metadata)
  1108.       throws IOException {
  1109.       this(fs, conf, name, keyClass, valClass,
  1110.            fs.getConf().getInt("io.file.buffer.size", 4096),
  1111.            fs.getDefaultReplication(), fs.getDefaultBlockSize(), codec,
  1112.            progress, metadata);
  1113.     }
  1114.     /** Create the named file with write-progress reporter. */
  1115.     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name,
  1116.                                Class keyClass, Class valClass,
  1117.                                int bufferSize, short replication, long blockSize,
  1118.                                CompressionCodec codec,
  1119.                                Progressable progress, Metadata metadata)
  1120.       throws IOException {
  1121.       super.init(name, conf,
  1122.                  fs.create(name, true, bufferSize, replication, blockSize, progress),
  1123.                  keyClass, valClass, true, codec, metadata);
  1124.       init(conf.getInt("io.seqfile.compress.blocksize", 1000000));
  1125.       initializeFileHeader();
  1126.       writeFileHeader();
  1127.       finalizeFileHeader();
  1128.     }
  1129.     /** Create the named file with write-progress reporter. */
  1130.     public BlockCompressWriter(FileSystem fs, Configuration conf, Path name, 
  1131.                                Class keyClass, Class valClass, CompressionCodec codec,
  1132.                                Progressable progress)
  1133.       throws IOException {
  1134.       this(fs, conf, name, keyClass, valClass, codec, progress, new Metadata());
  1135.     }
  1136.     
  1137.     /** Write to an arbitrary stream using a specified buffer size. */
  1138.     private BlockCompressWriter(Configuration conf, FSDataOutputStream out,
  1139.                                 Class keyClass, Class valClass, CompressionCodec codec, Metadata metadata)
  1140.       throws IOException {
  1141.       this.ownOutputStream = false;
  1142.       super.init(null, conf, out, keyClass, valClass, true, codec, metadata);
  1143.       init(1000000);
  1144.       
  1145.       initializeFileHeader();
  1146.       writeFileHeader();
  1147.       finalizeFileHeader();
  1148.     }
  1149.     
  1150.     boolean isCompressed() { return true; }
  1151.     boolean isBlockCompressed() { return true; }
  1152.     /** Initialize */
  1153.     void init(int compressionBlockSize) throws IOException {
  1154.       this.compressionBlockSize = compressionBlockSize;
  1155.       keySerializer.close();
  1156.       keySerializer.open(keyBuffer);
  1157.       uncompressedValSerializer.close();
  1158.       uncompressedValSerializer.open(valBuffer);
  1159.     }
  1160.     
  1161.     /** Workhorse to check and write out compressed data/lengths */
  1162.     private synchronized 
  1163.       void writeBuffer(DataOutputBuffer uncompressedDataBuffer) 
  1164.       throws IOException {
  1165.       deflateFilter.resetState();
  1166.       buffer.reset();
  1167.       deflateOut.write(uncompressedDataBuffer.getData(), 0, 
  1168.                        uncompressedDataBuffer.getLength());
  1169.       deflateOut.flush();
  1170.       deflateFilter.finish();
  1171.       
  1172.       WritableUtils.writeVInt(out, buffer.getLength());
  1173.       out.write(buffer.getData(), 0, buffer.getLength());
  1174.     }
  1175.     
  1176.     /** Compress and flush contents to dfs */
  1177.     public synchronized void sync() throws IOException {
  1178.       if (noBufferedRecords > 0) {
  1179.         super.sync();
  1180.         
  1181.         // No. of records
  1182.         WritableUtils.writeVInt(out, noBufferedRecords);
  1183.         
  1184.         // Write 'keys' and lengths
  1185.         writeBuffer(keyLenBuffer);
  1186.         writeBuffer(keyBuffer);
  1187.         
  1188.         // Write 'values' and lengths
  1189.         writeBuffer(valLenBuffer);
  1190.         writeBuffer(valBuffer);
  1191.         
  1192.         // Flush the file-stream
  1193.         out.flush();
  1194.         
  1195.         // Reset internal states
  1196.         keyLenBuffer.reset();
  1197.         keyBuffer.reset();
  1198.         valLenBuffer.reset();
  1199.         valBuffer.reset();
  1200.         noBufferedRecords = 0;
  1201.       }
  1202.       
  1203.     }
  1204.     
  1205.     /** Close the file. */
  1206.     public synchronized void close() throws IOException {
  1207.       if (out != null) {
  1208.         sync();
  1209.       }
  1210.       super.close();
  1211.     }
  1212.     /** Append a key/value pair. */
  1213.     @SuppressWarnings("unchecked")
  1214.     public synchronized void append(Object key, Object val)
  1215.       throws IOException {
  1216.       if (key.getClass() != keyClass)
  1217.         throw new IOException("wrong key class: "+key+" is not "+keyClass);
  1218.       if (val.getClass() != valClass)
  1219.         throw new IOException("wrong value class: "+val+" is not "+valClass);
  1220.       // Save key/value into respective buffers 
  1221.       int oldKeyLength = keyBuffer.getLength();
  1222.       keySerializer.serialize(key);
  1223.       int keyLength = keyBuffer.getLength() - oldKeyLength;
  1224.       if (keyLength < 0)
  1225.         throw new IOException("negative length keys not allowed: " + key);
  1226.       WritableUtils.writeVInt(keyLenBuffer, keyLength);
  1227.       int oldValLength = valBuffer.getLength();
  1228.       uncompressedValSerializer.serialize(val);
  1229.       int valLength = valBuffer.getLength() - oldValLength;
  1230.       WritableUtils.writeVInt(valLenBuffer, valLength);
  1231.       
  1232.       // Added another key/value pair
  1233.       ++noBufferedRecords;
  1234.       
  1235.       // Compress and flush?
  1236.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength();
  1237.       if (currentBlockSize >= compressionBlockSize) {
  1238.         sync();
  1239.       }
  1240.     }
  1241.     
  1242.     /** Append a key/value pair. */
  1243.     public synchronized void appendRaw(byte[] keyData, int keyOffset,
  1244.         int keyLength, ValueBytes val) throws IOException {
  1245.       
  1246.       if (keyLength < 0)
  1247.         throw new IOException("negative length keys not allowed");
  1248.       int valLength = val.getSize();
  1249.       
  1250.       // Save key/value data in relevant buffers
  1251.       WritableUtils.writeVInt(keyLenBuffer, keyLength);
  1252.       keyBuffer.write(keyData, keyOffset, keyLength);
  1253.       WritableUtils.writeVInt(valLenBuffer, valLength);
  1254.       val.writeUncompressedBytes(valBuffer);
  1255.       // Added another key/value pair
  1256.       ++noBufferedRecords;
  1257.       // Compress and flush?
  1258.       int currentBlockSize = keyBuffer.getLength() + valBuffer.getLength(); 
  1259.       if (currentBlockSize >= compressionBlockSize) {
  1260.         sync();
  1261.       }
  1262.     }
  1263.   
  1264.   } // BlockCompressionWriter
  1265.   
  1266.   /** Reads key/value pairs from a sequence-format file. */
  1267.   public static class Reader implements java.io.Closeable {
  1268.     private Path file;
  1269.     private FSDataInputStream in;
  1270.     private DataOutputBuffer outBuf = new DataOutputBuffer();
  1271.     private byte version;
  1272.     private String keyClassName;
  1273.     private String valClassName;
  1274.     private Class keyClass;
  1275.     private Class valClass;
  1276.     private CompressionCodec codec = null;
  1277.     private Metadata metadata = null;
  1278.     
  1279.     private byte[] sync = new byte[SYNC_HASH_SIZE];
  1280.     private byte[] syncCheck = new byte[SYNC_HASH_SIZE];
  1281.     private boolean syncSeen;
  1282.     private long end;
  1283.     private int keyLength;
  1284.     private int recordLength;
  1285.     private boolean decompress;
  1286.     private boolean blockCompressed;
  1287.     
  1288.     private Configuration conf;
  1289.     private int noBufferedRecords = 0;
  1290.     private boolean lazyDecompress = true;
  1291.     private boolean valuesDecompressed = true;
  1292.     
  1293.     private int noBufferedKeys = 0;
  1294.     private int noBufferedValues = 0;
  1295.     
  1296.     private DataInputBuffer keyLenBuffer = null;
  1297.     private CompressionInputStream keyLenInFilter = null;
  1298.     private DataInputStream keyLenIn = null;
  1299.     private Decompressor keyLenDecompressor = null;
  1300.     private DataInputBuffer keyBuffer = null;
  1301.     private CompressionInputStream keyInFilter = null;
  1302.     private DataInputStream keyIn = null;
  1303.     private Decompressor keyDecompressor = null;
  1304.     private DataInputBuffer valLenBuffer = null;
  1305.     private CompressionInputStream valLenInFilter = null;
  1306.     private DataInputStream valLenIn = null;
  1307.     private Decompressor valLenDecompressor = null;
  1308.     private DataInputBuffer valBuffer = null;
  1309.     private CompressionInputStream valInFilter = null;
  1310.     private DataInputStream valIn = null;
  1311.     private Decompressor valDecompressor = null;
  1312.     
  1313.     private Deserializer keyDeserializer;
  1314.     private Deserializer valDeserializer;
  1315.     /** Open the named file. */
  1316.     public Reader(FileSystem fs, Path file, Configuration conf)
  1317.       throws IOException {
  1318.       this(fs, file, conf.getInt("io.file.buffer.size", 4096), conf, false);
  1319.     }
  1320.     private Reader(FileSystem fs, Path file, int bufferSize,
  1321.                    Configuration conf, boolean tempReader) throws IOException {
  1322.       this(fs, file, bufferSize, 0, fs.getLength(file), conf, tempReader);
  1323.     }
  1324.     
  1325.     private Reader(FileSystem fs, Path file, int bufferSize, long start,
  1326.                    long length, Configuration conf, boolean tempReader) 
  1327.     throws IOException {
  1328.       this.file = file;
  1329.       this.in = openFile(fs, file, bufferSize, length);
  1330.       this.conf = conf;
  1331.       seek(start);
  1332.       this.end = in.getPos() + length;
  1333.       init(tempReader);
  1334.     }
  1335.     /**
  1336.      * Override this method to specialize the type of
  1337.      * {@link FSDataInputStream} returned.
  1338.      */
  1339.     protected FSDataInputStream openFile(FileSystem fs, Path file,
  1340.         int bufferSize, long length) throws IOException {
  1341.       return fs.open(file, bufferSize);
  1342.     }
  1343.     
  1344.     /**
  1345.      * Initialize the {@link Reader}
  1346.      * @param tmpReader <code>true</code> if we are constructing a temporary
  1347.      *                  reader {@link SequenceFile.Sorter.cloneFileAttributes}, 
  1348.      *                  and hence do not initialize every component; 
  1349.      *                  <code>false</code> otherwise.
  1350.      * @throws IOException
  1351.      */
  1352.     private void init(boolean tempReader) throws IOException {
  1353.       byte[] versionBlock = new byte[VERSION.length];
  1354.       in.readFully(versionBlock);
  1355.       if ((versionBlock[0] != VERSION[0]) ||
  1356.           (versionBlock[1] != VERSION[1]) ||
  1357.           (versionBlock[2] != VERSION[2]))
  1358.         throw new IOException(file + " not a SequenceFile");
  1359.       // Set 'version'
  1360.       version = versionBlock[3];
  1361.       if (version > VERSION[3])
  1362.         throw new VersionMismatchException(VERSION[3], version);
  1363.       if (version < BLOCK_COMPRESS_VERSION) {
  1364.         UTF8 className = new UTF8();
  1365.         className.readFields(in);
  1366.         keyClassName = className.toString(); // key class name
  1367.         className.readFields(in);
  1368.         valClassName = className.toString(); // val class name
  1369.       } else {
  1370.         keyClassName = Text.readString(in);
  1371.         valClassName = Text.readString(in);
  1372.       }
  1373.       if (version > 2) {                          // if version > 2
  1374.         this.decompress = in.readBoolean();       // is compressed?
  1375.       } else {
  1376.         decompress = false;
  1377.       }
  1378.       if (version >= BLOCK_COMPRESS_VERSION) {    // if version >= 4
  1379.         this.blockCompressed = in.readBoolean();  // is block-compressed?
  1380.       } else {
  1381.         blockCompressed = false;
  1382.       }
  1383.       
  1384.       // if version >= 5
  1385.       // setup the compression codec
  1386.       if (decompress) {
  1387.         if (version >= CUSTOM_COMPRESS_VERSION) {
  1388.           String codecClassname = Text.readString(in);
  1389.           try {
  1390.             Class<? extends CompressionCodec> codecClass
  1391.               = conf.getClassByName(codecClassname).asSubclass(CompressionCodec.class);
  1392.             this.codec = ReflectionUtils.newInstance(codecClass, conf);
  1393.           } catch (ClassNotFoundException cnfe) {
  1394.             throw new IllegalArgumentException("Unknown codec: " + 
  1395.                                                codecClassname, cnfe);
  1396.           }
  1397.         } else {
  1398.           codec = new DefaultCodec();
  1399.           ((Configurable)codec).setConf(conf);
  1400.         }
  1401.       }
  1402.       
  1403.       this.metadata = new Metadata();
  1404.       if (version >= VERSION_WITH_METADATA) {    // if version >= 6
  1405.         this.metadata.readFields(in);
  1406.       }
  1407.       
  1408.       if (version > 1) {                          // if version > 1
  1409.         in.readFully(sync);                       // read sync bytes
  1410.       }
  1411.       
  1412.       // Initialize... *not* if this we are constructing a temporary Reader
  1413.       if (!tempReader) {
  1414.         valBuffer = new DataInputBuffer();
  1415.         if (decompress) {
  1416.           valDecompressor = CodecPool.getDecompressor(codec);
  1417.           valInFilter = codec.createInputStream(valBuffer, valDecompressor);
  1418.           valIn = new DataInputStream(valInFilter);
  1419.         } else {
  1420.           valIn = valBuffer;
  1421.         }
  1422.         if (blockCompressed) {
  1423.           keyLenBuffer = new DataInputBuffer();
  1424.           keyBuffer = new DataInputBuffer();
  1425.           valLenBuffer = new DataInputBuffer();
  1426.           keyLenDecompressor = CodecPool.getDecompressor(codec);
  1427.           keyLenInFilter = codec.createInputStream(keyLenBuffer, 
  1428.                                                    keyLenDecompressor);
  1429.           keyLenIn = new DataInputStream(keyLenInFilter);
  1430.           keyDecompressor = CodecPool.getDecompressor(codec);
  1431.           keyInFilter = codec.createInputStream(keyBuffer, keyDecompressor);
  1432.           keyIn = new DataInputStream(keyInFilter);
  1433.           valLenDecompressor = CodecPool.getDecompressor(codec);
  1434.           valLenInFilter = codec.createInputStream(valLenBuffer, 
  1435.                                                    valLenDecompressor);
  1436.           valLenIn = new DataInputStream(valLenInFilter);
  1437.         }
  1438.         
  1439.         SerializationFactory serializationFactory =
  1440.           new SerializationFactory(conf);
  1441.         this.keyDeserializer =
  1442.           getDeserializer(serializationFactory, getKeyClass());
  1443.         if (!blockCompressed) {
  1444.           this.keyDeserializer.open(valBuffer);
  1445.         } else {
  1446.           this.keyDeserializer.open(keyIn);
  1447.         }
  1448.         this.valDeserializer =
  1449.           getDeserializer(serializationFactory, getValueClass());
  1450.         this.valDeserializer.open(valIn);
  1451.       }
  1452.     }
  1453.     
  1454.     @SuppressWarnings("unchecked")
  1455.     private Deserializer getDeserializer(SerializationFactory sf, Class c) {
  1456.       return sf.getDeserializer(c);
  1457.     }
  1458.     
  1459.     /** Close the file. */
  1460.     public synchronized void close() throws IOException {
  1461.       // Return the decompressors to the pool
  1462.       CodecPool.returnDecompressor(keyLenDecompressor);
  1463.       CodecPool.returnDecompressor(keyDecompressor);
  1464.       CodecPool.returnDecompressor(valLenDecompressor);
  1465.       CodecPool.returnDecompressor(valDecompressor);
  1466.       keyLenDecompressor = keyDecompressor = null;
  1467.       valLenDecompressor = valDecompressor = null;
  1468.       
  1469.       if (keyDeserializer != null) {
  1470.      keyDeserializer.close();
  1471.       }
  1472.       if (valDeserializer != null) {
  1473.         valDeserializer.close();
  1474.       }
  1475.       
  1476.       // Close the input-stream
  1477.       in.close();
  1478.     }
  1479.     /** Returns the name of the key class. */
  1480.     public String getKeyClassName() {
  1481.       return keyClassName;
  1482.     }
  1483.     /** Returns the class of keys in this file. */
  1484.     public synchronized Class<?> getKeyClass() {
  1485.       if (null == keyClass) {
  1486.         try {
  1487.           keyClass = WritableName.getClass(getKeyClassName(), conf);
  1488.         } catch (IOException e) {
  1489.           throw new RuntimeException(e);
  1490.         }
  1491.       }
  1492.       return keyClass;
  1493.     }
  1494.     /** Returns the name of the value class. */
  1495.     public String getValueClassName() {
  1496.       return valClassName;
  1497.     }
  1498.     /** Returns the class of values in this file. */
  1499.     public synchronized Class<?> getValueClass() {
  1500.       if (null == valClass) {
  1501.         try {
  1502.           valClass = WritableName.getClass(getValueClassName(), conf);
  1503.         } catch (IOException e) {
  1504.           throw new RuntimeException(e);
  1505.         }
  1506.       }
  1507.       return valClass;
  1508.     }
  1509.     /** Returns true if values are compressed. */
  1510.     public boolean isCompressed() { return decompress; }
  1511.     
  1512.     /** Returns true if records are block-compressed. */
  1513.     public boolean isBlockCompressed() { return blockCompressed; }
  1514.     
  1515.     /** Returns the compression codec of data in this file. */
  1516.     public CompressionCodec getCompressionCodec() { return codec; }
  1517.     /** Returns the metadata object of the file */
  1518.     public Metadata getMetadata() {
  1519.       return this.metadata;
  1520.     }
  1521.     
  1522.     /** Returns the configuration used for this file. */
  1523.     Configuration getConf() { return conf; }
  1524.     
  1525.     /** Read a compressed buffer */
  1526.     private synchronized void readBuffer(DataInputBuffer buffer, 
  1527.                                          CompressionInputStream filter) throws IOException {
  1528.       // Read data into a temporary buffer
  1529.       DataOutputBuffer dataBuffer = new DataOutputBuffer();
  1530.       try {
  1531.         int dataBufferLength = WritableUtils.readVInt(in);
  1532.         dataBuffer.write(in, dataBufferLength);
  1533.       
  1534.         // Set up 'buffer' connected to the input-stream
  1535.         buffer.reset(dataBuffer.getData(), 0, dataBuffer.getLength());
  1536.       } finally {
  1537.         dataBuffer.close();
  1538.       }
  1539.       // Reset the codec
  1540.       filter.resetState();
  1541.     }
  1542.     
  1543.     /** Read the next 'compressed' block */
  1544.     private synchronized void readBlock() throws IOException {
  1545.       // Check if we need to throw away a whole block of 
  1546.       // 'values' due to 'lazy decompression' 
  1547.       if (lazyDecompress && !valuesDecompressed) {
  1548.         in.seek(WritableUtils.readVInt(in)+in.getPos());
  1549.         in.seek(WritableUtils.readVInt(in)+in.getPos());
  1550.       }
  1551.       
  1552.       // Reset internal states
  1553.       noBufferedKeys = 0; noBufferedValues = 0; noBufferedRecords = 0;
  1554.       valuesDecompressed = false;
  1555.       //Process sync
  1556.       if (sync != null) {
  1557.         in.readInt();
  1558.         in.readFully(syncCheck);                // read syncCheck
  1559.         if (!Arrays.equals(sync, syncCheck))    // check it
  1560.           throw new IOException("File is corrupt!");
  1561.       }
  1562.       syncSeen = true;
  1563.       // Read number of records in this block
  1564.       noBufferedRecords = WritableUtils.readVInt(in);
  1565.       
  1566.       // Read key lengths and keys
  1567.       readBuffer(keyLenBuffer, keyLenInFilter);
  1568.       readBuffer(keyBuffer, keyInFilter);
  1569.       noBufferedKeys = noBufferedRecords;
  1570.       
  1571.       // Read value lengths and values
  1572.       if (!lazyDecompress) {
  1573.         readBuffer(valLenBuffer, valLenInFilter);
  1574.         readBuffer(valBuffer, valInFilter);
  1575.         noBufferedValues = noBufferedRecords;
  1576.         valuesDecompressed = true;
  1577.       }
  1578.     }
  1579.     /** 
  1580.      * Position valLenIn/valIn to the 'value' 
  1581.      * corresponding to the 'current' key 
  1582.      */
  1583.     private synchronized void seekToCurrentValue() throws IOException {
  1584.       if (!blockCompressed) {
  1585.         if (decompress) {
  1586.           valInFilter.resetState();
  1587.         }
  1588.         valBuffer.reset();
  1589.       } else {
  1590.         // Check if this is the first value in the 'block' to be read
  1591.         if (lazyDecompress && !valuesDecompressed) {
  1592.           // Read the value lengths and values
  1593.           readBuffer(valLenBuffer, valLenInFilter);
  1594.           readBuffer(valBuffer, valInFilter);
  1595.           noBufferedValues = noBufferedRecords;
  1596.           valuesDecompressed = true;
  1597.         }
  1598.         
  1599.         // Calculate the no. of bytes to skip
  1600.         // Note: 'current' key has already been read!
  1601.         int skipValBytes = 0;
  1602.         int currentKey = noBufferedKeys + 1;          
  1603.         for (int i=noBufferedValues; i > currentKey; --i) {
  1604.           skipValBytes += WritableUtils.readVInt(valLenIn);
  1605.           --noBufferedValues;
  1606.         }
  1607.         
  1608.         // Skip to the 'val' corresponding to 'current' key
  1609.         if (skipValBytes > 0) {
  1610.           if (valIn.skipBytes(skipValBytes) != skipValBytes) {
  1611.             throw new IOException("Failed to seek to " + currentKey + 
  1612.                                   "(th) value!");
  1613.           }
  1614.         }
  1615.       }
  1616.     }
  1617.     /**
  1618.      * Get the 'value' corresponding to the last read 'key'.
  1619.      * @param val : The 'value' to be read.
  1620.      * @throws IOException
  1621.      */
  1622.     public synchronized void getCurrentValue(Writable val) 
  1623.       throws IOException {
  1624.       if (val instanceof Configurable) {
  1625.         ((Configurable) val).setConf(this.conf);
  1626.       }
  1627.       // Position stream to 'current' value
  1628.       seekToCurrentValue();
  1629.       if (!blockCompressed) {
  1630.         val.readFields(valIn);
  1631.         
  1632.         if (valIn.read() > 0) {
  1633.           LOG.info("available bytes: " + valIn.available());
  1634.           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
  1635.                                 + " bytes, should read " +
  1636.                                 (valBuffer.getLength()-keyLength));
  1637.         }
  1638.       } else {
  1639.         // Get the value
  1640.         int valLength = WritableUtils.readVInt(valLenIn);
  1641.         val.readFields(valIn);
  1642.         
  1643.         // Read another compressed 'value'
  1644.         --noBufferedValues;
  1645.         
  1646.         // Sanity check
  1647.         if (valLength < 0) {
  1648.           LOG.debug(val + " is a zero-length value");
  1649.         }
  1650.       }
  1651.     }
  1652.     
  1653.     /**
  1654.      * Get the 'value' corresponding to the last read 'key'.
  1655.      * @param val : The 'value' to be read.
  1656.      * @throws IOException
  1657.      */
  1658.     public synchronized Object getCurrentValue(Object val) 
  1659.       throws IOException {
  1660.       if (val instanceof Configurable) {
  1661.         ((Configurable) val).setConf(this.conf);
  1662.       }
  1663.       // Position stream to 'current' value
  1664.       seekToCurrentValue();
  1665.       if (!blockCompressed) {
  1666.         val = deserializeValue(val);
  1667.         
  1668.         if (valIn.read() > 0) {
  1669.           LOG.info("available bytes: " + valIn.available());
  1670.           throw new IOException(val+" read "+(valBuffer.getPosition()-keyLength)
  1671.                                 + " bytes, should read " +
  1672.                                 (valBuffer.getLength()-keyLength));
  1673.         }
  1674.       } else {
  1675.         // Get the value
  1676.         int valLength = WritableUtils.readVInt(valLenIn);
  1677.         val = deserializeValue(val);
  1678.         
  1679.         // Read another compressed 'value'
  1680.         --noBufferedValues;
  1681.         
  1682.         // Sanity check
  1683.         if (valLength < 0) {
  1684.           LOG.debug(val + " is a zero-length value");
  1685.         }
  1686.       }
  1687.       return val;
  1688.     }
  1689.     @SuppressWarnings("unchecked")
  1690.     private Object deserializeValue(Object val) throws IOException {
  1691.       return valDeserializer.deserialize(val);
  1692.     }
  1693.     
  1694.     /** Read the next key in the file into <code>key</code>, skipping its
  1695.      * value.  True if another entry exists, and false at end of file. */
  1696.     public synchronized boolean next(Writable key) throws IOException {
  1697.       if (key.getClass() != getKeyClass())
  1698.         throw new IOException("wrong key class: "+key.getClass().getName()
  1699.                               +" is not "+keyClass);
  1700.       if (!blockCompressed) {
  1701.         outBuf.reset();
  1702.         
  1703.         keyLength = next(outBuf);
  1704.         if (keyLength < 0)
  1705.           return false;
  1706.         
  1707.         valBuffer.reset(outBuf.getData(), outBuf.getLength());
  1708.         
  1709.         key.readFields(valBuffer);
  1710.         valBuffer.mark(0);
  1711.         if (valBuffer.getPosition() != keyLength)
  1712.           throw new IOException(key + " read " + valBuffer.getPosition()
  1713.                                 + " bytes, should read " + keyLength);
  1714.       } else {
  1715.         //Reset syncSeen
  1716.         syncSeen = false;
  1717.         
  1718.         if (noBufferedKeys == 0) {
  1719.           try {
  1720.             readBlock();
  1721.           } catch (EOFException eof) {
  1722.             return false;
  1723.           }
  1724.         }
  1725.         
  1726.         int keyLength = WritableUtils.readVInt(keyLenIn);
  1727.         
  1728.         // Sanity check
  1729.         if (keyLength < 0) {
  1730.           return false;
  1731.         }
  1732.         
  1733.         //Read another compressed 'key'
  1734.         key.readFields(keyIn);
  1735.         --noBufferedKeys;
  1736.       }
  1737.       return true;
  1738.     }
  1739.     /** Read the next key/value pair in the file into <code>key</code> and
  1740.      * <code>val</code>.  Returns true if such a pair exists and false when at
  1741.      * end of file */
  1742.     public synchronized boolean next(Writable key, Writable val)
  1743.       throws IOException {
  1744.       if (val.getClass() != getValueClass())
  1745.         throw new IOException("wrong value class: "+val+" is not "+valClass);
  1746.       boolean more = next(key);
  1747.       
  1748.       if (more) {
  1749.         getCurrentValue(val);
  1750.       }
  1751.       return more;
  1752.     }
  1753.     
  1754.     /**
  1755.      * Read and return the next record length, potentially skipping over 
  1756.      * a sync block.
  1757.      * @return the length of the next record or -1 if there is no next record
  1758.      * @throws IOException
  1759.      */
  1760.     private synchronized int readRecordLength() throws IOException {
  1761.       if (in.getPos() >= end) {
  1762.         return -1;
  1763.       }      
  1764.       int length = in.readInt();
  1765.       if (version > 1 && sync != null &&
  1766.           length == SYNC_ESCAPE) {              // process a sync entry
  1767.         in.readFully(syncCheck);                // read syncCheck
  1768.         if (!Arrays.equals(sync, syncCheck))    // check it
  1769.           throw new IOException("File is corrupt!");
  1770.         syncSeen = true;
  1771.         if (in.getPos() >= end) {
  1772.           return -1;
  1773.         }
  1774.         length = in.readInt();                  // re-read length
  1775.       } else {
  1776.         syncSeen = false;
  1777.       }
  1778.       
  1779.       return length;
  1780.     }
  1781.     
  1782.     /** Read the next key/value pair in the file into <code>buffer</code>.
  1783.      * Returns the length of the key read, or -1 if at end of file.  The length
  1784.      * of the value may be computed by calling buffer.getLength() before and
  1785.      * after calls to this method. */
  1786.     /** @deprecated Call {@link #nextRaw(DataOutputBuffer,SequenceFile.ValueBytes)}. */
  1787.     public synchronized int next(DataOutputBuffer buffer) throws IOException {
  1788.       // Unsupported for block-compressed sequence files
  1789.       if (blockCompressed) {
  1790.         throw new IOException("Unsupported call for block-compressed" +
  1791.                               " SequenceFiles - use SequenceFile.Reader.next(DataOutputStream, ValueBytes)");
  1792.       }
  1793.       try {
  1794.         int length = readRecordLength();
  1795.         if (length == -1) {
  1796.           return -1;
  1797.         }
  1798.         int keyLength = in.readInt();
  1799.         buffer.write(in, length);
  1800.         return keyLength;
  1801.       } catch (ChecksumException e) {             // checksum failure
  1802.         handleChecksumException(e);
  1803.         return next(buffer);
  1804.       }
  1805.     }
  1806.     public ValueBytes createValueBytes() {
  1807.       ValueBytes val = null;
  1808.       if (!decompress || blockCompressed) {
  1809.         val = new UncompressedBytes();
  1810.       } else {
  1811.         val = new CompressedBytes(codec);
  1812.       }
  1813.       return val;
  1814.     }
  1815.     /**
  1816.      * Read 'raw' records.
  1817.      * @param key - The buffer into which the key is read
  1818.      * @param val - The 'raw' value
  1819.      * @return Returns the total record length or -1 for end of file
  1820.      * @throws IOException
  1821.      */
  1822.     public synchronized int nextRaw(DataOutputBuffer key, ValueBytes val) 
  1823.       throws IOException {
  1824.       if (!blockCompressed) {
  1825.         int length = readRecordLength();
  1826.         if (length == -1) {
  1827.           return -1;
  1828.         }
  1829.         int keyLength = in.readInt();
  1830.         int valLength = length - keyLength;
  1831.         key.write(in, keyLength);
  1832.         if (decompress) {
  1833.           CompressedBytes value = (CompressedBytes)val;
  1834.           value.reset(in, valLength);
  1835.         } else {
  1836.           UncompressedBytes value = (UncompressedBytes)val;
  1837.           value.reset(in, valLength);
  1838.         }
  1839.         
  1840.         return length;
  1841.       } else {
  1842.         //Reset syncSeen
  1843.         syncSeen = false;
  1844.         
  1845.         // Read 'key'
  1846.         if (noBufferedKeys == 0) {
  1847.           if (in.getPos() >= end) 
  1848.             return -1;
  1849.           try { 
  1850.             readBlock();
  1851.           } catch (EOFException eof) {
  1852.             return -1;
  1853.           }
  1854.         }
  1855.         int keyLength = WritableUtils.readVInt(keyLenIn);
  1856.         if (keyLength < 0) {
  1857.           throw new IOException("zero length key found!");
  1858.         }
  1859.         key.write(keyIn, keyLength);
  1860.         --noBufferedKeys;
  1861.         
  1862.         // Read raw 'value'
  1863.         seekToCurrentValue();
  1864.         int valLength = WritableUtils.readVInt(valLenIn);
  1865.         UncompressedBytes rawValue = (UncompressedBytes)val;
  1866.         rawValue.reset(valIn, valLength);
  1867.         --noBufferedValues;
  1868.         
  1869.         return (keyLength+valLength);
  1870.       }
  1871.       
  1872.     }
  1873.     /**
  1874.      * Read 'raw' keys.
  1875.      * @param key - The buffer into which the key is read
  1876.      * @return Returns the key length or -1 for end of file
  1877.      * @throws IOException
  1878.      */
  1879.     public int nextRawKey(DataOutputBuffer key) 
  1880.       throws IOException {
  1881.       if (!blockCompressed) {
  1882.         recordLength = readRecordLength();
  1883.         if (recordLength == -1) {
  1884.           return -1;
  1885.         }
  1886.         keyLength = in.readInt();
  1887.         key.write(in, keyLength);
  1888.         return keyLength;
  1889.       } else {
  1890.         //Reset syncSeen
  1891.         syncSeen = false;
  1892.         
  1893.         // Read 'key'
  1894.         if (noBufferedKeys == 0) {
  1895.           if (in.getPos() >= end) 
  1896.             return -1;
  1897.           try { 
  1898.             readBlock();
  1899.           } catch (EOFException eof) {
  1900.             return -1;
  1901.           }
  1902.         }
  1903.         int keyLength = WritableUtils.readVInt(keyLenIn);
  1904.         if (keyLength < 0) {
  1905.           throw new IOException("zero length key found!");
  1906.         }
  1907.         key.write(keyIn, keyLength);
  1908.         --noBufferedKeys;
  1909.         
  1910.         return keyLength;
  1911.       }
  1912.       
  1913.     }
  1914.     /** Read the next key in the file, skipping its
  1915.      * value.  Return null at end of file. */
  1916.     public synchronized Object next(Object key) throws IOException {
  1917.       if (key != null && key.getClass() != getKeyClass()) {
  1918.         throw new IOException("wrong key class: "+key.getClass().getName()
  1919.                               +" is not "+keyClass);
  1920.       }
  1921.       if (!blockCompressed) {
  1922.         outBuf.reset();
  1923.         
  1924.         keyLength = next(outBuf);
  1925.         if (keyLength < 0)
  1926.           return null;
  1927.         
  1928.         valBuffer.reset(outBuf.getData(), outBuf.getLength());
  1929.         
  1930.         key = deserializeKey(key);
  1931.         valBuffer.mark(0);
  1932.         if (valBuffer.getPosition() != keyLength)
  1933.           throw new IOException(key + " read " + valBuffer.getPosition()
  1934.                                 + " bytes, should read " + keyLength);
  1935.       } else {
  1936.         //Reset syncSeen
  1937.         syncSeen = false;
  1938.         
  1939.         if (noBufferedKeys == 0) {
  1940.           try {
  1941.             readBlock();
  1942.           } catch (EOFException eof) {
  1943.             return null;
  1944.           }
  1945.         }
  1946.         
  1947.         int keyLength = WritableUtils.readVInt(keyLenIn);
  1948.         
  1949.         // Sanity check
  1950.         if (keyLength < 0) {
  1951.           return null;
  1952.         }
  1953.         
  1954.         //Read another compressed 'key'
  1955.         key = deserializeKey(key);
  1956.         --noBufferedKeys;
  1957.       }
  1958.       return key;
  1959.     }
  1960.     @SuppressWarnings("unchecked")
  1961.     private Object deserializeKey(Object key) throws IOException {
  1962.       return keyDeserializer.deserialize(key);
  1963.     }
  1964.     /**
  1965.      * Read 'raw' values.
  1966.      * @param val - The 'raw' value
  1967.      * @return Returns the value length
  1968.      * @throws IOException
  1969.      */
  1970.     public synchronized int nextRawValue(ValueBytes val) 
  1971.       throws IOException {
  1972.       
  1973.       // Position stream to current value
  1974.       seekToCurrentValue();
  1975.  
  1976.       if (!blockCompressed) {
  1977.         int valLength = recordLength - keyLength;
  1978.         if (decompress) {
  1979.           CompressedBytes value = (CompressedBytes)val;
  1980.           value.reset(in, valLength);
  1981.         } else {
  1982.           UncompressedBytes value = (UncompressedBytes)val;
  1983.           value.reset(in, valLength);
  1984.         }
  1985.          
  1986.         return valLength;
  1987.       } else {
  1988.         int valLength = WritableUtils.readVInt(valLenIn);
  1989.         UncompressedBytes rawValue = (UncompressedBytes)val;
  1990.         rawValue.reset(valIn, valLength);
  1991.         --noBufferedValues;
  1992.         return valLength;
  1993.       }
  1994.       
  1995.     }
  1996.     private void handleChecksumException(ChecksumException e)
  1997.       throws IOException {
  1998.       if (this.conf.getBoolean("io.skip.checksum.errors", false)) {
  1999.         LOG.warn("Bad checksum at "+getPosition()+". Skipping entries.");
  2000.         sync(getPosition()+this.conf.getInt("io.bytes.per.checksum", 512));
  2001.       } else {
  2002.         throw e;
  2003.       }
  2004.     }
  2005.     /** Set the current byte position in the input file.
  2006.      *
  2007.      * <p>The position passed must be a position returned by {@link
  2008.      * SequenceFile.Writer#getLength()} when writing this file.  To seek to an arbitrary
  2009.      * position, use {@link SequenceFile.Reader#sync(long)}.
  2010.      */
  2011.     public synchronized void seek(long position) throws IOException {
  2012.       in.seek(position);
  2013.       if (blockCompressed) {                      // trigger block read
  2014.         noBufferedKeys = 0;
  2015.         valuesDecompressed = true;
  2016.       }
  2017.     }
  2018.     /** Seek to the next sync mark past a given position.*/
  2019.     public synchronized void sync(long position) throws IOException {
  2020.       if (position+SYNC_SIZE >= end) {
  2021.         seek(end);
  2022.         return;
  2023.       }
  2024.       try {
  2025.         seek(position+4);                         // skip escape
  2026.         in.readFully(syncCheck);
  2027.         int syncLen = sync.length;
  2028.         for (int i = 0; in.getPos() < end; i++) {
  2029.           int j = 0;
  2030.           for (; j < syncLen; j++) {
  2031.             if (sync[j] != syncCheck[(i+j)%syncLen])
  2032.               break;
  2033.           }
  2034.           if (j == syncLen) {
  2035.             in.seek(in.getPos() - SYNC_SIZE);     // position before sync
  2036.             return;
  2037.           }
  2038.           syncCheck[i%syncLen] = in.readByte();
  2039.         }
  2040.       } catch (ChecksumException e) {             // checksum failure
  2041.         handleChecksumException(e);
  2042.       }
  2043.     }
  2044.     /** Returns true iff the previous call to next passed a sync mark.*/
  2045.     public boolean syncSeen() { return syncSeen; }
  2046.     /** Return the current byte position in the input file. */
  2047.     public synchronized long getPosition() throws IOException {
  2048.       return in.getPos();
  2049.     }
  2050.     /** Returns the name of the file. */
  2051.     public String toString() {
  2052.       return file.toString();
  2053.     }
  2054.   }
  2055.   /** Sorts key/value pairs in a sequence-format file.
  2056.    *
  2057.    * <p>For best performance, applications should make sure that the {@link
  2058.    * Writable#readFields(DataInput)} implementation of their keys is
  2059.    * very efficient.  In particular, it should avoid allocating memory.
  2060.    */
  2061.   public static class Sorter {
  2062.     private RawComparator comparator;
  2063.     private MergeSort mergeSort; //the implementation of merge sort
  2064.     
  2065.     private Path[] inFiles;                     // when merging or sorting
  2066.     private Path outFile;
  2067.     private int memory; // bytes
  2068.     private int factor; // merged per pass
  2069.     private FileSystem fs = null;
  2070.     private Class keyClass;
  2071.     private Class valClass;
  2072.     private Configuration conf;
  2073.     
  2074.     private Progressable progressable = null;
  2075.     /** Sort and merge files containing the named classes. */
  2076.     public Sorter(FileSystem fs, Class<? extends WritableComparable> keyClass,
  2077.                   Class valClass, Configuration conf)  {
  2078.       this(fs, WritableComparator.get(keyClass), keyClass, valClass, conf);
  2079.     }
  2080.     /** Sort and merge using an arbitrary {@link RawComparator}. */
  2081.     public Sorter(FileSystem fs, RawComparator comparator, Class keyClass, 
  2082.                   Class valClass, Configuration conf) {
  2083.       this.fs = fs;
  2084.       this.comparator = comparator;
  2085.       this.keyClass = keyClass;
  2086.       this.valClass = valClass;
  2087.       this.memory = conf.getInt("io.sort.mb", 100) * 1024 * 1024;
  2088.       this.factor = conf.getInt("io.sort.factor", 100);
  2089.       this.conf = conf;
  2090.     }
  2091.     /** Set the number of streams to merge at once.*/
  2092.     public void setFactor(int factor) { this.factor = factor; }
  2093.     /** Get the number of streams to merge at once.*/
  2094.     public int getFactor() { return factor; }
  2095.     /** Set the total amount of buffer memory, in bytes.*/
  2096.     public void setMemory(int memory) { this.memory = memory; }
  2097.     /** Get the total amount of buffer memory, in bytes.*/
  2098.     public int getMemory() { return memory; }
  2099.     /** Set the progressable object in order to report progress. */
  2100.     public void setProgressable(Progressable progressable) {
  2101.       this.progressable = progressable;
  2102.     }
  2103.     
  2104.     /** 
  2105.      * Perform a file sort from a set of input files into an output file.
  2106.      * @param inFiles the files to be sorted
  2107.      * @param outFile the sorted output file
  2108.      * @param deleteInput should the input files be deleted as they are read?
  2109.      */
  2110.     public void sort(Path[] inFiles, Path outFile,
  2111.                      boolean deleteInput) throws IOException {
  2112.       if (fs.exists(outFile)) {
  2113.         throw new IOException("already exists: " + outFile);
  2114.       }
  2115.       this.inFiles = inFiles;
  2116.       this.outFile = outFile;
  2117.       int segments = sortPass(deleteInput);
  2118.       if (segments > 1) {
  2119.         mergePass(outFile.getParent());
  2120.       }
  2121.     }
  2122.     /** 
  2123.      * Perform a file sort from a set of input files and return an iterator.
  2124.      * @param inFiles the files to be sorted
  2125.      * @param tempDir the directory where temp files are created during sort
  2126.      * @param deleteInput should the input files be deleted as they are read?
  2127.      * @return iterator the RawKeyValueIterator
  2128.      */
  2129.     public RawKeyValueIterator sortAndIterate(Path[] inFiles, Path tempDir, 
  2130.                                               boolean deleteInput) throws IOException {
  2131.       Path outFile = new Path(tempDir + Path.SEPARATOR + "all.2");
  2132.       if (fs.exists(outFile)) {
  2133.         throw new IOException("already exists: " + outFile);
  2134.       }
  2135.       this.inFiles = inFiles;
  2136.       //outFile will basically be used as prefix for temp files in the cases
  2137.       //where sort outputs multiple sorted segments. For the single segment
  2138.       //case, the outputFile itself will contain the sorted data for that
  2139.       //segment
  2140.       this.outFile = outFile;
  2141.       int segments = sortPass(deleteInput);
  2142.       if (segments > 1)
  2143.         return merge(outFile.suffix(".0"), outFile.suffix(".0.index"), 
  2144.                      tempDir);
  2145.       else if (segments == 1)
  2146.         return merge(new Path[]{outFile}, true, tempDir);
  2147.       else return null;
  2148.     }
  2149.     /**
  2150.      * The backwards compatible interface to sort.
  2151.      * @param inFile the input file to sort
  2152.      * @param outFile the sorted output file
  2153.      */
  2154.     public void sort(Path inFile, Path outFile) throws IOException {
  2155.       sort(new Path[]{inFile}, outFile, false);
  2156.     }
  2157.     
  2158.     private int sortPass(boolean deleteInput) throws IOException {
  2159.       LOG.debug("running sort pass");
  2160.       SortPass sortPass = new SortPass();         // make the SortPass
  2161.       sortPass.setProgressable(progressable);
  2162.       mergeSort = new MergeSort(sortPass.new SeqFileComparator());
  2163.       try {
  2164.         return sortPass.run(deleteInput);         // run it
  2165.       } finally {
  2166.         sortPass.close();                         // close it
  2167.       }
  2168.     }
  2169.     private class SortPass {
  2170.       private int memoryLimit = memory/4;
  2171.       private int recordLimit = 1000000;
  2172.       
  2173.       private DataOutputBuffer rawKeys = new DataOutputBuffer();
  2174.       private byte[] rawBuffer;
  2175.       private int[] keyOffsets = new int[1024];
  2176.       private int[] pointers = new int[keyOffsets.length];
  2177.       private int[] pointersCopy = new int[keyOffsets.length];
  2178.       private int[] keyLengths = new int[keyOffsets.length];
  2179.       private ValueBytes[] rawValues = new ValueBytes[keyOffsets.length];
  2180.       
  2181.       private ArrayList segmentLengths = new ArrayList();
  2182.       
  2183.       private Reader in = null;
  2184.       private FSDataOutputStream out = null;
  2185.       private FSDataOutputStream indexOut = null;
  2186.       private Path outName;
  2187.       private Progressable progressable = null;
  2188.       public int run(boolean deleteInput) throws IOException {
  2189.         int segments = 0;
  2190.         int currentFile = 0;
  2191.         boolean atEof = (currentFile >= inFiles.length);
  2192.         boolean isCompressed = false;
  2193.         boolean isBlockCompressed = false;
  2194.         CompressionCodec codec = null;
  2195.         segmentLengths.clear();
  2196.         if (atEof) {
  2197.           return 0;
  2198.         }
  2199.         
  2200.         // Initialize
  2201.         in = new Reader(fs, inFiles[currentFile], conf);
  2202.         isCompressed = in.isCompressed();
  2203.         isBlockCompressed = in.isBlockCompressed();
  2204.         codec = in.getCompressionCodec();
  2205.         
  2206.         for (int i=0; i < rawValues.length; ++i) {
  2207.           rawValues[i] = null;
  2208.         }
  2209.         
  2210.         while (!atEof) {
  2211.           int count = 0;
  2212.           int bytesProcessed = 0;
  2213.           rawKeys.reset();
  2214.           while (!atEof && 
  2215.                  bytesProcessed < memoryLimit && count < recordLimit) {
  2216.             // Read a record into buffer
  2217.             // Note: Attempt to re-use 'rawValue' as far as possible
  2218.             int keyOffset = rawKeys.getLength();       
  2219.             ValueBytes rawValue = 
  2220.               (count == keyOffsets.length || rawValues[count] == null) ? 
  2221.               in.createValueBytes() : 
  2222.               rawValues[count];
  2223.             int recordLength = in.nextRaw(rawKeys, rawValue);
  2224.             if (recordLength == -1) {
  2225.               in.close();
  2226.               if (deleteInput) {
  2227.                 fs.delete(inFiles[currentFile], true);
  2228.               }
  2229.               currentFile += 1;
  2230.               atEof = currentFile >= inFiles.length;
  2231.               if (!atEof) {
  2232.                 in = new Reader(fs, inFiles[currentFile], conf);
  2233.               } else {
  2234.                 in = null;
  2235.               }
  2236.               continue;
  2237.             }
  2238.             int keyLength = rawKeys.getLength() - keyOffset;
  2239.             if (count == keyOffsets.length)
  2240.               grow();
  2241.             keyOffsets[count] = keyOffset;                // update pointers
  2242.             pointers[count] = count;
  2243.             keyLengths[count] = keyLength;
  2244.             rawValues[count] = rawValue;
  2245.             bytesProcessed += recordLength; 
  2246.             count++;
  2247.           }
  2248.           // buffer is full -- sort & flush it
  2249.           LOG.debug("flushing segment " + segments);
  2250.           rawBuffer = rawKeys.getData();
  2251.           sort(count);
  2252.           // indicate we're making progress
  2253.           if (progressable != null) {
  2254.             progressable.progress();
  2255.           }
  2256.           flush(count, bytesProcessed, isCompressed, isBlockCompressed, codec, 
  2257.                 segments==0 && atEof);
  2258.           segments++;
  2259.         }
  2260.         return segments;
  2261.       }
  2262.       public void close() throws IOException {
  2263.         if (in != null) {
  2264.           in.close();
  2265.         }
  2266.         if (out != null) {
  2267.           out.close();
  2268.         }
  2269.         if (indexOut != null) {
  2270.           indexOut.close();
  2271.         }
  2272.       }
  2273.       private void grow() {
  2274.         int newLength = keyOffsets.length * 3 / 2;
  2275.         keyOffsets = grow(keyOffsets, newLength);
  2276.         pointers = grow(pointers, newLength);
  2277.         pointersCopy = new int[newLength];
  2278.         keyLengths = grow(keyLengths, newLength);
  2279.         rawValues = grow(rawValues, newLength);
  2280.       }
  2281.       private int[] grow(int[] old, int newLength) {
  2282.         int[] result = new int[newLength];
  2283.         System.arraycopy(old, 0, result, 0, old.length);
  2284.         return result;
  2285.       }
  2286.       
  2287.       private ValueBytes[] grow(ValueBytes[] old, int newLength) {
  2288.         ValueBytes[] result = new ValueBytes[newLength];
  2289.         System.arraycopy(old, 0, result, 0, old.length);
  2290.         for (int i=old.length; i < newLength; ++i) {
  2291.           result[i] = null;
  2292.         }
  2293.         return result;
  2294.       }
  2295.       private void flush(int count, int bytesProcessed, boolean isCompressed, 
  2296.                          boolean isBlockCompressed, CompressionCodec codec, boolean done) 
  2297.         throws IOException {
  2298.         if (out == null) {
  2299.           outName = done ? outFile : outFile.suffix(".0");
  2300.           out = fs.create(outName);
  2301.           if (!done) {
  2302.             indexOut = fs.create(outName.suffix(".index"));
  2303.           }
  2304.         }
  2305.         long segmentStart = out.getPos();
  2306.         Writer writer = createWriter(conf, out, keyClass, valClass, 
  2307.                                      isCompressed, isBlockCompressed, codec, 
  2308.                                      new Metadata());
  2309.         
  2310.         if (!done) {
  2311.           writer.sync = null;                     // disable sync on temp files
  2312.         }
  2313.         for (int i = 0; i < count; i++) {         // write in sorted order
  2314.           int p = pointers[i];
  2315.           writer.appendRaw(rawBuffer, keyOffsets[p], keyLengths[p], rawValues[p]);
  2316.         }
  2317.         writer.close();
  2318.         
  2319.         if (!done) {
  2320.           // Save the segment length
  2321.           WritableUtils.writeVLong(indexOut, segmentStart);
  2322.           WritableUtils.writeVLong(indexOut, (out.getPos()-segmentStart));
  2323.           indexOut.flush();
  2324.         }
  2325.       }
  2326.       private void sort(int count) {
  2327.         System.arraycopy(pointers, 0, pointersCopy, 0, count);
  2328.         mergeSort.mergeSort(pointersCopy, pointers, 0, count);
  2329.       }
  2330.       class SeqFileComparator implements Comparator<IntWritable> {
  2331.         public int compare(IntWritable I, IntWritable J) {
  2332.           return comparator.compare(rawBuffer, keyOffsets[I.get()], 
  2333.                                     keyLengths[I.get()], rawBuffer, 
  2334.                                     keyOffsets[J.get()], keyLengths[J.get()]);
  2335.         }
  2336.       }
  2337.       
  2338.       /** set the progressable object in order to report progress */
  2339.       public void setProgressable(Progressable progressable)
  2340.       {
  2341.         this.progressable = progressable;
  2342.       }
  2343.       
  2344.     } // SequenceFile.Sorter.SortPass
  2345.     /** The interface to iterate over raw keys/values of SequenceFiles. */
  2346.     public static interface RawKeyValueIterator {
  2347.       /** Gets the current raw key
  2348.        * @return DataOutputBuffer
  2349.        * @throws IOException
  2350.        */
  2351.       DataOutputBuffer getKey() throws IOException; 
  2352.       /** Gets the current raw value
  2353.        * @return ValueBytes 
  2354.        * @throws IOException
  2355.        */
  2356.       ValueBytes getValue() throws IOException; 
  2357.       /** Sets up the current key and value (for getKey and getValue)
  2358.        * @return true if there exists a key/value, false otherwise 
  2359.        * @throws IOException
  2360.        */
  2361.       boolean next() throws IOException;
  2362.       /** closes the iterator so that the underlying streams can be closed
  2363.        * @throws IOException
  2364.        */
  2365.       void close() throws IOException;
  2366.       /** Gets the Progress object; this has a float (0.0 - 1.0) 
  2367.        * indicating the bytes processed by the iterator so far
  2368.        */
  2369.       Progress getProgress();
  2370.     }    
  2371.     
  2372.     /**
  2373.      * Merges the list of segments of type <code>SegmentDescriptor</code>
  2374.      * @param segments the list of SegmentDescriptors
  2375.      * @param tmpDir the directory to write temporary files into
  2376.      * @return RawKeyValueIterator
  2377.      * @throws IOException
  2378.      */
  2379.     public RawKeyValueIterator merge(List <SegmentDescriptor> segments, 
  2380.                                      Path tmpDir) 
  2381.       throws IOException {
  2382.       // pass in object to report progress, if present
  2383.       MergeQueue mQueue = new MergeQueue(segments, tmpDir, progressable);
  2384.       return mQueue.merge();
  2385.     }
  2386.     /**
  2387.      * Merges the contents of files passed in Path[] using a max factor value
  2388.      * that is already set
  2389.      * @param inNames the array of path names
  2390.      * @param deleteInputs true if the input files should be deleted when 
  2391.      * unnecessary
  2392.      * @param tmpDir the directory to write temporary files into
  2393.      * @return RawKeyValueIteratorMergeQueue
  2394.      * @throws IOException
  2395.      */
  2396.     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
  2397.                                      Path tmpDir) 
  2398.       throws IOException {
  2399.       return merge(inNames, deleteInputs, 
  2400.                    (inNames.length < factor) ? inNames.length : factor,
  2401.                    tmpDir);
  2402.     }
  2403.     /**
  2404.      * Merges the contents of files passed in Path[]
  2405.      * @param inNames the array of path names
  2406.      * @param deleteInputs true if the input files should be deleted when 
  2407.      * unnecessary
  2408.      * @param factor the factor that will be used as the maximum merge fan-in
  2409.      * @param tmpDir the directory to write temporary files into
  2410.      * @return RawKeyValueIteratorMergeQueue
  2411.      * @throws IOException
  2412.      */
  2413.     public RawKeyValueIterator merge(Path [] inNames, boolean deleteInputs,
  2414.                                      int factor, Path tmpDir) 
  2415.       throws IOException {
  2416.       //get the segments from inNames
  2417.       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
  2418.       for (int i = 0; i < inNames.length; i++) {
  2419.         SegmentDescriptor s = new SegmentDescriptor(0, 
  2420.                                                     fs.getLength(inNames[i]), inNames[i]);
  2421.         s.preserveInput(!deleteInputs);
  2422.         s.doSync();
  2423.         a.add(s);
  2424.       }
  2425.       this.factor = factor;
  2426.       MergeQueue mQueue = new MergeQueue(a, tmpDir, progressable);
  2427.       return mQueue.merge();
  2428.     }
  2429.     /**
  2430.      * Merges the contents of files passed in Path[]
  2431.      * @param inNames the array of path names
  2432.      * @param tempDir the directory for creating temp files during merge
  2433.      * @param deleteInputs true if the input files should be deleted when 
  2434.      * unnecessary
  2435.      * @return RawKeyValueIteratorMergeQueue
  2436.      * @throws IOException
  2437.      */
  2438.     public RawKeyValueIterator merge(Path [] inNames, Path tempDir, 
  2439.                                      boolean deleteInputs) 
  2440.       throws IOException {
  2441.       //outFile will basically be used as prefix for temp files for the
  2442.       //intermediate merge outputs           
  2443.       this.outFile = new Path(tempDir + Path.SEPARATOR + "merged");
  2444.       //get the segments from inNames
  2445.       ArrayList <SegmentDescriptor> a = new ArrayList <SegmentDescriptor>();
  2446.       for (int i = 0; i < inNames.length; i++) {
  2447.         SegmentDescriptor s = new SegmentDescriptor(0, 
  2448.                                                     fs.getLength(inNames[i]), inNames[i]);
  2449.         s.preserveInput(!deleteInputs);
  2450.         s.doSync();
  2451.         a.add(s);
  2452.       }
  2453.       factor = (inNames.length < factor) ? inNames.length : factor;
  2454.       // pass in object to report progress, if present
  2455.       MergeQueue mQueue = new MergeQueue(a, tempDir, progressable);
  2456.       return mQueue.merge();
  2457.     }
  2458.     /**
  2459.      * Clones the attributes (like compression of the input file and creates a 
  2460.      * corresponding Writer
  2461.      * @param inputFile the path of the input file whose attributes should be 
  2462.      * cloned
  2463.      * @param outputFile the path of the output file 
  2464.      * @param prog the Progressable to report status during the file write
  2465.      * @return Writer
  2466.      * @throws IOException
  2467.      */
  2468.     public Writer cloneFileAttributes(Path inputFile, Path outputFile, 
  2469.                                       Progressable prog) 
  2470.     throws IOException {
  2471.       FileSystem srcFileSys = inputFile.getFileSystem(conf);
  2472.       Reader reader = new Reader(srcFileSys, inputFile, 4096, conf, true);
  2473.       boolean compress = reader.isCompressed();
  2474.       boolean blockCompress = reader.isBlockCompressed();
  2475.       CompressionCodec codec = reader.getCompressionCodec();
  2476.       reader.close();
  2477.       Writer writer = createWriter(outputFile.getFileSystem(conf), conf, 
  2478.                                    outputFile, keyClass, valClass, compress, 
  2479.                                    blockCompress, codec, prog,
  2480.                                    new Metadata());
  2481.       return writer;
  2482.     }
  2483.     /**
  2484.      * Writes records from RawKeyValueIterator into a file represented by the 
  2485.      * passed writer
  2486.      * @param records the RawKeyValueIterator
  2487.      * @param writer the Writer created earlier 
  2488.      * @throws IOException
  2489.      */
  2490.     public void writeFile(RawKeyValueIterator records, Writer writer) 
  2491.       throws IOException {
  2492.       while(records.next()) {
  2493.         writer.appendRaw(records.getKey().getData(), 0, 
  2494.                          records.getKey().getLength(), records.getValue());
  2495.       }
  2496.       writer.sync();
  2497.     }
  2498.         
  2499.     /** Merge the provided files.
  2500.      * @param inFiles the array of input path names
  2501.      * @param outFile the final output file
  2502.      * @throws IOException
  2503.      */
  2504.     public void merge(Path[] inFiles, Path outFile) throws IOException {
  2505.       if (fs.exists(outFile)) {
  2506.         throw new IOException("already exists: " + outFile);
  2507.       }
  2508.       RawKeyValueIterator r = merge(inFiles, false, outFile.getParent());
  2509.       Writer writer = cloneFileAttributes(inFiles[0], outFile, null);
  2510.       
  2511.       writeFile(r, writer);
  2512.       writer.close();
  2513.     }
  2514.     /** sort calls this to generate the final merged output */
  2515.     private int mergePass(Path tmpDir) throws IOException {
  2516.       LOG.debug("running merge pass");
  2517.       Writer writer = cloneFileAttributes(
  2518.                                           outFile.suffix(".0"), outFile, null);
  2519.       RawKeyValueIterator r = merge(outFile.suffix(".0"), 
  2520.                                     outFile.suffix(".0.index"), tmpDir);
  2521.       writeFile(r, writer);
  2522.       writer.close();
  2523.       return 0;
  2524.     }
  2525.     /** Used by mergePass to merge the output of the sort
  2526.      * @param inName the name of the input file containing sorted segments
  2527.      * @param indexIn the offsets of the sorted segments
  2528.      * @param tmpDir the relative directory to store intermediate results in
  2529.      * @return RawKeyValueIterator
  2530.      * @throws IOException
  2531.      */
  2532.     private RawKeyValueIterator merge(Path inName, Path indexIn, Path tmpDir) 
  2533.       throws IOException {
  2534.       //get the segments from indexIn
  2535.       //we create a SegmentContainer so that we can track segments belonging to
  2536.       //inName and delete inName as soon as we see that we have looked at all
  2537.       //the contained segments during the merge process & hence don't need 
  2538.       //them anymore
  2539.       SegmentContainer container = new SegmentContainer(inName, indexIn);
  2540.       MergeQueue mQueue = new MergeQueue(container.getSegmentList(), tmpDir, progressable);
  2541.       return mQueue.merge();
  2542.     }
  2543.     
  2544.     /** This class implements the core of the merge logic */
  2545.     private class MergeQueue extends PriorityQueue 
  2546.       implements RawKeyValueIterator {
  2547.       private boolean compress;
  2548.       private boolean blockCompress;
  2549.       private DataOutputBuffer rawKey = new DataOutputBuffer();
  2550.       private ValueBytes rawValue;
  2551.       private long totalBytesProcessed;
  2552.       private float progPerByte;
  2553.       private Progress mergeProgress = new Progress();
  2554.       private Path tmpDir;
  2555.       private Progressable progress = null; //handle to the progress reporting object
  2556.       private SegmentDescriptor minSegment;
  2557.       
  2558.       //a TreeMap used to store the segments sorted by size (segment offset and
  2559.       //segment path name is used to break ties between segments of same sizes)
  2560.       private Map<SegmentDescriptor, Void> sortedSegmentSizes =
  2561.         new TreeMap<SegmentDescriptor, Void>();
  2562.             
  2563.       @SuppressWarnings("unchecked")
  2564.       public void put(SegmentDescriptor stream) throws IOException {
  2565.         if (size() == 0) {
  2566.           compress = stream.in.isCompressed();
  2567.           blockCompress = stream.in.isBlockCompressed();
  2568.         } else if (compress != stream.in.isCompressed() || 
  2569.                    blockCompress != stream.in.isBlockCompressed()) {
  2570.           throw new IOException("All merged files must be compressed or not.");
  2571.         } 
  2572.         super.put(stream);
  2573.       }
  2574.       
  2575.       /**
  2576.        * A queue of file segments to merge
  2577.        * @param segments the file segments to merge
  2578.        * @param tmpDir a relative local directory to save intermediate files in
  2579.        * @param progress the reference to the Progressable object
  2580.        */
  2581.       public MergeQueue(List <SegmentDescriptor> segments,
  2582.           Path tmpDir, Progressable progress) {
  2583.         int size = segments.size();
  2584.         for (int i = 0; i < size; i++) {
  2585.           sortedSegmentSizes.put(segments.get(i), null);
  2586.         }
  2587.         this.tmpDir = tmpDir;
  2588.         this.progress = progress;
  2589.       }
  2590.       protected boolean lessThan(Object a, Object b) {
  2591.         // indicate we're making progress
  2592.         if (progress != null) {
  2593.           progress.progress();
  2594.         }
  2595.         SegmentDescriptor msa = (SegmentDescriptor)a;
  2596.         SegmentDescriptor msb = (SegmentDescriptor)b;
  2597.         return comparator.compare(msa.getKey().getData(), 0, 
  2598.                                   msa.getKey().getLength(), msb.getKey().getData(), 0, 
  2599.                                   msb.getKey().getLength()) < 0;
  2600.       }
  2601.       public void close() throws IOException {
  2602.         SegmentDescriptor ms;                           // close inputs
  2603.         while ((ms = (SegmentDescriptor)pop()) != null) {
  2604.           ms.cleanup();
  2605.         }
  2606.         minSegment = null;
  2607.       }
  2608.       public DataOutputBuffer getKey() throws IOException {
  2609.         return rawKey;
  2610.       }
  2611.       public ValueBytes getValue() throws IOException {
  2612.         return rawValue;
  2613.       }
  2614.       public boolean next() throws IOException {
  2615.         if (size() == 0)
  2616.           return false;
  2617.         if (minSegment != null) {
  2618.           //minSegment is non-null for all invocations of next except the first
  2619.           //one. For the first invocation, the priority queue is ready for use
  2620.           //but for the subsequent invocations, first adjust the queue 
  2621.           adjustPriorityQueue(minSegment);
  2622.           if (size() == 0) {
  2623.             minSegment = null;
  2624.             return false;
  2625.           }
  2626.         }
  2627.         minSegment = (SegmentDescriptor)top();
  2628.         long startPos = minSegment.in.getPosition(); // Current position in stream
  2629.         //save the raw key reference
  2630.         rawKey = minSegment.getKey();
  2631.         //load the raw value. Re-use the existing rawValue buffer
  2632.         if (rawValue == null) {
  2633.           rawValue = minSegment.in.createValueBytes();
  2634.         }
  2635.         minSegment.nextRawValue(rawValue);
  2636.         long endPos = minSegment.in.getPosition(); // End position after reading value
  2637.         updateProgress(endPos - startPos);
  2638.         return true;
  2639.       }
  2640.       
  2641.       public Progress getProgress() {
  2642.         return mergeProgress; 
  2643.       }
  2644.       private void adjustPriorityQueue(SegmentDescriptor ms) throws IOException{
  2645.         long startPos = ms.in.getPosition(); // Current position in stream
  2646.         boolean hasNext = ms.nextRawKey();
  2647.         long endPos = ms.in.getPosition(); // End position after reading key
  2648.         updateProgress(endPos - startPos);
  2649.         if (hasNext) {
  2650.           adjustTop();
  2651.         } else {
  2652.           pop();
  2653.           ms.cleanup();
  2654.         }
  2655.       }
  2656.       private void updateProgress(long bytesProcessed) {
  2657.         totalBytesProcessed += bytesProcessed;
  2658.         if (progPerByte > 0) {
  2659.           mergeProgress.set(totalBytesProcessed * progPerByte);
  2660.         }
  2661.       }
  2662.       
  2663.       /** This is the single level merge that is called multiple times 
  2664.        * depending on the factor size and the number of segments
  2665.        * @return RawKeyValueIterator
  2666.        * @throws IOException
  2667.        */
  2668.       public RawKeyValueIterator merge() throws IOException {
  2669.         //create the MergeStreams from the sorted map created in the constructor
  2670.         //and dump the final output to a file
  2671.         int numSegments = sortedSegmentSizes.size();
  2672.         int origFactor = factor;
  2673.         int passNo = 1;
  2674.         LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
  2675.         do {
  2676.           //get the factor for this pass of merge
  2677.           factor = getPassFactor(passNo, numSegments);
  2678.           List<SegmentDescriptor> segmentsToMerge =
  2679.             new ArrayList<SegmentDescriptor>();
  2680.           int segmentsConsidered = 0;
  2681.           int numSegmentsToConsider = factor;
  2682.           while (true) {
  2683.             //extract the smallest 'factor' number of segment pointers from the 
  2684.             //TreeMap. Call cleanup on the empty segments (no key/value data)
  2685.             SegmentDescriptor[] mStream = 
  2686.               getSegmentDescriptors(numSegmentsToConsider);
  2687.             for (int i = 0; i < mStream.length; i++) {
  2688.               if (mStream[i].nextRawKey()) {
  2689.                 segmentsToMerge.add(mStream[i]);
  2690.                 segmentsConsidered++;
  2691.                 // Count the fact that we read some bytes in calling nextRawKey()
  2692.                 updateProgress(mStream[i].in.getPosition());
  2693.               }
  2694.               else {
  2695.                 mStream[i].cleanup();
  2696.                 numSegments--; //we ignore this segment for the merge
  2697.               }
  2698.             }
  2699.             //if we have the desired number of segments
  2700.             //or looked at all available segments, we break
  2701.             if (segmentsConsidered == factor || 
  2702.                 sortedSegmentSizes.size() == 0) {
  2703.               break;
  2704.             }
  2705.               
  2706.             numSegmentsToConsider = factor - segmentsConsidered;
  2707.           }
  2708.           //feed the streams to the priority queue
  2709.           initialize(segmentsToMerge.size()); clear();
  2710.           for (int i = 0; i < segmentsToMerge.size(); i++) {
  2711.             put(segmentsToMerge.get(i));
  2712.           }
  2713.           //if we have lesser number of segments remaining, then just return the
  2714.           //iterator, else do another single level merge
  2715.           if (numSegments <= factor) {
  2716.             //calculate the length of the remaining segments. Required for 
  2717.             //calculating the merge progress
  2718.             long totalBytes = 0;
  2719.             for (int i = 0; i < segmentsToMerge.size(); i++) {
  2720.               totalBytes += segmentsToMerge.get(i).segmentLength;
  2721.             }
  2722.             if (totalBytes != 0) //being paranoid
  2723.               progPerByte = 1.0f / (float)totalBytes;
  2724.             //reset factor to what it originally was
  2725.             factor = origFactor;
  2726.             return this;
  2727.           } else {
  2728.             //we want to spread the creation of temp files on multiple disks if 
  2729.             //available under the space constraints
  2730.             long approxOutputSize = 0; 
  2731.             for (SegmentDescriptor s : segmentsToMerge) {
  2732.               approxOutputSize += s.segmentLength + 
  2733.                                   ChecksumFileSystem.getApproxChkSumLength(
  2734.                                   s.segmentLength);
  2735.             }
  2736.             Path tmpFilename = 
  2737.               new Path(tmpDir, "intermediate").suffix("." + passNo);
  2738.             Path outputFile =  lDirAlloc.getLocalPathForWrite(
  2739.                                                 tmpFilename.toString(),
  2740.                                                 approxOutputSize, conf);
  2741.             LOG.debug("writing intermediate results to " + outputFile);
  2742.             Writer writer = cloneFileAttributes(
  2743.                                                 fs.makeQualified(segmentsToMerge.get(0).segmentPathName), 
  2744.                                                 fs.makeQualified(outputFile), null);
  2745.             writer.sync = null; //disable sync for temp files
  2746.             writeFile(this, writer);
  2747.             writer.close();
  2748.             
  2749.             //we finished one single level merge; now clean up the priority 
  2750.             //queue
  2751.             this.close();
  2752.             
  2753.             SegmentDescriptor tempSegment = 
  2754.               new SegmentDescriptor(0, fs.getLength(outputFile), outputFile);
  2755.             //put the segment back in the TreeMap
  2756.             sortedSegmentSizes.put(tempSegment, null);
  2757.             numSegments = sortedSegmentSizes.size();
  2758.             passNo++;
  2759.           }
  2760.           //we are worried about only the first pass merge factor. So reset the 
  2761.           //factor to what it originally was
  2762.           factor = origFactor;
  2763.         } while(true);
  2764.       }
  2765.   
  2766.       //Hadoop-591
  2767.       public int getPassFactor(int passNo, int numSegments) {
  2768.         if (passNo > 1 || numSegments <= factor || factor == 1) 
  2769.           return factor;
  2770.         int mod = (numSegments - 1) % (factor - 1);
  2771.         if (mod == 0)
  2772.           return factor;
  2773.         return mod + 1;
  2774.       }
  2775.       
  2776.       /** Return (& remove) the requested number of segment descriptors from the
  2777.        * sorted map.
  2778.        */
  2779.       public SegmentDescriptor[] getSegmentDescriptors(int numDescriptors) {
  2780.         if (numDescriptors > sortedSegmentSizes.size())
  2781.           numDescriptors = sortedSegmentSizes.size();
  2782.         SegmentDescriptor[] SegmentDescriptors = 
  2783.           new SegmentDescriptor[numDescriptors];
  2784.         Iterator iter = sortedSegmentSizes.keySet().iterator();
  2785.         int i = 0;
  2786.         while (i < numDescriptors) {
  2787.           SegmentDescriptors[i++] = (SegmentDescriptor)iter.next();
  2788.           iter.remove();
  2789.         }
  2790.         return SegmentDescriptors;
  2791.       }
  2792.     } // SequenceFile.Sorter.MergeQueue
  2793.     /** This class defines a merge segment. This class can be subclassed to 
  2794.      * provide a customized cleanup method implementation. In this 
  2795.      * implementation, cleanup closes the file handle and deletes the file 
  2796.      */
  2797.     public class SegmentDescriptor implements Comparable {
  2798.       
  2799.       long segmentOffset; //the start of the segment in the file
  2800.       long segmentLength; //the length of the segment
  2801.       Path segmentPathName; //the path name of the file containing the segment
  2802.       boolean ignoreSync = true; //set to true for temp files
  2803.       private Reader in = null; 
  2804.       private DataOutputBuffer rawKey = null; //this will hold the current key
  2805.       private boolean preserveInput = false; //delete input segment files?
  2806.       
  2807.       /** Constructs a segment
  2808.        * @param segmentOffset the offset of the segment in the file
  2809.        * @param segmentLength the length of the segment
  2810.        * @param segmentPathName the path name of the file containing the segment
  2811.        */
  2812.       public SegmentDescriptor (long segmentOffset, long segmentLength, 
  2813.                                 Path segmentPathName) {
  2814.         this.segmentOffset = segmentOffset;
  2815.         this.segmentLength = segmentLength;
  2816.         this.segmentPathName = segmentPathName;
  2817.       }
  2818.       
  2819.       /** Do the sync checks */
  2820.       public void doSync() {ignoreSync = false;}
  2821.       
  2822.       /** Whether to delete the files when no longer needed */
  2823.       public void preserveInput(boolean preserve) {
  2824.         preserveInput = preserve;
  2825.       }
  2826.       public boolean shouldPreserveInput() {
  2827.         return preserveInput;
  2828.       }
  2829.       
  2830.       public int compareTo(Object o) {
  2831.         SegmentDescriptor that = (SegmentDescriptor)o;
  2832.         if (this.segmentLength != that.segmentLength) {
  2833.           return (this.segmentLength < that.segmentLength ? -1 : 1);
  2834.         }
  2835.         if (this.segmentOffset != that.segmentOffset) {
  2836.           return (this.segmentOffset < that.segmentOffset ? -1 : 1);
  2837.         }
  2838.         return (this.segmentPathName.toString()).
  2839.           compareTo(that.segmentPathName.toString());
  2840.       }
  2841.       public boolean equals(Object o) {
  2842.         if (!(o instanceof SegmentDescriptor)) {
  2843.           return false;
  2844.         }
  2845.         SegmentDescriptor that = (SegmentDescriptor)o;
  2846.         if (this.segmentLength == that.segmentLength &&
  2847.             this.segmentOffset == that.segmentOffset &&
  2848.             this.segmentPathName.toString().equals(
  2849.               that.segmentPathName.toString())) {
  2850.           return true;
  2851.         }
  2852.         return false;
  2853.       }
  2854.       public int hashCode() {
  2855.         return 37 * 17 + (int) (segmentOffset^(segmentOffset>>>32));
  2856.       }
  2857.       /** Fills up the rawKey object with the key returned by the Reader
  2858.        * @return true if there is a key returned; false, otherwise
  2859.        * @throws IOException
  2860.        */
  2861.       public boolean nextRawKey() throws IOException {
  2862.         if (in == null) {
  2863.           int bufferSize = conf.getInt("io.file.buffer.size", 4096); 
  2864.           if (fs.getUri().getScheme().startsWith("ramfs")) {
  2865.             bufferSize = conf.getInt("io.bytes.per.checksum", 512);
  2866.           }
  2867.           Reader reader = new Reader(fs, segmentPathName, 
  2868.                                      bufferSize, segmentOffset, 
  2869.                                      segmentLength, conf, false);
  2870.         
  2871.           //sometimes we ignore syncs especially for temp merge files
  2872.           if (ignoreSync) reader.sync = null;
  2873.           if (reader.getKeyClass() != keyClass)
  2874.             throw new IOException("wrong key class: " + reader.getKeyClass() +
  2875.                                   " is not " + keyClass);
  2876.           if (reader.getValueClass() != valClass)
  2877.             throw new IOException("wrong value class: "+reader.getValueClass()+
  2878.                                   " is not " + valClass);
  2879.           this.in = reader;
  2880.           rawKey = new DataOutputBuffer();
  2881.         }
  2882.         rawKey.reset();
  2883.         int keyLength = 
  2884.           in.nextRawKey(rawKey);
  2885.         return (keyLength >= 0);
  2886.       }
  2887.       /** Fills up the passed rawValue with the value corresponding to the key
  2888.        * read earlier
  2889.        * @param rawValue
  2890.        * @return the length of the value
  2891.        * @throws IOException
  2892.        */
  2893.       public int nextRawValue(ValueBytes rawValue) throws IOException {
  2894.         int valLength = in.nextRawValue(rawValue);
  2895.         return valLength;
  2896.       }
  2897.       
  2898.       /** Returns the stored rawKey */
  2899.       public DataOutputBuffer getKey() {
  2900.         return rawKey;
  2901.       }
  2902.       
  2903.       /** closes the underlying reader */
  2904.       private void close() throws IOException {
  2905.         this.in.close();
  2906.         this.in = null;
  2907.       }
  2908.       /** The default cleanup. Subclasses can override this with a custom 
  2909.        * cleanup 
  2910.        */
  2911.       public void cleanup() throws IOException {
  2912.         close();
  2913.         if (!preserveInput) {
  2914.           fs.delete(segmentPathName, true);
  2915.         }
  2916.       }
  2917.     } // SequenceFile.Sorter.SegmentDescriptor
  2918.     
  2919.     /** This class provisions multiple segments contained within a single
  2920.      *  file
  2921.      */
  2922.     private class LinkedSegmentsDescriptor extends SegmentDescriptor {
  2923.       SegmentContainer parentContainer = null;
  2924.       /** Constructs a segment
  2925.        * @param segmentOffset the offset of the segment in the file
  2926.        * @param segmentLength the length of the segment
  2927.        * @param segmentPathName the path name of the file containing the segment
  2928.        * @param parent the parent SegmentContainer that holds the segment
  2929.        */
  2930.       public LinkedSegmentsDescriptor (long segmentOffset, long segmentLength, 
  2931.                                        Path segmentPathName, SegmentContainer parent) {
  2932.         super(segmentOffset, segmentLength, segmentPathName);
  2933.         this.parentContainer = parent;
  2934.       }
  2935.       /** The default cleanup. Subclasses can override this with a custom 
  2936.        * cleanup 
  2937.        */
  2938.       public void cleanup() throws IOException {
  2939.         super.close();
  2940.         if (super.shouldPreserveInput()) return;
  2941.         parentContainer.cleanup();
  2942.       }
  2943.     } //SequenceFile.Sorter.LinkedSegmentsDescriptor
  2944.     /** The class that defines a container for segments to be merged. Primarily
  2945.      * required to delete temp files as soon as all the contained segments
  2946.      * have been looked at */
  2947.     private class SegmentContainer {
  2948.       private int numSegmentsCleanedUp = 0; //track the no. of segment cleanups
  2949.       private int numSegmentsContained; //# of segments contained
  2950.       private Path inName; //input file from where segments are created
  2951.       
  2952.       //the list of segments read from the file
  2953.       private ArrayList <SegmentDescriptor> segments = 
  2954.         new ArrayList <SegmentDescriptor>();
  2955.       /** This constructor is there primarily to serve the sort routine that 
  2956.        * generates a single output file with an associated index file */
  2957.       public SegmentContainer(Path inName, Path indexIn) throws IOException {
  2958.         //get the segments from indexIn
  2959.         FSDataInputStream fsIndexIn = fs.open(indexIn);
  2960.         long end = fs.getLength(indexIn);
  2961.         while (fsIndexIn.getPos() < end) {
  2962.           long segmentOffset = WritableUtils.readVLong(fsIndexIn);
  2963.           long segmentLength = WritableUtils.readVLong(fsIndexIn);
  2964.           Path segmentName = inName;
  2965.           segments.add(new LinkedSegmentsDescriptor(segmentOffset, 
  2966.                                                     segmentLength, segmentName, this));
  2967.         }
  2968.         fsIndexIn.close();
  2969.         fs.delete(indexIn, true);
  2970.         numSegmentsContained = segments.size();
  2971.         this.inName = inName;
  2972.       }
  2973.       public List <SegmentDescriptor> getSegmentList() {
  2974.         return segments;
  2975.       }
  2976.       public void cleanup() throws IOException {
  2977.         numSegmentsCleanedUp++;
  2978.         if (numSegmentsCleanedUp == numSegmentsContained) {
  2979.           fs.delete(inName, true);
  2980.         }
  2981.       }
  2982.     } //SequenceFile.Sorter.SegmentContainer
  2983.   } // SequenceFile.Sorter
  2984. } // SequenceFile