mpeg2t_thread.cpp
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:10k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. /*
  2.  * The contents of this file are subject to the Mozilla Public
  3.  * License Version 1.1 (the "License"); you may not use this file
  4.  * except in compliance with the License. You may obtain a copy of
  5.  * the License at http://www.mozilla.org/MPL/
  6.  * 
  7.  * Software distributed under the License is distributed on an "AS
  8.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  9.  * implied. See the License for the specific language governing
  10.  * rights and limitations under the License.
  11.  * 
  12.  * The Original Code is MPEG4IP.
  13.  * 
  14.  * The Initial Developer of the Original Code is Cisco Systems Inc.
  15.  * Portions created by Cisco Systems Inc. are
  16.  * Copyright (C) Cisco Systems Inc. 2000, 2001.  All Rights Reserved.
  17.  * 
  18.  * Contributor(s): 
  19.  *              Bill May        wmay@cisco.com
  20.  */
  21. /*
  22.  * mpeg2t_thread.c
  23.  */
  24. #include "mpeg2t_private.h"
  25. #include <rtp/rtp.h>
  26. #include <rtp/memory.h>
  27. #include <rtp/net_udp.h>
  28. #include "player_session.h"
  29. #include "player_util.h"
  30. #include "our_config_file.h"
  31. #include "media_utils.h"
  32. #ifdef _WIN32
  33. DEFINE_MESSAGE_MACRO(mpeg2t_message, "mpeg2t")
  34. #else
  35. #define mpeg2t_message(loglevel, fmt...) message(loglevel, "mpeg2t", fmt)
  36. #endif
  37. /*
  38.  * mpeg2t_decode_buffer - this is called from the mpeg2t thread.
  39.  */
  40. static void mpeg2t_decode_buffer (mpeg2t_client_t *info, 
  41.   uint8_t *buffer, 
  42.   int blen)
  43. {
  44.   mpeg2t_pid_t *pidptr;
  45.   uint32_t buflen;
  46.   uint32_t buflen_used;
  47.   buflen = blen;
  48.   do {
  49.     pidptr = mpeg2t_process_buffer(info->decoder,
  50.    buffer, 
  51.    buflen,
  52.    &buflen_used);
  53.     if (pidptr != NULL) {
  54.       // have a return - process it
  55.       switch (pidptr->pak_type) {
  56.       case MPEG2T_PAS_PAK:
  57. break;
  58.       case MPEG2T_PROG_MAP_PAK:
  59. if (info->pam_recvd_sem != NULL) {
  60.   SDL_SemPost(info->pam_recvd_sem);
  61. }
  62. break;
  63.       case MPEG2T_ES_PAK:
  64. break;
  65.       }
  66.     }
  67.     buffer += buflen_used;
  68.     buflen -= buflen_used;
  69.   } while (buflen >= 188);
  70.   if (buflen > 0) {
  71.     mpeg2t_message(LOG_ERR, "decode buffer size is not multiple of 188 - %d %d",
  72.    blen, buflen);
  73.   }
  74. }
  75. static void mpeg2t_rtp_callback (struct rtp *session, rtp_event *e)
  76. {
  77.   if (e->type == RX_RTP) {
  78.     rtp_packet *rpak;
  79.     rpak = (rtp_packet *)e->data;
  80.     mpeg2t_decode_buffer((mpeg2t_client_t *)rtp_get_userdata(session), 
  81.   rpak->rtp_data, 
  82.   rpak->rtp_data_len);
  83.     xfree(rpak);
  84.   }
  85. }
  86. /*
  87.  * mpeg2t_thread_start_cmd()
  88.  * Handle the start command - create the socket
  89.  */
  90. static int mpeg2t_thread_start_cmd (mpeg2t_client_t *info)
  91. {
  92.   int ret;
  93.   mpeg2t_message(LOG_DEBUG, "Processing start command");
  94. #ifdef _WINDOWS
  95.   WORD wVersionRequested;
  96.   WSADATA wsaData;
  97.  
  98.   wVersionRequested = MAKEWORD( 2, 0 );
  99.  
  100.   ret = WSAStartup( wVersionRequested, &wsaData );
  101.   if ( ret != 0 ) {
  102.     /* Tell the user that we couldn't find a usable */
  103.     /* WinSock DLL.*/
  104.     return (ret);
  105.   }
  106. #else
  107.   ret = 0;
  108. #endif
  109.   if (info->useRTP == FALSE) {
  110.     info->udp = udp_init(info->address, info->rx_port,
  111.  info->tx_port, info->ttl);
  112.     if (info->udp == NULL) 
  113.       return -1;
  114.     info->data_socket = udp_fd(info->udp);
  115.   } else {
  116.     info->rtp_session = rtp_init(info->address,
  117.  info->rx_port,
  118.  info->tx_port,
  119.  info->ttl,
  120.  info->rtcp_bw,
  121.  mpeg2t_rtp_callback,
  122.  (uint8_t *)info);
  123.     if (info->rtp_session == NULL) {
  124.       return -1;
  125.     }
  126.     rtp_set_option(info->rtp_session, RTP_OPT_WEAK_VALIDATION, FALSE);
  127.     rtp_set_option(info->rtp_session, RTP_OPT_PROMISC, TRUE);
  128.     info->data_socket = udp_fd(get_rtp_data_socket(info->rtp_session));
  129.     info->rtcp_socket = udp_fd(get_rtp_rtcp_socket(info->rtp_session));
  130.   }
  131.   return (ret);
  132. }
  133. /*
  134.  * mpeg2t_thread() - mpeg2t thread handler - receives and
  135.  * processes all data
  136.  */
  137. int mpeg2t_thread (void *data)
  138. {
  139.   mpeg2t_client_t *info = (mpeg2t_client_t *)data;
  140.   int ret;
  141.   int continue_thread;
  142.   uint8_t buffer[RTP_MAX_PACKET_LEN];
  143.   int buflen;
  144.   
  145.   continue_thread = 0;
  146.   mpeg2t_message(LOG_DEBUG, "thread started");
  147.   mpeg2t_thread_init_thread_info(info);
  148.   while (continue_thread == 0) {
  149.     //    mpeg2t_message(LOG_DEBUG, "thread waiting");
  150.     ret = mpeg2t_thread_wait_for_event(info);
  151.     if (ret <= 0) {
  152.       if (ret < 0) {
  153. //mpeg2t_message(LOG_ERR, "MPEG2T loop error %d errno %d", ret, errno);
  154.       } else {
  155.       }
  156.       continue;
  157.     }
  158.     /*
  159.      * See if the communications socket for IPC has any data
  160.      */
  161. //mpeg2t_message(LOG_DEBUG, "Thread checking control");
  162.     ret = mpeg2t_thread_has_control_message(info);
  163.     
  164.     if (ret) {
  165.       mpeg2t_msg_type_t msg_type;
  166.       int read;
  167.       /*
  168.        * Yes - read the message type.
  169.        */
  170.       read = mpeg2t_thread_get_control_message(info, &msg_type);
  171.       if (read == sizeof(msg_type)) {
  172. // received message
  173. mpeg2t_message(LOG_DEBUG, "Comm socket msg %d", msg_type);
  174. switch (msg_type) {
  175. case MPEG2T_MSG_QUIT:
  176.   continue_thread = 1;
  177.   break;
  178. case MPEG2T_MSG_START:
  179.   ret = mpeg2t_thread_start_cmd(info);
  180.   mpeg2t_thread_ipc_respond(info, ret);
  181.   break;
  182. default:
  183.   //   mpeg2t_message(LOG_ERR, "Unknown message %d received", msg_type);
  184.   break;
  185. }
  186.       }
  187.     }
  188.     if (info->useRTP) {
  189.       if (mpeg2t_thread_has_receive_data(info)) {
  190. rtp_recv_data(info->rtp_session, 0);
  191.       }
  192.       if (mpeg2t_thread_has_rtcp_data(info)) {
  193. buflen = udp_recv(get_rtp_rtcp_socket(info->rtp_session), 
  194.   buffer, RTP_MAX_PACKET_LEN);
  195. rtp_process_ctrl(info->rtp_session, buffer, buflen);
  196.       }
  197.       rtp_send_ctrl(info->rtp_session, 0, NULL);
  198.       rtp_update(info->rtp_session);
  199.     } else {
  200.       if (mpeg2t_thread_has_receive_data(info)) {
  201. if (info->udp != NULL) {
  202.   mpeg2t_message(LOG_DEBUG, "receiving udp data");
  203.   buflen = udp_recv(info->udp, buffer, RTP_MAX_PACKET_LEN);
  204.   mpeg2t_decode_buffer(info, buffer, buflen);
  205. }
  206.       }
  207.     }
  208.   } // end while continue_thread
  209.   SDL_Delay(10);
  210.   /*
  211.    * Okay - we've gotten a quit - we're done
  212.    */
  213.   if (info->useRTP) {
  214.     rtp_send_bye(info->rtp_session);
  215.     rtp_done(info->rtp_session);
  216.     info->rtp_session = NULL;
  217.   } else {
  218.     if (info->udp != NULL) {
  219.       udp_exit(info->udp);
  220.       info->udp = NULL;
  221.     }
  222.   }
  223.   mpeg2t_close_thread(info);
  224. #ifdef _WINDOWS
  225.   WSACleanup();
  226. #endif
  227.   return 0;
  228. }
  229. /*
  230.  * mpeg2t_create_client_for_rtp_tcp
  231.  * create threaded mpeg2t session
  232.  */
  233. mpeg2t_client_t *mpeg2t_create_client (const char *address,
  234.        in_port_t rx_port,
  235.        in_port_t tx_port,
  236.        int use_rtp,
  237.        double rtcp_bw, 
  238.        int ttl,
  239.        char *errmsg,
  240.        uint32_t errmsg_len)
  241. {
  242.   mpeg2t_client_t *info;
  243.   int ret;
  244.   mpeg2t_msg_type_t msg;
  245.   mpeg2t_msg_resp_t resp;
  246. #if 0
  247.   if (func == NULL) {
  248.     mpeg2t_message(LOG_CRIT, "Callback is NULL");
  249.     *err = EINVAL;
  250.     return NULL;
  251.   }
  252. #endif
  253.   info = MALLOC_STRUCTURE(mpeg2t_client_t);
  254.   if (info == NULL) return (NULL);
  255.   memset(info, 0, sizeof(mpeg2t_client_t));
  256.   info->address = strdup(address);
  257.   info->rx_port = rx_port;
  258.   info->tx_port = tx_port;
  259.   info->useRTP = use_rtp;
  260.   info->rtcp_bw = rtcp_bw;
  261.   info->ttl = ttl;
  262.   info->recv_timeout = 100;
  263.   info->decoder = create_mpeg2_transport();
  264.   if (info->decoder == NULL) {
  265.     mpeg2t_delete_client(info);
  266.     return (NULL);
  267.   }
  268.   info->pam_recvd_sem = SDL_CreateSemaphore(0);
  269.   info->msg_mutex = SDL_CreateMutex();
  270.   if (mpeg2t_create_thread(info) != 0) {
  271.     mpeg2t_delete_client(info);
  272.     return (NULL);
  273.   }
  274.   SDL_Delay(100);
  275.   msg = MPEG2T_MSG_START;
  276.   ret = mpeg2t_thread_ipc_send_wait(info,
  277.   (unsigned char *)&msg,
  278.   sizeof(msg),
  279.   &resp);
  280.   if (ret < 0 || resp < 0) {
  281.     mpeg2t_delete_client(info);
  282.     snprintf(errmsg, errmsg_len, "Couldn't create client - error %d", resp);
  283.     return NULL;
  284.   }
  285.   
  286.   int max = config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS);
  287.   do {
  288.     ret = SDL_SemWaitTimeout(info->pam_recvd_sem, 1000);
  289.     if (ret == SDL_MUTEX_TIMEDOUT) {
  290.       max--;
  291.       mpeg2t_message(LOG_DEBUG, "timeout - still left %d", max);
  292.     }
  293.   } while (ret == SDL_MUTEX_TIMEDOUT && max >= 0);
  294.   if (ret == SDL_MUTEX_TIMEDOUT) {
  295.     snprintf(errmsg, errmsg_len, "Did not receive Transport Stream Program Map in %d seconds", config.get_config_value(CONFIG_MPEG2T_PAM_WAIT_SECS));
  296.     mpeg2t_delete_client(info);
  297.     return NULL;
  298.   }
  299.   return (info);
  300. }
  301. void mpeg2t_delete_client (mpeg2t_client_t *info)
  302. {
  303.   mpeg2t_thread_close(info);
  304.   CHECK_AND_FREE(info, address);
  305.   free(info);
  306. }
  307. static void close_mpeg2t_client (void *data)
  308. {
  309.   mpeg2t_delete_client((mpeg2t_client_t *)data);
  310. }
  311. int create_mpeg2t_session (CPlayerSession *psptr,
  312.    const char *orig_name, 
  313.    char *errmsg, 
  314.    uint32_t errlen, 
  315.    //int have_audio_driver,
  316.    control_callback_vft_t *cc_vft)
  317. {
  318.   const char *colon, *slash, *name;
  319.   char *addr, *port;
  320.   uint32_t addrlen, portlen;
  321.   name = orig_name + strlen("mpeg2t://");
  322.   colon = strchr(name, ':');
  323.   slash = strchr(name, '/');
  324.   if (slash == NULL) slash = name + strlen(name);
  325.   if (colon == NULL || slash == NULL || colon > slash) {
  326.     snprintf(errmsg, errlen, "Misformed mpeg2 url %s", orig_name);
  327.     return -1;
  328.   }
  329.   
  330.   addrlen = colon - name;
  331.   portlen = slash - colon - 1;
  332.   addr = (char *)malloc(1 + addrlen);
  333.   port = (char *)malloc(1 + portlen);
  334.   memcpy(addr, name, addrlen);
  335.   addr[addrlen] = '';
  336.   memcpy(port, colon + 1, portlen);
  337.   port[portlen] = '';
  338.   in_port_t rxport;
  339.   char *eport;
  340.   rxport = strtoul(port, &eport, 10);
  341.   if (eport == NULL || *eport != '') {
  342.     snprintf(errmsg, errlen, "Illegal port number in url %s", orig_name);
  343.     free(addr);
  344.     free(port);
  345.    return -1;
  346.   }
  347.   free(port);
  348.   
  349.   mpeg2t_client_t *mp2t;
  350.   mp2t = mpeg2t_create_client(addr, rxport, 0, 0, 0.0, 0, errmsg, errlen);
  351.   free(addr);
  352.   if (mp2t == NULL) {
  353.     return -1;
  354.   }
  355.   psptr->set_media_close_callback(close_mpeg2t_client, mp2t);
  356.   return -1;
  357. }