vpusher.h
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:10k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
- /**@name Pusher
- *
- *
- *
- *
- *
- * @memo Video data push server
- * @version 1.0
- * @author Andrew Shuvalov
- */
- #ifndef _pusher_h_
- #define _pusher_h_
- #include <sys/time.h>
- #include <string>
- #include <queue>
- #include <deque>
- #include <map>
- #include <stack>
- #include <strstream>
- #include "timeval.h"
- #include "packet_header.h"
- /** event with the type and timestamp attached.
- Put events in priority queue
- */
- class Event {
- public:
- /** possible types
- */
- typedef enum { error, send, load, broadcast } typeT;
- typeT type;
- /** when type is broadcast - undefined
- */
- Sockaddr userSocketAddr;
- Timeval when;
- public:
- Event( typeT t ) : type( t ) {}
- Event( const Event &e )
- {
- *this = e;
- }
- const Event &operator =( const Event &e ) {
- if( this != &e ) {
- type = e.type;
- userSocketAddr = e.userSocketAddr;
- when = e.when;
- }
- return *this;
- }
- /** the compare op gets result in reverse order:
- the biggest element has the nearest execution time.
- It is needed for priority queue */
- bool operator <( const Event &e ) const {
- return e.when < when;
- }
- };
- class RemoteUser;
- // Note: don't use reference users[ addr ], because red-black tree
- // does not garantee that ref remains valid
- typedef map< Sockaddr, RemoteUser * > usersT;
- /** Each user have a queue of jobs
- */
- class JobElem {
- public:
- string filename;
- string fullpath;
- Timeval start;
- Timeval stop;
- Timeval playFrom;
- protected:
- //: read from file
- FILE *fp;
- unsigned fileLength;
- // control flow things:
- /** start time in local time */
- Timeval localStartTime;
- /** start time in movie time */
- Timeval movieStartTime;
- /** start file pos */
- unsigned startOffset;
- /** data sent since last local StartTime setup */
- unsigned datasent;
- /** data loaded since last local StartTime setup */
- unsigned dataloaded;
- /** bytes per second */
- unsigned datarate;
- public:
- JobElem() : fp( NULL ) {}
- JobElem( RemoteUser &user, istrstream &input, bool firstFile,
- struct tm &firstt );
- JobElem( const JobElem &j ) {
- *this = j;
- }
- const JobElem &operator =( const JobElem &j ) {
- if( this != &j ) {
- filename = j.filename;
- fullpath = j.fullpath;
- start = j.start;
- stop = j.stop;
- playFrom = j.playFrom;
- fp = j.fp;
- fileLength = j.fileLength;
- localStartTime = j.localStartTime;
- movieStartTime = j.movieStartTime;
- startOffset = j.startOffset;
- datasent = j.datasent;
- dataloaded = j.dataloaded;
- datarate = j.datarate;
- }
- return *this;
- }
- // do nothing!, because destr is called when stack is reorganized
- ~JobElem() {}
- void cleanup()
- {
- if( fp )
- fclose( fp );
- fp = NULL;
- }
- bool valid() { return fp != NULL; }
- void restart_count( unsigned loadDataChunk );
- bool send_packet( RemoteUser &user, PacketHeader *data );
- void calc_next_packet_time( Timeval &t );
- bool load_packet( RemoteUser &user,
- stack< PacketHeader * > &inusedBuffers );
- };
/** Everything that is associated with one particular broadcast
    destination. */
struct OutputDestination {
  /** socket of this destination */
  int sock;
};
- /** one record per each remote user; Note that the broadcast stream from
- channel and broadcast channels output from this stream is the user too.
- */
- class RemoteUser {
- public:
- /** unique user id = socket */
- Sockaddr socketAddr;
- /** socket to write data to this user, ( write-only socket) */
- int outputSocket;
- typedef map< Sockaddr, OutputDestination * > outputSocketsT;
- /** if user is actually the broadcast destination - it's outputSocket remains
- -1, but the vector of outputSockets is used. For regular user
- outputSockets map is empty */
- outputSocketsT outputSockets;
- int priority;
- /** queue of jobs to do. For broadcast user it's empty */
- queue< JobElem > jobs;
- /** what chunk of data to read from disk per call */
- int loadDataChunk;
- /** allocate a vector of buffers to send to user. Each block starts
- with PacketHeader.
- We assume that MPEG stream can be broken on parts, each of them
- beginning at least of 6 times 0xff ( or more, i don't know? ) */
- queue< PacketHeader * > dataToSend;
- /** sequence number is attached to user and is not related to jobs */
- unsigned sequence;
- static const unsigned maxPacketLength;
- public:
- RemoteUser( const Sockaddr &addr ) : loadDataChunk( 1024 * 128 ),
- outputSocket( -1 )
- {
- socketAddr = addr;
- }
- ~RemoteUser();
- /** cleanup everything for this user */
- void cleanup_jobs( stack< PacketHeader * > &inusedBuffers );
- /** Only for the broadcast fake user */
- void add_broadcast_channel( const Sockaddr &addr );
- };
- class Appl;
- /** the command was read from UDP socket in a single shot, so implementation
- function should accept input stream, where the command itself is
- already fetched.
- */
- typedef map< string, void (Appl::*)( istrstream &cmd, const Sockaddr &addr ) >
- commandsMapT;
- /** topmost class
- */
- class Appl {
- /** Control socket. For all sockets - not used value is -1, check for it.
- The port number for this control socket is loaded from environment
- variable PUSH1CP, where 1 is the sequence number to determine the
- control/data port pairs. If binding this port fails - try the next
- PUSH2CP and so on. Then use the same sequence number to determine
- the acceptWaitSocket value from the environment PUSH1WP (wait port).
- The value of PUSH1CP should be equal to corresponding PushServN_Control
- and value of PUSH1WP - to PushServN_BroadcastInPort properties of the
- database server. For example 7078 & 7081
- */
- int controlSocket;
- /** socket to accept incoming broadcast TCP stream. If it -1 - server
- do not expect any incoming data anymore, it should have the opened
- stream at present, from the previous acceptWaitSocket. When connection
- is established, this socket is closed, when data connection is lost,
- this socket should be reopen again
- */
- int acceptWaitSocket;
- /** remember the port number of acceptWaitSocket for future reconnections */
- int acceptWaitPortnum;
- /** use this socket to read incoming TCP data stream.
- */
- int incomingDataSocket;
- /** this single sockaddr uniquely identify the incoming data stream. It
- is initialized in acceptNewConnection().
- The fact that push server accepts only one incoming stream from the
- acquisition server in fact does not reduce its capabilities -
- Video Server expects that we have multiple push servers, even on the
- single computer
- */
- Sockaddr incomingDataSockaddr;
- /** buffer to preload incoming data from the acquisition server.
- It is not circular - reset to zero when empty, move to start when
- close to the end. The fact that all data must be saved and pumped
- out to broadcast network means that this buffer is almost empty
- */
- char incomingDataBuffer[ maxPacketSize * 10 ];
- unsigned incomingDataBufferOffset;
- unsigned incomingDataBufferDataLength;
- /** current fname to store video stream
- */
- string videoFileName;
- /** file handle to store video stream
- */
- int videoFileHandle;
- /** fill this map in constructor
- */
- commandsMapT commandsMap;
- /** list of users, organized as map, key = sockaddr */
- usersT users;
- /** this is special user, which represents the outgoing broadcasts
- connections. This must be allocated once and forever
- */
- RemoteUser *broadcast_user;
- typedef priority_queue< Event > eQueueT;
- eQueueT eventQueue;
- // how many events must be in priority queue, at least and until there
- // are jobs in users list
- int queueLowWaterMark;
- // at most
- int queueHighWaterMark;
- // stack of inused buffers, each of size maxPackLen. Recicle space
- stack< PacketHeader * > inusedBuffers;
-
- /** store the next time when input sockets must be checked. Check must be
- forced when timeout expire, even if we have no time right now.
- Broadcast input TCP stream don't loose data, but let not waste our
- buffers. Load stream at least every 1/5 second.
- We do not use the event queue for this event.
- */
- Timeval nextTimeToCheckInput;
- /** stream to output all logging */
- ostream *log_output;
- /** debug level */
- int verbose_level;
- static const char env_prefix[];
- static const char env_postfix_cp[];
- static const char env_postfix_wp[];
- private:
- /** return true if no errors */
- bool bindAcceptWaitSocket();
- public:
- Appl( int argc, char **argv );
- void run();
- void shutdown();
- /** check if we have nothing to do and wait for incoming packets or
- check for input when nextTimeToCheckInput come
- */
- void wait_for_network_event();
- /** wait_for_network_event calls this to read data */
- void readControlData();
- /** after readControlData receive command accept_stream, it open the server
- socket and wait for incoming connection. Call this when new connection
- is pending after select call.
- At this point incomingDataSocket must be zero - we may have only 1
- connection
- */
- void acceptNewConnection();
- /** as is */
- void closeOldBroadcasting();
- /** nonblock read of all pending TCP stream data at the socket.
- */
- void loadIncomingData();
- /** forced to execute the next send event. This call is also resposible
- to remove jobs from list, when done
- */
- void execute_send_event();
- /** process next event, even it is too early to do it
- if this function is called, then we have nothing to do else
- */
- void execute_load_event();
- /** parse, call callbacks, but do not delete buf
- */
- void parse_command( const char *cmd, const Sockaddr &addr );
- /** callback to parse interface commands are postfixed with _impl */
- void play_impl( istrstream &cmd, const Sockaddr &addr );
- void stop_impl( istrstream &cmd, const Sockaddr &addr );
- void setfilename_impl( istrstream &cmd, const Sockaddr &addr );
- void stoprecording_impl( istrstream &cmd, const Sockaddr &addr );
- /** no parameters passed: push server should be configured to what
- port to listen from environment
- */
- void accept_stream_impl( istrstream &cmd, const Sockaddr &addr );
- void add_broadcast_destination_impl( istrstream &cmd, const Sockaddr &addr );
- /** util */
- void trim_string( string &s );
- /** log message if its level is equal or less than verbose level */
- void log( int level, const char *s, ... );
- };
- extern Appl *app;
/* Fallback value, used only when MAX_PATH has not been defined yet. */
#if !defined( MAX_PATH )
#define MAX_PATH 1024
#endif
- #endif // _pusher_h_