TxManager.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:15k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "ThreadLocalStorage.h"
  19. #include "XAResourceManagerFactory.h"
  20. #include "OrbManagement.h"
  21. #include "TxManager.h"
  22. #include "AtmiBrokerEnv.h"
  23. #include "ace/Thread.h"
  24. #include "txAvoid.h"
  25. #include "SynchronizableObject.h"
  26. #define TX_GUARD(cond) {
  27. FTRACE(txmlogger, "ENTER"); 
  28. if (_connection == NULL) { 
  29. LOG4CXX_DEBUG(txmlogger, (char*) "Cannot connect to an ORB"); 
  30. return TX_ERROR; 
  31. if (!cond) {  
  32. LOG4CXX_WARN(txmlogger, (char*) "protocol error: open: " << _isOpen << " transaction: " << getSpecific(TSS_KEY));   
  33. return TX_PROTOCOL_ERROR;   
  34. }}
  35. namespace atmibroker {
  36. namespace tx {
  37. log4cxx::LoggerPtr txmlogger(log4cxx::Logger::getLogger("TxLogManager"));
  38. TxManager *TxManager::_instance = NULL;
  39. SynchronizableObject lock_;
  40. TxManager *TxManager::get_instance()
  41. {
  42. FTRACE(txmlogger, "ENTER");
  43. lock_.lock();
  44. if (_instance == NULL)
  45. _instance = new TxManager();
  46. lock_.unlock();
  47. return _instance;
  48. }
  49. void TxManager::discard_instance()
  50. {
  51. FTRACE(txmlogger, "ENTER");
  52. if (_instance != NULL) {
  53. delete _instance;
  54. _instance = NULL;
  55. }
  56. }
  57. TxManager::TxManager() :
  58. _whenReturn(TX_COMMIT_DECISION_LOGGED), _controlMode(TX_UNCHAINED), _timeout (0l), _isOpen(false), _connection(NULL)
  59. {
  60. FTRACE(txmlogger, "ENTER");
  61. AtmiBrokerEnv::get_instance();
  62. try {
  63. _connection = ::initOrb((char*) "ots");
  64. LOG4CXX_DEBUG(txmlogger, (char*) "new CONNECTION: " << _connection);
  65. } catch (CORBA::SystemException & e) {
  66. LOG4CXX_WARN(txmlogger, (char*) "Failed to connect to ORB for TM: " << e._name() << " minor code: " << e.minor());
  67. } catch (...) {
  68. LOG4CXX_WARN(txmlogger, (char*) "Unknown error looking up ORB for TM");
  69. }
  70. }
  71. TxManager::~TxManager()
  72. {
  73. FTRACE(txmlogger, "ENTER");
  74. if (_connection) {
  75. (void) close();
  76. LOG4CXX_DEBUG(txmlogger, (char*) "deleting CONNECTION: " << _connection);
  77. shutdownBindings(_connection);
  78. AtmiBrokerEnv::discard_instance();
  79. }
  80. }
  81. CORBA::ORB_ptr TxManager::getOrb() {
  82. return _connection->orbRef;
  83. }
  84. atmibroker::tx::TxControl *TxManager::currentTx(const char *msg)
  85. {
  86. FTRACE(txmlogger, "ENTER");
  87. atmibroker::tx::TxControl *tx = NULL;
  88. if ((!_isOpen || (tx = (TxControl *) getSpecific(TSS_KEY)) == NULL || !tx->isActive(NULL, true)) && msg) {
  89. LOG4CXX_INFO(txmlogger, (char*) "protocol violation (" << msg << ") open="
  90. << _isOpen << " TSS_KEY=" << getSpecific(TSS_KEY));
  91. }
  92. return tx;
  93. }
  94. CosTransactions::Control_ptr TxManager::create_tx(TRANSACTION_TIMEOUT timeout)
  95. {
  96. CosTransactions::Control_ptr ctrl = NULL;
  97. if (!CORBA::is_nil(_txfac)) {
  98. try {
  99. ctrl = _txfac->create((long) timeout);
  100. } catch (CORBA::SystemException & e) {
  101. LOG4CXX_DEBUG(txmlogger, (char*) "Could not create tx: "
  102. << e._name() << " minor code: " << e.minor());
  103. }
  104. }
  105. return ctrl;
  106. }
  107. int TxManager::begin(void)
  108. {
  109. TX_GUARD((_isOpen && !getSpecific(TSS_KEY)));
  110. // start a new transaction
  111. TRANSACTION_TIMEOUT timeout = _timeout; // take a copy since _timeout can change at any time
  112. CosTransactions::Control_ptr ctrl = create_tx(timeout);
  113. if (CORBA::is_nil(ctrl)) {
  114. if (open_trans_factory() == TX_OK)
  115. ctrl = create_tx(timeout);
  116. if (CORBA::is_nil(ctrl)) {
  117. LOG4CXX_WARN(txmlogger, (char*) "Unable to start a new transaction (nil control)");
  118. return TX_ERROR;
  119. }
  120. }
  121. TxControl *tx = new TxControl(ctrl, (long) timeout, ACE_OS::thr_self());
  122. // associate the tx with the callers thread and enlist all open RMs with the tx
  123. int rc = TxManager::tx_resume(tx, TMNOFLAGS);
  124. if (rc != XA_OK) {
  125. // one or more RMs failed to start - roll back the transaction
  126. LOG4CXX_WARN(txmlogger, (char*) "begin: XA resume error: " << rc);
  127. try {
  128. CosTransactions::Terminator_var term = ctrl->get_terminator();
  129. if (!CORBA::is_nil(term))
  130. term->rollback();
  131. } catch (CORBA::SystemException & e) {
  132. LOG4CXX_DEBUG(txmlogger, (char*) "Could not terminate tx after tx_resume failure: "
  133. << e._name() << " minor code: " << e.minor());
  134. } catch (...) {
  135. LOG4CXX_DEBUG(txmlogger, (char*) "Could not terminate tx after tx_resume failure (generic exception)");
  136. }
  137. delete tx;
  138. return TX_ERROR;
  139. }
  140. LOG4CXX_DEBUG(txmlogger, (char*) "begin: ok");
  141. return TX_OK;
  142. }
  143. int TxManager::commit(void)
  144. {
  145. FTRACE(txmlogger, "ENTER");
  146. return complete(true);
  147. }
  148. int TxManager::rollback(void)
  149. {
  150. FTRACE(txmlogger, "ENTER");
  151. return complete(false);
  152. }
  153. int TxManager::rollback_only(void)
  154. {
  155. TxControl *tx;
  156. TX_GUARD(((tx = currentTx("rollback_only")) != NULL));
  157. // inform the local resource managers
  158. rm_end(TMFAIL);
  159. return tx->rollback_only();
  160. }
  161. int TxManager::complete(bool commit)
  162. {
  163. TxControl *tx;
  164. int outcome;
  165. TX_GUARD(((tx = currentTx("complete")) != NULL));
  166. std::map<int, int (*)(int)> &cds = tx->get_cds();
  167. if (cds.size() != 0) {
  168. LOG4CXX_WARN(txmlogger, (char*) "Ending a tx with outstanding xatmi descriptors is not allowed - rolling back");
  169. // invalidate the descriptors
  170. for (std::map<int,  int (*)(int)>::iterator i = cds.begin() ; i != cds.end(); i++) {
  171. int cd = (*i).first;
  172. int (*func)(int) = (*i).second;
  173.  LOG4CXX_DEBUG(txmlogger, (char*) "Invalidating descriptor " << cd);
  174. if (func != 0) {
  175. int rv = func(cd);
  176.   LOG4CXX_DEBUG(txmlogger, (char*) "Invalidate returned " << rv);
  177. // NOTE the invalidate function may mark the tx as rollback only
  178. }
  179. }
  180. commit = false;
  181. }
  182. // no need to call rm_end since each RM wrapper calls xa_end during the prepare call
  183. outcome = (commit ? tx->commit(reportHeuristics()) : tx->rollback());
  184. delete tx;
  185. return (isChained() ? chainTransaction(outcome) : outcome);
  186. }
  187. int TxManager::chainTransaction(int outcome)
  188. {
  189. FTRACE(txmlogger, "ENTER");
  190. /*
  191.  * NOTE: outcome will only truly represent the outcome of commit if the commit_return
  192.  * characteristic is TX_COMMIT_COMPLETED (see method reportHeuristics()).
  193.  * Using get Coordinator::get_status is ambiguous since NoTransaction can mean the
  194.  * transaction committed or rolled back and has been forgotten.
  195.  * TODO in begin register a participant in the transaction so we can definitively know
  196.  * the transaction outcome.
  197.  */
  198. switch(begin()) {
  199. case TX_OK:
  200. return TX_OK;
  201. default:
  202. switch (outcome) {
  203. case TX_OK:
  204. return TX_NO_BEGIN;
  205. case TX_ROLLBACK:
  206. return TX_ROLLBACK_NO_BEGIN;
  207. case TX_MIXED:
  208. return TX_MIXED_NO_BEGIN;
  209. case TX_HAZARD:
  210. return TX_HAZARD_NO_BEGIN;
  211. default:
  212. return outcome;
  213. }
  214. }
  215. }
  216. int TxManager::set_commit_return(COMMIT_RETURN when_return)
  217. {
  218. TX_GUARD(_isOpen);
  219. if (when_return != TX_COMMIT_DECISION_LOGGED &&
  220. when_return != TX_COMMIT_COMPLETED)
  221. return TX_EINVAL;
  222. _whenReturn = when_return;
  223. return TX_OK;
  224. }
  225. int TxManager::set_transaction_control(TRANSACTION_CONTROL mode)
  226. {
  227. TX_GUARD(_isOpen);
  228. if (mode != TX_UNCHAINED && mode != TX_CHAINED)
  229. return TX_EINVAL;
  230. _controlMode = mode;
  231. return TX_OK;
  232. }
  233. int TxManager::set_transaction_timeout(TRANSACTION_TIMEOUT timeout)
  234. {
  235. TX_GUARD(_isOpen);
  236. if (timeout < 0l)
  237. return TX_EINVAL;
  238. _timeout = timeout;
  239. return TX_OK;
  240. }
  241. int TxManager::info(void *info)
  242. {
  243. TX_GUARD(_isOpen);
  244. atmibroker::tx::TxControl *tx = currentTx(NULL);
  245.     if (info != 0) {
  246.         long whenReturn = _whenReturn;
  247.         long controlMode = _controlMode;
  248. long status = -1l;
  249.         long timeout = _timeout;
  250.         if (tx != NULL) {
  251. XAResourceManagerFactory::getXID(::getXid(info));
  252.             status = tx->get_status();
  253.         }
  254. ::updateInfo(info, whenReturn, controlMode, timeout, status);
  255.     }
  256.     LOG4CXX_DEBUG(txmlogger, (char*) "info tx=" << tx);
  257.     return (tx != NULL ? 1 : 0);
  258. }
  259. int TxManager::open_trans_factory(void)
  260. {
  261. char *transFactoryId = orbConfig.transactionFactoryName;
  262. if (transFactoryId == NULL || strlen(transFactoryId) == 0) {
  263. LOG4CXX_ERROR(txmlogger, (char*) "Please set the TRANS_FACTORY_ID env variable");
  264. return TX_ERROR;
  265. }
  266. try {
  267. CosNaming::Name *name = _connection->default_ctx->to_name(transFactoryId);
  268. LOG4CXX_DEBUG(txmlogger, (char*) "resolving Tx Fac Id: " << transFactoryId);
  269. CORBA::Object_var obj = _connection->default_ctx->resolve(*name);
  270. delete name;
  271. LOG4CXX_DEBUG(txmlogger, (char*) "resolved OK: " << (void*) obj);
  272. _txfac = CosTransactions::TransactionFactory::_narrow(obj);
  273. LOG4CXX_DEBUG(txmlogger, (char*) "narrowed OK: " << (void*) _txfac);
  274. } catch (CORBA::SystemException & e) {
  275. LOG4CXX_ERROR(txmlogger, 
  276. (char*) "Error resolving Tx Service: " << e._name() << " minor code: " << e.minor());
  277. return TX_ERROR;
  278. } catch (...) {
  279. LOG4CXX_ERROR(txmlogger, 
  280. (char*) "Unknown error resolving Tx Service did you run ant jts in the JBoss distribution and edit the jbossts properties to bind the service in the CORBA naming service: " << transFactoryId);
  281. return TX_ERROR;
  282. }
  283. return TX_OK;
  284. }
  285. int TxManager::open(void)
  286. {
  287. TX_GUARD((true));
  288. if (_isOpen)
  289. return TX_OK;
  290. if (open_trans_factory() != TX_OK)
  291. return TX_ERROR;
  292. if (rm_open() != 0) {
  293. LOG4CXX_ERROR(txmlogger, (char*) "At least one resource manager failed");
  294. (void) rm_close();
  295. return TX_ERROR;
  296. }
  297. // re-initialize values
  298. _controlMode = TX_UNCHAINED;
  299. _timeout = 0l;
  300. _isOpen = true;
  301. return TX_OK;
  302. }
  303. int TxManager::close(void)
  304. {
  305. FTRACE(txmlogger, "ENTER");
  306. if (!_isOpen)
  307. return TX_OK;
  308. TX_GUARD((getSpecific(TSS_KEY) == NULL));
  309. _isOpen = false;
  310. rm_close();
  311. return TX_OK;
  312. }
  313. int TxManager::rm_open(void)
  314. {
  315. FTRACE(txmlogger, "ENTER");
  316. try {
  317. _xaRMFac.createRMs(_connection);
  318. return 0;
  319. } catch (RMException& ex) {
  320. LOG4CXX_WARN(txmlogger, (char*) "failed to load RMs: " << ex.what());
  321. return -1;
  322. }
  323. }
  324. // private methods
  325. void TxManager::rm_close(void)
  326. {
  327. FTRACE(txmlogger, "ENTER");
  328. _xaRMFac.destroyRMs();
  329. }
  330. // pre-requisite:- there is an active transaction
  331. int TxManager::rm_end(int flags, int altflags)
  332. {
  333. FTRACE(txmlogger, "ENTER: " << std::hex << flags);
  334. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  335. return (tx ? _xaRMFac.endRMs(tx->isOriginator(), flags, altflags) : XA_OK);
  336. }
  337. // pre-requisite:- there is an active transaction
  338. int TxManager::rm_start(int flags, int altflags)
  339. {
  340. FTRACE(txmlogger, "ENTER: " << std::hex << flags);
  341. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  342. return (tx ? _xaRMFac.startRMs(tx->isOriginator(), flags, altflags) : XA_OK);
  343. }
  344. CosTransactions::Control_ptr TxManager::get_ots_control(long* ttl)
  345. {
  346. FTRACE(txmlogger, "ENTER");
  347. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  348. return (tx ? tx->get_ots_control(ttl) : 0);
  349. }
  350. int TxManager::tx_resume(CosTransactions::Control_ptr control, long ttl, int flags, int altflag)
  351. {
  352. FTRACE(txmlogger, "ENTER");
  353. TxControl *tx = new TxControl(control, ttl, 0);
  354. int rc = TxManager::tx_resume(tx, flags);
  355. if (rc != XA_OK) {
  356. delete tx;
  357. }
  358. return rc;
  359. }
  360. int TxManager::tx_resume(TxControl *tx, int flags, int altflags)
  361. {
  362. FTRACE(txmlogger, "ENTER " << tx << " - flags=" << std::hex << flags);
  363. int rc = XAER_NOTA;
  364. if (getSpecific(TSS_KEY)) {
  365. TxControl *pt = (TxControl *) getSpecific(TSS_KEY);
  366. LOG4CXX_WARN(txmlogger, (char *) "Thread already bound to " << pt << " (deleting it)");
  367. delete pt;
  368. }
  369. try {
  370. // TMJOIN TMRESUME TMNOFLAGS
  371. // must associate the tx with the thread before calling start on each open RM
  372.    setSpecific(TSS_KEY, tx);
  373. if ((rc = TxManager::get_instance()->rm_start(flags, altflags)) == XA_OK) {
  374. LOG4CXX_DEBUG(txmlogger, (char *) "Resume tx: ok");
  375. return XA_OK;
  376. } else {
  377. LOG4CXX_WARN(txmlogger, (char *) "Resume tx: error: " << rc);
  378. }
  379. } catch (PortableServer::POA::ObjectAlreadyActive &e) {
  380. // THIS COMES FROM XAResourceManager.cxx:204 poa_->activate_object_with_id(objId, ra);
  381. LOG4CXX_WARN(txmlogger, (char *) "Resume tx: ObjectAlreadyActive");
  382. } catch (...) {
  383. LOG4CXX_WARN(txmlogger, (char *) "Resume tx: generic exception");
  384. }
  385. destroySpecific(TSS_KEY);
  386. return rc;
  387. }
  388. CosTransactions::Control_ptr TxManager::tx_suspend(int flags, int altflags)
  389. {
  390. FTRACE(txmlogger, "ENTER");
  391. return (tx_suspend((TxControl *) getSpecific(TSS_KEY), flags, altflags));
  392. }
  393. /**
  394.  * Suspend the transaction and return the control.
  395.  * The caller is responsible for releasing the returned control
  396.  */
  397. CosTransactions::Control_ptr TxManager::tx_suspend(TxControl *tx, int flags, int altflags)
  398. {
  399. FTRACE(txmlogger, "ENTER");
  400. if (tx && tx->isActive(NULL, true)) {
  401. // increment the control reference count
  402. CosTransactions::Control_ptr ctrl = tx->get_ots_control(NULL);
  403. // suspend all open Resource Managers (TMSUSPEND TMMIGRATE TMSUCCESS TMFAIL)
  404. (void) rm_end(flags, altflags);
  405. // disassociate the transaction from the callers thread
  406. tx->suspend();
  407. delete tx;
  408. FTRACE(txmlogger, "< ctrl: " << ctrl);
  409. return ctrl;
  410. }
  411. FTRACE(txmlogger, "< ctrl: 0x0");
  412.  return NULL;
  413. }
  414. int TxManager::resume(int cd)
  415. {
  416. FTRACE(txmlogger, "ENTER");
  417. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  418. if (tx) {
  419. std::map<int, int (*)(int)> &cds = tx->get_cds();
  420. std::map<int,  int (*)(int)>::iterator i = cds.find(cd);
  421. if (i != cds.end()) {
  422. LOG4CXX_DEBUG(txmlogger, (char*) "Removing tp call " << cd << " from tx "
  423. << tx << " remaining tpcalls: " << cds.size());
  424. cds.erase(i);
  425. LOG4CXX_DEBUG(txmlogger, (char*) "Deleted cd - remaining tpcalls: " << cds.size());
  426. if (cds.size() == 0) {
  427. LOG4CXX_DEBUG(txmlogger, (char*) "No more outstanding calls - resume RMs");
  428. return rm_start(TMRESUME);
  429. }
  430. }
  431. }
  432. return XA_OK;
  433. }
  434. int TxManager::suspend(int cd, int (*invalidate)(int cd))
  435. {
  436. FTRACE(txmlogger, "ENTER: " << cd);
  437. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  438. if (tx) {
  439. std::map<int, int (*)(int)> &cds = tx->get_cds();
  440. std::map<int, int (*)(int)>::iterator i = cds.find(cd);
  441. if (i == cds.end()) {
  442. LOG4CXX_DEBUG(txmlogger, (char*) "Adding tp call " << cd << " to tx " << tx);
  443. cds[cd] = invalidate;
  444. if (cds.size() == 1) {
  445. LOG4CXX_DEBUG(txmlogger, (char*) "First outstanding call - suspending RMs");
  446. return rm_end(TMSUSPEND | TMMIGRATE);
  447. }
  448. }
  449. }
  450. return XA_OK;
  451. }
  452. bool TxManager::isCdTransactional(int cd)
  453. {
  454. FTRACE(txmlogger, "ENTER: " << cd);
  455. TxControl *tx = (TxControl *) getSpecific(TSS_KEY);
  456. if (tx) {
  457. std::map<int, int (*)(int)> &cds = tx->get_cds();
  458. std::map<int, int (*)(int)>::iterator i = cds.find(cd);
  459. LOG4CXX_TRACE(txmlogger, (char*) "found=" << (i != cds.end()) << " tx=" << tx << " calls=" << cds.size());
  460. return (i != cds.end());
  461. }
  462. return false;
  463. }
  464. #if defined (ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION)
  465. template class ACE_Singleton<TxManager, ACE_Null_Mutex>;
  466. #elif defined (ACE_HAS_TEMPLATE_INSTANTIATION_PRAGMA)
  467. pragma instantiate ACE_Singleton<TxManager, ACE_Null_Mutex>;
  468. #endif /* ACE_HAS_EXPLICIT_TEMPLATE_INSTANTIATION */
  469. } // namespace tx
  470. } //namespace atmibroker