srm-ssm.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:21k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* -*- Mode:C++; c-basic-offset:8; tab-width:8; indent-tabs-mode:t -*- */
  2. /*
  3.  * srm-ssm.cc
  4.  * Copyright (C) 1997 by the University of Southern California
  5.  * $Id: srm-ssm.cc,v 1.11 2005/08/25 18:58:08 johnh Exp $
  6.  *
  7.  * This program is free software; you can redistribute it and/or
  8.  * modify it under the terms of the GNU General Public License,
  9.  * version 2, as published by the Free Software Foundation.
  10.  *
  11.  * This program is distributed in the hope that it will be useful,
  12.  * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  14.  * GNU General Public License for more details.
  15.  *
  16.  * You should have received a copy of the GNU General Public License along
  17.  * with this program; if not, write to the Free Software Foundation, Inc.,
  18.  * 59 Temple Place, Suite 330, Boston, MA 02111-1307, USA.
  19.  *
  20.  *
  21.  * The copyright of this module includes the following
  22.  * linking-with-specific-other-licenses addition:
  23.  *
  24.  * In addition, as a special exception, the copyright holders of
  25.  * this module give you permission to combine (via static or
  26.  * dynamic linking) this module with free software programs or
  27.  * libraries that are released under the GNU LGPL and with code
  28.  * included in the standard release of ns-2 under the Apache 2.0
  29.  * license or under otherwise-compatible licenses with advertising
  30.  * requirements (or modified versions of such code, with unchanged
  31.  * license).  You may copy and distribute such a system following the
  32.  * terms of the GNU GPL for this module and the licenses of the
  33.  * other code concerned, provided that you include the source code of
  34.  * that other code when and as the GNU GPL requires distribution of
  35.  * source code.
  36.  *
  37.  * Note that people who make modified versions of this module
  38.  * are not obligated to grant this special exception for their
  39.  * modified versions; it is their choice whether to do so.  The GNU
  40.  * General Public License gives permission to release a modified
  41.  * version without this exception; this exception also makes it
  42.  * possible to release a modified version which carries forward this
  43.  * exception.
  44.  *
  45.  */
  46. //
  47. // The code implements scalable session message. See
  48. // http://catarina.usc.edu/estrin/papers/infocom98/ssession.ps
  49. #ifndef lint
  50. static const char rcsid[] =
  51.     "@(#) $Header: /cvsroot/nsnam/ns-2/mcast/srm-ssm.cc,v 1.11 2005/08/25 18:58:08 johnh Exp $ (USC/ISI)";
  52. #endif
  53. #include <stdlib.h>
  54. #include <assert.h>
  55. #include <stdio.h>
  56. #include "config.h"
  57. #include "tclcl.h"
  58. #include "agent.h"
  59. #include "packet.h"
  60. #include "ip.h"
  61. #include "srm.h"
  62. #include "srm-ssm.h"
  63. #include "trace.h"
  64. int hdr_srm_ext::offset_;
  65. static class SRMEXTHeaderClass : public PacketHeaderClass {
  66. public:
  67. SRMEXTHeaderClass() : PacketHeaderClass("PacketHeader/SRMEXT",
  68. sizeof(hdr_srm_ext)) {
  69. bind_offset(&hdr_srm_ext::offset_);
  70. }
  71. } class_srmexthdr;
  72. static class SSMSRMAgentClass : public TclClass {
  73. public:
  74.   SSMSRMAgentClass() : TclClass("Agent/SRM/SSM") {}
  75.   TclObject* create(int, const char*const*) {
  76.     return (new SSMSRMAgent());
  77.   }
  78. } class_srm_ssm_agent;
  79. SSMSRMAgent::SSMSRMAgent() 
  80.   : SRMAgent(), glb_sessCtr_(-1), loc_sessCtr_(-1), rep_sessCtr_(-1)
  81. {
  82.   bind("group_scope_",&groupScope_);
  83.   bind("local_scope_",&localScope_);
  84.   bind("scope_flag_",&scopeFlag_);
  85.   bind("rep_id_", &repid_);
  86. }
  87. int SSMSRMAgent::command(int argc, const char*const* argv)
  88. {
  89.   Tcl& tcl = Tcl::instance();
  90.   if (strcmp(argv[1], "send") == 0) {
  91.     if (strcmp(argv[2], "session") == 0) {
  92.       send_sess();
  93.       return TCL_OK;
  94.     }
  95.     if (strcmp(argv[2], "request") == 0) {
  96.       int round = atoi(argv[3]);
  97.       int sender = atoi(argv[4]);
  98.       int msgid  = atoi(argv[5]);
  99.       send_ctrl(SRM_RQST, round, sender, msgid, 0);
  100.       return TCL_OK;
  101.     }
  102.     if (strcmp(argv[2], "repair") == 0) {
  103.       int round = atoi(argv[3]);
  104.       int sender = atoi(argv[4]);
  105.       int msgid  = atoi(argv[5]);
  106.       send_ctrl(SRM_REPR, round, sender, msgid, packetSize_);
  107.       return TCL_OK;
  108.     }
  109.     tcl.resultf("%s: invalid send request %s", name_, argv[2]);
  110.     return TCL_ERROR;
  111.     /*
  112.       #if 0
  113.       fprintf(stdout,"%s: send request %s passed to srm_agent", 
  114.       name_, argv[2]);
  115.       #endif
  116.       return SRMAgent::command(argc, argv);
  117.       */
  118.   }
  119.   if (argc == 2) {
  120.     if (strcmp(argv[1], "start") == 0) {
  121.       sip_->sender_ = addr();
  122.       sip_->distance_ = 0.0;
  123.       /* sip_->repid_ = addr_;
  124.  sip_->scopeFlag_ = SRM_GLOBAL;
  125.  repid_ = addr_;
  126.  scopeFlag_ = SRM_GLOBAL;
  127.  */   
  128.       groupScope_ = 32;
  129.       senderFlag_ = 0;
  130.       printf("%s is %d and rep-status %dn", name_, addr(), scopeFlag_);
  131.       return TCL_OK;
  132.     }
  133.     if (strcmp(argv[1], "ch-rep") == 0) {
  134.       if(scopeFlag_ == SRM_GLOBAL) {
  135. sip_->repid_ = repid_ = addr();
  136. sip_->scopeFlag_ = SRM_GLOBAL;
  137.       } else {
  138. sip_->repid_ = repid_;
  139. sip_->scopeFlag_ = SRM_LOCAL;
  140.       }    
  141.       return TCL_OK;
  142.     }
  143.     if (strcmp(argv[1], "distances?") == 0) {
  144.       if (sip_->sender_ < 0) { // i.e. this agent is not
  145. tcl.result("");  //      yet active.
  146. return TCL_OK;
  147.       }
  148.       for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
  149. if((sp->distanceFlag_ == REP_DISTANCE) || 
  150.    (sp->distanceFlag_ == SELF_DISTANCE)) {
  151.   
  152.   tcl.resultf("%s %d %f", tcl.result(),
  153.       sp->sender_, sp->distance_);
  154. } else { /* Return reps distance */
  155.   SRMinfo* rsp = get_state(sp->repid_);
  156.   tcl.resultf("%s %d %f", tcl.result(),
  157.       sp->sender_, rsp->distance_);
  158. }
  159.       }
  160.       return TCL_OK;
  161.     }
  162.     
  163.   }
  164.   if (argc == 3) {
  165.     if (strcmp(argv[1], "distance?") == 0) {
  166.       int sender = atoi(argv[2]);
  167.       SRMinfo* sp = get_state(sender);
  168.       if((sp->distanceFlag_ == REP_DISTANCE) || 
  169.  (sp->distanceFlag_ == SELF_DISTANCE)) {
  170. tcl.resultf("%f", sp->distance_);
  171.       } else { /* Return reps distance */
  172. SRMinfo* rsp = get_state(sp->repid_);
  173. tcl.resultf("%f", rsp->distance_);
  174.       }
  175.       return TCL_OK;
  176.     }
  177.   }
  178.   return SRMAgent::command(argc, argv);
  179. }
  180. void SSMSRMAgent::recv(Packet* p, Handler* h)
  181. {
  182.   hdr_ip*  ih = hdr_ip::access(p);
  183.   hdr_srm* sh = hdr_srm::access(p);
  184.   hdr_srm_ext* seh = hdr_srm_ext::access(p);
  185.   if (ih->daddr() == 0) {
  186.     // Packet from local agent.  Add srm headers, set dst, and fwd
  187.     sh->type() = SRM_DATA;
  188.     sh->sender() = addr();
  189.     sh->seqnum() = ++dataCtr_;
  190.     seh->repid() = repid_;
  191.     ih->dst() = dst_;
  192.     ih->src() = here_;
  193.     target_->recv(p, h);
  194.   } else {
  195. #if 0
  196.     static char *foo[] = {"NONE", "DATA", "SESS", "RQST", "REPR"};
  197.     fprintf(stdout, "%7.4f %s %d  recvd SRM_%s <%d, %d> from %dn",
  198.     Scheduler::instance().clock(), name_, addr_, foo[sh->type()],
  199.     sh->sender(), sh->seqnum(), ih->src());
  200.     fflush(stdout);
  201. #endif
  202.     switch (sh->type()) {
  203.     case SRM_DATA:
  204.       recv_data(sh->sender(), sh->seqnum(), seh->repid(), p->accessdata());
  205.       Packet::free(p);
  206.       break;
  207.     case SRM_RQST:
  208.       recv_rqst(ih->saddr(), sh->round(), sh->sender(), sh->seqnum(),
  209. seh->repid());  
  210.       Packet::free(p);
  211.       break;
  212.     case SRM_REPR:
  213.       recv_repr(sh->round(), sh->sender(), sh->seqnum(), p->accessdata());
  214.       Packet::free(p);
  215.       break;
  216.     case SRM_SESS:
  217.       // This seqnum() is the session sequence number,
  218.       // not the data packet sequence numbers seen before.
  219.       // Send the whole pkt for ttl etc..
  220.       recv_sess(sh->seqnum(), (int*) p->accessdata(), p);
  221.       break;
  222.     }
  223.   }
  224. }
  225. void SSMSRMAgent::recv_data(int sender, int id, int repid, u_char* data)
  226. {
  227.   SRMinfo* sp = get_state(sender);
  228.   /* Just store the repid and call srmagent recv_data */
  229.   sp->repid_ = repid;
  230.   SRMAgent::recv_data(sender,id,data);
  231. }
  232. void SSMSRMAgent::send_ctrl(int type, int round, int sender, int msgid, int size)
  233. {
  234.   Packet* p = Agent::allocpkt();
  235.   hdr_srm* sh = hdr_srm::access(p);
  236.   hdr_srm_ext* seh = hdr_srm_ext::access(p);
  237.   sh->type() = type;
  238.   sh->sender() = sender;
  239.   sh->seqnum() = msgid;
  240.   sh->round() = round;
  241.   seh->repid() = repid_;  /* For ctrl messages this is your own repid */
  242.   hdr_cmn* ch = hdr_cmn::access(p);
  243.   ch->size() = sizeof(hdr_srm) + size;
  244.   target_->recv(p, (Handler*)NULL);
  245. }
  246. void SSMSRMAgent::recv_rqst(int requestor, int round, int sender, 
  247.     int msgid, int repid)
  248. {
  249.   //Tcl& tcl = Tcl::instance();
  250.   SRMinfo* rsp = get_state(requestor);
  251.   rsp->repid_ = repid;
  252.   SRMAgent::recv_rqst(requestor,round, sender,msgid);
  253. }
  254. void SSMSRMAgent::send_sess() 
  255. {
  256.   if (scopeFlag_ == SRM_GLOBAL) {
  257.     send_glb_sess();
  258.     send_rep_sess();
  259.   } else {
  260.     send_loc_sess();
  261.   }
  262. //   timeout_info();
  263. }
  264. #define SESSINFO_SIZE   5
  265. #define SESS_CONST      2
  266. void SSMSRMAgent::send_glb_sess()
  267. {
  268. int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);  
  269. /* Currently do extra allocation, later change */
  270. int     num_entries;
  271.         Packet* p = Agent::allocpkt(size);
  272.         hdr_srm* sh = hdr_srm::access(p);
  273.         hdr_srm_ext* seh = hdr_srm_ext::access(p);
  274. #if 0
  275. printf("sending global session messagen");
  276. #endif
  277.         sh->type() = SRM_SESS;
  278.         sh->sender() = addr();
  279.         sh->seqnum() = ++glb_sessCtr_;
  280. seh->repid() = repid_;
  281.         int* data = (int*) p->accessdata();
  282. *data++ = groupSize_;
  283. *data++ = SRM_GLOBAL;
  284. num_entries = 0;
  285. for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
  286.   /* Global Session Message has information about Senders/reps */
  287.            if ((sp->senderFlag_ || 
  288. (sp->scopeFlag_ == SRM_GLOBAL) ||
  289. (sp->sender_ == addr()))
  290.        && (is_active(sp))) {      
  291.         *data++ = sp->sender_;
  292.                 *data++ = sp->ldata_;
  293.                 *data++ = sp->recvTime_;
  294.                 *data++ = sp->sendTime_;
  295. *data++ = sp->repid_;
  296. num_entries++;
  297.       }
  298.         }
  299. data = (int*) p->accessdata();
  300. data[0] = num_entries;
  301. data[1] = SRM_GLOBAL;
  302. size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);
  303. data[5] = (int) (Scheduler::instance().clock()*1000);
  304. hdr_cmn* ch = hdr_cmn::access(p);
  305.         ch->size() += size+ sizeof(hdr_srm); /* Add size of srm_hdr_ext */
  306.         hdr_ip*  ih = hdr_ip::access(p);
  307. ih->ttl() = groupScope_;
  308. // Currently put this to distinguish various session messages
  309. ih->flowid() = SRM_GLOBAL;
  310. seh->ottl() = groupScope_;
  311.         target_->recv(p, (Handler*)NULL);
  312. }
  313. void SSMSRMAgent::send_loc_sess()
  314. {
  315. int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);  
  316. /* Currently do extra allocation, later change */
  317. int     num_entries;
  318.         Packet* p = Agent::allocpkt(size);
  319.         hdr_srm* sh = hdr_srm::access(p);
  320.         hdr_srm_ext* seh = hdr_srm_ext::access(p);
  321.         sh->type() = SRM_SESS;
  322.         sh->sender() = addr();
  323.         sh->seqnum() = ++loc_sessCtr_;
  324. seh->repid() = repid_;
  325. #if 0
  326. printf("sending local session messagen");
  327. #endif
  328.         int* data = (int*) p->accessdata();
  329. //int* tmp_data = (int*) p->accessdata();
  330. *data++ = groupSize_;
  331. *data++ = SRM_LOCAL;
  332. num_entries = 0;
  333. for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
  334.   /* Local Session Message has information 
  335.      about Senders/other locals */
  336.   if ((sp->senderFlag_ || 
  337.        (sp->scopeFlag_ == SRM_LOCAL) ||
  338.        (sp->distanceFlag_ = SELF_DISTANCE) || 
  339.        /* For the reps that I am hearing from */
  340.        (sp->sender_ == addr()) ||   
  341.        // just in case, I have not set the flags properly, 
  342.        // one entry has to be there
  343.        (repid_ == sp->sender_)) 
  344.       && (is_active(sp))) {      
  345.     *data++ = sp->sender_;
  346.     *data++ = sp->ldata_;
  347.     *data++ = sp->recvTime_;
  348.     *data++ = sp->sendTime_;
  349.     *data++ = sp->repid_;
  350.     num_entries++;
  351.   }
  352.         }
  353. data = (int*) p->accessdata();
  354. data[0] = num_entries;
  355. data[1] = SRM_LOCAL;
  356. size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);
  357. data[5] = (int) (Scheduler::instance().clock()*1000);
  358. hdr_cmn* ch = hdr_cmn::access(p);
  359.         ch->size() += size+ sizeof(hdr_srm);
  360.         hdr_ip*  ih = hdr_ip::access(p);
  361. ih->ttl() = localScope_;
  362. // Currently put this to distinguish various session messages
  363. ih->flowid() = SRM_LOCAL;
  364. seh->ottl() = localScope_;
  365.         target_->recv(p, (Handler*)NULL);
  366. }
  367. void SSMSRMAgent::send_rep_sess()
  368. {
  369. int size = (SESS_CONST + groupSize_ * SESSINFO_SIZE) * sizeof(int);  
  370. /* Currently do extra allocation, later change */
  371. int     num_entries, num_local_members;
  372.         Packet* p = Agent::allocpkt(size);
  373.         hdr_srm* sh = hdr_srm::access(p);
  374.         hdr_srm_ext* seh = hdr_srm_ext::access(p);
  375.         sh->type() = SRM_SESS;
  376.         sh->sender() = addr();
  377.         sh->seqnum() = ++rep_sessCtr_;
  378. seh->repid() = repid_;
  379. #if 0
  380. printf("sending rep_info session messagen");
  381. #endif
  382.         int* data = (int*) p->accessdata();
  383. *data++ = groupSize_;
  384. *data++ = SRM_RINFO;
  385. num_entries = 0;
  386. num_local_members = 0;
  387. for (SRMinfo* sp = sip_; sp; sp = sp->next_) {
  388.   if (sp->activeFlag_ == ACTIVE) {
  389.   /* Rep info has distance to others reps and 
  390.      timestamps for everyone */
  391.         *data++ = sp->sender_;
  392.                 *data++ = sp->ldata_;
  393. if (sp->scopeFlag_ == SRM_GLOBAL) {
  394.   *data++ = (int) (sp->distance_*1000);
  395.   data++;
  396. } else { 
  397.   // Put a check here for only people I have heard from.??
  398.   *data++ = sp->recvTime_;
  399.   *data++ = sp->sendTime_;
  400.   num_local_members++;
  401. }
  402. *data++ = sp->repid_;
  403. num_entries++;
  404.       }
  405. }
  406. if (num_local_members <= 0) {
  407.   Packet::free(p);
  408.   return;
  409. }
  410. data = (int*) p->accessdata();
  411. data[0] = num_entries;
  412. data[1] = SRM_RINFO;
  413. size = (SESS_CONST + num_entries * SESSINFO_SIZE) * sizeof(int);  
  414. data[5] = (int) (Scheduler::instance().clock()*1000);
  415. hdr_cmn* ch = hdr_cmn::access(p);
  416.         ch->size() += size+ sizeof(hdr_srm);
  417.         hdr_ip*  ih = hdr_ip::access(p);
  418. ih->ttl() = localScope_;
  419. // Currently put this to distinguish various session messages
  420. ih->flowid() = SRM_RINFO;
  421. seh->ottl() = localScope_;
  422.         target_->recv(p, (Handler*)NULL);
  423. }
  424. #define GET_SESSION_INFO
  425. sender = *data++;
  426. dataCnt = *data++;
  427. rtime = *data++;
  428. stime = *data++;                         
  429.         repid = *data++;                        
  430. // printf("s:%d, d:%d, rt:%d, st:%d, rep:%dn",sender,
  431. // dataCnt,rtime,stime,repid)
  432. void SSMSRMAgent::recv_sess(int sessCtr, int* data, Packet* p)
  433. {
  434.   int type = data[1];
  435.   switch (type) {
  436.   case SRM_GLOBAL :
  437.     recv_glb_sess(sessCtr,data,p);
  438.     break;
  439.   case SRM_LOCAL :
  440.     recv_loc_sess(sessCtr,data,p);
  441.     break;
  442.   case SRM_RINFO :    
  443.     if (scopeFlag_ == SRM_GLOBAL) return;
  444.     recv_rep_sess(sessCtr,data,p);
  445.     break;
  446.   }
  447.   Packet::free(p);
  448. }
  449. void SSMSRMAgent::recv_glb_sess(int sessCtr, int* data, Packet* p)
  450. {
  451.   Tcl& tcl = Tcl::instance();
  452.   SRMinfo* sp;
  453.   int ttl;
  454.   hdr_ip*  ih = hdr_ip::access(p);
  455.   hdr_srm_ext* seh = hdr_srm_ext::access(p);
  456.   ttl = seh->ottl() - ih->ttl();
  457.   int sender, dataCnt, rtime, stime,repid;
  458.   int now, sentAt, sentBy;
  459.   int cnt = *data++;
  460.   //int type = *data++;
  461.   int i;
  462.   // data = data + SESS_CONST;  
  463.   /* As as included type of session message also */
  464.   /* The first block contains the sender's own state */
  465.   GET_SESSION_INFO;
  466.   if (sender == addr())
  467.     // sender's own session message
  468.     return;
  469.   if (seh->repid() != repid) {
  470.     fprintf(stdout,"%f Recvd a glb-sess with diff header(%d) != inside(%d)n",
  471.     Scheduler::instance().clock(),seh->repid(),repid);
  472.     /* abort(); */
  473.     return;
  474.   }
  475.   if (sender != repid) {
  476.     fprintf(stdout,"%f Recvd a glb-sess with repid(%d) != address(%d)n",
  477.     Scheduler::instance().clock(),repid,sender);
  478.     /* abort(); */
  479.     return;
  480.   }
  481.    
  482.   sp = get_state(sender);
  483.   if (sp->lglbsess_ > sessCtr) // older session message recd.
  484.     return;
  485. #if 0
  486.   fprintf(stdout,"%s recv-gsess from %dn",name_,sender);
  487. #endif 
  488.   tcl.evalf("%s recv-gsess %d %d", name_, sender, ttl);
  489.   
  490.   if (sp->scopeFlag_ != SRM_GLOBAL) {
  491.     sp->scopeFlag_ = SRM_GLOBAL;
  492.   }
  493.   sp->repid_ = repid;
  494.   
  495.   now = (int) (Scheduler::instance().clock() * 1000);
  496.   sentBy = sender; // to later compute rtt
  497.   sentAt = stime;
  498.   sp->lglbsess_ = sessCtr;
  499.   sp->recvTime_ = now;
  500.   sp->sendTime_ = stime;
  501.   for (i = sp->ldata_ + 1; i <= dataCnt; i++)
  502.     if (! sp->ifReceived(i))
  503.       tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
  504.   if (sp->ldata_ < dataCnt)
  505.     sp->ldata_ = dataCnt;
  506.   
  507.   for (i = 1; i < cnt; i++) {
  508.     GET_SESSION_INFO;
  509.     if (sender == addr() && now) {
  510.       int rtt = (now - sentAt) + (rtime - stime);
  511.       sp = get_state(sentBy);
  512.       sp->distance_ = (double) rtt / 2 / 1000;
  513.       sp->distanceFlag_ = SELF_DISTANCE;
  514. #if 0
  515.       fprintf(stderr,
  516.       "%7.4f %s compute distance to %d: %fn",
  517.       Scheduler::instance().clock(), name_,
  518.       sentBy, sp->distance_);
  519. #endif
  520.       continue;
  521.     }
  522.     sp = get_state(sender);
  523.     for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
  524.       if (! sp->ifReceived(j))
  525. tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
  526.     if (sp->ldata_ < dataCnt)
  527.       sp->ldata_ = dataCnt;
  528.   }
  529. }
  530. void SSMSRMAgent::recv_loc_sess(int sessCtr, int* data, Packet* p)
  531. {
  532.   Tcl& tcl = Tcl::instance();
  533.   SRMinfo* sp;
  534.   int ttl;
  535.   hdr_ip*  ih = hdr_ip::access(p);
  536.   hdr_srm_ext* seh = hdr_srm_ext::access(p);
  537.   ttl = seh->ottl() - ih->ttl();
  538.   
  539.   int sender, dataCnt, rtime, stime,repid;
  540.   int now, sentAt, sentBy;
  541.   int cnt = *data++;
  542.   /*int type = * */data++;
  543.   int i;
  544.   // data = data + SESS_CONST;  
  545.   /* As as included type of session message also */
  546.   /* The first block contains the sender's own state */
  547.   GET_SESSION_INFO;
  548.   if (sender == addr()) // sender's own session message
  549.     return;
  550.   
  551.   sp = get_state(sender);
  552.   if (sp->llocsess_ > sessCtr) // older session message recd.
  553.     return;
  554.   if (sp->scopeFlag_ != SRM_LOCAL) {
  555.     sp->scopeFlag_ = SRM_LOCAL;
  556.     // Also put a check if this is my child
  557.   }
  558.   sp->repid_ = repid;
  559. #if 0
  560.   fprintf(stdout,"%s recv-lsess from %dn",name_,sender);
  561. #endif 
  562.   tcl.evalf("%s recv-lsess %d %d %d", name_, sender, repid, ttl);
  563.   
  564.   now = (int) (Scheduler::instance().clock() * 1000);
  565.   sentBy = sender; // to later compute rtt
  566.   sentAt = stime;
  567.   sp->llocsess_ = sessCtr;
  568.   sp->recvTime_ = now;
  569.   sp->sendTime_ = stime;
  570.   for (i = sp->ldata_ + 1; i <= dataCnt; i++)
  571.     if (! sp->ifReceived(i))
  572.       tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
  573.   if (sp->ldata_ < dataCnt)
  574.     sp->ldata_ = dataCnt;
  575.   
  576.   for (i = 1; i < cnt; i++) {
  577.     GET_SESSION_INFO;
  578.     if (sender == addr() && now) {
  579.       int rtt = (now - sentAt) + (rtime - stime);
  580.       sp = get_state(sentBy);
  581.       sp->distance_ = (double) rtt / 2 / 1000;
  582.       sp->distanceFlag_ = SELF_DISTANCE;
  583. #if 0
  584.       fprintf(stderr,
  585.       "%7.4f %s compute distance to %d: %fn",
  586.       Scheduler::instance().clock(), name_,
  587.       sentBy, sp->distance_);
  588. #endif
  589.       continue;
  590.     }
  591.     sp = get_state(sender);
  592.     for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
  593.       if (! sp->ifReceived(j))
  594. tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
  595.     if (sp->ldata_ < dataCnt)
  596.       sp->ldata_ = dataCnt;
  597.   }
  598. }
  599. // For the global members the repid == addr
  600. void SSMSRMAgent::recv_rep_sess(int sessCtr, int* data, Packet*)
  601. {
  602.   Tcl& tcl = Tcl::instance();
  603.   SRMinfo* sp;
  604.   
  605.   int sender, dataCnt, rtime, stime,repid;
  606.   int now, sentAt, sentBy;
  607.   int cnt = *data++;
  608.   /*int type = **/data++;
  609.   int i;
  610.   //data = data + SESS_CONST;  
  611.   /* As as included type of session message also */
  612.   /* The first block contains the sender's own state */
  613.   GET_SESSION_INFO;
  614.   if (sender == addr()) // sender's own session message
  615.     return;
  616.   if (sender != repid_)                 // not from my rep
  617.     return;
  618.   if (sender != repid) {
  619.     fprintf(stdout,"Recvd a rep-sess with repid(%d) != address(%d)n",
  620.     repid,sender);
  621.     abort();
  622.   }
  623.   sp = get_state(sender);
  624.   if (sp->lrepsess_ > sessCtr) // older session message recd.
  625.     return;
  626.   if (sp->scopeFlag_ != SRM_GLOBAL)      // Should I change the repid also??
  627.     sp->scopeFlag_ = SRM_GLOBAL;
  628.   
  629.   now = (int) (Scheduler::instance().clock() * 1000);
  630.   sentBy = sender; // to later compute rtt
  631.   sentAt = stime;
  632.   sp->lrepsess_ = sessCtr;
  633.   sp->recvTime_ = now;
  634.   sp->sendTime_ = stime;
  635.   for (i = sp->ldata_ + 1; i <= dataCnt; i++)
  636.     if (! sp->ifReceived(i))
  637.       tcl.evalf("%s request %d %d", name_, sender, i, sp->repid_);
  638.   if (sp->ldata_ < dataCnt)
  639.     sp->ldata_ = dataCnt;
  640.   for (i = 1; i < cnt; i++) {
  641.     GET_SESSION_INFO;
  642.     if (sender == addr() && now) {
  643.       int rtt = (now - sentAt) + (rtime - stime);
  644.       sp = get_state(sentBy);
  645.       sp->distance_ = (double) rtt / 2 / 1000;
  646.       sp->distanceFlag_ = SELF_DISTANCE;
  647. #if 0
  648.       fprintf(stderr,
  649.       "%7.4f %s compute distance to %d: %fn",
  650.       Scheduler::instance().clock(), name_,
  651.       sentBy, sp->distance_);
  652. #endif
  653.       continue;
  654.     }
  655.     if ((sender == repid) && (sender != sentBy)) {
  656.       sp = get_state(sender);
  657.       if (!(is_active(sp) && (sp->distanceFlag_ == SELF_DISTANCE))) {
  658. sp->distance_ = (double) rtime/1000;   
  659. /* As for global members this is distance */
  660. sp->distanceFlag_ = REP_DISTANCE;
  661. /* ?? What if I am hearing from this guy already */
  662.       }
  663.     }
  664.     sp = get_state(sender);
  665.     for (int j = sp->ldata_ + 1; j <= dataCnt; j++)
  666.       if (! sp->ifReceived(j))
  667. tcl.evalf("%s request %d %d", name_, sender, j, sp->repid_);
  668.     if (sp->ldata_ < dataCnt)
  669.       sp->ldata_ = dataCnt;
  670.   }
  671. }
  672. #define sessionDelay   1000
  673. void SSMSRMAgent::timeout_info()
  674. {
  675.   int now;
  676.   now = (int) (Scheduler::instance().clock() * 1000);
  677.   for (SRMinfo* sp = sip_->next_; sp; sp = sp->next_) {
  678.     if ((now - sp->recvTime_) >= 3*sessionDelay) {
  679.       sp->activeFlag_ = INACTIVE;
  680.       groupSize_--;
  681.     }
  682.   }
  683. }
  684. int SSMSRMAgent::is_active(SRMinfo *sp)
  685. {
  686.   int now;
  687.   now = (int) (Scheduler::instance().clock() * 1000);
  688.   if ((sp->sender_ != addr()) && ((now - sp->recvTime_) >= 3*sessionDelay)) {
  689.     return 0;
  690.   } else {
  691.     return 1;
  692.   }
  693. }