AtmiBrokerServer.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:32k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2008, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include <string>
  19. #include <sstream>
  20. #include <queue>
  21. #ifdef TAO_COMP
  22. #include <orbsvcs/CosNamingS.h>
  23. #endif
  24. #include "log4cxx/logger.h"
  25. #include "AtmiBrokerInit.h"
  26. #include "AtmiBrokerServer.h"
  27. #include "AtmiBrokerPoaFac.h"
  28. #include "AtmiBrokerEnv.h"
  29. #include "AtmiBrokerServerControl.h"
  30. #include "AtmiBrokerMem.h"
  31. #include "txx.h"
  32. #include "OrbManagement.h"
  33. #include "SymbolLoader.h"
  34. #include "ace/Get_Opt.h"
  35. #include "ace/OS_NS_stdio.h"
  36. #include "ace/OS_NS_stdlib.h"
  37. #include "ace/OS_NS_string.h"
  38. #include "ace/Default_Constants.h"
  39. #include "ace/Signal.h"
  40. #include "ThreadLocalStorage.h"
  41. #include "xatmi.h"
  42. // WORK AROUND FOR NO tx.h
  43. #define TX_OK              0
  44. #ifdef __cplusplus
  45. extern "C" {
  46. #endif
  47. extern BLACKTIE_TX_DLL int tx_open(void);
  48. #ifdef __cplusplus
  49. }
  50. #endif
  51. extern void ADMIN(TPSVCINFO* svcinfo);
  52. extern const char* version;
  53. log4cxx::LoggerPtr loggerAtmiBrokerServer(log4cxx::Logger::getLogger(
  54. "AtmiBrokerServer"));
  55. AtmiBrokerServer * ptrServer = NULL;
  56. bool serverInitialized = false;
  57. PortableServer::POA_var server_poa;
  58. bool configFromCmdline = false;
  59. int errorBootAdminService = 0;
  60. char configDir[256];
  61. char server[30];
  62. int serverid = 0;
  63. int server_sigint_handler_callback(int sig_type) {
  64. LOG4CXX_INFO(
  65. loggerAtmiBrokerServer,
  66. (char*) "SIGINT Detected: Shutting down server this may take several minutes");
  67. if (ptrServer != NULL)
  68. ptrServer->shutdown();
  69. LOG4CXX_INFO(loggerAtmiBrokerServer,
  70. (char*) "SIGINT Detected: Shutdown complete");
  71. return -1;
  72. }
  73. int serverrun() {
  74. setSpecific(TPE_KEY, TSS_TPERESET);
  75. return ptrServer->block();
  76. }
  77. int parsecmdline(int argc, char** argv) {
  78. ACE_Get_Opt getopt(argc, argv, ACE_TEXT("c:i:"));
  79. int c;
  80. int r = 0;
  81. bool isSetServerId = false;
  82. configFromCmdline = false;
  83. while ((c = getopt()) != -1) {
  84. switch ((char) c) {
  85. case 'c':
  86. configFromCmdline = true;
  87. ACE_OS::strncpy(configDir, getopt.opt_arg(), 256);
  88. break;
  89. case 'i':
  90. serverid = atoi(getopt.opt_arg());
  91. if (serverid <= 0 || serverid > 9) {
  92. r = -1;
  93. } else {
  94. isSetServerId = true;
  95. }
  96. break;
  97. default:
  98. r = -1;
  99. }
  100. }
  101. int last = getopt.opt_ind();
  102. if (r == 0 && last < argc) {
  103. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "opt_ind is " << last
  104. << ", server is " << argv[last]);
  105. ACE_OS::strncpy(server, argv[last], 30);
  106. }
  107. if (isSetServerId == false) {
  108. fprintf(stderr,
  109. "you must specify a server id with -i greater than 0 and less than 10n");
  110. r = -1;
  111. }
  112. return r;
  113. }
  114. const char* getConfiguration() {
  115. const char* dir = NULL;
  116. if (configFromCmdline) {
  117. dir = configDir;
  118. }
  119. return dir;
  120. }
  121. int serverinit(int argc, char** argv) {
  122. AtmiBrokerInitSingleton::instance();
  123. setSpecific(TPE_KEY, TSS_TPERESET);
  124. int toReturn = 0;
  125. ACE_OS::strncpy(server, "default", 30);
  126. if (argc > 0 && parsecmdline(argc, argv) != 0) {
  127. fprintf(stderr, "usage:%s [-c config] -i id [server]n", argv[0]);
  128. toReturn = -1;
  129. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  130. }
  131. if (toReturn != -1 && ptrServer == NULL) {
  132. const char* configuration = getConfiguration();
  133. if (configuration != NULL) {
  134. AtmiBrokerEnv::set_configuration(configuration);
  135. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  136. (char*) "set AtmiBrokerEnv configuration type "
  137. << configuration);
  138. }
  139. try {
  140. AtmiBrokerEnv* env = AtmiBrokerEnv::get_instance();
  141. std::stringstream sname;
  142. std::stringstream sid;
  143. sname << "BLACKTIE_SERVER_NAME=" << domain << server << serverid;
  144. sid << "BLACKTIE_SERVER_ID=" << serverid;
  145. env->putenv((char *) (sname.str().c_str()));
  146. env->putenv((char *) (sid.str().c_str()));
  147. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "serverinit called");
  148. ptrServer = new AtmiBrokerServer();
  149. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "serverInitialized=" << serverInitialized);
  150. if (!serverInitialized) {
  151. ::serverdone();
  152. toReturn = -1;
  153. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  154. } else {
  155. ptrServer->advertiseAtBootime();
  156. // install a handler for the default set of signals (namely, SIGINT and SIGTERM)
  157. (env->getSignalHandler()).addSignalHandler(
  158. server_sigint_handler_callback, true);
  159. LOG4CXX_INFO(loggerAtmiBrokerServer, (char*) "Server "
  160. << serverid << " Running");
  161. }
  162. } catch (...) {
  163. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  164. (char*) "Server startup failed");
  165. toReturn = -1;
  166. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  167. }
  168. }
  169. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "serverinit returning: "
  170. << toReturn);
  171. if (toReturn != 0) {
  172. LOG4CXX_FATAL(loggerAtmiBrokerServer, (char*) "serverinit failed");
  173. }
  174. return toReturn;
  175. }
  176. int serverdone() {
  177. setSpecific(TPE_KEY, TSS_TPERESET);
  178. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "serverdone called");
  179. if (ptrServer) {
  180. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  181. (char*) "serverdone deleting Corba server");
  182. delete ptrServer;
  183. ptrServer = NULL;
  184. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  185. (char*) "serverdone deleted Corba server");
  186. }
  187. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "serverdone returning 0");
  188. // if (configFromCmdline) {
  189. // char* toFree = ACE_OS::getenv("BLACKTIE_CONFIGURATION");
  190. // free(toFree);
  191. // }
  192. return 0;
  193. }
  194. int isadvertised(char* name) {
  195. if (ptrServer) {
  196. if (ptrServer->isAdvertised(name)) {
  197. return 0;
  198. }
  199. }
  200. return -1;
  201. }
  202. int getServiceStatus(char** str, char* svc) {
  203. if (ptrServer) {
  204. return ptrServer->getServiceStatus(str, svc);
  205. }
  206. return -1;
  207. }
  208. long getServiceMessageCounter(char* serviceName) {
  209. if (ptrServer) {
  210. return ptrServer->getServiceMessageCounter(serviceName);
  211. }
  212. return 0;
  213. }
  214. long getServiceErrorCounter(char* serviceName) {
  215. if (ptrServer) {
  216. return ptrServer->getServiceErrorCounter(serviceName);
  217. }
  218. return 0;
  219. }
  220. void getResponseTime(char* serviceName, unsigned long* min, unsigned long* avg,
  221. unsigned long* max) {
  222. if (ptrServer) {
  223. ptrServer->getResponseTime(serviceName, min, avg, max);
  224. }
  225. }
  226. int advertiseByAdmin(char* name) {
  227. if (isadvertised(name) == 0) {
  228. return 0;
  229. }
  230. if (ptrServer) {
  231. if (ptrServer->advertiseService(name)) {
  232. return 0;
  233. }
  234. }
  235. return -1;
  236. }
  237. int pauseServerByAdmin() {
  238. if (ptrServer) {
  239. return ptrServer->pause();
  240. }
  241. return -1;
  242. }
  243. int resumeServerByAdmin() {
  244. if (ptrServer) {
  245. return ptrServer->resume();
  246. }
  247. return -1;
  248. }
  249. // AtmiBrokerServer constructor
  250. //
  251. // Note: since we use virtual inheritance, we must include an
  252. // initialiser for all the virtual base class constructors that
  253. // require arguments, even those that we inherit indirectly.
  254. //
  255. AtmiBrokerServer::AtmiBrokerServer() {
  256. try {
  257. finish = new SynchronizableObject();
  258. serverName = server;
  259. isPause = false;
  260. unsigned int i;
  261. serverInfo.serverName = NULL;
  262. for (i = 0; i < servers.size(); i++) {
  263. if (strcmp(servers[i]->serverName, serverName) == 0) {
  264. serverInfo.serverName = strdup(servers[i]->serverName);
  265. // add service ADMIN
  266. char adm[XATMI_SERVICE_NAME_LENGTH + 1];
  267. ACE_OS::snprintf(adm, XATMI_SERVICE_NAME_LENGTH + 1,
  268. "%s_ADMIN_%d", server, serverid);
  269. ServiceInfo service;
  270. memset(&service, 0, sizeof(ServiceInfo));
  271. service.serviceName = strdup(adm);
  272. #ifdef WIN32
  273. service.transportLib = strdup("atmibroker-hybrid.dll");
  274. #else
  275. service.transportLib = strdup("libatmibroker-hybrid.so");
  276. #endif
  277. service.poolSize = 1;
  278. service.advertised = false;
  279. serverInfo.serviceVector.push_back(service);
  280. for (unsigned int j = 0; j < servers[i]->serviceVector.size(); j++) {
  281. ServiceInfo service;
  282. memset(&service, 0, sizeof(ServiceInfo));
  283. service.serviceName = strdup(
  284. servers[i]->serviceVector[j].serviceName);
  285. service.transportLib = strdup(
  286. servers[i]->serviceVector[j].transportLib);
  287. if (servers[i]->serviceVector[j].function_name) {
  288. service.function_name = strdup(
  289. servers[i]->serviceVector[j].function_name);
  290. } else {
  291. service.function_name = NULL;
  292. }
  293. if (servers[i]->serviceVector[j].library_name) {
  294. service.library_name = strdup(
  295. servers[i]->serviceVector[j].library_name);
  296. } else {
  297. service.library_name = NULL;
  298. }
  299. service.poolSize = servers[i]->serviceVector[j].poolSize;
  300. service.advertised
  301. = servers[i]->serviceVector[j].advertised;
  302. serverInfo.serviceVector.push_back(service);
  303. }
  304. break;
  305. }
  306. }
  307. if (i == servers.size()) {
  308. LOG4CXX_WARN(loggerAtmiBrokerServer, serverName
  309. << " has no configuration");
  310. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  311. return;
  312. }
  313. if (tx_open() != TX_OK) {
  314. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  315. LOG4CXX_ERROR(
  316. loggerAtmiBrokerServer,
  317. serverName
  318. << (char *) " transaction configuration error, aborting server startup");
  319. } else {
  320. // make ADMIN service mandatory for server
  321. char adm[XATMI_SERVICE_NAME_LENGTH + 1];
  322. ACE_OS::snprintf(adm, XATMI_SERVICE_NAME_LENGTH + 1, "%s_ADMIN_%d",
  323. server, serverid);
  324. if (!advertiseService(adm, ADMIN)) {
  325. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertise admin service failed");
  326. return;
  327. }
  328. if (errorBootAdminService == 2) {
  329. LOG4CXX_WARN(loggerAtmiBrokerServer,
  330. (char*) "Maybe the same server id running");
  331. throw std::exception();
  332. }
  333. serverInitialized = true;
  334. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  335. (char*) "server_init(): finished.");
  336. }
  337. } catch (CORBA::Exception& e) {
  338. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  339. (char*) "serverinit - Unexpected CORBA exception: "
  340. << e._name());
  341. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  342. }
  343. }
  344. // ~AtmiBrokerServer destructor.
  345. //
  346. AtmiBrokerServer::~AtmiBrokerServer() {
  347. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "destructor");
  348. server_done();
  349. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "Server done");
  350. if (finish != NULL) {
  351. delete finish;
  352. finish = NULL;
  353. }
  354. serviceData.clear();
  355. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "deleted service array");
  356. for (unsigned int i = 0; i < serverInfo.serviceVector.size(); i++) {
  357. ServiceInfo* service = &serverInfo.serviceVector[i];
  358. free(service->serviceName);
  359. free(service->transportLib);
  360. if (service->function_name != NULL) {
  361. free(service->function_name);
  362. }
  363. if (service->library_name != NULL) {
  364. free(service->library_name);
  365. }
  366. }
  367. if (serverInfo.serverName != NULL) {
  368. free(serverInfo.serverName);
  369. }
  370. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "deleting services");
  371. AtmiBrokerMem::discard_instance();
  372. txx_stop();
  373. AtmiBrokerEnv::discard_instance();
  374. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "deleted services");
  375. connections.closeConnections();
  376. serverInitialized = false;
  377. }
  378. void AtmiBrokerServer::advertiseAtBootime() {
  379. for (unsigned int i = 0; i < serverInfo.serviceVector.size(); i++) {
  380. ServiceInfo* service = &serverInfo.serviceVector[i];
  381. SVCFUNC func = NULL;
  382. bool status = false;
  383. if (service->library_name != NULL) {
  384. func = (SVCFUNC) ::lookup_symbol(service->library_name,
  385. service->function_name);
  386. if (func == NULL) {
  387. LOG4CXX_WARN(loggerAtmiBrokerServer, "can not find "
  388. << service->function_name << " in "
  389. << service->library_name);
  390. }
  391. }
  392. if (service->advertised && func != NULL) {
  393. LOG4CXX_DEBUG(loggerAtmiBrokerServer, "begin advertise "
  394. << service->serviceName);
  395. status = advertiseService((char*) service->serviceName, func);
  396. LOG4CXX_DEBUG(loggerAtmiBrokerServer, "end advertise "
  397. << service->serviceName);
  398. }
  399. updateServiceStatus(service, func, status);
  400. }
  401. }
  402. int AtmiBrokerServer::block() {
  403. int toReturn = 0;
  404. if (errorBootAdminService == 3) {
  405. LOG4CXX_INFO(loggerAtmiBrokerServer, "Domain is paused");
  406. pause();
  407. } else {
  408. LOG4CXX_INFO(loggerAtmiBrokerServer, "Server waiting for requests...");
  409. }
  410. try {
  411. finish->lock();
  412. finish->wait(0);
  413. finish->unlock();
  414. } catch (...) {
  415. LOG4CXX_ERROR(loggerAtmiBrokerServer, "Unexpected exception");
  416. toReturn = -1;
  417. }
  418. return toReturn;
  419. }
  420. void AtmiBrokerServer::shutdown() {
  421. LOG4CXX_INFO(loggerAtmiBrokerServer, "Server prepare to shutdown");
  422. // server_done(); You can't do this here as the service dispatcher will be cleaned up that is handling
  423. // the cleanup for an admin call
  424. finish->lock();
  425. finish->notify();
  426. finish->unlock();
  427. }
  428. int AtmiBrokerServer::pause() {
  429. if (!isPause) {
  430. char adm[XATMI_SERVICE_NAME_LENGTH + 1];
  431. ACE_OS::snprintf(adm, XATMI_SERVICE_NAME_LENGTH + 1, "%s_ADMIN_%d",
  432. server, serverid);
  433. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  434. != serviceData.end(); i++) {
  435. if (ACE_OS::strcmp((*i).serviceInfo->serviceName, adm) != 0) {
  436. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "pausing service"
  437. << (*i).serviceInfo->serviceName);
  438. for (std::vector<ServiceDispatcher*>::iterator j =
  439. (*i).dispatchers.begin(); j != (*i).dispatchers.end(); j++) {
  440. ServiceDispatcher* dispatcher = (*j);
  441. if (dispatcher->pause() != 0) {
  442. LOG4CXX_WARN(loggerAtmiBrokerServer,
  443. (char*) "pause service dispatcher "
  444. << dispatcher << " failed");
  445. }
  446. }
  447. }
  448. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "pause service"
  449. << (*i).serviceInfo->serviceName << " done");
  450. }
  451. isPause = true;
  452. LOG4CXX_INFO(loggerAtmiBrokerServer, (char*) "Server Pause");
  453. }
  454. return 0;
  455. }
  456. int AtmiBrokerServer::resume() {
  457. if (isPause) {
  458. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  459. != serviceData.end(); i++) {
  460. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "resuming service"
  461. << (*i).serviceInfo->serviceName);
  462. for (std::vector<ServiceDispatcher*>::iterator j =
  463. (*i).dispatchers.begin(); j != (*i).dispatchers.end(); j++) {
  464. ServiceDispatcher* dispatcher = (*j);
  465. if (dispatcher->resume() != 0) {
  466. LOG4CXX_WARN(loggerAtmiBrokerServer,
  467. (char*) "resume service dispatcher " << dispatcher
  468. << " failed");
  469. }
  470. }
  471. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "resume service"
  472. << (*i).serviceInfo->serviceName << " done");
  473. }
  474. isPause = false;
  475. LOG4CXX_INFO(loggerAtmiBrokerServer, (char*) "Server Resume");
  476. }
  477. return 0;
  478. }
  479. char *
  480. AtmiBrokerServer::getServerName() {
  481. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "getServerName");
  482. return serverName;
  483. }
  484. int AtmiBrokerServer::getServiceStatus(char** toReturn, char* svc) {
  485. int len = 0;
  486. char* str;
  487. int size = sizeof(char) * (9 + 14 + strlen(serverName) + 11 + 12 + 10);
  488. char adm[XATMI_SERVICE_NAME_LENGTH + 1];
  489. ACE_OS::snprintf(adm, XATMI_SERVICE_NAME_LENGTH + 1, "%s_ADMIN_%d", server,
  490. serverid);
  491. str = (char*) malloc(size);
  492. len += ACE_OS::sprintf(str + len, "<server>");
  493. len += ACE_OS::sprintf(str + len, "<name>%s</name>", serverName);
  494. len += ACE_OS::sprintf(str + len, "<services>");
  495. for (std::vector<ServiceStatus>::iterator i = serviceStatus.begin(); i
  496. != serviceStatus.end(); i++) {
  497. if (strcmp(adm, (*i).name) != 0 && (svc == NULL || ACE_OS::strcmp(svc,
  498. (*i).name) == 0)) {
  499. int svcsize = sizeof(char) * (50 + strlen((*i).name));
  500. size += svcsize;
  501. str = (char*) realloc(str, size);
  502. len += ACE_OS::sprintf(str + len,
  503. "<service><name>%.15s</name><status>%d</status></service>",
  504. (*i).name, isPause && (*i).status ? 2 : (*i).status);
  505. if (svc != NULL)
  506. break;
  507. }
  508. }
  509. len += ACE_OS::sprintf(str + len, "</services>");
  510. len += ACE_OS::sprintf(str + len, "</server>");
  511. *toReturn = str;
  512. return len;
  513. }
  514. void AtmiBrokerServer::updateServiceStatus(ServiceInfo* service, SVCFUNC func,
  515. bool status) {
  516. bool found = false;
  517. for (std::vector<ServiceStatus>::iterator i = serviceStatus.begin(); i
  518. != serviceStatus.end(); i++) {
  519. if (strncmp((*i).name, service->serviceName, XATMI_SERVICE_NAME_LENGTH)
  520. == 0) {
  521. (*i).func = func;
  522. (*i).status = status;
  523. found = true;
  524. break;
  525. }
  526. }
  527. if (found == false) {
  528. ServiceStatus aServiceStatus;
  529. memset(&aServiceStatus, 0, sizeof(aServiceStatus));
  530. ACE_OS::strncpy(aServiceStatus.name, service->serviceName,
  531. XATMI_SERVICE_NAME_LENGTH);
  532. aServiceStatus.func = func;
  533. aServiceStatus.status = status;
  534. serviceStatus.push_back(aServiceStatus);
  535. }
  536. }
  537. bool AtmiBrokerServer::advertiseService(char * svcname) {
  538. for (std::vector<ServiceStatus>::iterator i = serviceStatus.begin(); i
  539. != serviceStatus.end(); i++) {
  540. if (strncmp((*i).name, svcname, XATMI_SERVICE_NAME_LENGTH) == 0) {
  541. return advertiseService(svcname, (*i).func);
  542. }
  543. }
  544. LOG4CXX_WARN(
  545. loggerAtmiBrokerServer,
  546. (char*) "Could not advertise service, was not registered in btconfig.xml: "
  547. << svcname);
  548. return false;
  549. }
  550. bool AtmiBrokerServer::advertiseService(char * svcname,
  551. void(*func)(TPSVCINFO *)) {
  552. if (!svcname || strlen(svcname) == 0) {
  553. setSpecific(TPE_KEY, TSS_TPEINVAL);
  554. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertiseService invalid service name");
  555. return false;
  556. }
  557. char* serviceName = (char*) ::malloc(XATMI_SERVICE_NAME_LENGTH + 1);
  558. memset(serviceName, '', XATMI_SERVICE_NAME_LENGTH + 1);
  559. strncat(serviceName, svcname, XATMI_SERVICE_NAME_LENGTH);
  560. bool found = false;
  561. unsigned int i;
  562. ServiceInfo* service;
  563. for (i = 0; i < serverInfo.serviceVector.size(); i++) {
  564. if (strncmp(serverInfo.serviceVector[i].serviceName, serviceName,
  565. XATMI_SERVICE_NAME_LENGTH) == 0) {
  566. found = true;
  567. service = &serverInfo.serviceVector[i];
  568. break;
  569. }
  570. }
  571. if (!found) {
  572. LOG4CXX_WARN(
  573. loggerAtmiBrokerServer,
  574. (char*) "Could not advertise service, was not registered for server in btconfig.xml: "
  575. << svcname);
  576. setSpecific(TPE_KEY, TSS_TPELIMIT);
  577. free(serviceName);
  578. return false;
  579. }
  580. void (*serviceFunction)(TPSVCINFO*) = getServiceMethod(serviceName);
  581. if (serviceFunction != NULL) {
  582. if (serviceFunction == func) {
  583. free(serviceName);
  584. return true;
  585. } else {
  586. setSpecific(TPE_KEY, TSS_TPEMATCH);
  587. free(serviceName);
  588. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertiseService same service function");
  589. return false;
  590. }
  591. } else if (serviceFunction == NULL && func == NULL) {
  592. LOG4CXX_WARN(loggerAtmiBrokerServer,
  593. (char*) "Could not advertise service, no function available: "
  594. << svcname);
  595. return false;
  596. }
  597. Connection* connection = connections.getServerConnection();
  598. if (connection == NULL) {
  599. setSpecific(TPE_KEY, TSS_TPESYSTEM);
  600. free(serviceName);
  601. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertiseService no server connection");
  602. return false;
  603. }
  604. bool toReturn = false;
  605. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertiseService(): "
  606. << serviceName);
  607. // create reference for Service Queue and cache
  608. try {
  609. toReturn = createAdminDestination(serviceName);
  610. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "advertiseService status=" << toReturn);
  611. if (toReturn) {
  612. Destination* destination;
  613. destination = connection->createDestination(serviceName);
  614. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  615. (char*) "created destination: " << serviceName);
  616. addDestination(destination, func, service);
  617. updateServiceStatus(service, func, true);
  618. advertisedServices.push_back(serviceName);
  619. LOG4CXX_INFO(loggerAtmiBrokerServer, (char*) "advertised service "
  620. << serviceName);
  621. } else {
  622. free(serviceName);
  623. }
  624. } catch (CORBA::Exception& e) {
  625. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  626. (char*) "CORBA::Exception creating the destination: "
  627. << serviceName << " Exception: " << e._name());
  628. setSpecific(TPE_KEY, TSS_TPEMATCH);
  629. try {
  630. removeAdminDestination(serviceName, true);
  631. } catch (...) {
  632. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  633. (char*) "Could not remove the destination: " << serviceName);
  634. }
  635. free(serviceName);
  636. } catch (...) {
  637. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  638. (char*) "Could not create the destination: " << serviceName);
  639. setSpecific(TPE_KEY, TSS_TPEMATCH);
  640. try {
  641. removeAdminDestination(serviceName, true);
  642. } catch (...) {
  643. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  644. (char*) "Could not remove the destination: " << serviceName);
  645. }
  646. free(serviceName);
  647. }
  648. return toReturn;
  649. }
  650. void AtmiBrokerServer::unadvertiseService(char * svcname, bool decrement) {
  651. char* serviceName = (char*) ::malloc(XATMI_SERVICE_NAME_LENGTH + 1);
  652. memset(serviceName, '', XATMI_SERVICE_NAME_LENGTH + 1);
  653. strncat(serviceName, svcname, XATMI_SERVICE_NAME_LENGTH);
  654. Connection* connection = connections.getServerConnection();
  655. if (connection == NULL) {
  656. return;
  657. }
  658. // Connection* connz = connections.getServerConnection("BAR");
  659. // delete connz;
  660. for (std::vector<char*>::iterator i = advertisedServices.begin(); i
  661. != advertisedServices.end(); i++) {
  662. char* name = (*i);
  663. if (strcmp(serviceName, name) == 0) {
  664. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  665. (char*) "remove_service_queue: " << serviceName);
  666. Destination * destination = removeDestination(serviceName);
  667. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  668. (char*) "preparing to destroy" << serviceName);
  669. connection->destroyDestination(destination);
  670. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "destroyed"
  671. << serviceName);
  672. advertisedServices.erase(i);
  673. free(name);
  674. removeAdminDestination(serviceName, decrement);
  675. LOG4CXX_INFO(loggerAtmiBrokerServer,
  676. (char*) "unadvertised service " << serviceName);
  677. break;
  678. }
  679. }
  680. free(serviceName);
  681. }
  682. bool AtmiBrokerServer::createAdminDestination(char* serviceName) {
  683. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "Creating admin queue for: "
  684. << serviceName);
  685. bool isadm = false;
  686. char adm[XATMI_SERVICE_NAME_LENGTH + 1];
  687. ACE_OS::snprintf(adm, XATMI_SERVICE_NAME_LENGTH + 1, "%s_ADMIN_%d", server,
  688. serverid);
  689. if (strcmp(adm, serviceName) == 0) {
  690. isadm = true;
  691. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  692. (char*) "advertising admin service");
  693. }
  694. long commandLength;
  695. long responseLength = 1;
  696. commandLength = strlen(serverName) + strlen(serviceName) + strlen(version)
  697. + 15 + 1;
  698. char* command = (char*) ::tpalloc((char*) "X_OCTET", NULL, commandLength);
  699. char* response = (char*) ::tpalloc((char*) "X_OCTET", NULL, responseLength);
  700. memset(command, '', commandLength);
  701. sprintf(command, "tpadvertise,%s,%s,%s,", serverName, serviceName, version);
  702. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "createAdminDestination with command " << command);
  703. if (tpcall((char*) "BTStompAdmin", command, commandLength, &response,
  704. &responseLength, TPNOTRAN) != 0) {
  705. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  706. "Could not advertise service with command: " << command);
  707. tpfree(command);
  708. tpfree(response);
  709. return false;
  710. } else if (responseLength != 1) {
  711. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  712. "Service returned with unexpected response: " << response
  713. << " with length " << responseLength);
  714. tpfree(command);
  715. tpfree(response);
  716. return false;
  717. } else if (response[0] == 4) {
  718. LOG4CXX_WARN(loggerAtmiBrokerServer, (char*) "Server vresion "
  719. << version << " can not main Domain version");
  720. tpfree(command);
  721. tpfree(response);
  722. return false;
  723. } else if (response[0] == 3) {
  724. // Dispatcher needs to be paused
  725. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  726. (char*) "Created paused admin queue for: " << serviceName);
  727. tpfree(command);
  728. tpfree(response);
  729. return true;
  730. } else if (response[0] == 1) {
  731. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  732. (char*) "Created admin queue for: " << serviceName);
  733. tpfree(command);
  734. tpfree(response);
  735. return true;
  736. } else {
  737. int r = (int) response[0];
  738. char c = response[0];
  739. // REMOVED BY TOM, NOT CLEAR WHAT THIS IS REQUIRED FOR,
  740. // IF COMMENTED BACK IN, PLEASE PROVIDE A COMMENT
  741. // if (!isadm || (errorBootAdminService = response[0]) == 2) {
  742. LOG4CXX_ERROR(loggerAtmiBrokerServer, "Service returned with error: "
  743. << response[0] << " command was " << command << " r=" << r << " c=" << c);
  744. tpfree(command);
  745. tpfree(response);
  746. return false;
  747. // }
  748. }
  749. }
  750. void AtmiBrokerServer::removeAdminDestination(char* serviceName, bool decrement) {
  751. long commandLength;
  752. long responseLength = 1;
  753. char* command;
  754. if (decrement) {
  755. commandLength = strlen(serverName) + strlen(serviceName) + 21;
  756. command = (char*) ::tpalloc((char*) "X_OCTET", NULL, commandLength);
  757. sprintf(command, "decrementconsumer,%s,%s,", serverName, serviceName);
  758. } else {
  759. commandLength = strlen(serverName) + strlen(serviceName) + strlen(
  760. "tpunadvertise,,, ");
  761. command = (char*) ::tpalloc((char*) "X_OCTET", NULL, commandLength);
  762. sprintf(command, "tpunadvertise,%s,%s,", serverName, serviceName);
  763. }
  764. char* response = (char*) ::tpalloc((char*) "X_OCTET", NULL, responseLength);
  765. LOG4CXX_DEBUG(loggerAtmiBrokerServer, "Unadvertise with command: "
  766. << command);
  767. if (tpcall((char*) "BTStompAdmin", command, commandLength, &response,
  768. &responseLength, TPNOTRAN) != 0) {
  769. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  770. "Could not unadvertise service with command: " << command);
  771. } else if (responseLength != 1) {
  772. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  773. "Service returned with unexpected response: " << response
  774. << " with length " << responseLength);
  775. } else if (response[0] == 0) {
  776. LOG4CXX_ERROR(loggerAtmiBrokerServer, "Service returned with error: "
  777. << command);
  778. } else {
  779. LOG4CXX_DEBUG(loggerAtmiBrokerServer, "Unadvertise ok");
  780. }
  781. tpfree(command);
  782. tpfree(response);
  783. }
  784. bool AtmiBrokerServer::isAdvertised(char * serviceName) {
  785. bool toReturn = false;
  786. for (std::vector<char*>::iterator i = advertisedServices.begin(); i
  787. != advertisedServices.end(); i++) {
  788. if (strncmp(serviceName, (*i), XATMI_SERVICE_NAME_LENGTH) == 0) {
  789. toReturn = true;
  790. }
  791. }
  792. return toReturn;
  793. }
  794. void AtmiBrokerServer::addDestination(Destination* destination, void(*func)(
  795. TPSVCINFO *), ServiceInfo* service) {
  796. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "addDestination: "
  797. << destination->getName());
  798. ServiceData entry;
  799. entry.destination = destination;
  800. entry.func = func;
  801. entry.serviceInfo = service;
  802. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "constructor: "
  803. << destination->getName());
  804. Connection* connection = connections.getServerConnection();
  805. if (connection == NULL) {
  806. return;
  807. }
  808. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "createPool");
  809. SynchronizableObject* reconnect = new SynchronizableObject();
  810. for (int i = 0; i < entry.serviceInfo->poolSize; i++) {
  811. ServiceDispatcher* dispatcher = new ServiceDispatcher(this,
  812. destination, connection, destination->getName(), func, isPause,
  813. reconnect);
  814. if (dispatcher->activate(THR_NEW_LWP | THR_JOINABLE, 1, 0,
  815. ACE_DEFAULT_THREAD_PRIORITY, -1, 0, 0, 0, 0, 0, 0) != 0) {
  816. delete dispatcher;
  817. LOG4CXX_ERROR(loggerAtmiBrokerServer,
  818. (char*) "Could not start thread pool");
  819. } else {
  820. entry.dispatchers.push_back(dispatcher);
  821. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) " destination "
  822. << destination);
  823. }
  824. }
  825. serviceData.push_back(entry);
  826. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "added: "
  827. << destination->getName());
  828. }
  829. Destination* AtmiBrokerServer::removeDestination(const char * aServiceName) {
  830. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "removeDestination: "
  831. << aServiceName);
  832. Destination* toReturn = NULL;
  833. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  834. != serviceData.end(); i++) {
  835. if (strncmp((*i).serviceInfo->serviceName, aServiceName,
  836. XATMI_SERVICE_NAME_LENGTH) == 0) {
  837. toReturn = (*i).destination;
  838. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "removing service "
  839. << aServiceName);
  840. for (std::vector<ServiceDispatcher*>::iterator j =
  841. (*i).dispatchers.begin(); j != (*i).dispatchers.end(); j++) {
  842. ServiceDispatcher* dispatcher = (*j);
  843. dispatcher->shutdown();
  844. }
  845. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "shutdown notified "
  846. << aServiceName);
  847. for (std::vector<ServiceDispatcher*>::iterator j =
  848. (*i).dispatchers.begin(); j != (*i).dispatchers.end(); j++) {
  849. toReturn->disconnect();
  850. }
  851. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  852. (char*) "disconnect notified " << aServiceName);
  853. SynchronizableObject* reconnect = NULL;
  854. for (std::vector<ServiceDispatcher*>::iterator j =
  855. (*i).dispatchers.begin(); j != (*i).dispatchers.end();) {
  856. ServiceDispatcher* dispatcher = (*j);
  857. if (dispatcher != NULL) {
  858. LOG4CXX_TRACE(loggerAtmiBrokerServer,
  859. (char*) "Waiting for dispatcher notified "
  860. << aServiceName);
  861. dispatcher->wait();
  862. LOG4CXX_TRACE(loggerAtmiBrokerServer,
  863. (char*) "Deleting dispatcher " << aServiceName);
  864. reconnect = dispatcher->getReconnect();
  865. delete dispatcher;
  866. LOG4CXX_TRACE(loggerAtmiBrokerServer,
  867. (char*) "Dispatcher deleted " << aServiceName);
  868. } else {
  869. LOG4CXX_TRACE(loggerAtmiBrokerServer,
  870. (char*) "NULL Dispatcher detected for"
  871. << aServiceName);
  872. }
  873. LOG4CXX_TRACE(loggerAtmiBrokerServer,
  874. (char*) "Erasing dispatcher " << aServiceName);
  875. j = (*i).dispatchers.erase(j);
  876. }
  877. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  878. (char*) "waited for dispatcher: " << aServiceName);
  879. if (reconnect != NULL) {
  880. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  881. (char*) "Deleting reconnect");
  882. delete reconnect;
  883. }
  884. updateServiceStatus((*i).serviceInfo, (*i).func, false);
  885. serviceData.erase(i);
  886. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "removed: "
  887. << aServiceName);
  888. break;
  889. }
  890. }
  891. return toReturn;
  892. }
  893. void AtmiBrokerServer::server_done() {
  894. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "server_done()");
  895. for (int i = serverInfo.serviceVector.size() - 1; i >= 0; i--) {
  896. char* svcname = (char*) serverInfo.serviceVector[i].serviceName;
  897. if (isAdvertised(svcname)) {
  898. unadvertiseService(svcname, true);
  899. }
  900. }
  901. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "server_done(): returning.");
  902. }
  903. long AtmiBrokerServer::getServiceMessageCounter(char* serviceName) {
  904. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  905. != serviceData.end(); i++) {
  906. if (strncmp((*i).destination->getName(), serviceName,
  907. XATMI_SERVICE_NAME_LENGTH) == 0) {
  908. long counter = 0;
  909. for (std::vector<ServiceDispatcher*>::iterator dispatcher =
  910. (*i).dispatchers.begin(); dispatcher
  911. != (*i).dispatchers.end(); dispatcher++) {
  912. counter += (*dispatcher)->getCounter();
  913. }
  914. return counter;
  915. }
  916. }
  917. return 0;
  918. }
  919. long AtmiBrokerServer::getServiceErrorCounter(char* serviceName) {
  920. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  921. != serviceData.end(); i++) {
  922. if (strncmp((*i).destination->getName(), serviceName,
  923. XATMI_SERVICE_NAME_LENGTH) == 0) {
  924. long counter = 0;
  925. for (std::vector<ServiceDispatcher*>::iterator dispatcher =
  926. (*i).dispatchers.begin(); dispatcher
  927. != (*i).dispatchers.end(); dispatcher++) {
  928. counter += (*dispatcher)->getErrorCounter();
  929. }
  930. return counter;
  931. }
  932. }
  933. return 0;
  934. }
  935. void AtmiBrokerServer::getResponseTime(char* serviceName, unsigned long* min,
  936. unsigned long* avg, unsigned long* max) {
  937. *min = 0;
  938. *avg = 0;
  939. *max = 0;
  940. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  941. != serviceData.end(); i++) {
  942. if (strncmp((*i).destination->getName(), serviceName,
  943. XATMI_SERVICE_NAME_LENGTH) == 0) {
  944. long counter = 0;
  945. long total = 0;
  946. unsigned long min_time;
  947. unsigned long max_time;
  948. unsigned long avg_time;
  949. for (std::vector<ServiceDispatcher*>::iterator dispatcher =
  950. (*i).dispatchers.begin(); dispatcher
  951. != (*i).dispatchers.end(); dispatcher++) {
  952. counter = (*dispatcher)->getCounter();
  953. (*dispatcher)->getResponseTime(&min_time, &avg_time, &max_time);
  954. if (*min == 0 || min_time < *min) {
  955. *min = min_time;
  956. }
  957. if (total != 0 || counter != 0) {
  958. *avg = ((*avg) * total + avg_time * counter) / (total
  959. + counter);
  960. }
  961. if (max_time > *max) {
  962. *max = max_time;
  963. }
  964. }
  965. }
  966. }
  967. }
  968. void (*AtmiBrokerServer::getServiceMethod(const char * aServiceName))(TPSVCINFO *) {
  969. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "getServiceMethod: "
  970. << aServiceName);
  971. for (std::vector<ServiceData>::iterator i = serviceData.begin(); i
  972. != serviceData.end(); i++) {
  973. if (strncmp((*i).destination->getName(), aServiceName,
  974. XATMI_SERVICE_NAME_LENGTH) == 0) {
  975. LOG4CXX_DEBUG(loggerAtmiBrokerServer, (char*) "found: "
  976. << aServiceName);
  977. return (*i).func;
  978. }
  979. }
  980. LOG4CXX_DEBUG(loggerAtmiBrokerServer,
  981. (char*) "getServiceMethod out: " << aServiceName);
  982. return NULL;
  983. }