dispatch.c
上传用户:dgyhgb
上传日期:2007-01-07
资源大小:676k
文件大小:11k
源码类别:

SQL Server

开发平台:

Unix_Linux

  1. /*
  2.  * dispatch.c - Storage and Transaction Synchrohization Management 
  3.  *              System Administrator of GNU SQL server
  4.  *
  5.  * This file is a part of GNU SQL Server
  6.  *
  7.  * Copyright (c) 1996, 1997, Free Software Foundation, Inc
  8.  * Developed at the Institute of System Programming 
  9.  * This file is written by Vera Ponomarenko
  10.  *
  11.  * This program is free software; you can redistribute it and/or modify it under
  12.  * the terms of the GNU GeSETAneral Public License as published by the Free
  13.  * Software Foundation; either version 2 of the License, or (at your option)
  14.  * any later version.
  15.  *
  16.  * This program is distributed in the hope that it will be useful, but WITHOUT
  17.  * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  18.  * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General Public License for
  19.  * more details.
  20.  *
  21.  * You should have received a copy of the GNU General Public License along with
  22.  * this program; if not, write to the Free Software Foundation, Inc.,
  23.  * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  24.  *
  25.  * Contacts: gss@ispras.ru
  26.  *
  27.  */
  28. /* $Id: dispatch.c,v 1.245 1997/03/31 03:46:38 kml Exp $ */
  29. #include "setup_os.h"
  30. #include <sys/types.h>
  31. #include <sys/ipc.h>
  32. #include <sys/msg.h>
  33. #include <signal.h>
  34. #ifdef HAVE_SYS_WAIT_H
  35. #include <sys/wait.h>
  36. #endif
  37. #ifndef WEXITSTATUS
  38. # define WEXITSTATUS(stat_val) ((unsigned)(stat_val) >> 8)
  39. #endif
  40. #ifndef WIFEXITED
  41. # define WIFEXITED(stat_val) (((stat_val) & 0xff) == 0)
  42. #endif
  43. /* for alarm() : */
  44. #ifdef HAVE_UNISTD_H
  45. #include <unistd.h>
  46. #endif
  47. #include "dispatch.h"
  48. #include "global.h"
  49. #include "inpop.h"
  50. #include "totdecl.h"
  51. #include <assert.h>
  52. #define EXIT      finit(1)
  53. #define RET(r)  { rest = (r); return &rest; }
  54. i4_t default_num;
  55. extern i4_t maxtrans, maxusetran;
  56. extern struct des_trn *tabtr;
  57. extern pid_t pidlj, pidmj, pidbf, pidsn, pidsr, pidmcr;
  58. extern i4_t msgida;
  59. extern struct msg_buf rbuf, sbuf;
  60. extern char nocrtr_fl, finit_fl, *fix_path, fix_lj_fl;
  61. extern volatile i4_t finit_done;
  62. extern volatile i4_t children;
  63. #include "admdef.h"
  64. void finit (i4_t err);
  65. #define PRINT(x, y)  PRINTF((x, y))
  66. /* child signals SIGUSR1 & SIGCHLD handler */
  67. void
  68. sig_usr_hnd (i4_t sig_num)
  69. {
  70.   PRINT ("ADM: signal %d received ", sig_num);
  71.   PRINT ("(children = %d)n",children);
  72.   
  73.   switch (sig_num)
  74.     {
  75.     case SIGUSR1 :
  76.       while (1)
  77. {
  78.   if (msgrcv (msgida, (MSGBUFP)&rbuf,
  79.       BD_PAGESIZE, 0 /*NUM*/, IPC_NOWAIT) < 0)
  80.     break;
  81.   PRINT ("ADM.sig_hnd:  msgreceived mtype = %ldn", rbuf.mtype);
  82.   realop (t2bunpack (rbuf.mtext), rbuf.mtype/*NUM*/,
  83.   rbuf.mtext + sizeof(u2_t));
  84. }
  85.       break;
  86.       
  87.     case SIGCHLD :
  88.       while(1)
  89.         {
  90.           pid_t  pid;
  91.           char  *who = NULL;
  92.           i4_t    i = -1;
  93.           i4_t    status;
  94.           i4_t    exit_code = 0;
  95.           i4_t    exited = 0;
  96.           pid_t *to_clear = &pid;
  97.           char   str[100];
  98.           debug_pid_t *pd = NULL;
  99.           static i4_t subdebuggers = 0;
  100.           pid = waitpid ((pid_t)-1,&status,WNOHANG);
  101.           if (!pid)
  102.             break;
  103. #define CHK(p,name) 
  104.   if((pid==pid##p)||(-pid==pid##p)){to_clear=&(pid##p);who=name;}
  105.           CHK(lj,"LJ");
  106.           CHK(mj,"MJ");
  107.           CHK(bf,"BUF");
  108.           CHK(sn,"SYN");
  109.           CHK(sr,"SRT");
  110.           CHK(mcr,"MCR");
  111. #undef CHK
  112.           strcpy(str,"ADM.WAIT: ");
  113.         
  114.           if(who)
  115.             strcat(str,who);
  116.           else
  117.             {
  118.               for (i = NUM_LJ; i <= maxusetran; i++)
  119.                 if ((((tabtr[i]).idprtr) == pid) || (((tabtr[i]).idprtr) == -pid) )
  120.                   {
  121.                     to_clear = &((tabtr[i]).idprtr);
  122.                     sprintf(str+strlen(str),"trans %d",i);
  123.                     break;
  124.                   }
  125.               if ( i == maxusetran+1) /* if it isn't transaction or system service */
  126.                 for (pd = debuggers_pids; pd ; pd = pd->next)
  127.                   if (pd->debugger == pid)     /* it should be debugger proccess   */
  128.                     {
  129.                       if (pd->next)
  130.                         pd->next->prev = pd->prev;
  131.                       if (pd->prev)
  132.                         pd->prev->next = pd->next;
  133.                       else
  134.                         debuggers_pids = pd->next;
  135.                       xfree(pd);
  136.                       sprintf(str+strlen(str),"debugger %d",(i4_t)pid);
  137.                       i = -1;
  138.                       to_clear = &pid;
  139.                       break;
  140.                     }
  141.               if ( (i == maxusetran+1)  /*       if it isn't even debugger and     */
  142.                    && (subdebuggers>0)) /* there can be some subdebugger process   */
  143.                 {
  144.                   sprintf(str+strlen(str),"debugger %d",(i4_t)pid);
  145.                   i = -1;
  146.                   subdebuggers--;
  147.                   children++;
  148.                   to_clear = &pid;
  149.                 }
  150.             }
  151.           assert((i<= maxusetran) || (who));
  152.           /* let's kill debugger if debugger process died */
  153.           for (pd = debuggers_pids; pd ; pd = pd->next)
  154.             if (pd->to_debug == pid)        /* it should be debugged proccess      */
  155.               {                             /*                                     */
  156.                 kill(pd->debugger,SIGKILL); /* kill debugger                       */
  157.                 subdebuggers++;             /* and wait subdebuger exit            */
  158.               }                             /*                                     */
  159.           if (WIFEXITED (status))
  160.             {
  161.               exit_code = WEXITSTATUS(status);
  162.               exited = 1;
  163.               sprintf(str+strlen(str)," exited code(%d)n",exit_code);
  164.             }
  165.           else if (WIFSIGNALED (status))
  166.             {
  167.               char buf[256]; 
  168.               sprintf(buf,"test ! -f core || mv core core.%dn",pid);
  169.               exit_code = -1;
  170.               exited = 1;
  171.               strcat(str," signalledn");
  172.               system(buf);
  173.             }
  174.           else if (WIFSTOPPED (status))
  175.             strcat(str," stoppedn");
  176.           else
  177.             strcat(str," did something strangen");
  178.         
  179.           PRINTF (("%s",str));
  180.           if (exited)
  181.             {
  182.               i4_t is_killed = (-pid == *to_clear);
  183.               children--;
  184.               PRINTF ((">> children = %dn",children));
  185.               *to_clear = (pid_t)0;  /* clear process entry */
  186.               if ( i >= 0 )
  187.                 endotr (i,(is_killed? 0 : exit_code ));
  188.               else if (!is_killed && who )  /* we didn't kill it */
  189.                 {
  190.                   if (to_clear==&pidmcr) /* MCR exited */
  191.                     {
  192.                       if (exit_code)
  193.                         {
  194.                           printf ("ADM.wait: Crash recovery failed - STOPn");
  195.                           kill_all(0);
  196.                         }
  197.                       else
  198.                         printf ("ADM.wait: MCR exitn");
  199.                     }
  200.                   else if (!finit_done)
  201.                     {
  202.                       printf ("ADM.wait: %s crash -- "
  203.                               "kill all & start recoveryn"
  204.                               ,who);
  205.                       finit(-1); /* total stop and kill everything */
  206.                     }
  207.                 }
  208.               if(!children)
  209.                 {
  210.                   kill_all(0);
  211.                   break;
  212.                 }
  213.             }
  214.         } /* case SIGCHLD    */
  215.       break;
  216.     }   /* switch(sig_num) */
  217. } /* sig_usr_hnd */
  218. /* Initialization of transaction & interpretator (what == INTERPRET) *
  219.  * or transaction & compiler (what == COMPILER) process.             *
  220.  * arg->wait_time - after this time (in seconds) after last          *
  221.  * client calling created process must be finished.                  *
  222.  * Returns created process identifier or error (rpc_id < 0)          */
  223. RPC_SVC_PROTO(res,init_arg,create_transaction,1)
  224. {
  225.   static i4_t trn_num;
  226.   static res rest;
  227.   static i4_t id[2];
  228.   
  229.   trn_num = (nocrtr_fl) ? -NOCRTR : creatr (in);
  230.   rest.proc_id.opq_len = (trn_num < 0) ? 0 : 2 * SZ_LNG;
  231.   if (rest.proc_id.opq_len)
  232.     {
  233.       id[0] = trn_num;
  234.       id[1] = tabtr[trn_num].cretime;
  235.     }
  236.   rest.proc_id.opq_val = (rest.proc_id.opq_len) ? (char *)id : NULL;
  237.   rest.rpc_id = (trn_num < 0) ? trn_num : DEFAULT_TRN + trn_num;
  238.   return &rest;
  239. } /* transaction_create */
  240. /* Checking of interpretator state.     *
  241.  * Returns 0 if interpretator is ready, *
  242.  * NEED_WAIT or error (<0) else         */
  243. RPC_SVC_PROTO(i4_t,opq,is_ready,1)
  244. {
  245.   static i4_t rest;
  246.   i4_t trn_num;
  247.   
  248.   /* Attempt to receive messages by dispatcher */
  249.   while (1)
  250.     {
  251.       if (msgrcv (msgida, (MSGBUFP)&rbuf,
  252.   BD_PAGESIZE, 0 /*NUM*/, IPC_NOWAIT) < 0)
  253. break;
  254.       PRINT ("ADM.is_ready:  msgreceived mtype = %ldn", rbuf.mtype);
  255.       realop (t2bunpack (rbuf.mtext), rbuf.mtype,
  256.       rbuf.mtext + sizeof(u2_t));
  257.     }
  258.   
  259.   if (!in || in->opq_len != SZ_LNG * 2 || !(in->opq_val))
  260.     RET (-ER_CLNT);
  261.   
  262.   trn_num = ((i4_t *)(in->opq_val))[0];
  263.   if (trn_num > maxusetran || !(tabtr[trn_num].idprtr) ||
  264.       tabtr[trn_num].cretime != ((i4_t *)(in->opq_val))[1])
  265.     RET (-TRN_EXITED);
  266.     
  267.   PRINT ("ADM.IS_READY: request from trn %ldn", trn_num);
  268.   
  269.   switch (tabtr[trn_num].res_ready)
  270.     { 
  271.     case 0 : /* it's first request from client */
  272.       kill ( tabtr[trn_num].idprtr, SIGUSR2);
  273.       tabtr[trn_num].res_ready = 2;
  274.       
  275.     case 2 : /* it isn't first client's request & *
  276.       * interpretator isn't ready         */ 
  277.       rest = NEED_WAIT;
  278.       break;
  279.       
  280.     case 1 : /* interpretator is ready for sending of the result */
  281.       tabtr[trn_num].res_ready = 0;
  282.       rest = 0;
  283.       break;
  284.       
  285.     }      
  286.   
  287.   return &rest;
  288. } /* is_ready */
  289. /* Compilation of SQL text.                       *
  290.  * Returns error code in elem.indicator & (if not *
  291.  * error) path to created module (header.seq)     */
  292. /* Stop all works. All child process will be killed with  *
  293.    making ROLLBACK()  & dispatcher will finish it's work. */
  294. RPC_SVC_PROTO(i4_t,int,kill_all,1)
  295. {
  296.   static i4_t rest;
  297.   
  298.   finit(0); 
  299.   RET (0);
  300. }
  301. /* Process of transaction (for interpretator *
  302.  * or compilator) finishing                  */
  303. RPC_SVC_PROTO(i4_t,opq,trn_kill,1)
  304. {
  305.   i4_t i;
  306.   static i4_t rest = 0;
  307.     
  308.   if (!in || in->opq_len != SZ_LNG * 2 || !(in->opq_val))
  309.     RET (-TRN_ID);
  310.   
  311.   i = ((i4_t *)(in->opq_val))[0];
  312.   if (i <= maxtrans && tabtr[i].idprtr &&
  313.       tabtr[i].cretime == ((i4_t *)(in->opq_val))[1])
  314.     {
  315.       TRN_SEND (FINIT, 0);
  316.     }
  317.   RET (0);
  318. }
  319. /* Waiting for all existing transactions finished &       *
  320.    exiting. Dispatcher will not handle operation TRN_INIT */
  321. RPC_SVC_PROTO(i4_t,int,disp_finit,1)
  322. {
  323.   static i4_t rest;
  324.   
  325.   nocrtr_fl = 1;
  326.   finit_fl = 1;
  327.   RET (0);
  328. }
  329. i4_t 
  330. cp_lj_reg (i4_t to_sz, char *to)
  331. {
  332.   i4_t i, trn_exist = 0;
  333.   
  334.   fix_path = (char *) xmalloc (to_sz+1);
  335.   bcopy (to, fix_path, to_sz);
  336.   fix_path[to_sz] = 0;
  337.   
  338.   for (i = FIR_TRN_NUM; i <= maxusetran; i++)
  339.     if (tabtr[i].idprtr)
  340.       {
  341. trn_exist++;
  342. break;
  343.       }
  344.   if (trn_exist)
  345.     {
  346.       nocrtr_fl = 1;
  347.       fix_lj_fl = 1;
  348.     }
  349.   else
  350.     copylj();
  351.   
  352.   return 0;
  353. }
  354. /* Waiting for all existing transactions finished   *
  355.  * & log journal fixing. Dispatcher will not handle *
  356.  * operation TRN_INIT before it's fixing.           *
  357.  * Returns 0 if it's O'K or < 0 if error            */
  358. RPC_SVC_PROTO(i4_t,opq,copy_lj,1)
  359. {
  360.   static i4_t rest;
  361.   
  362.   RET (cp_lj_reg (in->opq_len, in->opq_val));
  363. }
  364. RPC_SVC_PROTO(i4_t,int,change_params,1)
  365. {
  366.   static i4_t int_res = 0;
  367.   
  368.   dyn_change_parameters();
  369.   return &int_res;
  370. }