rtp_bytestream.cpp
上传用户:sun1608
上传日期:2007-02-02
资源大小:6116k
文件大小:22k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

Visual C++

  1. /*
  2.  * The contents of this file are subject to the Mozilla Public
  3.  * License Version 1.1 (the "License"); you may not use this file
  4.  * except in compliance with the License. You may obtain a copy of
  5.  * the License at http://www.mozilla.org/MPL/
  6.  * 
  7.  * Software distributed under the License is distributed on an "AS
  8.  * IS" basis, WITHOUT WARRANTY OF ANY KIND, either express or
  9.  * implied. See the License for the specific language governing
  10.  * rights and limitations under the License.
  11.  * 
  12.  * The Original Code is MPEG4IP.
  13.  * 
  14.  * The Initial Developer of the Original Code is Cisco Systems Inc.
  15.  * Portions created by Cisco Systems Inc. are
  16.  * Copyright (C) Cisco Systems Inc. 2001.  All Rights Reserved.
  17.  * 
  18.  * Contributor(s): 
  19.  *              Bill May        wmay@cisco.com
  20.  */
  21. #include "systems.h"
  22. #include <rtp/rtp.h>
  23. #include <rtp/memory.h>
  24. #include <sdp/sdp.h> // for NTP_TO_UNIX_TIME
  25. #include "rtp_bytestream.h"
  26. #include "our_config_file.h"
  27. //#define DEBUG_RTP_PAKS 1
  28. //#define DEBUG_RTP_BCAST 1
  29. #ifdef _WIN32
  30. DEFINE_MESSAGE_MACRO(rtp_message, "rtpbyst")
  31. #else
  32. #define rtp_message(loglevel, fmt...) message(loglevel, "rtpbyst", fmt)
  33. #endif
  34. /*
  35.  * add_rtp_packet_to_queue() - adds rtp packet to doubly linked lists - 
  36.  * this is used both by the bytestream, and by the player_media when trying
  37.  * to determine which rtp payload type is being used.
  38.  */
  39. int add_rtp_packet_to_queue (rtp_packet *pak, 
  40.      rtp_packet **head,
  41.      rtp_packet **tail)
  42. {
  43.   rtp_packet *q;
  44.   int inserted = TRUE;
  45.   int16_t diff;
  46. #ifdef DEBUG_RTP_PAKS
  47.   rtp_message(LOG_DEBUG, "CBThread %u - m %u pt %u seq %u ts %u len %d", 
  48.       SDL_ThreadID(),
  49.       pak->rtp_pak_m, pak->rtp_pak_pt, pak->rtp_pak_seq, 
  50.       pak->rtp_pak_ts, pak->rtp_data_len);
  51. #endif
  52.   
  53.   if (*head == NULL) {
  54.     *head = *tail = pak;
  55.   } else if (*head == *tail) {
  56.     if (pak->rtp_pak_seq == (*head)->rtp_pak_seq) {
  57.       rtp_message(LOG_ERR, "Duplicate RTP sequence number %d received", 
  58.   pak->rtp_pak_seq);
  59.       inserted = FALSE;
  60.     } else {
  61.       (*head)->rtp_next = pak;
  62.       (*head)->rtp_prev = pak;
  63.       pak->rtp_next = pak->rtp_prev = *head;
  64.       diff = pak->rtp_pak_seq - (*head)->rtp_pak_seq;
  65.       if (diff > 0) {
  66. *tail = pak;
  67.       } else {
  68. *head = pak;
  69.       }
  70.     }
  71.   } else if ((((*head)->rtp_pak_seq < (*tail)->rtp_pak_seq) &&
  72.       ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq) || 
  73.        (pak->rtp_pak_seq < (*head)->rtp_pak_seq))) ||
  74.      (((*head)->rtp_pak_seq > (*tail)->rtp_pak_seq) &&
  75.       ((pak->rtp_pak_seq > (*tail)->rtp_pak_seq && 
  76. pak->rtp_pak_seq < (*head)->rtp_pak_seq)))) {
  77.     // insert between tail and head
  78.     // Maybe move head - probably move tail.
  79.     (*tail)->rtp_next = pak;
  80.     pak->rtp_prev = *tail;
  81.     (*head)->rtp_prev = pak;
  82.     pak->rtp_next = *head;
  83.     diff = (*head)->rtp_pak_seq - pak->rtp_pak_seq;
  84.     if (diff > 0 && diff < 4) {
  85.       // between tail and head, and really close to the head - move the
  86.       // head pointer
  87.       *head = pak;
  88.     } else {
  89.       // otherwise, just insert at end
  90.       *tail = pak;
  91.     }
  92.   } else {
  93.     // insert in middle
  94.     // Loop through until we find where it should fit
  95.     q = *head;
  96.     do {
  97.       // check for duplicates
  98.       if (pak->rtp_pak_seq == q->rtp_pak_seq || pak->rtp_pak_seq == q->rtp_next->rtp_pak_seq) {
  99. // dup seq number
  100. inserted = FALSE;
  101. break;
  102.       }
  103.       /*
  104.        * Okay - this is disgusting, but works.  The first part (before the
  105.        * or) will see if pak->rtp_pak_seq is between q and q->rtp_next, 
  106.        * assuming that the sequence number for q and q->rtp_next are ascending.
  107.        *
  108.        * 2nd part of the if is the converse case (q->rtp_next->rtp_pak_seq 
  109.        * is smaller than q->rtp_pak_seq).  In that case, we need to make 
  110.        * sure that pak->rtp_pak_seq is either larger than q->rtp_pak_seq 
  111.        * or less than q->rtp_next->rtp_pak_seq
  112.        */
  113.       if (((q->rtp_next->rtp_pak_seq > q->rtp_pak_seq) &&
  114.    (q->rtp_pak_seq < pak->rtp_pak_seq && 
  115.     pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq)) ||
  116.   ((q->rtp_next->rtp_pak_seq < q->rtp_pak_seq) &&
  117.    (pak->rtp_pak_seq < q->rtp_next->rtp_pak_seq || 
  118.     pak->rtp_pak_seq > q->rtp_pak_seq))) {
  119. q->rtp_next->rtp_prev = pak;
  120. pak->rtp_next = q->rtp_next;
  121. q->rtp_next = pak;
  122. pak->rtp_prev = q;
  123. break;
  124.       }
  125.       q = q->rtp_next;
  126.     } while (q != *tail);
  127.     if (q == *tail) {
  128.       inserted = FALSE;
  129.       rtp_message(LOG_ERR, "Couldn't insert %u between %u and %u", 
  130.   pak->rtp_pak_seq, 
  131.   (*head)->rtp_pak_seq, 
  132.   (*tail)->rtp_pak_seq);
  133.     }
  134.   }
  135.   if (inserted == FALSE) {
  136.     rtp_message(LOG_ERR, "Couldn't insert pak");
  137.     rtp_message(LOG_DEBUG, "pak seq %u", pak->rtp_pak_seq);
  138.     if (*head != NULL) {
  139.       rtp_message(LOG_DEBUG, "head seq %u, tail seq %u", 
  140.   (*head)->rtp_pak_seq,
  141.   (*tail)->rtp_pak_seq);
  142.     }
  143.     xfree(pak);
  144.     return (0);
  145.   }
  146.   return (1);
  147. }
  148. CRtpByteStreamBase::CRtpByteStreamBase(const char *name,
  149.        format_list_t *fmt,
  150.        unsigned int rtp_pt,
  151.        int ondemand,
  152.        uint64_t tps,
  153.        rtp_packet **head, 
  154.        rtp_packet **tail,
  155.        int rtp_seq_set,
  156.        uint16_t rtp_base_seq,
  157.        int rtp_ts_set,
  158.        uint32_t rtp_base_ts,
  159.        int rtcp_received,
  160.        uint32_t ntp_frac,
  161.        uint32_t ntp_sec,
  162.        uint32_t rtp_ts) :
  163.   COurInByteStream(name)
  164. {
  165.   m_fmt = fmt;
  166.   m_head = *head;
  167.   *head = NULL;
  168.   m_tail = *tail;
  169.   *tail = NULL;
  170.   m_rtp_base_ts_set = rtp_ts_set;
  171.   m_rtp_base_ts = rtp_base_ts;
  172.   m_rtp_base_seq_set = rtp_seq_set;
  173.   m_rtp_base_seq = rtp_base_seq;
  174.   
  175.   m_rtp_pt = rtp_pt;
  176.   uint64_t temp;
  177.   temp = config.get_config_value(CONFIG_RTP_BUFFER_TIME_MSEC);
  178.   if (temp > 0) {
  179.     m_rtp_buffer_time = temp;
  180.   } else {
  181.     m_rtp_buffer_time = (ondemand ? 2 : 5)* M_LLU;
  182.   }
  183.   m_rtptime_tickpersec = tps;
  184.   init();
  185.   m_ts = 0;
  186.   m_total =0;
  187.   m_skip_on_advance_bytes = 0;
  188.   m_stream_ondemand = ondemand;
  189.   m_wallclock_offset_set = 0;
  190.   m_rtp_packet_mutex = SDL_CreateMutex();
  191.   m_buffering = 0;
  192.   m_eof = 0;
  193.   if (rtcp_received) {
  194.     calculate_wallclock_offset_from_rtcp(ntp_frac, ntp_sec, rtp_ts);
  195.   }
  196. }
  197. CRtpByteStreamBase::~CRtpByteStreamBase (void)
  198. {
  199.   flush_rtp_packets();
  200.   if (m_rtp_packet_mutex) {
  201.     SDL_DestroyMutex(m_rtp_packet_mutex);
  202.     m_rtp_packet_mutex = NULL;
  203.   }
  204. }
  205. void CRtpByteStreamBase::init (void)
  206. {
  207.   m_wrap_offset = 0;
  208.   m_offset_in_pak = m_skip_on_advance_bytes;
  209.   m_eof = 0;
  210. }
  211. void CRtpByteStreamBase::set_wallclock_offset (uint64_t wclock, 
  212.        uint32_t rtp_ts) 
  213. {
  214.   m_wallclock_offset_set = 1;
  215.   SDL_LockMutex(m_rtp_packet_mutex);
  216.   m_wallclock_offset = wclock;
  217.   m_wallclock_rtp_ts = rtp_ts;
  218.   m_wallclock_offset_wrap = m_wrap_offset;
  219.   if (((m_ts & 0x80000000) == 0x80000000) &&
  220.       ((rtp_ts & 0x80000000) == 0)) {
  221.     m_wallclock_offset_wrap += (I_LLU << 32);
  222.   }
  223.   SDL_UnlockMutex(m_rtp_packet_mutex);
  224. }
  225. /*
  226.  * calculate_wallclock_offset_from_rtcp
  227.  * Given certain information from an rtcp message, Calculate what the
  228.  * wallclock time for rtp sequence number 0 would be.
  229.  */
  230. void
  231. CRtpByteStreamBase::calculate_wallclock_offset_from_rtcp (uint32_t ntp_frac,
  232.   uint32_t ntp_sec,
  233.   uint32_t rtp_ts)
  234. {
  235.   uint64_t wclock;
  236.   wclock = ntp_frac;
  237.   wclock *= M_LLU;
  238.   wclock /= (I_LLU << 32);
  239.   uint64_t offset;
  240.   offset = ntp_sec;
  241.   offset -= NTP_TO_UNIX_TIME;
  242.   offset *= M_LLU;
  243.   wclock += offset;
  244. #ifdef DEBUG_RTP_BCAST
  245.   rtp_message(LOG_DEBUG, "%s RTCP data - sec %u frac %u value %llu ts %d", 
  246.       m_name, ntp_sec, ntp_frac, wclock, rtp_ts);
  247. #endif
  248.   set_wallclock_offset(wclock, rtp_ts);
  249. }
  250. /*
  251.  * recv_callback - callback for when bytestream is active - basically, 
  252.  * put things on the queue
  253.  */
  254. void CRtpByteStreamBase::recv_callback (struct rtp *session, rtp_event *e)
  255. {
  256.   switch (e->type) {
  257.   case RX_RTP:
  258.     rtp_packet *rpak;
  259.     rpak = (rtp_packet *)e->data;
  260.     if (rpak->rtp_data_len == 0) {
  261.       xfree(rpak);
  262.     } else {
  263.       // need to add lock/unlock of mutex here
  264.       if (SDL_mutexP(m_rtp_packet_mutex) == -1) {
  265. rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv");
  266. return;
  267.       }
  268.       add_rtp_packet_to_queue(rpak, &m_head, &m_tail);
  269.       if (SDL_mutexV(m_rtp_packet_mutex) == -1) {
  270. rtp_message(LOG_CRIT, "SDL Lock mutex failure in rtp bytestream recv");
  271. return;
  272.       }
  273.       m_recvd_pak = 1;
  274.     }
  275.     break;
  276.   case RX_SR:
  277.     rtcp_sr *srpak;
  278.     srpak = (rtcp_sr *)e->data;
  279.     calculate_wallclock_offset_from_rtcp(srpak->ntp_frac, 
  280.  srpak->ntp_sec, 
  281.  srpak->rtp_ts);
  282.     break;
  283.   default:
  284. #if 0
  285.     rtp_message(LOG_DEBUG, "Thread %u - Callback from rtp with %d %p", 
  286. SDL_ThreadID(),e->type, e->rtp_data);
  287. #endif
  288.     break;
  289.     break;
  290.   }
  291. }
  292. void CRtpByteStreamBase::remove_packet_rtp_queue (rtp_packet *pak, 
  293.        int free)
  294. {
  295.   SDL_LockMutex(m_rtp_packet_mutex);
  296.   if ((m_head == pak) &&
  297.       (m_head->rtp_next == NULL || m_head->rtp_next == m_head)) {
  298.     m_head = NULL;
  299.     m_tail = NULL;
  300.   } else {
  301.     pak->rtp_next->rtp_prev = pak->rtp_prev;
  302.     pak->rtp_prev->rtp_next = pak->rtp_next;
  303.     if (m_head == pak) {
  304.       m_head = pak->rtp_next;
  305.     }
  306.     if (m_tail == pak) {
  307.       m_tail = pak->rtp_prev;
  308.     }
  309.   }
  310.   if (pak->rtp_data_len < 0) {
  311.     // restore the packet data length
  312.     pak->rtp_data_len = 0 - pak->rtp_data_len;
  313.   }
  314.   if (free == 1) {
  315.     xfree(pak);
  316.   }
  317.   SDL_UnlockMutex(m_rtp_packet_mutex);
  318. }
  319. void CRtpByteStreamBase::flush_rtp_packets (void)
  320. {
  321.   while (m_head != NULL) {
  322.     remove_packet_rtp_queue(m_head, 1);
  323.   }
  324.   m_buffering = 0;
  325. }
  326. /*
  327.  * recv_task - called from the player media rtp task - make sure
  328.  * we have 2 seconds of buffering, then go...
  329.  */
  330. int CRtpByteStreamBase::recv_task (int decode_thread_waiting)
  331. {
  332.   /*
  333.    * We need to make sure we have some buffering.  We'll buffer
  334.    * about 2 seconds worth, then let the decode task know to go...
  335.    */
  336.   if (m_buffering == 0) {
  337.     uint32_t head_ts, tail_ts;
  338.     if (m_head != NULL) {
  339.       /*
  340.        * Payload type the same.  Make sure we have at least 2 seconds of
  341.        * good data
  342.        */
  343.       if (rtp_ready() == 0) {
  344. rtp_message(LOG_DEBUG, 
  345.     "Determined payload type, but rtp bytestream is not ready");
  346. uint64_t calc;
  347. do {
  348.   head_ts = m_head->rtp_pak_ts;
  349.   tail_ts = m_tail->rtp_pak_ts;
  350.   calc = (tail_ts - head_ts);
  351.   calc *= 1000;
  352.   calc /= m_rtptime_tickpersec;
  353.   if (calc > m_rtp_buffer_time) {
  354.     rtp_packet *temp = m_head;
  355.     m_head = m_head->rtp_next;
  356.     m_tail->rtp_next = m_head;
  357.     m_head->rtp_prev = m_tail;
  358.     xfree((void *)temp);
  359.   }
  360. } while (calc > m_rtp_buffer_time);
  361. return 0;
  362.       }
  363.       if (check_rtp_frame_complete_for_payload_type()) {
  364. head_ts = m_head->rtp_pak_ts;
  365. tail_ts = m_tail->rtp_pak_ts;
  366. if (head_ts > tail_ts &&
  367.     ((head_ts & (1 << 31)) == (tail_ts & (1 << 31)))) {
  368.   return 0;
  369. }
  370. uint64_t calc;
  371. calc = tail_ts;
  372. calc -= head_ts;
  373. calc *= M_LLU;
  374. calc /= m_rtptime_tickpersec;
  375. if (calc > m_rtp_buffer_time) {
  376.   if (m_rtp_base_ts_set == 0) {
  377.     rtp_message(LOG_NOTICE, "Setting rtp seq and time from 1st pak");
  378.     m_rtp_base_ts = m_head->rtp_pak_ts;
  379.     m_rtp_base_ts_set = 1;
  380.     m_rtpinfo_set_from_pak = 1;
  381.   } else {
  382.     m_rtpinfo_set_from_pak = 0;
  383.     if (m_rtp_base_seq_set != 0 &&
  384. m_rtp_base_seq == m_head->rtp_pak_seq &&
  385. m_rtp_base_ts != m_head->rtp_pak_ts) {
  386.       rtp_message(LOG_NOTICE, "%s - rtp ts doesn't match RTPInfo %d", 
  387.   m_name, m_head->rtp_pak_ts);
  388.       m_rtp_base_ts = m_head->rtp_pak_ts;
  389.     }
  390.     //
  391.   }
  392.   m_buffering = 1;
  393. #if 1
  394.   rtp_message(LOG_INFO, 
  395.       "%s buffering complete - seq %d head %u tail %u "LLD, 
  396.       m_name, m_head->rtp_pak_seq,
  397.       head_ts, tail_ts, calc);
  398. #endif
  399.   rtp_done_buffering();
  400.   
  401. }
  402.       }
  403.     }
  404.   } else {
  405.     if (decode_thread_waiting != 0) {
  406.       /*
  407.        * We're good with buffering - but the decode thread might have
  408.        * caught up, and will be waiting.  Post a message to kickstart
  409.        * it
  410.        */
  411.       if (check_rtp_frame_complete_for_payload_type()) {
  412. return (1);
  413.       }
  414.     }
  415.     if (m_recvd_pak == 0) {
  416.       if (m_recvd_pak_timeout == 0) {
  417. m_recvd_pak_timeout_time = get_time_of_day();
  418.       } else {
  419. uint64_t timeout;
  420. timeout = get_time_of_day() - m_recvd_pak_timeout_time;
  421. if (m_stream_ondemand) {
  422.   uint64_t range_end = (uint64_t)(get_max_playtime() * 1000.0);
  423.   if (m_last_realtime + timeout >= range_end) {
  424.     rtp_message(LOG_DEBUG, "%s Timedout at range end", m_name);
  425.     m_eof = 1;
  426.   }
  427. } else {
  428.   // broadcast - perhaps if we time out for a second or 2, we
  429.   // should re-init rtp ?  We definately need to put some timing
  430.   // checks here.
  431.   session_desc_t *sptr = m_fmt->media->parent;
  432.   if (sptr->time_desc != NULL &&
  433.       sptr->time_desc->end_time != 0) {
  434.     time_t this_time;
  435.     this_time = time(NULL);
  436.     if (this_time > sptr->time_desc->end_time && 
  437. timeout >= M_LLU) {
  438.       m_eof = 1;
  439.     }
  440.   }
  441.   
  442. }
  443.       }
  444.       m_recvd_pak_timeout++;
  445.     } else {
  446.       m_recvd_pak = 0;
  447.       m_recvd_pak_timeout = 0;
  448.     }
  449.   }
  450.   return (m_buffering);
  451. }
  452. int CRtpByteStreamBase::check_rtp_frame_complete_for_payload_type (void)
  453. {
  454.   return (m_head && m_tail->rtp_pak_m == 1);
  455. }
  456. uint64_t CRtpByteStreamBase::rtp_ts_to_msec (uint32_t ts,
  457.      uint64_t &wrap_offset)
  458. {
  459.   uint64_t timetick;
  460.   uint64_t adjusted_rtp_ts;
  461.   uint64_t adjusted_wc_rtp_ts;
  462.   if (((m_ts & 0x80000000) == 0x80000000) &&
  463.       ((ts & 0x80000000) == 0)) {
  464.     wrap_offset += (I_LLU << 32);
  465.   }
  466.   if (m_stream_ondemand) {
  467.     adjusted_rtp_ts = wrap_offset;
  468.     adjusted_rtp_ts += ts;
  469.     adjusted_wc_rtp_ts = m_rtp_base_ts;
  470.     if (adjusted_wc_rtp_ts > adjusted_rtp_ts) {
  471.       timetick = adjusted_wc_rtp_ts - adjusted_rtp_ts;
  472.       timetick *= M_LLU;
  473.       timetick /= m_rtptime_tickpersec;
  474.       if (timetick > m_play_start_time) {
  475. timetick = 0;
  476.       } else {
  477. timetick = m_play_start_time - timetick;
  478.       }
  479.     } else {
  480.       timetick = adjusted_rtp_ts - adjusted_wc_rtp_ts;
  481.       timetick *= M_LLU;
  482.       timetick /= m_rtptime_tickpersec;
  483.       timetick += m_play_start_time;
  484.     }
  485.   } else {
  486.     SDL_LockMutex(m_rtp_packet_mutex);
  487.     adjusted_rtp_ts = wrap_offset;
  488.     adjusted_rtp_ts += ts;
  489.     adjusted_wc_rtp_ts = m_wallclock_offset_wrap;
  490.     adjusted_wc_rtp_ts += m_wallclock_rtp_ts;
  491.     SDL_UnlockMutex(m_rtp_packet_mutex);
  492.     if (adjusted_rtp_ts >= adjusted_wc_rtp_ts) {
  493.       timetick = adjusted_rtp_ts - adjusted_wc_rtp_ts;
  494.       timetick *= M_LLU;
  495.       timetick /= m_rtptime_tickpersec;
  496.       timetick += m_wallclock_offset;
  497.     } else {
  498.       timetick = adjusted_wc_rtp_ts - adjusted_rtp_ts;
  499.       timetick *= M_LLU;
  500.       timetick /= m_rtptime_tickpersec;
  501.       timetick = m_wallclock_offset - timetick;
  502.     }
  503. #ifdef DEBUG_RTP_BCAST
  504.     rtp_message(LOG_DEBUG, "%s wcts %llu ts %llu wcntp %llu tp %llu",
  505. m_name, adjusted_wc_rtp_ts, adjusted_rtp_ts, m_wallclock_offset,
  506. timetick);
  507. #endif
  508.   }
  509.   // record time
  510.   m_last_realtime = timetick;
  511.   return (timetick);
  512. }
  513. CRtpByteStream::CRtpByteStream(const char *name,
  514.        format_list_t *fmt,
  515.        unsigned int rtp_pt,
  516.        int ondemand,
  517.        uint64_t tickpersec,
  518.        rtp_packet **head, 
  519.        rtp_packet **tail,
  520.        int rtp_seq_set,
  521.        uint16_t rtp_base_seq,
  522.        int rtp_ts_set,
  523.        uint32_t rtp_base_ts,
  524.        int rtcp_received,
  525.        uint32_t ntp_frac,
  526.        uint32_t ntp_sec,
  527.        uint32_t rtp_ts) :
  528.   CRtpByteStreamBase(name, fmt, rtp_pt, ondemand, tickpersec, head, tail,
  529.      rtp_seq_set, rtp_base_seq, rtp_ts_set, rtp_base_ts, 
  530.      rtcp_received, ntp_frac, ntp_sec, rtp_ts)
  531. {
  532.   m_buffer = (uint8_t *)malloc(4096);
  533.   m_buffer_len_max = 4096;
  534.   m_bytes_used = m_buffer_len = 0;
  535. }
  536. CRtpByteStream::~CRtpByteStream (void)
  537. {
  538.   free(m_buffer);
  539.   m_buffer = NULL;
  540. }
  541. void CRtpByteStream::reset (void)
  542. {
  543.   m_buffer_len = m_bytes_used = 0;
  544.   CRtpByteStreamBase::reset();
  545. }
  546. uint64_t CRtpByteStream::start_next_frame (uint8_t **buffer, 
  547.    uint32_t *buflen,
  548.    void **ud)
  549. {
  550.   uint16_t seq = 0;
  551.   uint32_t ts = 0;
  552.   uint64_t timetick;
  553.   int first = 0;
  554.   int finished = 0;
  555.   rtp_packet *rpak;
  556.   int32_t diff;
  557.   diff = m_buffer_len - m_bytes_used;
  558.   m_doing_add = 0;
  559.   if (diff > 2) {
  560.     // Still bytes in the buffer...
  561.     *buffer = m_buffer + m_bytes_used;
  562.     *buflen = diff;
  563. #ifdef DEBUG_RTP_PAKS
  564.     rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);
  565. #endif
  566.     return (m_last_realtime);
  567.   } else {
  568.     m_buffer_len = 0;
  569.     while (finished == 0) {
  570.       rpak = m_head;
  571.       remove_packet_rtp_queue(rpak, 0);
  572.       
  573.       if (first == 0) {
  574. seq = rpak->rtp_pak_seq + 1;
  575. ts = rpak->rtp_pak_ts;
  576. first = 1;
  577.       } else {
  578. if ((seq != rpak->rtp_pak_seq) ||
  579.     (ts != rpak->rtp_pak_ts)) {
  580.   if (seq != rpak->rtp_pak_seq) {
  581.     rtp_message(LOG_INFO, "%s missing rtp sequence - should be %u is %u", 
  582. m_name, seq, rpak->rtp_pak_seq);
  583.   } else {
  584.     rtp_message(LOG_INFO, "%s timestamp error - seq %u should be %x is %x", 
  585. m_name, seq, ts, rpak->rtp_pak_ts);
  586.   }
  587.   m_buffer_len = 0;
  588.   ts = rpak->rtp_pak_ts;
  589. }
  590. seq = rpak->rtp_pak_seq + 1;
  591.       }
  592.       uint8_t *from;
  593.       uint32_t len;
  594.       from = (uint8_t *)rpak->rtp_data + m_skip_on_advance_bytes;
  595.       len = rpak->rtp_data_len - m_skip_on_advance_bytes;
  596.       if ((m_buffer_len + len) > m_buffer_len_max) {
  597. // realloc
  598. m_buffer_len_max = m_buffer_len + len + 1024;
  599. m_buffer = (uint8_t *)realloc(m_buffer, m_buffer_len_max);
  600.       }
  601.       memcpy(m_buffer + m_buffer_len, 
  602.      from,
  603.      len);
  604.       m_buffer_len += len;
  605.       if (rpak->rtp_pak_m == 1) {
  606. finished = 1;
  607.       }
  608.       xfree(rpak);
  609.     }
  610.     m_bytes_used = 0;
  611.     *buffer = m_buffer + m_bytes_used;
  612.     *buflen = m_buffer_len - m_bytes_used;
  613. #ifdef DEBUG_RTP_PAKS
  614.     rtp_message(LOG_DEBUG, "%s buffer len %d", m_name, m_buffer_len);
  615. #endif
  616.   }
  617.   timetick = rtp_ts_to_msec(ts, m_wrap_offset);
  618.   m_ts = ts;
  619.   
  620.   return (timetick);
  621. }
  622. int CRtpByteStream::skip_next_frame (uint64_t *pts, int *hasSyncFrame,
  623.      uint8_t **buffer, 
  624.      uint32_t *buflen)
  625. {
  626.   uint64_t ts;
  627.   *hasSyncFrame = -1;  // we don't know if we have a sync frame
  628.   m_buffer_len = m_bytes_used = 0;
  629.   if (m_head == NULL) return 0;
  630.   ts = m_head->rtp_pak_ts;
  631.   do {
  632.     remove_packet_rtp_queue(m_head, 1);
  633.   } while (m_head != NULL && m_head->rtp_pak_ts == ts);
  634.   if (m_head == NULL) return 0;
  635.   init();
  636.   m_buffer_len = m_bytes_used = 0;
  637.   ts = start_next_frame(buffer, buflen, NULL);
  638.   *pts = ts;
  639.   return (1);
  640. }
  641. void CRtpByteStream::used_bytes_for_frame (uint32_t bytes)
  642. {
  643.   m_bytes_used += bytes;
  644. #ifdef DEBUG_RTP_PAKS
  645.   rtp_message(LOG_DEBUG, "%s Used %d bytes", m_name, bytes);
  646. #endif
  647. }
  648. int CRtpByteStream::have_no_data (void)
  649. {
  650.   rtp_packet *temp, *first;
  651.   first = temp = m_head;
  652.   if (temp == NULL) return TRUE;
  653.   do {
  654.     if (temp->rtp_pak_m == 1) return FALSE;
  655.     temp = temp->rtp_next;
  656.   } while (temp != NULL && temp != first);
  657.   return TRUE;
  658. }
  659. void CRtpByteStream::flush_rtp_packets (void)
  660. {
  661.   CRtpByteStreamBase::flush_rtp_packets();
  662.   m_bytes_used = m_buffer_len = 0;
  663. }
  664. CAudioRtpByteStream::CAudioRtpByteStream (unsigned int rtp_pt,
  665.   format_list_t *fmt,
  666.   int ondemand,
  667.   uint64_t tps,
  668.   rtp_packet **head, 
  669.   rtp_packet **tail,
  670.   int rtp_seq_set,
  671.   uint16_t rtp_base_seq,
  672.   int rtp_ts_set,
  673.   uint32_t rtp_base_ts,
  674.   int rtcp_received,
  675.   uint32_t ntp_frac,
  676.   uint32_t ntp_sec,
  677.   uint32_t rtp_ts) :
  678.   CRtpByteStream("audio", 
  679.  fmt,
  680.  rtp_pt,
  681.  ondemand,
  682.  tps,
  683.  head, 
  684.  tail,
  685.  rtp_seq_set, rtp_base_seq, 
  686.  rtp_ts_set, rtp_base_ts,
  687.  rtcp_received,
  688.  ntp_frac,
  689.  ntp_sec,
  690.  rtp_ts)
  691. {
  692.   init();
  693.   m_working_pak = NULL;
  694. }
  695. CAudioRtpByteStream::~CAudioRtpByteStream(void)
  696. {
  697. }
  698. int CAudioRtpByteStream::have_no_data (void)
  699. {
  700.   if (m_head == NULL) return TRUE;
  701.   return FALSE;
  702. }
  703. int CAudioRtpByteStream::check_rtp_frame_complete_for_payload_type (void)
  704. {
  705.   return m_head != NULL;
  706. }
  707. void CAudioRtpByteStream::reset (void)
  708. {
  709.   if (m_working_pak != NULL) {
  710.     xfree(m_working_pak);
  711.     m_working_pak = NULL;
  712.   }
  713.   CRtpByteStream::reset();
  714. }
  715. uint64_t CAudioRtpByteStream::start_next_frame (uint8_t **buffer, 
  716. uint32_t *buflen,
  717. void **ud)
  718. {
  719.   uint32_t ts;
  720.   int32_t diff;
  721.   if (m_working_pak != NULL) {
  722.     diff = m_working_pak->rtp_data_len - m_bytes_used;
  723.   } else diff = 0;
  724.   m_doing_add = 0;
  725.   if (diff > 2) {
  726.     // Still bytes in the buffer...
  727.     *buffer = (uint8_t *)m_working_pak->rtp_data + m_bytes_used;
  728.     *buflen = diff;
  729.     ts = m_ts;
  730. #ifdef DEBUG_RTP_PAKS
  731.     rtp_message(LOG_DEBUG, "%s Still left - %d bytes", m_name, *buflen);
  732. #endif
  733.     return (m_last_realtime);
  734.   } else {
  735.     if (m_working_pak) xfree(m_working_pak);
  736.     m_buffer_len = 0;
  737.     m_working_pak = m_head;
  738.     remove_packet_rtp_queue(m_working_pak, 0);
  739.     *buffer = (uint8_t *)m_working_pak->rtp_data;
  740.     *buflen = m_working_pak->rtp_data_len;
  741.     ts = m_working_pak->rtp_pak_ts;
  742. #ifdef DEBUG_RTP_PAKS
  743.     rtp_message(LOG_DEBUG, "%s buffer seq %d ts %x len %d", m_name, 
  744. m_working_pak->rtp_pak_seq, 
  745. m_working_pak->rtp_pak_ts, m_buffer_len);
  746. #endif
  747.   }
  748.   // We're going to have to handle wrap better...
  749.   uint64_t retts = rtp_ts_to_msec(ts, m_wrap_offset);
  750.   m_ts = ts;
  751.   return retts;
  752. }