GridMixRunner.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:24k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.ArrayList;
  21. import java.util.Arrays;
  22. import java.util.Calendar;
  23. import java.util.Date;
  24. import java.util.EnumSet;
  25. import java.util.Iterator;
  26. import java.util.Map.Entry;
  27. import java.util.TreeMap;
  28. import org.apache.hadoop.conf.Configuration;
  29. import org.apache.hadoop.examples.Sort;
  30. import org.apache.hadoop.fs.FileSystem;
  31. import org.apache.hadoop.fs.Path;
  32. import org.apache.hadoop.mapred.Counters.Group;
  33. import org.apache.hadoop.mapred.RunningJob;
  34. import org.apache.hadoop.mapred.jobcontrol.*;
  35. import org.apache.hadoop.mapred.lib.IdentityMapper;
  36. import org.apache.hadoop.mapred.lib.IdentityReducer;
  37. import org.apache.hadoop.streaming.StreamJob;
  38. public class GridMixRunner {
  39.   private static final int NUM_OF_LARGE_JOBS_PER_CLASS = 0;
  40.   private static final int NUM_OF_MEDIUM_JOBS_PER_CLASS = 0;
  41.   private static final int NUM_OF_SMALL_JOBS_PER_CLASS = 0;
  42.   private static final int NUM_OF_REDUCERS_FOR_SMALL_JOB = 15;
  43.   private static final int NUM_OF_REDUCERS_FOR_MEDIUM_JOB = 170;
  44.   private static final int NUM_OF_REDUCERS_FOR_LARGE_JOB = 370;
  45.   private static final String GRID_MIX_DATA = "/gridmix/data";
  46.   private static final String VARCOMPSEQ =
  47.     GRID_MIX_DATA + "/WebSimulationBlockCompressed";
  48.   private static final String FIXCOMPSEQ =
  49.     GRID_MIX_DATA + "/MonsterQueryBlockCompressed";
  50.   private static final String VARINFLTEXT =
  51.     GRID_MIX_DATA + "/SortUncompressed";
  52.   private static final String GRIDMIXCONFIG = "gridmix_config.xml";
  53.   private static final Configuration config = initConfig();
  54.   private static final FileSystem fs = initFs();
  55.   private final JobControl gridmix;
  56.   private int numOfJobs = 0;
  57.   private enum Size {
  58.     SMALL("small",                               // name
  59.           "/{part-00000,part-00001,part-00002}", // default input subset
  60.           NUM_OF_SMALL_JOBS_PER_CLASS,           // defuault num jobs
  61.           NUM_OF_REDUCERS_FOR_SMALL_JOB),        // default num reducers
  62.     MEDIUM("medium",                             // name
  63.           "/{part-000*0,part-000*1,part-000*2}", // default input subset
  64.           NUM_OF_MEDIUM_JOBS_PER_CLASS,          // defuault num jobs
  65.           NUM_OF_REDUCERS_FOR_MEDIUM_JOB),       // default num reducers
  66.     LARGE("large",                               // name
  67.           "",                                    // default input subset
  68.           NUM_OF_LARGE_JOBS_PER_CLASS,           // defuault num jobs
  69.           NUM_OF_REDUCERS_FOR_LARGE_JOB);        // default num reducers
  70.     private final String str;
  71.     private final String path;
  72.     private final int numJobs;
  73.     private final int numReducers;
  74.     Size(String str, String path, int numJobs, int numReducers) {
  75.       this.str = str;
  76.       this.path = path;
  77.       this.numJobs = numJobs;
  78.       this.numReducers = numReducers;
  79.     }
  80.     public String defaultPath(String base) {
  81.       return base + path;
  82.     }
  83.     public int defaultNumJobs() {
  84.       return numJobs;
  85.     }
  86.     public int defaultNumReducers() {
  87.       return numReducers;
  88.     }
  89.     public String toString() {
  90.       return str;
  91.     }
  92.   }
  93.   private enum GridMixJob {
  94.     STREAMSORT("streamSort") {
  95.     public void addJob(int numReducers, boolean mapoutputCompressed,
  96.         boolean outputCompressed, Size size, JobControl gridmix) {
  97.       final String prop = String.format("streamSort.%sJobs.inputFiles", size);
  98.       final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
  99.       final String outdir = addTSSuffix("perf-out/stream-out-dir-" + size);
  100.       StringBuffer sb = new StringBuffer();
  101.       sb.append("-input ").append(indir).append(" ");
  102.       sb.append("-output ").append(outdir).append(" ");
  103.       sb.append("-mapper cat ");
  104.       sb.append("-reducer cat ");
  105.       sb.append("-numReduceTasks ").append(numReducers);
  106.       String[] args = sb.toString().split(" ");
  107.       clearDir(outdir);
  108.       try {
  109.         JobConf jobconf = StreamJob.createJob(args);
  110.         jobconf.setJobName("GridmixStreamingSorter." + size);
  111.         jobconf.setCompressMapOutput(mapoutputCompressed);
  112.         jobconf.setBoolean("mapred.output.compress", outputCompressed);
  113.         Job job = new Job(jobconf);
  114.         gridmix.addJob(job);
  115.       } catch (Exception ex) {
  116.         ex.printStackTrace();
  117.       }
  118.     }
  119.     },
  120.     JAVASORT("javaSort") {
  121.     public void addJob(int numReducers, boolean mapoutputCompressed,
  122.         boolean outputCompressed, Size size, JobControl gridmix) {
  123.       final String prop = String.format("javaSort.%sJobs.inputFiles", size);
  124.       final String indir = getInputDirsFor(prop, size.defaultPath(VARINFLTEXT));
  125.       final String outdir = addTSSuffix("perf-out/sort-out-dir-" + size);
  126.       clearDir(outdir);
  127.       try {
  128.         JobConf jobConf = new JobConf();
  129.         jobConf.setJarByClass(Sort.class);
  130.         jobConf.setJobName("GridmixJavaSorter." + size);
  131.         jobConf.setMapperClass(IdentityMapper.class);
  132.         jobConf.setReducerClass(IdentityReducer.class);
  133.         jobConf.setNumReduceTasks(numReducers);
  134.         jobConf.setInputFormat(org.apache.hadoop.mapred.KeyValueTextInputFormat.class);
  135.         jobConf.setOutputFormat(org.apache.hadoop.mapred.TextOutputFormat.class);
  136.         jobConf.setOutputKeyClass(org.apache.hadoop.io.Text.class);
  137.         jobConf.setOutputValueClass(org.apache.hadoop.io.Text.class);
  138.         jobConf.setCompressMapOutput(mapoutputCompressed);
  139.         jobConf.setBoolean("mapred.output.compress", outputCompressed);
  140.         FileInputFormat.addInputPaths(jobConf, indir);
  141.         FileOutputFormat.setOutputPath(jobConf, new Path(outdir));
  142.         Job job = new Job(jobConf);
  143.         gridmix.addJob(job);
  144.       } catch (Exception ex) {
  145.         ex.printStackTrace();
  146.       }
  147.     }
  148.     },
  149.     WEBDATASCAN("webdataScan") {
  150.     public void addJob(int numReducers, boolean mapoutputCompressed,
  151.         boolean outputCompressed, Size size, JobControl gridmix) {
  152.       final String prop = String.format("webdataScan.%sJobs.inputFiles", size);
  153.       final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
  154.       final String outdir = addTSSuffix("perf-out/webdata-scan-out-dir-" + size);
  155.       StringBuffer sb = new StringBuffer();
  156.       sb.append("-keepmap 0.2 ");
  157.       sb.append("-keepred 5 ");
  158.       sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
  159.       sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
  160.       sb.append("-outKey org.apache.hadoop.io.Text ");
  161.       sb.append("-outValue org.apache.hadoop.io.Text ");
  162.       sb.append("-indir ").append(indir).append(" ");
  163.       sb.append("-outdir ").append(outdir).append(" ");
  164.       sb.append("-r ").append(numReducers);
  165.       String[] args = sb.toString().split(" ");
  166.       clearDir(outdir);
  167.       try {
  168.         JobConf jobconf = GenericMRLoadJobCreator.createJob(
  169.             args, mapoutputCompressed, outputCompressed);
  170.         jobconf.setJobName("GridmixWebdatascan." + size);
  171.         Job job = new Job(jobconf);
  172.         gridmix.addJob(job);
  173.       } catch (Exception ex) {
  174.         System.out.println(ex.getStackTrace());
  175.       }
  176.     }
  177.     },
  178.     COMBINER("combiner") {
  179.     public void addJob(int numReducers, boolean mapoutputCompressed,
  180.         boolean outputCompressed, Size size, JobControl gridmix) {
  181.       final String prop = String.format("combiner.%sJobs.inputFiles", size);
  182.       final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
  183.       final String outdir = addTSSuffix("perf-out/combiner-out-dir-" + size);
  184.       StringBuffer sb = new StringBuffer();
  185.       sb.append("-r ").append(numReducers).append(" ");
  186.       sb.append("-indir ").append(indir).append(" ");
  187.       sb.append("-outdir ").append(outdir);
  188.       sb.append("-mapoutputCompressed ").append(mapoutputCompressed).append(" ");
  189.       sb.append("-outputCompressed ").append(outputCompressed);
  190.       String[] args = sb.toString().split(" ");
  191.       clearDir(outdir);
  192.       try {
  193.         JobConf jobconf = CombinerJobCreator.createJob(args);
  194.         jobconf.setJobName("GridmixCombinerJob." + size);
  195.         Job job = new Job(jobconf);
  196.         gridmix.addJob(job);
  197.       } catch (Exception ex) {
  198.         ex.printStackTrace();
  199.       }
  200.     }
  201.     },
  202.     MONSTERQUERY("monsterQuery") {
  203.     public void addJob(int numReducers, boolean mapoutputCompressed,
  204.         boolean outputCompressed, Size size, JobControl gridmix) {
  205.       final String prop = String.format("monsterQuery.%sJobs.inputFiles", size);
  206.       final String indir = getInputDirsFor(prop, size.defaultPath(FIXCOMPSEQ));
  207.       final String outdir = addTSSuffix("perf-out/mq-out-dir-" + size);
  208.       int iter = 3;
  209.       try {
  210.         Job pjob = null;
  211.         Job job = null;
  212.         for (int i = 0; i < iter; i++) {
  213.           String outdirfull = outdir + "." + i;
  214.           String indirfull = (0 == i) ? indir : outdir + "." + (i - 1);
  215.           Path outfile = new Path(outdirfull);
  216.           StringBuffer sb = new StringBuffer();
  217.           sb.append("-keepmap 10 ");
  218.           sb.append("-keepred 40 ");
  219.           sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
  220.           sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
  221.           sb.append("-outKey org.apache.hadoop.io.Text ");
  222.           sb.append("-outValue org.apache.hadoop.io.Text ");
  223.           sb.append("-indir ").append(indirfull).append(" ");
  224.           sb.append("-outdir ").append(outdirfull).append(" ");
  225.           sb.append("-r ").append(numReducers);
  226.           String[] args = sb.toString().split(" ");
  227.           try {
  228.             fs.delete(outfile);
  229.           } catch (IOException ex) {
  230.             System.out.println(ex.toString());
  231.           }
  232.           JobConf jobconf = GenericMRLoadJobCreator.createJob(
  233.               args, mapoutputCompressed, outputCompressed);
  234.           jobconf.setJobName("GridmixMonsterQuery." + size);
  235.           job = new Job(jobconf);
  236.           if (pjob != null) {
  237.             job.addDependingJob(pjob);
  238.           }
  239.           gridmix.addJob(job);
  240.           pjob = job;
  241.         }
  242.       } catch (Exception e) {
  243.         System.out.println(e.getStackTrace());
  244.       }
  245.     }
  246.     },
  247.     WEBDATASORT("webdataSort") {
  248.     public void addJob(int numReducers, boolean mapoutputCompressed,
  249.         boolean outputCompressed, Size size, JobControl gridmix) {
  250.       final String prop = String.format("webdataSort.%sJobs.inputFiles", size);
  251.       final String indir = getInputDirsFor(prop, size.defaultPath(VARCOMPSEQ));
  252.       final String outdir = addTSSuffix("perf-out/webdata-sort-out-dir-" + size);
  253.       StringBuffer sb = new StringBuffer();
  254.       sb.append("-keepmap 100 ");
  255.       sb.append("-keepred 100 ");
  256.       sb.append("-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat ");
  257.       sb.append("-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat ");
  258.       sb.append("-outKey org.apache.hadoop.io.Text ");
  259.       sb.append("-outValue org.apache.hadoop.io.Text ");
  260.       sb.append("-indir ").append(indir).append(" ");
  261.       sb.append("-outdir ").append(outdir).append(" ");
  262.       sb.append("-r ").append(numReducers);
  263.       String[] args = sb.toString().split(" ");
  264.       clearDir(outdir);
  265.       try {
  266.         JobConf jobconf = GenericMRLoadJobCreator.createJob(
  267.             args, mapoutputCompressed, outputCompressed);
  268.         jobconf.setJobName("GridmixWebdataSort." + size);
  269.         Job job = new Job(jobconf);
  270.         gridmix.addJob(job);
  271.       } catch (Exception ex) {
  272.         System.out.println(ex.getStackTrace());
  273.       }
  274.     }
  275.     };
  276.     private final String name;
  277.     GridMixJob(String name) {
  278.       this.name = name;
  279.     }
  280.     public String getName() {
  281.       return name;
  282.     }
  283.     public abstract void addJob(int numReducers, boolean mapComp,
  284.         boolean outComp, Size size, JobControl gridmix);
  285.   }
  286.   public GridMixRunner() throws IOException {
  287.     gridmix = new JobControl("GridMix");
  288.     if (null == config || null == fs) {
  289.       throw new IOException("Bad configuration. Cannot continue.");
  290.     }
  291.   }
  292.   private static FileSystem initFs() {
  293.     try {
  294.       return FileSystem.get(config);
  295.     } catch (Exception e) {
  296.       System.out.println("fs initation error: " + e.getMessage());
  297.     }
  298.     return null;
  299.   }
  300.   private static Configuration initConfig() {
  301.     Configuration conf = new Configuration();
  302.     String configFile = System.getenv("GRIDMIXCONFIG");
  303.     if (configFile == null) {
  304.       String configDir = System.getProperty("user.dir");
  305.       if (configDir == null) {
  306.         configDir = ".";
  307.       }
  308.       configFile = configDir + "/" + GRIDMIXCONFIG;
  309.     }
  310.     try {
  311.       Path fileResource = new Path(configFile);
  312.       conf.addResource(fileResource);
  313.     } catch (Exception e) {
  314.       System.err.println("Error reading config file " + configFile + ":" +
  315.           e.getMessage());
  316.       return null;
  317.     }
  318.     return conf;
  319.   }
  320.   private static int[] getInts(Configuration conf, String name, int defaultV) {
  321.     String[] vals = conf.getStrings(name, String.valueOf(defaultV));
  322.     int[] results = new int[vals.length];
  323.     for (int i = 0; i < vals.length; ++i) {
  324.       results[i] = Integer.parseInt(vals[i]);
  325.     }
  326.     return results;
  327.   }
  328.   private static String getInputDirsFor(String jobType, String defaultIndir) {
  329.     String inputFile[] = config.getStrings(jobType, defaultIndir);
  330.     StringBuffer indirBuffer = new StringBuffer();
  331.     for (int i = 0; i < inputFile.length; i++) {
  332.       indirBuffer = indirBuffer.append(inputFile[i]).append(",");
  333.     }
  334.     return indirBuffer.substring(0, indirBuffer.length() - 1);
  335.   }
  336.   private static void clearDir(String dir) {
  337.     try {
  338.       Path outfile = new Path(dir);
  339.       fs.delete(outfile);
  340.     } catch (IOException ex) {
  341.       ex.printStackTrace();
  342.       System.out.println("delete file error:");
  343.       System.out.println(ex.toString());
  344.     }
  345.   }
  346.   private boolean select(int total, int selected, int index) {
  347.     if (selected <= 0 || selected >= total) {
  348.       return selected > 0;
  349.     }
  350.     int step = total / selected;
  351.     int effectiveTotal = total - total % selected;
  352.     return (index <= effectiveTotal - 1 && (index % step == 0));
  353.   }
  354.   private static String addTSSuffix(String s) {
  355.     Date date = Calendar.getInstance().getTime();
  356.     String ts = String.valueOf(date.getTime());
  357.     return s + "_" + ts;
  358.   }
  359.   private void addJobs(GridMixJob job, Size size) throws IOException {
  360.     final String prefix = String.format("%s.%sJobs", job.getName(), size);
  361.     int[] numJobs = getInts(config, prefix + ".numOfJobs",
  362.         size.defaultNumJobs());
  363.     int[] numReduces = getInts(config, prefix + ".numOfReduces",
  364.         size.defaultNumReducers());
  365.     if (numJobs.length != numReduces.length) {
  366.       throw new IOException("Configuration error: " +
  367.           prefix + ".numOfJobs must match " +
  368.           prefix + ".numOfReduces");
  369.     }
  370.     int numMapoutputCompressed = config.getInt(
  371.         prefix + ".numOfMapoutputCompressed", 0);
  372.     int numOutputCompressed = config.getInt(
  373.         prefix + ".numOfOutputCompressed", size.defaultNumJobs());
  374.     int totalJobs = 0;
  375.     for (int nJob : numJobs) {
  376.       totalJobs += nJob;
  377.     }
  378.     int currentIndex = 0;
  379.     for (int i = 0; i < numJobs.length; ++i) {
  380.       for (int j = 0; j < numJobs[i]; ++j) {
  381.         boolean mapoutputComp =
  382.           select(totalJobs, numMapoutputCompressed, currentIndex);
  383.         boolean outputComp =
  384.           select(totalJobs, numOutputCompressed, currentIndex);
  385.         job.addJob(numReduces[i], mapoutputComp, outputComp, size, gridmix);
  386.         ++numOfJobs;
  387.         ++currentIndex;
  388.       }
  389.     }
  390.   }
  391.   private void addAllJobs(GridMixJob job) throws IOException {
  392.     for (Size size : EnumSet.allOf(Size.class)) {
  393.       addJobs(job, size);
  394.     }
  395.   }
  396.   public void addjobs() throws IOException {
  397.     for (GridMixJob jobtype : EnumSet.allOf(GridMixJob.class)) {
  398.       addAllJobs(jobtype);
  399.     }
  400.     System.out.println("total " + gridmix.getWaitingJobs().size() + " jobs");
  401.   }
  402.   class SimpleStats {
  403.     long minValue;
  404.     long maxValue;
  405.     long averageValue;
  406.     long mediumValue;
  407.     int n;
  408.     SimpleStats(long[] data) {
  409.       Arrays.sort(data);
  410.       n = data.length;
  411.       minValue = data[0];
  412.       maxValue = data[n - 1];
  413.       mediumValue = data[n / 2];
  414.       long total = 0;
  415.       for (int i = 0; i < n; i++) {
  416.         total += data[i];
  417.       }
  418.       averageValue = total / n;
  419.     }
  420.   }
  421.   class TaskExecutionStats {
  422.     TreeMap<String, SimpleStats> theStats;
  423.     void computeStats(String name, long[] data) {
  424.       SimpleStats v = new SimpleStats(data);
  425.       theStats.put(name, v);
  426.     }
  427.     TaskExecutionStats() {
  428.       theStats = new TreeMap<String, SimpleStats>();
  429.     }
  430.   }
  431.   private TreeMap<String, String> getStatForJob(Job job) {
  432.     TreeMap<String, String> retv = new TreeMap<String, String>();
  433.     String mapreduceID = job.getAssignedJobID().toString();
  434.     JobClient jc = job.getJobClient();
  435.     JobConf jobconf = job.getJobConf();
  436.     String jobName = jobconf.getJobName();
  437.     retv.put("JobId", mapreduceID);
  438.     retv.put("JobName", jobName);
  439.     TaskExecutionStats theTaskExecutionStats = new TaskExecutionStats();
  440.     try {
  441.       RunningJob running = jc.getJob(JobID.forName(mapreduceID));
  442.       Counters jobCounters = running.getCounters();
  443.       Iterator<Group> groups = jobCounters.iterator();
  444.       while (groups.hasNext()) {
  445.         Group g = groups.next();
  446.         String gn = g.getName();
  447.         Iterator<Counters.Counter> cs = g.iterator();
  448.         while (cs.hasNext()) {
  449.           Counters.Counter c = cs.next();
  450.           String n = c.getName();
  451.           long v = c.getCounter();
  452.           retv.put(mapreduceID + "." + jobName + "." + gn + "." + n, "" + v);
  453.         }
  454.       }
  455.       TaskReport[] maps = jc.getMapTaskReports(JobID.forName(mapreduceID));
  456.       TaskReport[] reduces = jc
  457.           .getReduceTaskReports(JobID.forName(mapreduceID));
  458.       retv.put(mapreduceID + "." + jobName + "." + "numOfMapTasks", ""
  459.           + maps.length);
  460.       retv.put(mapreduceID + "." + jobName + "." + "numOfReduceTasks", ""
  461.           + reduces.length);
  462.       long[] mapExecutionTimes = new long[maps.length];
  463.       long[] reduceExecutionTimes = new long[reduces.length];
  464.       Date date = Calendar.getInstance().getTime();
  465.       long startTime = date.getTime();
  466.       long finishTime = 0;
  467.       for (int j = 0; j < maps.length; j++) {
  468.         TaskReport map = maps[j];
  469.         long thisStartTime = map.getStartTime();
  470.         long thisFinishTime = map.getFinishTime();
  471.         if (thisStartTime > 0 && thisFinishTime > 0) {
  472.           mapExecutionTimes[j] = thisFinishTime - thisStartTime;
  473.         }
  474.         if (startTime > thisStartTime) {
  475.           startTime = thisStartTime;
  476.         }
  477.         if (finishTime < thisFinishTime) {
  478.           finishTime = thisFinishTime;
  479.         }
  480.       }
  481.       theTaskExecutionStats.computeStats("mapExecutionTimeStats",
  482.           mapExecutionTimes);
  483.       retv.put(mapreduceID + "." + jobName + "." + "mapStartTime", ""
  484.           + startTime);
  485.       retv.put(mapreduceID + "." + jobName + "." + "mapEndTime", ""
  486.           + finishTime);
  487.       for (int j = 0; j < reduces.length; j++) {
  488.         TaskReport reduce = reduces[j];
  489.         long thisStartTime = reduce.getStartTime();
  490.         long thisFinishTime = reduce.getFinishTime();
  491.         if (thisStartTime > 0 && thisFinishTime > 0) {
  492.           reduceExecutionTimes[j] = thisFinishTime - thisStartTime;
  493.         }
  494.         if (startTime > thisStartTime) {
  495.           startTime = thisStartTime;
  496.         }
  497.         if (finishTime < thisFinishTime) {
  498.           finishTime = thisFinishTime;
  499.         }
  500.       }
  501.       theTaskExecutionStats.computeStats("reduceExecutionTimeStats",
  502.           reduceExecutionTimes);
  503.       retv.put(mapreduceID + "." + jobName + "." + "reduceStartTime", ""
  504.           + startTime);
  505.       retv.put(mapreduceID + "." + jobName + "." + "reduceEndTime", ""
  506.           + finishTime);
  507.       if (job.getState() == Job.SUCCESS) {
  508.         retv.put(mapreduceID + "." + "jobStatus", "successful");
  509.       } else if (job.getState() == Job.FAILED) {
  510.         retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "failed");
  511.       } else {
  512.         retv.put(mapreduceID + "." + jobName + "." + "jobStatus", "unknown");
  513.       }
  514.       Iterator<Entry<String, SimpleStats>> entries = theTaskExecutionStats.theStats
  515.           .entrySet().iterator();
  516.       while (entries.hasNext()) {
  517.         Entry<String, SimpleStats> e = entries.next();
  518.         SimpleStats v = e.getValue();
  519.         retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "min",
  520.             "" + v.minValue);
  521.         retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "max",
  522.             "" + v.maxValue);
  523.         retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "."
  524.             + "medium", "" + v.mediumValue);
  525.         retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "." + "avg",
  526.             "" + v.averageValue);
  527.         retv.put(mapreduceID + "." + jobName + "." + e.getKey() + "."
  528.             + "numOfItems", "" + v.n);
  529.       }
  530.     } catch (Exception e) {
  531.       e.printStackTrace();
  532.     }
  533.     return retv;
  534.   }
  535.   private void printJobStat(TreeMap<String, String> stat) {
  536.     Iterator<Entry<String, String>> entries = stat.entrySet().iterator();
  537.     while (entries.hasNext()) {
  538.       Entry<String, String> e = entries.next();
  539.       System.out.println(e.getKey() + "t" + e.getValue());
  540.     }
  541.   }
  542.   private void printStatsForJobs(ArrayList<Job> jobs) {
  543.     for (int i = 0; i < jobs.size(); i++) {
  544.       printJobStat(getStatForJob(jobs.get(i)));
  545.     }
  546.   }
  547.   public void run() {
  548.     Thread theGridmixRunner = new Thread(gridmix);
  549.     theGridmixRunner.start();
  550.     long startTime = System.currentTimeMillis();
  551.     while (!gridmix.allFinished()) {
  552.       System.out.println("Jobs in waiting state: "
  553.           + gridmix.getWaitingJobs().size());
  554.       System.out.println("Jobs in ready state: "
  555.           + gridmix.getReadyJobs().size());
  556.       System.out.println("Jobs in running state: "
  557.           + gridmix.getRunningJobs().size());
  558.       System.out.println("Jobs in success state: "
  559.           + gridmix.getSuccessfulJobs().size());
  560.       System.out.println("Jobs in failed state: "
  561.           + gridmix.getFailedJobs().size());
  562.       System.out.println("n");
  563.       try {
  564.         Thread.sleep(10 * 1000);
  565.       } catch (Exception e) {
  566.       }
  567.     }
  568.     long endTime = System.currentTimeMillis();
  569.     ArrayList<Job> fail = gridmix.getFailedJobs();
  570.     ArrayList<Job> succeed = gridmix.getSuccessfulJobs();
  571.     int numOfSuccessfulJob = succeed.size();
  572.     if (numOfSuccessfulJob > 0) {
  573.       System.out.println(numOfSuccessfulJob + " jobs succeeded");
  574.       printStatsForJobs(succeed);
  575.     }
  576.     int numOfFailedjob = fail.size();
  577.     if (numOfFailedjob > 0) {
  578.       System.out.println("------------------------------- ");
  579.       System.out.println(numOfFailedjob + " jobs failed");
  580.       printStatsForJobs(fail);
  581.     }
  582.     System.out.println("GridMix results:");
  583.     System.out.println("Total num of Jobs: " + numOfJobs);
  584.     System.out.println("ExecutionTime: " + ((endTime-startTime)/1000));
  585.     gridmix.stop();
  586.   }
  587.   public static void main(String argv[]) throws Exception {
  588.     GridMixRunner gridmixRunner = new GridMixRunner();
  589.     gridmixRunner.addjobs();
  590.     gridmixRunner.run();
  591.   }
  592. }