DistTool.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:4k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.tools;
  19. import java.io.BufferedReader;
  20. import java.io.DataInput;
  21. import java.io.DataOutput;
  22. import java.io.FileNotFoundException;
  23. import java.io.IOException;
  24. import java.io.InputStreamReader;
  25. import java.util.ArrayList;
  26. import java.util.List;
  27. import java.util.Random;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.conf.Configuration;
  31. import org.apache.hadoop.fs.FileSystem;
  32. import org.apache.hadoop.fs.Path;
  33. import org.apache.hadoop.io.Text;
  34. import org.apache.hadoop.mapred.InvalidInputException;
  35. import org.apache.hadoop.mapred.JobConf;
  36. /**
  37.  * An abstract class for distributed tool for file related operations.
  38.  */
  39. abstract class DistTool implements org.apache.hadoop.util.Tool {
  40.   protected static final Log LOG = LogFactory.getLog(DistTool.class);
  41.   protected JobConf jobconf;
  42.   /** {@inheritDoc} */
  43.   public void setConf(Configuration conf) {
  44.     if (jobconf != conf) {
  45.       jobconf = conf instanceof JobConf? (JobConf)conf: new JobConf(conf);
  46.     }
  47.   }
  48.   /** {@inheritDoc} */
  49.   public JobConf getConf() {return jobconf;}
  50.   protected DistTool(Configuration conf) {setConf(conf);}
  51.   private static final Random RANDOM = new Random();
  52.   protected static String getRandomId() {
  53.     return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
  54.   }
  55.   /** Sanity check for source */
  56.   protected static void checkSource(Configuration conf, List<Path> srcs
  57.       ) throws InvalidInputException {
  58.     List<IOException> ioes = new ArrayList<IOException>();
  59.     for(Path p : srcs) {
  60.       try {
  61.         if (!p.getFileSystem(conf).exists(p)) {
  62.           ioes.add(new FileNotFoundException("Source "+p+" does not exist."));
  63.         }
  64.       }
  65.       catch(IOException e) {ioes.add(e);}
  66.     }
  67.     if (!ioes.isEmpty()) {
  68.       throw new InvalidInputException(ioes);
  69.     }
  70.   }
  71.   protected static String readString(DataInput in) throws IOException {
  72.     if (in.readBoolean()) {
  73.       return Text.readString(in);
  74.     }
  75.     return null;
  76.   }
  77.   protected static void writeString(DataOutput out, String s
  78.       ) throws IOException {
  79.     boolean b = s != null;
  80.     out.writeBoolean(b);
  81.     if (b) {Text.writeString(out, s);}
  82.   }
  83.   protected static List<String> readFile(Configuration conf, Path inputfile
  84.       ) throws IOException {
  85.     List<String> result = new ArrayList<String>();
  86.     FileSystem fs = inputfile.getFileSystem(conf);
  87.     BufferedReader input = null;
  88.     try {
  89.       input = new BufferedReader(new InputStreamReader(fs.open(inputfile)));
  90.       for(String line; (line = input.readLine()) != null;) {
  91.         result.add(line);
  92.       }
  93.     } finally {
  94.       input.close();
  95.     }
  96.     return result;
  97.   }
  98.   /** An exception class for duplicated source files. */
  99.   public static class DuplicationException extends IOException {
  100.     private static final long serialVersionUID = 1L;
  101.     /** Error code for this exception */
  102.     public static final int ERROR_CODE = -2;
  103.     DuplicationException(String message) {super(message);}
  104.   }
  105. }