JMSReceiverImpl.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:5k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.core.transport.hybrid;
  19. import java.util.Properties;
  20. import javax.jms.BytesMessage;
  21. import javax.jms.Destination;
  22. import javax.jms.JMSException;
  23. import javax.jms.MessageConsumer;
  24. import javax.jms.Queue;
  25. import javax.jms.Session;
  26. import javax.jms.TemporaryQueue;
  27. import javax.naming.NamingException;
  28. import org.apache.log4j.LogManager;
  29. import org.apache.log4j.Logger;
  30. import org.jboss.blacktie.jatmibroker.core.transport.Message;
  31. import org.jboss.blacktie.jatmibroker.core.transport.Receiver;
  32. import org.jboss.blacktie.jatmibroker.xatmi.Connection;
  33. import org.jboss.blacktie.jatmibroker.xatmi.ConnectionException;
  34. public class JMSReceiverImpl implements Receiver {
  35. private static final Logger log = LogManager
  36. .getLogger(JMSReceiverImpl.class);
  37. private Queue destination;
  38. private MessageConsumer receiver;
  39. private boolean isTemporary;
  40. private int timeout = 0;
  41. private static int pad = 0;
  42. JMSReceiverImpl(Session session, Properties properties) throws JMSException {
  43. destination = session.createTemporaryQueue();
  44. receiver = session.createConsumer(destination);
  45. isTemporary = true;
  46. timeout = Integer.parseInt(properties.getProperty("DestinationTimeout",
  47. "10")) * 1000;
  48. log.debug("Creating a consumer on: " + this.destination.getQueueName()
  49. + " with timeout: " + timeout);
  50. }
  51. JMSReceiverImpl(Session session, Destination destination,
  52. Properties properties) throws JMSException, NamingException {
  53. this.destination = (Queue) destination;
  54. receiver = session.createConsumer(destination);
  55. timeout = Integer.parseInt(properties.getProperty("DestinationTimeout",
  56. "2")) * 1000;
  57. log.debug("Creating a consumer on: " + this.destination.getQueueName()
  58. + " with timeout: " + timeout);
  59. }
  60. public Object getReplyTo() throws ConnectionException {
  61. if (isTemporary) {
  62. return destination;
  63. } else {
  64. return null;
  65. }
  66. }
  67. public Message receive(long flagsIn) throws ConnectionException {
  68. try {
  69. log.debug("Receiving from: " + destination.getQueueName());
  70. javax.jms.Message message = receiver.receive(timeout);
  71. if (message != null) {
  72. BytesMessage bytesMessage = ((BytesMessage) message);
  73. log.debug("Received from: " + destination.getQueueName());
  74. log.debug("getJMSExpiration is " + message.getJMSExpiration());
  75. org.jboss.blacktie.jatmibroker.core.transport.Message toProcess = convertFromBytesMessage(bytesMessage);
  76. return toProcess;
  77. }
  78. throw new ConnectionException(Connection.TPETIME,
  79. "Did not receive a message");
  80. } catch (JMSException t) {
  81. throw new ConnectionException(-1, "Couldn't receive the message", t);
  82. }
  83. }
  84. public void close() throws ConnectionException {
  85. log.debug("close");
  86. try {
  87. log.debug("closing consumer");
  88. receiver.close();
  89. log.debug("consumer closed");
  90. if (isTemporary) {
  91. log.debug("Deleting: " + destination.getQueueName());
  92. ((TemporaryQueue) destination).delete();
  93. log.debug("Deleted: " + destination.getQueueName());
  94. }
  95. } catch (Throwable t) {
  96. log.debug("consumer could not be closed");
  97. throw new ConnectionException(-1, "Could not delete the queue", t);
  98. }
  99. }
  100. public static org.jboss.blacktie.jatmibroker.core.transport.Message convertFromBytesMessage(
  101. BytesMessage message) throws JMSException {
  102. String controlIOR = message.getStringProperty("messagecontrol");
  103. String replyTo = message.getStringProperty("messagereplyto");
  104. int len = (int) message.getBodyLength();
  105. String serviceName = message.getStringProperty("servicename");
  106. int flags = new Integer(message.getStringProperty("messageflags"));
  107. int cd = new Integer(message.getStringProperty("messagecorrelationId"));
  108. String type = message.getStringProperty("messagetype");
  109. String subtype = message.getStringProperty("messagesubtype");
  110. log.debug("type: " + type + " subtype: " + subtype);
  111. org.jboss.blacktie.jatmibroker.core.transport.Message toProcess = new org.jboss.blacktie.jatmibroker.core.transport.Message();
  112. toProcess.type = type;
  113. toProcess.subtype = subtype;
  114. toProcess.replyTo = replyTo;
  115. toProcess.serviceName = serviceName;
  116. toProcess.flags = flags;
  117. toProcess.cd = cd;
  118. toProcess.len = len - pad;
  119. if (toProcess.type == "") {
  120. toProcess.data = null;
  121. } else {
  122. toProcess.data = new byte[toProcess.len];
  123. message.readBytes(toProcess.data);
  124. }
  125. toProcess.control = controlIOR;
  126. return toProcess;
  127. }
  128. }