HttpDataChannel.java
上传用户:szyujian
上传日期:2016-09-20
资源大小:320k
文件大小:16k
源码类别:

android开发

开发平台:

C/C++

  1. /*
  2.  * Copyright (C) 2007-2008 Esmertec AG.
  3.  * Copyright (C) 2007-2008 The Android Open Source Project
  4.  *
  5.  * Licensed under the Apache License, Version 2.0 (the "License");
  6.  * you may not use this file except in compliance with the License.
  7.  * You may obtain a copy of the License at
  8.  *
  9.  *      http://www.apache.org/licenses/LICENSE-2.0
  10.  *
  11.  * Unless required by applicable law or agreed to in writing, software
  12.  * distributed under the License is distributed on an "AS IS" BASIS,
  13.  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14.  * See the License for the specific language governing permissions and
  15.  * limitations under the License.
  16.  */
  17. package com.android.im.imps;
  18. import java.io.ByteArrayInputStream;
  19. import java.io.ByteArrayOutputStream;
  20. import java.io.IOException;
  21. import java.io.InputStream;
  22. import java.net.HttpURLConnection;
  23. import java.net.URI;
  24. import java.net.URISyntaxException;
  25. import java.util.concurrent.LinkedBlockingQueue;
  26. import java.util.concurrent.TimeUnit;
  27. import java.util.concurrent.atomic.AtomicBoolean;
  28. import org.apache.http.Header;
  29. import org.apache.http.HttpEntity;
  30. import org.apache.http.HttpResponse;
  31. import org.apache.http.StatusLine;
  32. import org.apache.http.client.methods.HttpPost;
  33. import org.apache.http.entity.ByteArrayEntity;
  34. import org.apache.http.message.BasicHeader;
  35. import org.apache.http.params.HttpConnectionParams;
  36. import org.apache.http.params.HttpParams;
  37. import android.net.http.AndroidHttpClient;
  38. import android.os.SystemClock;
  39. import android.util.Log;
  40. import com.android.im.engine.ImErrorInfo;
  41. import com.android.im.engine.ImException;
  42. import com.android.im.imps.Primitive.TransactionMode;
  43. /**
  44.  * The <code>HttpDataChannel</code> is an implementation of IMPS data channel
  45.  * in which the protocol binding is HTTP.
  46.  */
  47. class HttpDataChannel extends DataChannel implements Runnable {
  48.     private static final int MAX_RETRY_COUNT = 10;
  49.     private static final int INIT_RETRY_DELAY_MS = 5000;
  50.     private static final int MAX_RETRY_DELAY_MS = 300 * 1000;
  51.     private Thread mSendThread;
  52.     private boolean mStopped;
  53.     private boolean mConnected;
  54.     private boolean mStopRetry;
  55.     private Object mRetryLock = new Object();
  56.     private LinkedBlockingQueue<Primitive> mSendQueue;
  57.     private LinkedBlockingQueue<Primitive> mReceiveQueue;
  58.     private long mLastActive;
  59.     private int mKeepAliveMillis;
  60.     private Primitive mKeepAlivePrimitive;
  61.     private AtomicBoolean mHasPendingPolling = new AtomicBoolean(false);
  62.     private final AndroidHttpClient mHttpClient;
  63.     private final Header mContentTypeHeader;
  64.     private final Header mMsisdnHeader;
  65.     private URI mPostUri;
  66.     private ImpsTransactionManager mTxManager;
  67.     /**
  68.      * Constructs a new HttpDataChannel for a connection.
  69.      *
  70.      * @param connection the connection which uses the data channel.
  71.      */
  72.     public HttpDataChannel(ImpsConnection connection) throws ImException {
  73.         super(connection);
  74.         mTxManager = connection.getTransactionManager();
  75.         ImpsConnectionConfig cfg = connection.getConfig();
  76.         try {
  77.             mPostUri = new URI(cfg.getHost());
  78.             if (mPostUri.getPath() == null || "".equals(mPostUri.getPath())) {
  79.                 mPostUri = new URI(cfg.getHost() + "/");
  80.             }
  81.             if (!"http".equalsIgnoreCase(mPostUri.getScheme())
  82.                     && !"https".equalsIgnoreCase(mPostUri.getScheme())) {
  83.                 throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
  84.                         "Non HTTP/HTTPS host name.");
  85.             }
  86.             mHttpClient = AndroidHttpClient.newInstance("Android-Imps/0.1");
  87.             HttpParams params = mHttpClient.getParams();
  88.             HttpConnectionParams.setConnectionTimeout(params, cfg.getReplyTimeout());
  89.             HttpConnectionParams.setSoTimeout(params, cfg.getReplyTimeout());
  90.         } catch (URISyntaxException e) {
  91.             throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
  92.                     e.getLocalizedMessage());
  93.         }
  94.         mContentTypeHeader = new BasicHeader("Content-Type", cfg.getTransportContentType());
  95.         String msisdn = cfg.getMsisdn();
  96.         mMsisdnHeader = (msisdn != null) ? new BasicHeader("MSISDN", msisdn) : null;
  97.     }
  98.     @Override
  99.     public void connect() throws ImException {
  100.         if (mConnected) {
  101.             throw new ImException("Already connected");
  102.         }
  103.         mStopped = false;
  104.         mStopRetry = false;
  105.         mSendQueue = new LinkedBlockingQueue<Primitive>();
  106.         mReceiveQueue = new LinkedBlockingQueue<Primitive>();
  107.         mSendThread = new Thread(this, "HttpDataChannel");
  108.         mSendThread.setDaemon(true);
  109.         mSendThread.start();
  110.         mConnected = true;
  111.     }
  112.     @Override
  113.     public void shutdown() {
  114.         // Stop the sending thread
  115.         mStopped = true;
  116.         mSendThread.interrupt();
  117.         mConnected = false;
  118.     }
  119.     @Override
  120.     public void sendPrimitive(Primitive p) {
  121.         if (!mConnected || mStopped) {
  122.             ImpsLog.log("DataChannel not connected, ignore primitive " + p.getType());
  123.             return;
  124.         }
  125.         if (ImpsTags.Polling_Request.equals(p.getType())) {
  126.             if (!mHasPendingPolling.compareAndSet(false, true)) {
  127.                 ImpsLog.log("HttpDataChannel: Ignoring Polling-Request");
  128.                 return;
  129.             }
  130.         } else if (ImpsTags.Logout_Request.equals(p.getType())) {
  131.             mStopRetry = true;
  132.             synchronized (mRetryLock) {
  133.                 mRetryLock.notify();
  134.             }
  135.         }
  136.         if (!mSendQueue.offer(p)) {
  137.             // This is almost impossible for a LinkedBlockingQueue. We don't
  138.             // even bother to assign an error code for this. ;)
  139.             mTxManager.notifyErrorResponse(p.getTransactionID(),
  140.                     ImErrorInfo.UNKNOWN_ERROR, "sending queue full");
  141.         }
  142.     }
  143.     @Override
  144.     public Primitive receivePrimitive() throws InterruptedException {
  145.         if (!mConnected || mStopped) {
  146.             throw new IllegalStateException();
  147.         }
  148.         return mReceiveQueue.take();
  149.     }
  150.     @Override
  151.     public void startKeepAlive(long interval) {
  152.         if (!mConnected || mStopped) {
  153.             throw new IllegalStateException();
  154.         }
  155.         if (interval <= 0) {
  156.             interval = mConnection.getConfig().getDefaultKeepAliveInterval();
  157.         }
  158.         mKeepAliveMillis = (int)(interval * 1000 - 100);
  159.         if (mKeepAliveMillis < 0) {
  160.             ImpsLog.log("Negative keep alive time. Won't send keep-alive");
  161.         }
  162.         mKeepAlivePrimitive = new Primitive(ImpsTags.KeepAlive_Request);
  163.     }
  164.     @Override
  165.     public long getLastActiveTime() {
  166.         return mLastActive;
  167.     }
  168.     @Override
  169.     public boolean isSendingQueueEmpty() {
  170.         if (!mConnected || mStopped) {
  171.             throw new IllegalStateException();
  172.         }
  173.         return mSendQueue.isEmpty();
  174.     }
  175.     public void run() {
  176.         boolean needKeepAlive = false;
  177.         while (!mStopped) {
  178.             if (needKeepAlive) {
  179.                 sendKeepAlive();
  180.                 needKeepAlive = false;
  181.             }
  182.             Primitive primitive;
  183.             try {
  184.                 if (mKeepAliveMillis <= 0) {
  185.                     primitive = mSendQueue.take();
  186.                 } else {
  187.                     primitive = mSendQueue.poll(mKeepAliveMillis, TimeUnit.MILLISECONDS);
  188.                     if (primitive == null) {
  189.                         if (!mStopped) {
  190.                             needKeepAlive = true;
  191.                         }
  192.                         continue;
  193.                     }
  194.                 }
  195.             } catch (InterruptedException e) {
  196.                 if (mStopped) {
  197.                     break;
  198.                 }
  199.                 continue;
  200.             }
  201.             if (primitive.getType().equals(ImpsTags.Polling_Request)) {
  202.                 mHasPendingPolling.set(false);
  203.             }
  204.             doSendPrimitive(primitive);
  205.         }
  206.         mHttpClient.close();
  207.     }
  208.     private void sendKeepAlive() {
  209.         ImpsTransactionManager tm = mConnection.getTransactionManager();
  210.         AsyncTransaction tx = new AsyncTransaction(tm) {
  211.             @Override
  212.             public void onResponseError(ImpsErrorInfo error) {
  213.             }
  214.             @Override
  215.             public void onResponseOk(Primitive response) {
  216.                 // Since we never request a new timeout value, the response
  217.                 // can be ignored
  218.             }
  219.         };
  220.         tx.sendRequest(mKeepAlivePrimitive);
  221.     }
  222.     /**
  223.      * Sends a primitive to the IMPS server through HTTP.
  224.      *
  225.      * @param p The primitive to send.
  226.      */
  227.     private void doSendPrimitive(Primitive p) {
  228.         String errorInfo = null;
  229.         int retryCount = 0;
  230.         long retryDelay = INIT_RETRY_DELAY_MS;
  231.         while (retryCount < MAX_RETRY_COUNT) {
  232.             try {
  233.                 trySend(p);
  234.                 return;
  235.             } catch (IOException e) {
  236.                 errorInfo = e.getLocalizedMessage();
  237.                 String type = p.getType();
  238.                 if (ImpsTags.Login_Request.equals(type)
  239.                         || ImpsTags.Logout_Request.equals(type)) {
  240.                     // we don't retry to send login/logout request. The request
  241.                     // might be sent to the server successfully but we failed to
  242.                     // get the response from the server. Retry in this case might
  243.                     // cause multiple login which is not allowed by some server.
  244.                     break;
  245.                 }
  246.                 if (p.getTransactionMode() == TransactionMode.Response) {
  247.                     // Ignore the failure of sending response to the server since
  248.                     // it's only an acknowledgment. When we get here, the
  249.                     // primitive might have been sent successfully but failed to
  250.                     // get the http response. The server might or might not send
  251.                     // the request again if it does not receive the acknowledgment,
  252.                     // the client is ok to either case.
  253.                     return;
  254.                 }
  255.                 retryCount++;
  256.                 // sleep for a while and retry to send the primitive in a new
  257.                 // transaction if we havn't met the max retry count.
  258.                 if (retryCount < MAX_RETRY_COUNT) {
  259.                    mTxManager.reassignTransactionId(p);
  260.                     Log.w(ImpsLog.TAG, "Send primitive failed, retry after " + retryDelay + "ms");
  261.                     synchronized (mRetryLock) {
  262.                         try {
  263.                             mRetryLock.wait(retryDelay);
  264.                         } catch (InterruptedException ignore) {
  265.                         }
  266.                         if (mStopRetry) {
  267.                             break;
  268.                         }
  269.                     }
  270.                     retryDelay = retryDelay * 2;
  271.                     if (retryDelay > MAX_RETRY_DELAY_MS) {
  272.                         retryDelay = MAX_RETRY_DELAY_MS;
  273.                     }
  274.                 }
  275.             }
  276.         }
  277.         Log.w(ImpsLog.TAG, "Failed to send primitive after " + MAX_RETRY_COUNT + " retries");
  278.         mTxManager.notifyErrorResponse(p.getTransactionID(),
  279.                 ImErrorInfo.NETWORK_ERROR, errorInfo);
  280.     }
  281.     private void trySend(Primitive p) throws IOException {
  282.         ByteArrayOutputStream out = new ByteArrayOutputStream();
  283.         try {
  284.             mSerializer.serialize(p, out);
  285.         } catch (SerializerException e) {
  286.             mTxManager.notifyErrorResponse(p.getTransactionID(),
  287.                     ImErrorInfo.SERIALIZER_ERROR,
  288.                     "Internal serializer error, primitive: " + p.getType());
  289.             out.close();
  290.             return;
  291.         }
  292.         HttpPost req = new HttpPost(mPostUri);
  293.         req.addHeader(mContentTypeHeader);
  294.         if (mMsisdnHeader != null) {
  295.             req.addHeader(mMsisdnHeader);
  296.         }
  297.         ByteArrayEntity entity = new ByteArrayEntity(out.toByteArray());
  298.         req.setEntity(entity);
  299.         mLastActive = SystemClock.elapsedRealtime();
  300.         if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
  301.             long sendBytes = entity.getContentLength() + 176 /* approx. header length */;
  302.             ImpsLog.log(mConnection.getLoginUserName() + " >> " + p.getType() + " HTTP payload approx. " + sendBytes + " bytes");
  303.         }
  304.         if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
  305.             ImpsLog.dumpRawPacket(out.toByteArray());
  306.             ImpsLog.dumpPrimitive(p);
  307.         }
  308.         HttpResponse res = mHttpClient.execute(req);
  309.         StatusLine statusLine = res.getStatusLine();
  310.         HttpEntity resEntity = res.getEntity();
  311.         InputStream in = resEntity.getContent();
  312.         if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
  313.             Log.d(ImpsLog.PACKET_TAG, statusLine.toString());
  314.             Header[] headers = res.getAllHeaders();
  315.             for (Header h : headers) {
  316.                 Log.d(ImpsLog.PACKET_TAG, h.toString());
  317.             }
  318.             int len = (int) resEntity.getContentLength();
  319.             if (len > 0) {
  320.                 byte[] content = new byte[len];
  321.                 int offset = 0;
  322.                 int bytesRead = 0;
  323.                 do {
  324.                     bytesRead = in.read(content, offset, len);
  325.                     offset += bytesRead;
  326.                     len -= bytesRead;
  327.                 } while (bytesRead > 0);
  328.                 in.close();
  329.                 ImpsLog.dumpRawPacket(content);
  330.                 in = new ByteArrayInputStream(content);
  331.             }
  332.         }
  333.         try {
  334.             if (statusLine.getStatusCode() != HttpURLConnection.HTTP_OK) {
  335.                 mTxManager.notifyErrorResponse(p.getTransactionID(), statusLine.getStatusCode(),
  336.                         statusLine.getReasonPhrase());
  337.                 return;
  338.             }
  339.             if (resEntity.getContentLength() == 0) {
  340.                 // empty responses are only valid for Polling-Request or
  341.                 // server initiated transactions
  342.                 if ((p.getTransactionMode() != TransactionMode.Response)
  343.                         && !p.getType().equals(ImpsTags.Polling_Request)) {
  344.                     mTxManager.notifyErrorResponse(p.getTransactionID(),
  345.                             ImErrorInfo.ILLEGAL_SERVER_RESPONSE,
  346.                             "bad response from server");
  347.                 }
  348.                 return;
  349.             }
  350.             Primitive response = mParser.parse(in);
  351.             if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
  352.                 ImpsLog.dumpPrimitive(response);
  353.             }
  354.             if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
  355.                 long len = 2 + resEntity.getContentLength() + statusLine.toString().length() + 2;
  356.                 Header[] headers = res.getAllHeaders();
  357.                 for (Header header : headers) {
  358.                     len += header.getName().length() + header.getValue().length() + 4;
  359.                 }
  360.                 ImpsLog.log(mConnection.getLoginUserName() + " << "
  361.                         + response.getType() + " HTTP payload approx. " + len + "bytes");
  362.             }
  363.             if (!mReceiveQueue.offer(response)) {
  364.                 // This is almost impossible for a LinkedBlockingQueue.
  365.                 // We don't even bother to assign an error code for it.
  366.                 mTxManager.notifyErrorResponse(p.getTransactionID(),
  367.                         ImErrorInfo.UNKNOWN_ERROR, "receiving queue full");
  368.             }
  369.         } catch (ParserException e) {
  370.             ImpsLog.logError(e);
  371.             mTxManager.notifyErrorResponse(p.getTransactionID(),
  372.                     ImErrorInfo.PARSER_ERROR,
  373.                     "Parser error, received a bad response from server");
  374.         } finally {
  375.             //consume all the content so that the connection can be re-used.
  376.             resEntity.consumeContent();
  377.         }
  378.     }
  379. }