DistCh.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:16k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.tools;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.ArrayList;
  23. import java.util.List;
  24. import java.util.Stack;
  25. import org.apache.hadoop.conf.Configuration;
  26. import org.apache.hadoop.fs.FileStatus;
  27. import org.apache.hadoop.fs.FileSystem;
  28. import org.apache.hadoop.fs.Path;
  29. import org.apache.hadoop.fs.permission.FsPermission;
  30. import org.apache.hadoop.io.SequenceFile;
  31. import org.apache.hadoop.io.Text;
  32. import org.apache.hadoop.io.Writable;
  33. import org.apache.hadoop.io.WritableComparable;
  34. import org.apache.hadoop.mapred.FileOutputFormat;
  35. import org.apache.hadoop.mapred.FileSplit;
  36. import org.apache.hadoop.mapred.InputFormat;
  37. import org.apache.hadoop.mapred.InputSplit;
  38. import org.apache.hadoop.mapred.InvalidInputException;
  39. import org.apache.hadoop.mapred.JobClient;
  40. import org.apache.hadoop.mapred.JobConf;
  41. import org.apache.hadoop.mapred.Mapper;
  42. import org.apache.hadoop.mapred.OutputCollector;
  43. import org.apache.hadoop.mapred.RecordReader;
  44. import org.apache.hadoop.mapred.Reporter;
  45. import org.apache.hadoop.mapred.SequenceFileRecordReader;
  46. import org.apache.hadoop.util.StringUtils;
  47. import org.apache.hadoop.util.ToolRunner;
  48. /**
  49.  * A Map-reduce program to recursively change files properties
  50.  * such as owner, group and permission.
  51.  */
  52. public class DistCh extends DistTool {
  53.   static final String NAME = "distch";
  54.   static final String JOB_DIR_LABEL = NAME + ".job.dir";
  55.   static final String OP_LIST_LABEL = NAME + ".op.list";
  56.   static final String OP_COUNT_LABEL = NAME + ".op.count";
  57.   static final String USAGE = "java " + DistCh.class.getName() 
  58.       + " [OPTIONS] <path:owner:group:permission>+ "
  59.       + "nnThe values of owner, group and permission can be empty."
  60.       + "nPermission is a octal number."
  61.       + "nnOPTIONS:"
  62.       + "n-f <urilist_uri>       Use list at <urilist_uri> as src list"
  63.       + "n-i                     Ignore failures"
  64.       + "n-log <logdir>          Write logs to <logdir>"
  65.       ;
  66.   private static final long OP_PER_MAP =  1000;
  67.   private static final int MAX_MAPS_PER_NODE = 20;
  68.   private static final int SYNC_FILE_MAX = 10;
  69.   static enum Counter { SUCCEED, FAIL }
  70.   static enum Option {
  71.     IGNORE_FAILURES("-i", NAME + ".ignore.failures");
  72.     final String cmd, propertyname;
  73.     private Option(String cmd, String propertyname) {
  74.       this.cmd = cmd;
  75.       this.propertyname = propertyname;
  76.     }
  77.   }
  78.   DistCh(Configuration conf) {
  79.     super(createJobConf(conf));
  80.   }
  81.   private static JobConf createJobConf(Configuration conf) {
  82.     JobConf jobconf = new JobConf(conf, DistCh.class);
  83.     jobconf.setJobName(NAME);
  84.     jobconf.setMapSpeculativeExecution(false);
  85.     jobconf.setInputFormat(ChangeInputFormat.class);
  86.     jobconf.setOutputKeyClass(Text.class);
  87.     jobconf.setOutputValueClass(Text.class);
  88.     jobconf.setMapperClass(ChangeFilesMapper.class);
  89.     jobconf.setNumReduceTasks(0);
  90.     return jobconf;
  91.   }
  92.   /** File operations. */
  93.   static class FileOperation implements Writable {
  94.     private Path src;
  95.     private String owner;
  96.     private String group;
  97.     private FsPermission permission;
  98.     FileOperation() {}
  99.     FileOperation(Path src, FileOperation that) {
  100.       this.src = src;
  101.       this.owner = that.owner;
  102.       this.group = that.group;
  103.       this.permission = that.permission;
  104.       checkState();
  105.     }
  106.     /**
  107.      * path:owner:group:permission
  108.      * e.g.
  109.      * /user/foo:foo:bar:700 
  110.      */
  111.     FileOperation(String line) {
  112.       try {
  113.         String[] t = line.split(":", 4);
  114.         for(int i = 0; i < t.length; i++) {
  115.           if ("".equals(t[i])) {
  116.             t[i] = null;
  117.           }
  118.         }
  119.         src = new Path(t[0]);
  120.         owner = t[1];
  121.         group = t[2];
  122.         permission = t[3] == null? null:
  123.           new FsPermission(Short.parseShort(t[3], 8));
  124.         checkState();
  125.       }
  126.       catch(Exception e) {
  127.         throw (IllegalArgumentException)new IllegalArgumentException(
  128.             "line=" + line).initCause(e);
  129.       }
  130.     }
  131.     private void checkState() throws IllegalStateException {
  132.       if (owner == null && group == null && permission == null) {
  133.         throw new IllegalStateException(
  134.             "owner == null && group == null && permission == null");
  135.       }
  136.     }
  137.     static final FsPermission FILE_UMASK
  138.         = FsPermission.createImmutable((short)0111);
  139.     private boolean isDifferent(FileStatus original) {
  140.       if (owner != null && !owner.equals(original.getOwner())) {
  141.         return true;
  142.       }
  143.       if (group != null && !group.equals(original.getGroup())) {
  144.         return true;
  145.       }
  146.       if (permission != null) {
  147.         FsPermission orig = original.getPermission();
  148.         return original.isDir()? !permission.equals(orig):
  149.           !permission.applyUMask(FILE_UMASK).equals(orig);
  150.       }
  151.       return false;
  152.     }
  153.     void run(Configuration conf) throws IOException {
  154.       FileSystem fs = src.getFileSystem(conf);
  155.       if (permission != null) {
  156.         fs.setPermission(src, permission);
  157.       }
  158.       if (owner != null || group != null) {
  159.         fs.setOwner(src, owner, group);
  160.       }
  161.     }
  162.     /** {@inheritDoc} */
  163.     public void readFields(DataInput in) throws IOException {
  164.       this.src = new Path(Text.readString(in));
  165.       owner = DistTool.readString(in);
  166.       group = DistTool.readString(in);
  167.       permission = in.readBoolean()? FsPermission.read(in): null;
  168.     }
  169.     /** {@inheritDoc} */
  170.     public void write(DataOutput out) throws IOException {
  171.       Text.writeString(out, src.toString());
  172.       DistTool.writeString(out, owner);
  173.       DistTool.writeString(out, group);
  174.       boolean b = permission != null;
  175.       out.writeBoolean(b);
  176.       if (b) {permission.write(out);}
  177.     }
  178.     /** {@inheritDoc} */
  179.     public String toString() {
  180.       return src + ":" + owner + ":" + group + ":" + permission; 
  181.     }
  182.   }
  183.   /** Responsible for generating splits of the src file list. */
  184.   static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
  185.     /** Do nothing. */
  186.     public void validateInput(JobConf job) {}
  187.     /**
  188.      * Produce splits such that each is no greater than the quotient of the
  189.      * total size and the number of splits requested.
  190.      * @param job The handle to the JobConf object
  191.      * @param numSplits Number of splits requested
  192.      */
  193.     public InputSplit[] getSplits(JobConf job, int numSplits
  194.         ) throws IOException {
  195.       final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
  196.       final int targetcount = srcCount / numSplits;
  197.       String srclist = job.get(OP_LIST_LABEL, "");
  198.       if (srcCount < 0 || "".equals(srclist)) {
  199.         throw new RuntimeException("Invalid metadata: #files(" + srcCount +
  200.                                    ") listuri(" + srclist + ")");
  201.       }
  202.       Path srcs = new Path(srclist);
  203.       FileSystem fs = srcs.getFileSystem(job);
  204.       List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  205.       Text key = new Text();
  206.       FileOperation value = new FileOperation();
  207.       SequenceFile.Reader in = null;
  208.       long prev = 0L;
  209.       int count = 0; //count src
  210.       try {
  211.         for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
  212.           long curr = in.getPosition();
  213.           long delta = curr - prev;
  214.           if (++count > targetcount) {
  215.             count = 0;
  216.             splits.add(new FileSplit(srcs, prev, delta, (String[])null));
  217.             prev = curr;
  218.           }
  219.         }
  220.       }
  221.       finally {
  222.         in.close();
  223.       }
  224.       long remaining = fs.getFileStatus(srcs).getLen() - prev;
  225.       if (remaining != 0) {
  226.         splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
  227.       }
  228.       LOG.info("numSplits="  + numSplits + ", splits.size()=" + splits.size());
  229.       return splits.toArray(new FileSplit[splits.size()]);
  230.     }
  231.     /** {@inheritDoc} */
  232.     public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
  233.         JobConf job, Reporter reporter) throws IOException {
  234.       return new SequenceFileRecordReader<Text, FileOperation>(job,
  235.           (FileSplit)split);
  236.     }
  237.   }
  238.   /** The mapper for changing files. */
  239.   static class ChangeFilesMapper 
  240.       implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
  241.     private JobConf jobconf;
  242.     private boolean ignoreFailures;
  243.     private int failcount = 0;
  244.     private int succeedcount = 0;
  245.     private String getCountString() {
  246.       return "Succeeded: " + succeedcount + " Failed: " + failcount;
  247.     }
  248.     /** {@inheritDoc} */
  249.     public void configure(JobConf job) {
  250.       this.jobconf = job;
  251.       ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
  252.     }
  253.     /** Run a FileOperation */
  254.     public void map(Text key, FileOperation value,
  255.         OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
  256.         ) throws IOException {
  257.       try {
  258.         value.run(jobconf);
  259.         ++succeedcount;
  260.         reporter.incrCounter(Counter.SUCCEED, 1);
  261.       } catch (IOException e) {
  262.         ++failcount;
  263.         reporter.incrCounter(Counter.FAIL, 1);
  264.         String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
  265.         out.collect(null, new Text(s));
  266.         LOG.info(s);
  267.       } finally {
  268.         reporter.setStatus(getCountString());
  269.       }
  270.     }
  271.     /** {@inheritDoc} */
  272.     public void close() throws IOException {
  273.       if (failcount == 0 || ignoreFailures) {
  274.         return;
  275.       }
  276.       throw new IOException(getCountString());
  277.     }
  278.   }
  279.   private static void check(Configuration conf, List<FileOperation> ops
  280.       ) throws InvalidInputException {
  281.     List<Path> srcs = new ArrayList<Path>();
  282.     for(FileOperation op : ops) {
  283.       srcs.add(op.src);
  284.     }
  285.     DistTool.checkSource(conf, srcs);
  286.   }
  287.   private static List<FileOperation> fetchList(Configuration conf, Path inputfile
  288.       ) throws IOException {
  289.     List<FileOperation> result = new ArrayList<FileOperation>();
  290.     for(String line : readFile(conf, inputfile)) {
  291.       result.add(new FileOperation(line));
  292.     }
  293.     return result;
  294.   }
  295.   /** This is the main driver for recursively changing files properties. */
  296.   public int run(String[] args) throws Exception {
  297.     List<FileOperation> ops = new ArrayList<FileOperation>();
  298.     Path logpath = null;
  299.     boolean isIgnoreFailures = false;
  300.     try {
  301.       for (int idx = 0; idx < args.length; idx++) {
  302.         if ("-f".equals(args[idx])) {
  303.           if (++idx ==  args.length) {
  304.             System.out.println("urilist_uri not specified");
  305.             System.out.println(USAGE);
  306.             return -1;
  307.           }
  308.           ops.addAll(fetchList(jobconf, new Path(args[idx])));
  309.         } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
  310.           isIgnoreFailures = true;
  311.         } else if ("-log".equals(args[idx])) {
  312.           if (++idx ==  args.length) {
  313.             System.out.println("logdir not specified");
  314.             System.out.println(USAGE);
  315.             return -1;
  316.           }
  317.           logpath = new Path(args[idx]);
  318.         } else if ('-' == args[idx].codePointAt(0)) {
  319.           System.out.println("Invalid switch " + args[idx]);
  320.           System.out.println(USAGE);
  321.           ToolRunner.printGenericCommandUsage(System.out);
  322.           return -1;
  323.         } else {
  324.           ops.add(new FileOperation(args[idx]));
  325.         }
  326.       }
  327.       // mandatory command-line parameters
  328.       if (ops.isEmpty()) {
  329.         throw new IllegalStateException("Operation is empty");
  330.       }
  331.       LOG.info("ops=" + ops);
  332.       LOG.info("isIgnoreFailures=" + isIgnoreFailures);
  333.       jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
  334.       check(jobconf, ops);
  335.       try {
  336.         if (setup(ops, logpath)) {
  337.           JobClient.runJob(jobconf);
  338.         }
  339.       } finally {
  340.         try {
  341.           if (logpath == null) {
  342.             //delete log directory
  343.             final Path logdir = FileOutputFormat.getOutputPath(jobconf);
  344.             if (logdir != null) {
  345.               logdir.getFileSystem(jobconf).delete(logdir, true);
  346.             }
  347.           }
  348.         }
  349.         finally {
  350.           //delete job directory
  351.           final String jobdir = jobconf.get(JOB_DIR_LABEL);
  352.           if (jobdir != null) {
  353.             final Path jobpath = new Path(jobdir);
  354.             jobpath.getFileSystem(jobconf).delete(jobpath, true);
  355.           }
  356.         }
  357.       }
  358.     } catch(DuplicationException e) {
  359.       LOG.error("Input error:", e);
  360.       return DuplicationException.ERROR_CODE;
  361.     } catch(Exception e) {
  362.       LOG.error(NAME + " failed: ", e);
  363.       System.out.println(USAGE);
  364.       ToolRunner.printGenericCommandUsage(System.out);
  365.       return -1;
  366.     }
  367.     return 0;
  368.   }
  369.   /** Calculate how many maps to run. */
  370.   private static int getMapCount(int srcCount, int numNodes) {
  371.     int numMaps = (int)(srcCount / OP_PER_MAP);
  372.     numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
  373.     return Math.max(numMaps, 1);
  374.   }
  375.   private boolean setup(List<FileOperation> ops, Path log) throws IOException {
  376.     final String randomId = getRandomId();
  377.     JobClient jClient = new JobClient(jobconf);
  378.     Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
  379.     LOG.info(JOB_DIR_LABEL + "=" + jobdir);
  380.     if (log == null) {
  381.       log = new Path(jobdir, "_logs");
  382.     }
  383.     FileOutputFormat.setOutputPath(jobconf, log);
  384.     LOG.info("log=" + log);
  385.     //create operation list
  386.     FileSystem fs = jobdir.getFileSystem(jobconf);
  387.     Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
  388.     jobconf.set(OP_LIST_LABEL, opList.toString());
  389.     int opCount = 0, synCount = 0;
  390.     SequenceFile.Writer opWriter = null;
  391.     try {
  392.       opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
  393.           FileOperation.class, SequenceFile.CompressionType.NONE);
  394.       for(FileOperation op : ops) {
  395.         FileStatus srcstat = fs.getFileStatus(op.src); 
  396.         if (srcstat.isDir() && op.isDifferent(srcstat)) {
  397.           ++opCount;
  398.           opWriter.append(new Text(op.src.toString()), op);
  399.         }
  400.         Stack<Path> pathstack = new Stack<Path>();
  401.         for(pathstack.push(op.src); !pathstack.empty(); ) {
  402.           for(FileStatus stat : fs.listStatus(pathstack.pop())) {
  403.             if (stat.isDir()) {
  404.               pathstack.push(stat.getPath());
  405.             }
  406.             if (op.isDifferent(stat)) {              
  407.               ++opCount;
  408.               if (++synCount > SYNC_FILE_MAX) {
  409.                 opWriter.sync();
  410.                 synCount = 0;
  411.               }
  412.               Path f = stat.getPath();
  413.               opWriter.append(new Text(f.toString()), new FileOperation(f, op));
  414.             }
  415.           }
  416.         }
  417.       }
  418.     } finally {
  419.       opWriter.close();
  420.     }
  421.     checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
  422.     jobconf.setInt(OP_COUNT_LABEL, opCount);
  423.     LOG.info(OP_COUNT_LABEL + "=" + opCount);
  424.     jobconf.setNumMapTasks(getMapCount(opCount,
  425.         new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
  426.     return opCount != 0;    
  427.   }
  428.   private static void checkDuplication(FileSystem fs, Path file, Path sorted,
  429.     Configuration conf) throws IOException {
  430.     SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
  431.         new Text.Comparator(), Text.class, FileOperation.class, conf);
  432.     sorter.sort(file, sorted);
  433.     SequenceFile.Reader in = null;
  434.     try {
  435.       in = new SequenceFile.Reader(fs, sorted, conf);
  436.       FileOperation curop = new FileOperation();
  437.       Text prevsrc = null, cursrc = new Text(); 
  438.       for(; in.next(cursrc, curop); ) {
  439.         if (prevsrc != null && cursrc.equals(prevsrc)) {
  440.           throw new DuplicationException(
  441.             "Invalid input, there are duplicated files in the sources: "
  442.             + prevsrc + ", " + cursrc);
  443.         }
  444.         prevsrc = cursrc;
  445.         cursrc = new Text();
  446.         curop = new FileOperation();
  447.       }
  448.     }
  449.     finally {
  450.       in.close();
  451.     }
  452.   } 
  453.   public static void main(String[] args) throws Exception {
  454.     System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
  455.   }
  456. }