SleepJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:9k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.examples;
  19. import java.io.IOException;
  20. import java.io.DataInput;
  21. import java.io.DataOutput;
  22. import java.util.Iterator;
  23. import java.util.Random;
  24. import org.apache.hadoop.conf.Configuration;
  25. import org.apache.hadoop.conf.Configured;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.fs.Path;
  28. import org.apache.hadoop.io.IntWritable;
  29. import org.apache.hadoop.io.NullWritable;
  30. import org.apache.hadoop.io.SequenceFile;
  31. import org.apache.hadoop.mapred.*;
  32. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  33. import org.apache.hadoop.util.Tool;
  34. import org.apache.hadoop.util.ToolRunner;
  35. /**
  36.  * Dummy class for testing MR framefork. Sleeps for a defined period 
  37.  * of time in mapper and reducer. Generates fake input for map / reduce 
  38.  * jobs. Note that generated number of input pairs is in the order 
  39.  * of <code>numMappers * mapSleepTime / 100</code>, so the job uses
  40.  * some disk space.
  41.  */
  42. public class SleepJob extends Configured implements Tool,  
  43.              Mapper<IntWritable, IntWritable, IntWritable, NullWritable>,
  44.              Reducer<IntWritable, NullWritable, NullWritable, NullWritable>,
  45.              Partitioner<IntWritable,NullWritable> {
  46.   private long mapSleepDuration = 100;
  47.   private long reduceSleepDuration = 100;
  48.   private int mapSleepCount = 1;
  49.   private int reduceSleepCount = 1;
  50.   private int count = 0;
  51.   public int getPartition(IntWritable k, NullWritable v, int numPartitions) {
  52.     return k.get() % numPartitions;
  53.   }
  54.   
  55.   public static class EmptySplit implements InputSplit {
  56.     public void write(DataOutput out) throws IOException { }
  57.     public void readFields(DataInput in) throws IOException { }
  58.     public long getLength() { return 0L; }
  59.     public String[] getLocations() { return new String[0]; }
  60.   }
  61.   public static class SleepInputFormat extends Configured
  62.       implements InputFormat<IntWritable,IntWritable> {
  63.     public InputSplit[] getSplits(JobConf conf, int numSplits) {
  64.       InputSplit[] ret = new InputSplit[numSplits];
  65.       for (int i = 0; i < numSplits; ++i) {
  66.         ret[i] = new EmptySplit();
  67.       }
  68.       return ret;
  69.     }
  70.     public RecordReader<IntWritable,IntWritable> getRecordReader(
  71.         InputSplit ignored, JobConf conf, Reporter reporter)
  72.         throws IOException {
  73.       final int count = conf.getInt("sleep.job.map.sleep.count", 1);
  74.       if (count < 0) throw new IOException("Invalid map count: " + count);
  75.       final int redcount = conf.getInt("sleep.job.reduce.sleep.count", 1);
  76.       if (redcount < 0)
  77.         throw new IOException("Invalid reduce count: " + redcount);
  78.       final int emitPerMapTask = (redcount * conf.getNumReduceTasks());
  79.     return new RecordReader<IntWritable,IntWritable>() {
  80.         private int records = 0;
  81.         private int emitCount = 0;
  82.         public boolean next(IntWritable key, IntWritable value)
  83.             throws IOException {
  84.           key.set(emitCount);
  85.           int emit = emitPerMapTask / count;
  86.           if ((emitPerMapTask) % count > records) {
  87.             ++emit;
  88.           }
  89.           emitCount += emit;
  90.           value.set(emit);
  91.           return records++ < count;
  92.         }
  93.         public IntWritable createKey() { return new IntWritable(); }
  94.         public IntWritable createValue() { return new IntWritable(); }
  95.         public long getPos() throws IOException { return records; }
  96.         public void close() throws IOException { }
  97.         public float getProgress() throws IOException {
  98.           return records / ((float)count);
  99.         }
  100.       };
  101.     }
  102.   }
  103.   public void map(IntWritable key, IntWritable value,
  104.       OutputCollector<IntWritable, NullWritable> output, Reporter reporter)
  105.       throws IOException {
  106.     //it is expected that every map processes mapSleepCount number of records. 
  107.     try {
  108.       reporter.setStatus("Sleeping... (" +
  109.           (mapSleepDuration * (mapSleepCount - count)) + ") ms left");
  110.       Thread.sleep(mapSleepDuration);
  111.     }
  112.     catch (InterruptedException ex) {
  113.       throw (IOException)new IOException(
  114.           "Interrupted while sleeping").initCause(ex);
  115.     }
  116.     ++count;
  117.     // output reduceSleepCount * numReduce number of random values, so that
  118.     // each reducer will get reduceSleepCount number of keys.
  119.     int k = key.get();
  120.     for (int i = 0; i < value.get(); ++i) {
  121.       output.collect(new IntWritable(k + i), NullWritable.get());
  122.     }
  123.   }
  124.   public void reduce(IntWritable key, Iterator<NullWritable> values,
  125.       OutputCollector<NullWritable, NullWritable> output, Reporter reporter)
  126.       throws IOException {
  127.     try {
  128.       reporter.setStatus("Sleeping... (" +
  129.           (reduceSleepDuration * (reduceSleepCount - count)) + ") ms left");
  130.         Thread.sleep(reduceSleepDuration);
  131.       
  132.     }
  133.     catch (InterruptedException ex) {
  134.       throw (IOException)new IOException(
  135.           "Interrupted while sleeping").initCause(ex);
  136.     }
  137.     count++;
  138.   }
  139.   public void configure(JobConf job) {
  140.     this.mapSleepCount =
  141.       job.getInt("sleep.job.map.sleep.count", mapSleepCount);
  142.     this.reduceSleepCount =
  143.       job.getInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  144.     this.mapSleepDuration =
  145.       job.getLong("sleep.job.map.sleep.time" , 100) / mapSleepCount;
  146.     this.reduceSleepDuration =
  147.       job.getLong("sleep.job.reduce.sleep.time" , 100) / reduceSleepCount;
  148.   }
  149.   public void close() throws IOException {
  150.   }
  151.   public static void main(String[] args) throws Exception{
  152.     int res = ToolRunner.run(new Configuration(), new SleepJob(), args);
  153.     System.exit(res);
  154.   }
  155.   public int run(int numMapper, int numReducer, long mapSleepTime,
  156.       int mapSleepCount, long reduceSleepTime,
  157.       int reduceSleepCount) throws IOException {
  158.     JobConf job = setupJobConf(numMapper, numReducer, mapSleepTime, 
  159.                   mapSleepCount, reduceSleepTime, reduceSleepCount);
  160.     JobClient.runJob(job);
  161.     return 0;
  162.   }
  163.   public JobConf setupJobConf(int numMapper, int numReducer, 
  164.                                 long mapSleepTime, int mapSleepCount, 
  165.                                 long reduceSleepTime, int reduceSleepCount) {
  166.     JobConf job = new JobConf(getConf(), SleepJob.class);
  167.     job.setNumMapTasks(numMapper);
  168.     job.setNumReduceTasks(numReducer);
  169.     job.setMapperClass(SleepJob.class);
  170.     job.setMapOutputKeyClass(IntWritable.class);
  171.     job.setMapOutputValueClass(NullWritable.class);
  172.     job.setReducerClass(SleepJob.class);
  173.     job.setOutputFormat(NullOutputFormat.class);
  174.     job.setInputFormat(SleepInputFormat.class);
  175.     job.setPartitionerClass(SleepJob.class);
  176.     job.setSpeculativeExecution(false);
  177.     job.setJobName("Sleep job");
  178.     FileInputFormat.addInputPath(job, new Path("ignored"));
  179.     job.setLong("sleep.job.map.sleep.time", mapSleepTime);
  180.     job.setLong("sleep.job.reduce.sleep.time", reduceSleepTime);
  181.     job.setInt("sleep.job.map.sleep.count", mapSleepCount);
  182.     job.setInt("sleep.job.reduce.sleep.count", reduceSleepCount);
  183.     return job;
  184.   }
  185.   public int run(String[] args) throws Exception {
  186.     if(args.length < 1) {
  187.       System.err.println("SleepJob [-m numMapper] [-r numReducer]" +
  188.           " [-mt mapSleepTime (msec)] [-rt reduceSleepTime (msec)]" +
  189.           " [-recordt recordSleepTime (msec)]");
  190.       ToolRunner.printGenericCommandUsage(System.err);
  191.       return -1;
  192.     }
  193.     int numMapper = 1, numReducer = 1;
  194.     long mapSleepTime = 100, reduceSleepTime = 100, recSleepTime = 100;
  195.     int mapSleepCount = 1, reduceSleepCount = 1;
  196.     for(int i=0; i < args.length; i++ ) {
  197.       if(args[i].equals("-m")) {
  198.         numMapper = Integer.parseInt(args[++i]);
  199.       }
  200.       else if(args[i].equals("-r")) {
  201.         numReducer = Integer.parseInt(args[++i]);
  202.       }
  203.       else if(args[i].equals("-mt")) {
  204.         mapSleepTime = Long.parseLong(args[++i]);
  205.       }
  206.       else if(args[i].equals("-rt")) {
  207.         reduceSleepTime = Long.parseLong(args[++i]);
  208.       }
  209.       else if (args[i].equals("-recordt")) {
  210.         recSleepTime = Long.parseLong(args[++i]);
  211.       }
  212.     }
  213.     
  214.     // sleep for *SleepTime duration in Task by recSleepTime per record
  215.     mapSleepCount = (int)Math.ceil(mapSleepTime / ((double)recSleepTime));
  216.     reduceSleepCount = (int)Math.ceil(reduceSleepTime / ((double)recSleepTime));
  217.     
  218.     return run(numMapper, numReducer, mapSleepTime, mapSleepCount,
  219.         reduceSleepTime, reduceSleepCount);
  220.   }
  221. }