AsyncTopicSubscriber.java
上传用户:dinglihq
上传日期:2013-02-04
资源大小:99958k
文件大小:4k
源码类别:

Java编程

开发平台:

Java

  1. package bible.jms;
  2. import javax.jms.*;
  3. import javax.naming.*;
  4. import java.util.*;
  5. /**
  6.  * Class AsyncTopicSubscriber
  7.  *
  8.  *
  9.  * @author
  10.  * @version %I%, %G%
  11.  */
  12. public class AsyncTopicSubscriber implements MessageListener {
  13.   // Instance variables.
  14.   private boolean                finished           = false;
  15.   private Context                ctx                = null;
  16.   private Hashtable              ht                 = null;
  17.   private TopicConnectionFactory tConnectionFactory = null;
  18.   private TopicConnection        tConnection        = null;
  19.   private TopicSession           tSession           = null;
  20.   private TopicSubscriber        tSubscriber        = null;
  21.   private Topic                  t                  = null;
  22.   /**
  23.    * Method init
  24.    *
  25.    *
  26.    */
  27.   public void init() {
  28.     try {
  29.       // Obtain references to JMS queue components.
  30.       ht = new Hashtable();
  31.       ht.put(Context.INITIAL_CONTEXT_FACTORY,
  32.              "weblogic.jndi.WLInitialContextFactory");
  33.       ht.put(Context.PROVIDER_URL, "t3://localhost:7001");
  34.       ctx                = new InitialContext(ht);
  35.       tConnectionFactory =
  36.         (TopicConnectionFactory) ctx.lookup("BibleJMSFactory");
  37.       tConnection        = tConnectionFactory.createTopicConnection();
  38.       tSession           = tConnection.createTopicSession(false,
  39.         javax.jms.QueueSession.AUTO_ACKNOWLEDGE);
  40.       t                  = (Topic) ctx.lookup("BibleJMSTopic");
  41.       tSubscriber        = tSession.createSubscriber(t);
  42.       tSubscriber.setMessageListener(this);
  43.       // Must start the connection in order to listen.
  44.       tConnection.start();
  45.     } catch (Exception e) {
  46.       e.printStackTrace();
  47.     }
  48.   }
  49.   /**
  50.    * Method close
  51.    *
  52.    *
  53.    * @throws JMSException
  54.    *
  55.    */
  56.   public void close() throws JMSException {
  57.     try {
  58.       // Release JMS resources.
  59.       tSubscriber.close();
  60.       tSession.close();
  61.       tConnection.close();
  62.     } catch (Exception e) {
  63.       e.printStackTrace();
  64.     }
  65.   }
  66.   /**
  67.    * Method onMessage
  68.    *
  69.    *
  70.    * @param message
  71.    *
  72.    */
  73.   public void onMessage(Message message) {
  74.     if (message instanceof BytesMessage) {
  75.       BytesMessage bytesMessage = (BytesMessage) message;
  76.       // Process bytesMessage here
  77.     } else {
  78.       if (message instanceof MapMessage) {
  79.         MapMessage mapMessage = (MapMessage) message;
  80.         // Process mapMessage here
  81.       } else {
  82.         if (message instanceof ObjectMessage) {
  83.           ObjectMessage objectMessage = (ObjectMessage) message;
  84.           // Process objectMessage here
  85.         } else {
  86.           if (message instanceof StreamMessage) {
  87.             StreamMessage streamMessage = (StreamMessage) message;
  88.             // Process streamMessage here
  89.           } else {
  90.             if (message instanceof TextMessage) {
  91.               try {
  92.                 TextMessage textMessage = (TextMessage) message;
  93.                 // Process textMessage here
  94.                 String msg = textMessage.getText();
  95.                 System.out.println("Received message: " + msg);
  96.                 if (msg.equals("Stop")) {
  97.                   synchronized (this) {
  98.                     finished = true;
  99.                     this.notifyAll();  // Notify main thread to quit
  100.                   }
  101.                 }
  102.               } catch (Exception e) {
  103.                 e.printStackTrace();
  104.               }
  105.             }
  106.           }
  107.         }
  108.       }
  109.     }
  110.   }
  111.   /**
  112.    * Method main
  113.    *
  114.    *
  115.    * @param args
  116.    *
  117.    */
  118.   public static void main(String[] args) {
  119.     try {
  120.       AsyncTopicSubscriber ts = new AsyncTopicSubscriber();
  121.       ts.init();
  122.       System.out.println("Ready to receive messages....");
  123.       // Receive text messages until one is equal to "Stop"
  124.       synchronized (ts) {
  125.         while (!ts.finished) {
  126.           try {
  127.             ts.wait();
  128.           } catch (InterruptedException ie) {}
  129.         }
  130.       }
  131.       ts.close();
  132.     } catch (Exception e) {
  133.       e.printStackTrace();
  134.     }
  135.   }
  136. }
  137. /*--- Formatted in Bible Style on Thu, Sep 6, '01 ---*/
  138. /*------ Formatted by Jindent 3.24 Gold 1.02 --- http://www.jindent.de ------*/