vclient_sii.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:12k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
- #include <stdlib.h>
- #include <string.h>
- #include <sys/socket.h>
- #include <sys/un.h>
- #include <arpa/inet.h>
- #undef HAVE_UNISTD_H
- #include <mtvp/mtvp.h>
- #include <mtvp/sii_api.h>
- #undef assert
- #include <assert.h>
- #include "session.h"
- #include "playback_window.h"
- #include "../../pusher/src/packet_header.h"
- #include "../../pusher/src/timeval.h"
- #define DEBUG
- /** both playback window and SII plugin must be sinchronized
- to the same mode
- */
- typedef enum { stopped, playback, broadcast } operatingModeT;
- /** playback, broadcast, stopped mode.
- */
- operatingModeT operatingMode = stopped;
- static int playbackSocketIn = -1;
- static int broadcastSocketIn = -1;
- static int sequence;
- static void close_sock();
- /** UDP packet is loaded into FIFO buffer, including header. When output to
- the playerSocket, header must be stripped */
- static queue< PacketHeader * > dataToPlay;
- static stack< PacketHeader * > inusedBuffers;
- const unsigned maxPacketLength = 1472;
- static void debug( int level, const char *format, ... )
- {
- # ifdef DEBUG
- va_list ap;
- va_start( ap, format );
- vprintf( format, ap );
- va_end( ap );
- # else
- va_list ap;
- va_start( ap, format );
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) + 1024 ];
- msg->type = msg->debug;
- msg->int_param = level;
- vsnprintf( msg->data, 1024, format, ap );
- va_end( ap );
- sii_feedback( msg, sizeof( mtv_msg ) + strlen( msg->data ) );
- delete msg;
- # endif
- }
- static void send_timestamp( const PacketHeader *data )
- {
- // data may come with no timestamp. Don't send it
- if( data->when.tv_sec == 0 )
- return;
- static int previous_timestamp_sec;
- if( data->when.tv_sec == previous_timestamp_sec ) // not changed
- return;
- previous_timestamp_sec = data->when.tv_sec;
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg )
- + sizeof( struct timeval ) ];
- msg->type = msg->timestamp;
- *( struct timeval * ) msg->data = data->when;
- sii_feedback( msg, sizeof( mtv_msg ) + sizeof( struct timeval ) );
- delete msg;
- }
- static void send_port_id_watch( int portnum )
- {
- mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) ];
- msg->type = msg->port_id_watch;
- msg->int_param = portnum;
- sii_feedback( msg, sizeof( mtv_msg ) );
- delete msg;
- }
- static void *open_sock( mtv_open_struct *os )
- {
- if( ! os )
- return NULL;
- if( os->action != os->open )
- {
- debug( 0, "SII: call open with close commandn" );
- return NULL;
- }
- debug( 3, "SII: open socket, ports %d and %dn", os->playback_input_socket,
- os->broadcast_input_socket );
- if( playbackSocketIn == -1 && os->playback_input_socket != -1 )
- {
- playbackSocketIn = socket( PF_INET, SOCK_DGRAM, 0 );
- if( -1 == playbackSocketIn )
- {
- debug( 1, "SII plugin: socket fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- // create input socket
- // assign local address
- struct sockaddr_in saddr;
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons ( os->playback_input_socket );
- saddr.sin_addr.s_addr = htonl (INADDR_ANY); // input
- if( -1 == bind ( playbackSocketIn, (struct sockaddr *) &saddr,
- sizeof(saddr)))
- {
- playbackSocketIn = -1;
- debug( 1, "SII plugin: bind playback %d port fails, %sn",
- os->playback_input_socket, sys_errlist[ errno ] );
- return NULL;
- }
- // set non-block mode
- int oldflags = fcntl( playbackSocketIn, F_GETFL, 0 );
- if( oldflags == -1 )
- {
- playbackSocketIn = -1;
- debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- oldflags |= O_NONBLOCK;
- if( -1 == fcntl( playbackSocketIn, F_SETFL, oldflags ))
- {
- playbackSocketIn = -1;
- debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- debug( 2, "SII: playbackSocketIn is %dn", playbackSocketIn );
- }
- if( broadcastSocketIn == -1 && os->broadcast_input_socket != -1 )
- {
- broadcastSocketIn = socket( PF_INET, SOCK_DGRAM, 0 );
- if( -1 == broadcastSocketIn )
- {
- debug( 1, "SII plugin: socket fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- // create input socket
- // assign local address
- struct sockaddr_in saddr;
- saddr.sin_family = AF_INET;
- saddr.sin_port = htons ( os->broadcast_input_socket );
- saddr.sin_addr.s_addr = htonl (INADDR_ANY); // input
- if( -1 == bind ( broadcastSocketIn, (struct sockaddr *) &saddr,
- sizeof(saddr)))
- {
- broadcastSocketIn = -1;
- debug( 1, "SII plugin: bind broadcast %d port fails, %sn",
- os->broadcast_input_socket, sys_errlist[ errno ] );
- return NULL;
- }
- // set non-block mode
- int oldflags = fcntl( broadcastSocketIn, F_GETFL, 0 );
- if( oldflags == -1 )
- {
- broadcastSocketIn = -1;
- debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- oldflags |= O_NONBLOCK;
- if( -1 == fcntl( broadcastSocketIn, F_SETFL, oldflags ))
- {
- broadcastSocketIn = -1;
- debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
- return NULL;
- }
- debug( 2, "SII: broadcastSocketIn is %dn", broadcastSocketIn );
- }
- operatingMode = stopped; // at the beginning
- sequence = 0;
- return (void *)1;
- }
- /** close socket
- We must pay attention to the bug that if we close the socket and
- immediately re-open it, it will report that socket is in use.
- So we must have two sockets and keep them open all the time.
- Close sockets at the end of use
- */
- static void close_sock()
- {
- if( playbackSocketIn > 0 )
- {
- shutdown( playbackSocketIn, 2 );
- close( playbackSocketIn );
- }
- playbackSocketIn = -1;
- if( broadcastSocketIn > 0 )
- {
- shutdown( broadcastSocketIn, 2 );
- close( broadcastSocketIn );
- }
- broadcastSocketIn = -1;
- }
- void *SII_open( const void *data, u_int size )
- {
- mtv_open_struct *os = (mtv_open_struct *)data;
- if( size != sizeof( mtv_open_struct ))
- {
- debug( 1, "SII plugin: bad sizen" );
- return NULL;
- }
- debug( 1, "SII plugin: open. Listen for sockets %d and %dn",
- os->playback_input_socket, os->broadcast_input_socket );
- return open_sock( os );
- }
- int SII_close(void *)
- {
- debug( 3, "SII closen" );
- close_sock();
- }
- int SII_fd(void *)
- {
- int reply;
- reply = operatingMode == broadcast? broadcastSocketIn: playbackSocketIn;
- debug( 3, "SII get fd, reply is %dn", reply );
- // inform application that this port is watched
- send_port_id_watch( reply );
- return reply;
- }
- int SII_cntl( void *, const void *data, u_int size )
- {
- debug( 3, "SII cntln" );
- mtv_open_struct *os = (mtv_open_struct *)data;
- if( os->action == os->open )
- {
- debug( 3, "SII cntl: open sockets, ports %d and %dn",
- os->playback_input_socket, os->broadcast_input_socket );
- // reopen the socket with different port number
- open_sock( os );
- operatingMode = stopped;
- return 1;
- }
- else if( os->action == os->close )
- {
- debug( 3, "SII cntl: close socket, id %d and %dn",
- playbackSocketIn, broadcastSocketIn );
- close_sock();
- operatingMode = stopped;
- return 1;
- }
- else if( os->action == os->close )
- {
- debug( 3, "SII cntl: stopn",
- playbackSocketIn, broadcastSocketIn );
- // do not close sockets
- operatingMode = stopped;
- return 1;
- }
- else if( os->action == os->goplayback )
- {
- debug( 3, "SII cntl: goplaybackn" );
- operatingMode = playback;
- }
- else if( os->action == os->gobroadcast )
- {
- debug( 3, "SII cntl: gobroadcastn" );
- operatingMode = broadcast;
- }
- return 0;
- }
- int SII_read(void *, char *buf, size_t count)
- {
- debug( 4, "SII read %x, %dn", buf, count );
- if( operatingMode == stopped )
- return 0;
- PacketHeader *data = 0;
- int data_available = 0;
- int num_of_buffers_we_need = count / maxPacketLength + 1;
- int our_socket = operatingMode == playback? playbackSocketIn:
- broadcastSocketIn;
- while( --num_of_buffers_we_need >= 0 )
- {
- // get empty buffer
- if( inusedBuffers.size() )
- {
- data = inusedBuffers.top();
- inusedBuffers.pop();
- }
- else
- data = (PacketHeader *)( new char[ maxPacketLength ] );
- if( !data )
- {
- debug( 1, "can't allocaten" );
- return -1;
- }
- fd_set set;
- FD_ZERO( &set );
- FD_SET( our_socket, &set );
- // timeout is zero: read what we have, don't spend time to wait
- Timeval tv( 0, 0 );
- int ret = select( FD_SETSIZE, &set, NULL, NULL, &tv.val() );
- if( -1 == ret )
- {
- perror ("select");
- return -1;
- }
- if( 0 == ret ) // timeout
- {
- debug( 3, "SII: read called, but no data detectedn" );
- // don't return 0 immediately
- // it may happens that we have data in buffers
- if( dataToPlay.size() )
- break;
- else
- {
- errno = EAGAIN; // this means that no data in stream,
- // // call later again
- return -1;
- }
- }
- int size = read( our_socket, data, maxPacketLength );
- if( size == -1 && errno == EAGAIN )
- {
- // non-blocking call - no data in socket
- inusedBuffers.push( data );
- break;
- }
- if( size == -1 )
- {
- // error
- debug( 1, "recv error, %sn", sys_errlist[ errno ] );
- inusedBuffers.push( data );
- return size;
- }
- if( size == 0 ) // EOF
- {
- debug( 1, "no data in buffer, %sn", sys_errlist[ errno ] );
- inusedBuffers.push( data );
- break;
- }
- if( size < maxPacketLength )
- {
- debug( 1, "warning: udp packet corruptedn" );
- }
- dataToPlay.push( data );
- send_timestamp( data );
- }
- // maybe we have data in old buffers?
- int amount_read = 0;
- # ifdef DEBUG
- if( dataToPlay.size() == 0 )
- debug( 1, "no data in buffersn" );
- # endif
- while( count > 0 && dataToPlay.size() )
- {
- data = dataToPlay.front();
- // reset sequence if needed
- if( data->sequence == 0 )
- sequence = 0;
- // if sequence is corrupted, than you have to skip data until next
- // I-frame. Than reset sequence
- if( sequence > 0 && sequence + 1 != data->sequence )
- {
- cout << "sequence is " << data->sequence << " expected " <<
- sequence + 1 << endl;
- // reset sequence anyway
- sequence = 0;
- bool got_it = false;
- while( dataToPlay.size() )
- {
- // fing a seqience of 7 times FF FF FF FF FF
- int num = data->packetSize - data->dataOffset;
- for( unsigned char *ptr = (unsigned char*)data+data->dataOffset;
- num > 7; num--, ptr++ )
- {
- if( *ptr == 0xff && ptr[1] == 0xff && ptr[2] == 0xff &&
- ptr[3] == 0xff && ptr[4] == 0xff && ptr[5] == 0xff &&
- ptr[6] == 0xff )
- {
- got_it = true;
- data->dataOffset = data->packetSize - num;
- break;
- }
- }
- if( got_it == false )
- {
- inusedBuffers.push( data );
- dataToPlay.pop();
- if( dataToPlay.size() )
- data = dataToPlay.front();
- continue;
- }
- break;
- }
- }
- if( !dataToPlay.empty() && data->dataOffset < data->packetSize )
- {
- int size = min( data->packetSize - data->dataOffset, count );
- memcpy( buf, (char*)data + data->dataOffset, size );
- buf += size;
- data->dataOffset += size;
- amount_read += size;
- count -= size;
- }
- if( data->dataOffset == data->packetSize )
- {
- inusedBuffers.push( data );
- // update sequence when current packet is gone
- sequence = data->sequence;
- dataToPlay.pop();
- }
- }
- // If there is no data available,
- // SII_read should return -1 with errno set to EAGAIN
- if( amount_read == 0 )
- {
- errno = EAGAIN;
- return -1;
- }
- cout << amount_read << " readn";
- return amount_read;
- }
- off_t SII_lseek( void *, off_t offs, int how )
- {
- debug( 3, "SII lseek %offset %d, how %dn", offs, how );
- return -1;
- }