cs.cxx
上传用户:xfwatch
上传日期:2020-12-14
资源大小:872k
文件大小:17k
源码类别:

中间件编程

开发平台:

Java

  1. /*
  2.  * JBoss, Home of Professional Open Source
  3.  * Copyright 2009, Red Hat, Inc., and others contributors as indicated
  4.  * by the @authors tag. All rights reserved.
  5.  * See the copyright.txt in the distribution for a
  6.  * full listing of individual contributors.
  7.  * This copyrighted material is made available to anyone wishing to use,
  8.  * modify, copy, or redistribute it subject to the terms and conditions
  9.  * of the GNU Lesser General Public License, v. 2.1.
  10.  * This program is distributed in the hope that it will be useful, but WITHOUT A
  11.  * WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
  12.  * PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
  13.  * You should have received a copy of the GNU Lesser General Public License,
  14.  * v.2.1 along with this distribution; if not, write to the Free Software
  15.  * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  16.  * MA  02110-1301, USA.
  17.  */
  18. #include "AtmiBrokerServerControl.h"
  19. #include <stdlib.h>
  20. #include "xatmi.h"
  21. #include "userlogc.h"
  22. #include "string.h"
  23. #include "ace/Thread_Manager.h"
  24. #include "ace/Thread.h"
  25. #include "ace/Synch.h"
  26. #include <stdarg.h>
  27. #include <stdio.h>
  28. #include <string.h>
  29. #ifndef WIN32
  30. #include "tx.h"
  31. #endif
  32. #ifdef WIN32 #include "atmiBrokerTxMacro.h" #define TX_OK              0   /* normal execution */ #ifdef __cplusplus extern "C" { #endif extern BLACKTIE_TX_DLL int tx_begin(void); extern BLACKTIE_TX_DLL int tx_close(void); extern BLACKTIE_TX_DLL int tx_commit(void); extern BLACKTIE_TX_DLL int tx_open(void); #ifdef __cplusplus } #endif #endif
  33. static ACE_Mutex mutex_;
  34. const char *MSG1 = "CLIENT REQUEST ";
  35. const char *MSG2 = "PAUSE - CLIENT REQUEST";
  36. static int tx = 0;
  37. static int startTx(int);
  38. static int endTx(int);
  39. // data type for controlling the work done by each thread
  40. typedef struct thr_arg {
  41. int failonerror;
  42. const char *data;
  43. const char *msg;
  44. const char *svc;
  45. const char *sndtype;
  46. const char *rcvtype;
  47. long flags;
  48. int expect;
  49. long expect2;
  50. int signum;
  51. int result;
  52. } thr_arg_t;
  53. // arbitray subtype for testing C type buffers
  54. typedef struct subtype {
  55. char data[64];
  56. int id;
  57. char op;
  58. } subtype_t;
  59. static void do_assert(int failonerror, int* res, int cond, const char *fmt, ...) {
  60. char str[1024];
  61. va_list args;
  62. va_start(args, fmt);
  63. vsnprintf(str, 1024, fmt, args);
  64. va_end(args);
  65. if (!cond) {
  66. *res = 1;
  67. userlogc((char*) "UNSUCCESSFULL assert %s", str);
  68. if (failonerror)
  69. _exit(1);
  70. }
  71. // userlogc((char*) "successful assert for: cond=%d (%s)", cond, str);
  72. }
  73. /**
  74.  * Split a name value pair of the form "name=value" returning the name and value
  75.  * as distinct strings. In addition if the value is an integral type then its long
  76.  * value representation is returned.
  77.  *
  78.  * WARNING: the input char pointer is modified and therefore must not be a constant
  79.  * char pointer.
  80.  *
  81.  * @param nvp
  82.  *  the input name value pair
  83.  * @param name
  84.  *  holds the return value for the name portion of the pair
  85.  * @param value
  86.  *  holds the return value for the value portion of the pair
  87.  * @param lvalue
  88.  *  holds the return value for the value portion of the pair as a long
  89.  *  (only valid if *value is non-zero
  90.  *
  91.  *  @return
  92.  *  zero if the value is not null
  93.  *  non-zero otherwise
  94.  */
  95. int decode_nvp(char *nvp, char** value, long* lvalue) {
  96.     char* v = strchr(nvp, '=');
  97.     *value = (v == NULL ? NULL : v + 1);
  98.     if (v != NULL) {
  99.         *v = '';
  100.         *lvalue = atol(*value);
  101. return 0;
  102.     }
  103. return 1;
  104. }
  105. //static int do_tpcall(int failonerror, const char *data, const char *msg, const char *svc, const char *sndtype, long flags, int expect) {
  106. static int do_tpcall(thr_arg_t *args) {
  107. int tpstatus = 0;
  108. char *rbuf;
  109. char *sbuf;
  110. char type[20];
  111. char subtype[20];
  112. int isSbCType = (strcmp(args->sndtype, X_C_TYPE) == 0); // is send buffer is X_C_TYPE
  113. int isRbCType = (strcmp(args->rcvtype, X_C_TYPE) == 0); // is recv buffer is X_C_TYPE
  114. long sbufsz = (isSbCType ? 0 : strlen(args->data) + 1); // api sets the buffer size if X_C_TYPE
  115. long rbufsz = (isRbCType ? 0 : 64);
  116. int res;
  117. sbuf = tpalloc((char *) args->sndtype, (isSbCType ? (char*) "sub_type" : NULL), sbufsz);
  118. rbuf = tpalloc((char *) args->rcvtype, (isRbCType ? (char*) "sub_type" : NULL), rbufsz);
  119. do_assert(args->failonerror, &args->result, sbuf != 0, "tpalloc send buf failed tperrno=%d", tperrno);
  120. do_assert(args->failonerror, &args->result, rbuf != 0, "tpalloc recv buf failed tperrno=%d", tperrno);
  121. strcpy(sbuf, args->data);
  122. memset(rbuf, 0, rbufsz);
  123. tptypes(sbuf, type, subtype);
  124. userlogc((char *) "sbuf type: %s rbuf type: %s type: %s subtype: %s %d vrs %d",
  125. args->sndtype, args->rcvtype, type, subtype, tpstatus, args->expect);
  126. if (strstr(args->data, "T8") == args->data) {
  127. userlogc((char *) "T8: startTX");
  128. if (startTx(true) != 0)
  129. do_assert(args->failonerror, &res, false, "Could not open or begin transaction: ");
  130. userlogc((char *) "T8: tpacall");
  131. int cd = tpacall((char *) args->svc, sbuf, sbufsz, args->flags);
  132. userlogc((char *) "T8: endTx");
  133. int txres = endTx(true);
  134. userlogc((char *) "T8: check assert");
  135. do_assert(args->failonerror, &res, txres != TX_OK, "commit or close transaction succeeded with active descriptors");
  136. userlogc((char *) "T8: tpgetrply");
  137. tpstatus = tpgetrply(&cd, (char **) (char **) &rbuf, &rbufsz, args->flags);
  138. userlogc((char *) "T8: finished");
  139. } else {
  140. tpstatus = tpcall((char *) args->svc, sbuf, sbufsz, (char **) &rbuf, &rbufsz, args->flags);
  141. }
  142. res = (tperrno == args->expect ? 0 : 1);
  143. if (tpstatus)
  144. userlogc((char *) "tpcall returned %d tperrno=%d expect=%d", tpstatus, tperrno, args->expect);
  145. // check that tperrno has the expected value
  146. do_assert(args->failonerror, &args->result, tperrno == args->expect,
  147. "%s: wrong response from tpcall %s %s tpstatus=%d flags=%d expect=%d tperrno=%d",
  148. args->msg, args->svc, args->sndtype, tpstatus, args->flags, args->expect, tperrno);
  149. // if there was no service error then check that the service returned the expected value
  150. if (tperrno == 0)
  151. do_assert(args->failonerror, &args->result, tpurcode == args->expect2,
  152. "tpurcode: expected=%d tpurcode=%d",
  153. args->expect2, tpurcode);
  154. tpfree(sbuf);
  155. tpfree(rbuf);
  156. return res;
  157. }
  158. // thread entry point
  159. static void* work(void *args)
  160. {
  161. (void) do_tpcall((thr_arg_t *) args);
  162. return args;
  163. }
  164. static void signal_thread(ACE_thread_t& tid, int signum)
  165. {
  166. userlogc((char*) "sleep 2 secs before sending signal %d to thread %d", signum, tid);
  167. // allow enough time for the thread to perform a tpcall request
  168. ACE_OS::sleep(2);
  169. userlogc((char*) "sending signal %d to thread %d", signum, tid);
  170. int rv1 = ACE_Thread::kill (tid, signum);
  171. userlogc((char*) "thread kill returned %d", rv1);
  172. // sending a signal to the process doesn't really test TPSIGRSTRT since the
  173. // signal is unlikely to be sent to the thread that issued the tpcall with
  174. // the TPSIGRSTRT flag set. But we test it anyway.
  175. #if 0
  176. int rv2 = ACE_OS::kill(ACE_OS::getpid(), signum);
  177. userlogc((char*) "process kill returned %d", rv2);
  178. #endif
  179. }
  180. // another thread entry point
  181. static int tcnt_ = 0;
  182. static void* work2(void *args)
  183. {
  184. thr_arg_t *params = (thr_arg_t *) args;
  185. char *s1, *s2;
  186. int ncalls = 2;
  187. int okcalls = 0;
  188. int rv;
  189. s1 = (char *) "BAR";
  190. s2 = (char *) "BAR";
  191. mutex_.acquire();
  192. tcnt_ += 1;
  193. tpfree(tpalloc((char *) params->sndtype, 0, 10));
  194. #if 0 /* I've disabled using multiple service since it fails with just one service */
  195. if (tcnt_  % 2 == 0)
  196. s2 = (char *) "TestTPCall";
  197. else
  198. s1 = (char *) "TestTPCall";
  199. #endif
  200. mutex_.release();
  201. //XXX ACE_OS::sleep(4); // yield to ensure that all threads have initialised env (see bug BLACKTIE-211)
  202. for (int i = 0; i < ncalls; i++) {
  203. params->svc = s1;
  204. if ((rv = do_tpcall(params)))
  205. userlogc((char*) "%s: tpcall %d error: %d", params->svc, i, rv);
  206. else
  207. okcalls += 1;
  208. params->svc = s2;
  209. if ((rv = do_tpcall(params)))
  210. userlogc((char*) "%s: tpcall %d error: %d", params->svc, i, rv);
  211. else
  212. okcalls += 1;
  213. }
  214. userlogc("Thread (t) finished %d out of %d calls successfuln", okcalls, ncalls * 2);
  215. params->result = ((okcalls == ncalls * 2)?0:1);
  216. return args;
  217. }
  218. static int lotsofwork(int nthreads, ACE_THR_FUNC tfunc, thr_arg_t* arg) {
  219. ACE_thread_t *tids = new ACE_thread_t[nthreads];
  220. ACE_hthread_t *handles = new ACE_hthread_t[nthreads];
  221. int i;
  222. for (i = 0; i < nthreads; i++)
  223. handles[i] = 0;
  224. // spawn nthreads threads
  225. if (ACE_Thread::spawn_n(tids, // return thread id for each thread
  226. nthreads,
  227. tfunc, // entry point for new thread
  228. (void *) arg, // args for thread entry point
  229. THR_JOINABLE | THR_NEW_LWP,
  230. ACE_DEFAULT_THREAD_PRIORITY,
  231. 0, 0, handles) != (size_t) nthreads) {
  232. userlogc("Unable to start request number of threadsn");
  233. }
  234. if (arg->signum > 0)
  235. signal_thread(tids[0], arg->signum);
  236. for (int i = 0; i < nthreads; i++)
  237. if (handles[i] != 0)
  238. ACE_Thread::join(handles[i]);
  239. return arg->result;
  240. }
  241. // XsdValidator is not thread safe
  242. static int bug211() {
  243. thr_arg_t args = {1, MSG1, "bug211: two threads reading env", "BAR", X_OCTET, X_OCTET, 0, 0, 99, 0};
  244. return lotsofwork(2, ACE_THR_FUNC(&work), &args);
  245. }
  246. // tpcall should return TPEINVAL if the service name is invalid
  247. static int bug213() {
  248. thr_arg_t args = {1, MSG1, "bug213: NULL service name", NULL, X_OCTET, X_OCTET, 0, TPEINVAL, 0, 0};
  249. return do_tpcall(&args);
  250. }
  251. // tpcall incorrectly returns TPNOTIME whenever the TPNOBLOCK or TPNOTIME flags are specified
  252. static int bug212a() {
  253. // Specifying TPNOTIME means the caller wants to be imune to blocking conditions (such
  254. // as no output buffers). Thus if such a condition does not exist the call should succeed as normal.
  255. // However if bug 212 is present then the call returns TPNOTIME
  256. long flags2 = TPNOTRAN | TPNOTIME;
  257. thr_arg_t arg1 = {1, MSG1, "bug212a: TPNOTIME", "BAR", X_OCTET, X_OCTET, flags2, 0, 99, 0};
  258. return lotsofwork(1, ACE_THR_FUNC(&work), &arg1);
  259. }
  260. static int bug212b() {
  261. // Similarly specifying TPNOBLOCK means that if a blocking condition does exist then the caller
  262. // should get the error TPEBLOCK
  263. // However if bug 212 is present then the call returns TPNOTIME
  264. long flags3 = TPNOTRAN | TPNOBLOCK;
  265. thr_arg_t args = {1, MSG1, "bug212b: TPNOBLOCK", "BAR", X_OCTET, X_OCTET, flags3, 0, 99, 0};
  266. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  267. }
  268. // TPSIGRSTRT flag isn't supported on tpcall
  269. static int bug214() {
  270. thr_arg_t args = {1, MSG1, "bug214: TPSIGRSTRT flag not supported on tpcall", "BAR", X_OCTET, X_OCTET, TPSIGRSTRT, 0, 99, 0};
  271. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  272. }
  273. // tpcall failure with multiple threads
  274. static int bug215() {
  275. thr_arg_t args = {0, MSG1, "bug215: tpcall failure with lots of threads", "BAR", X_OCTET, X_OCTET, 0, 0, 99, 0};
  276. return lotsofwork(2, ACE_THR_FUNC(&work2), &args);
  277. }
  278. static int bug216a() {
  279. thr_arg_t args = {1, MSG1, "bug216: tp bufs should morph if they're the wrong type", "BAR", X_OCTET, X_C_TYPE, 0, 0, 99, 0};
  280. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  281. }
  282. static int bug216b() {
  283. thr_arg_t args = {1, MSG1, "bug216: passing the wrong return buffer type with TPNOCHANGE",
  284. "BAR", X_OCTET, X_C_TYPE, TPNOCHANGE, TPEOTYPE, 99, 0};
  285. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  286. }
  287. static int bug217() {
  288. thr_arg_t args = {1, MSG1, "bug217: make sure tpurcode works", "BAR", X_OCTET, X_OCTET, 0, 0, 99, 0};
  289. (void) lotsofwork(1, ACE_THR_FUNC(&work), &args);
  290. return args.result;
  291. }
  292. static int t9001() {
  293. char *buf = (char *) tpalloc((char *) X_C_TYPE, (char*) "sub_type", 10);
  294. int res = 0;
  295. do_assert(1, &res, buf != 0, "tpalloc with X_C_TYPE and non-zero len: tperrno=%d (spec says size is optional)", tperrno);
  296. tpfree(buf);
  297. return res;
  298. }
  299. // sanity check
  300. static int t0() {
  301. thr_arg_t args = {1, MSG1, "ok test", "BAR", X_OCTET, X_OCTET, 0, 0, 99, 0};
  302. return do_tpcall(&args);
  303. }
  304. // tell the server to set a flag on tpreturn (should generate TPESVCERR)
  305. static int t1() {
  306. thr_arg_t args = {1, "T1", "set flag on tpreturn should fail", "BAR", X_OCTET, X_OCTET, TPNOTRAN, TPESVCERR, 0, 0};
  307. return do_tpcall(&args);
  308. }
  309. static int t2() {
  310. thr_arg_t args = {1, "T2", "tell the service to free the the service buffer", "BAR", X_OCTET, X_OCTET, TPNOTRAN, 0, 99, 0};
  311. return do_tpcall(&args);
  312. }
  313. // telling the service to not tpreturn should generate an error
  314. static int t3() {
  315. thr_arg_t args = {1, "T3", "no tpreturn", "BAR", X_OCTET, X_OCTET, 0, TPESVCERR, 0, 0};
  316. return do_tpcall(&args);
  317. }
  318. // telling service to call tpreturn outside service routine should have no effect
  319. static int t4() {
  320. thr_arg_t args = {1, "T4", "tpreturn outside service routing", "BAR", X_OCTET, X_OCTET, 0, 0, 99, 0};
  321. return do_tpcall(&args);
  322. }
  323. static int t5() {
  324. thr_arg_t args = {1, "T5", "tpreturn TPFAIL", "BAR", X_OCTET, X_OCTET, 0, TPESVCFAIL, 99, 0};
  325. return do_tpcall(&args);
  326. }
  327. static int t6() {
  328. // TODO this test is for tpcall. Add tests for other xatmi API calls (tpacall etc_)
  329. thr_arg_t args = {1, "T6=4", "set TPSIGRSTRT flag and send a signal", "BAR", X_OCTET, X_OCTET, TPSIGRSTRT, 0, 99, SIGALRM};
  330. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  331. }
  332. static int t7() {
  333. thr_arg_t args = {1, "T6=4", "do not set TPSIGRSTRT flag and send a signal", "BAR", X_OCTET, X_OCTET, 0, TPGOTSIG, 99, SIGALRM};
  334. return lotsofwork(1, ACE_THR_FUNC(&work), &args);
  335. }
  336. static int t8() {
  337. thr_arg_t args = {1, "T8", "commit tx with active descriptors", "BAR", X_OCTET, X_OCTET, 0, TPEBADDESC, 99, 0};
  338. return do_tpcall(&args);
  339. }
  340. static int startTx(int enable) {
  341. if (enable && (tx_open() != TX_OK || tx_begin() != TX_OK))
  342. return 1;
  343. return 0;
  344. }
  345. static int endTx(int enable) {
  346. if (enable && (tx_commit() != TX_OK || tx_close() != TX_OK))
  347. return 1;
  348. return 0;
  349. }
  350. int run_client(int argc, char **argv) {
  351. int res = 0;
  352. int bug = 217;
  353. if (argc > 1)
  354. bug = atoi(argv[1]);
  355. userlogc((char*) "starting test %d", bug);
  356. if (startTx(tx) != 0)
  357. userlogc((char*) "ERROR - Could not open or begin transaction: ");
  358. else {
  359. switch (bug) {
  360. case 211: res = bug211(); break;
  361. case 2120: res = bug212a(); break;
  362. case 2121: res = bug212b(); break;
  363. case 213: res = bug213(); break;
  364. case 214: res = bug214(); break;
  365. case 215: res = bug215(); break;
  366. case 2160: res = bug216a(); break;
  367. case 2161: res = bug216b(); break;
  368. case 217: res = bug217(); break;
  369. case 9001: res = t9001(); break;
  370. case 0: res = t0(); break;
  371. case 1: res = t1(); break;
  372. case 2: res = t2(); break;
  373. case 3: res = t3(); break;
  374. case 4: res = t4(); break;
  375. case 5: res = t5(); break;
  376. case 6: res = t6(); break;
  377. case 7: res = t7(); break;
  378. case 8: res = t8(); break;
  379. default: break;
  380. }
  381. if (endTx(tx) != 0)
  382. userlogc((char*) "ERROR - Could not commit or close transaction: ");
  383. }
  384. userlogc((char*) "test %d %s with code %d", bug, (res == 0 ? "passed" : "failed"), res);
  385. return res;
  386. }
  387. void BAR (TPSVCINFO *);
  388. void TestTPCall (TPSVCINFO *);
  389. void BAR(TPSVCINFO * svcinfo) {
  390. char* buffer;
  391. int sendlen = 15;
  392. long rflag = 0L;
  393. int rval = TPSUCCESS;
  394. char *arg;
  395. long larg;
  396. userlogc((char*) "bar called  - svc=%s data=%s len=%d flags=%d rcode=%d tperrno=%d",
  397. svcinfo->name, svcinfo->data, svcinfo->len, svcinfo->flags, 99, tperrno);
  398. decode_nvp(svcinfo->data, &arg, &larg);
  399. if (strcmp(svcinfo->data, "T1") == 0) {
  400. rflag = TPEBLOCK;
  401. } else if (strcmp(svcinfo->data, "T2") == 0) {
  402. tpfree(svcinfo->data);
  403. } else if (strcmp(svcinfo->data, "T5") == 0) {
  404. rval = TPFAIL;
  405. } else if (strcmp(svcinfo->data, "T6") == 0 && arg != NULL) {
  406. userlogc((char*) "bar sleeping for %d seconds", larg);
  407. ACE_OS::sleep(larg);
  408. }
  409. buffer = tpalloc((char *) "X_OCTET", 0, sendlen);
  410. strcpy(buffer, "BAR SAYS HELLO");
  411. if (strcmp(svcinfo->data, "T3") != 0)
  412. tpreturn(rval, 99, buffer, sendlen, rflag);
  413. if (strcmp(svcinfo->data, "T4") == 0)
  414. tpreturn(TPFAIL, 99, buffer, sendlen, rflag);
  415. if (tperrno)
  416. userlogc((char*) "bar returned: tperrno=%d", tperrno);
  417. }
  418. void TestTPCall(TPSVCINFO * svcinfo) {
  419. BAR(svcinfo);
  420. }
  421. /* the byte pattern written to file descriptor 1 to indicate that the server has advertised its services */
  422. static const unsigned char HANDSHAKE[] = {83,69,82,86,73,67,69,83,32,82,69,65,68,89};
  423. static const ssize_t HANDSHAKE_LEN = 14;
  424. int run_server(int argc, char **argv) {
  425. int exit_status = serverinit(argc, argv);
  426. if (exit_status != -1) {
  427. tpadvertise((char *) "BAR", BAR);
  428. tpadvertise((char *) "TestTPCall", TestTPCall);
  429. if (write(1, HANDSHAKE, HANDSHAKE_LEN) != HANDSHAKE_LEN) {
  430. return -1;
  431. }
  432. /* flush stdout */
  433. fprintf(stdout, "n");
  434. exit_status = serverrun();
  435. } else {
  436. userlogc((char*) "main Unexpected exception in serverrun()");
  437. }
  438. userlogc((char*) "Test Server: calling serverdone()");
  439. serverdone();
  440. userlogc((char*) "Test Server: returning status %d", exit_status);
  441. return exit_status;
  442. }
  443. int main(int argc, char **argv) {
  444. int i;
  445.     for (i = 0; i < argc; i++) {
  446.         if (strcmp(argv[i], "-i") == 0)
  447.             return run_server(argc, argv);
  448.     }
  449.     return run_client(argc, argv);
  450. }