mpeg2t_thread_nx.cpp
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:8k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. #include "mpeg2t_private.h"
  2. #include "mpeg2t_thread_nx.h"
  3. #include "player_util.h"
  4. #undef HAVE_SOCKETPAIR
  5. #define COMM_SOCKET_THREAD info->m_thread_info->comm_socket_write_to
  6. #define COMM_SOCKET_CALLER info->m_thread_info->comm_socket[1]
  7. #ifdef _WIN32
  8. DEFINE_MESSAGE_MACRO(mpeg2t_message, "mpeg2t")
  9. #else
  10. #define mpeg2t_message(loglevel, fmt...) message(loglevel, "mpeg2t", fmt)
  11. #endif
  12. /*
  13.  * mpeg2t_thread_ipc_respond
  14.  * respond with the message given
  15.  */
  16. int mpeg2t_thread_ipc_respond (mpeg2t_client_t *info, int msg)
  17. {
  18.   size_t ret;
  19.   mpeg2t_message(LOG_DEBUG, "Sending resp to thread %d", msg);
  20.   ret = send(COMM_SOCKET_THREAD, (char *)&msg, sizeof(int), 0);
  21.   return ret == sizeof(int);
  22. }
  23. int mpeg2t_thread_ipc_receive (mpeg2t_client_t *info, char *msg, size_t msg_len)
  24. {
  25.   int ret;
  26.   
  27.   ret = recv(COMM_SOCKET_THREAD, msg, msg_len, 0);
  28.   return (ret);
  29. }
  30. void mpeg2t_thread_init_thread_info (mpeg2t_client_t *info)
  31. {
  32. #ifndef HAVE_SOCKETPAIR
  33.   struct sockaddr_un our_name, his_name;
  34.   socklen_t len;
  35.   int ret;
  36.   info->m_thread_info->comm_socket[0] = socket(AF_UNIX, SOCK_STREAM, 0);
  37.   memset(&our_name, 0, sizeof(our_name));
  38.   our_name.sun_family = AF_UNIX;
  39.   strcpy(our_name.sun_path, info->m_thread_info->socket_name);
  40.   ret = bind(info->m_thread_info->comm_socket[0], 
  41.      (struct sockaddr *)&our_name, sizeof(our_name));
  42.   if (listen(info->m_thread_info->comm_socket[0], 1) < 0) {
  43.     mpeg2t_message(LOG_ERR, "Listen failure %d", errno);
  44.   }
  45.   len = sizeof(his_name);
  46.   COMM_SOCKET_THREAD = accept(info->m_thread_info->comm_socket[0],
  47.       (struct sockaddr *)&his_name,
  48.       &len);
  49.   mpeg2t_message(LOG_DEBUG, "return from accept %d", COMM_SOCKET_THREAD);
  50. #else
  51.   COMM_SOCKET_THREAD = info->m_thread_info->comm_socket[0];
  52. #endif
  53.   mpeg2t_message(LOG_DEBUG, "mpeg2t_thread running");
  54. }
  55. int mpeg2t_thread_wait_for_event (mpeg2t_client_t *info)
  56. {
  57.   mpeg2t_thread_info_t *tinfo = info->m_thread_info;
  58.   int count;
  59.   int ret;
  60.   #ifdef HAVE_POLL
  61. #define DATA_SOCKET_HAS_DATA ((tinfo->pollit[1].revents & (POLLIN | POLLPRI)) != 0)
  62. #define RTCP_SOCKET_HAS_DATA ((tinfo->pollit[2].revents & (POLLIN | POLLPRI)) != 0)
  63. #define COMM_SOCKET_HAS_DATA   ((tinfo->pollit[0].revents & (POLLIN | POLLPRI)) != 0)
  64.     tinfo->pollit[0].fd = COMM_SOCKET_THREAD;
  65.     tinfo->pollit[0].events = POLLIN | POLLPRI;
  66.     tinfo->pollit[0].revents = 0;
  67.     tinfo->pollit[1].fd = info->data_socket;
  68.     tinfo->pollit[1].events = POLLIN | POLLPRI;
  69.     tinfo->pollit[1].revents = 0;
  70.     tinfo->pollit[2].fd = info->rtcp_socket;
  71.     tinfo->pollit[2].events = POLLIN | POLLPRI;
  72.     tinfo->pollit[2].revents = 0;
  73.     count = info->data_socket == 0 ? 1 : info->useRTP ? 3 : 2;
  74.     mpeg2t_message(LOG_DEBUG, "start poll");
  75.     ret = poll(tinfo->pollit,count, info->recv_timeout);
  76.     mpeg2t_message(LOG_DEBUG, "poll ret %d - count %d", ret, count);
  77. #else
  78. #define DATA_SOCKET_HAS_DATA (FD_ISSET(info->data_socket, &tinfo->read_set))
  79. #define RTCP_SOCKET_HAS_DATA (FD_ISSET(info->rtcp_socket, &tinfo->read_set))
  80. #define COMM_SOCKET_HAS_DATA   (FD_ISSET(COMM_SOCKET_THREAD, &tinfo->read_set))
  81.     int max_fd;
  82.     struct timeval timeout;
  83.     FD_ZERO(&tinfo->read_set);
  84.     max_fd = COMM_SOCKET_THREAD;
  85.     if (info->data_socket > 0) {
  86.       FD_SET(info->data_socket, &tinfo->read_set);
  87.       max_fd = MAX(info->data_socket, max_fd);
  88.       if (info->useRTP && info->rtcp_socket > 0) {
  89. FD_SET(info->rtcp_socket, &tinfo->read_set);
  90. max_fd = MAX(info->rtcp_socket, max_fd);
  91.     }
  92.     FD_SET(COMM_SOCKET_THREAD, &tinfo->read_set);
  93.     timeout.tv_sec = info->recv_timeout / 1000;
  94.     timeout.tv_usec = (info->recv_timeout % 1000) * 1000;
  95.     ret = select(max_fd + 1, &tinfo->read_set, NULL, NULL, &timeout);
  96. #endif
  97.     return ret;
  98. }
  99. int mpeg2t_thread_has_control_message (mpeg2t_client_t *info)
  100. {
  101.   mpeg2t_thread_info_t *tinfo = info->m_thread_info;
  102.   return COMM_SOCKET_HAS_DATA;
  103. }
  104. int mpeg2t_thread_has_receive_data (mpeg2t_client_t *info)
  105. {
  106.   mpeg2t_thread_info_t *tinfo = info->m_thread_info;
  107.   return DATA_SOCKET_HAS_DATA;
  108. }
  109. int mpeg2t_thread_has_rtcp_data (mpeg2t_client_t *info)
  110. {
  111.   mpeg2t_thread_info_t *tinfo = info->m_thread_info;
  112.   return RTCP_SOCKET_HAS_DATA;
  113. }
  114. int mpeg2t_thread_get_control_message (mpeg2t_client_t *info,
  115.      mpeg2t_msg_type_t *msg)
  116. {
  117.   return recv(COMM_SOCKET_THREAD, (char *)msg, sizeof(mpeg2t_msg_type_t), 0);
  118. }
  119. void mpeg2t_thread_close (mpeg2t_client_t *rptr)
  120. {
  121.     uint32_t msg = MPEG2T_MSG_QUIT;
  122.     mpeg2t_thread_info_t *info;
  123.     
  124.     mpeg2t_thread_ipc_send(rptr, (unsigned char *)&msg, sizeof(msg));
  125.     SDL_WaitThread(rptr->thread, NULL);
  126.     info = rptr->m_thread_info;
  127.     
  128.     closesocket(info->comm_socket[0]);
  129.     closesocket(info->comm_socket[1]);
  130.     free(info);
  131.     rptr->m_thread_info = NULL;
  132.     rptr->thread = NULL;
  133. }
  134. /*
  135.  * mpeg2t_create_thread - create the thread we need, along with the
  136.  * communications socket.
  137.  */
  138. int mpeg2t_create_thread (mpeg2t_client_t *info)
  139. {
  140.   mpeg2t_thread_info_t *tinfo;
  141. #ifndef HAVE_SOCKETPAIR
  142.   int ret;
  143.   struct sockaddr_un addr;
  144. #endif
  145.   tinfo = info->m_thread_info =
  146.     (mpeg2t_thread_info_t *)malloc(sizeof(mpeg2t_thread_info_t));
  147.   if (tinfo == NULL) return -1;
  148.   
  149.   tinfo->comm_socket[0] = -1;
  150.   tinfo->comm_socket[1] = -1;
  151.   tinfo->comm_socket_write_to = -1;
  152. #ifdef HAVE_SOCKETPAIR
  153.   if (socketpair(PF_UNIX, SOCK_STREAM, 0, tinfo->comm_socket) < 0) {
  154.     mpeg2t_message(LOG_CRIT, "Couldn't create comm sockets - errno %d", errno);
  155.     return -1;
  156.   }
  157.   mpeg2t_message(LOG_DEBUG, "values are %d %d", tinfo->comm_socket[0],
  158.  tinfo->comm_socket[1]);
  159. #else
  160.   COMM_SOCKET_THREAD = -1;
  161.   COMM_SOCKET_CALLER = socket(AF_UNIX, SOCK_STREAM, 0);
  162.   snprintf(tinfo->socket_name, sizeof(tinfo->socket_name) - 1, "MPEG2TCLIENT%p", tinfo);
  163.   unlink(tinfo->socket_name);
  164. #endif
  165.   info->thread = SDL_CreateThread(mpeg2t_thread, info);
  166.   if (info->thread == NULL) {
  167.     mpeg2t_message(LOG_CRIT, "Couldn't create comm thread");
  168.     return -1;
  169.   }
  170. #ifndef HAVE_SOCKETPAIR
  171.   addr.sun_family = AF_UNIX;
  172.   strcpy(addr.sun_path, tinfo->socket_name);
  173.   ret = -1;
  174.   do {
  175.     ret = connect(COMM_SOCKET_CALLER, (struct sockaddr *)&addr, sizeof(addr));
  176.     if (ret == -1)
  177.       SDL_Delay(10);
  178.   } while (ret < 0);
  179. #endif
  180.   return 0;
  181. }
  182. /*
  183.  * mpeg2t_thread_ipc_send - send message to mpeg2t thread
  184.  */
  185. int mpeg2t_thread_ipc_send (mpeg2t_client_t *info,
  186.   unsigned char *msg,
  187.   int len)
  188. {
  189.   int ret;
  190.   mpeg2t_message(LOG_DEBUG, "Sending msg to thread %d - len %d", msg[0], len);
  191.   ret = send(COMM_SOCKET_CALLER, msg, len, 0);
  192.   return ret == len;
  193. }
  194. /*
  195.  * mpeg2t_thread_ipc_send_wait
  196.  * send a message, and wait for response
  197.  * returns number of bytes we've received.
  198.  */
  199. int mpeg2t_thread_ipc_send_wait (mpeg2t_client_t *info,
  200.        unsigned char *msg,
  201.        int msg_len,
  202.        int *return_msg)
  203. {
  204.   int read, ret;
  205. #ifdef HAVE_POLL
  206.   struct pollfd pollit;
  207. #else
  208.   fd_set read_set;
  209.   struct timeval timeout;
  210. #endif
  211.   mpeg2t_message(LOG_DEBUG, "Send-wait msg to thread %d - len %d",
  212.      *(mpeg2t_msg_type_t *)msg, msg_len);
  213.   SDL_LockMutex(info->msg_mutex);
  214.   ret = send(COMM_SOCKET_CALLER, msg, msg_len, 0);
  215.   if (ret != msg_len) {
  216.     SDL_UnlockMutex(info->msg_mutex);
  217.     return -1;
  218.   }
  219. #ifdef HAVE_POLL
  220.   pollit.fd = COMM_SOCKET_CALLER;
  221.   pollit.events = POLLIN | POLLPRI;
  222.   pollit.revents = 0;
  223.   ret = poll(&pollit, 1, 30 * 1000);
  224.   mpeg2t_message(LOG_DEBUG, "return comm socket value %x", pollit.revents);
  225. #else
  226.   FD_ZERO(&read_set);
  227.   FD_SET(COMM_SOCKET_CALLER, &read_set);
  228.   timeout.tv_sec = 30;
  229.   timeout.tv_usec = 0;
  230.   ret = select(COMM_SOCKET_CALLER + 1, &read_set, NULL, NULL, &timeout);
  231. #endif
  232.   if (ret <= 0) {
  233.     if (ret < 0) {
  234.       //mpeg2t_message(LOG_ERR, "MPEG2T loop error %d errno %d", ret, errno);
  235.     }
  236.     SDL_UnlockMutex(info->msg_mutex);
  237.     return -1;
  238.   }
  239.   read = recv(COMM_SOCKET_CALLER, return_msg, sizeof(int), 0);
  240.   SDL_UnlockMutex(info->msg_mutex);
  241.   mpeg2t_message(LOG_DEBUG, "comm socket got return value of %d", read);
  242.   return (read);
  243. }
  244. void mpeg2t_close_thread (mpeg2t_client_t *info)
  245. {
  246. #ifndef HAVE_SOCKETPAIR
  247.   closesocket(info->m_thread_info->comm_socket_write_to);
  248.   info->m_thread_info->comm_socket_write_to = -1;
  249.   unlink(info->m_thread_info->socket_name);
  250. #endif
  251. }