store_client.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:17k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. /*
  2.  * $Id: store_client.c,v 1.59.2.3 1999/02/12 05:02:12 wessels Exp $
  3.  *
  4.  * DEBUG: section 20    Storage Manager Client-Side Interface
  5.  * AUTHOR: Duane Wessels
  6.  *
  7.  * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
  8.  * ----------------------------------------------------------
  9.  *
  10.  *  Squid is the result of efforts by numerous individuals from the
  11.  *  Internet community.  Development is led by Duane Wessels of the
  12.  *  National Laboratory for Applied Network Research and funded by the
  13.  *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
  14.  *  Duane Wessels and the University of California San Diego.  Please
  15.  *  see the COPYRIGHT file for full details.  Squid incorporates
  16.  *  software developed and/or copyrighted by other sources.  Please see
  17.  *  the CREDITS file for full details.
  18.  *
  19.  *  This program is free software; you can redistribute it and/or modify
  20.  *  it under the terms of the GNU General Public License as published by
  21.  *  the Free Software Foundation; either version 2 of the License, or
  22.  *  (at your option) any later version.
  23.  *  
  24.  *  This program is distributed in the hope that it will be useful,
  25.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  26.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  27.  *  GNU General Public License for more details.
  28.  *  
  29.  *  You should have received a copy of the GNU General Public License
  30.  *  along with this program; if not, write to the Free Software
  31.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  32.  *
  33.  */
  34. #include "squid.h"
  35. /*
  36.  * NOTE: 'Header' refers to the swapfile metadata header.
  37.  *       'Body' refers to the swapfile body, which is the full
  38.  *        HTTP reply (including HTTP headers and body).
  39.  */
  40. static DRCB storeClientReadBody;
  41. static DRCB storeClientReadHeader;
  42. static SIH storeClientFileOpened;
  43. static void storeClientCopy2(StoreEntry * e, store_client * sc);
  44. static void storeClientFileRead(store_client * sc);
  45. static EVH storeClientCopyEvent;
  46. static store_client_t storeClientType(StoreEntry *);
  47. static int CheckQuickAbort2(StoreEntry * entry);
  48. static void CheckQuickAbort(StoreEntry * entry);
  49. /* check if there is any client waiting for this object at all */
  50. /* return 1 if there is at least one client */
  51. int
  52. storeClientWaiting(const StoreEntry * e)
  53. {
  54.     MemObject *mem = e->mem_obj;
  55.     store_client *sc;
  56.     for (sc = mem->clients; sc; sc = sc->next) {
  57. if (sc->callback_data != NULL)
  58.     return 1;
  59.     }
  60.     return 0;
  61. }
  62. store_client *
  63. storeClientListSearch(const MemObject * mem, void *data)
  64. {
  65.     store_client *sc;
  66.     for (sc = mem->clients; sc; sc = sc->next) {
  67. if (sc->callback_data == data)
  68.     break;
  69.     }
  70.     return sc;
  71. }
  72. static store_client_t
  73. storeClientType(StoreEntry * e)
  74. {
  75.     MemObject *mem = e->mem_obj;
  76.     if (mem->inmem_lo)
  77. return STORE_DISK_CLIENT;
  78.     if (EBIT_TEST(e->flags, ENTRY_ABORTED)) {
  79. /* I don't think we should be adding clients to aborted entries */
  80. debug(20, 1) ("storeClientType: adding to ENTRY_ABORTED entryn");
  81. return STORE_MEM_CLIENT;
  82.     }
  83.     if (e->store_status == STORE_OK) {
  84. if (mem->inmem_lo == 0 && mem->inmem_hi > 0)
  85.     return STORE_MEM_CLIENT;
  86. else
  87.     return STORE_DISK_CLIENT;
  88.     }
  89.     /* here and past, entry is STORE_PENDING */
  90.     /*
  91.      * If this is the first client, let it be the mem client
  92.      */
  93.     else if (mem->nclients == 1)
  94. return STORE_MEM_CLIENT;
  95.     /*
  96.      * otherwise, make subsequent clients read from disk so they
  97.      * can not delay the first, and vice-versa.
  98.      */
  99.     else
  100. return STORE_DISK_CLIENT;
  101. }
  102. /* add client with fd to client list */
  103. void
  104. storeClientListAdd(StoreEntry * e, void *data)
  105. {
  106.     MemObject *mem = e->mem_obj;
  107.     store_client **T;
  108.     store_client *sc;
  109.     assert(mem);
  110.     if (storeClientListSearch(mem, data) != NULL)
  111. return;
  112.     mem->nclients++;
  113.     sc = memAllocate(MEM_STORE_CLIENT);
  114.     cbdataAdd(sc, memFree, MEM_STORE_CLIENT); /* sc is callback_data for file_read */
  115.     sc->callback_data = data;
  116.     sc->seen_offset = 0;
  117.     sc->copy_offset = 0;
  118.     sc->swapin_fd = -1;
  119.     sc->flags.disk_io_pending = 0;
  120.     sc->entry = e;
  121.     sc->type = storeClientType(e);
  122.     if (sc->type == STORE_DISK_CLIENT)
  123. /* assert we'll be able to get the data we want */
  124. /* maybe we should open swapin_fd here */
  125. assert(e->swap_file_number > -1 || storeSwapOutAble(e));
  126.     for (T = &mem->clients; *T; T = &(*T)->next);
  127.     *T = sc;
  128. }
  129. static void
  130. storeClientCopyEvent(void *data)
  131. {
  132.     store_client *sc = data;
  133.     debug(20, 3) ("storeClientCopyEvent: Runningn");
  134.     sc->flags.copy_event_pending = 0;
  135.     if (!sc->callback)
  136. return;
  137.     storeClientCopy2(sc->entry, sc);
  138. }
  139. /* copy bytes requested by the client */
  140. void
  141. storeClientCopy(StoreEntry * e,
  142.     off_t seen_offset,
  143.     off_t copy_offset,
  144.     size_t size,
  145.     char *buf,
  146.     STCB * callback,
  147.     void *data)
  148. {
  149.     store_client *sc;
  150.     assert(!EBIT_TEST(e->flags, ENTRY_ABORTED));
  151.     debug(20, 3) ("storeClientCopy: %s, seen %d, want %d, size %d, cb %p, cbdata %pn",
  152. storeKeyText(e->key),
  153. (int) seen_offset,
  154. (int) copy_offset,
  155. (int) size,
  156. callback,
  157. data);
  158.     sc = storeClientListSearch(e->mem_obj, data);
  159.     assert(sc != NULL);
  160.     assert(sc->callback == NULL);
  161.     sc->copy_offset = copy_offset;
  162.     sc->seen_offset = seen_offset;
  163.     sc->callback = callback;
  164.     sc->copy_buf = buf;
  165.     sc->copy_size = size;
  166.     sc->copy_offset = copy_offset;
  167.     storeClientCopy2(e, sc);
  168. }
  169. /*
  170.  * This function is used below to decide if we have any more data to
  171.  * send to the client.  If the store_status is STORE_PENDING, then we
  172.  * do have more data to send.  If its STORE_OK, then
  173.  * we continue checking.  If the object length is negative, then we
  174.  * don't know the real length and must open the swap file to find out.
  175.  * If the length is >= 0, then we compare it to the requested copy
  176.  * offset.
  177.  */
  178. static int
  179. storeClientNoMoreToSend(StoreEntry * e, store_client * sc)
  180. {
  181.     ssize_t len;
  182.     if (e->store_status == STORE_PENDING)
  183. return 0;
  184.     if ((len = objectLen(e)) < 0)
  185. return 0;
  186.     if (sc->copy_offset < len)
  187. return 0;
  188.     return 1;
  189. }
  190. static void
  191. storeClientCopy2(StoreEntry * e, store_client * sc)
  192. {
  193.     STCB *callback = sc->callback;
  194.     MemObject *mem = e->mem_obj;
  195.     size_t sz;
  196.     if (sc->flags.copy_event_pending)
  197. return;
  198.     if (EBIT_TEST(e->flags, ENTRY_FWD_HDR_WAIT)) {
  199. debug(20, 5) ("storeClientCopy2: returning because ENTRY_FWD_HDR_WAIT setn");
  200. return;
  201.     }
  202.     if (sc->flags.store_copying) {
  203. sc->flags.copy_event_pending = 1;
  204. debug(20, 3) ("storeClientCopy2: Queueing storeClientCopyEvent()n");
  205. eventAdd("storeClientCopyEvent", storeClientCopyEvent, sc, 0.0, 0);
  206. return;
  207.     }
  208.     cbdataLock(sc); /* ick, prevent sc from getting freed */
  209.     sc->flags.store_copying = 1;
  210.     debug(20, 3) ("storeClientCopy2: %sn", storeKeyText(e->key));
  211.     assert(callback != NULL);
  212.     /*
  213.      * We used to check for ENTRY_ABORTED here.  But there were some
  214.      * problems.  For example, we might have a slow client (or two) and
  215.      * the server-side is reading far ahead and swapping to disk.  Even
  216.      * if the server-side aborts, we want to give the client(s)
  217.      * everything we got before the abort condition occurred.
  218.      */
  219.     if (storeClientNoMoreToSend(e, sc)) {
  220. /* There is no more to send! */
  221. #if USE_ASYNC_IO
  222. if (sc->flags.disk_io_pending) {
  223.     if (sc->swapin_fd >= 0)
  224. aioCancel(sc->swapin_fd, NULL);
  225.     else
  226. aioCancel(-1, sc);
  227. }
  228. #endif
  229. sc->flags.disk_io_pending = 0;
  230. sc->callback = NULL;
  231. callback(sc->callback_data, sc->copy_buf, 0);
  232.     } else if (e->store_status == STORE_PENDING && sc->seen_offset >= mem->inmem_hi) {
  233. /* client has already seen this, wait for more */
  234. debug(20, 3) ("storeClientCopy2: Waiting for moren");
  235.     } else if (sc->copy_offset >= mem->inmem_lo && sc->copy_offset < mem->inmem_hi) {
  236. /* What the client wants is in memory */
  237. debug(20, 3) ("storeClientCopy2: Copying from memoryn");
  238. sz = stmemCopy(&mem->data_hdr, sc->copy_offset, sc->copy_buf, sc->copy_size);
  239. #if USE_ASYNC_IO
  240. if (sc->flags.disk_io_pending) {
  241.     if (sc->swapin_fd >= 0)
  242. aioCancel(sc->swapin_fd, NULL);
  243.     else
  244. aioCancel(-1, sc);
  245. }
  246. #endif
  247. sc->flags.disk_io_pending = 0;
  248. sc->callback = NULL;
  249. callback(sc->callback_data, sc->copy_buf, sz);
  250.     } else if (sc->swapin_fd < 0) {
  251. debug(20, 3) ("storeClientCopy2: Need to open swap in filen");
  252. assert(sc->type == STORE_DISK_CLIENT);
  253. /* gotta open the swapin file */
  254. if (storeTooManyDiskFilesOpen()) {
  255.     /* yuck -- this causes a TCP_SWAPFAIL_MISS on the client side */
  256.     sc->callback = NULL;
  257.     callback(sc->callback_data, sc->copy_buf, -1);
  258. } else if (!sc->flags.disk_io_pending) {
  259.     sc->flags.disk_io_pending = 1;
  260.     storeSwapInStart(e, storeClientFileOpened, sc);
  261. } else {
  262.     debug(20, 2) ("storeClientCopy2: Averted multiple fd operationn");
  263. }
  264.     } else {
  265. debug(20, 3) ("storeClientCopy: reading from disk FD %dn",
  266.     sc->swapin_fd);
  267. assert(sc->type == STORE_DISK_CLIENT);
  268. if (!sc->flags.disk_io_pending) {
  269.     storeClientFileRead(sc);
  270. } else {
  271.     debug(20, 2) ("storeClientCopy2: Averted multiple fd operationn");
  272. }
  273.     }
  274.     sc->flags.store_copying = 0;
  275.     cbdataUnlock(sc); /* ick, allow sc to be freed */
  276. }
  277. static void
  278. storeClientFileOpened(int fd, void *data)
  279. {
  280.     store_client *sc = data;
  281.     STCB *callback = sc->callback;
  282.     if (fd < 0) {
  283. debug(20, 3) ("storeClientFileOpened: failedn");
  284. sc->flags.disk_io_pending = 0;
  285. sc->callback = NULL;
  286. callback(sc->callback_data, sc->copy_buf, -1);
  287. return;
  288.     }
  289.     sc->swapin_fd = fd;
  290.     storeClientFileRead(sc);
  291. }
  292. static void
  293. storeClientFileRead(store_client * sc)
  294. {
  295.     MemObject *mem = sc->entry->mem_obj;
  296.     assert(sc->callback != NULL);
  297. #ifdef OPTIMISTIC_IO
  298.     sc->flags.disk_io_pending = 1;
  299. #endif
  300.     if (mem->swap_hdr_sz == 0) {
  301. file_read(sc->swapin_fd,
  302.     sc->copy_buf,
  303.     sc->copy_size,
  304.     0,
  305.     storeClientReadHeader,
  306.     sc);
  307.     } else {
  308. if (sc->entry->swap_status == SWAPOUT_WRITING)
  309.     assert(mem->swapout.done_offset > sc->copy_offset + mem->swap_hdr_sz);
  310. file_read(sc->swapin_fd,
  311.     sc->copy_buf,
  312.     sc->copy_size,
  313.     sc->copy_offset + mem->swap_hdr_sz,
  314.     storeClientReadBody,
  315.     sc);
  316.     }
  317. #ifndef OPTIMISTIC_IO
  318.     sc->flags.disk_io_pending = 1;
  319. #endif
  320. }
  321. static void
  322. storeClientReadBody(int fd, const char *buf, int len, int flagnotused, void *data)
  323. {
  324.     store_client *sc = data;
  325.     MemObject *mem = sc->entry->mem_obj;
  326.     STCB *callback = sc->callback;
  327.     assert(sc->flags.disk_io_pending);
  328.     sc->flags.disk_io_pending = 0;
  329.     assert(sc->callback != NULL);
  330.     debug(20, 3) ("storeClientReadBody: FD %d, len %dn", fd, len);
  331.     if (sc->copy_offset == 0 && len > 0 && mem->reply->sline.status == 0)
  332. httpReplyParse(mem->reply, sc->copy_buf);
  333.     sc->callback = NULL;
  334.     callback(sc->callback_data, sc->copy_buf, len);
  335. }
  336. static void
  337. storeClientReadHeader(int fd, const char *buf, int len, int flagnotused, void *data)
  338. {
  339.     store_client *sc = data;
  340.     StoreEntry *e = sc->entry;
  341.     MemObject *mem = e->mem_obj;
  342.     STCB *callback = sc->callback;
  343.     int swap_hdr_sz = 0;
  344.     size_t body_sz;
  345.     size_t copy_sz;
  346.     tlv *tlv_list;
  347.     assert(sc->flags.disk_io_pending);
  348.     sc->flags.disk_io_pending = 0;
  349.     assert(sc->callback != NULL);
  350.     debug(20, 3) ("storeClientReadHeader: FD %d, len %dn", fd, len);
  351.     if (len < 0) {
  352. debug(20, 3) ("storeClientReadHeader: FD %d: %sn", fd, xstrerror());
  353. sc->callback = NULL;
  354. callback(sc->callback_data, sc->copy_buf, len);
  355. return;
  356.     }
  357.     tlv_list = storeSwapMetaUnpack(buf, &swap_hdr_sz);
  358.     if (tlv_list == NULL) {
  359. debug(20, 1) ("storeClientReadHeader: failed to unpack meta datan");
  360. sc->callback = NULL;
  361. callback(sc->callback_data, sc->copy_buf, -1);
  362. return;
  363.     }
  364.     /*
  365.      * XXX Here we should check the meta data and make sure we got
  366.      * the right object.
  367.      */
  368.     storeSwapTLVFree(tlv_list);
  369.     mem->swap_hdr_sz = swap_hdr_sz;
  370.     mem->object_sz = e->swap_file_sz - swap_hdr_sz;
  371.     /*
  372.      * If our last read got some data the client wants, then give
  373.      * it to them, otherwise schedule another read.
  374.      */
  375.     body_sz = len - swap_hdr_sz;
  376.     if (sc->copy_offset < body_sz) {
  377. /*
  378.  * we have (part of) what they want
  379.  */
  380. copy_sz = XMIN(sc->copy_size, body_sz);
  381. debug(20, 3) ("storeClientReadHeader: copying %d bytes of bodyn",
  382.     copy_sz);
  383. xmemmove(sc->copy_buf, sc->copy_buf + swap_hdr_sz, copy_sz);
  384. if (sc->copy_offset == 0 && len > 0 && mem->reply->sline.status == 0)
  385.     httpReplyParse(mem->reply, sc->copy_buf);
  386. sc->callback = NULL;
  387. callback(sc->callback_data, sc->copy_buf, copy_sz);
  388. return;
  389.     }
  390.     /*
  391.      * we don't have what the client wants, but at least we now
  392.      * know the swap header size.
  393.      */
  394.     storeClientFileRead(sc);
  395. }
  396. int
  397. storeClientCopyPending(StoreEntry * e, void *data)
  398. {
  399.     /* return 1 if there is a callback registered for this client */
  400.     store_client *sc = storeClientListSearch(e->mem_obj, data);
  401.     if (sc == NULL)
  402. return 0;
  403.     if (sc->callback == NULL)
  404. return 0;
  405.     return 1;
  406. }
  407. int
  408. storeUnregister(StoreEntry * e, void *data)
  409. {
  410.     MemObject *mem = e->mem_obj;
  411.     store_client *sc;
  412.     store_client **S;
  413.     STCB *callback;
  414.     if (mem == NULL)
  415. return 0;
  416.     debug(20, 3) ("storeUnregister: called for '%s'n", storeKeyText(e->key));
  417.     for (S = &mem->clients; (sc = *S) != NULL; S = &(*S)->next) {
  418. if (sc->callback_data == data)
  419.     break;
  420.     }
  421.     if (sc == NULL)
  422. return 0;
  423.     if (sc == mem->clients) {
  424. /*
  425.  * If we are unregistering the _first_ client for this
  426.  * entry, then we have to reset the client FD to -1.
  427.  */
  428. mem->fd = -1;
  429.     }
  430.     *S = sc->next;
  431.     mem->nclients--;
  432.     sc->flags.disk_io_pending = 0;
  433.     if (e->store_status == STORE_OK && e->swap_status != SWAPOUT_DONE)
  434. storeCheckSwapOut(e);
  435.     if (sc->swapin_fd > -1) {
  436. commSetSelect(sc->swapin_fd, COMM_SELECT_READ, NULL, NULL, 0);
  437. file_close(sc->swapin_fd);
  438. store_open_disk_fd--;
  439. /* XXX this probably leaks file_read handler structures */
  440.     }
  441. #if USE_ASYNC_IO
  442.     else
  443. aioCancel(-1, sc);
  444. #endif
  445.     if ((callback = sc->callback) != NULL) {
  446. /* callback with ssize = -1 to indicate unexpected termination */
  447. debug(20, 3) ("storeUnregister: store_client for %s has a callbackn",
  448.     mem->url);
  449. sc->callback = NULL;
  450. callback(sc->callback_data, sc->copy_buf, -1);
  451.     }
  452.     cbdataFree(sc);
  453.     assert(e->lock_count > 0);
  454.     if (mem->nclients == 0)
  455. CheckQuickAbort(e);
  456.     return 1;
  457. }
  458. off_t
  459. storeLowestMemReaderOffset(const StoreEntry * entry)
  460. {
  461.     const MemObject *mem = entry->mem_obj;
  462.     off_t lowest = mem->inmem_hi;
  463.     store_client *sc;
  464.     store_client *nx = NULL;
  465.     for (sc = mem->clients; sc; sc = nx) {
  466. nx = sc->next;
  467. if (sc->callback_data == NULL) /* open slot */
  468.     continue;
  469. if (sc->type != STORE_MEM_CLIENT)
  470.     continue;
  471. if (sc->copy_offset < lowest)
  472.     lowest = sc->copy_offset;
  473.     }
  474.     return lowest;
  475. }
  476. /* Call handlers waiting for  data to be appended to E. */
  477. void
  478. InvokeHandlers(StoreEntry * e)
  479. {
  480.     int i = 0;
  481.     MemObject *mem = e->mem_obj;
  482.     store_client *sc;
  483.     store_client *nx = NULL;
  484.     assert(mem->clients != NULL || mem->nclients == 0);
  485.     debug(20, 3) ("InvokeHandlers: %sn", storeKeyText(e->key));
  486.     /* walk the entire list looking for valid callbacks */
  487.     for (sc = mem->clients; sc; sc = nx) {
  488. nx = sc->next;
  489. debug(20, 3) ("InvokeHandlers: checking client #%dn", i++);
  490. if (sc->callback_data == NULL)
  491.     continue;
  492. if (sc->callback == NULL)
  493.     continue;
  494. storeClientCopy2(e, sc);
  495.     }
  496. }
  497. int
  498. storePendingNClients(const StoreEntry * e)
  499. {
  500.     MemObject *mem = e->mem_obj;
  501.     int npend = NULL == mem ? 0 : mem->nclients;
  502.     debug(20, 3) ("storePendingNClients: returning %dn", npend);
  503.     return npend;
  504. }
  505. /* return 1 if the request should be aborted */
  506. static int
  507. CheckQuickAbort2(StoreEntry * entry)
  508. {
  509.     int curlen;
  510.     int minlen;
  511.     int expectlen;
  512.     MemObject *mem = entry->mem_obj;
  513.     assert(mem);
  514.     debug(20, 3) ("CheckQuickAbort2: entry=%p, mem=%pn", entry, mem);
  515.     if (mem->request && !mem->request->flags.cachable) {
  516. debug(20, 3) ("CheckQuickAbort2: YES !mem->request->flags.cachablen");
  517. return 1;
  518.     }
  519.     if (EBIT_TEST(entry->flags, KEY_PRIVATE)) {
  520. debug(20, 3) ("CheckQuickAbort2: YES KEY_PRIVATEn");
  521. return 1;
  522.     }
  523.     expectlen = mem->reply->content_length + mem->reply->hdr_sz;
  524.     curlen = (int) mem->inmem_hi;
  525.     minlen = (int) Config.quickAbort.min << 10;
  526.     if (minlen < 0) {
  527. debug(20, 3) ("CheckQuickAbort2: NO disabledn");
  528. return 0;
  529.     }
  530.     if (curlen > expectlen) {
  531. debug(20, 3) ("CheckQuickAbort2: YES bad content lengthn");
  532. return 1;
  533.     }
  534.     if ((expectlen - curlen) < minlen) {
  535. debug(20, 3) ("CheckQuickAbort2: NO only little more leftn");
  536. return 0;
  537.     }
  538.     if ((expectlen - curlen) > (Config.quickAbort.max << 10)) {
  539. debug(20, 3) ("CheckQuickAbort2: YES too much left to gon");
  540. return 1;
  541.     }
  542.     if (expectlen < 100) {
  543. debug(20, 3) ("CheckQuickAbort2: NO avoid FPEn");
  544. return 0;
  545.     }
  546.     if ((curlen / (expectlen / 100)) > Config.quickAbort.pct) {
  547. debug(20, 3) ("CheckQuickAbort2: NO past point of no returnn");
  548. return 0;
  549.     }
  550.     debug(20, 3) ("CheckQuickAbort2: YES default, returning 1n");
  551.     return 1;
  552. }
  553. static void
  554. CheckQuickAbort(StoreEntry * entry)
  555. {
  556.     if (entry == NULL)
  557. return;
  558.     if (storePendingNClients(entry) > 0)
  559. return;
  560.     if (entry->store_status != STORE_PENDING)
  561. return;
  562.     if (EBIT_TEST(entry->flags, ENTRY_SPECIAL))
  563. return;
  564.     if (CheckQuickAbort2(entry) == 0)
  565. return;
  566.     Counter.aborted_requests++;
  567.     storeAbort(entry);
  568. }