TextOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataOutputStream;
  20. import java.io.IOException;
  21. import java.io.UnsupportedEncodingException;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.fs.FSDataOutputStream;
  25. import org.apache.hadoop.io.NullWritable;
  26. import org.apache.hadoop.io.Text;
  27. import org.apache.hadoop.io.compress.CompressionCodec;
  28. import org.apache.hadoop.io.compress.GzipCodec;
  29. import org.apache.hadoop.util.*;
  30. /** An {@link OutputFormat} that writes plain text files. 
  31.  * @deprecated Use 
  32.  *   {@link org.apache.hadoop.mapreduce.lib.output.TextOutputFormat} instead.
  33.  */
  34. @Deprecated
  35. public class TextOutputFormat<K, V> extends FileOutputFormat<K, V> {
  36.   protected static class LineRecordWriter<K, V>
  37.     implements RecordWriter<K, V> {
  38.     private static final String utf8 = "UTF-8";
  39.     private static final byte[] newline;
  40.     static {
  41.       try {
  42.         newline = "n".getBytes(utf8);
  43.       } catch (UnsupportedEncodingException uee) {
  44.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  45.       }
  46.     }
  47.     protected DataOutputStream out;
  48.     private final byte[] keyValueSeparator;
  49.     public LineRecordWriter(DataOutputStream out, String keyValueSeparator) {
  50.       this.out = out;
  51.       try {
  52.         this.keyValueSeparator = keyValueSeparator.getBytes(utf8);
  53.       } catch (UnsupportedEncodingException uee) {
  54.         throw new IllegalArgumentException("can't find " + utf8 + " encoding");
  55.       }
  56.     }
  57.     public LineRecordWriter(DataOutputStream out) {
  58.       this(out, "t");
  59.     }
  60.     /**
  61.      * Write the object to the byte stream, handling Text as a special
  62.      * case.
  63.      * @param o the object to print
  64.      * @throws IOException if the write throws, we pass it on
  65.      */
  66.     private void writeObject(Object o) throws IOException {
  67.       if (o instanceof Text) {
  68.         Text to = (Text) o;
  69.         out.write(to.getBytes(), 0, to.getLength());
  70.       } else {
  71.         out.write(o.toString().getBytes(utf8));
  72.       }
  73.     }
  74.     public synchronized void write(K key, V value)
  75.       throws IOException {
  76.       boolean nullKey = key == null || key instanceof NullWritable;
  77.       boolean nullValue = value == null || value instanceof NullWritable;
  78.       if (nullKey && nullValue) {
  79.         return;
  80.       }
  81.       if (!nullKey) {
  82.         writeObject(key);
  83.       }
  84.       if (!(nullKey || nullValue)) {
  85.         out.write(keyValueSeparator);
  86.       }
  87.       if (!nullValue) {
  88.         writeObject(value);
  89.       }
  90.       out.write(newline);
  91.     }
  92.     public synchronized void close(Reporter reporter) throws IOException {
  93.       out.close();
  94.     }
  95.   }
  96.   public RecordWriter<K, V> getRecordWriter(FileSystem ignored,
  97.                                                   JobConf job,
  98.                                                   String name,
  99.                                                   Progressable progress)
  100.     throws IOException {
  101.     boolean isCompressed = getCompressOutput(job);
  102.     String keyValueSeparator = job.get("mapred.textoutputformat.separator", 
  103.                                        "t");
  104.     if (!isCompressed) {
  105.       Path file = FileOutputFormat.getTaskOutputPath(job, name);
  106.       FileSystem fs = file.getFileSystem(job);
  107.       FSDataOutputStream fileOut = fs.create(file, progress);
  108.       return new LineRecordWriter<K, V>(fileOut, keyValueSeparator);
  109.     } else {
  110.       Class<? extends CompressionCodec> codecClass =
  111.         getOutputCompressorClass(job, GzipCodec.class);
  112.       // create the named codec
  113.       CompressionCodec codec = ReflectionUtils.newInstance(codecClass, job);
  114.       // build the filename including the extension
  115.       Path file = 
  116.         FileOutputFormat.getTaskOutputPath(job, 
  117.                                            name + codec.getDefaultExtension());
  118.       FileSystem fs = file.getFileSystem(job);
  119.       FSDataOutputStream fileOut = fs.create(file, progress);
  120.       return new LineRecordWriter<K, V>(new DataOutputStream
  121.                                         (codec.createOutputStream(fileOut)),
  122.                                         keyValueSeparator);
  123.     }
  124.   }
  125. }