video_thread.cpp
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:11k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
- #include "stdafx.h"
- #include "video_thread.h"
- #include "protocol_thread.h"
- #include "../../pusher/src/packet_header.h"
- #include "text_thread.h"
- #include "exceptions.h"
- // mpegator
- #include <mtruid.h>
- #include <mtrif.h>
- #include "mpegator.h"
- // -------------------- broadcast_target -----------------------------
- broadcast_target::broadcast_target( int _push_serv_id,
- const Sockaddr &control,
- const Sockaddr &broadcast )
- : push_serv_id( _push_serv_id ), control_addr( control ),
- broadcast_addr( broadcast ), control_sock( -1 ), broadcast_sock( -1 ),
- sequence( 0 )
- {
- }
- broadcast_target::~broadcast_target()
- {
- if( control_sock > 0 )
- close( control_sock );
- if( broadcast_sock > 0 )
- close( broadcast_sock );
- }
- void broadcast_target::connect()
- {
- if( broadcast_sock == -1 )
- {
- broadcast_sock = socket (PF_INET, SOCK_STREAM, 0);
- if( broadcast_sock < 0 )
- {
- broadcast_sock = -1;
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "socket: %s", sys_errlist[ errno ] );
- }
- // now connect
- if( 0 != ::connect( broadcast_sock,
- ( struct sockaddr * )&broadcast_addr.addr,
- sizeof( broadcast_addr.addr )))
- {
- broadcast_sock = -1;
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "connect: %s", sys_errlist[ errno ] );
- }
- }
- if( control_sock == -1 )
- {
- control_sock = socket (PF_INET, SOCK_DGRAM, 0);
- if( control_sock < 0 )
- {
- control_sock = -1;
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "socket: %s", sys_errlist[ errno ] );
- }
- // now connect
- if( 0 != ::connect( control_sock,
- ( struct sockaddr * )&control_addr.addr,
- sizeof( control_addr.addr )))
- {
- control_sock = -1;
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "connect: %s", sys_errlist[ errno ] );
- }
- }
- protocolThread->log( 1, "target connected to the push server" );
- protocolThread->report_push_serv_ok( push_serv_id );
- }
- /** Send to TCP stream the ready-to-resend by UDP "packet". Text may be
- incorporated as well. Defragment to not exceed maxPacketSize.
- Of course text is attached only to the first packet.
- raise exception if failed */
- void broadcast_target::send( const char *data, unsigned size,
- const std::string &text,
- const Timeval &now )
- {
- PacketHeader ph;
- bool text_is_sent = false;
-
- while( size > 0 )
- {
- unsigned plen = maxPacketSize;
- unsigned size_to_send = size;
-
- if( !text_is_sent )
- {
- if( plen - sizeof( PacketHeader ) > text.length() + size_to_send )
- plen = size + sizeof( PacketHeader ) + text.length();
- else
- if( plen < sizeof( PacketHeader ) + text.length() + size_to_send )
- size_to_send = plen - sizeof( PacketHeader ) - text.length();
- }
- else
- {
- if( plen - sizeof( PacketHeader ) > size_to_send )
- plen = size + sizeof( PacketHeader );
- else
- if( plen < sizeof( PacketHeader ) + size_to_send )
- size_to_send = plen - sizeof( PacketHeader );
- }
- strcpy( ph.type, mpegT );
- ph.when = now;
- *ph.filename = ' ';
- ph.packetSize = plen;
- if( text.length() > 0 && !text_is_sent )
- {
- ph.textOffset = sizeof( PacketHeader );
- ph.textSize = text.length();
- // do not copy text - send it by TCP after header
- ph.dataOffset = sizeof( PacketHeader ) + ph.textSize;
- }
- else
- {
- ph.textOffset = 0;
- ph.textSize = 0;
- ph.dataOffset = sizeof( PacketHeader );
- }
- ph.dataSize = size_to_send;
- ph.sequence = sequence++;
- // do not copy data to save time - it's TCP, so first send packet
- // header and text, next data
- // send header
- if( SOCKET_ERROR == ::send( broadcast_sock, ( const char * )&ph,
- sizeof( PacketHeader ), 0 ))
- {
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
- }
- // send text, if any
- if( ph.textSize > 0 )
- if( SOCKET_ERROR == ::send( broadcast_sock,
- text.c_str(), ph.textSize, 0 ))
- {
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
- }
- text_is_sent = true;
- // data
- if( SOCKET_ERROR == ::send( broadcast_sock, data, size_to_send, 0 ))
- {
- protocolThread->log
- ( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( this, "send: %s", sys_errlist[ errno ] );
- }
- // it may be not the last loop:
- size -= size_to_send;
- data += size_to_send;
- }
- }
- std::string broadcast_target::describe_itself() const
- {
- return std::string( "broadcast target, addr " ) +
- broadcast_addr.get_description() +
- ( broadcast_sock == -1? " not connected": " connected" );
- }
- // ------------------- video_thread ----------------------------------
- video_thread::video_thread( int argc, char **argv )
- : shutdown_now( false ), shutdown_performed( false ),
- shutdown_mutex( "lock shutdown procedure" ),
- add_drop_target_mutex( "lock adding/dropping broadcast targets" ),
- lock_new_text( "lock access to the new text" )
- {
- target_reconnect_time.now();
- }
- video_thread *video_thread::create_instance( int argc, char **argv )
- {
- // default. Add more servers here
- return new mpegator( argc, argv );
- }
- void video_thread::video_thread_entry_func( void *_instance )
- {
- video_thread *instance = (video_thread*) _instance;
- try
- {
- instance->init();
- instance->start_capture(); // in simple case capture always
- }
- catch ( VideoException e )
- {
- // that means game over
- protocolThread->log(1,"video exception: %s",e.getText().c_str());
- protocolThread->shutdown_requested();
- instance->shutdown();
- return;
- }
- // loop
- while( true )
- {
- try
- {
- instance->loop(); // until some exception
- }
- catch ( ShutdownException e )
- {
- // that means game over
- protocolThread->log(1,"shutdown exception: %s",e.getText().c_str());
- protocolThread->shutdown_requested();
- instance->shutdown();
- break;
- }
- catch ( VideoException e )
- {
- // that means game over
- protocolThread->log(1,"video exception: %s",e.getText().c_str());
- protocolThread->shutdown_requested();
- instance->shutdown();
- break;
- }
- catch ( NetworkException e )
- {
- protocolThread->log(1,"Network exception: %s",e.getText().c_str());
- broadcast_target *target = (broadcast_target *)e.get_pointer();
- // remove this target from broadcast list and put back into
- // waiting queue
- instance->reschedule_broadcast_target( target );
- }
- }
- }
- video_thread::~video_thread()
- {
- }
- void video_thread::loop()
- {
- while( true )
- {
- if( shutdown_now )
- {
- protocolThread->log
- ( 0, "throw ShutdownException, line %d", __LINE__ );
- throw ShutdownException( "shutdown requested" );
- }
- // try to load data from video board
- check_for_new_data();
- // protocol thread may supply some new targets to add
- check_for_add_drop_targets();
- }
- }
- void video_thread::check_for_add_drop_targets()
- {
- wait_for_mutex wm( add_drop_target_mutex, 3 );
- broadcast_target *target = NULL;
- if( target_reconnect_time.reached() )
- {
- try {
- while( add_target_q.size() > 0 )
- {
- target = add_target_q.front();
- // we keep the pointer, so we may pop right now
- add_target_q.pop();
- // makes sure that no duplicates:
- broadcast_targetsT::iterator it;
- for( it = broadcast_targets.begin(); it!=broadcast_targets.end();
- it++ )
- {
- if( (*it)->compare( *target ) )
- {
- // duplicate - do nothing
- break;
- }
- else if( target->get_push_serv_id() ==
- (*it)->get_push_serv_id() )
- {
- // id is the same - addr is different
- // maybe address did change? reconnect...
- // in static topology that's error, so log it
- protocolThread->log( 0, "strange change of address, "
- "line %d", __LINE__ );
- delete (*it);
- // connect and set to null - we don't need it anymore
- // exception may be rased here: connect
- target->connect();
- (*it) = target; // replace old if connected
- target = NULL;
- break;
- }
- }
- // check if previous loop finds duplicates:
- if( target == NULL )
- continue;
- // exception may be rased here: connect
- target->connect();
- broadcast_targets.push_back( target );
- protocolThread->log( 1, "connected to push server %s",
- target->describe_itself().c_str() );
- }
- } catch( NetworkException e ) {
- // log this
- protocolThread->log( 1, "connecting to push server: %s",
- e.getText().c_str() );
- // pop and put to the queue end (to reconnect later)
- add_target_q.push( target );
- // and set reconnect timeout - just 5 seconds
- target_reconnect_time.now();
- target_reconnect_time.tv_sec() += 5;
- }
- }
- else {
- # if defined( _DEBUG ) && 0
- protocolThread->log( 3, "target reconnect time not reached yet" );
- Timeval now;
- now.now();
- protocolThread->log( 3, "trigger time is %s, now %s",
- target_reconnect_time.get_description().c_str(),
- now.get_description().c_str() );
- # endif
- }
- // now process dropping queue
- while( drop_target_q.size() > 0 )
- {
- Sockaddr &addr = drop_target_q.front();
- // find this address in the list of targets and drop it
- broadcast_targetsT::iterator it;
- for( it = broadcast_targets.begin(); it != broadcast_targets.end(); it++)
- {
- if( (*it)->compare_addr( addr ) )
- {
- // stop that
- delete (*it);
- it = broadcast_targets.erase( it ); // erase and take next
- }
- }
- drop_target_q.pop(); // don't need it
- }
- }
- void video_thread::reschedule_broadcast_target( broadcast_target *target )
- {
- broadcast_targetsT::iterator it;
- for( it = broadcast_targets.begin(); it != broadcast_targets.end(); it++ )
- {
- if( (*it) == target )
- {
- it = broadcast_targets.erase( it );
- // and put it back into wait queue
- add_target_q.push( target );
- return;
- }
- }
- // if we are here this a very bad error!
- protocolThread->log( 0, "VERY bad error at line %d", __LINE__ );
- }
- // ------------ external --------------------------------------
- void video_thread::shutdown_requested()
- {
- shutdown_now = true;
- }
- void video_thread::add_broadcast_target( int pushserv_id,
- const Sockaddr &c_addr,
- const Sockaddr &b_addr )
- {
- wait_for_mutex wm( add_drop_target_mutex, 3 );
- // do not connect now!
- add_target_q.push( new broadcast_target( pushserv_id, c_addr, b_addr ));
- }
- void video_thread::new_text_data( text_data *data )
- {
- wait_for_mutex wm( lock_new_text, 1 );
- new_text.push( data );
- }