one_phase_pull.hh
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:10k
源码类别:

通讯编程

开发平台:

Visual C++

  1. //
  2. // one_phase_pull.hh    : One-Phase Pull Include File
  3. // author               : Fabio Silva
  4. //
  5. // Copyright (C) 2000-2003 by the University of Southern California
  6. // $Id: one_phase_pull.hh,v 1.4 2005/09/13 04:53:47 tomh Exp $
  7. //
  8. // This program is free software; you can redistribute it and/or
  9. // modify it under the terms of the GNU General Public License,
  10. // version 2, as published by the Free Software Foundation.
  11. //
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. //
  17. // You should have received a copy of the GNU General Public License along
  18. // with this program; if not, write to the Free Software Foundation, Inc.,
  19. // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  20. //
  21. // Linking this file statically or dynamically with other modules is making
  22. // a combined work based on this file.  Thus, the terms and conditions of
  23. // the GNU General Public License cover the whole combination.
  24. //
  25. // In addition, as a special exception, the copyright holders of this file
  26. // give you permission to combine this file with free software programs or
  27. // libraries that are released under the GNU LGPL and with code included in
  28. // the standard release of ns-2 under the Apache 2.0 license or under
  29. // otherwise-compatible licenses with advertising requirements (or modified
  30. // versions of such code, with unchanged license).  You may copy and
  31. // distribute such a system following the terms of the GNU GPL for this
  32. // file and the licenses of the other code concerned, provided that you
  33. // include the source code of that other code when and as the GNU GPL
  34. // requires distribution of source code.
  35. //
  36. // Note that people who make modified versions of this file are not
  37. // obligated to grant this special exception for their modified versions;
  38. // it is their choice whether to do so.  The GNU General Public License
  39. // gives permission to release a modified version without this exception;
  40. // this exception also makes it possible to release a modified version
  41. // which carries forward this exception.
  42. #ifndef _ONE_PHASE_PULL_HH_
  43. #define _ONE_PHASE_PULL_HH_
  44. #ifdef HAVE_CONFIG_H
  45. #include "config.h"
  46. #endif // HAVE_CONFIG_H
  47. #include <algorithm>
  48. #include "diffapp.hh"
  49. #ifdef NS_DIFFUSION
  50. #include <tcl.h>
  51. #include "diffagent.h"
  52. #else
  53. #include "main/hashutils.hh"
  54. #endif // NS_DIFFUSION
  55. #define ONE_PHASE_PULL_FILTER_PRIORITY 80
  56. class OPPGradientEntry {
  57. public:
  58.   OPPGradientEntry(int32_t node_id) : node_id_(node_id)
  59.   {
  60.     GetTime(&tv_);
  61.   };
  62.   int32_t node_id_;
  63.   struct timeval tv_;
  64. };
  65. typedef list<OPPGradientEntry *> GradientList;
  66. class SinkEntry {
  67. public:
  68.   SinkEntry(u_int16_t port) : port_(port)
  69.   {
  70.     GetTime(&tv_);
  71.   };
  72.   u_int16_t port_;
  73.   struct timeval tv_;
  74. };
  75. typedef list<SinkEntry *> SinkList;
  76. class OPPDataNeighborEntry {
  77. public:
  78.   OPPDataNeighborEntry(int32_t node_id) : node_id_(node_id)
  79.   {
  80.     messages_ = 1;
  81.   };
  82.   int32_t node_id_;
  83.   int messages_;
  84.   bool new_messages_;
  85. };
  86. typedef list<OPPDataNeighborEntry *> DataNeighborList;
  87. class SubscriptionEntry {
  88. public:
  89.   SubscriptionEntry(NRAttrVec *attrs) : attrs_(attrs)
  90.   {
  91.     GetTime(&tv_);
  92.   };
  93.   ~SubscriptionEntry()
  94.   {
  95.     ClearAttrs(attrs_);
  96.     delete attrs_;
  97.   };
  98.   struct timeval tv_;
  99.   NRAttrVec *attrs_;
  100. };
  101. typedef list<SubscriptionEntry *> SubscriptionList;
  102. typedef list<int> FlowIdList;
  103. class RoundIdEntry {
  104. public:
  105.   RoundIdEntry(int32_t round_id) : round_id_(round_id)
  106.   {
  107.     GetTime(&tv_);
  108.   };
  109.   ~RoundIdEntry()
  110.   {
  111.     GradientList::iterator gradient_itr;
  112.     SinkList::iterator sink_itr;
  113.     // Clear the gradient list
  114.     for (gradient_itr = gradients_.begin();
  115.  gradient_itr != gradients_.end(); gradient_itr++){
  116.       delete (*gradient_itr);
  117.     }
  118.     gradients_.clear();
  119.     // Clear the local sink list
  120.     for (sink_itr = sinks_.begin(); sink_itr != sinks_.end(); sink_itr++){
  121.       delete (*sink_itr);
  122.     }
  123.     sinks_.clear();
  124.   };
  125.   OPPGradientEntry * findGradient(int32_t node_id);
  126.   void deleteGradient(int32_t node_id);
  127.   void addGradient(int32_t node_id);
  128.   void updateSink(u_int16_t sink_id);
  129.   void deleteExpiredSinks();
  130.   void deleteExpiredGradients();
  131.   
  132.   int32_t round_id_;
  133.   struct timeval tv_;
  134.   GradientList gradients_;
  135.   SinkList sinks_;
  136. };
  137. typedef list<RoundIdEntry *> RoundIdList;
  138. class RoutingEntry {
  139. public:
  140.   RoutingEntry() {
  141.     GetTime(&tv_);
  142.   };
  143.   ~RoutingEntry() {
  144.     DataNeighborList::iterator data_neighbor_itr;
  145.     RoundIdList::iterator round_id_itr;
  146.     SubscriptionList::iterator subscription_itr;
  147.     // Clear Attributes
  148.     ClearAttrs(attrs_);
  149.     delete attrs_;
  150.     // Clear the attribute list
  151.     for (subscription_itr = subscription_list_.begin();
  152.  subscription_itr != subscription_list_.end();
  153.  subscription_itr++){
  154.       delete (*subscription_itr);
  155.     }
  156.     subscription_list_.clear();
  157.     // Clear the round_ids list
  158.     for (round_id_itr = round_ids_.begin(); round_id_itr != round_ids_.end(); round_id_itr++){
  159.       delete (*round_id_itr);
  160.     }
  161.     round_ids_.clear();
  162.     // Clear the data neighbor's list
  163.     for (data_neighbor_itr = data_neighbors_.begin(); data_neighbor_itr != data_neighbors_.end(); data_neighbor_itr++){
  164.       delete (*data_neighbor_itr);
  165.     }
  166.     data_neighbors_.clear();
  167.   };
  168.   RoundIdEntry * findRoundIdEntry(int32_t round_id);
  169.   RoundIdEntry * addRoundIdEntry(int32_t round_id);
  170.   void updateNeighborDataInfo(int32_t node_id, bool new_message);
  171.   void addGradient(int32_t last_hop, int32_t round_id, bool new_gradient);
  172.   void updateSink(u_int16_t sink_id, int32_t round_id);
  173.   void deleteExpiredRoundIds();
  174.   void getSinksFromList(FlowIdList *msg_list, FlowIdList *sink_list);
  175.   void getFlowsFromList(FlowIdList *msg_list, FlowIdList *flow_list);
  176.   int32_t getNeighborFromFlow(int32_t flow_id);
  177.   struct timeval tv_;
  178.   NRAttrVec *attrs_;
  179.   RoundIdList round_ids_;
  180.   SubscriptionList subscription_list_;
  181.   DataNeighborList data_neighbors_;
  182. };
  183. typedef list<RoutingEntry *> RoutingTable;
  184. class OnePhasePullFilter;
  185. class OnePhasePullFilterReceive : public FilterCallback {
  186. public:
  187.   OnePhasePullFilterReceive(OnePhasePullFilter *filter) : filter_(filter) {};
  188.   void recv(Message *msg, handle h);
  189.   OnePhasePullFilter *filter_;
  190. };
  191. class DataForwardingHistory {
  192. public:
  193.   DataForwardingHistory()
  194.   {
  195.     data_reinforced_ = false;
  196.   };
  197.   ~DataForwardingHistory()
  198.   {
  199.     node_list_.clear();
  200.     sink_list_.clear();
  201.   };
  202.   bool alreadyForwardedToNetwork(int32_t node_id)
  203.   {
  204.     list<int32_t>::iterator list_itr;
  205.     list_itr = find(node_list_.begin(), node_list_.end(), node_id);
  206.     if (list_itr == node_list_.end())
  207.       return false;
  208.     return true;
  209.   };
  210.   bool alreadyForwardedToLibrary(u_int16_t sink_id)
  211.   {
  212.     list<u_int16_t>::iterator list_itr;
  213.     list_itr = find(sink_list_.begin(), sink_list_.end(), sink_id);
  214.     if (list_itr == sink_list_.end())
  215.       return false;
  216.     return true;
  217.   };
  218.   bool alreadyReinforced()
  219.   {
  220.     return data_reinforced_;
  221.   };
  222.   void sendingReinforcement()
  223.   {
  224.     data_reinforced_ = true;
  225.   };
  226.   void forwardingToNetwork(int32_t node_id)
  227.   {
  228.     node_list_.push_back(node_id);
  229.   };
  230.   void forwardingToLibrary(u_int16_t sink_id)
  231.   {
  232.     sink_list_.push_back(sink_id);
  233.   };
  234. private:
  235.   list<int32_t> node_list_;
  236.   list<u_int16_t> sink_list_;
  237.   bool data_reinforced_;
  238. };
  239. class OnePhasePullFilter : public DiffApp {
  240. public:
  241. #ifdef NS_DIFFUSION
  242.   OnePhasePullFilter(const char *dr);
  243.   int command(int argc, const char*const* argv);
  244.   void run() {}
  245. #else
  246.   OnePhasePullFilter(int argc, char **argv);
  247.   void run();
  248. #endif // NS_DIFFUSION
  249.   virtual ~OnePhasePullFilter()
  250.   {
  251.     // Nothing to do
  252.   };
  253.   void recv(Message *msg, handle h);
  254.   // Timers
  255.   void messageTimeout(Message *msg);
  256.   void interestTimeout(Message *msg);
  257.   void gradientTimeout();
  258.   void reinforcementTimeout();
  259.   int subscriptionTimeout(NRAttrVec *attrs);
  260. protected:
  261.   // General Variables
  262.   handle filter_handle_;
  263.   int pkt_count_;
  264.   int random_id_;
  265.   // Receive Callback for the filter
  266.   OnePhasePullFilterReceive *filter_callback_;
  267.   // List of all known datatypes
  268.   RoutingTable routing_list_;
  269.   // Setup the filter
  270.   handle setupFilter();
  271.   // Matching functions
  272.   RoutingEntry * findRoutingEntry(NRAttrVec *attrs);
  273.   void deleteRoutingEntry(RoutingEntry *routing_entry);
  274.   RoutingEntry * matchRoutingEntry(NRAttrVec *attrs, RoutingTable::iterator start, RoutingTable::iterator *place);
  275.   SubscriptionEntry * findMatchingSubscription(RoutingEntry *routing_entry, NRAttrVec *attrs);
  276.   // Message forwarding functions
  277.   void sendInterest(NRAttrVec *attrs, RoutingEntry *routing_entry);
  278.   void sendDisinterest(NRAttrVec *attrs, RoutingEntry *routing_entry);
  279.   void forwardData(Message *msg, RoutingEntry *routing_entry,
  280.    DataForwardingHistory *forwarding_history);
  281.   // Message Processing functions
  282.   void processOldMessage(Message *msg);
  283.   void processNewMessage(Message *msg);
  284.   // Flow Ids Processing functions
  285.   void addLocalFlowsToMessage(Message *msg);
  286.   void readFlowsFromList(int number_of_flows, FlowIdList *flow_list,
  287.  void *source_blob);
  288.   int * writeFlowsToList(FlowIdList *flow_list);
  289.   bool removeFlowFromList(FlowIdList *flow_list, int32_t flow);
  290. };
  291. class OppGradientExpirationCheckTimer : public TimerCallback {
  292. public:
  293.   OppGradientExpirationCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {};
  294.   ~OppGradientExpirationCheckTimer() {};
  295.   int expire();
  296.   OnePhasePullFilter *agent_;
  297. };
  298. class OppReinforcementCheckTimer : public TimerCallback {
  299. public:
  300.   OppReinforcementCheckTimer(OnePhasePullFilter *agent) : agent_(agent) {};
  301.   ~OppReinforcementCheckTimer() {};
  302.   int expire();
  303.   OnePhasePullFilter *agent_;
  304. };
  305. class OppMessageSendTimer : public TimerCallback {
  306. public:
  307.   OppMessageSendTimer(OnePhasePullFilter *agent, Message *msg) :
  308.     agent_(agent), msg_(msg) {};
  309.   ~OppMessageSendTimer()
  310.   {
  311.     delete msg_;
  312.   };
  313.   int expire();
  314.   OnePhasePullFilter *agent_;
  315.   Message *msg_;
  316. };
  317. class OppInterestForwardTimer : public TimerCallback {
  318. public:
  319.   OppInterestForwardTimer(OnePhasePullFilter *agent, Message *msg) :
  320.     agent_(agent), msg_(msg) {};
  321.   ~OppInterestForwardTimer()
  322.   {
  323.     delete msg_;
  324.   };
  325.   int expire();
  326.   OnePhasePullFilter *agent_;
  327.   Message *msg_;
  328. };
  329. class OppSubscriptionExpirationTimer : public TimerCallback {
  330. public:
  331.   OppSubscriptionExpirationTimer(OnePhasePullFilter *agent, NRAttrVec *attrs) :
  332.     agent_(agent), attrs_(attrs) {};
  333.   ~OppSubscriptionExpirationTimer()
  334.   {
  335.     ClearAttrs(attrs_);
  336.     delete attrs_;
  337.   };
  338.   int expire();
  339.   OnePhasePullFilter *agent_;
  340.   NRAttrVec *attrs_;
  341. };
  342. #endif // !_ONE_PHASE_PULL_HH_