xcpq.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:10k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* -*-  Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*-
  2.  *
  3.  * Copyright (C) 2004 by USC/ISI
  4.  *               2002 by Dina Katabi
  5.  *
  6.  * All rights reserved.
  7.  *
  8.  * Redistribution and use in source and binary forms are permitted
  9.  * provided that the above copyright notice and this paragraph are
  10.  * duplicated in all such forms and that any documentation, advertising
  11.  * materials, and other materials related to such distribution and use
  12.  * acknowledge that the software was developed by the University of
  13.  * Southern California, Information Sciences Institute.  The name of the
  14.  * University may not be used to endorse or promote products derived from
  15.  * this software without specific prior written permission.
  16.  *
  17.  * THIS SOFTWARE IS PROVIDED "AS IS" AND WITHOUT ANY EXPRESS OR IMPLIED
  18.  * WARRANTIES, INCLUDING, WITHOUT LIMITATION, THE IMPLIED WARRANTIES OF
  19.  * MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE.
  20.  *
  21.  */
  22. #include "xcpq.h"
  23. #include "xcp.h"
  24. #include "random.h"
  25. const double     XCPQueue::ALPHA_          = 0.4;
  26. const double     XCPQueue::BETA_           = 0.226;
  27. const double     XCPQueue::GAMMA_          = 0.1;
  28. const double     XCPQueue::XCP_MAX_INTERVAL= 1.0;
  29. const double     XCPQueue::XCP_MIN_INTERVAL= .001; 
  30. static class XCPQClass : public TclClass {
  31. public:
  32. XCPQClass() : TclClass("Queue/DropTail/XCPQ") {}
  33. TclObject* create(int, const char*const*) {
  34. return (new XCPQueue);
  35. }
  36. } class_droptail_xcpq;
  37. XCPQueue::XCPQueue(): queue_timer_(NULL), 
  38.       estimation_control_timer_(NULL),
  39.       rtt_timer_(NULL), effective_rtt_(0.0)
  40. {
  41. init_vars();
  42. }
  43. void XCPQueue::setupTimers()
  44. {
  45. queue_timer_ = new XCPTimer(this, &XCPQueue::Tq_timeout);
  46. estimation_control_timer_ = new XCPTimer(this, &XCPQueue::Te_timeout);
  47. rtt_timer_ = new XCPTimer(this, &XCPQueue::everyRTT);
  48. // Scheduling timers randomly so routers are not synchronized
  49. double T;
  50.   
  51. T = max(0.004, Random::normal(Tq_, 0.2 * Tq_));
  52. queue_timer_->sched(T);
  53. T = max(0.004, Random::normal(Te_, 0.2 * Te_));
  54. estimation_control_timer_->sched(T);
  55. T = max(0.004, Random::normal(Tr_, 0.2 * Tr_));
  56. rtt_timer_->sched(T);
  57. }
  58. void XCPQueue::routerId(XCPWrapQ* q, int id)
  59. {
  60. if (id < 0 && q == 0)
  61. fprintf(stderr, "XCP:invalid routerId and queuen");
  62. routerId_ = id;
  63. myQueue_ = q;
  64. }
  65. int XCPQueue::routerId(int id)
  66. {
  67. if (id > -1) 
  68. routerId_ = id;
  69. return ((int)routerId_); 
  70. }
  71. int XCPQueue::limit(int qlim)
  72. {
  73. if (qlim > 0) 
  74. qlim_ = qlim;
  75. return (qlim_);
  76. }
  77. void XCPQueue::setBW(double bw)
  78. {
  79. if (bw > 0) 
  80. link_capacity_bps_ = bw;
  81. }
  82. void XCPQueue::setChannel(Tcl_Channel queue_trace_file)
  83. {
  84. queue_trace_file_ = queue_trace_file;
  85. }
  86. Packet* XCPQueue::deque()
  87. {
  88. double inst_queue = byteLength();
  89. /* L 32 */
  90. if (inst_queue < running_min_queue_bytes_) 
  91. running_min_queue_bytes_= inst_queue;
  92.   
  93. Packet* p = DropTail::deque();
  94. do_before_packet_departure(p);
  95.   
  96. max_queue_ci_ = max(length(), max_queue_ci_);
  97. min_queue_ci_ = min(length(), min_queue_ci_);
  98.   
  99. return (p);
  100. }
  101. void XCPQueue::enque(Packet* pkt)
  102. {
  103. max_queue_ci_ = max(length(), max_queue_ci_);
  104. min_queue_ci_ = min(length(), min_queue_ci_);
  105. do_on_packet_arrival(pkt);
  106. DropTail::enque(pkt);
  107. }
  108. void XCPQueue::do_on_packet_arrival(Packet* pkt){
  109.   
  110. double pkt_size = double(hdr_cmn::access(pkt)->size());
  111. /* L 1 */
  112. input_traffic_bytes_ += pkt_size;
  113. hdr_xcp *xh = hdr_xcp::access(pkt); 
  114. if (xh->xcp_enabled_ != hdr_xcp::XCP_ENABLED)
  115. return; // Estimates depend only on Forward XCP Traffic
  116.       
  117. ++num_cc_packets_in_Te_;
  118.   
  119. if (xh->rtt_ != 0.0) {
  120. /* L 2 */
  121. sum_inv_throughput_ += xh->x_;
  122. /* L 3 */
  123. if (xh->rtt_ < XCP_MAX_INTERVAL) {
  124. /* L 4 */
  125. double y = xh->rtt_ * xh->x_;
  126. sum_rtt_by_throughput_ += y;
  127. /* L 5 */
  128. } else {
  129. /* L 6 */
  130. double y = XCP_MAX_INTERVAL * xh->x_;
  131. sum_rtt_by_throughput_ += y;
  132. }
  133. }
  134. }
  135. void XCPQueue::do_before_packet_departure(Packet* p)
  136. {
  137. if (!p) return;
  138.   
  139. hdr_xcp *xh = hdr_xcp::access(p);
  140.     
  141. if (xh->xcp_enabled_ != hdr_xcp::XCP_ENABLED)
  142. return;
  143. if (xh->rtt_ == 0.0) {
  144. xh->delta_throughput_ = 0;
  145. return;
  146. }
  147. double pkt_size = double(hdr_cmn::access(p)->size());
  148. /* L 20, 21 */
  149. double pos_fbk = Cp_ * xh->x_;
  150. double neg_fbk = Cn_ * pkt_size;
  151. pos_fbk = min(residue_pos_fbk_, pos_fbk);
  152. neg_fbk = min(residue_neg_fbk_, neg_fbk);
  153. /* L 22 */
  154. double feedback = pos_fbk - neg_fbk;
  155.  
  156. /* L 23 */
  157. if (xh->delta_throughput_ >= feedback) {
  158. /* L 24 */
  159. xh->delta_throughput_ = feedback;
  160. xh->controlling_hop_ = routerId_;
  161. /* L 25 */
  162. } else {
  163. /* L 26 */
  164. neg_fbk = min(residue_neg_fbk_, neg_fbk + (feedback - xh->delta_throughput_));
  165. /* L 27 */
  166. pos_fbk = xh->delta_throughput_ + neg_fbk;
  167. }
  168. /* L 28, L 29 */
  169. residue_pos_fbk_ = max(0.0, residue_pos_fbk_ - pos_fbk);
  170. residue_neg_fbk_ = max(0.0, residue_neg_fbk_ - neg_fbk);
  171. /* L 30 */
  172. if (residue_pos_fbk_ == 0.0)
  173. Cp_ = 0.0;
  174. /* L 31 */
  175. if (residue_neg_fbk_ == 0.0)
  176. Cn_ = 0.0;
  177.   
  178. if (TRACE && (queue_trace_file_ != 0 )) {
  179. trace_var("pos_fbk", pos_fbk);
  180. trace_var("neg_fbk", neg_fbk);
  181. trace_var("delta_throughput", xh->delta_throughput_);
  182. int id = hdr_ip::access(p)->flowid();
  183. char buf[25];
  184. sprintf(buf, "X%d",id);
  185. trace_var(buf, xh->x_);
  186. // tracing measured thruput info
  187. if (xh->rtt_ > high_rtt_)
  188. high_rtt_ = xh->rtt_;
  189. total_thruput_ += pkt_size;
  190. if (num_mice_ != 0) {
  191. if (id >= num_mice_) 
  192. thruput_elep_ += pkt_size;
  193. else 
  194. thruput_mice_ += pkt_size;
  195. }
  196. }
  197. }
  198. void XCPQueue::Tq_timeout()
  199. {
  200. double inst_queue = byteLength();
  201. /* L 33 */
  202. queue_bytes_ = running_min_queue_bytes_;
  203. /* L 34 */
  204. running_min_queue_bytes_ = inst_queue;
  205. /* L 35 */
  206. Tq_ = max(0.002, (avg_rtt_ - inst_queue/link_capacity_bps_)/2.0); 
  207. /* L 36 */
  208. queue_timer_->resched(Tq_);
  209. if (TRACE && (queue_trace_file_ != 0)) {
  210. trace_var("Tq_", Tq_);
  211. trace_var("queue_bytes_", queue_bytes_);
  212. trace_var("routerId_", routerId_);
  213. }
  214. }
  215. void XCPQueue::Te_timeout()
  216. {
  217. if (TRACE && (queue_trace_file_ != 0)) {
  218. trace_var("residue_pos_fbk_not_allocated", residue_pos_fbk_);
  219. trace_var("residue_neg_fbk_not_allocated", residue_neg_fbk_);
  220. }
  221. /* L 8 */
  222. double input_bw = input_traffic_bytes_ / Te_;
  223. double phi_bps = 0.0;
  224. double shuffled_traffic_bps = 0.0;
  225. if (sum_inv_throughput_ != 0.0) {
  226. /* L 7 */
  227. avg_rtt_ = sum_rtt_by_throughput_ / sum_inv_throughput_;
  228. } else
  229. avg_rtt_ = INITIAL_Te_VALUE;
  230. if (input_traffic_bytes_ > 0) {
  231. /* L 9 */
  232. phi_bps = ALPHA_ * (link_capacity_bps_- input_bw) 
  233. - BETA_ * queue_bytes_ / avg_rtt_;
  234. /* L 10 */
  235. shuffled_traffic_bps = GAMMA_ * input_bw;
  236. if (shuffled_traffic_bps > abs(phi_bps))
  237. shuffled_traffic_bps -= abs(phi_bps);
  238. else
  239. shuffled_traffic_bps = 0.0;
  240. /* L 10 ends here */
  241. }
  242. /* L 11, 12 */
  243. residue_pos_fbk_ = max(0.0,  phi_bps) + shuffled_traffic_bps;
  244. residue_neg_fbk_ = max(0.0, -phi_bps) + shuffled_traffic_bps;
  245. if (sum_inv_throughput_ == 0.0)
  246. sum_inv_throughput_ = 1.0;
  247. if (input_traffic_bytes_ > 0) {
  248. /* L 13 */
  249. Cp_ =  residue_pos_fbk_ / sum_inv_throughput_;
  250. /* L 14 */
  251. Cn_ =  residue_neg_fbk_ / input_traffic_bytes_;
  252. } else 
  253. Cp_ = Cn_ = 0.0;
  254. if (TRACE && (queue_trace_file_ != 0)) {
  255. trace_var("input_traffic_bytes_", input_traffic_bytes_);
  256. trace_var("avg_rtt_", avg_rtt_);
  257. trace_var("residue_pos_fbk_", residue_pos_fbk_);
  258. trace_var("residue_neg_fbk_", residue_neg_fbk_);
  259. //trace_var("Qavg", edv_.v_ave);
  260. trace_var("Qsize", length());
  261. trace_var("min_queue_ci_", double(min_queue_ci_));
  262. trace_var("max_queue_ci_", double(max_queue_ci_));
  263. trace_var("routerId", routerId_);
  264. num_cc_packets_in_Te_ = 0;
  265. /* L 15 */  
  266. input_traffic_bytes_ = 0.0;
  267. /* L 16 */
  268. sum_inv_throughput_ = 0.0;
  269. /* L 17 */
  270. sum_rtt_by_throughput_ = 0.0;
  271. /* L 18 */
  272. Te_ = max(avg_rtt_, XCP_MIN_INTERVAL);
  273. /* L 19 */
  274. estimation_control_timer_->resched(Te_);
  275. min_queue_ci_ = max_queue_ci_ = length();
  276. }
  277. void XCPQueue::everyRTT ()
  278. {
  279. if (effective_rtt_ != 0.0)
  280. Tr_ = effective_rtt_;
  281. else 
  282. if (high_rtt_ != 0.0)
  283. Tr_ = high_rtt_;
  284. // measure drops, if any
  285. trace_var("d", drops_);
  286. drops_=0;
  287. // sample the current queue size
  288. trace_var("q",length());
  289.   
  290. // Update utilization 
  291. trace_var("u", total_thruput_/(Tr_*link_capacity_bps_));
  292. trace_var("u_elep", thruput_elep_/(Tr_*link_capacity_bps_));
  293. trace_var("u_mice", thruput_mice_/(Tr_*link_capacity_bps_));
  294. total_thruput_ = 0;
  295.   
  296. rtt_timer_->resched(Tr_);
  297. }
  298. void  XCPQueue::drop(Packet* p)
  299. {
  300. drops_++;
  301. total_drops_ = total_drops_++;
  302.   
  303. Connector::drop(p);
  304. }
  305. void  XCPQueue::setEffectiveRtt(double rtt)
  306. {
  307. effective_rtt_ = rtt;
  308. rtt_timer_->resched(effective_rtt_);
  309. }
  310. // Estimation & Control Helpers
  311. void XCPQueue::init_vars() 
  312. {
  313. link_capacity_bps_ = 0.0;
  314. avg_rtt_ = INITIAL_Te_VALUE;
  315. Te_ = INITIAL_Te_VALUE;
  316.    Tq_ = INITIAL_Te_VALUE; 
  317. Tr_                     = 0.1;
  318. high_rtt_               = 0.0;
  319. Cp_ = 0.0;
  320. Cn_ = 0.0;     
  321. residue_pos_fbk_ = 0.0;
  322. residue_neg_fbk_ = 0.0;     
  323. queue_bytes_ = 0.0; // our estimate of the fluid model queue
  324.   
  325. input_traffic_bytes_ = 0.0; 
  326. sum_rtt_by_throughput_ = 0.0;
  327. sum_inv_throughput_ = 0.0;
  328. running_min_queue_bytes_= 0;
  329. num_cc_packets_in_Te_   = 0;
  330.   
  331. queue_trace_file_ = 0;
  332. myQueue_ = 0;
  333.   
  334. min_queue_ci_ = max_queue_ci_ = length();
  335.   
  336. // measuring drops
  337. drops_ = 0;
  338. total_drops_ = 0;
  339. // utilisation
  340. num_mice_ = 0;
  341. thruput_elep_ = 0.0;
  342. thruput_mice_ = 0.0;
  343. total_thruput_ = 0.0;
  344. }
  345. void XCPTimer::expire(Event *) { 
  346. (*a_.*call_back_)();
  347. }
  348. void XCPQueue::trace_var(char * var_name, double var)
  349. {
  350. char wrk[500];
  351. double now = Scheduler::instance().clock();
  352. if (queue_trace_file_) {
  353. int n;
  354. sprintf(wrk, "%s %g %g",var_name, now, var);
  355. n = strlen(wrk);
  356. wrk[n] = 'n'; 
  357. wrk[n+1] = 0;
  358. (void)Tcl_Write(queue_trace_file_, wrk, n+1);
  359. }
  360. return; 
  361. }