MRCaching.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.*;
  20. import java.util.*;
  21. import org.apache.hadoop.fs.*;
  22. import org.apache.hadoop.io.IntWritable;
  23. import org.apache.hadoop.io.LongWritable;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.mapred.JobClient;
  26. import org.apache.hadoop.mapred.JobConf;
  27. import org.apache.hadoop.mapred.Mapper;
  28. import org.apache.hadoop.mapred.OutputCollector;
  29. import org.apache.hadoop.mapred.Reducer;
  30. import org.apache.hadoop.mapred.Reporter;
  31. import org.apache.hadoop.util.*;
  32. import org.apache.hadoop.mapred.MapReduceBase;
  33. import org.apache.hadoop.filecache.*;
  34. import java.net.URI;
  35. public class MRCaching {
  36.   static String testStr = "This is a test file " + "used for testing caching "
  37.     + "jars, zip and normal files.";
  38.   /**
  39.    * Using the wordcount example and adding caching to it. The cache
  40.    * archives/files are set and then are checked in the map if they have been
  41.    * localized or not.
  42.    */
  43.   public static class MapClass extends MapReduceBase
  44.     implements Mapper<LongWritable, Text, Text, IntWritable> {
  45.     
  46.     JobConf conf;
  47.     private final static IntWritable one = new IntWritable(1);
  48.     private Text word = new Text();
  49.     public void configure(JobConf jconf) {
  50.       conf = jconf;
  51.       try {
  52.         Path[] localArchives = DistributedCache.getLocalCacheArchives(conf);
  53.         Path[] localFiles = DistributedCache.getLocalCacheFiles(conf);
  54.         // read the cached files (unzipped, unjarred and text)
  55.         // and put it into a single file TEST_ROOT_DIR/test.txt
  56.         String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
  57.         Path file = new Path("file:///", TEST_ROOT_DIR);
  58.         FileSystem fs = FileSystem.getLocal(conf);
  59.         if (!fs.mkdirs(file)) {
  60.           throw new IOException("Mkdirs failed to create " + file.toString());
  61.         }
  62.         Path fileOut = new Path(file, "test.txt");
  63.         fs.delete(fileOut, true);
  64.         DataOutputStream out = fs.create(fileOut);
  65.         for (int i = 0; i < localArchives.length; i++) {
  66.           // read out the files from these archives
  67.           File f = new File(localArchives[i].toString());
  68.           File txt = new File(f, "test.txt");
  69.           FileInputStream fin = new FileInputStream(txt);
  70.           DataInputStream din = new DataInputStream(fin);
  71.           String str = din.readLine();
  72.           din.close();
  73.           out.writeBytes(str);
  74.           out.writeBytes("n");
  75.         }
  76.         for (int i = 0; i < localFiles.length; i++) {
  77.           // read out the files from these archives
  78.           File txt = new File(localFiles[i].toString());
  79.           FileInputStream fin = new FileInputStream(txt);
  80.           DataInputStream din = new DataInputStream(fin);
  81.           String str = din.readLine();
  82.           out.writeBytes(str);
  83.           out.writeBytes("n");
  84.         }
  85.         out.close();
  86.       } catch (IOException ie) {
  87.         System.out.println(StringUtils.stringifyException(ie));
  88.       }
  89.     }
  90.     public void map(LongWritable key, Text value,
  91.                     OutputCollector<Text, IntWritable> output,
  92.                     Reporter reporter) throws IOException {
  93.       String line = value.toString();
  94.       StringTokenizer itr = new StringTokenizer(line);
  95.       while (itr.hasMoreTokens()) {
  96.         word.set(itr.nextToken());
  97.         output.collect(word, one);
  98.       }
  99.     }
  100.   }
  101.   /**
  102.    * Using the wordcount example and adding caching to it. The cache
  103.    * archives/files are set and then are checked in the map if they have been
  104.    * symlinked or not.
  105.    */
  106.   public static class MapClass2 extends MapClass {
  107.     
  108.     JobConf conf;
  109.     public void configure(JobConf jconf) {
  110.       conf = jconf;
  111.       try {
  112.         // read the cached files (unzipped, unjarred and text)
  113.         // and put it into a single file TEST_ROOT_DIR/test.txt
  114.         String TEST_ROOT_DIR = jconf.get("test.build.data","/tmp");
  115.         Path file = new Path("file:///", TEST_ROOT_DIR);
  116.         FileSystem fs = FileSystem.getLocal(conf);
  117.         if (!fs.mkdirs(file)) {
  118.           throw new IOException("Mkdirs failed to create " + file.toString());
  119.         }
  120.         Path fileOut = new Path(file, "test.txt");
  121.         fs.delete(fileOut, true);
  122.         DataOutputStream out = fs.create(fileOut); 
  123.         String[] symlinks = new String[6];
  124.         symlinks[0] = ".";
  125.         symlinks[1] = "testjar";
  126.         symlinks[2] = "testzip";
  127.         symlinks[3] = "testtgz";
  128.         symlinks[4] = "testtargz";
  129.         symlinks[5] = "testtar";
  130.         for (int i = 0; i < symlinks.length; i++) {
  131.           // read out the files from these archives
  132.           File f = new File(symlinks[i]);
  133.           File txt = new File(f, "test.txt");
  134.           FileInputStream fin = new FileInputStream(txt);
  135.           BufferedReader reader = new BufferedReader(new InputStreamReader(fin));
  136.           String str = reader.readLine();
  137.           reader.close();
  138.           out.writeBytes(str);
  139.           out.writeBytes("n");
  140.         }
  141.         out.close();
  142.       } catch (IOException ie) {
  143.         System.out.println(StringUtils.stringifyException(ie));
  144.       }
  145.     }
  146.   }
  147.   /**
  148.    * A reducer class that just emits the sum of the input values.
  149.    */
  150.   public static class ReduceClass extends MapReduceBase
  151.     implements Reducer<Text, IntWritable, Text, IntWritable> {
  152.     public void reduce(Text key, Iterator<IntWritable> values,
  153.                        OutputCollector<Text, IntWritable> output,
  154.                        Reporter reporter) throws IOException {
  155.       int sum = 0;
  156.       while (values.hasNext()) {
  157.         sum += values.next().get();
  158.       }
  159.       output.collect(key, new IntWritable(sum));
  160.     }
  161.   }
  162.   public static class TestResult {
  163.     public RunningJob job;
  164.     public boolean isOutputOk;
  165.     TestResult(RunningJob job, boolean isOutputOk) {
  166.       this.job = job;
  167.       this.isOutputOk = isOutputOk;
  168.     }
  169.   }
  170.   static void setupCache(String cacheDir, FileSystem fs) 
  171.   throws IOException {
  172.     Path localPath = new Path("build/test/cache");
  173.     Path txtPath = new Path(localPath, new Path("test.txt"));
  174.     Path jarPath = new Path(localPath, new Path("test.jar"));
  175.     Path zipPath = new Path(localPath, new Path("test.zip"));
  176.     Path tarPath = new Path(localPath, new Path("test.tgz"));
  177.     Path tarPath1 = new Path(localPath, new Path("test.tar.gz"));
  178.     Path tarPath2 = new Path(localPath, new Path("test.tar"));
  179.     Path cachePath = new Path(cacheDir);
  180.     fs.delete(cachePath, true);
  181.     if (!fs.mkdirs(cachePath)) {
  182.       throw new IOException("Mkdirs failed to create " + cachePath.toString());
  183.     }
  184.     fs.copyFromLocalFile(txtPath, cachePath);
  185.     fs.copyFromLocalFile(jarPath, cachePath);
  186.     fs.copyFromLocalFile(zipPath, cachePath);
  187.     fs.copyFromLocalFile(tarPath, cachePath);
  188.     fs.copyFromLocalFile(tarPath1, cachePath);
  189.     fs.copyFromLocalFile(tarPath2, cachePath);
  190.   }
  191.  
  192.   public static TestResult launchMRCache(String indir,
  193.                                          String outdir, String cacheDir, 
  194.                                          JobConf conf, String input) 
  195.   throws IOException {
  196.     setupCache(cacheDir, FileSystem.get(conf));
  197.     return launchMRCache(indir,outdir, cacheDir, conf, input, false); 
  198.   }
  199.   
  200.   public static TestResult launchMRCache(String indir,
  201.                                          String outdir, String cacheDir, 
  202.                                          JobConf conf, String input,
  203.                                          boolean withSymlink)
  204.     throws IOException {
  205.     String TEST_ROOT_DIR = new Path(System.getProperty("test.build.data","/tmp"))
  206.       .toString().replace(' ', '+');
  207.     //if (TEST_ROOT_DIR.startsWith("C:")) TEST_ROOT_DIR = "/tmp";
  208.     conf.set("test.build.data", TEST_ROOT_DIR);
  209.     final Path inDir = new Path(indir);
  210.     final Path outDir = new Path(outdir);
  211.     FileSystem fs = FileSystem.get(conf);
  212.     fs.delete(outDir, true);
  213.     if (!fs.mkdirs(inDir)) {
  214.       throw new IOException("Mkdirs failed to create " + inDir.toString());
  215.     }
  216.     {
  217.       System.out.println("HERE:"+inDir);
  218.       DataOutputStream file = fs.create(new Path(inDir, "part-0"));
  219.       file.writeBytes(input);
  220.       file.close();
  221.     }
  222.     conf.setJobName("cachetest");
  223.     // the keys are words (strings)
  224.     conf.setOutputKeyClass(Text.class);
  225.     // the values are counts (ints)
  226.     conf.setOutputValueClass(IntWritable.class);
  227.     conf.setCombinerClass(MRCaching.ReduceClass.class);
  228.     conf.setReducerClass(MRCaching.ReduceClass.class);
  229.     FileInputFormat.setInputPaths(conf, inDir);
  230.     FileOutputFormat.setOutputPath(conf, outDir);
  231.     conf.setNumMapTasks(1);
  232.     conf.setNumReduceTasks(1);
  233.     conf.setSpeculativeExecution(false);
  234.     URI[] uris = new URI[6];
  235.     if (!withSymlink) {
  236.       conf.setMapperClass(MRCaching.MapClass.class);
  237.       uris[0] = fs.getUri().resolve(cacheDir + "/test.txt");
  238.       uris[1] = fs.getUri().resolve(cacheDir + "/test.jar");
  239.       uris[2] = fs.getUri().resolve(cacheDir + "/test.zip");
  240.       uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz");
  241.       uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz");
  242.       uris[5] = fs.getUri().resolve(cacheDir + "/test.tar");
  243.     } else {
  244.       DistributedCache.createSymlink(conf);
  245.       conf.setMapperClass(MRCaching.MapClass2.class);
  246.       uris[0] = fs.getUri().resolve(cacheDir + "/test.txt#" + "test.txt");
  247.       uris[1] = fs.getUri().resolve(cacheDir + "/test.jar#" + "testjar");
  248.       uris[2] = fs.getUri().resolve(cacheDir + "/test.zip#" + "testzip");
  249.       uris[3] = fs.getUri().resolve(cacheDir + "/test.tgz#" + "testtgz");
  250.       uris[4] = fs.getUri().resolve(cacheDir + "/test.tar.gz#" + "testtargz");
  251.       uris[5] = fs.getUri().resolve(cacheDir + "/test.tar#" + "testtar");
  252.     }
  253.     DistributedCache.addCacheFile(uris[0], conf);
  254.     for (int i = 1; i < 6; i++) {
  255.       DistributedCache.addCacheArchive(uris[i], conf);
  256.     }
  257.     RunningJob job = JobClient.runJob(conf);
  258.     int count = 0;
  259.     // after the job ran check to see if the input from the localized cache
  260.     // match the real string. check if there are 3 instances or not.
  261.     Path result = new Path(TEST_ROOT_DIR + "/test.txt");
  262.     {
  263.       BufferedReader file = new BufferedReader
  264.          (new InputStreamReader(FileSystem.getLocal(conf).open(result)));
  265.       String line = file.readLine();
  266.       while (line != null) {
  267.         if (!testStr.equals(line))
  268.           return new TestResult(job, false);
  269.         count++;
  270.         line = file.readLine();
  271.       }
  272.       file.close();
  273.     }
  274.     if (count != 6)
  275.       return new TestResult(job, false);
  276.     return new TestResult(job, true);
  277.   }
  278. }