vpusher.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:20k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. #include <stdio.h>
  6. #include <errno.h>
  7. #if HAVE_UNISTD_H
  8. #include <unistd.h>
  9. #endif
  10. #include <stdlib.h>
  11. #include <sys/socket.h>
  12. #include <sys/un.h> 
  13. #include <netinet/in.h> 
  14. #include <signal.h>
  15. #include <assert.h>
  16. #include <sys/stat.h>
  17. #include <fcntl.h>
  18. #include <string> 
  19. #include <strstream> 
  20. #include <iostream>
  21. #include <fstream>
  22. #include "sockaddr.h"
  23. #include "vpusher.h"
  24. #include "../../client/src/play_commands.h"
  25. #include "packet_header.h"
  26. //#define DEBUG
  27. Appl *app;
  28. const unsigned RemoteUser::maxPacketLength = 1472;
  29. const char Appl::env_prefix[] = "PUSH";
  30. const char Appl::env_postfix_cp[] = "CP";
  31. const char Appl::env_postfix_wp[] = "WP";
  32. void
  33. signalHandler( int sig )
  34. {
  35.   cout << "got signal " << sig << endl;
  36.   if( app )
  37.     app->shutdown();
  38.   exit(0);
  39. }
  40. // main
  41. int main( int argc, char **argv )
  42. {
  43.   signal(SIGINT, signalHandler);
  44.   signal(SIGSEGV, signalHandler);
  45.   app = new Appl( argc, argv );
  46.   app->run();
  47. }
  48. RemoteUser::~RemoteUser()
  49. {
  50.   // close all sockets
  51.   if( outputSocket != -1 )
  52.     close( outputSocket );
  53.   if( outputSockets.size() > 0 )
  54.     for( outputSocketsT::iterator it = outputSockets.begin(); 
  55.  it != outputSockets.end(); it++ )
  56.       close( (*it).second->sock );
  57.   // clear job queue
  58.   while( !jobs.empty() )
  59.     {
  60.       jobs.front().cleanup();
  61.       jobs.pop();
  62.     }
  63.   // and all buffers
  64.   while( !dataToSend.empty() )
  65.     {
  66.       delete dataToSend.front();
  67.       dataToSend.pop();
  68.     }
  69. }
  70. void RemoteUser::cleanup_jobs( stack< PacketHeader * > &inusedBuffers )
  71. {
  72.   while( !jobs.empty() )         // clear job queue
  73.     {
  74.       jobs.front().cleanup();
  75.       jobs.pop();
  76.     }
  77.   // and all buffers
  78.   while( !dataToSend.empty() )
  79.     {
  80.       inusedBuffers.push( dataToSend.front() );
  81.       dataToSend.pop();
  82.     }
  83. }
  84. void RemoteUser::add_broadcast_channel( const Sockaddr &addr )
  85. {
  86.   // first check if this socket already exists :
  87.   outputSocketsT::iterator it = outputSockets.find( addr );
  88.   if( it == outputSockets.end() )
  89.     {
  90.       // create output socket and
  91.       int sock = socket( PF_INET, SOCK_DGRAM, 0 );
  92.       if( -1 == sock )
  93. {
  94.   app->log( 0, "socket for broadcast: %s", sys_errlist[ errno ] );
  95.   return;
  96. }
  97.       // do not connect - use sendto
  98. //        if( -1 == connect( sock, (struct sockaddr *) &addr.addr, 
  99. //    sizeof(addr.addr)))
  100. //   {
  101. //     app->log( 0, "connect broadcast: %s", sys_errlist[ errno ] );
  102. //     return;
  103. //   }
  104.       OutputDestination *dest = new OutputDestination;
  105.       dest->sock = sock;
  106.       // add this socket to the list
  107.       outputSockets[ addr ] = dest;
  108.       app->log( 3, "add new broadcast target %s", 
  109. addr.get_description().c_str() );
  110.     }
  111. }
  112. // -----------------------------------------------------------------------
  113. // private members
  114. bool Appl::bindAcceptWaitSocket()
  115. {
  116.   if( acceptWaitSocket != -1 )  // why not?
  117.     close( acceptWaitSocket );
  118.   acceptWaitSocket = socket( PF_INET, SOCK_STREAM, 0 );
  119.   if( -1 == acceptWaitSocket ) 
  120.     {
  121.       perror( "socket " );
  122.       close( acceptWaitSocket );
  123.       acceptWaitSocket = -1;
  124.       return false;
  125.     }
  126.   struct sockaddr_in name;
  127.   // if control port is already binded - the wait connection port
  128.   // should be binded as well, or this is error
  129.   name.sin_family = AF_INET;
  130.   name.sin_port = htons ( acceptWaitPortnum );
  131.   name.sin_addr.s_addr = htonl (INADDR_ANY);
  132.   if( bind( acceptWaitSocket, (struct sockaddr *)&name, sizeof (name)) < 0)
  133.     {            
  134.       perror ("bind");
  135.       close( acceptWaitSocket );
  136.       acceptWaitSocket = -1;
  137.       return false;
  138.     }
  139.   // and listen
  140.   if( listen( acceptWaitSocket, 1 ) < 0 )
  141.     {
  142.       perror ("listen");
  143.       close( acceptWaitSocket );
  144.       acceptWaitSocket = -1;
  145.       return false;
  146.     }
  147.   log( 1, "accept wait socket successfully binded, port %d", 
  148.        acceptWaitPortnum );
  149.   return true;
  150. }
  151. // public members
  152. Appl::Appl( int argc, char **argv ) :
  153.   // all sockets should be -1
  154.   controlSocket( -1 ), acceptWaitSocket( -1 ), incomingDataSocket( -1 ),
  155.   videoFileHandle( -1 ), broadcast_user( NULL ),
  156.   queueLowWaterMark( 100 ), queueHighWaterMark( 1000 ),
  157.   incomingDataBufferOffset(0), incomingDataBufferDataLength(0),
  158.   log_output( NULL ), verbose_level( 3 )
  159. {
  160.   // logging: if log_output is still zero - than set it to default file:
  161.   if( !log_output )
  162.     {
  163.       log_output = new ofstream( "push.log", ios::out | ios::app );
  164.     }
  165.   log( 0, "" );
  166.   log( 0, "start push server application" );
  167.   controlSocket = socket( PF_INET, SOCK_DGRAM, 0 );
  168.   if( -1 == controlSocket ) 
  169.     {
  170.       perror( "socket " );
  171.       exit( 1 );
  172.     }
  173.   // load control socket port from the environment
  174.   // try each possible combination in sequence
  175.   for( int sequence = 1; ; sequence++ )
  176.     {
  177.       char env[20];
  178.       sprintf( env, "%s%d%s", env_prefix, sequence, env_postfix_cp );
  179.       char *control_port_s = getenv( env );
  180.       if( control_port_s == NULL )
  181. {
  182.   cerr << "environment " << env << " is not setn";
  183.   close( controlSocket );
  184.   exit(1);
  185. }
  186.       int control_port = atoi( control_port_s );
  187.       sprintf( env, "%s%d%s", env_prefix, sequence, env_postfix_wp );
  188.       char *wait_port_s = getenv( env );
  189.       if( wait_port_s == NULL )
  190. {
  191.   cerr << "environment " << env << " is not setn";
  192.   close( controlSocket );
  193.   exit(1);
  194. }
  195.       acceptWaitPortnum = atoi( wait_port_s );
  196.       // first try the control port 
  197.       struct sockaddr_in name;
  198.       name.sin_family = AF_INET;
  199.       name.sin_port = htons ( control_port );
  200.       name.sin_addr.s_addr = htonl (INADDR_ANY);
  201.       if (bind ( controlSocket, (struct sockaddr *) &name, sizeof (name)) < 0)
  202. {            
  203.   cerr << "port " << control_port_s << " is busy, try next onen";
  204.   continue;
  205. }
  206.       // try to open this port
  207.       bool result = bindAcceptWaitSocket();
  208.       if( result == false )
  209. {
  210.   close( controlSocket );
  211.   exit(1);
  212. }
  213.       break;
  214.     }
  215.   // next - create the user for broadsast
  216.   broadcast_user = new RemoteUser( Sockaddr() );
  217.   // fill this map in constructor 
  218.   commandsMap.insert( commandsMapT::value_type( string( _play_cmd_ ), 
  219. &play_impl ));
  220.   commandsMap.insert( commandsMapT::value_type( string( _stop_cmd_ ), 
  221. &stop_impl ));
  222.   commandsMap.insert( commandsMapT::value_type( string( _accept_stream_ ), 
  223. &accept_stream_impl ));
  224.   commandsMap.insert( commandsMapT::value_type( string( _set_filename_ ), 
  225. &setfilename_impl ));
  226.   commandsMap.insert( commandsMapT::value_type( string( _stop_recording_ ), 
  227. &stoprecording_impl ));
  228.   
  229.   commandsMap.insert
  230.     ( commandsMapT::value_type( string( _add_broadcast_destination_ ), 
  231. &add_broadcast_destination_impl ));
  232.   
  233.   nextTimeToCheckInput.now();
  234. void Appl::run()
  235. {
  236.   while( true )
  237.     {
  238.       wait_for_network_event();
  239.       // now it's time to fetch an event from queue and process it
  240.       if( eventQueue.size() > 0 )
  241. {
  242.   const Event &e = eventQueue.top();           // nearest event
  243.   switch( e.type )
  244.     {
  245.     case e.send:
  246.       execute_send_event();
  247.       break;
  248.     case e.load:
  249.       execute_load_event();
  250.       break;
  251.     case e.broadcast:
  252.       break;
  253.     case e.error:
  254.       cerr << "event error!n";
  255.       break;
  256.     }
  257. }
  258.     }
  259. }
  260. void Appl::shutdown()
  261. {
  262.   exit( 1 );
  263. }
  264. /** if there are no jobs in queue,
  265.     we can wait in select 3 sec, for example
  266. */
  267. void Appl::wait_for_network_event()
  268. {
  269.   Timeval whenNextEvent;
  270.   // if there are no jobs in queue,
  271.   // we can wait in select 3 sec, for example
  272.   Timeval now;
  273.   now.now();
  274.   whenNextEvent = now;
  275.   whenNextEvent.tv_sec() += 3;
  276.   if( eventQueue.size() > 0 )
  277.     {
  278.       const Event &e = eventQueue.top();
  279.       if( e.when < whenNextEvent )
  280. whenNextEvent = e.when;
  281.     }
  282.   while( true )
  283.     {
  284. #     ifdef DEBUG
  285.       //      cout << "diff= " << whenNextEvent << "- " << now << endl;
  286. #     endif
  287.       if( whenNextEvent < now ) // no time, go out
  288. break;
  289.       Timeval diff = whenNextEvent - now;
  290.       if( now < nextTimeToCheckInput )    // not yet -> check free time
  291. {
  292.   // call select only if we have time for this
  293.   if( diff.tv_sec() == 0 && diff.tv_usec() < 10000 ) // only 1/100 sec
  294.     break;
  295. }
  296.       // BTW, when to do this next time?
  297.       nextTimeToCheckInput = now;
  298.       nextTimeToCheckInput.tv_usec() += 200000;  // next 1/5 second
  299.       nextTimeToCheckInput.normalize();
  300. #     ifdef DEBUG
  301.       //      cout << "Wait for the network event, diff " << diff << endl;
  302. #     endif
  303.       // we have time. call select
  304.       fd_set set;
  305.       FD_ZERO( &set );
  306.       if( controlSocket != -1 )
  307. FD_SET( controlSocket, &set );
  308.       // we may have been waiting for incoming connection
  309.       if( acceptWaitSocket != -1 )
  310. FD_SET( acceptWaitSocket, &set );
  311.       if( incomingDataSocket != -1 )
  312. FD_SET( incomingDataSocket, &set );
  313.       int ret = select( FD_SETSIZE, &set, NULL, NULL, &diff.val() );
  314.       if( -1 == ret )
  315. {            
  316.   cerr << "line " << __LINE__ << endl;
  317.   perror ("select");
  318.   exit (1);
  319. }
  320.       if( 0 == ret )  // timeout
  321. break;
  322.       // ret holds the value of waiting connections. use FD_ISSET
  323.       if( FD_ISSET( controlSocket, &set ))
  324. {
  325.   readControlData();
  326.   ret --;            // tell that one less is left
  327. }
  328.       if( ret > 0 && acceptWaitSocket != -1 && 
  329.   FD_ISSET( acceptWaitSocket, &set ))
  330. {
  331.   acceptNewConnection();
  332.   ret --;            // tell that one less is left
  333. }
  334.       if( ret > 0 && incomingDataSocket != -1 && 
  335.   FD_ISSET( incomingDataSocket, &set ))
  336. {
  337.   loadIncomingData();
  338.   ret --;            // tell that one less is left
  339. }
  340.       now.now();
  341.     }
  342. }
  343. void Appl::readControlData()
  344. {
  345. # ifdef DEBUG
  346.   cout << "Read control data" << endl;
  347. # endif
  348.   Sockaddr user_sockaddr;
  349.   socklen_t namelen = sizeof( user_sockaddr );
  350.       
  351.   char *buf = new char[ 1024 * 16 ];
  352.   int len = recvfrom( controlSocket, buf, 1024 * 16, 0, 
  353.       (struct sockaddr *) &user_sockaddr.addr, &namelen );
  354.   if( -1 == len )  // nothing
  355.     {
  356.       delete buf;
  357.       return;
  358.     }
  359.   parse_command( buf, user_sockaddr );
  360.   delete buf;
  361. }
  362. void Appl::acceptNewConnection()
  363. {
  364.   closeOldBroadcasting();
  365.   log( 1, "new incoming data connection" );
  366.   unsigned int size = sizeof( incomingDataSockaddr.addr );
  367.   incomingDataSocket = accept( acceptWaitSocket, (struct sockaddr *) 
  368.        &incomingDataSockaddr.addr, &size );
  369.   if( incomingDataSocket < 0 )
  370.     {
  371.       perror( "accept" );
  372.       incomingDataSocket = -1;
  373.     }
  374.   // it should operate in non-blocking mode
  375.   int oldflags = fcntl( incomingDataSocket, F_GETFL, 0 );
  376.   if( oldflags == -1 )
  377.     {
  378.       close( incomingDataSocket );
  379.       incomingDataSocket = -1;
  380.       perror( "fcntl" );
  381.       return;
  382.     }
  383.   oldflags |= O_NONBLOCK;
  384.   if( -1 == fcntl( incomingDataSocket, F_SETFL, oldflags ))
  385.     {
  386.       close( incomingDataSocket );
  387.       incomingDataSocket = -1;
  388.       perror( "fcntl" );
  389.       return;
  390.     }
  391.   log( 1, "incoming connection from acq server accepted" );
  392.   
  393.   // we don't expect extra connections here:
  394.   close( acceptWaitSocket );
  395.   acceptWaitSocket = -1;
  396. }
  397. void Appl::closeOldBroadcasting()
  398. {
  399.   if( incomingDataSocket != -1 )
  400.     {
  401.       close( incomingDataSocket );
  402.       incomingDataSocket = -1;
  403.     }
  404.   if( videoFileHandle != -1 )
  405.     {
  406.       close( videoFileHandle );
  407.       videoFileHandle = -1;
  408.     }
  409. }
  410. void Appl::loadIncomingData()
  411. {
  412.   // data should come in pseudo - packet form: header + data
  413.   // check the buffer for reset or shift
  414.   if( incomingDataBufferOffset == incomingDataBufferDataLength )
  415.     {
  416.       // reset
  417.       incomingDataBufferOffset = incomingDataBufferDataLength = 0;
  418.     }
  419.   if( incomingDataBufferOffset > sizeof( incomingDataBuffer ) / 2 )
  420.     {
  421.       // shift
  422.       memcpy( incomingDataBuffer, incomingDataBuffer + 
  423.       incomingDataBufferOffset, incomingDataBufferDataLength );
  424.       incomingDataBufferOffset = 0;
  425.     }
  426.   // load as much as we can into buffer
  427.   int len = read( incomingDataSocket, incomingDataBuffer + 
  428.   incomingDataBufferOffset + incomingDataBufferDataLength, 
  429.   sizeof( incomingDataBuffer ) - incomingDataBufferDataLength
  430.   - incomingDataBufferOffset );
  431.   if( !len )   // nothing
  432.     return;
  433.   if( len < 0 )
  434.     {
  435.       // this link is dead
  436.       perror( "read" );
  437.       close( incomingDataSocket );
  438.       incomingDataSocket = -1;
  439.       incomingDataSockaddr.addr.sin_port = 0;
  440.       log( 1, "incoming channel is dead, wait for new connection" );
  441.       // second step - open the incoming wait socket to accept the next 
  442.       // connection
  443.       bindAcceptWaitSocket();
  444.       return;
  445.     }
  446.   // data expands...
  447.   incomingDataBufferDataLength += len;
  448.   /** loop over small packets in buffer */
  449.   while( incomingDataBufferDataLength >= sizeof( PacketHeader ))
  450.     {
  451.       PacketHeader *ph = (PacketHeader *)( incomingDataBuffer + 
  452.    incomingDataBufferOffset );
  453.       if( strncmp( ph->type, mpegT, sizeof( mpegT )))
  454. {
  455.   log( 1, "incoming data have unknown format" );
  456.   ph = NULL;
  457.   // that's not our data - 'mpeg' magic key expected
  458.   char *ptr = incomingDataBuffer + incomingDataBufferOffset;
  459.   // find the first char
  460.   char *ptrend = incomingDataBuffer + incomingDataBufferOffset +
  461.     incomingDataBufferDataLength - sizeof( mpegT );
  462.   char ch_to_find = mpegT[0];
  463.   while( ptr < ptrend )
  464.     {
  465.       if( *ptr != ch_to_find || strncmp( ptr, mpegT, sizeof( mpegT )))
  466. {
  467.   ptr++;
  468.   continue;
  469. }
  470.       // that's it
  471.       ph = (PacketHeader *) ptr;
  472.       break;
  473.     }
  474.   if( ph == NULL )
  475.     return;
  476. }
  477.       // do we have the entire packet in memory?
  478.       if( ph->packetSize > incomingDataBufferDataLength ) // not yet
  479. return;
  480.       // doesn't matter what happens next - but we may adjust 
  481.       // buffer pointers now
  482.       incomingDataBufferOffset += ph->packetSize;
  483.       incomingDataBufferDataLength -= ph->packetSize;
  484.       // log( 3, "got incoming packet" );
  485.       // store data in file
  486.       if( videoFileHandle != -1 )
  487. {
  488.   int wlen = write( videoFileHandle, ((char*)ph) + ph->dataOffset, 
  489.     ph->dataSize );
  490.   if( -1 == wlen )
  491.     {
  492.       close( videoFileHandle );
  493.       videoFileHandle = -1;
  494.       perror( "write" );
  495.       return;
  496.     }
  497. }
  498.       // now, if we have broadcast - we must put data in queue
  499.       // broadcast is the special user
  500.       if( broadcast_user != NULL )
  501. {
  502.   // send the entire packet as-is
  503.   RemoteUser::outputSocketsT::iterator sock_it;
  504. #         ifdef DEBUG
  505.   log( 3, "sending data to %d targets", 
  506.        broadcast_user->outputSockets.size() );
  507. #         endif
  508.   for( sock_it = broadcast_user->outputSockets.begin(); sock_it != 
  509.  broadcast_user->outputSockets.end(); sock_it++ )
  510.     {
  511.       OutputDestination *dest = (*sock_it).second;
  512.       if( dest->sock == -1 )  // set to error
  513. {
  514.   log( 1, "sending data packet: socket is closed" ); 
  515.   continue;
  516. }
  517.       int size = sendto( dest->sock, ph, ph->packetSize, 0,
  518.  (struct sockaddr *)&(*sock_it).first.addr, 
  519.  sizeof( (*sock_it).first.addr ) );
  520.       if( size == -1 )
  521. {
  522.   log( 1, "sending data packet to %s failed: %s", 
  523.        (*sock_it).first.get_description().c_str(), 
  524.        sys_errlist[ errno ] );
  525.   // do not close the socket - client should bind it
  526. }
  527.     }
  528. }
  529.     } // loop over packets
  530. }
  531. void Appl::execute_send_event()
  532. {
  533.   if( eventQueue.size() == 0 )   // nothing to do
  534.     return;
  535.   
  536.   const Event e = eventQueue.top();           // nearest event
  537.   if( e.type != e.send )   // how this happens?
  538.     return;
  539.   // and pop immediately
  540.   eventQueue.pop();
  541.   if( users.find( e.userSocketAddr ) == users.end() ) 
  542.     {
  543.       cout << "No such user (send event)" << endl;
  544.       return;
  545.     }
  546.   RemoteUser &user = *users[ e.userSocketAddr ];
  547. # ifdef DEBUG
  548.   {
  549.     static Timeval prev;
  550.     cout << "send event. diff from prev is: " << e.when - prev << endl;
  551.     prev = e.when;
  552.   }
  553. # endif
  554.   if( !user.jobs.size() )
  555.     {
  556.       // this is not the error, just nothing to do
  557.       cout << "this user has the empty job queue (send event)" << endl;
  558.       return;
  559.     }
  560.   // current job
  561.   JobElem &job = user.jobs.front();
  562.   
  563.   // packet to send
  564.   if( !user.dataToSend.size() )
  565.     {
  566.       cout << "this user has the empty send queue" << endl;
  567.       // and create new load event
  568.       Event newLe( Event::load );
  569.       newLe.userSocketAddr = user.socketAddr;
  570.       newLe.when.now().tv_usec() += 1000;
  571.       eventQueue.push( newLe );
  572. #     ifdef DEBUG
  573.       cout << "RELOAD at " << newLe.when << " top is " << 
  574. eventQueue.top().when << endl;
  575. #     endif
  576.       // and new send event
  577.       Event newSe( Event::send );
  578.       newSe.userSocketAddr = e.userSocketAddr;
  579.       newSe.when = newLe.when;
  580.       newSe.when.tv_usec() += 1000;
  581.       eventQueue.push( newSe );
  582.       return;
  583.     }
  584.   PacketHeader *data = user.dataToSend.front();
  585.   // and pop immediately
  586.   user.dataToSend.pop();
  587.   // put to inused buffer now
  588.   inusedBuffers.push( data );
  589.   if( !data ) 
  590.     {
  591.       cout << "bad data pointer" << endl;
  592.       return;
  593.     }
  594.       
  595.   // this command actually send data to client
  596.   if( !job.send_packet( user, data ))
  597.     {
  598.       // strong policy - can't send - remove from the queue...
  599.       // client can ask to send more data
  600.       return;
  601.     }
  602.   // calculate time to play next packet
  603.   Timeval next;
  604.   job.calc_next_packet_time( next );
  605.   Event newEv = e;
  606.   newEv.type = newEv.send;
  607. //   // try to preload a bit
  608. //   Timeval now;
  609. //   now.now();
  610. //   if( next.tv_sec() > now.tv_sec() + 5 )
  611. //     next.tv_sec() -= 5;
  612. //   else if( next > now )
  613. //     {
  614. //       Timeval delta = newEv.when - now;
  615. //       delta *= (float) 0.5;
  616. //       next = now + delta;
  617. //     }
  618. //   else 
  619. //     next = now;
  620.   newEv.when = next;
  621.   // to make happens following load event to be 'a bit' before
  622.   newEv.when.tv_usec() ++;
  623.   // replace event
  624.   eventQueue.push( newEv );
  625.   // manage next load event
  626.   if( user.dataToSend.size() >= user.loadDataChunk /
  627.       user.maxPacketLength )
  628.     return; // no need to load
  629.   Event newLe( Event::load );
  630.   newLe.userSocketAddr = e.userSocketAddr;
  631.   newLe.when = next;
  632. # ifdef DEBUG
  633.   cout << "top " << eventQueue.top().when << " " << 
  634.     eventQueue.top().userSocketAddr << endl;
  635. # endif
  636.   eventQueue.push( newLe );
  637. # ifdef DEBUG
  638.   cout << "NEXT LOAD for port " << user.outputSocket << " at " 
  639.        << newLe.when << "addr " << user.socketAddr << endl;
  640.   cout << "top " << eventQueue.top().when << " " << 
  641.     eventQueue.top().userSocketAddr << endl;
  642. # endif
  643. }
  644. void Appl::execute_load_event()
  645. {
  646.   if( eventQueue.size() == 0 )   // nothing to do
  647.     return;
  648.   const Event e = eventQueue.top();           // nearest event
  649.   if( e.type != e.load )
  650.     return;
  651.   // and pop
  652.   eventQueue.pop();
  653.   if( users.find( e.userSocketAddr ) == users.end() ) 
  654.     {
  655.       cout << "No such user (load event)" << endl;
  656.       return;
  657.     }
  658.       
  659.   RemoteUser &user = *users[ e.userSocketAddr ];
  660. # ifdef DEBUG
  661.   {
  662.     Timeval now;
  663.     now.now().tv_usec() += 5000;
  664.     cout << "LOAD event for port " << user.outputSocket << " time "
  665.  << e.when << "addr " << user.socketAddr << endl;
  666.   }
  667.   assert( user.socketAddr == e.userSocketAddr );
  668. # endif
  669.   if( !user.jobs.size() )
  670.     {
  671.       cout << "this user has the empty job queue (load event)" << endl;
  672.       return;
  673.     }
  674.       
  675.   // current job
  676.   JobElem &job = user.jobs.front();
  677.   // if fp == NULL -> something wrong, don't load and remove queue
  678.   if( !job.valid() )
  679.     {
  680.       cout << "file descr is zero, file " << job.filename.c_str() << endl;
  681.       return;
  682.     }
  683.   // load data
  684.   if( !job.load_packet( user, inusedBuffers ))
  685.     {
  686.       return;
  687.     }
  688. }
  689. void Appl::log( int level, const char *s, ... )
  690. {
  691.   if( level > verbose_level || !log_output )
  692.     return;
  693.   Timeval tv;
  694.   tv.now();
  695.   va_list ap;
  696.   va_start( ap, s );
  697.   char buf[1024];
  698.   if( s[0] )
  699.     {
  700.       vsnprintf( buf, sizeof( buf ), s, ap );
  701.       *log_output << tv << buf << endl;
  702.     }
  703.   else
  704.     *log_output << endl;
  705.   va_end( ap );
  706. }