SocketOutputStream.java
上传用户:quxuerui
上传日期:2018-01-08
资源大小:41811k
文件大小:8k
源码类别:

网格计算

开发平台:

Java

  1. /**
  2.  * Licensed to the Apache Software Foundation (ASF) under one
  3.  * or more contributor license agreements.  See the NOTICE file
  4.  * distributed with this work for additional information
  5.  * regarding copyright ownership.  The ASF licenses this file
  6.  * to you under the Apache License, Version 2.0 (the
  7.  * "License"); you may not use this file except in compliance
  8.  * with the License.  You may obtain a copy of the License at
  9.  *
  10.  *     http://www.apache.org/licenses/LICENSE-2.0
  11.  *
  12.  * Unless required by applicable law or agreed to in writing, software
  13.  * distributed under the License is distributed on an "AS IS" BASIS,
  14.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15.  * See the License for the specific language governing permissions and
  16.  * limitations under the License.
  17.  */
  18. package org.apache.hadoop.net;
  19. import java.io.EOFException;
  20. import java.io.IOException;
  21. import java.io.OutputStream;
  22. import java.net.Socket;
  23. import java.net.SocketTimeoutException;
  24. import java.nio.ByteBuffer;
  25. import java.nio.channels.FileChannel;
  26. import java.nio.channels.SelectableChannel;
  27. import java.nio.channels.SelectionKey;
  28. import java.nio.channels.WritableByteChannel;
  29. /**
  30.  * This implements an output stream that can have a timeout while writing.
  31.  * This sets non-blocking flag on the socket channel.
  32.  * So after creating this object , read() on 
  33.  * {@link Socket#getInputStream()} and write() on 
  34.  * {@link Socket#getOutputStream()} on the associated socket will throw 
  35.  * llegalBlockingModeException.
  36.  * Please use {@link SocketInputStream} for reading.
  37.  */
  38. public class SocketOutputStream extends OutputStream 
  39.                                 implements WritableByteChannel {                                
  40.   
  41.   private Writer writer;
  42.   
  43.   private static class Writer extends SocketIOWithTimeout {
  44.     WritableByteChannel channel;
  45.     
  46.     Writer(WritableByteChannel channel, long timeout) throws IOException {
  47.       super((SelectableChannel)channel, timeout);
  48.       this.channel = channel;
  49.     }
  50.     
  51.     int performIO(ByteBuffer buf) throws IOException {
  52.       return channel.write(buf);
  53.     }
  54.   }
  55.   
  56.   /**
  57.    * Create a new ouput stream with the given timeout. If the timeout
  58.    * is zero, it will be treated as infinite timeout. The socket's
  59.    * channel will be configured to be non-blocking.
  60.    * 
  61.    * @param channel 
  62.    *        Channel for writing, should also be a {@link SelectableChannel}.  
  63.    *        The channel will be configured to be non-blocking.
  64.    * @param timeout timeout in milliseconds. must not be negative.
  65.    * @throws IOException
  66.    */
  67.   public SocketOutputStream(WritableByteChannel channel, long timeout) 
  68.                                                          throws IOException {
  69.     SocketIOWithTimeout.checkChannelValidity(channel);
  70.     writer = new Writer(channel, timeout);
  71.   }
  72.   
  73.   /**
  74.    * Same as SocketOutputStream(socket.getChannel(), timeout):<br><br>
  75.    * 
  76.    * Create a new ouput stream with the given timeout. If the timeout
  77.    * is zero, it will be treated as infinite timeout. The socket's
  78.    * channel will be configured to be non-blocking.
  79.    * 
  80.    * @see SocketOutputStream#SocketOutputStream(WritableByteChannel, long)
  81.    *  
  82.    * @param socket should have a channel associated with it.
  83.    * @param timeout timeout timeout in milliseconds. must not be negative.
  84.    * @throws IOException
  85.    */
  86.   public SocketOutputStream(Socket socket, long timeout) 
  87.                                          throws IOException {
  88.     this(socket.getChannel(), timeout);
  89.   }
  90.   
  91.   public void write(int b) throws IOException {
  92.     /* If we need to, we can optimize this allocation.
  93.      * probably no need to optimize or encourage single byte writes.
  94.      */
  95.     byte[] buf = new byte[1];
  96.     buf[0] = (byte)b;
  97.     write(buf, 0, 1);
  98.   }
  99.   
  100.   public void write(byte[] b, int off, int len) throws IOException {
  101.     ByteBuffer buf = ByteBuffer.wrap(b, off, len);
  102.     while (buf.hasRemaining()) {
  103.       try {
  104.         if (write(buf) < 0) {
  105.           throw new IOException("The stream is closed");
  106.         }
  107.       } catch (IOException e) {
  108.         /* Unlike read, write can not inform user of partial writes.
  109.          * So will close this if there was a partial write.
  110.          */
  111.         if (buf.capacity() > buf.remaining()) {
  112.           writer.close();
  113.         }
  114.         throw e;
  115.       }
  116.     }
  117.   }
  118.   public synchronized void close() throws IOException {
  119.     /* close the channel since Socket.getOuputStream().close() 
  120.      * closes the socket.
  121.      */
  122.     writer.channel.close();
  123.     writer.close();
  124.   }
  125.   /**
  126.    * Returns underlying channel used by this stream.
  127.    * This is useful in certain cases like channel for 
  128.    * {@link FileChannel#transferTo(long, long, WritableByteChannel)}
  129.    */
  130.   public WritableByteChannel getChannel() {
  131.     return writer.channel; 
  132.   }
  133.   //WritableByteChannle interface 
  134.   
  135.   public boolean isOpen() {
  136.     return writer.isOpen();
  137.   }
  138.   public int write(ByteBuffer src) throws IOException {
  139.     return writer.doIO(src, SelectionKey.OP_WRITE);
  140.   }
  141.   
  142.   /**
  143.    * waits for the underlying channel to be ready for writing.
  144.    * The timeout specified for this stream applies to this wait.
  145.    *
  146.    * @throws SocketTimeoutException 
  147.    *         if select on the channel times out.
  148.    * @throws IOException
  149.    *         if any other I/O error occurs. 
  150.    */
  151.   public void waitForWritable() throws IOException {
  152.     writer.waitForIO(SelectionKey.OP_WRITE);
  153.   }
  154.   
  155.   /**
  156.    * Transfers data from FileChannel using 
  157.    * {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
  158.    * 
  159.    * Similar to readFully(), this waits till requested amount of 
  160.    * data is transfered.
  161.    * 
  162.    * @param fileCh FileChannel to transfer data from.
  163.    * @param position position within the channel where the transfer begins
  164.    * @param count number of bytes to transfer.
  165.    * 
  166.    * @throws EOFException 
  167.    *         If end of input file is reached before requested number of 
  168.    *         bytes are transfered.
  169.    *
  170.    * @throws SocketTimeoutException 
  171.    *         If this channel blocks transfer longer than timeout for 
  172.    *         this stream.
  173.    *          
  174.    * @throws IOException Includes any exception thrown by 
  175.    *         {@link FileChannel#transferTo(long, long, WritableByteChannel)}. 
  176.    */
  177.   public void transferToFully(FileChannel fileCh, long position, int count) 
  178.                               throws IOException {
  179.     
  180.     while (count > 0) {
  181.       /* 
  182.        * Ideally we should wait after transferTo returns 0. But because of
  183.        * a bug in JRE on Linux (http://bugs.sun.com/view_bug.do?bug_id=5103988),
  184.        * which throws an exception instead of returning 0, we wait for the
  185.        * channel to be writable before writing to it. If you ever see 
  186.        * IOException with message "Resource temporarily unavailable" 
  187.        * thrown here, please let us know.
  188.        * 
  189.        * Once we move to JAVA SE 7, wait should be moved to correct place.
  190.        */
  191.       waitForWritable();
  192.       int nTransfered = (int) fileCh.transferTo(position, count, getChannel());
  193.       
  194.       if (nTransfered == 0) {
  195.         //check if end of file is reached.
  196.         if (position >= fileCh.size()) {
  197.           throw new EOFException("EOF Reached. file size is " + fileCh.size() + 
  198.                                  " and " + count + " more bytes left to be " +
  199.                                  "transfered.");
  200.         }
  201.         //otherwise assume the socket is full.
  202.         //waitForWritable(); // see comment above.
  203.       } else if (nTransfered < 0) {
  204.         throw new IOException("Unexpected return of " + nTransfered + 
  205.                               " from transferTo()");
  206.       } else {
  207.         position += nTransfered;
  208.         count -= nTransfered;
  209.       }
  210.     }
  211.   }  
  212. }