job.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:6k
源码类别:

mpeg/mp3

开发平台:

C/C++

  1. /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
  2.    Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
  3.    Software license is located in file "COPYING"
  4. */
  5. #include <stdio.h>
  6. #include <errno.h>
  7. #if HAVE_UNISTD_H
  8. #include <unistd.h>
  9. #endif
  10. #include <stdlib.h>
  11. #include <sys/socket.h>
  12. #include <sys/un.h> 
  13. #include <netinet/in.h> 
  14. #include <signal.h>
  15. #include <assert.h>
  16. #include <sys/stat.h>
  17. #include <string> 
  18. #include <strstream> 
  19. #include "sockaddr.h"
  20. #include "vpusher.h"
  21. #include "../../client/src/play_commands.h"
  22. #include "packet_header.h"
  23. #define DEBUG
  24. JobElem::JobElem( RemoteUser &user, istrstream &input, bool firstFile,
  25.   struct tm &firstt )
  26.   : fp( NULL )
  27. {
  28.   fullpath.resize( PATH_MAX );
  29.   input.getline( (char *)fullpath.c_str(), PATH_MAX );
  30.   if( fullpath.length() < 4 )
  31.     return;
  32.   if( !input )
  33.     return;
  34.   cout << "filename: " << fullpath.c_str() << endl;
  35.   // extract filename from full path
  36.   filename = fullpath;
  37.   int pos = fullpath.rfind( '/' );
  38.   if( pos >= 0 ) 
  39.     filename = 
  40.       filename.substr( pos + 1, fullpath.length() );
  41.   string s;
  42.   s.resize( 100 );
  43.   // read start time 
  44.   input.getline( (char *) s.c_str(), 100 );
  45.   struct tm t;
  46.   STRTOTIME( s.c_str(), t );
  47.   TMTOTIMEVAL( t, start.val() );
  48.   // if this is the first file, playFrom is not equal to start
  49.   if( firstFile )
  50.     {
  51.       firstFile = false;
  52.       TMTOTIMEVAL( firstt, playFrom.val() );
  53.     }
  54.   else
  55.     TMTOTIMEVAL( t, playFrom.val() );
  56.   localStartTime.tv_sec() = 0;   // specify that start time is unknown
  57.   s.resize( 100 );
  58.   // read stop time 
  59.   input.getline( (char *) s.c_str(), 100 );
  60.   STRTOTIME( s.c_str(), t );
  61.   TMTOTIMEVAL( t, stop.val() );
  62.   // open file
  63.   fp = fopen( fullpath.c_str(), "r" );
  64.   if( fp == NULL ) {
  65.     perror( ((string)"Try to open file " + filename).c_str() );
  66.     return;
  67.   }
  68.   struct stat file_stat;
  69.   if( -1 == stat( fullpath.c_str(), &file_stat )) {
  70.     perror( ((string)"stat " + filename).c_str() );
  71.     return;
  72.   }
  73.   fileLength = file_stat.st_size;
  74.   // calculate data rate in bytes per second
  75.   if( stop.tv_sec() - start.tv_sec() > 0 )
  76.     datarate = fileLength / ( stop.tv_sec() - start.tv_sec() );
  77.   else
  78.     datarate = 200000;
  79.   cout << "data rate " << datarate << endl;
  80.   // position stream by first packet time
  81.   if( playFrom > start )
  82.     {
  83.       unsigned pos = ( playFrom.tv_sec() - start.tv_sec() ) * datarate;
  84.       fseek( fp, pos, SEEK_SET );
  85.     }
  86. }
  87. void JobElem::restart_count( unsigned loadDataChunk ) 
  88. {
  89.   localStartTime.now();
  90.   datasent = 0;
  91.   dataloaded = 0;
  92.   startOffset = ftell( fp );
  93.   if( startOffset > (unsigned)loadDataChunk )
  94.     startOffset -= loadDataChunk;
  95.   else
  96.     startOffset = 0;
  97.   movieStartTime = start;
  98.   movieStartTime.tv_sec() += startOffset / datarate;
  99. }
  100. bool JobElem::send_packet( RemoteUser &user, PacketHeader *data )
  101. {
  102.   // packet header should be filled by load event
  103.   int size = send( user.outputSocket, data, data->packetSize, 0 );
  104.   if( size == -1 ) { // can't send
  105.     perror( "send to remote socket" );
  106.     app->log( 0, "send to remote socket: %s", sys_errlist[ errno ] );
  107.     if( errno == EMSGSIZE ) {  // too big
  108.       // ???
  109.     }
  110.     return false;
  111.   }
  112. # ifdef DEBUG  
  113.   cout << "packet sent, size " << size << " port " << 
  114.     user.outputSocket << " total " << datasent << endl;
  115. # endif
  116.   if( localStartTime.tv_sec() == 0 )
  117.     {
  118.       // first packet
  119.       restart_count( (unsigned)user.loadDataChunk );
  120.     }
  121.   datasent += data->packetSize;
  122.   return true;
  123. }
  124. void JobElem::calc_next_packet_time( Timeval &t )
  125. {
  126.   t.tv_usec() = localStartTime.tv_usec() + 
  127.     (int)( datasent * 1000000.0 / datarate );
  128.   // normalize
  129.   t.tv_sec() = localStartTime.tv_sec() + t.tv_usec() / 1000000;
  130.   t.tv_usec() = t.tv_usec() % 1000000;
  131. # ifdef DEBUG
  132.   {
  133.     static Timeval prev;
  134.     cout << "Calc p.tm: diff from prev is " << t - prev << endl;
  135.     prev = t;
  136.   }
  137. # endif
  138. }
  139. bool JobElem::load_packet( RemoteUser &user, 
  140.    stack< PacketHeader * > &inusedBuffers )
  141. {
  142.   int size = user.loadDataChunk;
  143.   int total = 0;
  144.   while( size > 0 )
  145.     {
  146.       PacketHeader *data;
  147.       // if we have inused buffers, use one
  148.       if( inusedBuffers.size() )
  149. {
  150.   data = inusedBuffers.top();
  151.   inusedBuffers.pop();
  152. }
  153.       // or allocate the new one
  154.       else
  155. data = (PacketHeader *)(new char[ user.maxPacketLength ]);
  156.       if( !data ) {
  157. cout << "allocation problem" << endl;
  158. return false;
  159.       }
  160.       unsigned fileoffset = startOffset + dataloaded;
  161.       int size_read = 
  162. fread( (char*)data + sizeof( PacketHeader ), 1,
  163.        user.maxPacketLength - sizeof( PacketHeader ), fp );
  164.       if( !size_read )
  165. {
  166.   // assume this job is done. close file and pop queue
  167.   cout << "job is done" << endl;
  168.   inusedBuffers.push( data );
  169.   fclose( fp );
  170.   fp = NULL;
  171.   user.jobs.pop();
  172.   // if this was the last job
  173.   if( !user.jobs.size() )
  174.     {
  175.       return false;
  176.     }
  177.   return false;
  178. }
  179.       dataloaded += size_read;
  180.       // text size will be 0
  181.       data->textSize = 0;
  182.       data->dataSize = size_read;
  183.       data->packetSize = sizeof( PacketHeader ) + data->textSize +
  184. data->dataSize;
  185.       data->textOffset = 0;
  186.       data->dataOffset = sizeof( PacketHeader );
  187.       strcpy( data->type, mpegT );
  188.       // calculate movie packet time
  189.       unsigned timeoffset = fileoffset / datarate;
  190.       data->when.tv_sec = start.tv_sec() + timeoffset;
  191.       strncpy( data->filename, filename.c_str(), sizeof(data->filename));
  192.       data->sequence = user.sequence++;
  193.       // header filled, push
  194.       user.dataToSend.push( data );
  195.       size -= size_read;
  196.       total += size_read;
  197.     }
  198.   //  cout << total << " data loaded, fp " << fp << endl;
  199.   return true;
  200. }