SortedRanges.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:11k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred;
  19. import java.io.DataInput;
  20. import java.io.DataOutput;
  21. import java.io.IOException;
  22. import java.util.Iterator;
  23. import java.util.SortedSet;
  24. import java.util.TreeSet;
  25. import org.apache.commons.logging.Log;
  26. import org.apache.commons.logging.LogFactory;
  27. import org.apache.hadoop.io.Writable;
  28. /**
  29.  * Keeps the Ranges sorted by startIndex.
  30.  * The added ranges are always ensured to be non-overlapping.
  31.  * Provides the SkipRangeIterator, which skips the Ranges 
  32.  * stored in this object.
  33.  */
  34. class SortedRanges implements Writable{
  35.   
  36.   private static final Log LOG = 
  37.     LogFactory.getLog(SortedRanges.class);
  38.   
  39.   private TreeSet<Range> ranges = new TreeSet<Range>();
  40.   private long indicesCount;
  41.   
  42.   /**
  43.    * Get Iterator which skips the stored ranges.
  44.    * The Iterator.next() call return the index starting from 0.
  45.    * @return SkipRangeIterator
  46.    */
  47.   synchronized SkipRangeIterator skipRangeIterator(){
  48.     return new SkipRangeIterator(ranges.iterator());
  49.   }
  50.   
  51.   /**
  52.    * Get the no of indices stored in the ranges.
  53.    * @return indices count
  54.    */
  55.   synchronized long getIndicesCount() {
  56.     return indicesCount;
  57.   }
  58.   
  59.   /**
  60.    * Get the sorted set of ranges.
  61.    * @return ranges
  62.    */
  63.   synchronized SortedSet<Range> getRanges() {
  64.    return ranges;
  65.   }
  66.   
  67.   /**
  68.    * Add the range indices. It is ensured that the added range 
  69.    * doesn't overlap the existing ranges. If it overlaps, the 
  70.    * existing overlapping ranges are removed and a single range 
  71.    * having the superset of all the removed ranges and this range 
  72.    * is added. 
  73.    * If the range is of 0 length, doesn't do anything.
  74.    * @param range Range to be added.
  75.    */
  76.   synchronized void add(Range range){
  77.     if(range.isEmpty()) {
  78.       return;
  79.     }
  80.     
  81.     long startIndex = range.getStartIndex();
  82.     long endIndex = range.getEndIndex();
  83.     //make sure that there are no overlapping ranges
  84.     SortedSet<Range> headSet = ranges.headSet(range);
  85.     if(headSet.size()>0) {
  86.       Range previousRange = headSet.last();
  87.       LOG.debug("previousRange "+previousRange);
  88.       if(startIndex<previousRange.getEndIndex()) {
  89.         //previousRange overlaps this range
  90.         //remove the previousRange
  91.         if(ranges.remove(previousRange)) {
  92.           indicesCount-=previousRange.getLength();
  93.         }
  94.         //expand this range
  95.         startIndex = previousRange.getStartIndex();
  96.         endIndex = endIndex>=previousRange.getEndIndex() ?
  97.                           endIndex : previousRange.getEndIndex();
  98.       }
  99.     }
  100.     
  101.     Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
  102.     while(tailSetIt.hasNext()) {
  103.       Range nextRange = tailSetIt.next();
  104.       LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
  105.           "  endIndex:"+endIndex);
  106.       if(endIndex>=nextRange.getStartIndex()) {
  107.         //nextRange overlaps this range
  108.         //remove the nextRange
  109.         tailSetIt.remove();
  110.         indicesCount-=nextRange.getLength();
  111.         if(endIndex<nextRange.getEndIndex()) {
  112.           //expand this range
  113.           endIndex = nextRange.getEndIndex();
  114.           break;
  115.         }
  116.       } else {
  117.         break;
  118.       }
  119.     }
  120.     add(startIndex,endIndex);
  121.   }
  122.   
  123.   /**
  124.    * Remove the range indices. If this range is  
  125.    * found in existing ranges, the existing ranges 
  126.    * are shrunk.
  127.    * If range is of 0 length, doesn't do anything.
  128.    * @param range Range to be removed.
  129.    */
  130.   synchronized void remove(Range range) {
  131.     if(range.isEmpty()) {
  132.       return;
  133.     }
  134.     long startIndex = range.getStartIndex();
  135.     long endIndex = range.getEndIndex();
  136.     //make sure that there are no overlapping ranges
  137.     SortedSet<Range> headSet = ranges.headSet(range);
  138.     if(headSet.size()>0) {
  139.       Range previousRange = headSet.last();
  140.       LOG.debug("previousRange "+previousRange);
  141.       if(startIndex<previousRange.getEndIndex()) {
  142.         //previousRange overlaps this range
  143.         //narrow down the previousRange
  144.         if(ranges.remove(previousRange)) {
  145.           indicesCount-=previousRange.getLength();
  146.           LOG.debug("removed previousRange "+previousRange);
  147.         }
  148.         add(previousRange.getStartIndex(), startIndex);
  149.         if(endIndex<=previousRange.getEndIndex()) {
  150.           add(endIndex, previousRange.getEndIndex());
  151.         }
  152.       }
  153.     }
  154.     
  155.     Iterator<Range> tailSetIt = ranges.tailSet(range).iterator();
  156.     while(tailSetIt.hasNext()) {
  157.       Range nextRange = tailSetIt.next();
  158.       LOG.debug("nextRange "+nextRange +"   startIndex:"+startIndex+
  159.           "  endIndex:"+endIndex);
  160.       if(endIndex>nextRange.getStartIndex()) {
  161.         //nextRange overlaps this range
  162.         //narrow down the nextRange
  163.         tailSetIt.remove();
  164.         indicesCount-=nextRange.getLength();
  165.         if(endIndex<nextRange.getEndIndex()) {
  166.           add(endIndex, nextRange.getEndIndex());
  167.           break;
  168.         }
  169.       } else {
  170.         break;
  171.       }
  172.     }
  173.   }
  174.   
  175.   private void add(long start, long end) {
  176.     if(end>start) {
  177.       Range recRange = new Range(start, end-start);
  178.       ranges.add(recRange);
  179.       indicesCount+=recRange.getLength();
  180.       LOG.debug("added "+recRange);
  181.     }
  182.   }
  183.   
  184.   public synchronized void readFields(DataInput in) throws IOException {
  185.     indicesCount = in.readLong();
  186.     ranges = new TreeSet<Range>();
  187.     int size = in.readInt();
  188.     for(int i=0;i<size;i++) {
  189.       Range range = new Range();
  190.       range.readFields(in);
  191.       ranges.add(range);
  192.     }
  193.   }
  194.   public synchronized void write(DataOutput out) throws IOException {
  195.     out.writeLong(indicesCount);
  196.     out.writeInt(ranges.size());
  197.     Iterator<Range> it = ranges.iterator();
  198.     while(it.hasNext()) {
  199.       Range range = it.next();
  200.       range.write(out);
  201.     }
  202.   }
  203.   
  204.   public String toString() {
  205.     StringBuffer sb = new StringBuffer();
  206.     Iterator<Range> it = ranges.iterator();
  207.     while(it.hasNext()) {
  208.       Range range = it.next();
  209.       sb.append(range.toString()+"n");
  210.     }
  211.     return sb.toString();
  212.   }
  213.   
  214.   /**
  215.    * Index Range. Comprises of start index and length.
  216.    * A Range can be of 0 length also. The Range stores indices 
  217.    * of type long.
  218.    */
  219.   static class Range implements Comparable<Range>, Writable{
  220.     private long startIndex;
  221.     private long length;
  222.         
  223.     Range(long startIndex, long length) {
  224.       if(length<0) {
  225.         throw new RuntimeException("length can't be negative");
  226.       }
  227.       this.startIndex = startIndex;
  228.       this.length = length;
  229.     }
  230.     
  231.     Range() {
  232.       this(0,0);
  233.     }
  234.     
  235.     /**
  236.      * Get the start index. Start index in inclusive.
  237.      * @return startIndex. 
  238.      */
  239.     long getStartIndex() {
  240.       return startIndex;
  241.     }
  242.     
  243.     /**
  244.      * Get the end index. End index is exclusive.
  245.      * @return endIndex.
  246.      */
  247.     long getEndIndex() {
  248.       return startIndex + length;
  249.     }
  250.     
  251.    /**
  252.     * Get Length.
  253.     * @return length
  254.     */
  255.     long getLength() {
  256.       return length;
  257.     }
  258.     
  259.     /**
  260.      * Range is empty if its length is zero.
  261.      * @return <code>true</code> if empty
  262.      *         <code>false</code> otherwise.
  263.      */
  264.     boolean isEmpty() {
  265.       return length==0;
  266.     }
  267.     
  268.     public boolean equals(Object o) {
  269.       if(o!=null && o instanceof Range) {
  270.         Range range = (Range)o;
  271.         return startIndex==range.startIndex &&
  272.         length==range.length;
  273.       }
  274.       return false;
  275.     }
  276.     
  277.     public int hashCode() {
  278.       return Long.valueOf(startIndex).hashCode() +
  279.           Long.valueOf(length).hashCode();
  280.     }
  281.     
  282.     public int compareTo(Range o) {
  283.       if(this.equals(o)) {
  284.         return 0;
  285.       }
  286.       return (this.startIndex > o.startIndex) ? 1:-1;
  287.     }
  288.     public void readFields(DataInput in) throws IOException {
  289.       startIndex = in.readLong();
  290.       length = in.readLong();
  291.     }
  292.     public void write(DataOutput out) throws IOException {
  293.       out.writeLong(startIndex);
  294.       out.writeLong(length);
  295.     }
  296.     
  297.     public String toString() {
  298.       return startIndex +":" + length;
  299.     }    
  300.   }
  301.   
  302.   /**
  303.    * Index Iterator which skips the stored ranges.
  304.    */
  305.   static class SkipRangeIterator implements Iterator<Long> {
  306.     Iterator<Range> rangeIterator;
  307.     Range range = new Range();
  308.     long next = -1;
  309.     
  310.     /**
  311.      * Constructor
  312.      * @param rangeIterator the iterator which gives the ranges.
  313.      */
  314.     SkipRangeIterator(Iterator<Range> rangeIterator) {
  315.       this.rangeIterator = rangeIterator;
  316.       doNext();
  317.     }
  318.     
  319.     /**
  320.      * Returns true till the index reaches Long.MAX_VALUE.
  321.      * @return <code>true</code> next index exists.
  322.      *         <code>false</code> otherwise.
  323.      */
  324.     public synchronized boolean hasNext() {
  325.       return next<Long.MAX_VALUE;
  326.     }
  327.     
  328.     /**
  329.      * Get the next available index. The index starts from 0.
  330.      * @return next index
  331.      */
  332.     public synchronized Long next() {
  333.       long ci = next;
  334.       doNext();
  335.       return ci;
  336.     }
  337.     
  338.     private void doNext() {
  339.       next++;
  340.       LOG.debug("currentIndex "+next +"   "+range);
  341.       skipIfInRange();
  342.       while(next>=range.getEndIndex() && rangeIterator.hasNext()) {
  343.         range = rangeIterator.next();
  344.         skipIfInRange();
  345.       }
  346.     }
  347.     
  348.     private void skipIfInRange() {
  349.       if(next>=range.getStartIndex() && 
  350.           next<range.getEndIndex()) {
  351.         //need to skip the range
  352.         LOG.warn("Skipping index " + next +"-" + range.getEndIndex());
  353.         next = range.getEndIndex();
  354.         
  355.       }
  356.     }
  357.     
  358.     /**
  359.      * Get whether all the ranges have been skipped.
  360.      * @return <code>true</code> if all ranges have been skipped.
  361.      *         <code>false</code> otherwise.
  362.      */
  363.     synchronized boolean skippedAllRanges() {
  364.       return !rangeIterator.hasNext() && next>range.getEndIndex();
  365.     }
  366.     
  367.     /**
  368.      * Remove is not supported. Doesn't apply.
  369.      */
  370.     public void remove() {
  371.       throw new UnsupportedOperationException("remove not supported.");
  372.     }
  373.     
  374.   }
  375. }