WritableUtils.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:13k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io;
  19. import java.io.*;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.util.ReflectionUtils;
  22. import java.util.zip.GZIPInputStream;
  23. import java.util.zip.GZIPOutputStream;
  24. public final class WritableUtils  {
  25.   public static byte[] readCompressedByteArray(DataInput in) throws IOException {
  26.     int length = in.readInt();
  27.     if (length == -1) return null;
  28.     byte[] buffer = new byte[length];
  29.     in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
  30.     GZIPInputStream gzi = new GZIPInputStream(new ByteArrayInputStream(buffer, 0, buffer.length));
  31.     byte[] outbuf = new byte[length];
  32.     ByteArrayOutputStream bos =  new ByteArrayOutputStream();
  33.     int len;
  34.     while((len=gzi.read(outbuf, 0, outbuf.length)) != -1){
  35.       bos.write(outbuf, 0, len);
  36.     }
  37.     byte[] decompressed =  bos.toByteArray();
  38.     bos.close();
  39.     gzi.close();
  40.     return decompressed;
  41.   }
  42.   public static void skipCompressedByteArray(DataInput in) throws IOException {
  43.     int length = in.readInt();
  44.     if (length != -1) {
  45.       skipFully(in, length);
  46.     }
  47.   }
  48.   public static int  writeCompressedByteArray(DataOutput out, byte[] bytes) throws IOException {
  49.     if (bytes != null) {
  50.       ByteArrayOutputStream bos =  new ByteArrayOutputStream();
  51.       GZIPOutputStream gzout = new GZIPOutputStream(bos);
  52.       gzout.write(bytes, 0, bytes.length);
  53.       gzout.close();
  54.       byte[] buffer = bos.toByteArray();
  55.       int len = buffer.length;
  56.       out.writeInt(len);
  57.       out.write(buffer, 0, len);
  58.       /* debug only! Once we have confidence, can lose this. */
  59.       return ((bytes.length != 0) ? (100*buffer.length)/bytes.length : 0);
  60.     } else {
  61.       out.writeInt(-1);
  62.       return -1;
  63.     }
  64.   }
  65.   /* Ugly utility, maybe someone else can do this better  */
  66.   public static String readCompressedString(DataInput in) throws IOException {
  67.     byte[] bytes = readCompressedByteArray(in);
  68.     if (bytes == null) return null;
  69.     return new String(bytes, "UTF-8");
  70.   }
  71.   public static int  writeCompressedString(DataOutput out, String s) throws IOException {
  72.     return writeCompressedByteArray(out, (s != null) ? s.getBytes("UTF-8") : null);
  73.   }
  74.   /*
  75.    *
  76.    * Write a String as a Network Int n, followed by n Bytes
  77.    * Alternative to 16 bit read/writeUTF.
  78.    * Encoding standard is... ?
  79.    * 
  80.    */
  81.   public static void writeString(DataOutput out, String s) throws IOException {
  82.     if (s != null) {
  83.       byte[] buffer = s.getBytes("UTF-8");
  84.       int len = buffer.length;
  85.       out.writeInt(len);
  86.       out.write(buffer, 0, len);
  87.     } else {
  88.       out.writeInt(-1);
  89.     }
  90.   }
  91.   /*
  92.    * Read a String as a Network Int n, followed by n Bytes
  93.    * Alternative to 16 bit read/writeUTF.
  94.    * Encoding standard is... ?
  95.    *
  96.    */
  97.   public static String readString(DataInput in) throws IOException{
  98.     int length = in.readInt();
  99.     if (length == -1) return null;
  100.     byte[] buffer = new byte[length];
  101.     in.readFully(buffer);      // could/should use readFully(buffer,0,length)?
  102.     return new String(buffer,"UTF-8");  
  103.   }
  104.   /*
  105.    * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  106.    * Could be generalised using introspection.
  107.    *
  108.    */
  109.   public static void writeStringArray(DataOutput out, String[] s) throws IOException{
  110.     out.writeInt(s.length);
  111.     for(int i = 0; i < s.length; i++) {
  112.       writeString(out, s[i]);
  113.     }
  114.   }
  115.   /*
  116.    * Write a String array as a Nework Int N, followed by Int N Byte Array of
  117.    * compressed Strings. Handles also null arrays and null values.
  118.    * Could be generalised using introspection.
  119.    *
  120.    */
  121.   public static void writeCompressedStringArray(DataOutput out, String[] s) throws IOException{
  122.     if (s == null) {
  123.       out.writeInt(-1);
  124.       return;
  125.     }
  126.     out.writeInt(s.length);
  127.     for(int i = 0; i < s.length; i++) {
  128.       writeCompressedString(out, s[i]);
  129.     }
  130.   }
  131.   /*
  132.    * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  133.    * Could be generalised using introspection. Actually this bit couldn't...
  134.    *
  135.    */
  136.   public static String[] readStringArray(DataInput in) throws IOException {
  137.     int len = in.readInt();
  138.     if (len == -1) return null;
  139.     String[] s = new String[len];
  140.     for(int i = 0; i < len; i++) {
  141.       s[i] = readString(in);
  142.     }
  143.     return s;
  144.   }
  145.   /*
  146.    * Write a String array as a Nework Int N, followed by Int N Byte Array Strings.
  147.    * Could be generalised using introspection. Handles null arrays and null values.
  148.    *
  149.    */
  150.   public static  String[] readCompressedStringArray(DataInput in) throws IOException {
  151.     int len = in.readInt();
  152.     if (len == -1) return null;
  153.     String[] s = new String[len];
  154.     for(int i = 0; i < len; i++) {
  155.       s[i] = readCompressedString(in);
  156.     }
  157.     return s;
  158.   }
  159.   /*
  160.    *
  161.    * Test Utility Method Display Byte Array. 
  162.    *
  163.    */
  164.   public static void displayByteArray(byte[] record){
  165.     int i;
  166.     for(i=0;i < record.length -1; i++){
  167.       if (i % 16 == 0) { System.out.println(); }
  168.       System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
  169.       System.out.print(Integer.toHexString(record[i] & 0x0F));
  170.       System.out.print(",");
  171.     }
  172.     System.out.print(Integer.toHexString(record[i]  >> 4 & 0x0F));
  173.     System.out.print(Integer.toHexString(record[i] & 0x0F));
  174.     System.out.println();
  175.   }
  176.   /**
  177.    * Make a copy of a writable object using serialization to a buffer.
  178.    * @param orig The object to copy
  179.    * @return The copied object
  180.    */
  181.   public static <T extends Writable> T clone(T orig, Configuration conf) {
  182.     try {
  183.       @SuppressWarnings("unchecked") // Unchecked cast from Class to Class<T>
  184.       T newInst = ReflectionUtils.newInstance((Class<T>) orig.getClass(), conf);
  185.       ReflectionUtils.copy(conf, orig, newInst);
  186.       return newInst;
  187.     } catch (IOException e) {
  188.       throw new RuntimeException("Error writing/reading clone buffer", e);
  189.     }
  190.   }
  191.   /**
  192.    * Make a copy of the writable object using serialiation to a buffer
  193.    * @param dst the object to copy from
  194.    * @param src the object to copy into, which is destroyed
  195.    * @throws IOException
  196.    * @deprecated use ReflectionUtils.cloneInto instead.
  197.    */
  198.   @Deprecated
  199.   public static void cloneInto(Writable dst, Writable src) throws IOException {
  200.     ReflectionUtils.cloneWritableInto(dst, src);
  201.   }
  202.   /**
  203.    * Serializes an integer to a binary stream with zero-compressed encoding.
  204.    * For -120 <= i <= 127, only one byte is used with the actual value.
  205.    * For other values of i, the first byte value indicates whether the
  206.    * integer is positive or negative, and the number of bytes that follow.
  207.    * If the first byte value v is between -121 and -124, the following integer
  208.    * is positive, with number of bytes that follow are -(v+120).
  209.    * If the first byte value v is between -125 and -128, the following integer
  210.    * is negative, with number of bytes that follow are -(v+124). Bytes are
  211.    * stored in the high-non-zero-byte-first order.
  212.    *
  213.    * @param stream Binary output stream
  214.    * @param i Integer to be serialized
  215.    * @throws java.io.IOException 
  216.    */
  217.   public static void writeVInt(DataOutput stream, int i) throws IOException {
  218.     writeVLong(stream, i);
  219.   }
  220.   
  221.   /**
  222.    * Serializes a long to a binary stream with zero-compressed encoding.
  223.    * For -112 <= i <= 127, only one byte is used with the actual value.
  224.    * For other values of i, the first byte value indicates whether the
  225.    * long is positive or negative, and the number of bytes that follow.
  226.    * If the first byte value v is between -113 and -120, the following long
  227.    * is positive, with number of bytes that follow are -(v+112).
  228.    * If the first byte value v is between -121 and -128, the following long
  229.    * is negative, with number of bytes that follow are -(v+120). Bytes are
  230.    * stored in the high-non-zero-byte-first order.
  231.    * 
  232.    * @param stream Binary output stream
  233.    * @param i Long to be serialized
  234.    * @throws java.io.IOException 
  235.    */
  236.   public static void writeVLong(DataOutput stream, long i) throws IOException {
  237.     if (i >= -112 && i <= 127) {
  238.       stream.writeByte((byte)i);
  239.       return;
  240.     }
  241.       
  242.     int len = -112;
  243.     if (i < 0) {
  244.       i ^= -1L; // take one's complement'
  245.       len = -120;
  246.     }
  247.       
  248.     long tmp = i;
  249.     while (tmp != 0) {
  250.       tmp = tmp >> 8;
  251.       len--;
  252.     }
  253.       
  254.     stream.writeByte((byte)len);
  255.       
  256.     len = (len < -120) ? -(len + 120) : -(len + 112);
  257.       
  258.     for (int idx = len; idx != 0; idx--) {
  259.       int shiftbits = (idx - 1) * 8;
  260.       long mask = 0xFFL << shiftbits;
  261.       stream.writeByte((byte)((i & mask) >> shiftbits));
  262.     }
  263.   }
  264.   
  265.   /**
  266.    * Reads a zero-compressed encoded long from input stream and returns it.
  267.    * @param stream Binary input stream
  268.    * @throws java.io.IOException 
  269.    * @return deserialized long from stream.
  270.    */
  271.   public static long readVLong(DataInput stream) throws IOException {
  272.     byte firstByte = stream.readByte();
  273.     int len = decodeVIntSize(firstByte);
  274.     if (len == 1) {
  275.       return firstByte;
  276.     }
  277.     long i = 0;
  278.     for (int idx = 0; idx < len-1; idx++) {
  279.       byte b = stream.readByte();
  280.       i = i << 8;
  281.       i = i | (b & 0xFF);
  282.     }
  283.     return (isNegativeVInt(firstByte) ? (i ^ -1L) : i);
  284.   }
  285.   /**
  286.    * Reads a zero-compressed encoded integer from input stream and returns it.
  287.    * @param stream Binary input stream
  288.    * @throws java.io.IOException 
  289.    * @return deserialized integer from stream.
  290.    */
  291.   public static int readVInt(DataInput stream) throws IOException {
  292.     return (int) readVLong(stream);
  293.   }
  294.  
  295.   /**
  296.    * Given the first byte of a vint/vlong, determine the sign
  297.    * @param value the first byte
  298.    * @return is the value negative
  299.    */
  300.   public static boolean isNegativeVInt(byte value) {
  301.     return value < -120 || (value >= -112 && value < 0);
  302.   }
  303.   /**
  304.    * Parse the first byte of a vint/vlong to determine the number of bytes
  305.    * @param value the first byte of the vint/vlong
  306.    * @return the total number of bytes (1 to 9)
  307.    */
  308.   public static int decodeVIntSize(byte value) {
  309.     if (value >= -112) {
  310.       return 1;
  311.     } else if (value < -120) {
  312.       return -119 - value;
  313.     }
  314.     return -111 - value;
  315.   }
  316.   /**
  317.    * Get the encoded length if an integer is stored in a variable-length format
  318.    * @return the encoded length 
  319.    */
  320.   public static int getVIntSize(long i) {
  321.     if (i >= -112 && i <= 127) {
  322.       return 1;
  323.     }
  324.       
  325.     if (i < 0) {
  326.       i ^= -1L; // take one's complement'
  327.     }
  328.     // find the number of bytes with non-leading zeros
  329.     int dataBits = Long.SIZE - Long.numberOfLeadingZeros(i);
  330.     // find the number of data bytes + length byte
  331.     return (dataBits + 7) / 8 + 1;
  332.   }
  333.   /**
  334.    * Read an Enum value from DataInput, Enums are read and written 
  335.    * using String values. 
  336.    * @param <T> Enum type
  337.    * @param in DataInput to read from 
  338.    * @param enumType Class type of Enum
  339.    * @return Enum represented by String read from DataInput
  340.    * @throws IOException
  341.    */
  342.   public static <T extends Enum<T>> T readEnum(DataInput in, Class<T> enumType)
  343.     throws IOException{
  344.     return T.valueOf(enumType, Text.readString(in));
  345.   }
  346.   /**
  347.    * writes String value of enum to DataOutput. 
  348.    * @param out Dataoutput stream
  349.    * @param enumVal enum value
  350.    * @throws IOException
  351.    */
  352.   public static void writeEnum(DataOutput out,  Enum<?> enumVal) 
  353.     throws IOException{
  354.     Text.writeString(out, enumVal.name()); 
  355.   }
  356.   /**
  357.    * Skip <i>len</i> number of bytes in input stream<i>in</i>
  358.    * @param in input stream
  359.    * @param len number of bytes to skip
  360.    * @throws IOException when skipped less number of bytes
  361.    */
  362.   public static void skipFully(DataInput in, int len) throws IOException {
  363.     int total = 0;
  364.     int cur = 0;
  365.     while ((total<len) && ((cur = in.skipBytes(len-total)) > 0)) {
  366.         total += cur;
  367.     }
  368.     if (total<len) {
  369.       throw new IOException("Not able to skip " + len + " bytes, possibly " +
  370.                             "due to end of input.");
  371.     }
  372.   }
  373.   /** Convert writables to a byte array */
  374.   public static byte[] toByteArray(Writable... writables) {
  375.     final DataOutputBuffer out = new DataOutputBuffer();
  376.     try {
  377.       for(Writable w : writables) {
  378.         w.write(out);
  379.       }
  380.       out.close();
  381.     } catch (IOException e) {
  382.       throw new RuntimeException("Fail to convert writables to a byte array",e);
  383.     }
  384.     return out.getData();
  385.   }
  386. }