JxtaServerSocketService.java
上传用户:damzkj
上传日期:2022-01-07
资源大小:24k
文件大小:5k
源码类别:

P2P编程

开发平台:

Java

  1. package jxtamessenger.service;
  2. import java.io.DataInput;
  3. import java.io.DataInputStream;
  4. import java.io.IOException;
  5. import java.io.InputStream;
  6. import java.net.Socket;
  7. import java.net.SocketException;
  8. import java.util.concurrent.ExecutorService;
  9. import java.util.concurrent.Executors;
  10. import java.util.logging.Logger;
  11. import jxtamessenger.ChatWindow;
  12. import jxtamessenger.MainApplicationWindow;
  13. import jxtamessenger.bean.ChatMessage;
  14. import jxtamessenger.bean.OnlineMessage;
  15. import jxtamessenger.util.Constants;
  16. import jxtamessenger.util.MiscUtil;
  17. import jxtamessenger.util.PipeUtil;
  18. import jxtamessenger.util.ThreadPoolUtil;
  19. import jxtamessenger.xml.XmlParser;
  20. import net.jxta.peergroup.PeerGroup;
  21. import net.jxta.pipe.PipeService;
  22. import net.jxta.socket.JxtaServerSocket;
  23. import org.eclipse.jface.viewers.TableViewer;
  24. import org.eclipse.swt.widgets.Display;
  25. import org.eclipse.swt.widgets.Text;
  26. public class JxtaServerSocketService implements Service {
  27. private static final Logger LOG = Logger.getLogger(JxtaServerSocketService.class.getName());
  28. private TableViewer viewer;
  29. private PeerGroup pg;
  30. public static final String JxtaServerSocketPipeAdvPrefix = "JxtaServerSocketPipeAdv.";
  31.     private JxtaServerSocket serverSocket = null;
  32. private final ExecutorService pool;
  33. public JxtaServerSocketService(PeerGroup pg, TableViewer viewer) {
  34.         try {
  35.          this.viewer = viewer;
  36.          this.pg = pg;
  37.             this.serverSocket = new JxtaServerSocket(pg, 
  38.              PipeUtil.getPipeAdv(pg, 
  39.              JxtaServerSocketPipeAdvPrefix + MiscUtil.getHostName(), 
  40.              PipeService.UnicastType, 
  41.              true));
  42.         } catch (IOException ioe) {
  43.             ioe.printStackTrace(System.out);
  44.         }
  45.         
  46.         if (this.serverSocket != null) {
  47.             try {
  48.                 this.serverSocket.setSoTimeout(0);
  49.             } catch (SocketException se) {
  50.                 se.printStackTrace(System.out);
  51.             }
  52.         }
  53.         
  54. pool = Executors.newCachedThreadPool();
  55. }
  56. public void shutdownAndAwaitTermination() {
  57. ThreadPoolUtil.shutdownAndAwaitTermination(pool);
  58. if (this.serverSocket != null) {
  59.             try {
  60.                 this.serverSocket.close();
  61.             } catch (IOException ioe) {
  62.                 ioe.printStackTrace();
  63.             }
  64. }
  65. }
  66. public void run() {
  67. try {
  68. for (;;) {
  69. pool.execute(new Handler(serverSocket.accept()));
  70. }
  71. } catch(IOException e) {
  72. // e.printStackTrace();
  73. pool.shutdown();
  74. } catch (Exception e) {
  75. // ignore
  76. // RejectedExecutionException(线程池关闭shutdown/saturated, 抛出java.util.concurrent.RejectedExecutionException)
  77. //  e.printStackTrace();
  78. }
  79. }
  80. class Handler implements Runnable {
  81.     private final Socket socket;
  82.     Handler(Socket socket) { this.socket = socket; }
  83.     public void run() {
  84. try {
  85. while(true) {
  86. InputStream in = socket.getInputStream();
  87. DataInput dis = new DataInputStream(in);
  88. int type = dis.readInt();
  89. if(type == Constants.MESSAGE_EXITWINDOW) {
  90. // 如果对应窗口打开,那么将该窗口禁用,该用户会话已结束
  91. // String hostName = dis.readUTF();
  92. // String userName = dis.readUTF();
  93. // LOG.info("host name=" + hostName);
  94. // if(!MainApplicationWindow.chatwin.containsKey(hostName)) {
  95. // ChatWindow chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(hostName);
  96. // if(chatWindow != null)
  97. // chatWindow.disableInputAndSend(userName);
  98. // }
  99. // TODO: Close socket gracefully! Memory leak!
  100. socket.shutdownInput();
  101. socket.shutdownOutput();
  102. // socket.close();
  103. break;
  104. } else if(type == Constants.MESSAGE_CHAT) {
  105. // 交谈
  106. int length = dis.readInt();
  107. StringBuffer sb = new StringBuffer();
  108. for(int i=0; i<length; i++)
  109. sb.append(dis.readUTF());
  110. final ChatMessage msg = (ChatMessage)XmlParser.getObject(sb.toString());
  111. Display.getDefault().asyncExec(new Runnable() {
  112. @SuppressWarnings("unchecked")
  113. public void run(){
  114. synchronized(viewer) {
  115. ChatWindow chatWindow = null;
  116.                      if(!MainApplicationWindow.chatwin.containsKey(msg.getHostName())) {
  117.                      OnlineMessage onlineMessage = new OnlineMessage();
  118.                      onlineMessage.setHostName(msg.getHostName());
  119.                      onlineMessage.setUserName(msg.getUserName());
  120.                  chatWindow = new ChatWindow(viewer.getControl().getShell(), pg, onlineMessage);
  121.                  MainApplicationWindow.chatwin.put(msg.getHostName(), chatWindow);
  122.                  chatWindow.open();
  123.                      } else {
  124.                      chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(msg.getHostName());
  125.                      chatWindow.getShell().forceActive();
  126.                      }
  127.                      Text up_text = chatWindow.getUp_text();
  128.                      up_text.append(msg.getUserName());
  129.          up_text.append(":");
  130.          up_text.append(msg.getMessage());
  131.          up_text.append("rn");
  132. }
  133. }
  134. });
  135. } else if(type == Constants.MESSAGE_FILE) {
  136. // 文件传输
  137. } else if(type == Constants.MESSAGE_DIRECTORY) {
  138. // 目录传输
  139. } else {
  140. // ignore
  141. }
  142. }
  143. LOG.info("");
  144. } catch (IOException e) {
  145. e.printStackTrace();
  146. }
  147.     }
  148. }
  149. }