TestDataJoin.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.utils.join;
  19. import java.io.IOException;
  20. import junit.framework.Test;
  21. import junit.framework.TestCase;
  22. import junit.framework.TestSuite;
  23. import junit.extensions.TestSetup;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.hdfs.MiniDFSCluster;
  26. import org.apache.hadoop.fs.FSDataInputStream;
  27. import org.apache.hadoop.fs.FileStatus;
  28. import org.apache.hadoop.fs.FileSystem;
  29. import org.apache.hadoop.fs.Path;
  30. import org.apache.hadoop.io.LongWritable;
  31. import org.apache.hadoop.io.SequenceFile;
  32. import org.apache.hadoop.io.Text;
  33. import org.apache.hadoop.mapred.*;
  34. public class TestDataJoin extends TestCase {
  35.   private static MiniDFSCluster cluster = null;
  36.   public static Test suite() {
  37.     TestSetup setup = new TestSetup(new TestSuite(TestDataJoin.class)) {
  38.       protected void setUp() throws Exception {
  39.         Configuration conf = new Configuration();
  40.         cluster = new MiniDFSCluster(conf, 2, true, null);
  41.       }
  42.       protected void tearDown() throws Exception {
  43.         if (cluster != null) {
  44.           cluster.shutdown();
  45.         }
  46.       }
  47.     };
  48.     return setup;
  49.   }
  50.   public void testDataJoin() throws Exception {
  51.     final int srcs = 4;
  52.     JobConf job = new JobConf();
  53.     Path base = cluster.getFileSystem().makeQualified(new Path("/inner"));
  54.     Path[] src = writeSimpleSrc(base, job, srcs);
  55.     job.setInputFormat(SequenceFileInputFormat.class);
  56.     Path outdir = new Path(base, "out");
  57.     FileOutputFormat.setOutputPath(job, outdir);
  58.     job.setMapperClass(SampleDataJoinMapper.class);
  59.     job.setReducerClass(SampleDataJoinReducer.class);
  60.     job.setMapOutputKeyClass(Text.class);
  61.     job.setMapOutputValueClass(SampleTaggedMapOutput.class);
  62.     job.setOutputKeyClass(Text.class);
  63.     job.setOutputValueClass(Text.class);
  64.     job.setOutputFormat(TextOutputFormat.class);
  65.     job.setNumMapTasks(1);
  66.     job.setNumReduceTasks(1);
  67.     FileInputFormat.setInputPaths(job, src);
  68.     try {
  69.       JobClient.runJob(job);
  70.       confirmOutput(outdir, job, srcs);
  71.     } finally {
  72.       base.getFileSystem(job).delete(base, true);
  73.     }
  74.   }
  75.   private static void confirmOutput(Path out, JobConf job, int srcs)
  76.       throws IOException {
  77.     FileSystem fs = out.getFileSystem(job);
  78.     FileStatus[] outlist = fs.listStatus(out);
  79.     assertEquals(1, outlist.length);
  80.     assertTrue(0 < outlist[0].getLen());
  81.     FSDataInputStream in = fs.open(outlist[0].getPath());
  82.     LineRecordReader rr = new LineRecordReader(in, 0, Integer.MAX_VALUE, job);
  83.     LongWritable k = new LongWritable();
  84.     Text v = new Text();
  85.     int count = 0;
  86.     while (rr.next(k, v)) {
  87.       String[] vals = v.toString().split("t");
  88.       assertEquals(srcs + 1, vals.length);
  89.       int[] ivals = new int[vals.length];
  90.       for (int i = 0; i < vals.length; ++i)
  91.         ivals[i] = Integer.parseInt(vals[i]);
  92.       assertEquals(0, ivals[0] % (srcs * srcs));
  93.       for (int i = 1; i < vals.length; ++i) {
  94.         assertEquals((ivals[i] - (i - 1)) * srcs, 10 * ivals[0]);
  95.       }
  96.       ++count;
  97.     }
  98.     assertEquals(4, count);
  99.   }
  100.   private static SequenceFile.Writer[] createWriters(Path testdir,
  101.       JobConf conf, int srcs, Path[] src) throws IOException {
  102.     for (int i = 0; i < srcs; ++i) {
  103.       src[i] = new Path(testdir, Integer.toString(i + 10, 36));
  104.     }
  105.     SequenceFile.Writer out[] = new SequenceFile.Writer[srcs];
  106.     for (int i = 0; i < srcs; ++i) {
  107.       out[i] = new SequenceFile.Writer(testdir.getFileSystem(conf), conf,
  108.           src[i], Text.class, Text.class);
  109.     }
  110.     return out;
  111.   }
  112.   private static Path[] writeSimpleSrc(Path testdir, JobConf conf,
  113.       int srcs) throws IOException {
  114.     SequenceFile.Writer out[] = null;
  115.     Path[] src = new Path[srcs];
  116.     try {
  117.       out = createWriters(testdir, conf, srcs, src);
  118.       final int capacity = srcs * 2 + 1;
  119.       Text key = new Text();
  120.       key.set("ignored");
  121.       Text val = new Text();
  122.       for (int k = 0; k < capacity; ++k) {
  123.         for (int i = 0; i < srcs; ++i) {
  124.           val.set(Integer.toString(k % srcs == 0 ? k * srcs : k * srcs + i) +
  125.               "t" + Integer.toString(10 * k + i));
  126.           out[i].append(key, val);
  127.           if (i == k) {
  128.             // add duplicate key
  129.             out[i].append(key, val);
  130.           }
  131.         }
  132.       }
  133.     } finally {
  134.       if (out != null) {
  135.         for (int i = 0; i < srcs; ++i) {
  136.           if (out[i] != null)
  137.             out[i].close();
  138.         }
  139.       }
  140.     }
  141.     return src;
  142.   }
  143. }