job.cc
上传用户:psq1974
上传日期:2007-01-06
资源大小:1195k
文件大小:6k
- /* Copyright (C) 1998, 1999 State University of New York at Stony Brook
- Author: Andrew V. Shuvalov ( andrew@ecsl.cs.sunysb.edu )
- Software license is located in file "COPYING"
- */
- #include <stdio.h>
- #include <errno.h>
- #if HAVE_UNISTD_H
- #include <unistd.h>
- #endif
- #include <stdlib.h>
- #include <sys/socket.h>
- #include <sys/un.h>
- #include <netinet/in.h>
- #include <signal.h>
- #include <assert.h>
- #include <sys/stat.h>
- #include <string>
- #include <strstream>
- #include "sockaddr.h"
- #include "vpusher.h"
- #include "../../client/src/play_commands.h"
- #include "packet_header.h"
- #define DEBUG
- JobElem::JobElem( RemoteUser &user, istrstream &input, bool firstFile,
- struct tm &firstt )
- : fp( NULL )
- {
- fullpath.resize( PATH_MAX );
- input.getline( (char *)fullpath.c_str(), PATH_MAX );
- if( fullpath.length() < 4 )
- return;
- if( !input )
- return;
- cout << "filename: " << fullpath.c_str() << endl;
- // extract filename from full path
- filename = fullpath;
- int pos = fullpath.rfind( '/' );
- if( pos >= 0 )
- filename =
- filename.substr( pos + 1, fullpath.length() );
- string s;
- s.resize( 100 );
- // read start time
- input.getline( (char *) s.c_str(), 100 );
- struct tm t;
- STRTOTIME( s.c_str(), t );
- TMTOTIMEVAL( t, start.val() );
- // if this is the first file, playFrom is not equal to start
- if( firstFile )
- {
- firstFile = false;
- TMTOTIMEVAL( firstt, playFrom.val() );
- }
- else
- TMTOTIMEVAL( t, playFrom.val() );
- localStartTime.tv_sec() = 0; // specify that start time is unknown
- s.resize( 100 );
- // read stop time
- input.getline( (char *) s.c_str(), 100 );
- STRTOTIME( s.c_str(), t );
- TMTOTIMEVAL( t, stop.val() );
- // open file
- fp = fopen( fullpath.c_str(), "r" );
- if( fp == NULL ) {
- perror( ((string)"Try to open file " + filename).c_str() );
- return;
- }
- struct stat file_stat;
- if( -1 == stat( fullpath.c_str(), &file_stat )) {
- perror( ((string)"stat " + filename).c_str() );
- return;
- }
- fileLength = file_stat.st_size;
- // calculate data rate in bytes per second
- if( stop.tv_sec() - start.tv_sec() > 0 )
- datarate = fileLength / ( stop.tv_sec() - start.tv_sec() );
- else
- datarate = 200000;
- cout << "data rate " << datarate << endl;
- // position stream by first packet time
- if( playFrom > start )
- {
- unsigned pos = ( playFrom.tv_sec() - start.tv_sec() ) * datarate;
- fseek( fp, pos, SEEK_SET );
- }
- }
- void JobElem::restart_count( unsigned loadDataChunk )
- {
- localStartTime.now();
- datasent = 0;
- dataloaded = 0;
- startOffset = ftell( fp );
- if( startOffset > (unsigned)loadDataChunk )
- startOffset -= loadDataChunk;
- else
- startOffset = 0;
- movieStartTime = start;
- movieStartTime.tv_sec() += startOffset / datarate;
- }
- bool JobElem::send_packet( RemoteUser &user, PacketHeader *data )
- {
- // packet header should be filled by load event
- int size = send( user.outputSocket, data, data->packetSize, 0 );
- if( size == -1 ) { // can't send
- perror( "send to remote socket" );
- app->log( 0, "send to remote socket: %s", sys_errlist[ errno ] );
- if( errno == EMSGSIZE ) { // too big
- // ???
- }
- return false;
- }
- # ifdef DEBUG
- cout << "packet sent, size " << size << " port " <<
- user.outputSocket << " total " << datasent << endl;
- # endif
- if( localStartTime.tv_sec() == 0 )
- {
- // first packet
- restart_count( (unsigned)user.loadDataChunk );
- }
- datasent += data->packetSize;
- return true;
- }
- void JobElem::calc_next_packet_time( Timeval &t )
- {
- t.tv_usec() = localStartTime.tv_usec() +
- (int)( datasent * 1000000.0 / datarate );
- // normalize
- t.tv_sec() = localStartTime.tv_sec() + t.tv_usec() / 1000000;
- t.tv_usec() = t.tv_usec() % 1000000;
- # ifdef DEBUG
- {
- static Timeval prev;
- cout << "Calc p.tm: diff from prev is " << t - prev << endl;
- prev = t;
- }
- # endif
- }
- bool JobElem::load_packet( RemoteUser &user,
- stack< PacketHeader * > &inusedBuffers )
- {
- int size = user.loadDataChunk;
- int total = 0;
- while( size > 0 )
- {
- PacketHeader *data;
- // if we have inused buffers, use one
- if( inusedBuffers.size() )
- {
- data = inusedBuffers.top();
- inusedBuffers.pop();
- }
- // or allocate the new one
- else
- data = (PacketHeader *)(new char[ user.maxPacketLength ]);
- if( !data ) {
- cout << "allocation problem" << endl;
- return false;
- }
- unsigned fileoffset = startOffset + dataloaded;
- int size_read =
- fread( (char*)data + sizeof( PacketHeader ), 1,
- user.maxPacketLength - sizeof( PacketHeader ), fp );
- if( !size_read )
- {
- // assume this job is done. close file and pop queue
- cout << "job is done" << endl;
- inusedBuffers.push( data );
- fclose( fp );
- fp = NULL;
- user.jobs.pop();
- // if this was the last job
- if( !user.jobs.size() )
- {
- return false;
- }
- return false;
- }
- dataloaded += size_read;
- // text size will be 0
- data->textSize = 0;
- data->dataSize = size_read;
- data->packetSize = sizeof( PacketHeader ) + data->textSize +
- data->dataSize;
- data->textOffset = 0;
- data->dataOffset = sizeof( PacketHeader );
- strcpy( data->type, mpegT );
- // calculate movie packet time
- unsigned timeoffset = fileoffset / datarate;
- data->when.tv_sec = start.tv_sec() + timeoffset;
- strncpy( data->filename, filename.c_str(), sizeof(data->filename));
- data->sequence = user.sequence++;
- // header filled, push
- user.dataToSend.push( data );
- size -= size_read;
- total += size_read;
- }
- // cout << total << " data loaded, fp " << fp << endl;
- return true;
- }