DistCp.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:47k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.tools;
  19. import java.io.BufferedReader;
  20. import java.io.DataInput;
  21. import java.io.DataOutput;
  22. import java.io.FileNotFoundException;
  23. import java.io.IOException;
  24. import java.io.InputStreamReader;
  25. import java.util.ArrayList;
  26. import java.util.EnumSet;
  27. import java.util.Iterator;
  28. import java.util.List;
  29. import java.util.Random;
  30. import java.util.Stack;
  31. import java.util.StringTokenizer;
  32. import org.apache.commons.logging.Log;
  33. import org.apache.commons.logging.LogFactory;
  34. import org.apache.hadoop.conf.Configuration;
  35. import org.apache.hadoop.fs.FSDataInputStream;
  36. import org.apache.hadoop.fs.FSDataOutputStream;
  37. import org.apache.hadoop.fs.FileChecksum;
  38. import org.apache.hadoop.fs.FileStatus;
  39. import org.apache.hadoop.fs.FileSystem;
  40. import org.apache.hadoop.fs.FsShell;
  41. import org.apache.hadoop.fs.Path;
  42. import org.apache.hadoop.fs.permission.FsPermission;
  43. import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
  44. import org.apache.hadoop.io.LongWritable;
  45. import org.apache.hadoop.io.SequenceFile;
  46. import org.apache.hadoop.io.Text;
  47. import org.apache.hadoop.io.Writable;
  48. import org.apache.hadoop.io.WritableComparable;
  49. import org.apache.hadoop.ipc.RemoteException;
  50. import org.apache.hadoop.mapred.FileOutputFormat;
  51. import org.apache.hadoop.mapred.FileSplit;
  52. import org.apache.hadoop.mapred.InputFormat;
  53. import org.apache.hadoop.mapred.InputSplit;
  54. import org.apache.hadoop.mapred.InvalidInputException;
  55. import org.apache.hadoop.mapred.JobClient;
  56. import org.apache.hadoop.mapred.JobConf;
  57. import org.apache.hadoop.mapred.Mapper;
  58. import org.apache.hadoop.mapred.OutputCollector;
  59. import org.apache.hadoop.mapred.RecordReader;
  60. import org.apache.hadoop.mapred.Reporter;
  61. import org.apache.hadoop.mapred.SequenceFileRecordReader;
  62. import org.apache.hadoop.security.AccessControlException;
  63. import org.apache.hadoop.util.StringUtils;
  64. import org.apache.hadoop.util.Tool;
  65. import org.apache.hadoop.util.ToolRunner;
  66. /**
  67.  * A Map-reduce program to recursively copy directories between
  68.  * different file-systems.
  69.  */
  70. public class DistCp implements Tool {
  71.   public static final Log LOG = LogFactory.getLog(DistCp.class);
  72.   private static final String NAME = "distcp";
  73.   private static final String usage = NAME
  74.     + " [OPTIONS] <srcurl>* <desturl>" +
  75.     "nnOPTIONS:" +
  76.     "n-p[rbugp]              Preserve status" +
  77.     "n                       r: replication number" +
  78.     "n                       b: block size" +
  79.     "n                       u: user" + 
  80.     "n                       g: group" +
  81.     "n                       p: permission" +
  82.     "n                       -p alone is equivalent to -prbugp" +
  83.     "n-i                     Ignore failures" +
  84.     "n-log <logdir>          Write logs to <logdir>" +
  85.     "n-m <num_maps>          Maximum number of simultaneous copies" +
  86.     "n-overwrite             Overwrite destination" +
  87.     "n-update                Overwrite if src size different from dst size" +
  88.     "n-f <urilist_uri>       Use list at <urilist_uri> as src list" +
  89.     "n-filelimit <n>         Limit the total number of files to be <= n" +
  90.     "n-sizelimit <n>         Limit the total size to be <= n bytes" +
  91.     "n-delete                Delete the files existing in the dst but not in src" +
  92.     "n-mapredSslConf <f>     Filename of SSL configuration for mapper task" +
  93.     
  94.     "nnNOTE 1: if -overwrite or -update are set, each source URI is " +
  95.     "n      interpreted as an isomorphic update to an existing directory." +
  96.     "nFor example:" +
  97.     "nhadoop " + NAME + " -p -update "hdfs://A:8020/user/foo/bar" " +
  98.     ""hdfs://B:8020/user/foo/baz"n" +
  99.     "n     would update all descendants of 'baz' also in 'bar'; it would " +
  100.     "n     *not* update /user/foo/baz/bar" + 
  101.     "nnNOTE 2: The parameter <n> in -filelimit and -sizelimit can be " +
  102.     "n     specified with symbolic representation.  For examples," +
  103.     "n       1230k = 1230 * 1024 = 1259520" +
  104.     "n       891g = 891 * 1024^3 = 956703965184" +
  105.     
  106.     "n";
  107.   
  108.   private static final long BYTES_PER_MAP =  256 * 1024 * 1024;
  109.   private static final int MAX_MAPS_PER_NODE = 20;
  110.   private static final int SYNC_FILE_MAX = 10;
  111.   static enum Counter { COPY, SKIP, FAIL, BYTESCOPIED, BYTESEXPECTED }
  112.   static enum Options {
  113.     DELETE("-delete", NAME + ".delete"),
  114.     FILE_LIMIT("-filelimit", NAME + ".limit.file"),
  115.     SIZE_LIMIT("-sizelimit", NAME + ".limit.size"),
  116.     IGNORE_READ_FAILURES("-i", NAME + ".ignore.read.failures"),
  117.     PRESERVE_STATUS("-p", NAME + ".preserve.status"),
  118.     OVERWRITE("-overwrite", NAME + ".overwrite.always"),
  119.     UPDATE("-update", NAME + ".overwrite.ifnewer");
  120.     final String cmd, propertyname;
  121.     private Options(String cmd, String propertyname) {
  122.       this.cmd = cmd;
  123.       this.propertyname = propertyname;
  124.     }
  125.     
  126.     private long parseLong(String[] args, int offset) {
  127.       if (offset ==  args.length) {
  128.         throw new IllegalArgumentException("<n> not specified in " + cmd);
  129.       }
  130.       long n = StringUtils.TraditionalBinaryPrefix.string2long(args[offset]);
  131.       if (n <= 0) {
  132.         throw new IllegalArgumentException("n = " + n + " <= 0 in " + cmd);
  133.       }
  134.       return n;
  135.     }
  136.   }
  137.   static enum FileAttribute {
  138.     BLOCK_SIZE, REPLICATION, USER, GROUP, PERMISSION;
  139.     final char symbol;
  140.     private FileAttribute() {symbol = toString().toLowerCase().charAt(0);}
  141.     
  142.     static EnumSet<FileAttribute> parse(String s) {
  143.       if (s == null || s.length() == 0) {
  144.         return EnumSet.allOf(FileAttribute.class);
  145.       }
  146.       EnumSet<FileAttribute> set = EnumSet.noneOf(FileAttribute.class);
  147.       FileAttribute[] attributes = values();
  148.       for(char c : s.toCharArray()) {
  149.         int i = 0;
  150.         for(; i < attributes.length && c != attributes[i].symbol; i++);
  151.         if (i < attributes.length) {
  152.           if (!set.contains(attributes[i])) {
  153.             set.add(attributes[i]);
  154.           } else {
  155.             throw new IllegalArgumentException("There are more than one '"
  156.                 + attributes[i].symbol + "' in " + s); 
  157.           }
  158.         } else {
  159.           throw new IllegalArgumentException("'" + c + "' in " + s
  160.               + " is undefined.");
  161.         }
  162.       }
  163.       return set;
  164.     }
  165.   }
  166.   static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
  167.   static final String DST_DIR_LABEL = NAME + ".dest.path";
  168.   static final String JOB_DIR_LABEL = NAME + ".job.dir";
  169.   static final String MAX_MAPS_LABEL = NAME + ".max.map.tasks";
  170.   static final String SRC_LIST_LABEL = NAME + ".src.list";
  171.   static final String SRC_COUNT_LABEL = NAME + ".src.count";
  172.   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
  173.   static final String DST_DIR_LIST_LABEL = NAME + ".dst.dir.list";
  174.   static final String BYTES_PER_MAP_LABEL = NAME + ".bytes.per.map";
  175.   static final String PRESERVE_STATUS_LABEL
  176.       = Options.PRESERVE_STATUS.propertyname + ".value";
  177.   private JobConf conf;
  178.   public void setConf(Configuration conf) {
  179.     if (conf instanceof JobConf) {
  180.       this.conf = (JobConf) conf;
  181.     } else {
  182.       this.conf = new JobConf(conf);
  183.     }
  184.   }
  185.   public Configuration getConf() {
  186.     return conf;
  187.   }
  188.   public DistCp(Configuration conf) {
  189.     setConf(conf);
  190.   }
  191.   /**
  192.    * An input/output pair of filenames.
  193.    */
  194.   static class FilePair implements Writable {
  195.     FileStatus input = new FileStatus();
  196.     String output;
  197.     FilePair() { }
  198.     FilePair(FileStatus input, String output) {
  199.       this.input = input;
  200.       this.output = output;
  201.     }
  202.     public void readFields(DataInput in) throws IOException {
  203.       input.readFields(in);
  204.       output = Text.readString(in);
  205.     }
  206.     public void write(DataOutput out) throws IOException {
  207.       input.write(out);
  208.       Text.writeString(out, output);
  209.     }
  210.     public String toString() {
  211.       return input + " : " + output;
  212.     }
  213.   }
  214.   /**
  215.    * InputFormat of a distcp job responsible for generating splits of the src
  216.    * file list.
  217.    */
  218.   static class CopyInputFormat implements InputFormat<Text, Text> {
  219.     /**
  220.      * Produce splits such that each is no greater than the quotient of the
  221.      * total size and the number of splits requested.
  222.      * @param job The handle to the JobConf object
  223.      * @param numSplits Number of splits requested
  224.      */
  225.     public InputSplit[] getSplits(JobConf job, int numSplits)
  226.         throws IOException {
  227.       int cnfiles = job.getInt(SRC_COUNT_LABEL, -1);
  228.       long cbsize = job.getLong(TOTAL_SIZE_LABEL, -1);
  229.       String srcfilelist = job.get(SRC_LIST_LABEL, "");
  230.       if (cnfiles < 0 || cbsize < 0 || "".equals(srcfilelist)) {
  231.         throw new RuntimeException("Invalid metadata: #files(" + cnfiles +
  232.                                    ") total_size(" + cbsize + ") listuri(" +
  233.                                    srcfilelist + ")");
  234.       }
  235.       Path src = new Path(srcfilelist);
  236.       FileSystem fs = src.getFileSystem(job);
  237.       FileStatus srcst = fs.getFileStatus(src);
  238.       ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  239.       LongWritable key = new LongWritable();
  240.       FilePair value = new FilePair();
  241.       final long targetsize = cbsize / numSplits;
  242.       long pos = 0L;
  243.       long last = 0L;
  244.       long acc = 0L;
  245.       long cbrem = srcst.getLen();
  246.       SequenceFile.Reader sl = null;
  247.       try {
  248.         sl = new SequenceFile.Reader(fs, src, job);
  249.         for (; sl.next(key, value); last = sl.getPosition()) {
  250.           // if adding this split would put this split past the target size,
  251.           // cut the last split and put this next file in the next split.
  252.           if (acc + key.get() > targetsize && acc != 0) {
  253.             long splitsize = last - pos;
  254.             splits.add(new FileSplit(src, pos, splitsize, (String[])null));
  255.             cbrem -= splitsize;
  256.             pos = last;
  257.             acc = 0L;
  258.           }
  259.           acc += key.get();
  260.         }
  261.       }
  262.       finally {
  263.         checkAndClose(sl);
  264.       }
  265.       if (cbrem != 0) {
  266.         splits.add(new FileSplit(src, pos, cbrem, (String[])null));
  267.       }
  268.       return splits.toArray(new FileSplit[splits.size()]);
  269.     }
  270.     /**
  271.      * Returns a reader for this split of the src file list.
  272.      */
  273.     public RecordReader<Text, Text> getRecordReader(InputSplit split,
  274.         JobConf job, Reporter reporter) throws IOException {
  275.       return new SequenceFileRecordReader<Text, Text>(job, (FileSplit)split);
  276.     }
  277.   }
  278.   /**
  279.    * FSCopyFilesMapper: The mapper for copying files between FileSystems.
  280.    */
  281.   static class CopyFilesMapper
  282.       implements Mapper<LongWritable, FilePair, WritableComparable<?>, Text> {
  283.     // config
  284.     private int sizeBuf = 128 * 1024;
  285.     private FileSystem destFileSys = null;
  286.     private boolean ignoreReadFailures;
  287.     private boolean preserve_status;
  288.     private EnumSet<FileAttribute> preseved;
  289.     private boolean overwrite;
  290.     private boolean update;
  291.     private Path destPath = null;
  292.     private byte[] buffer = null;
  293.     private JobConf job;
  294.     // stats
  295.     private int failcount = 0;
  296.     private int skipcount = 0;
  297.     private int copycount = 0;
  298.     private String getCountString() {
  299.       return "Copied: " + copycount + " Skipped: " + skipcount
  300.           + " Failed: " + failcount;
  301.     }
  302.     private void updateStatus(Reporter reporter) {
  303.       reporter.setStatus(getCountString());
  304.     }
  305.     /**
  306.      * Return true if dst should be replaced by src and the update flag is set.
  307.      * Right now, this merely checks that the src and dst len are not equal. 
  308.      * This should be improved on once modification times, CRCs, etc. can
  309.      * be meaningful in this context.
  310.      * @throws IOException 
  311.      */
  312.     private boolean needsUpdate(FileStatus srcstatus,
  313.         FileSystem dstfs, Path dstpath) throws IOException {
  314.       return update && !sameFile(srcstatus.getPath().getFileSystem(job),
  315.           srcstatus, dstfs, dstpath);
  316.     }
  317.     
  318.     private FSDataOutputStream create(Path f, Reporter reporter,
  319.         FileStatus srcstat) throws IOException {
  320.       if (destFileSys.exists(f)) {
  321.         destFileSys.delete(f, false);
  322.       }
  323.       if (!preserve_status) {
  324.         return destFileSys.create(f, true, sizeBuf, reporter);
  325.       }
  326.       FsPermission permission = preseved.contains(FileAttribute.PERMISSION)?
  327.           srcstat.getPermission(): null;
  328.       short replication = preseved.contains(FileAttribute.REPLICATION)?
  329.           srcstat.getReplication(): destFileSys.getDefaultReplication();
  330.       long blockSize = preseved.contains(FileAttribute.BLOCK_SIZE)?
  331.           srcstat.getBlockSize(): destFileSys.getDefaultBlockSize();
  332.       return destFileSys.create(f, permission, true, sizeBuf, replication,
  333.           blockSize, reporter);
  334.     }
  335.     /**
  336.      * Copy a file to a destination.
  337.      * @param srcstat src path and metadata
  338.      * @param dstpath dst path
  339.      * @param reporter
  340.      */
  341.     private void copy(FileStatus srcstat, Path relativedst,
  342.         OutputCollector<WritableComparable<?>, Text> outc, Reporter reporter)
  343.         throws IOException {
  344.       Path absdst = new Path(destPath, relativedst);
  345.       int totfiles = job.getInt(SRC_COUNT_LABEL, -1);
  346.       assert totfiles >= 0 : "Invalid file count " + totfiles;
  347.       // if a directory, ensure created even if empty
  348.       if (srcstat.isDir()) {
  349.         if (destFileSys.exists(absdst)) {
  350.           if (!destFileSys.getFileStatus(absdst).isDir()) {
  351.             throw new IOException("Failed to mkdirs: " + absdst+" is a file.");
  352.           }
  353.         }
  354.         else if (!destFileSys.mkdirs(absdst)) {
  355.           throw new IOException("Failed to mkdirs " + absdst);
  356.         }
  357.         // TODO: when modification times can be set, directories should be
  358.         // emitted to reducers so they might be preserved. Also, mkdirs does
  359.         // not currently return an error when the directory already exists;
  360.         // if this changes, all directory work might as well be done in reduce
  361.         return;
  362.       }
  363.       if (destFileSys.exists(absdst) && !overwrite
  364.           && !needsUpdate(srcstat, destFileSys, absdst)) {
  365.         outc.collect(null, new Text("SKIP: " + srcstat.getPath()));
  366.         ++skipcount;
  367.         reporter.incrCounter(Counter.SKIP, 1);
  368.         updateStatus(reporter);
  369.         return;
  370.       }
  371.       Path tmpfile = new Path(job.get(TMP_DIR_LABEL), relativedst);
  372.       long cbcopied = 0L;
  373.       FSDataInputStream in = null;
  374.       FSDataOutputStream out = null;
  375.       try {
  376.         // open src file
  377.         in = srcstat.getPath().getFileSystem(job).open(srcstat.getPath());
  378.         reporter.incrCounter(Counter.BYTESEXPECTED, srcstat.getLen());
  379.         // open tmp file
  380.         out = create(tmpfile, reporter, srcstat);
  381.         // copy file
  382.         for(int cbread; (cbread = in.read(buffer)) >= 0; ) {
  383.           out.write(buffer, 0, cbread);
  384.           cbcopied += cbread;
  385.           reporter.setStatus(
  386.               String.format("%.2f ", cbcopied*100.0/srcstat.getLen())
  387.               + absdst + " [ " +
  388.               StringUtils.humanReadableInt(cbcopied) + " / " +
  389.               StringUtils.humanReadableInt(srcstat.getLen()) + " ]");
  390.         }
  391.       } finally {
  392.         checkAndClose(in);
  393.         checkAndClose(out);
  394.       }
  395.       if (cbcopied != srcstat.getLen()) {
  396.         throw new IOException("File size not matched: copied "
  397.             + bytesString(cbcopied) + " to tmpfile (=" + tmpfile
  398.             + ") but expected " + bytesString(srcstat.getLen()) 
  399.             + " from " + srcstat.getPath());        
  400.       }
  401.       else {
  402.         if (totfiles == 1) {
  403.           // Copying a single file; use dst path provided by user as destination
  404.           // rather than destination directory, if a file
  405.           Path dstparent = absdst.getParent();
  406.           if (!(destFileSys.exists(dstparent) &&
  407.                 destFileSys.getFileStatus(dstparent).isDir())) {
  408.             absdst = dstparent;
  409.           }
  410.         }
  411.         if (destFileSys.exists(absdst) &&
  412.             destFileSys.getFileStatus(absdst).isDir()) {
  413.           throw new IOException(absdst + " is a directory");
  414.         }
  415.         if (!destFileSys.mkdirs(absdst.getParent())) {
  416.           throw new IOException("Failed to craete parent dir: " + absdst.getParent());
  417.         }
  418.         rename(tmpfile, absdst);
  419.         FileStatus dststat = destFileSys.getFileStatus(absdst);
  420.         if (dststat.getLen() != srcstat.getLen()) {
  421.           destFileSys.delete(absdst, false);
  422.           throw new IOException("File size not matched: copied "
  423.               + bytesString(dststat.getLen()) + " to dst (=" + absdst
  424.               + ") but expected " + bytesString(srcstat.getLen()) 
  425.               + " from " + srcstat.getPath());        
  426.         } 
  427.         updatePermissions(srcstat, dststat);
  428.       }
  429.       // report at least once for each file
  430.       ++copycount;
  431.       reporter.incrCounter(Counter.BYTESCOPIED, cbcopied);
  432.       reporter.incrCounter(Counter.COPY, 1);
  433.       updateStatus(reporter);
  434.     }
  435.     
  436.     /** rename tmp to dst, delete dst if already exists */
  437.     private void rename(Path tmp, Path dst) throws IOException {
  438.       try {
  439.         if (destFileSys.exists(dst)) {
  440.           destFileSys.delete(dst, true);
  441.         }
  442.         if (!destFileSys.rename(tmp, dst)) {
  443.           throw new IOException();
  444.         }
  445.       }
  446.       catch(IOException cause) {
  447.         throw (IOException)new IOException("Fail to rename tmp file (=" + tmp 
  448.             + ") to destination file (=" + dst + ")").initCause(cause);
  449.       }
  450.     }
  451.     private void updatePermissions(FileStatus src, FileStatus dst
  452.         ) throws IOException {
  453.       if (preserve_status) {
  454.         DistCp.updatePermissions(src, dst, preseved, destFileSys);
  455.       }
  456.     }
  457.     static String bytesString(long b) {
  458.       return b + " bytes (" + StringUtils.humanReadableInt(b) + ")";
  459.     }
  460.     /** Mapper configuration.
  461.      * Extracts source and destination file system, as well as
  462.      * top-level paths on source and destination directories.
  463.      * Gets the named file systems, to be used later in map.
  464.      */
  465.     public void configure(JobConf job)
  466.     {
  467.       destPath = new Path(job.get(DST_DIR_LABEL, "/"));
  468.       try {
  469.         destFileSys = destPath.getFileSystem(job);
  470.       } catch (IOException ex) {
  471.         throw new RuntimeException("Unable to get the named file system.", ex);
  472.       }
  473.       sizeBuf = job.getInt("copy.buf.size", 128 * 1024);
  474.       buffer = new byte[sizeBuf];
  475.       ignoreReadFailures = job.getBoolean(Options.IGNORE_READ_FAILURES.propertyname, false);
  476.       preserve_status = job.getBoolean(Options.PRESERVE_STATUS.propertyname, false);
  477.       if (preserve_status) {
  478.         preseved = FileAttribute.parse(job.get(PRESERVE_STATUS_LABEL));
  479.       }
  480.       update = job.getBoolean(Options.UPDATE.propertyname, false);
  481.       overwrite = !update && job.getBoolean(Options.OVERWRITE.propertyname, false);
  482.       this.job = job;
  483.     }
  484.     /** Map method. Copies one file from source file system to destination.
  485.      * @param key src len
  486.      * @param value FilePair (FileStatus src, Path dst)
  487.      * @param out Log of failed copies
  488.      * @param reporter
  489.      */
  490.     public void map(LongWritable key,
  491.                     FilePair value,
  492.                     OutputCollector<WritableComparable<?>, Text> out,
  493.                     Reporter reporter) throws IOException {
  494.       final FileStatus srcstat = value.input;
  495.       final Path relativedst = new Path(value.output);
  496.       try {
  497.         copy(srcstat, relativedst, out, reporter);
  498.       } catch (IOException e) {
  499.         ++failcount;
  500.         reporter.incrCounter(Counter.FAIL, 1);
  501.         updateStatus(reporter);
  502.         final String sfailure = "FAIL " + relativedst + " : " +
  503.                           StringUtils.stringifyException(e);
  504.         out.collect(null, new Text(sfailure));
  505.         LOG.info(sfailure);
  506.         try {
  507.           for (int i = 0; i < 3; ++i) {
  508.             try {
  509.               final Path tmp = new Path(job.get(TMP_DIR_LABEL), relativedst);
  510.               if (destFileSys.delete(tmp, true))
  511.                 break;
  512.             } catch (Throwable ex) {
  513.               // ignore, we are just cleaning up
  514.               LOG.debug("Ignoring cleanup exception", ex);
  515.             }
  516.             // update status, so we don't get timed out
  517.             updateStatus(reporter);
  518.             Thread.sleep(3 * 1000);
  519.           }
  520.         } catch (InterruptedException inte) {
  521.           throw (IOException)new IOException().initCause(inte);
  522.         }
  523.       } finally {
  524.         updateStatus(reporter);
  525.       }
  526.     }
  527.     public void close() throws IOException {
  528.       if (0 == failcount || ignoreReadFailures) {
  529.         return;
  530.       }
  531.       throw new IOException(getCountString());
  532.     }
  533.   }
  534.   private static List<Path> fetchFileList(Configuration conf, Path srcList)
  535.       throws IOException {
  536.     List<Path> result = new ArrayList<Path>();
  537.     FileSystem fs = srcList.getFileSystem(conf);
  538.     BufferedReader input = null;
  539.     try {
  540.       input = new BufferedReader(new InputStreamReader(fs.open(srcList)));
  541.       String line = input.readLine();
  542.       while (line != null) {
  543.         result.add(new Path(line));
  544.         line = input.readLine();
  545.       }
  546.     } finally {
  547.       checkAndClose(input);
  548.     }
  549.     return result;
  550.   }
  551.   @Deprecated
  552.   public static void copy(Configuration conf, String srcPath,
  553.                           String destPath, Path logPath,
  554.                           boolean srcAsList, boolean ignoreReadFailures)
  555.       throws IOException {
  556.     final Path src = new Path(srcPath);
  557.     List<Path> tmp = new ArrayList<Path>();
  558.     if (srcAsList) {
  559.       tmp.addAll(fetchFileList(conf, src));
  560.     } else {
  561.       tmp.add(src);
  562.     }
  563.     EnumSet<Options> flags = ignoreReadFailures
  564.       ? EnumSet.of(Options.IGNORE_READ_FAILURES)
  565.       : EnumSet.noneOf(Options.class);
  566.     final Path dst = new Path(destPath);
  567.     copy(conf, new Arguments(tmp, dst, logPath, flags, null,
  568.         Long.MAX_VALUE, Long.MAX_VALUE, null));
  569.   }
  570.   /** Sanity check for srcPath */
  571.   private static void checkSrcPath(Configuration conf, List<Path> srcPaths
  572.       ) throws IOException {
  573.     List<IOException> rslt = new ArrayList<IOException>();
  574.     for (Path p : srcPaths) {
  575.       FileSystem fs = p.getFileSystem(conf);
  576.       if (!fs.exists(p)) {
  577.         rslt.add(new IOException("Input source " + p + " does not exist."));
  578.       }
  579.     }
  580.     if (!rslt.isEmpty()) {
  581.       throw new InvalidInputException(rslt);
  582.     }
  583.   }
  584.   /**
  585.    * Driver to copy srcPath to destPath depending on required protocol.
  586.    * @param args arguments
  587.    */
  588.   static void copy(final Configuration conf, final Arguments args
  589.       ) throws IOException {
  590.     LOG.info("srcPaths=" + args.srcs);
  591.     LOG.info("destPath=" + args.dst);
  592.     checkSrcPath(conf, args.srcs);
  593.     JobConf job = createJobConf(conf);
  594.     if (args.preservedAttributes != null) {
  595.       job.set(PRESERVE_STATUS_LABEL, args.preservedAttributes);
  596.     }
  597.     if (args.mapredSslConf != null) {
  598.       job.set("dfs.https.client.keystore.resource", args.mapredSslConf);
  599.     }
  600.     
  601.     //Initialize the mapper
  602.     try {
  603.       setup(conf, job, args);
  604.       JobClient.runJob(job);
  605.       finalize(conf, job, args.dst, args.preservedAttributes);
  606.     } finally {
  607.       //delete tmp
  608.       fullyDelete(job.get(TMP_DIR_LABEL), job);
  609.       //delete jobDirectory
  610.       fullyDelete(job.get(JOB_DIR_LABEL), job);
  611.     }
  612.   }
  613.   private static void updatePermissions(FileStatus src, FileStatus dst,
  614.       EnumSet<FileAttribute> preseved, FileSystem destFileSys
  615.       ) throws IOException {
  616.     String owner = null;
  617.     String group = null;
  618.     if (preseved.contains(FileAttribute.USER)
  619.         && !src.getOwner().equals(dst.getOwner())) {
  620.       owner = src.getOwner();
  621.     }
  622.     if (preseved.contains(FileAttribute.GROUP)
  623.         && !src.getGroup().equals(dst.getGroup())) {
  624.       group = src.getGroup();
  625.     }
  626.     if (owner != null || group != null) {
  627.       destFileSys.setOwner(dst.getPath(), owner, group);
  628.     }
  629.     if (preseved.contains(FileAttribute.PERMISSION)
  630.         && !src.getPermission().equals(dst.getPermission())) {
  631.       destFileSys.setPermission(dst.getPath(), src.getPermission());
  632.     }
  633.   }
  634.   static private void finalize(Configuration conf, JobConf jobconf,
  635.       final Path destPath, String presevedAttributes) throws IOException {
  636.     if (presevedAttributes == null) {
  637.       return;
  638.     }
  639.     EnumSet<FileAttribute> preseved = FileAttribute.parse(presevedAttributes);
  640.     if (!preseved.contains(FileAttribute.USER)
  641.         && !preseved.contains(FileAttribute.GROUP)
  642.         && !preseved.contains(FileAttribute.PERMISSION)) {
  643.       return;
  644.     }
  645.     FileSystem dstfs = destPath.getFileSystem(conf);
  646.     Path dstdirlist = new Path(jobconf.get(DST_DIR_LIST_LABEL));
  647.     SequenceFile.Reader in = null;
  648.     try {
  649.       in = new SequenceFile.Reader(dstdirlist.getFileSystem(jobconf),
  650.           dstdirlist, jobconf);
  651.       Text dsttext = new Text();
  652.       FilePair pair = new FilePair(); 
  653.       for(; in.next(dsttext, pair); ) {
  654.         Path absdst = new Path(destPath, pair.output);
  655.         updatePermissions(pair.input, dstfs.getFileStatus(absdst),
  656.             preseved, dstfs);
  657.       }
  658.     } finally {
  659.       checkAndClose(in);
  660.     }
  661.   }
  662.   static private class Arguments {
  663.     final List<Path> srcs;
  664.     final Path dst;
  665.     final Path log;
  666.     final EnumSet<Options> flags;
  667.     final String preservedAttributes;
  668.     final long filelimit;
  669.     final long sizelimit;
  670.     final String mapredSslConf;
  671.     
  672.     /**
  673.      * Arguments for distcp
  674.      * @param srcs List of source paths
  675.      * @param dst Destination path
  676.      * @param log Log output directory
  677.      * @param flags Command-line flags
  678.      * @param preservedAttributes Preserved attributes 
  679.      * @param filelimit File limit
  680.      * @param sizelimit Size limit
  681.      */
  682.     Arguments(List<Path> srcs, Path dst, Path log,
  683.         EnumSet<Options> flags, String preservedAttributes,
  684.         long filelimit, long sizelimit, String mapredSslConf) {
  685.       this.srcs = srcs;
  686.       this.dst = dst;
  687.       this.log = log;
  688.       this.flags = flags;
  689.       this.preservedAttributes = preservedAttributes;
  690.       this.filelimit = filelimit;
  691.       this.sizelimit = sizelimit;
  692.       this.mapredSslConf = mapredSslConf;
  693.       
  694.       if (LOG.isTraceEnabled()) {
  695.         LOG.trace("this = " + this);
  696.       }
  697.     }
  698.     static Arguments valueOf(String[] args, Configuration conf
  699.         ) throws IOException {
  700.       List<Path> srcs = new ArrayList<Path>();
  701.       Path dst = null;
  702.       Path log = null;
  703.       EnumSet<Options> flags = EnumSet.noneOf(Options.class);
  704.       String presevedAttributes = null;
  705.       String mapredSslConf = null;
  706.       long filelimit = Long.MAX_VALUE;
  707.       long sizelimit = Long.MAX_VALUE;
  708.       for (int idx = 0; idx < args.length; idx++) {
  709.         Options[] opt = Options.values();
  710.         int i = 0;
  711.         for(; i < opt.length && !args[idx].startsWith(opt[i].cmd); i++);
  712.         if (i < opt.length) {
  713.           flags.add(opt[i]);
  714.           if (opt[i] == Options.PRESERVE_STATUS) {
  715.             presevedAttributes =  args[idx].substring(2);         
  716.             FileAttribute.parse(presevedAttributes); //validation
  717.           }
  718.           else if (opt[i] == Options.FILE_LIMIT) {
  719.             filelimit = Options.FILE_LIMIT.parseLong(args, ++idx);
  720.           }
  721.           else if (opt[i] == Options.SIZE_LIMIT) {
  722.             sizelimit = Options.SIZE_LIMIT.parseLong(args, ++idx);
  723.           }
  724.         } else if ("-f".equals(args[idx])) {
  725.           if (++idx ==  args.length) {
  726.             throw new IllegalArgumentException("urilist_uri not specified in -f");
  727.           }
  728.           srcs.addAll(fetchFileList(conf, new Path(args[idx])));
  729.         } else if ("-log".equals(args[idx])) {
  730.           if (++idx ==  args.length) {
  731.             throw new IllegalArgumentException("logdir not specified in -log");
  732.           }
  733.           log = new Path(args[idx]);
  734.         } else if ("-mapredSslConf".equals(args[idx])) {
  735.           if (++idx ==  args.length) {
  736.             throw new IllegalArgumentException("ssl conf file not specified in -mapredSslConf");
  737.           }
  738.           mapredSslConf = args[idx];
  739.         } else if ("-m".equals(args[idx])) {
  740.           if (++idx == args.length) {
  741.             throw new IllegalArgumentException("num_maps not specified in -m");
  742.           }
  743.           try {
  744.             conf.setInt(MAX_MAPS_LABEL, Integer.valueOf(args[idx]));
  745.           } catch (NumberFormatException e) {
  746.             throw new IllegalArgumentException("Invalid argument to -m: " +
  747.                                                args[idx]);
  748.           }
  749.         } else if ('-' == args[idx].codePointAt(0)) {
  750.           throw new IllegalArgumentException("Invalid switch " + args[idx]);
  751.         } else if (idx == args.length -1) {
  752.           dst = new Path(args[idx]);
  753.         } else {
  754.           srcs.add(new Path(args[idx]));
  755.         }
  756.       }
  757.       // mandatory command-line parameters
  758.       if (srcs.isEmpty() || dst == null) {
  759.         throw new IllegalArgumentException("Missing "
  760.             + (dst == null ? "dst path" : "src"));
  761.       }
  762.       // incompatible command-line flags
  763.       final boolean isOverwrite = flags.contains(Options.OVERWRITE);
  764.       final boolean isUpdate = flags.contains(Options.UPDATE);
  765.       final boolean isDelete = flags.contains(Options.DELETE);
  766.       if (isOverwrite && isUpdate) {
  767.         throw new IllegalArgumentException("Conflicting overwrite policies");
  768.       }
  769.       if (isDelete && !isOverwrite && !isUpdate) {
  770.         throw new IllegalArgumentException(Options.DELETE.cmd
  771.             + " must be specified with " + Options.OVERWRITE + " or "
  772.             + Options.UPDATE + ".");
  773.       }
  774.       return new Arguments(srcs, dst, log, flags, presevedAttributes,
  775.           filelimit, sizelimit, mapredSslConf);
  776.     }
  777.     
  778.     /** {@inheritDoc} */
  779.     public String toString() {
  780.       return getClass().getName() + "{"
  781.           + "n  srcs = " + srcs 
  782.           + "n  dst = " + dst 
  783.           + "n  log = " + log 
  784.           + "n  flags = " + flags
  785.           + "n  preservedAttributes = " + preservedAttributes 
  786.           + "n  filelimit = " + filelimit 
  787.           + "n  sizelimit = " + sizelimit
  788.           + "n  mapredSslConf = " + mapredSslConf
  789.           + "n}"; 
  790.     }
  791.   }
  792.   /**
  793.    * This is the main driver for recursively copying directories
  794.    * across file systems. It takes at least two cmdline parameters. A source
  795.    * URL and a destination URL. It then essentially does an "ls -lR" on the
  796.    * source URL, and writes the output in a round-robin manner to all the map
  797.    * input files. The mapper actually copies the files allotted to it. The
  798.    * reduce is empty.
  799.    */
  800.   public int run(String[] args) {
  801.     try {
  802.       copy(conf, Arguments.valueOf(args, conf));
  803.       return 0;
  804.     } catch (IllegalArgumentException e) {
  805.       System.err.println(StringUtils.stringifyException(e) + "n" + usage);
  806.       ToolRunner.printGenericCommandUsage(System.err);
  807.       return -1;
  808.     } catch (DuplicationException e) {
  809.       System.err.println(StringUtils.stringifyException(e));
  810.       return DuplicationException.ERROR_CODE;
  811.     } catch (RemoteException e) {
  812.       final IOException unwrapped = e.unwrapRemoteException(
  813.           FileNotFoundException.class, 
  814.           AccessControlException.class,
  815.           QuotaExceededException.class);
  816.       System.err.println(StringUtils.stringifyException(unwrapped));
  817.       return -3;
  818.     } catch (Exception e) {
  819.       System.err.println("With failures, global counters are inaccurate; " +
  820.           "consider running with -i");
  821.       System.err.println("Copy failed: " + StringUtils.stringifyException(e));
  822.       return -999;
  823.     }
  824.   }
  825.   public static void main(String[] args) throws Exception {
  826.     JobConf job = new JobConf(DistCp.class);
  827.     DistCp distcp = new DistCp(job);
  828.     int res = ToolRunner.run(distcp, args);
  829.     System.exit(res);
  830.   }
  831.   /**
  832.    * Make a path relative with respect to a root path.
  833.    * absPath is always assumed to descend from root.
  834.    * Otherwise returned path is null.
  835.    */
  836.   static String makeRelative(Path root, Path absPath) {
  837.     if (!absPath.isAbsolute()) {
  838.       throw new IllegalArgumentException("!absPath.isAbsolute(), absPath="
  839.           + absPath);
  840.     }
  841.     String p = absPath.toUri().getPath();
  842.     StringTokenizer pathTokens = new StringTokenizer(p, "/");
  843.     for(StringTokenizer rootTokens = new StringTokenizer(
  844.         root.toUri().getPath(), "/"); rootTokens.hasMoreTokens(); ) {
  845.       if (!rootTokens.nextToken().equals(pathTokens.nextToken())) {
  846.         return null;
  847.       }
  848.     }
  849.     StringBuilder sb = new StringBuilder();
  850.     for(; pathTokens.hasMoreTokens(); ) {
  851.       sb.append(pathTokens.nextToken());
  852.       if (pathTokens.hasMoreTokens()) { sb.append(Path.SEPARATOR); }
  853.     }
  854.     return sb.length() == 0? ".": sb.toString();
  855.   }
  856.   /**
  857.    * Calculate how many maps to run.
  858.    * Number of maps is bounded by a minimum of the cumulative size of the
  859.    * copy / (distcp.bytes.per.map, default BYTES_PER_MAP or -m on the
  860.    * command line) and at most (distcp.max.map.tasks, default
  861.    * MAX_MAPS_PER_NODE * nodes in the cluster).
  862.    * @param totalBytes Count of total bytes for job
  863.    * @param job The job to configure
  864.    * @return Count of maps to run.
  865.    */
  866.   private static void setMapCount(long totalBytes, JobConf job) 
  867.       throws IOException {
  868.     int numMaps =
  869.       (int)(totalBytes / job.getLong(BYTES_PER_MAP_LABEL, BYTES_PER_MAP));
  870.     numMaps = Math.min(numMaps, 
  871.         job.getInt(MAX_MAPS_LABEL, MAX_MAPS_PER_NODE *
  872.           new JobClient(job).getClusterStatus().getTaskTrackers()));
  873.     job.setNumMapTasks(Math.max(numMaps, 1));
  874.   }
  875.   /** Fully delete dir */
  876.   static void fullyDelete(String dir, Configuration conf) throws IOException {
  877.     if (dir != null) {
  878.       Path tmp = new Path(dir);
  879.       tmp.getFileSystem(conf).delete(tmp, true);
  880.     }
  881.   }
  882.   //Job configuration
  883.   private static JobConf createJobConf(Configuration conf) {
  884.     JobConf jobconf = new JobConf(conf, DistCp.class);
  885.     jobconf.setJobName(NAME);
  886.     // turn off speculative execution, because DFS doesn't handle
  887.     // multiple writers to the same file.
  888.     jobconf.setMapSpeculativeExecution(false);
  889.     jobconf.setInputFormat(CopyInputFormat.class);
  890.     jobconf.setOutputKeyClass(Text.class);
  891.     jobconf.setOutputValueClass(Text.class);
  892.     jobconf.setMapperClass(CopyFilesMapper.class);
  893.     jobconf.setNumReduceTasks(0);
  894.     return jobconf;
  895.   }
  896.   private static final Random RANDOM = new Random();
  897.   public static String getRandomId() {
  898.     return Integer.toString(RANDOM.nextInt(Integer.MAX_VALUE), 36);
  899.   }
  900.   /**
  901.    * Initialize DFSCopyFileMapper specific job-configuration.
  902.    * @param conf : The dfs/mapred configuration.
  903.    * @param jobConf : The handle to the jobConf object to be initialized.
  904.    * @param args Arguments
  905.    */
  906.   private static void setup(Configuration conf, JobConf jobConf,
  907.                             final Arguments args)
  908.       throws IOException {
  909.     jobConf.set(DST_DIR_LABEL, args.dst.toUri().toString());
  910.     //set boolean values
  911.     final boolean update = args.flags.contains(Options.UPDATE);
  912.     final boolean overwrite = !update && args.flags.contains(Options.OVERWRITE);
  913.     jobConf.setBoolean(Options.UPDATE.propertyname, update);
  914.     jobConf.setBoolean(Options.OVERWRITE.propertyname, overwrite);
  915.     jobConf.setBoolean(Options.IGNORE_READ_FAILURES.propertyname,
  916.         args.flags.contains(Options.IGNORE_READ_FAILURES));
  917.     jobConf.setBoolean(Options.PRESERVE_STATUS.propertyname,
  918.         args.flags.contains(Options.PRESERVE_STATUS));
  919.     final String randomId = getRandomId();
  920.     JobClient jClient = new JobClient(jobConf);
  921.     Path jobDirectory = new Path(jClient.getSystemDir(), NAME + "_" + randomId);
  922.     jobConf.set(JOB_DIR_LABEL, jobDirectory.toString());
  923.     FileSystem dstfs = args.dst.getFileSystem(conf);
  924.     boolean dstExists = dstfs.exists(args.dst);
  925.     boolean dstIsDir = false;
  926.     if (dstExists) {
  927.       dstIsDir = dstfs.getFileStatus(args.dst).isDir();
  928.     }
  929.     // default logPath
  930.     Path logPath = args.log; 
  931.     if (logPath == null) {
  932.       String filename = "_distcp_logs_" + randomId;
  933.       if (!dstExists || !dstIsDir) {
  934.         Path parent = args.dst.getParent();
  935.         if (!dstfs.exists(parent)) {
  936.           dstfs.mkdirs(parent);
  937.         }
  938.         logPath = new Path(parent, filename);
  939.       } else {
  940.         logPath = new Path(args.dst, filename);
  941.       }
  942.     }
  943.     FileOutputFormat.setOutputPath(jobConf, logPath);
  944.     // create src list, dst list
  945.     FileSystem jobfs = jobDirectory.getFileSystem(jobConf);
  946.     Path srcfilelist = new Path(jobDirectory, "_distcp_src_files");
  947.     jobConf.set(SRC_LIST_LABEL, srcfilelist.toString());
  948.     SequenceFile.Writer src_writer = SequenceFile.createWriter(jobfs, jobConf,
  949.         srcfilelist, LongWritable.class, FilePair.class,
  950.         SequenceFile.CompressionType.NONE);
  951.     Path dstfilelist = new Path(jobDirectory, "_distcp_dst_files");
  952.     SequenceFile.Writer dst_writer = SequenceFile.createWriter(jobfs, jobConf,
  953.         dstfilelist, Text.class, Text.class,
  954.         SequenceFile.CompressionType.NONE);
  955.     Path dstdirlist = new Path(jobDirectory, "_distcp_dst_dirs");
  956.     jobConf.set(DST_DIR_LIST_LABEL, dstdirlist.toString());
  957.     SequenceFile.Writer dir_writer = SequenceFile.createWriter(jobfs, jobConf,
  958.         dstdirlist, Text.class, FilePair.class,
  959.         SequenceFile.CompressionType.NONE);
  960.     // handle the case where the destination directory doesn't exist
  961.     // and we've only a single src directory OR we're updating/overwriting
  962.     // the contents of the destination directory.
  963.     final boolean special =
  964.       (args.srcs.size() == 1 && !dstExists) || update || overwrite;
  965.     int srcCount = 0, cnsyncf = 0, dirsyn = 0;
  966.     long fileCount = 0L, byteCount = 0L, cbsyncs = 0L;
  967.     try {
  968.       for(Iterator<Path> srcItr = args.srcs.iterator(); srcItr.hasNext(); ) {
  969.         final Path src = srcItr.next();
  970.         FileSystem srcfs = src.getFileSystem(conf);
  971.         FileStatus srcfilestat = srcfs.getFileStatus(src);
  972.         Path root = special && srcfilestat.isDir()? src: src.getParent();
  973.         if (srcfilestat.isDir()) {
  974.           ++srcCount;
  975.         }
  976.         Stack<FileStatus> pathstack = new Stack<FileStatus>();
  977.         for(pathstack.push(srcfilestat); !pathstack.empty(); ) {
  978.           FileStatus cur = pathstack.pop();
  979.           FileStatus[] children = srcfs.listStatus(cur.getPath());
  980.           for(int i = 0; i < children.length; i++) {
  981.             boolean skipfile = false;
  982.             final FileStatus child = children[i]; 
  983.             final String dst = makeRelative(root, child.getPath());
  984.             ++srcCount;
  985.             if (child.isDir()) {
  986.               pathstack.push(child);
  987.             }
  988.             else {
  989.               //skip file if the src and the dst files are the same.
  990.               skipfile = update && sameFile(srcfs, child, dstfs, new Path(args.dst, dst));
  991.               //skip file if it exceed file limit or size limit
  992.               skipfile |= fileCount == args.filelimit
  993.                           || byteCount + child.getLen() > args.sizelimit; 
  994.               if (!skipfile) {
  995.                 ++fileCount;
  996.                 byteCount += child.getLen();
  997.                 if (LOG.isTraceEnabled()) {
  998.                   LOG.trace("adding file " + child.getPath());
  999.                 }
  1000.                 ++cnsyncf;
  1001.                 cbsyncs += child.getLen();
  1002.                 if (cnsyncf > SYNC_FILE_MAX || cbsyncs > BYTES_PER_MAP) {
  1003.                   src_writer.sync();
  1004.                   dst_writer.sync();
  1005.                   cnsyncf = 0;
  1006.                   cbsyncs = 0L;
  1007.                 }
  1008.               }
  1009.             }
  1010.             if (!skipfile) {
  1011.               src_writer.append(new LongWritable(child.isDir()? 0: child.getLen()),
  1012.                   new FilePair(child, dst));
  1013.             }
  1014.             dst_writer.append(new Text(dst),
  1015.                 new Text(child.getPath().toString()));
  1016.           }
  1017.           if (cur.isDir()) {
  1018.             String dst = makeRelative(root, cur.getPath());
  1019.             dir_writer.append(new Text(dst), new FilePair(cur, dst));
  1020.             if (++dirsyn > SYNC_FILE_MAX) {
  1021.               dirsyn = 0;
  1022.               dir_writer.sync();                
  1023.             }
  1024.           }
  1025.         }
  1026.       }
  1027.     } finally {
  1028.       checkAndClose(src_writer);
  1029.       checkAndClose(dst_writer);
  1030.       checkAndClose(dir_writer);
  1031.     }
  1032.     FileStatus dststatus = null;
  1033.     try {
  1034.       dststatus = dstfs.getFileStatus(args.dst);
  1035.     } catch(FileNotFoundException fnfe) {
  1036.       LOG.info(args.dst + " does not exist.");
  1037.     }
  1038.     // create dest path dir if copying > 1 file
  1039.     if (dststatus == null) {
  1040.       if (srcCount > 1 && !dstfs.mkdirs(args.dst)) {
  1041.         throw new IOException("Failed to create" + args.dst);
  1042.       }
  1043.     }
  1044.     
  1045.     final Path sorted = new Path(jobDirectory, "_distcp_sorted"); 
  1046.     checkDuplication(jobfs, dstfilelist, sorted, conf);
  1047.     if (dststatus != null && args.flags.contains(Options.DELETE)) {
  1048.       deleteNonexisting(dstfs, dststatus, sorted,
  1049.           jobfs, jobDirectory, jobConf, conf);
  1050.     }
  1051.     Path tmpDir = new Path(
  1052.         (dstExists && !dstIsDir) || (!dstExists && srcCount == 1)?
  1053.         args.dst.getParent(): args.dst, "_distcp_tmp_" + randomId);
  1054.     jobConf.set(TMP_DIR_LABEL, tmpDir.toUri().toString());
  1055.     LOG.info("srcCount=" + srcCount);
  1056.     jobConf.setInt(SRC_COUNT_LABEL, srcCount);
  1057.     jobConf.setLong(TOTAL_SIZE_LABEL, byteCount);
  1058.     setMapCount(byteCount, jobConf);
  1059.   }
  1060.   /**
  1061.    * Check whether the contents of src and dst are the same.
  1062.    * 
  1063.    * Return false if dstpath does not exist
  1064.    * 
  1065.    * If the files have different sizes, return false.
  1066.    * 
  1067.    * If the files have the same sizes, the file checksums will be compared.
  1068.    * 
  1069.    * When file checksum is not supported in any of file systems,
  1070.    * two files are considered as the same if they have the same size.
  1071.    */
  1072.   static private boolean sameFile(FileSystem srcfs, FileStatus srcstatus,
  1073.       FileSystem dstfs, Path dstpath) throws IOException {
  1074.     FileStatus dststatus;
  1075.     try {
  1076.       dststatus = dstfs.getFileStatus(dstpath);
  1077.     } catch(FileNotFoundException fnfe) {
  1078.       return false;
  1079.     }
  1080.     //same length?
  1081.     if (srcstatus.getLen() != dststatus.getLen()) {
  1082.       return false;
  1083.     }
  1084.     //compare checksums
  1085.     try {
  1086.       final FileChecksum srccs = srcfs.getFileChecksum(srcstatus.getPath());
  1087.       final FileChecksum dstcs = dstfs.getFileChecksum(dststatus.getPath());
  1088.       //return true if checksum is not supported
  1089.       //(i.e. some of the checksums is null)
  1090.       return srccs == null || dstcs == null || srccs.equals(dstcs);
  1091.     } catch(FileNotFoundException fnfe) {
  1092.       return false;
  1093.     }
  1094.   }
  1095.   
  1096.   /** Delete the dst files/dirs which do not exist in src */
  1097.   static private void deleteNonexisting(
  1098.       FileSystem dstfs, FileStatus dstroot, Path dstsorted,
  1099.       FileSystem jobfs, Path jobdir, JobConf jobconf, Configuration conf
  1100.       ) throws IOException {
  1101.     if (!dstroot.isDir()) {
  1102.       throw new IOException("dst must be a directory when option "
  1103.           + Options.DELETE.cmd + " is set, but dst (= " + dstroot.getPath()
  1104.           + ") is not a directory.");
  1105.     }
  1106.     //write dst lsr results
  1107.     final Path dstlsr = new Path(jobdir, "_distcp_dst_lsr");
  1108.     final SequenceFile.Writer writer = SequenceFile.createWriter(jobfs, jobconf,
  1109.         dstlsr, Text.class, FileStatus.class,
  1110.         SequenceFile.CompressionType.NONE);
  1111.     try {
  1112.       //do lsr to get all file statuses in dstroot
  1113.       final Stack<FileStatus> lsrstack = new Stack<FileStatus>();
  1114.       for(lsrstack.push(dstroot); !lsrstack.isEmpty(); ) {
  1115.         final FileStatus status = lsrstack.pop();
  1116.         if (status.isDir()) {
  1117.           for(FileStatus child : dstfs.listStatus(status.getPath())) {
  1118.             String relative = makeRelative(dstroot.getPath(), child.getPath());
  1119.             writer.append(new Text(relative), child);
  1120.             lsrstack.push(child);
  1121.           }
  1122.         }
  1123.       }
  1124.     } finally {
  1125.       checkAndClose(writer);
  1126.     }
  1127.     //sort lsr results
  1128.     final Path sortedlsr = new Path(jobdir, "_distcp_dst_lsr_sorted");
  1129.     SequenceFile.Sorter sorter = new SequenceFile.Sorter(jobfs,
  1130.         new Text.Comparator(), Text.class, FileStatus.class, jobconf);
  1131.     sorter.sort(dstlsr, sortedlsr);
  1132.     //compare lsr list and dst list  
  1133.     SequenceFile.Reader lsrin = null;
  1134.     SequenceFile.Reader dstin = null;
  1135.     try {
  1136.       lsrin = new SequenceFile.Reader(jobfs, sortedlsr, jobconf);
  1137.       dstin = new SequenceFile.Reader(jobfs, dstsorted, jobconf);
  1138.       //compare sorted lsr list and sorted dst list
  1139.       final Text lsrpath = new Text();
  1140.       final FileStatus lsrstatus = new FileStatus();
  1141.       final Text dstpath = new Text();
  1142.       final Text dstfrom = new Text();
  1143.       final FsShell shell = new FsShell(conf);
  1144.       final String[] shellargs = {"-rmr", null};
  1145.       boolean hasnext = dstin.next(dstpath, dstfrom);
  1146.       for(; lsrin.next(lsrpath, lsrstatus); ) {
  1147.         int dst_cmp_lsr = dstpath.compareTo(lsrpath);
  1148.         for(; hasnext && dst_cmp_lsr < 0; ) {
  1149.           hasnext = dstin.next(dstpath, dstfrom);
  1150.           dst_cmp_lsr = dstpath.compareTo(lsrpath);
  1151.         }
  1152.         
  1153.         if (dst_cmp_lsr == 0) {
  1154.           //lsrpath exists in dst, skip it
  1155.           hasnext = dstin.next(dstpath, dstfrom);
  1156.         }
  1157.         else {
  1158.           //lsrpath does not exist, delete it
  1159.           String s = new Path(dstroot.getPath(), lsrpath.toString()).toString();
  1160.           if (shellargs[1] == null || !isAncestorPath(shellargs[1], s)) {
  1161.             shellargs[1] = s;
  1162.             int r = 0;
  1163.             try {
  1164.                r = shell.run(shellargs);
  1165.             } catch(Exception e) {
  1166.               throw new IOException("Exception from shell.", e);
  1167.             }
  1168.             if (r != 0) {
  1169.               throw new IOException(""" + shellargs[0] + " " + shellargs[1]
  1170.                   + "" returns non-zero value " + r);
  1171.             }
  1172.           }
  1173.         }
  1174.       }
  1175.     } finally {
  1176.       checkAndClose(lsrin);
  1177.       checkAndClose(dstin);
  1178.     }
  1179.   }
  1180.   //is x an ancestor path of y?
  1181.   static private boolean isAncestorPath(String x, String y) {
  1182.     if (!y.startsWith(x)) {
  1183.       return false;
  1184.     }
  1185.     final int len = x.length();
  1186.     return y.length() == len || y.charAt(len) == Path.SEPARATOR_CHAR;  
  1187.   }
  1188.   
  1189.   /** Check whether the file list have duplication. */
  1190.   static private void checkDuplication(FileSystem fs, Path file, Path sorted,
  1191.     Configuration conf) throws IOException {
  1192.     SequenceFile.Reader in = null;
  1193.     try {
  1194.       SequenceFile.Sorter sorter = new SequenceFile.Sorter(fs,
  1195.         new Text.Comparator(), Text.class, Text.class, conf);
  1196.       sorter.sort(file, sorted);
  1197.       in = new SequenceFile.Reader(fs, sorted, conf);
  1198.       Text prevdst = null, curdst = new Text();
  1199.       Text prevsrc = null, cursrc = new Text(); 
  1200.       for(; in.next(curdst, cursrc); ) {
  1201.         if (prevdst != null && curdst.equals(prevdst)) {
  1202.           throw new DuplicationException(
  1203.             "Invalid input, there are duplicated files in the sources: "
  1204.             + prevsrc + ", " + cursrc);
  1205.         }
  1206.         prevdst = curdst;
  1207.         curdst = new Text();
  1208.         prevsrc = cursrc;
  1209.         cursrc = new Text();
  1210.       }
  1211.     }
  1212.     finally {
  1213.       checkAndClose(in);
  1214.     }
  1215.   } 
  1216.   static boolean checkAndClose(java.io.Closeable io) {
  1217.     if (io != null) {
  1218.       try {
  1219.         io.close();
  1220.       }
  1221.       catch(IOException ioe) {
  1222.         LOG.warn(StringUtils.stringifyException(ioe));
  1223.         return false;
  1224.       }
  1225.     }
  1226.     return true;
  1227.   }
  1228.   /** An exception class for duplicated source files. */
  1229.   public static class DuplicationException extends IOException {
  1230.     private static final long serialVersionUID = 1L;
  1231.     /** Error code for this exception */
  1232.     public static final int ERROR_CODE = -2;
  1233.     DuplicationException(String message) {super(message);}
  1234.   }
  1235. }