pgm-receiver.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:23k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /*
  2.  * pgm-receiver.cc
  3.  * Copyright (C) 2001 by the University of Southern California
  4.  * $Id: pgm-receiver.cc,v 1.8 2006/02/21 15:20:19 mahrenho Exp $
  5.  *
  6.  * This program is free software; you can redistribute it and/or
  7.  * modify it under the terms of the GNU General Public License,
  8.  * version 2, as published by the Free Software Foundation.
  9.  *
  10.  * This program is distributed in the hope that it will be useful,
  11.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.  * GNU General Public License for more details.
  14.  *
  15.  * You should have received a copy of the GNU General Public License along
  16.  * with this program; if not, write to the Free Software Foundation, Inc.,
  17.  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  18.  *
  19.  *
  20.  * The copyright of this module includes the following
  21.  * linking-with-specific-other-licenses addition:
  22.  *
  23.  * In addition, as a special exception, the copyright holders of
  24.  * this module give you permission to combine (via static or
  25.  * dynamic linking) this module with free software programs or
  26.  * libraries that are released under the GNU LGPL and with code
  27.  * included in the standard release of ns-2 under the Apache 2.0
  28.  * license or under otherwise-compatible licenses with advertising
  29.  * requirements (or modified versions of such code, with unchanged
  30.  * license).  You may copy and distribute such a system following the
  31.  * terms of the GNU GPL for this module and the licenses of the
  32.  * other code concerned, provided that you include the source code of
  33.  * that other code when and as the GNU GPL requires distribution of
  34.  * source code.
  35.  *
  36.  * Note that people who make modified versions of this module
  37.  * are not obligated to grant this special exception for their
  38.  * modified versions; it is their choice whether to do so.  The GNU
  39.  * General Public License gives permission to release a modified
  40.  * version without this exception; this exception also makes it
  41.  * possible to release a modified version which carries forward this
  42.  * exception.
  43.  *
  44.  */
  45. /*
  46.  * Pragmatic General Multicast (PGM), Reliable Multicast
  47.  *
  48.  * pgm-receiver.cc
  49.  *
  50.  * This implements the Receiving PGM agent, "Agent/PGM/Receiver".
  51.  *
  52.  * Ryan S. Barnett, 2001
  53.  * rbarnett@catarina.usc.edu
  54.  */
  55. #include "config.h"
  56. #ifdef HAVE_STL
  57. #include <stdlib.h>
  58. #include <stdio.h>
  59. #include <map>
  60. #include "config.h"
  61. #include "tclcl.h"
  62. #include "agent.h"
  63. #include "packet.h"
  64. #include "ip.h"
  65. #include "random.h"
  66. #include "basetrace.h"
  67. #include "rcvbuf.h"
  68. #include "pgm.h"
  69. // ************************************************************
  70. // Define the PGM Receive Timer Class
  71. // ************************************************************
  72. class PgmReceiver;
  73. // Types of timers.
  74. enum {
  75.   NAK_TIMER = 0
  76. };
  77. class PgmReceiverTimer : public TimerHandler {
  78. public:
  79.   PgmReceiverTimer(PgmReceiver *a, int type) : TimerHandler(), data_(NULL) {
  80.     a_ = a;
  81.     type_ = type;
  82.   }
  83.   void * & data() { return data_; }
  84. protected:
  85.   virtual void expire(Event *e);
  86.   PgmReceiver *a_;
  87.   int type_;
  88.   void *data_;
  89. };
  90. // ************************************************************
  91. // Define the NakItem Class
  92. // ************************************************************
  93. // The different states of a NakItem entry.
  94. enum {
  95.   BACK_OFF_STATE = 0,
  96.   WAIT_NCF_STATE = 1,
  97.   WAIT_DATA_STATE = 2
  98. };
  99. class NakItem {
  100. public:
  101.   NakItem(PgmReceiver *a, int seqno) : nak_state_(BACK_OFF_STATE),
  102.        nak_sent_(false), seqno_(seqno), ncf_retry_count_(0),
  103.        data_retry_count_(0), nak_timer_(a, NAK_TIMER) { }
  104.   int & nak_state() { return nak_state_; }
  105.   bool & nak_sent() { return nak_sent_; }
  106.   int & seqno() { return seqno_; }
  107.   int & ncf_retry_count() { return ncf_retry_count_; }
  108.   int & data_retry_count() { return data_retry_count_; }
  109.   PgmReceiverTimer & nak_timer() { return nak_timer_; }
  110. protected:
  111.   // The current state of the NAK entry.
  112.   int nak_state_;
  113.   // Whether or not this NAK has been transmitted once or more.
  114.   bool nak_sent_;
  115.   // Sequence number of the missing NAK item.
  116.   int seqno_;
  117.   // Number of times we have sent out a NAK but timed out waiting for an NCF.
  118.   int ncf_retry_count_;
  119.   // Number of times we got an NCF but timed out waiting for RDATA/ODATA.
  120.   int data_retry_count_;
  121.   PgmReceiverTimer nak_timer_;
  122. };
  123. // ************************************************************
  124. // Define the PGM Receiver Class
  125. // ************************************************************
  126. static int pgm_rcv_uid_ = 0;
  127. struct Stats {
  128.   // Number of naks that did NOT get sent because we received an NCF
  129.   // before our timer went up.
  130.   int naks_transmitted_;
  131.   // Number of duplicate naks that were sent to the upstream node.
  132.   int naks_duplicated_;
  133. };
  134. class PgmReceiver: public Agent {
  135. public:
  136.   PgmReceiver();
  137.   virtual void recv(Packet *, Handler *);
  138.   virtual void timeout(int type, void *data);
  139.   virtual int command(int argc, const char*const* argv);
  140. protected:
  141.   void handle_spm(Packet *pkt);
  142.   void handle_odata(Packet *pkt);
  143.   void handle_rdata(Packet *pkt);
  144.   void handle_nak(Packet *pkt);
  145.   void handle_ncf(Packet *pkt);
  146.   void generate_Nak(int seqno);
  147.   void cancel_Nak(int seqno, NakItem *nitem = NULL);
  148.   void timeout_nak(NakItem *data);
  149.   void send_nak(int seqno);
  150.   void print_stats();
  151.   void display_packet(Packet *pkt); // For debugging.
  152.   void trace_event(char *evType, double evTime);
  153.   EventTrace * et_;  //Trace Object for Custom Event Trace
  154.   char uname_[16]; // Unique PGM receiver name, for debugging.
  155.   // Various statistical information.
  156.   Stats stats_;
  157.   // Maximum number of times we can send out a NAK and time-out waiting for
  158.   // an NCF reply. Once we hit this many times, we discard the NAK state
  159.   // entirely and loose data.
  160.   int max_nak_ncf_retries_;
  161.   // Maximum number of times we can time-out waiting for RDATA after an
  162.   // NCF confirmation for a NAK request.  Once we hit this many times, we
  163.   // discard the NAK state entirely and loose data.
  164.   int max_nak_data_retries_;
  165.   // A random amount of this time period will be selected to wait for an
  166.   // NCF after detecting a gap in the data stream, before sending out a NAK.
  167.   double nak_bo_ivl_;
  168.   // The amount of time to wait for a NCF packet after sending out a NAK
  169.   // packet to the upstream node.
  170.   double nak_rpt_ivl_;
  171.   // The amount of time to wait for RDATA after receiving an NCF confirmation
  172.   // for a given NAK.
  173.   double nak_rdata_ivl_;
  174.   // Whether or not the tsi/upstream_node/upstream_face are valid, i.e. have
  175.   // we received at least one SPM packet for the session.
  176.   bool have_tsi_state_;
  177.   int spm_seqno_; // Last largest received SPM sequence number.
  178.   ns_addr_t tsi_; // Transport Session ID
  179.   ns_addr_t upstream_node_; // Address of upstream PGM router.
  180.   int upstream_iface_; // Interface of upstream PGM router.
  181.   // Source and group of ODATA/RDATA packets. Used when sending NAK messages.
  182.   ns_addr_t source_;
  183.   ns_addr_t group_;
  184.   // Keep track of received packets and collect various statistics.
  185.   RcvBuffer rcvbuf_;
  186.   // Collection of sequence numbers that we are waiting for RDATA/ODATA.
  187.   map<int, NakItem> naks_;
  188. };
  189. static class PgmReceiverClass : public TclClass {
  190. public:
  191.   PgmReceiverClass() : TclClass("Agent/PGM/Receiver") {}
  192.   TclObject * create(int argc, const char * const * argv) {
  193.     return (new PgmReceiver());
  194.   }
  195. } class_pgm_receiver;
  196. void PgmReceiverTimer::expire(Event *e) {
  197.   a_->timeout(type_, data_);
  198. }
  199. // Constructor.
  200. PgmReceiver::PgmReceiver() : Agent(PT_PGM), have_tsi_state_(false),
  201.      spm_seqno_(-1)
  202. {
  203.   stats_.naks_transmitted_ = 0;
  204.   stats_.naks_duplicated_ = 0;
  205.   sprintf (uname_, "pgmRecv-%d", pgm_rcv_uid_++);
  206.   bind("max_nak_ncf_retries_", &max_nak_ncf_retries_);
  207.   bind("max_nak_data_retries_", &max_nak_data_retries_);
  208.   bind_time("nak_bo_ivl_", &nak_bo_ivl_);
  209.   bind_time("nak_rpt_ivl_", &nak_rpt_ivl_);
  210.   bind_time("nak_rdata_ivl_", &nak_rdata_ivl_);
  211.   et_ = (EventTrace *) NULL;
  212. }
  213. // Code to execute when a packet is received.
  214. void PgmReceiver::recv(Packet *pkt, Handler *)
  215. {
  216.   hdr_pgm *hp = HDR_PGM(pkt);
  217.   hdr_cmn *hc = HDR_CMN(pkt);
  218.   if (hc->ptype_ != PT_PGM) {
  219.     printf("%s ERROR (PgmReceiver::recv): received non PGM pkt type %d, discarding.n", uname_, hc->ptype_);
  220.     Packet::free(pkt);
  221.     return;
  222.   }
  223. #ifdef PGM_DEBUG
  224.   display_packet(pkt);
  225. #endif
  226.   switch(hp->type_) {
  227.   case PGM_SPM:
  228.     handle_spm(pkt);
  229.     break;
  230.   case PGM_ODATA:
  231.     handle_odata(pkt);
  232.     break;
  233.   case PGM_RDATA:
  234.     handle_rdata(pkt);
  235.     break;
  236.   case PGM_NAK:
  237.     // We only receive a NAK if it is multicast from another receiver
  238.     // who is not directly connected to a PGM router.
  239.     handle_nak(pkt);
  240.     break;
  241.   case PGM_NCF:
  242.     handle_ncf(pkt);
  243.     break;
  244.   default:
  245.     printf("ERROR (PgmReceiver::recv): Received invalid PGM type %d.n",
  246.    hp->type_);
  247.     break;
  248.   }
  249.   Packet::free(pkt);
  250. }
  251. // Code to execute when a timeout occurs.
  252. void PgmReceiver::timeout(int type, void *data)
  253. {
  254.   switch(type) {
  255.   case NAK_TIMER:
  256.     timeout_nak((NakItem *) data);
  257.     break;
  258.   default:
  259.     printf("ERROR (PgmReceiver::timeout): Unknown timeout type %d.n", type);
  260.     break;
  261.   }
  262. }
  263. // Called when a TCL command is issued to the PGM Receiver object.
  264. int PgmReceiver::command(int argc, const char*const* argv)
  265. {
  266.   //  Tcl& tcl = Tcl::instance();
  267.   if (argc == 2) {
  268.     if (strcmp(argv[1], "print-stats") == 0) {
  269.       print_stats();
  270.       return (TCL_OK);
  271.     }
  272.   }
  273.   else if (argc == 3) {
  274.     if (strcmp(argv[1], "eventtrace") == 0) {
  275.       et_ = (EventTrace *)TclObject::lookup(argv[2]);
  276.       return (TCL_OK);
  277.     }
  278.   }
  279.   return (Agent::command(argc, argv));
  280. }
  281. void PgmReceiver::trace_event(char *evType, double evTime) {
  282.   if (et_ == NULL) return;
  283.   char *wrk = et_->buffer();
  284.   char *nwrk = et_->nbuffer();
  285.   if (wrk != NULL) {
  286.     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
  287.             et_->round(Scheduler::instance().clock()),   
  288.             addr(),                    
  289.             0,                   
  290.             evType,                  
  291. evTime);
  292.   if (nwrk != 0)
  293.     sprintf(nwrk,
  294. "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
  295. et_->round(Scheduler::instance().clock()),   // time
  296. evType,                    // event type
  297. addr(),                       // owner (src) node id
  298. port(),                       // owner (src) port id
  299. 0,                      // dst node id
  300. 0                       // dst port id
  301. );
  302. et_->dump();
  303.   }
  304. }
  305. void PgmReceiver::handle_spm(Packet *pkt)
  306. {
  307.   hdr_cmn *hc = HDR_CMN(pkt);
  308.   hdr_ip *hip = HDR_IP(pkt);
  309.   hdr_pgm *hp = HDR_PGM(pkt);
  310.   hdr_pgm_spm *hps = HDR_PGM_SPM(pkt);
  311.   if (have_tsi_state_ == false) {
  312.     // First SPM message.
  313.     have_tsi_state_ = true;
  314.     // Set the TSI.
  315.     tsi_ = hp->tsi_;
  316.     // Set the source and group addresses for this TSI.
  317.     source_ = hip->src();
  318.     group_ = hip->dst();
  319.   }
  320.   else {
  321.     // Check that the TSI is correct.
  322.     if (!(hp->tsi_.isEqual (tsi_))) {
  323.       printf("%s Received SPM with incorrect TSI, discarding.n", uname_);
  324.       return;
  325.     }
  326.     // Check that the sequence number is newer than a previous SPM message.
  327.     if (hp->seqno_ <= spm_seqno_) {
  328.       printf("%s received an old SPM seqno, discarding.n", uname_);
  329.       return;
  330.     }
  331.   }
  332.   // Set the initial sequence number.
  333.   spm_seqno_ = hp->seqno_;
  334.   // Set the upstream node.
  335.   upstream_node_ = hps->spm_path_;
  336.   // Set the upstream interface.
  337.   upstream_iface_ = hc->iface();
  338. }
  339. void PgmReceiver::handle_odata(Packet *pkt)
  340. {
  341.   hdr_pgm *hp = HDR_PGM(pkt);
  342.   // Check that the TSI is correct.
  343.   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
  344.     printf("PGM Receiver received ODATA with incorrect TSI, discarding.n");
  345.     return;
  346.   }
  347.   double clock = Scheduler::instance().clock();
  348.   if (rcvbuf_.nextpkt_ < hp->seqno_) {
  349.     int lo = rcvbuf_.nextpkt_;
  350.     int hi = hp->seqno_ - 1;
  351.     for (int i = lo; i <= hi; i++) {
  352.       printf("%s detected loss of seq %dn", uname_, i);
  353.       if (have_tsi_state_ == false) {
  354. printf("%s has no TSI/SPM state when lost packet was detected. This results in unrecoverable data loss.n", uname_);
  355.       }
  356.       else {
  357. generate_Nak(i);
  358.       }
  359.     }
  360.   }
  361.   rcvbuf_.add_pkt(hp->seqno_, clock);
  362.   // Recept of ODATA that came in late could cancel a previously
  363.   // generated NAK.
  364.   cancel_Nak(hp->seqno_, NULL);
  365. }
  366. void PgmReceiver::handle_rdata(Packet *pkt)
  367. {
  368.   hdr_pgm *hp = HDR_PGM(pkt);
  369.   // Check that the TSI is correct.
  370.   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
  371.     printf("%s received RDATA with incorrect TSI, discarding.n", uname_);
  372.     return;
  373.   }
  374.   if ( (rcvbuf_.nextpkt_ > hp->seqno_) && !rcvbuf_.exists_pkt(hp->seqno_) ) {
  375.     // The receive state may or may not exist, depending on how late the
  376.     // RDATA packet came back to this node.
  377.     cancel_Nak(hp->seqno_, NULL);
  378.   }
  379.   double clock = Scheduler::instance().clock();
  380.   rcvbuf_.add_pkt (hp->seqno_, clock);
  381. }
  382. // The receiver will receive a NAK only if it is sent as a multicast from
  383. // another receiver in the event that the other receiver is not directly
  384. // connected to a PGM router.
  385. void PgmReceiver::handle_nak(Packet *pkt)
  386. {
  387.   hdr_pgm *hp = HDR_PGM(pkt);
  388.   // Check that the TSI is correct.
  389.   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
  390.     printf("%s received NAK with incorrect TSI, discarding.n", uname_);
  391.     return;
  392.   }
  393.   // Locate the nak state for the given multicast NAK.
  394.   map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
  395.   if (result == naks_.end()) {
  396.     // No state was found. Discard the NCF.
  397.     printf("%s received multicast NAK but no NAK state found, discarding.n", uname_);
  398.     return;
  399.   }
  400.   NakItem *nitem = &((*result).second);
  401.   switch( nitem->nak_state() ) {
  402.   case BACK_OFF_STATE:
  403.     // Move to WAIT_NCF_STATE.
  404.     nitem->nak_state() = WAIT_NCF_STATE;
  405.     // Reset the timer.
  406.     nitem->nak_timer().resched(nak_rpt_ivl_);
  407.     break;
  408.   case WAIT_NCF_STATE:
  409.     // Stay in the same state.
  410.     // Reset the timer.
  411.     nitem->nak_timer().resched(nak_rpt_ivl_);
  412.     break;
  413.   case WAIT_DATA_STATE:
  414.     // Stay in the same state.
  415.     // Reset the timer.
  416.     nitem->nak_timer().resched(nak_rdata_ivl_);
  417.     break;
  418.   default:
  419.     printf("ERROR (PgmReceiver::handle_nak): Unknown nak state %d.n", nitem->nak_state());
  420.     break;
  421.   }
  422. }
  423. void PgmReceiver::handle_ncf(Packet *pkt)
  424. {
  425.   hdr_pgm *hp = HDR_PGM(pkt);
  426.   // Check that the TSI is correct.
  427.   if ( (have_tsi_state_ == true) && !(hp->tsi_.isEqual (tsi_)) ) {
  428.     printf("%s received NCF with incorrect TSI, discarding.n", uname_);
  429.     return;
  430.   }
  431.   // Check that the NCF came from our uplink interface.
  432.   // Causes a problem because iface doesn't get relabeled.  But this isn't
  433.   // needed if every receiver also has a PGM/Agent running on the node.
  434.   /*
  435.   if (hc->iface() != upstream_iface_) {
  436.     printf("%s received NCF from non-upstream interface, discarding. Upstream_iface = %dn", uname_, upstream_iface_);
  437.     return;
  438.   }
  439.   */
  440.   // Locate the nak state for the given NCF.
  441.   map<int, NakItem>::iterator result = naks_.find(hp->seqno_);
  442.   if (result == naks_.end()) {
  443.     // No state was found. Discard the NCF.
  444.     printf("%s received NCF but no NAK state found, discarding.n", uname_);
  445.     return;
  446.   }  
  447.   NakItem *nitem = &((*result).second);
  448.   switch( nitem->nak_state() ) {
  449.   case BACK_OFF_STATE:
  450.     // Move to WAIT_DATA_STATE.
  451.     nitem->nak_state() = WAIT_DATA_STATE;
  452.     // Reset the timer.
  453.     nitem->nak_timer().resched(nak_rdata_ivl_);
  454.     break;
  455.   case WAIT_NCF_STATE:
  456.     // Move to WAIT_DATA_STATE.
  457.     nitem->nak_state() = WAIT_DATA_STATE;
  458.     // Reset the timer.
  459.     nitem->nak_timer().resched(nak_rdata_ivl_);
  460.     break;
  461.   case WAIT_DATA_STATE:
  462.     // Stay in the same state.
  463.     // Reset the timer.
  464.     nitem->nak_timer().resched(nak_rdata_ivl_);
  465.     
  466.     break;
  467.   default:
  468.     printf("ERROR (PgmReceiver::handle_ncf): Unknown nak state %d.n", nitem->nak_state());
  469.     return;
  470.   }
  471. }
  472. void PgmReceiver::generate_Nak(int seqno)
  473. {
  474. #ifdef PGM_DEBUG
  475.   double now = Scheduler::instance().clock();
  476.   printf("at %f %s generating NAK state for seqno %d.n", now, uname_, seqno);
  477. #endif
  478.   // Insert the given sequence number into the nak map.
  479.   pair<map<int, NakItem>::iterator, bool> result;
  480.   result = naks_.insert(pair<int, NakItem>(seqno, NakItem(this, seqno)));
  481.   NakItem *nitem = &(result.first->second);
  482.   if (result.second == true) {
  483.     // New NAK entry was added. Select a backoff time period over nak_bo_ivl_.
  484.     double backoff = Random::uniform(nak_bo_ivl_);
  485.     // Set the data field of the nak timer.
  486.     nitem->nak_timer().data() = nitem;
  487.     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
  488.     nitem->nak_timer().resched(backoff);
  489.     printf("backoff: %fn", backoff);
  490.     trace_event("DETECT", backoff);  //Detected Loss, will send NACK after backoff
  491.   }
  492.   else {
  493.     printf("%s generate_Nak was called with NAK state already established, ignoring.n", uname_);
  494.   }
  495. }
  496. void PgmReceiver::cancel_Nak(int seqno, NakItem *nitem)
  497. {
  498.   if (nitem == NULL) {
  499.     // Look up the sequence number in the nak map.
  500.     map<int, NakItem>::iterator result = naks_.find(seqno);
  501.     if (result == naks_.end()) {
  502.       // The NAK state was not found. This is fine since the handle_odata()
  503.       // function calls cancel_Nak on all packets in case 
  504.       return;
  505.     }
  506.     nitem = &((*result).second);
  507.   }
  508.   // Cancel the NAK timer.
  509.   nitem->nak_timer().force_cancel();
  510.   // Erase the item from the Nak map.
  511.   if (!naks_.erase(seqno)) {
  512.     printf("ERROR (PgmReceiver::cancel_Nak): Failed erasing seqno from nak map.n");
  513.   }
  514. }
  515. void PgmReceiver::timeout_nak(NakItem *nitem)
  516. {
  517.   double backoff;
  518.   switch(nitem->nak_state()) {
  519.   case BACK_OFF_STATE:
  520.     if (nitem->nak_sent() == false) {
  521.       nitem->nak_sent() = true;
  522.     }
  523.     else {
  524.       stats_.naks_duplicated_++;
  525.     }
  526.     // Move into WAIT_NCF_STATE.
  527.     nitem->nak_state() = WAIT_NCF_STATE;
  528.     // Set new timer to go off.
  529.     nitem->nak_timer().resched(nak_rpt_ivl_);
  530.     send_nak(nitem->seqno());
  531.     break;
  532.   case WAIT_NCF_STATE:
  533.     // If we have exceeded the number of times we can retry this NAK,
  534.     // then cancel.
  535.     if (nitem->ncf_retry_count() > max_nak_ncf_retries_) {
  536.       // Cancel this NAK generation. Remove all state associated with the
  537.       // NAK, we have unrecoverable data loss.
  538.       printf("%s reached max_nak_ncf_retries, stopping NAK generation.n", uname_);
  539.       cancel_Nak(nitem->seqno(), nitem);
  540.       return;
  541.     }
  542.     nitem->ncf_retry_count() += 1;
  543.     // Move into BACK_OFF_STATE
  544.     nitem->nak_state() = BACK_OFF_STATE;
  545.     // Set timer to go off.
  546.     backoff = Random::uniform(nak_bo_ivl_);
  547.     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
  548.     nitem->nak_timer().resched(backoff);
  549.     break;
  550.   case WAIT_DATA_STATE:
  551.     // Exceeded the number of times we wait for RDATA for this confirmed NAK?
  552.     if (nitem->data_retry_count() > max_nak_data_retries_) {
  553.       printf("%s reached max_nak_data_retries, stopping NAK generation.n", uname_);
  554.       cancel_Nak(nitem->seqno(), nitem);
  555.       return;
  556.     }
  557.     nitem->data_retry_count() += 1;
  558.     // Move into BACK_OFF_STATE
  559.     nitem->nak_state() = BACK_OFF_STATE;
  560.     // Set timer to go off.
  561.     backoff = Random::uniform(nak_bo_ivl_);
  562.     // Set the NAK timer to expire in BACK_OFF_STATE with the selected time.
  563.     nitem->nak_timer().resched(backoff);
  564.     break;
  565.   default:
  566.     printf("ERROR (PgmReceiver::timeout_nak): Unknown NAK state %d.n", nitem->nak_state());
  567.     break;
  568.   }
  569. }
  570. void PgmReceiver::send_nak(int seqno)
  571. {
  572.   printf("%s send_nak is called.n", uname_);
  573.   Packet *nak_pkt = allocpkt();
  574.   // Set the simulated size of the NAK packet.
  575.   hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
  576.   nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
  577.   nak_hc->ptype_ = PT_PGM;
  578.   // Set the destination address to be our upstream node.
  579.   hdr_ip *nak_hip = HDR_IP(nak_pkt);
  580.   nak_hip->dst() = upstream_node_;
  581.   // Set the color for NAK packets in nam.
  582.   nak_hip->fid_ = 8;
  583.   // Fill in the PGM header for the NAK packet.
  584.   hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
  585.   nak_hp->type_ = PGM_NAK;
  586.   nak_hp->tsi_ = tsi_;
  587.   nak_hp->seqno_ = seqno;
  588.   // Fill in the PGM NAK header for the NAK packet.
  589.   hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
  590.   nak_hpn->source_ = source_;
  591.   nak_hpn->group_ = group_;
  592.   // Increment the statistical counter that keeps track of the number
  593.   // of naks transmitted.
  594.   stats_.naks_transmitted_++;
  595.   // Send out the packet.
  596.   send(nak_pkt, 0);
  597.   // TBA: Send out the NAK packet to multicast with TTL 1 if the uplink
  598.   //      PGM router is not directly connected to this node.
  599. }
  600. void PgmReceiver::print_stats()
  601. {
  602.   printf("%s:n", uname_);
  603.   printf("tLast packet:tt%dn", rcvbuf_.nextpkt_-1);
  604.   printf("tMax packet:tt%dn", rcvbuf_.maxpkt_);
  605.   if (rcvbuf_.pkts_recovered_) {
  606.     printf("tPackets recovered:t%dn", rcvbuf_.pkts_recovered_);
  607.     printf("tLatency (min, max, avg):t%f, %f, %fn",
  608.    rcvbuf_.min_delay_, rcvbuf_.max_delay_,
  609.    rcvbuf_.delay_sum_ / rcvbuf_.pkts_recovered_);
  610.   }
  611.   if (rcvbuf_.duplicates_) {
  612.     printf("tDuplicate RDATA:t%dn", rcvbuf_.duplicates_);
  613.   }
  614.   printf("tTotal NAKs sent:t%dn", stats_.naks_transmitted_);
  615.   printf("tRetransmitted NAKs:t%dn", stats_.naks_duplicated_);
  616. }
  617. #ifdef PGM_DEBUG
  618. void PgmReceiver::display_packet(Packet *pkt)
  619. {
  620.   double now = Scheduler::instance().clock();
  621.   hdr_ip *hip = HDR_IP(pkt);
  622.   hdr_cmn *hc = HDR_CMN(pkt);
  623.   printf("at %f %s received packet type ", now, uname_);
  624.   hdr_pgm *hp = HDR_PGM(pkt);
  625.   
  626.   hdr_pgm_spm *hps;
  627.   hdr_pgm_nak *hpn;
  628.   switch(hp->type_) {
  629.   case PGM_SPM:
  630.     hps = HDR_PGM_SPM(pkt);
  631.     printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
  632.     break;
  633.   case PGM_ODATA:
  634.     printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  635.     break;
  636.   case PGM_RDATA:
  637.     printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  638.     break;
  639.   case PGM_NAK:
  640.     hpn = HDR_PGM_NAK(pkt);
  641.     printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
  642.     break;
  643.   case PGM_NCF:
  644.     printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  645.     break;
  646.   default:
  647.     printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  648.     break;
  649.   }
  650. }
  651. #endif // PGM_DEBUG
  652. #endif //HAVE_STL