helper.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:10k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. #include "squid.h"
  2. #define HELPER_MAX_ARGS 64
  3. static PF helperHandleRead;
  4. static PF helperServerFree;
  5. static void Enqueue(helper * hlp, helper_request *);
  6. static helper_request *Dequeue(helper * hlp);
  7. static helper_server *GetFirstAvailable(helper * hlp);
  8. static void helperDispatch(helper_server * srv, helper_request * r);
  9. static void helperKickQueue(helper * hlp);
  10. static void helperRequestFree(helper_request * r);
  11. void
  12. helperOpenServers(helper * hlp)
  13. {
  14.     char *s;
  15.     char *progname;
  16.     char *shortname;
  17.     char *procname;
  18.     char *args[HELPER_MAX_ARGS];
  19.     char fd_note_buf[FD_DESC_SZ];
  20.     helper_server *srv;
  21.     int nargs = 0;
  22.     int k;
  23.     int x;
  24.     int rfd;
  25.     int wfd;
  26.     wordlist *w;
  27.     if (hlp->cmdline == NULL)
  28. return;
  29.     progname = hlp->cmdline->key;
  30.     if ((s = strrchr(progname, '/')))
  31. shortname = xstrdup(s + 1);
  32.     else
  33. shortname = xstrdup(progname);
  34.     debug(29, 1) ("helperOpenServers: Starting %d '%s' processesn",
  35. hlp->n_to_start, shortname);
  36.     procname = xmalloc(strlen(shortname) + 3);
  37.     snprintf(procname, strlen(shortname) + 3, "(%s)", shortname);
  38.     args[nargs++] = procname;
  39.     for (w = hlp->cmdline->next; w && nargs < HELPER_MAX_ARGS; w = w->next)
  40. args[nargs++] = w->key;
  41.     args[nargs++] = NULL;
  42.     assert(nargs <= HELPER_MAX_ARGS);
  43.     for (k = 0; k < hlp->n_to_start; k++) {
  44. getCurrentTime();
  45. rfd = wfd = -1;
  46. x = ipcCreate(hlp->ipc_type,
  47.     progname,
  48.     args,
  49.     shortname,
  50.     &rfd,
  51.     &wfd);
  52. if (x < 0) {
  53.     debug(29, 1) ("WARNING: Cannot run '%s' process.n", progname);
  54.     continue;
  55. }
  56. hlp->n_running++;
  57. srv = memAllocate(MEM_HELPER_SERVER);
  58. cbdataAdd(srv, memFree, MEM_HELPER_SERVER);
  59. srv->flags.alive = 1;
  60. srv->index = k;
  61. srv->rfd = rfd;
  62. srv->wfd = wfd;
  63. srv->buf = memAllocate(MEM_8K_BUF);
  64. srv->buf_sz = 8192;
  65. srv->offset = 0;
  66. srv->parent = hlp;
  67. cbdataLock(hlp); /* lock because of the parent backlink */
  68. dlinkAddTail(srv, &srv->link, &hlp->servers);
  69. if (rfd == wfd) {
  70.     snprintf(fd_note_buf, FD_DESC_SZ, "%s #%d", shortname, k + 1);
  71.     fd_note(rfd, fd_note_buf);
  72. } else {
  73.     snprintf(fd_note_buf, FD_DESC_SZ, "reading %s #%d", shortname, k + 1);
  74.     fd_note(rfd, fd_note_buf);
  75.     snprintf(fd_note_buf, FD_DESC_SZ, "writing %s #%d", shortname, k + 1);
  76.     fd_note(wfd, fd_note_buf);
  77. }
  78. commSetNonBlocking(rfd);
  79. if (wfd != rfd)
  80.     commSetNonBlocking(wfd);
  81. comm_add_close_handler(rfd, helperServerFree, srv);
  82.     }
  83.     safe_free(shortname);
  84.     safe_free(procname);
  85.     helperKickQueue(hlp);
  86. }
  87. void
  88. helperSubmit(helper * hlp, const char *buf, HLPCB * callback, void *data)
  89. {
  90.     helper_request *r = memAllocate(MEM_HELPER_REQUEST);
  91.     helper_server *srv;
  92.     if (hlp == NULL) {
  93. debug(29, 3) ("helperSubmit: hlp == NULLn");
  94. callback(data, NULL);
  95. return;
  96.     }
  97.     r->callback = callback;
  98.     r->data = data;
  99.     r->buf = xstrdup(buf);
  100.     cbdataLock(r->data);
  101.     if ((srv = GetFirstAvailable(hlp)))
  102. helperDispatch(srv, r);
  103.     else
  104. Enqueue(hlp, r);
  105. }
  106. void
  107. helperStats(StoreEntry * sentry, helper * hlp)
  108. {
  109.     helper_server *srv;
  110.     dlink_node *link;
  111.     double tt;
  112.     storeAppendPrintf(sentry, "number running: %d of %dn",
  113. hlp->n_running, hlp->n_to_start);
  114.     storeAppendPrintf(sentry, "requests sent: %dn",
  115. hlp->stats.requests);
  116.     storeAppendPrintf(sentry, "replies received: %dn",
  117. hlp->stats.replies);
  118.     storeAppendPrintf(sentry, "queue length: %dn",
  119. hlp->stats.queue_size);
  120.     storeAppendPrintf(sentry, "avg service time: %d msecn",
  121. hlp->stats.avg_svc_time);
  122.     storeAppendPrintf(sentry, "n");
  123.     storeAppendPrintf(sentry, "%7st%7st%11st%st%7st%7sn",
  124. "#",
  125. "FD",
  126. "# Requests",
  127. "Flags",
  128. "Time",
  129. "Offset");
  130.     for (link = hlp->servers.head; link; link = link->next) {
  131. srv = link->data;
  132. tt = 0.001 * tvSubMsec(srv->dispatch_time, current_time);
  133. storeAppendPrintf(sentry, "%7dt%7dt%11dt%c%c%c%ct%7.3ft%7dn",
  134.     srv->index + 1,
  135.     srv->rfd,
  136.     srv->stats.uses,
  137.     srv->flags.alive ? 'A' : ' ',
  138.     srv->flags.busy ? 'B' : ' ',
  139.     srv->flags.closing ? 'C' : ' ',
  140.     srv->flags.shutdown ? 'S' : ' ',
  141.     tt < 0.0 ? 0.0 : tt,
  142.     (int) srv->offset);
  143.     }
  144.     storeAppendPrintf(sentry, "nFlags key:nn");
  145.     storeAppendPrintf(sentry, "   A = ALIVEn");
  146.     storeAppendPrintf(sentry, "   B = BUSYn");
  147.     storeAppendPrintf(sentry, "   C = CLOSINGn");
  148.     storeAppendPrintf(sentry, "   S = SHUTDOWNn");
  149. }
  150. void
  151. helperShutdown(helper * hlp)
  152. {
  153.     dlink_node *link = hlp->servers.head;
  154.     helper_server *srv;
  155.     while (link) {
  156. srv = link->data;
  157. link = link->next;
  158. if (!srv->flags.alive) {
  159.     debug(34, 3) ("helperShutdown: %s #%d is NOT ALIVE.n",
  160. hlp->id_name, srv->index + 1);
  161.     continue;
  162. }
  163. srv->flags.shutdown = 1; /* request it to shut itself down */
  164. if (srv->flags.busy) {
  165.     debug(34, 3) ("helperShutdown: %s #%d is BUSY.n",
  166. hlp->id_name, srv->index + 1);
  167.     continue;
  168. }
  169. if (srv->flags.closing) {
  170.     debug(34, 3) ("helperShutdown: %s #%d is CLOSING.n",
  171. hlp->id_name, srv->index + 1);
  172.     continue;
  173. }
  174. srv->flags.closing = 1;
  175. comm_close(srv->rfd);
  176.     }
  177. }
  178. helper *
  179. helperCreate(const char *name)
  180. {
  181.     helper *hlp = memAllocate(MEM_HELPER);
  182.     cbdataAdd(hlp, memFree, MEM_HELPER);
  183.     hlp->id_name = name;
  184.     return hlp;
  185. }
  186. void
  187. helperFree(helper * hlp)
  188. {
  189.     /* note, don't free hlp->name, it probably points to static memory */
  190.     if (hlp->queue.head)
  191. debug(29, 0) ("WARNING: freeing %s helper with %d requests queuedn",
  192.     hlp->id_name, hlp->stats.queue_size);
  193.     cbdataFree(hlp);
  194. }
  195. /* ====================================================================== */
  196. /* LOCAL FUNCTIONS */
  197. /* ====================================================================== */
  198. static void
  199. helperServerFree(int fd, void *data)
  200. {
  201.     helper_server *srv = data;
  202.     helper *hlp = srv->parent;
  203.     helper_request *r;
  204.     assert(srv->rfd == fd);
  205.     if (srv->buf) {
  206. memFree(srv->buf, MEM_8K_BUF);
  207. srv->buf = NULL;
  208.     }
  209.     if ((r = srv->request)) {
  210. if (cbdataValid(r->data))
  211.     r->callback(r->data, srv->buf);
  212. helperRequestFree(r);
  213. srv->request = NULL;
  214.     }
  215.     if (srv->wfd != srv->rfd)
  216. comm_close(srv->wfd);
  217.     dlinkDelete(&srv->link, &hlp->servers);
  218.     hlp->n_running--;
  219.     assert(hlp->n_running >= 0);
  220.     if (!srv->flags.shutdown) {
  221. debug(34, 0) ("WARNING: %s #%d (FD %d) exitedn",
  222.     hlp->id_name, srv->index + 1, fd);
  223. assert(hlp->n_running >= hlp->n_to_start / 2);
  224. if (hlp->n_running < hlp->n_to_start / 2)
  225.     fatalf("Too few %s processes are running", hlp->id_name);
  226.     }
  227.     cbdataUnlock(srv->parent);
  228.     cbdataFree(srv);
  229. }
  230. static void
  231. helperHandleRead(int fd, void *data)
  232. {
  233.     int len;
  234.     char *t = NULL;
  235.     helper_server *srv = data;
  236.     helper_request *r;
  237.     helper *hlp = srv->parent;
  238.     assert(fd == srv->rfd);
  239.     assert(cbdataValid(data));
  240.     Counter.syscalls.sock.reads++;
  241.     len = read(fd, srv->buf + srv->offset, srv->buf_sz - srv->offset);
  242.     fd_bytes(fd, len, FD_READ);
  243.     debug(29, 5) ("helperHandleRead: %d bytes from %s #%d.n",
  244. len, hlp->id_name, srv->index + 1);
  245.     if (len <= 0) {
  246. if (len < 0)
  247.     debug(50, 1) ("helperHandleRead: FD %d read: %sn", fd, xstrerror());
  248. comm_close(fd);
  249. return;
  250.     }
  251.     srv->offset += len;
  252.     srv->buf[srv->offset] = '';
  253.     r = srv->request;
  254.     if (r == NULL) {
  255. /* someone spoke without being spoken to */
  256. debug(29, 1) ("helperHandleRead: unexpected read from %s #%d, %d bytesn",
  257.     hlp->id_name, srv->index + 1, len);
  258. srv->offset = 0;
  259.     } else if ((t = strchr(srv->buf, 'n'))) {
  260. /* end of reply found */
  261. debug(29, 3) ("helperHandleRead: end of reply foundn");
  262. *t = '';
  263. if (cbdataValid(r->data))
  264.     r->callback(r->data, srv->buf);
  265. srv->flags.busy = 0;
  266. srv->offset = 0;
  267. helperRequestFree(r);
  268. srv->request = NULL;
  269. hlp->stats.replies++;
  270. hlp->stats.avg_svc_time =
  271.     intAverage(hlp->stats.avg_svc_time,
  272.     tvSubMsec(srv->dispatch_time, current_time),
  273.     hlp->stats.replies, REDIRECT_AV_FACTOR);
  274. if (srv->flags.shutdown)
  275.     comm_close(srv->wfd);
  276. else
  277.     helperKickQueue(hlp);
  278.     } else {
  279. commSetSelect(srv->rfd, COMM_SELECT_READ, helperHandleRead, srv, 0);
  280.     }
  281. }
  282. static void
  283. Enqueue(helper * hlp, helper_request * r)
  284. {
  285.     dlink_node *link = memAllocate(MEM_DLINK_NODE);
  286.     dlinkAddTail(r, link, &hlp->queue);
  287.     hlp->stats.queue_size++;
  288.     if (hlp->stats.queue_size < hlp->n_running)
  289. return;
  290.     if (squid_curtime - hlp->last_queue_warn < 600)
  291. return;
  292.     if (shutting_down || reconfiguring)
  293. return;
  294.     hlp->last_queue_warn = squid_curtime;
  295.     debug(14, 0) ("WARNING: All %s processes are busy.n", hlp->id_name);
  296.     debug(14, 0) ("WARNING: %d pending requests queuedn", hlp->stats.queue_size);
  297.     if (hlp->stats.queue_size > hlp->n_running * 2)
  298. fatalf("Too many queued %s requests", hlp->id_name);
  299.     debug(14, 1) ("Consider increasing the number of %s processes in your config file.n", hlp->id_name);
  300. }
  301. static helper_request *
  302. Dequeue(helper * hlp)
  303. {
  304.     dlink_node *link;
  305.     helper_request *r = NULL;
  306.     if ((link = hlp->queue.head)) {
  307. r = link->data;
  308. dlinkDelete(link, &hlp->queue);
  309. memFree(link, MEM_DLINK_NODE);
  310. hlp->stats.queue_size--;
  311.     }
  312.     return r;
  313. }
  314. static helper_server *
  315. GetFirstAvailable(helper * hlp)
  316. {
  317.     dlink_node *n;
  318.     helper_server *srv = NULL;
  319.     if (hlp->n_running == 0)
  320. return NULL;
  321.     for (n = hlp->servers.head; n != NULL; n = n->next) {
  322. srv = n->data;
  323. if (srv->flags.busy)
  324.     continue;
  325. if (!srv->flags.alive)
  326.     continue;
  327. return srv;
  328.     }
  329.     return NULL;
  330. }
  331. static void
  332. helperDispatch(helper_server * srv, helper_request * r)
  333. {
  334.     helper *hlp = srv->parent;
  335.     if (!cbdataValid(r->data)) {
  336. debug(29, 1) ("helperDispatch: invalid callback datan");
  337. helperRequestFree(r);
  338. return;
  339.     }
  340.     assert(!srv->flags.busy);
  341.     srv->flags.busy = 1;
  342.     srv->request = r;
  343.     srv->dispatch_time = current_time;
  344.     comm_write(srv->wfd,
  345. r->buf,
  346. strlen(r->buf),
  347. NULL, /* Handler */
  348. NULL, /* Handler-data */
  349. NULL); /* free */
  350.     commSetSelect(srv->rfd,
  351. COMM_SELECT_READ,
  352. helperHandleRead,
  353. srv, 0);
  354.     debug(29, 5) ("helperDispatch: Request sent to %s #%d, %d bytesn",
  355. hlp->id_name, srv->index + 1, strlen(r->buf));
  356.     srv->stats.uses++;
  357.     hlp->stats.requests++;
  358. }
  359. static void
  360. helperKickQueue(helper * hlp)
  361. {
  362.     helper_request *r;
  363.     helper_server *srv;
  364.     while ((srv = GetFirstAvailable(hlp)) && (r = Dequeue(hlp)))
  365. helperDispatch(srv, r);
  366. }
  367. static void
  368. helperRequestFree(helper_request * r)
  369. {
  370.     cbdataUnlock(r->data);
  371.     xfree(r->buf);
  372.     memFree(r, MEM_HELPER_REQUEST);
  373. }