XATMIc.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:29k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include <stdio.h>
  19. #include <stdlib.h>
  20. #include <string.h>
  21. #include <stdarg.h>
  22. #include <iostream>
  23. #include "log4cxx/logger.h"
  24. #include "ThreadLocalStorage.h"
  25. #include "txx.h"
  26. #include "xatmi.h"
  27. #include "Session.h"
  28. #include "AtmiBrokerClientControl.h"
  29. #include "AtmiBrokerServerControl.h"
  30. #include "AtmiBrokerClient.h"
  31. #include "AtmiBrokerServer.h"
  32. #include "AtmiBrokerMem.h"
  33. #include "AtmiBrokerEnv.h"
  34. #include "ServiceDispatcher.h"
  35. #include "txx.h"
  36. long DISCON = 0x00000003;
  37. bool warnedTPNOBLOCK = false;
  38. bool warnedTPGETANY = false;
  39. // Logger for XATMIc
  40. log4cxx::LoggerPtr loggerXATMI(log4cxx::Logger::getLogger("XATMIc"));
  41. int bufferSize(char* data, int suggestedSize) {
  42. if (data == NULL) {
  43. return 0;
  44. }
  45. int data_size = ::tptypes(data, NULL, NULL);
  46. if (data_size >= 0) {
  47. if (suggestedSize <= 0 || suggestedSize > data_size) {
  48. return data_size;
  49. } else {
  50. return suggestedSize;
  51. }
  52. } else {
  53. LOG4CXX_DEBUG(loggerXATMI,
  54. (char*) "A NON-BUFFER WAS ATTEMPTED TO BE SENT");
  55. setSpecific(TPE_KEY, TSS_TPEINVAL);
  56. return -1;
  57. }
  58. }
  59. void setTpurcode(long rcode) {
  60. char* retrieved = (char*) getSpecific(TPR_KEY);
  61. if (retrieved != NULL) {
  62. destroySpecific( TPR_KEY);
  63. free(retrieved);
  64. }
  65. if (rcode > 0) {
  66. char* toStore = (char*) malloc(8 * sizeof(long));
  67. sprintf(toStore, "%ld", rcode);
  68. setSpecific(TPR_KEY, toStore);
  69. }
  70. }
  71. int send(Session* session, const char* replyTo, char* idata, long ilen,
  72. int correlationId, long flags, long rval, long rcode) {
  73. LOG4CXX_DEBUG(loggerXATMI, (char*) "send - ilen: " << ilen << ": "
  74. << "cd: " << correlationId << "flags: " << flags);
  75. if (flags & TPNOBLOCK && !warnedTPNOBLOCK) {
  76. LOG4CXX_ERROR(loggerXATMI, (char*) "TPNOBLOCK NOT SUPPORTED FOR SENDS");
  77. warnedTPNOBLOCK = true;
  78. }
  79. int toReturn = -1;
  80. if (session->getCanSend() || rval == DISCON) {
  81. try {
  82. LOG4CXX_TRACE(loggerXATMI, (char*) "allocating data to go: "
  83. << ilen);
  84. MESSAGE message;
  85. message.replyto = replyTo;
  86. message.data = idata;
  87. message.len = ilen;
  88. message.correlationId = correlationId;
  89. message.flags = flags;
  90. message.rcode = rcode;
  91. message.rval = rval;
  92. message.type = NULL;
  93. message.subtype = NULL;
  94. if (message.data != NULL) {
  95. message.type = (char*) malloc(MAX_TYPE_SIZE + 1);
  96. memset(message.type, '', MAX_TYPE_SIZE + 1);
  97. message.subtype = (char*) malloc(MAX_SUBTYPE_SIZE + 1);
  98. memset(message.subtype, '', MAX_SUBTYPE_SIZE + 1);
  99. ::tptypes(idata, message.type, message.subtype);
  100. }
  101. if (message.type == NULL) {
  102. message.type = (char*) "";
  103. }
  104. if (message.subtype == NULL) {
  105. message.subtype = (char*) "";
  106. }
  107. message.control = (TPNOTRAN & flags) ? NULL : txx_serialize(&(message.ttl));
  108. if (message.control == NULL)
  109. message.ttl = mqConfig.timeToLive * 1000;
  110. session->blockSignals((flags & TPSIGRSTRT));
  111. if (session->send(message))
  112. toReturn = 0;
  113. if (session->unblockSignals() != 0 && (flags & TPSIGRSTRT) == 0)
  114. toReturn = -1;
  115. if (message.control)
  116. free(message.control);
  117. if (message.data != NULL) {
  118. free(message.type);
  119. free(message.subtype);
  120. }
  121. } catch (...) {
  122. LOG4CXX_ERROR(loggerXATMI, (char*) "send: call failed");
  123. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  124. }
  125. } else {
  126. LOG4CXX_ERROR(loggerXATMI, (char*) "Session " << correlationId
  127. << "can't send");
  128. setSpecific(TPE_KEY, TSS_TPEPROTO);
  129. }
  130. return toReturn;
  131. }
  132. int receive(int id, Session* session, char ** odata, long *olen, long flags,
  133. long* event, bool closeSession) {
  134. LOG4CXX_DEBUG(loggerXATMI, (char*) "Receive invoked");
  135. setTpurcode(0);
  136. int toReturn = -1;
  137. int len = ::bufferSize(*odata, *olen);
  138. if (len != -1) {
  139. LOG4CXX_DEBUG(loggerXATMI, (char*) "receive session: "
  140. << session->getId() << " olen: " << olen << " flags: " << flags);
  141. if (flags & TPGETANY && !warnedTPGETANY) {
  142. LOG4CXX_ERROR(loggerXATMI,
  143. (char*) "TPGETANY NOT SUPPORTED FOR RECEIVES");
  144. warnedTPGETANY = true;
  145. }
  146. if (session->getCanRecv()) {
  147. // TODO Make configurable Default wait time is blocking (x1000 in SynchronizableObject)
  148. long time;
  149. if (TPNOBLOCK & flags) { // NB flags override any XATMI or transaction timeouts
  150. time = 1;
  151. LOG4CXX_DEBUG(loggerXATMI, (char*) "Setting timeout to 1");
  152. } else if (TPNOTIME & flags) {
  153. time = 0;
  154. LOG4CXX_DEBUG(loggerXATMI, (char*) "TPNOTIME = BLOCKING CALL");
  155. } else {
  156. switch (txx_ttl(&time)) {
  157. case -1: // No transaction so use XATMI timeouts
  158. time = (long) mqConfig.requestTimeout + (long) mqConfig.timeToLive;
  159. {LOG4CXX_TRACE(loggerXATMI, (char*) "receive txx_ttl returned -1 time=" << time)};
  160. break;
  161. case 1: // txn not subject to a timeout so block
  162. time = 0;
  163. {LOG4CXX_TRACE(loggerXATMI, (char*) "receive txx_ttl returned 1 time=0"<< time)};
  164. break;
  165. default: /*FALLTHRU txx_ttl will only returns -1, 0 or 1*/
  166. case 0: // time has already been updated
  167. {LOG4CXX_TRACE(loggerXATMI, (char*) "receive txx_ttl returned 0 time=" << time)};
  168. break;
  169. }
  170. }
  171. LOG4CXX_DEBUG(loggerXATMI, (char*) "Setting timeout to: " << time);
  172. try {
  173. session->blockSignals((flags & TPSIGRSTRT));
  174. MESSAGE message = session->receive(time);
  175. if (session->unblockSignals() != 0 && (flags & TPSIGRSTRT) == 0) {
  176. // signalled during the receive call TPSIGRSTRT wasn't requested
  177. txx_rollback_only();
  178. } else if (message.received) {
  179. if (message.rval == DISCON) {
  180. *event = TPEV_DISCONIMM;
  181. txx_rollback_only();
  182. } else {
  183. char* type = message.type;
  184. if (type == NULL) {
  185. type = (char*) "";
  186. }
  187. char* subtype = message.subtype;
  188. if (subtype == NULL) {
  189. subtype = (char*) "";
  190. }
  191. char* messageType = (char*) malloc(MAX_TYPE_SIZE);
  192. char* messageSubtype = (char*) malloc(MAX_SUBTYPE_SIZE);
  193. tptypes(*odata, messageType, messageSubtype);
  194. bool typesChanged = strncmp(type, messageType,
  195. MAX_TYPE_SIZE) != 0 || strncmp(subtype,
  196. messageSubtype, MAX_SUBTYPE_SIZE) != 0;
  197. free(messageType);
  198. free(messageSubtype);
  199. if (flags & TPNOCHANGE && typesChanged) {
  200. // TODO rollback-only
  201. setSpecific(TPE_KEY, TSS_TPEOTYPE);
  202. setTpurcode(message.rcode);
  203. txx_rollback_only();
  204. free(message.data);
  205. free(message.type);
  206. free(message.subtype);
  207. free((char*) message.replyto);
  208. if (closeSession) {
  209. ptrAtmiBrokerClient->closeSession(id);
  210. LOG4CXX_TRACE(loggerXATMI,
  211. (char*) "receive session closed: "
  212. << id);
  213. }
  214. return toReturn;
  215. }
  216. if (len < message.len && !typesChanged) {
  217. *odata = AtmiBrokerMem::get_instance()->tprealloc(
  218. *odata, message.len, NULL, NULL, true);
  219. } else if (len < message.len && typesChanged) {
  220. *odata = AtmiBrokerMem::get_instance()->tprealloc(
  221. *odata, message.len, message.type,
  222. message.subtype, true);
  223. } else if (typesChanged) {
  224. *odata = AtmiBrokerMem::get_instance()->tprealloc(
  225. *odata, message.len, message.type,
  226. message.subtype, true);
  227. }
  228. *olen = message.len;
  229. if (message.len > 0) {
  230. memcpy(*odata, (char*) message.data, *olen);
  231. } else if (message.data == NULL) {
  232. *odata = NULL;
  233. }
  234. free(message.data);
  235. free(message.type);
  236. free(message.subtype);
  237. free((char*) message.replyto);
  238. if (message.rcode == TPESVCERR) {
  239. *event = TPEV_SVCERR;
  240. setSpecific(TPE_KEY, TSS_TPESVCERR);
  241. txx_rollback_only();
  242. ptrAtmiBrokerClient->closeSession(id);
  243. closeSession = false;
  244. } else if (message.rval == TPFAIL) {
  245. setTpurcode(message.rcode);
  246. *event = TPEV_SVCFAIL;
  247. setSpecific(TPE_KEY, TSS_TPESVCFAIL);
  248. txx_rollback_only();
  249. ptrAtmiBrokerClient->closeSession(id);
  250. closeSession = false;
  251. } else if (message.rval == TPSUCCESS) {
  252. toReturn = 0;
  253. setTpurcode(message.rcode);
  254. *event = TPEV_SVCSUCC;
  255. ptrAtmiBrokerClient->closeSession(id);
  256. closeSession = false;
  257. } else if (message.flags & TPRECVONLY) {
  258. toReturn = 0;
  259. *event = TPEV_SENDONLY;
  260. session->setCanSend(true);
  261. session->setCanRecv(false);
  262. LOG4CXX_DEBUG(
  263. loggerXATMI,
  264. (char*) "receive TPRECVONLY set constraints session: "
  265. << session->getId() << " send: "
  266. << session->getCanSend()
  267. << " recv: "
  268. << session->getCanRecv());
  269. } else if (message.correlationId >= 0) {
  270. toReturn = 0;
  271. } else {
  272. LOG4CXX_ERROR(loggerXATMI,
  273. (char*) "COULD NOT PARSE RECEIVED MESSAGE");
  274. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  275. }
  276. }
  277. } else if (TPNOBLOCK & flags) {
  278. LOG4CXX_DEBUG(loggerXATMI,
  279. (char*) "Message not immediately available");
  280. } else {
  281. setSpecific(TPE_KEY, TSS_TPETIME);
  282. txx_rollback_only();
  283. }
  284. } catch (...) {
  285. LOG4CXX_ERROR(
  286. loggerXATMI,
  287. (char*) "Could not set the receive from the destination");
  288. }
  289. } else {
  290. LOG4CXX_DEBUG(loggerXATMI, (char*) "Session can't receive");
  291. setSpecific(TPE_KEY, TSS_TPEPROTO);
  292. }
  293. }
  294. if (closeSession) {
  295. ptrAtmiBrokerClient->closeSession(id);
  296. LOG4CXX_TRACE(loggerXATMI, (char*) "receive session closed: " << id);
  297. }
  298. return toReturn;
  299. }
  300. int _get_tperrno(void) {
  301. LOG4CXX_TRACE(loggerXATMI, (char*) "_get_tperrno");
  302. char* err = (char*) getSpecific(TPE_KEY);
  303. int toReturn = 0;
  304. if (err != NULL) {
  305. LOG4CXX_TRACE(loggerXATMI, (char*) "found _get_tperrno" << err);
  306. toReturn = atoi(err);
  307. }
  308. LOG4CXX_TRACE(loggerXATMI, (char*) "returning _get_tperrno" << toReturn);
  309. return toReturn;
  310. }
  311. long _get_tpurcode(void) {
  312. LOG4CXX_TRACE(loggerXATMI, (char*) "_get_tpurcode");
  313. char* rcode = (char*) getSpecific(TPR_KEY);
  314. long toReturn = 0;
  315. if (rcode != NULL) {
  316. LOG4CXX_TRACE(loggerXATMI, (char*) "found _get_tpurcode" << rcode);
  317. toReturn = atol(rcode);
  318. }
  319. LOG4CXX_TRACE(loggerXATMI, (char*) "returning _get_tpurcode" << toReturn);
  320. return toReturn;
  321. }
  322. int tpadvertise(char * svcname, void(*func)(TPSVCINFO *)) {
  323. LOG4CXX_TRACE(loggerXATMI, (char*) "tpadvertise: " << svcname);
  324. setSpecific(TPE_KEY, TSS_TPERESET);
  325. int toReturn = -1;
  326. if (ptrServer != NULL) {
  327. if (ptrServer->advertiseService(svcname, func)) {
  328. toReturn = 0;
  329. }
  330. } else {
  331. LOG4CXX_ERROR(loggerXATMI, (char*) "server not initialized");
  332. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  333. }
  334. LOG4CXX_TRACE(loggerXATMI, (char*) "tpadvertise return: " << toReturn
  335. << " tperrno: " << tperrno);
  336. return toReturn;
  337. }
  338. int tpunadvertise(char * svcname) {
  339. LOG4CXX_TRACE(loggerXATMI, (char*) "tpunadvertise: " << svcname);
  340. setSpecific(TPE_KEY, TSS_TPERESET);
  341. int toReturn = -1;
  342. if (ptrServer != NULL) {
  343. if (svcname && strcmp(svcname, "") != 0) {
  344. if (ptrServer->isAdvertised(svcname)) {
  345. ptrServer->unadvertiseService(svcname, false);
  346. toReturn = 0;
  347. } else {
  348. setSpecific(TPE_KEY, TSS_TPENOENT);
  349. }
  350. } else {
  351. setSpecific(TPE_KEY, TSS_TPEINVAL);
  352. }
  353. } else {
  354. LOG4CXX_ERROR(loggerXATMI, (char*) "server not initialized");
  355. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  356. }
  357. LOG4CXX_TRACE(loggerXATMI, (char*) "tpunadvertise return: " << toReturn
  358. << " tperrno: " << tperrno);
  359. return toReturn;
  360. }
  361. char* tpalloc(char* type, char* subtype, long size) {
  362. LOG4CXX_TRACE(loggerXATMI, (char*) "tpalloc type: " << type << " size: "
  363. << size);
  364. if (subtype) {
  365. LOG4CXX_TRACE(loggerXATMI, (char*) "tpalloc subtype: " << type);
  366. }
  367. setSpecific(TPE_KEY, TSS_TPERESET);
  368. char* toReturn = NULL;
  369. if (clientinit() != -1) {
  370. toReturn = AtmiBrokerMem::get_instance()->tpalloc(type, subtype, size,
  371. false);
  372. }
  373. LOG4CXX_TRACE(loggerXATMI, (char*) "tpalloc returning" << " tperrno: "
  374. << tperrno);
  375. return toReturn;
  376. }
  377. char* tprealloc(char * addr, long size) {
  378. LOG4CXX_TRACE(loggerXATMI, (char*) "tprealloc: " << size);
  379. setSpecific(TPE_KEY, TSS_TPERESET);
  380. char* toReturn = NULL;
  381. if (clientinit() != -1) {
  382. toReturn = AtmiBrokerMem::get_instance()->tprealloc(addr, size, NULL,
  383. NULL, false);
  384. }
  385. LOG4CXX_TRACE(loggerXATMI, (char*) "tprealloc returning" << " tperrno: "
  386. << tperrno);
  387. return toReturn;
  388. }
  389. void tpfree(char* ptr) {
  390. LOG4CXX_TRACE(loggerXATMI, (char*) "tpfree");
  391. setSpecific(TPE_KEY, TSS_TPERESET);
  392. if (clientinit() != -1) {
  393. AtmiBrokerMem::get_instance()->tpfree(ptr, false);
  394. }
  395. LOG4CXX_TRACE(loggerXATMI, (char*) "tpfree returning" << " tperrno: "
  396. << tperrno);
  397. }
  398. long tptypes(char* ptr, char* type, char* subtype) {
  399. LOG4CXX_TRACE(loggerXATMI, (char*) "tptypes called");
  400. setSpecific(TPE_KEY, TSS_TPERESET);
  401. LOG4CXX_TRACE(loggerXATMI, (char*) "set the specific");
  402. long toReturn = -1;
  403. if (clientinit() != -1) {
  404. toReturn = AtmiBrokerMem::get_instance()->tptypes(ptr, type, subtype);
  405. }
  406. LOG4CXX_TRACE(loggerXATMI, (char*) "tptypes return: " << toReturn
  407. << " tperrno: " << tperrno);
  408. return toReturn;
  409. }
  410. int tpcall(char * svc, char* idata, long ilen, char ** odata, long *olen,
  411. long flags) {
  412. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcall: " << svc << " ilen: " << ilen
  413. << " flags: " << flags);
  414. int toReturn = -1;
  415. setSpecific(TPE_KEY, TSS_TPERESET);
  416. long toCheck = flags & ~(TPNOTRAN | TPNOCHANGE | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  417. if (toCheck != 0) {
  418. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  419. setSpecific(TPE_KEY, TSS_TPEINVAL);
  420. } else {
  421. if (clientinit() != -1) {
  422. long tpacallFlags = flags;
  423. tpacallFlags &= ~TPNOCHANGE;
  424. int cd = tpacall(svc, idata, ilen, tpacallFlags);
  425. if (cd != -1) {
  426. long tpgetrplyFlags = flags;
  427. tpgetrplyFlags &= ~TPNOTRAN;
  428. tpgetrplyFlags &= ~TPNOBLOCK;
  429. toReturn = tpgetrply(&cd, odata, olen, tpgetrplyFlags);
  430. }
  431. }
  432. }
  433. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcall return: " << toReturn
  434. << " tperrno: " << tperrno);
  435. return toReturn;
  436. }
  437. int tpacall(char * svc, char* idata, long ilen, long flags) {
  438. LOG4CXX_TRACE(loggerXATMI, (char*) "tpacall: " << svc << " ilen: " << ilen
  439. << " flags: 0x" << std::hex << flags);
  440. int toReturn = -1;
  441. setSpecific(TPE_KEY, TSS_TPERESET);
  442. long toCheck = flags & ~(TPNOTRAN | TPNOREPLY | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  443. if (toCheck != 0) {
  444. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  445. setSpecific(TPE_KEY, TSS_TPEINVAL);
  446. } else {
  447. bool transactional = !(flags & TPNOTRAN);
  448. if (transactional) {
  449. void *ctrl = txx_get_control();
  450. if (ctrl == NULL) {
  451. transactional = false;
  452. }
  453. txx_release_control(ctrl);
  454. }
  455. if (transactional && (flags & TPNOREPLY)) {
  456. LOG4CXX_ERROR(
  457. loggerXATMI,
  458. (char*) "TPNOREPLY CALLED WITHOUT TPNOTRAN DURING TRANSACTION");
  459. setSpecific(TPE_KEY, TSS_TPEINVAL);
  460. } else {
  461. int len = ::bufferSize(idata, ilen);
  462. if (len != -1) {
  463. if (clientinit() != -1) {
  464. Session* session = NULL;
  465. int cd = -1;
  466. try {
  467. session = ptrAtmiBrokerClient->createSession(cd, svc);
  468. LOG4CXX_TRACE(loggerXATMI, (char*) "new session: "
  469. << session << " cd: " << cd
  470. << " transactional: " << transactional);
  471. if (cd != -1) {
  472. if (transactional)
  473. txx_suspend(cd, tpdiscon);
  474. toReturn = ::send(session, session->getReplyTo(),
  475. idata, len, cd, flags, 0, 0);
  476. if (toReturn >= 0) {
  477. if (TPNOREPLY & flags) {
  478. toReturn = 0;
  479. ptrAtmiBrokerClient->closeSession(cd);
  480. } else {
  481. toReturn = cd;
  482. }
  483. } else {
  484. LOG4CXX_DEBUG(loggerXATMI,
  485. (char*) "Session got dudded: " << cd);
  486. ptrAtmiBrokerClient->closeSession(cd);
  487. }
  488. } else if (tperrno == 0) {
  489. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  490. ptrAtmiBrokerClient->closeSession(cd);
  491. }
  492. } catch (...) {
  493. LOG4CXX_ERROR(
  494. loggerXATMI,
  495. (char*) "tpacall failed to connect to service queue");
  496. setSpecific(TPE_KEY, TSS_TPENOENT);
  497. if (cd != -1) {
  498. ptrAtmiBrokerClient->closeSession(cd);
  499. }
  500. }
  501. if (transactional && toReturn < 0) {
  502. // txx_suspend was called but there was an error so
  503. // resume (note we didn't check for TPNOREPLY since we are in
  504. // the else arm of if (transactional && (flags & TPNOREPLY))
  505. LOG4CXX_DEBUG(loggerXATMI, (char*) "tpacall resume cd="
  506. << cd << " rv=" << toReturn);
  507. txx_resume(cd);
  508. }
  509. }
  510. }
  511. }
  512. }
  513. LOG4CXX_TRACE(loggerXATMI, (char*) "tpacall return: " << toReturn
  514. << " tperrno: " << tperrno);
  515. return toReturn;
  516. }
  517. int tpconnect(char * svc, char* idata, long ilen, long flags) {
  518. LOG4CXX_TRACE(loggerXATMI, (char*) "tpconnect: " << svc << " ilen: "
  519. << ilen << " flags: " << flags);
  520. int toReturn = -1;
  521. setSpecific(TPE_KEY, TSS_TPERESET);
  522. long toCheck = flags & ~(TPNOTRAN | TPSENDONLY | TPRECVONLY | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  523. if (toCheck != 0) {
  524. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  525. setSpecific(TPE_KEY, TSS_TPEINVAL);
  526. } else {
  527. if (!(flags & TPSENDONLY || flags & TPRECVONLY)) {
  528. setSpecific(TPE_KEY, TSS_TPEINVAL);
  529. } else {
  530. int len = 0;
  531. if (idata != NULL) {
  532. len = ::bufferSize(idata, ilen);
  533. }
  534. if (len != -1) {
  535. if (clientinit() != -1) {
  536. int cd = -1;
  537. Session* session = NULL;
  538. try {
  539. session = ptrAtmiBrokerClient->createSession(cd, svc);
  540. if (cd != -1) {
  541. int sendOk = ::send(session, session->getReplyTo(),
  542. idata, len, cd, flags | TPCONV, 0, 0);
  543. if (sendOk != -1) {
  544. long olen = 4;
  545. char* odata = (char*) tpalloc(
  546. (char*) "X_OCTET", NULL, olen);
  547. long event = 0;
  548. ::tprecv(cd, &odata, &olen, 0, &event);
  549. bool connected = strcmp(odata, "ACK") == 0;
  550. tpfree(odata);
  551. if (connected) {
  552. toReturn = cd;
  553. if (flags & TPRECVONLY) {
  554. session->setCanSend(false);
  555. LOG4CXX_DEBUG(
  556. loggerXATMI,
  557. (char*) "tpconnect set constraints session: "
  558. << session->getId()
  559. << " send: "
  560. << session->getCanSend()
  561. << " recv (not changed): "
  562. << session->getCanRecv());
  563. } else {
  564. session->setCanRecv(false);
  565. LOG4CXX_DEBUG(
  566. loggerXATMI,
  567. (char*) "tpconnect set constraints session: "
  568. << session->getId()
  569. << " send (not changed): "
  570. << session->getCanSend()
  571. << " recv: "
  572. << session->getCanRecv());
  573. }
  574. } else {
  575. LOG4CXX_DEBUG(loggerXATMI,
  576. (char*) "COULD NOT CONNECT: " << cd);
  577. ptrAtmiBrokerClient->closeSession(cd);
  578. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  579. }
  580. } else {
  581. LOG4CXX_DEBUG(loggerXATMI,
  582. (char*) "Session got dudded: " << cd);
  583. ptrAtmiBrokerClient->closeSession(cd);
  584. }
  585. } else if (tperrno == 0) {
  586. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  587. }
  588. } catch (...) {
  589. LOG4CXX_ERROR(
  590. loggerXATMI,
  591. (char*) "tpconnect failed to connect to service queue");
  592. setSpecific(TPE_KEY, TSS_TPENOENT);
  593. if (cd != -1) {
  594. ptrAtmiBrokerClient->closeSession(cd);
  595. }
  596. }
  597. }
  598. }
  599. }
  600. }
  601. LOG4CXX_TRACE(loggerXATMI, (char*) "tpconnect return: " << toReturn
  602. << " tperrno: " << tperrno);
  603. return toReturn;
  604. }
  605. int tpgetrply(int *id, char ** odata, long *olen, long flags) {
  606. LOG4CXX_TRACE(loggerXATMI, (char*) "tpgetrply " << id);
  607. int toReturn = -1;
  608. setSpecific(TPE_KEY, TSS_TPERESET);
  609. if (odata == NULL || *odata == NULL) {
  610. setSpecific(TPE_KEY, TSS_TPEINVAL);
  611. return toReturn;
  612. }
  613. long toCheck = flags & ~(TPGETANY | TPNOCHANGE | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  614. if (toCheck != 0) {
  615. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  616. setSpecific(TPE_KEY, TSS_TPEINVAL);
  617. } else {
  618. if (clientinit() != -1) {
  619. if (id && olen) {
  620. Session* session = ptrAtmiBrokerClient->getSession(*id);
  621. if (session == NULL) {
  622. setSpecific(TPE_KEY, TSS_TPEBADDESC);
  623. } else {
  624. long event = 0;
  625. toReturn = ::receive(*id, session, odata, olen, flags,
  626. &event, true);
  627. txx_resume(*id);
  628. }
  629. } else {
  630. setSpecific(TPE_KEY, TSS_TPEINVAL);
  631. }
  632. }
  633. }
  634. LOG4CXX_TRACE(loggerXATMI, (char*) "tpgetrply return: " << toReturn
  635. << " tperrno: " << tperrno);
  636. return toReturn;
  637. }
  638. int tpcancel(int id) {
  639. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcancel " << id);
  640. int toReturn = -1;
  641. setSpecific(TPE_KEY, TSS_TPERESET);
  642. if (::txx_isCdTransactional(id)) {
  643. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcancel not allowed (TSS_TPETRAN)");
  644. setSpecific(TPE_KEY, TSS_TPETRAN);
  645. } else if (clientinit() != -1) {
  646. if (getSpecific(TSS_KEY)) {
  647. setSpecific(TPE_KEY, TSS_TPETRAN);
  648. }
  649. if (ptrAtmiBrokerClient->getSession(id) != NULL) {
  650. ptrAtmiBrokerClient->closeSession(id);
  651. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcancel session closed");
  652. toReturn = 0;
  653. } else {
  654. setSpecific(TPE_KEY, TSS_TPEBADDESC);
  655. }
  656. }
  657. LOG4CXX_TRACE(loggerXATMI, (char*) "tpcancel return: " << toReturn
  658. << " tperrno: " << tperrno);
  659. return toReturn;
  660. }
  661. int tpsend(int id, char* idata, long ilen, long flags, long *revent) {
  662. LOG4CXX_TRACE(loggerXATMI, (char*) "tpsend " << id);
  663. int toReturn = -1;
  664. setSpecific(TPE_KEY, TSS_TPERESET);
  665. long toCheck = flags & ~(TPRECVONLY | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  666. if (toCheck != 0) {
  667. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  668. setSpecific(TPE_KEY, TSS_TPEINVAL);
  669. } else {
  670. int len = ::bufferSize(idata, ilen);
  671. if (len != -1) {
  672. if (clientinit() != -1) {
  673. Session* session = (Session*) getSpecific(SVC_SES);
  674. if (session != NULL) {
  675. if (session->getId() != id) {
  676. session = NULL;
  677. }
  678. }
  679. if (session == NULL) {
  680. if (clientinit() != -1) {
  681. session = ptrAtmiBrokerClient->getSession(id);
  682. if (session == NULL) {
  683. setSpecific(TPE_KEY, TSS_TPEBADDESC);
  684. len = -1;
  685. }
  686. }
  687. }
  688. if (len != -1) {
  689. if (session->getLastEvent() != 0) {
  690. if (revent != 0) {
  691. *revent = session->getLastEvent();
  692. LOG4CXX_DEBUG(
  693. loggerXATMI,
  694. (char*) "Session has event, will be closed: "
  695. << *revent);
  696. } else {
  697. LOG4CXX_ERROR(
  698. loggerXATMI,
  699. (char*) "Session has event, will be closed: "
  700. << session->getLastEvent());
  701. }
  702. if (session->getLastEvent() == TPEV_SVCFAIL) {
  703. setTpurcode(session->getLastRCode());
  704. } else if (session->getLastEvent() == TPEV_SVCSUCC
  705. || session->getLastEvent() == TPEV_DISCONIMM) {
  706. setSpecific(TPE_KEY, TSS_TPEEVENT);
  707. toReturn = -1;
  708. }
  709. ptrAtmiBrokerClient->closeSession(id);
  710. LOG4CXX_TRACE(loggerXATMI,
  711. (char*) "tpsend closed session");
  712. } else {
  713. toReturn = ::send(session, session->getReplyTo(),
  714. idata, len, id, flags, 0, 0);
  715. if (toReturn != -1 && flags & TPRECVONLY) {
  716. session->setCanSend(false);
  717. session->setCanRecv(true);
  718. LOG4CXX_DEBUG(loggerXATMI,
  719. (char*) "tpsend set constraints session: "
  720. << session->getId() << " send: "
  721. << session->getCanSend()
  722. << " recv: "
  723. << session->getCanRecv());
  724. }
  725. }
  726. }
  727. }
  728. }
  729. }
  730. LOG4CXX_TRACE(loggerXATMI, (char*) "tpsend return: " << toReturn
  731. << " tperrno: " << tperrno);
  732. return toReturn;
  733. }
  734. int tprecv(int id, char ** odata, long *olen, long flags, long* event) {
  735. LOG4CXX_TRACE(loggerXATMI, (char*) "tprecv " << id);
  736. int toReturn = -1;
  737. setSpecific(TPE_KEY, TSS_TPERESET);
  738. *event = 0;
  739. long toCheck = flags & ~(TPNOCHANGE | TPNOBLOCK | TPNOTIME | TPSIGRSTRT);
  740. if (toCheck != 0) {
  741. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  742. setSpecific(TPE_KEY, TSS_TPEINVAL);
  743. } else {
  744. if (clientinit() != -1) {
  745. Session* session = (Session*) getSpecific(SVC_SES);
  746. if (session != NULL && session->getId() != id) {
  747. session = NULL;
  748. }
  749. if (session == NULL) {
  750. if (clientinit() != -1) {
  751. session = ptrAtmiBrokerClient->getSession(id);
  752. }
  753. }
  754. if (session == NULL) {
  755. setSpecific(TPE_KEY, TSS_TPEBADDESC);
  756. } else {
  757. toReturn = ::receive(id, session, odata, olen, flags, event,
  758. false);
  759. if (*event == TPEV_SVCSUCC || *event == TPEV_DISCONIMM
  760. || *event == TPEV_SENDONLY || *event == TPEV_SVCFAIL
  761. || *event == TPEV_SVCERR) {
  762. setSpecific(TPE_KEY, TSS_TPEEVENT);
  763. toReturn = -1;
  764. }
  765. }
  766. }
  767. }
  768. LOG4CXX_TRACE(loggerXATMI, (char*) "tprecv return: " << toReturn
  769. << " tperrno: " << tperrno);
  770. return toReturn;
  771. }
  772. void tpreturn(int rval, long rcode, char* idata, long ilen, long flags) {
  773. LOG4CXX_TRACE(loggerXATMI, (char*) "tpreturn " << rval);
  774. setSpecific(TPE_KEY, TSS_TPERESET);
  775. long toCheck = flags;
  776. if (toCheck != 0) {
  777. LOG4CXX_TRACE(loggerXATMI, (char*) "invalid flags remain: " << toCheck);
  778. setSpecific(TPE_KEY, TSS_TPEINVAL);
  779. } else {
  780. if (clientinit() != -1) {
  781. int len = 0;
  782. if (idata != NULL) {
  783. len = ::bufferSize(idata, ilen);
  784. }
  785. Session* session = (Session*) getSpecific(SVC_SES);
  786. ServiceDispatcher* dispatcher = (ServiceDispatcher*) getSpecific(SVC_KEY);
  787. if (session != NULL) {
  788. if (!session->getCanSend()
  789. && !(rval == TPFAIL && idata == NULL)) {
  790. LOG4CXX_TRACE(loggerXATMI, (char*) "generating TPESVCERR");
  791. rcode = TPESVCERR;
  792. rval = TPFAIL;
  793. }
  794. if (rcode == TPESVCERR || len == -1) {
  795. rval = TPFAIL;
  796. }
  797. if (!session->getCanSend()) {
  798. rval = DISCON;
  799. }
  800. session->setCanRecv(false);
  801. if (rcode == TPESVCERR || len == -1) {
  802. // mark rollback only and disassociate tx if present
  803. txx_rollback_only();
  804. if (getSpecific(TSS_KEY) != NULL)
  805. txx_release_control(txx_unbind(true));
  806. if (idata != NULL) {
  807. ::tpfree(idata);
  808. }
  809. ::send(session, "", NULL, 0, 0, flags, rval, TPESVCERR);
  810. LOG4CXX_TRACE(loggerXATMI, (char*) "sent TPESVCERR");
  811. if(dispatcher != NULL) {
  812. LOG4CXX_TRACE(loggerXATMI, (char*) "update error counter");
  813. dispatcher->updateErrorCounter();
  814. }
  815. } else {
  816. if (rval != TPSUCCESS && rval != TPFAIL) {
  817. rval = TPFAIL;
  818. LOG4CXX_TRACE(loggerXATMI, (char*) "generating TPFAIL");
  819. }
  820. if (rval == TPFAIL) {
  821. txx_rollback_only();
  822. LOG4CXX_TRACE(loggerXATMI, (char*) "will send TPFAIL");
  823. if(dispatcher != NULL) {
  824. LOG4CXX_TRACE(loggerXATMI, (char*) "update error counter");
  825. dispatcher->updateErrorCounter();
  826. }
  827. }
  828. // TODO send a fail if there are any outstanding replies or
  829. // open connections, or if any work done within the service
  830. // caused its transaction to be marked rollback-only
  831. // mark rollback only and disassociate tx if present
  832. if (getSpecific(TSS_KEY) != NULL)
  833. txx_release_control(txx_unbind((rval == TPFAIL)));
  834. ::send(session, "", idata, len, 0, flags, rval, rcode);
  835. LOG4CXX_TRACE(loggerXATMI, (char*) "sent response");
  836. }
  837. ::tpfree(idata);
  838. session->setSendTo(NULL);
  839. session->setCanSend(false);
  840. LOG4CXX_DEBUG(loggerXATMI,
  841. (char*) "tpreturn set constraints session: "
  842. << session->getId() << " send: "
  843. << session->getCanSend() << " recv: "
  844. << session->getCanRecv());
  845. } else {
  846. LOG4CXX_DEBUG(loggerXATMI, (char*) "Session is null");
  847. setSpecific(TPE_KEY, TSS_TPEPROTO);
  848. }
  849. }
  850. }
  851. LOG4CXX_TRACE(loggerXATMI, (char*) "tpreturn returning" << " tperrno: "
  852. << tperrno);
  853. }
  854. int tpdiscon(int id) {
  855. LOG4CXX_TRACE(loggerXATMI, (char*) "tpdiscon " << id);
  856. setSpecific(TPE_KEY, TSS_TPERESET);
  857. int toReturn = -1;
  858. if (clientinit() != -1) {
  859. LOG4CXX_DEBUG(loggerXATMI, (char*) "end - id: " << id);
  860. Session* session = ptrAtmiBrokerClient->getSession(id);
  861. if (session == NULL) {
  862. setSpecific(TPE_KEY, TSS_TPEBADDESC);
  863. } else {
  864. // CHECK TO MAKE SURE THE REMOTE SIDE IS "EXPECTING" DISCONNECTS STILL
  865. if (session->getLastEvent() == 0) {
  866. // SEND THE DISCONNECT TO THE REMOTE SIDE
  867. ::send(session, "", NULL, 0, id, TPNOTRAN, DISCON, 0);
  868. }
  869. try {
  870. if (getSpecific(TSS_KEY)) {
  871. toReturn = txx_rollback_only();
  872. }
  873. ptrAtmiBrokerClient->closeSession(id);
  874. LOG4CXX_TRACE(loggerXATMI, (char*) "tpdiscon session closed");
  875. } catch (...) {
  876. LOG4CXX_ERROR(loggerXATMI, (char*) "tpdiscon: call failed");
  877. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  878. }
  879. }
  880. }
  881. LOG4CXX_TRACE(loggerXATMI, (char*) "tpdiscon return: " << toReturn
  882. << " tperrno: " << tperrno);
  883. return toReturn;
  884. }