hdfs.py
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:15k
源码类别:

网格计算

开发平台:

Java

  1. #!/usr/bin/env python
  2. """
  3.   hdfs.py is a python client for the thrift interface to HDFS.
  4.   
  5.   Licensed under the Apache License, Version 2.0 (the "License"); 
  6.   you may not use this file except in compliance with the License. 
  7.   You may obtain a copy of the License at 
  8.   
  9.   http://www.apache.org/licenses/LICENSE-2.0 
  10.   
  11.   Unless required by applicable law or agreed to in writing, software 
  12.   distributed under the License is distributed on an "AS IS" BASIS, 
  13.   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
  14.   implied. See the License for the specific language governing permissions 
  15.   and limitations under the License. 
  16. """
  17. import sys
  18. sys.path.append('../gen-py')
  19. from optparse import OptionParser
  20. from thrift import Thrift
  21. from thrift.transport import TSocket
  22. from thrift.transport import TTransport
  23. from thrift.protocol import TBinaryProtocol
  24. from hadoopfs import ThriftHadoopFileSystem
  25. from hadoopfs.ttypes import *
  26. from readline import *
  27. from cmd import *
  28. import os
  29. import re
  30. import readline
  31. import subprocess
  32. #
  33. # The address of the FileSystemClientProxy. If the host and port are
  34. # not specified, then a proxy server is automatically spawned. 
  35. #
  36. host = 'localhost'
  37. port = 4677                       # use any port
  38. proxyStartScript = './start_thrift_server.sh'
  39. startServer = True                # shall we start a proxy server?
  40. #
  41. # The hdfs interactive shell. The Cmd class is a builtin that uses readline + implements
  42. # a whole bunch of utility stuff like help and custom tab completions.
  43. # It makes everything real easy.
  44. #
  45. class hadoopthrift_cli(Cmd):
  46.   # my custom prompt looks better than the default
  47.   prompt = 'hdfs>> '
  48.   #############################
  49.   # Class constructor
  50.   #############################
  51.   def __init__(self, server_name, server_port):
  52.     Cmd.__init__(self)
  53.     self.server_name = server_name
  54.     self.server_port = server_port
  55.   #############################
  56.   # Start the ClientProxy Server if we can find it.
  57.   # Read in its stdout to determine what port it is running on
  58.   #############################
  59.   def startProxyServer(self):
  60.     try:
  61.       p = subprocess.Popen(proxyStartScript, self.server_port, stdout=subprocess.PIPE)
  62.       content = p.stdout.readline()
  63.       p.stdout.close()
  64.       val = re.split( '[|]', content)
  65.       print val[1]
  66.       self.server_port = val[1]
  67.       return True
  68.     except Exception, ex:
  69.       print "ERROR in starting proxy  server " + proxyStartScript
  70.       print '%s' % (ex.message)
  71.       return False
  72.   #############################
  73.   # Connect to clientproxy
  74.   #############################
  75.   def connect(self):
  76.     try:
  77.       # connect to hdfs thrift server
  78.       self.transport = TSocket.TSocket(self.server_name, self.server_port)
  79.       self.transport = TTransport.TBufferedTransport(self.transport)
  80.       self.protocol = TBinaryProtocol.TBinaryProtocol(self.transport)
  81.       # Create a client to use the protocol encoder
  82.       self.client = ThriftHadoopFileSystem.Client(self.protocol)
  83.       self.transport.open()
  84.       # tell the HadoopThrift server to die after 60 minutes of inactivity
  85.       self.client.setInactivityTimeoutPeriod(60*60)
  86.       return True
  87.     except Thrift.TException, tx:
  88.       print "ERROR in connecting to ", self.server_name, ":", self.server_port
  89.       print '%s' % (tx.message)
  90.       return False
  91.   #
  92.   # Disconnect from client proxy
  93.   #
  94.   def shutdown(self):
  95.     try :
  96.       self.transport.close()
  97.     except Exception, tx:
  98.       return False
  99.   #############################
  100.   # Create the specified file. Returns a handle to write data.
  101.   #############################
  102.   def do_create(self, name):
  103.     if name == "":
  104.       print "  ERROR usage: create <pathname>"
  105.       print
  106.       return 0
  107.     # Create the file, and immediately closes the handle
  108.     path = Pathname();
  109.     path.pathname = name;
  110.     status = self.client.create(path)
  111.     self.client.close(status)
  112.     return 0
  113.   #############################
  114.   # Delete the specified file.
  115.   #############################
  116.   def do_rm(self, name):
  117.     if name == "":
  118.       print "  ERROR usage: rm <pathname>n"
  119.       return 0
  120.     # delete file
  121.     path = Pathname();
  122.     path.pathname = name;
  123.     status = self.client.rm(path, False)
  124.     if status == False:
  125.       print "  ERROR in deleting path: " + name
  126.     return 0
  127.   #############################
  128.   # Rename the specified file/dir
  129.   #############################
  130.   def do_mv(self, line):
  131.     params = line.split()
  132.     if (len(params) != 2):
  133.       print "  ERROR usage: mv <srcpathname> <destpathname>n"
  134.       return 0
  135.     src = params[0].strip()
  136.     dest = params[1].strip()
  137.     if src == "":
  138.       print "  ERROR usage: mv <srcpathname> <destpathname>n"
  139.       return 0
  140.     if dest == "":
  141.       print "  ERROR usage: mv <srcpathname> <destpathname>n"
  142.       return 0
  143.     # move file
  144.     path = Pathname();
  145.     path.pathname = src;
  146.     destpath = Pathname();
  147.     destpath.pathname = dest;
  148.     status = self.client.rename(path, destpath)
  149.     if status == False:
  150.       print "  ERROR in renaming path: " + name
  151.     return 0
  152.   #############################
  153.   # Delete the specified file.
  154.   #############################
  155.   def do_mkdirs(self, name):
  156.     if name == "":
  157.       print "  ERROR usage: mkdirs <pathname>n"
  158.       return 0
  159.     # create directory
  160.     path = Pathname();
  161.     path.pathname = name;
  162.     fields = self.client.mkdirs(path)
  163.     return 0
  164.   #############################
  165.   # does the pathname exist?
  166.   #############################
  167.   def do_exists(self, name):
  168.     if name == "":
  169.       print "  ERROR usage: exists <pathname>n"
  170.       return 0
  171.     # check existence of pathname
  172.     path = Pathname();
  173.     path.pathname = name;
  174.     fields = self.client.exists(path)
  175.     if (fields == True):
  176.       print name + " exists."
  177.     else:
  178.       print name + " does not exist."
  179.     return 0
  180.   #############################
  181.   # copy local file into hdfs
  182.   #############################
  183.   def do_put(self, line):
  184.     params = line.split()
  185.     if (len(params) != 2):
  186.       print "  ERROR usage: put <localpathname> <hdfspathname>n"
  187.       return 0
  188.     local = params[0].strip()
  189.     hdfs = params[1].strip()
  190.     if local == "":
  191.       print "  ERROR usage: put <localpathname> <hdfspathname>n"
  192.       return 0
  193.     if hdfs == "":
  194.       print "  ERROR usage: put <localpathname> <hdfspathname>n"
  195.       return 0
  196.     # open local file
  197.     input = open(local, 'rb')
  198.     # open output file
  199.     path = Pathname();
  200.     path.pathname = hdfs;
  201.     output = self.client.create(path)
  202.     # read 1MB at a time and upload to hdfs
  203.     while True:
  204.       chunk = input.read(1024*1024)
  205.       if not chunk: break
  206.       self.client.write(output, chunk)
  207.       
  208.     self.client.close(output) 
  209.     input.close()
  210.   #############################
  211.   # copy hdfs file into local
  212.   #############################
  213.   def do_get(self, line):
  214.     params = line.split()
  215.     if (len(params) != 2):
  216.       print "  ERROR usage: get <hdfspathname> <localpathname>n"
  217.       return 0
  218.     hdfs = params[0].strip()
  219.     local = params[1].strip()
  220.     if local == "":
  221.       print "  ERROR usage: get <hdfspathname> <localpathname>n"
  222.       return 0
  223.     if hdfs == "":
  224.       print "  ERROR usage: get <hdfspathname> <localpathname>n"
  225.       return 0
  226.     # open output local file
  227.     output = open(local, 'wb')
  228.     # open input hdfs file
  229.     path = Pathname();
  230.     path.pathname = hdfs;
  231.     input = self.client.open(path)
  232.     # find size of hdfs file
  233.     filesize = self.client.stat(path).length
  234.     # read 1MB bytes at a time from hdfs
  235.     offset = 0
  236.     chunksize = 1024 * 1024
  237.     while True:
  238.       chunk = self.client.read(input, offset, chunksize)
  239.       if not chunk: break
  240.       output.write(chunk)
  241.       offset += chunksize
  242.       if (offset >= filesize): break
  243.       
  244.     self.client.close(input) 
  245.     output.close()
  246.   #############################
  247.   # List attributes of this path
  248.   #############################
  249.   def do_ls(self, name):
  250.     if name == "":
  251.       print "  ERROR usage: list <pathname>n"
  252.       return 0
  253.     # list file status
  254.     path = Pathname();
  255.     path.pathname = name;
  256.     status = self.client.stat(path)
  257.     if (status.isdir == False):
  258.       self.printStatus(status)
  259.       return 0
  260.     
  261.     # This is a directory, fetch its contents
  262.     liststatus = self.client.listStatus(path)
  263.     for item in liststatus:
  264.       self.printStatus(item)
  265.   #############################
  266.   # Set permissions for a file
  267.   #############################
  268.   def do_chmod(self, line):
  269.     params = line.split()
  270.     if (len(params) != 2):
  271.       print "  ERROR usage: chmod 774 <pathname>n"
  272.       return 0
  273.     perm = params[0].strip()
  274.     name = params[1].strip()
  275.     if name == "":
  276.       print "  ERROR usage: chmod 774 <pathname>n"
  277.       return 0
  278.     if perm == "":
  279.       print "  ERROR usage: chmod 774 <pathname>n"
  280.       return 0
  281.     # set permissions (in octal)
  282.     path = Pathname();
  283.     path.pathname = name;
  284.     status = self.client.chmod(path, int(perm,8))
  285.     return 0
  286.   #############################
  287.   # Set owner for a file. This is not an atomic operation.
  288.   # A change to the group of a file may be overwritten by this one.
  289.   #############################
  290.   def do_chown(self, line):
  291.     params = line.split()
  292.     if (len(params) != 2):
  293.       print "  ERROR usage: chown <ownername> <pathname>n"
  294.       return 0
  295.     owner = params[0].strip()
  296.     name = params[1].strip()
  297.     if name == "":
  298.       print "  ERROR usage: chown <ownername> <pathname>n"
  299.       return 0
  300.     # get the current owner and group
  301.     path = Pathname();
  302.     path.pathname = name;
  303.     cur = self.client.stat(path)
  304.     # set new owner, keep old group
  305.     status = self.client.chown(path, owner, cur.group)
  306.     return 0
  307.   #######################################
  308.   # Set the replication factor for a file
  309.   ######################################
  310.   def do_setreplication(self, line):
  311.     params = line.split()
  312.     if (len(params) != 2):
  313.       print "  ERROR usage: setreplication <replication factor> <pathname>n"
  314.       return 0
  315.     repl = params[0].strip()
  316.     name = params[1].strip()
  317.     if name == "":
  318.       print "  ERROR usage: setreplication <replication factor> <pathname>n"
  319.       return 0
  320.     if repl == "":
  321.       print "  ERROR usage: setreplication <replication factor> <pathname>n"
  322.       return 0
  323.     path = Pathname();
  324.     path.pathname = name;
  325.     status = self.client.setReplication(path, int(repl))
  326.     return 0
  327.   #############################
  328.   # Display the locations of the blocks of this file
  329.   #############################
  330.   def do_getlocations(self, name):
  331.     if name == "":
  332.       print "  ERROR usage: getlocations <pathname>n"
  333.       return 0
  334.     path = Pathname();
  335.     path.pathname = name;
  336.     # find size of hdfs file
  337.     filesize = self.client.stat(path).length
  338.     # getlocations file
  339.     blockLocations = self.client.getFileBlockLocations(path, 0, filesize)
  340.     for item in blockLocations:
  341.       self.printLocations(item)
  342.     
  343.     return 0
  344.   #############################
  345.   # Utility methods from here
  346.   #############################
  347.   #
  348.   # If I don't do this, the last command is always re-executed which is annoying.
  349.   #
  350.   def emptyline(self):
  351.     pass
  352.   # 
  353.   # print the status of a path
  354.   #
  355.   def printStatus(self, stat):
  356.     print str(stat.block_replication) + "t" + str(stat.length) + "t" + str(stat.modification_time) + "t" + stat.permission + "t" + stat.owner + "t" + stat.group + "t" + stat.path
  357.           
  358.   # 
  359.   # print the locations of a block
  360.   #
  361.   def printLocations(self, location):
  362.     print str(location.names) + "t"  + str(location.offset) + "t" + str(location.length)
  363.   #
  364.   # Various ways to exit the hdfs shell
  365.   #
  366.   def do_quit(self,ignored):
  367.     try:
  368.       if startServer:
  369.         self.client.shutdown(1)
  370.       return -1
  371.     except Exception, ex:
  372.       return -1
  373.   def do_q(self,ignored):
  374.     return self.do_quit(ignored)
  375.   # ctl-d
  376.   def do_EOF(self,ignored):
  377.     return self.do_quit(ignored)
  378.   #
  379.   # Give the user some amount of help - I am a nice guy
  380.   #
  381.   def help_create(self):
  382.     print "create <pathname>"
  383.   def help_rm(self):
  384.     print "rm <pathname>"
  385.   def help_mv(self):
  386.     print "mv <srcpathname> <destpathname>"
  387.   def help_mkdirs(self):
  388.     print "mkdirs <pathname>"
  389.   def help_exists(self):
  390.     print "exists <pathname>"
  391.   def help_put(self):
  392.     print "put <localpathname> <hdfspathname>"
  393.   def help_get(self):
  394.     print "get <hdfspathname> <localpathname>"
  395.   def help_ls(self):
  396.     print "ls <hdfspathname>"
  397.   def help_chmod(self):
  398.     print "chmod 775 <hdfspathname>"
  399.   def help_chown(self):
  400.     print "chown <ownername> <hdfspathname>"
  401.   def help_setreplication(self):
  402.     print "setrep <replication factor> <hdfspathname>"
  403.   def help_getlocations(self):
  404.     print "getlocations <pathname>"
  405.   def help_EOF(self):
  406.     print '<ctl-d> will quit this program.'
  407.   def help_quit(self):
  408.     print 'if you need to know what quit does, you shouldn't be using a computer.'
  409.   def help_q(self):
  410.     print 'quit and if you need to know what quit does, you shouldn't be using a computer.'
  411.   def help_help(self):
  412.     print 'duh'
  413.   def usage(exec_name):
  414.     print "Usage: "
  415.     print "  %s [proxyclientname [proxyclientport]]" % exec_name
  416.     print "  %s -v" % exec_name
  417.     print "  %s --help" % exec_name
  418.     print "  %s -h" % exec_name
  419. if __name__ == "__main__":
  420.   #
  421.   # Rudimentary command line processing.
  422.   #
  423.   # real parsing:
  424.   parser = OptionParser()
  425.   parser.add_option("-e", "--execute", dest="command_str",
  426.                                       help="execute this command and exit")
  427.   parser.add_option("-s","--proxyclient",dest="host",help="the proxyclient's hostname")
  428.   parser.add_option("-p","--port",dest="port",help="the proxyclient's port number")
  429.   (options, args) = parser.parse_args()
  430.   #
  431.   # Save host and port information of the proxy server
  432.   #
  433.   if (options.host):
  434.     host = options.host
  435.     startServer = False
  436.   if (options.port):
  437.     port = options.port
  438.     startServer = False
  439.   #
  440.   # Retrieve the user's readline history.
  441.   #
  442.   historyFileName = os.path.expanduser("~/.hdfs_history")
  443.   if (os.path.exists(historyFileName)):
  444.     readline.read_history_file(historyFileName)
  445.   #
  446.   # Create class and connect to proxy server
  447.   #
  448.   c = hadoopthrift_cli(host,port)
  449.   if startServer:
  450.     if c.startProxyServer() == False:
  451.       sys.exit(1)
  452.   if c.connect() == False:
  453.     sys.exit(1)
  454.     
  455.   #
  456.   # If this utility was invoked with one argument, process it
  457.   #
  458.   if (options.command_str):
  459.     c.onecmd(options.command_str)
  460.     sys.exit(0)
  461.   #
  462.   # Start looping over user commands.
  463.   #
  464.   c.cmdloop('Welcome to the Thrift interactive shell for Hadoop File System. - how can I help you? ' + 'n'
  465.       'Press tab twice to see the list of commands. ' + 'n' +
  466.       'To complete the name of a command press tab once. n'
  467.       )
  468.   c.shutdown();
  469.   readline.write_history_file(historyFileName)
  470.   print '' # I am nothing if not courteous.
  471.   sys.exit(0)