DFSClient.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:111k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs;
  19. import org.apache.hadoop.io.*;
  20. import org.apache.hadoop.io.retry.RetryPolicies;
  21. import org.apache.hadoop.io.retry.RetryPolicy;
  22. import org.apache.hadoop.io.retry.RetryProxy;
  23. import org.apache.hadoop.fs.*;
  24. import org.apache.hadoop.fs.permission.FsPermission;
  25. import org.apache.hadoop.ipc.*;
  26. import org.apache.hadoop.net.NetUtils;
  27. import org.apache.hadoop.net.NodeBase;
  28. import org.apache.hadoop.conf.*;
  29. import org.apache.hadoop.hdfs.DistributedFileSystem.DiskStatus;
  30. import org.apache.hadoop.hdfs.protocol.*;
  31. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  32. import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
  33. import org.apache.hadoop.hdfs.server.datanode.DataNode;
  34. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  35. import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
  36. import org.apache.hadoop.security.AccessControlException;
  37. import org.apache.hadoop.security.UnixUserGroupInformation;
  38. import org.apache.hadoop.util.*;
  39. import org.apache.commons.logging.*;
  40. import java.io.*;
  41. import java.net.*;
  42. import java.util.*;
  43. import java.util.zip.CRC32;
  44. import java.util.concurrent.TimeUnit;
  45. import java.util.concurrent.ConcurrentHashMap;
  46. import java.nio.BufferOverflowException;
  47. import java.nio.ByteBuffer;
  48. import javax.net.SocketFactory;
  49. import javax.security.auth.login.LoginException;
  50. /********************************************************
  51.  * DFSClient can connect to a Hadoop Filesystem and 
  52.  * perform basic file tasks.  It uses the ClientProtocol
  53.  * to communicate with a NameNode daemon, and connects 
  54.  * directly to DataNodes to read/write block data.
  55.  *
  56.  * Hadoop DFS users should obtain an instance of 
  57.  * DistributedFileSystem, which uses DFSClient to handle
  58.  * filesystem tasks.
  59.  *
  60.  ********************************************************/
  61. public class DFSClient implements FSConstants, java.io.Closeable {
  62.   public static final Log LOG = LogFactory.getLog(DFSClient.class);
  63.   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
  64.   private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
  65.   public final ClientProtocol namenode;
  66.   final private ClientProtocol rpcNamenode;
  67.   final UnixUserGroupInformation ugi;
  68.   volatile boolean clientRunning = true;
  69.   Random r = new Random();
  70.   final String clientName;
  71.   final LeaseChecker leasechecker = new LeaseChecker();
  72.   private Configuration conf;
  73.   private long defaultBlockSize;
  74.   private short defaultReplication;
  75.   private SocketFactory socketFactory;
  76.   private int socketTimeout;
  77.   private int datanodeWriteTimeout;
  78.   final int writePacketSize;
  79.   private final FileSystem.Statistics stats;
  80.   private int maxBlockAcquireFailures;
  81.     
  82.  
  83.   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
  84.     return createNamenode(NameNode.getAddress(conf), conf);
  85.   }
  86.   public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
  87.       Configuration conf) throws IOException {
  88.     try {
  89.       return createNamenode(createRPCNamenode(nameNodeAddr, conf,
  90.         UnixUserGroupInformation.login(conf, true)));
  91.     } catch (LoginException e) {
  92.       throw (IOException)(new IOException().initCause(e));
  93.     }
  94.   }
  95.   private static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
  96.       Configuration conf, UnixUserGroupInformation ugi) 
  97.     throws IOException {
  98.     return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
  99.         ClientProtocol.versionID, nameNodeAddr, ugi, conf,
  100.         NetUtils.getSocketFactory(conf, ClientProtocol.class));
  101.   }
  102.   private static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
  103.     throws IOException {
  104.     RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
  105.         5, LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
  106.     
  107.     Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
  108.       new HashMap<Class<? extends Exception>, RetryPolicy>();
  109.     remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
  110.     Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
  111.       new HashMap<Class<? extends Exception>, RetryPolicy>();
  112.     exceptionToPolicyMap.put(RemoteException.class, 
  113.         RetryPolicies.retryByRemoteException(
  114.             RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
  115.     RetryPolicy methodPolicy = RetryPolicies.retryByException(
  116.         RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
  117.     Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
  118.     
  119.     methodNameToPolicyMap.put("create", methodPolicy);
  120.     return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
  121.         rpcNamenode, methodNameToPolicyMap);
  122.   }
  123.   static ClientDatanodeProtocol createClientDatanodeProtocolProxy (
  124.       DatanodeID datanodeid, Configuration conf) throws IOException {
  125.     InetSocketAddress addr = NetUtils.createSocketAddr(
  126.       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
  127.     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
  128.       ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
  129.     }
  130.     return (ClientDatanodeProtocol)RPC.getProxy(ClientDatanodeProtocol.class,
  131.         ClientDatanodeProtocol.versionID, addr, conf);
  132.   }
  133.         
  134.   /** 
  135.    * Create a new DFSClient connected to the default namenode.
  136.    */
  137.   public DFSClient(Configuration conf) throws IOException {
  138.     this(NameNode.getAddress(conf), conf, null);
  139.   }
  140.   /** 
  141.    * Create a new DFSClient connected to the given namenode server.
  142.    */
  143.   public DFSClient(InetSocketAddress nameNodeAddr, Configuration conf,
  144.                    FileSystem.Statistics stats)
  145.     throws IOException {
  146.     this.conf = conf;
  147.     this.stats = stats;
  148.     this.socketTimeout = conf.getInt("dfs.socket.timeout", 
  149.                                      HdfsConstants.READ_TIMEOUT);
  150.     this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
  151.                                             HdfsConstants.WRITE_TIMEOUT);
  152.     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
  153.     // dfs.write.packet.size is an internal config variable
  154.     this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024);
  155.     this.maxBlockAcquireFailures = 
  156.                           conf.getInt("dfs.client.max.block.acquire.failures",
  157.                                       MAX_BLOCK_ACQUIRE_FAILURES);
  158.     
  159.     try {
  160.       this.ugi = UnixUserGroupInformation.login(conf, true);
  161.     } catch (LoginException e) {
  162.       throw (IOException)(new IOException().initCause(e));
  163.     }
  164.     this.rpcNamenode = createRPCNamenode(nameNodeAddr, conf, ugi);
  165.     this.namenode = createNamenode(rpcNamenode);
  166.     String taskId = conf.get("mapred.task.id");
  167.     if (taskId != null) {
  168.       this.clientName = "DFSClient_" + taskId; 
  169.     } else {
  170.       this.clientName = "DFSClient_" + r.nextInt();
  171.     }
  172.     defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE);
  173.     defaultReplication = (short) conf.getInt("dfs.replication", 3);
  174.   }
  175.   public DFSClient(InetSocketAddress nameNodeAddr, 
  176.                    Configuration conf) throws IOException {
  177.     this(nameNodeAddr, conf, null);
  178.   }
  179.   private void checkOpen() throws IOException {
  180.     if (!clientRunning) {
  181.       IOException result = new IOException("Filesystem closed");
  182.       throw result;
  183.     }
  184.   }
  185.     
  186.   /**
  187.    * Close the file system, abandoning all of the leases and files being
  188.    * created and close connections to the namenode.
  189.    */
  190.   public synchronized void close() throws IOException {
  191.     if(clientRunning) {
  192.       leasechecker.close();
  193.       clientRunning = false;
  194.       try {
  195.         leasechecker.interruptAndJoin();
  196.       } catch (InterruptedException ie) {
  197.       }
  198.   
  199.       // close connections to the namenode
  200.       RPC.stopProxy(rpcNamenode);
  201.     }
  202.   }
  203.   /**
  204.    * Get the default block size for this cluster
  205.    * @return the default block size in bytes
  206.    */
  207.   public long getDefaultBlockSize() {
  208.     return defaultBlockSize;
  209.   }
  210.     
  211.   public long getBlockSize(String f) throws IOException {
  212.     try {
  213.       return namenode.getPreferredBlockSize(f);
  214.     } catch (IOException ie) {
  215.       LOG.warn("Problem getting block size: " + 
  216.           StringUtils.stringifyException(ie));
  217.       throw ie;
  218.     }
  219.   }
  220.   /**
  221.    * Report corrupt blocks that were discovered by the client.
  222.    */
  223.   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
  224.     namenode.reportBadBlocks(blocks);
  225.   }
  226.   
  227.   public short getDefaultReplication() {
  228.     return defaultReplication;
  229.   }
  230.     
  231.   /**
  232.    *  @deprecated Use getBlockLocations instead
  233.    *
  234.    * Get hints about the location of the indicated block(s).
  235.    * 
  236.    * getHints() returns a list of hostnames that store data for
  237.    * a specific file region.  It returns a set of hostnames for 
  238.    * every block within the indicated region.
  239.    *
  240.    * This function is very useful when writing code that considers
  241.    * data-placement when performing operations.  For example, the
  242.    * MapReduce system tries to schedule tasks on the same machines
  243.    * as the data-block the task processes. 
  244.    */
  245.   @Deprecated
  246.   public String[][] getHints(String src, long start, long length) 
  247.     throws IOException {
  248.     BlockLocation[] blkLocations = getBlockLocations(src, start, length);
  249.     if ((blkLocations == null) || (blkLocations.length == 0)) {
  250.       return new String[0][];
  251.     }
  252.     int blkCount = blkLocations.length;
  253.     String[][]hints = new String[blkCount][];
  254.     for (int i=0; i < blkCount ; i++) {
  255.       String[] hosts = blkLocations[i].getHosts();
  256.       hints[i] = new String[hosts.length];
  257.       hints[i] = hosts;
  258.     }
  259.     return hints;
  260.   }
  261.   private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
  262.       String src, long start, long length) throws IOException {
  263.     try {
  264.       return namenode.getBlockLocations(src, start, length);
  265.     } catch(RemoteException re) {
  266.       throw re.unwrapRemoteException(AccessControlException.class,
  267.                                     FileNotFoundException.class);
  268.     }
  269.   }
  270.   /**
  271.    * Get block location info about file
  272.    * 
  273.    * getBlockLocations() returns a list of hostnames that store 
  274.    * data for a specific file region.  It returns a set of hostnames
  275.    * for every block within the indicated region.
  276.    *
  277.    * This function is very useful when writing code that considers
  278.    * data-placement when performing operations.  For example, the
  279.    * MapReduce system tries to schedule tasks on the same machines
  280.    * as the data-block the task processes. 
  281.    */
  282.   public BlockLocation[] getBlockLocations(String src, long start, 
  283.     long length) throws IOException {
  284.     LocatedBlocks blocks = callGetBlockLocations(namenode, src, start, length);
  285.     if (blocks == null) {
  286.       return new BlockLocation[0];
  287.     }
  288.     int nrBlocks = blocks.locatedBlockCount();
  289.     BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
  290.     int idx = 0;
  291.     for (LocatedBlock blk : blocks.getLocatedBlocks()) {
  292.       assert idx < nrBlocks : "Incorrect index";
  293.       DatanodeInfo[] locations = blk.getLocations();
  294.       String[] hosts = new String[locations.length];
  295.       String[] names = new String[locations.length];
  296.       String[] racks = new String[locations.length];
  297.       for (int hCnt = 0; hCnt < locations.length; hCnt++) {
  298.         hosts[hCnt] = locations[hCnt].getHostName();
  299.         names[hCnt] = locations[hCnt].getName();
  300.         NodeBase node = new NodeBase(names[hCnt], 
  301.                                      locations[hCnt].getNetworkLocation());
  302.         racks[hCnt] = node.toString();
  303.       }
  304.       blkLocations[idx] = new BlockLocation(names, hosts, racks,
  305.                                             blk.getStartOffset(),
  306.                                             blk.getBlockSize());
  307.       idx++;
  308.     }
  309.     return blkLocations;
  310.   }
  311.   public DFSInputStream open(String src) throws IOException {
  312.     return open(src, conf.getInt("io.file.buffer.size", 4096), true, null);
  313.   }
  314.   /**
  315.    * Create an input stream that obtains a nodelist from the
  316.    * namenode, and then reads from all the right places.  Creates
  317.    * inner subclass of InputStream that does the right out-of-band
  318.    * work.
  319.    */
  320.   DFSInputStream open(String src, int buffersize, boolean verifyChecksum,
  321.                       FileSystem.Statistics stats
  322.       ) throws IOException {
  323.     checkOpen();
  324.     //    Get block info from namenode
  325.     return new DFSInputStream(src, buffersize, verifyChecksum);
  326.   }
  327.   /**
  328.    * Create a new dfs file and return an output stream for writing into it. 
  329.    * 
  330.    * @param src stream name
  331.    * @param overwrite do not check for file existence if true
  332.    * @return output stream
  333.    * @throws IOException
  334.    */
  335.   public OutputStream create(String src, 
  336.                              boolean overwrite
  337.                              ) throws IOException {
  338.     return create(src, overwrite, defaultReplication, defaultBlockSize, null);
  339.   }
  340.     
  341.   /**
  342.    * Create a new dfs file and return an output stream for writing into it
  343.    * with write-progress reporting. 
  344.    * 
  345.    * @param src stream name
  346.    * @param overwrite do not check for file existence if true
  347.    * @return output stream
  348.    * @throws IOException
  349.    */
  350.   public OutputStream create(String src, 
  351.                              boolean overwrite,
  352.                              Progressable progress
  353.                              ) throws IOException {
  354.     return create(src, overwrite, defaultReplication, defaultBlockSize, null);
  355.   }
  356.     
  357.   /**
  358.    * Create a new dfs file with the specified block replication 
  359.    * and return an output stream for writing into the file.  
  360.    * 
  361.    * @param src stream name
  362.    * @param overwrite do not check for file existence if true
  363.    * @param replication block replication
  364.    * @return output stream
  365.    * @throws IOException
  366.    */
  367.   public OutputStream create(String src, 
  368.                              boolean overwrite, 
  369.                              short replication,
  370.                              long blockSize
  371.                              ) throws IOException {
  372.     return create(src, overwrite, replication, blockSize, null);
  373.   }
  374.   
  375.   /**
  376.    * Create a new dfs file with the specified block replication 
  377.    * with write-progress reporting and return an output stream for writing
  378.    * into the file.  
  379.    * 
  380.    * @param src stream name
  381.    * @param overwrite do not check for file existence if true
  382.    * @param replication block replication
  383.    * @return output stream
  384.    * @throws IOException
  385.    */
  386.   public OutputStream create(String src, 
  387.                              boolean overwrite, 
  388.                              short replication,
  389.                              long blockSize,
  390.                              Progressable progress
  391.                              ) throws IOException {
  392.     return create(src, overwrite, replication, blockSize, progress,
  393.         conf.getInt("io.file.buffer.size", 4096));
  394.   }
  395.   /**
  396.    * Call
  397.    * {@link #create(String,FsPermission,boolean,short,long,Progressable,int)}
  398.    * with default permission.
  399.    * @see FsPermission#getDefault()
  400.    */
  401.   public OutputStream create(String src,
  402.       boolean overwrite,
  403.       short replication,
  404.       long blockSize,
  405.       Progressable progress,
  406.       int buffersize
  407.       ) throws IOException {
  408.     return create(src, FsPermission.getDefault(),
  409.         overwrite, replication, blockSize, progress, buffersize);
  410.   }
  411.   /**
  412.    * Create a new dfs file with the specified block replication 
  413.    * with write-progress reporting and return an output stream for writing
  414.    * into the file.  
  415.    * 
  416.    * @param src stream name
  417.    * @param permission The permission of the directory being created.
  418.    * If permission == null, use {@link FsPermission#getDefault()}.
  419.    * @param overwrite do not check for file existence if true
  420.    * @param replication block replication
  421.    * @return output stream
  422.    * @throws IOException
  423.    * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
  424.    */
  425.   public OutputStream create(String src, 
  426.                              FsPermission permission,
  427.                              boolean overwrite, 
  428.                              short replication,
  429.                              long blockSize,
  430.                              Progressable progress,
  431.                              int buffersize
  432.                              ) throws IOException {
  433.     checkOpen();
  434.     if (permission == null) {
  435.       permission = FsPermission.getDefault();
  436.     }
  437.     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
  438.     LOG.debug(src + ": masked=" + masked);
  439.     OutputStream result = new DFSOutputStream(src, masked,
  440.         overwrite, replication, blockSize, progress, buffersize,
  441.         conf.getInt("io.bytes.per.checksum", 512));
  442.     leasechecker.put(src, result);
  443.     return result;
  444.   }
  445.   /**
  446.    * Append to an existing HDFS file.  
  447.    * 
  448.    * @param src file name
  449.    * @param buffersize buffer size
  450.    * @param progress for reporting write-progress
  451.    * @return an output stream for writing into the file
  452.    * @throws IOException
  453.    * @see ClientProtocol#append(String, String)
  454.    */
  455.   OutputStream append(String src, int buffersize, Progressable progress
  456.       ) throws IOException {
  457.     checkOpen();
  458.     FileStatus stat = null;
  459.     LocatedBlock lastBlock = null;
  460.     try {
  461.       stat = getFileInfo(src);
  462.       lastBlock = namenode.append(src, clientName);
  463.     } catch(RemoteException re) {
  464.       throw re.unwrapRemoteException(FileNotFoundException.class,
  465.                                      AccessControlException.class,
  466.                                      QuotaExceededException.class);
  467.     }
  468.     OutputStream result = new DFSOutputStream(src, buffersize, progress,
  469.         lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512));
  470.     leasechecker.put(src, result);
  471.     return result;
  472.   }
  473.   /**
  474.    * Set replication for an existing file.
  475.    * 
  476.    * @see ClientProtocol#setReplication(String, short)
  477.    * @param replication
  478.    * @throws IOException
  479.    * @return true is successful or false if file does not exist 
  480.    */
  481.   public boolean setReplication(String src, 
  482.                                 short replication
  483.                                 ) throws IOException {
  484.     try {
  485.       return namenode.setReplication(src, replication);
  486.     } catch(RemoteException re) {
  487.       throw re.unwrapRemoteException(AccessControlException.class,
  488.                                      QuotaExceededException.class);
  489.     }
  490.   }
  491.   /**
  492.    * Rename file or directory.
  493.    * See {@link ClientProtocol#rename(String, String)}. 
  494.    */
  495.   public boolean rename(String src, String dst) throws IOException {
  496.     checkOpen();
  497.     try {
  498.       return namenode.rename(src, dst);
  499.     } catch(RemoteException re) {
  500.       throw re.unwrapRemoteException(AccessControlException.class,
  501.                                      QuotaExceededException.class);
  502.     }
  503.   }
  504.   /**
  505.    * Delete file or directory.
  506.    * See {@link ClientProtocol#delete(String)}. 
  507.    */
  508.   @Deprecated
  509.   public boolean delete(String src) throws IOException {
  510.     checkOpen();
  511.     return namenode.delete(src, true);
  512.   }
  513.   /**
  514.    * delete file or directory.
  515.    * delete contents of the directory if non empty and recursive 
  516.    * set to true
  517.    */
  518.   public boolean delete(String src, boolean recursive) throws IOException {
  519.     checkOpen();
  520.     try {
  521.       return namenode.delete(src, recursive);
  522.     } catch(RemoteException re) {
  523.       throw re.unwrapRemoteException(AccessControlException.class);
  524.     }
  525.   }
  526.   
  527.   /** Implemented using getFileInfo(src)
  528.    */
  529.   public boolean exists(String src) throws IOException {
  530.     checkOpen();
  531.     return getFileInfo(src) != null;
  532.   }
  533.   /** @deprecated Use getFileStatus() instead */
  534.   @Deprecated
  535.   public boolean isDirectory(String src) throws IOException {
  536.     FileStatus fs = getFileInfo(src);
  537.     if (fs != null)
  538.       return fs.isDir();
  539.     else
  540.       throw new FileNotFoundException("File does not exist: " + src);
  541.   }
  542.   /**
  543.    */
  544.   public FileStatus[] listPaths(String src) throws IOException {
  545.     checkOpen();
  546.     try {
  547.       return namenode.getListing(src);
  548.     } catch(RemoteException re) {
  549.       throw re.unwrapRemoteException(AccessControlException.class);
  550.     }
  551.   }
  552.   public FileStatus getFileInfo(String src) throws IOException {
  553.     checkOpen();
  554.     try {
  555.       return namenode.getFileInfo(src);
  556.     } catch(RemoteException re) {
  557.       throw re.unwrapRemoteException(AccessControlException.class);
  558.     }
  559.   }
  560.   /**
  561.    * Get the checksum of a file.
  562.    * @param src The file path
  563.    * @return The checksum 
  564.    * @see DistributedFileSystem#getFileChecksum(Path)
  565.    */
  566.   MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
  567.     checkOpen();
  568.     return getFileChecksum(src, namenode, socketFactory, socketTimeout);    
  569.   }
  570.   /**
  571.    * Get the checksum of a file.
  572.    * @param src The file path
  573.    * @return The checksum 
  574.    */
  575.   public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
  576.       ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout
  577.       ) throws IOException {
  578.     //get all block locations
  579.     final List<LocatedBlock> locatedblocks
  580.         = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE).getLocatedBlocks();
  581.     final DataOutputBuffer md5out = new DataOutputBuffer();
  582.     int bytesPerCRC = 0;
  583.     long crcPerBlock = 0;
  584.     //get block checksum for each block
  585.     for(int i = 0; i < locatedblocks.size(); i++) {
  586.       LocatedBlock lb = locatedblocks.get(i);
  587.       final Block block = lb.getBlock();
  588.       final DatanodeInfo[] datanodes = lb.getLocations();
  589.       
  590.       //try each datanode location of the block
  591.       final int timeout = 3000 * datanodes.length + socketTimeout;
  592.       boolean done = false;
  593.       for(int j = 0; !done && j < datanodes.length; j++) {
  594.         //connect to a datanode
  595.         final Socket sock = socketFactory.createSocket();
  596.         NetUtils.connect(sock, 
  597.                          NetUtils.createSocketAddr(datanodes[j].getName()),
  598.                          timeout);
  599.         sock.setSoTimeout(timeout);
  600.         DataOutputStream out = new DataOutputStream(
  601.             new BufferedOutputStream(NetUtils.getOutputStream(sock), 
  602.                                      DataNode.SMALL_BUFFER_SIZE));
  603.         DataInputStream in = new DataInputStream(NetUtils.getInputStream(sock));
  604.         // get block MD5
  605.         try {
  606.           if (LOG.isDebugEnabled()) {
  607.             LOG.debug("write to " + datanodes[j].getName() + ": "
  608.                 + DataTransferProtocol.OP_BLOCK_CHECKSUM +
  609.                 ", block=" + block);
  610.           }
  611.           out.writeShort(DataTransferProtocol.DATA_TRANSFER_VERSION);
  612.           out.write(DataTransferProtocol.OP_BLOCK_CHECKSUM);
  613.           out.writeLong(block.getBlockId());
  614.           out.writeLong(block.getGenerationStamp());
  615.           out.flush();
  616.          
  617.           final short reply = in.readShort();
  618.           if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
  619.             throw new IOException("Bad response " + reply + " for block "
  620.                 + block + " from datanode " + datanodes[j].getName());
  621.           }
  622.           //read byte-per-checksum
  623.           final int bpc = in.readInt(); 
  624.           if (i == 0) { //first block
  625.             bytesPerCRC = bpc;
  626.           }
  627.           else if (bpc != bytesPerCRC) {
  628.             throw new IOException("Byte-per-checksum not matched: bpc=" + bpc
  629.                 + " but bytesPerCRC=" + bytesPerCRC);
  630.           }
  631.           
  632.           //read crc-per-block
  633.           final long cpb = in.readLong();
  634.           if (locatedblocks.size() > 1 && i == 0) {
  635.             crcPerBlock = cpb;
  636.           }
  637.           //read md5
  638.           final MD5Hash md5 = MD5Hash.read(in);
  639.           md5.write(md5out);
  640.           
  641.           done = true;
  642.           if (LOG.isDebugEnabled()) {
  643.             if (i == 0) {
  644.               LOG.debug("set bytesPerCRC=" + bytesPerCRC
  645.                   + ", crcPerBlock=" + crcPerBlock);
  646.             }
  647.             LOG.debug("got reply from " + datanodes[j].getName()
  648.                 + ": md5=" + md5);
  649.           }
  650.         } catch (IOException ie) {
  651.           LOG.warn("src=" + src + ", datanodes[" + j + "].getName()="
  652.               + datanodes[j].getName(), ie);
  653.         } finally {
  654.           IOUtils.closeStream(in);
  655.           IOUtils.closeStream(out);
  656.           IOUtils.closeSocket(sock);        
  657.         }
  658.       }
  659.       if (!done) {
  660.         throw new IOException("Fail to get block MD5 for " + block);
  661.       }
  662.     }
  663.     //compute file MD5
  664.     final MD5Hash fileMD5 = MD5Hash.digest(md5out.getData()); 
  665.     return new MD5MD5CRC32FileChecksum(bytesPerCRC, crcPerBlock, fileMD5);
  666.   }
  667.   /**
  668.    * Set permissions to a file or directory.
  669.    * @param src path name.
  670.    * @param permission
  671.    * @throws <code>FileNotFoundException</code> is file does not exist.
  672.    */
  673.   public void setPermission(String src, FsPermission permission
  674.                             ) throws IOException {
  675.     checkOpen();
  676.     try {
  677.       namenode.setPermission(src, permission);
  678.     } catch(RemoteException re) {
  679.       throw re.unwrapRemoteException(AccessControlException.class,
  680.                                      FileNotFoundException.class);
  681.     }
  682.   }
  683.   /**
  684.    * Set file or directory owner.
  685.    * @param src path name.
  686.    * @param username user id.
  687.    * @param groupname user group.
  688.    * @throws <code>FileNotFoundException</code> is file does not exist.
  689.    */
  690.   public void setOwner(String src, String username, String groupname
  691.                       ) throws IOException {
  692.     checkOpen();
  693.     try {
  694.       namenode.setOwner(src, username, groupname);
  695.     } catch(RemoteException re) {
  696.       throw re.unwrapRemoteException(AccessControlException.class,
  697.                                      FileNotFoundException.class);
  698.     }
  699.   }
  700.   public DiskStatus getDiskStatus() throws IOException {
  701.     long rawNums[] = namenode.getStats();
  702.     return new DiskStatus(rawNums[0], rawNums[1], rawNums[2]);
  703.   }
  704.   /**
  705.    */
  706.   public long totalRawCapacity() throws IOException {
  707.     long rawNums[] = namenode.getStats();
  708.     return rawNums[0];
  709.   }
  710.   /**
  711.    */
  712.   public long totalRawUsed() throws IOException {
  713.     long rawNums[] = namenode.getStats();
  714.     return rawNums[1];
  715.   }
  716.   /**
  717.    * Returns count of blocks with no good replicas left. Normally should be 
  718.    * zero.
  719.    * @throws IOException
  720.    */ 
  721.   public long getMissingBlocksCount() throws IOException {
  722.     return namenode.getStats()[ClientProtocol.GET_STATS_MISSING_BLOCKS_IDX];
  723.   }
  724.   
  725.   /**
  726.    * Returns count of blocks with one of more replica missing.
  727.    * @throws IOException
  728.    */ 
  729.   public long getUnderReplicatedBlocksCount() throws IOException {
  730.     return namenode.getStats()[ClientProtocol.GET_STATS_UNDER_REPLICATED_IDX];
  731.   }
  732.   
  733.   /**
  734.    * Returns count of blocks with at least one replica marked corrupt. 
  735.    * @throws IOException
  736.    */ 
  737.   public long getCorruptBlocksCount() throws IOException {
  738.     return namenode.getStats()[ClientProtocol.GET_STATS_CORRUPT_BLOCKS_IDX];
  739.   }
  740.   
  741.   public DatanodeInfo[] datanodeReport(DatanodeReportType type)
  742.   throws IOException {
  743.     return namenode.getDatanodeReport(type);
  744.   }
  745.     
  746.   /**
  747.    * Enter, leave or get safe mode.
  748.    * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} 
  749.    * for more details.
  750.    * 
  751.    * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
  752.    */
  753.   public boolean setSafeMode(SafeModeAction action) throws IOException {
  754.     return namenode.setSafeMode(action);
  755.   }
  756.   /**
  757.    * Save namespace image.
  758.    * See {@link ClientProtocol#saveNamespace()} 
  759.    * for more details.
  760.    * 
  761.    * @see ClientProtocol#saveNamespace()
  762.    */
  763.   void saveNamespace() throws AccessControlException, IOException {
  764.     try {
  765.       namenode.saveNamespace();
  766.     } catch(RemoteException re) {
  767.       throw re.unwrapRemoteException(AccessControlException.class);
  768.     }
  769.   }
  770.   /**
  771.    * Refresh the hosts and exclude files.  (Rereads them.)
  772.    * See {@link ClientProtocol#refreshNodes()} 
  773.    * for more details.
  774.    * 
  775.    * @see ClientProtocol#refreshNodes()
  776.    */
  777.   public void refreshNodes() throws IOException {
  778.     namenode.refreshNodes();
  779.   }
  780.   /**
  781.    * Dumps DFS data structures into specified file.
  782.    * See {@link ClientProtocol#metaSave(String)} 
  783.    * for more details.
  784.    * 
  785.    * @see ClientProtocol#metaSave(String)
  786.    */
  787.   public void metaSave(String pathname) throws IOException {
  788.     namenode.metaSave(pathname);
  789.   }
  790.     
  791.   /**
  792.    * @see ClientProtocol#finalizeUpgrade()
  793.    */
  794.   public void finalizeUpgrade() throws IOException {
  795.     namenode.finalizeUpgrade();
  796.   }
  797.   /**
  798.    * @see ClientProtocol#distributedUpgradeProgress(FSConstants.UpgradeAction)
  799.    */
  800.   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
  801.                                                         ) throws IOException {
  802.     return namenode.distributedUpgradeProgress(action);
  803.   }
  804.   /**
  805.    */
  806.   public boolean mkdirs(String src) throws IOException {
  807.     return mkdirs(src, null);
  808.   }
  809.   /**
  810.    * Create a directory (or hierarchy of directories) with the given
  811.    * name and permission.
  812.    *
  813.    * @param src The path of the directory being created
  814.    * @param permission The permission of the directory being created.
  815.    * If permission == null, use {@link FsPermission#getDefault()}.
  816.    * @return True if the operation success.
  817.    * @see ClientProtocol#mkdirs(String, FsPermission)
  818.    */
  819.   public boolean mkdirs(String src, FsPermission permission)throws IOException{
  820.     checkOpen();
  821.     if (permission == null) {
  822.       permission = FsPermission.getDefault();
  823.     }
  824.     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
  825.     LOG.debug(src + ": masked=" + masked);
  826.     try {
  827.       return namenode.mkdirs(src, masked);
  828.     } catch(RemoteException re) {
  829.       throw re.unwrapRemoteException(AccessControlException.class,
  830.                                      QuotaExceededException.class);
  831.     }
  832.   }
  833.   ContentSummary getContentSummary(String src) throws IOException {
  834.     try {
  835.       return namenode.getContentSummary(src);
  836.     } catch(RemoteException re) {
  837.       throw re.unwrapRemoteException(AccessControlException.class,
  838.                                      FileNotFoundException.class);
  839.     }
  840.   }
  841.   /**
  842.    * Sets or resets quotas for a directory.
  843.    * @see org.apache.hadoop.hdfs.protocol.ClientProtocol#setQuota(String, long, long)
  844.    */
  845.   void setQuota(String src, long namespaceQuota, long diskspaceQuota) 
  846.                                                  throws IOException {
  847.     // sanity check
  848.     if ((namespaceQuota <= 0 && namespaceQuota != FSConstants.QUOTA_DONT_SET &&
  849.          namespaceQuota != FSConstants.QUOTA_RESET) ||
  850.         (diskspaceQuota <= 0 && diskspaceQuota != FSConstants.QUOTA_DONT_SET &&
  851.          diskspaceQuota != FSConstants.QUOTA_RESET)) {
  852.       throw new IllegalArgumentException("Invalid values for quota : " +
  853.                                          namespaceQuota + " and " + 
  854.                                          diskspaceQuota);
  855.                                          
  856.     }
  857.     
  858.     try {
  859.       namenode.setQuota(src, namespaceQuota, diskspaceQuota);
  860.     } catch(RemoteException re) {
  861.       throw re.unwrapRemoteException(AccessControlException.class,
  862.                                      FileNotFoundException.class,
  863.                                      QuotaExceededException.class);
  864.     }
  865.   }
  866.   /**
  867.    * set the modification and access time of a file
  868.    * @throws FileNotFoundException if the path is not a file
  869.    */
  870.   public void setTimes(String src, long mtime, long atime) throws IOException {
  871.     checkOpen();
  872.     try {
  873.       namenode.setTimes(src, mtime, atime);
  874.     } catch(RemoteException re) {
  875.       throw re.unwrapRemoteException(AccessControlException.class,
  876.                                      FileNotFoundException.class);
  877.     }
  878.   }
  879.   /**
  880.    * Pick the best node from which to stream the data.
  881.    * Entries in <i>nodes</i> are already in the priority order
  882.    */
  883.   private DatanodeInfo bestNode(DatanodeInfo nodes[], 
  884.                                 AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
  885.                                 throws IOException {
  886.     if (nodes != null) { 
  887.       for (int i = 0; i < nodes.length; i++) {
  888.         if (!deadNodes.containsKey(nodes[i])) {
  889.           return nodes[i];
  890.         }
  891.       }
  892.     }
  893.     throw new IOException("No live nodes contain current block");
  894.   }
  895.   boolean isLeaseCheckerStarted() {
  896.     return leasechecker.daemon != null;
  897.   }
  898.   /** Lease management*/
  899.   class LeaseChecker implements Runnable {
  900.     /** A map from src -> DFSOutputStream of files that are currently being
  901.      * written by this client.
  902.      */
  903.     private final SortedMap<String, OutputStream> pendingCreates
  904.         = new TreeMap<String, OutputStream>();
  905.     private Daemon daemon = null;
  906.     
  907.     synchronized void put(String src, OutputStream out) {
  908.       if (clientRunning) {
  909.         if (daemon == null) {
  910.           daemon = new Daemon(this);
  911.           daemon.start();
  912.         }
  913.         pendingCreates.put(src, out);
  914.       }
  915.     }
  916.     
  917.     synchronized void remove(String src) {
  918.       pendingCreates.remove(src);
  919.     }
  920.     
  921.     void interruptAndJoin() throws InterruptedException {
  922.       Daemon daemonCopy = null;
  923.       synchronized (this) {
  924.         if (daemon != null) {
  925.           daemon.interrupt();
  926.           daemonCopy = daemon;
  927.         }
  928.       }
  929.      
  930.       if (daemonCopy != null) {
  931.         LOG.debug("Wait for lease checker to terminate");
  932.         daemonCopy.join();
  933.       }
  934.     }
  935.     synchronized void close() {
  936.       while (!pendingCreates.isEmpty()) {
  937.         String src = pendingCreates.firstKey();
  938.         OutputStream out = pendingCreates.remove(src);
  939.         if (out != null) {
  940.           try {
  941.             out.close();
  942.           } catch (IOException ie) {
  943.             LOG.error("Exception closing file " + src+ " : " + ie, ie);
  944.           }
  945.         }
  946.       }
  947.     }
  948.     private void renew() throws IOException {
  949.       synchronized(this) {
  950.         if (pendingCreates.isEmpty()) {
  951.           return;
  952.         }
  953.       }
  954.       namenode.renewLease(clientName);
  955.     }
  956.     /**
  957.      * Periodically check in with the namenode and renew all the leases
  958.      * when the lease period is half over.
  959.      */
  960.     public void run() {
  961.       long lastRenewed = 0;
  962.       while (clientRunning && !Thread.interrupted()) {
  963.         if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) {
  964.           try {
  965.             renew();
  966.             lastRenewed = System.currentTimeMillis();
  967.           } catch (IOException ie) {
  968.             LOG.warn("Problem renewing lease for " + clientName, ie);
  969.           }
  970.         }
  971.         try {
  972.           Thread.sleep(1000);
  973.         } catch (InterruptedException ie) {
  974.           if (LOG.isDebugEnabled()) {
  975.             LOG.debug(this + " is interrupted.", ie);
  976.           }
  977.           return;
  978.         }
  979.       }
  980.     }
  981.     /** {@inheritDoc} */
  982.     public String toString() {
  983.       String s = getClass().getSimpleName();
  984.       if (LOG.isTraceEnabled()) {
  985.         return s + "@" + DFSClient.this + ": "
  986.                + StringUtils.stringifyException(new Throwable("for testing"));
  987.       }
  988.       return s;
  989.     }
  990.   }
  991.   /** Utility class to encapsulate data node info and its ip address. */
  992.   private static class DNAddrPair {
  993.     DatanodeInfo info;
  994.     InetSocketAddress addr;
  995.     DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
  996.       this.info = info;
  997.       this.addr = addr;
  998.     }
  999.   }
  1000.   /** This is a wrapper around connection to datadone
  1001.    * and understands checksum, offset etc
  1002.    */
  1003.   public static class BlockReader extends FSInputChecker {
  1004.     private Socket dnSock; //for now just sending checksumOk.
  1005.     private DataInputStream in;
  1006.     private DataChecksum checksum;
  1007.     private long lastChunkOffset = -1;
  1008.     private long lastChunkLen = -1;
  1009.     private long lastSeqNo = -1;
  1010.     private long startOffset;
  1011.     private long firstChunkOffset;
  1012.     private int bytesPerChecksum;
  1013.     private int checksumSize;
  1014.     private boolean gotEOS = false;
  1015.     
  1016.     byte[] skipBuf = null;
  1017.     ByteBuffer checksumBytes = null;
  1018.     int dataLeft = 0;
  1019.     boolean isLastPacket = false;
  1020.     
  1021.     /* FSInputChecker interface */
  1022.     
  1023.     /* same interface as inputStream java.io.InputStream#read()
  1024.      * used by DFSInputStream#read()
  1025.      * This violates one rule when there is a checksum error:
  1026.      * "Read should not modify user buffer before successful read"
  1027.      * because it first reads the data to user buffer and then checks
  1028.      * the checksum.
  1029.      */
  1030.     @Override
  1031.     public synchronized int read(byte[] buf, int off, int len) 
  1032.                                  throws IOException {
  1033.       
  1034.       //for the first read, skip the extra bytes at the front.
  1035.       if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
  1036.         // Skip these bytes. But don't call this.skip()!
  1037.         int toSkip = (int)(startOffset - firstChunkOffset);
  1038.         if ( skipBuf == null ) {
  1039.           skipBuf = new byte[bytesPerChecksum];
  1040.         }
  1041.         if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
  1042.           // should never happen
  1043.           throw new IOException("Could not skip required number of bytes");
  1044.         }
  1045.       }
  1046.       
  1047.       boolean eosBefore = gotEOS;
  1048.       int nRead = super.read(buf, off, len);
  1049.       
  1050.       // if gotEOS was set in the previous read and checksum is enabled :
  1051.       if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
  1052.         //checksum is verified and there are no errors.
  1053.         checksumOk(dnSock);
  1054.       }
  1055.       return nRead;
  1056.     }
  1057.     @Override
  1058.     public synchronized long skip(long n) throws IOException {
  1059.       /* How can we make sure we don't throw a ChecksumException, at least
  1060.        * in majority of the cases?. This one throws. */  
  1061.       if ( skipBuf == null ) {
  1062.         skipBuf = new byte[bytesPerChecksum]; 
  1063.       }
  1064.       long nSkipped = 0;
  1065.       while ( nSkipped < n ) {
  1066.         int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
  1067.         int ret = read(skipBuf, 0, toSkip);
  1068.         if ( ret <= 0 ) {
  1069.           return nSkipped;
  1070.         }
  1071.         nSkipped += ret;
  1072.       }
  1073.       return nSkipped;
  1074.     }
  1075.     @Override
  1076.     public int read() throws IOException {
  1077.       throw new IOException("read() is not expected to be invoked. " +
  1078.                             "Use read(buf, off, len) instead.");
  1079.     }
  1080.     
  1081.     @Override
  1082.     public boolean seekToNewSource(long targetPos) throws IOException {
  1083.       /* Checksum errors are handled outside the BlockReader. 
  1084.        * DFSInputStream does not always call 'seekToNewSource'. In the 
  1085.        * case of pread(), it just tries a different replica without seeking.
  1086.        */ 
  1087.       return false;
  1088.     }
  1089.     
  1090.     @Override
  1091.     public void seek(long pos) throws IOException {
  1092.       throw new IOException("Seek() is not supported in BlockInputChecker");
  1093.     }
  1094.     @Override
  1095.     protected long getChunkPosition(long pos) {
  1096.       throw new RuntimeException("getChunkPosition() is not supported, " +
  1097.                                  "since seek is not required");
  1098.     }
  1099.     
  1100.     /**
  1101.      * Makes sure that checksumBytes has enough capacity 
  1102.      * and limit is set to the number of checksum bytes needed 
  1103.      * to be read.
  1104.      */
  1105.     private void adjustChecksumBytes(int dataLen) {
  1106.       int requiredSize = 
  1107.         ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
  1108.       if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
  1109.         checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
  1110.       } else {
  1111.         checksumBytes.clear();
  1112.       }
  1113.       checksumBytes.limit(requiredSize);
  1114.     }
  1115.     
  1116.     @Override
  1117.     protected synchronized int readChunk(long pos, byte[] buf, int offset, 
  1118.                                          int len, byte[] checksumBuf) 
  1119.                                          throws IOException {
  1120.       // Read one chunk.
  1121.       
  1122.       if ( gotEOS ) {
  1123.         if ( startOffset < 0 ) {
  1124.           //This is mainly for debugging. can be removed.
  1125.           throw new IOException( "BlockRead: already got EOS or an error" );
  1126.         }
  1127.         startOffset = -1;
  1128.         return -1;
  1129.       }
  1130.       
  1131.       // Read one DATA_CHUNK.
  1132.       long chunkOffset = lastChunkOffset;
  1133.       if ( lastChunkLen > 0 ) {
  1134.         chunkOffset += lastChunkLen;
  1135.       }
  1136.       
  1137.       if ( (pos + firstChunkOffset) != chunkOffset ) {
  1138.         throw new IOException("Mismatch in pos : " + pos + " + " + 
  1139.                               firstChunkOffset + " != " + chunkOffset);
  1140.       }
  1141.       // Read next packet if the previous packet has been read completely.
  1142.       if (dataLeft <= 0) {
  1143.         //Read packet headers.
  1144.         int packetLen = in.readInt();
  1145.         long offsetInBlock = in.readLong();
  1146.         long seqno = in.readLong();
  1147.         boolean lastPacketInBlock = in.readBoolean();
  1148.       
  1149.         if (LOG.isDebugEnabled()) {
  1150.           LOG.debug("DFSClient readChunk got seqno " + seqno +
  1151.                     " offsetInBlock " + offsetInBlock +
  1152.                     " lastPacketInBlock " + lastPacketInBlock +
  1153.                     " packetLen " + packetLen);
  1154.         }
  1155.         
  1156.         int dataLen = in.readInt();
  1157.       
  1158.         // Sanity check the lengths
  1159.         if ( dataLen < 0 || 
  1160.              ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
  1161.              (seqno != (lastSeqNo + 1)) ) {
  1162.              throw new IOException("BlockReader: error in packet header" +
  1163.                                    "(chunkOffset : " + chunkOffset + 
  1164.                                    ", dataLen : " + dataLen +
  1165.                                    ", seqno : " + seqno + 
  1166.                                    " (last: " + lastSeqNo + "))");
  1167.         }
  1168.         
  1169.         lastSeqNo = seqno;
  1170.         isLastPacket = lastPacketInBlock;
  1171.         dataLeft = dataLen;
  1172.         adjustChecksumBytes(dataLen);
  1173.         if (dataLen > 0) {
  1174.           IOUtils.readFully(in, checksumBytes.array(), 0,
  1175.                             checksumBytes.limit());
  1176.         }
  1177.       }
  1178.       int chunkLen = Math.min(dataLeft, bytesPerChecksum);
  1179.       
  1180.       if ( chunkLen > 0 ) {
  1181.         // len should be >= chunkLen
  1182.         IOUtils.readFully(in, buf, offset, chunkLen);
  1183.         checksumBytes.get(checksumBuf, 0, checksumSize);
  1184.       }
  1185.       
  1186.       dataLeft -= chunkLen;
  1187.       lastChunkOffset = chunkOffset;
  1188.       lastChunkLen = chunkLen;
  1189.       
  1190.       if ((dataLeft == 0 && isLastPacket) || chunkLen == 0) {
  1191.         gotEOS = true;
  1192.       }
  1193.       if ( chunkLen == 0 ) {
  1194.         return -1;
  1195.       }
  1196.       
  1197.       return chunkLen;
  1198.     }
  1199.     
  1200.     private BlockReader( String file, long blockId, DataInputStream in, 
  1201.                          DataChecksum checksum, boolean verifyChecksum,
  1202.                          long startOffset, long firstChunkOffset, 
  1203.                          Socket dnSock ) {
  1204.       super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
  1205.             1, verifyChecksum,
  1206.             checksum.getChecksumSize() > 0? checksum : null, 
  1207.             checksum.getBytesPerChecksum(),
  1208.             checksum.getChecksumSize());
  1209.       
  1210.       this.dnSock = dnSock;
  1211.       this.in = in;
  1212.       this.checksum = checksum;
  1213.       this.startOffset = Math.max( startOffset, 0 );
  1214.       this.firstChunkOffset = firstChunkOffset;
  1215.       lastChunkOffset = firstChunkOffset;
  1216.       lastChunkLen = -1;
  1217.       bytesPerChecksum = this.checksum.getBytesPerChecksum();
  1218.       checksumSize = this.checksum.getChecksumSize();
  1219.     }
  1220.     public static BlockReader newBlockReader(Socket sock, String file, long blockId, 
  1221.         long genStamp, long startOffset, long len, int bufferSize) throws IOException {
  1222.       return newBlockReader(sock, file, blockId, genStamp, startOffset, len, bufferSize,
  1223.           true);
  1224.     }
  1225.     /** Java Doc required */
  1226.     public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
  1227.                                        long genStamp,
  1228.                                        long startOffset, long len,
  1229.                                        int bufferSize, boolean verifyChecksum)
  1230.                                        throws IOException {
  1231.       return newBlockReader(sock, file, blockId, genStamp, startOffset,
  1232.                             len, bufferSize, verifyChecksum, "");
  1233.     }
  1234.     public static BlockReader newBlockReader( Socket sock, String file,
  1235.                                        long blockId, 
  1236.                                        long genStamp,
  1237.                                        long startOffset, long len,
  1238.                                        int bufferSize, boolean verifyChecksum,
  1239.                                        String clientName)
  1240.                                        throws IOException {
  1241.       // in and out will be closed when sock is closed (by the caller)
  1242.       DataOutputStream out = new DataOutputStream(
  1243.         new BufferedOutputStream(NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT)));
  1244.       //write the header.
  1245.       out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
  1246.       out.write( DataTransferProtocol.OP_READ_BLOCK );
  1247.       out.writeLong( blockId );
  1248.       out.writeLong( genStamp );
  1249.       out.writeLong( startOffset );
  1250.       out.writeLong( len );
  1251.       Text.writeString(out, clientName);
  1252.       out.flush();
  1253.       
  1254.       //
  1255.       // Get bytes in block, set streams
  1256.       //
  1257.       DataInputStream in = new DataInputStream(
  1258.           new BufferedInputStream(NetUtils.getInputStream(sock), 
  1259.                                   bufferSize));
  1260.       
  1261.       if ( in.readShort() != DataTransferProtocol.OP_STATUS_SUCCESS ) {
  1262.         throw new IOException("Got error in response to OP_READ_BLOCK " +
  1263.                               "for file " + file + 
  1264.                               " for block " + blockId);
  1265.       }
  1266.       DataChecksum checksum = DataChecksum.newDataChecksum( in );
  1267.       //Warning when we get CHECKSUM_NULL?
  1268.       
  1269.       // Read the first chunk offset.
  1270.       long firstChunkOffset = in.readLong();
  1271.       
  1272.       if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
  1273.           firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
  1274.         throw new IOException("BlockReader: error in first chunk offset (" +
  1275.                               firstChunkOffset + ") startOffset is " + 
  1276.                               startOffset + " for file " + file);
  1277.       }
  1278.       return new BlockReader( file, blockId, in, checksum, verifyChecksum,
  1279.                               startOffset, firstChunkOffset, sock );
  1280.     }
  1281.     @Override
  1282.     public synchronized void close() throws IOException {
  1283.       startOffset = -1;
  1284.       checksum = null;
  1285.       // in will be closed when its Socket is closed.
  1286.     }
  1287.     
  1288.     /** kind of like readFully(). Only reads as much as possible.
  1289.      * And allows use of protected readFully().
  1290.      */
  1291.     public int readAll(byte[] buf, int offset, int len) throws IOException {
  1292.       return readFully(this, buf, offset, len);
  1293.     }
  1294.     
  1295.     /* When the reader reaches end of a block and there are no checksum
  1296.      * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
  1297.      * checksum was verified and there was no error.
  1298.      */ 
  1299.     private void checksumOk(Socket sock) {
  1300.       try {
  1301.         OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
  1302.         byte buf[] = { (DataTransferProtocol.OP_STATUS_CHECKSUM_OK >>> 8) & 0xff,
  1303.                        (DataTransferProtocol.OP_STATUS_CHECKSUM_OK) & 0xff };
  1304.         out.write(buf);
  1305.         out.flush();
  1306.       } catch (IOException e) {
  1307.         // its ok not to be able to send this.
  1308.         LOG.debug("Could not write to datanode " + sock.getInetAddress() +
  1309.                   ": " + e.getMessage());
  1310.       }
  1311.     }
  1312.   }
  1313.     
  1314.   /****************************************************************
  1315.    * DFSInputStream provides bytes from a named file.  It handles 
  1316.    * negotiation of the namenode and various datanodes as necessary.
  1317.    ****************************************************************/
  1318.   class DFSInputStream extends FSInputStream {
  1319.     private Socket s = null;
  1320.     private boolean closed = false;
  1321.     private String src;
  1322.     private long prefetchSize = 10 * defaultBlockSize;
  1323.     private BlockReader blockReader = null;
  1324.     private boolean verifyChecksum;
  1325.     private LocatedBlocks locatedBlocks = null;
  1326.     private DatanodeInfo currentNode = null;
  1327.     private Block currentBlock = null;
  1328.     private long pos = 0;
  1329.     private long blockEnd = -1;
  1330.     private int failures = 0;
  1331.     /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
  1332.      * parallel accesses to DFSInputStream (through ptreads) properly */
  1333.     private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
  1334.                new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
  1335.     private int buffersize = 1;
  1336.     
  1337.     private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
  1338.     
  1339.     void addToDeadNodes(DatanodeInfo dnInfo) {
  1340.       deadNodes.put(dnInfo, dnInfo);
  1341.     }
  1342.     
  1343.     DFSInputStream(String src, int buffersize, boolean verifyChecksum
  1344.                    ) throws IOException {
  1345.       this.verifyChecksum = verifyChecksum;
  1346.       this.buffersize = buffersize;
  1347.       this.src = src;
  1348.       prefetchSize = conf.getLong("dfs.read.prefetch.size", prefetchSize);
  1349.       openInfo();
  1350.     }
  1351.     /**
  1352.      * Grab the open-file info from namenode
  1353.      */
  1354.     synchronized void openInfo() throws IOException {
  1355.       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
  1356.       if (newInfo == null) {
  1357.         throw new IOException("Cannot open filename " + src);
  1358.       }
  1359.       if (locatedBlocks != null) {
  1360.         Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
  1361.         Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
  1362.         while (oldIter.hasNext() && newIter.hasNext()) {
  1363.           if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
  1364.             throw new IOException("Blocklist for " + src + " has changed!");
  1365.           }
  1366.         }
  1367.       }
  1368.       this.locatedBlocks = newInfo;
  1369.       this.currentNode = null;
  1370.     }
  1371.     
  1372.     public synchronized long getFileLength() {
  1373.       return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
  1374.     }
  1375.     /**
  1376.      * Returns the datanode from which the stream is currently reading.
  1377.      */
  1378.     public DatanodeInfo getCurrentDatanode() {
  1379.       return currentNode;
  1380.     }
  1381.     /**
  1382.      * Returns the block containing the target position. 
  1383.      */
  1384.     public Block getCurrentBlock() {
  1385.       return currentBlock;
  1386.     }
  1387.     /**
  1388.      * Return collection of blocks that has already been located.
  1389.      */
  1390.     synchronized List<LocatedBlock> getAllBlocks() throws IOException {
  1391.       return getBlockRange(0, this.getFileLength());
  1392.     }
  1393.     /**
  1394.      * Get block at the specified position.
  1395.      * Fetch it from the namenode if not cached.
  1396.      * 
  1397.      * @param offset
  1398.      * @return located block
  1399.      * @throws IOException
  1400.      */
  1401.     private LocatedBlock getBlockAt(long offset) throws IOException {
  1402.       assert (locatedBlocks != null) : "locatedBlocks is null";
  1403.       // search cached blocks first
  1404.       int targetBlockIdx = locatedBlocks.findBlock(offset);
  1405.       if (targetBlockIdx < 0) { // block is not cached
  1406.         targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
  1407.         // fetch more blocks
  1408.         LocatedBlocks newBlocks;
  1409.         newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
  1410.         assert (newBlocks != null) : "Could not find target position " + offset;
  1411.         locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
  1412.       }
  1413.       LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
  1414.       // update current position
  1415.       this.pos = offset;
  1416.       this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
  1417.       this.currentBlock = blk.getBlock();
  1418.       return blk;
  1419.     }
  1420.     /**
  1421.      * Get blocks in the specified range.
  1422.      * Fetch them from the namenode if not cached.
  1423.      * 
  1424.      * @param offset
  1425.      * @param length
  1426.      * @return consequent segment of located blocks
  1427.      * @throws IOException
  1428.      */
  1429.     private synchronized List<LocatedBlock> getBlockRange(long offset, 
  1430.                                                           long length) 
  1431.                                                         throws IOException {
  1432.       assert (locatedBlocks != null) : "locatedBlocks is null";
  1433.       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
  1434.       // search cached blocks first
  1435.       int blockIdx = locatedBlocks.findBlock(offset);
  1436.       if (blockIdx < 0) { // block is not cached
  1437.         blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
  1438.       }
  1439.       long remaining = length;
  1440.       long curOff = offset;
  1441.       while(remaining > 0) {
  1442.         LocatedBlock blk = null;
  1443.         if(blockIdx < locatedBlocks.locatedBlockCount())
  1444.           blk = locatedBlocks.get(blockIdx);
  1445.         if (blk == null || curOff < blk.getStartOffset()) {
  1446.           LocatedBlocks newBlocks;
  1447.           newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
  1448.           locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
  1449.           continue;
  1450.         }
  1451.         assert curOff >= blk.getStartOffset() : "Block not found";
  1452.         blockRange.add(blk);
  1453.         long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
  1454.         remaining -= bytesRead;
  1455.         curOff += bytesRead;
  1456.         blockIdx++;
  1457.       }
  1458.       return blockRange;
  1459.     }
  1460.     /**
  1461.      * Open a DataInputStream to a DataNode so that it can be read from.
  1462.      * We get block ID and the IDs of the destinations at startup, from the namenode.
  1463.      */
  1464.     private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
  1465.       if (target >= getFileLength()) {
  1466.         throw new IOException("Attempted to read past end of file");
  1467.       }
  1468.       if ( blockReader != null ) {
  1469.         blockReader.close(); 
  1470.         blockReader = null;
  1471.       }
  1472.       
  1473.       if (s != null) {
  1474.         s.close();
  1475.         s = null;
  1476.       }
  1477.       //
  1478.       // Compute desired block
  1479.       //
  1480.       LocatedBlock targetBlock = getBlockAt(target);
  1481.       assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
  1482.       long offsetIntoBlock = target - targetBlock.getStartOffset();
  1483.       //
  1484.       // Connect to best DataNode for desired Block, with potential offset
  1485.       //
  1486.       DatanodeInfo chosenNode = null;
  1487.       while (s == null) {
  1488.         DNAddrPair retval = chooseDataNode(targetBlock);
  1489.         chosenNode = retval.info;
  1490.         InetSocketAddress targetAddr = retval.addr;
  1491.         try {
  1492.           s = socketFactory.createSocket();
  1493.           NetUtils.connect(s, targetAddr, socketTimeout);
  1494.           s.setSoTimeout(socketTimeout);
  1495.           Block blk = targetBlock.getBlock();
  1496.           
  1497.           blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
  1498.               blk.getGenerationStamp(),
  1499.               offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
  1500.               buffersize, verifyChecksum, clientName);
  1501.           return chosenNode;
  1502.         } catch (IOException ex) {
  1503.           // Put chosen node into dead list, continue
  1504.           LOG.debug("Failed to connect to " + targetAddr + ":" 
  1505.                     + StringUtils.stringifyException(ex));
  1506.           addToDeadNodes(chosenNode);
  1507.           if (s != null) {
  1508.             try {
  1509.               s.close();
  1510.             } catch (IOException iex) {
  1511.             }                        
  1512.           }
  1513.           s = null;
  1514.         }
  1515.       }
  1516.       return chosenNode;
  1517.     }
  1518.     /**
  1519.      * Close it down!
  1520.      */
  1521.     @Override
  1522.     public synchronized void close() throws IOException {
  1523.       if (closed) {
  1524.         return;
  1525.       }
  1526.       checkOpen();
  1527.       
  1528.       if ( blockReader != null ) {
  1529.         blockReader.close();
  1530.         blockReader = null;
  1531.       }
  1532.       
  1533.       if (s != null) {
  1534.         s.close();
  1535.         s = null;
  1536.       }
  1537.       super.close();
  1538.       closed = true;
  1539.     }
  1540.     @Override
  1541.     public synchronized int read() throws IOException {
  1542.       int ret = read( oneByteBuf, 0, 1 );
  1543.       return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
  1544.     }
  1545.     /* This is a used by regular read() and handles ChecksumExceptions.
  1546.      * name readBuffer() is chosen to imply similarity to readBuffer() in
  1547.      * ChecksuFileSystem
  1548.      */ 
  1549.     private synchronized int readBuffer(byte buf[], int off, int len) 
  1550.                                                     throws IOException {
  1551.       IOException ioe;
  1552.       
  1553.       /* we retry current node only once. So this is set to true only here.
  1554.        * Intention is to handle one common case of an error that is not a
  1555.        * failure on datanode or client : when DataNode closes the connection
  1556.        * since client is idle. If there are other cases of "non-errors" then
  1557.        * then a datanode might be retried by setting this to true again.
  1558.        */
  1559.       boolean retryCurrentNode = true;
  1560.  
  1561.       while (true) {
  1562.         // retry as many times as seekToNewSource allows.
  1563.         try {
  1564.           return blockReader.read(buf, off, len);
  1565.         } catch ( ChecksumException ce ) {
  1566.           LOG.warn("Found Checksum error for " + currentBlock + " from " +
  1567.                    currentNode.getName() + " at " + ce.getPos());          
  1568.           reportChecksumFailure(src, currentBlock, currentNode);
  1569.           ioe = ce;
  1570.           retryCurrentNode = false;
  1571.         } catch ( IOException e ) {
  1572.           if (!retryCurrentNode) {
  1573.             LOG.warn("Exception while reading from " + currentBlock +
  1574.                      " of " + src + " from " + currentNode + ": " +
  1575.                      StringUtils.stringifyException(e));
  1576.           }
  1577.           ioe = e;
  1578.         }
  1579.         boolean sourceFound = false;
  1580.         if (retryCurrentNode) {
  1581.           /* possibly retry the same node so that transient errors don't
  1582.            * result in application level failures (e.g. Datanode could have
  1583.            * closed the connection because the client is idle for too long).
  1584.            */ 
  1585.           sourceFound = seekToBlockSource(pos);
  1586.         } else {
  1587.           addToDeadNodes(currentNode);
  1588.           sourceFound = seekToNewSource(pos);
  1589.         }
  1590.         if (!sourceFound) {
  1591.           throw ioe;
  1592.         }
  1593.         retryCurrentNode = false;
  1594.       }
  1595.     }
  1596.     /**
  1597.      * Read the entire buffer.
  1598.      */
  1599.     @Override
  1600.     public synchronized int read(byte buf[], int off, int len) throws IOException {
  1601.       checkOpen();
  1602.       if (closed) {
  1603.         throw new IOException("Stream closed");
  1604.       }
  1605.       if (pos < getFileLength()) {
  1606.         int retries = 2;
  1607.         while (retries > 0) {
  1608.           try {
  1609.             if (pos > blockEnd) {
  1610.               currentNode = blockSeekTo(pos);
  1611.             }
  1612.             int realLen = Math.min(len, (int) (blockEnd - pos + 1));
  1613.             int result = readBuffer(buf, off, realLen);
  1614.             
  1615.             if (result >= 0) {
  1616.               pos += result;
  1617.             } else {
  1618.               // got a EOS from reader though we expect more data on it.
  1619.               throw new IOException("Unexpected EOS from the reader");
  1620.             }
  1621.             if (stats != null && result != -1) {
  1622.               stats.incrementBytesRead(result);
  1623.             }
  1624.             return result;
  1625.           } catch (ChecksumException ce) {
  1626.             throw ce;            
  1627.           } catch (IOException e) {
  1628.             if (retries == 1) {
  1629.               LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
  1630.             }
  1631.             blockEnd = -1;
  1632.             if (currentNode != null) { addToDeadNodes(currentNode); }
  1633.             if (--retries == 0) {
  1634.               throw e;
  1635.             }
  1636.           }
  1637.         }
  1638.       }
  1639.       return -1;
  1640.     }
  1641.         
  1642.     private DNAddrPair chooseDataNode(LocatedBlock block)
  1643.       throws IOException {
  1644.       while (true) {
  1645.         DatanodeInfo[] nodes = block.getLocations();
  1646.         try {
  1647.           DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
  1648.           InetSocketAddress targetAddr = 
  1649.                             NetUtils.createSocketAddr(chosenNode.getName());
  1650.           return new DNAddrPair(chosenNode, targetAddr);
  1651.         } catch (IOException ie) {
  1652.           String blockInfo = block.getBlock() + " file=" + src;
  1653.           if (failures >= maxBlockAcquireFailures) {
  1654.             throw new IOException("Could not obtain block: " + blockInfo);
  1655.           }
  1656.           
  1657.           if (nodes == null || nodes.length == 0) {
  1658.             LOG.info("No node available for block: " + blockInfo);
  1659.           }
  1660.           LOG.info("Could not obtain block " + block.getBlock() + " from any node:  " + ie);
  1661.           try {
  1662.             Thread.sleep(3000);
  1663.           } catch (InterruptedException iex) {
  1664.           }
  1665.           deadNodes.clear(); //2nd option is to remove only nodes[blockId]
  1666.           openInfo();
  1667.           failures++;
  1668.           continue;
  1669.         }
  1670.       }
  1671.     } 
  1672.         
  1673.     private void fetchBlockByteRange(LocatedBlock block, long start,
  1674.                                      long end, byte[] buf, int offset) throws IOException {
  1675.       //
  1676.       // Connect to best DataNode for desired Block, with potential offset
  1677.       //
  1678.       Socket dn = null;
  1679.       int numAttempts = block.getLocations().length;
  1680.       IOException ioe = null;
  1681.       
  1682.       while (dn == null && numAttempts-- > 0 ) {
  1683.         DNAddrPair retval = chooseDataNode(block);
  1684.         DatanodeInfo chosenNode = retval.info;
  1685.         InetSocketAddress targetAddr = retval.addr;
  1686.         BlockReader reader = null;
  1687.             
  1688.         try {
  1689.           dn = socketFactory.createSocket();
  1690.           NetUtils.connect(dn, targetAddr, socketTimeout);
  1691.           dn.setSoTimeout(socketTimeout);
  1692.               
  1693.           int len = (int) (end - start + 1);
  1694.               
  1695.           reader = BlockReader.newBlockReader(dn, src, 
  1696.                                               block.getBlock().getBlockId(),
  1697.                                               block.getBlock().getGenerationStamp(),
  1698.                                               start, len, buffersize, 
  1699.                                               verifyChecksum, clientName);
  1700.           int nread = reader.readAll(buf, offset, len);
  1701.           if (nread != len) {
  1702.             throw new IOException("truncated return from reader.read(): " +
  1703.                                   "excpected " + len + ", got " + nread);
  1704.           }
  1705.           return;
  1706.         } catch (ChecksumException e) {
  1707.           ioe = e;
  1708.           LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
  1709.                    src + " at " + block.getBlock() + ":" + 
  1710.                    e.getPos() + " from " + chosenNode.getName());
  1711.           reportChecksumFailure(src, block.getBlock(), chosenNode);
  1712.         } catch (IOException e) {
  1713.           ioe = e;
  1714.           LOG.warn("Failed to connect to " + targetAddr + 
  1715.                    " for file " + src + 
  1716.                    " for block " + block.getBlock().getBlockId() + ":"  +
  1717.                    StringUtils.stringifyException(e));
  1718.         } finally {
  1719.           IOUtils.closeStream(reader);
  1720.           IOUtils.closeSocket(dn);
  1721.           dn = null;
  1722.         }
  1723.         // Put chosen node into dead list, continue
  1724.         addToDeadNodes(chosenNode);
  1725.       }
  1726.       throw (ioe == null) ? new IOException("Could not read data") : ioe;
  1727.     }
  1728.     /**
  1729.      * Read bytes starting from the specified position.
  1730.      * 
  1731.      * @param position start read from this position
  1732.      * @param buffer read buffer
  1733.      * @param offset offset into buffer
  1734.      * @param length number of bytes to read
  1735.      * 
  1736.      * @return actual number of bytes read
  1737.      */
  1738.     @Override
  1739.     public int read(long position, byte[] buffer, int offset, int length)
  1740.       throws IOException {
  1741.       // sanity checks
  1742.       checkOpen();
  1743.       if (closed) {
  1744.         throw new IOException("Stream closed");
  1745.       }
  1746.       long filelen = getFileLength();
  1747.       if ((position < 0) || (position >= filelen)) {
  1748.         return -1;
  1749.       }
  1750.       int realLen = length;
  1751.       if ((position + length) > filelen) {
  1752.         realLen = (int)(filelen - position);
  1753.       }
  1754.       
  1755.       // determine the block and byte range within the block
  1756.       // corresponding to position and realLen
  1757.       List<LocatedBlock> blockRange = getBlockRange(position, realLen);
  1758.       int remaining = realLen;
  1759.       for (LocatedBlock blk : blockRange) {
  1760.         long targetStart = position - blk.getStartOffset();
  1761.         long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
  1762.         fetchBlockByteRange(blk, targetStart, 
  1763.                             targetStart + bytesToRead - 1, buffer, offset);
  1764.         remaining -= bytesToRead;
  1765.         position += bytesToRead;
  1766.         offset += bytesToRead;
  1767.       }
  1768.       assert remaining == 0 : "Wrong number of bytes read.";
  1769.       if (stats != null) {
  1770.         stats.incrementBytesRead(realLen);
  1771.       }
  1772.       return realLen;
  1773.     }
  1774.      
  1775.     @Override
  1776.     public long skip(long n) throws IOException {
  1777.       if ( n > 0 ) {
  1778.         long curPos = getPos();
  1779.         long fileLen = getFileLength();
  1780.         if( n+curPos > fileLen ) {
  1781.           n = fileLen - curPos;
  1782.         }
  1783.         seek(curPos+n);
  1784.         return n;
  1785.       }
  1786.       return n < 0 ? -1 : 0;
  1787.     }
  1788.     /**
  1789.      * Seek to a new arbitrary location
  1790.      */
  1791.     @Override
  1792.     public synchronized void seek(long targetPos) throws IOException {
  1793.       if (targetPos > getFileLength()) {
  1794.         throw new IOException("Cannot seek after EOF");
  1795.       }
  1796.       boolean done = false;
  1797.       if (pos <= targetPos && targetPos <= blockEnd) {
  1798.         //
  1799.         // If this seek is to a positive position in the current
  1800.         // block, and this piece of data might already be lying in
  1801.         // the TCP buffer, then just eat up the intervening data.
  1802.         //
  1803.         int diff = (int)(targetPos - pos);
  1804.         if (diff <= TCP_WINDOW_SIZE) {
  1805.           try {
  1806.             pos += blockReader.skip(diff);
  1807.             if (pos == targetPos) {
  1808.               done = true;
  1809.             }
  1810.           } catch (IOException e) {//make following read to retry
  1811.             LOG.debug("Exception while seek to " + targetPos + " from "
  1812.                       + currentBlock +" of " + src + " from " + currentNode + 
  1813.                       ": " + StringUtils.stringifyException(e));
  1814.           }
  1815.         }
  1816.       }
  1817.       if (!done) {
  1818.         pos = targetPos;
  1819.         blockEnd = -1;
  1820.       }
  1821.     }
  1822.     /**
  1823.      * Same as {@link #seekToNewSource(long)} except that it does not exclude
  1824.      * the current datanode and might connect to the same node.
  1825.      */
  1826.     private synchronized boolean seekToBlockSource(long targetPos)
  1827.                                                    throws IOException {
  1828.       currentNode = blockSeekTo(targetPos);
  1829.       return true;
  1830.     }
  1831.     
  1832.     /**
  1833.      * Seek to given position on a node other than the current node.  If
  1834.      * a node other than the current node is found, then returns true. 
  1835.      * If another node could not be found, then returns false.
  1836.      */
  1837.     @Override
  1838.     public synchronized boolean seekToNewSource(long targetPos) throws IOException {
  1839.       boolean markedDead = deadNodes.containsKey(currentNode);
  1840.       addToDeadNodes(currentNode);
  1841.       DatanodeInfo oldNode = currentNode;
  1842.       DatanodeInfo newNode = blockSeekTo(targetPos);
  1843.       if (!markedDead) {
  1844.         /* remove it from deadNodes. blockSeekTo could have cleared 
  1845.          * deadNodes and added currentNode again. Thats ok. */
  1846.         deadNodes.remove(oldNode);
  1847.       }
  1848.       if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
  1849.         currentNode = newNode;
  1850.         return true;
  1851.       } else {
  1852.         return false;
  1853.       }
  1854.     }
  1855.         
  1856.     /**
  1857.      */
  1858.     @Override
  1859.     public synchronized long getPos() throws IOException {
  1860.       return pos;
  1861.     }
  1862.     /**
  1863.      */
  1864.     @Override
  1865.     public synchronized int available() throws IOException {
  1866.       if (closed) {
  1867.         throw new IOException("Stream closed");
  1868.       }
  1869.       return (int) (getFileLength() - pos);
  1870.     }
  1871.     /**
  1872.      * We definitely don't support marks
  1873.      */
  1874.     @Override
  1875.     public boolean markSupported() {
  1876.       return false;
  1877.     }
  1878.     @Override
  1879.     public void mark(int readLimit) {
  1880.     }
  1881.     @Override
  1882.     public void reset() throws IOException {
  1883.       throw new IOException("Mark/reset not supported");
  1884.     }
  1885.   }
  1886.     
  1887.   static class DFSDataInputStream extends FSDataInputStream {
  1888.     DFSDataInputStream(DFSInputStream in)
  1889.       throws IOException {
  1890.       super(in);
  1891.     }
  1892.       
  1893.     /**
  1894.      * Returns the datanode from which the stream is currently reading.
  1895.      */
  1896.     public DatanodeInfo getCurrentDatanode() {
  1897.       return ((DFSInputStream)in).getCurrentDatanode();
  1898.     }
  1899.       
  1900.     /**
  1901.      * Returns the block containing the target position. 
  1902.      */
  1903.     public Block getCurrentBlock() {
  1904.       return ((DFSInputStream)in).getCurrentBlock();
  1905.     }
  1906.     /**
  1907.      * Return collection of blocks that has already been located.
  1908.      */
  1909.     synchronized List<LocatedBlock> getAllBlocks() throws IOException {
  1910.       return ((DFSInputStream)in).getAllBlocks();
  1911.     }
  1912.   }
  1913.   /****************************************************************
  1914.    * DFSOutputStream creates files from a stream of bytes.
  1915.    *
  1916.    * The client application writes data that is cached internally by
  1917.    * this stream. Data is broken up into packets, each packet is
  1918.    * typically 64K in size. A packet comprises of chunks. Each chunk
  1919.    * is typically 512 bytes and has an associated checksum with it.
  1920.    *
  1921.    * When a client application fills up the currentPacket, it is
  1922.    * enqueued into dataQueue.  The DataStreamer thread picks up
  1923.    * packets from the dataQueue, sends it to the first datanode in
  1924.    * the pipeline and moves it from the dataQueue to the ackQueue.
  1925.    * The ResponseProcessor receives acks from the datanodes. When an
  1926.    * successful ack for a packet is received from all datanodes, the
  1927.    * ResponseProcessor removes the corresponding packet from the
  1928.    * ackQueue.
  1929.    *
  1930.    * In case of error, all outstanding packets and moved from
  1931.    * ackQueue. A new pipeline is setup by eliminating the bad
  1932.    * datanode from the original pipeline. The DataStreamer now
  1933.    * starts sending packets from the dataQueue.
  1934.   ****************************************************************/
  1935.   class DFSOutputStream extends FSOutputSummer implements Syncable {
  1936.     private Socket s;
  1937.     boolean closed = false;
  1938.   
  1939.     private String src;
  1940.     private DataOutputStream blockStream;
  1941.     private DataInputStream blockReplyStream;
  1942.     private Block block;
  1943.     final private long blockSize;
  1944.     private DataChecksum checksum;
  1945.     private LinkedList<Packet> dataQueue = new LinkedList<Packet>();
  1946.     private LinkedList<Packet> ackQueue = new LinkedList<Packet>();
  1947.     private Packet currentPacket = null;
  1948.     private int maxPackets = 80; // each packet 64K, total 5MB
  1949.     // private int maxPackets = 1000; // each packet 64K, total 64MB
  1950.     private DataStreamer streamer = new DataStreamer();;
  1951.     private ResponseProcessor response = null;
  1952.     private long currentSeqno = 0;
  1953.     private long bytesCurBlock = 0; // bytes writen in current block
  1954.     private int packetSize = 0; // write packet size, including the header.
  1955.     private int chunksPerPacket = 0;
  1956.     private DatanodeInfo[] nodes = null; // list of targets for current block
  1957.     private volatile boolean hasError = false;
  1958.     private volatile int errorIndex = 0;
  1959.     private volatile IOException lastException = null;
  1960.     private long artificialSlowdown = 0;
  1961.     private long lastFlushOffset = -1; // offset when flush was invoked
  1962.     private boolean persistBlocks = false; // persist blocks on namenode
  1963.     private int recoveryErrorCount = 0; // number of times block recovery failed
  1964.     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
  1965.     private volatile boolean appendChunk = false;   // appending to existing partial block
  1966.     private long initialFileSize = 0; // at time of file open
  1967.     private void setLastException(IOException e) {
  1968.       if (lastException == null) {
  1969.         lastException = e;
  1970.       }
  1971.     }
  1972.     
  1973.     private class Packet {
  1974.       ByteBuffer buffer;           // only one of buf and buffer is non-null
  1975.       byte[]  buf;
  1976.       long    seqno;               // sequencenumber of buffer in block
  1977.       long    offsetInBlock;       // offset in block
  1978.       boolean lastPacketInBlock;   // is this the last packet in block?
  1979.       int     numChunks;           // number of chunks currently in packet
  1980.       int     maxChunks;           // max chunks in packet
  1981.       int     dataStart;
  1982.       int     dataPos;
  1983.       int     checksumStart;
  1984.       int     checksumPos;      
  1985.   
  1986.       // create a new packet
  1987.       Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
  1988.         this.lastPacketInBlock = false;
  1989.         this.numChunks = 0;
  1990.         this.offsetInBlock = offsetInBlock;
  1991.         this.seqno = currentSeqno;
  1992.         currentSeqno++;
  1993.         
  1994.         buffer = null;
  1995.         buf = new byte[pktSize];
  1996.         
  1997.         checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
  1998.         checksumPos = checksumStart;
  1999.         dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
  2000.         dataPos = dataStart;
  2001.         maxChunks = chunksPerPkt;
  2002.       }
  2003.       void writeData(byte[] inarray, int off, int len) {
  2004.         if ( dataPos + len > buf.length) {
  2005.           throw new BufferOverflowException();
  2006.         }
  2007.         System.arraycopy(inarray, off, buf, dataPos, len);
  2008.         dataPos += len;
  2009.       }
  2010.   
  2011.       void  writeChecksum(byte[] inarray, int off, int len) {
  2012.         if (checksumPos + len > dataStart) {
  2013.           throw new BufferOverflowException();
  2014.         }
  2015.         System.arraycopy(inarray, off, buf, checksumPos, len);
  2016.         checksumPos += len;
  2017.       }
  2018.       
  2019.       /**
  2020.        * Returns ByteBuffer that contains one full packet, including header.
  2021.        */
  2022.       ByteBuffer getBuffer() {
  2023.         /* Once this is called, no more data can be added to the packet.
  2024.          * setting 'buf' to null ensures that.
  2025.          * This is called only when the packet is ready to be sent.
  2026.          */
  2027.         if (buffer != null) {
  2028.           return buffer;
  2029.         }
  2030.         
  2031.         //prepare the header and close any gap between checksum and data.
  2032.         
  2033.         int dataLen = dataPos - dataStart;
  2034.         int checksumLen = checksumPos - checksumStart;
  2035.         
  2036.         if (checksumPos != dataStart) {
  2037.           /* move the checksum to cover the gap.
  2038.            * This can happen for the last packet.
  2039.            */
  2040.           System.arraycopy(buf, checksumStart, buf, 
  2041.                            dataStart - checksumLen , checksumLen); 
  2042.         }
  2043.         
  2044.         int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
  2045.         
  2046.         //normally dataStart == checksumPos, i.e., offset is zero.
  2047.         buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
  2048.                                  DataNode.PKT_HEADER_LEN + pktLen);
  2049.         buf = null;
  2050.         buffer.mark();
  2051.         
  2052.         /* write the header and data length.
  2053.          * The format is described in comment before DataNode.BlockSender
  2054.          */
  2055.         buffer.putInt(pktLen);  // pktSize
  2056.         buffer.putLong(offsetInBlock); 
  2057.         buffer.putLong(seqno);
  2058.         buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
  2059.         //end of pkt header
  2060.         buffer.putInt(dataLen); // actual data length, excluding checksum.
  2061.         
  2062.         buffer.reset();
  2063.         return buffer;
  2064.       }
  2065.     }
  2066.   
  2067.     //
  2068.     // The DataStreamer class is responsible for sending data packets to the
  2069.     // datanodes in the pipeline. It retrieves a new blockid and block locations
  2070.     // from the namenode, and starts streaming packets to the pipeline of
  2071.     // Datanodes. Every packet has a sequence number associated with
  2072.     // it. When all the packets for a block are sent out and acks for each
  2073.     // if them are received, the DataStreamer closes the current block.
  2074.     //
  2075.     private class DataStreamer extends Daemon {
  2076.       private volatile boolean closed = false;
  2077.   
  2078.       public void run() {
  2079.         while (!closed && clientRunning) {
  2080.           // if the Responder encountered an error, shutdown Responder
  2081.           if (hasError && response != null) {
  2082.             try {
  2083.               response.close();
  2084.               response.join();
  2085.               response = null;
  2086.             } catch (InterruptedException  e) {
  2087.             }
  2088.           }
  2089.           Packet one = null;
  2090.           synchronized (dataQueue) {
  2091.             // process IO errors if any
  2092.             boolean doSleep = processDatanodeError(hasError, false);
  2093.             // wait for a packet to be sent.
  2094.             while ((!closed && !hasError && clientRunning 
  2095.                    && dataQueue.size() == 0) || doSleep) {
  2096.               try {
  2097.                 dataQueue.wait(1000);
  2098.               } catch (InterruptedException  e) {
  2099.               }
  2100.               doSleep = false;
  2101.             }
  2102.             if (closed || hasError || dataQueue.size() == 0 || !clientRunning) {
  2103.               continue;
  2104.             }
  2105.             try {
  2106.               // get packet to be sent.
  2107.               one = dataQueue.getFirst();
  2108.               long offsetInBlock = one.offsetInBlock;
  2109.   
  2110.               // get new block from namenode.
  2111.               if (blockStream == null) {
  2112.                 LOG.debug("Allocating new block");
  2113.                 nodes = nextBlockOutputStream(src); 
  2114.                 this.setName("DataStreamer for file " + src +
  2115.                              " block " + block);
  2116.                 response = new ResponseProcessor(nodes);
  2117.                 response.start();
  2118.               }
  2119.               if (offsetInBlock >= blockSize) {
  2120.                 throw new IOException("BlockSize " + blockSize +
  2121.                                       " is smaller than data size. " +
  2122.                                       " Offset of packet in block " + 
  2123.                                       offsetInBlock +
  2124.                                       " Aborting file " + src);
  2125.               }
  2126.               ByteBuffer buf = one.getBuffer();
  2127.               
  2128.               // move packet from dataQueue to ackQueue
  2129.               dataQueue.removeFirst();
  2130.               dataQueue.notifyAll();
  2131.               synchronized (ackQueue) {
  2132.                 ackQueue.addLast(one);
  2133.                 ackQueue.notifyAll();
  2134.               } 
  2135.               
  2136.               // write out data to remote datanode
  2137.               blockStream.write(buf.array(), buf.position(), buf.remaining());
  2138.               
  2139.               if (one.lastPacketInBlock) {
  2140.                 blockStream.writeInt(0); // indicate end-of-block 
  2141.               }
  2142.               blockStream.flush();
  2143.               if (LOG.isDebugEnabled()) {
  2144.                 LOG.debug("DataStreamer block " + block +
  2145.                           " wrote packet seqno:" + one.seqno +
  2146.                           " size:" + buf.remaining() +
  2147.                           " offsetInBlock:" + one.offsetInBlock + 
  2148.                           " lastPacketInBlock:" + one.lastPacketInBlock);
  2149.               }
  2150.             } catch (Throwable e) {
  2151.               LOG.warn("DataStreamer Exception: " + 
  2152.                        StringUtils.stringifyException(e));
  2153.               if (e instanceof IOException) {
  2154.                 setLastException((IOException)e);
  2155.               }
  2156.               hasError = true;
  2157.             }
  2158.           }
  2159.           if (closed || hasError || !clientRunning) {
  2160.             continue;
  2161.           }
  2162.           // Is this block full?
  2163.           if (one.lastPacketInBlock) {
  2164.             synchronized (ackQueue) {
  2165.               while (!hasError && ackQueue.size() != 0 && clientRunning) {
  2166.                 try {
  2167.                   ackQueue.wait();   // wait for acks to arrive from datanodes
  2168.                 } catch (InterruptedException  e) {
  2169.                 }
  2170.               }
  2171.             }
  2172.             LOG.debug("Closing old block " + block);
  2173.             this.setName("DataStreamer for file " + src);
  2174.             response.close();        // ignore all errors in Response
  2175.             try {
  2176.               response.join();
  2177.               response = null;
  2178.             } catch (InterruptedException  e) {
  2179.             }
  2180.             if (closed || hasError || !clientRunning) {
  2181.               continue;
  2182.             }
  2183.             synchronized (dataQueue) {
  2184.               try {
  2185.                 blockStream.close();
  2186.                 blockReplyStream.close();
  2187.               } catch (IOException e) {
  2188.               }
  2189.               nodes = null;
  2190.               response = null;
  2191.               blockStream = null;
  2192.               blockReplyStream = null;
  2193.             }
  2194.           }
  2195.           if (progress != null) { progress.progress(); }
  2196.           // This is used by unit test to trigger race conditions.
  2197.           if (artificialSlowdown != 0 && clientRunning) {
  2198.             try { 
  2199.               Thread.sleep(artificialSlowdown); 
  2200.             } catch (InterruptedException e) {}
  2201.           }
  2202.         }
  2203.       }
  2204.       // shutdown thread
  2205.       void close() {
  2206.         closed = true;
  2207.         synchronized (dataQueue) {
  2208.           dataQueue.notifyAll();
  2209.         }
  2210.         synchronized (ackQueue) {
  2211.           ackQueue.notifyAll();
  2212.         }
  2213.         this.interrupt();
  2214.       }
  2215.     }
  2216.                   
  2217.     //
  2218.     // Processes reponses from the datanodes.  A packet is removed 
  2219.     // from the ackQueue when its response arrives.
  2220.     //
  2221.     private class ResponseProcessor extends Thread {
  2222.       private volatile boolean closed = false;
  2223.       private DatanodeInfo[] targets = null;
  2224.       private boolean lastPacketInBlock = false;
  2225.       ResponseProcessor (DatanodeInfo[] targets) {
  2226.         this.targets = targets;
  2227.       }
  2228.       public void run() {
  2229.         this.setName("ResponseProcessor for block " + block);
  2230.   
  2231.         while (!closed && clientRunning && !lastPacketInBlock) {
  2232.           // process responses from datanodes.
  2233.           try {
  2234.             // verify seqno from datanode
  2235.             long seqno = blockReplyStream.readLong();
  2236.             LOG.debug("DFSClient received ack for seqno " + seqno);
  2237.             if (seqno == -1) {
  2238.               continue;
  2239.             } else if (seqno == -2) {
  2240.               // no nothing
  2241.             } else {
  2242.               Packet one = null;
  2243.               synchronized (ackQueue) {
  2244.                 one = ackQueue.getFirst();
  2245.               }
  2246.               if (one.seqno != seqno) {
  2247.                 throw new IOException("Responseprocessor: Expecting seqno " + 
  2248.                                       " for block " + block +
  2249.                                       one.seqno + " but received " + seqno);
  2250.               }
  2251.               lastPacketInBlock = one.lastPacketInBlock;
  2252.             }
  2253.             // processes response status from all datanodes.
  2254.             for (int i = 0; i < targets.length && clientRunning; i++) {
  2255.               short reply = blockReplyStream.readShort();
  2256.               if (reply != DataTransferProtocol.OP_STATUS_SUCCESS) {
  2257.                 errorIndex = i; // first bad datanode
  2258.                 throw new IOException("Bad response " + reply +
  2259.                                       " for block " + block +
  2260.                                       " from datanode " + 
  2261.                                       targets[i].getName());
  2262.               }
  2263.             }
  2264.             synchronized (ackQueue) {
  2265.               ackQueue.removeFirst();
  2266.               ackQueue.notifyAll();
  2267.             }
  2268.           } catch (Exception e) {
  2269.             if (!closed) {
  2270.               hasError = true;
  2271.               if (e instanceof IOException) {
  2272.                 setLastException((IOException)e);
  2273.               }
  2274.               LOG.warn("DFSOutputStream ResponseProcessor exception " + 
  2275.                        " for block " + block +
  2276.                         StringUtils.stringifyException(e));
  2277.               closed = true;
  2278.             }
  2279.           }
  2280.           synchronized (dataQueue) {
  2281.             dataQueue.notifyAll();
  2282.           }
  2283.           synchronized (ackQueue) {
  2284.             ackQueue.notifyAll();
  2285.           }
  2286.         }
  2287.       }
  2288.       void close() {
  2289.         closed = true;
  2290.         this.interrupt();
  2291.       }
  2292.     }
  2293.     // If this stream has encountered any errors so far, shutdown 
  2294.     // threads and mark stream as closed. Returns true if we should
  2295.     // sleep for a while after returning from this call.
  2296.     //
  2297.     private boolean processDatanodeError(boolean hasError, boolean isAppend) {
  2298.       if (!hasError) {
  2299.         return false;
  2300.       }
  2301.       if (response != null) {
  2302.         LOG.info("Error Recovery for block " + block +
  2303.                  " waiting for responder to exit. ");
  2304.         return true;
  2305.       }
  2306.       if (errorIndex >= 0) {
  2307.         LOG.warn("Error Recovery for block " + block
  2308.             + " bad datanode[" + errorIndex + "] "
  2309.             + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
  2310.       }
  2311.       if (blockStream != null) {
  2312.         try {
  2313.           blockStream.close();
  2314.           blockReplyStream.close();
  2315.         } catch (IOException e) {
  2316.         }
  2317.       }
  2318.       blockStream = null;
  2319.       blockReplyStream = null;
  2320.       // move packets from ack queue to front of the data queue
  2321.       synchronized (ackQueue) {
  2322.         dataQueue.addAll(0, ackQueue);
  2323.         ackQueue.clear();
  2324.       }
  2325.       boolean success = false;
  2326.       while (!success && clientRunning) {
  2327.         DatanodeInfo[] newnodes = null;
  2328.         if (nodes == null) {
  2329.           String msg = "Could not get block locations. " +
  2330.                                           "Source file "" + src
  2331.                                           + "" - Aborting...";
  2332.           LOG.warn(msg);
  2333.           setLastException(new IOException(msg));
  2334.           closed = true;
  2335.           if (streamer != null) streamer.close();
  2336.           return false;
  2337.         }
  2338.         StringBuilder pipelineMsg = new StringBuilder();
  2339.         for (int j = 0; j < nodes.length; j++) {
  2340.           pipelineMsg.append(nodes[j].getName());
  2341.           if (j < nodes.length - 1) {
  2342.             pipelineMsg.append(", ");
  2343.           }
  2344.         }
  2345.         // remove bad datanode from list of datanodes.
  2346.         // If errorIndex was not set (i.e. appends), then do not remove 
  2347.         // any datanodes
  2348.         // 
  2349.         if (errorIndex < 0) {
  2350.           newnodes = nodes;
  2351.         } else {
  2352.           if (nodes.length <= 1) {
  2353.             lastException = new IOException("All datanodes " + pipelineMsg + 
  2354.                                             " are bad. Aborting...");
  2355.             closed = true;
  2356.             if (streamer != null) streamer.close();
  2357.             return false;
  2358.           }
  2359.           LOG.warn("Error Recovery for block " + block +
  2360.                    " in pipeline " + pipelineMsg + 
  2361.                    ": bad datanode " + nodes[errorIndex].getName());
  2362.           newnodes =  new DatanodeInfo[nodes.length-1];
  2363.           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
  2364.           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
  2365.               newnodes.length-errorIndex);
  2366.         }
  2367.         // Tell the primary datanode to do error recovery 
  2368.         // by stamping appropriate generation stamps.
  2369.         //
  2370.         LocatedBlock newBlock = null;
  2371.         ClientDatanodeProtocol primary =  null;
  2372.         DatanodeInfo primaryNode = null;
  2373.         try {
  2374.           // Pick the "least" datanode as the primary datanode to avoid deadlock.
  2375.           primaryNode = Collections.min(Arrays.asList(newnodes));
  2376.           primary = createClientDatanodeProtocolProxy(primaryNode, conf);
  2377.           newBlock = primary.recoverBlock(block, isAppend, newnodes);
  2378.         } catch (IOException e) {
  2379.           recoveryErrorCount++;
  2380.           if (recoveryErrorCount > maxRecoveryErrorCount) {
  2381.             if (nodes.length > 1) {
  2382.               // if the primary datanode failed, remove it from the list.
  2383.               // The original bad datanode is left in the list because it is
  2384.               // conservative to remove only one datanode in one iteration.
  2385.               for (int j = 0; j < nodes.length; j++) {
  2386.                 if (nodes[j].equals(primaryNode)) {
  2387.                   errorIndex = j; // forget original bad node.
  2388.                 }
  2389.               }
  2390.               // remove primary node from list
  2391.               newnodes =  new DatanodeInfo[nodes.length-1];
  2392.               System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
  2393.               System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
  2394.                                newnodes.length-errorIndex);
  2395.               nodes = newnodes;
  2396.               LOG.warn("Error Recovery for block " + block + " failed " +
  2397.                        " because recovery from primary datanode " +
  2398.                        primaryNode + " failed " + recoveryErrorCount +
  2399.                        " times. " + " Pipeline was " + pipelineMsg +
  2400.                        ". Marking primary datanode as bad.");
  2401.               recoveryErrorCount = 0; 
  2402.               errorIndex = -1;
  2403.               return true;          // sleep when we return from here
  2404.             }
  2405.             String emsg = "Error Recovery for block " + block + " failed " +
  2406.                           " because recovery from primary datanode " +
  2407.                           primaryNode + " failed " + recoveryErrorCount + 
  2408.                           " times. "  + " Pipeline was " + pipelineMsg +
  2409.                           ". Aborting...";
  2410.             LOG.warn(emsg);
  2411.             lastException = new IOException(emsg);
  2412.             closed = true;
  2413.             if (streamer != null) streamer.close();
  2414.             return false;       // abort with IOexception
  2415.           } 
  2416.           LOG.warn("Error Recovery for block " + block + " failed " +
  2417.                    " because recovery from primary datanode " +
  2418.                    primaryNode + " failed " + recoveryErrorCount +
  2419.                    " times. "  + " Pipeline was " + pipelineMsg +
  2420.                    ". Will retry...");
  2421.           return true;          // sleep when we return from here
  2422.         } finally {
  2423.           RPC.stopProxy(primary);
  2424.         }
  2425.         recoveryErrorCount = 0; // block recovery successful
  2426.         // If the block recovery generated a new generation stamp, use that
  2427.         // from now on.  Also, setup new pipeline
  2428.         //
  2429.         if (newBlock != null) {
  2430.           block = newBlock.getBlock();
  2431.           nodes = newBlock.getLocations();
  2432.         }
  2433.         this.hasError = false;
  2434.         lastException = null;
  2435.         errorIndex = 0;
  2436.         success = createBlockOutputStream(nodes, clientName, true);
  2437.       }
  2438.       response = new ResponseProcessor(nodes);
  2439.       response.start();
  2440.       return false; // do not sleep, continue processing
  2441.     }
  2442.     private void isClosed() throws IOException {
  2443.       if (closed && lastException != null) {
  2444.           throw lastException;
  2445.       }
  2446.     }
  2447.     //
  2448.     // returns the list of targets, if any, that is being currently used.
  2449.     //
  2450.     DatanodeInfo[] getPipeline() {
  2451.       synchronized (dataQueue) {
  2452.         if (nodes == null) {
  2453.           return null;
  2454.         }
  2455.         DatanodeInfo[] value = new DatanodeInfo[nodes.length];
  2456.         for (int i = 0; i < nodes.length; i++) {
  2457.           value[i] = nodes[i];
  2458.         }
  2459.         return value;
  2460.       }
  2461.     }
  2462.     private Progressable progress;
  2463.     private DFSOutputStream(String src, long blockSize, Progressable progress,
  2464.         int bytesPerChecksum) throws IOException {
  2465.       super(new CRC32(), bytesPerChecksum, 4);
  2466.       this.src = src;
  2467.       this.blockSize = blockSize;
  2468.       this.progress = progress;
  2469.       if (progress != null) {
  2470.         LOG.debug("Set non-null progress callback on DFSOutputStream "+src);
  2471.       }
  2472.       
  2473.       if ( bytesPerChecksum < 1 || blockSize % bytesPerChecksum != 0) {
  2474.         throw new IOException("io.bytes.per.checksum(" + bytesPerChecksum +
  2475.                               ") and blockSize(" + blockSize + 
  2476.                               ") do not match. " + "blockSize should be a " +
  2477.                               "multiple of io.bytes.per.checksum");
  2478.                               
  2479.       }
  2480.       checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
  2481.                                               bytesPerChecksum);
  2482.     }
  2483.     /**
  2484.      * Create a new output stream to the given DataNode.
  2485.      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
  2486.      */
  2487.     DFSOutputStream(String src, FsPermission masked, boolean overwrite,
  2488.         short replication, long blockSize, Progressable progress,
  2489.         int buffersize, int bytesPerChecksum) throws IOException {
  2490.       this(src, blockSize, progress, bytesPerChecksum);
  2491.       computePacketChunkSize(writePacketSize, bytesPerChecksum);
  2492.       try {
  2493.         namenode.create(
  2494.             src, masked, clientName, overwrite, replication, blockSize);
  2495.       } catch(RemoteException re) {
  2496.         throw re.unwrapRemoteException(AccessControlException.class,
  2497.                                        QuotaExceededException.class);
  2498.       }
  2499.       streamer.start();
  2500.     }
  2501.   
  2502.     /**
  2503.      * Create a new output stream to the given DataNode.
  2504.      * @see ClientProtocol#create(String, FsPermission, String, boolean, short, long)
  2505.      */
  2506.     DFSOutputStream(String src, int buffersize, Progressable progress,
  2507.         LocatedBlock lastBlock, FileStatus stat,
  2508.         int bytesPerChecksum) throws IOException {
  2509.       this(src, stat.getBlockSize(), progress, bytesPerChecksum);
  2510.       initialFileSize = stat.getLen(); // length of file when opened
  2511.       //
  2512.       // The last partial block of the file has to be filled.
  2513.       //
  2514.       if (lastBlock != null) {
  2515.         block = lastBlock.getBlock();
  2516.         long usedInLastBlock = stat.getLen() % blockSize;
  2517.         int freeInLastBlock = (int)(blockSize - usedInLastBlock);
  2518.         // calculate the amount of free space in the pre-existing 
  2519.         // last crc chunk
  2520.         int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
  2521.         int freeInCksum = bytesPerChecksum - usedInCksum;
  2522.         // if there is space in the last block, then we have to 
  2523.         // append to that block
  2524.         if (freeInLastBlock > blockSize) {
  2525.           throw new IOException("The last block for file " + 
  2526.                                 src + " is full.");
  2527.         }
  2528.         // indicate that we are appending to an existing block
  2529.         bytesCurBlock = lastBlock.getBlockSize();
  2530.         if (usedInCksum > 0 && freeInCksum > 0) {
  2531.           // if there is space in the last partial chunk, then 
  2532.           // setup in such a way that the next packet will have only 
  2533.           // one chunk that fills up the partial chunk.
  2534.           //
  2535.           computePacketChunkSize(0, freeInCksum);
  2536.           resetChecksumChunk(freeInCksum);
  2537.           this.appendChunk = true;
  2538.         } else {
  2539.           // if the remaining space in the block is smaller than 
  2540.           // that expected size of of a packet, then create 
  2541.           // smaller size packet.
  2542.           //
  2543.           computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
  2544.                                  bytesPerChecksum);
  2545.         }
  2546.         // setup pipeline to append to the last block XXX retries??
  2547.         nodes = lastBlock.getLocations();
  2548.         errorIndex = -1;   // no errors yet.
  2549.         if (nodes.length < 1) {
  2550.           throw new IOException("Unable to retrieve blocks locations " +
  2551.                                 " for last block " + block +
  2552.                                 "of file " + src);
  2553.                         
  2554.         }
  2555.         processDatanodeError(true, true);
  2556.         streamer.start();
  2557.       }
  2558.       else {
  2559.         computePacketChunkSize(writePacketSize, bytesPerChecksum);
  2560.         streamer.start();
  2561.       }
  2562.     }
  2563.     private void computePacketChunkSize(int psize, int csize) {
  2564.       int chunkSize = csize + checksum.getChecksumSize();
  2565.       int n = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
  2566.       chunksPerPacket = Math.max((psize - n + chunkSize-1)/chunkSize, 1);
  2567.       packetSize = n + chunkSize*chunksPerPacket;
  2568.       if (LOG.isDebugEnabled()) {
  2569.         LOG.debug("computePacketChunkSize: src=" + src +
  2570.                   ", chunkSize=" + chunkSize +
  2571.                   ", chunksPerPacket=" + chunksPerPacket +
  2572.                   ", packetSize=" + packetSize);
  2573.       }
  2574.     }
  2575.     /**
  2576.      * Open a DataOutputStream to a DataNode so that it can be written to.
  2577.      * This happens when a file is created and each time a new block is allocated.
  2578.      * Must get block ID and the IDs of the destinations from the namenode.
  2579.      * Returns the list of target datanodes.
  2580.      */
  2581.     private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
  2582.       LocatedBlock lb = null;
  2583.       boolean retry = false;
  2584.       DatanodeInfo[] nodes;
  2585.       int count = conf.getInt("dfs.client.block.write.retries", 3);
  2586.       boolean success;
  2587.       do {
  2588.         hasError = false;
  2589.         lastException = null;
  2590.         errorIndex = 0;
  2591.         retry = false;
  2592.         nodes = null;
  2593.         success = false;
  2594.                 
  2595.         long startTime = System.currentTimeMillis();
  2596.         lb = locateFollowingBlock(startTime);
  2597.         block = lb.getBlock();
  2598.         nodes = lb.getLocations();
  2599.   
  2600.         //
  2601.         // Connect to first DataNode in the list.
  2602.         //
  2603.         success = createBlockOutputStream(nodes, clientName, false);
  2604.         if (!success) {
  2605.           LOG.info("Abandoning block " + block);
  2606.           namenode.abandonBlock(block, src, clientName);
  2607.           // Connection failed.  Let's wait a little bit and retry
  2608.           retry = true;
  2609.           try {
  2610.             if (System.currentTimeMillis() - startTime > 5000) {
  2611.               LOG.info("Waiting to find target node: " + nodes[0].getName());
  2612.             }
  2613.             Thread.sleep(6000);
  2614.           } catch (InterruptedException iex) {
  2615.           }
  2616.         }
  2617.       } while (retry && --count >= 0);
  2618.       if (!success) {
  2619.         throw new IOException("Unable to create new block.");
  2620.       }
  2621.       return nodes;
  2622.     }
  2623.     // connects to the first datanode in the pipeline
  2624.     // Returns true if success, otherwise return failure.
  2625.     //
  2626.     private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
  2627.                     boolean recoveryFlag) {
  2628.       String firstBadLink = "";
  2629.       if (LOG.isDebugEnabled()) {
  2630.         for (int i = 0; i < nodes.length; i++) {
  2631.           LOG.debug("pipeline = " + nodes[i].getName());
  2632.         }
  2633.       }
  2634.       // persist blocks on namenode on next flush
  2635.       persistBlocks = true;
  2636.       try {
  2637.         LOG.debug("Connecting to " + nodes[0].getName());
  2638.         InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
  2639.         s = socketFactory.createSocket();
  2640.         int timeoutValue = 3000 * nodes.length + socketTimeout;
  2641.         NetUtils.connect(s, target, timeoutValue);
  2642.         s.setSoTimeout(timeoutValue);
  2643.         s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
  2644.         LOG.debug("Send buf size " + s.getSendBufferSize());
  2645.         long writeTimeout = HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
  2646.                             datanodeWriteTimeout;
  2647.         //
  2648.         // Xmit header info to datanode
  2649.         //
  2650.         DataOutputStream out = new DataOutputStream(
  2651.             new BufferedOutputStream(NetUtils.getOutputStream(s, writeTimeout), 
  2652.                                      DataNode.SMALL_BUFFER_SIZE));
  2653.         blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
  2654.         out.writeShort( DataTransferProtocol.DATA_TRANSFER_VERSION );
  2655.         out.write( DataTransferProtocol.OP_WRITE_BLOCK );
  2656.         out.writeLong( block.getBlockId() );
  2657.         out.writeLong( block.getGenerationStamp() );
  2658.         out.writeInt( nodes.length );
  2659.         out.writeBoolean( recoveryFlag );       // recovery flag
  2660.         Text.writeString( out, client );
  2661.         out.writeBoolean(false); // Not sending src node information
  2662.         out.writeInt( nodes.length - 1 );
  2663.         for (int i = 1; i < nodes.length; i++) {
  2664.           nodes[i].write(out);
  2665.         }
  2666.         checksum.writeHeader( out );
  2667.         out.flush();
  2668.         // receive ack for connect
  2669.         firstBadLink = Text.readString(blockReplyStream);
  2670.         if (firstBadLink.length() != 0) {
  2671.           throw new IOException("Bad connect ack with firstBadLink " + firstBadLink);
  2672.         }
  2673.         blockStream = out;
  2674.         return true;     // success
  2675.       } catch (IOException ie) {
  2676.         LOG.info("Exception in createBlockOutputStream " + ie);
  2677.         // find the datanode that matches
  2678.         if (firstBadLink.length() != 0) {
  2679.           for (int i = 0; i < nodes.length; i++) {
  2680.             if (nodes[i].getName().equals(firstBadLink)) {
  2681.               errorIndex = i;
  2682.               break;
  2683.             }
  2684.           }
  2685.         }
  2686.         hasError = true;
  2687.         setLastException(ie);
  2688.         blockReplyStream = null;
  2689.         return false;  // error
  2690.       }
  2691.     }
  2692.   
  2693.     private LocatedBlock locateFollowingBlock(long start
  2694.                                               ) throws IOException {     
  2695.       int retries = 5;
  2696.       long sleeptime = 400;
  2697.       while (true) {
  2698.         long localstart = System.currentTimeMillis();
  2699.         while (true) {
  2700.           try {
  2701.             return namenode.addBlock(src, clientName);
  2702.           } catch (RemoteException e) {
  2703.             IOException ue = 
  2704.               e.unwrapRemoteException(FileNotFoundException.class,
  2705.                                       AccessControlException.class,
  2706.                                       QuotaExceededException.class);
  2707.             if (ue != e) { 
  2708.               throw ue; // no need to retry these exceptions
  2709.             }
  2710.             
  2711.             if (--retries == 0 && 
  2712.                 !NotReplicatedYetException.class.getName().
  2713.                 equals(e.getClassName())) {
  2714.               throw e;
  2715.             } else {
  2716.               LOG.info(StringUtils.stringifyException(e));
  2717.               if (System.currentTimeMillis() - localstart > 5000) {
  2718.                 LOG.info("Waiting for replication for " + 
  2719.                          (System.currentTimeMillis() - localstart)/1000 + 
  2720.                          " seconds");
  2721.               }
  2722.               try {
  2723.                 LOG.warn("NotReplicatedYetException sleeping " + src +
  2724.                           " retries left " + retries);
  2725.                 Thread.sleep(sleeptime);
  2726.                 sleeptime *= 2;
  2727.               } catch (InterruptedException ie) {
  2728.               }
  2729.             }                
  2730.           }
  2731.         }
  2732.       } 
  2733.     }
  2734.   
  2735.     // @see FSOutputSummer#writeChunk()
  2736.     @Override
  2737.     protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] checksum) 
  2738.                                                           throws IOException {
  2739.       checkOpen();
  2740.       isClosed();
  2741.   
  2742.       int cklen = checksum.length;
  2743.       int bytesPerChecksum = this.checksum.getBytesPerChecksum(); 
  2744.       if (len > bytesPerChecksum) {
  2745.         throw new IOException("writeChunk() buffer size is " + len +
  2746.                               " is larger than supported  bytesPerChecksum " +
  2747.                               bytesPerChecksum);
  2748.       }
  2749.       if (checksum.length != this.checksum.getChecksumSize()) {
  2750.         throw new IOException("writeChunk() checksum size is supposed to be " +
  2751.                               this.checksum.getChecksumSize() + 
  2752.                               " but found to be " + checksum.length);
  2753.       }
  2754.       synchronized (dataQueue) {
  2755.   
  2756.         // If queue is full, then wait till we can create  enough space
  2757.         while (!closed && dataQueue.size() + ackQueue.size()  > maxPackets) {
  2758.           try {
  2759.             dataQueue.wait();
  2760.           } catch (InterruptedException  e) {
  2761.           }
  2762.         }
  2763.         isClosed();
  2764.   
  2765.         if (currentPacket == null) {
  2766.           currentPacket = new Packet(packetSize, chunksPerPacket, 
  2767.                                      bytesCurBlock);
  2768.           if (LOG.isDebugEnabled()) {
  2769.             LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
  2770.                       currentPacket.seqno +
  2771.                       ", src=" + src +
  2772.                       ", packetSize=" + packetSize +
  2773.                       ", chunksPerPacket=" + chunksPerPacket +
  2774.                       ", bytesCurBlock=" + bytesCurBlock);
  2775.           }
  2776.         }
  2777.         currentPacket.writeChecksum(checksum, 0, cklen);
  2778.         currentPacket.writeData(b, offset, len);
  2779.         currentPacket.numChunks++;
  2780.         bytesCurBlock += len;
  2781.         // If packet is full, enqueue it for transmission
  2782.         //
  2783.         if (currentPacket.numChunks == currentPacket.maxChunks ||
  2784.             bytesCurBlock == blockSize) {
  2785.           if (LOG.isDebugEnabled()) {
  2786.             LOG.debug("DFSClient writeChunk packet full seqno=" +
  2787.                       currentPacket.seqno +
  2788.                       ", src=" + src +
  2789.                       ", bytesCurBlock=" + bytesCurBlock +
  2790.                       ", blockSize=" + blockSize +
  2791.                       ", appendChunk=" + appendChunk);
  2792.           }
  2793.           //
  2794.           // if we allocated a new packet because we encountered a block
  2795.           // boundary, reset bytesCurBlock.
  2796.           //
  2797.           if (bytesCurBlock == blockSize) {
  2798.             currentPacket.lastPacketInBlock = true;
  2799.             bytesCurBlock = 0;
  2800.             lastFlushOffset = -1;
  2801.           }
  2802.           dataQueue.addLast(currentPacket);
  2803.           dataQueue.notifyAll();
  2804.           currentPacket = null;
  2805.  
  2806.           // If this was the first write after reopening a file, then the above
  2807.           // write filled up any partial chunk. Tell the summer to generate full 
  2808.           // crc chunks from now on.
  2809.           if (appendChunk) {
  2810.             appendChunk = false;
  2811.             resetChecksumChunk(bytesPerChecksum);
  2812.           }
  2813.           int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
  2814.           computePacketChunkSize(psize, bytesPerChecksum);
  2815.         }
  2816.       }
  2817.       //LOG.debug("DFSClient writeChunk done length " + len +
  2818.       //          " checksum length " + cklen);
  2819.     }
  2820.   
  2821.     /**
  2822.      * All data is written out to datanodes. It is not guaranteed 
  2823.      * that data has been flushed to persistent store on the 
  2824.      * datanode. Block allocations are persisted on namenode.
  2825.      */
  2826.     public synchronized void sync() throws IOException {
  2827.       try {
  2828.         /* Record current blockOffset. This might be changed inside
  2829.          * flushBuffer() where a partial checksum chunk might be flushed.
  2830.          * After the flush, reset the bytesCurBlock back to its previous value,
  2831.          * any partial checksum chunk will be sent now and in next packet.
  2832.          */
  2833.         long saveOffset = bytesCurBlock;
  2834.         // flush checksum buffer, but keep checksum buffer intact
  2835.         flushBuffer(true);
  2836.         LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
  2837.                   " bytesCurBlock " + bytesCurBlock +
  2838.                   " lastFlushOffset " + lastFlushOffset);
  2839.         
  2840.         // Flush only if we haven't already flushed till this offset.
  2841.         if (lastFlushOffset != bytesCurBlock) {
  2842.           // record the valid offset of this flush
  2843.           lastFlushOffset = bytesCurBlock;
  2844.           // wait for all packets to be sent and acknowledged
  2845.           flushInternal();
  2846.         } else {
  2847.           // just discard the current packet since it is already been sent.
  2848.           currentPacket = null;
  2849.         }
  2850.         
  2851.         // Restore state of stream. Record the last flush offset 
  2852.         // of the last full chunk that was flushed.
  2853.         //
  2854.         bytesCurBlock = saveOffset;
  2855.         // If any new blocks were allocated since the last flush, 
  2856.         // then persist block locations on namenode. 
  2857.         //
  2858.         if (persistBlocks) {
  2859.           namenode.fsync(src, clientName);
  2860.           persistBlocks = false;
  2861.         }
  2862.       } catch (IOException e) {
  2863.           lastException = new IOException("IOException flush:" + e);
  2864.           closed = true;
  2865.           closeThreads();
  2866.           throw e;
  2867.       }
  2868.     }
  2869.     /**
  2870.      * Waits till all existing data is flushed and confirmations 
  2871.      * received from datanodes. 
  2872.      */
  2873.     private synchronized void flushInternal() throws IOException {
  2874.       checkOpen();
  2875.       isClosed();
  2876.       while (!closed) {
  2877.         synchronized (dataQueue) {
  2878.           isClosed();
  2879.           //
  2880.           // If there is data in the current buffer, send it across
  2881.           //
  2882.           if (currentPacket != null) {
  2883.             dataQueue.addLast(currentPacket);
  2884.             dataQueue.notifyAll();
  2885.             currentPacket = null;
  2886.           }
  2887.           // wait for all buffers to be flushed to datanodes
  2888.           if (!closed && dataQueue.size() != 0) {
  2889.             try {
  2890.               dataQueue.wait();
  2891.             } catch (InterruptedException e) {
  2892.             }
  2893.             continue;
  2894.           }
  2895.         }
  2896.         // wait for all acks to be received back from datanodes
  2897.         synchronized (ackQueue) {
  2898.           if (!closed && ackQueue.size() != 0) {
  2899.             try {
  2900.               ackQueue.wait();
  2901.             } catch (InterruptedException e) {
  2902.             }
  2903.             continue;
  2904.           }
  2905.         }
  2906.         // acquire both the locks and verify that we are
  2907.         // *really done*. In the case of error recovery, 
  2908.         // packets might move back from ackQueue to dataQueue.
  2909.         //
  2910.         synchronized (dataQueue) {
  2911.           synchronized (ackQueue) {
  2912.             if (dataQueue.size() + ackQueue.size() == 0) {
  2913.               break;       // we are done
  2914.             }
  2915.           }
  2916.         }
  2917.       }
  2918.     }
  2919.   
  2920.     /**
  2921.      * Closes this output stream and releases any system 
  2922.      * resources associated with this stream.
  2923.      */
  2924.     @Override
  2925.     public void close() throws IOException {
  2926.       if(closed)
  2927.         return;
  2928.       closeInternal();
  2929.       leasechecker.remove(src);
  2930.       
  2931.       if (s != null) {
  2932.         s.close();
  2933.         s = null;
  2934.       }
  2935.     }
  2936.  
  2937.     // shutdown datastreamer and responseprocessor threads.
  2938.     private void closeThreads() throws IOException {
  2939.       try {
  2940.         streamer.close();
  2941.         streamer.join();
  2942.         
  2943.         // shutdown response after streamer has exited.
  2944.         if (response != null) {
  2945.           response.close();
  2946.           response.join();
  2947.           response = null;
  2948.         }
  2949.       } catch (InterruptedException e) {
  2950.         throw new IOException("Failed to shutdown response thread");
  2951.       }
  2952.     }
  2953.     
  2954.     /**
  2955.      * Closes this output stream and releases any system 
  2956.      * resources associated with this stream.
  2957.      */
  2958.     private synchronized void closeInternal() throws IOException {
  2959.       checkOpen();
  2960.       isClosed();
  2961.       try {
  2962.           flushBuffer();       // flush from all upper layers
  2963.       
  2964.           // Mark that this packet is the last packet in block.
  2965.           // If there are no outstanding packets and the last packet
  2966.           // was not the last one in the current block, then create a
  2967.           // packet with empty payload.
  2968.           synchronized (dataQueue) {
  2969.             if (currentPacket == null && bytesCurBlock != 0) {
  2970.               currentPacket = new Packet(packetSize, chunksPerPacket,
  2971.                                          bytesCurBlock);
  2972.             }
  2973.             if (currentPacket != null) { 
  2974.               currentPacket.lastPacketInBlock = true;
  2975.             }
  2976.           }
  2977.         flushInternal();             // flush all data to Datanodes
  2978.         isClosed(); // check to see if flushInternal had any exceptions
  2979.         closed = true; // allow closeThreads() to showdown threads
  2980.         closeThreads();
  2981.         
  2982.         synchronized (dataQueue) {
  2983.           if (blockStream != null) {
  2984.             blockStream.writeInt(0); // indicate end-of-block to datanode
  2985.             blockStream.close();
  2986.             blockReplyStream.close();
  2987.           }
  2988.           if (s != null) {
  2989.             s.close();
  2990.             s = null;
  2991.           }
  2992.         }
  2993.         streamer = null;
  2994.         blockStream = null;
  2995.         blockReplyStream = null;
  2996.         long localstart = System.currentTimeMillis();
  2997.         boolean fileComplete = false;
  2998.         while (!fileComplete) {
  2999.           fileComplete = namenode.complete(src, clientName);
  3000.           if (!fileComplete) {
  3001.             try {
  3002.               Thread.sleep(400);
  3003.               if (System.currentTimeMillis() - localstart > 5000) {
  3004.                 LOG.info("Could not complete file " + src + " retrying...");
  3005.               }
  3006.             } catch (InterruptedException ie) {
  3007.             }
  3008.           }
  3009.         }
  3010.       } finally {
  3011.         closed = true;
  3012.       }
  3013.     }
  3014.     void setArtificialSlowdown(long period) {
  3015.       artificialSlowdown = period;
  3016.     }
  3017.     synchronized void setChunksPerPacket(int value) {
  3018.       chunksPerPacket = Math.min(chunksPerPacket, value);
  3019.       packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER +
  3020.                    (checksum.getBytesPerChecksum() + 
  3021.                     checksum.getChecksumSize()) * chunksPerPacket;
  3022.     }
  3023.     synchronized void setTestFilename(String newname) {
  3024.       src = newname;
  3025.     }
  3026.     /**
  3027.      * Returns the size of a file as it was when this stream was opened
  3028.      */
  3029.     long getInitialLen() {
  3030.       return initialFileSize;
  3031.     }
  3032.   }
  3033.   void reportChecksumFailure(String file, Block blk, DatanodeInfo dn) {
  3034.     DatanodeInfo [] dnArr = { dn };
  3035.     LocatedBlock [] lblocks = { new LocatedBlock(blk, dnArr) };
  3036.     reportChecksumFailure(file, lblocks);
  3037.   }
  3038.     
  3039.   // just reports checksum failure and ignores any exception during the report.
  3040.   void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
  3041.     try {
  3042.       reportBadBlocks(lblocks);
  3043.     } catch (IOException ie) {
  3044.       LOG.info("Found corruption while reading " + file 
  3045.                + ".  Error repairing corrupt blocks.  Bad blocks remain. " 
  3046.                + StringUtils.stringifyException(ie));
  3047.     }
  3048.   }
  3049.   /** {@inheritDoc} */
  3050.   public String toString() {
  3051.     return getClass().getSimpleName() + "[clientName=" + clientName
  3052.         + ", ugi=" + ugi + "]"; 
  3053.   }
  3054. }