JobControlTestUtils.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.jobcontrol;
  19. import java.io.IOException;
  20. import java.text.NumberFormat;
  21. import java.util.Iterator;
  22. import java.util.List;
  23. import java.util.Random;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.fs.FSDataOutputStream;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.LongWritable;
  29. import org.apache.hadoop.io.Text;
  30. import org.apache.hadoop.mapred.FileInputFormat;
  31. import org.apache.hadoop.mapred.FileOutputFormat;
  32. import org.apache.hadoop.mapred.JobConf;
  33. import org.apache.hadoop.mapred.MapReduceBase;
  34. import org.apache.hadoop.mapred.Mapper;
  35. import org.apache.hadoop.mapred.OutputCollector;
  36. import org.apache.hadoop.mapred.Reducer;
  37. import org.apache.hadoop.mapred.Reporter;
  38. /**
  39.  * Utility methods used in various Job Control unit tests.
  40.  */
  41. public class JobControlTestUtils {
  42.   static private Random rand = new Random();
  43.   private static NumberFormat idFormat = NumberFormat.getInstance();
  44.   static {
  45.     idFormat.setMinimumIntegerDigits(4);
  46.     idFormat.setGroupingUsed(false);
  47.   }
  48.   /**
  49.    * Cleans the data from the passed Path in the passed FileSystem.
  50.    * 
  51.    * @param fs FileSystem to delete data from.
  52.    * @param dirPath Path to be deleted.
  53.    * @throws IOException If an error occurs cleaning the data.
  54.    */
  55.   static void cleanData(FileSystem fs, Path dirPath) throws IOException {
  56.     fs.delete(dirPath, true);
  57.   }
  58.   /**
  59.    * Generates a string of random digits.
  60.    * 
  61.    * @return A random string.
  62.    */
  63.   private static String generateRandomWord() {
  64.     return idFormat.format(rand.nextLong());
  65.   }
  66.   /**
  67.    * Generates a line of random text.
  68.    * 
  69.    * @return A line of random text.
  70.    */
  71.   private static String generateRandomLine() {
  72.     long r = rand.nextLong() % 7;
  73.     long n = r + 20;
  74.     StringBuffer sb = new StringBuffer();
  75.     for (int i = 0; i < n; i++) {
  76.       sb.append(generateRandomWord()).append(" ");
  77.     }
  78.     sb.append("n");
  79.     return sb.toString();
  80.   }
  81.   /**
  82.    * Generates data that can be used for Job Control tests.
  83.    * 
  84.    * @param fs FileSystem to create data in.
  85.    * @param dirPath Path to create the data in.
  86.    * @throws IOException If an error occurs creating the data.
  87.    */
  88.   static void generateData(FileSystem fs, Path dirPath) throws IOException {
  89.     FSDataOutputStream out = fs.create(new Path(dirPath, "data.txt"));
  90.     for (int i = 0; i < 10000; i++) {
  91.       String line = generateRandomLine();
  92.       out.write(line.getBytes("UTF-8"));
  93.     }
  94.     out.close();
  95.   }
  96.   /**
  97.    * Creates a simple copy job.
  98.    * 
  99.    * @param indirs List of input directories.
  100.    * @param outdir Output directory.
  101.    * @return JobConf initialised for a simple copy job.
  102.    * @throws Exception If an error occurs creating job configuration.
  103.    */
  104.   static JobConf createCopyJob(List<Path> indirs, Path outdir) throws Exception {
  105.     Configuration defaults = new Configuration();
  106.     JobConf theJob = new JobConf(defaults, TestJobControl.class);
  107.     theJob.setJobName("DataMoveJob");
  108.     FileInputFormat.setInputPaths(theJob, indirs.toArray(new Path[0]));
  109.     theJob.setMapperClass(DataCopy.class);
  110.     FileOutputFormat.setOutputPath(theJob, outdir);
  111.     theJob.setOutputKeyClass(Text.class);
  112.     theJob.setOutputValueClass(Text.class);
  113.     theJob.setReducerClass(DataCopy.class);
  114.     theJob.setNumMapTasks(12);
  115.     theJob.setNumReduceTasks(4);
  116.     return theJob;
  117.   }
  118.   /**
  119.    * Simple Mapper and Reducer implementation which copies data it reads in.
  120.    */
  121.   public static class DataCopy extends MapReduceBase implements
  122.       Mapper<LongWritable, Text, Text, Text>, Reducer<Text, Text, Text, Text> {
  123.     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output,
  124.         Reporter reporter) throws IOException {
  125.       output.collect(new Text(key.toString()), value);
  126.     }
  127.     public void reduce(Text key, Iterator<Text> values,
  128.         OutputCollector<Text, Text> output, Reporter reporter)
  129.         throws IOException {
  130.       Text dumbKey = new Text("");
  131.       while (values.hasNext()) {
  132.         Text data = (Text) values.next();
  133.         output.collect(dumbKey, data);
  134.       }
  135.     }
  136.   }
  137. }