CompressionCodecFactory.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /*
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.io.compress;
  19. import java.util.*;
  20. import org.apache.commons.logging.Log;
  21. import org.apache.commons.logging.LogFactory;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.util.ReflectionUtils;
  25. /**
  26.  * A factory that will find the correct codec for a given filename.
  27.  */
  28. public class CompressionCodecFactory {
  29.   public static final Log LOG =
  30.     LogFactory.getLog(CompressionCodecFactory.class.getName());
  31.   /**
  32.    * A map from the reversed filename suffixes to the codecs.
  33.    * This is probably overkill, because the maps should be small, but it 
  34.    * automatically supports finding the longest matching suffix. 
  35.    */
  36.   private SortedMap<String, CompressionCodec> codecs = null;
  37.   
  38.   private void addCodec(CompressionCodec codec) {
  39.     String suffix = codec.getDefaultExtension();
  40.     codecs.put(new StringBuffer(suffix).reverse().toString(), codec);
  41.   }
  42.   
  43.   /**
  44.    * Print the extension map out as a string.
  45.    */
  46.   public String toString() {
  47.     StringBuffer buf = new StringBuffer();
  48.     Iterator<Map.Entry<String, CompressionCodec>> itr = 
  49.       codecs.entrySet().iterator();
  50.     buf.append("{ ");
  51.     if (itr.hasNext()) {
  52.       Map.Entry<String, CompressionCodec> entry = itr.next();
  53.       buf.append(entry.getKey());
  54.       buf.append(": ");
  55.       buf.append(entry.getValue().getClass().getName());
  56.       while (itr.hasNext()) {
  57.         entry = itr.next();
  58.         buf.append(", ");
  59.         buf.append(entry.getKey());
  60.         buf.append(": ");
  61.         buf.append(entry.getValue().getClass().getName());
  62.       }
  63.     }
  64.     buf.append(" }");
  65.     return buf.toString();
  66.   }
  67.   /**
  68.    * Get the list of codecs listed in the configuration
  69.    * @param conf the configuration to look in
  70.    * @return a list of the Configuration classes or null if the attribute
  71.    *         was not set
  72.    */
  73.   public static List<Class<? extends CompressionCodec>> getCodecClasses(Configuration conf) {
  74.     String codecsString = conf.get("io.compression.codecs");
  75.     if (codecsString != null) {
  76.       List<Class<? extends CompressionCodec>> result
  77.         = new ArrayList<Class<? extends CompressionCodec>>();
  78.       StringTokenizer codecSplit = new StringTokenizer(codecsString, ",");
  79.       while (codecSplit.hasMoreElements()) {
  80.         String codecSubstring = codecSplit.nextToken();
  81.         if (codecSubstring.length() != 0) {
  82.           try {
  83.             Class<?> cls = conf.getClassByName(codecSubstring);
  84.             if (!CompressionCodec.class.isAssignableFrom(cls)) {
  85.               throw new IllegalArgumentException("Class " + codecSubstring +
  86.                                                  " is not a CompressionCodec");
  87.             }
  88.             result.add(cls.asSubclass(CompressionCodec.class));
  89.           } catch (ClassNotFoundException ex) {
  90.             throw new IllegalArgumentException("Compression codec " + 
  91.                                                codecSubstring + " not found.",
  92.                                                ex);
  93.           }
  94.         }
  95.       }
  96.       return result;
  97.     } else {
  98.       return null;
  99.     }
  100.   }
  101.   
  102.   /**
  103.    * Sets a list of codec classes in the configuration.
  104.    * @param conf the configuration to modify
  105.    * @param classes the list of classes to set
  106.    */
  107.   public static void setCodecClasses(Configuration conf,
  108.                                      List<Class> classes) {
  109.     StringBuffer buf = new StringBuffer();
  110.     Iterator<Class> itr = classes.iterator();
  111.     if (itr.hasNext()) {
  112.       Class cls = itr.next();
  113.       buf.append(cls.getName());
  114.       while(itr.hasNext()) {
  115.         buf.append(',');
  116.         buf.append(itr.next().getName());
  117.       }
  118.     }
  119.     conf.set("io.compression.codecs", buf.toString());   
  120.   }
  121.   
  122.   /**
  123.    * Find the codecs specified in the config value io.compression.codecs 
  124.    * and register them. Defaults to gzip and zip.
  125.    */
  126.   public CompressionCodecFactory(Configuration conf) {
  127.     codecs = new TreeMap<String, CompressionCodec>();
  128.     List<Class<? extends CompressionCodec>> codecClasses = getCodecClasses(conf);
  129.     if (codecClasses == null) {
  130.       addCodec(new GzipCodec());
  131.       addCodec(new DefaultCodec());      
  132.     } else {
  133.       Iterator<Class<? extends CompressionCodec>> itr = codecClasses.iterator();
  134.       while (itr.hasNext()) {
  135.         CompressionCodec codec = ReflectionUtils.newInstance(itr.next(), conf);
  136.         addCodec(codec);     
  137.       }
  138.     }
  139.   }
  140.   
  141.   /**
  142.    * Find the relevant compression codec for the given file based on its
  143.    * filename suffix.
  144.    * @param file the filename to check
  145.    * @return the codec object
  146.    */
  147.   public CompressionCodec getCodec(Path file) {
  148.     CompressionCodec result = null;
  149.     if (codecs != null) {
  150.       String filename = file.getName();
  151.       String reversedFilename = new StringBuffer(filename).reverse().toString();
  152.       SortedMap<String, CompressionCodec> subMap = 
  153.         codecs.headMap(reversedFilename);
  154.       if (!subMap.isEmpty()) {
  155.         String potentialSuffix = subMap.lastKey();
  156.         if (reversedFilename.startsWith(potentialSuffix)) {
  157.           result = codecs.get(potentialSuffix);
  158.         }
  159.       }
  160.     }
  161.     return result;
  162.   }
  163.   
  164.   /**
  165.    * Removes a suffix from a filename, if it has it.
  166.    * @param filename the filename to strip
  167.    * @param suffix the suffix to remove
  168.    * @return the shortened filename
  169.    */
  170.   public static String removeSuffix(String filename, String suffix) {
  171.     if (filename.endsWith(suffix)) {
  172.       return filename.substring(0, filename.length() - suffix.length());
  173.     }
  174.     return filename;
  175.   }
  176.   
  177.   /**
  178.    * A little test program.
  179.    * @param args
  180.    */
  181.   public static void main(String[] args) throws Exception {
  182.     Configuration conf = new Configuration();
  183.     CompressionCodecFactory factory = new CompressionCodecFactory(conf);
  184.     boolean encode = false;
  185.     for(int i=0; i < args.length; ++i) {
  186.       if ("-in".equals(args[i])) {
  187.         encode = true;
  188.       } else if ("-out".equals(args[i])) {
  189.         encode = false;
  190.       } else {
  191.         CompressionCodec codec = factory.getCodec(new Path(args[i]));
  192.         if (codec == null) {
  193.           System.out.println("Codec for " + args[i] + " not found.");
  194.         } else { 
  195.           if (encode) {
  196.             CompressionOutputStream out = 
  197.               codec.createOutputStream(new java.io.FileOutputStream(args[i]));
  198.             byte[] buffer = new byte[100];
  199.             String inFilename = removeSuffix(args[i], 
  200.                                              codec.getDefaultExtension());
  201.             java.io.InputStream in = new java.io.FileInputStream(inFilename);
  202.             int len = in.read(buffer);
  203.             while (len > 0) {
  204.               out.write(buffer, 0, len);
  205.               len = in.read(buffer);
  206.             }
  207.             in.close();
  208.             out.close();
  209.           } else {
  210.             CompressionInputStream in = 
  211.               codec.createInputStream(new java.io.FileInputStream(args[i]));
  212.             byte[] buffer = new byte[100];
  213.             int len = in.read(buffer);
  214.             while (len > 0) {
  215.               System.out.write(buffer, 0, len);
  216.               len = in.read(buffer);
  217.             }
  218.             in.close();
  219.           }
  220.         }
  221.       }
  222.     }
  223.   }
  224. }