filter_core.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:36k
源码类别:

通讯编程

开发平台:

Visual C++

  1. // 
  2. // filter_core.cc  : Main Diffusion program
  3. // authors         : Chalermek Intanagonwiwat and Fabio Silva
  4. //
  5. // Copyright (C) 2000-2003 by the University of Southern California
  6. // $Id: filter_core.cc,v 1.4 2005/09/13 04:53:47 tomh Exp $
  7. //
  8. // This program is free software; you can redistribute it and/or
  9. // modify it under the terms of the GNU General Public License,
  10. // version 2, as published by the Free Software Foundation.
  11. //
  12. // This program is distributed in the hope that it will be useful,
  13. // but WITHOUT ANY WARRANTY; without even the implied warranty of
  14. // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  15. // GNU General Public License for more details.
  16. //
  17. // You should have received a copy of the GNU General Public License along
  18. // with this program; if not, write to the Free Software Foundation, Inc.,
  19. // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  20. //
  21. // Linking this file statically or dynamically with other modules is making
  22. // a combined work based on this file.  Thus, the terms and conditions of
  23. // the GNU General Public License cover the whole combination.
  24. //
  25. // In addition, as a special exception, the copyright holders of this file
  26. // give you permission to combine this file with free software programs or
  27. // libraries that are released under the GNU LGPL and with code included in
  28. // the standard release of ns-2 under the Apache 2.0 license or under
  29. // otherwise-compatible licenses with advertising requirements (or modified
  30. // versions of such code, with unchanged license).  You may copy and
  31. // distribute such a system following the terms of the GNU GPL for this
  32. // file and the licenses of the other code concerned, provided that you
  33. // include the source code of that other code when and as the GNU GPL
  34. // requires distribution of source code.
  35. //
  36. // Note that people who make modified versions of this file are not
  37. // obligated to grant this special exception for their modified versions;
  38. // it is their choice whether to do so.  The GNU General Public License
  39. // gives permission to release a modified version without this exception;
  40. // this exception also makes it possible to release a modified version
  41. // which carries forward this exception.
  42. #include "filter_core.hh"
  43. #ifndef NS_DIFFUSION
  44. DiffusionCoreAgent *agent;
  45. #endif // !NS_DIFFUSION
  46. class HashEntry {
  47. public:
  48.   bool dummy;
  49.   HashEntry() { 
  50.     dummy  = false;
  51.   }
  52. };
  53. class NeighborEntry {
  54. public:
  55.   int32_t id;
  56.   struct timeval tmv;
  57.   NeighborEntry(int _id) : id(_id)
  58.   {
  59.     GetTime(&tmv);
  60.   }
  61. };
  62. int NeighborsTimeoutTimer::expire()
  63. {
  64.   agent_->neighborsTimeout();
  65.   return 0;
  66. }
  67. int FilterTimeoutTimer::expire()
  68. {
  69.   agent_->filterTimeout();
  70.   return 0;
  71. }
  72. int DiffusionStopTimer::expire()
  73. {
  74.   agent_->timeToStop();
  75. #ifndef NS_DIFFUSION
  76.   exit(0);
  77. #endif // !NS_DIFFUSION
  78.   // Never gets here !
  79.   return 0;
  80. }
  81. void DiffusionCoreAgent::timeToStop()
  82. {
  83. #ifdef STATS
  84.   char out_filename[100];
  85.   FILE *outfile = NULL;
  86.   if (stats_){
  87.     sprintf(out_filename, "/tmp/diffusion-%d.out", my_id_);
  88.     outfile = fopen(out_filename, "w");
  89.     if (outfile == NULL){
  90.       DiffPrint(DEBUG_ALWAYS,
  91. "Diffusion Error: Cannot create %sn", out_filename);
  92.       return;
  93.     }
  94.     stats_->printStats(stdout);
  95.     if (outfile){
  96.       stats_->printStats(outfile);
  97.       fclose(outfile);
  98.     }
  99.   }
  100. #endif // STATS
  101. }
  102. #ifndef NS_DIFFUSION
  103. void signal_handler(int p)
  104. {
  105.   agent->timeToStop();
  106.   exit(0);
  107. }
  108. void DiffusionCoreAgent::usage(char *s)
  109. {
  110.   DiffPrint(DEBUG_ALWAYS, "Usage: %s [-d debug] [-f filename] [-t stoptime] [-v] [-h] [-p port]", s);
  111. #ifdef IO_LOG
  112.   DiffPrint(DEBUG_ALWAYS, " [-l]");
  113. #endif // IO_LOG
  114. #ifdef STATS
  115.   DiffPrint(DEBUG_ALWAYS, " [-s] [-i warm_up_time]");
  116. #endif // STATS
  117.   DiffPrint(DEBUG_ALWAYS, "nn");
  118.   DiffPrint(DEBUG_ALWAYS, "t-d - Sets debug level (0-10)n");
  119.   DiffPrint(DEBUG_ALWAYS, "t-t - Stops after stoptime secondsn");
  120.   DiffPrint(DEBUG_ALWAYS, "t-f - Uses filename as the config filen");
  121.   DiffPrint(DEBUG_ALWAYS, "t-v - Prints diffusion versionn");
  122.   DiffPrint(DEBUG_ALWAYS, "t-h - Prints this informationn");
  123.   DiffPrint(DEBUG_ALWAYS, "t-p - Sets diffusion port to portn");
  124. #ifdef IO_LOG
  125.   DiffPrint(DEBUG_ALWAYS, "t-l - Turns on i/o loggingn");
  126. #endif // IO_LOG
  127. #ifdef STATS
  128.   DiffPrint(DEBUG_ALWAYS, "t-s - Disables statisticsn");
  129.   DiffPrint(DEBUG_ALWAYS, "t-i - Ignores traffic from the first warm_up_time seconds for statsn");
  130. #endif // STATS
  131.   DiffPrint(DEBUG_ALWAYS, "n");
  132.   exit(0);
  133. }
  134. void DiffusionCoreAgent::run()
  135. {
  136.   DeviceList::iterator device_itr;
  137.   DiffPacket in_pkt;
  138.   fd_set fds;
  139.   bool flag;
  140.   int status, max_sock, fd;
  141.   struct timeval tv;
  142.   // Main Select Loop
  143.   while (1){
  144.     // Wait for incoming packets
  145.     FD_ZERO(&fds);
  146.     max_sock = 0;
  147.     // Figure out how much time to wait
  148.     timers_manager_->nextTimerTime(&tv);
  149.     if (tv.tv_sec == 0 && tv.tv_usec == 0){
  150.       // Timer has expired !
  151.       timers_manager_->executeAllExpiredTimers();
  152.       continue;
  153.     }
  154.     for (device_itr = in_devices_.begin();
  155.  device_itr != in_devices_.end(); ++device_itr){
  156.       (*device_itr)->addInFDS(&fds, &max_sock);
  157.     }
  158.     status = select(max_sock+1, &fds, NULL, NULL, &tv);
  159.     if (status == 0){
  160.       // We process all expired timers
  161.       timers_manager_->executeAllExpiredTimers();
  162.     }
  163.     // Check for new packets
  164.     if (status > 0){
  165.       do{
  166. flag = false;
  167. for (device_itr = in_devices_.begin();
  168.      device_itr != in_devices_.end(); ++device_itr){
  169.   fd = (*device_itr)->checkInFDS(&fds);
  170.   if (fd != -1){
  171.     // Message waiting
  172.     in_pkt = (*device_itr)->recvPacket(fd);
  173.     if (in_pkt)
  174.       recvPacket(in_pkt);
  175.     // Clear this fd
  176.     FD_CLR(fd, &fds);
  177.     status--;
  178.     flag = true;
  179.   }
  180. }
  181.       } while ((status > 0) && (flag == true));
  182.     }
  183.     // This should not happen
  184.     if (status < 0){
  185.       DiffPrint(DEBUG_IMPORTANT, "Select returned %dn", status);
  186.     }
  187.   }
  188. }
  189. #endif // !NS_DIFFUSION
  190. void DiffusionCoreAgent::neighborsTimeout()
  191. {
  192.   struct timeval tmv;
  193.   NeighborEntry *neighbor_entry;
  194.   NeighborList::iterator neighbor_itr;
  195.   DiffPrint(DEBUG_MORE_DETAILS, "Neighbors Timeout !n");
  196.   GetTime(&tmv);
  197.   neighbor_itr = neighbor_list_.begin();
  198.   while(neighbor_itr != neighbor_list_.end()){
  199.     neighbor_entry = *neighbor_itr;
  200.     if (tmv.tv_sec > neighbor_entry->tmv.tv_sec + NEIGHBORS_TIMEOUT){
  201.       // This neighbor expired
  202.       neighbor_itr = neighbor_list_.erase(neighbor_itr);
  203.       delete neighbor_entry;
  204.     }
  205.     else{
  206.       neighbor_itr++;
  207.     }
  208.   }
  209. }
  210. void DiffusionCoreAgent::filterTimeout()
  211. {
  212.   struct timeval tmv;
  213.   FilterEntry *filter_entry;
  214.   FilterList::iterator filter_itr;
  215.   DiffPrint(DEBUG_MORE_DETAILS, "Filter Timeout !n");
  216.   GetTime(&tmv);
  217.   filter_itr = filter_list_.begin();
  218.   while(filter_itr != filter_list_.end()){
  219.     filter_entry = *filter_itr;
  220.     if (tmv.tv_sec > filter_entry->tmv_.tv_sec + FILTER_TIMEOUT){
  221.       // This filter expired
  222.       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d timed out !n",
  223. filter_entry->agent_, filter_entry->handle_,
  224. filter_entry->priority_);
  225.       filter_itr = filter_list_.erase(filter_itr);
  226.       delete filter_entry;
  227.     }
  228.     else{
  229.       filter_itr++;
  230.     }
  231.   }
  232. }
  233. void DiffusionCoreAgent::sendMessage(Message *msg)
  234. {
  235.   Tcl_HashEntry *tcl_hash_entry;
  236.   unsigned int key[2];
  237.   Message *send_message;
  238.   
  239.   send_message = new Message(DIFFUSION_VERSION, msg->msg_type_, diffusion_port_,
  240.      0, 0, msg->pkt_num_, msg->rdm_id_,
  241.      msg->next_hop_, 0);
  242.   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
  243.   send_message->num_attr_ = send_message->msg_attr_vec_->size();
  244.   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
  245.   // Adjust message size for logging and check hash
  246.   key[0] = msg->pkt_num_;
  247.   key[1] = msg->rdm_id_;
  248.   tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
  249.   if (tcl_hash_entry)
  250.     msg->new_message_ = 0;
  251.   else
  252.     msg->new_message_ = 1;
  253.   send_message->new_message_ = msg->new_message_;
  254.   // Check if message goes to an agent or the network
  255.   if (msg->next_port_){
  256.     // Message goes to an agent
  257.     send_message->last_hop_ = LOCALHOST_ADDR;
  258.     // If it's a local message, it has to go to a local agent
  259.     if (send_message->next_hop_ != LOCALHOST_ADDR){
  260.       DiffPrint(DEBUG_ALWAYS, "Error: Message destination is a local agent but next_hop != LOCALHOST_ADDR !n");
  261.       delete send_message;
  262.       return;
  263.     }
  264.     // Send the message to the agent specified
  265.     sendMessageToLibrary(send_message, msg->next_port_);
  266.   }
  267.   else{
  268.     // Message goes to the network
  269.     send_message->last_hop_ = my_id_;
  270. #ifdef STATS
  271.     if (stats_)
  272.       stats_->logOutgoingMessage(send_message);
  273. #endif // STATS
  274.     // Add message to the hash table      
  275.     if (tcl_hash_entry == NULL)
  276.       putHash(key[0], key[1]);
  277.     else
  278.       DiffPrint(DEBUG_DETAILS, "Node%d: Message being sent is an old message !n", my_id_);
  279.     // Send Message
  280.     sendMessageToNetwork(send_message);
  281.   }
  282.   delete send_message;
  283. }
  284. void DiffusionCoreAgent::forwardMessage(Message *msg, FilterEntry *filter_entry)
  285. {
  286.   RedirectMessage *original_hdr;
  287.   NRAttribute *original_header_attr;
  288.   Message *send_message;
  289.   // Create an attribute with the original header
  290.   original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
  291.      msg->source_port_, msg->data_len_,
  292.      msg->num_attr_, msg->rdm_id_,
  293.      msg->pkt_num_, msg->next_hop_,
  294.      msg->last_hop_, filter_entry->handle_,
  295.      msg->next_port_);
  296.   original_header_attr = OriginalHdrAttr.make(NRAttribute::IS,
  297.       (void *)original_hdr,
  298.       sizeof(RedirectMessage));
  299.   send_message = new Message(DIFFUSION_VERSION, REDIRECT, diffusion_port_, 0,
  300.      0, pkt_count_, random_id_, LOCALHOST_ADDR, my_id_);
  301.   // Increment pkt_counter
  302.   pkt_count_++;
  303.   // Duplicate the message's attributes
  304.   send_message->msg_attr_vec_ = CopyAttrs(msg->msg_attr_vec_);
  305.   
  306.   // Add the extra attribute
  307.   send_message->msg_attr_vec_->push_back(original_header_attr);
  308.   send_message->num_attr_ = send_message->msg_attr_vec_->size();
  309.   send_message->data_len_ = CalculateSize(send_message->msg_attr_vec_);
  310.   sendMessageToLibrary(send_message, filter_entry->agent_);
  311.   delete send_message;
  312.   delete original_hdr;
  313. }
  314. #ifndef NS_DIFFUSION
  315. void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
  316. {
  317.   DiffPacket out_pkt = NULL;
  318.   struct hdr_diff *dfh;
  319.   int len;
  320.   char *pos;
  321.   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
  322.   dfh = HDR_DIFF(out_pkt);
  323.   pos = (char *) out_pkt;
  324.   pos = pos + sizeof(struct hdr_diff);
  325.   len = PackAttrs(msg->msg_attr_vec_, pos);
  326.   LAST_HOP(dfh) = htonl(msg->last_hop_);
  327.   NEXT_HOP(dfh) = htonl(msg->next_hop_);
  328.   DIFF_VER(dfh) = msg->version_;
  329.   MSG_TYPE(dfh) = msg->msg_type_;
  330.   DATA_LEN(dfh) = htons(len);
  331.   PKT_NUM(dfh) = htonl(msg->pkt_num_);
  332.   RDM_ID(dfh) = htonl(msg->rdm_id_);
  333.   NUM_ATTR(dfh) = htons(msg->num_attr_);
  334.   SRC_PORT(dfh) = htons(msg->source_port_);
  335.   sendPacketToLibrary(out_pkt, sizeof(struct hdr_diff) + len, agent_id);
  336.   delete [] out_pkt;
  337. }
  338. #else
  339. void DiffusionCoreAgent::sendMessageToLibrary(Message *msg, u_int16_t agent_id)
  340. {
  341.   Message *send_message;
  342.   DeviceList::iterator device_itr;
  343.   int len;
  344.   send_message = CopyMessage(msg);
  345.   len = CalculateSize(send_message->msg_attr_vec_);
  346.   len = len + sizeof(struct hdr_diff);
  347.   for (device_itr = local_out_devices_.begin();
  348.        device_itr != local_out_devices_.end(); ++device_itr){
  349.     (*device_itr)->sendPacket((DiffPacket) send_message, len, agent_id);
  350.   }
  351. }
  352. #endif // !NS_DIFFUSION
  353. #ifndef NS_DIFFUSION
  354. void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
  355. {
  356.   DiffPacket out_pkt = NULL;
  357.   struct hdr_diff *dfh;
  358.   int len;
  359.   char *pos;
  360.   out_pkt = AllocateBuffer(msg->msg_attr_vec_);
  361.   dfh = HDR_DIFF(out_pkt);
  362.   pos = (char *) out_pkt;
  363.   pos = pos + sizeof(struct hdr_diff);
  364.   len = PackAttrs(msg->msg_attr_vec_, pos);
  365.   LAST_HOP(dfh) = htonl(msg->last_hop_);
  366.   NEXT_HOP(dfh) = htonl(msg->next_hop_);
  367.   DIFF_VER(dfh) = msg->version_;
  368.   MSG_TYPE(dfh) = msg->msg_type_;
  369.   DATA_LEN(dfh) = htons(len);
  370.   PKT_NUM(dfh) = htonl(msg->pkt_num_);
  371.   RDM_ID(dfh) = htonl(msg->rdm_id_);
  372.   NUM_ATTR(dfh) = htons(msg->num_attr_);
  373.   SRC_PORT(dfh) = htons(msg->source_port_);
  374.   sendPacketToNetwork(out_pkt, sizeof(struct hdr_diff) + len, msg->next_hop_);
  375.   delete [] out_pkt;
  376. }
  377. #else
  378. void DiffusionCoreAgent::sendMessageToNetwork(Message *msg)
  379. {
  380.   Message *send_message;
  381.   int len;
  382.   int32_t dst;
  383.   DeviceList::iterator device_itr;
  384.   send_message = CopyMessage(msg);
  385.   len = CalculateSize(send_message->msg_attr_vec_);
  386.   len = len + sizeof(struct hdr_diff);
  387.   dst = send_message->next_hop_;
  388.   for (device_itr = out_devices_.begin();
  389.        device_itr != out_devices_.end(); ++device_itr){
  390.     (*device_itr)->sendPacket((DiffPacket) send_message, len, dst);
  391.   }
  392. }
  393. #endif // !NS_DIFFUSION
  394. void DiffusionCoreAgent::sendPacketToLibrary(DiffPacket pkt, int len,
  395.      u_int16_t dst)
  396. {
  397.   DeviceList::iterator device_itr;
  398.   for (device_itr = local_out_devices_.begin();
  399.        device_itr != local_out_devices_.end(); ++device_itr){
  400.     (*device_itr)->sendPacket(pkt, len, dst);
  401.   }
  402. }
  403. void DiffusionCoreAgent::sendPacketToNetwork(DiffPacket pkt, int len, int dst)
  404. {
  405.   DeviceList::iterator device_itr;
  406.   for (device_itr = out_devices_.begin();
  407.        device_itr != out_devices_.end(); ++device_itr){
  408.     (*device_itr)->sendPacket(pkt, len, dst);
  409.   }
  410. }
  411. void DiffusionCoreAgent::updateNeighbors(int id)
  412. {
  413.   NeighborList::iterator neighbor_itr;
  414.   NeighborEntry *neighbor_entry;
  415.   if (id == LOCALHOST_ADDR || id == my_id_)
  416.     return;
  417.   for (neighbor_itr = neighbor_list_.begin();
  418.        neighbor_itr != neighbor_list_.end(); ++neighbor_itr){
  419.     if ((*neighbor_itr)->id == id)
  420.       break;
  421.   }
  422.   if (neighbor_itr == neighbor_list_.end()){
  423.     // This is a new neighbor
  424.     neighbor_entry = new NeighborEntry(id);
  425.     neighbor_list_.push_front(neighbor_entry);
  426.   }
  427.   else{
  428.     // Just update the neighbor timeout
  429.     GetTime(&((*neighbor_itr)->tmv));
  430.   }
  431. }
  432. FilterEntry * DiffusionCoreAgent::findFilter(int16_t handle, u_int16_t agent)
  433. {
  434.   FilterList::iterator filter_itr;
  435.   FilterEntry *filter_entry;
  436.   for (filter_itr = filter_list_.begin();
  437.        filter_itr != filter_list_.end(); ++filter_itr){
  438.     filter_entry = *filter_itr;
  439.     if (handle != filter_entry->handle_ || agent != filter_entry->agent_)
  440.       continue;
  441.     // Found
  442.     return filter_entry;
  443.   }
  444.   return NULL;
  445. }
  446. FilterEntry * DiffusionCoreAgent::deleteFilter(int16_t handle, u_int16_t agent)
  447. {
  448.   FilterList::iterator filter_itr = filter_list_.begin();
  449.   FilterEntry *filter_entry = NULL;
  450.   while (filter_itr != filter_list_.end()){
  451.     filter_entry = *filter_itr;
  452.     if (handle == filter_entry->handle_ && agent == filter_entry->agent_){
  453.       filter_list_.erase(filter_itr);
  454.       break;
  455.     }
  456.     filter_entry = NULL;
  457.     filter_itr++;
  458.   }
  459.   return filter_entry;
  460. }
  461. bool DiffusionCoreAgent::addFilter(NRAttrVec *attrs, u_int16_t agent,
  462.    int16_t handle, u_int16_t priority)
  463. {
  464.   FilterList::iterator filter_itr;
  465.   FilterEntry *filter_entry;
  466.   filter_itr = filter_list_.begin();
  467.   while (filter_itr != filter_list_.end()){
  468.     filter_entry = *filter_itr;
  469.     if (filter_entry->priority_ == priority)
  470.       return false;
  471.     filter_itr++;
  472.   }
  473.   filter_entry = new FilterEntry(handle, priority, agent);
  474.   // Copy the Attribute Vector
  475.   filter_entry->filter_attrs_ = CopyAttrs(attrs);
  476.   // Add this filter to the filter list
  477.   filter_list_.push_back(filter_entry);
  478.   return true;
  479. }
  480. FilterList::iterator DiffusionCoreAgent::findMatchingFilter(NRAttrVec *attrs,
  481.     FilterList::iterator filter_itr)
  482. {
  483.   FilterEntry *filter_entry;
  484.   for (;filter_itr != filter_list_.end(); ++filter_itr){
  485.     filter_entry = *filter_itr;
  486.     if (OneWayMatch(filter_entry->filter_attrs_, attrs)){
  487.       // That's a match !
  488.       break;
  489.     }
  490.   }
  491.   return filter_itr;
  492. }
  493. bool DiffusionCoreAgent::restoreOriginalHeader(Message *msg)
  494. {
  495.   NRAttrVec::iterator attr_itr = msg->msg_attr_vec_->begin();
  496.   NRSimpleAttribute<void *> *original_header_attr = NULL;
  497.   RedirectMessage *original_hdr;
  498.   // Find original Header
  499.   original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
  500.    attr_itr, &attr_itr);
  501.   if (!original_header_attr){
  502.     DiffPrint(DEBUG_ALWAYS, "Error: DiffusionCoreAgent::ProcessControlMessage couldn't find the OriginalHdrAttr !n");
  503.     return false;
  504.   }
  505.   // Restore original Header
  506.   original_hdr = (RedirectMessage *) original_header_attr->getVal();
  507.   msg->msg_type_ = original_hdr->msg_type_;
  508.   msg->source_port_ = original_hdr->source_port_;
  509.   msg->pkt_num_ = original_hdr->pkt_num_;
  510.   msg->rdm_id_ = original_hdr->rdm_id_;
  511.   msg->next_hop_ = original_hdr->next_hop_;
  512.   msg->last_hop_ = original_hdr->last_hop_;
  513.   msg->new_message_ = original_hdr->new_message_;
  514.   msg->num_attr_ = original_hdr->num_attr_;
  515.   msg->data_len_ = original_hdr->data_len_;
  516.   msg->next_port_ = original_hdr->next_port_;
  517.   // Delete attribute from original set
  518.   msg->msg_attr_vec_->erase(attr_itr);
  519.   delete original_header_attr;
  520.   return true;
  521. }
  522. FilterList * DiffusionCoreAgent::getFilterList(NRAttrVec *attrs)
  523. {
  524.   FilterList *matching_filter_list = new FilterList;
  525.   FilterList::iterator known_filters_itr, filter_list_itr;
  526.   FilterEntry *matching_filter_entry, *filter_entry;
  527.   // We need to come up with a list of filters to call
  528.   // F1 will be called before F2 if F1->priority > F2->priority
  529.   known_filters_itr = findMatchingFilter(attrs, filter_list_.begin());
  530.   while (known_filters_itr != filter_list_.end()){
  531.     // We have a match !
  532.     matching_filter_entry = *known_filters_itr;
  533.     for (filter_list_itr = matching_filter_list->begin();
  534.  filter_list_itr != matching_filter_list->end(); ++filter_list_itr){
  535.       filter_entry = *filter_list_itr;
  536.       // Figure out where to insert 
  537.       if (matching_filter_entry->priority_ > filter_entry->priority_)
  538. break;
  539.     }
  540.     // Insert matching filter in the list
  541.     matching_filter_list->insert(filter_list_itr, matching_filter_entry);
  542.     // Continue the search
  543.     known_filters_itr++;
  544.     known_filters_itr = findMatchingFilter(attrs, known_filters_itr);
  545.   }
  546.   return matching_filter_list;
  547. }
  548. u_int16_t DiffusionCoreAgent::getNextFilterPriority(int16_t handle,
  549.     u_int16_t priority,
  550.     u_int16_t agent)
  551. {
  552.   FilterList::iterator filter_itr;
  553.   FilterEntry *filter_entry;
  554.   if ((priority < FILTER_MIN_PRIORITY) ||
  555.       (priority > FILTER_KEEP_PRIORITY))
  556.     return FILTER_INVALID_PRIORITY;
  557.   if (priority < FILTER_KEEP_PRIORITY)
  558.     return (priority - 1);
  559.   filter_itr = filter_list_.begin();
  560.   while (filter_itr != filter_list_.end()){
  561.     filter_entry = *filter_itr;
  562.     if ((filter_entry->handle_ == handle) && (filter_entry->agent_ == agent)){
  563.       // Found this filter
  564.       return (filter_entry->priority_ - 1);
  565.     }
  566.     filter_itr++;
  567.   }
  568.   return FILTER_INVALID_PRIORITY;
  569. }
  570. void DiffusionCoreAgent::processMessage(Message *msg)
  571. {
  572.   FilterList *filter_list;
  573.   FilterList::iterator filter_list_itr;
  574.   FilterEntry *filter_entry;
  575.   filter_list = getFilterList(msg->msg_attr_vec_);
  576.   // Ok, we have a list of Filters to call. Send this message
  577.   // to the first filter on this list
  578.   if (filter_list->size() > 0){
  579.     filter_list_itr = filter_list->begin();
  580.     filter_entry = *filter_list_itr;
  581.     forwardMessage(msg, filter_entry);
  582.     filter_list->clear();
  583.   }
  584.   delete filter_list;
  585. }
  586. void DiffusionCoreAgent::processControlMessage(Message *msg)
  587. {
  588.   NRSimpleAttribute<void *> *ctrl_msg_attr = NULL;
  589.   NRAttrVec::iterator attr_itr;
  590.   ControlMessage *control_blob = NULL;
  591.   FilterList *filter_list;
  592.   FilterList::iterator filter_list_itr;
  593.   FilterEntry *filter_entry;
  594.   int command, param1, param2;
  595.   u_int16_t priority, source_port, new_priority;
  596.   int16_t handle;
  597.   bool filter_is_last = false;
  598.   // Control messages should not come from other nodes
  599.   if (msg->last_hop_ != LOCALHOST_ADDR){
  600.     DiffPrint(DEBUG_ALWAYS,
  601.       "Error: Received control message from another node !n");
  602.     return;
  603.   }
  604.   // Find the control attribute
  605.   attr_itr = msg->msg_attr_vec_->begin();
  606.   ctrl_msg_attr = ControlMsgAttr.find_from(msg->msg_attr_vec_,
  607.    attr_itr, &attr_itr);
  608.   if (!ctrl_msg_attr){
  609.     // Control message is invalid
  610.     DiffPrint(DEBUG_ALWAYS, "Error: Control message received is invalid !n");
  611.     return;
  612.   }
  613.   // Extract the control message info
  614.   control_blob = (ControlMessage *) ctrl_msg_attr->getVal();
  615.   command = control_blob->command_;
  616.   param1 = control_blob->param1_;
  617.   param2 = control_blob->param2_;
  618.   // Filter API definitions
  619.   //
  620.   // command = ADD_UPDATE_FILTER
  621.   // param1  = priority
  622.   // param2  = handle
  623.   // attrs   = other attrs specify the filter
  624.   // 
  625.   // Remarks: If this filter is already present for this module,
  626.   //          we don't create a new one. A filter is identified
  627.   //          by the handle and the originating agent. The filter
  628.   //          gets refreshed if it already exists. If attrs and
  629.   //          handle are the same, we update the priority.
  630.   //
  631.   //
  632.   // command = REMOVE_FILTER
  633.   // param1  = handle
  634.   //
  635.   // Remarks: Remove the filter identified by (agent, handle)
  636.   //          If it's not found, a warning message is generated.
  637.   //
  638.   //
  639.   // Remarks: Send message from a local App to another App or
  640.   //          a neighbor. If agent_id is zero, the packet goes
  641.   //          out to the network. Otherwise, it goes to the
  642.   //          agent_id located on this node.
  643.   //
  644.   //
  645.   // command = SEND_MESSAGE
  646.   // param1  = handle
  647.   // param2  = priority
  648.   //
  649.   // Remarks: Send this message to the next filter or to a local
  650.   //          application. We have to assemble the list again
  651.   //          and figure out the current agent's position on the
  652.   //          list. Then, we send to the next guy. If there is
  653.   //          no other filter in the list, we try to send it to
  654.   //          the network, if next_hop contains a node address.
  655.   logControlMessage(msg, command, param1, param2);
  656.   // First we remove the control attribute from the message
  657.   msg->msg_attr_vec_->erase(attr_itr);
  658.   delete ctrl_msg_attr;
  659.   switch(command){
  660.   case ADD_UPDATE_FILTER:
  661.     priority = param1;
  662.     handle = param2;
  663.     filter_entry = findFilter(handle, msg->source_port_);
  664.     if (filter_entry){
  665.       // Filter already present, must be an update message
  666.       if (PerfectMatch(filter_entry->filter_attrs_, msg->msg_attr_vec_)){
  667. // Attrs also match, let's update the filter's timeout
  668. GetTime(&(filter_entry->tmv_));
  669. // Check if the priority has changed...
  670. if (priority == filter_entry->priority_){
  671.   // Nothing to do !
  672.   DiffPrint(DEBUG_SOME_DETAILS, "Filter %d, %d, %d refreshed.n",
  673.     filter_entry->agent_, filter_entry->handle_,
  674.     filter_entry->priority_);
  675. }
  676. else{
  677.   // Update the priority
  678.   DiffPrint(DEBUG_NO_DETAILS,
  679.     "Updated priority of filter %d, %d, %d to %dn",
  680.     msg->source_port_, handle, filter_entry->priority_, priority);
  681.   filter_entry->priority_ = priority;
  682. }
  683. break;
  684.       }
  685.       else{
  686. // Filter attributes have changed ! This is not allowed !
  687. DiffPrint(DEBUG_ALWAYS,
  688.   "Filter attributes cannot change during an update !n");
  689. break;
  690.       }
  691.     }
  692.     else{
  693.       // This is a new filter
  694.       if (!addFilter(msg->msg_attr_vec_, msg->source_port_, handle, priority)){
  695. DiffPrint(DEBUG_ALWAYS, "Failed to add filter %d, %d, %dn",
  696.   msg->source_port_, handle, priority);
  697.       }
  698.       else{
  699. DiffPrint(DEBUG_NO_DETAILS, "Adding filter %d, %d, %dn",
  700.   msg->source_port_, handle, priority);
  701.       }
  702.     }
  703.     break;
  704.   case REMOVE_FILTER:
  705.     handle = param1;
  706.     filter_entry = deleteFilter(handle, msg->source_port_);
  707.     if (filter_entry){
  708.       // Filter deleted
  709.       DiffPrint(DEBUG_NO_DETAILS, "Filter %d, %d, %d deleted.n",
  710. filter_entry->agent_, filter_entry->handle_,
  711. filter_entry->priority_);
  712.       delete filter_entry;
  713.     }
  714.     else{
  715.       DiffPrint(DEBUG_ALWAYS, "Couldn't find filter to delete !n");
  716.     }
  717.     break;
  718.   case SEND_MESSAGE:
  719.     handle = param1;
  720.     priority = param2;
  721.     source_port = msg->source_port_;
  722.     if (!restoreOriginalHeader(msg))
  723.       break;
  724.     new_priority = getNextFilterPriority(handle, priority, source_port);
  725.     if (new_priority == FILTER_INVALID_PRIORITY)
  726.       break;
  727.     // Now process the incoming message
  728.     filter_list = getFilterList(msg->msg_attr_vec_);
  729.     // Find the filter after the 'current' filter on the list
  730.     if (filter_list->size() > 0){
  731.       for (filter_list_itr = filter_list->begin();
  732.    filter_list_itr != filter_list->end(); ++filter_list_itr){
  733. filter_entry = *filter_list_itr;
  734. if (filter_entry->priority_ <= new_priority){
  735.   forwardMessage(msg, filter_entry);
  736.   break;
  737. }
  738.       }
  739.       if (filter_list_itr == filter_list->end())
  740. filter_is_last = true;
  741.     }
  742.     else{
  743.       filter_is_last = true;
  744.     }
  745.     if (filter_is_last){
  746.       // Forward message to the network or the destination application
  747.       sendMessage(msg);
  748.     }
  749.     filter_list->clear();
  750.     delete filter_list;
  751.     break;
  752.   case ADD_TO_BLACKLIST:
  753.     DiffPrint(DEBUG_IMPORTANT, "Diffusion: Adding node %d to blacklist !n",
  754.       param1);
  755.     black_list_.push_front(param1);
  756.     break;
  757.   case CLEAR_BLACKLIST:
  758.     DiffPrint(DEBUG_IMPORTANT, "Diffusion: Clearing blacklist !n");
  759.     black_list_.clear();
  760.     break;
  761.   default:
  762.     DiffPrint(DEBUG_ALWAYS, "Error: Unknown control message received !n");
  763.     break;
  764.   }
  765. }
  766. void DiffusionCoreAgent::logControlMessage(Message *msg, int command,
  767.    int param1, int param2)
  768. {
  769.   // Logs the incoming message
  770. }
  771. #ifdef NS_DIFFUSION
  772. DiffusionCoreAgent::DiffusionCoreAgent(DiffRoutingAgent *diffrtg, int nodeid)
  773. {
  774. #else
  775. DiffusionCoreAgent::DiffusionCoreAgent(int argc, char **argv)
  776. {
  777.   int opt;
  778.   int debug_level;
  779. #endif // NS_DIFFUSION
  780.   DeviceList *in_devices, *out_devices, *local_out_devices;
  781.   DiffusionIO *device;
  782.   TimerCallback *callback;
  783.   char *node_id_env;
  784. #ifdef USE_EMSIM
  785.   char *sim_id = getenv("SIM_ID");
  786.   char *sim_group = getenv("SIM_GROUP");
  787.   int32_t group_id;
  788. #endif // USE_EMSIM
  789.   long stop_time;
  790.   struct timeval tv;
  791. #ifdef IO_LOG
  792.   IOLog *pseudo_io_device;
  793.   bool use_io_log = false;
  794. #endif // IO_LOG
  795. #ifdef STATS
  796.   bool use_io_stats = true;
  797.   int stats_warm_up_time = 0;
  798. #endif // STATS
  799.   //bool node_id_configured = false;
  800.   opterr = 0;
  801.   config_file_ = NULL;
  802.   stop_time = 0;
  803.   node_id_env = getenv("node_addr");
  804.   diffusion_port_ = DEFAULT_DIFFUSION_PORT;
  805. #ifndef NS_DIFFUSION
  806.   // Parse command line options
  807.   while (1){
  808.     opt = getopt(argc, argv, COMMAND_LINE_ARGS);
  809.     switch(opt){
  810.     case 'p':
  811.       diffusion_port_ = (u_int16_t) atoi(optarg);
  812.       if ((diffusion_port_ < 1024) || (diffusion_port_ >= 65535)){
  813. DiffPrint(DEBUG_ALWAYS,
  814.   "Diffusion Error: Port must be between 1024 and 65535 !n");
  815. exit(-1);
  816.       }
  817.       break;
  818.     case 't':
  819.       stop_time = atol(optarg);
  820.       if (stop_time <= 0){
  821. DiffPrint(DEBUG_ALWAYS, "Diffusion Error: stop time must be > 0n");
  822. exit(-1);
  823.       }
  824.       else{
  825. DiffPrint(DEBUG_ALWAYS, "%s will stop after %ld secondsn",
  826.   PROGRAM, stop_time);
  827.       }
  828.       break;
  829. #ifdef IO_LOG
  830.     case 'l':
  831.       use_io_log = true;
  832.       break;
  833. #endif // IO_LOG
  834. #ifdef STATS
  835.     case 's':
  836.       use_io_stats = false;
  837.       break;
  838.     case 'i':
  839.       stats_warm_up_time = atoi(optarg);
  840.       if (stats_warm_up_time < 0){
  841. DiffPrint(DEBUG_ALWAYS, "Diffusion Error: warm_up_time must be > 0n");
  842. exit(-1);
  843.       }
  844.       break;
  845. #endif // STATS
  846.     case 'h':
  847.       usage(argv[0]);
  848.       break;
  849.     case 'v':
  850.       DiffPrint(DEBUG_ALWAYS, "n%s %sn", PROGRAM, RELEASE);
  851.       exit(0);
  852.       break;
  853.     case 'd':
  854.       debug_level = atoi(optarg);
  855.       if (debug_level < 1 || debug_level > 10){
  856. DiffPrint(DEBUG_ALWAYS,
  857.   "Error: Debug level outside range or missing !n");
  858. usage(argv[0]);
  859.       }
  860.       global_debug_level = debug_level;
  861.       break;
  862.     case 'f':
  863.       if (!strncasecmp(optarg, "-", 1)){
  864. DiffPrint(DEBUG_ALWAYS, "Error: Parameter is missing !n");
  865. usage(argv[0]);
  866.       }
  867.       config_file_ = strdup(optarg);
  868.       break;
  869.     case '?':
  870.       DiffPrint(DEBUG_ALWAYS,
  871. "Error: %c isn't a valid option or its parameter is missing !n",
  872. optopt);
  873.       usage(argv[0]);
  874.       break;
  875.     case ':':
  876.       DiffPrint(DEBUG_ALWAYS, "Parameter missing !n");
  877.       usage(argv[0]);
  878.       break;
  879.     }
  880.     if (opt == -1)
  881.       break;
  882.   }
  883.   if (!config_file_)
  884.     config_file_ = strdup(DEFAULT_CONFIG_FILE);
  885.   // Get diffusion ID
  886.   if (!node_id_configured){
  887.     // Try to get id from environment variable
  888.     if (node_id_env != NULL){
  889.       my_id_ = atoi(node_id_env);
  890.       node_id_configured = true;
  891.     }
  892.   }
  893. #ifdef USE_EMSIM
  894.   if (!node_id_configured){
  895.     // Try to read groups and node id from emsim environment variables
  896.     if (sim_id && sim_group){
  897.       my_id_ = atoi(sim_id);
  898.       group_id = atoi(sim_group);
  899.       diffusion_port_ = diffusion_port_ + my_id_ + (100 * group_id);
  900.       node_id_configured = true;
  901.     }
  902.   }
  903. #endif // USE_EMSIM
  904.   // Use random node id if user has not specified it
  905.   if (!node_id_configured){
  906.     DiffPrint(DEBUG_ALWAYS, "Diffusion : node_addr not set. Using random id.n");
  907.     // Generate random ID
  908.     do{
  909.       GetTime(&tv);
  910.       SetSeed(&tv);
  911.       my_id_ = GetRand();
  912.     }
  913.     while(my_id_ == LOCALHOST_ADDR || my_id_ == BROADCAST_ADDR);
  914.   }
  915. #else
  916.   my_id_ = nodeid;
  917. #endif // !NS_DIFFUSION
  918.   // Initialize variables
  919.   lon_ = 0.0;
  920.   lat_ = 0.0;
  921. #ifdef STATS
  922.   if (use_io_stats)
  923.     stats_ = new DiffusionStats(my_id_, stats_warm_up_time);
  924.   else
  925.     stats_ = NULL;
  926. #endif // STATS
  927.   GetTime(&tv);
  928.   SetSeed(&tv);
  929.   pkt_count_ = GetRand();
  930.   random_id_ = GetRand();
  931.   Tcl_InitHashTable(&htable_, 2);
  932.   // Initialize EventQueue
  933.   timers_manager_ = new TimerManager;
  934.   // Create regular timers
  935.   callback = new NeighborsTimeoutTimer(this);
  936.   timers_manager_->addTimer(NEIGHBORS_DELAY, callback);
  937.   callback = new FilterTimeoutTimer(this);
  938.   timers_manager_->addTimer(FILTER_DELAY, callback);
  939.   if (stop_time > 0){
  940.     callback = new DiffusionStopTimer(this);
  941.     timers_manager_->addTimer((stop_time * 1000), callback);
  942.   }
  943.   GetTime(&tv);
  944.   // Print Initialization message
  945.   DiffPrint(DEBUG_ALWAYS, "Diffusion : starting at time %ld:%ldn",
  946.     tv.tv_sec, tv.tv_usec);
  947.   DiffPrint(DEBUG_ALWAYS, "Diffusion : Node id = %dn", my_id_);
  948.   // Initialize diffusion io devices
  949. #ifdef IO_LOG
  950.   if (use_io_log){
  951.     pseudo_io_device = new IOLog(my_id_);
  952.     in_devices_.push_back(pseudo_io_device);
  953.     out_devices_.push_back(pseudo_io_device);
  954.     in_devices = &(pseudo_io_device->in_devices_);
  955.     out_devices = &(pseudo_io_device->out_devices_);
  956.     local_out_devices = &(local_out_devices_);
  957.   }
  958.   else{
  959.     in_devices = &(in_devices_);
  960.     out_devices = &(out_devices_);
  961.     local_out_devices = &(local_out_devices_);
  962.   }
  963. #else
  964.   in_devices = &(in_devices_);
  965.   out_devices = &(out_devices_);
  966.   local_out_devices = &(local_out_devices_);
  967. #endif // IO_LOG
  968. #ifdef NS_DIFFUSION
  969.   device = new LocalApp(diffrtg);
  970.   local_out_devices->push_back(device);
  971.   device = new LinkLayerAbs(diffrtg);
  972.   out_devices->push_back(device);
  973. #endif // NS_DIFFUSION
  974. #ifdef UDP
  975.   device = new UDPLocal(&diffusion_port_);
  976.   in_devices->push_back(device);
  977.   local_out_devices->push_back(device);
  978. #ifdef WIRED
  979.   device = new UDPWired(config_file_);
  980.   out_devices->push_back(device);
  981. #endif // WIRED
  982. #endif // UDP
  983. #ifdef USE_RPC
  984.   device = new RPCIO();
  985.   in_devices->push_back(device);
  986.   out_devices->push_back(device);
  987. #endif // USE_RPC
  988. #ifdef USE_MOTE_NIC
  989.   device = new MOTEIO();
  990.   in_devices->push_back(device);
  991.   out_devices->push_back(device);
  992. #endif // USE_MOTE_NIC
  993. #ifdef USE_SMAC
  994.   device = new SMAC();
  995.   in_devices->push_back(device);
  996.   out_devices->push_back(device);
  997. #endif // USE_SMAC
  998. #ifdef USE_EMSTAR
  999. #ifdef USE_EMSIM
  1000.   device = new Emstar(my_id_, group_id, true);
  1001. #else
  1002.   device = new Emstar();
  1003. #endif // USE_EMSIM
  1004.   in_devices->push_back(device);
  1005.   out_devices->push_back(device);
  1006. #endif // USE_EMSTAR
  1007. #ifdef USE_WINSNG2
  1008.   device = new WINSNG2();
  1009.   in_devices->push_back(device);
  1010.   out_devices->push_back(device);
  1011. #endif // USE_WINSNG2
  1012. }
  1013. HashEntry * DiffusionCoreAgent::getHash(unsigned int pkt_num,
  1014.  unsigned int rdm_id)
  1015. {
  1016.   unsigned int key[2];
  1017.   key[0] = pkt_num;
  1018.   key[1] = rdm_id;
  1019.   Tcl_HashEntry *entryPtr = Tcl_FindHashEntry(&htable_, (char *)key);
  1020.   if (entryPtr == NULL)
  1021.     return NULL;
  1022.   return (HashEntry *)Tcl_GetHashValue(entryPtr);
  1023. }
  1024. void DiffusionCoreAgent::putHash(unsigned int pkt_num,
  1025.  unsigned int rdm_id)
  1026. {
  1027.   Tcl_HashEntry *tcl_hash_entry;
  1028.   HashEntry *hash_entry;
  1029.   HashList::iterator hash_itr;
  1030.   unsigned int key[2];
  1031.   int new_hash_key;
  1032.   if (hash_list_.size() == HASH_TABLE_MAX_SIZE){
  1033.     // Hash table reached maximum size
  1034.     for (int i = 0; ((i < HASH_TABLE_REMOVE_AT_ONCE)
  1035.      && (hash_list_.size() > 0)); i++){
  1036.       hash_itr = hash_list_.begin();
  1037.       tcl_hash_entry = *hash_itr;
  1038.       hash_entry = (HashEntry *) Tcl_GetHashValue(tcl_hash_entry);
  1039.       delete hash_entry;
  1040.       hash_list_.erase(hash_itr);
  1041.       Tcl_DeleteHashEntry(tcl_hash_entry);
  1042.     }
  1043.   }
  1044.   key[0] = pkt_num;
  1045.   key[1] = rdm_id;
  1046.   tcl_hash_entry = Tcl_CreateHashEntry(&htable_, (char *)key, &new_hash_key);
  1047.   if (new_hash_key == 0){
  1048.     DiffPrint(DEBUG_IMPORTANT, "Key already exists in hash !n");
  1049.     return;
  1050.   }
  1051.   hash_entry = new HashEntry;
  1052.   Tcl_SetHashValue(tcl_hash_entry, hash_entry);
  1053.   hash_list_.push_back(tcl_hash_entry);
  1054. }
  1055. #ifndef NS_DIFFUSION
  1056. void DiffusionCoreAgent::recvPacket(DiffPacket pkt)
  1057. {
  1058.   struct hdr_diff *dfh = HDR_DIFF(pkt);
  1059.   Message *rcv_message = NULL;
  1060.   int8_t version, msg_type;
  1061.   u_int16_t data_len, num_attr, source_port;
  1062.   int32_t rdm_id, pkt_num, next_hop, last_hop;   
  1063.   // Read header
  1064.   version = DIFF_VER(dfh);
  1065.   msg_type = MSG_TYPE(dfh);
  1066.   source_port = ntohs(SRC_PORT(dfh));
  1067.   pkt_num = ntohl(PKT_NUM(dfh));
  1068.   rdm_id = ntohl(RDM_ID(dfh));
  1069.   num_attr = ntohs(NUM_ATTR(dfh));
  1070.   next_hop = ntohl(NEXT_HOP(dfh));
  1071.   last_hop = ntohl(LAST_HOP(dfh));
  1072.   data_len = ntohs(DATA_LEN(dfh));
  1073.   // Packet is good, create a message
  1074.   rcv_message = new Message(version, msg_type, source_port, data_len,
  1075.     num_attr, pkt_num, rdm_id, next_hop, last_hop);
  1076.   // Read all attributes into the Message structure
  1077.   rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
  1078.   // Process the incoming message
  1079.   recvMessage(rcv_message);
  1080.   // Don't forget to message when we're done
  1081.   delete rcv_message;
  1082.   delete [] pkt;
  1083. }
  1084. #endif // !NS_DIFFUSION
  1085. void DiffusionCoreAgent::recvMessage(Message *msg)
  1086. {
  1087.   BlackList::iterator black_list_itr;
  1088.   Tcl_HashEntry *tcl_hash_entry;
  1089.   unsigned int key[2];
  1090.   // Check version
  1091.   if (msg->version_ != DIFFUSION_VERSION)
  1092.     return;
  1093.   // Check for ID conflict
  1094.   if (msg->last_hop_ == my_id_){
  1095.     DiffPrint(DEBUG_ALWAYS, "Error: A diffusion ID conflict has been detected !n");
  1096.     exit(-1);
  1097.   }
  1098.   // Address filtering
  1099.   if ((msg->next_hop_ != BROADCAST_ADDR) &&
  1100.       (msg->next_hop_ != LOCALHOST_ADDR) &&
  1101.       (msg->next_hop_ != my_id_))
  1102.     return;
  1103.   // Blacklist filtering
  1104.   black_list_itr = black_list_.begin();
  1105.   while (black_list_itr != black_list_.end()){
  1106.     if (*black_list_itr == msg->last_hop_){
  1107.       DiffPrint(DEBUG_DETAILS, "Ignoring message from blacklisted node %d !n",
  1108. msg->last_hop_);
  1109.       return;
  1110.     }
  1111.     black_list_itr++;
  1112.   }
  1113.   // Control Messages are unique and don't go to the hash
  1114.   if (msg->msg_type_ != CONTROL){
  1115.     // Hash table keeps info about packets
  1116.   
  1117.     key[0] = msg->pkt_num_;
  1118.     key[1] = msg->rdm_id_;
  1119.     tcl_hash_entry = Tcl_FindHashEntry(&htable_, (char *) key);
  1120.     if (tcl_hash_entry != NULL){
  1121.       DiffPrint(DEBUG_DETAILS, "Node%d: Received old message !n", my_id_);
  1122.       msg->new_message_ = 0;
  1123.     }
  1124.     else{
  1125.       // Add message to the hash table
  1126.       putHash(key[0], key[1]);
  1127.       msg->new_message_ = 1;
  1128.     }
  1129.   }
  1130. #ifdef STATS
  1131.   if (stats_)
  1132.     stats_->logIncomingMessage(msg);
  1133. #endif // STATS
  1134.   // Check if it's a control of a regular message
  1135.   if (msg->msg_type_ == CONTROL)
  1136.     processControlMessage(msg);
  1137.   else
  1138.     processMessage(msg);
  1139. }
  1140. #ifndef USE_SINGLE_ADDRESS_SPACE
  1141. int main(int argc, char **argv)
  1142. {
  1143.   agent = new DiffusionCoreAgent(argc, argv);
  1144.   signal(SIGINT, signal_handler);
  1145.   agent->run();
  1146.   return 0;
  1147. }
  1148. #endif // !USE_SINGLE_ADDRESS_SPACE