pump.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:13k
- /*
- * $Id: pump.c,v 1.71 1999/01/29 23:39:22 wessels Exp $
- *
- * DEBUG: section 61 PUMP handler
- * AUTHOR: Kostas Anagnostakis
- *
- * SQUID Internet Object Cache http://squid.nlanr.net/Squid/
- * ----------------------------------------------------------
- *
- * Squid is the result of efforts by numerous individuals from the
- * Internet community. Development is led by Duane Wessels of the
- * National Laboratory for Applied Network Research and funded by the
- * National Science Foundation. Squid is Copyrighted (C) 1998 by
- * Duane Wessels and the University of California San Diego. Please
- * see the COPYRIGHT file for full details. Squid incorporates
- * software developed and/or copyrighted by other sources. Please see
- * the CREDITS file for full details.
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 2 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program; if not, write to the Free Software
- * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
- *
- */
- #include "squid.h"
- #define PUMP_MAXBUFFER 2*SQUID_UDP_SO_SNDBUF
- struct _PumpStateData {
- FwdState *fwd;
- request_t *req;
- int c_fd; /* client fd */
- int s_fd; /* server end */
- int rcvd; /* bytes received from client */
- int sent; /* bytes sent to server */
- int cont_len; /* Content-Length header */
- StoreEntry *request_entry; /* the request entry */
- StoreEntry *reply_entry; /* the reply entry */
- CWCB *callback; /* what to do when we finish sending */
- void *cbdata; /* callback data passed to callback func */
- struct {
- int closing:1;
- } flags;
- struct _PumpStateData *next;
- };
- #define PUMP_FLAG_CLOSING 0x01
- typedef struct _PumpStateData PumpStateData;
- static PumpStateData *pump_head = NULL;
- static PF pumpReadFromClient;
- static STCB pumpServerCopy;
- static CWCB pumpServerCopyComplete;
- static PF pumpFree;
- static PF pumpTimeout;
- static PF pumpServerClosed;
- static DEFER pumpReadDefer;
- static void pumpClose(void *data);
- void
- pumpInit(int fd, request_t * r, char *uri)
- {
- request_flags flags;
- LOCAL_ARRAY(char, new_key, MAX_URL + 8);
- int clen = 0;
- PumpStateData *p = xcalloc(1, sizeof(PumpStateData));
- debug(61, 3) ("pumpInit: FD %d, uri=%sn", fd, uri);
- /*
- * create a StoreEntry which will buffer the data
- * to be pumped
- */
- assert(fd > -1);
- assert(uri != NULL);
- assert(r != NULL);
- clen = httpHeaderGetInt(&r->header, HDR_CONTENT_LENGTH);
- /* we shouldn't have gotten this far if content-length is invalid */
- assert(clen >= 0);
- debug(61, 4) ("pumpInit: Content-Length=%d.n", clen);
- flags = null_request_flags;
- flags.nocache = 1;
- snprintf(new_key, MAX_URL + 5, "%s|Pump", uri);
- p->request_entry = storeCreateEntry(new_key, new_key, flags, r->method);
- storeClientListAdd(p->request_entry, p);
- #if DELAY_POOLS
- delaySetStoreClient(p->request_entry, p, delayClient(r));
- #endif
- /*
- * initialize data structure
- */
- p->c_fd = fd;
- p->s_fd = -1;
- p->cont_len = clen;
- p->req = requestLink(r);
- p->callback = NULL;
- p->cbdata = NULL;
- p->next = pump_head;
- pump_head = p;
- cbdataAdd(p, cbdataXfree, 0);
- comm_add_close_handler(p->c_fd, pumpFree, p);
- commSetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0);
- debug(61, 4) ("pumpInit: FD %d, Created %pn", fd, p);
- }
- void
- pumpStart(int s_fd, FwdState * fwd, CWCB * callback, void *cbdata)
- {
- PumpStateData *p = NULL;
- request_t *r = fwd->request;
- size_t copy_sz;
- debug(61, 3) ("pumpStart: FD %d, key %sn",
- s_fd, storeKeyText(fwd->entry->key));
- /*
- * find state data generated by pumpInit in linked list
- */
- for (p = pump_head; p && p->req != r; p = p->next);
- assert(p != NULL);
- assert(p->request_entry);
- assert(p->c_fd > -1);
- assert(r == p->req);
- /*
- * fill in the rest of data needed by the pump
- */
- p->fwd = fwd;
- p->s_fd = s_fd;
- p->reply_entry = fwd->entry;
- p->callback = callback;
- p->cbdata = cbdata;
- cbdataLock(p->cbdata);
- storeLockObject(p->reply_entry);
- comm_add_close_handler(p->s_fd, pumpServerClosed, p);
- /*
- * see if part of the body is in the request
- */
- if (p->rcvd < p->cont_len && r->body_sz > 0) {
- assert(p->request_entry->store_status == STORE_PENDING);
- assert(r->body != NULL);
- assert(r->body_sz <= p->cont_len);
- copy_sz = XMIN(r->body_sz, p->cont_len);
- debug(61, 3) ("pumpStart: Appending %d bytes from r->bodyn", copy_sz);
- storeAppend(p->request_entry, r->body, copy_sz);
- p->rcvd = copy_sz;
- }
- /*
- * Do we need to read more data from the client?
- */
- if (p->rcvd < p->cont_len) {
- assert(p->request_entry->store_status == STORE_PENDING);
- commSetSelect(p->c_fd, COMM_SELECT_READ, pumpReadFromClient, p, 0);
- commSetTimeout(p->c_fd, Config.Timeout.read, pumpTimeout, p);
- commSetDefer(p->c_fd, pumpReadDefer, p);
- }
- p->sent = 0;
- if (p->sent == p->cont_len) {
- pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
- } else {
- storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
- memAllocate(MEM_4K_BUF),
- pumpServerCopy, p);
- }
- }
- static void
- pumpServerCopy(void *data, char *buf, ssize_t size)
- {
- PumpStateData *p = data;
- debug(61, 5) ("pumpServerCopy: called with size=%dn", size);
- if (size < 0) {
- debug(61, 5) ("pumpServerCopy: freeing and returningn");
- memFree(buf, MEM_4K_BUF);
- return;
- }
- if (size == 0) {
- debug(61, 5) ("pumpServerCopy: done, finishingn", size);
- pumpServerCopyComplete(p->s_fd, NULL, 0, DISK_OK, p);
- memFree(buf, MEM_4K_BUF);
- return;
- }
- debug(61, 5) ("pumpServerCopy: to FD %d, %d bytesn", p->s_fd, size);
- comm_write(p->s_fd, buf, size, pumpServerCopyComplete, p, memFree4K);
- }
- static void
- pumpServerCopyComplete(int fd, char *bufnotused, size_t size, int errflag, void *data)
- {
- PumpStateData *p = data;
- int sfd;
- debug(61, 5) ("pumpServerCopyComplete: called with size=%d (%d,%d)n",
- size, p->sent + size, p->cont_len);
- if (errflag == COMM_ERR_CLOSING)
- return;
- if (errflag != 0) {
- debug(61, 5) ("pumpServerCopyComplete: aborted, errflag %dn", errflag);
- pumpClose(p);
- return;
- }
- if (EBIT_TEST(p->request_entry->flags, ENTRY_ABORTED)) {
- debug(61, 5) ("pumpServerCopyComplete: ENTRY_ABORTEDn");
- pumpClose(p);
- return;
- }
- p->sent += size;
- assert(p->sent <= p->cont_len);
- if (p->sent < p->cont_len) {
- storeClientCopy(p->request_entry, p->sent, p->sent, 4096,
- memAllocate(MEM_4K_BUF),
- pumpServerCopy, p);
- return;
- }
- debug(61, 5) ("pumpServerCopyComplete: Done!n", size);
- /*
- * we don't care what happens on the server side now
- */
- sfd = p->s_fd;
- comm_remove_close_handler(p->s_fd, pumpServerClosed, p);
- p->s_fd = -1;
- if (cbdataValid(p->cbdata))
- p->callback(sfd, NULL, p->sent, 0, p->cbdata);
- cbdataUnlock(p->cbdata);
- storeUnlockObject(p->reply_entry);
- p->reply_entry = NULL;
- }
- static void
- pumpReadFromClient(int fd, void *data)
- {
- PumpStateData *p = data;
- StoreEntry *req = p->request_entry;
- LOCAL_ARRAY(char, buf, SQUID_TCP_SO_RCVBUF);
- int bytes_to_read = XMIN(p->cont_len - p->rcvd, SQUID_TCP_SO_RCVBUF);
- int len = 0;
- errno = 0;
- Counter.syscalls.sock.reads++;
- len = read(fd, buf, bytes_to_read);
- fd_bytes(fd, len, FD_READ);
- debug(61, 5) ("pumpReadFromClient: FD %d: len %d.n", fd, len);
- if (len > 0) {
- (void) 0; /* continue */
- } else if (len < 0) {
- debug(61, 2) ("pumpReadFromClient: FD %d: read failure: %s.n",
- fd, xstrerror());
- if (ignoreErrno(errno)) {
- debug(61, 5) ("pumpReadFromClient: FD %d: len %d and ignore!n",
- fd, len);
- commSetSelect(fd,
- COMM_SELECT_READ,
- pumpReadFromClient,
- p,
- Config.Timeout.read);
- } else {
- debug(61, 2) ("pumpReadFromClient: aborted.n");
- pumpClose(p);
- }
- return;
- } else if (req->mem_obj->inmem_hi == 0) {
- debug(61, 2) ("pumpReadFromClient: FD %d: failed.n", fd);
- pumpClose(p);
- return;
- } else if (p->rcvd < p->cont_len) {
- debug(61, 4) ("pumpReadFromClient: FD %d, incomplete requestn", fd);
- pumpClose(p);
- return;
- }
- if (len > 0) {
- int delta = p->rcvd + len - p->cont_len;
- if (delta > 0 && p->req->flags.proxy_keepalive) {
- debug(61, delta == 2 ? 3 : 1) ("pumpReadFromClient: Warning: read %d bytes past content-length, truncatingn", delta);
- len = p->cont_len - p->rcvd;
- }
- storeAppend(req, buf, len);
- p->rcvd += len;
- }
- if (p->rcvd < p->cont_len) {
- /* We need more data */
- commSetSelect(fd, COMM_SELECT_READ, pumpReadFromClient,
- p, Config.Timeout.read);
- return;
- }
- /* all done! */
- if (p->req->flags.proxy_keepalive)
- assert(p->rcvd == p->cont_len);
- debug(61, 2) ("pumpReadFromClient: finished!n");
- storeComplete(req);
- commSetDefer(p->c_fd, NULL, NULL);
- }
- static int
- pumpReadDefer(int fd, void *data)
- {
- PumpStateData *p = data;
- assert(p->rcvd >= p->sent);
- if ((p->rcvd - p->sent) < PUMP_MAXBUFFER)
- return 0;
- debug(61, 5) ("pumpReadDefer: deferring, rcvd=%d, sent=%dn",
- p->rcvd, p->sent);
- return 1;
- }
- static void
- pumpClose(void *data)
- {
- PumpStateData *p = data;
- StoreEntry *req = p->request_entry;
- StoreEntry *rep = p->reply_entry;
- cbdataLock(p);
- debug(61, 3) ("pumpClose: %p Server FD %d, Client FD %dn",
- p, p->s_fd, p->c_fd);
- /* double-call detection */
- assert(!p->flags.closing);
- p->flags.closing = 1;
- if (req != NULL && req->store_status == STORE_PENDING) {
- storeUnregister(req, p);
- }
- if (rep != NULL && rep->store_status == STORE_PENDING) {
- ErrorState *err = errorCon(ERR_READ_ERROR, HTTP_INTERNAL_SERVER_ERROR);
- fwdFail(p->fwd, err);
- }
- if (p->s_fd > -1) {
- comm_close(p->s_fd);
- p->s_fd = -1;
- }
- if (p->c_fd > -1) {
- comm_close(p->c_fd);
- }
- /* This tests that pumpFree() got called somewhere */
- assert(0 == cbdataValid(p));
- cbdataUnlock(p);
- }
- static void
- pumpFree(int fd, void *data)
- {
- PumpStateData *p = NULL;
- PumpStateData *q = NULL;
- StoreEntry *req;
- StoreEntry *rep;
- debug(61, 3) ("pumpFree: FD %d, releasing %p!n", fd, data);
- for (p = pump_head; p && p != data; q = p, p = p->next);
- if (p == NULL) {
- debug(61, 1) ("pumpFree: p=%p not found?n", p);
- return;
- }
- if (q)
- q->next = p->next;
- else
- pump_head = p->next;
- assert(fd == p->c_fd);
- p->c_fd = -1;
- req = p->request_entry;
- rep = p->reply_entry;
- if (req != NULL) {
- storeUnregister(req, p);
- storeUnlockObject(req);
- p->request_entry = NULL;
- }
- if (rep != NULL) {
- debug(61, 3) ("pumpFree: did the server-side FD (%d) get closed?n", p->s_fd);
- storeUnlockObject(rep);
- p->reply_entry = NULL;
- }
- requestUnlink(p->req);
- if (p->s_fd > -1) {
- assert(!fd_table[p->s_fd].flags.open);
- p->s_fd = -1;
- }
- cbdataFree(p);
- }
- static void
- pumpTimeout(int fd, void *data)
- {
- PumpStateData *p = data;
- debug(61, 3) ("pumpTimeout: FD %dn", p->c_fd);
- pumpClose(p);
- }
- /*
- *This is called only if the client connect closes unexpectedly
- */
- static void
- pumpServerClosed(int fd, void *data)
- {
- PumpStateData *p = data;
- debug(61, 3) ("pumpServerClosed: FD %dn", fd);
- /*
- * we have been called from comm_close for the server side, so
- * just need to clean up the client side
- */
- assert(p->s_fd == fd);
- p->s_fd = -1;
- if (p->flags.closing)
- return;
- if (p->c_fd > -1)
- comm_close(p->c_fd);
- }
- /*
- * This function returns true for the request methods handled
- * by this module
- */
- int
- pumpMethod(method_t method)
- {
- switch (method) {
- case METHOD_POST:
- case METHOD_PUT:
- return 1;
- break;
- default:
- return 0;
- break;
- }
- /* NOTREACHED */
- }
- /*
- * This function returns True if we can submit this request again.
- * The request may have been pipelined, but the connection got
- * closed before we got a reply. If we still have the whole
- * request in memory then we can send it again. If we want to
- * be able to restart very large requests, then we'll have to
- * swap them out to disk.
- */
- int
- pumpRestart(request_t * r)
- {
- PumpStateData *p;
- MemObject *mem;
- for (p = pump_head; p && p->req != r; p = p->next);
- if (p == NULL) {
- debug(61, 3) ("pumpRestart: NO: Can't find pumpState!n");
- return 0;
- }
- mem = p->request_entry->mem_obj;
- if (mem == NULL) {
- debug(61, 3) ("pumpRestart: NO: request_entry->mem_obj == NULL!n");
- return 0;
- }
- if (mem->inmem_lo > 0) {
- debug(61, 3) ("pumpRestart: NO: mem->inmem_lo == %dn",
- (int) mem->inmem_lo);
- return 0;
- }
- debug(61, 3) ("pumpRestart: YES!n");
- return 1;
- }