pqcomm.c
上传用户:blenddy
上传日期:2007-01-07
资源大小:6495k
文件大小:17k
源码类别:

数据库系统

开发平台:

Unix_Linux

  1. /*-------------------------------------------------------------------------
  2.  *
  3.  * pqcomm.c
  4.  *   Communication functions between the Frontend and the Backend
  5.  *
  6.  * These routines handle the low-level details of communication between
  7.  * frontend and backend.  They just shove data across the communication
  8.  * channel, and are ignorant of the semantics of the data --- or would be,
  9.  * except for major brain damage in the design of the COPY OUT protocol.
  10.  * Unfortunately, COPY OUT is designed to commandeer the communication
  11.  * channel (it just transfers data without wrapping it into messages).
  12.  * No other messages can be sent while COPY OUT is in progress; and if the
  13.  * copy is aborted by an elog(ERROR), we need to close out the copy so that
  14.  * the frontend gets back into sync.  Therefore, these routines have to be
  15.  * aware of COPY OUT state.
  16.  *
  17.  * NOTE: generally, it's a bad idea to emit outgoing messages directly with
  18.  * pq_putbytes(), especially if the message would require multiple calls
  19.  * to send.  Instead, use the routines in pqformat.c to construct the message
  20.  * in a buffer and then emit it in one call to pq_putmessage.  This helps
  21.  * ensure that the channel will not be clogged by an incomplete message
  22.  * if execution is aborted by elog(ERROR) partway through the message.
  23.  * The only non-libpq code that should call pq_putbytes directly is COPY OUT.
  24.  *
  25.  * At one time, libpq was shared between frontend and backend, but now
  26.  * the backend's "backend/libpq" is quite separate from "interfaces/libpq".
  27.  * All that remains is similarities of names to trap the unwary...
  28.  *
  29.  * Copyright (c) 1994, Regents of the University of California
  30.  *
  31.  * $Id: pqcomm.c,v 1.73.2.1 1999/09/08 23:00:51 tgl Exp $
  32.  *
  33.  *-------------------------------------------------------------------------
  34.  */
  35. /*------------------------
  36.  * INTERFACE ROUTINES
  37.  *
  38.  * setup/teardown:
  39.  * StreamServerPort - Open postmaster's server port
  40.  * StreamConnection - Create new connection with client
  41.  * StreamClose - Close a client/backend connection
  42.  * pq_getport - return the PGPORT setting
  43.  * pq_init - initialize libpq at backend startup
  44.  * pq_close - shutdown libpq at backend exit
  45.  *
  46.  * low-level I/O:
  47.  * pq_getbytes - get a known number of bytes from connection
  48.  * pq_getstring - get a null terminated string from connection
  49.  * pq_peekbyte - peek at next byte from connection
  50.  * pq_putbytes - send bytes to connection (not flushed until pq_flush)
  51.  * pq_flush - flush pending output
  52.  *
  53.  * message-level I/O (and COPY OUT cruft):
  54.  * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
  55.  * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  56.  * pq_endcopyout - end a COPY OUT transfer
  57.  *
  58.  *------------------------
  59.  */
  60. #include "postgres.h"
  61. #include <stdio.h>
  62. #if defined(HAVE_STRING_H)
  63. #include <string.h>
  64. #else
  65. #include <strings.h>
  66. #endif
  67. #include <signal.h>
  68. #include <errno.h>
  69. #include <fcntl.h>
  70. #include <unistd.h> /* for ttyname() */
  71. #include <sys/types.h>
  72. #include <sys/stat.h>
  73. #include <sys/socket.h>
  74. #include <netdb.h>
  75. #include <netinet/in.h>
  76. #include <netinet/tcp.h>
  77. #include <arpa/inet.h>
  78. #include <sys/file.h>
  79. #include "libpq/libpq.h" /* where my declarations go */
  80. #include "miscadmin.h"
  81. #include "libpq/pqsignal.h"
  82. #include "libpq/auth.h"
  83. #include "storage/ipc.h"
  84. #include "utils/trace.h"
  85. #ifndef SOMAXCONN
  86. #define SOMAXCONN 5 /* from Linux listen(2) man page */
  87. #endif  /* SOMAXCONN */
  88. extern FILE *debug_port; /* in util.c */
  89. /*
  90.  * Buffers for low-level I/O
  91.  */
  92. #define PQ_BUFFER_SIZE 8192
  93. static unsigned char PqSendBuffer[PQ_BUFFER_SIZE];
  94. static int PqSendPointer; /* Next index to store a byte in
  95.  * PqSendBuffer */
  96. static unsigned char PqRecvBuffer[PQ_BUFFER_SIZE];
  97. static int PqRecvPointer; /* Next index to read a byte from
  98.  * PqRecvBuffer */
  99. static int PqRecvLength; /* End of data available in PqRecvBuffer */
  100. /*
  101.  * Message status
  102.  */
  103. static bool DoingCopyOut;
  104. /* --------------------------------
  105.  * pq_init - initialize libpq at backend startup
  106.  * --------------------------------
  107.  */
  108. void
  109. pq_init(void)
  110. {
  111. PqSendPointer = PqRecvPointer = PqRecvLength = 0;
  112. DoingCopyOut = false;
  113. if (getenv("LIBPQ_DEBUG"))
  114. debug_port = stderr;
  115. }
  116. /* --------------------------------
  117.  * pq_getport - return the PGPORT setting
  118.  * --------------------------------
  119.  */
  120. int
  121. pq_getport(void)
  122. {
  123. char    *envport = getenv("PGPORT");
  124. if (envport)
  125. return atoi(envport);
  126. return atoi(DEF_PGPORT);
  127. }
  128. /* --------------------------------
  129.  * pq_close - shutdown libpq at backend exit
  130.  * --------------------------------
  131.  */
  132. void
  133. pq_close(void)
  134. {
  135. close(MyProcPort->sock);
  136. }
  137. /*
  138.  * Streams -- wrapper around Unix socket system calls
  139.  *
  140.  *
  141.  * Stream functions are used for vanilla TCP connection protocol.
  142.  */
  143. static char sock_path[MAXPGPATH + 1] = "";
  144. /* StreamDoUnlink()
  145.  * Shutdown routine for backend connection
  146.  * If a Unix socket is used for communication, explicitly close it.
  147.  */
  148. static void
  149. StreamDoUnlink()
  150. {
  151. Assert(sock_path[0]);
  152. unlink(sock_path);
  153. }
  154. /*
  155.  * StreamServerPort -- open a sock stream "listening" port.
  156.  *
  157.  * This initializes the Postmaster's connection-accepting port.
  158.  *
  159.  * RETURNS: STATUS_OK or STATUS_ERROR
  160.  */
  161. int
  162. StreamServerPort(char *hostName, unsigned short portName, int *fdP)
  163. {
  164. SockAddr saddr;
  165. int fd,
  166. err,
  167. family;
  168. size_t len;
  169. int one = 1;
  170. #ifdef HAVE_FCNTL_SETLK
  171. int lock_fd;
  172. #endif
  173. family = ((hostName != NULL) ? AF_INET : AF_UNIX);
  174. if ((fd = socket(family, SOCK_STREAM, 0)) < 0)
  175. {
  176. snprintf(PQerrormsg, ERROR_MSG_LENGTH,
  177.  "FATAL: StreamServerPort: socket() failed: %sn",
  178.  strerror(errno));
  179. fputs(PQerrormsg, stderr);
  180. pqdebug("%s", PQerrormsg);
  181. return STATUS_ERROR;
  182. }
  183. #ifdef ONLY_REUSE_INET_SOCKETS
  184. if (family == AF_INET)
  185. {
  186. #endif
  187. if ((setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *) &one,
  188. sizeof(one))) == -1)
  189. {
  190. snprintf(PQerrormsg, ERROR_MSG_LENGTH,
  191.  "FATAL: StreamServerPort: setsockopt(SO_REUSEADDR) failed: %sn",
  192.  strerror(errno));
  193. fputs(PQerrormsg, stderr);
  194. pqdebug("%s", PQerrormsg);
  195. return STATUS_ERROR;
  196. }
  197. #ifdef ONLY_REUSE_INET_SOCKETS
  198. }
  199. #endif
  200. MemSet((char *) &saddr, 0, sizeof(saddr));
  201. saddr.sa.sa_family = family;
  202. if (family == AF_UNIX)
  203. {
  204. len = UNIXSOCK_PATH(saddr.un, portName);
  205. strcpy(sock_path, saddr.un.sun_path);
  206. /*
  207.  * If the socket exists but nobody has an advisory lock on it we
  208.  * can safely delete the file.
  209.  */
  210. #ifdef HAVE_FCNTL_SETLK
  211. #ifndef __CYGWIN32__
  212. if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0)
  213. #else
  214. if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0)
  215. #endif
  216. {
  217. struct flock lck;
  218. lck.l_whence = SEEK_SET;
  219. lck.l_start = lck.l_len = 0;
  220. lck.l_type = F_WRLCK;
  221. if (fcntl(lock_fd, F_SETLK, &lck) == 0)
  222. {
  223. TPRINTF(TRACE_VERBOSE, "flock on %s, deleting", sock_path);
  224. unlink(sock_path);
  225. }
  226. else
  227. TPRINTF(TRACE_VERBOSE, "flock failed for %s", sock_path);
  228. close(lock_fd);
  229. }
  230. #endif  /* HAVE_FCNTL_SETLK */
  231. }
  232. else
  233. {
  234. saddr.in.sin_addr.s_addr = htonl(INADDR_ANY);
  235. saddr.in.sin_port = htons(portName);
  236. len = sizeof(struct sockaddr_in);
  237. }
  238. err = bind(fd, &saddr.sa, len);
  239. if (err < 0)
  240. {
  241. snprintf(PQerrormsg, ERROR_MSG_LENGTH,
  242.  "FATAL: StreamServerPort: bind() failed: %sn",
  243.  strerror(errno));
  244. strcat(PQerrormsg,
  245.    "tIs another postmaster already running on that port?n");
  246. if (family == AF_UNIX)
  247. {
  248. snprintf(PQerrormsg + strlen(PQerrormsg),
  249.  ERROR_MSG_LENGTH - strlen(PQerrormsg),
  250.  "tIf not, remove socket node (%s) and retry.n",
  251.  sock_path);
  252. }
  253. else
  254. strcat(PQerrormsg, "tIf not, wait a few seconds and retry.n");
  255. fputs(PQerrormsg, stderr);
  256. pqdebug("%s", PQerrormsg);
  257. return STATUS_ERROR;
  258. }
  259. if (family == AF_UNIX)
  260. {
  261. on_proc_exit(StreamDoUnlink, NULL);
  262. /*
  263.  * Open the socket file and get an advisory lock on it. The
  264.  * lock_fd is left open to keep the lock.
  265.  */
  266. #ifdef HAVE_FCNTL_SETLK
  267. #ifndef __CYGWIN32__
  268. if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK, 0666)) >= 0)
  269. #else
  270. if ((lock_fd = open(sock_path, O_WRONLY | O_NONBLOCK | O_BINARY, 0666)) >= 0)
  271. #endif
  272. {
  273. struct flock lck;
  274. lck.l_whence = SEEK_SET;
  275. lck.l_start = lck.l_len = 0;
  276. lck.l_type = F_WRLCK;
  277. if (fcntl(lock_fd, F_SETLK, &lck) != 0)
  278. TPRINTF(TRACE_VERBOSE, "flock error for %s", sock_path);
  279. }
  280. #endif  /* HAVE_FCNTL_SETLK */
  281. }
  282. listen(fd, SOMAXCONN);
  283. /*
  284.  * MS: I took this code from Dillon's version.  It makes the listening
  285.  * port non-blocking.  That is not necessary (and may tickle kernel
  286.  * bugs).
  287.  *
  288.  * fcntl(fd, F_SETFD, 1); fcntl(fd, F_SETFL, FNDELAY);
  289.  */
  290. *fdP = fd;
  291. if (family == AF_UNIX)
  292. chmod(sock_path, 0777);
  293. return STATUS_OK;
  294. }
  295. /*
  296.  * StreamConnection -- create a new connection with client using
  297.  * server port.
  298.  *
  299.  * ASSUME: that this doesn't need to be non-blocking because
  300.  * the Postmaster uses select() to tell when the server master
  301.  * socket is ready for accept().
  302.  *
  303.  * NB: this can NOT call elog() because it is invoked in the postmaster,
  304.  * not in standard backend context.  If we get an error, the best we can do
  305.  * is log it to stderr.
  306.  *
  307.  * RETURNS: STATUS_OK or STATUS_ERROR
  308.  */
  309. int
  310. StreamConnection(int server_fd, Port *port)
  311. {
  312. SOCKET_SIZE_TYPE addrlen;
  313. /* accept connection (and fill in the client (remote) address) */
  314. addrlen = sizeof(port->raddr);
  315. if ((port->sock = accept(server_fd,
  316.  (struct sockaddr *) & port->raddr,
  317.  &addrlen)) < 0)
  318. {
  319. perror("postmaster: StreamConnection: accept");
  320. return STATUS_ERROR;
  321. }
  322. /* fill in the server (local) address */
  323. addrlen = sizeof(port->laddr);
  324. if (getsockname(port->sock, (struct sockaddr *) & port->laddr,
  325. &addrlen) < 0)
  326. {
  327. perror("postmaster: StreamConnection: getsockname");
  328. return STATUS_ERROR;
  329. }
  330. /* select TCP_NODELAY option if it's a TCP connection */
  331. if (port->laddr.sa.sa_family == AF_INET)
  332. {
  333. struct protoent *pe;
  334. int on = 1;
  335. pe = getprotobyname("TCP");
  336. if (pe == NULL)
  337. {
  338. perror("postmaster: StreamConnection: getprotobyname");
  339. return STATUS_ERROR;
  340. }
  341. if (setsockopt(port->sock, pe->p_proto, TCP_NODELAY,
  342.    &on, sizeof(on)) < 0)
  343. {
  344. perror("postmaster: StreamConnection: setsockopt");
  345. return STATUS_ERROR;
  346. }
  347. }
  348. /* reset to non-blocking */
  349. fcntl(port->sock, F_SETFL, 1);
  350. return STATUS_OK;
  351. }
  352. /*
  353.  * StreamClose -- close a client/backend connection
  354.  */
  355. void
  356. StreamClose(int sock)
  357. {
  358. close(sock);
  359. }
  360. /* --------------------------------
  361.  * Low-level I/O routines begin here.
  362.  *
  363.  * These routines communicate with a frontend client across a connection
  364.  * already established by the preceding routines.
  365.  * --------------------------------
  366.  */
  367. /* --------------------------------
  368.  * pq_recvbuf - load some bytes into the input buffer
  369.  *
  370.  * returns 0 if OK, EOF if trouble
  371.  * --------------------------------
  372.  */
  373. static int
  374. pq_recvbuf(void)
  375. {
  376. if (PqRecvPointer > 0)
  377. {
  378. if (PqRecvLength > PqRecvPointer)
  379. {
  380. /* still some unread data, left-justify it in the buffer */
  381. memmove(PqRecvBuffer, PqRecvBuffer + PqRecvPointer,
  382. PqRecvLength - PqRecvPointer);
  383. PqRecvLength -= PqRecvPointer;
  384. PqRecvPointer = 0;
  385. }
  386. else
  387. PqRecvLength = PqRecvPointer = 0;
  388. }
  389. /* Can fill buffer from PqRecvLength and upwards */
  390. for (;;)
  391. {
  392. int r = recv(MyProcPort->sock, PqRecvBuffer + PqRecvLength,
  393.  PQ_BUFFER_SIZE - PqRecvLength, 0);
  394. if (r < 0)
  395. {
  396. if (errno == EINTR)
  397. continue; /* Ok if interrupted */
  398. /*
  399.  * We would like to use elog() here, but dare not because elog
  400.  * tries to write to the client, which will cause problems if
  401.  * we have a hard communications failure ... So just write the
  402.  * message to the postmaster log.
  403.  */
  404. fprintf(stderr, "pq_recvbuf: recv() failed: %sn",
  405. strerror(errno));
  406. return EOF;
  407. }
  408. if (r == 0)
  409. {
  410. /* as above, elog not safe */
  411. fprintf(stderr, "pq_recvbuf: unexpected EOF on client connectionn");
  412. return EOF;
  413. }
  414. /* r contains number of bytes read, so just incr length */
  415. PqRecvLength += r;
  416. return 0;
  417. }
  418. }
  419. /* --------------------------------
  420.  * pq_getbyte - get a single byte from connection, or return EOF
  421.  * --------------------------------
  422.  */
  423. static int
  424. pq_getbyte(void)
  425. {
  426. while (PqRecvPointer >= PqRecvLength)
  427. {
  428. if (pq_recvbuf()) /* If nothing in buffer, then recv some */
  429. return EOF; /* Failed to recv data */
  430. }
  431. return PqRecvBuffer[PqRecvPointer++];
  432. }
  433. /* --------------------------------
  434.  * pq_peekbyte - peek at next byte from connection
  435.  *
  436.  *  Same as pq_getbyte() except we don't advance the pointer.
  437.  * --------------------------------
  438.  */
  439. int
  440. pq_peekbyte(void)
  441. {
  442. while (PqRecvPointer >= PqRecvLength)
  443. {
  444. if (pq_recvbuf()) /* If nothing in buffer, then recv some */
  445. return EOF; /* Failed to recv data */
  446. }
  447. return PqRecvBuffer[PqRecvPointer];
  448. }
  449. /* --------------------------------
  450.  * pq_getbytes - get a known number of bytes from connection
  451.  *
  452.  * returns 0 if OK, EOF if trouble
  453.  * --------------------------------
  454.  */
  455. int
  456. pq_getbytes(char *s, size_t len)
  457. {
  458. size_t amount;
  459. while (len > 0)
  460. {
  461. while (PqRecvPointer >= PqRecvLength)
  462. {
  463. if (pq_recvbuf()) /* If nothing in buffer, then recv some */
  464. return EOF; /* Failed to recv data */
  465. }
  466. amount = PqRecvLength - PqRecvPointer;
  467. if (amount > len)
  468. amount = len;
  469. memcpy(s, PqRecvBuffer + PqRecvPointer, amount);
  470. PqRecvPointer += amount;
  471. s += amount;
  472. len -= amount;
  473. }
  474. return 0;
  475. }
  476. /* --------------------------------
  477.  * pq_getstring - get a null terminated string from connection
  478.  *
  479.  * NOTE: this routine does not do any MULTIBYTE conversion,
  480.  * even though it is presumably useful only for text, because
  481.  * no code in this module should depend on MULTIBYTE mode.
  482.  * See pq_getstr in pqformat.c for that.
  483.  *
  484.  * FIXME: we ought to use an expansible StringInfo buffer,
  485.  * rather than dropping data if the message is too long.
  486.  *
  487.  * returns 0 if OK, EOF if trouble
  488.  * --------------------------------
  489.  */
  490. int
  491. pq_getstring(char *s, size_t len)
  492. {
  493. int c;
  494. /*
  495.  * Keep on reading until we get the terminating '', discarding any
  496.  * bytes we don't have room for.
  497.  */
  498. while ((c = pq_getbyte()) != EOF && c != '')
  499. {
  500. if (len > 1)
  501. {
  502. *s++ = c;
  503. len--;
  504. }
  505. }
  506. *s = '';
  507. if (c == EOF)
  508. return EOF;
  509. return 0;
  510. }
  511. /* --------------------------------
  512.  * pq_putbytes - send bytes to connection (not flushed until pq_flush)
  513.  *
  514.  * returns 0 if OK, EOF if trouble
  515.  * --------------------------------
  516.  */
  517. int
  518. pq_putbytes(const char *s, size_t len)
  519. {
  520. size_t amount;
  521. while (len > 0)
  522. {
  523. if (PqSendPointer >= PQ_BUFFER_SIZE)
  524. if (pq_flush()) /* If buffer is full, then flush it out */
  525. return EOF;
  526. amount = PQ_BUFFER_SIZE - PqSendPointer;
  527. if (amount > len)
  528. amount = len;
  529. memcpy(PqSendBuffer + PqSendPointer, s, amount);
  530. PqSendPointer += amount;
  531. s += amount;
  532. len -= amount;
  533. }
  534. return 0;
  535. }
  536. /* --------------------------------
  537.  * pq_flush - flush pending output
  538.  *
  539.  * returns 0 if OK, EOF if trouble
  540.  * --------------------------------
  541.  */
  542. int
  543. pq_flush(void)
  544. {
  545. unsigned char *bufptr = PqSendBuffer;
  546. unsigned char *bufend = PqSendBuffer + PqSendPointer;
  547. while (bufptr < bufend)
  548. {
  549. int r = send(MyProcPort->sock, bufptr, bufend - bufptr, 0);
  550. if (r <= 0)
  551. {
  552. if (errno == EINTR)
  553. continue; /* Ok if we were interrupted */
  554. /*
  555.  * We would like to use elog() here, but cannot because elog
  556.  * tries to write to the client, which would cause a recursive
  557.  * flush attempt!  So just write it out to the postmaster log.
  558.  */
  559. fprintf(stderr, "pq_flush: send() failed: %sn",
  560. strerror(errno));
  561. /*
  562.  * We drop the buffered data anyway so that processing can
  563.  * continue, even though we'll probably quit soon.
  564.  */
  565. PqSendPointer = 0;
  566. return EOF;
  567. }
  568. bufptr += r;
  569. }
  570. PqSendPointer = 0;
  571. return 0;
  572. }
  573. /* --------------------------------
  574.  * Message-level I/O routines begin here.
  575.  *
  576.  * These routines understand about COPY OUT protocol.
  577.  * --------------------------------
  578.  */
  579. /* --------------------------------
  580.  * pq_putmessage - send a normal message (suppressed in COPY OUT mode)
  581.  *
  582.  * If msgtype is not '', it is a message type code to place before
  583.  * the message body (len counts only the body size!).
  584.  * If msgtype is '', then the buffer already includes the type code.
  585.  *
  586.  * All normal messages are suppressed while COPY OUT is in progress.
  587.  * (In practice only NOTICE messages might get emitted then; dropping
  588.  * them is annoying, but at least they will still appear in the
  589.  * postmaster log.)
  590.  *
  591.  * returns 0 if OK, EOF if trouble
  592.  * --------------------------------
  593.  */
  594. int
  595. pq_putmessage(char msgtype, const char *s, size_t len)
  596. {
  597. if (DoingCopyOut)
  598. return 0;
  599. if (msgtype)
  600. if (pq_putbytes(&msgtype, 1))
  601. return EOF;
  602. return pq_putbytes(s, len);
  603. }
  604. /* --------------------------------
  605.  * pq_startcopyout - inform libpq that a COPY OUT transfer is beginning
  606.  * --------------------------------
  607.  */
  608. void
  609. pq_startcopyout(void)
  610. {
  611. DoingCopyOut = true;
  612. }
  613. /* --------------------------------
  614.  * pq_endcopyout - end a COPY OUT transfer
  615.  *
  616.  * If errorAbort is indicated, we are aborting a COPY OUT due to an error,
  617.  * and must send a terminator line.  Since a partial data line might have
  618.  * been emitted, send a couple of newlines first (the first one could
  619.  * get absorbed by a backslash...)
  620.  * --------------------------------
  621.  */
  622. void
  623. pq_endcopyout(bool errorAbort)
  624. {
  625. if (!DoingCopyOut)
  626. return;
  627. if (errorAbort)
  628. pq_putbytes("nn\.n", 5);
  629. /* in non-error case, copy.c will have emitted the terminator line */
  630. DoingCopyOut = false;
  631. }