pump.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:13k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. /*
  2.  * $Id: pump.c,v 1.71 1999/01/29 23:39:22 wessels Exp $
  3.  *
  4.  * DEBUG: section 61    PUMP handler
  5.  * AUTHOR: Kostas Anagnostakis
  6.  *
  7.  * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
  8.  * ----------------------------------------------------------
  9.  *
  10.  *  Squid is the result of efforts by numerous individuals from the
  11.  *  Internet community.  Development is led by Duane Wessels of the
  12.  *  National Laboratory for Applied Network Research and funded by the
  13.  *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
  14.  *  Duane Wessels and the University of California San Diego.  Please
  15.  *  see the COPYRIGHT file for full details.  Squid incorporates
  16.  *  software developed and/or copyrighted by other sources.  Please see
  17.  *  the CREDITS file for full details.
  18.  *
  19.  *  This program is free software; you can redistribute it and/or modify
  20.  *  it under the terms of the GNU General Public License as published by
  21.  *  the Free Software Foundation; either version 2 of the License, or
  22.  *  (at your option) any later version.
  23.  *  
  24.  *  This program is distributed in the hope that it will be useful,
  25.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  26.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  27.  *  GNU General Public License for more details.
  28.  *  
  29.  *  You should have received a copy of the GNU General Public License
  30.  *  along with this program; if not, write to the Free Software
  31.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  32.  *
  33.  */
  34. #include "squid.h"
  35. #define PUMP_MAXBUFFER 2*SQUID_UDP_SO_SNDBUF
  36. struct _PumpStateData {
  37.     FwdState *fwd;
  38.     request_t *req;
  39.     int c_fd; /* client fd */
  40.     int s_fd; /* server end */
  41.     int rcvd; /* bytes received from client */
  42.     int sent; /* bytes sent to server */
  43.     int cont_len; /* Content-Length header */
  44.     StoreEntry *request_entry; /* the request entry */
  45.     StoreEntry *reply_entry; /* the reply entry */
  46.     CWCB *callback; /* what to do when we finish sending */
  47.     void *cbdata; /* callback data passed to callback func */
  48.     struct {
  49. int closing:1;
  50.     } flags;
  51.     struct _PumpStateData *next;
  52. };
  53. #define PUMP_FLAG_CLOSING 0x01
  54. typedef struct _PumpStateData PumpStateData;
  55. static PumpStateData *pump_head = NULL;
  56. static PF pumpReadFromClient;
  57. static STCB pumpServerCopy;
  58. static CWCB pumpServerCopyComplete;
  59. static PF pumpFree;
  60. static PF pumpTimeout;
  61. static PF pumpServerClosed;
  62. static DEFER pumpReadDefer;
  63. static void pumpClose(void *data);
  64. void
  65. pumpInit(int fd, request_t * r, char *uri)
  66. {
  67.     request_flags flags;
  68.     LOCAL_ARRAY(char, new_key, MAX_URL + 8);
  69.     int clen = 0;
  70.     PumpStateData *p = xcalloc(1, sizeof(PumpStateData));
  71.     debug(61, 3) ("pumpInit: FD %d, uri=%sn", fd, uri);
  72.     /*
  73.      * create a StoreEntry which will buffer the data 
  74.      * to be pumped
  75.      */
  76.     assert(fd > -1);
  77.     assert(uri != NULL);
  78.     assert(r != NULL);
  79.     clen = httpHeaderGetInt(&r->header, HDR_CONTENT_LENGTH);
  80.     /* we shouldn't have gotten this far if content-length is invalid */
  81.     assert(clen >= 0);
  82.     debug(61, 4) ("pumpInit: Content-Length=%d.n", clen);
  83.     flags = null_request_flags;
  84.     flags.nocache = 1;
  85.     snprintf(new_key, MAX_URL + 5, "%s|Pump", uri);
  86.     p->request_entry = storeCreateEntry(new_key, new_key, flags, r->method);
  87.     storeClientListAdd(p->request_entry, p);
  88. #if DELAY_POOLS
  89.     delaySetStoreClient(p->request_entry, p, delayClient(r));
  90. #endif
  91.     /*
  92.      * initialize data structure
  93.      */
  94.     p->c_fd = fd;
  95.     p->s_fd = -1;
  96.     p->cont_len = clen;
  97.     p->req = requestLink(r);
  98.     p->callback = NULL;
  99.     p->cbdata = NULL;
  100.     p->next = pump_head;
  101.     pump_head = p;
  102.     cbdataAdd(p, cbdataXfree, 0);
  103.     comm_add_close_handler(p->c_fd, pumpFree, p);
  104.     commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
  105.     debug(61, 4) ("pumpInit: FD %d, Created %pn", fd, p);
  106. }
  107. void
  108. pumpStart(int s_fd, FwdState * fwd, CWCB * callback, void *cbdata)
  109. {
  110.     PumpStateData *p = NULL;
  111.     request_t *r = fwd->request;
  112.     size_t copy_sz;
  113.     debug(61, 3) ("pumpStart: FD %d, key %sn",
  114. s_fd, storeKeyText(fwd->entry->key));
  115.     /*
  116.      * find state data generated by pumpInit in linked list
  117.      */
  118.     for (p = pump_head; p && p->req != r; p = p->next);
  119.     assert(p != NULL);
  120.     assert(p->request_entry);
  121.     assert(p->c_fd > -1);
  122.     assert(r == p->req);
  123.     /*
  124.      * fill in the rest of data needed by the pump
  125.      */
  126.     p->fwd = fwd;
  127.     p->s_fd = s_fd;
  128.     p->reply_entry = fwd->entry;
  129.     p->callback = callback;
  130.     p->cbdata = cbdata;
  131.     cbdataLock(p->cbdata);
  132.     storeLockObject(p->reply_entry);
  133.     comm_add_close_handler(p->s_fd, pumpServerClosed, p);
  134.     /*
  135.      * see if part of the body is in the request
  136.      */
  137.     if (p->rcvd < p->cont_len && r->body_sz > 0) {
  138. assert(p->request_entry->store_status == STORE_PENDING);
  139. assert(r->body != NULL);
  140. assert(r->body_sz <= p->cont_len);
  141. copy_sz = XMIN(r->body_sz, p->cont_len);
  142. debug(61, 3) ("pumpStart: Appending %d bytes from r->bodyn", copy_sz);
  143. storeAppend(p->request_entry, r->body, copy_sz);
  144. p->rcvd = copy_sz;
  145.     }
  146.     /*
  147.      * Do we need to read more data from the client?
  148.      */
  149.     if (p->rcvd < p->cont_len) {
  150. assert(p->request_entry->store_status == STORE_PENDING);
  151. commSetSelect(p->c_fd, COMM_SELECT_READ, pumpReadFromClient, p, 0);
  152. commSetTimeout(p->c_fd, Config.Timeout.read, pumpTimeout, p);
  153. commSetDefer(p->c_fd, pumpReadDefer, p);
  154.     }
  155.     p->sent = 0;
  156.     if (p->sent == p->cont_len) {
  157. pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
  158.     } else {
  159. storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
  160.     memAllocate(MEM_4K_BUF),
  161.     pumpServerCopy, p);
  162.     }
  163. }
  164. static void
  165. pumpServerCopy(void *data, char *buf, ssize_t size)
  166. {
  167.     PumpStateData *p = data;
  168.     debug(61, 5) ("pumpServerCopy: called with size=%dn", size);
  169.     if (size < 0) {
  170. debug(61, 5) ("pumpServerCopy: freeing and returningn");
  171. memFree(buf, MEM_4K_BUF);
  172. return;
  173.     }
  174.     if (size == 0) {
  175. debug(61, 5) ("pumpServerCopy: done, finishingn", size);
  176. pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
  177. memFree(buf, MEM_4K_BUF);
  178. return;
  179.     }
  180.     debug(61, 5) ("pumpServerCopy: to FD %d, %d bytesn", p->s_fd, size);
  181.     comm_write(p->s_fd, buf, size, pumpServerCopyComplete, p, memFree4K);
  182. }
  183. static void
  184. pumpServerCopyComplete(int fd, char *bufnotused, size_t size, int errflag, void *data)
  185. {
  186.     PumpStateData *p = data;
  187.     int sfd;
  188.     debug(61, 5) ("pumpServerCopyComplete: called with size=%d (%d,%d)n",
  189. size, p->sent + size, p->cont_len);
  190.     if (errflag == COMM_ERR_CLOSING)
  191. return;
  192.     if (errflag != 0) {
  193. debug(61, 5) ("pumpServerCopyComplete: aborted, errflag %dn", errflag);
  194. pumpClose(p);
  195. return;
  196.     }
  197.     if (EBIT_TEST(p->request_entry->flags, ENTRY_ABORTED)) {
  198. debug(61, 5) ("pumpServerCopyComplete: ENTRY_ABORTEDn");
  199. pumpClose(p);
  200. return;
  201.     }
  202.     p->sent += size;
  203.     assert(p->sent <= p->cont_len);
  204.     if (p->sent < p->cont_len) {
  205. storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
  206.     memAllocate(MEM_4K_BUF),
  207.     pumpServerCopy, p);
  208. return;
  209.     }
  210.     debug(61, 5) ("pumpServerCopyComplete: Done!n", size);
  211.     /*
  212.      * we don't care what happens on the server side now
  213.      */
  214.     sfd = p->s_fd;
  215.     comm_remove_close_handler(p->s_fd, pumpServerClosed, p);
  216.     p->s_fd = -1;
  217.     if (cbdataValid(p->cbdata))
  218. p->callback(sfd, NULL, p->sent, 0, p->cbdata);
  219.     cbdataUnlock(p->cbdata);
  220.     storeUnlockObject(p->reply_entry);
  221.     p->reply_entry = NULL;
  222. }
  223. static void
  224. pumpReadFromClient(int fd, void *data)
  225. {
  226.     PumpStateData *p = data;
  227.     StoreEntry *req = p->request_entry;
  228.     LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF);
  229.     int bytes_to_read = XMIN(p->cont_len - p->rcvd, SQUID_TCP_SO_RCVBUF);
  230.     int len = 0;
  231.     errno = 0;
  232.     Counter.syscalls.sock.reads++;
  233.     len = read(fd, buf, bytes_to_read);
  234.     fd_bytes(fd, len, FD_READ);
  235.     debug(61, 5) ("pumpReadFromClient: FD %d: len %d.n", fd, len);
  236.     if (len > 0) {
  237. (void) 0; /* continue */
  238.     } else if (len < 0) {
  239. debug(61, 2) ("pumpReadFromClient: FD %d: read failure: %s.n",
  240.     fd, xstrerror());
  241. if (ignoreErrno(errno)) {
  242.     debug(61, 5) ("pumpReadFromClient: FD %d: len %d and ignore!n",
  243. fd, len);
  244.     commSetSelect(fd,
  245. COMM_SELECT_READ,
  246. pumpReadFromClient,
  247. p,
  248. Config.Timeout.read);
  249. } else {
  250.     debug(61, 2) ("pumpReadFromClient: aborted.n");
  251.     pumpClose(p);
  252. }
  253. return;
  254.     } else if (req->mem_obj->inmem_hi == 0) {
  255. debug(61, 2) ("pumpReadFromClient: FD %d: failed.n", fd);
  256. pumpClose(p);
  257. return;
  258.     } else if (p->rcvd < p->cont_len) {
  259. debug(61, 4) ("pumpReadFromClient: FD %d, incomplete requestn", fd);
  260. pumpClose(p);
  261. return;
  262.     }
  263.     if (len > 0) {
  264. int delta = p->rcvd + len - p->cont_len;
  265. if (delta > 0 && p->req->flags.proxy_keepalive) {
  266.     debug(61, delta == 2 ? 3 : 1) ("pumpReadFromClient: Warning: read %d bytes past content-length, truncatingn", delta);
  267.     len = p->cont_len - p->rcvd;
  268. }
  269. storeAppend(req, buf, len);
  270. p->rcvd += len;
  271.     }
  272.     if (p->rcvd < p->cont_len) {
  273. /* We need more data */
  274. commSetSelect(fd, COMM_SELECT_READ, pumpReadFromClient,
  275.     p, Config.Timeout.read);
  276. return;
  277.     }
  278.     /* all done! */
  279.     if (p->req->flags.proxy_keepalive)
  280. assert(p->rcvd == p->cont_len);
  281.     debug(61, 2) ("pumpReadFromClient: finished!n");
  282.     storeComplete(req);
  283.     commSetDefer(p->c_fd, NULL, NULL);
  284. }
  285. static int
  286. pumpReadDefer(int fd, void *data)
  287. {
  288.     PumpStateData *p = data;
  289.     assert(p->rcvd >= p->sent);
  290.     if ((p->rcvd - p->sent) < PUMP_MAXBUFFER)
  291. return 0;
  292.     debug(61, 5) ("pumpReadDefer: deferring, rcvd=%d, sent=%dn",
  293. p->rcvd, p->sent);
  294.     return 1;
  295. }
  296. static void
  297. pumpClose(void *data)
  298. {
  299.     PumpStateData *p = data;
  300.     StoreEntry *req = p->request_entry;
  301.     StoreEntry *rep = p->reply_entry;
  302.     cbdataLock(p);
  303.     debug(61, 3) ("pumpClose: %p Server FD %d, Client FD %dn",
  304. p, p->s_fd, p->c_fd);
  305.     /* double-call detection */
  306.     assert(!p->flags.closing);
  307.     p->flags.closing = 1;
  308.     if (req != NULL && req->store_status == STORE_PENDING) {
  309. storeUnregister(req, p);
  310.     }
  311.     if (rep != NULL && rep->store_status == STORE_PENDING) {
  312. ErrorState *err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
  313. fwdFail(p->fwd, err);
  314.     }
  315.     if (p->s_fd > -1) {
  316. comm_close(p->s_fd);
  317. p->s_fd = -1;
  318.     }
  319.     if (p->c_fd > -1) {
  320. comm_close(p->c_fd);
  321.     }
  322.     /* This tests that pumpFree() got called somewhere */
  323.     assert(0 == cbdataValid(p));
  324.     cbdataUnlock(p);
  325. }
  326. static void
  327. pumpFree(int fd, void *data)
  328. {
  329.     PumpStateData *p = NULL;
  330.     PumpStateData *q = NULL;
  331.     StoreEntry *req;
  332.     StoreEntry *rep;
  333.     debug(61, 3) ("pumpFree: FD %d, releasing %p!n", fd, data);
  334.     for (p = pump_head; p && p != data; q = p, p = p->next);
  335.     if (p == NULL) {
  336. debug(61, 1) ("pumpFree: p=%p not found?n", p);
  337. return;
  338.     }
  339.     if (q)
  340. q->next = p->next;
  341.     else
  342. pump_head = p->next;
  343.     assert(fd == p->c_fd);
  344.     p->c_fd = -1;
  345.     req = p->request_entry;
  346.     rep = p->reply_entry;
  347.     if (req != NULL) {
  348. storeUnregister(req, p);
  349. storeUnlockObject(req);
  350. p->request_entry = NULL;
  351.     }
  352.     if (rep != NULL) {
  353. debug(61, 3) ("pumpFree: did the server-side FD (%d) get closed?n", p->s_fd);
  354. storeUnlockObject(rep);
  355. p->reply_entry = NULL;
  356.     }
  357.     requestUnlink(p->req);
  358.     if (p->s_fd > -1) {
  359. assert(!fd_table[p->s_fd].flags.open);
  360. p->s_fd = -1;
  361.     }
  362.     cbdataFree(p);
  363. }
  364. static void
  365. pumpTimeout(int fd, void *data)
  366. {
  367.     PumpStateData *p = data;
  368.     debug(61, 3) ("pumpTimeout: FD %dn", p->c_fd);
  369.     pumpClose(p);
  370. }
  371. /*
  372.  *This is called only if the client connect closes unexpectedly
  373.  */
  374. static void
  375. pumpServerClosed(int fd, void *data)
  376. {
  377.     PumpStateData *p = data;
  378.     debug(61, 3) ("pumpServerClosed: FD %dn", fd);
  379.     /*
  380.      * we have been called from comm_close for the server side, so
  381.      * just need to clean up the client side
  382.      */
  383.     assert(p->s_fd == fd);
  384.     p->s_fd = -1;
  385.     if (p->flags.closing)
  386. return;
  387.     if (p->c_fd > -1)
  388. comm_close(p->c_fd);
  389. }
  390. /*
  391.  * This function returns true for the request methods handled
  392.  * by this module
  393.  */
  394. int
  395. pumpMethod(method_t method)
  396. {
  397.     switch (method) {
  398.     case METHOD_POST:
  399.     case METHOD_PUT:
  400. return 1;
  401. break;
  402.     default:
  403. return 0;
  404. break;
  405.     }
  406.     /* NOTREACHED */
  407. }
  408. /*
  409.  * This function returns True if we can submit this request again.
  410.  * The request may have been pipelined, but the connection got
  411.  * closed before we got a reply.  If we still have the whole
  412.  * request in memory then we can send it again.  If we want to
  413.  * be able to restart very large requests, then we'll have to
  414.  * swap them out to disk.
  415.  */
  416. int
  417. pumpRestart(request_t * r)
  418. {
  419.     PumpStateData *p;
  420.     MemObject *mem;
  421.     for (p = pump_head; p && p->req != r; p = p->next);
  422.     if (p == NULL) {
  423. debug(61, 3) ("pumpRestart: NO: Can't find pumpState!n");
  424. return 0;
  425.     }
  426.     mem = p->request_entry->mem_obj;
  427.     if (mem == NULL) {
  428. debug(61, 3) ("pumpRestart: NO: request_entry->mem_obj == NULL!n");
  429. return 0;
  430.     }
  431.     if (mem->inmem_lo > 0) {
  432. debug(61, 3) ("pumpRestart: NO: mem->inmem_lo == %dn",
  433.     (int) mem->inmem_lo);
  434. return 0;
  435.     }
  436.     debug(61, 3) ("pumpRestart: YES!n");
  437.     return 1;
  438. }