HadoopThriftServer.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:20k
源码类别:

网格计算

开发平台:

Java

  1. package org.apache.hadoop.thriftfs;
  2. import com.facebook.thrift.TException;
  3. import com.facebook.thrift.TApplicationException;
  4. import com.facebook.thrift.protocol.TBinaryProtocol;
  5. import com.facebook.thrift.protocol.TProtocol;
  6. import com.facebook.thrift.server.TServer;
  7. import com.facebook.thrift.server.TThreadPoolServer;
  8. import com.facebook.thrift.transport.TServerSocket;
  9. import com.facebook.thrift.transport.TServerTransport;
  10. import com.facebook.thrift.transport.TTransportFactory;
  11. // Include Generated code
  12. import org.apache.hadoop.thriftfs.api.*;
  13. import org.apache.hadoop.thriftfs.api.ThriftHadoopFileSystem;
  14. import java.io.*;
  15. import java.util.*;
  16. import java.net.*;
  17. import org.apache.hadoop.fs.*;
  18. import org.apache.hadoop.fs.permission.FsPermission;
  19. import org.apache.commons.logging.Log;
  20. import org.apache.commons.logging.LogFactory;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.util.Daemon;
  23. import org.apache.hadoop.util.StringUtils;
  24. /**
  25.  * ThriftHadoopFileSystem
  26.  * A thrift wrapper around the Hadoop File System
  27.  */
  28. public class HadoopThriftServer extends ThriftHadoopFileSystem {
  29.   static int serverPort = 0;                    // default port
  30.   TServer    server = null;
  31.   public static class HadoopThriftHandler implements ThriftHadoopFileSystem.Iface
  32.   {
  33.     public static final Log LOG = LogFactory.getLog("org.apache.hadoop.thrift");
  34.     // HDFS glue
  35.     Configuration conf;
  36.     FileSystem fs;
  37.         
  38.     // stucture that maps each Thrift object into an hadoop object
  39.     private long nextId = new Random().nextLong();
  40.     private HashMap<Long, Object> hadoopHash = new HashMap<Long, Object>();
  41.     private Daemon inactivityThread = null;
  42.     // Detect inactive session
  43.     private static volatile long inactivityPeriod = 3600 * 1000; // 1 hr
  44.     private static volatile long inactivityRecheckInterval = 60 * 1000;
  45.     private static volatile boolean fsRunning = true;
  46.     private static long now;
  47.     // allow outsider to change the hadoopthrift path
  48.     public void setOption(String key, String val) {
  49.     }
  50.     /**
  51.      * Current system time.
  52.      * @return current time in msec.
  53.      */
  54.     static long now() {
  55.       return System.currentTimeMillis();
  56.     }
  57.     /**
  58.     * getVersion
  59.     *
  60.     * @return current version of the interface.
  61.     */
  62.     public String getVersion() {
  63.       return "0.1";
  64.     }
  65.     /**
  66.      * shutdown
  67.      *
  68.      * cleanly closes everything and exit.
  69.      */
  70.     public void shutdown(int status) {
  71.       LOG.info("HadoopThriftServer shutting down.");
  72.       try {
  73.         fs.close();
  74.       } catch (IOException e) {
  75.         LOG.warn("Unable to close file system");
  76.       }
  77.       Runtime.getRuntime().exit(status);
  78.     }
  79.     /**
  80.      * Periodically checks to see if there is inactivity
  81.      */
  82.     class InactivityMonitor implements Runnable {
  83.       public void run() {
  84.         while (fsRunning) {
  85.           try {
  86.             if (now() > now + inactivityPeriod) {
  87.               LOG.warn("HadoopThriftServer Inactivity period of " +
  88.                        inactivityPeriod + " expired... Stopping Server.");
  89.               shutdown(-1);
  90.             }
  91.           } catch (Exception e) {
  92.             LOG.error(StringUtils.stringifyException(e));
  93.           }
  94.           try {
  95.             Thread.sleep(inactivityRecheckInterval);
  96.           } catch (InterruptedException ie) {
  97.           }
  98.         }
  99.       }
  100.     }
  101.     /**
  102.      * HadoopThriftServer
  103.      *
  104.      * Constructor for the HadoopThriftServer glue with Thrift Class.
  105.      *
  106.      * @param name - the name of this handler
  107.      */
  108.     public HadoopThriftHandler(String name) {
  109.       conf = new Configuration();
  110.       now = now();
  111.       try {
  112.         inactivityThread = new Daemon(new InactivityMonitor());
  113.         fs = FileSystem.get(conf);
  114.       } catch (IOException e) {
  115.         LOG.warn("Unable to open hadoop file system...");
  116.         Runtime.getRuntime().exit(-1);
  117.       }
  118.     }
  119.     /**
  120.       * printStackTrace
  121.       *
  122.       * Helper function to print an exception stack trace to the log and not stderr
  123.       *
  124.       * @param e the exception
  125.       *
  126.       */
  127.     static private void printStackTrace(Exception e) {
  128.       for(StackTraceElement s: e.getStackTrace()) {
  129.         LOG.error(s);
  130.       }
  131.     }
  132.     /**
  133.      * Lookup a thrift object into a hadoop object
  134.      */
  135.     private synchronized Object lookup(long id) {
  136.       return hadoopHash.get(new Long(id));
  137.     }
  138.     /**
  139.      * Insert a thrift object into a hadoop object. Return its id.
  140.      */
  141.     private synchronized long insert(Object o) {
  142.       nextId++;
  143.       hadoopHash.put(nextId, o);
  144.       return nextId;
  145.     }
  146.     /**
  147.      * Delete a thrift object from the hadoop store.
  148.      */
  149.     private synchronized Object remove(long id) {
  150.       return hadoopHash.remove(new Long(id));
  151.     }
  152.     /**
  153.       * Implement the API exported by this thrift server
  154.       */
  155.     /** Set inactivity timeout period. The period is specified in seconds.
  156.       * if there are no RPC calls to the HadoopThrift server for this much
  157.       * time, then the server kills itself.
  158.       */
  159.     public synchronized void setInactivityTimeoutPeriod(long periodInSeconds) {
  160.       inactivityPeriod = periodInSeconds * 1000; // in milli seconds
  161.       if (inactivityRecheckInterval > inactivityPeriod ) {
  162.         inactivityRecheckInterval = inactivityPeriod;
  163.       }
  164.     }
  165.     /**
  166.       * Create a file and open it for writing
  167.       */
  168.     public ThriftHandle create(Pathname path) throws ThriftIOException {
  169.       try {
  170.         now = now();
  171.         HadoopThriftHandler.LOG.debug("create: " + path);
  172.         FSDataOutputStream out = fs.create(new Path(path.pathname));
  173.         long id = insert(out);
  174.         ThriftHandle obj = new ThriftHandle(id);
  175.         HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
  176.         return obj;
  177.       } catch (IOException e) {
  178.         throw new ThriftIOException(e.getMessage());
  179.       }
  180.     }
  181.     /**
  182.       * Create a file and open it for writing, delete file if it exists
  183.       */
  184.     public ThriftHandle createFile(Pathname path, 
  185.                                    short mode,
  186.                                    boolean  overwrite,
  187.                                    int bufferSize,
  188.                                    short replication,
  189.                                    long blockSize) throws ThriftIOException {
  190.       try {
  191.         now = now();
  192.         HadoopThriftHandler.LOG.debug("create: " + path +
  193.                                      " permission: " + mode +
  194.                                      " overwrite: " + overwrite +
  195.                                      " bufferSize: " + bufferSize +
  196.                                      " replication: " + replication +
  197.                                      " blockSize: " + blockSize);
  198.         FSDataOutputStream out = fs.create(new Path(path.pathname), 
  199.                                            new FsPermission(mode),
  200.                                            overwrite,
  201.                                            bufferSize,
  202.                                            replication,
  203.                                            blockSize,
  204.                                            null); // progress
  205.         long id = insert(out);
  206.         ThriftHandle obj = new ThriftHandle(id);
  207.         HadoopThriftHandler.LOG.debug("created: " + path + " id: " + id);
  208.         return obj;
  209.       } catch (IOException e) {
  210.         throw new ThriftIOException(e.getMessage());
  211.       }
  212.     }
  213.     /**
  214.      * Opens an existing file and returns a handle to read it
  215.      */
  216.     public ThriftHandle open(Pathname path) throws ThriftIOException {
  217.       try {
  218.         now = now();
  219.         HadoopThriftHandler.LOG.debug("open: " + path);
  220.         FSDataInputStream out = fs.open(new Path(path.pathname));
  221.         long id = insert(out);
  222.         ThriftHandle obj = new ThriftHandle(id);
  223.         HadoopThriftHandler.LOG.debug("opened: " + path + " id: " + id);
  224.         return obj;
  225.       } catch (IOException e) {
  226.         throw new ThriftIOException(e.getMessage());
  227.       }
  228.     }
  229.     /**
  230.      * Opens an existing file to append to it.
  231.      */
  232.     public ThriftHandle append(Pathname path) throws ThriftIOException {
  233.       try {
  234.         now = now();
  235.         HadoopThriftHandler.LOG.debug("append: " + path);
  236.         FSDataOutputStream out = fs.append(new Path(path.pathname));
  237.         long id = insert(out);
  238.         ThriftHandle obj = new ThriftHandle(id);
  239.         HadoopThriftHandler.LOG.debug("appended: " + path + " id: " + id);
  240.         return obj;
  241.       } catch (IOException e) {
  242.         throw new ThriftIOException(e.getMessage());
  243.       }
  244.     }
  245.     /**
  246.      * write to a file
  247.      */
  248.     public boolean write(ThriftHandle tout, String data) throws ThriftIOException {
  249.       try {
  250.         now = now();
  251.         HadoopThriftHandler.LOG.debug("write: " + tout.id);
  252.         FSDataOutputStream out = (FSDataOutputStream)lookup(tout.id);
  253.         byte[] tmp = data.getBytes("UTF-8");
  254.         out.write(tmp, 0, tmp.length);
  255.         HadoopThriftHandler.LOG.debug("wrote: " + tout.id);
  256.         return true;
  257.       } catch (IOException e) {
  258.         throw new ThriftIOException(e.getMessage());
  259.       }
  260.     }
  261.     /**
  262.      * read from a file
  263.      */
  264.     public String read(ThriftHandle tout, long offset,
  265.                        int length) throws ThriftIOException {
  266.       try {
  267.         now = now();
  268.         HadoopThriftHandler.LOG.debug("read: " + tout.id +
  269.                                      " offset: " + offset +
  270.                                      " length: " + length);
  271.         FSDataInputStream in = (FSDataInputStream)lookup(tout.id);
  272.         if (in.getPos() != offset) {
  273.           in.seek(offset);
  274.         }
  275.         byte[] tmp = new byte[length];
  276.         int numbytes = in.read(offset, tmp, 0, length);
  277.         HadoopThriftHandler.LOG.debug("read done: " + tout.id);
  278.         return new String(tmp, 0, numbytes, "UTF-8");
  279.       } catch (IOException e) {
  280.         throw new ThriftIOException(e.getMessage());
  281.       }
  282.     }
  283.     /**
  284.      * Delete a file/directory
  285.      */
  286.     public boolean rm(Pathname path, boolean recursive) 
  287.                           throws ThriftIOException {
  288.       try {
  289.         now = now();
  290.         HadoopThriftHandler.LOG.debug("rm: " + path +
  291.                                      " recursive: " + recursive);
  292.         boolean ret = fs.delete(new Path(path.pathname), recursive);
  293.         HadoopThriftHandler.LOG.debug("rm: " + path);
  294.         return ret;
  295.       } catch (IOException e) {
  296.         throw new ThriftIOException(e.getMessage());
  297.       }
  298.     }
  299.     /**
  300.      * Move a file/directory
  301.      */
  302.     public boolean rename(Pathname path, Pathname dest) 
  303.                           throws ThriftIOException {
  304.       try {
  305.         now = now();
  306.         HadoopThriftHandler.LOG.debug("rename: " + path +
  307.                                      " destination: " + dest);
  308.         boolean ret = fs.rename(new Path(path.pathname), 
  309.                                 new Path(dest.pathname));
  310.         HadoopThriftHandler.LOG.debug("rename: " + path);
  311.         return ret;
  312.       } catch (IOException e) {
  313.         throw new ThriftIOException(e.getMessage());
  314.       }
  315.     }
  316.     /**
  317.      *  close file
  318.      */
  319.      public boolean close(ThriftHandle tout) throws ThriftIOException {
  320.        try {
  321.          now = now();
  322.          HadoopThriftHandler.LOG.debug("close: " + tout.id);
  323.          Object obj = remove(tout.id);
  324.          if (obj instanceof FSDataOutputStream) {
  325.            FSDataOutputStream out = (FSDataOutputStream)obj;
  326.            out.close();
  327.          } else if (obj instanceof FSDataInputStream) {
  328.            FSDataInputStream in = (FSDataInputStream)obj;
  329.            in.close();
  330.          } else {
  331.            throw new ThriftIOException("Unknown thrift handle.");
  332.          }
  333.          HadoopThriftHandler.LOG.debug("closed: " + tout.id);
  334.          return true;
  335.        } catch (IOException e) {
  336.          throw new ThriftIOException(e.getMessage());
  337.        }
  338.      }
  339.      /**
  340.       * Create a directory
  341.       */
  342.     public boolean mkdirs(Pathname path) throws ThriftIOException {
  343.       try {
  344.         now = now();
  345.         HadoopThriftHandler.LOG.debug("mkdirs: " + path);
  346.         boolean ret = fs.mkdirs(new Path(path.pathname));
  347.         HadoopThriftHandler.LOG.debug("mkdirs: " + path);
  348.         return ret;
  349.       } catch (IOException e) {
  350.         throw new ThriftIOException(e.getMessage());
  351.       }
  352.     }
  353.     /**
  354.      * Does this pathname exist?
  355.      */
  356.     public boolean exists(Pathname path) throws ThriftIOException {
  357.       try {
  358.         now = now();
  359.         HadoopThriftHandler.LOG.debug("exists: " + path);
  360.         boolean ret = fs.exists(new Path(path.pathname));
  361.         HadoopThriftHandler.LOG.debug("exists done: " + path);
  362.         return ret;
  363.       } catch (IOException e) {
  364.         throw new ThriftIOException(e.getMessage());
  365.       }
  366.     }
  367.     /**
  368.      * Returns status about the specified pathname
  369.      */
  370.     public org.apache.hadoop.thriftfs.api.FileStatus stat(
  371.                             Pathname path) throws ThriftIOException {
  372.       try {
  373.         now = now();
  374.         HadoopThriftHandler.LOG.debug("stat: " + path);
  375.         org.apache.hadoop.fs.FileStatus stat = fs.getFileStatus(
  376.                                            new Path(path.pathname));
  377.         HadoopThriftHandler.LOG.debug("stat done: " + path);
  378.         return new org.apache.hadoop.thriftfs.api.FileStatus(
  379.           stat.getPath().toString(),
  380.           stat.getLen(),
  381.           stat.isDir(),
  382.           stat.getReplication(),
  383.           stat.getBlockSize(),
  384.           stat.getModificationTime(),
  385.           stat.getPermission().toString(),
  386.           stat.getOwner(),
  387.           stat.getGroup());
  388.       } catch (IOException e) {
  389.         throw new ThriftIOException(e.getMessage());
  390.       }
  391.     }
  392.     /**
  393.      * If the specified pathname is a directory, then return the
  394.      * list of pathnames in this directory
  395.      */
  396.     public List<org.apache.hadoop.thriftfs.api.FileStatus> listStatus(
  397.                             Pathname path) throws ThriftIOException {
  398.       try {
  399.         now = now();
  400.         HadoopThriftHandler.LOG.debug("listStatus: " + path);
  401.         org.apache.hadoop.fs.FileStatus[] stat = fs.listStatus(
  402.                                            new Path(path.pathname));
  403.         HadoopThriftHandler.LOG.debug("listStatus done: " + path);
  404.         org.apache.hadoop.thriftfs.api.FileStatus tmp;
  405.         List<org.apache.hadoop.thriftfs.api.FileStatus> value = 
  406.           new LinkedList<org.apache.hadoop.thriftfs.api.FileStatus>();
  407.         for (int i = 0; i < stat.length; i++) {
  408.           tmp = new org.apache.hadoop.thriftfs.api.FileStatus(
  409.                       stat[i].getPath().toString(),
  410.                       stat[i].getLen(),
  411.                       stat[i].isDir(),
  412.                       stat[i].getReplication(),
  413.                       stat[i].getBlockSize(),
  414.                       stat[i].getModificationTime(),
  415.                       stat[i].getPermission().toString(),
  416.                       stat[i].getOwner(),
  417.                       stat[i].getGroup());
  418.           value.add(tmp);
  419.         }
  420.         return value;
  421.       } catch (IOException e) {
  422.         throw new ThriftIOException(e.getMessage());
  423.       }
  424.     }
  425.     /**
  426.      * Sets the permission of a pathname
  427.      */
  428.     public void chmod(Pathname path, short mode) throws ThriftIOException {
  429.       try {
  430.         now = now();
  431.         HadoopThriftHandler.LOG.debug("chmod: " + path + 
  432.                                      " mode " + mode);
  433.         fs.setPermission(new Path(path.pathname), new FsPermission(mode));
  434.         HadoopThriftHandler.LOG.debug("chmod done: " + path);
  435.       } catch (IOException e) {
  436.         throw new ThriftIOException(e.getMessage());
  437.       }
  438.     }
  439.     /**
  440.      * Sets the owner & group of a pathname
  441.      */
  442.     public void chown(Pathname path, String owner, String group) 
  443.                                                        throws ThriftIOException {
  444.       try {
  445.         now = now();
  446.         HadoopThriftHandler.LOG.debug("chown: " + path +
  447.                                      " owner: " + owner +
  448.                                      " group: " + group);
  449.         fs.setOwner(new Path(path.pathname), owner, group);
  450.         HadoopThriftHandler.LOG.debug("chown done: " + path);
  451.       } catch (IOException e) {
  452.         throw new ThriftIOException(e.getMessage());
  453.       }
  454.     }
  455.     /**
  456.      * Sets the replication factor of a file
  457.      */
  458.     public void setReplication(Pathname path, short repl) throws ThriftIOException {
  459.       try {
  460.         now = now();
  461.         HadoopThriftHandler.LOG.debug("setrepl: " + path +
  462.                                      " replication factor: " + repl);
  463.         fs.setReplication(new Path(path.pathname), repl);
  464.         HadoopThriftHandler.LOG.debug("setrepl done: " + path);
  465.       } catch (IOException e) {
  466.         throw new ThriftIOException(e.getMessage());
  467.       }
  468.     }
  469.     /**
  470.      * Returns the block locations of this file
  471.      */
  472.     public List<org.apache.hadoop.thriftfs.api.BlockLocation> 
  473.              getFileBlockLocations(Pathname path, long start, long length) 
  474.                                          throws ThriftIOException {
  475.       try {
  476.         now = now();
  477.         HadoopThriftHandler.LOG.debug("getFileBlockLocations: " + path);
  478.         org.apache.hadoop.fs.FileStatus status = fs.getFileStatus(
  479.                                                  new Path(path.pathname));
  480.         org.apache.hadoop.fs.BlockLocation[] stat = 
  481.             fs.getFileBlockLocations(status, start, length);
  482.         HadoopThriftHandler.LOG.debug("getFileBlockLocations done: " + path);
  483.         org.apache.hadoop.thriftfs.api.BlockLocation tmp;
  484.         List<org.apache.hadoop.thriftfs.api.BlockLocation> value = 
  485.           new LinkedList<org.apache.hadoop.thriftfs.api.BlockLocation>();
  486.         for (int i = 0; i < stat.length; i++) {
  487.           // construct the list of hostnames from the array returned
  488.           // by HDFS
  489.           List<String> hosts = new LinkedList<String>();
  490.           String[] hostsHdfs = stat[i].getHosts();
  491.           for (int j = 0; j < hostsHdfs.length; j++) {
  492.             hosts.add(hostsHdfs[j]);
  493.           }
  494.           // construct the list of host:port from the array returned
  495.           // by HDFS
  496.           List<String> names = new LinkedList<String>();
  497.           String[] namesHdfs = stat[i].getNames();
  498.           for (int j = 0; j < namesHdfs.length; j++) {
  499.             names.add(namesHdfs[j]);
  500.           }
  501.           tmp = new org.apache.hadoop.thriftfs.api.BlockLocation(
  502.                       hosts, names, stat[i].getOffset(), stat[i].getLength());
  503.           value.add(tmp);
  504.         }
  505.         return value;
  506.       } catch (IOException e) {
  507.         throw new ThriftIOException(e.getMessage());
  508.       }
  509.     }
  510.   }
  511.   // Bind to port. If the specified port is 0, then bind to random port.
  512.   private ServerSocket createServerSocket(int port) throws IOException {
  513.     try {
  514.       ServerSocket sock = new ServerSocket();
  515.       // Prevent 2MSL delay problem on server restarts
  516.       sock.setReuseAddress(true);
  517.       // Bind to listening port
  518.       if (port == 0) {
  519.         sock.bind(null);
  520.         serverPort = sock.getLocalPort();
  521.       } else {
  522.         sock.bind(new InetSocketAddress(port));
  523.       }
  524.       return sock;
  525.     } catch (IOException ioe) {
  526.       throw new IOException("Could not create ServerSocket on port " + port + "." +
  527.                             ioe);
  528.     }
  529.   }
  530.   /**
  531.    * Constrcts a server object
  532.    */
  533.   public HadoopThriftServer(String [] args) {
  534.     if (args.length > 0) {
  535.       serverPort = new Integer(args[0]);
  536.     }
  537.     try {
  538.       ServerSocket ssock = createServerSocket(serverPort);
  539.       TServerTransport serverTransport = new TServerSocket(ssock);
  540.       Iface handler = new HadoopThriftHandler("hdfs-thrift-dhruba");
  541.       ThriftHadoopFileSystem.Processor processor = new ThriftHadoopFileSystem.Processor(handler);
  542.       TThreadPoolServer.Options options = new TThreadPoolServer.Options();
  543.       options.minWorkerThreads = 10;
  544.       server = new TThreadPoolServer(processor, serverTransport,
  545.                                              new TTransportFactory(),
  546.                                              new TTransportFactory(),
  547.                                              new TBinaryProtocol.Factory(),
  548.                                              new TBinaryProtocol.Factory(), 
  549.                                              options);
  550.       System.out.println("Starting the hadoop thrift server on port [" + serverPort + "]...");
  551.       HadoopThriftHandler.LOG.info("Starting the hadoop thrift server on port [" +serverPort + "]...");
  552.       System.out.flush();
  553.     } catch (Exception x) {
  554.       x.printStackTrace();
  555.     }
  556.   }
  557.   public static void main(String [] args) {
  558.     HadoopThriftServer me = new HadoopThriftServer(args);
  559.     me.server.serve();
  560.   }
  561. };