TextOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapreduce.lib.output;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.io.UnsupportedEncodingException;
  22. import org.apache.hadoop.conf.Configuration;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.io.NullWritable;
  27. import org.apache.hadoop.io.Text;
  28. import org.apache.hadoop.io.compress.CompressionCodec;
  29. import org.apache.hadoop.io.compress.GzipCodec;
  30. import org.apache.hadoop.mapreduce.OutputFormat;
  31. import org.apache.hadoop.mapreduce.RecordWriter;
  32. import org.apache.hadoop.mapreduce.TaskAttemptContext;
  33. import org.apache.hadoop.util.*;
  34. /** An {@link OutputFormat} that writes plain text files. */
  35. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  36.   protected static class LineRecordWriter<K, V>
  37.     extends RecordWriter<K, V> {
  38.     private static final String utf8 = "UTF-8";
  39.     private static final byte[] newline;
  40.     static {
  41.       try {
  42.         newline = "n".getBytes(utf8);
  43.       } catch (UnsupportedEncodingException uee) {
  44.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  45.       }
  46.     }
  47.     protected DataOutputStream out;
  48.     private final byte[] keyValueSeparator;
  49.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  50.       this.out = out;
  51.       try {
  52.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  53.       } catch (UnsupportedEncodingException uee) {
  54.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  55.       }
  56.     }
  57.     public LineRecordWriter(DataOutputStream out) {
  58.       this(out, "t");
  59.     }
  60.     /**
  61.      * Write the object to the byte stream, handling Text as a special
  62.      * case.
  63.      * @param o the object to print
  64.      * @throws IOException if the write throws, we pass it on
  65.      */
  66.     private void writeObject(Object o) throws IOException {
  67.       if (o instanceof Text) {
  68.         Text to = (Text) o;
  69.         out.write(to.getBytes(), 0, to.getLength());
  70.       } else {
  71.         out.write(o.toString().getBytes(utf8));
  72.       }
  73.     }
  74.     public synchronized void write(K key, V value)
  75.       throws IOException {
  76.       boolean nullKey = key == null || key instanceof NullWritable;
  77.       boolean nullValue = value == null || value instanceof NullWritable;
  78.       if (nullKey && nullValue) {
  79.         return;
  80.       }
  81.       if (!nullKey) {
  82.         writeObject(key);
  83.       }
  84.       if (!(nullKey || nullValue)) {
  85.         out.write(keyValueSeparator);
  86.       }
  87.       if (!nullValue) {
  88.         writeObject(value);
  89.       }
  90.       out.write(newline);
  91.     }
  92.     public synchronized 
  93.     void close(TaskAttemptContext context) throws IOException {
  94.       out.close();
  95.     }
  96.   }
  97.   public RecordWriter<K, V> 
  98.          getRecordWriter(TaskAttemptContext job
  99.                          ) throws IOException, InterruptedException {
  100.     Configuration conf = job.getConfiguration();
  101.     boolean isCompressed = getCompressOutput(job);
  102.     String keyValueSeparator= conf.get("mapred.textoutputformat.separator",
  103.                                        "t");
  104.     CompressionCodec codec = null;
  105.     String extension = "";
  106.     if (isCompressed) {
  107.       Class<? extends CompressionCodec> codecClass = 
  108.         getOutputCompressorClass(job, GzipCodec.class);
  109.       codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
  110.       extension = codec.getDefaultExtension();
  111.     }
  112.     Path file = getDefaultWorkFile(job, extension);
  113.     FileSystem fs = file.getFileSystem(conf);
  114.     if (!isCompressed) {
  115.       FSDataOutputStream fileOut = fs.create(file, false);
  116.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  117.     } else {
  118.       FSDataOutputStream fileOut = fs.create(file, false);
  119.       return new LineRecordWriter<K, V>(new DataOutputStream
  120.                                         (codec.createOutputStream(fileOut)),
  121.                                         keyValueSeparator);
  122.     }
  123.   }
  124. }