pgm-agent.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:32k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /*
  2.  * pgm-agent.cc
  3.  * Copyright (C) 2001 by the University of Southern California
  4.  * $Id: pgm-agent.cc,v 1.10 2006/02/21 15:20:19 mahrenho Exp $
  5.  *
  6.  * This program is free software; you can redistribute it and/or
  7.  * modify it under the terms of the GNU General Public License,
  8.  * version 2, as published by the Free Software Foundation.
  9.  *
  10.  * This program is distributed in the hope that it will be useful,
  11.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.  * GNU General Public License for more details.
  14.  *
  15.  * You should have received a copy of the GNU General Public License along
  16.  * with this program; if not, write to the Free Software Foundation, Inc.,
  17.  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  18.  *
  19.  *
  20.  * The copyright of this module includes the following
  21.  * linking-with-specific-other-licenses addition:
  22.  *
  23.  * In addition, as a special exception, the copyright holders of
  24.  * this module give you permission to combine (via static or
  25.  * dynamic linking) this module with free software programs or
  26.  * libraries that are released under the GNU LGPL and with code
  27.  * included in the standard release of ns-2 under the Apache 2.0
  28.  * license or under otherwise-compatible licenses with advertising
  29.  * requirements (or modified versions of such code, with unchanged
  30.  * license).  You may copy and distribute such a system following the
  31.  * terms of the GNU GPL for this module and the licenses of the
  32.  * other code concerned, provided that you include the source code of
  33.  * that other code when and as the GNU GPL requires distribution of
  34.  * source code.
  35.  *
  36.  * Note that people who make modified versions of this module
  37.  * are not obligated to grant this special exception for their
  38.  * modified versions; it is their choice whether to do so.  The GNU
  39.  * General Public License gives permission to release a modified
  40.  * version without this exception; this exception also makes it
  41.  * possible to release a modified version which carries forward this
  42.  * exception.
  43.  *
  44.  */
  45. /*
  46.  * Pragmatic General Multicast (PGM), Reliable Multicast
  47.  *
  48.  * pgm-agent.cc
  49.  *
  50.  * This implements the network element PGM agent, "Agent/PGM".
  51.  *
  52.  * Ryan S. Barnett, 2001
  53.  * rbarnett@catarina.usc.edu
  54.  */
  55. #include "config.h"
  56. #ifdef HAVE_STL
  57. #include <stdlib.h>
  58. #include <stdio.h>
  59. #include <map>
  60. #include <list>
  61. #include <algorithm>
  62. #include "config.h"
  63. #include "tclcl.h"
  64. #include "agent.h"
  65. #include "packet.h"
  66. #include "ip.h"
  67. #include "random.h"
  68. #include "basetrace.h"
  69. #include "pgm.h"
  70. // ************************************************************
  71. // Register the PGM packet headers.
  72. // ************************************************************
  73. // Declare the static header offsets.
  74. int hdr_pgm::offset_;
  75. int hdr_pgm_spm::offset_;
  76. int hdr_pgm_nak::offset_;
  77. // Register the hdr_pgm with the packet header manager.
  78. class PGMHeaderClass : public PacketHeaderClass {
  79. public:
  80.   PGMHeaderClass() : PacketHeaderClass("PacketHeader/PGM", sizeof(hdr_pgm)) {
  81.     bind_offset(&hdr_pgm::offset_);
  82.   }
  83. } class_pgmhdr;
  84. // Register the hdr_pgm_spm with the packet header manager.
  85. class PGM_SPMHeaderClass : public PacketHeaderClass {
  86. public:
  87.   PGM_SPMHeaderClass() : PacketHeaderClass("PacketHeader/PGM_SPM",
  88.    sizeof(hdr_pgm_spm)) {
  89.     bind_offset(&hdr_pgm_spm::offset_);
  90.   }
  91. } class_pgm_spmhdr;
  92. // Register the hdr_pgm_nak with the packet header manager.
  93. class PGM_NAKHeaderClass : public PacketHeaderClass {
  94. public:
  95.   PGM_NAKHeaderClass() : PacketHeaderClass("PacketHeader/PGM_NAK",
  96.    sizeof(hdr_pgm_nak)) {
  97.     bind_offset(&hdr_pgm_nak::offset_);
  98.   }
  99. } class_pgm_nakhdr;
  100. // ************************************************************
  101. // Define the PGM Agent Timer Class
  102. // ************************************************************
  103. class PgmAgent;
  104. // Different timer types.
  105. enum {
  106.   TIMER_NAK_RETRANS = 0,
  107.   TIMER_NAK_RPT = 1,
  108.   TIMER_NAK_RDATA = 2,
  109.   TIMER_NAK_ELIM = 3
  110. };
  111. class PgmAgentTimer : public TimerHandler {
  112. public:
  113.   PgmAgentTimer(PgmAgent *a, int type) : TimerHandler(), data_(NULL) {
  114.     a_ = a;
  115.     type_ = type;
  116.   }
  117.   void * &data() { return data_; }
  118. protected:
  119.   virtual void expire(Event *e);
  120.   PgmAgent *a_;
  121.   int type_;
  122.   void *data_;
  123. };
  124. // ************************************************************
  125. // Define the Repair State control block.
  126. // ************************************************************
  127. // Different repair states.
  128. enum {
  129.   NAK_PENDING = 0,
  130.   NAK_CONFIRMED = 1
  131. };
  132. class StateInfo;
  133. class RepairState {
  134. public:
  135.   RepairState(PgmAgent *a, StateInfo *sinfo, int seqno, ns_addr_t &source,
  136.       ns_addr_t &group) : seqno_(seqno), source_(source),
  137.   group_(group),
  138.         nak_state_(NAK_PENDING), nak_elimination_(true),
  139. nak_retrans_timer_(a, TIMER_NAK_RETRANS),
  140. nak_rpt_timer_(a, TIMER_NAK_RPT),
  141. nak_rdata_timer_(a, TIMER_NAK_RDATA),
  142. nak_elim_timer_(a, TIMER_NAK_ELIM),
  143.         sinfo_(sinfo)
  144.   { }
  145.   int & seqno() { return seqno_; }
  146.   ns_addr_t & source() { return source_; }
  147.   ns_addr_t & group() { return group_; }
  148.   int & nak_state() { return nak_state_; }
  149.   bool & nak_elimination() { return nak_elimination_; }
  150.   PgmAgentTimer & nak_retrans_timer() { return nak_retrans_timer_; }
  151.   PgmAgentTimer & nak_rpt_timer() { return nak_rpt_timer_; }
  152.   PgmAgentTimer & nak_rdata_timer() { return nak_rdata_timer_; }
  153.   PgmAgentTimer & nak_elim_timer() { return nak_elim_timer_; }
  154.   list<int> & iface_list() { return iface_list_; }
  155.   list<NsObject *> & agent_list() { return agent_list_; }
  156.   StateInfo * & sinfo() { return sinfo_; }
  157. protected:
  158.   // Which sequence number is being requested for repair.
  159.   int seqno_;
  160.   ns_addr_t source_; // Original source of ODATA for the repair.
  161.   ns_addr_t group_; // The multicast group.
  162.   int nak_state_; // Present repair block state.
  163.   // Indicates whether or not we are to discard incoming NAK packets
  164.   // once a previous NAK is outstanding (got NCF, waiting for RDATA). 
  165.   // (See 7.4 of PGM specification) By default we do. When nak_elim_timer_
  166.   // expires, then we do not.
  167.   bool nak_elimination_;
  168.   // This timer controls sending retransmissions of NAK packets.
  169.   PgmAgentTimer nak_retrans_timer_;
  170.   // Timer that measures how long we can repeat NAK packets while waiting
  171.   // for NCF confirmation. Once expires, the repair state is removed.
  172.   PgmAgentTimer nak_rpt_timer_;
  173.   // Timer that is triggered waiting for RDATA for a given NAK seqno,
  174.   // provided that NAK has been confirmed by an NCF.  Only gets set
  175.   // once NCF is received.
  176.   PgmAgentTimer nak_rdata_timer_;
  177.   // Timer that is triggered when we disable nak_elimination_, allowing
  178.   // a duplicate NAK to be processed. This occurs after a previous
  179.   // NAK has been confirmed with an NCF, but before the RDATA has been
  180.   // received.  This timer gets set when we receive an NCF for a pending
  181.   // NAK.
  182.   PgmAgentTimer nak_elim_timer_;
  183.   // List of interfaces upon which the RDATA will be sent to.
  184.   list<int> iface_list_;
  185.   // List of agents upon which the RDATA will be sent to.
  186.   list<NsObject *> agent_list_;
  187.   // Back-pointer to the state information block that is holding this
  188.   // repair data.  We use this so we can get the upstream_path and the TSI.
  189.   StateInfo *sinfo_;
  190. };
  191. // ************************************************************
  192. // Define the TSI State control block.
  193. // ************************************************************
  194. class StateInfo {
  195. public:
  196.   StateInfo(ns_addr_t tsi) : tsi_(tsi), spm_seqno_(-1) { }
  197.   // Only used if the container holding StateInfo's will be in sorted order.
  198.   int operator<(const StateInfo &right) const {
  199.     return ((tsi_.addr_ < right.tsi_.addr_) || ( (tsi_.addr_==right.tsi_.addr_) && (tsi_.port_ < right.tsi_.port_)));
  200.   }
  201.   ns_addr_t & tsi() { return tsi_; }
  202.   int & spm_seqno() { return spm_seqno_; }
  203.   ns_addr_t & upstream_node() { return upstream_node_; }
  204.   int & upstream_iface() { return upstream_iface_; }
  205.   map<int, RepairState> & repair() { return repair_; }
  206. protected:
  207.   ns_addr_t tsi_; // Transport Session ID
  208.   int spm_seqno_; // Most recent SPM sequence number.
  209.   ns_addr_t upstream_node_; // Upstream node address.
  210.   int upstream_iface_; // Upstream interface number.
  211.   // Map between a NAK sequence number and the corresponding repair state
  212.   // for that sequence number.
  213.   map<int, RepairState> repair_;
  214. };
  215. // ************************************************************
  216. // Define the PGM Agent Class
  217. // ************************************************************
  218. // Structure to hold statistical information for PGM Agent.
  219. struct Stats {
  220.   int num_unsolicited_ncf_;
  221.   int num_unsolicited_rdata_;
  222.   int num_suppressed_naks_;
  223.   int num_naks_transmitted_;
  224. };
  225. // Used to count number of unique pgm agents.
  226. static int pgm_agent_uid_ = 0;
  227. class PgmAgent : public Agent {
  228. public:
  229.   PgmAgent();
  230.   virtual void recv(Packet *, Handler *);
  231.   virtual void timeout(int type, void *data);
  232.   virtual int command(int argc, const char*const* argv);
  233. protected:
  234.   void handle_spm(Packet *pkt);
  235.   void handle_odata(Packet *pkt);
  236.   void handle_rdata(Packet *pkt);
  237.   void handle_nak(Packet *pkt);
  238.   void handle_ncf(Packet *pkt);
  239.   void send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group);
  240.   void timeout_nak_retrans(RepairState *rstate);
  241.   void timeout_nak_rpt(RepairState *rstate);
  242.   void timeout_nak_rdata(RepairState *rstate);
  243.   void timeout_nak_elim(RepairState *rstate);
  244.   void remove_repair_state(RepairState *rstate);
  245.   void print_stats();
  246.   void trace_event(char *evType, double evTime);
  247. #ifdef PGM_DEBUG
  248.   void display_packet(Packet *pkt);
  249. #endif
  250.   NsObject* iface2link(int iface);
  251.   NsObject* pkt2agent (Packet *pkt);
  252.   StateInfo * find_TSI(ns_addr_t &tsi);
  253.   StateInfo * insert_TSI(ns_addr_t &tsi);
  254.   EventTrace * et_; //Trace Object for custom event trace
  255.   int pgm_enabled_; // Is this agent enabled? Default is YES.
  256.   char uname_[16]; // Agent's unique name.
  257.   Stats stats_; // Statistical information.
  258.   // TSI-indexed state control block list.
  259.   list<StateInfo> state_list_;
  260.   // Number of seconds to wait between retransmitting a NAK that is waiting
  261.   // for a NCF packet.
  262.   double nak_retrans_ival_;
  263.   // The length of time for which a network element will continue to repeat
  264.   // NAKs while waiting for a corresponding NCF.  Once this time expires and
  265.   // no NCF is received, then we remove the entire repair state.
  266.   double nak_rpt_ival_;
  267.   // The length of time for which a network element will wait for the
  268.   // corresponding RDATA before removing the entire repair state.
  269.   double nak_rdata_ival_;
  270.   // Once a NAK has been confirmed, network elements must discard all
  271.   // further NAKs for up to this length of time.  Should be a fraction
  272.   // of nak_rdata_ival_.
  273.   double nak_elim_ival_;
  274. };
  275. static class PgmClass : public TclClass {
  276. public:
  277.   PgmClass() : TclClass("Agent/PGM") {}
  278.   TclObject * create(int argc, const char * const * argv) {
  279.     return (new PgmAgent());
  280.   }
  281. } class_pgm_agent;
  282. void PgmAgentTimer::expire(Event *e) {
  283.   a_->timeout(type_, data_);
  284. }
  285. // Constructor.
  286. PgmAgent::PgmAgent() : Agent(PT_PGM), pgm_enabled_(1)
  287. {
  288.   // Set the unique identifier.
  289.   sprintf (uname_, "pgmAgent-%d", pgm_agent_uid_++);
  290.   // Initialize statistics.
  291.   stats_.num_unsolicited_ncf_ = 0;
  292.   stats_.num_unsolicited_rdata_ = 0;
  293.   stats_.num_suppressed_naks_ = 0;
  294.   stats_.num_naks_transmitted_ = 0;
  295.   bind("pgm_enabled_", &pgm_enabled_);
  296.   bind_time("nak_retrans_ival_", &nak_retrans_ival_);
  297.   bind_time("nak_rpt_ival_", &nak_rpt_ival_);
  298.   bind_time("nak_rdata_ival_", &nak_rdata_ival_);
  299.   bind_time("nak_elim_ival_", &nak_elim_ival_);
  300.   et_ = (EventTrace *) NULL;
  301. }
  302. // Code to execute when a packet is received.
  303. void PgmAgent::recv(Packet* pkt, Handler*)
  304. {
  305.   hdr_pgm *hp = HDR_PGM(pkt);
  306.   if (!pgm_enabled_) {
  307.     target_->recv(pkt);
  308.     return;
  309.   }
  310.   hdr_cmn *hc = HDR_CMN(pkt);
  311.   if (hc->ptype_ != PT_PGM) {
  312.     printf("%s ERROR (PgmAgent::recv): received non PGM pkt type %d, discarding.n", uname_, hc->ptype_);
  313.     Packet::free(pkt);
  314.     return;
  315.   }
  316. #ifdef PGM_DEBUG
  317.   display_packet(pkt);
  318. #endif
  319.   // Note, each handle function will free the packet itself or modify the
  320.   // headers and pass it on to the next NS object.
  321.   switch(hp->type_) {
  322.   case PGM_SPM:
  323.     handle_spm(pkt);
  324.     break;
  325.   case PGM_ODATA:
  326.     handle_odata(pkt);
  327.     break;
  328.   case PGM_RDATA:
  329.     handle_rdata(pkt);
  330.     break;
  331.   case PGM_NAK:
  332.     handle_nak(pkt);
  333.     break;
  334.   case PGM_NCF:
  335.     handle_ncf(pkt);
  336.     break;
  337.   default:
  338.     printf("ERROR (PgmAgent::recv): Received PGM packet with unknown type %d.n", hp->type_);
  339.     Packet::free(pkt);
  340.     break;
  341.   }
  342. }
  343. // Code to execute when a timeout occurs.
  344. void PgmAgent::timeout(int type, void *data)
  345. {
  346.   switch(type) {
  347.   case TIMER_NAK_RETRANS:
  348.     timeout_nak_retrans((RepairState *) data);
  349.     break;
  350.   case TIMER_NAK_RPT:
  351.     timeout_nak_rpt((RepairState *) data);
  352.     break;
  353.   case TIMER_NAK_RDATA:
  354.     timeout_nak_rdata((RepairState *) data);
  355.     break;
  356.   case TIMER_NAK_ELIM:
  357.     timeout_nak_elim((RepairState *) data);
  358.     break;
  359.   default:
  360.     printf("ERROR (PgmAgent::timeout()): Invalid timeout type %d.n", type);
  361.     break;
  362.   }
  363. }
  364. // Code to execute when a TCL command is issued to the PGM Agent object.
  365. int PgmAgent::command (int argc, const char*const* argv)
  366. {
  367.   //  Tcl& tcl = Tcl::instance();
  368.   if (argc == 2) {
  369.     if (strcmp(argv[1], "print-stats") == 0) {
  370.       print_stats();
  371.       return (TCL_OK);
  372.     }
  373.   }
  374.   else if (argc == 3) { //Set the Event Trace handle if Event Tracing is on
  375.     if (strcmp(argv[1], "eventtrace") == 0) {
  376.       et_ = (EventTrace *)TclObject::lookup(argv[2]);
  377.       return (TCL_OK);
  378.     }
  379.   }
  380.   return (Agent::command(argc, argv));
  381. }      
  382. void PgmAgent::trace_event(char *evType, double evTime) {
  383.   if (et_ == NULL) return;
  384.   char *wrk = et_->buffer();
  385.   char *nwrk = et_->nbuffer();
  386.   if (wrk != NULL) {
  387.     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
  388.             et_->round(Scheduler::instance().clock()),   
  389.             addr(),                    
  390.             0,                   
  391.             evType,                  
  392. evTime);
  393.   if (nwrk != 0)
  394.     sprintf(nwrk,
  395. "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
  396. et_->round(Scheduler::instance().clock()),   // time
  397. evType,                    // event type
  398. addr(),                       // owner (src) node id
  399. port(),                       // owner (src) port id
  400. 0,                      // dst node id
  401. 0                       // dst port id
  402. );
  403. et_->dump();
  404.   }
  405. }
  406. void PgmAgent::handle_spm(Packet *pkt)
  407. {
  408.   hdr_pgm *hp = HDR_PGM(pkt);
  409.   hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
  410.   hdr_cmn *hc = HDR_CMN(pkt);
  411.   // Use the TSI from the SPM packet and locate the proper state block.
  412.   StateInfo *state = find_TSI(hp->tsi_);
  413.   if (state == NULL) {
  414.     // There is no state block for this TSI. Create new state.
  415.     state = insert_TSI(hp->tsi_);
  416.     // Set the sequence number.
  417.     state->spm_seqno() = hp->seqno_;
  418.     // Set the upstream path.
  419.     state->upstream_node() = hs->spm_path_;
  420.     state->upstream_iface() = hc->iface();
  421.   }
  422.   else {
  423.     // State already exists for this TSI. Check if the sequence number is
  424.     // newer than the last recorded sequence number.
  425.     if ( state->spm_seqno() < hp->seqno_ ) {
  426.       // Update the SPM sequence number.
  427.       state->spm_seqno() = hp->seqno_;
  428.       // Set the upstream path.
  429.       state->upstream_node() = hs->spm_path_;
  430.       state->upstream_iface() = hc->iface();
  431.     }
  432.     else {
  433.       printf("%s received an old SPM seqno, discarding.n", uname_);
  434.       Packet::free(pkt);
  435.       return;
  436.     }
  437.   }
  438.   // Modify the SPM packet and set the upstream path to be equal to
  439.   // the address of this agent.
  440.   hs->spm_path_ = here_;
  441.   // Send the modified packet off to the rest of the multicast group.
  442.   send(pkt, 0);
  443. }
  444. void PgmAgent::handle_odata(Packet *pkt)
  445. {
  446.   // Pass the ODATA along to the rest of the multicast group.  ODATA
  447.   // does not cancel NAK forwarding.
  448.   //hdr_cmn *hc = HDR_CMN(pkt);
  449.   send(pkt, 0);
  450. }
  451. void PgmAgent::handle_rdata(Packet *pkt)
  452. {
  453.   // Look for the TSI for this RDATA packet.
  454.   hdr_pgm *hp = HDR_PGM(pkt);
  455.   //  hdr_ip *hip = HDR_IP(pkt);
  456.   StateInfo *state = find_TSI(hp->tsi_);
  457.   if (state == NULL) {
  458.     printf("%s received RDATA for which no SPM state is established, discarding.n", uname_);
  459.     stats_.num_unsolicited_rdata_++;
  460.     Packet::free(pkt);
  461.     return;
  462.   }
  463.   // Look for the repair state for this RDATA packet.
  464.   map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
  465.   if (result == state->repair().end()) {
  466.     // No repair state present for this RDATA packet.
  467.     printf("%s received RDATA for which no repair state is present, discarding.n", uname_);
  468.     stats_.num_unsolicited_rdata_++;
  469.     Packet::free(pkt);
  470.     return;
  471.   }
  472.   RepairState *rstate = &((*result).second);
  473.   // Get the interface list for the repair state. For each interface, send
  474.   // out the RDATA packet.  Similarly for each agent that is also receiving
  475.   // RDATA attached to this node.
  476.   if (rstate->iface_list().empty() && rstate->agent_list().empty()) {
  477.     printf("%s received RDATA but repair state has no interfaces for it, discarding.n", uname_);
  478.     stats_.num_unsolicited_rdata_++;
  479.     Packet::free(pkt);
  480.   }
  481.   NsObject *tgt;
  482.   Packet *new_pkt;
  483.   int flag = 0;
  484.   trace_event("SEND RDATA", 0); //Repair is being forwarded
  485.   //  hdr_cmn *hc = HDR_CMN(pkt);
  486.   if (!rstate->iface_list().empty()) {
  487.     list<int>::iterator iter = rstate->iface_list().begin();
  488.     while (iter != rstate->iface_list().end()) {
  489.       if (!flag) {
  490. tgt = iface2link(*iter);
  491. if (tgt == NULL) {
  492.   printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.n");
  493.   abort();
  494. }
  495. tgt->recv(pkt);
  496. flag = 1;
  497.       }
  498.       else {
  499. // Make a copy of each packet before sending it, so we don't free()
  500. // the same packet at the different receivers causing a deallocation
  501. // problem.
  502. new_pkt = pkt->copy();
  503. tgt = iface2link(*iter);
  504. if (tgt == NULL) {
  505.   printf("ERROR (PgmAgent::handle_rdata): iface2link returned NULL.n");
  506.   abort();
  507. }
  508. tgt->recv(new_pkt);
  509.       }
  510.       iter++;
  511.     }
  512.   }
  513.   if (!rstate->agent_list().empty()) {
  514.     list<NsObject *>::iterator iter = rstate->agent_list().begin();
  515.     while (iter != rstate->agent_list().end()) {
  516.       if (!flag) {
  517. (*iter)->recv(pkt);
  518. flag = 1;
  519.       }
  520.       else {
  521. // Make a copy of each packet before sending it, so we don't free()
  522. // the same packet at the different receivers causing a deallocation
  523. // problem.
  524. new_pkt = pkt->copy();
  525. (*iter)->recv(new_pkt);
  526.       }
  527.       iter++;
  528.     }
  529.   }
  530.   // Remove the repair state for this RDATA sequence number, since we sent
  531.   // out the repairs.
  532.   remove_repair_state(&((*result).second));
  533. }
  534. void PgmAgent::handle_nak(Packet *pkt)
  535. {
  536.   hdr_pgm *hp = HDR_PGM(pkt);
  537.   hdr_pgm_nak *hpn = HDR_PGM_NAK(pkt);
  538.   hdr_cmn *hc = HDR_CMN(pkt);
  539.   //  hdr_ip *hip = HDR_IP(pkt);
  540.   // Check to see if there is a state control block for the given TSI.
  541.   StateInfo *state = find_TSI(hp->tsi_);
  542.   if (state == NULL) {
  543.     printf("PGM Agent received NAK for which no SPM state is established, discarding.n");
  544.     Packet::free(pkt);
  545.     return;
  546.   }
  547.   // Create an NCF packet in response to the NAK packet.
  548.   Packet *ncf_pkt = allocpkt();
  549.   hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
  550.   ncf_hc->size_ = sizeof(hdr_pgm); // Size of NCF packet.
  551.   ncf_hc->ptype_ = PT_PGM;
  552.   hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
  553.   ncf_hp->type_ = PGM_NCF;
  554.   ncf_hp->tsi_ = hp->tsi_;
  555.   ncf_hp->seqno_ = hp->seqno_;
  556.   // Change the source of the NCF packet to be the original ODATA source.
  557.   hdr_ip *ncf_ip = HDR_IP(ncf_pkt);
  558.   ncf_ip->src() = hpn->source_;
  559.   // Set the destination to be the multicast group.
  560.   ncf_ip->dst() = hpn->group_;
  561.   // Set the color of NCF packets in nam to be green.
  562.   ncf_ip->fid_ = 6;
  563.   // Send out the NCF to the interface (or agent) for which the NAK was
  564.   // received.
  565.   NsObject *tgt;
  566.   if (hc->iface() < 0) {
  567.     tgt = pkt2agent(pkt);
  568.     if (tgt == NULL) {
  569.       printf("ERROR: (PgmAgent::handle_nak) pkt2agent returned NULL.n");
  570.       abort();
  571.     }
  572.     tgt->recv(ncf_pkt);
  573.   }
  574.   else {
  575.     tgt = iface2link(hc->iface());
  576.     if (tgt == NULL) {
  577.       printf("ERROR: (PgmAgent::handle_nak) iface2link returned NULL.n");
  578.       abort();
  579.     }
  580.     tgt->recv(ncf_pkt);
  581.   }
  582.   // Create repair state for the NAK query. Associate the sequence number
  583.   // of the NAK packet with the interface where the packet was received.
  584.   pair<map<int, RepairState>::iterator, bool> result;
  585.   result = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hpn->source_, hpn->group_)));
  586.   RepairState *rstate = &(result.first->second);
  587.   if (result.second == true) {
  588.     // There was no previous repair state for the given NAK seqno.
  589.     // This must be a new NAK.
  590.     // Set the data fields of the timer.
  591.     rstate->nak_retrans_timer().data() = rstate;
  592.     rstate->nak_rpt_timer().data() = rstate;
  593.     rstate->nak_rdata_timer().data() = rstate;
  594.     rstate->nak_elim_timer().data() = rstate;
  595.     // Add the interface (or agent) to the interface list.
  596.     if (hc->iface() < 0) {
  597.       rstate->agent_list().push_back(pkt2agent(pkt));
  598.     }
  599.     else {
  600.       rstate->iface_list().push_back(hc->iface());
  601.     }
  602.     // Start the nak retransmission cycle time.
  603.     rstate->nak_retrans_timer().resched(nak_retrans_ival_);
  604.     // Set the nak repeat interval.
  605.     rstate->nak_rpt_timer().resched(nak_rpt_ival_);
  606.     trace_event("SEND NACK", nak_rpt_ival_); //Nack being Sent, Nack will refire after ival
  607.     // Don't set the RDATA timer until the NCF is received.
  608.     // Don't set the elimintation timer until the NCF is received.
  609.     // We're now in the NAK_PENDING state.
  610.   }
  611.   else {
  612.     // There was previous repair state for the given NAK seqno.
  613.     if (hc->iface() < 0) {
  614.       // Scan the agent list to see if the agent is in the list already
  615.       // for this repair state.
  616.       list<NsObject *> *agent_list = &(rstate->agent_list());
  617.       list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
  618.       if (res == agent_list->end()) {
  619. agent_list->push_back(pkt2agent(pkt));
  620.       }
  621.     }
  622.     else {
  623.       // Scan the interface list to see if the interface is in the list
  624.       // already for this repair state.
  625.       list<int> *iface_list = &(rstate->iface_list());
  626.       list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
  627.       if (res == iface_list->end()) {
  628. // Interface not found in iface list for this NAK, add it.
  629. iface_list->push_back(hc->iface());
  630.       }
  631.     }
  632.     // If the NAK elimination timer has expired, then we are allowed to
  633.     // send another NAK to our upstream.
  634.     if (rstate->nak_elimination() == false) {
  635.       rstate->nak_state() = NAK_PENDING;
  636.       // Start the nak retransmission cycle time.
  637.       rstate->nak_retrans_timer().resched(nak_retrans_ival_);
  638.       rstate->nak_rpt_timer().resched(nak_rpt_ival_);
  639.       // Disable the rdata and elim timer if they were previously running.
  640.       rstate->nak_rdata_timer().force_cancel();
  641.       rstate->nak_elim_timer().force_cancel();
  642.       rstate->nak_elimination() = true;
  643. #ifdef PGM_DEBUG
  644.       printf("%s: Got NAK for seqno %d with previous NAK state, accepted.n",
  645.      uname_, hp->seqno_);
  646. #endif
  647.     }
  648.     else {
  649.       // NAK elimination requires us to not act on this duplicate NAK packet.
  650. #ifdef PGM_DEBUG
  651.       printf("%s: Got NAK for seqno %d but have previous NAK state, discarding.n", uname_, hp->seqno_);
  652. #endif
  653.       stats_.num_suppressed_naks_++;
  654.       Packet::free(pkt);
  655.       return;
  656.     }
  657.   }
  658.   stats_.num_naks_transmitted_++;
  659.   // Send the NAK packet to our upstream
  660.   send_nak(state->upstream_node(), hp->tsi_, hp->seqno_, hpn->source_, hpn->group_);
  661.   Packet::free(pkt);
  662. }
  663. void PgmAgent::handle_ncf(Packet *pkt)
  664. {
  665.   hdr_pgm *hp = HDR_PGM(pkt);
  666.   hdr_cmn *hc = HDR_CMN(pkt);
  667.   hdr_ip *hip = HDR_IP(pkt);
  668.   // Locate the state control block for this TSI.
  669.   StateInfo *state = find_TSI(hp->tsi_);
  670.   if (state == NULL) {
  671.     printf("%s received NCF for which no SPM state is established, discarding.n", uname_);
  672.     stats_.num_unsolicited_ncf_++;
  673.     Packet::free(pkt);
  674.     return;
  675.   }
  676.   if (hc->iface() != state->upstream_iface()) {
  677.     printf("%s received NCF from non-upstream interface, discarding.n", uname_);
  678.     stats_.num_unsolicited_ncf_++;
  679.     Packet::free(pkt);
  680.     return;
  681.   }
  682.   trace_event("SEND NCF", 0);
  683.   // Look for the repair state for this NCF packet.
  684.   map<int, RepairState>::iterator result = state->repair().find(hp->seqno_);
  685.   RepairState *rstate;
  686.   if (result == state->repair().end()) {
  687.     // No repair state present for this NCF packet.
  688.     // Since the interface for this NCF packet comes from the same interface
  689.     // used to reach our upstream node, we can create empty repair state.
  690.     // This is NAK Anticipation (see 7.5 in PGM specification).
  691.     pair<map<int, RepairState>::iterator, bool> res;
  692.     res = state->repair().insert(pair<int, RepairState>(hp->seqno_, RepairState(this, state, hp->seqno_, hip->src(), hip->dst())));
  693.     rstate = &(res.first->second);
  694.     // Set the data field of the timers.
  695.     rstate->nak_retrans_timer().data() = rstate;
  696.     rstate->nak_rpt_timer().data() = rstate;
  697.     rstate->nak_rdata_timer().data() = rstate;
  698.     rstate->nak_elim_timer().data() = rstate;
  699.     stats_.num_unsolicited_ncf_++;
  700.   }
  701.   else {
  702.     rstate = &((*result).second);
  703.     // Disable either of the retransmission or repeat interval timers since
  704.     // the NAK is confirmed.
  705.     rstate->nak_retrans_timer().force_cancel();
  706.     rstate->nak_rpt_timer().force_cancel();
  707.   }
  708.   rstate->nak_state() = NAK_CONFIRMED;
  709.   // Set/reset the rdata and elim timer to expire at the appropriate time.
  710.   rstate->nak_rdata_timer().resched(nak_rdata_ival_);
  711.   rstate->nak_elim_timer().resched(nak_elim_ival_);
  712.   Packet::free(pkt);
  713. }
  714. // Create and send a nak packet to the upstream path.
  715. void PgmAgent::send_nak(ns_addr_t &upstream_node, ns_addr_t &tsi, int seqno, ns_addr_t &source, ns_addr_t &group)
  716. {
  717. #ifdef PGM_DEBUG
  718.   double now = Scheduler::instance().clock();
  719.   printf("at %f %s sending NAK for seqno %d upstream.n", now, uname_, seqno);
  720. #endif
  721.   Packet *nak_pkt = allocpkt();
  722.   // Set the simulated size of the NAK packet.
  723.   hdr_cmn *nak_hc = HDR_CMN(nak_pkt);
  724.   nak_hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_nak);
  725.   nak_hc->ptype_ = PT_PGM;
  726.   // Set the destination address to be our upstream node.
  727.   hdr_ip *nak_hip = HDR_IP(nak_pkt);
  728.   nak_hip->dst() = upstream_node;
  729.   // Set the color of NAK packet to be black in nam.
  730.   nak_hip->fid_ = 8;
  731.   // Fill in the PGM header for the NAK packet.
  732.   hdr_pgm *nak_hp = HDR_PGM(nak_pkt);
  733.   nak_hp->type_ = PGM_NAK;
  734.   nak_hp->tsi_ = tsi;
  735.   nak_hp->seqno_ = seqno;
  736.   // Fill in the PGM NAK header for the NAK packet.
  737.   hdr_pgm_nak *nak_hpn = HDR_PGM_NAK(nak_pkt);
  738.   nak_hpn->source_ = source;
  739.   nak_hpn->group_ = group;
  740.   // Send out the packet.
  741.   send(nak_pkt, 0);
  742. }
  743. // Code that is executed when the nak retransmission timer expires.
  744. void PgmAgent::timeout_nak_retrans(RepairState *rstate)
  745. {
  746.   stats_.num_naks_transmitted_++;
  747.   // Send out a new NAK packet.
  748.   send_nak(rstate->sinfo()->upstream_node(), rstate->sinfo()->tsi(), rstate->seqno(), rstate->source(), rstate->group());
  749.   // Reset the retransmission timer.
  750.   rstate->nak_retrans_timer().resched(nak_retrans_ival_);
  751. }
  752. // Code that is executed when a repair state NAK RPT timer expires.
  753. void PgmAgent::timeout_nak_rpt(RepairState *rstate)
  754. {
  755.   printf("%s: timeout_nak_rpt expired, removing repair state.n", uname_);
  756.   // We never got a confirmation for our NAK packet. We must now
  757.   // remove the repair state entirely.
  758.   remove_repair_state(rstate);
  759. }
  760. // Code that is executed when a repair state NAK RDATA timer expires.
  761. void PgmAgent::timeout_nak_rdata(RepairState *rstate)
  762. {
  763.   printf("%s: timeout_nak_rdata expired, removing repair state.n", uname_);
  764.   // We never got the RDATA for our NAK. We must now remove the repair
  765.   // state entirely.
  766.   remove_repair_state(rstate);
  767. }
  768. // Code that is executed when a repair state NAK elimination timer expires.
  769. void PgmAgent::timeout_nak_elim(RepairState *rstate)
  770. {
  771.   // Allow one duplicate NAK to come in to be processed and forwarded.
  772.   rstate->nak_elimination() = false;
  773. }
  774. void PgmAgent::remove_repair_state(RepairState *rstate)
  775. {
  776.   // Cancel all timers.
  777.   rstate->nak_retrans_timer().force_cancel();
  778.   rstate->nak_rpt_timer().force_cancel();
  779.   rstate->nak_rdata_timer().force_cancel();
  780.   rstate->nak_elim_timer().force_cancel();
  781.   // Erase the repair state from the TSI repair map.
  782.   if (!rstate->sinfo()->repair().erase(rstate->seqno())) {
  783.     printf("ERROR (PgmAgent::remove_repair_state): Did not erase seqno from map.n");
  784.   }
  785. }
  786. NsObject* PgmAgent::iface2link (int iface)
  787. {
  788. //      Tcl::instance().evalf("%s get-outlink %d", name(), iface);
  789. //      char* ni = Tcl::instance().result();
  790.         Tcl&    tcl = Tcl::instance();
  791.         char    wrk[64];
  792. if (iface == -1) {
  793.   return NULL;
  794. }
  795.         sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
  796.         tcl.evalc (wrk);
  797.         const char* result = tcl.result ();
  798. #ifdef PGM_DEBUG
  799. printf ("[iface2link] agent %sn", result);
  800. #endif
  801.         NsObject* obj = (NsObject*)TclObject::lookup(result);
  802.         return (obj);
  803. }
  804. NsObject* PgmAgent::pkt2agent (Packet *pkt)
  805. {
  806.         Tcl&            tcl = Tcl::instance();
  807.         char            wrk[64];
  808.         const char     *result;
  809.         int             port;
  810.         NsObject*       agent;
  811.         hdr_ip*         ih = HDR_IP(pkt);
  812.         //nsaddr_t        src = ih->saddr();
  813.         port = ih->sport();
  814.         sprintf (wrk, "[%s set node_] agent %d", name (), port);
  815.         tcl.evalc (wrk);
  816.         result = tcl.result ();
  817. #ifdef PGM_DEBUG
  818. printf ("[pkt2agent] port %d, agent %sn", port, result);
  819. #endif
  820.         agent = (NsObject*)TclObject::lookup (result);
  821.         return (agent);
  822. }
  823. // Find the state control block given a TSI. Returns NULL if not found.
  824. StateInfo * PgmAgent::find_TSI(ns_addr_t &tsi)
  825. {
  826.   // Use the TSI from the SPM packet and locate the proper state block.
  827.   list<StateInfo>::iterator iter = state_list_.begin();
  828.   while(iter != state_list_.end()) {
  829.     if ( (*iter).tsi().isEqual (tsi) ) {
  830.       return &(*iter);
  831.     }
  832.     iter++;
  833.   }
  834.   return NULL;
  835. }
  836. // Insert a new state control block for the given TSI, and return a pointer
  837. // to the control block.
  838. StateInfo * PgmAgent::insert_TSI(ns_addr_t &tsi)
  839. {
  840.   state_list_.push_back(StateInfo(tsi));
  841.   return &(state_list_.back());
  842. }
  843. void PgmAgent::print_stats()
  844. {
  845.   printf("%s:n", uname_);
  846.   printf("tNAKs Transmitted: t%dn", stats_.num_naks_transmitted_);
  847.   printf("tNAKs Suppressed: t%dn", stats_.num_suppressed_naks_);
  848.   printf("tUnsolicited NCFs: t%dn", stats_.num_unsolicited_ncf_);
  849.   printf("tUnsolicited RDATA: t%dn", stats_.num_unsolicited_rdata_);
  850. }
  851. #ifdef PGM_DEBUG
  852. void PgmAgent::display_packet(Packet *pkt)
  853. {
  854.   double now = Scheduler::instance().clock();
  855.   hdr_ip *hip = HDR_IP(pkt);
  856.   hdr_cmn *hc = HDR_CMN(pkt);
  857.   printf("at %f %s received packet type ", now, uname_);
  858.   hdr_pgm *hp = HDR_PGM(pkt);
  859.   
  860.   hdr_pgm_spm *hps;
  861.   hdr_pgm_nak *hpn;
  862.   switch(hp->type_) {
  863.   case PGM_SPM:
  864.     hps = HDR_PGM_SPM(pkt);
  865.     printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
  866.     break;
  867.   case PGM_ODATA:
  868.     printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  869.     break;
  870.   case PGM_RDATA:
  871.     printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  872.     break;
  873.   case PGM_NAK:
  874.     hpn = HDR_PGM_NAK(pkt);
  875.     printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
  876.     break;
  877.   case PGM_NCF:
  878.     printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  879.     break;
  880.   default:
  881.     printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  882.     break;
  883.   }
  884. }
  885. #endif // PGM_DEBUG
  886. #endif //HAVE_STL