StreamUtil.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:14k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.streaming;
  19. import java.text.DecimalFormat;
  20. import java.io.*;
  21. import java.net.*;
  22. import java.util.ArrayList;
  23. import java.util.Arrays;
  24. import java.util.Enumeration;
  25. import java.util.Iterator;
  26. import java.util.List;
  27. import java.util.jar.*;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.FSDataInputStream;
  32. import org.apache.hadoop.mapred.FileSplit;
  33. import org.apache.hadoop.mapred.JobConf;
  34. /** Utilities not available elsewhere in Hadoop.
  35.  *  
  36.  */
  37. public class StreamUtil {
  38.   /** It may seem strange to silently switch behaviour when a String
  39.    * is not a classname; the reason is simplified Usage:<pre>
  40.    * -mapper [classname | program ]
  41.    * instead of the explicit Usage:
  42.    * [-mapper program | -javamapper classname], -mapper and -javamapper are mutually exclusive.
  43.    * (repeat for -reducer, -combiner) </pre>
  44.    */
  45.   public static Class goodClassOrNull(Configuration conf, String className, String defaultPackage) {
  46.     if (className.indexOf('.') == -1 && defaultPackage != null) {
  47.       className = defaultPackage + "." + className;
  48.     }
  49.     Class clazz = null;
  50.     try {
  51.       clazz = conf.getClassByName(className);
  52.     } catch (ClassNotFoundException cnf) {
  53.     }
  54.     return clazz;
  55.   }
  56.   public static String findInClasspath(String className) {
  57.     return findInClasspath(className, StreamUtil.class.getClassLoader());
  58.   }
  59.   /** @return a jar file path or a base directory or null if not found.
  60.    */
  61.   public static String findInClasspath(String className, ClassLoader loader) {
  62.     String relPath = className;
  63.     relPath = relPath.replace('.', '/');
  64.     relPath += ".class";
  65.     java.net.URL classUrl = loader.getResource(relPath);
  66.     String codePath;
  67.     if (classUrl != null) {
  68.       boolean inJar = classUrl.getProtocol().equals("jar");
  69.       codePath = classUrl.toString();
  70.       if (codePath.startsWith("jar:")) {
  71.         codePath = codePath.substring("jar:".length());
  72.       }
  73.       if (codePath.startsWith("file:")) { // can have both
  74.         codePath = codePath.substring("file:".length());
  75.       }
  76.       if (inJar) {
  77.         // A jar spec: remove class suffix in /path/my.jar!/package/Class
  78.         int bang = codePath.lastIndexOf('!');
  79.         codePath = codePath.substring(0, bang);
  80.       } else {
  81.         // A class spec: remove the /my/package/Class.class portion
  82.         int pos = codePath.lastIndexOf(relPath);
  83.         if (pos == -1) {
  84.           throw new IllegalArgumentException("invalid codePath: className=" + className
  85.                                              + " codePath=" + codePath);
  86.         }
  87.         codePath = codePath.substring(0, pos);
  88.       }
  89.     } else {
  90.       codePath = null;
  91.     }
  92.     return codePath;
  93.   }
  94.   // copied from TaskRunner  
  95.   static void unJar(File jarFile, File toDir) throws IOException {
  96.     JarFile jar = new JarFile(jarFile);
  97.     try {
  98.       Enumeration entries = jar.entries();
  99.       while (entries.hasMoreElements()) {
  100.         JarEntry entry = (JarEntry) entries.nextElement();
  101.         if (!entry.isDirectory()) {
  102.           InputStream in = jar.getInputStream(entry);
  103.           try {
  104.             File file = new File(toDir, entry.getName());
  105.             file.getParentFile().mkdirs();
  106.             OutputStream out = new FileOutputStream(file);
  107.             try {
  108.               byte[] buffer = new byte[8192];
  109.               int i;
  110.               while ((i = in.read(buffer)) != -1) {
  111.                 out.write(buffer, 0, i);
  112.               }
  113.             } finally {
  114.               out.close();
  115.             }
  116.           } finally {
  117.             in.close();
  118.           }
  119.         }
  120.       }
  121.     } finally {
  122.       jar.close();
  123.     }
  124.   }
  125.   final static long KB = 1024L * 1;
  126.   final static long MB = 1024L * KB;
  127.   final static long GB = 1024L * MB;
  128.   final static long TB = 1024L * GB;
  129.   final static long PB = 1024L * TB;
  130.   static DecimalFormat dfm = new DecimalFormat("####.000");
  131.   static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
  132.   public static String dfmt(double d) {
  133.     return dfm.format(d);
  134.   }
  135.   public static String ifmt(double d) {
  136.     return ifm.format(d);
  137.   }
  138.   public static String formatBytes(long numBytes) {
  139.     StringBuffer buf = new StringBuffer();
  140.     boolean bDetails = true;
  141.     double num = numBytes;
  142.     if (numBytes < KB) {
  143.       buf.append(numBytes).append(" B");
  144.       bDetails = false;
  145.     } else if (numBytes < MB) {
  146.       buf.append(dfmt(num / KB)).append(" KB");
  147.     } else if (numBytes < GB) {
  148.       buf.append(dfmt(num / MB)).append(" MB");
  149.     } else if (numBytes < TB) {
  150.       buf.append(dfmt(num / GB)).append(" GB");
  151.     } else if (numBytes < PB) {
  152.       buf.append(dfmt(num / TB)).append(" TB");
  153.     } else {
  154.       buf.append(dfmt(num / PB)).append(" PB");
  155.     }
  156.     if (bDetails) {
  157.       buf.append(" (").append(ifmt(numBytes)).append(" bytes)");
  158.     }
  159.     return buf.toString();
  160.   }
  161.   public static String formatBytes2(long numBytes) {
  162.     StringBuffer buf = new StringBuffer();
  163.     long u = 0;
  164.     if (numBytes >= TB) {
  165.       u = numBytes / TB;
  166.       numBytes -= u * TB;
  167.       buf.append(u).append(" TB ");
  168.     }
  169.     if (numBytes >= GB) {
  170.       u = numBytes / GB;
  171.       numBytes -= u * GB;
  172.       buf.append(u).append(" GB ");
  173.     }
  174.     if (numBytes >= MB) {
  175.       u = numBytes / MB;
  176.       numBytes -= u * MB;
  177.       buf.append(u).append(" MB ");
  178.     }
  179.     if (numBytes >= KB) {
  180.       u = numBytes / KB;
  181.       numBytes -= u * KB;
  182.       buf.append(u).append(" KB ");
  183.     }
  184.     buf.append(u).append(" B"); //even if zero
  185.     return buf.toString();
  186.   }
  187.   static Environment env;
  188.   static String HOST;
  189.   static {
  190.     try {
  191.       env = new Environment();
  192.       HOST = env.getHost();
  193.     } catch (IOException io) {
  194.       io.printStackTrace();
  195.     }
  196.   }
  197.   static class StreamConsumer extends Thread {
  198.     StreamConsumer(InputStream in, OutputStream out) {
  199.       this.bin = new LineNumberReader(new BufferedReader(new InputStreamReader(in)));
  200.       if (out != null) {
  201.         this.bout = new DataOutputStream(out);
  202.       }
  203.     }
  204.     public void run() {
  205.       try {
  206.         String line;
  207.         while ((line = bin.readLine()) != null) {
  208.           if (bout != null) {
  209.             bout.writeUTF(line); //writeChars
  210.             bout.writeChar('n');
  211.           }
  212.         }
  213.         bout.flush();
  214.       } catch (IOException io) {
  215.       }
  216.     }
  217.     LineNumberReader bin;
  218.     DataOutputStream bout;
  219.   }
  220.   static void exec(String arg, PrintStream log) {
  221.     exec(new String[] { arg }, log);
  222.   }
  223.   static void exec(String[] args, PrintStream log) {
  224.     try {
  225.       log.println("Exec: start: " + Arrays.asList(args));
  226.       Process proc = Runtime.getRuntime().exec(args);
  227.       new StreamConsumer(proc.getErrorStream(), log).start();
  228.       new StreamConsumer(proc.getInputStream(), log).start();
  229.       int status = proc.waitFor();
  230.       //if status != 0
  231.       log.println("Exec: status=" + status + ": " + Arrays.asList(args));
  232.     } catch (InterruptedException in) {
  233.       in.printStackTrace();
  234.     } catch (IOException io) {
  235.       io.printStackTrace();
  236.     }
  237.   }
  238.   static String qualifyHost(String url) {
  239.     try {
  240.       return qualifyHost(new URL(url)).toString();
  241.     } catch (IOException io) {
  242.       return url;
  243.     }
  244.   }
  245.   static URL qualifyHost(URL url) {
  246.     try {
  247.       InetAddress a = InetAddress.getByName(url.getHost());
  248.       String qualHost = a.getCanonicalHostName();
  249.       URL q = new URL(url.getProtocol(), qualHost, url.getPort(), url.getFile());
  250.       return q;
  251.     } catch (IOException io) {
  252.       return url;
  253.     }
  254.   }
  255.   static final String regexpSpecials = "[]()?*+|.!^-\~@";
  256.   public static String regexpEscape(String plain) {
  257.     StringBuffer buf = new StringBuffer();
  258.     char[] ch = plain.toCharArray();
  259.     int csup = ch.length;
  260.     for (int c = 0; c < csup; c++) {
  261.       if (regexpSpecials.indexOf(ch[c]) != -1) {
  262.         buf.append("\");
  263.       }
  264.       buf.append(ch[c]);
  265.     }
  266.     return buf.toString();
  267.   }
  268.   public static String safeGetCanonicalPath(File f) {
  269.     try {
  270.       String s = f.getCanonicalPath();
  271.       return (s == null) ? f.toString() : s;
  272.     } catch (IOException io) {
  273.       return f.toString();
  274.     }
  275.   }
  276.   static String slurp(File f) throws IOException {
  277.     int len = (int) f.length();
  278.     byte[] buf = new byte[len];
  279.     FileInputStream in = new FileInputStream(f);
  280.     String contents = null;
  281.     try {
  282.       in.read(buf, 0, len);
  283.       contents = new String(buf, "UTF-8");
  284.     } finally {
  285.       in.close();
  286.     }
  287.     return contents;
  288.   }
  289.   static String slurpHadoop(Path p, FileSystem fs) throws IOException {
  290.     int len = (int) fs.getLength(p);
  291.     byte[] buf = new byte[len];
  292.     FSDataInputStream in = fs.open(p);
  293.     String contents = null;
  294.     try {
  295.       in.readFully(in.getPos(), buf);
  296.       contents = new String(buf, "UTF-8");
  297.     } finally {
  298.       in.close();
  299.     }
  300.     return contents;
  301.   }
  302.   public static String rjustify(String s, int width) {
  303.     if (s == null) s = "null";
  304.     if (width > s.length()) {
  305.       s = getSpace(width - s.length()) + s;
  306.     }
  307.     return s;
  308.   }
  309.   public static String ljustify(String s, int width) {
  310.     if (s == null) s = "null";
  311.     if (width > s.length()) {
  312.       s = s + getSpace(width - s.length());
  313.     }
  314.     return s;
  315.   }
  316.   static char[] space;
  317.   static {
  318.     space = new char[300];
  319.     Arrays.fill(space, 'u0020');
  320.   }
  321.   public static String getSpace(int len) {
  322.     if (len > space.length) {
  323.       space = new char[Math.max(len, 2 * space.length)];
  324.       Arrays.fill(space, 'u0020');
  325.     }
  326.     return new String(space, 0, len);
  327.   }
  328.   static private Environment env_;
  329.   static Environment env() {
  330.     if (env_ != null) {
  331.       return env_;
  332.     }
  333.     try {
  334.       env_ = new Environment();
  335.     } catch (IOException io) {
  336.       io.printStackTrace();
  337.     }
  338.     return env_;
  339.   }
  340.   public static String makeJavaCommand(Class main, String[] argv) {
  341.     ArrayList vargs = new ArrayList();
  342.     File javaHomeBin = new File(System.getProperty("java.home"), "bin");
  343.     File jvm = new File(javaHomeBin, "java");
  344.     vargs.add(jvm.toString());
  345.     // copy parent classpath
  346.     vargs.add("-classpath");
  347.     vargs.add(""" + System.getProperty("java.class.path") + """);
  348.     // add heap-size limit
  349.     vargs.add("-Xmx" + Runtime.getRuntime().maxMemory());
  350.     // Add main class and its arguments
  351.     vargs.add(main.getName());
  352.     for (int i = 0; i < argv.length; i++) {
  353.       vargs.add(argv[i]);
  354.     }
  355.     return collate(vargs, " ");
  356.   }
  357.   public static String collate(Object[] args, String sep) {
  358.     return collate(Arrays.asList(args), sep);
  359.   }
  360.   public static String collate(List args, String sep) {
  361.     StringBuffer buf = new StringBuffer();
  362.     Iterator it = args.iterator();
  363.     while (it.hasNext()) {
  364.       if (buf.length() > 0) {
  365.         buf.append(" ");
  366.       }
  367.       buf.append(it.next());
  368.     }
  369.     return buf.toString();
  370.   }
  371.   // JobConf helpers
  372.   public static FileSplit getCurrentSplit(JobConf job) {
  373.     String path = job.get("map.input.file");
  374.     if (path == null) {
  375.       return null;
  376.     }
  377.     Path p = new Path(path);
  378.     long start = Long.parseLong(job.get("map.input.start"));
  379.     long length = Long.parseLong(job.get("map.input.length"));
  380.     return new FileSplit(p, start, length, job);
  381.   }
  382.   static class TaskId {
  383.     boolean mapTask;
  384.     String jobid;
  385.     int taskid;
  386.     int execid;
  387.   }
  388.   public static boolean isLocalJobTracker(JobConf job) {
  389.     return job.get("mapred.job.tracker", "local").equals("local");
  390.   }
  391.   public static TaskId getTaskInfo(JobConf job) {
  392.     TaskId res = new TaskId();
  393.     String id = job.get("mapred.task.id");
  394.     if (isLocalJobTracker(job)) {
  395.       // it uses difft naming 
  396.       res.mapTask = job.getBoolean("mapred.task.is.map", true);
  397.       res.jobid = "0";
  398.       res.taskid = 0;
  399.       res.execid = 0;
  400.     } else {
  401.       String[] e = id.split("_");
  402.       res.mapTask = e[3].equals("m");
  403.       res.jobid = e[1] + "_" + e[2];
  404.       res.taskid = Integer.parseInt(e[4]);
  405.       res.execid = Integer.parseInt(e[5]);
  406.     }
  407.     return res;
  408.   }
  409.   public static void touch(File file) throws IOException {
  410.     file = file.getAbsoluteFile();
  411.     FileOutputStream out = new FileOutputStream(file);
  412.     out.close();
  413.     if (!file.exists()) {
  414.       throw new IOException("touch failed: " + file);
  415.     }
  416.   }
  417.   public static boolean isCygwin() {
  418.     String OS = System.getProperty("os.name");
  419.     return (OS.indexOf("Windows") > -1);
  420.   }
  421.   public static String localizeBin(String path) {
  422.     if (isCygwin()) {
  423.       path = "C:/cygwin/" + path;
  424.     }
  425.     return path;
  426.   }
  427.   
  428.   /** @param name foo where &lt;junit>&lt;sysproperty key="foo" value="${foo}"/> 
  429.    * If foo is undefined then Ant sets the unevaluated value. 
  430.    * Take this into account when setting defaultVal. */
  431.   public static String getBoundAntProperty(String name, String defaultVal)
  432.   {
  433.     String val = System.getProperty(name);
  434.     if (val != null && val.indexOf("${") >= 0) {
  435.       val = null;
  436.     }
  437.     if (val == null) {
  438.       val = defaultVal;
  439.     }
  440.     return val;
  441.   }
  442. }