TestJavaSerialization.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.BufferedReader;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.io.InputStreamReader;
  23. import java.io.OutputStream;
  24. import java.io.OutputStreamWriter;
  25. import java.io.Writer;
  26. import java.util.Iterator;
  27. import java.util.StringTokenizer;
  28. import org.apache.hadoop.fs.FileUtil;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.io.LongWritable;
  31. import org.apache.hadoop.io.Text;
  32. import org.apache.hadoop.io.serializer.JavaSerializationComparator;
  33. public class TestJavaSerialization extends ClusterMapReduceTestCase {
  34.   
  35.   static class WordCountMapper extends MapReduceBase implements
  36.       Mapper<LongWritable, Text, String, Long> {
  37.     public void map(LongWritable key, Text value,
  38.         OutputCollector<String, Long> output, Reporter reporter)
  39.         throws IOException {
  40.       StringTokenizer st = new StringTokenizer(value.toString());
  41.       while (st.hasMoreTokens()) {
  42.         output.collect(st.nextToken(), 1L);
  43.       }
  44.     }
  45.   }
  46.   
  47.   static class SumReducer<K> extends MapReduceBase implements
  48.       Reducer<K, Long, K, Long> {
  49.     
  50.     public void reduce(K key, Iterator<Long> values,
  51.         OutputCollector<K, Long> output, Reporter reporter)
  52.       throws IOException {
  53.       long sum = 0;
  54.       while (values.hasNext()) {
  55.         sum += values.next();
  56.       }
  57.       output.collect(key, sum);
  58.     }
  59.     
  60.   }
  61.   
  62.   public void testMapReduceJob() throws Exception {
  63.     OutputStream os = getFileSystem().create(new Path(getInputDir(),
  64.         "text.txt"));
  65.     Writer wr = new OutputStreamWriter(os);
  66.     wr.write("b an");
  67.     wr.close();
  68.     JobConf conf = createJobConf();
  69.     conf.setJobName("JavaSerialization");
  70.     
  71.     conf.set("io.serializations",
  72.     "org.apache.hadoop.io.serializer.JavaSerialization," +
  73.     "org.apache.hadoop.io.serializer.WritableSerialization");
  74.     conf.setInputFormat(TextInputFormat.class);
  75.     conf.setOutputKeyClass(String.class);
  76.     conf.setOutputValueClass(Long.class);
  77.     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
  78.     conf.setMapperClass(WordCountMapper.class);
  79.     conf.setReducerClass(SumReducer.class);
  80.     FileInputFormat.setInputPaths(conf, getInputDir());
  81.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  82.     JobClient.runJob(conf);
  83.     Path[] outputFiles = FileUtil.stat2Paths(
  84.                            getFileSystem().listStatus(getOutputDir(),
  85.                            new OutputLogFilter()));
  86.     assertEquals(1, outputFiles.length);
  87.     InputStream is = getFileSystem().open(outputFiles[0]);
  88.     BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  89.     assertEquals("at1", reader.readLine());
  90.     assertEquals("bt1", reader.readLine());
  91.     assertNull(reader.readLine());
  92.     reader.close();
  93.   }
  94.   /**
  95.    * HADOOP-4466:
  96.    * This test verifies the JavSerialization impl can write to SequenceFiles. by virtue other
  97.    * SequenceFileOutputFormat is not coupled to Writable types, if so, the job will fail.
  98.    *
  99.    */
  100.   public void testWriteToSequencefile() throws Exception {
  101.     OutputStream os = getFileSystem().create(new Path(getInputDir(),
  102.         "text.txt"));
  103.     Writer wr = new OutputStreamWriter(os);
  104.     wr.write("b an");
  105.     wr.close();
  106.     JobConf conf = createJobConf();
  107.     conf.setJobName("JavaSerialization");
  108.     conf.set("io.serializations",
  109.     "org.apache.hadoop.io.serializer.JavaSerialization," +
  110.     "org.apache.hadoop.io.serializer.WritableSerialization");
  111.     conf.setInputFormat(TextInputFormat.class);
  112.     conf.setOutputFormat(SequenceFileOutputFormat.class); // test we can write to sequence files
  113.     conf.setOutputKeyClass(String.class);
  114.     conf.setOutputValueClass(Long.class);
  115.     conf.setOutputKeyComparatorClass(JavaSerializationComparator.class);
  116.     conf.setMapperClass(WordCountMapper.class);
  117.     conf.setReducerClass(SumReducer.class);
  118.     FileInputFormat.setInputPaths(conf, getInputDir());
  119.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  120.     JobClient.runJob(conf);
  121.     Path[] outputFiles = FileUtil.stat2Paths(
  122.                            getFileSystem().listStatus(getOutputDir(),
  123.                            new OutputLogFilter()));
  124.     assertEquals(1, outputFiles.length);
  125. }
  126. }