ThreadedInputStream.java
上传用户:songled
上传日期:2022-07-14
资源大小:94k
文件大小:7k
源码类别:

进程与线程

开发平台:

Java

  1. import java.io.*;
  2. // uses SureStop from chapter 16
  3. // uses ByteFIFO from chapter 18
  4. public class ThreadedInputStream extends FilterInputStream {
  5. private ByteFIFO buffer;
  6. private volatile boolean closeRequested;
  7. private volatile boolean eofDetected;
  8. private volatile boolean ioxDetected;
  9. private volatile String ioxMessage;
  10. private Thread internalThread;
  11. private volatile boolean noStopRequested;
  12. public ThreadedInputStream(InputStream in, int bufferSize) {
  13. super(in);
  14. buffer = new ByteFIFO(bufferSize);
  15. closeRequested = false;
  16. eofDetected = false;
  17. ioxDetected = false;
  18. ioxMessage = null;
  19. noStopRequested = true;
  20. Runnable r = new Runnable() {
  21. public void run() {
  22. try {
  23. runWork();
  24. } catch ( Exception x ) {
  25. // in case ANY exception slips through
  26. x.printStackTrace(); 
  27. }
  28. }
  29. };
  30. internalThread = new Thread(r);
  31. internalThread.setDaemon(true);
  32. internalThread.start();
  33. }
  34. public ThreadedInputStream(InputStream in) {
  35. this(in, 2048);
  36. }
  37. private void runWork() {
  38. byte[] workBuf = new byte[buffer.getCapacity()];
  39. try {
  40. while ( noStopRequested ) {
  41. int readCount = in.read(workBuf);
  42. if ( readCount == -1 ) {
  43. signalEOF();
  44. stopRequest();
  45. } else if ( readCount > 0 ) {
  46. addToBuffer(workBuf, readCount);
  47. }
  48. }
  49. } catch ( IOException iox ) {
  50. if ( !closeRequested ) {
  51. ioxMessage = iox.getMessage();
  52. signalIOX();
  53. }
  54. } catch ( InterruptedException x ) {
  55. // ignore
  56. } finally {
  57. // no matter what, make sure that eofDetected is set
  58. signalEOF();
  59. }
  60. }
  61. private void signalEOF() {
  62. synchronized ( buffer ) {
  63. eofDetected = true;
  64. buffer.notifyAll();
  65. }
  66. }
  67. private void signalIOX() {
  68. synchronized ( buffer ) {
  69. ioxDetected = true;
  70. buffer.notifyAll();
  71. }
  72. }
  73. private void signalClose() {
  74. synchronized ( buffer ) {
  75. closeRequested = true;
  76. buffer.notifyAll();
  77. }
  78. }
  79. private void addToBuffer(byte[] workBuf, int readCount) 
  80. throws InterruptedException {
  81. // Create an array exactly as large as the number of
  82. // bytes read and copy the data into it.
  83. byte[] addBuf = new byte[readCount];
  84. System.arraycopy(workBuf, 0, addBuf, 0, addBuf.length);
  85. buffer.add(addBuf);
  86. }
  87. private void stopRequest() {
  88. if ( noStopRequested ) {
  89. noStopRequested = false;
  90. internalThread.interrupt();
  91. }
  92. }
  93. public void close() throws IOException {
  94. if ( closeRequested ) {
  95. // already closeRequested, just return
  96. return;
  97. }
  98. signalClose();
  99. SureStop.ensureStop(internalThread, 10000);
  100. stopRequest();
  101. // Use a new thread to close "in" in case it blocks
  102. final InputStream localIn = in;
  103. Runnable r = new Runnable() {
  104. public void run() {
  105. try {
  106. localIn.close();
  107. } catch ( IOException iox ) {
  108. // ignore
  109. }
  110. }
  111. };
  112. Thread t = new Thread(r, "in-close");
  113. // give up when all other non-daemon threads die
  114. t.setDaemon(true);  
  115. t.start();
  116. }
  117. private void throwExceptionIfClosed() throws IOException {
  118. if ( closeRequested ) {
  119. throw new IOException("stream is closed");
  120. }
  121. }
  122. // Throws InterruptedIOException if the thread blocked on
  123. // read() is interrupted while waiting for data to arrive.
  124. public int read() 
  125. throws InterruptedIOException, IOException {
  126. // Using read(byte[]) to keep code in one place --makes
  127. // single-byte read less efficient, but simplifies 
  128. // the coding.
  129. byte[] data = new byte[1];
  130. int ret = read(data, 0, 1);
  131. if ( ret != 1 ) {
  132. return -1;
  133. }
  134. return data[0] & 0x000000FF;
  135. }
  136. // Throws InterruptedIOException if the thread blocked on
  137. // read() is interrupted while waiting for data to arrive.
  138. public int read(byte[] dest) 
  139. throws InterruptedIOException, IOException {
  140. return read(dest, 0, dest.length);
  141. }
  142. // Throws InterruptedIOException if the thread blocked on
  143. // read() is interrupted while waiting for data to arrive.
  144. public int read(
  145. byte[] dest, 
  146. int offset, 
  147. int length
  148. ) throws InterruptedIOException, IOException {
  149. throwExceptionIfClosed();
  150. if ( length < 1 ) {
  151. return 0;
  152. }
  153. if ( ( offset < 0 ) || 
  154.  ( ( offset + length ) > dest.length ) 
  155.    ) {
  156. throw new IllegalArgumentException(
  157. "offset must be at least 0, and " +
  158. "(offset + length) must be less than or " +
  159. "equal to dest.length. " +
  160. "offset=" + offset + 
  161. ", (offset + length )=" + ( offset + length ) +
  162. ", dest.length=" + dest.length);
  163. }
  164. byte[] data = removeUpTo(length);
  165. if ( data.length > 0 ) {
  166. System.arraycopy(data, 0, dest, offset, data.length);
  167. return data.length;
  168. }
  169. // no data
  170. if ( eofDetected ) {
  171. return -1;
  172. }
  173. // no data and not end of file, must be exception
  174. stopRequest();
  175. if ( ioxMessage == null ) {
  176. ioxMessage = "stream cannot be read";
  177. }
  178. throw new IOException(ioxMessage);
  179. }
  180. private byte[] removeUpTo(int maxRead) throws IOException {
  181. // Convenience method to assist read(byte[], int, int).
  182. // Waits until at least one byte is ready, EOF is 
  183. // detected,  an IOException is thrown, or the 
  184. // stream is closed.
  185. try {
  186. synchronized ( buffer ) {
  187. while ( buffer.isEmpty() && 
  188. !eofDetected && 
  189. !ioxDetected &&
  190. !closeRequested
  191.   ) {
  192. buffer.wait();
  193. }
  194. // If stream was closed while waiting, 
  195. // get out right away.
  196. throwExceptionIfClosed();
  197. // Ignore eof and exception flags for now, see 
  198. // if any data remains.
  199. byte[] data = buffer.removeAll();
  200. if ( data.length > maxRead ) {
  201. // Pulled out too many bytes, 
  202. // put excess back.
  203. byte[] putBackData = 
  204. new byte[data.length - maxRead];
  205. System.arraycopy(data, maxRead, 
  206. putBackData, 0, putBackData.length);
  207. buffer.add(putBackData);
  208. byte[] keepData = new byte[maxRead];
  209. System.arraycopy(data, 0, 
  210. keepData, 0, keepData.length);
  211. data = keepData;
  212. }
  213. return data;
  214. }
  215. } catch ( InterruptedException ix ) {
  216. // convert to an IOException
  217. throw new InterruptedIOException("interrupted " +
  218. "while waiting for data to arrive for reading");
  219. }
  220. }
  221. public long skip(long n) throws IOException {
  222. throwExceptionIfClosed();
  223. if ( n <= 0 ) {
  224. return 0;
  225. }
  226. int skipLen = (int) Math.min(n, Integer.MAX_VALUE);
  227. int readCount = read(new byte[skipLen]);
  228. if ( readCount < 0 ) {
  229. return 0;
  230. }
  231. return readCount;
  232. }
  233. public int available() throws IOException {
  234. throwExceptionIfClosed();
  235. return buffer.getSize();
  236. }
  237. public boolean markSupported() {
  238. return false;
  239. }
  240. public synchronized void mark(int readLimit) {
  241. // ignore method calls, mark not supported
  242. }
  243. public synchronized void reset() throws IOException {
  244. throw new IOException(
  245. "mark-reset not supported on this stream");
  246. }
  247. }