17 #include "LocalProxy.hpp"
// Constructor. The visible statements check that the proxy is block
// instance 0, set c_typeOfStart and c_masterNodeId to
// NodeState::ST_ILLEGAL_TYPE and ZNIL, and register one exec* member
// function per received signal with addRecSignal().
// NOTE(review): several registrations pass a trailing `true`; presumably this
// lets the proxy replace a handler already installed for that signal --
// confirm against addRecSignal().
19 LocalProxy::LocalProxy(BlockNumber blockNumber,
Block_context& ctx) :
24 ndbrequire(instance() == 0);
// NOTE(review): the body of this loop is not visible in this excerpt;
// presumably it resets the per-worker table -- confirm.
29 for (i = 0; i < MaxWorkers; i++)
34 c_typeOfStart = NodeState::ST_ILLEGAL_TYPE;
35 c_masterNodeId = ZNIL;
// Configuration read request / confirmation.
38 addRecSignal(GSN_READ_CONFIG_REQ, &LocalProxy::execREAD_CONFIG_REQ,
true);
39 addRecSignal(GSN_READ_CONFIG_CONF, &LocalProxy::execREAD_CONFIG_CONF,
true);
// Start-phase signals and their replies.
42 addRecSignal(GSN_STTOR, &LocalProxy::execSTTOR);
43 addRecSignal(GSN_STTORRY, &LocalProxy::execSTTORRY);
46 addRecSignal(GSN_NDB_STTOR, &LocalProxy::execNDB_STTOR);
47 addRecSignal(GSN_NDB_STTORRY, &LocalProxy::execNDB_STTORRY);
// Replies to READ_NODESREQ.
50 addRecSignal(GSN_READ_NODESCONF, &LocalProxy::execREAD_NODESCONF);
51 addRecSignal(GSN_READ_NODESREF, &LocalProxy::execREAD_NODESREF);
// Node failure report and completion report.
54 addRecSignal(GSN_NODE_FAILREP, &LocalProxy::execNODE_FAILREP);
55 addRecSignal(GSN_NF_COMPLETEREP, &LocalProxy::execNF_COMPLETEREP);
// Node inclusion request / confirmation.
58 addRecSignal(GSN_INCL_NODEREQ, &LocalProxy::execINCL_NODEREQ);
59 addRecSignal(GSN_INCL_NODECONF, &LocalProxy::execINCL_NODECONF);
// Node state report and node state change.
62 addRecSignal(GSN_NODE_STATE_REP, &LocalProxy::execNODE_STATE_REP,
true);
65 addRecSignal(GSN_CHANGE_NODE_STATE_REQ, &LocalProxy::execCHANGE_NODE_STATE_REQ,
true);
66 addRecSignal(GSN_CHANGE_NODE_STATE_CONF, &LocalProxy::execCHANGE_NODE_STATE_CONF);
// Dump-state, error-insert (tamper) and time signals.
69 addRecSignal(GSN_DUMP_STATE_ORD, &LocalProxy::execDUMP_STATE_ORD);
72 addRecSignal(GSN_NDB_TAMPER, &LocalProxy::execNDB_TAMPER,
true);
75 addRecSignal(GSN_TIME_SIGNAL, &LocalProxy::execTIME_SIGNAL);
// Trigger create: request plus CONF/REF replies.
78 addRecSignal(GSN_CREATE_TRIG_IMPL_REQ, &LocalProxy::execCREATE_TRIG_IMPL_REQ);
79 addRecSignal(GSN_CREATE_TRIG_IMPL_CONF, &LocalProxy::execCREATE_TRIG_IMPL_CONF);
80 addRecSignal(GSN_CREATE_TRIG_IMPL_REF, &LocalProxy::execCREATE_TRIG_IMPL_REF);
// Trigger drop: request plus CONF/REF replies.
83 addRecSignal(GSN_DROP_TRIG_IMPL_REQ, &LocalProxy::execDROP_TRIG_IMPL_REQ);
84 addRecSignal(GSN_DROP_TRIG_IMPL_CONF, &LocalProxy::execDROP_TRIG_IMPL_CONF);
85 addRecSignal(GSN_DROP_TRIG_IMPL_REF, &LocalProxy::execDROP_TRIG_IMPL_REF);
// DBINFO scan request / confirmation.
88 addRecSignal(GSN_DBINFO_SCANREQ, &LocalProxy::execDBINFO_SCANREQ);
89 addRecSignal(GSN_DBINFO_SCANCONF, &LocalProxy::execDBINFO_SCANCONF);
// SYNC request with REF/CONF replies, and SYNC_PATH request.
92 addRecSignal(GSN_SYNC_REQ, &LocalProxy::execSYNC_REQ,
true);
93 addRecSignal(GSN_SYNC_REF, &LocalProxy::execSYNC_REF);
94 addRecSignal(GSN_SYNC_CONF, &LocalProxy::execSYNC_CONF);
97 addRecSignal(GSN_SYNC_PATH_REQ, &LocalProxy::execSYNC_PATH_REQ,
true);
// Destructor. Its body is not part of this excerpt.
100 LocalProxy::~LocalProxy()
// Sequential variant: deliver the request through the per-signal callback
// ss.m_sendREQ, which must be set.
108 LocalProxy::sendREQ(
Signal* signal, SsSequential& ss)
111 ndbrequire(ss.m_sendREQ != 0);
// Rebuild the section handle from the section ids stored in ss ...
113 restoreHandle(handle, ss);
114 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
// ... and store the handle's section ids back into ss after the send.
115 saveSections(ss, handle);
// Sequential variant: process a reply by invoking the ss.m_sendCONF callback
// (which must be set); then, if ss.m_worker is still below c_workers, issue
// the request again through ss.m_sendREQ.
// NOTE(review): the statement that advances ss.m_worker and the handle set-up
// are not visible in this excerpt.
119 LocalProxy::recvCONF(
Signal* signal, SsSequential& ss)
121 ndbrequire(ss.m_sendCONF != 0);
122 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
125 if (ss.m_worker < c_workers) {
127 ndbrequire(ss.m_sendREQ != 0);
129 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
// Sequential variant: a REF must carry a non-zero error code; after that
// check the reply is processed through recvCONF().
// NOTE(review): presumably `error` is recorded in ss on a line not shown
// here -- confirm.
135 LocalProxy::recvREF(
Signal* signal, SsSequential& ss, Uint32 error)
137 ndbrequire(error != 0);
140 recvCONF(signal, ss);
// skipReq() overload for sequential requests; its body is not part of this
// excerpt.
144 LocalProxy::skipReq(SsSequential& ss)
// skipConf() overload for sequential requests; its body is not part of this
// excerpt.
149 LocalProxy::skipConf(SsSequential& ss)
// Copy the section count and the i-value of every section from `handle`
// into ss (m_sec_cnt / m_sec_ptr[]).
154 LocalProxy::saveSections(SsCommon& ss,
SectionHandle & handle)
156 ss.m_sec_cnt = handle.m_cnt;
157 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
158 ss.m_sec_ptr[i] = handle.m_ptr[i].i;
// Fill `handle` from the section count and section i-values stored in ss,
// then call getSections() on the handle's count and pointer array.
// NOTE(review): getSections() presumably resolves the full section pointers
// from the i-values -- confirm.
163 LocalProxy::restoreHandle(
SectionHandle & handle, SsCommon& ss)
165 handle.m_cnt = ss.m_sec_cnt;
166 for (Uint32 i = 0; i<ss.m_sec_cnt; i++)
167 handle.m_ptr[i].i = ss.m_sec_ptr[i];
169 getSections(handle.m_cnt, handle.m_ptr);
// Sequential variant: true when the current worker index is 0.
174 LocalProxy::firstReply(
const SsSequential& ss)
176 return ss.m_worker == 0;
// Sequential variant: true when the current worker index is the last one,
// i.e. ss.m_worker + 1 equals c_workers.
180 LocalProxy::lastReply(
const SsSequential& ss)
182 return ss.m_worker + 1 == c_workers;
// Parallel variant: clear the worker mask, then for every worker index below
// `count` set its mask bit and send through ss.m_sendREQ. The handle's
// sections are released after the sends.
// NOTE(review): the initialisation and increment of ss.m_worker are not
// visible in this excerpt.
186 LocalProxy::sendREQ(
Signal* signal, SsParallel& ss)
188 ndbrequire(ss.m_sendREQ != 0);
190 ss.m_workerMask.clear();
// With m_extraLast set only the first c_lqhWorkers workers are addressed
// here; otherwise all c_workers are.
192 const Uint32 count = ss.m_extraLast ? c_lqhWorkers : c_workers;
194 restoreHandle(handle, ss);
195 while (ss.m_worker < count) {
197 ss.m_workerMask.set(ss.m_worker);
198 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
201 releaseSections(handle);
// Parallel variant: validate which worker a reply came from, mark that worker
// as answered in the mask, then invoke the ss.m_sendCONF callback.
205 LocalProxy::recvCONF(
Signal* signal, SsParallel& ss)
207 ndbrequire(ss.m_sendCONF != 0);
// The sender must have the same main block number as this proxy.
209 BlockReference ref = signal->getSendersBlockRef();
210 ndbrequire(refToMain(ref) == number());
// Map the sender's instance number to a worker index and cross-check it.
212 Uint32 ino = refToInstance(ref);
213 ss.m_worker = workerIndex(ino);
214 ndbrequire(ref == workerRef(ss.m_worker));
215 ndbrequire(ss.m_worker < c_workers);
// The worker's bit must still be set; it is cleared by this reply.
216 ndbrequire(ss.m_workerMask.get(ss.m_worker));
217 ss.m_workerMask.clear(ss.m_worker);
219 (this->*ss.m_sendCONF)(signal, ss.m_ssId);
// Parallel variant: a REF must carry a non-zero error code; after that check
// the reply is processed through recvCONF().
// NOTE(review): presumably `error` is recorded in ss on a line not shown
// here -- confirm.
223 LocalProxy::recvREF(
Signal* signal, SsParallel& ss, Uint32 error)
225 ndbrequire(error != 0);
228 recvCONF(signal, ss);
// Parallel variant: requires the mask bit of the current worker to be set
// and clears it.
232 LocalProxy::skipReq(SsParallel& ss)
234 ndbrequire(ss.m_workerMask.get(ss.m_worker));
235 ss.m_workerMask.clear(ss.m_worker);
// Parallel variant: requires the mask bit of the current worker to be clear
// and sets it.
240 LocalProxy::skipConf(SsParallel& ss)
242 ndbrequire(!ss.m_workerMask.get(ss.m_worker));
243 ss.m_workerMask.set(ss.m_worker);
// Parallel variant: with `count` the number of bits set in the worker mask,
// returns true when count + 1 == c_workers.
247 LocalProxy::firstReply(
const SsParallel& ss)
249 const WorkerMask& mask = ss.m_workerMask;
250 const Uint32 count = mask.count();
// Sanity checks: valid worker index, its bit is no longer set, and at least
// one of the c_workers bits is clear.
253 ndbrequire(ss.m_worker < c_workers);
254 ndbrequire(!mask.get(ss.m_worker));
255 ndbrequire(count < c_workers);
256 return count + 1 == c_workers;
// Parallel variant: true when no bit remains set in the worker mask.
260 LocalProxy::lastReply(
const SsParallel& ss)
262 return ss.m_workerMask.isclear();
// If an "extra" worker (index c_lqhWorkers + ss.m_extraSent) still lies below
// c_workers, select it, set its mask bit and send to it via ss.m_sendREQ.
// NOTE(review): the update of ss.m_extraSent, the handle set-up and the
// return statements are not visible in this excerpt.
266 LocalProxy::lastExtra(
Signal* signal, SsParallel& ss)
269 if (c_lqhWorkers + ss.m_extraSent < c_workers) {
271 ss.m_worker = c_lqhWorkers + ss.m_extraSent;
272 ss.m_workerMask.set(ss.m_worker);
273 (this->*ss.m_sendREQ)(signal, ss.m_ssId, &handle);
// Set the mask bit of every worker index in [0, c_workers).
282 LocalProxy::setMask(SsParallel& ss)
285 for (i = 0; i < c_workers; i++)
286 ss.m_workerMask.set(i);
// Replace the worker mask of ss with the caller-supplied mask.
290 LocalProxy::setMask(SsParallel& ss,
const WorkerMask& mask)
292 ss.m_workerMask.assign(mask);
// Determine the worker counts and populate c_worker[] with one block object
// per worker, registering each worker in a thread map.
// NOTE(review): the statement that obtains `worker` is not visible in this
// excerpt.
298 LocalProxy::loadWorkers()
300 c_lqhWorkers = getLqhWorkers();
// Total workers = LQH workers plus the configured extra workers.
301 c_workers = c_lqhWorkers + c_extraWorkers;
304 for (i = 0; i < c_workers; i++) {
306 Uint32 instanceNo = workerInstance(i);
// The worker object must report the expected instance number and be the one
// returned by getInstance() for that number.
309 ndbrequire(worker->instance() == instanceNo);
310 ndbrequire(this->getInstance(instanceNo) == worker);
311 c_worker[
i] = worker;
// Indexes below c_lqhWorkers go into the LQH worker thread map.
// NOTE(review): the add_extra_worker_thr_map() call is presumably the else
// branch -- confirm.
313 if (i < c_lqhWorkers) {
314 add_lqh_worker_thr_map(number(), instanceNo);
316 add_extra_worker_thr_map(number(), instanceNo);
// Handle READ_CONFIG_REQ: seize the request record under the fixed id 1,
// require that the stored request has no parameters, and continue in
// callREAD_CONFIG_REQ().
// NOTE(review): the copy of the signal into ss.m_req is not visible in this
// excerpt.
324 LocalProxy::execREAD_CONFIG_REQ(
Signal* signal)
326 Ss_READ_CONFIG_REQ& ss = ssSeize<Ss_READ_CONFIG_REQ>(1);
330 ndbrequire(ss.m_req.noOfParameters == 0);
331 callREAD_CONFIG_REQ(signal);
// Forwards straight to backREAD_CONFIG_REQ().
// NOTE(review): presumably a hook that subclasses override to do their own
// work before continuing -- confirm in the class declaration.
335 LocalProxy::callREAD_CONFIG_REQ(
Signal* signal)
337 backREAD_CONFIG_REQ(signal);
// Continuation of READ_CONFIG_REQ handling: looks up the request record with
// id 1. The remaining statements are not part of this excerpt.
341 LocalProxy::backREAD_CONFIG_REQ(
Signal* signal)
343 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(1);
// Send READ_CONFIG_REQ to the worker currently selected in ss.m_worker.
// The request names this proxy as sender, carries ssId as senderData and has
// zero parameters; sections in `handle` are not released by the send.
350 LocalProxy::sendREAD_CONFIG_REQ(
Signal* signal, Uint32 ssId,
353 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
356 req->senderRef = reference();
357 req->senderData = ssId;
358 req->noOfParameters = 0;
359 sendSignalNoRelease(workerRef(ss.m_worker), GSN_READ_CONFIG_REQ,
360 signal, ReadConfigReq::SignalLength, JBB, handle);
// Handle a worker's READ_CONFIG_CONF: locate the request record through the
// senderData echoed in the confirmation and process the reply via recvCONF().
364 LocalProxy::execREAD_CONFIG_CONF(
Signal* signal)
367 Uint32 ssId = conf->senderData;
368 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
369 recvCONF(signal, ss);
// Answer the original requester with READ_CONFIG_CONF and release the
// request record.
// NOTE(review): any guard that limits this to the final worker reply is not
// visible in this excerpt.
373 LocalProxy::sendREAD_CONFIG_CONF(
Signal* signal, Uint32 ssId)
375 Ss_READ_CONFIG_REQ& ss = ssFind<Ss_READ_CONFIG_REQ>(ssId);
// Rebuild the handle from the section ids stored in ss and free the sections.
381 restoreHandle(handle, ss);
382 releaseSections(handle);
// Reply names this proxy as sender and echoes the requester's senderData.
385 conf->senderRef = reference();
386 conf->senderData = ss.m_req.senderData;
387 sendSignal(ss.m_req.senderRef, GSN_READ_CONFIG_CONF,
388 signal, ReadConfigConf::SignalLength, JBB);
390 ssRelease<Ss_READ_CONFIG_REQ>(ssId);
// Handle STTOR: seize the record under the fixed id 1, remember the start
// type when the start phase is 3, and save the raw signal for later replay
// to the workers.
396 LocalProxy::execSTTOR(
Signal* signal)
398 Ss_STTOR& ss = ssSeize<Ss_STTOR>(1);
// Start phase is word 1 and type of start is word 7 of the signal.
400 const Uint32 startphase = signal->theData[1];
401 const Uint32 typeOfStart = signal->theData[7];
403 if (startphase == 3) {
405 c_typeOfStart = typeOfStart;
// Length is in 32-bit words; << 2 converts it to bytes for memcpy.
// NOTE(review): no check of m_reqlength against the size of m_reqdata is
// visible here -- confirm the buffer is large enough for any STTOR.
408 ss.m_reqlength = signal->getLength();
409 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
// callSTTOR(); its body is not part of this excerpt.
// NOTE(review): by analogy with callREAD_CONFIG_REQ() it presumably continues
// in backSTTOR() -- confirm.
415 LocalProxy::callSTTOR(
Signal* signal)
// Continuation of STTOR handling: looks up the STTOR record with id 1. The
// remaining statements are not part of this excerpt.
421 LocalProxy::backSTTOR(
Signal* signal)
423 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendSTTOR -- confirm.
// Copies the saved STTOR words back into the send buffer and sends them to
// the worker selected in ss.m_worker without releasing sections.
430 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
432 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
433 sendSignalNoRelease(workerRef(ss.m_worker), GSN_STTOR,
434 signal, ss.m_reqlength, JBB, handle);
// Handle a worker's STTORRY: look up the STTOR record with id 1 and process
// the reply via recvCONF().
438 LocalProxy::execSTTORRY(
Signal* signal)
440 Ss_STTOR& ss = ssFind<Ss_STTOR>(1);
441 recvCONF(signal, ss);
// Collect the workers' STTORRY replies and answer NDBCNTR with the saved
// reply, then release the STTOR record.
// NOTE(review): the else keyword and any last-reply guard are not visible in
// this excerpt.
445 LocalProxy::sendSTTORRY(
Signal* signal, Uint32 ssId)
447 Ss_STTOR& ss = ssFind<Ss_STTOR>(ssId);
449 const Uint32 conflength = signal->getLength();
450 const Uint32* confdata = signal->getDataPtr();
// The first reply is stored as the reference copy ...
453 if (firstReply(ss)) {
454 ss.m_conflength = conflength;
455 memcpy(ss.m_confdata, confdata, conflength << 2);
// ... and a reply is required to match the stored copy in length and content
// (presumably in the else branch, i.e. for later replies -- confirm).
457 ndbrequire(ss.m_conflength == conflength);
458 ndbrequire(memcmp(ss.m_confdata, confdata, conflength << 2) == 0);
// Send the stored reply on to NDBCNTR.
464 memcpy(signal->getDataPtrSend(), ss.m_confdata, ss.m_conflength << 2);
465 sendSignal(NDBCNTR_REF, GSN_STTORRY,
466 signal, ss.m_conflength, JBB);
468 ssRelease<Ss_STTOR>(ssId);
// Handle NDB_STTOR: seize the record under the fixed id 1 and continue in
// callNDB_STTOR().
// NOTE(review): saving of the request into ss is not visible in this excerpt.
474 LocalProxy::execNDB_STTOR(
Signal* signal)
476 Ss_NDB_STTOR& ss = ssSeize<Ss_NDB_STTOR>(1);
481 callNDB_STTOR(signal);
// Forwards straight to backNDB_STTOR().
// NOTE(review): presumably a hook that subclasses override -- confirm in the
// class declaration.
485 LocalProxy::callNDB_STTOR(
Signal* signal)
487 backNDB_STTOR(signal);
// Continuation of NDB_STTOR handling: looks up the NDB_STTOR record with
// id 1. The remaining statements are not part of this excerpt.
491 LocalProxy::backNDB_STTOR(
Signal* signal)
493 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendNDB_STTOR -- confirm.
// Sends NDB_STTOR, with this proxy as senderRef and the saved request length,
// to the worker selected in ss.m_worker without releasing sections.
500 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
504 req->senderRef = reference();
505 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_STTOR,
506 signal, ss.m_reqlength, JBB, handle);
// Handle a worker's NDB_STTORRY: the senderRef in the payload must equal the
// actual sender of the signal; the reply is then processed via recvCONF().
510 LocalProxy::execNDB_STTORRY(
Signal* signal)
512 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(1);
516 ndbrequire(conf->senderRef == signal->getSendersBlockRef());
517 recvCONF(signal, ss);
// Answer NDBCNTR with NDB_STTORRY, naming this proxy as sender, and release
// the NDB_STTOR record.
// NOTE(review): any guard that limits this to the final worker reply is not
// visible in this excerpt.
521 LocalProxy::sendNDB_STTORRY(
Signal* signal, Uint32 ssId)
523 Ss_NDB_STTOR& ss = ssFind<Ss_NDB_STTOR>(ssId);
529 conf->senderRef = reference();
530 sendSignal(NDBCNTR_REF, GSN_NDB_STTORRY,
531 signal, NdbSttorry::SignalLength, JBB);
533 ssRelease<Ss_NDB_STTOR>(ssId);
// Send a one-word READ_NODESREQ to NDBCNTR; the single word is this proxy's
// own block reference.
539 LocalProxy::sendREAD_NODESREQ(
Signal* signal)
541 signal->theData[0] = reference();
542 sendSignal(NDBCNTR_REF, GSN_READ_NODESREQ, signal, 1, JBB);
// Handle READ_NODESCONF: record the master node id from the confirmation in
// c_masterNodeId and resume start handling in backNDB_STTOR().
// NOTE(review): the conditions around the backNDB_STTOR() call are not
// visible in this excerpt.
546 LocalProxy::execREAD_NODESCONF(
Signal* signal)
548 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
552 c_masterNodeId = conf->masterNodeId;
559 backNDB_STTOR(signal);
// Handle READ_NODESREF: a request must be outstanding, i.e. the m_gsn stored
// in c_ss_READ_NODESREQ is non-zero. Further handling is not part of this
// excerpt.
570 LocalProxy::execREAD_NODESREF(
Signal* signal)
572 Ss_READ_NODES_REQ& ss = c_ss_READ_NODESREQ;
573 ndbrequire(ss.m_gsn != 0);
// Handle NODE_FAILREP: find or seize the record with id 1, check the signal
// length, load the failed-node bitmask from the request, iterate over all
// workers, and release the record if noReply() is true for this block.
// NOTE(review): the per-worker loop body and the forwarding to workers are
// not visible in this excerpt.
580 LocalProxy::execNODE_FAILREP(
Signal* signal)
582 Ss_NODE_FAILREP& ss = ssFindSeize<Ss_NODE_FAILREP>(1, 0);
585 ndbrequire(signal->getLength() == NodeFailRep::SignalLength);
588 mask.
assign(NdbNodeBitmask::Size, req->theNodes);
591 for (Uint32 i = 0; i < c_workers; i++)
599 if (ss.noReply(number()))
602 ssRelease<Ss_NODE_FAILREP>(ss);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendNODE_FAILREP -- confirm.
// Sends NODE_FAILREP to the worker selected in ss.m_worker without releasing
// sections.
609 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(ssId);
613 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_FAILREP,
614 signal, NodeFailRep::SignalLength, JBB, handle);
// Handle a worker's NF_COMPLETEREP for the NODE_FAILREP record with id 1.
// Only valid for blocks where noReply() is false.
618 LocalProxy::execNF_COMPLETEREP(
Signal* signal)
620 Ss_NODE_FAILREP& ss = ssFind<Ss_NODE_FAILREP>(1);
621 ndbrequire(!ss.noReply(number()));
// NOTE(review): the bit is set right before recvCONF(), presumably so that
// recvCONF()'s requirement on the worker mask does not fire -- confirm, since
// recvCONF() recomputes ss.m_worker from the sender before checking the mask.
622 ss.m_workerMask.set(ss.m_worker);
623 recvCONF(signal, ss);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendNF_COMPLETEREP -- confirm.
// Takes the failed node id from the worker's report, requires that node to be
// present in `waitFor`, checks the workers, and reports NF_COMPLETEREP
// onwards.
632 Uint32 node = conf->failedNodeId;
636 ndbrequire(waitFor.
get(node));
// NOTE(review): loop body not visible; presumably it checks whether any
// worker is still waiting for this node -- confirm.
640 for (Uint32 i = 0; i < c_workers; i++)
644 if (waitFor.
get(node))
// Outgoing report: own node id, with the source line recorded in `from`.
657 conf->
nodeId = getOwnNodeId();
660 conf->from = __LINE__;
662 sendSignal(DBDIH_REF, GSN_NF_COMPLETEREP,
663 signal, NFCompleteRep::SignalLength, JBB);
// When this proxy is DBTC the report is sent to QMGR as well.
// NOTE(review): the exact scope of this branch is not visible -- confirm.
665 if (number() == DBTC)
674 sendSignal(QMGR_REF, GSN_NF_COMPLETEREP, signal,
675 NFCompleteRep::SignalLength, JBB);
// Handle INCL_NODEREQ: seize the record under the fixed id 1 and save the
// request. The length is in 32-bit words (<< 2 gives bytes) and must fit
// into ss.m_req before it is copied.
683 LocalProxy::execINCL_NODEREQ(
Signal* signal)
685 Ss_INCL_NODEREQ& ss = ssSeize<Ss_INCL_NODEREQ>(1);
687 ss.m_reqlength = signal->getLength();
688 ndbrequire(
sizeof(ss.m_req) >= (ss.m_reqlength << 2));
689 memcpy(&ss.m_req, signal->getDataPtr(), ss.m_reqlength << 2);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendINCL_NODEREQ -- confirm.
// Copies the saved request into the send buffer, replaces senderRef with this
// proxy, and sends it to the worker selected in ss.m_worker without releasing
// sections.
697 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
699 Ss_INCL_NODEREQ::Req* req =
700 (Ss_INCL_NODEREQ::Req*)signal->getDataPtrSend();
702 memcpy(req, &ss.m_req, ss.m_reqlength << 2);
703 req->senderRef = reference();
704 sendSignalNoRelease(workerRef(ss.m_worker), GSN_INCL_NODEREQ,
705 signal, ss.m_reqlength, JBB, handle);
// Handle a worker's INCL_NODECONF: look up the record with id 1 and process
// the reply via recvCONF().
709 LocalProxy::execINCL_NODECONF(
Signal* signal)
711 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(1);
712 recvCONF(signal, ss);
// Answer the original requester with INCL_NODECONF (echoing inclNodeId, with
// this proxy as sender) and release the record.
// NOTE(review): the remaining sendSignal arguments and any last-reply guard
// are not visible in this excerpt.
716 LocalProxy::sendINCL_NODECONF(
Signal* signal, Uint32 ssId)
718 Ss_INCL_NODEREQ& ss = ssFind<Ss_INCL_NODEREQ>(ssId);
723 Ss_INCL_NODEREQ::Conf* conf =
724 (Ss_INCL_NODEREQ::Conf*)signal->getDataPtrSend();
726 conf->inclNodeId = ss.m_req.inclNodeId;
727 conf->senderRef = reference();
728 sendSignal(ss.m_req.senderRef, GSN_INCL_NODECONF,
731 ssRelease<Ss_INCL_NODEREQ>(ssId);
// Handle NODE_STATE_REP: seize a record, let the SimulatedBlock base class
// process the signal for the proxy itself, and release the record again.
// NOTE(review): the forwarding to the workers is not visible in this excerpt.
737 LocalProxy::execNODE_STATE_REP(
Signal* signal)
739 Ss_NODE_STATE_REP& ss = ssSeize<Ss_NODE_STATE_REP>();
741 SimulatedBlock::execNODE_STATE_REP(signal);
742 ssRelease<Ss_NODE_STATE_REP>(ss);
// Send NODE_STATE_REP to the worker selected in ss.m_worker without releasing
// sections.
746 LocalProxy::sendNODE_STATE_REP(
Signal* signal, Uint32 ssId,
749 Ss_NODE_STATE_REP& ss = ssFind<Ss_NODE_STATE_REP>(ssId);
751 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NODE_STATE_REP,
752 signal,NodeStateRep::SignalLength, JBB, handle);
// Handle CHANGE_NODE_STATE_REQ: seize the record under the fixed id 1. The
// remaining statements are not part of this excerpt.
758 LocalProxy::execCHANGE_NODE_STATE_REQ(
Signal* signal)
760 Ss_CHANGE_NODE_STATE_REQ& ss = ssSeize<Ss_CHANGE_NODE_STATE_REQ>(1);
// Send CHANGE_NODE_STATE_REQ, with this proxy as senderRef, to the worker
// selected in ss.m_worker without releasing sections.
769 LocalProxy::sendCHANGE_NODE_STATE_REQ(
Signal* signal, Uint32 ssId,
772 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(ssId);
775 req->senderRef = reference();
777 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CHANGE_NODE_STATE_REQ,
778 signal, ChangeNodeStateReq::SignalLength, JBB, handle);
// Handle a worker's CHANGE_NODE_STATE_CONF: the echoed senderData must match
// the saved request, then the reply is processed via recvCONF().
782 LocalProxy::execCHANGE_NODE_STATE_CONF(
Signal* signal)
784 Ss_CHANGE_NODE_STATE_REQ& ss = ssFind<Ss_CHANGE_NODE_STATE_REQ>(1);
787 ndbrequire(conf->senderData == ss.m_req.senderData);
788 recvCONF(signal, ss);
// NOTE(review): the function this statement belongs to is not visible in this
// excerpt; presumably the end of LocalProxy::sendCHANGE_NODE_STATE_CONF --
// confirm. Releases the CHANGE_NODE_STATE_REQ record identified by ssId.
805 ssRelease<Ss_CHANGE_NODE_STATE_REQ>(ssId);
// Handle DUMP_STATE_ORD: seize a record, save the raw signal words in it, and
// release the record at the end.
// NOTE(review): the forwarding to the workers is not visible in this excerpt.
811 LocalProxy::execDUMP_STATE_ORD(
Signal* signal)
813 Ss_DUMP_STATE_ORD& ss = ssSeize<Ss_DUMP_STATE_ORD>();
// Length is in 32-bit words; << 2 converts it to bytes for memcpy.
// NOTE(review): no check of m_reqlength against the size of m_reqdata is
// visible here -- confirm the buffer is large enough.
815 ss.m_reqlength = signal->getLength();
816 memcpy(ss.m_reqdata, signal->getDataPtr(), ss.m_reqlength << 2);
818 ssRelease<Ss_DUMP_STATE_ORD>(ss);
// Copy the saved DUMP_STATE_ORD words into the send buffer and send them to
// the worker selected in ss.m_worker without releasing sections.
822 LocalProxy::sendDUMP_STATE_ORD(
Signal* signal, Uint32 ssId,
825 Ss_DUMP_STATE_ORD& ss = ssFind<Ss_DUMP_STATE_ORD>(ssId);
827 memcpy(signal->getDataPtrSend(), ss.m_reqdata, ss.m_reqlength << 2);
828 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DUMP_STATE_ORD,
829 signal, ss.m_reqlength, JBB, handle);
// Handle NDB_TAMPER: the signal must be exactly one word, which is stored as
// the error-insert value. The SimulatedBlock base class then processes the
// signal for the proxy itself, and the record is released.
// NOTE(review): the forwarding to the workers is not visible in this excerpt.
835 LocalProxy::execNDB_TAMPER(
Signal* signal)
837 Ss_NDB_TAMPER& ss = ssSeize<Ss_NDB_TAMPER>();
839 ndbrequire(signal->getLength() == 1);
840 ss.m_errorInsert = signal->theData[0];
842 SimulatedBlock::execNDB_TAMPER(signal);
844 ssRelease<Ss_NDB_TAMPER>(ss);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendNDB_TAMPER -- confirm.
// Sends a one-word NDB_TAMPER carrying the saved error-insert value to the
// worker selected in ss.m_worker without releasing sections.
850 Ss_NDB_TAMPER& ss = ssFind<Ss_NDB_TAMPER>(ssId);
852 signal->theData[0] = ss.m_errorInsert;
853 sendSignalNoRelease(workerRef(ss.m_worker), GSN_NDB_TAMPER,
854 signal, 1, JBB, handle);
// Handle TIME_SIGNAL: seize a record and release it again.
// NOTE(review): the forwarding to the workers between seize and release is
// not visible in this excerpt.
860 LocalProxy::execTIME_SIGNAL(
Signal* signal)
862 Ss_TIME_SIGNAL& ss = ssSeize<Ss_TIME_SIGNAL>();
865 ssRelease<Ss_TIME_SIGNAL>(ss);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendTIME_SIGNAL -- confirm.
// Sends a one-word TIME_SIGNAL (word 0 set to 0) to the worker selected in
// ss.m_worker without releasing sections.
871 Ss_TIME_SIGNAL& ss = ssFind<Ss_TIME_SIGNAL>(ssId);
872 signal->theData[0] = 0;
873 sendSignalNoRelease(workerRef(ss.m_worker), GSN_TIME_SIGNAL,
874 signal, 1, JBB, handle);
// Handle CREATE_TRIG_IMPL_REQ. The statements under the two guards below are
// not visible in this excerpt.
880 LocalProxy::execCREATE_TRIG_IMPL_REQ(
Signal* signal)
// NOTE(review): presumably returns until all fragments of a fragmented signal
// have arrived -- confirm.
882 if (!assembleFragments(signal))
// NOTE(review): presumably returns when the request was queued instead of
// being handled now -- confirm against ssQueue().
885 if (ssQueue<Ss_CREATE_TRIG_IMPL_REQ>(signal))
888 Ss_CREATE_TRIG_IMPL_REQ& ss = ssSeize<Ss_CREATE_TRIG_IMPL_REQ>();
// Shorter signals are accepted; only the maximum length is enforced.
890 ndbrequire(signal->getLength() <= CreateTrigImplReq::SignalLength);
// Store the section ids of the request's handle in ss.
893 saveSections(ss, handle);
// Send CREATE_TRIG_IMPL_REQ to the worker selected in ss.m_worker. The
// request names this proxy as sender and carries ssId as senderData so the
// reply can be matched to the record.
// NOTE(review): the final sendSignalNoRelease argument line is not visible in
// this excerpt.
899 LocalProxy::sendCREATE_TRIG_IMPL_REQ(
Signal* signal, Uint32 ssId,
902 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
906 req->senderRef = reference();
907 req->senderData = ssId;
908 sendSignalNoRelease(workerRef(ss.m_worker), GSN_CREATE_TRIG_IMPL_REQ,
909 signal, CreateTrigImplReq::SignalLength, JBB,
// Handle a worker's CREATE_TRIG_IMPL_CONF: find the record through the echoed
// senderData and process the reply via recvCONF().
914 LocalProxy::execCREATE_TRIG_IMPL_CONF(
Signal* signal)
917 Uint32 ssId = conf->senderData;
918 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
919 recvCONF(signal, ss);
// Handle a worker's CREATE_TRIG_IMPL_REF: find the record through the echoed
// senderData and process the refusal, with its error code, via recvREF().
923 LocalProxy::execCREATE_TRIG_IMPL_REF(
Signal* signal)
926 Uint32 ssId = ref->senderData;
927 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
928 recvREF(signal, ss, ref->errorCode);
// Send the final answer for a CREATE_TRIG_IMPL_REQ to the original requester
// and release the record: CONF when ss.m_error is 0, and a REF carrying
// ss.m_error in what is presumably the else branch.
// NOTE(review): the else keyword and any last-reply guard are not visible in
// this excerpt.
932 LocalProxy::sendCREATE_TRIG_IMPL_CONF(
Signal* signal, Uint32 ssId)
934 Ss_CREATE_TRIG_IMPL_REQ& ss = ssFind<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
// The reply goes to whoever sent the saved request.
935 BlockReference dictRef = ss.m_req.senderRef;
// Success: echo the identifying fields of the saved request in the CONF.
940 if (ss.m_error == 0) {
943 conf->senderRef = reference();
944 conf->senderData = ss.m_req.senderData;
945 conf->tableId = ss.m_req.tableId;
946 conf->triggerId = ss.m_req.triggerId;
947 conf->triggerInfo = ss.m_req.triggerInfo;
948 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_CONF,
949 signal, CreateTrigImplConf::SignalLength, JBB);
// Failure: same fields plus the recorded error code in a REF.
952 ref->senderRef = reference();
953 ref->senderData = ss.m_req.senderData;
954 ref->tableId = ss.m_req.tableId;
955 ref->triggerId = ss.m_req.triggerId;
956 ref->triggerInfo = ss.m_req.triggerInfo;
957 ref->errorCode = ss.m_error;
958 sendSignal(dictRef, GSN_CREATE_TRIG_IMPL_REF,
959 signal, CreateTrigImplRef::SignalLength, JBB);
962 ssRelease<Ss_CREATE_TRIG_IMPL_REQ>(ssId);
// Handle DROP_TRIG_IMPL_REQ: seize a record and require the signal to have
// exactly DropTrigImplReq::SignalLength words.
968 LocalProxy::execDROP_TRIG_IMPL_REQ(
Signal* signal)
// NOTE(review): the statement under this guard is not visible; presumably the
// handler returns when the request was queued -- confirm against ssQueue().
970 if (ssQueue<Ss_DROP_TRIG_IMPL_REQ>(signal))
973 Ss_DROP_TRIG_IMPL_REQ& ss = ssSeize<Ss_DROP_TRIG_IMPL_REQ>();
975 ndbrequire(signal->getLength() == DropTrigImplReq::SignalLength);
// Send DROP_TRIG_IMPL_REQ to the worker selected in ss.m_worker. The request
// names this proxy as sender and carries ssId as senderData so the reply can
// be matched to the record; sections are not released by the send.
980 LocalProxy::sendDROP_TRIG_IMPL_REQ(
Signal* signal, Uint32 ssId,
983 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
987 req->senderRef = reference();
988 req->senderData = ssId;
989 sendSignalNoRelease(workerRef(ss.m_worker), GSN_DROP_TRIG_IMPL_REQ,
990 signal, DropTrigImplReq::SignalLength, JBB, handle);
// Handle a worker's DROP_TRIG_IMPL_CONF: find the record through the echoed
// senderData and process the reply via recvCONF().
994 LocalProxy::execDROP_TRIG_IMPL_CONF(
Signal* signal)
997 Uint32 ssId = conf->senderData;
998 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
999 recvCONF(signal, ss);
// Handle a worker's DROP_TRIG_IMPL_REF: find the record through the echoed
// senderData and process the refusal, with its error code, via recvREF().
1003 LocalProxy::execDROP_TRIG_IMPL_REF(
Signal* signal)
1006 Uint32 ssId = ref->senderData;
1007 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1008 recvREF(signal, ss, ref->errorCode);
// Send the final answer for a DROP_TRIG_IMPL_REQ to the original requester
// and release the record: CONF when ss.m_error is 0, and a REF carrying
// ss.m_error in what is presumably the else branch.
// NOTE(review): the else keyword and any last-reply guard are not visible in
// this excerpt.
1012 LocalProxy::sendDROP_TRIG_IMPL_CONF(
Signal* signal, Uint32 ssId)
1014 Ss_DROP_TRIG_IMPL_REQ& ss = ssFind<Ss_DROP_TRIG_IMPL_REQ>(ssId);
// The reply goes to whoever sent the saved request.
1015 BlockReference dictRef = ss.m_req.senderRef;
// Success: echo the identifying fields of the saved request in the CONF.
1020 if (ss.m_error == 0) {
1023 conf->senderRef = reference();
1024 conf->senderData = ss.m_req.senderData;
1025 conf->tableId = ss.m_req.tableId;
1026 conf->triggerId = ss.m_req.triggerId;
1027 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_CONF,
1028 signal, DropTrigImplConf::SignalLength, JBB);
// Failure: same fields plus the recorded error code in a REF.
1031 ref->senderRef = reference();
1032 ref->senderData = ss.m_req.senderData;
1033 ref->tableId = ss.m_req.tableId;
1034 ref->triggerId = ss.m_req.triggerId;
1035 ref->errorCode = ss.m_error;
1036 sendSignal(dictRef, GSN_DROP_TRIG_IMPL_REF,
1037 signal, DropTrigImplRef::SignalLength, JBB);
1040 ssRelease<Ss_DROP_TRIG_IMPL_REQ>(ssId);
1046 #ifdef DBINFO_SCAN_TRACE
1047 #include <debugger/DebuggerNames.hpp>
// Build a block reference from (block, instance, node) with numberToRef().
// When DBINFO_SCAN_TRACE is defined the switch is also logged via ndbout_c().
// NOTE(review): the return statement is not visible in this excerpt;
// presumably `ref` is returned -- confirm.
1051 switchRef(Uint32
block, Uint32 instance, Uint32 node)
1053 const Uint32 ref = numberToRef(block, instance, node);
1054 #ifdef DBINFO_SCAN_TRACE
1055 ndbout_c(
"Dbinfo::LocalProxy: switching to %s(%d) in node %d, ref: 0x%.8x",
1056 getBlockName(block,
"<unknown>"), instance, node, ref);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::find_next(cursor) -- confirm.
// Decode the cursor's current reference; it must address this block on the
// own node.
1065 const Uint32 node = refToNode(cursor->currRef);
1066 const Uint32 block = refToMain(cursor->currRef);
1067 Uint32 instance = refToInstance(cursor->currRef);
1069 ndbrequire(node == getOwnNodeId());
1070 ndbrequire(block == number());
// Instance 0 maps to worker index 0; a worker instance maps to the index
// after its own.
1073 Uint32 worker = (instance > 0) ? workerIndex(instance) + 1 : 0;
// If such a worker exists, point the cursor at that worker's instance.
1075 if (worker < c_workers)
1078 cursor->currRef = switchRef(block, workerInstance(worker), node);
// NOTE(review): presumably reached when no worker is left; the cursor is set
// to the reference built from block and node only -- confirm.
1082 cursor->currRef = numberToRef(block, node);
// Handle DBINFO_SCANREQ arriving at the proxy. Three outcomes are visible:
// resume at a previously saved worker reference, start/continue at the target
// chosen by find_next(), or answer the requester directly with
// DBINFO_SCANCONF.
// NOTE(review): braces, else keywords and return statements are not visible
// in this excerpt, so the branch boundaries described below are inferred from
// statement order -- confirm.
1089 LocalProxy::execDBINFO_SCANREQ(
Signal* signal)
// The signal must be exactly the fixed part plus the cursor.
1093 Uint32 signal_length = signal->getLength();
1094 ndbrequire(signal_length == DbinfoScanReq::SignalLength+req->cursor_sz);
// Resume: the cursor is flagged "more data" and carries a saved reference.
1099 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->
flags) &&
1100 cursor->saveCurrRef)
// Restore the saved reference as the current one.
1104 cursor->currRef = cursor->saveCurrRef;
1105 cursor->saveCurrRef = 0;
// Put the proxy in as sender, keeping the real sender in saveSenderRef.
1108 cursor->saveSenderRef = cursor->senderRef;
1109 cursor->senderRef = reference();
1111 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1112 signal, signal_length, JBB);
// Otherwise clear the flag and let find_next() pick the next target.
1116 Ndbinfo::ScanCursor::setHasMoreData(cursor->
flags,
false);
1118 if (find_next(cursor))
1121 ndbrequire(cursor->currRef);
1122 ndbrequire(cursor->saveCurrRef == 0);
// Same sender substitution as above before forwarding the request.
1125 cursor->saveSenderRef = cursor->senderRef;
1126 cursor->senderRef = reference();
1128 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1129 signal, signal_length, JBB);
// No further target: reply to the requester with the same payload as CONF.
// The current reference must then have instance 0.
1134 ndbrequire(cursor->saveSenderRef == 0);
1136 ndbrequire(cursor->currRef);
1137 ndbrequire(cursor->saveCurrRef == 0);
1139 ndbrequire(refToInstance(cursor->currRef) == 0);
1140 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
// Handle DBINFO_SCANCONF coming back from a worker. Depending on the cursor
// flags and the returned row count the CONF is passed on to the original
// requester, or the scan is continued at the next target.
// NOTE(review): braces, else keywords and return statements are not visible
// in this excerpt, so the branch boundaries described below are inferred from
// statement order -- confirm.
1145 LocalProxy::execDBINFO_SCANCONF(
Signal* signal)
// The signal must be exactly the fixed part plus the cursor.
1149 Uint32 signal_length = signal->getLength();
1150 ndbrequire(signal_length == DbinfoScanConf::SignalLength+conf->cursor_sz);
// Case 1: the worker reports more data.
1155 if (Ndbinfo::ScanCursor::getHasMoreData(cursor->
flags))
// Restore the original requester as sender ...
1161 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1162 cursor->saveSenderRef = 0;
// ... remember the worker in saveCurrRef and make the proxy the current
// reference, then hand the CONF to the requester.
1165 cursor->saveCurrRef = cursor->currRef;
1166 cursor->currRef = reference();
1168 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
// Case 2: no more data from this worker, but rows were returned.
1172 if (conf->returnedRows)
1181 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1182 cursor->saveSenderRef = 0;
// If a next target exists, flag "more data" and save it for the follow-up
// request, with the proxy as current reference.
1184 if (find_next(cursor))
1191 Ndbinfo::ScanCursor::setHasMoreData(cursor->
flags,
true);
1193 cursor->saveCurrRef = cursor->currRef;
1194 cursor->currRef = reference();
// Otherwise the flag must be clear and the cursor must point at the proxy.
1199 ndbrequire(Ndbinfo::ScanCursor::getHasMoreData(cursor->
flags) ==
false);
1201 ndbrequire(cursor->currRef == reference());
1202 cursor->saveCurrRef = 0;
1205 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
// Case 3: no rows returned. Continue directly with the next target if there
// is one; the proxy stays the sender of the forwarded request.
1211 if (find_next(cursor))
1215 ndbrequire(cursor->senderRef == reference());
1216 ndbrequire(cursor->saveSenderRef);
1218 ndbrequire(cursor->saveCurrRef == 0);
1220 sendSignal(cursor->currRef, GSN_DBINFO_SCANREQ,
1221 signal, signal_length, JBB);
// No target left: restore the requester and return the CONF.
1228 const Uint32 senderRef = cursor->senderRef = cursor->saveSenderRef;
1229 cursor->saveSenderRef = 0;
1231 ndbrequire(cursor->currRef);
1232 ndbrequire(cursor->saveCurrRef == 0);
1234 sendSignal(senderRef, GSN_DBINFO_SCANCONF, signal, signal_length, JBB);
// Handle SYNC_REQ: seize a record, keep a copy of the request in it, and
// start the fan-out to the workers with sendREQ().
1241 LocalProxy::execSYNC_REQ(
Signal* signal)
1243 Ss_SYNC_REQ& ss = ssSeize<Ss_SYNC_REQ>();
1245 ss.m_req = * CAST_CONSTPTR(
SyncReq, signal->getDataPtr());
1247 sendREQ(signal, ss);
// Send SYNC_REQ to the worker selected in ss.m_worker. The request names this
// proxy as sender, carries ssId as senderData and keeps the original prio,
// which also selects the job buffer level used for the send.
1251 LocalProxy::sendSYNC_REQ(
Signal* signal, Uint32 ssId,
1254 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ssId);
1257 req->senderRef = reference();
1258 req->senderData = ssId;
1259 req->prio = ss.m_req.prio;
1261 sendSignalNoRelease(workerRef(ss.m_worker), GSN_SYNC_REQ,
1262 signal, SyncReq::SignalLength,
1263 JobBufferLevel(ss.m_req.prio), handle);
// Handle a worker's SYNC_REF: find the record through the echoed senderData
// and process the refusal, with its error code, via recvREF().
1267 LocalProxy::execSYNC_REF(
Signal* signal)
1270 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(ref.senderData);
1272 recvREF(signal, ss, ref.errorCode);
// Handle a worker's SYNC_CONF: find the record through the echoed senderData
// and process the reply via recvCONF().
1276 LocalProxy::execSYNC_CONF(
Signal* signal)
1279 Ss_SYNC_REQ& ss = ssFind<Ss_SYNC_REQ>(conf.senderData);
1281 recvCONF(signal, ss);
// NOTE(review): the function header for these statements is not visible in
// this excerpt; presumably LocalProxy::sendSYNC_CONF -- confirm.
// Sends the final answer for a SYNC_REQ to the original requester at the job
// buffer level given by the request's prio, then releases the record.
// Success: SYNC_CONF echoing the requester's senderData.
1295 if (ss.m_error == 0)
1299 conf->senderRef = reference();
1300 conf->senderData = ss.m_req.senderData;
1302 Uint32 prio = ss.m_req.prio;
1303 sendSignal(ss.m_req.senderRef, GSN_SYNC_CONF, signal,
1304 SyncConf::SignalLength,
1305 JobBufferLevel(prio));
// Failure (presumably the else branch): SYNC_REF carrying the recorded error.
1311 ref->senderRef = reference();
1312 ref->senderData = ss.m_req.senderData;
1313 ref->errorCode = ss.m_error;
1315 Uint32 prio = ss.m_req.prio;
1316 sendSignal(ss.m_req.senderRef, GSN_SYNC_REF, signal,
1317 SyncRef::SignalLength,
1318 JobBufferLevel(prio));
1320 ssRelease<Ss_SYNC_REQ>(ssId);
// Handle SYNC_PATH_REQ: multiply the request's count by the number of workers
// and send the (modified) signal to every worker instance of this block on
// the own node, at the job buffer level given by req->prio.
// NOTE(review): the count is presumably scaled because each worker passes the
// request on once -- confirm against the SYNC_PATH protocol.
1324 LocalProxy::execSYNC_PATH_REQ(
Signal* signal)
1327 req->count *= c_workers;
1329 for (Uint32 i = 0; i < c_workers; i++)
1332 Uint32 ref = numberToRef(number(), workerInstance(i), getOwnNodeId());
1333 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
1334 signal->getLength(),
1335 JobBufferLevel(req->prio));