IFile.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:18k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.EOFException;
  20. import java.io.File;
  21. import java.io.FileOutputStream;
  22. import java.io.IOException;
  23. import java.io.InputStream;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FSDataInputStream;
  26. import org.apache.hadoop.fs.FSDataOutputStream;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.io.DataInputBuffer;
  30. import org.apache.hadoop.io.DataOutputBuffer;
  31. import org.apache.hadoop.io.WritableUtils;
  32. import org.apache.hadoop.io.compress.CodecPool;
  33. import org.apache.hadoop.io.compress.CompressionCodec;
  34. import org.apache.hadoop.io.compress.CompressionOutputStream;
  35. import org.apache.hadoop.io.compress.Compressor;
  36. import org.apache.hadoop.io.compress.Decompressor;
  37. import org.apache.hadoop.io.serializer.SerializationFactory;
  38. import org.apache.hadoop.io.serializer.Serializer;
  39. /**
  40.  * <code>IFile</code> is the simple <key-len, value-len, key, value> format
  41.  * for the intermediate map-outputs in Map-Reduce.
  42.  * 
  43.  * There is a <code>Writer</code> to write out map-outputs in this format and 
  44.  * a <code>Reader</code> to read files of this format.
  45.  */
  46. class IFile {
  47.   private static final int EOF_MARKER = -1;
  48.   
  49.   /**
  50.    * <code>IFile.Writer</code> to write out intermediate map-outputs. 
  51.    */
  52.   public static class Writer<K extends Object, V extends Object> {
  53.     FSDataOutputStream out;
  54.     boolean ownOutputStream = false;
  55.     long start = 0;
  56.     FSDataOutputStream rawOut;
  57.     
  58.     CompressionOutputStream compressedOut;
  59.     Compressor compressor;
  60.     boolean compressOutput = false;
  61.     
  62.     long decompressedBytesWritten = 0;
  63.     long compressedBytesWritten = 0;
  64.     // Count records written to disk
  65.     private long numRecordsWritten = 0;
  66.     private final Counters.Counter writtenRecordsCounter;
  67.     IFileOutputStream checksumOut;
  68.     Class<K> keyClass;
  69.     Class<V> valueClass;
  70.     Serializer<K> keySerializer;
  71.     Serializer<V> valueSerializer;
  72.     
  73.     DataOutputBuffer buffer = new DataOutputBuffer();
  74.     public Writer(Configuration conf, FileSystem fs, Path file, 
  75.                   Class<K> keyClass, Class<V> valueClass,
  76.                   CompressionCodec codec,
  77.                   Counters.Counter writesCounter) throws IOException {
  78.       this(conf, fs.create(file), keyClass, valueClass, codec,
  79.            writesCounter);
  80.       ownOutputStream = true;
  81.     }
  82.     
  83.     public Writer(Configuration conf, FSDataOutputStream out, 
  84.         Class<K> keyClass, Class<V> valueClass,
  85.         CompressionCodec codec, Counters.Counter writesCounter)
  86.         throws IOException {
  87.       this.writtenRecordsCounter = writesCounter;
  88.       this.checksumOut = new IFileOutputStream(out);
  89.       this.rawOut = out;
  90.       this.start = this.rawOut.getPos();
  91.       
  92.       if (codec != null) {
  93.         this.compressor = CodecPool.getCompressor(codec);
  94.         this.compressor.reset();
  95.         this.compressedOut = codec.createOutputStream(checksumOut, compressor);
  96.         this.out = new FSDataOutputStream(this.compressedOut,  null);
  97.         this.compressOutput = true;
  98.       } else {
  99.         this.out = new FSDataOutputStream(checksumOut,null);
  100.       }
  101.       
  102.       this.keyClass = keyClass;
  103.       this.valueClass = valueClass;
  104.       SerializationFactory serializationFactory = new SerializationFactory(conf);
  105.       this.keySerializer = serializationFactory.getSerializer(keyClass);
  106.       this.keySerializer.open(buffer);
  107.       this.valueSerializer = serializationFactory.getSerializer(valueClass);
  108.       this.valueSerializer.open(buffer);
  109.     }
  110.     public void close() throws IOException {
  111.       // Close the serializers
  112.       keySerializer.close();
  113.       valueSerializer.close();
  114.       // Write EOF_MARKER for key/value length
  115.       WritableUtils.writeVInt(out, EOF_MARKER);
  116.       WritableUtils.writeVInt(out, EOF_MARKER);
  117.       decompressedBytesWritten += 2 * WritableUtils.getVIntSize(EOF_MARKER);
  118.       
  119.       //Flush the stream
  120.       out.flush();
  121.   
  122.       if (compressOutput) {
  123.         // Flush
  124.         compressedOut.finish();
  125.         compressedOut.resetState();
  126.       }
  127.       
  128.       // Close the underlying stream iff we own it...
  129.       if (ownOutputStream) {
  130.         out.close();
  131.       }
  132.       else {
  133.         // Write the checksum
  134.         checksumOut.finish();
  135.       }
  136.       compressedBytesWritten = rawOut.getPos() - start;
  137.       if (compressOutput) {
  138.         // Return back the compressor
  139.         CodecPool.returnCompressor(compressor);
  140.         compressor = null;
  141.       }
  142.       out = null;
  143.       if(writtenRecordsCounter != null) {
  144.         writtenRecordsCounter.increment(numRecordsWritten);
  145.       }
  146.     }
  147.     public void append(K key, V value) throws IOException {
  148.       if (key.getClass() != keyClass)
  149.         throw new IOException("wrong key class: "+ key.getClass()
  150.                               +" is not "+ keyClass);
  151.       if (value.getClass() != valueClass)
  152.         throw new IOException("wrong value class: "+ value.getClass()
  153.                               +" is not "+ valueClass);
  154.       // Append the 'key'
  155.       keySerializer.serialize(key);
  156.       int keyLength = buffer.getLength();
  157.       if (keyLength < 0) {
  158.         throw new IOException("Negative key-length not allowed: " + keyLength + 
  159.                               " for " + key);
  160.       }
  161.       // Append the 'value'
  162.       valueSerializer.serialize(value);
  163.       int valueLength = buffer.getLength() - keyLength;
  164.       if (valueLength < 0) {
  165.         throw new IOException("Negative value-length not allowed: " + 
  166.                               valueLength + " for " + value);
  167.       }
  168.       
  169.       // Write the record out
  170.       WritableUtils.writeVInt(out, keyLength);                  // key length
  171.       WritableUtils.writeVInt(out, valueLength);                // value length
  172.       out.write(buffer.getData(), 0, buffer.getLength());       // data
  173.       // Reset
  174.       buffer.reset();
  175.       
  176.       // Update bytes written
  177.       decompressedBytesWritten += keyLength + valueLength + 
  178.                                   WritableUtils.getVIntSize(keyLength) + 
  179.                                   WritableUtils.getVIntSize(valueLength);
  180.       ++numRecordsWritten;
  181.     }
  182.     
  183.     public void append(DataInputBuffer key, DataInputBuffer value)
  184.     throws IOException {
  185.       int keyLength = key.getLength() - key.getPosition();
  186.       if (keyLength < 0) {
  187.         throw new IOException("Negative key-length not allowed: " + keyLength + 
  188.                               " for " + key);
  189.       }
  190.       
  191.       int valueLength = value.getLength() - value.getPosition();
  192.       if (valueLength < 0) {
  193.         throw new IOException("Negative value-length not allowed: " + 
  194.                               valueLength + " for " + value);
  195.       }
  196.       WritableUtils.writeVInt(out, keyLength);
  197.       WritableUtils.writeVInt(out, valueLength);
  198.       out.write(key.getData(), key.getPosition(), keyLength); 
  199.       out.write(value.getData(), value.getPosition(), valueLength); 
  200.       // Update bytes written
  201.       decompressedBytesWritten += keyLength + valueLength + 
  202.                       WritableUtils.getVIntSize(keyLength) + 
  203.                       WritableUtils.getVIntSize(valueLength);
  204.       ++numRecordsWritten;
  205.     }
  206.     
  207.     public long getRawLength() {
  208.       return decompressedBytesWritten;
  209.     }
  210.     
  211.     public long getCompressedLength() {
  212.       return compressedBytesWritten;
  213.     }
  214.   }
  215.   /**
  216.    * <code>IFile.Reader</code> to read intermediate map-outputs. 
  217.    */
  218.   public static class Reader<K extends Object, V extends Object> {
  219.     private static final int DEFAULT_BUFFER_SIZE = 128*1024;
  220.     private static final int MAX_VINT_SIZE = 9;
  221.     // Count records read from disk
  222.     private long numRecordsRead = 0;
  223.     private final Counters.Counter readRecordsCounter;
  224.     final InputStream in;        // Possibly decompressed stream that we read
  225.     Decompressor decompressor;
  226.     long bytesRead = 0;
  227.     final long fileLength;
  228.     boolean eof = false;
  229.     final IFileInputStream checksumIn;
  230.     
  231.     byte[] buffer = null;
  232.     int bufferSize = DEFAULT_BUFFER_SIZE;
  233.     DataInputBuffer dataIn = new DataInputBuffer();
  234.     int recNo = 1;
  235.     
  236.     /**
  237.      * Construct an IFile Reader.
  238.      * 
  239.      * @param conf Configuration File 
  240.      * @param fs  FileSystem
  241.      * @param file Path of the file to be opened. This file should have
  242.      *             checksum bytes for the data at the end of the file.
  243.      * @param codec codec
  244.      * @param readsCounter Counter for records read from disk
  245.      * @throws IOException
  246.      */
  247.     public Reader(Configuration conf, FileSystem fs, Path file,
  248.                   CompressionCodec codec,
  249.                   Counters.Counter readsCounter) throws IOException {
  250.       this(conf, fs.open(file), 
  251.            fs.getFileStatus(file).getLen(),
  252.            codec, readsCounter);
  253.     }
  254.     /**
  255.      * Construct an IFile Reader.
  256.      * 
  257.      * @param conf Configuration File 
  258.      * @param in   The input stream
  259.      * @param length Length of the data in the stream, including the checksum
  260.      *               bytes.
  261.      * @param codec codec
  262.      * @param readsCounter Counter for records read from disk
  263.      * @throws IOException
  264.      */
  265.     public Reader(Configuration conf, FSDataInputStream in, long length, 
  266.                   CompressionCodec codec,
  267.                   Counters.Counter readsCounter) throws IOException {
  268.       readRecordsCounter = readsCounter;
  269.       checksumIn = new IFileInputStream(in,length);
  270.       if (codec != null) {
  271.         decompressor = CodecPool.getDecompressor(codec);
  272.         this.in = codec.createInputStream(checksumIn, decompressor);
  273.       } else {
  274.         this.in = checksumIn;
  275.       }
  276.       this.fileLength = length;
  277.       
  278.       if (conf != null) {
  279.         bufferSize = conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE);
  280.       }
  281.     }
  282.     
  283.     public long getLength() { 
  284.       return fileLength - checksumIn.getSize();
  285.     }
  286.     
  287.     public long getPosition() throws IOException {    
  288.       return checksumIn.getPosition(); 
  289.     }
  290.     
  291.     /**
  292.      * Read upto len bytes into buf starting at offset off.
  293.      * 
  294.      * @param buf buffer 
  295.      * @param off offset
  296.      * @param len length of buffer
  297.      * @return the no. of bytes read
  298.      * @throws IOException
  299.      */
  300.     private int readData(byte[] buf, int off, int len) throws IOException {
  301.       int bytesRead = 0;
  302.       while (bytesRead < len) {
  303.         int n = in.read(buf, off+bytesRead, len-bytesRead);
  304.         if (n < 0) {
  305.           return bytesRead;
  306.         }
  307.         bytesRead += n;
  308.       }
  309.       return len;
  310.     }
  311.     
  312.     void readNextBlock(int minSize) throws IOException {
  313.       if (buffer == null) {
  314.         buffer = new byte[bufferSize];
  315.         dataIn.reset(buffer, 0, 0);
  316.       }
  317.       buffer = 
  318.         rejigData(buffer, 
  319.                   (bufferSize < minSize) ? new byte[minSize << 1] : buffer);
  320.       bufferSize = buffer.length;
  321.     }
  322.     
  323.     private byte[] rejigData(byte[] source, byte[] destination) 
  324.     throws IOException{
  325.       // Copy remaining data into the destination array
  326.       int bytesRemaining = dataIn.getLength()-dataIn.getPosition();
  327.       if (bytesRemaining > 0) {
  328.         System.arraycopy(source, dataIn.getPosition(), 
  329.             destination, 0, bytesRemaining);
  330.       }
  331.       
  332.       // Read as much data as will fit from the underlying stream 
  333.       int n = readData(destination, bytesRemaining, 
  334.                        (destination.length - bytesRemaining));
  335.       dataIn.reset(destination, 0, (bytesRemaining + n));
  336.       
  337.       return destination;
  338.     }
  339.     
  340.     public boolean next(DataInputBuffer key, DataInputBuffer value) 
  341.     throws IOException {
  342.       // Sanity check
  343.       if (eof) {
  344.         throw new EOFException("Completed reading " + bytesRead);
  345.       }
  346.       
  347.       // Check if we have enough data to read lengths
  348.       if ((dataIn.getLength() - dataIn.getPosition()) < 2*MAX_VINT_SIZE) {
  349.         readNextBlock(2*MAX_VINT_SIZE);
  350.       }
  351.       
  352.       // Read key and value lengths
  353.       int oldPos = dataIn.getPosition();
  354.       int keyLength = WritableUtils.readVInt(dataIn);
  355.       int valueLength = WritableUtils.readVInt(dataIn);
  356.       int pos = dataIn.getPosition();
  357.       bytesRead += pos - oldPos;
  358.       
  359.       // Check for EOF
  360.       if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
  361.         eof = true;
  362.         return false;
  363.       }
  364.       
  365.       // Sanity check
  366.       if (keyLength < 0) {
  367.         throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
  368.                               keyLength);
  369.       }
  370.       if (valueLength < 0) {
  371.         throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
  372.                               valueLength);
  373.       }
  374.       
  375.       final int recordLength = keyLength + valueLength;
  376.       
  377.       // Check if we have the raw key/value in the buffer
  378.       if ((dataIn.getLength()-pos) < recordLength) {
  379.         readNextBlock(recordLength);
  380.         
  381.         // Sanity check
  382.         if ((dataIn.getLength() - dataIn.getPosition()) < recordLength) {
  383.           throw new EOFException("Rec# " + recNo + ": Could read the next " +
  384.                               " record");
  385.         }
  386.       }
  387.       // Setup the key and value
  388.       pos = dataIn.getPosition();
  389.       byte[] data = dataIn.getData();
  390.       key.reset(data, pos, keyLength);
  391.       value.reset(data, (pos + keyLength), valueLength);
  392.       
  393.       // Position for the next record
  394.       long skipped = dataIn.skip(recordLength);
  395.       if (skipped != recordLength) {
  396.         throw new IOException("Rec# " + recNo + ": Failed to skip past record " +
  397.                            "of length: " + recordLength);
  398.       }
  399.       
  400.       // Record the bytes read
  401.       bytesRead += recordLength;
  402.       ++recNo;
  403.       ++numRecordsRead;
  404.       return true;
  405.     }
  406.     public void close() throws IOException {
  407.       // Return the decompressor
  408.       if (decompressor != null) {
  409.         decompressor.reset();
  410.         CodecPool.returnDecompressor(decompressor);
  411.         decompressor = null;
  412.       }
  413.       
  414.       // Close the underlying stream
  415.       in.close();
  416.       
  417.       // Release the buffer
  418.       dataIn = null;
  419.       buffer = null;
  420.       if(readRecordsCounter != null) {
  421.         readRecordsCounter.increment(numRecordsRead);
  422.       }
  423.     }
  424.   }    
  425.   
  426.   /**
  427.    * <code>IFile.InMemoryReader</code> to read map-outputs present in-memory.
  428.    */
  429.   public static class InMemoryReader<K, V> extends Reader<K, V> {
  430.     RamManager ramManager;
  431.     TaskAttemptID taskAttemptId;
  432.     
  433.     public InMemoryReader(RamManager ramManager, TaskAttemptID taskAttemptId,
  434.                           byte[] data, int start, int length)
  435.                           throws IOException {
  436.       super(null, null, length - start, null, null);
  437.       this.ramManager = ramManager;
  438.       this.taskAttemptId = taskAttemptId;
  439.       
  440.       buffer = data;
  441.       bufferSize = (int)fileLength;
  442.       dataIn.reset(buffer, start, length);
  443.     }
  444.     
  445.     @Override
  446.     public long getPosition() throws IOException {
  447.       // InMemoryReader does not initialize streams like Reader, so in.getPos()
  448.       // would not work. Instead, return the number of uncompressed bytes read,
  449.       // which will be correct since in-memory data is not compressed.
  450.       return bytesRead;
  451.     }
  452.     
  453.     @Override
  454.     public long getLength() { 
  455.       return fileLength;
  456.     }
  457.     
  458.     private void dumpOnError() {
  459.       File dumpFile = new File("../output/" + taskAttemptId + ".dump");
  460.       System.err.println("Dumping corrupt map-output of " + taskAttemptId + 
  461.                          " to " + dumpFile.getAbsolutePath());
  462.       try {
  463.         FileOutputStream fos = new FileOutputStream(dumpFile);
  464.         fos.write(buffer, 0, bufferSize);
  465.         fos.close();
  466.       } catch (IOException ioe) {
  467.         System.err.println("Failed to dump map-output of " + taskAttemptId);
  468.       }
  469.     }
  470.     
  471.     public boolean next(DataInputBuffer key, DataInputBuffer value) 
  472.     throws IOException {
  473.       try {
  474.       // Sanity check
  475.       if (eof) {
  476.         throw new EOFException("Completed reading " + bytesRead);
  477.       }
  478.       
  479.       // Read key and value lengths
  480.       int oldPos = dataIn.getPosition();
  481.       int keyLength = WritableUtils.readVInt(dataIn);
  482.       int valueLength = WritableUtils.readVInt(dataIn);
  483.       int pos = dataIn.getPosition();
  484.       bytesRead += pos - oldPos;
  485.       
  486.       // Check for EOF
  487.       if (keyLength == EOF_MARKER && valueLength == EOF_MARKER) {
  488.         eof = true;
  489.         return false;
  490.       }
  491.       
  492.       // Sanity check
  493.       if (keyLength < 0) {
  494.         throw new IOException("Rec# " + recNo + ": Negative key-length: " + 
  495.                               keyLength);
  496.       }
  497.       if (valueLength < 0) {
  498.         throw new IOException("Rec# " + recNo + ": Negative value-length: " + 
  499.                               valueLength);
  500.       }
  501.       final int recordLength = keyLength + valueLength;
  502.       
  503.       // Setup the key and value
  504.       pos = dataIn.getPosition();
  505.       byte[] data = dataIn.getData();
  506.       key.reset(data, pos, keyLength);
  507.       value.reset(data, (pos + keyLength), valueLength);
  508.       
  509.       // Position for the next record
  510.       long skipped = dataIn.skip(recordLength);
  511.       if (skipped != recordLength) {
  512.         throw new IOException("Rec# " + recNo + ": Failed to skip past record of length: " + 
  513.                               recordLength);
  514.       }
  515.       
  516.       // Record the byte
  517.       bytesRead += recordLength;
  518.       ++recNo;
  519.       
  520.       return true;
  521.       } catch (IOException ioe) {
  522.         dumpOnError();
  523.         throw ioe;
  524.       }
  525.     }
  526.       
  527.     public void close() {
  528.       // Release
  529.       dataIn = null;
  530.       buffer = null;
  531.       
  532.       // Inform the RamManager
  533.       ramManager.unreserve(bufferSize);
  534.     }
  535.   }
  536. }