one_phase_pull.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:39k
源码类别:

通讯编程

开发平台:

Visual C++

  1. //
  2. // one_phase_pull.cc    : One-Phase Pull Filter
  3. // author               : Fabio Silva
  4. //
  5. // Copyright (C) 2000-2003 by the University of Southern California
  6. // $Id: one_phase_pull.cc,v 1.6 2005/09/13 04:53:47 tomh Exp $
  7. //
  8. // This program is free software; you can redistribute it and/or
  9. // modify it under the terms of the GNU General Public License,
  10. // version 2, as published by the Free Software Foundation.
  11. //
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. //
  17. // You should have received a copy of the GNU General Public License along
  18. // with this program; if not, write to the Free Software Foundation, Inc.,
  19. // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  20. //
  21. // Linking this file statically or dynamically with other modules is making
  22. // a combined work based on this file.  Thus, the terms and conditions of
  23. // the GNU General Public License cover the whole combination.
  24. //
  25. // In addition, as a special exception, the copyright holders of this file
  26. // give you permission to combine this file with free software programs or
  27. // libraries that are released under the GNU LGPL and with code included in
  28. // the standard release of ns-2 under the Apache 2.0 license or under
  29. // otherwise-compatible licenses with advertising requirements (or modified
  30. // versions of such code, with unchanged license).  You may copy and
  31. // distribute such a system following the terms of the GNU GPL for this
  32. // file and the licenses of the other code concerned, provided that you
  33. // include the source code of that other code when and as the GNU GPL
  34. // requires distribution of source code.
  35. //
  36. // Note that people who make modified versions of this file are not
  37. // obligated to grant this special exception for their modified versions;
  38. // it is their choice whether to do so.  The GNU General Public License
  39. // gives permission to release a modified version without this exception;
  40. // this exception also makes it possible to release a modified version
  41. // which carries forward this exception.
  42. #include "one_phase_pull.hh"
  43. #ifdef NS_DIFFUSION
  44. static class OnePhasePullFilterClass : public TclClass {
  45. public:
  46.   OnePhasePullFilterClass() : TclClass("Application/DiffApp/OnePhasePullFilter") {}
  47.   TclObject* create(int argc, const char*const* argv) {
  48.     if (argc == 5)
  49.       return(new OnePhasePullFilter(argv[4]));
  50.     else
  51.       fprintf(stderr,
  52.       "Insufficient number of args for creating OnePhasePullFilter");
  53.     return (NULL);
  54.   }
  55. } class_one_phase_pull_filter;
  56. int OnePhasePullFilter::command(int argc, const char*const* argv) {
  57.   if (argc == 3) {
  58.     if (strcasecmp(argv[1], "debug") == 0) {
  59.       global_debug_level = atoi(argv[2]);
  60.       if (global_debug_level < 1 || global_debug_level > 10) {
  61. global_debug_level = DEBUG_DEFAULT;
  62. printf("Error: Debug level outside range(1-10) or missing !n");
  63.       }
  64.     }
  65.   }
  66.   return DiffApp::command(argc, argv);
  67. }
  68. #endif // NS_DIFFUSION
  69. void OnePhasePullFilterReceive::recv(Message *msg, handle h)
  70. {
  71.   filter_->recv(msg, h);
  72. }
  73. int OppMessageSendTimer::expire()
  74. {
  75.   // Call timeout function
  76.   agent_->messageTimeout(msg_);
  77.   // Do not reschedule this timer
  78.   delete this;
  79.   return -1;
  80. }
  81. int OppInterestForwardTimer::expire()
  82. {
  83.   // Call timeout function
  84.   agent_->interestTimeout(msg_);
  85.   // Do not reschedule this timer
  86.   delete this;
  87.   return -1;
  88. }
  89. int OppSubscriptionExpirationTimer::expire()
  90. {
  91.   int retval;
  92.   retval = agent_->subscriptionTimeout(attrs_);
  93.   // Delete timer if we are not rescheduling it
  94.   if (retval == -1)
  95.     delete this;
  96.   return retval;
  97. }
  98. int OppGradientExpirationCheckTimer::expire()
  99. {
  100.   // Call the callback function
  101.   agent_->gradientTimeout();
  102.   // Reschedule this timer
  103.   return 0;
  104. }
  105. int OppReinforcementCheckTimer::expire()
  106. {
  107.   // Call the callback function
  108.   agent_->reinforcementTimeout();
  109.   // Reschedule this timer
  110.   return 0;
  111. }
  112. RoundIdEntry * RoutingEntry::findRoundIdEntry(int32_t round_id)
  113. {
  114.   RoundIdList::iterator round_id_itr;
  115.   RoundIdEntry *round_id_entry;
  116.   // Iterate through round ids for this routing entry
  117.   for (round_id_itr = round_ids_.begin();
  118.        round_id_itr != round_ids_.end(); round_id_itr++){
  119.     round_id_entry = *round_id_itr;
  120.     // Check if round ids match
  121.     if (round_id_entry->round_id_ == round_id)
  122.       return round_id_entry;
  123.   }
  124.   // Couldn't find a matching round id entry
  125.   return NULL;
  126. }
  127. RoundIdEntry * RoutingEntry::addRoundIdEntry(int32_t round_id)
  128. {
  129.   RoundIdEntry *round_id_entry;
  130.   // Create a new round id entry
  131.   round_id_entry = new RoundIdEntry(round_id);
  132.   // Add it to the round id list
  133.   round_ids_.push_back(round_id_entry);
  134.   return round_id_entry;
  135. }
  136. void RoutingEntry::updateNeighborDataInfo(int32_t node_id,
  137.   bool new_message)
  138. {
  139.   DataNeighborList::iterator data_neighbor_itr;
  140.   OPPDataNeighborEntry *data_neighbor_entry;
  141.   for (data_neighbor_itr = data_neighbors_.begin();
  142.        data_neighbor_itr != data_neighbors_.end(); ++data_neighbor_itr){
  143.     data_neighbor_entry = *data_neighbor_itr;
  144.     // Find neighbor
  145.     if (data_neighbor_entry->node_id_ == node_id){
  146.       // Increment message count
  147.       data_neighbor_entry->messages_++;
  148.       // If this is a new message, just set flag and return
  149.       if (new_message){
  150. data_neighbor_entry->new_messages_ = new_message;
  151. return;
  152.       }
  153.     }
  154.   }
  155.   // We need to add a new data neighbor
  156.   data_neighbor_entry = new OPPDataNeighborEntry(node_id);
  157.   data_neighbor_entry->new_messages_ = new_message;
  158.   data_neighbors_.push_back(data_neighbor_entry);
  159. }
  160. void RoutingEntry::addGradient(int32_t last_hop,
  161.        int32_t round_id, bool new_gradient)
  162. {
  163.   RoundIdEntry *round_id_entry;
  164.   OPPGradientEntry *gradient_entry;
  165.   // Look for an existing routing id entry
  166.   round_id_entry = findRoundIdEntry(round_id);
  167.   // Create new entry if not found
  168.   if (!round_id_entry)
  169.     round_id_entry = addRoundIdEntry(round_id);
  170.   if (new_gradient){
  171.     // Marks the beginning of a new round
  172.     round_id_entry->gradients_.clear();
  173.   }
  174.   else{
  175.     // Look for a gradient to our last_hop neighbor
  176.     gradient_entry = round_id_entry->findGradient(last_hop);
  177.     if (gradient_entry){
  178.       // Gradient already in the list, we just update time
  179.       GetTime(&gradient_entry->tv_);
  180.       return;
  181.     }
  182.   }
  183.   // Gradient not yet in the list, add this neighbor to the list
  184.   round_id_entry->addGradient(last_hop);
  185. }
  186. void RoutingEntry::updateSink(u_int16_t sink_id, int32_t round_id)
  187. {
  188.   RoundIdEntry *round_id_entry;
  189.   // Lock for an existing round id entry
  190.   round_id_entry = findRoundIdEntry(round_id);
  191.   // Create new entry if not found
  192.   if (!round_id_entry)
  193.     round_id_entry = addRoundIdEntry(round_id);
  194.   // Add/Update this sink
  195.   round_id_entry->updateSink(sink_id);
  196. }
  197. void RoutingEntry::deleteExpiredRoundIds()
  198. {
  199.   RoundIdList::iterator round_id_itr;
  200.   RoundIdEntry *round_id_entry;
  201.   struct timeval tmv;
  202.   GetTime(&tmv);
  203.   // Go through all round ids
  204.   for (round_id_itr = round_ids_.begin();
  205.        round_id_itr != round_ids_.end(); round_id_itr++){
  206.     round_id_entry = *round_id_itr;
  207.     round_id_entry->deleteExpiredSinks();
  208.     round_id_entry->deleteExpiredGradients();
  209.     // Delete round id if nothing left
  210.     if (round_id_entry->gradients_.size() == 0 &&
  211. round_id_entry->sinks_.size() == 0){
  212.       // Round Id has expired, delete it from the list
  213.       DiffPrint(DEBUG_NO_DETAILS, "Delete expired Round Id: %dn",
  214. round_id_entry->round_id_);
  215.       round_id_itr = round_ids_.erase(round_id_itr);
  216.       delete round_id_entry;
  217.     }
  218.   }
  219. }
  220. void RoutingEntry::getSinksFromList(FlowIdList *msg_list,
  221.     FlowIdList *sink_list)
  222. {
  223.   RoundIdList::iterator round_id_itr;
  224.   RoundIdEntry *round_id_entry;
  225.   FlowIdList::iterator flow_id_itr;
  226.   for (round_id_itr = round_ids_.begin();
  227.        round_id_itr != round_ids_.end(); round_id_itr++){
  228.     round_id_entry = *round_id_itr;
  229.     flow_id_itr = find(msg_list->begin(),
  230.        msg_list->end(), round_id_entry->round_id_);
  231.     if (flow_id_itr != msg_list->end()){
  232.       // Flow id in the list
  233.       if (round_id_entry->sinks_.size() > 0){
  234. sink_list->push_back(round_id_entry->round_id_);
  235.       }
  236.     }
  237.   }
  238. }
  239. void RoutingEntry::getFlowsFromList(FlowIdList *msg_list,
  240.     FlowIdList *flow_list)
  241. {
  242.   RoundIdList::iterator round_id_itr;
  243.   RoundIdEntry *round_id_entry;
  244.   FlowIdList::iterator flow_id_itr;
  245.   for (round_id_itr = round_ids_.begin();
  246.        round_id_itr != round_ids_.end(); round_id_itr++){
  247.     round_id_entry = *round_id_itr;
  248.     flow_id_itr = find(msg_list->begin(),
  249.        msg_list->end(), round_id_entry->round_id_);
  250.     if (flow_id_itr != msg_list->end()){
  251.       // Flow id in the list
  252.       if (round_id_entry->sinks_.size() == 0){
  253. // This is a flow we have no local sink for
  254. flow_list->push_back(round_id_entry->round_id_);
  255.       }
  256.     }
  257.   }
  258. }
  259. int32_t RoutingEntry::getNeighborFromFlow(int32_t flow_id)
  260. {
  261.   RoundIdList::iterator round_id_itr;
  262.   RoundIdEntry *round_id_entry;
  263.   OPPGradientEntry *gradient_entry;
  264.   for (round_id_itr = round_ids_.begin();
  265.        round_id_itr != round_ids_.end(); round_id_itr++){
  266.     round_id_entry = *round_id_itr;
  267.     if (round_id_entry->round_id_ == flow_id){
  268.       // Flow matches, get 'reinforced neighbor'
  269.       if (round_id_entry->gradients_.size() > 0){
  270. // Get the first gradient
  271. gradient_entry = *round_id_entry->gradients_.begin();
  272. return gradient_entry->node_id_;
  273.       }
  274.       DiffPrint(DEBUG_ALWAYS, "Cannot find 'reinforced neighbor !n");
  275.       break;
  276.     }
  277.   }
  278.   // Couldn't find neighbor for this flow
  279.   return BROADCAST_ADDR;
  280. }
  281. void RoundIdEntry::deleteExpiredSinks()
  282. {
  283.   SinkList::iterator sink_itr;
  284.   SinkEntry *sink_entry;
  285.   struct timeval tmv;
  286.   GetTime(&tmv);
  287.   // Go through all sinks
  288.   for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){
  289.     sink_entry = *sink_itr;
  290.     // Check if expired
  291.     if (tmv.tv_sec > (sink_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
  292.       // Expired, delete it
  293.       DiffPrint(DEBUG_NO_DETAILS,
  294. "Deleting Gradient to sink %d !n", sink_entry->port_);
  295.       sink_itr = sinks_.erase(sink_itr);
  296.       delete sink_entry;
  297.     }
  298.   }
  299. }
  300. void RoundIdEntry::deleteExpiredGradients()
  301. {
  302.   GradientList::iterator gradient_itr;
  303.   OPPGradientEntry *gradient_entry;
  304.   struct timeval tmv;
  305.   GetTime(&tmv);
  306.   // Go through all gradients
  307.   for (gradient_itr = gradients_.begin();
  308.        gradient_itr != gradients_.end(); gradient_itr++){
  309.     gradient_entry = *gradient_itr;
  310.     // Check if expired
  311.     if (tmv.tv_sec > (gradient_entry->tv_.tv_sec + GRADIENT_TIMEOUT)){
  312.       // Expired, delete it
  313.       DiffPrint(DEBUG_NO_DETAILS,
  314. "Deleting gradient to node %d !n",
  315. gradient_entry->node_id_);
  316.       gradient_itr = gradients_.erase(gradient_itr);
  317.       delete gradient_entry;
  318.     }
  319.   }
  320. }
  321. void RoundIdEntry::updateSink(u_int16_t sink_id)
  322. {
  323.   SinkList::iterator sink_itr;
  324.   SinkEntry *sink_entry;
  325.   // Go through all sinks
  326.   for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); ++sink_itr){
  327.     sink_entry = *sink_itr;
  328.     if (sink_entry->port_ == sink_id){
  329.       // We already have this guy
  330.       GetTime(&(sink_entry->tv_));
  331.       return;
  332.     }
  333.   }
  334.   // This is a new sink, so we create a new entry on the list
  335.   sink_entry = new SinkEntry(sink_id);
  336.   sinks_.push_back(sink_entry);
  337. }
  338. OPPGradientEntry * RoundIdEntry::findGradient(int32_t node_id)
  339. {
  340.   GradientList::iterator gradient_itr;
  341.   OPPGradientEntry *gradient_entry;
  342.   // Go through all gradients
  343.   for (gradient_itr = gradients_.begin();
  344.        gradient_itr != gradients_.end(); gradient_itr++){
  345.     gradient_entry = *gradient_itr;
  346.     // Is this the one we are looking for ?
  347.     if (gradient_entry->node_id_ == node_id)
  348.       return gradient_entry;
  349.   }
  350.   // Did not find a match
  351.   return NULL;
  352. }
  353. void RoundIdEntry::addGradient(int32_t node_id)
  354. {
  355.   OPPGradientEntry *gradient_entry;
  356.   // Create new gradient
  357.   gradient_entry = new OPPGradientEntry(node_id);
  358.   gradients_.push_back(gradient_entry);
  359. }
  360. void RoundIdEntry::deleteGradient(int32_t node_id)
  361. {
  362.   GradientList::iterator gradient_itr;
  363.   OPPGradientEntry *gradient_entry;
  364.   // Go through all gradients
  365.   for (gradient_itr = gradients_.begin();
  366.        gradient_itr != gradients_.end(); gradient_itr++){
  367.     gradient_entry = *gradient_itr;
  368.     // Is this the one we are looking for ?
  369.     if (gradient_entry->node_id_ == node_id){
  370.   
  371.       DiffPrint(DEBUG_NO_DETAILS, "Deleting gradient to node %d !n",
  372. node_id);
  373.       // Found. Delete it from the list and return
  374.       gradient_itr = gradients_.erase(gradient_itr);
  375.       delete gradient_entry;
  376.       return;
  377.     }
  378.   }
  379. }
  380. void OnePhasePullFilter::interestTimeout(Message *msg)
  381. {
  382.   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Interest Timeout !n", ((DiffusionRouting *)dr_)->getNodeId());
  383.   msg->last_hop_ = LOCALHOST_ADDR;
  384.   msg->next_hop_ = BROADCAST_ADDR;
  385.  
  386.   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
  387. }
  388. void OnePhasePullFilter::messageTimeout(Message *msg)
  389. {
  390.   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Message Timeout !n", ((DiffusionRouting *)dr_)->getNodeId());
  391.   ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
  392. }
  393. void OnePhasePullFilter::gradientTimeout()
  394. {
  395.   RoutingTable::iterator routing_itr;
  396.   RoutingEntry *routing_entry;
  397.   DiffPrint(DEBUG_MORE_DETAILS, "Node%d: Gradient Timeout !n",((DiffusionRouting *)dr_)->getNodeId());
  398.   routing_itr = routing_list_.begin();
  399.   // Iterate through the routing table
  400.   for (routing_itr = routing_list_.begin();
  401.        routing_itr != routing_list_.end(); routing_itr++){
  402.     routing_entry = *routing_itr;
  403.     // Step 1: Delete expired round ids
  404.     routing_entry->deleteExpiredRoundIds();
  405.     // Step 2: Remove the routing entry if no round ids left
  406.     if (routing_entry->round_ids_.size() == 0){
  407.       // Deleting Routing Entry
  408.       DiffPrint(DEBUG_DETAILS,
  409. "Nothing left for this data type, cleaning up !n");
  410.       routing_itr = routing_list_.erase(routing_itr);
  411.       delete routing_entry;
  412.     }
  413.   }
  414. }
  415. void OnePhasePullFilter::reinforcementTimeout()
  416. {
  417.   DataNeighborList::iterator data_neighbor_itr;
  418.   OPPDataNeighborEntry *data_neighbor_entry;
  419.   RoutingTable::iterator routing_itr;
  420.   RoutingEntry *routing_entry;
  421.   Message *my_message;
  422.   DiffPrint(DEBUG_MORE_DETAILS, "Reinforcement Timeout !n");
  423.   routing_itr = routing_list_.begin();
  424.   while (routing_itr != routing_list_.end()){
  425.     routing_entry = *routing_itr;
  426.     // Step 1: Delete expired gradients
  427.     data_neighbor_itr = routing_entry->data_neighbors_.begin();
  428.     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
  429.       data_neighbor_entry = *data_neighbor_itr;
  430.       if ((!data_neighbor_entry->new_messages_) &&
  431.   (data_neighbor_entry->messages_ > 0)){
  432. my_message = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
  433.  0, 0, routing_entry->attrs_->size(), pkt_count_,
  434.  random_id_, data_neighbor_entry->node_id_,
  435.  LOCALHOST_ADDR);
  436. my_message->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
  437. DiffPrint(DEBUG_NO_DETAILS,
  438.   "Node%d: Sending Negative Reinforcement to node %d !n",
  439.   ((DiffusionRouting *)dr_)->getNodeId(), data_neighbor_entry->node_id_);
  440. ((DiffusionRouting *)dr_)->sendMessage(my_message, filter_handle_);
  441. pkt_count_++;
  442. delete my_message;
  443. // Done. Delete entry
  444. data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
  445. delete data_neighbor_entry;
  446.       }
  447.       else{
  448. data_neighbor_itr++;
  449.       }
  450.     }
  451.     // Step 2: Delete data neighbors with no activity, zero flags
  452.     data_neighbor_itr = routing_entry->data_neighbors_.begin();
  453.     while (data_neighbor_itr != routing_entry->data_neighbors_.end()){
  454.       data_neighbor_entry = *data_neighbor_itr;
  455.       if (data_neighbor_entry->messages_ > 0){
  456. data_neighbor_entry->messages_ = 0;
  457. data_neighbor_entry->new_messages_ = false;
  458. data_neighbor_itr++;
  459.       }
  460.       else{
  461. // Delete entry
  462. data_neighbor_itr = routing_entry->data_neighbors_.erase(data_neighbor_itr);
  463. delete data_neighbor_entry;
  464.       }
  465.     }
  466.     // Advance to the next routing entry
  467.     routing_itr++;
  468.   }
  469. }
  470. int OnePhasePullFilter::subscriptionTimeout(NRAttrVec *attrs)
  471. {
  472.   SubscriptionList::iterator subscription_itr;
  473.   SubscriptionEntry *subscription_entry;
  474.   RoutingEntry *routing_entry;
  475.   struct timeval tmv;
  476.   DiffPrint(DEBUG_MORE_DETAILS, "Subscription Timeout !n");
  477.   GetTime(&tmv);
  478.   // Find the correct Routing entry
  479.   routing_entry = findRoutingEntry(attrs);
  480.   if (routing_entry){
  481.     // Routing entry found
  482.     subscription_itr = routing_entry->subscription_list_.begin();
  483.     // Go through all attributes
  484.     while (subscription_itr != routing_entry->subscription_list_.end()){
  485.       subscription_entry = *subscription_itr;
  486.       // Check timeouts
  487.       if (tmv.tv_sec > (subscription_entry->tv_.tv_sec + SUBSCRIPTION_TIMEOUT)){
  488. // Time expired, send disinterest message
  489. sendDisinterest(subscription_entry->attrs_, routing_entry);
  490. subscription_itr = routing_entry->subscription_list_.erase(subscription_itr);
  491. delete subscription_entry;
  492.       }
  493.       else{
  494. subscription_itr++;
  495.       }
  496.     }
  497.   }
  498.   else{
  499.     DiffPrint(DEBUG_DETAILS, "Warning: Could't find subscription entry - maybe deleted by GradientTimeout ?n");
  500.     // Cancel Timer
  501.     return -1;
  502.   }
  503.   // Keep Timer
  504.   return 0;
  505. }
  506. void OnePhasePullFilter::deleteRoutingEntry(RoutingEntry *routing_entry)
  507. {
  508.   RoutingTable::iterator routing_itr;
  509.   RoutingEntry *current_entry;
  510.   // Go through the routing table
  511.   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
  512.     current_entry = *routing_itr;
  513.     // Is this the entry we are looking for ?
  514.     if (current_entry == routing_entry){
  515.       routing_itr = routing_list_.erase(routing_itr);
  516.       delete routing_entry;
  517.       return;
  518.     }
  519.   }
  520.   DiffPrint(DEBUG_ALWAYS, "Error: Could not find entry to delete !n");
  521. }
  522. RoutingEntry * OnePhasePullFilter::matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place)
  523. {
  524.   RoutingTable::iterator routing_itr;
  525.   RoutingEntry *routing_entry;
  526.   for (routing_itr = start; routing_itr != routing_list_.end(); ++routing_itr){
  527.     routing_entry = *routing_itr;
  528.     if (MatchAttrs(routing_entry->attrs_, attrs)){
  529.       *place = routing_itr;
  530.       return routing_entry;
  531.     }
  532.   }
  533.   return NULL;
  534. }
  535. RoutingEntry * OnePhasePullFilter::findRoutingEntry(NRAttrVec *attrs)
  536. {
  537.   RoutingTable::iterator routing_itr;
  538.   RoutingEntry *routing_entry;
  539.   for (routing_itr = routing_list_.begin(); routing_itr != routing_list_.end(); ++routing_itr){
  540.     routing_entry = *routing_itr;
  541.     if (PerfectMatch(routing_entry->attrs_, attrs))
  542.       return routing_entry;
  543.   }
  544.   return NULL;
  545. }
  546. SubscriptionEntry * OnePhasePullFilter::findMatchingSubscription(RoutingEntry *routing_entry,
  547.  NRAttrVec *attrs)
  548. {
  549.   SubscriptionList::iterator subscription_itr;
  550.   SubscriptionEntry *subscription_entry;
  551.   for (subscription_itr = routing_entry->subscription_list_.begin(); subscription_itr != routing_entry->subscription_list_.end(); ++subscription_itr){
  552.     subscription_entry = *subscription_itr;
  553.     if (PerfectMatch(subscription_entry->attrs_, attrs))
  554.       return subscription_entry;
  555.   }
  556.   return NULL;
  557. }
  558. void OnePhasePullFilter::forwardData(Message *msg,
  559.      RoutingEntry *routing_entry,
  560.      DataForwardingHistory *forwarding_history)
  561. {
  562.   NRSimpleAttribute<void *> *nr_data_attr = NULL;
  563.   NRAttrVec::iterator attribute_iterator;
  564.   FlowIdList msg_flow_list, sinks_flow_list, local_flow_list;
  565.   FlowIdList out_flow_list;
  566.   int32_t out_neighbor;
  567.   int *packed_flows;
  568.   FlowIdList::iterator flow_id_itr;
  569.   RoundIdList::iterator round_id_itr;
  570.   SinkList::iterator sink_itr;
  571.   RoundIdEntry *round_id_entry;
  572.   SinkEntry *sink_entry;
  573.   Message *sink_message, *out_message;
  574.   // Step 0: Read flows from message
  575.   // Find NRFlowAttr and remove from the message
  576.   attribute_iterator = msg->msg_attr_vec_->begin();
  577.   nr_data_attr = NRFlowAttr.find_from(msg->msg_attr_vec_,
  578.       attribute_iterator,
  579.       &attribute_iterator);
  580.   if (!nr_data_attr){
  581.     DiffPrint(DEBUG_ALWAYS, "Cannot find NRFlowAttr !n");
  582.     return;
  583.   }
  584.   msg->msg_attr_vec_->erase(attribute_iterator);
  585.   // Read flow ids from list
  586.   readFlowsFromList(nr_data_attr->getLen() / sizeof(int),
  587.     &msg_flow_list, nr_data_attr->getVal());
  588.   // Fill lists of sinks and flows
  589.   routing_entry->getSinksFromList(&msg_flow_list, &sinks_flow_list);
  590.   routing_entry->getFlowsFromList(&msg_flow_list, &local_flow_list);
  591.   // Step 1: Sink Processing
  592.   if (sinks_flow_list.size() > 0){
  593.     // Copy original message so we can change it
  594.     sink_message = CopyMessage(msg);
  595.     // Go through all rounds
  596.     for (round_id_itr = routing_entry->round_ids_.begin();
  597.  round_id_itr != routing_entry->round_ids_.end();
  598.  round_id_itr++){
  599.       round_id_entry = *round_id_itr;
  600.       flow_id_itr = find(sinks_flow_list.begin(), sinks_flow_list.end(),
  601.  round_id_entry->round_id_);
  602.       if (flow_id_itr != sinks_flow_list.end()){
  603. // Flows match ! Send message to sink
  604. for (sink_itr = round_id_entry->sinks_.begin();
  605.      sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
  606.   sink_entry = *sink_itr;
  607.   if (!forwarding_history->alreadyForwardedToLibrary(sink_entry->port_)){
  608.     // Forward DATA to local sinks
  609.     sink_message->next_hop_ = LOCALHOST_ADDR;
  610.     sink_message->next_port_ = sink_entry->port_;
  611.     // Add sink to the forwarding list
  612.     forwarding_history->forwardingToLibrary(sink_entry->port_);
  613.     ((DiffusionRouting *)dr_)->sendMessage(sink_message, filter_handle_);
  614.   }
  615. }
  616. // Remove sink from the flow_list
  617. if (!removeFlowFromList(&msg_flow_list, round_id_entry->round_id_)){
  618.   // We should not get here
  619.   DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !n");
  620. }
  621.       }
  622.     }
  623.     // Delete sink message
  624.     delete sink_message;
  625.   }
  626.   // Step 2: Intermediate Processing
  627.   DiffPrint(DEBUG_NO_DETAILS, "Node%d: Forwarding Datan", ((DiffusionRouting *)dr_)->getNodeId());
  628.   // Set reinforcement flags
  629.   if (msg->last_hop_ != LOCALHOST_ADDR)
  630.     routing_entry->updateNeighborDataInfo(msg->last_hop_, true);
  631.   // Work on local list until we finish processing all flows
  632.   while (local_flow_list.size() > 0){
  633.     // Initialize out_flow_list
  634.     out_flow_list.clear();
  635.     // Move first flow from the local flow list to out_flow_list
  636.     out_flow_list.push_back(*(local_flow_list.begin()));
  637.     local_flow_list.erase(local_flow_list.begin());
  638.     // Remove flow from the flow_list
  639.     if (!removeFlowFromList(&msg_flow_list, *(out_flow_list.begin()))){
  640.       // We should not get here
  641.       DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !n");
  642.     }
  643.     // Select output_neighbor
  644.     out_neighbor = routing_entry->getNeighborFromFlow(*(out_flow_list.begin()));
  645.     // Must have a valid neighbor
  646.     if (out_neighbor == BROADCAST_ADDR)
  647.       continue;
  648.     
  649.     // Go through all other local flows
  650.     for (flow_id_itr = local_flow_list.begin();
  651.  flow_id_itr != local_flow_list.end(); flow_id_itr++){
  652.       // Check if output neighbor for this flow matches current
  653.       if (routing_entry->getNeighborFromFlow(*flow_id_itr) == out_neighbor){
  654. // Yes it does !
  655. // Remove flow from the flow_list
  656. if (!removeFlowFromList(&msg_flow_list, *flow_id_itr)){
  657.   // We should not get here
  658.   DiffPrint(DEBUG_ALWAYS, "Cannot remove flow from msg_flow_list !n");
  659. }
  660. // Aggregate both in a single message
  661. out_flow_list.push_back(*flow_id_itr);
  662. flow_id_itr = local_flow_list.erase(flow_id_itr);
  663.       }
  664.     }
  665.     // out_flow_list should have a list of flow for out_neighbor
  666.     out_message = CopyMessage(msg);
  667.     out_message->next_hop_ = out_neighbor;
  668.     packed_flows = writeFlowsToList(&out_flow_list);
  669.     out_message->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
  670.   (void *) packed_flows,
  671.   sizeof(int) * out_flow_list.size()));
  672.     // NRFlowAttr.make will copy this, so we must delete it
  673.     delete [] packed_flows;
  674.     // Send it out
  675.     DiffPrint(DEBUG_NO_DETAILS, "Forwarding data to node %d !n",
  676.       out_neighbor);
  677.     ((DiffusionRouting *)dr_)->sendMessage(out_message, filter_handle_);
  678.     // Delete message
  679.     delete out_message;
  680.   }
  681.   // Done processing for this data type, we replace the NRFlowAttr
  682.   // with the (possibly) shorter msg_flow_list list
  683.   packed_flows = writeFlowsToList(&msg_flow_list);
  684.   nr_data_attr->setVal((void *) packed_flows, sizeof(int) * msg_flow_list.size());
  685.   msg->msg_attr_vec_->push_back(nr_data_attr);
  686.   // setVal makes a copy of this, so we must delete it
  687.   delete [] packed_flows;
  688. }
  689. void OnePhasePullFilter::sendInterest(NRAttrVec *attrs,
  690.       RoutingEntry *routing_entry)
  691. {
  692.   RoundIdList::iterator round_id_itr;
  693.   RoundIdEntry *round_id_entry;
  694.   SinkList::iterator sink_itr;
  695.   SinkEntry *sink_entry;
  696.   Message *msg = new Message(DIFFUSION_VERSION, INTEREST, 0, 0,
  697.      attrs->size(), 0, 0, LOCALHOST_ADDR,
  698.      LOCALHOST_ADDR);
  699.   msg->msg_attr_vec_ = CopyAttrs(attrs);
  700.   // Go through all round ids
  701.   for (round_id_itr = routing_entry->round_ids_.begin();
  702.        round_id_itr != routing_entry->round_ids_.end(); round_id_itr++){
  703.     round_id_entry = *round_id_itr;
  704.     // Send interest message to all local sinks
  705.     for (sink_itr = round_id_entry->sinks_.begin();
  706.  sink_itr != round_id_entry->sinks_.end(); ++sink_itr){
  707.       sink_entry = *sink_itr;
  708.       msg->next_port_ = sink_entry->port_;
  709.       ((DiffusionRouting *)dr_)->sendMessage(msg, filter_handle_);
  710.     }
  711.   }
  712.   delete msg;
  713. }
  714. void OnePhasePullFilter::sendDisinterest(NRAttrVec *attrs,
  715.  RoutingEntry *routing_entry)
  716. {
  717.   NRAttrVec *new_attrs;
  718.   NRSimpleAttribute<int> *nrclass = NULL;
  719.   new_attrs = CopyAttrs(attrs);
  720.   nrclass = NRClassAttr.find(new_attrs);
  721.   if (!nrclass){
  722.     DiffPrint(DEBUG_ALWAYS,
  723.       "Error: sendDisinterest couldn't find the class attribute !n");
  724.     ClearAttrs(new_attrs);
  725.     delete new_attrs;
  726.     return;
  727.   }
  728.   // Change the class_key value
  729.   nrclass->setVal(NRAttribute::DISINTEREST_CLASS);
  730.   sendInterest(new_attrs, routing_entry);
  731.    
  732.   ClearAttrs(new_attrs);
  733.   delete new_attrs;
  734. }
  735. void OnePhasePullFilter::readFlowsFromList(int number_of_flows,
  736.    FlowIdList *flow_list,
  737.    void *source_blob)
  738. {
  739.   int *current_flow;
  740.   // Point to the beginning of the list
  741.   current_flow = (int *) source_blob;
  742.   for (int i = 0; i < number_of_flows; i++){
  743.     flow_list->push_back(*current_flow);
  744.     // Advance to next flow
  745.     current_flow++;
  746.   }
  747. }
  748. int * OnePhasePullFilter::writeFlowsToList(FlowIdList *flow_list)
  749. {
  750.   FlowIdList::iterator flow_itr;
  751.   int number_of_flows;
  752.   int *flows, *current;;
  753.   number_of_flows = flow_list->size();
  754.   flows = new int[number_of_flows];
  755.   current = flows;
  756.   for (flow_itr = flow_list->begin();
  757.        flow_itr != flow_list->end(); flow_itr++){
  758.     *current = *flow_itr;
  759.     current++;
  760.   }
  761.   return flows;
  762. }
  763. bool OnePhasePullFilter::removeFlowFromList(FlowIdList *flow_list,
  764.     int32_t flow)
  765. {
  766.   FlowIdList::iterator flow_itr;
  767.   flow_itr = find(flow_list->begin(), flow_list->end(), flow);
  768.   if (flow_itr != flow_list->end()){
  769.     flow_itr = flow_list->erase(flow_itr);
  770.     return true;
  771.   }
  772.   return false;
  773. }
  774. void OnePhasePullFilter::addLocalFlowsToMessage(Message *msg)
  775. {
  776.   RoutingTable::iterator routing_itr;
  777.   RoundIdList::iterator round_id_itr;
  778.   RoutingEntry *routing_entry;
  779.   RoundIdEntry *round_id_entry;
  780.   FlowIdList local_flows;
  781.   int *packed_flows;
  782.   // First we loop through our routing entries
  783.   for (routing_itr = routing_list_.begin();
  784.        routing_itr != routing_list_.end(); routing_itr++){
  785.     routing_entry = *routing_itr;
  786.     // Now go through each round
  787.     for (round_id_itr = routing_entry->round_ids_.begin();
  788.  round_id_itr != routing_entry->round_ids_.end();
  789.  round_id_itr++){
  790.       round_id_entry = *round_id_itr;
  791.       local_flows.push_back(round_id_entry->round_id_);
  792.     }
  793.   }
  794.   
  795.   packed_flows = writeFlowsToList(&local_flows);
  796.   msg->msg_attr_vec_->push_back(NRFlowAttr.make(NRAttribute::IS,
  797. (void *) packed_flows,
  798. sizeof(int) * local_flows.size()));
  799.   // NRFlowAttr.make will copy this, so we must delete it here
  800.   delete [] packed_flows;
  801.   local_flows.clear();
  802. }
  803. void OnePhasePullFilter::recv(Message *msg, handle h)
  804. {
  805.   if (h != filter_handle_){
  806.     DiffPrint(DEBUG_ALWAYS,
  807.       "Error: received msg for handle %d, subscribed to handle %d !n",
  808.       h, filter_handle_);
  809.     return;
  810.   }
  811.   if (msg->new_message_ == 1)
  812.     processNewMessage(msg);
  813.   else
  814.     processOldMessage(msg);
  815. }
  816. void OnePhasePullFilter::processOldMessage(Message *msg)
  817. {
  818.   NRSimpleAttribute<int> *nrsubscription = NULL;
  819.   NRAttrVec::iterator attribute_iterator;
  820.   RoutingTable::iterator routing_itr;
  821.   RoutingEntry *routing_entry;
  822.   int32_t round_id;
  823.   switch (msg->msg_type_){
  824.   case INTEREST:
  825.     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Old Interest !n",((DiffusionRouting *)dr_)->getNodeId());
  826.     if (msg->last_hop_ == LOCALHOST_ADDR){
  827.       // Old interest should not come from local sink
  828.       DiffPrint(DEBUG_ALWAYS, "Warning: Old Interest from local sink !n");
  829.       break;
  830.     }
  831.     // Step 0: Take out the subscription attribute
  832.     attribute_iterator = msg->msg_attr_vec_->begin();
  833.     nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
  834.   attribute_iterator,
  835.   &attribute_iterator);
  836.     // Return if we cannot find a subscription attribute
  837.     if (!nrsubscription){
  838.       DiffPrint(DEBUG_ALWAYS,
  839. "Warning: Can't find SUBSCRIPTION attribute in the message !n");
  840.       return;
  841.     }
  842.     // Delete attribute from the message
  843.     msg->msg_attr_vec_->erase(attribute_iterator);
  844.     // Get the routing entry for these attrs      
  845.     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
  846.     if (routing_entry){
  847.       // Use subscription id for identifying this flow
  848.       round_id = nrsubscription->getVal();
  849.       // Add gradient to the current round entry
  850.       routing_entry->addGradient(msg->last_hop_, round_id, false);
  851.     }
  852.     // Add the subscription attribute back to the message
  853.     msg->msg_attr_vec_->push_back(nrsubscription);
  854.     break;
  855.   case EXPLORATORY_DATA:
  856.   case PUSH_EXPLORATORY_DATA:
  857.     DiffPrint(DEBUG_ALWAYS, "Received and OLD EXPLORATORY message !n");
  858.     break;
  859.   case DATA: 
  860.     DiffPrint(DEBUG_NO_DETAILS, "Received an old Data message !n");
  861.     // Find the correct routing entry
  862.     routing_itr = routing_list_.begin();
  863.     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  864.       &routing_itr);
  865.     while (routing_entry){
  866.       DiffPrint(DEBUG_NO_DETAILS,
  867. "Set flags to %d to OLD_MESSAGE !n", msg->last_hop_);
  868.       // Set reinforcement flags
  869.       if (msg->last_hop_ != LOCALHOST_ADDR)
  870. routing_entry->updateNeighborDataInfo(msg->last_hop_, false);
  871.       // Continue going through other data types
  872.       routing_itr++;
  873.       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  874. &routing_itr);
  875.     }
  876.     break;
  877.    case NEGATIVE_REINFORCEMENT:
  878.     DiffPrint(DEBUG_IMPORTANT, "Received an old Negative Reinforcement !n");
  879.     break;
  880.   default:
  881.     DiffPrint(DEBUG_ALWAYS,
  882.       "Received an unknown message type: %dn", msg->msg_type_);
  883.     break;
  884.   }
  885. }
  886. void OnePhasePullFilter::processNewMessage(Message *msg)
  887. {
  888.   DataForwardingHistory *forwarding_history;
  889.   NRSimpleAttribute<int> *nrclass = NULL;
  890.   NRSimpleAttribute<int> *nrscope = NULL;
  891.   NRSimpleAttribute<int> *nrsubscription = NULL;
  892.   RoundIdList::iterator round_id_itr;
  893.   RoutingTable::iterator routing_itr;
  894.   NRAttrVec::iterator attribute_iterator;
  895.   RoundIdEntry *round_id_entry;
  896.   RoutingEntry *routing_entry;
  897.   SubscriptionEntry *subscription_entry;
  898.   Message *my_msg;
  899.   TimerCallback *interest_timer, *subscription_timer;
  900.   bool new_data_type = false;
  901.   int32_t round_id;
  902.   switch (msg->msg_type_){
  903.   case INTEREST:
  904.     DiffPrint(DEBUG_NO_DETAILS, "Received Interest !n");
  905.     nrclass = NRClassAttr.find(msg->msg_attr_vec_);
  906.     nrscope = NRScopeAttr.find(msg->msg_attr_vec_);
  907.     if (!nrclass || !nrscope){
  908.       DiffPrint(DEBUG_ALWAYS,
  909. "Warning: Can't find CLASS/SCOPE attributes in the message !n");
  910.       return;
  911.     }
  912.     // Step 0: Take out the subscription attribute
  913.     attribute_iterator = msg->msg_attr_vec_->begin();
  914.     nrsubscription = NRSubscriptionAttr.find_from(msg->msg_attr_vec_,
  915.   attribute_iterator,
  916.   &attribute_iterator);
  917.     // Return if we cannot find a subscription attribute
  918.     if (!nrsubscription){
  919.       DiffPrint(DEBUG_ALWAYS,
  920. "Warning: Can't find SUBSCRIPTION attribute in the message !n");
  921.       return;
  922.     }
  923.     // Delete attribute from the message
  924.     msg->msg_attr_vec_->erase(attribute_iterator);
  925.     // Step 1: Look for the same data type
  926.     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
  927.     if (!routing_entry){
  928.       // Create a new routing entry for this data type
  929.       routing_entry = new RoutingEntry;
  930.       routing_entry->attrs_ = CopyAttrs(msg->msg_attr_vec_);
  931.       routing_list_.push_back(routing_entry);
  932.       new_data_type = true;
  933.     }
  934.     // Add the subscription attribute back to the message
  935.     msg->msg_attr_vec_->push_back(nrsubscription);
  936.     // Use subscription id for identifying this flow
  937.     round_id = nrsubscription->getVal();
  938.     if (msg->last_hop_ == LOCALHOST_ADDR){
  939.       // From local sink
  940.       routing_entry->updateSink(msg->source_port_, round_id);
  941.     }
  942.     else{
  943.       // Interest received from the network. Add gradient to our
  944.       // last_hop neighbor
  945.       // Add gradient to the current round entry
  946.       routing_entry->addGradient(msg->last_hop_, round_id, true);
  947.     }
  948.     if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
  949. (nrclass->getOp() == NRAttribute::IS)){
  950.       // Global interest messages should always be forwarded
  951.       if (nrscope->getVal() == NRAttribute::GLOBAL_SCOPE){
  952. interest_timer = new OppInterestForwardTimer(this, CopyMessage(msg));
  953. ((DiffusionRouting *)dr_)->addTimer(INTEREST_FORWARD_DELAY +
  954.     (int) ((INTEREST_FORWARD_JITTER * (GetRand() * 1.0 / RAND_MAX) - (INTEREST_FORWARD_JITTER / 2))),
  955.     interest_timer);
  956.       }
  957.     }
  958.     else{
  959.       if ((nrclass->getOp() != NRAttribute::IS) &&
  960.   (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE) &&
  961.   (new_data_type)){
  962. subscription_timer = new OppSubscriptionExpirationTimer(this,
  963.      CopyAttrs(msg->msg_attr_vec_));
  964. ((DiffusionRouting *)dr_)->addTimer(SUBSCRIPTION_DELAY +
  965.     (int) (SUBSCRIPTION_DELAY * (GetRand() * 1.0 / RAND_MAX)),
  966.     subscription_timer);
  967.       }
  968.       // Subscriptions don't have to match other subscriptions
  969.       break;
  970.     }
  971.     // Step 2: Match interest against other subscriptions
  972.     routing_itr = routing_list_.begin();
  973.     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  974.       &routing_itr);
  975.     while (routing_entry){
  976.       // Got a match
  977.       subscription_entry = findMatchingSubscription(routing_entry,
  978.     msg->msg_attr_vec_);
  979.       // Do we already have this subscription
  980.       if (subscription_entry){
  981. GetTime(&(subscription_entry->tv_));
  982.       }
  983.       else{
  984. // Create a new attribute entry, add it to the attribute list
  985. // and send an interest message to the local sink
  986. subscription_entry = new SubscriptionEntry(CopyAttrs(msg->msg_attr_vec_));
  987. routing_entry->subscription_list_.push_back(subscription_entry);
  988. sendInterest(subscription_entry->attrs_, routing_entry);
  989.       }
  990.       // Move to the next RoutingEntry
  991.       routing_itr++;
  992.       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  993. &routing_itr);
  994.     }
  995.       break;
  996.   case EXPLORATORY_DATA:
  997.   case PUSH_EXPLORATORY_DATA:
  998.     DiffPrint(DEBUG_ALWAYS, "Node%d: Received EXPLORATORY Message !n",((DiffusionRouting *)dr_)->getNodeId());
  999.     break;
  1000.   case DATA:
  1001.     DiffPrint(DEBUG_NO_DETAILS, "Node%d: Received Data !n",((DiffusionRouting *)dr_)->getNodeId());
  1002.     // Create data message forwarding cache
  1003.     forwarding_history = new DataForwardingHistory;
  1004.     // If message comes from local source, we include our local flows
  1005.     if (msg->last_hop_ == LOCALHOST_ADDR){
  1006.       // From local source
  1007.       addLocalFlowsToMessage(msg);
  1008.     }
  1009.     // Find the correct routing entry
  1010.     routing_itr = routing_list_.begin();
  1011.     routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  1012.       &routing_itr);
  1013.     while (routing_entry){
  1014.       forwardData(msg, routing_entry, forwarding_history);
  1015.       routing_itr++;
  1016.       routing_entry = matchRoutingEntry(msg->msg_attr_vec_, routing_itr,
  1017. &routing_itr);
  1018.     }
  1019.     delete forwarding_history;
  1020.     break;
  1021.   case NEGATIVE_REINFORCEMENT:
  1022.     DiffPrint(DEBUG_NO_DETAILS, "Received a Negative Reinforcement !n");
  1023.     // Find matching routing entry
  1024.     routing_entry = findRoutingEntry(msg->msg_attr_vec_);
  1025.     if (routing_entry){
  1026.       // Go through all round ids
  1027.       for (round_id_itr = routing_entry->round_ids_.begin();
  1028.    round_id_itr != routing_entry->round_ids_.end();
  1029.    round_id_itr++){
  1030. round_id_entry = *round_id_itr;
  1031. // Delete gradient to last hop
  1032. round_id_entry->deleteGradient(msg->last_hop_);
  1033. // Delete round id entry if nothing left
  1034. if (round_id_entry->gradients_.size() == 0){
  1035.   round_id_itr = routing_entry->round_ids_.erase(round_id_itr);
  1036.   delete round_id_entry;
  1037. }
  1038.       }
  1039.       // If there are no other gradients we need to send our own
  1040.       // negative reinforcement
  1041.       if (routing_entry->round_ids_.size() == 0){
  1042. my_msg = new Message(DIFFUSION_VERSION, NEGATIVE_REINFORCEMENT,
  1043.      0, 0, routing_entry->attrs_->size(), pkt_count_,
  1044.      random_id_, BROADCAST_ADDR, LOCALHOST_ADDR);
  1045. my_msg->msg_attr_vec_ = CopyAttrs(routing_entry->attrs_);
  1046. DiffPrint(DEBUG_NO_DETAILS,
  1047.   "Broadcasting Negative Reinforcement !n");
  1048. ((DiffusionRouting *)dr_)->sendMessage(my_msg, filter_handle_);
  1049. pkt_count_++;
  1050. delete my_msg;
  1051.       }
  1052.     }
  1053.     break;
  1054.   default:
  1055.     break;
  1056.   }
  1057. }
  1058. handle OnePhasePullFilter::setupFilter()
  1059. {
  1060.   NRAttrVec attrs;
  1061.   handle h;
  1062.   // For the One-Phase Pull filter, we set up a filter to receive
  1063.   // messages using this protocol
  1064.   attrs.push_back(NRAlgorithmAttr.make(NRAttribute::EQ,
  1065.        NRAttribute::ONE_PHASE_PULL_ALGORITHM));
  1066.   h = ((DiffusionRouting *)dr_)->addFilter(&attrs,
  1067.    ONE_PHASE_PULL_FILTER_PRIORITY,
  1068.    filter_callback_);
  1069.   ClearAttrs(&attrs);
  1070.   return h;
  1071. }
  1072. #ifndef NS_DIFFUSION
  1073. void OnePhasePullFilter::run()
  1074. {
  1075.   // Doesn't do anything
  1076.   while (1){
  1077.     sleep(1000);
  1078.   }
  1079. }
  1080. #endif // !NS_DIFFUSION
  1081. #ifdef NS_DIFFUSION
  1082. OnePhasePullFilter::OnePhasePullFilter(const char *diffrtg)
  1083. {
  1084.   DiffAppAgent *agent;
  1085. #else
  1086. OnePhasePullFilter::OnePhasePullFilter(int argc, char **argv)
  1087. {
  1088. #endif // NS_DIFFUSION
  1089.   struct timeval tv;
  1090.   TimerCallback *reinforcement_timer, *gradient_timer;
  1091.   GetTime(&tv);
  1092.   SetSeed(&tv);
  1093.   pkt_count_ = GetRand();
  1094.   random_id_ = GetRand();
  1095.   // Create Diffusion Routing class
  1096. #ifdef NS_DIFFUSION
  1097.   agent = (DiffAppAgent *)TclObject::lookup(diffrtg);
  1098.   dr_ = agent->dr();
  1099. #else
  1100.   parseCommandLine(argc, argv);
  1101.   dr_ = NR::createNR(diffusion_port_);
  1102. #endif // NS_DIFFUSION
  1103.   // Create callback classes and set up pointers
  1104.   filter_callback_ = new OnePhasePullFilterReceive(this);
  1105.   // Set up the filter
  1106.   filter_handle_ = setupFilter();
  1107.   // Print filter information
  1108.   DiffPrint(DEBUG_IMPORTANT, "One-Phase Pull filter received handle %dn",
  1109.     filter_handle_);
  1110.   // Add timers for keeping state up-to-date
  1111.   gradient_timer = new OppGradientExpirationCheckTimer(this);
  1112.   ((DiffusionRouting *)dr_)->addTimer(GRADIENT_DELAY, gradient_timer);
  1113.   reinforcement_timer = new OppReinforcementCheckTimer(this);
  1114.   ((DiffusionRouting *)dr_)->addTimer(REINFORCEMENT_DELAY, reinforcement_timer);
  1115.   GetTime(&tv);
  1116.   DiffPrint(DEBUG_ALWAYS,
  1117.     "One-Phase Pull filter initialized at time %ld:%ld!n",
  1118.     tv.tv_sec, tv.tv_usec);
  1119. }
  1120. #ifndef USE_SINGLE_ADDRESS_SPACE
  1121. int main(int argc, char **argv)
  1122. {
  1123.   OnePhasePullFilter *app;
  1124.   // Initialize and run the Gradient Filter
  1125.   app = new OnePhasePullFilter(argc, argv);
  1126.   app->run();
  1127.   return 0;
  1128. }
  1129. #endif // !USE_SINGLE_ADDRESS_SPACE