ServiceDispatcher.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:3k
- /*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat, Inc., and others contributors as indicated
- * by the @authors tag. All rights reserved.
- * See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- * This copyrighted material is made available to anyone wishing to use,
- * modify, copy, or redistribute it subject to the terms and conditions
- * of the GNU Lesser General Public License, v. 2.1.
- * This program is distributed in the hope that it will be useful, but WITHOUT A
- * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
- * PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
- * You should have received a copy of the GNU Lesser General Public License,
- * v.2.1 along with this distribution; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
- * MA 02110-1301, USA.
- */
- package org.jboss.blacktie.jatmibroker.core.server;
- import org.apache.log4j.LogManager;
- import org.apache.log4j.Logger;
- import org.jboss.blacktie.jatmibroker.core.conf.ConfigurationException;
- import org.jboss.blacktie.jatmibroker.core.transport.Message;
- import org.jboss.blacktie.jatmibroker.core.transport.Receiver;
- import org.jboss.blacktie.jatmibroker.xatmi.BlacktieService;
- import org.jboss.blacktie.jatmibroker.xatmi.Connection;
- import org.jboss.blacktie.jatmibroker.xatmi.ConnectionException;
- import org.jboss.blacktie.jatmibroker.xatmi.Response;
- import org.jboss.blacktie.jatmibroker.xatmi.Service;
- import org.jboss.blacktie.jatmibroker.xatmi.TPSVCINFO;
- public class ServiceDispatcher extends Service implements Runnable {
- private static final Logger log = LogManager
- .getLogger(ServiceDispatcher.class);
- private BlacktieService callback;
- private Receiver receiver;
- private Thread thread;
- private volatile boolean closed;
- ServiceDispatcher(String serviceName, BlacktieService callback,
- Receiver receiver) throws ConfigurationException,
- ConnectionException {
- super(serviceName);
- this.callback = callback;
- this.receiver = receiver;
- thread = new Thread(this, serviceName + "-Dispatcher");
- thread.start();
- log.debug("Created: " + thread.getName());
- }
- public void run() {
- log.debug("Running");
- try {
- while (!closed) {
- try {
- Message message = receiver.receive(0);
- log.trace("Received");
- try {
- this.processMessage(message);
- log.trace("Processed");
- } catch (Throwable t) {
- log.error("Can't process the message", t);
- }
- } catch (ConnectionException e) {
- if (closed) {
- throw e;
- }
- if (e.getTperrno() == Connection.TPETIME) {
- log.debug("Got a timeout");
- } else {
- throw e;
- }
- }
- }
- } catch (Throwable t) {
- if (!closed) {
- log
- .error("Could not receive the message: "
- + t.getMessage(), t);
- } else {
- log.debug("Did not receive the message during shutdown: "
- + t.getMessage(), t);
- }
- }
- }
- public void startClose() {
- closed = true;
- log.trace("Closed set");
- }
- public void close() throws ConnectionException {
- log.trace("closing");
- try {
- log.trace("Joining");
- thread.join();
- log.trace("Joined");
- } catch (InterruptedException e) {
- log.error("Could not join the dispatcher", e);
- }
- super.close();
- log.trace("closed");
- }
- public Response tpservice(TPSVCINFO svcinfo) {
- log.trace("Invoking callback");
- return callback.tpservice(svcinfo);
- }
- }