VideoServerThread.java
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:29k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4.    VideoServer application
  5.      $Id: VideoServerThread.java,v 1.29 1999/03/13 03:06:34 andrew Exp $
  6. */
  7. package edu.sunysb.cs.ecsl.videoserver;
  8. import java.io.*;
  9. import java.net.*;
  10. import java.sql.*;
  11. import java.util.Vector;
  12. import java.lang.reflect.Field;
  13. import java.util.Hashtable;
  14. import java.util.Calendar;
  15. /** When new connection is accepted, the VideoServerThread starts and 
  16.     establish the text-based protocol 
  17.     @author Andrew Shuvalov
  18. */
  19. class VideoServerThread extends Thread implements TextProtocol 
  20. {
  21.   protected VideoServer application = null;
  22.   Socket socket;
  23.   private DataOutputStream outputstream = null;
  24.   private BufferedReader inputreader = null;
  25.   /** VideoServerThread class provides also the debugging facilities to 
  26.       corresponding connected user. System-level debug is performed by the 
  27.       main thread, method "log"
  28.   */
  29.   private int debugLevel = 0;
  30.   /** database connection
  31.    */
  32.   DatabaseConnection dbConnection = null;
  33.   private boolean SuperuserStatus = false;
  34.   /** share single copy
  35.    */
  36.   static VSProperties properties = null;
  37.   /** holds all protocol items - wich function to call on which keyword
  38.    */
  39.   java.util.Hashtable TextProtocolItems = new java.util.Hashtable();
  40.   /** types of this client
  41.    */
  42.   final int clientIsPlayer = 0;
  43.   final int clientIsAcqServer = 1;
  44.   /** default is player
  45.    */
  46.   private int typeOfClient = 0;
  47.   /** if client is acquisition server */
  48.   private int channelSelfId = -1;
  49.   public int get_channel_id() { return channelSelfId; }
  50.   /** 0 if not recording now, otherwise the movie ID */
  51.   private int movieId = 0;
  52.   /** Use the _local_ port number as ID of the thread
  53.    */
  54.   public VideoServerThread( VideoServer app, Socket sock, 
  55.     VSProperties props, ThreadGroup group ) 
  56.     {
  57.       super( group, String.valueOf( sock.getLocalPort() ) );
  58.       socket = sock;
  59.       application = app;
  60.       properties = props;
  61.       
  62.       // init the protocol items
  63.       Field fields[]; 
  64.       try
  65. {
  66.   fields = Class.forName
  67.     ( "edu.sunysb.cs.ecsl.videoserver.TextProtocol" ).
  68.     getDeclaredFields();
  69.   String c_pattern[] = new String[3];
  70.   for( int i = 0; i < fields.length; i++ )
  71.     {
  72.       Class c = fields[i].getType();
  73.       if( c.isArray() == false )
  74. continue;
  75.       
  76.       try
  77. {
  78.   String value[] = (String[])fields[i].get( c_pattern );
  79.   String key = value[0].toLowerCase().trim();
  80.   // now value[0] represents the string of our keyword
  81.   TextProtocolItem item = 
  82.     new TextProtocolItem( this, value[0] );
  83.   TextProtocolItems.put( key, item );
  84. } catch( Exception e ) {
  85.   application.log( 2, e.toString() );
  86. }
  87.     }
  88. } catch( ClassNotFoundException e ) {
  89.   System.err.print( e.toString() );
  90. }
  91.     }
  92.   public void run() 
  93.     {
  94.       try {
  95. outputstream = new DataOutputStream( socket.getOutputStream() );
  96. inputreader = new BufferedReader( new InputStreamReader
  97.   ( socket.getInputStream() ));
  98. dbConnection = new DatabaseConnection( this, application, 
  99.       outputstream );
  100.       } catch (IOException e) {
  101. return;
  102.       } catch (SQLException e) {
  103. return;
  104.       }
  105.       InterfaceLoop();
  106.       
  107.       // remove itself from channel list ( this call is ignored if this 
  108.       // connection is not a channel
  109.       if( typeOfClient == clientIsAcqServer && 
  110. application.channelRegistry != null )
  111. application.channelRegistry.removeChannel( channelSelfId );
  112.       
  113.       /** alwais close port and close database when going out
  114.        */
  115.       application.log( 1, "close connection " + socket.toString() );
  116.       try {
  117. outputstream.writeBytes( "rn" );
  118. socket.close();
  119.       } catch (IOException e) {
  120. return;
  121.       }
  122.       // when the 'run' method ends, thread is ended too
  123.     }
  124.   private void putHelp() {
  125.     try {
  126.       outputstream.writeBytes("n# Text protocoln" );
  127.       outputstream.writeBytes("    Prefixes of server messages:n");
  128.       outputstream.writeBytes
  129. ("t>>>tpromptnt->tfollowing text must be parsed");
  130.       outputstream.writeBytes("nt#tcommentnt#!tErrorn");
  131.       outputstream.writeBytes("    Keywords:n");
  132.       outputstream.writeBytes( shortHelp( _help_ ));
  133.       outputstream.writeBytes( shortHelp( _kill_server_ ));
  134.       outputstream.writeBytes( shortHelp( _superuser_ ));
  135.       outputstream.writeBytes( shortHelp( _debug_level_ ));
  136.       outputstream.writeBytes( shortHelp( _database_name_ ));
  137.       outputstream.writeBytes( shortHelp( _list_movies_ ));
  138.       outputstream.writeBytes( shortHelp( _list_mov_files_ ));
  139.       outputstream.writeBytes( shortHelp( _list_captions_ ));
  140.       outputstream.writeBytes( shortHelp( _play_movie_ ));
  141.       if( SuperuserStatus == true )
  142. {
  143.   outputstream.writeBytes( "    Superuser commands:n" );
  144.   outputstream.writeBytes( shortHelp( _add_new_movie_ ));
  145.   outputstream.writeBytes( shortHelp( _add_movie_file_ ));
  146.   outputstream.writeBytes( shortHelp( _add_caption_ ));
  147.   outputstream.writeBytes( shortHelp( _save_properties_ ));
  148.   outputstream.writeBytes( shortHelp( _list_properties_ ));
  149. }
  150.     } catch (IOException e) {
  151.     }
  152.   }
  153.   private String shortHelp( String[] h ) 
  154.     {
  155.       return "t" + h[KEY] + "t" + h[SHDESC] + "n";
  156.     }
  157.   /** parse protocol 
  158.    */
  159.   private void InterfaceLoop() {
  160.     try {
  161.       outputstream.writeBytes( "Video ServernConnection establishedn" );
  162.     } catch (IOException e) {
  163.       debug_forced( "!" + e.toString() );
  164.     }
  165.     while( true ) {
  166.       // this is a prompt
  167.       try {
  168. outputstream.writeBytes( _prompt_[KEY] );
  169.       } catch (IOException e) {
  170. debug_forced( "!" + e.toString() );
  171. break;
  172.       }
  173.       
  174.       String s = null;
  175.       try {
  176. s = inputreader.readLine();
  177. if( s == null ) // connection broken?
  178.   break;
  179. if( s.trim().length() == 0 )
  180.   continue;
  181.       } catch (IOException e) {
  182. debug_forced( "!" + e.toString() );
  183. break;
  184.       }
  185.       // the first word of s is keyword. Break s into the array of words
  186.       Vector args = splitStringToWords( s );
  187.       String keyCommand = ((String)args.elementAt( 0 )).
  188. toLowerCase().trim();
  189.       application.log( 2, "command " + keyCommand );
  190.       // try - block for syntax errors
  191.       try 
  192. {
  193.   TextProtocolItem item = 
  194.     (TextProtocolItem)TextProtocolItems.get( keyCommand );
  195.   if( item != null )
  196.     {
  197.       // here the parsed command is executed
  198.       item.execute( args );
  199.     }
  200.   else 
  201.     {
  202.       application.log( 1, "Unknown command " + keyCommand );
  203.       throw new SyntaxException( "Unknown command " + keyCommand );
  204.     }
  205. }
  206.       catch ( KillRequestedException e ) {
  207. break;
  208.       }
  209.       catch ( SyntaxException e ) {
  210. debug_forced( "!" + e.toString() );
  211.       }
  212.       catch ( IOException e ) {
  213. debug_forced( "!" + e.toString() );
  214. break;  // we end this thread
  215.       }
  216.       catch ( Exception e ) {
  217. // all other exceptions...
  218. debug_forced( "!" + e.toString() );
  219. break;  // we end this thread
  220.       }
  221.     } // while
  222.     try {
  223.       outputstream.writeBytes( "->n" + _shutdown_[KEY] + "n<-n" );
  224.     } catch (IOException e) {
  225.     }
  226.   }
  227.   /** this is not called too often; so create datagram socket, send data
  228.       and close the socket. Do not keep the connection
  229.   */
  230.   protected void commandToPushServer( String serverNum, String arg )
  231.     throws IOException
  232.     {
  233.       byte[] byterepr = arg.getBytes();
  234.       InetAddress addr;
  235.       try
  236. {
  237.   addr = InetAddress.getByName
  238.     ( properties.getProperty( properties.PushServN_IP + 
  239.       serverNum ) );
  240. } catch ( UnknownHostException e ) {
  241.   application.log( 1, e.toString() );
  242.   throw new IOException( e.toString() );
  243. }
  244.       int port = properties.getPropertyInt
  245. ( properties.PushServN_Control + serverNum );
  246.       application.log( 2, "command to push server " + addr.toString() + 
  247. " port " + String.valueOf( port ));
  248.       DatagramPacket packet = new DatagramPacket( byterepr, byterepr.length,
  249.   addr, port );
  250.       DatagramSocket socket = new DatagramSocket();
  251.       socket.send( packet );
  252.     }
  253.   /** utility to split the string
  254.    */
  255.   public static Vector splitStringToWords( String s ) {
  256.     Vector res = new Vector();
  257.     if( s == null )
  258.       return res;
  259.     int pos1 = 0, pos2 = 0;
  260.     while( s.length() > pos1 ) {
  261.       while( pos1 < s.length() && ( s.charAt( pos1 ) == ' ' 
  262.     || s.charAt( pos1 ) == 't' ) )
  263. pos1 ++;
  264.       pos2 = pos1;
  265.       while( pos2 < s.length() && s.charAt( pos2 ) != ' ' 
  266.      && s.charAt( pos2 ) != 't' )
  267. pos2 ++;
  268.       String ts = s.substring( pos1, pos2 );
  269.       res.addElement( ts );
  270.       pos1 = pos2;
  271.     }
  272.     return res;
  273.   }
  274.   /** debugging is performed through protocol, debug strings begins 
  275.       by # */
  276.   public void debug( String s, int level ) {
  277.     if( debugLevel < 1 || debugLevel < level )
  278.       return;
  279.     try {
  280.       outputstream.writeBytes( "#" + s );
  281.       outputstream.writeBytes( "n" );
  282.     } catch (IOException e) {
  283.     }
  284.   }
  285.   public void debug( String s ) {
  286.     debug( s, 1 );
  287.   }
  288.   /** debug message that is 'forced' to appear
  289.    */
  290.   public void debug_forced( String s ) {
  291.     try {
  292.       outputstream.writeBytes( "#" + s + "n" );
  293.     } catch (IOException e) {
  294.     }
  295.   }
  296.   public void kill_impl( Vector args )
  297.     throws KillRequestedException
  298.     {
  299.       throw new KillRequestedException( "Explicit kill" );
  300.     }
  301.   public void superuser_impl( Vector args )
  302.     throws KillRequestedException
  303.     {
  304.       // if authorization fails - break connection immediately
  305.       if( args.size() != 2 )
  306. throw new KillRequestedException( "Explicit kill" );
  307.       if( false == application.testPassward
  308.   ( (String)args.elementAt( 1 ) ))
  309. throw new KillRequestedException( "Explicit kill" );
  310.       SuperuserStatus = true;
  311.     }
  312.   public void debug_impl( Vector args )
  313.     {
  314.       if( args.size() == 1 )  // no digit argument -> default 1
  315. {
  316.   debugLevel = 1;
  317.   debug( "Deafault debug level 1" );
  318.   return;
  319. }
  320.       else {
  321. try {
  322.   debugLevel = Integer.parseInt( ((String)args.elementAt( 1 )) );
  323.   debug( "Debug level " + debugLevel );
  324. } catch (NumberFormatException e ) {
  325.   debug_forced( "!" + e.toString() );
  326. }
  327.       }
  328.     }
  329.   public void help_impl( Vector args )
  330.     {
  331.       putHelp();
  332.     }
  333.   public void database_impl( Vector args )
  334.     throws SyntaxException, IOException
  335.     {
  336.       try {
  337. dbConnection.database_impl( args, inputreader, outputstream,
  338.     SuperuserStatus );
  339.       } catch ( SQLException e ) {
  340. debug_forced( "!" + e.toString() );
  341.       }
  342.     }
  343.   public void listmovies_impl( Vector args )
  344.     throws SyntaxException, IOException
  345.     {
  346.       try {
  347. dbConnection.listmovies_impl( args, inputreader, outputstream,
  348.       SuperuserStatus );
  349.       } catch ( SQLException e ) {
  350. debug_forced( "!" + e.toString() );
  351.       }
  352.     }
  353.   public void listmovfiles_impl( Vector args )
  354.     throws SyntaxException, IOException
  355.     {
  356.       try {
  357. dbConnection.listmovfiles_impl( args, inputreader, outputstream,
  358. SuperuserStatus );
  359.       } catch ( SQLException e ) {
  360. debug_forced( "!" + e.toString() );
  361.       }
  362.     }
  363.   public void listcaptions_impl( Vector args )
  364.     throws SyntaxException, IOException
  365.     {
  366.       try {
  367. dbConnection.list_captions( args, inputreader, outputstream,
  368.     SuperuserStatus );
  369.       } catch ( SQLException e ) {
  370. debug_forced( "!" + e.toString() );
  371.       }
  372.     }
  373.   public void addnewmovie_impl( Vector args )
  374.     throws SyntaxException, IOException
  375.     {
  376.       int id = 0;
  377.       try {
  378. if( args.size() != 1 ) 
  379.   throw new SyntaxException( _add_new_movie_[KEY] + " " + 
  380.      _add_new_movie_[LDESC] );
  381. String movieName = "", startTime = "", stopTime = "";
  382. try {
  383.   do {
  384.     movieName = inputreader.readLine();
  385.   } while( movieName.trim().length() == 0 );
  386.   do {
  387.     startTime = inputreader.readLine();
  388.   } while( startTime.trim().length() == 0 );
  389.   do {
  390.     stopTime = inputreader.readLine();
  391.   } while( stopTime.trim().length() == 0 );
  392. } catch (IOException e) {
  393.   throw new SyntaxException( _add_new_movie_[KEY] + " " + 
  394.      _add_new_movie_[LDESC] );
  395. }
  396. if( SuperuserStatus == false )
  397.   {
  398.     debug_forced
  399.       ("!You are not authorized to add movien");
  400.     outputstream.writeBytes( "->n" + _disconnect_[KEY] + "n<-n" );
  401.     return;
  402.   }
  403. id = dbConnection.add_new_movie( movieName, startTime, stopTime );
  404.       } catch ( SQLException e ) {
  405. debug_forced( "!" + e.toString() );
  406.       }
  407.       outputstream.writeBytes( "->n" + String.valueOf( id ) + "n<-n" );
  408.     }
  409.   public void addmoviefile_impl( Vector args )
  410.     throws SyntaxException, IOException
  411.     {
  412.       if( args.size() != 1 ) 
  413. throw new SyntaxException( _add_movie_file_[KEY] + " " + 
  414.    _add_movie_file_[LDESC] );
  415.       try {
  416. String mov_id = "", pushserv_id = "", mpegName = "", 
  417.   startTime = "", stopTime = "";
  418.     
  419. try {
  420.   do {
  421.     mov_id = inputreader.readLine();
  422.   } while( mov_id.trim().length() == 0 );
  423.   do {
  424.     pushserv_id = inputreader.readLine();
  425.   } while( pushserv_id.trim().length() == 0 );
  426.   do {
  427.     mpegName = inputreader.readLine();
  428.   } while( mpegName.trim().length() == 0 );
  429.   do {
  430.     startTime = inputreader.readLine();
  431.   } while( startTime.trim().length() == 0 );
  432.   do {
  433.     stopTime = inputreader.readLine();
  434.   } while( stopTime.trim().length() == 0 );
  435. } catch (IOException e) {
  436.   throw new SyntaxException( _add_movie_file_[KEY] + " " + 
  437.      _add_movie_file_[LDESC] );
  438. }
  439. if( SuperuserStatus == false )
  440.   {
  441.     debug_forced("!You are not authorized to " +
  442.        "add movie filesn");
  443.     outputstream.writeBytes( "->n" + _disconnect_[KEY] + "n<-n" );
  444.     return;
  445.   }
  446. dbConnection.add_movie_file( mov_id, pushserv_id, mpegName, 
  447.      startTime, stopTime );
  448.       } catch ( SQLException e ) {
  449. debug_forced( "!" + e.toString() );
  450.       }
  451.     }
  452.   public void addcaption_impl( Vector args )
  453.     throws SyntaxException, IOException
  454.     {
  455.       if( args.size() != 1 ) 
  456. throw new SyntaxException( _add_caption_[KEY] + " " + 
  457.    _add_caption_[LDESC] );
  458.       String id = "", time = "", text = "";
  459.       try {
  460. do {
  461.   id = inputreader.readLine();
  462. } while( id.trim().length() == 0 );
  463. do {
  464.   time = inputreader.readLine();
  465. } while( time.trim().length() == 0 );
  466. // text may be empty string. don't trim it
  467. text = inputreader.readLine();
  468.       } catch (IOException e) {
  469. throw new SyntaxException( _add_caption_[KEY] + " " + 
  470.    _add_caption_[LDESC] );
  471.       }
  472.       if( SuperuserStatus == false && typeOfClient != clientIsAcqServer )
  473. {
  474.   debug_forced
  475.     ("!You are not authorized to add captions");
  476.   outputstream.writeBytes( "->n" + _disconnect_[KEY] + "n<-n" );
  477.   return;
  478. }
  479.       try {
  480. dbConnection.add_caption( id, time, text );
  481.       } catch ( SQLException e ) {
  482. debug_forced( "!" + e.toString() );
  483.       }
  484.     }
  485.   /** that's a variant of the same command but without the movie ID. This
  486.       may come only from acquisition server and movie ID is known. It 
  487.       may be zero as well if no recording happens */
  488.   public void addcaptionnoid_impl( Vector args )
  489.     throws SyntaxException, IOException
  490.     {
  491.       if( args.size() != 1 ) 
  492. throw new SyntaxException( _add_caption_[KEY] + " " + 
  493.    _add_caption_[LDESC] );
  494.       String time = "", text = "";
  495.       try {
  496. do {
  497.   time = inputreader.readLine();
  498. } while( time.trim().length() == 0 );
  499. // text may be empty string. don't trim it
  500. text = inputreader.readLine();
  501.       } catch (IOException e) {
  502. throw new SyntaxException( _add_caption_[KEY] + " " + 
  503.    _add_caption_[LDESC] );
  504.       }
  505.       if( typeOfClient != clientIsAcqServer )
  506. {
  507.   debug_forced
  508.     ("!You are not authorized to add captions");
  509.   outputstream.writeBytes( "->n" + _disconnect_[KEY] + "n<-n" );
  510.   return;
  511. }
  512.       // what is the movie id?
  513.       if( movieId == 0 )
  514. return;
  515.       try {
  516. dbConnection.add_caption( String.valueOf( movieId ), time, text );
  517.       } catch ( SQLException e ) {
  518. debug_forced( "!" + e.toString() );
  519.       }
  520.     }
  521.   public void saveproperties_impl( Vector args )
  522.     {
  523.       if( SuperuserStatus == true )
  524. {
  525.   try {
  526.     application.properties.save_app_defaults();
  527.   }
  528.   catch( Throwable e ) {
  529.     debug_forced( "!" + e.toString() );
  530.   }
  531. }
  532.     }
  533.   public void listproperties_impl( Vector args )
  534.     throws IOException
  535.     {
  536.       if( SuperuserStatus == true )
  537. {
  538.   properties.list_properties( outputstream );
  539. }
  540.     }
  541.   public void setproperty_impl( Vector args )
  542.     throws SyntaxException, IOException
  543.     {
  544.       if( SuperuserStatus == true )
  545. {
  546.   properties.set_property( inputreader, outputstream );
  547. }
  548.     }
  549.   public void startrecording_impl( Vector args )
  550.     throws SyntaxException, IOException
  551.     {
  552.       if( SuperuserStatus == true )
  553. {
  554.   // get channel ID from input
  555.   String ch_id = "";
  556.   try {
  557.     do {
  558.       ch_id = inputreader.readLine();
  559.     } while( ch_id.trim().length() == 0 );
  560.   } catch (IOException e) {
  561.     throw new SyntaxException( _start_recording_[KEY] + 
  562.        " " + 
  563.        _start_recording_[LDESC] );
  564.   }
  565.   application.log( 1, "Start recording the channel " + ch_id );
  566.   // we got channel id and wants to start the recording
  567.   // let first find all push servers who receive it
  568.   Vector pushserv_ids = 
  569.     properties.list_push_serv_ids_by_ch_id( Integer.
  570.     parseInt( ch_id ));
  571.   if( pushserv_ids.size() == 0 )
  572.     {
  573.       outputstream.writeBytes( "->n" );
  574.       debug( "Nobody can record this channel", 0 );
  575.       outputstream.writeBytes( "<-n" );
  576.       return;
  577.     }
  578.   // first add the movie title to the database and receive 
  579.   // the movie ID. 
  580.   String description = properties.getProperty
  581.     ( properties.AcquisitionServerN_DefDescr + ch_id );
  582.   java.util.Calendar rightNow = Calendar.getInstance();
  583.   
  584.   try {
  585.     int movie_id = dbConnection.add_new_movie
  586.       ( description, rightNow, null );   // stop time is null (unknown)
  587.     application.log( 2, "New movie id is " + 
  588.      String.valueOf( movie_id ) );
  589.     for( int i = 0; i < pushserv_ids.size(); i++ )
  590.       {
  591. String push_id = (String) pushserv_ids.elementAt( i );
  592. // tell this push server to start recording
  593. String filename = set_pushserv_mpeg_filename( push_id );
  594. // then push server already started recording - store this 
  595. // filename into the database
  596. dbConnection.add_movie_file
  597.   ( String.valueOf( movie_id ), 
  598.     push_id, filename, rightNow, null );
  599.       }
  600.     // that's not all yet. We got the new movie ID and we want the
  601.     // acquisition server to send close captions to the database 
  602.     // and they will be stored by movie ID. So find the acq. server
  603.     // thread and tell it the movie ID.
  604.     application.set_acq_serv_movie_id( Integer.parseInt( ch_id ), 
  605.        movie_id );
  606.     application.log( 3, "recording started" );
  607.     outputstream.writeBytes( "->n" );
  608.     debug( "recording started", 1 );
  609.     outputstream.writeBytes( "<-n" );
  610.   } catch ( SQLException e) {
  611.     outputstream.writeBytes( "->n" );
  612.     debug_forced( "Can't start recording: " + e.toString() );
  613.     outputstream.writeBytes( "<-n" );
  614.   } catch ( Exception e) {
  615.     outputstream.writeBytes( "->n" );
  616.     outputstream.writeBytes( "<-n" );
  617.     application.log( 0, "recording: " + e.toString() );
  618.   }
  619. }
  620.       else
  621. {
  622.   outputstream.writeBytes( "->n" );
  623.   debug_forced( "You are not authorized to start recording" );
  624.   outputstream.writeBytes( "<-n" );
  625.   application.log( 0, "client not authorized to start recording" );
  626. }
  627.     }
  628.   /** the single parameter is the channel id */
  629.   public void stoprecording_impl( Vector args )
  630.     throws SyntaxException, IOException
  631.     {
  632.       if( SuperuserStatus == true )
  633. {
  634.   // get channel ID from input
  635.   String ch_id = "";
  636.   try {
  637.     do {
  638.       ch_id = inputreader.readLine();
  639.     } while( ch_id.trim().length() == 0 );
  640.   } catch (IOException e) {
  641.     throw new SyntaxException( _start_recording_[KEY] + 
  642.        " " + 
  643.        _start_recording_[LDESC] );
  644.   }
  645.   application.log( 1, "Stop recording the channel " + ch_id );
  646.   // we got channel id and wants to stop the recording
  647.   // let first find all push servers who receive it
  648.   Vector pushserv_ids = 
  649.     properties.list_push_serv_ids_by_ch_id( Integer.
  650.     parseInt( ch_id ));
  651.   if( pushserv_ids.size() == 0 )
  652.     {
  653.       outputstream.writeBytes( "->n" );
  654.       debug( "Nobody is recording this channel", 0 );
  655.       outputstream.writeBytes( "<-n" );
  656.       return;
  657.     }
  658.   // tell each push server to stop the recording
  659.   for( int i = 0; i < pushserv_ids.size(); i++ )
  660.     {
  661.       String push_id = (String) pushserv_ids.elementAt( i );
  662.       reset_pushserv_mpeg_filename( push_id );
  663.     }
  664.   int movie_id = 
  665.     application.get_acq_serv_movie_id( Integer.parseInt( ch_id ) );
  666.   application.set_acq_serv_movie_id( Integer.parseInt( ch_id ), 0 );
  667.   try {
  668.     // database should store the time when movie did ends
  669.     // this call will modify the database to setup the
  670.     // stop time for this movie
  671.     dbConnection.mark_movie_stop_time( movie_id );
  672.   } catch ( SQLException e) {
  673.     debug_forced( "Stop time remains empty: " + e.toString() );
  674.   }
  675.   outputstream.writeBytes( "->n" );
  676.   debug( "recording stopped", 1 );
  677.   outputstream.writeBytes( "<-n" );
  678. }
  679.       else
  680. {
  681.   outputstream.writeBytes( "->n" );
  682.   debug_forced( "You are not authorized to stop recording" );
  683.   outputstream.writeBytes( "<-n" );
  684.   application.log( 0, "client not authorized to stop recording" );
  685. }
  686.     }
  687.   /** 
  688.       if the id of movie is not zero, that means that this acquisition server 
  689.       thread is in process of recording. If zero - recording stopped
  690.   */
  691.   protected void set_movie_id( int id ) 
  692.     { 
  693.       movieId = id; 
  694.     }
  695.     
  696.   protected int get_movie_id() 
  697.     { 
  698.       return movieId; 
  699.     }
  700.     
  701.   public void listchannels_impl( Vector args )
  702.     throws IOException
  703.     {
  704.       // get this client IP from socket and pass it to the channelRegistry
  705.       // to find all channels that are broadcasted here
  706.       String ip = socket.getInetAddress().getHostAddress();
  707.       ip = convert_gen_127_addr( ip );
  708.       outputstream.writeBytes( "->n" );
  709.       if( application.channelRegistry != null )
  710. application.channelRegistry.listChannels( outputstream, ip );
  711.       outputstream.writeBytes( "<-n" );
  712.     }
  713.   // ---------- Acquisition server commands --------------
  714.   /** look at the incoming connection address ( it was stored in socket ) 
  715.       and determine this channel Id. It should be known from properties.
  716.       If this acquisition server is not authorized to connect - return
  717.       -1 as Id 
  718.   */
  719.   public void registeracquisitionserver_impl( Vector args )
  720.     throws IOException, KillRequestedException
  721.     {
  722.       typeOfClient = clientIsAcqServer;
  723.       InetAddress iaddr = socket.getInetAddress();
  724.       String saddr = iaddr.getHostAddress();
  725.       int ch_nums = properties.getPropertyInt( properties.
  726.        AcquisitionServersNumber );
  727.       channelSelfId = -1;
  728.       for( int i = 0; i < ch_nums; i++ )
  729. {
  730.   String id = String.valueOf( i + 1 );
  731.   String s_ip = properties.getProperty( properties.
  732. AcquisitionServerN_IP + id);
  733.   // if this ip address is equal to incoming - Id is determined
  734.   if( s_ip.equals( saddr ))
  735.     {
  736.       channelSelfId = i + 1;
  737.       break;
  738.     }
  739. }
  740.       if( channelSelfId != -1 )
  741.         application.channelRegistry.addNewChannel( this, channelSelfId );
  742.       // reply the ID only
  743.       outputstream.writeBytes( "->n" );
  744.       outputstream.writeBytes( String.valueOf( channelSelfId ) + "n" );
  745.       outputstream.writeBytes( "<-n" );
  746.       application.log( 2, "new acq. server, #"
  747. + String.valueOf( channelSelfId ) );
  748.       
  749.       // if authorization failed
  750.       if( channelSelfId == -1 )
  751. throw new KillRequestedException( "Authorization failed" );
  752.       // at that point reply nothing else, client will request the broadcast 
  753.       // parameters with separate command
  754.     }
  755.   /** reply format: num of push servers, for each server: ip, control port,
  756.       broadcast port
  757.    */
  758.   public void requestbroadcastparameters_impl( Vector args )
  759.     throws IOException
  760.     {
  761.       Vector push_ids = properties.list_push_serv_ids_by_ch_id(channelSelfId);
  762.       outputstream.writeBytes( "->n" );
  763.       outputstream.writeBytes( push_ids.size() + "n" ); // number of servers
  764.       application.log( 2, "reporting " + push_ids.size() + " push servers " +
  765.        "for channel #" + String.valueOf( channelSelfId ));
  766.       if( push_ids.size() == 0 )
  767. {
  768.   // sorry, nothing
  769.   outputstream.writeBytes( "<-n" );
  770.   return;
  771. }
  772.       for( int i = 0; i < push_ids.size(); i++ )
  773. {
  774.   String id = (String) push_ids.elementAt( i );
  775.   outputstream.writeBytes( id + "n" );
  776.   outputstream.writeBytes( properties.getProperty
  777.    ( properties.PushServN_IP + id ) + "n" );
  778.   outputstream.writeBytes( properties.getProperty
  779.    ( properties.PushServN_Control
  780.      + id ) + "n" );
  781.   outputstream.writeBytes( properties.getProperty
  782.    ( properties.PushServN_BroadcastInPort 
  783.      + id ) + "n" );
  784.   application.log( 2, "push server #" + id );
  785.   // push server should wait for incoming data connection on start-up,
  786.   // if it does not - acq server may troubleshut that by sending
  787.   // a command 
  788. }
  789.       outputstream.writeBytes( "<-n" );
  790.     }
  791.   /** everything is ok - so inform the push server what to do next */
  792.   public void pushserverok_impl( Vector args )
  793.     throws IOException
  794.     {
  795.       try {
  796. String s_id = "";
  797. try {
  798.   do {
  799.     s_id = inputreader.readLine();
  800.     
  801.   } while( s_id.trim().length() == 0 );
  802.   // tell server to broadcast
  803.   set_pushserv_broadcasts( s_id );
  804.   // and then report success
  805.   outputstream.writeBytes( "->n" );
  806.   outputstream.writeBytes( "1n" );
  807.   outputstream.writeBytes( "<-n" );
  808.   application.log( 2, "push server #" + s_id + " is reported ok" );
  809. } catch (IOException e) {
  810.   // report error back
  811.   outputstream.writeBytes( "->n" );
  812.   outputstream.writeBytes( "0n" );
  813.   outputstream.writeBytes( "<-n" );
  814.   throw new SyntaxException( _push_server_ok_[KEY] + 
  815.      " " + _push_server_ok_[LDESC] );
  816. }
  817.       } catch ( Exception e ) {
  818. System.err.println("Exception: " + e.toString());
  819.       }
  820.     }
  821.   /** this method requests to construct the filename for the mpeg data file
  822.       stored on the particular push server and send this command to push 
  823.       server.
  824.       Returns relative path of the new mpeg filename - store it in 
  825.       database
  826.   */
  827.   private String set_pushserv_mpeg_filename( String push_server_id )
  828.     throws IOException
  829.     {
  830.       String pprefix = properties.getProperty
  831. ( properties.PushServN_PPrefix + push_server_id );
  832.       java.util.Calendar rightNow = Calendar.getInstance();
  833.       String subpath = 
  834. String.valueOf( rightNow.get(Calendar.YEAR) ) + 
  835. padZero( rightNow.get(Calendar.MONTH), 2 ) +
  836. padZero( rightNow.get(Calendar.DAY_OF_MONTH), 2 );
  837.       // file name must be unique for this day
  838.       String fname = padZero( rightNow.get(Calendar.HOUR_OF_DAY), 2 ) +
  839. padZero( rightNow.get(Calendar.MINUTE), 2 );
  840.       // this is what will be recorded in database:
  841.       fname = subpath + "/" + fname;
  842.       // concat 
  843.       String full_fname = pprefix + "/" + fname;
  844.       commandToPushServer
  845. ( push_server_id, push__set_filename_[KEY] + "n" + 
  846.   full_fname + ".mpgn" );
  847.       return fname + ".mpg";
  848.     }
  849.   /** tell push server 'id' to stop recording. Each push server may record 
  850.       only its own well known channel, so no more information is needed 
  851.   */
  852.   private void reset_pushserv_mpeg_filename( String push_server_id )
  853.     throws IOException
  854.     {
  855.       commandToPushServer
  856. ( push_server_id, push__stop_recording_[KEY] + "n" );
  857.     }
  858.       
  859.     /** instruct push server to broadcast what it have on input to those 
  860. broadcast destinations 
  861.     */
  862.     protected void set_pushserv_broadcasts( String push_server_id )
  863.       throws IOException
  864.     {
  865.       // push server not only store file to disk, but also broadcast it 
  866.       // outside
  867.       int num_of_broadcasts = properties.getPropertyInt
  868. ( properties.PushServN_NumBroadcasts + push_server_id );
  869.       application.log( 2, "Set " + String.valueOf( num_of_broadcasts ) + 
  870. " broadcasts for push server #" + push_server_id );
  871.       for( int c = 0; c < num_of_broadcasts; c++ )
  872. {
  873.   try {
  874.     String c_id = String.valueOf( c + 1 );
  875.     String broadcast_addr = properties.getProperty
  876. ( properties.PushServN_BroadcastsM_IP + push_server_id + 
  877.   "_" + c_id );
  878.     String broadcast_port = properties.getProperty
  879. ( properties.PushServN_BroadcastsM_Port + push_server_id + 
  880.   "_" + c_id );
  881.     commandToPushServer( push_server_id, 
  882.  push__add_broadcast_destination_[KEY] + "n" +
  883.  broadcast_addr + "n" + broadcast_port+"n");
  884.   } catch ( IOException e ) {
  885.       application.log( 1, "can't send command to push server " +
  886. e.toString() );
  887.   }
  888. }
  889.     }
  890.   /** utility to pad zeroes before integer, if integer representation 
  891.       is shorter than len */
  892.   public String padZero( int value, int len )
  893.     {
  894.       String repr = String.valueOf( value );
  895.       while( repr.length() < len )
  896. repr = new String( "0" ) + repr;
  897.       return repr;
  898.     }
  899.   public String convert_gen_127_addr( String ip )
  900.     {
  901.       try {
  902. if( ip.equals( "127.0.0.1" ))
  903.   {
  904.     ip = InetAddress.getLocalHost().getHostAddress();
  905.     return ip;
  906.   }
  907.       } catch ( Exception e ) {
  908. application.log( 2, e.toString() );
  909.       }
  910.       return ip;
  911.     }
  912. }