hdfs.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """define Hdfs as subclass of Service"""
  15. # -*- python -*-
  16. import os
  17. from service import *
  18. from hodlib.Hod.nodePool import *
  19. from hodlib.Common.desc import CommandDesc
  20. from hodlib.Common.util import get_exception_string, parseEquals
  21. class HdfsExternal(MasterSlave):
  22.   """dummy proxy to external HDFS instance"""
  23.   def __init__(self, serviceDesc, workDirs, version):
  24.     MasterSlave.__init__(self, serviceDesc, workDirs,None)
  25.     self.launchedMaster = True
  26.     self.masterInitialized = True
  27.     self.version = version
  28.     
  29.   def getMasterRequest(self):
  30.     return None
  31.   def getMasterCommands(self, serviceDict):
  32.     return []
  33.   def getAdminCommands(self, serviceDict):
  34.     return []
  35.   def getWorkerCommands(self, serviceDict):
  36.     return []
  37.   def getMasterAddrs(self):
  38.     attrs = self.serviceDesc.getfinalAttrs()
  39.     addr = attrs['fs.default.name']
  40.     return [addr]
  41.   
  42.   def setMasterParams(self, dict):
  43.    self.serviceDesc.dict['final-attrs']['fs.default.name'] = "%s:%s" % 
  44.      (dict['host'], dict['fs_port'])
  45.    if self.version < 16:
  46.     self.serviceDesc.dict['final-attrs']['dfs.info.port'] = 
  47.                                     str(self.serviceDesc.dict['info_port'])
  48.    else:
  49.      # After Hadoop-2185
  50.      self.serviceDesc.dict['final-attrs']['dfs.http.address'] = "%s:%s" % 
  51.        (dict['host'], dict['info_port'])
  52.   def getInfoAddrs(self):
  53.     attrs = self.serviceDesc.getfinalAttrs()
  54.     if self.version < 16:
  55.       addr = attrs['fs.default.name']
  56.       k,v = addr.split( ":")
  57.       infoaddr = k + ':' + attrs['dfs.info.port']
  58.     else:
  59.       # After Hadoop-2185
  60.       infoaddr = attrs['dfs.http.address']
  61.     return [infoaddr]
  62. class Hdfs(MasterSlave):
  63.   def __init__(self, serviceDesc, nodePool, required_node, version, 
  64.                                         format=True, upgrade=False,
  65.                                         workers_per_ring = 1):
  66.     MasterSlave.__init__(self, serviceDesc, nodePool, required_node)
  67.     self.masterNode = None
  68.     self.masterAddr = None
  69.     self.runAdminCommands = True
  70.     self.infoAddr = None
  71.     self._isLost = False
  72.     self.format = format
  73.     self.upgrade = upgrade
  74.     self.workers = []
  75.     self.version = version
  76.     self.workers_per_ring = workers_per_ring
  77.   def getMasterRequest(self):
  78.     req = NodeRequest(1, [], False)
  79.     return req
  80.   def getMasterCommands(self, serviceDict):
  81.     masterCommands = []
  82.     if self.format:
  83.       masterCommands.append(self._getNameNodeCommand(True))
  84.     if self.upgrade:
  85.       masterCommands.append(self._getNameNodeCommand(False, True))
  86.     else:
  87.       masterCommands.append(self._getNameNodeCommand(False))
  88.     return masterCommands
  89.   def getAdminCommands(self, serviceDict):
  90.     adminCommands = []
  91.     if self.upgrade and self.runAdminCommands:
  92.       adminCommands.append(self._getNameNodeAdminCommand('-safemode wait'))
  93.       adminCommands.append(self._getNameNodeAdminCommand('-finalizeUpgrade',
  94.                                                           True, True))
  95.     self.runAdminCommands = False
  96.     return adminCommands
  97.   def getWorkerCommands(self, serviceDict):
  98.     workerCmds = []
  99.     for id in range(1, self.workers_per_ring + 1):
  100.       workerCmds.append(self._getDataNodeCommand(str(id)))
  101.     return workerCmds
  102.   def setMasterNodes(self, list):
  103.     node = list[0]
  104.     self.masterNode = node
  105.     
  106.   def getMasterAddrs(self):
  107.     return [self.masterAddr]
  108.   def getInfoAddrs(self):
  109.     return [self.infoAddr]
  110.   def getWorkers(self):
  111.     return self.workers
  112.   def setMasterParams(self, list):
  113.     dict = self._parseEquals(list)
  114.     self.masterAddr = dict['fs.default.name']
  115.     k,v = self.masterAddr.split( ":")
  116.     self.masterNode = k
  117.     if self.version < 16:
  118.       self.infoAddr = self.masterNode + ':' + dict['dfs.info.port']
  119.     else:
  120.       # After Hadoop-2185
  121.       self.infoAddr = dict['dfs.http.address']
  122.    
  123.   def _parseEquals(self, list):
  124.     return parseEquals(list)
  125.   
  126.   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
  127.     namedir = None
  128.     hadooptmpdir = None
  129.     datadir = []
  130.     for p in parentDirs:
  131.       workDirs.append(p)
  132.       workDirs.append(os.path.join(p, subDir))
  133.       dir = os.path.join(p, subDir, 'dfs-data')
  134.       datadir.append(dir)
  135.       if not hadooptmpdir:
  136.         # Not used currently, generating hadooptmpdir just in case
  137.         hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
  138.       if not namedir:
  139.         namedir = os.path.join(p, subDir, 'dfs-name')
  140.     workDirs.append(namedir)
  141.     workDirs.extend(datadir)
  142.     # FIXME!! use csv
  143.     attrs['dfs.name.dir'] = namedir
  144.     attrs['hadoop.tmp.dir'] = hadooptmpdir
  145.     attrs['dfs.data.dir'] = ','.join(datadir)
  146.     envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
  147.   def _getNameNodeCommand(self, format=False, upgrade=False):
  148.     sd = self.serviceDesc
  149.     parentDirs = self.workDirs
  150.     workDirs = []
  151.     attrs = sd.getfinalAttrs().copy()
  152.     envs = sd.getEnvs().copy()
  153.     
  154.     if 'fs.default.name' not in attrs:
  155.       attrs['fs.default.name'] = 'fillinhostport'
  156.  
  157.     if self.version < 16:
  158.      if 'dfs.info.port' not in attrs:
  159.       attrs['dfs.info.port'] = 'fillinport'
  160.     else:
  161.       # Addressing Hadoop-2185, added the following. Earlier versions don't
  162.       # care about this
  163.       if 'dfs.http.address' not in attrs:
  164.         attrs['dfs.http.address'] = 'fillinhostport'
  165.     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')
  166.     dict = { 'name' : 'namenode' }
  167.     dict['program'] = os.path.join('bin', 'hadoop')
  168.     argv = ['namenode']
  169.     if format:
  170.       argv.append('-format')
  171.     elif upgrade:
  172.       argv.append('-upgrade')
  173.     dict['argv'] = argv
  174.     dict['envs'] = envs
  175.     dict['pkgdirs'] = sd.getPkgDirs()
  176.     dict['workdirs'] = workDirs
  177.     dict['final-attrs'] = attrs
  178.     dict['attrs'] = sd.getAttrs()
  179.     if format:
  180.       dict['fg'] = 'true'
  181.       dict['stdin'] = 'Y'
  182.     cmd = CommandDesc(dict)
  183.     return cmd
  184.   def _getNameNodeAdminCommand(self, adminCommand, wait=True, ignoreFailures=False):
  185.     sd = self.serviceDesc
  186.     parentDirs = self.workDirs
  187.     workDirs = []
  188.     attrs = sd.getfinalAttrs().copy()
  189.     envs = sd.getEnvs().copy()
  190.     nn = self.masterAddr
  191.     if nn == None:
  192.       raise ValueError, "Can't get namenode address"
  193.     attrs['fs.default.name'] = nn
  194.     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-nn')
  195.     dict = { 'name' : 'dfsadmin' }
  196.     dict['program'] = os.path.join('bin', 'hadoop')
  197.     argv = ['dfsadmin']
  198.     argv.append(adminCommand)
  199.     dict['argv'] = argv
  200.     dict['envs'] = envs
  201.     dict['pkgdirs'] = sd.getPkgDirs()
  202.     dict['workdirs'] = workDirs
  203.     dict['final-attrs'] = attrs
  204.     dict['attrs'] = sd.getAttrs()
  205.     if wait:
  206.       dict['fg'] = 'true'
  207.       dict['stdin'] = 'Y'
  208.     if ignoreFailures:
  209.       dict['ignorefailures'] = 'Y'
  210.     cmd = CommandDesc(dict)
  211.     return cmd
  212.  
  213.   def _getDataNodeCommand(self, id):
  214.     sd = self.serviceDesc
  215.     parentDirs = self.workDirs
  216.     workDirs = []
  217.     attrs = sd.getfinalAttrs().copy()
  218.     envs = sd.getEnvs().copy()
  219.     nn = self.masterAddr
  220.     if nn == None:
  221.       raise ValueError, "Can't get namenode address"
  222.     attrs['fs.default.name'] = nn
  223.     if self.version < 16:
  224.       if 'dfs.datanode.port' not in attrs:
  225.         attrs['dfs.datanode.port'] = 'fillinport'
  226.       if 'dfs.datanode.info.port' not in attrs:
  227.         attrs['dfs.datanode.info.port'] = 'fillinport'
  228.     else:
  229.       # Adding the following. Hadoop-2185
  230.       if 'dfs.datanode.address' not in attrs:
  231.         attrs['dfs.datanode.address'] = 'fillinhostport'
  232.       if 'dfs.datanode.http.address' not in attrs:
  233.         attrs['dfs.datanode.http.address'] = 'fillinhostport'
  234.     
  235.     if self.version >= 18:
  236.       # After HADOOP-3283
  237.       # TODO: check for major as well as minor versions
  238.       attrs['dfs.datanode.ipc.address'] = 'fillinhostport'
  239.                     
  240.     # unique workdirs in case of multiple datanodes per hodring
  241.     pd = []
  242.     for dir in parentDirs:
  243.       dir = dir + "-" + id
  244.       pd.append(dir)
  245.     parentDirs = pd
  246.     # end of unique workdirs
  247.     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'hdfs-dn')
  248.     dict = { 'name' : 'datanode' }
  249.     dict['program'] = os.path.join('bin', 'hadoop')
  250.     dict['argv'] = ['datanode']
  251.     dict['envs'] = envs
  252.     dict['pkgdirs'] = sd.getPkgDirs()
  253.     dict['workdirs'] = workDirs
  254.     dict['final-attrs'] = attrs
  255.     dict['attrs'] = sd.getAttrs()
  256.  
  257.     cmd = CommandDesc(dict)
  258.     return cmd