media-app.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:53k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* -*- Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */
  2. /*
  3.  * media-app.cc
  4.  * Copyright (C) 1997 by the University of Southern California
  5.  * $Id: media-app.cc,v 1.14 2005/08/25 18:58:10 johnh Exp $
  6.  *
  7.  * This program is free software; you can redistribute it and/or
  8.  * modify it under the terms of the GNU General Public License,
  9.  * version 2, as published by the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License along
  17.  * with this program; if not, write to the Free Software Foundation, Inc.,
  18.  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  19.  *
  20.  *
  21.  * The copyright of this module includes the following
  22.  * linking-with-specific-other-licenses addition:
  23.  *
  24.  * In addition, as a special exception, the copyright holders of
  25.  * this module give you permission to combine (via static or
  26.  * dynamic linking) this module with free software programs or
  27.  * libraries that are released under the GNU LGPL and with code
  28.  * included in the standard release of ns-2 under the Apache 2.0
  29.  * license or under otherwise-compatible licenses with advertising
  30.  * requirements (or modified versions of such code, with unchanged
  31.  * license).  You may copy and distribute such a system following the
  32.  * terms of the GNU GPL for this module and the licenses of the
  33.  * other code concerned, provided that you include the source code of
  34.  * that other code when and as the GNU GPL requires distribution of
  35.  * source code.
  36.  *
  37.  * Note that people who make modified versions of this module
  38.  * are not obligated to grant this special exception for their
  39.  * modified versions; it is their choice whether to do so.  The GNU
  40.  * General Public License gives permission to release a modified
  41.  * version without this exception; this exception also makes it
  42.  * possible to release a modified version which carries forward this
  43.  * exception.
  44.  *
  45.  */
  46. //
  47. // Implementation of media application
  48. //
  49. // $Header: /cvsroot/nsnam/ns-2/rap/media-app.cc,v 1.14 2005/08/25 18:58:10 johnh Exp $
  50. #include <stdarg.h>
  51. #include "template.h"
  52. #include "media-app.h"
  53. #include "utilities.h"
  54. //----------------------------------------------------------------------
  55. // Classes related to a multimedia object
  56. //
  57. // MediaSegment
  58. // MediaSegmentList: segments in a layer
  59. // MediaPage: a stored multimedia object (stream)
  60. //----------------------------------------------------------------------
  61. MediaSegment::MediaSegment(const HttpMediaData& d) : flags_(0)
  62. {
  63. start_ = d.st();
  64. end_ = d.et();
  65. if (d.is_last())
  66. set_last();
  67. if (d.is_pref())
  68. set_pref();
  69. }
  70. void MediaSegmentList::add(const MediaSegment& s) 
  71. {
  72. MediaSegment* tmp = (MediaSegment *)head_;
  73. while ((tmp != NULL) && (tmp->before(s))) {
  74. tmp = tmp->next();
  75. }
  76. // Append at the tail, or the first element in list
  77. if (tmp == NULL) {
  78. length_ += s.datasize();
  79. if ((tail_ != NULL) && ((MediaSegment *)tail_)->overlap(s)) 
  80. // Don't need to merge because it's merged at the end
  81. ((MediaSegment*)tail_)->merge(s);
  82. else {
  83. MediaSegment *p = new MediaSegment(s);
  84. if (head_ == NULL)
  85. head_ = tail_ = p;
  86. else 
  87. append(p, tail_);
  88. }
  89. if (getsize() != length_) {
  90. fprintf(stderr, 
  91. "MediaSegmentList corrupted: Point 1.n");
  92. abort();
  93. }
  94. return;
  95. }
  96. // Update total stored length ONLY IF s is not in tmp.
  97. if (tmp->in(s)) {
  98. fprintf(stderr, 
  99. "MediaSegmentList: get a seg (%d %d) which is already in cache!n",
  100. s.start(), s.end());
  101. fprintf(stderr, "List contents: ");
  102. print();
  103. #if 0 
  104. //Tcl::instance().eval("[Simulator instance] flush-trace");
  105. //abort();
  106. #endif
  107. // XXX Don't abort, simply continue
  108. return;
  109. }
  110. // Insert a MediaSegment into list. Note: Don't do merge!
  111. if (tmp->overlap(s)) {
  112. length_ += (s.datasize() - tmp->merge(s));
  113. } else {
  114. MediaSegment *p = new MediaSegment(s);
  115. insert(p, tmp);
  116. tmp = p;
  117. length_ += s.datasize();
  118. }
  119. if (getsize() != length_) {
  120. fprintf(stderr, "MediaSegmentList corrupted: Point 2.n");
  121. abort();
  122. }
  123. merge_seg(tmp);
  124. if (getsize() != length_) {
  125. fprintf(stderr, "MediaSegmentList corrupted: Point 3.n");
  126. abort();
  127. }
  128. }
  129. void MediaSegmentList::merge_seg(MediaSegment* tmp)
  130. {
  131. // See if <tmp> can be merged with next segments
  132. MediaSegment *q = tmp->next();
  133. while (q && q->overlap(*tmp)) {
  134. #if 1
  135. if ((tmp->start() == q->start()) && (tmp->end() == q->end())) {
  136. abort();
  137. }
  138. #endif
  139. tmp->merge(*q);
  140. detach(q);
  141. delete q;
  142. q = tmp->next();
  143. }
  144. // See if <tmp> can be merged with previous segments
  145. q = tmp->prev();
  146. while (q && q->overlap(*tmp)) {
  147. tmp->merge(*q);
  148. assert(tail_ != q);
  149. detach(q);
  150. delete q;
  151. q = tmp->prev();
  152. }
  153. }
  154. int MediaSegmentList::in(const MediaSegment& s)
  155. {
  156. MediaSegment* tmp = (MediaSegment *)head_;
  157. while ((tmp != NULL) && (tmp->before(s)))
  158. tmp = tmp->next();
  159. // If all segments are before s, or the first segment which isn't 
  160. // before s doesn't overlap with s, s isn't in this list.
  161. if ((tmp == NULL) || !s.in(*tmp))
  162. return 0;
  163. else 
  164. return 1;
  165. }
  166. // Get the next segment which is not before 's', but with the same size
  167. // as the given 's'. This segment may not overlap with s. 
  168. MediaSegment MediaSegmentList::get_nextseg(const MediaSegment& s) 
  169. {
  170. MediaSegment res(0, 0); // If unsuccessful, return start() = 0
  171. MediaSegment* tmp = (MediaSegment *)head_;
  172. while ((tmp != NULL) && (tmp->before(s))) 
  173. tmp = tmp->next();
  174. if (tmp == NULL) {
  175. res.set_last();
  176. return res;
  177. }
  178. assert(tmp->end() > s.start());
  179. //  // Don't return a segment which do not *OVERLAP* with s 
  180. //  // (boundary overlap is excluded).
  181. //  if ((tmp->end() <= s.start()) || (tmp->start() >= s.end())) 
  182. //      return res;
  183. // XXX How to flag that no more data is available in the future??
  184. res = s;
  185. int orig_size = s.datasize();
  186. if (res.start() < tmp->start()) {
  187. // |-------| (s)    ---> time axis
  188. //    |--------| (tmp)
  189. //
  190. // The start time of s is invalid, we need to adjust both 
  191. // the start time (and size if necessary)
  192. res.set_start(tmp->start());
  193. if (tmp->datasize() < orig_size) 
  194. // Not enough data available??
  195. res.set_datasize(tmp->datasize());
  196. else
  197. res.set_datasize(orig_size);
  198. } else if (res.end() > tmp->end()) {
  199. //    |---------| (s)    ---> time axis
  200. // |-------| (tmp)
  201. // 
  202. // The start time in s is valid, but we may need to adjust the 
  203. // end time (i.e., size) of s.
  204. res.set_datasize(tmp->end()-res.start());
  205. }
  206. // Falling through means that the requested segment is available 
  207. // and can be returned as it is.
  208. assert(res.datasize() <= tmp->datasize());
  209. if ((res.end() == tmp->end()) && (tmp->next() == NULL))
  210. // This is the last data segment of the layer
  211. res.set_last();
  212. return res;
  213. }
  214. // Note that evicting all segments in this layer may not leave enough 
  215. // space, so we return the number of bytes evicted from this layer
  216. int MediaSegmentList::evict_tail(int size)
  217. {
  218. int sz = size, tz;
  219. MediaSegment *tmp = (MediaSegment *)tail_;
  220. while ((tmp != NULL) && (sz > 0)) {
  221. // Reduce the last segment's size and adjust its playout time
  222. tz = tmp->evict_tail(sz);
  223. length_ -= tz;
  224. sz -= tz; 
  225. if (tmp->datasize() == 0) {
  226. // This segment is empty now
  227. detach(tmp);
  228. delete tmp;
  229. tmp = (MediaSegment *)tail_;
  230. }
  231. }
  232. return size - sz;
  233. }
  234. // Evicting <size> from the head of the list
  235. int MediaSegmentList::evict_head(int size)
  236. {
  237. int sz = size, tz;
  238. MediaSegment *tmp = (MediaSegment *)head_;
  239. while ((tmp != NULL) && (sz > 0)) {
  240. // Reduce the last segment's size and adjust its playout time
  241. tz = tmp->evict_head(sz);
  242. sz -= tz; 
  243. length_ -= tz;
  244. if (tmp->datasize() == 0) {
  245. // This segment is empty now
  246. detach(tmp);
  247. delete tmp;
  248. tmp = (MediaSegment *)head_;
  249. }
  250. }
  251. return size - sz;
  252. }
  253. // Evict all segments before <offset> from head and returns the size of 
  254. // evicted segments.
  255. int MediaSegmentList::evict_head_offset(int offset)
  256. {
  257. int sz = 0;
  258. MediaSegment *tmp = (MediaSegment *)head_;
  259. while ((tmp != NULL) && (tmp->start() < offset)) {
  260. if (tmp->end() <= offset) {
  261. // delete whole segment
  262. sz += tmp->datasize();
  263. length_ -= tmp->datasize();
  264. detach(tmp);
  265. delete tmp;
  266. tmp = (MediaSegment *)head_;
  267. } else {
  268. // remove part of the segment
  269. sz += offset - tmp->start();
  270. length_ -= offset - tmp->start();
  271. tmp->set_start(offset);
  272. }
  273. }
  274. if (head_ == NULL)
  275. tail_ = NULL;
  276. return sz;
  277. }
  278. // Return a list of "holes" between the given offsets
  279. MediaSegmentList MediaSegmentList::check_holes(const MediaSegment& s)
  280. {
  281. MediaSegmentList res;  // empty list
  282. MediaSegment* tmp = (MediaSegment *)head_;
  283. while ((tmp != NULL) && (tmp->before(s)))
  284. tmp = tmp->next();
  285. // If all segments are before s, s is a hole
  286. if (tmp == NULL) {
  287. res.add(s);
  288. return res;
  289. }
  290. // If s is within *tmp, there is no hole
  291. if (s.in(*tmp))
  292. return res;
  293. // Otherwise return a list of holes
  294. int soff, eoff;
  295. soff = s.start();
  296. eoff = s.end();
  297. while ((tmp != NULL) && (tmp->overlap(s))) {
  298. if (soff < tmp->start()) {
  299. // Only refetches the missing part
  300. res.add(MediaSegment(soff, min(eoff, tmp->start())));
  301. #if 1
  302. // DEBUG ONLY
  303. // Check if these holes are really holes!
  304. if (in(MediaSegment(soff, min(eoff, tmp->start())))) {
  305. fprintf(stderr, "Wrong hole: (%d %d) ", 
  306. soff, min(eoff, tmp->start()));
  307. fprintf(stderr, "tmp(%d %d), s(%d %d)n",
  308. tmp->start(), tmp->end(),
  309. soff, eoff);
  310. fprintf(stderr, "List content: ");
  311. print();
  312. }
  313. #endif
  314. }
  315. soff = tmp->end();
  316. tmp = tmp->next();
  317. }
  318. if (soff < eoff) {
  319. res.add(MediaSegment(soff, eoff));
  320. #if 1
  321. // DEBUG ONLY
  322. // Check if these holes are really holes!
  323. if (in(MediaSegment(soff, eoff))) {
  324. fprintf(stderr, "Wrong hole #2: (%d %d)n", 
  325. soff, eoff);
  326. fprintf(stderr, "List content: ");
  327. print();
  328. }
  329. #endif
  330. }
  331. #if 0
  332. check_integrity();
  333. #endif
  334. return res;
  335. }
  336. void MediaSegmentList::check_integrity()
  337. {
  338. MediaSegment *p, *q;
  339. p = (MediaSegment*)head_;
  340. while (p != NULL) {
  341. q = p; 
  342. p = p->next();
  343. if (p == NULL)
  344. break;
  345. if (!q->before(*p)) {
  346. fprintf(stderr, 
  347. "Invalid segment added: (%d %d), (%d %d)n", 
  348. q->start(), q->end(), p->start(), p->end());
  349. abort();
  350. }
  351. }
  352. }
  353. // Return the portion in s that is overlap with any segments in this list
  354. // Sort of complementary to check_holes(), but it does not return a list, 
  355. // hence smaller overhead. 
  356. int MediaSegmentList::overlap_size(const MediaSegment& s) const
  357. {
  358. int res = 0;
  359. MediaSegment* tmp = (MediaSegment *)head_;
  360. while ((tmp != NULL) && (tmp->before(s)))
  361. tmp = tmp->next();
  362. // If all segments are before s, there's no overlap
  363. if (tmp == NULL)
  364. return 0;
  365. // If s is within *tmp, entire s overlaps with the list
  366. if (s.in(*tmp))
  367. return s.datasize();
  368. // Otherwise adds all overlapping parts together.
  369. int soff, eoff;
  370. soff = s.start();
  371. eoff = s.end();
  372. while ((tmp != NULL) && (tmp->overlap(s))) {
  373. res += min(eoff, tmp->end()) - max(soff, tmp->start());
  374. soff = tmp->end();
  375. tmp = tmp->next();
  376. }
  377. return res;
  378. }
  379. // Debug only
  380. void MediaSegmentList::print() 
  381. {
  382. MediaSegment *p = (MediaSegment *)head_;
  383. int i = 0, sz = 0;
  384. while (p != NULL) {
  385. printf("(%d, %d)  ", p->start(), p->end());
  386. sz += p->datasize();
  387. p = p->next();
  388. if (++i % 8 == 0)
  389. printf("n");
  390. }
  391. printf("nTotal = %dn", sz);
  392. }
  393. // Debug only
  394. int MediaSegmentList::getsize()
  395. {
  396. MediaSegment *p = (MediaSegment *)head_;
  397. int sz = 0;
  398. while (p != NULL) {
  399. sz += p->datasize();
  400. p = p->next();
  401. }
  402. return sz;
  403. }
  404. // Print into a char array with a given size. Abort if the size is exceeded.
  405. char* MediaSegmentList::dump2buf()
  406. {
  407. char *buf = new char[1024];
  408. char *b = buf;
  409. MediaSegment *p = (MediaSegment *)head_;
  410. int i = 0, sz = 1024;
  411. buf[0] = 0;
  412. while (p != NULL) {
  413. // XXX snprintf() should either be in libc or implemented
  414. // by TclCL (see Tcl2.cc there).
  415. i = snprintf(b, sz, "{%d %d} ", p->start(), p->end());
  416. sz -= i;
  417. // Boundary check: if less than 50 bytes, allocate new buf
  418. if (sz < 50) {
  419. char *tmp = new char[strlen(buf)+1024];
  420. strcpy(tmp, buf);
  421. delete []buf;
  422. buf = tmp;
  423. b = buf + strlen(buf);
  424. sz += 1024;
  425. } else 
  426. b += i;
  427. p = p->next();
  428. }
  429. return buf;
  430. }
  431. HttpMediaData::HttpMediaData(const char* sender, const char* page, int layer, 
  432.      int st, int et) :
  433. HttpData(MEDIA_DATA, 0), layer_(layer), st_(st), et_(et), flags_(0)
  434. {
  435. assert(strlen(page)+1 <= (size_t)HTTP_MAXURLLEN);
  436. strcpy(page_, page);
  437. assert(strlen(sender)+1 <= (size_t)HTTP_MAXURLLEN);
  438. strcpy(sender_, sender);
  439. }
  440. static class MappClass : public TclClass {
  441. public:
  442. MappClass() : TclClass("Application/MediaApp") {}
  443. TclObject* create(int argc, const char*const* argv) {
  444. if (argc > 4) 
  445. return (new MediaApp(argv[4]));
  446. return NULL;
  447. }
  448. } class_mapp;
  449. MediaApp::MediaApp(const char* page) : 
  450. log_(0), num_layer_(0), last_layer_(0)
  451. {
  452. strcpy(page_, page);
  453. // Initialize all layer data pointers
  454. for (int i = 0; i < MAX_LAYER; i++)
  455. data_[i].set_start(0); 
  456. bind("segmentSize_", &seg_size_);
  457. }
  458. void MediaApp::start()
  459. {
  460.   fprintf(stderr, "MediaApp::start() not supportedn");
  461.   abort();
  462. }
  463. void MediaApp::stop()
  464. {
  465. // Called when we want to stop the RAP agent
  466. rap()->stop();
  467. }
  468. AppData* MediaApp::get_data(int& nbytes, AppData* req) 
  469. {
  470. AppData *res;
  471. if (req == NULL) {
  472. MediaRequest p(MEDIAREQ_GETSEG);
  473. p.set_name(page_);
  474. // We simply rotating the layers from which to send data
  475. if (num_layer_ > 0) {
  476. p.set_layer(last_layer_++);
  477. last_layer_ = last_layer_ % num_layer_;
  478. } else 
  479. p.set_layer(0); 
  480. p.set_st(data_[0].start());
  481. p.set_datasize(seg_size_);
  482. p.set_app(this);
  483. res = target()->get_data(nbytes, &p);
  484. } else 
  485. res = target()->get_data(nbytes, req);
  486. // Update the current data pointer
  487. assert(res != NULL);
  488. HttpMediaData *p = (HttpMediaData *)res;
  489. // XXX For now, if the return size is 0, we assume that the 
  490. // transmission stops. Otherwise there is no way to tell the 
  491. // RAP agent that there's no more data to send
  492. if (p->datasize() <= 0) {
  493. // Should NOT advance sending data pointer because 
  494. // if this is a cache which is downloading from a slow
  495. // link, it is possible that the requested data will
  496. // become available in the near future!!
  497. delete p;
  498. return NULL;
  499. } else {
  500. // Set current data pointer to the right ones
  501. // If available data is more than seg_size_, only advance data
  502. // pointer by seg_size_. If less data is available, only 
  503. // advance data by the amount of available data.
  504. //
  505. // XXX Currently the cache above does NOT pack data from 
  506. // discontinugous blocks into one packet. May need to do 
  507. // that later. 
  508. assert((p->datasize() > 0) && (p->datasize() <= seg_size_));
  509. data_[p->layer()].set_start(p->et());
  510. data_[p->layer()].set_datasize(seg_size_);
  511. }
  512. return res;
  513. }
  514. int MediaApp::command(int argc, const char*const* argv)
  515. {
  516. Tcl& tcl = Tcl::instance();
  517. if (strcmp(argv[1], "log") == 0) {
  518. int mode;
  519. log_ = Tcl_GetChannel(tcl.interp(), 
  520.       (char*)argv[2], &mode);
  521. if (log_ == 0) {
  522. tcl.resultf("%s: invalid log file handle %sn",
  523.     name(), argv[2]);
  524. return TCL_ERROR;
  525. }
  526. return TCL_OK;
  527. } else if (strcmp(argv[1], "evTrace") == 0) { 
  528. char buf[1024], *p;
  529. if (log_ != 0) {
  530. sprintf(buf, "%.17g ", 
  531. Scheduler::instance().clock());
  532. p = &(buf[strlen(buf)]);
  533. for (int i = 2; i < argc; i++) {
  534. strcpy(p, argv[i]);
  535. p += strlen(argv[i]);
  536. *(p++) = ' ';
  537. }
  538. // Stick in a newline.
  539. *(p++) = 'n', *p = 0;
  540. Tcl_Write(log_, buf, p-buf);
  541. }
  542. return TCL_OK;
  543. } else if (strcmp(argv[1], "set-layer") == 0) {
  544. int n = atoi(argv[2]);
  545. if (n >= MAX_LAYER) {
  546. fprintf(stderr, 
  547. "Too many layers than maximum allowed.n");
  548. return TCL_ERROR;
  549. }
  550. num_layer_ = n;
  551. return TCL_OK;
  552. }
  553. return Application::command(argc, argv);
  554. }
  555. void MediaApp::log(const char* fmt, ...)
  556. {
  557. char buf[1024], *p;
  558. char *src = Address::instance().print_nodeaddr(rap()->addr());
  559. sprintf(buf, "%.17g i %s ", Scheduler::instance().clock(), src);
  560. delete []src;
  561. p = &(buf[strlen(buf)]);
  562. va_list ap;
  563. va_start(ap, fmt);
  564. vsprintf(p, fmt, ap);
  565. if (log_ != 0)
  566. Tcl_Write(log_, buf, strlen(buf));
  567. }
  568. //----------------------------------------------------------------------
  569. // MediaApp enhanced with quality adaptation
  570. //----------------------------------------------------------------------
  571. void QATimer::expire(Event *)
  572. {
  573. a_->UpdateState();
  574. resched(a_->UpdateInterval());
  575. }
  576. static class QAClass : public TclClass {
  577. public:
  578. QAClass() : TclClass("Application/MediaApp/QA") {}
  579. TclObject* create(int argc, const char*const* argv) {
  580. if (argc > 4) 
  581. return (new QA((const char *)(argv[4])));
  582. return NULL;
  583. }
  584. } class_qa_app;
  585. //#define CHECK 1
  586. //#define DBG 1
  587. QA::QA(const char *page) : MediaApp(page)
  588. {
  589. updTimer_ = new QATimer(this);
  590. bind("LAYERBW_", &LAYERBW_);
  591. bind("MAXACTIVELAYERS_", &MAXACTIVELAYERS_);
  592. bind("SRTTWEIGHT_", &SRTTWEIGHT_);
  593. bind("SMOOTHFACTOR_", &SMOOTHFACTOR_);
  594. bind("MAXBKOFF_", &MAXBKOFF_);
  595. bind("debug_output_", &debug_);
  596. bind("pref_srtt_", &pref_srtt_);
  597. for(int j = 0; j < MAX_LAYER; j++) {
  598. buffer_[j] = 0.0;
  599. sending_[j] = 0;
  600. playing_[j] = 0;
  601. drained_[j] = 0.0;
  602. bw_[j] = 0.0;
  603. pref_[j] = 0;
  604. }
  605. poffset_ = 0; 
  606. playTime_ = 0;   // Should initialize it
  607. startTime_ = -1; // Used to tell the first packet
  608. // Moving average weight for transmission rate average
  609. rate_weight_ = 0.01;
  610. avgrate_ = 0.0;
  611. }
  612. QA::~QA()
  613. {
  614. if (updTimer_) {
  615. if (updTimer_->status() != TIMER_IDLE)
  616. updTimer_->cancel();
  617. delete updTimer_;
  618. }
  619. }
  620. void QA::debug(const char* fmt, ...)
  621. {
  622. if (!debug_) 
  623. return;
  624. char buf[1024], *p;
  625. char *src = Address::instance().print_nodeaddr(rap()->addr());
  626. char *port = Address::instance().print_portaddr(rap()->addr());
  627. sprintf(buf, "# t %.17g i %s.%s QA ", 
  628. Scheduler::instance().clock(), src, port);
  629. delete []port;
  630. delete []src;
  631. p = &(buf[strlen(buf)]);
  632. va_list ap;
  633. va_start(ap, fmt);
  634. vsprintf(p, fmt, ap);
  635. fprintf(stderr, "%s", buf);
  636. }
  637. void QA::panic(const char* fmt, ...) 
  638. {
  639. char buf[1024], *p;
  640. char *src = Address::instance().print_nodeaddr(rap()->addr());
  641. char *port = Address::instance().print_portaddr(rap()->addr());
  642. sprintf(buf, "# t %.17g i %s.%s QA PANIC ", 
  643. Scheduler::instance().clock(), src, port);
  644. delete []port;
  645. delete []src;
  646. p = &(buf[strlen(buf)]);
  647. va_list ap;
  648. va_start(ap, fmt);
  649. vsprintf(p, fmt, ap);
  650. fprintf(stderr, "%s", buf);
  651. #if 0
  652. // XXX This is specific to OUR test. Remove it in release!!
  653. Tcl::instance().eval("[Simulator instance] flush-trace");
  654. abort();
  655. #endif
  656. }
  657. // Stop all timers
  658. void QA::stop()
  659. {
  660. rap()->stop();
  661. if (updTimer_->status() != TIMER_IDLE)
  662. updTimer_->cancel();
  663. }
  664. // Empty for now
  665. int QA::command(int argc, const char*const* argv)
  666. {
  667. return MediaApp::command(argc, argv);
  668. }
  669. // When called by RAP, req is NULL. We fill in the next data segment and 
  670. // return its real size in 'size' and return the app data. 
  671. AppData* QA::get_data(int& size, AppData*)
  672. {
  673. int layers, dropped, i, l, idx, bs1, bs2,scenario, done, cnt;
  674. double slope, bufavail, bufneeded, totbufs1, totbufs2, 
  675. optbufs1[MAX_LAYER], optbufs2[MAX_LAYER], bufToDrain;
  676.   
  677. static double last_rate = 0.0, last_depart, nextAdjPoint = -1,
  678. FinalDrainArray[MAX_LAYER],
  679. tosend[MAX_LAYER], FinalBuffer[MAX_LAYER];
  680. static int flag,  /* flag keeps the state of the last phase */
  681. tosendPtr = 0;
  682.   
  683. // Get RAP info
  684. double rate = seg_size_ / rap()->ipg();
  685. double srtt =  rap()->srtt();
  686. Scheduler& s = Scheduler::instance();
  687. double now = s.clock();
  688. int anyAck = rap()->anyack();
  689. assert((num_layer_ > 0) && (num_layer_ < MAX_LAYER));
  690.   
  691. // this part is added for the startup
  692. // to send data for the base layer until the first ACK arrives.
  693. // This is because we don't have an estimate for SRTT and slope of inc
  694. // Make sure that SRTT is updated properly when ACK arrives
  695. if (anyAck == 0) {
  696. sending_[0] = 1;
  697. return output(size, 0);
  698. debug("INIT Phase, send packet: layer 0 in send_pkt, 
  699. rate: %.3f, avgrate: %.3f, srtt:%.3fn", rate, avgrate_, srtt);
  700. }
  701.   
  702. layers = 0;
  703. // we can only calc slope when srttt has a right value
  704. // i.e. RAP has received an ACK
  705. slope = seg_size_/srtt;
  706. bufavail = 0.0;
  707. // XXX Is this a correct initial value????
  708. bufneeded = 0.0; 
  709.   
  710. // calculate layers & bufavail
  711. for (i = 0; i < MAX_LAYER; i++) {
  712. layers += sending_[i];
  713. if (sending_[i] == 1) 
  714. bufavail += buffer_[i];
  715. else
  716. /* debug only */
  717. if ((i < MAX_LAYER - 1) && (sending_[i+1] == 1))
  718. panic("ERROR L%d is not sent but L%d is.n",
  719.       i, i+1);
  720. }
  721. // check for startup phase
  722. if((layers == 1) && (playing_[0] != 1)){
  723. // L0 still buffers data, we are in startup phase
  724. // let's check
  725. if (sending_[0] == 0) {
  726. panic("ERROR sending[0]=0 !!!");
  727. }
  728. AppData *res = output(size, 0);
  729. debug("STARTUP, send packet: layer 0n");
  730. // Start playout if we have enough data for L0
  731. // The amount of buffered data for startup can be diff
  732. bufneeded = max(4*BufNeed((LAYERBW_-rate/2.0), slope), 
  733. 2*MWM(srtt));
  734. if (buffer_[0] >= bufneeded) {
  735. playing_[0] = 1;
  736. sending_[0] = 1;  
  737. drained_[0] = 0;  /* srtt*LAYERBW; */
  738. startTime_ = now; // start the playback at the client
  739. playTime_ = now;  // playout time of the receiver. 
  740. debug("... START Playing_ layer 0, buffer[0] = %f!n",
  741.       buffer_[0]);
  742. // start emulating clients consumption
  743. if (updTimer_->status() == TIMER_IDLE)
  744. updTimer_->sched(srtt);
  745. }
  746. return(res);
  747. }
  748.   
  749. // Store enough buffer before playing a layer. 
  750. // XXX, NOTE: it is hard to do this, when we add a new layer
  751. // the server sets the playout time of the first segment
  752. // to get to the client in time, It is hard to make sure
  753. // that a layer has MRM worth if data before stasting its
  754. // playback because it adds more delay
  755. // the base layer starts when it has enough buffering
  756. // the higher layers are played out when their data is available
  757. // so this is not needed 
  758. //for (i = 0; i < MAX_LAYER; i++) {
  759. // if ((sending_[i] == 1) && (playing_[i] == 0) &&
  760. //    (buffer_[i] > MWM(srtt))) {
  761. //  debug("Resume PLAYING Layer %d, play: %d send: %dn",
  762. //   i, playing_[i], sending_[i]);
  763. //  playing_[i]=1;
  764. //  drained_[i] = 0; /* XXX, not sure about this yet 
  765. //       * but if we set this to max it causes
  766. //       * a spike at the adding time
  767. //       */
  768. //  /* drained_[i]=LAYERBW*SRTT; */
  769. //}
  770. //}
  771.   
  772. // perform the primary drop if we are in drain phase 
  773. if (rate < layers*LAYERBW_) {
  774. bufneeded = (MWM(srtt)*layers) + 
  775. BufNeed((layers*LAYERBW_-rate), slope);
  776. //   debug("tot_bufavail: %7.1f bufneeded: %7.1f, layers: %d",
  777. //  bufavail, bufneeded, layers);
  778. dropped = 0;
  779. // XXX Never ever do primary drop layer 0!!!!
  780. while ((bufneeded > TotalBuf(layers, buffer_)) && 
  781.        (layers > 1)) {
  782. debug("** Primary DROPPED L%d, TotBuf(avail:%.1f 
  783. needed:%.1f), buf[%d]: %.2fn", 
  784.       layers-1, TotalBuf(layers, buffer_), bufneeded,
  785.       layers-1,buffer_[layers-1]);
  786. layers--;
  787. dropped++;
  788. sending_[layers] = 0;
  789. bufneeded = (MWM(srtt)*layers)+ 
  790. BufNeed(((layers)*LAYERBW_-rate),slope); 
  791. }
  792.     
  793. // just for debugging
  794. // here is the case when even the base layer can not be kept
  795. if ((bufneeded > TotalBuf(layers, buffer_)) && (layers == 1)) {
  796. // XXX We should still continue, shouldn't we????
  797. debug("** Not enough buf to keep the base layer, 
  798. TotBuf(avail:%.1f, needed:%.1f), n",
  799.       TotalBuf(layers, buffer_), bufneeded);
  800. }
  801. if (layers == 0) {
  802. // panic("** layers =0 !!");
  803. sending_[0] = 1;
  804. playing_[0] = 0;
  805. if (updTimer_->status() != TIMER_IDLE)
  806. updTimer_->cancel();
  807. debug("** RESTART Phase, set playing_[0] to 0 to rebuffer datan");
  808. return output(size, 0);
  809. }
  810. // now check to see which phase we are in
  811. if (rate >= layers*LAYERBW_) {
  812. /******************
  813.  ** filling phase **
  814.  *******************/
  815. /*      
  816. debug("-->> FILLING, layers: %d now: %.2f, rate: %.3f, avgrate: %.3f, 
  817.  srtt:%.3f, slope: %.3fn",
  818.     layers, now, rate, avgrate_, srtt, slope);
  819. */
  820.       
  821. last_rate = rate; /* this is used for the next drain phase */
  822. flag = 1;
  823. /* 
  824.  * 1) send for any layer that its buffer is below MWM
  825.  * MWM is the min amount of buffering required to absorbe 
  826.  * jitter
  827.  * each active layer must have atleast MWM data at all time
  828.  * this also ensures proper bw share, we do NOT explicitly 
  829.  * alloc BW share during filling
  830.  * Note: since we update state of the buffers on a per-packet 
  831.  * basis, we don't need to ensure that each layer gets a share 
  832.  * of bandwidth equal to its consumption rate. 
  833.  */
  834. for (i=0;i<layers;i++) {
  835. if (buffer_[i] < MWM(srtt)) {
  836. if ((buffer_[i-1] <= buffer_[i]+seg_size_) &&
  837.     (i > 0))
  838. idx = i-1;
  839. else 
  840. idx = i;
  841. //    debug("A:sending layer %d, less than MWM, t: %.2fn", 
  842. //  i,now);
  843. return output(size, idx);
  844. }
  845. }
  846. /* 
  847.  * Main filling algorithm based on the pesudo code 
  848.  * find the next optimal state to reach 
  849.  */
  850. /* init param */
  851. bs1 = 0;
  852. bs2 = 0;
  853. totbufs1 = 0;
  854. totbufs2 = 0;
  855. for (l=0; l<MAX_LAYER; l++) {
  856. optbufs1[l] = 0.0;
  857. optbufs2[l] = 0.0;
  858. }
  859. // XXX Note: when per-layer BW is low, and srtt is very small 
  860. // (e.g., in a LAN), the following code will result in that 
  861. // one buffered 
  862. // segment will produce a abort() of "maximum backoff reached".
  863. /* next scenario 1 state */
  864. while ((totbufs1 <= TotalBuf(layers, buffer_)) && 
  865.        (bs1 <= MAXBKOFF_)) {
  866. totbufs1 = 0.0;
  867. bs1++;
  868. for (l=0; l<layers;l++) {
  869. optbufs1[l] = bufOptScen1(l,layers,rate,slope,
  870.   bs1)+MWM(srtt);
  871. totbufs1 += optbufs1[l];
  872. }
  873. }
  874. // bs1 is the min no of back off that we can not handle for 
  875. // s1 now
  876. /* next secenario 2 state */
  877. while ((totbufs2 <= TotalBuf(layers, buffer_)) && 
  878.        (bs2 <= MAXBKOFF_)) {
  879. totbufs2 = 0.0;
  880. bs2++;
  881. for (l=0; l<layers;l++) {
  882. optbufs2[l] = bufOptScen2(l,layers,rate,slope,
  883.   bs2)+MWM(srtt);
  884. totbufs2 += optbufs2[l];
  885. }
  886. }
  887. /* 
  888.  * NOTE: at this point, totbufs1 could be less than total 
  889.  * buffering
  890.  * when it is enough for recovery from rate = 0;
  891.  * so totbufs1 <= TotalBuf(layers, buffer) is OK
  892.  * HOWEVER, in this case, we MUST shoot for scenario 2
  893.  */
  894. /* debug */
  895. /*
  896.        if ((totbufs2 <= TotalBuf(layers, buffer_)) && (bs2 <= MAXBKOFF_)) {
  897.   panic("# ERROR: totbufs1: %.2f,tot bufs2: %.2f, 
  898.  totbuf: %.2f, bs1: %d, bs2: %d, totneededbuf1: %.2f, totneededbuf2: %2fn",
  899.         totbufs1, totbufs2, TotalBuf(layers, buffer_), bs1, bs2,
  900.         TotalBuf(layers, optbufs1), TotalBuf(layers, optbufs2));
  901.        }
  902. */
  903. /* debug */
  904. if (bs2 >= MAXBKOFF_)
  905. debug("WARNING: MAX No of backoff Reached, bs1: %d, 
  906. bs2: %dn", bs1, bs2);
  907. /* Check for adding condition */
  908. //if ((bs1 > SMOOTHFACTOR_) && (bs2 > SMOOTHFACTOR_) && 
  909. //  (layers < MAX_LAYER)) {
  910. if ((bs1 > SMOOTHFACTOR_) && (bs2 > SMOOTHFACTOR_)){
  911. // Check if all layers are already playing
  912. // Assume all streams have the same # of layer: 
  913. // MAX_LAYER
  914. assert(layers <= num_layer_);
  915. // XXX Only limit the rate when we have all layers 
  916. // playing. There should be a better way to limit the 
  917. // transmission rate earlier! Note that we need RAP to
  918. // fix its IPG as soon as we fix the rate here. Thus, 
  919. // RAP should do that in its IpgTimeout()
  920. // instead of DecreaseIpg(). See rap.cc.
  921. if (layers == num_layer_){
  922. #if 0
  923. if (rate < num_layer_*LAYERBW_)
  924. panic("ERROR: rate: %.2f is less than 
  925. MAX BW for all %d layers!n", rate, layers);
  926. #endif
  927. // Ask RAP to fix the rate at MAX_LAYER*LAYERBW
  928. rap()->FixIpg((double)seg_size_/
  929.       (double)(num_layer_*LAYERBW_));
  930. // Mux the bandwidth evenly among layers  
  931. return output(size, layers - 1);   
  932. }
  933. // Calculate the first packet offset in this new layer
  934. int off_start = (int)floor((poffset_ + MWM(srtt)) / 
  935.    seg_size_) * seg_size_;
  936. // XXX Does the application have data between 
  937. // off_start_ and off_start_+MWM(srtt)??
  938.   
  939. // XXX If the computed offset falls behind, we just 
  940. // continue to send.
  941. if (data_[layers].start() <= off_start) {
  942. // Set LayerOffset[newlayer] = 
  943. //     poffset_ + MWM(srtt) * n: 
  944. // - n times roundtrip time of data, LET n BE 1
  945. // Round this offset to whole segment
  946. data_[layers].set_start(off_start);
  947. data_[layers].set_datasize(seg_size_);
  948. }
  949. // Make sure that all corresponding data in lower 
  950. // layers have been sent out, i.e., the last byte of 
  951. // current segment of the new layer should be less 
  952. // than the last byte of all lower layers
  953. if (data_[layers].end() > data_[layers-1].start()) 
  954. // XXX Do not send anything if we don't have
  955. // data!! Otherwise we'll dramatically increase
  956. // the sending rate of lower laters. 
  957. return NULL;
  958. //  return output(size, layers-1);
  959. sending_[layers] = 1;
  960. AppData *res = output(size, layers);
  961. if (res == NULL) {
  962. // Drop the newly added layer because we 
  963. // don't have data 
  964. sending_[layers] = 0;
  965. // However, do prefetching in case we'll add 
  966. // it again later
  967. int st = (int)floor((data_[layers].start()+
  968. pref_srtt_*LAYERBW_)
  969.        /seg_size_+0.5)*seg_size_;
  970. int et = (int)floor((data_[layers].end()+
  971. pref_srtt_*LAYERBW_)
  972.        /seg_size_+0.5)*seg_size_;
  973. if (et > pref_[layers]) {
  974. pref_[i] = et;
  975. MediaSegment s(st, et);
  976. check_availability(i, s);
  977. }
  978. for (i = 0; i < layers; i++) 
  979. if (buffer_[i] < MWM(srtt)) {
  980. res = output(size, i);
  981. if (res != NULL)
  982. break;
  983. }
  984. } else {
  985. /* LAYERBW_*srtt;should we drain this */
  986. drained_[layers]= 0; 
  987. debug("sending Just ADDED layer %d, t: %.2fn",
  988.       i, now);
  989. }
  990. return res;
  991. }
  992. /* 
  993.  * Find out which next step is closer
  994.  * Second cond is for the cases where totbufs2 becomes 
  995.  * saturated
  996.  */
  997. scenario = 0; // Initial value
  998. if((totbufs1 <= totbufs2) && 
  999.    (totbufs1 > TotalBuf(layers, buffer_))) {
  1000. /* go for next scenario 1 with sb1 backoff */
  1001. scenario = 1;
  1002. } else {
  1003. /* go for next scenario 2 with sb2 backoffs */
  1004. scenario = 2;
  1005. }
  1006. /* decide which layer needs more data */
  1007. if (scenario == 1) {
  1008. for (l=0; l<layers; l++) {
  1009. if (buffer_[l] >= optbufs1[l]) 
  1010. continue;
  1011. //if (buffer_[l] < optbufs1[l]) {
  1012. if ((buffer_[l-1] <= buffer_[l]+seg_size_) && 
  1013.     (l > 0))
  1014. idx = l-1;
  1015. else 
  1016. idx = l;
  1017. // debug("Cs1:sending layer %d to fill buffer, t: %.2fn", 
  1018. // idx,now);
  1019. return output(size, idx);
  1020. }
  1021. } else if (scenario == 2) {
  1022. l=0;
  1023. done = 0;
  1024. while ((l<layers) && (!done)){
  1025. if (TotalBuf(layers, buffer_) >= totbufs2) {
  1026. done ++;
  1027. } else {
  1028. if (buffer_[l]<min(optbufs2[l],
  1029.    optbufs1[l])) {
  1030. if((buffer_[l-1] <= buffer_[l]+
  1031.     seg_size_) && (l>0))
  1032. idx = l-1;
  1033. else 
  1034. idx = l;
  1035. //   debug("Cs2:sending layer %d to fill buffer, t: %.2fn", 
  1036. //  idx,now);
  1037. return output(size, idx);
  1038. }
  1039. l++;
  1040. }
  1041. } /* while */
  1042. } else 
  1043. panic("# ERROR: Unknown scenario: %d !!n", scenario);
  1044. /* special cases when we get out of this for loop */
  1045. if(scenario == 1){
  1046. panic("# Should not reach here, totbuf: %.2f, 
  1047. totbufs1: %.2f, layers: %dn",
  1048.       TotalBuf(layers, buffer_), totbufs1, layers);
  1049. }
  1050. if (scenario == 2) {
  1051. /* 
  1052.  * this is the point where we have satisfied buffer 
  1053.  * requirement for the next scenario 1 already, 
  1054.  * i.e. the MIN() value.
  1055.  * so we relax that and shoot for bufs2[l]
  1056.  */
  1057. /* 
  1058.  * if scenario 2, repeat the while loop without min 
  1059.  * cond we have alreddy satisfied the condition for 
  1060.  * the next scenario 1
  1061.  */
  1062. l=0;
  1063. while (l < layers) {
  1064. if (buffer_[l] < optbufs2[l]) {
  1065. if ((buffer_[l-1] <= buffer_[l]+
  1066.      seg_size_) && (l>0))
  1067. idx = l-1;
  1068. else 
  1069. idx = l;
  1070. //      debug("Cs22:sending layer %d to fill buffer, t: %.2fn", idx,now);
  1071. return output(size, idx);
  1072. }
  1073. l++;
  1074. }/* while */
  1075. }
  1076. panic("# Opps, should not reach here, bs1: %d, bs2: %d, 
  1077. scen: %d, totbufs1: %.2f, totbufs2: %.2f, totbufavail: %.2fn", 
  1078.       bs1, bs2, scenario, totbufs1,
  1079.       totbufs2, TotalBuf(layers, buffer_));
  1080.     
  1081. /* NEVER REACH HERE */
  1082.     
  1083. } else { /* rate < layers*LAYERBW_ */
  1084. /*******************
  1085.  ** Draining phase **
  1086.  *******************/
  1087. /*
  1088.     debug("-->> DRAINING, layers: %d rate: %.3f, avgrate: %.3f, srtt:%.3f, 
  1089.  slope: %.3fn", 
  1090.    layers, rate, avgrate_, srtt, seg_size_/srtt);
  1091. */
  1092. /*
  1093.  * At the beginning of a new drain phase OR 
  1094.  * another drop in rate during a draining phase OR
  1095.  * dec of slope during a draining phase that results in 
  1096.  * a new drop 
  1097.  */
  1098. /*
  1099.  * 1) the highest priority action at this point is to ensure 
  1100.  * all surviving layers have min amount of buffering, if not, 
  1101.  * try to fill that layer
  1102.  */
  1103. double lowest=buffer_[0];
  1104. int lowix=0;
  1105. for(i=0;i<layers;i++) {
  1106. if (lowest>buffer_[i]) {
  1107. lowest=buffer_[i];
  1108. lowix=i;
  1109. }
  1110. }
  1111. if (lowest<MWM(srtt)) {
  1112. last_depart = now;
  1113. //       debug("A':sending layer %d, below MWM in Drain t: %.2fn",
  1114. //      lowix, now);
  1115. return output(size, lowix);
  1116. }
  1117. if((nextAdjPoint < 0) || /* first draining phase */
  1118.    (flag >= 0) || /* after a filling phase */
  1119.    (now >= nextAdjPoint) || /* end of the curr interval */
  1120.    ((rate < last_rate) && (flag < 0)) || /* new backoff */
  1121.    (AllZero(tosend, layers))) /* all pkt are sent */ {
  1122. /* start of a new interval */
  1123. /* 
  1124.  * XXX, should update the nextAdjPoint diff for 
  1125.  * diff cases 
  1126.  */
  1127. nextAdjPoint = now + srtt;
  1128. bufToDrain = LAYERBW_*layers - rate;
  1129. /*
  1130.  * calculate optimal dist. of bufToDrain across all 
  1131.  * layers. FinalDrainArray[] is the output
  1132.  * FinalBuffer[] is the final state
  1133.  */
  1134. if (bufToDrain <= 0)
  1135. panic("# ERROR: bufToDrain: %.2fn", 
  1136.       bufToDrain);
  1137. DrainPacket(bufToDrain, FinalDrainArray, layers, rate, 
  1138.     srtt, FinalBuffer);
  1139. for(l=0; l<MAX_LAYER; l++){
  1140. tosend[l] = 0;
  1141. }
  1142. for(l=0; l<layers; l++){
  1143. tosend[l] = srtt*LAYERBW_ - FinalDrainArray[l];
  1144. // Correct for numerical error
  1145. if (fabs(tosend[l]) < QA_EPSILON)
  1146. tosend[l] = 0.0;
  1147. }
  1148. /* 
  1149.  * XXX, not sure if this is the best thing 
  1150.  * we might only increase it
  1151.  */
  1152. tosendPtr = 0;
  1153. /* debug only */
  1154. if ((bufToDrain <= 0) || 
  1155.     AllZero(FinalDrainArray, layers) ||
  1156.     AllZero(tosend, layers)) {
  1157. debug("# Error: bufToDrain: %.2f, %d layers, "
  1158.       "srtt: %.2fn", 
  1159.       bufToDrain, layers, srtt);
  1160. for (l=0; l<layers; l++)
  1161. debug("# FinalDrainArray[%d]: %.2f, "
  1162.       "tosend[%d]: %.2fn", l, 
  1163.       FinalDrainArray[l],l, tosend[l]);
  1164. }
  1165. /*******/
  1166. }
  1167. flag = -1;
  1168. last_rate = rate;
  1169. done = 0;
  1170. cnt = 1;  
  1171. while ((!done) && (cnt <= layers)) {
  1172. if (tosend[tosendPtr] > 0) {
  1173. if ((buffer_[tosendPtr-1] <= buffer_[tosendPtr]
  1174.      + seg_size_) && (tosendPtr > 0))
  1175. idx = tosendPtr-1;
  1176. else 
  1177. idx = tosendPtr;
  1178. tosend[tosendPtr] -= seg_size_;
  1179. if (tosend[tosendPtr] < 0)
  1180. tosend[tosendPtr] = 0;
  1181. return output(size, idx);
  1182. }
  1183. cnt++;
  1184. tosendPtr = (tosendPtr+1) % layers;
  1185. }
  1186. // XXX End of Drain Phase
  1187. // For now, send a chunk from the base layer. Modify it later!!
  1188. return output(size, 0);
  1189. } /* if (rate >= layers*LAYERBW_) */
  1190. panic("# QA::get_data() reached the end. n");
  1191. /*NOTREACHED*/
  1192. return NULL;
  1193. }
  1194. //-----------------------------------------
  1195. //-------------- misc routine
  1196. //------------------------------------------
  1197. // return 1 is all first "len" element of "arr" are zero
  1198. // and 0 otherwise
  1199. int QA::AllZero(double *arr, int len)
  1200. {
  1201. int i;
  1202.   
  1203. for (i=0; i<len; i++)
  1204. if (arr[i] != 0.0)
  1205. // debug("-- arr[%d}: %fn", i, arr[i]);
  1206. return 0;
  1207. return 1;
  1208. }
  1209. //
  1210. // Calculate accumulative amount of buffering for the lowest "n" layers
  1211. //
  1212. double QA::TotalBuf(int n, double *buffer)
  1213. {
  1214. double totbuf = 0.0;
  1215. int i;
  1216. for(i=0; i<n; i++)
  1217. totbuf += buffer[i]; 
  1218. return totbuf;
  1219. }
  1220. // Update buffer_ information for a given layer
  1221. // Get an output data packet from applications above
  1222. AppData* QA::output(int& size, int layer)
  1223. {
  1224. int i;
  1225. assert((sending_[layer] == 1) || (startTime_ == -1));
  1226. // In order to send out a segment, all corresponding segments of 
  1227. // the lower layers must have been sent out
  1228. if (layer > 0)
  1229. if (data_[layer-1].start() <= data_[layer].start())
  1230. return output(size, layer-1);
  1231. // Get and output the data at the current data pointer
  1232. MediaRequest q(MEDIAREQ_GETSEG);
  1233. q.set_name(page_);
  1234. q.set_layer(layer);
  1235. q.set_st(data_[layer].start());
  1236. q.set_datasize(seg_size_);
  1237. q.set_app(this);
  1238. AppData* res = target()->get_data(size, &q);
  1239. assert(res != NULL);
  1240. HttpMediaData *p = (HttpMediaData *)res; 
  1241. if (p->datasize() <= 0) {
  1242. // When the data is not available:
  1243. // Should NOT advance sending data pointer because 
  1244. // if this is a cache which is downloading from a slow
  1245. // link, it is possible that the requested data will
  1246. // become available in the near future!!
  1247. // We have already sent out the last segment of the base layer,
  1248. // now we are requested for the segment beyond the last one
  1249. // in the base layer. In this case, consider the transmission
  1250. // is complete and tear down the connection.
  1251. if (p->is_finished()) {
  1252. rap()->stop();
  1253. // XXX Shouldn't this be done inside mcache/mserver??
  1254. Tcl::instance().evalf("%s finish-stream %s", 
  1255.       target()->name(), name());
  1256. } else if (!p->is_last()) {
  1257. // If we coulnd't find anything within q, move data 
  1258. // pointer forward to skip holes.
  1259. MediaSegment tmp(q.et(), q.et()+seg_size_);
  1260. check_layers(p->layer(), tmp);
  1261. // If we can, advance. Otherwise wait for
  1262. // lower layers to advance first.
  1263. if (tmp.datasize() > 0) {
  1264. assert(tmp.datasize() <= seg_size_);
  1265. data_[p->layer()].set_start(tmp.start());
  1266. data_[p->layer()].set_end(tmp.end());
  1267. }
  1268. }
  1269. delete p;
  1270. return NULL;
  1271. }
  1272. // Set current data pointer to the right ones
  1273. // If available data is more than seg_size_, only 
  1274. // advance data pointer by seg_size_. If less data 
  1275. // is available, only advance data by the amount 
  1276. // of available data.
  1277. //
  1278. // XXX Currently the cache above does NOT pack data 
  1279. // from discontinugous blocks into one packet. May 
  1280. // need to do that later. 
  1281. //  if (p->is_last())
  1282. //  data_[p->layer()].set_last();
  1283. assert((p->datasize() > 0) && (p->datasize() <= seg_size_));
  1284. // XXX Before we move data pointer forward, make sure we don't violate
  1285. // layer ordering rules. Note we only need to check end_ because 
  1286. // start_ is p->et() which is guaranteed to be valid
  1287. MediaSegment tmp(p->et(), p->et()+seg_size_);
  1288. check_layers(p->layer(), tmp);
  1289. if (tmp.datasize() > 0) {
  1290. assert(tmp.datasize() <= seg_size_);
  1291. data_[p->layer()].set_start(tmp.start());
  1292. data_[p->layer()].set_end(tmp.end());
  1293. } else {
  1294. // Print error messages, do not send anything and wait for 
  1295. // next time so that hopefully lower layers will already 
  1296. // have advanced.
  1297. fprintf(stderr, "# ERROR We cannot advance pointers for "
  1298. "segment (%d %d)n", tmp.start(), tmp.end());
  1299. for (i = 0; i < layer; i++) 
  1300. fprintf(stderr, "Layer %d, data ptr (%d %d) n",
  1301. i, data_[i].start(), data_[i].end());
  1302. delete p;
  1303. return NULL;
  1304. }
  1305. // Let me know that we've sent out this segment. This is used
  1306. // later to drain data (DrainBuffers())
  1307. outlist_[p->layer()].add(MediaSegment(p->st(), p->et()));
  1308. buffer_[layer] += p->datasize();
  1309. bw_[layer] += p->datasize();
  1310. drained_[layer] -= p->datasize();
  1311.   
  1312. //offset_[layer] += seg_size_;
  1313. avgrate_ = rate_weight_*rate() + (1-rate_weight_)*avgrate_;
  1314. // DEBUG check
  1315. for (i = 0; i < layer-1; i++)
  1316. if (data_[i].end() < data_[i+1].end()) {
  1317. for (int j = 0; j < layer; j++)
  1318. fprintf(stderr, "layer i: (%d %d)n", 
  1319. data_[i].start(), data_[i].end());
  1320. panic("# ERROR Wrong layer sending order!!n");
  1321. }
  1322. return res;
  1323. }
  1324. void QA::check_layers(int layer, MediaSegment& tmp) {
  1325. // XXX While we are moving pointer forward, make sure
  1326. // that we are not violating layer boundary constraint
  1327. for (int i = layer-1; i >= 0; i--) 
  1328. // We cannot go faster than a lower layer!!
  1329. if (tmp.end() > data_[i].end())
  1330. tmp.set_end(data_[i].end());
  1331. }
  1332. //
  1333. // This is optimal buffer distribution for scenario 1.
  1334. // NOTE: rate is the current rate before the backoff
  1335. // Jan 28, 99
  1336. //
  1337. // This routines performs buffer sharing by giveing max share
  1338. // to the lowest layer, i.e. it fills the triangle in a bottom-up
  1339. // starting from the base layer. We use this routine instead of bufOpt,
  1340. // for all cases during filling phase. Allocation based on diagonal strips
  1341. //
  1342. double QA::bufOptScen1(int layer, int layers, double currrate, 
  1343.        double slope, int backoffs)
  1344. {
  1345. double smallt, larget, side, rate;
  1346.   
  1347. if (backoffs < 0) {
  1348. panic("# ERROR: backoff: %d in bufOptScen1n", 
  1349.       backoffs);
  1350. }
  1351. rate = currrate/pow(2,backoffs);
  1352. side = LAYERBW_*layers - (rate + layer*LAYERBW_);
  1353. if (side <= 0.0) 
  1354. return(0.0);
  1355. larget = BufNeed(side, slope);
  1356. side = LAYERBW_*layers - (rate + (layer+1)*LAYERBW_);
  1357. if (side < 0.0) 
  1358. side = 0.0;
  1359. smallt = BufNeed(side, slope);
  1360. return (larget-smallt);
  1361. }
  1362. //
  1363. // This routine calculate optimal buffer distribution for a layer
  1364. // in scenario 2 based on the 
  1365. // 1) current rate, 2) no of layers, 3) no of backoffs
  1366. //
  1367. // Jan 28, 99bufOptScen1(layer, layers, currrate, slope, backoffs)
  1368. //
  1369. double QA::bufOptScen2(int layer, int layers, double currrate, 
  1370.        double slope, int backoffs)
  1371. {
  1372. double bufopt = 0.0;
  1373. int bmin, done;
  1374. if(backoffs < 0) {
  1375. panic("# ERROR: backoff: %d in bufOptScen2n", 
  1376. backoffs);
  1377. }
  1378. if ((currrate/pow(2,backoffs)) >= layers*LAYERBW_)
  1379. return(0.0);
  1380. bmin = 0;
  1381. done = 0;
  1382. while ((!done) && bmin<=backoffs) {
  1383. if(currrate/pow(2,bmin) >= LAYERBW_*layers)
  1384. bmin++;
  1385. else 
  1386. done++;
  1387. }
  1388. // buf required for the first triangle
  1389. // we could have dec bmin and go for 1 backoff as well
  1390. bufopt = bufOptScen1(layer, layers, currrate/pow(2,bmin), slope, 0);
  1391.   
  1392. // remaining sequential backoffs
  1393. bufopt += (backoffs - bmin)*BufNeed(layers*LAYERBW_/2, slope);
  1394. return(bufopt);
  1395. }
  1396. //
  1397. // This routine returns the optimal distribution of requested-to-drained
  1398. // buffer across active layers based on:
  1399. // 1) curr rate, 2) curr drain distr(FinalDrainArry), etc
  1400. // NOTE, the caller must update FinalDrainArray from 
  1401. // 
  1402. // Jan 29, 99
  1403. //
  1404. // DrainArr: return value, used as an incremental chaneg for 
  1405. //   FinalDrainArray
  1406. // bufAvail:  current buffer_ state
  1407. void QA::drain_buf(double* DrainArr, double bufToDrain, 
  1408.    double* FinalDrainArray, double* bufAvail, 
  1409.    int layers, double rate, double srtt)
  1410. {
  1411. double bufReq1, bufReq2, bufs1[MAX_LAYER], bufs2[MAX_LAYER], slope, 
  1412. extra, targetArr[MAX_LAYER], maxDrainRemain;
  1413. int bs1, bs2, l;
  1414. slope = seg_size_/srtt;
  1415. bs1 = MAXBKOFF_ + 1;
  1416. bs2 = MAXBKOFF_ + 1;
  1417. bufReq1 = bufReq2 = 0;
  1418. for(l=0; l<layers; l++){
  1419. bufReq1 += bufOptScen1(l, layers, rate, slope, bs1);
  1420. bufReq2 += bufOptScen2(l, layers, rate, slope, bs2);
  1421. for(l=0; l<MAX_LAYER; l++){
  1422. bufs1[l] = 0;
  1423. bufs2[l] = 0;
  1424. DrainArr[l] = 0.0;
  1425. }
  1426. while(bufReq1 > TotalBuf(layers, bufAvail)){
  1427. bufReq1 = 0;
  1428. bs1--;
  1429. for(l=0; l<layers; l++){
  1430. bufs1[l] = bufOptScen1(l, layers, rate, slope, bs1);
  1431. bufReq1 += bufs1[l];
  1432. }
  1433.   
  1434. while(bufReq2 > TotalBuf(layers, bufAvail)){
  1435. bufReq2 = 0;
  1436. bs2--;
  1437. for(l=0; l<layers; l++){
  1438. bufs2[l] =  bufOptScen2(l, layers, rate, slope, bs2);
  1439. bufReq2 += bufs2[l];
  1440. }
  1441. if (bufReq1 >= bufReq2) {
  1442. // drain toward last optimal scenario 1
  1443. for (l=layers-1; l>=0; l--){
  1444. // we try to drain the maximum amount from
  1445. // min no of highest layers
  1446. // note that there is a limit on total draining
  1447. // from a layer
  1448. maxDrainRemain = srtt*LAYERBW_ - FinalDrainArray[l];
  1449. if ((bufAvail[l] > bufs1[l] + maxDrainRemain) &&
  1450.     (bufToDrain >= maxDrainRemain)) {
  1451. DrainArr[l] = maxDrainRemain;
  1452. bufToDrain -= maxDrainRemain;
  1453. } else {
  1454. if(bufAvail[l] > bufs1[l] + maxDrainRemain){
  1455. DrainArr[l] = bufToDrain;
  1456. bufToDrain = 0.0;
  1457. } else {
  1458. DrainArr[l] = bufAvail[l] - bufs1[l];
  1459. bufToDrain -= bufAvail[l] - bufs1[l];
  1460. /* for debug */
  1461. if(DrainArr[l] < 0.0){
  1462. //      panic("# ERROR, DrainArr[%d]: %.2f, bufAvail: %.2f, bufs1: %.2fn",
  1463. //    l, DrainArr[l], bufAvail[l], bufs1[l]);
  1464. DrainArr[l] = 0.0;
  1465. }
  1466. }
  1467. }
  1468. if(bufToDrain == 0.0)
  1469. return;
  1470. }
  1471. return;
  1472. } else {   /* if (bufReq1 >= bufReq2) */
  1473. // Drain towards he last optima scenario 2 
  1474. // We're draining - don't care about the upper bound on 
  1475. // scenario 2.
  1476. // Have to calculate all the layers together to get this max 
  1477. // thing to work
  1478. extra = 0.0;
  1479. // Calculate the extra buffering 
  1480. for (l=0; l<layers; l++) {
  1481. if(bufs1[l] > bufs2[l])
  1482. extra += bufs1[l] - bufs2[l];
  1483. }
  1484. for (l=layers-1; l>=0; l--)
  1485. if(bufs1[l] >= bufs2[l])
  1486. targetArr[l] = bufs1[l];
  1487. else
  1488. if (bufs2[l] - bufs1[l] >= extra) {
  1489. targetArr[l] = bufs2[l] - extra;
  1490. extra = 0;
  1491. } else {
  1492. // there is enough extra to compenstae the dif
  1493. if (extra > 0) {
  1494. targetArr[l] = bufs2[l];
  1495. extra -= bufs2[l] - bufs1[l];
  1496. } else 
  1497. panic("# ERROR Should not 
  1498. reach here, extra: %.2f, bufs2: %.2f, bufs1: %.2f, L%dn", 
  1499. extra, bufs2[l], bufs1[l], l);
  1500. }
  1501. } /* end of if (bufReq1 >= bufReq2) */
  1502.    
  1503. // drain toward last optimal scenario 2
  1504. for (l=layers-1; l>=0; l--) {
  1505. // we try to drain the maximum amount from
  1506. // min no of highest layers
  1507. // note that there is a limit on total draining
  1508. // from a layer
  1509. maxDrainRemain = srtt*LAYERBW_ - FinalDrainArray[l];
  1510. if ((bufAvail[l] > targetArr[l] + maxDrainRemain) &&
  1511.     (bufToDrain >= maxDrainRemain)) {
  1512. DrainArr[l] = maxDrainRemain;
  1513. bufToDrain -= maxDrainRemain;
  1514. } else {
  1515. if(bufAvail[l] > targetArr[l] + maxDrainRemain){
  1516. DrainArr[l] = bufToDrain;
  1517. bufToDrain = 0.0;
  1518. } else {
  1519. DrainArr[l] = bufAvail[l] - targetArr[l];
  1520. bufToDrain -= bufAvail[l] - targetArr[l];
  1521. // for debug 
  1522. if (DrainArr[l] < 0.0) {
  1523. //    panic("# ERROR, DrainArr[%d]: %.2f, bufAvail: %.2f, bufs1: %.2fn",
  1524. //  l, DrainArr[l], bufAvail[l], bufs1[l]);
  1525. DrainArr[l] = 0;
  1526. }
  1527. }
  1528. }
  1529. if (bufToDrain == 0.0)
  1530. return;
  1531. } /* end of for */
  1532. return;
  1533. }
  1534. //
  1535. // This routine calculate an optimal distribution of a given 
  1536. // amount of buffered data to drain.
  1537. // the main algorithm is in drain_buf() and this one mainly init
  1538. // the input and calls that routine ad then update FinalDrainArray,
  1539. // based on its old value and return value for DrainArr.
  1540. //
  1541. // FinalDrainArray: output
  1542. // FinalBuffer: output, expected buf state at the end of the interval
  1543. void QA::DrainPacket(double bufToDrain, double* FinalDrainArray, int layers,
  1544.      double rate, double srtt, double* FinalBuffer)
  1545. {
  1546. double DrainArr[MAX_LAYER],  bufAvail[MAX_LAYER], TotBufAvail;
  1547. int l,cnt;
  1548.   
  1549. for(l=0; l<MAX_LAYER; l++){
  1550. FinalDrainArray[l] = 0.0;
  1551. bufAvail[l] = buffer_[l];
  1552. }
  1553. TotBufAvail = TotalBuf(layers, bufAvail);
  1554. cnt = 0;
  1555. while ((bufToDrain > 0) && (cnt < 10)) {
  1556. // debug("bufToDrain%d: %.2fn", cnt, bufToDrain);
  1557. drain_buf(DrainArr, bufToDrain, FinalDrainArray, bufAvail, 
  1558.   layers, rate, srtt);
  1559. for(l=0; l<layers; l++){
  1560. bufToDrain -= DrainArr[l];
  1561. TotBufAvail -= DrainArr[l];
  1562. FinalDrainArray[l] += DrainArr[l];
  1563. bufAvail[l] -= DrainArr[l];
  1564. FinalBuffer[l] = buffer_[l] - FinalDrainArray[l];
  1565. }
  1566. cnt++;
  1567. }
  1568. }
  1569. void QA::check_availability(int layer, const MediaSegment& s) 
  1570. {
  1571. int dummy; 
  1572. MediaRequest p(MEDIAREQ_CHECKSEG);
  1573. p.set_name(page_);
  1574. p.set_layer(layer);
  1575. p.set_st(s.start());
  1576. p.set_et(s.end());
  1577. p.set_app(this);
  1578. // Ask cache/server to do prefetching if necessary.
  1579. target()->get_data(dummy, &p);
  1580. }
  1581. /* 
  1582.  * This routine is called once every SRTT to drain some data from
  1583.  * recv's buffer and src's image from recv's buf.
  1584.  */
  1585. void QA::DrainBuffers()
  1586. {
  1587. int i, j, layers = 0;
  1588. Scheduler& s = Scheduler::instance();
  1589. double now = s.clock();
  1590. // interval since last drain
  1591. double interval = now - playTime_;
  1592. playTime_ = now;  // update playTime
  1593. if ((layers > 1) && (playing_[0] != 1)) {
  1594. panic("ERROR in DrainBuffer: layers>0 but L0 isn't playingn");
  1595. }
  1596. // Updating playout offset, but do nothing if we are in the initial
  1597. // startup filling phase! This offset measures the playing progress
  1598. // of the client side. It is actually the playing offset of the lowest
  1599. // layer.
  1600. // This is the real amount of data to be drained from layers
  1601. int todrain[MAX_LAYER]; 
  1602. // Expected offset of base layer after draining, without considering 
  1603. // holes in data. This has to be satisfied, otherwise base layer will
  1604. // be dropped and an error condition will be raised.
  1605. poffset_ += (int)floor(interval*LAYERBW_+0.5);
  1606. // Started from MAX_LAYER to make debugging easier
  1607. for (i = MAX_LAYER-1; i >= 0; i--) {
  1608. // If this layer is not being played, don't drain anything
  1609. if (sending_[i] == 0) {
  1610. todrain[i] = 0;
  1611. drained_[i] = 0.0;
  1612. continue; 
  1613. }
  1614. todrain[i] = outlist_[i].evict_head_offset(poffset_);
  1615. assert(todrain[i] >= 0);
  1616. buffer_[i] -= todrain[i];
  1617. // A buffer must have more than one byte
  1618. if ((int)buffer_[i] <= 0) {
  1619. debug("Buffer %d ran dry: %.2f after draining, DROPn",
  1620.       i, buffer_[i]);
  1621. playing_[i] = 0;
  1622. sending_[i] = 0;
  1623. buffer_[i] = 0;
  1624.     
  1625. /* Drop all higher layers if they still have data */
  1626. for (j = i+1; j < MAX_LAYER; j++)
  1627. if (sending_[j] == 1) {
  1628. /*
  1629.   panic("# ERROR: layer %d 
  1630. is playing with %.2f buf but layer %d ran dry with %.2f bufn",
  1631.         j, buffer_[j], i, buffer_[i]);
  1632. */
  1633.   debug("# DROP layer %d: it 
  1634. is playing with %.2f buf but layer %d ran dry with %.2f bufn",
  1635.         j, buffer_[j], i, buffer_[i]);
  1636. sending_[j] = 0;
  1637. playing_[j] = 0;
  1638. buffer_[j] = 0;
  1639. }
  1640. // We don't need to set it to -1. The old address 
  1641. // will be used to see if we are sending old data if 
  1642. // that later is added again
  1643. //
  1644. // XXX Where is this -1 mark ever used????
  1645. // data_[i].set_start(-1); // drop layer i
  1646. } else {
  1647. // Prefetch for this layer. Round to whole segment
  1648. int st = (int)floor((poffset_+pref_srtt_*LAYERBW_)
  1649.     /seg_size_+0.5)*seg_size_;
  1650. int et = (int)floor((poffset_+(pref_srtt_+interval)*
  1651.    LAYERBW_)/seg_size_+0.5)*seg_size_;
  1652. if (et > pref_[i]) {
  1653. pref_[i] = et;
  1654. MediaSegment s(st, et);
  1655. check_availability(i, s);
  1656. }
  1657. }
  1658. } /* end of for */
  1659. }
  1660. // This routine dumps info into a file
  1661. // format of each line is as follows:
  1662. //  time tot-rate avg-rate per-layer-bw[MAXLAYER] tot-bw drain-rate[MAXLAYER] 
  1663. //  & cumulative-buffer[MAXLAYER] & no-of-layers 
  1664. // ADDED: use the old value of SRTT for bw/etc estimation !!! Jan 26
  1665. // XXX: need to be more compressed add more hooks to for ctrling from  
  1666. // tcl level
  1667. void QA::DumpInfo(double t, double last_t, double rate, 
  1668.   double avgrate, double srtt)
  1669. {
  1670. #define MAXLEN 2000
  1671. int i,j;
  1672. char s1[MAXLEN], s2[MAXLEN], tmp[MAXLEN];
  1673. static double last_srtt = 0, t1,t2 = 0;
  1674. #undef MAXLEN
  1675. double  tot_bw = 0.0, interval, diff;
  1676. //  if(rate > 1000000.0){
  1677. //  debug("WARNING rate: %f is too largen", rate);
  1678. //  }
  1679. interval = t - last_t ;
  1680.   
  1681. if((t2 != last_t) && (t2 > 0)){
  1682. diff = interval - last_srtt;
  1683. if ((diff > 0.001) || (diff < -0.001)) {
  1684. if (last_t == 0) 
  1685. // Startup phase
  1686. return;
  1687. /*
  1688.   debug("WARNING: last_srtt: %.4f != 
  1689. interval: %.4f, diff: %f t1: %f, t2: %f, last_t: %f, t: %fn",
  1690.   last_srtt, interval, diff, t1, t2, last_t, t);
  1691. */
  1692. //abort();
  1693. }
  1694. } else 
  1695. /* for the first call to init */
  1696. last_srtt = srtt;
  1697. t1 = last_t;
  1698. t2 = t;
  1699. if (interval <= 0.0) {
  1700. panic("# ERROR interval is negativen");
  1701. }
  1702. sprintf(s1, " %.2f %.2f %.2f X", last_t, rate, avgrate); 
  1703. sprintf(s2, " %.2f %.2f %.2f X", t, rate, avgrate);
  1704. j = 0;
  1705. for (i = 0; i < MAX_LAYER; i++) 
  1706.   //if (playing_[i] == 1)
  1707.   if (sending_[i] == 1)
  1708.     j++;
  1709. //no of layers being playback
  1710. sprintf(tmp, " %d", j*LAYERBW_);
  1711. strcat(s1, tmp);
  1712. strcat(s2, tmp);
  1713. for (i = 0; i < MAX_LAYER; i++) {
  1714. sprintf(tmp, " %.2g ", (bw_[i]/interval)+i*10000.0);
  1715. strcat(s1,tmp);
  1716. strcat(s2,tmp);
  1717. tot_bw += bw_[i]/interval;
  1718. bw_[i] = 0;
  1719. }
  1720. sprintf(tmp, " %.2f X", tot_bw );
  1721. strcat(s1,tmp);
  1722. strcat(s2,tmp);
  1723.   
  1724. j = 0;
  1725. for (i = 0; i < MAX_LAYER; i++) {
  1726.   //if (playing_[i] == 1) {
  1727.   if(sending_[i] ==1){
  1728.     j++;
  1729.     // drained_[] can be neg when allocated buf for this 
  1730.     // layer is more than consumed data
  1731.     if (drained_[i] < 0.0) {
  1732.       // this means that this layer was drained 
  1733.       // with max rate
  1734.       drained_[i] = 0.0;
  1735.     } 
  1736.     // XXX, we could have used interval*LAYERBW_ - bw_[i]
  1737.     // that was certainly better
  1738.     sprintf(tmp, " %.2f ", 
  1739.     (drained_[i]/interval)+i*10000.0);
  1740.     strcat(s1,tmp);
  1741.     strcat(s2,tmp);
  1742.     // Note that drained[] shows the amount of data that 
  1743.     // is used from buffered data, i.e. rd[i]
  1744.     // This must be srtt instead of interval because this 
  1745.     // is for next dumping.
  1746.     drained_[i]=srtt*LAYERBW_;
  1747.   } else {
  1748.     sprintf(tmp, " %.2f ", i*10000.0);
  1749.     strcat(s1,tmp);
  1750.     strcat(s2,tmp);
  1751.     drained_[i] = 0.0;
  1752.   }
  1753. }
  1754. for (i=0;i<MAX_LAYER;i++) {
  1755. sprintf(tmp, " %.2f", buffer_[i]+i*10000);
  1756. strcat(s1,tmp);
  1757. strcat(s2,tmp);
  1758. }
  1759. log("QA %s n", s1);
  1760. log("QA %s n", s2);
  1761. fflush(stdout);
  1762. }
  1763. // This routine models draining of buffers at the recv
  1764. // it periodically updates state of buffers
  1765. // Ir must be called once and then it reschedules itself
  1766. // it is first called after playout is started!
  1767. void QA::UpdateState()
  1768. {
  1769. double last_ptime = playTime_; // Last time to drain buffer
  1770. DrainBuffers();
  1771. DumpInfo(Scheduler::instance().clock(), last_ptime, 
  1772.  rate(), avgrate_, rap()->srtt());
  1773. }