rmst_filter.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:66k
源码类别:

通讯编程

开发平台:

Visual C++

  1. //
  2. // rmst_filter.cc  : RmstFilter Class Methods
  3. // authors         : Fred Stann
  4. //
  5. // Copyright (C) 2003 by the University of Southern California
  6. // $Id: rmst_filter.cc,v 1.3 2005/09/13 04:53:48 tomh Exp $
  7. //
  8. // This program is free software; you can redistribute it and/or
  9. // modify it under the terms of the GNU General Public License,
  10. // version 2, as published by the Free Software Foundation.
  11. //
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. //
  17. // You should have received a copy of the GNU General Public License along
  18. // with this program; if not, write to the Free Software Foundation, Inc.,
  19. // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  20. //
  21. // Linking this file statically or dynamically with other modules is making
  22. // a combined work based on this file.  Thus, the terms and conditions of
  23. // the GNU General Public License cover the whole combination.
  24. //
  25. // In addition, as a special exception, the copyright holders of this file
  26. // give you permission to combine this file with free software programs or
  27. // libraries that are released under the GNU LGPL and with code included in
  28. // the standard release of ns-2 under the Apache 2.0 license or under
  29. // otherwise-compatible licenses with advertising requirements (or modified
  30. // versions of such code, with unchanged license).  You may copy and
  31. // distribute such a system following the terms of the GNU GPL for this
  32. // file and the licenses of the other code concerned, provided that you
  33. // include the source code of that other code when and as the GNU GPL
  34. // requires distribution of source code.
  35. //
  36. // Note that people who make modified versions of this file are not
  37. // obligated to grant this special exception for their modified versions;
  38. // it is their choice whether to do so.  The GNU General Public License
  39. // gives permission to release a modified version without this exception;
  40. // this exception also makes it possible to release a modified version
  41. // which carries forward this exception.
  42. #include "rmst_filter.hh"
  43. char *rmstmsg_types[] = {"INTEREST", "POSITIVE REINFORCEMENT",
  44.                      "NEGATIVE REINFORCEMENT", "DATA",
  45.                      "EXPLORATORY DATA", "PUSH EXPLORATORY DATA",
  46.                      "CONTROL", "REDIRECT"};
  47. #ifdef NS_DIFFUSION
  48. class DiffAppAgent;
  49. #endif // NS_DIFFUSION
  50. #ifdef NS_DIFFUSION
  51. static class RmstFilterClass : public TclClass {
  52. public:
  53.   RmstFilterClass() : TclClass("Application/DiffApp/RmstFilter") {}
  54.   TclObject* create(int argc, const char*const* argv) {
  55.     return(new RmstFilter());
  56.   }
  57. } class_rmst_filter;
  58. int RmstFilter::command(int argc, const char*const* argv) {
  59.   //Tcl& tcl =  Tcl::instance();
  60.   if (argc == 2) {
  61.     if (strcmp(argv[1], "start") == 0) {
  62.       run();
  63.       return (TCL_OK);
  64.     }
  65.   }
  66.   return (DiffApp::command(argc, argv));
  67. }
  68. #endif // NS_DIFFUSION
  69. class ReinfMessage {
  70. public:
  71.   int32_t rdm_id_;
  72.   int32_t pkt_num_;
  73. };
  74. // RmstFilterCallback::recv
  75. // Called by diffusion core when a message is available for this filter.
  76. // RmstFilterCallback is derived from the abstract class FilterCallback.
  77. // A pointer to the FilterCallback class is required in the API method "addFilter."
  78. void RmstFilterCallback::recv(Message *msg, handle h)
  79. {
  80.   app_->recv(msg, h);
  81. }
  82. // RmstFilter::recv
  83. //
  84. // Called by the Callback::recv method.
  85. void RmstFilter::recv(Message *msg, handle h)
  86. {
  87.   // Process the message handed to us by the core.
  88.   // If true is returned we forward the message. Otherwise it dies here.
  89.   if(processMessage(msg))
  90.     ((DiffusionRouting *)dr_)->sendMessage(msg, h);
  91. }
  92. // RmstFilter::processMessage
  93. //
  94. // Called by the RmstFilter::recv method when this filter gets a message.
  95. bool RmstFilter::processMessage(Message *msg)
  96. {
  97.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  98.   NRSimpleAttribute<int> *frag_attr = NULL;
  99.   NRSimpleAttribute<int> *pkts_sent_attr = NULL;
  100.   NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
  101.   NRSimpleAttribute<void *> *reinf_attr = NULL;
  102.   NRSimpleAttribute<int> *nrscope = NULL;
  103.   NRSimpleAttribute<int> *nr_class = NULL;
  104.   NRAttrVec *data;
  105.   Key2ExpLog::iterator exp_iterator;
  106.   Int2Rmst::iterator rmst_iterator;
  107.   int rmst_no;
  108.   int frag_no;
  109.   int class_type;
  110.   int rmst_ctl_type;
  111.   union LlToInt key;
  112.   Rmst *rmst_ptr;
  113.   // If this is a message that uses the transport layer, we process it.
  114.   // Otherwise we send it back to the core (by returning true).
  115.   tsprt_ctl_attr = RmstTsprtCtlAttr.find(msg->msg_attr_vec_);
  116.   if (!tsprt_ctl_attr){
  117.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter got non-transport messagen");
  118.     return true;
  119.   }
  120.   rmst_ctl_type = tsprt_ctl_attr->getVal();
  121.   DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processMessage got a");
  122.   if (msg->new_message_)
  123.     DiffPrint(DEBUG_IMPORTANT, " new (%d) ", msg->msg_type_);
  124.   else
  125.     DiffPrint(DEBUG_IMPORTANT, "n old (%d) ", msg->msg_type_);
  126.   if (msg->last_hop_ != LOCALHOST_ADDR)
  127.     DiffPrint(DEBUG_IMPORTANT, "%s message from %d to %dn",
  128.       rmstmsg_types[msg->msg_type_],
  129.       msg->last_hop_, msg->next_hop_);
  130.   else
  131.     DiffPrint(DEBUG_IMPORTANT, "%s message from local agentn",
  132.       rmstmsg_types[msg->msg_type_]);
  133.   // We only care about messages we haven't seen before,
  134.   // but we generally let other filters get them (because they may need them).
  135.   // However, if this is an old DATA message arriving at a sink, the sink may
  136.   // negatively reinforce a reinforced path.  This is because we withold the 
  137.   // new messages until we get the entire blob.  The old message is the result
  138.   // of a lost ACK when using SMAC with ARQ.
  139.   if (!msg->new_message_ && msg->msg_type_ == DATA
  140.       && rmst_ctl_type == RMST_RESP){
  141.     DiffPrint(DEBUG_SOME_DETAILS, 
  142.       "  Sink got an old DATA message from node %dn", msg->last_hop_);
  143.     data = msg->msg_attr_vec_;
  144.     rmst_id_attr = RmstIdAttr.find(data);
  145.     if (!rmst_id_attr){
  146.       DiffPrint(DEBUG_SOME_DETAILS,
  147.         "  Filter received a bad transport packet!n");
  148.       return false;
  149.     }
  150.     rmst_no = rmst_id_attr->getVal();
  151.     // Find the rmst.
  152.     rmst_iterator = rmst_map_.find(rmst_no);
  153.     if(rmst_iterator == rmst_map_.end()){
  154.       DiffPrint(DEBUG_IMPORTANT,
  155.         "  couldn't find DB entry for Rmst %dn", rmst_no);
  156.       return false;
  157.     }
  158.     else{
  159.       rmst_ptr = (*rmst_iterator).second;
  160.       if ( (local_sink_) && (msg->last_hop_ == rmst_ptr->last_hop_) ){
  161.         // This is the case where SMAC sent the same DATA message twice to a sink.
  162.         // We suppress this message so we don't kill our reinforced path.
  163.         DiffPrint(DEBUG_IMPORTANT,
  164.           "  We suppress old DATA message from smac retransmission!n");
  165.         return false;
  166.       }
  167.       else
  168.         return true;
  169.     }
  170.   }
  171.   else if (!msg->new_message_)
  172.     return true;
  173.   // When we get Rmst Fragments we must sync the local cache!
  174.   if ( (rmst_ctl_type == RMST_RESP) && 
  175.        ((msg->msg_type_ == DATA) || (msg->msg_type_ == EXPLORATORY_DATA)) ){
  176.     rmst_ptr = syncLocalCache(msg);
  177.     // syncLocalCache will return NULL if the 
  178.     // attribute set doesn't make sense.
  179.     if (rmst_ptr == NULL)
  180.       return false;
  181.     rmst_no = rmst_ptr->rmst_no_;
  182.     // Mark the time we got some kind of data.
  183.     GetTime (&last_data_rec_);
  184.   }
  185.   //  New exploratory messages are entered into the exp_map_,
  186.   //  so that we can find the last hop if it gets reinforced.
  187.   //  Positive reinforcement messages are used to find the 
  188.   //  corresponding message in the exp_map_, so we know the
  189.   //  current reinforced path to the source of an rmst.
  190.   switch (msg->msg_type_){
  191.     case(EXPLORATORY_DATA):
  192.       ExpLog exp_msg;
  193.       DiffPrint(DEBUG_LOTS_DETAILS,
  194.         "  Exploratory_Msg: ptk_num = %x, rdm_id_ = %x, last_hop = %dn",
  195.         msg->pkt_num_, msg->rdm_id_, msg->last_hop_);
  196.       // Put the ID for this Exploratory message, along with its last hop,
  197.       // into the exp_map_.  If this message gets reinforced, we will be
  198.       // able to identify the next hop in the back channel.
  199.       DiffPrint(DEBUG_SOME_DETAILS,
  200.         "  Exploratory message for Reliable transport Id = %dn", rmst_no);
  201.       key.int_val_[0] = msg->pkt_num_;
  202.       key.int_val_[1] = msg->rdm_id_;
  203.       DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llxn", key.ll_val_);
  204.       exp_msg.rmst_no_ = rmst_no;
  205.       exp_msg.last_hop_ = msg->last_hop_;
  206.       exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
  207.       // If this is a new exploratory message arriving at a sink,
  208.       // we assume that this path will get reinforced by the 
  209.       // gradient filter. Sinks don't get positive reinforcement
  210.       // messages, so we must record last_hop_ now.
  211.       if (local_sink_){
  212.         rmst_ptr->last_hop_ = msg->last_hop_; 
  213.         if (rmst_ptr->reinf_){
  214.           DiffPrint(DEBUG_IMPORTANT, "  got a new path exploratory msg at sink.n");
  215.           rmst_ptr->wait_for_new_path_ = true;
  216.         }
  217.         else{
  218.           rmst_ptr->reinf_ = true;
  219.           DiffPrint(DEBUG_IMPORTANT, "  got an initial exploratory msg at sink.n");
  220.         }
  221.         DiffPrint(DEBUG_IMPORTANT, "  set last_hop for rmst %d to %dn",
  222.           rmst_no, rmst_ptr->last_hop_);
  223.         rmst_ptr->pkts_rec_ = 0; 
  224.         rmst_ptr->last_hop_pkts_sent_ = 0; 
  225.       }
  226.       else{
  227.         // If this is not a sink we reset the base fragment that
  228.         // we look for holes from.
  229.         DiffPrint(DEBUG_LOTS_DETAILS,
  230.           "  intermediate node resets sync_base_ and reinf_.n");
  231.         frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
  232.         frag_no = frag_attr->getVal();
  233.         rmst_ptr->sync_base_ = frag_no;
  234.         if(rmst_ptr->reinf_)
  235.           rmst_ptr->reinf_ = false;
  236.         rmst_ptr->last_hop_ = 0;
  237.         rmst_ptr->pkts_sent_ = 0;
  238.         rmst_ptr->pkts_rec_ = 0;
  239.         rmst_ptr->last_hop_pkts_sent_ = 0; 
  240.         rmst_ptr->naks_rec_ = 0; 
  241.       }
  242.       // If this is not a sink and a watchdog timer is active, we cancel
  243.       // it because we may not end up on the new reinforced path. We
  244.       // don't want to look for fragments that will never arrive.
  245.       if ((rmst_ptr->watchdog_active_) && (!local_sink_) 
  246.         && (!rmst_ptr->local_source_)){
  247.         rmst_ptr->cancel_watchdog_ = true;
  248.         rmst_ptr->cleanHoleMap();
  249.       }
  250.       // We always forward exploratory data.
  251.       return(true);
  252.       break;
  253.     case(DATA):
  254.       if (rmst_ctl_type != RMST_RESP){
  255.         processCtrlMessage(msg);
  256.         // We don't let Rmst control messages go to the gradient or other filters.
  257.         return false;
  258.       }
  259.       // We have a normal DATA packet.
  260.       rmst_ptr->pkts_rec_++;
  261.       // If we got the upstream send count - update it in Rmst.
  262.       pkts_sent_attr = RmstPktsSentAttr.find(msg->msg_attr_vec_);
  263.       if (pkts_sent_attr){
  264.         rmst_ptr->last_hop_pkts_sent_ = pkts_sent_attr->getVal();
  265.         DiffPrint(DEBUG_SOME_DETAILS,
  266.           "processMessage:: got last_hop_pkts_sent_ = %d packetsn",
  267.           rmst_ptr->last_hop_pkts_sent_);
  268.         if ( (rmst_ptr->last_hop_pkts_sent_ > 20) && 
  269.              (rmst_ptr->pkts_rec_ < (rmst_ptr->last_hop_pkts_sent_ * BLACKLIST_THRESHOLD)) ){
  270.           Blacklist::iterator black_list_iterator;
  271.           black_list_iterator = black_list_.begin();
  272.           while(black_list_iterator != black_list_.end()){
  273.             if(*black_list_iterator == rmst_ptr->last_hop_)
  274.             break;
  275.             black_list_iterator++;
  276.           }
  277.           if(black_list_iterator == black_list_.end()){
  278.             DiffPrint(DEBUG_IMPORTANT, "Adding node %d to black_list_ !!n",
  279.               rmst_ptr->last_hop_);
  280.             black_list_.push_front(rmst_ptr->last_hop_);
  281.             ((DiffusionRouting *)dr_)->addToBlacklist(rmst_ptr->last_hop_);
  282.             // Now send an EXP_REQ!
  283.             sendExpReqUpstream(rmst_ptr);
  284.             rmst_ptr->sent_exp_req_ = true;
  285.             GetTime(&rmst_ptr->exp_req_time_);
  286.             // We need to send a negative reinforcement on blacklisted link!
  287.             Message *neg_reinf_msg;
  288.             neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
  289.               0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, 
  290.               rmst_ptr->last_hop_, LOCALHOST_ADDR);
  291.             neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
  292.             ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
  293.             pkt_count_++;
  294.             delete neg_reinf_msg;
  295.           }
  296.         }
  297.       }
  298.       // We suppress new DATA messages that don't arrive on the 
  299.       // reinforced path. 
  300.       if ( msg->last_hop_ != rmst_ptr->last_hop_ ){
  301.         DiffPrint(DEBUG_IMPORTANT,
  302.           "  We suppress new DATA message on non-backchannel path!; backchannel = %dn",
  303.           rmst_ptr->last_hop_);
  304.         msg->new_message_ = 0;
  305.         return true;
  306.       }
  307.       if (rmst_ptr->wait_for_new_path_){
  308.         rmst_ptr->wait_for_new_path_ = false;
  309.         DiffPrint(DEBUG_SOME_DETAILS, "  node resets wait_for_new_path_.n");
  310.       }
  311.       if (local_sink_ && rmst_ptr->sent_exp_req_){
  312.         DiffPrint(DEBUG_SOME_DETAILS,
  313.           "  source got a new path, set sent_exp_req_ false.n");
  314.         rmst_ptr->sent_exp_req_ = false;
  315.       }
  316.       // We forward DATA if we aren't a source or a sink.
  317.       // Sources collect all fragments and send them from a timer.
  318.       // Sinks collect all fragments and send them to the app when they
  319.       // have all arrived.
  320.       if(rmst_ptr->local_source_ || local_sink_)
  321.         return false;
  322.       else{
  323.         rmst_ptr->pkts_sent_++;
  324.         // We need to alter the RmstPktsSentAttr to reflect this node!
  325.         if(pkts_sent_attr)
  326.           pkts_sent_attr->setVal(rmst_ptr->pkts_sent_);
  327.         return true;
  328.       }
  329.       break;
  330.     case(INTEREST):
  331.       data = msg->msg_attr_vec_;
  332.       nr_class = NRClassAttr.find(data);
  333.       if (nr_class){
  334.         class_type = nr_class->getVal();
  335.         if (class_type == NRAttribute::DISINTEREST_CLASS)
  336.           DiffPrint(DEBUG_SOME_DETAILS, "  DISINTEREST_CLASSn");
  337.       }
  338.       nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
  339.       if(nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
  340.         DiffPrint(DEBUG_SOME_DETAILS, "  rmst LOCAL_SCOPE Interest Messagen");
  341.       else if (msg->last_hop_ == LOCALHOST_ADDR){
  342.         DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from local SINKn");
  343.         local_sink_ = true;
  344.         local_sink_port_ = msg->source_port_;
  345.         GetTime (&last_sink_time_);
  346.         if (interest_attrs_ == NULL)
  347.           interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
  348.       }
  349.       else{
  350.         DiffPrint(DEBUG_SOME_DETAILS, "  rmst Interest Message from non-local noden");
  351.           if (interest_attrs_ == NULL)
  352.             interest_attrs_ = CopyAttrs(msg->msg_attr_vec_);
  353.       }
  354.       break;
  355.     case(POSITIVE_REINFORCEMENT):
  356.       ReinfMessage *reinf_msg;
  357.       ExpLog exp_log;
  358.       DiffPrint(DEBUG_IMPORTANT, "  Positive Reinf arrivedn");
  359.       reinf_attr = ReinforcementAttr.find(msg->msg_attr_vec_);
  360.       reinf_msg = (ReinfMessage*)reinf_attr->getVal();
  361.       DiffPrint(DEBUG_LOTS_DETAILS, "  Pos_Reinf: ptk_num = %x, rdm_id_ = %xn",
  362.         reinf_msg->pkt_num_, reinf_msg->rdm_id_);
  363.       key.int_val_[0] = reinf_msg->pkt_num_;
  364.       key.int_val_[1] = reinf_msg->rdm_id_;
  365.       exp_iterator = exp_map_.find(key.ll_val_);
  366.       if(exp_iterator != exp_map_.end()){
  367.         exp_log = (*exp_iterator).second;
  368.         DiffPrint(DEBUG_SOME_DETAILS, "  Reinforcement for rmst_no = %d, last_hop_ = %dn",
  369.           exp_log.rmst_no_, exp_log.last_hop_);
  370.         // Here is where we must update the rmst with back-channel
  371.         // last hop.
  372.         rmst_no = exp_log.rmst_no_;
  373.         rmst_iterator = rmst_map_.find(rmst_no);
  374.         if(rmst_iterator != rmst_map_.end()){
  375.           rmst_ptr = (*rmst_iterator).second;
  376.           rmst_ptr->last_hop_ = exp_log.last_hop_;
  377.           rmst_ptr->fwd_hop_ = msg->last_hop_;
  378.           DiffPrint(DEBUG_SOME_DETAILS, "  Setting rmst_no %d last_hop_ = %d, fwd_hop_ = %dn",
  379.             rmst_no, rmst_ptr->last_hop_, rmst_ptr->fwd_hop_);
  380.           if(!rmst_ptr->reinf_){
  381.             rmst_ptr->reinf_ = true;
  382.             if(rmst_ptr->local_source_)
  383.               DiffPrint(DEBUG_LOTS_DETAILS, "  Local source got a Reinfn");
  384.           }
  385.         }
  386.         else{
  387.           DiffPrint(DEBUG_IMPORTANT, "  Reinforcement cant't find rmst_non");
  388.           break;
  389.         }
  390.         // We are on the reinforced path, so we must start a timer
  391.         // if one hasn't already been started. Sinks don't get
  392.         // reinforced, so they start a WATCHDOG in syncLocalCache.
  393.         if( (rmst_ptr->watchdog_active_ == false) && (caching_mode_) ){
  394.           TimerCallback *rmst_timer;
  395.           DiffPrint(DEBUG_IMPORTANT,
  396.             "  Set a WATCHDOG_TIMER at caching node for reinforced rmst_no %dn",
  397.             rmst_no);
  398.           rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
  399.           // We check on things every 10 seconds.
  400.           rmst_ptr->watchdog_handle_ = 
  401.             ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL, rmst_timer);
  402.           rmst_ptr->watchdog_active_ = true;
  403.         }
  404.         if (rmst_ptr->wait_for_new_path_){
  405.           DiffPrint(DEBUG_SOME_DETAILS, "  Resetting wait_for_new_path_ for rmst_no %dn", rmst_no);
  406.           rmst_ptr->wait_for_new_path_ = false;
  407.         }
  408.         if (rmst_ptr->sent_exp_req_){
  409.           DiffPrint(DEBUG_SOME_DETAILS,
  410.             "  intermediate node got a new path, set sent_exp_req_ false.n");
  411.           rmst_ptr->sent_exp_req_ = false;
  412.         }
  413.       }
  414.       else{
  415.         if(!rmst_ptr->local_source_)
  416.           DiffPrint(DEBUG_IMPORTANT, "  Reinforcement matches no Exploratory msgn");
  417.       }
  418.       break;
  419.     case(NEGATIVE_REINFORCEMENT):
  420.       bool ret_val;
  421.       if (tsprt_ctl_attr){
  422.         DiffPrint(DEBUG_SOME_DETAILS,
  423.             "  NEGATIVE_REINFORCEMENT, last_hop_ = %d, rmst_ctl_type = %dn", 
  424.             msg->last_hop_, rmst_ctl_type);
  425.       }
  426.       // We need to check if we got a NEGATIVE REINFORCEMENT from a node that is the
  427.       // next node in the forward direction (downstream).  If so, and we are the source
  428.       // we must send a new EXPLORATORY message;  else if we are not the source,
  429.       // we must send and exp request upstream.
  430.       ret_val = true;
  431.       rmst_iterator = rmst_map_.begin();
  432.       while(rmst_iterator != rmst_map_.end()){
  433.         rmst_ptr = (*rmst_iterator).second;
  434.         DiffPrint(DEBUG_SOME_DETAILS,
  435.             "  searching rmsts - rmst_no_ %d: fwd_hop_ = %d, reinf_ = %d, acked = %dn",
  436.             rmst_ptr->rmst_no_, rmst_ptr->fwd_hop_, rmst_ptr->reinf_, rmst_ptr->acked_);
  437.         if (rmst_ptr->local_source_ && rmst_ptr->reinf_
  438.             && (rmst_ptr->fwd_hop_ == msg->last_hop_)
  439.             && !rmst_ptr->acked_){
  440.           // If we are reinforced then we never got and EXP_REQ!!
  441.           DiffPrint(DEBUG_SOME_DETAILS, "  local source sees NEG_REINFn");
  442.           processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
  443.         }
  444.         else if (!rmst_ptr->local_source_ && (rmst_ptr->fwd_hop_ == msg->last_hop_)
  445.                  && rmst_ptr->reinf_ && !rmst_ptr->acked_){
  446.           DiffPrint(DEBUG_SOME_DETAILS, "  intermediate node sees NEG_REINF from reinforced noden");
  447.           DiffPrint(DEBUG_SOME_DETAILS, "  send Exp Request upstream!n");
  448.           ret_val = false;
  449.           sendExpReqUpstream(rmst_ptr);
  450.         }
  451.         else{
  452.           DiffPrint(DEBUG_SOME_DETAILS,
  453.             "  node sees NEG_REINF from non-reinforced node - let routing layer see itn");
  454.   ret_val = true;
  455. }
  456.         rmst_iterator++;
  457.       }
  458.       if (!ret_val)
  459.         return false;
  460.       break;
  461.     default:
  462.       break;
  463.   } // switch msg->type
  464.   return true;
  465. }
  466. // RmstFilter::syncLocalCache
  467. //
  468. // This routine adds new transport data messages to the local data base.
  469. Rmst* RmstFilter::syncLocalCache (Message *msg)
  470. {
  471.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  472.   NRSimpleAttribute<int> *frag_attr = NULL;
  473.   NRSimpleAttribute<int> *max_frag_attr = NULL;
  474.   NRSimpleAttribute<void *> *data_buf_attr = NULL;
  475.   NRAttrVec *data = msg->msg_attr_vec_;
  476.   Int2Rmst::iterator rmst_iterator;
  477.   int rmst_no;
  478.   int frag_no;
  479.   int max_frag_no;
  480.   void *blob_ptr;
  481.   int blob_len;
  482.   void *tmp_frag_ptr;
  483.   Rmst *rmst_ptr;
  484.   rmst_id_attr = RmstIdAttr.find(data);
  485.   frag_attr = RmstFragAttr.find(data);
  486.   max_frag_attr = RmstMaxFragAttr.find(data);
  487.   data_buf_attr = RmstDataAttr.find(data);
  488.   if (! (rmst_id_attr && frag_attr && data_buf_attr) ){
  489.     DiffPrint(DEBUG_IMPORTANT, "  Filter received a BAD transport packet!n");
  490.     return NULL;
  491.   }
  492.   rmst_no = rmst_id_attr->getVal();
  493.   frag_no = frag_attr->getVal();
  494.   if(max_frag_attr)
  495.     max_frag_no = max_frag_attr->getVal();
  496.   else
  497.     max_frag_no = 0;
  498.   blob_ptr = data_buf_attr->getVal();
  499.   blob_len = data_buf_attr->getLen();
  500.   // Here is where I consuslt the Data Base and possibly add a new map,
  501.   // or add to an existing map.
  502.   rmst_iterator = rmst_map_.find(rmst_no);
  503.   if(rmst_iterator == rmst_map_.end()){
  504.     DiffPrint(DEBUG_IMPORTANT, "  creating a new DB entry for Rmst %dn", rmst_no);
  505.     DiffPrint(DEBUG_SOME_DETAILS, "  Max Fragment number = %dn", max_frag_no);
  506.     rmst_ptr = new Rmst(rmst_no);
  507.     rmst_ptr->max_frag_ = max_frag_no;
  508.     rmst_map_.insert(Int2Rmst::value_type(rmst_no, rmst_ptr));
  509.     // Artificially initialize last_nak_time_ so it's older
  510.     // than any Naks we may get.
  511.     GetTime(&rmst_ptr->last_nak_time_);
  512.     // Several decisions in this routine relate to messages that emanated
  513.     // from a local source.
  514.     if (msg->last_hop_ == LOCALHOST_ADDR) {
  515.       rmst_ptr->local_source_ = true;
  516.       rmst_ptr->local_source_port_ = msg->source_port_;
  517.       // This is the first fragment of an rmst from a local source.
  518.       // The message will be marked exploratory by this filter.
  519.       // We need to mark the last hop as LOCALHOST_ADDR.
  520.       rmst_ptr->last_hop_ = LOCALHOST_ADDR;
  521.     }
  522.     // We need to capture the RmstTargetAttr for concantenation on sendMessage.
  523.     if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
  524.       NRSimpleAttribute<char *> *rmst_tgt_attr = NULL;
  525.       rmst_tgt_attr = RmstTargetAttr.find(msg->msg_attr_vec_);
  526.       if (rmst_tgt_attr){
  527.         char *tmp_str = rmst_tgt_attr->getVal();
  528.         rmst_ptr->target_str_ = new char[strlen(tmp_str)+1];
  529.         strcpy (rmst_ptr->target_str_, tmp_str);
  530.         DiffPrint(DEBUG_IMPORTANT, "  RmstTargetAttr = %sn", rmst_ptr->target_str_);
  531.       }
  532.       else
  533.         DiffPrint(DEBUG_IMPORTANT, "  no RmstTargetAttr Rmst %d !n", rmst_no);
  534.     }
  535.   }
  536.   else
  537.     rmst_ptr = (*rmst_iterator).second;
  538.   if(!rmst_ptr->local_source_)
  539.     DiffPrint(DEBUG_IMPORTANT, "  Got a blob, rmstId = %d, frag_no = %dn", rmst_no, frag_no);
  540.   // Update the time we last saw data for this Rmst.
  541.   GetTime(&rmst_ptr->last_data_time_);
  542.   // We cache the fragment at the sink and source,
  543.   // or in caching mode at each node that receives it.
  544.   if((rmst_ptr->local_source_)||(local_sink_)||(caching_mode_)){
  545.     tmp_frag_ptr = rmst_ptr->getFrag(frag_no);
  546.     if (tmp_frag_ptr == NULL){
  547.       if(!rmst_ptr->local_source_)
  548.         DiffPrint(DEBUG_SOME_DETAILS, "  creating a new frag %d entry for Rmst %dn",
  549.           frag_no, rmst_no);
  550.       if (frag_no == rmst_ptr->max_frag_)
  551.       rmst_ptr->max_frag_len_ = blob_len;
  552.       tmp_frag_ptr = new char[blob_len];
  553.       memcpy(tmp_frag_ptr, blob_ptr, blob_len);
  554.       rmst_ptr->putFrag(frag_no, tmp_frag_ptr);
  555.       // Check to see if this fragment was NAKed.
  556.       // If so, delete from the hole map.
  557.       if(!rmst_ptr->local_source_){
  558.         if ( rmst_ptr->inHoleMap(frag_no) ){
  559.           // We need to see if we sent a NAK for this frag.
  560.           NakData *nak_ptr = rmst_ptr->getHole(frag_no);
  561.           if(nak_ptr->nak_sent_)
  562.             DiffPrint(DEBUG_SOME_DETAILS, "  We sent a NAK_REQ for this fragment.n");
  563.           DiffPrint(DEBUG_SOME_DETAILS, "  filter removing hole %d from hole_map_n",frag_no);
  564.           rmst_ptr->delHole(frag_no);
  565.         }
  566.       }
  567.       // We start a WATCHDOG for an rmst here if: this is a local sink,
  568.       // we haven't started a timer, and this is not the initial fragment.
  569.       // Intermediate nodes (in caching mode) start a timer if they are on the
  570.       // reinforced path, which is checked in processMessage.
  571.       if((!rmst_ptr->local_source_)&&(local_sink_)&&(rmst_ptr->watchdog_active_ == false)
  572.         && (frag_no>0)){
  573.         TimerCallback *rmst_timer;
  574.         DiffPrint(DEBUG_IMPORTANT, "  Set a WATCHDOG_TIMER at sink for rmst_no %dn", rmst_no);
  575.         rmst_timer = new RmstTimeout(this, rmst_no, WATCHDOG_TIMER);
  576.         // We check on things every 10 seconds.
  577.         rmst_ptr->watchdog_handle_ = ((DiffusionRouting *)dr_)->addTimer(WATCHDOG_INTERVAL,
  578.           rmst_timer);
  579.         rmst_ptr->watchdog_active_ = true;
  580.       }
  581.     }
  582.     else
  583.       DiffPrint(DEBUG_SOME_DETAILS, "  got a duplicate frag %d for blob %dn",
  584.         frag_no, rmst_no);
  585.     // If we have still have a hole in the fragment map, update the hole map.
  586.     if ((!rmst_ptr->local_source_) && (rmst_ptr->holeInFragMap()))
  587.       rmst_ptr->syncHoleMap();
  588.     // If the Rmst is complete, cancell timer, stop timer,
  589.     // send Ack, give to local sinks.
  590.     if(rmst_ptr->rmstComplete()){
  591.       if ((rmst_ptr->watchdog_active_) && (!rmst_ptr->local_source_)){
  592.         DiffPrint(DEBUG_SOME_DETAILS, 
  593.           "  Rmst #%d is complete set cancel_watchdog_ to stop WATCHDOGn",
  594.           rmst_no);
  595.         rmst_ptr->cancel_watchdog_ = true;
  596.       }
  597.       // Send this Rmst to any local sink
  598.       if(local_sink_ && !(rmst_ptr->acked_)){
  599.         sendRmstToSink(rmst_ptr);
  600.         // We mark the rmst acked at the sink so it will clean up.
  601.         rmst_ptr->acked_ = true;
  602.       }
  603.       // If this is a source, we only send out the fragments when we've got
  604.       // them all from the application. If the Rmst is complete we add the
  605.       // Rmst to the send_list_, and if there is no send timer we start one.
  606.       if(rmst_ptr->local_source_){
  607.         SendMsgData new_send_msg;
  608.         // The Rmst is complete and this is a source - put in send list.
  609.         new_send_msg.rmst_no_ = rmst_no;
  610.         new_send_msg.last_frag_sent_ = -1;
  611.         new_send_msg.exp_base_ = 0;
  612.         send_list_.push_back(new_send_msg);
  613.         if(!send_timer_active_){
  614.           TimerCallback *send_timer;
  615.           // Now add a timer to send this and any NAKS.
  616.           DiffPrint(DEBUG_SOME_DETAILS,
  617.             "  Rmst %d ready to send - Set a SEND_TIMERn", rmst_no);
  618.           send_timer = new RmstTimeout(this, -1, SEND_TIMER);
  619.           // We check on things every second.
  620.           send_timer_handle_ = 
  621.             ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
  622.           send_timer_active_ = true;
  623.         }
  624.       }
  625.       else
  626.         // We must let upstream nodes know that we got the whole blob.
  627.         sendAckToSource(rmst_ptr);
  628.     }
  629.   }
  630.   else{
  631.     rmst_ptr->max_frag_rec_ = frag_no;
  632.     DiffPrint(DEBUG_LOTS_DETAILS, "  Not caching frag %d entry for Rmst %dn", frag_no, rmst_no);
  633.   }
  634.   return rmst_ptr;
  635. }
  636. void RmstFilter::processCtrlMessage(Message *msg)
  637. {
  638.   NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
  639.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  640.   NRSimpleAttribute<int> *frag_attr = NULL;
  641.   NRAttrVec *data;
  642.   NRAttrVec attrs;
  643.   Int2Rmst::iterator rmst_iterator;
  644.   int rmst_no; 
  645.   int frag_no;
  646.   int rmst_ctl_type;
  647.   Rmst *rmst_ptr;
  648.   void *frag_ptr;
  649.   Message *nak_msg;
  650.   NRAttrVec::iterator place;
  651.   bool forwarding_nak = false;
  652.   data = msg->msg_attr_vec_;
  653.   tsprt_ctl_attr = RmstTsprtCtlAttr.find(data);
  654.   rmst_ctl_type = tsprt_ctl_attr->getVal();
  655.   rmst_id_attr = RmstIdAttr.find(data);
  656.   if(!rmst_id_attr) {
  657.     DiffPrint(DEBUG_SOME_DETAILS, "  Node got a bad Rmst control msg - no RmstIdAttr!n");
  658.     return;
  659.   }
  660.   rmst_no = rmst_id_attr->getVal();
  661.   // Let's make sure we have this rmst
  662.   rmst_iterator = rmst_map_.begin();
  663.   rmst_iterator = rmst_map_.find(rmst_no);
  664.   if(rmst_iterator != rmst_map_.end())
  665.     rmst_ptr = (*rmst_iterator).second;
  666.   else{
  667.     DiffPrint(DEBUG_IMPORTANT, "  Filter can't find Rmst %d for Rmst control msgn", rmst_no);
  668.     return;
  669.   }
  670.   switch (rmst_ctl_type){
  671.   case(ACK_RESP):
  672.     DiffPrint(DEBUG_IMPORTANT, "  Got an ACK_RESPn");
  673.     // For now we automatically forward ACKs if we're not the source.
  674.     rmst_ptr->acked_ = true;
  675.     if(!rmst_ptr->local_source_){
  676.       Message *ack_msg;
  677.       // If we got an ACK and we aren't the source, we must be an
  678.       // intermediate node (Sinks don't get ACKs, they send them).
  679.       // We need to forward ACK toward source if possible.
  680.       if (rmst_ptr->reinf_) {
  681.         DiffPrint(DEBUG_SOME_DETAILS, "  forwarding ACK to %dn", rmst_ptr->last_hop_);
  682.         attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
  683.         attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
  684.         ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
  685.           pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
  686.         ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  687.         ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
  688.         pkt_count_++;
  689.         delete ack_msg;
  690.         ClearAttrs(&attrs);
  691.       }
  692.       else
  693.         DiffPrint(DEBUG_IMPORTANT, "  intermediate node can't forward ACK for Rmst %dn", rmst_no);
  694.     }
  695.     else{
  696.       DiffPrint(DEBUG_IMPORTANT, "  Source got ACK for Rmst %dn", rmst_no);
  697.       sendContToSource(rmst_ptr);
  698.     }
  699.     break;
  700.   case(NAK_REQ):
  701.     // Mark the time we got this NAK for cleanup timer.
  702.     GetTime(&rmst_ptr->last_nak_time_);
  703.     rmst_ptr->naks_rec_++;
  704.     DiffPrint(DEBUG_IMPORTANT, "  Got a NAK_REQ; number = %dn", rmst_ptr->naks_rec_);
  705.     if ((rmst_ptr->naks_rec_ > 10) && (rmst_ptr->naks_rec_ > (.30 * rmst_ptr->max_frag_)) &&
  706.             rmst_ptr->local_source_){
  707.       DiffPrint(DEBUG_IMPORTANT, "  Too many NAKs - send an EXPLORATORY msg!n");
  708.       processExpReq(rmst_ptr, rmst_ptr->max_frag_sent_);
  709.       return;
  710.     }
  711.     // If we sent an exp request more than 30 seconds ago,
  712.     // we send it again.
  713.     if (rmst_ptr->sent_exp_req_){
  714.       int exp_time = rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
  715.       DiffPrint(DEBUG_SOME_DETAILS, 
  716.         "  Node that sent an EXP_REQ got a NAK: time since last exp = %dn", exp_time);
  717.       if( (rmst_ptr->last_nak_time_.tv_sec - rmst_ptr->exp_req_time_.tv_sec) > 30){
  718.         // Resend an EXP_REQ!!!
  719.         DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!n");
  720.         sendExpReqUpstream(rmst_ptr);
  721.         GetTime(&rmst_ptr->exp_req_time_);
  722.         // Send another negative reinforcement on blacklisted link!
  723.         Message *neg_reinf_msg;
  724.         neg_reinf_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
  725.           0, 0, interest_attrs_->size(), pkt_count_, rdm_id_, 
  726.           rmst_ptr->last_hop_, LOCALHOST_ADDR);
  727.         neg_reinf_msg->msg_attr_vec_ = CopyAttrs(interest_attrs_);
  728.         ((DiffusionRouting *)dr_)->sendMessage(neg_reinf_msg, filter_handle_, 1);
  729.         pkt_count_++;
  730.         delete neg_reinf_msg;
  731.       }
  732.       return;
  733.     }
  734.     // We need to send the naked fragments if we are the source,
  735.     // or a caching node.
  736.     place = data->begin();
  737.     for(;;){
  738.       frag_attr = RmstFragAttr.find_from(data, place, &place);
  739.       if (!frag_attr)
  740.         break;
  741.       frag_no = frag_attr->getVal();
  742.       DiffPrint(DEBUG_IMPORTANT, "  Filter received a NAK_REQ for Rmst %d, frag %dn",
  743.         rmst_no, frag_no);
  744.       // Check if we have this fragment.
  745.       // If not forward NAK toward source if possible.
  746.       frag_ptr =  rmst_ptr->getFrag(frag_no);
  747.       if (frag_ptr == NULL){
  748.         DiffPrint(DEBUG_SOME_DETAILS, "  Filter can't find frag %d of Rmst %d for NAKn",
  749.         frag_no, rmst_no);
  750.         // We need to forward NAK toward source if possible.
  751.         if ( (rmst_ptr->reinf_) && (rmst_ptr->last_hop_ != LOCALHOST_ADDR) ){
  752.           forwarding_nak = true;
  753.           DiffPrint(DEBUG_IMPORTANT, "  forwarding NAK to %dn", rmst_ptr->last_hop_);
  754.           attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
  755.           if(caching_mode_){
  756.             // We need to add this fragment to our hole map!
  757.             rmst_ptr->putHole(frag_no);
  758.             NakData *nak_ptr = rmst_ptr->getHole(frag_no);
  759.             // Artificially age this hole so it gets
  760.             // naked immediately.
  761.             nak_ptr->tmv.tv_sec -= 1;
  762.           }
  763.         }
  764.         else
  765.           DiffPrint(DEBUG_IMPORTANT, "  not forwarding NAK! - no place to send it!n");
  766.       }
  767.       else{
  768.         // We have this fragment so add it to the NakList for sending.
  769.         NakMsgData nak_msg_data;
  770.         NakList::iterator nak_list_iterator;
  771.         nak_list_iterator = nak_list_.begin();
  772.         while(nak_list_iterator != nak_list_.end()){
  773.           if((nak_list_iterator->rmst_no_ == rmst_no) &&
  774.             (nak_list_iterator->frag_no_ == frag_no))
  775.             break;
  776.           nak_list_iterator++;
  777.         }
  778.         if(nak_list_iterator == nak_list_.end()){
  779.           DiffPrint(DEBUG_SOME_DETAILS,
  780.             "  adding NAK for rmst %d frag %d to nak_list_n", rmst_no, frag_no);
  781.           nak_msg_data.rmst_no_ = rmst_no;
  782.           nak_msg_data.frag_no_ = frag_no;
  783.           nak_list_.push_back(nak_msg_data);
  784.           if(!send_timer_active_){
  785.             TimerCallback *send_timer;
  786.             // Now add a timer to send this and any NAKS.
  787.             DiffPrint(DEBUG_LOTS_DETAILS, 
  788.               "  Set a SEND_TIMER for reinforced rmst_no %dn", rmst_no);
  789.             send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
  790.             // We check on things every second.
  791.             send_timer_handle_ = 
  792.               ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
  793.             send_timer_active_ = true;
  794.           }
  795.         }
  796.       }
  797.       place++;
  798.     }
  799.     if (forwarding_nak){
  800.       attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
  801.       attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
  802.       nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
  803.       attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
  804.         LOCALHOST_ADDR);
  805.       nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  806.       ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
  807.       pkt_count_++;
  808.       delete nak_msg;
  809.       ClearAttrs(&attrs);
  810.     }
  811.     break;
  812.   case(EXP_REQ):
  813.     DiffPrint(DEBUG_IMPORTANT, "  Got an EXP_REQn");
  814.     if(!rmst_ptr->local_source_){
  815.       DiffPrint(DEBUG_SOME_DETAILS, "  Filter forwarding EXP_REQ for Rmst %dn", rmst_no);
  816.       // We need to forward EXP_REQ toward source if possible.
  817.       if (rmst_ptr->reinf_)
  818.         sendExpReqUpstream(rmst_ptr);
  819.     }
  820.     else{
  821.       // We need to call a routine that will clean the NAK list of 
  822.       // outstanding NAK responses for this Rmst, put a new expBase
  823.       // in the send list (lowest of nak or send Lists), and set this
  824.       // rmst as not reinforced.
  825.       frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
  826.       frag_no = frag_attr->getVal();
  827.       DiffPrint(DEBUG_IMPORTANT, "  Source got EXP request for Rmst %dn", rmst_no);
  828.       if (rmst_ptr->reinf_)
  829.           processExpReq(rmst_ptr, frag_no);
  830.       else
  831.           DiffPrint(DEBUG_IMPORTANT, "  EXP request for non-reinforced Rmst %dn", rmst_no);
  832.     }
  833.     break;
  834.   default:
  835.     break;
  836.   }  // switch (rmst_ctl_type)
  837.   return;
  838. }
  839. void RmstFilter::setupNak(int rmst_id)
  840. {
  841.   NRAttrVec attrs;
  842.   int frag_id;
  843.   NakData *nak_ptr;
  844.   Rmst *rmst_ptr;
  845.   int nak_count = 0;
  846.   Int2Rmst::iterator rmst_iterator = rmst_map_.find(rmst_id);
  847.   if(rmst_iterator != rmst_map_.end())
  848.     rmst_ptr = (*rmst_iterator).second;
  849.   else{
  850.     DiffPrint(DEBUG_IMPORTANT, "setupNak - can't find Rmst %dn", rmst_id);
  851.     return;
  852.   }
  853.   Int2Nak::iterator hole_iter = rmst_ptr->hole_map_.begin();
  854.   bool send_new_nak = false;
  855.   timeval cur_time;
  856.   // We now have an iterator to look at each hole (hole_iter),
  857.   // a Rmst Id (rmst_id), a fragment Id ((*hole_iter).first),
  858.   // a NakData pointer ((*hole_iter).second), and an Rmst
  859.   // pointer (rmst_ptr).
  860.   GetTime (&cur_time);
  861.   // The first pass finds holes that haven't been NAKed and should be.
  862.   while(hole_iter != rmst_ptr->hole_map_.end()){
  863.     frag_id = (*hole_iter).first;
  864.     nak_ptr = (*hole_iter).second;
  865.     DiffPrint(DEBUG_SOME_DETAILS,
  866.       "  setupNak - found hole rmst_id %d, frag %dn", rmst_id, frag_id);
  867.     // If we never NAKed this fragment and it's past due,
  868.     // mark it so it gets NAKed.
  869.     if (!nak_ptr->nak_sent_){
  870.       if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > 3 ){
  871.         nak_ptr->nak_sent_ = true;
  872.         nak_ptr->send_nak_ = true;
  873.         send_new_nak = true;
  874.       }
  875.       else
  876.         DiffPrint(DEBUG_SOME_DETAILS,
  877.           "  setupNak - hole %d not old enough to NAKn", frag_id);
  878.     }
  879.     // If we NAKed this fragment and the NAK response is past due,
  880.     // NAK it again.
  881.     else if ( (cur_time.tv_sec - nak_ptr->tmv.tv_sec) > NAK_RESPONSE_WAIT ){
  882.       DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - hole %d has an overdue NAKn", frag_id);
  883.       nak_ptr->send_nak_ = true;
  884.       send_new_nak = true;
  885.     }
  886.     hole_iter++;
  887.   }
  888.   if (send_new_nak){
  889.     Message *nak_msg;
  890.     if ( rmst_ptr->last_hop_ == LOCALHOST_ADDR ){
  891.       DiffPrint(DEBUG_IMPORTANT, "  can't send NAK, no last_hop_!n");
  892.       return;
  893.     }
  894.     // The second pass adds all holes that should be NAKed to vector.
  895.     hole_iter = rmst_ptr->hole_map_.begin();
  896.     while( (hole_iter != rmst_ptr->hole_map_.end()) && (nak_count <= 10) ){
  897.       frag_id = (*hole_iter).first;
  898.       nak_ptr = (*hole_iter).second;
  899.       if ( nak_ptr->send_nak_ ){
  900.         nak_ptr->send_nak_ = false;
  901.         DiffPrint(DEBUG_SOME_DETAILS,
  902.           "  setupNak - adding a NAK for frag_id %d to attrsn", frag_id);
  903.         attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_id));
  904.         GetTime(&(nak_ptr->tmv));
  905.         nak_count++;
  906.       }
  907.       hole_iter++;
  908.     }
  909.     attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, NAK_REQ));
  910.     attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_id));
  911.     // Code to send a message to last_hop_
  912.     nak_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
  913.       pkt_count_, rdm_id_, rmst_ptr->last_hop_, LOCALHOST_ADDR);
  914.     pkt_count_++;
  915.     nak_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  916.     DiffPrint(DEBUG_IMPORTANT, "  Sending NAK_REQ to node %dn", rmst_ptr->last_hop_);
  917.     ((DiffusionRouting *)dr_)->sendMessage(nak_msg, filter_handle_, 1);
  918.     delete nak_msg;
  919.     ClearAttrs(&attrs);
  920.     // Mark the time we sent this NAK for cleanup timer.
  921.     GetTime(&rmst_ptr->last_nak_time_);
  922.   }
  923.   else
  924.     DiffPrint(DEBUG_SOME_DETAILS, "  setupNak - no need for a new NAK for rmst_id %dn", rmst_id);
  925.   return;
  926. }
  927. void RmstFilter::processExpReq(Rmst *rmst_ptr, int frag_no)
  928. {
  929.   NakList::iterator nak_list_iterator;
  930.   SendList::iterator send_list_iterator;
  931.   int rmst_no = rmst_ptr->rmst_no_;
  932.   DiffPrint(DEBUG_IMPORTANT, "  processExpReq called for rmstId %d, frag_no %dn", rmst_no, frag_no);
  933.   // Indicate that Rmst is not reinforced.
  934.   rmst_ptr->reinf_ = false;
  935.   rmst_ptr->pkts_sent_ = 0;
  936.   // If we have an ACK_TIMER active cancel it. We want to resend some packets.
  937.   if(rmst_ptr->ack_timer_active_){
  938.     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
  939.     rmst_ptr->ack_timer_active_ = false;
  940.   }
  941.   // Erase any NAKs. We are about to establish a new path.
  942.   nak_list_iterator = nak_list_.begin();
  943.   while (nak_list_iterator != nak_list_.end()){
  944.     if (nak_list_iterator->rmst_no_ == rmst_no){
  945.       DiffPrint(DEBUG_SOME_DETAILS,
  946.         "  processExpReq erasing frag_no %d from nak_list_n", nak_list_iterator->frag_no_);
  947.       nak_list_iterator = nak_list_.erase(nak_list_iterator);
  948.     }
  949.     else
  950.       nak_list_iterator++;
  951.   }
  952.   DiffPrint(DEBUG_LOTS_DETAILS, "  processExpReq done with nak_list_ for rmstId %dn", rmst_no);
  953.   // If we are being told to start by resending the last packet, back up by one.
  954.   // When a sink gets an exploratory message, they don't start re-NAKing until they
  955.   // know they are reinforced. Sinks only know they are reinforced when they get DATA.
  956.   if ( (frag_no == rmst_ptr->max_frag_) && (rmst_ptr->max_frag_ > 0) ){
  957.     frag_no--;
  958.     DiffPrint(DEBUG_IMPORTANT, "  processExpReq decrements frag_no to %dn", frag_no);
  959.   }
  960.   // Update send_list_ entry or add one.
  961.   send_list_iterator = send_list_.begin();
  962.   while (send_list_iterator != send_list_.end()){
  963.     if (send_list_iterator->rmst_no_ == rmst_no)
  964.       break;
  965.     send_list_iterator++;
  966.   }
  967.   if (send_list_iterator != send_list_.end()){
  968.     send_list_iterator->exp_base_ = frag_no;
  969.     DiffPrint(DEBUG_SOME_DETAILS, "  processExpReq sets send_list_ expBase to %dn", frag_no);
  970.     send_list_iterator->last_frag_sent_ = frag_no-1;
  971.   }
  972.   else{
  973.     SendMsgData new_send_msg;
  974.     DiffPrint(DEBUG_SOME_DETAILS,
  975.       "  processExpReq creating new send_list_ entry for rmstId %dn", rmst_no);
  976.     DiffPrint(DEBUG_SOME_DETAILS,
  977.       "  processExpReq sets send_list_ expBase to %dn", frag_no);
  978.     new_send_msg.rmst_no_ = rmst_no;
  979.     new_send_msg.exp_base_ = frag_no;
  980.     new_send_msg.last_frag_sent_ = frag_no-1;
  981.     send_list_.push_front(new_send_msg);
  982.     if(!send_timer_active_){
  983.       TimerCallback *send_timer;
  984.       // Now add a timer to send this and any NAKS.
  985.       DiffPrint(DEBUG_SOME_DETAILS, "  Set a SEND_TIMER for reinforced rmst_no %dn", rmst_no);
  986.       send_timer = new RmstTimeout(this, rmst_no, SEND_TIMER);
  987.       // We check on things every second.
  988.       send_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(SEND_INTERVAL, send_timer);
  989.       send_timer_active_ = true;
  990.     }
  991.   }
  992. }
  993. handle RmstFilter::setupFilter()
  994. {
  995.   NRAttrVec attrs;
  996.   handle h;
  997.   // This is a dummy attribute for filtering that matches everything
  998.   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
  999.     NRAttribute::INTEREST_CLASS));
  1000.   h = ((DiffusionRouting *)dr_)->addFilter(&attrs, RMST_FILTER_PRIORITY, fcb_);
  1001.   ClearAttrs(&attrs);
  1002.   return h;
  1003. }
  1004. void RmstFilter::run()
  1005. {
  1006. #ifdef NS_DIFFUSION
  1007.   TimerCallback *stat_timer;
  1008.   filter_handle_ = setupFilter();
  1009.   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %dn",
  1010.     (int)filter_handle_);
  1011.   
  1012.   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timern");
  1013.   stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
  1014.   stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
  1015. #else
  1016.   // Doesn't do anything
  1017.   while(1){
  1018.       sleep(1000);
  1019.   }
  1020. #endif // NS_DIFFUSION
  1021. }
  1022. RmstTimeout::RmstTimeout(RmstFilter *rmst_flt, int no, int type)
  1023. {
  1024.   filter_ = rmst_flt;
  1025.   rmst_no_ = no;
  1026.   timer_type_ = type;
  1027. }
  1028. int RmstTimeout::expire()
  1029. {
  1030.   int retval;
  1031.   retval = filter_->processTimer(rmst_no_, timer_type_);
  1032.   if(retval == -1)
  1033.     delete this;
  1034.   return retval;
  1035. }
  1036. int RmstFilter::processTimer(int rmst_no, int timer_type)
  1037. {
  1038.   Rmst *rmst_ptr;
  1039.   void *frag_ptr;
  1040.   int frag_no;
  1041.   Int2Rmst::iterator rmst_iterator;
  1042.   timeval cur_time;
  1043.   GetTime (&cur_time);
  1044.   switch (timer_type){
  1045.   case SEND_TIMER:
  1046.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer SEND_TIMER");
  1047.     PrintTime(&cur_time);
  1048.     // If we haven't got any NAKs pending, send the next fragment of the Rmst
  1049.     // in progress. If we've sent all fragments of the current Rmst, we cancel
  1050.     // ourself. When we get a NAK, if this timer is active we send NAK responses
  1051.     // here. Otherwise we can send them directly from the NAK response routine.
  1052.     if (!nak_list_.empty()){
  1053.       Message *nak_resp;
  1054.       NRAttrVec nak_data_attrs;
  1055.       NakMsgData nak_msg_data = nak_list_.front();
  1056.       rmst_no = nak_msg_data.rmst_no_;
  1057.       frag_no = nak_msg_data.frag_no_;
  1058.       rmst_iterator = rmst_map_.find(rmst_no);
  1059.       if(rmst_iterator != rmst_map_.end()){
  1060.         rmst_ptr = (*rmst_iterator).second;
  1061.         // We have the fragment, set the last_data_time_ so that we defer
  1062.         // the cleanup of this Rmst, then we send the fragment to last hop.
  1063.         if (rmst_ptr->reinf_){
  1064.           GetTime(&rmst_ptr->last_data_time_);
  1065.           rmst_ptr->pkts_sent_++;
  1066.           nak_data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS,
  1067.             rmst_ptr->target_str_));
  1068.           nak_data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS,
  1069.             RMST_RESP));
  1070.           nak_data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
  1071.           // We routinely send the packet sent count on NAKs.
  1072.           nak_data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
  1073.             rmst_ptr->pkts_sent_));
  1074.           nak_data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
  1075.           // Add the actual data; the length depends on if it's the last
  1076.           // fragment or not.
  1077.           frag_ptr =  rmst_ptr->getFrag(frag_no);
  1078.           if (frag_no == rmst_ptr->max_frag_)
  1079.             nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
  1080.               frag_ptr, rmst_ptr->max_frag_len_));
  1081.           else
  1082.             nak_data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
  1083.               frag_ptr, MAX_FRAG_SIZE));
  1084.           DiffPrint(DEBUG_IMPORTANT, "  Filter sending Data for NAKed frag %d of Rmst %dn",
  1085.             frag_no, rmst_no);
  1086.           nak_resp = new Message(DIFFUSION_VERSION, DATA, 0, 0,
  1087.             nak_data_attrs.size(), pkt_count_, rdm_id_,
  1088.             LOCALHOST_ADDR, LOCALHOST_ADDR);
  1089.           nak_resp->msg_attr_vec_ = CopyAttrs(&nak_data_attrs);
  1090.           ((DiffusionRouting *)dr_)->sendMessage(nak_resp, filter_handle_);
  1091.           pkt_count_++;
  1092.           delete nak_resp;
  1093.           ClearAttrs(&nak_data_attrs);
  1094.         }
  1095.         else
  1096.           DiffPrint(DEBUG_IMPORTANT, 
  1097.             "RmstFilter::processTimer sees non-reinforced path for NAK on rmst %d!n",
  1098.             rmst_no);
  1099.       }
  1100.       else
  1101.         DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer can't find Rmst %d for NAK!n",
  1102.           rmst_no);
  1103.       nak_list_.pop_front();
  1104.     }
  1105.     else if (!send_list_.empty()){
  1106.       int8_t msg_type;
  1107.       int action = DO_NOTHING;
  1108.       // Get the rmst and frag_no that is in progress.
  1109.       NRAttrVec data_attrs;
  1110.       SendMsgData send_data = send_list_.front();
  1111.       rmst_no = send_data.rmst_no_;
  1112.       rmst_iterator = rmst_map_.find(rmst_no);
  1113.       if(rmst_iterator == rmst_map_.end())
  1114.         action = DELETE_FROM_QUEUE;
  1115.       else{
  1116.         rmst_ptr = (*rmst_iterator).second;
  1117.         if ((send_data.last_frag_sent_ == rmst_ptr->max_frag_) && rmst_ptr->reinf_)
  1118.           action = DELETE_FROM_QUEUE;
  1119.         else if ( (send_data.last_frag_sent_ == send_data.exp_base_) &&
  1120.           (!rmst_ptr->reinf_) && (exp_gap_ < 10) ){
  1121.           action = DO_NOTHING;
  1122.           exp_gap_++;
  1123.         }
  1124.         else
  1125.           action = SEND_NEXT_FRAG;
  1126.       }
  1127.       switch (action){
  1128.       case(DELETE_FROM_QUEUE):
  1129.         // Delete message data from front. 
  1130.         send_list_.pop_front();
  1131.         break;
  1132.       case(SEND_NEXT_FRAG):
  1133.         send_list_.pop_front();
  1134.         if (rmst_ptr->reinf_){
  1135.           send_data.last_frag_sent_++;
  1136.           frag_no = send_data.last_frag_sent_;
  1137.         }
  1138.         else{
  1139.           frag_no = send_data.exp_base_;
  1140.           send_data.last_frag_sent_ = frag_no;
  1141.         }
  1142.         send_list_.push_front(send_data);
  1143.         rmst_ptr->max_frag_sent_ = frag_no;
  1144.         DiffPrint(DEBUG_IMPORTANT, "  Source Filter sending frag %d of Rmst %dn",
  1145.           frag_no, rmst_no);
  1146.         
  1147.         // Now make a message.
  1148.         data_attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
  1149.         data_attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
  1150.         data_attrs.push_back(RmstFragAttr.make(NRAttribute::IS, frag_no));
  1151.         // We send the MaxFragAttr on the first Exploratory packet,
  1152.         // and the PktsSentAttr on the last packet.
  1153.         if(frag_no == 0)
  1154.           data_attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS,
  1155.             rmst_ptr->max_frag_));
  1156.         else if (frag_no == rmst_ptr->max_frag_){
  1157.           rmst_ptr->pkts_sent_++;
  1158.           data_attrs.push_back(RmstPktsSentAttr.make(NRAttribute::IS,
  1159.             rmst_ptr->pkts_sent_));
  1160.         }
  1161.         else
  1162.           rmst_ptr->pkts_sent_++;
  1163.         data_attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_no));
  1164.         // Add the actual data; the length depends on if it's the last
  1165.         // fragment or not.
  1166.         frag_ptr =  rmst_ptr->getFrag(frag_no);
  1167.         if (rmst_ptr->max_frag_ == frag_no)
  1168.           data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, 
  1169.             frag_ptr, rmst_ptr->max_frag_len_));
  1170.         else
  1171.           data_attrs.push_back(RmstDataAttr.make(NRAttribute::IS,
  1172.             frag_ptr, MAX_FRAG_SIZE));
  1173.         if (frag_no == send_data.exp_base_){
  1174.           ExpLog exp_msg;
  1175.           union LlToInt key;
  1176.           // Insert this Exploratory message in exp_map_.
  1177.           // When we get a reinforcement we'll know what rmst it's for.
  1178.           key.int_val_[0] = pkt_count_;
  1179.           key.int_val_[1] = rdm_id_;
  1180.           DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llxn", key.ll_val_);
  1181.           exp_msg.rmst_no_ = rmst_no;
  1182.           exp_msg.last_hop_ = LOCALHOST_ADDR;
  1183.           exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
  1184.           msg_type = EXPLORATORY_DATA;
  1185.           rmst_ptr->reinf_ = false;
  1186.           rmst_ptr->pkts_sent_ = 0;
  1187.           rmst_ptr->naks_rec_ = 0;
  1188.           exp_gap_ = 0;
  1189.           DiffPrint(DEBUG_IMPORTANT,
  1190.             "  Source Filter sending EXPLORATORY frag %d of Rmst %dn",
  1191.             frag_no, rmst_no);
  1192.         }
  1193.         else
  1194.           msg_type = DATA;
  1195.         Message *new_frag;
  1196.         new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0,
  1197.           data_attrs.size(), pkt_count_, rdm_id_,
  1198.           LOCALHOST_ADDR, LOCALHOST_ADDR);
  1199.         new_frag->msg_attr_vec_ = CopyAttrs(&data_attrs);
  1200.         ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
  1201.         pkt_count_++;
  1202.         delete new_frag;
  1203.         ClearAttrs(&data_attrs);
  1204.         // We sent a fragment, set the last_data_time_ for the cleanup timer.
  1205.         GetTime(&rmst_ptr->last_data_time_);
  1206.         // If this is the last frag, we start an ACK_TIMER.
  1207.         if ( (rmst_ptr->max_frag_ == frag_no) &&
  1208.           (rmst_ptr->ack_timer_active_ == false) && (rmst_ptr->local_source_) ){
  1209.           TimerCallback *rmst_timer;
  1210.           DiffPrint(DEBUG_SOME_DETAILS, "  Set an ACK_TIMER at source for rmst_no %dn",
  1211.             rmst_no);
  1212.           rmst_timer = new RmstTimeout(this, rmst_no, ACK_TIMER);
  1213.           // We check on things every 20 seconds.
  1214.           rmst_ptr->ack_timer_handle_ = 
  1215.             ((DiffusionRouting *)dr_)->addTimer(ACK_INTERVAL, rmst_timer);
  1216.           rmst_ptr->ack_timer_active_ = true;
  1217.         }
  1218.         break;
  1219.       case(DO_NOTHING):
  1220.         DiffPrint(DEBUG_LOTS_DETAILS, "  Nothing to don");
  1221.         break;
  1222.       } // Switch on Action
  1223.     }
  1224.     if (nak_list_.empty() && send_list_.empty()){
  1225.       DiffPrint(DEBUG_LOTS_DETAILS, "   Cancelling SEND_TIMER, no NAKS or data to sendn");
  1226.       send_timer_active_ = false;
  1227.       return -1;
  1228.     }
  1229.     else
  1230.       return 0;
  1231.     break;
  1232.   case WATCHDOG_TIMER:
  1233.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer WATCHDOG_TIMER for Rmst %d", rmst_no);
  1234.     PrintTime(&cur_time);
  1235.     rmst_iterator = rmst_map_.find(rmst_no);
  1236.     if(rmst_iterator != rmst_map_.end())
  1237.       rmst_ptr = (*rmst_iterator).second;
  1238.     else{
  1239.       DiffPrint(DEBUG_IMPORTANT,
  1240.         "RmstFilter::processTimer can't find Rmst %d for WATCHDOG, cancell timer!n",
  1241.         rmst_no);
  1242.       return -1;
  1243.     }
  1244.     if(rmst_ptr->cancel_watchdog_){
  1245.       DiffPrint(DEBUG_SOME_DETAILS,
  1246.         "  processTimer cancelling WATCHDOG_TIMER for Rmst %dn", rmst_no);
  1247.       rmst_ptr->watchdog_active_ = false;
  1248.       rmst_ptr->cancel_watchdog_ = false;
  1249.       return -1;
  1250.     }
  1251.     if (rmst_ptr->wait_for_new_path_){
  1252.       DiffPrint (DEBUG_IMPORTANT, "  WATCHDOG_TIMER sees wait_for_new_path_ - suspend NAKsn");
  1253.       return 0;
  1254.     }
  1255.     // If we sent an exp request more than 20 seconds ago,
  1256.     // we send it again.
  1257.     if (rmst_ptr->sent_exp_req_){
  1258.       int exp_time;
  1259.       exp_time = cur_time.tv_sec - rmst_ptr->exp_req_time_.tv_sec;
  1260.       DiffPrint(DEBUG_SOME_DETAILS,
  1261.         "  Node sent an EXP_REQ: time since last exp = %dn", exp_time);
  1262.       if( exp_time > 20){
  1263.         // Resend an EXP_REQ!!!
  1264.         DiffPrint(DEBUG_IMPORTANT, "  Node resends EXP_REQ up blacklisted stream!n");
  1265.         sendExpReqUpstream(rmst_ptr);
  1266.         GetTime(&rmst_ptr->exp_req_time_);
  1267.       }
  1268.       else
  1269.         DiffPrint(DEBUG_LOTS_DETAILS, "  Node waits to send another EXP_REQn");
  1270.       return 0;
  1271.     }
  1272.     if (rmst_ptr->local_source_){
  1273.       if(rmst_ptr->acked_){
  1274.         DiffPrint(DEBUG_IMPORTANT, 
  1275.           "  WATCHDOG_TIMER Local Source sees acked state - cancel timern");
  1276.         return -1;
  1277.       }
  1278.       else{
  1279.         DiffPrint(DEBUG_LOTS_DETAILS, "  WATCHDOG_TIMER Local Source sees rmst not ackedn");
  1280.         return 0;
  1281.       }
  1282.     }
  1283.     // Check if we have waited too long for next fragment.
  1284.     if( ((cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > NEXT_FRAG_WAIT) &&
  1285.       (!rmst_ptr->rmstComplete()) ){
  1286.       int newHole = (rmst_ptr->max_frag_rec_)+1;
  1287.       if ( (newHole <= rmst_ptr->max_frag_) && (!rmst_ptr->inHoleMap(newHole)) ){
  1288.         DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER adds new hole, frag %dn",
  1289.           newHole);
  1290.         rmst_ptr->putHole(newHole);
  1291.         NakData *nak_ptr = rmst_ptr->getHole(newHole);
  1292.         // Artificially age this hole so it gets naked immediately
  1293.         nak_ptr->tmv.tv_sec -= 4;
  1294.       }
  1295.     }
  1296.     if(rmst_ptr->holeMapEmpty()){
  1297.       // There aren't any holes!
  1298.       DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees No holesn");
  1299.       return 0;
  1300.     }
  1301.     else{
  1302.       // The WATCHDOG_TIMER expired and we have a hole,
  1303.       //   so we may need to construct a NAK from the hole map
  1304.       //   If we've fallen off the reinforced path, we must
  1305.       //   stop adding NAKs to the NakList.
  1306.       DiffPrint(DEBUG_SOME_DETAILS, "  WATCHDOG_TIMER sees holes - check times.n");
  1307.       if (rmst_ptr->reinf_)
  1308.         setupNak(rmst_no);
  1309.       // Reschedule timer with same value.
  1310.       return 0;
  1311.     }
  1312.     break;
  1313.   case ACK_TIMER:
  1314.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER for Rmst %d", rmst_no);
  1315.     PrintTime(&cur_time);
  1316.     rmst_iterator = rmst_map_.find(rmst_no);
  1317.     if(rmst_iterator != rmst_map_.end())
  1318.       rmst_ptr = (*rmst_iterator).second;
  1319.     else{
  1320.       DiffPrint(DEBUG_IMPORTANT,
  1321.         "RmstFilter::processTimer can't find Rmst %d for ACK_TIMER, cancell timer!n",
  1322.            rmst_no);
  1323.       return -1;
  1324.     }
  1325.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer ACK_TIMER, at source for rmst_no %dn",
  1326.       rmst_no);
  1327.     if (rmst_ptr->acked_){
  1328.       DiffPrint(DEBUG_IMPORTANT, "RmstFilter::processTimer cancel ACK_TIMER, Rmst %d ACKedn",
  1329.         rmst_no);
  1330.       rmst_ptr->ack_timer_active_ = false;
  1331.       return -1;
  1332.     }
  1333.     // If there has been no data sent for 30 seconds like a NAK response, we need to resend a packet.
  1334.     if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > ACK_WAIT ){
  1335.       NRAttrVec attrs;
  1336.       int8_t msg_type;
  1337.       DiffPrint(DEBUG_IMPORTANT, 
  1338.         "RmstFilter::processTimer ACK_TIMER, waited too long for Rmst %d ACK!n", rmst_no);
  1339.       if(rmst_ptr->reinf_  && !rmst_ptr->resent_last_data_){
  1340.         // We should send the last frag again as an DATA packet.
  1341.         DiffPrint(DEBUG_SOME_DETAILS, 
  1342.           "RmstFilter::processTimer ACK_TIMER, resend last packet as DATAn");
  1343.         msg_type = DATA;
  1344.         rmst_ptr->resent_last_data_ = true;
  1345.       }
  1346.       else if(rmst_ptr->resent_last_data_ && !rmst_ptr->resent_last_exp_){
  1347.         ExpLog exp_msg;
  1348.         union LlToInt key;
  1349.         // We tried resending last frag as data and it didn't work, try as EXP
  1350.         DiffPrint(DEBUG_IMPORTANT,
  1351.           "RmstFilter::processTimer ACK_TIMER, resend last packet as EXPLORATORY_DATAn");
  1352.         // Insert this Exploratory message in exp_map_.
  1353.         // When we get a reinforcement we'll know what rmst it's for.
  1354.         key.int_val_[0] = pkt_count_;
  1355.         key.int_val_[1] = rdm_id_;
  1356.         DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llxn", key.ll_val_);
  1357.         exp_msg.rmst_no_ = rmst_no;
  1358.         exp_msg.last_hop_ = LOCALHOST_ADDR;
  1359.         exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
  1360.         msg_type = EXPLORATORY_DATA;
  1361.         rmst_ptr->reinf_ = false;
  1362.         rmst_ptr->naks_rec_ = 0;
  1363.         rmst_ptr->pkts_sent_ = 0;
  1364.         rmst_ptr->resent_last_exp_ = true;
  1365.       }
  1366.       else if(rmst_ptr->resent_last_data_ && rmst_ptr->resent_last_exp_ && rmst_ptr->reinf_){
  1367.         // We should send the last frag again as an DATA packet.
  1368.         DiffPrint(DEBUG_IMPORTANT,
  1369.           "RmstFilter::processTimer ACK_TIMER, resend last packet on new reinf path as DATAn");
  1370.         msg_type = DATA;
  1371.         rmst_ptr->resent_last_data_ = false;
  1372.         rmst_ptr->resent_last_exp_ = false;
  1373.       }
  1374.       else{
  1375.         ExpLog exp_msg;
  1376.         union LlToInt key;
  1377.         DiffPrint(DEBUG_IMPORTANT, 
  1378.           "RmstFilter::processTimer ACK_TIMER, resent last packet as EXP and no reinforced path, Try again!n");
  1379.         // Insert this Exploratory message in exp_map_.
  1380.         // When we get a reinforcement we'll know what rmst it's for.
  1381.         key.int_val_[0] = pkt_count_;
  1382.         key.int_val_[1] = rdm_id_;
  1383.         DiffPrint(DEBUG_LOTS_DETAILS, "  Key = %llxn", key.ll_val_);
  1384.         exp_msg.rmst_no_ = rmst_no;
  1385.         exp_msg.last_hop_ = LOCALHOST_ADDR;
  1386.         exp_map_.insert(Key2ExpLog::value_type(key.ll_val_, exp_msg));
  1387.         msg_type = EXPLORATORY_DATA;
  1388.       }
  1389.       attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
  1390.       attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
  1391.       attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
  1392.       attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
  1393.       frag_ptr =  rmst_ptr->getFrag(rmst_ptr->max_frag_);
  1394.       attrs.push_back(RmstDataAttr.make(NRAttribute::IS, 
  1395.         frag_ptr, rmst_ptr->max_frag_len_));
  1396.       Message *new_frag;
  1397.       new_frag = new Message(DIFFUSION_VERSION, msg_type, 0, 0, attrs.size(), pkt_count_,
  1398.         rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
  1399.       new_frag->msg_attr_vec_ = CopyAttrs(&attrs);
  1400.       ((DiffusionRouting *)dr_)->sendMessage(new_frag, filter_handle_);
  1401.       pkt_count_++;
  1402.       delete new_frag;
  1403.       ClearAttrs(&attrs);
  1404.       // We sent a fragment, set the last_data_time_ for the cleanup timer.
  1405.       GetTime(&rmst_ptr->last_data_time_);
  1406.     }
  1407.     return 0;
  1408.     break;
  1409.   case CLEANUP_TIMER:
  1410.     DiffPrint(DEBUG_SOME_DETAILS, "RmstFilter::processTimer CLEANUP_TIMER");
  1411.     PrintTime(&cur_time);
  1412.     DiffPrint(DEBUG_IMPORTANT, "  CLEANUP_TIMER calledn");
  1413.     rmst_iterator = rmst_map_.begin();
  1414.     while(rmst_iterator != rmst_map_.end()){
  1415.       rmst_ptr = (*rmst_iterator).second;
  1416.       DiffPrint(DEBUG_SOME_DETAILS, 
  1417.         "  CLEANUP_TIMER:: rmst_no %d : pkts_sent_ = %d, pkts_rec_ = %d, last_hop_pkts_sent_ = %dn",
  1418.         rmst_ptr->rmst_no_, rmst_ptr->pkts_sent_, rmst_ptr->pkts_rec_, rmst_ptr->last_hop_pkts_sent_);
  1419.       if((!rmst_ptr->reinf_)&&(!rmst_ptr->acked_)&&(!rmst_ptr->local_source_)&&(!local_sink_)){
  1420.         if( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > LONG_CLEANUP_WAIT )
  1421.           cleanUpRmst(rmst_ptr);
  1422.       }
  1423.       else if (rmst_ptr->acked_){
  1424.         if ( ( (cur_time.tv_sec - rmst_ptr->last_data_time_.tv_sec) > SHORT_CLEANUP_WAIT ) &&
  1425.            ( (cur_time.tv_sec - rmst_ptr->last_nak_time_.tv_sec) > SHORT_CLEANUP_WAIT ) )
  1426.            cleanUpRmst(rmst_ptr);
  1427.       }
  1428.             
  1429.       rmst_iterator++;
  1430.     }
  1431.     // Check on the BlackList, in case network is partitioned.
  1432.     if (!black_list_.empty()){
  1433.       if ( (cur_time.tv_sec - last_data_rec_.tv_sec) > RMST_BLACKLIST_WAIT ){
  1434.         DiffPrint(DEBUG_IMPORTANT, "  clearing black_list_!n");
  1435.         ((DiffusionRouting *)dr_)->clearBlacklist();
  1436.         black_list_.clear();
  1437.       }
  1438.     }
  1439.     if (local_sink_){
  1440.       if ( (cur_time.tv_sec - last_sink_time_.tv_sec) > SINK_REFRESH_WAIT ){
  1441.         DiffPrint(DEBUG_IMPORTANT, "  local sink timed outn");
  1442.         local_sink_ = false;
  1443.       }
  1444.       else
  1445.         DiffPrint(DEBUG_IMPORTANT, "  local sink still alive.n");
  1446.     }
  1447.     return 0;
  1448.     break;
  1449.   default:
  1450.     break;
  1451.   }
  1452.   return -1;
  1453. }
  1454. void RmstFilter::sendRmstToSink(Rmst *rmst_ptr)
  1455. {
  1456.   NRAttrVec attrs;
  1457.   Message *rmst_msg;
  1458.   NRSimpleAttribute<void *> *rmst_data_attr;
  1459.   NRSimpleAttribute<int> *frag_number_attr;
  1460.   void *frag_ptr;
  1461.   int size, i;
  1462.   DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - sending rmst %d to local sinkn",
  1463.     rmst_ptr->rmst_no_);
  1464.   // Prepare attribute vector
  1465.   attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, rmst_ptr->target_str_));
  1466.   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
  1467.   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
  1468.   attrs.push_back(RmstMaxFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_));
  1469.   frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
  1470.   attrs.push_back(frag_number_attr);
  1471.   // Add the blob fragment
  1472.   if (rmst_ptr->max_frag_ == 0)
  1473.     size = rmst_ptr->max_frag_len_;
  1474.   else
  1475.     size = MAX_FRAG_SIZE;
  1476.   frag_ptr =  rmst_ptr->getFrag(0);
  1477.   rmst_data_attr = RmstDataAttr.make(NRAttribute::IS, frag_ptr, size);
  1478.   attrs.push_back(rmst_data_attr);
  1479.   // Prepare the message
  1480.   rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
  1481.     pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
  1482.   rmst_msg->next_hop_ = LOCALHOST_ADDR;
  1483.   rmst_msg->next_port_ = local_sink_port_;
  1484.   rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
  1485.   ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
  1486.   delete rmst_msg;
  1487.   pkt_count_++;
  1488.   // Send all the fragments
  1489.   for (i=1; i <= (rmst_ptr->max_frag_); i++){
  1490.     frag_number_attr->setVal(i);
  1491.     frag_ptr =  rmst_ptr->getFrag(i);
  1492.     if(frag_ptr == NULL)
  1493.       DiffPrint(DEBUG_IMPORTANT, "RmstFilter::sendRmstToSink - got a null frag_ptr for frag!%dn",
  1494.       i);
  1495.     else{
  1496.       if (rmst_ptr->max_frag_ == i)
  1497.         rmst_data_attr->setVal(frag_ptr, rmst_ptr->max_frag_len_);
  1498.       else
  1499.         rmst_data_attr->setVal(frag_ptr, MAX_FRAG_SIZE);
  1500.     }
  1501.     rmst_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
  1502.       pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
  1503.     rmst_msg->next_hop_ = LOCALHOST_ADDR;
  1504.     rmst_msg->next_port_ = local_sink_port_;
  1505.     rmst_msg->msg_attr_vec_= CopyAttrs(&attrs);
  1506.     ((DiffusionRouting *)dr_)->sendMessage(rmst_msg, filter_handle_, 1);
  1507.     delete rmst_msg;
  1508.     pkt_count_++;
  1509.   }
  1510.   ClearAttrs(&attrs);
  1511. }
  1512. void RmstFilter::sendAckToSource(Rmst *rmst_ptr)
  1513. {
  1514.   NRAttrVec attrs;
  1515.   Message *ack_msg;
  1516.   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, ACK_RESP));
  1517.   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
  1518.   // New code to send a message to last_hop_
  1519.   ack_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
  1520.             attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
  1521.             LOCALHOST_ADDR);
  1522.   ack_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  1523.   DiffPrint(DEBUG_IMPORTANT, "  Sending ACK_RESP to node %dn", rmst_ptr->last_hop_);
  1524.   ((DiffusionRouting *)dr_)->sendMessage(ack_msg, filter_handle_, 1);
  1525.   pkt_count_++;
  1526.   delete ack_msg;
  1527.   ClearAttrs(&attrs);
  1528. }
  1529. void RmstFilter::sendExpReqUpstream(Rmst *rmst_ptr)
  1530. {
  1531.   NRAttrVec attrs;
  1532.   Message *exp_msg;
  1533.   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, EXP_REQ));
  1534.   attrs.push_back(RmstIdAttr.make(NRAttribute::IS, rmst_ptr->rmst_no_));
  1535.   attrs.push_back(RmstFragAttr.make(NRAttribute::IS, rmst_ptr->max_frag_rec_));
  1536.   exp_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0,
  1537.             attrs.size(), pkt_count_, rdm_id_, rmst_ptr->last_hop_,
  1538.             LOCALHOST_ADDR);
  1539.   exp_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  1540.   DiffPrint(DEBUG_IMPORTANT, "  Sending EXP_REQ to node %dn", rmst_ptr->last_hop_);
  1541.   ((DiffusionRouting *)dr_)->sendMessage(exp_msg, filter_handle_, 1);
  1542.   pkt_count_++;
  1543.   delete exp_msg;
  1544.   ClearAttrs(&attrs);
  1545. }
  1546. void RmstFilter::sendContToSource(Rmst *rmst_ptr)
  1547. {
  1548.   NRAttrVec attrs;
  1549.   Message *cont_msg;
  1550.   attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::EQ, RMST_CONT));
  1551.   attrs.push_back(NRClassAttr.make(NRAttribute::IS,
  1552.     NRAttribute::INTEREST_CLASS));
  1553.   DiffPrint(DEBUG_IMPORTANT, "  Sending a RMST_CONT to sourcen");
  1554.   cont_msg = new Message(DIFFUSION_VERSION, DATA, 0, 0, attrs.size(),
  1555.     pkt_count_, rdm_id_, LOCALHOST_ADDR, LOCALHOST_ADDR);
  1556.   cont_msg->msg_attr_vec_ = CopyAttrs(&attrs);
  1557.   cont_msg->next_hop_ = LOCALHOST_ADDR;
  1558.   cont_msg->next_port_ = rmst_ptr->local_source_port_;
  1559.   ((DiffusionRouting *)dr_)->sendMessage(cont_msg, filter_handle_, 1);
  1560.   pkt_count_++;
  1561.   delete cont_msg;
  1562.   ClearAttrs(&attrs);
  1563. }
  1564. void RmstFilter::cleanUpRmst(Rmst *rmst_ptr)
  1565. {
  1566.   int rmst_no = rmst_ptr->rmst_no_;
  1567.   Int2Rmst::iterator rmst_iterator;
  1568.   Key2ExpLog::iterator exp_iterator;
  1569.   ExpLog exp_msg;
  1570.   rmst_no = rmst_ptr->rmst_no_;
  1571.   DiffPrint(DEBUG_IMPORTANT, "  cleanUpRmst called to delete Rmst %dn", rmst_no);
  1572.   if(rmst_ptr->watchdog_active_)
  1573.     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->watchdog_handle_);
  1574.   if(rmst_ptr->ack_timer_active_)
  1575.     ((DiffusionRouting *)dr_)->removeTimer(rmst_ptr->ack_timer_handle_);
  1576.   rmst_iterator = rmst_map_.find(rmst_no);
  1577.   if(rmst_iterator != rmst_map_.end()){
  1578.     rmst_map_.erase(rmst_iterator);
  1579.   }
  1580.   delete rmst_ptr;
  1581.   // clean up the exp_map_ of any entries base on this rmst
  1582.   exp_iterator = exp_map_.begin();
  1583.   while(exp_iterator != exp_map_.end()){
  1584.     exp_msg = (*exp_iterator).second;
  1585.     if(exp_msg.rmst_no_ == rmst_no){
  1586.       DiffPrint(DEBUG_LOTS_DETAILS, "  cleanUpRmst deleting exp_map_ entry for Rmst %dn", rmst_no);
  1587.       exp_map_.erase(exp_iterator);
  1588.     }
  1589.     exp_iterator++;
  1590.   }
  1591. }
  1592. #ifdef NS_DIFFUSION
  1593. RmstFilter::RmstFilter()
  1594. {
  1595. #else
  1596. RmstFilter::RmstFilter(int argc, char **argv)
  1597. {
  1598.   TimerCallback *stat_timer;
  1599.   parseCommandLine(argc, argv);
  1600.   dr_ = NR::createNR(diffusion_port_);
  1601. #endif // NS_DIFFUSION
  1602.   fcb_ = new RmstFilterCallback;
  1603.   fcb_->app_ = this;
  1604.   rdm_id_ = rand();
  1605.   pkt_count_ = rand();
  1606.   local_sink_ = false;
  1607.   caching_mode_ = false;
  1608.   send_timer_active_ = false;
  1609.   DiffPrint(DEBUG_ALWAYS, "RmstFilter constructor: rdm_id_ = %x, pkt_count_ = %xn",
  1610.     rdm_id_, pkt_count_);
  1611. #ifndef NS_DIFFUSION
  1612.   filter_handle_ = setupFilter();
  1613.   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter:: subscribed to all, received handle %dn",
  1614.     (int)filter_handle_);
  1615.   
  1616.   DiffPrint(DEBUG_LOTS_DETAILS, "RmstFilter constructor: start cleanup timern");
  1617.   stat_timer = new RmstTimeout(this, -1, CLEANUP_TIMER);
  1618.   stat_timer_handle_ = ((DiffusionRouting *)dr_)->addTimer(CLEANUP_INTERVAL, stat_timer);
  1619. #endif // !NS_DIFFUSION
  1620. }
  1621. #ifndef NS_DIFFUSION
  1622. int main(int argc, char **argv)
  1623. {
  1624.   RmstFilter *app;
  1625.   app = new RmstFilter(argc, argv);
  1626.   app->run();
  1627.   return 0;
  1628. }
  1629. #endif // !NS_DIFFUSION