BlockReceiver.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:35k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.BufferedOutputStream;
  20. import java.io.DataInputStream;
  21. import java.io.DataOutputStream;
  22. import java.io.EOFException;
  23. import java.io.IOException;
  24. import java.io.OutputStream;
  25. import java.nio.ByteBuffer;
  26. import java.util.LinkedList;
  27. import java.util.zip.CRC32;
  28. import java.util.zip.Checksum;
  29. import org.apache.commons.logging.Log;
  30. import org.apache.hadoop.fs.FSInputChecker;
  31. import org.apache.hadoop.fs.FSOutputSummer;
  32. import org.apache.hadoop.hdfs.protocol.Block;
  33. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  34. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  35. import org.apache.hadoop.hdfs.protocol.FSConstants;
  36. import org.apache.hadoop.hdfs.protocol.LocatedBlock;
  37. import org.apache.hadoop.io.IOUtils;
  38. import org.apache.hadoop.util.Daemon;
  39. import org.apache.hadoop.util.DataChecksum;
  40. import org.apache.hadoop.util.StringUtils;
  41. import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
  42. /** A class that receives a block and writes to its own disk, meanwhile
  43.  * may copies it to another site. If a throttler is provided,
  44.  * streaming throttling is also supported.
  45.  **/
  46. class BlockReceiver implements java.io.Closeable, FSConstants {
  47.   public static final Log LOG = DataNode.LOG;
  48.   static final Log ClientTraceLog = DataNode.ClientTraceLog;
  49.   
  50.   private Block block; // the block to receive
  51.   protected boolean finalized;
  52.   private DataInputStream in = null; // from where data are read
  53.   private DataChecksum checksum; // from where chunks of a block can be read
  54.   private OutputStream out = null; // to block file at local disk
  55.   private DataOutputStream checksumOut = null; // to crc file at local disk
  56.   private int bytesPerChecksum;
  57.   private int checksumSize;
  58.   private ByteBuffer buf; // contains one full packet.
  59.   private int bufRead; //amount of valid data in the buf
  60.   private int maxPacketReadLen;
  61.   protected long offsetInBlock;
  62.   protected final String inAddr;
  63.   protected final String myAddr;
  64.   private String mirrorAddr;
  65.   private DataOutputStream mirrorOut;
  66.   private Daemon responder = null;
  67.   private BlockTransferThrottler throttler;
  68.   private FSDataset.BlockWriteStreams streams;
  69.   private boolean isRecovery = false;
  70.   private String clientName;
  71.   DatanodeInfo srcDataNode = null;
  72.   private Checksum partialCrc = null;
  73.   private DataNode datanode = null;
  74.   BlockReceiver(Block block, DataInputStream in, String inAddr,
  75.                 String myAddr, boolean isRecovery, String clientName, 
  76.                 DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
  77.     try{
  78.       this.block = block;
  79.       this.in = in;
  80.       this.inAddr = inAddr;
  81.       this.myAddr = myAddr;
  82.       this.isRecovery = isRecovery;
  83.       this.clientName = clientName;
  84.       this.offsetInBlock = 0;
  85.       this.srcDataNode = srcDataNode;
  86.       this.datanode = datanode;
  87.       this.checksum = DataChecksum.newDataChecksum(in);
  88.       this.bytesPerChecksum = checksum.getBytesPerChecksum();
  89.       this.checksumSize = checksum.getChecksumSize();
  90.       //
  91.       // Open local disk out
  92.       //
  93.       streams = datanode.data.writeToBlock(block, isRecovery);
  94.       this.finalized = datanode.data.isValidBlock(block);
  95.       if (streams != null) {
  96.         this.out = streams.dataOut;
  97.         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
  98.                                                   streams.checksumOut, 
  99.                                                   SMALL_BUFFER_SIZE));
  100.         // If this block is for appends, then remove it from periodic
  101.         // validation.
  102.         if (datanode.blockScanner != null && isRecovery) {
  103.           datanode.blockScanner.deleteBlock(block);
  104.         }
  105.       }
  106.     } catch (BlockAlreadyExistsException bae) {
  107.       throw bae;
  108.     } catch(IOException ioe) {
  109.       IOUtils.closeStream(this);
  110.       cleanupBlock();
  111.       
  112.       // check if there is a disk error
  113.       IOException cause = FSDataset.getCauseIfDiskError(ioe);
  114.       if (cause != null) { // possible disk error
  115.         ioe = cause;
  116.         datanode.checkDiskError(ioe); // may throw an exception here
  117.       }
  118.       
  119.       throw ioe;
  120.     }
  121.   }
  122.   /**
  123.    * close files.
  124.    */
  125.   public void close() throws IOException {
  126.     IOException ioe = null;
  127.     // close checksum file
  128.     try {
  129.       if (checksumOut != null) {
  130.         checksumOut.flush();
  131.         checksumOut.close();
  132.         checksumOut = null;
  133.       }
  134.     } catch(IOException e) {
  135.       ioe = e;
  136.     }
  137.     // close block file
  138.     try {
  139.       if (out != null) {
  140.         out.flush();
  141.         out.close();
  142.         out = null;
  143.       }
  144.     } catch (IOException e) {
  145.       ioe = e;
  146.     }
  147.     // disk check
  148.     if(ioe != null) {
  149.       datanode.checkDiskError(ioe);
  150.       throw ioe;
  151.     }
  152.   }
  153.   /**
  154.    * Flush block data and metadata files to disk.
  155.    * @throws IOException
  156.    */
  157.   void flush() throws IOException {
  158.     if (checksumOut != null) {
  159.       checksumOut.flush();
  160.     }
  161.     if (out != null) {
  162.       out.flush();
  163.     }
  164.   }
  165.   /**
  166.    * While writing to mirrorOut, failure to write to mirror should not
  167.    * affect this datanode unless a client is writing the block.
  168.    */
  169.   private void handleMirrorOutError(IOException ioe) throws IOException {
  170.     LOG.info(datanode.dnRegistration + ":Exception writing block " +
  171.              block + " to mirror " + mirrorAddr + "n" +
  172.              StringUtils.stringifyException(ioe));
  173.     mirrorOut = null;
  174.     //
  175.     // If stream-copy fails, continue
  176.     // writing to disk for replication requests. For client
  177.     // writes, return error so that the client can do error
  178.     // recovery.
  179.     //
  180.     if (clientName.length() > 0) {
  181.       throw ioe;
  182.     }
  183.   }
  184.   
  185.   /**
  186.    * Verify multiple CRC chunks. 
  187.    */
  188.   private void verifyChunks( byte[] dataBuf, int dataOff, int len, 
  189.                              byte[] checksumBuf, int checksumOff ) 
  190.                              throws IOException {
  191.     while (len > 0) {
  192.       int chunkLen = Math.min(len, bytesPerChecksum);
  193.       
  194.       checksum.update(dataBuf, dataOff, chunkLen);
  195.       if (!checksum.compare(checksumBuf, checksumOff)) {
  196.         if (srcDataNode != null) {
  197.           try {
  198.             LOG.info("report corrupt block " + block + " from datanode " +
  199.                       srcDataNode + " to namenode");
  200.             LocatedBlock lb = new LocatedBlock(block, 
  201.                                             new DatanodeInfo[] {srcDataNode});
  202.             datanode.namenode.reportBadBlocks(new LocatedBlock[] {lb});
  203.           } catch (IOException e) {
  204.             LOG.warn("Failed to report bad block " + block + 
  205.                       " from datanode " + srcDataNode + " to namenode");
  206.           }
  207.         }
  208.         throw new IOException("Unexpected checksum mismatch " + 
  209.                               "while writing " + block + " from " + inAddr);
  210.       }
  211.       checksum.reset();
  212.       dataOff += chunkLen;
  213.       checksumOff += checksumSize;
  214.       len -= chunkLen;
  215.     }
  216.   }
  217.   /**
  218.    * Makes sure buf.position() is zero without modifying buf.remaining().
  219.    * It moves the data if position needs to be changed.
  220.    */
  221.   private void shiftBufData() {
  222.     if (bufRead != buf.limit()) {
  223.       throw new IllegalStateException("bufRead should be same as " +
  224.                                       "buf.limit()");
  225.     }
  226.     
  227.     //shift the remaining data on buf to the front
  228.     if (buf.position() > 0) {
  229.       int dataLeft = buf.remaining();
  230.       if (dataLeft > 0) {
  231.         byte[] b = buf.array();
  232.         System.arraycopy(b, buf.position(), b, 0, dataLeft);
  233.       }
  234.       buf.position(0);
  235.       bufRead = dataLeft;
  236.       buf.limit(bufRead);
  237.     }
  238.   }
  239.   
  240.   /**
  241.    * reads upto toRead byte to buf at buf.limit() and increments the limit.
  242.    * throws an IOException if read does not succeed.
  243.    */
  244.   private int readToBuf(int toRead) throws IOException {
  245.     if (toRead < 0) {
  246.       toRead = (maxPacketReadLen > 0 ? maxPacketReadLen : buf.capacity())
  247.                - buf.limit();
  248.     }
  249.     
  250.     int nRead = in.read(buf.array(), buf.limit(), toRead);
  251.     
  252.     if (nRead < 0) {
  253.       throw new EOFException("while trying to read " + toRead + " bytes");
  254.     }
  255.     bufRead = buf.limit() + nRead;
  256.     buf.limit(bufRead);
  257.     return nRead;
  258.   }
  259.   
  260.   
  261.   /**
  262.    * Reads (at least) one packet and returns the packet length.
  263.    * buf.position() points to the start of the packet and 
  264.    * buf.limit() point to the end of the packet. There could 
  265.    * be more data from next packet in buf.<br><br>
  266.    * 
  267.    * It tries to read a full packet with single read call.
  268.    * Consecutive packets are usually of the same length.
  269.    */
  270.   private int readNextPacket() throws IOException {
  271.     /* This dances around buf a little bit, mainly to read 
  272.      * full packet with single read and to accept arbitarary size  
  273.      * for next packet at the same time.
  274.      */
  275.     if (buf == null) {
  276.       /* initialize buffer to the best guess size:
  277.        * 'chunksPerPacket' calculation here should match the same 
  278.        * calculation in DFSClient to make the guess accurate.
  279.        */
  280.       int chunkSize = bytesPerChecksum + checksumSize;
  281.       int chunksPerPacket = (datanode.writePacketSize - DataNode.PKT_HEADER_LEN - 
  282.                              SIZE_OF_INTEGER + chunkSize - 1)/chunkSize;
  283.       buf = ByteBuffer.allocate(DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
  284.                                 Math.max(chunksPerPacket, 1) * chunkSize);
  285.       buf.limit(0);
  286.     }
  287.     
  288.     // See if there is data left in the buffer :
  289.     if (bufRead > buf.limit()) {
  290.       buf.limit(bufRead);
  291.     }
  292.     
  293.     while (buf.remaining() < SIZE_OF_INTEGER) {
  294.       if (buf.position() > 0) {
  295.         shiftBufData();
  296.       }
  297.       readToBuf(-1);
  298.     }
  299.     
  300.     /* We mostly have the full packet or at least enough for an int
  301.      */
  302.     buf.mark();
  303.     int payloadLen = buf.getInt();
  304.     buf.reset();
  305.     
  306.     if (payloadLen == 0) {
  307.       //end of stream!
  308.       buf.limit(buf.position() + SIZE_OF_INTEGER);
  309.       return 0;
  310.     }
  311.     
  312.     // check corrupt values for pktLen, 100MB upper limit should be ok?
  313.     if (payloadLen < 0 || payloadLen > (100*1024*1024)) {
  314.       throw new IOException("Incorrect value for packet payload : " +
  315.                             payloadLen);
  316.     }
  317.     
  318.     int pktSize = payloadLen + DataNode.PKT_HEADER_LEN;
  319.     
  320.     if (buf.remaining() < pktSize) {
  321.       //we need to read more data
  322.       int toRead = pktSize - buf.remaining();
  323.       
  324.       // first make sure buf has enough space.        
  325.       int spaceLeft = buf.capacity() - buf.limit();
  326.       if (toRead > spaceLeft && buf.position() > 0) {
  327.         shiftBufData();
  328.         spaceLeft = buf.capacity() - buf.limit();
  329.       }
  330.       if (toRead > spaceLeft) {
  331.         byte oldBuf[] = buf.array();
  332.         int toCopy = buf.limit();
  333.         buf = ByteBuffer.allocate(toCopy + toRead);
  334.         System.arraycopy(oldBuf, 0, buf.array(), 0, toCopy);
  335.         buf.limit(toCopy);
  336.       }
  337.       
  338.       //now read:
  339.       while (toRead > 0) {
  340.         toRead -= readToBuf(toRead);
  341.       }
  342.     }
  343.     
  344.     if (buf.remaining() > pktSize) {
  345.       buf.limit(buf.position() + pktSize);
  346.     }
  347.     
  348.     if (pktSize > maxPacketReadLen) {
  349.       maxPacketReadLen = pktSize;
  350.     }
  351.     
  352.     return payloadLen;
  353.   }
  354.   
  355.   /** 
  356.    * Receives and processes a packet. It can contain many chunks.
  357.    * returns size of the packet.
  358.    */
  359.   private int receivePacket() throws IOException {
  360.     
  361.     int payloadLen = readNextPacket();
  362.     
  363.     if (payloadLen <= 0) {
  364.       return payloadLen;
  365.     }
  366.     
  367.     buf.mark();
  368.     //read the header
  369.     buf.getInt(); // packet length
  370.     offsetInBlock = buf.getLong(); // get offset of packet in block
  371.     long seqno = buf.getLong();    // get seqno
  372.     boolean lastPacketInBlock = (buf.get() != 0);
  373.     
  374.     int endOfHeader = buf.position();
  375.     buf.reset();
  376.     
  377.     if (LOG.isDebugEnabled()){
  378.       LOG.debug("Receiving one packet for block " + block +
  379.                 " of length " + payloadLen +
  380.                 " seqno " + seqno +
  381.                 " offsetInBlock " + offsetInBlock +
  382.                 " lastPacketInBlock " + lastPacketInBlock);
  383.     }
  384.     
  385.     setBlockPosition(offsetInBlock);
  386.     
  387.     //First write the packet to the mirror:
  388.     if (mirrorOut != null) {
  389.       try {
  390.         mirrorOut.write(buf.array(), buf.position(), buf.remaining());
  391.         mirrorOut.flush();
  392.       } catch (IOException e) {
  393.         handleMirrorOutError(e);
  394.       }
  395.     }
  396.     buf.position(endOfHeader);        
  397.     int len = buf.getInt();
  398.     
  399.     if (len < 0) {
  400.       throw new IOException("Got wrong length during writeBlock(" + block + 
  401.                             ") from " + inAddr + " at offset " + 
  402.                             offsetInBlock + ": " + len); 
  403.     } 
  404.     if (len == 0) {
  405.       LOG.debug("Receiving empty packet for block " + block);
  406.     } else {
  407.       offsetInBlock += len;
  408.       int checksumLen = ((len + bytesPerChecksum - 1)/bytesPerChecksum)*
  409.                                                             checksumSize;
  410.       if ( buf.remaining() != (checksumLen + len)) {
  411.         throw new IOException("Data remaining in packet does not match " +
  412.                               "sum of checksumLen and dataLen");
  413.       }
  414.       int checksumOff = buf.position();
  415.       int dataOff = checksumOff + checksumLen;
  416.       byte pktBuf[] = buf.array();
  417.       buf.position(buf.limit()); // move to the end of the data.
  418.       /* skip verifying checksum iff this is not the last one in the 
  419.        * pipeline and clientName is non-null. i.e. Checksum is verified
  420.        * on all the datanodes when the data is being written by a 
  421.        * datanode rather than a client. Whe client is writing the data, 
  422.        * protocol includes acks and only the last datanode needs to verify 
  423.        * checksum.
  424.        */
  425.       if (mirrorOut == null || clientName.length() == 0) {
  426.         verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
  427.       }
  428.       try {
  429.         if (!finalized) {
  430.           //finally write to the disk :
  431.           out.write(pktBuf, dataOff, len);
  432.           // If this is a partial chunk, then verify that this is the only
  433.           // chunk in the packet. Calculate new crc for this chunk.
  434.           if (partialCrc != null) {
  435.             if (len > bytesPerChecksum) {
  436.               throw new IOException("Got wrong length during writeBlock(" + 
  437.                                     block + ") from " + inAddr + " " +
  438.                                     "A packet can have only one partial chunk."+
  439.                                     " len = " + len + 
  440.                                     " bytesPerChecksum " + bytesPerChecksum);
  441.             }
  442.             partialCrc.update(pktBuf, dataOff, len);
  443.             byte[] buf = FSOutputSummer.convertToByteStream(partialCrc, checksumSize);
  444.             checksumOut.write(buf);
  445.             LOG.debug("Writing out partial crc for data len " + len);
  446.             partialCrc = null;
  447.           } else {
  448.             checksumOut.write(pktBuf, checksumOff, checksumLen);
  449.           }
  450.           datanode.myMetrics.bytesWritten.inc(len);
  451.         }
  452.       } catch (IOException iex) {
  453.         datanode.checkDiskError(iex);
  454.         throw iex;
  455.       }
  456.     }
  457.     /// flush entire packet before sending ack
  458.     flush();
  459.     // put in queue for pending acks
  460.     if (responder != null) {
  461.       ((PacketResponder)responder.getRunnable()).enqueue(seqno,
  462.                                       lastPacketInBlock); 
  463.     }
  464.     
  465.     if (throttler != null) { // throttle I/O
  466.       throttler.throttle(payloadLen);
  467.     }
  468.     
  469.     return payloadLen;
  470.   }
  471.   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
  472.     checksum.writeHeader(mirrorOut);
  473.   }
  474.  
  475.   void receiveBlock(
  476.       DataOutputStream mirrOut, // output to next datanode
  477.       DataInputStream mirrIn,   // input from next datanode
  478.       DataOutputStream replyOut,  // output to previous datanode
  479.       String mirrAddr, BlockTransferThrottler throttlerArg,
  480.       int numTargets) throws IOException {
  481.       mirrorOut = mirrOut;
  482.       mirrorAddr = mirrAddr;
  483.       throttler = throttlerArg;
  484.     try {
  485.       // write data chunk header
  486.       if (!finalized) {
  487.         BlockMetadataHeader.writeHeader(checksumOut, checksum);
  488.       }
  489.       if (clientName.length() > 0) {
  490.         responder = new Daemon(datanode.threadGroup, 
  491.                                new PacketResponder(this, block, mirrIn, 
  492.                                                    replyOut, numTargets));
  493.         responder.start(); // start thread to processes reponses
  494.       }
  495.       /* 
  496.        * Receive until packet length is zero.
  497.        */
  498.       while (receivePacket() > 0) {}
  499.       // flush the mirror out
  500.       if (mirrorOut != null) {
  501.         try {
  502.           mirrorOut.writeInt(0); // mark the end of the block
  503.           mirrorOut.flush();
  504.         } catch (IOException e) {
  505.           handleMirrorOutError(e);
  506.         }
  507.       }
  508.       // wait for all outstanding packet responses. And then
  509.       // indicate responder to gracefully shutdown.
  510.       if (responder != null) {
  511.         ((PacketResponder)responder.getRunnable()).close();
  512.       }
  513.       // if this write is for a replication request (and not
  514.       // from a client), then finalize block. For client-writes, 
  515.       // the block is finalized in the PacketResponder.
  516.       if (clientName.length() == 0) {
  517.         // close the block/crc files
  518.         close();
  519.         // Finalize the block. Does this fsync()?
  520.         block.setNumBytes(offsetInBlock);
  521.         datanode.data.finalizeBlock(block);
  522.         datanode.myMetrics.blocksWritten.inc();
  523.       }
  524.     } catch (IOException ioe) {
  525.       LOG.info("Exception in receiveBlock for block " + block + 
  526.                " " + ioe);
  527.       IOUtils.closeStream(this);
  528.       if (responder != null) {
  529.         responder.interrupt();
  530.       }
  531.       cleanupBlock();
  532.       throw ioe;
  533.     } finally {
  534.       if (responder != null) {
  535.         try {
  536.           responder.join();
  537.         } catch (InterruptedException e) {
  538.           throw new IOException("Interrupted receiveBlock");
  539.         }
  540.         responder = null;
  541.       }
  542.     }
  543.   }
  544.   /** Cleanup a partial block 
  545.    * if this write is for a replication request (and not from a client)
  546.    */
  547.   private void cleanupBlock() throws IOException {
  548.     if (clientName.length() == 0) { // not client write
  549.       datanode.data.unfinalizeBlock(block);
  550.     }
  551.   }
  552.   /**
  553.    * Sets the file pointer in the local block file to the specified value.
  554.    */
  555.   private void setBlockPosition(long offsetInBlock) throws IOException {
  556.     if (finalized) {
  557.       if (!isRecovery) {
  558.         throw new IOException("Write to offset " + offsetInBlock +
  559.                               " of block " + block +
  560.                               " that is already finalized.");
  561.       }
  562.       if (offsetInBlock > datanode.data.getLength(block)) {
  563.         throw new IOException("Write to offset " + offsetInBlock +
  564.                               " of block " + block +
  565.                               " that is already finalized and is of size " +
  566.                               datanode.data.getLength(block));
  567.       }
  568.       return;
  569.     }
  570.     if (datanode.data.getChannelPosition(block, streams) == offsetInBlock) {
  571.       return;                   // nothing to do 
  572.     }
  573.     long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
  574.                             offsetInBlock / bytesPerChecksum * checksumSize;
  575.     if (out != null) {
  576.      out.flush();
  577.     }
  578.     if (checksumOut != null) {
  579.       checksumOut.flush();
  580.     }
  581.     // If this is a partial chunk, then read in pre-existing checksum
  582.     if (offsetInBlock % bytesPerChecksum != 0) {
  583.       LOG.info("setBlockPosition trying to set position to " +
  584.                offsetInBlock +
  585.                " for block " + block +
  586.                " which is not a multiple of bytesPerChecksum " +
  587.                bytesPerChecksum);
  588.       computePartialChunkCrc(offsetInBlock, offsetInChecksum, bytesPerChecksum);
  589.     }
  590.     LOG.info("Changing block file offset of block " + block + " from " + 
  591.         datanode.data.getChannelPosition(block, streams) +
  592.              " to " + offsetInBlock +
  593.              " meta file offset to " + offsetInChecksum);
  594.     // set the position of the block file
  595.     datanode.data.setChannelPosition(block, streams, offsetInBlock, offsetInChecksum);
  596.   }
  597.   /**
  598.    * reads in the partial crc chunk and computes checksum
  599.    * of pre-existing data in partial chunk.
  600.    */
  601.   private void computePartialChunkCrc(long blkoff, long ckoff, 
  602.                                       int bytesPerChecksum) throws IOException {
  603.     // find offset of the beginning of partial chunk.
  604.     //
  605.     int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
  606.     int checksumSize = checksum.getChecksumSize();
  607.     blkoff = blkoff - sizePartialChunk;
  608.     LOG.info("computePartialChunkCrc sizePartialChunk " + 
  609.               sizePartialChunk +
  610.               " block " + block +
  611.               " offset in block " + blkoff +
  612.               " offset in metafile " + ckoff);
  613.     // create an input stream from the block file
  614.     // and read in partial crc chunk into temporary buffer
  615.     //
  616.     byte[] buf = new byte[sizePartialChunk];
  617.     byte[] crcbuf = new byte[checksumSize];
  618.     FSDataset.BlockInputStreams instr = null;
  619.     try { 
  620.       instr = datanode.data.getTmpInputStreams(block, blkoff, ckoff);
  621.       IOUtils.readFully(instr.dataIn, buf, 0, sizePartialChunk);
  622.       // open meta file and read in crc value computer earlier
  623.       IOUtils.readFully(instr.checksumIn, crcbuf, 0, crcbuf.length);
  624.     } finally {
  625.       IOUtils.closeStream(instr);
  626.     }
  627.     // compute crc of partial chunk from data read in the block file.
  628.     partialCrc = new CRC32();
  629.     partialCrc.update(buf, 0, sizePartialChunk);
  630.     LOG.info("Read in partial CRC chunk from disk for block " + block);
  631.     // paranoia! verify that the pre-computed crc matches what we
  632.     // recalculated just now
  633.     if (partialCrc.getValue() != FSInputChecker.checksum2long(crcbuf)) {
  634.       String msg = "Partial CRC " + partialCrc.getValue() +
  635.                    " does not match value computed the " +
  636.                    " last time file was closed " +
  637.                    FSInputChecker.checksum2long(crcbuf);
  638.       throw new IOException(msg);
  639.     }
  640.     //LOG.debug("Partial CRC matches 0x" + 
  641.     //            Long.toHexString(partialCrc.getValue()));
  642.   }
  643.   
  644.   
  645.   /**
  646.    * Processed responses from downstream datanodes in the pipeline
  647.    * and sends back replies to the originator.
  648.    */
  649.   class PacketResponder implements Runnable, FSConstants {   
  650.     //packet waiting for ack
  651.     private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
  652.     private volatile boolean running = true;
  653.     private Block block;
  654.     DataInputStream mirrorIn;   // input from downstream datanode
  655.     DataOutputStream replyOut;  // output to upstream datanode
  656.     private int numTargets;     // number of downstream datanodes including myself
  657.     private BlockReceiver receiver; // The owner of this responder.
  658.     public String toString() {
  659.       return "PacketResponder " + numTargets + " for Block " + this.block;
  660.     }
  661.     PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
  662.                     DataOutputStream out, int numTargets) {
  663.       this.receiver = receiver;
  664.       this.block = b;
  665.       mirrorIn = in;
  666.       replyOut = out;
  667.       this.numTargets = numTargets;
  668.     }
  669.     /**
  670.      * enqueue the seqno that is still be to acked by the downstream datanode.
  671.      * @param seqno
  672.      * @param lastPacketInBlock
  673.      */
  674.     synchronized void enqueue(long seqno, boolean lastPacketInBlock) {
  675.       if (running) {
  676.         LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
  677.                   " to ack queue.");
  678.         ackQueue.addLast(new Packet(seqno, lastPacketInBlock));
  679.         notifyAll();
  680.       }
  681.     }
  682.     /**
  683.      * wait for all pending packets to be acked. Then shutdown thread.
  684.      */
  685.     synchronized void close() {
  686.       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
  687.         try {
  688.           wait();
  689.         } catch (InterruptedException e) {
  690.           running = false;
  691.         }
  692.       }
  693.       LOG.debug("PacketResponder " + numTargets +
  694.                " for block " + block + " Closing down.");
  695.       running = false;
  696.       notifyAll();
  697.     }
  698.     private synchronized void lastDataNodeRun() {
  699.       long lastHeartbeat = System.currentTimeMillis();
  700.       boolean lastPacket = false;
  701.       while (running && datanode.shouldRun && !lastPacket) {
  702.         long now = System.currentTimeMillis();
  703.         try {
  704.             // wait for a packet to be sent to downstream datanode
  705.             while (running && datanode.shouldRun && ackQueue.size() == 0) {
  706.               long idle = now - lastHeartbeat;
  707.               long timeout = (datanode.socketTimeout/2) - idle;
  708.               if (timeout <= 0) {
  709.                 timeout = 1000;
  710.               }
  711.               try {
  712.                 wait(timeout);
  713.               } catch (InterruptedException e) {
  714.                 if (running) {
  715.                   LOG.info("PacketResponder " + numTargets +
  716.                            " for block " + block + " Interrupted.");
  717.                   running = false;
  718.                 }
  719.                 break;
  720.               }
  721.           
  722.               // send a heartbeat if it is time.
  723.               now = System.currentTimeMillis();
  724.               if (now - lastHeartbeat > datanode.socketTimeout/2) {
  725.                 replyOut.writeLong(-1); // send heartbeat
  726.                 replyOut.flush();
  727.                 lastHeartbeat = now;
  728.               }
  729.             }
  730.             if (!running || !datanode.shouldRun) {
  731.               break;
  732.             }
  733.             Packet pkt = ackQueue.removeFirst();
  734.             long expected = pkt.seqno;
  735.             notifyAll();
  736.             LOG.debug("PacketResponder " + numTargets +
  737.                       " for block " + block + 
  738.                       " acking for packet " + expected);
  739.             // If this is the last packet in block, then close block
  740.             // file and finalize the block before responding success
  741.             if (pkt.lastPacketInBlock) {
  742.               if (!receiver.finalized) {
  743.                 receiver.close();
  744.                 block.setNumBytes(receiver.offsetInBlock);
  745.                 datanode.data.finalizeBlock(block);
  746.                 datanode.myMetrics.blocksWritten.inc();
  747.                 datanode.notifyNamenodeReceivedBlock(block, 
  748.                     DataNode.EMPTY_DEL_HINT);
  749.                 if (ClientTraceLog.isInfoEnabled() &&
  750.                     receiver.clientName.length() > 0) {
  751.                   ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
  752.                         receiver.inAddr, receiver.myAddr, block.getNumBytes(),
  753.                         "HDFS_WRITE", receiver.clientName,
  754.                         datanode.dnRegistration.getStorageID(), block));
  755.                 } else {
  756.                   LOG.info("Received block " + block + 
  757.                            " of size " + block.getNumBytes() + 
  758.                            " from " + receiver.inAddr);
  759.                 }
  760.               }
  761.               lastPacket = true;
  762.             }
  763.             replyOut.writeLong(expected);
  764.             replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
  765.             replyOut.flush();
  766.         } catch (Exception e) {
  767.           if (running) {
  768.             LOG.info("PacketResponder " + block + " " + numTargets + 
  769.                      " Exception " + StringUtils.stringifyException(e));
  770.             running = false;
  771.           }
  772.         }
  773.       }
  774.       LOG.info("PacketResponder " + numTargets + 
  775.                " for block " + block + " terminating");
  776.     }
  777.     /**
  778.      * Thread to process incoming acks.
  779.      * @see java.lang.Runnable#run()
  780.      */
  781.     public void run() {
  782.       // If this is the last datanode in pipeline, then handle differently
  783.       if (numTargets == 0) {
  784.         lastDataNodeRun();
  785.         return;
  786.       }
  787.       boolean lastPacketInBlock = false;
  788.       while (running && datanode.shouldRun && !lastPacketInBlock) {
  789.         try {
  790.             short op = DataTransferProtocol.OP_STATUS_SUCCESS;
  791.             boolean didRead = false;
  792.             long expected = -2;
  793.             try { 
  794.               // read seqno from downstream datanode
  795.               long seqno = mirrorIn.readLong();
  796.               didRead = true;
  797.               if (seqno == -1) {
  798.                 replyOut.writeLong(-1); // send keepalive
  799.                 replyOut.flush();
  800.                 LOG.debug("PacketResponder " + numTargets + " got -1");
  801.                 continue;
  802.               } else if (seqno == -2) {
  803.                 LOG.debug("PacketResponder " + numTargets + " got -2");
  804.               } else {
  805.                 LOG.debug("PacketResponder " + numTargets + " got seqno = " + 
  806.                     seqno);
  807.                 Packet pkt = null;
  808.                 synchronized (this) {
  809.                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
  810.                     if (LOG.isDebugEnabled()) {
  811.                       LOG.debug("PacketResponder " + numTargets + 
  812.                                 " seqno = " + seqno +
  813.                                 " for block " + block +
  814.                                 " waiting for local datanode to finish write.");
  815.                     }
  816.                     wait();
  817.                   }
  818.                   pkt = ackQueue.removeFirst();
  819.                   expected = pkt.seqno;
  820.                   notifyAll();
  821.                   LOG.debug("PacketResponder " + numTargets + " seqno = " + seqno);
  822.                   if (seqno != expected) {
  823.                     throw new IOException("PacketResponder " + numTargets +
  824.                                           " for block " + block +
  825.                                           " expected seqno:" + expected +
  826.                                           " received:" + seqno);
  827.                   }
  828.                   lastPacketInBlock = pkt.lastPacketInBlock;
  829.                 }
  830.               }
  831.             } catch (Throwable e) {
  832.               if (running) {
  833.                 LOG.info("PacketResponder " + block + " " + numTargets + 
  834.                          " Exception " + StringUtils.stringifyException(e));
  835.                 running = false;
  836.               }
  837.             }
  838.             if (Thread.interrupted()) {
  839.               /* The receiver thread cancelled this thread. 
  840.                * We could also check any other status updates from the 
  841.                * receiver thread (e.g. if it is ok to write to replyOut). 
  842.                * It is prudent to not send any more status back to the client
  843.                * because this datanode has a problem. The upstream datanode
  844.                * will detect a timout on heartbeats and will declare that
  845.                * this datanode is bad, and rightly so.
  846.                */
  847.               LOG.info("PacketResponder " + block +  " " + numTargets +
  848.                        " : Thread is interrupted.");
  849.               running = false;
  850.               continue;
  851.             }
  852.             
  853.             if (!didRead) {
  854.               op = DataTransferProtocol.OP_STATUS_ERROR;
  855.             }
  856.             
  857.             // If this is the last packet in block, then close block
  858.             // file and finalize the block before responding success
  859.             if (lastPacketInBlock && !receiver.finalized) {
  860.               receiver.close();
  861.               block.setNumBytes(receiver.offsetInBlock);
  862.               datanode.data.finalizeBlock(block);
  863.               datanode.myMetrics.blocksWritten.inc();
  864.               datanode.notifyNamenodeReceivedBlock(block, 
  865.                   DataNode.EMPTY_DEL_HINT);
  866.               if (ClientTraceLog.isInfoEnabled() &&
  867.                   receiver.clientName.length() > 0) {
  868.                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
  869.                       receiver.inAddr, receiver.myAddr, block.getNumBytes(),
  870.                       "HDFS_WRITE", receiver.clientName,
  871.                       datanode.dnRegistration.getStorageID(), block));
  872.               } else {
  873.                 LOG.info("Received block " + block + 
  874.                          " of size " + block.getNumBytes() + 
  875.                          " from " + receiver.inAddr);
  876.               }
  877.             }
  878.             // send my status back to upstream datanode
  879.             replyOut.writeLong(expected); // send seqno upstream
  880.             replyOut.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
  881.             LOG.debug("PacketResponder " + numTargets + 
  882.                       " for block " + block +
  883.                       " responded my status " +
  884.                       " for seqno " + expected);
  885.             // forward responses from downstream datanodes.
  886.             for (int i = 0; i < numTargets && datanode.shouldRun; i++) {
  887.               try {
  888.                 if (op == DataTransferProtocol.OP_STATUS_SUCCESS) {
  889.                   op = mirrorIn.readShort();
  890.                   if (op != DataTransferProtocol.OP_STATUS_SUCCESS) {
  891.                     LOG.debug("PacketResponder for block " + block +
  892.                               ": error code received from downstream " +
  893.                               " datanode[" + i + "] " + op);
  894.                   }
  895.                 }
  896.               } catch (Throwable e) {
  897.                 op = DataTransferProtocol.OP_STATUS_ERROR;
  898.               }
  899.               replyOut.writeShort(op);
  900.             }
  901.             replyOut.flush();
  902.             LOG.debug("PacketResponder " + block + " " + numTargets + 
  903.                       " responded other status " + " for seqno " + expected);
  904.             // If we were unable to read the seqno from downstream, then stop.
  905.             if (expected == -2) {
  906.               running = false;
  907.             }
  908.             // If we forwarded an error response from a downstream datanode
  909.             // and we are acting on behalf of a client, then we quit. The 
  910.             // client will drive the recovery mechanism.
  911.             if (op == DataTransferProtocol.OP_STATUS_ERROR && receiver.clientName.length() > 0) {
  912.               running = false;
  913.             }
  914.         } catch (IOException e) {
  915.           if (running) {
  916.             LOG.info("PacketResponder " + block + " " + numTargets + 
  917.                      " Exception " + StringUtils.stringifyException(e));
  918.             running = false;
  919.           }
  920.         } catch (RuntimeException e) {
  921.           if (running) {
  922.             LOG.info("PacketResponder " + block + " " + numTargets + 
  923.                      " Exception " + StringUtils.stringifyException(e));
  924.             running = false;
  925.           }
  926.         }
  927.       }
  928.       LOG.info("PacketResponder " + numTargets + 
  929.                " for block " + block + " terminating");
  930.     }
  931.   }
  932.   
  933.   /**
  934.    * This information is cached by the Datanode in the ackQueue.
  935.    */
  936.   static private class Packet {
  937.     long seqno;
  938.     boolean lastPacketInBlock;
  939.     Packet(long seqno, boolean lastPacketInBlock) {
  940.       this.seqno = seqno;
  941.       this.lastPacketInBlock = lastPacketInBlock;
  942.     }
  943.   }
  944. }