SHM_Transporter.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:9k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #include <ndb_global.h>
  14. #include "SHM_Transporter.hpp"
  15. #include "TransporterInternalDefinitions.hpp"
  16. #include <TransporterCallback.hpp>
  17. #include <NdbSleep.h>
  18. #include <NdbOut.hpp>
  19. #include <InputStream.hpp>
  20. #include <OutputStream.hpp>
  21. extern int g_ndb_shm_signum;
  22. SHM_Transporter::SHM_Transporter(TransporterRegistry &t_reg,
  23.  const char *lHostName,
  24.  const char *rHostName, 
  25.  int r_port,
  26.  NodeId lNodeId,
  27.  NodeId rNodeId, 
  28.  bool checksum, 
  29.  bool signalId,
  30.  key_t _shmKey,
  31.  Uint32 _shmSize) :
  32.   Transporter(t_reg, tt_SHM_TRANSPORTER,
  33.       lHostName, rHostName, r_port, lNodeId, rNodeId,
  34.       0, false, checksum, signalId),
  35.   shmKey(_shmKey),
  36.   shmSize(_shmSize)
  37. {
  38.   _shmSegCreated = false;
  39.   _attached = false;
  40.   shmBuf = 0;
  41.   reader = 0;
  42.   writer = 0;
  43.   
  44.   setupBuffersDone=false;
  45. #ifdef DEBUG_TRANSPORTER
  46.   printf("shm key (%d - %d) = %dn", lNodeId, rNodeId, shmKey);
  47. #endif
  48.   m_signal_threshold = 4096;
  49. }
  50. SHM_Transporter::~SHM_Transporter(){
  51.   doDisconnect();
  52. }
  53. bool 
  54. SHM_Transporter::initTransporter(){
  55.   if (g_ndb_shm_signum)
  56.     return true;
  57.   return false;
  58. }
  59.     
  60. void
  61. SHM_Transporter::setupBuffers(){
  62.   Uint32 sharedSize = 0;
  63.   sharedSize += 28; //SHM_Reader::getSharedSize();
  64.   sharedSize += 28; //SHM_Writer::getSharedSize();
  65.   const Uint32 slack = MAX_MESSAGE_SIZE;
  66.   /**
  67.    *  NOTE: There is 7th shared variable in Win2k (sharedCountAttached).
  68.    */
  69.   Uint32 sizeOfBuffer = shmSize;
  70.   sizeOfBuffer -= 2*sharedSize;
  71.   sizeOfBuffer /= 2;
  72.   Uint32 * base1 = (Uint32*)shmBuf;
  73.   Uint32 * sharedReadIndex1 = base1;
  74.   Uint32 * sharedWriteIndex1 = base1 + 1;
  75.   serverStatusFlag = base1 + 4;
  76.   char * startOfBuf1 = shmBuf+sharedSize;
  77.   Uint32 * base2 = (Uint32*)(shmBuf + sizeOfBuffer + sharedSize);
  78.   Uint32 * sharedReadIndex2 = base2;
  79.   Uint32 * sharedWriteIndex2 = base2 + 1;
  80.   clientStatusFlag = base2 + 4;
  81.   char * startOfBuf2 = ((char *)base2)+sharedSize;
  82.   
  83.   if(isServer){
  84.     * serverStatusFlag = 0;
  85.     reader = new SHM_Reader(startOfBuf1, 
  86.     sizeOfBuffer,
  87.     slack,
  88.     sharedReadIndex1,
  89.     sharedWriteIndex1);
  90.     writer = new SHM_Writer(startOfBuf2, 
  91.     sizeOfBuffer,
  92.     slack,
  93.     sharedReadIndex2,
  94.     sharedWriteIndex2);
  95.     * sharedReadIndex1 = 0;
  96.     * sharedWriteIndex1 = 0;
  97.     * sharedReadIndex2 = 0;
  98.     * sharedWriteIndex2 = 0;
  99.     
  100.     reader->clear();
  101.     writer->clear();
  102.     
  103.     * serverStatusFlag = 1;
  104. #ifdef DEBUG_TRANSPORTER 
  105.     printf("-- (%d - %d) - Server -n", localNodeId, remoteNodeId);
  106.     printf("Reader at: %d (%p)n", startOfBuf1 - shmBuf, startOfBuf1);
  107.     printf("sharedReadIndex1 at %d (%p) = %dn", 
  108.    (char*)sharedReadIndex1-shmBuf, 
  109.    sharedReadIndex1, *sharedReadIndex1);
  110.     printf("sharedWriteIndex1 at %d (%p) = %dn", 
  111.    (char*)sharedWriteIndex1-shmBuf, 
  112.    sharedWriteIndex1, *sharedWriteIndex1);
  113.     printf("Writer at: %d (%p)n", startOfBuf2 - shmBuf, startOfBuf2);
  114.     printf("sharedReadIndex2 at %d (%p) = %dn", 
  115.    (char*)sharedReadIndex2-shmBuf, 
  116.    sharedReadIndex2, *sharedReadIndex2);
  117.     printf("sharedWriteIndex2 at %d (%p) = %dn", 
  118.    (char*)sharedWriteIndex2-shmBuf, 
  119.    sharedWriteIndex2, *sharedWriteIndex2);
  120.     printf("sizeOfBuffer = %dn", sizeOfBuffer);
  121. #endif
  122.   } else {
  123.     * clientStatusFlag = 0;
  124.     reader = new SHM_Reader(startOfBuf2, 
  125.     sizeOfBuffer,
  126.     slack,
  127.     sharedReadIndex2,
  128.     sharedWriteIndex2);
  129.     
  130.     writer = new SHM_Writer(startOfBuf1, 
  131.     sizeOfBuffer,
  132.     slack,
  133.     sharedReadIndex1,
  134.     sharedWriteIndex1);
  135.     
  136.     * sharedReadIndex2 = 0;
  137.     * sharedWriteIndex1 = 0;
  138.     
  139.     reader->clear();
  140.     writer->clear();
  141.     * clientStatusFlag = 1;
  142. #ifdef DEBUG_TRANSPORTER
  143.     printf("-- (%d - %d) - Client -n", localNodeId, remoteNodeId);
  144.     printf("Reader at: %d (%p)n", startOfBuf2 - shmBuf, startOfBuf2);
  145.     printf("sharedReadIndex2 at %d (%p) = %dn", 
  146.    (char*)sharedReadIndex2-shmBuf, 
  147.    sharedReadIndex2, *sharedReadIndex2);
  148.     printf("sharedWriteIndex2 at %d (%p) = %dn", 
  149.    (char*)sharedWriteIndex2-shmBuf, 
  150.    sharedWriteIndex2, *sharedWriteIndex2);
  151.     printf("Writer at: %d (%p)n", startOfBuf1 - shmBuf, startOfBuf1);
  152.     printf("sharedReadIndex1 at %d (%p) = %dn", 
  153.    (char*)sharedReadIndex1-shmBuf, 
  154.    sharedReadIndex1, *sharedReadIndex1);
  155.     printf("sharedWriteIndex1 at %d (%p) = %dn", 
  156.    (char*)sharedWriteIndex1-shmBuf, 
  157.    sharedWriteIndex1, *sharedWriteIndex1);
  158.     
  159.     printf("sizeOfBuffer = %dn", sizeOfBuffer);
  160. #endif
  161.   }
  162. #ifdef DEBUG_TRANSPORTER
  163.   printf("Mapping from %p to %pn", shmBuf, shmBuf+shmSize);
  164. #endif
  165. }
  166. bool
  167. SHM_Transporter::connect_server_impl(NDB_SOCKET_TYPE sockfd)
  168. {
  169.   DBUG_ENTER("SHM_Transporter::connect_server_impl");
  170.   SocketOutputStream s_output(sockfd);
  171.   SocketInputStream s_input(sockfd);
  172.   char buf[256];
  173.   // Create
  174.   if(!_shmSegCreated){
  175.     if (!ndb_shm_create()) {
  176.       report_error(TE_SHM_UNABLE_TO_CREATE_SEGMENT);
  177.       NDB_CLOSE_SOCKET(sockfd);
  178.       DBUG_RETURN(false);
  179.     }
  180.     _shmSegCreated = true;
  181.   }
  182.   // Attach
  183.   if(!_attached){
  184.     if (!ndb_shm_attach()) {
  185.       report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
  186.       NDB_CLOSE_SOCKET(sockfd);
  187.       DBUG_RETURN(false);
  188.     }
  189.     _attached = true;
  190.   }
  191.   // Send ok to client
  192.   s_output.println("shm server 1 ok: %d", 
  193.    m_transporter_registry.m_shm_own_pid);
  194.   
  195.   // Wait for ok from client
  196.   if (s_input.gets(buf, 256) == 0) 
  197.   {
  198.     NDB_CLOSE_SOCKET(sockfd);
  199.     DBUG_RETURN(false);
  200.   }
  201.   if(sscanf(buf, "shm client 1 ok: %d", &m_remote_pid) != 1)
  202.   {
  203.     NDB_CLOSE_SOCKET(sockfd);
  204.     DBUG_RETURN(false);
  205.   }
  206.   int r= connect_common(sockfd);
  207.   if (r) {
  208.     // Send ok to client
  209.     s_output.println("shm server 2 ok");
  210.     // Wait for ok from client
  211.     if (s_input.gets(buf, 256) == 0) {
  212.       NDB_CLOSE_SOCKET(sockfd);
  213.       DBUG_RETURN(false);
  214.     }
  215.     DBUG_PRINT("info", ("Successfully connected server to node %d",
  216.                 remoteNodeId)); 
  217.   }
  218.   NDB_CLOSE_SOCKET(sockfd);
  219.   DBUG_RETURN(r);
  220. }
  221. bool
  222. SHM_Transporter::connect_client_impl(NDB_SOCKET_TYPE sockfd)
  223. {
  224.   DBUG_ENTER("SHM_Transporter::connect_client_impl");
  225.   SocketInputStream s_input(sockfd);
  226.   SocketOutputStream s_output(sockfd);
  227.   char buf[256];
  228. #if 1
  229. #endif
  230.   // Wait for server to create and attach
  231.   if (s_input.gets(buf, 256) == 0) {
  232.     NDB_CLOSE_SOCKET(sockfd);
  233.     DBUG_PRINT("error", ("Server id %d did not attach",
  234.                 remoteNodeId));
  235.     DBUG_RETURN(false);
  236.   }
  237.   if(sscanf(buf, "shm server 1 ok: %d", &m_remote_pid) != 1)
  238.   {
  239.     NDB_CLOSE_SOCKET(sockfd);
  240.     DBUG_RETURN(false);
  241.   }
  242.   
  243.   // Create
  244.   if(!_shmSegCreated){
  245.     if (!ndb_shm_get()) {
  246.       NDB_CLOSE_SOCKET(sockfd);
  247.       DBUG_PRINT("error", ("Failed create of shm seg to node %d",
  248.                   remoteNodeId));
  249.       DBUG_RETURN(false);
  250.     }
  251.     _shmSegCreated = true;
  252.   }
  253.   // Attach
  254.   if(!_attached){
  255.     if (!ndb_shm_attach()) {
  256.       report_error(TE_SHM_UNABLE_TO_ATTACH_SEGMENT);
  257.       NDB_CLOSE_SOCKET(sockfd);
  258.       DBUG_PRINT("error", ("Failed attach of shm seg to node %d",
  259.                   remoteNodeId));
  260.       DBUG_RETURN(false);
  261.     }
  262.     _attached = true;
  263.   }
  264.   // Send ok to server
  265.   s_output.println("shm client 1 ok: %d", 
  266.    m_transporter_registry.m_shm_own_pid);
  267.   
  268.   int r= connect_common(sockfd);
  269.   
  270.   if (r) {
  271.     // Wait for ok from server
  272.     if (s_input.gets(buf, 256) == 0) {
  273.       NDB_CLOSE_SOCKET(sockfd);
  274.       DBUG_PRINT("error", ("No ok from server node %d",
  275.                   remoteNodeId));
  276.       DBUG_RETURN(false);
  277.     }
  278.     // Send ok to server
  279.     s_output.println("shm client 2 ok");
  280.     DBUG_PRINT("info", ("Successfully connected client to node %d",
  281.                 remoteNodeId)); 
  282.   }
  283.   NDB_CLOSE_SOCKET(sockfd);
  284.   DBUG_RETURN(r);
  285. }
  286. bool
  287. SHM_Transporter::connect_common(NDB_SOCKET_TYPE sockfd)
  288. {
  289.   if (!checkConnected()) {
  290.     DBUG_PRINT("error", ("Already connected to node %d",
  291.                 remoteNodeId));
  292.     return false;
  293.   }
  294.   
  295.   if(!setupBuffersDone) 
  296.   {
  297.     setupBuffers();
  298.     setupBuffersDone=true;
  299.   }
  300.   if(setupBuffersDone) 
  301.   {
  302.     NdbSleep_MilliSleep(m_timeOutMillis);
  303.     if(*serverStatusFlag == 1 && *clientStatusFlag == 1)
  304.     {
  305.       m_last_signal = 0;
  306.       return true;
  307.     }
  308.   }
  309.   DBUG_PRINT("error", ("Failed to set up buffers to node %d",
  310.               remoteNodeId));
  311.   return false;
  312. }
  313. void
  314. SHM_Transporter::doSend()
  315. {
  316.   if(m_last_signal)
  317.   {
  318.     m_last_signal = 0;
  319.     kill(m_remote_pid, g_ndb_shm_signum);
  320.   }
  321. }
  322. Uint32
  323. SHM_Transporter::get_free_buffer() const 
  324. {
  325.   return writer->get_free_buffer();
  326. }