- #include "rmst_source.hh"
- #include <unistd.h>
- static class RmstSrcClass : public TclClass {
- public:
- RmstSrcClass() : TclClass("Application/DiffApp/RmstSource") {}
- TclObject* create(int , const char*const* ) {
- return(new RmstSource());
- }
- } class_rmst_source;
- void RmstSendDataTimer::expire(Event *e) {
- a_->send();
- }
- void RmstSource::send()
- {
- int sleep_interval;
- bool sent_first_blob = false;
- if (num_subscriptions_ > 0){
- if (!sent_first_blob){
- sendBlob();
- sent_first_blob = true;
- sleep_interval = 100;
- }
- else
- printf("RMST-SRC::sees subscriptionsn");
- }
- else{
- printf("RMST-SRC::sees no subscriptionsn");
- sleep_interval = 10;
- }
- // re-schedule the timer
- sdt_.resched(sleep_interval);
- }
- int RmstSource::command(int argc, const char*const* argv)
- {
- if (argc == 2) {
- if (strcmp(argv[1], "subscribe") == 0) {
- run();
- return TCL_OK;
- }
- }
- return DiffApp::command(argc, argv);
- }
- #endif // NS_DIFFUSION
- void RmstSrcReceive::recv(NRAttrVec *data, NR::handle my_handle)
- {
- NRSimpleAttribute<char*> *rmst_target_attr = NULL;
- NRSimpleAttribute<int> *nr_class = NULL;
- NRSimpleAttribute<int> *tsprt_ctl_attr = NULL;
- timeval cur_time;
- printf("RMST-SRC::recv got an attr vector.");
- GetTime(&cur_time);
- printf(" time: sec = %dn", (unsigned int) cur_time.tv_sec);
- nr_class = NRClassAttr.find(data);
- tsprt_ctl_attr = RmstTsprtCtlAttr.find(data);
- if (nr_class){
- switch (nr_class->getVal()){
- case NRAttribute::INTEREST_CLASS:
- if (tsprt_ctl_attr && (tsprt_ctl_attr->getVal() == RMST_RESP)){
- printf(" Source received an INTEREST messagen");
- src_->num_subscriptions_++;
- }
- break;
- case NRAttribute::DISINTEREST_CLASS:
- src_->num_subscriptions_--;
- rmst_target_attr = RmstTargetAttr.find(data);
- if (rmst_target_attr){
- printf(" Source received a DISINTEREST for %sn",
- rmst_target_attr->getVal());
- }
- else
- printf(" Source received a Disinterest message for unknown Interest!n");
- break;
- default:
- printf(" Source received an unknown or inappropriate class!(%d)!n",
- nr_class->getVal());
- break;
- }
- }
- if (tsprt_ctl_attr){
- switch (tsprt_ctl_attr->getVal()){
- case RMST_RESP:
- break;
- case RMST_CONT:
- printf(" Source received a RMST_CONT messagen");
- if(src_->blobs_to_send_ > 0){
- printf (" Source sending another blobn");
- src_->sendBlob();
- }
- else
- printf (" Source done sending blobsn");
- break;
- default:
- printf(" Source received an unexpected RmstTsprtCtlAttr (%d)!n",
- tsprt_ctl_attr->getVal());
- break;
- }
- }
- }
- RmstSource::RmstSource() : blobs_to_send_(4), sdt_(this)
- #else
- RmstSource::RmstSource(int argc, char **argv) : blobs_to_send_(4)
- #endif // NS_DIFFUSION
- {
- mr = new RmstSrcReceive(this);
- #ifndef NS_DIFFUSION
- parseCommandLine(argc, argv);
- dr_ = NR::createNR(diffusion_port_);
- #endif // NS_DIFFUSION
- ck_val_ = 100;
- }
- #ifndef NS_DIFFUSION
- int main(int argc, char **argv)
- {
- RmstSource *app;
- app = new RmstSource(argc, argv);
- app->run();
- return 0;
- }
- #endif // NS_DIFFUSION
- void RmstSource::run()
- {
- #ifndef NS_DIFFUSION
- int sleep_interval;
- bool sent_first_blob = false;
- #endif // !NS_DIFFUSION
- // Let diffusion know what kinds of interests we "latch."
- subscribe_handle_ = setupRmstInterest();
- // Let diffusion know what we intend to publish.
- send_handle_ = setupRmstPublication();
- #ifndef NS_DIFFUSION
- while(1){
- if (num_subscriptions_ > 0){
- if (!sent_first_blob){
- sendBlob();
- sent_first_blob = true;
- sleep_interval = 100;
- }
- else
- printf("RMST-SRC::sees subscriptionsn");
- }
- else{
- printf("RMST-SRC::sees no subscriptionsn");
- sleep_interval = 10;
- }
- sleep(sleep_interval);
- } //while loop
- #else
- send();
- #endif // !NS_DIFFUSION
- }
- handle RmstSource::setupRmstInterest()
- {
- NRAttrVec attrs;
- printf("RMST-SRC::sets up local subscription for PCM_SAMPLEsn");
- attrs.push_back(NRClassAttr.make(NRAttribute::NE, NRAttribute::DATA_CLASS));
- attrs.push_back(NRScopeAttr.make(NRAttribute::IS,
- NRAttribute::NODE_LOCAL_SCOPE));
- attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, "PCM_SAMPLE"));
- attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_RESP));
- attrs.push_back(RmstTsprtCtlAttr.make(NRAttribute::IS, RMST_CONT));
- handle h = dr_->subscribe(&attrs, mr);
- ClearAttrs(&attrs);
- return h;
- }
- handle RmstSource::setupRmstPublication()
- {
- NRAttrVec attrs;
- printf("RMST-SRC::publishes PCM_SAMPLEn");
- attrs.push_back(NRClassAttr.make(NRAttribute::IS, NRAttribute::DATA_CLASS));
- attrs.push_back(RmstTargetAttr.make(NRAttribute::IS, "PCM_SAMPLE"));
- handle h = dr_->publish(&attrs);
- ClearAttrs(&attrs);
- return h;
- }
- char* RmstSource::createBlob (int ck_val)
- {
- char *tmpPtr = new char[2500];
- for (int i = 0; i < 50; i++){
- sprintf(&tmpPtr[i*50], "PCM FragNo: %d of ck_val %d", i, ck_val);
- }
- return tmpPtr;
- }
- void RmstSource::sendBlob() {
- char *blob;
- int retval;
- NRAttrVec src_attrs;
- // Retrieve rmsb from the local cache to get pointer to it.
- blob = createBlob(ck_val_);
- ck_val_++;
- src_attrs.push_back(RmstDataAttr.make(NRAttribute::IS, blob, 2500));
- retval = ((DiffusionRouting *)dr_)->sendRmst(send_handle_,
- &src_attrs, PAYLOAD_SIZE);
- blobs_to_send_--;
- delete blob;
- }