BigMapOutput.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.Date;
  21. import java.util.Random;
  22. import org.apache.commons.logging.Log;
  23. import org.apache.commons.logging.LogFactory;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.conf.Configured;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.FileStatus;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.io.BytesWritable;
  30. import org.apache.hadoop.io.SequenceFile;
  31. import org.apache.hadoop.io.SequenceFile.CompressionType;
  32. import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
  33. import org.apache.hadoop.mapred.lib.IdentityMapper;
  34. import org.apache.hadoop.mapred.lib.IdentityReducer;
  35. import org.apache.hadoop.util.Tool;
  36. import org.apache.hadoop.util.ToolRunner;
  37. public class BigMapOutput extends Configured implements Tool {
  38.   public static final Log LOG =
  39.     LogFactory.getLog(BigMapOutput.class.getName());
  40.   private static Random random = new Random();
  41.   
  42.   private static void randomizeBytes(byte[] data, int offset, int length) {
  43.     for(int i=offset + length - 1; i >= offset; --i) {
  44.       data[i] = (byte) random.nextInt(256);
  45.     }
  46.   }
  47.   private static void createBigMapInputFile(Configuration conf, FileSystem fs, 
  48.                                             Path dir, long fileSizeInMB) 
  49.   throws IOException {
  50.     // Check if the input path exists and is non-empty
  51.     if (fs.exists(dir)) {
  52.       FileStatus[] list = fs.listStatus(dir);
  53.       if (list != null && list.length > 0) {
  54.         throw new IOException("Input path: " + dir + " already exists... ");
  55.       }
  56.     }
  57.     
  58.     Path file = new Path(dir, "part-0");
  59.     SequenceFile.Writer writer = 
  60.       SequenceFile.createWriter(fs, conf, file, 
  61.                                 BytesWritable.class, BytesWritable.class,
  62.                                 CompressionType.NONE);
  63.     long numBytesToWrite = fileSizeInMB * 1024 * 1024;
  64.     int minKeySize = conf.getInt("test.bmo.min_key", 10);;
  65.     int keySizeRange = 
  66.       conf.getInt("test.bmo.max_key", 1000) - minKeySize;
  67.     int minValueSize = conf.getInt("test.bmo.min_value", 0);
  68.     int valueSizeRange = 
  69.       conf.getInt("test.bmo.max_value", 20000) - minValueSize;
  70.     BytesWritable randomKey = new BytesWritable();
  71.     BytesWritable randomValue = new BytesWritable();
  72.     LOG.info("Writing " + numBytesToWrite + " bytes to " + file + " with " +
  73.              "minKeySize: " + minKeySize + " keySizeRange: " + keySizeRange +
  74.              " minValueSize: " + minValueSize + " valueSizeRange: " + valueSizeRange);
  75.     long start = System.currentTimeMillis();
  76.     while (numBytesToWrite > 0) {
  77.       int keyLength = minKeySize + 
  78.         (keySizeRange != 0 ? random.nextInt(keySizeRange) : 0);
  79.       randomKey.setSize(keyLength);
  80.       randomizeBytes(randomKey.getBytes(), 0, randomKey.getLength());
  81.       int valueLength = minValueSize +
  82.         (valueSizeRange != 0 ? random.nextInt(valueSizeRange) : 0);
  83.       randomValue.setSize(valueLength);
  84.       randomizeBytes(randomValue.getBytes(), 0, randomValue.getLength());
  85.       writer.append(randomKey, randomValue);
  86.       numBytesToWrite -= keyLength + valueLength;
  87.     }
  88.     writer.close();
  89.     long end = System.currentTimeMillis();
  90.     LOG.info("Created " + file + " of size: " + fileSizeInMB + "MB in " + 
  91.              (end-start)/1000 + "secs");
  92.   }
  93.   
  94.   private static void usage() {
  95.     System.err.println("BigMapOutput -input <input-dir> -output <output-dir> " +
  96.                        "[-create <filesize in MB>]");
  97.     ToolRunner.printGenericCommandUsage(System.err);
  98.     System.exit(1);
  99.   }
  100.   public int run(String[] args) throws Exception {    
  101.     if (args.length < 4) { //input-dir should contain a huge file ( > 2GB)
  102.       usage();
  103.     } 
  104.     Path bigMapInput = null;
  105.     Path outputPath = null;
  106.     boolean createInput = false;
  107.     long fileSizeInMB = 3 * 1024;         // default of 3GB (>2GB)
  108.     for(int i=0; i < args.length; ++i) {
  109.       if ("-input".equals(args[i])){
  110.         bigMapInput = new Path(args[++i]);
  111.       } else if ("-output".equals(args[i])){
  112.         outputPath = new Path(args[++i]);
  113.       } else if ("-create".equals(args[i])) {
  114.         createInput = true;
  115.         fileSizeInMB = Long.parseLong(args[++i]);
  116.       } else {
  117.         usage();
  118.       }
  119.     }
  120.     
  121.     FileSystem fs = FileSystem.get(getConf());
  122.     JobConf jobConf = new JobConf(getConf(), BigMapOutput.class);
  123.     jobConf.setJobName("BigMapOutput");
  124.     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  125.     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  126.     FileInputFormat.setInputPaths(jobConf, bigMapInput);
  127.     if (fs.exists(outputPath)) {
  128.       fs.delete(outputPath, true);
  129.     }
  130.     FileOutputFormat.setOutputPath(jobConf, outputPath);
  131.     jobConf.setMapperClass(IdentityMapper.class);
  132.     jobConf.setReducerClass(IdentityReducer.class);
  133.     jobConf.setOutputKeyClass(BytesWritable.class);
  134.     jobConf.setOutputValueClass(BytesWritable.class);
  135.     
  136.     if (createInput) {
  137.       createBigMapInputFile(jobConf, fs, bigMapInput, fileSizeInMB);
  138.     }
  139.     
  140.     Date startTime = new Date();
  141.     System.out.println("Job started: " + startTime);
  142.     JobClient.runJob(jobConf);
  143.     Date end_time = new Date();
  144.     System.out.println("Job ended: " + end_time);
  145.     
  146.     return 0;
  147.   }
  148.   public static void main(String argv[]) throws Exception {
  149.     int res = ToolRunner.run(new Configuration(), new BigMapOutput(), argv);
  150.     System.exit(res);
  151.   }
  152. }