protocol_thread.cpp
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:12k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. #include "stdafx.h"
  6. #include "protocol_thread.h"
  7. #include "video_thread.h"
  8. #include "text_thread.h"
  9. #include "exceptions.h"
  10. extern video_thread    *videoThread;
  11. const char protocol_thread::register_acquisition_server[] = 
  12. "registeracquisitionserver";
  13. const char protocol_thread::cmd_request_broadcast_parameters[] = 
  14. "requestbroadcastparameters";
  15. const char protocol_thread::cmd_push_server_ok[] = "pushserverok";
  16. const char protocol_thread::cmd_add_caption_noid[] = "addcaptionnoid";
  17. const int protocol_thread::retryConnectionInterval = 10; // seconds
  18. const int protocol_thread::setupConnectionTimeout = 5;
  19. static const char *help()
  20. {
  21.   return "Usage: acquisition -S database_server_addr -P port_numbern";
  22. }
  23. protocol_thread::protocol_thread( int argc, char **argv )
  24.   : protocolStream( NULL ), shutdown_now( false ), log_output( NULL ),
  25.     lock_log( "locking the log stream" ), 
  26.     lock_various_cmdstacks( "lock queues of commands to execute" ),
  27.     lock_new_text( "lock access to the new text" ),
  28.     verbose_level( 3 )
  29. {
  30.   if( argc <= 1 )
  31.     {
  32.       log( 0, "throw SyntaxException, line %d", __LINE__ );
  33.       throw SyntaxException( "%s", help() );
  34.     }
  35.   std::string s_server;  s_server.c_str();
  36.   std::string s_port;    s_port.c_str();
  37.   for( int i = 1; i < argc; i++ )
  38.     {
  39.       std::string arg( argv[i] );
  40.       if( arg == "-S" || arg == "-s" )
  41. {
  42.   if( i == argc - 1 )
  43.     {
  44.       log( 0, "throw SyntaxException, line %d", __LINE__ );
  45.       throw SyntaxException( "%s", help() );
  46.     }
  47.   s_server = argv[i+1];
  48. }
  49.       if( arg == "-P" || arg == "-p" )
  50. {
  51.   if( i == argc - 1 )
  52.     {
  53.       log( 0, "throw SyntaxException, line %d", __LINE__ );
  54.       throw SyntaxException( "%s", help() );
  55.     }
  56.   s_port = argv[i+1];
  57. }
  58.     }
  59.   if( !s_server.length() || !s_port.length() )
  60.     {
  61.       log( 0, "throw SyntaxException, line %d", __LINE__ );
  62.       throw SyntaxException( "%s", help() );
  63.     }
  64.   dbServerAddr = Sockaddr( s_server, s_port );
  65.   if( dbServerAddr.addr.sin_port == 0 )
  66.     {
  67.       log( 0, "throw NetworkException, line %d", __LINE__ );
  68.       throw NetworkException( NULL, "Server or port is incorrect" );
  69.     }
  70.   // set that connection must be restarted now
  71.   whenTryToRetryConnection.now();
  72.   // logging: if log_output is still zero - than set it to default file:
  73.   if( !log_output )
  74.     {
  75.       filebuf *fb = new filebuf();
  76.       fb->open( "acquisition.log", std::ios_base::out | std::ios_base::app );
  77.       log_output = new ostream( fb );
  78.     }
  79.   Timeval tv;
  80.   tv.now();
  81.   *log_output << "nStarting acquisition server " << tv << endl;
  82. }
  83. /** static thread - entry function. When it ends - thread ends */
  84. /*static*/ 
  85. void protocol_thread::protocol_thread_entry_func( void *_instance )
  86. {
  87.   protocol_thread *instance = (protocol_thread*) _instance;
  88.   // loop
  89.   while( true )
  90.     {
  91.       try
  92. {
  93.   instance->open_connection(); // do nothing if conn is fine
  94.   instance->handshake();
  95.   // database server may indicate some kind of default 
  96.   // target to start broadcast immediately.
  97.   instance->request_broadcast_parameters();
  98.   instance->loop();       // until some exception
  99. }
  100.       catch ( NetworkException e ) 
  101. {
  102.   instance->log( 1, e.getText().c_str() );
  103.   // on network exception - wait some time to restart the conn.
  104.   instance->cleanup_connection();
  105.   // we must restart the connection after timeout
  106.   instance->whenTryToRetryConnection.now();
  107.   instance->whenTryToRetryConnection.tv_sec() 
  108.     += retryConnectionInterval;
  109. }
  110.       catch ( ShutdownException e )
  111. {
  112.   // that means game over
  113.   instance->log( 1, e.getText().c_str() );
  114.   videoThread->shutdown_requested();
  115.   break;
  116. }
  117.     }
  118.   instance->cleanup();
  119.   delete instance;
  120. }
  121. void protocol_thread::cleanup()
  122. {
  123.   cleanup_connection();
  124.   for( int i = 0; i < 20 && videoThread != NULL && 
  125.  videoThread->is_shutdown_performed() == false; i++ )
  126.     {
  127.       videoThread->shutdown_requested();
  128.       sleep( 1 );
  129.     }
  130.   // if video thread still can't shutdown it should be in blocking call -
  131.   // shutdown video board from this thread, it's ok, mutex provides
  132.   // exclusive access to the board
  133.   if( videoThread != NULL && videoThread->is_shutdown_performed() == false )
  134.     videoThread->shutdown();
  135.   if( log_output )
  136.     {
  137.       log( 1, "protocol thread cleanup" );
  138.       delete log_output;
  139.       log_output = NULL;
  140.     }
  141. }
  142. void protocol_thread::open_connection()
  143. {
  144.   // check for time to start or restart the connection
  145.   if( protocolStream )
  146.     return;
  147.   Timeval test;
  148.   test.now();
  149.   if( test.tv_sec() < whenTryToRetryConnection.tv_sec() )
  150.     {
  151.       sleep( 1 );
  152.       return;
  153.     }
  154. # ifdef WIN32
  155.   WORD wVersionRequested = MAKEWORD( 2, 0 );
  156.   WSADATA wsaData;
  157.   int err = WSAStartup( wVersionRequested, &wsaData );
  158.   if( err != 0 )
  159.     {
  160.       log( 0, "cannot init Windows socket, line %d", __LINE__ );
  161.       throw NetworkException( NULL, "cannot init Windows socket" );
  162.     }
  163. # endif // WIN32
  164. # ifndef WIN32
  165.   int
  166. # else
  167.   SOCKET
  168. # endif
  169.     dbSocket = socket (PF_INET, SOCK_STREAM, 0);
  170.   if( dbSocket < 0 )
  171.     {
  172.       dbSocket = 0;
  173.       log( 0, "cannot create socket" );
  174. #     ifndef WIN32
  175.       log( 0, "throw NetworkException, line %d", __LINE__ );
  176.       throw NetworkException( NULL, "socket: %s", sys_errlist[ errno ] );
  177. #     else
  178.       throw NetworkException( NULL, "socket: %s, err code %d", 
  179.       sys_errlist[ errno ], WSAGetLastError() );
  180. #     endif
  181.     }
  182.   // now connect
  183.   if( 0 != ::connect( dbSocket, (struct sockaddr *) &dbServerAddr, 
  184.       sizeof (dbServerAddr)))
  185.     { 
  186.       dbSocket = 0;
  187.       log( 1, "connect to database server failed" );
  188.       throw NetworkException( NULL, "connect: %s", sys_errlist[ errno ] );
  189.     }
  190.   // test if sock is readable
  191.   fd_set set;
  192.   FD_ZERO( &set );
  193.   FD_SET( dbSocket, &set );
  194.   Timeval timeout( setupConnectionTimeout, 0 );
  195.   
  196.   int ret = select( FD_SETSIZE, &set, NULL, NULL, &timeout.val() );
  197.   if( ret == -1 )
  198.     {
  199.       log( 0, "throw NetworkException, line %d", __LINE__ );
  200.       throw NetworkException( NULL, "select: %s", sys_errlist[ errno ] );
  201.     }
  202.   if( !ret )  // timeval expires
  203.     {
  204.       log( 0, "throw NetworkException, line %d", __LINE__ );
  205.       throw NetworkException( NULL, "timeout expired, not connected" );
  206.     }
  207.   protocolStream = new protocol_stream( dbSocket, setupConnectionTimeout );
  208.   // expect the prompt
  209.   protocolStream->skip_to_prompt();
  210.   log( 1, "connection successful" );
  211. }
  212. void protocol_thread::cleanup_connection()
  213. {
  214.   if( protocolStream )
  215.     delete protocolStream;
  216.   protocolStream = NULL;
  217. }
  218. /** handshake with server */
  219. void protocol_thread::handshake()
  220. {
  221.   if( !protocolStream )
  222.     {
  223.       log( 0, "throw NetworkException, line %d", __LINE__ );
  224.       throw NetworkException( NULL, "not connected" );
  225.     }
  226.   *protocolStream << register_acquisition_server << "n";
  227.   protocolStream->wait_input();
  228.   *protocolStream >> acquisition_server_id;
  229.   protocolStream->skip_to_prompt();
  230.   log( 1, "handshake success, id is %d", acquisition_server_id );
  231. }
  232. void protocol_thread::request_broadcast_parameters()
  233. {
  234.   *protocolStream << cmd_request_broadcast_parameters << "n";
  235.   protocolStream->wait_input();
  236.   // receive number of broadcast targets
  237.   std::string targ_num_s;
  238.   *protocolStream >> targ_num_s;
  239.   int targ_num = 0; 
  240.   if( targ_num_s.length() > 0 )
  241.     targ_num = atoi( targ_num_s.c_str() );
  242.   log( 1, "%d broadcast targets", targ_num );
  243.   for( int i = 0; i < targ_num; i++ )
  244.     {
  245.       std::string s_id;
  246.       *protocolStream >> s_id;
  247.       int id = atoi( s_id.c_str() );
  248.       // receive address and port number
  249.       std::string addr;
  250.       *protocolStream >> addr;
  251.       std::string control_portnum;
  252.       *protocolStream >> control_portnum;
  253.       int c_port = atoi( control_portnum.c_str() );
  254.       Sockaddr control( addr, c_port );
  255.       std::string broadcast_portnum;
  256.       *protocolStream >> broadcast_portnum;
  257.       int b_port = atoi( broadcast_portnum.c_str() );
  258.       Sockaddr broadcast( addr, b_port );
  259.       // add, but not connect
  260.       // add this as pending target: the connection involve reconnect, so 
  261.       // don't try to connect right here: this function only place that 
  262.       // new target into the queue
  263.       videoThread->add_broadcast_target( id, control, broadcast );
  264.       log( 2, "broadcast target: %s, control %d, broadcast %d", 
  265.    addr.c_str(), control_portnum.c_str(), broadcast_portnum.c_str() );
  266.     }
  267.   protocolStream->skip_to_prompt();
  268. }
  269. /** never exit, work until some exception */
  270. void protocol_thread::loop()
  271. {
  272.   while( true )
  273.     {
  274.       if( shutdown_now )
  275. {
  276.   log( 0, "throw ShutdownException, line %d", __LINE__ );
  277.   throw ShutdownException( "shutdown by flag" );
  278. }
  279.       // look for command queue
  280.       dump_log();
  281.       // and look for any pending input
  282.       if( protocolStream->test_for_any_pending_input( 100 ) )
  283. process_unknown_input();
  284.       dump_log();
  285.       // pop commands from stack
  286.       lookup_cmd_stack();
  287.       // do we have text to transfer into the database?
  288.       check_new_text();
  289. #     ifdef _DEBUG
  290.       static ask_interrupt_cnt = 0;
  291.       if( ++ask_interrupt_cnt > 200 )
  292. {
  293.   ask_interrupt_cnt = 0;
  294.   printf( "do you wants to continue? (Y/n)" );
  295.   int ch = getchar();
  296.   if( ch == 'n' || ch == 'N' )
  297.     {
  298.       log( 0, "throw ShutdownException, line %d", __LINE__ );
  299.       throw ShutdownException( "shutdown from keyboard" );
  300.     }
  301. }
  302. #     endif
  303.     }
  304. }
  305. void protocol_thread::process_unknown_input()
  306. {
  307.   std::vector< std::string > input;
  308.   *protocolStream >> input;
  309.   if( !input.size() ) // nothing
  310.     {
  311.       dump_log();
  312.       return;
  313.     }
  314.   // if this is command, it will starts with word 'command'
  315.   if( !protocolStream->is_command( input[0] ))
  316.     {
  317.       log( 0, "throw SyntaxException, line %d", __LINE__ );
  318.       throw SyntaxException( "expected command, got %s", input[0].c_str() );
  319.     }
  320. }
  321. void protocol_thread::sleep( int sec )
  322. {
  323. # ifdef _WIN32
  324.   Sleep( sec * 1000 );
  325. # endif
  326. }
  327. void protocol_thread::lookup_cmd_stack()
  328. {
  329.   wait_for_mutex wm( lock_various_cmdstacks, 5 );
  330.   if( connected_push_servers.size() > 0 )
  331.     {
  332.       int id = connected_push_servers.front();
  333.       log( 3, "cmd: report push server #%d ok to db server", id );
  334.       connected_push_servers.pop();
  335.       char buf[10];
  336.       _itoa( id, buf, 10 );
  337.       *protocolStream << cmd_push_server_ok << "n" << buf << "n";
  338.       protocolStream->wait_input();
  339.       // e are actually not interested about the result
  340.       protocolStream->skip_to_prompt();
  341.     }
  342. }
  343. void protocol_thread::check_new_text()
  344. {
  345.   text_data *td = NULL;
  346.   {
  347.     // this is shared data:
  348.     wait_for_mutex wm( lock_new_text, 1 );
  349.     if( !new_text.size() )
  350.       return; // nothing
  351.     td = new_text.front();
  352.     new_text.pop();
  353.     // mutex is released here
  354.   }
  355.   // send this data to the database server
  356.   *protocolStream << cmd_add_caption_noid << "n" << td->get_time() << "n"
  357.   << td->get_text() << "n";
  358.   protocolStream->skip_to_prompt();
  359.   delete td;
  360. }
  361. void protocol_thread::shutdown_requested()
  362. {
  363.   log( 3, "protocol thread: shutdown requested" );
  364.   shutdown_now = true;
  365.   if( videoThread )
  366.     videoThread->shutdown_requested();
  367. }
  368. void protocol_thread::report_push_serv_ok( int push_id )
  369. {
  370.   log( 3, "report push server #%d ok to db server", push_id );
  371.   wait_for_mutex wm( lock_various_cmdstacks, 5 );
  372.   connected_push_servers.push( push_id );
  373. }
  374. void protocol_thread::new_text_data( text_data *data )
  375. {
  376.   wait_for_mutex wm( lock_new_text, 1 );
  377.   new_text.push( data );
  378. }
  379. void protocol_thread::dump_log()
  380. {
  381.   if( !log_output )
  382.     return;
  383.   wait_for_mutex wm( lock_log, 1 );
  384.   std::vector< std::string > msgs;
  385.   protocolStream->append_messages( msgs );
  386.   // dump
  387.   for( int i = 0; i < msgs.size(); i++ )
  388.     {
  389.       *log_output << msgs[i].c_str() << endl;
  390.     }
  391. }
  392. void protocol_thread::log( int level, const char *s, ... )
  393. {
  394.   if( level > verbose_level || !log_output )
  395.     return;
  396.   wait_for_mutex wm( lock_log, 1 );
  397.   Timeval tv;
  398.   tv.now();
  399.   va_list ap;
  400.   va_start( ap, s );
  401.   char buf[1024];
  402.   vsnprintf( buf, sizeof( buf ), s, ap );
  403.   *log_output << tv << buf << endl;
  404.   va_end( ap );
  405. }