TestClusterMapReduceTestCase.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.fs.FileUtil;
  20. import org.apache.hadoop.fs.Path;
  21. import org.apache.hadoop.io.LongWritable;
  22. import org.apache.hadoop.io.Text;
  23. import java.io.*;
  24. import java.util.Properties;
  25. public class TestClusterMapReduceTestCase extends ClusterMapReduceTestCase {
  26.   public void _testMapReduce(boolean restart) throws Exception {
  27.     OutputStream os = getFileSystem().create(new Path(getInputDir(), "text.txt"));
  28.     Writer wr = new OutputStreamWriter(os);
  29.     wr.write("hello1n");
  30.     wr.write("hello2n");
  31.     wr.write("hello3n");
  32.     wr.write("hello4n");
  33.     wr.close();
  34.     if (restart) {
  35.       stopCluster();
  36.       startCluster(false, null);
  37.     }
  38.     
  39.     JobConf conf = createJobConf();
  40.     conf.setJobName("mr");
  41.     conf.setInputFormat(TextInputFormat.class);
  42.     conf.setMapOutputKeyClass(LongWritable.class);
  43.     conf.setMapOutputValueClass(Text.class);
  44.     conf.setOutputFormat(TextOutputFormat.class);
  45.     conf.setOutputKeyClass(LongWritable.class);
  46.     conf.setOutputValueClass(Text.class);
  47.     conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
  48.     conf.setReducerClass(org.apache.hadoop.mapred.lib.IdentityReducer.class);
  49.     FileInputFormat.setInputPaths(conf, getInputDir());
  50.     FileOutputFormat.setOutputPath(conf, getOutputDir());
  51.     JobClient.runJob(conf);
  52.     Path[] outputFiles = FileUtil.stat2Paths(
  53.                            getFileSystem().listStatus(getOutputDir(),
  54.                            new OutputLogFilter()));
  55.     if (outputFiles.length > 0) {
  56.       InputStream is = getFileSystem().open(outputFiles[0]);
  57.       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  58.       String line = reader.readLine();
  59.       int counter = 0;
  60.       while (line != null) {
  61.         counter++;
  62.         assertTrue(line.contains("hello"));
  63.         line = reader.readLine();
  64.       }
  65.       reader.close();
  66.       assertEquals(4, counter);
  67.     }
  68.   }
  69.   public void testMapReduce() throws Exception {
  70.     _testMapReduce(false);
  71.   }
  72.   public void testMapReduceRestarting() throws Exception {
  73.     _testMapReduce(true);
  74.   }
  75.   public void testDFSRestart() throws Exception {
  76.     Path file = new Path(getInputDir(), "text.txt");
  77.     OutputStream os = getFileSystem().create(file);
  78.     Writer wr = new OutputStreamWriter(os);
  79.     wr.close();
  80.     stopCluster();
  81.     startCluster(false, null);
  82.     assertTrue(getFileSystem().exists(file));
  83.     stopCluster();
  84.     startCluster(true, null);
  85.     assertFalse(getFileSystem().exists(file));
  86.     
  87.   }
  88.   public void testMRConfig() throws Exception {
  89.     JobConf conf = createJobConf();
  90.     assertNull(conf.get("xyz"));
  91.     Properties config = new Properties();
  92.     config.setProperty("xyz", "XYZ");
  93.     stopCluster();
  94.     startCluster(false, config);
  95.     conf = createJobConf();
  96.     assertEquals("XYZ", conf.get("xyz"));
  97.   }
  98. }