TestKeyFieldBasedComparator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.lib;
  19. import java.io.*;
  20. import org.apache.hadoop.fs.FileSystem;
  21. import org.apache.hadoop.fs.FileUtil;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.LongWritable;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.mapred.FileInputFormat;
  26. import org.apache.hadoop.mapred.FileOutputFormat;
  27. import org.apache.hadoop.mapred.HadoopTestCase;
  28. import org.apache.hadoop.mapred.JobClient;
  29. import org.apache.hadoop.mapred.JobConf;
  30. import org.apache.hadoop.mapred.OutputLogFilter;
  31. import org.apache.hadoop.mapred.RunningJob;
  32. import org.apache.hadoop.mapred.TextInputFormat;
  33. import org.apache.hadoop.mapred.TextOutputFormat;
  34. public class TestKeyFieldBasedComparator extends HadoopTestCase {
  35.   JobConf conf;
  36.   String line1 = "123 -123 005120 123.9 0.01 0.18 010 10.1 4444 011 011 234";
  37.   String line2 = "134 -12 005100 123.10 -1.01 0.19 02 10.0 4444.1";
  38.   public TestKeyFieldBasedComparator() throws IOException {
  39.     super(HadoopTestCase.LOCAL_MR, HadoopTestCase.LOCAL_FS, 1, 1);
  40.     conf = createJobConf();
  41.   }
  42.   public void configure(String keySpec, int expect) throws Exception {
  43.     Path testdir = new Path("build/test/test.mapred.spill");
  44.     Path inDir = new Path(testdir, "in");
  45.     Path outDir = new Path(testdir, "out");
  46.     FileSystem fs = getFileSystem();
  47.     fs.delete(testdir, true);
  48.     conf.setInputFormat(TextInputFormat.class);
  49.     FileInputFormat.setInputPaths(conf, inDir);
  50.     FileOutputFormat.setOutputPath(conf, outDir);
  51.     conf.setOutputKeyClass(Text.class);
  52.     conf.setOutputValueClass(LongWritable.class);
  53.     conf.setNumMapTasks(1);
  54.     conf.setNumReduceTasks(2);
  55.     conf.setOutputFormat(TextOutputFormat.class);
  56.     conf.setOutputKeyComparatorClass(KeyFieldBasedComparator.class);
  57.     conf.setKeyFieldComparatorOptions(keySpec);
  58.     conf.setKeyFieldPartitionerOptions("-k1.1,1.1");
  59.     conf.set("map.output.key.field.separator", " ");
  60.     conf.setMapperClass(InverseMapper.class);
  61.     conf.setReducerClass(IdentityReducer.class);
  62.     if (!fs.mkdirs(testdir)) {
  63.       throw new IOException("Mkdirs failed to create " + testdir.toString());
  64.     }
  65.     if (!fs.mkdirs(inDir)) {
  66.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  67.     }
  68.     // set up input data in 2 files 
  69.     Path inFile = new Path(inDir, "part0");
  70.     FileOutputStream fos = new FileOutputStream(inFile.toString());
  71.     fos.write((line1 + "n").getBytes());
  72.     fos.write((line2 + "n").getBytes());
  73.     fos.close();
  74.     JobClient jc = new JobClient(conf);
  75.     RunningJob r_job = jc.submitJob(conf);
  76.     while (!r_job.isComplete()) {
  77.       Thread.sleep(1000);
  78.     }
  79.     
  80.     if (!r_job.isSuccessful()) {
  81.       fail("Oops! The job broke due to an unexpected error");
  82.     }
  83.     Path[] outputFiles = FileUtil.stat2Paths(
  84.         getFileSystem().listStatus(outDir,
  85.         new OutputLogFilter()));
  86.     if (outputFiles.length > 0) {
  87.       InputStream is = getFileSystem().open(outputFiles[0]);
  88.       BufferedReader reader = new BufferedReader(new InputStreamReader(is));
  89.       String line = reader.readLine();
  90.       //make sure we get what we expect as the first line, and also
  91.       //that we have two lines (both the lines must end up in the same
  92.       //reducer since the partitioner takes the same key spec for all
  93.       //lines
  94.       if (expect == 1) {
  95.         assertTrue(line.startsWith(line1));
  96.       } else if (expect == 2) {
  97.         assertTrue(line.startsWith(line2));
  98.       }
  99.       line = reader.readLine();
  100.       if (expect == 1) {
  101.         assertTrue(line.startsWith(line2));
  102.       } else if (expect == 2) {
  103.         assertTrue(line.startsWith(line1));
  104.       }
  105.       reader.close();
  106.     }
  107.   }
  108.   public void testBasicUnixComparator() throws Exception {
  109.     configure("-k1,1n", 1);
  110.     configure("-k2,2n", 1);
  111.     configure("-k2.2,2n", 2);
  112.     configure("-k3.4,3n", 2);
  113.     configure("-k3.2,3.3n -k4,4n", 2);
  114.     configure("-k3.2,3.3n -k4,4nr", 1);
  115.     configure("-k2.4,2.4n", 2);
  116.     configure("-k7,7", 1);
  117.     configure("-k7,7n", 2);
  118.     configure("-k8,8n", 2);
  119.     configure("-k9,9n", 1);
  120.     configure("-k11,11",2);
  121.     configure("-k10,10",2);
  122.   }
  123. }