mapred.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. """define MapReduce as subclass of Service"""
  15. # -*- python -*-
  16. import os, copy, time
  17. from service import *
  18. from hodlib.Hod.nodePool import *
  19. from hodlib.Common.desc import CommandDesc
  20. from hodlib.Common.util import get_exception_string, parseEquals
  21. class MapReduceExternal(MasterSlave):
  22.   """dummy proxy to external MapReduce instance"""
  23.   def __init__(self, serviceDesc, workDirs, version):
  24.     MasterSlave.__init__(self, serviceDesc, workDirs,None)
  25.     self.launchedMaster = True
  26.     self.masterInitialized = True
  27.     self.version = version
  28.     
  29.   def getMasterRequest(self):
  30.     return None
  31.   def getMasterCommands(self, serviceDict):
  32.     return []
  33.   def getAdminCommands(self, serviceDict):
  34.     return []
  35.   def getWorkerCommands(self, serviceDict):
  36.     return []
  37.   def getMasterAddrs(self):
  38.     attrs = self.serviceDesc.getfinalAttrs()
  39.     addr = attrs['mapred.job.tracker']
  40.     return [addr]
  41.   def needsMore(self):
  42.     return 0
  43.   def needsLess(self):
  44.     return 0
  45.   def setMasterParams(self, dict):
  46.     self.serviceDesc['final-attrs']['mapred.job.tracker'] = "%s:%s" % (dict['host'], 
  47.       dict['tracker_port'])
  48.     
  49.     if self.version < 16:
  50.       self.serviceDesc.dict['final-attrs']['mapred.job.tracker.info.port'] = 
  51.                                       str(self.serviceDesc.dict['info_port'])
  52.     else:
  53.       # After Hadoop-2185
  54.       self.serviceDesc['final-attrs']['mapred.job.tracker.http.address'] = 
  55.         "%s:%s" %(dict['host'], dict['info_port'])
  56.   def getInfoAddrs(self):
  57.     attrs = self.serviceDesc.getfinalAttrs()
  58.     if self.version < 16:
  59.       addr = attrs['mapred.job.tracker']
  60.       k,v = addr.split( ":")
  61.       infoaddr = k + ':' + attrs['mapred.job.tracker.info.port']
  62.     else:
  63.       # After Hadoop-2185
  64.       # Note: earlier,we never respected mapred.job.tracker.http.address
  65.       infoaddr = attrs['mapred.job.tracker.http.address']
  66.     return [infoaddr]
  67.   
  68. class MapReduce(MasterSlave):
  69.   def __init__(self, serviceDesc, workDirs,required_node, version,
  70.                 workers_per_ring = 1):
  71.     MasterSlave.__init__(self, serviceDesc, workDirs,required_node)
  72.     self.masterNode = None
  73.     self.masterAddr = None
  74.     self.infoAddr = None
  75.     self.workers = []
  76.     self.required_node = required_node
  77.     self.version = version
  78.     self.workers_per_ring = workers_per_ring
  79.   def isLaunchable(self, serviceDict):
  80.     hdfs = serviceDict['hdfs']
  81.     if (hdfs.isMasterInitialized()):
  82.       return True
  83.     return False
  84.   
  85.   def getMasterRequest(self):
  86.     req = NodeRequest(1, [], False)
  87.     return req
  88.   def getMasterCommands(self, serviceDict):
  89.     hdfs = serviceDict['hdfs']
  90.     cmdDesc = self._getJobTrackerCommand(hdfs)
  91.     return [cmdDesc]
  92.   def getAdminCommands(self, serviceDict):
  93.     return []
  94.   def getWorkerCommands(self, serviceDict):
  95.     hdfs = serviceDict['hdfs']
  96.     workerCmds = []
  97.     for id in range(1, self.workers_per_ring + 1):
  98.       workerCmds.append(self._getTaskTrackerCommand(str(id), hdfs))
  99.       
  100.     return workerCmds
  101.   def setMasterNodes(self, list):
  102.     node = list[0]
  103.     self.masterNode = node
  104.   def getMasterAddrs(self):
  105.     return [self.masterAddr]
  106.   def getInfoAddrs(self):
  107.     return [self.infoAddr]
  108.   def getWorkers(self):
  109.     return self.workers
  110.   def requiredNode(self):
  111.     return self.required_host
  112.   def setMasterParams(self, list):
  113.     dict = self._parseEquals(list)
  114.     self.masterAddr = dict['mapred.job.tracker']
  115.     k,v = self.masterAddr.split(":")
  116.     self.masterNode = k
  117.     if self.version < 16:
  118.       self.infoAddr = self.masterNode + ':' + dict['mapred.job.tracker.info.port']
  119.     else:
  120.       # After Hadoop-2185
  121.       self.infoAddr = dict['mapred.job.tracker.http.address']
  122.   
  123.   def _parseEquals(self, list):
  124.     return parseEquals(list)
  125.   def _setWorkDirs(self, workDirs, envs, attrs, parentDirs, subDir):
  126.     local = []
  127.     system = None
  128.     temp = None
  129.     hadooptmpdir = None
  130.     dfsclient = []
  131.     
  132.     for p in parentDirs:
  133.       workDirs.append(p)
  134.       workDirs.append(os.path.join(p, subDir))
  135.       dir = os.path.join(p, subDir, 'mapred-local')
  136.       local.append(dir)
  137.       if not system:
  138.         system = os.path.join(p, subDir, 'mapred-system')
  139.       if not temp:
  140.         temp = os.path.join(p, subDir, 'mapred-temp')
  141.       if not hadooptmpdir:
  142.         # Not used currently, generating hadooptmpdir just in case
  143.         hadooptmpdir = os.path.join(p, subDir, 'hadoop-tmp')
  144.       dfsclientdir = os.path.join(p, subDir, 'dfs-client')
  145.       dfsclient.append(dfsclientdir)
  146.       workDirs.append(dfsclientdir)
  147.     # FIXME!! use csv
  148.     attrs['mapred.local.dir'] = ','.join(local)
  149.     attrs['mapred.system.dir'] = 'fillindir'
  150.     attrs['mapred.temp.dir'] = temp
  151.     attrs['hadoop.tmp.dir'] = hadooptmpdir
  152.     envs['HADOOP_ROOT_LOGGER'] = "INFO,DRFA"
  153.   def _getJobTrackerCommand(self, hdfs):
  154.     sd = self.serviceDesc
  155.     parentDirs = self.workDirs
  156.     workDirs = []
  157.     attrs = sd.getfinalAttrs().copy()
  158.     envs = sd.getEnvs().copy()
  159.     if 'mapred.job.tracker' not in attrs:
  160.       attrs['mapred.job.tracker'] = 'fillinhostport'
  161.     if self.version < 16:
  162.       if 'mapred.job.tracker.info.port' not in attrs:
  163.         attrs['mapred.job.tracker.info.port'] = 'fillinport'
  164.     else:
  165.       # Addressing Hadoop-2185,
  166.       if 'mapred.job.tracker.http.address' not in attrs:
  167.         attrs['mapred.job.tracker.http.address'] = 'fillinhostport'
  168.     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
  169.     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-jt')
  170.     dict = { 'name' : 'jobtracker' }
  171.     dict['version'] = self.version
  172.     dict['program'] = os.path.join('bin', 'hadoop')
  173.     dict['argv'] = ['jobtracker']
  174.     dict['envs'] = envs
  175.     dict['pkgdirs'] = sd.getPkgDirs()
  176.     dict['workdirs'] = workDirs
  177.     dict['final-attrs'] = attrs
  178.     dict['attrs'] = sd.getAttrs()
  179.     cmd = CommandDesc(dict)
  180.     return cmd
  181.   def _getTaskTrackerCommand(self, id, hdfs):
  182.     sd = self.serviceDesc
  183.     parentDirs = self.workDirs
  184.     workDirs = []
  185.     attrs = sd.getfinalAttrs().copy()
  186.     envs = sd.getEnvs().copy()
  187.     jt = self.masterAddr
  188.     if jt == None:
  189.       raise ValueError, "Can't get job tracker address"
  190.     attrs['mapred.job.tracker'] = jt
  191.     attrs['fs.default.name'] = hdfs.getMasterAddrs()[0]
  192.     if self.version < 16:
  193.       if 'tasktracker.http.port' not in attrs:
  194.         attrs['tasktracker.http.port'] = 'fillinport'
  195.       # earlier to 16, tasktrackers always took ephemeral port 0 for
  196.       # tasktracker.report.bindAddress
  197.     else:
  198.       # Adding the following. Hadoop-2185
  199.       if 'mapred.task.tracker.report.address' not in attrs:
  200.         attrs['mapred.task.tracker.report.address'] = 'fillinhostport'
  201.       if 'mapred.task.tracker.http.address' not in attrs:
  202.         attrs['mapred.task.tracker.http.address'] = 'fillinhostport'
  203.     # unique parentDirs in case of multiple tasktrackers per hodring
  204.     pd = []
  205.     for dir in parentDirs:
  206.       dir = dir + "-" + id
  207.       pd.append(dir)
  208.     parentDirs = pd
  209.     # end of unique workdirs
  210.     self._setWorkDirs(workDirs, envs, attrs, parentDirs, 'mapred-tt')
  211.     dict = { 'name' : 'tasktracker' }
  212.     dict['program'] = os.path.join('bin', 'hadoop')
  213.     dict['argv'] = ['tasktracker']
  214.     dict['envs'] = envs
  215.     dict['pkgdirs'] = sd.getPkgDirs()
  216.     dict['workdirs'] = workDirs
  217.     dict['final-attrs'] = attrs
  218.     dict['attrs'] = sd.getAttrs()
  219.     cmd = CommandDesc(dict)
  220.     return cmd