protocol_thread.cpp
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:12k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
#include "stdafx.h"
#include "protocol_thread.h"
#include "video_thread.h"
#include "text_thread.h"
#include "exceptions.h"

#include <cstdio>   // snprintf
#include <cstring>  // std::strerror
#include <fstream>  // std::ofstream

#if !defined( WIN32 ) && !defined( _WIN32 )
#  include <unistd.h>  // close(), sleep()
#endif
- extern video_thread *videoThread;
- const char protocol_thread::register_acquisition_server[] =
- "registeracquisitionserver";
- const char protocol_thread::cmd_request_broadcast_parameters[] =
- "requestbroadcastparameters";
- const char protocol_thread::cmd_push_server_ok[] = "pushserverok";
- const char protocol_thread::cmd_add_caption_noid[] = "addcaptionnoid";
- const int protocol_thread::retryConnectionInterval = 10; // seconds
- const int protocol_thread::setupConnectionTimeout = 5;
/**
 * Usage text reported through SyntaxException when the command line is
 * incomplete.
 *
 * @return pointer to a static, newline-terminated string.
 */
static const char *help()
{
    // The trailing escape had lost its backslash ("...port_numbern").
    return "Usage: acquisition -S database_server_addr -P port_number\n";
}
- protocol_thread::protocol_thread( int argc, char **argv )
- : protocolStream( NULL ), shutdown_now( false ), log_output( NULL ),
- lock_log( "locking the log stream" ),
- lock_various_cmdstacks( "lock queues of commands to execute" ),
- lock_new_text( "lock access to the new text" ),
- verbose_level( 3 )
- {
- if( argc <= 1 )
- {
- log( 0, "throw SyntaxException, line %d", __LINE__ );
- throw SyntaxException( "%s", help() );
- }
- std::string s_server; s_server.c_str();
- std::string s_port; s_port.c_str();
- for( int i = 1; i < argc; i++ )
- {
- std::string arg( argv[i] );
- if( arg == "-S" || arg == "-s" )
- {
- if( i == argc - 1 )
- {
- log( 0, "throw SyntaxException, line %d", __LINE__ );
- throw SyntaxException( "%s", help() );
- }
- s_server = argv[i+1];
- }
- if( arg == "-P" || arg == "-p" )
- {
- if( i == argc - 1 )
- {
- log( 0, "throw SyntaxException, line %d", __LINE__ );
- throw SyntaxException( "%s", help() );
- }
- s_port = argv[i+1];
- }
- }
- if( !s_server.length() || !s_port.length() )
- {
- log( 0, "throw SyntaxException, line %d", __LINE__ );
- throw SyntaxException( "%s", help() );
- }
- dbServerAddr = Sockaddr( s_server, s_port );
- if( dbServerAddr.addr.sin_port == 0 )
- {
- log( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( NULL, "Server or port is incorrect" );
- }
- // set that connection must be restarted now
- whenTryToRetryConnection.now();
- // logging: if log_output is still zero - than set it to default file:
- if( !log_output )
- {
- filebuf *fb = new filebuf();
- fb->open( "acquisition.log", std::ios_base::out | std::ios_base::app );
- log_output = new ostream( fb );
- }
- Timeval tv;
- tv.now();
- *log_output << "nStarting acquisition server " << tv << endl;
- }
- /** static thread - entry function. When it ends - thread ends */
- /*static*/
- void protocol_thread::protocol_thread_entry_func( void *_instance )
- {
- protocol_thread *instance = (protocol_thread*) _instance;
- // loop
- while( true )
- {
- try
- {
- instance->open_connection(); // do nothing if conn is fine
- instance->handshake();
- // database server may indicate some kind of default
- // target to start broadcast immediately.
- instance->request_broadcast_parameters();
- instance->loop(); // until some exception
- }
- catch ( NetworkException e )
- {
- instance->log( 1, e.getText().c_str() );
- // on network exception - wait some time to restart the conn.
- instance->cleanup_connection();
- // we must restart the connection after timeout
- instance->whenTryToRetryConnection.now();
- instance->whenTryToRetryConnection.tv_sec()
- += retryConnectionInterval;
- }
- catch ( ShutdownException e )
- {
- // that means game over
- instance->log( 1, e.getText().c_str() );
- videoThread->shutdown_requested();
- break;
- }
- }
- instance->cleanup();
- delete instance;
- }
- void protocol_thread::cleanup()
- {
- cleanup_connection();
- for( int i = 0; i < 20 && videoThread != NULL &&
- videoThread->is_shutdown_performed() == false; i++ )
- {
- videoThread->shutdown_requested();
- sleep( 1 );
- }
- // if video thread still can't shutdown it should be in blocking call -
- // shutdown video board from this thread, it's ok, mutex provides
- // exclusive access to the board
- if( videoThread != NULL && videoThread->is_shutdown_performed() == false )
- videoThread->shutdown();
- if( log_output )
- {
- log( 1, "protocol thread cleanup" );
- delete log_output;
- log_output = NULL;
- }
- }
- void protocol_thread::open_connection()
- {
- // check for time to start or restart the connection
- if( protocolStream )
- return;
- Timeval test;
- test.now();
- if( test.tv_sec() < whenTryToRetryConnection.tv_sec() )
- {
- sleep( 1 );
- return;
- }
- # ifdef WIN32
- WORD wVersionRequested = MAKEWORD( 2, 0 );
- WSADATA wsaData;
- int err = WSAStartup( wVersionRequested, &wsaData );
- if( err != 0 )
- {
- log( 0, "cannot init Windows socket, line %d", __LINE__ );
- throw NetworkException( NULL, "cannot init Windows socket" );
- }
- # endif // WIN32
- # ifndef WIN32
- int
- # else
- SOCKET
- # endif
- dbSocket = socket (PF_INET, SOCK_STREAM, 0);
- if( dbSocket < 0 )
- {
- dbSocket = 0;
- log( 0, "cannot create socket" );
- # ifndef WIN32
- log( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( NULL, "socket: %s", sys_errlist[ errno ] );
- # else
- throw NetworkException( NULL, "socket: %s, err code %d",
- sys_errlist[ errno ], WSAGetLastError() );
- # endif
- }
- // now connect
- if( 0 != ::connect( dbSocket, (struct sockaddr *) &dbServerAddr,
- sizeof (dbServerAddr)))
- {
- dbSocket = 0;
- log( 1, "connect to database server failed" );
- throw NetworkException( NULL, "connect: %s", sys_errlist[ errno ] );
- }
- // test if sock is readable
- fd_set set;
- FD_ZERO( &set );
- FD_SET( dbSocket, &set );
- Timeval timeout( setupConnectionTimeout, 0 );
-
- int ret = select( FD_SETSIZE, &set, NULL, NULL, &timeout.val() );
- if( ret == -1 )
- {
- log( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( NULL, "select: %s", sys_errlist[ errno ] );
- }
- if( !ret ) // timeval expires
- {
- log( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( NULL, "timeout expired, not connected" );
- }
- protocolStream = new protocol_stream( dbSocket, setupConnectionTimeout );
- // expect the prompt
- protocolStream->skip_to_prompt();
- log( 1, "connection successful" );
- }
- void protocol_thread::cleanup_connection()
- {
- if( protocolStream )
- delete protocolStream;
- protocolStream = NULL;
- }
- /** handshake with server */
- void protocol_thread::handshake()
- {
- if( !protocolStream )
- {
- log( 0, "throw NetworkException, line %d", __LINE__ );
- throw NetworkException( NULL, "not connected" );
- }
- *protocolStream << register_acquisition_server << "n";
- protocolStream->wait_input();
- *protocolStream >> acquisition_server_id;
- protocolStream->skip_to_prompt();
- log( 1, "handshake success, id is %d", acquisition_server_id );
- }
- void protocol_thread::request_broadcast_parameters()
- {
- *protocolStream << cmd_request_broadcast_parameters << "n";
- protocolStream->wait_input();
- // receive number of broadcast targets
- std::string targ_num_s;
- *protocolStream >> targ_num_s;
- int targ_num = 0;
- if( targ_num_s.length() > 0 )
- targ_num = atoi( targ_num_s.c_str() );
- log( 1, "%d broadcast targets", targ_num );
- for( int i = 0; i < targ_num; i++ )
- {
- std::string s_id;
- *protocolStream >> s_id;
- int id = atoi( s_id.c_str() );
- // receive address and port number
- std::string addr;
- *protocolStream >> addr;
- std::string control_portnum;
- *protocolStream >> control_portnum;
- int c_port = atoi( control_portnum.c_str() );
- Sockaddr control( addr, c_port );
- std::string broadcast_portnum;
- *protocolStream >> broadcast_portnum;
- int b_port = atoi( broadcast_portnum.c_str() );
- Sockaddr broadcast( addr, b_port );
- // add, but not connect
- // add this as pending target: the connection involve reconnect, so
- // don't try to connect right here: this function only place that
- // new target into the queue
- videoThread->add_broadcast_target( id, control, broadcast );
- log( 2, "broadcast target: %s, control %d, broadcast %d",
- addr.c_str(), control_portnum.c_str(), broadcast_portnum.c_str() );
- }
- protocolStream->skip_to_prompt();
- }
- /** never exit, work until some exception */
- void protocol_thread::loop()
- {
- while( true )
- {
- if( shutdown_now )
- {
- log( 0, "throw ShutdownException, line %d", __LINE__ );
- throw ShutdownException( "shutdown by flag" );
- }
- // look for command queue
- dump_log();
- // and look for any pending input
- if( protocolStream->test_for_any_pending_input( 100 ) )
- process_unknown_input();
- dump_log();
- // pop commands from stack
- lookup_cmd_stack();
- // do we have text to transfer into the database?
- check_new_text();
- # ifdef _DEBUG
- static ask_interrupt_cnt = 0;
- if( ++ask_interrupt_cnt > 200 )
- {
- ask_interrupt_cnt = 0;
- printf( "do you wants to continue? (Y/n)" );
- int ch = getchar();
- if( ch == 'n' || ch == 'N' )
- {
- log( 0, "throw ShutdownException, line %d", __LINE__ );
- throw ShutdownException( "shutdown from keyboard" );
- }
- }
- # endif
- }
- }
- void protocol_thread::process_unknown_input()
- {
- std::vector< std::string > input;
- *protocolStream >> input;
- if( !input.size() ) // nothing
- {
- dump_log();
- return;
- }
- // if this is command, it will starts with word 'command'
- if( !protocolStream->is_command( input[0] ))
- {
- log( 0, "throw SyntaxException, line %d", __LINE__ );
- throw SyntaxException( "expected command, got %s", input[0].c_str() );
- }
- }
- void protocol_thread::sleep( int sec )
- {
- # ifdef _WIN32
- Sleep( sec * 1000 );
- # endif
- }
- void protocol_thread::lookup_cmd_stack()
- {
- wait_for_mutex wm( lock_various_cmdstacks, 5 );
- if( connected_push_servers.size() > 0 )
- {
- int id = connected_push_servers.front();
- log( 3, "cmd: report push server #%d ok to db server", id );
- connected_push_servers.pop();
- char buf[10];
- _itoa( id, buf, 10 );
- *protocolStream << cmd_push_server_ok << "n" << buf << "n";
- protocolStream->wait_input();
- // e are actually not interested about the result
- protocolStream->skip_to_prompt();
- }
- }
- void protocol_thread::check_new_text()
- {
- text_data *td = NULL;
- {
- // this is shared data:
- wait_for_mutex wm( lock_new_text, 1 );
- if( !new_text.size() )
- return; // nothing
- td = new_text.front();
- new_text.pop();
- // mutex is released here
- }
- // send this data to the database server
- *protocolStream << cmd_add_caption_noid << "n" << td->get_time() << "n"
- << td->get_text() << "n";
- protocolStream->skip_to_prompt();
- delete td;
- }
- void protocol_thread::shutdown_requested()
- {
- log( 3, "protocol thread: shutdown requested" );
- shutdown_now = true;
- if( videoThread )
- videoThread->shutdown_requested();
- }
- void protocol_thread::report_push_serv_ok( int push_id )
- {
- log( 3, "report push server #%d ok to db server", push_id );
- wait_for_mutex wm( lock_various_cmdstacks, 5 );
- connected_push_servers.push( push_id );
- }
- void protocol_thread::new_text_data( text_data *data )
- {
- wait_for_mutex wm( lock_new_text, 1 );
- new_text.push( data );
- }
- void protocol_thread::dump_log()
- {
- if( !log_output )
- return;
- wait_for_mutex wm( lock_log, 1 );
- std::vector< std::string > msgs;
- protocolStream->append_messages( msgs );
- // dump
- for( int i = 0; i < msgs.size(); i++ )
- {
- *log_output << msgs[i].c_str() << endl;
- }
- }
- void protocol_thread::log( int level, const char *s, ... )
- {
- if( level > verbose_level || !log_output )
- return;
- wait_for_mutex wm( lock_log, 1 );
- Timeval tv;
- tv.now();
- va_list ap;
- va_start( ap, s );
- char buf[1024];
- vsnprintf( buf, sizeof( buf ), s, ap );
- *log_output << tv << buf << endl;
- va_end( ap );
- }