peer_digest.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:27k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. /*
  2.  * $Id: peer_digest.c,v 1.70 1999/01/29 21:28:17 wessels Exp $
  3.  *
  4.  * DEBUG: section 72    Peer Digest Routines
  5.  * AUTHOR: Alex Rousskov
  6.  *
  7.  * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
  8.  * ----------------------------------------------------------
  9.  *
  10.  *  Squid is the result of efforts by numerous individuals from the
  11.  *  Internet community.  Development is led by Duane Wessels of the
  12.  *  National Laboratory for Applied Network Research and funded by the
  13.  *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
  14.  *  Duane Wessels and the University of California San Diego.  Please
  15.  *  see the COPYRIGHT file for full details.  Squid incorporates
  16.  *  software developed and/or copyrighted by other sources.  Please see
  17.  *  the CREDITS file for full details.
  18.  *
  19.  *  This program is free software; you can redistribute it and/or modify
  20.  *  it under the terms of the GNU General Public License as published by
  21.  *  the Free Software Foundation; either version 2 of the License, or
  22.  *  (at your option) any later version.
  23.  *  
  24.  *  This program is distributed in the hope that it will be useful,
  25.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  26.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  27.  *  GNU General Public License for more details.
  28.  *  
  29.  *  You should have received a copy of the GNU General Public License
  30.  *  along with this program; if not, write to the Free Software
  31.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  32.  *
  33.  */
  34. #include "squid.h"
  35. #if USE_CACHE_DIGESTS
  36. /* local types */
  37. /* local prototypes */
  38. static time_t peerDigestIncDelay(const PeerDigest * pd);
  39. static time_t peerDigestNewDelay(const StoreEntry * e);
  40. static void peerDigestSetCheck(PeerDigest * pd, time_t delay);
  41. static void peerDigestClean(PeerDigest *);
  42. static EVH peerDigestCheck;
  43. static void peerDigestRequest(PeerDigest * pd);
  44. static STCB peerDigestFetchReply;
  45. static STCB peerDigestSwapInHeaders;
  46. static STCB peerDigestSwapInCBlock;
  47. static STCB peerDigestSwapInMask;
  48. static int peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name);
  49. static void peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason);
  50. static void peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason);
  51. static void peerDigestReqFinish(DigestFetchState * fetch, char *buf, int, int, int, const char *reason, int err);
  52. static void peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err);
  53. static void peerDigestFetchFinish(DigestFetchState * fetch, int err);
  54. static void peerDigestFetchSetStats(DigestFetchState * fetch);
  55. static int peerDigestSetCBlock(PeerDigest * pd, const char *buf);
  56. static int peerDigestUseful(const PeerDigest * pd);
  57. /* local constants */
  58. #define StoreDigestCBlockSize sizeof(StoreDigestCBlock)
  59. /* min interval for requesting digests from a given peer */
  60. static const time_t PeerDigestReqMinGap = 5 * 60; /* seconds */
  61. /* min interval for requesting digests (cumulative request stream) */
  62. static const time_t GlobDigestReqMinGap = 1 * 60; /* seconds */
  63. /* local vars */
  64. static time_t pd_last_req_time = 0; /* last call to Check */
  65. /* initialize peer digest */
  66. static void
  67. peerDigestInit(PeerDigest * pd, peer * p)
  68. {
  69.     assert(pd && p);
  70.     memset(pd, 0, sizeof(*pd));
  71.     pd->peer = p;
  72.     /* if peer disappears, we will know it's name */
  73.     stringInit(&pd->host, p->host);
  74.     pd->times.initialized = squid_curtime;
  75. }
  76. static void
  77. peerDigestClean(PeerDigest * pd)
  78. {
  79.     assert(pd);
  80.     if (pd->cd)
  81. cacheDigestDestroy(pd->cd);
  82.     stringClean(&pd->host);
  83. }
  84. /* allocate new peer digest, call Init, and lock everything */
  85. PeerDigest *
  86. peerDigestCreate(peer * p)
  87. {
  88.     PeerDigest *pd;
  89.     assert(p);
  90.     pd = memAllocate(MEM_PEER_DIGEST);
  91.     cbdataAdd(pd, memFree, MEM_PEER_DIGEST);
  92.     peerDigestInit(pd, p);
  93.     cbdataLock(pd->peer); /* we will use the peer */
  94.     return pd;
  95. }
  96. /* call Clean and free/unlock everything */
  97. void
  98. peerDigestDestroy(PeerDigest * pd)
  99. {
  100.     peer *p;
  101.     assert(pd);
  102.     p = pd->peer;
  103.     pd->peer = NULL;
  104.     /* inform peer (if any) that we are gone */
  105.     if (cbdataValid(p))
  106. peerNoteDigestGone(p);
  107.     cbdataUnlock(p); /* must unlock, valid or not */
  108.     peerDigestClean(pd);
  109.     cbdataFree(pd);
  110. }
  111. /* called by peer to indicate that somebody actually needs this digest */
  112. void
  113. peerDigestNeeded(PeerDigest * pd)
  114. {
  115.     assert(pd);
  116.     assert(!pd->flags.needed);
  117.     assert(!pd->cd);
  118.     pd->flags.needed = 1;
  119.     pd->times.needed = squid_curtime;
  120.     peerDigestSetCheck(pd, 0); /* check asap */
  121. }
  122. /* currently we do not have a reason to disable without destroying */
  123. #if FUTURE_CODE
  124. /* disables peer for good */
  125. static void
  126. peerDigestDisable(PeerDigest * pd)
  127. {
  128.     debug(72, 2) ("peerDigestDisable: peer %s disabled for goodn",
  129. strBuf(pd->host));
  130.     pd->times.disabled = squid_curtime;
  131.     pd->times.next_check = -1; /* never */
  132.     pd->flags.usable = 0;
  133.     if (pd->cd) {
  134. cacheDigestDestroy(pd->cd);
  135. pd->cd = NULL;
  136.     }
  137.     /* we do not destroy the pd itself to preserve its "history" and stats */
  138. }
  139. #endif
  140. /* increment retry delay [after an unsuccessful attempt] */
  141. static time_t
  142. peerDigestIncDelay(const PeerDigest * pd)
  143. {
  144.     assert(pd);
  145.     return pd->times.retry_delay > 0 ?
  146. 2 * pd->times.retry_delay : /* exponential backoff */
  147. PeerDigestReqMinGap; /* minimal delay */
  148. }
  149. /* artificially increases Expires: setting to avoid race conditions 
  150.  * returns the delay till that [increased] expiration time */
  151. static time_t
  152. peerDigestNewDelay(const StoreEntry * e)
  153. {
  154.     assert(e);
  155.     if (e->expires > 0)
  156. return e->expires + PeerDigestReqMinGap - squid_curtime;
  157.     return PeerDigestReqMinGap;
  158. }
  159. /* registers next digest verification */
  160. static void
  161. peerDigestSetCheck(PeerDigest * pd, time_t delay)
  162. {
  163.     eventAdd("peerDigestCheck", peerDigestCheck, pd, (double) delay, 1);
  164.     pd->times.next_check = squid_curtime + delay;
  165.     debug(72, 3) ("peerDigestSetCheck: will check peer %s in %d secsn",
  166. strBuf(pd->host), delay);
  167. }
  168. /*
  169.  * called when peer is about to disappear or have already disappeared
  170.  */
  171. void
  172. peerDigestNotePeerGone(PeerDigest * pd)
  173. {
  174.     if (pd->flags.requested) {
  175. debug(72, 2) ("peerDigest: peer %s gone, will destroy after fetch.n",
  176.     strBuf(pd->host));
  177. /* do nothing now, the fetching chain will notice and take action */
  178.     } else {
  179. debug(72, 2) ("peerDigest: peer %s is gone, destroying now.n",
  180.     strBuf(pd->host));
  181. peerDigestDestroy(pd);
  182.     }
  183. }
  184. /* callback for eventAdd() (with peer digest locked)
  185.  * request new digest if our copy is too old or if we lack one; 
  186.  * schedule next check otherwise */
  187. static void
  188. peerDigestCheck(void *data)
  189. {
  190.     PeerDigest *pd = data;
  191.     time_t req_time;
  192.     /*
  193.      * you can't assert(cbdataValid(pd)) -- if its not valid this
  194.      * function never gets called
  195.      */
  196.     assert(!pd->flags.requested);
  197.     pd->times.next_check = 0; /* unknown */
  198.     if (!cbdataValid(pd->peer)) {
  199. peerDigestNotePeerGone(pd);
  200. return;
  201.     }
  202.     debug(72, 3) ("peerDigestCheck: peer %s:%dn", pd->peer->host, pd->peer->http_port);
  203.     debug(72, 3) ("peerDigestCheck: time: %d, last received: %d (%+d)n",
  204. squid_curtime, pd->times.received, (squid_curtime - pd->times.received));
  205.     /* decide when we should send the request:
  206.      * request now unless too close to other requests */
  207.     req_time = squid_curtime;
  208.     /* per-peer limit */
  209.     if (req_time - pd->times.received < PeerDigestReqMinGap) {
  210. debug(72, 2) ("peerDigestCheck: %s, avoiding close peer requests (%d < %d secs).n",
  211.     strBuf(pd->host), req_time - pd->times.received,
  212.     PeerDigestReqMinGap);
  213. req_time = pd->times.received + PeerDigestReqMinGap;
  214.     }
  215.     /* global limit */
  216.     if (req_time - pd_last_req_time < GlobDigestReqMinGap) {
  217. debug(72, 2) ("peerDigestCheck: %s, avoiding close requests (%d < %d secs).n",
  218.     strBuf(pd->host), req_time - pd_last_req_time,
  219.     GlobDigestReqMinGap);
  220. req_time = pd_last_req_time + GlobDigestReqMinGap;
  221.     }
  222.     if (req_time <= squid_curtime)
  223. peerDigestRequest(pd); /* will set pd->flags.requested */
  224.     else
  225. peerDigestSetCheck(pd, req_time - squid_curtime);
  226. }
  227. /* ask store for a digest */
  228. static void
  229. peerDigestRequest(PeerDigest * pd)
  230. {
  231.     peer *p = pd->peer;
  232.     StoreEntry *e, *old_e;
  233.     char *url;
  234.     const cache_key *key;
  235.     request_t *req;
  236.     DigestFetchState *fetch = NULL;
  237.     pd->req_result = NULL;
  238.     pd->flags.requested = 1;
  239.     /* compute future request components */
  240.     url = internalRemoteUri(p->host, p->http_port, "/squid-internal-periodic/", StoreDigestFileName);
  241.     key = storeKeyPublic(url, METHOD_GET);
  242.     debug(72, 2) ("peerDigestRequest: %s key: %sn", url, storeKeyText(key));
  243.     req = urlParse(METHOD_GET, url);
  244.     assert(req);
  245.     /* add custom headers */
  246.     assert(!req->header.len);
  247.     httpHeaderPutStr(&req->header, HDR_ACCEPT, StoreDigestMimeStr);
  248.     httpHeaderPutStr(&req->header, HDR_ACCEPT, "text/html");
  249.     /* create fetch state structure */
  250.     fetch = memAllocate(MEM_DIGEST_FETCH_STATE);
  251.     cbdataAdd(fetch, memFree, MEM_DIGEST_FETCH_STATE);
  252.     fetch->request = requestLink(req);
  253.     fetch->pd = pd;
  254.     fetch->offset = 0;
  255.     /* update timestamps */
  256.     fetch->start_time = squid_curtime;
  257.     pd->times.requested = squid_curtime;
  258.     pd_last_req_time = squid_curtime;
  259.     req->flags.cachable = 1;
  260.     /* the rest is based on clientProcessExpired() */
  261.     req->flags.refresh = 1;
  262.     old_e = fetch->old_entry = storeGet(key);
  263.     if (old_e) {
  264. debug(72, 5) ("peerDigestRequest: found old entryn");
  265. storeLockObject(old_e);
  266. storeCreateMemObject(old_e, url, url);
  267. storeClientListAdd(old_e, fetch);
  268.     }
  269.     e = fetch->entry = storeCreateEntry(url, url, req->flags, req->method);
  270.     assert(EBIT_TEST(e->flags, KEY_PRIVATE));
  271.     storeClientListAdd(e, fetch);
  272.     /* set lastmod to trigger IMS request if possible */
  273.     if (old_e)
  274. e->lastmod = old_e->lastmod;
  275.     /* push towards peer cache */
  276.     debug(72, 3) ("peerDigestRequest: forwarding to fwdStart...n");
  277.     fwdStart(-1, e, req, no_addr, no_addr);
  278.     cbdataLock(fetch);
  279.     cbdataLock(fetch->pd);
  280.     storeClientCopy(e, 0, 0, 4096, memAllocate(MEM_4K_BUF),
  281. peerDigestFetchReply, fetch);
  282. }
  283. /* wait for full http headers to be received then parse them */
  284. static void
  285. peerDigestFetchReply(void *data, char *buf, ssize_t size)
  286. {
  287.     DigestFetchState *fetch = data;
  288.     PeerDigest *pd = fetch->pd;
  289.     assert(pd && buf);
  290.     assert(!fetch->offset);
  291.     if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestFetchReply"))
  292. return;
  293.     if (headersEnd(buf, size)) {
  294. http_status status;
  295. HttpReply *reply = fetch->entry->mem_obj->reply;
  296. assert(reply);
  297. httpReplyParse(reply, buf);
  298. status = reply->sline.status;
  299. debug(72, 3) ("peerDigestFetchReply: %s status: %d, expires: %d (%+d)n",
  300.     strBuf(pd->host), status,
  301.     reply->expires, reply->expires - squid_curtime);
  302. /* this "if" is based on clientHandleIMSReply() */
  303. if (status == HTTP_NOT_MODIFIED) {
  304.     request_t *r = NULL;
  305.     /* our old entry is fine */
  306.     assert(fetch->old_entry);
  307.     if (!fetch->old_entry->mem_obj->request)
  308. fetch->old_entry->mem_obj->request = r =
  309.     requestLink(fetch->entry->mem_obj->request);
  310.     assert(fetch->old_entry->mem_obj->request);
  311.     httpReplyUpdateOnNotModified(fetch->old_entry->mem_obj->reply, reply);
  312.     storeTimestampsSet(fetch->old_entry);
  313.     /* get rid of 304 reply */
  314.     storeUnregister(fetch->entry, fetch);
  315.     storeUnlockObject(fetch->entry);
  316.     fetch->entry = fetch->old_entry;
  317.     fetch->old_entry = NULL;
  318.     /* preserve request -- we need its size to update counters */
  319.     /* requestUnlink(r); */
  320.     /* fetch->entry->mem_obj->request = NULL; */
  321. } else if (status == HTTP_OK) {
  322.     /* get rid of old entry if any */
  323.     if (fetch->old_entry) {
  324. debug(72, 3) ("peerDigestFetchReply: got new digest, releasing old onen");
  325. storeUnregister(fetch->old_entry, fetch);
  326. storeReleaseRequest(fetch->old_entry);
  327. storeUnlockObject(fetch->old_entry);
  328. fetch->old_entry = NULL;
  329.     }
  330. } else {
  331.     /* some kind of a bug */
  332.     peerDigestFetchAbort(fetch, buf, httpStatusLineReason(&reply->sline));
  333.     return;
  334. }
  335. /* must have a ready-to-use store entry if we got here */
  336. /* can we stay with the old in-memory digest? */
  337. if (status == HTTP_NOT_MODIFIED && fetch->pd->cd)
  338.     peerDigestFetchStop(fetch, buf, "Not modified");
  339. else
  340.     storeClientCopy(fetch->entry, /* have to swap in */
  341. 0, 0, SM_PAGE_SIZE, buf, peerDigestSwapInHeaders, fetch);
  342.     } else {
  343. /* need more data, do we have space? */
  344. if (size >= SM_PAGE_SIZE)
  345.     peerDigestFetchAbort(fetch, buf, "reply header too big");
  346. else
  347.     storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
  348. peerDigestFetchReply, fetch);
  349.     }
  350. }
  351. /* fetch headers from disk, pass on to SwapInCBlock */
  352. static void
  353. peerDigestSwapInHeaders(void *data, char *buf, ssize_t size)
  354. {
  355.     DigestFetchState *fetch = data;
  356.     size_t hdr_size;
  357.     if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInHeaders"))
  358. return;
  359.     assert(!fetch->offset);
  360.     if ((hdr_size = headersEnd(buf, size))) {
  361. assert(fetch->entry->mem_obj->reply);
  362. if (!fetch->entry->mem_obj->reply->sline.status)
  363.     httpReplyParse(fetch->entry->mem_obj->reply, buf);
  364. if (fetch->entry->mem_obj->reply->sline.status != HTTP_OK) {
  365.     debug(72, 1) ("peerDigestSwapInHeaders: %s status %d got cached!n",
  366. strBuf(fetch->pd->host), fetch->entry->mem_obj->reply->sline.status);
  367.     peerDigestFetchAbort(fetch, buf, "internal status error");
  368.     return;
  369. }
  370. fetch->offset += hdr_size;
  371. storeClientCopy(fetch->entry, size, fetch->offset,
  372.     SM_PAGE_SIZE, buf,
  373.     peerDigestSwapInCBlock, fetch);
  374.     } else {
  375. /* need more data, do we have space? */
  376. if (size >= SM_PAGE_SIZE)
  377.     peerDigestFetchAbort(fetch, buf, "stored header too big");
  378. else
  379.     storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
  380. peerDigestSwapInHeaders, fetch);
  381.     }
  382. }
  383. static void
  384. peerDigestSwapInCBlock(void *data, char *buf, ssize_t size)
  385. {
  386.     DigestFetchState *fetch = data;
  387.     if (peerDigestFetchedEnough(fetch, buf, size, "peerDigestSwapInCBlock"))
  388. return;
  389.     if (size >= StoreDigestCBlockSize) {
  390. PeerDigest *pd = fetch->pd;
  391. HttpReply *rep = fetch->entry->mem_obj->reply;
  392. const int seen = fetch->offset + size;
  393. assert(pd && rep);
  394. if (peerDigestSetCBlock(pd, buf)) {
  395.     /* XXX: soon we will have variable header size */
  396.     fetch->offset += StoreDigestCBlockSize;
  397.     /* switch to CD buffer and fetch digest guts */
  398.     memFree(buf, MEM_4K_BUF);
  399.     buf = NULL;
  400.     assert(pd->cd->mask);
  401.     storeClientCopy(fetch->entry,
  402. seen,
  403. fetch->offset,
  404. pd->cd->mask_size,
  405. pd->cd->mask,
  406. peerDigestSwapInMask, fetch);
  407. } else {
  408.     peerDigestFetchAbort(fetch, buf, "invalid digest cblock");
  409. }
  410.     } else {
  411. /* need more data, do we have space? */
  412. if (size >= SM_PAGE_SIZE)
  413.     peerDigestFetchAbort(fetch, buf, "digest cblock too big");
  414. else
  415.     storeClientCopy(fetch->entry, size, 0, SM_PAGE_SIZE, buf,
  416. peerDigestSwapInCBlock, fetch);
  417.     }
  418. }
  419. static void
  420. peerDigestSwapInMask(void *data, char *buf, ssize_t size)
  421. {
  422.     DigestFetchState *fetch = data;
  423.     PeerDigest *pd;
  424.     /* NOTE! buf points to the middle of pd->cd->mask! */
  425.     if (peerDigestFetchedEnough(fetch, NULL, size, "peerDigestSwapInMask"))
  426. return;
  427.     pd = fetch->pd;
  428.     assert(pd->cd && pd->cd->mask);
  429.     fetch->offset += size;
  430.     fetch->mask_offset += size;
  431.     if (fetch->mask_offset >= pd->cd->mask_size) {
  432. debug(72, 2) ("peerDigestSwapInMask: Done! Got %d, expected %dn",
  433.     fetch->mask_offset, pd->cd->mask_size);
  434. assert(fetch->mask_offset == pd->cd->mask_size);
  435. assert(peerDigestFetchedEnough(fetch, NULL, 0, "peerDigestSwapInMask"));
  436.     } else {
  437. const size_t buf_sz = pd->cd->mask_size - fetch->mask_offset;
  438. assert(buf_sz > 0);
  439. storeClientCopy(fetch->entry,
  440.     fetch->offset,
  441.     fetch->offset,
  442.     buf_sz,
  443.     pd->cd->mask + fetch->mask_offset,
  444.     peerDigestSwapInMask, fetch);
  445.     }
  446. }
  447. static int
  448. peerDigestFetchedEnough(DigestFetchState * fetch, char *buf, ssize_t size, const char *step_name)
  449. {
  450.     PeerDigest *pd = NULL;
  451.     const char *host = "<unknown>"; /* peer host */
  452.     const char *reason = NULL; /* reason for completion */
  453.     const char *no_bug = NULL; /* successful completion if set */
  454.     const int fcb_valid = cbdataValid(fetch);
  455.     const int pdcb_valid = fcb_valid && cbdataValid(fetch->pd);
  456.     const int pcb_valid = pdcb_valid && cbdataValid(fetch->pd->peer);
  457.     /* test possible exiting conditions (the same for most steps!)
  458.      * cases marked with '?!' should not happen */
  459.     if (!reason) {
  460. if (!fcb_valid)
  461.     reason = "fetch aborted?!";
  462. else if (!(pd = fetch->pd))
  463.     reason = "peer digest disappeared?!";
  464. #if DONT
  465. else if (!cbdataValid(pd))
  466.     reason = "invalidated peer digest?!";
  467. #endif
  468. else
  469.     host = strBuf(pd->host);
  470.     }
  471.     debug(72, 6) ("%s: peer %s, offset: %d size: %d.n",
  472. step_name, host, fcb_valid ? fetch->offset : -1, size);
  473.     /* continue checking (with pd and host known and valid) */
  474.     if (!reason) {
  475. if (!cbdataValid(pd->peer))
  476.     reason = "peer disappeared";
  477. else if (size < 0)
  478.     reason = "swap failure";
  479. else if (!fetch->entry)
  480.     reason = "swap aborted?!";
  481. else if (EBIT_TEST(fetch->entry->flags, ENTRY_ABORTED))
  482.     reason = "swap aborted";
  483.     }
  484.     /* continue checking (maybe-successful eof case) */
  485.     if (!reason && !size) {
  486. if (!pd->cd)
  487.     reason = "null digest?!";
  488. else if (fetch->mask_offset != pd->cd->mask_size)
  489.     reason = "premature end of digest?!";
  490. else if (!peerDigestUseful(pd))
  491.     reason = "useless digest";
  492. else
  493.     reason = no_bug = "success";
  494.     }
  495.     /* finish if we have a reason */
  496.     if (reason) {
  497. const int level = strstr(reason, "?!") ? 1 : 3;
  498. debug(72, level) ("%s: peer %s, exiting after '%s'n",
  499.     step_name, host, reason);
  500. peerDigestReqFinish(fetch, buf,
  501.     fcb_valid, pdcb_valid, pcb_valid, reason, !no_bug);
  502.     } else {
  503. /* paranoid check */
  504. assert(fcb_valid && pdcb_valid && pcb_valid);
  505.     }
  506.     return reason != NULL;
  507. }
  508. /* call this when all callback data is valid and fetch must be stopped but
  509.  * no error has occurred (e.g. we received 304 reply and reuse old digest) */
  510. static void
  511. peerDigestFetchStop(DigestFetchState * fetch, char *buf, const char *reason)
  512. {
  513.     assert(reason);
  514.     debug(72, 2) ("peerDigestFetchStop: peer %s, reason: %sn",
  515. strBuf(fetch->pd->host), reason);
  516.     peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 0);
  517. }
  518. /* call this when all callback data is valid but something bad happened */
  519. static void
  520. peerDigestFetchAbort(DigestFetchState * fetch, char *buf, const char *reason)
  521. {
  522.     assert(reason);
  523.     debug(72, 2) ("peerDigestFetchAbort: peer %s, reason: %sn",
  524. strBuf(fetch->pd->host), reason);
  525.     peerDigestReqFinish(fetch, buf, 1, 1, 1, reason, 1);
  526. }
  527. /* complete the digest transfer, update stats, unlock/release everything */
  528. static void
  529. peerDigestReqFinish(DigestFetchState * fetch, char *buf,
  530.     int fcb_valid, int pdcb_valid, int pcb_valid,
  531.     const char *reason, int err)
  532. {
  533.     assert(reason);
  534.     /* must go before peerDigestPDFinish */
  535.     if (pdcb_valid) {
  536. fetch->pd->flags.requested = 0;
  537. fetch->pd->req_result = reason;
  538.     }
  539.     /* schedule next check if peer is still out there */
  540.     if (pcb_valid) {
  541. PeerDigest *pd = fetch->pd;
  542. if (err) {
  543.     pd->times.retry_delay = peerDigestIncDelay(pd);
  544.     peerDigestSetCheck(pd, pd->times.retry_delay);
  545. } else {
  546.     pd->times.retry_delay = 0;
  547.     peerDigestSetCheck(pd, peerDigestNewDelay(fetch->entry));
  548. }
  549.     }
  550.     /* note: order is significant */
  551.     if (fcb_valid)
  552. peerDigestFetchSetStats(fetch);
  553.     if (pdcb_valid)
  554. peerDigestPDFinish(fetch, pcb_valid, err);
  555.     if (fcb_valid)
  556. peerDigestFetchFinish(fetch, err);
  557.     if (buf)
  558. memFree(buf, MEM_4K_BUF);
  559. }
  560. /* destroys digest if peer disappeared
  561.  * must be called only when fetch and pd cbdata are valid */
  562. static void
  563. peerDigestPDFinish(DigestFetchState * fetch, int pcb_valid, int err)
  564. {
  565.     PeerDigest *pd = fetch->pd;
  566.     const char *host = strBuf(pd->host);
  567.     pd->times.received = squid_curtime;
  568.     pd->times.req_delay = fetch->resp_time;
  569.     kb_incr(&pd->stats.sent.kbytes, (size_t) fetch->sent.bytes);
  570.     kb_incr(&pd->stats.recv.kbytes, (size_t) fetch->recv.bytes);
  571.     pd->stats.sent.msgs += fetch->sent.msg;
  572.     pd->stats.recv.msgs += fetch->recv.msg;
  573.     if (err) {
  574. debug(72, 1) ("%sdisabling (%s) digest from %sn",
  575.     pcb_valid ? "temporary " : "",
  576.     pd->req_result, host);
  577. if (pd->cd) {
  578.     cacheDigestDestroy(pd->cd);
  579.     pd->cd = NULL;
  580. }
  581. pd->flags.usable = 0;
  582. if (!pcb_valid)
  583.     peerDigestNotePeerGone(pd);
  584.     } else {
  585. assert(pcb_valid);
  586. pd->flags.usable = 1;
  587. /* XXX: ugly condition, but how? */
  588. if (fetch->entry->store_status == STORE_OK)
  589.     debug(72, 2) ("re-used old digest from %sn", host);
  590. else
  591.     debug(72, 2) ("received valid digest from %sn", host);
  592.     }
  593.     fetch->pd = NULL;
  594.     cbdataUnlock(pd);
  595. }
  596. /* free fetch state structures
  597.  * must be called only when fetch cbdata is valid */
  598. static void
  599. peerDigestFetchFinish(DigestFetchState * fetch, int err)
  600. {
  601.     assert(fetch->entry && fetch->request);
  602.     if (fetch->old_entry) {
  603. debug(72, 2) ("peerDigestFetchFinish: deleting old entryn");
  604. storeUnregister(fetch->old_entry, fetch);
  605. storeReleaseRequest(fetch->old_entry);
  606. storeUnlockObject(fetch->old_entry);
  607. fetch->old_entry = NULL;
  608.     }
  609.     /* update global stats */
  610.     kb_incr(&Counter.cd.kbytes_sent, (size_t) fetch->sent.bytes);
  611.     kb_incr(&Counter.cd.kbytes_recv, (size_t) fetch->recv.bytes);
  612.     Counter.cd.msgs_sent += fetch->sent.msg;
  613.     Counter.cd.msgs_recv += fetch->recv.msg;
  614.     /* unlock everything */
  615.     storeUnregister(fetch->entry, fetch);
  616.     storeUnlockObject(fetch->entry);
  617.     requestUnlink(fetch->request);
  618.     fetch->entry = NULL;
  619.     fetch->request = NULL;
  620.     assert(fetch->pd == NULL);
  621.     cbdataUnlock(fetch);
  622.     cbdataFree(fetch);
  623. }
  624. /* calculate fetch stats after completion */
  625. static void
  626. peerDigestFetchSetStats(DigestFetchState * fetch)
  627. {
  628.     MemObject *mem;
  629.     assert(fetch->entry && fetch->request);
  630.     mem = fetch->entry->mem_obj;
  631.     assert(mem);
  632.     /* XXX: outgoing numbers are not precise */
  633.     /* XXX: we must distinguish between 304 hits and misses here */
  634.     fetch->sent.bytes = httpRequestPrefixLen(fetch->request);
  635.     fetch->recv.bytes = fetch->entry->store_status == STORE_PENDING ?
  636. mem->inmem_hi : mem->object_sz;
  637.     fetch->sent.msg = fetch->recv.msg = 1;
  638.     fetch->expires = fetch->entry->expires;
  639.     fetch->resp_time = squid_curtime - fetch->start_time;
  640.     debug(72, 3) ("peerDigestFetchFinish: recv %d bytes in %d secsn",
  641. fetch->recv.bytes, fetch->resp_time);
  642.     debug(72, 3) ("peerDigestFetchFinish: expires: %d (%+d), lmt: %d (%+d)n",
  643. fetch->expires, fetch->expires - squid_curtime,
  644. fetch->entry->lastmod, fetch->entry->lastmod - squid_curtime);
  645. }
  646. static int
  647. peerDigestSetCBlock(PeerDigest * pd, const char *buf)
  648. {
  649.     StoreDigestCBlock cblock;
  650.     int freed_size = 0;
  651.     const char *host = strBuf(pd->host);
  652.     xmemcpy(&cblock, buf, sizeof(cblock));
  653.     /* network -> host conversions */
  654.     cblock.ver.current = ntohs(cblock.ver.current);
  655.     cblock.ver.required = ntohs(cblock.ver.required);
  656.     cblock.capacity = ntohl(cblock.capacity);
  657.     cblock.count = ntohl(cblock.count);
  658.     cblock.del_count = ntohl(cblock.del_count);
  659.     cblock.mask_size = ntohl(cblock.mask_size);
  660.     debug(72, 2) ("got digest cblock from %s; ver: %d (req: %d)n",
  661. host, (int) cblock.ver.current, (int) cblock.ver.required);
  662.     debug(72, 2) ("t size: %d bytes, e-cnt: %d, e-util: %d%%n",
  663. cblock.mask_size, cblock.count,
  664. xpercentInt(cblock.count, cblock.capacity));
  665.     /* check version requirements (both ways) */
  666.     if (cblock.ver.required > CacheDigestVer.current) {
  667. debug(72, 1) ("%s digest requires version %d; have: %dn",
  668.     host, cblock.ver.required, CacheDigestVer.current);
  669. return 0;
  670.     }
  671.     if (cblock.ver.current < CacheDigestVer.required) {
  672. debug(72, 1) ("%s digest is version %d; we require: %dn",
  673.     host, cblock.ver.current, CacheDigestVer.required);
  674. return 0;
  675.     }
  676.     /* check consistency */
  677.     if (cblock.ver.required > cblock.ver.current ||
  678. cblock.mask_size <= 0 || cblock.capacity <= 0 ||
  679. cblock.bits_per_entry <= 0 || cblock.hash_func_count <= 0) {
  680. debug(72, 0) ("%s digest cblock is corrupted.n", host);
  681. return 0;
  682.     }
  683.     /* check consistency further */
  684.     if (cblock.mask_size != cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry)) {
  685. debug(72, 0) ("%s digest cblock is corrupted (mask size mismatch: %d ? %d).n",
  686.     host, cblock.mask_size, cacheDigestCalcMaskSize(cblock.capacity, cblock.bits_per_entry));
  687. return 0;
  688.     }
  689.     /* there are some things we cannot do yet */
  690.     if (cblock.hash_func_count != CacheDigestHashFuncCount) {
  691. debug(72, 0) ("%s digest: unsupported #hash functions: %d ? %d.n",
  692.     host, cblock.hash_func_count, CacheDigestHashFuncCount);
  693. return 0;
  694.     }
  695.     /*
  696.      * no cblock bugs below this point
  697.      */
  698.     /* check size changes */
  699.     if (pd->cd && cblock.mask_size != pd->cd->mask_size) {
  700. debug(72, 2) ("%s digest changed size: %d -> %dn",
  701.     host, cblock.mask_size, pd->cd->mask_size);
  702. freed_size = pd->cd->mask_size;
  703. cacheDigestDestroy(pd->cd);
  704. pd->cd = NULL;
  705.     }
  706.     if (!pd->cd) {
  707. debug(72, 2) ("creating %s digest; size: %d (%+d) bytesn",
  708.     host, cblock.mask_size, (int) (cblock.mask_size - freed_size));
  709. pd->cd = cacheDigestCreate(cblock.capacity, cblock.bits_per_entry);
  710. if (cblock.mask_size >= freed_size)
  711.     kb_incr(&Counter.cd.memory, cblock.mask_size - freed_size);
  712.     }
  713.     assert(pd->cd);
  714.     /* these assignments leave us in an inconsistent state until we finish reading the digest */
  715.     pd->cd->count = cblock.count;
  716.     pd->cd->del_count = cblock.del_count;
  717.     return 1;
  718. }
  719. static int
  720. peerDigestUseful(const PeerDigest * pd)
  721. {
  722.     /* TODO: we should calculate the prob of a false hit instead of bit util */
  723.     const int bit_util = cacheDigestBitUtil(pd->cd);
  724.     if (bit_util > 65) {
  725. debug(72, 0) ("Warning: %s peer digest has too many bits on (%d%%).n",
  726.     strBuf(pd->host), bit_util);
  727. return 0;
  728.     }
  729.     return 1;
  730. }
  731. static int
  732. saneDiff(time_t diff)
  733. {
  734.     return abs(diff) > squid_curtime / 2 ? 0 : diff;
  735. }
  736. void
  737. peerDigestStatsReport(const PeerDigest * pd, StoreEntry * e)
  738. {
  739. #define f2s(flag) (pd->flags.flag ? "yes" : "no")
  740. #define appendTime(tm) storeAppendPrintf(e, "%st %10dt %+dt %+dn", 
  741.     ""#tm, pd->times.tm, 
  742.     saneDiff(pd->times.tm - squid_curtime), 
  743.     saneDiff(pd->times.tm - pd->times.initialized))
  744.     const char *host = pd ? strBuf(pd->host) : NULL;
  745.     assert(pd);
  746.     storeAppendPrintf(e, "npeer digest from %sn", host);
  747.     cacheDigestGuessStatsReport(&pd->stats.guess, e, host);
  748.     storeAppendPrintf(e, "neventt timestampt secs from nowt secs from initn");
  749.     appendTime(initialized);
  750.     appendTime(needed);
  751.     appendTime(requested);
  752.     appendTime(received);
  753.     appendTime(next_check);
  754.     storeAppendPrintf(e, "peer digest state:n");
  755.     storeAppendPrintf(e, "tneeded: %3s, usable: %3s, requested: %3sn",
  756. f2s(needed), f2s(usable), f2s(requested));
  757.     storeAppendPrintf(e, "ntlast retry delay: %d secsn",
  758. pd->times.retry_delay);
  759.     storeAppendPrintf(e, "tlast request response time: %d secsn",
  760. pd->times.req_delay);
  761.     storeAppendPrintf(e, "tlast request result: %sn",
  762. pd->req_result ? pd->req_result : "(none)");
  763.     storeAppendPrintf(e, "npeer digest traffic:n");
  764.     storeAppendPrintf(e, "trequests sent: %d, volume: %d KBn",
  765. pd->stats.sent.msgs, (int) pd->stats.sent.kbytes.kb);
  766.     storeAppendPrintf(e, "treplies recv:  %d, volume: %d KBn",
  767. pd->stats.recv.msgs, (int) pd->stats.recv.kbytes.kb);
  768.     storeAppendPrintf(e, "npeer digest structure:n");
  769.     if (pd->cd)
  770. cacheDigestReport(pd->cd, host, e);
  771.     else
  772. storeAppendPrintf(e, "tno in-memory copyn");
  773. }
  774. #endif