IndexCache.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:5k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.IOException;
  20. import java.util.concurrent.ConcurrentHashMap;
  21. import java.util.concurrent.LinkedBlockingQueue;
  22. import java.util.concurrent.atomic.AtomicInteger;
  23. import org.apache.commons.logging.Log;
  24. import org.apache.commons.logging.LogFactory;
  25. import org.apache.hadoop.fs.Path;
  26. class IndexCache {
  27.   private final JobConf conf;
  28.   private final int totalMemoryAllowed;
  29.   private AtomicInteger totalMemoryUsed = new AtomicInteger();
  30.   private static final Log LOG = LogFactory.getLog(IndexCache.class);
  31.   private final ConcurrentHashMap<String,IndexInformation> cache =
  32.     new ConcurrentHashMap<String,IndexInformation>();
  33.   
  34.   private final LinkedBlockingQueue<String> queue = 
  35.     new LinkedBlockingQueue<String>();
  36.   public IndexCache(JobConf conf) {
  37.     this.conf = conf;
  38.     totalMemoryAllowed =
  39.       conf.getInt("mapred.tasktracker.indexcache.mb", 10) * 1024 * 1024;
  40.     LOG.info("IndexCache created with max memory = " + totalMemoryAllowed);
  41.   }
  42.   /**
  43.    * This method gets the index information for the given mapId and reduce.
  44.    * It reads the index file into cache if it is not already present.
  45.    * @param mapId
  46.    * @param reduce
  47.    * @param fileName The file to read the index information from if it is not
  48.    *                 already present in the cache
  49.    * @return The Index Information
  50.    * @throws IOException
  51.    */
  52.   public IndexRecord getIndexInformation(String mapId, int reduce,
  53.       Path fileName) throws IOException {
  54.     IndexInformation info = cache.get(mapId);
  55.     if (info == null) {
  56.       info = readIndexFileToCache(fileName, mapId);
  57.     } else {
  58.       synchronized (info) {
  59.         while (null == info.mapSpillRecord) {
  60.           try {
  61.             info.wait();
  62.           } catch (InterruptedException e) {
  63.             throw new IOException("Interrupted waiting for construction", e);
  64.           }
  65.         }
  66.       }
  67.       LOG.debug("IndexCache HIT: MapId " + mapId + " found");
  68.     }
  69.     if (info.mapSpillRecord.size() == 0 ||
  70.         info.mapSpillRecord.size() < reduce) {
  71.       throw new IOException("Invalid request " +
  72.         " Map Id = " + mapId + " Reducer = " + reduce +
  73.         " Index Info Length = " + info.mapSpillRecord.size());
  74.     }
  75.     return info.mapSpillRecord.getIndex(reduce);
  76.   }
  77.   private IndexInformation readIndexFileToCache(Path indexFileName,
  78.       String mapId) throws IOException {
  79.     IndexInformation info;
  80.     IndexInformation newInd = new IndexInformation();
  81.     if ((info = cache.putIfAbsent(mapId, newInd)) != null) {
  82.       synchronized (info) {
  83.         while (null == info.mapSpillRecord) {
  84.           try {
  85.             info.wait();
  86.           } catch (InterruptedException e) {
  87.             throw new IOException("Interrupted waiting for construction", e);
  88.           }
  89.         }
  90.       }
  91.       LOG.debug("IndexCache HIT: MapId " + mapId + " found");
  92.       return info;
  93.     }
  94.     LOG.debug("IndexCache MISS: MapId " + mapId + " not found") ;
  95.     SpillRecord tmp = null;
  96.     try { 
  97.       tmp = new SpillRecord(indexFileName, conf);
  98.     } catch (Throwable e) { 
  99.       tmp = new SpillRecord(0);
  100.       cache.remove(mapId);
  101.       throw new IOException("Error Reading IndexFile", e);
  102.     } finally { 
  103.       synchronized (newInd) { 
  104.         newInd.mapSpillRecord = tmp;
  105.         newInd.notifyAll();
  106.       } 
  107.     } 
  108.     queue.add(mapId);
  109.     
  110.     if (totalMemoryUsed.addAndGet(newInd.getSize()) > totalMemoryAllowed) {
  111.       freeIndexInformation();
  112.     }
  113.     return newInd;
  114.   }
  115.   /**
  116.    * This method removes the map from the cache. It should be called when
  117.    * a map output on this tracker is discarded.
  118.    * @param mapId The taskID of this map.
  119.    */
  120.   public void removeMap(String mapId) {
  121.     IndexInformation info = cache.remove(mapId);
  122.     if (info != null) {
  123.       totalMemoryUsed.addAndGet(-info.getSize());
  124.       if (!queue.remove(mapId)) {
  125.         LOG.warn("Map ID" + mapId + " not found in queue!!");
  126.       }
  127.     } else {
  128.       LOG.info("Map ID " + mapId + " not found in cache");
  129.     }
  130.   }
  131.   /**
  132.    * Bring memory usage below totalMemoryAllowed.
  133.    */
  134.   private synchronized void freeIndexInformation() {
  135.     while (totalMemoryUsed.get() > totalMemoryAllowed) {
  136.       String s = queue.remove();
  137.       IndexInformation info = cache.remove(s);
  138.       if (info != null) {
  139.         totalMemoryUsed.addAndGet(-info.getSize());
  140.       }
  141.     }
  142.   }
  143.   private static class IndexInformation {
  144.     SpillRecord mapSpillRecord;
  145.     int getSize() {
  146.       return mapSpillRecord == null
  147.         ? 0
  148.         : mapSpillRecord.size() * MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
  149.     }
  150.   }
  151. }