vclient_sii.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:12k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include <sys/socket.h>
  8. #include <sys/un.h> 
  9. #include <arpa/inet.h>
  10. #undef  HAVE_UNISTD_H
  11. #include <mtvp/mtvp.h>
  12. #include <mtvp/sii_api.h>
  13. #undef assert
  14. #include <assert.h>
  15. #include "session.h"
  16. #include "playback_window.h"
  17. #include "../../pusher/src/packet_header.h"
  18. #include "../../pusher/src/timeval.h"
  19. #define DEBUG
  20. /** both playback window and SII plugin must be sinchronized
  21.     to the same mode
  22. */
  23. typedef enum { stopped, playback, broadcast } operatingModeT;
  24. /** playback, broadcast, stopped mode.
  25.  */
  26. operatingModeT operatingMode = stopped;
  27. static int playbackSocketIn = -1;
  28. static int broadcastSocketIn = -1;
  29. static int sequence;
  30. static void close_sock();
  31. /** UDP packet is loaded into FIFO buffer, including header. When output to
  32.     the playerSocket, header must be stripped */
  33. static queue< PacketHeader * > dataToPlay;
  34. static stack< PacketHeader * > inusedBuffers;
  35. const unsigned maxPacketLength = 1472;
  36. static void debug( int level, const char *format, ... )
  37. {
  38. # ifdef DEBUG
  39.   va_list ap;
  40.   va_start( ap, format );
  41.   vprintf( format, ap );
  42.   va_end( ap );
  43. # else
  44.   va_list ap;
  45.   va_start( ap, format );
  46.   mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) + 1024 ];
  47.   msg->type = msg->debug;
  48.   msg->int_param = level;
  49.   vsnprintf( msg->data, 1024, format, ap );
  50.   va_end( ap );
  51.   sii_feedback( msg, sizeof( mtv_msg ) + strlen( msg->data ) );
  52.   delete msg;
  53. # endif
  54. }
  55. static void send_timestamp( const PacketHeader *data )
  56. {
  57.   // data may come with no timestamp. Don't send it
  58.   if( data->when.tv_sec == 0 )
  59.     return;
  60.   static int previous_timestamp_sec;
  61.   if( data->when.tv_sec == previous_timestamp_sec ) // not changed
  62.     return;
  63.   previous_timestamp_sec = data->when.tv_sec;
  64.   mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) 
  65.      + sizeof( struct timeval ) ];
  66.   msg->type = msg->timestamp;
  67.   *( struct timeval * ) msg->data = data->when;
  68.   sii_feedback( msg, sizeof( mtv_msg ) + sizeof( struct timeval ) );
  69.   delete msg;
  70. }
  71. static void send_port_id_watch( int portnum )
  72. {
  73.   mtv_msg *msg = (mtv_msg *) new char[ sizeof( mtv_msg ) ];
  74.   msg->type = msg->port_id_watch;
  75.   msg->int_param = portnum;
  76.   sii_feedback( msg, sizeof( mtv_msg ) );
  77.   delete msg;
  78. }
  79. static void *open_sock( mtv_open_struct *os )
  80. {
  81.   if( ! os ) 
  82.     return NULL;
  83.   if( os->action != os->open )
  84.     {
  85.       debug( 0, "SII: call open with close commandn" );
  86.       return NULL;
  87.     }
  88.   debug( 3, "SII: open socket, ports %d and %dn", os->playback_input_socket,
  89.  os->broadcast_input_socket );
  90.   if( playbackSocketIn == -1 && os->playback_input_socket != -1 )
  91.     {
  92.       playbackSocketIn = socket( PF_INET, SOCK_DGRAM, 0 );
  93.       if( -1 == playbackSocketIn )
  94. {
  95.   debug( 1, "SII plugin: socket fails, %sn", sys_errlist[ errno ] );
  96.   return NULL;
  97. }
  98.       // create input socket
  99.       // assign local address
  100.       struct sockaddr_in saddr;
  101.       saddr.sin_family = AF_INET;
  102.       saddr.sin_port = htons ( os->playback_input_socket );
  103.       saddr.sin_addr.s_addr = htonl (INADDR_ANY);             // input
  104.       if( -1 == bind ( playbackSocketIn, (struct sockaddr *) &saddr, 
  105.        sizeof(saddr)))
  106. {
  107.   playbackSocketIn = -1;
  108.   debug( 1, "SII plugin: bind playback %d port fails, %sn", 
  109.  os->playback_input_socket, sys_errlist[ errno ] );
  110.   return NULL;
  111. }
  112.       // set non-block mode
  113.       int oldflags = fcntl( playbackSocketIn, F_GETFL, 0 );
  114.       if( oldflags == -1 )
  115. {
  116.   playbackSocketIn = -1;
  117.   debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
  118.   return NULL;
  119. }
  120.       oldflags |= O_NONBLOCK;
  121.       if( -1 == fcntl( playbackSocketIn, F_SETFL, oldflags ))
  122. {
  123.   playbackSocketIn = -1;
  124.   debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
  125.   return NULL;
  126. }
  127.       debug( 2, "SII: playbackSocketIn is %dn", playbackSocketIn );
  128.     }
  129.   if( broadcastSocketIn == -1 && os->broadcast_input_socket != -1 )
  130.     {
  131.       broadcastSocketIn = socket( PF_INET, SOCK_DGRAM, 0 );
  132.       if( -1 == broadcastSocketIn )
  133. {
  134.   debug( 1, "SII plugin: socket fails, %sn", sys_errlist[ errno ] );
  135.   return NULL;
  136. }
  137.       // create input socket
  138.       // assign local address
  139.       struct sockaddr_in saddr;
  140.       saddr.sin_family = AF_INET;
  141.       saddr.sin_port = htons ( os->broadcast_input_socket );
  142.       saddr.sin_addr.s_addr = htonl (INADDR_ANY);             // input
  143.       if( -1 == bind ( broadcastSocketIn, (struct sockaddr *) &saddr, 
  144.        sizeof(saddr)))
  145. {
  146.   broadcastSocketIn = -1;
  147.   debug( 1, "SII plugin: bind broadcast %d port fails, %sn", 
  148.  os->broadcast_input_socket, sys_errlist[ errno ] );
  149.   return NULL;
  150. }
  151.       // set non-block mode
  152.       int oldflags = fcntl( broadcastSocketIn, F_GETFL, 0 );
  153.       if( oldflags == -1 )
  154. {
  155.   broadcastSocketIn = -1;
  156.   debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
  157.   return NULL;
  158. }
  159.       oldflags |= O_NONBLOCK;
  160.       if( -1 == fcntl( broadcastSocketIn, F_SETFL, oldflags ))
  161. {
  162.   broadcastSocketIn = -1;
  163.   debug( 1, "SII plugin: fcntl fails, %sn", sys_errlist[ errno ] );
  164.   return NULL;
  165. }
  166.       debug( 2, "SII: broadcastSocketIn is %dn", broadcastSocketIn );
  167.     }
  168.   operatingMode = stopped; // at the beginning
  169.   sequence = 0;
  170.   return (void *)1;
  171. }
  172. /** close socket
  173.     We must pay attention to the bug that if we close the socket and 
  174.     immediately re-open it, it will report that socket is in use.
  175.     So we must have two sockets and keep them open all the time.
  176.     Close sockets at the end of use
  177. */
  178. static void close_sock()
  179. {
  180.   if( playbackSocketIn > 0 ) 
  181.     {
  182.       shutdown( playbackSocketIn, 2 );
  183.       close( playbackSocketIn );
  184.     }
  185.   playbackSocketIn = -1;
  186.   if( broadcastSocketIn > 0 ) 
  187.     {
  188.       shutdown( broadcastSocketIn, 2 );
  189.       close( broadcastSocketIn );
  190.     }
  191.   broadcastSocketIn = -1;
  192. }
  193. void *SII_open( const void *data, u_int size )
  194. {
  195.   mtv_open_struct *os = (mtv_open_struct *)data;
  196.   if( size != sizeof( mtv_open_struct ))
  197.     {
  198.       debug( 1, "SII plugin: bad sizen" );
  199.       return NULL;
  200.     }
  201.   debug( 1, "SII plugin: open. Listen for sockets %d and %dn", 
  202.  os->playback_input_socket, os->broadcast_input_socket );
  203.   return open_sock( os );
  204. }
  205. int SII_close(void *)
  206. {
  207.   debug( 3, "SII closen" );
  208.   close_sock();
  209. }
  210. int SII_fd(void *)
  211. {
  212.   int reply;
  213.   reply = operatingMode == broadcast? broadcastSocketIn: playbackSocketIn;
  214.   debug( 3, "SII get fd, reply is %dn", reply );
  215.   // inform application that this port is watched
  216.   send_port_id_watch( reply );
  217.   return reply;
  218. }
  219. int SII_cntl( void *, const void *data, u_int size )
  220. {
  221.   debug( 3, "SII cntln" );
  222.   mtv_open_struct *os = (mtv_open_struct *)data;
  223.   if( os->action == os->open )
  224.     {
  225.       debug( 3, "SII cntl: open sockets, ports %d and %dn", 
  226.      os->playback_input_socket, os->broadcast_input_socket );
  227.       // reopen the socket with different port number
  228.       open_sock( os );
  229.       operatingMode = stopped;
  230.       return 1;
  231.     }
  232.   else if( os->action == os->close )
  233.     {
  234.       debug( 3, "SII cntl: close socket, id %d and %dn", 
  235.      playbackSocketIn, broadcastSocketIn );
  236.       close_sock();
  237.       operatingMode = stopped;
  238.       return 1;
  239.     }
  240.   else if( os->action == os->close )
  241.     {
  242.       debug( 3, "SII cntl: stopn", 
  243.      playbackSocketIn, broadcastSocketIn );
  244.       // do not close sockets
  245.       operatingMode = stopped;
  246.       return 1;
  247.     }
  248.   else if( os->action == os->goplayback )
  249.     {
  250.       debug( 3, "SII cntl: goplaybackn" );
  251.       operatingMode = playback;
  252.     }
  253.   else if( os->action == os->gobroadcast )
  254.     {
  255.       debug( 3, "SII cntl: gobroadcastn" );
  256.       operatingMode = broadcast;
  257.     }
  258.   return 0;
  259. }
  260. int SII_read(void *, char *buf, size_t count)
  261. {
  262.   debug( 4, "SII read %x, %dn", buf, count );
  263.   if( operatingMode == stopped )
  264.     return 0;
  265.   PacketHeader *data = 0;
  266.   int data_available = 0;
  267.   int num_of_buffers_we_need = count / maxPacketLength + 1;
  268.   int our_socket = operatingMode == playback? playbackSocketIn:
  269.     broadcastSocketIn;
  270.   while( --num_of_buffers_we_need >= 0  )
  271.     {
  272.       // get empty buffer
  273.       if( inusedBuffers.size() )
  274. {
  275.   data = inusedBuffers.top();
  276.   inusedBuffers.pop();
  277. }
  278.       else 
  279. data = (PacketHeader *)( new char[ maxPacketLength ] );
  280.       if( !data )
  281. {
  282.   debug( 1, "can't allocaten" );
  283.   return -1;
  284. }
  285.       fd_set set;
  286.       FD_ZERO( &set );
  287.       FD_SET( our_socket, &set );
  288.       // timeout is zero: read what we have, don't spend time to wait
  289.       Timeval tv( 0, 0 );
  290.       int ret = select( FD_SETSIZE, &set, NULL, NULL, &tv.val() );
  291.       if( -1 == ret )
  292. {            
  293.   perror ("select");
  294.   return -1;
  295. }
  296.       if( 0 == ret )  // timeout
  297. {
  298.   debug( 3, "SII: read called, but no data detectedn" );
  299.   // don't return 0 immediately
  300.   // it may happens that we have data in buffers
  301.   if( dataToPlay.size() )
  302.     break;
  303.   else
  304.     {
  305.       errno = EAGAIN; // this means that no data in stream, 
  306.       //              // call later again
  307.       return -1;
  308.     }
  309. }
  310.       int size = read( our_socket, data, maxPacketLength );
  311.       if( size == -1 && errno == EAGAIN )
  312. {
  313.   // non-blocking call - no data in socket
  314.   inusedBuffers.push( data );
  315.   break;
  316. }
  317.       if( size == -1 )
  318. {
  319.   // error
  320.   debug( 1, "recv error, %sn", sys_errlist[ errno ] );
  321.   inusedBuffers.push( data );
  322.   return size;
  323. }
  324.       if( size == 0 )  // EOF
  325. {
  326.   debug( 1, "no data in buffer, %sn", sys_errlist[ errno ] );
  327.   inusedBuffers.push( data );
  328.   break;
  329. }
  330.       if( size < maxPacketLength )
  331. {
  332.   debug( 1, "warning: udp packet corruptedn" );
  333. }
  334.       dataToPlay.push( data );
  335.       send_timestamp( data );
  336.     }
  337.   // maybe we have data in old buffers?
  338.   int amount_read = 0;
  339. # ifdef DEBUG
  340.   if( dataToPlay.size() == 0 )
  341.     debug( 1, "no data in buffersn" );
  342. # endif
  343.   while( count > 0 && dataToPlay.size() )
  344.     {
  345.       data = dataToPlay.front();
  346.       // reset sequence if needed
  347.       if( data->sequence == 0 )
  348. sequence = 0;
  349.       // if sequence is corrupted, than you have to skip data until next
  350.       // I-frame. Than reset sequence
  351.       if( sequence > 0 && sequence + 1 != data->sequence )
  352. {
  353.   cout << "sequence is " << data->sequence << " expected " << 
  354.     sequence + 1 << endl;
  355.   // reset sequence anyway
  356.   sequence = 0;
  357.   bool got_it = false;
  358.   while( dataToPlay.size() )
  359.     {
  360.       // fing a seqience of 7 times FF FF FF FF FF
  361.       int num = data->packetSize - data->dataOffset;
  362.       for( unsigned char *ptr = (unsigned char*)data+data->dataOffset; 
  363.    num > 7; num--, ptr++ )
  364. {
  365.   if( *ptr == 0xff && ptr[1] == 0xff && ptr[2] == 0xff && 
  366.       ptr[3] == 0xff && ptr[4] == 0xff && ptr[5] == 0xff &&
  367.       ptr[6] == 0xff )
  368.     {
  369.       got_it = true;
  370.       data->dataOffset = data->packetSize - num;
  371.       break;
  372.     }
  373. }
  374.       if( got_it == false )
  375. {
  376.   inusedBuffers.push( data );
  377.   dataToPlay.pop();
  378.   if( dataToPlay.size() )
  379.     data = dataToPlay.front();
  380.   continue;
  381. }
  382.       break;
  383.     }
  384. }
  385.       if( !dataToPlay.empty() && data->dataOffset < data->packetSize )
  386. {
  387.   int size = min( data->packetSize - data->dataOffset, count );
  388.   memcpy( buf, (char*)data + data->dataOffset, size );
  389.   buf += size;
  390.   data->dataOffset += size;
  391.   amount_read += size;
  392.   count -= size;
  393. }
  394.       if( data->dataOffset == data->packetSize )
  395. {
  396.   inusedBuffers.push( data );
  397.   // update sequence when current packet is gone
  398.   sequence = data->sequence;
  399.   dataToPlay.pop();
  400. }
  401.     }
  402.   // If there is no data available,
  403.   // SII_read should return -1 with errno set to EAGAIN
  404.   if( amount_read == 0 )
  405.     {
  406.       errno = EAGAIN;
  407.       return -1;
  408.     }
  409.   cout << amount_read << " readn";
  410.   return amount_read;
  411. }
  412. off_t SII_lseek( void *, off_t offs, int how )
  413. {
  414.   debug( 3, "SII lseek %offset %d, how %dn", offs, how );
  415.   return -1;
  416. }