HadoopArchives.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:23k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.tools;
  19. import java.io.FileNotFoundException;
  20. import java.io.IOException;
  21. import java.util.ArrayList;
  22. import java.util.HashSet;
  23. import java.util.Iterator;
  24. import java.util.List;
  25. import java.util.Map;
  26. import java.util.Set;
  27. import java.util.TreeMap;
  28. import org.apache.commons.logging.Log;
  29. import org.apache.commons.logging.LogFactory;
  30. import org.apache.hadoop.conf.Configuration;
  31. import org.apache.hadoop.fs.FSDataInputStream;
  32. import org.apache.hadoop.fs.FSDataOutputStream;
  33. import org.apache.hadoop.fs.FileStatus;
  34. import org.apache.hadoop.fs.FileSystem;
  35. import org.apache.hadoop.fs.HarFileSystem;
  36. import org.apache.hadoop.fs.Path;
  37. import org.apache.hadoop.io.IntWritable;
  38. import org.apache.hadoop.io.LongWritable;
  39. import org.apache.hadoop.io.SequenceFile;
  40. import org.apache.hadoop.io.Text;
  41. import org.apache.hadoop.mapred.FileInputFormat;
  42. import org.apache.hadoop.mapred.FileOutputFormat;
  43. import org.apache.hadoop.mapred.FileSplit;
  44. import org.apache.hadoop.mapred.InputFormat;
  45. import org.apache.hadoop.mapred.InputSplit;
  46. import org.apache.hadoop.mapred.JobClient;
  47. import org.apache.hadoop.mapred.JobConf;
  48. import org.apache.hadoop.mapred.Mapper;
  49. import org.apache.hadoop.mapred.OutputCollector;
  50. import org.apache.hadoop.mapred.RecordReader;
  51. import org.apache.hadoop.mapred.Reducer;
  52. import org.apache.hadoop.mapred.SequenceFileRecordReader;
  53. import org.apache.hadoop.mapred.Reporter;
  54. import org.apache.hadoop.mapred.lib.NullOutputFormat;
  55. import org.apache.hadoop.util.Tool;
  56. import org.apache.hadoop.util.ToolRunner;
  57. /**
  58.  * a archive creation utility.
  59.  * This class provides methods that can be used 
  60.  * to create hadoop archives. For understanding of 
  61.  * Hadoop archives look at {@link HarFileSystem}.
  62.  */
  63. public class HadoopArchives implements Tool {
  64.   private static final Log LOG = LogFactory.getLog(HadoopArchives.class);
  65.   
  66.   private static final String NAME = "har"; 
  67.   static final String SRC_LIST_LABEL = NAME + ".src.list";
  68.   static final String DST_DIR_LABEL = NAME + ".dest.path";
  69.   static final String TMP_DIR_LABEL = NAME + ".tmp.dir";
  70.   static final String JOB_DIR_LABEL = NAME + ".job.dir";
  71.   static final String SRC_COUNT_LABEL = NAME + ".src.count";
  72.   static final String TOTAL_SIZE_LABEL = NAME + ".total.size";
  73.   static final String DST_HAR_LABEL = NAME + ".archive.name";
  74.   // size of each part file
  75.   // its fixed for now.
  76.   static final long partSize = 2 * 1024 * 1024 * 1024l;
  77.   private static final String usage = "archive"
  78.   + " -archiveName NAME <src>* <dest>" +
  79.   "n";
  80.   
  81.  
  82.   private JobConf conf;
  83.   public void setConf(Configuration conf) {
  84.     if (conf instanceof JobConf) {
  85.       this.conf = (JobConf) conf;
  86.     } else {
  87.       this.conf = new JobConf(conf, HadoopArchives.class);
  88.     }
  89.   }
  90.   public Configuration getConf() {
  91.     return this.conf;
  92.   }
  93.   public HadoopArchives(Configuration conf) {
  94.     setConf(conf);
  95.   }
  96.   // check the src paths
  97.   private static void checkPaths(Configuration conf, List<Path> paths) throws
  98.   IOException {
  99.     for (Path p : paths) {
  100.       FileSystem fs = p.getFileSystem(conf);
  101.       if (!fs.exists(p)) {
  102.         throw new FileNotFoundException("Source " + p + " does not exist.");
  103.       }
  104.     }
  105.   }
  106.   /**
  107.    * this assumes that there are two types of files file/dir
  108.    * @param fs the input filesystem
  109.    * @param p the top level path 
  110.    * @param out the list of paths output of recursive ls
  111.    * @throws IOException
  112.    */
  113.   private void recursivels(FileSystem fs, Path p, List<FileStatus> out) 
  114.   throws IOException {
  115.     FileStatus fstatus = fs.getFileStatus(p);
  116.     if (!fstatus.isDir()) {
  117.       out.add(fstatus);
  118.       return;
  119.     }
  120.     else {
  121.       out.add(fstatus);
  122.       FileStatus[] listStatus = fs.listStatus(p);
  123.       for (FileStatus stat: listStatus) {
  124.         recursivels(fs, stat.getPath(), out);
  125.       }
  126.     }
  127.   }
  128.   /**
  129.    * Input format of a hadoop archive job responsible for 
  130.    * generating splits of the file list
  131.    */
  132.   static class HArchiveInputFormat implements InputFormat<LongWritable, Text> {
  133.     //generate input splits from the src file lists
  134.     public InputSplit[] getSplits(JobConf jconf, int numSplits)
  135.     throws IOException {
  136.       String srcfilelist = jconf.get(SRC_LIST_LABEL, "");
  137.       if ("".equals(srcfilelist)) {
  138.           throw new IOException("Unable to get the " +
  139.               "src file for archive generation.");
  140.       }
  141.       long totalSize = jconf.getLong(TOTAL_SIZE_LABEL, -1);
  142.       if (totalSize == -1) {
  143.         throw new IOException("Invalid size of files to archive");
  144.       }
  145.       //we should be safe since this is set by our own code
  146.       Path src = new Path(srcfilelist);
  147.       FileSystem fs = src.getFileSystem(jconf);
  148.       FileStatus fstatus = fs.getFileStatus(src);
  149.       ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
  150.       LongWritable key = new LongWritable();
  151.       Text value = new Text();
  152.       SequenceFile.Reader reader = null;
  153.       // the remaining bytes in the file split
  154.       long remaining = fstatus.getLen();
  155.       // the count of sizes calculated till now
  156.       long currentCount = 0L;
  157.       // the endposition of the split
  158.       long lastPos = 0L;
  159.       // the start position of the split
  160.       long startPos = 0L;
  161.       long targetSize = totalSize/numSplits;
  162.       // create splits of size target size so that all the maps 
  163.       // have equals sized data to read and write to.
  164.       try {
  165.         reader = new SequenceFile.Reader(fs, src, jconf);
  166.         while(reader.next(key, value)) {
  167.           if (currentCount + key.get() > targetSize && currentCount != 0){
  168.             long size = lastPos - startPos;
  169.             splits.add(new FileSplit(src, startPos, size, (String[]) null));
  170.             remaining = remaining - size;
  171.             startPos = lastPos;
  172.             currentCount = 0L;
  173.           }
  174.           currentCount += key.get();
  175.           lastPos = reader.getPosition();
  176.         }
  177.         // the remaining not equal to the target size.
  178.         if (remaining != 0) {
  179.           splits.add(new FileSplit(src, startPos, remaining, (String[])null));
  180.         }
  181.       }
  182.       finally { 
  183.         reader.close();
  184.       }
  185.       return splits.toArray(new FileSplit[splits.size()]);
  186.     }
  187.     public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
  188.         JobConf job, Reporter reporter) throws IOException {
  189.       return new SequenceFileRecordReader<LongWritable, Text>(job,
  190.                  (FileSplit)split);
  191.     }
  192.   }
  193.   private boolean checkValidName(String name) {
  194.     Path tmp = new Path(name);
  195.     if (tmp.depth() != 1) {
  196.       return false;
  197.     }
  198.     if (name.endsWith(".har")) 
  199.       return true;
  200.     return false;
  201.   }
  202.   
  203.   private Path largestDepth(List<Path> paths) {
  204.     Path deepest = paths.get(0);
  205.     for (Path p: paths) {
  206.       if (p.depth() > deepest.depth()) {
  207.         deepest = p;
  208.       }
  209.     }
  210.     return deepest;
  211.   }
  212.   
  213.   // this method is tricky. This method writes 
  214.   // the top level directories in such a way so that 
  215.   // the output only contains valid directoreis in archives.
  216.   // so for an input path specified by the user 
  217.   // as /user/hadoop
  218.   // we need to index 
  219.   // / as the root 
  220.   // /user as a directory
  221.   // /user/hadoop as a directory
  222.   // so for multiple input paths it makes sure that it
  223.   // does the right thing.
  224.   // so if the user specifies the input directories as 
  225.   // /user/harry and /user/hadoop
  226.   // we need to write / and user as its child
  227.   // and /user and harry and hadoop as its children
  228.   private void writeTopLevelDirs(SequenceFile.Writer srcWriter, 
  229.       List<Path> paths) throws IOException {
  230.     //these are qualified paths 
  231.     List<Path> justDirs = new ArrayList<Path>();
  232.     for (Path p: paths) {
  233.       if (!p.getFileSystem(getConf()).isFile(p)) {
  234.         justDirs.add(new Path(p.toUri().getPath()));
  235.       }
  236.       else {
  237.         justDirs.add(new Path(p.getParent().toUri().getPath()));
  238.       }
  239.     }
  240.     
  241.     //get the largest depth path
  242.     // this is tricky
  243.     TreeMap<String, HashSet<String>> allpaths = new TreeMap<String, HashSet<String>>();
  244.     Path deepest = largestDepth(paths);
  245.     Path root = new Path(Path.SEPARATOR);
  246.     for (int i = 0; i < deepest.depth(); i++) {
  247.       List<Path> parents = new ArrayList<Path>();
  248.       for (Path p: justDirs) {
  249.         if (p.compareTo(root) == 0){
  250.           //don nothing
  251.         }
  252.         else {
  253.           Path parent = p.getParent();
  254.           if (allpaths.containsKey(parent.toString())) {
  255.             HashSet<String> children = allpaths.get(parent.toString());
  256.             children.add(p.getName());
  257.           }
  258.           else {
  259.             HashSet<String> children = new HashSet<String>();
  260.             children.add(p.getName());
  261.             allpaths.put(parent.toString(), children);
  262.           }
  263.           parents.add(parent);
  264.         }
  265.       }
  266.       justDirs = parents;
  267.     }
  268.     Set<Map.Entry<String, HashSet<String>>> keyVals = allpaths.entrySet();
  269.     for (Map.Entry<String, HashSet<String>> entry : keyVals) {
  270.       HashSet<String> children = entry.getValue();
  271.       String toWrite = entry.getKey() + " dir ";
  272.       StringBuffer sbuff = new StringBuffer();
  273.       sbuff.append(toWrite);
  274.       for (String child: children) {
  275.         sbuff.append(child + " ");
  276.       }
  277.       toWrite = sbuff.toString();
  278.       srcWriter.append(new LongWritable(0L), new Text(toWrite));
  279.     }
  280.   }
  281.   
  282.   /**archive the given source paths into
  283.    * the dest
  284.    * @param srcPaths the src paths to be archived
  285.    * @param dest the dest dir that will contain the archive
  286.    */
  287.   public void archive(List<Path> srcPaths, String archiveName, Path dest) 
  288.   throws IOException {
  289.     checkPaths(conf, srcPaths);
  290.     int numFiles = 0;
  291.     long totalSize = 0;
  292.     conf.set(DST_HAR_LABEL, archiveName);
  293.     Path outputPath = new Path(dest, archiveName);
  294.     FileOutputFormat.setOutputPath(conf, outputPath);
  295.     FileSystem outFs = outputPath.getFileSystem(conf);
  296.     if (outFs.exists(outputPath) || outFs.isFile(dest)) {
  297.       throw new IOException("Invalid Output.");
  298.     }
  299.     conf.set(DST_DIR_LABEL, outputPath.toString());
  300.     final String randomId = DistCp.getRandomId();
  301.     Path jobDirectory = new Path(new JobClient(conf).getSystemDir(),
  302.                           NAME + "_" + randomId);
  303.     conf.set(JOB_DIR_LABEL, jobDirectory.toString());
  304.     //get a tmp directory for input splits
  305.     FileSystem jobfs = jobDirectory.getFileSystem(conf);
  306.     jobfs.mkdirs(jobDirectory);
  307.     Path srcFiles = new Path(jobDirectory, "_har_src_files");
  308.     conf.set(SRC_LIST_LABEL, srcFiles.toString());
  309.     SequenceFile.Writer srcWriter = SequenceFile.createWriter(jobfs, conf,
  310.         srcFiles, LongWritable.class, Text.class, 
  311.         SequenceFile.CompressionType.NONE);
  312.     // get the list of files 
  313.     // create single list of files and dirs
  314.     try {
  315.       // write the top level dirs in first 
  316.       writeTopLevelDirs(srcWriter, srcPaths);
  317.       srcWriter.sync();
  318.       // these are the input paths passed 
  319.       // from the command line
  320.       // we do a recursive ls on these paths 
  321.       // and then write them to the input file 
  322.       // one at a time
  323.       for (Path src: srcPaths) {
  324.         FileSystem fs = src.getFileSystem(conf);
  325.         ArrayList<FileStatus> allFiles = new ArrayList<FileStatus>();
  326.         recursivels(fs, src, allFiles);
  327.         for (FileStatus stat: allFiles) {
  328.           String toWrite = "";
  329.           long len = stat.isDir()? 0:stat.getLen();
  330.           if (stat.isDir()) {
  331.             toWrite = "" + fs.makeQualified(stat.getPath()) + " dir ";
  332.             //get the children 
  333.             FileStatus[] list = fs.listStatus(stat.getPath());
  334.             StringBuffer sbuff = new StringBuffer();
  335.             sbuff.append(toWrite);
  336.             for (FileStatus stats: list) {
  337.               sbuff.append(stats.getPath().getName() + " ");
  338.             }
  339.             toWrite = sbuff.toString();
  340.           }
  341.           else {
  342.             toWrite +=  fs.makeQualified(stat.getPath()) + " file ";
  343.           }
  344.           srcWriter.append(new LongWritable(len), new 
  345.               Text(toWrite));
  346.           srcWriter.sync();
  347.           numFiles++;
  348.           totalSize += len;
  349.         }
  350.       }
  351.     } finally {
  352.       srcWriter.close();
  353.     }
  354.     //increase the replication of src files
  355.     jobfs.setReplication(srcFiles, (short) 10);
  356.     conf.setInt(SRC_COUNT_LABEL, numFiles);
  357.     conf.setLong(TOTAL_SIZE_LABEL, totalSize);
  358.     int numMaps = (int)(totalSize/partSize);
  359.     //run atleast one map.
  360.     conf.setNumMapTasks(numMaps == 0? 1:numMaps);
  361.     conf.setNumReduceTasks(1);
  362.     conf.setInputFormat(HArchiveInputFormat.class);
  363.     conf.setOutputFormat(NullOutputFormat.class);
  364.     conf.setMapperClass(HArchivesMapper.class);
  365.     conf.setReducerClass(HArchivesReducer.class);
  366.     conf.setMapOutputKeyClass(IntWritable.class);
  367.     conf.setMapOutputValueClass(Text.class);
  368.     conf.set("hadoop.job.history.user.location", "none");
  369.     FileInputFormat.addInputPath(conf, jobDirectory);
  370.     //make sure no speculative execution is done
  371.     conf.setSpeculativeExecution(false);
  372.     JobClient.runJob(conf);
  373.     //delete the tmp job directory
  374.     try {
  375.       jobfs.delete(jobDirectory, true);
  376.     } catch(IOException ie) {
  377.       LOG.info("Unable to clean tmp directory " + jobDirectory);
  378.     }
  379.   }
  380.   static class HArchivesMapper 
  381.   implements Mapper<LongWritable, Text, IntWritable, Text> {
  382.     private JobConf conf = null;
  383.     int partId = -1 ; 
  384.     Path tmpOutputDir = null;
  385.     Path tmpOutput = null;
  386.     String partname = null;
  387.     FSDataOutputStream partStream = null;
  388.     FileSystem destFs = null;
  389.     byte[] buffer;
  390.     int buf_size = 128 * 1024;
  391.     
  392.     // configure the mapper and create 
  393.     // the part file.
  394.     // use map reduce framework to write into
  395.     // tmp files. 
  396.     public void configure(JobConf conf) {
  397.       this.conf = conf;
  398.       // this is tightly tied to map reduce
  399.       // since it does not expose an api 
  400.       // to get the partition
  401.       partId = conf.getInt("mapred.task.partition", -1);
  402.       // create a file name using the partition
  403.       // we need to write to this directory
  404.       tmpOutputDir = FileOutputFormat.getWorkOutputPath(conf);
  405.       // get the output path and write to the tmp 
  406.       // directory 
  407.       partname = "part-" + partId;
  408.       tmpOutput = new Path(tmpOutputDir, partname);
  409.       try {
  410.         destFs = tmpOutput.getFileSystem(conf);
  411.         //this was a stale copy
  412.         if (destFs.exists(tmpOutput)) {
  413.           destFs.delete(tmpOutput, false);
  414.         }
  415.         partStream = destFs.create(tmpOutput);
  416.       } catch(IOException ie) {
  417.         throw new RuntimeException("Unable to open output file " + tmpOutput);
  418.       }
  419.       buffer = new byte[buf_size];
  420.     }
  421.     // copy raw data.
  422.     public void copyData(Path input, FSDataInputStream fsin, 
  423.         FSDataOutputStream fout, Reporter reporter) throws IOException {
  424.       try {
  425.         for (int cbread=0; (cbread = fsin.read(buffer))>= 0;) {
  426.           fout.write(buffer, 0,cbread);
  427.           reporter.progress();
  428.         }
  429.       } finally {
  430.         fsin.close();
  431.       }
  432.     }
  433.     
  434.     // the relative path of p. basically 
  435.     // getting rid of schema. Parsing and doing 
  436.     // string manipulation is not good - so
  437.     // just use the path api to do it.
  438.     private Path makeRelative(Path p) {
  439.       Path retPath = new Path(p.toUri().getPath());
  440.       return retPath;
  441.     }
  442.     
  443.     static class MapStat {
  444.       private String pathname;
  445.       private boolean isDir;
  446.       private List<String> children;
  447.       public MapStat(String line) {
  448.         String[] splits = line.split(" ");
  449.         pathname = splits[0];
  450.         if ("dir".equals(splits[1])) {
  451.           isDir = true;
  452.         }
  453.         else {
  454.           isDir = false;
  455.         }
  456.         if (isDir) {
  457.           children = new ArrayList<String>();
  458.           for (int i = 2; i < splits.length; i++) {
  459.             children.add(splits[i]);
  460.           }
  461.         }
  462.       }
  463.     }
  464.     // read files from the split input 
  465.     // and write it onto the part files.
  466.     // also output hash(name) and string 
  467.     // for reducer to create index 
  468.     // and masterindex files.
  469.     public void map(LongWritable key, Text value,
  470.         OutputCollector<IntWritable, Text> out,
  471.         Reporter reporter) throws IOException {
  472.       String line  = value.toString();
  473.       MapStat mstat = new MapStat(line);
  474.       Path srcPath = new Path(mstat.pathname);
  475.       String towrite = null;
  476.       Path relPath = makeRelative(srcPath);
  477.       int hash = HarFileSystem.getHarHash(relPath);
  478.       long startPos = partStream.getPos();
  479.       if (mstat.isDir) { 
  480.         towrite = relPath.toString() + " " + "dir none " + 0 + " " + 0 + " ";
  481.         StringBuffer sbuff = new StringBuffer();
  482.         sbuff.append(towrite);
  483.         for (String child: mstat.children) {
  484.           sbuff.append(child + " ");
  485.         }
  486.         towrite = sbuff.toString();
  487.         //reading directories is also progress
  488.         reporter.progress();
  489.       }
  490.       else {
  491.         FileSystem srcFs = srcPath.getFileSystem(conf);
  492.         FileStatus srcStatus = srcFs.getFileStatus(srcPath);
  493.         FSDataInputStream input = srcFs.open(srcStatus.getPath());
  494.         reporter.setStatus("Copying file " + srcStatus.getPath() + 
  495.             " to archive.");
  496.         copyData(srcStatus.getPath(), input, partStream, reporter);
  497.         towrite = relPath.toString() + " file " + partname + " " + startPos
  498.         + " " + srcStatus.getLen() + " ";
  499.       }
  500.       out.collect(new IntWritable(hash), new Text(towrite));
  501.     }
  502.     
  503.     public void close() throws IOException {
  504.       // close the part files.
  505.       partStream.close();
  506.     }
  507.   }
  508.   
  509.   /** the reduce for creating the index and the master index 
  510.    * 
  511.    */
  512.   static class HArchivesReducer implements Reducer<IntWritable, 
  513.   Text, Text, Text> {
  514.     private JobConf conf = null;
  515.     private long startIndex = 0;
  516.     private long endIndex = 0;
  517.     private long startPos = 0;
  518.     private Path masterIndex = null;
  519.     private Path index = null;
  520.     private FileSystem fs = null;
  521.     private FSDataOutputStream outStream = null;
  522.     private FSDataOutputStream indexStream = null;
  523.     private int numIndexes = 1000;
  524.     private Path tmpOutputDir = null;
  525.     private int written = 0;
  526.     private int keyVal = 0;
  527.     
  528.     // configure 
  529.     public void configure(JobConf conf) {
  530.       this.conf = conf;
  531.       tmpOutputDir = FileOutputFormat.getWorkOutputPath(this.conf);
  532.       masterIndex = new Path(tmpOutputDir, "_masterindex");
  533.       index = new Path(tmpOutputDir, "_index");
  534.       try {
  535.         fs = masterIndex.getFileSystem(conf);
  536.         if (fs.exists(masterIndex)) {
  537.           fs.delete(masterIndex, false);
  538.         }
  539.         if (fs.exists(index)) {
  540.           fs.delete(index, false);
  541.         }
  542.         indexStream = fs.create(index);
  543.         outStream = fs.create(masterIndex);
  544.         String version = HarFileSystem.VERSION + " n";
  545.         outStream.write(version.getBytes());
  546.         
  547.       } catch(IOException e) {
  548.         throw new RuntimeException(e);
  549.       }
  550.     }
  551.     
  552.     // create the index and master index. The input to 
  553.     // the reduce is already sorted by the hash of the 
  554.     // files. SO we just need to write it to the index. 
  555.     // We update the masterindex as soon as we update 
  556.     // numIndex entries.
  557.     public void reduce(IntWritable key, Iterator<Text> values,
  558.         OutputCollector<Text, Text> out,
  559.         Reporter reporter) throws IOException {
  560.       keyVal = key.get();
  561.       while(values.hasNext()) {
  562.         Text value = values.next();
  563.         String towrite = value.toString() + "n";
  564.         indexStream.write(towrite.getBytes());
  565.         written++;
  566.         if (written > numIndexes -1) {
  567.           // every 1000 indexes we report status
  568.           reporter.setStatus("Creating index for archives");
  569.           reporter.progress();
  570.           endIndex = keyVal;
  571.           String masterWrite = startIndex + " " + endIndex + " " + startPos 
  572.                               +  " " + indexStream.getPos() + " n" ;
  573.           outStream.write(masterWrite.getBytes());
  574.           startPos = indexStream.getPos();
  575.           startIndex = endIndex;
  576.           written = 0;
  577.         }
  578.       }
  579.     }
  580.     
  581.     public void close() throws IOException {
  582.       //write the last part of the master index.
  583.       if (written > 0) {
  584.         String masterWrite = startIndex + " " + keyVal + " " + startPos  +
  585.                              " " + indexStream.getPos() + " n";
  586.         outStream.write(masterWrite.getBytes());
  587.       }
  588.       // close the streams
  589.       outStream.close();
  590.       indexStream.close();
  591.       // try increasing the replication 
  592.       fs.setReplication(index, (short) 10);
  593.       fs.setReplication(masterIndex, (short) 10);
  594.     }
  595.     
  596.   }
  597.   
  598.   /** the main driver for creating the archives
  599.    *  it takes at least two command line parameters. The src and the 
  600.    *  dest. It does an lsr on the source paths.
  601.    *  The mapper created archuves and the reducer creates 
  602.    *  the archive index.
  603.    */
  604.   public int run(String[] args) throws Exception {
  605.     try {
  606.       List<Path> srcPaths = new ArrayList<Path>();
  607.       Path destPath = null;
  608.       // check we were supposed to archive or 
  609.       // unarchive
  610.       String archiveName = null;
  611.       if (args.length < 4) {
  612.         System.out.println(usage);
  613.         throw new IOException("Invalid usage.");
  614.       }
  615.       if (!"-archiveName".equals(args[0])) {
  616.         System.out.println(usage);
  617.         throw new IOException("Archive Name not specified.");
  618.       }
  619.       archiveName = args[1];
  620.       if (!checkValidName(archiveName)) {
  621.         System.out.println(usage);
  622.         throw new IOException("Invalid name for archives. " + archiveName);
  623.       }
  624.       for (int i = 2; i < args.length; i++) {
  625.         if (i == (args.length - 1)) {
  626.           destPath = new Path(args[i]);
  627.         }
  628.         else {
  629.           srcPaths.add(new Path(args[i]));
  630.         }
  631.       }
  632.       if (srcPaths.size() == 0) {
  633.         System.out.println(usage);
  634.         throw new IOException("Invalid Usage: No input sources specified.");
  635.       }
  636.       // do a glob on the srcPaths and then pass it on
  637.       List<Path> globPaths = new ArrayList<Path>();
  638.       for (Path p: srcPaths) {
  639.         FileSystem fs = p.getFileSystem(getConf());
  640.         FileStatus[] statuses = fs.globStatus(p);
  641.         for (FileStatus status: statuses) {
  642.           globPaths.add(fs.makeQualified(status.getPath()));
  643.         }
  644.       }
  645.       archive(globPaths, archiveName, destPath);
  646.     } catch(IOException ie) {
  647.       System.err.println(ie.getLocalizedMessage());
  648.       return -1;
  649.     }
  650.     return 0;
  651.   }
  652.   /** the main functions **/
  653.   public static void main(String[] args) {
  654.     JobConf job = new JobConf(HadoopArchives.class);
  655.     HadoopArchives harchives = new HadoopArchives(job);
  656.     try {
  657.       int res = harchives.run(args);
  658.       System.exit(res);
  659.     } catch(Exception e) {
  660.       System.err.println(e.getLocalizedMessage());
  661.     }
  662.   }
  663. }