StreamBackedIterator.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:3k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.mapred.join;
  19. import java.io.ByteArrayInputStream;
  20. import java.io.ByteArrayOutputStream;
  21. import java.io.DataInputStream;
  22. import java.io.DataOutputStream;
  23. import java.io.IOException;
  24. import org.apache.hadoop.io.Writable;
  25. /**
  26.  * This class provides an implementation of ResetableIterator. This
  27.  * implementation uses a byte array to store elements added to it.
  28.  */
  29. public class StreamBackedIterator<X extends Writable>
  30.     implements ResetableIterator<X> {
  31.   private static class ReplayableByteInputStream extends ByteArrayInputStream {
  32.     public ReplayableByteInputStream(byte[] arr) {
  33.       super(arr);
  34.     }
  35.     public void resetStream() {
  36.       mark = 0;
  37.       reset();
  38.     }
  39.   }
  40.   private ByteArrayOutputStream outbuf = new ByteArrayOutputStream();
  41.   private DataOutputStream outfbuf = new DataOutputStream(outbuf);
  42.   private ReplayableByteInputStream inbuf;
  43.   private DataInputStream infbuf;
  44.   public StreamBackedIterator() { }
  45.   public boolean hasNext() {
  46.     return infbuf != null && inbuf.available() > 0;
  47.   }
  48.   public boolean next(X val) throws IOException {
  49.     if (hasNext()) {
  50.       inbuf.mark(0);
  51.       val.readFields(infbuf);
  52.       return true;
  53.     }
  54.     return false;
  55.   }
  56.   public boolean replay(X val) throws IOException {
  57.     inbuf.reset();
  58.     if (0 == inbuf.available())
  59.       return false;
  60.     val.readFields(infbuf);
  61.     return true;
  62.   }
  63.   public void reset() {
  64.     if (null != outfbuf) {
  65.       inbuf = new ReplayableByteInputStream(outbuf.toByteArray());
  66.       infbuf =  new DataInputStream(inbuf);
  67.       outfbuf = null;
  68.     }
  69.     inbuf.resetStream();
  70.   }
  71.   public void add(X item) throws IOException {
  72.     item.write(outfbuf);
  73.   }
  74.   public void close() throws IOException {
  75.     if (null != infbuf)
  76.       infbuf.close();
  77.     if (null != outfbuf)
  78.       outfbuf.close();
  79.   }
  80.   public void clear() {
  81.     if (null != inbuf)
  82.       inbuf.resetStream();
  83.     outbuf.reset();
  84.     outfbuf = new DataOutputStream(outbuf);
  85.   }
  86. }