TransporterFacade.cpp
上传用户:romrleung
上传日期:2022-05-23
资源大小:18897k
文件大小:31k
源码类别:
MySQL数据库
开发平台:
Visual C++
- /* Copyright (C) 2003 MySQL AB
- This program is free software; you can redistribute it and/or modify
- it under the terms of the GNU General Public License as published by
- the Free Software Foundation; either version 2 of the License, or
- (at your option) any later version.
- This program is distributed in the hope that it will be useful,
- but WITHOUT ANY WARRANTY; without even the implied warranty of
- MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- GNU General Public License for more details.
- You should have received a copy of the GNU General Public License
- along with this program; if not, write to the Free Software
- Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
- #include <ndb_global.h>
- #include <my_pthread.h>
- #include <ndb_limits.h>
- #include "TransporterFacade.hpp"
- #include "ClusterMgr.hpp"
- #include <IPCConfig.hpp>
- #include <TransporterCallback.hpp>
- #include <TransporterRegistry.hpp>
- #include "NdbApiSignal.hpp"
- #include <NdbOut.hpp>
- #include <NdbEnv.h>
- #include <NdbSleep.h>
- #include "API.hpp"
- #include <ConfigRetriever.hpp>
- #include <mgmapi_config_parameters.h>
- #include <mgmapi_configuration.hpp>
- #include <NdbConfig.h>
- #include <ndb_version.h>
- #include <SignalLoggerManager.hpp>
- #include <kernel/ndb_limits.h>
- #include <signaldata/AlterTable.hpp>
- //#define REPORT_TRANSPORTER
- //#define API_TRACE;
- static int numberToIndex(int number)
- {
- return number - MIN_API_BLOCK_NO;
- }
- static int indexToNumber(int index)
- {
- return index + MIN_API_BLOCK_NO;
- }
- #if defined DEBUG_TRANSPORTER
- #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
- #else
- #define TRP_DEBUG(t)
- #endif
- TransporterFacade* TransporterFacade::theFacadeInstance = NULL;
- /*****************************************************************************
- * Call back functions
- *****************************************************************************/
- void
- reportError(void * callbackObj, NodeId nodeId, TransporterError errorCode){
- #ifdef REPORT_TRANSPORTER
- ndbout_c("REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d)",
- (int)nodeId, (int)errorCode);
- #endif
- if(errorCode & 0x8000) {
- ndbout_c("reportError (%d, %d)n", (int)nodeId, (int)errorCode);
- ((TransporterFacade*)(callbackObj))->doDisconnect(nodeId);
- }
- }
- /**
- * Report average send length in bytes (4096 last sends)
- */
- void
- reportSendLen(void * callbackObj, NodeId nodeId, Uint32 count, Uint64 bytes){
- #ifdef REPORT_TRANSPORTER
- ndbout_c("REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
- (int)nodeId, (Uint32)(bytes/count));
- #endif
- (void)nodeId;
- (void)count;
- (void)bytes;
- }
- /**
- * Report average receive length in bytes (4096 last receives)
- */
- void
- reportReceiveLen(void * callbackObj,
- NodeId nodeId, Uint32 count, Uint64 bytes){
- #ifdef REPORT_TRANSPORTER
- ndbout_c("REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
- (int)nodeId, (Uint32)(bytes/count));
- #endif
- (void)nodeId;
- (void)count;
- (void)bytes;
- }
- /**
- * Report connection established
- */
- void
- reportConnect(void * callbackObj, NodeId nodeId){
- #ifdef REPORT_TRANSPORTER
- ndbout_c("REPORT_TRANSP: API reportConnect (nodeId=%d)", (int)nodeId);
- #endif
- ((TransporterFacade*)(callbackObj))->reportConnected(nodeId);
- // TransporterFacade::instance()->reportConnected(nodeId);
- }
- /**
- * Report connection broken
- */
- void
- reportDisconnect(void * callbackObj, NodeId nodeId, Uint32 error){
- #ifdef REPORT_TRANSPORTER
- ndbout_c("REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (int)nodeId);
- #endif
- ((TransporterFacade*)(callbackObj))->reportDisconnected(nodeId);
- //TransporterFacade::instance()->reportDisconnected(nodeId);
- }
- /****************************************************************************
- *
- *****************************************************************************/
- /**
- * Report connection broken
- */
- int checkJobBuffer() {
- return 0;
- }
- #ifdef API_TRACE
- static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";
- static const char * apiSignalLog = 0;
- static SignalLoggerManager signalLogger;
- static
- inline
- bool
- setSignalLog(){
- signalLogger.flushSignalLog();
- const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (char *)0, 0);
- if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
- return true;
- } else if(tmp == 0 && apiSignalLog == 0){
- return false;
- } else if(tmp == 0 && apiSignalLog != 0){
- signalLogger.setOutputStream(0);
- apiSignalLog = tmp;
- return false;
- } else if(tmp !=0){
- if (strcmp(tmp, "-") == 0)
- signalLogger.setOutputStream(stdout);
- #ifndef DBUG_OFF
- else if (strcmp(tmp, "+") == 0)
- signalLogger.setOutputStream(DBUG_FILE);
- #endif
- else
- signalLogger.setOutputStream(fopen(tmp, "w"));
- apiSignalLog = tmp;
- return true;
- }
- return false;
- }
- #ifdef TRACE_APIREGREQ
- #define TRACE_GSN(gsn) true
- #else
- #define TRACE_GSN(gsn) (gsn != GSN_API_REGREQ && gsn != GSN_API_REGCONF)
- #endif
- #endif
- /**
- * The execute function : Handle received signal
- */
- void
- execute(void * callbackObj, SignalHeader * const header,
- Uint8 prio, Uint32 * const theData,
- LinearSectionPtr ptr[3]){
- TransporterFacade * theFacade = (TransporterFacade*)callbackObj;
- TransporterFacade::ThreadData::Object_Execute oe;
- Uint32 tRecBlockNo = header->theReceiversBlockNumber;
- #ifdef API_TRACE
- if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
- signalLogger.executeSignal(* header,
- prio,
- theData,
- theFacade->ownId(),
- ptr, header->m_noOfSections);
- signalLogger.flushSignalLog();
- }
- #endif
- if (tRecBlockNo >= MIN_API_BLOCK_NO) {
- oe = theFacade->m_threads.get(tRecBlockNo);
- if (oe.m_object != 0 && oe.m_executeFunction != 0) {
- /**
- * Handle received signal immediately to avoid any unnecessary
- * copying of data, allocation of memory and other things. Copying
- * of data could be interesting to support several priority levels
- * and to support a special memory structure when executing the
- * signals. Neither of those are interesting when receiving data
- * in the NDBAPI. The NDBAPI will thus read signal data directly as
- * it was written by the sender (SCI sender is other node, Shared
- * memory sender is other process and TCP/IP sender is the OS that
- * writes the TCP/IP message into a message buffer).
- */
- NdbApiSignal tmpSignal(*header);
- NdbApiSignal * tSignal = &tmpSignal;
- tSignal->setDataPtr(theData);
- (* oe.m_executeFunction) (oe.m_object, tSignal, ptr);
- }//if
- } else if (tRecBlockNo == API_PACKED) {
- /**
- * Block number == 2047 is used to signal a signal that consists of
- * multiple instances of the same signal. This is an effort to
- * package the signals so as to avoid unnecessary communication
- * overhead since TCP/IP has a great performance impact.
- */
- Uint32 Tlength = header->theLength;
- Uint32 Tsent = 0;
- /**
- * Since it contains at least two data packets we will first
- * copy the signal data to safe place.
- */
- while (Tsent < Tlength) {
- Uint32 Theader = theData[Tsent];
- Tsent++;
- Uint32 TpacketLen = (Theader & 0x1F) + 3;
- tRecBlockNo = Theader >> 16;
- if (TpacketLen <= 25) {
- if ((TpacketLen + Tsent) <= Tlength) {
- /**
- * Set the data length of the signal and the receivers block
- * reference and then call the API.
- */
- header->theLength = TpacketLen;
- header->theReceiversBlockNumber = tRecBlockNo;
- Uint32* tDataPtr = &theData[Tsent];
- Tsent += TpacketLen;
- if (tRecBlockNo >= MIN_API_BLOCK_NO) {
- oe = theFacade->m_threads.get(tRecBlockNo);
- if(oe.m_object != 0 && oe.m_executeFunction != 0){
- NdbApiSignal tmpSignal(*header);
- NdbApiSignal * tSignal = &tmpSignal;
- tSignal->setDataPtr(tDataPtr);
- (*oe.m_executeFunction)(oe.m_object, tSignal, 0);
- }
- }
- }
- }
- }
- return;
- } else if (tRecBlockNo == API_CLUSTERMGR) {
- /**
- * The signal was aimed for the Cluster Manager.
- * We handle it immediately here.
- */
- ClusterMgr * clusterMgr = theFacade->theClusterMgr;
- const Uint32 gsn = header->theVerId_signalNumber;
- switch (gsn){
- case GSN_API_REGREQ:
- clusterMgr->execAPI_REGREQ(theData);
- break;
- case GSN_API_REGCONF:
- clusterMgr->execAPI_REGCONF(theData);
- break;
- case GSN_API_REGREF:
- clusterMgr->execAPI_REGREF(theData);
- break;
- case GSN_NODE_FAILREP:
- clusterMgr->execNODE_FAILREP(theData);
- break;
- case GSN_NF_COMPLETEREP:
- clusterMgr->execNF_COMPLETEREP(theData);
- break;
- case GSN_ARBIT_STARTREQ:
- if (theFacade->theArbitMgr != NULL)
- theFacade->theArbitMgr->doStart(theData);
- break;
- case GSN_ARBIT_CHOOSEREQ:
- if (theFacade->theArbitMgr != NULL)
- theFacade->theArbitMgr->doChoose(theData);
- break;
- case GSN_ARBIT_STOPORD:
- if(theFacade->theArbitMgr != NULL)
- theFacade->theArbitMgr->doStop(theData);
- break;
- case GSN_ALTER_TABLE_REP:
- {
- const AlterTableRep* rep = (const AlterTableRep*)theData;
- theFacade->m_globalDictCache.lock();
- theFacade->m_globalDictCache.
- alter_table_rep((const char*)ptr[0].p,
- rep->tableId,
- rep->tableVersion,
- rep->changeType == AlterTableRep::CT_ALTERED);
- theFacade->m_globalDictCache.unlock();
- }
- default:
- break;
- }
- return;
- } else {
- ; // Ignore all other block numbers.
- if(header->theVerId_signalNumber!=3) {
- TRP_DEBUG( "TransporterFacade received signal to unknown block no." );
- ndbout << "BLOCK NO: " << tRecBlockNo << " sig "
- << header->theVerId_signalNumber << endl;
- abort();
- }
- }
- }
- // These symbols are needed, but not used in the API
- void
- SignalLoggerManager::printSegmentedSection(FILE *, const SignalHeader &,
- const SegmentedSectionPtr ptr[3],
- unsigned i){
- abort();
- }
- void
- copy(Uint32 * & insertPtr,
- class SectionSegmentPool & thePool, const SegmentedSectionPtr & _ptr){
- abort();
- }
- /**
- * Note that this function need no locking since its
- * only called from the constructor of Ndb (the NdbObject)
- *
- * Which is protected by a mutex
- */
- int
- TransporterFacade::start_instance(int nodeId,
- const ndb_mgm_configuration* props)
- {
- if (! theFacadeInstance->init(nodeId, props)) {
- return -1;
- }
- /**
- * Install signal handler for SIGPIPE
- *
- * This due to the fact that a socket connection might have
- * been closed in between a select and a corresponding send
- */
- #if !defined NDB_OSE && !defined NDB_SOFTOSE && !defined NDB_WIN32
- signal(SIGPIPE, SIG_IGN);
- #endif
- return 0;
- }
- /**
- * Note that this function need no locking since its
- * only called from the destructor of Ndb (the NdbObject)
- *
- * Which is protected by a mutex
- */
- void
- TransporterFacade::stop_instance(){
- DBUG_ENTER("TransporterFacade::stop_instance");
- if(theFacadeInstance)
- theFacadeInstance->doStop();
- DBUG_VOID_RETURN;
- }
- void
- TransporterFacade::doStop(){
- DBUG_ENTER("TransporterFacade::doStop");
- /**
- * First stop the ClusterMgr because it needs to send one more signal
- * and also uses theFacadeInstance to lock/unlock theMutexPtr
- */
- if (theClusterMgr != NULL) theClusterMgr->doStop();
- if (theArbitMgr != NULL) theArbitMgr->doStop(NULL);
- /**
- * Now stop the send and receive threads
- */
- void *status;
- theStopReceive = 1;
- if (theReceiveThread) {
- NdbThread_WaitFor(theReceiveThread, &status);
- NdbThread_Destroy(&theReceiveThread);
- }
- if (theSendThread) {
- NdbThread_WaitFor(theSendThread, &status);
- NdbThread_Destroy(&theSendThread);
- }
- DBUG_VOID_RETURN;
- }
- extern "C"
- void*
- runSendRequest_C(void * me)
- {
- ((TransporterFacade*) me)->threadMainSend();
- return 0;
- }
- void TransporterFacade::threadMainSend(void)
- {
- theTransporterRegistry->startSending();
- if (!theTransporterRegistry->start_clients()){
- ndbout_c("Unable to start theTransporterRegistry->start_clients");
- exit(0);
- }
- m_socket_server.startServer();
- while(!theStopReceive) {
- NdbSleep_MilliSleep(10);
- NdbMutex_Lock(theMutexPtr);
- if (sendPerformedLastInterval == 0) {
- theTransporterRegistry->performSend();
- }
- sendPerformedLastInterval = 0;
- NdbMutex_Unlock(theMutexPtr);
- }
- theTransporterRegistry->stopSending();
- m_socket_server.stopServer();
- m_socket_server.stopSessions(true);
- theTransporterRegistry->stop_clients();
- }
- extern "C"
- void*
- runReceiveResponse_C(void * me)
- {
- ((TransporterFacade*) me)->threadMainReceive();
- return 0;
- }
- void TransporterFacade::threadMainReceive(void)
- {
- theTransporterRegistry->startReceiving();
- NdbMutex_Lock(theMutexPtr);
- theTransporterRegistry->update_connections();
- NdbMutex_Unlock(theMutexPtr);
- while(!theStopReceive) {
- for(int i = 0; i<10; i++){
- const int res = theTransporterRegistry->pollReceive(10);
- if(res > 0){
- NdbMutex_Lock(theMutexPtr);
- theTransporterRegistry->performReceive();
- NdbMutex_Unlock(theMutexPtr);
- }
- }
- NdbMutex_Lock(theMutexPtr);
- theTransporterRegistry->update_connections();
- NdbMutex_Unlock(theMutexPtr);
- }//while
- theTransporterRegistry->stopReceiving();
- }
- TransporterFacade::TransporterFacade() :
- theTransporterRegistry(0),
- theStopReceive(0),
- theSendThread(NULL),
- theReceiveThread(NULL),
- m_fragmented_signal_id(0)
- {
- DBUG_ENTER("TransporterFacade::TransporterFacade");
- theOwnId = 0;
- theMutexPtr = NdbMutex_Create();
- sendPerformedLastInterval = 0;
- checkCounter = 4;
- currentSendLimit = 1;
- theClusterMgr = NULL;
- theArbitMgr = NULL;
- theStartNodeId = 1;
- m_scan_batch_size= MAX_SCAN_BATCH_SIZE;
- m_batch_byte_size= SCAN_BATCH_SIZE;
- m_batch_size= DEF_BATCH_SIZE;
- m_max_trans_id = 0;
- theClusterMgr = new ClusterMgr(* this);
- DBUG_VOID_RETURN;
- }
- bool
- TransporterFacade::init(Uint32 nodeId, const ndb_mgm_configuration* props)
- {
- DBUG_ENTER("TransporterFacade::init");
- theOwnId = nodeId;
- theTransporterRegistry = new TransporterRegistry(this);
- const int res = IPCConfig::configureTransporters(nodeId,
- * props,
- * theTransporterRegistry);
- if(res <= 0){
- TRP_DEBUG( "configureTransporters returned 0 or less" );
- DBUG_RETURN(false);
- }
- ndb_mgm_configuration_iterator iter(* props, CFG_SECTION_NODE);
- iter.first();
- theClusterMgr->init(iter);
- /**
- * Unless there is a "Name", the initiated transporter is within
- * an NDB Cluster. (If "Name" is defined, then the transporter
- * is used to connect to a different system, i.e. NDB Cluster.)
- */
- #if 0
- if (!props->contains("Name")) {
- #endif
- iter.first();
- if(iter.find(CFG_NODE_ID, nodeId)){
- TRP_DEBUG( "Node info missing from config." );
- DBUG_RETURN(false);
- }
- Uint32 rank = 0;
- if(!iter.get(CFG_NODE_ARBIT_RANK, &rank) && rank>0){
- theArbitMgr = new ArbitMgr(* this);
- theArbitMgr->setRank(rank);
- Uint32 delay = 0;
- iter.get(CFG_NODE_ARBIT_DELAY, &delay);
- theArbitMgr->setDelay(delay);
- }
- Uint32 scan_batch_size= 0;
- if (!iter.get(CFG_MAX_SCAN_BATCH_SIZE, &scan_batch_size)) {
- m_scan_batch_size= scan_batch_size;
- }
- Uint32 batch_byte_size= 0;
- if (!iter.get(CFG_BATCH_BYTE_SIZE, &batch_byte_size)) {
- m_batch_byte_size= batch_byte_size;
- }
- Uint32 batch_size= 0;
- if (!iter.get(CFG_BATCH_SIZE, &batch_size)) {
- m_batch_size= batch_size;
- }
- #if 0
- }
- #endif
- if (!theTransporterRegistry->start_service(m_socket_server)){
- ndbout_c("Unable to start theTransporterRegistry->start_service");
- DBUG_RETURN(false);
- }
- theReceiveThread = NdbThread_Create(runReceiveResponse_C,
- (void**)this,
- 32768,
- "ndb_receive",
- NDB_THREAD_PRIO_LOW);
- theSendThread = NdbThread_Create(runSendRequest_C,
- (void**)this,
- 32768,
- "ndb_send",
- NDB_THREAD_PRIO_LOW);
- theClusterMgr->startThread();
- #ifdef API_TRACE
- signalLogger.logOn(true, 0, SignalLoggerManager::LogInOut);
- #endif
- DBUG_RETURN(true);
- }
- void
- TransporterFacade::connected()
- {
- DBUG_ENTER("TransporterFacade::connected");
- Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, numberToRef(indexToNumber(i), theOwnId), true, true);
- }
- }
- DBUG_VOID_RETURN;
- }
- void
- TransporterFacade::ReportNodeDead(NodeId tNodeId)
- {
- /**
- * When a node fails we must report this to each Ndb object.
- * The function that is used for communicating node failures is called.
- * This is to ensure that the Ndb objects do not think their connections
- * are correct after a failure followed by a restart.
- * After the restart the node is up again and the Ndb object
- * might not have noticed the failure.
- */
- Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, false, false);
- }
- }
- }
- void
- TransporterFacade::ReportNodeFailureComplete(NodeId tNodeId)
- {
- /**
- * When a node fails we must report this to each Ndb object.
- * The function that is used for communicating node failures is called.
- * This is to ensure that the Ndb objects do not think their connections
- * are correct after a failure followed by a restart.
- * After the restart the node is up again and the Ndb object
- * might not have noticed the failure.
- */
- DBUG_ENTER("TransporterFacade::ReportNodeFailureComplete");
- DBUG_PRINT("enter",("nodeid= %d", tNodeId));
- Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, false, true);
- }
- }
- DBUG_VOID_RETURN;
- }
- void
- TransporterFacade::ReportNodeAlive(NodeId tNodeId)
- {
- /**
- * When a node fails we must report this to each Ndb object.
- * The function that is used for communicating node failures is called.
- * This is to ensure that the Ndb objects do not think there connections
- * are correct after a failure
- * followed by a restart.
- * After the restart the node is up again and the Ndb object
- * might not have noticed the failure.
- */
- Uint32 sz = m_threads.m_statusNext.size();
- for (Uint32 i = 0; i < sz ; i ++) {
- if (m_threads.getInUse(i)){
- void * obj = m_threads.m_objectExecute[i].m_object;
- NodeStatusFunction RegPC = m_threads.m_statusFunction[i];
- (*RegPC) (obj, tNodeId, true, false);
- }
- }
- }
- int
- TransporterFacade::close(BlockNumber blockNumber, Uint64 trans_id)
- {
- NdbMutex_Lock(theMutexPtr);
- Uint32 low_bits = (Uint32)trans_id;
- m_max_trans_id = m_max_trans_id > low_bits ? m_max_trans_id : low_bits;
- close_local(blockNumber);
- NdbMutex_Unlock(theMutexPtr);
- return 0;
- }
- int
- TransporterFacade::close_local(BlockNumber blockNumber){
- m_threads.close(blockNumber);
- return 0;
- }
- int
- TransporterFacade::open(void* objRef,
- ExecuteFunction fun,
- NodeStatusFunction statusFun)
- {
- DBUG_ENTER("TransporterFacade::open");
- int r= m_threads.open(objRef, fun, statusFun);
- if (r < 0)
- DBUG_RETURN(r);
- #if 1
- if (theOwnId > 0) {
- (*statusFun)(objRef, numberToRef(r, theOwnId), true, true);
- }
- #endif
- DBUG_RETURN(r);
- }
- TransporterFacade::~TransporterFacade()
- {
- DBUG_ENTER("TransporterFacade::~TransporterFacade");
- NdbMutex_Lock(theMutexPtr);
- delete theClusterMgr;
- delete theArbitMgr;
- delete theTransporterRegistry;
- NdbMutex_Unlock(theMutexPtr);
- NdbMutex_Destroy(theMutexPtr);
- #ifdef API_TRACE
- signalLogger.setOutputStream(0);
- #endif
- DBUG_VOID_RETURN;
- }
- void
- TransporterFacade::calculateSendLimit()
- {
- Uint32 Ti;
- Uint32 TthreadCount = 0;
- Uint32 sz = m_threads.m_statusNext.size();
- for (Ti = 0; Ti < sz; Ti++) {
- if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
- TthreadCount++;
- m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
- }
- }
- currentSendLimit = TthreadCount;
- if (currentSendLimit == 0) {
- currentSendLimit = 1;
- }
- checkCounter = currentSendLimit << 2;
- }
- //-------------------------------------------------
- // Force sending but still report the sending to the
- // adaptive algorithm.
- //-------------------------------------------------
- void TransporterFacade::forceSend(Uint32 block_number) {
- checkCounter--;
- m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
- sendPerformedLastInterval = 1;
- if (checkCounter < 0) {
- calculateSendLimit();
- }
- theTransporterRegistry->forceSendCheck(0);
- }
- //-------------------------------------------------
- // Improving API performance
- //-------------------------------------------------
- void
- TransporterFacade::checkForceSend(Uint32 block_number) {
- m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
- //-------------------------------------------------
- // This code is an adaptive algorithm to discover when
- // the API should actually send its buffers. The reason
- // is that the performance is highly dependent on the
- // size of the writes over the communication network.
- // Thus we try to ensure that the send size is as big
- // as possible. At the same time we don't want response
- // time to increase so therefore we have to keep track of
- // how the users are performing adaptively.
- //-------------------------------------------------
- if (theTransporterRegistry->forceSendCheck(currentSendLimit) == 1) {
- sendPerformedLastInterval = 1;
- }
- checkCounter--;
- if (checkCounter < 0) {
- calculateSendLimit();
- }
- }
- /******************************************************************************
- * SEND SIGNAL METHODS
- *****************************************************************************/
- int
- TransporterFacade::sendSignal(NdbApiSignal * aSignal, NodeId aNode){
- Uint32* tDataPtr = aSignal->getDataPtrSend();
- Uint32 Tlen = aSignal->theLength;
- Uint32 TBno = aSignal->theReceiversBlockNumber;
- if(getIsNodeSendable(aNode) == true){
- #ifdef API_TRACE
- if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
- Uint32 tmp = aSignal->theSendersBlockRef;
- aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
- LinearSectionPtr ptr[3];
- signalLogger.sendSignal(* aSignal,
- 1,
- aSignal->getDataPtr(),
- aNode, ptr, 0);
- signalLogger.flushSignalLog();
- aSignal->theSendersBlockRef = tmp;
- }
- #endif
- if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
- SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
- 1, // JBB
- tDataPtr,
- aNode,
- 0);
- //if (ss != SEND_OK) ndbout << ss << endl;
- return (ss == SEND_OK ? 0 : -1);
- } else {
- ndbout << "ERR: SigLen = " << Tlen << " BlockRec = " << TBno;
- ndbout << " SignalNo = " << aSignal->theVerId_signalNumber << endl;
- assert(0);
- }//if
- }
- //const ClusterMgr::Node & node = theClusterMgr->getNodeInfo(aNode);
- //const Uint32 startLevel = node.m_state.startLevel;
- return -1; // Node Dead
- }
- int
- TransporterFacade::sendSignalUnCond(NdbApiSignal * aSignal, NodeId aNode){
- #ifdef API_TRACE
- if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
- Uint32 tmp = aSignal->theSendersBlockRef;
- aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
- LinearSectionPtr ptr[3];
- signalLogger.sendSignal(* aSignal,
- 0,
- aSignal->getDataPtr(),
- aNode, ptr, 0);
- signalLogger.flushSignalLog();
- aSignal->theSendersBlockRef = tmp;
- }
- #endif
- assert((aSignal->theLength != 0) &&
- (aSignal->theLength <= 25) &&
- (aSignal->theReceiversBlockNumber != 0));
- SendStatus ss = theTransporterRegistry->prepareSend(aSignal,
- 0,
- aSignal->getDataPtr(),
- aNode,
- 0);
- return (ss == SEND_OK ? 0 : -1);
- }
- #define CHUNK_SZ NDB_SECTION_SEGMENT_SZ*64 // related to MAX_MESSAGE_SIZE
- int
- TransporterFacade::sendFragmentedSignal(NdbApiSignal* aSignal, NodeId aNode,
- LinearSectionPtr ptr[3], Uint32 secs)
- {
- if(getIsNodeSendable(aNode) != true)
- return -1;
- #ifdef API_TRACE
- if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
- Uint32 tmp = aSignal->theSendersBlockRef;
- aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
- signalLogger.sendSignal(* aSignal,
- 1,
- aSignal->getDataPtrSend(),
- aNode,
- ptr, secs);
- aSignal->theSendersBlockRef = tmp;
- }
- #endif
- NdbApiSignal tmp_signal(*(SignalHeader*)aSignal);
- LinearSectionPtr tmp_ptr[3];
- Uint32 unique_id= m_fragmented_signal_id++; // next unique id
- unsigned i;
- for (i= 0; i < secs; i++)
- tmp_ptr[i]= ptr[i];
- unsigned start_i= 0;
- unsigned chunk_sz= 0;
- unsigned fragment_info= 0;
- Uint32 *tmp_data= tmp_signal.getDataPtrSend();
- for (i= 0; i < secs;) {
- unsigned save_sz= tmp_ptr[i].sz;
- tmp_data[i-start_i]= i;
- if (chunk_sz + save_sz > CHUNK_SZ) {
- // truncate
- unsigned send_sz= CHUNK_SZ - chunk_sz;
- if (i != start_i) // first piece of a new section has to be a multiple of NDB_SECTION_SEGMENT_SZ
- {
- send_sz=
- NDB_SECTION_SEGMENT_SZ
- *(send_sz+NDB_SECTION_SEGMENT_SZ-1)
- /NDB_SECTION_SEGMENT_SZ;
- if (send_sz > save_sz)
- send_sz= save_sz;
- }
- tmp_ptr[i].sz= send_sz;
- if (fragment_info < 2) // 1 = first fragment, 2 = middle fragments
- fragment_info++;
- // send tmp_signal
- tmp_data[i-start_i+1]= unique_id;
- tmp_signal.setLength(i-start_i+2);
- tmp_signal.m_fragmentInfo= fragment_info;
- tmp_signal.m_noOfSections= i-start_i+1;
- // do prepare send
- {
- SendStatus ss = theTransporterRegistry->prepareSend
- (&tmp_signal,
- 1, /*JBB*/
- tmp_data,
- aNode,
- &tmp_ptr[start_i]);
- assert(ss != SEND_MESSAGE_TOO_BIG);
- if (ss != SEND_OK) return -1;
- }
- // setup variables for next signal
- start_i= i;
- chunk_sz= 0;
- tmp_ptr[i].sz= save_sz-send_sz;
- tmp_ptr[i].p+= send_sz;
- if (tmp_ptr[i].sz == 0)
- i++;
- }
- else
- {
- chunk_sz+=save_sz;
- i++;
- }
- }
- unsigned a_sz= aSignal->getLength();
- if (fragment_info > 0) {
- // update the original signal to include section info
- Uint32 *a_data= aSignal->getDataPtrSend();
- unsigned tmp_sz= i-start_i;
- memcpy(a_data+a_sz,
- tmp_data,
- tmp_sz*sizeof(Uint32));
- a_data[a_sz+tmp_sz]= unique_id;
- aSignal->setLength(a_sz+tmp_sz+1);
- // send last fragment
- aSignal->m_fragmentInfo= 3; // 3 = last fragment
- aSignal->m_noOfSections= i-start_i;
- } else {
- aSignal->m_noOfSections= secs;
- }
- // send aSignal
- int ret;
- {
- SendStatus ss = theTransporterRegistry->prepareSend
- (aSignal,
- 1/*JBB*/,
- aSignal->getDataPtrSend(),
- aNode,
- &tmp_ptr[start_i]);
- assert(ss != SEND_MESSAGE_TOO_BIG);
- ret = (ss == SEND_OK ? 0 : -1);
- }
- aSignal->m_noOfSections = 0;
- aSignal->m_fragmentInfo = 0;
- aSignal->setLength(a_sz);
- return ret;
- }
- int
- TransporterFacade::sendSignal(NdbApiSignal* aSignal, NodeId aNode,
- LinearSectionPtr ptr[3], Uint32 secs){
- aSignal->m_noOfSections = secs;
- if(getIsNodeSendable(aNode) == true){
- #ifdef API_TRACE
- if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
- Uint32 tmp = aSignal->theSendersBlockRef;
- aSignal->theSendersBlockRef = numberToRef(tmp, theOwnId);
- signalLogger.sendSignal(* aSignal,
- 1,
- aSignal->getDataPtrSend(),
- aNode,
- ptr, secs);
- signalLogger.flushSignalLog();
- aSignal->theSendersBlockRef = tmp;
- }
- #endif
- SendStatus ss = theTransporterRegistry->prepareSend
- (aSignal,
- 1, // JBB
- aSignal->getDataPtrSend(),
- aNode,
- ptr);
- assert(ss != SEND_MESSAGE_TOO_BIG);
- aSignal->m_noOfSections = 0;
- return (ss == SEND_OK ? 0 : -1);
- }
- aSignal->m_noOfSections = 0;
- return -1;
- }
- /******************************************************************************
- * CONNECTION METHODS Etc
- ******************************************************************************/
- void
- TransporterFacade::doConnect(int aNodeId){
- theTransporterRegistry->setIOState(aNodeId, NoHalt);
- theTransporterRegistry->do_connect(aNodeId);
- }
- void
- TransporterFacade::doDisconnect(int aNodeId)
- {
- theTransporterRegistry->do_disconnect(aNodeId);
- }
- void
- TransporterFacade::reportConnected(int aNodeId)
- {
- theClusterMgr->reportConnected(aNodeId);
- return;
- }
- void
- TransporterFacade::reportDisconnected(int aNodeId)
- {
- theClusterMgr->reportDisconnected(aNodeId);
- return;
- }
- NodeId
- TransporterFacade::ownId() const
- {
- return theOwnId;
- }
- bool
- TransporterFacade::isConnected(NodeId aNodeId){
- return theTransporterRegistry->is_connected(aNodeId);
- }
- NodeId
- TransporterFacade::get_an_alive_node()
- {
- DBUG_ENTER("TransporterFacade::get_an_alive_node");
- DBUG_PRINT("enter", ("theStartNodeId: %d", theStartNodeId));
- #ifdef VM_TRACE
- const char* p = NdbEnv_GetEnv("NDB_ALIVE_NODE_ID", (char*)0, 0);
- if (p != 0 && *p != 0)
- return atoi(p);
- #endif
- NodeId i;
- for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
- if (get_node_alive(i)){
- DBUG_PRINT("info", ("Node %d is alive", i));
- theStartNodeId = ((i + 1) % MAX_NDB_NODES);
- DBUG_RETURN(i);
- }
- }
- for (i = 1; i < theStartNodeId; i++) {
- if (get_node_alive(i)){
- DBUG_PRINT("info", ("Node %d is alive", i));
- theStartNodeId = ((i + 1) % MAX_NDB_NODES);
- DBUG_RETURN(i);
- }
- }
- DBUG_RETURN((NodeId)0);
- }
- TransporterFacade::ThreadData::ThreadData(Uint32 size){
- m_firstFree = END_OF_LIST;
- expand(size);
- }
- void
- TransporterFacade::ThreadData::expand(Uint32 size){
- Object_Execute oe = { 0 ,0 };
- NodeStatusFunction fun = 0;
- const Uint32 sz = m_statusNext.size();
- m_objectExecute.fill(sz + size, oe);
- m_statusFunction.fill(sz + size, fun);
- for(Uint32 i = 0; i<size; i++){
- m_statusNext.push_back(sz + i + 1);
- }
- m_statusNext.back() = m_firstFree;
- m_firstFree = m_statusNext.size() - size;
- }
- int
- TransporterFacade::ThreadData::open(void* objRef,
- ExecuteFunction fun,
- NodeStatusFunction fun2)
- {
- Uint32 nextFree = m_firstFree;
- if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
- return -1;
- }
- if(nextFree == END_OF_LIST){
- expand(10);
- nextFree = m_firstFree;
- }
- m_firstFree = m_statusNext[nextFree];
- Object_Execute oe = { objRef , fun };
- m_statusNext[nextFree] = INACTIVE;
- m_objectExecute[nextFree] = oe;
- m_statusFunction[nextFree] = fun2;
- return indexToNumber(nextFree);
- }
- int
- TransporterFacade::ThreadData::close(int number){
- number= numberToIndex(number);
- assert(getInUse(number));
- m_statusNext[number] = m_firstFree;
- m_firstFree = number;
- Object_Execute oe = { 0, 0 };
- m_objectExecute[number] = oe;
- m_statusFunction[number] = 0;
- return 0;
- }
- template class Vector<NodeStatusFunction>;
- template class Vector<TransporterFacade::ThreadData::Object_Execute>;