srr.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:19k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /*  * Copyright (c) 1991-1994 Regents of the University of California.  * All rights reserved.  *  * Redistribution and use in source and binary forms, with or without  * modification, are permitted provided that the following conditions  * are met:  * 1. Redistributions of source code must retain the above copyright  *    notice, this list of conditions and the following disclaimer.  * 2. Redistributions in binary form must reproduce the above copyright  *    notice, this list of conditions and the following disclaimer in the  *    documentation and/or other materials provided with the distribution.  * 3. All advertising materials mentioning features or use of this software  *    must display the following acknowledgement:  * This product includes software developed by the University of  * California, Berkeley and the Network Research Group at  * Lawrence Berkeley Laboratory.  * 4. Neither the name of the University nor of the Laboratory may be used  *    to endorse or promote products derived from this software without  *    specific prior written permission.  *  * THIS SOFTWARE IS PROVIDED BY THE REGENTS AND CONTRIBUTORS ``AS IS'' AND  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE  * ARE DISCLAIMED.  IN NO EVENT SHALL THE REGENTS OR CONTRIBUTORS BE LIABLE  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF  * SUCH DAMAGE.  *  */  /*  * SRR means Smoothed Round Robin.   * as compared with the DRR provided in NS,   * this version now can:  * 1, assign multi flows to a queue  * 2, assign same quantum to different queue  * 3, use the flowid information in the packet to disdinguish   *    different packets.  * 4, it has  a Weight Spread Sequence. the WSS with MAXWSSORDER is stored statically in the memory  * 5, it has the Weight Matrix. the WM can be adjusted in the processing of SRR  *  * for details about SRR, see:    * " SRR: An O(1) Time Complexity Scheduler for Flows in Multi-Service Packet Networks", Chuanxiong Guo, sigcomm'01.  */ // Ported by xuanc, 1/20/02 #include "config.h"   // for string.h #include <stdlib.h> #include "queue.h" #include <math.h> #include <assert.h> #define MAXFLOW 100  // this version supports up to 1024 flows and queues. #define MAXQUEUE 100 // if you want more, you can change it.  #define MAXWSSORDER 16 // a 16th WSS needs pow(2, 16) space //for debug  #ifndef DEBUG_SRR //#define DEBUG_SRR #endif // DEBUG_SRR // this struct is the basic element for the Weight Matrix struct wm_node
  2. { int queueid; // the queue the node belongs to  int weight;  // the weight of the node struct wm_node *next; struct wm_node *prev; struct wm_node *sibling; }; #include "wss.h"  class PacketSRR { PacketSRR(): pkts(0),bcount(0),deficitCounter(0),turn(0),head_(0), tail_(0), len_(0) {} friend class SRR; protected:  int pkts;   //total packets in this Queue int bcount; //count of bytes in each flow to find the max flow; int deficitCounter; // concept borrowed from DRR  int turn;    // whether the queue is on service or not  int queueid; // mark the id of this packet queue Packet *head_, *tail_; int len_; inline void enque(Packet* p) { if (!tail_) head_= tail_= p; else { tail_->next_= p; tail_= p; } tail_->next_= 0; ++len_; } inline Packet* deque() { if (!head_) return 0; Packet* p = head_; head_= p->next_; // 0 if p == tail_ if (p == tail_) head_= tail_= 0; --len_; return p; } Packet* lookup(int n) { for (Packet* p = head_; p != 0; p = p->next_) { if (--n < 0) return (p); } return (0); } }; class SRR : public Queue { public : SRR(); virtual int command(int argc, const char*const* argv); Packet *deque(void); void enque(Packet *pkt); void clear();
  3. public: int maxqueuenumber_ ; //total number of flows allowed int blimit_; //total number of bytes allowed across all flows int bytecnt; //cumulative sum of bytes across all flows int pktcnt; // cumulative sum of packets across all flows int flwcnt; //total number of active flows int last_queueid; // the id of the previously served queue int last_size; // the length of the last send packet PacketSRR srr[MAXFLOW]; //pointer to the entire srr struct int f2q[MAXFLOW]; // flow to queue mapping int private_rate[MAXFLOW]; // bandwidth for each queue int mtu_; // it is used to mark the quantum adding to  // each queue int granularity_; // the min rate that can be allocated to a queue int maxRate; int minRate;  class WSS wss;    // wss deals with the Weight Spread Sequence. /*  * wmHead[0] according to the least important weight,      * wmHead[maxColumn] according to the most important weight  */ struct wm_node wmHead[MAXWSSORDER]; // each column of WM has a head struct wm_node wmTail[MAXWSSORDER]; // each column of WM has a tail  struct wm_node *pwmCurr; // points to the current scanning position struct wm_node *pRowHead[MAXFLOW]; struct wm_node *pRowTail[MAXFLOW]; int wmEmptyFlag; // 0: WM is not empty, 1:WM is empty int maxColumn; // the max column number of the WM, so there are total  // maxColumn+1 columns, the column number is numbered from 0 to  // maxColumn from left to right. int currMaxColumn; // the current max number of the column int min_quantum; inline PacketSRR *getMaxflow () { //returns flow with max bytes  int j,i; PacketSRR *tmp=0; PacketSRR *maxflow=0; int bcount=0; for (i=0;i<maxqueuenumber_;i++) { tmp=&srr[i]; if (tmp->bcount > bcount){ bcount= tmp->bcount; maxflow=tmp;j=i; } } if(maxflow==0){ fprintf(stderr, "getMaxflow, err"); exit(1); } return maxflow; }    //returns queuelength in packets inline int length () { return pktcnt; } //returns queuelength in bytes inline int blength () { return bytecnt; } int add_to_WM(int queueid, int weight); int del_from_WM(int queueid, int weight); struct wm_node * getNextNode( );  }; static class SRRClass : public TclClass { public: SRRClass() : TclClass("Queue/SRR") {} TclObject* create(int, const char*const*) { return (new SRR); } } class_srr; SRR::SRR() { int i; maxqueuenumber_ = 10; blimit_ = 25000; mtu_= 1000;  //1000 bytes quantum at default setting granularity_ = 1000;  //default to 1K bit/s   last_queueid = -1; // -1 means that SRR does not have a previous deque operation last_size = 0; flwcnt = 0;  // init  bytecnt = 0; pktcnt = 0; min_quantum = 1000; pwmCurr = 0; // at first, pwmCurr points to NULL for(i=0; i<MAXFLOW; i++)
  4. { private_rate[i] = granularity_; //default quantum value for each flow f2q[i]=0;  // default queue id for all the flow  // or it will not works right at the default config } maxRate = 100000000; //100Mbps minRate = 1000; //1kbps // init the WM double queues here too. for(i=0;i<MAXWSSORDER;i++){ wmHead[i].prev=NULL; wmHead[i].next=&wmTail[i]; wmTail[i].prev=&wmHead[i]; wmTail[i].next=NULL; wmHead[i].queueid=wmTail[i].queueid=-1; //  wmHead[i].weight=wmHead[i].weight=i; }
  5. for (i=0;i<MAXFLOW; i++) pRowHead[i]= pRowTail[i] = NULL; wmEmptyFlag=1; // it is empty at first wss.init(MAXWSSORDER); // create the Weight Spread Sequence currMaxColumn = -1; maxColumn=0; // allow the TCL scripts to change the following values bind("maxqueuenumber_",&maxqueuenumber_);  //it is the max queuenumber set in the TCL script bind("mtu_", &mtu_);      // set the Max Transfer Unit of the output link bind("granularity_", &granularity_); // set the rate allocation granularity of the  // all the flows.  bind("blimit_",&blimit_); } // enque and deque are two interface functions that provided by  // NS  void SRR::enque(Packet* pkt) { PacketSRR *q; //PacketSRR *remq; int flowid, queueid; int weight; hdr_cmn *ch=hdr_cmn::access(pkt); hdr_ip *iph = hdr_ip::access(pkt); flowid= iph->flowid();  //get the flowid queueid= f2q[flowid];  // get the corresponding queue id if(queueid > maxqueuenumber_)
  6. { fprintf(stderr, "queueid too bign"); exit(1); } #ifdef DEBUG_SRR printf("   in enquen"); fflush(0); #endif // we drop packets from the longest queue /* if( bytecnt+ ch->size() > blimit_){ drop(pkt); return; } */ q=&srr[queueid]; if(q->pkts==qlim_){ drop(pkt); return; } q->enque(pkt); ++q->pkts; ++pktcnt; q->bcount += ch->size(); bytecnt +=ch->size(); if (q->pkts==1) { //add to the WM // adjust currMaxColumn // if it is a first active flow, put the pwmPtr         // all the above works are done in add_to_WM weight= private_rate[queueid]; weight/=granularity_; add_to_WM(queueid, weight); q->deficitCounter=0; }
  7. } Packet *SRR::deque(void)  { hdr_cmn *ch; hdr_ip *iph; Packet *pkt=0; int flowid; int queueid; PacketSRR *pP; // static int dcnt = 0; // if(dcnt <20){ // printf(" in dequeue: %lf n",  Scheduler::instance().clock() );  // dcnt++; // } #ifdef DEBUG_SRR printf("  in dequen"); #endif if(last_queueid>=0){ //printf("last size=%dn ", last_size); srr[last_queueid].bcount -= last_size; srr[last_queueid].pkts-=1; --pktcnt; bytecnt -= last_size; if (srr[last_queueid].pkts == 0) { srr[last_queueid].turn=0; srr[last_queueid].deficitCounter=0; // delete the queue from SRR del_from_WM(last_queueid, private_rate[last_queueid]/granularity_ ); } } assert(pktcnt>=0); if (pktcnt==0) { // fprintf (stderr,"No active flown"); last_queueid=-1; return(0); } if(pwmCurr == NULL){ printf("wrong, pwmCurr is NULLn"); exit(0); } //printf("pktcnt=%dn", pktcnt); while (!pkt) { if(pwmCurr->queueid==-1){ // it should never happen fprintf(stderr,"pwmCurr points to head or tailn");  fprintf(stderr, "weight:%d", pwmCurr->weight); exit(0); } pP=&srr[pwmCurr->queueid];  pkt=pP->lookup(0);   if(pkt==0){ fprintf(stderr, "wrong place, should never be heren"); exit(2); } iph = hdr_ip::access(pkt); flowid=iph->flowid(); queueid= f2q[flowid]; // get the corresponding queue id assert(queueid== pwmCurr->queueid); if (!pP->turn) { pP->deficitCounter+= mtu_; // each queue shares the same quantum! // this is the difference between DRR! pP->turn=1; } ch=hdr_cmn::access(pkt); if (pP->deficitCounter >= ch->size()) { pP->deficitCounter -= (ch->size()); pkt=pP->deque(); last_size=ch->size( ); // pP->bcount -= ch->size(); // --pP->pkts; // --pktcnt; // bytecnt -= ch->size(); // if (pP->pkts == 0) { // pP->turn=0; // pP->deficitCounter=0; // delete the queue from SRR // del_from_WM(queueid, private_rate[queueid]/granularity_ ); //getNextNode(); // } #ifdef DEBUG_SRR printf("deque a packet, id=%d, size=%dn", queueid, last_size); #endif last_queueid=queueid; return pkt; } else { pP->turn=0;    // pwmCurr should be adjusted. getNextNode( ); pkt=0; } } return 0;    // not reached } void SRR::clear() { PacketSRR *q =srr; int i = maxqueuenumber_; if (!q) return; while (i--) { if (q->pkts) { fprintf(stderr, "Changing non-empty flow from srrn"); exit(1); } ++q; } } // weight is the queues rate/granularity. int SRR::add_to_WM(int queueid, int weight) { struct wm_node *pNode; int i; //int j=maxColumn; //int temp=weight; //int flag=0; int old_colno = currMaxColumn; if(weight==0)
  8. { fprintf(stderr, "add_to_WM: weight should not be zero"); exit(1); } if(weight > ( (1<<(maxColumn+1))-1) )
  9. { fprintf(stderr, "add_to_WM: weight too big"); exit(1); }  // add to the WM // adjust currMaxColumn // if it is a first active flow, put the pwmPtr for(i=maxColumn; i>=0; i--)
  10. { if (weight & (1<<i) )
  11. {  
  12. //  // add to queueid= i; wmHead[queueid], wmTail[queueid] pNode=(struct wm_node*)malloc(sizeof(struct wm_node));
  13. if(pNode==NULL)
  14. { fprintf(stderr, "no memeory to create WM node"); exit(2); } pNode->queueid = queueid; pNode->weight = i; pNode->sibling = NULL; if(pRowTail[queueid] == NULL){ pRowHead[queueid]= pRowTail[queueid] = pNode; }else{ pRowTail[queueid]->sibling = pNode; pRowTail[queueid] = pNode; } if( pwmCurr && (pwmCurr->weight == i) ){ pNode-> prev =  pwmCurr->prev; pNode-> next =  pwmCurr; (pwmCurr->prev)->next = pNode; pwmCurr->prev = pNode; }else { pNode->prev = wmTail[i].prev; pNode->next = &wmTail[i]; (wmTail[i].prev)->next = pNode; wmTail[i].prev = pNode; } if(currMaxColumn < i) currMaxColumn = i; // adjust the current max column number if(wmEmptyFlag == 1)
  15. { wmEmptyFlag=0; if(pwmCurr == NULL) // we should let it points to the correct place.
  16. pwmCurr=pNode; } } }
  17. if (  old_colno < currMaxColumn )
  18. { if(old_colno >= 0){ // if(old_colno > 0){ int pc = wss.get_ptr () + 1;
  19. pc = pc << (currMaxColumn - old_colno);
  20. wss.set_ptr ( pc -1);
  21. // printf("set_ptr in add_to_wm: ptr:%dn", pc-1); // printf("old column no: %d %dn", old_colno, currMaxColumn); } } //printf("in add_to_wm: k:%d, j:%dn", currMaxColumn, old_colno); ++flwcnt; return 0; } //remove the wm_node from the links and free the memory int SRR::del_from_WM(int queueid, int weight)
  22. { struct wm_node *pNode, *pNode2; int i;
  23. int wss_term; 
  24. int temp; 
  25. if(pwmCurr->queueid==queueid)  // we adjust pwmCurr before we delete the row
  26. {  if (pwmCurr->next != &wmTail[pwmCurr->weight])
  27. pwmCurr = pwmCurr->next ;
  28. else pwmCurr = NULL; } /* for(i=0;i<=currMaxColumn;i++) // travel all double links, and delete the node whose id is queueid
  29. {  pNode=wmHead[i].next; while(pNode!=&wmTail[i]){ if(pNode->queueid==queueid){ //yes, we get one. //remove it from the link, and free the node. (pNode->prev)->next=pNode->next; (pNode->next)->prev=pNode->prev; free(pNode); break; } else pNode=pNode->next; } } */ pNode = pRowHead[queueid]; while (pNode){ (pNode->prev)->next=pNode->next; (pNode->next)->prev=pNode->prev; pNode2= pNode; pNode = pNode->sibling; free(pNode2); } pRowHead[queueid] = pRowTail[queueid] = NULL; // we should adjust currMaxColumn,  //and if currMaxColumn becomes less, we should also adjust WSS's pointer
  30. int old_colno = currMaxColumn;
  31. for(i=currMaxColumn;i>=0;i--)
  32. { if(wmHead[i].next != &wmTail[i])
  33. {   currMaxColumn=i;   break; } } if(i<0)
  34. { // it is empty now. #ifdef DEBUG_SRR //printf("WM emptyn"); #endif wmEmptyFlag=1; currMaxColumn=-1; pwmCurr=NULL; last_queueid=-1;
  35. wss.set_ptr (0); // reset the WSS sequence pointer to 0; goto rr; } else if ( currMaxColumn < old_colno)
  36. {
  37. int pc = wss.get_ptr () +1;
  38. //printf("pc = %d n", pc); int mul = 1 << (old_colno - currMaxColumn ); 
  39. int tmpc = pc / mul;
  40. if (pc % mul){
  41. tmpc += 1;
  42. if (tmpc > ((1<<(currMaxColumn+1) )- 1) )
  43. tmpc = 1;
  44. }
  45. wss.set_ptr (tmpc -1);
  46. //printf("set_ptr in del_from__wm: ptr:%dn", tmpc-1); //printf("k:%d j:%dn", currMaxColumn, old_colno);
  47. } if (pwmCurr == NULL){
  48. loop:
  49. wss_term=wss.get(currMaxColumn+1);
  50. wss.inc_ptr (currMaxColumn+1 );
  51. temp = currMaxColumn+1-wss_term;
  52.  
  53. // try to point pwmCurr to the right place.
  54. if( wmHead[temp].next != &wmTail[temp])
  55. {
  56. pwmCurr=wmHead[temp].next;
  57. }else
  58. goto loop;
  59. }
  60. rr: --flwcnt; return 0; } /* should be checked:  * whether it returns nothing when the WM is empty  */ struct wm_node *SRR::getNextNode(){   //struct wm_node *pNode;   int queueid;   int weight;   int wss_term;   int temp; if(bytecnt==0){ // printf("getNextNode, pwmCurr = NULL, wmEmptyFlag=%dn", wmEmptyFlag);
  61. return NULL; } queueid=pwmCurr->queueid; weight= pwmCurr->weight; if(pwmCurr->next != &wmTail[weight]){ // not the tail  pwmCurr=pwmCurr ->next; } else{ if (currMaxColumn==-1){ fprintf(stderr, "getNextNode, empty WM"); exit(0); } again:
  62. wss_term=wss.get(currMaxColumn+1);
  63. wss.inc_ptr (currMaxColumn+1 );
  64. temp = currMaxColumn+1-wss_term;
  65. // wss_term according to currMaxColumn+1-wss_term queue   if( wmHead[temp].next != &wmTail[temp]) { pwmCurr=wmHead[temp].next; }else goto again; } return pwmCurr; } int getOrder(int i){ int order=0; while( (1 << order)<= i) order++; return (order-1); } /*  *Allows one to change   * blimit_    * maxqueuenumber_   * mtu_  * granularity_  * for a particular srrQ :  *  *  */ int SRR::command(int argc, const char*const* argv) { if (argc==3) { if (strcmp(argv[1], "blimit") == 0) { blimit_ = atoi(argv[2]); if (bytecnt > blimit_) { fprintf (stderr,"More packets in buffer than the new limit"); exit (1); } return (TCL_OK); } if (strcmp(argv[1], "maxqueuenumber") == 0) { //clear(); maxqueuenumber_ = atoi(argv[2]); return (TCL_OK); } if (strcmp(argv[1],"mtu")==0) { mtu_= atoi(argv[2]); return (TCL_OK); } if (strcmp(argv[1], "granularity")==0) { granularity_ = atoi(argv[2]); return (TCL_OK); } } if (argc == 4) { if(!strcmp(argv[1],"setrate"))  { int rate; int queue,success=0; int temp; success += sscanf(argv[2],"%d",&queue); success += sscanf(argv[3],"%d",&rate); if(success!=2){ fprintf(stderr, "SRR setrate ??"); exit(0); exit(1); } if ( queue>MAXQUEUE ) { fprintf(stderr,"queue id out of range"); exit(0); } min_quantum= min_quantum<rate ? min_quantum:rate; private_rate[queue]=rate; if(private_rate[queue]<minRate) private_rate[queue]=minRate; if(private_rate[queue]>maxRate){ fprintf(stderr, "Rate too hight!n"); exit(1); } success=private_rate[queue]/granularity_; if(success==0) success=1; /*if(private_rate[queue]%granularity_) success++;*/   temp=getOrder(success);  //now we have the order of the WSS if(maxColumn<temp){ maxColumn=temp; } #ifdef DEBUG_SRR printf("maxColumn=%dn", maxColumn), fflush(0); #endif if(maxColumn>(MAXWSSORDER-1)){ //the order of max band flow is too big! fprintf(stderr, "granularity too small or band too big!"); exit(2); } return (TCL_OK); } else if(!strcmp(argv[1],"setqueue")) { int queue,flow,success=0; success += sscanf(argv[2],"%d",&flow); success += sscanf(argv[3],"%d",&queue); if(success==2) { if ( !(queue<MAXQUEUE) ) {                                         fprintf(stderr,"queue id out of range");exit(1);                                 } if ( !(flow<MAXFLOW) ) {                                         fprintf(stderr,"flow id out of range"); exit(1);                                 } f2q[flow]=queue; return TCL_OK; } } } return (Queue::command(argc, argv)); }