vpusher.h
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:10k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. /**@name Pusher
  6.  *
  7.  *
  8.  *
  9.  *
  10.  *
  11.  * @memo    Video data push server
  12.  * @version 1.0
  13.  * @author  Andrew Shuvalov
  14.  */
  15. #ifndef _pusher_h_
  16. #define _pusher_h_
  17. #include <sys/time.h>
  18. #include <string>
  19. #include <queue>
  20. #include <deque>
  21. #include <map>
  22. #include <stack>
  23. #include <strstream> 
  24. #include "timeval.h"
  25. #include "packet_header.h"
  26. /** event with the type and timestamp attached. 
  27.     Put events in priority queue
  28. */
  29. class Event {
  30. public:
  31.   /** possible types
  32.    */
  33.   typedef enum { error, send, load, broadcast } typeT;
  34.   typeT type;
  35.   /** when type is broadcast - undefined 
  36.    */
  37.   Sockaddr userSocketAddr;
  38.   Timeval when;
  39. public:
  40.   Event( typeT t ) : type( t ) {}
  41.   Event( const Event &e ) 
  42.     {
  43.       *this = e;
  44.     }
  45.   const Event &operator =( const Event &e ) {
  46.     if( this != &e ) {
  47.       type = e.type;
  48.       userSocketAddr = e.userSocketAddr;
  49.       when = e.when;
  50.     }
  51.     return *this;
  52.   }
  53.   /** the compare op gets result in reverse order:
  54.       the biggest element has the nearest execution time.
  55.       It is needed for priority queue */
  56.   bool operator <( const Event &e ) const {
  57.     return e.when < when;
  58.   }
  59. };
  60. class RemoteUser;
  61. // Note: don't use reference users[ addr ], because red-black tree
  62. // does not garantee that ref remains valid 
  63. typedef map< Sockaddr, RemoteUser * > usersT;
  64. /** Each user have a queue of jobs
  65.  */
  66. class JobElem {
  67. public:
  68.   string filename;
  69.   string fullpath;
  70.   Timeval start;
  71.   Timeval stop;
  72.   Timeval playFrom;
  73. protected:
  74.   //: read from file
  75.   FILE *fp;
  76.   unsigned fileLength;
  77.   // control flow things:
  78.   /** start time in local time */
  79.   Timeval localStartTime;
  80.   /** start time in movie time */
  81.   Timeval movieStartTime;
  82.   /** start file pos */
  83.   unsigned startOffset;
  84.   /** data sent since last local StartTime setup */
  85.   unsigned datasent;
  86.   /** data loaded since last local StartTime setup */
  87.   unsigned dataloaded;
  88.   /** bytes per second  */
  89.   unsigned datarate;
  90. public:
  91.   JobElem() : fp( NULL ) {}
  92.   JobElem( RemoteUser &user, istrstream &input, bool firstFile,
  93.    struct tm &firstt );
  94.   JobElem( const JobElem &j ) {
  95.     *this = j;
  96.   }
  97.   const JobElem &operator =( const JobElem &j ) {
  98.     if( this != &j ) {
  99.       filename = j.filename;
  100.       fullpath = j.fullpath;
  101.       start = j.start;
  102.       stop = j.stop;
  103.       playFrom = j.playFrom; 
  104.       fp = j.fp;
  105.       fileLength = j.fileLength;
  106.       localStartTime = j.localStartTime;
  107.       movieStartTime = j.movieStartTime;
  108.       startOffset = j.startOffset;
  109.       datasent = j.datasent;
  110.       dataloaded = j.dataloaded;
  111.       datarate = j.datarate;
  112.     }
  113.     return *this;
  114.   }
  115.   // do nothing!, because destr is called when stack is reorganized
  116.   ~JobElem() {}
  117.   void cleanup()
  118.     {
  119.       if( fp )
  120. fclose( fp );
  121.       fp = NULL;
  122.     }
  123.   bool valid() { return fp != NULL; }
  124.   void restart_count( unsigned loadDataChunk );
  125.   bool send_packet( RemoteUser &user, PacketHeader *data );
  126.   void calc_next_packet_time( Timeval &t );
  127.   bool load_packet( RemoteUser &user, 
  128.     stack< PacketHeader * > &inusedBuffers );
  129. };
  130. /** all data associated with particular broadcast destination */
  131. struct OutputDestination {
  132.   int sock;
  133. };
  134. /** one record per each remote user; Note that the broadcast stream from
  135.     channel and broadcast channels output from this stream is the user too.
  136.  */
  137. class RemoteUser {
  138. public:
  139.   /** unique user id = socket */
  140.   Sockaddr socketAddr;
  141.   /** socket to write data to this user, ( write-only socket) */
  142.   int outputSocket;
  143.   typedef map< Sockaddr, OutputDestination * > outputSocketsT;
  144.   /** if user is actually the broadcast destination - it's outputSocket remains
  145.       -1, but the vector of outputSockets is used. For regular user 
  146.       outputSockets map is empty */
  147.   outputSocketsT outputSockets;
  148.   int priority;
  149.   /** queue of jobs to do. For broadcast user it's empty */
  150.   queue< JobElem > jobs;
  151.   /** what chunk of data to read from disk per call */
  152.   int loadDataChunk;
  153.   /**   allocate a vector of buffers to send to user. Each block starts 
  154. with PacketHeader. 
  155.         We assume that MPEG stream can be broken on parts, each of them 
  156. beginning at least of 6 times 0xff ( or more, i don't know? ) */
  157.   queue< PacketHeader * > dataToSend;
  158.   /**  sequence number is attached to user and is not related to jobs */
  159.   unsigned sequence;
  160.   static const unsigned maxPacketLength;
  161. public:
  162.   RemoteUser( const Sockaddr &addr ) : loadDataChunk( 1024 * 128 ),
  163.     outputSocket( -1 )
  164.     {
  165.       socketAddr = addr;
  166.     }
  167.   ~RemoteUser();
  168.   /**  cleanup everything for this user */
  169.   void cleanup_jobs( stack< PacketHeader * > &inusedBuffers );
  170.   /** Only for the broadcast fake user */
  171.   void add_broadcast_channel( const Sockaddr &addr );
  172. };
  173. class Appl;
  174. /** the command was read from UDP socket in a single shot, so implementation
  175.     function should accept input stream, where the command itself is 
  176.     already fetched.
  177. */
  178. typedef map< string, void (Appl::*)( istrstream &cmd, const Sockaddr &addr ) >
  179. commandsMapT;
  180. /** topmost class
  181.  */
  182. class Appl {
  183.   /** Control socket. For all sockets - not used value is -1, check for it.
  184.       The port number for this control socket is loaded from environment
  185.       variable PUSH1CP, where 1 is the sequence number to determine the 
  186.       control/data port pairs. If binding this port fails - try the next
  187.       PUSH2CP and so on. Then use the same sequence number to determine 
  188.       the acceptWaitSocket value from the environment PUSH1WP (wait port).
  189.       The value of PUSH1CP should be equal to corresponding PushServN_Control
  190.       and value of PUSH1WP - to PushServN_BroadcastInPort properties of the
  191.       database server. For example 7078 & 7081
  192.    */
  193.   int controlSocket;
  194.   /** socket to accept incoming broadcast TCP stream. If it -1 - server
  195.       do not expect any incoming data anymore, it should have the opened
  196.       stream at present, from the previous acceptWaitSocket. When connection 
  197.       is established, this socket is closed, when data connection is lost,
  198.       this socket should be reopen again
  199.   */
  200.   int acceptWaitSocket;
  201.   /** remember the port number of acceptWaitSocket for future reconnections */
  202.   int acceptWaitPortnum;
  203.   /** use this socket to read incoming TCP data stream. 
  204.    */
  205.   int incomingDataSocket;
  206.   /** this single sockaddr uniquely identify the incoming data stream. It
  207.       is initialized in acceptNewConnection().
  208.       The fact that push server accepts only one incoming stream from the 
  209.       acquisition server in fact does not reduce its capabilities - 
  210.       Video Server expects that we have multiple push servers, even on the 
  211.       single computer
  212.   */
  213.   Sockaddr incomingDataSockaddr;
  214.   /** buffer to preload incoming data from the acquisition server.
  215.       It is not circular - reset to zero when empty, move to start when
  216.       close to the end. The fact that all data must be saved and pumped
  217.       out to broadcast network means that this buffer is almost empty
  218.   */
  219.   char incomingDataBuffer[ maxPacketSize * 10 ];
  220.   unsigned incomingDataBufferOffset;
  221.   unsigned incomingDataBufferDataLength;
  222.   /** current fname to store video stream
  223.    */
  224.   string videoFileName;
  225.   /** file handle to store video stream
  226.    */
  227.   int videoFileHandle;
  228.   /** fill this map in constructor 
  229.    */
  230.   commandsMapT commandsMap;
  231.   /** list of users, organized as map, key = sockaddr */
  232.   usersT users;
  233.   /** this is special user, which represents the outgoing broadcasts 
  234.       connections. This must be allocated once and forever
  235.   */
  236.   RemoteUser *broadcast_user;
  237.   typedef priority_queue< Event > eQueueT;
  238.   eQueueT eventQueue;
  239.   //  how many events must be in priority queue, at least and until there 
  240.   //  are jobs in users list
  241.   int queueLowWaterMark;
  242.   // at most
  243.   int queueHighWaterMark;
  244.   // stack of inused buffers, each of size maxPackLen. Recicle space 
  245.   stack< PacketHeader * > inusedBuffers;
  246.   
  247.   /** store the next time when input sockets must be checked. Check must be
  248.       forced when timeout expire, even if we have no time right now.
  249.       Broadcast input TCP stream don't loose data, but let not waste our
  250.       buffers. Load stream at least every 1/5 second.
  251.       We do not use the event queue for this event.
  252.   */
  253.   Timeval nextTimeToCheckInput;
  254.   /** stream to output all logging */
  255.   ostream *log_output;
  256.   /** debug level */
  257.   int verbose_level;
  258.   static const char env_prefix[];
  259.   static const char env_postfix_cp[];
  260.   static const char env_postfix_wp[];
  261. private:
  262.   /** return true if no errors */
  263.   bool bindAcceptWaitSocket();
  264. public:
  265.   Appl( int argc, char **argv );
  266.   void run();
  267.   void shutdown();
  268.   /** check if we have nothing to do and wait for incoming packets or 
  269.       check for input when nextTimeToCheckInput come
  270.    */
  271.   void wait_for_network_event();
  272.   /** wait_for_network_event calls this to read data */
  273.   void readControlData();
  274.   /** after readControlData receive command accept_stream, it open the server
  275.       socket and wait for incoming connection. Call this when new connection
  276.       is pending after select call.
  277.       At this point incomingDataSocket must be zero - we may have only 1
  278.       connection
  279.   */
  280.   void acceptNewConnection();
  281.   /** as is */
  282.   void closeOldBroadcasting();
  283.   /** nonblock read of all pending TCP stream data at the socket.
  284.    */
  285.   void loadIncomingData();
  286.   /** forced to execute the next send event. This call is also resposible 
  287.       to remove jobs from list, when done
  288.   */
  289.   void execute_send_event();
  290.   /** process next event, even it is too early to do it
  291.       if this function is called, then we have nothing to do else
  292.   */
  293.   void execute_load_event();
  294.   /** parse, call callbacks, but do not delete buf
  295.    */
  296.   void parse_command( const char *cmd, const Sockaddr &addr );
  297.   /** callback to parse interface commands are postfixed with _impl */
  298.   void play_impl( istrstream &cmd, const Sockaddr &addr );
  299.   void stop_impl( istrstream &cmd, const Sockaddr &addr );
  300.   void setfilename_impl( istrstream &cmd, const Sockaddr &addr );
  301.   void stoprecording_impl( istrstream &cmd, const Sockaddr &addr );
  302.   /** no parameters passed: push server should be configured to what 
  303.       port to listen from environment
  304.    */
  305.   void accept_stream_impl( istrstream &cmd, const Sockaddr &addr );
  306.   void add_broadcast_destination_impl( istrstream &cmd, const Sockaddr &addr );
  307.   /** util */
  308.   void trim_string( string &s );
  309.   /** log message if its level is equal or less than verbose level */
  310.   void log( int level, const char *s, ... );
  311. };
  312. extern Appl *app;
  313. #ifndef MAX_PATH
  314. #define MAX_PATH 1024
  315. #endif
  316. #endif   //  _pusher_h_