AtmiBrokerClient.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:7k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "log4cxx/basicconfigurator.h"
  19. #include "log4cxx/propertyconfigurator.h"
  20. #include "log4cxx/logger.h"
  21. #include "log4cxx/logmanager.h"
  22. #include "AtmiBrokerInit.h"
  23. #include "AtmiBrokerClient.h"
  24. #include "xatmi.h"
  25. #include "AtmiBrokerClientControl.h"
  26. #include "AtmiBrokerMem.h"
  27. #include "AtmiBrokerEnv.h"
  28. #include "txx.h"
  29. #include "SymbolLoader.h"
  30. #include "ace/OS_NS_stdio.h"
  31. #include "ace/OS_NS_stdlib.h"
  32. #include "ace/OS_NS_string.h"
  33. #include "ace/Default_Constants.h"
  34. #include "ace/Signal.h"
  35. #include "ThreadLocalStorage.h"
  36. #include "AtmiBrokerSignalHandler.h"
  37. AtmiBrokerClient * ptrAtmiBrokerClient;
  38. log4cxx::LoggerPtr loggerAtmiBrokerClient(log4cxx::Logger::getLogger(
  39. "AtmiBrokerClient"));
  40. bool clientInitialized = false;
  41. SynchronizableObject client_lock;
  42. int client_sigint_handler_callback(int sig_type) {
  43. LOG4CXX_INFO(
  44. loggerAtmiBrokerClient,
  45. (char*) "SIGINT Detected: Shutting down client this may take several minutes");
  46. clientdone(sig_type);
  47. LOG4CXX_INFO(loggerAtmiBrokerClient, (char*) "Shutdown complete");
  48. return 0;
  49. }
  50. int clientinit() {
  51. AtmiBrokerInitSingleton::instance();
  52. setSpecific(TPE_KEY, TSS_TPERESET);
  53. int toReturn = -1;
  54. client_lock.lock();
  55. if (ptrAtmiBrokerClient == NULL) {
  56. try {
  57. // This must be in the try catch as the configuration may not exist
  58. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  59. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientinit called");
  60. ptrAtmiBrokerClient = new AtmiBrokerClient();
  61. if (!clientInitialized) {
  62. LOG4CXX_DEBUG(loggerAtmiBrokerClient,
  63. (char*) "clientinit deleting Client");
  64. delete ptrAtmiBrokerClient;
  65. ptrAtmiBrokerClient = NULL;
  66. LOG4CXX_DEBUG(loggerAtmiBrokerClient,
  67. (char*) "clientinit deleted Client");
  68. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  69. } else {
  70. // install a handler for SIGINT and SIGTERM
  71. (env->getSignalHandler()).addSignalHandler(
  72. client_sigint_handler_callback);
  73. LOG4CXX_DEBUG(loggerAtmiBrokerClient,
  74. (char*) "Client Initialized");
  75. toReturn = 0;
  76. }
  77. } catch (...) {
  78. LOG4CXX_ERROR(loggerAtmiBrokerClient,
  79. (char*) "clientinit failed");
  80. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  81. }
  82. } else {
  83. toReturn = 0;
  84. }
  85. client_lock.unlock();
  86. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientinit returning "
  87. << toReturn);
  88. return toReturn;
  89. }
  90. int clientdone(int reason = 0) {
  91. setSpecific(TPE_KEY, TSS_TPERESET);
  92. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientdone called");
  93. client_lock.lock();
  94. if (ptrAtmiBrokerClient) {
  95. if (reason == 0) {
  96. LOG4CXX_DEBUG(loggerAtmiBrokerClient,
  97. (char*) "clientdone deleting Corba Client");
  98. delete ptrAtmiBrokerClient;
  99. ptrAtmiBrokerClient = NULL;
  100. LOG4CXX_DEBUG(loggerAtmiBrokerClient,
  101. (char*) "clientdone deleted Corba Client");
  102. } else {
  103. // cannot use closeSession since it deletes the underlying queue - instead
  104. // we need to interrupt any threads waiting on the underlying queues:
  105. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientdone interrupting connections");
  106. ptrAtmiBrokerClient->disconnectSessions();
  107. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientdone connections interrupted");
  108. }
  109. }
  110. client_lock.unlock();
  111. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientdone returning 0");
  112. return 0;
  113. }
  114. AtmiBrokerClient::AtmiBrokerClient() :
  115. currentConnection(NULL), nextSessionId(0) {
  116. try {
  117. lock = new SynchronizableObject();
  118. clientInitialized = true;
  119. } catch (...) {
  120. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  121. LOG4CXX_ERROR(loggerAtmiBrokerClient,
  122. (char*) "clientinit Unexpected exception");
  123. }
  124. }
  125. AtmiBrokerClient::~AtmiBrokerClient() {
  126. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "destructor");
  127. AtmiBrokerMem::discard_instance();
  128. txx_stop();
  129. clientConnectionManager.closeConnections();
  130. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "clientinit deleted services");
  131. delete lock;
  132. clientInitialized = false;
  133. AtmiBrokerEnv::discard_instance();
  134. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "Client Shutdown");
  135. }
  136. Session* AtmiBrokerClient::createSession(int& id, char* serviceName) {
  137. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "creating session: "
  138. << serviceName);
  139. if (serviceName == NULL) {
  140. setSpecific(TPE_KEY, TSS_TPEINVAL);
  141. return NULL;
  142. }
  143. Session* session = NULL;
  144. Connection* clientConnection = NULL;
  145. clientConnection = clientConnectionManager.getClientConnection();
  146. if (clientConnection != NULL) {
  147. lock->lock();
  148. currentConnection = clientConnection;
  149. id = nextSessionId++;
  150. lock->unlock();
  151. session = clientConnection->createSession(id, serviceName);
  152. session->setSigHandler(
  153. &(AtmiBrokerEnv::get_instance()->getSignalHandler()));
  154. AtmiBrokerEnv::discard_instance();
  155. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "created session: " << id
  156. << " send: " << session->getCanSend() << " recv: "
  157. << session->getCanRecv());
  158. }
  159. return session;
  160. }
  161. Session* AtmiBrokerClient::getSession(int id) {
  162. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "get session: " << id);
  163. Session* session = NULL;
  164. if (currentConnection != NULL) {
  165. session = currentConnection->getSession(id);
  166. }
  167. if (session != NULL) {
  168. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "got session: " << id
  169. << " send: " << session->getCanSend() << " recv: "
  170. << session->getCanRecv());
  171. } else {
  172. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "did not get session: "
  173. << id);
  174. }
  175. return session;
  176. }
  177. void AtmiBrokerClient::closeSession(int id) {
  178. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "close session: " << id);
  179. if (currentConnection != NULL) {
  180. currentConnection->closeSession(id);
  181. }
  182. }
  183. void AtmiBrokerClient::disconnectSessions() {
  184. if (currentConnection != NULL) {
  185. currentConnection->disconnectSession(-1);
  186. } else {
  187. LOG4CXX_DEBUG(loggerAtmiBrokerClient, (char*) "AtmiBrokerClient no connections to disconnect");
  188. }
  189. }