RespInputStream.java
上传用户:demmber
上传日期:2007-12-22
资源大小:717k
文件大小:8k
源码类别:

Java编程

开发平台:

Java

  1. /*
  2.  * @(#)RespInputStream.java 0.3-3 06/05/2001
  3.  *
  4.  *  This file is part of the HTTPClient package
  5.  *  Copyright (C) 1996-2001 Ronald Tschal鋜
  6.  *
  7.  *  This library is free software; you can redistribute it and/or
  8.  *  modify it under the terms of the GNU Lesser General Public
  9.  *  License as published by the Free Software Foundation; either
  10.  *  version 2 of the License, or (at your option) any later version.
  11.  *
  12.  *  This library is distributed in the hope that it will be useful,
  13.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  14.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15.  *  Lesser General Public License for more details.
  16.  *
  17.  *  You should have received a copy of the GNU Lesser General Public
  18.  *  License along with this library; if not, write to the Free
  19.  *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
  20.  *  MA 02111-1307, USA
  21.  *
  22.  *  For questions, suggestions, bug-reports, enhancement-requests etc.
  23.  *  I may be contacted at:
  24.  *
  25.  *  ronald@innovation.ch
  26.  *
  27.  *  The HTTPClient's home page is located at:
  28.  *
  29.  *  http://www.innovation.ch/java/HTTPClient/ 
  30.  *
  31.  */
  32. package HTTPClient;
  33. import java.io.InputStream;
  34. import java.io.IOException;
  35. import java.io.InterruptedIOException;
  36. /**
  37.  * This is the InputStream that gets returned to the user. The extensions
  38.  * consist of the capability to have the data pushed into a buffer if the
  39.  * stream demux needs to.
  40.  *
  41.  * @version 0.3-3  06/05/2001
  42.  * @author Ronald Tschal鋜
  43.  * @since V0.2
  44.  */
  45. final class RespInputStream extends InputStream implements GlobalConstants
  46. {
  47.     /** Use old behaviour: don't set a timeout when reading the response body */
  48.     private static boolean dontTimeoutBody = false;
  49.     /** the stream demultiplexor */
  50.     private StreamDemultiplexor demux = null;
  51.     /** our response handler */
  52.     private ResponseHandler     resph;
  53.     /** signals that the user has closed the stream and will therefore
  54. not read any further data */
  55.     boolean             closed = false;
  56.     /** signals that the connection may not be closed prematurely */
  57.     private boolean             dont_truncate = false;
  58.     /** this buffer is used to buffer data that the demux has to get rid of */
  59.     private byte[]              buffer = null;
  60.     /** signals that we were interrupted and that the buffer is not complete */
  61.     private boolean             interrupted = false;
  62.     /** the offset at which the unread data starts in the buffer */
  63.     private int                 offset = 0;
  64.     /** the end of the data in the buffer */
  65.     private int                 end = 0;
  66.     /** the total number of bytes of entity data read from the demux so far */
  67.             int                 count = 0;
  68.     static
  69.     {
  70. try
  71. {
  72.     dontTimeoutBody = Boolean.getBoolean("HTTPClient.dontTimeoutRespBody");
  73.     if (dontTimeoutBody)
  74. Log.write(Log.DEMUX, "RspIS: disabling timeouts when " +
  75.      "reading response body");
  76. }
  77. catch (Exception e)
  78.     { }
  79.     }
  80.     // Constructors
  81.     RespInputStream(StreamDemultiplexor demux, ResponseHandler resph)
  82.     {
  83. this.demux = demux;
  84. this.resph = resph;
  85.     }
  86.     // public Methods
  87.     private byte[] ch = new byte[1];
  88.     /**
  89.      * Reads a single byte.
  90.      *
  91.      * @return the byte read, or -1 if EOF.
  92.      * @exception IOException if any exception occured on the connection.
  93.      */
  94.     public synchronized int read() throws IOException
  95.     {
  96. int rcvd = read(ch, 0, 1);
  97. if (rcvd == 1)
  98.     return ch[0] & 0xff;
  99. else
  100.     return -1;
  101.     }
  102.     /**
  103.      * Reads <var>len</var> bytes into <var>b</var>, starting at offset
  104.      * <var>off</var>.
  105.      *
  106.      * @return the number of bytes actually read, or -1 if EOF.
  107.      * @exception IOException if any exception occured on the connection.
  108.      */
  109.     public synchronized int read(byte[] b, int off, int len) throws IOException
  110.     {
  111. if (closed)
  112.     return -1;
  113. int left = end - offset;
  114. if (buffer != null  &&  !(left == 0  &&  interrupted))
  115. {
  116.     if (left == 0)  return -1;
  117.     len = (len > left ? left : len);
  118.     System.arraycopy(buffer, offset, b, off, len);
  119.     offset += len;
  120.     return len;
  121. }
  122. else
  123. {
  124.     if (resph.resp.cd_type != CD_HDRS)
  125. Log.write(Log.DEMUX, "RspIS: Reading stream " + this.hashCode());
  126.     int rcvd;
  127.     if (dontTimeoutBody  &&  resph.resp.cd_type != CD_HDRS)
  128. rcvd = demux.read(b, off, len, resph, 0);
  129.     else
  130. rcvd = demux.read(b, off, len, resph, resph.resp.timeout);
  131.     if (rcvd != -1  &&  resph.resp.got_headers)
  132. count += rcvd;
  133.     return rcvd;
  134. }
  135.     }
  136.     /**
  137.      * skips <var>num</var> bytes.
  138.      *
  139.      * @return the number of bytes actually skipped.
  140.      * @exception IOException if any exception occured on the connection.
  141.      */
  142.     public synchronized long skip(long num) throws IOException
  143.     {
  144. if (closed)
  145.     return 0;
  146. int left = end - offset;
  147. if (buffer != null  &&  !(left == 0  &&  interrupted))
  148. {
  149.     num = (num > left ? left : num);
  150.     offset  += num;
  151.     return num;
  152. }
  153. else
  154. {
  155.     long skpd = demux.skip(num, resph);
  156.     if (resph.resp.got_headers)
  157. count += skpd;
  158.     return skpd;
  159. }
  160.     }
  161.     /**
  162.      * gets the number of bytes available for reading without blocking.
  163.      *
  164.      * @return the number of bytes available.
  165.      * @exception IOException if any exception occured on the connection.
  166.      */
  167.     public synchronized int available() throws IOException
  168.     {
  169. if (closed)
  170.     return 0;
  171. if (buffer != null  &&  !(end-offset == 0  &&  interrupted))
  172.     return end-offset;
  173. else
  174.     return demux.available(resph);
  175.     }
  176.     /**
  177.      * closes the stream.
  178.      *
  179.      * @exception if any exception occured on the connection before or
  180.      *            during close.
  181.      */
  182.     public synchronized void close()  throws IOException
  183.     {
  184. if (!closed)
  185. {
  186.     closed = true;
  187.     if (dont_truncate  &&  (buffer == null  ||  interrupted))
  188. readAll(resph.resp.timeout);
  189.     Log.write(Log.DEMUX, "RspIS: User closed stream " + hashCode());
  190.     demux.closeSocketIfAllStreamsClosed();
  191.     if (dont_truncate)
  192.     {
  193. try
  194.     { resph.resp.http_resp.invokeTrailerHandlers(false); }
  195. catch (ModuleException me)
  196.     { throw new IOException(me.toString()); }
  197.     }
  198. }
  199.     }
  200.     /**
  201.      * A safety net to clean up.
  202.      */
  203.     protected void finalize()  throws Throwable
  204.     {
  205. try
  206.     { close(); }
  207. finally
  208.     { super.finalize(); }
  209.     }
  210.     // local Methods
  211.     /**
  212.      * Reads all remainings data into buffer. This is used to force a read
  213.      * of upstream responses.
  214.      *
  215.      * <P>This is probably the most tricky and buggy method around. It's the
  216.      * only one that really violates the strict top-down method invocation
  217.      * from the Response through the ResponseStream to the StreamDemultiplexor.
  218.      * This means we need to be awfully careful about what is synchronized
  219.      * and what parameters are passed to whom.
  220.      *
  221.      * @param timeout the timeout to use for reading from the demux
  222.      * @exception IOException If any exception occurs while reading stream.
  223.      */
  224.     void readAll(int timeout)  throws IOException
  225.     {
  226. Log.write(Log.DEMUX, "RspIS: Read-all on stream " + this.hashCode());
  227. synchronized (resph.resp)
  228. {
  229.     if (!resph.resp.got_headers) // force headers to be read
  230.     {
  231. int sav_to = resph.resp.timeout;
  232. resph.resp.timeout = timeout;
  233. resph.resp.getStatusCode();
  234. resph.resp.timeout = sav_to;
  235.     }
  236. }
  237. synchronized (this)
  238. {
  239.     if (buffer != null  &&  !interrupted)  return;
  240.     int rcvd = 0;
  241.     try
  242.     {
  243. if (closed) // throw away
  244. {
  245.     buffer = new byte[10000];
  246.     do
  247.     {
  248. count += rcvd;
  249. rcvd   = demux.read(buffer, 0, buffer.length, resph,
  250.     timeout);
  251.     } while (rcvd != -1);
  252.     buffer = null;
  253. }
  254. else
  255. {
  256.     if (buffer == null)
  257.     {
  258. buffer = new byte[10000];
  259. offset = 0;
  260. end    = 0;
  261.     }
  262.     do
  263.     {
  264. rcvd = demux.read(buffer, end, buffer.length-end, resph,
  265.   timeout);
  266. if (rcvd < 0)  break;
  267. count  += rcvd;
  268. end    += rcvd;
  269. buffer  = Util.resizeArray(buffer, end+10000);
  270.     } while (true);
  271. }
  272.     }
  273.     catch (InterruptedIOException iioe)
  274.     {
  275. interrupted = true;
  276. throw iioe;
  277.     }
  278.     catch (IOException ioe)
  279.     {
  280. buffer = null; // force a read on demux for exception
  281.     }
  282.     interrupted = false;
  283. }
  284.     }
  285.     /**
  286.      * Sometime the full response body must be read, i.e. the connection may
  287.      * not be closed prematurely (by us). Currently this is needed when the
  288.      * chunked encoding with trailers is used in a response.
  289.      */
  290.     synchronized void dontTruncate()
  291.     {
  292. dont_truncate = true;
  293.     }
  294. }