sync.cc
上传用户:aoeyumen
上传日期:2007-01-06
资源大小:3329k
文件大小:8k
源码类别:

DVD

开发平台:

Unix_Linux

  1. /*
  2.    File: sync.cc
  3.    By: Alex Theo de Jong
  4.    Created: February 1996
  5.    Description:
  6.    Synchronization object for multiple streams
  7. */
  8. #ifdef __GNUG__
  9. #pragma implementation
  10. #endif
  11. #include "athread.hh"
  12. #include <stdio.h>
  13. #include <iostream.h>
  14. #include <sys/time.h>
  15. #ifdef LINUX
  16. #include <sys/sem.h>
  17. #endif
  18. #include "error.hh"
  19. #include "debug.hh"
  20. #include "util.hh"
  21. #include "sync.hh"
  22. /*
  23.  *
  24.  * StampQueue
  25.  *
  26.  */
  27. StampQueue::StampQueue(int max) :
  28.   in(max), out(0),
  29.   index_in(0), index_out(0),
  30.   resync(0),
  31.   newtime(0.0),maximum(max), total(0)
  32. {
  33.   if (!(stamps=new double[maximum]))
  34.     error("could not allocate memory");
  35.   if (!(bytes=new int[maximum]))
  36.     error("could not allocate memory");  
  37. }
  38. StampQueue::~StampQueue(){
  39.   delete[] stamps;
  40.   delete[] bytes;
  41. }
  42. int StampQueue::get(double& stamp){
  43. #ifdef TRACE
  44.   if (total==0) warning("stamp queue underflow (stamp)");
  45. #endif
  46.   wait_out();
  47.   lock();
  48.   stamp=stamps[index_out];
  49.   TRACER("StampQueue::get " << dtoa(stamp));
  50.   if (++index_out>=maximum) index_out=0; 
  51.   total--;
  52.   unlock();
  53.   post_in();
  54.   return 1;
  55. }
  56. int StampQueue::get(double& stamp, int& b){
  57. #ifdef TRACE
  58.   if (total==0) warning("stamp queue underflow (stamp and bytes)");
  59. #endif
  60.   wait_out();
  61.   lock();
  62.   stamp=stamps[index_out];
  63.   b+=bytes[index_out];
  64.   if (++index_out>=maximum) index_out=0; 
  65.   TRACER("StampQueue::get" << dtoa(stamp) << " " << itoa(bytes[index_out]));
  66.   total--;
  67.   unlock();
  68.   post_in();
  69.   return 1;
  70. }
  71. int StampQueue::put(const double stamp, const int b){
  72. #ifdef TRACE
  73.   TRACER("StampQueue::put " << dtoa(stamp) << " " << itoa(b));
  74.   if (total==maximum){
  75.     warning("stamp queue overflow");
  76.   }
  77. #endif
  78.   wait_in();
  79.   lock();
  80.   stamps[index_in]=stamp;
  81.   bytes[index_in]=b;
  82.   if (++index_in>=maximum) index_in=0; 
  83.   total++;
  84.   unlock();
  85.   post_out();
  86.   return 1;
  87. }
  88. /*
  89.  *
  90.  * SyncTimer
  91.  *
  92.  */
  93. SyncTimer::SyncTimer(int max) : StampQueue(max), time(-1.0) {
  94. }
  95. int SyncTimer::update(){
  96.   get(newtime);
  97.   lock();
  98.   // == continuous re-sync if time-stamps are not used
  99.   //  < only re-sync if next time stamp drops (ie. jumping back into stream)
  100.   resync=(newtime<=time) ? 1 : 0;    
  101. #ifdef TRACE
  102.   if (resync==1) TRACER("re_sync time");
  103. #endif
  104.   time=newtime;
  105. TRACER("Timeupdate " << dtoa(time));
  106.   time_cond.broadcast();
  107.   unlock();
  108.   return resync;
  109. }
  110. int SyncTimer::done(){
  111.   TRACER("int SyncTimer::done()");
  112.   lock();
  113.   time=0x0fffffff; // maximum time
  114.   time_cond.broadcast();
  115.   unlock();
  116.   return 1;
  117. }
  118. /*
  119.  *
  120.  * SyncData
  121.  *
  122.  */
  123. SyncData::SyncData(int max, SyncTimer* t) : 
  124.   StampQueue(max),
  125.   terminated(0),
  126.   time(0.0),
  127.   bytes(0)
  128. {
  129.   timer=t;
  130. }
  131. int SyncData::wait(){
  132. #ifdef UPTIGHT
  133.   bytelock();  // bytes
  134. #endif
  135.   DEBUGGER("W" << itoa(bytes));
  136.   if (bytes<=0){
  137.     get(newtime, bytes);
  138. //message("wait for time " << dtoa(newtime) << " " << dtoa(time));
  139.     if (newtime<=time){
  140.       resync=0; // found new sync time; stop re-sync
  141.       TRACER("reset data resync");
  142.     }
  143. //msg("9");
  144.     time_lock.lock();
  145. //msg("8");
  146.     time=newtime;
  147.     time_cond.signal();
  148. //msg("7");
  149.     time_lock.unlock();
  150. //msg("6");
  151.     timer->lock();
  152. //message("wait for time " << dtoa(timer->time) << " it is " << dtoa(time) << " resync " << itoa(resync));
  153.     while (resync==0 && (time>timer->time) && (terminated==0)){
  154. //message("inwhile");
  155.       timer->time_cond.wait(&timer->time_lock);
  156. //message("wait for time " << dtoa(timer->time) << "it is " << dtoa(time));
  157.     }
  158.     TRACER("Twait " << dtoa(time) << " =< " << dtoa(timer->time));
  159.     timer->unlock();
  160. #ifdef UPTIGHT
  161.     byteunlock();
  162. #endif
  163.     return 1;
  164.   }
  165. #ifdef UPTIGHT
  166.   byteunlock();
  167. #endif
  168.   return 0;
  169. }
  170. int SyncData::skip(){
  171.   TRACER("int SyncData::skip()");
  172. #ifdef UPTIGHT
  173.   bytelock();
  174. #endif
  175.   if (bytes<=0){
  176.     get(newtime, bytes);
  177.     if (newtime<time)
  178.       resync=0;     // found new sync time; stop re-sync
  179.     time_lock.lock();
  180.     time=newtime;
  181.     time_cond.signal();
  182.     time_lock.unlock();
  183. #ifdef UPTIGHT
  184.     byteunlock();
  185. #endif
  186.     return 1;
  187.   }
  188. #ifdef UPTIGHT
  189.   byteunlock();
  190. #endif
  191.   return 0;
  192. }
  193. int SyncData::update(){
  194.   time_lock.lock();
  195.   if (timer->resync==1){
  196.     resync=1;  // re_sync data
  197.     TRACER("resync data");
  198.   }
  199.   else {
  200.     while ((time<=timer->time) && (terminated==0)){
  201.       time_cond.wait(&time_lock);
  202.     }
  203.   }
  204.   DEBUGGER("T" << dtoa(time) << "<" << dtoa(timer->time));
  205.   time_lock.unlock();
  206.   return resync;
  207. }
  208. int SyncData::done(int term=0){ return (term) ? terminated=term : terminated; }
  209. /*
  210.  *
  211.  * Synchronization
  212.  *
  213.  */
  214. /*
  215.   type  sync id     notes
  216.    0     1, 2       video + audio
  217.    1     1          video
  218.    2     2          audio
  219.    n     1,2,... n  video + audio + others
  220. */
  221. Synchronization::Synchronization(int type, int t_qsize, int f_qsize) : 
  222.   timer(t_qsize),
  223.   terminate(0), 
  224.   terminated(0) 
  225. {
  226.   int i;
  227.   int error=0;
  228.   sched_param param;
  229.   int policy;
  230.   for (i=0; i<Max_Sync_Process; i++) syncs[i]=0; // set all to 0
  231.   if (type>2){
  232.     for (i=0; i<Max_Sync_Process; i++)
  233.       syncs[i]=new SyncData(f_qsize, &timer);
  234.   }
  235.   else if (type==0){
  236.     for (i=0; i<2 && i<Max_Sync_Process; i++)
  237.       syncs[i]=new SyncData(f_qsize, &timer);  // id 1 & 2
  238.   }
  239.   else if (type==1){
  240.     syncs[0]=new SyncData(f_qsize, &timer);    // only id=1
  241.   }
  242.   else if (type==2){
  243.     syncs[0]=0;
  244.     syncs[1]=new SyncData(f_qsize, &timer);    // only id=2
  245.   }
  246.   if ((error=athr_create((void*(*)(void*)) Synchronization::init, this, &id))<0){
  247.     error("failed to create thread");
  248.     athr_exit(0);
  249.   }
  250.   if ((error=athr_getschedparam(id, &policy, &param))<0){
  251.     warning("could not get thread prio - ignored");
  252.   }
  253.   else {
  254. #ifdef LINUX
  255.     param.sched_priority+=1;
  256. //    policy = SCHED_RR;
  257.     TRACER("TIMERPRIORITY=" << param.sched_priority << "(" << param.sched_priority-1 << ")");
  258. #else
  259.     param.prio+=1;
  260.     TRACER("TIMERPRIORITY=" << param.prio << "(" << param.prio-1 << ")");
  261. #endif
  262.     if ((error=athr_setschedparam(id, policy, &param))<0){
  263.       warning("could not set thread prio - ignored");
  264.     }
  265.   }
  266. }
  267. Synchronization::~Synchronization(){
  268.   if (!terminated){
  269.     TRACER("waiting for synchronization thread to terminate ..."); 
  270.     athr_join(id);
  271.   }
  272.   TRACER("synchronization thread terminate!"); 
  273. }
  274. void* Synchronization::init(Synchronization* s){
  275.   TRACER("void* Synchronization::init(Synchronization* s)");
  276.   int i;
  277.   int resync=0;
  278.   while (!s->terminate){
  279.     resync=s->timer.update();
  280. #ifdef TRACE
  281.     msg("!");
  282.     if (s->syncs[0] && s->syncs[1]){
  283.       if (s->syncs[0]->total==0 && s->syncs[1]->total==s->syncs[1]->maximum){
  284.         TRACER("video stamps underflow, audio stamps overflow");
  285.       }
  286.       if (s->syncs[0]->total==s->syncs[0]->maximum && s->syncs[1]->total==0){
  287.         TRACER("audio stamps underflow, video stamps overflow");
  288.       }      
  289.     }
  290. #endif
  291.     for (i=0; ((s->terminate==0) && (i<Max_Sync_Process)); i++){
  292.       if ((s->syncs[i]!=0) && ((s->syncs[i]->total!=0) || resync)){
  293. #ifdef TRACE
  294.         msg(itoa(i));
  295. #endif
  296.         s->syncs[i]->update();
  297.       }
  298.     }
  299.   }
  300.   // finish up
  301.   while (s->timer.total) s->timer.update();
  302.   s->timer.done();
  303.   s->terminated=1;
  304.   athr_exit(0);
  305.   return 0;
  306. }
  307. int Synchronization::stop(){
  308.   TRACER("int Synchronization::stop()");
  309.   terminate=1;
  310.   double finaltime=0.0;
  311.   for (int i=0; i<Max_Sync_Process; i++){
  312.     if (syncs[i]){
  313.       syncs[i]->terminated=1;
  314.       syncs[i]->put(finaltime); // put last dummy time stamp
  315.       syncs[i]->time_cond.signal();
  316.     }
  317.   }
  318.   finaltime=0x0fffffff;
  319.   timer.put(finaltime);
  320.   return terminated;
  321. }
  322. int Synchronization::usedbytes(int ID, int b){ return (ID!=0) ? syncs[ID-1]->usedbytes(b) : -1; }
  323. int Synchronization::wait(int ID){ return (ID!=0) ? syncs[ID-1]->wait() : -1; }
  324. int Synchronization::skip(int ID){ return (ID!=0) ? syncs[ID-1]->skip() : -1; }
  325. int Synchronization::put(const double stamp){ return timer.put(stamp,0); }
  326. int Synchronization::put(int ID, const double s, const int b){ return (ID!=0) ? syncs[ID-1]->put(s, b) : -1; }
  327. int Synchronization::pause(){ return athr_suspend(id); }   // stop timer
  328. int Synchronization::resume(){ return athr_continue(id); } // continu timer
  329. int Synchronization::done(int ID){ return (ID!=0) ? syncs[ID-1]->done() : -1; }
  330. #ifdef MAIN
  331. main(int argc, char** argv){
  332.   // Just a test to compile and link
  333.   Synchronization sync();
  334.   while (1){
  335.   }
  336. }
  337.       // set higher priority for reading process
  338.       int prio=0;
  339.       thr_getprio(thr_self(), &prio);
  340.       prio++;
  341.       thr_setprio(thr_self(), prio);
  342. #endif