dr.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:40k
- //
- // dr.cc : Diffusion Routing Class
- // authors : John Heidemann and Fabio Silva
- //
- // Copyright (C) 2000-2003 by the University of Southern California
- // $Id: dr.cc,v 1.17 2005/09/13 04:53:49 tomh Exp $
- //
- // This program is free software; you can redistribute it and/or
- // modify it under the terms of the GNU General Public License,
- // version 2, as published by the Free Software Foundation.
- //
- // This program is distributed in the hope that it will be useful,
- // but WITHOUT ANY WARRANTY; without even the implied warranty of
- // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- // GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License along
- // with this program; if not, write to the Free Software Foundation, Inc.,
- // 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
- //
- // Linking this file statically or dynamically with other modules is making
- // a combined work based on this file. Thus, the terms and conditions of
- // the GNU General Public License cover the whole combination.
- //
- // In addition, as a special exception, the copyright holders of this file
- // give you permission to combine this file with free software programs or
- // libraries that are released under the GNU LGPL and with code included in
- // the standard release of ns-2 under the Apache 2.0 license or under
- // otherwise-compatible licenses with advertising requirements (or modified
- // versions of such code, with unchanged license). You may copy and
- // distribute such a system following the terms of the GNU GPL for this
- // file and the licenses of the other code concerned, provided that you
- // include the source code of that other code when and as the GNU GPL
- // requires distribution of source code.
- //
- // Note that people who make modified versions of this file are not
- // obligated to grant this special exception for their modified versions;
- // it is their choice whether to do so. The GNU General Public License
- // gives permission to release a modified version without this exception;
- // this exception also makes it possible to release a modified version
- // which carries forward this exception.
- //
- #include <stdlib.h>
- #include <stdio.h>
- #include "dr.hh"
- class CallbackEntry {
- public:
- NR::Callback *cb_;
- NR::handle subscription_handle_;
- CallbackEntry(NR::Callback *cb, NR::handle subscription_handle) :
- cb_(cb), subscription_handle_(subscription_handle) {};
- };
- class HandleEntry {
- public:
- handle hdl_;
- bool valid_;
- NRAttrVec *attrs_;
- NR::Callback *cb_;
- struct timeval exploratory_time_;
- int32_t subscription_id_; // Used for One-Phase Pull
- HandleEntry()
- {
- GetTime(&exploratory_time_);
- valid_ = true;
- cb_ = NULL;
- };
- ~HandleEntry(){
- ClearAttrs(attrs_);
- delete attrs_;
- };
- };
- int InterestCallback::expire()
- {
- int retval;
- // Call the interestTimeout function
- retval = drt_->interestTimeout(handle_entry_);
- if (retval < 0)
- delete this;
- return retval;
- }
- int FilterKeepaliveCallback::expire()
- {
- int retval;
- // Call the filterTimeout function
- retval = drt_->filterKeepaliveTimeout(filter_entry_);
- if (retval < 0)
- delete this;
- return retval;
- }
- int OldAPITimer::expire()
- {
- int retval;
- // Call the callback function with the provided API
- retval = cb_->expire(0, p_);
- if (retval < 0)
- delete this;
- return retval;
- }
- #ifdef NS_DIFFUSION
- class DiffEventQueue;
- int DiffusionRouting::getNodeId() {
- return node_->address();
- }
- int DiffusionRouting::getAgentId(int id) {
- if (id != -1)
- agent_id_ = id;
- return agent_id_;
- }
- NR * NR::create_ns_NR(u_int16_t port, DiffAppAgent *da) {
- return(new DiffusionRouting(port, da));
- }
- #else
- NR *dr = NULL;
- #ifdef USE_THREADS
- void * ReceiveThread(void *dr)
- {
- // Never returns
- ((DiffusionRouting *)dr)->run(true, WAIT_FOREVER);
- return NULL;
- }
- #endif // USE_THREADS
- NR * NR::createNR(u_int16_t port)
- {
- // Create Diffusion Routing Class
- if (dr)
- return dr;
- dr = new DiffusionRouting(port);
- #ifdef USE_THREADS
- int retval;
- pthread_t thread;
- // Fork a thread for receiving Messages
- retval = pthread_create(&thread, NULL, &ReceiveThread, (void *)dr);
- if (retval){
- DiffPrint(DEBUG_ALWAYS, "Error creating receiving thread ! Aborting...n");
- exit(-1);
- }
- #endif // USE_THREADS
- return dr;
- }
- #endif // NS_DIFFUSION
- void GetLock(pthread_mutex_t *mutex)
- {
- #ifdef USE_THREADS
- pthread_mutex_lock(mutex);
- #endif // USE_THREADS
- }
- void ReleaseLock(pthread_mutex_t *mutex)
- {
- #ifdef USE_THREADS
- pthread_mutex_unlock(mutex);
- #endif // USE_THREADS
- }
- #ifdef NS_DIFFUSION
- DiffusionRouting::DiffusionRouting(u_int16_t port, DiffAppAgent *da)
- {
- #else
- DiffusionRouting::DiffusionRouting(u_int16_t port)
- {
- #ifdef USE_EMSIM
- char *sim_id;
- char *sim_group;
- #endif // USE_EMSIM
- #endif // NS_DIFFUSION
- struct timeval tv;
- DiffusionIO *device;
- // Initialize basic stuff
- next_handle_ = 1;
- GetTime(&tv);
- SetSeed(&tv);
- pkt_count_ = GetRand();
- random_id_ = GetRand();
- agent_id_ = 0;
- if (port == 0)
- port = DEFAULT_DIFFUSION_PORT;
- diffusion_port_ = port;
- #ifdef USE_EMSIM
- // Check if we are running in the emstar simulator
- sim_id = getenv("SIM_ID");
- sim_group = getenv("SIM_GROUP");
- // Update diffusion port if running inside the simulator
- if (sim_id && sim_group){
- diffusion_port_ = diffusion_port_ + atoi(sim_id) + (100 * atoi(sim_group));
- }
- #endif // USE_EMSIM
- // Initialize timer manager
- timers_manager_ = new TimerManager;
- // Initialize input device
- #ifdef NS_DIFFUSION
- device = new NsLocal(da);
- local_out_devices_.push_back(device);
- #endif // NS_DIFFUSION
- #ifdef UDP
- device = new UDPLocal(&agent_id_);
- in_devices_.push_back(device);
- local_out_devices_.push_back(device);
- #endif // UDP
- // Print initialization message
- DiffPrint(DEBUG_ALWAYS,
- "Diffusion Routing Agent initializing... Agent Id = %dn",
- agent_id_);
- #ifdef USE_THREADS
- // Initialize Semaphores
- dr_mtx_ = new pthread_mutex_t;
- pthread_mutex_init(dr_mtx_, NULL);
- #endif // USE_THREADS
- }
- DiffusionRouting::~DiffusionRouting()
- {
- HandleList::iterator itr;
- HandleEntry *current;
- // Delete all Handles
- for (itr = sub_list_.begin(); itr != sub_list_.end(); ++itr){
- current = *itr;
- delete current;
- }
- for (itr = pub_list_.begin(); itr != pub_list_.end(); ++itr){
- current = *itr;
- delete current;
- }
- }
- handle DiffusionRouting::subscribe(NRAttrVec *subscribe_attrs, NR::Callback *cb)
- {
- NRSimpleAttribute<int> *nr_algorithm = NULL;
- TimerCallback *timer_callback;
- NRAttribute *scope_attr;
- HandleEntry *my_handle;
- // Get lock first
- GetLock(dr_mtx_);
- // Check the published attributes
- if (!checkSubscription(subscribe_attrs)){
- DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the subscribe attributes !n");
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Create and Initialize the handle_entry structute
- my_handle = new HandleEntry;
- my_handle->hdl_ = next_handle_;
- next_handle_++;
- my_handle->cb_ = (NR::Callback *) cb;
- sub_list_.push_back(my_handle);
- // Copy the attributes
- my_handle->attrs_ = CopyAttrs(subscribe_attrs);
- // For subscriptions, scope is global if not specified
- if (!hasScope(subscribe_attrs)){
- scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::GLOBAL_SCOPE);
- my_handle->attrs_->push_back(scope_attr);
- }
- // For One-Phase Pull, we need a subscription id
- nr_algorithm = NRAlgorithmAttr.find(subscribe_attrs);
- if (nr_algorithm &&
- nr_algorithm->getVal() == NRAttribute::ONE_PHASE_PULL_ALGORITHM){
- my_handle->subscription_id_ = GetRand();
- my_handle->attrs_->push_back(NRSubscriptionAttr.make(NRAttribute::IS,
- my_handle->subscription_id_));
- }
- // Create Interest Timer and add it to the queue
- timer_callback = new InterestCallback(this, my_handle);
- timers_manager_->addTimer(SMALL_TIMEOUT, timer_callback);
- // Release lock
- ReleaseLock(dr_mtx_);
- return my_handle->hdl_;
- }
- int DiffusionRouting::unsubscribe(handle subscription_handle)
- {
- HandleEntry *my_handle = NULL;
- // Get the lock first
- GetLock(dr_mtx_);
- my_handle = findHandle(subscription_handle, &sub_list_);
- if (!my_handle){
- // Handle doesn't exist, return FAIL
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Handle will be destroyed when next interest timeout happens
- my_handle->valid_ = false;
- // Release the lock
- ReleaseLock(dr_mtx_);
- return OK;
- }
- handle DiffusionRouting::publish(NRAttrVec *publish_attrs)
- {
- HandleEntry *my_handle;
- NRAttribute *scope_attr;
- // Get the lock first
- GetLock(dr_mtx_);
- // Check the published attributes
- if (!checkPublication(publish_attrs)){
- DiffPrint(DEBUG_ALWAYS, "Error : Invalid class/scope attributes in the publish attributes !n");
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Create and Initialize the handle_entry structute
- my_handle = new HandleEntry;
- my_handle->hdl_ = next_handle_;
- next_handle_++;
- pub_list_.push_back(my_handle);
- // Copy the attributes
- my_handle->attrs_ = CopyAttrs(publish_attrs);
- // For publications, scope is local if not specified
- if (!hasScope(publish_attrs)){
- scope_attr = NRScopeAttr.make(NRAttribute::IS, NRAttribute::NODE_LOCAL_SCOPE);
- my_handle->attrs_->push_back(scope_attr);
- }
- // Release the lock
- ReleaseLock(dr_mtx_);
- return my_handle->hdl_;
- }
- int DiffusionRouting::unpublish(handle publication_handle)
- {
- HandleEntry *my_handle = NULL;
- // Get the lock first
- GetLock(dr_mtx_);
- my_handle = removeHandle(publication_handle, &pub_list_);
- if (!my_handle){
- // Handle doesn't exist, return FAIL
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Free structures
- delete my_handle;
- // Release the lock
- ReleaseLock(dr_mtx_);
- return OK;
- }
- int DiffusionRouting::send(handle publication_handle,
- NRAttrVec *send_attrs)
- {
- NRSimpleAttribute<int> *nr_algorithm = NULL;
- NRSimpleAttribute<int> *rmst_id_attr = NULL;
- int8_t send_message_type = DATA;
- struct timeval current_time;
- HandleEntry *my_handle;
- Message *my_message;
- // Get the lock first
- GetLock(dr_mtx_);
- // Get attributes associated with handle
- my_handle = findHandle(publication_handle, &pub_list_);
- if (!my_handle){
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Check the send attributes
- if (!checkSend(send_attrs)){
- DiffPrint(DEBUG_ALWAYS,
- "Error : Invalid class/scope attributes in send attributes !n");
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- // Check if it is time to send another exploratory data message
- GetTime(¤t_time);
- // Check algorithms
- nr_algorithm = NRAlgorithmAttr.find(my_handle->attrs_);
- rmst_id_attr = RmstIdAttr.find(send_attrs);
- if (!nr_algorithm && !rmst_id_attr || nr_algorithm &&
- nr_algorithm->getVal() != NRAttribute::ONE_PHASE_PULL_ALGORITHM){
- // In One-Phase Pull, there are no exploratory messages
- if (TimevalCmp(¤t_time, &(my_handle->exploratory_time_)) >= 0){
- // Check if it is a push data message or a regular data message
- if (isPushData(my_handle->attrs_)){
- // Push data message
- // Update time for the next push exploratory message
- GetTime(&(my_handle->exploratory_time_));
- my_handle->exploratory_time_.tv_sec += PUSH_EXPLORATORY_DELAY;
- send_message_type = PUSH_EXPLORATORY_DATA;
- }
- else{
- // Regular data message
- // Update time for the next exploratory message
- GetTime(&(my_handle->exploratory_time_));
- my_handle->exploratory_time_.tv_sec += EXPLORATORY_DATA_DELAY;
-
- send_message_type = EXPLORATORY_DATA;
- }
- }
- }
- // Initialize message structure
- my_message = new Message(DIFFUSION_VERSION, send_message_type, agent_id_,
- 0, 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // First, we duplicate the 'publish' attributes
- my_message->msg_attr_vec_ = CopyAttrs(my_handle->attrs_);
- // Now, we add the send attributes
- AddAttrs(my_message->msg_attr_vec_, send_attrs);
- // Compute the total number and size of the joined attribute sets
- my_message->num_attr_ = my_message->msg_attr_vec_->size();
- my_message->data_len_ = CalculateSize(my_message->msg_attr_vec_);
- // Release the lock
- ReleaseLock(dr_mtx_);
- // Send Packet
- sendMessageToDiffusion(my_message);
- delete my_message;
- return OK;
- }
- int DiffusionRouting::sendRmst(handle publication_handle,
- NRAttrVec *send_attrs, int fragment_size)
- {
- NRSimpleAttribute<void *> *rmst_data_attr;
- NRSimpleAttribute<int> *frag_number_attr;
- NRSimpleAttribute<int> *max_frag_attr;
- void *frag_ptr, *blob_ptr;
- char *blob;
- timeval send_interval;
- int retval;
- int id = GetRand() % 500;
- int size;
- int num_frag;
- int max_frag_len;
- // Find RMST blob to send
- rmst_data_attr = RmstDataAttr.find(send_attrs);
- // We must have a RMST data attribute to send
- if(!rmst_data_attr){
- DiffPrint(DEBUG_ALWAYS, "sendRMST - can't find blob to send !n");
- return FAIL;
- }
- // Copy RMST blob and calculate number of fragments
- blob_ptr = rmst_data_attr->getVal();
- size = rmst_data_attr->getLen();
- blob = new char[size];
- memcpy((void *)blob, blob_ptr, size);
- num_frag = (size + fragment_size - 1) / fragment_size;
- // We index starting at zero
- num_frag--;
- max_frag_len = size - (num_frag * fragment_size);
- DiffPrint(DEBUG_DETAILS,
- "sendRMST: rmst num_frag = %d, fragment_size = %d, max_frag_len = %dn",
- num_frag, fragment_size, max_frag_len);
- // Prepare attribute vector with RMST attributes
- max_frag_attr = RmstMaxFragAttr.make(NRAttribute::IS, num_frag);
- send_attrs->push_back(max_frag_attr);
- send_attrs->push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
- frag_number_attr = RmstFragAttr.make(NRAttribute::IS, 0);
- send_attrs->push_back(frag_number_attr);
- send_attrs->push_back(RmstIdAttr.make(NRAttribute::IS, id));
- // Replace the large blob with a blob fragment
- frag_ptr = (void *)&blob[0];
- // The call to setVal will delete the original blob!!
- if (num_frag == 0)
- rmst_data_attr->setVal(frag_ptr, max_frag_len);
- else
- rmst_data_attr->setVal(frag_ptr, fragment_size);
- // Send 1st fragment
- retval = send(publication_handle, send_attrs);
- // Send other fragments
- for (int i = 1; i <= num_frag; i++){
- // Small delay between sending fragments
- send_interval.tv_sec = 0;
- send_interval.tv_usec = 25000;
- select(0, NULL, NULL, NULL, &send_interval);
- // Send next fragment
- frag_number_attr->setVal(i);
- frag_ptr = (void *)&blob[i * fragment_size];
- if (num_frag == i)
- rmst_data_attr->setVal(frag_ptr, max_frag_len);
- else
- rmst_data_attr->setVal(frag_ptr, fragment_size);
- retval = send(publication_handle, send_attrs);
- }
- ClearAttrs(send_attrs);
- delete blob;
- return OK;
- }
- int DiffusionRouting::addToBlacklist(int32_t node)
- {
- ControlMessage *control_blob;
- NRAttribute *ctrl_msg_attr;
- Message *my_message;
- NRAttrVec *attrs;
- control_blob = new ControlMessage(ADD_TO_BLACKLIST, node, 0);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
- (void *)control_blob,
- sizeof(ControlMessage));
- attrs = new NRAttrVec;
- attrs->push_back(ctrl_msg_attr);
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Send Packet
- sendMessageToDiffusion(my_message);
- // Delete message
- delete my_message;
- delete control_blob;
- return OK;
- }
- int DiffusionRouting::clearBlacklist()
- {
- ControlMessage *control_blob;
- NRAttribute *ctrl_msg_attr;
- Message *my_message;
- NRAttrVec *attrs;
-
- control_blob = new ControlMessage(CLEAR_BLACKLIST, 0, 0);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
- (void *)control_blob,
- sizeof(ControlMessage));
- attrs = new NRAttrVec;
- attrs->push_back(ctrl_msg_attr);
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Send Packet
- sendMessageToDiffusion(my_message);
- // Delete message
- delete my_message;
- delete control_blob;
-
- return OK;
- }
- handle DiffusionRouting::addFilter(NRAttrVec *filter_attrs, u_int16_t priority,
- FilterCallback *cb)
- {
- FilterEntry *filter_entry;
- NRAttrVec *attrs;
- NRAttribute *ctrl_msg_attr;
- ControlMessage *control_blob;
- Message *my_message;
- TimerCallback *timer_callback;
- // Check parameters
- if (!filter_attrs || !cb || priority < FILTER_MIN_PRIORITY || priority > FILTER_MAX_PRIORITY){
- DiffPrint(DEBUG_ALWAYS, "Received invalid parameters when adding filter !n");
- return FAIL;
- }
- // Get lock first
- GetLock(dr_mtx_);
- // Create and Initialize the handle_entry structute
- filter_entry = new FilterEntry(next_handle_, priority, agent_id_);
- next_handle_++;
- filter_entry->cb_ = (FilterCallback *) cb;
- filter_list_.push_back(filter_entry);
- // Copy attributes (keep them for matching later)
- filter_entry->filter_attrs_ = CopyAttrs(filter_attrs);
- // Copy the attributes (and add the control attr)
- attrs = CopyAttrs(filter_attrs);
- control_blob = new ControlMessage(ADD_UPDATE_FILTER,
- priority, filter_entry->handle_);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
- (void *)control_blob,
- sizeof(ControlMessage));
- attrs->push_back(ctrl_msg_attr);
- // Initialize message structure
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Release the lock
- ReleaseLock(dr_mtx_);
- // Send Packet
- sendMessageToDiffusion(my_message);
- // Add keepalive timer to the event queue
- timer_callback = new FilterKeepaliveCallback(this, filter_entry);
- timers_manager_->addTimer(FILTER_KEEPALIVE_DELAY, timer_callback);
- // Delete message, attribute set and controlblob
- delete my_message;
- delete control_blob;
- return filter_entry->handle_;
- }
- int DiffusionRouting::removeFilter(handle filter_handle)
- {
- FilterEntry *filter_entry = NULL;
- ControlMessage *control_blob;
- NRAttribute *ctrl_msg_attr;
- NRAttrVec *attrs;
- Message *my_message;
- // Get lock first
- GetLock(dr_mtx_);
- filter_entry = findFilter(filter_handle);
- if (!filter_entry){
- // Handle doesn't exist, return FAIL
- ReleaseLock(dr_mtx_);
- return FAIL;
- }
- control_blob = new ControlMessage(REMOVE_FILTER, filter_entry->handle_, 0);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
- (void *)control_blob,
- sizeof(ControlMessage));
- attrs = new NRAttrVec;
- attrs->push_back(ctrl_msg_attr);
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Handle will be destroyed when next keepalive timer happens
- filter_entry->valid_ = false;
- // Send Packet
- sendMessageToDiffusion(my_message);
- // Release the lock
- ReleaseLock(dr_mtx_);
- // Delete message
- delete my_message;
- delete control_blob;
- return OK;
- }
- handle DiffusionRouting::addTimer(int timeout, TimerCallback *callback)
- {
- return (timers_manager_->addTimer(timeout, callback));
- }
- handle DiffusionRouting::addTimer(int timeout, void *p, TimerCallbacks *cb)
- {
- TimerCallback *callback;
- callback = new OldAPITimer(cb, p);
- return (addTimer(timeout, callback));
- }
- bool DiffusionRouting::removeTimer(handle hdl)
- {
- return (timers_manager_->removeTimer(hdl));
- }
- int DiffusionRouting::filterKeepaliveTimeout(FilterEntry *filter_entry)
- {
- FilterEntry *my_entry = NULL;
- ControlMessage *control_blob;
- NRAttribute *ctrl_msg_attr;
- NRAttrVec *attrs;
- Message *my_message;
- // Acquire lock first
- GetLock(dr_mtx_);
- if (filter_entry->valid_){
- // Send keepalive
- control_blob = new ControlMessage(ADD_UPDATE_FILTER,
- filter_entry->priority_,
- filter_entry->handle_);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS,
- (void *)control_blob,
- sizeof(ControlMessage));
- attrs = CopyAttrs(filter_entry->filter_attrs_);
- attrs->push_back(ctrl_msg_attr);
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Send Message
- sendMessageToDiffusion(my_message);
- delete my_message;
- delete control_blob;
- // Release lock
- ReleaseLock(dr_mtx_);
- // Reschedule another filter keepalive timer in event queue
- return (FILTER_KEEPALIVE_DELAY);
- }
- else{
- // Filter was removed
- my_entry = deleteFilter(filter_entry->handle_);
- // We should have removed the correct handle
- if (my_entry != filter_entry){
- DiffPrint(DEBUG_ALWAYS, "DiffusionRouting::KeepaliveTimeout: Handles should match !n");
- exit(-1);
- }
- delete my_entry;
- // Release lock
- ReleaseLock(dr_mtx_);
- return -1;
- }
- }
- int DiffusionRouting::interestTimeout(HandleEntry *handle_entry)
- {
- HandleEntry *my_handle = NULL;
- Message *my_message;
- // Acquire lock first
- GetLock(dr_mtx_);
- if (handle_entry->valid_){
- // Send the interest message if entry is still valid
- my_message = new Message(DIFFUSION_VERSION, INTEREST, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = CopyAttrs(handle_entry->attrs_);
- my_message->num_attr_ = handle_entry->attrs_->size();
- my_message->data_len_ = CalculateSize(handle_entry->attrs_);
- // Send Packet
- sendMessageToDiffusion(my_message);
- delete my_message;
- // Release lock
- ReleaseLock(dr_mtx_);
- // Reschedule this timer in the queue
- return (int) (floor(-1 * (log(1 - (GetRand() * 1.0 / RAND_MAX))) /
- INTEREST_LAMBDA));
- }
- else{
- // Interest was canceled. Just delete it from the handle_list
- my_handle = removeHandle(handle_entry->hdl_, &sub_list_);
- // We should have removed the correct handle
- if (my_handle != handle_entry){
- DiffPrint(DEBUG_ALWAYS,
- "Error: interestTimeout: Handles should match !n");
- exit(-1);
- }
- delete my_handle;
- // Release lock
- ReleaseLock(dr_mtx_);
- // Delete timer from the queue
- return -1;
- }
- }
- int DiffusionRouting::sendMessage(Message *msg, handle h,
- u_int16_t priority)
- {
- RedirectMessage *original_hdr;
- NRAttribute *original_attr, *ctrl_msg_attr;
- ControlMessage *control_blob;
- NRAttrVec *attrs;
- Message *my_message;
- if ((priority < FILTER_MIN_PRIORITY) ||
- (priority > FILTER_KEEP_PRIORITY))
- return FAIL;
- // Create an attribute with the original header
- original_hdr = new RedirectMessage(msg->new_message_, msg->msg_type_,
- msg->source_port_, msg->data_len_,
- msg->num_attr_, msg->rdm_id_,
- msg->pkt_num_, msg->next_hop_,
- msg->last_hop_, 0,
- msg->next_port_);
- original_attr = OriginalHdrAttr.make(NRAttribute::IS, (void *)original_hdr,
- sizeof(RedirectMessage));
- // Create the attribute with the control message
- control_blob = new ControlMessage(SEND_MESSAGE, h, priority);
- ctrl_msg_attr = ControlMsgAttr.make(NRAttribute::IS, (void *)control_blob,
- sizeof(ControlMessage));
- // Copy Attributes and add originalAttr and controlAttr
- attrs = CopyAttrs(msg->msg_attr_vec_);
- attrs->push_back(original_attr);
- attrs->push_back(ctrl_msg_attr);
- my_message = new Message(DIFFUSION_VERSION, CONTROL, agent_id_, 0,
- 0, pkt_count_, random_id_, LOCALHOST_ADDR,
- LOCALHOST_ADDR);
- // Increment pkt_counter
- pkt_count_++;
- // Add attributes to the message
- my_message->msg_attr_vec_ = attrs;
- my_message->num_attr_ = attrs->size();
- my_message->data_len_ = CalculateSize(attrs);
- // Send Packet
- sendMessageToDiffusion(my_message);
- delete my_message;
- delete control_blob;
- delete original_hdr;
- return OK;
- }
- #ifndef NS_DIFFUSION
- void DiffusionRouting::doIt()
- {
- run(true, WAIT_FOREVER);
- }
- void DiffusionRouting::doOne(long timeout)
- {
- run(false, timeout);
- }
- void DiffusionRouting::run(bool wait_condition, long max_timeout)
- {
- DeviceList::iterator itr;
- int status, max_sock, fd;
- bool flag;
- DiffPacket in_pkt;
- fd_set fds;
- struct timeval tv;
- struct timeval max_tv;
- do{
- FD_ZERO(&fds);
- max_sock = 0;
- // Set the maximum timeout value
- max_tv.tv_sec = (int) (max_timeout / 1000);
- max_tv.tv_usec = (int) ((max_timeout % 1000) * 1000);
- for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
- (*itr)->addInFDS(&fds, &max_sock);
- }
- // Check for the next timer
- timers_manager_->nextTimerTime(&tv);
- if (tv.tv_sec == MAXVALUE){
- // If we don't have any timers, we wait for POLLING_INTERVAL
- if (max_timeout == WAIT_FOREVER){
- tv.tv_sec = POLLING_INTERVAL;
- tv.tv_usec = 0;
- }
- else{
- tv = max_tv;
- }
- }
- else{
- if ((max_timeout != WAIT_FOREVER) && (TimevalCmp(&tv, &max_tv) > 0)){
- // max_timeout value is smaller than next timer's time, so we
- // use themax_timeout value instead
- tv = max_tv;
- }
- }
- status = select(max_sock+1, &fds, NULL, NULL, &tv);
- if (status == 0){
- // Process all timers that have expired
- timers_manager_->executeAllExpiredTimers();
- }
- if (status > 0){
- do{
- flag = false;
- for (itr = in_devices_.begin(); itr != in_devices_.end(); ++itr){
- fd = (*itr)->checkInFDS(&fds);
- if (fd != -1){
- // Message waiting
- in_pkt = (*itr)->recvPacket(fd);
- recvPacket(in_pkt);
- // Clear this fd
- FD_CLR(fd, &fds);
- status--;
- flag = true;
- }
- }
- } while ((status > 0) && (flag == true));
- }
- else
- if (status < 0){
- DiffPrint(DEBUG_IMPORTANT, "Select returned %dn", status);
- }
- } while (wait_condition);
- }
- #endif // NS_DIFFUSION
- #ifndef NS_DIFFUSION
- void DiffusionRouting::sendMessageToDiffusion(Message *msg)
- {
- DiffPacket out_pkt = NULL;
- struct hdr_diff *dfh;
- char *pos;
- int len;
- out_pkt = AllocateBuffer(msg->msg_attr_vec_);
- dfh = HDR_DIFF(out_pkt);
- pos = (char *) out_pkt;
- pos = pos + sizeof(struct hdr_diff);
- len = PackAttrs(msg->msg_attr_vec_, pos);
- LAST_HOP(dfh) = htonl(msg->last_hop_);
- NEXT_HOP(dfh) = htonl(msg->next_hop_);
- DIFF_VER(dfh) = msg->version_;
- MSG_TYPE(dfh) = msg->msg_type_;
- DATA_LEN(dfh) = htons(len);
- PKT_NUM(dfh) = htonl(msg->pkt_num_);
- RDM_ID(dfh) = htonl(msg->rdm_id_);
- NUM_ATTR(dfh) = htons(msg->num_attr_);
- SRC_PORT(dfh) = htons(msg->source_port_);
- sendPacketToDiffusion(out_pkt, sizeof(struct hdr_diff) + len, diffusion_port_);
- delete [] out_pkt;
- }
- #else
- void DiffusionRouting::sendMessageToDiffusion(Message *msg)
- {
- Message *my_msg;
- DeviceList::iterator itr;
- int len;
- my_msg = CopyMessage(msg);
- len = CalculateSize(my_msg->msg_attr_vec_);
- len = len + sizeof(struct hdr_diff);
- for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
- (*itr)->sendPacket((DiffPacket) my_msg, len, diffusion_port_);
- }
- }
- #endif // !NS_DIFFUSION
- void DiffusionRouting::sendPacketToDiffusion(DiffPacket pkt, int len, int dst)
- {
- DeviceList::iterator itr;
- for (itr = local_out_devices_.begin(); itr != local_out_devices_.end(); ++itr){
- (*itr)->sendPacket(pkt, len, dst);
- }
- }
- #ifndef NS_DIFFUSION
- void DiffusionRouting::recvPacket(DiffPacket pkt)
- {
- struct hdr_diff *dfh = HDR_DIFF(pkt);
- Message *rcv_message = NULL;
- int8_t version, msg_type;
- u_int16_t data_len, num_attr, source_port;
- int32_t pkt_num, rdm_id, next_hop, last_hop;
- // Read header
- version = DIFF_VER(dfh);
- msg_type = MSG_TYPE(dfh);
- source_port = ntohs(SRC_PORT(dfh));
- pkt_num = ntohl(PKT_NUM(dfh));
- rdm_id = ntohl(RDM_ID(dfh));
- num_attr = ntohs(NUM_ATTR(dfh));
- next_hop = ntohl(NEXT_HOP(dfh));
- last_hop = ntohl(LAST_HOP(dfh));
- data_len = ntohs(DATA_LEN(dfh));
- // Create a message structure from the incoming packet
- rcv_message = new Message(version, msg_type, source_port, data_len,
- num_attr, pkt_num, rdm_id, next_hop, last_hop);
- // Read all attributes into the Message structure
- rcv_message->msg_attr_vec_ = UnpackAttrs(pkt, num_attr);
- // Process the incoming message
- recvMessage(rcv_message);
- // We are done
- delete rcv_message;
- delete [] pkt;
- }
- #endif // !NS_DIFFUSION
- void DiffusionRouting::recvMessage(Message *msg)
- {
- // Check version
- if (msg->version_ != DIFFUSION_VERSION)
- return;
- // Check destination
- if (msg->next_hop_ != LOCALHOST_ADDR)
- return;
- // Process the incoming message
- if (msg->msg_type_ == REDIRECT)
- processControlMessage(msg);
- else
- processMessage(msg);
- }
- void DiffusionRouting::processControlMessage(Message *msg)
- {
- NRSimpleAttribute<void *> *original_header_attr = NULL;
- NRAttrVec::iterator place = msg->msg_attr_vec_->begin();
- RedirectMessage *original_header;
- FilterEntry *entry;
- handle my_handle;
- // Find the attribute containing the original packet header
- original_header_attr = OriginalHdrAttr.find_from(msg->msg_attr_vec_,
- place, &place);
- if (!original_header_attr){
- DiffPrint(DEBUG_ALWAYS, "Error: Received an invalid REDIRECT message !n");
- return;
- }
- // Restore original message header
- original_header = (RedirectMessage *) original_header_attr->getVal();
- my_handle = original_header->handle_;
- msg->msg_type_ = original_header->msg_type_;
- msg->source_port_ = original_header->source_port_;
- msg->pkt_num_ = original_header->pkt_num_;
- msg->rdm_id_ = original_header->rdm_id_;
- msg->next_hop_ = original_header->next_hop_;
- msg->last_hop_ = original_header->last_hop_;
- msg->num_attr_ = original_header->num_attr_;
- msg->new_message_ = original_header->new_message_;
- msg->next_port_ = original_header->next_port_;
- // Delete attribute from the original set
- msg->msg_attr_vec_->erase(place);
- delete original_header_attr;
- // Find the right callback
- GetLock(dr_mtx_);
- entry = findFilter(my_handle);
- if (entry && entry->valid_){
- // Just to confirm
- if (OneWayMatch(entry->filter_attrs_, msg->msg_attr_vec_)){
- ReleaseLock(dr_mtx_);
- entry->cb_->recv(msg, my_handle);
- return;
- }
- else{
- DiffPrint(DEBUG_ALWAYS,
- "Warning: Filter doesn't match incoming message's attributes !n");
- }
- }
- else{
- DiffPrint(DEBUG_IMPORTANT,
- "Report: Cannot find filter (possibly deleted ?)n");
- }
- ReleaseLock(dr_mtx_);
- }
- void DiffusionRouting::processMessage(Message *msg)
- {
- NRSimpleAttribute<int> *rmst_id_attr = NULL;
- CallbackList::iterator cbl_itr;
- HandleList::iterator sub_itr;
- NRAttrVec *callback_attrs;
- HandleEntry *entry;
- CallbackEntry *aux;
- CallbackList cbl;
- // First, acquire the lock
- GetLock(dr_mtx_);
- for (sub_itr = sub_list_.begin(); sub_itr != sub_list_.end(); ++sub_itr){
- entry = *sub_itr;
- if ((entry->valid_) && (MatchAttrs(msg->msg_attr_vec_, entry->attrs_)))
- if (entry->cb_){
- aux = new CallbackEntry(entry->cb_, entry->hdl_);
- cbl.push_back(aux);
- }
- }
- // We can release the lock now
- ReleaseLock(dr_mtx_);
- // Check for RMST id attribute
- rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
- cbl_itr = cbl.begin();
- // Process RMST fragment if we have callbacks and this message has an RmstId
- if (rmst_id_attr && (cbl_itr != cbl.end())){
- if (!processRmst(msg)){
- cbl.clear();
- return;
- }
- }
- // Now we just call all callback functions
- for (cbl_itr = cbl.begin(); cbl_itr != cbl.end(); ++cbl_itr){
- // Copy attributes
- callback_attrs = CopyAttrs(msg->msg_attr_vec_);
- // Call app-specific callback function
- aux = *cbl_itr;
- aux->cb_->recv(callback_attrs, aux->subscription_handle_);
- delete aux;
- // Clean up callback attributes
- ClearAttrs(callback_attrs);
- delete callback_attrs;
- }
- // We are done
- cbl.clear();
- }
- bool DiffusionRouting::processRmst(Message *msg)
- {
- NRSimpleAttribute<void *> *data_buf_attr = NULL;
- NRSimpleAttribute<int> *max_frag_attr = NULL;
- NRSimpleAttribute<int> *rmst_id_attr = NULL;
- NRSimpleAttribute<int> *frag_attr = NULL;
- int rmst_no, frag_no, data_buf_len, count;
- void *blob_ptr, *tmp_frag_ptr;
- Int2RecRmst::iterator rmst_iterator;
- Int2Frag::iterator frag_iterator;
- char *dstPtr;
- int dstSize;
- RecRmst *rmst_ptr;
- // Read Rmst attributes
- data_buf_attr = RmstDataAttr.find(msg->msg_attr_vec_);
- rmst_id_attr = RmstIdAttr.find(msg->msg_attr_vec_);
- frag_attr = RmstFragAttr.find(msg->msg_attr_vec_);
- rmst_no = rmst_id_attr->getVal();
- frag_no = frag_attr->getVal();
- blob_ptr = data_buf_attr->getVal();
- data_buf_len = data_buf_attr->getLen();
- // See if we are receiving this blob, if not start a new RecRmst
- rmst_iterator = rec_rmst_map_.find(rmst_no);
- if (rmst_iterator == rec_rmst_map_.end()){
- rmst_ptr = new RecRmst(rmst_no);
- rec_rmst_map_.insert(Int2RecRmst::value_type(rmst_no, rmst_ptr));
- }
- else
- rmst_ptr = (*rmst_iterator).second;
- if (frag_no == 0){
- max_frag_attr = RmstMaxFragAttr.find(msg->msg_attr_vec_);
- rmst_ptr->max_frag_ = max_frag_attr->getVal();
- rmst_ptr->mtu_len_ = data_buf_len;
- }
- // Copy fragment to map
- tmp_frag_ptr = new char[data_buf_len];
- memcpy(tmp_frag_ptr, blob_ptr, data_buf_len);
- rmst_ptr->frag_map_.insert(Int2Frag::value_type(frag_no, tmp_frag_ptr));
- if (frag_no == rmst_ptr->max_frag_)
- rmst_ptr->max_frag_len_ = data_buf_len;
- count = rmst_ptr->frag_map_.size();
- // If this is the last rmst fragment, create the entire rmst
- if (count == (rmst_ptr->max_frag_ + 1)){
-
- DiffPrint(DEBUG_DETAILS,
- "RMST #%d is complete, creating big blob !n", rmst_no);
- // Allocate memory for the big blob
- dstSize = rmst_ptr->max_frag_ * rmst_ptr->mtu_len_ + rmst_ptr->max_frag_len_;
- dstPtr = new char[dstSize];
-
- // Copy all but last fragment to a buffer
- for (int i = 0; i < rmst_ptr->max_frag_; i++){
- frag_iterator = rmst_ptr->frag_map_.find(i);
- tmp_frag_ptr = (*frag_iterator).second;
- memcpy((void *)&dstPtr[i * rmst_ptr->mtu_len_],
- (void *)tmp_frag_ptr, rmst_ptr->mtu_len_);
- }
- // Now, copy the last fragment to the buffer
- frag_iterator = rmst_ptr->frag_map_.find(rmst_ptr->max_frag_);
- tmp_frag_ptr = (*frag_iterator).second;
- memcpy((void *)&dstPtr[rmst_ptr->max_frag_ * rmst_ptr->mtu_len_],
- (void *)tmp_frag_ptr, rmst_ptr->max_frag_len_);
- // Since we copied everything from the map - clean it up
- rec_rmst_map_.erase(rmst_iterator);
- delete rmst_ptr;
- // Now we substitute the last fragment with the reconstructed blob
- data_buf_attr->setVal(dstPtr, dstSize);
- // Deliver this to the application
- return true;
- }
- // We don't have the entire blob
- return false;
- }
- HandleEntry * DiffusionRouting::removeHandle(handle my_handle, HandleList *hl)
- {
- HandleList::iterator itr;
- HandleEntry *entry;
- for (itr = hl->begin(); itr != hl->end(); ++itr){
- entry = *itr;
- if (entry->hdl_ == my_handle){
- hl->erase(itr);
- return entry;
- }
- }
- return NULL;
- }
- HandleEntry * DiffusionRouting::findHandle(handle my_handle, HandleList *hl)
- {
- HandleList::iterator itr;
- HandleEntry *entry;
- for (itr = hl->begin(); itr != hl->end(); ++itr){
- entry = *itr;
- if (entry->hdl_ == my_handle)
- return entry;
- }
- return NULL;
- }
- FilterEntry * DiffusionRouting::deleteFilter(handle my_handle)
- {
- FilterList::iterator itr;
- FilterEntry *entry;
- for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
- entry = *itr;
- if (entry->handle_ == my_handle){
- filter_list_.erase(itr);
- return entry;
- }
- }
- return NULL;
- }
- FilterEntry * DiffusionRouting::findFilter(handle my_handle)
- {
- FilterList::iterator itr;
- FilterEntry *entry;
- for (itr = filter_list_.begin(); itr != filter_list_.end(); ++itr){
- entry = *itr;
- if (entry->handle_ == my_handle)
- return entry;
- }
- return NULL;
- }
- bool DiffusionRouting::hasScope(NRAttrVec *attrs)
- {
- NRAttribute *temp = NULL;
- temp = NRScopeAttr.find(attrs);
- if (temp)
- return true;
- return false;
- }
- bool DiffusionRouting::checkSubscription(NRAttrVec *attrs)
- {
- NRSimpleAttribute<int> *nrclass = NULL;
- NRSimpleAttribute<int> *nrscope = NULL;
- // We first try to locate both class and scope attributes
- nrclass = NRClassAttr.find(attrs);
- nrscope = NRScopeAttr.find(attrs);
- // There must be a class attribute in subscriptions
- if (!nrclass)
- return false;
- if (nrscope){
- // This subcription has both class and scope attribute. So, we
- // check if class/scope attributes comply with the Diffusion
- // Routing API
- // Must check scope's operator. The API requires it to be "IS"
- if (nrscope->getOp() != NRAttribute::IS)
- return false;
- // Ok, so first check if this is a global subscription
- if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) &&
- (nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
- (nrclass->getOp() == NRAttribute::IS))
- return true;
- // Check for local subscriptions
- if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
- return true;
- // Just to be sure we did not miss any case
- return false;
- }
- // If there is no scope attribute, we will insert one later if this
- // subscription looks like a global subscription
- if ((nrclass->getVal() == NRAttribute::INTEREST_CLASS) &&
- (nrclass->getOp() == NRAttribute::IS))
- return true;
- return false;
- }
- bool DiffusionRouting::checkPublication(NRAttrVec *attrs)
- {
- NRSimpleAttribute<int> *nrclass = NULL;
- NRSimpleAttribute<int> *nrscope = NULL;
- // We first try to locate both class and scope attributes
- nrclass = NRClassAttr.find(attrs);
- nrscope = NRScopeAttr.find(attrs);
- // There must be a class attribute in the publication
- if (!nrclass)
- return false;
- // In addition, the Diffusion Routing API requires the class
- // attribute to be set to "IS DATA_CLASS"
- if ((nrclass->getVal() != NRAttribute::DATA_CLASS) ||
- (nrclass->getOp() != NRAttribute::IS))
- return false;
- if (nrscope){
- // Ok, so this publication has both class and scope attributes. We
- // now have to check if they comply to the Diffusion Routing API
- // semantics for publish
- // Must check scope's operator. The API requires it to be "IS"
- if (nrscope->getOp() != NRAttribute::IS)
- return false;
- // We accept both global and local scope data messages
- if ((nrscope->getVal() == NRAttribute::GLOBAL_SCOPE) ||
- (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE))
- return true;
- // Just not to miss any case
- return false;
- }
- // A publish without a scope attribute is fine, we will include a
- // default NODE_LOCAL_SCOPE attribute later
- return true;
- }
- bool DiffusionRouting::checkSend(NRAttrVec *attrs)
- {
- NRSimpleAttribute<int> *nrclass = NULL;
- NRSimpleAttribute<int> *nrscope = NULL;
- // Currently only checks for Class and Scope attributes
- nrclass = NRClassAttr.find(attrs);
- nrscope = NRScopeAttr.find(attrs);
- if (nrclass || nrscope)
- return false;
- return true;
- }
- bool DiffusionRouting::isPushData(NRAttrVec *attrs)
- {
- NRSimpleAttribute<int> *nrclass = NULL;
- NRSimpleAttribute<int> *nrscope = NULL;
- // Currently only checks for Class and Scope attributes
- nrclass = NRClassAttr.find(attrs);
- nrscope = NRScopeAttr.find(attrs);
- // We should have both class and scope
- if (nrclass && nrscope){
- if (nrscope->getVal() == NRAttribute::NODE_LOCAL_SCOPE)
- return false;
- return true;
- }
- else{
- DiffPrint(DEBUG_ALWAYS, "Error: Cannot find class/scope attributes !n");
- return false;
- }
- }