ServiceDispatcher.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:9k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "ServiceDispatcher.h"
  19. #include "txx.h"
  20. #include "ThreadLocalStorage.h"
  21. #include "AtmiBrokerEnv.h"
  22. #include "AtmiBrokerMem.h"
  23. #include "txx.h"
  24. #include <tao/ORB.h>
  25. #include <ace/OS_NS_time.h>
  26. log4cxx::LoggerPtr ServiceDispatcher::logger(log4cxx::Logger::getLogger(
  27. "ServiceDispatcher"));
  28. extern void setTpurcode(long rcode);
  29. ServiceDispatcher::ServiceDispatcher(AtmiBrokerServer* server,
  30. Destination* destination, Connection* connection,
  31. const char *serviceName, void(*func)(TPSVCINFO *), bool isPause,
  32. SynchronizableObject* reconnect) {
  33. this->reconnect = reconnect;
  34. this->destination = destination;
  35. this->connection = connection;
  36. this->serviceName = strdup(serviceName);
  37. this->func = func;
  38. this->isPause = isPause;
  39. session = NULL;
  40. stop = false;
  41. this->timeout = (long) (mqConfig.destinationTimeout * 1000000);
  42. this->counter = 0;
  43. this->error_counter = 0;
  44. this->server = server;
  45. this->minResponseTime = 0;
  46. this->avgResponseTime = 0;
  47. this->maxResponseTime = 0;
  48. pauseLock = new SynchronizableObject();
  49. }
  50. ServiceDispatcher::~ServiceDispatcher() {
  51. free(this->serviceName);
  52. delete pauseLock;
  53. }
  54. int ServiceDispatcher::pause(void) {
  55. LOG4CXX_TRACE(logger, "ServiceDispatcher pause");
  56. if (isPause == false) {
  57. pauseLock->lock();
  58. isPause = true;
  59. pauseLock->unlock();
  60. }
  61. return 0;
  62. }
  63. int ServiceDispatcher::resume(void) {
  64. LOG4CXX_TRACE(logger, "ServiceDispatcher resume");
  65. if (isPause) {
  66. pauseLock->lock();
  67. isPause = false;
  68. pauseLock->notify();
  69. pauseLock->unlock();
  70. }
  71. return 0;
  72. }
  73. int ServiceDispatcher::svc(void) {
  74. while (!stop) {
  75. // This will wait while the server is paused
  76. pauseLock->lock();
  77. while (!stop && isPause) {
  78. LOG4CXX_DEBUG(logger, (char*) "pausing: " << serviceName);
  79. pauseLock->wait(0);
  80. LOG4CXX_DEBUG(logger, (char*) "paused: " << serviceName);
  81. }
  82. pauseLock->unlock();
  83. MESSAGE message = destination->receive(this->timeout);
  84. if (message.received) {
  85. try {
  86. counter += 1;
  87. ACE_Time_Value start = ACE_OS::gettimeofday();
  88. onMessage(message);
  89. ACE_Time_Value end = ACE_OS::gettimeofday();
  90. ACE_Time_Value tv = end - start;
  91. unsigned long responseTime = tv.msec();
  92. LOG4CXX_DEBUG(logger, (char*) "response time is "
  93. << responseTime);
  94. if (minResponseTime == 0 || responseTime < minResponseTime) {
  95. minResponseTime = responseTime;
  96. }
  97. avgResponseTime = ((avgResponseTime * (counter - 1))
  98. + responseTime) / counter;
  99. if (responseTime > maxResponseTime) {
  100. maxResponseTime = responseTime;
  101. }
  102. LOG4CXX_DEBUG(logger, (char*) "min:" << minResponseTime
  103. << (char*) " avg:" << avgResponseTime
  104. << (char*) " max:" << maxResponseTime);
  105. } catch (const CORBA::BAD_PARAM& ex) {
  106. LOG4CXX_WARN(logger, (char*) "Service dispatcher BAD_PARAM: "
  107. << ex._name());
  108. } catch (const CORBA::SystemException& ex) {
  109. LOG4CXX_WARN(logger,
  110. (char*) "Service dispatcher SystemException: "
  111. << ex._name());
  112. } catch (...) {
  113. LOG4CXX_ERROR(
  114. logger,
  115. (char*) "Service Dispatcher caught error running during onMessage");
  116. }
  117. if (message.data != NULL) {
  118. free(message.data);
  119. }
  120. setTpurcode(0);
  121. } else if (tperrno == TPESYSTEM) {
  122. LOG4CXX_WARN(
  123. logger,
  124. (char*) "Service dispatcher detected dead connection will reconnect after sleep");
  125. reconnect->lock();
  126. int timeout = 10;
  127. while (!stop && !destination->connected()) {
  128. LOG4CXX_DEBUG(logger, (char*) "sleeper, sleeping for "
  129. << timeout << " seconds");
  130. ACE_OS::sleep(timeout);
  131. LOG4CXX_DEBUG(logger, (char*) "sleeper, slept for " << timeout
  132. << " seconds");
  133. if (this->server->createAdminDestination(serviceName)) {
  134. LOG4CXX_INFO(logger,
  135. (char*) "Service dispatcher recreated: "
  136. << serviceName);
  137. destination->connect();
  138. }
  139. }
  140. reconnect->unlock();
  141. }
  142. }
  143. return 0;
  144. }
  145. void ServiceDispatcher::onMessage(MESSAGE message) {
  146. setSpecific(TPE_KEY, TSS_TPERESET);
  147. LOG4CXX_DEBUG(logger, (char*) "svc()");
  148. // INITIALISE THE SENDER AND RECEIVER FOR THIS CONVERSATION
  149. if (message.replyto) {
  150. LOG4CXX_DEBUG(logger, (char*) "replyTo: " << message.replyto);
  151. } else {
  152. LOG4CXX_DEBUG(logger, (char*) "replyTo: NULL");
  153. }
  154. LOG4CXX_TRACE(logger, (char*) "Creating session: " << message.correlationId);
  155. session = connection->createSession(message.correlationId, message.replyto);
  156. LOG4CXX_TRACE(logger, (char*) "Created session: " << message.correlationId);
  157. // EXTRACT THE DATA FROM THE INBOUND MESSAGE
  158. int correlationId = message.correlationId;
  159. long ilen = message.len;
  160. long flags = message.flags;
  161. LOG4CXX_DEBUG(logger, (char*) "ilen: " << ilen << " flags: " << flags
  162. << "cd: " << message.correlationId << " message.control="
  163. << message.control);
  164. // PREPARE THE STRUCT FOR SENDING TO THE CLIENT
  165. TPSVCINFO tpsvcinfo;
  166. memcpy(tpsvcinfo.name, message.serviceName, XATMI_SERVICE_NAME_LENGTH);
  167. memset(&tpsvcinfo, '', sizeof(tpsvcinfo));
  168. strcpy(tpsvcinfo.name, this->serviceName);
  169. tpsvcinfo.flags = flags;
  170. tpsvcinfo.len = ilen;
  171. if (message.data != NULL) {
  172. tpsvcinfo.data = AtmiBrokerMem::get_instance()->tpalloc(message.type,
  173. message.subtype, ilen, true);
  174. if (message.len > 0) {
  175. memcpy(tpsvcinfo.data, message.data, ilen);
  176. }
  177. } else {
  178. tpsvcinfo.data = NULL;
  179. }
  180. setSpecific(SVC_KEY, this);
  181. setSpecific(SVC_SES, session);
  182. if (tpsvcinfo.flags & TPCONV) {
  183. tpsvcinfo.cd = correlationId;
  184. long olen = 4;
  185. char* odata = (char*) tpalloc((char*) "X_OCTET", NULL, olen);
  186. strcpy(odata, "ACK");
  187. long revent = 0;
  188. long result = tpsend(tpsvcinfo.cd, odata, olen, 0, &revent);
  189. if (result == -1) {
  190. connection->closeSession(message.correlationId);
  191. destroySpecific( SVC_SES);
  192. destroySpecific( SVC_KEY);
  193. return;
  194. }
  195. } else {
  196. LOG4CXX_DEBUG(logger, (char*) "cd not being set");
  197. }
  198. if (tpsvcinfo.flags & TPRECVONLY) {
  199. session->setCanRecv(false);
  200. LOG4CXX_DEBUG(logger, (char*) "onMessage set constraints session: "
  201. << session->getId() << " send(not changed): "
  202. << session->getCanSend() << " recv: " << session->getCanRecv());
  203. } else if (tpsvcinfo.flags & TPSENDONLY) {
  204. session->setCanSend(false);
  205. LOG4CXX_DEBUG(logger, (char*) "onMessage set constraints session: "
  206. << session->getId() << " send: " << session->getCanSend()
  207. << " recv (not changed): " << session->getCanRecv());
  208. }
  209. // HANDLE THE CLIENT INVOCATION
  210. if (message.control != NULL && strcmp((char*) message.control, "null") != 0) {
  211. if (txx_associate_serialized((char*) message.control, message.ttl)
  212. != XA_OK) {
  213. LOG4CXX_ERROR(logger, "Unable to handle control");
  214. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  215. }
  216. tpsvcinfo.flags = (tpsvcinfo.flags | TPTRAN);
  217. }
  218. if (tperrno == 0) {
  219. try {
  220. LOG4CXX_TRACE(logger, (char*) "Calling function");
  221. this->func(&tpsvcinfo);
  222. LOG4CXX_TRACE(logger, (char*) "Called function");
  223. } catch (...) {
  224. LOG4CXX_ERROR(
  225. logger,
  226. (char*) "ServiceDispatcher caught error running during onMessage");
  227. }
  228. } else {
  229. LOG4CXX_ERROR(logger,
  230. (char*) "Not invoking tpservice as tpernno was not 0");
  231. }
  232. AtmiBrokerMem::get_instance()->tpfree(tpsvcinfo.data, true);
  233. // CLEAN UP THE SENDER AND RECEIVER FOR THIS CLIENT
  234. if (session->getCanSend()) {
  235. LOG4CXX_TRACE(logger,
  236. (char*) "Returning error - marking tx as rollback only if "
  237. << getSpecific(TSS_KEY));
  238. ::tpreturn(TPFAIL, TPESVCERR, NULL, 0, 0);
  239. LOG4CXX_TRACE(logger, (char*) "Returned error");
  240. } else if (getSpecific(TSS_KEY) != NULL) {
  241. txx_release_control(txx_unbind(true));
  242. }
  243. LOG4CXX_TRACE(logger, (char*) "ServiceDispatcher closing session: "
  244. << message.correlationId);
  245. connection->closeSession(message.correlationId);
  246. // session = NULL;
  247. LOG4CXX_TRACE(logger, (char*) "ServiceDispatcher session closed: "
  248. << message.correlationId);
  249. // HybridConnectionImpl* instance = dynamic_cast<HybridConnectionImpl*> (connection);
  250. //shutdownBindings(instance->connection);
  251. destroySpecific( SVC_SES);
  252. destroySpecific( SVC_KEY);
  253. LOG4CXX_TRACE(logger,
  254. (char*) "Freeing the data that was passed to the service");
  255. // free(idata);
  256. LOG4CXX_TRACE(logger, (char*) "Freed the data");
  257. }
  258. void ServiceDispatcher::shutdown() {
  259. pauseLock->lock();
  260. stop = true;
  261. isPause = false;
  262. pauseLock->notify();
  263. pauseLock->unlock();
  264. }
  265. long ServiceDispatcher::getCounter() {
  266. return counter;
  267. }
  268. long ServiceDispatcher::getErrorCounter() {
  269. return error_counter;
  270. }
  271. void ServiceDispatcher::updateErrorCounter() {
  272. error_counter++;
  273. }
  274. void ServiceDispatcher::getResponseTime(unsigned long* min, unsigned long* avg,
  275. unsigned long* max) {
  276. *min = minResponseTime;
  277. *avg = avgResponseTime;
  278. *max = maxResponseTime;
  279. }
  280. SynchronizableObject* ServiceDispatcher::getReconnect() {
  281. return reconnect;
  282. }