ByteFIFO.java
上传用户:songled
上传日期:2022-07-14
资源大小:94k
文件大小:4k
源码类别:

进程与线程

开发平台:

Java

  1. public class ByteFIFO extends Object {
  2. private byte[] queue;
  3. private int capacity;
  4. private int size;
  5. private int head;
  6. private int tail;
  7. public ByteFIFO(int cap) {
  8. capacity = ( cap > 0 ) ? cap : 1; // at least 1
  9. queue = new byte[capacity];
  10. head = 0;
  11. tail = 0;
  12. size = 0;
  13. }
  14. public int getCapacity() {
  15. return capacity;
  16. }
  17. public synchronized int getSize() {
  18. return size;
  19. }
  20. public synchronized boolean isEmpty() {
  21. return ( size == 0 );
  22. }
  23. public synchronized boolean isFull() {
  24. return ( size == capacity );
  25. }
  26. public synchronized void add(byte b) 
  27. throws InterruptedException {
  28. waitWhileFull();
  29. queue[head] = b;
  30. head = ( head + 1 ) % capacity;
  31. size++;
  32. notifyAll(); // let any waiting threads know about change
  33. }
  34. public synchronized void add(byte[] list) 
  35. throws InterruptedException {
  36. // For efficiency, the bytes are copied in blocks
  37. // instead of one at a time. As space becomes available,
  38. // more bytes are copied until all of them have been
  39. // added.
  40. int ptr = 0;
  41. while ( ptr < list.length ) {
  42. // If full, the lock will be released to allow 
  43. // another thread to come in and remove bytes.
  44. waitWhileFull();
  45. int space = capacity - size;
  46. int distToEnd = capacity - head;
  47. int blockLen = Math.min(space, distToEnd);
  48. int bytesRemaining = list.length - ptr;
  49. int copyLen = Math.min(blockLen, bytesRemaining);
  50. System.arraycopy(list, ptr, queue, head, copyLen);
  51. head = ( head + copyLen ) % capacity;
  52. size += copyLen;
  53. ptr += copyLen;
  54. // Keep the lock, but let any waiting threads 
  55. // know that something has changed.
  56. notifyAll();
  57. }
  58. }
  59. public synchronized byte remove() 
  60. throws InterruptedException {
  61. waitWhileEmpty();
  62. byte b = queue[tail];
  63. tail = ( tail + 1 ) % capacity;
  64. size--;
  65. notifyAll(); // let any waiting threads know about change
  66. return b;
  67. }
  68. public synchronized byte[] removeAll() {
  69. // For efficiency, the bytes are copied in blocks
  70. // instead of one at a time. 
  71. if ( isEmpty() ) {
  72. // Nothing to remove, return a zero-length
  73. // array and do not bother with notification
  74. // since nothing was removed.
  75. return new byte[0]; 
  76. }
  77. // based on the current size
  78. byte[] list = new byte[size]; 
  79. // copy in the block from tail to the end
  80. int distToEnd = capacity - tail;
  81. int copyLen = Math.min(size, distToEnd);
  82. System.arraycopy(queue, tail, list, 0, copyLen);
  83. // If data wraps around, copy the remaining data
  84. // from the front of the array.
  85. if ( size > copyLen ) {
  86. System.arraycopy(
  87. queue, 0, list, copyLen, size - copyLen);
  88. }
  89. tail = ( tail + size ) % capacity;
  90. size = 0; // everything has been removed
  91. // Signal any and all waiting threads that 
  92. // something has changed.
  93. notifyAll(); 
  94. return list; 
  95. }
  96. public synchronized byte[] removeAtLeastOne() 
  97. throws InterruptedException {
  98. waitWhileEmpty(); // wait for a least one to be in FIFO
  99. return removeAll();
  100. }
  101. public synchronized boolean waitUntilEmpty(long msTimeout) 
  102. throws InterruptedException {
  103. if ( msTimeout == 0L ) {
  104. waitUntilEmpty();  // use other method
  105. return true;
  106. }
  107. // wait only for the specified amount of time
  108. long endTime = System.currentTimeMillis() + msTimeout;
  109. long msRemaining = msTimeout;
  110. while ( !isEmpty() && ( msRemaining > 0L ) ) {
  111. wait(msRemaining);
  112. msRemaining = endTime - System.currentTimeMillis();
  113. }
  114. // May have timed out, or may have met condition, 
  115. // calc return value.
  116. return isEmpty();
  117. }
  118. public synchronized void waitUntilEmpty() 
  119. throws InterruptedException {
  120. while ( !isEmpty() ) {
  121. wait();
  122. }
  123. }
  124. public synchronized void waitWhileEmpty() 
  125. throws InterruptedException {
  126. while ( isEmpty() ) {
  127. wait();
  128. }
  129. }
  130. public synchronized void waitUntilFull() 
  131. throws InterruptedException {
  132. while ( !isFull() ) {
  133. wait();
  134. }
  135. }
  136. public synchronized void waitWhileFull() 
  137. throws InterruptedException {
  138. while ( isFull() ) {
  139. wait();
  140. }
  141. }
  142. }