rate-limit.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:14k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* -*-  Mode:C++; c-basic-offset:4; tab-width:4; indent-tabs-mode:t -*- */
  2. /*
  3.  * Copyright (c) 2000  International Computer Science Institute
  4.  * All rights reserved.
  5.  *
  6.  * Redistribution and use in source and binary forms, with or without
  7.  * modification, are permitted provided that the following conditions
  8.  * are met:
  9.  * 1. Redistributions of source code must retain the above copyright
  10.  *    notice, this list of conditions and the following disclaimer.
  11.  * 2. Redistributions in binary form must reproduce the above copyright
  12.  *    notice, this list of conditions and the following disclaimer in the
  13.  *    documentation and/or other materials provided with the distribution.
  14.  * 3. All advertising materials mentioning features or use of this software
  15.  *    must display the following acknowledgement:
  16.  * This product includes software developed by ACIRI, the AT&T 
  17.  *      Center for Internet Research at ICSI (the International Computer
  18.  *      Science Institute).
  19.  * 4. Neither the name of ACIRI nor of ICSI may be used
  20.  *    to endorse or promote products derived from this software without
  21.  *    specific prior written permission.
  22.  *
  23.  * THIS SOFTWARE IS PROVIDED BY ICSI AND CONTRIBUTORS ``AS IS'' AND
  24.  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  25.  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  26.  * ARE DISCLAIMED.  IN NO EVENT SHALL ICSI OR CONTRIBUTORS BE LIABLE
  27.  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  28.  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  29.  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  30.  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  31.  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  32.  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  33.  * SUCH DAMAGE.
  34.  *
  35.  */
  36. #include "rate-limit.h"
  37. #include "random.h"
  38. #include "ident-tree.h"
  39. #include "pushback.h"
  40. // ######################### RateLmitSessionList Methods #####################
  41. int
  42. RateLimitSessionList::filter(Packet * pkt, int lowDemand) {
  43.   RateLimitSession * next = first_;
  44.   double dropP = -1;
  45.   
  46.   while (next != NULL) {
  47.     double p = next->log(pkt, lowDemand);
  48.     if (p >= dropP) dropP = p;
  49.     next = next->next_;
  50.   }
  51. #ifdef DEBUG_RLSL
  52.   if (dropP == -1) {
  53.     printf("RLSList: Found a non-member packet at %gn", Scheduler::instance().clock());
  54.     fflush(stdout);
  55.   }
  56. #endif
  57.   double u = Random::uniform();
  58.   if (u <= dropP) {
  59. #ifdef DEBUG_RLSL
  60.     printf("RLSList:%d Dropping Packet in filter. n", first_->logData_->myID_);
  61.     fflush(stdout);
  62. #endif
  63.    return 1;
  64.   }
  65.   
  66.   //not dropped 
  67.   return 0;
  68. }
  69. int 
  70. RateLimitSessionList::insert(RateLimitSession * session) {
  71.   RateLimitSession * listItem = first_;
  72.   
  73.   while (listItem != NULL) {
  74.     if ( listItem->aggSpec_->equals(session->aggSpec_) ) {
  75.       return 0;
  76.     }
  77.     listItem = listItem->next_;
  78.   }
  79.   
  80.   session->setSucc(first_);
  81.   first_ = session;
  82.   session->localID_ = IDCounter_++;
  83.   noSessions_++;
  84.   return 1;
  85. }
  86.   
  87. RateLimitSession *
  88. RateLimitSessionList::containsLocalAggSpec(AggSpec * spec, int myID) {
  89.   RateLimitSession * listItem = first_;
  90.   while (listItem != NULL) {
  91.     if ( listItem->origin_== myID ) {
  92.       if ( listItem->aggSpec_->contains(spec) ) {
  93. return listItem;
  94.       } 
  95.       //  else if ( spec->contains(listItem->aggSpec_) ) {
  96.       // return 2;
  97.       // }
  98.     }
  99.     listItem = listItem->next_;
  100.   }
  101.   return NULL;
  102. }
  103. int 
  104. RateLimitSessionList::containsAggSpec(AggSpec * spec) {
  105.   RateLimitSession * listItem = first_;
  106.   while (listItem != NULL) {
  107.     if ( listItem->aggSpec_->contains(spec) ) {
  108.       return 1;
  109.     }
  110.     listItem = listItem->next_;
  111.   }
  112.   return 0;
  113. }
  114. //merger will only take place for aggregates started at this node.
  115. void 
  116. RateLimitSessionList::mergeSessions(int myID) {
  117.   RateLimitSession * session1 = first_;
  118.   while (session1 != NULL) {
  119.     AggSpec * agg1 = session1->aggSpec_;
  120.     RateLimitSession * session2 = session1->next_;
  121.     while (session2 != NULL) {
  122.       AggSpec * agg2 = session2->aggSpec_;
  123.       if (session1->origin_== myID && session2->origin_ == myID &&
  124.   agg1->dstON_  && agg2->dstON_) {
  125. int bits = AggSpec::prefixBitsForMerger(agg1, agg2);
  126. if (bits==0) {
  127.     //a measure of how much are we broadening.
  128.     int bitsDiff = ((agg1->dstBits_<agg2->dstBits_)? agg1->dstBits_: agg2->dstBits_) - bits;
  129.     int prefix = PrefixTree::getPrefixBits(agg1->dstPrefix_, bits);
  130.     int count = getMySubsetCount(prefix, bits, myID);
  131.     if (count <2) {
  132.     printf("Error: Anomaly n");
  133.     exit(-1);
  134.     }
  135.     if (PushbackAgent::mergerAccept(count, bits, bitsDiff)) {
  136.     merge(prefix, bits, myID);
  137.     }
  138. }
  139.       }
  140.       session2 = session2->next_;
  141.     }
  142.     session1 = session1->next_;
  143.   }
  144. }
  145. void
  146. RateLimitSessionList::merge(int prefix, int bits, int myID) {
  147.   RateLimitSession * listItem = first_;
  148.   RateLimitSession * firstItem = NULL;
  149.   while (listItem != NULL) {
  150.     if ( listItem->origin_== myID && 
  151.  listItem->aggSpec_->subsetOfDst(prefix, bits) &&
  152.  !listItem->merged_) {
  153.       if (firstItem == NULL) {
  154. firstItem = listItem;
  155.       } else {
  156. //merge here with firstItem
  157. firstItem = RateLimitSession::merge(firstItem, listItem, bits);
  158.       }
  159.     }
  160.     listItem = listItem->next_;
  161.   }
  162.   if (firstItem == NULL) {
  163.     printf("Error: Anomaly no 2n");
  164.     exit(-1);
  165.   }
  166.   
  167.   firstItem->aggSpec_->expand(prefix, bits);
  168. }
  169.   
  170. int 
  171. RateLimitSessionList::getMySubsetCount(int prefix, int bits, int myID) {
  172.  RateLimitSession * listItem = first_;
  173.  int count=0;
  174.  while (listItem != NULL) {
  175.     if ( listItem->origin_== myID &&
  176.  listItem->aggSpec_->subsetOfDst(prefix, bits))
  177.       count++;
  178.     
  179.     listItem = listItem->next_;
  180.   }
  181.  return count;
  182. }
  183. int 
  184. RateLimitSessionList::noMySessions(int myID) {
  185.   RateLimitSession * listItem = first_;
  186.   int count=0;
  187.   while (listItem != NULL) {
  188.     if ( listItem->origin_==myID &&
  189.  !listItem->merged_ ) {
  190.       count++;
  191.     }
  192.     listItem = listItem->next_;
  193.   }
  194.   return count;
  195. }
  196.   
  197. RateLimitSession *
  198. RateLimitSessionList::getSessionByLocalID(int localID) {
  199.   
  200.   RateLimitSession * listItem = first_;
  201.   while (listItem != NULL) {
  202.     if (listItem->localID_ == localID) {
  203.       return listItem;
  204.     }
  205.     listItem = listItem->next_;
  206.   }
  207.   
  208.   return NULL;
  209. }
  210. RateLimitSession *
  211. RateLimitSessionList::getSessionByRemoteID(int remoteID) {
  212.   
  213.   RateLimitSession * listItem = first_;
  214.   while (listItem != NULL) {
  215.     if (listItem->remoteID_ == remoteID) {
  216.       return listItem;
  217.     }
  218.     listItem = listItem->next_;
  219.   }
  220.   
  221.   return NULL;
  222. }
  223. void 
  224. RateLimitSessionList::endSession(RateLimitSession * rls) {
  225.   
  226.   if (first_==NULL) {
  227.     printf("RLSL: Error. No session in progressn");
  228.     exit(-1);
  229.   }
  230.   if (first_==rls) {
  231.     first_=rls->next_;
  232.     noSessions_--;
  233.     delete(rls);
  234.     return;
  235.   } 
  236.   RateLimitSession * previous = first_;
  237.   RateLimitSession * current = first_->next_;
  238.   while (current!=NULL) {
  239.     if (current == rls) {
  240.       previous->next_=current->next_;
  241.       noSessions_--;
  242.       delete(rls);
  243.       return;
  244.     }
  245.     previous = current;
  246.     current=current->next_;
  247.   }
  248.   
  249.   printf("RLSL: Error. The correct RLS not foundn");
  250.   exit(-1);
  251. }
  252. //descending order
  253. int 
  254. RateLimitSessionList::rankRate(int myID, double rate) {
  255.     int rank=0;
  256.     RateLimitSession * listItem = first_;
  257.     while (listItem != NULL) {
  258. if (listItem->origin_ == myID && listItem->getArrivalRateForStatus() > rate) {
  259. rank++;
  260. }
  261. listItem = listItem->next_;
  262. }
  263.     
  264.     return rank;
  265. }
  266. //ascending order
  267. int 
  268. RateLimitSessionList::rankSession(int myID, RateLimitSession * session) {
  269.     int rank=0;
  270.     RateLimitSession * listItem = first_;
  271.     while (listItem != NULL) {
  272. if (listItem->origin_ == myID) {
  273. if (listItem->getArrivalRateForStatus() < session->getArrivalRateForStatus()) {
  274. rank++;
  275. }
  276. //to enforce deterministic ordering between sessions with same rate
  277. else if (listItem->getArrivalRateForStatus() == session->getArrivalRateForStatus() &&
  278.  listItem < session) {
  279. rank++;
  280. }
  281. }
  282. listItem = listItem->next_;
  283. }
  284.     
  285.     return rank;
  286. }
  287. // ############################# RateLmitSession Methods #####################
  288. //local constructor
  289. RateLimitSession::RateLimitSession(AggSpec * aggSpec, double rateEstimate, int initial, 
  290.    double limit, int origin, int locQID, 
  291.    double delay, double lowerBound, Node * node, RouteLogic * rtLogic):
  292.   pushbackON_(0), merged_(0), next_(NULL) {
  293.   aggSpec_ = aggSpec;
  294.   origin_ = origin;
  295.   remoteID_ = -1;
  296.   localQID_ = locQID;
  297.   remoteQID_ = -1;
  298.   heightInPTree_ = 0; //always begin as a leaf.
  299.   depthInPTree_ = 0;   
  300.   startTime_ = Scheduler::instance().clock();
  301.   expiryTime_ = startTime_ + delay;
  302.   refreshTime_ = startTime_;
  303.   lowerBound_ = lowerBound;
  304.   initialPhase_=initial;
  305.   rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, rateEstimate);
  306.   logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), rateEstimate);
  307. }
  308. //remote constructor
  309. RateLimitSession::RateLimitSession(AggSpec * aggSpec, double limit, int origin, int locQID, 
  310.    int remoteQID, int remoteID, int depth, double delay, 
  311.    double lowerBound, Node * node, RouteLogic * rtLogic):
  312.   pushbackON_(0), merged_(0), initialPhase_(0), next_(NULL) {
  313.   aggSpec_ = aggSpec;
  314.   origin_ = origin;
  315.   remoteID_ = remoteID;
  316.   localQID_ = locQID;
  317.   remoteQID_ = remoteQID;
  318.   heightInPTree_ = 0; //always begin as a leaf.
  319.   depthInPTree_ = depth;   
  320.   startTime_ = Scheduler::instance().clock();
  321.   expiryTime_ = startTime_ + delay;
  322.   lowerBound_ = lowerBound;
  323.   rlStrategy_ = new RateLimitStrategy(limit, aggSpec_->ptype_, aggSpec->ptypeShare_, 0);
  324.   logData_ = new LoggingDataStruct(node, rtLogic, aggSpec->getSampleAddress(), 0);
  325. }
  326. double 
  327. RateLimitSession::log(Packet *pkt, int lowDemand) {
  328.   
  329.   int member = aggSpec_->member(pkt); 
  330.   
  331.   if (member == 0) {
  332.     //printf("RLS: Found a non-member packet at %gn", Scheduler::instance().clock());
  333.     return 0;
  334.   }
  335.   //expired session
  336.   //   if (expiryTime_ < Scheduler::instance().clock()) {
  337.   //     printf("RLS: Session expired at %g. expiryTime = %gn", 
  338.   //     Scheduler::instance().clock(), expiryTime_);
  339.   //     return 0;
  340.   //   }
  341.   logData_->log(pkt);    //log the packet
  342.   int mine = (origin_ == logData_->myID_); 
  343.   double prob = rlStrategy_->process(pkt, mine, lowDemand);  //rate limit it.
  344.   return prob;
  345. }
  346. double
  347. RateLimitSession::getDropRate() {
  348.   return rlStrategy_->getDropRate();
  349. }
  350. void
  351. RateLimitSession::pushbackOn() {
  352.   pushbackON_ = 1;
  353.   rlStrategy_->reset();
  354. }
  355. void
  356. RateLimitSession::refreshed() {
  357.   refreshTime_ = Scheduler::instance().clock();
  358. }
  359. void 
  360. RateLimitSession::setAggSpec(AggSpec * aggSpec) {
  361.   aggSpec_->dstON_ = aggSpec->dstON_;
  362.   aggSpec_->dstPrefix_ = aggSpec->dstPrefix_;
  363.   aggSpec_->dstBits_ = aggSpec->dstBits_;
  364. }
  365. void
  366. RateLimitSession::setLimit(double limit) {
  367.   rlStrategy_->target_rate_=limit;
  368. }
  369.  
  370. double
  371. RateLimitSession::getArrivalRateForStatus()  {
  372.   // for a leaf PBA, this is the rate seen at the rlStrategy_;
  373.   // for non-leaf PBAs it is the sum of the rates reported by upstream PBAs 
  374.   // in their status messages.
  375.   
  376.   double rate;
  377.   
  378.   if (pushbackON_) {
  379.     logData_->consolidateStatus();
  380.     rate = logData_->statusArrivalRateAll_;
  381.   } 
  382.   else {
  383.     rate = rlStrategy_->getArrivalRate();
  384.   }
  385.   return rate;
  386. }
  387. RateLimitSession *
  388. RateLimitSession::merge(RateLimitSession * session1, RateLimitSession * session2, int bits) {
  389.   RateLimitSession *winner, *loser;
  390.   
  391.   if (session1->pushbackON_) {
  392.     winner = session1;
  393.     loser = session2;
  394.   } else {
  395.     winner = session2;
  396.     loser = session1;
  397.   }
  398.   loser->merged_=1;
  399.   
  400.   int envelope;
  401.   if (session1->aggSpec_->dstBits_==bits) 
  402.     envelope = 1;
  403.   else if (session2->aggSpec_->dstBits_==bits) 
  404.     envelope=2;
  405.   else 
  406.     envelope=0;
  407.   
  408.   double lowerBound = pick4merge(session1->lowerBound_, session2->lowerBound_, envelope);
  409.   winner->lowerBound_=lowerBound;
  410.   double target_rate = pick4merge(session1->rlStrategy_->target_rate_,
  411.    session2->rlStrategy_->target_rate_, 
  412.    envelope);
  413.   winner->setLimit(target_rate);
  414.   double estRate = pick4merge(session1->rlStrategy_->rateEstimator_->estRate_,
  415.       session2->rlStrategy_->rateEstimator_->estRate_,
  416.       envelope);
  417.   winner->rlStrategy_->rateEstimator_->estRate_=estRate;
  418.   LoggingDataStruct * log1 = session1->logData_;
  419.   LoggingDataStruct * log2 = session2->logData_;
  420.   if (log1->count_ != log2->count_ || log1->myID_ != log2->myID_) {
  421.     printf("RLS: Error: logdata count or ID anomalyn");
  422.     exit(-1);
  423.   }
  424.   
  425.   estRate = pick4merge(log1->rateEstimator_->estRate_,
  426.        log2->rateEstimator_->estRate_,
  427.        envelope);
  428.   winner->logData_->rateEstimator_->estRate_ = estRate;
  429.   
  430.   LoggingDataStructNode * node1 = log1->first_;
  431.   LoggingDataStructNode * node2 = log2->first_;
  432.   LoggingDataStructNode * nodew = winner->logData_->first_;
  433.   
  434.   while (node1 != NULL && node2!= NULL && nodew != NULL) {
  435.     if (node1->nid_ != node2->nid_) {
  436.       printf("RLS: Error: Out of order log nodes. Or something more sinistern");
  437.       exit(-1);
  438.     }
  439.     estRate =  pick4merge(node1->rateEstimator_->estRate_,
  440.   node2->rateEstimator_->estRate_,
  441.   envelope);
  442.     nodew->rateEstimator_->estRate_ = estRate;
  443.     
  444.     double statusArrivalRate = pick4merge(node1->statusArrivalRate_,
  445.   node2->statusArrivalRate_,
  446.   envelope);
  447.     nodew->statusArrivalRate_ = statusArrivalRate;
  448.     node1=node1->next_;
  449.     node2=node2->next_;
  450.     nodew=nodew->next_;
  451.   }
  452.   
  453.   if (node1 != NULL || node2 !=NULL || nodew != NULL) {
  454.     printf("RLS: Error: Different chainsn");
  455.     exit(-1);
  456.   }
  457.   
  458.   return winner;
  459. }
  460. double
  461. RateLimitSession::pick4merge(double q1, double q2, int envelope) {
  462.   
  463.   if (envelope == 1) {
  464.     return q1;
  465.   } else if (envelope == 2) {
  466.     return q2;
  467.   }
  468.   return q1+q2;
  469. }
  470. RateLimitSession::~RateLimitSession() {
  471.   delete(aggSpec_);
  472.   delete(rlStrategy_);
  473.   delete(logData_);
  474. }