Session.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:10k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.xatmi;
  19. import java.util.Properties;
  20. import org.apache.log4j.LogManager;
  21. import org.apache.log4j.Logger;
  22. import org.jboss.blacktie.jatmibroker.core.conf.ConfigurationException;
  23. import org.jboss.blacktie.jatmibroker.core.transport.EventListener;
  24. import org.jboss.blacktie.jatmibroker.core.transport.Message;
  25. import org.jboss.blacktie.jatmibroker.core.transport.Receiver;
  26. import org.jboss.blacktie.jatmibroker.core.transport.Sender;
  27. import org.jboss.blacktie.jatmibroker.core.transport.Transport;
  28. import org.jboss.blacktie.jatmibroker.jab.JABException;
  29. import org.jboss.blacktie.jatmibroker.jab.JABTransaction;
  30. /**
  31.  * This is the session to send data on.
  32.  */
  33. public class Session {
  34. private static final Logger log = LogManager.getLogger(Session.class);
  35. /**
  36.  * The transport to manage data on
  37.  */
  38. private Transport transport;
  39. /**
  40.  * The descriptor
  41.  */
  42. private int cd;
  43. /**
  44.  * The sessions sender
  45.  */
  46. private Sender sender;
  47. /**
  48.  * The sessions receiver
  49.  */
  50. private Receiver receiver;
  51. /**
  52.  * The event listener allows us to hear events on tpsend
  53.  */
  54. private EventListener eventListener;
  55. /**
  56.  * The last event received by the session, this is either discon, SUCC,
  57.  * FAIL, ERR
  58.  */
  59. private long lastEvent = -1;
  60. /**
  61.  * The last rcode
  62.  */
  63. private int lastRCode = 0;
  64. /**
  65.  * Is the session in read mode.
  66.  */
  67. private boolean canSend = true;
  68. /**
  69.  * Is the session in write mode, so to speak.
  70.  */
  71. private boolean canRecv = true;
  72. private Properties properties;
  73. /**
  74.  * Create a new session
  75.  * 
  76.  * @param transport
  77.  * @param cd
  78.  * @param c
  79.  * @param b
  80.  * @throws ConfigurationException
  81.  */
  82. Session(Properties properties, Transport transport, int cd)
  83. throws ConnectionException {
  84. this.properties = properties;
  85. this.transport = transport;
  86. this.cd = cd;
  87. this.eventListener = new EventListenerImpl(this);
  88. this.receiver = transport.createReceiver(eventListener);
  89. this.canSend = false;
  90. this.canRecv = true;
  91. }
  92. /**
  93.  * Create a new session
  94.  * 
  95.  * @param transport
  96.  * @param cd
  97.  * @param receiver2
  98.  * @throws ConfigurationException
  99.  */
  100. Session(Properties properties, Transport transport, int cd, Sender sender)
  101. throws ConnectionException {
  102. this.properties = properties;
  103. this.transport = transport;
  104. this.cd = cd;
  105. this.sender = sender;
  106. this.eventListener = new EventListenerImpl(this);
  107. this.receiver = transport.createReceiver(eventListener);
  108. this.canRecv = false;
  109. this.canSend = true;
  110. }
  111. void setCreatorState(long sentFlags) {
  112. // Sort out session state
  113. if ((sentFlags & Connection.TPSENDONLY) == Connection.TPSENDONLY) {
  114. canSend = true;
  115. canRecv = false;
  116. } else if ((sentFlags & Connection.TPRECVONLY) == Connection.TPRECVONLY) {
  117. canSend = false;
  118. canRecv = true;
  119. }
  120. }
  121. void setCreatedState(long receivedFlags) {
  122. // Sort out session state
  123. if ((receivedFlags & Connection.TPSENDONLY) == Connection.TPSENDONLY) {
  124. canSend = false;
  125. canRecv = true;
  126. } else if ((receivedFlags & Connection.TPRECVONLY) == Connection.TPRECVONLY) {
  127. canSend = true;
  128. canRecv = false;
  129. }
  130. }
  131. /**
  132.  * Close the session
  133.  * 
  134.  * @throws ConnectionException
  135.  */
  136. void close() throws ConnectionException {
  137. log.debug("Closing session");
  138. if (sender != null) {
  139. log.debug("Sender closing");
  140. sender.close();
  141. sender = null;
  142. }
  143. if (receiver != null) {
  144. log.debug("Receiver closing");
  145. receiver.close();
  146. receiver = null;
  147. }
  148. log.debug("Closed session");
  149. }
  150. /**
  151.  * Send a buffer to a remote server in a conversation
  152.  * 
  153.  * @param cd
  154.  *            The connection descriptor
  155.  * @param idata
  156.  *            The outbound data
  157.  * @param flags
  158.  *            The flags to use
  159.  */
  160. public int tpsend(Buffer toSend, int len, int flags)
  161. throws ConnectionException {
  162. int toReturn = -1;
  163. log.debug("tpsend invoked");
  164. int toCheck = flags & ~(Connection.TPRECVONLY | Connection.TPNOBLOCK | Connection.TPNOTIME | Connection.TPSIGRSTRT);
  165. if (toCheck != 0) {
  166. log.trace("invalid flags remain: " + toCheck);
  167. throw new ConnectionException(Connection.TPEINVAL, "Invalid flags remain: " + toCheck);
  168. }
  169. if (this.lastEvent > -1) {
  170. throw new ConnectionException(Connection.TPEEVENT, lastEvent,
  171. lastRCode, "Event existed on descriptor: " + lastEvent,
  172. null);
  173. } else if (!canSend) {
  174. throw new ConnectionException(Connection.TPEPROTO,
  175. "Session can't send");
  176. }
  177. // Can only send in certain circumstances
  178. if (sender != null) {
  179. log.debug("Sender not null, sending");
  180. String type = null;
  181. String subtype = null;
  182. byte[] data = null;
  183. if (toSend != null) {
  184. data = toSend.serialize();
  185. type = toSend.getType();
  186. subtype = toSend.getSubtype();
  187. if (!type.equals("X_OCTET")) {
  188. len = data.length;
  189. }
  190. }
  191. sender.send(receiver.getReplyTo(), (short) 0, 0, data, len, cd,
  192. flags, 0, type, subtype);
  193. // Sort out session state
  194. if ((flags & Connection.TPRECVONLY) == Connection.TPRECVONLY) {
  195. canSend = false;
  196. canRecv = true;
  197. }
  198. toReturn = 0;
  199. } else {
  200. throw new ConnectionException(-1, "Session in receive mode", null);
  201. }
  202. return toReturn;
  203. }
  204. /**
  205.  * Received the next response in a conversation
  206.  * 
  207.  * @param cd
  208.  *            The connection descriptor to use
  209.  * @param flags
  210.  *            The flags to use
  211.  * @return The next response
  212.  */
  213. public Buffer tprecv(int flags) throws ConnectionException {
  214. log.debug("Receiving");
  215. int toCheck = flags & ~(Connection.TPNOCHANGE | Connection.TPNOBLOCK | Connection.TPNOTIME | Connection.TPSIGRSTRT);
  216. if (toCheck != 0) {
  217. log.trace("invalid flags remain: " + toCheck);
  218. throw new ConnectionException(Connection.TPEINVAL, "Invalid flags remain: " + toCheck);
  219. }
  220. if (!canRecv) {
  221. throw new ConnectionException(Connection.TPEPROTO,
  222. "Session can't receive");
  223. }
  224. Message m = receiver.receive(flags);
  225. // Prepare the outbound channel
  226. if (m.replyTo == null
  227. || (sender != null && !m.replyTo.equals(sender.getSendTo()))) {
  228. log.trace("Send to location has altered");
  229. sender.close();
  230. sender = null;
  231. }
  232. if (sender == null && m.replyTo != null && !m.replyTo.equals("")) {
  233. log.trace("Will require a new sender");
  234. sender = transport.createSender(m.replyTo);
  235. } else {
  236. log.debug("Not setting the sender");
  237. }
  238. X_OCTET received = null;
  239. if (m.type != null) {
  240. received = (X_OCTET) tpalloc("X_OCTET", null);
  241. received.setByteArray(m.data);
  242. }
  243. log.debug("Prepared and ready to launch");
  244. // Sort out session state
  245. if ((m.flags & Connection.TPRECVONLY) == Connection.TPRECVONLY) {
  246. canSend = true;
  247. canRecv = false;
  248. }
  249. // Check we didn't just get an event while waiting
  250. if (this.lastEvent > -1) {
  251. throw new ConnectionException(Connection.TPEEVENT, lastEvent,
  252. lastRCode, "Event existed on descriptor: " + lastEvent,
  253. received);
  254. } else if ((m.flags & Connection.TPRECVONLY) == Connection.TPRECVONLY) {
  255. throw new ConnectionException(Connection.TPEEVENT,
  256. Connection.TPEV_SENDONLY, 0, "Reporting send only event",
  257. received);
  258. } else if (m.rval == Connection.TPSUCCESS) {
  259. throw new ConnectionException(Connection.TPEEVENT,
  260. Connection.TPEV_SVCSUCC, 0,
  261. "Service completed successfully event", received);
  262. }
  263. return received;
  264. }
  265. /**
  266.  * Close the conversation with the remote service.
  267.  * 
  268.  * @param cd
  269.  *            The connection descriptor to use
  270.  */
  271. public void tpdiscon() throws ConnectionException {
  272. if (JABTransaction.current() != null) {
  273. try {
  274. JABTransaction.current().rollback_only();
  275. } catch (JABException e) {
  276. throw new ConnectionException(Connection.TPESYSTEM,
  277. "Could not mark transaction for rollback only");
  278. }
  279. }
  280. try {
  281. sender.send("", EventListener.DISCON_CODE, 0, null, 0, cd, 0, 0,
  282. null, null);
  283. } catch (org.omg.CORBA.OBJECT_NOT_EXIST one) {
  284. log.warn("The disconnect called failed to notify the remote end");
  285. log.debug("The disconnect called failed to notify the remote end",
  286. one);
  287. }
  288. }
  289. /**
  290.  * Return the connection descriptor
  291.  * 
  292.  * @return
  293.  */
  294. public int getCd() {
  295. return cd;
  296. }
  297. private void setLastEvent(long lastEvent) {
  298. log.debug("Set lastEvent: " + lastEvent);
  299. this.lastEvent = lastEvent;
  300. }
  301. public void setLastRCode(int rcode) {
  302. log.debug("Set lastRCode: " + lastRCode);
  303. this.lastRCode = rcode;
  304. }
  305. private class EventListenerImpl implements EventListener {
  306. private Session session;
  307. public EventListenerImpl(Session session) {
  308. this.session = session;
  309. }
  310. public void setLastEvent(long lastEvent) {
  311. session.setLastEvent(lastEvent);
  312. }
  313. public void setLastRCode(int rcode) {
  314. session.setLastRCode(rcode);
  315. }
  316. }
  317. Receiver getReceiver() {
  318. return receiver;
  319. }
  320. /**
  321.  * Allocate a new buffer
  322.  * 
  323.  * @param type
  324.  *            The type of the buffer
  325.  * @param subtype
  326.  *            The subtype of the buffer
  327.  * @return The new buffer
  328.  * @throws ConnectionException
  329.  *             If the buffer cannot be created or the subtype located
  330.  */
  331. private Buffer tpalloc(String type, String subtype)
  332. throws ConnectionException {
  333. if (type == null) {
  334. throw new ConnectionException(Connection.TPEINVAL,
  335. "No type provided");
  336. } else if (type.equals("X_OCTET")) {
  337. log.debug("Initializing a new X_OCTET");
  338. return new X_OCTET();
  339. } else if (type.equals("X_C_TYPE")) {
  340. log.debug("Initializing a new X_C_TYPE");
  341. return new X_C_TYPE(subtype, properties);
  342. } else {
  343. log.debug("Initializing a new X_COMMON");
  344. return new X_COMMON(subtype, properties);
  345. }
  346. }
  347. }