pushback.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:34k
- /* -*- Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
- /*
- * Copyright (c) 2000 International Computer Science Institute
- * All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions
- * are met:
- * 1. Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * 2. Redistributions in binary form must reproduce the above copyright
- * notice, this list of conditions and the following disclaimer in the
- * documentation and/or other materials provided with the distribution.
- * 3. All advertising materials mentioning features or use of this software
- * must display the following acknowledgement:
- * This product includes software developed by ACIRI, the AT&T
- * Center for Internet Research at ICSI (the International Computer
- * Science Institute).
- * 4. Neither the name of ACIRI nor of ICSI may be used
- * to endorse or promote products derived from this software without
- * specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
- * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
- * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
- * ARE DISCLAIMED. IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
- * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
- * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
- * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
- * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
- * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
- * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
- * SUCH DAMAGE.
- *
- */
- #include "pushback.h"
- #include "ident-tree.h"
- #include "pushback-queue.h"
- #include "rate-limit.h"
- #include "pushback-message.h"
- //#define DEBUG
- int hdr_pushback::offset_;
- static class PushbackHeaderClass : public PacketHeaderClass {
- public:
- PushbackHeaderClass() : PacketHeaderClass("PacketHeader/Pushback",
- sizeof(hdr_pushback)) {
- bind_offset(&hdr_pushback::offset_);
- }
- } class_Pushback_hdr;
- static class PushbackClass : public TclClass {
- public:
- PushbackClass() : TclClass("Agent/Pushback") {}
- TclObject* create(int, const char*const*) {
- return (new PushbackAgent());
- }
- } class_Pushback;
- PushbackAgent::PushbackAgent() : Agent(PT_PUSHBACK), last_index_(0), intResult_(-1) {
- bind("last_index_", &last_index_);
- bind("intResult_", &intResult_);
- bind_bool("enable_pushback_", &enable_pushback_);
- bind_bool("verbose_", &verbose_);
- timer_ = new PushbackTimer(this);
- debugLevel = 3;
- // debugLevel = 0;
- }
- int
- PushbackAgent::command(int argc, const char*const* argv) {
-
- Tcl& tcl = Tcl::instance();
- if (argc == 4 ) {
- if (strcmp(argv[1], "initialize") == 0) {
- //get the node and routeLogic object
- node_ = (Node *)TclObject::lookup(argv[2]);
- rtLogic_ = (RouteLogic *)TclObject::lookup(argv[3]);
- if (node_ == NULL || rtLogic_ == NULL) {
- if (verbose_) printf("Improper Initialization for Pushback Agentn");
- return(TCL_ERROR);
- }
-
- sprintf(prnMsg, "node=%s rtLogic=%s id=%d address=%dn", node_->name(),
- rtLogic_->name(), node_->nodeid(), node_->address());
- printMsg(prnMsg,0);
- return(TCL_OK);
- }
- }
- else if (argc == 3) {
- //$pba add-queue $queue
- if (strcmp(argv[1], "add-queue") == 0) {
- if (last_index_==MAX_QUEUES) {
- printf("queue list size exhausted - recompile with a bigger MAX_QUEUESn");
- exit(-1);
- }
- PushbackQueue * queue = (PushbackQueue *) TclObject::lookup(argv[2]);
- if (queue == NULL) {
- printf("NULL queue passed n");
- exit(-1);
- }
- int index = last_index_++;
- queue_list_[index].pbq_ = queue;
- queue_list_[index].idTree_ = new IdentStruct();
-
- tcl.resultf("%d", index);
- return (TCL_OK);
- }
- }
- return (Agent::command(argc, argv));
- }
- void
- PushbackAgent::reportDrop(int qid, Packet * p) {
- if (!checkQID(qid)) {
- sprintf(prnMsg,"Got invalid qid %dn", qid);
- printMsg(prnMsg,0);
- exit(-1);
- }
-
- hdr_ip * iph = hdr_ip::access(p);
- ns_addr_t src = iph->src();
- ns_addr_t dst = iph->dst();
- int fid = iph->flowid();
- sprintf(prnMsg,"DropDetails from queue %d: %d.%d -> %d.%d (%d)n", qid,
- src.addr_, src.port_, dst.addr_, dst.port_, fid);
- printMsg(prnMsg, 5);
-
- queue_list_[qid].idTree_->registerDrop(p);
- }
- void
- PushbackAgent::calculateLowerBound(int qid, double arrRate) {
-
- if (!checkQID(qid)) {
- sprintf(prnMsg, "Got invalid id from queue in identifyAggregaten");
- printMsg(prnMsg,0);
- exit(-1);
- }
- AggReturn * aggReturn = queue_list_[qid].idTree_->calculateLowerBound();
- if (aggReturn == NULL) {
- //not sure what to do here.
- //maybe lower bound should be left as it is
-
- return;
- }
- double lowerBound = 0;
- int i = 0;
- for (; i <= aggReturn->finalIndex_; i++) {
- cluster currCluster = aggReturn->clusterList_[i];
- AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
- RateLimitSession * rls1 =
- queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
- if (rls1 !=NULL) continue;
- lowerBound = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
- sprintf(prnMsg, "LB: count: %d totalCount_: %d arrRate: %gn", currCluster.count_, aggReturn->totalCount_, arrRate);
- printMsg(prnMsg,0);
- break;
- }
-
- if (i == aggReturn->finalIndex_+1) {
- sprintf(prnMsg, "Warning: All clusters being rate limitedn");
- printMsg(prnMsg,0);
- //exit(-1);
- }
- queue_list_[qid].idTree_->setLowerBound(lowerBound, 1);
-
- delete(aggReturn);
- }
- void
- PushbackAgent::identifyAggregate(int qid, double arrRate, double linkBW) {
- //set up refresh timer for this queue, if this is the firstime you come here.
- if (!timer_->containsRefresh(qid)) {
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
- timer_->insert(event);
- }
- // if (debug_)
- sprintf(prnMsg, "identifyAggregate for %dn", qid);
- printMsg(prnMsg,0);
- if (!checkQID(qid)) {
- sprintf(prnMsg, "Got invalid id from queue in identifyAggregaten");
- printMsg(prnMsg,0);
- exit(-1);
- }
- if (verbose_) queue_list_[qid].idTree_->traverse();
- //this is a quick way of achieving this.
- //but it can be justified on some grounds. will do a check with Sally later.
- int noSessions = queue_list_[qid].pbq_->rlsList_->noMySessions(node_->nodeid());
- // if (noSessions >= MAX_SESSIONS) {
- // sprintf(prnMsg, "My hands are fulln");
- // printMsg(prnMsg,0);
- // return;
- // }
-
- AggReturn * aggReturn = queue_list_[qid].idTree_->identifyAggregate(arrRate, linkBW);
- if (aggReturn == NULL) return;
- for (int i=0; i<=aggReturn->finalIndex_; i++) {
- cluster currCluster = aggReturn->clusterList_[i];
- AggSpec * aggSpec = new AggSpec(1, currCluster.prefix_, currCluster.bits_);
-
- //don't insert the same aggregate again.
- RateLimitSession * rls1 =
- queue_list_[qid].pbq_->rlsList_->containsLocalAggSpec(aggSpec, node_->nodeid());
- if (rls1 != NULL) {
- sprintf(prnMsg, "got subset aggregate. Lowerbound = %g. agg = ", aggReturn->limit_);
- printMsg(prnMsg,0);
- aggSpec->print(); fflush(stdout);
- delete(aggSpec);
- //this could keep the lowerbound unnecessarily down.
- //but don't be sympathetic with aggregates, which have been identified again.
- if (aggReturn->limit_ < rls1->lowerBound_) {
- rls1->lowerBound_ = aggReturn->limit_;
- }
- //set the last misbehavior signal.
- rls1->refreshed();
- continue;
- }
-
- double estimate = (currCluster.count_)*(arrRate/aggReturn->totalCount_);
-
- if (noSessions >= MAX_SESSIONS) {
- int rank = queue_list_[qid].pbq_->rlsList_->rankRate(node_->nodeid(), estimate);
- if (rank >= MAX_SESSIONS) {
- sprintf(prnMsg, "got rate <= minRate. agg = ");
- printMsg(prnMsg,0);aggSpec->print(); fflush(stdout);
- delete(aggSpec);
- continue;
- }
- }
-
- sprintf(prnMsg, "starting rate-limiting lower=%g estimate=%g agg ",
- aggReturn->limit_, estimate);
- printMsg(prnMsg,0);
- aggSpec->print(); fflush(stdout);
-
- double initialLimit = estimate; //*(1 - ambientDropRate);
- RateLimitSession * rls = new RateLimitSession(aggSpec, estimate, 1, initialLimit,
- node_->nodeid(), qid,
- RATE_LIMIT_TIME_DEFAULT, aggReturn->limit_,
- node_, rtLogic_);
- queue_list_[qid].pbq_->rlsList_->insert(rls);
-
- PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
- timer_->insert(event);
- // }
- noSessions++;
- }
- queue_list_[qid].idTree_->setLowerBound(aggReturn->limit_, 0);
- delete(aggReturn);
- }
- void
- PushbackAgent::resetDropLog(int qid) {
- sprintf(prnMsg, " drop log reset for qid %dn", qid);
- printMsg(prnMsg,5);
-
- if (!checkQID(qid)) {
- printf("Got invalid id from queue in resetDropLogn");
- exit(-1);
- }
- queue_list_[qid].idTree_->reset();
- }
- void
- PushbackAgent::timeout(PushbackEvent * event) {
-
- sprintf(prnMsg, " %s event for qid %dn", PushbackEvent::type(event), event->qid_);
- printMsg(prnMsg,0);
- switch (event->eventID_) {
- case PUSHBACK_CHECK_EVENT: pushbackCheck(event->rls_);
- break;
- case PUSHBACK_REFRESH_EVENT: pushbackRefresh(event->qid_);
- break;
- case PUSHBACK_STATUS_EVENT: pushbackStatus(event->rls_);
- break;
- case INITIAL_UPDATE_EVENT: initialUpdate(event->rls_);
- break;
- default: sprintf(prnMsg, " Unrecognized event %dn", event->eventID_);
- printMsg(prnMsg,0);
- break;
- }
- }
- void
- PushbackAgent::initialUpdate(RateLimitSession * rls) {
-
- if ( !rls->initialPhase_ ) {
- sprintf(prnMsg, " Error: Update when not in initialphasen");
- printMsg(prnMsg,0);
- exit(-1);
- }
- double qdrop = queue_list_[rls->localQID_].pbq_->getDropRate();
- double dropRate = rls->getDropRate();
- double arrRate = rls->getArrivalRateForStatus();
- double newLimit = arrRate*(1 - 2*(dropRate+qdrop));
-
- sprintf(prnMsg,"Initial-Update: qdrop=%g dr=%g newL=%g oldTarget=%g lowerBound=%g arr=%gn",
- qdrop, dropRate, newLimit, rls->rlStrategy_->target_rate_, rls->lowerBound_, arrRate);
- printMsg(prnMsg,0);
- //cancel right now, if arrRate is significantly less than lower bound.
- if (arrRate < 0.75*rls->lowerBound_) {
- #ifdef DEBUG
- double now = Scheduler::instance().clock();
- printf("Cancel pushback A time: %5.3fn", now);
- #endif
- pushbackCancel(rls);
- return;
- }
- if (newLimit > rls->lowerBound_) {
- rls->setLimit(newLimit);
-
- PushbackEvent * event = new PushbackEvent(INITIAL_UPDATE_TIME, INITIAL_UPDATE_EVENT, rls);
- timer_->insert(event);
- }
- else {
- rls->setLimit(rls->lowerBound_);
- rls->initialPhase_ = 0;
-
- if (rls->logData_->count_!=0 && enable_pushback_) {
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
- timer_->insert(event);
- }
- }
- }
- void
- PushbackAgent::pushbackCheck(RateLimitSession * rls) {
-
- double dropRate = rls->getDropRate();
- if (dropRate >= DROP_RATE_FOR_PUSHBACK) {
- rls->pushbackOn();
- rls->heightInPTree_++;
-
- double totalRate = rls->rlStrategy_->target_rate_;
- int count = rls->logData_->count_;
- double fairShare = totalRate/count;
- int done = count;
-
- //max-min allocation of limit.
- while (done != 0) {
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- int countThisRound=0;
- while (lgdsNode != NULL) {
- double rate = lgdsNode->rateEstimator_->estRate_;
- if (rate <= fairShare && !lgdsNode->pushbackSent_) {
- AggSpec * aggSpec = rls->aggSpec_->clone();
- PushbackMessage * msg;
- if (rate < fairShare/2.0) {
- msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
- rls->localID_, aggSpec, INFINITE_LIMIT,
- rls->depthInPTree_);
- lgdsNode->pushbackSent(INFINITE_LIMIT, rate);
- }
- else {
- msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
- rls->localID_, aggSpec, rate, rls->depthInPTree_);
- lgdsNode->pushbackSent(rate, rate);
- }
- sendMsg(msg);
- countThisRound++;
- done--;
- totalRate -= rate;
- }
- lgdsNode = lgdsNode->next_;
- }
- if (done == 0) break;
- if (countThisRound==0) {
- //allocate fairshare to everyone and end.
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- while (lgdsNode != NULL) {
- if (!lgdsNode->pushbackSent_) {
- AggSpec * aggSpec = rls->aggSpec_->clone();
- PushbackMessage * msg = new PushbackRequestMessage(node_->nodeid(), lgdsNode->nid_,
- rls->localQID_, rls->localID_,
- aggSpec, fairShare,
- rls->depthInPTree_);
- lgdsNode->pushbackSent(fairShare,lgdsNode->rateEstimator_->estRate_);
- sendMsg(msg);
- done--;
- totalRate-=fairShare;
- }
- lgdsNode = lgdsNode->next_;
- }
- }
- else {
- fairShare= totalRate/done;
- }
- }
-
- }
- else {
- //set up pushback check for later.
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
- timer_->insert(event);
- }
- }
- void
- PushbackAgent::pushbackStatus(RateLimitSession * rls) {
- if (rls->pushbackON_) {
- sprintf(prnMsg, " Warning: status timer expired for non-leaf noden");
- printMsg(prnMsg,0);
- //exit(-1);
- }
- double rate = rls->getArrivalRateForStatus();
- rls->logData_->resetStatus();
-
- PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
- rls->remoteQID_, rls->remoteID_,
- rate, rls->heightInPTree_);
- sendMsg(msg);
- }
- void
- PushbackAgent::pushbackRefresh(int qid) {
-
- PushbackQueue * pbq = queue_list_[qid].pbq_;
- int oldSessions = pbq->rlsList_->noMySessions(node_->nodeid());
- if (!oldSessions) {
- //set up refresh timers for a later time and return.
- // PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
- // timer_->insert(event);
- return;
- }
-
- int noSessions = oldSessions;
- if (MERGER_MODE == 1) {
- pbq->rlsList_->mergeSessions(node_->nodeid());
- noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
-
- if (noSessions!=oldSessions) {
- sprintf(prnMsg, " Some sessions merged. old = %d new = %dn", oldSessions, noSessions);
- printMsg(prnMsg,0);
- //get rid of merged RLS's
- RateLimitSession * listItem = pbq->rlsList_->first_;
- while (listItem != NULL) {
- if (listItem->origin_ == node_->nodeid() && listItem->merged_) {
- pushbackCancel(listItem);
- listItem = listItem->next_;
-
- }
- }
- } else {
- sprintf(prnMsg, " No sessions merged. number = %dn", noSessions);
- printMsg(prnMsg,0);
- }
- } else {
- sprintf(prnMsg, "Number of sessions = %dn", noSessions);
- printMsg(prnMsg,0);
- }
- double now = Scheduler::instance().clock();
-
- //check if some sessions need to be discarded because of rate-limiting too many sessions
- RateLimitSession * listItem1 = pbq->rlsList_->first_;
- while (noSessions > MAX_SESSIONS && listItem1 != NULL) {
- int rank = pbq->rlsList_->rankRate(node_->nodeid(), listItem1->getArrivalRateForStatus());
- if (listItem1->origin_ == node_->nodeid() &&
- rank >= MAX_SESSIONS && (now - listItem1->startTime_) >= EARLIEST_TIME_TO_FREE) {
- sprintf(prnMsg,"Releasing because of too many being rate-limitedn");
- printMsg(prnMsg,0);
- if (LOWER_BOUND_MODE == 1 &&
- queue_list_[qid].idTree_->lowerBound_ < listItem1->getArrivalRateForStatus()) {
- queue_list_[qid].idTree_->lowerBound_ = listItem1->getArrivalRateForStatus();
- }
- pushbackCancel(listItem1);
- noSessions--;
- }
- listItem1 = listItem1->next_;
- }
-
- double linkBW = pbq->getBW();
- double arrRate = pbq->getRate();
- double targetRate = linkBW/(1 - TARGET_DROPRATE);
-
- double totalRateLimitedArrivalRate = 0;
- double totalLimit=0;
- double lowerBound=-1;
- RateLimitSession * listItem = pbq->rlsList_->first_;
- while (listItem != NULL) {
- if (listItem->origin_ == node_->nodeid() && !listItem->merged_) {
- double sessionArrRate = listItem->getArrivalRateForStatus();
- double sessionLimit = listItem->rlStrategy_->target_rate_;
- totalRateLimitedArrivalRate+= sessionArrRate;
- totalLimit+= (sessionArrRate > sessionLimit)? sessionLimit: sessionArrRate;
- if (listItem->lowerBound_ < lowerBound || lowerBound == -1) {
- lowerBound = listItem->lowerBound_;
- }
- }
- listItem = listItem->next_;
- }
- if (LOWER_BOUND_MODE == 1) {
- lowerBound = queue_list_[qid].idTree_->lowerBound_;
- }
- double excessRate = (arrRate - totalLimit + totalRateLimitedArrivalRate) - targetRate;
-
- sprintf(prnMsg,"arr=%g totalLimit=%g totalRateLimit=%g excess=%gn", arrRate, totalLimit,
- totalRateLimitedArrivalRate, excessRate);
- printMsg(prnMsg,0);
-
- if (excessRate < 0) {
- sprintf(prnMsg, "Negative Excess Rate. Things maybe fine now.n");
- printMsg(prnMsg,0);
- //this would make all sessions go away after a while.
- #ifdef DEBUG
- printf("Negative Excess Rate - time: %5.3fn", now);
- #endif
- requiredLimit_ = 2*totalRateLimitedArrivalRate;
- } else {
- //Should we allow such an abrupt increase when the number of sessions
- // changes?
- // How about: Let L be the requiredLimit.
- // We need Sum (session arrival rate - L ) = excessRate
- requiredLimit_ = (totalRateLimitedArrivalRate - excessRate)/noSessions;
- if (requiredLimit_ < lowerBound) {
- requiredLimit_ = lowerBound;
- }
- #ifdef DEBUG
- printf("New requiredLimit - time: %5.3f limit: %5.3f lowerBound:%5.3f n", now, requiredLimit_, lowerBound);
- #endif
- }
- sprintf(prnMsg,"Refresh. target=%g limit=%g floor=%gn", targetRate, requiredLimit_,
- lowerBound);
- printMsg(prnMsg,0);
- //consider all sessions in ascending order of their arrival rate
- for (int i=0; i<noSessions; i++) {
- listItem = pbq->rlsList_->first_;
- while (listItem != NULL ) {
- if (listItem->origin_ == node_->nodeid() &&
- pbq->rlsList_->rankSession(node_->nodeid(),listItem) == i)
- break;
- listItem = listItem->next_;
- }
- if (listItem == NULL) {
- printf("Error: Rank %d not foundn", i);
- exit(0);
- }
-
- double oldLimit = listItem->rlStrategy_->target_rate_;
- double sendRate = listItem->getArrivalRateForStatus();
- #ifdef DEBUG
- printf("time: %5.3f ID: %d sendRate %5.3f oldLimit %5.3f requiredLimit %5.3fn", now,
- listItem->localID_, sendRate, oldLimit, requiredLimit_);
- #endif
- //Session sending less than the limit.
- if (sendRate < requiredLimit_) {
- //if it has been sending less for "some" time.
- if (now - listItem->refreshTime_ >= MIN_TIME_TO_FREE) {
- #ifdef DEBUG
- printf("time: %5.3f ID: %d refreshTime %5.3f MIN %d Cancel pushback B n",
- now, listItem->localID_, listItem->refreshTime_, MIN_TIME_TO_FREE);
- #endif
- pushbackCancel(listItem); //cancel rate-limiting
- requiredLimit_+= (requiredLimit_ - sendRate)/(noSessions - i - 1);
- i--; noSessions--;
- }
- else {
- //refresh upstream with double of max(sending rate, old limit)
- //just using sending rate, limits the amount an aggregate can grow till next refresh
- //using just old limit is tricky when different aggregates have different limits.
- //at the same time, we would prefer not to loosen the hold too much in one step.
- #ifdef DEBUG
- printf("time: %5.3f ID: %d double limitn", now, listItem->localID_);
- #endif
- double maxR = sendRate>oldLimit? sendRate: oldLimit;
- if (now - listItem->refreshTime_ <= PRIMARY_WAITING_ZONE) {
- sprintf(prnMsg,"Waiting Zone 1: sendRate=%g oldLimit=%gn", sendRate, oldLimit);
- printMsg(prnMsg,0);
- }
- else {
- sprintf(prnMsg,"Waiting Zone 2: sendRate=%g oldLimit=%gn", sendRate, oldLimit);
- printMsg(prnMsg,0);
- maxR *= 1.5;
- }
- if (maxR < requiredLimit_) {
- listItem->setLimit(maxR);
- requiredLimit_ += (requiredLimit_ - maxR)/(noSessions - i - 1);
- }
- else {
- listItem->setLimit(requiredLimit_);
- }
-
- if (listItem->pushbackON_)
- refreshUpstreamLimits(listItem);
- }
- }
- else {
- //change the rate limit most half way.
- double newLimit;
- if (oldLimit > 1.25 * requiredLimit_ || oldLimit ==0)
- newLimit = requiredLimit_;
- else
- newLimit = 0.5*requiredLimit_ + 0.5*oldLimit;
-
- if (newLimit < lowerBound)
- newLimit = lowerBound;
-
- listItem->refreshed();
- listItem->setLimit(newLimit);
- if (listItem->pushbackON_)
- refreshUpstreamLimits(listItem);
- #ifdef DEBUG
- printf("time: %5.3f ID: %d newLimit %5.3f oldLimit %5.3f requiredLimit %5.3fn",
- now, listItem->localID_, newLimit, oldLimit, requiredLimit_);
- #endif
- }
- }
-
- //setup refresh timer again
- noSessions = pbq->rlsList_->noMySessions(node_->nodeid());
- if (noSessions) {
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME, PUSHBACK_REFRESH_EVENT, qid);
- timer_->insert(event);
- }
- }
- void
- PushbackAgent::pushbackCancel(RateLimitSession * rls) {
-
- sprintf(prnMsg,"Stopping rate-limiting for aggregate: ");
- printMsg(prnMsg,0);
- rls->aggSpec_->print();
- fflush(stdout);
- #ifdef DEBUG
- double now = Scheduler::instance().clock();
- printf("time: %5.3f ID: %d Cancel pushback Cn", now, rls->localID_);
- #endif
- if (rls->pushbackON_) {
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- while (lgdsNode != NULL) {
- PushbackMessage * msg = new PushbackCancelMessage(node_->nodeid(), lgdsNode->nid_,
- rls->localQID_, rls->localID_);
- sendMsg(msg);
- lgdsNode = lgdsNode->next_;
- }
- }
-
- //remove all events that point to this rls.
- timer_->removeEvents(rls);
- //local cancellation here.
- queue_list_[rls->localQID_].pbq_->rlsList_->endSession(rls);
-
- }
- //######################## Message Receiving Code #####################
- void
- PushbackAgent::recv(Packet * pkt, Handler * h) {
-
- hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
- PushbackMessage * msg = hdr_push->msg_;
-
- sprintf(prnMsg, " %s msg from %dn", PushbackMessage::type(msg), msg->senderID_);
- printMsg(prnMsg,0);
-
- switch (msg->msgID_) {
- case PUSHBACK_REQUEST_MSG : processPushbackRequest((PushbackRequestMessage *)msg);
- break;
- case PUSHBACK_STATUS_MSG : processPushbackStatus((PushbackStatusMessage *) msg);
- break;
- case PUSHBACK_REFRESH_MSG : processPushbackRefresh((PushbackRefreshMessage *) msg);
- break;
- case PUSHBACK_CANCEL_MSG : processPushbackCancel((PushbackCancelMessage *) msg);
- break;
- default: fprintf(stderr,"PBA: %s Undefined Message ID %dn", name(),msg->msgID_);
- }
-
- delete(msg);
- }
- void
- PushbackAgent::processPushbackRequest(PushbackRequestMessage * msg) {
-
- int qid = getQID(msg->senderID_);
- sprintf(prnMsg, " pushback request from %d for qid=%d limit=%gn", msg->senderID_,
- qid, msg->limit_);
- printMsg(prnMsg,0);
-
- AggSpec * aggSpec = msg->aggSpec_;
- if (queue_list_[qid].pbq_->rlsList_->containsAggSpec(aggSpec)) {
- fprintf(stdout,"PBA: %s got a pushback req for agg I already rate-limit.
- Feature not yet Implementedn",name());
- exit(-1);
- }
-
- RateLimitSession * rls = new RateLimitSession(aggSpec, msg->limit_, msg->senderID_, qid,
- msg->qid_, msg->rlsID_, msg->depth_+1,
- RATE_LIMIT_TIME_DEFAULT, -1, node_, rtLogic_);
- queue_list_[qid].pbq_->rlsList_->insert(rls);
- //pushback propagation check if there are valid upstream neighbors && enable_pushback_
- if (rls->logData_->count_ && enable_pushback_) {
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CHECK_TIME, PUSHBACK_CHECK_EVENT, rls);
- timer_->insert(event);
- }
- }
- void
- PushbackAgent::processPushbackStatus(PushbackStatusMessage * msg) {
- int qid = msg->qid_;
- if (!checkQID(qid)) {
- sprintf(prnMsg, " Got invalid qid from %d in status messagen", msg->senderID_);
- printMsg(prnMsg,0);
- exit(-1);
- }
- RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByLocalID(msg->rlsID_);
-
- if (rls == NULL) {
- sprintf(prnMsg, " session %d not foundn", msg->rlsID_);
- printMsg(prnMsg,0);
- exit(-1);
- }
-
- //increase your height if you need to.
- if (msg->height_ + 1 > rls->heightInPTree_) {
- rls->heightInPTree_ = msg->height_ + 1;
- sprintf(prnMsg, " height increased to %dn", rls->heightInPTree_);
- printMsg(prnMsg,0);
- }
- rls->logData_->registerStatus(msg->senderID_, msg->arrivalRate_);
- sprintf(prnMsg, " got rate %gn", msg->arrivalRate_);
- printMsg(prnMsg,0);
- //send status if you are not root.
- if (rls->origin_!= node_->nodeid()) {
- // 1. check to see if status from all the upstream neighbors has arrived.
- // 2. if yes, then send status downstream.
- int gotAll = rls->logData_->consolidateStatus();
- if (gotAll==1) {
- //send status down
- double rate = rls->logData_->statusArrivalRateAll_;
- PushbackMessage * msg = new PushbackStatusMessage(node_->nodeid(), rls->origin_,
- rls->remoteQID_, rls->remoteID_,
- rate, rls->heightInPTree_);
- sendMsg(msg);
- timer_->cancelStatus(rls);
- //reset status arrivals.
- rls->logData_->resetStatus();
- }
- }
- }
- void
- PushbackAgent::processPushbackRefresh(PushbackRefreshMessage *msg) {
- int qid = getQID(msg->senderID_);
- sprintf(prnMsg, " pushback refresh from %d for qid=%d with limit=%gn", msg->senderID_, qid, msg->limit_);
- printMsg(prnMsg,0);
- RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
-
- if (rls == NULL) {
- sprintf(prnMsg, " session %d not foundn", msg->rlsID_);
- printMsg(prnMsg,0);
- exit(-1);
- }
- //1. change your own rate limit
- rls->setAggSpec(msg->aggSpec_);
- delete(msg->aggSpec_);
- double newLimit = msg->limit_;
- rls->setLimit(newLimit);
- //2. if pushback has been propagated send out refreshes upstream with new limits
- if (rls->pushbackON_) {
- refreshUpstreamLimits(rls);
- }
-
- //3. set up status timer.
- PushbackEvent * event = new PushbackEvent(PUSHBACK_CYCLE_TIME - 0.1*rls->depthInPTree_,
- PUSHBACK_STATUS_EVENT, rls);
- timer_->insert(event);
- }
- void
- PushbackAgent::processPushbackCancel(PushbackCancelMessage *msg) {
-
- int qid = getQID(msg->senderID_);
- sprintf(prnMsg, " pushback cancel from %d for queue index %dn", msg->senderID_, qid);
- printMsg(prnMsg,0);
- RateLimitSession * rls = queue_list_[qid].pbq_->rlsList_->getSessionByRemoteID(msg->rlsID_);
-
- if (rls == NULL) {
- sprintf(prnMsg, " session %d not foundn", msg->rlsID_);
- printMsg(prnMsg,0);
- exit(-1);
- }
- pushbackCancel(rls);
- }
- void
- PushbackAgent::refreshUpstreamLimits(RateLimitSession * rls) {
- double totalRate = rls->rlStrategy_->target_rate_;
- int count = rls->logData_->count_;
- double fairShare = totalRate/count;
- int done = count;
- double arrRate = rls->getArrivalRateForStatus();
- sprintf(prnMsg, "Sending refresh messages to %d nodes. Limit = %g arrRate = %gn", count, totalRate, arrRate);
- printMsg(prnMsg,0);
-
- int excess = 0;
- if (totalRate > arrRate) {
- excess = 1;
- }
- //max-min allocation of limit.
- while (done != 0) {
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- int countThisRound=0;
- while (lgdsNode != NULL) {
- double rate;
- rate = lgdsNode->statusArrivalRate_;
- if (rate <= fairShare && !lgdsNode->sentRefresh_) {
- AggSpec * aggSpec = rls->aggSpec_->clone();
- PushbackMessage * msg;
- if (rate < fairShare/2.0) {
- msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
- rls->localID_, aggSpec, INFINITE_LIMIT);
- lgdsNode->sentRefresh(INFINITE_LIMIT);
- }
- else if (!excess) {
- msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
- rls->localID_, aggSpec, rate);
- lgdsNode->sentRefresh(rate);
- }
- else {
- msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_, rls->localQID_,
- rls->localID_, aggSpec, fairShare);
- lgdsNode->sentRefresh(fairShare);
- rate = fairShare;
- }
- sendMsg(msg);
- countThisRound++;
- done--;
- totalRate -= rate;
- }
- lgdsNode = lgdsNode->next_;
- }
- if (done == 0) break;
- if (countThisRound==0) {
- //allocate fairshare to everyone and end.
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- while (lgdsNode != NULL) {
- if (!lgdsNode->sentRefresh_) {
- AggSpec * aggSpec = rls->aggSpec_->clone();
- PushbackMessage * msg = new PushbackRefreshMessage(node_->nodeid(), lgdsNode->nid_,
- rls->localQID_, rls->localID_,
- aggSpec, fairShare);
- lgdsNode->sentRefresh(fairShare);
- sendMsg(msg);
- done--;
- totalRate-=fairShare;
- }
- lgdsNode = lgdsNode->next_;
- }
- }
- else {
- fairShare = totalRate/done;
- }
- }
-
- //reset all the sentRefresh bits
- LoggingDataStructNode * lgdsNode = rls->logData_->first_;
- while (lgdsNode != NULL) {
- lgdsNode->sentRefresh_ = 0;
- lgdsNode = lgdsNode->next_;
- }
- }
- int
- PushbackAgent::getQID(int sender) {
-
- Tcl& tcl = Tcl::instance();
- intResult_ = -1;
- int index = 0;
- // there gotta be better ways of doing this; todoLater.
- // like make Tcl call you back and set a variable using command.
- for (; index <last_index_; index++) {
- tcl.evalf("%s set intResult_ [%s check-queue %d %d %s]", name(), name(),
- node_->nodeid(), sender , queue_list_[index].pbq_->name());
- if (intResult_ == 1) break;
- }
-
- if (index == last_index_) {
- sprintf(prnMsg, " right queue not foundn");
- printMsg(prnMsg,0);
- exit(-1);
- }
- return index;
- }
- void
- PushbackAgent::sendMsg(PushbackMessage * msg) {
-
- Tcl& tcl = Tcl::instance();
-
- dst_.addr_ = msg->targetID_;
- //this assumes that all pushback agents have port zero.
-
- tcl.evalf("%s set intResult_ [%s get-pba-port %d]", name(), name(),dst_.addr_ );
- if ( intResult_ == -1 ) {
- fprintf(stderr,"PBA: %s Pushback Agent not found on Node %dn", name(), dst_.addr_);
- return;
- }
- dst_.port_ = intResult_;
- Packet *pkt = allocpkt();
- hdr_pushback * hdr_push = ((hdr_pushback*)pkt)->access(pkt);
- hdr_push->msg_ = msg;
-
- sprintf(prnMsg, " sent %s message to %d.%dn", PushbackMessage::type(msg), dst_.addr_, dst_.port_);
- printMsg(prnMsg,4);
- send(pkt,0);
- }
- void
- PushbackAgent::printMsg(char * msg, int msgLevel) {
-
- if (msgLevel < debugLevel) {
- if (verbose_) printf("PBA:%d (%g) %s", node_->nodeid(), Scheduler::instance().clock(), msg);
- fflush(stdout);
- }
- }
- int
- PushbackAgent::checkQID(int qid) {
- if (qid < 0 || qid >= last_index_)
- return 0;
- else
- return 1;
- }
- //decide whether to accept a merger involving "count" aggregates,
- //the number of bits in the resultant aggregate would be "bits"
- //the aggregate is being broadended by "bitsDiff" (measured from shortest prefix)
- int
- PushbackAgent::mergerAccept(int count, int bits, int bitsDiff) {
-
- //todo: think of a smarter way.
- //currently merge if bits < some value.
- //return (bits <= MIN_BITS_FOR_MERGER);
- return 0;
- }
-
- // ############################### PushbackTimer Methods ############################
- void
- PushbackTimer::expire(Event *e) {
-
- if (firstEvent_ == NULL) {
- printf("PushbackTimer: No event found on expiryn");
- exit(-1);
- }
-
- PushbackEvent * event = firstEvent_;
- firstEvent_= firstEvent_->next_;
- schedule();
- agent_->timeout(event);
- delete(event);
- }
-
- void
- PushbackTimer::insert(PushbackEvent * event) {
- sprintf(agent_->prnMsg,"%s timer setn", PushbackEvent::type(event));
- agent_->printMsg(agent_->prnMsg,4);
- if (firstEvent_ == NULL) {
- firstEvent_ = event;
- schedule();
- return;
- }
- if (event->time_ < firstEvent_->time_) {
- event->setSucc(firstEvent_);
- firstEvent_=event;
- schedule();
- return;
- }
- PushbackEvent * listItem = firstEvent_;
- while (listItem->next_!=NULL && listItem->next_->time_ <= event->time_) {
- listItem = listItem->next_;
- }
-
- event->setSucc(listItem->next_);
- listItem->setSucc(event);
-
- //comment the sanity check out later
- sanityCheck();
-
- return;
- }
- void
- PushbackTimer::removeEvents(RateLimitSession * rls) {
- if (firstEvent_==NULL) return;
- while (firstEvent_!= NULL && firstEvent_->rls_==rls) {
- cancel();
- PushbackEvent * event = firstEvent_;
- firstEvent_=firstEvent_->next_;
- delete(event);
- schedule();
- }
- if (firstEvent_==NULL) return;
- PushbackEvent * previous = firstEvent_;
- PushbackEvent * current = firstEvent_->next_;
- while (current!=NULL) {
- if (current->rls_==rls) {
- previous->next_=current->next_;
- delete(current);
- current = previous->next_;
- continue;
- }
- previous=current;
- current=current->next_;
- }
- }
-
- void
- PushbackTimer::schedule() {
- if (firstEvent_== NULL) {
- sprintf(agent_->prnMsg,"Timer: Nothing to schedulen");
- agent_->printMsg(agent_->prnMsg, 0);
- return;
- }
- resched(firstEvent_->time_ - Scheduler::instance().clock());
- }
- void
- PushbackTimer::cancelStatus(RateLimitSession * rls) {
-
- if (firstEvent_==NULL) {
- sprintf(agent_->prnMsg, " Error timer list emptyn");
- agent_->printMsg(agent_->prnMsg, 0);
- //return;
- exit(-1);
- }
-
- if (firstEvent_->eventID_==PUSHBACK_STATUS_EVENT && firstEvent_->rls_==rls) {
- cancel();
- PushbackEvent * event = firstEvent_;
- firstEvent_=firstEvent_->next_;
- delete(event);
- schedule();
- return;
- }
-
- PushbackEvent * previous = firstEvent_;
- PushbackEvent * current = firstEvent_->next_;
-
- while (current!=NULL) {
- if (current->eventID_ == PUSHBACK_STATUS_EVENT && current->rls_==rls) {
- previous->next_=current->next_;
- delete(current);
- return;
- }
- previous=current;
- current=current->next_;
- }
- sprintf(agent_->prnMsg, "Error status timer not foundn");
- agent_->printMsg(agent_->prnMsg, 0);
- exit(-1);
- }
- int
- PushbackTimer::containsRefresh(int qid) {
- PushbackEvent * listItem = firstEvent_;
- while (listItem!=NULL) {
- if (listItem->eventID_ == PUSHBACK_REFRESH_EVENT && listItem->qid_==qid)
- return 1;
- listItem = listItem->next_;
- }
- return 0;
- }
- void
- PushbackTimer::sanityCheck() {
- if (firstEvent_==NULL || firstEvent_->next_ == NULL) return;
-
- PushbackEvent * listItem = firstEvent_;
- while (listItem->next_!=NULL) {
- if (listItem->time_ > listItem->next_->time_) {
- sprintf(agent_->prnMsg, "Sanity Check Failedn");
- agent_->printMsg(agent_->prnMsg, 0);
- exit(-1);
- }
- listItem = listItem->next_;
- }
- }
-