UtilsForTests.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:19k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.text.DecimalFormat;
  20. import java.io.*;
  21. import java.util.Arrays;
  22. import java.util.Iterator;
  23. import org.apache.hadoop.conf.Configuration;
  24. import org.apache.hadoop.examples.RandomWriter;
  25. import org.apache.hadoop.fs.Path;
  26. import org.apache.hadoop.fs.FileSystem;
  27. import org.apache.hadoop.hdfs.DFSTestUtil;
  28. import org.apache.hadoop.hdfs.MiniDFSCluster;
  29. import org.apache.hadoop.hdfs.server.namenode.NameNode;
  30. import org.apache.hadoop.io.BytesWritable;
  31. import org.apache.hadoop.io.SequenceFile;
  32. import org.apache.hadoop.io.Text;
  33. import org.apache.hadoop.io.Writable;
  34. import org.apache.hadoop.io.IntWritable;
  35. import org.apache.hadoop.io.LongWritable;
  36. import org.apache.hadoop.io.WritableComparable;
  37. import org.apache.hadoop.io.SequenceFile.CompressionType;
  38. import org.apache.hadoop.mapred.JobConf;
  39. import org.apache.hadoop.mapred.SortValidator.RecordStatsChecker.NonSplitableSequenceFileInputFormat;
  40. import org.apache.hadoop.mapred.lib.IdentityMapper;
  41. import org.apache.hadoop.mapred.lib.IdentityReducer;
  42. /** 
  43.  * Utilities used in unit test.
  44.  *  
  45.  */
  46. public class UtilsForTests {
  47.   final static long KB = 1024L * 1;
  48.   final static long MB = 1024L * KB;
  49.   final static long GB = 1024L * MB;
  50.   final static long TB = 1024L * GB;
  51.   final static long PB = 1024L * TB;
  52.   final static Object waitLock = new Object();
  53.   static DecimalFormat dfm = new DecimalFormat("####.000");
  54.   static DecimalFormat ifm = new DecimalFormat("###,###,###,###,###");
  55.   public static String dfmt(double d) {
  56.     return dfm.format(d);
  57.   }
  58.   public static String ifmt(double d) {
  59.     return ifm.format(d);
  60.   }
  61.   public static String formatBytes(long numBytes) {
  62.     StringBuffer buf = new StringBuffer();
  63.     boolean bDetails = true;
  64.     double num = numBytes;
  65.     if (numBytes < KB) {
  66.       buf.append(numBytes + " B");
  67.       bDetails = false;
  68.     } else if (numBytes < MB) {
  69.       buf.append(dfmt(num / KB) + " KB");
  70.     } else if (numBytes < GB) {
  71.       buf.append(dfmt(num / MB) + " MB");
  72.     } else if (numBytes < TB) {
  73.       buf.append(dfmt(num / GB) + " GB");
  74.     } else if (numBytes < PB) {
  75.       buf.append(dfmt(num / TB) + " TB");
  76.     } else {
  77.       buf.append(dfmt(num / PB) + " PB");
  78.     }
  79.     if (bDetails) {
  80.       buf.append(" (" + ifmt(numBytes) + " bytes)");
  81.     }
  82.     return buf.toString();
  83.   }
  84.   public static String formatBytes2(long numBytes) {
  85.     StringBuffer buf = new StringBuffer();
  86.     long u = 0;
  87.     if (numBytes >= TB) {
  88.       u = numBytes / TB;
  89.       numBytes -= u * TB;
  90.       buf.append(u + " TB ");
  91.     }
  92.     if (numBytes >= GB) {
  93.       u = numBytes / GB;
  94.       numBytes -= u * GB;
  95.       buf.append(u + " GB ");
  96.     }
  97.     if (numBytes >= MB) {
  98.       u = numBytes / MB;
  99.       numBytes -= u * MB;
  100.       buf.append(u + " MB ");
  101.     }
  102.     if (numBytes >= KB) {
  103.       u = numBytes / KB;
  104.       numBytes -= u * KB;
  105.       buf.append(u + " KB ");
  106.     }
  107.     buf.append(u + " B"); //even if zero
  108.     return buf.toString();
  109.   }
  110.   static final String regexpSpecials = "[]()?*+|.!^-\~@";
  111.   public static String regexpEscape(String plain) {
  112.     StringBuffer buf = new StringBuffer();
  113.     char[] ch = plain.toCharArray();
  114.     int csup = ch.length;
  115.     for (int c = 0; c < csup; c++) {
  116.       if (regexpSpecials.indexOf(ch[c]) != -1) {
  117.         buf.append("\");
  118.       }
  119.       buf.append(ch[c]);
  120.     }
  121.     return buf.toString();
  122.   }
  123.   public static String safeGetCanonicalPath(File f) {
  124.     try {
  125.       String s = f.getCanonicalPath();
  126.       return (s == null) ? f.toString() : s;
  127.     } catch (IOException io) {
  128.       return f.toString();
  129.     }
  130.   }
  131.   static String slurp(File f) throws IOException {
  132.     int len = (int) f.length();
  133.     byte[] buf = new byte[len];
  134.     FileInputStream in = new FileInputStream(f);
  135.     String contents = null;
  136.     try {
  137.       in.read(buf, 0, len);
  138.       contents = new String(buf, "UTF-8");
  139.     } finally {
  140.       in.close();
  141.     }
  142.     return contents;
  143.   }
  144.   static String slurpHadoop(Path p, FileSystem fs) throws IOException {
  145.     int len = (int) fs.getLength(p);
  146.     byte[] buf = new byte[len];
  147.     InputStream in = fs.open(p);
  148.     String contents = null;
  149.     try {
  150.       in.read(buf, 0, len);
  151.       contents = new String(buf, "UTF-8");
  152.     } finally {
  153.       in.close();
  154.     }
  155.     return contents;
  156.   }
  157.   public static String rjustify(String s, int width) {
  158.     if (s == null) s = "null";
  159.     if (width > s.length()) {
  160.       s = getSpace(width - s.length()) + s;
  161.     }
  162.     return s;
  163.   }
  164.   public static String ljustify(String s, int width) {
  165.     if (s == null) s = "null";
  166.     if (width > s.length()) {
  167.       s = s + getSpace(width - s.length());
  168.     }
  169.     return s;
  170.   }
  171.   static char[] space;
  172.   static {
  173.     space = new char[300];
  174.     Arrays.fill(space, 'u0020');
  175.   }
  176.   public static String getSpace(int len) {
  177.     if (len > space.length) {
  178.       space = new char[Math.max(len, 2 * space.length)];
  179.       Arrays.fill(space, 'u0020');
  180.     }
  181.     return new String(space, 0, len);
  182.   }
  183.   
  184.   /**
  185.    * Gets job status from the jobtracker given the jobclient and the job id
  186.    */
  187.   static JobStatus getJobStatus(JobClient jc, JobID id) throws IOException {
  188.     JobStatus[] statuses = jc.getAllJobs();
  189.     for (JobStatus jobStatus : statuses) {
  190.       if (jobStatus.getJobID().equals(id)) {
  191.         return jobStatus;
  192.       }
  193.     }
  194.     return null;
  195.   }
  196.   
  197.   /**
  198.    * A utility that waits for specified amount of time
  199.    */
  200.   static void waitFor(long duration) {
  201.     try {
  202.       synchronized (waitLock) {
  203.         waitLock.wait(duration);
  204.       }
  205.     } catch (InterruptedException ie) {}
  206.   }
  207.   
  208.   /**
  209.    * Wait for the jobtracker to be RUNNING.
  210.    */
  211.   static void waitForJobTracker(JobClient jobClient) {
  212.     while (true) {
  213.       try {
  214.         ClusterStatus status = jobClient.getClusterStatus();
  215.         while (status.getJobTrackerState() != JobTracker.State.RUNNING) {
  216.           waitFor(100);
  217.           status = jobClient.getClusterStatus();
  218.         }
  219.         break; // means that the jt is ready
  220.       } catch (IOException ioe) {}
  221.     }
  222.   }
  223.   
  224.   /**
  225.    * Waits until all the jobs at the jobtracker complete.
  226.    */
  227.   static void waitTillDone(JobClient jobClient) throws IOException {
  228.     // Wait for the last job to complete
  229.     while (true) {
  230.       boolean shouldWait = false;
  231.       for (JobStatus jobStatuses : jobClient.getAllJobs()) {
  232.         if (jobStatuses.getRunState() == JobStatus.RUNNING) {
  233.           shouldWait = true;
  234.           break;
  235.         }
  236.       }
  237.       if (shouldWait) {
  238.         waitFor(1000);
  239.       } else {
  240.         break;
  241.       }
  242.     }
  243.   }
  244.   
  245.   /**
  246.    * Configure a waiting job
  247.    */
  248.   static void configureWaitingJobConf(JobConf jobConf, Path inDir,
  249.                                       Path outputPath, int numMaps, int numRed,
  250.                                       String jobName, String mapSignalFilename,
  251.                                       String redSignalFilename)
  252.   throws IOException {
  253.     jobConf.setJobName(jobName);
  254.     jobConf.setInputFormat(NonSplitableSequenceFileInputFormat.class);
  255.     jobConf.setOutputFormat(SequenceFileOutputFormat.class);
  256.     FileInputFormat.setInputPaths(jobConf, inDir);
  257.     FileOutputFormat.setOutputPath(jobConf, outputPath);
  258.     jobConf.setMapperClass(UtilsForTests.HalfWaitingMapper.class);
  259.     jobConf.setReducerClass(IdentityReducer.class);
  260.     jobConf.setOutputKeyClass(BytesWritable.class);
  261.     jobConf.setOutputValueClass(BytesWritable.class);
  262.     jobConf.setInputFormat(RandomInputFormat.class);
  263.     jobConf.setNumMapTasks(numMaps);
  264.     jobConf.setNumReduceTasks(numRed);
  265.     jobConf.setJar("build/test/testjar/testjob.jar");
  266.     jobConf.set(getTaskSignalParameter(true), mapSignalFilename);
  267.     jobConf.set(getTaskSignalParameter(false), redSignalFilename);
  268.   }
  269.   /**
  270.    * Commonly used map and reduce classes 
  271.    */
  272.   
  273.   /** 
  274.    * Map is a Mapper that just waits for a file to be created on the dfs. The 
  275.    * file creation is a signal to the mappers and hence acts as a waiting job. 
  276.    */
  277.   static class WaitingMapper 
  278.   extends MapReduceBase 
  279.   implements Mapper<WritableComparable, Writable, 
  280.                     WritableComparable, Writable> {
  281.     FileSystem fs = null;
  282.     Path signal;
  283.     int id = 0;
  284.     int totalMaps = 0;
  285.     /**
  286.      * Checks if the map task needs to wait. By default all the maps will wait.
  287.      * This method needs to be overridden to make a custom waiting mapper. 
  288.      */
  289.     public boolean shouldWait(int id) {
  290.       return true;
  291.     }
  292.     
  293.     /**
  294.      * Returns a signal file on which the map task should wait. By default all 
  295.      * the maps wait on a single file passed as test.mapred.map.waiting.target.
  296.      * This method needs to be overridden to make a custom waiting mapper
  297.      */
  298.     public Path getSignalFile(int id) {
  299.       return signal;
  300.     }
  301.     
  302.     /** The waiting function.  The map exits once it gets a signal. Here the 
  303.      * signal is the file existence. 
  304.      */
  305.     public void map(WritableComparable key, Writable val, 
  306.                     OutputCollector<WritableComparable, Writable> output,
  307.                     Reporter reporter)
  308.     throws IOException {
  309.       if (shouldWait(id)) {
  310.         if (fs != null) {
  311.           while (!fs.exists(getSignalFile(id))) {
  312.             try {
  313.               reporter.progress();
  314.               synchronized (this) {
  315.                 this.wait(1000); // wait for 1 sec
  316.               }
  317.             } catch (InterruptedException ie) {
  318.               System.out.println("Interrupted while the map was waiting for "
  319.                                  + " the signal.");
  320.               break;
  321.             }
  322.           }
  323.         } else {
  324.           throw new IOException("Could not get the DFS!!");
  325.         }
  326.       }
  327.     }
  328.     public void configure(JobConf conf) {
  329.       try {
  330.         String taskId = conf.get("mapred.task.id");
  331.         id = Integer.parseInt(taskId.split("_")[4]);
  332.         totalMaps = Integer.parseInt(conf.get("mapred.map.tasks"));
  333.         fs = FileSystem.get(conf);
  334.         signal = new Path(conf.get(getTaskSignalParameter(true)));
  335.       } catch (IOException ioe) {
  336.         System.out.println("Got an exception while obtaining the filesystem");
  337.       }
  338.     }
  339.   }
  340.   
  341.   /** Only the later half of the maps wait for the signal while the rest 
  342.    * complete immediately.
  343.    */
  344.   static class HalfWaitingMapper extends WaitingMapper {
  345.     @Override
  346.     public boolean shouldWait(int id) {
  347.       return id >= (totalMaps / 2);
  348.     }
  349.   }
  350.   
  351.   /** 
  352.    * Reduce that just waits for a file to be created on the dfs. The 
  353.    * file creation is a signal to the reduce.
  354.    */
  355.   static class WaitingReducer extends MapReduceBase 
  356.   implements Reducer<WritableComparable, Writable, 
  357.                      WritableComparable, Writable> {
  358.     FileSystem fs = null;
  359.     Path signal;
  360.     
  361.     /** The waiting function.  The reduce exits once it gets a signal. Here the
  362.      * signal is the file existence. 
  363.      */
  364.     public void reduce(WritableComparable key, Iterator<Writable> val, 
  365.                        OutputCollector<WritableComparable, Writable> output,
  366.                        Reporter reporter)
  367.     throws IOException {
  368.       if (fs != null) {
  369.         while (!fs.exists(signal)) {
  370.           try {
  371.             reporter.progress();
  372.             synchronized (this) {
  373.               this.wait(1000); // wait for 1 sec
  374.             }
  375.           } catch (InterruptedException ie) {
  376.             System.out.println("Interrupted while the map was waiting for the"
  377.                                + " signal.");
  378.             break;
  379.           }
  380.         }
  381.       } else {
  382.         throw new IOException("Could not get the DFS!!");
  383.       }
  384.     }
  385.     public void configure(JobConf conf) {
  386.       try {
  387.         fs = FileSystem.get(conf);
  388.         signal = new Path(conf.get(getTaskSignalParameter(false)));
  389.       } catch (IOException ioe) {
  390.         System.out.println("Got an exception while obtaining the filesystem");
  391.       }
  392.     }
  393.   }
  394.   
  395.   static String getTaskSignalParameter(boolean isMap) {
  396.     return isMap 
  397.            ? "test.mapred.map.waiting.target" 
  398.            : "test.mapred.reduce.waiting.target";
  399.   }
  400.   
  401.   /**
  402.    * Signal the maps/reduces to start.
  403.    */
  404.   static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
  405.                           String mapSignalFile, 
  406.                           String reduceSignalFile, int replication) 
  407.   throws IOException {
  408.     writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(mapSignalFile), 
  409.               (short)replication);
  410.     writeFile(dfs.getNameNode(), fileSys.getConf(), new Path(reduceSignalFile), 
  411.               (short)replication);
  412.   }
  413.   
  414.   /**
  415.    * Signal the maps/reduces to start.
  416.    */
  417.   static void signalTasks(MiniDFSCluster dfs, FileSystem fileSys, 
  418.                           boolean isMap, String mapSignalFile, 
  419.                           String reduceSignalFile)
  420.   throws IOException {
  421.     //  signal the maps to complete
  422.     writeFile(dfs.getNameNode(), fileSys.getConf(),
  423.               isMap 
  424.               ? new Path(mapSignalFile)
  425.               : new Path(reduceSignalFile), (short)1);
  426.   }
  427.   
  428.   static String getSignalFile(Path dir) {
  429.     return (new Path(dir, "signal")).toString();
  430.   }
  431.   
  432.   static String getMapSignalFile(Path dir) {
  433.     return (new Path(dir, "map-signal")).toString();
  434.   }
  435.   static String getReduceSignalFile(Path dir) {
  436.     return (new Path(dir, "reduce-signal")).toString();
  437.   }
  438.   
  439.   static void writeFile(NameNode namenode, Configuration conf, Path name, 
  440.       short replication) throws IOException {
  441.     FileSystem fileSys = FileSystem.get(conf);
  442.     SequenceFile.Writer writer = 
  443.       SequenceFile.createWriter(fileSys, conf, name, 
  444.                                 BytesWritable.class, BytesWritable.class,
  445.                                 CompressionType.NONE);
  446.     writer.append(new BytesWritable(), new BytesWritable());
  447.     writer.close();
  448.     fileSys.setReplication(name, replication);
  449.     DFSTestUtil.waitReplication(fileSys, name, replication);
  450.   }
  451.   
  452.   // Input formats
  453.   /**
  454.    * A custom input format that creates virtual inputs of a single string
  455.    * for each map. Using {@link RandomWriter} code. 
  456.    */
  457.   public static class RandomInputFormat implements InputFormat<Text, Text> {
  458.     
  459.     public InputSplit[] getSplits(JobConf job, 
  460.                                   int numSplits) throws IOException {
  461.       InputSplit[] result = new InputSplit[numSplits];
  462.       Path outDir = FileOutputFormat.getOutputPath(job);
  463.       for(int i=0; i < result.length; ++i) {
  464.         result[i] = new FileSplit(new Path(outDir, "dummy-split-" + i),
  465.                                   0, 1, (String[])null);
  466.       }
  467.       return result;
  468.     }
  469.     static class RandomRecordReader implements RecordReader<Text, Text> {
  470.       Path name;
  471.       public RandomRecordReader(Path p) {
  472.         name = p;
  473.       }
  474.       public boolean next(Text key, Text value) {
  475.         if (name != null) {
  476.           key.set(name.getName());
  477.           name = null;
  478.           return true;
  479.         }
  480.         return false;
  481.       }
  482.       public Text createKey() {
  483.         return new Text();
  484.       }
  485.       public Text createValue() {
  486.         return new Text();
  487.       }
  488.       public long getPos() {
  489.         return 0;
  490.       }
  491.       public void close() {}
  492.       public float getProgress() {
  493.         return 0.0f;
  494.       }
  495.     }
  496.     public RecordReader<Text, Text> getRecordReader(InputSplit split,
  497.                                                     JobConf job, 
  498.                                                     Reporter reporter) 
  499.     throws IOException {
  500.       return new RandomRecordReader(((FileSplit) split).getPath());
  501.     }
  502.   }
  503.   // Start a job and return its RunningJob object
  504.   static RunningJob runJob(JobConf conf, Path inDir, Path outDir)
  505.                     throws IOException {
  506.     FileSystem fs = FileSystem.get(conf);
  507.     fs.delete(outDir, true);
  508.     if (!fs.exists(inDir)) {
  509.       fs.mkdirs(inDir);
  510.     }
  511.     String input = "The quick brown foxn" + "has many sillyn"
  512.         + "red fox soxn";
  513.     DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  514.     file.writeBytes(input);
  515.     file.close();
  516.     conf.setInputFormat(TextInputFormat.class);
  517.     conf.setOutputKeyClass(LongWritable.class);
  518.     conf.setOutputValueClass(Text.class);
  519.     FileInputFormat.setInputPaths(conf, inDir);
  520.     FileOutputFormat.setOutputPath(conf, outDir);
  521.     conf.setNumMapTasks(1);
  522.     conf.setNumReduceTasks(1);
  523.     JobClient jobClient = new JobClient(conf);
  524.     RunningJob job = jobClient.submitJob(conf);
  525.     return job;
  526.   }
  527.   // Run a job that will be succeeded and wait until it completes
  528.   static RunningJob runJobSucceed(JobConf conf, Path inDir, Path outDir)
  529.          throws IOException {
  530.     conf.setJobName("test-job-succeed");
  531.     conf.setMapperClass(IdentityMapper.class);
  532.     conf.setReducerClass(IdentityReducer.class);
  533.     
  534.     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  535.     while (!job.isComplete()) {
  536.       try {
  537.         Thread.sleep(100);
  538.       } catch (InterruptedException e) {
  539.         break;
  540.       }
  541.     }
  542.     return job;
  543.   }
  544.   // Run a job that will be failed and wait until it completes
  545.   static RunningJob runJobFail(JobConf conf, Path inDir, Path outDir)
  546.          throws IOException {
  547.     conf.setJobName("test-job-fail");
  548.     conf.setMapperClass(FailMapper.class);
  549.     conf.setReducerClass(IdentityReducer.class);
  550.     
  551.     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  552.     while (!job.isComplete()) {
  553.       try {
  554.         Thread.sleep(100);
  555.       } catch (InterruptedException e) {
  556.         break;
  557.       }
  558.     }
  559.     return job;
  560.   }
  561.   // Run a job that will be killed and wait until it completes
  562.   static RunningJob runJobKill(JobConf conf,  Path inDir, Path outDir)
  563.          throws IOException {
  564.     conf.setJobName("test-job-kill");
  565.     conf.setMapperClass(KillMapper.class);
  566.     conf.setReducerClass(IdentityReducer.class);
  567.     
  568.     RunningJob job = UtilsForTests.runJob(conf, inDir, outDir);
  569.     while (job.getJobState() != JobStatus.RUNNING) {
  570.       try {
  571.         Thread.sleep(100);
  572.       } catch (InterruptedException e) {
  573.         break;
  574.       }
  575.     }
  576.     job.killJob();
  577.     while (job.cleanupProgress() == 0.0f) {
  578.       try {
  579.         Thread.sleep(10);
  580.       } catch (InterruptedException ie) {
  581.         break;
  582.       }
  583.     }
  584.     return job;
  585.   }
  586.   // Mapper that fails
  587.   static class FailMapper extends MapReduceBase implements
  588.       Mapper<WritableComparable, Writable, WritableComparable, Writable> {
  589.     public void map(WritableComparable key, Writable value,
  590.         OutputCollector<WritableComparable, Writable> out, Reporter reporter)
  591.         throws IOException {
  592.       throw new RuntimeException("failing map");
  593.     }
  594.   }
  595.   // Mapper that sleeps for a long time.
  596.   // Used for running a job that will be killed
  597.   static class KillMapper extends MapReduceBase implements
  598.       Mapper<WritableComparable, Writable, WritableComparable, Writable> {
  599.     public void map(WritableComparable key, Writable value,
  600.         OutputCollector<WritableComparable, Writable> out, Reporter reporter)
  601.         throws IOException {
  602.       try {
  603.         Thread.sleep(1000000);
  604.       } catch (InterruptedException e) {
  605.         // Do nothing
  606.       }
  607.     }
  608.   }
  609. }