JMSSenderImpl.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:5k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.core.transport.hybrid;
  19. import javax.jms.BytesMessage;
  20. import javax.jms.Destination;
  21. import javax.jms.JMSException;
  22. import javax.jms.MessageProducer;
  23. import javax.jms.Queue;
  24. import javax.jms.Session;
  25. import javax.jms.Topic;
  26. import javax.naming.NamingException;
  27. import org.apache.log4j.LogManager;
  28. import org.apache.log4j.Logger;
  29. import org.jboss.blacktie.jatmibroker.core.transport.JtsTransactionImple;
  30. import org.jboss.blacktie.jatmibroker.core.transport.Sender;
  31. import org.jboss.blacktie.jatmibroker.xatmi.Connection;
  32. import org.jboss.blacktie.jatmibroker.xatmi.ConnectionException;
  33. public class JMSSenderImpl implements Sender {
  34. private static final Logger log = LogManager.getLogger(JMSSenderImpl.class);
  35. private MessageProducer sender;
  36. private Session session;
  37. private String name;
  38. boolean service;
  39. private boolean closed;
  40. private Destination destination;
  41. private int pad = 0;
  42. JMSSenderImpl(Session session, Destination destination)
  43. throws NamingException, JMSException {
  44. this.session = session;
  45. sender = session.createProducer(destination);
  46. if (destination instanceof Queue) {
  47. this.name = ((Queue) destination).getQueueName();
  48. } else {
  49. this.name = ((Topic) destination).getTopicName();
  50. }
  51. this.destination = destination;
  52. service = true;
  53. log.debug("Sender Created: " + name);
  54. }
  55. public void send(Object replyTo, short rval, int rcode, byte[] data,
  56. int len, int correlationId, int flags, int ttl, String type,
  57. String subtype) throws ConnectionException {
  58. if (closed) {
  59. throw new ConnectionException(Connection.TPEPROTO, "Sender closed");
  60. }
  61. if (data == null) {
  62. data = new byte[1];
  63. len = 1;
  64. }
  65. if (len < 1) {
  66. throw new ConnectionException(Connection.TPEINVAL,
  67. "Length of buffer must be greater than 0");
  68. }
  69. if (name != null) {
  70. log.debug("Sender sending: " + name);
  71. }
  72. try {
  73. BytesMessage message = session.createBytesMessage();
  74. String ior = JtsTransactionImple.getTransactionIOR();
  75. message.setStringProperty("messagecontrol", ior);
  76. log.debug("Sender sending IOR: " + ior);
  77. if (replyTo != null) {
  78. message.setStringProperty("messagereplyto", (String) replyTo);
  79. }
  80. if (service) {
  81. message.setStringProperty("servicename", name);
  82. }
  83. message.setStringProperty("messagecorrelationId", String
  84. .valueOf(correlationId));
  85. message.setStringProperty("messageflags", String.valueOf(flags));
  86. message.setStringProperty("messagerval", String.valueOf(rval));
  87. message.setStringProperty("messagercode", String.valueOf(rcode));
  88. message.setStringProperty("messagetype", type == null ? "" : type);
  89. message.setStringProperty("messagesubtype", subtype == null ? ""
  90. : subtype);
  91. byte[] toSend = new byte[len + pad];
  92. if (data != null) {
  93. int min = Math.min(toSend.length, data.length);
  94. System.arraycopy(data, 0, toSend, 0, min);
  95. }
  96. message.writeBytes(toSend, 0, toSend.length);
  97. if (ttl > 0) {
  98. int deliveryMode = message.getJMSDeliveryMode();
  99. int priority = message.getJMSPriority();
  100. log.debug("send message with time-to-live " + ttl);
  101. sender.send(message, deliveryMode, priority, ttl);
  102. } else {
  103. sender.send(message);
  104. }
  105. log.debug("sent message");
  106. } catch (JMSException e) {
  107. throw new ConnectionException(-1, "Could not send the message: "
  108. + e.getMessage(), e);
  109. }
  110. }
  111. public void close() throws ConnectionException {
  112. try {
  113. log.debug("Sender closing: " + name);
  114. sender.close();
  115. sender = null;
  116. closed = true;
  117. log.debug("Sender closed: " + name);
  118. } catch (Throwable t) {
  119. throw new ConnectionException(-1, "Could not send the message", t);
  120. }
  121. }
  122. public Object getSendTo() {
  123. return destination;
  124. }
  125. }