NameNode.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:34k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.hdfs.server.namenode;
  19. import org.apache.commons.logging.*;
  20. import org.apache.hadoop.fs.ContentSummary;
  21. import org.apache.hadoop.fs.FileStatus;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.fs.Trash;
  24. import org.apache.hadoop.fs.FileSystem;
  25. import org.apache.hadoop.fs.permission.FsPermission;
  26. import org.apache.hadoop.fs.permission.PermissionStatus;
  27. import org.apache.hadoop.hdfs.HDFSPolicyProvider;
  28. import org.apache.hadoop.hdfs.protocol.*;
  29. import org.apache.hadoop.hdfs.server.common.HdfsConstants.StartupOption;
  30. import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
  31. import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
  32. import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.CompleteFileStatus;
  33. import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
  34. import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
  35. import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
  36. import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
  37. import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
  38. import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
  39. import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
  40. import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
  41. import org.apache.hadoop.http.HttpServer;
  42. import org.apache.hadoop.ipc.*;
  43. import org.apache.hadoop.conf.*;
  44. import org.apache.hadoop.util.ReflectionUtils;
  45. import org.apache.hadoop.util.StringUtils;
  46. import org.apache.hadoop.net.NetUtils;
  47. import org.apache.hadoop.net.NetworkTopology;
  48. import org.apache.hadoop.security.SecurityUtil;
  49. import org.apache.hadoop.security.UserGroupInformation;
  50. import org.apache.hadoop.security.authorize.AuthorizationException;
  51. import org.apache.hadoop.security.authorize.ConfiguredPolicy;
  52. import org.apache.hadoop.security.authorize.PolicyProvider;
  53. import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
  54. import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
  55. import java.io.*;
  56. import java.net.*;
  57. import java.util.Collection;
  58. import java.util.Iterator;
  59. /**********************************************************
  60.  * NameNode serves as both directory namespace manager and
  61.  * "inode table" for the Hadoop DFS.  There is a single NameNode
  62.  * running in any DFS deployment.  (Well, except when there
  63.  * is a second backup/failover NameNode.)
  64.  *
  65.  * The NameNode controls two critical tables:
  66.  *   1)  filename->blocksequence (namespace)
  67.  *   2)  block->machinelist ("inodes")
  68.  *
  69.  * The first table is stored on disk and is very precious.
  70.  * The second table is rebuilt every time the NameNode comes
  71.  * up.
  72.  *
  73.  * 'NameNode' refers to both this class as well as the 'NameNode server'.
  74.  * The 'FSNamesystem' class actually performs most of the filesystem
  75.  * management.  The majority of the 'NameNode' class itself is concerned
  76.  * with exposing the IPC interface and the http server to the outside world,
  77.  * plus some configuration management.
  78.  *
  79.  * NameNode implements the ClientProtocol interface, which allows
  80.  * clients to ask for DFS services.  ClientProtocol is not
  81.  * designed for direct use by authors of DFS client code.  End-users
  82.  * should instead use the org.apache.nutch.hadoop.fs.FileSystem class.
  83.  *
  84.  * NameNode also implements the DatanodeProtocol interface, used by
  85.  * DataNode programs that actually store DFS data blocks.  These
  86.  * methods are invoked repeatedly and automatically by all the
  87.  * DataNodes in a DFS deployment.
  88.  *
  89.  * NameNode also implements the NamenodeProtocol interface, used by
  90.  * secondary namenodes or rebalancing processes to get partial namenode's
  91.  * state, for example partial blocksMap etc.
  92.  **********************************************************/
  93. public class NameNode implements ClientProtocol, DatanodeProtocol,
  94.                                  NamenodeProtocol, FSConstants,
  95.                                  RefreshAuthorizationPolicyProtocol {
  96.   static{
  97.     Configuration.addDefaultResource("hdfs-default.xml");
  98.     Configuration.addDefaultResource("hdfs-site.xml");
  99.   }
  100.   
  101.   public long getProtocolVersion(String protocol, 
  102.                                  long clientVersion) throws IOException { 
  103.     if (protocol.equals(ClientProtocol.class.getName())) {
  104.       return ClientProtocol.versionID; 
  105.     } else if (protocol.equals(DatanodeProtocol.class.getName())){
  106.       return DatanodeProtocol.versionID;
  107.     } else if (protocol.equals(NamenodeProtocol.class.getName())){
  108.       return NamenodeProtocol.versionID;
  109.     } else if (protocol.equals(RefreshAuthorizationPolicyProtocol.class.getName())){
  110.       return RefreshAuthorizationPolicyProtocol.versionID;
  111.     } else {
  112.       throw new IOException("Unknown protocol to name node: " + protocol);
  113.     }
  114.   }
  115.     
  116.   public static final int DEFAULT_PORT = 8020;
  117.   public static final Log LOG = LogFactory.getLog(NameNode.class.getName());
  118.   public static final Log stateChangeLog = LogFactory.getLog("org.apache.hadoop.hdfs.StateChange");
  119.   public FSNamesystem namesystem; // TODO: This should private. Use getNamesystem() instead. 
  120.   /** RPC server */
  121.   private Server server;
  122.   /** RPC server address */
  123.   private InetSocketAddress serverAddress = null;
  124.   /** httpServer */
  125.   private HttpServer httpServer;
  126.   /** HTTP server address */
  127.   private InetSocketAddress httpAddress = null;
  128.   private Thread emptier;
  129.   /** only used for testing purposes  */
  130.   private boolean stopRequested = false;
  131.   /** Is service level authorization enabled? */
  132.   private boolean serviceAuthEnabled = false;
  133.   
  134.   /** Format a new filesystem.  Destroys any filesystem that may already
  135.    * exist at this location.  **/
  136.   public static void format(Configuration conf) throws IOException {
  137.     format(conf, false);
  138.   }
  139.   static NameNodeMetrics myMetrics;
  140.   public FSNamesystem getNamesystem() {
  141.     return namesystem;
  142.   }
  143.   public static NameNodeMetrics getNameNodeMetrics() {
  144.     return myMetrics;
  145.   }
  146.   
  147.   public static InetSocketAddress getAddress(String address) {
  148.     return NetUtils.createSocketAddr(address, DEFAULT_PORT);
  149.   }
  150.   public static InetSocketAddress getAddress(Configuration conf) {
  151.     return getAddress(FileSystem.getDefaultUri(conf).getAuthority());
  152.   }
  153.   public static URI getUri(InetSocketAddress namenode) {
  154.     int port = namenode.getPort();
  155.     String portString = port == DEFAULT_PORT ? "" : (":"+port);
  156.     return URI.create("hdfs://"+ namenode.getHostName()+portString);
  157.   }
  158.   /**
  159.    * Initialize name-node.
  160.    * 
  161.    * @param conf the configuration
  162.    */
  163.   private void initialize(Configuration conf) throws IOException {
  164.     InetSocketAddress socAddr = NameNode.getAddress(conf);
  165.     int handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
  166.     
  167.     // set service-level authorization security policy
  168.     if (serviceAuthEnabled = 
  169.           conf.getBoolean(
  170.             ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
  171.       PolicyProvider policyProvider = 
  172.         (PolicyProvider)(ReflectionUtils.newInstance(
  173.             conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
  174.                 HDFSPolicyProvider.class, PolicyProvider.class), 
  175.             conf));
  176.       SecurityUtil.setPolicy(new ConfiguredPolicy(conf, policyProvider));
  177.     }
  178.     // create rpc server 
  179.     this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
  180.                                 handlerCount, false, conf);
  181.     // The rpc-server port can be ephemeral... ensure we have the correct info
  182.     this.serverAddress = this.server.getListenerAddress(); 
  183.     FileSystem.setDefaultUri(conf, getUri(serverAddress));
  184.     LOG.info("Namenode up at: " + this.serverAddress);
  185.     myMetrics = new NameNodeMetrics(conf, this);
  186.     this.namesystem = new FSNamesystem(this, conf);
  187.     startHttpServer(conf);
  188.     this.server.start();  //start RPC server   
  189.     startTrashEmptier(conf);
  190.   }
  191.   private void startTrashEmptier(Configuration conf) throws IOException {
  192.     this.emptier = new Thread(new Trash(conf).getEmptier(), "Trash Emptier");
  193.     this.emptier.setDaemon(true);
  194.     this.emptier.start();
  195.   }
  196.   private void startHttpServer(Configuration conf) throws IOException {
  197.     String infoAddr = 
  198.       NetUtils.getServerAddress(conf, "dfs.info.bindAddress", 
  199.                                 "dfs.info.port", "dfs.http.address");
  200.     InetSocketAddress infoSocAddr = NetUtils.createSocketAddr(infoAddr);
  201.     String infoHost = infoSocAddr.getHostName();
  202.     int infoPort = infoSocAddr.getPort();
  203.     this.httpServer = new HttpServer("hdfs", infoHost, infoPort, 
  204.         infoPort == 0, conf);
  205.     if (conf.getBoolean("dfs.https.enable", false)) {
  206.       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
  207.       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
  208.           "dfs.https.address", infoHost + ":" + 0));
  209.       Configuration sslConf = new Configuration(false);
  210.       sslConf.addResource(conf.get("dfs.https.server.keystore.resource",
  211.           "ssl-server.xml"));
  212.       this.httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
  213.       // assume same ssl port for all datanodes
  214.       InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
  215.           "dfs.datanode.https.address", infoHost + ":" + 50475));
  216.       this.httpServer.setAttribute("datanode.https.port", datanodeSslPort
  217.           .getPort());
  218.     }
  219.     this.httpServer.setAttribute("name.node", this);
  220.     this.httpServer.setAttribute("name.node.address", getNameNodeAddress());
  221.     this.httpServer.setAttribute("name.system.image", getFSImage());
  222.     this.httpServer.setAttribute("name.conf", conf);
  223.     this.httpServer.addInternalServlet("fsck", "/fsck", FsckServlet.class);
  224.     this.httpServer.addInternalServlet("getimage", "/getimage", GetImageServlet.class);
  225.     this.httpServer.addInternalServlet("listPaths", "/listPaths/*", ListPathsServlet.class);
  226.     this.httpServer.addInternalServlet("data", "/data/*", FileDataServlet.class);
  227.     this.httpServer.addInternalServlet("checksum", "/fileChecksum/*",
  228.         FileChecksumServlets.RedirectServlet.class);
  229.     this.httpServer.start();
  230.     // The web-server port can be ephemeral... ensure we have the correct info
  231.     infoPort = this.httpServer.getPort();
  232.     this.httpAddress = new InetSocketAddress(infoHost, infoPort);
  233.     conf.set("dfs.http.address", infoHost + ":" + infoPort);
  234.     LOG.info("Web-server up at: " + infoHost + ":" + infoPort);
  235.   }
  236.   /**
  237.    * Start NameNode.
  238.    * <p>
  239.    * The name-node can be started with one of the following startup options:
  240.    * <ul> 
  241.    * <li>{@link StartupOption#REGULAR REGULAR} - normal name node startup</li>
  242.    * <li>{@link StartupOption#FORMAT FORMAT} - format name node</li>
  243.    * <li>{@link StartupOption#UPGRADE UPGRADE} - start the cluster  
  244.    * upgrade and create a snapshot of the current file system state</li> 
  245.    * <li>{@link StartupOption#ROLLBACK ROLLBACK} - roll the  
  246.    *            cluster back to the previous state</li>
  247.    * </ul>
  248.    * The option is passed via configuration field: 
  249.    * <tt>dfs.namenode.startup</tt>
  250.    * 
  251.    * The conf will be modified to reflect the actual ports on which 
  252.    * the NameNode is up and running if the user passes the port as
  253.    * <code>zero</code> in the conf.
  254.    * 
  255.    * @param conf  confirguration
  256.    * @throws IOException
  257.    */
  258.   public NameNode(Configuration conf) throws IOException {
  259.     try {
  260.       initialize(conf);
  261.     } catch (IOException e) {
  262.       this.stop();
  263.       throw e;
  264.     }
  265.   }
  266.   /**
  267.    * Wait for service to finish.
  268.    * (Normally, it runs forever.)
  269.    */
  270.   public void join() {
  271.     try {
  272.       this.server.join();
  273.     } catch (InterruptedException ie) {
  274.     }
  275.   }
  276.   /**
  277.    * Stop all NameNode threads and wait for all to finish.
  278.    */
  279.   public void stop() {
  280.     if (stopRequested)
  281.       return;
  282.     stopRequested = true;
  283.     try {
  284.       if (httpServer != null) httpServer.stop();
  285.     } catch (Exception e) {
  286.       LOG.error(StringUtils.stringifyException(e));
  287.     }
  288.     if(namesystem != null) namesystem.close();
  289.     if(emptier != null) emptier.interrupt();
  290.     if(server != null) server.stop();
  291.     if (myMetrics != null) {
  292.       myMetrics.shutdown();
  293.     }
  294.     if (namesystem != null) {
  295.       namesystem.shutdown();
  296.     }
  297.   }
  298.   
  299.   /////////////////////////////////////////////////////
  300.   // NamenodeProtocol
  301.   /////////////////////////////////////////////////////
  302.   /**
  303.    * return a list of blocks & their locations on <code>datanode</code> whose
  304.    * total size is <code>size</code>
  305.    * 
  306.    * @param datanode on which blocks are located
  307.    * @param size total size of blocks
  308.    */
  309.   public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
  310.   throws IOException {
  311.     if(size <= 0) {
  312.       throw new IllegalArgumentException(
  313.         "Unexpected not positive size: "+size);
  314.     }
  315.     return namesystem.getBlocks(datanode, size); 
  316.   }
  317.   
  318.   /////////////////////////////////////////////////////
  319.   // ClientProtocol
  320.   /////////////////////////////////////////////////////
  321.   /** {@inheritDoc} */
  322.   public LocatedBlocks   getBlockLocations(String src, 
  323.                                           long offset, 
  324.                                           long length) throws IOException {
  325.     myMetrics.numGetBlockLocations.inc();
  326.     return namesystem.getBlockLocations(getClientMachine(), 
  327.                                         src, offset, length);
  328.   }
  329.   
  330.   private static String getClientMachine() {
  331.     String clientMachine = Server.getRemoteAddress();
  332.     if (clientMachine == null) {
  333.       clientMachine = "";
  334.     }
  335.     return clientMachine;
  336.   }
  337.   /** {@inheritDoc} */
  338.   public void create(String src, 
  339.                      FsPermission masked,
  340.                              String clientName, 
  341.                              boolean overwrite,
  342.                              short replication,
  343.                              long blockSize
  344.                              ) throws IOException {
  345.     String clientMachine = getClientMachine();
  346.     if (stateChangeLog.isDebugEnabled()) {
  347.       stateChangeLog.debug("*DIR* NameNode.create: file "
  348.                          +src+" for "+clientName+" at "+clientMachine);
  349.     }
  350.     if (!checkPathLength(src)) {
  351.       throw new IOException("create: Pathname too long.  Limit " 
  352.                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
  353.     }
  354.     namesystem.startFile(src,
  355.         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
  356.             null, masked),
  357.         clientName, clientMachine, overwrite, replication, blockSize);
  358.     myMetrics.numFilesCreated.inc();
  359.     myMetrics.numCreateFileOps.inc();
  360.   }
  361.   /** {@inheritDoc} */
  362.   public LocatedBlock append(String src, String clientName) throws IOException {
  363.     String clientMachine = getClientMachine();
  364.     if (stateChangeLog.isDebugEnabled()) {
  365.       stateChangeLog.debug("*DIR* NameNode.append: file "
  366.           +src+" for "+clientName+" at "+clientMachine);
  367.     }
  368.     LocatedBlock info = namesystem.appendFile(src, clientName, clientMachine);
  369.     myMetrics.numFilesAppended.inc();
  370.     return info;
  371.   }
  372.   /** {@inheritDoc} */
  373.   public boolean setReplication(String src, 
  374.                                 short replication
  375.                                 ) throws IOException {
  376.     return namesystem.setReplication(src, replication);
  377.   }
  378.     
  379.   /** {@inheritDoc} */
  380.   public void setPermission(String src, FsPermission permissions
  381.       ) throws IOException {
  382.     namesystem.setPermission(src, permissions);
  383.   }
  384.   /** {@inheritDoc} */
  385.   public void setOwner(String src, String username, String groupname
  386.       ) throws IOException {
  387.     namesystem.setOwner(src, username, groupname);
  388.   }
  389.   /**
  390.    */
  391.   public LocatedBlock addBlock(String src, 
  392.                                String clientName) throws IOException {
  393.     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
  394.                          +src+" for "+clientName);
  395.     LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
  396.     if (locatedBlock != null)
  397.       myMetrics.numAddBlockOps.inc();
  398.     return locatedBlock;
  399.   }
  400.   /**
  401.    * The client needs to give up on the block.
  402.    */
  403.   public void abandonBlock(Block b, String src, String holder
  404.       ) throws IOException {
  405.     stateChangeLog.debug("*BLOCK* NameNode.abandonBlock: "
  406.                          +b+" of file "+src);
  407.     if (!namesystem.abandonBlock(b, src, holder)) {
  408.       throw new IOException("Cannot abandon block during write to " + src);
  409.     }
  410.   }
  411.   /** {@inheritDoc} */
  412.   public boolean complete(String src, String clientName) throws IOException {
  413.     stateChangeLog.debug("*DIR* NameNode.complete: " + src + " for " + clientName);
  414.     CompleteFileStatus returnCode = namesystem.completeFile(src, clientName);
  415.     if (returnCode == CompleteFileStatus.STILL_WAITING) {
  416.       return false;
  417.     } else if (returnCode == CompleteFileStatus.COMPLETE_SUCCESS) {
  418.       return true;
  419.     } else {
  420.       throw new IOException("Could not complete write to file " + src + " by " + clientName);
  421.     }
  422.   }
  423.   /**
  424.    * The client has detected an error on the specified located blocks 
  425.    * and is reporting them to the server.  For now, the namenode will 
  426.    * mark the block as corrupt.  In the future we might 
  427.    * check the blocks are actually corrupt. 
  428.    */
  429.   public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
  430.     stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
  431.     for (int i = 0; i < blocks.length; i++) {
  432.       Block blk = blocks[i].getBlock();
  433.       DatanodeInfo[] nodes = blocks[i].getLocations();
  434.       for (int j = 0; j < nodes.length; j++) {
  435.         DatanodeInfo dn = nodes[j];
  436.         namesystem.markBlockAsCorrupt(blk, dn);
  437.       }
  438.     }
  439.   }
  440.   /** {@inheritDoc} */
  441.   public long nextGenerationStamp(Block block) throws IOException{
  442.     return namesystem.nextGenerationStampForBlock(block);
  443.   }
  444.   /** {@inheritDoc} */
  445.   public void commitBlockSynchronization(Block block,
  446.       long newgenerationstamp, long newlength,
  447.       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
  448.       ) throws IOException {
  449.     namesystem.commitBlockSynchronization(block,
  450.         newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
  451.   }
  452.   
  453.   public long getPreferredBlockSize(String filename) throws IOException {
  454.     return namesystem.getPreferredBlockSize(filename);
  455.   }
  456.     
  457.   /**
  458.    */
  459.   public boolean rename(String src, String dst) throws IOException {
  460.     stateChangeLog.debug("*DIR* NameNode.rename: " + src + " to " + dst);
  461.     if (!checkPathLength(dst)) {
  462.       throw new IOException("rename: Pathname too long.  Limit " 
  463.                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
  464.     }
  465.     boolean ret = namesystem.renameTo(src, dst);
  466.     if (ret) {
  467.       myMetrics.numFilesRenamed.inc();
  468.     }
  469.     return ret;
  470.   }
  471.   /**
  472.    */
  473.   @Deprecated
  474.   public boolean delete(String src) throws IOException {
  475.     return delete(src, true);
  476.   }
  477.   /** {@inheritDoc} */
  478.   public boolean delete(String src, boolean recursive) throws IOException {
  479.     if (stateChangeLog.isDebugEnabled()) {
  480.       stateChangeLog.debug("*DIR* Namenode.delete: src=" + src
  481.           + ", recursive=" + recursive);
  482.     }
  483.     boolean ret = namesystem.delete(src, recursive);
  484.     if (ret) 
  485.       myMetrics.numDeleteFileOps.inc();
  486.     return ret;
  487.   }
  488.   /**
  489.    * Check path length does not exceed maximum.  Returns true if
  490.    * length and depth are okay.  Returns false if length is too long 
  491.    * or depth is too great.
  492.    * 
  493.    */
  494.   private boolean checkPathLength(String src) {
  495.     Path srcPath = new Path(src);
  496.     return (src.length() <= MAX_PATH_LENGTH &&
  497.             srcPath.depth() <= MAX_PATH_DEPTH);
  498.   }
  499.     
  500.   /** {@inheritDoc} */
  501.   public boolean mkdirs(String src, FsPermission masked) throws IOException {
  502.     stateChangeLog.debug("*DIR* NameNode.mkdirs: " + src);
  503.     if (!checkPathLength(src)) {
  504.       throw new IOException("mkdirs: Pathname too long.  Limit " 
  505.                             + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
  506.     }
  507.     return namesystem.mkdirs(src,
  508.         new PermissionStatus(UserGroupInformation.getCurrentUGI().getUserName(),
  509.             null, masked));
  510.   }
  511.   /**
  512.    */
  513.   public void renewLease(String clientName) throws IOException {
  514.     namesystem.renewLease(clientName);        
  515.   }
  516.   /**
  517.    */
  518.   public FileStatus[] getListing(String src) throws IOException {
  519.     FileStatus[] files = namesystem.getListing(src);
  520.     if (files != null) {
  521.       myMetrics.numGetListingOps.inc();
  522.     }
  523.     return files;
  524.   }
  525.   /**
  526.    * Get the file info for a specific file.
  527.    * @param src The string representation of the path to the file
  528.    * @throws IOException if permission to access file is denied by the system
  529.    * @return object containing information regarding the file
  530.    *         or null if file not found
  531.    */
  532.   public FileStatus getFileInfo(String src)  throws IOException {
  533.     return namesystem.getFileInfo(src);
  534.   }
  535.   /** @inheritDoc */
  536.   public long[] getStats() throws IOException {
  537.     return namesystem.getStats();
  538.   }
  539.   /**
  540.    */
  541.   public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
  542.   throws IOException {
  543.     DatanodeInfo results[] = namesystem.datanodeReport(type);
  544.     if (results == null ) {
  545.       throw new IOException("Cannot find datanode report");
  546.     }
  547.     return results;
  548.   }
  549.     
  550.   /**
  551.    * @inheritDoc
  552.    */
  553.   public boolean setSafeMode(SafeModeAction action) throws IOException {
  554.     return namesystem.setSafeMode(action);
  555.   }
  556.   /**
  557.    * Is the cluster currently in safe mode?
  558.    */
  559.   public boolean isInSafeMode() {
  560.     return namesystem.isInSafeMode();
  561.   }
  562.   /**
  563.    * @inheritDoc
  564.    */
  565.   public void saveNamespace() throws IOException {
  566.     namesystem.saveNamespace();
  567.   }
  568.   /**
  569.    * Refresh the list of datanodes that the namenode should allow to  
  570.    * connect.  Re-reads conf by creating new Configuration object and 
  571.    * uses the files list in the configuration to update the list. 
  572.    */
  573.   public void refreshNodes() throws IOException {
  574.     namesystem.refreshNodes(new Configuration());
  575.   }
  576.   /**
  577.    * Returns the size of the current edit log.
  578.    */
  579.   public long getEditLogSize() throws IOException {
  580.     return namesystem.getEditLogSize();
  581.   }
  582.   /**
  583.    * Roll the edit log.
  584.    */
  585.   public CheckpointSignature rollEditLog() throws IOException {
  586.     return namesystem.rollEditLog();
  587.   }
  588.   /**
  589.    * Roll the image 
  590.    */
  591.   public void rollFsImage() throws IOException {
  592.     namesystem.rollFSImage();
  593.   }
  594.     
  595.   public void finalizeUpgrade() throws IOException {
  596.     namesystem.finalizeUpgrade();
  597.   }
  598.   public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
  599.                                                         ) throws IOException {
  600.     return namesystem.distributedUpgradeProgress(action);
  601.   }
  602.   /**
  603.    * Dumps namenode state into specified file
  604.    */
  605.   public void metaSave(String filename) throws IOException {
  606.     namesystem.metaSave(filename);
  607.   }
  608.   /** {@inheritDoc} */
  609.   public ContentSummary getContentSummary(String path) throws IOException {
  610.     return namesystem.getContentSummary(path);
  611.   }
  612.   /** {@inheritDoc} */
  613.   public void setQuota(String path, long namespaceQuota, long diskspaceQuota) 
  614.                        throws IOException {
  615.     namesystem.setQuota(path, namespaceQuota, diskspaceQuota);
  616.   }
  617.   
  618.   /** {@inheritDoc} */
  619.   public void fsync(String src, String clientName) throws IOException {
  620.     namesystem.fsync(src, clientName);
  621.   }
  622.   /** @inheritDoc */
  623.   public void setTimes(String src, long mtime, long atime) throws IOException {
  624.     namesystem.setTimes(src, mtime, atime);
  625.   }
  626.   ////////////////////////////////////////////////////////////////
  627.   // DatanodeProtocol
  628.   ////////////////////////////////////////////////////////////////
  629.   /** 
  630.    */
  631.   public DatanodeRegistration register(DatanodeRegistration nodeReg
  632.                                        ) throws IOException {
  633.     verifyVersion(nodeReg.getVersion());
  634.     namesystem.registerDatanode(nodeReg);
  635.       
  636.     return nodeReg;
  637.   }
  638.   /**
  639.    * Data node notify the name node that it is alive 
  640.    * Return an array of block-oriented commands for the datanode to execute.
  641.    * This will be either a transfer or a delete operation.
  642.    */
  643.   public DatanodeCommand[] sendHeartbeat(DatanodeRegistration nodeReg,
  644.                                        long capacity,
  645.                                        long dfsUsed,
  646.                                        long remaining,
  647.                                        int xmitsInProgress,
  648.                                        int xceiverCount) throws IOException {
  649.     verifyRequest(nodeReg);
  650.     return namesystem.handleHeartbeat(nodeReg, capacity, dfsUsed, remaining,
  651.         xceiverCount, xmitsInProgress);
  652.   }
  653.   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
  654.                                      long[] blocks) throws IOException {
  655.     verifyRequest(nodeReg);
  656.     BlockListAsLongs blist = new BlockListAsLongs(blocks);
  657.     stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
  658.            +"from "+nodeReg.getName()+" "+blist.getNumberOfBlocks() +" blocks");
  659.     namesystem.processReport(nodeReg, blist);
  660.     if (getFSImage().isUpgradeFinalized())
  661.       return DatanodeCommand.FINALIZE;
  662.     return null;
  663.   }
  664.   public void blockReceived(DatanodeRegistration nodeReg, 
  665.                             Block blocks[],
  666.                             String delHints[]) throws IOException {
  667.     verifyRequest(nodeReg);
  668.     stateChangeLog.debug("*BLOCK* NameNode.blockReceived: "
  669.                          +"from "+nodeReg.getName()+" "+blocks.length+" blocks.");
  670.     for (int i = 0; i < blocks.length; i++) {
  671.       namesystem.blockReceived(nodeReg, blocks[i], delHints[i]);
  672.     }
  673.   }
  674.   /**
  675.    */
  676.   public void errorReport(DatanodeRegistration nodeReg,
  677.                           int errorCode, 
  678.                           String msg) throws IOException {
  679.     // Log error message from datanode
  680.     String dnName = (nodeReg == null ? "unknown DataNode" : nodeReg.getName());
  681.     LOG.info("Error report from " + dnName + ": " + msg);
  682.     if (errorCode == DatanodeProtocol.NOTIFY) {
  683.       return;
  684.     }
  685.     verifyRequest(nodeReg);
  686.     if (errorCode == DatanodeProtocol.DISK_ERROR) {
  687.       namesystem.removeDatanode(nodeReg);            
  688.     }
  689.   }
  690.     
  691.   public NamespaceInfo versionRequest() throws IOException {
  692.     return namesystem.getNamespaceInfo();
  693.   }
  694.   public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
  695.     return namesystem.processDistributedUpgradeCommand(comm);
  696.   }
  697.   /** 
  698.    * Verify request.
  699.    * 
  700.    * Verifies correctness of the datanode version, registration ID, and 
  701.    * if the datanode does not need to be shutdown.
  702.    * 
  703.    * @param nodeReg data node registration
  704.    * @throws IOException
  705.    */
  706.   public void verifyRequest(DatanodeRegistration nodeReg) throws IOException {
  707.     verifyVersion(nodeReg.getVersion());
  708.     if (!namesystem.getRegistrationID().equals(nodeReg.getRegistrationID()))
  709.       throw new UnregisteredDatanodeException(nodeReg);
  710.   }
  711.     
  712.   /**
  713.    * Verify version.
  714.    * 
  715.    * @param version
  716.    * @throws IOException
  717.    */
  718.   public void verifyVersion(int version) throws IOException {
  719.     if (version != LAYOUT_VERSION)
  720.       throw new IncorrectVersionException(version, "data node");
  721.   }
  722.   /**
  723.    * Returns the name of the fsImage file
  724.    */
  725.   public File getFsImageName() throws IOException {
  726.     return getFSImage().getFsImageName();
  727.   }
  728.     
  729.   public FSImage getFSImage() {
  730.     return namesystem.dir.fsImage;
  731.   }
  732.   /**
  733.    * Returns the name of the fsImage file uploaded by periodic
  734.    * checkpointing
  735.    */
  736.   public File[] getFsImageNameCheckpoint() throws IOException {
  737.     return getFSImage().getFsImageNameCheckpoint();
  738.   }
  739.   /**
  740.    * Returns the address on which the NameNodes is listening to.
  741.    * @return the address on which the NameNodes is listening to.
  742.    */
  743.   public InetSocketAddress getNameNodeAddress() {
  744.     return serverAddress;
  745.   }
  746.   /**
  747.    * Returns the address of the NameNodes http server, 
  748.    * which is used to access the name-node web UI.
  749.    * 
  750.    * @return the http address.
  751.    */
  752.   public InetSocketAddress getHttpAddress() {
  753.     return httpAddress;
  754.   }
  755.   NetworkTopology getNetworkTopology() {
  756.     return this.namesystem.clusterMap;
  757.   }
  758.   /**
  759.    * Verify that configured directories exist, then
  760.    * Interactively confirm that formatting is desired 
  761.    * for each existing directory and format them.
  762.    * 
  763.    * @param conf
  764.    * @param isConfirmationNeeded
  765.    * @return true if formatting was aborted, false otherwise
  766.    * @throws IOException
  767.    */
  768.   private static boolean format(Configuration conf,
  769.                                 boolean isConfirmationNeeded
  770.                                 ) throws IOException {
  771.     Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
  772.     Collection<File> editDirsToFormat = 
  773.                  FSNamesystem.getNamespaceEditsDirs(conf);
  774.     for(Iterator<File> it = dirsToFormat.iterator(); it.hasNext();) {
  775.       File curDir = it.next();
  776.       if (!curDir.exists())
  777.         continue;
  778.       if (isConfirmationNeeded) {
  779.         System.err.print("Re-format filesystem in " + curDir +" ? (Y or N) ");
  780.         if (!(System.in.read() == 'Y')) {
  781.           System.err.println("Format aborted in "+ curDir);
  782.           return true;
  783.         }
  784.         while(System.in.read() != 'n'); // discard the enter-key
  785.       }
  786.     }
  787.     FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
  788.                                          editDirsToFormat), conf);
  789.     nsys.dir.fsImage.format();
  790.     return false;
  791.   }
  792.   private static boolean finalize(Configuration conf,
  793.                                boolean isConfirmationNeeded
  794.                                ) throws IOException {
  795.     Collection<File> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
  796.     Collection<File> editDirsToFormat = 
  797.                                FSNamesystem.getNamespaceEditsDirs(conf);
  798.     FSNamesystem nsys = new FSNamesystem(new FSImage(dirsToFormat,
  799.                                          editDirsToFormat), conf);
  800.     System.err.print(
  801.         ""finalize" will remove the previous state of the files system.n"
  802.         + "Recent upgrade will become permanent.n"
  803.         + "Rollback option will not be available anymore.n");
  804.     if (isConfirmationNeeded) {
  805.       System.err.print("Finalize filesystem state ? (Y or N) ");
  806.       if (!(System.in.read() == 'Y')) {
  807.         System.err.println("Finalize aborted.");
  808.         return true;
  809.       }
  810.       while(System.in.read() != 'n'); // discard the enter-key
  811.     }
  812.     nsys.dir.fsImage.finalizeUpgrade();
  813.     return false;
  814.   }
  815.   @Override
  816.   public void refreshServiceAcl() throws IOException {
  817.     if (!serviceAuthEnabled) {
  818.       throw new AuthorizationException("Service Level Authorization not enabled!");
  819.     }
  820.     SecurityUtil.getPolicy().refresh();
  821.   }
  822.   private static void printUsage() {
  823.     System.err.println(
  824.       "Usage: java NameNode [" +
  825.       StartupOption.FORMAT.getName() + "] | [" +
  826.       StartupOption.UPGRADE.getName() + "] | [" +
  827.       StartupOption.ROLLBACK.getName() + "] | [" +
  828.       StartupOption.FINALIZE.getName() + "] | [" +
  829.       StartupOption.IMPORT.getName() + "]");
  830.   }
  831.   private static StartupOption parseArguments(String args[]) {
  832.     int argsLen = (args == null) ? 0 : args.length;
  833.     StartupOption startOpt = StartupOption.REGULAR;
  834.     for(int i=0; i < argsLen; i++) {
  835.       String cmd = args[i];
  836.       if (StartupOption.FORMAT.getName().equalsIgnoreCase(cmd)) {
  837.         startOpt = StartupOption.FORMAT;
  838.       } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
  839.         startOpt = StartupOption.REGULAR;
  840.       } else if (StartupOption.UPGRADE.getName().equalsIgnoreCase(cmd)) {
  841.         startOpt = StartupOption.UPGRADE;
  842.       } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
  843.         startOpt = StartupOption.ROLLBACK;
  844.       } else if (StartupOption.FINALIZE.getName().equalsIgnoreCase(cmd)) {
  845.         startOpt = StartupOption.FINALIZE;
  846.       } else if (StartupOption.IMPORT.getName().equalsIgnoreCase(cmd)) {
  847.         startOpt = StartupOption.IMPORT;
  848.       } else
  849.         return null;
  850.     }
  851.     return startOpt;
  852.   }
  853.   private static void setStartupOption(Configuration conf, StartupOption opt) {
  854.     conf.set("dfs.namenode.startup", opt.toString());
  855.   }
  856.   static StartupOption getStartupOption(Configuration conf) {
  857.     return StartupOption.valueOf(conf.get("dfs.namenode.startup",
  858.                                           StartupOption.REGULAR.toString()));
  859.   }
  860.   public static NameNode createNameNode(String argv[], 
  861.                                  Configuration conf) throws IOException {
  862.     if (conf == null)
  863.       conf = new Configuration();
  864.     StartupOption startOpt = parseArguments(argv);
  865.     if (startOpt == null) {
  866.       printUsage();
  867.       return null;
  868.     }
  869.     setStartupOption(conf, startOpt);
  870.     switch (startOpt) {
  871.       case FORMAT:
  872.         boolean aborted = format(conf, true);
  873.         System.exit(aborted ? 1 : 0);
  874.       case FINALIZE:
  875.         aborted = finalize(conf, true);
  876.         System.exit(aborted ? 1 : 0);
  877.       default:
  878.     }
  879.     NameNode namenode = new NameNode(conf);
  880.     return namenode;
  881.   }
  882.     
  883.   /**
  884.    */
  885.   public static void main(String argv[]) throws Exception {
  886.     try {
  887.       StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
  888.       NameNode namenode = createNameNode(argv, null);
  889.       if (namenode != null)
  890.         namenode.join();
  891.     } catch (Throwable e) {
  892.       LOG.error(StringUtils.stringifyException(e));
  893.       System.exit(-1);
  894.     }
  895.   }
  896. }