mq.cpp
上传用户:hnnddl
上传日期:2007-01-06
资源大小:3580k
文件大小:10k
源码类别:

IP电话/视频会议

开发平台:

WINDOWS

  1. /*
  2.  * $Revision: 1.8 $
  3.  * $Date: 1998/06/08 16:57:49 $
  4.  */
  5. ////////////////////////////////////////////////////////////////
  6. //               Copyright (c) 1996-98 Lucent Technologies    //
  7. //                       All Rights Reserved                  //
  8. //                                                            //
  9. //                       THIS IS UNPUBLISHED                  //
  10. //                       PROPRIETARY SOURCE                   //
  11. //                   CODE OF Lucent Technologies              //
  12. // AND elemedia   //
  13. //                                                            //
  14. ////////////////////////////////////////////////////////////////
  15. //
  16. ////////////////////////////////////////////////////////////////
  17. // Example programs are provided soley to demonstrate one     //
  18. // possible use of the stack libraries and are included for   //
  19. // instructional purposes only.  You are free to use, modify  //
  20. // and/or redistribute any portion of code in the example     //
  21. // programs.  However, such examples are not intended to      //
  22. // represent production quality code.                         //
  23. //                                                            //
  24. // THE COPYRIGHT HOLDERS PROVIDE THESE EXAMPLE PROGRAMS       //
  25. // "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED     //
  26. // OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED     //
  27. // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A            //
  28. // PARTICULAR PURPOSE.                                        //
  29. ////////////////////////////////////////////////////////////////
  30. #include "exchange.h"
  31. #include "mq.h"
  32. #if (defined(WIN32))
  33. MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
  34. {
  35. thread_id = 0;
  36. thread_handle = INVALID_HANDLE_VALUE;
  37. }
  38. MessageQueue::~MessageQueue()
  39. {
  40. }
  41. void
  42. MessageQueue::QueueMessage(USER_MESSAGE *msg)
  43. {
  44. USER_MESSAGE *m = new USER_MESSAGE;
  45. *m = *msg;
  46. //LOG("MessageQueue::QueueMessage posting message for thread %dn",
  47. // thread_id);
  48. if (PostThreadMessage(thread_id, USER_MESSAGE_ID,
  49. 0, (unsigned int)m) == FALSE)
  50. {
  51. LOG("PostThreadMessage failed, error = %dn",GetLastError());
  52. }
  53. return;
  54. }
  55. int
  56. MessageQueue::DequeueMessage(USER_MESSAGE *msg)
  57. {
  58. MSG __msg;
  59. while(::GetMessageA(&__msg, NULL, 0, 0))
  60. {
  61. if (__msg.message == USER_MESSAGE_ID)
  62. {
  63. *msg = *(USER_MESSAGE *)__msg.lParam; 
  64. delete (USER_MESSAGE *)__msg.lParam;
  65. return 1;
  66. }
  67. printf("unexpected message, msg,message = %dn", __msg.message);
  68. }
  69. return(0);
  70. }
  71. int
  72. MessageQueue::Start()
  73. {
  74. LOG("Creating MQ threadn");
  75. thread_handle = (HANDLE)_beginthreadex(
  76. NULL,
  77. 0,
  78. _boot,
  79. this,
  80. 0,
  81. (unsigned *)&thread_id);
  82. if (thread_handle == 0)
  83. {
  84. DWORD err = GetLastError();
  85. LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",err);
  86. return(err);
  87. }
  88. // Wait for  thread_id to become valid.
  89. Sleep(0);
  90. return(0);
  91. }
  92. void
  93. MessageQueue::Stop()
  94. {
  95. USER_MESSAGE msg;
  96. if (thread_id == 0)
  97. {
  98. return;
  99. }
  100. msg.message = MQ_INTERNAL_ID;
  101. msg.wParam = 0xDEAD;
  102. msg.lParam = 0xDEAD;
  103. QueueMessage(&msg);
  104. WaitForSingleObject(thread_handle, INFINITE);
  105. CloseHandle(thread_handle);
  106. thread_handle = INVALID_HANDLE_VALUE;
  107. thread_id = 0;
  108. }
  109. #elif defined(VXWORKS)
  110. #define MAX_MSGS 4
  111. #define MAX_MSG_LEN sizeof(USER_MESSAGE*)
  112. MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
  113. {
  114. q_len = mqlen > 0 ? mqlen : MAX_MSGS;
  115. q_name[0] = '';
  116. if (name != NULL)
  117. {
  118. strncpy(q_name, name, 80);
  119. q_name[79] = '';
  120. }
  121. mqueue_id = NULL;
  122. thread_id = ERROR;
  123. thread_exited_flag = NULL;
  124. mqueue_id = msgQCreate(q_len,MAX_MSG_LEN,MSG_Q_PRIORITY);
  125. }
  126. MessageQueue::~MessageQueue()
  127. {
  128. USER_MESSAGE *m;
  129. // Cleanup all messages in the queue.
  130. while (msgQReceive(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),
  131. NO_WAIT) != ERROR)
  132. {
  133. delete m;
  134. }
  135. msgQDelete(mqueue_id);
  136. }
  137. void
  138. MessageQueue::QueueMessage(USER_MESSAGE *msg)
  139. {
  140. USER_MESSAGE *m = new USER_MESSAGE;
  141. *m = *msg;
  142. if (msgQSend(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),WAIT_FOREVER,
  143. MSG_PRI_NORMAL) == ERROR)
  144. {
  145. LOG("msgQsend failedn");
  146. }
  147. else
  148. {
  149. LOG("Message queuedn");
  150. }
  151. }
  152. int
  153. MessageQueue::DequeueMessage(USER_MESSAGE *msg)
  154. {
  155. USER_MESSAGE *m;
  156. if (msgQReceive(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),
  157. WAIT_FOREVER) == ERROR)
  158. {
  159. LOG("msgQReceive failedn");
  160. return 0;
  161. }
  162. else
  163. {
  164. *msg = *m;
  165. delete m;
  166. return 1;
  167. }
  168. }
  169. int
  170. MessageQueue::Start()
  171. {
  172. LOG("Creating MQ threadn");
  173. thread_exited_flag = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
  174. if (thread_exited_flag == NULL)
  175. {
  176. LOG("Failed to created thread_exited_flag semaphoren");
  177. return errno;
  178. }
  179. thread_id = taskSpawn(q_name,
  180. 90,
  181. 0,
  182. 8096,
  183. (FUNCPTR)_boot,
  184. (int)this,
  185. 0,0,0,0,0,0,0,0,0);
  186. if (thread_id == ERROR)
  187. {
  188. LOG("taskSpawn failed, errno = 0x%xn",errno);
  189. return errno;
  190. }
  191. return(0);
  192. }
  193. void
  194. MessageQueue::Stop()
  195. {
  196. USER_MESSAGE msg;
  197. if (thread_id == 0)
  198. {
  199. return;
  200. }
  201. msg.message = MQ_INTERNAL_ID;
  202. msg.wParam = 0xDEAD;
  203. msg.lParam = 0xDEAD;
  204. QueueMessage(&msg);
  205. LOG("Waiting for mq thread to exit..n");
  206. if ((taskIdVerify(thread_id) != ERROR) && thread_exited_flag != NULL)
  207. {
  208. semTake(thread_exited_flag, WAIT_FOREVER);
  209. semDelete(thread_exited_flag);
  210. LOG("MQ thread exitedn");
  211. }
  212. else
  213. {
  214. LOG("Oops.. such a thread does not exist..n");
  215. }
  216. thread_id = 0;
  217. }
  218. #elif defined(__hpux)
  219. MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
  220. {
  221. int ret;
  222. thread_id = cma_c_null;
  223. mqueue_head = mqueue_tail = NULL;
  224. ret = pthread_mutex_init( &mutex,
  225. pthread_mutexattr_default);
  226. if (ret != 0)
  227. {
  228. printf("mutex_init failed ret = %dn",ret);
  229. }
  230. ret = pthread_cond_init(&mqueue_not_empty,pthread_condattr_default);
  231. if (ret != 0)
  232. {
  233. printf("cond_init failed ret = %dn",ret);
  234. }
  235. }
  236. MessageQueue::~MessageQueue()
  237. {
  238. pthread_mutex_lock(&mutex);
  239. while(mqueue_head != NULL)
  240. {
  241. struct mqueue_node *del = mqueue_head;
  242. delete del;
  243. mqueue_head = mqueue_head->next;
  244. }
  245. pthread_mutex_destroy(&mutex);
  246. pthread_cond_destroy(&mqueue_not_empty);
  247. }
  248. void
  249. MessageQueue::QueueMessage(USER_MESSAGE *msg)
  250. {
  251. pthread_mutex_lock(&mutex);
  252. if (mqueue_head == NULL)
  253. {
  254. mqueue_head = new mqueue_node;
  255. mqueue_tail = mqueue_head;
  256. }
  257. else
  258. {
  259. mqueue_tail->next = new mqueue_node;
  260. mqueue_tail = mqueue_tail->next;
  261. }
  262. mqueue_tail->msg = *msg;
  263. mqueue_tail->next = NULL;
  264. pthread_cond_signal(&mqueue_not_empty);
  265. pthread_mutex_unlock(&mutex);
  266. LOG("Message queuedn");
  267. }
  268. int
  269. MessageQueue::DequeueMessage(USER_MESSAGE *msg)
  270. {
  271. mqueue_node *del;
  272. pthread_mutex_lock(&mutex);
  273. if (mqueue_head == NULL)
  274. {
  275. pthread_cond_wait(&mqueue_not_empty,&mutex);
  276. }
  277. *msg = mqueue_head->msg;
  278. del = mqueue_head;
  279. if (mqueue_tail == mqueue_head)
  280. {
  281. mqueue_head = mqueue_tail = NULL;
  282. }
  283. else
  284. {
  285. mqueue_head = mqueue_head->next;
  286. }
  287. delete del;
  288. pthread_mutex_unlock(&mutex);
  289. return 1;
  290. }
  291. int
  292. MessageQueue::Start()
  293. {
  294. LOG("Creating MQ threadn");
  295. int ret = pthread_create(  &thread_id, 
  296. pthread_attr_default,
  297. _boot,
  298. this);
  299. //printf("pthread_create returned &thread_id of %xn",&thread_id);
  300. if (ret != 0)
  301. {
  302. LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",ret);
  303. return(ret);
  304. }
  305. return(0);
  306. }
  307. void
  308. MessageQueue::Stop()
  309. {
  310. USER_MESSAGE msg;
  311. if (pthread_equal(thread_id,cma_c_null))
  312. {
  313. return;
  314. }
  315. msg.message = MQ_INTERNAL_ID;
  316. msg.wParam = 0xDEAD;
  317. msg.lParam = 0xDEAD;
  318. QueueMessage(&msg);
  319. pthread_addr_t status;
  320. pthread_join(thread_id,&status);
  321. pthread_detach(&thread_id); // pthread_detach required on hpux
  322. //printf("pthread_detach with &thread_id=%xn",&thread_id);
  323. thread_id = cma_c_null;
  324. }
  325. #else
  326. MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
  327. {
  328. int ret;
  329. thread_id = 0;
  330. mqueue_head = mqueue_tail = NULL;
  331. ret = mutex_init(&mutex, USYNC_THREAD, NULL);
  332. if (ret != 0)
  333. {
  334. printf("mutex_init failed ret = %dn",ret);
  335. }
  336. ret = cond_init(&mqueue_not_empty,USYNC_THREAD,NULL);
  337. if (ret != 0)
  338. {
  339. printf("cond_init failed ret = %dn",ret);
  340. }
  341. }
  342. MessageQueue::~MessageQueue()
  343. {
  344. mutex_lock(&mutex);
  345. while(mqueue_head != NULL)
  346. {
  347. struct mqueue_node *del = mqueue_head;
  348. delete del;
  349. mqueue_head = mqueue_head->next;
  350. }
  351. mutex_destroy(&mutex);
  352. cond_destroy(&mqueue_not_empty);
  353. }
  354. void
  355. MessageQueue::QueueMessage(USER_MESSAGE *msg)
  356. {
  357. mutex_lock(&mutex);
  358. if (mqueue_head == NULL)
  359. {
  360. mqueue_head = new mqueue_node;
  361. mqueue_tail = mqueue_head;
  362. }
  363. else
  364. {
  365. mqueue_tail->next = new mqueue_node;
  366. mqueue_tail = mqueue_tail->next;
  367. }
  368. mqueue_tail->msg = *msg;
  369. mqueue_tail->next = NULL;
  370. cond_signal(&mqueue_not_empty);
  371. mutex_unlock(&mutex);
  372. LOG("Message queuedn");
  373. }
  374. int
  375. MessageQueue::DequeueMessage(USER_MESSAGE *msg)
  376. {
  377. mqueue_node *del;
  378. mutex_lock(&mutex);
  379. if (mqueue_head == NULL)
  380. {
  381. cond_wait(&mqueue_not_empty,&mutex);
  382. }
  383. *msg = mqueue_head->msg;
  384. del = mqueue_head;
  385. if (mqueue_tail == mqueue_head)
  386. {
  387. mqueue_head = mqueue_tail = NULL;
  388. }
  389. else
  390. {
  391. mqueue_head = mqueue_head->next;
  392. }
  393. delete del;
  394. mutex_unlock(&mutex);
  395. return 1;
  396. }
  397. int
  398. MessageQueue::Start()
  399. {
  400. LOG("Creating MQ threadn");
  401. int ret = thr_create(NULL, 0, _boot, this,
  402. THR_BOUND,&thread_id);
  403. if (ret != 0)
  404. {
  405. LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",ret);
  406. return(ret);
  407. }
  408. return(0);
  409. }
  410. void
  411. MessageQueue::Stop()
  412. {
  413. USER_MESSAGE msg;
  414. if (thread_id == 0)
  415. {
  416. return;
  417. }
  418. msg.message = MQ_INTERNAL_ID;
  419. msg.wParam = 0xDEAD;
  420. msg.lParam = 0xDEAD;
  421. QueueMessage(&msg);
  422. void *status;
  423. thread_t who_exited;
  424. thr_join(thread_id,&who_exited,&status); 
  425. thread_id = 0;
  426. }
  427. #endif
  428. #if (defined(WIN32))
  429. unsigned __stdcall
  430. #elif defined(VXWORKS)
  431. int
  432. #else
  433. void *
  434. #endif
  435. MessageQueue::_boot(void *context)
  436. {
  437. MessageQueue *mq = (MessageQueue *)context;
  438. LOG("Calling mq->wait_for_messagen");
  439. mq->wait_for_message();
  440. return(0);
  441. }
  442. void
  443. MessageQueue::wait_for_message()
  444. {
  445. USER_MESSAGE  msg;
  446. while(1)
  447. {
  448. DequeueMessage(&msg);
  449. if (msg.message == MQ_INTERNAL_ID && msg.lParam == 0xDEAD &&
  450. msg.wParam == 0xDEAD)
  451. {
  452. #if defined(VXWORKS)
  453. semGive(thread_exited_flag);
  454. #endif
  455. return;
  456. }
  457. NotifyMessage(msg);
  458. }
  459. }