DistCh.java
Uploaded by: quxuerui
Upload date: 2018-01-08
Archive size: 41811 KB
File size: 16 KB
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.tools;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Stack;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileRecordReader;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
- /**
- * A Map-reduce program to recursively change files properties
- * such as owner, group and permission.
- */
- public class DistCh extends DistTool {
- static final String NAME = "distch";
- static final String JOB_DIR_LABEL = NAME + ".job.dir";
- static final String OP_LIST_LABEL = NAME + ".op.list";
- static final String OP_COUNT_LABEL = NAME + ".op.count";
- static final String USAGE = "java " + DistCh.class.getName()
- + " [OPTIONS] <path:owner:group:permission>+ "
- + "nnThe values of owner, group and permission can be empty."
- + "nPermission is a octal number."
- + "nnOPTIONS:"
- + "n-f <urilist_uri> Use list at <urilist_uri> as src list"
- + "n-i Ignore failures"
- + "n-log <logdir> Write logs to <logdir>"
- ;
- private static final long OP_PER_MAP = 1000;
- private static final int MAX_MAPS_PER_NODE = 20;
- private static final int SYNC_FILE_MAX = 10;
- static enum Counter { SUCCEED, FAIL }
- static enum Option {
- IGNORE_FAILURES("-i", NAME + ".ignore.failures");
- final String cmd, propertyname;
- private Option(String cmd, String propertyname) {
- this.cmd = cmd;
- this.propertyname = propertyname;
- }
- }
- DistCh(Configuration conf) {
- super(createJobConf(conf));
- }
- private static JobConf createJobConf(Configuration conf) {
- JobConf jobconf = new JobConf(conf, DistCh.class);
- jobconf.setJobName(NAME);
- jobconf.setMapSpeculativeExecution(false);
- jobconf.setInputFormat(ChangeInputFormat.class);
- jobconf.setOutputKeyClass(Text.class);
- jobconf.setOutputValueClass(Text.class);
- jobconf.setMapperClass(ChangeFilesMapper.class);
- jobconf.setNumReduceTasks(0);
- return jobconf;
- }
- /** File operations. */
- static class FileOperation implements Writable {
- private Path src;
- private String owner;
- private String group;
- private FsPermission permission;
- FileOperation() {}
- FileOperation(Path src, FileOperation that) {
- this.src = src;
- this.owner = that.owner;
- this.group = that.group;
- this.permission = that.permission;
- checkState();
- }
- /**
- * path:owner:group:permission
- * e.g.
- * /user/foo:foo:bar:700
- */
- FileOperation(String line) {
- try {
- String[] t = line.split(":", 4);
- for(int i = 0; i < t.length; i++) {
- if ("".equals(t[i])) {
- t[i] = null;
- }
- }
- src = new Path(t[0]);
- owner = t[1];
- group = t[2];
- permission = t[3] == null? null:
- new FsPermission(Short.parseShort(t[3], 8));
- checkState();
- }
- catch(Exception e) {
- throw (IllegalArgumentException)new IllegalArgumentException(
- "line=" + line).initCause(e);
- }
- }
- private void checkState() throws IllegalStateException {
- if (owner == null && group == null && permission == null) {
- throw new IllegalStateException(
- "owner == null && group == null && permission == null");
- }
- }
- static final FsPermission FILE_UMASK
- = FsPermission.createImmutable((short)0111);
- private boolean isDifferent(FileStatus original) {
- if (owner != null && !owner.equals(original.getOwner())) {
- return true;
- }
- if (group != null && !group.equals(original.getGroup())) {
- return true;
- }
- if (permission != null) {
- FsPermission orig = original.getPermission();
- return original.isDir()? !permission.equals(orig):
- !permission.applyUMask(FILE_UMASK).equals(orig);
- }
- return false;
- }
- void run(Configuration conf) throws IOException {
- FileSystem fs = src.getFileSystem(conf);
- if (permission != null) {
- fs.setPermission(src, permission);
- }
- if (owner != null || group != null) {
- fs.setOwner(src, owner, group);
- }
- }
- /** {@inheritDoc} */
- public void readFields(DataInput in) throws IOException {
- this.src = new Path(Text.readString(in));
- owner = DistTool.readString(in);
- group = DistTool.readString(in);
- permission = in.readBoolean()? FsPermission.read(in): null;
- }
- /** {@inheritDoc} */
- public void write(DataOutput out) throws IOException {
- Text.writeString(out, src.toString());
- DistTool.writeString(out, owner);
- DistTool.writeString(out, group);
- boolean b = permission != null;
- out.writeBoolean(b);
- if (b) {permission.write(out);}
- }
- /** {@inheritDoc} */
- public String toString() {
- return src + ":" + owner + ":" + group + ":" + permission;
- }
- }
- /** Responsible for generating splits of the src file list. */
- static class ChangeInputFormat implements InputFormat<Text, FileOperation> {
- /** Do nothing. */
- public void validateInput(JobConf job) {}
- /**
- * Produce splits such that each is no greater than the quotient of the
- * total size and the number of splits requested.
- * @param job The handle to the JobConf object
- * @param numSplits Number of splits requested
- */
- public InputSplit[] getSplits(JobConf job, int numSplits
- ) throws IOException {
- final int srcCount = job.getInt(OP_COUNT_LABEL, -1);
- final int targetcount = srcCount / numSplits;
- String srclist = job.get(OP_LIST_LABEL, "");
- if (srcCount < 0 || "".equals(srclist)) {
- throw new RuntimeException("Invalid metadata: #files(" + srcCount +
- ") listuri(" + srclist + ")");
- }
- Path srcs = new Path(srclist);
- FileSystem fs = srcs.getFileSystem(job);
- List<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
- Text key = new Text();
- FileOperation value = new FileOperation();
- SequenceFile.Reader in = null;
- long prev = 0L;
- int count = 0; //count src
- try {
- for(in = new SequenceFile.Reader(fs, srcs, job); in.next(key, value); ) {
- long curr = in.getPosition();
- long delta = curr - prev;
- if (++count > targetcount) {
- count = 0;
- splits.add(new FileSplit(srcs, prev, delta, (String[])null));
- prev = curr;
- }
- }
- }
- finally {
- in.close();
- }
- long remaining = fs.getFileStatus(srcs).getLen() - prev;
- if (remaining != 0) {
- splits.add(new FileSplit(srcs, prev, remaining, (String[])null));
- }
- LOG.info("numSplits=" + numSplits + ", splits.size()=" + splits.size());
- return splits.toArray(new FileSplit[splits.size()]);
- }
- /** {@inheritDoc} */
- public RecordReader<Text, FileOperation> getRecordReader(InputSplit split,
- JobConf job, Reporter reporter) throws IOException {
- return new SequenceFileRecordReader<Text, FileOperation>(job,
- (FileSplit)split);
- }
- }
- /** The mapper for changing files. */
- static class ChangeFilesMapper
- implements Mapper<Text, FileOperation, WritableComparable<?>, Text> {
- private JobConf jobconf;
- private boolean ignoreFailures;
- private int failcount = 0;
- private int succeedcount = 0;
- private String getCountString() {
- return "Succeeded: " + succeedcount + " Failed: " + failcount;
- }
- /** {@inheritDoc} */
- public void configure(JobConf job) {
- this.jobconf = job;
- ignoreFailures=job.getBoolean(Option.IGNORE_FAILURES.propertyname,false);
- }
- /** Run a FileOperation */
- public void map(Text key, FileOperation value,
- OutputCollector<WritableComparable<?>, Text> out, Reporter reporter
- ) throws IOException {
- try {
- value.run(jobconf);
- ++succeedcount;
- reporter.incrCounter(Counter.SUCCEED, 1);
- } catch (IOException e) {
- ++failcount;
- reporter.incrCounter(Counter.FAIL, 1);
- String s = "FAIL: " + value + ", " + StringUtils.stringifyException(e);
- out.collect(null, new Text(s));
- LOG.info(s);
- } finally {
- reporter.setStatus(getCountString());
- }
- }
- /** {@inheritDoc} */
- public void close() throws IOException {
- if (failcount == 0 || ignoreFailures) {
- return;
- }
- throw new IOException(getCountString());
- }
- }
- private static void check(Configuration conf, List<FileOperation> ops
- ) throws InvalidInputException {
- List<Path> srcs = new ArrayList<Path>();
- for(FileOperation op : ops) {
- srcs.add(op.src);
- }
- DistTool.checkSource(conf, srcs);
- }
- private static List<FileOperation> fetchList(Configuration conf, Path inputfile
- ) throws IOException {
- List<FileOperation> result = new ArrayList<FileOperation>();
- for(String line : readFile(conf, inputfile)) {
- result.add(new FileOperation(line));
- }
- return result;
- }
- /** This is the main driver for recursively changing files properties. */
- public int run(String[] args) throws Exception {
- List<FileOperation> ops = new ArrayList<FileOperation>();
- Path logpath = null;
- boolean isIgnoreFailures = false;
- try {
- for (int idx = 0; idx < args.length; idx++) {
- if ("-f".equals(args[idx])) {
- if (++idx == args.length) {
- System.out.println("urilist_uri not specified");
- System.out.println(USAGE);
- return -1;
- }
- ops.addAll(fetchList(jobconf, new Path(args[idx])));
- } else if (Option.IGNORE_FAILURES.cmd.equals(args[idx])) {
- isIgnoreFailures = true;
- } else if ("-log".equals(args[idx])) {
- if (++idx == args.length) {
- System.out.println("logdir not specified");
- System.out.println(USAGE);
- return -1;
- }
- logpath = new Path(args[idx]);
- } else if ('-' == args[idx].codePointAt(0)) {
- System.out.println("Invalid switch " + args[idx]);
- System.out.println(USAGE);
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- } else {
- ops.add(new FileOperation(args[idx]));
- }
- }
- // mandatory command-line parameters
- if (ops.isEmpty()) {
- throw new IllegalStateException("Operation is empty");
- }
- LOG.info("ops=" + ops);
- LOG.info("isIgnoreFailures=" + isIgnoreFailures);
- jobconf.setBoolean(Option.IGNORE_FAILURES.propertyname, isIgnoreFailures);
- check(jobconf, ops);
- try {
- if (setup(ops, logpath)) {
- JobClient.runJob(jobconf);
- }
- } finally {
- try {
- if (logpath == null) {
- //delete log directory
- final Path logdir = FileOutputFormat.getOutputPath(jobconf);
- if (logdir != null) {
- logdir.getFileSystem(jobconf).delete(logdir, true);
- }
- }
- }
- finally {
- //delete job directory
- final String jobdir = jobconf.get(JOB_DIR_LABEL);
- if (jobdir != null) {
- final Path jobpath = new Path(jobdir);
- jobpath.getFileSystem(jobconf).delete(jobpath, true);
- }
- }
- }
- } catch(DuplicationException e) {
- LOG.error("Input error:", e);
- return DuplicationException.ERROR_CODE;
- } catch(Exception e) {
- LOG.error(NAME + " failed: ", e);
- System.out.println(USAGE);
- ToolRunner.printGenericCommandUsage(System.out);
- return -1;
- }
- return 0;
- }
- /** Calculate how many maps to run. */
- private static int getMapCount(int srcCount, int numNodes) {
- int numMaps = (int)(srcCount / OP_PER_MAP);
- numMaps = Math.min(numMaps, numNodes * MAX_MAPS_PER_NODE);
- return Math.max(numMaps, 1);
- }
- private boolean setup(List<FileOperation> ops, Path log) throws IOException {
- final String randomId = getRandomId();
- JobClient jClient = new JobClient(jobconf);
- Path jobdir = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
- LOG.info(JOB_DIR_LABEL + "=" + jobdir);
- if (log == null) {
- log = new Path(jobdir, "_logs");
- }
- FileOutputFormat.setOutputPath(jobconf, log);
- LOG.info("log=" + log);
- //create operation list
- FileSystem fs = jobdir.getFileSystem(jobconf);
- Path opList = new Path(jobdir, "_" + OP_LIST_LABEL);
- jobconf.set(OP_LIST_LABEL, opList.toString());
- int opCount = 0, synCount = 0;
- SequenceFile.Writer opWriter = null;
- try {
- opWriter = SequenceFile.createWriter(fs, jobconf, opList, Text.class,
- FileOperation.class, SequenceFile.CompressionType.NONE);
- for(FileOperation op : ops) {
- FileStatus srcstat = fs.getFileStatus(op.src);
- if (srcstat.isDir() && op.isDifferent(srcstat)) {
- ++opCount;
- opWriter.append(new Text(op.src.toString()), op);
- }
- Stack<Path> pathstack = new Stack<Path>();
- for(pathstack.push(op.src); !pathstack.empty(); ) {
- for(FileStatus stat : fs.listStatus(pathstack.pop())) {
- if (stat.isDir()) {
- pathstack.push(stat.getPath());
- }
- if (op.isDifferent(stat)) {
- ++opCount;
- if (++synCount > SYNC_FILE_MAX) {
- opWriter.sync();
- synCount = 0;
- }
- Path f = stat.getPath();
- opWriter.append(new Text(f.toString()), new FileOperation(f, op));
- }
- }
- }
- }
- } finally {
- opWriter.close();
- }
- checkDuplication(fs, opList, new Path(jobdir, "_sorted"), jobconf);
- jobconf.setInt(OP_COUNT_LABEL, opCount);
- LOG.info(OP_COUNT_LABEL + "=" + opCount);
- jobconf.setNumMapTasks(getMapCount(opCount,
- new JobClient(jobconf).getClusterStatus().getTaskTrackers()));
- return opCount != 0;
- }
- private static void checkDuplication(FileSystem fs, Path file, Path sorted,
- Configuration conf) throws IOException {
- SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
- new Text.Comparator(), Text.class, FileOperation.class, conf);
- sorter.sort(file, sorted);
- SequenceFile.Reader in = null;
- try {
- in = new SequenceFile.Reader(fs, sorted, conf);
- FileOperation curop = new FileOperation();
- Text prevsrc = null, cursrc = new Text();
- for(; in.next(cursrc, curop); ) {
- if (prevsrc != null && cursrc.equals(prevsrc)) {
- throw new DuplicationException(
- "Invalid input, there are duplicated files in the sources: "
- + prevsrc + ", " + cursrc);
- }
- prevsrc = cursrc;
- cursrc = new Text();
- curop = new FileOperation();
- }
- }
- finally {
- in.close();
- }
- }
- public static void main(String[] args) throws Exception {
- System.exit(ToolRunner.run(new DistCh(new Configuration()), args));
- }
- }