Connection.java
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:14k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. package org.jboss.blacktie.jatmibroker.xatmi;
  19. import java.util.Arrays;
  20. import java.util.HashMap;
  21. import java.util.Iterator;
  22. import java.util.Map;
  23. import java.util.Properties;
  24. import org.apache.log4j.LogManager;
  25. import org.apache.log4j.Logger;
  26. import org.jboss.blacktie.jatmibroker.core.conf.ConfigurationException;
  27. import org.jboss.blacktie.jatmibroker.core.transport.Message;
  28. import org.jboss.blacktie.jatmibroker.core.transport.Receiver;
  29. import org.jboss.blacktie.jatmibroker.core.transport.Transport;
  30. import org.jboss.blacktie.jatmibroker.core.transport.TransportFactory;
  31. /**
  32.  * This is the connection to remote Blacktie services. It must be created using
  33.  * the ConnectionFactory
  34.  * 
  35.  * @see ConnectionFactory
  36.  */
  37. public class Connection {
  38. private static final Logger log = LogManager.getLogger(Connection.class);
  39. // AVAILABLE FLAGS
  40. public static final int TPNOBLOCK = 0x00000001;
  41. public static final int TPSIGRSTRT = 0x00000002;
  42. public static final int TPNOREPLY = 0x00000004;
  43. public static final int TPNOTRAN = 0x00000008;
  44. public static final int TPTRAN = 0x00000010;
  45. public static final int TPNOTIME = 0x00000020;
  46. public static final int TPGETANY = 0x00000080;
  47. public static final int TPNOCHANGE = 0x00000100;
  48. public static final int TPCONV = 0x00000400;
  49. public static final int TPSENDONLY = 0x00000800;
  50. public static final int TPRECVONLY = 0x00001000;
  51. // ERROR CONDITIONS
  52. public static final int TPEBADDESC = 2;
  53. public static final int TPEBLOCK = 3;
  54. public static final int TPEINVAL = 4;
  55. public static final int TPELIMIT = 5;
  56. public static final int TPENOENT = 6;
  57. public static final int TPEOS = 7;
  58. public static final int TPEPROTO = 9;
  59. public static final int TPESVCERR = 10;
  60. public static final int TPESVCFAIL = 11;
  61. public static final int TPESYSTEM = 12;
  62. public static final int TPETIME = 13;
  63. public static final int TPETRAN = 14;
  64. public static final int TPGOTSIG = 15;
  65. public static final int TPEITYPE = 17;
  66. public static final int TPEOTYPE = 18;
  67. public static final int TPEEVENT = 22;
  68. public static final int TPEMATCH = 23;
  69. // SERVICE CONDITIONS
  70. public static final short TPFAIL = 0x00000001;
  71. public static final short TPSUCCESS = 0x00000002;
  72. // Events
  73. public static final long TPEV_DISCONIMM = 0x0001;
  74. public static final long TPEV_SVCERR = 0x0002;
  75. public static final long TPEV_SVCFAIL = 0x0004;
  76. public static final long TPEV_SVCSUCC = 0x0008;
  77. public static final long TPEV_SENDONLY = 0x0020;
  78. public static final int XATMI_SERVICE_NAME_LENGTH = 15;
  79. private static int nextId;
  80. private Map<String, Transport> transports = new HashMap<String, Transport>();
  81. /**
  82.  * Any local temporary queues created in this connection
  83.  */
  84. private Map<java.lang.Integer, Receiver> temporaryQueues = new HashMap<java.lang.Integer, Receiver>();
  85. private Properties properties;
  86. private Map<Integer, Session> sessions = new HashMap<Integer, Session>();
  87. private static boolean warnedTPSIGRSTRT;
  88. /**
  89.  * The connection
  90.  * 
  91.  * @param properties
  92.  * @param username
  93.  * @param password
  94.  * @throws ConnectionException
  95.  */
  96. Connection(Properties properties) {
  97. this.properties = properties;
  98. }
  99. /**
  100.  * Allocate a new buffer
  101.  * 
  102.  * @param type
  103.  *            The type of the buffer
  104.  * @param subType
  105.  *            The subtype of the buffer
  106.  * @return The new buffer
  107.  * @throws ConnectionException
  108.  */
  109. public Buffer tpalloc(String type, String subtype)
  110. throws ConnectionException {
  111. if (type == null) {
  112. throw new ConnectionException(Connection.TPEINVAL,
  113. "No type provided");
  114. } else if (type.equals("X_OCTET")) {
  115. log.debug("Initializing a new X_OCTET");
  116. return new X_OCTET();
  117. } else if (type.equals("X_C_TYPE")) {
  118. log.debug("Initializing a new X_C_TYPE");
  119. return new X_C_TYPE(subtype, properties);
  120. } else {
  121. log.debug("Initializing a new X_COMMON");
  122. return new X_COMMON(subtype, properties);
  123. }
  124. }
  125. /**
  126.  * Synchronous call
  127.  * 
  128.  * @param svc
  129.  *            The name of the service to call
  130.  * @param idata
  131.  *            The inbound data
  132.  * @param flags
  133.  *            The flags to use
  134.  * @return The returned buffer
  135.  */
  136. public Response tpcall(String svc, Buffer buffer, int len, int flags)
  137. throws ConnectionException {
  138. log.debug("tpcall");
  139. int tpacallFlags = flags;
  140. tpacallFlags &= ~TPNOCHANGE;
  141. int cd = tpacall(svc, buffer, len, tpacallFlags);
  142. return receive(cd, flags);
  143. }
  144. /**
  145.  * Asynchronous call
  146.  * 
  147.  * @param svc
  148.  *            The name of the service to call
  149.  * @param idata
  150.  *            The inbound data
  151.  * @param flags
  152.  *            The flags to use
  153.  * @return The connection descriptor
  154.  */
  155. public int tpacall(String svc, Buffer toSend, int len, int flags)
  156. throws ConnectionException {
  157. log.debug("tpacall");
  158. int toCheck = flags & ~(TPNOTRAN | TPNOREPLY | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  159. if (toCheck != 0) {
  160. log.trace("invalid flags remain: " + toCheck);
  161. throw new ConnectionException(Connection.TPEINVAL, "Invalid flags remain: " + toCheck);
  162. }
  163. boolean hasTPSIGSTRT = (flags & TPSIGRSTRT) == TPSIGRSTRT;
  164. if (hasTPSIGSTRT && !warnedTPSIGRSTRT) {
  165. log.error("TPSIGRSTRT NOT SUPPORTED FOR SENDS OR RECEIVES");
  166. warnedTPSIGRSTRT = true;
  167. }
  168. svc = svc.substring(0, Math.min(Connection.XATMI_SERVICE_NAME_LENGTH,
  169. svc.length()));
  170. int correlationId = 0;
  171. synchronized (this) {
  172. correlationId = ++nextId;
  173. }
  174. Transport transport = getTransport(svc);
  175. Receiver endpoint = transport.createReceiver();
  176. temporaryQueues.put(correlationId, endpoint);
  177. // TODO HANDLE TRANSACTION
  178. String type = null;
  179. String subtype = null;
  180. byte[] data = null;
  181. if (toSend != null) {
  182. data = toSend.serialize();
  183. type = toSend.getType();
  184. subtype = toSend.getSubtype();
  185. if (!type.equals("X_OCTET")) {
  186. len = data.length;
  187. }
  188. }
  189. String timeToLive = properties.getProperty("TimeToLive");
  190. int ttl = 0;
  191. if (timeToLive != null) {
  192. ttl = Integer.parseInt(timeToLive) * 1000;
  193. }
  194. transport.getSender(svc).send(endpoint.getReplyTo(), (short) 0, 0,
  195. data, len, correlationId, flags, ttl, type, subtype);
  196. if ((flags & Connection.TPNOREPLY) == Connection.TPNOREPLY) {
  197. correlationId = 0;
  198. }
  199. log.debug("Returning cd: " + correlationId);
  200. return correlationId;
  201. }
  202. /**
  203.  * Cancel the outstanding asynchronous call.
  204.  * 
  205.  * @param cd
  206.  *            The connection descriptor
  207.  * @param flags
  208.  *            The flags to use
  209.  */
  210. public int tpcancel(int cd) throws ConnectionException {
  211. log.debug("tpcancel: " + cd);
  212. int toReturn = -1;
  213. Receiver endpoint = temporaryQueues.remove(cd);
  214. if (endpoint != null) {
  215. log.debug("closing endpoint");
  216. endpoint.close();
  217. log.debug("endpoint closed");
  218. toReturn = 0;
  219. } else {
  220. log.debug("No endpoint available");
  221. throw new ConnectionException(Connection.TPEBADDESC, "cd " + cd
  222. + " does not exist");
  223. }
  224. log.debug("tpcancel returning: " + toReturn);
  225. return toReturn;
  226. }
  227. /**
  228.  * Get the reply from the server
  229.  * 
  230.  * @param cd
  231.  *            The connection descriptor to use
  232.  * @param flags
  233.  *            The flags to use
  234.  * @return The response from the server
  235.  */
  236. public Response tpgetrply(int cd, int flags) throws ConnectionException {
  237. log.debug("tpgetrply: " + cd);
  238. int toCheck = flags & ~(TPGETANY | TPNOCHANGE | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  239. if (toCheck != 0) {
  240. log.trace("invalid flags remain: " + toCheck);
  241. throw new ConnectionException(Connection.TPEINVAL, "Invalid flags remain: " + toCheck);
  242. }
  243. boolean hasTPSIGSTRT = (flags & TPSIGRSTRT) == TPSIGRSTRT;
  244. if (hasTPSIGSTRT && !warnedTPSIGRSTRT) {
  245. log.error("TPSIGRSTRT NOT SUPPORTED FOR SENDS OR RECEIVES");
  246. warnedTPSIGRSTRT = true;
  247. }
  248. Response toReturn = receive(cd, flags);
  249. Session session = sessions.remove(cd);
  250. if (session != null) {
  251. log.debug("closing session");
  252. session.close();
  253. log.debug("closed session");
  254. }
  255. log.debug("tpgetrply returning: " + toReturn);
  256. return toReturn;
  257. }
  258. /**
  259.  * Handle the initiation of a conversation with the server
  260.  * 
  261.  * @param svc
  262.  *            The name of the service
  263.  * @param idata
  264.  *            The outbound buffer
  265.  * @param flags
  266.  *            The flags to use
  267.  * @return The connection descriptor
  268.  */
  269. public Session tpconnect(String svc, Buffer toSend, int len, int flags)
  270. throws ConnectionException {
  271. log.debug("tpconnect: " + svc);
  272. svc = svc.substring(0, Math.min(Connection.XATMI_SERVICE_NAME_LENGTH,
  273. svc.length()));
  274. // Initiate the session
  275. boolean hasTPSIGSTRT = (flags & TPSIGRSTRT) == TPSIGRSTRT;
  276. if (hasTPSIGSTRT && !warnedTPSIGRSTRT) {
  277. log.error("TPSIGRSTRT NOT SUPPORTED FOR SENDS OR RECEIVES");
  278. warnedTPSIGRSTRT = true;
  279. }
  280. svc = svc.substring(0, Math.min(Connection.XATMI_SERVICE_NAME_LENGTH,
  281. svc.length()));
  282. int correlationId = 0;
  283. synchronized (this) {
  284. correlationId = nextId++;
  285. }
  286. Transport transport = getTransport(svc);
  287. Session session = new Session(properties, transport, correlationId);
  288. Receiver endpoint = session.getReceiver();
  289. // TODO HANDLE TRANSACTION
  290. String type = null;
  291. String subtype = null;
  292. byte[] data = null;
  293. if (toSend != null) {
  294. data = toSend.serialize();
  295. type = toSend.getType();
  296. subtype = toSend.getSubtype();
  297. if (!type.equals("X_OCTET")) {
  298. len = data.length;
  299. }
  300. }
  301. String timeToLive = properties.getProperty("TimeToLive");
  302. int ttl = 0;
  303. if (timeToLive != null) {
  304. ttl = Integer.parseInt(timeToLive) * 1000;
  305. }
  306. log.debug("tpconnect sending data");
  307. transport.getSender(svc).send(endpoint.getReplyTo(), (short) 0, 0,
  308. data, len, correlationId, flags | TPCONV, ttl, type, subtype);
  309. byte[] response = null;
  310. try {
  311. log.debug("tpconnect receiving data");
  312. X_OCTET odata = (X_OCTET) session.tprecv(0);
  313. response = odata.getByteArray();
  314. log.debug("tpconnect received data");
  315. } catch (ConnectionException e) {
  316. if (e.getReceived() != null) {
  317. response = ((X_OCTET) e.getReceived()).getByteArray();
  318. log.debug("Caught an exception with data", e);
  319. } else {
  320. throw new ConnectionException(e.getTperrno(),
  321. "Could not connect");
  322. }
  323. }
  324. byte[] ack = new byte[4];
  325. byte[] bytes = "ACK".getBytes();
  326. System.arraycopy(bytes, 0, ack, 0, 3);
  327. boolean connected = response == null ? false : Arrays.equals(ack,
  328. response);
  329. if (!connected) {
  330. log.error("Could not connect");
  331. session.close();
  332. throw new ConnectionException(Connection.TPETIME,
  333. "Could not connect");
  334. }
  335. session.setCreatorState(flags);
  336. temporaryQueues.put(correlationId, endpoint);
  337. sessions.put(correlationId, session);
  338. // Return a handle to allow the connection to send/receive data on
  339. return session;
  340. }
  341. /**
  342.  * Close any resources associated with this connection
  343.  * 
  344.  * @throws ConnectionException
  345.  */
  346. public void close() throws ConnectionException {
  347. log.debug("Close connection called");
  348. Iterator<Receiver> receivers = temporaryQueues.values().iterator();
  349. while (receivers.hasNext()) {
  350. Receiver receiver = receivers.next();
  351. log.debug("closing receiver");
  352. receiver.close();
  353. log.debug("closed receiver");
  354. }
  355. temporaryQueues.clear();
  356. Iterator<Transport> transports = this.transports.values().iterator();
  357. while (transports.hasNext()) {
  358. Transport transport = transports.next();
  359. log.debug("closing transport");
  360. transport.close();
  361. log.debug("closed transport");
  362. }
  363. this.transports.clear();
  364. log.debug("Close connection finished");
  365. }
  366. private Transport getTransport(String serviceName)
  367. throws ConnectionException {
  368. Transport toReturn = transports.get(serviceName);
  369. if (toReturn == null) {
  370. try {
  371. toReturn = TransportFactory.loadTransportFactory(serviceName,
  372. properties).createTransport();
  373. } catch (ConfigurationException e) {
  374. throw new ConnectionException(Connection.TPENOENT,
  375. "Could not load transport for: " + serviceName, e);
  376. }
  377. transports.put(serviceName, toReturn);
  378. }
  379. return toReturn;
  380. }
  381. private Response receive(int cd, int flags) throws ConnectionException {
  382. log.debug("receive: " + cd);
  383. Receiver endpoint = temporaryQueues.remove(cd);
  384. if (endpoint == null) {
  385. throw new ConnectionException(Connection.TPEBADDESC,
  386. "Session does not exist");
  387. }
  388. Message message = endpoint.receive(flags);
  389. // TODO WE SHOULD BE SENDING THE CONNECTION ID?
  390. Buffer buffer = null;
  391. if (message.type != null && !message.type.equals("")) {
  392. if (message.type.equals("X_OCTET")) {
  393. log.debug("Initializing a new X_OCTET");
  394. buffer = new X_OCTET(message.data);
  395. } else if (message.type.equals("X_C_TYPE")) {
  396. log.debug("Initializing a new X_C_TYPE");
  397. buffer = new X_C_TYPE(message.subtype, properties, message.data);
  398. } else {
  399. log.debug("Initializing a new X_COMMON");
  400. buffer = new X_COMMON(message.subtype, properties, message.data);
  401. }
  402. }
  403. if (message.rval == Connection.TPFAIL) {
  404. if (message.rcode == Connection.TPESVCERR) {
  405. throw new ConnectionException(Connection.TPESVCERR, 0L,
  406. message.rcode,
  407. "Got an error back from the remote service", buffer);
  408. }
  409. throw new ConnectionException(Connection.TPESVCFAIL, 0L,
  410. message.rcode, "Got a fail back from the remote service",
  411. buffer);
  412. } else {
  413. Response response = new Response(message.rval, message.rcode,
  414. buffer, message.len, message.flags);
  415. log.debug("received returned a response? "
  416. + (response == null ? "false" : "true"));
  417. return response;
  418. }
  419. }
  420. }