SimulatedFSDataset.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.datanode;
  19. import java.io.IOException;
  20. import java.io.InputStream;
  21. import java.io.OutputStream;
  22. import java.util.Arrays;
  23. import java.util.HashMap;
  24. import java.util.Random;
  25. import javax.management.NotCompliantMBeanException;
  26. import javax.management.ObjectName;
  27. import javax.management.StandardMBean;
  28. import org.apache.hadoop.conf.Configurable;
  29. import org.apache.hadoop.conf.Configuration;
  30. import org.apache.hadoop.hdfs.protocol.Block;
  31. import org.apache.hadoop.hdfs.protocol.FSConstants;
  32. import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
  33. import org.apache.hadoop.metrics.util.MBeanUtil;
  34. import org.apache.hadoop.util.DataChecksum;
  35. import org.apache.hadoop.util.DiskChecker.DiskErrorException;
  36. /**
  37.  * This class implements a simulated FSDataset.
  38.  * 
  39.  * Blocks that are created are recorded but their data (plus their CRCs) are
  40.  *  discarded.
  41.  * Fixed data is returned when blocks are read; a null CRC meta file is
  42.  * created for such data.
  43.  * 
  44.  * This FSDataset does not remember any block information across its
  45.  * restarts; it does however offer an operation to inject blocks
  46.  *  (See the TestInectionForSImulatedStorage()
  47.  * for a usage example of injection.
  48.  * 
  49.  * Note the synchronization is coarse grained - it is at each method. 
  50.  */
  51. public class SimulatedFSDataset  implements FSConstants, FSDatasetInterface, Configurable{
  52.   
  53.   public static final String CONFIG_PROPERTY_SIMULATED =
  54.                                     "dfs.datanode.simulateddatastorage";
  55.   public static final String CONFIG_PROPERTY_CAPACITY =
  56.                             "dfs.datanode.simulateddatastorage.capacity";
  57.   
  58.   public static final long DEFAULT_CAPACITY = 2L<<40; // 1 terabyte
  59.   public static final byte DEFAULT_DATABYTE = 9; // 1 terabyte
  60.   byte simulatedDataByte = DEFAULT_DATABYTE;
  61.   Configuration conf = null;
  62.   
  63.   static byte[] nullCrcFileData;
  64.   {
  65.     DataChecksum checksum = DataChecksum.newDataChecksum( DataChecksum.
  66.                               CHECKSUM_NULL, 16*1024 );
  67.     byte[] nullCrcHeader = checksum.getHeader();
  68.     nullCrcFileData =  new byte[2 + nullCrcHeader.length];
  69.     nullCrcFileData[0] = (byte) ((FSDataset.METADATA_VERSION >>> 8) & 0xff);
  70.     nullCrcFileData[1] = (byte) (FSDataset.METADATA_VERSION & 0xff);
  71.     for (int i = 0; i < nullCrcHeader.length; i++) {
  72.       nullCrcFileData[i+2] = nullCrcHeader[i];
  73.     }
  74.   }
  75.   
  76.   private class BInfo { // information about a single block
  77.     Block theBlock;
  78.     private boolean finalized = false; // if not finalized => ongoing creation
  79.     SimulatedOutputStream oStream = null;
  80.     BInfo(Block b, boolean forWriting) throws IOException {
  81.       theBlock = new Block(b);
  82.       if (theBlock.getNumBytes() < 0) {
  83.         theBlock.setNumBytes(0);
  84.       }
  85.       if (!storage.alloc(theBlock.getNumBytes())) { // expected length - actual length may
  86.                                           // be more - we find out at finalize
  87.         DataNode.LOG.warn("Lack of free storage on a block alloc");
  88.         throw new IOException("Creating block, no free space available");
  89.       }
  90.       if (forWriting) {
  91.         finalized = false;
  92.         oStream = new SimulatedOutputStream();
  93.       } else {
  94.         finalized = true;
  95.         oStream = null;
  96.       }
  97.     }
  98.     synchronized long getGenerationStamp() {
  99.       return theBlock.getGenerationStamp();
  100.     }
  101.     synchronized void updateBlock(Block b) {
  102.       theBlock.setGenerationStamp(b.getGenerationStamp());
  103.       setlength(b.getNumBytes());
  104.     }
  105.     
  106.     synchronized long getlength() {
  107.       if (!finalized) {
  108.          return oStream.getLength();
  109.       } else {
  110.         return theBlock.getNumBytes();
  111.       }
  112.     }
  113.     synchronized void setlength(long length) {
  114.       if (!finalized) {
  115.          oStream.setLength(length);
  116.       } else {
  117.         theBlock.setNumBytes(length);
  118.       }
  119.     }
  120.     
  121.     synchronized SimulatedInputStream getIStream() throws IOException {
  122.       if (!finalized) {
  123.         // throw new IOException("Trying to read an unfinalized block");
  124.          return new SimulatedInputStream(oStream.getLength(), DEFAULT_DATABYTE);
  125.       } else {
  126.         return new SimulatedInputStream(theBlock.getNumBytes(), DEFAULT_DATABYTE);
  127.       }
  128.     }
  129.     
  130.     synchronized void finalizeBlock(long finalSize) throws IOException {
  131.       if (finalized) {
  132.         throw new IOException(
  133.             "Finalizing a block that has already been finalized" + 
  134.             theBlock.getBlockId());
  135.       }
  136.       if (oStream == null) {
  137.         DataNode.LOG.error("Null oStream on unfinalized block - bug");
  138.         throw new IOException("Unexpected error on finalize");
  139.       }
  140.       if (oStream.getLength() != finalSize) {
  141.         DataNode.LOG.warn("Size passed to finalize (" + finalSize +
  142.                     ")does not match what was written:" + oStream.getLength());
  143.         throw new IOException(
  144.           "Size passed to finalize does not match the amount of data written");
  145.       }
  146.       // We had allocated the expected length when block was created; 
  147.       // adjust if necessary
  148.       long extraLen = finalSize - theBlock.getNumBytes();
  149.       if (extraLen > 0) {
  150.         if (!storage.alloc(extraLen)) {
  151.           DataNode.LOG.warn("Lack of free storage on a block alloc");
  152.           throw new IOException("Creating block, no free space available");
  153.         }
  154.       } else {
  155.         storage.free(-extraLen);
  156.       }
  157.       theBlock.setNumBytes(finalSize);  
  158.       finalized = true;
  159.       oStream = null;
  160.       return;
  161.     }
  162.     
  163.     SimulatedInputStream getMetaIStream() {
  164.       return new SimulatedInputStream(nullCrcFileData);  
  165.     }
  166.     synchronized boolean isFinalized() {
  167.       return finalized;
  168.     }
  169.   }
  170.   
  171.   static private class SimulatedStorage {
  172.     private long capacity;  // in bytes
  173.     private long used;    // in bytes
  174.     
  175.     synchronized long getFree() {
  176.       return capacity - used;
  177.     }
  178.     
  179.     synchronized long getCapacity() {
  180.       return capacity;
  181.     }
  182.     
  183.     synchronized long getUsed() {
  184.       return used;
  185.     }
  186.     
  187.     synchronized boolean alloc(long amount) {
  188.       if (getFree() >= amount) {
  189.         used += amount;
  190.         return true;
  191.       } else {
  192.         return false;    
  193.       }
  194.     }
  195.     
  196.     synchronized void free(long amount) {
  197.       used -= amount;
  198.     }
  199.     
  200.     SimulatedStorage(long cap) {
  201.       capacity = cap;
  202.       used = 0;   
  203.     }
  204.   }
  205.   
  206.   private HashMap<Block, BInfo> blockMap = null;
  207.   private SimulatedStorage storage = null;
  208.   private String storageId;
  209.   
  210.   public SimulatedFSDataset(Configuration conf) throws IOException {
  211.     setConf(conf);
  212.   }
  213.   
  214.   private SimulatedFSDataset() { // real construction when setConf called.. Uggg
  215.   }
  216.   
  217.   public Configuration getConf() {
  218.     return conf;
  219.   }
  220.   public void setConf(Configuration iconf)  {
  221.     conf = iconf;
  222.     storageId = conf.get("StorageId", "unknownStorageId" +
  223.                                         new Random().nextInt());
  224.     registerMBean(storageId);
  225.     storage = new SimulatedStorage(
  226.         conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
  227.     //DataNode.LOG.info("Starting Simulated storage; Capacity = " + getCapacity() + 
  228.     //    "Used = " + getDfsUsed() + "Free =" + getRemaining());
  229.     blockMap = new HashMap<Block,BInfo>(); 
  230.   }
  231.   public synchronized void injectBlocks(Block[] injectBlocks)
  232.                                             throws IOException {
  233.     if (injectBlocks != null) {
  234.       for (Block b: injectBlocks) { // if any blocks in list is bad, reject list
  235.         if (b == null) {
  236.           throw new NullPointerException("Null blocks in block list");
  237.         }
  238.         if (isValidBlock(b)) {
  239.           throw new IOException("Block already exists in  block list");
  240.         }
  241.       }
  242.       HashMap<Block, BInfo> oldBlockMap = blockMap;
  243.       blockMap = 
  244.           new HashMap<Block,BInfo>(injectBlocks.length + oldBlockMap.size());
  245.       blockMap.putAll(oldBlockMap);
  246.       for (Block b: injectBlocks) {
  247.           BInfo binfo = new BInfo(b, false);
  248.           blockMap.put(b, binfo);
  249.       }
  250.     }
  251.   }
  252.   public synchronized void finalizeBlock(Block b) throws IOException {
  253.     BInfo binfo = blockMap.get(b);
  254.     if (binfo == null) {
  255.       throw new IOException("Finalizing a non existing block " + b);
  256.     }
  257.     binfo.finalizeBlock(b.getNumBytes());
  258.   }
  259.   public synchronized void unfinalizeBlock(Block b) throws IOException {
  260.     if (isBeingWritten(b)) {
  261.       blockMap.remove(b);
  262.     }
  263.   }
  264.   public synchronized Block[] getBlockReport() {
  265.     Block[] blockTable = new Block[blockMap.size()];
  266.     int count = 0;
  267.     for (BInfo b : blockMap.values()) {
  268.       if (b.isFinalized()) {
  269.         blockTable[count++] = b.theBlock;
  270.       }
  271.     }
  272.     if (count != blockTable.length) {
  273.       blockTable = Arrays.copyOf(blockTable, count);
  274.     }
  275.     return blockTable;
  276.   }
  277.   public long getCapacity() throws IOException {
  278.     return storage.getCapacity();
  279.   }
  280.   public long getDfsUsed() throws IOException {
  281.     return storage.getUsed();
  282.   }
  283.   public long getRemaining() throws IOException {
  284.     return storage.getFree();
  285.   }
  286.   public synchronized long getLength(Block b) throws IOException {
  287.     BInfo binfo = blockMap.get(b);
  288.     if (binfo == null) {
  289.       throw new IOException("Finalizing a non existing block " + b);
  290.     }
  291.     return binfo.getlength();
  292.   }
  293.   /** {@inheritDoc} */
  294.   public Block getStoredBlock(long blkid) throws IOException {
  295.     Block b = new Block(blkid);
  296.     BInfo binfo = blockMap.get(b);
  297.     if (binfo == null) {
  298.       return null;
  299.     }
  300.     b.setGenerationStamp(binfo.getGenerationStamp());
  301.     b.setNumBytes(binfo.getlength());
  302.     return b;
  303.   }
  304.   /** {@inheritDoc} */
  305.   public void updateBlock(Block oldblock, Block newblock) throws IOException {
  306.     BInfo binfo = blockMap.get(newblock);
  307.     if (binfo == null) {
  308.       throw new IOException("BInfo not found, b=" + newblock);
  309.     }
  310.     binfo.updateBlock(newblock);
  311.   }
  312.   public synchronized void invalidate(Block[] invalidBlks) throws IOException {
  313.     boolean error = false;
  314.     if (invalidBlks == null) {
  315.       return;
  316.     }
  317.     for (Block b: invalidBlks) {
  318.       if (b == null) {
  319.         continue;
  320.       }
  321.       BInfo binfo = blockMap.get(b);
  322.       if (binfo == null) {
  323.         error = true;
  324.         DataNode.LOG.warn("Invalidate: Missing block");
  325.         continue;
  326.       }
  327.       storage.free(binfo.getlength());
  328.       blockMap.remove(b);
  329.     }
  330.       if (error) {
  331.           throw new IOException("Invalidate: Missing blocks.");
  332.       }
  333.   }
  334.   public synchronized boolean isValidBlock(Block b) {
  335.     // return (blockMap.containsKey(b));
  336.     BInfo binfo = blockMap.get(b);
  337.     if (binfo == null) {
  338.       return false;
  339.     }
  340.     return binfo.isFinalized();
  341.   }
  342.   /* check if a block is created but not finalized */
  343.   private synchronized boolean isBeingWritten(Block b) {
  344.     BInfo binfo = blockMap.get(b);
  345.     if (binfo == null) {
  346.       return false;
  347.     }
  348.     return !binfo.isFinalized();  
  349.   }
  350.   
  351.   public String toString() {
  352.     return getStorageInfo();
  353.   }
  354.   public synchronized BlockWriteStreams writeToBlock(Block b, 
  355.                                             boolean isRecovery)
  356.                                             throws IOException {
  357.     if (isValidBlock(b)) {
  358.           throw new BlockAlreadyExistsException("Block " + b + 
  359.               " is valid, and cannot be written to.");
  360.       }
  361.     if (isBeingWritten(b)) {
  362.         throw new BlockAlreadyExistsException("Block " + b + 
  363.             " is being written, and cannot be written to.");
  364.     }
  365.       BInfo binfo = new BInfo(b, true);
  366.       blockMap.put(b, binfo);
  367.       SimulatedOutputStream crcStream = new SimulatedOutputStream();
  368.       return new BlockWriteStreams(binfo.oStream, crcStream);
  369.   }
  370.   public synchronized InputStream getBlockInputStream(Block b)
  371.                                             throws IOException {
  372.     BInfo binfo = blockMap.get(b);
  373.     if (binfo == null) {
  374.       throw new IOException("No such Block " + b );  
  375.     }
  376.     
  377.     //DataNode.LOG.info("Opening block(" + b.blkid + ") of length " + b.len);
  378.     return binfo.getIStream();
  379.   }
  380.   
  381.   public synchronized InputStream getBlockInputStream(Block b, long seekOffset)
  382.                               throws IOException {
  383.     InputStream result = getBlockInputStream(b);
  384.     result.skip(seekOffset);
  385.     return result;
  386.   }
  387.   /** Not supported */
  388.   public BlockInputStreams getTmpInputStreams(Block b, long blkoff, long ckoff
  389.       ) throws IOException {
  390.     throw new IOException("Not supported");
  391.   }
  392.   /** No-op */
  393.   public void validateBlockMetadata(Block b) {
  394.   }
  395.   /**
  396.    * Returns metaData of block b as an input stream
  397.    * @param b - the block for which the metadata is desired
  398.    * @return metaData of block b as an input stream
  399.    * @throws IOException - block does not exist or problems accessing
  400.    *  the meta file
  401.    */
  402.   private synchronized InputStream getMetaDataInStream(Block b)
  403.                                               throws IOException {
  404.     BInfo binfo = blockMap.get(b);
  405.     if (binfo == null) {
  406.       throw new IOException("No such Block " + b );  
  407.     }
  408.     if (!binfo.finalized) {
  409.       throw new IOException("Block " + b + 
  410.           " is being written, its meta cannot be read");
  411.     }
  412.     return binfo.getMetaIStream();
  413.   }
  414.   public synchronized long getMetaDataLength(Block b) throws IOException {
  415.     BInfo binfo = blockMap.get(b);
  416.     if (binfo == null) {
  417.       throw new IOException("No such Block " + b );  
  418.     }
  419.     if (!binfo.finalized) {
  420.       throw new IOException("Block " + b +
  421.           " is being written, its metalength cannot be read");
  422.     }
  423.     return binfo.getMetaIStream().getLength();
  424.   }
  425.   
  426.   public MetaDataInputStream getMetaDataInputStream(Block b)
  427.   throws IOException {
  428.        return new MetaDataInputStream(getMetaDataInStream(b),
  429.                                                 getMetaDataLength(b));
  430.   }
  431.   public synchronized boolean metaFileExists(Block b) throws IOException {
  432.     if (!isValidBlock(b)) {
  433.           throw new IOException("Block " + b +
  434.               " is valid, and cannot be written to.");
  435.       }
  436.     return true; // crc exists for all valid blocks
  437.   }
  438.   public void checkDataDir() throws DiskErrorException {
  439.     // nothing to check for simulated data set
  440.   }
  441.   public synchronized long getChannelPosition(Block b, 
  442.                                               BlockWriteStreams stream)
  443.                                               throws IOException {
  444.     BInfo binfo = blockMap.get(b);
  445.     if (binfo == null) {
  446.       throw new IOException("No such Block " + b );
  447.     }
  448.     return binfo.getlength();
  449.   }
  450.   public synchronized void setChannelPosition(Block b, BlockWriteStreams stream, 
  451.                                               long dataOffset, long ckOffset)
  452.                                               throws IOException {
  453.     BInfo binfo = blockMap.get(b);
  454.     if (binfo == null) {
  455.       throw new IOException("No such Block " + b );
  456.     }
  457.     binfo.setlength(dataOffset);
  458.   }
  459.   /** 
  460.    * Simulated input and output streams
  461.    *
  462.    */
  463.   static private class SimulatedInputStream extends java.io.InputStream {
  464.     
  465.     byte theRepeatedData = 7;
  466.     long length; // bytes
  467.     int currentPos = 0;
  468.     byte[] data = null;
  469.     
  470.     /**
  471.      * An input stream of size l with repeated bytes
  472.      * @param l
  473.      * @param iRepeatedData
  474.      */
  475.     SimulatedInputStream(long l, byte iRepeatedData) {
  476.       length = l;
  477.       theRepeatedData = iRepeatedData;
  478.     }
  479.     
  480.     /**
  481.      * An input stream of of the supplied data
  482.      * 
  483.      * @param iData
  484.      */
  485.     SimulatedInputStream(byte[] iData) {
  486.       data = iData;
  487.       length = data.length;
  488.       
  489.     }
  490.     
  491.     /**
  492.      * 
  493.      * @return the lenght of the input stream
  494.      */
  495.     long getLength() {
  496.       return length;
  497.     }
  498.     @Override
  499.     public int read() throws IOException {
  500.       if (currentPos >= length)
  501.         return -1;
  502.       if (data !=null) {
  503.         return data[currentPos++];
  504.       } else {
  505.         currentPos++;
  506.         return theRepeatedData;
  507.       }
  508.     }
  509.     
  510.     @Override
  511.     public int read(byte[] b) throws IOException { 
  512.       if (b == null) {
  513.         throw new NullPointerException();
  514.       }
  515.       if (b.length == 0) {
  516.         return 0;
  517.       }
  518.       if (currentPos >= length) { // EOF
  519.         return -1;
  520.       }
  521.       int bytesRead = (int) Math.min(b.length, length-currentPos);
  522.       if (data != null) {
  523.         System.arraycopy(data, currentPos, b, 0, bytesRead);
  524.       } else { // all data is zero
  525.         for (int i : b) {  
  526.           b[i] = theRepeatedData;
  527.         }
  528.       }
  529.       currentPos += bytesRead;
  530.       return bytesRead;
  531.     }
  532.   }
  533.   
  534.   /**
  535.    * This class implements an output stream that merely throws its data away, but records its
  536.    * length.
  537.    *
  538.    */
  539.   static private class SimulatedOutputStream extends OutputStream {
  540.     long length = 0;
  541.     
  542.     /**
  543.      * constructor for Simulated Output Steram
  544.      */
  545.     SimulatedOutputStream() {
  546.     }
  547.     
  548.     /**
  549.      * 
  550.      * @return the length of the data created so far.
  551.      */
  552.     long getLength() {
  553.       return length;
  554.     }
  555.     /**
  556.      */
  557.     void setLength(long length) {
  558.       this.length = length;
  559.     }
  560.     
  561.     @Override
  562.     public void write(int arg0) throws IOException {
  563.       length++;
  564.     }
  565.     
  566.     @Override
  567.     public void write(byte[] b) throws IOException {
  568.       length += b.length;
  569.     }
  570.     
  571.     @Override
  572.     public void write(byte[] b,
  573.               int off,
  574.               int len) throws IOException  {
  575.       length += len;
  576.     }
  577.   }
  578.   
  579.   private ObjectName mbeanName;
  580.   
  581.   /**
  582.    * Register the FSDataset MBean using the name
  583.    *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
  584.    *  We use storage id for MBean name since a minicluster within a single
  585.    * Java VM may have multiple Simulated Datanodes.
  586.    */
  587.   void registerMBean(final String storageId) {
  588.     // We wrap to bypass standard mbean naming convetion.
  589.     // This wraping can be removed in java 6 as it is more flexible in 
  590.     // package naming for mbeans and their impl.
  591.     StandardMBean bean;
  592.     try {
  593.       bean = new StandardMBean(this,FSDatasetMBean.class);
  594.       mbeanName = MBeanUtil.registerMBean("DataNode",
  595.           "FSDatasetState-" + storageId, bean);
  596.     } catch (NotCompliantMBeanException e) {
  597.       e.printStackTrace();
  598.     }
  599.  
  600.     DataNode.LOG.info("Registered FSDatasetStatusMBean");
  601.   }
  602.   public void shutdown() {
  603.     if (mbeanName != null)
  604.       MBeanUtil.unregisterMBean(mbeanName);
  605.   }
  606.   public String getStorageInfo() {
  607.     return "Simulated FSDataset-" + storageId;
  608.   }
  609. }