18 #include <ndb_global.h>
19 #include <ndb_limits.h>
20 #include <util/version.h>
22 #include "TransporterFacade.hpp"
23 #include <kernel/GlobalSignalNumbers.h>
25 #include "ClusterMgr.hpp"
26 #include <IPCConfig.hpp>
27 #include "NdbApiSignal.hpp"
33 #include <signaldata/NodeFailRep.hpp>
34 #include <signaldata/NFCompleteRep.hpp>
35 #include <signaldata/ApiRegSignalData.hpp>
36 #include <signaldata/AlterTable.hpp>
37 #include <signaldata/SumaImpl.hpp>
40 #include <mgmapi_configuration.hpp>
41 #include <mgmapi_config_parameters.h>
43 int global_flag_skip_invalidate_cache = 0;
44 int global_flag_skip_waiting_for_clean_cache = 0;
50 runClusterMgr_C(
void * me)
62 m_max_api_reg_req_interval(~0),
64 noOfConnectedNodes(0),
66 theClusterMgrThread(NULL),
68 m_cluster_state(CS_waiting_for_clean_cache)
70 DBUG_ENTER(
"ClusterMgr::ClusterMgr");
71 clusterMgrThreadMutex = NdbMutex_Create();
72 waitForHBCond= NdbCondition_Create();
73 m_auto_reconnect = -1;
75 Uint32
ret = this->open(&theFacade, API_CLUSTERMGR);
76 if (unlikely(ret == 0))
78 ndbout_c(
"Failed to register ClusterMgr! ret: %d", ret);
84 ClusterMgr::~ClusterMgr()
86 DBUG_ENTER(
"ClusterMgr::~ClusterMgr");
94 NdbCondition_Destroy(waitForHBCond);
95 NdbMutex_Destroy(clusterMgrThreadMutex);
100 ClusterMgr::configure(Uint32 nodeId,
104 for(iter.first(); iter.valid(); iter.next()){
106 if(iter.get(CFG_NODE_ID, &nodeId))
110 assert(nodeId > 0 && nodeId < MAX_NODES);
111 trp_node& theNode = theNodes[nodeId];
112 theNode.defined =
true;
115 if(iter.get(CFG_TYPE_OF_SECTION, &type))
135 for(Uint32
i = 0;
i<MAX_NODES;
i++) {
139 if (iter.find(CFG_NODE_ID,
i))
150 iter.find(CFG_NODE_ID, nodeId);
151 iter.get(CFG_NODE_ARBIT_RANK, &rank);
158 theArbitMgr->setRank(rank);
161 iter.get(CFG_NODE_ARBIT_DELAY, &delay);
162 theArbitMgr->setDelay(delay);
164 else if (theArbitMgr)
167 theArbitMgr->doStop(NULL);
174 ClusterMgr::startThread() {
175 Guard g(clusterMgrThreadMutex);
178 theClusterMgrThread = NdbThread_Create(runClusterMgr_C,
182 NDB_THREAD_PRIO_HIGH);
184 while (theStop == -1 && cnt < 60)
186 NdbCondition_WaitTimeout(waitForHBCond, clusterMgrThreadMutex, 1000);
189 assert(theStop == 0);
193 ClusterMgr::doStop( ){
194 DBUG_ENTER(
"ClusterMgr::doStop");
196 Guard g(clusterMgrThreadMutex);
204 if (theClusterMgrThread) {
205 NdbThread_WaitFor(theClusterMgrThread, &status);
206 NdbThread_Destroy(&theClusterMgrThread);
209 if (theArbitMgr != NULL)
211 theArbitMgr->doStop(NULL);
218 ClusterMgr::forceHB()
220 theFacade.lock_mutex();
224 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
225 theFacade.unlock_mutex();
233 waitForHBFromNodes.
clear();
234 for(Uint32
i = 1;
i < MAX_NDB_NODES;
i++)
242 waitForHBFromNodes.
bitOR(node.m_state.m_connected_nodes);
245 waitForHBFromNodes.
bitAND(ndb_nodes);
246 theFacade.unlock_mutex();
250 ndbout <<
"Waiting for HB from " << waitForHBFromNodes.
getText(buf) << endl;
252 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
254 signal.theVerId_signalNumber = GSN_API_REGREQ;
255 signal.theReceiversBlockNumber = QMGR;
257 signal.theLength = ApiRegReq::SignalLength;
260 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
261 req->version = NDB_VERSION;
262 req->mysql_version = NDB_MYSQL_VERSION_D;
268 (int) NodeBitmask::NotFound != (nodeId= waitForHBFromNodes.
find(
i));
272 ndbout <<
"FORCE HB to " << nodeId << endl;
274 raw_sendSignal(&signal, nodeId);
279 theFacade.lock_mutex();
280 if (!waitForHBFromNodes.
isclear())
281 NdbCondition_WaitTimeout(waitForHBCond, theFacade.theMutexPtr, 1000);
285 ndbout <<
"Still waiting for HB from " << waitForHBFromNodes.
getText(buf) << endl;
287 theFacade.unlock_mutex();
291 ClusterMgr::startup()
293 assert(theStop == -1);
294 Uint32 nodeId = getOwnNodeId();
295 Node & cm_node = theNodes[nodeId];
297 assert(theNode.defined);
300 theFacade.doConnect(nodeId);
303 for (Uint32
i = 0;
i<3000;
i++)
308 if (theNode.is_connected())
310 NdbSleep_MilliSleep(20);
313 assert(theNode.is_connected());
314 Guard g(clusterMgrThreadMutex);
316 NdbCondition_Broadcast(waitForHBCond);
320 ClusterMgr::threadMain()
324 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
326 signal.theVerId_signalNumber = GSN_API_REGREQ;
328 signal.theLength = ApiRegReq::SignalLength;
331 req->ref = numberToRef(API_CLUSTERMGR, theFacade.ownId());
332 req->version = NDB_VERSION;
333 req->mysql_version = NDB_MYSQL_VERSION_D;
335 NdbApiSignal nodeFail_signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
336 nodeFail_signal.theVerId_signalNumber = GSN_NODE_FAILREP;
337 nodeFail_signal.theReceiversBlockNumber = API_CLUSTERMGR;
338 nodeFail_signal.theTrace = 0;
339 nodeFail_signal.theLength = NodeFailRep::SignalLengthLong;
341 NDB_TICKS timeSlept = 100;
342 NDB_TICKS now = NdbTick_CurrentMillisecond();
347 NDB_TICKS before = now;
348 for (Uint32
i = 0;
i<10;
i++)
350 NdbSleep_MilliSleep(10);
352 Guard g(clusterMgrThreadMutex);
361 now = NdbTick_CurrentMillisecond();
362 timeSlept = (now - before);
364 if (m_cluster_state == CS_waiting_for_clean_cache &&
365 theFacade.m_globalDictCache)
367 if (!global_flag_skip_waiting_for_clean_cache)
369 theFacade.m_globalDictCache->lock();
370 unsigned sz= theFacade.m_globalDictCache->get_size();
371 theFacade.m_globalDictCache->unlock();
375 m_cluster_state = CS_waiting_for_first_connect;
380 nodeFail_signal.getDataPtrSend());
381 nodeFailRep->noOfNodes = 0;
382 NodeBitmask::clear(nodeFailRep->theNodes);
385 for (
int i = 1;
i < MAX_NODES;
i++){
390 const NodeId nodeId =
i;
392 assert(nodeId > 0 && nodeId < MAX_NODES);
393 Node & cm_node = theNodes[nodeId];
396 if (!theNode.defined)
399 if (theNode.is_connected() ==
false){
400 theFacade.doConnect(nodeId);
404 if (!theNode.compatible){
408 if (nodeId == getOwnNodeId() && theNode.is_confirmed())
417 cm_node.hbCounter += (Uint32)timeSlept;
418 if (cm_node.hbCounter >= m_max_api_reg_req_interval ||
419 cm_node.hbCounter >= cm_node.hbFrequency)
424 if (cm_node.hbCounter >= cm_node.hbFrequency)
427 cm_node.hbCounter = 0;
431 signal.theReceiversBlockNumber = API_CLUSTERMGR;
433 signal.theReceiversBlockNumber = QMGR;
436 ndbout_c(
"ClusterMgr: Sending API_REGREQ to node %d", (
int)nodeId);
438 raw_sendSignal(&signal, nodeId);
441 if (cm_node.hbMissed == 4 && cm_node.hbFrequency > 0)
443 nodeFailRep->noOfNodes++;
444 NodeBitmask::set(nodeFailRep->theNodes, nodeId);
448 if (nodeFailRep->noOfNodes)
450 raw_sendSignal(&nodeFail_signal, getOwnNodeId());
452 trp_client::unlock();
460 const Uint32 gsn = sig->theVerId_signalNumber;
465 execAPI_REGREQ(theData);
468 case GSN_API_REGCONF:
469 execAPI_REGCONF(sig, ptr);
473 execAPI_REGREF(theData);
476 case GSN_NODE_FAILREP:
477 execNODE_FAILREP(sig, ptr);
480 case GSN_NF_COMPLETEREP:
481 execNF_COMPLETEREP(sig, ptr);
483 case GSN_ARBIT_STARTREQ:
484 if (theArbitMgr != NULL)
485 theArbitMgr->doStart(theData);
488 case GSN_ARBIT_CHOOSEREQ:
489 if (theArbitMgr != NULL)
490 theArbitMgr->doChoose(theData);
493 case GSN_ARBIT_STOPORD:
494 if(theArbitMgr != NULL)
495 theArbitMgr->doStop(theData);
498 case GSN_ALTER_TABLE_REP:
500 if (theFacade.m_globalDictCache == NULL)
503 theFacade.m_globalDictCache->lock();
504 theFacade.m_globalDictCache->
505 alter_table_rep((
const char*)ptr[0].p,
508 rep->changeType == AlterTableRep::CT_ALTERED);
509 theFacade.m_globalDictCache->unlock();
512 case GSN_SUB_GCP_COMPLETE_REP:
523 BlockReference ownRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
525 Uint32* send= tSignal.getDataPtrSend();
526 memcpy(send, theData, tSignal.getLength() << 2);
528 Uint32 ref= sig->theSendersBlockRef;
529 Uint32 aNodeId= refToNode(ref);
530 tSignal.theReceiversBlockNumber= refToBlock(ref);
531 tSignal.theVerId_signalNumber= GSN_SUB_GCP_COMPLETE_ACK;
532 tSignal.theSendersBlockRef = API_CLUSTERMGR;
537 case GSN_TAKE_OVERTCCONF:
545 case GSN_CONNECT_REP:
547 execCONNECT_REP(sig, ptr);
550 case GSN_DISCONNECT_REP:
552 execDISCONNECT_REP(sig, ptr);
562 ClusterMgr::Node::Node()
563 : hbFrequency(0), hbCounter(0)
579 ClusterMgr::recalcMinDbVersion()
581 Uint32 newMinDbVersion = ~ (Uint32) 0;
583 for (Uint32
i = 0;
i < MAX_NODES;
i++)
587 if (node.is_connected() &&
588 node.is_confirmed() &&
596 if (node.minDbVersion < newMinDbVersion)
598 newMinDbVersion = node.minDbVersion;
606 newMinDbVersion = (newMinDbVersion == ~ (Uint32) 0) ?
613 if (newMinDbVersion != minDbVersion)
615 ndbout <<
"Previous min Db node version was "
623 ndbout <<
"MinDbVersion recalculated, but is same : "
629 minDbVersion = newMinDbVersion;
637 ClusterMgr::execAPI_REGREQ(
const Uint32 * theData){
639 const NodeId nodeId = refToNode(apiRegReq->ref);
642 ndbout_c(
"ClusterMgr: Recd API_REGREQ from node %d", nodeId);
645 assert(nodeId > 0 && nodeId < MAX_NODES);
647 Node & cm_node = theNodes[nodeId];
649 assert(node.defined ==
true);
650 assert(node.is_connected() ==
true);
652 if(node.m_info.
m_version != apiRegReq->version){
653 node.m_info.
m_version = apiRegReq->version;
655 if (node.m_info.
m_version < NDBD_SPLIT_VERSION)
658 if (getMajor(node.m_info.
m_version) < getMajor(NDB_VERSION) ||
659 getMinor(node.m_info.
m_version) < getMinor(NDB_VERSION)) {
660 node.compatible =
false;
662 node.compatible =
true;
666 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, theFacade.ownId()));
667 signal.theVerId_signalNumber = GSN_API_REGCONF;
668 signal.theReceiversBlockNumber = API_CLUSTERMGR;
670 signal.theLength = ApiRegConf::SignalLength;
673 conf->qmgrRef = numberToRef(API_CLUSTERMGR, theFacade.ownId());
674 conf->version = NDB_VERSION;
675 conf->mysql_version = NDB_MYSQL_VERSION_D;
676 conf->apiHeartbeatFrequency = cm_node.hbFrequency;
678 conf->minDbVersion= 0;
679 conf->nodeState= node.m_state;
681 node.set_confirmed(
true);
683 node.set_confirmed(
false);
687 ClusterMgr::execAPI_REGCONF(
const NdbApiSignal * signal,
692 const NodeId nodeId = refToNode(apiRegConf->qmgrRef);
695 ndbout_c(
"ClusterMgr: Recd API_REGCONF from node %d", nodeId);
698 assert(nodeId > 0 && nodeId < MAX_NODES);
700 Node & cm_node = theNodes[nodeId];
702 assert(node.defined ==
true);
703 assert(node.is_connected() ==
true);
705 if(node.m_info.
m_version != apiRegConf->version){
706 node.m_info.
m_version = apiRegConf->version;
708 if (node.m_info.
m_version < NDBD_SPLIT_VERSION)
711 if(theNodes[theFacade.ownId()].m_info.m_type ==
NodeInfo::MGM)
712 node.compatible = ndbCompatible_mgmt_ndb(NDB_VERSION,
715 node.compatible = ndbCompatible_api_ndb(NDB_VERSION,
719 node.set_confirmed(
true);
721 if (node.minDbVersion != apiRegConf->minDbVersion)
723 node.minDbVersion = apiRegConf->minDbVersion;
724 recalcMinDbVersion();
727 if (node.m_info.
m_version >= NDBD_255_NODES_VERSION)
729 node.m_state = apiRegConf->nodeState;
736 memcpy(&node.m_state, &apiRegConf->nodeState,
sizeof(node.m_state) - 24);
747 set_node_alive(node,
true);
751 set_node_alive(node,
false);
755 cm_node.hbMissed = 0;
756 cm_node.hbCounter = 0;
757 cm_node.hbFrequency = (apiRegConf->apiHeartbeatFrequency * 10) - 50;
761 theFacade.
for_each(
this, signal, ptr);
763 check_wait_for_hb(nodeId);
767 ClusterMgr::check_wait_for_hb(NodeId nodeId)
771 waitForHBFromNodes.
clear(nodeId);
773 if(waitForHBFromNodes.
isclear())
776 NdbCondition_Broadcast(waitForHBCond);
784 ClusterMgr::execAPI_REGREF(
const Uint32 * theData){
788 const NodeId nodeId = refToNode(ref->ref);
790 assert(nodeId > 0 && nodeId < MAX_NODES);
792 Node & cm_node = theNodes[nodeId];
795 assert(node.is_connected() ==
true);
796 assert(node.defined ==
true);
800 node.compatible =
false;
801 set_node_alive(node,
false);
805 switch(ref->errorCode){
806 case ApiRegRef::WrongType:
807 ndbout_c(
"Node %d reports that this node should be a NDB node", nodeId);
809 case ApiRegRef::UnsupportedVersion:
814 check_wait_for_hb(nodeId);
818 ClusterMgr::execNF_COMPLETEREP(
const NdbApiSignal* signal,
824 assert(nodeId > 0 && nodeId < MAX_NODES);
827 if (node.nfCompleteRep ==
false)
829 node.nfCompleteRep =
true;
830 theFacade.
for_each(
this, signal, ptr);
837 DBUG_ENTER(
"ClusterMgr::reportConnected");
838 DBUG_PRINT(
"info", (
"nodeId: %u", nodeId));
844 assert(nodeId > 0 && nodeId < MAX_NODES);
845 if (nodeId == getOwnNodeId())
847 noOfConnectedNodes--;
850 noOfConnectedNodes++;
852 Node & cm_node = theNodes[nodeId];
855 cm_node.hbMissed = 0;
856 cm_node.hbCounter = 0;
859 assert(theNode.is_connected() ==
false);
865 theNode.set_connected(
true);
866 theNode.m_state.m_connected_nodes.
set(nodeId);
868 theNode.compatible =
true;
869 theNode.nfCompleteRep =
true;
870 theNode.m_node_fail_rep =
false;
872 theNode.minDbVersion = 0;
881 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
882 signal.theVerId_signalNumber = GSN_CONNECT_REP;
883 signal.theReceiversBlockNumber = API_CLUSTERMGR;
885 signal.theLength = 1;
886 signal.getDataPtrSend()[0] = nodeId;
887 raw_sendSignal(&signal, getOwnNodeId());
899 ClusterMgr::set_node_dead(
trp_node& theNode)
901 set_node_alive(theNode,
false);
902 theNode.set_confirmed(
false);
903 theNode.m_state.m_connected_nodes.
clear();
906 theNode.nfCompleteRep =
false;
912 assert(nodeId > 0 && nodeId < MAX_NODES);
913 assert(noOfConnectedNodes > 0);
922 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
923 signal.theVerId_signalNumber = GSN_DISCONNECT_REP;
924 signal.theReceiversBlockNumber = API_CLUSTERMGR;
926 signal.theLength = DisconnectRep::SignalLength;
929 rep->nodeId = nodeId;
931 raw_sendSignal(&signal, getOwnNodeId());
939 Uint32 nodeId = rep->nodeId;
941 assert(nodeId > 0 && nodeId < MAX_NODES);
942 Node & cm_node = theNodes[nodeId];
945 bool node_failrep = theNode.m_node_fail_rep;
946 set_node_dead(theNode);
947 theNode.set_connected(
false);
949 noOfConnectedNodes--;
950 if (noOfConnectedNodes == 0)
952 if (!global_flag_skip_invalidate_cache &&
953 theFacade.m_globalDictCache)
955 theFacade.m_globalDictCache->lock();
956 theFacade.m_globalDictCache->invalidate_all();
957 theFacade.m_globalDictCache->unlock();
959 m_cluster_state = CS_waiting_for_clean_cache;
962 if (m_auto_reconnect == 0)
968 if (node_failrep ==
false)
973 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
974 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
975 signal.theReceiversBlockNumber = API_CLUSTERMGR;
977 signal.theLength = NodeFailRep::SignalLengthLong;
983 NodeBitmask::clear(rep->theNodes);
984 NodeBitmask::set(rep->theNodes, nodeId);
985 execNODE_FAILREP(&signal, 0);
996 signal.theVerId_signalNumber = GSN_NODE_FAILREP;
997 signal.theReceiversBlockNumber = API_CLUSTERMGR;
999 signal.theLength = NodeFailRep::SignalLengthLong;
1004 copy->noOfNodes = 0;
1005 NodeBitmask::clear(copy->theNodes);
1007 for (Uint32
i = NdbNodeBitmask::find_first(rep->theNodes);
1008 i != NdbNodeBitmask::NotFound;
1011 Node & cm_node = theNodes[
i];
1014 bool node_failrep = theNode.m_node_fail_rep;
1015 bool connected = theNode.is_connected();
1016 set_node_dead(theNode);
1018 if (node_failrep ==
false)
1020 theNode.m_node_fail_rep =
true;
1021 NodeBitmask::set(copy->theNodes,
i);
1027 theFacade.doDisconnect(
i);
1031 recalcMinDbVersion();
1032 if (copy->noOfNodes)
1034 theFacade.
for_each(
this, &signal, 0);
1037 if (noOfAliveNodes == 0)
1039 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, getOwnNodeId()));
1040 signal.theVerId_signalNumber = GSN_NF_COMPLETEREP;
1041 signal.theReceiversBlockNumber = 0;
1042 signal.theTrace = 0;
1043 signal.theLength = NFCompleteRep::SignalLength;
1047 rep->
nodeId = getOwnNodeId();
1049 rep->from = __LINE__;
1051 for (Uint32
i = 1;
i < MAX_NODES;
i++)
1054 if (theNode.defined && theNode.nfCompleteRep ==
false)
1057 execNF_COMPLETEREP(&signal, 0);
1064 ClusterMgr::print_nodes(
const char* where, NdbOut& out)
1066 out << where <<
" >>" << endl;
1067 for (NodeId
n = 1;
n < MAX_NODES ;
n++)
1072 out <<
"node: " <<
n << endl;
1074 out <<
" connected: " << node.is_connected();
1075 out <<
", compatible: " << node.compatible;
1076 out <<
", nf_complete_rep: " << node.nfCompleteRep;
1077 out <<
", alive: " << node.m_alive;
1078 out <<
", confirmed: " << node.is_confirmed();
1081 out <<
" - " << node.m_info << endl;
1082 out <<
" - " << node.m_state << endl;
1084 out <<
"<<" << endl;
1094 DBUG_ENTER(
"ArbitMgr::ArbitMgr");
1096 theThreadMutex = NdbMutex_Create();
1097 theInputCond = NdbCondition_Create();
1098 theInputMutex = NdbMutex_Create();
1104 theInputTimeout = 0;
1105 theInputFull =
false;
1106 memset(&theInputBuffer, 0,
sizeof(theInputBuffer));
1107 theState = StateInit;
1109 memset(&theStartReq, 0,
sizeof(theStartReq));
1110 memset(&theChooseReq1, 0,
sizeof(theChooseReq1));
1111 memset(&theChooseReq2, 0,
sizeof(theChooseReq2));
1112 memset(&theStopOrd, 0,
sizeof(theStopOrd));
1117 ArbitMgr::~ArbitMgr()
1119 DBUG_ENTER(
"ArbitMgr::~ArbitMgr");
1120 NdbMutex_Destroy(theThreadMutex);
1121 NdbCondition_Destroy(theInputCond);
1122 NdbMutex_Destroy(theInputMutex);
1130 ArbitMgr::doStart(
const Uint32* theData)
1132 ArbitSignal aSignal;
1133 NdbMutex_Lock(theThreadMutex);
1134 if (theThread != NULL) {
1135 aSignal.init(GSN_ARBIT_STOPORD, NULL);
1136 aSignal.data.code = StopRestart;
1137 sendSignalToThread(aSignal);
1139 NdbThread_WaitFor(theThread, &value);
1140 NdbThread_Destroy(&theThread);
1141 theState = StateInit;
1142 theInputFull =
false;
1144 aSignal.init(GSN_ARBIT_STARTREQ, theData);
1145 sendSignalToThread(aSignal);
1146 theThread = NdbThread_Create(
1147 runArbitMgr_C, (
void**)
this,
1150 NDB_THREAD_PRIO_HIGH);
1151 NdbMutex_Unlock(theThreadMutex);
1156 ArbitMgr::doChoose(
const Uint32* theData)
1158 ArbitSignal aSignal;
1159 aSignal.init(GSN_ARBIT_CHOOSEREQ, theData);
1160 sendSignalToThread(aSignal);
1166 ArbitMgr::doStop(
const Uint32* theData)
1168 DBUG_ENTER(
"ArbitMgr::doStop");
1169 ArbitSignal aSignal;
1170 NdbMutex_Lock(theThreadMutex);
1171 if (theThread != NULL) {
1172 aSignal.init(GSN_ARBIT_STOPORD, theData);
1174 aSignal.data.code = StopExit;
1176 aSignal.data.code = StopRequest;
1178 sendSignalToThread(aSignal);
1180 NdbThread_WaitFor(theThread, &value);
1181 NdbThread_Destroy(&theThread);
1182 theState = StateInit;
1184 NdbMutex_Unlock(theThreadMutex);
1192 runArbitMgr_C(
void* me)
1199 ArbitMgr::sendSignalToThread(ArbitSignal& aSignal)
1203 ndbout <<
"arbit recv: ";
1204 ndbout <<
" gsn=" << aSignal.gsn;
1205 ndbout <<
" send=" << aSignal.data.sender;
1206 ndbout <<
" code=" << aSignal.data.code;
1207 ndbout <<
" node=" << aSignal.data.node;
1208 ndbout <<
" ticket=" << aSignal.data.ticket.getText(buf,
sizeof(buf));
1209 ndbout <<
" mask=" << aSignal.data.mask.getText(buf,
sizeof(buf));
1212 aSignal.setTimestamp();
1213 NdbMutex_Lock(theInputMutex);
1214 while (theInputFull) {
1215 NdbCondition_WaitTimeout(theInputCond, theInputMutex, 1000);
1217 theInputBuffer = aSignal;
1218 theInputFull =
true;
1219 NdbCondition_Signal(theInputCond);
1220 NdbMutex_Unlock(theInputMutex);
1224 ArbitMgr::threadMain()
1226 ArbitSignal aSignal;
1227 aSignal = theInputBuffer;
1228 threadStart(aSignal);
1231 NdbMutex_Lock(theInputMutex);
1232 while (! theInputFull) {
1233 NdbCondition_WaitTimeout(theInputCond, theInputMutex, theInputTimeout);
1236 aSignal = theInputBuffer;
1237 theInputFull =
false;
1238 NdbCondition_Signal(theInputCond);
1239 NdbMutex_Unlock(theInputMutex);
1240 switch (aSignal.gsn) {
1241 case GSN_ARBIT_CHOOSEREQ:
1242 threadChoose(aSignal);
1244 case GSN_ARBIT_STOPORD:
1249 threadStop(aSignal);
1255 ArbitMgr::threadStart(ArbitSignal& aSignal)
1257 theStartReq = aSignal;
1258 sendStartConf(theStartReq, ArbitCode::ApiStart);
1259 theState = StateStarted;
1260 theInputTimeout = 1000;
1264 ArbitMgr::threadChoose(ArbitSignal& aSignal)
1268 if (! theStartReq.data.match(aSignal.data)) {
1269 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1272 theChooseReq1 = aSignal;
1273 if (theDelay == 0) {
1274 sendChooseConf(aSignal, ArbitCode::WinChoose);
1275 theState = StateFinished;
1276 theInputTimeout = 1000;
1279 theState = StateChoose1;
1280 theInputTimeout = 1;
1283 if (! theStartReq.data.match(aSignal.data)) {
1284 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1287 theChooseReq2 = aSignal;
1288 theState = StateChoose2;
1289 theInputTimeout = 1;
1292 if (! theStartReq.data.match(aSignal.data)) {
1293 sendChooseRef(aSignal, ArbitCode::ErrTicket);
1296 sendChooseRef(theChooseReq1, ArbitCode::ErrToomany);
1297 sendChooseRef(theChooseReq2, ArbitCode::ErrToomany);
1298 sendChooseRef(aSignal, ArbitCode::ErrToomany);
1299 theState = StateFinished;
1300 theInputTimeout = 1000;
1303 sendChooseRef(aSignal, ArbitCode::ErrState);
1309 ArbitMgr::threadTimeout()
1315 if (theChooseReq1.getTimediff() < theDelay)
1317 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1318 theState = StateFinished;
1319 theInputTimeout = 1000;
1322 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1323 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1324 theState = StateFinished;
1325 theInputTimeout = 1000;
1333 ArbitMgr::threadStop(ArbitSignal& aSignal)
1335 switch (aSignal.data.code) {
1339 sendStopRep(theStartReq, 0);
1342 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1345 sendChooseConf(theChooseReq1, ArbitCode::WinChoose);
1346 sendChooseConf(theChooseReq2, ArbitCode::LoseChoose);
1364 ArbitMgr::sendStartConf(ArbitSignal& aSignal, Uint32
code)
1366 ArbitSignal copySignal = aSignal;
1367 copySignal.gsn = GSN_ARBIT_STARTCONF;
1368 copySignal.data.code = code;
1369 sendSignalToQmgr(copySignal);
1373 ArbitMgr::sendChooseConf(ArbitSignal& aSignal, Uint32
code)
1375 ArbitSignal copySignal = aSignal;
1376 copySignal.gsn = GSN_ARBIT_CHOOSECONF;
1377 copySignal.data.code = code;
1378 sendSignalToQmgr(copySignal);
1382 ArbitMgr::sendChooseRef(ArbitSignal& aSignal, Uint32
code)
1384 ArbitSignal copySignal = aSignal;
1385 copySignal.gsn = GSN_ARBIT_CHOOSEREF;
1386 copySignal.data.code = code;
1387 sendSignalToQmgr(copySignal);
1391 ArbitMgr::sendStopRep(ArbitSignal& aSignal, Uint32
code)
1393 ArbitSignal copySignal = aSignal;
1394 copySignal.gsn = GSN_ARBIT_STOPREP;
1395 copySignal.data.code = code;
1396 sendSignalToQmgr(copySignal);
1406 ArbitMgr::sendSignalToQmgr(ArbitSignal& aSignal)
1408 NdbApiSignal signal(numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId()));
1410 signal.theVerId_signalNumber = aSignal.gsn;
1411 signal.theReceiversBlockNumber = QMGR;
1412 signal.theTrace = 0;
1413 signal.theLength = ArbitSignalData::SignalLength;
1417 sd->sender = numberToRef(API_CLUSTERMGR, m_clusterMgr.getOwnNodeId());
1418 sd->code = aSignal.data.code;
1419 sd->node = aSignal.data.node;
1420 sd->ticket = aSignal.data.ticket;
1421 sd->mask = aSignal.data.mask;
1425 ndbout <<
"arbit send: ";
1426 ndbout <<
" gsn=" << aSignal.gsn;
1427 ndbout <<
" recv=" << aSignal.data.sender;
1428 ndbout <<
" code=" << aSignal.data.code;
1429 ndbout <<
" node=" << aSignal.data.node;
1430 ndbout <<
" ticket=" << aSignal.data.ticket.getText(buf,
sizeof(buf));
1431 ndbout <<
" mask=" << aSignal.data.mask.getText(buf,
sizeof(buf));
1436 m_clusterMgr.lock();
1437 m_clusterMgr.raw_sendSignal(&signal, aSignal.data.sender);
1438 m_clusterMgr.unlock();