StreamDemultiplexor.java
上传用户:demmber
上传日期:2007-12-22
资源大小:717k
文件大小:23k
源码类别:

Java编程

开发平台:

Java

  1. /*
  2.  * @(#)StreamDemultiplexor.java 0.3-3 06/05/2001
  3.  *
  4.  *  This file is part of the HTTPClient package
  5.  *  Copyright (C) 1996-2001 Ronald Tschal鋜
  6.  *
  7.  *  This library is free software; you can redistribute it and/or
  8.  *  modify it under the terms of the GNU Lesser General Public
  9.  *  License as published by the Free Software Foundation; either
  10.  *  version 2 of the License, or (at your option) any later version.
  11.  *
  12.  *  This library is distributed in the hope that it will be useful,
  13.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  14.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  15.  *  Lesser General Public License for more details.
  16.  *
  17.  *  You should have received a copy of the GNU Lesser General Public
  18.  *  License along with this library; if not, write to the Free
  19.  *  Software Foundation, Inc., 59 Temple Place, Suite 330, Boston,
  20.  *  MA 02111-1307, USA
  21.  *
  22.  *  For questions, suggestions, bug-reports, enhancement-requests etc.
  23.  *  I may be contacted at:
  24.  *
  25.  *  ronald@innovation.ch
  26.  *
  27.  *  The HTTPClient's home page is located at:
  28.  *
  29.  *  http://www.innovation.ch/java/HTTPClient/ 
  30.  *
  31.  */
  32. package HTTPClient;
  33. import java.io.IOException;
  34. import java.io.EOFException;
  35. import java.io.InterruptedIOException;
  36. import java.net.Socket;
  37. import java.net.SocketException;
  38. /**
  39.  * This class handles the demultiplexing of input stream. This is needed
  40.  * for things like keep-alive in HTTP/1.0, persist in HTTP/1.1 and in HTTP-NG.
  41.  *
  42.  * @version 0.3-3  06/05/2001
  43.  * @author Ronald Tschal鋜
  44.  */
  45. class StreamDemultiplexor implements GlobalConstants
  46. {
  47.     /** the protocol were handling request for */
  48.     private int                    Protocol;
  49.     /** the connection we're working for */
  50.     private HTTPConnection         Connection;
  51.     /** the input stream to demultiplex */
  52.     private BufferedInputStream    Stream;
  53.     /** the socket this hangs off */
  54.     private Socket                 Sock = null;
  55.     /** signals after the closing of which stream to close the socket */
  56.     private ResponseHandler        MarkedForClose;
  57.     /** timer used to close the socket if unused for a given time */
  58.     private SocketTimeout.TimeoutEntry Timer = null;
  59.     /** timer thread which implements the timers */
  60.     private static SocketTimeout   TimerThread = null;
  61.     /** cleanup object to stop timer thread when we're gc'd */
  62.     private static Object          cleanup;
  63.     /** a Vector to hold the list of response handlers were serving */
  64.     private LinkedList             RespHandlerList;
  65.     /** number of unread bytes in current chunk (if transf-enc == chunked) */
  66.     private long                   chunk_len;
  67.     /** the currently set timeout for the socket */
  68.     private int                    cur_timeout = 0;
  69.     static
  70.     {
  71. TimerThread = new SocketTimeout(60);
  72. TimerThread.start();
  73. /* This is here to clean up the timer thread should the
  74.  * StreamDemultiplexor class be gc'd. This will not usually happen,
  75.  * unless the stuff is being run in an Applet or similar environment
  76.  * where multiple classloaders are used to load the same class
  77.  * multiple times. However, even in those environments it's not clear
  78.  * that this here will do us any good, because classes aren't usually
  79.  * gc'd unless their classloader is, but the timer thread keeps a
  80.  * reference to the classloader, and hence ought to prevent the
  81.  * classloader from being gc'd.
  82.  */
  83. cleanup = new Object() {
  84.     private final SocketTimeout timer = StreamDemultiplexor.TimerThread;
  85.     protected void finalize()
  86.     {
  87. timer.kill();
  88.     }
  89. };
  90.     }
  91.     // Constructors
  92.     /**
  93.      * a simple contructor.
  94.      *
  95.      * @param protocol   the protocol used on this stream.
  96.      * @param sock       the socket which we're to demux.
  97.      * @param connection the http-connection this socket belongs to.
  98.      */
  99.     StreamDemultiplexor(int protocol, Socket sock, HTTPConnection connection)
  100.     throws IOException
  101.     {
  102. this.Protocol   = protocol;
  103. this.Connection = connection;
  104. RespHandlerList = new LinkedList();
  105. init(sock);
  106.     }
  107.     /**
  108.      * Initializes the demultiplexor with a new socket.
  109.      *
  110.      * @param stream   the stream to demultiplex
  111.      */
  112.     private void init(Socket sock)  throws IOException
  113.     {
  114. Log.write(Log.DEMUX, "Demux: Initializing Stream Demultiplexor (" +
  115.      this.hashCode() + ")");
  116. this.Sock       = sock;
  117. this.Stream     = new BufferedInputStream(sock.getInputStream());
  118. MarkedForClose  = null;
  119. chunk_len       = -1;
  120. // create a timer to close the socket after 60 seconds, but don't
  121. // start it yet
  122. Timer = TimerThread.setTimeout(this);
  123. Timer.hyber();
  124.     }
  125.     // Methods
  126.     /**
  127.      * Each Response must register with us.
  128.      */
  129.     void register(Response resp_handler, Request req)  throws RetryException
  130.     {
  131. synchronized (RespHandlerList)
  132. {
  133.     if (Sock == null)
  134. throw new RetryException();
  135.     RespHandlerList.addToEnd(
  136. new ResponseHandler(resp_handler, req, this));
  137. }
  138.     }
  139.     /**
  140.      * creates an input stream for the response.
  141.      *
  142.      * @param resp the response structure requesting the stream
  143.      * @return an InputStream
  144.      */
  145.     RespInputStream getStream(Response resp)
  146.     {
  147. ResponseHandler resph;
  148. synchronized (RespHandlerList)
  149. {
  150.     for (resph = (ResponseHandler) RespHandlerList.enumerate();
  151.  resph != null;
  152.  resph = (ResponseHandler) RespHandlerList.next())
  153.     {
  154. if (resph.resp == resp)  break;
  155.     }
  156. }
  157. if (resph != null)
  158.     return resph.stream;
  159. else
  160.     return null;
  161.     }
  162.     /**
  163.      * Restarts the timer thread that will close an unused socket after
  164.      * 60 seconds.
  165.      */
  166.     void restartTimer()
  167.     {
  168. if (Timer != null)  Timer.reset();
  169.     }
  170.     /**
  171.      * reads an array of bytes from the master stream.
  172.      */
  173.     int read(byte[] b, int off, int len, ResponseHandler resph, int timeout)
  174.     throws IOException
  175.     {
  176. if (resph.exception != null)
  177. {
  178.     resph.exception.fillInStackTrace();
  179.     throw resph.exception;
  180. }
  181. if (resph.eof)
  182.     return -1;
  183. // read the headers and data for all responses preceding us.
  184. ResponseHandler head;
  185. while ((head = (ResponseHandler) RespHandlerList.getFirst()) != null  &&
  186. head != resph)
  187. {
  188.     try
  189. { head.stream.readAll(timeout); }
  190.     catch (IOException ioe)
  191.     {
  192. if (ioe instanceof InterruptedIOException)
  193.     throw ioe;
  194. else
  195. {
  196.     resph.exception.fillInStackTrace();
  197.     throw resph.exception;
  198. }
  199.     }
  200. }
  201. // Now we can read from the stream.
  202. synchronized (this)
  203. {
  204.     if (resph.exception != null)
  205.     {
  206. resph.exception.fillInStackTrace();
  207. throw resph.exception;
  208.     }
  209.     if (resph.resp.cd_type != CD_HDRS)
  210. Log.write(Log.DEMUX, "Demux: Reading for stream " +
  211.      resph.stream.hashCode());
  212.     if (Timer != null)  Timer.hyber();
  213.     try
  214.     {
  215. int rcvd = -1;
  216. if (timeout != cur_timeout)
  217. {
  218.     Log.write(Log.DEMUX, "Demux: Setting timeout to " +
  219.  timeout + " ms");
  220.     Sock.setSoTimeout(timeout);
  221.     cur_timeout = timeout;
  222. }
  223. switch (resph.resp.cd_type)
  224. {
  225.     case CD_HDRS:
  226. rcvd = Stream.read(b, off, len);
  227. if (rcvd == -1)
  228.     throw new EOFException("Premature EOF encountered");
  229. break;
  230.     case CD_0:
  231. rcvd = -1;
  232. close(resph);
  233. break;
  234.     case CD_CLOSE:
  235. rcvd = Stream.read(b, off, len);
  236. if (rcvd == -1)
  237.     close(resph);
  238. break;
  239.     case CD_CONTLEN:
  240. int cl = resph.resp.ContentLength;
  241. if (len > cl - resph.stream.count)
  242.     len = cl - resph.stream.count;
  243. rcvd = Stream.read(b, off, len);
  244. if (rcvd == -1)
  245.     throw new EOFException("Premature EOF encountered");
  246. if (resph.stream.count+rcvd == cl)
  247.     close(resph);
  248. break;
  249.     case CD_CHUNKED:
  250. if (chunk_len == -1) // it's a new chunk
  251.     chunk_len = Codecs.getChunkLength(Stream);
  252. if (chunk_len > 0) // it's data
  253. {
  254.     if (len > chunk_len)  len = (int) chunk_len;
  255.     rcvd = Stream.read(b, off, len);
  256.     if (rcvd == -1)
  257. throw new EOFException("Premature EOF encountered");
  258.     chunk_len -= rcvd;
  259.     if (chunk_len == 0) // got the whole chunk
  260.     {
  261. Stream.read(); // CR
  262. Stream.read(); // LF
  263. chunk_len = -1;
  264.     }
  265. }
  266. else // the footers (trailers)
  267. {
  268.     resph.resp.readTrailers(Stream);
  269.     rcvd = -1;
  270.     close(resph);
  271.     chunk_len = -1;
  272. }
  273. break;
  274.     case CD_MP_BR:
  275. byte[] endbndry = resph.getEndBoundary(Stream);
  276. int[]  end_cmp  = resph.getEndCompiled(Stream);
  277. rcvd = Stream.read(b, off, len);
  278. if (rcvd == -1)
  279.     throw new EOFException("Premature EOF encountered");
  280. int ovf = Stream.pastEnd(endbndry, end_cmp);
  281. if (ovf != -1)
  282. {
  283.     rcvd -= ovf;
  284.     close(resph);
  285. }
  286. break;
  287.     default:
  288. throw new Error("Internal Error in StreamDemultiplexor: " +
  289. "Invalid cd_type " + resph.resp.cd_type);
  290. }
  291. restartTimer();
  292. return rcvd;
  293.     }
  294.     catch (InterruptedIOException ie) // don't intercept this one
  295.     {
  296. restartTimer();
  297. throw ie;
  298.     }
  299.     catch (IOException ioe)
  300.     {
  301. Log.write(Log.DEMUX, "Demux: ", ioe);
  302. close(ioe, true);
  303. throw resph.exception; // set by retry_requests
  304.     }
  305.     catch (ParseException pe)
  306.     {
  307. Log.write(Log.DEMUX, "Demux: ", pe);
  308. close(new IOException(pe.toString()), true);
  309. throw resph.exception; // set by retry_requests
  310.     }
  311. }
  312.     }
  313.     /**
  314.      * skips a number of bytes in the master stream. This is done via a
  315.      * dummy read, as the socket input stream doesn't like skip()'s.
  316.      */
  317.     synchronized long skip(long num, ResponseHandler resph) throws IOException
  318.     {
  319. if (resph.exception != null)
  320. {
  321.     resph.exception.fillInStackTrace();
  322.     throw resph.exception;
  323. }
  324. if (resph.eof)
  325.     return 0;
  326. byte[] dummy = new byte[(int) num];
  327. int rcvd = read(dummy, 0, (int) num, resph, 0);
  328. if (rcvd == -1)
  329.     return 0;
  330. else
  331.     return rcvd;
  332.     }
  333.     /**
  334.      * Determines the number of available bytes. If <var>resph</var> is null, return
  335.      * available bytes on the socket stream itself (used by HTTPConnection).
  336.      */
  337.     synchronized int available(ResponseHandler resph) throws IOException
  338.     {
  339. if (resph != null  &&  resph.exception != null)
  340. {
  341.     resph.exception.fillInStackTrace();
  342.     throw resph.exception;
  343. }
  344. if (resph != null  &&  resph.eof)
  345.     return 0;
  346. int avail = Stream.available();
  347. if (resph == null)
  348.   return avail;
  349. switch (resph.resp.cd_type)
  350. {
  351.     case CD_0:
  352. return 0;
  353.     case CD_HDRS:
  354. // this is something of a hack; I could return 0, but then
  355. // if you were waiting for something on a response that
  356. // wasn't first in line (and you didn't try to read the
  357. // other response) you'd wait forever. On the other hand,
  358. // we might be making a false promise here...
  359. return (avail > 0 ? 1 : 0);
  360.     case CD_CLOSE:
  361. return avail;
  362.     case CD_CONTLEN:
  363. int cl = resph.resp.ContentLength;
  364. cl -= resph.stream.count;
  365. return (avail < cl ? avail : cl);
  366.     case CD_CHUNKED:
  367. return avail; // not perfect...
  368.     case CD_MP_BR:
  369. return avail; // not perfect...
  370.     default:
  371. throw new Error("Internal Error in StreamDemultiplexor: " +
  372. "Invalid cd_type " + resph.resp.cd_type);
  373. }
  374.     }
  375.     /**
  376.      * Closes the socket and all associated streams. If <var>exception</var>
  377.      * is not null then all active requests are retried.
  378.      *
  379.      * <P>There are five ways this method may be activated. 1) if an exception
  380.      * occurs during read or write. 2) if the stream is marked for close but
  381.      * no responses are outstanding (e.g. due to a timeout). 3) when the
  382.      * markedForClose response is closed. 4) if all response streams up until
  383.      * and including the markedForClose response have been closed. 5) if this
  384.      * demux is finalized.
  385.      *
  386.      * @param exception the IOException to be sent to the streams.
  387.      * @param was_reset if true then the exception is due to a connection
  388.      *                  reset; otherwise it means we generated the exception
  389.      *                  ourselves and this is a "normal" close.
  390.      */
  391.     synchronized void close(IOException exception, boolean was_reset)
  392.     {
  393. if (Sock == null) // already cleaned up
  394.     return;
  395. Log.write(Log.DEMUX, "Demux: Closing all streams and socket (" +
  396.      this.hashCode() + ")");
  397. try
  398.     { Stream.close(); }
  399. catch (IOException ioe) { }
  400. try
  401.     { Sock.close(); }
  402. catch (IOException ioe) { }
  403. Sock = null;
  404. if (Timer != null)
  405. {
  406.     Timer.kill();
  407.     Timer = null;
  408. }
  409. Connection.DemuxList.remove(this);
  410. // Here comes the tricky part: redo outstanding requests!
  411. if (exception != null)
  412.     synchronized (RespHandlerList)
  413. { retry_requests(exception, was_reset); }
  414.     }
  415.     /**
  416.      * Retries outstanding requests. Well, actually the RetryModule does
  417.      * that. Here we just throw a RetryException for each request so that
  418.      * the RetryModule can catch and handle them.
  419.      *
  420.      * @param exception the exception that led to this call.
  421.      * @param was_reset this flag is passed to the RetryException and is
  422.      *                  used by the RetryModule to distinguish abnormal closes
  423.      *                  from expected closes.
  424.      */
  425.     private void retry_requests(IOException exception, boolean was_reset)
  426.     {
  427. RetryException  first = null,
  428. prev  = null;
  429. ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
  430. while (resph != null)
  431. {
  432.     /* if the application is already reading the data then the
  433.      * response has already been handled. In this case we must
  434.      * throw the real exception.
  435.      */
  436.     if (resph.resp.got_headers)
  437.     {
  438. resph.exception = exception;
  439.     }
  440.     else
  441.     {
  442. RetryException tmp = new RetryException(exception.getMessage());
  443. if (first == null)  first = tmp;
  444. tmp.request    = resph.request;
  445. tmp.response   = resph.resp;
  446. tmp.exception  = exception;
  447. tmp.conn_reset = was_reset;
  448. tmp.first      = first;
  449. tmp.addToListAfter(prev);
  450. prev = tmp;
  451. resph.exception = tmp;
  452.     }
  453.     RespHandlerList.remove(resph);
  454.     resph = (ResponseHandler) RespHandlerList.next();
  455. }
  456.     }
  457.     /**
  458.      * Closes the associated stream. If this one has been markedForClose then
  459.      * the socket is closed; else closeSocketIfAllStreamsClosed is invoked.
  460.      */
  461.     private void close(ResponseHandler resph)
  462.     {
  463. synchronized (RespHandlerList)
  464. {
  465.     if (resph != (ResponseHandler) RespHandlerList.getFirst())
  466. return;
  467.     Log.write(Log.DEMUX, "Demux: Closing stream " +
  468.  resph.stream.hashCode());
  469.     resph.eof = true;
  470.     RespHandlerList.remove(resph);
  471. }
  472. if (resph == MarkedForClose)
  473.     close(new IOException("Premature end of Keep-Alive"), false);
  474. else
  475.     closeSocketIfAllStreamsClosed();
  476.     }
  477.     /**
  478.      * Close the socket if all the streams have been closed.
  479.      *
  480.      * <P>When a stream reaches eof it is removed from the response handler
  481.      * list, but when somebody close()'s the response stream it is just
  482.      * marked as such. This means that all responses in the list have either
  483.      * not been read at all or only partially read, but they might have been
  484.      * close()'d meaning that nobody is interested in the data. So If all the
  485.      * response streams up till and including the one markedForClose have
  486.      * been close()'d then we can remove them from our list and close the
  487.      * socket.
  488.      *
  489.      * <P>Note: if the response list is emtpy or if no response is
  490.      * markedForClose then this method does nothing. Specifically it does
  491.      * not close the socket. We only want to close the socket if we've been
  492.      * told to do so.
  493.      *
  494.      * <P>Also note that there might still be responses in the list after
  495.      * the markedForClose one. These are due to us having pipelined more
  496.      * requests to the server than it's willing to serve on a single
  497.      * connection. These requests will be retried if possible.
  498.      */
  499.     synchronized void closeSocketIfAllStreamsClosed()
  500.     {
  501. synchronized (RespHandlerList)
  502. {
  503.     ResponseHandler resph = (ResponseHandler) RespHandlerList.enumerate();
  504.     while (resph != null  &&  resph.stream.closed)
  505.     {
  506. if (resph == MarkedForClose)
  507. {
  508.     // remove all response handlers first
  509.     ResponseHandler tmp;
  510.     do
  511.     {
  512. tmp = (ResponseHandler) RespHandlerList.getFirst();
  513. RespHandlerList.remove(tmp);
  514.     }
  515.     while (tmp != resph);
  516.     // close the socket
  517.     close(new IOException("Premature end of Keep-Alive"), false);
  518.     return;
  519. }
  520. resph = (ResponseHandler) RespHandlerList.next();
  521.     }
  522. }
  523.     }
  524.     /**
  525.      * returns the socket associated with this demux
  526.      */
  527.     synchronized Socket getSocket()
  528.     {
  529. if (MarkedForClose != null)
  530.     return null;
  531. if (Timer != null)  Timer.hyber();
  532. return Sock;
  533.     }
  534.     /**
  535.      * Mark this demux to not accept any more request and to close the
  536.      * stream after this <var>resp</var>onse or all requests have been
  537.      * processed, or close immediately if no requests are registered.
  538.      *
  539.      * @param response the Response after which the connection should
  540.      *                 be closed.
  541.      */
  542.     synchronized void markForClose(Response resp)
  543.     {
  544. synchronized (RespHandlerList)
  545. {
  546.     if (RespHandlerList.getFirst() == null) // no active request,
  547.     { // so close the socket
  548. close(new IOException("Premature end of Keep-Alive"), false);
  549. return;
  550.     }
  551.     if (Timer != null)
  552.     {
  553. Timer.kill();
  554. Timer = null;
  555.     }
  556.     ResponseHandler resph, lasth = null;
  557.     for (resph = (ResponseHandler) RespHandlerList.enumerate();
  558.  resph != null;
  559.  resph = (ResponseHandler) RespHandlerList.next())
  560.     {
  561. if (resph.resp == resp) // new resp precedes any others
  562. {
  563.     MarkedForClose = resph;
  564.     Log.write(Log.DEMUX, "Demux: stream " +
  565.  resp.inp_stream.hashCode() +
  566.  " marked for close");
  567.     closeSocketIfAllStreamsClosed();
  568.     return;
  569. }
  570. if (MarkedForClose == resph)
  571.     return; // already marked for closing after an earlier resp
  572. lasth = resph;
  573.     }
  574.     if (lasth == null)
  575. return;
  576.     MarkedForClose = lasth; // resp == null, so use last resph
  577.     closeSocketIfAllStreamsClosed();
  578.     Log.write(Log.DEMUX, "Demux: stream " + lasth.stream.hashCode() +
  579.  " marked for close");
  580. }
  581.     }
  582.     /**
  583.      * Emergency stop. Closes the socket and notifies the responses that
  584.      * the requests are aborted.
  585.      *
  586.      * @since V0.3
  587.      */
  588.     void abort()
  589.     {
  590. Log.write(Log.DEMUX, "Demux: Aborting socket (" + this.hashCode() + ")");
  591. // notify all responses of abort
  592. synchronized (RespHandlerList)
  593. {
  594.     for (ResponseHandler resph =
  595. (ResponseHandler) RespHandlerList.enumerate();
  596.  resph != null;
  597.  resph = (ResponseHandler) RespHandlerList.next())
  598.     {
  599. if (resph.resp.http_resp != null)
  600.     resph.resp.http_resp.markAborted();
  601. if (resph.exception == null)
  602.     resph.exception = new IOException("Request aborted by user");
  603.     }
  604.     /* Close the socket.
  605.      * Note: this duplicates most of close(IOException, boolean). We
  606.      * do *not* call close() because that is synchronized, but we want
  607.      * abort() to be asynch.
  608.      */
  609.     if (Sock != null)
  610.     {
  611. try
  612. {
  613.     try
  614. { Sock.setSoLinger(false, 0); }
  615.     catch (SocketException se)
  616. { }
  617.     try
  618. { Stream.close(); }
  619.     catch (IOException ioe) { }
  620.     try
  621. { Sock.close(); }
  622.     catch (IOException ioe) { }
  623.     Sock = null;
  624.     if (Timer != null)
  625.     {
  626. Timer.kill();
  627. Timer = null;
  628.     }
  629. }
  630. catch (NullPointerException npe)
  631.     { }
  632. Connection.DemuxList.remove(this);
  633.     }
  634. }
  635.     }
  636.     /**
  637.      * A safety net to close the connection.
  638.      */
  639.     protected void finalize() throws Throwable
  640.     {
  641. close((IOException) null, false);
  642. super.finalize();
  643.     }
  644.     /**
  645.      * produces a string.
  646.      * @return a string containing the class name and protocol number
  647.      */
  648.     public String toString()
  649.     {
  650. String prot;
  651. switch (Protocol)
  652. {
  653.     case HTTP:
  654. prot = "HTTP"; break;
  655.     case HTTPS:
  656. prot = "HTTPS"; break;
  657.     case SHTTP:
  658. prot = "SHTTP"; break;
  659.     case HTTP_NG:
  660. prot = "HTTP_NG"; break;
  661.     default:
  662. throw new Error("HTTPClient Internal Error: invalid protocol " +
  663. Protocol);
  664. }
  665. return getClass().getName() + "[Protocol=" + prot + "]";
  666.     }
  667. }
  668. /**
  669.  * This thread is used to reap idle connections. It is NOT used to timeout
  670.  * reads or writes on a socket. It keeps a list of timer entries and expires
  671.  * them after a given time.
  672.  */
  673. class SocketTimeout extends Thread
  674. {
  675.     private boolean alive = true;
  676.     /**
  677.      * This class represents a timer entry. It is used to close an
  678.      * inactive socket after n seconds. Once running, the timer may be
  679.      * suspended (hyber()), restarted (reset()), or aborted (kill()).
  680.      * When the timer expires it invokes markForClose() on the
  681.      * associated stream demultipexer.
  682.      */
  683.     class TimeoutEntry
  684.     {
  685. boolean restart = false,
  686. hyber   = false,
  687. alive   = true;
  688. StreamDemultiplexor demux;
  689. TimeoutEntry next = null,
  690.      prev = null;
  691. TimeoutEntry(StreamDemultiplexor demux)
  692. {
  693.     this.demux = demux;
  694. }
  695. void reset()
  696. {
  697.     hyber = false;
  698.     if (restart)  return;
  699.     restart = true;
  700.     synchronized (time_list)
  701.     {
  702. if (!alive)  return;
  703. // remove from current position
  704. next.prev = prev;
  705. prev.next = next;
  706. // and add to end of timeout list
  707. next = time_list[current];
  708. prev = time_list[current].prev;
  709. prev.next = this;
  710. next.prev = this; 
  711.     }
  712. }
  713. void hyber()
  714. {
  715.     if (alive)  hyber = true;
  716. }
  717. void kill()
  718. {
  719.     alive   = false;
  720.     restart = false;
  721.     hyber   = false;
  722.     synchronized (time_list)
  723.     {
  724. if (prev == null)  return;
  725. next.prev = prev;
  726. prev.next = next;
  727. prev = null;
  728.     }
  729. }
  730.     }
  731.     TimeoutEntry[] time_list; // jdk 1.1.x javac bug: these must not
  732.     int    current; //   be private!
  733.     SocketTimeout(int secs)
  734.     {
  735. super("SocketTimeout");
  736. try { setDaemon(true); }
  737. catch (SecurityException se) { } // Oh well...
  738. setPriority(MAX_PRIORITY);
  739. time_list = new TimeoutEntry[secs];
  740. for (int idx=0; idx<secs; idx++)
  741. {
  742.     time_list[idx] = new TimeoutEntry(null);
  743.     time_list[idx].next = time_list[idx].prev = time_list[idx];
  744. }
  745. current = 0;
  746.     }
  747.     public TimeoutEntry setTimeout(StreamDemultiplexor demux)
  748.     {
  749. TimeoutEntry entry = new TimeoutEntry(demux);
  750. synchronized (time_list)
  751. {
  752.     entry.next = time_list[current];
  753.     entry.prev = time_list[current].prev;
  754.     entry.prev.next = entry;
  755.     entry.next.prev = entry; 
  756. }
  757. return entry;
  758.     }
  759.     /**
  760.      * This timer is implemented by sleeping for 1 second and then
  761.      * checking the timer list.
  762.      */
  763.     public void run()
  764.     {
  765. TimeoutEntry marked = null;
  766. while (alive)
  767. {
  768.     try { sleep(1000L); } catch (InterruptedException ie) { }
  769.     synchronized (time_list)
  770.     {
  771. // reset all restart flags
  772. for (TimeoutEntry entry = time_list[current].next;
  773.      entry != time_list[current];
  774.      entry = entry.next)
  775. {
  776.     entry.restart = false;
  777. }
  778. current++;
  779. if (current >= time_list.length)
  780.     current = 0;
  781. // remove all expired timers 
  782. for (TimeoutEntry entry = time_list[current].next;
  783.      entry != time_list[current];
  784.      entry = entry.next)
  785. {
  786.     if (entry.alive  &&  !entry.hyber)
  787.     {
  788. TimeoutEntry prev = entry.prev;
  789. entry.kill();
  790. /* put on death row. Note: we must not invoke
  791.  * markForClose() here because it is synch'd
  792.  * and can therefore lead to a deadlock if that
  793.  * thread is trying to do a reset() or kill()
  794.  */
  795. entry.next = marked;
  796. marked = entry;
  797. entry = prev;
  798.     }
  799. }
  800.     }
  801.     while (marked != null)
  802.     {
  803. marked.demux.markForClose(null);
  804. marked = marked.next;
  805.     }
  806. }
  807.     }
  808.     /**
  809.      * Stop the timer thread.
  810.      */
  811.     public void kill() {
  812. alive = false;
  813.     }
  814. }