pushback.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:34k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* -*-  Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
  2. /*
  3.  * Copyright (c) 2000  International Computer Science Institute
  4.  * All rights reserved.
  5.  *
  6.  * Redistribution and use in source and binary forms, with or without
  7.  * modification, are permitted provided that the following conditions
  8.  * are met:
  9.  * 1. Redistributions of source code must retain the above copyright
  10.  *    notice, this list of conditions and the following disclaimer.
  11.  * 2. Redistributions in binary form must reproduce the above copyright
  12.  *    notice, this list of conditions and the following disclaimer in the
  13.  *    documentation and/or other materials provided with the distribution.
  14.  * 3. All advertising materials mentioning features or use of this software
  15.  *    must display the following acknowledgement:
  16.  * This product includes software developed by ACIRI, the AT&T 
  17.  *      Center for Internet Research at ICSI (the International Computer
  18.  *      Science Institute).
  19.  * 4. Neither the name of ACIRI nor of ICSI may be used
  20.  *    to endorse or promote products derived from this software without
  21.  *    specific prior written permission.
  22.  *
  23.  * THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
  24.  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26.  * ARE DISCLAIMED.  IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
  27.  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  28.  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  29.  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  30.  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31.  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  32.  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  33.  * SUCH DAMAGE.
  34.  *
  35.  */
  36. #include "pushback.h"
  37. #include "ident-tree.h"
  38. #include "pushback-queue.h"
  39. #include "rate-limit.h"
  40. #include "pushback-message.h"
  41. //#define DEBUG  
  42. int hdr_pushback::offset_;
  43. static class PushbackHeaderClass : public PacketHeaderClass {
  44. public:
  45.   PushbackHeaderClass() : PacketHeaderClass("PacketHeader/Pushback", 
  46.     sizeof(hdr_pushback)) {
  47.     bind_offset(&hdr_pushback::offset_);
  48.   }
  49. } class_Pushback_hdr;
  50. static class PushbackClass : public TclClass {
  51. public:
  52.   PushbackClass() : TclClass("Agent/Pushback") {}
  53.   TclObject* create(int, const char*const*) {
  54.     return (new PushbackAgent());
  55.   }
  56. } class_Pushback;
  57. PushbackAgent::PushbackAgent() : Agent(PT_PUSHBACK), last_index_(0), intResult_(-1) {
  58.   bind("last_index_", &last_index_);
  59.   bind("intResult_", &intResult_);
  60.   bind_bool("enable_pushback_", &enable_pushback_);
  61.   bind_bool("verbose_", &verbose_);
  62.   timer_ = new PushbackTimer(this);
  63.   debugLevel = 3;
  64.   //  debugLevel = 0;
  65. }
  66. int 
  67. PushbackAgent::command(int argc, const char*const* argv) {
  68.   
  69.   Tcl& tcl = Tcl::instance();
  70.   if (argc == 4 ) {
  71.     if (strcmp(argv[1], "initialize") == 0) {
  72.       //get the node and routeLogic object
  73.       node_ = (Node *)TclObject::lookup(argv[2]);
  74.       rtLogic_ = (RouteLogic *)TclObject::lookup(argv[3]);
  75.       if (node_ == NULL || rtLogic_ == NULL) {
  76. if (verbose_) printf("Improper Initialization for Pushback Agentn");
  77. return(TCL_ERROR);
  78.       }
  79.       
  80.       sprintf(prnMsg, "node=%s rtLogic=%s id=%d address=%dn", node_->name(), 
  81.      rtLogic_->name(), node_->nodeid(), node_->address());
  82.       printMsg(prnMsg,0);
  83.       return(TCL_OK);
  84.     } 
  85.   }  
  86.   else if (argc == 3) { 
  87.     //$pba add-queue $queue
  88.     if (strcmp(argv[1], "add-queue") == 0) {
  89.       if (last_index_==MAX_QUEUES) {
  90. printf("queue list size exhausted - recompile with a bigger MAX_QUEUESn");
  91. exit(-1);
  92.       }
  93.       PushbackQueue * queue = (PushbackQueue *) TclObject::lookup(argv[2]);
  94.       if (queue == NULL) {
  95. printf("NULL queue passed n");
  96. exit(-1);
  97.       }
  98.       int index = last_index_++;
  99.       queue_list_[index].pbq_ = queue;
  100.       queue_list_[index].idTree_ = new IdentStruct();
  101.       
  102.       tcl.resultf("%d", index);
  103.       return (TCL_OK);
  104.     }
  105.   }
  106.   return (Agent::command(argc, argv));
  107. }
  108. void 
  109. PushbackAgent::reportDrop(int qid, Packet * p) {
  110.   if (!checkQID(qid)) {
  111.     sprintf(prnMsg,"Got invalid qid %dn", qid);
  112.     printMsg(prnMsg,0);
  113.     exit(-1);
  114.   }
  115.   
  116.   hdr_ip * iph = hdr_ip::access(p);
  117.   ns_addr_t src = iph->src();
  118.   ns_addr_t dst = iph->dst();
  119.   int fid = iph->flowid();
  120.   sprintf(prnMsg,"DropDetails from queue %d: %d.%d -> %d.%d (%d)n", qid, 
  121.    src.addr_, src.port_, dst.addr_, dst.port_, fid);
  122.   printMsg(prnMsg, 5);
  123.   
  124.   queue_list_[qid].idTree_->registerDrop(p);
  125. }
  126. void 
  127. PushbackAgent::calculateLowerBound(int qid, double arrRate) {
  128.   if (!checkQID(qid)) {
  129.     sprintf(prnMsg, "Got invalid id from queue in identifyAggregaten");
  130.     printMsg(prnMsg,0);
  131.     exit(-1);
  132.   }
  133.   AggReturn * aggReturn = queue_list_[qid].idTree_->calculateLowerBound();
  134.   if (aggReturn == NULL) {
  135.   //not sure what to do here.
  136.   //maybe lower bound should be left as it is
  137.   
  138.   return;
  139.   }
  140.   double lowerBound = 0;
  141.   int i = 0;
  142.   for (; i <= aggReturn->finalIndex_; i++) {
  143.       cluster currCluster = aggReturn->clusterList_[i];
  144.       AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
  145.       RateLimitSession * rls1 = 
  146.       queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
  147.       if (rls1 !=NULL) continue;
  148.       lowerBound = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
  149.       sprintf(prnMsg, "LB: count: %d totalCount_: %d arrRate: %gn", currCluster.count_, aggReturn->totalCount_, arrRate);
  150.       printMsg(prnMsg,0);
  151.       break;
  152.   }
  153.   
  154.   if (i == aggReturn->finalIndex_+1) {
  155.     sprintf(prnMsg, "Warning: All clusters being rate limitedn");
  156.     printMsg(prnMsg,0);
  157.     //exit(-1);
  158.   }
  159.   queue_list_[qid].idTree_->setLowerBound(lowerBound, 1);
  160.   
  161.   delete(aggReturn);
  162. }
  163. void
  164. PushbackAgent::identifyAggregate(int qid, double arrRate, double linkBW) {
  165.   //set up refresh timer for this queue, if this is the firstime you come here.
  166.   if (!timer_->containsRefresh(qid)) {
  167.     PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
  168.     timer_->insert(event);
  169.   }
  170.   // if (debug_) 
  171.   sprintf(prnMsg, "identifyAggregate for %dn",  qid);
  172.   printMsg(prnMsg,0);
  173.   if (!checkQID(qid)) {
  174.     sprintf(prnMsg, "Got invalid id from queue in identifyAggregaten");
  175.     printMsg(prnMsg,0);
  176.     exit(-1);
  177.   }
  178.   if (verbose_) queue_list_[qid].idTree_->traverse();
  179.   //this is a quick way of achieving this.
  180.   //but it can be justified on some grounds. will do a check with Sally later.
  181.   int noSessions = queue_list_[qid].pbq_->rlsList_->noMySessions(node_->nodeid());
  182. //   if (noSessions >= MAX_SESSIONS) {
  183. //    sprintf(prnMsg, "My hands are fulln");
  184. //    printMsg(prnMsg,0); 
  185. //    return;
  186. //   }
  187.   
  188.   AggReturn * aggReturn = queue_list_[qid].idTree_->identifyAggregate(arrRate, linkBW);
  189.   if (aggReturn == NULL) return;
  190.   for (int i=0; i<=aggReturn->finalIndex_; i++) {
  191.     cluster currCluster = aggReturn->clusterList_[i];
  192.     AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
  193.     
  194.     //don't insert the same aggregate again.
  195.     RateLimitSession * rls1 = 
  196.       queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
  197.     if (rls1 != NULL) {
  198.       sprintf(prnMsg, "got subset aggregate. Lowerbound = %g. agg = ", aggReturn->limit_);
  199.       printMsg(prnMsg,0);
  200.       aggSpec->print(); fflush(stdout);
  201.       delete(aggSpec);
  202.       //this could keep the lowerbound unnecessarily down.
  203.       //but don't be sympathetic with aggregates, which have been identified again.
  204.       if (aggReturn->limit_ < rls1->lowerBound_) {
  205. rls1->lowerBound_ = aggReturn->limit_;
  206.       }
  207.       //set the last misbehavior signal.
  208.       rls1->refreshed();
  209.       continue;
  210.     }
  211.     
  212.     double estimate = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
  213.     
  214.     if (noSessions >= MAX_SESSIONS) {
  215. int rank = queue_list_[qid].pbq_->rlsList_->rankRate(node_->nodeid(), estimate);
  216. if (rank >= MAX_SESSIONS) {
  217.     sprintf(prnMsg, "got rate <= minRate. agg = ");
  218.     printMsg(prnMsg,0);aggSpec->print(); fflush(stdout);
  219.     delete(aggSpec);     
  220.     continue;
  221.     }
  222.     
  223.     sprintf(prnMsg, "starting rate-limiting lower=%g estimate=%g agg ",  
  224.     aggReturn->limit_, estimate);
  225.     printMsg(prnMsg,0);
  226.     aggSpec->print();  fflush(stdout);
  227.     
  228.     double initialLimit = estimate; //*(1 - ambientDropRate);
  229.     RateLimitSession * rls = new RateLimitSession(aggSpec, estimate, 1, initialLimit, 
  230.   node_->nodeid(), qid, 
  231.   RATE_LIMIT_TIME_DEFAULT, aggReturn->limit_,
  232.   node_, rtLogic_);
  233.     queue_list_[qid].pbq_->rlsList_->insert(rls);
  234.       
  235.     PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
  236.     timer_->insert(event);
  237.     //    }
  238.     noSessions++;
  239.   }
  240.   queue_list_[qid].idTree_->setLowerBound(aggReturn->limit_, 0);
  241.   delete(aggReturn);
  242. }
  243. void
  244. PushbackAgent::resetDropLog(int qid) {
  245.   sprintf(prnMsg, " drop log reset for qid %dn",  qid);
  246.   printMsg(prnMsg,5);
  247.   
  248.   if (!checkQID(qid)) {
  249.     printf("Got invalid id from queue in resetDropLogn");
  250.     exit(-1);
  251.   }
  252.   queue_list_[qid].idTree_->reset();
  253. }
  254. void 
  255. PushbackAgent::timeout(PushbackEvent * event) {
  256.   
  257.   sprintf(prnMsg, "  %s event for qid %dn",  PushbackEvent::type(event), event->qid_);
  258.   printMsg(prnMsg,0);
  259.   switch (event->eventID_) {
  260.   case PUSHBACK_CHECK_EVENT: pushbackCheck(event->rls_);
  261.     break;
  262.   case PUSHBACK_REFRESH_EVENT: pushbackRefresh(event->qid_);
  263.     break;
  264.   case PUSHBACK_STATUS_EVENT: pushbackStatus(event->rls_);
  265.     break;
  266.   case INITIAL_UPDATE_EVENT: initialUpdate(event->rls_);
  267.     break;
  268.   default: sprintf(prnMsg, " Unrecognized event %dn",  event->eventID_);
  269.     printMsg(prnMsg,0);
  270.     break;
  271.   }
  272. }
  273. void 
  274. PushbackAgent::initialUpdate(RateLimitSession * rls) {
  275.   
  276.   if ( !rls->initialPhase_ ) {
  277.     sprintf(prnMsg, " Error: Update when not in initialphasen");
  278.      printMsg(prnMsg,0);
  279.      exit(-1);
  280.   }
  281.   double qdrop = queue_list_[rls->localQID_].pbq_->getDropRate();
  282.   double dropRate = rls->getDropRate();
  283.   double arrRate = rls->getArrivalRateForStatus();
  284.   double newLimit = arrRate*(1 - 2*(dropRate+qdrop));
  285.   
  286.   sprintf(prnMsg,"Initial-Update: qdrop=%g dr=%g newL=%g oldTarget=%g lowerBound=%g arr=%gn",
  287.   qdrop, dropRate, newLimit, rls->rlStrategy_->target_rate_, rls->lowerBound_, arrRate);
  288.   printMsg(prnMsg,0);
  289.   //cancel right now, if arrRate is significantly less than lower bound.
  290.   if (arrRate < 0.75*rls->lowerBound_) {
  291.       #ifdef DEBUG
  292.         double now = Scheduler::instance().clock();
  293.         printf("Cancel pushback A time: %5.3fn", now);
  294.       #endif
  295.       pushbackCancel(rls);
  296.       return;
  297.   }
  298.   if (newLimit > rls->lowerBound_) {
  299.     rls->setLimit(newLimit);
  300.     
  301.     PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
  302.     timer_->insert(event);
  303.   }
  304.   else {
  305.     rls->setLimit(rls->lowerBound_);
  306.     rls->initialPhase_ = 0;
  307.     
  308.     if (rls->logData_->count_!=0 && enable_pushback_) {
  309.       PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
  310.       timer_->insert(event);
  311.     }
  312.   }
  313. }
  314. void 
  315. PushbackAgent::pushbackCheck(RateLimitSession * rls) {
  316.   
  317.   double dropRate = rls->getDropRate();
  318.   if (dropRate >= DROP_RATE_FOR_PUSHBACK) {
  319.     rls->pushbackOn();
  320.     rls->heightInPTree_++;
  321.     
  322.     double totalRate =  rls->rlStrategy_->target_rate_;
  323.     int count = rls->logData_->count_;
  324.     double fairShare = totalRate/count;
  325.     int done = count;
  326.     
  327.     //max-min allocation of limit.
  328.     while (done != 0) {
  329.       LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  330.       int countThisRound=0;
  331.       while (lgdsNode != NULL) {
  332. double rate = lgdsNode->rateEstimator_->estRate_;
  333. if (rate <= fairShare && !lgdsNode->pushbackSent_) {
  334.   AggSpec * aggSpec = rls->aggSpec_->clone();
  335.   PushbackMessage * msg;
  336.   if (rate < fairShare/2.0) {
  337.     msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_, 
  338.      rls->localID_, aggSpec, INFINITE_LIMIT, 
  339.      rls->depthInPTree_);
  340.     lgdsNode->pushbackSent(INFINITE_LIMIT, rate);
  341.   }   
  342.   else {
  343.     msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_, 
  344.      rls->localID_, aggSpec, rate, rls->depthInPTree_);
  345.     lgdsNode->pushbackSent(rate, rate);
  346.   }
  347.   sendMsg(msg);
  348.   countThisRound++;
  349.   done--;
  350.   totalRate -= rate;
  351. }
  352. lgdsNode = lgdsNode->next_;
  353.       }
  354.       if (done == 0) break;
  355.       if (countThisRound==0) {
  356. //allocate fairshare to everyone and end.
  357. LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  358. while (lgdsNode != NULL) {
  359.   if (!lgdsNode->pushbackSent_) {
  360.     AggSpec * aggSpec = rls->aggSpec_->clone();
  361.     PushbackMessage * msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, 
  362.        rls->localQID_, rls->localID_, 
  363.        aggSpec, fairShare, 
  364.        rls->depthInPTree_);
  365.     lgdsNode->pushbackSent(fairShare,lgdsNode->rateEstimator_->estRate_);
  366.     sendMsg(msg);
  367.     done--;
  368.     totalRate-=fairShare;
  369.   }
  370.   lgdsNode = lgdsNode->next_;
  371. }
  372.       }
  373.       else {
  374. fairShare= totalRate/done;
  375.       }
  376.     }
  377.     
  378.   }
  379.   else {
  380.     //set up pushback check for later.
  381.     PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
  382.     timer_->insert(event);
  383.   }
  384. }
  385. void 
  386. PushbackAgent::pushbackStatus(RateLimitSession * rls) {
  387.   if (rls->pushbackON_) {
  388.     sprintf(prnMsg, " Warning: status timer expired for non-leaf noden");
  389.     printMsg(prnMsg,0);
  390.      //exit(-1);
  391.   }
  392.   double rate = rls->getArrivalRateForStatus();
  393.   rls->logData_->resetStatus();
  394.   
  395.   PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_, 
  396.     rls->remoteQID_, rls->remoteID_, 
  397.     rate, rls->heightInPTree_);
  398.   sendMsg(msg);
  399. }
  400. void 
  401. PushbackAgent::pushbackRefresh(int qid) {
  402.    
  403.   PushbackQueue * pbq = queue_list_[qid].pbq_;
  404.   int oldSessions = pbq->rlsList_->noMySessions(node_->nodeid());
  405.   if (!oldSessions) {
  406.     //set up refresh timers for a later time and return.
  407. // PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
  408. // timer_->insert(event);
  409.     return;
  410.   }
  411.   
  412.   int noSessions = oldSessions;
  413.   if (MERGER_MODE == 1) {
  414.       pbq->rlsList_->mergeSessions(node_->nodeid());
  415.       noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
  416.       
  417.       if (noSessions!=oldSessions) {
  418.       sprintf(prnMsg, " Some sessions merged. old = %d new = %dn",  oldSessions, noSessions);
  419.       printMsg(prnMsg,0);
  420.   //get rid of merged RLS's
  421.   RateLimitSession * listItem = pbq->rlsList_->first_;
  422.   while (listItem != NULL) {
  423.   if (listItem->origin_ == node_->nodeid() && listItem->merged_) {
  424.   pushbackCancel(listItem);
  425.   listItem = listItem->next_;
  426.   
  427.   }
  428.   }
  429.   } else {
  430.       sprintf(prnMsg, " No sessions merged. number = %dn", noSessions);
  431.       printMsg(prnMsg,0);
  432.       }
  433.   } else {
  434.   sprintf(prnMsg, "Number of sessions = %dn", noSessions);
  435.   printMsg(prnMsg,0);
  436.   }
  437.   double now = Scheduler::instance().clock();
  438.   
  439.   //check if some sessions need to be discarded because of rate-limiting too many sessions
  440.   RateLimitSession * listItem1 = pbq->rlsList_->first_;
  441.   while (noSessions > MAX_SESSIONS && listItem1 != NULL) {
  442.       int rank = pbq->rlsList_->rankRate(node_->nodeid(), listItem1->getArrivalRateForStatus());
  443.       if (listItem1->origin_ == node_->nodeid() && 
  444.   rank >= MAX_SESSIONS && (now - listItem1->startTime_) >= EARLIEST_TIME_TO_FREE) {
  445.   sprintf(prnMsg,"Releasing because of too many being rate-limitedn");
  446.   printMsg(prnMsg,0);
  447.   if (LOWER_BOUND_MODE == 1 && 
  448.       queue_list_[qid].idTree_->lowerBound_ < listItem1->getArrivalRateForStatus()) {
  449.       queue_list_[qid].idTree_->lowerBound_ = listItem1->getArrivalRateForStatus();
  450.   }
  451.   pushbackCancel(listItem1);
  452.   noSessions--;
  453.       }
  454.       listItem1 = listItem1->next_;
  455.   }
  456.   
  457.   double linkBW = pbq->getBW();
  458.   double arrRate = pbq->getRate();
  459.   double targetRate = linkBW/(1 - TARGET_DROPRATE);
  460.   
  461.   double totalRateLimitedArrivalRate = 0;
  462.   double totalLimit=0;
  463.   double lowerBound=-1;
  464.   RateLimitSession * listItem = pbq->rlsList_->first_;
  465.   while (listItem != NULL) {
  466.     if (listItem->origin_ == node_->nodeid() && !listItem->merged_) {
  467.       double sessionArrRate = listItem->getArrivalRateForStatus();
  468.       double sessionLimit = listItem->rlStrategy_->target_rate_;
  469.       totalRateLimitedArrivalRate+= sessionArrRate;
  470.       totalLimit+= (sessionArrRate > sessionLimit)? sessionLimit: sessionArrRate;
  471.       if (listItem->lowerBound_ < lowerBound || lowerBound == -1) {
  472.   lowerBound = listItem->lowerBound_;
  473.       }
  474.     }
  475.     listItem = listItem->next_;
  476.   }
  477.   if (LOWER_BOUND_MODE == 1) {
  478.   lowerBound = queue_list_[qid].idTree_->lowerBound_;
  479.   }
  480.   double excessRate = (arrRate - totalLimit + totalRateLimitedArrivalRate) - targetRate;
  481.   
  482.   sprintf(prnMsg,"arr=%g totalLimit=%g totalRateLimit=%g excess=%gn",  arrRate, totalLimit, 
  483.   totalRateLimitedArrivalRate, excessRate);
  484.   printMsg(prnMsg,0);
  485.   
  486.   if (excessRate < 0) {
  487.   sprintf(prnMsg, "Negative Excess Rate. Things maybe fine now.n");
  488.   printMsg(prnMsg,0);
  489.   //this would make all sessions go away after a while.
  490. #ifdef DEBUG
  491.   printf("Negative Excess Rate - time: %5.3fn", now);
  492. #endif
  493.   requiredLimit_ = 2*totalRateLimitedArrivalRate;
  494.   } else {
  495.   //Should we allow such an abrupt increase when the number of sessions 
  496.   // changes?
  497.   // How about: Let L be the requiredLimit.
  498.   // We need Sum (session arrival rate - L ) = excessRate
  499.   requiredLimit_ = (totalRateLimitedArrivalRate - excessRate)/noSessions;
  500.   if (requiredLimit_ < lowerBound) {
  501.   requiredLimit_ = lowerBound;
  502.   }
  503. #ifdef DEBUG
  504.       printf("New requiredLimit - time: %5.3f limit: %5.3f lowerBound:%5.3f n", now, requiredLimit_, lowerBound);
  505. #endif
  506.   }
  507.   sprintf(prnMsg,"Refresh. target=%g limit=%g floor=%gn", targetRate, requiredLimit_,
  508.   lowerBound);
  509.   printMsg(prnMsg,0);
  510.   //consider all sessions in ascending order of their arrival rate
  511.   for (int i=0; i<noSessions; i++) {
  512.   listItem = pbq->rlsList_->first_;
  513.   while (listItem != NULL ) {
  514.   if (listItem->origin_ == node_->nodeid() && 
  515.   pbq->rlsList_->rankSession(node_->nodeid(),listItem) == i) 
  516.   break;
  517.  listItem = listItem->next_;
  518.   }
  519.   if (listItem == NULL) {
  520.   printf("Error: Rank %d not foundn", i);
  521.   exit(0);
  522.   }
  523.   
  524.   double oldLimit = listItem->rlStrategy_->target_rate_;
  525.   double sendRate = listItem->getArrivalRateForStatus();
  526. #ifdef DEBUG
  527.   printf("time: %5.3f ID: %d sendRate %5.3f oldLimit %5.3f requiredLimit %5.3fn", now,
  528.  listItem->localID_, sendRate, oldLimit, requiredLimit_);
  529. #endif
  530.   //Session sending less than the limit.
  531.   if (sendRate < requiredLimit_) {
  532.   //if it has been sending less for "some" time.
  533.   if (now - listItem->refreshTime_ >= MIN_TIME_TO_FREE) {
  534. #ifdef DEBUG
  535. printf("time: %5.3f ID: %d refreshTime %5.3f MIN %d Cancel pushback B n", 
  536.    now, listItem->localID_, listItem->refreshTime_, MIN_TIME_TO_FREE);
  537. #endif
  538. pushbackCancel(listItem);       //cancel rate-limiting
  539. requiredLimit_+= (requiredLimit_ - sendRate)/(noSessions - i - 1);
  540. i--; noSessions--;
  541.   } 
  542.   else {
  543.   //refresh upstream with double of max(sending rate, old limit)
  544.   //just using sending rate, limits the amount an aggregate can grow till next refresh
  545.   //using just old limit is tricky when different aggregates have different limits.
  546.   //at the same time, we would prefer not to loosen the hold too much in one step.
  547. #ifdef DEBUG
  548.   printf("time: %5.3f ID: %d double limitn", now, listItem->localID_);
  549. #endif
  550.   double maxR = sendRate>oldLimit? sendRate: oldLimit;
  551.   if (now - listItem->refreshTime_ <= PRIMARY_WAITING_ZONE) {
  552.   sprintf(prnMsg,"Waiting Zone 1: sendRate=%g oldLimit=%gn", sendRate, oldLimit);
  553.   printMsg(prnMsg,0);
  554.   }
  555.   else {
  556.   sprintf(prnMsg,"Waiting Zone 2: sendRate=%g oldLimit=%gn", sendRate, oldLimit);
  557.   printMsg(prnMsg,0);
  558.   maxR *= 1.5;
  559.   }
  560.   if (maxR < requiredLimit_) {
  561.   listItem->setLimit(maxR);
  562.   requiredLimit_ += (requiredLimit_ - maxR)/(noSessions - i - 1);
  563.   } 
  564.   else {
  565.   listItem->setLimit(requiredLimit_);
  566.   }
  567.   
  568.   if (listItem->pushbackON_) 
  569.   refreshUpstreamLimits(listItem);
  570.   }
  571.   }
  572.       else {
  573.   //change the rate limit most half way.
  574.   double newLimit;
  575.   if (oldLimit > 1.25 * requiredLimit_ || oldLimit ==0) 
  576.   newLimit = requiredLimit_;
  577.   else 
  578.   newLimit = 0.5*requiredLimit_ + 0.5*oldLimit;
  579.   
  580.   if (newLimit < lowerBound) 
  581.   newLimit = lowerBound;
  582.   
  583.   listItem->refreshed();
  584.   listItem->setLimit(newLimit);
  585.   if (listItem->pushbackON_) 
  586.   refreshUpstreamLimits(listItem);
  587. #ifdef DEBUG
  588.   printf("time: %5.3f ID: %d newLimit %5.3f oldLimit %5.3f requiredLimit %5.3fn", 
  589.  now, listItem->localID_, newLimit, oldLimit, requiredLimit_);
  590. #endif
  591.       }
  592.   }    
  593.   
  594.   //setup refresh timer again
  595.   noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
  596.   if (noSessions) {
  597.     PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
  598.     timer_->insert(event);
  599.   }
  600. }
  601. void
  602. PushbackAgent::pushbackCancel(RateLimitSession * rls) {
  603.  
  604.   sprintf(prnMsg,"Stopping rate-limiting for aggregate: ");
  605.   printMsg(prnMsg,0);
  606.   rls->aggSpec_->print();
  607.   fflush(stdout);
  608.   #ifdef DEBUG
  609.     double now = Scheduler::instance().clock();
  610.     printf("time: %5.3f ID: %d Cancel pushback Cn", now, rls->localID_);
  611.   #endif
  612.   if (rls->pushbackON_) {
  613.     LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  614.     while (lgdsNode != NULL) {
  615.       PushbackMessage * msg = new PushbackCancelMessage(node_->nodeid(), lgdsNode->nid_, 
  616. rls->localQID_, rls->localID_);
  617.       sendMsg(msg);
  618.       lgdsNode = lgdsNode->next_;
  619.     }
  620.   }
  621.   
  622.   //remove all events that point to this rls.
  623.   timer_->removeEvents(rls);
  624.   //local cancellation here.
  625.   queue_list_[rls->localQID_].pbq_->rlsList_->endSession(rls);
  626.   
  627. }
  628. //######################## Message Receiving Code #####################
  629. void 
  630. PushbackAgent::recv(Packet * pkt, Handler * h) {
  631.   
  632.   hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
  633.   PushbackMessage * msg = hdr_push->msg_;
  634.   
  635.   sprintf(prnMsg, " %s msg from %dn",  PushbackMessage::type(msg), msg->senderID_);
  636.   printMsg(prnMsg,0);
  637.  
  638.   switch (msg->msgID_) {
  639.   case PUSHBACK_REQUEST_MSG : processPushbackRequest((PushbackRequestMessage *)msg);
  640.     break;
  641.   case PUSHBACK_STATUS_MSG : processPushbackStatus((PushbackStatusMessage *) msg);
  642.     break;
  643.   case PUSHBACK_REFRESH_MSG : processPushbackRefresh((PushbackRefreshMessage *) msg);
  644.     break;
  645.   case PUSHBACK_CANCEL_MSG : processPushbackCancel((PushbackCancelMessage *) msg);
  646.     break;
  647.   default: fprintf(stderr,"PBA: %s Undefined Message ID %dn", name(),msg->msgID_);
  648.   } 
  649.   
  650.   delete(msg);
  651. }
  652. void 
  653. PushbackAgent::processPushbackRequest(PushbackRequestMessage * msg) {
  654.   
  655.   int qid = getQID(msg->senderID_);
  656.   sprintf(prnMsg, " pushback request from %d for qid=%d limit=%gn", msg->senderID_, 
  657.  qid, msg->limit_);
  658.   printMsg(prnMsg,0);
  659.  
  660.   AggSpec * aggSpec = msg->aggSpec_;
  661.   if (queue_list_[qid].pbq_->rlsList_->containsAggSpec(aggSpec)) {
  662.   fprintf(stdout,"PBA: %s got a pushback req for agg I already rate-limit. 
  663. Feature not yet Implementedn",name()); 
  664.   exit(-1);
  665.   }
  666.   
  667.   RateLimitSession * rls = new RateLimitSession(aggSpec, msg->limit_, msg->senderID_, qid, 
  668. msg->qid_, msg->rlsID_, msg->depth_+1, 
  669. RATE_LIMIT_TIME_DEFAULT, -1, node_, rtLogic_);
  670.   queue_list_[qid].pbq_->rlsList_->insert(rls);
  671.   //pushback propagation check if there are valid upstream neighbors && enable_pushback_
  672.   if (rls->logData_->count_ && enable_pushback_) {
  673.     PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
  674.     timer_->insert(event);
  675.   }
  676. }
  677. void 
  678. PushbackAgent::processPushbackStatus(PushbackStatusMessage * msg) {
  679.   int qid = msg->qid_;
  680.   if (!checkQID(qid)) {
  681.     sprintf(prnMsg, " Got invalid qid from %d in status messagen",  msg->senderID_);
  682.     printMsg(prnMsg,0);
  683.     exit(-1);
  684.   }
  685.   RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByLocalID(msg->rlsID_);
  686.   
  687.   if (rls == NULL) {
  688.     sprintf(prnMsg, " session %d not foundn",  msg->rlsID_);
  689.     printMsg(prnMsg,0);
  690.     exit(-1);
  691.   }
  692.   
  693.   //increase your height if you need to.
  694.   if (msg->height_ + 1 > rls->heightInPTree_) {
  695.     rls->heightInPTree_ = msg->height_ + 1;
  696.     sprintf(prnMsg, " height increased to %dn",  rls->heightInPTree_);
  697.     printMsg(prnMsg,0);
  698.   }
  699.   rls->logData_->registerStatus(msg->senderID_, msg->arrivalRate_);
  700.   sprintf(prnMsg, " got rate %gn",  msg->arrivalRate_);
  701.   printMsg(prnMsg,0);
  702.   //send status if you are not root.
  703.   if (rls->origin_!= node_->nodeid()) {
  704.     // 1. check to see if status from all the upstream neighbors has arrived.
  705.     // 2. if yes, then send status downstream.
  706.     int gotAll = rls->logData_->consolidateStatus();
  707.     if (gotAll==1) {
  708.       //send status down
  709.       double rate = rls->logData_->statusArrivalRateAll_;
  710.       PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_, 
  711. rls->remoteQID_, rls->remoteID_, 
  712.       rate, rls->heightInPTree_);
  713.       sendMsg(msg);
  714.       timer_->cancelStatus(rls);
  715.       //reset status arrivals.
  716.       rls->logData_->resetStatus();
  717.     }
  718.   }
  719. }
  720. void
  721. PushbackAgent::processPushbackRefresh(PushbackRefreshMessage *msg) {
  722.   int qid = getQID(msg->senderID_);
  723.   sprintf(prnMsg, " pushback refresh from %d for qid=%d with limit=%gn",  msg->senderID_, qid, msg->limit_);
  724.   printMsg(prnMsg,0);
  725.   RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
  726.   
  727.   if (rls == NULL) {
  728.     sprintf(prnMsg, " session %d not foundn",  msg->rlsID_);
  729.     printMsg(prnMsg,0);
  730.     exit(-1);
  731.   }
  732.   //1. change your own rate limit
  733.   rls->setAggSpec(msg->aggSpec_);
  734.   delete(msg->aggSpec_);
  735.   double newLimit = msg->limit_;
  736.   rls->setLimit(newLimit);
  737.   //2. if pushback has been propagated send out refreshes upstream with new limits
  738.   if (rls->pushbackON_) {
  739.     refreshUpstreamLimits(rls);
  740.   }
  741.   
  742.   //3. set up status timer.
  743.   PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME - 0.1*rls->depthInPTree_, 
  744.      PUSHBACK_STATUS_EVENT, rls);
  745.   timer_->insert(event);
  746. }
  747. void 
  748. PushbackAgent::processPushbackCancel(PushbackCancelMessage *msg) {
  749.  
  750.   int qid = getQID(msg->senderID_);
  751.   sprintf(prnMsg, " pushback cancel from %d for queue index %dn",  msg->senderID_, qid);
  752.   printMsg(prnMsg,0);
  753.   RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
  754.   
  755.   if (rls == NULL) {
  756.     sprintf(prnMsg, " session %d not foundn",  msg->rlsID_);
  757.     printMsg(prnMsg,0);
  758.     exit(-1);
  759.   }
  760.   pushbackCancel(rls);
  761. }
  762. void 
  763. PushbackAgent::refreshUpstreamLimits(RateLimitSession * rls) {
  764.   double totalRate =  rls->rlStrategy_->target_rate_;
  765.   int count = rls->logData_->count_;
  766.   double fairShare = totalRate/count;
  767.   int done = count;
  768.   double arrRate = rls->getArrivalRateForStatus();
  769.   sprintf(prnMsg, "Sending refresh messages to %d nodes. Limit = %g arrRate = %gn", count, totalRate, arrRate);
  770.   printMsg(prnMsg,0); 
  771.   
  772.   int excess = 0;
  773.   if (totalRate > arrRate) {
  774.   excess = 1;
  775.   }
  776.   //max-min allocation of limit.
  777.   while (done != 0) {
  778.     LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  779.     int countThisRound=0;
  780.     while (lgdsNode != NULL) {
  781.       double rate;
  782.       rate = lgdsNode->statusArrivalRate_;
  783.       if (rate <= fairShare && !lgdsNode->sentRefresh_) {
  784. AggSpec * aggSpec = rls->aggSpec_->clone();
  785. PushbackMessage * msg;
  786. if (rate < fairShare/2.0) {
  787.   msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_, 
  788.    rls->localID_, aggSpec, INFINITE_LIMIT);
  789.   lgdsNode->sentRefresh(INFINITE_LIMIT);
  790. }
  791. else if (!excess) {
  792.   msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_, 
  793.    rls->localID_, aggSpec, rate);
  794.   lgdsNode->sentRefresh(rate);
  795. }
  796. else {
  797.   msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_, 
  798.    rls->localID_, aggSpec, fairShare);
  799.   lgdsNode->sentRefresh(fairShare);
  800.   rate = fairShare;
  801. }
  802. sendMsg(msg);
  803. countThisRound++;
  804. done--;
  805. totalRate -= rate;
  806.       }
  807.       lgdsNode = lgdsNode->next_;
  808.     }
  809.     if (done == 0) break;
  810.     if (countThisRound==0) {
  811.       //allocate fairshare to everyone and end.
  812.       LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  813.       while (lgdsNode != NULL) {
  814. if (!lgdsNode->sentRefresh_) {
  815.   AggSpec * aggSpec = rls->aggSpec_->clone();
  816.   PushbackMessage * msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, 
  817.      rls->localQID_, rls->localID_, 
  818.      aggSpec, fairShare);
  819.     lgdsNode->sentRefresh(fairShare);
  820.     sendMsg(msg);
  821.     done--;
  822.     totalRate-=fairShare;
  823. }
  824. lgdsNode = lgdsNode->next_;
  825.       }
  826.     }
  827.     else {
  828.       fairShare = totalRate/done;
  829.     }
  830.   }
  831.   
  832.   //reset all the sentRefresh bits
  833.   LoggingDataStructNode * lgdsNode = rls->logData_->first_;
  834.   while (lgdsNode != NULL) {
  835.     lgdsNode->sentRefresh_ = 0;
  836.     lgdsNode = lgdsNode->next_;
  837.   }
  838. }
  839. int
  840. PushbackAgent::getQID(int sender) {
  841.   
  842.   Tcl& tcl = Tcl::instance();
  843.   intResult_ = -1;
  844.   int index = 0;
  845.   // there gotta be better ways of doing this;  todoLater.
  846.   // like make Tcl call you back and set a variable using command.
  847.   for (; index <last_index_; index++) {
  848.     tcl.evalf("%s set intResult_ [%s check-queue %d %d %s]", name(), name(), 
  849.       node_->nodeid(), sender , queue_list_[index].pbq_->name());
  850.     if (intResult_ == 1) break;
  851.   }
  852.   
  853.   if (index == last_index_) {
  854.     sprintf(prnMsg, " right queue not foundn");
  855.     printMsg(prnMsg,0);
  856.     exit(-1);
  857.   }
  858.   return index;
  859. }
  860. void
  861. PushbackAgent::sendMsg(PushbackMessage * msg) {
  862.   
  863.   Tcl& tcl = Tcl::instance();
  864.   
  865.   dst_.addr_ = msg->targetID_;
  866.   //this assumes that all pushback agents have port zero.
  867.   
  868.   tcl.evalf("%s set intResult_ [%s get-pba-port %d]", name(), name(),dst_.addr_ );
  869.   if ( intResult_ == -1 ) {
  870.     fprintf(stderr,"PBA: %s Pushback Agent not found on Node %dn", name(), dst_.addr_);
  871.     return;
  872.   }
  873.   dst_.port_ = intResult_;
  874.   Packet *pkt = allocpkt();
  875.   hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
  876.   hdr_push->msg_ = msg;
  877.     
  878.   sprintf(prnMsg, " sent %s message to %d.%dn", PushbackMessage::type(msg), dst_.addr_, dst_.port_);
  879.   printMsg(prnMsg,4);
  880.   send(pkt,0);
  881. }
  882. void 
  883. PushbackAgent::printMsg(char * msg, int msgLevel) {
  884.   
  885.   if (msgLevel < debugLevel) {
  886.     if (verbose_) printf("PBA:%d (%g) %s", node_->nodeid(), Scheduler::instance().clock(), msg);
  887.     fflush(stdout);
  888.   }
  889. }
  890. int 
  891. PushbackAgent::checkQID(int qid) {
  892.   if (qid < 0 || qid >= last_index_) 
  893.     return 0;
  894.   else 
  895.     return 1;
  896. }
  897. //decide whether to accept a merger involving "count" aggregates, 
  898. //the number of bits in the resultant aggregate would be "bits"
  899. //the aggregate is being broadended by "bitsDiff" (measured from shortest prefix)
  900. int
  901. PushbackAgent::mergerAccept(int count, int bits, int bitsDiff) {
  902.   
  903.   //todo: think of a smarter way.
  904.   //currently merge if bits < some value.
  905.   //return (bits <= MIN_BITS_FOR_MERGER);
  906.   return 0;
  907. }
  908.   
  909. // ############################### PushbackTimer Methods ############################
  910. void 
  911. PushbackTimer::expire(Event *e) {
  912.  
  913.   if (firstEvent_ == NULL) {
  914.     printf("PushbackTimer: No event found on expiryn");
  915.     exit(-1);
  916.   }
  917.   
  918.   PushbackEvent * event = firstEvent_;
  919.   firstEvent_= firstEvent_->next_;
  920.   schedule();
  921.   agent_->timeout(event);
  922.   delete(event);
  923. }
  924.  
  925. void
  926. PushbackTimer::insert(PushbackEvent * event) {
  927.   sprintf(agent_->prnMsg,"%s timer setn", PushbackEvent::type(event));
  928.   agent_->printMsg(agent_->prnMsg,4);
  929.   if (firstEvent_ == NULL) {
  930.     firstEvent_ = event;
  931.     schedule();
  932.     return;
  933.   }
  934.   if (event->time_ < firstEvent_->time_) {
  935.     event->setSucc(firstEvent_);
  936.     firstEvent_=event;
  937.     schedule();
  938.     return;
  939.   }
  940.   PushbackEvent * listItem = firstEvent_;
  941.   while (listItem->next_!=NULL && listItem->next_->time_ <= event->time_) {
  942.     listItem = listItem->next_;
  943.   }
  944.   
  945.   event->setSucc(listItem->next_);
  946.   listItem->setSucc(event);
  947.  
  948.   //comment the sanity check out later
  949.   sanityCheck();
  950.   
  951.   return;
  952. }
  953. void
  954. PushbackTimer::removeEvents(RateLimitSession * rls) {
  955.   if (firstEvent_==NULL) return;
  956.   while (firstEvent_!= NULL && firstEvent_->rls_==rls) {
  957.     cancel();
  958.     PushbackEvent * event = firstEvent_;
  959.     firstEvent_=firstEvent_->next_;
  960.     delete(event);
  961.     schedule();
  962.   }
  963.   if (firstEvent_==NULL) return;
  964.   PushbackEvent * previous = firstEvent_;
  965.   PushbackEvent * current = firstEvent_->next_;
  966.   while (current!=NULL) {
  967.     if (current->rls_==rls) {
  968.       previous->next_=current->next_;
  969.       delete(current);
  970.       current = previous->next_;
  971.       continue;
  972.     }
  973.     previous=current;
  974.     current=current->next_;
  975.   }
  976. }
  977.       
  978. void 
  979. PushbackTimer::schedule() {
  980.   if (firstEvent_== NULL) {
  981.     sprintf(agent_->prnMsg,"Timer: Nothing to schedulen");
  982.     agent_->printMsg(agent_->prnMsg, 0);
  983.     return;
  984.   }
  985.   resched(firstEvent_->time_ - Scheduler::instance().clock());
  986. }
  987. void 
  988. PushbackTimer::cancelStatus(RateLimitSession * rls) {
  989.   
  990.   if (firstEvent_==NULL) {
  991.     sprintf(agent_->prnMsg, " Error timer list emptyn");
  992.     agent_->printMsg(agent_->prnMsg, 0);
  993.     //return; 
  994.     exit(-1);
  995.   }
  996.   
  997.   if (firstEvent_->eventID_==PUSHBACK_STATUS_EVENT && firstEvent_->rls_==rls) {
  998.     cancel();
  999.     PushbackEvent * event = firstEvent_;
  1000.     firstEvent_=firstEvent_->next_;
  1001.     delete(event);
  1002.     schedule();
  1003.     return;
  1004.   }
  1005.   
  1006.   PushbackEvent * previous = firstEvent_;
  1007.   PushbackEvent * current = firstEvent_->next_;
  1008.   
  1009.   while (current!=NULL) {
  1010.     if (current->eventID_ == PUSHBACK_STATUS_EVENT && current->rls_==rls) {
  1011.       previous->next_=current->next_;
  1012.       delete(current);
  1013.       return;
  1014.     } 
  1015.     previous=current;
  1016.     current=current->next_;
  1017.   }
  1018.   sprintf(agent_->prnMsg, "Error status timer not foundn");
  1019.   agent_->printMsg(agent_->prnMsg, 0);
  1020.   exit(-1);
  1021. }
  1022. int 
  1023. PushbackTimer::containsRefresh(int qid) {
  1024.   PushbackEvent * listItem = firstEvent_;
  1025.   while (listItem!=NULL) {
  1026.     if (listItem->eventID_ == PUSHBACK_REFRESH_EVENT && listItem->qid_==qid) 
  1027.       return 1;
  1028.     listItem = listItem->next_;
  1029.   }
  1030.   return 0;
  1031. }
  1032. void 
  1033. PushbackTimer::sanityCheck() {
  1034.   if (firstEvent_==NULL || firstEvent_->next_ == NULL) return;
  1035.   
  1036.   PushbackEvent * listItem = firstEvent_;
  1037.   while (listItem->next_!=NULL) {
  1038.     if (listItem->time_ > listItem->next_->time_) {
  1039.       sprintf(agent_->prnMsg, "Sanity Check Failedn");
  1040.       agent_->printMsg(agent_->prnMsg, 0);
  1041.       exit(-1);
  1042.     }
  1043.     listItem = listItem->next_;
  1044.   }
  1045. }
  1046.