mq.cpp
资源名称:h323.zip [点击查看]
上传用户:hnnddl
上传日期:2007-01-06
资源大小:3580k
文件大小:10k
源码类别:
IP电话/视频会议
开发平台:
WINDOWS
- /*
- * $Revision: 1.8 $
- * $Date: 1998/06/08 16:57:49 $
- */
- ////////////////////////////////////////////////////////////////
- // Copyright (c) 1996-98 Lucent Technologies //
- // All Rights Reserved //
- // //
- // THIS IS UNPUBLISHED //
- // PROPRIETARY SOURCE //
- // CODE OF Lucent Technologies //
- // AND elemedia //
- // //
- ////////////////////////////////////////////////////////////////
- //
- ////////////////////////////////////////////////////////////////
- // Example programs are provided solely to demonstrate one //
- // possible use of the stack libraries and are included for //
- // instructional purposes only. You are free to use, modify //
- // and/or redistribute any portion of code in the example //
- // programs. However, such examples are not intended to //
- // represent production quality code. //
- // //
- // THE COPYRIGHT HOLDERS PROVIDE THESE EXAMPLE PROGRAMS //
- // "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER EXPRESSED //
- // OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED //
- // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A //
- // PARTICULAR PURPOSE. //
- ////////////////////////////////////////////////////////////////
- #include "exchange.h"
- #include "mq.h"
- #if (defined(WIN32))
- MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
- {
- thread_id = 0;
- thread_handle = INVALID_HANDLE_VALUE;
- }
- MessageQueue::~MessageQueue()
- {
- }
- void
- MessageQueue::QueueMessage(USER_MESSAGE *msg)
- {
- USER_MESSAGE *m = new USER_MESSAGE;
- *m = *msg;
- //LOG("MessageQueue::QueueMessage posting message for thread %dn",
- // thread_id);
- if (PostThreadMessage(thread_id, USER_MESSAGE_ID,
- 0, (unsigned int)m) == FALSE)
- {
- LOG("PostThreadMessage failed, error = %dn",GetLastError());
- }
- return;
- }
- int
- MessageQueue::DequeueMessage(USER_MESSAGE *msg)
- {
- MSG __msg;
- while(::GetMessageA(&__msg, NULL, 0, 0))
- {
- if (__msg.message == USER_MESSAGE_ID)
- {
- *msg = *(USER_MESSAGE *)__msg.lParam;
- delete (USER_MESSAGE *)__msg.lParam;
- return 1;
- }
- printf("unexpected message, msg,message = %dn", __msg.message);
- }
- return(0);
- }
- int
- MessageQueue::Start()
- {
- LOG("Creating MQ threadn");
- thread_handle = (HANDLE)_beginthreadex(
- NULL,
- 0,
- _boot,
- this,
- 0,
- (unsigned *)&thread_id);
- if (thread_handle == 0)
- {
- DWORD err = GetLastError();
- LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",err);
- return(err);
- }
- // Wait for thread_id to become valid.
- Sleep(0);
- return(0);
- }
- void
- MessageQueue::Stop()
- {
- USER_MESSAGE msg;
- if (thread_id == 0)
- {
- return;
- }
- msg.message = MQ_INTERNAL_ID;
- msg.wParam = 0xDEAD;
- msg.lParam = 0xDEAD;
- QueueMessage(&msg);
- WaitForSingleObject(thread_handle, INFINITE);
- CloseHandle(thread_handle);
- thread_handle = INVALID_HANDLE_VALUE;
- thread_id = 0;
- }
- #elif defined(VXWORKS)
// Queue depth used when the constructor is given a non-positive mqlen.
#define MAX_MSGS (4)
// Size of one queue entry: the queue carries pointers to USER_MESSAGE.
#define MAX_MSG_LEN (sizeof(USER_MESSAGE *))
- MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
- {
- q_len = mqlen > 0 ? mqlen : MAX_MSGS;
- q_name[0] = ' ';
- if (name != NULL)
- {
- strncpy(q_name, name, 80);
- q_name[79] = ' ';
- }
- mqueue_id = NULL;
- thread_id = ERROR;
- thread_exited_flag = NULL;
- mqueue_id = msgQCreate(q_len,MAX_MSG_LEN,MSG_Q_PRIORITY);
- }
- MessageQueue::~MessageQueue()
- {
- USER_MESSAGE *m;
- // Cleanup all messages in the queue.
- while (msgQReceive(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),
- NO_WAIT) != ERROR)
- {
- delete m;
- }
- msgQDelete(mqueue_id);
- }
- void
- MessageQueue::QueueMessage(USER_MESSAGE *msg)
- {
- USER_MESSAGE *m = new USER_MESSAGE;
- *m = *msg;
- if (msgQSend(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),WAIT_FOREVER,
- MSG_PRI_NORMAL) == ERROR)
- {
- LOG("msgQsend failedn");
- }
- else
- {
- LOG("Message queuedn");
- }
- }
- int
- MessageQueue::DequeueMessage(USER_MESSAGE *msg)
- {
- USER_MESSAGE *m;
- if (msgQReceive(mqueue_id,(char*)&m,sizeof(USER_MESSAGE*),
- WAIT_FOREVER) == ERROR)
- {
- LOG("msgQReceive failedn");
- return 0;
- }
- else
- {
- *msg = *m;
- delete m;
- return 1;
- }
- }
- int
- MessageQueue::Start()
- {
- LOG("Creating MQ threadn");
- thread_exited_flag = semBCreate(SEM_Q_PRIORITY, SEM_EMPTY);
- if (thread_exited_flag == NULL)
- {
- LOG("Failed to created thread_exited_flag semaphoren");
- return errno;
- }
- thread_id = taskSpawn(q_name,
- 90,
- 0,
- 8096,
- (FUNCPTR)_boot,
- (int)this,
- 0,0,0,0,0,0,0,0,0);
- if (thread_id == ERROR)
- {
- LOG("taskSpawn failed, errno = 0x%xn",errno);
- return errno;
- }
- return(0);
- }
- void
- MessageQueue::Stop()
- {
- USER_MESSAGE msg;
- if (thread_id == 0)
- {
- return;
- }
- msg.message = MQ_INTERNAL_ID;
- msg.wParam = 0xDEAD;
- msg.lParam = 0xDEAD;
- QueueMessage(&msg);
- LOG("Waiting for mq thread to exit..n");
- if ((taskIdVerify(thread_id) != ERROR) && thread_exited_flag != NULL)
- {
- semTake(thread_exited_flag, WAIT_FOREVER);
- semDelete(thread_exited_flag);
- LOG("MQ thread exitedn");
- }
- else
- {
- LOG("Oops.. such a thread does not exist..n");
- }
- thread_id = 0;
- }
- #elif defined(__hpux)
- MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
- {
- int ret;
- thread_id = cma_c_null;
- mqueue_head = mqueue_tail = NULL;
- ret = pthread_mutex_init( &mutex,
- pthread_mutexattr_default);
- if (ret != 0)
- {
- printf("mutex_init failed ret = %dn",ret);
- }
- ret = pthread_cond_init(&mqueue_not_empty,pthread_condattr_default);
- if (ret != 0)
- {
- printf("cond_init failed ret = %dn",ret);
- }
- }
- MessageQueue::~MessageQueue()
- {
- pthread_mutex_lock(&mutex);
- while(mqueue_head != NULL)
- {
- struct mqueue_node *del = mqueue_head;
- delete del;
- mqueue_head = mqueue_head->next;
- }
- pthread_mutex_destroy(&mutex);
- pthread_cond_destroy(&mqueue_not_empty);
- }
- void
- MessageQueue::QueueMessage(USER_MESSAGE *msg)
- {
- pthread_mutex_lock(&mutex);
- if (mqueue_head == NULL)
- {
- mqueue_head = new mqueue_node;
- mqueue_tail = mqueue_head;
- }
- else
- {
- mqueue_tail->next = new mqueue_node;
- mqueue_tail = mqueue_tail->next;
- }
- mqueue_tail->msg = *msg;
- mqueue_tail->next = NULL;
- pthread_cond_signal(&mqueue_not_empty);
- pthread_mutex_unlock(&mutex);
- LOG("Message queuedn");
- }
- int
- MessageQueue::DequeueMessage(USER_MESSAGE *msg)
- {
- mqueue_node *del;
- pthread_mutex_lock(&mutex);
- if (mqueue_head == NULL)
- {
- pthread_cond_wait(&mqueue_not_empty,&mutex);
- }
- *msg = mqueue_head->msg;
- del = mqueue_head;
- if (mqueue_tail == mqueue_head)
- {
- mqueue_head = mqueue_tail = NULL;
- }
- else
- {
- mqueue_head = mqueue_head->next;
- }
- delete del;
- pthread_mutex_unlock(&mutex);
- return 1;
- }
- int
- MessageQueue::Start()
- {
- LOG("Creating MQ threadn");
- int ret = pthread_create( &thread_id,
- pthread_attr_default,
- _boot,
- this);
- //printf("pthread_create returned &thread_id of %xn",&thread_id);
- if (ret != 0)
- {
- LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",ret);
- return(ret);
- }
- return(0);
- }
- void
- MessageQueue::Stop()
- {
- USER_MESSAGE msg;
- if (pthread_equal(thread_id,cma_c_null))
- {
- return;
- }
- msg.message = MQ_INTERNAL_ID;
- msg.wParam = 0xDEAD;
- msg.lParam = 0xDEAD;
- QueueMessage(&msg);
- pthread_addr_t status;
- pthread_join(thread_id,&status);
- pthread_detach(&thread_id); // pthread_detach required on hpux
- //printf("pthread_detach with &thread_id=%xn",&thread_id);
- thread_id = cma_c_null;
- }
- #else
- MessageQueue::MessageQueue(char *name, int mqlen, unsigned long param)
- {
- int ret;
- thread_id = 0;
- mqueue_head = mqueue_tail = NULL;
- ret = mutex_init(&mutex, USYNC_THREAD, NULL);
- if (ret != 0)
- {
- printf("mutex_init failed ret = %dn",ret);
- }
- ret = cond_init(&mqueue_not_empty,USYNC_THREAD,NULL);
- if (ret != 0)
- {
- printf("cond_init failed ret = %dn",ret);
- }
- }
- MessageQueue::~MessageQueue()
- {
- mutex_lock(&mutex);
- while(mqueue_head != NULL)
- {
- struct mqueue_node *del = mqueue_head;
- delete del;
- mqueue_head = mqueue_head->next;
- }
- mutex_destroy(&mutex);
- cond_destroy(&mqueue_not_empty);
- }
- void
- MessageQueue::QueueMessage(USER_MESSAGE *msg)
- {
- mutex_lock(&mutex);
- if (mqueue_head == NULL)
- {
- mqueue_head = new mqueue_node;
- mqueue_tail = mqueue_head;
- }
- else
- {
- mqueue_tail->next = new mqueue_node;
- mqueue_tail = mqueue_tail->next;
- }
- mqueue_tail->msg = *msg;
- mqueue_tail->next = NULL;
- cond_signal(&mqueue_not_empty);
- mutex_unlock(&mutex);
- LOG("Message queuedn");
- }
- int
- MessageQueue::DequeueMessage(USER_MESSAGE *msg)
- {
- mqueue_node *del;
- mutex_lock(&mutex);
- if (mqueue_head == NULL)
- {
- cond_wait(&mqueue_not_empty,&mutex);
- }
- *msg = mqueue_head->msg;
- del = mqueue_head;
- if (mqueue_tail == mqueue_head)
- {
- mqueue_head = mqueue_tail = NULL;
- }
- else
- {
- mqueue_head = mqueue_head->next;
- }
- delete del;
- mutex_unlock(&mutex);
- return 1;
- }
- int
- MessageQueue::Start()
- {
- LOG("Creating MQ threadn");
- int ret = thr_create(NULL, 0, _boot, this,
- THR_BOUND,&thread_id);
- if (ret != 0)
- {
- LOG("ERROR: Failed to create the MQ thread, error = 0x%lxn",ret);
- return(ret);
- }
- return(0);
- }
- void
- MessageQueue::Stop()
- {
- USER_MESSAGE msg;
- if (thread_id == 0)
- {
- return;
- }
- msg.message = MQ_INTERNAL_ID;
- msg.wParam = 0xDEAD;
- msg.lParam = 0xDEAD;
- QueueMessage(&msg);
- void *status;
- thread_t who_exited;
- thr_join(thread_id,&who_exited,&status);
- thread_id = 0;
- }
- #endif
- #if (defined(WIN32))
- unsigned __stdcall
- #elif defined(VXWORKS)
- int
- #else
- void *
- #endif
- MessageQueue::_boot(void *context)
- {
- MessageQueue *mq = (MessageQueue *)context;
- LOG("Calling mq->wait_for_messagen");
- mq->wait_for_message();
- return(0);
- }
- void
- MessageQueue::wait_for_message()
- {
- USER_MESSAGE msg;
- while(1)
- {
- DequeueMessage(&msg);
- if (msg.message == MQ_INTERNAL_ID && msg.lParam == 0xDEAD &&
- msg.wParam == 0xDEAD)
- {
- #if defined(VXWORKS)
- semGive(thread_exited_flag);
- #endif
- return;
- }
- NotifyMessage(msg);
- }
- }