BlockSender.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.BufferedInputStream;
  20. import java.io.DataInputStream;
  21. import java.io.DataOutputStream;
  22. import java.io.FileInputStream;
  23. import java.io.IOException;
  24. import java.io.InputStream;
  25. import java.io.OutputStream;
  26. import java.net.SocketException;
  27. import java.nio.ByteBuffer;
  28. import java.nio.channels.FileChannel;
  29. import java.util.Arrays;
  30. import org.apache.commons.logging.Log;
  31. import org.apache.hadoop.fs.ChecksumException;
  32. import org.apache.hadoop.hdfs.protocol.Block;
  33. import org.apache.hadoop.hdfs.protocol.FSConstants;
  34. import org.apache.hadoop.io.IOUtils;
  35. import org.apache.hadoop.net.SocketOutputStream;
  36. import org.apache.hadoop.util.DataChecksum;
  37. import org.apache.hadoop.util.StringUtils;
  38. /**
  39.  * Reads a block from the disk and sends it to a recipient.
  40.  */
  41. class BlockSender implements java.io.Closeable, FSConstants {
  42.   public static final Log LOG = DataNode.LOG;
  43.   static final Log ClientTraceLog = DataNode.ClientTraceLog;
  44.   
  45.   private Block block; // the block to read from
  46.   private InputStream blockIn; // data stream
  47.   private long blockInPosition = -1; // updated while using transferTo().
  48.   private DataInputStream checksumIn; // checksum datastream
  49.   private DataChecksum checksum; // checksum stream
  50.   private long offset; // starting position to read
  51.   private long endOffset; // ending position
  52.   private long blockLength;
  53.   private int bytesPerChecksum; // chunk size
  54.   private int checksumSize; // checksum size
  55.   private boolean corruptChecksumOk; // if need to verify checksum
  56.   private boolean chunkOffsetOK; // if need to send chunk offset
  57.   private long seqno; // sequence number of packet
  58.   private boolean transferToAllowed = true;
  59.   private boolean blockReadFully; //set when the whole block is read
  60.   private boolean verifyChecksum; //if true, check is verified while reading
  61.   private BlockTransferThrottler throttler;
  62.   private final String clientTraceFmt; // format of client trace log message
  63.   /**
  64.    * Minimum buffer used while sending data to clients. Used only if
  65.    * transferTo() is enabled. 64KB is not that large. It could be larger, but
  66.    * not sure if there will be much more improvement.
  67.    */
  68.   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
  69.   
  70.   BlockSender(Block block, long startOffset, long length,
  71.               boolean corruptChecksumOk, boolean chunkOffsetOK,
  72.               boolean verifyChecksum, DataNode datanode) throws IOException {
  73.     this(block, startOffset, length, corruptChecksumOk, chunkOffsetOK,
  74.          verifyChecksum, datanode, null);
  75.   }
  76.   BlockSender(Block block, long startOffset, long length,
  77.               boolean corruptChecksumOk, boolean chunkOffsetOK,
  78.               boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
  79.       throws IOException {
  80.     try {
  81.       this.block = block;
  82.       this.chunkOffsetOK = chunkOffsetOK;
  83.       this.corruptChecksumOk = corruptChecksumOk;
  84.       this.verifyChecksum = verifyChecksum;
  85.       this.blockLength = datanode.data.getLength(block);
  86.       this.transferToAllowed = datanode.transferToAllowed;
  87.       this.clientTraceFmt = clientTraceFmt;
  88.       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
  89.         checksumIn = new DataInputStream(
  90.                 new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
  91.                                         BUFFER_SIZE));
  92.         // read and handle the common header here. For now just a version
  93.        BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
  94.        short version = header.getVersion();
  95.         if (version != FSDataset.METADATA_VERSION) {
  96.           LOG.warn("Wrong version (" + version + ") for metadata file for "
  97.               + block + " ignoring ...");
  98.         }
  99.         checksum = header.getChecksum();
  100.       } else {
  101.         LOG.warn("Could not find metadata file for " + block);
  102.         // This only decides the buffer size. Use BUFFER_SIZE?
  103.         checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
  104.             16 * 1024);
  105.       }
  106.       /* If bytesPerChecksum is very large, then the metadata file
  107.        * is mostly corrupted. For now just truncate bytesPerchecksum to
  108.        * blockLength.
  109.        */        
  110.       bytesPerChecksum = checksum.getBytesPerChecksum();
  111.       if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
  112.         checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
  113.                                    Math.max((int)blockLength, 10*1024*1024));
  114.         bytesPerChecksum = checksum.getBytesPerChecksum();        
  115.       }
  116.       checksumSize = checksum.getChecksumSize();
  117.       if (length < 0) {
  118.         length = blockLength;
  119.       }
  120.       endOffset = blockLength;
  121.       if (startOffset < 0 || startOffset > endOffset
  122.           || (length + startOffset) > endOffset) {
  123.         String msg = " Offset " + startOffset + " and length " + length
  124.         + " don't match block " + block + " ( blockLen " + endOffset + " )";
  125.         LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
  126.         throw new IOException(msg);
  127.       }
  128.       
  129.       offset = (startOffset - (startOffset % bytesPerChecksum));
  130.       if (length >= 0) {
  131.         // Make sure endOffset points to end of a checksumed chunk.
  132.         long tmpLen = startOffset + length;
  133.         if (tmpLen % bytesPerChecksum != 0) {
  134.           tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
  135.         }
  136.         if (tmpLen < endOffset) {
  137.           endOffset = tmpLen;
  138.         }
  139.       }
  140.       // seek to the right offsets
  141.       if (offset > 0) {
  142.         long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
  143.         // note blockInStream is  seeked when created below
  144.         if (checksumSkip > 0) {
  145.           // Should we use seek() for checksum file as well?
  146.           IOUtils.skipFully(checksumIn, checksumSkip);
  147.         }
  148.       }
  149.       seqno = 0;
  150.       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
  151.     } catch (IOException ioe) {
  152.       IOUtils.closeStream(this);
  153.       IOUtils.closeStream(blockIn);
  154.       throw ioe;
  155.     }
  156.   }
  157.   /**
  158.    * close opened files.
  159.    */
  160.   public void close() throws IOException {
  161.     IOException ioe = null;
  162.     // close checksum file
  163.     if(checksumIn!=null) {
  164.       try {
  165.         checksumIn.close();
  166.       } catch (IOException e) {
  167.         ioe = e;
  168.       }
  169.       checksumIn = null;
  170.     }
  171.     // close data file
  172.     if(blockIn!=null) {
  173.       try {
  174.         blockIn.close();
  175.       } catch (IOException e) {
  176.         ioe = e;
  177.       }
  178.       blockIn = null;
  179.     }
  180.     // throw IOException if there is any
  181.     if(ioe!= null) {
  182.       throw ioe;
  183.     }
  184.   }
  185.   /**
  186.    * Converts an IOExcpetion (not subclasses) to SocketException.
  187.    * This is typically done to indicate to upper layers that the error 
  188.    * was a socket error rather than often more serious exceptions like 
  189.    * disk errors.
  190.    */
  191.   private static IOException ioeToSocketException(IOException ioe) {
  192.     if (ioe.getClass().equals(IOException.class)) {
  193.       // "se" could be a new class in stead of SocketException.
  194.       IOException se = new SocketException("Original Exception : " + ioe);
  195.       se.initCause(ioe);
  196.       /* Change the stacktrace so that original trace is not truncated
  197.        * when printed.*/ 
  198.       se.setStackTrace(ioe.getStackTrace());
  199.       return se;
  200.     }
  201.     // otherwise just return the same exception.
  202.     return ioe;
  203.   }
  204.   /**
  205.    * Sends upto maxChunks chunks of data.
  206.    * 
  207.    * When blockInPosition is >= 0, assumes 'out' is a 
  208.    * {@link SocketOutputStream} and tries 
  209.    * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
  210.    * send data (and updates blockInPosition).
  211.    */
  212.   private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out) 
  213.                          throws IOException {
  214.     // Sends multiple chunks in one packet with a single write().
  215.     int len = Math.min((int) (endOffset - offset),
  216.                        bytesPerChecksum*maxChunks);
  217.     if (len == 0) {
  218.       return 0;
  219.     }
  220.     int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
  221.     int packetLen = len + numChunks*checksumSize + 4;
  222.     pkt.clear();
  223.     
  224.     // write packet header
  225.     pkt.putInt(packetLen);
  226.     pkt.putLong(offset);
  227.     pkt.putLong(seqno);
  228.     pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
  229.                //why no ByteBuf.putBoolean()?
  230.     pkt.putInt(len);
  231.     
  232.     int checksumOff = pkt.position();
  233.     int checksumLen = numChunks * checksumSize;
  234.     byte[] buf = pkt.array();
  235.     
  236.     if (checksumSize > 0 && checksumIn != null) {
  237.       try {
  238.         checksumIn.readFully(buf, checksumOff, checksumLen);
  239.       } catch (IOException e) {
  240.         LOG.warn(" Could not read or failed to veirfy checksum for data" +
  241.                  " at offset " + offset + " for block " + block + " got : "
  242.                  + StringUtils.stringifyException(e));
  243.         IOUtils.closeStream(checksumIn);
  244.         checksumIn = null;
  245.         if (corruptChecksumOk) {
  246.           if (checksumOff < checksumLen) {
  247.             // Just fill the array with zeros.
  248.             Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
  249.           }
  250.         } else {
  251.           throw e;
  252.         }
  253.       }
  254.     }
  255.     
  256.     int dataOff = checksumOff + checksumLen;
  257.     
  258.     if (blockInPosition < 0) {
  259.       //normal transfer
  260.       IOUtils.readFully(blockIn, buf, dataOff, len);
  261.       if (verifyChecksum) {
  262.         int dOff = dataOff;
  263.         int cOff = checksumOff;
  264.         int dLeft = len;
  265.         for (int i=0; i<numChunks; i++) {
  266.           checksum.reset();
  267.           int dLen = Math.min(dLeft, bytesPerChecksum);
  268.           checksum.update(buf, dOff, dLen);
  269.           if (!checksum.compare(buf, cOff)) {
  270.             throw new ChecksumException("Checksum failed at " + 
  271.                                         (offset + len - dLeft), len);
  272.           }
  273.           dLeft -= dLen;
  274.           dOff += dLen;
  275.           cOff += checksumSize;
  276.         }
  277.       }
  278.       //writing is done below (mainly to handle IOException)
  279.     }
  280.     
  281.     try {
  282.       if (blockInPosition >= 0) {
  283.         //use transferTo(). Checks on out and blockIn are already done. 
  284.         SocketOutputStream sockOut = (SocketOutputStream)out;
  285.         //first write the packet
  286.         sockOut.write(buf, 0, dataOff);
  287.         // no need to flush. since we know out is not a buffered stream. 
  288.         sockOut.transferToFully(((FileInputStream)blockIn).getChannel(), 
  289.                                 blockInPosition, len);
  290.         blockInPosition += len;
  291.       } else {
  292.         // normal transfer
  293.         out.write(buf, 0, dataOff + len);
  294.       }
  295.       
  296.     } catch (IOException e) {
  297.       /* exception while writing to the client (well, with transferTo(),
  298.        * it could also be while reading from the local file).
  299.        */
  300.       throw ioeToSocketException(e);
  301.     }
  302.     if (throttler != null) { // rebalancing so throttle
  303.       throttler.throttle(packetLen);
  304.     }
  305.     return len;
  306.   }
  307.   /**
  308.    * sendBlock() is used to read block and its metadata and stream the data to
  309.    * either a client or to another datanode. 
  310.    * 
  311.    * @param out  stream to which the block is written to
  312.    * @param baseStream optional. if non-null, <code>out</code> is assumed to 
  313.    *        be a wrapper over this stream. This enables optimizations for
  314.    *        sending the data, e.g. 
  315.    *        {@link SocketOutputStream#transferToFully(FileChannel, 
  316.    *        long, int)}.
  317.    * @param throttler for sending data.
  318.    * @return total bytes reads, including crc.
  319.    */
  320.   long sendBlock(DataOutputStream out, OutputStream baseStream, 
  321.                  BlockTransferThrottler throttler) throws IOException {
  322.     if( out == null ) {
  323.       throw new IOException( "out stream is null" );
  324.     }
  325.     this.throttler = throttler;
  326.     long initialOffset = offset;
  327.     long totalRead = 0;
  328.     OutputStream streamForSendChunks = out;
  329.     
  330.     try {
  331.       try {
  332.         checksum.writeHeader(out);
  333.         if ( chunkOffsetOK ) {
  334.           out.writeLong( offset );
  335.         }
  336.         out.flush();
  337.       } catch (IOException e) { //socket error
  338.         throw ioeToSocketException(e);
  339.       }
  340.       
  341.       int maxChunksPerPacket;
  342.       int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
  343.       
  344.       if (transferToAllowed && !verifyChecksum && 
  345.           baseStream instanceof SocketOutputStream && 
  346.           blockIn instanceof FileInputStream) {
  347.         
  348.         FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
  349.         
  350.         // blockInPosition also indicates sendChunks() uses transferTo.
  351.         blockInPosition = fileChannel.position();
  352.         streamForSendChunks = baseStream;
  353.         
  354.         // assure a mininum buffer size.
  355.         maxChunksPerPacket = (Math.max(BUFFER_SIZE, 
  356.                                        MIN_BUFFER_WITH_TRANSFERTO)
  357.                               + bytesPerChecksum - 1)/bytesPerChecksum;
  358.         
  359.         // allocate smaller buffer while using transferTo(). 
  360.         pktSize += checksumSize * maxChunksPerPacket;
  361.       } else {
  362.         maxChunksPerPacket = Math.max(1,
  363.                  (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
  364.         pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
  365.       }
  366.       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
  367.       while (endOffset > offset) {
  368.         long len = sendChunks(pktBuf, maxChunksPerPacket, 
  369.                               streamForSendChunks);
  370.         offset += len;
  371.         totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
  372.                             checksumSize);
  373.         seqno++;
  374.       }
  375.       try {
  376.         out.writeInt(0); // mark the end of block        
  377.         out.flush();
  378.       } catch (IOException e) { //socket error
  379.         throw ioeToSocketException(e);
  380.       }
  381.     } finally {
  382.       if (clientTraceFmt != null) {
  383.         ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
  384.       }
  385.       close();
  386.     }
  387.     blockReadFully = (initialOffset == 0 && offset >= blockLength);
  388.     return totalRead;
  389.   }
  390.   
  391.   boolean isBlockReadFully() {
  392.     return blockReadFully;
  393.   }
  394. }