SecondaryNameNode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.fs.FileSystem;
  21. import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
  22. import org.apache.hadoop.hdfs.protocol.FSConstants;
  23. import org.apache.hadoop.hdfs.server.common.HdfsConstants;
  24. import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
  25. import org.apache.hadoop.ipc.*;
  26. import org.apache.hadoop.conf.*;
  27. import org.apache.hadoop.util.StringUtils;
  28. import org.apache.hadoop.util.Daemon;
  29. import org.apache.hadoop.http.HttpServer;
  30. import org.apache.hadoop.net.NetUtils;
  31. import java.io.*;
  32. import java.net.*;
  33. import java.util.ArrayList;
  34. import java.util.Collection;
  35. import java.util.Iterator;
  36. import org.apache.hadoop.metrics.jvm.JvmMetrics;
  37. /**********************************************************
  38.  * The Secondary NameNode is a helper to the primary NameNode.
  39.  * The Secondary is responsible for supporting periodic checkpoints 
  40.  * of the HDFS metadata. The current design allows only one Secondary
  41.  * NameNode per HDFs cluster.
  42.  *
  43.  * The Secondary NameNode is a daemon that periodically wakes
  44.  * up (determined by the schedule specified in the configuration),
  45.  * triggers a periodic checkpoint and then goes back to sleep.
  46.  * The Secondary NameNode uses the ClientProtocol to talk to the
  47.  * primary NameNode.
  48.  *
  49.  **********************************************************/
  50. public class SecondaryNameNode implements Runnable {
  51.     
  52.   public static final Log LOG = 
  53.     LogFactory.getLog(SecondaryNameNode.class.getName());
  54.   private String fsName;
  55.   private CheckpointStorage checkpointImage;
  56.   private NamenodeProtocol namenode;
  57.   private Configuration conf;
  58.   private InetSocketAddress nameNodeAddr;
  59.   private volatile boolean shouldRun;
  60.   private HttpServer infoServer;
  61.   private int infoPort;
  62.   private String infoBindAddress;
  63.   private Collection<File> checkpointDirs;
  64.   private Collection<File> checkpointEditsDirs;
  65.   private long checkpointPeriod; // in seconds
  66.   private long checkpointSize;    // size (in MB) of current Edit Log
  67.   /**
  68.    * Utility class to facilitate junit test error simulation.
  69.    */
  70.   static class ErrorSimulator {
  71.     private static boolean[] simulation = null; // error simulation events
  72.     static void initializeErrorSimulationEvent(int numberOfEvents) {
  73.       simulation = new boolean[numberOfEvents]; 
  74.       for (int i = 0; i < numberOfEvents; i++) {
  75.         simulation[i] = false;
  76.       }
  77.     }
  78.     
  79.     static boolean getErrorSimulation(int index) {
  80.       if(simulation == null)
  81.         return false;
  82.       assert(index < simulation.length);
  83.       return simulation[index];
  84.     }
  85.     
  86.     static void setErrorSimulation(int index) {
  87.       assert(index < simulation.length);
  88.       simulation[index] = true;
  89.     }
  90.     
  91.     static void clearErrorSimulation(int index) {
  92.       assert(index < simulation.length);
  93.       simulation[index] = false;
  94.     }
  95.   }
  96.   FSImage getFSImage() {
  97.     return checkpointImage;
  98.   }
  99.   /**
  100.    * Create a connection to the primary namenode.
  101.    */
  102.   public SecondaryNameNode(Configuration conf)  throws IOException {
  103.     try {
  104.       initialize(conf);
  105.     } catch(IOException e) {
  106.       shutdown();
  107.       throw e;
  108.     }
  109.   }
  110.   /**
  111.    * Initialize SecondaryNameNode.
  112.    */
  113.   private void initialize(Configuration conf) throws IOException {
  114.     // initiate Java VM metrics
  115.     JvmMetrics.init("SecondaryNameNode", conf.get("session.id"));
  116.     
  117.     // Create connection to the namenode.
  118.     shouldRun = true;
  119.     nameNodeAddr = NameNode.getAddress(conf);
  120.     this.conf = conf;
  121.     this.namenode =
  122.         (NamenodeProtocol) RPC.waitForProxy(NamenodeProtocol.class,
  123.             NamenodeProtocol.versionID, nameNodeAddr, conf);
  124.     // initialize checkpoint directories
  125.     fsName = getInfoServer();
  126.     checkpointDirs = FSImage.getCheckpointDirs(conf,
  127.                                   "/tmp/hadoop/dfs/namesecondary");
  128.     checkpointEditsDirs = FSImage.getCheckpointEditsDirs(conf, 
  129.                                   "/tmp/hadoop/dfs/namesecondary");    
  130.     checkpointImage = new CheckpointStorage();
  131.     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
  132.     // Initialize other scheduling parameters from the configuration
  133.     checkpointPeriod = conf.getLong("fs.checkpoint.period", 3600);
  134.     checkpointSize = conf.getLong("fs.checkpoint.size", 4194304);
  135.     // initialize the webserver for uploading files.
  136.     String infoAddr = 
  137.       NetUtils.getServerAddress(conf, 
  138.                                 "dfs.secondary.info.bindAddress",
  139.                                 "dfs.secondary.info.port",
  140.                                 "dfs.secondary.http.address");
  141.     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  142.     infoBindAddress = infoSocAddr.getHostName();
  143.     int tmpInfoPort = infoSocAddr.getPort();
  144.     infoServer = new HttpServer("secondary", infoBindAddress, tmpInfoPort,
  145.         tmpInfoPort == 0, conf);
  146.     infoServer.setAttribute("name.system.image", checkpointImage);
  147.     this.infoServer.setAttribute("name.conf", conf);
  148.     infoServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
  149.     infoServer.start();
  150.     // The web-server port can be ephemeral... ensure we have the correct info
  151.     infoPort = infoServer.getPort();
  152.     conf.set("dfs.secondary.http.address", infoBindAddress + ":" +infoPort); 
  153.     LOG.info("Secondary Web-server up at: " + infoBindAddress + ":" +infoPort);
  154.     LOG.warn("Checkpoint Period   :" + checkpointPeriod + " secs " +
  155.              "(" + checkpointPeriod/60 + " min)");
  156.     LOG.warn("Log Size Trigger    :" + checkpointSize + " bytes " +
  157.              "(" + checkpointSize/1024 + " KB)");
  158.   }
  159.   /**
  160.    * Shut down this instance of the datanode.
  161.    * Returns only after shutdown is complete.
  162.    */
  163.   public void shutdown() {
  164.     shouldRun = false;
  165.     try {
  166.       if (infoServer != null) infoServer.stop();
  167.     } catch (Exception e) {
  168.       LOG.warn("Exception shutting down SecondaryNameNode", e);
  169.     }
  170.     try {
  171.       if (checkpointImage != null) checkpointImage.close();
  172.     } catch(IOException e) {
  173.       LOG.warn(StringUtils.stringifyException(e));
  174.     }
  175.   }
  176.   //
  177.   // The main work loop
  178.   //
  179.   public void run() {
  180.     //
  181.     // Poll the Namenode (once every 5 minutes) to find the size of the
  182.     // pending edit log.
  183.     //
  184.     long period = 5 * 60;              // 5 minutes
  185.     long lastCheckpointTime = 0;
  186.     if (checkpointPeriod < period) {
  187.       period = checkpointPeriod;
  188.     }
  189.     while (shouldRun) {
  190.       try {
  191.         Thread.sleep(1000 * period);
  192.       } catch (InterruptedException ie) {
  193.         // do nothing
  194.       }
  195.       if (!shouldRun) {
  196.         break;
  197.       }
  198.       try {
  199.         long now = System.currentTimeMillis();
  200.         long size = namenode.getEditLogSize();
  201.         if (size >= checkpointSize || 
  202.             now >= lastCheckpointTime + 1000 * checkpointPeriod) {
  203.           doCheckpoint();
  204.           lastCheckpointTime = now;
  205.         }
  206.       } catch (IOException e) {
  207.         LOG.error("Exception in doCheckpoint: ");
  208.         LOG.error(StringUtils.stringifyException(e));
  209.         e.printStackTrace();
  210.       } catch (Throwable e) {
  211.         LOG.error("Throwable Exception in doCheckpoint: ");
  212.         LOG.error(StringUtils.stringifyException(e));
  213.         e.printStackTrace();
  214.         Runtime.getRuntime().exit(-1);
  215.       }
  216.     }
  217.   }
  218.   /**
  219.    * Download <code>fsimage</code> and <code>edits</code>
  220.    * files from the name-node.
  221.    * @throws IOException
  222.    */
  223.   private void downloadCheckpointFiles(CheckpointSignature sig
  224.                                       ) throws IOException {
  225.     
  226.     checkpointImage.cTime = sig.cTime;
  227.     checkpointImage.checkpointTime = sig.checkpointTime;
  228.     // get fsimage
  229.     String fileid = "getimage=1";
  230.     File[] srcNames = checkpointImage.getImageFiles();
  231.     assert srcNames.length > 0 : "No checkpoint targets.";
  232.     TransferFsImage.getFileClient(fsName, fileid, srcNames);
  233.     LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
  234.              srcNames[0].length() + " bytes.");
  235.     // get edits file
  236.     fileid = "getedit=1";
  237.     srcNames = checkpointImage.getEditsFiles();
  238.     assert srcNames.length > 0 : "No checkpoint targets.";
  239.     TransferFsImage.getFileClient(fsName, fileid, srcNames);
  240.     LOG.info("Downloaded file " + srcNames[0].getName() + " size " +
  241.         srcNames[0].length() + " bytes.");
  242.     checkpointImage.checkpointUploadDone();
  243.   }
  244.   /**
  245.    * Copy the new fsimage into the NameNode
  246.    */
  247.   private void putFSImage(CheckpointSignature sig) throws IOException {
  248.     String fileid = "putimage=1&port=" + infoPort +
  249.       "&machine=" +
  250.       InetAddress.getLocalHost().getHostAddress() +
  251.       "&token=" + sig.toString();
  252.     LOG.info("Posted URL " + fsName + fileid);
  253.     TransferFsImage.getFileClient(fsName, fileid, (File[])null);
  254.   }
  255.   /**
  256.    * Returns the Jetty server that the Namenode is listening on.
  257.    */
  258.   private String getInfoServer() throws IOException {
  259.     URI fsName = FileSystem.getDefaultUri(conf);
  260.     if (!"hdfs".equals(fsName.getScheme())) {
  261.       throw new IOException("This is not a DFS");
  262.     }
  263.     return NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
  264.                                      "dfs.info.port", "dfs.http.address");
  265.   }
  266.   /**
  267.    * Create a new checkpoint
  268.    */
  269.   void doCheckpoint() throws IOException {
  270.     // Do the required initialization of the merge work area.
  271.     startCheckpoint();
  272.     // Tell the namenode to start logging transactions in a new edit file
  273.     // Retuns a token that would be used to upload the merged image.
  274.     CheckpointSignature sig = (CheckpointSignature)namenode.rollEditLog();
  275.     // error simulation code for junit test
  276.     if (ErrorSimulator.getErrorSimulation(0)) {
  277.       throw new IOException("Simulating error0 " +
  278.                             "after creating edits.new");
  279.     }
  280.     downloadCheckpointFiles(sig);   // Fetch fsimage and edits
  281.     doMerge(sig);                   // Do the merge
  282.   
  283.     //
  284.     // Upload the new image into the NameNode. Then tell the Namenode
  285.     // to make this new uploaded image as the most current image.
  286.     //
  287.     putFSImage(sig);
  288.     // error simulation code for junit test
  289.     if (ErrorSimulator.getErrorSimulation(1)) {
  290.       throw new IOException("Simulating error1 " +
  291.                             "after uploading new image to NameNode");
  292.     }
  293.     namenode.rollFsImage();
  294.     checkpointImage.endCheckpoint();
  295.     LOG.warn("Checkpoint done. New Image Size: " 
  296.               + checkpointImage.getFsImageName().length());
  297.   }
  298.   private void startCheckpoint() throws IOException {
  299.     checkpointImage.unlockAll();
  300.     checkpointImage.getEditLog().close();
  301.     checkpointImage.recoverCreate(checkpointDirs, checkpointEditsDirs);
  302.     checkpointImage.startCheckpoint();
  303.   }
  304.   /**
  305.    * Merge downloaded image and edits and write the new image into
  306.    * current storage directory.
  307.    */
  308.   private void doMerge(CheckpointSignature sig) throws IOException {
  309.     FSNamesystem namesystem = 
  310.             new FSNamesystem(checkpointImage, conf);
  311.     assert namesystem.dir.fsImage == checkpointImage;
  312.     checkpointImage.doMerge(sig);
  313.   }
  314.   /**
  315.    * @param argv The parameters passed to this program.
  316.    * @exception Exception if the filesystem does not exist.
  317.    * @return 0 on success, non zero on error.
  318.    */
  319.   private int processArgs(String[] argv) throws Exception {
  320.     if (argv.length < 1) {
  321.       printUsage("");
  322.       return -1;
  323.     }
  324.     int exitCode = -1;
  325.     int i = 0;
  326.     String cmd = argv[i++];
  327.     //
  328.     // verify that we have enough command line parameters
  329.     //
  330.     if ("-geteditsize".equals(cmd)) {
  331.       if (argv.length != 1) {
  332.         printUsage(cmd);
  333.         return exitCode;
  334.       }
  335.     } else if ("-checkpoint".equals(cmd)) {
  336.       if (argv.length != 1 && argv.length != 2) {
  337.         printUsage(cmd);
  338.         return exitCode;
  339.       }
  340.       if (argv.length == 2 && !"force".equals(argv[i])) {
  341.         printUsage(cmd);
  342.         return exitCode;
  343.       }
  344.     }
  345.     exitCode = 0;
  346.     try {
  347.       if ("-checkpoint".equals(cmd)) {
  348.         long size = namenode.getEditLogSize();
  349.         if (size >= checkpointSize || 
  350.             argv.length == 2 && "force".equals(argv[i])) {
  351.           doCheckpoint();
  352.         } else {
  353.           System.err.println("EditLog size " + size + " bytes is " +
  354.                              "smaller than configured checkpoint " +
  355.                              "size " + checkpointSize + " bytes.");
  356.           System.err.println("Skipping checkpoint.");
  357.         }
  358.       } else if ("-geteditsize".equals(cmd)) {
  359.         long size = namenode.getEditLogSize();
  360.         System.out.println("EditLog size is " + size + " bytes");
  361.       } else {
  362.         exitCode = -1;
  363.         LOG.error(cmd.substring(1) + ": Unknown command");
  364.         printUsage("");
  365.       }
  366.     } catch (RemoteException e) {
  367.       //
  368.       // This is a error returned by hadoop server. Print
  369.       // out the first line of the error mesage, ignore the stack trace.
  370.       exitCode = -1;
  371.       try {
  372.         String[] content;
  373.         content = e.getLocalizedMessage().split("n");
  374.         LOG.error(cmd.substring(1) + ": "
  375.                   + content[0]);
  376.       } catch (Exception ex) {
  377.         LOG.error(cmd.substring(1) + ": "
  378.                   + ex.getLocalizedMessage());
  379.       }
  380.     } catch (IOException e) {
  381.       //
  382.       // IO exception encountered locally.
  383.       //
  384.       exitCode = -1;
  385.       LOG.error(cmd.substring(1) + ": "
  386.                 + e.getLocalizedMessage());
  387.     } finally {
  388.       // Does the RPC connection need to be closed?
  389.     }
  390.     return exitCode;
  391.   }
  392.   /**
  393.    * Displays format of commands.
  394.    * @param cmd The command that is being executed.
  395.    */
  396.   private void printUsage(String cmd) {
  397.     if ("-geteditsize".equals(cmd)) {
  398.       System.err.println("Usage: java SecondaryNameNode"
  399.                          + " [-geteditsize]");
  400.     } else if ("-checkpoint".equals(cmd)) {
  401.       System.err.println("Usage: java SecondaryNameNode"
  402.                          + " [-checkpoint [force]]");
  403.     } else {
  404.       System.err.println("Usage: java SecondaryNameNode " +
  405.                          "[-checkpoint [force]] " +
  406.                          "[-geteditsize] ");
  407.     }
  408.   }
  409.   /**
  410.    * main() has some simple utility methods.
  411.    * @param argv Command line parameters.
  412.    * @exception Exception if the filesystem does not exist.
  413.    */
  414.   public static void main(String[] argv) throws Exception {
  415.     StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
  416.     Configuration tconf = new Configuration();
  417.     if (argv.length >= 1) {
  418.       SecondaryNameNode secondary = new SecondaryNameNode(tconf);
  419.       int ret = secondary.processArgs(argv);
  420.       System.exit(ret);
  421.     }
  422.     // Create a never ending deamon
  423.     Daemon checkpointThread = new Daemon(new SecondaryNameNode(tconf)); 
  424.     checkpointThread.start();
  425.   }
  426.   static class CheckpointStorage extends FSImage {
  427.     /**
  428.      */
  429.     CheckpointStorage() throws IOException {
  430.       super();
  431.     }
  432.     @Override
  433.     public
  434.     boolean isConversionNeeded(StorageDirectory sd) {
  435.       return false;
  436.     }
  437.     /**
  438.      * Analyze checkpoint directories.
  439.      * Create directories if they do not exist.
  440.      * Recover from an unsuccessful checkpoint is necessary. 
  441.      * 
  442.      * @param dataDirs
  443.      * @param editsDirs
  444.      * @throws IOException
  445.      */
  446.     void recoverCreate(Collection<File> dataDirs,
  447.                        Collection<File> editsDirs) throws IOException {
  448.       Collection<File> tempDataDirs = new ArrayList<File>(dataDirs);
  449.       Collection<File> tempEditsDirs = new ArrayList<File>(editsDirs);
  450.       this.storageDirs = new ArrayList<StorageDirectory>();
  451.       setStorageDirectories(tempDataDirs, tempEditsDirs);
  452.       for (Iterator<StorageDirectory> it = 
  453.                    dirIterator(); it.hasNext();) {
  454.         StorageDirectory sd = it.next();
  455.         boolean isAccessible = true;
  456.         try { // create directories if don't exist yet
  457.           if(!sd.getRoot().mkdirs()) {
  458.             // do nothing, directory is already created
  459.           }
  460.         } catch(SecurityException se) {
  461.           isAccessible = false;
  462.         }
  463.         if(!isAccessible)
  464.           throw new InconsistentFSStateException(sd.getRoot(),
  465.               "cannot access checkpoint directory.");
  466.         StorageState curState;
  467.         try {
  468.           curState = sd.analyzeStorage(HdfsConstants.StartupOption.REGULAR);
  469.           // sd is locked but not opened
  470.           switch(curState) {
  471.           case NON_EXISTENT:
  472.             // fail if any of the configured checkpoint dirs are inaccessible 
  473.             throw new InconsistentFSStateException(sd.getRoot(),
  474.                   "checkpoint directory does not exist or is not accessible.");
  475.           case NOT_FORMATTED:
  476.             break;  // it's ok since initially there is no current and VERSION
  477.           case NORMAL:
  478.             break;
  479.           default:  // recovery is possible
  480.             sd.doRecover(curState);
  481.           }
  482.         } catch (IOException ioe) {
  483.           sd.unlock();
  484.           throw ioe;
  485.         }
  486.       }
  487.     }
  488.     /**
  489.      * Prepare directories for a new checkpoint.
  490.      * <p>
  491.      * Rename <code>current</code> to <code>lastcheckpoint.tmp</code>
  492.      * and recreate <code>current</code>.
  493.      * @throws IOException
  494.      */
  495.     void startCheckpoint() throws IOException {
  496.       for(StorageDirectory sd : storageDirs) {
  497.         File curDir = sd.getCurrentDir();
  498.         File tmpCkptDir = sd.getLastCheckpointTmp();
  499.         assert !tmpCkptDir.exists() : 
  500.           tmpCkptDir.getName() + " directory must not exist.";
  501.         if(curDir.exists()) {
  502.           // rename current to tmp
  503.           rename(curDir, tmpCkptDir);
  504.         }
  505.         if (!curDir.mkdir())
  506.           throw new IOException("Cannot create directory " + curDir);
  507.       }
  508.     }
  509.     void endCheckpoint() throws IOException {
  510.       for(StorageDirectory sd : storageDirs) {
  511.         File tmpCkptDir = sd.getLastCheckpointTmp();
  512.         File prevCkptDir = sd.getPreviousCheckpoint();
  513.         // delete previous dir
  514.         if (prevCkptDir.exists())
  515.           deleteDir(prevCkptDir);
  516.         // rename tmp to previous
  517.         if (tmpCkptDir.exists())
  518.           rename(tmpCkptDir, prevCkptDir);
  519.       }
  520.     }
  521.     /**
  522.      * Merge image and edits, and verify consistency with the signature.
  523.      */
  524.     private void doMerge(CheckpointSignature sig) throws IOException {
  525.       getEditLog().open();
  526.       StorageDirectory sdName = null;
  527.       StorageDirectory sdEdits = null;
  528.       Iterator<StorageDirectory> it = null;
  529.       it = dirIterator(NameNodeDirType.IMAGE);
  530.       if (it.hasNext())
  531.         sdName = it.next();
  532.       it = dirIterator(NameNodeDirType.EDITS);
  533.       if (it.hasNext())
  534.         sdEdits = it.next();
  535.       if ((sdName == null) || (sdEdits == null))
  536.         throw new IOException("Could not locate checkpoint directories");
  537.       loadFSImage(FSImage.getImageFile(sdName, NameNodeFile.IMAGE));
  538.       loadFSEdits(sdEdits);
  539.       sig.validateStorageInfo(this);
  540.       saveFSImage();
  541.     }
  542.   }
  543. }