SequenceFileAsBinaryOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.io.DataOutputStream;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.WritableComparable;
  24. import org.apache.hadoop.io.Writable;
  25. import org.apache.hadoop.io.BytesWritable;
  26. import org.apache.hadoop.io.SequenceFile;
  27. import org.apache.hadoop.io.SequenceFile.CompressionType;
  28. import org.apache.hadoop.io.SequenceFile.ValueBytes;
  29. import org.apache.hadoop.io.compress.CompressionCodec;
  30. import org.apache.hadoop.io.compress.DefaultCodec;
  31. import org.apache.hadoop.conf.Configuration;
  32. import org.apache.hadoop.util.ReflectionUtils;
  33. import org.apache.hadoop.util.Progressable;
  34. /** 
  35.  * An {@link OutputFormat} that writes keys, values to 
  36.  * {@link SequenceFile}s in binary(raw) format
  37.  */
  38. public class SequenceFileAsBinaryOutputFormat 
  39.  extends SequenceFileOutputFormat <BytesWritable,BytesWritable> {
  40.   /** 
  41.    * Inner class used for appendRaw
  42.    */
  43.   static protected class WritableValueBytes implements ValueBytes {
  44.     private BytesWritable value;
  45.     public WritableValueBytes() {
  46.       this.value = null;
  47.     }
  48.     public WritableValueBytes(BytesWritable value) {
  49.       this.value = value;
  50.     }
  51.     public void reset(BytesWritable value) {
  52.       this.value = value;
  53.     }
  54.     public void writeUncompressedBytes(DataOutputStream outStream)
  55.       throws IOException {
  56.       outStream.write(value.getBytes(), 0, value.getLength());
  57.     }
  58.     public void writeCompressedBytes(DataOutputStream outStream)
  59.       throws IllegalArgumentException, IOException {
  60.       throw
  61.         new UnsupportedOperationException("WritableValueBytes doesn't support " 
  62.                                           + "RECORD compression"); 
  63.     }
  64.     public int getSize(){
  65.       return value.getLength();
  66.     }
  67.   }
  68.   /**
  69.    * Set the key class for the {@link SequenceFile}
  70.    * <p>This allows the user to specify the key class to be different 
  71.    * from the actual class ({@link BytesWritable}) used for writing </p>
  72.    * 
  73.    * @param conf the {@link JobConf} to modify
  74.    * @param theClass the SequenceFile output key class.
  75.    */
  76.   static public void setSequenceFileOutputKeyClass(JobConf conf, 
  77.                                                    Class<?> theClass) {
  78.     conf.setClass("mapred.seqbinary.output.key.class", theClass, Object.class);
  79.   }
  80.   /**
  81.    * Set the value class for the {@link SequenceFile}
  82.    * <p>This allows the user to specify the value class to be different 
  83.    * from the actual class ({@link BytesWritable}) used for writing </p>
  84.    * 
  85.    * @param conf the {@link JobConf} to modify
  86.    * @param theClass the SequenceFile output key class.
  87.    */
  88.   static public void setSequenceFileOutputValueClass(JobConf conf, 
  89.                                                      Class<?> theClass) {
  90.     conf.setClass("mapred.seqbinary.output.value.class", 
  91.                   theClass, Object.class);
  92.   }
  93.   /**
  94.    * Get the key class for the {@link SequenceFile}
  95.    * 
  96.    * @return the key class of the {@link SequenceFile}
  97.    */
  98.   static public Class<? extends WritableComparable> getSequenceFileOutputKeyClass(JobConf conf) { 
  99.     return conf.getClass("mapred.seqbinary.output.key.class", 
  100.                          conf.getOutputKeyClass().asSubclass(WritableComparable.class),
  101.                          WritableComparable.class);
  102.   }
  103.   /**
  104.    * Get the value class for the {@link SequenceFile}
  105.    * 
  106.    * @return the value class of the {@link SequenceFile}
  107.    */
  108.   static public Class<? extends Writable> getSequenceFileOutputValueClass(JobConf conf) { 
  109.     return conf.getClass("mapred.seqbinary.output.value.class", 
  110.                          conf.getOutputValueClass().asSubclass(Writable.class),
  111.                          Writable.class);
  112.   }
  113.   
  114.   @Override 
  115.   public RecordWriter <BytesWritable, BytesWritable> 
  116.              getRecordWriter(FileSystem ignored, JobConf job,
  117.                              String name, Progressable progress)
  118.     throws IOException {
  119.     // get the path of the temporary output file 
  120.     Path file = FileOutputFormat.getTaskOutputPath(job, name);
  121.     
  122.     FileSystem fs = file.getFileSystem(job);
  123.     CompressionCodec codec = null;
  124.     CompressionType compressionType = CompressionType.NONE;
  125.     if (getCompressOutput(job)) {
  126.       // find the kind of compression to do
  127.       compressionType = getOutputCompressionType(job);
  128.       // find the right codec
  129.       Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(job,
  130.   DefaultCodec.class);
  131.       codec = ReflectionUtils.newInstance(codecClass, job);
  132.     }
  133.     final SequenceFile.Writer out = 
  134.       SequenceFile.createWriter(fs, job, file,
  135.                     getSequenceFileOutputKeyClass(job),
  136.                     getSequenceFileOutputValueClass(job),
  137.                     compressionType,
  138.                     codec,
  139.                     progress);
  140.     return new RecordWriter<BytesWritable, BytesWritable>() {
  141.         
  142.         private WritableValueBytes wvaluebytes = new WritableValueBytes();
  143.         public void write(BytesWritable bkey, BytesWritable bvalue)
  144.           throws IOException {
  145.           wvaluebytes.reset(bvalue);
  146.           out.appendRaw(bkey.getBytes(), 0, bkey.getLength(), wvaluebytes);
  147.           wvaluebytes.reset(null);
  148.         }
  149.         public void close(Reporter reporter) throws IOException { 
  150.           out.close();
  151.         }
  152.       };
  153.   }
  154.   @Override 
  155.   public void checkOutputSpecs(FileSystem ignored, JobConf job) 
  156.             throws IOException {
  157.     super.checkOutputSpecs(ignored, job);
  158.     if (getCompressOutput(job) && 
  159.         getOutputCompressionType(job) == CompressionType.RECORD ){
  160.         throw new InvalidJobConfException("SequenceFileAsBinaryOutputFormat "
  161.                     + "doesn't support Record Compression" );
  162.     }
  163.   }
  164. }