XAResourceManagerFactory.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:13k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "XAResourceManagerFactory.h"
  19. #include "ThreadLocalStorage.h"
  20. #include "SymbolLoader.h"
  21. #include "AtmiBrokerEnv.h"
  22. #include "ace/DLL.h"
  23. #include "ace/ACE.h"
  24. #include "ace/OS.h"
  25. #ifdef ACE_HAS_POSITION_INDEPENDENT_POINTERS
  26. #include "ace/Based_Pointer_Repository.h"
  27. #endif /* ACE_HAS_POSITION_INDEPENDENT_POINTERS */
  28. #include "ace/Malloc_T.h"
  29. #include "ace/MMAP_Memory_Pool.h"
  30. #include "ace/PI_Malloc.h"
  31. #include "ace/Null_Mutex.h"
  32. #include "ace/Based_Pointer_T.h"
  33. log4cxx::LoggerPtr xarflogger(log4cxx::Logger::getLogger("TxLogXAFactory"));
  34. extern std::ostream& operator<<(std::ostream &os, const XID& xid);
  35. /**
  36.  * Convert an OTS tid (using the currently associated OTS transaction) into an XA XID:
  37.  * - the gtrid (global transaction id) is provided by the first bytes in tid
  38.  *   and the following bqual_length bytes correspond to the bqual (branch qualifier)
  39.  *   part of the XID
  40.  *
  41.  * Only the gtrid portion is of interest since BlackTie creates its own XIDs for
  42.  * driving RMs (but the gtrid must match the one the transaction manager is using).
  43.  * Refer to the method XAResourceManager::gen_xid for how the branch qualifier
  44.  * portion of the XID is generated.
  45.  *
  46.  * Refer to sections 4.2 and 7.3 of the XA spec and appendix B.2.2 of the OTS spec
  47.  * for details
  48.  */
  49. bool XAResourceManagerFactory::getXID(XID& xid)
  50. {
  51. FTRACE(xarflogger, "ENTER");
  52. CosTransactions::Control_ptr cp = (CosTransactions::Control_ptr) txx_get_control();
  53. bool ok = false;
  54. if (CORBA::is_nil(cp)) {
  55. LOG4CXX_WARN(xarflogger,  (char *) "getXID: no tx associated with the callers thread");
  56. return false;
  57. }
  58. try {
  59. CosTransactions::Coordinator_var cv = cp->get_coordinator();
  60. CosTransactions::PropagationContext_var pcv = cv->get_txcontext();
  61. CosTransactions::otid_t otid = pcv->current.otid;
  62. int otidlen = (int) otid.tid.length();
  63. char JBOSSTS_NODE_SEPARATOR = '-';
  64. char *tid, *p; // copy of the ots tid
  65. char *bq;   // the branch qualifier component
  66. p = tid = (char*) malloc(otidlen);
  67. if (tid == 0) {
  68. LOG4CXX_WARN(xarflogger, (char*) "Out of memory whilst converting OTS tid");
  69. return false;
  70. }
  71. memset(&xid, 0, sizeof (XID));
  72. xid.formatID = otid.formatID;
  73. for (int k = 0; k < otidlen; p++, k++)
  74. *p = otid.tid[k];
  75. LOG4CXX_TRACE(xarflogger,  (char *) "converting OTS tid " << tid);
  76. bq = strchr(tid, JBOSSTS_NODE_SEPARATOR);
  77. if (bq == 0) {
  78. // fingers crossed JBTM-577 has been fixed - do it the OTS way
  79. LOG4CXX_WARN(xarflogger, (char*) "no JBOSS separator in otid - assuming JBTM-577 is fixed");
  80. xid.bqual_length = otid.bqual_length;
  81. xid.gtrid_length = otidlen - otid.bqual_length;
  82. memcpy(xid.data, tid, otidlen);
  83. } else {
  84. // TODO com.arjuna.ats.jts.utils.Utility.uidToOtid is not OTS compliant
  85. // duplicate what JBossTS does - will be fixed in JBossTS 4.8.0 (see JBTM-577)
  86. bq += 1;
  87. xid.gtrid_length = (long) (bq - tid - 1);
  88. xid.bqual_length = strlen(bq);
  89. memset(xid.data, 0, XIDDATASIZE);
  90. memcpy(xid.data, tid, xid.gtrid_length);
  91. memcpy(xid.data + xid.gtrid_length, bq, xid.bqual_length);
  92. }
  93. free(tid);
  94. LOG4CXX_TRACE(xarflogger,  (char *) "converted OTS tid len:" << otidlen << (char *) " XID: "
  95. << xid.formatID << ':' << xid.gtrid_length << ':' << xid.bqual_length << ':' << xid.data);
  96.         ok = true;
  97.     } catch (CosTransactions::Unavailable & e) {
  98.         LOG4CXX_ERROR(xarflogger,  (char *) "XA-compatible Transaction Service raised unavailable: " << e._name());
  99.     } catch (const CORBA::OBJECT_NOT_EXIST &e) {
  100. // transaction has most likely timed out
  101.         LOG4CXX_DEBUG(xarflogger,  (char *) "Unexpected exception converting xid: " << e._name());
  102.     } catch  (CORBA::Exception& e) {
  103.         LOG4CXX_ERROR(xarflogger,  (char *) "Unexpected exception converting xid: " << e._name());
  104.     } catch  (...) {
  105.         LOG4CXX_ERROR(xarflogger,  (char *) "Unexpected generic exception converting xid");
  106.     }
  107. txx_release_control(cp);
  108. return ok;
  109. }
  110. static int _rm_start(XAResourceManager* rm, XID& xid, long flags)
  111. {
  112. FTRACE(xarflogger, "ENTER");
  113. return rm->xa_start(&xid, flags);
  114. }
  115. static int _rm_end(XAResourceManager* rm, XID& xid, long flags)
  116. {
  117. FTRACE(xarflogger, "ENTER");
  118. return rm->xa_end(&xid, flags);
  119. }
  120. static int _rmiter(ResourceManagerMap& rms, int (*func)(XAResourceManager *, XID&, long),
  121. bool isOriginator, int flags, int altflags)
  122. {
  123. FTRACE(xarflogger, "ENTER: flags=0x" << std::hex << flags << " tx owner=" << isOriginator);
  124. XID xid;
  125. if (!XAResourceManagerFactory::getXID(xid)) {
  126. LOG4CXX_TRACE(xarflogger,  (char *) "No tx ... returning");
  127. return XAER_NOTA;
  128. }
  129. for (ResourceManagerMap::iterator i = rms.begin(); i != rms.end(); ++i) {
  130. XAResourceManager * rm = i->second;
  131. int rc;
  132. LOG4CXX_TRACE(xarflogger,  (char *) rm->name() << ": xa flags=0x" << std::hex << rm->xa_flags());
  133. rc = func(rm, xid, (rm->xa_flags() & TMNOMIGRATE) && altflags != -1 ? altflags : flags);
  134. if (rc != XA_OK) {
  135. LOG4CXX_DEBUG(xarflogger,  (char *) rm->name() << ": rm operation failed");
  136. return rc;
  137. }
  138. LOG4CXX_TRACE(xarflogger,  rm->name() << ": rm operation ok");
  139. }
  140. return XA_OK;
  141. }
  142. XAResourceManagerFactory::XAResourceManagerFactory() : poa_(0)
  143. {
  144. FTRACE(xarflogger, "ENTER");
  145. }
  146. XAResourceManagerFactory::~XAResourceManagerFactory()
  147. {
  148. FTRACE(xarflogger, "ENTER");
  149. destroyRMs();
  150. if (!CORBA::is_nil(poa_)) {
  151. CORBA::release(poa_);
  152. poa_ = NULL;
  153. }
  154. }
  155. XAResourceManager * XAResourceManagerFactory::findRM(long id)
  156. {
  157. FTRACE(xarflogger, "ENTER");
  158. ResourceManagerMap::iterator i = rms_.find(id);
  159. return (i == rms_.end() ? NULL : i->second);
  160. }
  161. void XAResourceManagerFactory::destroyRMs()
  162. {
  163. FTRACE(xarflogger, "ENTER");
  164. for (ResourceManagerMap::iterator i = rms_.begin(); i != rms_.end(); ++i)
  165. delete i->second;
  166. rms_.clear();
  167. }
  168. int XAResourceManagerFactory::startRMs(bool isOriginator, int flags, int altflags)
  169. {
  170. FTRACE(xarflogger, "ENTER");
  171. LOG4CXX_DEBUG(xarflogger, (char *) " starting RMs flags=0x" << std::hex << flags);
  172. // there is a current transaction (otherwise the call doesn't need to start the RMs
  173. return _rmiter(rms_, _rm_start, isOriginator, flags, altflags);
  174. }
  175. int XAResourceManagerFactory::endRMs(bool isOriginator, int flags, int altflags)
  176. {
  177. FTRACE(xarflogger, "ENTER");
  178. LOG4CXX_DEBUG(xarflogger,  (char *) "end RMs flags=0x" << std::hex << flags);
  179. return _rmiter(rms_, _rm_end, isOriginator, flags, altflags);
  180. }
  181. /**
  182.  * See if there are any transaction branches in need of revovery. This call is performed
  183.  * once at startup so there should be no transactions created during the recovery scan.
  184.  */
  185. void XAResourceManagerFactory::run_recovery()
  186. {
  187. FTRACE(xarflogger, "ENTER");
  188. /*
  189.  * If the TM failed before updating its recovery log then there may RMs with pending
  190.  * branches. Ask each registered RM to return all pending XIDs and if any belong to
  191.  * the TM but weren't in the recovery log then rollback the branch:
  192.  */
  193. for (ResourceManagerMap::iterator i = rms_.begin(); i != rms_.end(); ++i)
  194. i->second->recover();
  195. /*
  196.  * iterate through the recovery log and try to recover each branch
  197.  */
  198. for (rrec_t* rrp = rclog_.find_next(0); rrp; rrp = rclog_.find_next(rrp)) {
  199. // the first long in the XID data contains the RM id
  200. long rmid = ACE_OS::atol((char *) ((rrp->xid).data + (rrp->xid).gtrid_length));
  201. XAResourceManager *rm = findRM(rmid);
  202. if (rm != NULL) {
  203. if (rm->recover(rrp->xid, rclog_.get_ior(*rrp)))
  204. rclog_.del_rec(rrp->xid);
  205. } else {
  206. LOG4CXX_DEBUG(xarflogger,  (char *) "recover_branches rm " << rmid << " not found");
  207. }
  208. }
  209. }
  210. void XAResourceManagerFactory::createRMs(CORBA_CONNECTION * connection) throw (RMException)
  211. {
  212. FTRACE(xarflogger, "ENTER rmsize: " << rms_.size());
  213. if (CORBA::is_nil(poa_))
  214. create_poa(connection);
  215. if (rms_.size() == 0) {
  216. xarm_config_t * rmp = (xarmp == 0 ? 0 : xarmp->head);
  217. while (rmp != 0) {
  218. LOG4CXX_TRACE(xarflogger,  (char*) "createRM:"
  219. << (char *) " xaResourceMgrId: " << rmp->resourceMgrId
  220. << (char *) " xaResourceName: " << rmp->resourceName
  221. << (char *) " xaOpenString: " << rmp->openString
  222. << (char *) " xaCloseString: " << rmp->closeString
  223. << (char *) " xaSwitch: " << rmp->xasw
  224. << (char *) " xaLibName: " << rmp->xalib
  225. );
  226. (void) createRM(connection, rmp);
  227. rmp = rmp->next;
  228. }
  229. }
  230. run_recovery();
  231. }
  232. /**
  233.  * Create a Resource Manager proxy for a XA compliant RM.
  234.  * RMs must have a unique rmid.
  235.  * A separate POA is created for each RM whose name is
  236.  * derived from the unique rmid. The POA is responsible for
  237.  * generating servants that correspond to each transaction branch
  238.  * (a branch is created when start on the RM is called).
  239.  */
  240. XAResourceManager * XAResourceManagerFactory::createRM(
  241. CORBA_CONNECTION * connection,
  242. xarm_config_t *rmp)
  243. throw (RMException)
  244. {
  245. FTRACE(xarflogger, "ENTER");
  246. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  247. const char* serverId = env->getenv("BLACKTIE_SERVER_ID", "0");
  248. AtmiBrokerEnv::discard_instance();
  249. // make sure the XA_RESOURCE XML config is valid
  250. if (rmp->resourceMgrId == 0 || rmp->xasw == NULL || rmp->xalib == NULL) {
  251. LOG4CXX_DEBUG(xarflogger, 
  252. (char *) "Bad XA_RESOURCE config: "
  253. << " rmid: " << rmp->resourceMgrId
  254. << " xaswitch symbol: " << rmp->xasw
  255. << " xa lib name: " << rmp->xalib);
  256. //destroyRMs(NULL);
  257. RMException ex = RMException("Invalid XA_RESOURCE XML config", EINVAL);
  258. throw ex;
  259. }
  260. // Check that rmid is unique
  261. XAResourceManager * id = findRM(rmp->resourceMgrId);
  262. if (id != 0) {
  263. LOG4CXX_INFO(xarflogger, 
  264. (char *) "Duplicate RM with id " << rmp->resourceMgrId);
  265. RMException ex("RMs must have unique ids", EINVAL);
  266. throw ex;
  267. }
  268. void * symbol = lookup_symbol(rmp->xalib, rmp->xasw);
  269. ptrdiff_t tmp = reinterpret_cast<ptrdiff_t> (symbol);
  270. struct xa_switch_t * xa_switch = reinterpret_cast<struct xa_switch_t *>(tmp);
  271. if (xa_switch == NULL) {
  272. LOG4CXX_ERROR(xarflogger, 
  273. (char *) " xa_switch " << rmp->xasw << (char *) " not found in library " << rmp->xalib);
  274. RMException ex("Could not find xa_switch in library", 0);
  275. throw ex;
  276. }
  277. LOG4CXX_TRACE(xarflogger,  (char *) "creating xa rm: " << xa_switch->name);
  278. XAResourceManager * a = new XAResourceManager(
  279. connection, rmp->resourceName, rmp->openString, rmp->closeString, rmp->resourceMgrId, ACE_OS::atol((char *) serverId),
  280. xa_switch, rclog_, poa_);
  281. LOG4CXX_TRACE(xarflogger,  (char *) "created xarm");
  282. if (a != NULL)
  283. rms_[rmp->resourceMgrId] = a;
  284. return a;
  285. }
  286. // All resource managers share the same POA.
  287. void XAResourceManagerFactory::create_poa(CORBA_CONNECTION * connection) throw (RMException) {
  288. FTRACE(xarflogger, "ENTER");
  289. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  290. const char* poaname = env->getenv("BLACKTIE_SERVER_NAME", "ATMI_RM_POA");
  291. AtmiBrokerEnv::discard_instance();
  292. PortableServer::POAManager_ptr poa_manager = (PortableServer::POAManager_ptr) connection->root_poa_manager;
  293. PortableServer::POA_ptr parent_poa = (PortableServer::POA_ptr) connection->root_poa;
  294. PortableServer::LifespanPolicy_var p1 = parent_poa->create_lifespan_policy(PortableServer::PERSISTENT);
  295. PortableServer::IdAssignmentPolicy_var p2 = parent_poa->create_id_assignment_policy(PortableServer::USER_ID);
  296. CORBA::PolicyList policies;
  297. policies.length(2); // set number of policies to 1 to disable USER_ID policy
  298. // the servant object references must survive failure of the ORB in order to support recover of 
  299. // transaction branches (the default orb policy for servants is transient)
  300. policies[0] = PortableServer::LifespanPolicy::_duplicate(p1);
  301. policies[1] = PortableServer::IdAssignmentPolicy::_duplicate(p2);
  302. // create a the POA
  303. try {
  304. this->poa_ = parent_poa->create_POA(poaname, poa_manager, policies);
  305. p1->destroy(); p2->destroy();
  306. } catch (PortableServer::POA::AdapterAlreadyExists &) {
  307. p1->destroy(); p2->destroy();
  308. try {
  309. this->poa_ = parent_poa->find_POA(poaname, false);
  310. } catch (const PortableServer::POA::AdapterNonExistent &) {
  311. LOG4CXX_ERROR(xarflogger, (char *) "Duplicate RM POA with name " << poaname <<
  312. " (check that the server was started with a unique name using the -n <name> flag)");
  313. RMException ex("Duplicate RM POA", EINVAL);
  314. throw ex;
  315. }
  316. } catch (PortableServer::POA::InvalidPolicy &) {
  317. p1->destroy(); p2->destroy();
  318. LOG4CXX_WARN(xarflogger, (char *) "Invalid RM POA policy");
  319. RMException ex("Invalid RM POA policy", EINVAL);
  320. throw ex;
  321. }
  322. // take the POA out of its holding state
  323. LOG4CXX_TRACE(xarflogger,  (char *) "activating RM POA");
  324. PortableServer::POAManager_var mgr = this->poa_->the_POAManager();
  325. mgr->activate();
  326. FTRACE(xarflogger, ">");
  327. }