DataJoinJob.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:6k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.contrib.utils.join;
  19. import java.io.IOException;
  20. import org.apache.hadoop.conf.Configuration;
  21. import org.apache.hadoop.fs.FileSystem;
  22. import org.apache.hadoop.fs.Path;
  23. import org.apache.hadoop.io.SequenceFile;
  24. import org.apache.hadoop.io.Text;
  25. import org.apache.hadoop.mapred.FileOutputFormat;
  26. import org.apache.hadoop.mapred.JobClient;
  27. import org.apache.hadoop.mapred.JobConf;
  28. import org.apache.hadoop.mapred.FileInputFormat;
  29. import org.apache.hadoop.mapred.RunningJob;
  30. import org.apache.hadoop.mapred.SequenceFileInputFormat;
  31. import org.apache.hadoop.mapred.SequenceFileOutputFormat;
  32. import org.apache.hadoop.mapred.TextInputFormat;
  33. import org.apache.hadoop.mapred.TextOutputFormat;
  34. import org.apache.hadoop.mapred.JobID;
  35. /**
  36.  * This class implements the main function for creating a map/reduce
  37.  * job to join data of different sources. To create sucn a job, the 
  38.  * user must implement a mapper class that extends DataJoinMapperBase class,
  39.  * and a reducer class that extends DataJoinReducerBase. 
  40.  * 
  41.  */
  42. public class DataJoinJob {
  43.   public static Class getClassByName(String className) {
  44.     Class retv = null;
  45.     try {
  46.       ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
  47.       retv = Class.forName(className, true, classLoader);
  48.     } catch (Exception e) {
  49.       throw new RuntimeException(e);
  50.     }
  51.     return retv;
  52.   }
  53.   public static JobConf createDataJoinJob(String args[]) throws IOException {
  54.     String inputDir = args[0];
  55.     String outputDir = args[1];
  56.     Class inputFormat = SequenceFileInputFormat.class;
  57.     if (args[2].compareToIgnoreCase("text") != 0) {
  58.       System.out.println("Using SequenceFileInputFormat: " + args[2]);
  59.     } else {
  60.       System.out.println("Using TextInputFormat: " + args[2]);
  61.       inputFormat = TextInputFormat.class;
  62.     }
  63.     int numOfReducers = Integer.parseInt(args[3]);
  64.     Class mapper = getClassByName(args[4]);
  65.     Class reducer = getClassByName(args[5]);
  66.     Class mapoutputValueClass = getClassByName(args[6]);
  67.     Class outputFormat = TextOutputFormat.class;
  68.     Class outputValueClass = Text.class;
  69.     if (args[7].compareToIgnoreCase("text") != 0) {
  70.       System.out.println("Using SequenceFileOutputFormat: " + args[7]);
  71.       outputFormat = SequenceFileOutputFormat.class;
  72.       outputValueClass = getClassByName(args[7]);
  73.     } else {
  74.       System.out.println("Using TextOutputFormat: " + args[7]);
  75.     }
  76.     long maxNumOfValuesPerGroup = 100;
  77.     String jobName = "";
  78.     if (args.length > 8) {
  79.       maxNumOfValuesPerGroup = Long.parseLong(args[8]);
  80.     }
  81.     if (args.length > 9) {
  82.       jobName = args[9];
  83.     }
  84.     Configuration defaults = new Configuration();
  85.     JobConf job = new JobConf(defaults, DataJoinJob.class);
  86.     job.setJobName("DataJoinJob: " + jobName);
  87.     FileSystem fs = FileSystem.get(defaults);
  88.     fs.delete(new Path(outputDir));
  89.     FileInputFormat.setInputPaths(job, inputDir);
  90.     job.setInputFormat(inputFormat);
  91.     job.setMapperClass(mapper);
  92.     FileOutputFormat.setOutputPath(job, new Path(outputDir));
  93.     job.setOutputFormat(outputFormat);
  94.     SequenceFileOutputFormat.setOutputCompressionType(job,
  95.             SequenceFile.CompressionType.BLOCK);
  96.     job.setMapOutputKeyClass(Text.class);
  97.     job.setMapOutputValueClass(mapoutputValueClass);
  98.     job.setOutputKeyClass(Text.class);
  99.     job.setOutputValueClass(outputValueClass);
  100.     job.setReducerClass(reducer);
  101.     job.setNumMapTasks(1);
  102.     job.setNumReduceTasks(numOfReducers);
  103.     job.setLong("datajoin.maxNumOfValuesPerGroup", maxNumOfValuesPerGroup);
  104.     return job;
  105.   }
  106.   /**
  107.    * Submit/run a map/reduce job.
  108.    * 
  109.    * @param job
  110.    * @return true for success
  111.    * @throws IOException
  112.    */
  113.   public static boolean runJob(JobConf job) throws IOException {
  114.     JobClient jc = new JobClient(job);
  115.     boolean sucess = true;
  116.     RunningJob running = null;
  117.     try {
  118.       running = jc.submitJob(job);
  119.       JobID jobId = running.getID();
  120.       System.out.println("Job " + jobId + " is submitted");
  121.       while (!running.isComplete()) {
  122.         System.out.println("Job " + jobId + " is still running.");
  123.         try {
  124.           Thread.sleep(60000);
  125.         } catch (InterruptedException e) {
  126.         }
  127.         running = jc.getJob(jobId);
  128.       }
  129.       sucess = running.isSuccessful();
  130.     } finally {
  131.       if (!sucess && (running != null)) {
  132.         running.killJob();
  133.       }
  134.       jc.close();
  135.     }
  136.     return sucess;
  137.   }
  138.   /**
  139.    * @param args
  140.    */
  141.   public static void main(String[] args) {
  142.     boolean success;
  143.     if (args.length < 8 || args.length > 10) {
  144.       System.out.println("usage: DataJoinJob " + "inputdirs outputdir map_input_file_format "
  145.                          + "numofParts " + "mapper_class " + "reducer_class "
  146.                          + "map_output_value_class "
  147.                          + "output_value_class [maxNumOfValuesPerGroup [descriptionOfJob]]]");
  148.       System.exit(-1);
  149.     }
  150.     try {
  151.       JobConf job = DataJoinJob.createDataJoinJob(args);
  152.       success = DataJoinJob.runJob(job);
  153.       if (!success) {
  154.         System.out.println("Job failed");
  155.       }
  156.     } catch (IOException ioe) {
  157.       ioe.printStackTrace();
  158.     }
  159.   }
  160. }