CPCD.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:10k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include <NdbOut.hpp>
  15. #include "APIService.hpp"
  16. #include "CPCD.hpp"
  17. #include <NdbMutex.h> 
  18. #include "common.hpp"
  19. extern const ParserRow<CPCDAPISession> commands[];
  20. CPCD::CPCD() {
  21.   loadingProcessList = false;
  22.   m_processes.clear();
  23.   m_monitor = NULL;
  24.   m_monitor = new Monitor(this);
  25.   m_procfile = "ndb_cpcd.db";
  26. }
  27. CPCD::~CPCD() {
  28.   if(m_monitor != NULL) {
  29.     delete m_monitor;
  30.     m_monitor = NULL;
  31.   }
  32. }
  33. int
  34. CPCD::findUniqueId() {
  35.   int id;
  36.   bool ok = false;
  37.   m_processes.lock();
  38.   
  39.   while(!ok) {
  40.     ok = true;
  41.     id = random() % 8192; /* Don't want so big numbers */
  42.     
  43.     if(id == 0)
  44.       ok = false;
  45.     for(size_t i = 0; i<m_processes.size(); i++) {
  46.       if(m_processes[i]->m_id == id)
  47. ok = false;
  48.     }
  49.   }
  50.   m_processes.unlock();
  51.   return id;
  52. }
  53. bool
  54. CPCD::defineProcess(RequestStatus * rs, Process * arg){
  55.   if(arg->m_id == -1)
  56.     arg->m_id = findUniqueId();
  57.   Guard tmp(m_processes);
  58.   for(size_t i = 0; i<m_processes.size(); i++) {
  59.     Process * proc = m_processes[i];
  60.     
  61.     if((strcmp(arg->m_name.c_str(), proc->m_name.c_str()) == 0) && 
  62.        (strcmp(arg->m_group.c_str(), proc->m_group.c_str()) == 0)) {
  63.       /* Identical names in the same group */
  64.       rs->err(AlreadyExists, "Name already exists");
  65.       return false;
  66.     }
  67.     if(arg->m_id == proc->m_id) {
  68.       /* Identical ID numbers */
  69.       rs->err(AlreadyExists, "Id already exists");
  70.       return false;
  71.     }
  72.   }
  73.   
  74.   m_processes.push_back(arg, false);
  75.   notifyChanges();
  76.   report(arg->m_id, CPCEvent::ET_PROC_USER_DEFINE);
  77.   return true;
  78. }
  79. bool
  80. CPCD::undefineProcess(CPCD::RequestStatus *rs, int id) {
  81.   Guard tmp(m_processes);
  82.   Process * proc = 0;
  83.   size_t i;
  84.   for(i = 0; i < m_processes.size(); i++) {
  85.     if(m_processes[i]->m_id == id) {
  86.       proc = m_processes[i];
  87.       break;
  88.     }
  89.   }
  90.   if(proc == 0){
  91.     rs->err(NotExists, "No such process");
  92.     return false;
  93.   }
  94.   switch(proc->m_status){
  95.   case RUNNING:
  96.   case STOPPED:
  97.   case STOPPING:
  98.   case STARTING:
  99.     proc->stop();
  100.     m_processes.erase(i, false /* Already locked */);
  101.   }
  102.   
  103.   
  104.   notifyChanges();
  105.   
  106.   report(id, CPCEvent::ET_PROC_USER_UNDEFINE);
  107.   return true;
  108. }
  109. bool
  110. CPCD::startProcess(CPCD::RequestStatus *rs, int id) {
  111.   Process * proc = 0;
  112.   {
  113.     Guard tmp(m_processes);
  114.     
  115.     for(size_t i = 0; i < m_processes.size(); i++) {
  116.       if(m_processes[i]->m_id == id) {
  117. proc = m_processes[i];
  118. break;
  119.       }
  120.     }
  121.     
  122.     if(proc == 0){
  123.       rs->err(NotExists, "No such process");
  124.       return false;
  125.     }
  126.     
  127.     switch(proc->m_status){
  128.     case STOPPED:
  129.       proc->m_status = STARTING;
  130.       if(proc->start() != 0){
  131. rs->err(Error, "Failed to start");
  132. return false;
  133.       }
  134.       break;
  135.     case STARTING:
  136.       rs->err(Error, "Already starting");
  137.       return false;
  138.     case RUNNING:
  139.       rs->err(Error, "Already started");
  140.       return false;
  141.     case STOPPING:
  142.       rs->err(Error, "Currently stopping");
  143.       return false;
  144.     }
  145.     
  146.     notifyChanges();
  147.   }
  148.   report(id, CPCEvent::ET_PROC_USER_START);
  149.   return true;
  150. }
  151. bool
  152. CPCD::stopProcess(CPCD::RequestStatus *rs, int id) {
  153.   Guard tmp(m_processes);
  154.   Process * proc = 0;
  155.   for(size_t i = 0; i < m_processes.size(); i++) {
  156.     if(m_processes[i]->m_id == id) {
  157.       proc = m_processes[i];
  158.       break;
  159.     }
  160.   }
  161.   if(proc == 0){
  162.     rs->err(NotExists, "No such process");
  163.     return false;
  164.   }
  165.   switch(proc->m_status){
  166.   case STARTING:
  167.   case RUNNING:
  168.     proc->stop();
  169.     break;
  170.   case STOPPED:
  171.     rs->err(AlreadyStopped, "Already stopped");
  172.     return false;
  173.     break;
  174.   case STOPPING:
  175.     rs->err(Error, "Already stopping");
  176.     return false;
  177.   }
  178.   
  179.   notifyChanges();
  180.   report(id, CPCEvent::ET_PROC_USER_START);
  181.   
  182.   return true;
  183. }
  184. bool
  185. CPCD::notifyChanges() {
  186.   bool ret = true;
  187.   if(!loadingProcessList)
  188.     ret = saveProcessList();
  189.   m_monitor->signal();
  190.   return ret;
  191. }
  192. /* Must be called with m_processlist locked */
  193. bool
  194. CPCD::saveProcessList(){
  195.   char newfile[PATH_MAX+4];
  196.   char oldfile[PATH_MAX+4];
  197.   char curfile[PATH_MAX];
  198.   FILE *f;
  199.   /* Create the filenames that we will use later */
  200.   BaseString::snprintf(newfile, sizeof(newfile), "%s.new", m_procfile.c_str());
  201.   BaseString::snprintf(oldfile, sizeof(oldfile), "%s.old", m_procfile.c_str());
  202.   BaseString::snprintf(curfile, sizeof(curfile), "%s", m_procfile.c_str());
  203.   f = fopen(newfile, "w");
  204.   if(f == NULL) {
  205.     /* XXX What should be done here? */
  206.     logger.critical("Cannot open `%s': %sn", newfile, strerror(errno));
  207.     return false;
  208.   }
  209.   for(size_t i = 0; i<m_processes.size(); i++){
  210.     m_processes[i]->print(f);
  211.     fprintf(f, "n");
  212.     if(m_processes[i]->m_processType == TEMPORARY){
  213.       /**
  214.        * Interactive process should never be "restarted" on cpcd restart
  215.        */
  216.       continue;
  217.     }
  218.     
  219.     if(m_processes[i]->m_status == RUNNING || 
  220.        m_processes[i]->m_status == STARTING){
  221.       fprintf(f, "start processnid: %dnn", m_processes[i]->m_id);
  222.     }
  223.   }
  224.   
  225.   fclose(f);
  226.   f = NULL;
  227.   
  228.   /* This will probably only work on reasonably Unix-like systems. You have
  229.    * been warned...
  230.    * 
  231.    * The motivation behind all this link()ing is that the daemon might
  232.    * crash right in the middle of updating the configuration file, and in
  233.    * that case we want to be sure that the old file is around until we are
  234.    * guaranteed that there is always at least one copy of either the old or
  235.    * the new configuration file left.
  236.    */
  237.   /* Remove an old config file if it exists */
  238.   unlink(oldfile);
  239.   if(link(curfile, oldfile) != 0) /* make a backup of the running config */
  240.     logger.error("Cannot rename '%s' -> '%s'", curfile, oldfile);
  241.   else {
  242.     if(unlink(curfile) != 0) { /* remove the running config file */
  243.       logger.critical("Cannot remove file '%s'", curfile);
  244.       return false;
  245.     }
  246.   }
  247.   if(link(newfile, curfile) != 0) { /* put the new config file in place */
  248.     printf("-->%dn", __LINE__);
  249.     logger.critical("Cannot rename '%s' -> '%s': %s", 
  250.     curfile, newfile, strerror(errno));
  251.     return false;
  252.   }
  253.   /* XXX Ideally we would fsync() the directory here, but I'm not sure if
  254.    * that actually works.
  255.    */
  256.   unlink(newfile); /* remove the temporary file */
  257.   unlink(oldfile); /* remove the old file */
  258.   logger.info("Process list saved as '%s'", curfile);
  259.   return true;
  260. }
  261. bool
  262. CPCD::loadProcessList(){
  263.   BaseString secondfile;
  264.   FILE *f;
  265.   loadingProcessList = true;
  266.   secondfile.assfmt("%s.new", m_procfile.c_str());
  267.   /* Try to open the config file */
  268.   f = fopen(m_procfile.c_str(), "r");
  269.   /* If it did not exist, try to open the backup. See the saveProcessList()
  270.    * method for an explanation why it is done this way.
  271.    */
  272.   if(f == NULL) {
  273.     f = fopen(secondfile.c_str(), "r");
  274.     
  275.     if(f == NULL) {
  276.       /* XXX What to do here? */
  277.       logger.info("Configuration file `%s' not found",
  278.   m_procfile.c_str());
  279.       logger.info("Starting with empty configuration");
  280.       loadingProcessList = false;
  281.       return false;
  282.     } else {
  283.       logger.info("Configuration file `%s' missing",
  284.   m_procfile.c_str());
  285.       logger.info("Backup configuration file `%s' is used",
  286.   secondfile.c_str());
  287.       /* XXX Maybe we should just rename the backup file to the official
  288.        * name, and be done with it?
  289.        */
  290.     }
  291.   }
  292.   CPCDAPISession sess(f, *this);
  293.   sess.loadFile();
  294.   loadingProcessList = false;
  295.   size_t i;
  296.   Vector<int> temporary;
  297.   for(i = 0; i<m_processes.size(); i++){
  298.     Process * proc = m_processes[i];
  299.     proc->readPid();
  300.     if(proc->m_processType == TEMPORARY){
  301.       temporary.push_back(proc->m_id);
  302.     }
  303.   }
  304.   
  305.   for(i = 0; i<temporary.size(); i++){
  306.     RequestStatus rs;
  307.     undefineProcess(&rs, temporary[i]);
  308.   }
  309.   
  310.   /* Don't call notifyChanges here, as that would save the file we just
  311.      loaded */
  312.   m_monitor->signal();
  313.   return true;
  314. }
  315. MutexVector<CPCD::Process *> *
  316. CPCD::getProcessList() {
  317.   return &m_processes;
  318. }
  319. void
  320. CPCD::RequestStatus::err(enum RequestStatusCode status, const char *msg) {
  321.   m_status = status;
  322.   BaseString::snprintf(m_errorstring, sizeof(m_errorstring), "%s", msg);
  323. }
  324. #if 0
  325. void
  326. CPCD::sigchild(int pid){
  327.   m_processes.lock(); 
  328.   for(size_t i = 0; i<m_processes.size(); i++){
  329.     if(m_processes[i].m_pid == pid){
  330.     }
  331.   }
  332.   wait(pid, 0, 0);
  333. }
  334. #endif
  335.   /** Register event subscriber */
  336. void
  337. CPCD::do_register(EventSubscriber * sub){
  338.   m_subscribers.lock();
  339.   m_subscribers.push_back(sub, false);
  340.   m_subscribers.unlock();  
  341. }
  342. EventSubscriber*
  343. CPCD::do_unregister(EventSubscriber * sub){
  344.   m_subscribers.lock();
  345.   for(size_t i = 0; i<m_subscribers.size(); i++){
  346.     if(m_subscribers[i] == sub){
  347.       m_subscribers.erase(i);
  348.       m_subscribers.unlock();  
  349.       return sub;
  350.     }
  351.   }
  352.   m_subscribers.unlock();  
  353.   return 0;
  354. }
  355. void
  356. CPCD::report(int id, CPCEvent::EventType t){
  357.   CPCEvent e;
  358.   e.m_time = time(0);
  359.   e.m_proc = id;
  360.   e.m_type = t;
  361.   m_subscribers.lock();
  362.   for(size_t i = 0; i<m_subscribers.size(); i++){
  363.     (* m_subscribers[i]).report(e);
  364.   }
  365.   m_subscribers.unlock();
  366. }
  367. template class MutexVector<EventSubscriber*>;