stompconnect.patch
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:9k
源码类别:中间件编程
开发平台:Java

  1. Index: pom.xml
  2. ===================================================================
  3. --- pom.xml (revision 88)
  4. +++ pom.xml (working copy)
  5. @@ -3,7 +3,7 @@
  6.    <groupId>org.codehaus.stomp</groupId>
  7.    <artifactId>stompconnect</artifactId>
  8.    <packaging>jar</packaging>
  9. -  <version>1.0</version>
  10. +  <version>1.0-BT</version>
  11.  
  12.    <name>StompConnect</name>
  13.    <description>
  14. @@ -33,6 +33,30 @@
  15.    </scm>
  16.  
  17.    <dependencies>
  18. +   <dependency>
  19. + <groupId>log4j</groupId>
  20. + <artifactId>log4j</artifactId>
  21. + <version>1.2.15</version>
  22. + <exclusions>
  23. + <exclusion>
  24. + <groupId>com.sun.jmx</groupId>
  25. + <artifactId>jmxri</artifactId>
  26. + </exclusion>
  27. + <exclusion>
  28. + <groupId>com.sun.jdmk</groupId>
  29. + <artifactId>jmxtools</artifactId>
  30. + </exclusion>
  31. + <exclusion>
  32. + <groupId>javax.jms</groupId>
  33. + <artifactId>jms</artifactId>
  34. + </exclusion>
  35. + <exclusion>
  36. + <groupId>javax.mail</groupId>
  37. + <artifactId>mail</artifactId>
  38. + </exclusion>
  39. + </exclusions>
  40. + </dependency>
  41. +  
  42.      <dependency>
  43.        <groupId>commons-logging</groupId>
  44.        <artifactId>commons-logging</artifactId>
  45. Index: src/main/java/org/codehaus/stomp/jms/StompSession.java
  46. ===================================================================
  47. --- src/main/java/org/codehaus/stomp/jms/StompSession.java (revision 88)
  48. +++ src/main/java/org/codehaus/stomp/jms/StompSession.java (working copy)
  49. @@ -27,6 +27,8 @@
  50.  import java.util.HashMap;
  51.  import java.util.Iterator;
  52.  import java.util.Map;
  53. +import java.util.List;
  54. +import java.util.ArrayList;
  55.  
  56.  /**
  57.   * Represents a logical session (a parallel unit of work) within a Stomp connection
  58. @@ -37,7 +39,8 @@
  59.      private final ProtocolConverter protocolConverter;
  60.      private final Session session;
  61.      private MessageProducer producer;
  62. -    private Map<String, Destination> temporaryDestinations = new HashMap<String, Destination>();
  63. +    private static Map<String, Destination> temporaryDestinations = new HashMap<String, Destination>();
  64. +    private List<String> created = new ArrayList<String>();
  65.  
  66.      public StompSession(ProtocolConverter protocolConverter, Session session) {
  67.          this.protocolConverter = protocolConverter;
  68. @@ -61,14 +64,20 @@
  69.  
  70.      public void close() throws JMSException {
  71.          session.close();
  72. + synchronized (temporaryDestinations) {
  73. + Iterator<String> i = created.iterator();
  74. + while (i.hasNext()) {
  75. + temporaryDestinations.remove(i.next());
  76. + }
  77. + }
  78.      }
  79.  
  80.      public void sendToJms(StompFrame command) throws JMSException, ProtocolException {
  81.          Map headers = command.getHeaders();
  82.          String destinationName = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
  83.          Message message = convertFrame(command);
  84. +        Destination destination = convertDestination(destinationName, false);
  85.  
  86. -        Destination destination = convertDestination(destinationName);
  87.  
  88.          int deliveryMode = getDeliveryMode(headers);
  89.          int priority = getPriority(headers);
  90. @@ -83,7 +92,7 @@
  91.          protocolConverter.sendToStomp(frame);
  92.      }
  93.  
  94. -    public Destination convertDestination(String name) throws ProtocolException, JMSException {
  95. +    public Destination convertDestination(String name, boolean forceNew) throws ProtocolException, JMSException {
  96.          if (name == null) {
  97.              throw new ProtocolException("No destination is specified!");
  98.          }
  99. @@ -97,11 +106,22 @@
  100.          }
  101.          else if (name.startsWith("/temp-queue/")) {
  102.              String tempName = name.substring("/temp-queue/".length(), name.length());
  103. -            return temporaryDestination(tempName, session.createTemporaryQueue());
  104. +     Destination answer = temporaryDestinations.get(tempName);
  105. +
  106. +            if (forceNew || answer == null) {
  107. +             return temporaryDestination(tempName, session.createTemporaryQueue());
  108. +     } else {
  109. +     return answer;
  110. +     }
  111.          }
  112.          else if (name.startsWith("/temp-topic/")) {
  113.              String tempName = name.substring("/temp-topic/".length(), name.length());
  114. -            return temporaryDestination(tempName, session.createTemporaryTopic());
  115. +            Destination answer = temporaryDestinations.get(tempName);
  116. +            if (forceNew || answer == null) {
  117. +             return temporaryDestination(tempName, session.createTemporaryTopic());
  118. +     } else {
  119. +     return answer;
  120. +     }
  121.          }
  122.          else {
  123.              throw new ProtocolException("Illegal destination name: [" + name + "] -- StompConnect destinations " +
  124. @@ -118,6 +138,7 @@
  125.              Topic topic = (Topic) d;
  126.              if (d instanceof TemporaryTopic) {
  127.                  buffer.append("/temp-topic/");
  128. +                temporaryDestination(topic.getTopicName(), d);
  129.              }
  130.              else {
  131.                  buffer.append("/topic/");
  132. @@ -128,6 +149,7 @@
  133.              Queue queue = (Queue) d;
  134.              if (d instanceof TemporaryQueue) {
  135.                  buffer.append("/temp-queue/");
  136. +                temporaryDestination(queue.getQueueName(), d);
  137.              }
  138.              else {
  139.                  buffer.append("/queue/");
  140. @@ -139,12 +161,17 @@
  141.  
  142.  
  143.      protected synchronized Destination temporaryDestination(String tempName, Destination temporaryDestination) {
  144. -        Destination answer = temporaryDestinations.get(tempName);
  145. -        if (answer == null) {
  146. -            temporaryDestinations.put(tempName, temporaryDestination);
  147. -            answer = temporaryDestination;
  148. +        synchronized (temporaryDestinations) {
  149. +                temporaryDestinations.put(tempName, temporaryDestination);
  150. +      created.add(tempName);
  151. + try {
  152. +// System.out.println("Added: " + tempName + " for " + ((Queue)temporaryDestination).getQueueName());
  153. + //throw new Throwable();
  154. + } catch (Throwable t) {
  155. + //t.printStackTrace();
  156. + }
  157.          }
  158. -        return answer;
  159. + return temporaryDestination;
  160.      }
  161.  
  162.      protected int getDeliveryMode(Map headers) throws JMSException {
  163. @@ -223,7 +250,7 @@
  164.  
  165.          o = headers.remove(Stomp.Headers.Send.REPLY_TO);
  166.          if (o != null) {
  167. -            msg.setJMSReplyTo(convertDestination((String) o));
  168. +            msg.setJMSReplyTo(convertDestination((String) o, false));
  169.          }
  170.  
  171.          // now the general headers
  172. Index: src/main/java/org/codehaus/stomp/jms/StompSubscription.java
  173. ===================================================================
  174. --- src/main/java/org/codehaus/stomp/jms/StompSubscription.java (revision 88)
  175. +++ src/main/java/org/codehaus/stomp/jms/StompSubscription.java (working copy)
  176. @@ -53,7 +53,7 @@
  177.          Map headers = frame.getHeaders();
  178.          String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
  179.          String destinationName = (String) headers.get(Stomp.Headers.Subscribe.DESTINATION);
  180. -        destination = session.convertDestination(destinationName);
  181. +        destination = session.convertDestination(destinationName, true);
  182.          Session jmsSession = session.getSession();
  183.          boolean noLocal = false;
  184. Index: src/main/java/org/codehaus/stomp/tcp/TcpTransport.java
  185. ===================================================================
  186. --- src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (revision 88)
  187. +++ src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (working copy)
  188. @@ -99,7 +99,9 @@
  189.      /**
  190.       * A one way asynchronous send
  191.       */
  192. -    public void onStompFrame(StompFrame command) throws Exception {
  193. +    // Patched by Tom: synchronized because this method can be invoked for a subscription's
  194. +    // receipt at the same time as for the first received message.
  195. +    public synchronized void onStompFrame(StompFrame command) throws Exception {
  196.          checkStarted();
  197.          marshaller.marshal(command, dataOut);
  198.          dataOut.flush();
  199. @@ -391,10 +393,10 @@
  200.      }
  201.  
  202.      protected void initializeStreams() throws Exception {
  203. -        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
  204. -        this.dataIn = new DataInputStream(buffIn);
  205. -        TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
  206. -        this.dataOut = new DataOutputStream(buffOut);
  207. +//        TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
  208. +        this.dataIn = new DataInputStream(socket.getInputStream());//new DataInputStream(buffIn);
  209. +  //      TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
  210. +        this.dataOut = new DataOutputStream(socket.getOutputStream());//new DataOutputStream(buffOut);
  211.      }
  212.  
  213.      protected void closeStreams() throws IOException {