BlockSender.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
- /**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package org.apache.hadoop.hdfs.server.datanode;
- import java.io.BufferedInputStream;
- import java.io.DataInputStream;
- import java.io.DataOutputStream;
- import java.io.FileInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.io.OutputStream;
- import java.net.SocketException;
- import java.nio.ByteBuffer;
- import java.nio.channels.FileChannel;
- import java.util.Arrays;
- import org.apache.commons.logging.Log;
- import org.apache.hadoop.fs.ChecksumException;
- import org.apache.hadoop.hdfs.protocol.Block;
- import org.apache.hadoop.hdfs.protocol.FSConstants;
- import org.apache.hadoop.io.IOUtils;
- import org.apache.hadoop.net.SocketOutputStream;
- import org.apache.hadoop.util.DataChecksum;
- import org.apache.hadoop.util.StringUtils;
- /**
- * Reads a block from the disk and sends it to a recipient.
- */
- class BlockSender implements java.io.Closeable, FSConstants {
- public static final Log LOG = DataNode.LOG;
- static final Log ClientTraceLog = DataNode.ClientTraceLog;
-
- private Block block; // the block to read from
- private InputStream blockIn; // data stream
- private long blockInPosition = -1; // updated while using transferTo().
- private DataInputStream checksumIn; // checksum datastream
- private DataChecksum checksum; // checksum stream
- private long offset; // starting position to read
- private long endOffset; // ending position
- private long blockLength;
- private int bytesPerChecksum; // chunk size
- private int checksumSize; // checksum size
- private boolean corruptChecksumOk; // if need to verify checksum
- private boolean chunkOffsetOK; // if need to send chunk offset
- private long seqno; // sequence number of packet
- private boolean transferToAllowed = true;
- private boolean blockReadFully; //set when the whole block is read
- private boolean verifyChecksum; //if true, check is verified while reading
- private BlockTransferThrottler throttler;
- private final String clientTraceFmt; // format of client trace log message
- /**
- * Minimum buffer used while sending data to clients. Used only if
- * transferTo() is enabled. 64KB is not that large. It could be larger, but
- * not sure if there will be much more improvement.
- */
- private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
-
/**
 * Creates a sender that does not write a client trace record.
 * Equivalent to the eight-argument constructor with a null trace format.
 *
 * @throws IOException propagated from the eight-argument constructor
 */
BlockSender(Block block,
            long startOffset,
            long length,
            boolean corruptChecksumOk,
            boolean chunkOffsetOK,
            boolean verifyChecksum,
            DataNode datanode) throws IOException {
  this(block, startOffset, length,
       corruptChecksumOk, chunkOffsetOK, verifyChecksum,
       datanode, null);
}
- BlockSender(Block block, long startOffset, long length,
- boolean corruptChecksumOk, boolean chunkOffsetOK,
- boolean verifyChecksum, DataNode datanode, String clientTraceFmt)
- throws IOException {
- try {
- this.block = block;
- this.chunkOffsetOK = chunkOffsetOK;
- this.corruptChecksumOk = corruptChecksumOk;
- this.verifyChecksum = verifyChecksum;
- this.blockLength = datanode.data.getLength(block);
- this.transferToAllowed = datanode.transferToAllowed;
- this.clientTraceFmt = clientTraceFmt;
- if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
- checksumIn = new DataInputStream(
- new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
- BUFFER_SIZE));
- // read and handle the common header here. For now just a version
- BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
- short version = header.getVersion();
- if (version != FSDataset.METADATA_VERSION) {
- LOG.warn("Wrong version (" + version + ") for metadata file for "
- + block + " ignoring ...");
- }
- checksum = header.getChecksum();
- } else {
- LOG.warn("Could not find metadata file for " + block);
- // This only decides the buffer size. Use BUFFER_SIZE?
- checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_NULL,
- 16 * 1024);
- }
- /* If bytesPerChecksum is very large, then the metadata file
- * is mostly corrupted. For now just truncate bytesPerchecksum to
- * blockLength.
- */
- bytesPerChecksum = checksum.getBytesPerChecksum();
- if (bytesPerChecksum > 10*1024*1024 && bytesPerChecksum > blockLength){
- checksum = DataChecksum.newDataChecksum(checksum.getChecksumType(),
- Math.max((int)blockLength, 10*1024*1024));
- bytesPerChecksum = checksum.getBytesPerChecksum();
- }
- checksumSize = checksum.getChecksumSize();
- if (length < 0) {
- length = blockLength;
- }
- endOffset = blockLength;
- if (startOffset < 0 || startOffset > endOffset
- || (length + startOffset) > endOffset) {
- String msg = " Offset " + startOffset + " and length " + length
- + " don't match block " + block + " ( blockLen " + endOffset + " )";
- LOG.warn(datanode.dnRegistration + ":sendBlock() : " + msg);
- throw new IOException(msg);
- }
-
- offset = (startOffset - (startOffset % bytesPerChecksum));
- if (length >= 0) {
- // Make sure endOffset points to end of a checksumed chunk.
- long tmpLen = startOffset + length;
- if (tmpLen % bytesPerChecksum != 0) {
- tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
- }
- if (tmpLen < endOffset) {
- endOffset = tmpLen;
- }
- }
- // seek to the right offsets
- if (offset > 0) {
- long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
- // note blockInStream is seeked when created below
- if (checksumSkip > 0) {
- // Should we use seek() for checksum file as well?
- IOUtils.skipFully(checksumIn, checksumSkip);
- }
- }
- seqno = 0;
- blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
- } catch (IOException ioe) {
- IOUtils.closeStream(this);
- IOUtils.closeStream(blockIn);
- throw ioe;
- }
- }
- /**
- * close opened files.
- */
- public void close() throws IOException {
- IOException ioe = null;
- // close checksum file
- if(checksumIn!=null) {
- try {
- checksumIn.close();
- } catch (IOException e) {
- ioe = e;
- }
- checksumIn = null;
- }
- // close data file
- if(blockIn!=null) {
- try {
- blockIn.close();
- } catch (IOException e) {
- ioe = e;
- }
- blockIn = null;
- }
- // throw IOException if there is any
- if(ioe!= null) {
- throw ioe;
- }
- }
- /**
- * Converts an IOExcpetion (not subclasses) to SocketException.
- * This is typically done to indicate to upper layers that the error
- * was a socket error rather than often more serious exceptions like
- * disk errors.
- */
- private static IOException ioeToSocketException(IOException ioe) {
- if (ioe.getClass().equals(IOException.class)) {
- // "se" could be a new class in stead of SocketException.
- IOException se = new SocketException("Original Exception : " + ioe);
- se.initCause(ioe);
- /* Change the stacktrace so that original trace is not truncated
- * when printed.*/
- se.setStackTrace(ioe.getStackTrace());
- return se;
- }
- // otherwise just return the same exception.
- return ioe;
- }
- /**
- * Sends upto maxChunks chunks of data.
- *
- * When blockInPosition is >= 0, assumes 'out' is a
- * {@link SocketOutputStream} and tries
- * {@link SocketOutputStream#transferToFully(FileChannel, long, int)} to
- * send data (and updates blockInPosition).
- */
- private int sendChunks(ByteBuffer pkt, int maxChunks, OutputStream out)
- throws IOException {
- // Sends multiple chunks in one packet with a single write().
- int len = Math.min((int) (endOffset - offset),
- bytesPerChecksum*maxChunks);
- if (len == 0) {
- return 0;
- }
- int numChunks = (len + bytesPerChecksum - 1)/bytesPerChecksum;
- int packetLen = len + numChunks*checksumSize + 4;
- pkt.clear();
-
- // write packet header
- pkt.putInt(packetLen);
- pkt.putLong(offset);
- pkt.putLong(seqno);
- pkt.put((byte)((offset + len >= endOffset) ? 1 : 0));
- //why no ByteBuf.putBoolean()?
- pkt.putInt(len);
-
- int checksumOff = pkt.position();
- int checksumLen = numChunks * checksumSize;
- byte[] buf = pkt.array();
-
- if (checksumSize > 0 && checksumIn != null) {
- try {
- checksumIn.readFully(buf, checksumOff, checksumLen);
- } catch (IOException e) {
- LOG.warn(" Could not read or failed to veirfy checksum for data" +
- " at offset " + offset + " for block " + block + " got : "
- + StringUtils.stringifyException(e));
- IOUtils.closeStream(checksumIn);
- checksumIn = null;
- if (corruptChecksumOk) {
- if (checksumOff < checksumLen) {
- // Just fill the array with zeros.
- Arrays.fill(buf, checksumOff, checksumLen, (byte) 0);
- }
- } else {
- throw e;
- }
- }
- }
-
- int dataOff = checksumOff + checksumLen;
-
- if (blockInPosition < 0) {
- //normal transfer
- IOUtils.readFully(blockIn, buf, dataOff, len);
- if (verifyChecksum) {
- int dOff = dataOff;
- int cOff = checksumOff;
- int dLeft = len;
- for (int i=0; i<numChunks; i++) {
- checksum.reset();
- int dLen = Math.min(dLeft, bytesPerChecksum);
- checksum.update(buf, dOff, dLen);
- if (!checksum.compare(buf, cOff)) {
- throw new ChecksumException("Checksum failed at " +
- (offset + len - dLeft), len);
- }
- dLeft -= dLen;
- dOff += dLen;
- cOff += checksumSize;
- }
- }
- //writing is done below (mainly to handle IOException)
- }
-
- try {
- if (blockInPosition >= 0) {
- //use transferTo(). Checks on out and blockIn are already done.
- SocketOutputStream sockOut = (SocketOutputStream)out;
- //first write the packet
- sockOut.write(buf, 0, dataOff);
- // no need to flush. since we know out is not a buffered stream.
- sockOut.transferToFully(((FileInputStream)blockIn).getChannel(),
- blockInPosition, len);
- blockInPosition += len;
- } else {
- // normal transfer
- out.write(buf, 0, dataOff + len);
- }
-
- } catch (IOException e) {
- /* exception while writing to the client (well, with transferTo(),
- * it could also be while reading from the local file).
- */
- throw ioeToSocketException(e);
- }
- if (throttler != null) { // rebalancing so throttle
- throttler.throttle(packetLen);
- }
- return len;
- }
- /**
- * sendBlock() is used to read block and its metadata and stream the data to
- * either a client or to another datanode.
- *
- * @param out stream to which the block is written to
- * @param baseStream optional. if non-null, <code>out</code> is assumed to
- * be a wrapper over this stream. This enables optimizations for
- * sending the data, e.g.
- * {@link SocketOutputStream#transferToFully(FileChannel,
- * long, int)}.
- * @param throttler for sending data.
- * @return total bytes reads, including crc.
- */
- long sendBlock(DataOutputStream out, OutputStream baseStream,
- BlockTransferThrottler throttler) throws IOException {
- if( out == null ) {
- throw new IOException( "out stream is null" );
- }
- this.throttler = throttler;
- long initialOffset = offset;
- long totalRead = 0;
- OutputStream streamForSendChunks = out;
-
- try {
- try {
- checksum.writeHeader(out);
- if ( chunkOffsetOK ) {
- out.writeLong( offset );
- }
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
- }
-
- int maxChunksPerPacket;
- int pktSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
-
- if (transferToAllowed && !verifyChecksum &&
- baseStream instanceof SocketOutputStream &&
- blockIn instanceof FileInputStream) {
-
- FileChannel fileChannel = ((FileInputStream)blockIn).getChannel();
-
- // blockInPosition also indicates sendChunks() uses transferTo.
- blockInPosition = fileChannel.position();
- streamForSendChunks = baseStream;
-
- // assure a mininum buffer size.
- maxChunksPerPacket = (Math.max(BUFFER_SIZE,
- MIN_BUFFER_WITH_TRANSFERTO)
- + bytesPerChecksum - 1)/bytesPerChecksum;
-
- // allocate smaller buffer while using transferTo().
- pktSize += checksumSize * maxChunksPerPacket;
- } else {
- maxChunksPerPacket = Math.max(1,
- (BUFFER_SIZE + bytesPerChecksum - 1)/bytesPerChecksum);
- pktSize += (bytesPerChecksum + checksumSize) * maxChunksPerPacket;
- }
- ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
- while (endOffset > offset) {
- long len = sendChunks(pktBuf, maxChunksPerPacket,
- streamForSendChunks);
- offset += len;
- totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
- checksumSize);
- seqno++;
- }
- try {
- out.writeInt(0); // mark the end of block
- out.flush();
- } catch (IOException e) { //socket error
- throw ioeToSocketException(e);
- }
- } finally {
- if (clientTraceFmt != null) {
- ClientTraceLog.info(String.format(clientTraceFmt, totalRead));
- }
- close();
- }
- blockReadFully = (initialOffset == 0 && offset >= blockLength);
- return totalRead;
- }
-
- boolean isBlockReadFully() {
- return blockReadFully;
- }
- }