CleanupQueue.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.concurrent.LinkedBlockingQueue;
  21. import org.apache.commons.logging.Log;
  22. import org.apache.commons.logging.LogFactory;
  23. import org.apache.hadoop.fs.FileSystem;
  24. import org.apache.hadoop.fs.Path;
  25. class CleanupQueue {
  26.   public static final Log LOG =
  27.     LogFactory.getLog(CleanupQueue.class);
  28.   private static PathCleanupThread cleanupThread;
  29.   /**
  30.    * Create a singleton path-clean-up queue. It can be used to delete
  31.    * paths(directories/files) in a separate thread. This constructor creates a
  32.    * clean-up thread and also starts it as a daemon. Callers can instantiate one
  33.    * CleanupQueue per JVM and can use it for deleting paths. Use
  34.    * {@link CleanupQueue#addToQueue(JobConf, Path...)} to add paths for
  35.    * deletion.
  36.    */
  37.   public CleanupQueue() {
  38.     synchronized (PathCleanupThread.class) {
  39.       if (cleanupThread == null) {
  40.         cleanupThread = new PathCleanupThread();
  41.       }
  42.     }
  43.   }
  44.   
  45.   public void addToQueue(JobConf conf, Path...paths) {
  46.     cleanupThread.addToQueue(conf,paths);
  47.   }
  48.   private static class PathCleanupThread extends Thread {
  49.     static class PathAndConf {
  50.       JobConf conf;
  51.       Path path;
  52.       PathAndConf(JobConf conf, Path path) {
  53.         this.conf = conf;
  54.         this.path = path;
  55.       }
  56.     }
  57.     // cleanup queue which deletes files/directories of the paths queued up.
  58.     private LinkedBlockingQueue<PathAndConf> queue = new LinkedBlockingQueue<PathAndConf>();
  59.     public PathCleanupThread() {
  60.       setName("Directory/File cleanup thread");
  61.       setDaemon(true);
  62.       start();
  63.     }
  64.     public void addToQueue(JobConf conf,Path... paths) {
  65.       for (Path p : paths) {
  66.         try {
  67.           queue.put(new PathAndConf(conf,p));
  68.         } catch (InterruptedException ie) {}
  69.       }
  70.     }
  71.     public void run() {
  72.       LOG.debug(getName() + " started.");
  73.       PathAndConf pathAndConf = null;
  74.       while (true) {
  75.         try {
  76.           pathAndConf = queue.take();
  77.           // delete the path.
  78.           FileSystem fs = pathAndConf.path.getFileSystem(pathAndConf.conf);
  79.           fs.delete(pathAndConf.path, true);
  80.           LOG.debug("DELETED " + pathAndConf.path);
  81.         } catch (InterruptedException t) {
  82.           return;
  83.         } catch (Exception e) {
  84.           LOG.warn("Error deleting path" + pathAndConf.path);
  85.         } 
  86.       }
  87.     }
  88.   }
  89. }