- package jxtamessenger.service;
- import java.io.DataInput;
- import java.io.DataInputStream;
- import java.io.IOException;
- import java.io.InputStream;
- import java.net.Socket;
- import java.net.SocketException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.logging.Logger;
- import jxtamessenger.ChatWindow;
- import jxtamessenger.MainApplicationWindow;
- import jxtamessenger.bean.ChatMessage;
- import jxtamessenger.bean.OnlineMessage;
- import jxtamessenger.util.Constants;
- import jxtamessenger.util.MiscUtil;
- import jxtamessenger.util.PipeUtil;
- import jxtamessenger.util.ThreadPoolUtil;
- import jxtamessenger.xml.XmlParser;
- import net.jxta.peergroup.PeerGroup;
- import net.jxta.pipe.PipeService;
- import net.jxta.socket.JxtaServerSocket;
- import org.eclipse.jface.viewers.TableViewer;
- import org.eclipse.swt.widgets.Display;
- import org.eclipse.swt.widgets.Text;
- public class JxtaServerSocketService implements Service {
- private static final Logger LOG = Logger.getLogger(JxtaServerSocketService.class.getName());
- private TableViewer viewer;
- private PeerGroup pg;
- public static final String JxtaServerSocketPipeAdvPrefix = "JxtaServerSocketPipeAdv.";
- private JxtaServerSocket serverSocket = null;
- private final ExecutorService pool;
- public JxtaServerSocketService(PeerGroup pg, TableViewer viewer) {
- try {
- this.viewer = viewer;
- this.pg = pg;
- this.serverSocket = new JxtaServerSocket(pg,
- PipeUtil.getPipeAdv(pg,
- JxtaServerSocketPipeAdvPrefix + MiscUtil.getHostName(),
- PipeService.UnicastType,
- true));
- } catch (IOException ioe) {
- ioe.printStackTrace(System.out);
- }
- if (this.serverSocket != null) {
- try {
- this.serverSocket.setSoTimeout(0);
- } catch (SocketException se) {
- se.printStackTrace(System.out);
- }
- }
- pool = Executors.newCachedThreadPool();
- }
- public void shutdownAndAwaitTermination() {
- ThreadPoolUtil.shutdownAndAwaitTermination(pool);
- if (this.serverSocket != null) {
- try {
- this.serverSocket.close();
- } catch (IOException ioe) {
- ioe.printStackTrace();
- }
- }
- }
- public void run() {
- try {
- for (;;) {
- pool.execute(new Handler(serverSocket.accept()));
- }
- } catch(IOException e) {
- // e.printStackTrace();
- pool.shutdown();
- } catch (Exception e) {
- // ignore
- // RejectedExecutionException(线程池关闭shutdown/saturated, 抛出java.util.concurrent.RejectedExecutionException)
- // e.printStackTrace();
- }
- }
- class Handler implements Runnable {
- private final Socket socket;
- Handler(Socket socket) { this.socket = socket; }
- public void run() {
- try {
- while(true) {
- InputStream in = socket.getInputStream();
- DataInput dis = new DataInputStream(in);
- int type = dis.readInt();
- if(type == Constants.MESSAGE_EXITWINDOW) {
- // 如果对应窗口打开,那么将该窗口禁用,该用户会话已结束
- // String hostName = dis.readUTF();
- // String userName = dis.readUTF();
- // LOG.info("host name=" + hostName);
- // if(!MainApplicationWindow.chatwin.containsKey(hostName)) {
- // ChatWindow chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(hostName);
- // if(chatWindow != null)
- // chatWindow.disableInputAndSend(userName);
- // }
- // TODO: Close socket gracefully! Memory leak!
- socket.shutdownInput();
- socket.shutdownOutput();
- // socket.close();
- break;
- } else if(type == Constants.MESSAGE_CHAT) {
- // 交谈
- int length = dis.readInt();
- StringBuffer sb = new StringBuffer();
- for(int i=0; i<length; i++)
- sb.append(dis.readUTF());
- final ChatMessage msg = (ChatMessage)XmlParser.getObject(sb.toString());
- Display.getDefault().asyncExec(new Runnable() {
- @SuppressWarnings("unchecked")
- public void run(){
- synchronized(viewer) {
- ChatWindow chatWindow = null;
- if(!MainApplicationWindow.chatwin.containsKey(msg.getHostName())) {
- OnlineMessage onlineMessage = new OnlineMessage();
- onlineMessage.setHostName(msg.getHostName());
- onlineMessage.setUserName(msg.getUserName());
- chatWindow = new ChatWindow(viewer.getControl().getShell(), pg, onlineMessage);
- MainApplicationWindow.chatwin.put(msg.getHostName(), chatWindow);
- chatWindow.open();
- } else {
- chatWindow = (ChatWindow)MainApplicationWindow.chatwin.get(msg.getHostName());
- chatWindow.getShell().forceActive();
- }
- Text up_text = chatWindow.getUp_text();
- up_text.append(msg.getUserName());
- up_text.append(":");
- up_text.append(msg.getMessage());
- up_text.append("rn");
- }
- }
- });
- } else if(type == Constants.MESSAGE_FILE) {
- // 文件传输
- } else if(type == Constants.MESSAGE_DIRECTORY) {
- // 目录传输
- } else {
- // ignore
- }
- }
- LOG.info("");
- } catch (IOException e) {
- e.printStackTrace();
- }
- }
- }
- }