dr.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:40k
源码类别:

通讯编程

开发平台:

Visual C++

  1. //
  2. // dr.cc           : Diffusion Routing Class
  3. // authors         : John Heidemann and Fabio Silva
  4. //
  5. // Copyright (C) 2000-2003 by the University of Southern California
  6. // $Id: dr.cc,v 1.17 2005/09/13 04:53:49 tomh Exp $
  7. //
  8. // This program is free software; you can redistribute it and/or
  9. // modify it under the terms of the GNU General Public License,
  10. // version 2, as published by the Free Software Foundation.
  11. //
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. //
  17. // You should have received a copy of the GNU General Public License along
  18. // with this program; if not, write to the Free Software Foundation, Inc.,
  19. // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  20. //
  21. // Linking this file statically or dynamically with other modules is making
  22. // a combined work based on this file.  Thus, the terms and conditions of
  23. // the GNU General Public License cover the whole combination.
  24. //
  25. // In addition, as a special exception, the copyright holders of this file
  26. // give you permission to combine this file with free software programs or
  27. // libraries that are released under the GNU LGPL and with code included in
  28. // the standard release of ns-2 under the Apache 2.0 license or under
  29. // otherwise-compatible licenses with advertising requirements (or modified
  30. // versions of such code, with unchanged license).  You may copy and
  31. // distribute such a system following the terms of the GNU GPL for this
  32. // file and the licenses of the other code concerned, provided that you
  33. // include the source code of that other code when and as the GNU GPL
  34. // requires distribution of source code.
  35. //
  36. // Note that people who make modified versions of this file are not
  37. // obligated to grant this special exception for their modified versions;
  38. // it is their choice whether to do so.  The GNU General Public License
  39. // gives permission to release a modified version without this exception;
  40. // this exception also makes it possible to release a modified version
  41. // which carries forward this exception.
  42. //
  43. #include <stdlib.h>
  44. #include <stdio.h>
  45. #include "dr.hh"
  46. class CallbackEntry {
  47. public:
  48.   NR::Callback *cb_;
  49.   NR::handle subscription_handle_;
  50.   CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
  51.     cb_(cb), subscription_handle_(subscription_handle) {};
  52. };
  53. class HandleEntry {
  54. public:
  55.   handle hdl_;
  56.   bool valid_;
  57.   NRAttrVec *attrs_;
  58.   NR::Callback *cb_;
  59.   struct timeval exploratory_time_;
  60.   int32_t subscription_id_; // Used for One-Phase Pull
  61.   HandleEntry()
  62.   {
  63.     GetTime(&exploratory_time_);
  64.     valid_ = true;
  65.     cb_ = NULL;
  66.   };
  67.   ~HandleEntry(){
  68.     ClearAttrs(attrs_);
  69.     delete attrs_;
  70.   };
  71. };
  72. int InterestCallback::expire()
  73. {
  74.   int retval;
  75.   // Call the interestTimeout function
  76.   retval = drt_->interestTimeout(handle_entry_);
  77.   if (retval < 0)
  78.     delete this;
  79.   return retval;
  80. }
  81. int FilterKeepaliveCallback::expire()
  82. {
  83.   int retval;
  84.   // Call the filterTimeout function
  85.   retval = drt_->filterKeepaliveTimeout(filter_entry_);
  86.   if (retval < 0)
  87.     delete this;
  88.   return retval;
  89. }
  90. int OldAPITimer::expire()
  91. {
  92.   int retval;
  93.   // Call the callback function with the provided API
  94.   retval = cb_->expire(0, p_);
  95.   if (retval < 0)
  96.     delete this;
  97.   return retval;
  98. }
  99. #ifdef NS_DIFFUSION
  100. class DiffEventQueue;
  101. int DiffusionRouting::getNodeId() {
  102.   return node_->address();
  103. }
  104. int DiffusionRouting::getAgentId(int id) {
  105.   if (id != -1)
  106.     agent_id_ = id;
  107.   return agent_id_;
  108. }
  109. NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
  110.   return(new DiffusionRouting(port, da));
  111. }
  112. #else
  113. NR *dr = NULL;
  114. #ifdef USE_THREADS
  115. void * ReceiveThread(void *dr)
  116. {
  117.   // Never returns
  118.   ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
  119.   return NULL;
  120. }
  121. #endif // USE_THREADS
  122. NR * NR::createNR(u_int16_t port)
  123. {
  124.   // Create Diffusion Routing Class
  125.   if (dr)
  126.     return dr;
  127.   dr = new DiffusionRouting(port);
  128. #ifdef USE_THREADS
  129.   int retval;
  130.   pthread_t thread;
  131.   // Fork a thread for receiving Messages
  132.   retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
  133.   if (retval){
  134.     DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...n");
  135.     exit(-1);
  136.   }
  137. #endif // USE_THREADS
  138.   return dr;
  139. }
  140. #endif // NS_DIFFUSION
  141. void GetLock(pthread_mutex_t *mutex)
  142. {
  143. #ifdef USE_THREADS
  144.   pthread_mutex_lock(mutex);
  145. #endif // USE_THREADS
  146. }
  147. void ReleaseLock(pthread_mutex_t *mutex)
  148. {
  149. #ifdef USE_THREADS
  150.   pthread_mutex_unlock(mutex);
  151. #endif // USE_THREADS
  152. }
  153. #ifdef NS_DIFFUSION
  154. DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
  155. {
  156. #else
  157. DiffusionRouting::DiffusionRouting(u_int16_t port)
  158. {
  159. #ifdef USE_EMSIM
  160.   char *sim_id;
  161.   char *sim_group;
  162. #endif // USE_EMSIM
  163. #endif // NS_DIFFUSION
  164.   struct timeval tv;
  165.   DiffusionIO *device;
  166.   // Initialize basic stuff
  167.   next_handle_ = 1;
  168.   GetTime(&tv);
  169.   SetSeed(&tv);
  170.   pkt_count_ = GetRand();
  171.   random_id_ = GetRand();
  172.   agent_id_ = 0;
  173.   if (port == 0)
  174.     port = DEFAULT_DIFFUSION_PORT;
  175.   diffusion_port_ = port;
  176. #ifdef USE_EMSIM
  177.   // Check if we are running in the emstar simulator
  178.   sim_id = getenv("SIM_ID");
  179.   sim_group = getenv("SIM_GROUP");
  180.   // Update diffusion port if running inside the simulator
  181.   if (sim_id && sim_group){
  182.     diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));
  183.   }
  184. #endif // USE_EMSIM
  185.   // Initialize timer manager
  186.   timers_manager_ = new TimerManager;
  187.   // Initialize input device
  188. #ifdef NS_DIFFUSION
  189.   device = new NsLocal(da);
  190.   local_out_devices_.push_back(device);
  191. #endif // NS_DIFFUSION
  192. #ifdef UDP
  193.   device = new UDPLocal(&agent_id_);
  194.   in_devices_.push_back(device);
  195.   local_out_devices_.push_back(device);
  196. #endif // UDP
  197.   // Print initialization message
  198.   DiffPrint(DEBUG_ALWAYS,
  199.     "Diffusion Routing Agent initializing... Agent Id = %dn",
  200.     agent_id_);
  201. #ifdef USE_THREADS
  202.   // Initialize Semaphores
  203.   dr_mtx_ = new pthread_mutex_t;
  204.   pthread_mutex_init(dr_mtx_, NULL);
  205. #endif // USE_THREADS
  206. }
  207. DiffusionRouting::~DiffusionRouting()
  208. {
  209.   HandleList::iterator itr;
  210.   HandleEntry *current;
  211.   // Delete all Handles
  212.   for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
  213.     current = *itr;
  214.     delete current;
  215.   }
  216.   for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
  217.     current = *itr;
  218.     delete current;
  219.   }
  220. }
  221. handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
  222. {
  223.   NRSimpleAttribute<int> *nr_algorithm = NULL;
  224.   TimerCallback *timer_callback;
  225.   NRAttribute *scope_attr;
  226.   HandleEntry *my_handle;
  227.   // Get lock first
  228.   GetLock(dr_mtx_);
  229.   // Check the published attributes
  230.   if (!checkSubscription(subscribe_attrs)){
  231.     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !n");
  232.     ReleaseLock(dr_mtx_);
  233.     return FAIL;
  234.   }
  235.   // Create and Initialize the handle_entry structute
  236.   my_handle = new HandleEntry;
  237.   my_handle->hdl_ = next_handle_;
  238.   next_handle_++;
  239.   my_handle->cb_ = (NR::Callback *) cb;
  240.   sub_list_.push_back(my_handle);
  241.   // Copy the attributes
  242.   my_handle->attrs_ = CopyAttrs(subscribe_attrs);
  243.   // For subscriptions, scope is global if not specified
  244.   if (!hasScope(subscribe_attrs)){
  245.     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
  246.     my_handle->attrs_->push_back(scope_attr);
  247.   }
  248.   // For One-Phase Pull, we need a subscription id
  249.   nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);
  250.   if (nr_algorithm &&
  251.       nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){
  252.     my_handle->subscription_id_ = GetRand();
  253.     my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,
  254.  my_handle->subscription_id_));
  255.   }
  256.   // Create Interest Timer and add it to the queue
  257.   timer_callback = new InterestCallback(this, my_handle);
  258.   timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
  259.   // Release lock
  260.   ReleaseLock(dr_mtx_);
  261.   return my_handle->hdl_;
  262. }
  263. int DiffusionRouting::unsubscribe(handle subscription_handle)
  264. {
  265.   HandleEntry *my_handle = NULL;
  266.   // Get the lock first
  267.   GetLock(dr_mtx_);
  268.   my_handle = findHandle(subscription_handle, &sub_list_);
  269.   if (!my_handle){
  270.     // Handle doesn't exist, return FAIL
  271.     ReleaseLock(dr_mtx_);
  272.     return FAIL;
  273.   }
  274.   // Handle will be destroyed when next interest timeout happens
  275.   my_handle->valid_ = false;
  276.   // Release the lock
  277.   ReleaseLock(dr_mtx_);
  278.   return OK;
  279. }
  280. handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
  281. {
  282.   HandleEntry *my_handle;
  283.   NRAttribute *scope_attr;
  284.   // Get the lock first
  285.   GetLock(dr_mtx_);
  286.   // Check the published attributes
  287.   if (!checkPublication(publish_attrs)){
  288.     DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !n");
  289.     ReleaseLock(dr_mtx_);
  290.     return FAIL;
  291.   }
  292.   // Create and Initialize the handle_entry structute
  293.   my_handle = new HandleEntry;
  294.   my_handle->hdl_ = next_handle_;
  295.   next_handle_++;
  296.   pub_list_.push_back(my_handle);
  297.   // Copy the attributes
  298.   my_handle->attrs_ = CopyAttrs(publish_attrs);
  299.   // For publications, scope is local if not specified
  300.   if (!hasScope(publish_attrs)){
  301.     scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
  302.     my_handle->attrs_->push_back(scope_attr);
  303.   }
  304.   // Release the lock
  305.   ReleaseLock(dr_mtx_);
  306.   return my_handle->hdl_;
  307. }
  308. int DiffusionRouting::unpublish(handle publication_handle)
  309. {
  310.   HandleEntry *my_handle = NULL;
  311.   // Get the lock first
  312.   GetLock(dr_mtx_);
  313.   my_handle = removeHandle(publication_handle, &pub_list_);
  314.   if (!my_handle){
  315.     // Handle doesn't exist, return FAIL
  316.     ReleaseLock(dr_mtx_);
  317.     return FAIL;
  318.   }
  319.   // Free structures
  320.   delete my_handle;
  321.   // Release the lock
  322.   ReleaseLock(dr_mtx_);
  323.   return OK;
  324. }
  325. int DiffusionRouting::send(handle publication_handle,
  326.    NRAttrVec *send_attrs)
  327. {
  328.   NRSimpleAttribute<int> *nr_algorithm = NULL;
  329.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  330.   int8_t send_message_type = DATA;
  331.   struct timeval current_time;
  332.   HandleEntry *my_handle;
  333.   Message *my_message;
  334.   // Get the lock first
  335.   GetLock(dr_mtx_);
  336.   // Get attributes associated with handle
  337.   my_handle = findHandle(publication_handle, &pub_list_);
  338.   if (!my_handle){
  339.     ReleaseLock(dr_mtx_);
  340.     return FAIL;
  341.   }
  342.   // Check the send attributes
  343.   if (!checkSend(send_attrs)){
  344.     DiffPrint(DEBUG_ALWAYS,
  345.       "Error : Invalid class/scope attributes in send attributes !n");
  346.     ReleaseLock(dr_mtx_);
  347.     return FAIL;
  348.   }
  349.   // Check if it is time to send another exploratory data message
  350.   GetTime(&current_time);
  351.   // Check algorithms
  352.   nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);
  353.   rmst_id_attr = RmstIdAttr.find(send_attrs);
  354.   if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&
  355.       nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){
  356.     // In One-Phase Pull, there are no exploratory messages
  357.     if (TimevalCmp(&current_time, &(my_handle->exploratory_time_)) >= 0){
  358.       // Check if it is a push data message or a regular data message
  359.       if (isPushData(my_handle->attrs_)){
  360. // Push data message
  361. // Update time for the next push exploratory message
  362. GetTime(&(my_handle->exploratory_time_));
  363. my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
  364. send_message_type = PUSH_EXPLORATORY_DATA;
  365.       }
  366.       else{
  367. // Regular data message
  368. // Update time for the next exploratory message
  369. GetTime(&(my_handle->exploratory_time_));
  370. my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
  371.     
  372. send_message_type = EXPLORATORY_DATA;
  373.       }
  374.     }
  375.   }
  376.   // Initialize message structure
  377.   my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
  378.    0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
  379.    LOCALHOST_ADDR);
  380.   // Increment pkt_counter
  381.   pkt_count_++;
  382.   // First, we duplicate the 'publish' attributes
  383.   my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
  384.   // Now, we add the send attributes
  385.   AddAttrs(my_message->msg_attr_vec_, send_attrs);
  386.   // Compute the total number and size of the joined attribute sets
  387.   my_message->num_attr_ = my_message->msg_attr_vec_->size();
  388.   my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
  389.   // Release the lock
  390.   ReleaseLock(dr_mtx_);
  391.   // Send Packet
  392.   sendMessageToDiffusion(my_message);
  393.   delete my_message;
  394.   return OK;
  395. }
  396. int DiffusionRouting::sendRmst(handle publication_handle,
  397.        NRAttrVec *send_attrs, int fragment_size)
  398. {
  399.   NRSimpleAttribute<void *> *rmst_data_attr;
  400.   NRSimpleAttribute<int> *frag_number_attr;
  401.   NRSimpleAttribute<int> *max_frag_attr;
  402.   void *frag_ptr, *blob_ptr;
  403.   char *blob;
  404.   timeval send_interval;
  405.   int retval;
  406.   int id = GetRand() % 500;
  407.   int size;
  408.   int num_frag;
  409.   int max_frag_len;
  410.   // Find RMST blob to send
  411.   rmst_data_attr = RmstDataAttr.find(send_attrs);
  412.   // We must have a RMST data attribute to send
  413.   if(!rmst_data_attr){
  414.     DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !n");
  415.     return FAIL;
  416.   }
  417.   // Copy RMST blob and calculate number of fragments
  418.   blob_ptr = rmst_data_attr->getVal();
  419.   size = rmst_data_attr->getLen();
  420.   blob = new char[size];
  421.   memcpy((void *)blob, blob_ptr, size);
  422.   num_frag = (size + fragment_size - 1) / fragment_size;
  423.   // We index starting at zero
  424.   num_frag--;
  425.   max_frag_len = size - (num_frag * fragment_size);
  426.   DiffPrint(DEBUG_DETAILS,
  427.     "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %dn",
  428.     num_frag, fragment_size, max_frag_len);
  429.   // Prepare attribute vector with RMST attributes
  430.   max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag);
  431.   send_attrs->push_back(max_frag_attr);
  432.   send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
  433.   frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
  434.   send_attrs->push_back(frag_number_attr);
  435.   send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id));
  436.   // Replace the large blob with a blob fragment
  437.   frag_ptr = (void *)&blob[0];
  438.   // The call to setVal will delete the original blob!!
  439.   if (num_frag == 0)
  440.     rmst_data_attr->setVal(frag_ptr, max_frag_len);
  441.   else
  442.     rmst_data_attr->setVal(frag_ptr, fragment_size);
  443.   // Send 1st fragment
  444.   retval = send(publication_handle, send_attrs);
  445.   // Send other fragments
  446.   for (int i = 1; i <= num_frag; i++){
  447.     // Small delay between sending fragments
  448.     send_interval.tv_sec = 0;
  449.     send_interval.tv_usec = 25000;
  450.     select(0, NULL, NULL, NULL, &send_interval);
  451.     // Send next fragment
  452.     frag_number_attr->setVal(i);
  453.     frag_ptr = (void *)&blob[i * fragment_size];
  454.     if (num_frag == i)
  455.       rmst_data_attr->setVal(frag_ptr, max_frag_len);
  456.     else
  457.       rmst_data_attr->setVal(frag_ptr, fragment_size);
  458.     retval = send(publication_handle, send_attrs);
  459.   }
  460.   ClearAttrs(send_attrs);
  461.   delete blob;
  462.   return OK;
  463. }
  464. int DiffusionRouting::addToBlacklist(int32_t node)
  465. {
  466.   ControlMessage *control_blob;
  467.   NRAttribute *ctrl_msg_attr;
  468.   Message *my_message;
  469.   NRAttrVec *attrs;
  470.   control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0);
  471.   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
  472.       (void *)control_blob,
  473.       sizeof(ControlMessage));
  474.   attrs = new NRAttrVec;
  475.   attrs->push_back(ctrl_msg_attr);
  476.   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  477.    0, pkt_count_, random_id_, LOCALHOST_ADDR,
  478.    LOCALHOST_ADDR);
  479.   // Increment pkt_counter
  480.   pkt_count_++;
  481.   // Add attributes to the message
  482.   my_message->msg_attr_vec_ = attrs;
  483.   my_message->num_attr_ = attrs->size();
  484.   my_message->data_len_ = CalculateSize(attrs);
  485.   // Send Packet
  486.   sendMessageToDiffusion(my_message);
  487.   // Delete message
  488.   delete my_message;
  489.   delete control_blob;
  490.   return OK;
  491. }
  492. int DiffusionRouting::clearBlacklist()
  493. {
  494.   ControlMessage *control_blob;
  495.   NRAttribute *ctrl_msg_attr;
  496.   Message *my_message;
  497.   NRAttrVec *attrs;
  498.   
  499.   control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0);
  500.   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
  501.       (void *)control_blob,
  502.       sizeof(ControlMessage));
  503.   attrs = new NRAttrVec;
  504.   attrs->push_back(ctrl_msg_attr);
  505.   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  506.    0, pkt_count_, random_id_, LOCALHOST_ADDR,
  507.    LOCALHOST_ADDR);
  508.   // Increment pkt_counter
  509.   pkt_count_++;
  510.   // Add attributes to the message
  511.   my_message->msg_attr_vec_ = attrs;
  512.   my_message->num_attr_ = attrs->size();
  513.   my_message->data_len_ = CalculateSize(attrs);
  514.   // Send Packet
  515.   sendMessageToDiffusion(my_message);
  516.   // Delete message
  517.   delete my_message;
  518.   delete control_blob;
  519.   
  520.   return OK;
  521. }
  522. handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
  523.    FilterCallback *cb)
  524. {
  525.   FilterEntry *filter_entry;
  526.   NRAttrVec *attrs;
  527.   NRAttribute *ctrl_msg_attr;
  528.   ControlMessage *control_blob;
  529.   Message *my_message;
  530.   TimerCallback *timer_callback;
  531.   // Check parameters
  532.   if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
  533.     DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !n");
  534.     return FAIL;
  535.   }
  536.   // Get lock first
  537.   GetLock(dr_mtx_);
  538.   // Create and Initialize the handle_entry structute
  539.   filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
  540.   next_handle_++;
  541.   filter_entry->cb_ = (FilterCallback *) cb;
  542.   filter_list_.push_back(filter_entry);
  543.   // Copy attributes (keep them for matching later)
  544.   filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
  545.   // Copy the attributes (and add the control attr)
  546.   attrs = CopyAttrs(filter_attrs);
  547.   control_blob = new ControlMessage(ADD_UPDATE_FILTER,
  548.     priority, filter_entry->handle_);
  549.   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
  550.       (void *)control_blob,
  551.       sizeof(ControlMessage));
  552.   attrs->push_back(ctrl_msg_attr);
  553.   // Initialize message structure
  554.   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  555.    0, pkt_count_, random_id_, LOCALHOST_ADDR,
  556.    LOCALHOST_ADDR);
  557.   // Increment pkt_counter
  558.   pkt_count_++;
  559.   // Add attributes to the message
  560.   my_message->msg_attr_vec_ = attrs;
  561.   my_message->num_attr_ = attrs->size();
  562.   my_message->data_len_ = CalculateSize(attrs);
  563.   // Release the lock
  564.   ReleaseLock(dr_mtx_);
  565.   // Send Packet
  566.   sendMessageToDiffusion(my_message);
  567.   // Add keepalive timer to the event queue
  568.   timer_callback = new FilterKeepaliveCallback(this, filter_entry);
  569.   timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
  570.   // Delete message, attribute set and controlblob
  571.   delete my_message;
  572.   delete control_blob;
  573.   return filter_entry->handle_;
  574. }
  575. int DiffusionRouting::removeFilter(handle filter_handle)
  576. {
  577.   FilterEntry *filter_entry = NULL;
  578.   ControlMessage *control_blob;
  579.   NRAttribute *ctrl_msg_attr;
  580.   NRAttrVec *attrs;
  581.   Message *my_message;
  582.   // Get lock first
  583.   GetLock(dr_mtx_);
  584.   filter_entry = findFilter(filter_handle);
  585.   if (!filter_entry){
  586.     // Handle doesn't exist, return FAIL
  587.     ReleaseLock(dr_mtx_);
  588.     return FAIL;
  589.   }
  590.   control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
  591.   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
  592.       (void *)control_blob,
  593.       sizeof(ControlMessage));
  594.   attrs = new NRAttrVec;
  595.   attrs->push_back(ctrl_msg_attr);
  596.   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  597.    0, pkt_count_, random_id_, LOCALHOST_ADDR,
  598.    LOCALHOST_ADDR);
  599.   // Increment pkt_counter
  600.   pkt_count_++;
  601.   // Add attributes to the message
  602.   my_message->msg_attr_vec_ = attrs;
  603.   my_message->num_attr_ = attrs->size();
  604.   my_message->data_len_ = CalculateSize(attrs);
  605.   // Handle will be destroyed when next keepalive timer happens
  606.   filter_entry->valid_ = false;
  607.   // Send Packet
  608.   sendMessageToDiffusion(my_message);
  609.   // Release the lock
  610.   ReleaseLock(dr_mtx_);
  611.   // Delete message
  612.   delete my_message;
  613.   delete control_blob;
  614.   return OK;
  615. }
  616. handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
  617. {
  618.   return (timers_manager_->addTimer(timeout, callback));
  619. }
  620. handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
  621. {
  622.   TimerCallback *callback;
  623.   callback = new OldAPITimer(cb, p);
  624.   return (addTimer(timeout, callback));
  625. }
  626. bool DiffusionRouting::removeTimer(handle hdl)
  627. {
  628.   return (timers_manager_->removeTimer(hdl));
  629. }
  630. int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
  631. {
  632.   FilterEntry *my_entry = NULL;
  633.   ControlMessage *control_blob;
  634.   NRAttribute *ctrl_msg_attr;
  635.   NRAttrVec *attrs;
  636.   Message *my_message;
  637.   // Acquire lock first
  638.   GetLock(dr_mtx_);
  639.   if (filter_entry->valid_){
  640.     // Send keepalive
  641.     control_blob = new ControlMessage(ADD_UPDATE_FILTER,
  642.       filter_entry->priority_,
  643.       filter_entry->handle_);
  644.     ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
  645. (void *)control_blob,
  646. sizeof(ControlMessage));
  647.     attrs = CopyAttrs(filter_entry->filter_attrs_);
  648.     attrs->push_back(ctrl_msg_attr);
  649.     my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  650.      0, pkt_count_, random_id_, LOCALHOST_ADDR,
  651.      LOCALHOST_ADDR);
  652.     // Increment pkt_counter
  653.     pkt_count_++;
  654.     // Add attributes to the message
  655.     my_message->msg_attr_vec_ = attrs;
  656.     my_message->num_attr_ = attrs->size();
  657.     my_message->data_len_ = CalculateSize(attrs);
  658.     // Send Message
  659.     sendMessageToDiffusion(my_message);
  660.     delete my_message;
  661.     delete control_blob;
  662.     // Release lock
  663.     ReleaseLock(dr_mtx_);
  664.     // Reschedule another filter keepalive timer in event queue
  665.     return (FILTER_KEEPALIVE_DELAY);
  666.   }
  667.   else{
  668.     // Filter was removed
  669.     my_entry = deleteFilter(filter_entry->handle_);
  670.     // We should have removed the correct handle
  671.     if (my_entry != filter_entry){
  672.       DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !n");
  673.       exit(-1);
  674.     }
  675.     delete my_entry;
  676.     // Release lock
  677.     ReleaseLock(dr_mtx_);
  678.     return -1;
  679.   }
  680. }
  681. int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
  682. {
  683.   HandleEntry *my_handle = NULL;
  684.   Message *my_message;
  685.   // Acquire lock first
  686.   GetLock(dr_mtx_);
  687.   if (handle_entry->valid_){
  688.     // Send the interest message if entry is still valid
  689.     my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
  690.      0, pkt_count_, random_id_, LOCALHOST_ADDR,
  691.      LOCALHOST_ADDR);
  692.     // Increment pkt_counter
  693.     pkt_count_++;
  694.     // Add attributes to the message
  695.     my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
  696.     my_message->num_attr_ = handle_entry->attrs_->size();
  697.     my_message->data_len_ = CalculateSize(handle_entry->attrs_);
  698.     // Send Packet
  699.     sendMessageToDiffusion(my_message);
  700.     delete my_message;
  701.     // Release lock
  702.     ReleaseLock(dr_mtx_);
  703.     // Reschedule this timer in the queue
  704.     return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) /
  705. INTEREST_LAMBDA));
  706.   }
  707.   else{
  708.     // Interest was canceled. Just delete it from the handle_list
  709.     my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
  710.     // We should have removed the correct handle
  711.     if (my_handle != handle_entry){
  712.       DiffPrint(DEBUG_ALWAYS,
  713. "Error: interestTimeout: Handles should match !n");
  714.       exit(-1);
  715.     }
  716.     delete my_handle;
  717.     // Release lock
  718.     ReleaseLock(dr_mtx_);
  719.     // Delete timer from the queue
  720.     return -1;
  721.   }
  722. }
  723. int DiffusionRouting::sendMessage(Message *msg, handle h,
  724.   u_int16_t priority)
  725. {
  726.   RedirectMessage *original_hdr;
  727.   NRAttribute *original_attr, *ctrl_msg_attr;
  728.   ControlMessage *control_blob;
  729.   NRAttrVec *attrs;
  730.   Message *my_message;
  731.   if ((priority < FILTER_MIN_PRIORITY) ||
  732.       (priority > FILTER_KEEP_PRIORITY))
  733.     return FAIL;
  734.   // Create an attribute with the original header
  735.   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
  736.      msg->source_port_, msg->data_len_,
  737.      msg->num_attr_, msg->rdm_id_,
  738.      msg->pkt_num_, msg->next_hop_,
  739.      msg->last_hop_, 0,
  740.      msg->next_port_);
  741.   original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
  742.        sizeof(RedirectMessage));
  743.   // Create the attribute with the control message
  744.   control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
  745.   ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
  746.       sizeof(ControlMessage));
  747.   // Copy Attributes and add originalAttr and controlAttr
  748.   attrs = CopyAttrs(msg->msg_attr_vec_);
  749.   attrs->push_back(original_attr);
  750.   attrs->push_back(ctrl_msg_attr);
  751.   my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
  752.    0, pkt_count_, random_id_, LOCALHOST_ADDR,
  753.    LOCALHOST_ADDR);
  754.   // Increment pkt_counter
  755.   pkt_count_++;
  756.   // Add attributes to the message
  757.   my_message->msg_attr_vec_ = attrs;
  758.   my_message->num_attr_ = attrs->size();
  759.   my_message->data_len_ = CalculateSize(attrs);
  760.   // Send Packet
  761.   sendMessageToDiffusion(my_message);
  762.   delete my_message;
  763.   delete control_blob;
  764.   delete original_hdr;
  765.   return OK;
  766. }
  767. #ifndef NS_DIFFUSION
  768. void DiffusionRouting::doIt()
  769. {
  770.   run(true, WAIT_FOREVER);
  771. }
  772. void DiffusionRouting::doOne(long timeout)
  773. {
  774.   run(false, timeout);
  775. }
  776. void DiffusionRouting::run(bool wait_condition, long max_timeout)
  777. {
  778.   DeviceList::iterator itr;
  779.   int status, max_sock, fd;
  780.   bool flag;
  781.   DiffPacket in_pkt;
  782.   fd_set fds;
  783.   struct timeval tv;
  784.   struct timeval max_tv;
  785.   do{
  786.     FD_ZERO(&fds);
  787.     max_sock = 0;
  788.     // Set the maximum timeout value
  789.     max_tv.tv_sec = (int) (max_timeout / 1000);
  790.     max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
  791.     for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
  792.       (*itr)->addInFDS(&fds, &max_sock);
  793.     }
  794.     // Check for the next timer
  795.     timers_manager_->nextTimerTime(&tv);
  796.     if (tv.tv_sec == MAXVALUE){
  797.       // If we don't have any timers, we wait for POLLING_INTERVAL
  798.       if (max_timeout == WAIT_FOREVER){
  799. tv.tv_sec = POLLING_INTERVAL;
  800. tv.tv_usec = 0;
  801.       }
  802.       else{
  803. tv = max_tv;
  804.       }
  805.     }
  806.     else{
  807.       if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
  808. // max_timeout value is smaller than next timer's time, so we
  809. // use themax_timeout value instead
  810. tv = max_tv;
  811.       }
  812.     }
  813.     status = select(max_sock+1, &fds, NULL, NULL, &tv);
  814.     if (status == 0){
  815.       // Process all timers that have expired
  816.       timers_manager_->executeAllExpiredTimers();
  817.     }
  818.     if (status > 0){
  819.       do{
  820. flag = false;
  821. for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
  822.   fd = (*itr)->checkInFDS(&fds);
  823.   if (fd != -1){
  824.     // Message waiting
  825.     in_pkt = (*itr)->recvPacket(fd);
  826.     recvPacket(in_pkt);
  827.     // Clear this fd
  828.     FD_CLR(fd, &fds);
  829.     status--;
  830.     flag = true;
  831.   }
  832. }
  833.       } while ((status > 0) && (flag == true));
  834.     }
  835.     else
  836.       if (status < 0){
  837. DiffPrint(DEBUG_IMPORTANT, "Select returned %dn", status);
  838.       }
  839.   } while (wait_condition);
  840. }
  841. #endif // NS_DIFFUSION
  842. #ifndef NS_DIFFUSION
  843. void DiffusionRouting::sendMessageToDiffusion(Message *msg)
  844. {
  845.   DiffPacket out_pkt = NULL;
  846.   struct hdr_diff *dfh;
  847.   char *pos;
  848.   int len;
  849.   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
  850.   dfh = HDR_DIFF(out_pkt);
  851.   pos = (char *) out_pkt;
  852.   pos = pos + sizeof(struct hdr_diff);
  853.   len = PackAttrs(msg->msg_attr_vec_, pos);
  854.   LAST_HOP(dfh) = htonl(msg->last_hop_);
  855.   NEXT_HOP(dfh) = htonl(msg->next_hop_);
  856.   DIFF_VER(dfh) = msg->version_;
  857.   MSG_TYPE(dfh) = msg->msg_type_;
  858.   DATA_LEN(dfh) = htons(len);
  859.   PKT_NUM(dfh) = htonl(msg->pkt_num_);
  860.   RDM_ID(dfh) = htonl(msg->rdm_id_);
  861.   NUM_ATTR(dfh) = htons(msg->num_attr_);
  862.   SRC_PORT(dfh) = htons(msg->source_port_);
  863.   sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
  864.   delete [] out_pkt;
  865. }
  866. #else
  867. void DiffusionRouting::sendMessageToDiffusion(Message *msg)
  868. {
  869.   Message *my_msg;
  870.   DeviceList::iterator itr;
  871.   int len;
  872.   my_msg = CopyMessage(msg);
  873.   len = CalculateSize(my_msg->msg_attr_vec_);
  874.   len = len + sizeof(struct hdr_diff);
  875.   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
  876.     (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
  877.   }
  878. }
  879. #endif // !NS_DIFFUSION
  880. void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
  881. {
  882.   DeviceList::iterator itr;
  883.   for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
  884.     (*itr)->sendPacket(pkt, len, dst);
  885.   }
  886. }
  887. #ifndef NS_DIFFUSION
  888. void DiffusionRouting::recvPacket(DiffPacket pkt)
  889. {
  890.   struct hdr_diff *dfh = HDR_DIFF(pkt);
  891.   Message *rcv_message = NULL;
  892.   int8_t version, msg_type;
  893.   u_int16_t data_len, num_attr, source_port;
  894.   int32_t pkt_num, rdm_id, next_hop, last_hop;
  895.   // Read header
  896.   version = DIFF_VER(dfh);
  897.   msg_type = MSG_TYPE(dfh);
  898.   source_port = ntohs(SRC_PORT(dfh));
  899.   pkt_num = ntohl(PKT_NUM(dfh));
  900.   rdm_id = ntohl(RDM_ID(dfh));
  901.   num_attr = ntohs(NUM_ATTR(dfh));
  902.   next_hop = ntohl(NEXT_HOP(dfh));
  903.   last_hop = ntohl(LAST_HOP(dfh));
  904.   data_len = ntohs(DATA_LEN(dfh));
  905.   // Create a message structure from the incoming packet
  906.   rcv_message = new Message(version, msg_type, source_port, data_len,
  907.     num_attr, pkt_num, rdm_id, next_hop, last_hop);
  908.   // Read all attributes into the Message structure
  909.   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
  910.   // Process the incoming message
  911.   recvMessage(rcv_message);
  912.   // We are done
  913.   delete rcv_message;
  914.   delete [] pkt;
  915. }
  916. #endif // !NS_DIFFUSION
  917. void DiffusionRouting::recvMessage(Message *msg)
  918. {
  919.   // Check version
  920.   if (msg->version_ != DIFFUSION_VERSION)
  921.     return;
  922.   // Check destination
  923.   if (msg->next_hop_ != LOCALHOST_ADDR)
  924.     return;
  925.   // Process the incoming message
  926.   if (msg->msg_type_ == REDIRECT)
  927.     processControlMessage(msg);
  928.   else
  929.     processMessage(msg);
  930. }
  931. void DiffusionRouting::processControlMessage(Message *msg)
  932. {
  933.   NRSimpleAttribute<void *> *original_header_attr = NULL;
  934.   NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
  935.   RedirectMessage *original_header;
  936.   FilterEntry *entry;
  937.   handle my_handle;
  938.   // Find the attribute containing the original packet header
  939.   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
  940.    place, &place);
  941.   if (!original_header_attr){
  942.     DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !n");
  943.     return;
  944.   }
  945.   // Restore original message header
  946.   original_header = (RedirectMessage *) original_header_attr->getVal();
  947.   my_handle = original_header->handle_;
  948.   msg->msg_type_ = original_header->msg_type_;
  949.   msg->source_port_ = original_header->source_port_;
  950.   msg->pkt_num_ = original_header->pkt_num_;
  951.   msg->rdm_id_ = original_header->rdm_id_;
  952.   msg->next_hop_ = original_header->next_hop_;
  953.   msg->last_hop_ = original_header->last_hop_;
  954.   msg->num_attr_ = original_header->num_attr_;
  955.   msg->new_message_ = original_header->new_message_;
  956.   msg->next_port_ = original_header->next_port_;
  957.   // Delete attribute from the original set
  958.   msg->msg_attr_vec_->erase(place);
  959.   delete original_header_attr;
  960.   // Find the right callback
  961.   GetLock(dr_mtx_);
  962.   entry = findFilter(my_handle);
  963.   if (entry && entry->valid_){
  964.     // Just to confirm
  965.     if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
  966.       ReleaseLock(dr_mtx_);
  967.       entry->cb_->recv(msg, my_handle);
  968.       return;
  969.     }
  970.     else{
  971.       DiffPrint(DEBUG_ALWAYS,
  972. "Warning: Filter doesn't match incoming message's attributes !n");
  973.     }
  974.   }
  975.   else{
  976.     DiffPrint(DEBUG_IMPORTANT,
  977.       "Report: Cannot find filter (possibly deleted ?)n");
  978.   }
  979.   ReleaseLock(dr_mtx_);
  980. }
  981. void DiffusionRouting::processMessage(Message *msg)
  982. {
  983.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  984.   CallbackList::iterator cbl_itr;
  985.   HandleList::iterator sub_itr;
  986.   NRAttrVec *callback_attrs;
  987.   HandleEntry *entry; 
  988.   CallbackEntry *aux;
  989.   CallbackList cbl;
  990.   // First, acquire the lock
  991.   GetLock(dr_mtx_);
  992.   for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
  993.     entry = *sub_itr;
  994.     if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
  995.       if (entry->cb_){
  996. aux = new CallbackEntry(entry->cb_, entry->hdl_);
  997. cbl.push_back(aux);
  998.       }
  999.   }
  1000.   // We can release the lock now
  1001.   ReleaseLock(dr_mtx_);
  1002.   // Check for RMST id attribute
  1003.   rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
  1004.   cbl_itr = cbl.begin();
  1005.   // Process RMST fragment if we have callbacks and this message has an RmstId
  1006.   if (rmst_id_attr && (cbl_itr != cbl.end())){
  1007.     if (!processRmst(msg)){
  1008.       cbl.clear();
  1009.       return;
  1010.     }
  1011.   }
  1012.   // Now we just call all callback functions
  1013.   for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
  1014.     // Copy attributes
  1015.     callback_attrs = CopyAttrs(msg->msg_attr_vec_);
  1016.     // Call app-specific callback function
  1017.     aux = *cbl_itr;
  1018.     aux->cb_->recv(callback_attrs, aux->subscription_handle_);
  1019.     delete aux;
  1020.     // Clean up callback attributes
  1021.     ClearAttrs(callback_attrs);
  1022.     delete callback_attrs;
  1023.   }
  1024.   // We are done
  1025.   cbl.clear();
  1026. }
  1027. bool DiffusionRouting::processRmst(Message *msg)
  1028. {
  1029.   NRSimpleAttribute<void *> *data_buf_attr = NULL;
  1030.   NRSimpleAttribute<int> *max_frag_attr = NULL;
  1031.   NRSimpleAttribute<int> *rmst_id_attr = NULL;
  1032.   NRSimpleAttribute<int> *frag_attr = NULL;
  1033.   int rmst_no, frag_no, data_buf_len, count;
  1034.   void *blob_ptr, *tmp_frag_ptr;
  1035.   Int2RecRmst::iterator rmst_iterator;
  1036.   Int2Frag::iterator frag_iterator;
  1037.   char *dstPtr;
  1038.   int dstSize;
  1039.   RecRmst *rmst_ptr;
  1040.   // Read Rmst attributes
  1041.   data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_);
  1042.   rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
  1043.   frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
  1044.   rmst_no = rmst_id_attr->getVal();
  1045.   frag_no = frag_attr->getVal();
  1046.   blob_ptr = data_buf_attr->getVal();
  1047.   data_buf_len = data_buf_attr->getLen();
  1048.   // See if we are receiving this blob, if not start a new RecRmst
  1049.   rmst_iterator = rec_rmst_map_.find(rmst_no);
  1050.   if (rmst_iterator == rec_rmst_map_.end()){
  1051.     rmst_ptr = new RecRmst(rmst_no);
  1052.     rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr));
  1053.   }
  1054.   else
  1055.     rmst_ptr = (*rmst_iterator).second;
  1056.   if (frag_no == 0){
  1057.     max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_);
  1058.     rmst_ptr->max_frag_ = max_frag_attr->getVal();
  1059.     rmst_ptr->mtu_len_ = data_buf_len;
  1060.   }
  1061.   // Copy fragment to map
  1062.   tmp_frag_ptr = new char[data_buf_len];
  1063.   memcpy(tmp_frag_ptr, blob_ptr, data_buf_len);
  1064.   rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr));
  1065.   if (frag_no == rmst_ptr->max_frag_)
  1066.     rmst_ptr->max_frag_len_ = data_buf_len;
  1067.   count = rmst_ptr->frag_map_.size();
  1068.   // If this is the last rmst fragment, create the entire rmst
  1069.   if (count == (rmst_ptr->max_frag_ + 1)){
  1070.     
  1071.     DiffPrint(DEBUG_DETAILS, 
  1072.       "RMST #%d is complete, creating big blob !n", rmst_no);
  1073.     // Allocate memory for the big blob
  1074.     dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_;
  1075.     dstPtr = new char[dstSize];
  1076.     
  1077.     // Copy all but last fragment to a buffer
  1078.     for (int i = 0; i < rmst_ptr->max_frag_; i++){
  1079.       frag_iterator = rmst_ptr->frag_map_.find(i);
  1080.       tmp_frag_ptr = (*frag_iterator).second;
  1081.       memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_],
  1082.      (void *)tmp_frag_ptr, rmst_ptr->mtu_len_);
  1083.     }
  1084.     // Now, copy the last fragment to the buffer
  1085.     frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_);
  1086.     tmp_frag_ptr = (*frag_iterator).second;
  1087.     memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_],
  1088.    (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_);
  1089.     // Since we copied everything from the map - clean it up
  1090.     rec_rmst_map_.erase(rmst_iterator);
  1091.     delete rmst_ptr;
  1092.     // Now we substitute the last fragment with the reconstructed blob
  1093.     data_buf_attr->setVal(dstPtr, dstSize);
  1094.     // Deliver this to the application
  1095.     return true;
  1096.   }
  1097.   // We don't have the entire blob
  1098.   return false;
  1099. }
  1100. HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
  1101. {
  1102.   HandleList::iterator itr;
  1103.   HandleEntry *entry;
  1104.   for (itr = hl->begin(); itr != hl->end(); ++itr){
  1105.     entry = *itr;
  1106.     if (entry->hdl_ == my_handle){
  1107.       hl->erase(itr);
  1108.       return entry;
  1109.     }
  1110.   }
  1111.   return NULL;
  1112. }
  1113. HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
  1114. {
  1115.   HandleList::iterator itr;
  1116.   HandleEntry *entry;
  1117.   for (itr = hl->begin(); itr != hl->end(); ++itr){
  1118.     entry = *itr;
  1119.     if (entry->hdl_ == my_handle)
  1120.       return entry;
  1121.   }
  1122.   return NULL;
  1123. }
  1124. FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
  1125. {
  1126.   FilterList::iterator itr;
  1127.   FilterEntry *entry;
  1128.   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
  1129.     entry = *itr;
  1130.     if (entry->handle_ == my_handle){
  1131.       filter_list_.erase(itr);
  1132.       return entry;
  1133.     }
  1134.   }
  1135.   return NULL;
  1136. }
  1137. FilterEntry * DiffusionRouting::findFilter(handle my_handle)
  1138. {
  1139.   FilterList::iterator itr;
  1140.   FilterEntry *entry;
  1141.   for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
  1142.     entry = *itr;
  1143.     if (entry->handle_ == my_handle)
  1144.       return entry;
  1145.   }
  1146.   return NULL;
  1147. }
  1148. bool DiffusionRouting::hasScope(NRAttrVec *attrs)
  1149. {
  1150.   NRAttribute *temp = NULL;
  1151.   temp = NRScopeAttr.find(attrs);
  1152.   if (temp)
  1153.     return true;
  1154.   return false;
  1155. }
  1156. bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
  1157. {
  1158.   NRSimpleAttribute<int> *nrclass = NULL;
  1159.   NRSimpleAttribute<int> *nrscope = NULL;
  1160.   // We first try to locate both class and scope attributes
  1161.   nrclass = NRClassAttr.find(attrs);
  1162.   nrscope = NRScopeAttr.find(attrs);
  1163.   // There must be a class attribute in subscriptions
  1164.   if (!nrclass)
  1165.     return false;
  1166.   if (nrscope){
  1167.     // This subcription has both class and scope attribute. So, we
  1168.     // check if class/scope attributes comply with the Diffusion
  1169.     // Routing API
  1170.     // Must check scope's operator. The API requires it to be "IS"
  1171.     if (nrscope->getOp() != NRAttribute::IS)
  1172.       return false;
  1173.     // Ok, so first check if this is a global subscription
  1174.     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
  1175. (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
  1176. (nrclass->getOp() == NRAttribute::IS))
  1177.       return true;
  1178.     // Check for local subscriptions
  1179.     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
  1180.       return true;
  1181.     // Just to be sure we did not miss any case
  1182.     return false;
  1183.   }
  1184.   // If there is no scope attribute, we will insert one later if this
  1185.   // subscription looks like a global subscription
  1186.   if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
  1187.       (nrclass->getOp() == NRAttribute::IS))
  1188.     return true;
  1189.   return false;
  1190. }
  1191. bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
  1192. {
  1193.   NRSimpleAttribute<int> *nrclass = NULL;
  1194.   NRSimpleAttribute<int> *nrscope = NULL;
  1195.   // We first try to locate both class and scope attributes
  1196.   nrclass = NRClassAttr.find(attrs);
  1197.   nrscope = NRScopeAttr.find(attrs);
  1198.   // There must be a class attribute in the publication
  1199.   if (!nrclass)
  1200.     return false;
  1201.   // In addition, the Diffusion Routing API requires the class
  1202.   // attribute to be set to "IS DATA_CLASS"
  1203.   if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
  1204.       (nrclass->getOp() != NRAttribute::IS))
  1205.     return false;
  1206.   if (nrscope){
  1207.     // Ok, so this publication has both class and scope attributes. We
  1208.     // now have to check if they comply to the Diffusion Routing API
  1209.     // semantics for publish
  1210.     // Must check scope's operator. The API requires it to be "IS"
  1211.     if (nrscope->getOp() != NRAttribute::IS)
  1212.       return false;
  1213.     // We accept both global and local scope data messages
  1214.     if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
  1215. (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
  1216.       return true;
  1217.     // Just not to miss any case
  1218.     return false;
  1219.   }
  1220.   // A publish without a scope attribute is fine, we will include a
  1221.   // default NODE_LOCAL_SCOPE attribute later
  1222.   return true;
  1223. }
  1224. bool DiffusionRouting::checkSend(NRAttrVec *attrs)
  1225. {
  1226.   NRSimpleAttribute<int> *nrclass = NULL;
  1227.   NRSimpleAttribute<int> *nrscope = NULL;
  1228.   // Currently only checks for Class and Scope attributes
  1229.   nrclass = NRClassAttr.find(attrs);
  1230.   nrscope = NRScopeAttr.find(attrs);
  1231.   if (nrclass || nrscope)
  1232.     return false;
  1233.   return true;
  1234. }
  1235. bool DiffusionRouting::isPushData(NRAttrVec *attrs)
  1236. {
  1237.   NRSimpleAttribute<int> *nrclass = NULL;
  1238.   NRSimpleAttribute<int> *nrscope = NULL;
  1239.   // Currently only checks for Class and Scope attributes
  1240.   nrclass = NRClassAttr.find(attrs);
  1241.   nrscope = NRScopeAttr.find(attrs);
  1242.   // We should have both class and scope
  1243.   if (nrclass && nrscope){
  1244.     if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
  1245.       return false;
  1246.     return true;
  1247.   }
  1248.   else{
  1249.     DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !n");
  1250.     return false;
  1251.   }
  1252. }