video_thread.cpp
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:11k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. #include "stdafx.h"
  6. #include "video_thread.h"
  7. #include "protocol_thread.h"
  8. #include "../../pusher/src/packet_header.h"
  9. #include "text_thread.h"
  10. #include "exceptions.h"
  11. // mpegator
  12. #include <mtruid.h>
  13. #include <mtrif.h>
  14. #include "mpegator.h"
  15. // -------------------- broadcast_target -----------------------------
  16. broadcast_target::broadcast_target( int _push_serv_id, 
  17.     const Sockaddr &control, 
  18.     const Sockaddr &broadcast ) 
  19.   : push_serv_id( _push_serv_id ), control_addr( control ), 
  20.     broadcast_addr( broadcast ), control_sock( -1 ), broadcast_sock( -1 ),
  21.     sequence( 0 )
  22. {
  23. }
  24. broadcast_target::~broadcast_target()
  25. {
  26.   if( control_sock > 0 )
  27.     close( control_sock );
  28.   if( broadcast_sock > 0 )
  29.     close( broadcast_sock );
  30. }
  31. void broadcast_target::connect()
  32. {
  33.   if( broadcast_sock == -1 )
  34.     {
  35.       broadcast_sock = socket (PF_INET, SOCK_STREAM, 0);
  36.       if( broadcast_sock < 0 )
  37. {
  38.   broadcast_sock = -1;
  39.   protocolThread->log
  40.     ( 0, "throw NetworkException, line %d", __LINE__ );
  41.   throw NetworkException( this, "socket: %s", sys_errlist[ errno ] );
  42. }
  43.       // now connect
  44.       if( 0 != ::connect( broadcast_sock, 
  45.   ( struct sockaddr * )&broadcast_addr.addr,
  46.   sizeof( broadcast_addr.addr )))
  47.   broadcast_sock = -1;
  48.   protocolThread->log
  49.     ( 0, "throw NetworkException, line %d", __LINE__ );
  50.   throw NetworkException( this, "connect: %s", sys_errlist[ errno ] );
  51. }
  52.     }
  53.   if( control_sock == -1 )
  54.     {
  55.       control_sock = socket (PF_INET, SOCK_DGRAM, 0);
  56.       if( control_sock < 0 )
  57. {
  58.   control_sock = -1;
  59.   protocolThread->log
  60.     ( 0, "throw NetworkException, line %d", __LINE__ );
  61.   throw NetworkException( this, "socket: %s", sys_errlist[ errno ] );
  62. }
  63.       // now connect
  64.       if( 0 != ::connect( control_sock, 
  65.   ( struct sockaddr * )&control_addr.addr,
  66.   sizeof( control_addr.addr )))
  67.   control_sock = -1;
  68.   protocolThread->log
  69.     ( 0, "throw NetworkException, line %d", __LINE__ );
  70.   throw NetworkException( this, "connect: %s", sys_errlist[ errno ] );
  71. }
  72.     }
  73.   protocolThread->log( 1, "target connected to the push server" );
  74.   protocolThread->report_push_serv_ok( push_serv_id );
  75. }
  76. /** Send to TCP stream the ready-to-resend by UDP "packet". Text may be 
  77.     incorporated as well. Defragment to not exceed maxPacketSize.
  78.     Of course text is attached only to the first packet.
  79.     raise exception if failed */
  80. void broadcast_target::send( const char *data, unsigned size, 
  81.      const std::string &text, 
  82.      const Timeval &now )
  83. {
  84.   PacketHeader ph;
  85.   bool text_is_sent = false;
  86.   
  87.   while( size > 0 )
  88.     {
  89.       unsigned plen = maxPacketSize;
  90.       unsigned size_to_send = size;
  91.       
  92.       if( !text_is_sent )
  93. {
  94.   if( plen - sizeof( PacketHeader )  > text.length() + size_to_send )
  95.     plen = size + sizeof( PacketHeader ) + text.length();
  96.   else 
  97.     if( plen < sizeof( PacketHeader ) + text.length() + size_to_send )
  98.       size_to_send = plen - sizeof( PacketHeader ) - text.length();
  99. }
  100.       else
  101. {
  102.   if( plen - sizeof( PacketHeader )  > size_to_send )
  103.     plen = size + sizeof( PacketHeader );
  104.   else 
  105.     if( plen < sizeof( PacketHeader ) + size_to_send )
  106.       size_to_send = plen - sizeof( PacketHeader );
  107. }
  108.       strcpy( ph.type, mpegT );
  109.       ph.when = now;
  110.       *ph.filename = '';
  111.       ph.packetSize = plen;
  112.       if( text.length() > 0 && !text_is_sent )
  113. {
  114.   ph.textOffset = sizeof( PacketHeader );
  115.   ph.textSize = text.length();
  116.   // do not copy text - send it by TCP after header
  117.   ph.dataOffset = sizeof( PacketHeader ) + ph.textSize;
  118. }
  119.       else
  120. {
  121.   ph.textOffset = 0;
  122.   ph.textSize = 0;
  123.   ph.dataOffset = sizeof( PacketHeader );
  124. }
  125.       ph.dataSize = size_to_send;
  126.       ph.sequence = sequence++;
  127.       // do not copy data to save time - it's TCP, so first send packet
  128.       // header and text, next data
  129.       // send header
  130.       if( SOCKET_ERROR == ::send( broadcast_sock, ( const char * )&ph, 
  131.   sizeof( PacketHeader ), 0 ))
  132. {
  133.   protocolThread->log
  134.     ( 0, "throw NetworkException, line %d", __LINE__ );
  135.   throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
  136. }
  137.       // send text, if any
  138.       if( ph.textSize > 0 )
  139. if( SOCKET_ERROR == ::send( broadcast_sock, 
  140.     text.c_str(), ph.textSize, 0 ))
  141.   {
  142.     protocolThread->log
  143.       ( 0, "throw NetworkException, line %d", __LINE__ );
  144.     throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
  145.   }
  146.       text_is_sent = true;
  147.       // data
  148.       if( SOCKET_ERROR == ::send( broadcast_sock, data, size_to_send, 0 ))
  149. {
  150.   protocolThread->log
  151.     ( 0, "throw NetworkException, line %d", __LINE__ );
  152.   throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
  153. }
  154.       // it may be not the last loop: 
  155.       size -= size_to_send;
  156.       data += size_to_send;
  157.     }
  158. }
  159. std::string broadcast_target::describe_itself() const
  160. {
  161.   return std::string( "broadcast target, addr " ) + 
  162.     broadcast_addr.get_description() +
  163.     ( broadcast_sock == -1? " not connected": " connected" );
  164. }
  165. // ------------------- video_thread ----------------------------------
  166. video_thread::video_thread( int argc, char **argv )
  167.   : shutdown_now( false ), shutdown_performed( false ),
  168.     shutdown_mutex( "lock shutdown procedure" ), 
  169.     add_drop_target_mutex( "lock adding/dropping broadcast targets" ),
  170.     lock_new_text( "lock access to the new text" )
  171. {
  172.   target_reconnect_time.now();
  173. }
  174. video_thread *video_thread::create_instance( int argc, char **argv )
  175. {
  176.   // default. Add more servers here
  177.   return new mpegator( argc, argv );
  178. }
  179. void video_thread::video_thread_entry_func( void *_instance )
  180. {
  181.   video_thread *instance = (video_thread*) _instance;
  182.   try
  183.     {
  184.       instance->init();
  185.       instance->start_capture();  // in simple case capture always
  186.     }
  187.   catch ( VideoException e )
  188.     {
  189.       // that means game over
  190.       protocolThread->log(1,"video exception: %s",e.getText().c_str());
  191.       protocolThread->shutdown_requested();
  192.       instance->shutdown();
  193.       return;
  194.     }
  195.   // loop
  196.   while( true )
  197.     {
  198.       try
  199. {
  200.   instance->loop();       // until some exception
  201. }
  202.       catch ( ShutdownException e )
  203. {
  204.   // that means game over
  205.   protocolThread->log(1,"shutdown exception: %s",e.getText().c_str());
  206.   protocolThread->shutdown_requested();
  207.   instance->shutdown();
  208.   break;
  209. }
  210.       catch ( VideoException e )
  211. {
  212.   // that means game over
  213.   protocolThread->log(1,"video exception: %s",e.getText().c_str());
  214.   protocolThread->shutdown_requested();
  215.   instance->shutdown();
  216.   break;
  217. }
  218.       catch ( NetworkException e )
  219. {
  220.   protocolThread->log(1,"Network exception: %s",e.getText().c_str());
  221.   broadcast_target *target = (broadcast_target *)e.get_pointer();
  222.   // remove this target from broadcast list and put back into 
  223.   // waiting queue
  224.   instance->reschedule_broadcast_target( target );
  225. }
  226.     }
  227. }
  228. video_thread::~video_thread()
  229. {
  230. }
  231. void video_thread::loop()
  232. {
  233.   while( true )
  234.     {
  235.       if( shutdown_now )
  236. {
  237.   protocolThread->log
  238.     ( 0, "throw ShutdownException, line %d", __LINE__ );
  239.   throw ShutdownException( "shutdown requested" );
  240. }
  241.       // try to load data from video board
  242.       check_for_new_data();
  243.       // protocol thread may supply some new targets to add
  244.       check_for_add_drop_targets();
  245.     }
  246. }
  247. void video_thread::check_for_add_drop_targets()
  248. {
  249.   wait_for_mutex wm( add_drop_target_mutex, 3 );
  250.   broadcast_target *target = NULL;
  251.   if( target_reconnect_time.reached() )
  252.     {
  253.       try {
  254. while( add_target_q.size() > 0 )
  255.   {
  256.     target = add_target_q.front();
  257.     // we keep the pointer, so we may pop right now
  258.     add_target_q.pop();
  259.     // makes sure that no duplicates:
  260.     broadcast_targetsT::iterator it;
  261.     for( it = broadcast_targets.begin(); it!=broadcast_targets.end(); 
  262.  it++ )
  263.       {
  264. if( (*it)->compare( *target ) )
  265.   {
  266.     // duplicate - do nothing
  267.     break;
  268.   }
  269. else if( target->get_push_serv_id() == 
  270.  (*it)->get_push_serv_id() )
  271.   {
  272.     // id is the same - addr is different
  273.     // maybe address did change? reconnect...
  274.     // in static topology that's error, so log it
  275.     protocolThread->log( 0, "strange change of address, "
  276.  "line %d", __LINE__ );
  277.     delete (*it);
  278.     // connect and set to null - we don't need it anymore
  279.     // exception may be rased here: connect
  280.     target->connect();
  281.     (*it) = target; // replace old if connected
  282.     target = NULL;
  283.     break;
  284.   }
  285.       }
  286.     // check if previous loop finds duplicates:
  287.     if( target == NULL )
  288.       continue;
  289.     // exception may be rased here: connect
  290.     target->connect();
  291.     broadcast_targets.push_back( target );
  292.     protocolThread->log( 1, "connected to push server %s", 
  293.  target->describe_itself().c_str() );
  294.   }
  295.       } catch( NetworkException e ) {
  296. // log this
  297. protocolThread->log( 1, "connecting to push server: %s", 
  298.      e.getText().c_str() );
  299. // pop and put to the queue end (to reconnect later)
  300. add_target_q.push( target );
  301. // and set reconnect timeout - just 5 seconds
  302. target_reconnect_time.now();
  303. target_reconnect_time.tv_sec() += 5;
  304.       }
  305.     }
  306.   else {
  307. #   if defined( _DEBUG ) && 0
  308.     protocolThread->log( 3, "target reconnect time not reached yet" );
  309.     Timeval now;
  310.     now.now();
  311.     protocolThread->log( 3, "trigger time is %s, now %s",
  312.  target_reconnect_time.get_description().c_str(), 
  313.  now.get_description().c_str() );
  314. #   endif
  315.   }
  316.   // now process dropping queue
  317.   while( drop_target_q.size() > 0 )
  318.     {
  319.       Sockaddr &addr = drop_target_q.front();
  320.       // find this address in the list of targets and drop it
  321.       broadcast_targetsT::iterator it;
  322.       for( it = broadcast_targets.begin(); it != broadcast_targets.end(); it++)
  323. {
  324.   if( (*it)->compare_addr( addr ) )
  325.     {
  326.       // stop that
  327.       delete (*it);
  328.       it = broadcast_targets.erase( it ); // erase and take next
  329.     }
  330. }
  331.       drop_target_q.pop();  // don't need it
  332.     }
  333. }
  334. void video_thread::reschedule_broadcast_target( broadcast_target *target )
  335. {
  336.   broadcast_targetsT::iterator it;
  337.   for( it = broadcast_targets.begin(); it != broadcast_targets.end(); it++ )
  338.     {
  339.       if( (*it) == target )
  340. {
  341.   it = broadcast_targets.erase( it );
  342.   // and put it back into wait queue 
  343.   add_target_q.push( target );
  344.   return;
  345. }
  346.     }
  347.   // if we are here this a very bad error!
  348.   protocolThread->log( 0, "VERY bad error at line %d", __LINE__ );
  349. }
  350. // ------------ external --------------------------------------
  351. void video_thread::shutdown_requested()
  352. {
  353.   shutdown_now = true;
  354. }
  355. void video_thread::add_broadcast_target( int pushserv_id, 
  356.  const Sockaddr &c_addr,
  357.  const Sockaddr &b_addr )
  358. {
  359.   wait_for_mutex wm( add_drop_target_mutex, 3 );
  360.   // do not connect now!
  361.   add_target_q.push( new broadcast_target( pushserv_id, c_addr, b_addr ));
  362. }
  363. void video_thread::new_text_data( text_data *data )
  364. {
  365.   wait_for_mutex wm( lock_new_text, 1 );
  366.   new_text.push( data );
  367. }