// FSDataset.java
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import java.io.*;
import java.util.*;

import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;

import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;

/**************************************************
 * FSDataset manages a set of data blocks.  Each block
 * has a unique name and an extent on disk.
 *
 ***************************************************/
public class FSDataset implements FSConstants, FSDatasetInterface {

  /**
   * A node type that can be built into a tree reflecting the
   * hierarchy of blocks on the local disk.
   */
  class FSDir {
    File dir;
    int numBlocks = 0;
    FSDir children[];
    int lastChildIdx = 0;

    /**
     * Builds the FSDir rooted at the given directory, creating it if
     * necessary and counting the blocks and subdirectories it already
     * contains.
     */
    public FSDir(File dir)
      throws IOException {
      this.dir = dir;
      this.children = null;
      if (!dir.exists()) {
        if (!dir.mkdirs()) {
          throw new IOException("Mkdirs failed to create " +
                                dir.toString());
        }
      } else {
        File[] files = dir.listFiles();
        int numChildren = 0;
        for (int idx = 0; idx < files.length; idx++) {
          if (files[idx].isDirectory()) {
            numChildren++;
          } else if (Block.isBlockFilename(files[idx])) {
            numBlocks++;
          }
        }
        if (numChildren > 0) {
          children = new FSDir[numChildren];
          int curdir = 0;
          for (int idx = 0; idx < files.length; idx++) {
            if (files[idx].isDirectory()) {
              children[curdir] = new FSDir(files[idx]);
              curdir++;
            }
          }
        }
      }
    }

    public File addBlock(Block b, File src) throws IOException {
      //First try without creating subdirectories
      File file = addBlock(b, src, false, false);
      return (file != null) ? file : addBlock(b, src, true, true);
    }

    private File addBlock(Block b, File src, boolean createOk,
                          boolean resetIdx) throws IOException {
      if (numBlocks < maxBlocksPerDir) {
        File dest = new File(dir, b.getBlockName());
        File metaData = getMetaFile(src, b);
        File newmeta = getMetaFile(dest, b);
        if (!metaData.renameTo(newmeta) ||
            !src.renameTo(dest)) {
          throw new IOException("could not move files for " + b +
                                " from tmp to " +
                                dest.getAbsolutePath());
        }
        if (DataNode.LOG.isDebugEnabled()) {
          DataNode.LOG.debug("addBlock: Moved " + metaData + " to " + newmeta);
          DataNode.LOG.debug("addBlock: Moved " + src + " to " + dest);
        }
        numBlocks += 1;
        return dest;
      }

      if (lastChildIdx < 0 && resetIdx) {
        //reset so that all children will be checked
        lastChildIdx = random.nextInt(children.length);
      }

      if (lastChildIdx >= 0 && children != null) {
        //Check if any child-tree has room for a block.
        for (int i = 0; i < children.length; i++) {
          int idx = (lastChildIdx + i) % children.length;
          File file = children[idx].addBlock(b, src, false, resetIdx);
          if (file != null) {
            lastChildIdx = idx;
            return file;
          }
        }
        lastChildIdx = -1;
      }

      if (!createOk) {
        return null;
      }

      if (children == null || children.length == 0) {
        children = new FSDir[maxBlocksPerDir];
        for (int idx = 0; idx < maxBlocksPerDir; idx++) {
          children[idx] = new FSDir(new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX + idx));
        }
      }

      //now pick a child randomly for creating a new set of subdirs.
      lastChildIdx = random.nextInt(children.length);
      return children[lastChildIdx].addBlock(b, src, true, false);
    }
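
    // Placement strategy in outline: the public addBlock(b, src) first
    // attempts a placement without creating new directories
    // (createOk == false); only if every existing directory in the tree
    // is full does it retry with createOk == true, fanning out a new
    // level of maxBlocksPerDir subdirectories. lastChildIdx remembers
    // where the previous successful placement went, so in the common
    // case the scan stops at the first probe.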
    /** Find the metadata file for the specified block file.
     * Return the generation stamp from the name of the metafile.
     */
    long getGenerationStampFromFile(File[] listdir, File blockFile) {
      String blockName = blockFile.getName();
      for (int j = 0; j < listdir.length; j++) {
        String path = listdir[j].getName();
        if (!path.startsWith(blockName)) {
          continue;
        }
        String[] vals = path.split("_");
        if (vals.length != 3) {     // blk, blkid, genstamp.meta
          continue;
        }
        String[] str = vals[2].split("\\.");
        if (str.length != 2) {
          continue;
        }
        return Long.parseLong(str[0]);
      }
      DataNode.LOG.warn("Block " + blockFile +
                        " does not have a metafile!");
      return Block.GRANDFATHER_GENERATION_STAMP;
    }
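
    // Naming convention assumed above: a block file "blk_3141" is paired
    // with a metadata file such as "blk_3141_2.meta", where 2 is the
    // generation stamp. Splitting on "_" yields {"blk", "3141", "2.meta"},
    // and splitting "2.meta" on "." recovers the stamp.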
    /**
     * Populate the given blockSet with any child blocks
     * found at this node.
     */
    public void getBlockInfo(TreeSet<Block> blockSet) {
      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          children[i].getBlockInfo(blockSet);
        }
      }

      File blockFiles[] = dir.listFiles();
      for (int i = 0; i < blockFiles.length; i++) {
        if (Block.isBlockFilename(blockFiles[i])) {
          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
          blockSet.add(new Block(blockFiles[i], blockFiles[i].length(), genStamp));
        }
      }
    }

    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap, FSVolume volume) {
      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          children[i].getVolumeMap(volumeMap, volume);
        }
      }

      File blockFiles[] = dir.listFiles();
      for (int i = 0; i < blockFiles.length; i++) {
        if (Block.isBlockFilename(blockFiles[i])) {
          long genStamp = getGenerationStampFromFile(blockFiles, blockFiles[i]);
          volumeMap.put(new Block(blockFiles[i], blockFiles[i].length(), genStamp),
                        new DatanodeBlockInfo(volume, blockFiles[i]));
        }
      }
    }

    /**
     * Check if a data directory is healthy.
     * @throws DiskErrorException
     */
    public void checkDirTree() throws DiskErrorException {
      DiskChecker.checkDir(dir);

      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          children[i].checkDirTree();
        }
      }
    }

    void clearPath(File f) {
      String root = dir.getAbsolutePath();
      String dir = f.getAbsolutePath();
      if (dir.startsWith(root)) {
        String[] dirNames = dir.substring(root.length()).
          split(File.separator + "subdir");
        if (clearPath(f, dirNames, 1))
          return;
      }
      clearPath(f, null, -1);
    }

    /*
     * dirNames is an array of string integers derived from the
     * usual directory structure data/subdirN/subdirXY/subdirM ...
     * If the dirNames array is non-null, we only check the child at
     * children[dirNames[idx]]. This avoids iterating over
     * children in the common case. If the directory structure changes
     * in later versions, we need to revisit this.
     */
    private boolean clearPath(File f, String[] dirNames, int idx) {
      if ((dirNames == null || idx == dirNames.length) &&
          dir.compareTo(f) == 0) {
        numBlocks--;
        return true;
      }

      if (dirNames != null) {
        //guess the child index from the directory name
        if (idx > (dirNames.length - 1) || children == null) {
          return false;
        }
        int childIdx;
        try {
          childIdx = Integer.parseInt(dirNames[idx]);
        } catch (NumberFormatException ignored) {
          // layout changed? we could print a warning.
          return false;
        }
        return (childIdx >= 0 && childIdx < children.length) ?
          children[childIdx].clearPath(f, dirNames, idx + 1) : false;
      }

      //guesses failed. back to blind iteration.
      if (children != null) {
        for (int i = 0; i < children.length; i++) {
          if (children[i].clearPath(f, null, -1)) {
            return true;
          }
        }
      }
      return false;
    }

    public String toString() {
      return "FSDir{" +
        "dir=" + dir +
        ", children=" + (children == null ? null : Arrays.asList(children)) +
        "}";
    }
  }
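
  // The resulting on-disk layout under a volume's current/ directory,
  // with the default maxBlocksPerDir of 64, looks roughly like:
  //
  //   current/
  //     blk_<id>, blk_<id>_<genstamp>.meta, ...   (up to 64 blocks)
  //     subdir0/ ... subdir63/                    (each an FSDir, recursively)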
  class FSVolume {
    private FSDir dataDir;
    private File tmpDir;
    private File detachDir; // copy on write for blocks in snapshot
    private DF usage;
    private DU dfsUsage;
    private long reserved;

    FSVolume(File currentDir, Configuration conf) throws IOException {
      this.reserved = conf.getLong("dfs.datanode.du.reserved", 0);
      boolean supportAppends = conf.getBoolean("dfs.support.append", false);
      File parent = currentDir.getParentFile();

      this.detachDir = new File(parent, "detach");
      if (detachDir.exists()) {
        recoverDetachedBlocks(currentDir, detachDir);
      }

      // Files that were being written when the datanode was last shutdown
      // are now moved back to the data directory. It is possible that
      // in the future, we might want to do some sort of datanode-local
      // recovery for these blocks. For example, crc validation.
      //
      this.tmpDir = new File(parent, "tmp");
      if (tmpDir.exists()) {
        if (supportAppends) {
          recoverDetachedBlocks(currentDir, tmpDir);
        } else {
          FileUtil.fullyDelete(tmpDir);
        }
      }
      this.dataDir = new FSDir(currentDir);
      if (!tmpDir.mkdirs()) {
        if (!tmpDir.isDirectory()) {
          throw new IOException("Mkdirs failed to create " + tmpDir.toString());
        }
      }
      if (!detachDir.mkdirs()) {
        if (!detachDir.isDirectory()) {
          throw new IOException("Mkdirs failed to create " + detachDir.toString());
        }
      }
      this.usage = new DF(parent, conf);
      this.dfsUsage = new DU(parent, conf);
      this.dfsUsage.start();
    }
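
    // Each volume thus manages three sibling directories under its
    // storage directory: current/ holds finalized blocks (the FSDir
    // tree), tmp/ holds blocks still being written, and detach/ holds
    // copy-on-write files; the latter two are recovered or cleaned on
    // restart, as above.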
    void decDfsUsed(long value) {
      dfsUsage.decDfsUsed(value);
    }

    long getDfsUsed() throws IOException {
      return dfsUsage.getUsed();
    }

    long getCapacity() throws IOException {
      if (reserved > usage.getCapacity()) {
        return 0;
      }

      return usage.getCapacity() - reserved;
    }

    long getAvailable() throws IOException {
      long remaining = getCapacity() - getDfsUsed();
      long available = usage.getAvailable();
      if (remaining > available) {
        remaining = available;
      }
      return (remaining > 0) ? remaining : 0;
    }

    String getMount() throws IOException {
      return usage.getMount();
    }

    File getDir() {
      return dataDir.dir;
    }

    /**
     * Temporary files. They get moved to the real block directory either when
     * the block is finalized or the datanode restarts.
     */
    File createTmpFile(Block b) throws IOException {
      File f = new File(tmpDir, b.getBlockName());
      return createTmpFile(b, f);
    }

    /**
     * Returns the name of the temporary file for this block.
     */
    File getTmpFile(Block b) throws IOException {
      File f = new File(tmpDir, b.getBlockName());
      return f;
    }

    /**
     * Files used for copy-on-write. They need recovery when datanode
     * restarts.
     */
    File createDetachFile(Block b, String filename) throws IOException {
      File f = new File(detachDir, filename);
      return createTmpFile(b, f);
    }

    private File createTmpFile(Block b, File f) throws IOException {
      if (f.exists()) {
        throw new IOException("Unexpected problem in creating temporary file for " +
                              b + ". File " + f + " should not be present, but is.");
      }
      // Create the zero-length temp file
      //
      boolean fileCreated = false;
      try {
        fileCreated = f.createNewFile();
      } catch (IOException ioe) {
        throw (IOException)new IOException(DISK_ERROR + f).initCause(ioe);
      }
      if (!fileCreated) {
        throw new IOException("Unexpected problem in creating temporary file for " +
                              b + ". File " + f + " should be creatable, but is already present.");
      }
      return f;
    }

    File addBlock(Block b, File f) throws IOException {
      File blockFile = dataDir.addBlock(b, f);
      File metaFile = getMetaFile(blockFile, b);
      dfsUsage.incDfsUsed(b.getNumBytes() + metaFile.length());
      return blockFile;
    }

    void checkDirs() throws DiskErrorException {
      dataDir.checkDirTree();
      DiskChecker.checkDir(tmpDir);
    }

    void getBlockInfo(TreeSet<Block> blockSet) {
      dataDir.getBlockInfo(blockSet);
    }

    void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
      dataDir.getVolumeMap(volumeMap, this);
    }

    void clearPath(File f) {
      dataDir.clearPath(f);
    }

    public String toString() {
      return dataDir.dir.getAbsolutePath();
    }

    /**
     * Recover detached files on datanode restart. If a detached block
     * does not exist in the original directory, then it is moved to the
     * original directory.
     */
    private void recoverDetachedBlocks(File dataDir, File dir)
      throws IOException {
      File contents[] = dir.listFiles();
      if (contents == null) {
        return;
      }
      for (int i = 0; i < contents.length; i++) {
        if (!contents[i].isFile()) {
          throw new IOException("Found " + contents[i] + " in " + dir +
                                " but it is not a file.");
        }

        //
        // If the original block file still exists, then no recovery
        // is needed.
        //
        File blk = new File(dataDir, contents[i].getName());
        if (!blk.exists()) {
          if (!contents[i].renameTo(blk)) {
            throw new IOException("Unable to recover detached file " +
                                  contents[i]);
          }
          continue;
        }
        if (!contents[i].delete()) {
          throw new IOException("Unable to cleanup detached file " +
                                contents[i]);
        }
      }
    }
  }

  static class FSVolumeSet {
    FSVolume[] volumes = null;
    int curVolume = 0;

    FSVolumeSet(FSVolume[] volumes) {
      this.volumes = volumes;
    }

    synchronized FSVolume getNextVolume(long blockSize) throws IOException {
      int startVolume = curVolume;
      while (true) {
        FSVolume volume = volumes[curVolume];
        curVolume = (curVolume + 1) % volumes.length;
        if (volume.getAvailable() > blockSize) { return volume; }
        if (curVolume == startVolume) {
          throw new DiskOutOfSpaceException("Insufficient space for an additional block");
        }
      }
    }
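
    // Volume selection is a simple round-robin: starting from where the
    // previous allocation left off, probe each volume once for enough
    // free space (e.g. with three volumes and curVolume == 1, the probe
    // order is 1, 2, 0), and give up with a DiskOutOfSpaceException only
    // after a full cycle finds no room.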

    long getDfsUsed() throws IOException {
      long dfsUsed = 0L;
      for (int idx = 0; idx < volumes.length; idx++) {
        dfsUsed += volumes[idx].getDfsUsed();
      }
      return dfsUsed;
    }

    synchronized long getCapacity() throws IOException {
      long capacity = 0L;
      for (int idx = 0; idx < volumes.length; idx++) {
        capacity += volumes[idx].getCapacity();
      }
      return capacity;
    }

    synchronized long getRemaining() throws IOException {
      long remaining = 0L;
      for (int idx = 0; idx < volumes.length; idx++) {
        remaining += volumes[idx].getAvailable();
      }
      return remaining;
    }

    synchronized void getBlockInfo(TreeSet<Block> blockSet) {
      for (int idx = 0; idx < volumes.length; idx++) {
        volumes[idx].getBlockInfo(blockSet);
      }
    }

    synchronized void getVolumeMap(HashMap<Block, DatanodeBlockInfo> volumeMap) {
      for (int idx = 0; idx < volumes.length; idx++) {
        volumes[idx].getVolumeMap(volumeMap);
      }
    }

    synchronized void checkDirs() throws DiskErrorException {
      for (int idx = 0; idx < volumes.length; idx++) {
        volumes[idx].checkDirs();
      }
    }

    public String toString() {
      StringBuffer sb = new StringBuffer();
      for (int idx = 0; idx < volumes.length; idx++) {
        sb.append(volumes[idx].toString());
        if (idx != volumes.length - 1) { sb.append(","); }
      }
      return sb.toString();
    }
  }

  //////////////////////////////////////////////////////
  //
  // FSDataSet
  //
  //////////////////////////////////////////////////////

  //Find better place?
  public static final String METADATA_EXTENSION = ".meta";
  public static final short METADATA_VERSION = 1;

  static class ActiveFile {
    final File file;
    final List<Thread> threads = new ArrayList<Thread>(2);

    ActiveFile(File f, List<Thread> list) {
      file = f;
      if (list != null) {
        threads.addAll(list);
      }
      threads.add(Thread.currentThread());
    }

    public String toString() {
      return getClass().getSimpleName() + "(file=" + file
        + ", threads=" + threads + ")";
    }
  }

  static String getMetaFileName(String blockFileName, long genStamp) {
    return blockFileName + "_" + genStamp + METADATA_EXTENSION;
  }
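
  // For example, getMetaFileName("/data/current/blk_3141", 7) yields
  // "/data/current/blk_3141_7.meta"; parseGenerationStamp below inverts
  // this by slicing out the characters between the block file name and
  // the ".meta" extension.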

  static File getMetaFile(File f, Block b) {
    return new File(getMetaFileName(f.getAbsolutePath(),
                                    b.getGenerationStamp()));
  }

  protected File getMetaFile(Block b) throws IOException {
    return getMetaFile(getBlockFile(b), b);
  }

  /** Find the corresponding meta data file from a given block file */
  private static File findMetaFile(final File blockFile) throws IOException {
    final String prefix = blockFile.getName() + "_";
    final File parent = blockFile.getParentFile();
    File[] matches = parent.listFiles(new FilenameFilter() {
      public boolean accept(File dir, String name) {
        return dir.equals(parent)
            && name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
      }
    });

    if (matches == null || matches.length == 0) {
      throw new IOException("Meta file not found, blockFile=" + blockFile);
    }
    else if (matches.length > 1) {
      throw new IOException("Found more than one meta file: "
          + Arrays.asList(matches));
    }
    return matches[0];
  }

  /** Parse the generation stamp out of the name of a given meta file */
  private static long parseGenerationStamp(File blockFile, File metaFile
      ) throws IOException {
    String metaname = metaFile.getName();
    String gs = metaname.substring(blockFile.getName().length() + 1,
        metaname.length() - METADATA_EXTENSION.length());
    try {
      return Long.parseLong(gs);
    } catch(NumberFormatException nfe) {
      throw (IOException)new IOException("blockFile=" + blockFile
          + ", metaFile=" + metaFile).initCause(nfe);
    }
  }
  /** Return the block file for the given ID */
  public File findBlockFile(long blockId) {
    final Block b = new Block(blockId);
    File blockfile = null;
    ActiveFile activefile = ongoingCreates.get(b);
    if (activefile != null) {
      blockfile = activefile.file;
    }
    if (blockfile == null) {
      blockfile = getFile(b);
    }
    if (blockfile == null) {
      if (DataNode.LOG.isDebugEnabled()) {
        DataNode.LOG.debug("ongoingCreates=" + ongoingCreates);
        DataNode.LOG.debug("volumeMap=" + volumeMap);
      }
    }
    return blockfile;
  }

  /** {@inheritDoc} */
  public synchronized Block getStoredBlock(long blkid) throws IOException {
    File blockfile = findBlockFile(blkid);
    if (blockfile == null) {
      return null;
    }
    File metafile = findMetaFile(blockfile);
    return new Block(blkid, blockfile.length(),
        parseGenerationStamp(blockfile, metafile));
  }

  public boolean metaFileExists(Block b) throws IOException {
    return getMetaFile(b).exists();
  }

  public long getMetaDataLength(Block b) throws IOException {
    File checksumFile = getMetaFile(b);
    return checksumFile.length();
  }

  public MetaDataInputStream getMetaDataInputStream(Block b)
      throws IOException {
    File checksumFile = getMetaFile(b);
    return new MetaDataInputStream(new FileInputStream(checksumFile),
                                   checksumFile.length());
  }

  FSVolumeSet volumes;
  private HashMap<Block,ActiveFile> ongoingCreates = new HashMap<Block,ActiveFile>();
  private int maxBlocksPerDir = 0;
  private HashMap<Block,DatanodeBlockInfo> volumeMap = null;
  static Random random = new Random();

  /**
   * An FSDataset has a directory where it loads its data files.
   */
  public FSDataset(DataStorage storage, Configuration conf) throws IOException {
    this.maxBlocksPerDir = conf.getInt("dfs.datanode.numblocks", 64);
    FSVolume[] volArray = new FSVolume[storage.getNumStorageDirs()];
    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
      volArray[idx] = new FSVolume(storage.getStorageDir(idx).getCurrentDir(), conf);
    }
    volumes = new FSVolumeSet(volArray);
    volumeMap = new HashMap<Block, DatanodeBlockInfo>();
    volumes.getVolumeMap(volumeMap);
    registerMBean(storage.getStorageID());
  }

  /**
   * Return the total space used by dfs datanode
   */
  public long getDfsUsed() throws IOException {
    return volumes.getDfsUsed();
  }

  /**
   * Return total capacity, used and unused
   */
  public long getCapacity() throws IOException {
    return volumes.getCapacity();
  }

  /**
   * Return how many bytes can still be stored in the FSDataset
   */
  public long getRemaining() throws IOException {
    return volumes.getRemaining();
  }

  /**
   * Find the block's on-disk length
   */
  public long getLength(Block b) throws IOException {
    return getBlockFile(b).length();
  }

  /**
   * Get File name for a given block.
   */
  public synchronized File getBlockFile(Block b) throws IOException {
    File f = validateBlockFile(b);
    if (f == null) {
      if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
        InterDatanodeProtocol.LOG.debug("b=" + b + ", volumeMap=" + volumeMap);
      }
      throw new IOException("Block " + b + " is not valid.");
    }
    return f;
  }

  public synchronized InputStream getBlockInputStream(Block b) throws IOException {
    return new FileInputStream(getBlockFile(b));
  }

  public synchronized InputStream getBlockInputStream(Block b, long seekOffset) throws IOException {
    File blockFile = getBlockFile(b);
    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
    if (seekOffset > 0) {
      blockInFile.seek(seekOffset);
    }
    return new FileInputStream(blockInFile.getFD());
  }

  /**
   * Returns handles to the block file and its metadata file
   */
  public synchronized BlockInputStreams getTmpInputStreams(Block b,
      long blkOffset, long ckoff) throws IOException {
    DatanodeBlockInfo info = volumeMap.get(b);
    if (info == null) {
      throw new IOException("Block " + b + " does not exist in volumeMap.");
    }
    FSVolume v = info.getVolume();
    File blockFile = v.getTmpFile(b);
    RandomAccessFile blockInFile = new RandomAccessFile(blockFile, "r");
    if (blkOffset > 0) {
      blockInFile.seek(blkOffset);
    }
    File metaFile = getMetaFile(blockFile, b);
    RandomAccessFile metaInFile = new RandomAccessFile(metaFile, "r");
    if (ckoff > 0) {
      metaInFile.seek(ckoff);
    }
    return new BlockInputStreams(new FileInputStream(blockInFile.getFD()),
                                 new FileInputStream(metaInFile.getFD()));
  }

  private BlockWriteStreams createBlockWriteStreams(File f, File metafile) throws IOException {
    return new BlockWriteStreams(new FileOutputStream(new RandomAccessFile(f, "rw").getFD()),
        new FileOutputStream(new RandomAccessFile(metafile, "rw").getFD()));
  }

  /**
   * Make a copy of the block if this block is linked to an existing
   * snapshot. This ensures that modifying this block does not modify
   * data in any existing snapshots.
   * @param block Block
   * @param numLinks Detach if the number of links exceeds this value
   * @throws IOException
   * @return - true if the specified block was detached
   */
  public boolean detachBlock(Block block, int numLinks) throws IOException {
    DatanodeBlockInfo info = null;
    synchronized (this) {
      info = volumeMap.get(block);
    }
    return info.detachBlock(block, numLinks);
  }

  static private <T> void updateBlockMap(Map<Block, T> blockmap,
      Block oldblock, Block newblock) throws IOException {
    if (blockmap.containsKey(oldblock)) {
      T value = blockmap.remove(oldblock);
      blockmap.put(newblock, value);
    }
  }

  /** {@inheritDoc} */
  public void updateBlock(Block oldblock, Block newblock) throws IOException {
    if (oldblock.getBlockId() != newblock.getBlockId()) {
      throw new IOException("Cannot update oldblock (=" + oldblock
          + ") to newblock (=" + newblock + ").");
    }

    for(;;) {
      final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
      if (threads == null) {
        return;
      }

      // interrupt and wait for all ongoing create threads
      for(Thread t : threads) {
        t.interrupt();
      }
      for(Thread t : threads) {
        try {
          t.join();
        } catch (InterruptedException e) {
          DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
        }
      }
    }
  }
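
  // The loop above retries until tryUpdateBlock finds no live writer
  // threads for the old block: each pass interrupts whatever writers
  // tryUpdateBlock reported, joins them, and tries again, so the actual
  // update only happens once the block is quiescent.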
  /**
   * Try to update an old block to a new block.
   * If there are ongoing create threads running for the old block,
   * the threads will be returned without updating the block.
   *
   * @return ongoing create threads if there is any. Otherwise, return null.
   */
  private synchronized List<Thread> tryUpdateBlock(
      Block oldblock, Block newblock) throws IOException {
    //check ongoing create threads
    final ActiveFile activefile = ongoingCreates.get(oldblock);
    if (activefile != null && !activefile.threads.isEmpty()) {
      //remove dead threads
      for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
        final Thread t = i.next();
        if (!t.isAlive()) {
          i.remove();
        }
      }

      //return living threads
      if (!activefile.threads.isEmpty()) {
        return new ArrayList<Thread>(activefile.threads);
      }
    }

    //No ongoing create thread is alive. Update the block.
    File blockFile = findBlockFile(oldblock.getBlockId());
    if (blockFile == null) {
      throw new IOException("Block " + oldblock + " does not exist.");
    }
    File oldMetaFile = findMetaFile(blockFile);
    long oldgs = parseGenerationStamp(blockFile, oldMetaFile);

    //rename meta file to a tmp file
    File tmpMetaFile = new File(oldMetaFile.getParent(),
        oldMetaFile.getName() + "_tmp" + newblock.getGenerationStamp());
    if (!oldMetaFile.renameTo(tmpMetaFile)) {
      throw new IOException("Cannot rename block meta file to " + tmpMetaFile);
    }

    //update generation stamp
    if (oldgs > newblock.getGenerationStamp()) {
      throw new IOException("Cannot update block (id=" + newblock.getBlockId()
          + ") generation stamp from " + oldgs
          + " to " + newblock.getGenerationStamp());
    }

    //update length
    if (newblock.getNumBytes() > oldblock.getNumBytes()) {
      throw new IOException("Cannot update block file (=" + blockFile
          + ") length from " + oldblock.getNumBytes() + " to " + newblock.getNumBytes());
    }
    if (newblock.getNumBytes() < oldblock.getNumBytes()) {
      truncateBlock(blockFile, tmpMetaFile, oldblock.getNumBytes(), newblock.getNumBytes());
    }

    //rename the tmp file to the new meta file (with new generation stamp)
    File newMetaFile = getMetaFile(blockFile, newblock);
    if (!tmpMetaFile.renameTo(newMetaFile)) {
      throw new IOException("Cannot rename tmp meta file to " + newMetaFile);
    }

    updateBlockMap(ongoingCreates, oldblock, newblock);
    updateBlockMap(volumeMap, oldblock, newblock);

    // paranoia! verify that the contents of the stored block
    // matches the block file on disk.
    validateBlockMetadata(newblock);
    return null;
  }

  static private void truncateBlock(File blockFile, File metaFile,
      long oldlen, long newlen) throws IOException {
    if (newlen == oldlen) {
      return;
    }
    if (newlen > oldlen) {
      throw new IOException("Cannot truncate block from oldlen (=" + oldlen
          + ") to newlen (=" + newlen + ")");
    }

    DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
    int checksumsize = dcs.getChecksumSize();
    int bpc = dcs.getBytesPerChecksum();
    long n = (newlen - 1)/bpc + 1;
    long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
    long lastchunkoffset = (n - 1)*bpc;
    int lastchunksize = (int)(newlen - lastchunkoffset);
    byte[] b = new byte[Math.max(lastchunksize, checksumsize)];

    RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
    try {
      //truncate blockFile
      blockRAF.setLength(newlen);

      //read last chunk
      blockRAF.seek(lastchunkoffset);
      blockRAF.readFully(b, 0, lastchunksize);
    } finally {
      blockRAF.close();
    }

    //compute checksum
    dcs.update(b, 0, lastchunksize);
    dcs.writeValue(b, 0, false);

    //update metaFile
    RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
    try {
      metaRAF.setLength(newmetalen);
      metaRAF.seek(newmetalen - checksumsize);
      metaRAF.write(b, 0, checksumsize);
    } finally {
      metaRAF.close();
    }
  }
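
  // Worked example of the arithmetic above: with bytesPerChecksum = 512,
  // a 4-byte checksum, and newlen = 1025, we keep n = (1025-1)/512 + 1 = 3
  // checksum chunks; the last chunk starts at offset (3-1)*512 = 1024 and
  // is 1025 - 1024 = 1 byte long, so only that byte is re-read and its
  // checksum recomputed before the meta file is shortened to
  // headerSize + 3*4 bytes.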
  private final static String DISK_ERROR = "Possible disk error on file creation: ";

  /** Get the cause of an I/O exception if caused by a possible disk error
   * @param ioe an I/O exception
   * @return cause if the I/O exception is caused by a possible disk error;
   *         null otherwise.
   */
  static IOException getCauseIfDiskError(IOException ioe) {
    if (ioe.getMessage() != null && ioe.getMessage().startsWith(DISK_ERROR)) {
      return (IOException)ioe.getCause();
    } else {
      return null;
    }
  }

  /**
   * Start writing to a block file.
   * If isRecovery is true and the block pre-exists, then we kill all
   * other threads that might be writing to this block, and then reopen the file.
   */
  public BlockWriteStreams writeToBlock(Block b, boolean isRecovery) throws IOException {
    //
    // Make sure the block isn't a valid one - we're still creating it!
    //
    if (isValidBlock(b)) {
      if (!isRecovery) {
        throw new BlockAlreadyExistsException("Block " + b + " is valid, and cannot be written to.");
      }
      // The block may have been successfully finalized because all packets
      // were successfully processed at the Datanode but the ack for
      // some of the packets was not received by the client. The client
      // re-opens the connection and retries sending those packets.
      // The other reason is that an "append" is occurring to this block.
      detachBlock(b, 1);
    }
    long blockSize = b.getNumBytes();

    //
    // Serialize access to /tmp, and check if file already there.
    //
    File f = null;
    List<Thread> threads = null;
    synchronized (this) {
      //
      // Is it already in the create process?
      //
      ActiveFile activeFile = ongoingCreates.get(b);
      if (activeFile != null) {
        f = activeFile.file;
        threads = activeFile.threads;

        if (!isRecovery) {
          throw new BlockAlreadyExistsException("Block " + b +
              " has already been started (though not completed), and thus cannot be created.");
        } else {
          for (Thread thread:threads) {
            thread.interrupt();
          }
        }
        ongoingCreates.remove(b);
      }
      FSVolume v = null;
      if (!isRecovery) {
        v = volumes.getNextVolume(blockSize);
        // create temporary file to hold block in the designated volume
        f = createTmpFile(v, b);
        volumeMap.put(b, new DatanodeBlockInfo(v));
      } else if (f != null) {
        DataNode.LOG.info("Reopen already-open Block for append " + b);
        // create or reuse temporary file to hold block in the designated volume
        v = volumeMap.get(b).getVolume();
        volumeMap.put(b, new DatanodeBlockInfo(v));
      } else {
        // reopening block for appending to it.
        DataNode.LOG.info("Reopen Block for append " + b);
        v = volumeMap.get(b).getVolume();
        f = createTmpFile(v, b);
        File blkfile = getBlockFile(b);
        File oldmeta = getMetaFile(b);
        File newmeta = getMetaFile(f, b);

        // rename meta file to tmp directory
        DataNode.LOG.debug("Renaming " + oldmeta + " to " + newmeta);
        if (!oldmeta.renameTo(newmeta)) {
          throw new IOException("Block " + b + " reopen failed. " +
                                " Unable to move meta file " + oldmeta +
                                " to tmp dir " + newmeta);
        }

        // rename block file to tmp directory
        DataNode.LOG.debug("Renaming " + blkfile + " to " + f);
        if (!blkfile.renameTo(f)) {
          if (!f.delete()) {
            throw new IOException("Block " + b + " reopen failed. " +
                                  " Unable to remove file " + f);
          }
          if (!blkfile.renameTo(f)) {
            throw new IOException("Block " + b + " reopen failed. " +
                                  " Unable to move block file " + blkfile +
                                  " to tmp dir " + f);
          }
        }
        volumeMap.put(b, new DatanodeBlockInfo(v));
      }
      if (f == null) {
        DataNode.LOG.warn("Block " + b + " reopen failed " +
                          " Unable to locate tmp file.");
        throw new IOException("Block " + b + " reopen failed " +
                              " Unable to locate tmp file.");
      }
      ongoingCreates.put(b, new ActiveFile(f, threads));
    }

    try {
      if (threads != null) {
        for (Thread thread:threads) {
          thread.join();
        }
      }
    } catch (InterruptedException e) {
      throw new IOException("Recovery waiting for thread interrupted.");
    }

    //
    // Finally, allow a writer to the block file
    // REMIND - mjc - make this a filter stream that enforces a max
    // block size, so clients can't go crazy
    //
    File metafile = getMetaFile(f, b);
    DataNode.LOG.debug("writeTo blockfile is " + f + " of size " + f.length());
    DataNode.LOG.debug("writeTo metafile is " + metafile + " of size " + metafile.length());
    return createBlockWriteStreams(f, metafile);
  }
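
  // writeToBlock thus handles three cases while holding the dataset lock:
  // a fresh create (pick a volume, create a tmp file), recovery of a
  // write still listed in ongoingCreates (interrupt the old writers and
  // reuse their tmp file), and reopen-for-append of a finalized block
  // (move the block and meta files back into tmp). In the recovery case
  // the old writer threads are joined outside the lock before new write
  // streams are handed out.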
  /**
   * Retrieves the offset in the block up to which the
   * next write will write data.
   */
  public long getChannelPosition(Block b, BlockWriteStreams streams)
      throws IOException {
    FileOutputStream file = (FileOutputStream) streams.dataOut;
    return file.getChannel().position();
  }

  /**
   * Sets the offset in the block at which the
   * next write will write data.
   */
  public void setChannelPosition(Block b, BlockWriteStreams streams,
                                 long dataOffset, long ckOffset)
      throws IOException {
    long size = 0;
    synchronized (this) {
      FSVolume vol = volumeMap.get(b).getVolume();
      size = vol.getTmpFile(b).length();
    }
    if (size < dataOffset) {
      String msg = "Trying to change block file offset of block " + b +
                   " to " + dataOffset +
                   " but actual size of file is " +
                   size;
      throw new IOException(msg);
    }
    FileOutputStream file = (FileOutputStream) streams.dataOut;
    file.getChannel().position(dataOffset);
    file = (FileOutputStream) streams.checksumOut;
    file.getChannel().position(ckOffset);
  }

  synchronized File createTmpFile(FSVolume vol, Block blk) throws IOException {
    if (vol == null) {
      vol = volumeMap.get(blk).getVolume();
      if (vol == null) {
        throw new IOException("Could not find volume for block " + blk);
      }
    }
    return vol.createTmpFile(blk);
  }

  //
  // REMIND - mjc - eventually we should have a timeout system
  // in place to clean up block files left by abandoned clients.
  // We should have some timer in place, so that if a blockfile
  // is created but non-valid, and has been idle for >48 hours,
  // we can GC it safely.
  //

  /**
   * Complete the block write!
   */
  public synchronized void finalizeBlock(Block b) throws IOException {
    ActiveFile activeFile = ongoingCreates.get(b);
    if (activeFile == null) {
      throw new IOException("Block " + b + " is already finalized.");
    }
    File f = activeFile.file;
    if (f == null || !f.exists()) {
      throw new IOException("No temporary file " + f + " for block " + b);
    }
    FSVolume v = volumeMap.get(b).getVolume();
    if (v == null) {
      throw new IOException("No volume for temporary file " + f +
                            " for block " + b);
    }

    File dest = null;
    dest = v.addBlock(b, f);
    volumeMap.put(b, new DatanodeBlockInfo(v, dest));
    ongoingCreates.remove(b);
  }

  /**
   * Remove the temporary block file (if any)
   */
  public synchronized void unfinalizeBlock(Block b) throws IOException {
    // remove the block from in-memory data structure
    ActiveFile activefile = ongoingCreates.remove(b);
    if (activefile == null) {
      return;
    }
    volumeMap.remove(b);

    // delete the on-disk temp file
    if (delBlockFromDisk(activefile.file, getMetaFile(activefile.file, b), b)) {
      DataNode.LOG.warn("Block " + b + " unfinalized and removed. ");
    }
  }

  /**
   * Remove a block from disk
   * @param blockFile block file
   * @param metaFile block meta file
   * @param b a block
   * @return true if on-disk files are deleted; false otherwise
   */
  private boolean delBlockFromDisk(File blockFile, File metaFile, Block b) {
    if (blockFile == null) {
      DataNode.LOG.warn("No file exists for block: " + b);
      return true;
    }

    if (!blockFile.delete()) {
      DataNode.LOG.warn("Not able to delete the block file: " + blockFile);
      return false;
    } else { // remove the meta file
      if (metaFile != null && !metaFile.delete()) {
        DataNode.LOG.warn(
            "Not able to delete the meta block file: " + metaFile);
        return false;
      }
    }
    return true;
  }

  /**
   * Return a table of block data
   */
  public Block[] getBlockReport() {
    TreeSet<Block> blockSet = new TreeSet<Block>();
    volumes.getBlockInfo(blockSet);
    Block blockTable[] = new Block[blockSet.size()];
    int i = 0;
    for (Iterator<Block> it = blockSet.iterator(); it.hasNext(); i++) {
      blockTable[i] = it.next();
    }
    return blockTable;
  }

  /**
   * Check whether the given block is a valid one.
   */
  public boolean isValidBlock(Block b) {
    return validateBlockFile(b) != null;
  }

  /**
   * Find the file corresponding to the block and return it if it exists.
   */
  File validateBlockFile(Block b) {
    //Should we check for metadata file too?
    File f = getFile(b);
    if (f != null && f.exists())
      return f;
    if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
      InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
    }
    return null;
  }

  /** {@inheritDoc} */
  public void validateBlockMetadata(Block b) throws IOException {
    DatanodeBlockInfo info = volumeMap.get(b);
    if (info == null) {
      throw new IOException("Block " + b + " does not exist in volumeMap.");
    }
    FSVolume v = info.getVolume();
    File tmp = v.getTmpFile(b);
    File f = getFile(b);
    if (f == null) {
      f = tmp;
    }
    if (f == null) {
      throw new IOException("Block " + b + " does not exist on disk.");
    }
    if (!f.exists()) {
      throw new IOException("Block " + b +
                            " block file " + f +
                            " does not exist on disk.");
    }
    if (b.getNumBytes() != f.length()) {
      throw new IOException("Block " + b +
                            " length is " + b.getNumBytes() +
                            " and does not match block file length " +
                            f.length());
    }
    File meta = getMetaFile(f, b);
    if (meta == null) {
      throw new IOException("Block " + b +
                            " metafile does not exist.");
    }
    if (!meta.exists()) {
      throw new IOException("Block " + b +
                            " metafile " + meta +
                            " does not exist on disk.");
    }
    if (meta.length() == 0) {
      throw new IOException("Block " + b + " metafile " + meta + " is empty.");
    }
    long stamp = parseGenerationStamp(f, meta);
    if (stamp != b.getGenerationStamp()) {
      throw new IOException("Block " + b +
                            " genstamp is " + b.getGenerationStamp() +
                            " and does not match meta file stamp " +
                            stamp);
    }
  }

  /**
   * We're informed that a block is no longer valid. We
   * could lazily garbage-collect the block, but why bother?
   * Just get rid of it.
   */
  public void invalidate(Block invalidBlks[]) throws IOException {
    boolean error = false;
    for (int i = 0; i < invalidBlks.length; i++) {
      File f = null;
      FSVolume v;
      synchronized (this) {
        f = getFile(invalidBlks[i]);
        DatanodeBlockInfo dinfo = volumeMap.get(invalidBlks[i]);
        if (dinfo == null) {
          DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] +
                            ". BlockInfo not found in volumeMap.");
          error = true;
          continue;
        }
        v = dinfo.getVolume();
        if (f == null) {
          DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] +
                            ". Block not found in blockMap." +
                            ((v == null) ? " " : " Block found in volumeMap."));
          error = true;
          continue;
        }
        if (v == null) {
          DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] +
                            ". No volume for this block." +
                            " Block found in blockMap. " + f + ".");
          error = true;
          continue;
        }
        File parent = f.getParentFile();
        if (parent == null) {
          DataNode.LOG.warn("Unexpected error trying to delete block "
                            + invalidBlks[i] +
                            ". Parent not found for file " + f + ".");
          error = true;
          continue;
        }
        v.clearPath(parent);
        volumeMap.remove(invalidBlks[i]);
      }
      File metaFile = getMetaFile(f, invalidBlks[i]);
      long blockSize = f.length() + metaFile.length();
      if (!f.delete() || (!metaFile.delete() && metaFile.exists())) {
        DataNode.LOG.warn("Unexpected error trying to delete block "
                          + invalidBlks[i] + " at file " + f);
        error = true;
        continue;
      }
      v.decDfsUsed(blockSize);
      DataNode.LOG.info("Deleting block " + invalidBlks[i] + " file " + f);
      if (f.exists()) {
        //
        // This is a temporary check especially for hadoop-1220.
        // This will go away in the future.
        //
        DataNode.LOG.info("File " + f + " was deleted but still exists!");
      }
    }
    if (error) {
      throw new IOException("Error in deleting blocks.");
    }
  }

  /**
   * Turn the block identifier into a filename.
   */
  public synchronized File getFile(Block b) {
    DatanodeBlockInfo info = volumeMap.get(b);
    if (info != null) {
      return info.getFile();
    }
    return null;
  }

  /**
   * Check if a data directory is healthy.
   * @throws DiskErrorException
   */
  public void checkDataDir() throws DiskErrorException {
    volumes.checkDirs();
  }

  public String toString() {
    return "FSDataset{dirpath='" + volumes + "'}";
  }

  private ObjectName mbeanName;
  private Random rand = new Random();

  /**
   * Register the FSDataset MBean using the name
   *        "hadoop:service=DataNode,name=FSDatasetState-<storageid>"
   */
  void registerMBean(final String storageId) {
    // We wrap to bypass standard mbean naming convention.
    // This wrapping can be removed in java 6 as it is more flexible in
    // package naming for mbeans and their impl.
    StandardMBean bean;
    String storageName;
    if (storageId == null || storageId.equals("")) {// Temp fix for the uninitialized storage
      storageName = "UndefinedStorageId" + rand.nextInt();
    } else {
      storageName = storageId;
    }
    try {
      bean = new StandardMBean(this, FSDatasetMBean.class);
      mbeanName = MBeanUtil.registerMBean("DataNode", "FSDatasetState-" + storageName, bean);
    } catch (NotCompliantMBeanException e) {
      e.printStackTrace();
    }

    DataNode.LOG.info("Registered FSDatasetStatusMBean");
  }

  public void shutdown() {
    if (mbeanName != null)
      MBeanUtil.unregisterMBean(mbeanName);

    if (volumes != null) {
      for (FSVolume volume : volumes.volumes) {
        if (volume != null) {
          volume.dfsUsage.shutdown();
        }
      }
    }
  }

  public String getStorageInfo() {
    return toString();
  }
}
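
// Typical life cycle of a block through this class, as a rough sketch of
// how a DataNode might drive it (method names are from this file; error
// handling and the actual packet streaming are omitted):
//
//   FSDataset dataset = new FSDataset(storage, conf);
//   BlockWriteStreams out = dataset.writeToBlock(b, false); // tmp file created
//   // ... stream data and checksums into out ...
//   dataset.finalizeBlock(b);                // move tmp file into current/
//   InputStream in = dataset.getBlockInputStream(b);
//   dataset.invalidate(new Block[]{b});      // delete when the namenode says so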