DistributedCache.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:33k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.filecache;
  19. import org.apache.commons.logging.*;
  20. import java.io.*;
  21. import java.util.*;
  22. import org.apache.hadoop.conf.*;
  23. import org.apache.hadoop.util.*;
  24. import org.apache.hadoop.fs.*;
  25. import java.net.URI;
  26. /**
  27.  * Distribute application-specific large, read-only files efficiently.
  28.  * 
  29.  * <p><code>DistributedCache</code> is a facility provided by the Map-Reduce
  30.  * framework to cache files (text, archives, jars etc.) needed by applications.
  31.  * </p>
  32.  * 
  33.  * <p>Applications specify the files, via urls (hdfs:// or http://) to be cached 
  34.  * via the {@link org.apache.hadoop.mapred.JobConf}.
  35.  * The <code>DistributedCache</code> assumes that the
  36.  * files specified via hdfs:// urls are already present on the 
  37.  * {@link FileSystem} at the path specified by the url.</p>
  38.  * 
  39.  * <p>The framework will copy the necessary files on to the slave node before 
  40.  * any tasks for the job are executed on that node. Its efficiency stems from 
  41.  * the fact that the files are only copied once per job and the ability to 
  42.  * cache archives which are un-archived on the slaves.</p> 
  43.  *
  44.  * <p><code>DistributedCache</code> can be used to distribute simple, read-only
  45.  * data/text files and/or more complex types such as archives, jars etc. 
  46.  * Archives (zip, tar and tgz/tar.gz files) are un-archived at the slave nodes. 
  47.  * Jars may be optionally added to the classpath of the tasks, a rudimentary 
  48.  * software distribution mechanism.  Files have execution permissions.
  49.  * Optionally users can also direct it to symlink the distributed cache file(s)
  50.  * into the working directory of the task.</p>
  51.  * 
  52.  * <p><code>DistributedCache</code> tracks modification timestamps of the cache 
  53.  * files. Clearly the cache files should not be modified by the application 
  54.  * or externally while the job is executing.</p>
  55.  * 
  56.  * <p>Here is an illustrative example on how to use the 
  57.  * <code>DistributedCache</code>:</p>
  58.  * <p><blockquote><pre>
  59.  *     // Setting up the cache for the application
  60.  *     
  61.  *     1. Copy the requisite files to the <code>FileSystem</code>:
  62.  *     
  63.  *     $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat  
  64.  *     $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip  
  65.  *     $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar
  66.  *     $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar
  67.  *     $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz
  68.  *     $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz
  69.  *     
  70.  *     2. Setup the application's <code>JobConf</code>:
  71.  *     
  72.  *     JobConf job = new JobConf();
  73.  *     DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"), 
  74.  *                                   job);
  75.  *     DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);
  76.  *     DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);
  77.  *     DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);
  78.  *     DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);
  79.  *     DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);
  80.  *     
  81.  *     3. Use the cached files in the {@link org.apache.hadoop.mapred.Mapper}
  82.  *     or {@link org.apache.hadoop.mapred.Reducer}:
  83.  *     
  84.  *     public static class MapClass extends MapReduceBase  
  85.  *     implements Mapper&lt;K, V, K, V&gt; {
  86.  *     
  87.  *       private Path[] localArchives;
  88.  *       private Path[] localFiles;
  89.  *       
  90.  *       public void configure(JobConf job) {
  91.  *         // Get the cached archives/files
  92.  *         localArchives = DistributedCache.getLocalCacheArchives(job);
  93.  *         localFiles = DistributedCache.getLocalCacheFiles(job);
  94.  *       }
  95.  *       
  96.  *       public void map(K key, V value, 
  97.  *                       OutputCollector&lt;K, V&gt; output, Reporter reporter) 
  98.  *       throws IOException {
  99.  *         // Use data from the cached archives/files here
  100.  *         // ...
  101.  *         // ...
  102.  *         output.collect(k, v);
  103.  *       }
  104.  *     }
  105.  *     
  106.  * </pre></blockquote></p>
  107.  * 
  108.  * @see org.apache.hadoop.mapred.JobConf
  109.  * @see org.apache.hadoop.mapred.JobClient
  110.  */
  111. public class DistributedCache {
  112.   // cacheID to cacheStatus mapping
  113.   private static TreeMap<String, CacheStatus> cachedArchives = new TreeMap<String, CacheStatus>();
  114.   
  115.   private static TreeMap<Path, Long> baseDirSize = new TreeMap<Path, Long>();
  116.   
  117.   // default total cache size
  118.   private static final long DEFAULT_CACHE_SIZE = 10737418240L;
  119.   private static final Log LOG =
  120.     LogFactory.getLog(DistributedCache.class);
  121.   
  122.   /**
  123.    * Get the locally cached file or archive; it could either be 
  124.    * previously cached (and valid) or copy it from the {@link FileSystem} now.
  125.    * 
  126.    * @param cache the cache to be localized, this should be specified as 
  127.    * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
  128.    * or hostname:port is provided the file is assumed to be in the filesystem
  129.    * being used in the Configuration
  130.    * @param conf The Confguration file which contains the filesystem
  131.    * @param baseDir The base cache Dir where you wnat to localize the files/archives
  132.    * @param fileStatus The file status on the dfs.
  133.    * @param isArchive if the cache is an archive or a file. In case it is an
  134.    *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
  135.    *  be unzipped/unjarred/untarred automatically 
  136.    *  and the directory where the archive is unzipped/unjarred/untarred is
  137.    *  returned as the Path.
  138.    *  In case of a file, the path to the file is returned
  139.    * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
  140.    * file to be cached hasn't changed since the job started
  141.    * @param currentWorkDir this is the directory where you would want to create symlinks 
  142.    * for the locally cached files/archives
  143.    * @return the path to directory where the archives are unjarred in case of archives,
  144.    * the path to the file where the file is copied locally 
  145.    * @throws IOException
  146.    */
  147.   public static Path getLocalCache(URI cache, Configuration conf, 
  148.                                    Path baseDir, FileStatus fileStatus,
  149.                                    boolean isArchive, long confFileStamp,
  150.                                    Path currentWorkDir) 
  151.   throws IOException {
  152.     return getLocalCache(cache, conf, baseDir, fileStatus, isArchive, 
  153.         confFileStamp, currentWorkDir, true);
  154.   }
  155.   /**
  156.    * Get the locally cached file or archive; it could either be 
  157.    * previously cached (and valid) or copy it from the {@link FileSystem} now.
  158.    * 
  159.    * @param cache the cache to be localized, this should be specified as 
  160.    * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
  161.    * or hostname:port is provided the file is assumed to be in the filesystem
  162.    * being used in the Configuration
  163.    * @param conf The Confguration file which contains the filesystem
  164.    * @param baseDir The base cache Dir where you wnat to localize the files/archives
  165.    * @param fileStatus The file status on the dfs.
  166.    * @param isArchive if the cache is an archive or a file. In case it is an
  167.    *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will
  168.    *  be unzipped/unjarred/untarred automatically 
  169.    *  and the directory where the archive is unzipped/unjarred/untarred is
  170.    *  returned as the Path.
  171.    *  In case of a file, the path to the file is returned
  172.    * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
  173.    * file to be cached hasn't changed since the job started
  174.    * @param currentWorkDir this is the directory where you would want to create symlinks 
  175.    * for the locally cached files/archives
  176.    * @param honorSymLinkConf if this is false, then the symlinks are not
  177.    * created even if conf says so (this is required for an optimization in task
  178.    * launches
  179.    * @return the path to directory where the archives are unjarred in case of archives,
  180.    * the path to the file where the file is copied locally 
  181.    * @throws IOException
  182.    */
  183.   public static Path getLocalCache(URI cache, Configuration conf, 
  184.       Path baseDir, FileStatus fileStatus,
  185.       boolean isArchive, long confFileStamp,
  186.       Path currentWorkDir, boolean honorSymLinkConf) 
  187.   throws IOException {
  188.     String cacheId = makeRelative(cache, conf);
  189.     CacheStatus lcacheStatus;
  190.     Path localizedPath;
  191.     synchronized (cachedArchives) {
  192.       lcacheStatus = cachedArchives.get(cacheId);
  193.       if (lcacheStatus == null) {
  194.         // was never localized
  195.         lcacheStatus = new CacheStatus(baseDir, new Path(baseDir, new Path(cacheId)));
  196.         cachedArchives.put(cacheId, lcacheStatus);
  197.       }
  198.       synchronized (lcacheStatus) {
  199.         localizedPath = localizeCache(conf, cache, confFileStamp, lcacheStatus, 
  200.             fileStatus, isArchive, currentWorkDir, honorSymLinkConf);
  201.         lcacheStatus.refcount++;
  202.       }
  203.     }
  204.     // try deleting stuff if you can
  205.     long size = 0;
  206.     synchronized (baseDirSize) {
  207.       Long get = baseDirSize.get(baseDir);
  208.       if ( get != null ) {
  209.      size = get.longValue();
  210.       }
  211.     }
  212.     // setting the cache size to a default of 10GB
  213.     long allowedSize = conf.getLong("local.cache.size", DEFAULT_CACHE_SIZE);
  214.     if (allowedSize < size) {
  215.       // try some cache deletions
  216.       deleteCache(conf);
  217.     }
  218.     return localizedPath;
  219.   }
  220.   
  221.   /**
  222.    * Get the locally cached file or archive; it could either be 
  223.    * previously cached (and valid) or copy it from the {@link FileSystem} now.
  224.    * 
  225.    * @param cache the cache to be localized, this should be specified as 
  226.    * new URI(hdfs://hostname:port/absolute_path_to_file#LINKNAME). If no schema 
  227.    * or hostname:port is provided the file is assumed to be in the filesystem
  228.    * being used in the Configuration
  229.    * @param conf The Confguration file which contains the filesystem
  230.    * @param baseDir The base cache Dir where you wnat to localize the files/archives
  231.    * @param isArchive if the cache is an archive or a file. In case it is an 
  232.    *  archive with a .zip or .jar or .tar or .tgz or .tar.gz extension it will 
  233.    *  be unzipped/unjarred/untarred automatically 
  234.    *  and the directory where the archive is unzipped/unjarred/untarred 
  235.    *  is returned as the Path.
  236.    *  In case of a file, the path to the file is returned
  237.    * @param confFileStamp this is the hdfs file modification timestamp to verify that the 
  238.    * file to be cached hasn't changed since the job started
  239.    * @param currentWorkDir this is the directory where you would want to create symlinks 
  240.    * for the locally cached files/archives
  241.    * @return the path to directory where the archives are unjarred in case of archives,
  242.    * the path to the file where the file is copied locally 
  243.    * @throws IOException
  244.    */
  245.   public static Path getLocalCache(URI cache, Configuration conf, 
  246.                                    Path baseDir, boolean isArchive,
  247.                                    long confFileStamp, Path currentWorkDir) 
  248.   throws IOException {
  249.     return getLocalCache(cache, conf, 
  250.                          baseDir, null, isArchive,
  251.                          confFileStamp, currentWorkDir);
  252.   }
  253.   
  254.   /**
  255.    * This is the opposite of getlocalcache. When you are done with
  256.    * using the cache, you need to release the cache
  257.    * @param cache The cache URI to be released
  258.    * @param conf configuration which contains the filesystem the cache 
  259.    * is contained in.
  260.    * @throws IOException
  261.    */
  262.   public static void releaseCache(URI cache, Configuration conf)
  263.     throws IOException {
  264.     String cacheId = makeRelative(cache, conf);
  265.     synchronized (cachedArchives) {
  266.       CacheStatus lcacheStatus = cachedArchives.get(cacheId);
  267.       if (lcacheStatus == null)
  268.         return;
  269.       synchronized (lcacheStatus) {
  270.         lcacheStatus.refcount--;
  271.       }
  272.     }
  273.   }
  274.   
  275.   // To delete the caches which have a refcount of zero
  276.   
  277.   private static void deleteCache(Configuration conf) throws IOException {
  278.     // try deleting cache Status with refcount of zero
  279.     synchronized (cachedArchives) {
  280.       for (Iterator it = cachedArchives.keySet().iterator(); it.hasNext();) {
  281.         String cacheId = (String) it.next();
  282.         CacheStatus lcacheStatus = cachedArchives.get(cacheId);
  283.         synchronized (lcacheStatus) {
  284.           if (lcacheStatus.refcount == 0) {
  285.             // delete this cache entry
  286.             FileSystem.getLocal(conf).delete(lcacheStatus.localLoadPath, true);
  287.             synchronized (baseDirSize) {
  288.               Long dirSize = baseDirSize.get(lcacheStatus.baseDir);
  289.               if ( dirSize != null ) {
  290.              dirSize -= lcacheStatus.size;
  291.              baseDirSize.put(lcacheStatus.baseDir, dirSize);
  292.               }
  293.             }
  294.             it.remove();
  295.           }
  296.         }
  297.       }
  298.     }
  299.   }
  300.   /*
  301.    * Returns the relative path of the dir this cache will be localized in
  302.    * relative path that this cache will be localized in. For
  303.    * hdfs://hostname:port/absolute_path -- the relative path is
  304.    * hostname/absolute path -- if it is just /absolute_path -- then the
  305.    * relative path is hostname of DFS this mapred cluster is running
  306.    * on/absolute_path
  307.    */
  308.   public static String makeRelative(URI cache, Configuration conf)
  309.     throws IOException {
  310.     String host = cache.getHost();
  311.     if (host == null) {
  312.       host = cache.getScheme();
  313.     }
  314.     if (host == null) {
  315.       URI defaultUri = FileSystem.get(conf).getUri();
  316.       host = defaultUri.getHost();
  317.       if (host == null) {
  318.         host = defaultUri.getScheme();
  319.       }
  320.     }
  321.     String path = host + cache.getPath();
  322.     path = path.replace(":/","/");                // remove windows device colon
  323.     return path;
  324.   }
  325.   private static Path cacheFilePath(Path p) {
  326.     return new Path(p, p.getName());
  327.   }
  328.   // the method which actually copies the caches locally and unjars/unzips them
  329.   // and does chmod for the files
  330.   private static Path localizeCache(Configuration conf, 
  331.                                     URI cache, long confFileStamp,
  332.                                     CacheStatus cacheStatus,
  333.                                     FileStatus fileStatus,
  334.                                     boolean isArchive, 
  335.                                     Path currentWorkDir,boolean honorSymLinkConf) 
  336.   throws IOException {
  337.     boolean doSymlink = honorSymLinkConf && getSymlink(conf);
  338.     if(cache.getFragment() == null) {
  339.      doSymlink = false;
  340.     }
  341.     FileSystem fs = getFileSystem(cache, conf);
  342.     String link = currentWorkDir.toString() + Path.SEPARATOR + cache.getFragment();
  343.     File flink = new File(link);
  344.     if (ifExistsAndFresh(conf, fs, cache, confFileStamp,
  345.                            cacheStatus, fileStatus)) {
  346.       if (isArchive) {
  347.         if (doSymlink){
  348.           if (!flink.exists())
  349.             FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
  350.                              link);
  351.         }
  352.         return cacheStatus.localLoadPath;
  353.       }
  354.       else {
  355.         if (doSymlink){
  356.           if (!flink.exists())
  357.             FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
  358.                              link);
  359.         }
  360.         return cacheFilePath(cacheStatus.localLoadPath);
  361.       }
  362.     } else {
  363.       // remove the old archive
  364.       // if the old archive cannot be removed since it is being used by another
  365.       // job
  366.       // return null
  367.       if (cacheStatus.refcount > 1 && (cacheStatus.currentStatus == true))
  368.         throw new IOException("Cache " + cacheStatus.localLoadPath.toString()
  369.                               + " is in use and cannot be refreshed");
  370.       
  371.       FileSystem localFs = FileSystem.getLocal(conf);
  372.       localFs.delete(cacheStatus.localLoadPath, true);
  373.       synchronized (baseDirSize) {
  374.      Long dirSize = baseDirSize.get(cacheStatus.baseDir);
  375.      if ( dirSize != null ) {
  376.        dirSize -= cacheStatus.size;
  377.        baseDirSize.put(cacheStatus.baseDir, dirSize);
  378.      }
  379.       }
  380.       Path parchive = new Path(cacheStatus.localLoadPath,
  381.                                new Path(cacheStatus.localLoadPath.getName()));
  382.       
  383.       if (!localFs.mkdirs(cacheStatus.localLoadPath)) {
  384.         throw new IOException("Mkdirs failed to create directory " + 
  385.                               cacheStatus.localLoadPath.toString());
  386.       }
  387.       String cacheId = cache.getPath();
  388.       fs.copyToLocalFile(new Path(cacheId), parchive);
  389.       if (isArchive) {
  390.         String tmpArchive = parchive.toString().toLowerCase();
  391.         File srcFile = new File(parchive.toString());
  392.         File destDir = new File(parchive.getParent().toString());
  393.         if (tmpArchive.endsWith(".jar")) {
  394.           RunJar.unJar(srcFile, destDir);
  395.         } else if (tmpArchive.endsWith(".zip")) {
  396.           FileUtil.unZip(srcFile, destDir);
  397.         } else if (isTarFile(tmpArchive)) {
  398.           FileUtil.unTar(srcFile, destDir);
  399.         }
  400.         // else will not do anyhting
  401.         // and copy the file into the dir as it is
  402.       }
  403.       
  404.       long cacheSize = FileUtil.getDU(new File(parchive.getParent().toString()));
  405.       cacheStatus.size = cacheSize;
  406.       synchronized (baseDirSize) {
  407.        Long dirSize = baseDirSize.get(cacheStatus.baseDir);
  408.        if( dirSize == null ) {
  409.          dirSize = Long.valueOf(cacheSize);
  410.        } else {
  411.          dirSize += cacheSize;
  412.        }
  413.        baseDirSize.put(cacheStatus.baseDir, dirSize);
  414.       }
  415.       
  416.       // do chmod here 
  417.       try {
  418.      FileUtil.chmod(parchive.toString(), "+x");
  419.       } catch(InterruptedException e) {
  420.      LOG.warn("Exception in chmod" + e.toString());
  421.       }
  422.       // update cacheStatus to reflect the newly cached file
  423.       cacheStatus.currentStatus = true;
  424.       cacheStatus.mtime = getTimestamp(conf, cache);
  425.     }
  426.     
  427.     if (isArchive){
  428.       if (doSymlink){
  429.         if (!flink.exists())
  430.           FileUtil.symLink(cacheStatus.localLoadPath.toString(), 
  431.                            link);
  432.       }
  433.       return cacheStatus.localLoadPath;
  434.     }
  435.     else {
  436.       if (doSymlink){
  437.         if (!flink.exists())
  438.           FileUtil.symLink(cacheFilePath(cacheStatus.localLoadPath).toString(), 
  439.                            link);
  440.       }
  441.       return cacheFilePath(cacheStatus.localLoadPath);
  442.     }
  443.   }
  444.   private static boolean isTarFile(String filename) {
  445.     return (filename.endsWith(".tgz") || filename.endsWith(".tar.gz") ||
  446.            filename.endsWith(".tar"));
  447.   }
  448.   
  449.   // Checks if the cache has already been localized and is fresh
  450.   private static boolean ifExistsAndFresh(Configuration conf, FileSystem fs, 
  451.                                           URI cache, long confFileStamp, 
  452.                                           CacheStatus lcacheStatus,
  453.                                           FileStatus fileStatus) 
  454.   throws IOException {
  455.     // check for existence of the cache
  456.     if (lcacheStatus.currentStatus == false) {
  457.       return false;
  458.     } else {
  459.       long dfsFileStamp;
  460.       if (fileStatus != null) {
  461.         dfsFileStamp = fileStatus.getModificationTime();
  462.       } else {
  463.         dfsFileStamp = getTimestamp(conf, cache);
  464.       }
  465.       // ensure that the file on hdfs hasn't been modified since the job started 
  466.       if (dfsFileStamp != confFileStamp) {
  467.         LOG.fatal("File: " + cache + " has changed on HDFS since job started");
  468.         throw new IOException("File: " + cache + 
  469.                               " has changed on HDFS since job started");
  470.       }
  471.       
  472.       if (dfsFileStamp != lcacheStatus.mtime) {
  473.         // needs refreshing
  474.         return false;
  475.       }
  476.     }
  477.     
  478.     return true;
  479.   }
  480.   /**
  481.    * Returns mtime of a given cache file on hdfs.
  482.    * @param conf configuration
  483.    * @param cache cache file 
  484.    * @return mtime of a given cache file on hdfs
  485.    * @throws IOException
  486.    */
  487.   public static long getTimestamp(Configuration conf, URI cache)
  488.     throws IOException {
  489.     FileSystem fileSystem = FileSystem.get(cache, conf);
  490.     Path filePath = new Path(cache.getPath());
  491.     return fileSystem.getFileStatus(filePath).getModificationTime();
  492.   }
  493.   
  494.   /**
  495.    * This method create symlinks for all files in a given dir in another directory
  496.    * @param conf the configuration
  497.    * @param jobCacheDir the target directory for creating symlinks
  498.    * @param workDir the directory in which the symlinks are created
  499.    * @throws IOException
  500.    */
  501.   public static void createAllSymlink(Configuration conf, File jobCacheDir, File workDir)
  502.     throws IOException{
  503.     if ((jobCacheDir == null || !jobCacheDir.isDirectory()) ||
  504.            workDir == null || (!workDir.isDirectory())) {
  505.       return;
  506.     }
  507.     boolean createSymlink = getSymlink(conf);
  508.     if (createSymlink){
  509.       File[] list = jobCacheDir.listFiles();
  510.       for (int i=0; i < list.length; i++){
  511.         FileUtil.symLink(list[i].getAbsolutePath(),
  512.                          new File(workDir, list[i].getName()).toString());
  513.       }
  514.     }  
  515.   }
  516.   
  517.   private static String getFileSysName(URI url) {
  518.     String fsname = url.getScheme();
  519.     if ("hdfs".equals(fsname)) {
  520.       String host = url.getHost();
  521.       int port = url.getPort();
  522.       return (port == (-1)) ? host : (host + ":" + port);
  523.     } else {
  524.       return null;
  525.     }
  526.   }
  527.   
  528.   private static FileSystem getFileSystem(URI cache, Configuration conf)
  529.     throws IOException {
  530.     String fileSysName = getFileSysName(cache);
  531.     if (fileSysName != null)
  532.       return FileSystem.getNamed(fileSysName, conf);
  533.     else
  534.       return FileSystem.get(conf);
  535.   }
  536.   /**
  537.    * Set the configuration with the given set of archives
  538.    * @param archives The list of archives that need to be localized
  539.    * @param conf Configuration which will be changed
  540.    */
  541.   public static void setCacheArchives(URI[] archives, Configuration conf) {
  542.     String sarchives = StringUtils.uriToString(archives);
  543.     conf.set("mapred.cache.archives", sarchives);
  544.   }
  545.   /**
  546.    * Set the configuration with the given set of files
  547.    * @param files The list of files that need to be localized
  548.    * @param conf Configuration which will be changed
  549.    */
  550.   public static void setCacheFiles(URI[] files, Configuration conf) {
  551.     String sfiles = StringUtils.uriToString(files);
  552.     conf.set("mapred.cache.files", sfiles);
  553.   }
  554.   /**
  555.    * Get cache archives set in the Configuration
  556.    * @param conf The configuration which contains the archives
  557.    * @return A URI array of the caches set in the Configuration
  558.    * @throws IOException
  559.    */
  560.   public static URI[] getCacheArchives(Configuration conf) throws IOException {
  561.     return StringUtils.stringToURI(conf.getStrings("mapred.cache.archives"));
  562.   }
  563.   /**
  564.    * Get cache files set in the Configuration
  565.    * @param conf The configuration which contains the files
  566.    * @return A URI array of the files set in the Configuration
  567.    * @throws IOException
  568.    */
  569.   public static URI[] getCacheFiles(Configuration conf) throws IOException {
  570.     return StringUtils.stringToURI(conf.getStrings("mapred.cache.files"));
  571.   }
  572.   /**
  573.    * Return the path array of the localized caches
  574.    * @param conf Configuration that contains the localized archives
  575.    * @return A path array of localized caches
  576.    * @throws IOException
  577.    */
  578.   public static Path[] getLocalCacheArchives(Configuration conf)
  579.     throws IOException {
  580.     return StringUtils.stringToPath(conf
  581.                                     .getStrings("mapred.cache.localArchives"));
  582.   }
  583.   /**
  584.    * Return the path array of the localized files
  585.    * @param conf Configuration that contains the localized files
  586.    * @return A path array of localized files
  587.    * @throws IOException
  588.    */
  589.   public static Path[] getLocalCacheFiles(Configuration conf)
  590.     throws IOException {
  591.     return StringUtils.stringToPath(conf.getStrings("mapred.cache.localFiles"));
  592.   }
  593.   /**
  594.    * Get the timestamps of the archives
  595.    * @param conf The configuration which stored the timestamps
  596.    * @return a string array of timestamps 
  597.    * @throws IOException
  598.    */
  599.   public static String[] getArchiveTimestamps(Configuration conf) {
  600.     return conf.getStrings("mapred.cache.archives.timestamps");
  601.   }
  602.   /**
  603.    * Get the timestamps of the files
  604.    * @param conf The configuration which stored the timestamps
  605.    * @return a string array of timestamps 
  606.    * @throws IOException
  607.    */
  608.   public static String[] getFileTimestamps(Configuration conf) {
  609.     return conf.getStrings("mapred.cache.files.timestamps");
  610.   }
  611.   /**
  612.    * This is to check the timestamp of the archives to be localized
  613.    * @param conf Configuration which stores the timestamp's
  614.    * @param timestamps comma separated list of timestamps of archives.
  615.    * The order should be the same as the order in which the archives are added.
  616.    */
  617.   public static void setArchiveTimestamps(Configuration conf, String timestamps) {
  618.     conf.set("mapred.cache.archives.timestamps", timestamps);
  619.   }
  620.   /**
  621.    * This is to check the timestamp of the files to be localized
  622.    * @param conf Configuration which stores the timestamp's
  623.    * @param timestamps comma separated list of timestamps of files.
  624.    * The order should be the same as the order in which the files are added.
  625.    */
  626.   public static void setFileTimestamps(Configuration conf, String timestamps) {
  627.     conf.set("mapred.cache.files.timestamps", timestamps);
  628.   }
  629.   
  630.   /**
  631.    * Set the conf to contain the location for localized archives 
  632.    * @param conf The conf to modify to contain the localized caches
  633.    * @param str a comma separated list of local archives
  634.    */
  635.   public static void setLocalArchives(Configuration conf, String str) {
  636.     conf.set("mapred.cache.localArchives", str);
  637.   }
  638.   /**
  639.    * Set the conf to contain the location for localized files 
  640.    * @param conf The conf to modify to contain the localized caches
  641.    * @param str a comma separated list of local files
  642.    */
  643.   public static void setLocalFiles(Configuration conf, String str) {
  644.     conf.set("mapred.cache.localFiles", str);
  645.   }
  646.   /**
  647.    * Add a archives to be localized to the conf
  648.    * @param uri The uri of the cache to be localized
  649.    * @param conf Configuration to add the cache to
  650.    */
  651.   public static void addCacheArchive(URI uri, Configuration conf) {
  652.     String archives = conf.get("mapred.cache.archives");
  653.     conf.set("mapred.cache.archives", archives == null ? uri.toString()
  654.              : archives + "," + uri.toString());
  655.   }
  656.   
  657.   /**
  658.    * Add a file to be localized to the conf
  659.    * @param uri The uri of the cache to be localized
  660.    * @param conf Configuration to add the cache to
  661.    */
  662.   public static void addCacheFile(URI uri, Configuration conf) {
  663.     String files = conf.get("mapred.cache.files");
  664.     conf.set("mapred.cache.files", files == null ? uri.toString() : files + ","
  665.              + uri.toString());
  666.   }
  667.   /**
  668.    * Add an file path to the current set of classpath entries It adds the file
  669.    * to cache as well.
  670.    * 
  671.    * @param file Path of the file to be added
  672.    * @param conf Configuration that contains the classpath setting
  673.    */
  674.   public static void addFileToClassPath(Path file, Configuration conf)
  675.     throws IOException {
  676.     String classpath = conf.get("mapred.job.classpath.files");
  677.     conf.set("mapred.job.classpath.files", classpath == null ? file.toString()
  678.              : classpath + System.getProperty("path.separator") + file.toString());
  679.     FileSystem fs = FileSystem.get(conf);
  680.     URI uri = fs.makeQualified(file).toUri();
  681.     addCacheFile(uri, conf);
  682.   }
  683.   /**
  684.    * Get the file entries in classpath as an array of Path
  685.    * 
  686.    * @param conf Configuration that contains the classpath setting
  687.    */
  688.   public static Path[] getFileClassPaths(Configuration conf) {
  689.     String classpath = conf.get("mapred.job.classpath.files");
  690.     if (classpath == null)
  691.       return null;
  692.     ArrayList list = Collections.list(new StringTokenizer(classpath, System
  693.                                                           .getProperty("path.separator")));
  694.     Path[] paths = new Path[list.size()];
  695.     for (int i = 0; i < list.size(); i++) {
  696.       paths[i] = new Path((String) list.get(i));
  697.     }
  698.     return paths;
  699.   }
  700.   /**
  701.    * Add an archive path to the current set of classpath entries. It adds the
  702.    * archive to cache as well.
  703.    * 
  704.    * @param archive Path of the archive to be added
  705.    * @param conf Configuration that contains the classpath setting
  706.    */
  707.   public static void addArchiveToClassPath(Path archive, Configuration conf)
  708.     throws IOException {
  709.     String classpath = conf.get("mapred.job.classpath.archives");
  710.     conf.set("mapred.job.classpath.archives", classpath == null ? archive
  711.              .toString() : classpath + System.getProperty("path.separator")
  712.              + archive.toString());
  713.     FileSystem fs = FileSystem.get(conf);
  714.     URI uri = fs.makeQualified(archive).toUri();
  715.     addCacheArchive(uri, conf);
  716.   }
  717.   /**
  718.    * Get the archive entries in classpath as an array of Path
  719.    * 
  720.    * @param conf Configuration that contains the classpath setting
  721.    */
  722.   public static Path[] getArchiveClassPaths(Configuration conf) {
  723.     String classpath = conf.get("mapred.job.classpath.archives");
  724.     if (classpath == null)
  725.       return null;
  726.     ArrayList list = Collections.list(new StringTokenizer(classpath, System
  727.                                                           .getProperty("path.separator")));
  728.     Path[] paths = new Path[list.size()];
  729.     for (int i = 0; i < list.size(); i++) {
  730.       paths[i] = new Path((String) list.get(i));
  731.     }
  732.     return paths;
  733.   }
  734.   /**
  735.    * This method allows you to create symlinks in the current working directory
  736.    * of the task to all the cache files/archives
  737.    * @param conf the jobconf 
  738.    */
  739.   public static void createSymlink(Configuration conf){
  740.     conf.set("mapred.create.symlink", "yes");
  741.   }
  742.   
  743.   /**
  744.    * This method checks to see if symlinks are to be create for the 
  745.    * localized cache files in the current working directory 
  746.    * @param conf the jobconf
  747.    * @return true if symlinks are to be created- else return false
  748.    */
  749.   public static boolean getSymlink(Configuration conf){
  750.     String result = conf.get("mapred.create.symlink");
  751.     if ("yes".equals(result)){
  752.       return true;
  753.     }
  754.     return false;
  755.   }
  756.   /**
  757.    * This method checks if there is a conflict in the fragment names 
  758.    * of the uris. Also makes sure that each uri has a fragment. It 
  759.    * is only to be called if you want to create symlinks for 
  760.    * the various archives and files.
  761.    * @param uriFiles The uri array of urifiles
  762.    * @param uriArchives the uri array of uri archives
  763.    */
  764.   public static boolean checkURIs(URI[]  uriFiles, URI[] uriArchives){
  765.     if ((uriFiles == null) && (uriArchives == null)){
  766.       return true;
  767.     }
  768.     if (uriFiles != null){
  769.       for (int i = 0; i < uriFiles.length; i++){
  770.         String frag1 = uriFiles[i].getFragment();
  771.         if (frag1 == null)
  772.           return false;
  773.         for (int j=i+1; j < uriFiles.length; j++){
  774.           String frag2 = uriFiles[j].getFragment();
  775.           if (frag2 == null)
  776.             return false;
  777.           if (frag1.equalsIgnoreCase(frag2))
  778.             return false;
  779.         }
  780.         if (uriArchives != null){
  781.           for (int j = 0; j < uriArchives.length; j++){
  782.             String frag2 = uriArchives[j].getFragment();
  783.             if (frag2 == null){
  784.               return false;
  785.             }
  786.             if (frag1.equalsIgnoreCase(frag2))
  787.               return false;
  788.             for (int k=j+1; k < uriArchives.length; k++){
  789.               String frag3 = uriArchives[k].getFragment();
  790.               if (frag3 == null)
  791.                 return false;
  792.               if (frag2.equalsIgnoreCase(frag3))
  793.                 return false;
  794.             }
  795.           }
  796.         }
  797.       }
  798.     }
  799.     return true;
  800.   }
  801.   private static class CacheStatus {
  802.     // false, not loaded yet, true is loaded
  803.     boolean currentStatus;
  804.     // the local load path of this cache
  805.     Path localLoadPath;
  806.     
  807.     //the base dir where the cache lies
  808.     Path baseDir;
  809.     
  810.     //the size of this cache
  811.     long size;
  812.     // number of instances using this cache
  813.     int refcount;
  814.     // the cache-file modification time
  815.     long mtime;
  816.     public CacheStatus(Path baseDir, Path localLoadPath) {
  817.       super();
  818.       this.currentStatus = false;
  819.       this.localLoadPath = localLoadPath;
  820.       this.refcount = 0;
  821.       this.mtime = -1;
  822.       this.baseDir = baseDir;
  823.       this.size = 0;
  824.     }
  825.   }
  826.   /**
  827.    * Clear the entire contents of the cache and delete the backing files. This
  828.    * should only be used when the server is reinitializing, because the users
  829.    * are going to lose their files.
  830.    */
  831.   public static void purgeCache(Configuration conf) throws IOException {
  832.     synchronized (cachedArchives) {
  833.       FileSystem localFs = FileSystem.getLocal(conf);
  834.       for (Map.Entry<String,CacheStatus> f: cachedArchives.entrySet()) {
  835.         try {
  836.           localFs.delete(f.getValue().localLoadPath, true);
  837.         } catch (IOException ie) {
  838.           LOG.debug("Error cleaning up cache", ie);
  839.         }
  840.       }
  841.       cachedArchives.clear();
  842.     }
  843.   }
  844. }