FSEditLog.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:42k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import java.io.BufferedInputStream;
  20. import java.io.DataInput;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutput;
  23. import java.io.EOFException;
  24. import java.io.File;
  25. import java.io.FileInputStream;
  26. import java.io.FileOutputStream;
  27. import java.io.IOException;
  28. import java.io.RandomAccessFile;
  29. import java.util.ArrayList;
  30. import java.util.Iterator;
  31. import java.lang.Math;
  32. import java.nio.channels.FileChannel;
  33. import java.nio.ByteBuffer;
  34. import org.apache.hadoop.hdfs.protocol.Block;
  35. import org.apache.hadoop.hdfs.protocol.DatanodeID;
  36. import org.apache.hadoop.hdfs.protocol.FSConstants;
  37. import org.apache.hadoop.hdfs.server.common.Storage;
  38. import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
  39. import org.apache.hadoop.hdfs.server.namenode.FSImage.NameNodeDirType;
  40. import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
  41. import org.apache.hadoop.io.*;
  42. import org.apache.hadoop.fs.FileStatus;
  43. import org.apache.hadoop.fs.permission.*;
  44. /**
  45.  * FSEditLog maintains a log of the namespace modifications.
  46.  * 
  47.  */
  48. public class FSEditLog {
  49.   private static final byte OP_INVALID = -1;
  50.   private static final byte OP_ADD = 0;
  51.   private static final byte OP_RENAME = 1;  // rename
  52.   private static final byte OP_DELETE = 2;  // delete
  53.   private static final byte OP_MKDIR = 3;   // create directory
  54.   private static final byte OP_SET_REPLICATION = 4; // set replication
  55.   //the following two are used only for backward compatibility :
  56.   @Deprecated private static final byte OP_DATANODE_ADD = 5;
  57.   @Deprecated private static final byte OP_DATANODE_REMOVE = 6;
  58.   private static final byte OP_SET_PERMISSIONS = 7;
  59.   private static final byte OP_SET_OWNER = 8;
  60.   private static final byte OP_CLOSE = 9;    // close after write
  61.   private static final byte OP_SET_GENSTAMP = 10;    // store genstamp
  62.   /* The following two are not used any more. Should be removed once
  63.    * LAST_UPGRADABLE_LAYOUT_VERSION is -17 or newer. */
  64.   private static final byte OP_SET_NS_QUOTA = 11; // set namespace quota
  65.   private static final byte OP_CLEAR_NS_QUOTA = 12; // clear namespace quota
  66.   private static final byte OP_TIMES = 13; // sets mod & access time on a file
  67.   private static final byte OP_SET_QUOTA = 14; // sets name and disk quotas.
  68.   private static int sizeFlushBuffer = 512*1024;
  69.   private ArrayList<EditLogOutputStream> editStreams = null;
  70.   private FSImage fsimage = null;
  71.   // a monotonically increasing counter that represents transactionIds.
  72.   private long txid = 0;
  73.   // stores the last synced transactionId.
  74.   private long synctxid = 0;
  75.   // the time of printing the statistics to the log file.
  76.   private long lastPrintTime;
  77.   // is a sync currently running?
  78.   private boolean isSyncRunning;
  79.   // these are statistics counters.
  80.   private long numTransactions;        // number of transactions
  81.   private long numTransactionsBatchedInSync;
  82.   private long totalTimeTransactions;  // total time for all transactions
  83.   private NameNodeMetrics metrics;
  84.   private static class TransactionId {
  85.     public long txid;
  86.     TransactionId(long value) {
  87.       this.txid = value;
  88.     }
  89.   }
  90.   // stores the most current transactionId of this thread.
  91.   private static final ThreadLocal<TransactionId> myTransactionId = new ThreadLocal<TransactionId>() {
  92.     protected synchronized TransactionId initialValue() {
  93.       return new TransactionId(Long.MAX_VALUE);
  94.     }
  95.   };
  96.   /**
  97.    * An implementation of the abstract class {@link EditLogOutputStream},
  98.    * which stores edits in a local file.
  99.    */
  100.   static private class EditLogFileOutputStream extends EditLogOutputStream {
  101.     private File file;
  102.     private FileOutputStream fp;    // file stream for storing edit logs 
  103.     private FileChannel fc;         // channel of the file stream for sync
  104.     private DataOutputBuffer bufCurrent;  // current buffer for writing
  105.     private DataOutputBuffer bufReady;    // buffer ready for flushing
  106.     static ByteBuffer fill = ByteBuffer.allocateDirect(512); // preallocation
  107.     EditLogFileOutputStream(File name) throws IOException {
  108.       super();
  109.       file = name;
  110.       bufCurrent = new DataOutputBuffer(sizeFlushBuffer);
  111.       bufReady = new DataOutputBuffer(sizeFlushBuffer);
  112.       RandomAccessFile rp = new RandomAccessFile(name, "rw");
  113.       fp = new FileOutputStream(rp.getFD()); // open for append
  114.       fc = rp.getChannel();
  115.       fc.position(fc.size());
  116.     }
  117.     @Override
  118.     String getName() {
  119.       return file.getPath();
  120.     }
  121.     /** {@inheritDoc} */
  122.     @Override
  123.     public void write(int b) throws IOException {
  124.       bufCurrent.write(b);
  125.     }
  126.     /** {@inheritDoc} */
  127.     @Override
  128.     void write(byte op, Writable ... writables) throws IOException {
  129.       write(op);
  130.       for(Writable w : writables) {
  131.         w.write(bufCurrent);
  132.       }
  133.     }
  134.     /**
  135.      * Create empty edits logs file.
  136.      */
  137.     @Override
  138.     void create() throws IOException {
  139.       fc.truncate(0);
  140.       fc.position(0);
  141.       bufCurrent.writeInt(FSConstants.LAYOUT_VERSION);
  142.       setReadyToFlush();
  143.       flush();
  144.     }
  145.     @Override
  146.     public void close() throws IOException {
  147.       // close should have been called after all pending transactions 
  148.       // have been flushed & synced.
  149.       int bufSize = bufCurrent.size();
  150.       if (bufSize != 0) {
  151.         throw new IOException("FSEditStream has " + bufSize +
  152.                               " bytes still to be flushed and cannot " +
  153.                               "be closed.");
  154.       } 
  155.       bufCurrent.close();
  156.       bufReady.close();
  157.       // remove the last INVALID marker from transaction log.
  158.       fc.truncate(fc.position());
  159.       fp.close();
  160.       
  161.       bufCurrent = bufReady = null;
  162.     }
  163.     /**
  164.      * All data that has been written to the stream so far will be flushed.
  165.      * New data can be still written to the stream while flushing is performed.
  166.      */
  167.     @Override
  168.     void setReadyToFlush() throws IOException {
  169.       assert bufReady.size() == 0 : "previous data is not flushed yet";
  170.       write(OP_INVALID);           // insert end-of-file marker
  171.       DataOutputBuffer tmp = bufReady;
  172.       bufReady = bufCurrent;
  173.       bufCurrent = tmp;
  174.     }
  175.     /**
  176.      * Flush ready buffer to persistent store.
  177.      * currentBuffer is not flushed as it accumulates new log records
  178.      * while readyBuffer will be flushed and synced.
  179.      */
  180.     @Override
  181.     protected void flushAndSync() throws IOException {
  182.       preallocate();            // preallocate file if necessary
  183.       bufReady.writeTo(fp);     // write data to file
  184.       bufReady.reset();         // erase all data in the buffer
  185.       fc.force(false);          // metadata updates not needed because of preallocation
  186.       fc.position(fc.position()-1); // skip back the end-of-file marker
  187.     }
  188.     /**
  189.      * Return the size of the current edit log including buffered data.
  190.      */
  191.     @Override
  192.     long length() throws IOException {
  193.       // file size + size of both buffers
  194.       return fc.size() + bufReady.size() + bufCurrent.size();
  195.     }
  196.     // allocate a big chunk of data
  197.     private void preallocate() throws IOException {
  198.       long position = fc.position();
  199.       if (position + 4096 >= fc.size()) {
  200.         FSNamesystem.LOG.debug("Preallocating Edit log, current size " +
  201.                                 fc.size());
  202.         long newsize = position + 1024*1024; // 1MB
  203.         fill.position(0);
  204.         int written = fc.write(fill, newsize);
  205.         FSNamesystem.LOG.debug("Edit log size is now " + fc.size() +
  206.                               " written " + written + " bytes " +
  207.                               " at offset " +  newsize);
  208.       }
  209.     }
  210.     
  211.     /**
  212.      * Returns the file associated with this stream
  213.      */
  214.     File getFile() {
  215.       return file;
  216.     }
  217.   }
  218.   static class EditLogFileInputStream extends EditLogInputStream {
  219.     private File file;
  220.     private FileInputStream fStream;
  221.     EditLogFileInputStream(File name) throws IOException {
  222.       file = name;
  223.       fStream = new FileInputStream(name);
  224.     }
  225.     @Override
  226.     String getName() {
  227.       return file.getPath();
  228.     }
  229.     @Override
  230.     public int available() throws IOException {
  231.       return fStream.available();
  232.     }
  233.     @Override
  234.     public int read() throws IOException {
  235.       return fStream.read();
  236.     }
  237.     @Override
  238.     public int read(byte[] b, int off, int len) throws IOException {
  239.       return fStream.read(b, off, len);
  240.     }
  241.     @Override
  242.     public void close() throws IOException {
  243.       fStream.close();
  244.     }
  245.     @Override
  246.     long length() throws IOException {
  247.       // file size + size of both buffers
  248.       return file.length();
  249.     }
  250.   }
  251.   FSEditLog(FSImage image) {
  252.     fsimage = image;
  253.     isSyncRunning = false;
  254.     metrics = NameNode.getNameNodeMetrics();
  255.     lastPrintTime = FSNamesystem.now();
  256.   }
  257.   
  258.   private File getEditFile(StorageDirectory sd) {
  259.     return fsimage.getEditFile(sd);
  260.   }
  261.   
  262.   private File getEditNewFile(StorageDirectory sd) {
  263.     return fsimage.getEditNewFile(sd);
  264.   }
  265.   
  266.   private int getNumStorageDirs() {
  267.  int numStorageDirs = 0;
  268.  for (Iterator<StorageDirectory> it = 
  269.        fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext(); it.next())
  270.    numStorageDirs++;
  271.     return numStorageDirs;
  272.   }
  273.   
  274.   synchronized int getNumEditStreams() {
  275.     return editStreams == null ? 0 : editStreams.size();
  276.   }
  277.   boolean isOpen() {
  278.     return getNumEditStreams() > 0;
  279.   }
  280.   /**
  281.    * Create empty edit log files.
  282.    * Initialize the output stream for logging.
  283.    * 
  284.    * @throws IOException
  285.    */
  286.   public synchronized void open() throws IOException {
  287.     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
  288.     if (editStreams == null)
  289.       editStreams = new ArrayList<EditLogOutputStream>();
  290.     for (Iterator<StorageDirectory> it = 
  291.            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  292.       StorageDirectory sd = it.next();
  293.       File eFile = getEditFile(sd);
  294.       try {
  295.         EditLogOutputStream eStream = new EditLogFileOutputStream(eFile);
  296.         editStreams.add(eStream);
  297.       } catch (IOException e) {
  298.         FSNamesystem.LOG.warn("Unable to open edit log file " + eFile);
  299.         // Remove the directory from list of storage directories
  300.         it.remove();
  301.       }
  302.     }
  303.   }
  304.   public synchronized void createEditLogFile(File name) throws IOException {
  305.     EditLogOutputStream eStream = new EditLogFileOutputStream(name);
  306.     eStream.create();
  307.     eStream.close();
  308.   }
  309.   /**
  310.    * Create edits.new if non existent.
  311.    */
  312.   synchronized void createNewIfMissing() throws IOException {
  313.  for (Iterator<StorageDirectory> it = 
  314.        fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  315.       File newFile = getEditNewFile(it.next());
  316.       if (!newFile.exists())
  317.         createEditLogFile(newFile);
  318.     }
  319.   }
  320.   
  321.   /**
  322.    * Shutdown the file store.
  323.    */
  324.   public synchronized void close() throws IOException {
  325.     while (isSyncRunning) {
  326.       try {
  327.         wait(1000);
  328.       } catch (InterruptedException ie) { 
  329.       }
  330.     }
  331.     if (editStreams == null) {
  332.       return;
  333.     }
  334.     printStatistics(true);
  335.     numTransactions = totalTimeTransactions = numTransactionsBatchedInSync = 0;
  336.     for (int idx = 0; idx < editStreams.size(); idx++) {
  337.       EditLogOutputStream eStream = editStreams.get(idx);
  338.       try {
  339.         eStream.setReadyToFlush();
  340.         eStream.flush();
  341.         eStream.close();
  342.       } catch (IOException e) {
  343.         processIOError(idx);
  344.         idx--;
  345.       }
  346.     }
  347.     editStreams.clear();
  348.   }
  349.   /**
  350.    * If there is an IO Error on any log operations, remove that
  351.    * directory from the list of directories.
  352.    * If no more directories remain, then exit.
  353.    */
  354.   synchronized void processIOError(int index) {
  355.     if (editStreams == null || editStreams.size() <= 1) {
  356.       FSNamesystem.LOG.fatal(
  357.       "Fatal Error : All storage directories are inaccessible."); 
  358.       Runtime.getRuntime().exit(-1);
  359.     }
  360.     assert(index < getNumStorageDirs());
  361.     assert(getNumStorageDirs() == editStreams.size());
  362.     
  363.     File parentStorageDir = ((EditLogFileOutputStream)editStreams
  364.                                       .get(index)).getFile()
  365.                                       .getParentFile().getParentFile();
  366.     editStreams.remove(index);
  367.     //
  368.     // Invoke the ioerror routine of the fsimage
  369.     //
  370.     fsimage.processIOError(parentStorageDir);
  371.   }
  372.   
  373.   /**
  374.    * If there is an IO Error on any log operations on storage directory,
  375.    * remove any stream associated with that directory 
  376.    */
  377.   synchronized void processIOError(StorageDirectory sd) {
  378.     // Try to remove stream only if one should exist
  379.     if (!sd.getStorageDirType().isOfType(NameNodeDirType.EDITS))
  380.       return;
  381.     if (editStreams == null || editStreams.size() <= 1) {
  382.       FSNamesystem.LOG.fatal(
  383.           "Fatal Error : All storage directories are inaccessible."); 
  384.       Runtime.getRuntime().exit(-1);
  385.     }
  386.     for (int idx = 0; idx < editStreams.size(); idx++) {
  387.       File parentStorageDir = ((EditLogFileOutputStream)editStreams
  388.                                        .get(idx)).getFile()
  389.                                        .getParentFile().getParentFile();
  390.       if (parentStorageDir.getName().equals(sd.getRoot().getName()))
  391.         editStreams.remove(idx);
  392.  }
  393.   }
  394.   
  395.   /**
  396.    * The specified streams have IO errors. Remove them from logging
  397.    * new transactions.
  398.    */
  399.   private void processIOError(ArrayList<EditLogOutputStream> errorStreams) {
  400.     if (errorStreams == null) {
  401.       return;                       // nothing to do
  402.     }
  403.     for (int idx = 0; idx < errorStreams.size(); idx++) {
  404.       EditLogOutputStream eStream = errorStreams.get(idx);
  405.       int j = 0;
  406.       int numEditStreams = editStreams.size();
  407.       for (j = 0; j < numEditStreams; j++) {
  408.         if (editStreams.get(j) == eStream) {
  409.           break;
  410.         }
  411.       }
  412.       if (j == numEditStreams) {
  413.           FSNamesystem.LOG.error("Unable to find sync log on which " +
  414.                                  " IO error occured. " +
  415.                                  "Fatal Error.");
  416.           Runtime.getRuntime().exit(-1);
  417.       }
  418.       processIOError(j);
  419.     }
  420.     fsimage.incrementCheckpointTime();
  421.   }
  422.   /**
  423.    * check if ANY edits.new log exists
  424.    */
  425.   boolean existsNew() throws IOException {
  426.     for (Iterator<StorageDirectory> it = 
  427.            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  428.       if (getEditNewFile(it.next()).exists()) { 
  429.         return true;
  430.       }
  431.     }
  432.     return false;
  433.   }
  434.   /**
  435.    * Load an edit log, and apply the changes to the in-memory structure
  436.    * This is where we apply edits that we've been writing to disk all
  437.    * along.
  438.    */
  439.   static int loadFSEdits(EditLogInputStream edits) throws IOException {
  440.     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  441.     FSDirectory fsDir = fsNamesys.dir;
  442.     int numEdits = 0;
  443.     int logVersion = 0;
  444.     String clientName = null;
  445.     String clientMachine = null;
  446.     String path = null;
  447.     int numOpAdd = 0, numOpClose = 0, numOpDelete = 0,
  448.         numOpRename = 0, numOpSetRepl = 0, numOpMkDir = 0,
  449.         numOpSetPerm = 0, numOpSetOwner = 0, numOpSetGenStamp = 0,
  450.         numOpTimes = 0, numOpOther = 0;
  451.     long startTime = FSNamesystem.now();
  452.     DataInputStream in = new DataInputStream(new BufferedInputStream(edits));
  453.     try {
  454.       // Read log file version. Could be missing. 
  455.       in.mark(4);
  456.       // If edits log is greater than 2G, available method will return negative
  457.       // numbers, so we avoid having to call available
  458.       boolean available = true;
  459.       try {
  460.         logVersion = in.readByte();
  461.       } catch (EOFException e) {
  462.         available = false;
  463.       }
  464.       if (available) {
  465.         in.reset();
  466.         logVersion = in.readInt();
  467.         if (logVersion < FSConstants.LAYOUT_VERSION) // future version
  468.           throw new IOException(
  469.                           "Unexpected version of the file system log file: "
  470.                           + logVersion + ". Current version = " 
  471.                           + FSConstants.LAYOUT_VERSION + ".");
  472.       }
  473.       assert logVersion <= Storage.LAST_UPGRADABLE_LAYOUT_VERSION :
  474.                             "Unsupported version " + logVersion;
  475.       while (true) {
  476.         long timestamp = 0;
  477.         long mtime = 0;
  478.         long atime = 0;
  479.         long blockSize = 0;
  480.         byte opcode = -1;
  481.         try {
  482.           opcode = in.readByte();
  483.           if (opcode == OP_INVALID) {
  484.             FSNamesystem.LOG.info("Invalid opcode, reached end of edit log " +
  485.                                    "Number of transactions found " + numEdits);
  486.             break; // no more transactions
  487.           }
  488.         } catch (EOFException e) {
  489.           break; // no more transactions
  490.         }
  491.         numEdits++;
  492.         switch (opcode) {
  493.         case OP_ADD:
  494.         case OP_CLOSE: {
  495.           // versions > 0 support per file replication
  496.           // get name and replication
  497.           int length = in.readInt();
  498.           if (-7 == logVersion && length != 3||
  499.               -17 < logVersion && logVersion < -7 && length != 4 ||
  500.               logVersion <= -17 && length != 5) {
  501.               throw new IOException("Incorrect data format."  +
  502.                                     " logVersion is " + logVersion +
  503.                                     " but writables.length is " +
  504.                                     length + ". ");
  505.           }
  506.           path = FSImage.readString(in);
  507.           short replication = adjustReplication(readShort(in));
  508.           mtime = readLong(in);
  509.           if (logVersion <= -17) {
  510.             atime = readLong(in);
  511.           }
  512.           if (logVersion < -7) {
  513.             blockSize = readLong(in);
  514.           }
  515.           // get blocks
  516.           Block blocks[] = null;
  517.           if (logVersion <= -14) {
  518.             blocks = readBlocks(in);
  519.           } else {
  520.             BlockTwo oldblk = new BlockTwo();
  521.             int num = in.readInt();
  522.             blocks = new Block[num];
  523.             for (int i = 0; i < num; i++) {
  524.               oldblk.readFields(in);
  525.               blocks[i] = new Block(oldblk.blkid, oldblk.len, 
  526.                                     Block.GRANDFATHER_GENERATION_STAMP);
  527.             }
  528.           }
  529.           // Older versions of HDFS does not store the block size in inode.
  530.           // If the file has more than one block, use the size of the
  531.           // first block as the blocksize. Otherwise use the default
  532.           // block size.
  533.           if (-8 <= logVersion && blockSize == 0) {
  534.             if (blocks.length > 1) {
  535.               blockSize = blocks[0].getNumBytes();
  536.             } else {
  537.               long first = ((blocks.length == 1)? blocks[0].getNumBytes(): 0);
  538.               blockSize = Math.max(fsNamesys.getDefaultBlockSize(), first);
  539.             }
  540.           }
  541.            
  542.           PermissionStatus permissions = fsNamesys.getUpgradePermission();
  543.           if (logVersion <= -11) {
  544.             permissions = PermissionStatus.read(in);
  545.           }
  546.           // clientname, clientMachine and block locations of last block.
  547.           if (opcode == OP_ADD && logVersion <= -12) {
  548.             clientName = FSImage.readString(in);
  549.             clientMachine = FSImage.readString(in);
  550.             if (-13 <= logVersion) {
  551.               readDatanodeDescriptorArray(in);
  552.             }
  553.           } else {
  554.             clientName = "";
  555.             clientMachine = "";
  556.           }
  557.           // The open lease transaction re-creates a file if necessary.
  558.           // Delete the file if it already exists.
  559.           if (FSNamesystem.LOG.isDebugEnabled()) {
  560.             FSNamesystem.LOG.debug(opcode + ": " + path + 
  561.                                    " numblocks : " + blocks.length +
  562.                                    " clientHolder " +  clientName +
  563.                                    " clientMachine " + clientMachine);
  564.           }
  565.           fsDir.unprotectedDelete(path, mtime);
  566.           // add to the file tree
  567.           INodeFile node = (INodeFile)fsDir.unprotectedAddFile(
  568.                                                     path, permissions,
  569.                                                     blocks, replication, 
  570.                                                     mtime, atime, blockSize);
  571.           if (opcode == OP_ADD) {
  572.             numOpAdd++;
  573.             //
  574.             // Replace current node with a INodeUnderConstruction.
  575.             // Recreate in-memory lease record.
  576.             //
  577.             INodeFileUnderConstruction cons = new INodeFileUnderConstruction(
  578.                                       node.getLocalNameBytes(),
  579.                                       node.getReplication(), 
  580.                                       node.getModificationTime(),
  581.                                       node.getPreferredBlockSize(),
  582.                                       node.getBlocks(),
  583.                                       node.getPermissionStatus(),
  584.                                       clientName, 
  585.                                       clientMachine, 
  586.                                       null);
  587.             fsDir.replaceNode(path, node, cons);
  588.             fsNamesys.leaseManager.addLease(cons.clientName, path);
  589.           }
  590.           break;
  591.         } 
  592.         case OP_SET_REPLICATION: {
  593.           numOpSetRepl++;
  594.           path = FSImage.readString(in);
  595.           short replication = adjustReplication(readShort(in));
  596.           fsDir.unprotectedSetReplication(path, replication, null);
  597.           break;
  598.         } 
  599.         case OP_RENAME: {
  600.           numOpRename++;
  601.           int length = in.readInt();
  602.           if (length != 3) {
  603.             throw new IOException("Incorrect data format. " 
  604.                                   + "Mkdir operation.");
  605.           }
  606.           String s = FSImage.readString(in);
  607.           String d = FSImage.readString(in);
  608.           timestamp = readLong(in);
  609.           FileStatus dinfo = fsDir.getFileInfo(d);
  610.           fsDir.unprotectedRenameTo(s, d, timestamp);
  611.           fsNamesys.changeLease(s, d, dinfo);
  612.           break;
  613.         }
  614.         case OP_DELETE: {
  615.           numOpDelete++;
  616.           int length = in.readInt();
  617.           if (length != 2) {
  618.             throw new IOException("Incorrect data format. " 
  619.                                   + "delete operation.");
  620.           }
  621.           path = FSImage.readString(in);
  622.           timestamp = readLong(in);
  623.           fsDir.unprotectedDelete(path, timestamp);
  624.           break;
  625.         }
  626.         case OP_MKDIR: {
  627.           numOpMkDir++;
  628.           PermissionStatus permissions = fsNamesys.getUpgradePermission();
  629.           int length = in.readInt();
  630.           if (-17 < logVersion && length != 2 ||
  631.               logVersion <= -17 && length != 3) {
  632.             throw new IOException("Incorrect data format. " 
  633.                                   + "Mkdir operation.");
  634.           }
  635.           path = FSImage.readString(in);
  636.           timestamp = readLong(in);
  637.           // The disk format stores atimes for directories as well.
  638.           // However, currently this is not being updated/used because of
  639.           // performance reasons.
  640.           if (logVersion <= -17) {
  641.             atime = readLong(in);
  642.           }
  643.           if (logVersion <= -11) {
  644.             permissions = PermissionStatus.read(in);
  645.           }
  646.           fsDir.unprotectedMkdir(path, permissions, timestamp);
  647.           break;
  648.         }
  649.         case OP_SET_GENSTAMP: {
  650.           numOpSetGenStamp++;
  651.           long lw = in.readLong();
  652.           fsDir.namesystem.setGenerationStamp(lw);
  653.           break;
  654.         } 
  655.         case OP_DATANODE_ADD: {
  656.           numOpOther++;
  657.           FSImage.DatanodeImage nodeimage = new FSImage.DatanodeImage();
  658.           nodeimage.readFields(in);
  659.           //Datnodes are not persistent any more.
  660.           break;
  661.         }
  662.         case OP_DATANODE_REMOVE: {
  663.           numOpOther++;
  664.           DatanodeID nodeID = new DatanodeID();
  665.           nodeID.readFields(in);
  666.           //Datanodes are not persistent any more.
  667.           break;
  668.         }
  669.         case OP_SET_PERMISSIONS: {
  670.           numOpSetPerm++;
  671.           if (logVersion > -11)
  672.             throw new IOException("Unexpected opcode " + opcode
  673.                                   + " for version " + logVersion);
  674.           fsDir.unprotectedSetPermission(
  675.               FSImage.readString(in), FsPermission.read(in));
  676.           break;
  677.         }
  678.         case OP_SET_OWNER: {
  679.           numOpSetOwner++;
  680.           if (logVersion > -11)
  681.             throw new IOException("Unexpected opcode " + opcode
  682.                                   + " for version " + logVersion);
  683.           fsDir.unprotectedSetOwner(FSImage.readString(in),
  684.               FSImage.readString_EmptyAsNull(in),
  685.               FSImage.readString_EmptyAsNull(in));
  686.           break;
  687.         }
  688.         case OP_SET_NS_QUOTA: {
  689.           if (logVersion > -16) {
  690.             throw new IOException("Unexpected opcode " + opcode
  691.                 + " for version " + logVersion);
  692.           }
  693.           fsDir.unprotectedSetQuota(FSImage.readString(in), 
  694.                                     readLongWritable(in), 
  695.                                     FSConstants.QUOTA_DONT_SET);
  696.           break;
  697.         }
  698.         case OP_CLEAR_NS_QUOTA: {
  699.           if (logVersion > -16) {
  700.             throw new IOException("Unexpected opcode " + opcode
  701.                 + " for version " + logVersion);
  702.           }
  703.           fsDir.unprotectedSetQuota(FSImage.readString(in),
  704.                                     FSConstants.QUOTA_RESET,
  705.                                     FSConstants.QUOTA_DONT_SET);
  706.           break;
  707.         }
  708.         case OP_SET_QUOTA:
  709.           fsDir.unprotectedSetQuota(FSImage.readString(in),
  710.                                     readLongWritable(in),
  711.                                     readLongWritable(in));
  712.                                       
  713.           break;
  714.         case OP_TIMES: {
  715.           numOpTimes++;
  716.           int length = in.readInt();
  717.           if (length != 3) {
  718.             throw new IOException("Incorrect data format. " 
  719.                                   + "times operation.");
  720.           }
  721.           path = FSImage.readString(in);
  722.           mtime = readLong(in);
  723.           atime = readLong(in);
  724.           fsDir.unprotectedSetTimes(path, mtime, atime, true);
  725.           break;
  726.         }
  727.         default: {
  728.           throw new IOException("Never seen opcode " + opcode);
  729.         }
  730.         }
  731.       }
  732.     } finally {
  733.       in.close();
  734.     }
  735.     FSImage.LOG.info("Edits file " + edits.getName() 
  736.         + " of size " + edits.length() + " edits # " + numEdits 
  737.         + " loaded in " + (FSNamesystem.now()-startTime)/1000 + " seconds.");
  738.     if (FSImage.LOG.isDebugEnabled()) {
  739.       FSImage.LOG.debug("numOpAdd = " + numOpAdd + " numOpClose = " + numOpClose 
  740.           + " numOpDelete = " + numOpDelete + " numOpRename = " + numOpRename 
  741.           + " numOpSetRepl = " + numOpSetRepl + " numOpMkDir = " + numOpMkDir
  742.           + " numOpSetPerm = " + numOpSetPerm 
  743.           + " numOpSetOwner = " + numOpSetOwner
  744.           + " numOpSetGenStamp = " + numOpSetGenStamp 
  745.           + " numOpTimes = " + numOpTimes
  746.           + " numOpOther = " + numOpOther);
  747.     }
  748.     if (logVersion != FSConstants.LAYOUT_VERSION) // other version
  749.       numEdits++; // save this image asap
  750.     return numEdits;
  751.   }
  752.   // a place holder for reading a long
  753.   private static final LongWritable longWritable = new LongWritable();
  754.   /** Read an integer from an input stream */
  755.   private static long readLongWritable(DataInputStream in) throws IOException {
  756.     synchronized (longWritable) {
  757.       longWritable.readFields(in);
  758.       return longWritable.get();
  759.     }
  760.   }
  761.   
  762.   static short adjustReplication(short replication) {
  763.     FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
  764.     short minReplication = fsNamesys.getMinReplication();
  765.     if (replication<minReplication) {
  766.       replication = minReplication;
  767.     }
  768.     short maxReplication = fsNamesys.getMaxReplication();
  769.     if (replication>maxReplication) {
  770.       replication = maxReplication;
  771.     }
  772.     return replication;
  773.   }
  774.   /**
  775.    * Write an operation to the edit log. Do not sync to persistent
  776.    * store yet.
  777.    */
  778.   synchronized void logEdit(byte op, Writable ... writables) {
  779.     assert this.getNumEditStreams() > 0 : "no editlog streams";
  780.     long start = FSNamesystem.now();
  781.     for (int idx = 0; idx < editStreams.size(); idx++) {
  782.       EditLogOutputStream eStream = editStreams.get(idx);
  783.       try {
  784.         eStream.write(op, writables);
  785.       } catch (IOException ie) {
  786.         processIOError(idx);         
  787.         // processIOError will remove the idx's stream 
  788.         // from the editStreams collection, so we need to update idx
  789.         idx--; 
  790.       }
  791.     }
  792.     // get a new transactionId
  793.     txid++;
  794.     //
  795.     // record the transactionId when new data was written to the edits log
  796.     //
  797.     TransactionId id = myTransactionId.get();
  798.     id.txid = txid;
  799.     // update statistics
  800.     long end = FSNamesystem.now();
  801.     numTransactions++;
  802.     totalTimeTransactions += (end-start);
  803.     if (metrics != null) // Metrics is non-null only when used inside name node
  804.       metrics.transactions.inc((end-start));
  805.   }
  806.   //
  807.   // Sync all modifications done by this thread.
  808.   //
  809.   public void logSync() throws IOException {
  810.     ArrayList<EditLogOutputStream> errorStreams = null;
  811.     long syncStart = 0;
  812.     // Fetch the transactionId of this thread. 
  813.     long mytxid = myTransactionId.get().txid;
  814.     final int numEditStreams;
  815.     synchronized (this) {
  816.       numEditStreams = editStreams.size();
  817.       assert numEditStreams > 0 : "no editlog streams";
  818.       printStatistics(false);
  819.       // if somebody is already syncing, then wait
  820.       while (mytxid > synctxid && isSyncRunning) {
  821.         try {
  822.           wait(1000);
  823.         } catch (InterruptedException ie) { 
  824.         }
  825.       }
  826.       //
  827.       // If this transaction was already flushed, then nothing to do
  828.       //
  829.       if (mytxid <= synctxid) {
  830.         numTransactionsBatchedInSync++;
  831.         if (metrics != null) // Metrics is non-null only when used inside name node
  832.           metrics.transactionsBatchedInSync.inc();
  833.         return;
  834.       }
  835.    
  836.       // now, this thread will do the sync
  837.       syncStart = txid;
  838.       isSyncRunning = true;   
  839.       // swap buffers
  840.       for (int idx = 0; idx < numEditStreams; idx++) {
  841.         editStreams.get(idx).setReadyToFlush();
  842.       }
  843.     }
  844.     // do the sync
  845.     long start = FSNamesystem.now();
  846.     for (int idx = 0; idx < numEditStreams; idx++) {
  847.       EditLogOutputStream eStream = editStreams.get(idx);
  848.       try {
  849.         eStream.flush();
  850.       } catch (IOException ie) {
  851.         //
  852.         // remember the streams that encountered an error.
  853.         //
  854.         if (errorStreams == null) {
  855.           errorStreams = new ArrayList<EditLogOutputStream>(1);
  856.         }
  857.         errorStreams.add(eStream);
  858.         FSNamesystem.LOG.error("Unable to sync edit log. " +
  859.                                "Fatal Error.");
  860.       }
  861.     }
  862.     long elapsed = FSNamesystem.now() - start;
  863.     synchronized (this) {
  864.        processIOError(errorStreams);
  865.        synctxid = syncStart;
  866.        isSyncRunning = false;
  867.        this.notifyAll();
  868.     }
  869.     if (metrics != null) // Metrics is non-null only when used inside name node
  870.       metrics.syncs.inc(elapsed);
  871.   }
  872.   //
  873.   // print statistics every 1 minute.
  874.   //
  875.   private void printStatistics(boolean force) {
  876.     long now = FSNamesystem.now();
  877.     if (lastPrintTime + 60000 > now && !force) {
  878.       return;
  879.     }
  880.     if (editStreams == null) {
  881.       return;
  882.     }
  883.     lastPrintTime = now;
  884.     StringBuilder buf = new StringBuilder();
  885.     buf.append("Number of transactions: ");
  886.     buf.append(numTransactions);
  887.     buf.append(" Total time for transactions(ms): ");
  888.     buf.append(totalTimeTransactions);
  889.     buf.append("Number of transactions batched in Syncs: ");
  890.     buf.append(numTransactionsBatchedInSync);
  891.     buf.append(" Number of syncs: ");
  892.     buf.append(editStreams.get(0).getNumSync());
  893.     buf.append(" SyncTimes(ms): ");
  894.     int numEditStreams = editStreams.size();
  895.     for (int idx = 0; idx < numEditStreams; idx++) {
  896.       EditLogOutputStream eStream = editStreams.get(idx);
  897.       buf.append(eStream.getTotalSyncTime());
  898.       buf.append(" ");
  899.     }
  900.     FSNamesystem.LOG.info(buf);
  901.   }
  902.   /** 
  903.    * Add open lease record to edit log. 
  904.    * Records the block locations of the last block.
  905.    */
  906.   public void logOpenFile(String path, INodeFileUnderConstruction newNode) 
  907.                    throws IOException {
  908.     UTF8 nameReplicationPair[] = new UTF8[] { 
  909.       new UTF8(path), 
  910.       FSEditLog.toLogReplication(newNode.getReplication()),
  911.       FSEditLog.toLogLong(newNode.getModificationTime()),
  912.       FSEditLog.toLogLong(newNode.getAccessTime()),
  913.       FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
  914.     logEdit(OP_ADD,
  915.             new ArrayWritable(UTF8.class, nameReplicationPair), 
  916.             new ArrayWritable(Block.class, newNode.getBlocks()),
  917.             newNode.getPermissionStatus(),
  918.             new UTF8(newNode.getClientName()),
  919.             new UTF8(newNode.getClientMachine()));
  920.   }
  921.   /** 
  922.    * Add close lease record to edit log.
  923.    */
  924.   public void logCloseFile(String path, INodeFile newNode) {
  925.     UTF8 nameReplicationPair[] = new UTF8[] {
  926.       new UTF8(path),
  927.       FSEditLog.toLogReplication(newNode.getReplication()),
  928.       FSEditLog.toLogLong(newNode.getModificationTime()),
  929.       FSEditLog.toLogLong(newNode.getAccessTime()),
  930.       FSEditLog.toLogLong(newNode.getPreferredBlockSize())};
  931.     logEdit(OP_CLOSE,
  932.             new ArrayWritable(UTF8.class, nameReplicationPair),
  933.             new ArrayWritable(Block.class, newNode.getBlocks()),
  934.             newNode.getPermissionStatus());
  935.   }
  936.   
  937.   /** 
  938.    * Add create directory record to edit log
  939.    */
  940.   public void logMkDir(String path, INode newNode) {
  941.     UTF8 info[] = new UTF8[] {
  942.       new UTF8(path),
  943.       FSEditLog.toLogLong(newNode.getModificationTime()),
  944.       FSEditLog.toLogLong(newNode.getAccessTime())
  945.     };
  946.     logEdit(OP_MKDIR, new ArrayWritable(UTF8.class, info),
  947.         newNode.getPermissionStatus());
  948.   }
  949.   
  950.   /** 
  951.    * Add rename record to edit log
  952.    * TODO: use String parameters until just before writing to disk
  953.    */
  954.   void logRename(String src, String dst, long timestamp) {
  955.     UTF8 info[] = new UTF8[] { 
  956.       new UTF8(src),
  957.       new UTF8(dst),
  958.       FSEditLog.toLogLong(timestamp)};
  959.     logEdit(OP_RENAME, new ArrayWritable(UTF8.class, info));
  960.   }
  961.   
  962.   /** 
  963.    * Add set replication record to edit log
  964.    */
  965.   void logSetReplication(String src, short replication) {
  966.     logEdit(OP_SET_REPLICATION, 
  967.             new UTF8(src), 
  968.             FSEditLog.toLogReplication(replication));
  969.   }
  970.   
  971.   /** Add set namespace quota record to edit log
  972.    * 
  973.    * @param src the string representation of the path to a directory
  974.    * @param quota the directory size limit
  975.    */
  976.   void logSetQuota(String src, long nsQuota, long dsQuota) {
  977.     logEdit(OP_SET_QUOTA, new UTF8(src), 
  978.             new LongWritable(nsQuota), new LongWritable(dsQuota));
  979.   }
  980.   /**  Add set permissions record to edit log */
  981.   void logSetPermissions(String src, FsPermission permissions) {
  982.     logEdit(OP_SET_PERMISSIONS, new UTF8(src), permissions);
  983.   }
  984.   /**  Add set owner record to edit log */
  985.   void logSetOwner(String src, String username, String groupname) {
  986.     UTF8 u = new UTF8(username == null? "": username);
  987.     UTF8 g = new UTF8(groupname == null? "": groupname);
  988.     logEdit(OP_SET_OWNER, new UTF8(src), u, g);
  989.   }
  990.   /** 
  991.    * Add delete file record to edit log
  992.    */
  993.   void logDelete(String src, long timestamp) {
  994.     UTF8 info[] = new UTF8[] { 
  995.       new UTF8(src),
  996.       FSEditLog.toLogLong(timestamp)};
  997.     logEdit(OP_DELETE, new ArrayWritable(UTF8.class, info));
  998.   }
  999.   /** 
  1000.    * Add generation stamp record to edit log
  1001.    */
  1002.   void logGenerationStamp(long genstamp) {
  1003.     logEdit(OP_SET_GENSTAMP, new LongWritable(genstamp));
  1004.   }
  1005.   /** 
  1006.    * Add access time record to edit log
  1007.    */
  1008.   void logTimes(String src, long mtime, long atime) {
  1009.     UTF8 info[] = new UTF8[] { 
  1010.       new UTF8(src),
  1011.       FSEditLog.toLogLong(mtime),
  1012.       FSEditLog.toLogLong(atime)};
  1013.     logEdit(OP_TIMES, new ArrayWritable(UTF8.class, info));
  1014.   }
  1015.   
  1016.   static private UTF8 toLogReplication(short replication) {
  1017.     return new UTF8(Short.toString(replication));
  1018.   }
  1019.   
  1020.   static private UTF8 toLogLong(long timestamp) {
  1021.     return new UTF8(Long.toString(timestamp));
  1022.   }
  1023.   /**
  1024.    * Return the size of the current EditLog
  1025.    */
  1026.   synchronized long getEditLogSize() throws IOException {
  1027.     assert(getNumStorageDirs() == editStreams.size());
  1028.     long size = 0;
  1029.     for (int idx = 0; idx < editStreams.size(); idx++) {
  1030.       long curSize = editStreams.get(idx).length();
  1031.       assert (size == 0 || size == curSize) : "All streams must be the same";
  1032.       size = curSize;
  1033.     }
  1034.     return size;
  1035.   }
  1036.   /**
  1037.    * Closes the current edit log and opens edits.new. 
  1038.    * Returns the lastModified time of the edits log.
  1039.    */
  1040.   synchronized void rollEditLog() throws IOException {
  1041.     //
  1042.     // If edits.new already exists in some directory, verify it
  1043.     // exists in all directories.
  1044.     //
  1045.     if (existsNew()) {
  1046.       for (Iterator<StorageDirectory> it = 
  1047.                fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  1048.         File editsNew = getEditNewFile(it.next());
  1049.      if (!editsNew.exists()) { 
  1050.           throw new IOException("Inconsistent existance of edits.new " +
  1051.                                 editsNew);
  1052.         }
  1053.       }
  1054.       return; // nothing to do, edits.new exists!
  1055.     }
  1056.     close();                     // close existing edit log
  1057.     //
  1058.     // Open edits.new
  1059.     //
  1060.     for (Iterator<StorageDirectory> it = 
  1061.            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  1062.       StorageDirectory sd = it.next();
  1063.       try {
  1064.         EditLogFileOutputStream eStream = 
  1065.              new EditLogFileOutputStream(getEditNewFile(sd));
  1066.         eStream.create();
  1067.         editStreams.add(eStream);
  1068.       } catch (IOException e) {
  1069.         // remove stream and this storage directory from list
  1070.         processIOError(sd);
  1071.        it.remove();
  1072.       }
  1073.     }
  1074.   }
  1075.   /**
  1076.    * Removes the old edit log and renamed edits.new as edits.
  1077.    * Reopens the edits file.
  1078.    */
  1079.   synchronized void purgeEditLog() throws IOException {
  1080.     //
  1081.     // If edits.new does not exists, then return error.
  1082.     //
  1083.     if (!existsNew()) {
  1084.       throw new IOException("Attempt to purge edit log " +
  1085.                             "but edits.new does not exist.");
  1086.     }
  1087.     close();
  1088.     //
  1089.     // Delete edits and rename edits.new to edits.
  1090.     //
  1091.     for (Iterator<StorageDirectory> it = 
  1092.            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();) {
  1093.       StorageDirectory sd = it.next();
  1094.       if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
  1095.         //
  1096.         // renameTo() fails on Windows if the destination
  1097.         // file exists.
  1098.         //
  1099.         getEditFile(sd).delete();
  1100.         if (!getEditNewFile(sd).renameTo(getEditFile(sd))) {
  1101.           // Should we also remove from edits
  1102.           it.remove(); 
  1103.         }
  1104.       }
  1105.     }
  1106.     //
  1107.     // Reopen all the edits logs.
  1108.     //
  1109.     open();
  1110.   }
  1111.   /**
  1112.    * Return the name of the edit file
  1113.    */
  1114.   synchronized File getFsEditName() throws IOException {
  1115.     StorageDirectory sd = null;
  1116.     for (Iterator<StorageDirectory> it = 
  1117.            fsimage.dirIterator(NameNodeDirType.EDITS); it.hasNext();)
  1118.       sd = it.next();
  1119.     return getEditFile(sd);
  1120.   }
  1121.   /**
  1122.    * Returns the timestamp of the edit log
  1123.    */
  1124.   synchronized long getFsEditTime() {
  1125.     Iterator<StorageDirectory> it = fsimage.dirIterator(NameNodeDirType.EDITS);
  1126.     if(it.hasNext())
  1127.       return getEditFile(it.next()).lastModified();
  1128.     return 0;
  1129.   }
  1130.   // sets the initial capacity of the flush buffer.
  1131.   static void setBufferCapacity(int size) {
  1132.     sizeFlushBuffer = size;
  1133.   }
  1134.   /**
  1135.    * A class to read in blocks stored in the old format. The only two
  1136.    * fields in the block were blockid and length.
  1137.    */
  1138.   static class BlockTwo implements Writable {
  1139.     long blkid;
  1140.     long len;
  1141.     static {                                      // register a ctor
  1142.       WritableFactories.setFactory
  1143.         (BlockTwo.class,
  1144.          new WritableFactory() {
  1145.            public Writable newInstance() { return new BlockTwo(); }
  1146.          });
  1147.     }
  1148.     BlockTwo() {
  1149.       blkid = 0;
  1150.       len = 0;
  1151.     }
  1152.     /////////////////////////////////////
  1153.     // Writable
  1154.     /////////////////////////////////////
  1155.     public void write(DataOutput out) throws IOException {
  1156.       out.writeLong(blkid);
  1157.       out.writeLong(len);
  1158.     }
  1159.     public void readFields(DataInput in) throws IOException {
  1160.       this.blkid = in.readLong();
  1161.       this.len = in.readLong();
  1162.     }
  1163.   }
  1164.   /** This method is defined for compatibility reason. */
  1165.   static private DatanodeDescriptor[] readDatanodeDescriptorArray(DataInput in
  1166.       ) throws IOException {
  1167.     DatanodeDescriptor[] locations = new DatanodeDescriptor[in.readInt()];
  1168.     for (int i = 0; i < locations.length; i++) {
  1169.       locations[i] = new DatanodeDescriptor();
  1170.       locations[i].readFieldsFromFSEditLog(in);
  1171.     }
  1172.     return locations;
  1173.   }
  1174.   static private short readShort(DataInputStream in) throws IOException {
  1175.     return Short.parseShort(FSImage.readString(in));
  1176.   }
  1177.   static private long readLong(DataInputStream in) throws IOException {
  1178.     return Long.parseLong(FSImage.readString(in));
  1179.   }
  1180.   static private Block[] readBlocks(DataInputStream in) throws IOException {
  1181.     int numBlocks = in.readInt();
  1182.     Block[] blocks = new Block[numBlocks];
  1183.     for (int i = 0; i < numBlocks; i++) {
  1184.       blocks[i] = new Block();
  1185.       blocks[i].readFields(in);
  1186.     }
  1187.     return blocks;
  1188.   }
  1189. }