GenericMRLoadJobCreator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.util.Random;
  20. import java.util.Stack;
  21. import org.apache.hadoop.fs.FileStatus;
  22. import org.apache.hadoop.fs.FileSystem;
  23. import org.apache.hadoop.fs.Path;
  24. import org.apache.hadoop.io.LongWritable;
  25. import org.apache.hadoop.io.SequenceFile;
  26. import org.apache.hadoop.io.Text;
  27. import org.apache.hadoop.mapred.GenericMRLoadGenerator;
  28. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  29. import org.apache.hadoop.mapred.JobConf;
  30. public class GenericMRLoadJobCreator extends GenericMRLoadGenerator {
  31.   public static JobConf createJob(String[] argv, boolean mapoutputCompressed,
  32.       boolean outputCompressed) throws Exception {
  33.     JobConf job = new JobConf();
  34.     job.setJarByClass(GenericMRLoadGenerator.class);
  35.     job.setMapperClass(SampleMapper.class);
  36.     job.setReducerClass(SampleReducer.class);
  37.     if (!parseArgs(argv, job)) {
  38.       return null;
  39.     }
  40.     if (null == FileOutputFormat.getOutputPath(job)) {
  41.       // No output dir? No writes
  42.       job.setOutputFormat(NullOutputFormat.class);
  43.     }
  44.     if (0 == FileInputFormat.getInputPaths(job).length) {
  45.       // No input dir? Generate random data
  46.       System.err.println("No input path; ignoring InputFormat");
  47.       confRandom(job);
  48.     } else if (null != job.getClass("mapred.indirect.input.format", null)) {
  49.       // specified IndirectInputFormat? Build src list
  50.       JobClient jClient = new JobClient(job);
  51.       Path sysdir = jClient.getSystemDir();
  52.       Random r = new Random();
  53.       Path indirInputFile = new Path(sysdir, Integer.toString(r
  54.           .nextInt(Integer.MAX_VALUE), 36)
  55.           + "_files");
  56.       job.set("mapred.indirect.input.file", indirInputFile.toString());
  57.       SequenceFile.Writer writer = SequenceFile.createWriter(sysdir
  58.           .getFileSystem(job), job, indirInputFile, LongWritable.class,
  59.           Text.class, SequenceFile.CompressionType.NONE);
  60.       try {
  61.         for (Path p : FileInputFormat.getInputPaths(job)) {
  62.           FileSystem fs = p.getFileSystem(job);
  63.           Stack<Path> pathstack = new Stack<Path>();
  64.           pathstack.push(p);
  65.           while (!pathstack.empty()) {
  66.             for (FileStatus stat : fs.listStatus(pathstack.pop())) {
  67.               if (stat.isDir()) {
  68.                 if (!stat.getPath().getName().startsWith("_")) {
  69.                   pathstack.push(stat.getPath());
  70.                 }
  71.               } else {
  72.                 writer.sync();
  73.                 writer.append(new LongWritable(stat.getLen()), new Text(stat
  74.                     .getPath().toUri().toString()));
  75.               }
  76.             }
  77.           }
  78.         }
  79.       } finally {
  80.         writer.close();
  81.       }
  82.     }
  83.     job.setCompressMapOutput(mapoutputCompressed);
  84.     job.setBoolean("mapred.output.compress", outputCompressed);
  85.     return job;
  86.   }
  87. }