FSDataset.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:46k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.*;
  20. import java.util.*;
  21. import javax.management.NotCompliantMBeanException;
  22. import javax.management.ObjectName;
  23. import javax.management.StandardMBean;
  24. import org.apache.hadoop.fs.*;
  25. import org.apache.hadoop.hdfs.protocol.Block;
  26. import org.apache.hadoop.hdfs.protocol.FSConstants;
  27. import org.apache.hadoop.metrics.util.MBeanUtil;
  28. import org.apache.hadoop.util.DataChecksum;
  29. import org.apache.hadoop.util.DiskChecker;
  30. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  31. import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
  32. import org.apache.hadoop.conf.*;
  33. import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
  34. import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
  35. /**************************************************
  36.  * FSDataset manages a set of data blocks.  Each block
  37.  * has a unique name and an extent on disk.
  38.  *
  39.  ***************************************************/
  40. public class FSDataset implements FSConstants, FSDatasetInterface {
  41.   /**
  42.    * A node type that can be built into a tree reflecting the
  43.    * hierarchy of blocks on the local disk.
  44.    */
  45.   class FSDir {
  46.     File dir;
  47.     int numBlocks = 0;
  48.     FSDir children[];
  49.     int lastChildIdx = 0;
  50.     /**
  51.      */
  52.     public FSDir(File dir) 
  53.       throws IOException {
  54.       this.dir = dir;
  55.       this.children = null;
  56.       if (!dir.exists()) {
  57.         if (!dir.mkdirs()) {
  58.           throw new IOException("Mkdirs failed to create " + 
  59.                                 dir.toString());
  60.         }
  61.       } else {
  62.         File[] files = dir.listFiles();
  63.         int numChildren = 0;
  64.         for (int idx = 0; idx < files.length; idx++) {
  65.           if (files[idx].isDirectory()) {
  66.             numChildren++;
  67.           } else if (Block.isBlockFilename(files[idx])) {
  68.             numBlocks++;
  69.           }
  70.         }
  71.         if (numChildren > 0) {
  72.           children = new FSDir[numChildren];
  73.           int curdir = 0;
  74.           for (int idx = 0; idx < files.length; idx++) {
  75.             if (files[idx].isDirectory()) {
  76.               children[curdir] = new FSDir(files[idx]);
  77.               curdir++;
  78.             }
  79.           }
  80.         }
  81.       }
  82.     }
  83.         
  84.     public File addBlock(Block b, File src) throws IOException {
  85.       //First try without creating subdirectories
  86.       File file = addBlock(b, src, false, false);          
  87.       return (file != null) ? file : addBlock(b, src, true, true);
  88.     }
  89.     private File addBlock(Block b, File src, boolean createOk, 
  90.                           boolean resetIdx) throws IOException {
  91.       if (numBlocks < maxBlocksPerDir) {
  92.         File dest = new File(dir, b.getBlockName());
  93.         File metaData = getMetaFile( src, b );
  94.         File newmeta = getMetaFile(dest, b);
  95.         if ( ! metaData.renameTo( newmeta ) ||
  96.             ! src.renameTo( dest ) ) {
  97.           throw new IOException( "could not move files for " + b +
  98.                                  " from tmp to " + 
  99.                                  dest.getAbsolutePath() );
  100.         }
  101.         if (DataNode.LOG.isDebugEnabled()) {
  102.           DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
  103.           DataNode.LOG.debug("addBlock: Moved " + src + " to " + dest);
  104.         }
  105.         numBlocks += 1;
  106.         return dest;
  107.       }
  108.             
  109.       if (lastChildIdx < 0 && resetIdx) {
  110.         //reset so that all children will be checked
  111.         lastChildIdx = random.nextInt(children.length);              
  112.       }
  113.             
  114.       if (lastChildIdx >= 0 && children != null) {
  115.         //Check if any child-tree has room for a block.
  116.         for (int i=0; i < children.length; i++) {
  117.           int idx = (lastChildIdx + i)%children.length;
  118.           File file = children[idx].addBlock(b, src, false, resetIdx);
  119.           if (file != null) {
  120.             lastChildIdx = idx;
  121.             return file; 
  122.           }
  123.         }
  124.         lastChildIdx = -1;
  125.       }
  126.             
  127.       if (!createOk) {
  128.         return null;
  129.       }
  130.             
  131.       if (children == null || children.length == 0) {
  132.         children = new FSDir[maxBlocksPerDir];
  133.         for (int idx = 0; idx < maxBlocksPerDir; idx++) {
  134.           children[idx] = new FSDir(new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx));
  135.         }
  136.       }
  137.             
  138.       //now pick a child randomly for creating a new set of subdirs.
  139.       lastChildIdx = random.nextInt(children.length);
  140.       return children[ lastChildIdx ].addBlock(b, src, true, false); 
  141.     }
  142.     /** Find the metadata file for the specified block file.
  143.      * Return the generation stamp from the name of the metafile.
  144.      */
  145.     long getGenerationStampFromFile(File[] listdir, File blockFile) {
  146.       String blockName = blockFile.getName();
  147.       for (int j = 0; j < listdir.length; j++) {
  148.         String path = listdir[j].getName();
  149.         if (!path.startsWith(blockName)) {
  150.           continue;
  151.         }
  152.         String[] vals = path.split("_");
  153.         if (vals.length != 3) {     // blk, blkid, genstamp.meta
  154.           continue;
  155.         }
  156.         String[] str = vals[2].split("\.");
  157.         if (str.length != 2) {
  158.           continue;
  159.         }
  160.         return Long.parseLong(str[0]);
  161.       }
  162.       DataNode.LOG.warn("Block " + blockFile + 
  163.                         " does not have a metafile!");
  164.       return Block.GRANDFATHER_GENERATION_STAMP;
  165.     }
  166.     /**
  167.      * Populate the given blockSet with any child blocks
  168.      * found at this node.
  169.      */
  170.     public void getBlockInfo(TreeSet<Block> blockSet) {
  171.       if (children != null) {
  172.         for (int i = 0; i < children.length; i++) {
  173.           children[i].getBlockInfo(blockSet);
  174.         }
  175.       }
  176.       File blockFiles[] = dir.listFiles();
  177.       for (int i = 0; i < blockFiles.length; i++) {
  178.         if (Block.isBlockFilename(blockFiles[i])) {
  179.           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
  180.           blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
  181.         }
  182.       }
  183.     }
  184.     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
  185.       if (children != null) {
  186.         for (int i = 0; i < children.length; i++) {
  187.           children[i].getVolumeMap(volumeMap, volume);
  188.         }
  189.       }
  190.       File blockFiles[] = dir.listFiles();
  191.       for (int i = 0; i < blockFiles.length; i++) {
  192.         if (Block.isBlockFilename(blockFiles[i])) {
  193.           long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
  194.           volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp), 
  195.                         new DatanodeBlockInfo(volume, blockFiles[i]));
  196.         }
  197.       }
  198.     }
  199.         
  200.     /**
  201.      * check if a data diretory is healthy
  202.      * @throws DiskErrorException
  203.      */
  204.     public void checkDirTree() throws DiskErrorException {
  205.       DiskChecker.checkDir(dir);
  206.             
  207.       if (children != null) {
  208.         for (int i = 0; i < children.length; i++) {
  209.           children[i].checkDirTree();
  210.         }
  211.       }
  212.     }
  213.         
  214.     void clearPath(File f) {
  215.       String root = dir.getAbsolutePath();
  216.       String dir = f.getAbsolutePath();
  217.       if (dir.startsWith(root)) {
  218.         String[] dirNames = dir.substring(root.length()).
  219.           split(File.separator + "subdir");
  220.         if (clearPath(f, dirNames, 1))
  221.           return;
  222.       }
  223.       clearPath(f, null, -1);
  224.     }
  225.         
  226.     /*
  227.      * dirNames is an array of string integers derived from
  228.      * usual directory structure data/subdirN/subdirXY/subdirM ...
  229.      * If dirName array is non-null, we only check the child at 
  230.      * the children[dirNames[idx]]. This avoids iterating over
  231.      * children in common case. If directory structure changes 
  232.      * in later versions, we need to revisit this.
  233.      */
  234.     private boolean clearPath(File f, String[] dirNames, int idx) {
  235.       if ((dirNames == null || idx == dirNames.length) &&
  236.           dir.compareTo(f) == 0) {
  237.         numBlocks--;
  238.         return true;
  239.       }
  240.           
  241.       if (dirNames != null) {
  242.         //guess the child index from the directory name
  243.         if (idx > (dirNames.length - 1) || children == null) {
  244.           return false;
  245.         }
  246.         int childIdx; 
  247.         try {
  248.           childIdx = Integer.parseInt(dirNames[idx]);
  249.         } catch (NumberFormatException ignored) {
  250.           // layout changed? we could print a warning.
  251.           return false;
  252.         }
  253.         return (childIdx >= 0 && childIdx < children.length) ?
  254.           children[childIdx].clearPath(f, dirNames, idx+1) : false;
  255.       }
  256.       //guesses failed. back to blind iteration.
  257.       if (children != null) {
  258.         for(int i=0; i < children.length; i++) {
  259.           if (children[i].clearPath(f, null, -1)){
  260.             return true;
  261.           }
  262.         }
  263.       }
  264.       return false;
  265.     }
  266.         
  267.     public String toString() {
  268.       return "FSDir{" +
  269.         "dir=" + dir +
  270.         ", children=" + (children == null ? null : Arrays.asList(children)) +
  271.         "}";
  272.     }
  273.   }
  274.   class FSVolume {
  275.     private FSDir dataDir;
  276.     private File tmpDir;
  277.     private File detachDir; // copy on write for blocks in snapshot
  278.     private DF usage;
  279.     private DU dfsUsage;
  280.     private long reserved;
  281.     
  282.     FSVolume(File currentDir, Configuration conf) throws IOException {
  283.       this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
  284.       boolean supportAppends = conf.getBoolean("dfs.support.append", false);
  285.       File parent = currentDir.getParentFile();
  286.       this.detachDir = new File(parent, "detach");
  287.       if (detachDir.exists()) {
  288.         recoverDetachedBlocks(currentDir, detachDir);
  289.       }
  290.       // Files that were being written when the datanode was last shutdown
  291.       // are now moved back to the data directory. It is possible that
  292.       // in the future, we might want to do some sort of datanode-local
  293.       // recovery for these blocks. For example, crc validation.
  294.       //
  295.       this.tmpDir = new File(parent, "tmp");
  296.       if (tmpDir.exists()) {
  297.         if (supportAppends) {
  298.           recoverDetachedBlocks(currentDir, tmpDir);
  299.         } else {
  300.           FileUtil.fullyDelete(tmpDir);
  301.         }
  302.       }
  303.       this.dataDir = new FSDir(currentDir);
  304.       if (!tmpDir.mkdirs()) {
  305.         if (!tmpDir.isDirectory()) {
  306.           throw new IOException("Mkdirs failed to create " + tmpDir.toString());
  307.         }
  308.       }
  309.       if (!detachDir.mkdirs()) {
  310.         if (!detachDir.isDirectory()) {
  311.           throw new IOException("Mkdirs failed to create " + detachDir.toString());
  312.         }
  313.       }
  314.       this.usage = new DF(parent, conf);
  315.       this.dfsUsage = new DU(parent, conf);
  316.       this.dfsUsage.start();
  317.     }
  318.     void decDfsUsed(long value) {
  319.       dfsUsage.decDfsUsed(value);
  320.     }
  321.     
  322.     long getDfsUsed() throws IOException {
  323.       return dfsUsage.getUsed();
  324.     }
  325.     
  326.     long getCapacity() throws IOException {
  327.       if (reserved > usage.getCapacity()) {
  328.         return 0;
  329.       }
  330.       return usage.getCapacity()-reserved;
  331.     }
  332.       
  333.     long getAvailable() throws IOException {
  334.       long remaining = getCapacity()-getDfsUsed();
  335.       long available = usage.getAvailable();
  336.       if (remaining>available) {
  337.         remaining = available;
  338.       }
  339.       return (remaining > 0) ? remaining : 0;
  340.     }
  341.       
  342.     String getMount() throws IOException {
  343.       return usage.getMount();
  344.     }
  345.       
  346.     File getDir() {
  347.       return dataDir.dir;
  348.     }
  349.     
  350.     /**
  351.      * Temporary files. They get moved to the real block directory either when
  352.      * the block is finalized or the datanode restarts.
  353.      */
  354.     File createTmpFile(Block b) throws IOException {
  355.       File f = new File(tmpDir, b.getBlockName());
  356.       return createTmpFile(b, f);
  357.     }
  358.     /**
  359.      * Returns the name of the temporary file for this block.
  360.      */
  361.     File getTmpFile(Block b) throws IOException {
  362.       File f = new File(tmpDir, b.getBlockName());
  363.       return f;
  364.     }
  365.     /**
  366.      * Files used for copy-on-write. They need recovery when datanode
  367.      * restarts.
  368.      */
  369.     File createDetachFile(Block b, String filename) throws IOException {
  370.       File f = new File(detachDir, filename);
  371.       return createTmpFile(b, f);
  372.     }
  373.     private File createTmpFile(Block b, File f) throws IOException {
  374.       if (f.exists()) {
  375.         throw new IOException("Unexpected problem in creating temporary file for "+
  376.                               b + ".  File " + f + " should not be present, but is.");
  377.       }
  378.       // Create the zero-length temp file
  379.       //
  380.       boolean fileCreated = false;
  381.       try {
  382.         fileCreated = f.createNewFile();
  383.       } catch (IOException ioe) {
  384.         throw (IOException)new IOException(DISK_ERROR +f).initCause(ioe);
  385.       }
  386.       if (!fileCreated) {
  387.         throw new IOException("Unexpected problem in creating temporary file for "+
  388.                               b + ".  File " + f + " should be creatable, but is already present.");
  389.       }
  390.       return f;
  391.     }
  392.       
  393.     File addBlock(Block b, File f) throws IOException {
  394.       File blockFile = dataDir.addBlock(b, f);
  395.       File metaFile = getMetaFile( blockFile , b);
  396.       dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
  397.       return blockFile;
  398.     }
  399.       
  400.     void checkDirs() throws DiskErrorException {
  401.       dataDir.checkDirTree();
  402.       DiskChecker.checkDir(tmpDir);
  403.     }
  404.       
  405.     void getBlockInfo(TreeSet<Block> blockSet) {
  406.       dataDir.getBlockInfo(blockSet);
  407.     }
  408.       
  409.     void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
  410.       dataDir.getVolumeMap(volumeMap, this);
  411.     }
  412.       
  413.     void clearPath(File f) {
  414.       dataDir.clearPath(f);
  415.     }
  416.       
  417.     public String toString() {
  418.       return dataDir.dir.getAbsolutePath();
  419.     }
  420.     /**
  421.      * Recover detached files on datanode restart. If a detached block
  422.      * does not exist in the original directory, then it is moved to the
  423.      * original directory.
  424.      */
  425.     private void recoverDetachedBlocks(File dataDir, File dir) 
  426.                                            throws IOException {
  427.       File contents[] = dir.listFiles();
  428.       if (contents == null) {
  429.         return;
  430.       }
  431.       for (int i = 0; i < contents.length; i++) {
  432.         if (!contents[i].isFile()) {
  433.           throw new IOException ("Found " + contents[i] + " in " + dir +
  434.                                  " but it is not a file.");
  435.         }
  436.         //
  437.         // If the original block file still exists, then no recovery
  438.         // is needed.
  439.         //
  440.         File blk = new File(dataDir, contents[i].getName());
  441.         if (!blk.exists()) {
  442.           if (!contents[i].renameTo(blk)) {
  443.             throw new IOException("Unable to recover detached file " +
  444.                                   contents[i]);
  445.           }
  446.           continue;
  447.         }
  448.         if (!contents[i].delete()) {
  449.             throw new IOException("Unable to cleanup detached file " +
  450.                                   contents[i]);
  451.         }
  452.       }
  453.     }
  454.   }
  455.     
  456.   static class FSVolumeSet {
  457.     FSVolume[] volumes = null;
  458.     int curVolume = 0;
  459.       
  460.     FSVolumeSet(FSVolume[] volumes) {
  461.       this.volumes = volumes;
  462.     }
  463.       
  464.     synchronized FSVolume getNextVolume(long blockSize) throws IOException {
  465.       int startVolume = curVolume;
  466.       while (true) {
  467.         FSVolume volume = volumes[curVolume];
  468.         curVolume = (curVolume + 1) % volumes.length;
  469.         if (volume.getAvailable() > blockSize) { return volume; }
  470.         if (curVolume == startVolume) {
  471.           throw new DiskOutOfSpaceException("Insufficient space for an additional block");
  472.         }
  473.       }
  474.     }
  475.       
  476.     long getDfsUsed() throws IOException {
  477.       long dfsUsed = 0L;
  478.       for (int idx = 0; idx < volumes.length; idx++) {
  479.         dfsUsed += volumes[idx].getDfsUsed();
  480.       }
  481.       return dfsUsed;
  482.     }
  483.     synchronized long getCapacity() throws IOException {
  484.       long capacity = 0L;
  485.       for (int idx = 0; idx < volumes.length; idx++) {
  486.         capacity += volumes[idx].getCapacity();
  487.       }
  488.       return capacity;
  489.     }
  490.       
  491.     synchronized long getRemaining() throws IOException {
  492.       long remaining = 0L;
  493.       for (int idx = 0; idx < volumes.length; idx++) {
  494.         remaining += volumes[idx].getAvailable();
  495.       }
  496.       return remaining;
  497.     }
  498.       
  499.     synchronized void getBlockInfo(TreeSet<Block> blockSet) {
  500.       for (int idx = 0; idx < volumes.length; idx++) {
  501.         volumes[idx].getBlockInfo(blockSet);
  502.       }
  503.     }
  504.       
  505.     synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
  506.       for (int idx = 0; idx < volumes.length; idx++) {
  507.         volumes[idx].getVolumeMap(volumeMap);
  508.       }
  509.     }
  510.       
  511.     synchronized void checkDirs() throws DiskErrorException {
  512.       for (int idx = 0; idx < volumes.length; idx++) {
  513.         volumes[idx].checkDirs();
  514.       }
  515.     }
  516.       
  517.     public String toString() {
  518.       StringBuffer sb = new StringBuffer();
  519.       for (int idx = 0; idx < volumes.length; idx++) {
  520.         sb.append(volumes[idx].toString());
  521.         if (idx != volumes.length - 1) { sb.append(","); }
  522.       }
  523.       return sb.toString();
  524.     }
  525.   }
  526.   
  527.   //////////////////////////////////////////////////////
  528.   //
  529.   // FSDataSet
  530.   //
  531.   //////////////////////////////////////////////////////
  532.   //Find better place?
  533.   public static final String METADATA_EXTENSION = ".meta";
  534.   public static final short METADATA_VERSION = 1;
  535.     
  536.   static class ActiveFile {
  537.     final File file;
  538.     final List<Thread> threads = new ArrayList<Thread>(2);
  539.     ActiveFile(File f, List<Thread> list) {
  540.       file = f;
  541.       if (list != null) {
  542.         threads.addAll(list);
  543.       }
  544.       threads.add(Thread.currentThread());
  545.     }
  546.     
  547.     public String toString() {
  548.       return getClass().getSimpleName() + "(file=" + file
  549.           + ", threads=" + threads + ")";
  550.     }
  551.   } 
  552.   
  553.   static String getMetaFileName(String blockFileName, long genStamp) {
  554.     return blockFileName + "_" + genStamp + METADATA_EXTENSION;
  555.   }
  556.   
  557.   static File getMetaFile(File f , Block b) {
  558.     return new File(getMetaFileName(f.getAbsolutePath(),
  559.                                     b.getGenerationStamp())); 
  560.   }
  561.   protected File getMetaFile(Block b) throws IOException {
  562.     return getMetaFile(getBlockFile(b), b);
  563.   }
  564.   /** Find the corresponding meta data file from a given block file */
  565.   private static File findMetaFile(final File blockFile) throws IOException {
  566.     final String prefix = blockFile.getName() + "_";
  567.     final File parent = blockFile.getParentFile();
  568.     File[] matches = parent.listFiles(new FilenameFilter() {
  569.       public boolean accept(File dir, String name) {
  570.         return dir.equals(parent)
  571.             && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
  572.       }
  573.     });
  574.     if (matches == null || matches.length == 0) {
  575.       throw new IOException("Meta file not found, blockFile=" + blockFile);
  576.     }
  577.     else if (matches.length > 1) {
  578.       throw new IOException("Found more than one meta files: " 
  579.           + Arrays.asList(matches));
  580.     }
  581.     return matches[0];
  582.   }
  583.   
  584.   /** Find the corresponding meta data file from a given block file */
  585.   private static long parseGenerationStamp(File blockFile, File metaFile
  586.       ) throws IOException {
  587.     String metaname = metaFile.getName();
  588.     String gs = metaname.substring(blockFile.getName().length() + 1,
  589.         metaname.length() - METADATA_EXTENSION.length());
  590.     try {
  591.       return Long.parseLong(gs);
  592.     } catch(NumberFormatException nfe) {
  593.       throw (IOException)new IOException("blockFile=" + blockFile
  594.           + ", metaFile=" + metaFile).initCause(nfe);
  595.     }
  596.   }
  597.   /** Return the block file for the given ID */ 
  598.   public File findBlockFile(long blockId) {
  599.     final Block b = new Block(blockId);
  600.     File blockfile = null;
  601.     ActiveFile activefile = ongoingCreates.get(b);
  602.     if (activefile != null) {
  603.       blockfile = activefile.file;
  604.     }
  605.     if (blockfile == null) {
  606.       blockfile = getFile(b);
  607.     }
  608.     if (blockfile == null) {
  609.       if (DataNode.LOG.isDebugEnabled()) {
  610.         DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
  611.         DataNode.LOG.debug("volumeMap=" + volumeMap);
  612.       }
  613.     }
  614.     return blockfile;
  615.   }
  616.   /** {@inheritDoc} */
  617.   public synchronized Block getStoredBlock(long blkid) throws IOException {
  618.     File blockfile = findBlockFile(blkid);
  619.     if (blockfile == null) {
  620.       return null;
  621.     }
  622.     File metafile = findMetaFile(blockfile);
  623.     return new Block(blkid, blockfile.length(),
  624.         parseGenerationStamp(blockfile, metafile));
  625.   }
  626.   public boolean metaFileExists(Block b) throws IOException {
  627.     return getMetaFile(b).exists();
  628.   }
  629.   
  630.   public long getMetaDataLength(Block b) throws IOException {
  631.     File checksumFile = getMetaFile( b );
  632.     return checksumFile.length();
  633.   }
  634.   public MetaDataInputStream getMetaDataInputStream(Block b)
  635.       throws IOException {
  636.     File checksumFile = getMetaFile( b );
  637.     return new MetaDataInputStream(new FileInputStream(checksumFile),
  638.                                                     checksumFile.length());
  639.   }
  640.   FSVolumeSet volumes;
  641.   private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
  642.   private int maxBlocksPerDir = 0;
  643.   private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
  644.   static  Random random = new Random();
  645.   
  646.   /**
  647.    * An FSDataset has a directory where it loads its data files.
  648.    */
  649.   public FSDataset(DataStorage storage, Configuration conf) throws IOException {
  650.     this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
  651.     FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
  652.     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
  653.       volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
  654.     }
  655.     volumes = new FSVolumeSet(volArray);
  656.     volumeMap = new HashMap<Block, DatanodeBlockInfo>();
  657.     volumes.getVolumeMap(volumeMap);
  658.     registerMBean(storage.getStorageID());
  659.   }
  660.   /**
  661.    * Return the total space used by dfs datanode
  662.    */
  663.   public long getDfsUsed() throws IOException {
  664.     return volumes.getDfsUsed();
  665.   }
  666.   
  667.   /**
  668.    * Return total capacity, used and unused
  669.    */
  670.   public long getCapacity() throws IOException {
  671.     return volumes.getCapacity();
  672.   }
  673.   /**
  674.    * Return how many bytes can still be stored in the FSDataset
  675.    */
  676.   public long getRemaining() throws IOException {
  677.     return volumes.getRemaining();
  678.   }
  679.   /**
  680.    * Find the block's on-disk length
  681.    */
  682.   public long getLength(Block b) throws IOException {
  683.     return getBlockFile(b).length();
  684.   }
  685.   /**
  686.    * Get File name for a given block.
  687.    */
  688.   public synchronized File getBlockFile(Block b) throws IOException {
  689.     File f = validateBlockFile(b);
  690.     if(f == null) {
  691.       if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
  692.         InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
  693.       }
  694.       throw new IOException("Block " + b + " is not valid.");
  695.     }
  696.     return f;
  697.   }
  698.   
  699.   public synchronized InputStream getBlockInputStream(Block b) throws IOException {
  700.     return new FileInputStream(getBlockFile(b));
  701.   }
  702.   public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
  703.     File blockFile = getBlockFile(b);
  704.     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
  705.     if (seekOffset > 0) {
  706.       blockInFile.seek(seekOffset);
  707.     }
  708.     return new FileInputStream(blockInFile.getFD());
  709.   }
  710.   /**
  711.    * Returns handles to the block file and its metadata file
  712.    */
  713.   public synchronized BlockInputStreams getTmpInputStreams(Block b, 
  714.                           long blkOffset, long ckoff) throws IOException {
  715.     DatanodeBlockInfo info = volumeMap.get(b);
  716.     if (info == null) {
  717.       throw new IOException("Block " + b + " does not exist in volumeMap.");
  718.     }
  719.     FSVolume v = info.getVolume();
  720.     File blockFile = v.getTmpFile(b);
  721.     RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
  722.     if (blkOffset > 0) {
  723.       blockInFile.seek(blkOffset);
  724.     }
  725.     File metaFile = getMetaFile(blockFile, b);
  726.     RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
  727.     if (ckoff > 0) {
  728.       metaInFile.seek(ckoff);
  729.     }
  730.     return new BlockInputStreams(new FileInputStream(blockInFile.getFD()),
  731.                                 new FileInputStream(metaInFile.getFD()));
  732.   }
  733.     
  734.   private BlockWriteStreams createBlockWriteStreams( File f , File metafile) throws IOException {
  735.       return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile( f , "rw" ).getFD()),
  736.           new FileOutputStream( new RandomAccessFile( metafile , "rw" ).getFD() ));
  737.   }
  738.   /**
  739.    * Make a copy of the block if this block is linked to an existing
  740.    * snapshot. This ensures that modifying this block does not modify
  741.    * data in any existing snapshots.
  742.    * @param block Block
  743.    * @param numLinks Detach if the number of links exceed this value
  744.    * @throws IOException
  745.    * @return - true if the specified block was detached
  746.    */
  747.   public boolean detachBlock(Block block, int numLinks) throws IOException {
  748.     DatanodeBlockInfo info = null;
  749.     synchronized (this) {
  750.       info = volumeMap.get(block);
  751.     }
  752.     return info.detachBlock(block, numLinks);
  753.   }
  754.   static private <T> void updateBlockMap(Map<Block, T> blockmap,
  755.       Block oldblock, Block newblock) throws IOException {
  756.     if (blockmap.containsKey(oldblock)) {
  757.       T value = blockmap.remove(oldblock);
  758.       blockmap.put(newblock, value);
  759.     }
  760.   }
  761.   /** {@inheritDoc} */
  762.   public void updateBlock(Block oldblock, Block newblock) throws IOException {
  763.     if (oldblock.getBlockId() != newblock.getBlockId()) {
  764.       throw new IOException("Cannot update oldblock (=" + oldblock
  765.           + ") to newblock (=" + newblock + ").");
  766.     }
  767.     
  768.     for(;;) {
  769.       final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
  770.       if (threads == null) {
  771.         return;
  772.       }
  773.       // interrupt and wait for all ongoing create threads
  774.       for(Thread t : threads) {
  775.         t.interrupt();
  776.       }
  777.       for(Thread t : threads) {
  778.         try {
  779.           t.join();
  780.         } catch (InterruptedException e) {
  781.           DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
  782.         }
  783.       }
  784.     }
  785.   }
  786.   /**
  787.    * Try to update an old block to a new block.
  788.    * If there are ongoing create threads running for the old block,
  789.    * the threads will be returned without updating the block. 
  790.    * 
  791.    * @return ongoing create threads if there is any. Otherwise, return null.
  792.    */
  793.   private synchronized List<Thread> tryUpdateBlock(
  794.       Block oldblock, Block newblock) throws IOException {
  795.     //check ongoing create threads
  796.     final ActiveFile activefile = ongoingCreates.get(oldblock);
  797.     if (activefile != null && !activefile.threads.isEmpty()) {
  798.       //remove dead threads
  799.       for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
  800.         final Thread t = i.next();
  801.         if (!t.isAlive()) {
  802.           i.remove();
  803.         }
  804.       }
  805.       //return living threads
  806.       if (!activefile.threads.isEmpty()) {
  807.         return new ArrayList<Thread>(activefile.threads);
  808.       }
  809.     }
  810.     //No ongoing create threads is alive.  Update block.
  811.     File blockFile = findBlockFile(oldblock.getBlockId());
  812.     if (blockFile == null) {
  813.       throw new IOException("Block " + oldblock + " does not exist.");
  814.     }
  815.     File oldMetaFile = findMetaFile(blockFile);
  816.     long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
  817.     
  818.     //rename meta file to a tmp file
  819.     File tmpMetaFile = new File(oldMetaFile.getParent(),
  820.         oldMetaFile.getName()+"_tmp" + newblock.getGenerationStamp());
  821.     if (!oldMetaFile.renameTo(tmpMetaFile)){
  822.       throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
  823.     }
  824.     //update generation stamp
  825.     if (oldgs > newblock.getGenerationStamp()) {
  826.       throw new IOException("Cannot update block (id=" + newblock.getBlockId()
  827.           + ") generation stamp from " + oldgs
  828.           + " to " + newblock.getGenerationStamp());
  829.     }
  830.     
  831.     //update length
  832.     if (newblock.getNumBytes() > oldblock.getNumBytes()) {
  833.       throw new IOException("Cannot update block file (=" + blockFile
  834.           + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
  835.     }
  836.     if (newblock.getNumBytes() < oldblock.getNumBytes()) {
  837.       truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
  838.     }
  839.     //rename the tmp file to the new meta file (with new generation stamp)
  840.     File newMetaFile = getMetaFile(blockFile, newblock);
  841.     if (!tmpMetaFile.renameTo(newMetaFile)) {
  842.       throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
  843.     }
  844.     updateBlockMap(ongoingCreates, oldblock, newblock);
  845.     updateBlockMap(volumeMap, oldblock, newblock);
  846.     // paranoia! verify that the contents of the stored block 
  847.     // matches the block file on disk.
  848.     validateBlockMetadata(newblock);
  849.     return null;
  850.   }
  851.   static private void truncateBlock(File blockFile, File metaFile,
  852.       long oldlen, long newlen) throws IOException {
  853.     if (newlen == oldlen) {
  854.       return;
  855.     }
  856.     if (newlen > oldlen) {
  857.       throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
  858.           + ") to newlen (=" + newlen + ")");
  859.     }
  860.     DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum(); 
  861.     int checksumsize = dcs.getChecksumSize();
  862.     int bpc = dcs.getBytesPerChecksum();
  863.     long n = (newlen - 1)/bpc + 1;
  864.     long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
  865.     long lastchunkoffset = (n - 1)*bpc;
  866.     int lastchunksize = (int)(newlen - lastchunkoffset); 
  867.     byte[] b = new byte[Math.max(lastchunksize, checksumsize)]; 
  868.     RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
  869.     try {
  870.       //truncate blockFile 
  871.       blockRAF.setLength(newlen);
  872.  
  873.       //read last chunk
  874.       blockRAF.seek(lastchunkoffset);
  875.       blockRAF.readFully(b, 0, lastchunksize);
  876.     } finally {
  877.       blockRAF.close();
  878.     }
  879.     //compute checksum
  880.     dcs.update(b, 0, lastchunksize);
  881.     dcs.writeValue(b, 0, false);
  882.     //update metaFile 
  883.     RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
  884.     try {
  885.       metaRAF.setLength(newmetalen);
  886.       metaRAF.seek(newmetalen - checksumsize);
  887.       metaRAF.write(b, 0, checksumsize);
  888.     } finally {
  889.       metaRAF.close();
  890.     }
  891.   }
  892.   private final static String DISK_ERROR = "Possible disk error on file creation: ";
  893.   /** Get the cause of an I/O exception if caused by a possible disk error
  894.    * @param ioe an I/O exception
  895.    * @return cause if the I/O exception is caused by a possible disk error;
  896.    *         null otherwise.
  897.    */ 
  898.   static IOException getCauseIfDiskError(IOException ioe) {
  899.     if (ioe.getMessage()!=null && ioe.getMessage().startsWith(DISK_ERROR)) {
  900.       return (IOException)ioe.getCause();
  901.     } else {
  902.       return null;
  903.     }
  904.   }
  905.   /**
  906.    * Start writing to a block file
  907.    * If isRecovery is true and the block pre-exists, then we kill all
  908.       volumeMap.put(b, v);
  909.       volumeMap.put(b, v);
  910.    * other threads that might be writing to this block, and then reopen the file.
  911.    */
  912.   public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
  913.     //
  914.     // Make sure the block isn't a valid one - we're still creating it!
  915.     //
  916.     if (isValidBlock(b)) {
  917.       if (!isRecovery) {
  918.         throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
  919.       }
  920.       // If the block was successfully finalized because all packets
  921.       // were successfully processed at the Datanode but the ack for
  922.       // some of the packets were not received by the client. The client 
  923.       // re-opens the connection and retries sending those packets.
  924.       // The other reason is that an "append" is occurring to this block.
  925.       detachBlock(b, 1);
  926.     }
  927.     long blockSize = b.getNumBytes();
  928.     //
  929.     // Serialize access to /tmp, and check if file already there.
  930.     //
  931.     File f = null;
  932.     List<Thread> threads = null;
  933.     synchronized (this) {
  934.       //
  935.       // Is it already in the create process?
  936.       //
  937.       ActiveFile activeFile = ongoingCreates.get(b);
  938.       if (activeFile != null) {
  939.         f = activeFile.file;
  940.         threads = activeFile.threads;
  941.         
  942.         if (!isRecovery) {
  943.           throw new BlockAlreadyExistsException("Block " + b +
  944.                                   " has already been started (though not completed), and thus cannot be created.");
  945.         } else {
  946.           for (Thread thread:threads) {
  947.             thread.interrupt();
  948.           }
  949.         }
  950.         ongoingCreates.remove(b);
  951.       }
  952.       FSVolume v = null;
  953.       if (!isRecovery) {
  954.         v = volumes.getNextVolume(blockSize);
  955.         // create temporary file to hold block in the designated volume
  956.         f = createTmpFile(v, b);
  957.         volumeMap.put(b, new DatanodeBlockInfo(v));
  958.       } else if (f != null) {
  959.         DataNode.LOG.info("Reopen already-open Block for append " + b);
  960.         // create or reuse temporary file to hold block in the designated volume
  961.         v = volumeMap.get(b).getVolume();
  962.         volumeMap.put(b, new DatanodeBlockInfo(v));
  963.       } else {
  964.         // reopening block for appending to it.
  965.         DataNode.LOG.info("Reopen Block for append " + b);
  966.         v = volumeMap.get(b).getVolume();
  967.         f = createTmpFile(v, b);
  968.         File blkfile = getBlockFile(b);
  969.         File oldmeta = getMetaFile(b);
  970.         File newmeta = getMetaFile(f, b);
  971.         // rename meta file to tmp directory
  972.         DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
  973.         if (!oldmeta.renameTo(newmeta)) {
  974.           throw new IOException("Block " + b + " reopen failed. " +
  975.                                 " Unable to move meta file  " + oldmeta +
  976.                                 " to tmp dir " + newmeta);
  977.         }
  978.         // rename block file to tmp directory
  979.         DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
  980.         if (!blkfile.renameTo(f)) {
  981.           if (!f.delete()) {
  982.             throw new IOException("Block " + b + " reopen failed. " +
  983.                                   " Unable to remove file " + f);
  984.           }
  985.           if (!blkfile.renameTo(f)) {
  986.             throw new IOException("Block " + b + " reopen failed. " +
  987.                                   " Unable to move block file " + blkfile +
  988.                                   " to tmp dir " + f);
  989.           }
  990.         }
  991.         volumeMap.put(b, new DatanodeBlockInfo(v));
  992.       }
  993.       if (f == null) {
  994.         DataNode.LOG.warn("Block " + b + " reopen failed " +
  995.                           " Unable to locate tmp file.");
  996.         throw new IOException("Block " + b + " reopen failed " +
  997.                               " Unable to locate tmp file.");
  998.       }
  999.       ongoingCreates.put(b, new ActiveFile(f, threads));
  1000.     }
  1001.     try {
  1002.       if (threads != null) {
  1003.         for (Thread thread:threads) {
  1004.           thread.join();
  1005.         }
  1006.       }
  1007.     } catch (InterruptedException e) {
  1008.       throw new IOException("Recovery waiting for thread interrupted.");
  1009.     }
  1010.     //
  1011.     // Finally, allow a writer to the block file
  1012.     // REMIND - mjc - make this a filter stream that enforces a max
  1013.     // block size, so clients can't go crazy
  1014.     //
  1015.     File metafile = getMetaFile(f, b);
  1016.     DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
  1017.     DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
  1018.     return createBlockWriteStreams( f , metafile);
  1019.   }
  1020.   /**
  1021.    * Retrieves the offset in the block to which the
  1022.    * the next write will write data to.
  1023.    */
  1024.   public long getChannelPosition(Block b, BlockWriteStreams streams) 
  1025.                                  throws IOException {
  1026.     FileOutputStream file = (FileOutputStream) streams.dataOut;
  1027.     return file.getChannel().position();
  1028.   }
  1029.   /**
  1030.    * Sets the offset in the block to which the
  1031.    * the next write will write data to.
  1032.    */
  1033.   public void setChannelPosition(Block b, BlockWriteStreams streams, 
  1034.                                  long dataOffset, long ckOffset) 
  1035.                                  throws IOException {
  1036.     long size = 0;
  1037.     synchronized (this) {
  1038.       FSVolume vol = volumeMap.get(b).getVolume();
  1039.       size = vol.getTmpFile(b).length();
  1040.     }
  1041.     if (size < dataOffset) {
  1042.       String msg = "Trying to change block file offset of block " + b +
  1043.                      " to " + dataOffset +
  1044.                      " but actual size of file is " +
  1045.                      size;
  1046.       throw new IOException(msg);
  1047.     }
  1048.     FileOutputStream file = (FileOutputStream) streams.dataOut;
  1049.     file.getChannel().position(dataOffset);
  1050.     file = (FileOutputStream) streams.checksumOut;
  1051.     file.getChannel().position(ckOffset);
  1052.   }
  1053.   synchronized File createTmpFile( FSVolume vol, Block blk ) throws IOException {
  1054.     if ( vol == null ) {
  1055.       vol = volumeMap.get( blk ).getVolume();
  1056.       if ( vol == null ) {
  1057.         throw new IOException("Could not find volume for block " + blk);
  1058.       }
  1059.     }
  1060.     return vol.createTmpFile(blk);
  1061.   }
  1062.   //
  1063.   // REMIND - mjc - eventually we should have a timeout system
  1064.   // in place to clean up block files left by abandoned clients.
  1065.   // We should have some timer in place, so that if a blockfile
  1066.   // is created but non-valid, and has been idle for >48 hours,
  1067.   // we can GC it safely.
  1068.   //
  1069.   /**
  1070.    * Complete the block write!
  1071.    */
  1072.   public synchronized void finalizeBlock(Block b) throws IOException {
  1073.     ActiveFile activeFile = ongoingCreates.get(b);
  1074.     if (activeFile == null) {
  1075.       throw new IOException("Block " + b + " is already finalized.");
  1076.     }
  1077.     File f = activeFile.file;
  1078.     if (f == null || !f.exists()) {
  1079.       throw new IOException("No temporary file " + f + " for block " + b);
  1080.     }
  1081.     FSVolume v = volumeMap.get(b).getVolume();
  1082.     if (v == null) {
  1083.       throw new IOException("No volume for temporary file " + f + 
  1084.                             " for block " + b);
  1085.     }
  1086.         
  1087.     File dest = null;
  1088.     dest = v.addBlock(b, f);
  1089.     volumeMap.put(b, new DatanodeBlockInfo(v, dest));
  1090.     ongoingCreates.remove(b);
  1091.   }
  1092.   /**
  1093.    * Remove the temporary block file (if any)
  1094.    */
  1095.   public synchronized void unfinalizeBlock(Block b) throws IOException {
  1096.     // remove the block from in-memory data structure
  1097.     ActiveFile activefile = ongoingCreates.remove(b);
  1098.     if (activefile == null) {
  1099.       return;
  1100.     }
  1101.     volumeMap.remove(b);
  1102.     
  1103.     // delete the on-disk temp file
  1104.     if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
  1105.       DataNode.LOG.warn("Block " + b + " unfinalized and removed. " );
  1106.     }
  1107.   }
  1108.   /**
  1109.    * Remove a block from disk
  1110.    * @param blockFile block file
  1111.    * @param metaFile block meta file
  1112.    * @param b a block
  1113.    * @return true if on-disk files are deleted; false otherwise
  1114.    */
  1115.   private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
  1116.     if (blockFile == null) {
  1117.       DataNode.LOG.warn("No file exists for block: " + b);
  1118.       return true;
  1119.     }
  1120.     
  1121.     if (!blockFile.delete()) {
  1122.       DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
  1123.       return false;
  1124.     } else { // remove the meta file
  1125.       if (metaFile != null && !metaFile.delete()) {
  1126.         DataNode.LOG.warn(
  1127.             "Not able to delete the meta block file: " + metaFile);
  1128.         return false;
  1129.       }
  1130.     }
  1131.     return true;
  1132.   }
  1133.   
  1134.   /**
  1135.    * Return a table of block data
  1136.    */
  1137.   public Block[] getBlockReport() {
  1138.     TreeSet<Block> blockSet = new TreeSet<Block>();
  1139.     volumes.getBlockInfo(blockSet);
  1140.     Block blockTable[] = new Block[blockSet.size()];
  1141.     int i = 0;
  1142.     for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
  1143.       blockTable[i] = it.next();
  1144.     }
  1145.     return blockTable;
  1146.   }
  1147.   /**
  1148.    * Check whether the given block is a valid one.
  1149.    */
  1150.   public boolean isValidBlock(Block b) {
  1151.     return validateBlockFile(b) != null;
  1152.   }
  1153.   /**
  1154.    * Find the file corresponding to the block and return it if it exists.
  1155.    */
  1156.   File validateBlockFile(Block b) {
  1157.     //Should we check for metadata file too?
  1158.     File f = getFile(b);
  1159.     if(f != null && f.exists())
  1160.       return f;
  1161.     if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
  1162.       InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
  1163.     }
  1164.     return null;
  1165.   }
  1166.   /** {@inheritDoc} */
  1167.   public void validateBlockMetadata(Block b) throws IOException {
  1168.     DatanodeBlockInfo info = volumeMap.get(b);
  1169.     if (info == null) {
  1170.       throw new IOException("Block " + b + " does not exist in volumeMap.");
  1171.     }
  1172.     FSVolume v = info.getVolume();
  1173.     File tmp = v.getTmpFile(b);
  1174.     File f = getFile(b);
  1175.     if (f == null) {
  1176.       f = tmp;
  1177.     }
  1178.     if (f == null) {
  1179.       throw new IOException("Block " + b + " does not exist on disk.");
  1180.     }
  1181.     if (!f.exists()) {
  1182.       throw new IOException("Block " + b + 
  1183.                             " block file " + f +
  1184.                             " does not exist on disk.");
  1185.     }
  1186.     if (b.getNumBytes() != f.length()) {
  1187.       throw new IOException("Block " + b + 
  1188.                             " length is " + b.getNumBytes()  +
  1189.                             " does not match block file length " +
  1190.                             f.length());
  1191.     }
  1192.     File meta = getMetaFile(f, b);
  1193.     if (meta == null) {
  1194.       throw new IOException("Block " + b + 
  1195.                             " metafile does not exist.");
  1196.     }
  1197.     if (!meta.exists()) {
  1198.       throw new IOException("Block " + b + 
  1199.                             " metafile " + meta +
  1200.                             " does not exist on disk.");
  1201.     }
  1202.     if (meta.length() == 0) {
  1203.       throw new IOException("Block " + b + " metafile " + meta + " is empty.");
  1204.     }
  1205.     long stamp = parseGenerationStamp(f, meta);
  1206.     if (stamp != b.getGenerationStamp()) {
  1207.       throw new IOException("Block " + b + 
  1208.                             " genstamp is " + b.getGenerationStamp()  +
  1209.                             " does not match meta file stamp " +
  1210.                             stamp);
  1211.     }
  1212.   }
  1213.   /**
  1214.    * We're informed that a block is no longer valid.  We
  1215.    * could lazily garbage-collect the block, but why bother?
  1216.    * just get rid of it.
  1217.    */
  1218.   public void invalidate(Block invalidBlks[]) throws IOException {
  1219.     boolean error = false;
  1220.     for (int i = 0; i < invalidBlks.length; i++) {
  1221.       File f = null;
  1222.       FSVolume v;
  1223.       synchronized (this) {
  1224.         f = getFile(invalidBlks[i]);
  1225.         DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
  1226.         if (dinfo == null) {
  1227.           DataNode.LOG.warn("Unexpected error trying to delete block "
  1228.                            + invalidBlks[i] + 
  1229.                            ". BlockInfo not found in volumeMap.");
  1230.           error = true;
  1231.           continue;
  1232.         }
  1233.         v = dinfo.getVolume();
  1234.         if (f == null) {
  1235.           DataNode.LOG.warn("Unexpected error trying to delete block "
  1236.                             + invalidBlks[i] + 
  1237.                             ". Block not found in blockMap." +
  1238.                             ((v == null) ? " " : " Block found in volumeMap."));
  1239.           error = true;
  1240.           continue;
  1241.         }
  1242.         if (v == null) {
  1243.           DataNode.LOG.warn("Unexpected error trying to delete block "
  1244.                             + invalidBlks[i] + 
  1245.                             ". No volume for this block." +
  1246.                             " Block found in blockMap. " + f + ".");
  1247.           error = true;
  1248.           continue;
  1249.         }
  1250.         File parent = f.getParentFile();
  1251.         if (parent == null) {
  1252.           DataNode.LOG.warn("Unexpected error trying to delete block "
  1253.                             + invalidBlks[i] + 
  1254.                             ". Parent not found for file " + f + ".");
  1255.           error = true;
  1256.           continue;
  1257.         }
  1258.         v.clearPath(parent);
  1259.         volumeMap.remove(invalidBlks[i]);
  1260.       }
  1261.       File metaFile = getMetaFile( f, invalidBlks[i] );
  1262.       long blockSize = f.length()+metaFile.length();
  1263.       if ( !f.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
  1264.         DataNode.LOG.warn("Unexpected error trying to delete block "
  1265.                           + invalidBlks[i] + " at file " + f);
  1266.         error = true;
  1267.         continue;
  1268.       }
  1269.       v.decDfsUsed(blockSize);
  1270.       DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
  1271.       if (f.exists()) {
  1272.         //
  1273.         // This is a temporary check especially for hadoop-1220. 
  1274.         // This will go away in the future.
  1275.         //
  1276.         DataNode.LOG.info("File " + f + " was deleted but still exists!");
  1277.       }
  1278.     }
  1279.     if (error) {
  1280.       throw new IOException("Error in deleting blocks.");
  1281.     }
  1282.   }
  1283.   /**
  1284.    * Turn the block identifier into a filename.
  1285.    */
  1286.   public synchronized File getFile(Block b) {
  1287.     DatanodeBlockInfo info = volumeMap.get(b);
  1288.     if (info != null) {
  1289.       return info.getFile();
  1290.     }
  1291.     return null;
  1292.   }
  1293.   /**
  1294.    * check if a data directory is healthy
  1295.    * @throws DiskErrorException
  1296.    */
  1297.   public void checkDataDir() throws DiskErrorException {
  1298.     volumes.checkDirs();
  1299.   }
  1300.     
  1301.   public String toString() {
  1302.     return "FSDataset{dirpath='"+volumes+"'}";
  1303.   }
  1304.   private ObjectName mbeanName;
  1305.   private Random rand = new Random();
  1306.   
  1307.   /**
  1308.    * Register the FSDataset MBean using the name
  1309.    *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
  1310.    */
  1311.   void registerMBean(final String storageId) {
  1312.     // We wrap to bypass standard mbean naming convetion.
  1313.     // This wraping can be removed in java 6 as it is more flexible in 
  1314.     // package naming for mbeans and their impl.
  1315.     StandardMBean bean;
  1316.     String storageName;
  1317.     if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
  1318.       storageName = "UndefinedStorageId" + rand.nextInt();
  1319.     } else {
  1320.       storageName = storageId;
  1321.     }
  1322.     try {
  1323.       bean = new StandardMBean(this,FSDatasetMBean.class);
  1324.       mbeanName = MBeanUtil.registerMBean("DataNode", "FSDatasetState-" + storageName, bean);
  1325.     } catch (NotCompliantMBeanException e) {
  1326.       e.printStackTrace();
  1327.     }
  1328.  
  1329.     DataNode.LOG.info("Registered FSDatasetStatusMBean");
  1330.   }
  1331.   public void shutdown() {
  1332.     if (mbeanName != null)
  1333.       MBeanUtil.unregisterMBean(mbeanName);
  1334.     
  1335.     if(volumes != null) {
  1336.       for (FSVolume volume : volumes.volumes) {
  1337.         if(volume != null) {
  1338.           volume.dfsUsage.shutdown();
  1339.         }
  1340.       }
  1341.     }
  1342.   }
  1343.   public String getStorageInfo() {
  1344.     return toString();
  1345.   }
  1346. }