XAResourceManager.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:17k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include <string.h>
  19. #include "XAResourceManager.h"
  20. #include "ThreadLocalStorage.h"
  21. #include "ace/OS_NS_time.h"
  22. #include "ace/OS.h"
  23. #include "AtmiBrokerEnv.h"
  24. log4cxx::LoggerPtr xarmlogger(log4cxx::Logger::getLogger("TxLogXAManager"));
  25. SynchronizableObject* XAResourceManager::lock = new SynchronizableObject();
  26. long XAResourceManager::counter = 0l;
  27. ostream& operator<<(ostream &os, const XID& xid)
  28. {
  29. os << xid.formatID << ':' << xid.gtrid_length << ':' << xid.bqual_length << ':' << xid.data;
  30. return os;
  31. }
  32. void XAResourceManager::show_branches(const char *msg, XID * xid)
  33. {
  34. FTRACE(xarmlogger, "ENTER " << *xid);
  35. for (XABranchMap::iterator i = branches_.begin(); i != branches_.end(); ++i) {
  36. LOG4CXX_TRACE(xarmlogger, (char *) "XID: " << (i->first)); 
  37. }
  38. }
  39. static int compareXids(const XID& xid1, const XID& xid2)
  40. {
  41. char *x1 = (char *) &xid1;
  42. char *x2 = (char *) &xid2;
  43. char *e = (char *) (x1 + sizeof (XID));
  44. while (x1 < e)
  45. if (*x1 < *x2)
  46. return -1;
  47. else if (*x1++ > *x2++)
  48. return 1;
  49. return 0;
  50. }
  51. bool xid_cmp::operator()(const XID& xid1, const XID& xid2) {
  52. return (compareXids(xid1, xid2) < 0);
  53. }
  54. XAResourceManager::XAResourceManager(
  55. CORBA_CONNECTION* connection,
  56. const char * name,
  57. const char * openString,
  58. const char * closeString,
  59. CORBA::Long rmid,
  60. long sid,
  61. struct xa_switch_t * xa_switch,
  62. XARecoveryLog& log, PortableServer::POA_ptr poa) throw (RMException) :
  63. poa_(poa), connection_(connection), name_(name), openString_(openString), closeString_(closeString),
  64. rmid_(rmid), sid_(sid), xa_switch_(xa_switch), rclog_(log) {
  65. FTRACE(xarmlogger, "ENTER " << (char *) "new RM name: " << name << (char *) " openinfo: " <<
  66. openString << (char *) " rmid: " << rmid_);
  67. if (name == NULL) {
  68. RMException ex("Invalid RM name", EINVAL);
  69. throw ex;
  70. }
  71. int rv = xa_switch_->xa_open_entry((char *) openString, rmid_, TMNOFLAGS);
  72. LOG4CXX_TRACE(xarmlogger,  (char *) "xa_open: " << rv);
  73. if (rv != XA_OK) {
  74. LOG4CXX_ERROR(xarmlogger,  (char *) "xa_open error: " << rv);
  75. RMException ex("xa_open", rv);
  76. throw ex;
  77. }
  78. // each RM has its own POA
  79. // createPOA();
  80. }
  81. XAResourceManager::~XAResourceManager() {
  82. FTRACE(xarmlogger, "ENTER");
  83. int rv = xa_switch_->xa_close_entry((char *) closeString_, rmid_, TMNOFLAGS);
  84. LOG4CXX_TRACE(xarmlogger, (char *) "xa_close: " << rv);
  85. if (rv != XA_OK)
  86. LOG4CXX_WARN(xarmlogger, (char *) " close RM " << name_ << " failed: " << rv);
  87. }
  88. /**
  89.  * replay branch completion on an XID that needs recovering.
  90.  * The rc parameter is the CORBA object reference of the
  91.  * Recovery Coordinator for the the branch represented by the XID
  92.  *
  93.  * return bool true if it is OK for the caller to delete the associated recovery record
  94.  */
  95. bool XAResourceManager::recover(XID& bid, const char* rc)
  96. {
  97. bool delRecoveryRec = true;
  98. FTRACE(xarmlogger, "ENTER");
  99. CORBA::Object_var ref = connection_->orbRef->string_to_object(rc);
  100. XAResourceAdaptorImpl *ra = NULL;
  101. if (CORBA::is_nil(ref)) {
  102. LOG4CXX_INFO(xarmlogger, (char *) "Invalid recovery coordinator ref: " << rc);
  103. return delRecoveryRec;
  104. } else {
  105. CosTransactions::RecoveryCoordinator_var rcv = CosTransactions::RecoveryCoordinator::_narrow(ref);
  106. if (CORBA::is_nil(rcv)) {
  107. LOG4CXX_INFO(xarmlogger, (char *) "Could not narrow recovery coordinator ref: " << rc);
  108. } else {
  109. int rv;
  110. ra = new XAResourceAdaptorImpl(this, bid, bid, rmid_, xa_switch_, rclog_);
  111. if (branches_[bid] != NULL) {
  112. // log an error since we forgot to clean up the previous servant
  113. LOG4CXX_ERROR(xarmlogger, (char *) "Recovery: branch already exists: " << bid);
  114. }
  115. // activate the servant
  116. std::string s = (char *) (bid.data + bid.gtrid_length);
  117. PortableServer::ObjectId_var objId = PortableServer::string_to_ObjectId(s.c_str());
  118. poa_->activate_object_with_id(objId, ra);
  119. // get a CORBA reference to the servant so that it can be enlisted in the OTS transaction
  120. CORBA::Object_var ref = poa_->id_to_reference(objId.in());
  121. ra->_remove_ref(); // now only the POA has a reference to ra
  122. CosTransactions::Resource_var v = CosTransactions::Resource::_narrow(ref);
  123. LOG4CXX_DEBUG(xarmlogger, (char*) "Recovering resource with branch id: " << bid <<
  124. " and recovery IOR: " << connection_->orbRef->object_to_string(v));
  125. try {
  126. /*
  127.  * Replay phase 2. The spec says we should use the same resource.
  128.  * If we really do need to use the same one then we need to reconstruct it
  129.  * with the same reference by setting the PortableServer::USER_ID
  130.  * policy on the creating POA (and use the same name based on the branch id).
  131.  *
  132.  * Remember to deal with heuristics (see section 2-50 of the OMG OTS spec).
  133.  */
  134. Status txs = rcv->replay_completion(v);
  135. LOG4CXX_DEBUG(xarmlogger, (char *) "Recovery: TM reports transaction status: " << txs
  136. << " (" << CosTransactions::StatusActive);
  137. switch (txs) {
  138. case CosTransactions::StatusUnknown:
  139. // the TM must have presumed abort and discarded the transaction (? is this always the case)
  140. // fallthru to next case
  141. case CosTransactions::StatusNoTransaction:
  142. // the TM must have presumed abort and discarded the transaction
  143. // fallthru to next case
  144. case CosTransactions::StatusRolledBack:
  145. // the TM has already rolled back the transaction
  146. // presumed abort - force the branch to rollback
  147. LOG4CXX_INFO(xarmlogger, (char *) "Recovery: rolling back branch for RM " << xa_switch_->name);
  148. rv = xa_switch_->xa_rollback_entry(&bid, rmid_, TMNOFLAGS);
  149. if (rv != XA_OK) {
  150. // ? under what circumstances is it possible to recover from this error
  151. LOG4CXX_INFO(xarmlogger, (char *) "Recovery: RM returned XA error " << rv);
  152. }
  153. break;
  154. case CosTransactions::StatusCommitted:
  155. LOG4CXX_INFO(xarmlogger, (char *) "Recovery: committing branch for RM " << xa_switch_->name);
  156. rv = xa_switch_->xa_commit_entry(&bid, rmid_, TMNOFLAGS);
  157. if (rv != XA_OK) {
  158. // ? under what circumstances is it possible to recover from this error
  159. LOG4CXX_INFO(xarmlogger, (char *) "Recovery: RM returned XA error " << rv);
  160. }
  161. break;
  162. /*
  163.  * Note that the BlackTie server should only try to recover XIDs it finds in its prepared log
  164.  * so the following status codes should never occur:
  165.  */
  166. case CosTransactions::StatusActive:
  167. case CosTransactions::StatusMarkedRollback:
  168. LOG4CXX_INFO(xarmlogger, (char *) "Recovery: TM returned an unexpected status");
  169. break;
  170. /*
  171.  * The remaining cases imply that the TM will eventually tell us how to complete the branch
  172.  */
  173. case CosTransactions::StatusPrepared:
  174. case CosTransactions::StatusPreparing:
  175. case CosTransactions::StatusRollingBack:
  176. case CosTransactions::StatusCommitting:
  177. // the XAResourceAdapterImpl corresponding to the branch will remove the recovery record
  178. LOG4CXX_INFO(xarmlogger,
  179. (char *) "Recovery: replaying transaction (TM reports prepared/preparing or completing)");
  180. branches_[bid] = ra;
  181. delRecoveryRec = false;
  182. break;
  183. default:
  184. // shouldn't happend all cases have been dealt with
  185. break;
  186. }
  187. } catch (CosTransactions::NotPrepared& e) {
  188. LOG4CXX_WARN(xarmlogger, (char *) "Recovery: TM says the transaction as not prepared: " << e._name());
  189. } catch (const CORBA::SystemException& e) {
  190. LOG4CXX_WARN(xarmlogger, (char*) "Recovery: replay error: " << e._name() << " minor: " << e.minor());
  191. }
  192. if (delRecoveryRec)
  193. delete ra;
  194. }
  195. }
  196. return delRecoveryRec;
  197. }
  198. /**
  199.  * check whether it is OK to recover a given XID
  200.  */
  201. bool XAResourceManager::isRecoverable(XID &xid)
  202. {
  203. /*
  204.  * if the XID is one of ours it will encode the RM id and the server id
  205.  * in the first two longs of the XID data (see XAResourceManager::gen_xid)
  206.  */
  207. char *bdata = (char *) (xid.data + xid.gtrid_length);
  208. char *sp = strchr(bdata, ':');
  209. long rmid = ACE_OS::atol(bdata); // the RM id
  210. long sid = (sp == 0 || ++sp == 0 ? 0l : ACE_OS::atol(sp)); // the server id
  211. /*
  212.  * Only recover our own XIDs - the reason we need to check the server id is to
  213.  * avoid the possibility of a server rolling back another servers active XID
  214.  *
  215.  * Note that this means that a recovery log can only be processed by server
  216.  * with the correct id.
  217.  *
  218.  * The user can override this behaviour, so that any server or client can recover any log,
  219.  * via an environment variable:
  220.  */
  221. if (AtmiBrokerEnv::get_instance()->getenv((char*) "BLACKTIE_RC_LOG_NAME", NULL) != NULL)
  222. sid = sid_;
  223. AtmiBrokerEnv::discard_instance();
  224. if (rmid == rmid_ && sid == sid_) {
  225. /*
  226.  * If this XID does not appear in the recovery log then the server must have failed
  227.  * after calling prepare on the RM but before writing the recovery log in which case
  228.  * it is OK to recover the XID
  229.  */
  230. if (rclog_.find_ior(xid) == 0)
  231. return true;
  232. }
  233. return false;
  234. }
  235. /**
  236.  * Initiate a recovery scan on the RM looking for prepared or heuristically completed branches
  237.  * This functionality covers the following failure scenario:
  238.  * - server calls prepare on a RM
  239.  * - RM prepares but the the server fails before it can write to its transaction recovery log
  240.  * In this case the RM will have a pending transaction branch which does not appear in
  241.  * the recovery log. Calling xa_recover on the RM will return the 'missing' XID which the
  242.  * recovery scan can rollback.
  243.  */
  244. int XAResourceManager::recover()
  245. {
  246. XID xids[10];
  247. long count = sizeof (xids) / sizeof (XID);
  248. long flags = TMSTARTRSCAN; // position the cursor at the start of the list
  249. int i, nxids;
  250. do {
  251. // ask the RM for all XIDs that need recovering
  252. nxids = xa_switch_->xa_recover_entry(xids, count, rmid_, flags);
  253. flags = TMNOFLAGS; // on the next call continue the scan from the current cursor position
  254. for (i = 0; i < nxids; i++) {
  255. // check whether this id needs rolling back
  256. if (isRecoverable(xids[i])) {
  257. int rv = xa_switch_->xa_rollback_entry((XID *) (xids + i), rmid_, TMNOFLAGS);
  258. LOG4CXX_INFO(xarmlogger, (char*) "Recovery of xid " << xids[i] << " for RM " << rmid_ <<
  259.  " returned XA status " << rv);
  260. }
  261. }
  262. } while (count == nxids);
  263. return 0;
  264. }
  265. int XAResourceManager::createServant(XID& xid)
  266. {
  267. FTRACE(xarmlogger, "ENTER");
  268. int res = XAER_RMFAIL;
  269. XAResourceAdaptorImpl *ra = NULL;
  270. CosTransactions::Control_ptr curr = (CosTransactions::Control_ptr) txx_get_control();
  271. CosTransactions::Coordinator_ptr coord = NULL;
  272. if (CORBA::is_nil(curr))
  273. return XAER_NOTA;
  274. try {
  275. /*
  276.  * Generate an XID for the new branch. The XID should have the same global transaction id
  277.  * that the Transaction Manager generated but should have a different branch qualifier
  278.  * (since we want to have loose coupling semantics).
  279.  */
  280. XID bid = gen_xid(rmid_, sid_, xid);
  281. // Create a servant to represent the new branch.
  282. ra = new XAResourceAdaptorImpl(this, xid, bid, rmid_, xa_switch_, rclog_);
  283. #if 0
  284. // and activate it
  285. PortableServer::ObjectId_var objId = poa_->activate_object(ra);
  286. // get a CORBA reference to the servant so that it can be enlisted in the OTS transaction
  287. CORBA::Object_var ref = poa_->servant_to_reference(ra);
  288. #else
  289. std::string s = (char *) (bid.data + bid.gtrid_length);
  290. PortableServer::ObjectId_var objId = PortableServer::string_to_ObjectId(s.c_str());
  291. poa_->activate_object_with_id(objId, ra);
  292. // get a CORBA reference to the servant so that it can be enlisted in the OTS transaction
  293. CORBA::Object_var ref = poa_->id_to_reference(objId.in());
  294. #endif
  295. ra->_remove_ref(); // now only the POA has a reference to ra
  296. CosTransactions::Resource_var v = CosTransactions::Resource::_narrow(ref);
  297. // enlist it with the transaction
  298. LOG4CXX_TRACE(xarmlogger, (char*) "enlisting resource: "); // << connection_->orbRef->object_to_string(v));
  299. coord = curr->get_coordinator();
  300. CosTransactions::RecoveryCoordinator_ptr rc = coord->register_resource(v);
  301. //c->register_synchronization(new XAResourceSynchronization(xid, rmid_, xa_switch_));
  302. if (CORBA::is_nil(rc)) {
  303. LOG4CXX_TRACE(xarmlogger, (char*) "createServant: nill RecoveryCoordinator ");
  304. res = XAER_RMFAIL;
  305. } else {
  306. CORBA::String_var rcref = connection_->orbRef->object_to_string(rc);
  307. ra->set_recovery_coordinator(ACE_OS::strdup(rcref));
  308.     CORBA::release(rc);
  309. branches_[xid] = ra;
  310. res = XA_OK;
  311. ra = NULL;
  312. }
  313. } catch (RMException& ex) {
  314. LOG4CXX_WARN(xarmlogger, (char*) "unable to create resource adaptor for branch: " << ex.what());
  315. } catch (PortableServer::POA::ServantNotActive&) {
  316. LOG4CXX_ERROR(xarmlogger, (char*) "createServant: poa inactive");
  317. } catch (CosTransactions::Inactive&) {
  318. res = XAER_PROTO;
  319. LOG4CXX_TRACE(xarmlogger, (char*) "createServant: tx inactive (too late for registration)");
  320. } catch (const CORBA::SystemException& e) {
  321. LOG4CXX_WARN(xarmlogger, (char*) "Resource registration error: " << e._name() << " minor: " << e.minor());
  322. }
  323. if (ra)
  324. delete ra;
  325. if (!CORBA::is_nil(curr))
  326. CORBA::release(curr);
  327. if (!CORBA::is_nil(coord))
  328. CORBA::release(coord);
  329. return res;
  330. }
  331. void XAResourceManager::notify_error(XID * xid, int xa_error, bool forget)
  332. {
  333. FTRACE(xarmlogger, "ENTER: reason: " << xa_error);
  334. if (forget)
  335. set_complete(xid);
  336. }
  337. void XAResourceManager::set_complete(XID * xid)
  338. {
  339. FTRACE(xarmlogger, "ENTER");
  340. XABranchMap::iterator iter;
  341. LOG4CXX_TRACE(xarmlogger, (char*) "removing branch: " << *xid);
  342. if (rclog_.del_rec(*xid) != 0) {
  343. // probably a readonly resource
  344. LOG4CXX_TRACE(xarmlogger, (char*) "branch completion notification with out a corresponding log entry: " << *xid);
  345. }
  346. for (XABranchMap::iterator i = branches_.begin(); i != branches_.end(); ++i)
  347. {
  348. if (compareXids(i->first, (const XID&) (*xid)) == 0) {
  349. XAResourceAdaptorImpl *r = i->second;
  350. PortableServer::ObjectId_var id(poa_->servant_to_id(r));
  351. // Note: deactivate will delete r hence no call to r->_remove_ref();
  352. poa_->deactivate_object(id.in());
  353. branches_.erase(i->first);
  354. return;
  355. }
  356. }
  357. LOG4CXX_TRACE(xarmlogger, (char*) "... unknown branch");
  358. }
  359. int XAResourceManager::xa_start (XID * xid, long flags)
  360. {
  361. FTRACE(xarmlogger, "ENTER " << rmid_ << (char *) ": flags=" << std::hex << flags << " lookup XID: " << *xid);
  362. XAResourceAdaptorImpl * resource = locateBranch(xid);
  363. int rv;
  364. if (resource == NULL) {
  365. FTRACE(xarmlogger, "creating branch " << *xid);
  366. if ((rv = createServant(*xid)) != XA_OK)
  367. return rv;
  368. if ((resource = locateBranch(xid)) == NULL) // cannot be NULL
  369. return XAER_RMERR;
  370. FTRACE(xarmlogger, "starting branch " << *xid);
  371. return resource->xa_start(TMNOFLAGS);
  372. }
  373. FTRACE(xarmlogger, "existing branch " << *xid);
  374. return resource->xa_start(TMRESUME);
  375. }
  376. int XAResourceManager::xa_end (XID * xid, long flags)
  377. {
  378. FTRACE(xarmlogger, "ENTER end branch " << *xid << " rmid=" << rmid_ << " flags=" << std::hex << flags);
  379. XAResourceAdaptorImpl * resource = locateBranch(xid);
  380. if (resource == NULL) {
  381. LOG4CXX_WARN(xarmlogger, (char *) " no such branch " << *xid);
  382. return XAER_NOTA;
  383. }
  384. return resource->xa_end(flags);
  385. }
  386. XAResourceAdaptorImpl * XAResourceManager::locateBranch(XID * xid)
  387. {
  388. FTRACE(xarmlogger, "ENTER");
  389. XABranchMap::iterator iter;
  390. for (iter = branches_.begin(); iter != branches_.end(); ++iter) {
  391. LOG4CXX_TRACE(xarmlogger, (char *) "compare: " << *xid << " with " << (iter->first));
  392. if (compareXids(iter->first, (const XID&) (*xid)) == 0) {
  393. return (*iter).second;
  394. }
  395. }
  396. LOG4CXX_DEBUG(xarmlogger, (char *) " branch not found");
  397. return NULL;
  398. }
  399. int XAResourceManager::xa_flags()
  400. {
  401. return xa_switch_->flags;
  402. }
  403. /**
  404.  * Generate a new XID. The xid should be unique within the currently
  405.  * running process. Uniqueness is assured by including
  406.  *
  407.  * - the global transaction identifier (gid)
  408.  * - the server id (sid)
  409.  * - a counter
  410.  * - the current time
  411.  */
  412. XID XAResourceManager::gen_xid(long id, long sid, XID &gid)
  413. {
  414. FTRACE(xarmlogger, "ENTER");
  415. XID xid = {gid.formatID, gid.gtrid_length};
  416. int i;
  417. long myCounter = -1l;
  418. lock->lock();
  419. myCounter = ++counter;
  420. lock->unlock();
  421. for (i = 0; i < gid.gtrid_length; i++)
  422. xid.data[i] = gid.data[i];
  423. ACE_Time_Value now = ACE_OS::gettimeofday();
  424. /*
  425.  * The first two longs in the XID data should contain the RM id and server id respectively.
  426.  */
  427. (void) sprintf(xid.data + i, "%ld:%ld:%ld:%ld:%ld", id, sid, myCounter, (long) now.sec(), (long) now.usec());
  428. xid.bqual_length = strlen(xid.data + i);
  429. FTRACE(xarmlogger, "Leaving with XID: " << xid);
  430. return xid;
  431. }