vpusher.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:20k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <sys/un.h>
#if HAVE_UNISTD_H
#include <unistd.h>
#endif

#include <fstream>
#include <iostream>
#include <string>
#include <strstream>

#include "sockaddr.h"
#include "vpusher.h"
#include "../../client/src/play_commands.h"
#include "packet_header.h"
- //#define DEBUG
- Appl *app;
- const unsigned RemoteUser::maxPacketLength = 1472;
- const char Appl::env_prefix[] = "PUSH";
- const char Appl::env_postfix_cp[] = "CP";
- const char Appl::env_postfix_wp[] = "WP";
- void
- signalHandler( int sig )
- {
- cout << "got signal " << sig << endl;
- if( app )
- app->shutdown();
- exit(0);
- }
- // main
- int main( int argc, char **argv )
- {
- signal(SIGINT, signalHandler);
- signal(SIGSEGV, signalHandler);
- app = new Appl( argc, argv );
- app->run();
- }
- RemoteUser::~RemoteUser()
- {
- // close all sockets
- if( outputSocket != -1 )
- close( outputSocket );
- if( outputSockets.size() > 0 )
- for( outputSocketsT::iterator it = outputSockets.begin();
- it != outputSockets.end(); it++ )
- close( (*it).second->sock );
- // clear job queue
- while( !jobs.empty() )
- {
- jobs.front().cleanup();
- jobs.pop();
- }
- // and all buffers
- while( !dataToSend.empty() )
- {
- delete dataToSend.front();
- dataToSend.pop();
- }
- }
- void RemoteUser::cleanup_jobs( stack< PacketHeader * > &inusedBuffers )
- {
- while( !jobs.empty() ) // clear job queue
- {
- jobs.front().cleanup();
- jobs.pop();
- }
- // and all buffers
- while( !dataToSend.empty() )
- {
- inusedBuffers.push( dataToSend.front() );
- dataToSend.pop();
- }
- }
- void RemoteUser::add_broadcast_channel( const Sockaddr &addr )
- {
- // first check if this socket already exists :
- outputSocketsT::iterator it = outputSockets.find( addr );
- if( it == outputSockets.end() )
- {
- // create output socket and
- int sock = socket( PF_INET, SOCK_DGRAM, 0 );
- if( -1 == sock )
- {
- app->log( 0, "socket for broadcast: %s", sys_errlist[ errno ] );
- return;
- }
- // do not connect - use sendto
- // if( -1 == connect( sock, (struct sockaddr *) &addr.addr,
- // sizeof(addr.addr)))
- // {
- // app->log( 0, "connect broadcast: %s", sys_errlist[ errno ] );
- // return;
- // }
- OutputDestination *dest = new OutputDestination;
- dest->sock = sock;
- // add this socket to the list
- outputSockets[ addr ] = dest;
- app->log( 3, "add new broadcast target %s",
- addr.get_description().c_str() );
- }
- }
- // -----------------------------------------------------------------------
- // private members
- bool Appl::bindAcceptWaitSocket()
- {
- if( acceptWaitSocket != -1 ) // why not?
- close( acceptWaitSocket );
- acceptWaitSocket = socket( PF_INET, SOCK_STREAM, 0 );
- if( -1 == acceptWaitSocket )
- {
- perror( "socket " );
- close( acceptWaitSocket );
- acceptWaitSocket = -1;
- return false;
- }
- struct sockaddr_in name;
- // if control port is already binded - the wait connection port
- // should be binded as well, or this is error
- name.sin_family = AF_INET;
- name.sin_port = htons ( acceptWaitPortnum );
- name.sin_addr.s_addr = htonl (INADDR_ANY);
- if( bind( acceptWaitSocket, (struct sockaddr *)&name, sizeof (name)) < 0)
- {
- perror ("bind");
- close( acceptWaitSocket );
- acceptWaitSocket = -1;
- return false;
- }
- // and listen
- if( listen( acceptWaitSocket, 1 ) < 0 )
- {
- perror ("listen");
- close( acceptWaitSocket );
- acceptWaitSocket = -1;
- return false;
- }
- log( 1, "accept wait socket successfully binded, port %d",
- acceptWaitPortnum );
- return true;
- }
- // public members
- Appl::Appl( int argc, char **argv ) :
- // all sockets should be -1
- controlSocket( -1 ), acceptWaitSocket( -1 ), incomingDataSocket( -1 ),
- videoFileHandle( -1 ), broadcast_user( NULL ),
- queueLowWaterMark( 100 ), queueHighWaterMark( 1000 ),
- incomingDataBufferOffset(0), incomingDataBufferDataLength(0),
- log_output( NULL ), verbose_level( 3 )
- {
- // logging: if log_output is still zero - than set it to default file:
- if( !log_output )
- {
- log_output = new ofstream( "push.log", ios::out | ios::app );
- }
- log( 0, "" );
- log( 0, "start push server application" );
- controlSocket = socket( PF_INET, SOCK_DGRAM, 0 );
- if( -1 == controlSocket )
- {
- perror( "socket " );
- exit( 1 );
- }
- // load control socket port from the environment
- // try each possible combination in sequence
- for( int sequence = 1; ; sequence++ )
- {
- char env[20];
- sprintf( env, "%s%d%s", env_prefix, sequence, env_postfix_cp );
- char *control_port_s = getenv( env );
- if( control_port_s == NULL )
- {
- cerr << "environment " << env << " is not setn";
- close( controlSocket );
- exit(1);
- }
- int control_port = atoi( control_port_s );
- sprintf( env, "%s%d%s", env_prefix, sequence, env_postfix_wp );
- char *wait_port_s = getenv( env );
- if( wait_port_s == NULL )
- {
- cerr << "environment " << env << " is not setn";
- close( controlSocket );
- exit(1);
- }
- acceptWaitPortnum = atoi( wait_port_s );
- // first try the control port
- struct sockaddr_in name;
- name.sin_family = AF_INET;
- name.sin_port = htons ( control_port );
- name.sin_addr.s_addr = htonl (INADDR_ANY);
- if (bind ( controlSocket, (struct sockaddr *) &name, sizeof (name)) < 0)
- {
- cerr << "port " << control_port_s << " is busy, try next onen";
- continue;
- }
- // try to open this port
- bool result = bindAcceptWaitSocket();
- if( result == false )
- {
- close( controlSocket );
- exit(1);
- }
- break;
- }
- // next - create the user for broadsast
- broadcast_user = new RemoteUser( Sockaddr() );
- // fill this map in constructor
- commandsMap.insert( commandsMapT::value_type( string( _play_cmd_ ),
- &play_impl ));
- commandsMap.insert( commandsMapT::value_type( string( _stop_cmd_ ),
- &stop_impl ));
- commandsMap.insert( commandsMapT::value_type( string( _accept_stream_ ),
- &accept_stream_impl ));
- commandsMap.insert( commandsMapT::value_type( string( _set_filename_ ),
- &setfilename_impl ));
- commandsMap.insert( commandsMapT::value_type( string( _stop_recording_ ),
- &stoprecording_impl ));
-
- commandsMap.insert
- ( commandsMapT::value_type( string( _add_broadcast_destination_ ),
- &add_broadcast_destination_impl ));
-
- nextTimeToCheckInput.now();
- }
- void Appl::run()
- {
- while( true )
- {
- wait_for_network_event();
- // now it's time to fetch an event from queue and process it
- if( eventQueue.size() > 0 )
- {
- const Event &e = eventQueue.top(); // nearest event
- switch( e.type )
- {
- case e.send:
- execute_send_event();
- break;
- case e.load:
- execute_load_event();
- break;
- case e.broadcast:
- break;
- case e.error:
- cerr << "event error!n";
- break;
- }
- }
- }
- }
- void Appl::shutdown()
- {
- exit( 1 );
- }
- /** if there are no jobs in queue,
- we can wait in select 3 sec, for example
- */
- void Appl::wait_for_network_event()
- {
- Timeval whenNextEvent;
- // if there are no jobs in queue,
- // we can wait in select 3 sec, for example
- Timeval now;
- now.now();
- whenNextEvent = now;
- whenNextEvent.tv_sec() += 3;
- if( eventQueue.size() > 0 )
- {
- const Event &e = eventQueue.top();
- if( e.when < whenNextEvent )
- whenNextEvent = e.when;
- }
- while( true )
- {
- # ifdef DEBUG
- // cout << "diff= " << whenNextEvent << "- " << now << endl;
- # endif
- if( whenNextEvent < now ) // no time, go out
- break;
- Timeval diff = whenNextEvent - now;
- if( now < nextTimeToCheckInput ) // not yet -> check free time
- {
- // call select only if we have time for this
- if( diff.tv_sec() == 0 && diff.tv_usec() < 10000 ) // only 1/100 sec
- break;
- }
- // BTW, when to do this next time?
- nextTimeToCheckInput = now;
- nextTimeToCheckInput.tv_usec() += 200000; // next 1/5 second
- nextTimeToCheckInput.normalize();
- # ifdef DEBUG
- // cout << "Wait for the network event, diff " << diff << endl;
- # endif
- // we have time. call select
- fd_set set;
- FD_ZERO( &set );
- if( controlSocket != -1 )
- FD_SET( controlSocket, &set );
- // we may have been waiting for incoming connection
- if( acceptWaitSocket != -1 )
- FD_SET( acceptWaitSocket, &set );
- if( incomingDataSocket != -1 )
- FD_SET( incomingDataSocket, &set );
- int ret = select( FD_SETSIZE, &set, NULL, NULL, &diff.val() );
- if( -1 == ret )
- {
- cerr << "line " << __LINE__ << endl;
- perror ("select");
- exit (1);
- }
- if( 0 == ret ) // timeout
- break;
- // ret holds the value of waiting connections. use FD_ISSET
- if( FD_ISSET( controlSocket, &set ))
- {
- readControlData();
- ret --; // tell that one less is left
- }
- if( ret > 0 && acceptWaitSocket != -1 &&
- FD_ISSET( acceptWaitSocket, &set ))
- {
- acceptNewConnection();
- ret --; // tell that one less is left
- }
- if( ret > 0 && incomingDataSocket != -1 &&
- FD_ISSET( incomingDataSocket, &set ))
- {
- loadIncomingData();
- ret --; // tell that one less is left
- }
- now.now();
- }
- }
- void Appl::readControlData()
- {
- # ifdef DEBUG
- cout << "Read control data" << endl;
- # endif
- Sockaddr user_sockaddr;
- socklen_t namelen = sizeof( user_sockaddr );
-
- char *buf = new char[ 1024 * 16 ];
- int len = recvfrom( controlSocket, buf, 1024 * 16, 0,
- (struct sockaddr *) &user_sockaddr.addr, &namelen );
- if( -1 == len ) // nothing
- {
- delete buf;
- return;
- }
- parse_command( buf, user_sockaddr );
- delete buf;
- }
- void Appl::acceptNewConnection()
- {
- closeOldBroadcasting();
- log( 1, "new incoming data connection" );
- unsigned int size = sizeof( incomingDataSockaddr.addr );
- incomingDataSocket = accept( acceptWaitSocket, (struct sockaddr *)
- &incomingDataSockaddr.addr, &size );
- if( incomingDataSocket < 0 )
- {
- perror( "accept" );
- incomingDataSocket = -1;
- }
- // it should operate in non-blocking mode
- int oldflags = fcntl( incomingDataSocket, F_GETFL, 0 );
- if( oldflags == -1 )
- {
- close( incomingDataSocket );
- incomingDataSocket = -1;
- perror( "fcntl" );
- return;
- }
- oldflags |= O_NONBLOCK;
- if( -1 == fcntl( incomingDataSocket, F_SETFL, oldflags ))
- {
- close( incomingDataSocket );
- incomingDataSocket = -1;
- perror( "fcntl" );
- return;
- }
- log( 1, "incoming connection from acq server accepted" );
-
- // we don't expect extra connections here:
- close( acceptWaitSocket );
- acceptWaitSocket = -1;
- }
- void Appl::closeOldBroadcasting()
- {
- if( incomingDataSocket != -1 )
- {
- close( incomingDataSocket );
- incomingDataSocket = -1;
- }
- if( videoFileHandle != -1 )
- {
- close( videoFileHandle );
- videoFileHandle = -1;
- }
- }
- void Appl::loadIncomingData()
- {
- // data should come in pseudo - packet form: header + data
- // check the buffer for reset or shift
- if( incomingDataBufferOffset == incomingDataBufferDataLength )
- {
- // reset
- incomingDataBufferOffset = incomingDataBufferDataLength = 0;
- }
- if( incomingDataBufferOffset > sizeof( incomingDataBuffer ) / 2 )
- {
- // shift
- memcpy( incomingDataBuffer, incomingDataBuffer +
- incomingDataBufferOffset, incomingDataBufferDataLength );
- incomingDataBufferOffset = 0;
- }
- // load as much as we can into buffer
- int len = read( incomingDataSocket, incomingDataBuffer +
- incomingDataBufferOffset + incomingDataBufferDataLength,
- sizeof( incomingDataBuffer ) - incomingDataBufferDataLength
- - incomingDataBufferOffset );
- if( !len ) // nothing
- return;
- if( len < 0 )
- {
- // this link is dead
- perror( "read" );
- close( incomingDataSocket );
- incomingDataSocket = -1;
- incomingDataSockaddr.addr.sin_port = 0;
- log( 1, "incoming channel is dead, wait for new connection" );
- // second step - open the incoming wait socket to accept the next
- // connection
- bindAcceptWaitSocket();
- return;
- }
- // data expands...
- incomingDataBufferDataLength += len;
- /** loop over small packets in buffer */
- while( incomingDataBufferDataLength >= sizeof( PacketHeader ))
- {
- PacketHeader *ph = (PacketHeader *)( incomingDataBuffer +
- incomingDataBufferOffset );
- if( strncmp( ph->type, mpegT, sizeof( mpegT )))
- {
- log( 1, "incoming data have unknown format" );
- ph = NULL;
- // that's not our data - 'mpeg' magic key expected
- char *ptr = incomingDataBuffer + incomingDataBufferOffset;
- // find the first char
- char *ptrend = incomingDataBuffer + incomingDataBufferOffset +
- incomingDataBufferDataLength - sizeof( mpegT );
- char ch_to_find = mpegT[0];
- while( ptr < ptrend )
- {
- if( *ptr != ch_to_find || strncmp( ptr, mpegT, sizeof( mpegT )))
- {
- ptr++;
- continue;
- }
- // that's it
- ph = (PacketHeader *) ptr;
- break;
- }
- if( ph == NULL )
- return;
- }
- // do we have the entire packet in memory?
- if( ph->packetSize > incomingDataBufferDataLength ) // not yet
- return;
- // doesn't matter what happens next - but we may adjust
- // buffer pointers now
- incomingDataBufferOffset += ph->packetSize;
- incomingDataBufferDataLength -= ph->packetSize;
- // log( 3, "got incoming packet" );
- // store data in file
- if( videoFileHandle != -1 )
- {
- int wlen = write( videoFileHandle, ((char*)ph) + ph->dataOffset,
- ph->dataSize );
- if( -1 == wlen )
- {
- close( videoFileHandle );
- videoFileHandle = -1;
- perror( "write" );
- return;
- }
- }
- // now, if we have broadcast - we must put data in queue
- // broadcast is the special user
- if( broadcast_user != NULL )
- {
- // send the entire packet as-is
- RemoteUser::outputSocketsT::iterator sock_it;
- # ifdef DEBUG
- log( 3, "sending data to %d targets",
- broadcast_user->outputSockets.size() );
- # endif
- for( sock_it = broadcast_user->outputSockets.begin(); sock_it !=
- broadcast_user->outputSockets.end(); sock_it++ )
- {
- OutputDestination *dest = (*sock_it).second;
- if( dest->sock == -1 ) // set to error
- {
- log( 1, "sending data packet: socket is closed" );
- continue;
- }
- int size = sendto( dest->sock, ph, ph->packetSize, 0,
- (struct sockaddr *)&(*sock_it).first.addr,
- sizeof( (*sock_it).first.addr ) );
- if( size == -1 )
- {
- log( 1, "sending data packet to %s failed: %s",
- (*sock_it).first.get_description().c_str(),
- sys_errlist[ errno ] );
- // do not close the socket - client should bind it
- }
- }
- }
- } // loop over packets
- }
- void Appl::execute_send_event()
- {
- if( eventQueue.size() == 0 ) // nothing to do
- return;
-
- const Event e = eventQueue.top(); // nearest event
- if( e.type != e.send ) // how this happens?
- return;
- // and pop immediately
- eventQueue.pop();
- if( users.find( e.userSocketAddr ) == users.end() )
- {
- cout << "No such user (send event)" << endl;
- return;
- }
- RemoteUser &user = *users[ e.userSocketAddr ];
- # ifdef DEBUG
- {
- static Timeval prev;
- cout << "send event. diff from prev is: " << e.when - prev << endl;
- prev = e.when;
- }
- # endif
- if( !user.jobs.size() )
- {
- // this is not the error, just nothing to do
- cout << "this user has the empty job queue (send event)" << endl;
- return;
- }
- // current job
- JobElem &job = user.jobs.front();
-
- // packet to send
- if( !user.dataToSend.size() )
- {
- cout << "this user has the empty send queue" << endl;
- // and create new load event
- Event newLe( Event::load );
- newLe.userSocketAddr = user.socketAddr;
- newLe.when.now().tv_usec() += 1000;
- eventQueue.push( newLe );
- # ifdef DEBUG
- cout << "RELOAD at " << newLe.when << " top is " <<
- eventQueue.top().when << endl;
- # endif
- // and new send event
- Event newSe( Event::send );
- newSe.userSocketAddr = e.userSocketAddr;
- newSe.when = newLe.when;
- newSe.when.tv_usec() += 1000;
- eventQueue.push( newSe );
- return;
- }
- PacketHeader *data = user.dataToSend.front();
- // and pop immediately
- user.dataToSend.pop();
- // put to inused buffer now
- inusedBuffers.push( data );
- if( !data )
- {
- cout << "bad data pointer" << endl;
- return;
- }
-
- // this command actually send data to client
- if( !job.send_packet( user, data ))
- {
- // strong policy - can't send - remove from the queue...
- // client can ask to send more data
- return;
- }
- // calculate time to play next packet
- Timeval next;
- job.calc_next_packet_time( next );
- Event newEv = e;
- newEv.type = newEv.send;
- // // try to preload a bit
- // Timeval now;
- // now.now();
- // if( next.tv_sec() > now.tv_sec() + 5 )
- // next.tv_sec() -= 5;
- // else if( next > now )
- // {
- // Timeval delta = newEv.when - now;
- // delta *= (float) 0.5;
- // next = now + delta;
- // }
- // else
- // next = now;
- newEv.when = next;
- // to make happens following load event to be 'a bit' before
- newEv.when.tv_usec() ++;
- // replace event
- eventQueue.push( newEv );
- // manage next load event
- if( user.dataToSend.size() >= user.loadDataChunk /
- user.maxPacketLength )
- return; // no need to load
- Event newLe( Event::load );
- newLe.userSocketAddr = e.userSocketAddr;
- newLe.when = next;
- # ifdef DEBUG
- cout << "top " << eventQueue.top().when << " " <<
- eventQueue.top().userSocketAddr << endl;
- # endif
- eventQueue.push( newLe );
- # ifdef DEBUG
- cout << "NEXT LOAD for port " << user.outputSocket << " at "
- << newLe.when << "addr " << user.socketAddr << endl;
- cout << "top " << eventQueue.top().when << " " <<
- eventQueue.top().userSocketAddr << endl;
- # endif
- }
- void Appl::execute_load_event()
- {
- if( eventQueue.size() == 0 ) // nothing to do
- return;
- const Event e = eventQueue.top(); // nearest event
- if( e.type != e.load )
- return;
- // and pop
- eventQueue.pop();
- if( users.find( e.userSocketAddr ) == users.end() )
- {
- cout << "No such user (load event)" << endl;
- return;
- }
-
- RemoteUser &user = *users[ e.userSocketAddr ];
- # ifdef DEBUG
- {
- Timeval now;
- now.now().tv_usec() += 5000;
- cout << "LOAD event for port " << user.outputSocket << " time "
- << e.when << "addr " << user.socketAddr << endl;
- }
- assert( user.socketAddr == e.userSocketAddr );
- # endif
- if( !user.jobs.size() )
- {
- cout << "this user has the empty job queue (load event)" << endl;
- return;
- }
-
- // current job
- JobElem &job = user.jobs.front();
- // if fp == NULL -> something wrong, don't load and remove queue
- if( !job.valid() )
- {
- cout << "file descr is zero, file " << job.filename.c_str() << endl;
- return;
- }
- // load data
- if( !job.load_packet( user, inusedBuffers ))
- {
- return;
- }
- }
- void Appl::log( int level, const char *s, ... )
- {
- if( level > verbose_level || !log_output )
- return;
- Timeval tv;
- tv.now();
- va_list ap;
- va_start( ap, s );
- char buf[1024];
- if( s[0] )
- {
- vsnprintf( buf, sizeof( buf ), s, ap );
- *log_output << tv << buf << endl;
- }
- else
- *log_output << endl;
- va_end( ap );
- }