http.cc
上传用户:rrhhcc
上传日期:2015-12-11
资源大小:54129k
文件大小:41k
源码类别:

通讯编程

开发平台:

Visual C++

  1. /* 
  2.  * Copyright (c) Xerox Corporation 1998. All rights reserved.
  3.  *
  4.  * This program is free software; you can redistribute it and/or modify it
  5.  * under the terms of the GNU General Public License as published by the
  6.  * Free Software Foundation; either version 2 of the License, or (at your
  7.  * option) any later version.
  8.  *
  9.  * This program is distributed in the hope that it will be useful, but
  10.  * WITHOUT ANY WARRANTY; without even the implied warranty of
  11.  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
  12.  * General Public License for more details.
  13.  *
  14.  * You should have received a copy of the GNU General Public License along
  15.  * with this program; if not, write to the Free Software Foundation, Inc.,
  16.  * 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA
  17.  *
  18.  * Linking this file statically or dynamically with other modules is making
  19.  * a combined work based on this file.  Thus, the terms and conditions of
  20.  * the GNU General Public License cover the whole combination.
  21.  *
  22.  * In addition, as a special exception, the copyright holders of this file
  23.  * give you permission to combine this file with free software programs or
  24.  * libraries that are released under the GNU LGPL and with code included in
  25.  * the standard release of ns-2 under the Apache 2.0 license or under
  26.  * otherwise-compatible licenses with advertising requirements (or modified
  27.  * versions of such code, with unchanged license).  You may copy and
  28.  * distribute such a system following the terms of the GNU GPL for this
  29.  * file and the licenses of the other code concerned, provided that you
  30.  * include the source code of that other code when and as the GNU GPL
  31.  * requires distribution of source code.
  32.  *
  33.  * Note that people who make modified versions of this file are not
  34.  * obligated to grant this special exception for their modified versions;
  35.  * it is their choice whether to do so.  The GNU General Public License
  36.  * gives permission to release a modified version without this exception;
  37.  * this exception also makes it possible to release a modified version
  38.  * which carries forward this exception.
  39.  *
  40.  * $Header: /cvsroot/nsnam/ns-2/webcache/http.cc,v 1.21 2005/09/18 23:33:35 tomh Exp $
  41.  *
  42.  */
  43. //
  44. // Implementation of the HTTP agent. We want a separate agent for HTTP because
  45. // we are interested in (detailed) HTTP headers, instead of just request and 
  46. // response patterns.
  47. //
  48. #include <stdlib.h>
  49. #include <assert.h>
  50. #include <string.h>
  51. #include <stdarg.h>
  52. #include "tclcl.h"
  53. #include "agent.h"
  54. #include "app.h"
  55. #include "tcp-simple.h"
  56. #include "http.h"
  57. #include "http-aux.h"
  58. #include "trace.h"
  59. #include "tcpapp.h"
  60. #include "mcache.h"
  61. //----------------------------------------------------------------------
  62. // Http Application
  63. //
  64. // Allows multiple concurrent HTTP connections
  65. //----------------------------------------------------------------------
  66. static class HttpAppClass : public TclClass {
  67. public:
  68.         HttpAppClass() : TclClass("Http") {}
  69.         TclObject* create(int, const char*const*) {
  70. return (new HttpApp());
  71. }
  72. } class_http_app;
  73. // What states should be in a http agent?
  74. HttpApp::HttpApp() : log_(0)
  75. {
  76. bind("id_", &id_);
  77. // Map a client address to a particular TCP agent
  78. tpa_ = new Tcl_HashTable;
  79. Tcl_InitHashTable(tpa_, TCL_ONE_WORD_KEYS);
  80. }
  81. HttpApp::~HttpApp()
  82. {
  83. if (tpa_ != NULL) {
  84. Tcl_DeleteHashTable(tpa_);
  85. delete tpa_;
  86. }
  87. }
  88. int HttpApp::add_cnc(HttpApp* client, TcpApp *agt)
  89. {
  90. int newEntry = 1;
  91. long key = client->id();
  92. Tcl_HashEntry *he = Tcl_CreateHashEntry(tpa_, 
  93. (const char *) key,
  94. &newEntry);
  95. if (he == NULL) 
  96. return -1;
  97. if (newEntry)
  98. Tcl_SetHashValue(he, (ClientData)agt);
  99. return 0;
  100. }
  101. void HttpApp::delete_cnc(HttpApp* client)
  102. {
  103.         long key = client->id();
  104. Tcl_HashEntry *he = Tcl_FindHashEntry(tpa_,(const char *)key);
  105. if (he != NULL) {
  106. TcpApp *cnc = (TcpApp *)Tcl_GetHashValue(he);
  107. Tcl_DeleteHashEntry(he);
  108. delete cnc;
  109. }
  110. }
  111. TcpApp* HttpApp::lookup_cnc(HttpApp* client)
  112. {
  113.         long key = client->id();
  114. Tcl_HashEntry *he = 
  115. Tcl_FindHashEntry(tpa_, (const char *)key);
  116. if (he == NULL)
  117. return NULL;
  118. return (TcpApp *)Tcl_GetHashValue(he);
  119. }
  120. // Basic functionalities: 
  121. int HttpApp::command(int argc, const char*const* argv)
  122. {
  123. Tcl& tcl = Tcl::instance();
  124. if (argc == 2) {
  125. if (strcmp(argv[1], "id") == 0) {
  126. if (argc == 3) {
  127. id_ = atoi(argv[2]);
  128. tcl.resultf("%d", id_);
  129. } else
  130. tcl.resultf("%d", id_);
  131. return TCL_OK;
  132. } else if (strcmp(argv[1], "log") == 0) {
  133. // Return the name of the log channel
  134. if (log_ != NULL)
  135. tcl.resultf("%s", Tcl_GetChannelName(log_));
  136. else
  137. tcl.result("");
  138. return TCL_OK;
  139. }
  140. } else if (argc == 3) {
  141. if (strcmp(argv[1], "get-modtime") == 0) {
  142. double mt;
  143. if (pool_->get_mtime(argv[2], mt) != -1) {
  144. tcl.resultf("%.17g", mt);
  145. return TCL_OK;
  146. } else 
  147. return TCL_ERROR;
  148. } else if (strcmp(argv[1], "exist-page") == 0) { 
  149. tcl.resultf("%d", pool_->exist_page(argv[2]));
  150. return TCL_OK;
  151. } else if (strcmp(argv[1], "get-size") == 0) {
  152. int size;
  153. if (pool_->get_size(argv[2], size) != -1) {
  154. tcl.resultf("%d", size);
  155. return TCL_OK;
  156. } else 
  157. return TCL_ERROR;
  158. } else if (strcmp(argv[1], "get-age") == 0) {
  159. double age;
  160. if (pool_->get_age(argv[2], age) != -1) {
  161. tcl.resultf("%.17g", age);
  162. return TCL_OK;
  163. } else 
  164. return TCL_ERROR;
  165. } else if (strcmp(argv[1], "get-cachetime") == 0) {
  166. double et;
  167. if (pool_->get_etime(argv[2], et) != -1) {
  168. tcl.resultf("%.17g", et);
  169. return TCL_OK;
  170. } else 
  171. return TCL_ERROR;
  172. } else if (strcmp(argv[1], "get-page") == 0) {
  173. char buf[4096];
  174. if (pool_->get_pageinfo(argv[2], buf) != -1) {
  175. tcl.resultf("%s", buf);
  176. return TCL_OK;
  177. } else 
  178. return TCL_ERROR;
  179. } else if (strcmp(argv[1], "get-cnc") == 0) {
  180. /*
  181.  * <http> get-cnc <client>
  182.  *
  183.  * Given the communication party, get the tcp agent 
  184.  * connected to it.
  185.  */
  186. HttpApp *client = 
  187. (HttpApp *)TclObject::lookup(argv[2]);
  188. TcpApp *cnc = (TcpApp *)lookup_cnc(client);
  189. if (cnc == NULL)
  190. tcl.result("");
  191. else 
  192. tcl.resultf("%s", cnc->name());
  193. return TCL_OK;
  194. } else if (strcmp(argv[1], "set-pagepool") == 0) {
  195. pool_ = (ClientPagePool*)TclObject::lookup(argv[2]);
  196. if (pool_ != NULL) 
  197. return TCL_OK;
  198. else 
  199. return TCL_ERROR;
  200. } else if (strcmp(argv[1], "is-connected") == 0) {
  201. /*
  202.  * <http> is-connected <server>
  203.  */
  204. HttpApp *a = (HttpApp*)TclObject::lookup(argv[2]);
  205. TcpApp *cnc = (TcpApp*)lookup_cnc(a);
  206. if (cnc == NULL) 
  207. tcl.result("0");
  208. else 
  209. tcl.result("1");
  210. return TCL_OK;
  211. } else if (strcmp(argv[1], "is-valid") == 0) {
  212. ClientPage *pg = 
  213. (ClientPage *)pool_->get_page(argv[2]);
  214. if (pg == NULL) {
  215. tcl.resultf("%d is-valid: No page %s", 
  216.     id_, argv[2]);
  217. return TCL_ERROR;
  218. }
  219. tcl.resultf("%d", pg->is_valid());
  220. return TCL_OK;
  221. } else if (strcmp(argv[1], "log") == 0) {
  222. int mode;
  223. log_ = Tcl_GetChannel(tcl.interp(), 
  224.       (char*)argv[2], &mode);
  225. if (log_ == 0) {
  226. tcl.resultf("%d: invalid log file handle %sn",
  227.     id_, argv[2]);
  228. return TCL_ERROR;
  229. }
  230. return TCL_OK;
  231. } else if (strcmp(argv[1], "disconnect") == 0) {
  232. /*
  233.  * <http> disconnect <client> 
  234.  * Delete the association of source and sink TCP.
  235.  */
  236. HttpApp *client = 
  237. (HttpApp *)TclObject::lookup(argv[2]);
  238. delete_cnc(client);
  239. return TCL_OK;
  240. } else if (strcmp(argv[1], "get-pagetype") == 0) {
  241. /*
  242.  * <http> get-pagetype <pageid>
  243.  * return the page type
  244.  */
  245. ClientPage *pg = 
  246. (ClientPage*)pool_->get_page(argv[2]);
  247. if (pg == NULL) {
  248. tcl.resultf("%d get-pagetype: No page %s", 
  249.     id_, argv[2]);
  250. return TCL_ERROR;
  251. }
  252. switch (pg->type()) {
  253. case HTML:
  254. tcl.result("HTML");
  255. break;
  256. case MEDIA:
  257. tcl.result("MEDIA");
  258. break;
  259. default:
  260. fprintf(stderr, "Unknown page type %d", 
  261. pg->type());
  262. return TCL_ERROR;
  263. }
  264. return TCL_OK;
  265. } else if (strcmp(argv[1], "get-layer") == 0) {
  266. // Assume the page is a MediaPage
  267. MediaPage *pg = (MediaPage *)pool_->get_page(argv[2]);
  268. if (pg == NULL) {
  269. tcl.resultf("%d get-layer: No page %s", 
  270.     id_, argv[2]);
  271. return TCL_ERROR;
  272. }
  273. if (pg->type() != MEDIA) {
  274. tcl.resultf("%d get-layer %s not a media page",
  275.     id_, argv[2]);
  276. return TCL_ERROR;
  277. }
  278. tcl.resultf("%d", pg->num_layer());
  279. return TCL_OK;
  280. }
  281. } else if (argc == 4) {
  282. if (strcmp(argv[1], "connect") == 0) {
  283. /*
  284.  * <http> connect <client> <ts>
  285.  *
  286.  * Associate a TCP agent with the given client. 
  287.  * <ts> is the agent used to send packets out.
  288.  * We assume two-way TCP connection, therefore we 
  289.  * only need one agent.
  290.  */
  291. HttpApp *client = 
  292. (HttpApp *)TclObject::lookup(argv[2]);
  293. TcpApp *cnc = (TcpApp *)TclObject::lookup(argv[3]);
  294. if (add_cnc(client, cnc)) {
  295. tcl.resultf("%s: failed to connect to %s", 
  296.     name_, argv[2]);
  297. return TCL_ERROR;
  298. }
  299. // Set data delivery target
  300. cnc->target() = (Process*)this;
  301. return TCL_OK;
  302. } else if (strcmp(argv[1], "set-modtime") == 0) {
  303. double mt = strtod(argv[3], NULL);
  304. if (pool_->set_mtime(argv[2], mt) != -1)
  305. return TCL_OK;
  306. else 
  307. return TCL_ERROR;
  308. } else if (strcmp(argv[1], "set-cachetime") == 0) {
  309. double et = Scheduler::instance().clock();
  310. if (pool_->set_etime(argv[2], et) != -1)
  311. return TCL_OK;
  312. else 
  313. return TCL_ERROR;
  314. }
  315. } else {
  316. if (strcmp(argv[1], "send") == 0) {
  317. /*
  318.  * <http> send <client> <bytes> <callback> 
  319.  */
  320. HttpApp *client = 
  321. (HttpApp *)TclObject::lookup(argv[2]);
  322. if (client == NULL) {
  323. tcl.add_errorf("%s: bad client name %s",
  324.        name_, argv[2]);
  325. return TCL_ERROR;
  326. }
  327. int bytes = atoi(argv[3]);
  328. TcpApp *cnc = (TcpApp *)lookup_cnc(client);
  329. if (cnc == NULL) {
  330. //tcl.resultf("%s: no connection to client %s",
  331. //     name_, argv[2]);
  332. // Tolerate it
  333. return TCL_OK;
  334. }
  335. char *buf = strdup(argv[4]);
  336. HttpNormalData *d = 
  337. new HttpNormalData(id_, bytes, buf);
  338. cnc->send(bytes, d);
  339. // delete d;
  340. free(buf);
  341. return TCL_OK;
  342. } else if (strcmp(argv[1], "enter-page") == 0) {
  343. ClientPage* pg = pool_->enter_page(argc, argv);
  344. if (pg == NULL)
  345. return TCL_ERROR;
  346. else 
  347. return TCL_OK;
  348. } else if (strcmp(argv[1], "evTrace") == 0) { 
  349. char buf[1024], *p;
  350. if (log_ != 0) {
  351. sprintf(buf, TIME_FORMAT" i %d ", 
  352.   BaseTrace::round(Scheduler::instance().clock()), 
  353. id_);
  354. p = &(buf[strlen(buf)]);
  355. for (int i = 2; i < argc; i++) {
  356. strcpy(p, argv[i]);
  357. p += strlen(argv[i]);
  358. *(p++) = ' ';
  359. }
  360. // Stick in a newline.
  361. *(p++) = 'n', *p = 0;
  362. Tcl_Write(log_, buf, p-buf);
  363. }
  364. return TCL_OK;
  365. }
  366. }
  367. return TclObject::command(argc, argv);
  368. }
  369. void HttpApp::log(const char* fmt, ...)
  370. {
  371. // Don't do anything if we don't have a log file.
  372. if (log_ == 0) 
  373. return;
  374. char buf[10240], *p;
  375. sprintf(buf, TIME_FORMAT" i %d ", 
  376. BaseTrace::round(Scheduler::instance().clock()), id_);
  377. p = &(buf[strlen(buf)]);
  378. va_list ap;
  379. va_start(ap, fmt);
  380. vsprintf(p, fmt, ap);
  381. Tcl_Write(log_, buf, strlen(buf));
  382. }
  383. void HttpApp::process_data(int, AppData* data)
  384. {
  385. if (data == NULL) 
  386. return;
  387. switch (data->type()) {
  388. case HTTP_NORMAL: {
  389. HttpNormalData *tmp = (HttpNormalData*)data;
  390. Tcl::instance().eval(tmp->str());
  391. break;
  392. }
  393. default:
  394. fprintf(stderr, "Bad http invalidation data type %dn", 
  395. data->type());
  396. abort();
  397. break;
  398. }
  399. }
  400. //----------------------------------------------------------------------
  401. // Clients
  402. //----------------------------------------------------------------------
  403. static class HttpClientClass : public TclClass {
  404. public:
  405. HttpClientClass() : TclClass("Http/Client") {}
  406.         TclObject* create(int, const char*const*) {
  407. return (new HttpClient());
  408. }
  409. } class_httpclient_app;
  410. //----------------------------------------------------------------------
  411. // Servers
  412. //----------------------------------------------------------------------
  413. static class HttpServerClass : public TclClass {
  414. public:
  415.         HttpServerClass() : TclClass("Http/Server") {}
  416.         TclObject* create(int, const char*const*) {
  417. return (new HttpServer());
  418. }
  419. } class_httpserver_app;
  420. static class HttpInvalServerClass : public TclClass {
  421. public:
  422.         HttpInvalServerClass() : TclClass("Http/Server/Inval") {}
  423.         TclObject* create(int, const char*const*) {
  424. return (new HttpInvalServer());
  425. }
  426. } class_httpinvalserver_app;
  427. static class HttpYucInvalServerClass : public TclClass {
  428. public:
  429.         HttpYucInvalServerClass() : TclClass("Http/Server/Inval/Yuc") {}
  430.         TclObject* create(int, const char*const*) {
  431. return (new HttpYucInvalServer());
  432. }
  433. } class_httpyucinvalserver_app;
  434. HttpYucInvalServer::HttpYucInvalServer() :
  435. inv_sender_(0), invlist_(0), num_inv_(0)
  436. {
  437. bind("hb_interval_", &hb_interval_);
  438. bind("enable_upd_", &enable_upd_);
  439. bind("Ca_", &Ca_);
  440. bind("Cb_", &Cb_);
  441. bind("push_thresh_", &push_thresh_);
  442. bind("push_high_bound_", &push_high_bound_);
  443. bind("push_low_bound_", &push_low_bound_);
  444. }
  445. int HttpYucInvalServer::command(int argc, const char*const* argv)
  446. {
  447. Tcl& tcl = Tcl::instance();
  448. switch (argv[1][0]) {
  449. case 'a': 
  450. if (strcmp(argv[1], "add-inval-sender") == 0) {
  451. HttpUInvalAgent *tmp = 
  452. (HttpUInvalAgent *)TclObject::lookup(argv[2]);
  453. if (tmp == NULL) {
  454. tcl.resultf("Non-existent agent %s", argv[2]);
  455. return TCL_ERROR;
  456. }
  457. inv_sender_ = tmp;
  458. return TCL_OK;
  459. } if (strcmp(argv[1], "add-inv") == 0) {
  460. /*
  461.  * <server> add-inv <pageid> <modtime>
  462.  */
  463. double mtime = strtod(argv[3], NULL);
  464. add_inv(argv[2], mtime);
  465. return TCL_OK;
  466. }
  467. break;
  468. case 'c': 
  469. if (strcmp(argv[1], "count-request") == 0) {
  470. ClientPage *pg = 
  471. (ClientPage *)pool_->get_page(argv[2]);
  472. if (pg == NULL) {
  473. tcl.resultf("%d count-request: No page %s", 
  474.     id_, argv[2]);
  475. return TCL_ERROR;
  476. }
  477. pg->count_request(Cb_, push_high_bound_);
  478. log("S NTF p %s v %dn", argv[2], pg->counter());
  479. return TCL_OK;
  480. } else if (strcmp(argv[1], "count-inval") == 0) {
  481. ClientPage *pg = 
  482. (ClientPage *)pool_->get_page(argv[2]);
  483. if (pg == NULL) {
  484. tcl.resultf("%d count-inval: No page %s", 
  485.     id_, argv[2]);
  486. return TCL_ERROR;
  487. }
  488. pg->count_inval(Ca_, push_low_bound_);
  489. log("S NTF p %s v %dn", argv[2], pg->counter());
  490. return TCL_OK;
  491. break;
  492. case 'i': 
  493. if (strcmp(argv[1], "is-pushable") == 0) {
  494. ClientPage *pg = 
  495. (ClientPage *)pool_->get_page(argv[2]);
  496. if (pg == NULL) {
  497. tcl.resultf("%d is-pushable: No page %s", 
  498.     id_, argv[2]);
  499. return TCL_ERROR;
  500. }
  501. if (pg->is_mpush() && 
  502.     (Scheduler::instance().clock() - pg->mpush_time() >
  503.      HTTP_HBEXPIRE_COUNT*hb_interval_)) {
  504. // If mandatory push timer expires, stop push
  505. pg->clear_mpush();
  506. fprintf(stderr, 
  507. "server %d timeout mpushn", id_);
  508. }
  509. tcl.resultf("%d", (enable_upd_ && 
  510.    (pg->counter() >= push_thresh_) ||
  511.    pg->is_mpush()));
  512. return TCL_OK;
  513. }
  514. break;
  515. case 'r': 
  516. if ((strcmp(argv[1], "request-mpush") == 0) ||
  517.     (strcmp(argv[1], "refresh-mpush") == 0)) {
  518. ClientPage *pg = 
  519. (ClientPage *)pool_->get_page(argv[2]);
  520. if (pg == NULL) {
  521. tcl.resultf("%d is-valid: No page %s", 
  522.     id_, argv[2]);
  523. return TCL_ERROR;
  524. }
  525. pg->set_mpush(Scheduler::instance().clock());
  526. return TCL_OK;
  527. break;
  528. case 's': 
  529. if (strcmp(argv[1], "send-hb") == 0) {
  530. send_heartbeat();
  531. return TCL_OK;
  532. } else if (strcmp(argv[1], "stop-mpush") == 0) {
  533. ClientPage *pg = 
  534. (ClientPage *)pool_->get_page(argv[2]);
  535. if (pg == NULL) {
  536. tcl.resultf("%d is-valid: No page %s", 
  537.     id_, argv[2]);
  538. return TCL_ERROR;
  539. }
  540. pg->clear_mpush();
  541. fprintf(stderr, "server %d stopped mpushn", id_);
  542. return TCL_OK;
  543. }
  544. break;
  545. }
  546. return HttpApp::command(argc, argv);
  547. }
  548. void HttpYucInvalServer::add_inv(const char *name, double mtime)
  549. {
  550. InvalidationRec *p = get_invrec(name);
  551. if ((p != NULL) && (p->mtime() < mtime)) {
  552. p->detach();
  553. delete p;
  554. p = NULL;
  555. num_inv_--;
  556. if (p == NULL) {
  557. p = new InvalidationRec(name, mtime);
  558. p->insert(&invlist_);
  559. num_inv_++;
  560. }
  561. }
  562. InvalidationRec* HttpYucInvalServer::get_invrec(const char *name)
  563. {
  564. // XXX What should we do if we already have an
  565. // invalidation record of this page in our 
  566. // invlist_? --> We should replace it with the new one
  567. InvalidationRec *r = invlist_;
  568. for (r = invlist_; r != NULL; r = r->next())
  569. if (strcmp(name, r->pg()) == 0)
  570. return r;
  571. return NULL;
  572. }
  573. HttpHbData* HttpYucInvalServer::pack_heartbeat()
  574. {
  575. HttpHbData *data = new HttpHbData(id_, num_inv_);
  576. InvalidationRec *p = invlist_, *q;
  577. int i = 0;
  578. while (p != NULL) {
  579. data->add(i++, p);
  580. // Clearing up invalidation sending list
  581. if (!p->dec_scount()) {
  582. // Each invalidation is sent to its children
  583. // for at most HTTP_HBEXPIRE times. After that 
  584. // the invalidation record is removed from 
  585. // the list
  586. q = p;
  587. p = p->next();
  588. q->detach();
  589. delete q;
  590. num_inv_--;
  591. } else 
  592. p = p->next();
  593. }
  594. return data;
  595. }
  596. void HttpYucInvalServer::send_hb_helper(int size, AppData *data)
  597. {
  598. inv_sender_->send(size, data);
  599. }
  600. void HttpYucInvalServer::send_heartbeat()
  601. {
  602. if (inv_sender_ == NULL)
  603. return;
  604. HttpHbData* d = pack_heartbeat();
  605. send_hb_helper(d->cost(), d);
  606. }
  607. //----------------------------------------------------------------------
  608. // Http cache with invalidation protocols. Http/Cache and Http/Cache/Inval
  609. // are used as base classes and provide common TCL methods. Http/Cache 
  610. // derives Http/Cache/TTL and Http/Cache/TTL/Old. Http/Cache/Inval derives
  611. // unicast invalidation and multicast invalidation.
  612. //----------------------------------------------------------------------
  613. static class HttpCacheClass : public TclClass {
  614. public:
  615.         HttpCacheClass() : TclClass("Http/Cache") {}
  616.         TclObject* create(int, const char*const*) {
  617. return (new HttpCache());
  618. }
  619. } class_httpcache_app;
  620. static class HttpInvalCacheClass : public TclClass {
  621. public:
  622.         HttpInvalCacheClass() : TclClass("Http/Cache/Inval") {}
  623.         TclObject* create(int, const char*const*) {
  624. return (new HttpInvalCache());
  625. }
  626. } class_httpinvalcache_app;
  627. static class HttpMInvalCacheClass : public TclClass {
  628. public:
  629.         HttpMInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast") {}
  630.         TclObject* create(int, const char*const*) {
  631. return (new HttpMInvalCache());
  632. }
  633. } class_HttpMInvalCache_app;
  634. // Static members and functions
  635. HttpMInvalCache** HttpMInvalCache::CacheRepository_ = NULL;
  636. int HttpMInvalCache::NumCache_ = 0;
  637. void HttpMInvalCache::add_cache(HttpMInvalCache *c)
  638. {
  639. if (CacheRepository_ == NULL) {
  640. CacheRepository_ = new HttpMInvalCache* [c->id() + 1];
  641. CacheRepository_[c->id()] = c;
  642. NumCache_ = c->id();
  643. } else if (NumCache_ < c->id()) {
  644. HttpMInvalCache** p = new HttpMInvalCache* [c->id()+1];
  645. memcpy(p, CacheRepository_, 
  646.        (c->id()+1)*sizeof(HttpMInvalCache*));
  647. delete[]CacheRepository_;
  648. CacheRepository_ = p;
  649. NumCache_ = c->id();
  650. p[c->id()] = c;
  651. } else
  652. CacheRepository_[c->id()] = c;
  653. }
  654. HttpMInvalCache::HttpMInvalCache() : 
  655. hb_timer_(this, HTTP_HBINTERVAL),
  656. inv_sender_(0), num_sender_(0), size_sender_(0), 
  657. invlist_(0), num_inv_(0), inv_parent_(NULL),
  658. upd_sender_(NULL), num_updater_(0), size_updater_(0)
  659. {
  660. bind("hb_interval_", &hb_interval_);
  661. bind("enable_upd_", &enable_upd_); // If we allow push
  662. bind("Ca_", &Ca_);
  663. bind("Cb_", &Cb_);
  664. bind("push_thresh_", &push_thresh_);
  665. bind("push_high_bound_", &push_high_bound_);
  666. bind("push_low_bound_", &push_low_bound_);
  667. hb_timer_.set_interval(hb_interval_);
  668. Tcl_InitHashTable(&sstate_, TCL_ONE_WORD_KEYS);
  669. Tcl_InitHashTable(&nbr_, TCL_ONE_WORD_KEYS);
  670. }
  671. HttpMInvalCache::~HttpMInvalCache() 
  672. {
  673. if (num_sender_ > 0) 
  674. delete []inv_sender_;
  675. Tcl_DeleteHashTable(&sstate_);
  676. Tcl_DeleteHashTable(&nbr_);
  677. }
  678. int HttpMInvalCache::command(int argc, const char*const* argv)
  679. {
  680. Tcl& tcl = Tcl::instance();
  681. if (argc < 2) 
  682. return HttpInvalCache::command(argc, argv);
  683. switch (argv[1][0]) {
  684. case 'a':
  685. if ((strcmp(argv[1], "add-inval-listener") == 0) ||
  686.     (strcmp(argv[1], "add-upd-listener") == 0)) {
  687. HttpInvalAgent *tmp = 
  688. (HttpInvalAgent *)TclObject::lookup(argv[2]);
  689. tmp->attachApp((Application *)this);
  690. return TCL_OK;
  691. } else if (strcmp(argv[1], "add-inval-sender") == 0) {
  692. HttpInvalAgent *tmp = 
  693. (HttpInvalAgent *)TclObject::lookup(argv[2]);
  694. if (tmp == NULL) {
  695. tcl.resultf("Non-existent agent %s", argv[2]);
  696. return TCL_ERROR;
  697. }
  698. if (num_sender_ == size_sender_) {
  699. HttpInvalAgent **tt = 
  700. new HttpInvalAgent*[size_sender_+5];
  701. memcpy(tt, inv_sender_, 
  702.        sizeof(HttpInvalAgent*)*size_sender_);
  703. delete []inv_sender_;
  704. size_sender_ += 5;
  705. inv_sender_ = tt;
  706. }
  707. inv_sender_[num_sender_++] = tmp;
  708. return TCL_OK;
  709. } else if (strcmp(argv[1], "add-to-map") == 0) {
  710. add_cache(this);
  711. return TCL_OK;
  712. } else if (strcmp(argv[1], "add-upd-sender") == 0) {
  713. HttpInvalAgent *tmp = 
  714. (HttpInvalAgent *)TclObject::lookup(argv[2]);
  715. if (tmp == NULL) {
  716. tcl.resultf("Non-existent agent %s", argv[2]);
  717. return TCL_ERROR;
  718. }
  719. if (num_updater_ == size_updater_) {
  720. HttpInvalAgent **tt = 
  721. new HttpInvalAgent*[size_updater_+5];
  722. memcpy(tt, upd_sender_, 
  723.        sizeof(HttpInvalAgent*)*size_updater_);
  724. delete []upd_sender_;
  725. size_updater_ += 5;
  726. upd_sender_ = tt;
  727. }
  728. upd_sender_[num_updater_++] = tmp;
  729. return TCL_OK;
  730. }
  731. break;
  732. case 'c':
  733. if (strcmp(argv[1], "count-request") == 0) {
  734. ClientPage *pg = 
  735. (ClientPage *)pool_->get_page(argv[2]);
  736. if (pg == NULL) {
  737. tcl.resultf("%d count-request: No page %s", 
  738.     id_, argv[2]);
  739. return TCL_ERROR;
  740. }
  741. pg->count_request(Cb_, push_high_bound_);
  742. log("E NTF p %s v %dn", argv[2], pg->counter());
  743. return TCL_OK;
  744. } else if (strcmp(argv[1], "check-sstate") == 0) {
  745. /*
  746.  * <cache> check-sstate <sid> <cid>
  747.  * If server is re-connected, reinstate it
  748.  */
  749. int sid = atoi(argv[2]);
  750. int cid = atoi(argv[3]);
  751. check_sstate(sid, cid);
  752. return TCL_OK;
  753. }
  754. break;
  755. case 'i':
  756. // XXX We don't need a "is-pushable" for cache!
  757. if (strcmp(argv[1], "is-unread") == 0) {
  758. ClientPage *pg = 
  759. (ClientPage *)pool_->get_page(argv[2]);
  760. if (pg == NULL) {
  761. tcl.resultf("%d is-unread: No page %s", 
  762.     id_, argv[2]);
  763. return TCL_ERROR;
  764. }
  765. tcl.resultf("%d", pg->is_unread());
  766. return TCL_OK;
  767. }
  768. break;
  769. case 'j':
  770. if (strcmp(argv[1], "join") == 0) {
  771. /*
  772.  * <cache> join <server_id> <cache>
  773.  *
  774.  * <server> join via <cache>. If they are the same,
  775.  * it means we are the primary cache for <server>.
  776.  */
  777. int sid = atoi(argv[2]);
  778. HttpMInvalCache *cache = 
  779. (HttpMInvalCache*)TclObject::lookup(argv[3]);
  780. if (cache == NULL) {
  781.     tcl.add_errorf("Non-existent cache %s", argv[3]);
  782.     return TCL_ERROR;
  783. }
  784. // Add neighbor cache if necessary
  785. NeighborCache *c = lookup_nbr(cache->id());
  786. if (c == NULL)
  787. add_nbr(cache);
  788. // Establish server invalidation contract
  789. check_sstate(sid, cache->id());
  790. return TCL_OK;
  791. }
  792. break;
  793. case 'p':
  794. if (strcmp(argv[1], "parent-cache") == 0) {
  795. /*
  796.  * <cache> parent-cache <web_server_id>
  797.  * Return the parent cache of <web_server_id> in the 
  798.  * virtual distribution tree. 
  799.  */
  800. int sid = atoi(argv[2]);
  801. SState *sst = lookup_sstate(sid);
  802. if (sst == NULL)
  803. tcl.result("");
  804. else {
  805. // Bad hack... :(
  806. NeighborCache *c = lookup_nbr(sst->cache()->cache()->id());
  807. tcl.resultf("%s", c->cache()->name());
  808. }
  809. return TCL_OK;
  810. } else if (strcmp(argv[1], "push-children") == 0) {
  811. // Multicast the pushed page to all children
  812. ClientPage *pg = 
  813. (ClientPage *)pool_->get_page(argv[2]);
  814. if (pg == NULL) {
  815. tcl.resultf("%d is-valid: No page %s", 
  816.     id_, argv[2]);
  817. return TCL_ERROR;
  818. }
  819. send_upd(pg);
  820. return TCL_OK;
  821. }
  822. break;
  823. case 'r':
  824. if (strcmp(argv[1], "recv-inv") == 0) {
  825. /*
  826.  * <cache> recv-inv <pageid> <modtime>
  827.  * This should be called only by a web server, 
  828.  * therefore we do not check the validity of the 
  829.  * invalidation
  830.  */
  831. // Pack it into a HttpHbData, and process it
  832. HttpHbData *d = new HttpHbData(id_, 1);
  833. strcpy(d->rec_pg(0), argv[2]);
  834. d->rec_mtime(0) = strtod(argv[3], NULL);
  835. //int old_inv = num_inv_;
  836. tcl.resultf("%d", recv_inv(d));
  837. delete d;
  838. return TCL_OK;
  839. } else if (strcmp(argv[1], "recv-push") == 0) {
  840. /* 
  841.  * <cache> recv-push <pageid> args
  842.  */
  843. HttpUpdateData *d = new HttpUpdateData(id_, 1);
  844. strcpy(d->rec_page(0), argv[2]);
  845. for (int i = 3; i < argc; i+=2) {
  846. if (strcmp(argv[i], "modtime") == 0)
  847.   d->rec_mtime(0) = strtod(argv[i+1], NULL);
  848. else if (strcmp(argv[i], "size") == 0) {
  849.   d->rec_size(0) = atoi(argv[i+1]);
  850.   // XXX need to set total update page size
  851.   d->set_pgsize(d->rec_size(0));
  852. } else if (strcmp(argv[i], "age") == 0)
  853.   d->rec_age(0) = strtod(argv[i+1], NULL);
  854. }
  855. tcl.resultf("%d", recv_upd(d));
  856. delete d;
  857. return TCL_OK;
  858. } else if (strcmp(argv[1], "register-server") == 0) {
  859. /*
  860.  * <self> register-server <cache_id> <server_id>
  861.  * We get a GET response about a page from <server>, 
  862.  * which we hear from <cache> 
  863.  */
  864. int cid = atoi(argv[2]);
  865. int sid = atoi(argv[3]);
  866. // Assuming we've already known the cache
  867. check_sstate(sid, cid);
  868. return TCL_OK;
  869. }
  870. break;
  871. case 's':
  872. if (strcmp(argv[1], "start-hbtimer") == 0) {
  873. if (hb_timer_.status() == TIMER_IDLE)
  874. hb_timer_.sched();
  875. return TCL_OK;
  876. } else if (strcmp(argv[1], "server-hb") == 0) {
  877. int id = atoi(argv[2]);
  878. recv_heartbeat(id);
  879. return TCL_OK;
  880. } else if (strcmp(argv[1], "set-pinv-agent") == 0) {
  881. inv_parent_ = 
  882. (HttpUInvalAgent*)TclObject::lookup(argv[2]);
  883. return TCL_OK;
  884. } else if (strcmp(argv[1], "set-parent") == 0) {
  885. HttpMInvalCache *c = 
  886. (HttpMInvalCache*)TclObject::lookup(argv[2]);
  887. if (c == NULL) {
  888.     tcl.add_errorf("Non-existent cache %s", argv[2]);
  889.     return TCL_ERROR;
  890. }
  891. // Add parent cache into known cache list
  892. add_nbr(c);
  893. return TCL_OK;
  894. } else if (strcmp(argv[1], "set-unread") == 0) {
  895. ClientPage *pg = 
  896. (ClientPage *)pool_->get_page(argv[2]);
  897. if (pg == NULL) {
  898. tcl.resultf("%d is-valid: No page %s", 
  899.     id_, argv[2]);
  900. return TCL_ERROR;
  901. }
  902. pg->set_unread();
  903. return TCL_OK;
  904. } else if (strcmp(argv[1], "set-read") == 0) {
  905. ClientPage *pg = 
  906. (ClientPage *)pool_->get_page(argv[2]);
  907. if (pg == NULL) {
  908. tcl.resultf("%d is-valid: No page %s", 
  909.     id_, argv[2]);
  910. return TCL_ERROR;
  911. }
  912. pg->set_read();
  913. return TCL_OK;
  914. } else if (strcmp(argv[1], "set-mandatory-push") == 0) { 
  915. ClientPage *pg = 
  916. (ClientPage *)pool_->get_page(argv[2]);
  917. if (pg == NULL) {
  918. tcl.resultf("%d is-valid: No page %s", 
  919.     id_, argv[2]);
  920. return TCL_ERROR;
  921. }
  922. pg->set_mpush(Scheduler::instance().clock());
  923. return TCL_OK;
  924. } else if (strcmp(argv[1], "stop-mpush") == 0) {
  925. ClientPage *pg = 
  926. (ClientPage *)pool_->get_page(argv[2]);
  927. if (pg == NULL) {
  928. tcl.resultf("%d is-valid: No page %s", 
  929.     id_, argv[2]);
  930. return TCL_ERROR;
  931. }
  932. pg->clear_mpush();
  933. return TCL_OK;
  934. }
  935. break;
  936. default:
  937. break;
  938. }
  939. return HttpInvalCache::command(argc, argv);
  940. }
  941. void HttpMInvalCache::check_sstate(int sid, int cid)
  942. {
  943. if ((sid == cid) && (cid == id_))
  944. // How come?
  945. return;
  946. SState *sst = lookup_sstate(sid);
  947. NeighborCache *c = lookup_nbr(cid);
  948. if (sst == NULL) {
  949. if (c == NULL) {
  950. fprintf(stderr, 
  951. "%g: cache %d: No neighbor cache for received invalidation from %d via %dn", 
  952. Scheduler::instance().clock(), id_, sid, cid);
  953. abort();
  954. }
  955. #ifdef WEBCACHE_DEBUG
  956. fprintf(stderr,
  957. "%g: cache %d: registered server %d via cache %dn",
  958. Scheduler::instance().clock(), id_, sid, cid);
  959. #endif
  960. sst = new SState(c);
  961. add_sstate(sid, sst);
  962. c->add_server(sid);
  963. } else if (sst->is_down()) {
  964. sst->up();
  965. if (cid != id_) {
  966. if (c == NULL) {
  967. fprintf(stderr, 
  968.  "[%g]: Cache %d has an invalid neighbor cache %dn",
  969.  Scheduler::instance().clock(), id_, cid);
  970. abort();
  971. }
  972. c->server_up(sid);
  973. }
  974. #ifdef WEBCACHE_DEBUG
  975. fprintf(stderr, 
  976. "[%g] Cache %d reconnected to server %d via cache %dn", 
  977. Scheduler::instance().clock(), id_, 
  978. sid, cid);
  979. #endif
  980. Tcl::instance().evalf("%s mark-rejoin", name_);
  981. }
  982. }
  983. void HttpMInvalCache::add_sstate(int sid, SState *sst)
  984. {
  985. int newEntry = 1;
  986. long key = sid;
  987. Tcl_HashEntry *he = 
  988. Tcl_CreateHashEntry(&sstate_, (const char *)key, &newEntry);
  989. if (he == NULL) 
  990. return;
  991. if (newEntry)
  992. Tcl_SetHashValue(he, (ClientData)sst);
  993. }
  994. HttpMInvalCache::SState* HttpMInvalCache::lookup_sstate(int sid)
  995. {
  996.         long key = sid;
  997. Tcl_HashEntry *he = Tcl_FindHashEntry(&sstate_, (const char *)key);
  998. if (he == NULL)
  999. return NULL;
  1000. return (SState *)Tcl_GetHashValue(he);
  1001. }
  1002. NeighborCache* HttpMInvalCache::lookup_nbr(int id)
  1003. {
  1004.         long key = id;
  1005. Tcl_HashEntry *he = Tcl_FindHashEntry(&nbr_, (const char *)key);
  1006. if (he == NULL)
  1007. return NULL;
  1008. return (NeighborCache *)Tcl_GetHashValue(he);
  1009. }
  1010. // Add a new neighbor cache
  1011. void HttpMInvalCache::add_nbr(HttpMInvalCache *cache)
  1012. {
  1013. int newEntry = 1;
  1014. long key = cache->id ();
  1015. Tcl_HashEntry *he = 
  1016. Tcl_CreateHashEntry(&nbr_, (const char *)key, 
  1017.     &newEntry);
  1018. if (he == NULL) 
  1019. return;
  1020. // If this cache already exists, don't do anything
  1021. if (!newEntry)
  1022. return;
  1023. // Start a timer for the neighbor
  1024. LivenessTimer *timer = 
  1025. new LivenessTimer(this,HTTP_HBEXPIRE_COUNT*hb_interval_,
  1026.   cache->id());
  1027. double time = Scheduler::instance().clock();
  1028. NeighborCache *c = new NeighborCache(cache, time, timer);
  1029. Tcl_SetHashValue(he, (ClientData)c);
  1030. }
  1031. // Two ways to receive a heartbeat: (1) via HttpInvalAgent; (2) via TCP 
  1032. // connection between a server and a primary cache. (See "server-hb" handling
  1033. // in command().
  1034. void HttpMInvalCache::recv_heartbeat(int id)
  1035. {
  1036. // Receive time of the heartbeat
  1037. double time = Scheduler::instance().clock();
  1038. NeighborCache *c = lookup_nbr(id);
  1039. if (c == NULL) {
  1040. // XXX
  1041. // The only possible place for this to happen is in the TLC
  1042. // group, where no JOIN could ever reach. Moreover, 
  1043. // we don't even have an entry for that cache yet, so here
  1044. // we add that cache into our entry, and later on we'll add
  1045. // corresponding servers there.
  1046. if (id == id_) 
  1047. return;
  1048. add_nbr(map_cache(id));
  1049. #ifdef WEBCACHE_DEBUG
  1050. fprintf(stderr, "TLC %d discovered TLC %dn", id_, id);
  1051. #endif
  1052. return;
  1053. } else if (c->is_down()) {
  1054. // Neighbor cache recovers. Don't do anything special and
  1055. // let invalid entries recover themselves
  1056. c->up();
  1057. #ifdef WEBCACHE_DEBUG
  1058. fprintf(stderr, "[%g] Cache %d reconnected to cache %dn", 
  1059. Scheduler::instance().clock(), id_, id);
  1060. #endif
  1061. Tcl::instance().evalf("%s mark-rejoin", name_);
  1062. } else
  1063. // Update heartbeat time
  1064. c->reset_timer(time);
  1065. }
  1066. void HttpMInvalCache::invalidate_server(int sid)
  1067. {
  1068. SState *sst = lookup_sstate(sid);
  1069. if (sst->is_down())
  1070. // If this server is already marked down, return
  1071. return;
  1072. sst->down();
  1073. pool_->invalidate_server(sid);
  1074. }
  1075. void HttpMInvalCache::handle_node_failure(int cid)
  1076. {
  1077. #ifdef WEBCACHE_DEBUG
  1078. fprintf(stderr, "[%g] Cache %d disconnected from cache %dn", 
  1079. Scheduler::instance().clock(), id_, cid);
  1080. #endif
  1081. Tcl::instance().evalf("%s mark-leave", name_);
  1082. NeighborCache *c = lookup_nbr(cid);
  1083. if (c == NULL) {
  1084. fprintf(stderr, "%s: An unknown neighbor cache %d failed.n",
  1085. name_, cid);
  1086. }
  1087. // Mark the cache down
  1088. c->down();
  1089. // Invalidate entries of all servers related to that cache
  1090. // XXX We don't have an iterator for all servers in NeighborCache!
  1091. c->invalidate(this);
  1092. // Send leave message to all children
  1093. HttpLeaveData* data = new HttpLeaveData(id_, c->num());
  1094. c->pack_leave(*data);
  1095. send_leave(data);
  1096. }
  1097. void HttpMInvalCache::recv_leave(HttpLeaveData *d)
  1098. {
  1099. #ifdef WEBCACHE_DEBUG
  1100. fprintf(stderr, "[%g] Cache %d gets a LEAVE from cache %dn", 
  1101. Scheduler::instance().clock(), id_, d->id());
  1102. #endif
  1103. if (d->num() == 0) {
  1104. fprintf(stderr, 
  1105.     "%s (%g) gets a leave from cache without server!n", 
  1106. name_, Scheduler::instance().clock());
  1107. return;
  1108. }
  1109. SState *sst;
  1110. HttpLeaveData* data = new HttpLeaveData(id_, d->num());
  1111. NeighborCache *c = lookup_nbr(d->id());
  1112. int i, j;
  1113. for (i = 0, j = 0; i < d->num(); i++) {
  1114. sst = lookup_sstate(d->rec_id(i));
  1115. // If we haven't heard of that server, which means we don't 
  1116. // have any page of that server, ignore the leave message.
  1117. if (sst == NULL) 
  1118. continue;
  1119. // If it's already marked down, don't bother again.
  1120. if (sst->is_down()) 
  1121. continue;
  1122. // If we hear a LEAVE about a server from one of 
  1123. // our child in the virtual distribution tree 
  1124. // of the server, ignore it.
  1125. if (c != sst->cache()) 
  1126. continue;
  1127. // We have the page, and we hold inval contract. Invalidate 
  1128. // the page and inform our children of it.
  1129. sst->down();
  1130. data->add(j++, d->rec_id(i));
  1131. pool_->invalidate_server(d->rec_id(i));
  1132. Tcl::instance().evalf("%s mark-leave", name_);
  1133. }
  1134. // Delete it if it's not sent out 
  1135. if (j > 0)
  1136. send_leave(data);
  1137. delete data;
  1138. }
  1139. void HttpMInvalCache::send_leave(HttpLeaveData *d)
  1140. {
  1141. send_hb_helper(d->cost(), d);
  1142. }
  1143. void HttpMInvalCache::timeout(int reason)
  1144. {
  1145. switch (reason) {
  1146. case HTTP_INVALIDATION:
  1147. // Send an invalidation message
  1148. send_heartbeat();
  1149. break;
  1150. case HTTP_UPDATE:
  1151. // XXX do nothing. May put client selective joining update
  1152. // group here.
  1153. break;
  1154. default:
  1155. fprintf(stderr, "%s: Unknown reason %d", name_, reason);
  1156. break;
  1157. }
  1158. }
  1159. void HttpMInvalCache::process_data(int size, AppData* data)
  1160. {
  1161. if (data == NULL)
  1162. return;
  1163. switch (data->type()) {
  1164. case HTTP_INVALIDATION: {
  1165. // Update timer for the source of the heartbeat
  1166. HttpHbData *inv = (HttpHbData*)data;
  1167. recv_heartbeat(inv->id());
  1168. recv_inv(inv);
  1169. break;
  1170. }
  1171. case HTTP_UPDATE: {
  1172. // Replace all updated pages
  1173. HttpUpdateData *pg = (HttpUpdateData*)data;
  1174. recv_upd(pg);
  1175. break;
  1176. }
  1177. // JOIN messages are sent via TCP and direct TCL callback.
  1178. case HTTP_LEAVE: {
  1179. HttpLeaveData *l = (HttpLeaveData*)data;
  1180. recv_leave(l);
  1181. break;
  1182. }
  1183. default:
  1184. HttpApp::process_data(size, data);
  1185. return;
  1186. }
  1187. }
  1188. void HttpMInvalCache::add_inv(const char *name, double mtime)
  1189. {
  1190. InvalidationRec *p = get_invrec(name);
  1191. if ((p != NULL) && (p->mtime() < mtime)) {
  1192. p->detach();
  1193. delete p;
  1194. p = NULL;
  1195. num_inv_--;
  1196. if (p == NULL) {
  1197. p = new InvalidationRec(name, mtime);
  1198. p->insert(&invlist_);
  1199. num_inv_++;
  1200. }
  1201. }
  1202. InvalidationRec* HttpMInvalCache::get_invrec(const char *name)
  1203. {
  1204. // XXX What should we do if we already have an
  1205. // invalidation record of this page in our 
  1206. // invlist_? --> We should replace it with the new one
  1207. InvalidationRec *r = invlist_;
  1208. for (r = invlist_; r != NULL; r = r->next())
  1209. if (strcmp(name, r->pg()) == 0)
  1210. return r;
  1211. return NULL;
  1212. }
  1213. HttpHbData* HttpMInvalCache::pack_heartbeat()
  1214. {
  1215. HttpHbData *data = new HttpHbData(id_, num_inv_);
  1216. InvalidationRec *p = invlist_, *q;
  1217. int i = 0;
  1218. while (p != NULL) {
  1219. data->add(i++, p);
  1220. // Clearing up invalidation sending list
  1221. if (!p->dec_scount()) {
  1222. // Each invalidation is sent to its children
  1223. // for at most HTTP_HBEXPIRE times. After that 
  1224. // the invalidation record is removed from 
  1225. // the list
  1226. q = p;
  1227. p = p->next();
  1228. q->detach();
  1229. delete q;
  1230. num_inv_--;
  1231. } else 
  1232. p = p->next();
  1233. }
  1234. return data;
  1235. }
  1236. int HttpMInvalCache::recv_inv(HttpHbData *data)
  1237. {
  1238. if (data->num_inv() == 0)
  1239. return 0;
  1240. InvalidationRec *head;
  1241. data->extract(head);
  1242. int old_inv = num_inv_;
  1243. process_inv(data->num_inv(), head, data->id());
  1244. //log("E GINV z %dn", data->size());
  1245. if (old_inv < num_inv_) 
  1246. // This invalidation is valid
  1247. return 1;
  1248. else 
  1249. return 0;
  1250. }
  1251. // Get an invalidation, check invalidation modtimes, then setup 
  1252. // invalidation forwarding entries
  1253. // The input invalidation record list is destroyed.
  1254. void HttpMInvalCache::process_inv(int, InvalidationRec *ivlist, int cache)
  1255. {
  1256. InvalidationRec *p = ivlist, *q, *r;
  1257. //int upd = 0;
  1258. while (p != NULL) {
  1259. ClientPage* pg = (ClientPage *)pool_->get_page(p->pg());
  1260. // XXX Establish server states. Server states only gets 
  1261. // established when we have a page (no matter if we have its
  1262. // content), and we have got an invalidation for the page. 
  1263. // Then we know we've got an invalidation contract for the 
  1264. // page.
  1265. if (pg != NULL) {
  1266. check_sstate(pg->server()->id(), cache);
  1267. // Count this invalidation no matter whether we're
  1268. // going to drop it. But if we doesn't get it 
  1269. // from our virtual parent, don't count it
  1270. SState *sst = lookup_sstate(pg->server()->id());
  1271. if (sst == NULL) {
  1272. // How come we doesn't know the server???
  1273. fprintf(stderr, 
  1274. "%s %d: couldn't find the server.n", 
  1275. __FILE__, __LINE__);
  1276. abort();
  1277. }
  1278. if ((sst->cache()->cache()->id() == cache) && 
  1279.     (pg->mtime() > p->mtime())) {
  1280. // Don't count repeated invalidations.
  1281. pg->count_inval(Ca_, push_low_bound_);
  1282. log("E NTF p %s v %dn",p->pg(),pg->counter());
  1283. }
  1284. }
  1285. // Hook for filters of derived classes
  1286. if (recv_inv_filter(pg, p) == HTTP_INVALCACHE_FILTERED) {
  1287. // If we do not have the page, or we have (or know 
  1288. // about) a newer page, ignore this invalidation 
  1289. // record and keep going.
  1290. //
  1291. // If we have this version of the page, and it's 
  1292. // already invalid, ignore this extra invalidation
  1293. q = p;
  1294. p = p->next();
  1295. q->detach();
  1296. delete q;
  1297. } else {
  1298. // Otherwise we invalidate our page and setup a 
  1299. // invalidation sending record for the page
  1300. pg->invalidate(p->mtime());
  1301. // Delete existing record for that page if any
  1302. q = get_invrec(p->pg());
  1303. if ((q != NULL) && (q->mtime() < p->mtime())) {
  1304. q->detach();
  1305. delete q;
  1306. q = NULL;
  1307. num_inv_--;
  1308. }
  1309. r = p; 
  1310. p = p->next();
  1311. r->detach();
  1312. // Insert it if necessary
  1313. if (q == NULL) {
  1314. r->insert(&invlist_);
  1315. num_inv_++;
  1316. // XXX
  1317. Tcl::instance().evalf("%s mark-invalid",name_);
  1318. log("E GINV p %s m %.17gn", r->pg(), r->mtime());
  1319. } else
  1320. delete r;
  1321. }
  1322. }
  1323. }
  1324. void HttpMInvalCache::send_hb_helper(int size, AppData *data)
  1325. {
  1326. if (inv_parent_ != NULL) 
  1327. inv_parent_->send(size, data->copy());
  1328. for (int i = 0; i < num_sender_; i++)
  1329. inv_sender_[i]->send(size, data->copy());
  1330. }
  1331. void HttpMInvalCache::send_heartbeat()
  1332. {
  1333. if ((num_sender_ == 0) && (inv_parent_ == NULL))
  1334. return;
  1335. HttpHbData* d = pack_heartbeat();
  1336. send_hb_helper(d->cost(), d);
  1337. delete d;
  1338. }
  1339. int HttpMInvalCache::recv_upd(HttpUpdateData *d)
  1340. {
  1341. if (d->num() != 1) {
  1342. fprintf(stderr, 
  1343. "%d gets an update which contain !=1 pages.n", id_);
  1344. abort();
  1345. }
  1346. ClientPage *pg = pool_->get_page(d->rec_page(0));
  1347. if (pg != NULL) 
  1348. if (pg->mtime() >= d->rec_mtime(0)) {
  1349. // If we've already had this version, or a newer 
  1350. // version, ignore this old push
  1351. // fprintf(stderr, "[%g] %d gets an old pushn", 
  1352. //  Scheduler::instance().clock(), id_);
  1353. // log("E OLD m %g p %gn", d->rec_mtime(0), pg->mtime());
  1354. return 0;
  1355. } else {
  1356. // Our old page is invalidated by this new push,
  1357. // set up invalidation records for our children
  1358. add_inv(d->rec_page(0), d->rec_mtime(0));
  1359. pg->count_inval(Ca_, push_low_bound_);
  1360. log("E NTF p %s v %dn", d->rec_page(0),pg->counter());
  1361. }
  1362. // Add the new page into our pool
  1363. ClientPage *q = pool_->enter_page(d->rec_page(0), d->rec_size(0), 
  1364.   d->rec_mtime(0),
  1365.   Scheduler::instance().clock(),
  1366.   d->rec_age(0));
  1367. // By default the page is valid and read. Set it as unread
  1368. q->set_unread();
  1369. log("E GUPD m %.17g z %dn", d->rec_mtime(0), d->pgsize());
  1370. Tcl::instance().evalf("%s mark-valid", name_);
  1371. // XXX If the page was previously marked as MandatoryPush, then
  1372. // we need to check if it's timed out
  1373. if (q->is_mpush() && (Scheduler::instance().clock() - q->mpush_time()
  1374.       > HTTP_HBEXPIRE_COUNT*hb_interval_)) {
  1375. // If mandatory push timer expires, stop push
  1376. q->clear_mpush();
  1377. Tcl::instance().evalf("%s cancel-mpush-refresh %s", 
  1378.       name_, d->rec_page(0));
  1379. }
  1380. if (enable_upd_ && (q->counter() >= push_thresh_) || q->is_mpush())
  1381. // XXX Continue pushing if we either select to push, or 
  1382. // were instructed to do so.
  1383. return 1;
  1384. else 
  1385. return 0;
  1386. }
  1387. HttpUpdateData* HttpMInvalCache::pack_upd(ClientPage* page)
  1388. {
  1389. HttpUpdateData *data = new HttpUpdateData(id_, 1);
  1390. data->add(0, page);
  1391. return data;
  1392. }
  1393. void HttpMInvalCache::send_upd_helper(int pgsize, AppData* data)
  1394. {
  1395. for (int i = 0; i < num_updater_; i++)
  1396. upd_sender_[i]->send(pgsize, data->copy());
  1397. }
  1398. void HttpMInvalCache::send_upd(ClientPage *page)
  1399. {
  1400. if ((num_updater_ == 0) || !enable_upd_) 
  1401. return;
  1402. HttpUpdateData* d = pack_upd(page);
  1403. send_upd_helper(d->pgsize(), d);
  1404. delete d;
  1405. }
  1406. //----------------------------------------------------------------------
  1407. // Multicast invalidation + two way liveness messages + 
  1408. // invalidation filtering. 
  1409. //----------------------------------------------------------------------
  1410. static class HttpPercInvalCacheClass : public TclClass {
  1411. public:
  1412.         HttpPercInvalCacheClass() : TclClass("Http/Cache/Inval/Mcast/Perc") {}
  1413.         TclObject* create(int, const char*const*) {
  1414. return (new HttpPercInvalCache());
  1415. }
  1416. } class_HttpPercInvalCache_app;
  1417. HttpPercInvalCache::HttpPercInvalCache() 
  1418. {
  1419. bind("direct_request_", &direct_request_);
  1420. }
  1421. int HttpPercInvalCache::command(int argc, const char*const* argv)
  1422. {
  1423. Tcl& tcl = Tcl::instance();
  1424. if (strcmp(argv[1], "is-header-valid") == 0) {
  1425. ClientPage *pg = 
  1426. (ClientPage *)pool_->get_page(argv[2]);
  1427. if (pg == NULL) {
  1428. tcl.resultf("%d is-valid: No page %s", 
  1429.     id_, argv[2]);
  1430. return TCL_ERROR;
  1431. }
  1432. tcl.resultf("%d", pg->is_header_valid());
  1433. return TCL_OK;
  1434. } else if (strcmp(argv[1], "enter-metadata") == 0) {
  1435. /* 
  1436.  * <cache> enter-metadata <args...>
  1437.  * The same arguments as enter-page, but set the page status
  1438.  * as HTTP_VALID_HEADER, i.e., if we get a request, we need 
  1439.  * to fetch the actual valid page content
  1440.  */
  1441. ClientPage *pg = pool_->enter_metadata(argc, argv);
  1442. if (pg == NULL)
  1443. return TCL_ERROR;
  1444. else
  1445. return TCL_OK;
  1446. }
  1447. return HttpMInvalCache::command(argc, argv);
  1448. }