ExternalMapReduce.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package testshell;
  19. import java.io.IOException;
  20. import java.util.Iterator;
  21. import org.apache.hadoop.conf.Configuration;
  22. import org.apache.hadoop.conf.Configured;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.IntWritable;
  25. import org.apache.hadoop.io.Writable;
  26. import org.apache.hadoop.io.WritableComparable;
  27. import org.apache.hadoop.mapred.FileInputFormat;
  28. import org.apache.hadoop.mapred.FileOutputFormat;
  29. import org.apache.hadoop.mapred.JobClient;
  30. import org.apache.hadoop.mapred.JobConf;
  31. import org.apache.hadoop.mapred.MapReduceBase;
  32. import org.apache.hadoop.mapred.Mapper;
  33. import org.apache.hadoop.mapred.OutputCollector;
  34. import org.apache.hadoop.mapred.Reducer;
  35. import org.apache.hadoop.mapred.Reporter;
  36. import org.apache.hadoop.util.Tool;
  37. import org.apache.hadoop.util.ToolRunner;
  38. /**
  39.  * will be in an external jar and used for 
  40.  * test in TestJobShell.java.
  41.  */
  42. public class ExternalMapReduce extends Configured implements Tool {
  43.   public void configure(JobConf job) {
  44.     // do nothing
  45.   }
  46.   public void close()
  47.     throws IOException {
  48.   }
  49.   public static class MapClass extends MapReduceBase 
  50.     implements Mapper<WritableComparable, Writable,
  51.                       WritableComparable, IntWritable> {
  52.     public void map(WritableComparable key, Writable value,
  53.                     OutputCollector<WritableComparable, IntWritable> output,
  54.                     Reporter reporter)
  55.       throws IOException {
  56.       //check for classpath
  57.       String classpath = System.getProperty("java.class.path");
  58.       if (classpath.indexOf("testjob.jar") == -1) {
  59.         throw new IOException("failed to find in the library " + classpath);
  60.       }
  61.       //fork off ls to see if the file exists.
  62.       // java file.exists() will not work on 
  63.       // cygwin since it is a symlink
  64.       String[] argv = new String[2];
  65.       argv[0] = "ls";
  66.       argv[1] = "files_tmp";
  67.       Process p = Runtime.getRuntime().exec(argv);
  68.       int ret = -1;
  69.       try {
  70.         ret = p.waitFor();
  71.       } catch(InterruptedException ie) {
  72.         //do nothing here.
  73.       }
  74.       if (ret != 0) {
  75.         throw new IOException("files_tmp does not exist");
  76.       }
  77.     }
  78.   }
  79.   public static class Reduce extends MapReduceBase
  80.     implements Reducer<WritableComparable, Writable,
  81.                        WritableComparable, IntWritable> {
  82.     public void reduce(WritableComparable key, Iterator<Writable> values,
  83.                        OutputCollector<WritableComparable, IntWritable> output,
  84.                        Reporter reporter)
  85.       throws IOException {
  86.      //do nothing
  87.     }
  88.   }
  89.   
  90.   public int run(String[] argv) throws IOException {
  91.     if (argv.length < 2) {
  92.       System.out.println("ExternalMapReduce <input> <output>");
  93.       return -1;
  94.     }
  95.     Path outDir = new Path(argv[1]);
  96.     Path input = new Path(argv[0]);
  97.     JobConf testConf = new JobConf(getConf(), ExternalMapReduce.class);
  98.     
  99.     //try to load a class from libjar
  100.     try {
  101.       testConf.getClassByName("testjar.ClassWordCount");
  102.     } catch (ClassNotFoundException e) {
  103.       System.out.println("Could not find class from libjar");
  104.       return -1;
  105.     }
  106.     
  107.     
  108.     testConf.setJobName("external job");
  109.     FileInputFormat.setInputPaths(testConf, input);
  110.     FileOutputFormat.setOutputPath(testConf, outDir);
  111.     testConf.setMapperClass(MapClass.class);
  112.     testConf.setReducerClass(Reduce.class);
  113.     testConf.setNumReduceTasks(1);
  114.     JobClient.runJob(testConf);
  115.     return 0;
  116.   }
  117.   
  118.   public static void main(String[] args) throws Exception {
  119.     int res = ToolRunner.run(new Configuration(),
  120.                      new ExternalMapReduce(), args);
  121.     System.exit(res);
  122.   }
  123. }