pgm-sender.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:23k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /*
  2.  * pgm-sender.cc
  3.  * Copyright (C) 2001 by the University of Southern California
  4.  * $Id: pgm-sender.cc,v 1.12 2006/02/21 15:20:19 mahrenho Exp $
  5.  *
  6.  * This program is free software; you can redistribute it and/or
  7.  * modify it under the terms of the GNU General Public License,
  8.  * version 2, as published by the Free Software Foundation.
  9.  *
  10.  * This program is distributed in the hope that it will be useful,
  11.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  13.  * GNU General Public License for more details.
  14.  *
  15.  * You should have received a copy of the GNU General Public License along
  16.  * with this program; if not, write to the Free Software Foundation, Inc.,
  17.  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  18.  *
  19.  *
  20.  * The copyright of this module includes the following
  21.  * linking-with-specific-other-licenses addition:
  22.  *
  23.  * In addition, as a special exception, the copyright holders of
  24.  * this module give you permission to combine (via static or
  25.  * dynamic linking) this module with free software programs or
  26.  * libraries that are released under the GNU LGPL and with code
  27.  * included in the standard release of ns-2 under the Apache 2.0
  28.  * license or under otherwise-compatible licenses with advertising
  29.  * requirements (or modified versions of such code, with unchanged
  30.  * license).  You may copy and distribute such a system following the
  31.  * terms of the GNU GPL for this module and the licenses of the
  32.  * other code concerned, provided that you include the source code of
  33.  * that other code when and as the GNU GPL requires distribution of
  34.  * source code.
  35.  *
  36.  * Note that people who make modified versions of this module
  37.  * are not obligated to grant this special exception for their
  38.  * modified versions; it is their choice whether to do so.  The GNU
  39.  * General Public License gives permission to release a modified
  40.  * version without this exception; this exception also makes it
  41.  * possible to release a modified version which carries forward this
  42.  * exception.
  43.  *
  44.  */
  45. /*
  46.  * Pragmatic General Multicast (PGM), Reliable Multicast
  47.  *
  48.  * pgm-sender.cc
  49.  *
  50.  * This implements the Sending PGM agent, "Agent/PGM/Sender".
  51.  *
  52.  * Ryan S. Barnett, 2001
  53.  * rbarnett@catarina.usc.edu
  54.  */
  55. #include "config.h"
  56. #ifdef HAVE_STL
  57. #include <stdlib.h>
  58. #include <stdio.h>
  59. /* Standard Template Library */
  60. #include <map>
  61. #include <list>
  62. #include <algorithm>
  63. #include "config.h"
  64. #include "tclcl.h"
  65. #include "agent.h"
  66. #include "packet.h"
  67. #include "ip.h"
  68. #include "random.h"
  69. #include "basetrace.h"
  70. #include "pgm.h"
  71. // ************************************************************
  72. // Define the PGM Sender Timer Class
  73. // ************************************************************
  74. class PgmSender;
  75. /* Define the different timer types. */
  76. enum {
  77.   TIMER_SPM = 0,
  78.   TIMER_RDATA = 1
  79. };
  80. class PgmSenderTimer : public TimerHandler {
  81. public:
  82.   PgmSenderTimer(PgmSender *a, int type) : TimerHandler(), data_(NULL) {
  83.     a_ = a;
  84.     type_ = type;
  85.   }
  86.   int & type() { return type_; }
  87.   void * &data() { return data_; }
  88. protected:
  89.   virtual void expire(Event *e);
  90.   PgmSender *a_;
  91.   int type_;
  92.   void *data_;
  93. };
  94. // Bundles an RDATA packet with a timer (for sending the RDATA packet),
  95. // and a list of interfaces for which the packet should be sent to.
  96. class RdataItem {
  97. public:
  98.   RdataItem(int seqno, PgmSender *a, Packet *rdata_pkt = NULL) :
  99.     seqno_(seqno), rdata_pkt_(rdata_pkt), rdata_timer_(a, TIMER_RDATA) { }
  100.   // Allow direct access to the private data.
  101.   int & seqno() { return seqno_; }
  102.   Packet * & rdata_pkt() { return rdata_pkt_; }
  103.   PgmSenderTimer & rdata_timer() { return rdata_timer_; }
  104.   list<int> & iface_list() { return iface_list_; }
  105.   list<NsObject *> & agent_list() { return agent_list_; }
  106. protected:
  107.   // The sequence number of this RDATA item.
  108.   int seqno_;
  109.   // The RDATA packet itself.
  110.   Packet *rdata_pkt_;
  111.   // The timer responsible for sending out this RDATA packet.
  112.   PgmSenderTimer rdata_timer_;
  113.   // The list of interfaces for which this packet must be sent to.
  114.   list<int> iface_list_;
  115.   // The list of agents for which this packet must be sent to.
  116.   list<NsObject *> agent_list_;
  117. };
  118. // A class used to keep track of duplicate reply requests.
  119. class ReplyItem {
  120. public:
  121.   ReplyItem(int seqno) : seqno_(seqno), retransmissions_(0) { }
  122.   int & seqno() { return seqno_; }
  123.   int & retransmissions() { return retransmissions_; }
  124. protected:
  125.   int seqno_;
  126.   int retransmissions_;
  127. };
  128. // Largest size we will allow the reply list to grow.
  129. const int MAX_REPLY_LIST_SIZE = 100;
  130. // Miscellaneous statistical information gathered during simulation.
  131. struct Stats {
  132.   int num_naks_received_;
  133.   int num_rdata_sent_;
  134.   int max_num_repeated_rdata_;
  135. };
  136. // ************************************************************
  137. // Define the PGM Sender Class
  138. // ************************************************************
  139. static int pgm_snd_uid_ = 0;
  140. class PgmSender: public Agent {
  141. public:
  142.   PgmSender();
  143.   virtual void recv(Packet *, Handler *);
  144.   virtual void timeout(int type, void *data);
  145.   virtual int command(int argc, const char*const* argv);
  146.   virtual void sendmsg(int nbytes, const char *flags = 0);
  147. protected:
  148.   virtual void start(); // Starts the SPM heartbeats.
  149.   virtual void stop(); // Stops the SPM heartbeats.
  150.   virtual void handle_nak(Packet *pkt); // Process a NAK packet.
  151.   virtual void send_spm(); // Sends an SPM packet to the multicast group.
  152.   virtual void send_rdata(RdataItem *pkt); // Sends the given RDATA packet.
  153.   NsObject* iface2link(int iface);
  154.   NsObject* pkt2agent(Packet *pkt);
  155.   void print_stats();
  156.   void display_packet(Packet *pkt); // For debugging.
  157.   void trace_event(char *evType, nsaddr_t daddr, double evTime); 
  158.   EventTrace * et_;  //Trace Object for custom Event Traces
  159.   Stats stats_; // Keep track of various statistics.
  160.   char uname_[16]; // Unique PGM sender name.
  161.   // Map the sequence number of a NAK (requested RDATA) with an item
  162.   // that represents the RDATA packet including which interfaces the
  163.   // RDATA should be sent to, along with a timer that is used to trigger
  164.   // sending of the RDATA packet.
  165.   map<int, RdataItem> pending_rdata_;
  166.   // A list to keep track of the number of retransmissions for a given
  167.   // RDATA reply.  The max size this will grow is MAX_REPLY_LIST_SIZE.
  168.   list<ReplyItem> reply_;
  169.   // The 'typicial' size of a data packet including header. This should
  170.   // get set automatically from the application calling sendmsg().  We
  171.   // make a simplifying assumption that all packet sizes in a session are
  172.   // of equal size.
  173.   int pktSize_;
  174.   PgmSenderTimer spm_heartbeat_; // Timer for sending out SPM packets.
  175.   int spm_running_; // Whether the heartbeats are running or not.
  176.   double spm_interval_; // Time between SPM packets (in seconds).
  177.   // Time to delay sending out an RDATA in response to a NAK packet, this
  178.   // is to allow slow NAKs to get processed at one time, so we don't send
  179.   // out duplicate RDATA.
  180.   double rdata_delay_;
  181.   int odata_seqno_; // Current ODATA sequence number.
  182.   int spm_seqno_; // Current SPM sequence number.
  183.   //  nsaddr_t group_; // The multicast group we send to.
  184. };
  185. void PgmSenderTimer::expire(Event *e) {
  186.   a_->timeout(type_, data_);
  187. }
  188. static class PgmSenderClass : public TclClass {
  189. public:
  190.   PgmSenderClass() : TclClass("Agent/PGM/Sender") {}
  191.   TclObject * create(int argc, const char * const * argv) {
  192.     return (new PgmSender());
  193.   }
  194. } class_pgm_sender;
  195. // Constructor.
  196. PgmSender::PgmSender() : Agent(PT_PGM), pktSize_(0),
  197.  spm_heartbeat_(this, TIMER_SPM),
  198.  spm_running_(0), odata_seqno_(-1), spm_seqno_(-1)
  199. {
  200.   stats_.num_naks_received_ = 0;
  201.   stats_.num_rdata_sent_ = 0;
  202.   stats_.max_num_repeated_rdata_ = 0;
  203.   sprintf(uname_, "pgmSender-%d", pgm_snd_uid_++);
  204.   bind_time("spm_interval_", &spm_interval_);
  205.   bind_time("rdata_delay_", &rdata_delay_);
  206.   et_ = (EventTrace *) NULL;
  207. }
  208. // Code that is called when a packet is received.
  209. void PgmSender::recv(Packet *pkt, Handler *)
  210. {
  211.   hdr_cmn* hc = HDR_CMN(pkt);
  212.   if (hc->ptype_ == PT_PGM) {
  213. #ifdef PGM_DEBUG
  214.     display_packet(pkt);
  215. #endif
  216.     // Identify the type of PGM packet, if it is a NAK process it, otherwise
  217.     // throw an error.
  218.     hdr_pgm *hp = HDR_PGM(pkt);
  219.     if (hp->type_ == PGM_NAK) {
  220.       handle_nak(pkt);
  221.     }
  222.     else {
  223.       printf("ERROR (PgmSender::handle_pgm_pkt): received unexpected PGM packet type %d, discarding.n", hp->type_);
  224.     }
  225.   }
  226.   else {
  227.     printf ("%s ERROR (PgmSender::recv): received non PGM pkt type %d, discarding.n", uname_, hc->ptype_);
  228.   }
  229.   // Free all packets that this agent receives.
  230.   Packet::free(pkt);
  231. }
  232. // Code that is called when a timer expires.
  233. void PgmSender::timeout(int type, void *data)
  234. {
  235.   switch(type) {
  236.   case TIMER_SPM:
  237.     if (spm_running_) {
  238.       send_spm();
  239.       spm_heartbeat_.resched(spm_interval_);
  240.     }
  241.     break;
  242.   case TIMER_RDATA:
  243.     send_rdata((RdataItem *)data);
  244.     break;
  245.   default:
  246.     printf("ERROR (PgmSender::timeout): invalid timeout type.n");
  247.     break;
  248.   }
  249. }
  250. // Code that is called when a TCL command is issued to the PGM Sender object.
  251. int PgmSender::command(int argc, const char*const* argv)
  252. {
  253.   if (argc == 2) {
  254.     if (strcmp(argv[1], "start-SPM") == 0) {
  255.       start();
  256.       return (TCL_OK);
  257.     }
  258.     if (strcmp(argv[1], "stop-SPM") == 0) {
  259.       stop();
  260.       return (TCL_OK);
  261.     }
  262.     if (strcmp(argv[1], "print-stats") == 0) {
  263.       print_stats();
  264.       return (TCL_OK);
  265.     }
  266.   }
  267.   else if (argc == 3) {  //If Event Trace is on, set the Event trace handle
  268.     if (strcmp(argv[1], "eventtrace") == 0) {
  269.       et_ = (EventTrace *)TclObject::lookup(argv[2]);
  270.       return (TCL_OK);
  271.     }
  272.   }
  273.   return (Agent::command(argc, argv));
  274. }
  275. void PgmSender::trace_event(char *evType, nsaddr_t daddr, double evTime) {
  276.   if (et_ == NULL) return;
  277.   char *wrk = et_->buffer();
  278.   char *nwrk = et_->nbuffer();
  279.   if (wrk != NULL) {
  280.     sprintf(wrk, "E "TIME_FORMAT" %d %d PGM %s "TIME_FORMAT, 
  281.             et_->round(Scheduler::instance().clock()),   
  282.             addr(),                    
  283.             daddr,                   
  284.             evType,                  
  285. evTime);
  286.   if (nwrk != 0)
  287.     sprintf(nwrk,
  288. "E -t "TIME_FORMAT" -o PGM -e %s -s %d.%d -d %d.%d",
  289. et_->round(Scheduler::instance().clock()),   // time
  290. evType,                    // event type
  291. addr(),                       // owner (src) node id
  292. port(),                       // owner (src) port id
  293. daddr,                      // dst node id
  294. 0                       // dst port id
  295. );
  296. et_->dump();
  297.   }
  298. }
  299. // The application calls this function to send out new ODATA (original DATA).
  300. void PgmSender::sendmsg(int nbytes, const char *flags /* = 0 */)
  301. {
  302.   odata_seqno_++;
  303. #ifdef PGM_DEBUG
  304.   double now = Scheduler::instance().clock();
  305.   printf("at %f %s sending ODATA seqno %dn", now, uname_, odata_seqno_);
  306. #endif
  307.   // Create a packet with the given ODATA.
  308.   Packet *pkt = allocpkt();
  309.   // Set the simulated size of the packet to the indicated nbytes.
  310.   hdr_cmn *hc = HDR_CMN(pkt);
  311.   pktSize_ = nbytes + sizeof(hdr_pgm);
  312.   hc->size_ = pktSize_;
  313.   hc->ptype_ = PT_PGM;
  314.   // Fill in the PGM header.
  315.   hdr_pgm *hp = HDR_PGM(pkt);
  316.   hp->type_ = PGM_ODATA;
  317.   hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
  318.   hp->seqno_ = odata_seqno_;
  319.   hdr_ip *hip = HDR_IP(pkt);
  320.   // Set the color for ODATA packets.
  321.   hip->fid_ = 1;
  322.   // Send out the packet.
  323.   send(pkt, 0);
  324. }
  325. void PgmSender::start()
  326. {
  327.   spm_running_ = 1;
  328.   send_spm();
  329.   spm_heartbeat_.resched(spm_interval_);
  330. }
  331. void PgmSender::stop()
  332. {
  333.   spm_heartbeat_.cancel();
  334.   spm_running_ = 0;
  335.   Tcl::instance().evalf("%s done", this->name());
  336. }
  337. // Process a NAK packet.
  338. void PgmSender::handle_nak(Packet *pkt)
  339. {
  340.   hdr_cmn *hc = HDR_CMN(pkt);
  341.   hdr_pgm *hp = HDR_PGM(pkt);
  342.   //hdr_pgm_nak *pnak = HDR_PGM_NAK(pkt);
  343.   if (!(hp->tsi_.isEqual (here_))) {
  344.     printf("%s received NAK with wrong TSI, discarding.n", uname_);
  345.     return;
  346.   }
  347.   stats_.num_naks_received_++;
  348.   // Create the NCF packet.
  349.   Packet *ncf_pkt = allocpkt();
  350.   // Set the simulated size of the NCF packet.
  351.   hdr_cmn *ncf_hc = HDR_CMN(ncf_pkt);
  352.   ncf_hc->size_ = sizeof(hdr_pgm);
  353.   ncf_hc->ptype_ = PT_PGM;
  354.   // Fill in the PGM header for the NCF packet.
  355.   hdr_pgm *ncf_hp = HDR_PGM(ncf_pkt);
  356.   ncf_hp->type_ = PGM_NCF;
  357.   ncf_hp->tsi_ = here_;
  358.   ncf_hp->seqno_ = hp->seqno_;
  359.   hdr_ip *ncf_hip = HDR_IP(ncf_pkt);
  360.   // Set the color for NCF packets in nam.
  361.   ncf_hip->fid_ = 6;
  362.   // Immediately send the NCF packet to the interface where the NAK
  363.   // packet was received. If the packet came from another agent attached
  364.   // to this node, then send the packet to that agent.
  365.   NsObject *tgt;
  366.   if (hc->iface() < 0) {
  367.     tgt = pkt2agent(pkt);
  368.   }
  369.   else {
  370.     tgt = iface2link(hc->iface());
  371.   }
  372.   if (tgt == NULL) {
  373.     printf("ERROR (PgmSender::handle_nak): iface2link returned NULL.n");
  374.     abort();
  375.   }
  376.   tgt->recv(ncf_pkt);
  377.   // Queue up an RDATA packet to be transferred to the requestor on the
  378.   // appropriate interface.
  379.   // Attempt to locate this NAK sequence number on the pending RDATA map.
  380.   pair<map<int, RdataItem>::iterator, bool> result;
  381.   result = pending_rdata_.insert(pair<int, RdataItem>(hp->seqno_, RdataItem(hp->seqno_, this)));
  382.   RdataItem *ritem = &(result.first->second);
  383.   if (result.second == true) {
  384.     // The entry was added to the map.
  385.     // Set the data field of the timer.
  386.     ritem->rdata_timer().data() = ritem;
  387.     // There is NO pending RDATA for the indicated sequence number.
  388.     // Create the RDATA packet.
  389.     Packet *rdata_pkt = allocpkt();
  390.     // Set the simulated size of the RDATA packet to the typicial data size.
  391.     hdr_cmn *rdata_hc = HDR_CMN(rdata_pkt);
  392.     rdata_hc->size_ = pktSize_;
  393.     rdata_hc->ptype_ = PT_PGM;
  394.     // Fill in the PGM header for RDATA packet.
  395.     hdr_pgm *rdata_hp = HDR_PGM(rdata_pkt);
  396.     rdata_hp->type_ = PGM_RDATA;
  397.     rdata_hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
  398.     rdata_hp->seqno_ = hp->seqno_;
  399.     hdr_ip *rdata_hip = HDR_IP(rdata_pkt);
  400.     // Set the color for RDATA packets in nam.
  401.     rdata_hip->fid_ = 3;
  402.     // Place the new packet into the RdataItem in the map.
  403.     ritem->rdata_pkt() = rdata_pkt;
  404.     // Set the timer to go off at rdata_delay_ seconds from now.
  405.     ritem->rdata_timer().resched(rdata_delay_);
  406. //Output Event Trace, Repair will be sent after rdata_delay_
  407.     trace_event("REPAIR BACKOFF", rdata_hip->daddr(), rdata_delay_);
  408.     if (hc->iface() < 0) {
  409.       // The NAK was sent from a local agent attached to this node. Keep
  410.       // track of which agent this is.
  411.       ritem->agent_list().push_back(pkt2agent(pkt));
  412.     }
  413.     else {
  414.       // Set the interface number for this RdataItem.
  415.       ritem->iface_list().push_back(hc->iface());
  416.     }
  417.   }
  418.   else {
  419.     // Seqno entry already exists in the map.
  420.     // The RDATA is already pending.
  421.     if (hc->iface() < 0) {
  422.       // Scan the agent list to see if the agent is already registered
  423.       // for this RDATA.
  424.       list<NsObject *> *agent_list = &(ritem->agent_list());
  425.       list<NsObject *>::iterator res = find(agent_list->begin(), agent_list->end(), pkt2agent(pkt));
  426.       if (res == agent_list->end()) {
  427. // Agent not found in agent list for this RDATA, add it.
  428. agent_list->push_back(pkt2agent(pkt));
  429.       }
  430.       else {
  431. printf("%s: NAK received and already had NAK state for that same agent.n", uname_);
  432.       }
  433.     }
  434.     else {
  435.       // Scan the interface list to see if the interface is already registered
  436.       // for this RDATA.
  437.       list<int> *iface_list = &(ritem->iface_list());
  438.       list<int>::iterator res = find(iface_list->begin(), iface_list->end(), hc->iface());
  439.       if (res == iface_list->end()) {
  440. // Interface not found in iface list for this RDATA, add it.
  441. iface_list->push_back(hc->iface());
  442.       }
  443.       else {
  444. // Interface already present in the iface list for this RDATA,
  445. // therefore this NAK is a duplicate.
  446. printf("%s: NAK received and already had NAK state for that same interface.n", uname_);
  447.       }
  448.     }
  449.   }
  450. }
  451. // Send out a new SPM packet to the multicast group.
  452. void PgmSender::send_spm()
  453. {
  454.   spm_seqno_++;
  455.   // Create a packet with the given ODATA.
  456.   Packet *pkt = allocpkt();
  457.   // Set the simulated size of the packet to the indicated nbytes.
  458.   hdr_cmn *hc = HDR_CMN(pkt);
  459.   hc->size_ = sizeof(hdr_pgm) + sizeof(hdr_pgm_spm);
  460.   hc->ptype_ = PT_PGM;
  461.   hdr_ip *hip = HDR_IP(pkt);
  462.   // Set the color for SPM packets in nam.
  463.   hip->fid_ = 7;
  464.   //  hip->daddr() = group_;
  465.   // Fill in the PGM header.
  466.   hdr_pgm *hp = HDR_PGM(pkt);
  467.   hp->type_ = PGM_SPM;
  468.   hp->tsi_ = here_; // Set transport session ID to addr/port of this agent.
  469.   hp->seqno_ = spm_seqno_;
  470.   // Fill in SPM header.
  471.   hdr_pgm_spm *hs = HDR_PGM_SPM(pkt);
  472.   hs->spm_path_ = here_; // Set current path to the source agent.
  473. #ifdef PGM_DEBUG
  474.   double now = Scheduler::instance().clock();
  475.   printf("at %f %s sending SPM, from %d:%d (here = %d:%d) to %d:%d, TSI %d:%d, type %dn", now, uname_, hip->saddr(), hip->sport(), addr(), port(), hip->daddr(), hip->dport(), hp->tsi_.addr_, hp->tsi_.port_, hp->type_);
  476. #endif
  477.   // Send out the packet.
  478.   send(pkt, 0);
  479. }
  480. // Send out the given RDATA packet. The packet should be already created
  481. // and headers filled in.  This is triggered when the timer expires.
  482. void PgmSender::send_rdata(RdataItem *item)
  483. {
  484.   // Locate the sequence number of this rdata in the list of sent
  485.   // retransmissions.
  486.   int count = 0;
  487.   list<ReplyItem>::iterator iter = reply_.begin();
  488.   while (iter != reply_.end()) {
  489.     if ((*iter).seqno() == item->seqno()) {
  490.       (*iter).retransmissions() += 1;
  491.       if ((*iter).retransmissions() > stats_.max_num_repeated_rdata_) {
  492. stats_.max_num_repeated_rdata_ = (*iter).retransmissions();
  493.       }
  494.       break;
  495.     }
  496.     count++;
  497.     iter++;
  498.   }
  499.   if (iter == reply_.end()) {
  500.     // This is the first time we're sending out this RDATA. Append it to
  501.     // the back of the reply list.
  502.     if (count >= MAX_REPLY_LIST_SIZE) {
  503.       // Pop off the front-most element if we've reached the max size of
  504.       // the reply list.
  505.       reply_.pop_front();
  506.     }
  507.     reply_.push_back(ReplyItem(item->seqno()));
  508.   }
  509.   stats_.num_rdata_sent_++;
  510.   // Send the packet to each of the interfaces.
  511.   if (!item->iface_list().empty()) {
  512.     list<int>::iterator iter = item->iface_list().begin();
  513.     int flag = 0;   // Used to determine when we need to make additional copies of the packet.
  514.     while (iter != item->iface_list().end()) {
  515.       NsObject *tgt;
  516.       tgt = iface2link(*iter);
  517.       Packet *pkt = item->rdata_pkt();
  518.       if (flag) {
  519. // Make a copy of each packet before sending it, so we don't free()
  520. // the same packet at the different receivers causing a deallocation
  521. // problem.
  522. pkt = pkt->copy();
  523.       } else {
  524. trace_event("SEND RDATA", HDR_IP(pkt)->daddr(), 0);
  525. flag = 1;
  526.       }
  527.       tgt->recv(pkt);
  528.       iter++;
  529.     }
  530.   }
  531.   if (!item->agent_list().empty()) {
  532.     list<NsObject *>::iterator iter = item->agent_list().begin();
  533.     int flag = 0;
  534.     while (iter != item->agent_list().end()) {
  535.       Packet *pkt = item->rdata_pkt();
  536.       if (flag) {
  537. pkt = pkt->copy ();
  538.       } else {
  539. flag = 1;
  540.       }
  541.       (*iter)->recv(pkt);
  542.       iter++;
  543.     }
  544.   }
  545.   hdr_pgm *hp = HDR_PGM(item->rdata_pkt());
  546.   // Remove this sequence number from the pending RDATA list, since
  547.   // we have now sent that RDATA.
  548.   if (!pending_rdata_.erase(hp->seqno_)) {
  549.     printf("ERROR (PgmSender::send_rdata): Did not erase RdataItem from map.n");
  550.   }
  551. }
  552. NsObject* PgmSender::iface2link (int iface)
  553. {
  554. //      Tcl::instance().evalf("%s get-outlink %d", name(), iface);
  555. //      char* ni = Tcl::instance().result();
  556.         Tcl&    tcl = Tcl::instance();
  557.         char    wrk[64];
  558. if (iface == -1) {
  559.   return NULL;
  560. }
  561.         sprintf (wrk, "[%s set node_] ifaceGetOutLink %d", name (), iface);
  562.         tcl.evalc (wrk);
  563.         const char* result = tcl.result ();
  564. #ifdef PGM_DEBUG
  565. printf ("[iface2link] agent %sn", result);
  566. #endif
  567.         NsObject* obj = (NsObject*)TclObject::lookup(result);
  568.         return (obj);
  569. }
  570. NsObject* PgmSender::pkt2agent (Packet *pkt)
  571. {
  572.         Tcl&            tcl = Tcl::instance();
  573.         char            wrk[64];
  574.         const char            *result;
  575.         int             port;
  576.         NsObject*       agent;
  577.         hdr_ip*         ih = HDR_IP(pkt);
  578.         //nsaddr_t        src = ih->saddr();
  579.         port = ih->sport();
  580.         sprintf (wrk, "[%s set node_] agent %d", name (), port);
  581.         tcl.evalc (wrk);
  582.         result = tcl.result ();
  583. #ifdef PGM_DEBUG
  584. printf ("[pkt2agent] port %d, agent %sn", port, result);
  585. #endif
  586.         agent = (NsObject*)TclObject::lookup (result);
  587.         return (agent);
  588. }
  589. void PgmSender::print_stats()
  590. {
  591.   printf("%sn", uname_);
  592.   printf("tLast ODATA seqno: %dn", odata_seqno_);
  593.   printf("tLast SPM seqno: %dn", spm_seqno_);
  594.   printf("tNumber of NAKs received: %dn", stats_.num_naks_received_);
  595.   printf("tNumber of RDATA transmitted: %dn", stats_.num_rdata_sent_);
  596.   printf("tMax retransmission count for a single RDATA: %dn", stats_.max_num_repeated_rdata_);
  597. }
  598. #ifdef PGM_DEBUG
  599. void PgmSender::display_packet(Packet *pkt)
  600. {
  601.   double now = Scheduler::instance().clock();
  602.   hdr_ip *hip = HDR_IP(pkt);
  603.   hdr_cmn *hc = HDR_CMN(pkt);
  604.   printf("at %f %s received packet type ", now, uname_);
  605.   hdr_pgm *hp = HDR_PGM(pkt);
  606.   
  607.   hdr_pgm_spm *hps;
  608.   hdr_pgm_nak *hpn;
  609.   switch(hp->type_) {
  610.   case PGM_SPM:
  611.     hps = HDR_PGM_SPM(pkt);
  612.     printf("SPM (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, spm_path %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hps->spm_path_.addr_, hps->spm_path_.port_);
  613.     break;
  614.   case PGM_ODATA:
  615.     printf("ODATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  616.     break;
  617.   case PGM_RDATA:
  618.     printf("RDATA (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  619.     break;
  620.   case PGM_NAK:
  621.     hpn = HDR_PGM_NAK(pkt);
  622.     printf("NAK (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %d, source %d:%d, group %d:%dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_, hpn->source_.addr_, hpn->source_.port_, hpn->group_.addr_, hpn->group_.port_);
  623.     break;
  624.   case PGM_NCF:
  625.     printf("NCF (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  626.     break;
  627.   default:
  628.     printf("UNKNOWN (TSI %d:%d) from %d:%d to %d:%d iface %d, size %d, seqno %dn", hp->tsi_.addr_, hp->tsi_.port_, hip->saddr(), hip->sport(), hip->daddr(), hip->dport(), hc->iface(), hc->size(), hp->seqno_);
  629.     break;
  630.   }
  631. }
  632. #endif // PGM_DEBUG
  633. #endif //HAVE_STL