bufipc.c
上传用户:dgyhgb
上传日期:2007-01-07
资源大小:676k
文件大小:15k
源码类别:

SQL Server

开发平台:

Unix_Linux

  1. /*
  2.  *  bufipc.c  - Buffer communication functions 
  3.  *              Kernel of GNU SQL-server. Buffer
  4.  *
  5.  * This file is a part of GNU SQL Server
  6.  *
  7.  *  Copyright (c) 1996, 1997, Free Software Foundation, Inc
  8.  *  Developed at the Institute of System Programming
  9.  *  This file is written by  Vera Ponomarenko
  10.  *
  11.  *  This program is free software; you can redistribute it and/or modify
  12.  *  it under the terms of the GNU General Public License as published by
  13.  *  the Free Software Foundation; either version 2 of the License, or
  14.  *  (at your option) any later version.
  15.  *
  16.  *  This program is distributed in the hope that it will be useful,
  17.  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
  18.  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  19.  *  GNU General Public License for more details.
  20.  *
  21.  *  You should have received a copy of the GNU General Public License
  22.  *  along with this program; if not, write to the Free Software
  23.  *  Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  24.  *
  25.  *  Contacts:   gss@ispras.ru
  26.  *
  27.  */
  28. /* $Id: bufipc.c,v 1.246 1997/03/31 11:05:20 kml Exp $ */
  29. #include "setup_os.h"
  30. #ifdef HAVE_SYS_FILES_H
  31. #include <sys/file.h>
  32. #endif
  33. #ifdef HAVE_UNISTD_H
  34. #include <unistd.h>
  35. #endif
  36. #include <sys/times.h>
  37. #include "fdeclbuf.h"
  38. #include <sys/msg.h>
  39. #include <sys/shm.h>
  40. #ifdef HAVE_FCNTL_H
  41. #include <fcntl.h>
  42. #endif
  43. #ifdef STDC_HEADERS
  44. #include <string.h>
  45. #else
  46. #include <strings.h>
  47. #endif
  48. #include "rnmtp.h"
  49. #include "inpop.h"
  50. #include "pupsi.h"
  51. #include "pupans.h"
  52. #include "bufdefs.h"
  53. #include "strml.h"
  54. #include "totdecl.h"
  55. #ifndef CLK_TCK
  56. #define CLK_TCK 60
  57. #endif
  58. #define PRINT(x, y) /* printf(x, y); */
  59. #define size4b sizeof(i4_t)
  60. #define size2b sizeof(i2_t)
  61. static struct tms buffer;
  62. static i4_t tt, buftime;
  63. extern i4_t errno;
  64. i4_t msqidm, msqidbf;
  65. i4_t MAXNBUF, MAXTACT, MAXNOP, MAXSEGNUM;
  66. key_t FSEGNUM;
  67. i4_t N_buf, N_opt;
  68. char PRXR = 0;
  69. i4_t *fdseg;
  70. char **tabdseg;
  71. char **seg_file_name;
  72. i4_t N_seg;
  73. pid_t parent;
  74. i4_t max_buffers_number;
  75. u2_t I_fix = NO; /* indicator of fixation */
  76. i4_t N_log = 0; /* number of operations by log */
  77. i4_t N_op = 0; /* number of ended operations */
  78. #define ARG(num, what, type)   sscanf (argv[num], "%d", &argum); 
  79.                                what = (type)(argum)
  80. void
  81. main (i4_t argc, char **argv)
  82. {
  83.   i4_t i, k;
  84.   i4_t n;
  85.   key_t keymj, keybf;
  86.   i4_t argum;
  87.   
  88.   MEET_DEBUGGER;
  89.   setbuf (stdout, NULL);
  90.   ARG(2, keybf, key_t);
  91.   ARG(3, keymj, key_t);
  92.   ARG(4, N_opt, i4_t);
  93.   ARG(5, MAXNBUF, i4_t);
  94.   ARG(6, MAXTACT, i4_t);
  95.   ARG(7, MAXNOP, i4_t);
  96.   ARG(8, FSEGNUM, key_t);
  97.   ARG(argc - 1, parent, pid_t);
  98.   /* 
  99.    * reading DB segments information 
  100.    */
  101.   for (i = 9, n = 0; i < argc - 1; i += 2)
  102.     {
  103.       if (!sscanf (argv[i], "%d", &k))
  104. break;
  105.       if (n < k)
  106. n = k;
  107.     }
  108.   /* 'n' contains the maximum number of segment */
  109.   MAXSEGNUM = n + 1;
  110.   /* allocating space for segment information */
  111.   fdseg         = (i4_t *)  xmalloc (MAXSEGNUM * sizeof (i4_t));
  112.   seg_file_name = (char **) xmalloc (MAXSEGNUM * sizeof (char **));
  113.   for (i = 0; i < MAXSEGNUM ; i++)
  114.     {
  115.       fdseg[i] = -1;
  116.       seg_file_name[i] = NULL;
  117.     }
  118.   seg_file_name[0] = xmalloc (strlen (argv[1]));
  119.   strcpy (seg_file_name[0], argv[1]);  
  120.   for (i = 9; i < argc - 1; i+=2)
  121.     {
  122.       sscanf (argv[i], "%d", &k);
  123.       seg_file_name[k] = xmalloc (strlen (argv[i+1]));
  124.       strcpy (seg_file_name[k], argv[i+1]);
  125.     }
  126.   /*---------------------------------------------------------*/
  127.   if ((msqidbf = msgget (keybf, IPC_CREAT | DEFAULT_ACCESS_RIGHTS)) < 0)
  128.     {
  129.       perror ("BUF.msgget: Queue for BUF");
  130.       exit (1);
  131.     }
  132.   MSG_INIT (msqidm, keymj, "MJ");
  133.   init_hash ();
  134.   tabdseg = (char **) xmalloc (MAXNBUF * sizeof (char *));
  135.   for (i = 0; i < MAXNBUF; i++)
  136.     tabdseg[i] = NULL;
  137.   max_buffers_number = MAXNBUF;
  138.   N_buf = 0;
  139.   N_seg = 0;
  140.   
  141.   for (;;)
  142.     {
  143.       msg_rcv ();
  144.       tact ();
  145.     }
  146. }
  147. #define UNPACKMSG(p) { BUFUPACK(p,trnum);BUFUPACK(p,segn);BUFUPACK(p,pn);}
  148. void
  149. msg_rcv (void)
  150. {
  151.   struct msg_buf rbuf;
  152.   u2_t trnum, pn, segn;
  153.   char *p, *p1;
  154.   register i4_t op;
  155.   i4_t admj;
  156.   struct BUFF *buf;
  157.   rbuf.mtype = 0;
  158.       
  159.   PRINT ("BUF.msg_rcv: msqidbf = %dn", (i4_t)msqidbf)
  160.   
  161.   __MSGRCV(msqidbf, &rbuf, SZMSGBUF, -(ANSBUF - 1), 0,"BUF.msgrcv");
  162.       
  163.   PRINT ("BUF.msg_rcv: rbuf.mtype = %dn", rbuf.mtype)
  164.       
  165.   times (&buffer);
  166.   tt = buffer.tms_utime;
  167.   op = rbuf.mtype;
  168.   p = rbuf.mtext;
  169.   switch (op)
  170.     {
  171.     case INIFIXB:
  172.       inifixb (t4bunpack (p), t4bunpack (p + size4b));
  173.       break;
  174.     case GETPAGE:
  175.       UNPACKMSG(p);
  176.       buf = get (segn, pn, 'r');
  177.       buf_to_user (trnum, (key_t) buf->b_seg->keyseg);
  178.       break;
  179.     case PUTPAGE:
  180.       UNPACKMSG(p);
  181.       BUFUPACK(p,admj);
  182.       put (segn, pn, admj, *p);
  183.       user_p (trnum, 0);
  184.       break;
  185.     case LOCKPAGE:
  186.       UNPACKMSG(p);
  187.       if (segn >= (u2_t) MAXSEGNUM || seg_file_name[segn] == NULL)
  188. user_p (trnum, NO_SUCH_SEG);
  189.       else
  190. {
  191.           PRINTF (("BUF.msg_rcv: LOCKPAGE segn=%d,pn = %d,f_name=%sn",
  192.                   segn, pn, seg_file_name[segn]));
  193.   if (buflock (trnum, segn, pn, *p, 0) == 0)
  194.     user_p (trnum, 0);
  195. }
  196.       break;
  197.     case ENFORCE:
  198.       UNPACKMSG(p);
  199.       if (enforce (trnum, segn, pn) == 0)
  200. user_p (trnum, 0);
  201.       break;
  202.     case UNLKPG:
  203.       UNPACKMSG(p);
  204.       unlock (segn, pn, p);
  205.       user_p (trnum, 0);
  206.       break;
  207.     case NEWGET:
  208.       UNPACKMSG(p);
  209.       buf = get (segn, pn, 'n');
  210.       buf_to_user (trnum, (key_t) buf->b_seg->keyseg);
  211.       break;
  212.     case LOCKGET:
  213.       UNPACKMSG(p);
  214.       if (segn >= (u2_t) MAXSEGNUM || seg_file_name[segn] == NULL)
  215. buf_to_user (trnum, (key_t)NO_SUCH_SEG);
  216.       else if (buflock (trnum, segn, pn, *p, 1) == 0)
  217.         {
  218.           buf = get (segn, pn, 'r');
  219.           buf_to_user (trnum, (key_t) buf->b_seg->keyseg);
  220. }
  221.       break;
  222.     case PUTUNL:
  223.       p1 = p + 2 * size2b;
  224.       UNPACKMSG(p);
  225.       BUFUPACK(p,admj);
  226.       put (segn, pn, admj, *p);
  227.       unlock (segn, 1, p1);
  228.       user_p (trnum, 0);
  229.       break;
  230.     case ENDOP:
  231.       end_op ();
  232.       break;
  233.     case BEGTACT:
  234.       tact ();
  235.       break;
  236.     case OPTNUM:
  237.       optimal (t4bunpack (p));
  238.       MAXTACT = t4bunpack (p + size4b);
  239.       break;
  240.     case FINIT:
  241.       finit ();
  242.       break;
  243.     default:
  244.       printf ("BUF.msg_rcv: No such operation in interface BUFn");
  245.       break;
  246.     }
  247.   times (&buffer);
  248.   tt = buffer.tms_utime - tt;
  249.   buftime += tt;
  250. }
  251. void
  252. buf_to_user (u2_t trnum, key_t keys)
  253. {
  254.   struct msg_buf sbuf;
  255.   sbuf.mtype = trnum;
  256.   TPACK(keys, sbuf.mtext);
  257.   __MSGSND(msqidbf, &sbuf, sizeof (key_t), 0,"BUF.msgsnd: Answer TRN");
  258. }
  259. void
  260. user_p (u2_t trnum, i4_t type)
  261. {
  262.   struct msg_buf sbuf;
  263.   sbuf.mtype = trnum;
  264.   TPACK(type, sbuf.mtext);
  265.   __MSGSND(msqidbf, &sbuf, size4b, 0,"BUF.msgsnd: Answer TRN");
  266. }
  267. /*  Functions realizing calls of Microlog */
  268. i4_t cur_endmj = NULL_MICRO; /* current end of microlog */
  269. i4_t pushaddmj = NULL_MICRO; /* last push MJ address */
  270. void
  271. push_micro (i4_t addr) /* push microlog buffer */
  272. {
  273.   u2_t pn, pn_push, off, off_push;
  274.   char *a, *b;
  275.   a = (char *) &addr;
  276.   b = (char *) &pushaddmj;
  277.   pn = t2bunpack (a);
  278.   pn_push = t2bunpack (b);
  279.   off = t2bunpack (a + size2b);
  280.   off_push = t2bunpack (b + size2b);
  281.   if (pn > pn_push)
  282.     {
  283.       pushmicro (addr);
  284.     }
  285.   else if (pn == pn_push && off > off_push)
  286.     {
  287.       pushmicro (addr);
  288.     }
  289. }
  290. void
  291. pushmicro (i4_t addr)
  292. {
  293.   struct msg_buf sbuf;
  294.   sbuf.mtype = OUTDISK;
  295.   t4bpack (addr, sbuf.mtext);
  296.   __MSGSND(msqidm, &sbuf, size4b, 0,"BUF.msgsnd: OUTDISK");
  297.   __MSGRCV(msqidm, &sbuf, 0, ANSMJ, 0,"BUF.msgrcv");
  298.   pushaddmj = addr;
  299. }
  300. struct des_seg *
  301. new_seg (void) /* Create a segment */
  302. {
  303.   register i4_t i;
  304.   struct des_seg *desseg;
  305.   key_t key;
  306.   i4_t shmid;
  307.   for (i = 0; i < max_buffers_number; i++)
  308.     if ((desseg = (struct des_seg *) tabdseg[i]) == NULL)
  309.       {
  310.         desseg = (struct des_seg *) get_empty (sizeof (struct des_seg));
  311.         key = FSEGNUM + i;
  312.         while ((shmid = shmget (key, BD_PAGESIZE,
  313.                                 IPC_CREAT | DEFAULT_ACCESS_RIGHTS)) < 0)
  314.           {
  315.             if ( (N_buf + 1) < N_opt)
  316.               fprintf (stderr, "BUF.new_seg: number of buffers is "
  317.                        "less optimal number of buffersn");
  318.             waitfor_seg (N_buf - 1);
  319.             max_buffers_number = N_buf;
  320.             /*            
  321.             perror ("BUF.shmget");
  322.             exit (1);
  323.             */
  324.           }
  325.         desseg->keyseg = key;
  326.         desseg->idseg = shmid;
  327.         tabdseg[i] = (char *) desseg;
  328.         N_seg++;
  329.         return (desseg);        
  330.       }
  331.   return (NULL);  
  332. }
  333. void
  334. del_seg (struct des_seg *desseg)
  335. {
  336.   struct shmid_ds shmstr;
  337.   register i4_t i;  
  338.   shmctl (desseg->idseg, IPC_RMID, &shmstr);
  339.   for (i = 0; i < MAXNBUF; i++)
  340.     if (desseg == (struct des_seg *) tabdseg[i])
  341.       {
  342. tabdseg[i] = NULL;
  343.         break;
  344.       }
  345.   xfree (desseg);
  346.   N_seg--;
  347. }
  348. /*****************************************************************************
  349.                                 FIXATION
  350. */
  351. void
  352. inifixb (i4_t nop, i4_t ljadd)
  353. {
  354.   struct msg_buf sbuf;
  355.   N_log = nop;
  356.   if (N_op != N_log)
  357.     fix_mode ();
  358.   push_micro (cur_endmj);
  359.   do_fix ();
  360.   sbuf.mtype = DOFIX; /* Fixation */
  361.   t4bpack (ljadd, sbuf.mtext);
  362.   t4bpack (PRXR, (char*)sbuf.mtext+size4b);
  363.   __MSGSND(msqidm, &sbuf, size4b + 1, 0,"BUF.msgsnd: DOFIX");
  364.   __MSGRCV(msqidm, &sbuf, 0, ANSMJ, 0,"BUF.msgrcv");
  365.   I_fix = NO;
  366.   N_op = 0;
  367.   sbuf.mtype = ANSBUF;
  368.   t4bpack (0, sbuf.mtext);
  369.   __MSGSND(msqidbf, &sbuf, sizeof (i4_t), 0,"BUF.msgsnd: ANSLJ");
  370. }
  371. /**************************** real fixation *********************************/
  372. extern struct BUFF *prios[PRIORITIES]; /* priority rings */
  373. void
  374. do_fix (void)
  375. {
  376.   u2_t prio;
  377.   struct BUFF *buf;
  378.   for (prio = 0; prio < PRIORITIES; prio++)
  379.     if ((buf = prios[prio]) != NULL)
  380.       do
  381. {
  382.   push_buf (buf);
  383.   buf = buf->b_next;
  384. }
  385.       while (buf != prios[prio]);
  386.   cur_endmj = NULL_MICRO;
  387.   pushaddmj = NULL_MICRO;
  388. }
  389. /**************************** fixation mode *********************************/
  390. void
  391. fix_mode (void)
  392. {
  393.   I_fix = YES;
  394.   while (N_op != N_log)
  395.     msg_rcv ();
  396. }
  397. /*************************** end of operation *******************************/
  398. void
  399. end_op (void)
  400. {
  401.   N_op++;
  402. }
  403. /****************************** weak error **********************************/
  404. void
  405. weak_err (char *text)
  406. {
  407.   printf ("%s.n", text);
  408. }
  409. /***************************** read/write buffer ********************************/
  410. void
  411. read_buf (struct BUFF *buf)
  412. {
  413.   u2_t segn, pn;
  414.   char *shm;
  415.   off_t n;
  416.   i4_t fd;
  417.   segn = buf->b_page->p_seg;
  418.   pn = buf->b_page->p_page;
  419.   if (fdseg[segn] < 0)
  420.     if ((fdseg[segn] = open (seg_file_name[segn], O_RDWR, 0644)) < 0)
  421.       {
  422.         printf ("BUF.read_buf: seg_file_name = %sn", seg_file_name[segn]);
  423. perror ("SEGFILE: open error");
  424. exit (1);
  425.       }
  426.   fd = fdseg[segn]; 
  427.   if ((n = lseek (fd, BD_PAGESIZE * pn, SEEK_SET)) < 0)
  428.     {
  429. /*      printf ("BUF.read_buf: lseek error errno=%d, fd = %dn", errno, fdseg[segn]);*/
  430.       ADM_ERRFU (MERR);
  431.     }
  432.   if ((shm = shmat (buf->b_seg->idseg, NULL, 0)) == (char *) -1)
  433.     {
  434.       perror ("BUF.shmat");
  435.       exit (1);
  436.     }
  437.   if (read (fd, shm, BD_PAGESIZE) < 0)
  438.     {
  439.       printf ("BUF.read_buf: read error errno=%dn", errno);
  440.       ADM_ERRFU (MERR);
  441.     }
  442.   if (shmdt (shm) < 0)
  443.     {
  444.       perror ("BUF.shmdt");
  445.       exit (1);
  446.     }
  447. }
  448. void
  449. write_buf (struct BUFF *buf)
  450. {
  451.   u2_t segn, pn;
  452.   char *shm;
  453.   off_t n;
  454.   i4_t fd;
  455.   segn = buf->b_page->p_seg;
  456.   pn = buf->b_page->p_page;
  457.   if (fdseg[segn] < 0)
  458.     if ((fdseg[segn] = open (seg_file_name[segn], O_RDWR, 0644)) < 0)
  459.       {
  460.         printf ("BUF.write_buf: sn = %d, seg_file_name = %sn", segn, seg_file_name[segn]);
  461. perror ("SEGFILE: open error");
  462. exit (1);
  463.       }
  464.   fd = fdseg[segn];
  465.   if ((n = lseek (fd, BD_PAGESIZE * pn, SEEK_SET)) < 0)
  466.     {
  467.       ADM_ERRFU (MERR);
  468.     }
  469.   if ((shm = shmat (buf->b_seg->idseg, NULL, 0)) == (char *) -1)
  470.     {
  471.       perror ("BUF.shmat");
  472.       exit (1);
  473.     }
  474.   if (write (fd, shm, BD_PAGESIZE) != BD_PAGESIZE)
  475.     ADM_ERRFU (MERR);
  476.   if (shmdt (shm) < 0)
  477.     {
  478.       perror ("BUF.shmdt");
  479.       exit (1);
  480.     }
  481.   buf->b_prmod = PRNMOD;
  482.   buf->b_micro = NULL_MICRO;
  483. }
  484. void
  485. ADM_ERRFU (i4_t p)
  486. {
  487. #if 1
  488.   perror ("BUF. ERRFU");
  489. #else
  490.   struct msg_buf sbuf;
  491.   sbuf.mtype = ERRFU;
  492.   sbuf.mtext[0] = (char) p;
  493.   sbuf.mtext[1] = BF_PPS;
  494.   __MSGSND(msqida, &sbuf, 2, 0,"BUF. ERRFU");
  495. #endif
  496. }
  497. void
  498. finit (void)
  499. {
  500.   register i4_t i;
  501.   struct shmid_ds shmstr;
  502.   struct msg_buf sbuf;
  503.   struct BUFF *buf;
  504.   for (i = 0; i < MAXSEGNUM; i++)
  505.     if (fdseg[i] >= 0)
  506.       close (fdseg[i]);
  507.   xfree ((char *) fdseg);
  508.   PRINTF (("BUF.finit: segments are closedn"));
  509.   for (i = 0; i < MAXSEGNUM; i++)
  510.     if (seg_file_name[i] != NULL)
  511.       xfree ((char *) seg_file_name[i]);
  512.   xfree ((char *) seg_file_name);
  513.   PRINTF (("BUF.finit: seg_file_names are free n"));
  514.   for (i = 0; i < PRIORITIES; i++)
  515.     {
  516.       PRINTF (("BUF.finit: prios[%d]=%dn", i, (i4_t) prios[i]));
  517.       if ((buf = prios[i]) != NULL)
  518. do
  519.   {
  520.     if (buf->b_seg != NULL)
  521.       {
  522. PRINTF (("BUF.finit: idseg=%dn", buf->b_seg->idseg));
  523. shmctl (buf->b_seg->idseg, IPC_RMID, &shmstr);
  524.       }
  525.   }
  526. while ((buf = buf->b_next) != prios[i]);
  527.     }
  528.   PRINTF (("BUF.finit.e: buftime=%ld(msec)n",
  529.            buffer.tms_utime * 1000 / CLK_TCK));
  530.   sbuf.mtype = ANSBUF;
  531.   sbuf.mtext[0] = 0;
  532.   __MSGSND(msqidbf, &sbuf, 1, 0,"BUF.msgsnd: Answer ADM");
  533. }
  534. void
  535. waitfor_seg (i4_t buf_num)
  536. {
  537.   struct msg_buf rbuf;
  538.   u2_t trnum, pn, segn;
  539.   char *p, *p1;
  540.   register i4_t op;
  541.   i4_t admj;
  542.   for (; (N_buf + 1) >= buf_num;)
  543.     {
  544.       rbuf.mtype = 0;
  545.       
  546.       PRINT ("BUF.waitfor_seg: msqidbf = %dn", (i4_t)msqidbf)
  547.       
  548.       __MSGRCV(msqidbf, &rbuf, BD_PAGESIZE, -(ANSBUF - 1), 0,"BUF.msgrcv");
  549.       
  550.       PRINT ("BUF.waitfor_seg: rbuf.mtype = %dn", rbuf.mtype)
  551.       
  552.       op = rbuf.mtype;
  553.       p = rbuf.mtext;
  554.       switch (op)
  555. {
  556. case INIFIXB:
  557.   inifixb (t4bunpack (p), t4bunpack (p + size4b));
  558.   break;
  559. case PUTPAGE:
  560.           UNPACKMSG(p);
  561.   BUFUPACK(p,admj);
  562.   put (segn, pn, admj, *p);
  563.   user_p (trnum, 0);
  564.   break;
  565. case LOCKPAGE:
  566.           UNPACKMSG(p);
  567.   if (segn >= (u2_t) MAXSEGNUM || seg_file_name[segn] == NULL)
  568.     user_p (trnum, NO_SUCH_SEG);
  569.   else if (buflock (trnum, segn, pn, *p, 0) == 0)
  570.             user_p (trnum, 0);
  571.   break;
  572. case ENFORCE:
  573.           UNPACKMSG(p);
  574.   if (enforce (trnum, segn, pn) == 0)
  575.     user_p (trnum, 0);
  576.   break;
  577. case UNLKPG:
  578.           UNPACKMSG(p);
  579.   unlock (segn, pn, p);
  580.   user_p (trnum, 0);
  581.   break;
  582. case PUTUNL:
  583.   p1 = p + 2*size2b;
  584.           UNPACKMSG(p);
  585.   BUFUPACK(p,admj);
  586.   put (segn, pn, admj, *p);
  587.   unlock (segn, 1, p1);
  588.   user_p (trnum, 0);
  589.   break;
  590. case ENDOP:
  591.   end_op ();
  592.   break;
  593. case BEGTACT:
  594.   tact ();
  595.   break;
  596. case OPTNUM:
  597.   optimal (t4bunpack (p));
  598.           MAXTACT = t4bunpack (p + size4b);          
  599.   break;
  600. case FINIT:
  601.   finit ();
  602.   break;
  603. default:
  604.   printf ("BUF.waitfor_seg: The operation op=%d is disabledn", op);
  605.   break;
  606. }
  607.       tact ();
  608.     }
  609. }
  610. /******************************** the end ***********************************/