TestCollect.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import org.apache.hadoop.fs.*;
  20. import org.apache.hadoop.io.*;
  21. import org.apache.hadoop.mapred.UtilsForTests.RandomInputFormat;
  22. import junit.framework.TestCase;
  23. import java.io.*;
  24. import java.util.*;
  25. /** 
  26.  * TestCollect checks if the collect can handle simultaneous invocations.
  27.  */
  28. public class TestCollect extends TestCase 
  29. {
  30.   final static Path OUTPUT_DIR = new Path("build/test/test.collect.output");
  31.   static final int NUM_FEEDERS = 10;
  32.   static final int NUM_COLLECTS_PER_THREAD = 1000;
  33.   
  34.   /** 
  35.    * Map is a Mapper that spawns threads which simultaneously call collect. 
  36.    * Each thread has a specific range to write to the buffer and is unique to 
  37.    * the thread. This is a synchronization test for the map's collect.
  38.    */
  39.    
  40.   static class Map
  41.     implements Mapper<Text, Text, IntWritable, IntWritable> {
  42.     
  43.     public void configure(JobConf job) {
  44.     }
  45.     
  46.     public void map(Text key, Text val,
  47.                     final OutputCollector<IntWritable, IntWritable> out,
  48.                     Reporter reporter) throws IOException {
  49.       // Class for calling collect in separate threads
  50.       class CollectFeeder extends Thread {
  51.         int id; // id for the thread
  52.         
  53.         public CollectFeeder(int id) {
  54.           this.id = id;
  55.         }
  56.         
  57.         public void run() {
  58.           for (int j = 1; j <= NUM_COLLECTS_PER_THREAD; j++) {
  59.             try {
  60.               out.collect(new IntWritable((id * NUM_COLLECTS_PER_THREAD) + j), 
  61.                                           new IntWritable(0));
  62.             } catch (IOException ioe) { }
  63.           }
  64.         }
  65.       }
  66.       
  67.       CollectFeeder [] feeders = new CollectFeeder[NUM_FEEDERS];
  68.       
  69.       // start the feeders
  70.       for (int i = 0; i < NUM_FEEDERS; i++) {
  71.         feeders[i] = new CollectFeeder(i);
  72.         feeders[i].start();
  73.       }
  74.       // wait for them to finish
  75.       for (int i = 0; i < NUM_FEEDERS; i++) {
  76.         try {
  77.           feeders[i].join();
  78.         } catch (InterruptedException ie) {
  79.           throw new IOException(ie.toString());
  80.         }
  81.       }
  82.     }
  83.     
  84.     public void close() {
  85.     }
  86.   }
  87.   
  88.   static class Reduce
  89.   implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
  90.   
  91.     static int numSeen;
  92.     static int actualSum;
  93.     public void configure(JobConf job) { }
  94.     public void reduce(IntWritable key, Iterator<IntWritable> val,
  95.                        OutputCollector<IntWritable, IntWritable> out,
  96.                        Reporter reporter) throws IOException {
  97.       actualSum += key.get(); // keep the running count of the seen values
  98.       numSeen++; // number of values seen so far
  99.       
  100.       // using '1+2+3+...n =  n*(n+1)/2' to validate
  101.       int expectedSum = numSeen * (numSeen + 1) / 2;
  102.       if (expectedSum != actualSum) {
  103.         throw new IOException("Collect test failed!! Ordering mismatch.");
  104.       }
  105.     }
  106.     public void close() { }
  107.   }
  108.   
  109.   public void configure(JobConf conf) throws IOException {
  110.     conf.setJobName("TestCollect");
  111.     conf.setJarByClass(TestCollect.class);
  112.     
  113.     conf.setInputFormat(RandomInputFormat.class); // for self data generation
  114.     conf.setOutputKeyClass(IntWritable.class);
  115.     conf.setOutputValueClass(IntWritable.class);
  116.     FileOutputFormat.setOutputPath(conf, OUTPUT_DIR);
  117.     
  118.     conf.setMapperClass(Map.class);
  119.     conf.setReducerClass(Reduce.class);
  120.     conf.setNumMapTasks(1);
  121.     conf.setNumReduceTasks(1);
  122.   }
  123.   
  124.   public void testCollect() throws IOException {
  125.     JobConf conf = new JobConf();
  126.     configure(conf);
  127.     try {
  128.       JobClient.runJob(conf);
  129.       // check if all the values were seen by the reducer
  130.       if (Reduce.numSeen != (NUM_COLLECTS_PER_THREAD * NUM_FEEDERS)) {
  131.         throw new IOException("Collect test failed!! Total does not match.");
  132.       }
  133.     } catch (IOException ioe) {
  134.       throw ioe;
  135.     } finally {
  136.       FileSystem fs = FileSystem.get(conf);
  137.       fs.delete(OUTPUT_DIR, true);
  138.     }
  139.   }
  140.   
  141.   public static void main(String[] args) throws IOException {
  142.     new TestCollect().testCollect();
  143.   }
  144. }