icp_v2.c
上传用户:liugui
上传日期:2007-01-04
资源大小:822k
文件大小:16k
源码类别:

代理服务器

开发平台:

Unix_Linux

  1. /*
  2.  * $Id: icp_v2.c,v 1.58 1999/01/29 21:28:13 wessels Exp $
  3.  *
  4.  * DEBUG: section 12    Internet Cache Protocol
  5.  * AUTHOR: Duane Wessels
  6.  *
  7.  * SQUID Internet Object Cache  http://squid.nlanr.net/Squid/
  8.  * ----------------------------------------------------------
  9.  *
  10.  *  Squid is the result of efforts by numerous individuals from the
  11.  *  Internet community.  Development is led by Duane Wessels of the
  12.  *  National Laboratory for Applied Network Research and funded by the
  13.  *  National Science Foundation.  Squid is Copyrighted (C) 1998 by
  14.  *  Duane Wessels and the University of California San Diego.  Please
  15.  *  see the COPYRIGHT file for full details.  Squid incorporates
  16.  *  software developed and/or copyrighted by other sources.  Please see
  17.  *  the CREDITS file for full details.
  18.  *
  19.  *  This program is free software; you can redistribute it and/or modify
  20.  *  it under the terms of the GNU General Public License as published by
  21.  *  the Free Software Foundation; either version 2 of the License, or
  22.  *  (at your option) any later version.
  23.  *  
  24.  *  This program is distributed in the hope that it will be useful,
  25.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  26.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  27.  *  GNU General Public License for more details.
  28.  *  
  29.  *  You should have received a copy of the GNU General Public License
  30.  *  along with this program; if not, write to the Free Software
  31.  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA.
  32.  *
  33.  */
  34. #include "squid.h"
  35. static void icpLogIcp(struct in_addr, log_type, int, const char *, int);
  36. static void icpHandleIcpV2(int, struct sockaddr_in, char *, int);
  37. static void icpCount(void *, int, size_t, int);
  38. /*
  39.  * IcpQueueHead is global so comm_incoming() knows whether or not
  40.  * to call icpUdpSendQueue.
  41.  */
  42. static icpUdpData *IcpQueueTail = NULL;
  43. static void
  44. icpLogIcp(struct in_addr caddr, log_type logcode, int len, const char *url, int delay)
  45. {
  46.     AccessLogEntry al;
  47.     if (LOG_TAG_NONE == logcode)
  48. return;
  49.     if (LOG_ICP_QUERY == logcode)
  50. return;
  51.     clientdbUpdate(caddr, logcode, PROTO_ICP, len);
  52.     if (!Config.onoff.log_udp)
  53. return;
  54.     memset(&al, '', sizeof(al));
  55.     al.icp.opcode = ICP_QUERY;
  56.     al.url = url;
  57.     al.cache.caddr = caddr;
  58.     al.cache.size = len;
  59.     al.cache.code = logcode;
  60.     al.cache.msec = delay;
  61.     accessLogLog(&al);
  62. }
  63. void
  64. icpUdpSendQueue(int fd, void *unused)
  65. {
  66.     icpUdpData *q;
  67.     int x;
  68.     int delay;
  69.     while ((q = IcpQueueHead) != NULL) {
  70. delay = tvSubUsec(q->queue_time, current_time);
  71. /* increment delay to prevent looping */
  72. x = icpUdpSend(fd, &q->address, q->msg, q->logcode, ++delay);
  73. IcpQueueHead = q->next;
  74. safe_free(q);
  75. if (x < 0)
  76.     break;
  77.     }
  78. }
  79. void *
  80. icpCreateMessage(
  81.     icp_opcode opcode,
  82.     int flags,
  83.     const char *url,
  84.     int reqnum,
  85.     int pad)
  86. {
  87.     char *buf = NULL;
  88.     icp_common_t *headerp = NULL;
  89.     char *urloffset = NULL;
  90.     int buf_len;
  91.     buf_len = sizeof(icp_common_t) + strlen(url) + 1;
  92.     if (opcode == ICP_QUERY)
  93. buf_len += sizeof(u_num32);
  94.     buf = xcalloc(buf_len, 1);
  95.     headerp = (icp_common_t *) (void *) buf;
  96.     headerp->opcode = (char) opcode;
  97.     headerp->version = ICP_VERSION_CURRENT;
  98.     headerp->length = (u_short) htons(buf_len);
  99.     headerp->reqnum = htonl(reqnum);
  100.     headerp->flags = htonl(flags);
  101.     headerp->pad = htonl(pad);
  102.     headerp->shostid = theOutICPAddr.s_addr;
  103.     urloffset = buf + sizeof(icp_common_t);
  104.     if (opcode == ICP_QUERY)
  105. urloffset += sizeof(u_num32);
  106.     xmemcpy(urloffset, url, strlen(url));
  107.     return buf;
  108. }
  109. int
  110. icpUdpSend(int fd,
  111.     const struct sockaddr_in *to,
  112.     icp_common_t * msg,
  113.     log_type logcode,
  114.     int delay)
  115. {
  116.     icpUdpData *queue;
  117.     int x;
  118.     int len;
  119.     len = (int) ntohs(msg->length);
  120.     debug(12, 5) ("icpUdpSend: FD %d sending %s, %d bytes to %s:%dn",
  121. fd,
  122. icp_opcode_str[msg->opcode],
  123. len,
  124. inet_ntoa(to->sin_addr),
  125. ntohs(to->sin_port));
  126.     x = comm_udp_sendto(fd, to, sizeof(*to), msg, len);
  127.     if (x >= 0) {
  128. /* successfully written */
  129. icpLogIcp(to->sin_addr, logcode, len, (char *) (msg + 1), delay);
  130. icpCount(msg, SENT, (size_t) len, delay);
  131. safe_free(msg);
  132.     } else if (0 == delay) {
  133. /* send failed, but queue it */
  134. queue = xcalloc(1, sizeof(icpUdpData));
  135. queue->address = *to;
  136. queue->msg = msg;
  137. queue->len = (int) ntohs(msg->length);
  138. queue->queue_time = current_time;
  139. queue->logcode = logcode;
  140. if (IcpQueueHead == NULL) {
  141.     IcpQueueHead = queue;
  142.     IcpQueueTail = queue;
  143. } else if (IcpQueueTail == IcpQueueHead) {
  144.     IcpQueueTail = queue;
  145.     IcpQueueHead->next = queue;
  146. } else {
  147.     IcpQueueTail->next = queue;
  148.     IcpQueueTail = queue;
  149. }
  150. commSetSelect(fd, COMM_SELECT_WRITE, icpUdpSendQueue, NULL, 0);
  151. Counter.icp.replies_queued++;
  152.     } else {
  153. /* don't queue it */
  154. Counter.icp.replies_dropped++;
  155.     }
  156.     return x;
  157. }
  158. int
  159. icpCheckUdpHit(StoreEntry * e, request_t * request)
  160. {
  161.     if (e == NULL)
  162. return 0;
  163.     if (!storeEntryValidToSend(e))
  164. return 0;
  165.     if (Config.onoff.icp_hit_stale)
  166. return 1;
  167.     if (refreshCheckICP(e, request))
  168. return 0;
  169.     return 1;
  170. }
  171. static void
  172. icpHandleIcpV2(int fd, struct sockaddr_in from, char *buf, int len)
  173. {
  174.     icp_common_t header;
  175.     StoreEntry *entry = NULL;
  176.     char *url = NULL;
  177.     const cache_key *key;
  178.     request_t *icp_request = NULL;
  179.     int allow = 0;
  180.     aclCheck_t checklist;
  181.     icp_common_t *reply;
  182.     int src_rtt = 0;
  183.     u_num32 flags = 0;
  184.     int rtt = 0;
  185.     int hops = 0;
  186.     xmemcpy(&header, buf, sizeof(icp_common_t));
  187.     /*
  188.      * Only these fields need to be converted
  189.      */
  190.     header.length = ntohs(header.length);
  191.     header.reqnum = ntohl(header.reqnum);
  192.     header.flags = ntohl(header.flags);
  193.     header.pad = ntohl(header.pad);
  194.     switch (header.opcode) {
  195.     case ICP_QUERY:
  196. /* We have a valid packet */
  197. url = buf + sizeof(icp_common_t) + sizeof(u_num32);
  198. if (strpbrk(url, w_space)) {
  199.     url = rfc1738_escape(url);
  200.     reply = icpCreateMessage(ICP_ERR, 0, url, header.reqnum, 0);
  201.     icpUdpSend(fd, &from, reply, LOG_UDP_INVALID, 0);
  202.     break;
  203. }
  204. if ((icp_request = urlParse(METHOD_GET, url)) == NULL) {
  205.     reply = icpCreateMessage(ICP_ERR, 0, url, header.reqnum, 0);
  206.     icpUdpSend(fd, &from, reply, LOG_UDP_INVALID, 0);
  207.     break;
  208. }
  209. checklist.src_addr = from.sin_addr;
  210. checklist.my_addr = no_addr;
  211. checklist.request = icp_request;
  212. allow = aclCheckFast(Config.accessList.icp, &checklist);
  213. if (!allow) {
  214.     debug(12, 2) ("icpHandleIcpV2: Access Denied for %s by %s.n",
  215. inet_ntoa(from.sin_addr), AclMatchedName);
  216.     if (clientdbCutoffDenied(from.sin_addr)) {
  217. /*
  218.  * count this DENIED query in the clientdb, even though
  219.  * we're not sending an ICP reply...
  220.  */
  221. clientdbUpdate(from.sin_addr, LOG_UDP_DENIED, PROTO_ICP, 0);
  222.     } else {
  223. reply = icpCreateMessage(ICP_DENIED, 0, url, header.reqnum, 0);
  224. icpUdpSend(fd, &from, reply, LOG_UDP_DENIED, 0);
  225.     }
  226.     break;
  227. }
  228. if (header.flags & ICP_FLAG_SRC_RTT) {
  229.     rtt = netdbHostRtt(icp_request->host);
  230.     hops = netdbHostHops(icp_request->host);
  231.     src_rtt = ((hops & 0xFFFF) << 16) | (rtt & 0xFFFF);
  232.     if (rtt)
  233. flags |= ICP_FLAG_SRC_RTT;
  234. }
  235. /* The peer is allowed to use this cache */
  236. entry = storeGetPublic(url, METHOD_GET);
  237. debug(12, 5) ("icpHandleIcpV2: OPCODE %sn", icp_opcode_str[header.opcode]);
  238. if (icpCheckUdpHit(entry, icp_request)) {
  239.     reply = icpCreateMessage(ICP_HIT, flags, url, header.reqnum, src_rtt);
  240.     icpUdpSend(fd, &from, reply, LOG_UDP_HIT, 0);
  241.     break;
  242. }
  243. if (Config.onoff.test_reachability && rtt == 0) {
  244.     if ((rtt = netdbHostRtt(icp_request->host)) == 0)
  245. netdbPingSite(icp_request->host);
  246. }
  247. /* if store is rebuilding, return a UDP_HIT, but not a MISS */
  248. if (store_rebuilding && opt_reload_hit_only) {
  249.     reply = icpCreateMessage(ICP_MISS_NOFETCH, flags, url, header.reqnum, src_rtt);
  250.     icpUdpSend(fd, &from, reply, LOG_UDP_MISS_NOFETCH, 0);
  251. } else if (hit_only_mode_until > squid_curtime) {
  252.     reply = icpCreateMessage(ICP_MISS_NOFETCH, flags, url, header.reqnum, src_rtt);
  253.     icpUdpSend(fd, &from, reply, LOG_UDP_MISS_NOFETCH, 0);
  254. } else if (Config.onoff.test_reachability && rtt == 0) {
  255.     reply = icpCreateMessage(ICP_MISS_NOFETCH, flags, url, header.reqnum, src_rtt);
  256.     icpUdpSend(fd, &from, reply, LOG_UDP_MISS_NOFETCH, 0);
  257. } else {
  258.     reply = icpCreateMessage(ICP_MISS, flags, url, header.reqnum, src_rtt);
  259.     icpUdpSend(fd, &from, reply, LOG_UDP_MISS, 0);
  260. }
  261. break;
  262.     case ICP_HIT:
  263. #if ALLOW_SOURCE_PING
  264.     case ICP_SECHO:
  265. #endif
  266.     case ICP_DECHO:
  267.     case ICP_MISS:
  268.     case ICP_DENIED:
  269.     case ICP_MISS_NOFETCH:
  270. if (neighbors_do_private_keys && header.reqnum == 0) {
  271.     debug(12, 0) ("icpHandleIcpV2: Neighbor %s returned reqnum = 0n",
  272. inet_ntoa(from.sin_addr));
  273.     debug(12, 0) ("icpHandleIcpV2: Disabling use of private keysn");
  274.     neighbors_do_private_keys = 0;
  275. }
  276. url = buf + sizeof(icp_common_t);
  277. debug(12, 3) ("icpHandleIcpV2: %s from %s for '%s'n",
  278.     icp_opcode_str[header.opcode],
  279.     inet_ntoa(from.sin_addr),
  280.     url);
  281. key = icpGetCacheKey(url, (int) header.reqnum);
  282. /* call neighborsUdpAck even if ping_status != PING_WAITING */
  283. neighborsUdpAck(key, &header, &from);
  284. break;
  285.     case ICP_INVALID:
  286.     case ICP_ERR:
  287. break;
  288.     default:
  289. debug(12, 0) ("icpHandleIcpV2: UNKNOWN OPCODE: %d from %sn",
  290.     header.opcode, inet_ntoa(from.sin_addr));
  291. break;
  292.     }
  293.     if (icp_request)
  294. requestDestroy(icp_request);
  295. }
  296. #ifdef ICP_PKT_DUMP
  297. static void
  298. icpPktDump(icp_common_t * pkt)
  299. {
  300.     struct in_addr a;
  301.     debug(12, 9) ("opcode:     %3d %sn",
  302. (int) pkt->opcode,
  303. icp_opcode_str[pkt->opcode]);
  304.     debug(12, 9) ("version: %-8dn", (int) pkt->version);
  305.     debug(12, 9) ("length:  %-8dn", (int) ntohs(pkt->length));
  306.     debug(12, 9) ("reqnum:  %-8dn", ntohl(pkt->reqnum));
  307.     debug(12, 9) ("flags:   %-8xn", ntohl(pkt->flags));
  308.     a.s_addr = pkt->shostid;
  309.     debug(12, 9) ("shostid: %sn", inet_ntoa(a));
  310.     debug(12, 9) ("payload: %sn", (char *) pkt + sizeof(icp_common_t));
  311. }
  312. #endif
  313. void
  314. icpHandleUdp(int sock, void *data)
  315. {
  316.     int *N = data;
  317.     struct sockaddr_in from;
  318.     socklen_t from_len;
  319.     LOCAL_ARRAY(char, buf, SQUID_UDP_SO_RCVBUF);
  320.     int len;
  321.     int icp_version;
  322.     int max = INCOMING_ICP_MAX;
  323.     commSetSelect(sock, COMM_SELECT_READ, icpHandleUdp, NULL, 0);
  324.     while (max--) {
  325. from_len = sizeof(from);
  326. memset(&from, '', from_len);
  327. Counter.syscalls.sock.recvfroms++;
  328. len = recvfrom(sock,
  329.     buf,
  330.     SQUID_UDP_SO_RCVBUF - 1,
  331.     0,
  332.     (struct sockaddr *) &from,
  333.     &from_len);
  334. if (len == 0)
  335.     break;
  336. if (len < 0) {
  337.     if (ignoreErrno(errno))
  338. break;
  339. #ifdef _SQUID_LINUX_
  340.     /* Some Linux systems seem to set the FD for reading and then
  341.      * return ECONNREFUSED when sendto() fails and generates an ICMP
  342.      * port unreachable message. */
  343.     /* or maybe an EHOSTUNREACH "No route to host" message */
  344.     if (errno != ECONNREFUSED && errno != EHOSTUNREACH)
  345. #endif
  346. debug(50, 1) ("icpHandleUdp: FD %d recvfrom: %sn",
  347.     sock, xstrerror());
  348.     break;
  349. }
  350. (*N)++;
  351. icpCount(buf, RECV, (size_t) len, 0);
  352. buf[len] = '';
  353. debug(12, 4) ("icpHandleUdp: FD %d: received %d bytes from %s.n",
  354.     sock,
  355.     len,
  356.     inet_ntoa(from.sin_addr));
  357. #ifdef ICP_PACKET_DUMP
  358. icpPktDump(buf);
  359. #endif
  360. if (len < sizeof(icp_common_t)) {
  361.     debug(12, 4) ("icpHandleUdp: Ignoring too-small UDP packetn");
  362.     break;
  363. }
  364. icp_version = (int) buf[1]; /* cheat! */
  365. if (icp_version == ICP_VERSION_2)
  366.     icpHandleIcpV2(sock, from, buf, len);
  367. else if (icp_version == ICP_VERSION_3)
  368.     icpHandleIcpV3(sock, from, buf, len);
  369. else
  370.     debug(12, 1) ("WARNING: Unused ICP version %d received from %s:%dn",
  371. icp_version,
  372. inet_ntoa(from.sin_addr),
  373. ntohs(from.sin_port));
  374.     }
  375. }
  376. void
  377. icpConnectionsOpen(void)
  378. {
  379.     u_short port;
  380.     struct in_addr addr;
  381.     struct sockaddr_in xaddr;
  382.     int x;
  383.     socklen_t len;
  384.     wordlist *s;
  385.     if (Config2.Accel.on && !Config.onoff.accel_with_proxy)
  386. return;
  387.     if ((port = Config.Port.icp) <= 0)
  388. return;
  389.     enter_suid();
  390.     theInIcpConnection = comm_open(SOCK_DGRAM,
  391. 0,
  392. Config.Addrs.udp_incoming,
  393. port,
  394. COMM_NONBLOCKING,
  395. "ICP Socket");
  396.     leave_suid();
  397.     if (theInIcpConnection < 0)
  398. fatal("Cannot open ICP Port");
  399.     commSetSelect(theInIcpConnection,
  400. COMM_SELECT_READ,
  401. icpHandleUdp,
  402. NULL,
  403. 0);
  404.     for (s = Config.mcast_group_list; s; s = s->next)
  405. ipcache_nbgethostbyname(s->key, mcastJoinGroups, NULL);
  406.     debug(12, 1) ("Accepting ICP messages on port %d, FD %d.n",
  407. (int) port, theInIcpConnection);
  408.     if ((addr = Config.Addrs.udp_outgoing).s_addr != no_addr.s_addr) {
  409. enter_suid();
  410. theOutIcpConnection = comm_open(SOCK_DGRAM,
  411.     0,
  412.     addr,
  413.     port,
  414.     COMM_NONBLOCKING,
  415.     "ICP Port");
  416. leave_suid();
  417. if (theOutIcpConnection < 0)
  418.     fatal("Cannot open Outgoing ICP Port");
  419. commSetSelect(theOutIcpConnection,
  420.     COMM_SELECT_READ,
  421.     icpHandleUdp,
  422.     NULL,
  423.     0);
  424. debug(12, 1) ("Outgoing ICP messages on port %d, FD %d.n",
  425.     (int) port, theOutIcpConnection);
  426. fd_note(theOutIcpConnection, "Outgoing ICP socket");
  427. fd_note(theInIcpConnection, "Incoming ICP socket");
  428.     } else {
  429. theOutIcpConnection = theInIcpConnection;
  430.     }
  431.     memset(&theOutICPAddr, '', sizeof(struct in_addr));
  432.     len = sizeof(struct sockaddr_in);
  433.     memset(&xaddr, '', len);
  434.     x = getsockname(theOutIcpConnection,
  435. (struct sockaddr *) &xaddr, &len);
  436.     if (x < 0)
  437. debug(50, 1) ("theOutIcpConnection FD %d: getsockname: %sn",
  438.     theOutIcpConnection, xstrerror());
  439.     else
  440. theOutICPAddr = xaddr.sin_addr;
  441. }
  442. /*
  443.  * icpConnectionShutdown only closes the 'in' socket if it is 
  444.  * different than the 'out' socket.
  445.  */
  446. void
  447. icpConnectionShutdown(void)
  448. {
  449.     if (theInIcpConnection < 0)
  450. return;
  451.     if (theInIcpConnection != theOutIcpConnection) {
  452. debug(12, 1) ("FD %d Closing ICP connectionn", theInIcpConnection);
  453. comm_close(theInIcpConnection);
  454.     }
  455.     /*
  456.      * Here we set 'theInIcpConnection' to -1 even though the ICP 'in'
  457.      * and 'out' sockets might be just one FD.  This prevents this
  458.      * function from executing repeatedly.  When we are really ready to
  459.      * exit or restart, main will comm_close the 'out' descriptor.
  460.      */
  461.     theInIcpConnection = -1;
  462.     /*
  463.      * Normally we only write to the outgoing ICP socket, but
  464.      * we also have a read handler there to catch messages sent
  465.      * to that specific interface.  During shutdown, we must
  466.      * disable reading on the outgoing socket.
  467.      */
  468.     assert(theOutIcpConnection > -1);
  469.     commSetSelect(theOutIcpConnection, COMM_SELECT_READ, NULL, NULL, 0);
  470. }
  471. void
  472. icpConnectionClose(void)
  473. {
  474.     icpConnectionShutdown();
  475.     if (theOutIcpConnection > -1) {
  476. debug(12, 1) ("FD %d Closing ICP connectionn", theOutIcpConnection);
  477. comm_close(theOutIcpConnection);
  478. theOutIcpConnection = -1;
  479.     }
  480. }
  481. static void
  482. icpCount(void *buf, int which, size_t len, int delay)
  483. {
  484.     icp_common_t *icp = buf;
  485.     if (len < sizeof(*icp))
  486. return;
  487.     if (SENT == which) {
  488. Counter.icp.pkts_sent++;
  489. kb_incr(&Counter.icp.kbytes_sent, len);
  490. if (ICP_QUERY == icp->opcode) {
  491.     Counter.icp.queries_sent++;
  492.     kb_incr(&Counter.icp.q_kbytes_sent, len);
  493. } else {
  494.     Counter.icp.replies_sent++;
  495.     kb_incr(&Counter.icp.r_kbytes_sent, len);
  496.     /* this is the sent-reply service time */
  497.     statHistCount(&Counter.icp.reply_svc_time, delay);
  498. }
  499. if (ICP_HIT == icp->opcode)
  500.     Counter.icp.hits_sent++;
  501.     } else if (RECV == which) {
  502. Counter.icp.pkts_recv++;
  503. kb_incr(&Counter.icp.kbytes_recv, len);
  504. if (ICP_QUERY == icp->opcode) {
  505.     Counter.icp.queries_recv++;
  506.     kb_incr(&Counter.icp.q_kbytes_recv, len);
  507. } else {
  508.     Counter.icp.replies_recv++;
  509.     kb_incr(&Counter.icp.r_kbytes_recv, len);
  510.     /* Counter.icp.query_svc_time set in clientUpdateCounters */
  511. }
  512. if (ICP_HIT == icp->opcode)
  513.     Counter.icp.hits_recv++;
  514.     }
  515. }
  516. #define N_QUERIED_KEYS 8192
  517. #define N_QUERIED_KEYS_MASK 8191
  518. static cache_key queried_keys[N_QUERIED_KEYS][MD5_DIGEST_CHARS];
  519. int
  520. icpSetCacheKey(const cache_key * key)
  521. {
  522.     static int reqnum = 0;
  523.     if (++reqnum < 0)
  524. reqnum = 1;
  525.     storeKeyCopy(queried_keys[reqnum & N_QUERIED_KEYS_MASK], key);
  526.     return reqnum;
  527. }
  528. const cache_key *
  529. icpGetCacheKey(const char *url, int reqnum)
  530. {
  531.     if (neighbors_do_private_keys && reqnum)
  532. return queried_keys[reqnum & N_QUERIED_KEYS_MASK];
  533.     return storeKeyPublic(url, METHOD_GET);
  534. }