rtsp_thread.c
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:17k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. /*
  2.  * The contents of this file are subject to the Mozilla Public
  3.  * License Version 1.1 (the "License"); you may not use this file
  4.  * except in compliance with the License. You may obtain a copy of
  5.  * the License at http://www.mozilla.org/MPL/
  6.  * 
  7.  * Software distributed under the License is distributed on an "AS
  8.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  9.  * implied. See the License for the specific language governing
  10.  * rights and limitations under the License.
  11.  * 
  12.  * The Original Code is MPEG4IP.
  13.  * 
  14.  * The Initial Developer of the Original Code is Cisco Systems Inc.
  15.  * Portions created by Cisco Systems Inc. are
  16.  * Copyright (C) Cisco Systems Inc. 2000, 2001.  All Rights Reserved.
  17.  * 
  18.  * Contributor(s): 
  19.  *              Bill May        wmay@cisco.com
  20.  */
  21. /*
  22.  * rtsp_thread.c
  23.  */
  24. #include "rtsp_private.h"
  25. #include <rtp/rtp.h>
  26. #include <rtp/memory.h>
  27. typedef enum rtsp_rtp_state_t {
  28.   RTP_DATA_UNKNOWN = 0,
  29.   RTP_DATA_START = 1,
  30.   RTP_DATA_CONTINUE = 2,
  31.   RTSP_HEADER_CHECK = 3,
  32.   RTP_HEADER_CHECK = 4,
  33. } rtsp_rtp_state_t;
  34. #ifdef DEBUG_RTP_STATES
  35. static const char *states[] = {
  36.   "DATA UNKNOWN",
  37.   "DATA START",
  38.   "DATA CONTINUE",
  39.   "RTSP HEADER CHECK",
  40.   "RTP HEADER CHECK"
  41. };
  42. #endif
  43. typedef struct rtp_state_t {
  44.   rtp_packet *rtp_ptr;
  45.   rtsp_rtp_state_t state;
  46.   int try_periodic;
  47.   int receiving_rtsp_response;
  48.   unsigned short rtp_len, rtp_len_gotten;
  49.   unsigned char header[3];
  50. } rtp_state_t;
  51. /*
  52.  * rtsp_thread_start_cmd()
  53.  * Handle the start command - create the socket
  54.  */
  55. static int rtsp_thread_start_cmd (rtsp_client_t *info)
  56. {
  57.   int ret;
  58. #ifdef _WINDOWS
  59.   WORD wVersionRequested;
  60.   WSADATA wsaData;
  61.  
  62.   wVersionRequested = MAKEWORD( 2, 0 );
  63.  
  64.   ret = WSAStartup( wVersionRequested, &wsaData );
  65.   if ( ret != 0 ) {
  66.     /* Tell the user that we couldn't find a usable */
  67.     /* WinSock DLL.*/
  68.     return (ret);
  69.   }
  70. #endif
  71.   ret = rtsp_create_socket(info);
  72.   return (ret);
  73. }
  74. /*
  75.  * rtsp_thread_send_and_get()
  76.  * send a command and get the response
  77.  */
  78. static int rtsp_thread_send_and_get (rtsp_client_t *info)
  79. {
  80.   rtsp_msg_send_and_get_t msg;
  81.   int ret;
  82.   ret = rtsp_thread_ipc_receive(info, (char *)&msg, sizeof(msg));
  83.   
  84.   if (ret != sizeof(msg)) {
  85.     rtsp_debug(LOG_DEBUG, "Send and get - recv %d", ret);
  86.     return -1;
  87.   }
  88.   ret = rtsp_send(info, msg.buffer, msg.buflen);
  89.   rtsp_debug(LOG_DEBUG, "Send and get - send %d", ret);
  90.   return ret;
  91. }
  92. static int rtsp_msg_thread_perform_callback (rtsp_client_t *info)
  93. {
  94.   rtsp_msg_callback_t callback;
  95.   int ret;
  96.   uint32_t cbs;
  97.   cbs = sizeof(callback);
  98.   ret = rtsp_thread_ipc_receive(info, (char *)&callback, cbs);
  99.   if (ret != cbs) {
  100.     rtsp_debug(LOG_ERR, "Perform callback msg - recvd %d instead of %u",
  101.        ret, cbs); 
  102.     return -1;
  103.   }
  104.   return ((callback.func)(callback.ud));
  105. }
  106. static int rtsp_msg_thread_set_rtp_callback (rtsp_client_t *info)
  107. {
  108.   rtsp_msg_rtp_callback_t callback;
  109.   int interleave_num;
  110.   int ret;
  111.   uint32_t cbs = sizeof(callback);
  112.   rtsp_debug(LOG_DEBUG, "In rtsp_msg_thread_set_rtp_callback");
  113.   ret = rtsp_thread_ipc_receive(info, (char *)&callback, cbs);
  114.   if (ret != cbs) {
  115.     rtsp_debug(LOG_ERR, "Perform callback msg - recvd %d instead of %u",
  116.        ret, cbs);
  117.     return -1;
  118.   }
  119.   interleave_num = callback.interleave;
  120.   if (interleave_num >= MAX_RTP_THREAD_SESSIONS) {
  121.     return -1;
  122.   }
  123.   if (info->m_callback[interleave_num].rtp_callback_set != 0) {
  124.     return -2;
  125.   }
  126.   info->m_callback[interleave_num].rtp_callback_set = 1;
  127.   info->m_callback[interleave_num].rtp_callback = callback.callback_func;
  128.   info->m_callback[interleave_num].rtp_periodic = callback.periodic_func;
  129.   info->m_callback[interleave_num].rtp_userdata = callback.ud;
  130.   return 0;
  131. }
  132. int rtsp_thread_send_rtcp (rtsp_client_t *info,
  133.    int interleave,
  134.    uint8_t *buffer,
  135.    int buflen)
  136. {
  137.   int ret;
  138.   uint8_t buff[4];
  139.   buff[0] = '$';
  140.   buff[1] = (interleave * 2) + 1;
  141.   buff[2] = (buflen >> 8) & 0xff;
  142.   buff[3] = (buflen & 0xff);
  143.   ret = send(info->server_socket, buff, 4, 0);
  144.   ret += send(info->server_socket, buffer, buflen, 0);
  145.   return ret;
  146. }
  147. static void callback_for_rtp_packet (rtsp_client_t *info,
  148.      unsigned char interleave,
  149.      rtp_packet *rtp_ptr,
  150.      unsigned short rtp_len)
  151. {
  152.   unsigned char  which = interleave;
  153.   interleave /= 2;
  154.   if (info->m_callback[interleave].rtp_callback_set) {
  155.     (info->m_callback[interleave].rtp_callback)(info->m_callback[interleave].rtp_userdata,
  156. which,
  157. rtp_ptr,
  158. rtp_len);
  159.   } else {
  160.     xfree(rtp_ptr);
  161.   }
  162. }
  163. /****************************************************************************
  164.  *
  165.  * Thread RTP data receive state machine
  166.  *
  167.  ****************************************************************************/
  168. static __inline void change_state (rtp_state_t *state, rtsp_rtp_state_t s)
  169. {
  170.   if (state->state != s) {
  171.     state->state = s;
  172.     //rtsp_debug(LOG_DEBUG, "changing state to %d %s", s,states[s]);
  173.   }
  174. }
  175. static void move_end_of_buffer (rtsp_client_t *info, int bytes)
  176. {
  177.   memmove(info->m_resp_buffer,
  178.   &info->m_resp_buffer[info->m_offset_on],
  179.   bytes);
  180.   info->m_offset_on = 0;
  181.   info->m_buffer_len = bytes;
  182. }
  183. static int get_rtp_packet (rtsp_client_t *info, rtp_state_t *state)
  184. {
  185.   int ret;
  186.   if (state->rtp_ptr == NULL) {
  187.     //rtsp_debug(LOG_DEBUG, "PAK %d %d", state->header[0], state->rtp_len);
  188.     state->rtp_ptr =
  189.       (rtp_packet *)xmalloc(state->rtp_len + RTP_PACKET_HEADER_SIZE);
  190.     state->rtp_len_gotten = 0;
  191.   }
  192.   ret = 
  193.     rtsp_recv(info,
  194.       ((char *)state->rtp_ptr) + RTP_PACKET_HEADER_SIZE + state->rtp_len_gotten,
  195.       state->rtp_len - state->rtp_len_gotten);
  196.   //rtsp_debug(LOG_DEBUG, "recv %d", ret);
  197.   state->rtp_len_gotten += ret;
  198.   if (state->rtp_len_gotten == state->rtp_len) {
  199.     callback_for_rtp_packet(info,
  200.     state->header[0],
  201.     state->rtp_ptr,
  202.     state->rtp_len);
  203.     state->rtp_ptr = NULL;
  204.     state->rtp_len = 0;
  205.     state->rtp_len_gotten = 0;
  206.     state->try_periodic = 1;
  207.     change_state(state, RTP_DATA_START);
  208.     return 0;
  209.   } 
  210.   change_state(state, RTP_DATA_CONTINUE);
  211.   return 1;
  212. }
  213. static const char *rtsp_cmp = "rtsp/1.0";
  214. static int rtsp_get_resp (rtsp_client_t *info,
  215.   rtp_state_t *state)
  216. {
  217.   int ret;
  218.   ret = rtsp_get_response(info);
  219.   if (ret == RTSP_RESPONSE_RECV_ERROR) {
  220.     change_state(state, RTP_DATA_UNKNOWN);
  221.   } else {
  222.     change_state(state, RTP_DATA_START);
  223.     if (state->receiving_rtsp_response != 0) {
  224.       /*
  225.        * Are they waiting ?  If so, pop them the return value
  226.        */
  227.       rtsp_thread_ipc_respond(info, ret);
  228.       state->receiving_rtsp_response = 0;
  229.     }
  230.   }
  231.   return 1;
  232. }
  233.   
  234. static int check_rtsp_resp (rtsp_client_t *info,
  235.     rtp_state_t *state)
  236. {
  237.   int len, blen, ret;
  238.   len = strlen(rtsp_cmp);
  239.   blen = rtsp_bytes_in_buffer(info);
  240.   if (len > blen) {
  241.     if (info->m_offset_on != 0) {
  242.       move_end_of_buffer(info, blen);
  243.     }
  244.     ret = rtsp_receive_socket(info,
  245.       info->m_resp_buffer + blen,
  246.       len - blen,
  247.       0,
  248.       0);
  249.     if (ret < 0) return 0;
  250.     info->m_buffer_len += ret;
  251.     blen = rtsp_bytes_in_buffer(info);
  252.   }
  253.   
  254.   if (len <= blen) {
  255.     if (strncasecmp(rtsp_cmp,
  256.     &info->m_resp_buffer[info->m_offset_on],
  257.     len) == 0) {
  258.       //rtsp_debug(LOG_DEBUG, "Found resp");
  259.       return (rtsp_get_resp(info, state));
  260.     }
  261.     info->m_offset_on++;
  262.     change_state(state, RTP_DATA_UNKNOWN);
  263.     return 0;
  264.   }
  265.   return -1;
  266. }
  267. static int check_rtp_header (rtsp_client_t *info,
  268.      rtp_state_t *state)
  269. {
  270.   int ix;
  271.   int blen;
  272.   blen = rtsp_bytes_in_buffer(info);
  273.   if (blen < 3) {
  274.     return -1;
  275.   }
  276.   state->header[0] = info->m_resp_buffer[info->m_offset_on];
  277.   state->header[1] = info->m_resp_buffer[info->m_offset_on + 1];
  278.   state->header[2] = info->m_resp_buffer[info->m_offset_on + 2];
  279.   ix = state->header[0] / 2;
  280.   if ((ix >= MAX_RTP_THREAD_SESSIONS) ||
  281.       !(info->m_callback[ix].rtp_callback_set)) {
  282.     // header failure
  283.     info->m_offset_on++;
  284.     change_state(state, RTP_DATA_UNKNOWN);
  285.     return 0;
  286.   }
  287.   state->rtp_len = (state->header[1] << 8) | state->header[2];
  288.   if (state->rtp_len < 1514) {
  289.     info->m_offset_on += 3; // increment past the header
  290.     return (get_rtp_packet(info, state));
  291.   }
  292.   // length is most likely incorrect
  293.   info->m_offset_on++;
  294.   change_state(state, RTP_DATA_UNKNOWN);
  295.   return 0;
  296. }
  297. static int data_unknown (rtsp_client_t *info,
  298.  rtp_state_t *state)
  299. {
  300.   int blen;
  301.   int ix;
  302.   blen = rtsp_bytes_in_buffer(info);
  303.   if (info->m_offset_on != 0) {
  304.     move_end_of_buffer(info, blen);
  305.   }
  306.   blen = rtsp_receive_socket(info,
  307.      info->m_resp_buffer + info->m_offset_on,
  308.      RECV_BUFF_DEFAULT_LEN - info->m_offset_on,
  309.      0,
  310.      0);
  311.   if (blen < 0) return -1;
  312.   info->m_buffer_len += blen;
  313.   blen = rtsp_bytes_in_buffer(info);
  314.   for (ix = 0; ix < blen; ix++) {
  315.     if (info->m_resp_buffer[info->m_offset_on] == '$') {
  316.       info->m_offset_on++;
  317.       change_state(state, RTP_HEADER_CHECK);
  318.       return 0;
  319.     } else if (tolower(info->m_resp_buffer[info->m_offset_on]) == 'r') {
  320.       change_state(state, RTSP_HEADER_CHECK);
  321.       return 0;
  322.     } else {
  323.       info->m_offset_on++;
  324.     }
  325.   }
  326.       
  327.   return -1;
  328. }
  329. /*
  330.  * rtsp_thread() - rtsp thread handler - receives and
  331.  * processes all data
  332.  */
  333. int rtsp_thread (void *data)
  334. {
  335.   rtsp_client_t *info = (rtsp_client_t *)data;
  336.   int continue_thread;
  337.   int ret;
  338.   unsigned int ix;
  339.   int state_cont;
  340.   int bytes;
  341.   int got_rtp_pak;
  342.   rtp_state_t state;
  343.   
  344.   continue_thread = 0;
  345.   memset(&state, sizeof(state), 0);
  346.   state.rtp_ptr = NULL;
  347.   state.state = RTP_DATA_UNKNOWN;
  348.   rtsp_thread_init_thread_info(info);
  349.   
  350.   while (continue_thread == 0) {
  351. //  rtsp_debug(LOG_DEBUG, "thread waiting");
  352.     ret = rtsp_thread_wait_for_event(info);
  353.     if (ret <= 0) {
  354.       if (ret < 0) {
  355. //rtsp_debug(LOG_ERR, "RTSP loop error %d errno %d", ret, errno);
  356.       } else {
  357. if (info->server_socket != -1) {
  358.   for (ix = 0; ix < MAX_RTP_THREAD_SESSIONS; ix++) {
  359.     if (info->m_callback[ix].rtp_callback_set &&
  360. info->m_callback[ix].rtp_periodic != NULL) {
  361.       (info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata);
  362.     }
  363.   }
  364. }
  365.       }
  366.       continue;
  367.     }
  368.     /*
  369.      * See if the communications socket for IPC has any data
  370.      */
  371. //rtsp_debug(LOG_DEBUG, "Thread checking control");
  372.     ret = rtsp_thread_has_control_message(info);
  373.     
  374.     if (ret) {
  375.       rtsp_msg_type_t msg_type;
  376.       int read;
  377.       /*
  378.        * Yes - read the message type.
  379.        */
  380.       read = rtsp_thread_get_control_message(info, &msg_type);
  381.       if (read == sizeof(msg_type)) {
  382. // received message
  383. //rtsp_debug(LOG_DEBUG, "Comm socket msg %d", msg_type);
  384. switch (msg_type) {
  385. case RTSP_MSG_QUIT:
  386.   continue_thread = 1;
  387.   break;
  388. case RTSP_MSG_START:
  389.   ret = rtsp_thread_start_cmd(info);
  390.   rtsp_thread_ipc_respond(info, ret);
  391.   break;
  392. case RTSP_MSG_SEND_AND_GET:
  393.   ret = rtsp_thread_send_and_get(info);
  394.   if (ret < 0) {
  395.     rtsp_thread_ipc_respond(info, ret);
  396.   } else {
  397.     // indicate we're supposed to receive...
  398.     state.receiving_rtsp_response = 1;
  399.   }
  400.   break;
  401. case RTSP_MSG_PERFORM_CALLBACK:
  402.   ret = rtsp_msg_thread_perform_callback(info);
  403.   rtsp_thread_ipc_respond(info, ret);
  404.   break;
  405. case RTSP_MSG_SET_RTP_CALLBACK:
  406.   ret = rtsp_msg_thread_set_rtp_callback(info);
  407.   rtsp_thread_ipc_respond(info, ret);
  408.   break;
  409. default:
  410. rtsp_debug(LOG_ERR, "Unknown message %d received", msg_type);
  411. }
  412.       }
  413.     }
  414.     /*
  415.      * See if the data socket has any data
  416.      */
  417. //rtsp_debug(LOG_DEBUG, "Thread checking socket");
  418.     if (info->server_socket != -1) {
  419.       ret = rtsp_thread_has_receive_data(info);
  420.       if (ret) {
  421. state_cont = 0;
  422. while (state_cont == 0) {
  423.   got_rtp_pak = 0;
  424.   switch (state.state) {
  425.   case RTP_DATA_UNKNOWN:
  426.     state_cont = data_unknown(info, &state);
  427.     break;
  428.   case RTP_HEADER_CHECK:
  429.     got_rtp_pak = 1;
  430.     state_cont = check_rtp_header(info, &state);
  431.     break;
  432.   case RTSP_HEADER_CHECK:
  433.     state_cont = check_rtsp_resp(info, &state);
  434.     break;
  435.   case RTP_DATA_START:
  436.     /*
  437.      * At the beginning... Either we're getting a $, or getting
  438.      * a RTP packet.
  439.      */
  440.     bytes = rtsp_bytes_in_buffer(info);
  441.     if (bytes < 4) {
  442.       if (bytes != 0 && info->m_offset_on != 0) {
  443. move_end_of_buffer(info, bytes);
  444.       }
  445.       ret = rtsp_receive_socket(info,
  446. info->m_resp_buffer + bytes,
  447. 4 - bytes,
  448. 0,
  449. 0);
  450.       if (ret < 0) {
  451. state_cont = 1;
  452. break;
  453.       }
  454.       bytes += ret;
  455.       info->m_offset_on = 0;
  456.       info->m_buffer_len = bytes;
  457.       if (bytes < 4) {
  458. state_cont  = 1;
  459. break;
  460.       }
  461.     }
  462.     // we either have a $ - indicating RTP, or a R (for RTSP response)
  463.     if (info->m_resp_buffer[info->m_offset_on] == '$') {
  464.       /*
  465.        * read the 3 byte header - 1 byte for interleaved channel,
  466.        * 2 byte length.
  467.        */
  468.       info->m_offset_on++;
  469.       ret = rtsp_recv(info, state.header, 3);
  470.       if (ret != 3) continue;
  471.       state.rtp_len = (state.header[1] << 8) | state.header[2];
  472.       state_cont = get_rtp_packet(info, &state);
  473.       got_rtp_pak = 1;
  474.     } else if (tolower(info->m_resp_buffer[info->m_offset_on]) == 'r') {
  475.       state_cont = rtsp_get_resp(info, &state);
  476.     } else {
  477.       info->m_offset_on++;
  478.       rtsp_debug(LOG_INFO, "Unknown data %d in rtp stream",
  479.   info->m_resp_buffer[info->m_offset_on]);
  480.       change_state(&state, RTP_DATA_UNKNOWN);
  481.     }
  482.     break;
  483.   case RTP_DATA_CONTINUE:
  484.     state_cont = get_rtp_packet(info, &state);
  485.     got_rtp_pak = 1;
  486.     break;
  487.   }
  488.   if (got_rtp_pak == 1 && state.try_periodic != 0) {
  489.     state.try_periodic = 0;
  490.     ix = state.header[0] / 2;
  491.     if (info->m_callback[ix].rtp_callback_set &&
  492. info->m_callback[ix].rtp_periodic != NULL) {
  493.       (info->m_callback[ix].rtp_periodic)(info->m_callback[ix].rtp_userdata);
  494.     }
  495.   }
  496. }
  497.       } // end server_socket has data
  498.     } // end have server socket
  499.   } // end while continue_thread
  500.   SDL_Delay(10);
  501.   /*
  502.    * Okay - we've gotten a quit - we're done
  503.    */
  504.   if (state.rtp_ptr != NULL) {
  505.     xfree(state.rtp_ptr);
  506.   }
  507.   // exiting thread - get rid of the sockets.
  508.   rtsp_thread_close(info);
  509. #ifdef _WINDOWS
  510.   WSACleanup();
  511. #endif
  512.   return 0;
  513. }
  514. int rtsp_thread_perform_callback (rtsp_client_t *info,
  515.   rtsp_thread_callback_f func,
  516.   void *ud)
  517. {
  518.   int ret, callback_ret;
  519.   rtsp_wrap_msg_callback_t callback_body;
  520.   callback_body.msg = RTSP_MSG_PERFORM_CALLBACK;
  521.   callback_body.body.func = func;
  522.   callback_body.body.ud = ud;
  523.   
  524.   ret = rtsp_thread_ipc_send_wait(info,
  525.   (unsigned char *)&callback_body,
  526.   sizeof(callback_body),
  527.   &callback_ret);
  528.   if (ret != sizeof(callback_ret)) {
  529.     return -1;
  530.   }
  531.   return (callback_ret);
  532. }
  533. int rtsp_thread_set_rtp_callback (rtsp_client_t *info,
  534.   rtp_callback_f rtp_callback,
  535.   rtsp_thread_callback_f rtp_periodic,
  536.   int rtp_interleave,
  537.   void *ud)
  538. {
  539.   int ret, callback_ret;
  540.   rtsp_wrap_msg_rtp_callback_t callback_body;
  541.   callback_body.msg = RTSP_MSG_SET_RTP_CALLBACK;
  542.   callback_body.body.callback_func = rtp_callback;
  543.   callback_body.body.periodic_func = rtp_periodic;
  544.   callback_body.body.ud = ud;
  545.   callback_body.body.interleave = rtp_interleave;
  546.   
  547.   ret = rtsp_thread_ipc_send_wait(info,
  548.   (unsigned char *)&callback_body,
  549.   sizeof(callback_body),
  550.   &callback_ret);
  551.   if (ret != sizeof(callback_ret)) {
  552.     return -1;
  553.   }
  554.   return (callback_ret);
  555. }  
  556. /*
  557.  * rtsp_create_client_for_rtp_tcp
  558.  * create threaded rtsp session
  559.  */
  560. rtsp_client_t *rtsp_create_client_for_rtp_tcp (const char *url,
  561.        int *err)
  562. {
  563.   rtsp_client_t *info;
  564.   int ret;
  565.   rtsp_msg_type_t msg;
  566.   rtsp_msg_resp_t resp;
  567. #if 0
  568.   if (func == NULL) {
  569.     rtsp_debug(LOG_CRIT, "Callback is NULL");
  570.     *err = EINVAL;
  571.     return NULL;
  572.   }
  573. #endif
  574.   info = rtsp_create_client_common(url, err);
  575.   if (info == NULL) return (NULL);
  576.   info->msg_mutex = SDL_CreateMutex();
  577.   if (rtsp_create_thread(info) != 0) {
  578.     free_rtsp_client(info);
  579.     return (NULL);
  580.   }
  581.   msg = RTSP_MSG_START;
  582.   ret = rtsp_thread_ipc_send_wait(info,
  583.   (unsigned char *)&msg,
  584.   sizeof(msg),
  585.   &resp);
  586.   if (ret < 0 || resp < 0) {
  587.     free_rtsp_client(info);
  588.     *err = resp;
  589.     return NULL;
  590.   }
  591.   
  592.   return (info);
  593. }