TestDataTransferProtocol.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import junit.framework.TestCase;
  20. import java.io.*;
  21. import java.util.Random;
  22. import java.net.InetSocketAddress;
  23. import java.net.Socket;
  24. import java.nio.ByteBuffer;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.FSDataInputStream;
  27. import org.apache.hadoop.fs.FSDataOutputStream;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.io.IOUtils;
  30. import org.apache.hadoop.io.Text;
  31. import org.apache.hadoop.net.NetUtils;
  32. import org.apache.hadoop.util.DataChecksum;
  33. import org.apache.hadoop.fs.Path;
  34. import org.apache.hadoop.hdfs.DFSClient.DFSDataInputStream;
  35. import org.apache.hadoop.hdfs.protocol.Block;
  36. import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
  37. import org.apache.hadoop.hdfs.protocol.DatanodeID;
  38. import org.apache.hadoop.hdfs.protocol.FSConstants.DatanodeReportType;
  39. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  40. import org.apache.commons.logging.Log;
  41. import org.apache.commons.logging.LogFactory;
  42. /**
  43.  * This tests data transfer protocol handling in the Datanode. It sends
  44.  * various forms of wrong data and verifies that Datanode handles it well.
  45.  */
  46. public class TestDataTransferProtocol extends TestCase {
  47.   
  48.   private static final Log LOG = LogFactory.getLog(
  49.                     "org.apache.hadoop.hdfs.TestDataTransferProtocol");
  50.   
  51.   DatanodeID datanode;
  52.   InetSocketAddress dnAddr;
  53.   ByteArrayOutputStream sendBuf = new ByteArrayOutputStream(128);
  54.   DataOutputStream sendOut = new DataOutputStream(sendBuf);
  55.   // byte[] recvBuf = new byte[128];
  56.   // ByteBuffer recvByteBuf = ByteBuffer.wrap(recvBuf);
  57.   ByteArrayOutputStream recvBuf = new ByteArrayOutputStream(128);
  58.   DataOutputStream recvOut = new DataOutputStream(recvBuf);
  59.   private void sendRecvData(String testDescription,
  60.                             boolean eofExpected) throws IOException {
  61.     /* Opens a socket to datanode
  62.      * sends the data in sendBuf.
  63.      * If there is data in expectedBuf, expects to receive the data
  64.      *     from datanode that matches expectedBuf.
  65.      * If there is an exception while recieving, throws it
  66.      *     only if exceptionExcepted is false.
  67.      */
  68.     
  69.     Socket sock = null;
  70.     try {
  71.       
  72.       if ( testDescription != null ) {
  73.         LOG.info("Testing : " + testDescription);
  74.       }
  75.       sock = new Socket();
  76.       sock.connect(dnAddr, HdfsConstants.READ_TIMEOUT);
  77.       sock.setSoTimeout(HdfsConstants.READ_TIMEOUT);
  78.       
  79.       OutputStream out = sock.getOutputStream();
  80.       // Should we excuse 
  81.       byte[] retBuf = new byte[recvBuf.size()];
  82.       
  83.       DataInputStream in = new DataInputStream(sock.getInputStream());
  84.       out.write(sendBuf.toByteArray());
  85.       try {
  86.         in.readFully(retBuf);
  87.       } catch (EOFException eof) {
  88.         if ( eofExpected ) {
  89.           LOG.info("Got EOF as expected.");
  90.           return;
  91.         }
  92.         throw eof;
  93.       }
  94.       for (int i=0; i<retBuf.length; i++) {
  95.         System.out.print(retBuf[i]);
  96.       }
  97.       System.out.println(":");
  98.       
  99.       if (eofExpected) {
  100.         throw new IOException("Did not recieve IOException when an exception " +
  101.                               "is expected while reading from " + 
  102.                               datanode.getName());
  103.       }
  104.       
  105.       byte[] needed = recvBuf.toByteArray();
  106.       for (int i=0; i<retBuf.length; i++) {
  107.         System.out.print(retBuf[i]);
  108.         assertEquals("checking byte[" + i + "]", needed[i], retBuf[i]);
  109.       }
  110.     } finally {
  111.       IOUtils.closeSocket(sock);
  112.     }
  113.   }
  114.   
  115.   void createFile(FileSystem fs, Path path, int fileLen) throws IOException {
  116.     byte [] arr = new byte[fileLen];
  117.     FSDataOutputStream out = fs.create(path);
  118.     out.write(arr);
  119.     out.close();
  120.   }
  121.   
  122.   void readFile(FileSystem fs, Path path, int fileLen) throws IOException {
  123.     byte [] arr = new byte[fileLen];
  124.     FSDataInputStream in = fs.open(path);
  125.     in.readFully(arr);
  126.   }
  127.   
  128.   public void testDataTransferProtocol() throws IOException {
  129.     Random random = new Random();
  130.     int oneMil = 1024*1024;
  131.     Path file = new Path("dataprotocol.dat");
  132.     int numDataNodes = 1;
  133.     
  134.     Configuration conf = new Configuration();
  135.     conf.setInt("dfs.replication", numDataNodes); 
  136.     MiniDFSCluster cluster = new MiniDFSCluster(conf, numDataNodes, true, null);
  137.     cluster.waitActive();
  138.     DFSClient dfsClient = new DFSClient(
  139.                  new InetSocketAddress("localhost", cluster.getNameNodePort()),
  140.                  conf);                
  141.     datanode = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
  142.     dnAddr = NetUtils.createSocketAddr(datanode.getName());
  143.     FileSystem fileSys = cluster.getFileSystem();
  144.     
  145.     int fileLen = Math.min(conf.getInt("dfs.block.size", 4096), 4096);
  146.     
  147.     createFile(fileSys, file, fileLen);
  148.     // get the first blockid for the file
  149.     Block firstBlock = DFSTestUtil.getFirstBlock(fileSys, file);
  150.     long newBlockId = firstBlock.getBlockId() + 1;
  151.     recvBuf.reset();
  152.     sendBuf.reset();
  153.     
  154.     // bad version
  155.     recvOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
  156.     sendOut.writeShort((short)(DataTransferProtocol.DATA_TRANSFER_VERSION-1));
  157.     sendRecvData("Wrong Version", true);
  158.     // bad ops
  159.     sendBuf.reset();
  160.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  161.     sendOut.writeByte((byte)(DataTransferProtocol.OP_WRITE_BLOCK-1));
  162.     sendRecvData("Wrong Op Code", true);
  163.     
  164.     /* Test OP_WRITE_BLOCK */
  165.     sendBuf.reset();
  166.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  167.     sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
  168.     sendOut.writeLong(newBlockId); // block id
  169.     sendOut.writeLong(0);          // generation stamp
  170.     sendOut.writeInt(0);           // targets in pipeline 
  171.     sendOut.writeBoolean(false);   // recoveryFlag
  172.     Text.writeString(sendOut, "cl");// clientID
  173.     sendOut.writeBoolean(false); // no src node info
  174.     sendOut.writeInt(0);           // number of downstream targets
  175.     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
  176.     
  177.     // bad bytes per checksum
  178.     sendOut.writeInt(-1-random.nextInt(oneMil));
  179.     recvBuf.reset();
  180.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
  181.     sendRecvData("wrong bytesPerChecksum while writing", true);
  182.     sendBuf.reset();
  183.     recvBuf.reset();
  184.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  185.     sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
  186.     sendOut.writeLong(newBlockId);
  187.     sendOut.writeLong(0);          // generation stamp
  188.     sendOut.writeInt(0);           // targets in pipeline 
  189.     sendOut.writeBoolean(false);   // recoveryFlag
  190.     Text.writeString(sendOut, "cl");// clientID
  191.     sendOut.writeBoolean(false); // no src node info
  192.     // bad number of targets
  193.     sendOut.writeInt(-1-random.nextInt(oneMil));
  194.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
  195.     sendRecvData("bad targets len while writing block " + newBlockId, true);
  196.     sendBuf.reset();
  197.     recvBuf.reset();
  198.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  199.     sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
  200.     sendOut.writeLong(++newBlockId);
  201.     sendOut.writeLong(0);          // generation stamp
  202.     sendOut.writeInt(0);           // targets in pipeline 
  203.     sendOut.writeBoolean(false);   // recoveryFlag
  204.     Text.writeString(sendOut, "cl");// clientID
  205.     sendOut.writeBoolean(false); // no src node info
  206.     sendOut.writeInt(0);
  207.     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
  208.     sendOut.writeInt((int)512);
  209.     sendOut.writeInt(4);           // size of packet
  210.     sendOut.writeLong(0);          // OffsetInBlock
  211.     sendOut.writeLong(100);        // sequencenumber
  212.     sendOut.writeBoolean(false);   // lastPacketInBlock
  213.     
  214.     // bad data chunk length
  215.     sendOut.writeInt(-1-random.nextInt(oneMil));
  216.     Text.writeString(recvOut, ""); // first bad node
  217.     recvOut.writeLong(100);        // sequencenumber
  218.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
  219.     sendRecvData("negative DATA_CHUNK len while writing block " + newBlockId, 
  220.                  true);
  221.     // test for writing a valid zero size block
  222.     sendBuf.reset();
  223.     recvBuf.reset();
  224.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  225.     sendOut.writeByte((byte)DataTransferProtocol.OP_WRITE_BLOCK);
  226.     sendOut.writeLong(++newBlockId);
  227.     sendOut.writeLong(0);          // generation stamp
  228.     sendOut.writeInt(0);           // targets in pipeline 
  229.     sendOut.writeBoolean(false);   // recoveryFlag
  230.     Text.writeString(sendOut, "cl");// clientID
  231.     sendOut.writeBoolean(false); // no src node info
  232.     sendOut.writeInt(0);
  233.     sendOut.writeByte((byte)DataChecksum.CHECKSUM_CRC32);
  234.     sendOut.writeInt((int)512);    // checksum size
  235.     sendOut.writeInt(8);           // size of packet
  236.     sendOut.writeLong(0);          // OffsetInBlock
  237.     sendOut.writeLong(100);        // sequencenumber
  238.     sendOut.writeBoolean(true);    // lastPacketInBlock
  239.     sendOut.writeInt(0);           // chunk length
  240.     sendOut.writeInt(0);           // zero checksum
  241.     //ok finally write a block with 0 len
  242.     Text.writeString(recvOut, ""); // first bad node
  243.     recvOut.writeLong(100);        // sequencenumber
  244.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);
  245.     sendRecvData("Writing a zero len block blockid " + newBlockId, false);
  246.     
  247.     /* Test OP_READ_BLOCK */
  248.     // bad block id
  249.     sendBuf.reset();
  250.     recvBuf.reset();
  251.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  252.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  253.     newBlockId = firstBlock.getBlockId()-1;
  254.     sendOut.writeLong(newBlockId);
  255.     sendOut.writeLong(firstBlock.getGenerationStamp());
  256.     sendOut.writeLong(0L);
  257.     sendOut.writeLong(fileLen);
  258.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);
  259.     Text.writeString(sendOut, "cl");
  260.     sendRecvData("Wrong block ID " + newBlockId + " for read", false); 
  261.     // negative block start offset
  262.     sendBuf.reset();
  263.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  264.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  265.     sendOut.writeLong(firstBlock.getBlockId());
  266.     sendOut.writeLong(firstBlock.getGenerationStamp());
  267.     sendOut.writeLong(-1L);
  268.     sendOut.writeLong(fileLen);
  269.     Text.writeString(sendOut, "cl");
  270.     sendRecvData("Negative start-offset for read for block " + 
  271.                  firstBlock.getBlockId(), false);
  272.     // bad block start offset
  273.     sendBuf.reset();
  274.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  275.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  276.     sendOut.writeLong(firstBlock.getBlockId());
  277.     sendOut.writeLong(firstBlock.getGenerationStamp());
  278.     sendOut.writeLong(fileLen);
  279.     sendOut.writeLong(fileLen);
  280.     Text.writeString(sendOut, "cl");
  281.     sendRecvData("Wrong start-offset for reading block " +
  282.                  firstBlock.getBlockId(), false);
  283.     
  284.     // negative length is ok. Datanode assumes we want to read the whole block.
  285.     recvBuf.reset();
  286.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_SUCCESS);    
  287.     sendBuf.reset();
  288.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  289.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  290.     sendOut.writeLong(firstBlock.getBlockId());
  291.     sendOut.writeLong(firstBlock.getGenerationStamp());
  292.     sendOut.writeLong(0);
  293.     sendOut.writeLong(-1-random.nextInt(oneMil));
  294.     Text.writeString(sendOut, "cl");
  295.     sendRecvData("Negative length for reading block " +
  296.                  firstBlock.getBlockId(), false);
  297.     
  298.     // length is more than size of block.
  299.     recvBuf.reset();
  300.     recvOut.writeShort((short)DataTransferProtocol.OP_STATUS_ERROR);    
  301.     sendBuf.reset();
  302.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  303.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  304.     sendOut.writeLong(firstBlock.getBlockId());
  305.     sendOut.writeLong(firstBlock.getGenerationStamp());
  306.     sendOut.writeLong(0);
  307.     sendOut.writeLong(fileLen + 1);
  308.     Text.writeString(sendOut, "cl");
  309.     sendRecvData("Wrong length for reading block " +
  310.                  firstBlock.getBlockId(), false);
  311.     
  312.     //At the end of all this, read the file to make sure that succeeds finally.
  313.     sendBuf.reset();
  314.     sendOut.writeShort((short)DataTransferProtocol.DATA_TRANSFER_VERSION);
  315.     sendOut.writeByte((byte)DataTransferProtocol.OP_READ_BLOCK);
  316.     sendOut.writeLong(firstBlock.getBlockId());
  317.     sendOut.writeLong(firstBlock.getGenerationStamp());
  318.     sendOut.writeLong(0);
  319.     sendOut.writeLong(fileLen);
  320.     Text.writeString(sendOut, "cl");
  321.     readFile(fileSys, file, fileLen);
  322.   }
  323. }