RtspRtpProcessor.cxx
上传用户:sy_wanhua
上传日期:2013-07-25
资源大小:3048k
文件大小:17k
源码类别:

流媒体/Mpeg4/MP4

开发平台:

C/C++

  1. /* ====================================================================
  2.  * The Vovida Software License, Version 1.0 
  3.  * 
  4.  * Copyright (c) 2000 Vovida Networks, Inc.  All rights reserved.
  5.  * 
  6.  * Redistribution and use in source and binary forms, with or without
  7.  * modification, are permitted provided that the following conditions
  8.  * are met:
  9.  * 
  10.  * 1. Redistributions of source code must retain the above copyright
  11.  *    notice, this list of conditions and the following disclaimer.
  12.  * 
  13.  * 2. Redistributions in binary form must reproduce the above copyright
  14.  *    notice, this list of conditions and the following disclaimer in
  15.  *    the documentation and/or other materials provided with the
  16.  *    distribution.
  17.  * 
  18.  * 3. The names "VOCAL", "Vovida Open Communication Application Library",
  19.  *    and "Vovida Open Communication Application Library (VOCAL)" must
  20.  *    not be used to endorse or promote products derived from this
  21.  *    software without prior written permission. For written
  22.  *    permission, please contact vocal@vovida.org.
  23.  *
  24.  * 4. Products derived from this software may not be called "VOCAL", nor
  25.  *    may "VOCAL" appear in their name, without prior written
  26.  *    permission of Vovida Networks, Inc.
  27.  * 
  28.  * THIS SOFTWARE IS PROVIDED "AS IS" AND ANY EXPRESSED OR IMPLIED
  29.  * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
  30.  * OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, TITLE AND
  31.  * NON-INFRINGEMENT ARE DISCLAIMED.  IN NO EVENT SHALL VOVIDA
  32.  * NETWORKS, INC. OR ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT DAMAGES
  33.  * IN EXCESS OF 281421,000, NOR FOR ANY INDIRECT, INCIDENTAL, SPECIAL,
  34.  * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
  35.  * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
  36.  * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
  37.  * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38.  * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
  39.  * USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
  40.  * DAMAGE.
  41.  * 
  42.  * ====================================================================
  43.  * 
  44.  * This software consists of voluntary contributions made by Vovida
  45.  * Networks, Inc. and many individuals on behalf of Vovida Networks,
  46.  * Inc.  For more information on Vovida Networks, Inc., please see
  47.  * <http://www.vovida.org/>.
  48.  *
  49.  */
  50. static const char* const RtspRtpProcessor_cxx_version =
  51.     "$Id: RtspRtpProcessor.cxx,v 1.37 2001/07/26 21:47:01 kle Exp $";
  52. #include "RtspServer.hxx"
  53. #include "RtspRtpProcessor.hxx"
  54. #include "StateMachine.hxx"
  55. #include "OpPlay.hxx"
  56. #include "OpPause.hxx"
  57. #include <iostream>
  58. static const int MIN_HANDLING_LOAD = 10;
  59. // singelton object definition
  60. RtspRtpProcessor* RtspRtpProcessor::myInstance = 0;
  61. RtspRtpProcessor&
  62. RtspRtpProcessor::instance()
  63. {
  64.     if( myInstance == 0 )
  65.     {
  66.         myInstance = new RtspRtpProcessor();
  67.     }
  68.     return *myInstance;
  69. }
  70. RtspRtpProcessor::RtspRtpProcessor()
  71.     : myRtpPortLow( 8000 ),
  72.       myRtpPortHigh( 8001 ),
  73.       myPlayBuffer( 0 ),
  74.       myPlayBufferMax( 0 ),
  75.       myMaxRecordFd( 0 ),
  76.       myPrevSelectResult( 0 ),
  77.       myRecBuffer( 0 ),
  78.       myRecBufferMax( 0 ),
  79.       myShutdown( false )
  80. {
  81. #if defined (_linux_)    
  82.     // register destroy function to delete singelton
  83.     if( atexit( RtspRtpProcessor::destroy ) < 0 )
  84.     {
  85.         cpLog( LOG_ALERT, "Failed to register with atexit()" );
  86.     }
  87. #endif
  88. }
  89. RtspRtpProcessor::~RtspRtpProcessor()
  90. {
  91. }
  92. void
  93. RtspRtpProcessor::destroy()
  94. {
  95.     delete RtspRtpProcessor::myInstance;
  96.     RtspRtpProcessor::myInstance = 0;
  97. }
  98. void*
  99. RtspRtpProcessor::playThreadWrapper( void* p )
  100. {
  101.     return static_cast<RtspRtpProcessor*>(p)->runPlayThread();
  102. }
  103. void*
  104. RtspRtpProcessor::recordThreadWrapper( void* p )
  105. {
  106.     return static_cast<RtspRtpProcessor*>(p)->runRecordThread();
  107. }
  108. void
  109. RtspRtpProcessor::startThread()
  110. {
  111.     myPlayThread.spawn( recordThreadWrapper, this );
  112.     myRecordThread.spawn( playThreadWrapper, this );
  113. }
  114. void
  115. RtspRtpProcessor::stopThread()
  116. {
  117.     cpLog( LOG_DEBUG, "RtspRtpProcessor threads stopping" );
  118.     // stop threads
  119.     myShutdownMutex.lock();
  120.     myShutdown = true;
  121.     myShutdownMutex.unlock();
  122.     // wait until threads complete
  123.     myPlayingFifo.add( 0 );
  124.     myPlayThread.join();
  125.     myRecordThread.join();
  126. }
  127. bool
  128. RtspRtpProcessor::addInitalEvent( Sptr<RtspSession> session )
  129. {
  130.     if( session->sessionMode() == RTSP_SESSION_MODE_PLAY )
  131.     {
  132.         cpLog( LOG_DEBUG, "Adding session %s into playing fifo",
  133.                session->sessionId().getData() );
  134.         // add to play fifo
  135.         session->myFifoTimeBase.now();
  136.         myPlayingFifo.addUntil( session, session->myFifoTimeBase );
  137.     }
  138.     else if( session->sessionMode() == RTSP_SESSION_MODE_REC )
  139.     {
  140.         cpLog( LOG_DEBUG, "Adding session %s into recording list",
  141.                session->sessionId().getData() );
  142.         // add to recording list
  143.         myRecordingListMutex.lock();
  144.         myRecordingList.push_front( session );
  145.         myRecordingListMutex.unlock();
  146.         // add to select map
  147.         int fd = session->rtpFd();
  148.         myFdSetMutex.lock();
  149.         FD_SET( fd, &myBaseRecordFd );
  150.         if( fd > myMaxRecordFd )  
  151.         {
  152.             myMaxRecordFd = fd;
  153.         }
  154.         myFdSetMutex.unlock();
  155.     }
  156.     else
  157.     {
  158.         cpLog( LOG_ERR, "Fail: Adding unknown mode to RtspRtpProcessor" );
  159.         assert( 0 );
  160.     }
  161.     cpLog( LOG_ERR, "Now processing %d play and %d record rtp sessions",
  162.            myPlayingFifo.size(), myRecordingList.size() );
  163.     return true;
  164. }
  165. bool
  166. RtspRtpProcessor::delEvent( Sptr<RtspSession> session )
  167. {
  168.     if( session->sessionMode() == RTSP_SESSION_MODE_PLAY )
  169.     {
  170.         // remove session from playing fifo
  171.         cpLog( LOG_DEBUG, "Removing session %s from playing fifo",
  172.                session->sessionId().getData() );
  173.         vusleep( session->myPacketIntervalMs * 1000 );
  174.     }
  175.     else if( session->sessionMode() == RTSP_SESSION_MODE_REC )
  176.     {
  177.         // remove session from recording list
  178.         cpLog( LOG_DEBUG, "Removing session %s from recording list",
  179.                session->sessionId().getData() );
  180.         myFdSetMutex.lock();
  181.         FD_CLR( session->rtpFd(), &myBaseRecordFd );
  182.         myFdSetMutex.unlock();
  183.         myRecordingListMutex.lock();
  184.         myRecordingList.remove( session );
  185.         myRecordingListMutex.unlock();
  186.     }
  187.     else
  188.     {
  189.         cpLog( LOG_ERR, "Deleting unknown session mode from RtspRtpProcessor" );
  190.         return false;
  191.     }
  192.     cpLog( LOG_DEBUG, "Now processing %d play and %d record rtp sessions",
  193.            myPlayingFifo.size(), myRecordingList.size() );
  194.     return true;
  195. }
  196. bool
  197. RtspRtpProcessor::buildRtpPortList( const int low, const int high )
  198. {
  199.     bool result = true;
  200.     if( low >= high || ( ( high - low ) % 2 != 0 ) )
  201.     {
  202.         cpLog( LOG_ERR, "Rtp Port Range valid: %d-%d, using default values",
  203.                low, high );
  204.         assert( ! ( myRtpPortLow >= myRtpPortHigh ||
  205.                   ( ( myRtpPortHigh- myRtpPortLow ) % 2 != 0 ) ) );
  206.         result = false;
  207.     }
  208.     else
  209.     {
  210.         myRtpPortLow = low;
  211.         myRtpPortHigh = high;
  212.     }
  213.     cpLog( LOG_DEBUG, "Rtp Port Range: %d-%d", myRtpPortLow, myRtpPortHigh );
  214.     for( int i = myRtpPortLow; i < myRtpPortHigh; i += 2 )
  215.     {
  216.         myRtpPortSet.insert( i );
  217.     }
  218.     return result;
  219. }
  220. int
  221. RtspRtpProcessor::allocateRtpPort()
  222. {
  223.     if( myRtpPortSet.empty() )
  224.     {
  225.         return 0;
  226.     }
  227.     int result = *myRtpPortSet.begin();
  228.     myRtpPortSet.erase( myRtpPortSet.begin() );
  229.     cpLog( LOG_DEBUG, "Rtp port %d allocated", result );
  230.     return result;
  231. }
  232. bool
  233. RtspRtpProcessor::deallocateRtpPort( const int port )
  234. {
  235.     if( port >= myRtpPortLow  &&  port <= myRtpPortHigh )
  236.     {
  237.         cpLog( LOG_DEBUG, "Rtp port %d deallocated", port );
  238.         myRtpPortSet.insert( port );
  239.         return true;
  240.     }
  241.     return false;
  242. }
  243. void*
  244. RtspRtpProcessor::runPlayThread()
  245. {
  246.     cpLog( LOG_DEBUG, "RtspRtpProcessor play thread running" );
  247.     Sptr<RtspSession> session;
  248.     Sptr<State> PLAY_STATE =
  249.                 StateMachine::instance().findState( "StatePlaying", 1 );
  250.     myPlayBufferMax = 1024;
  251.     myPlayBuffer = new char[myPlayBufferMax];
  252.     while( 1 )
  253.     {
  254.         session = myPlayingFifo.getNext();
  255.         if( session == 0 )
  256.         {
  257.             // check if stop processing
  258.             myShutdownMutex.lock();
  259.             bool shutdownNow = myShutdown;
  260.             myShutdownMutex.unlock();
  261.             if( shutdownNow )
  262.             {
  263.                 break;
  264.             }
  265.             else
  266.             {
  267.                 assert( 0 );
  268.             }
  269.         }
  270.         if( cpLogGetPriority() >= LOG_DEBUG_HB )
  271.         {
  272.             //cerr<<"play";
  273.         }
  274.         if( session->myState != PLAY_STATE )
  275.         {
  276.             cpLog( LOG_DEBUG,
  277.                    "Server trying to remove session from play fifo" );
  278.             continue;
  279.         } 
  280.         // process rtp packet
  281.         if( !playRtpPacket( session ) )
  282.         {
  283.             cpLog( LOG_DEBUG, "Fail in playRtpPacket" );
  284.             continue;
  285.         }
  286.         session->myCurrentNpt += session->myPacketIntervalMs;
  287.         session->myPacketCounter++;
  288.         // check for pause request
  289.         if( session->myPausePoint != -1  &&
  290.             session->myPausePoint <= session->myCurrentNpt )
  291.         {
  292.             OpPause::processPendingEvent( session->pendingPause() );
  293.             // check for pending play requests for immediate unpause
  294.             if( session->myPendingEvents.messageAvailable() )
  295.             {
  296.                 OpPlay::processPendingEvent(
  297.                         session->myPendingEvents.getNext() );
  298.             }
  299.         }
  300.         // check if done sending
  301.         if( session->myPacketTotal == -1  ||
  302.             session->myPacketCounter < session->myPacketTotal )
  303.         {
  304.             // add next packet event into fifo
  305.             milliseconds_t delay = session->myPacketIntervalMs *
  306.                                    session->myPacketCounter;
  307.             TimeVal fifoTime = session->myFifoTimeBase + delay;
  308.             myPlayingFifo.addUntil( session, fifoTime );
  309.         }
  310.         else
  311.         {
  312.             // This play request is done, update statistics
  313.             if (RtspConfiguration::instance().logStatistics)
  314.             {
  315.                 session->myPerPlayReqData.myBytesSent = 
  316.                     session->myPerPlayReqData.myPktsSent *
  317.                     session->fileHandler()->packetSize();
  318.                 session->myPerPlayReqData.myStreamWaitSent.stop();
  319.                 if (session->myPerPlayReqData.myPktsSent > 0)
  320.                 {
  321.                     session->myStatsMutex.lock();
  322.                     session->myStats.myPlayDataList.push_front(session->myPerPlayReqData);
  323.                     session->myStatsMutex.unlock();
  324.                     session->myPerPlayReqData.myPktsSent = 0;
  325.                 }
  326.             }
  327.             // start next play request
  328.             if( session->myPendingEvents.messageAvailable() )
  329.             {
  330.                 OpPlay::processPendingEvent(
  331.                         session->myPendingEvents.getNext() );
  332.             }
  333.             else
  334.             {
  335.                 cpLog( LOG_DEBUG,
  336.                        "Done processing rtp packets for %s at npt %d",
  337.                        session->sessionId().getData(),
  338.                        session->myCurrentNpt );
  339.                 session->myPausePoint = - 1;
  340.                 session->state( StateMachine::instance().
  341.                                 findState( "StatePausePlay" ) );
  342.             }
  343.         }
  344.     }
  345.     return 0;
  346. }
  347. bool
  348. RtspRtpProcessor::playRtpPacket( Sptr<RtspSession> session )
  349. {
  350.     int cc = session->myFileHandler->read( myPlayBuffer, myPlayBufferMax );
  351.     if( cc > 0 )
  352.     {
  353.         if( !session->myRtpSession->transmitRaw( myPlayBuffer, cc ) )
  354.         {
  355.             cpLog( LOG_DEBUG, "Error in myRtpSession->transmit()", cc );
  356.         }
  357.         else
  358.         {
  359.             if (RtspConfiguration::instance().logStatistics)
  360.             {
  361.                 session->myPerPlayReqData.myPktsSent++;
  362.             }
  363.         }
  364.         return true;
  365.     }
  366.     else if( cc == 0 )
  367.     {
  368.         cpLog( LOG_DEBUG, "End of file reached" );
  369.         // modify values to correct debug printing
  370.         session->myPacketTotal = session->myPacketCounter;
  371.         session->myCurrentNpt -= session->myPacketIntervalMs;
  372.         return true;
  373.     }
  374.     else // cc < 0
  375.     {
  376.         cpLog( LOG_DEBUG, "Error in myFileHandler.read(), %d", cc );
  377.         return false;
  378.     }
  379. }
  380. bool
  381. RtspRtpProcessor::playThreadLoaded() 
  382. {
  383.     if( (int) myPlayingFifo.size() > MIN_HANDLING_LOAD )
  384.     {
  385.         if( myPlayingFifo.sizePending() == 0 )
  386.         {
  387.             return true;
  388.         }
  389.     }
  390.     return false;
  391. }
  392. void*
  393. RtspRtpProcessor::runRecordThread()
  394. {
  395.     cpLog( LOG_DEBUG, "RtspRtpProcessor record thread running" );
  396.     Sptr<State> RECORD_STATE =
  397.                 StateMachine::instance().findState( "StateRecording", 1 );
  398.     myRecBufferMax = 1024;
  399.     myRecBuffer = new char[myRecBufferMax];
  400.     myMaxRecordFd = 0;
  401.     FD_ZERO( &myBaseRecordFd );
  402.     struct timeval recTimeout;
  403.     int selectResult;
  404.     myPrevSelectResult = 0;
  405.     while( 1 )
  406.     {
  407.         // setup select parameters
  408.         myFdSetMutex.lock();
  409.         fd_set recordFd = myBaseRecordFd;
  410.         myFdSetMutex.unlock();
  411.         recTimeout.tv_sec = 3;
  412.         recTimeout.tv_usec = 0;
  413.         selectResult = select( myMaxRecordFd+1, &recordFd, 0, 0, &recTimeout );
  414.         if( cpLogGetPriority() >= LOG_DEBUG_HB )
  415.         {
  416.             //cerr<<"rec";
  417.         }
  418.         if( selectResult < 0 )
  419.         {
  420.             //TODO handle select errors better
  421.             //TODO pass EINT as continue;
  422.             cpLog( LOG_DEBUG, "Select() error in runRecordThread" );
  423.         }
  424.         else if( selectResult == 0 )
  425.         {
  426.             // check if stop processing
  427.             myShutdownMutex.lock();
  428.             bool shutdownNow = myShutdown;
  429.             myShutdownMutex.unlock();
  430.             if( shutdownNow )
  431.             {
  432.                 break;
  433.             }
  434.             else
  435.             {
  436.                 continue;
  437.             }
  438.         }
  439.         myPrevSelectResult = selectResult;
  440.         // process some sessions
  441.         myRecordingListMutex.lock();
  442.         for( RecordingList::iterator itr = myRecordingList.begin();
  443.              itr != myRecordingList.end(); itr++ )
  444.         {
  445.             if( FD_ISSET( (*itr)->rtpFd(), &recordFd ) )
  446.             {
  447.                 Sptr<RtspSession> session = (*itr);
  448.                 if( session->myState == RECORD_STATE )
  449.                 {
  450.                     // process rtp packet
  451.                     recordRtpPacket( session );
  452.                     // check for pause request
  453.                     if( session->myPausePoint != -1  &&
  454.                         session->myPausePoint <= session->myCurrentNpt )
  455.                     {
  456.                         OpPause::processPendingEvent( session->pendingPause() );
  457.                     }
  458.                 }
  459.                 else
  460.                 {
  461.                     cpLog( LOG_DEBUG,
  462.                       "Server trying to remove session from recording list" );
  463.                 }
  464.             }
  465.         }
  466.         myRecordingListMutex.unlock();
  467.     }
  468.     delete []myRecBuffer;
  469.     myRecBuffer = 0;
  470.     return 0;
  471. }
  472. void
  473. RtspRtpProcessor::recordRtpPacket ( Sptr<RtspSession> session )
  474. {
  475.     myRtpPacket = session->myRtpSession->receive();
  476.     if( RtspConfiguration::instance().logStatistics && !myRtpPacket )
  477.     {
  478.         if( !session->myPerRecordReqData.myFirstPktsFlag && myRtpPacket )
  479.         {
  480.             session->myPerRecordReqData.myFirstPktsFlag = true;
  481.             session->myPerRecordReqData.myPreStreamWaitRecvd.stop();
  482.             session->myPerRecordReqData.myStreamWaitRecvd.start();
  483.         }
  484.     }
  485.     while( myRtpPacket )
  486.     {
  487.         session->myFileHandler->write( myRtpPacket->getPayloadLoc(),
  488.                                        myRtpPacket->getPayloadUsage(),
  489.                                        myRtpPacket->getSequence(),
  490.                                        myRtpPacket->getRtpTime() );
  491.         //TODO check result of write call
  492.         //TODO check record bytes haven't exceeded max
  493.         // max = RtspConfiguration::instance().maxRecordingFileBytes
  494.         delete myRtpPacket;
  495.         session->myCurrentNpt += session->myPacketIntervalMs;
  496.         session->myPacketCounter++;
  497.         if (RtspConfiguration::instance().logStatistics)
  498.         {
  499.             session->myPerRecordReqData.myPktsRecvd++;
  500.         }
  501.         myRtpPacket = session->myRtpSession->receive();
  502.     }
  503. }
  504. bool
  505. RtspRtpProcessor::recordThreadLoaded() 
  506. {
  507.     int recSize = myRecordingList.size();
  508.     if( recSize > MIN_HANDLING_LOAD )
  509.     {
  510.         if( myPrevSelectResult == recSize )
  511.         {
  512.             return true;
  513.         }
  514.     }
  515.     return false;
  516. }
  517. /* Local Variables: */
  518. /* c-file-style: "stroustrup" */
  519. /* indent-tabs-mode: nil */
  520. /* c-file-offsets: ((access-label . -) (inclass . ++)) */
  521. /* c-basic-offset: 4 */
  522. /* End: */