hod.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:29k
源码类别:

网格计算

开发平台:

Java

  1. #Licensed to the Apache Software Foundation (ASF) under one
  2. #or more contributor license agreements.  See the NOTICE file
  3. #distributed with this work for additional information
  4. #regarding copyright ownership.  The ASF licenses this file
  5. #to you under the Apache License, Version 2.0 (the
  6. #"License"); you may not use this file except in compliance
  7. #with the License.  You may obtain a copy of the License at
  8. #     http://www.apache.org/licenses/LICENSE-2.0
  9. #Unless required by applicable law or agreed to in writing, software
  10. #distributed under the License is distributed on an "AS IS" BASIS,
  11. #WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. #See the License for the specific language governing permissions and
  13. #limitations under the License.
  14. # -*- python -*-
  15. import sys, os, getpass, pprint, re, cPickle, random, shutil, time, errno
  16. import hodlib.Common.logger
  17. from hodlib.ServiceRegistry.serviceRegistry import svcrgy
  18. from hodlib.Common.xmlrpc import hodXRClient
  19. from hodlib.Common.util import to_http_url, get_exception_string
  20. from hodlib.Common.util import get_exception_error_string
  21. from hodlib.Common.util import hodInterrupt, HodInterruptException
  22. from hodlib.Common.util import HOD_INTERRUPTED_CODE
  23. from hodlib.Common.nodepoolutil import NodePoolUtil
  24. from hodlib.Hod.hadoop import hadoopCluster, hadoopScript
  25. CLUSTER_DATA_FILE = 'clusters'
  26. INVALID_STATE_FILE_MSGS = 
  27.               [
  28.                 "Requested operation cannot be performed. Cannot read %s: " + 
  29.                 "Permission denied.",
  30.                 "Requested operation cannot be performed. " + 
  31.                 "Cannot write to %s: Permission denied.",
  32.                 "Requested operation cannot be performed. " + 
  33.                 "Cannot read/write to %s: Permission denied.",
  34.                 "Cannot update %s: Permission denied. " + 
  35.                 "Cluster is deallocated, but info and list " + 
  36.                 "operations might show incorrect information.",
  37.               ]
  38. class hodState:
  39.   def __init__(self, store):
  40.     self.__store = store
  41.     self.__stateFile = None
  42.     self.__init_store()
  43.     self.__STORE_EXT = ".state"
  44.    
  45.   def __init_store(self):
  46.     if not os.path.exists(self.__store):
  47.       os.mkdir(self.__store)
  48.   
  49.   def __set_state_file(self, id=None):
  50.     if id:
  51.       self.__stateFile = os.path.join(self.__store, "%s%s" % (id, 
  52.                                       self.__STORE_EXT))
  53.     else:
  54.       for item in os.listdir(self.__store):
  55.         if item.endswith(self.__STORE_EXT):  
  56.           self.__stateFile = os.path.join(self.__store, item)          
  57.   def get_state_file(self):
  58.     return self.__stateFile
  59.           
  60.   def checkStateFile(self, id=None, modes=(os.R_OK,)):
  61.     # is state file exists/readable/writable/both?
  62.     self.__set_state_file(id)
  63.     # return true if file doesn't exist, because HOD CAN create
  64.     # state file and so WILL have permissions to read and/or write
  65.     try:
  66.       os.stat(self.__stateFile)
  67.     except OSError, err:
  68.       if err.errno == errno.ENOENT: # error 2 (no such file)
  69.         return True
  70.     # file exists
  71.     ret = True
  72.     for mode in modes:
  73.       ret = ret and os.access(self.__stateFile, mode)
  74.     return ret
  75.   def read(self, id=None):
  76.     info = {}
  77.     
  78.     self.__set_state_file(id)
  79.   
  80.     if self.__stateFile:
  81.       if os.path.isfile(self.__stateFile):
  82.         stateFile = open(self.__stateFile, 'r')
  83.         try:
  84.           info = cPickle.load(stateFile)
  85.         except EOFError:
  86.           pass
  87.         
  88.         stateFile.close()
  89.     
  90.     return info
  91.            
  92.   def write(self, id, info):
  93.     self.__set_state_file(id)
  94.     if not os.path.exists(self.__stateFile):
  95.       self.clear(id)
  96.  
  97.     stateFile = open(self.__stateFile, 'w')
  98.     cPickle.dump(info, stateFile)
  99.     stateFile.close()
  100.   
  101.   def clear(self, id=None):
  102.     self.__set_state_file(id)
  103.     if self.__stateFile and os.path.exists(self.__stateFile):
  104.       os.remove(self.__stateFile)
  105.     else:
  106.       for item in os.listdir(self.__store):
  107.         if item.endswith(self.__STORE_EXT):
  108.           os.remove(item)
  109.         
  110. class hodRunner:
  111.   def __init__(self, cfg, log=None, cluster=None):
  112.     self.__hodhelp = hodHelp()
  113.     self.__ops = self.__hodhelp.ops
  114.     self.__cfg = cfg  
  115.     self.__npd = self.__cfg['nodepooldesc']
  116.     self.__opCode = 0
  117.     self.__user = getpass.getuser()
  118.     self.__registry = None
  119.     self.__baseLogger = None
  120.     # Allowing to pass in log object to help testing - a stub can be passed in
  121.     if log is None:
  122.       self.__setup_logger()
  123.     else:
  124.       self.__log = log
  125.     
  126.     self.__userState = hodState(self.__cfg['hod']['user_state']) 
  127.     
  128.     self.__clusterState = None
  129.     self.__clusterStateInfo = { 'env' : None, 'hdfs' : None, 'mapred' : None }
  130.     
  131.     # Allowing to pass in log object to help testing - a stib can be passed in
  132.     if cluster is None:
  133.       self.__cluster = hadoopCluster(self.__cfg, self.__log)
  134.     else:
  135.       self.__cluster = cluster
  136.   
  137.   def __setup_logger(self):
  138.     self.__baseLogger = hodlib.Common.logger.hodLog('hod')
  139.     self.__log = self.__baseLogger.add_logger(self.__user )
  140.  
  141.     if self.__cfg['hod']['stream']:
  142.       self.__baseLogger.add_stream(level=self.__cfg['hod']['debug'], 
  143.                             addToLoggerNames=(self.__user ,))
  144.   
  145.     if self.__cfg['hod'].has_key('syslog-address'):
  146.       self.__baseLogger.add_syslog(self.__cfg['hod']['syslog-address'], 
  147.                                    level=self.__cfg['hod']['debug'], 
  148.                                    addToLoggerNames=(self.__user ,))
  149.   def get_logger(self):
  150.     return self.__log
  151.   def __setup_cluster_logger(self, directory):
  152.     self.__baseLogger.add_file(logDirectory=directory, level=4,
  153.                           backupCount=self.__cfg['hod']['log-rollover-count'],
  154.                           addToLoggerNames=(self.__user ,))
  155.   def __setup_cluster_state(self, directory):
  156.     self.__clusterState = hodState(directory)
  157.   def __norm_cluster_dir(self, directory):
  158.     directory = os.path.expanduser(directory)
  159.     if not os.path.isabs(directory):
  160.       directory = os.path.join(self.__cfg['hod']['original-dir'], directory)
  161.     directory = os.path.abspath(directory)
  162.     
  163.     return directory
  164.   
  165.   def __setup_service_registry(self):
  166.     cfg = self.__cfg['hod'].copy()
  167.     cfg['debug'] = 0
  168.     self.__registry = svcrgy(cfg, self.__log)
  169.     self.__registry.start()
  170.     self.__log.debug(self.__registry.getXMLRPCAddr())
  171.     self.__cfg['hod']['xrs-address'] = self.__registry.getXMLRPCAddr()
  172.     self.__cfg['ringmaster']['svcrgy-addr'] = self.__cfg['hod']['xrs-address']
  173.   def __set_cluster_state_info(self, env, hdfs, mapred, ring, jobid, min, max):
  174.     self.__clusterStateInfo['env'] = env
  175.     self.__clusterStateInfo['hdfs'] = "http://%s" % hdfs
  176.     self.__clusterStateInfo['mapred'] = "http://%s" % mapred
  177.     self.__clusterStateInfo['ring'] = ring
  178.     self.__clusterStateInfo['jobid'] = jobid
  179.     self.__clusterStateInfo['min'] = min
  180.     self.__clusterStateInfo['max'] = max
  181.     
  182.   def __set_user_state_info(self, info):
  183.     userState = self.__userState.read(CLUSTER_DATA_FILE)
  184.     for key in info.keys():
  185.       userState[key] = info[key]
  186.       
  187.     self.__userState.write(CLUSTER_DATA_FILE, userState)  
  188.   def __remove_cluster(self, clusterDir):
  189.     clusterInfo = self.__userState.read(CLUSTER_DATA_FILE)
  190.     if clusterDir in clusterInfo:
  191.       del(clusterInfo[clusterDir])
  192.       self.__userState.write(CLUSTER_DATA_FILE, clusterInfo)
  193.       
  194.   def __cleanup(self):
  195.     if self.__registry: self.__registry.stop()
  196.     
  197.   def __check_operation(self, operation):    
  198.     opList = operation.split()
  199.     
  200.     if not opList[0] in self.__ops:
  201.       self.__log.critical("Invalid hod operation specified: %s" % operation)
  202.       self._op_help(None)
  203.       self.__opCode = 2
  204.          
  205.     return opList 
  206.   
  207.   def __adjustMasterFailureCountConfig(self, nodeCount):
  208.     # This method adjusts the ringmaster.max-master-failures variable
  209.     # to a value that is bounded by the a function of the number of
  210.     # nodes.
  211.     maxFailures = self.__cfg['ringmaster']['max-master-failures']
  212.     # Count number of masters required - depends on which services
  213.     # are external
  214.     masters = 0
  215.     if not self.__cfg['gridservice-hdfs']['external']:
  216.       masters += 1
  217.     if not self.__cfg['gridservice-mapred']['external']:
  218.       masters += 1
  219.     # So, if there are n nodes and m masters, we look atleast for
  220.     # all masters to come up. Therefore, atleast m nodes should be
  221.     # good, which means a maximum of n-m master nodes can fail.
  222.     maxFailedNodes = nodeCount - masters
  223.     # The configured max number of failures is now bounded by this
  224.     # number.
  225.     self.__cfg['ringmaster']['max-master-failures'] = 
  226.                               min(maxFailures, maxFailedNodes)
  227.   def _op_allocate(self, args):
  228.     operation = "allocate"
  229.     argLength = len(args)
  230.     min = 0
  231.     max = 0
  232.     errorFlag = False
  233.     errorMsgs = []
  234.     if argLength == 3:
  235.       nodes = args[2]
  236.       clusterDir = self.__norm_cluster_dir(args[1])
  237.       if not os.path.exists(clusterDir):
  238.         try:
  239.           os.makedirs(clusterDir)
  240.         except OSError, err:
  241.           errorFlag = True
  242.           errorMsgs.append("Could not create cluster directory. %s" 
  243.                             % (str(err)))
  244.       elif not os.path.isdir(clusterDir):
  245.         errorFlag = True
  246.         errorMsgs.append( 
  247.                     "Invalid cluster directory (--hod.clusterdir or -d) : " + 
  248.                          clusterDir + " : Not a directory")
  249.         
  250.       if int(nodes) < 3 :
  251.         errorFlag = True
  252.         errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + 
  253.                          "Must be >= 3. Given nodes: %s" % nodes)
  254.       if errorFlag:
  255.         for msg in errorMsgs:
  256.           self.__log.critical(msg)
  257.         self.__opCode = 3
  258.         return
  259.       if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, 
  260.                                               (os.R_OK, os.W_OK)):
  261.         self.__log.critical(INVALID_STATE_FILE_MSGS[2] % 
  262.                          self.__userState.get_state_file())
  263.         self.__opCode = 1
  264.         return
  265.       clusterList = self.__userState.read(CLUSTER_DATA_FILE)
  266.       if clusterDir in clusterList.keys():
  267.         self.__setup_cluster_state(clusterDir)
  268.         clusterInfo = self.__clusterState.read()
  269.         # Check if the job is not running. Only then can we safely
  270.         # allocate another cluster. Otherwise the user would need
  271.         # to deallocate and free up resources himself.
  272.         if clusterInfo.has_key('jobid') and 
  273.             self.__cluster.is_cluster_deallocated(clusterInfo['jobid']):
  274.           self.__log.warn("Found a dead cluster at cluster directory '%s'. Deallocating it to allocate a new one." % (clusterDir))
  275.           self.__remove_cluster(clusterDir)
  276.           self.__clusterState.clear()
  277.         else:
  278.           self.__log.critical("Found a previously allocated cluster at cluster directory '%s'. HOD cannot determine if this cluster can be automatically deallocated. Deallocate the cluster if it is unused." % (clusterDir))
  279.           self.__opCode = 12
  280.           return
  281.  
  282.       self.__setup_cluster_logger(clusterDir)
  283.       (status, message) = self.__cluster.is_valid_account()
  284.       if status is not 0:
  285.         if message:
  286.           for line in message:
  287.             self.__log.critical("verify-account output: %s" % line)
  288.         self.__log.critical("Cluster cannot be allocated because account verification failed. " 
  289.                               + "verify-account returned exit code: %s." % status)
  290.         self.__opCode = 4
  291.         return
  292.       else:
  293.         self.__log.debug("verify-account returned zero exit code.")
  294.         if message:
  295.           self.__log.debug("verify-account output: %s" % message)
  296.       if re.match('d+-d+', nodes):
  297.         (min, max) = nodes.split("-")
  298.         min = int(min)
  299.         max = int(max)
  300.       else:
  301.         try:
  302.           nodes = int(nodes)
  303.           min = nodes
  304.           max = nodes
  305.         except ValueError:
  306.           print self.__hodhelp.help(operation)
  307.           self.__log.critical(
  308.           "%s operation requires a pos_int value for n(nodecount)." % 
  309.           operation)
  310.           self.__opCode = 3
  311.         else:
  312.           self.__setup_cluster_state(clusterDir)
  313.           clusterInfo = self.__clusterState.read()
  314.           self.__opCode = self.__cluster.check_cluster(clusterInfo)
  315.           if self.__opCode == 0 or self.__opCode == 15:
  316.             self.__setup_service_registry()   
  317.             if hodInterrupt.isSet(): 
  318.               self.__cleanup()
  319.               raise HodInterruptException()
  320.             self.__log.debug("Service Registry started.")
  321.             self.__adjustMasterFailureCountConfig(nodes)
  322.             
  323.             try:
  324.               allocateStatus = self.__cluster.allocate(clusterDir, min, max)    
  325.             except HodInterruptException, h:
  326.               self.__cleanup()
  327.               raise h
  328.             # Allocation has gone through.
  329.             # Don't care about interrupts any more
  330.             try:
  331.               if allocateStatus == 0:
  332.                 self.__set_cluster_state_info(os.environ, 
  333.                                               self.__cluster.hdfsInfo, 
  334.                                               self.__cluster.mapredInfo, 
  335.                                               self.__cluster.ringmasterXRS,
  336.                                               self.__cluster.jobId,
  337.                                               min, max)
  338.                 self.__setup_cluster_state(clusterDir)
  339.                 self.__clusterState.write(self.__cluster.jobId, 
  340.                                           self.__clusterStateInfo)
  341.                 #  Do we need to check for interrupts here ??
  342.   
  343.                 self.__set_user_state_info( 
  344.                   { clusterDir : self.__cluster.jobId, } )
  345.               self.__opCode = allocateStatus
  346.             except Exception, e:
  347.               # Some unknown problem.
  348.               self.__cleanup()
  349.               self.__cluster.deallocate(clusterDir, self.__clusterStateInfo)
  350.               self.__opCode = 1
  351.               raise Exception(e)
  352.           elif self.__opCode == 12:
  353.             self.__log.critical("Cluster %s already allocated." % clusterDir)
  354.           elif self.__opCode == 10:
  355.             self.__log.critical("deadt%st%s" % (clusterInfo['jobid'], 
  356.                                                   clusterDir))
  357.           elif self.__opCode == 13:
  358.             self.__log.warn("hdfs deadt%st%s" % (clusterInfo['jobid'], 
  359.                                                        clusterDir))
  360.           elif self.__opCode == 14:
  361.             self.__log.warn("mapred deadt%st%s" % (clusterInfo['jobid'], 
  362.                                                      clusterDir))   
  363.           
  364.           if self.__opCode > 0 and self.__opCode != 15:
  365.             self.__log.critical("Cannot allocate cluster %s" % clusterDir)
  366.     else:
  367.       print self.__hodhelp.help(operation)
  368.       self.__log.critical("%s operation requires two arguments. "  % operation
  369.                         + "A cluster directory and a nodecount.")
  370.       self.__opCode = 3
  371.  
  372.   def _is_cluster_allocated(self, clusterDir):
  373.     if os.path.isdir(clusterDir):
  374.       self.__setup_cluster_state(clusterDir)
  375.       clusterInfo = self.__clusterState.read()
  376.       if clusterInfo != {}:
  377.         return True
  378.     return False
  379.   def _op_deallocate(self, args):
  380.     operation = "deallocate"
  381.     argLength = len(args)
  382.     if argLength == 2:
  383.       clusterDir = self.__norm_cluster_dir(args[1])
  384.       if os.path.isdir(clusterDir):
  385.         self.__setup_cluster_state(clusterDir)
  386.         clusterInfo = self.__clusterState.read()
  387.         if clusterInfo == {}:
  388.           self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
  389.         else:
  390.           self.__opCode = 
  391.             self.__cluster.deallocate(clusterDir, clusterInfo)
  392.           # irrespective of whether deallocate failed or not
  393.           # remove the cluster state.
  394.           self.__clusterState.clear()
  395.           if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
  396.             self.__log.critical(INVALID_STATE_FILE_MSGS[3] % 
  397.                                self.__userState.get_state_file())
  398.             self.__opCode = 1
  399.             return
  400.           self.__remove_cluster(clusterDir)
  401.       else:
  402.         self.__handle_invalid_cluster_directory(clusterDir, cleanUp=True)
  403.     else:
  404.       print self.__hodhelp.help(operation)
  405.       self.__log.critical("%s operation requires one argument. "  % operation
  406.                         + "A cluster path.")
  407.       self.__opCode = 3
  408.             
  409.   def _op_list(self, args):
  410.     operation = 'list'
  411.     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
  412.     for path in clusterList.keys():
  413.       if not os.path.isdir(path):
  414.         self.__log.info("cluster state unknownt%st%s" % (clusterList[path], path))
  415.         continue
  416.       self.__setup_cluster_state(path)
  417.       clusterInfo = self.__clusterState.read()
  418.       if clusterInfo == {}:
  419.         # something wrong with the cluster directory.
  420.         self.__log.info("cluster state unknownt%st%s" % (clusterList[path], path))
  421.         continue
  422.       clusterStatus = self.__cluster.check_cluster(clusterInfo)
  423.       if clusterStatus == 12:
  424.         self.__log.info("alivet%st%s" % (clusterList[path], path))
  425.       elif clusterStatus == 10:
  426.         self.__log.info("deadt%st%s" % (clusterList[path], path))
  427.       elif clusterStatus == 13:
  428.         self.__log.info("hdfs deadt%st%s" % (clusterList[path], path))
  429.       elif clusterStatus == 14:
  430.         self.__log.info("mapred deadt%st%s" % (clusterList[path], path))    
  431.          
  432.   def _op_info(self, args):
  433.     operation = 'info'
  434.     argLength = len(args)  
  435.     if argLength == 2:
  436.       clusterDir = self.__norm_cluster_dir(args[1])
  437.       if os.path.isdir(clusterDir):
  438.         self.__setup_cluster_state(clusterDir)
  439.         clusterInfo = self.__clusterState.read()
  440.         if clusterInfo == {}:
  441.           # something wrong with the cluster directory.
  442.           self.__handle_invalid_cluster_directory(clusterDir)
  443.         else:
  444.           clusterStatus = self.__cluster.check_cluster(clusterInfo)
  445.           if clusterStatus == 12:
  446.             self.__print_cluster_info(clusterInfo)
  447.             self.__log.info("hadoop-site.xml at %s" % clusterDir)
  448.           elif clusterStatus == 10:
  449.             self.__log.critical("%s cluster is dead" % clusterDir)
  450.           elif clusterStatus == 13:
  451.             self.__log.warn("%s cluster hdfs is dead" % clusterDir)
  452.           elif clusterStatus == 14:
  453.             self.__log.warn("%s cluster mapred is dead" % clusterDir)
  454.           if clusterStatus != 12:
  455.             if clusterStatus == 15:
  456.               self.__log.critical("Cluster %s not allocated." % clusterDir)
  457.             else:
  458.               self.__print_cluster_info(clusterInfo)
  459.               self.__log.info("hadoop-site.xml at %s" % clusterDir)
  460.             
  461.             self.__opCode = clusterStatus
  462.       else:
  463.         self.__handle_invalid_cluster_directory(clusterDir)
  464.     else:
  465.       print self.__hodhelp.help(operation)
  466.       self.__log.critical("%s operation requires one argument. "  % operation
  467.                         + "A cluster path.")
  468.       self.__opCode = 3      
  469.   def __handle_invalid_cluster_directory(self, clusterDir, cleanUp=False):
  470.     if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
  471.       self.__log.critical(INVALID_STATE_FILE_MSGS[0] % 
  472.                            self.__userState.get_state_file())
  473.       self.__opCode = 1
  474.       return
  475.     clusterList = self.__userState.read(CLUSTER_DATA_FILE)
  476.     if clusterDir in clusterList.keys():
  477.       # previously allocated cluster.
  478.       self.__log.critical("Cannot find information for cluster with id '%s' in previously allocated cluster directory '%s'." % (clusterList[clusterDir], clusterDir))
  479.       if cleanUp:
  480.         self.__cluster.delete_job(clusterList[clusterDir])
  481.         self.__log.critical("Freeing resources allocated to the cluster.")
  482.         if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.W_OK,)):
  483.           self.__log.critical(INVALID_STATE_FILE_MSGS[1] % 
  484.                               self.__userState.get_state_file())
  485.           self.__opCode = 1
  486.           return
  487.         self.__remove_cluster(clusterDir)
  488.       self.__opCode = 3
  489.     else:
  490.       if not os.path.exists(clusterDir):
  491.         self.__log.critical(  
  492.                   "Invalid hod.clusterdir(--hod.clusterdir or -d). " + 
  493.                   clusterDir + " : No such directory")
  494.       elif not os.path.isdir(clusterDir):
  495.         self.__log.critical( 
  496.                   "Invalid hod.clusterdir(--hod.clusterdir or -d). " + 
  497.                   clusterDir + " : Not a directory")
  498.       else:
  499.         self.__log.critical( 
  500.                   "Invalid hod.clusterdir(--hod.clusterdir or -d). " + 
  501.                   clusterDir + " : Not tied to any allocated cluster.")
  502.       self.__opCode = 15
  503.     
  504.   def __print_cluster_info(self, clusterInfo):
  505.     keys = clusterInfo.keys()
  506.     _dict = { 
  507.               'jobid' : 'Cluster Id', 'min' : 'Nodecount',
  508.               'hdfs' : 'HDFS UI at' , 'mapred' : 'Mapred UI at'
  509.             }
  510.     for key in _dict.keys():
  511.       if clusterInfo.has_key(key):
  512.         self.__log.info("%s %s" % (_dict[key], clusterInfo[key]))
  513.     if clusterInfo.has_key('ring'):
  514.       self.__log.debug("%st%s" % ('Ringmaster at ', clusterInfo['ring']))
  515.     
  516.     if self.__cfg['hod']['debug'] == 4:
  517.       for var in clusterInfo['env'].keys():
  518.         self.__log.debug("%s = %s" % (var, clusterInfo['env'][var]))
  519.   def _op_help(self, arg):
  520.     if arg == None or arg.__len__() != 2:
  521.       print "hod commands:n"
  522.       for op in self.__ops:
  523.         print self.__hodhelp.help(op)
  524.     else:
  525.       if arg[1] not in self.__ops:
  526.         print self.__hodhelp.help('help')
  527.         self.__log.critical("Help requested for invalid operation : %s"%arg[1])
  528.         self.__opCode = 3
  529.       else: print self.__hodhelp.help(arg[1])
  530.   def operation(self):  
  531.     operation = self.__cfg['hod']['operation']
  532.     try:
  533.       opList = self.__check_operation(operation)
  534.       if self.__opCode == 0:
  535.         if not self.__userState.checkStateFile(CLUSTER_DATA_FILE, (os.R_OK,)):
  536.            self.__log.critical(INVALID_STATE_FILE_MSGS[0] % 
  537.                          self.__userState.get_state_file())
  538.            self.__opCode = 1
  539.            return self.__opCode
  540.         getattr(self, "_op_%s" % opList[0])(opList)
  541.     except HodInterruptException, h:
  542.       self.__log.critical("op: %s failed because of a process interrupt." 
  543.                                                                 % operation)
  544.       self.__opCode = HOD_INTERRUPTED_CODE
  545.     except:
  546.       self.__log.critical("op: %s failed: %s" % (operation,
  547.                           get_exception_error_string()))
  548.       self.__log.debug(get_exception_string())
  549.     
  550.     self.__cleanup()
  551.     
  552.     self.__log.debug("return code: %s" % self.__opCode)
  553.     
  554.     return self.__opCode
  555.   
  556.   def script(self):
  557.     errorFlag = False
  558.     errorMsgs = []
  559.     scriptRet = 0 # return from the script, if run
  560.     
  561.     script = self.__cfg['hod']['script']
  562.     nodes = self.__cfg['hod']['nodecount']
  563.     clusterDir = self.__cfg['hod']['clusterdir']
  564.     
  565.     if not os.path.exists(script):
  566.       errorFlag = True
  567.       errorMsgs.append("Invalid script file (--hod.script or -s) : " + 
  568.                        script + " : No such file")
  569.     elif not os.path.isfile(script):
  570.       errorFlag = True
  571.       errorMsgs.append("Invalid script file (--hod.script or -s) : " + 
  572.                        script + " : Not a file.")
  573.     else:
  574.       isExecutable = os.access(script, os.X_OK)
  575.       if not isExecutable:
  576.         errorFlag = True
  577.         errorMsgs.append("Invalid script file (--hod.script or -s) : " + 
  578.                          script + " : Not an executable.")
  579.     if not os.path.exists(clusterDir):
  580.       try:
  581.         os.makedirs(clusterDir)
  582.       except OSError, err:
  583.         errorFlag = True
  584.         errorMsgs.append("Could not create cluster directory. %s" % (str(err)))
  585.     elif not os.path.isdir(clusterDir):
  586.       errorFlag = True
  587.       errorMsgs.append( 
  588.                   "Invalid cluster directory (--hod.clusterdir or -d) : " + 
  589.                        clusterDir + " : Not a directory")
  590.     if int(self.__cfg['hod']['nodecount']) < 3 :
  591.       errorFlag = True
  592.       errorMsgs.append("Invalid nodecount (--hod.nodecount or -n) : " + 
  593.                        "Must be >= 3. Given nodes: %s" % nodes)
  594.     if errorFlag:
  595.       for msg in errorMsgs:
  596.         self.__log.critical(msg)
  597.       self.handle_script_exit_code(scriptRet, clusterDir)
  598.       sys.exit(3)
  599.     try:
  600.       self._op_allocate(('allocate', clusterDir, str(nodes)))
  601.       if self.__opCode == 0:
  602.         if self.__cfg['hod'].has_key('script-wait-time'):
  603.           time.sleep(self.__cfg['hod']['script-wait-time'])
  604.           self.__log.debug('Slept for %d time. Now going to run the script' % self.__cfg['hod']['script-wait-time'])
  605.         if hodInterrupt.isSet():
  606.           self.__log.debug('Hod interrupted - not executing script')
  607.         else:
  608.           scriptRunner = hadoopScript(clusterDir, 
  609.                                   self.__cfg['hod']['original-dir'])
  610.           self.__opCode = scriptRunner.run(script)
  611.           scriptRet = self.__opCode
  612.           self.__log.info("Exit code from running the script: %d" % self.__opCode)
  613.       else:
  614.         self.__log.critical("Error %d in allocating the cluster. Cannot run the script." % self.__opCode)
  615.       if hodInterrupt.isSet():
  616.         # Got interrupt while executing script. Unsetting it for deallocating
  617.         hodInterrupt.setFlag(False)
  618.       if self._is_cluster_allocated(clusterDir):
  619.         self._op_deallocate(('deallocate', clusterDir))
  620.     except HodInterruptException, h:
  621.       self.__log.critical("Script failed because of a process interrupt.")
  622.       self.__opCode = HOD_INTERRUPTED_CODE
  623.     except:
  624.       self.__log.critical("script: %s failed: %s" % (script,
  625.                           get_exception_error_string()))
  626.       self.__log.debug(get_exception_string())
  627.     
  628.     self.__cleanup()
  629.     self.handle_script_exit_code(scriptRet, clusterDir)
  630.     
  631.     return self.__opCode
  632.   def handle_script_exit_code(self, scriptRet, clusterDir):
  633.     # We want to give importance to a failed script's exit code, and write out exit code to a file separately
  634.     # so users can easily get it if required. This way they can differentiate between the script's exit code
  635.     # and hod's exit code.
  636.     if os.path.exists(clusterDir):
  637.       exit_code_file_name = (os.path.join(clusterDir, 'script.exitcode'))
  638.       if scriptRet != 0:
  639.         exit_code_file = open(exit_code_file_name, 'w')
  640.         print >>exit_code_file, scriptRet
  641.         exit_code_file.close()
  642.         self.__opCode = scriptRet
  643.       else:
  644.         #ensure script exit code file is not there:
  645.         if (os.path.exists(exit_code_file_name)):
  646.           os.remove(exit_code_file_name)
  647. class hodHelp:
  648.   def __init__(self):
  649.     self.ops = ['allocate', 'deallocate', 'info', 'list','script',  'help']
  650.     self.usage_strings = 
  651.       {
  652.         'allocate'   : 'hod allocate -d <clusterdir> -n <nodecount> [OPTIONS]',
  653.         'deallocate' : 'hod deallocate -d <clusterdir> [OPTIONS]',
  654.         'list'       : 'hod list [OPTIONS]',
  655.         'info'       : 'hod info -d <clusterdir> [OPTIONS]',
  656.         'script'     :
  657.               'hod script -d <clusterdir> -n <nodecount> -s <script> [OPTIONS]',
  658.         'help'       : 'hod help <OPERATION>',
  659.         }
  660.     self.description_strings = 
  661.       {
  662.        'allocate' : "Allocates a cluster of n nodes using the specified n" + 
  663.       "              cluster directory to store cluster state n" + 
  664.       "              information. The Hadoop site XML is also stored n" + 
  665.       "              in this location.n",
  666.        'deallocate' : "Deallocates a cluster using the specified n" + 
  667.       "             cluster directory.  This operation is also n" + 
  668.       "             required to clean up a dead cluster.n",
  669.        'list' : "List all clusters currently allocated by a user, n" + 
  670.       "              along with limited status information and the n" + 
  671.       "              cluster ID.n",
  672.        'info' : "Provide detailed information on an allocated cluster.n",
  673.        'script' : "Allocates a cluster of n nodes with the given n" +
  674.            "              cluster directory, runs the specified script n" + 
  675.            "              using the allocated cluster, and then n" + 
  676.            "              deallocates the cluster.n",
  677.  
  678.        'help' : "Print help for the operation and exit.n" + 
  679.                 "Available operations : %s.n" % self.ops,
  680.        }
  681.   def usage(self, op):
  682.     return "Usage       : " + self.usage_strings[op] + "n" + 
  683.            "For full description: hod help " + op + ".n"
  684.   def help(self, op=None):
  685.     if op is None:
  686.       return "hod <operation> [ARGS] [OPTIONS]n" + 
  687.              "Available operations : %sn" % self.ops + 
  688.              "For help on a particular operation : hod help <operation>.n" + 
  689.              "For all options : hod help options."
  690.     else:
  691.       return "Usage       : " + self.usage_strings[op] + "n" + 
  692.              "Description : " + self.description_strings[op] + 
  693.              "For all options : hod help options.n"