vclient_sih.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:19k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/un.h>
- #include <arpa/inet.h>
- #undef HAVE_UNISTD_H
- #include <mtvp/mtvp.h>
- #include "mtvp/misc.h"
- #include "mtvp/sih_api.h"
- #undef assert
- #include <assert.h>
- #include "session.h"
- #include "playback_window.h"
- #include "../../pusher/src/packet_header.h"
- #include "../../pusher/src/timeval.h"
- #include "../../pusher/src/sockaddr.h"
- //#define DEBUG
- #define CLOSE_SOCK
- const unsigned maxPacketLength = 1472;
- static void debug( int level, const SIH_RESOURCES *sih_resources,
- const char *format, ... )
- {
- # ifdef DEBUG
- va_list ap;
- va_start( ap, format );
- vprintf( format, ap );
- va_end( ap );
- # else
- va_list ap;
- va_start( ap, format );
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) + 1024 ];
- msg->type = msg->debug;
- msg->int_param = level;
- vsnprintf( msg->data, 1024, format, ap );
- va_end( ap );
- (*sih_resources->feedback)(sih_resources, msg,
- sizeof( mtv_msg ) + strlen( msg->data ) );
- delete msg;
- # endif
- }
- /** just forvard declarations...
- */
- class HANDLER_DATA;
- class STREAM_DATA {
- public:
- HANDLER_DATA *handler_data; /* should always be there */
- int sock;
- /** UDP packet is loaded into FIFO buffer, including header. When output to
- the playerSocket, header must be stripped */
- queue< PacketHeader * > dataToPlay;
- int sequence;
- STREAM_DATA( const Sockaddr &addr, const SIH_RESOURCES *sih_resources )
- : sock( -1 )
- {
- # ifdef CLOSE_SOCK
- sock = open_socket( addr, sih_resources );
- # endif
- }
- static int open_socket( const Sockaddr &addr,
- const SIH_RESOURCES *sih_resources )
- {
- int sock = socket( PF_INET, SOCK_DGRAM, 0 );
- if( -1 == sock )
- {
- debug( 1, sih_resources,
- "SIH plugin: socket fails, %sn", sys_errlist[ errno ] );
- return -1;
- }
- if( -1 == bind ( sock, (struct sockaddr *) &addr.addr,
- sizeof(addr.addr)))
- {
- sock = -1;
- debug( 1, sih_resources,
- "SIH plugin: bind playback %d port fails, %sn",
- sock, sys_errlist[ errno ] );
- return -1;
- }
- // set non-block mode
- int oldflags = fcntl( sock, F_GETFL, 0 );
- if( oldflags == -1 )
- {
- close( sock );
- sock = -1;
- debug( 1, sih_resources,
- "SIH plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return -1;
- }
- oldflags |= O_NONBLOCK;
- if( -1 == fcntl( sock, F_SETFL, oldflags ))
- {
- close( sock );
- sock = -1;
- debug( 1, sih_resources,
- "SIH plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return -1;
- }
- debug( 2, sih_resources,
- "SIH: new sockaddr is %dn", sock );
- return sock;
- }
- /** do not close socket - all sockets remains open until program ends,
- if not directed by #ifdef CLOSE_SOCK
- */
- ~STREAM_DATA()
- {
- while( dataToPlay.size() > 0 )
- {
- delete dataToPlay.front();
- dataToPlay.pop();
- }
- # ifdef CLOSE_SOCK
- close( sock );
- # ifdef DEBUG
- cout << "SIH: close socketn";
- # endif
- # endif
- }
- };
- class static_stream_dataC {
- STREAM_DATA *stream_data;
- Sockaddr saddr;
- # ifndef CLOSE_SOCK
- /** when allocates stream_data - copy this sockaddr there
- */
- int static_sockaddr;
- # endif
- public:
- static_stream_dataC() : stream_data( NULL )
- # ifndef CLOSE_SOCK
- , static_sockaddr( -1 )
- # endif
- {}
- /** we expect the url to be plugin_url_prefix host_name_or_ip:port
- ( plugin_url_prefix must end by :// ) in the case when host is
- specified or in the form plugin_url_prefix port
- if host must be set to htonl (INADDR_ANY);
- */
- static_stream_dataC( const char *url, HANDLER_DATA *handler_data,
- const SIH_RESOURCES *sih_resources )
- : stream_data( NULL )
- # ifndef CLOSE_SOCK
- , static_sockaddr( -1 )
- # endif
- {
- cout << "static_stream_dataCn";
- if( strncmp( url, plugin_url_prefix, sizeof( plugin_url_prefix )-1 ))
- {
- debug( 1, sih_resources,
- "SIH plugin: wrong url prefixn" );
- return; // error
- }
- string site_and_port( url + sizeof( plugin_url_prefix ) - 1 );
- // and fill out the saddr structure:
- saddr.addr.sin_family = AF_INET;
-
- int pos = site_and_port.find( ':' );
- if( pos == -1 )
- {
- // no hostaddr -> any addr
- saddr.addr.sin_addr.s_addr = htonl (INADDR_ANY); // input
- saddr.addr.sin_port = htons( atoi( site_and_port.c_str() ) );
- }
- else
- {
- string s_site( site_and_port, 0, pos );
- string s_port( site_and_port, pos + 1 );
- // detect if site is hostname or ip addr
- if( s_site[0] < '0' || s_site[0] > '9' )
- {
- // hostname
- struct hostent *he = gethostbyname( s_site.c_str() );
- if( he == NULL )
- {
- perror( "gethostbyname" );
- return;
- }
- saddr.addr.sin_addr = *(struct in_addr *) he->h_addr;
- }
- else
- {
- // just numbers of IP address
- if( !inet_aton( s_site.c_str(), &saddr.addr.sin_addr ))
- {
- perror( "Error converting inet_aton" );
- return;
- }
- }
- saddr.addr.sin_port = htons( atoi( s_port.c_str() ) );
- }
- # ifndef CLOSE_SOCK
- static_sockaddr =
- STREAM_DATA::open_socket( saddr, sih_resources );
- # endif
- }
- /** close socket, finally
- */
- ~static_stream_dataC()
- {
- if( stream_data )
- {
- cerr << "SIH: delete stream_data for opened stream" << endl;
- delete stream_data;
- }
- # ifndef CLOSE_SOCK
- close( static_sockaddr );
- cerr << "SIH: close socket " << static_sockaddr << endl;
- # endif
- }
-
- bool isValid()
- {
- // port is not zero only if constructor executed without errors
- return saddr.addr.sin_port != 0;
- }
- STREAM_DATA *allocate_stream_data( HANDLER_DATA *handler_data,
- const SIH_RESOURCES *sih_resources )
- {
- if( stream_data != NULL )
- {
- cerr << "SIH: try to allocate stream_data when it is already "
- "allocated" << endl;
- return stream_data;
- }
- stream_data = new STREAM_DATA( saddr, sih_resources );
- stream_data->handler_data = handler_data;
- # ifndef CLOSE_SOCK
- stream_data->sock = static_sockaddr;
- # endif
- return stream_data;
- }
- /** just return true if this is ours stream data
- */
- bool check_by_stream_data( STREAM_DATA *_stream_data )
- {
- return stream_data == _stream_data;
- }
- void deallocate_stream_data_struct()
- {
- # if defined( CLOSE_SOCK ) && defined( DEBUG )
- cout << "SIH: deallocate_stream_datan";
- # endif
- delete stream_data;
- stream_data = NULL;
- }
- };
- /** map from url name to static stream data
- */
- typedef map< string, static_stream_dataC * > url_to_streamdataT;
- /** will be allocated by the SIH_init
- */
- class HANDLER_DATA {
- const SIH_RESOURCES *sih_resources;
- url_to_streamdataT url_to_streamdata;
- public:
- stack< PacketHeader * > inusedBuffers;
- int previous_timestamp_sec;
- public:
- HANDLER_DATA(const SIH_RESOURCES *r) : sih_resources(r) {}
- ~HANDLER_DATA()
- {
- // loop to close all streams
- url_to_streamdataT::iterator it;
-
- for( it = url_to_streamdata.begin(); it != url_to_streamdata.end();
- it++ )
- {
- // remove static_stream_dataC
- delete (*it).second;
- }
- // and free the map itself
- url_to_streamdata.clear();
- }
- /** find existing stream ( socket ) or create the new one
- */
- STREAM_DATA *get_stream_dat_byUrl( const char *url,
- HANDLER_DATA *handler_data )
- {
- if( strncmp( url, plugin_url_prefix, sizeof( plugin_url_prefix )-1 ))
- return NULL; // error
- // test if we already have it
- url_to_streamdataT::iterator it = url_to_streamdata.find( url );
- if( it != url_to_streamdata.end() )
- {
- // this socket was opened previously.
- return (*it).second->allocate_stream_data( handler_data,
- sih_resources );
- }
- // create the new one
- static_stream_dataC *sd = new static_stream_dataC( url, handler_data,
- sih_resources );
- if( ! sd->isValid() )
- return NULL;
- // only if it is valid
- url_to_streamdata[ url ] = sd;
- return sd->allocate_stream_data( handler_data, sih_resources );
- }
- void deallocate_stream_data( STREAM_DATA *stream_data )
- {
- // loop over all stream data's and locate which to clear
- url_to_streamdataT::iterator it;
-
- for( it = url_to_streamdata.begin(); it != url_to_streamdata.end();
- it++ )
- {
- if( (*it).second->check_by_stream_data( stream_data ))
- {
- (*it).second->deallocate_stream_data_struct();
- break;
- }
- }
- }
- void debug( int level, const char *format, ... )
- {
- # ifdef DEBUG
- va_list ap;
- va_start( ap, format );
- vprintf( format, ap );
- va_end( ap );
- # else
- va_list ap;
- va_start( ap, format );
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) + 1024 ];
- msg->type = msg->debug;
- msg->int_param = level;
- vsnprintf( msg->data, 1024, format, ap );
- va_end( ap );
- (*sih_resources->feedback)(sih_resources, msg,
- sizeof( mtv_msg ) + strlen( msg->data ) );
- delete msg;
- # endif
- }
- void send_timestamp( const PacketHeader *data )
- {
- // data may come with no timestamp. Don't send it
- if( data->when.tv_sec == 0 )
- return;
- if( data->when.tv_sec == previous_timestamp_sec ) // not changed
- return;
- previous_timestamp_sec = data->when.tv_sec;
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg )
- + sizeof( struct timeval ) ];
- msg->type = msg->timestamp;
- *( struct timeval * ) msg->data = data->when;
- (*sih_resources->feedback)(sih_resources, msg, sizeof( mtv_msg )
- + sizeof( struct timeval ) );
- delete msg;
- }
- };
- /** tests if it can handle this URL
- */
- SIH_probe_url(const SIH_RESOURCES *, const char *url)
- {
- if( !strncmp( url, plugin_url_prefix, sizeof( plugin_url_prefix )-1 ))
- return 0;
- return -1;
- }
- static void *SIH_init( const SIH_RESOURCES *sih_resources, const char *p )
- {
- printf("SIH_init with "%s"n", p);
- HANDLER_DATA *handler_data = new HANDLER_DATA( sih_resources );
- if (handler_data == NULL) {
- perror("SIH_init");
- return NULL;
- }
- return (void *)handler_data;
- }
- static int SIH_free( void *void_handler_data )
- {
- /* this call should be thread safe. do synchronisation */
- static bool execute_flag = false;
- if( execute_flag )
- return 0;
- execute_flag = true;
- HANDLER_DATA *handler_data = (HANDLER_DATA *)void_handler_data;
- printf("SIH_freen");
- delete handler_data;
- execute_flag = false;
- return 0;
- }
- static void *SIH_open_url( void *void_handler_data, const char *url )
- {
- # ifdef DEBUG
- cout << "SIH: open url " << url << endl;
- # endif
- HANDLER_DATA *handler_data = (HANDLER_DATA *)void_handler_data;
- /*
- * The parameter of SIH_open_url is an URL passed by the application
- * to the COM_SIH_OPEN_URL command. The URL identifies the stream
- * to be opened.
- *
- * SIH_open_url returns a pointer to an opaque object that
- * identifies the stream, i.e.
- * only your SIH plug-in known about the structure of the
- * opaque object. If the specified stream cannot be opened,
- * SIH_open_url returns NULL and sets errno.
- */
- STREAM_DATA *stream_data =
- handler_data->get_stream_dat_byUrl( url, handler_data );
- if (stream_data == NULL) {
- perror("SIH_open_url");
- return NULL;
- }
- return stream_data;
- }
- static int SIH_close( void *void_stream_data )
- {
- STREAM_DATA *stream_data = (STREAM_DATA *)void_stream_data;
- /*
- * Close the stream.
- *
- * This routine must close every fd opened by SIH_open_url and free the
- * PLUGIN_DATA
- * structure. All the resources allocated for the stream should be released.
- *
- * After a call to SIH_close, the "void *stream"
- * should be considered invalid.
- */
- printf("SIH_closen");
-
- // we deallocate stream_data, but do not actually close the socket.
- if( !stream_data->handler_data )
- {
- cerr << "SIH: deallocate null stream data" << endl;
- return 0;
- }
- stream_data->handler_data->deallocate_stream_data( stream_data );
- return 0;
- }
- static int SIH_fd( void *void_stream_data )
- {
- STREAM_DATA *stream_data = (STREAM_DATA *)void_stream_data;
- /*
- * Returns the file descriptor associated with the stream.
- *
- * This routine must return a real file descriptor.
- * The Player calls a blocking select on this file descriptor
- * and if there is something available for reading on it, calls SIH_read
- * (provided that there is buffer space available to read some input data).
- *
- * After calling SIH_fd, mtvp calls fcntl with F_SETFL
- * to set O_NONBLOCK on the fd.
- */
- printf( "SIH_fd returns %dn", stream_data->sock );
-
- return stream_data->sock;
- }
- static int SIH_read( void *void_stream_data, char *buf, size_t count )
- {
- STREAM_DATA *stream_data = (STREAM_DATA *)void_stream_data;
- # ifdef DEBUG
- // stream_data->handler_data->debug( 4, "SIH read %x, %dn", buf, count );
- # endif
- /*
- * SIH_read must behave exactly like the
- * regular read routine when used on a fd
- * that has O_NONBLOCK set.
- *
- * If data becomes available for reading on the fd returned by SIH_fd,
- * SIH_read is called. Note that mtvp may call SIH_read at any time, even
- * when there is no data available on the fd.
- *
- * SIH_read should always behave like a non-blocking read
- * (i.e. like read behaves when the flag O_NONBLOCK set).
- * This means that SIH_read should never block, even
- * if there is not enough data available.
- *
- * If there is no data available,
- * SIH_read should return -1 with errno set to EAGAIN.
- *
- * If there is less data available than requested,
- * SIH_read should copy all the data available into the
- * buffer and return the number of bytes copied.
- *
- * SIH_read returning 0 means EOF.
- *
- * SIH_read returning -1 with errno NOT set to EAGAIN
- * means an error occured. All read errors (except EAGAIN)
- * are considered like EOF.
- */
- // Don't call select - we have non-blocking sockets opened
- PacketHeader *data = 0;
- int data_available = 0;
- int num_of_buffers_we_need = count / maxPacketLength + 1;
- while( --num_of_buffers_we_need >= 0 )
- {
- // get empty buffer
- if( stream_data->handler_data->inusedBuffers.size() )
- {
- data = stream_data->handler_data->inusedBuffers.top();
- stream_data->handler_data->inusedBuffers.pop();
- }
- else
- data = (PacketHeader *)( new char[ maxPacketLength ] );
- if( !data )
- {
- stream_data->handler_data->debug( 1, "can't allocaten" );
- return -1;
- }
- // read
- int size = read( stream_data->sock, data, maxPacketLength );
- if( size == -1 && errno == EAGAIN )
- {
- // non-blocking call - no data in socket
- stream_data->handler_data->inusedBuffers.push( data );
- break;
- }
- if( size == -1 )
- {
- // error
- stream_data->handler_data->debug( 1, "recv error, %sn",
- sys_errlist[ errno ] );
- stream_data->handler_data->inusedBuffers.push( data );
- return size;
- }
- if( size == 0 ) // EOF
- {
- stream_data->handler_data->debug( 1,
- "no data in buffer, %sn", sys_errlist[ errno ] );
- stream_data->handler_data->inusedBuffers.push( data );
- break;
- }
- if( size < maxPacketLength )
- {
- stream_data->handler_data->debug( 1,
- "warning: udp packet corruptedn" );
- }
- stream_data->dataToPlay.push( data );
- stream_data->handler_data->send_timestamp( data );
- # ifdef DEBUG
- stream_data->handler_data->debug( 1, "got packet " );
- # endif
- }
-
- // maybe we have data in old buffers?
- int amount_read = 0;
- # ifdef DEBUG
- if( stream_data->dataToPlay.size() == 0 )
- stream_data->handler_data->debug( 1, "no data in buffersn" );
- # endif
- while( count > 0 && stream_data->dataToPlay.size() )
- {
- data = stream_data->dataToPlay.front();
- // reset sequence if needed
- if( data->sequence == 0 )
- stream_data->sequence = 0; // this is local counter
- // if sequence is corrupted, than you have to skip data until next
- // I-frame. Than reset sequence
- if( stream_data->sequence > 0 &&
- stream_data->sequence + 1 != data->sequence )
- {
- cout << "sequence is " << data->sequence << " expected " <<
- stream_data->sequence + 1 << endl;
- // reset sequence anyway, to start again
- stream_data->sequence = 0;
- bool got_it = false;
- while( stream_data->dataToPlay.size() )
- {
- // fing a seqience of 7 times FF FF FF FF FF
- int num = data->packetSize - data->dataOffset;
- for( unsigned char *ptr = (unsigned char*)data+data->dataOffset;
- num > 7; num--, ptr++ )
- {
- if( *ptr == 0xff && ptr[1] == 0xff && ptr[2] == 0xff &&
- ptr[3] == 0xff && ptr[4] == 0xff && ptr[5] == 0xff &&
- ptr[6] == 0xff )
- {
- got_it = true;
- data->dataOffset = data->packetSize - num;
- break;
- }
- }
- if( got_it == false )
- {
- stream_data->handler_data->inusedBuffers.push( data );
- stream_data->dataToPlay.pop();
- if( stream_data->dataToPlay.size() )
- data = stream_data->dataToPlay.front();
- continue;
- }
- break;
- }
- }
- // copy data to buffer
- if( !stream_data->dataToPlay.empty()
- && data->dataOffset < data->packetSize )
- {
- int size = min( data->packetSize - data->dataOffset, count );
- memcpy( buf, (char*)data + data->dataOffset, size );
- buf += size;
- data->dataOffset += size;
- amount_read += size;
- count -= size;
- }
- // remove packet from queue
- if( data->dataOffset == data->packetSize )
- {
- stream_data->handler_data->inusedBuffers.push( data );
- // update sequence when current packet is gone
- stream_data->sequence = data->sequence;
- stream_data->dataToPlay.pop();
- }
- }
- // If there is no data available,
- // SII_read should return -1 with errno set to EAGAIN
- if( amount_read == 0 )
- {
- errno = EAGAIN;
- return -1;
- }
- # ifdef DEBUG
- // cout << amount_read << " readn";
- # endif
- return amount_read;
- }
- static off_t SIH_lseek( void *void_stream_data, off_t offset, int whence )
- {
- STREAM_DATA *stream_data = (STREAM_DATA *)void_stream_data;
- stream_data->handler_data->debug( 3, "SIH lseek %offset %d, how %dn",
- offset, whence );
- return -1;
- }
- int SIH_register(SIH_EXPOSED *exposed, int version)
- {
- if (version == SIH_VERSION)
- {
- exposed->SIH_version = SIH_VERSION;
- exposed->SIH_init = SIH_init;
- exposed->SIH_free = SIH_free;
- exposed->SIH_probe_url = SIH_probe_url;
- exposed->SIH_open_url = SIH_open_url;
- exposed->SIH_close = SIH_close;
- exposed->SIH_read = SIH_read;
- exposed->SIH_lseek = SIH_lseek;
- exposed->SIH_cntl = NULL;
- exposed->SIH_fd = SIH_fd;
- exposed->SIH_size = NULL;
- exposed->SIH_buffer = NULL;
- }
- return SIH_VERSION;
- }