stompconnect.patch
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:9k
- Index: pom.xml
- ===================================================================
- --- pom.xml (revision 88)
- +++ pom.xml (working copy)
- @@ -3,7 +3,7 @@
- <groupId>org.codehaus.stomp</groupId>
- <artifactId>stompconnect</artifactId>
- <packaging>jar</packaging>
- - <version>1.0</version>
- + <version>1.0-BT</version>
-
- <name>StompConnect</name>
- <description>
- @@ -33,6 +33,30 @@
- </scm>
-
- <dependencies>
- + <dependency>
- + <groupId>log4j</groupId>
- + <artifactId>log4j</artifactId>
- + <version>1.2.15</version>
- + <exclusions>
- + <exclusion>
- + <groupId>com.sun.jmx</groupId>
- + <artifactId>jmxri</artifactId>
- + </exclusion>
- + <exclusion>
- + <groupId>com.sun.jdmk</groupId>
- + <artifactId>jmxtools</artifactId>
- + </exclusion>
- + <exclusion>
- + <groupId>javax.jms</groupId>
- + <artifactId>jms</artifactId>
- + </exclusion>
- + <exclusion>
- + <groupId>javax.mail</groupId>
- + <artifactId>mail</artifactId>
- + </exclusion>
- + </exclusions>
- + </dependency>
- +
- <dependency>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- Index: src/main/java/org/codehaus/stomp/jms/StompSession.java
- ===================================================================
- --- src/main/java/org/codehaus/stomp/jms/StompSession.java (revision 88)
- +++ src/main/java/org/codehaus/stomp/jms/StompSession.java (working copy)
- @@ -27,6 +27,8 @@
- import java.util.HashMap;
- import java.util.Iterator;
- import java.util.Map;
- +import java.util.List;
- +import java.util.ArrayList;
-
- /**
- * Represents a logical session (a parallel unit of work) within a Stomp connection
- @@ -37,7 +39,8 @@
- private final ProtocolConverter protocolConverter;
- private final Session session;
- private MessageProducer producer;
- - private Map<String, Destination> temporaryDestinations = new HashMap<String, Destination>();
- + private static Map<String, Destination> temporaryDestinations = new HashMap<String, Destination>();
- + private List<String> created = new ArrayList<String>();
-
- public StompSession(ProtocolConverter protocolConverter, Session session) {
- this.protocolConverter = protocolConverter;
- @@ -61,14 +64,20 @@
-
- public void close() throws JMSException {
- session.close();
- + synchronized (temporaryDestinations) {
- + Iterator<String> i = created.iterator();
- + while (i.hasNext()) {
- + temporaryDestinations.remove(i.next());
- + }
- + }
- }
-
- public void sendToJms(StompFrame command) throws JMSException, ProtocolException {
- Map headers = command.getHeaders();
- String destinationName = (String) headers.remove(Stomp.Headers.Send.DESTINATION);
- Message message = convertFrame(command);
- + Destination destination = convertDestination(destinationName, false);
-
- - Destination destination = convertDestination(destinationName);
-
- int deliveryMode = getDeliveryMode(headers);
- int priority = getPriority(headers);
- @@ -83,7 +92,7 @@
- protocolConverter.sendToStomp(frame);
- }
-
- - public Destination convertDestination(String name) throws ProtocolException, JMSException {
- + public Destination convertDestination(String name, boolean forceNew) throws ProtocolException, JMSException {
- if (name == null) {
- throw new ProtocolException("No destination is specified!");
- }
- @@ -97,11 +106,22 @@
- }
- else if (name.startsWith("/temp-queue/")) {
- String tempName = name.substring("/temp-queue/".length(), name.length());
- - return temporaryDestination(tempName, session.createTemporaryQueue());
- + Destination answer = temporaryDestinations.get(tempName);
- +
- + if (forceNew || answer == null) {
- + return temporaryDestination(tempName, session.createTemporaryQueue());
- + } else {
- + return answer;
- + }
- }
- else if (name.startsWith("/temp-topic/")) {
- String tempName = name.substring("/temp-topic/".length(), name.length());
- - return temporaryDestination(tempName, session.createTemporaryTopic());
- + Destination answer = temporaryDestinations.get(tempName);
- + if (forceNew || answer == null) {
- + return temporaryDestination(tempName, session.createTemporaryTopic());
- + } else {
- + return answer;
- + }
- }
- else {
- throw new ProtocolException("Illegal destination name: [" + name + "] -- StompConnect destinations " +
- @@ -118,6 +138,7 @@
- Topic topic = (Topic) d;
- if (d instanceof TemporaryTopic) {
- buffer.append("/temp-topic/");
- + temporaryDestination(topic.getTopicName(), d);
- }
- else {
- buffer.append("/topic/");
- @@ -128,6 +149,7 @@
- Queue queue = (Queue) d;
- if (d instanceof TemporaryQueue) {
- buffer.append("/temp-queue/");
- + temporaryDestination(queue.getQueueName(), d);
- }
- else {
- buffer.append("/queue/");
- @@ -139,12 +161,17 @@
-
-
- protected synchronized Destination temporaryDestination(String tempName, Destination temporaryDestination) {
- - Destination answer = temporaryDestinations.get(tempName);
- - if (answer == null) {
- - temporaryDestinations.put(tempName, temporaryDestination);
- - answer = temporaryDestination;
- + synchronized (temporaryDestinations) {
- + temporaryDestinations.put(tempName, temporaryDestination);
- + created.add(tempName);
- + try {
- +// System.out.println("Added: " + tempName + " for " + ((Queue)temporaryDestination).getQueueName());
- + //throw new Throwable();
- + } catch (Throwable t) {
- + //t.printStackTrace();
- + }
- }
- - return answer;
- + return temporaryDestination;
- }
-
- protected int getDeliveryMode(Map headers) throws JMSException {
- @@ -223,7 +250,7 @@
-
- o = headers.remove(Stomp.Headers.Send.REPLY_TO);
- if (o != null) {
- - msg.setJMSReplyTo(convertDestination((String) o));
- + msg.setJMSReplyTo(convertDestination((String) o, false));
- }
-
- // now the general headers
- Index: src/main/java/org/codehaus/stomp/jms/StompSubscription.java
- ===================================================================
- --- src/main/java/org/codehaus/stomp/jms/StompSubscription.java (revision 88)
- +++ src/main/java/org/codehaus/stomp/jms/StompSubscription.java (working copy)
- @@ -53,7 +53,7 @@
- Map headers = frame.getHeaders();
- String selector = (String) headers.remove(Stomp.Headers.Subscribe.SELECTOR);
- String destinationName = (String) headers.get(Stomp.Headers.Subscribe.DESTINATION);
- - destination = session.convertDestination(destinationName);
- + destination = session.convertDestination(destinationName, true);
- Session jmsSession = session.getSession();
- boolean noLocal = false;
- Index: src/main/java/org/codehaus/stomp/tcp/TcpTransport.java
- ===================================================================
- --- src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (revision 88)
- +++ src/main/java/org/codehaus/stomp/tcp/TcpTransport.java (working copy)
- @@ -99,7 +99,9 @@
- /**
- * A one way asynchronous send
- */
- - public void onStompFrame(StompFrame command) throws Exception {
- + // Patched by Tom: synchronized because this method can be invoked by the receipt for a
- + // subscription at the same time as the first message is received.
- + public synchronized void onStompFrame(StompFrame command) throws Exception {
- checkStarted();
- marshaller.marshal(command, dataOut);
- dataOut.flush();
- @@ -391,10 +393,10 @@
- }
-
- protected void initializeStreams() throws Exception {
- - TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
- - this.dataIn = new DataInputStream(buffIn);
- - TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- - this.dataOut = new DataOutputStream(buffOut);
- +// TcpBufferedInputStream buffIn = new TcpBufferedInputStream(socket.getInputStream(), ioBufferSize);
- + this.dataIn = new DataInputStream(socket.getInputStream());//new DataInputStream(buffIn);
- + // TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(), ioBufferSize);
- + this.dataOut = new DataOutputStream(socket.getOutputStream());//new DataOutputStream(buffOut);
- }
-
- protected void closeStreams() throws IOException {