18 #include <ndb_global.h>
19 #include <ndb_limits.h>
20 #include "TransporterFacade.hpp"
21 #include "trp_client.hpp"
22 #include "ClusterMgr.hpp"
23 #include <IPCConfig.hpp>
24 #include <TransporterCallback.hpp>
25 #include <TransporterRegistry.hpp>
26 #include "NdbApiSignal.hpp"
27 #include "NdbWaiter.hpp"
32 #include <kernel/GlobalSignalNumbers.h>
33 #include <mgmapi_config_parameters.h>
34 #include <mgmapi_configuration.hpp>
35 #include <NdbConfig.h>
36 #include <ndb_version.h>
37 #include <SignalLoggerManager.hpp>
38 #include <kernel/ndb_limits.h>
39 #include <signaldata/AlterTable.hpp>
40 #include <signaldata/SumaImpl.hpp>
41 #include <signaldata/AllocNodeId.hpp>
46 static int numberToIndex(
int number)
48 return number - MIN_API_BLOCK_NO;
51 static int indexToNumber(
int index)
53 return index + MIN_API_BLOCK_NO;
56 #if defined DEBUG_TRANSPORTER
57 #define TRP_DEBUG(t) ndbout << __FILE__ << ":" << __LINE__ << ":" << t << endl;
67 TransporterError errorCode,
const char *info)
69 #ifdef REPORT_TRANSPORTER
70 ndbout_c(
"REPORT_TRANSP: reportError (nodeId=%d, errorCode=%d) %s",
71 (
int)nodeId, (
int)errorCode, info ? info :
"");
73 if(errorCode & TE_DO_DISCONNECT) {
74 ndbout_c(
"reportError (%d, %d) %s", (
int)nodeId, (
int)errorCode,
86 #ifdef REPORT_TRANSPORTER
87 ndbout_c(
"REPORT_TRANSP: reportSendLen (nodeId=%d, bytes/count=%d)",
88 (
int)nodeId, (Uint32)(bytes/count));
101 #ifdef REPORT_TRANSPORTER
102 ndbout_c(
"REPORT_TRANSP: reportReceiveLen (nodeId=%d, bytes/count=%d)",
103 (
int)nodeId, (Uint32)(bytes/count));
116 #ifdef REPORT_TRANSPORTER
117 ndbout_c(
"REPORT_TRANSP: API reportConnect (nodeId=%d)", (
int)nodeId);
119 reportConnected(nodeId);
127 #ifdef REPORT_TRANSPORTER
128 ndbout_c(
"REPORT_TRANSP: API reportDisconnect (nodeId=%d)", (
int)nodeId);
130 reportDisconnected(nodeId);
/* Name of the environment variable that is looked up with NdbEnv_GetEnv()
 * when the signal-log setting is (re)evaluated. */
static const char * API_SIGNAL_LOG = "API_SIGNAL_LOG";

/* Signal-log value remembered by this file.  Starts out unset; it is
 * compared (strcmp) with the value read from the environment so that an
 * unchanged setting can be recognised. */
static const char * apiSignalLog = NULL;
160 signalLogger.flushSignalLog();
162 const char * tmp = NdbEnv_GetEnv(API_SIGNAL_LOG, (
char *)0, 0);
163 if(tmp != 0 && apiSignalLog != 0 && strcmp(tmp,apiSignalLog) == 0){
165 }
else if(tmp == 0 && apiSignalLog == 0){
167 }
else if(tmp == 0 && apiSignalLog != 0){
172 if (strcmp(tmp,
"-") == 0)
175 else if (strcmp(tmp,
"+") == 0)
187 TRACE_GSN(Uint32 gsn)
190 #ifndef TRACE_APIREGREQ
192 case GSN_API_REGCONF:
196 case GSN_SUB_GCP_COMPLETE_REP:
197 case GSN_SUB_GCP_COMPLETE_ACK:
211 Uint8 prio, Uint32 *
const theData,
214 Uint32 tRecBlockNo = header->theReceiversBlockNumber;
217 if(setSignalLog() && TRACE_GSN(header->theVerId_signalNumber)){
222 ptr, header->m_noOfSections);
223 signalLogger.flushSignalLog();
227 if (tRecBlockNo >= MIN_API_BLOCK_NO)
229 trp_client * clnt = m_threads.get(tRecBlockNo);
245 tSignal->setDataPtr(theData);
246 clnt->trp_deliver_signal(tSignal, ptr);
249 else if (tRecBlockNo == API_PACKED)
257 Uint32 Tlength = header->theLength;
263 while (Tsent < Tlength) {
264 Uint32 Theader = theData[Tsent];
266 Uint32 TpacketLen = (Theader & 0x1F) + 3;
267 tRecBlockNo = Theader >> 16;
268 if (TpacketLen <= 25)
270 if ((TpacketLen + Tsent) <= Tlength)
276 header->theLength = TpacketLen;
277 header->theReceiversBlockNumber = tRecBlockNo;
278 Uint32* tDataPtr = &theData[Tsent];
280 if (tRecBlockNo >= MIN_API_BLOCK_NO)
282 trp_client * clnt = m_threads.get(tRecBlockNo);
287 tSignal->setDataPtr(tDataPtr);
288 clnt->trp_deliver_signal(tSignal, 0);
296 else if (tRecBlockNo >= MIN_API_FIXED_BLOCK_NO &&
297 tRecBlockNo <= MAX_API_FIXED_BLOCK_NO)
299 Uint32 dynamic= m_fixed2dynamic[tRecBlockNo - MIN_API_FIXED_BLOCK_NO];
305 tSignal->setDataPtr(theData);
306 clnt->trp_deliver_signal(tSignal, ptr);
312 if(header->theVerId_signalNumber != GSN_API_REGREQ)
314 TRP_DEBUG(
"TransporterFacade received signal to unknown block no." );
315 ndbout <<
"BLOCK NO: " << tRecBlockNo <<
" sig "
316 << header->theVerId_signalNumber << endl;
331 copy(Uint32 * & insertPtr,
347 assert(theOwnId == 0);
350 #if defined SIGPIPE && !defined _WIN32
351 (void)signal(SIGPIPE, SIG_IGN);
355 if (theTransporterRegistry == NULL)
358 if (!theTransporterRegistry->init(nodeId))
361 if (theClusterMgr == NULL)
364 if (theClusterMgr == NULL)
370 if (!theTransporterRegistry->start_service(m_socket_server))
373 theReceiveThread = NdbThread_Create(runReceiveResponse_C,
377 NDB_THREAD_PRIO_LOW);
379 theSendThread = NdbThread_Create(runSendRequest_C,
383 NDB_THREAD_PRIO_LOW);
385 theClusterMgr->startThread();
398 DBUG_ENTER(
"TransporterFacade::stop_instance");
404 TransporterFacade::doStop(){
405 DBUG_ENTER(
"TransporterFacade::doStop");
410 if (theClusterMgr != NULL) theClusterMgr->doStop();
417 if (theReceiveThread) {
418 NdbThread_WaitFor(theReceiveThread, &status);
419 NdbThread_Destroy(&theReceiveThread);
422 NdbThread_WaitFor(theSendThread, &status);
423 NdbThread_Destroy(&theSendThread);
430 runSendRequest_C(
void * me)
436 void TransporterFacade::threadMainSend(
void)
439 if (theTransporterRegistry->start_clients() == 0){
440 ndbout_c(
"Unable to start theTransporterRegistry->start_clients");
446 while(!theStopReceive) {
447 NdbSleep_MilliSleep(10);
448 NdbMutex_Lock(theMutexPtr);
449 if (sendPerformedLastInterval == 0) {
452 sendPerformedLastInterval = 0;
453 NdbMutex_Unlock(theMutexPtr);
455 theTransporterRegistry->stopSending();
457 m_socket_server.stopServer();
460 theTransporterRegistry->stop_clients();
465 runReceiveResponse_C(
void * me)
479 void TransporterFacade::threadMainReceive(
void)
482 #ifdef NDB_SHM_TRANSPORTER
483 NdbThread_set_shm_sigmask(TRUE);
485 while(!theStopReceive)
487 theClusterMgr->lock();
489 theClusterMgr->unlock();
490 NdbSleep_MilliSleep(100);
500 void TransporterFacade::external_poll(Uint32 wait_time)
502 NdbMutex_Unlock(theMutexPtr);
504 #ifdef NDB_SHM_TRANSPORTER
510 NdbThread_set_shm_sigmask(FALSE);
513 const int res = theTransporterRegistry->pollReceive(wait_time);
515 #ifdef NDB_SHM_TRANSPORTER
516 NdbThread_set_shm_sigmask(TRUE);
519 NdbMutex_Lock(theMutexPtr);
528 m_poll_queue_head(NULL),
529 m_poll_queue_tail(NULL),
530 theTransporterRegistry(0),
538 theReceiveThread(NULL),
539 m_fragmented_signal_id(0),
540 m_globalDictCache(cache)
542 DBUG_ENTER(
"TransporterFacade::TransporterFacade");
543 theMutexPtr = NdbMutex_CreateWithName(
"TTFM");
544 sendPerformedLastInterval = 0;
546 for (
int i = 0;
i < NO_API_FIXED_BLOCKS;
i++)
547 m_fixed2dynamic[
i]= RNIL;
560 static bool is_mgmd(Uint32 nodeId,
564 if (iter.find(CFG_NODE_ID, nodeId))
567 if(iter.get(CFG_TYPE_OF_SECTION, &type))
570 return (type == NODE_TYPE_MGM);
575 TransporterFacade::do_connect_mgm(NodeId nodeId,
579 DBUG_ENTER(
"TransporterFacade::do_connect_mgm");
581 for(iter.first(); iter.valid(); iter.next())
583 Uint32 nodeId1, nodeId2;
584 if (iter.get(CFG_CONNECTION_NODE_1, &nodeId1) ||
585 iter.get(CFG_CONNECTION_NODE_2, &nodeId2))
589 if (nodeId1 != nodeId && nodeId2 != nodeId)
593 if(is_mgmd(nodeId1, conf) && is_mgmd(nodeId2, conf))
595 Uint32 remoteNodeId = (nodeId == nodeId1 ? nodeId2 : nodeId1);
596 DBUG_PRINT(
"info", (
"opening connection to node %d", remoteNodeId));
597 doConnect(remoteNodeId);
608 DBUG_ENTER(
"TransporterFacade::configure");
610 assert(theOwnId == nodeId);
611 assert(theTransporterRegistry);
612 assert(theClusterMgr);
617 * theTransporterRegistry,
622 theClusterMgr->configure(nodeId, conf);
625 if(iter.find(CFG_NODE_ID, nodeId))
629 Uint32 total_send_buffer = 0;
630 iter.get(CFG_TOTAL_SEND_BUFFER_MEMORY, &total_send_buffer);
633 Uint32 auto_reconnect=1;
634 iter.get(CFG_AUTO_RECONNECT, &auto_reconnect);
636 const char * priospec = 0;
637 if (iter.get(CFG_HB_THREAD_PRIO, &priospec) == 0)
639 NdbThread_SetHighPrioProperties(priospec);
645 if (theClusterMgr->m_auto_reconnect == -1)
647 theClusterMgr->m_auto_reconnect = auto_reconnect;
651 signalLogger.logOn(
true, 0, SignalLoggerManager::LogInOut);
655 if (!do_connect_mgm(nodeId, conf))
671 Uint32 sz = m_threads.m_statusNext.size();
672 for (Uint32
i = 0;
i < sz ;
i ++)
675 if (clnt != 0 && clnt != sender)
677 clnt->trp_deliver_signal(aSignal, ptr);
683 TransporterFacade::connected()
685 DBUG_ENTER(
"TransporterFacade::connected");
686 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theOwnId));
687 signal.theVerId_signalNumber = GSN_ALLOC_NODEID_CONF;
688 signal.theReceiversBlockNumber = 0;
690 signal.theLength = AllocNodeIdConf::SignalLength;
695 rep->nodeId = theOwnId;
699 Uint32 sz = m_threads.m_statusNext.size();
700 for (Uint32
i = 0;
i < sz ;
i ++)
705 clnt->trp_deliver_signal(&signal, 0);
712 TransporterFacade::close_clnt(
trp_client* clnt)
717 NdbMutex_Lock(theMutexPtr);
718 if (m_threads.get(clnt->m_blockNo) == clnt)
720 m_threads.close(clnt->m_blockNo);
727 NdbMutex_Unlock(theMutexPtr);
735 DBUG_ENTER(
"TransporterFacade::open");
736 Guard g(theMutexPtr);
737 int r= m_threads.open(clnt);
743 if (unlikely(blockNo != -1))
746 Uint32 fixed_index = blockNo - MIN_API_FIXED_BLOCK_NO;
748 assert(blockNo >= MIN_API_FIXED_BLOCK_NO &&
749 fixed_index <= NO_API_FIXED_BLOCKS);
751 m_fixed2dynamic[fixed_index]= r;
756 r = numberToRef(r, theOwnId);
760 r = numberToRef(r, 0);
765 TransporterFacade::~TransporterFacade()
767 DBUG_ENTER(
"TransporterFacade::~TransporterFacade");
769 delete theClusterMgr;
770 NdbMutex_Lock(theMutexPtr);
771 delete theTransporterRegistry;
772 NdbMutex_Unlock(theMutexPtr);
773 NdbMutex_Destroy(theMutexPtr);
781 TransporterFacade::calculateSendLimit()
784 Uint32 TthreadCount = 0;
786 Uint32 sz = m_threads.m_statusNext.size();
787 for (Ti = 0; Ti < sz; Ti++) {
788 if (m_threads.m_statusNext[Ti] == (ThreadData::ACTIVE)){
790 m_threads.m_statusNext[Ti] = ThreadData::INACTIVE;
793 currentSendLimit = TthreadCount;
794 if (currentSendLimit == 0) {
795 currentSendLimit = 1;
797 checkCounter = currentSendLimit << 2;
805 void TransporterFacade::forceSend(Uint32 block_number) {
807 m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
808 sendPerformedLastInterval = 1;
809 if (checkCounter < 0) {
810 calculateSendLimit();
819 TransporterFacade::checkForceSend(Uint32 block_number) {
820 m_threads.m_statusNext[numberToIndex(block_number)] = ThreadData::ACTIVE;
832 if (theTransporterRegistry->
forceSendCheck(currentSendLimit) == 1) {
833 sendPerformedLastInterval = 1;
836 if (checkCounter < 0) {
837 calculateSendLimit();
846 TransporterFacade::sendSignal(
const NdbApiSignal * aSignal, NodeId aNode)
848 const Uint32* tDataPtr = aSignal->getConstDataPtrSend();
849 Uint32 Tlen = aSignal->theLength;
850 Uint32 TBno = aSignal->theReceiversBlockNumber;
852 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
854 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
860 signalLogger.flushSignalLog();
863 if ((Tlen != 0) && (Tlen <= 25) && (TBno != 0)) {
864 SendStatus ss = theTransporterRegistry->
prepareSend(aSignal,
872 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
873 aSignal->readSignalNumber() == GSN_API_REGREQ ||
874 (aSignal->readSignalNumber() == GSN_CONNECT_REP &&
877 return (ss == SEND_OK ? 0 : -1);
881 ndbout <<
"ERR: SigLen = " << Tlen <<
" BlockRec = " << TBno;
882 ndbout <<
" SignalNo = " << aSignal->theVerId_signalNumber << endl;
903 Uint32 realIterWords;
908 const Uint32* lastReadPtr;
910 Uint32 lastReadPtrLen;
919 realIterator= ptr.sectionIter;
920 realIterWords= ptr.sz;
923 rangeLen= rangeRemain= realIterWords;
928 assert(checkInvariants());
937 bool checkInvariants()
939 assert( (realIterator != NULL) || (realIterWords == 0) );
940 assert( realCurrPos <= realIterWords );
941 assert( rangeStart <= realIterWords );
942 assert( (rangeStart+rangeLen) <= realIterWords);
943 assert( rangeRemain <= rangeLen );
946 assert( (lastReadPtr != NULL) || (rangeRemain == 0));
951 assert( (lastReadPtr == NULL) ||
952 ((rangeRemain == 0) || (lastReadPtrLen != 0)));
964 void moveToPos(Uint32 pos)
966 assert(pos <= realIterWords);
968 if (pos < realCurrPos)
971 realIterator->reset();
977 if ((lastReadPtr == NULL) &&
978 (realIterWords != 0) &&
979 (pos != realIterWords))
980 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
982 if (pos == realCurrPos)
986 while (pos >= realCurrPos + lastReadPtrLen)
988 realCurrPos+= lastReadPtrLen;
989 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
990 assert(lastReadPtr != NULL);
993 const Uint32 chunkOffset= pos - realCurrPos;
994 lastReadPtr+= chunkOffset;
995 lastReadPtrLen-= chunkOffset;
1009 assert(checkInvariants());
1010 if (start+len > realIterWords)
1015 rangeLen= rangeRemain= len;
1017 assert(checkInvariants());
1030 assert(checkInvariants());
1031 moveToPos(rangeStart);
1032 rangeRemain= rangeLen;
1033 assert(checkInvariants());
1043 assert(checkInvariants());
1044 const Uint32* currPtr= NULL;
1048 assert(lastReadPtr != NULL);
1049 assert(lastReadPtrLen != 0);
1050 currPtr= lastReadPtr;
1052 sz= MIN(rangeRemain, lastReadPtrLen);
1054 if (sz == lastReadPtrLen)
1058 lastReadPtr= realIterator->getNextWords(lastReadPtrLen);
1063 lastReadPtrLen-= sz;
1073 assert(checkInvariants());
/*
 * CHUNK_SZ: size threshold used by sendFragmentedSignal().  Section data
 * whose total length is <= CHUNK_SZ is sent as a single signal; anything
 * larger is split into fragments of at most CHUNK_SZ.
 *
 * Computed as the maximum send message size divided by four (byte size
 * shifted right by 2 -- presumably bytes to 32-bit words, confirm),
 * expressed in whole NDB_SECTION_SEGMENT_SZ segments, with two segments
 * held back, and converted back to the same unit.  The result is always a
 * multiple of NDB_SECTION_SEGMENT_SZ.
 */
#define CHUNK_SZ                                                        \
  (((((MAX_SEND_MESSAGE_BYTESIZE >> 2) / NDB_SECTION_SEGMENT_SZ) - 2)) \
   * NDB_SECTION_SEGMENT_SZ)
1112 TransporterFacade::sendFragmentedSignal(
const NdbApiSignal* inputSignal,
1121 Uint32 totalSectionLength= 0;
1122 for (i= 0; i < secs; i++)
1123 totalSectionLength+= ptr[i].sz;
1126 if (totalSectionLength <= CHUNK_SZ)
1127 return sendSignal(aSignal, aNode, ptr, secs);
1131 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1133 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1136 aSignal->getConstDataPtrSend(),
1138 signalLogger.flushSignalLog();
1139 for (Uint32 i = 0; i<secs; i++)
1140 ptr[i].sectionIter->reset();
1147 Uint32 unique_id= m_fragmented_signal_id++;
1152 for (i= 0; i < 3; i++)
1153 tmp_ptr[i]= (i < secs)? ptr[
i] : empty;
1161 tmp_ptr[0].sectionIter= &sec0;
1162 tmp_ptr[1].sectionIter= &sec1;
1163 tmp_ptr[2].sectionIter= &sec2;
1165 unsigned start_i= 0;
1166 unsigned this_chunk_sz= 0;
1167 unsigned fragment_info= 0;
1168 Uint32 *tmp_signal_data= tmp_signal.getDataPtrSend();
1169 for (i= 0; i < secs;) {
1170 unsigned remaining_sec_sz= tmp_ptr[
i].sz;
1171 tmp_signal_data[i-start_i]=
i;
1172 if (this_chunk_sz + remaining_sec_sz <= CHUNK_SZ)
1175 this_chunk_sz+= remaining_sec_sz;
1181 unsigned send_sz= CHUNK_SZ - this_chunk_sz;
1197 NDB_SECTION_SEGMENT_SZ
1198 *((send_sz+NDB_SECTION_SEGMENT_SZ-1)
1199 /NDB_SECTION_SEGMENT_SZ);
1200 if (send_sz > remaining_sec_sz)
1201 send_sz= remaining_sec_sz;
1207 tmp_ptr[
i].sz= send_sz;
1210 const Uint32 total_sec_sz= ptr[
i].sz;
1211 const Uint32 start= (total_sec_sz - remaining_sec_sz);
1217 if (fragment_info < 2)
1222 tmp_signal_data[i-start_i+1]= unique_id;
1223 tmp_signal.setLength(i-start_i+2);
1224 tmp_signal.m_fragmentInfo= fragment_info;
1225 tmp_signal.m_noOfSections= i-start_i+1;
1228 SendStatus ss = theTransporterRegistry->
prepareSend
1234 assert(ss != SEND_MESSAGE_TOO_BIG);
1235 if (ss != SEND_OK)
return -1;
1238 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1239 tmp_signal.readSignalNumber() == GSN_API_REGREQ);
1245 assert(remaining_sec_sz >= send_sz);
1246 Uint32 remaining= remaining_sec_sz - send_sz;
1247 tmp_ptr[
i].sz= remaining;
1249 ok= fragIter->
setRange(start+send_sz, remaining);
1260 unsigned a_sz= aSignal->getLength();
1262 if (fragment_info > 0) {
1264 Uint32 *a_data= aSignal->getDataPtrSend();
1265 unsigned tmp_sz= i-start_i;
1268 tmp_sz*
sizeof(Uint32));
1269 a_data[a_sz+tmp_sz]= unique_id;
1270 aSignal->setLength(a_sz+tmp_sz+1);
1273 aSignal->m_fragmentInfo= 3;
1274 aSignal->m_noOfSections= i-start_i;
1276 aSignal->m_noOfSections= secs;
1282 SendStatus ss = theTransporterRegistry->
prepareSend
1285 aSignal->getConstDataPtrSend(),
1288 assert(ss != SEND_MESSAGE_TOO_BIG);
1291 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1292 aSignal->readSignalNumber() == GSN_API_REGREQ);
1294 ret = (ss == SEND_OK ? 0 : -1);
1296 aSignal->m_noOfSections = 0;
1297 aSignal->m_fragmentInfo = 0;
1298 aSignal->setLength(a_sz);
1303 TransporterFacade::sendFragmentedSignal(
const NdbApiSignal* aSignal,
1314 for (Uint32 j=0; j<3; j++)
1315 linCopy[j]= (j < secs)? ptr[j] : empty;
1322 tmpPtr[0].sz= linCopy[0].sz;
1323 tmpPtr[0].sectionIter= &zero;
1324 tmpPtr[1].sz= linCopy[1].sz;
1325 tmpPtr[1].sectionIter= &one;
1326 tmpPtr[2].sz= linCopy[2].sz;
1327 tmpPtr[2].sectionIter= &two;
1329 return sendFragmentedSignal(aSignal, aNode, tmpPtr, secs);
1334 TransporterFacade::sendSignal(
const NdbApiSignal* aSignal, NodeId aNode,
1337 Uint32 save = aSignal->m_noOfSections;
1338 const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = secs;
1340 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1342 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1345 aSignal->getConstDataPtrSend(),
1347 signalLogger.flushSignalLog();
1350 SendStatus ss = theTransporterRegistry->
prepareSend
1353 aSignal->getConstDataPtrSend(),
1356 assert(ss != SEND_MESSAGE_TOO_BIG);
1357 const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = save;
1360 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1361 aSignal->readSignalNumber() == GSN_API_REGREQ);
1363 return (ss == SEND_OK ? 0 : -1);
1367 TransporterFacade::sendSignal(
const NdbApiSignal* aSignal, NodeId aNode,
1370 Uint32 save = aSignal->m_noOfSections;
1371 const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = secs;
1373 if(setSignalLog() && TRACE_GSN(aSignal->theVerId_signalNumber)){
1375 tmp.theSendersBlockRef = numberToRef(aSignal->theSendersBlockRef, theOwnId);
1378 aSignal->getConstDataPtrSend(),
1380 signalLogger.flushSignalLog();
1381 for (Uint32 i = 0; i<secs; i++)
1382 ptr[i].sectionIter->reset();
1385 SendStatus ss = theTransporterRegistry->
prepareSend
1388 aSignal->getConstDataPtrSend(),
1391 assert(ss != SEND_MESSAGE_TOO_BIG);
1392 const_cast<NdbApiSignal*
>(aSignal)->m_noOfSections = save;
1395 assert(theClusterMgr->getNodeInfo(aNode).is_confirmed() ||
1396 aSignal->readSignalNumber() == GSN_API_REGREQ);
1398 return (ss == SEND_OK ? 0 : -1);
1405 TransporterFacade::doConnect(
int aNodeId){
1406 theTransporterRegistry->setIOState(aNodeId, NoHalt);
1411 TransporterFacade::doDisconnect(
int aNodeId)
1417 TransporterFacade::reportConnected(
int aNodeId)
1424 TransporterFacade::reportDisconnected(
int aNodeId)
1431 TransporterFacade::ownId()
const
1437 TransporterFacade::isConnected(NodeId aNodeId){
1438 return theTransporterRegistry->is_connected(aNodeId);
1442 TransporterFacade::get_an_alive_node()
1444 DBUG_ENTER(
"TransporterFacade::get_an_alive_node");
1445 DBUG_PRINT(
"enter", (
"theStartNodeId: %d", theStartNodeId));
1447 const char* p = NdbEnv_GetEnv(
"NDB_ALIVE_NODE_ID", (
char*)0, 0);
1448 if (p != 0 && *p != 0)
1452 for (i = theStartNodeId; i < MAX_NDB_NODES; i++) {
1453 if (get_node_alive(i)){
1454 DBUG_PRINT(
"info", (
"Node %d is alive", i));
1455 theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1459 for (i = 1; i < theStartNodeId; i++) {
1460 if (get_node_alive(i)){
1461 DBUG_PRINT(
"info", (
"Node %d is alive", i));
1462 theStartNodeId = ((i + 1) % MAX_NDB_NODES);
1466 DBUG_RETURN((NodeId)0);
1469 TransporterFacade::ThreadData::ThreadData(Uint32
size){
1471 m_firstFree = END_OF_LIST;
1476 TransporterFacade::ThreadData::expand(Uint32
size){
1479 const Uint32 sz = m_statusNext.size();
1480 m_objectExecute.fill(sz + size, oe);
1481 for(Uint32 i = 0; i<
size; i++){
1482 m_statusNext.push_back(sz + i + 1);
1485 m_statusNext.back() = m_firstFree;
1486 m_firstFree = m_statusNext.size() -
size;
1491 TransporterFacade::ThreadData::open(
trp_client * clnt)
1493 Uint32 nextFree = m_firstFree;
1495 if(m_statusNext.size() >= MAX_NO_THREADS && nextFree == END_OF_LIST){
1499 if(nextFree == END_OF_LIST){
1501 nextFree = m_firstFree;
1505 m_firstFree = m_statusNext[nextFree];
1507 m_statusNext[nextFree] = INACTIVE;
1508 m_objectExecute[nextFree] = clnt;
1510 return indexToNumber(nextFree);
1514 TransporterFacade::ThreadData::close(
int number){
1515 number= numberToIndex(number);
1516 assert(m_objectExecute[number] != 0);
1517 m_statusNext[number] = m_firstFree;
1520 m_firstFree = number;
1521 m_objectExecute[number] = 0;
1526 TransporterFacade::get_active_ndb_objects()
const
1528 return m_threads.m_use_cnt;
1533 TransporterFacade::start_poll(
trp_client* clnt)
1536 clnt->m_poll.m_locked =
true;
1540 TransporterFacade::do_poll(
trp_client* clnt, Uint32 wait_time)
1542 clnt->m_poll.m_waiting =
true;
1543 assert(clnt->m_poll.m_locked ==
true);
1545 if (owner != NULL && owner != clnt)
1556 assert(clnt->m_poll.m_poll_owner ==
false);
1557 add_to_poll_queue(clnt);
1558 NdbCondition_WaitTimeout(clnt->m_poll.m_condition, theMutexPtr,
1560 if (clnt != m_poll_owner && clnt->m_poll.m_waiting)
1562 remove_from_poll_queue(clnt);
1572 assert(owner == clnt || clnt->m_poll.m_poll_owner ==
false);
1573 m_poll_owner = clnt;
1574 clnt->m_poll.m_poll_owner =
true;
1575 external_poll(wait_time);
1582 if (clnt->m_poll.m_waiting)
1584 clnt->m_poll.m_waiting =
false;
1585 if (m_poll_owner != clnt)
1587 remove_from_poll_queue(clnt);
1588 NdbCondition_Signal(clnt->m_poll.m_condition);
1594 TransporterFacade::complete_poll(
trp_client* clnt)
1596 clnt->m_poll.m_waiting =
false;
1597 if (!clnt->m_poll.m_locked)
1599 assert(clnt->m_poll.m_poll_owner ==
false);
1616 if (m_poll_owner == clnt)
1618 assert(clnt->m_poll.m_poll_owner ==
true);
1619 m_poll_owner = new_owner = remove_last_from_poll_queue();
1623 assert(new_owner->m_poll.m_poll_owner ==
false);
1624 assert(new_owner->m_poll.m_locked ==
true);
1625 assert(new_owner->m_poll.m_waiting ==
true);
1626 NdbCondition_Signal(new_owner->m_poll.m_condition);
1627 new_owner->m_poll.m_poll_owner =
true;
1629 clnt->m_poll.m_locked =
false;
1630 clnt->m_poll.m_poll_owner =
false;
1635 TransporterFacade::add_to_poll_queue(
trp_client* clnt)
1638 assert(clnt->m_poll.m_prev == 0);
1639 assert(clnt->m_poll.m_next == 0);
1640 assert(clnt->m_poll.m_locked ==
true);
1641 assert(clnt->m_poll.m_poll_owner ==
false);
1643 if (m_poll_queue_head == 0)
1645 assert(m_poll_queue_tail == 0);
1646 m_poll_queue_head = clnt;
1647 m_poll_queue_tail = clnt;
1651 assert(m_poll_queue_tail->m_poll.m_next == 0);
1652 m_poll_queue_tail->m_poll.m_next = clnt;
1653 clnt->m_poll.m_prev = m_poll_queue_tail;
1654 m_poll_queue_tail = clnt;
1659 TransporterFacade::remove_from_poll_queue(
trp_client* clnt)
1662 assert(clnt->m_poll.m_locked ==
true);
1663 assert(clnt->m_poll.m_poll_owner ==
false);
1665 if (clnt->m_poll.m_prev != 0)
1667 clnt->m_poll.m_prev->m_poll.m_next = clnt->m_poll.m_next;
1671 assert(m_poll_queue_head == clnt);
1672 m_poll_queue_head = clnt->m_poll.m_next;
1675 if (clnt->m_poll.m_next != 0)
1677 clnt->m_poll.m_next->m_poll.m_prev = clnt->m_poll.m_prev;
1681 assert(m_poll_queue_tail == clnt);
1682 m_poll_queue_tail = clnt->m_poll.m_prev;
1685 if (m_poll_queue_head == 0)
1686 assert(m_poll_queue_tail == 0);
1687 else if (m_poll_queue_tail == 0)
1688 assert(m_poll_queue_head == 0);
1690 clnt->m_poll.m_prev = 0;
1691 clnt->m_poll.m_next = 0;
1695 TransporterFacade::remove_last_from_poll_queue()
1701 remove_from_poll_queue(clnt);
1707 #include "SignalSender.hpp"
1710 SignalSectionIterator::getNextWords(Uint32& sz)
1712 if (likely(currentSignal != NULL))
1715 currentSignal= currentSignal->next();
1716 sz= signal->getLength();
1717 return signal->getDataPtrSend();
/*
 * VERIFY(x): test helper.  If the expression x evaluates to zero, print
 * the failing line number and the expression text, then make the
 * enclosing function return -1.  Only usable inside functions whose
 * return type accepts -1.
 *
 * The body is wrapped in do { ... } while (0) so that "VERIFY(x);" is
 * exactly one statement: it can be used in an unbraced if/else and cannot
 * capture a following "else".  __LINE__ is cast so the argument matches
 * the %u conversion.
 */
#define VERIFY(x)                                                     \
  do {                                                                \
    if ((x) == 0) {                                                   \
      printf("VERIFY failed at Line %u : %s\n",                       \
             (unsigned) (__LINE__), #x);                              \
      return -1;                                                      \
    }                                                                 \
  } while (0)
1736 while (pos < dataWords)
1738 const Uint32* readPtr=NULL;
1741 readPtr= gsi.getNextWords(len);
1743 VERIFY(readPtr != NULL);
1745 VERIFY(len <= (Uint32) (dataWords - pos));
1747 for (
int j=0; j < (int) len; j++)
1748 VERIFY(readPtr[j] == (Uint32) (bias ++));
1760 VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1765 VERIFY(iter.getNextWords(sz) == NULL);
1768 VERIFY(iter.getNextWords(sz) == NULL);
1774 VERIFY(verifyIteratorContents(iter, size, bias) == 0);
1777 VERIFY(iter.getNextWords(sz) == NULL);
1789 VERIFY(checkGenericSectionIterator(iter, size, bias) == 0);
1794 const int subranges= 20;
1799 ptr.sectionIter= &iter;
1802 for (
int s=0; s< subranges; s++)
1808 start= (Uint32) myRandom48(size);
1809 if (0 != (size-start))
1810 len= (Uint32) myRandom48(size-start);
1817 fsi.setRange(start, len);
1818 VERIFY(checkGenericSectionIterator(fsi, len, bias + start) == 0);
1827 testLinearSectionIterator()
1832 const int totalSize= 200000;
1835 Uint32 data[totalSize];
1836 for (
int i=0; i<totalSize; i++)
1839 for (
int len= 0; len < 50000; len++)
1843 VERIFY(checkIterator(something, len, bias) == 0);
1850 createSignalChain(
NdbApiSignal*& poolHead,
int length,
int bias)
1858 while (pos < length)
1860 int offset= pos % NdbApiSignal::MaxSignalWords;
1864 if (poolHead == NULL)
1868 poolHead= poolHead->next();
1873 if (chainHead == NULL)
1875 chainHead= chainTail= newSig;
1879 chainTail->next(newSig);
1884 chainTail->getDataPtrSend()[
offset]= (bias + pos);
1885 chainTail->setLength(offset + 1);
1893 testSignalSectionIterator()
1899 const int totalNumSignals= 1000;
1903 for (
int i=0; i < totalNumSignals; i++)
1907 if (poolHead == NULL)
1914 sig->next(poolHead);
1920 for (
int dataWords= 1;
1921 dataWords <= (int)(totalNumSignals *
1922 NdbApiSignal::MaxSignalWords);
1927 VERIFY((signalChain= createSignalChain(poolHead, dataWords, bias)) != NULL );
1931 VERIFY(checkIterator(ssi, dataWords, bias) == 0);
1934 while (signalChain != NULL)
1937 signalChain= signalChain->next();
1939 sig->next(poolHead);
1945 while (poolHead != NULL)
1948 poolHead= sig->next();
1955 int main(
int arg,
char** argv)
1968 VERIFY(testLinearSectionIterator() == 0);
1969 VERIFY(testSignalSectionIterator() == 0);
1978 TransporterFacade::set_auto_reconnect(
int val)
1980 theClusterMgr->m_auto_reconnect = val;
1984 TransporterFacade::get_auto_reconnect()
const
1986 return theClusterMgr->m_auto_reconnect;
1992 theClusterMgr->set_max_api_reg_req_interval(interval);
1996 TransporterFacade::ext_update_connections()
1998 theClusterMgr->lock();
2000 theClusterMgr->unlock();
2004 TransporterFacade::ext_get_connect_address(Uint32 nodeId)
2006 return theTransporterRegistry->get_connect_address(nodeId);
2010 TransporterFacade::ext_forceHB()
2012 theClusterMgr->forceHB();
2016 TransporterFacade::ext_isConnected(NodeId aNodeId)
2019 theClusterMgr->lock();
2020 val = theClusterMgr->theNodes[aNodeId].is_connected();
2021 theClusterMgr->unlock();
2026 TransporterFacade::ext_doConnect(
int aNodeId)
2028 theClusterMgr->lock();
2029 assert(theClusterMgr->theNodes[aNodeId].is_connected() ==
false);
2031 theClusterMgr->unlock();