XARecoveryLog.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:12k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include <iostream>
  19. #include <string>
  20. #include <sstream>
  21. #include <stdio.h>
  22. #include <errno.h>
  23. #include "log4cxx/logger.h"
  24. #include "xa.h"
  25. #include "ace/OS_NS_string.h"
  26. #include "ace/OS_NS_stdio.h"
  27. #include "ace/OS_NS_stdlib.h"
  28. #include "XARecoveryLog.h"
  29. #include "AtmiBrokerEnv.h"
  30. #include "SynchronizableObject.h"
  31. #define INUSE 0xaf12L // marker to indicate that the block is allocated
  32. #define NBLOCKS 0x100 // the minimum number of blocks to allocate when expanding the arena
  33. #define MAXBLOCKS   "0x1000" // limit the size of the arena to this many blocks
  34. #define IOR(rr) ((char *) (rr + 1)) // extract the IOR from a recovery record
  35. #ifdef TESTSYNCDEL
  36. #define SEGVONDEL() {char *p = 0; *p = 0;}
  37. #else
  38. #define SEGVONDEL() {}
  39. #endif
  40. #ifdef TESTSYNCADD
  41. #define SEGVONADD(ior) {if (strcmp(ior, "SEGV") == 0) {char *p = 0; *p = 0;}}
  42. #else
  43. #define SEGVONADD(ior) {}
  44. #endif
  45. // the persistent store for recovery records
  46. static const char* DEF_LOG = "rclog";
  47. static char RCLOGPATH[1024];
  48. static SynchronizableObject lock_;
  49. log4cxx::LoggerPtr xarcllogger(log4cxx::Logger::getLogger("TxLogXARecoveryLog"));
  50. using namespace std;
  51. static int compareXids(const XID& xid1, const XID& xid2)
  52. {
  53. char *x1 = (char *) &xid1;
  54. char *x2 = (char *) &xid2;
  55. char *e = (char *) (x1 + sizeof (XID));
  56. while (x1 < e)
  57. if (*x1 < *x2)
  58. return -1;
  59. else if (*x1++ > *x2++)
  60. return 1;
  61. return 0;
  62. }
  63. // convert an X/Open XID to a string (used as a key into the allocator to retrieve the rrec_t data type
  64. // associated with the key)
  65. static string xid_to_string(XID& xid)
  66. {
  67. std::stringstream out;
  68. out << xid.formatID << ':' << xid.gtrid_length << ':'<< xid.bqual_length << ':' << (char *) (xid.data + xid.gtrid_length);
  69. return out.str();
  70. }
  71. /**
  72.  * locate the path to the backing store for the recovery log
  73.  */
  74. static void init_logpath(const char *fname)
  75. {
  76. if (fname) {
  77. ACE_OS::snprintf(RCLOGPATH, sizeof (RCLOGPATH), "%s", fname);
  78. } else {
  79. // if fname is not passed see if the log name is set in the environent
  80. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  81. const char *rcLog    = env->getenv((char*) "BLACKTIE_RC_LOG_NAME", DEF_LOG);
  82. const char *servName = env->getenv((char*) "BLACKTIE_SERVER_NAME", rcLog);
  83. ACE_OS::snprintf(RCLOGPATH, sizeof (RCLOGPATH), "%s", servName);
  84. AtmiBrokerEnv::discard_instance();
  85. }
  86. LOG4CXX_TRACE(xarcllogger, (char *) "Using log file " << RCLOGPATH);
  87. }
  88. /*
  89.  * Construct a recovery log for storing XID's and their associated OTS Recovery Records.
  90.  * The log is backed by a file.
  91.  */
  92. XARecoveryLog::XARecoveryLog(const char* logfile) throw (RMException) :
  93. arena_(0), nblocks_((size_t) 0), maxblocks_(0)
  94. {
  95. bool isClient = false;
  96. init_logpath(logfile);
  97. if (!isClient && !load_log(RCLOGPATH)) {
  98. LOG4CXX_ERROR(xarcllogger, (char *) "Error creating recovery log");
  99. //throw new RMException("Error creating recovery log ", -1);
  100. //TODO propagate the exception
  101. }
  102. }
  103. /**
  104.  * free the arena and close the backing file
  105.  */
  106. XARecoveryLog::~XARecoveryLog()
  107. {
  108. LOG4CXX_TRACE(xarcllogger, (char *) "destructor");
  109. if (arena_)
  110. ACE_OS::free(arena_);
  111. if (log_.is_open())
  112. log_.close();
  113. }
  114. /**
  115.  * Read a collection of recovery records from file and load them into memory
  116.  */
  117. bool XARecoveryLog::load_log(const char* logname)
  118. {
  119. LOG4CXX_TRACE(xarcllogger, (char *) "Loading log file: " << logname);
  120. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  121. const char* maxblk = env->getenv("BLACKTIE_MAX_RCLOG_SIZE", MAXBLOCKS);
  122. AtmiBrokerEnv::discard_instance();
  123. ios_base::openmode mode = ios::out | ios::in | ios::binary;
  124. // make sure the log file exists
  125. FILE *fp = fopen(logname, "a+");
  126. if (fp == NULL) {
  127. LOG4CXX_ERROR(xarcllogger, (char *) "log open failed: " << errno);
  128. return false;
  129. }
  130. (void) fclose(fp);
  131. //log_.open (logname, mode | ios::app);
  132. //log_.close();
  133. log_.open (logname, mode);
  134. if (!log_.is_open()) {
  135. LOG4CXX_ERROR(xarcllogger, (char *) "log open failed for: " << logname);
  136. return false;
  137. }
  138. // calculate the number of bytes in the file
  139. size_t sz;
  140. fstream::pos_type s = log_.tellg();
  141. log_.seekg (0, ios::end);
  142. sz = (size_t) (log_.tellg() - s);
  143. if ((maxblocks_ = ACE_OS::strtol(maxblk, NULL, 0)) == 0) {
  144. LOG4CXX_ERROR(xarcllogger, (char *) "the env variable BLACKTIE_MAX_RCLOG_SIZE is invalid: " << (char *) maxblk);
  145. return false;
  146. }
  147. LOG4CXX_TRACE(xarcllogger, (char *) "recovery log size: " << maxblocks_);
  148. // allocate enough space into the arena for sz bytes
  149. if (!morecore(sz / sizeof (rrec_t) + 1, false))
  150. return false;
  151. // read the bytes from the file into the arena
  152. log_.seekg(0, ios::beg);
  153. log_.read((char *) arena_, sz);
  154. // debug_dump(arena_, arena_ + nblocks_);
  155. // verify that the log is consistent (must be called
  156. // at log creation time)
  157. check_log();
  158. // LOG4CXX_TRACE(xarcllogger, (char *) "load_log: contents:");
  159. // for (rrec_t* rr = find_next(0); rr; rr = find_next(rr))
  160. // LOG4CXX_TRACE(xarcllogger, (char *) "next: " << IOR(rr));
  161. return true;
  162. }
  163. /**
  164.  * Increase the size of the arena for storing more records.
  165.  * The size of the storage area is increased on demand but is never
  166.  * reduced. If the backing file ends up containing large amounts of
  167.  * free space it may be beneficial to reduce its size periodically.
  168.  */
  169. bool XARecoveryLog::morecore(size_t nblocks, bool dosync) {
  170. size_t nsz;
  171. rrec_t* ar;
  172. if (!log_.is_open())
  173. return false;
  174. if (nblocks < NBLOCKS)
  175. nblocks = NBLOCKS;
  176. nsz = nblocks_ + nblocks;
  177. if (nblocks_ > maxblocks_) {
  178. LOG4CXX_WARN(xarcllogger, (char *) "recovery log has grown beyond its configurable limitn");
  179. } else if ((ar = (rrec_t*) ACE_OS::calloc(sizeof (rrec_t), nsz)) == 0) {
  180. LOG4CXX_WARN(xarcllogger, (char *) "recovery log: out of memory");
  181. } else {
  182. LOG4CXX_TRACE(xarcllogger, (char *) "increasing rc log blocks from " << hex << nblocks_ << " to " << nsz);
  183. ACE_OS::memcpy(ar, arena_, nblocks_);
  184. if (dosync)
  185. sync_rec(ar + nblocks_, nblocks * sizeof (rrec_t));
  186. nblocks_ = nsz;
  187. ACE_OS::free(arena_);
  188. arena_ = ar;
  189. return true;
  190. }
  191. return false;
  192. }
  193. void XARecoveryLog::sync_rec(void* p, size_t sz) {
  194. fstream::pos_type pos = (fstream::pos_type) ((char *) p - (char *) arena_);
  195. LOG4CXX_TRACE(xarcllogger, (char *) "sync " << p << " p size " << sz << " at pos " << (long) pos);
  196. log_.seekg(pos);
  197. log_.write((char *) p, sz);
  198. log_.sync();
  199. }
  200. rrec_t* XARecoveryLog::next_rec(rrec_t* p) {
  201. return (p->next && p->next < nblocks_ ?  arena_ + p->next : 0);
  202. }
  203. int XARecoveryLog::del_rec(XID& xid) {
  204. rrec_t* prev;
  205. rrec_t* next;
  206. if (!log_.is_open())
  207. return -1;
  208. lock_.lock();
  209. if (find(xid, &prev, &next) != 0) {
  210. LOG4CXX_TRACE(xarcllogger, (char *) "del_rec: xid " << xid_to_string(xid).c_str() << " not found");
  211. lock_.unlock();
  212. return -1;
  213. }
  214. // mark the block as free and sync it
  215. LOG4CXX_TRACE(xarcllogger, (char *) "deleting xid " << xid_to_string(xid).c_str() <<
  216. " at offset " << next->next << " IOR: " << IOR(next));
  217. next->magic = 0l;
  218. sync_rec(&(next->magic), sizeof (next->magic));
  219. // a failure here will leave prev pointing to a free block
  220. // which will be fixed up when the log is re-read
  221. SEGVONDEL();
  222. prev->next = next->next;
  223. sync_rec(&(prev->next), sizeof (prev->next));
  224. lock_.unlock();
  225. return 0;
  226. }
  227. const char* XARecoveryLog::get_ior(rrec_t& rr) {
  228. return IOR(&rr);
  229. }
  230. /**
  231.  * Locate the next record following the passed in record.
  232.  * If from is NULL the first record is returned - thus
  233.  * the returned record also serves as a handle for finding
  234.  * the next record - including the case where the record is
  235.  * deleted. Note that it will find records inserted after
  236.  * the handle but not ones inserted before it.
  237.  */
  238. rrec_t* XARecoveryLog::find_next(rrec_t* from) {
  239. if (!log_.is_open())
  240. return 0;
  241. if (from == 0) {
  242. if (arena_->magic == INUSE)
  243. return arena_;
  244. from = arena_;
  245. }
  246. for (rrec_t* p = next_rec(from); p; p = next_rec(p))
  247. if (p->magic == INUSE)
  248. return p;
  249. return 0;
  250. }
  251. /**
  252.  * Locate the record keyed by xid. The returned record is passed
  253.  * back to the caller in next (the record preceding it is returned
  254.  * in prev - use for deleting records).
  255.  */
  256. int XARecoveryLog::find(XID xid, rrec_t** prev, rrec_t** next) {
  257. if (!log_.is_open())
  258. return -1;
  259. *prev = arena_;
  260. *next = arena_;
  261. while (*next) {
  262. if (compareXids(xid, (*next)->xid) == 0)
  263. return 0;
  264. *prev = *next;
  265. *next = next_rec(*next);
  266. }
  267. return -1;
  268. }
  269. /**
  270.  * Lookup the IOR associated with the given XID
  271.  */
  272. char* XARecoveryLog::find_ior(XID& xid) {
  273. rrec_t* prev;
  274. rrec_t* next;
  275. if (find(xid, &prev, &next) == 0)
  276. return IOR(next);
  277. return 0;
  278. }
  279. /**
  280.  * Insert a recovery record into persistent storage.
  281.  */
  282. int XARecoveryLog::add_rec(XID& xid, char* ior) {
  283. size_t nblocks = ((sizeof (rrec_t) + strlen(ior)) / sizeof (rrec_t)) + 1;
  284. rrec_t* fb; // next free block of the correct size
  285. LOG4CXX_TRACE(xarcllogger, (char *) "looking for a block of size " << nblocks * sizeof (rrec_t));
  286. if (!log_.is_open())
  287. return -1;
  288. lock_.lock();
  289. if ((fb = next_free(nblocks)) == 0) {
  290. LOG4CXX_TRACE(xarcllogger, (char *) "tno space, increasing arenan");
  291. if (!morecore(nblocks, true) || (fb = next_free(nblocks)) == 0) {
  292. LOG4CXX_ERROR(xarcllogger, (char *) "tno large enough free region (required " <<
  293. nblocks << " blocks)");
  294. lock_.unlock();
  295. return -1;
  296. }
  297. }
  298. fb->xid = xid;
  299. fb->next = (fb - arena_) + nblocks; // nblocks is in rrec_t units
  300. fb->magic = INUSE;
  301. ACE_OS::memcpy(IOR(fb), ior, strlen(ior) + 1);
  302. LOG4CXX_TRACE(xarcllogger, (char *) "adding xid " << xid_to_string(xid).c_str() <<
  303. " at offset " << fb->next << ": IOR: " << ior);
  304. sync_rec(fb, nblocks * sizeof (rrec_t));
  305. // to test that the allocator synced the block correctly generate
  306. // a segmentation fault - the record should be available on restart.
  307. SEGVONADD(ior);
  308. lock_.unlock();
  309. return 0; //fb;
  310. }
  311. /**
  312.  * find the next free region of size greater than nblocks
  313.  */
  314. rrec_t* XARecoveryLog::next_free(size_t nblocks) {
  315. size_t sz = 0;
  316. if (!log_.is_open())
  317. return 0;
  318. for (rrec_t* p = arena_; p; ) {
  319. if (p->magic == INUSE) {
  320. sz = 0;
  321. p = next_rec(p);
  322. } else if (++sz >= nblocks) {
  323. return p - sz + 1;
  324. } else {
  325. p += 1;
  326. }
  327. }
  328. return 0;
  329. }
  330. /**
  331.  * there is a window during delete where a log record
  332.  * can end up pointing to a free block. If that happened
  333.  * it must have been the last action so provided check_log
  334.  * is called when XARecoveryLog is constructed all will
  335.  * be well.
  336.  */
  337. void XARecoveryLog::check_log() {
  338. for (rrec_t* rr = arena_; ;) {
  339. rrec_t* next = next_rec(rr);
  340. if (next == 0)
  341. return;
  342. if (rr->magic == INUSE && next->magic != INUSE) {
  343. // must have previously failed whilst deleting next
  344. LOG4CXX_INFO(xarcllogger, (char *) "fixing up recovery log");
  345. rr->next = next->next;
  346. sync_rec(&(rr->next), sizeof (rr->next));
  347. }
  348. rr = next;
  349. }
  350. }
  351. void XARecoveryLog::debug_dump(rrec_t* p, rrec_t* end) {
  352. LOG4CXX_TRACE(xarcllogger, (char *) "dumping from " << p << " to " << end);
  353. // LOG4CXX_TRACE(xarcllogger, (char *) "dumping arena: next=0x%x magic=0x%x formatID=0x%x recsz=0x%x ior=%sn",
  354. // p->next, p->magic, p->xid.formatID, sizeof (rrec_t), IOR(p));
  355. while (p && p < end) {
  356. if (p->magic == INUSE) {
  357. LOG4CXX_TRACE(xarcllogger, (char *) "addr: " << p << " ior: " << IOR(p));
  358. p = next_rec(p);
  359. } else {
  360. p++;
  361. }
  362. }
  363. }