sync.cc
上传用户:aoeyumen
上传日期:2007-01-06
资源大小:3329k
文件大小:8k
- /*
- File: sync.cc
- By: Alex Theo de Jong
- Created: February 1996
- Description:
- Synchronization object for multiple streams
- */
- #ifdef __GNUG__
- #pragma implementation
- #endif
- #include "athread.hh"
- #include <stdio.h>
- #include <iostream.h>
- #include <sys/time.h>
- #ifdef LINUX
- #include <sys/sem.h>
- #endif
- #include "error.hh"
- #include "debug.hh"
- #include "util.hh"
- #include "sync.hh"
- /*
- *
- * StampQueue
- *
- */
- StampQueue::StampQueue(int max) :
- in(max), out(0),
- index_in(0), index_out(0),
- resync(0),
- newtime(0.0),maximum(max), total(0)
- {
- if (!(stamps=new double[maximum]))
- error("could not allocate memory");
- if (!(bytes=new int[maximum]))
- error("could not allocate memory");
- }
- StampQueue::~StampQueue(){
- delete[] stamps;
- delete[] bytes;
- }
- int StampQueue::get(double& stamp){
- #ifdef TRACE
- if (total==0) warning("stamp queue underflow (stamp)");
- #endif
- wait_out();
- lock();
- stamp=stamps[index_out];
- TRACER("StampQueue::get " << dtoa(stamp));
- if (++index_out>=maximum) index_out=0;
- total--;
- unlock();
- post_in();
- return 1;
- }
- int StampQueue::get(double& stamp, int& b){
- #ifdef TRACE
- if (total==0) warning("stamp queue underflow (stamp and bytes)");
- #endif
- wait_out();
- lock();
- stamp=stamps[index_out];
- b+=bytes[index_out];
- if (++index_out>=maximum) index_out=0;
- TRACER("StampQueue::get" << dtoa(stamp) << " " << itoa(bytes[index_out]));
- total--;
- unlock();
- post_in();
- return 1;
- }
- int StampQueue::put(const double stamp, const int b){
- #ifdef TRACE
- TRACER("StampQueue::put " << dtoa(stamp) << " " << itoa(b));
- if (total==maximum){
- warning("stamp queue overflow");
- }
- #endif
- wait_in();
- lock();
- stamps[index_in]=stamp;
- bytes[index_in]=b;
- if (++index_in>=maximum) index_in=0;
- total++;
- unlock();
- post_out();
- return 1;
- }
- /*
- *
- * SyncTimer
- *
- */
- SyncTimer::SyncTimer(int max) : StampQueue(max), time(-1.0) {
- }
- int SyncTimer::update(){
- get(newtime);
- lock();
- // == continuous re-sync if time-stamps are not used
- // < only re-sync if next time stamp drops (ie. jumping back into stream)
- resync=(newtime<=time) ? 1 : 0;
- #ifdef TRACE
- if (resync==1) TRACER("re_sync time");
- #endif
- time=newtime;
- TRACER("Timeupdate " << dtoa(time));
- time_cond.broadcast();
- unlock();
- return resync;
- }
- int SyncTimer::done(){
- TRACER("int SyncTimer::done()");
- lock();
- time=0x0fffffff; // maximum time
- time_cond.broadcast();
- unlock();
- return 1;
- }
- /*
- *
- * SyncData
- *
- */
- SyncData::SyncData(int max, SyncTimer* t) :
- StampQueue(max),
- terminated(0),
- time(0.0),
- bytes(0)
- {
- timer=t;
- }
- int SyncData::wait(){
- #ifdef UPTIGHT
- bytelock(); // bytes
- #endif
- DEBUGGER("W" << itoa(bytes));
- if (bytes<=0){
- get(newtime, bytes);
- //message("wait for time " << dtoa(newtime) << " " << dtoa(time));
- if (newtime<=time){
- resync=0; // found new sync time; stop re-sync
- TRACER("reset data resync");
- }
- //msg("9");
- time_lock.lock();
- //msg("8");
- time=newtime;
- time_cond.signal();
- //msg("7");
- time_lock.unlock();
- //msg("6");
- timer->lock();
- //message("wait for time " << dtoa(timer->time) << " it is " << dtoa(time) << " resync " << itoa(resync));
- while (resync==0 && (time>timer->time) && (terminated==0)){
- //message("inwhile");
- timer->time_cond.wait(&timer->time_lock);
- //message("wait for time " << dtoa(timer->time) << "it is " << dtoa(time));
- }
- TRACER("Twait " << dtoa(time) << " =< " << dtoa(timer->time));
- timer->unlock();
- #ifdef UPTIGHT
- byteunlock();
- #endif
- return 1;
- }
- #ifdef UPTIGHT
- byteunlock();
- #endif
- return 0;
- }
- int SyncData::skip(){
- TRACER("int SyncData::skip()");
- #ifdef UPTIGHT
- bytelock();
- #endif
- if (bytes<=0){
- get(newtime, bytes);
- if (newtime<time)
- resync=0; // found new sync time; stop re-sync
- time_lock.lock();
- time=newtime;
- time_cond.signal();
- time_lock.unlock();
- #ifdef UPTIGHT
- byteunlock();
- #endif
- return 1;
- }
- #ifdef UPTIGHT
- byteunlock();
- #endif
- return 0;
- }
- int SyncData::update(){
- time_lock.lock();
- if (timer->resync==1){
- resync=1; // re_sync data
- TRACER("resync data");
- }
- else {
- while ((time<=timer->time) && (terminated==0)){
- time_cond.wait(&time_lock);
- }
- }
- DEBUGGER("T" << dtoa(time) << "<" << dtoa(timer->time));
- time_lock.unlock();
- return resync;
- }
- int SyncData::done(int term=0){ return (term) ? terminated=term : terminated; }
- /*
- *
- * Synchronization
- *
- */
- /*
- type sync id notes
- 0 1, 2 video + audio
- 1 1 video
- 2 2 audio
- n 1,2,... n video + audio + others
- */
- Synchronization::Synchronization(int type, int t_qsize, int f_qsize) :
- timer(t_qsize),
- terminate(0),
- terminated(0)
- {
- int i;
- int error=0;
- sched_param param;
- int policy;
- for (i=0; i<Max_Sync_Process; i++) syncs[i]=0; // set all to 0
- if (type>2){
- for (i=0; i<Max_Sync_Process; i++)
- syncs[i]=new SyncData(f_qsize, &timer);
- }
- else if (type==0){
- for (i=0; i<2 && i<Max_Sync_Process; i++)
- syncs[i]=new SyncData(f_qsize, &timer); // id 1 & 2
- }
- else if (type==1){
- syncs[0]=new SyncData(f_qsize, &timer); // only id=1
- }
- else if (type==2){
- syncs[0]=0;
- syncs[1]=new SyncData(f_qsize, &timer); // only id=2
- }
- if ((error=athr_create((void*(*)(void*)) Synchronization::init, this, &id))<0){
- error("failed to create thread");
- athr_exit(0);
- }
- if ((error=athr_getschedparam(id, &policy, ¶m))<0){
- warning("could not get thread prio - ignored");
- }
- else {
- #ifdef LINUX
- param.sched_priority+=1;
- // policy = SCHED_RR;
- TRACER("TIMERPRIORITY=" << param.sched_priority << "(" << param.sched_priority-1 << ")");
- #else
- param.prio+=1;
- TRACER("TIMERPRIORITY=" << param.prio << "(" << param.prio-1 << ")");
- #endif
- if ((error=athr_setschedparam(id, policy, ¶m))<0){
- warning("could not set thread prio - ignored");
- }
- }
- }
- Synchronization::~Synchronization(){
- if (!terminated){
- TRACER("waiting for synchronization thread to terminate ...");
- athr_join(id);
- }
- TRACER("synchronization thread terminate!");
- }
- void* Synchronization::init(Synchronization* s){
- TRACER("void* Synchronization::init(Synchronization* s)");
- int i;
- int resync=0;
- while (!s->terminate){
- resync=s->timer.update();
- #ifdef TRACE
- msg("!");
- if (s->syncs[0] && s->syncs[1]){
- if (s->syncs[0]->total==0 && s->syncs[1]->total==s->syncs[1]->maximum){
- TRACER("video stamps underflow, audio stamps overflow");
- }
- if (s->syncs[0]->total==s->syncs[0]->maximum && s->syncs[1]->total==0){
- TRACER("audio stamps underflow, video stamps overflow");
- }
- }
- #endif
- for (i=0; ((s->terminate==0) && (i<Max_Sync_Process)); i++){
- if ((s->syncs[i]!=0) && ((s->syncs[i]->total!=0) || resync)){
- #ifdef TRACE
- msg(itoa(i));
- #endif
- s->syncs[i]->update();
- }
- }
- }
- // finish up
- while (s->timer.total) s->timer.update();
- s->timer.done();
- s->terminated=1;
- athr_exit(0);
- return 0;
- }
- int Synchronization::stop(){
- TRACER("int Synchronization::stop()");
- terminate=1;
- double finaltime=0.0;
- for (int i=0; i<Max_Sync_Process; i++){
- if (syncs[i]){
- syncs[i]->terminated=1;
- syncs[i]->put(finaltime); // put last dummy time stamp
- syncs[i]->time_cond.signal();
- }
- }
- finaltime=0x0fffffff;
- timer.put(finaltime);
- return terminated;
- }
- int Synchronization::usedbytes(int ID, int b){ return (ID!=0) ? syncs[ID-1]->usedbytes(b) : -1; }
- int Synchronization::wait(int ID){ return (ID!=0) ? syncs[ID-1]->wait() : -1; }
- int Synchronization::skip(int ID){ return (ID!=0) ? syncs[ID-1]->skip() : -1; }
- int Synchronization::put(const double stamp){ return timer.put(stamp,0); }
- int Synchronization::put(int ID, const double s, const int b){ return (ID!=0) ? syncs[ID-1]->put(s, b) : -1; }
- int Synchronization::pause(){ return athr_suspend(id); } // stop timer
- int Synchronization::resume(){ return athr_continue(id); } // continu timer
- int Synchronization::done(int ID){ return (ID!=0) ? syncs[ID-1]->done() : -1; }
- #ifdef MAIN
- main(int argc, char** argv){
- // Just a test to compile and link
- Synchronization sync();
- while (1){
- }
- }
- // set higher priority for reading process
- int prio=0;
- thr_getprio(thr_self(), &prio);
- prio++;
- thr_setprio(thr_self(), prio);
- #endif