rtsp_thread_nx.c
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:7k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. #include "rtsp_private.h"
  2. #include "rtsp_thread_nx.h"
  3. #define COMM_SOCKET_THREAD info->m_thread_info->comm_socket_write_to
  4. #define COMM_SOCKET_CALLER info->m_thread_info->comm_socket[1]
  5. /*
  6.  * rtsp_thread_ipc_respond
  7.  * respond with the message given
  8.  */
  9. int rtsp_thread_ipc_respond (rtsp_client_t *info, int msg)
  10. {
  11.   size_t ret;
  12.   rtsp_debug(LOG_DEBUG, "Sending resp to thread %d", msg);
  13.   ret = send(COMM_SOCKET_THREAD, (char *)&msg, sizeof(int), 0);
  14.   return ret == sizeof(int);
  15. }
  16. int rtsp_thread_ipc_receive (rtsp_client_t *info, char *msg, size_t msg_len)
  17. {
  18.   int ret;
  19.   
  20.   ret = recv(COMM_SOCKET_THREAD, msg, msg_len, 0);
  21.   return (ret);
  22. }
  23. void rtsp_thread_init_thread_info (rtsp_client_t *info)
  24. {
  25. #ifndef HAVE_SOCKETPAIR
  26.   struct sockaddr_un our_name, his_name;
  27.   int len;
  28.   int ret;
  29.   COMM_SOCKET_THREAD = socket(AF_UNIX, SOCK_STREAM, 0);
  30.   memset(&our_name, 0, sizeof(our_name));
  31.   our_name.sun_family = AF_UNIX;
  32.   strcpy(our_name.sun_path, info->m_thread_info->socket_name);
  33.   ret = bind(COMM_SOCKET_THREAD, (struct sockaddr *)&our_name, sizeof(our_name));
  34.   listen(COMM_SOCKET_THREAD, 1);
  35.   len = sizeof(his_name);
  36.   COMM_SOCKET_THREAD = accept(COMM_SOCKET_THREAD,
  37.       (struct sockaddr *)&his_name,
  38.       &len);
  39. #else
  40.   COMM_SOCKET_THREAD = info->m_thread_info->comm_socket[0];
  41. #endif
  42.   rtsp_debug(LOG_DEBUG, "rtsp_thread running");
  43. }
  44. int rtsp_thread_wait_for_event (rtsp_client_t *info)
  45. {
  46.   rtsp_thread_info_t *tinfo = info->m_thread_info;
  47.   int ret;
  48.   #ifdef HAVE_POLL
  49. #define SERVER_SOCKET_HAS_DATA ((tinfo->pollit[1].revents & (POLLIN | POLLPRI)) != 0)
  50. #define COMM_SOCKET_HAS_DATA   ((tinfo->pollit[0].revents & (POLLIN | POLLPRI)) != 0)
  51.     tinfo->pollit[0].fd = COMM_SOCKET_THREAD;
  52.     tinfo->pollit[0].events = POLLIN | POLLPRI;
  53.     tinfo->pollit[0].revents = 0;
  54.     tinfo->pollit[1].fd = info->server_socket;
  55.     tinfo->pollit[1].events = POLLIN | POLLPRI;
  56.     tinfo->pollit[1].revents = 0;
  57.     ret = poll(tinfo->pollit,
  58.        info->server_socket == -1 ? 1 : 2,
  59.        info->recv_timeout);
  60.     //rtsp_debug(LOG_DEBUG, "poll ret %d", ret);
  61. #else
  62. #define SERVER_SOCKET_HAS_DATA (FD_ISSET(info->server_socket, &tinfo->read_set))
  63. #define COMM_SOCKET_HAS_DATA   (FD_ISSET(COMM_SOCKET_THREAD, &tinfo->read_set))
  64.     int max_fd;
  65.     struct timeval timeout;
  66.     FD_ZERO(&tinfo->read_set);
  67.     max_fd = COMM_SOCKET_THREAD;
  68.     if (info->server_socket != -1) {
  69.       FD_SET(info->server_socket, &tinfo->read_set);
  70.       max_fd = MAX(info->server_socket, max_fd);
  71.     }
  72.     FD_SET(COMM_SOCKET_THREAD, &tinfo->read_set);
  73.     timeout.tv_sec = info->recv_timeout / 1000;
  74.     timeout.tv_usec = (info->recv_timeout % 1000) * 1000;
  75.     ret = select(max_fd + 1, &tinfo->read_set, NULL, NULL, &timeout);
  76. #endif
  77.     return ret;
  78. }
  79. int rtsp_thread_has_control_message (rtsp_client_t *info)
  80. {
  81.   rtsp_thread_info_t *tinfo = info->m_thread_info;
  82.   return COMM_SOCKET_HAS_DATA;
  83. }
  84. int rtsp_thread_has_receive_data (rtsp_client_t *info)
  85. {
  86.   rtsp_thread_info_t *tinfo = info->m_thread_info;
  87.   return SERVER_SOCKET_HAS_DATA;
  88. }
  89. int rtsp_thread_get_control_message (rtsp_client_t *info,
  90.      rtsp_msg_type_t *msg)
  91. {
  92.   return recv(COMM_SOCKET_THREAD, (char *)msg, sizeof(rtsp_msg_type_t), 0);
  93. }
  94. void rtsp_close_thread (rtsp_client_t *rptr)
  95. {
  96.     uint32_t msg = RTSP_MSG_QUIT;
  97.     rtsp_thread_info_t *info;
  98.     
  99.     rtsp_thread_ipc_send(rptr, (unsigned char *)&msg, sizeof(msg));
  100.     SDL_WaitThread(rptr->thread, NULL);
  101.     info = rptr->m_thread_info;
  102.     
  103.     closesocket(info->comm_socket[0]);
  104.     closesocket(info->comm_socket[1]);
  105.     free(info);
  106.     rptr->m_thread_info = NULL;
  107.     rptr->thread = NULL;
  108. }
  109. /*
  110.  * rtsp_create_thread - create the thread we need, along with the
  111.  * communications socket.
  112.  */
  113. int rtsp_create_thread (rtsp_client_t *info)
  114. {
  115.   rtsp_thread_info_t *tinfo;
  116. #ifndef HAVE_SOCKETPAIR
  117.   int ret;
  118.   struct sockaddr_un addr;
  119. #endif
  120.   tinfo = info->m_thread_info =
  121.     (rtsp_thread_info_t *)malloc(sizeof(rtsp_thread_info_t));
  122.   if (tinfo == NULL) return -1;
  123.   
  124.   tinfo->comm_socket[0] = -1;
  125.   tinfo->comm_socket[1] = -1;
  126.   tinfo->comm_socket_write_to = -1;
  127. #ifdef HAVE_SOCKETPAIR
  128.   if (socketpair(PF_UNIX, SOCK_STREAM, 0, tinfo->comm_socket) < 0) {
  129.     rtsp_debug(LOG_CRIT, "Couldn't create comm sockets - errno %d", errno);
  130.     return -1;
  131.   }
  132. #else
  133.   COMM_SOCKET_THREAD = -1;
  134.   COMM_SOCKET_CALLER = socket(AF_UNIX, SOCK_STREAM, 0);
  135.   snprintf(tinfo->socket_name, sizeof(tinfo->socket_name) - 1, "RTSPCLIENT%p", tinfo);
  136.   unlink(tinfo->socket_name);
  137. #endif
  138.   info->thread = SDL_CreateThread(rtsp_thread, info);
  139.   if (info->thread == NULL) {
  140.     rtsp_debug(LOG_CRIT, "Couldn't create comm thread");
  141.     return -1;
  142.   }
  143. #ifndef HAVE_SOCKETPAIR
  144.   addr.sun_family = AF_UNIX;
  145.   strcpy(addr.sun_path, tinfo->socket_name);
  146.   ret = -1;
  147.   do {
  148.     ret = connect(COMM_SOCKET_CALLER, (struct sockaddr *)&addr, sizeof(addr));
  149.     if (ret == -1)
  150.       SDL_Delay(10);
  151.   } while (ret < 0);
  152. #endif
  153.   return 0;
  154. }
  155. /*
  156.  * rtsp_thread_ipc_send - send message to rtsp thread
  157.  */
  158. int rtsp_thread_ipc_send (rtsp_client_t *info,
  159.   unsigned char *msg,
  160.   int len)
  161. {
  162.   int ret;
  163.   rtsp_debug(LOG_DEBUG, "Sending msg to thread %d - len %d", msg[0], len);
  164.   ret = send(COMM_SOCKET_CALLER, msg, len, 0);
  165.   return ret == len;
  166. }
  167. /*
  168.  * rtsp_thread_ipc_send_wait
  169.  * send a message, and wait for response
  170.  * returns number of bytes we've received.
  171.  */
  172. int rtsp_thread_ipc_send_wait (rtsp_client_t *info,
  173.        unsigned char *msg,
  174.        int msg_len,
  175.        int *return_msg)
  176. {
  177.   int read, ret;
  178. #ifdef HAVE_POLL
  179.   struct pollfd pollit;
  180. #else
  181.   fd_set read_set;
  182.   struct timeval timeout;
  183. #endif
  184.   rtsp_debug(LOG_DEBUG, "Send-wait msg to thread %d - len %d",
  185.      *(rtsp_msg_type_t *)msg, msg_len);
  186.   SDL_LockMutex(info->msg_mutex);
  187.   ret = send(COMM_SOCKET_CALLER, msg, msg_len, 0);
  188.   if (ret != msg_len) {
  189.     SDL_UnlockMutex(info->msg_mutex);
  190.     return -1;
  191.   }
  192. #ifdef HAVE_POLL
  193.   pollit.fd = COMM_SOCKET_CALLER;
  194.   pollit.events = POLLIN | POLLPRI;
  195.   pollit.revents = 0;
  196.   ret = poll(&pollit, 1, 30 * 1000);
  197.   rtsp_debug(LOG_DEBUG, "return comm socket value %x", pollit.revents);
  198. #else
  199.   FD_ZERO(&read_set);
  200.   FD_SET(COMM_SOCKET_CALLER, &read_set);
  201.   timeout.tv_sec = 30;
  202.   timeout.tv_usec = 0;
  203.   ret = select(COMM_SOCKET_CALLER + 1, &read_set, NULL, NULL, &timeout);
  204. #endif
  205.   if (ret <= 0) {
  206.     if (ret < 0) {
  207.       //rtsp_debug(LOG_ERR, "RTSP loop error %d errno %d", ret, errno);
  208.     }
  209.     SDL_UnlockMutex(info->msg_mutex);
  210.     return -1;
  211.   }
  212.   read = recv(COMM_SOCKET_CALLER, return_msg, sizeof(int), 0);
  213.   SDL_UnlockMutex(info->msg_mutex);
  214.   rtsp_debug(LOG_DEBUG, "comm socket got return value of %d", read);
  215.   return (read);
  216. }
  217. void rtsp_thread_close (rtsp_client_t *info)
  218. {
  219.   rtsp_close_socket(info);
  220. #ifndef HAVE_SOCKETPAIR
  221.   closesocket(info->m_thread_info->comm_socket_write_to);
  222.   info->m_thread_info->comm_socket_write_to = -1;
  223.   unlink(info->m_thread_info->socket_name);
  224. #endif
  225. }