TestFileOutputFormat.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.fs.FileStatus;
  20. import org.apache.hadoop.fs.FileSystem;
  21. import org.apache.hadoop.fs.Path;
  22. import org.apache.hadoop.io.LongWritable;
  23. import org.apache.hadoop.io.Text;
  24. import java.io.DataOutputStream;
  25. import java.io.IOException;
  26. import java.io.OutputStream;
  27. import java.util.Iterator;
  28. public class TestFileOutputFormat extends HadoopTestCase {
  29.   public TestFileOutputFormat() throws IOException {
  30.     super(HadoopTestCase.CLUSTER_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  31.   }
  32.   public void testCustomFile() throws Exception {
  33.     Path inDir = new Path("testing/fileoutputformat/input");
  34.     Path outDir = new Path("testing/fileoutputformat/output");
  35.     // Hack for local FS that does not have the concept of a 'mounting point'
  36.     if (isLocalFS()) {
  37.       String localPathRoot = System.getProperty("test.build.data", "/tmp")
  38.         .replace(' ', '+');
  39.       inDir = new Path(localPathRoot, inDir);
  40.       outDir = new Path(localPathRoot, outDir);
  41.     }
  42.     JobConf conf = createJobConf();
  43.     FileSystem fs = FileSystem.get(conf);
  44.     fs.delete(outDir, true);
  45.     if (!fs.mkdirs(inDir)) {
  46.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  47.     }
  48.     DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  49.     file.writeBytes("anbnncndne");
  50.     file.close();
  51.     file = fs.create(new Path(inDir, "part-1"));
  52.     file.writeBytes("anbnncndne");
  53.     file.close();
  54.     conf.setJobName("fof");
  55.     conf.setInputFormat(TextInputFormat.class);
  56.     conf.setOutputKeyClass(LongWritable.class);
  57.     conf.setOutputValueClass(Text.class);
  58.     conf.setMapOutputKeyClass(LongWritable.class);
  59.     conf.setMapOutputValueClass(Text.class);
  60.     conf.setOutputFormat(TextOutputFormat.class);
  61.     conf.setOutputKeyClass(LongWritable.class);
  62.     conf.setOutputValueClass(Text.class);
  63.     conf.setMapperClass(TestMap.class);
  64.     conf.setReducerClass(TestReduce.class);
  65.     FileInputFormat.setInputPaths(conf, inDir);
  66.     FileOutputFormat.setOutputPath(conf, outDir);
  67.     JobClient jc = new JobClient(conf);
  68.     RunningJob job = jc.submitJob(conf);
  69.     while (!job.isComplete()) {
  70.       Thread.sleep(100);
  71.     }
  72.     assertTrue(job.isSuccessful());
  73.     boolean map0 = false;
  74.     boolean map1 = false;
  75.     boolean reduce = false;
  76.     FileStatus[] statuses = fs.listStatus(outDir);
  77.     for (FileStatus status : statuses) {
  78.       map0 = map0 || status.getPath().getName().equals("test-m-00000");
  79.       map1 = map1 || status.getPath().getName().equals("test-m-00001");
  80.       reduce = reduce || status.getPath().getName().equals("test-r-00000");
  81.     }
  82.     assertTrue(map0);
  83.     assertTrue(map1);
  84.     assertTrue(reduce);
  85.   }
  86.   public static class TestMap implements Mapper<LongWritable, Text,
  87.     LongWritable, Text> {
  88.     public void configure(JobConf conf) {
  89.       try {
  90.         FileSystem fs = FileSystem.get(conf);
  91.         OutputStream os =
  92.           fs.create(FileOutputFormat.getPathForCustomFile(conf, "test"));
  93.         os.write(1);
  94.         os.close();
  95.       }
  96.       catch (IOException ex) {
  97.         throw new RuntimeException(ex);
  98.       }
  99.     }
  100.     public void map(LongWritable key, Text value,
  101.                     OutputCollector<LongWritable, Text> output,
  102.                     Reporter reporter) throws IOException {
  103.       output.collect(key, value);
  104.     }
  105.     public void close() throws IOException {
  106.     }
  107.   }
  108.   public static class TestReduce implements Reducer<LongWritable, Text,
  109.     LongWritable, Text> {
  110.     public void configure(JobConf conf) {
  111.       try {
  112.         FileSystem fs = FileSystem.get(conf);
  113.         OutputStream os =
  114.           fs.create(FileOutputFormat.getPathForCustomFile(conf, "test"));
  115.         os.write(1);
  116.         os.close();
  117.       }
  118.       catch (IOException ex) {
  119.         throw new RuntimeException(ex);
  120.       }
  121.     }
  122.     public void reduce(LongWritable key, Iterator<Text> values,
  123.                        OutputCollector<LongWritable, Text> output,
  124.                        Reporter reporter) throws IOException {
  125.       while (values.hasNext()) {
  126.         Text value = values.next();
  127.         output.collect(key, value);
  128.       }
  129.     }
  130.     public void close() throws IOException {
  131.     }
  132.   }
  133. }