DataXceiver.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:23k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.BufferedInputStream;
  20. import java.io.BufferedOutputStream;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutputStream;
  23. import java.io.IOException;
  24. import java.io.OutputStream;
  25. import java.net.InetSocketAddress;
  26. import java.net.Socket;
  27. import java.net.SocketException;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.hadoop.hdfs.protocol.Block;
  30. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  31. import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
  32. import org.apache.hadoop.hdfs.protocol.FSConstants;
  33. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  34. import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.MetaDataInputStream;
  35. import org.apache.hadoop.io.IOUtils;
  36. import org.apache.hadoop.io.MD5Hash;
  37. import org.apache.hadoop.io.Text;
  38. import org.apache.hadoop.net.NetUtils;
  39. import org.apache.hadoop.util.DataChecksum;
  40. import org.apache.hadoop.util.StringUtils;
  41. import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
  42. /**
  43.  * Thread for processing incoming/outgoing data stream.
  44.  */
  45. class DataXceiver implements Runnable, FSConstants {
  46.   public static final Log LOG = DataNode.LOG;
  47.   static final Log ClientTraceLog = DataNode.ClientTraceLog;
  48.   
  49.   Socket s;
  50.   final String remoteAddress; // address of remote side
  51.   final String localAddress;  // local address of this daemon
  52.   DataNode datanode;
  53.   DataXceiverServer dataXceiverServer;
  54.   
  /**
   * Creates the handler for one accepted connection.
   *
   * <p>The socket is added to the server's {@code childSockets} map before
   * the textual forms of its remote and local addresses are captured.
   *
   * @param s the socket this handler serves
   * @param datanode the datanode on whose behalf requests are served
   * @param dataXceiverServer the server that owns this handler
   */
  public DataXceiver(Socket s, DataNode datanode, 
      DataXceiverServer dataXceiverServer) {
    this.s = s;
    this.datanode = datanode;
    this.dataXceiverServer = dataXceiverServer;

    // Register first, exactly as before: the address lookups below run
    // only after the socket is visible to the server.
    this.dataXceiverServer.childSockets.put(this.s, this.s);

    this.remoteAddress = this.s.getRemoteSocketAddress().toString();
    this.localAddress = this.s.getLocalSocketAddress().toString();

    LOG.debug("Number of active connections is: "
        + this.datanode.getXceiverCount());
  }
  66.   /**
  67.    * Read/write data from/to the DataXceiveServer.
  68.    */
  69.   public void run() {
  70.     DataInputStream in=null; 
  71.     try {
  72.       in = new DataInputStream(
  73.           new BufferedInputStream(NetUtils.getInputStream(s), 
  74.                                   SMALL_BUFFER_SIZE));
  75.       short version = in.readShort();
  76.       if ( version != DataTransferProtocol.DATA_TRANSFER_VERSION ) {
  77.         throw new IOException( "Version Mismatch" );
  78.       }
  79.       boolean local = s.getInetAddress().equals(s.getLocalAddress());
  80.       byte op = in.readByte();
  81.       // Make sure the xciver count is not exceeded
  82.       int curXceiverCount = datanode.getXceiverCount();
  83.       if (curXceiverCount > dataXceiverServer.maxXceiverCount) {
  84.         throw new IOException("xceiverCount " + curXceiverCount
  85.                               + " exceeds the limit of concurrent xcievers "
  86.                               + dataXceiverServer.maxXceiverCount);
  87.       }
  88.       long startTime = DataNode.now();
  89.       switch ( op ) {
  90.       case DataTransferProtocol.OP_READ_BLOCK:
  91.         readBlock( in );
  92.         datanode.myMetrics.readBlockOp.inc(DataNode.now() - startTime);
  93.         if (local)
  94.           datanode.myMetrics.readsFromLocalClient.inc();
  95.         else
  96.           datanode.myMetrics.readsFromRemoteClient.inc();
  97.         break;
  98.       case DataTransferProtocol.OP_WRITE_BLOCK:
  99.         writeBlock( in );
  100.         datanode.myMetrics.writeBlockOp.inc(DataNode.now() - startTime);
  101.         if (local)
  102.           datanode.myMetrics.writesFromLocalClient.inc();
  103.         else
  104.           datanode.myMetrics.writesFromRemoteClient.inc();
  105.         break;
  106.       case DataTransferProtocol.OP_READ_METADATA:
  107.         readMetadata( in );
  108.         datanode.myMetrics.readMetadataOp.inc(DataNode.now() - startTime);
  109.         break;
  110.       case DataTransferProtocol.OP_REPLACE_BLOCK: // for balancing purpose; send to a destination
  111.         replaceBlock(in);
  112.         datanode.myMetrics.replaceBlockOp.inc(DataNode.now() - startTime);
  113.         break;
  114.       case DataTransferProtocol.OP_COPY_BLOCK:
  115.             // for balancing purpose; send to a proxy source
  116.         copyBlock(in);
  117.         datanode.myMetrics.copyBlockOp.inc(DataNode.now() - startTime);
  118.         break;
  119.       case DataTransferProtocol.OP_BLOCK_CHECKSUM: //get the checksum of a block
  120.         getBlockChecksum(in);
  121.         datanode.myMetrics.blockChecksumOp.inc(DataNode.now() - startTime);
  122.         break;
  123.       default:
  124.         throw new IOException("Unknown opcode " + op + " in data stream");
  125.       }
  126.     } catch (Throwable t) {
  127.       LOG.error(datanode.dnRegistration + ":DataXceiver",t);
  128.     } finally {
  129.       LOG.debug(datanode.dnRegistration + ":Number of active connections is: "
  130.                                + datanode.getXceiverCount());
  131.       IOUtils.closeStream(in);
  132.       IOUtils.closeSocket(s);
  133.       dataXceiverServer.childSockets.remove(s);
  134.     }
  135.   }
  136.   /**
  137.    * Read a block from the disk.
  138.    * @param in The stream to read from
  139.    * @throws IOException
  140.    */
  141.   private void readBlock(DataInputStream in) throws IOException {
  142.     //
  143.     // Read in the header
  144.     //
  145.     long blockId = in.readLong();          
  146.     Block block = new Block( blockId, 0 , in.readLong());
  147.     long startOffset = in.readLong();
  148.     long length = in.readLong();
  149.     String clientName = Text.readString(in);
  150.     // send the block
  151.     OutputStream baseStream = NetUtils.getOutputStream(s, 
  152.         datanode.socketWriteTimeout);
  153.     DataOutputStream out = new DataOutputStream(
  154.                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
  155.     
  156.     BlockSender blockSender = null;
  157.     final String clientTraceFmt =
  158.       clientName.length() > 0 && ClientTraceLog.isInfoEnabled()
  159.         ? String.format(DN_CLIENTTRACE_FORMAT, localAddress, remoteAddress,
  160.             "%d", "HDFS_READ", clientName,
  161.             datanode.dnRegistration.getStorageID(), block)
  162.         : datanode.dnRegistration + " Served block " + block + " to " +
  163.             s.getInetAddress();
  164.     try {
  165.       try {
  166.         blockSender = new BlockSender(block, startOffset, length,
  167.             true, true, false, datanode, clientTraceFmt);
  168.       } catch(IOException e) {
  169.         out.writeShort(DataTransferProtocol.OP_STATUS_ERROR);
  170.         throw e;
  171.       }
  172.       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS); // send op status
  173.       long read = blockSender.sendBlock(out, baseStream, null); // send data
  174.       if (blockSender.isBlockReadFully()) {
  175.         // See if client verification succeeded. 
  176.         // This is an optional response from client.
  177.         try {
  178.           if (in.readShort() == DataTransferProtocol.OP_STATUS_CHECKSUM_OK  && 
  179.               datanode.blockScanner != null) {
  180.             datanode.blockScanner.verifiedByClient(block);
  181.           }
  182.         } catch (IOException ignored) {}
  183.       }
  184.       
  185.       datanode.myMetrics.bytesRead.inc((int) read);
  186.       datanode.myMetrics.blocksRead.inc();
  187.     } catch ( SocketException ignored ) {
  188.       // Its ok for remote side to close the connection anytime.
  189.       datanode.myMetrics.blocksRead.inc();
  190.     } catch ( IOException ioe ) {
  191.       /* What exactly should we do here?
  192.        * Earlier version shutdown() datanode if there is disk error.
  193.        */
  194.       LOG.warn(datanode.dnRegistration +  ":Got exception while serving " + 
  195.           block + " to " +
  196.                 s.getInetAddress() + ":n" + 
  197.                 StringUtils.stringifyException(ioe) );
  198.       throw ioe;
  199.     } finally {
  200.       IOUtils.closeStream(out);
  201.       IOUtils.closeStream(blockSender);
  202.     }
  203.   }
  204.   /**
  205.    * Write a block to disk.
  206.    * 
  207.    * @param in The stream to read from
  208.    * @throws IOException
  209.    */
  210.   private void writeBlock(DataInputStream in) throws IOException {
  211.     DatanodeInfo srcDataNode = null;
  212.     LOG.debug("writeBlock receive buf size " + s.getReceiveBufferSize() +
  213.               " tcp no delay " + s.getTcpNoDelay());
  214.     //
  215.     // Read in the header
  216.     //
  217.     Block block = new Block(in.readLong(), 
  218.         dataXceiverServer.estimateBlockSize, in.readLong());
  219.     LOG.info("Receiving block " + block + 
  220.              " src: " + remoteAddress +
  221.              " dest: " + localAddress);
  222.     int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
  223.     boolean isRecovery = in.readBoolean(); // is this part of recovery?
  224.     String client = Text.readString(in); // working on behalf of this client
  225.     boolean hasSrcDataNode = in.readBoolean(); // is src node info present
  226.     if (hasSrcDataNode) {
  227.       srcDataNode = new DatanodeInfo();
  228.       srcDataNode.readFields(in);
  229.     }
  230.     int numTargets = in.readInt();
  231.     if (numTargets < 0) {
  232.       throw new IOException("Mislabelled incoming datastream.");
  233.     }
  234.     DatanodeInfo targets[] = new DatanodeInfo[numTargets];
  235.     for (int i = 0; i < targets.length; i++) {
  236.       DatanodeInfo tmp = new DatanodeInfo();
  237.       tmp.readFields(in);
  238.       targets[i] = tmp;
  239.     }
  240.     DataOutputStream mirrorOut = null;  // stream to next target
  241.     DataInputStream mirrorIn = null;    // reply from next target
  242.     DataOutputStream replyOut = null;   // stream to prev target
  243.     Socket mirrorSock = null;           // socket to next target
  244.     BlockReceiver blockReceiver = null; // responsible for data handling
  245.     String mirrorNode = null;           // the name:port of next target
  246.     String firstBadLink = "";           // first datanode that failed in connection setup
  247.     try {
  248.       // open a block receiver and check if the block does not exist
  249.       blockReceiver = new BlockReceiver(block, in, 
  250.           s.getRemoteSocketAddress().toString(),
  251.           s.getLocalSocketAddress().toString(),
  252.           isRecovery, client, srcDataNode, datanode);
  253.       // get a connection back to the previous target
  254.       replyOut = new DataOutputStream(
  255.                      NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
  256.       //
  257.       // Open network conn to backup machine, if 
  258.       // appropriate
  259.       //
  260.       if (targets.length > 0) {
  261.         InetSocketAddress mirrorTarget = null;
  262.         // Connect to backup machine
  263.         mirrorNode = targets[0].getName();
  264.         mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
  265.         mirrorSock = datanode.newSocket();
  266.         try {
  267.           int timeoutValue = numTargets * datanode.socketTimeout;
  268.           int writeTimeout = datanode.socketWriteTimeout + 
  269.                              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * numTargets);
  270.           NetUtils.connect(mirrorSock, mirrorTarget, timeoutValue);
  271.           mirrorSock.setSoTimeout(timeoutValue);
  272.           mirrorSock.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
  273.           mirrorOut = new DataOutputStream(
  274.              new BufferedOutputStream(
  275.                          NetUtils.getOutputStream(mirrorSock, writeTimeout),
  276.                          SMALL_BUFFER_SIZE));
  277.           mirrorIn = new DataInputStream(NetUtils.getInputStream(mirrorSock));
  278.           // Write header: Copied from DFSClient.java!
  279.           mirrorOut.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
  280.           mirrorOut.write( DataTransferProtocol.OP_WRITE_BLOCK );
  281.           mirrorOut.writeLong( block.getBlockId() );
  282.           mirrorOut.writeLong( block.getGenerationStamp() );
  283.           mirrorOut.writeInt( pipelineSize );
  284.           mirrorOut.writeBoolean( isRecovery );
  285.           Text.writeString( mirrorOut, client );
  286.           mirrorOut.writeBoolean(hasSrcDataNode);
  287.           if (hasSrcDataNode) { // pass src node information
  288.             srcDataNode.write(mirrorOut);
  289.           }
  290.           mirrorOut.writeInt( targets.length - 1 );
  291.           for ( int i = 1; i < targets.length; i++ ) {
  292.             targets[i].write( mirrorOut );
  293.           }
  294.           blockReceiver.writeChecksumHeader(mirrorOut);
  295.           mirrorOut.flush();
  296.           // read connect ack (only for clients, not for replication req)
  297.           if (client.length() != 0) {
  298.             firstBadLink = Text.readString(mirrorIn);
  299.             if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
  300.               LOG.info("Datanode " + targets.length +
  301.                        " got response for connect ack " +
  302.                        " from downstream datanode with firstbadlink as " +
  303.                        firstBadLink);
  304.             }
  305.           }
  306.         } catch (IOException e) {
  307.           if (client.length() != 0) {
  308.             Text.writeString(replyOut, mirrorNode);
  309.             replyOut.flush();
  310.           }
  311.           IOUtils.closeStream(mirrorOut);
  312.           mirrorOut = null;
  313.           IOUtils.closeStream(mirrorIn);
  314.           mirrorIn = null;
  315.           IOUtils.closeSocket(mirrorSock);
  316.           mirrorSock = null;
  317.           if (client.length() > 0) {
  318.             throw e;
  319.           } else {
  320.             LOG.info(datanode.dnRegistration + ":Exception transfering block " +
  321.                      block + " to mirror " + mirrorNode +
  322.                      ". continuing without the mirror.n" +
  323.                      StringUtils.stringifyException(e));
  324.           }
  325.         }
  326.       }
  327.       // send connect ack back to source (only for clients)
  328.       if (client.length() != 0) {
  329.         if (LOG.isDebugEnabled() || firstBadLink.length() > 0) {
  330.           LOG.info("Datanode " + targets.length +
  331.                    " forwarding connect ack to upstream firstbadlink is " +
  332.                    firstBadLink);
  333.         }
  334.         Text.writeString(replyOut, firstBadLink);
  335.         replyOut.flush();
  336.       }
  337.       // receive the block and mirror to the next target
  338.       String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
  339.       blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
  340.                                  mirrorAddr, null, targets.length);
  341.       // if this write is for a replication request (and not
  342.       // from a client), then confirm block. For client-writes,
  343.       // the block is finalized in the PacketResponder.
  344.       if (client.length() == 0) {
  345.         datanode.notifyNamenodeReceivedBlock(block, DataNode.EMPTY_DEL_HINT);
  346.         LOG.info("Received block " + block + 
  347.                  " src: " + remoteAddress +
  348.                  " dest: " + localAddress +
  349.                  " of size " + block.getNumBytes());
  350.       }
  351.       if (datanode.blockScanner != null) {
  352.         datanode.blockScanner.addBlock(block);
  353.       }
  354.       
  355.     } catch (IOException ioe) {
  356.       LOG.info("writeBlock " + block + " received exception " + ioe);
  357.       throw ioe;
  358.     } finally {
  359.       // close all opened streams
  360.       IOUtils.closeStream(mirrorOut);
  361.       IOUtils.closeStream(mirrorIn);
  362.       IOUtils.closeStream(replyOut);
  363.       IOUtils.closeSocket(mirrorSock);
  364.       IOUtils.closeStream(blockReceiver);
  365.     }
  366.   }
  367.   /**
  368.    * Reads the metadata and sends the data in one 'DATA_CHUNK'.
  369.    * @param in
  370.    */
  371.   void readMetadata(DataInputStream in) throws IOException {
  372.     Block block = new Block( in.readLong(), 0 , in.readLong());
  373.     MetaDataInputStream checksumIn = null;
  374.     DataOutputStream out = null;
  375.     
  376.     try {
  377.       checksumIn = datanode.data.getMetaDataInputStream(block);
  378.       
  379.       long fileSize = checksumIn.getLength();
  380.       if (fileSize >= 1L<<31 || fileSize <= 0) {
  381.           throw new IOException("Unexpected size for checksumFile of block" +
  382.                   block);
  383.       }
  384.       byte [] buf = new byte[(int)fileSize];
  385.       IOUtils.readFully(checksumIn, buf, 0, buf.length);
  386.       
  387.       out = new DataOutputStream(
  388.                 NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
  389.       
  390.       out.writeByte(DataTransferProtocol.OP_STATUS_SUCCESS);
  391.       out.writeInt(buf.length);
  392.       out.write(buf);
  393.       
  394.       //last DATA_CHUNK
  395.       out.writeInt(0);
  396.     } finally {
  397.       IOUtils.closeStream(out);
  398.       IOUtils.closeStream(checksumIn);
  399.     }
  400.   }
  401.   
  402.   /**
  403.    * Get block checksum (MD5 of CRC32).
  404.    * @param in
  405.    */
  406.   void getBlockChecksum(DataInputStream in) throws IOException {
  407.     final Block block = new Block(in.readLong(), 0 , in.readLong());
  408.     DataOutputStream out = null;
  409.     final MetaDataInputStream metadataIn = datanode.data.getMetaDataInputStream(block);
  410.     final DataInputStream checksumIn = new DataInputStream(new BufferedInputStream(
  411.         metadataIn, BUFFER_SIZE));
  412.     try {
  413.       //read metadata file
  414.       final BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
  415.       final DataChecksum checksum = header.getChecksum(); 
  416.       final int bytesPerCRC = checksum.getBytesPerChecksum();
  417.       final long crcPerBlock = (metadataIn.getLength()
  418.           - BlockMetadataHeader.getHeaderSize())/checksum.getChecksumSize();
  419.       
  420.       //compute block checksum
  421.       final MD5Hash md5 = MD5Hash.digest(checksumIn);
  422.       if (LOG.isDebugEnabled()) {
  423.         LOG.debug("block=" + block + ", bytesPerCRC=" + bytesPerCRC
  424.             + ", crcPerBlock=" + crcPerBlock + ", md5=" + md5);
  425.       }
  426.       //write reply
  427.       out = new DataOutputStream(
  428.           NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
  429.       out.writeShort(DataTransferProtocol.OP_STATUS_SUCCESS);
  430.       out.writeInt(bytesPerCRC);
  431.       out.writeLong(crcPerBlock);
  432.       md5.write(out);
  433.       out.flush();
  434.     } finally {
  435.       IOUtils.closeStream(out);
  436.       IOUtils.closeStream(checksumIn);
  437.       IOUtils.closeStream(metadataIn);
  438.     }
  439.   }
  440.   /**
  441.    * Read a block from the disk and then sends it to a destination.
  442.    * 
  443.    * @param in The stream to read from
  444.    * @throws IOException
  445.    */
  446.   private void copyBlock(DataInputStream in) throws IOException {
  447.     // Read in the header
  448.     long blockId = in.readLong(); // read block id
  449.     Block block = new Block(blockId, 0, in.readLong());
  450.     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
  451.       LOG.info("Not able to copy block " + blockId + " to " 
  452.           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
  453.       return;
  454.     }
  455.     BlockSender blockSender = null;
  456.     DataOutputStream reply = null;
  457.     boolean isOpSuccess = true;
  458.     try {
  459.       // check if the block exists or not
  460.       blockSender = new BlockSender(block, 0, -1, false, false, false, 
  461.           datanode);
  462.       // set up response stream
  463.       OutputStream baseStream = NetUtils.getOutputStream(
  464.           s, datanode.socketWriteTimeout);
  465.       reply = new DataOutputStream(new BufferedOutputStream(
  466.           baseStream, SMALL_BUFFER_SIZE));
  467.       // send block content to the target
  468.       long read = blockSender.sendBlock(reply, baseStream, 
  469.                                         dataXceiverServer.balanceThrottler);
  470.       datanode.myMetrics.bytesRead.inc((int) read);
  471.       datanode.myMetrics.blocksRead.inc();
  472.       
  473.       LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
  474.     } catch (IOException ioe) {
  475.       isOpSuccess = false;
  476.       throw ioe;
  477.     } finally {
  478.       dataXceiverServer.balanceThrottler.release();
  479.       if (isOpSuccess) {
  480.         try {
  481.           // send one last byte to indicate that the resource is cleaned.
  482.           reply.writeChar('d');
  483.         } catch (IOException ignored) {
  484.         }
  485.       }
  486.       IOUtils.closeStream(reply);
  487.       IOUtils.closeStream(blockSender);
  488.     }
  489.   }
  490.   /**
  491.    * Receive a block and write it to disk, it then notifies the namenode to
  492.    * remove the copy from the source.
  493.    * 
  494.    * @param in The stream to read from
  495.    * @throws IOException
  496.    */
  497.   private void replaceBlock(DataInputStream in) throws IOException {
  498.     /* read header */
  499.     long blockId = in.readLong();
  500.     Block block = new Block(blockId, dataXceiverServer.estimateBlockSize,
  501.         in.readLong()); // block id & generation stamp
  502.     String sourceID = Text.readString(in); // read del hint
  503.     DatanodeInfo proxySource = new DatanodeInfo(); // read proxy source
  504.     proxySource.readFields(in);
  505.     if (!dataXceiverServer.balanceThrottler.acquire()) { // not able to start
  506.       LOG.warn("Not able to receive block " + blockId + " from " 
  507.           + s.getRemoteSocketAddress() + " because threads quota is exceeded.");
  508.       sendResponse(s, (short)DataTransferProtocol.OP_STATUS_ERROR, 
  509.           datanode.socketWriteTimeout);
  510.       return;
  511.     }
  512.     Socket proxySock = null;
  513.     DataOutputStream proxyOut = null;
  514.     short opStatus = DataTransferProtocol.OP_STATUS_SUCCESS;
  515.     BlockReceiver blockReceiver = null;
  516.     DataInputStream proxyReply = null;
  517.     
  518.     try {
  519.       // get the output stream to the proxy
  520.       InetSocketAddress proxyAddr = NetUtils.createSocketAddr(
  521.           proxySource.getName());
  522.       proxySock = datanode.newSocket();
  523.       NetUtils.connect(proxySock, proxyAddr, datanode.socketTimeout);
  524.       proxySock.setSoTimeout(datanode.socketTimeout);
  525.       OutputStream baseStream = NetUtils.getOutputStream(proxySock, 
  526.           datanode.socketWriteTimeout);
  527.       proxyOut = new DataOutputStream(
  528.                      new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
  529.       /* send request to the proxy */
  530.       proxyOut.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION); // transfer version
  531.       proxyOut.writeByte(DataTransferProtocol.OP_COPY_BLOCK); // op code
  532.       proxyOut.writeLong(block.getBlockId()); // block id
  533.       proxyOut.writeLong(block.getGenerationStamp()); // block id
  534.       proxyOut.flush();
  535.       // receive the response from the proxy
  536.       proxyReply = new DataInputStream(new BufferedInputStream(
  537.           NetUtils.getInputStream(proxySock), BUFFER_SIZE));
  538.       // open a block receiver and check if the block does not exist
  539.       blockReceiver = new BlockReceiver(
  540.           block, proxyReply, proxySock.getRemoteSocketAddress().toString(),
  541.           proxySock.getLocalSocketAddress().toString(),
  542.           false, "", null, datanode);
  543.       // receive a block
  544.       blockReceiver.receiveBlock(null, null, null, null, 
  545.           dataXceiverServer.balanceThrottler, -1);
  546.                     
  547.       // notify name node
  548.       datanode.notifyNamenodeReceivedBlock(block, sourceID);
  549.       LOG.info("Moved block " + block + 
  550.           " from " + s.getRemoteSocketAddress());
  551.       
  552.     } catch (IOException ioe) {
  553.       opStatus = DataTransferProtocol.OP_STATUS_ERROR;
  554.       throw ioe;
  555.     } finally {
  556.       // receive the last byte that indicates the proxy released its thread resource
  557.       if (opStatus == DataTransferProtocol.OP_STATUS_SUCCESS) {
  558.         try {
  559.           proxyReply.readChar();
  560.         } catch (IOException ignored) {
  561.         }
  562.       }
  563.       
  564.       // now release the thread resource
  565.       dataXceiverServer.balanceThrottler.release();
  566.       
  567.       // send response back
  568.       try {
  569.         sendResponse(s, opStatus, datanode.socketWriteTimeout);
  570.       } catch (IOException ioe) {
  571.         LOG.warn("Error writing reply back to " + s.getRemoteSocketAddress());
  572.       }
  573.       IOUtils.closeStream(proxyOut);
  574.       IOUtils.closeStream(blockReceiver);
  575.       IOUtils.closeStream(proxyReply);
  576.     }
  577.   }
  578.   
  579.   /**
  580.    * Utility function for sending a response.
  581.    * @param s socket to write to
  582.    * @param opStatus status message to write
  583.    * @param timeout send timeout
  584.    **/
  585.   private void sendResponse(Socket s, short opStatus, long timeout) 
  586.                                                        throws IOException {
  587.     DataOutputStream reply = 
  588.       new DataOutputStream(NetUtils.getOutputStream(s, timeout));
  589.     try {
  590.       reply.writeShort(opStatus);
  591.       reply.flush();
  592.     } finally {
  593.       IOUtils.closeStream(reply);
  594.     }
  595.   }
  596. }