Suma.hpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:16k
源码类别:

MySQL数据库

开发平台:

Visual C++

  1. /* Copyright (C) 2003 MySQL AB
  2.    This program is free software; you can redistribute it and/or modify
  3.    it under the terms of the GNU General Public License as published by
  4.    the Free Software Foundation; either version 2 of the License, or
  5.    (at your option) any later version.
  6.    This program is distributed in the hope that it will be useful,
  7.    but WITHOUT ANY WARRANTY; without even the implied warranty of
  8.    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
  9.    GNU General Public License for more details.
  10.    You should have received a copy of the GNU General Public License
  11.    along with this program; if not, write to the Free Software
  12.    Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA */
  13. #ifndef SUMA_H
  14. #define SUMA_H
  15. #include <ndb_limits.h>
  16. #include <SimulatedBlock.hpp>
  17. #include <NodeBitmask.hpp>
  18. #include <SLList.hpp>
  19. #include <DLList.hpp>
  20. #include <KeyTable.hpp>
  21. #include <DataBuffer.hpp>
  22. #include <SignalCounter.hpp>
  23. #include <AttributeHeader.hpp>
  24. #include <AttributeList.hpp>
  25. #include <signaldata/UtilSequence.hpp>
  26. #include <signaldata/SumaImpl.hpp>
  27. class SumaParticipant : public SimulatedBlock {
  28. protected:
  29.   SumaParticipant(const Configuration & conf);
  30.   virtual ~SumaParticipant();
  31.   BLOCK_DEFINES(SumaParticipant);
  32.   
  33. protected:
  34.   /**
  35.    * Private interface
  36.    */
  37.   void execSUB_CREATE_REQ(Signal* signal);
  38.   void execSUB_REMOVE_REQ(Signal* signal);
  39.   
  40.   void execSUB_START_REQ(Signal* signal);
  41.   void execSUB_STOP_REQ(Signal* signal);
  42.   
  43.   void execSUB_SYNC_REQ(Signal* signal);
  44.   void execSUB_ABORT_SYNC_REQ(Signal* signal);
  45.   void execSUB_STOP_CONF(Signal* signal);
  46.   void execSUB_STOP_REF(Signal* signal);
  47.  /**
  48.    * Dict interface
  49.    */
  50.   void execLIST_TABLES_REF(Signal* signal);
  51.   void execLIST_TABLES_CONF(Signal* signal);
  52.   void execGET_TABINFOREF(Signal* signal);
  53.   void execGET_TABINFO_CONF(Signal* signal);
  54. #if 0
  55.   void execGET_TABLEID_CONF(Signal* signal);
  56.   void execGET_TABLEID_REF(Signal* signal);
  57. #endif
  58.   /**
  59.    * Scan interface
  60.    */
  61.   void execSCAN_HBREP(Signal* signal);
  62.   void execSCAN_FRAGREF(Signal* signal);
  63.   void execSCAN_FRAGCONF(Signal* signal);
  64.   void execTRANSID_AI(Signal* signal);
  65.   void execSUB_SYNC_CONTINUE_REF(Signal* signal);
  66.   void execSUB_SYNC_CONTINUE_CONF(Signal* signal);
  67.   
  68.   /**
  69.    * Trigger logging
  70.    */
  71.   void execTRIG_ATTRINFO(Signal* signal);
  72.   void execFIRE_TRIG_ORD(Signal* signal);
  73.   void execSUB_GCP_COMPLETE_REP(Signal* signal);
  74.   void runSUB_GCP_COMPLETE_ACC(Signal* signal);
  75.   
  76.   /**
  77.    * DIH signals
  78.    */
  79.   void execDI_FCOUNTREF(Signal* signal);
  80.   void execDI_FCOUNTCONF(Signal* signal);
  81.   void execDIGETPRIMREF(Signal* signal);
  82.   void execDIGETPRIMCONF(Signal* signal);
  83.   /**
  84.    * Trigger administration
  85.    */
  86.   void execCREATE_TRIG_REF(Signal* signal);
  87.   void execCREATE_TRIG_CONF(Signal* signal);
  88.   void execDROP_TRIG_REF(Signal* signal);
  89.   void execDROP_TRIG_CONF(Signal* signal);
  90.   
  91.   /**
  92.    * continueb
  93.    */
  94.   void execCONTINUEB(Signal* signal);
  95. public:
  96.   typedef DataBuffer<15> TableList;
  97.   
  98.   union FragmentDescriptor { 
  99.     struct  {
  100.       Uint16 m_fragmentNo;
  101.       Uint16 m_nodeId;
  102.     } m_fragDesc;
  103.     Uint32 m_dummy;
  104.   };
  105.   
  106.   /**
  107.    * Used when sending SCAN_FRAG
  108.    */
  109.   union AttributeDescriptor {
  110.     struct {
  111.       Uint16 attrId;
  112.       Uint16 unused;
  113.     } m_attrDesc;
  114.     Uint32 m_dummy;
  115.   };
  116.   struct Table {
  117.     Table() { m_tableId = ~0; }
  118.     void release(SumaParticipant&);
  119.     union { Uint32 m_tableId; Uint32 key; };
  120.     Uint32 m_schemaVersion;
  121.     Uint32 m_hasTriggerDefined[3]; // Insert/Update/Delete
  122.     Uint32 m_triggerIds[3]; // Insert/Update/Delete
  123.     
  124.     /**
  125.      * Default order in which to ask for attributes during scan
  126.      *   1) Fixed, not nullable
  127.      *   2) Rest
  128.      */
  129.     DataBuffer<15>::Head m_attributes; // Attribute id's
  130.     
  131.     /**
  132.      * Fragments
  133.      */
  134.     DataBuffer<15>::Head m_fragments;  // Fragment descriptors
  135.     
  136.     /**
  137.      * Hash table stuff
  138.      */
  139.     Uint32 nextHash;
  140.     union { Uint32 prevHash; Uint32 nextPool; };
  141.     Uint32 hashValue() const {
  142.       return m_tableId;
  143.     }
  144.     bool equal(const Table& rec) const {
  145.       return m_tableId == rec.m_tableId;
  146.     }
  147.   };
  148.   typedef Ptr<Table> TablePtr;
  149.   /**
  150.    * Subscriptions
  151.    */
  152.   struct SyncRecord {
  153.     SyncRecord(SumaParticipant& s, DataBuffer<15>::DataBufferPool & p)
  154.       : m_locked(false), m_tableList(p), suma(s)
  155. #ifdef ERROR_INSERT
  156. , cerrorInsert(s.cerrorInsert)
  157. #endif
  158.     {}
  159.     
  160.     void release();
  161.     Uint32 m_subscriptionPtrI;
  162.     bool   m_locked;
  163.     bool   m_doSendSyncData;
  164.     bool   m_error;
  165.     TableList m_tableList;    // Tables to sync (snapshoted at beginning)
  166.     TableList::DataBufferIterator m_tableList_it;
  167.     /**
  168.      * Sync meta
  169.      */
  170.     void startMeta(Signal*);
  171.     void nextMeta(Signal*);
  172.     void completeMeta(Signal*);
  173.     
  174.     /**
  175.      * Create triggers
  176.      */
  177.     Uint32 m_latestTriggerId;
  178.     void startTrigger(Signal* signal);
  179.     void nextTrigger(Signal* signal);
  180.     void completeTrigger(Signal* signal);
  181.     void createAttributeMask(AttributeMask&, Table*);
  182.     
  183.     /**
  184.      * Drop triggers
  185.      */
  186.     void startDropTrigger(Signal* signal);
  187.     void nextDropTrigger(Signal* signal);
  188.     void completeDropTrigger(Signal* signal);
  189.     /**
  190.      * Sync data
  191.      */
  192.     Uint32 m_currentTable;          // Index in m_tableList
  193.     Uint32 m_currentFragment;       // Index in tabPtr.p->m_fragments
  194.     DataBuffer<15>::Head m_attributeList; // Attribute if other than default
  195.     DataBuffer<15>::Head m_tabList; // tables if other than default
  196.     
  197.     Uint32 m_currentTableId;        // Current table
  198.     Uint32 m_currentNoOfAttributes; // No of attributes for current table
  199.     void startScan(Signal*);
  200.     void nextScan(Signal*);
  201.     bool getNextFragment(TablePtr * tab, FragmentDescriptor * fd);
  202.     void completeScan(Signal*);
  203.     SumaParticipant & suma;
  204. #ifdef ERROR_INSERT
  205.     UintR &cerrorInsert;
  206. #endif
  207.     BlockNumber number() const { return suma.number(); }
  208.     void progError(int line, int cause, const char * extra) { 
  209.       suma.progError(line, cause, extra); 
  210.     }
  211.     
  212.     void runLIST_TABLES_CONF(Signal* signal);
  213.     void runGET_TABINFO_CONF(Signal* signal);    
  214.     void runGET_TABINFOREF(Signal* signal);
  215.     
  216.     void runDI_FCOUNTCONF(Signal* signal);
  217.     void runDIGETPRIMCONF(Signal* signal);
  218.     void runCREATE_TRIG_CONF(Signal* signal);
  219.     void runDROP_TRIG_CONF(Signal* signal);
  220.     void runDROP_TRIG_REF(Signal* signal);
  221.     void runDropTrig(Signal* signal, Uint32 triggerId, Uint32 tableId);
  222.     union { Uint32 nextPool; Uint32 nextList; Uint32 ptrI; };
  223.   };
  224.   friend struct SyncRecord;
  225.   
  226.   struct Subscription {
  227.     Uint32 m_subscriberRef;
  228.     Uint32 m_subscriberData;
  229.     Uint32 m_senderRef;
  230.     Uint32 m_senderData;
  231.     Uint32 m_subscriptionId;
  232.     Uint32 m_subscriptionKey;
  233.     Uint32 m_subscriptionType;
  234.     Uint32 m_coordinatorRef;
  235.     Uint32 m_syncPtrI;  // Active sync operation
  236.     Uint32 m_nSubscribers;
  237.     bool m_markRemove;
  238.     Uint32 nextHash;
  239.     union { Uint32 prevHash; Uint32 nextPool; };
  240.     Uint32 hashValue() const {
  241.       return m_subscriptionId + m_subscriptionKey;
  242.     }
  243.     bool equal(const Subscription & s) const {
  244.       return 
  245. m_subscriptionId == s.m_subscriptionId && 
  246. m_subscriptionKey == s.m_subscriptionKey;
  247.     }
  248.     /**
  249.      * The following holds the table names of tables included 
  250.      * in the subscription.
  251.      */
  252.     // TODO we've got to fix this, this is to inefficient. Tomas
  253.     char m_tables[MAX_TABLES];
  254. #if 0
  255.     char m_tableNames[MAX_TABLES][MAX_TAB_NAME_SIZE];
  256. #endif
  257.     /**
  258.      * "Iterator" used to iterate through m_tableNames
  259.      */
  260.     Uint32 m_maxTables;
  261.     Uint32 m_currentTable;
  262.   };
  263.   typedef Ptr<Subscription> SubscriptionPtr;
  264.   
  265.   struct Subscriber {
  266.     Uint32 m_senderRef;
  267.     Uint32 m_senderData;
  268.     Uint32 m_subscriberRef;
  269.     Uint32 m_subscriberData;
  270.     Uint32 m_subPtrI; //reference to subscription
  271.     Uint32 m_firstGCI; // first GCI to send
  272.     Uint32 m_lastGCI; // last acnowledged GCI
  273.     Uint32 nextList;
  274.     union { Uint32 nextPool; Uint32 prevList; };
  275.   };
  276.   typedef Ptr<Subscriber> SubscriberPtr;
  277.   struct Bucket {
  278.     bool active;
  279.     bool handover;
  280.     bool handover_started;
  281.     Uint32 handoverGCI;
  282.   };
  283. #define NO_OF_BUCKETS 24
  284.   struct Bucket c_buckets[NO_OF_BUCKETS];
  285.   bool c_handoverToDo;
  286.   Uint32 c_lastCompleteGCI;
  287.   /**
  288.    * 
  289.    */
  290.   DLList<Subscriber> c_metaSubscribers;
  291.   DLList<Subscriber> c_dataSubscribers;
  292.   DLList<Subscriber> c_prepDataSubscribers;
  293.   DLList<Subscriber> c_removeDataSubscribers;
  294.   /**
  295.    * Lists
  296.    */
  297.   KeyTable<Table> c_tables;
  298.   DLHashTable<Subscription> c_subscriptions;
  299.   
  300.   /**
  301.    * Pools
  302.    */
  303.   ArrayPool<Subscriber> c_subscriberPool;
  304.   ArrayPool<Table> c_tablePool_;
  305.   ArrayPool<Subscription> c_subscriptionPool;
  306.   ArrayPool<SyncRecord> c_syncPool;
  307.   DataBuffer<15>::DataBufferPool c_dataBufferPool;
  308.   /**
  309.    * for restarting Suma not to start sending data too early
  310.    */
  311.   bool c_restartLock;
  312.   /**
  313.    * for flagging that a GCI containg inconsistent data
  314.    * typically due to node failiure
  315.    */
  316.   Uint32 c_lastInconsistentGCI;
  317.   Uint32 c_nodeFailGCI;
  318.   NodeBitmask c_failedApiNodes;
  319.   
  320.   /**
  321.    * Functions
  322.    */
  323.   bool removeSubscribersOnNode(Signal *signal, Uint32 nodeId);
  324.   bool parseTable(Signal* signal, class GetTabInfoConf* conf, Uint32 tableId,
  325.   SyncRecord* syncPtr_p);
  326.   bool checkTableTriggers(SegmentedSectionPtr ptr);
  327.   void addTableId(Uint32 TableId,
  328.   SubscriptionPtr subPtr, SyncRecord *psyncRec);
  329.   void sendSubIdRef(Signal* signal, Uint32 errorCode);
  330.   void sendSubCreateConf(Signal* signal, Uint32 sender, SubscriptionPtr subPtr);  
  331.   void sendSubCreateRef(Signal* signal, const SubCreateReq& req, Uint32 errorCode);  
  332.   void sendSubStartRef(SubscriptionPtr subPtr, Signal* signal,
  333.        Uint32 errorCode, bool temporary = false);
  334.   void sendSubStartRef(Signal* signal,
  335.        Uint32 errorCode, bool temporary = false);
  336.   void sendSubStopRef(Signal* signal,
  337.       Uint32 errorCode, bool temporary = false);
  338.   void sendSubSyncRef(Signal* signal, Uint32 errorCode);  
  339.   void sendSubRemoveRef(Signal* signal, const SubRemoveReq& ref,
  340. Uint32 errorCode, bool temporary = false);
  341.   void sendSubStartComplete(Signal*, SubscriberPtr, Uint32, 
  342.     SubscriptionData::Part);
  343.   void sendSubStopComplete(Signal*, SubscriberPtr);
  344.   void sendSubStopReq(Signal* signal);
  345.   void completeSubRemoveReq(Signal* signal, SubscriptionPtr subPtr);
  346.   Uint32 getFirstGCI(Signal* signal);
  347.   Uint32 decideWhoToSend(Uint32 nBucket, Uint32 gci);
  348.   virtual Uint32 getStoreBucket(Uint32 v) = 0;
  349.   virtual Uint32 getResponsibleSumaNodeId(Uint32 D) = 0;
  350.   virtual Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true) = 0;
  351.   struct FailoverBuffer {
  352.     //    FailoverBuffer(DataBuffer<15>::DataBufferPool & p);
  353.     FailoverBuffer();
  354.     bool subTableData(Uint32 gci, Uint32 *src, int sz);
  355.     bool subGcpCompleteRep(Uint32 gci);
  356.     bool nodeFailRep();
  357.     //    typedef DataBuffer<15> GCIDataBuffer;
  358.     //    GCIDataBuffer                      m_GCIDataBuffer;
  359.     //    GCIDataBuffer::DataBufferIterator  m_GCIDataBuffer_it;
  360.     Uint32 *c_gcis;
  361.     int c_sz;
  362.     //    Uint32 *c_buf;
  363.     //    int c_buf_sz;
  364.     int c_first;
  365.     int c_next;
  366.     bool c_full;
  367.   } c_failoverBuffer;
  368.   /**
  369.    * Table admin
  370.    */
  371.   void convertNameToId( SubscriptionPtr subPtr, Signal * signal);
  372. };
  373. class Suma : public SumaParticipant {
  374.   BLOCK_DEFINES(Suma);
  375. public:
  376.   Suma(const Configuration & conf);
  377.   virtual ~Suma();
  378. private:
  379.   /**
  380.    * Public interface
  381.    */
  382.   void execCREATE_SUBSCRIPTION_REQ(Signal* signal);
  383.   void execDROP_SUBSCRIPTION_REQ(Signal* signal);
  384.   
  385.   void execSTART_SUBSCRIPTION_REQ(Signal* signal);
  386.   void execSTOP_SUBSCRIPTION_REQ(Signal* signal);
  387.   
  388.   void execSYNC_SUBSCRIPTION_REQ(Signal* signal);
  389.   void execABORT_SYNC_REQ(Signal* signal);
  390.   /**
  391.    * Framework signals
  392.    */
  393.   void getNodeGroupMembers(Signal* signal);
  394.   void execSTTOR(Signal* signal);
  395.   void sendSTTORRY(Signal*);
  396.   void execNDB_STTOR(Signal* signal);
  397.   void execDUMP_STATE_ORD(Signal* signal);
  398.   void execREAD_NODESCONF(Signal* signal);
  399.   void execNODE_FAILREP(Signal* signal);
  400.   void execINCL_NODEREQ(Signal* signal);
  401.   void execCONTINUEB(Signal* signal);
  402.   void execSIGNAL_DROPPED_REP(Signal* signal);
  403.   void execAPI_FAILREQ(Signal* signal) ;
  404.   void execSUB_GCP_COMPLETE_ACC(Signal* signal);
  405.   /**
  406.    * Controller interface
  407.    */
  408.   void execSUB_CREATE_REF(Signal* signal);
  409.   void execSUB_CREATE_CONF(Signal* signal);
  410.   void execSUB_DROP_REF(Signal* signal);
  411.   void execSUB_DROP_CONF(Signal* signal);
  412.   void execSUB_START_REF(Signal* signal);
  413.   void execSUB_START_CONF(Signal* signal);
  414.   void execSUB_STOP_REF(Signal* signal);
  415.   void execSUB_STOP_CONF(Signal* signal);
  416.   void execSUB_SYNC_REF(Signal* signal);
  417.   void execSUB_SYNC_CONF(Signal* signal);
  418.   
  419.   void execSUB_ABORT_SYNC_REF(Signal* signal);
  420.   void execSUB_ABORT_SYNC_CONF(Signal* signal);
  421.   void execSUMA_START_ME(Signal* signal);
  422.   void execSUMA_HANDOVER_REQ(Signal* signal);
  423.   void execSUMA_HANDOVER_CONF(Signal* signal);
  424.   /**
  425.    * Subscription generation interface
  426.    */
  427.   void createSequence(Signal* signal);
  428.   void createSequenceReply(Signal* signal,
  429.    UtilSequenceConf* conf,
  430.    UtilSequenceRef* ref);
  431.   void execUTIL_SEQUENCE_CONF(Signal* signal);  
  432.   void execUTIL_SEQUENCE_REF(Signal* signal);
  433.   void execCREATE_SUBID_REQ(Signal* signal);
  434.   
  435.   Uint32 getStoreBucket(Uint32 v);
  436.   Uint32 getResponsibleSumaNodeId(Uint32 D);
  437.   /**
  438.    * for Suma that is restarting another
  439.    */
  440.   struct Restart {
  441.     Restart(Suma& s);
  442.     Suma & suma;
  443.     bool c_okToStart[MAX_REPLICAS];
  444.     bool c_waitingToStart[MAX_REPLICAS];
  445.     DLHashTable<SumaParticipant::Subscription>::Iterator c_subPtr; // TODO  [MAX_REPLICAS] 
  446.     SubscriberPtr c_subbPtr; // TODO [MAX_REPLICAS] 
  447.     void progError(int line, int cause, const char * extra) { 
  448.       suma.progError(line, cause, extra); 
  449.     }
  450.     void resetNode(Uint32 sumaRef);
  451.     void runSUMA_START_ME(Signal*, Uint32 sumaRef);
  452.     void startNode(Signal*, Uint32 sumaRef);
  453.     void createSubscription(Signal* signal, Uint32 sumaRef);
  454.     void nextSubscription(Signal* signal, Uint32 sumaRef);
  455.     void completeSubscription(Signal* signal, Uint32 sumaRef);
  456.     void startSync(Signal* signal, Uint32 sumaRef);
  457.     void nextSync(Signal* signal, Uint32 sumaRef);
  458.     void completeSync(Signal* signal, Uint32 sumaRef);
  459.     void sendSubStartReq(SubscriptionPtr subPtr, SubscriberPtr subbPtr,
  460.  Signal* signal, Uint32 sumaRef);
  461.     void startSubscriber(Signal* signal, Uint32 sumaRef);
  462.     void nextSubscriber(Signal* signal, Uint32 sumaRef);
  463.     void completeSubscriber(Signal* signal, Uint32 sumaRef);
  464.     void completeRestartingNode(Signal* signal, Uint32 sumaRef);
  465.   } Restart;
  466. private:
  467.   friend class Restart;
  468.   struct SubCoordinator {
  469.     Uint32 m_subscriberRef;
  470.     Uint32 m_subscriberData;
  471.     
  472.     Uint32 m_subscriptionId;
  473.     Uint32 m_subscriptionKey;
  474.     
  475.     NdbNodeBitmask m_participants;
  476.     
  477.     Uint32 m_outstandingGsn;
  478.     SignalCounter m_outstandingRequests;
  479.     
  480.     Uint32 nextList;
  481.     union { Uint32 prevList; Uint32 nextPool; };
  482.   };
  483.   Ptr<SubCoordinator> SubCoordinatorPtr;
  484.   
  485.   struct Node {
  486.     Uint32 nodeId;
  487.     Uint32 alive;
  488.     Uint32 nextList;
  489.     union { Uint32 prevList; Uint32 nextPool; };
  490.   };
  491.   typedef Ptr<Node> NodePtr;
  492.   /**
  493.    * Variables
  494.    */
  495.   NodeId c_masterNodeId;
  496.   SLList<Node> c_nodes;
  497.   NdbNodeBitmask c_aliveNodes;
  498.   NdbNodeBitmask c_preparingNodes;
  499.   Uint32 RtoI(Uint32 sumaRef, bool dieOnNotFound = true);
  500.   /**
  501.    * for all Suma's to keep track of other Suma's in Node group
  502.    */
  503.   Uint32 c_nodeGroup;
  504.   Uint32 c_noNodesInGroup;
  505.   Uint32 c_idInNodeGroup;
  506.   NodeId c_nodesInGroup[MAX_REPLICAS];
  507.   /**
  508.    * don't seem to be used
  509.    */
  510.   ArrayPool<Node> c_nodePool;
  511.   ArrayPool<SubCoordinator> c_subCoordinatorPool;
  512.   DLList<SubCoordinator> c_runningSubscriptions;
  513. };
  514. inline Uint32
  515. Suma::RtoI(Uint32 sumaRef, bool dieOnNotFound) {
  516.   for (Uint32 i = 0; i < c_noNodesInGroup; i++) {
  517.     if (sumaRef == calcSumaBlockRef(c_nodesInGroup[i]))
  518.       return i;
  519.   }
  520.   ndbrequire(!dieOnNotFound);
  521.   return RNIL;
  522. }
  523. #endif