HttpDataChannel.java
上传用户:szyujian
上传日期:2016-09-20
资源大小:320k
文件大小:16k
- /*
- * Copyright (C) 2007-2008 Esmertec AG.
- * Copyright (C) 2007-2008 The Android Open Source Project
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- package com.android.im.imps;
- import java.io.ByteArrayInputStream;
- import java.io.ByteArrayOutputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.HttpURLConnection;
- import java.net.URI;
- import java.net.URISyntaxException;
- import java.util.concurrent.LinkedBlockingQueue;
- import java.util.concurrent.TimeUnit;
- import java.util.concurrent.atomic.AtomicBoolean;
- import org.apache.http.Header;
- import org.apache.http.HttpEntity;
- import org.apache.http.HttpResponse;
- import org.apache.http.StatusLine;
- import org.apache.http.client.methods.HttpPost;
- import org.apache.http.entity.ByteArrayEntity;
- import org.apache.http.message.BasicHeader;
- import org.apache.http.params.HttpConnectionParams;
- import org.apache.http.params.HttpParams;
- import android.net.http.AndroidHttpClient;
- import android.os.SystemClock;
- import android.util.Log;
- import com.android.im.engine.ImErrorInfo;
- import com.android.im.engine.ImException;
- import com.android.im.imps.Primitive.TransactionMode;
- /**
- * The <code>HttpDataChannel</code> is an implementation of IMPS data channel
- * in which the protocol binding is HTTP.
- */
- class HttpDataChannel extends DataChannel implements Runnable {
- private static final int MAX_RETRY_COUNT = 10;
- private static final int INIT_RETRY_DELAY_MS = 5000;
- private static final int MAX_RETRY_DELAY_MS = 300 * 1000;
- private Thread mSendThread;
- private boolean mStopped;
- private boolean mConnected;
- private boolean mStopRetry;
- private Object mRetryLock = new Object();
- private LinkedBlockingQueue<Primitive> mSendQueue;
- private LinkedBlockingQueue<Primitive> mReceiveQueue;
- private long mLastActive;
- private int mKeepAliveMillis;
- private Primitive mKeepAlivePrimitive;
- private AtomicBoolean mHasPendingPolling = new AtomicBoolean(false);
- private final AndroidHttpClient mHttpClient;
- private final Header mContentTypeHeader;
- private final Header mMsisdnHeader;
- private URI mPostUri;
- private ImpsTransactionManager mTxManager;
- /**
- * Constructs a new HttpDataChannel for a connection.
- *
- * @param connection the connection which uses the data channel.
- */
- public HttpDataChannel(ImpsConnection connection) throws ImException {
- super(connection);
- mTxManager = connection.getTransactionManager();
- ImpsConnectionConfig cfg = connection.getConfig();
- try {
- mPostUri = new URI(cfg.getHost());
- if (mPostUri.getPath() == null || "".equals(mPostUri.getPath())) {
- mPostUri = new URI(cfg.getHost() + "/");
- }
- if (!"http".equalsIgnoreCase(mPostUri.getScheme())
- && !"https".equalsIgnoreCase(mPostUri.getScheme())) {
- throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
- "Non HTTP/HTTPS host name.");
- }
- mHttpClient = AndroidHttpClient.newInstance("Android-Imps/0.1");
- HttpParams params = mHttpClient.getParams();
- HttpConnectionParams.setConnectionTimeout(params, cfg.getReplyTimeout());
- HttpConnectionParams.setSoTimeout(params, cfg.getReplyTimeout());
- } catch (URISyntaxException e) {
- throw new ImException(ImErrorInfo.INVALID_HOST_NAME,
- e.getLocalizedMessage());
- }
- mContentTypeHeader = new BasicHeader("Content-Type", cfg.getTransportContentType());
- String msisdn = cfg.getMsisdn();
- mMsisdnHeader = (msisdn != null) ? new BasicHeader("MSISDN", msisdn) : null;
- }
- @Override
- public void connect() throws ImException {
- if (mConnected) {
- throw new ImException("Already connected");
- }
- mStopped = false;
- mStopRetry = false;
- mSendQueue = new LinkedBlockingQueue<Primitive>();
- mReceiveQueue = new LinkedBlockingQueue<Primitive>();
- mSendThread = new Thread(this, "HttpDataChannel");
- mSendThread.setDaemon(true);
- mSendThread.start();
- mConnected = true;
- }
- @Override
- public void shutdown() {
- // Stop the sending thread
- mStopped = true;
- mSendThread.interrupt();
- mConnected = false;
- }
- @Override
- public void sendPrimitive(Primitive p) {
- if (!mConnected || mStopped) {
- ImpsLog.log("DataChannel not connected, ignore primitive " + p.getType());
- return;
- }
- if (ImpsTags.Polling_Request.equals(p.getType())) {
- if (!mHasPendingPolling.compareAndSet(false, true)) {
- ImpsLog.log("HttpDataChannel: Ignoring Polling-Request");
- return;
- }
- } else if (ImpsTags.Logout_Request.equals(p.getType())) {
- mStopRetry = true;
- synchronized (mRetryLock) {
- mRetryLock.notify();
- }
- }
- if (!mSendQueue.offer(p)) {
- // This is almost impossible for a LinkedBlockingQueue. We don't
- // even bother to assign an error code for this. ;)
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.UNKNOWN_ERROR, "sending queue full");
- }
- }
- @Override
- public Primitive receivePrimitive() throws InterruptedException {
- if (!mConnected || mStopped) {
- throw new IllegalStateException();
- }
- return mReceiveQueue.take();
- }
- @Override
- public void startKeepAlive(long interval) {
- if (!mConnected || mStopped) {
- throw new IllegalStateException();
- }
- if (interval <= 0) {
- interval = mConnection.getConfig().getDefaultKeepAliveInterval();
- }
- mKeepAliveMillis = (int)(interval * 1000 - 100);
- if (mKeepAliveMillis < 0) {
- ImpsLog.log("Negative keep alive time. Won't send keep-alive");
- }
- mKeepAlivePrimitive = new Primitive(ImpsTags.KeepAlive_Request);
- }
- @Override
- public long getLastActiveTime() {
- return mLastActive;
- }
- @Override
- public boolean isSendingQueueEmpty() {
- if (!mConnected || mStopped) {
- throw new IllegalStateException();
- }
- return mSendQueue.isEmpty();
- }
- public void run() {
- boolean needKeepAlive = false;
- while (!mStopped) {
- if (needKeepAlive) {
- sendKeepAlive();
- needKeepAlive = false;
- }
- Primitive primitive;
- try {
- if (mKeepAliveMillis <= 0) {
- primitive = mSendQueue.take();
- } else {
- primitive = mSendQueue.poll(mKeepAliveMillis, TimeUnit.MILLISECONDS);
- if (primitive == null) {
- if (!mStopped) {
- needKeepAlive = true;
- }
- continue;
- }
- }
- } catch (InterruptedException e) {
- if (mStopped) {
- break;
- }
- continue;
- }
- if (primitive.getType().equals(ImpsTags.Polling_Request)) {
- mHasPendingPolling.set(false);
- }
- doSendPrimitive(primitive);
- }
- mHttpClient.close();
- }
- private void sendKeepAlive() {
- ImpsTransactionManager tm = mConnection.getTransactionManager();
- AsyncTransaction tx = new AsyncTransaction(tm) {
- @Override
- public void onResponseError(ImpsErrorInfo error) {
- }
- @Override
- public void onResponseOk(Primitive response) {
- // Since we never request a new timeout value, the response
- // can be ignored
- }
- };
- tx.sendRequest(mKeepAlivePrimitive);
- }
- /**
- * Sends a primitive to the IMPS server through HTTP.
- *
- * @param p The primitive to send.
- */
- private void doSendPrimitive(Primitive p) {
- String errorInfo = null;
- int retryCount = 0;
- long retryDelay = INIT_RETRY_DELAY_MS;
- while (retryCount < MAX_RETRY_COUNT) {
- try {
- trySend(p);
- return;
- } catch (IOException e) {
- errorInfo = e.getLocalizedMessage();
- String type = p.getType();
- if (ImpsTags.Login_Request.equals(type)
- || ImpsTags.Logout_Request.equals(type)) {
- // we don't retry to send login/logout request. The request
- // might be sent to the server successfully but we failed to
- // get the response from the server. Retry in this case might
- // cause multiple login which is not allowed by some server.
- break;
- }
- if (p.getTransactionMode() == TransactionMode.Response) {
- // Ignore the failure of sending response to the server since
- // it's only an acknowledgment. When we get here, the
- // primitive might have been sent successfully but failed to
- // get the http response. The server might or might not send
- // the request again if it does not receive the acknowledgment,
- // the client is ok to either case.
- return;
- }
- retryCount++;
- // sleep for a while and retry to send the primitive in a new
- // transaction if we havn't met the max retry count.
- if (retryCount < MAX_RETRY_COUNT) {
- mTxManager.reassignTransactionId(p);
- Log.w(ImpsLog.TAG, "Send primitive failed, retry after " + retryDelay + "ms");
- synchronized (mRetryLock) {
- try {
- mRetryLock.wait(retryDelay);
- } catch (InterruptedException ignore) {
- }
- if (mStopRetry) {
- break;
- }
- }
- retryDelay = retryDelay * 2;
- if (retryDelay > MAX_RETRY_DELAY_MS) {
- retryDelay = MAX_RETRY_DELAY_MS;
- }
- }
- }
- }
- Log.w(ImpsLog.TAG, "Failed to send primitive after " + MAX_RETRY_COUNT + " retries");
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.NETWORK_ERROR, errorInfo);
- }
- private void trySend(Primitive p) throws IOException {
- ByteArrayOutputStream out = new ByteArrayOutputStream();
- try {
- mSerializer.serialize(p, out);
- } catch (SerializerException e) {
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.SERIALIZER_ERROR,
- "Internal serializer error, primitive: " + p.getType());
- out.close();
- return;
- }
- HttpPost req = new HttpPost(mPostUri);
- req.addHeader(mContentTypeHeader);
- if (mMsisdnHeader != null) {
- req.addHeader(mMsisdnHeader);
- }
- ByteArrayEntity entity = new ByteArrayEntity(out.toByteArray());
- req.setEntity(entity);
- mLastActive = SystemClock.elapsedRealtime();
- if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
- long sendBytes = entity.getContentLength() + 176 /* approx. header length */;
- ImpsLog.log(mConnection.getLoginUserName() + " >> " + p.getType() + " HTTP payload approx. " + sendBytes + " bytes");
- }
- if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
- ImpsLog.dumpRawPacket(out.toByteArray());
- ImpsLog.dumpPrimitive(p);
- }
- HttpResponse res = mHttpClient.execute(req);
- StatusLine statusLine = res.getStatusLine();
- HttpEntity resEntity = res.getEntity();
- InputStream in = resEntity.getContent();
- if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
- Log.d(ImpsLog.PACKET_TAG, statusLine.toString());
- Header[] headers = res.getAllHeaders();
- for (Header h : headers) {
- Log.d(ImpsLog.PACKET_TAG, h.toString());
- }
- int len = (int) resEntity.getContentLength();
- if (len > 0) {
- byte[] content = new byte[len];
- int offset = 0;
- int bytesRead = 0;
- do {
- bytesRead = in.read(content, offset, len);
- offset += bytesRead;
- len -= bytesRead;
- } while (bytesRead > 0);
- in.close();
- ImpsLog.dumpRawPacket(content);
- in = new ByteArrayInputStream(content);
- }
- }
- try {
- if (statusLine.getStatusCode() != HttpURLConnection.HTTP_OK) {
- mTxManager.notifyErrorResponse(p.getTransactionID(), statusLine.getStatusCode(),
- statusLine.getReasonPhrase());
- return;
- }
- if (resEntity.getContentLength() == 0) {
- // empty responses are only valid for Polling-Request or
- // server initiated transactions
- if ((p.getTransactionMode() != TransactionMode.Response)
- && !p.getType().equals(ImpsTags.Polling_Request)) {
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.ILLEGAL_SERVER_RESPONSE,
- "bad response from server");
- }
- return;
- }
- Primitive response = mParser.parse(in);
- if (Log.isLoggable(ImpsLog.PACKET_TAG, Log.DEBUG)) {
- ImpsLog.dumpPrimitive(response);
- }
- if (Log.isLoggable(ImpsLog.TAG, Log.DEBUG)) {
- long len = 2 + resEntity.getContentLength() + statusLine.toString().length() + 2;
- Header[] headers = res.getAllHeaders();
- for (Header header : headers) {
- len += header.getName().length() + header.getValue().length() + 4;
- }
- ImpsLog.log(mConnection.getLoginUserName() + " << "
- + response.getType() + " HTTP payload approx. " + len + "bytes");
- }
- if (!mReceiveQueue.offer(response)) {
- // This is almost impossible for a LinkedBlockingQueue.
- // We don't even bother to assign an error code for it.
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.UNKNOWN_ERROR, "receiving queue full");
- }
- } catch (ParserException e) {
- ImpsLog.logError(e);
- mTxManager.notifyErrorResponse(p.getTransactionID(),
- ImErrorInfo.PARSER_ERROR,
- "Parser error, received a bad response from server");
- } finally {
- //consume all the content so that the connection can be re-used.
- resEntity.consumeContent();
- }
- }
- }