Service.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:8k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.xatmi;
  19. import java.util.Properties;
  20. import org.apache.log4j.LogManager;
  21. import org.apache.log4j.Logger;
  22. import org.jboss.blacktie.jatmibroker.core.conf.AtmiBrokerClientXML;
  23. import org.jboss.blacktie.jatmibroker.core.conf.ConfigurationException;
  24. import org.jboss.blacktie.jatmibroker.core.transport.JtsTransactionImple;
  25. import org.jboss.blacktie.jatmibroker.core.transport.Message;
  26. import org.jboss.blacktie.jatmibroker.core.transport.Sender;
  27. import org.jboss.blacktie.jatmibroker.core.transport.Transport;
  28. import org.jboss.blacktie.jatmibroker.core.transport.TransportFactory;
  29. import org.jboss.blacktie.jatmibroker.jab.JABException;
  30. import org.jboss.blacktie.jatmibroker.jab.JABTransaction;
  31. /**
  32.  * All services should extend this class as it provides the core service
  33.  * template method.
  34.  */
  35. public abstract class Service implements BlacktieService {
  36. /**
  37.  * The logger to use.
  38.  */
  39. private static final Logger log = LogManager.getLogger(Service.class);
  40. /**
  41.  * The transport to use.
  42.  */
  43. private Transport transport;
  44. /**
  45.  * The properties to use
  46.  */
  47. private Properties properties;
  48. /**
  49.  * The service needs the name of the service so that it can be resolved in
  50.  * the btconfig.xml file
  51.  * 
  52.  * @param name
  53.  *            The name of the service
  54.  * @throws ConfigurationException
  55.  * @throws ConnectionException
  56.  */
  57. public Service(String name) throws ConfigurationException,
  58. ConnectionException {
  59. AtmiBrokerClientXML xml = new AtmiBrokerClientXML();
  60. properties = xml.getProperties();
  61. transport = TransportFactory.loadTransportFactory(name, properties)
  62. .createTransport();
  63. log.debug("Service created: " + name);
  64. }
  65. /**
  66.  * Clean up resources for this service
  67.  * 
  68.  * @throws ConnectionException
  69.  */
  70. public void close() throws ConnectionException {
  71. transport.close();
  72. }
  73. /**
  74.  * Entry points should pass control to this method as soon as reasonably
  75.  * possible.
  76.  * 
  77.  * @param message
  78.  *            The message to process
  79.  * @throws ConnectionException
  80.  *             In case communication fails
  81.  */
  82. protected void processMessage(Message message) throws ConnectionException {
  83. if (message.control != null) {
  84. try {
  85. JABTransaction.associateTx(message.control); // associate tx
  86. // with current
  87. // thread
  88. } catch (JABException e) {
  89. log.warn("Got an invalid tx from queue: " + e);
  90. }
  91. }
  92. if (JtsTransactionImple.hasTransaction()) {
  93. log
  94. .error("Blacktie MDBs must not be called with a transactional context");
  95. } else {
  96. log.trace("Service invoked");
  97. }
  98. log.trace("obtained transport");
  99. Sender sender = null;
  100. boolean hasTPNOREPLY = (message.flags & Connection.TPNOREPLY) == Connection.TPNOREPLY;
  101. if (!hasTPNOREPLY && message.replyTo != null
  102. && !message.replyTo.equals("")) {
  103. sender = transport.createSender(message.replyTo);
  104. } else {
  105. log.trace("NO REPLY TO REQUIRED");
  106. }
  107. Session session = null;
  108. try {
  109. boolean hasTPCONV = (message.flags & Connection.TPCONV) == Connection.TPCONV;
  110. if (hasTPCONV) {
  111. session = new Session(properties, transport, message.cd, sender);
  112. log.debug("Created the session");
  113. int olen = 4;
  114. X_OCTET odata = new X_OCTET();
  115. odata.setByteArray("ACK".getBytes());
  116. long result = session.tpsend(odata, olen, 0);
  117. if (result == -1) {
  118. log.debug("Could not send ack");
  119. session.close();
  120. return;
  121. } else {
  122. log.debug("Sent ack");
  123. session.setCreatedState(message.flags);
  124. }
  125. } else {
  126. log.debug("cd not being set");
  127. }
  128. // THIS IS THE FIRST CALL
  129. Buffer buffer = null;
  130. if (message.type != null && !message.type.equals("")) {
  131. if (message.type.equals("X_OCTET")) {
  132. log.debug("Initializing a new X_OCTET");
  133. buffer = new X_OCTET(message.data);
  134. } else if (message.type.equals("X_C_TYPE")) {
  135. log.debug("Initializing a new X_C_TYPE");
  136. buffer = new X_C_TYPE(message.subtype, properties,
  137. message.data);
  138. } else {
  139. log.debug("Initializing a new X_COMMON");
  140. buffer = new X_COMMON(message.subtype, properties,
  141. message.data);
  142. }
  143. }
  144. // TODO NO SESSIONS
  145. // NOT PASSING OVER THE SERVICE NAME
  146. TPSVCINFO tpsvcinfo = new TPSVCINFO(message.serviceName, buffer,
  147. message.flags, session, properties);
  148. log.debug("Prepared the data for passing to the service");
  149. boolean hasTx = (message.control != null && message.control
  150. .length() != 0);
  151. log.debug("hasTx=" + hasTx + " ior: " + message.control);
  152. if (hasTx) // make sure any foreign tx is resumed before calling the
  153. // service routine
  154. JtsTransactionImple.resume(message.control);
  155. log.debug("Invoking the XATMI service");
  156. Response response = null;
  157. try {
  158. response = tpservice(tpsvcinfo);
  159. log.debug("Service invoked");
  160. if (!hasTPNOREPLY && response == null) {
  161. log.error("Error, expected response but none returned");
  162. response = new Response(Connection.TPFAIL,
  163. Connection.TPESVCERR, null, 0, 0);
  164. }
  165. } catch (Throwable t) {
  166. log.error("Service error detected", t);
  167. response = new Response(Connection.TPFAIL,
  168. Connection.TPESVCERR, null, 0, 0);
  169. }
  170. if (hasTx) // and suspend it again
  171. JtsTransactionImple.suspend();
  172. if (sender != null && response != null) {
  173. log.trace("Sending response");
  174. int rcode = response.rcode;
  175. if (rcode == Connection.TPESVCERR) {
  176. if (JABTransaction.current() != null) {
  177. try {
  178. JABTransaction.current().rollback_only();
  179. } catch (JABException e) {
  180. throw new ConnectionException(Connection.TPESYSTEM,
  181. "Could not mark transaction for rollback only");
  182. }
  183. }
  184. }
  185. short rval = response.getRval();
  186. if (rval != Connection.TPSUCCESS && rval != Connection.TPFAIL) {
  187. rval = Connection.TPFAIL;
  188. }
  189. if (rval == Connection.TPFAIL) {
  190. if (JABTransaction.current() != null) {
  191. try {
  192. JABTransaction.current().rollback_only();
  193. } catch (JABException e) {
  194. throw new ConnectionException(Connection.TPESYSTEM,
  195. "Could not mark transaction for rollback only");
  196. }
  197. }
  198. }
  199. Buffer toSend = response.getBuffer();
  200. int len = response.getLen();
  201. String type = null;
  202. String subtype = null;
  203. byte[] data = null;
  204. if (toSend != null) {
  205. data = toSend.serialize();
  206. type = toSend.getType();
  207. subtype = toSend.getSubtype();
  208. if (!type.equals("X_OCTET")) {
  209. len = data.length;
  210. }
  211. }
  212. log.debug("Returning desired message");
  213. sender.send("", rval, rcode, data, len, response.getFlags(), 0,
  214. 0, type, subtype);
  215. } else if (sender == null && response != null) {
  216. log.error("No sender avaible but message to be sent");
  217. } else if (sender != null && response == null) {
  218. log.error("Returning error - marking tx as rollback only if ");
  219. if (JABTransaction.current() != null) {
  220. try {
  221. JABTransaction.current().rollback_only();
  222. } catch (JABException e) {
  223. throw new ConnectionException(Connection.TPESYSTEM,
  224. "Could not mark transaction for rollback only");
  225. }
  226. }
  227. log.debug("Returning failed message");
  228. sender.send("", Connection.TPFAIL, Connection.TPESVCERR, null,
  229. 0, 0, 0, 0, null, null);
  230. log.error("Returned error");
  231. } else {
  232. log.debug("No need to send a response");
  233. }
  234. } finally {
  235. if (session != null) {
  236. session.close();
  237. } else if (sender != null) {
  238. sender.close();
  239. }
  240. }
  241. }
  242. }