25 #include <ndb_global.h>
27 #include "SimulatedBlock.hpp"
29 #include <OutputStream.hpp>
30 #include <GlobalData.hpp>
31 #include <Emulator.hpp>
32 #include <WatchDog.hpp>
33 #include <ErrorHandlingMacros.hpp>
34 #include <TimeQueue.hpp>
35 #include <TransporterRegistry.hpp>
36 #include <SignalLoggerManager.hpp>
37 #include <FastScheduler.hpp>
38 #include "ndbd_malloc.hpp"
39 #include <signaldata/EventReport.hpp>
40 #include <signaldata/ContinueFragmented.hpp>
41 #include <signaldata/NodeStateSignalData.hpp>
42 #include <signaldata/FsRef.hpp>
43 #include <signaldata/SignalDroppedRep.hpp>
44 #include <signaldata/LocalRouteOrd.hpp>
45 #include <signaldata/TransIdAI.hpp>
46 #include <signaldata/Sync.hpp>
47 #include <DebuggerNames.hpp>
48 #include "LongSignal.hpp"
50 #include <Properties.hpp>
51 #include "Configuration.hpp"
52 #include <AttributeDescriptor.hpp>
53 #include <NdbSqlUtil.hpp>
55 #include "../blocks/dbdih/Dbdih.hpp"
56 #include <signaldata/CallbackSignal.hpp>
57 #include "LongSignalImpl.hpp"
59 #include <EventLogger.hpp>
62 #define ljamEntry() jamEntryLine(30000 + __LINE__)
63 #define ljam() jamLine(30000 + __LINE__)
70 Uint32 instanceNumber)
71 : theNodeId(globalData.ownId),
72 theNumber(blockNumber),
73 theInstance(instanceNumber),
74 theReference(numberToRef(blockNumber, instanceNumber, globalData.ownId)),
78 m_global_page_pool(globalData.m_global_page_pool),
79 m_shared_page_pool(globalData.m_shared_page_pool),
80 c_fragmentInfoHash(c_fragmentInfoPool),
81 c_linearFragmentSendList(c_fragmentSendPool),
82 c_segmentedFragmentSendList(c_fragmentSendPool),
86 ,debugOut(*new NdbOut(*new
FileOutputStream(globalSignalLoggers.getOutputStream())))
93 m_watchDogCounter = NULL;
99 if (theInstance == 0) {
100 ndbrequire(mainBlock == 0);
102 globalData.setBlock(blockNumber, mainBlock);
104 ndbrequire(mainBlock != 0);
105 mainBlock->addInstance(
this, theInstance);
107 theMainInstance = mainBlock;
109 c_fragmentIdCounter = 1;
110 c_fragSenderRunning =
false;
116 for(GlobalSignalNumber
i = 0;
i<=MAX_GSN;
i++)
119 installSimulatedBlockFunctions();
121 m_callbackTableAddr = 0;
123 CLEAR_ERROR_INSERT_VALUE;
126 m_global_variables =
new Ptr<void> * [1];
127 m_global_variables[0] = 0;
128 m_global_variables_save = 0;
133 SimulatedBlock::addInstance(
SimulatedBlock* b, Uint32 theInstance)
135 ndbrequire(theMainInstance ==
this);
136 ndbrequire(number() == b->number());
137 if (theInstanceList == 0)
140 ndbrequire(theInstanceList != 0);
141 for (Uint32
i = 0;
i < MaxInstances;
i++)
142 theInstanceList[
i] = 0;
144 ndbrequire(theInstance < MaxInstances);
145 ndbrequire(theInstanceList[theInstance] == 0);
146 theInstanceList[theInstance] = b;
150 SimulatedBlock::initCommon()
153 this->getParam(
"FragmentSendPool", &count);
154 c_fragmentSendPool.
setSize(count);
157 this->getParam(
"FragmentInfoPool", &count);
158 c_fragmentInfoPool.
setSize(count);
161 this->getParam(
"FragmentInfoHash", &count);
162 c_fragmentInfoHash.
setSize(count);
165 this->getParam(
"ActiveMutexes", &count);
166 c_mutexMgr.setSize(count);
169 this->getParam(
"ActiveCounters", &count);
170 c_counterMgr.setSize(count);
173 this->getParam(
"ActiveThreadSync", &count);
174 c_syncThreadPool.
setSize(count);
177 SimulatedBlock::~SimulatedBlock()
185 delete [] m_global_variables;
188 if (theInstanceList != 0) {
190 for (i = 0; i < MaxInstances; i++)
191 delete theInstanceList[i];
192 delete [] theInstanceList;
198 SimulatedBlock::installSimulatedBlockFunctions(){
199 ExecFunction * a = theExecArray;
200 a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP;
202 a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER;
203 a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP;
204 a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED;
205 a[GSN_STOP_FOR_CRASH]= &SimulatedBlock::execSTOP_FOR_CRASH;
206 a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF;
207 a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF;
208 a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF;
209 a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF;
210 a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF;
211 a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF;
212 a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF;
213 a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF;
214 a[GSN_FSOPENREF] = &SimulatedBlock::execFSOPENREF;
215 a[GSN_FSCLOSEREF] = &SimulatedBlock::execFSCLOSEREF;
216 a[GSN_FSWRITEREF] = &SimulatedBlock::execFSWRITEREF;
217 a[GSN_FSREADREF] = &SimulatedBlock::execFSREADREF;
218 a[GSN_FSREMOVEREF] = &SimulatedBlock::execFSREMOVEREF;
219 a[GSN_FSSYNCREF] = &SimulatedBlock::execFSSYNCREF;
220 a[GSN_FSAPPENDREF] = &SimulatedBlock::execFSAPPENDREF;
221 a[GSN_NODE_START_REP] = &SimulatedBlock::execNODE_START_REP;
222 a[GSN_API_START_REP] = &SimulatedBlock::execAPI_START_REP;
223 a[GSN_SEND_PACKED] = &SimulatedBlock::execSEND_PACKED;
224 a[GSN_CALLBACK_CONF] = &SimulatedBlock::execCALLBACK_CONF;
225 a[GSN_SYNC_THREAD_REQ] = &SimulatedBlock::execSYNC_THREAD_REQ;
226 a[GSN_SYNC_THREAD_CONF] = &SimulatedBlock::execSYNC_THREAD_CONF;
228 a[GSN_SYNC_REQ] = &SimulatedBlock::execSYNC_REQ;
229 a[GSN_SYNC_PATH_REQ] = &SimulatedBlock::execSYNC_PATH_REQ;
230 a[GSN_SYNC_PATH_CONF] = &SimulatedBlock::execSYNC_PATH_CONF;
234 SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn,
235 ExecFunction f,
bool force){
236 if(gsn > MAX_GSN || (!force && theExecArray[gsn] != 0)){
239 "GSN %d(%d))", gsn, MAX_GSN);
240 ERROR_SET(fatal, NDBD_EXIT_ILLEGAL_SIGNAL, errorMsg, errorMsg);
242 theExecArray[gsn] = f;
246 SimulatedBlock::assignToThread(ThreadContext ctx)
248 m_threadId = ctx.threadId;
249 m_jamBuffer = ctx.jamBuffer;
250 m_watchDogCounter = ctx.watchDogCounter;
251 m_sectionPoolCache = ctx.sectionPoolCache;
255 SimulatedBlock::getInstanceKey(Uint32 tabId, Uint32 fragId)
257 Dbdih* dbdih = (
Dbdih*)globalData.getBlock(DBDIH);
258 Uint32 instanceKey = dbdih->dihGetInstanceKey(tabId, fragId);
263 SimulatedBlock::getInstanceFromKey(Uint32 instanceKey)
265 Uint32 lqhWorkers = globalData.ndbMtLqhWorkers;
267 if (lqhWorkers == 0) {
270 assert(instanceKey != 0);
271 instanceNo = 1 + (instanceKey - 1) % lqhWorkers;
277 SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo,
278 const char* filename,
int lineno)
const
284 "Signal (GSN: %d, Length: %d, Rec Block No: %d)",
285 gsn, len, recBlockNo);
287 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
295 #define check_sections(signal, cnt, cnt2) do { if (unlikely(cnt)) { handle_invalid_sections_in_send_signal(signal); } else if (unlikely(cnt2 == 0 && (signal->header.m_fragmentInfo != 0 && signal->header.m_fragmentInfo != 3))) { handle_invalid_fragmentInfo(signal); } } while(0)
298 SimulatedBlock::handle_invalid_sections_in_send_signal(
Signal* signal)
const
301 #if defined VM_TRACE || defined ERROR_INSERT
302 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
303 "Unhandled sections in sendSignal",
306 infoEvent(
"Unhandled sections in sendSignal!!");
311 SimulatedBlock::handle_lingering_sections_after_execute(
Signal* signal)
const
314 #if defined VM_TRACE || defined ERROR_INSERT
315 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
316 "Unhandled sections after execute",
319 infoEvent(
"Unhandled sections after execute");
324 SimulatedBlock::handle_lingering_sections_after_execute(
SectionHandle* handle)
const
327 #if defined VM_TRACE || defined ERROR_INSERT
328 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
329 "Unhandled sections(handle) after execute",
332 infoEvent(
"Unhandled sections(handle) after execute");
337 SimulatedBlock::handle_invalid_fragmentInfo(
Signal* signal)
const
339 #if defined VM_TRACE || defined ERROR_INSERT
340 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
341 "Incorrect header->m_fragmentInfo in sendSignal()",
344 signal->header.m_fragmentInfo = 0;
345 infoEvent(
"Incorrect header->m_fragmentInfo in sendSignal");
350 SimulatedBlock::handle_out_of_longsignal_memory(
Signal * signal)
const
352 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
353 "Out of LongMessageBuffer in sendSignal",
358 SimulatedBlock::handle_send_failed(SendStatus ss,
Signal * signal)
const
361 case SEND_BUFFER_FULL:
362 ErrorReporter::handleError(NDBD_EXIT_GENERIC,
363 "Out of SendBufferMemory in sendSignal",
"");
365 case SEND_MESSAGE_TOO_BIG:
366 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
367 "Message to big in sendSignal",
"");
369 case SEND_UNKNOWN_NODE:
370 ErrorReporter::handleError(NDBD_EXIT_NDBREQUIRE,
371 "Unknown node in sendSignal",
"");
375 case SEND_DISCONNECTED:
382 linkSegments(Uint32 head, Uint32 tail){
385 g_sectionSegmentPool.
getPtr(headPtr, head);
388 g_sectionSegmentPool.
getPtr(tailPtr, tail);
391 g_sectionSegmentPool.
getPtr(oldTailPtr, headPtr.p->m_lastSegment);
396 if ((headPtr.p->m_sz % NDB_SECTION_SEGMENT_SZ) != 0)
398 #if defined VM_TRACE || defined ERROR_INSERT
399 ErrorReporter::handleError(NDBD_EXIT_BLOCK_BNR_ZERO,
400 "Bad head segment size",
403 ndbout_c(
"linkSegments : Bad head segment size");
407 headPtr.p->m_lastSegment = tailPtr.p->m_lastSegment;
408 headPtr.p->m_sz += tailPtr.p->m_sz;
410 oldTailPtr.p->m_nextSegment = tailPtr.i;
415 Uint32 tSec0 = ptr[0].i;
416 Uint32 tSec1 = ptr[1].i;
417 Uint32 tSec2 = ptr[2].i;
421 p = g_sectionSegmentPool.
getPtr(tSec2);
425 p = g_sectionSegmentPool.
getPtr(tSec1);
429 p = g_sectionSegmentPool.
getPtr(tSec0);
436 sprintf(msg,
"secCount=%d", secCount);
437 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
448 Uint32 getSectionSz(Uint32
id)
450 return g_sectionSegmentPool.
getPtr(
id)->m_sz;
453 Uint32* getLastWordPtr(Uint32
id)
457 Uint32
offset= (first->m_sz -1) % SectionSegment::DataLength;
458 return &last->theData[
offset];
461 #ifdef NDBD_MULTITHREADED
462 #define SB_SP_ARG *m_sectionPoolCache,
463 #define SB_SP_REL_ARG f_section_lock, *m_sectionPoolCache,
466 #define SB_SP_REL_ARG
472 Uint32 tSec0 = ptr[0].i;
473 Uint32 tSz0 = ptr[0].sz;
474 Uint32 tSec1 = ptr[1].i;
475 Uint32 tSz1 = ptr[1].sz;
476 Uint32 tSec2 = ptr[2].i;
477 Uint32 tSz2 = ptr[2].sz;
480 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
482 ptr[2].p->m_lastSegment);
484 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
486 ptr[1].p->m_lastSegment);
488 g_sectionSegmentPool.releaseList(SPC_SEIZE_ARG
490 ptr[0].p->m_lastSegment);
495 sprintf(msg,
"secCount=%d", secCount);
496 ErrorReporter::handleAssert(msg, __FILE__, __LINE__);
500 SimulatedBlock::sendSignal(BlockReference ref,
501 GlobalSignalNumber gsn,
504 JobBufferLevel jobBuffer)
const {
506 BlockReference sendBRef = reference();
508 Uint32 noOfSections = signal->header.m_noOfSections;
509 Uint32 recBlock = refToBlock(ref);
510 Uint32 recNode = refToNode(ref);
511 Uint32 ourProcessor = globalData.ownId;
513 signal->header.theLength = length;
514 signal->header.theVerId_signalNumber = gsn;
515 signal->header.theReceiversBlockNumber = recBlock;
516 signal->header.m_noOfSections = 0;
518 check_sections(signal, noOfSections, 0);
520 Uint32 tSignalId = signal->header.theSignalId;
522 if ((length == 0) || length > 25 || (recBlock == 0)) {
523 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
527 if(globalData.testOn){
529 (recNode == 0 ? globalData.ownId : recNode);
530 signal->header.theSendersBlockRef = sendBRef;
531 globalSignalLoggers.sendSignal(signal->header,
538 if(recNode == ourProcessor || recNode == 0) {
539 signal->header.theSendersSignalId = tSignalId;
540 signal->header.theSendersBlockRef = sendBRef;
541 #ifdef NDBD_MULTITHREADED
542 if (jobBuffer == JBB)
543 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
545 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
547 globalScheduler.execute(signal, jobBuffer, recBlock,
555 Uint32 tTrace = signal->getTrace();
557 sh.theVerId_signalNumber = gsn;
558 sh.theReceiversBlockNumber = recBlock;
559 sh.theSendersBlockRef = refToBlock(sendBRef);
560 sh.theLength = length;
561 sh.theTrace = tTrace;
562 sh.theSignalId = tSignalId;
563 sh.m_noOfSections = 0;
564 sh.m_fragmentInfo = 0;
566 #ifdef TRACE_DISTRIBUTED
567 ndbout_c(
"send: %s(%d) to (%s, %d)",
568 getSignalName(gsn), gsn, getBlockName(recBlock),
573 #ifdef NDBD_MULTITHREADED
574 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
577 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
578 &signal->theData[0], recNode,
582 if (unlikely(! (ss == SEND_OK ||
583 ss == SEND_BLOCKED ||
584 ss == SEND_DISCONNECTED)))
586 handle_send_failed(ss, signal);
594 GlobalSignalNumber gsn,
597 JobBufferLevel jobBuffer)
const {
599 Uint32 noOfSections = signal->header.m_noOfSections;
600 Uint32 tSignalId = signal->header.theSignalId;
601 Uint32 tTrace = signal->getTrace();
603 Uint32 ourProcessor = globalData.ownId;
604 Uint32 recBlock = rg.m_block;
606 signal->header.theLength = length;
607 signal->header.theVerId_signalNumber = gsn;
608 signal->header.theReceiversBlockNumber = recBlock;
609 signal->header.theSendersSignalId = tSignalId;
610 signal->header.theSendersBlockRef = reference();
611 signal->header.m_noOfSections = 0;
613 check_sections(signal, noOfSections, 0);
615 if ((length == 0) || (length > 25) || (recBlock == 0)) {
616 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
622 sh.theVerId_signalNumber = gsn;
623 sh.theReceiversBlockNumber = recBlock;
624 sh.theSendersBlockRef = refToBlock(reference());
625 sh.theLength = length;
626 sh.theTrace = tTrace;
627 sh.theSignalId = tSignalId;
628 sh.m_noOfSections = 0;
629 sh.m_fragmentInfo = 0;
634 if(rg.m_nodes.
get(0) || rg.m_nodes.
get(ourProcessor)){
636 if(globalData.testOn){
637 globalSignalLoggers.sendSignal(signal->header,
644 #ifdef NDBD_MULTITHREADED
645 if (jobBuffer == JBB)
646 sendlocal(m_threadId, &signal->header, signal->theData, NULL);
648 sendprioa(m_threadId, &signal->header, signal->theData, NULL);
650 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
653 rg.m_nodes.
clear((Uint32)0);
654 rg.m_nodes.
clear(ourProcessor);
662 recNode = rg.m_nodes.
find(recNode + 1);
663 rg.m_nodes.
clear(recNode);
665 if(globalData.testOn){
666 globalSignalLoggers.sendSignal(signal->header,
673 #ifdef TRACE_DISTRIBUTED
674 ndbout_c(
"send: %s(%d) to (%s, %d)",
675 getSignalName(gsn), gsn, getBlockName(recBlock),
680 #ifdef NDBD_MULTITHREADED
681 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
684 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
685 &signal->theData[0], recNode,
689 if (unlikely(! (ss == SEND_OK ||
690 ss == SEND_BLOCKED ||
691 ss == SEND_DISCONNECTED)))
693 handle_send_failed(ss, signal);
703 SimulatedBlock::sendSignal(BlockReference ref,
704 GlobalSignalNumber gsn,
707 JobBufferLevel jobBuffer,
709 Uint32 noOfSections)
const {
711 BlockReference sendBRef = reference();
713 Uint32 recBlock = refToBlock(ref);
714 Uint32 recNode = refToNode(ref);
715 Uint32 ourProcessor = globalData.ownId;
717 check_sections(signal, signal->header.m_noOfSections, noOfSections);
719 signal->header.theLength = length;
720 signal->header.theVerId_signalNumber = gsn;
721 signal->header.theReceiversBlockNumber = recBlock;
722 signal->header.m_noOfSections = noOfSections;
724 Uint32 tSignalId = signal->header.theSignalId;
725 Uint32 tFragInfo = signal->header.m_fragmentInfo;
727 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
728 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
732 if(globalData.testOn){
734 (recNode == 0 ? globalData.ownId : recNode);
735 signal->header.theSendersBlockRef = sendBRef;
736 globalSignalLoggers.sendSignal(signal->header,
744 if(recNode == ourProcessor || recNode == 0) {
745 signal->header.theSendersSignalId = tSignalId;
746 signal->header.theSendersBlockRef = sendBRef;
753 for(Uint32 i = 0; i<noOfSections; i++){
754 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
755 signal->theData[length+
i] = segptr[
i].i;
760 handle_out_of_longsignal_memory(signal);
763 #ifdef NDBD_MULTITHREADED
764 if (jobBuffer == JBB)
765 sendlocal(m_threadId, &signal->header, signal->theData,
766 signal->theData+length);
768 sendprioa(m_threadId, &signal->header, signal->theData,
769 signal->theData+length);
771 globalScheduler.execute(signal, jobBuffer, recBlock,
774 signal->header.m_noOfSections = 0;
780 Uint32 tTrace = signal->getTrace();
781 Uint32 noOfSections = signal->header.m_noOfSections;
783 sh.theVerId_signalNumber = gsn;
784 sh.theReceiversBlockNumber = recBlock;
785 sh.theSendersBlockRef = refToBlock(sendBRef);
786 sh.theLength = length;
787 sh.theTrace = tTrace;
788 sh.theSignalId = tSignalId;
789 sh.m_noOfSections = noOfSections;
790 sh.m_fragmentInfo = tFragInfo;
792 #ifdef TRACE_DISTRIBUTED
793 ndbout_c(
"send: %s(%d) to (%s, %d)",
794 getSignalName(gsn), gsn, getBlockName(recBlock),
799 #ifdef NDBD_MULTITHREADED
800 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
803 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
804 &signal->theData[0], recNode,
808 if (unlikely(! (ss == SEND_OK ||
809 ss == SEND_BLOCKED ||
810 ss == SEND_DISCONNECTED)))
812 handle_send_failed(ss, signal);
816 signal->header.m_noOfSections = 0;
817 signal->header.m_fragmentInfo = 0;
823 GlobalSignalNumber gsn,
826 JobBufferLevel jobBuffer,
828 Uint32 noOfSections)
const {
830 Uint32 tSignalId = signal->header.theSignalId;
831 Uint32 tTrace = signal->getTrace();
832 Uint32 tFragInfo = signal->header.m_fragmentInfo;
834 Uint32 ourProcessor = globalData.ownId;
835 Uint32 recBlock = rg.m_block;
837 check_sections(signal, signal->header.m_noOfSections, noOfSections);
839 signal->header.theLength = length;
840 signal->header.theVerId_signalNumber = gsn;
841 signal->header.theReceiversBlockNumber = recBlock;
842 signal->header.theSendersSignalId = tSignalId;
843 signal->header.theSendersBlockRef = reference();
844 signal->header.m_noOfSections = noOfSections;
846 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
847 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
852 sh.theVerId_signalNumber = gsn;
853 sh.theReceiversBlockNumber = recBlock;
854 sh.theSendersBlockRef = refToBlock(reference());
855 sh.theLength = length;
856 sh.theTrace = tTrace;
857 sh.theSignalId = tSignalId;
858 sh.m_noOfSections = noOfSections;
859 sh.m_fragmentInfo = tFragInfo;
864 if(rg.m_nodes.
get(0) || rg.m_nodes.
get(ourProcessor)){
866 if(globalData.testOn){
867 globalSignalLoggers.sendSignal(signal->header,
879 for(Uint32 i = 0; i<noOfSections; i++){
880 ok &= ::import(SB_SP_ARG segptr[i], ptr[i].p, ptr[i].sz);
881 signal->theData[length+
i] = segptr[
i].i;
886 handle_out_of_longsignal_memory(signal);
889 #ifdef NDBD_MULTITHREADED
890 if (jobBuffer == JBB)
891 sendlocal(m_threadId, &signal->header, signal->theData,
892 signal->theData + length);
894 sendprioa(m_threadId, &signal->header, signal->theData,
895 signal->theData + length);
897 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
900 rg.m_nodes.
clear((Uint32)0);
901 rg.m_nodes.
clear(ourProcessor);
909 recNode = rg.m_nodes.
find(recNode + 1);
910 rg.m_nodes.
clear(recNode);
913 if(globalData.testOn){
914 globalSignalLoggers.sendSignal(signal->header,
922 #ifdef TRACE_DISTRIBUTED
923 ndbout_c(
"send: %s(%d) to (%s, %d)",
924 getSignalName(gsn), gsn, getBlockName(recBlock),
929 #ifdef NDBD_MULTITHREADED
930 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
933 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
934 &signal->theData[0], recNode,
938 if (unlikely(! (ss == SEND_OK ||
939 ss == SEND_BLOCKED ||
940 ss == SEND_DISCONNECTED)))
942 handle_send_failed(ss, signal);
946 signal->header.m_noOfSections = 0;
947 signal->header.m_fragmentInfo = 0;
953 SimulatedBlock::sendSignal(BlockReference ref,
954 GlobalSignalNumber gsn,
957 JobBufferLevel jobBuffer,
960 Uint32 noOfSections = sections->m_cnt;
961 BlockReference sendBRef = reference();
963 Uint32 recBlock = refToBlock(ref);
964 Uint32 recNode = refToNode(ref);
965 Uint32 ourProcessor = globalData.ownId;
967 check_sections(signal, signal->header.m_noOfSections, noOfSections);
969 signal->header.theLength = length;
970 signal->header.theVerId_signalNumber = gsn;
971 signal->header.theReceiversBlockNumber = recBlock;
972 signal->header.m_noOfSections = noOfSections;
974 Uint32 tSignalId = signal->header.theSignalId;
975 Uint32 tFragInfo = signal->header.m_fragmentInfo;
977 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
978 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
982 if(globalData.testOn){
984 (recNode == 0 ? globalData.ownId : recNode);
985 signal->header.theSendersBlockRef = sendBRef;
986 globalSignalLoggers.sendSignal(signal->header,
990 sections->m_ptr, noOfSections);
994 if(recNode == ourProcessor || recNode == 0) {
995 signal->header.theSendersSignalId = tSignalId;
996 signal->header.theSendersBlockRef = sendBRef;
1001 Uint32 * dst = signal->theData + length;
1002 * dst ++ = sections->m_ptr[0].i;
1003 * dst ++ = sections->m_ptr[1].i;
1004 * dst ++ = sections->m_ptr[2].i;
1006 #ifdef NDBD_MULTITHREADED
1007 if (jobBuffer == JBB)
1008 sendlocal(m_threadId, &signal->header, signal->theData,
1009 signal->theData + length);
1011 sendprioa(m_threadId, &signal->header, signal->theData,
1012 signal->theData + length);
1014 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1020 Uint32 tTrace = signal->getTrace();
1022 sh.theVerId_signalNumber = gsn;
1023 sh.theReceiversBlockNumber = recBlock;
1024 sh.theSendersBlockRef = refToBlock(sendBRef);
1025 sh.theLength = length;
1026 sh.theTrace = tTrace;
1027 sh.theSignalId = tSignalId;
1028 sh.m_noOfSections = noOfSections;
1029 sh.m_fragmentInfo = tFragInfo;
1031 #ifdef TRACE_DISTRIBUTED
1032 ndbout_c(
"send: %s(%d) to (%s, %d)",
1033 getSignalName(gsn), gsn, getBlockName(recBlock),
1038 #ifdef NDBD_MULTITHREADED
1039 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1040 recNode, &g_sectionSegmentPool, sections->m_ptr);
1042 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
1043 &signal->theData[0], recNode,
1044 g_sectionSegmentPool,
1048 if (unlikely(! (ss == SEND_OK ||
1049 ss == SEND_BLOCKED ||
1050 ss == SEND_DISCONNECTED)))
1052 handle_send_failed(ss, signal);
1055 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1058 signal->header.m_noOfSections = 0;
1059 signal->header.m_fragmentInfo = 0;
1060 sections->m_cnt = 0;
1066 GlobalSignalNumber gsn,
1069 JobBufferLevel jobBuffer,
1072 Uint32 noOfSections = sections->m_cnt;
1073 Uint32 tSignalId = signal->header.theSignalId;
1074 Uint32 tTrace = signal->getTrace();
1075 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1077 Uint32 ourProcessor = globalData.ownId;
1078 Uint32 recBlock = rg.m_block;
1080 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1082 signal->header.theLength = length;
1083 signal->header.theVerId_signalNumber = gsn;
1084 signal->header.theReceiversBlockNumber = recBlock;
1085 signal->header.theSendersSignalId = tSignalId;
1086 signal->header.theSendersBlockRef = reference();
1087 signal->header.m_noOfSections = noOfSections;
1089 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1090 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1095 sh.theVerId_signalNumber = gsn;
1096 sh.theReceiversBlockNumber = recBlock;
1097 sh.theSendersBlockRef = refToBlock(reference());
1098 sh.theLength = length;
1099 sh.theTrace = tTrace;
1100 sh.theSignalId = tSignalId;
1101 sh.m_noOfSections = noOfSections;
1102 sh.m_fragmentInfo = tFragInfo;
1108 if(rg.m_nodes.
get(0) || rg.m_nodes.
get(ourProcessor))
1112 if(globalData.testOn){
1113 globalSignalLoggers.sendSignal(signal->header,
1115 &signal->theData[0],
1117 sections->m_ptr, noOfSections);
1123 Uint32 * dst = signal->theData + length;
1124 * dst ++ = sections->m_ptr[0].i;
1125 * dst ++ = sections->m_ptr[1].i;
1126 * dst ++ = sections->m_ptr[2].i;
1127 #ifdef NDBD_MULTITHREADED
1128 if (jobBuffer == JBB)
1129 sendlocal(m_threadId, &signal->header, signal->theData,
1130 signal->theData + length);
1132 sendprioa(m_threadId, &signal->header, signal->theData,
1133 signal->theData + length);
1135 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1138 rg.m_nodes.
clear((Uint32)0);
1139 rg.m_nodes.
clear(ourProcessor);
1147 recNode = rg.m_nodes.
find(recNode + 1);
1148 rg.m_nodes.
clear(recNode);
1151 if(globalData.testOn){
1152 globalSignalLoggers.sendSignal(signal->header,
1154 &signal->theData[0],
1156 sections->m_ptr, noOfSections);
1160 #ifdef TRACE_DISTRIBUTED
1161 ndbout_c(
"send: %s(%d) to (%s, %d)",
1162 getSignalName(gsn), gsn, getBlockName(recBlock),
1167 #ifdef NDBD_MULTITHREADED
1168 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1169 recNode, &g_sectionSegmentPool, sections->m_ptr);
1171 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
1172 &signal->theData[0], recNode,
1173 g_sectionSegmentPool,
1177 if (unlikely(! (ss == SEND_OK ||
1178 ss == SEND_BLOCKED ||
1179 ss == SEND_DISCONNECTED)))
1181 handle_send_failed(ss, signal);
1187 ::releaseSections(SB_SP_ARG noOfSections, sections->m_ptr);
1190 sections->m_cnt = 0;
1191 signal->header.m_noOfSections = 0;
1192 signal->header.m_fragmentInfo = 0;
1199 GlobalSignalNumber gsn,
1202 JobBufferLevel jobBuffer,
1211 Uint32 noOfSections = sections->m_cnt;
1212 BlockReference sendBRef = reference();
1214 Uint32 recBlock = refToBlock(ref);
1215 Uint32 recNode = refToNode(ref);
1216 Uint32 ourProcessor = globalData.ownId;
1218 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1220 signal->header.theLength = length;
1221 signal->header.theVerId_signalNumber = gsn;
1222 signal->header.theReceiversBlockNumber = recBlock;
1223 signal->header.m_noOfSections = noOfSections;
1225 Uint32 tSignalId = signal->header.theSignalId;
1226 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1228 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1229 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1233 if(globalData.testOn){
1235 (recNode == 0 ? globalData.ownId : recNode);
1236 signal->header.theSendersBlockRef = sendBRef;
1237 globalSignalLoggers.sendSignal(signal->header,
1239 &signal->theData[0],
1241 sections->m_ptr, noOfSections);
1245 if(recNode == ourProcessor || recNode == 0) {
1246 signal->header.theSendersSignalId = tSignalId;
1247 signal->header.theSendersBlockRef = sendBRef;
1249 Uint32 * dst = signal->theData + length;
1254 for (Uint32 sec=0; sec < noOfSections; sec++)
1257 bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1262 #ifdef NDBD_MULTITHREADED
1263 if (jobBuffer == JBB)
1264 sendlocal(m_threadId, &signal->header, signal->theData,
1265 signal->theData + length);
1267 sendprioa(m_threadId, &signal->header, signal->theData,
1268 signal->theData + length);
1270 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1276 Uint32 tTrace = signal->getTrace();
1278 sh.theVerId_signalNumber = gsn;
1279 sh.theReceiversBlockNumber = recBlock;
1280 sh.theSendersBlockRef = refToBlock(sendBRef);
1281 sh.theLength = length;
1282 sh.theTrace = tTrace;
1283 sh.theSignalId = tSignalId;
1284 sh.m_noOfSections = noOfSections;
1285 sh.m_fragmentInfo = tFragInfo;
1287 #ifdef TRACE_DISTRIBUTED
1288 ndbout_c(
"send: %s(%d) to (%s, %d)",
1289 getSignalName(gsn), gsn, getBlockName(recBlock),
1294 #ifdef NDBD_MULTITHREADED
1295 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1296 recNode, &g_sectionSegmentPool, sections->m_ptr);
1298 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
1299 &signal->theData[0], recNode,
1300 g_sectionSegmentPool,
1304 ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1307 signal->header.m_noOfSections = 0;
1308 signal->header.m_fragmentInfo = 0;
1314 GlobalSignalNumber gsn,
1317 JobBufferLevel jobBuffer,
1325 Uint32 noOfSections = sections->m_cnt;
1326 Uint32 tSignalId = signal->header.theSignalId;
1327 Uint32 tTrace = signal->getTrace();
1328 Uint32 tFragInfo = signal->header.m_fragmentInfo;
1330 Uint32 ourProcessor = globalData.ownId;
1331 Uint32 recBlock = rg.m_block;
1333 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1335 signal->header.theLength = length;
1336 signal->header.theVerId_signalNumber = gsn;
1337 signal->header.theReceiversBlockNumber = recBlock;
1338 signal->header.theSendersSignalId = tSignalId;
1339 signal->header.theSendersBlockRef = reference();
1340 signal->header.m_noOfSections = noOfSections;
1342 if ((length == 0) || (length + noOfSections > 25) || (recBlock == 0)) {
1343 signal_error(gsn, length, recBlock, __FILE__, __LINE__);
1348 sh.theVerId_signalNumber = gsn;
1349 sh.theReceiversBlockNumber = recBlock;
1350 sh.theSendersBlockRef = refToBlock(reference());
1351 sh.theLength = length;
1352 sh.theTrace = tTrace;
1353 sh.theSignalId = tSignalId;
1354 sh.m_noOfSections = noOfSections;
1355 sh.m_fragmentInfo = tFragInfo;
1360 if(rg.m_nodes.
get(0) || rg.m_nodes.
get(ourProcessor))
1363 if(globalData.testOn){
1364 globalSignalLoggers.sendSignal(signal->header,
1366 &signal->theData[0],
1368 sections->m_ptr, noOfSections);
1372 Uint32 * dst = signal->theData + length;
1377 for (Uint32 sec=0; sec < noOfSections; sec++)
1380 bool ok= ::dupSection(SB_SP_ARG secCopy, sections->m_ptr[sec].i);
1385 #ifdef NDBD_MULTITHREADED
1386 if (jobBuffer == JBB)
1387 sendlocal(m_threadId, &signal->header, signal->theData,
1388 signal->theData + length);
1390 sendprioa(m_threadId, &signal->header, signal->theData,
1391 signal->theData + length);
1393 globalScheduler.execute(signal, jobBuffer, recBlock, gsn);
1396 rg.m_nodes.
clear((Uint32)0);
1397 rg.m_nodes.
clear(ourProcessor);
1405 recNode = rg.m_nodes.
find(recNode + 1);
1406 rg.m_nodes.
clear(recNode);
1409 if(globalData.testOn){
1410 globalSignalLoggers.sendSignal(signal->header,
1412 &signal->theData[0],
1414 sections->m_ptr, noOfSections);
1418 #ifdef TRACE_DISTRIBUTED
1419 ndbout_c(
"send: %s(%d) to (%s, %d)",
1420 getSignalName(gsn), gsn, getBlockName(recBlock),
1425 #ifdef NDBD_MULTITHREADED
1426 ss = mt_send_remote(m_threadId, &sh, jobBuffer, &signal->theData[0],
1427 recNode, &g_sectionSegmentPool, sections->m_ptr);
1429 ss = globalTransporterRegistry.
prepareSend(&sh, jobBuffer,
1430 &signal->theData[0], recNode,
1431 g_sectionSegmentPool,
1435 ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED);
1438 signal->header.m_noOfSections = 0;
1439 signal->header.m_fragmentInfo = 0;
1446 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1447 GlobalSignalNumber gsn,
1449 Uint32 delayInMilliSeconds,
1450 Uint32 length)
const {
1452 BlockNumber bnr = refToBlock(ref);
1454 check_sections(signal, signal->header.m_noOfSections, 0);
1456 signal->header.theLength = length;
1457 signal->header.theSendersSignalId = signal->header.theSignalId;
1458 signal->header.theVerId_signalNumber = gsn;
1459 signal->header.theReceiversBlockNumber = bnr;
1460 signal->header.theSendersBlockRef = reference();
1464 if(globalData.testOn){
1465 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1468 &signal->theData[0],
1474 #ifdef NDBD_MULTITHREADED
1475 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1477 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1485 SimulatedBlock::sendSignalWithDelay(BlockReference ref,
1486 GlobalSignalNumber gsn,
1488 Uint32 delayInMilliSeconds,
1492 Uint32 noOfSections = sections->m_cnt;
1493 BlockNumber bnr = refToBlock(ref);
1495 BlockReference sendBRef = reference();
1501 check_sections(signal, signal->header.m_noOfSections, noOfSections);
1503 signal->header.theLength = length;
1504 signal->header.theSendersSignalId = signal->header.theSignalId;
1505 signal->header.theSendersBlockRef = sendBRef;
1506 signal->header.theVerId_signalNumber = gsn;
1507 signal->header.theReceiversBlockNumber = bnr;
1508 signal->header.m_noOfSections = noOfSections;
1510 Uint32 * dst = signal->theData + length;
1511 * dst ++ = sections->m_ptr[0].i;
1512 * dst ++ = sections->m_ptr[1].i;
1513 * dst ++ = sections->m_ptr[2].i;
1517 if(globalData.testOn){
1518 globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds,
1521 &signal->theData[0],
1527 #ifdef NDBD_MULTITHREADED
1528 senddelay(m_threadId, &signal->header, delayInMilliSeconds);
1530 globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds);
1533 signal->header.m_noOfSections = 0;
1534 signal->header.m_fragmentInfo = 0;
1535 sections->m_cnt = 0;
1545 SimulatedBlock::releaseSection(Uint32 firstSegmentIVal)
1547 ::releaseSection(SB_SP_ARG firstSegmentIVal);
1553 ::releaseSections(SB_SP_ARG handle.m_cnt, handle.m_ptr);
1558 SimulatedBlock::appendToSection(Uint32& firstSegmentIVal,
const Uint32* src, Uint32 len)
1560 return ::appendToSection(SB_SP_ARG firstSegmentIVal, src, len);
1566 return ::import(SB_SP_ARG first, src, len);
1573 if (::
import(SB_SP_ARG tmp, src, len))
1584 SimulatedBlock::dupSection(Uint32& copyFirstIVal, Uint32 srcFirstIVal)
1586 return ::dupSection(SB_SP_ARG copyFirstIVal, srcFirstIVal);
1590 SimulatedBlock::writeToSection(Uint32 firstSegmentIVal, Uint32
offset,
1591 const Uint32* src, Uint32 len)
1593 return ::writeToSection(firstSegmentIVal, offset, src, len);
1597 SimulatedBlock::getSectionSegmentPool(){
1598 return g_sectionSegmentPool;
1602 SimulatedBlock::allocateBat(
int batSize){
1606 theBATSize = batSize;
1611 SimulatedBlock::freeBat(){
1619 SimulatedBlock::getBat(Uint16 blockNo, Uint32 instanceNo){
1620 assert(blockNo == blockToMain(blockNo));
1622 if (sb != 0 && instanceNo != 0)
1623 sb = sb->getInstance(instanceNo);
1626 return sb->NewVarRef;
1630 SimulatedBlock::getBatSize(Uint16 blockNo, Uint32 instanceNo){
1631 assert(blockNo == blockToMain(blockNo));
1633 if (sb != 0 && instanceNo != 0)
1634 sb = sb->getInstance(instanceNo);
1637 return sb->theBATSize;
1642 return allocRecordAligned(type, s, n, 0, 0, clear, paramId);
1646 SimulatedBlock::allocRecordAligned(
const char *
type,
size_t s,
size_t n,
void **unaligned_buffer, Uint32 align,
bool clear, Uint32 paramId)
1650 Uint32 over_alloc = unaligned_buffer ? (align - 1) : 0;
1651 size_t size = n*s + over_alloc;
1652 Uint64 real_size = (Uint64)((Uint64)
n)*((Uint64)s) + over_alloc;
1656 ndbout_c(
"%s::allocRecord(%s, %u, %u) = %llu bytes",
1657 getBlockName(number()),
1663 if( real_size == (Uint64)size )
1664 p = ndbd_malloc(size);
1673 getBlockName(number()), param_info.m_name);
1676 getBlockName(number()), type);
1679 (Uint32)s, (Uint32)n, (Uint64)real_size);
1680 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1684 char * ptr = (
char*)p;
1685 const Uint32 chunk = 128 * 1024;
1686 while(size > chunk){
1688 memset(ptr, 0, chunk);
1693 memset(ptr, 0, size);
1695 if (unaligned_buffer)
1697 *unaligned_buffer = p;
1698 p = (
void *)(((UintPtr)p + over_alloc) & ~(UintPtr)(over_alloc));
1700 g_eventLogger->
info(
"'%s' (%u) %llu %llu, alignment correction %u bytes",
1701 type, align, (Uint64)p, (Uint64)p+n*s,
1702 (Uint32)((UintPtr)p - (UintPtr)*unaligned_buffer));
1711 const char * type,
size_t s,
size_t n){
1715 ndbd_free(* ptr, n*s);
1721 SimulatedBlock::sortchunks(
const void * _e0,
const void * _e1)
1723 const AllocChunk *p0 = (
const AllocChunk*)_e0;
1724 const AllocChunk *p1 = (
const AllocChunk*)_e1;
1726 if (p0->ptrI > p1->ptrI)
1728 if (p0->ptrI < p1->ptrI)
1734 SimulatedBlock::allocChunks(AllocChunk dst[],
1740 const Uint32 save = pages;
1742 for (; i<arraysize && pages > 0; i++)
1745 m_ctx.m_mm.alloc_pages(rg, &dst[i].ptrI, &cnt, 1);
1746 if (unlikely(cnt == 0))
1751 if (unlikely(pages != 0))
1754 qsort(dst, i,
sizeof(dst[0]), sortchunks);
1766 param_info.m_name =
"<unknown>";
1770 "%s could not allocate memory for parameter %s",
1771 getBlockName(number()), param_info.m_name);
1774 ERROR_SET(fatal, NDBD_EXIT_MEMALLOC, buf1, buf2);
1781 #ifdef NDBD_MULTITHREADED
1782 (*m_watchDogCounter) = place;
1784 globalData.incrementWatchDogCounter(place);
1789 SimulatedBlock::update_watch_dog_timer(Uint32 interval)
1792 globalEmulatorData.theWatchDog->setCheckInterval(interval);
1799 const char *aBlockName = getBlockName(number(),
"VM Kernel");
1804 (m_ctx.m_config.stopOnError()<<1) +
1805 (m_ctx.m_config.getInitialStart()<<2);
1810 aBlockName, line, magicStatus);
1812 ErrorReporter::handleError(err_code, extra, buf);
1823 char *
buf = (
char *)(signalT.theData+1);
1827 BaseString::vsnprintf(buf, 96, msg, ap);
1830 int len = strlen(buf) + 1;
1841 const Signal * signal = globalScheduler.getVMSignals();
1842 Uint32 tTrace = signal->header.theTrace;
1843 Uint32 tSignalId = signal->header.theSignalId;
1845 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1846 signalT.header.theReceiversBlockNumber = CMVMI;
1847 signalT.header.theSendersBlockRef = reference();
1848 signalT.header.theTrace = tTrace;
1849 signalT.header.theSignalId = tSignalId;
1850 signalT.header.theLength = ((len+3)/4)+1;
1852 #ifdef NDBD_MULTITHREADED
1853 sendlocal(m_threadId,
1854 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1856 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1857 signalT.m_sectionPtrI);
1868 char *
buf = (
char *)(signalT.theData+1);
1872 BaseString::vsnprintf(buf, 96, msg, ap);
1875 int len = strlen(buf) + 1;
1886 const Signal * signal = globalScheduler.getVMSignals();
1887 Uint32 tTrace = signal->header.theTrace;
1888 Uint32 tSignalId = signal->header.theSignalId;
1890 signalT.header.theVerId_signalNumber = GSN_EVENT_REP;
1891 signalT.header.theReceiversBlockNumber = CMVMI;
1892 signalT.header.theSendersBlockRef = reference();
1893 signalT.header.theTrace = tTrace;
1894 signalT.header.theSignalId = tSignalId;
1895 signalT.header.theLength = ((len+3)/4)+1;
1897 #ifdef NDBD_MULTITHREADED
1898 sendlocal(m_threadId,
1899 &signalT.header, signalT.theData, signalT.m_sectionPtrI);
1901 globalScheduler.execute(&signalT.header, JBB, signalT.theData,
1902 signalT.m_sectionPtrI);
1907 SimulatedBlock::execNODE_STATE_REP(
Signal* signal){
1910 this->theNodeState = rep->nodeState;
1918 this->theNodeState = req->nodeState;
1919 const Uint32 senderData = req->senderData;
1920 const BlockReference senderRef = req->senderRef;
1928 conf->senderData = senderData;
1930 sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal,
1931 ChangeNodeStateConf::SignalLength, JBB);
1935 SimulatedBlock::execNDB_TAMPER(
Signal * signal){
1936 if (signal->getLength() == 1)
1938 SET_ERROR_INSERT_VALUE(signal->theData[0]);
1942 SET_ERROR_INSERT_VALUE2(signal->theData[0], signal->theData[1]);
1947 SimulatedBlock::execSIGNAL_DROPPED_REP(
Signal * signal){
1954 rep->originalGsn, rep->originalLength,rep->originalSectionCount);
1955 ErrorReporter::handleError(NDBD_EXIT_OUT_OF_LONG_SIGNAL_MEMORY,
1962 SimulatedBlock::execCONTINUE_FRAGMENTED(
Signal * signal){
1966 ndbrequire(signal->getSendersBlockRef() == reference());
1970 case ContinueFragmented::CONTINUE_SENDING :
1975 c_segmentedFragmentSendList.
first(fragPtr);
1976 for(; !fragPtr.isNull();){
1979 c_segmentedFragmentSendList.
next(fragPtr);
1982 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
1984 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
1986 execute(signal, copyPtr.p->m_callback, 0);
1988 c_segmentedFragmentSendList.
release(copyPtr);
1992 c_linearFragmentSendList.
first(fragPtr);
1993 for(; !fragPtr.isNull();){
1996 c_linearFragmentSendList.
next(fragPtr);
1999 if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){
2001 if(copyPtr.p->m_callback.m_callbackFunction != 0) {
2003 execute(signal, copyPtr.p->m_callback, 0);
2005 c_linearFragmentSendList.
release(copyPtr);
2009 if(c_segmentedFragmentSendList.isEmpty() &&
2010 c_linearFragmentSendList.isEmpty()){
2012 c_fragSenderRunning =
false;
2016 sig->type = ContinueFragmented::CONTINUE_SENDING;
2017 sig->line = __LINE__;
2018 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
2021 case ContinueFragmented::CONTINUE_CLEANUP:
2025 const Uint32 callbackWords = (
sizeof(Callback) + 3) >> 2;
2027 ndbassert(signal->getLength() ==
2028 ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2032 memcpy(&cb, &sig->cleanup.callbackStart, callbackWords << 2);
2034 doNodeFailureCleanup(signal,
2035 sig->cleanup.failedNodeId,
2036 sig->cleanup.resource,
2037 sig->cleanup.cursor,
2038 sig->cleanup.elementsCleaned,
2048 SimulatedBlock::execSTOP_FOR_CRASH(
Signal* signal)
2050 #ifdef NDBD_MULTITHREADED
2051 mt_execSTOP_FOR_CRASH();
2056 SimulatedBlock::execNODE_START_REP(
Signal* signal)
2061 SimulatedBlock::execAPI_START_REP(
Signal* signal)
2066 SimulatedBlock::execSEND_PACKED(
Signal* signal)
2073 SimulatedBlock::getCallbackEntry(Uint32 ci)
2075 ndbrequire(m_callbackTableAddr != 0);
2076 const CallbackTable& ct = *m_callbackTableAddr;
2077 ndbrequire(ci < ct.m_count);
2078 return ct.m_entry[ci];
2082 SimulatedBlock::sendCallbackConf(
Signal* signal, Uint32 fullBlockNo,
2083 CallbackPtr& cptr, Uint32 returnCode)
2085 Uint32 blockNo = blockToMain(fullBlockNo);
2086 Uint32 instanceNo = blockToInstance(fullBlockNo);
2090 const CallbackEntry& ce = b->getCallbackEntry(cptr.m_callbackIndex);
2093 Uint32 senderData = returnCode;
2095 if (!isNdbMtLqh()) {
2097 c.m_callbackFunction = ce.m_function;
2098 c.m_callbackData = cptr.m_callbackData;
2099 b->execute(signal, c, returnCode);
2101 if (ce.m_flags & CALLBACK_ACK) {
2104 ack->senderData = senderData;
2106 signal, CallbackAck::SignalLength);
2110 conf->senderData = senderData;
2111 conf->senderRef = reference();
2112 conf->callbackIndex = cptr.m_callbackIndex;
2113 conf->callbackData = cptr.m_callbackData;
2114 conf->returnCode = returnCode;
2116 if (ce.m_flags & CALLBACK_DIRECT) {
2119 signal, CallbackConf::SignalLength, instanceNo);
2122 BlockReference ref = numberToRef(fullBlockNo, getOwnNodeId());
2123 sendSignal(ref, GSN_CALLBACK_CONF,
2124 signal, CallbackConf::SignalLength, JBB);
2127 cptr.m_callbackIndex = ZNIL;
2131 SimulatedBlock::execCALLBACK_CONF(
Signal* signal)
2135 Uint32 senderData = conf->senderData;
2136 Uint32 senderRef = conf->senderRef;
2138 ndbrequire(m_callbackTableAddr != 0);
2139 const CallbackEntry& ce = getCallbackEntry(conf->callbackIndex);
2140 CallbackFunction
function = ce.m_function;
2143 callback.m_callbackFunction =
function;
2144 callback.m_callbackData = conf->callbackData;
2145 execute(signal, callback, conf->returnCode);
2147 if (ce.m_flags & CALLBACK_ACK) {
2150 ack->senderData = senderData;
2151 sendSignal(senderRef, GSN_CALLBACK_ACK,
2152 signal, CallbackAck::SignalLength, JBB);
2156 #ifdef VM_TRACE_TIME
2158 SimulatedBlock::clearTimes() {
2159 for(Uint32 i = 0; i <= MAX_GSN; i++){
2160 m_timeTrace[
i].cnt = 0;
2161 m_timeTrace[
i].sum = 0;
2162 m_timeTrace[
i].sub = 0;
2167 SimulatedBlock::printTimes(FILE * output){
2168 fprintf(output,
"-- %s --\n", getBlockName(number()));
2170 for(Uint32 i = 0; i <= MAX_GSN; i++){
2171 Uint32 n = m_timeTrace[
i].cnt;
2175 double avg = m_timeTrace[
i].sum;
2176 double avg2 = avg - m_timeTrace[
i].sub;
2183 "%s ; #%d ; %dus ; %dus ; %dms\n",
2184 getSignalName(i), n, (Uint32)avg, (Uint32)avg2,
2185 (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000));
2187 sum += (m_timeTrace[
i].sum - m_timeTrace[
i].sub);
2190 sum = (sum + 500)/ 1000;
2191 fprintf(output,
"-- %s : %u --\n", getBlockName(number()), (Uint32)sum);
2192 fprintf(output,
"\n");
2198 SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){
2199 m_fragmentId = fragId;
2200 m_senderRef = sender;
2201 m_sectionPtrI[0] = RNIL;
2202 m_sectionPtrI[1] = RNIL;
2203 m_sectionPtrI[2] = RNIL;
2206 SimulatedBlock::FragmentSendInfo::FragmentSendInfo()
2212 Uint32 sigLen = signal->
length() - 1;
2213 Uint32 fragId = signal->theData[sigLen];
2214 Uint32 fragInfo = signal->header.m_fragmentInfo;
2215 Uint32 senderRef = signal->getSendersBlockRef();
2217 Uint32 *sectionPtr = signal->m_sectionPtrI;
2223 const Uint32 secs = signal->header.m_noOfSections;
2224 const Uint32 *
const secNos = &signal->theData[sigLen - secs];
2231 if(!c_fragmentInfoHash.
seize(fragPtr)){
2237 c_fragmentInfoHash.
add(fragPtr);
2239 for(Uint32 i = 0; i<secs; i++){
2240 Uint32 sectionNo = secNos[
i];
2241 ndbassert(sectionNo < 3);
2242 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtr[
i];
2245 ndbassert(! fragPtr.p->isDropped() );
2250 signal->header.m_fragmentInfo = 0;
2251 signal->header.m_noOfSections = 0;
2257 if(c_fragmentInfoHash.
find(fragPtr, key)){
2262 if ( likely(! fragPtr.p->isDropped()) )
2265 for(i = 0; i<secs; i++){
2266 Uint32 sectionNo = secNos[
i];
2267 ndbassert(sectionNo < 3);
2268 Uint32 sectionPtrI = sectionPtr[
i];
2269 if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){
2270 linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI);
2272 fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI;
2280 signal->header.m_fragmentInfo = 0;
2281 signal->header.m_noOfSections = 0;
2288 for(i = 0; i<3; i++){
2289 Uint32 ptrI = fragPtr.p->m_sectionPtrI[
i];
2291 signal->m_sectionPtrI[
i] = ptrI;
2297 signal->setLength(sigLen - secs);
2298 signal->header.m_noOfSections =
i;
2299 signal->header.m_fragmentInfo = 0;
2301 c_fragmentInfoHash.
release(fragPtr);
2309 for (Uint32 i=0; i < secs; i++)
2310 releaseSection( sectionPtr[i] );
2312 signal->header.m_fragmentInfo = 0;
2313 signal->header.m_noOfSections = 0;
2326 Uint32 gsn = signal->header.theVerId_signalNumber;
2327 Uint32 len = signal->header.theLength;
2328 Uint32 newLen= (len > 22 ? 22 : len);
2329 memmove(rep->originalData, signal->theData, (4 * newLen));
2330 rep->originalGsn = gsn;
2331 rep->originalLength = len;
2332 rep->originalSectionCount = 0;
2333 signal->header.theVerId_signalNumber = GSN_SIGNAL_DROPPED_REP;
2334 signal->header.theLength = newLen + 3;
2335 signal->header.m_noOfSections = 0;
2336 signal->header.m_fragmentInfo = 3;
2389 Uint32 sigLen = signal->
length() - 1;
2390 Uint32 fragId = signal->theData[sigLen];
2391 Uint32 fragInfo = signal->header.m_fragmentInfo;
2392 Uint32 senderRef = signal->getSendersBlockRef();
2399 ndbrequire(signal->header.theVerId_signalNumber == GSN_SIGNAL_DROPPED_REP);
2400 ndbrequire(signal->header.m_noOfSections == 0);
2407 if(!c_fragmentInfoHash.
seize(fragPtr)){
2413 c_fragmentInfoHash.
add(fragPtr);
2418 fragPtr.p->m_sectionPtrI[0]= RNIL;
2419 fragPtr.p->m_sectionPtrI[1]= RNIL;
2420 fragPtr.p->m_sectionPtrI[2]= RNIL;
2423 signal->header.m_fragmentInfo = 0;
2429 if(c_fragmentInfoHash.
find(fragPtr, key)){
2434 if (! fragPtr.p->isDropped() )
2439 releaseSection(fragPtr.p->m_sectionPtrI[0]);
2440 releaseSection(fragPtr.p->m_sectionPtrI[1]);
2441 releaseSection(fragPtr.p->m_sectionPtrI[2]);
2444 fragPtr.p->m_sectionPtrI[0]= RNIL;
2445 fragPtr.p->m_sectionPtrI[1]= RNIL;
2446 fragPtr.p->m_sectionPtrI[2]= RNIL;
2448 ndbassert( fragPtr.p->isDropped() );
2457 signal->header.m_fragmentInfo = 0;
2467 signal->header.m_fragmentInfo = 0;
2469 c_fragmentInfoHash.
release(fragPtr);
2504 SimulatedBlock::doCleanupFragInfo(Uint32 failedNodeId,
2506 Uint32& rtUnitsUsed,
2507 Uint32& elementsCleaned)
2512 c_fragmentInfoHash.
next(cursor, iter);
2514 const Uint32 startBucket = iter.bucket;
2516 while (!iter.isNull() &&
2517 (iter.bucket == startBucket))
2522 c_fragmentInfoHash.
next(iter);
2526 if (refToNode(fragInfo->m_senderRef) == failedNodeId)
2533 for(Uint32 s = 0; s<3; s++)
2535 if (fragInfo->m_sectionPtrI[s] != RNIL)
2539 getSection(ssptr, fragInfo->m_sectionPtrI[s]);
2545 c_fragmentInfoHash.
release(curr);
2554 cursor = iter.bucket;
2555 return iter.isNull();
2559 SimulatedBlock::doCleanupFragSend(Uint32 failedNodeId,
2561 Uint32& rtUnitsUsed,
2562 Uint32& elementsCleaned)
2567 const Uint32 NumSendLists = 2;
2568 ndbrequire(cursor < NumSendLists);
2571 { &c_segmentedFragmentSendList,
2572 &c_linearFragmentSendList };
2576 list->
first(fragPtr);
2577 for(; !fragPtr.isNull();){
2580 list->
next(fragPtr);
2585 if (rg.m_nodes.
get(failedNodeId))
2589 rg.m_nodes.
clear(failedNodeId);
2599 copyPtr.p->m_status = FragmentSendInfo::SendCancelled;
2608 return (cursor == NumSendLists);
2613 SimulatedBlock::doNodeFailureCleanup(
Signal* signal,
2614 Uint32 failedNodeId,
2617 Uint32 elementsCleaned,
2621 const bool userCallback = (cb.m_callbackFunction != 0);
2622 const Uint32 maxRtUnits = userCallback ?
2630 Uint32 rtUnitsUsed = 0;
2635 bool resourceDone=
false;
2637 case ContinueFragmented::RES_FRAGSEND:
2640 resourceDone = doCleanupFragSend(failedNodeId, cursor,
2641 rtUnitsUsed, elementsCleaned);
2644 case ContinueFragmented::RES_FRAGINFO:
2647 resourceDone = doCleanupFragInfo(failedNodeId, cursor,
2648 rtUnitsUsed, elementsCleaned);
2651 case ContinueFragmented::RES_LAST:
2656 execute(signal, cb, elementsCleaned);
2658 return elementsCleaned;
2671 }
while (rtUnitsUsed <= maxRtUnits);
2679 ndbassert(userCallback);
2684 sig->type = ContinueFragmented::CONTINUE_CLEANUP;
2685 sig->cleanup.failedNodeId = failedNodeId;
2686 sig->cleanup.resource = resource;
2687 sig->cleanup.cursor =
cursor;
2688 sig->cleanup.elementsCleaned= elementsCleaned;
2689 Uint32 callbackWords = (
sizeof(Callback) + 3) >> 2;
2690 Uint32 sigLen = ContinueFragmented::CONTINUE_CLEANUP_FIXED_WORDS +
2692 ndbassert(sigLen <= 25);
2693 memcpy(&sig->cleanup.callbackStart, &cb, callbackWords << 2);
2695 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, sigLen, JBB);
2697 return elementsCleaned;
2702 Uint32 failedNodeId,
2706 return doNodeFailureCleanup(signal, failedNodeId, 0, 0, 0, cb);
2710 SimulatedBlock::debugPrintFragmentCounts()
2712 const char* blockName = getBlockName(theNumber);
2714 Uint32 fragmentInfoCount = 0;
2715 c_fragmentInfoHash.
first(iter);
2717 while(!iter.isNull())
2719 fragmentInfoCount++;
2720 c_fragmentInfoHash.
next(iter);
2724 Uint32 linSendInfoCount = 0;
2726 c_linearFragmentSendList.
first(ptr);
2728 while (!ptr.isNull())
2731 c_linearFragmentSendList.
next(ptr);
2734 Uint32 segSendInfoCount = 0;
2735 c_segmentedFragmentSendList.
first(ptr);
2737 while (!ptr.isNull())
2740 c_segmentedFragmentSendList.
next(ptr);
2743 ndbout_c(
"%s : Fragment assembly hash entry count : %d",
2747 ndbout_c(
"%s : Linear fragment send list size : %d",
2751 ndbout_c(
"%s : Segmented fragment send list size : %d",
2755 return fragmentInfoCount +
2764 GlobalSignalNumber gsn,
2767 JobBufferLevel jbuf,
2770 Uint32 messageSize) {
2772 Uint32 noSections = sections->m_cnt;
2775 info.m_sectionPtr[0].m_segmented.i = RNIL;
2776 info.m_sectionPtr[1].m_segmented.i = RNIL;
2777 info.m_sectionPtr[2].m_segmented.i = RNIL;
2779 Uint32 totalSize = 0;
2782 info.m_sectionPtr[2].m_segmented.i = ptr[2].i;
2783 info.m_sectionPtr[2].m_segmented.p = ptr[2].p;
2784 totalSize += ptr[2].sz;
2786 info.m_sectionPtr[1].m_segmented.i = ptr[1].i;
2787 info.m_sectionPtr[1].m_segmented.p = ptr[1].p;
2788 totalSize += ptr[1].sz;
2790 info.m_sectionPtr[0].m_segmented.i = ptr[0].i;
2791 info.m_sectionPtr[0].m_segmented.p = ptr[0].p;
2792 totalSize += ptr[0].sz;
2795 if(totalSize <= messageSize + SectionSegment::DataLength){
2802 sendSignal(rg, gsn, signal, length, jbuf, sections);
2804 info.m_status = FragmentSendInfo::SendComplete;
2811 info.m_status = FragmentSendInfo::SendNotComplete;
2812 info.m_prio = (Uint8)jbuf;
2814 info.m_fragInfo = 1;
2816 info.m_messageSize = messageSize;
2817 info.m_fragmentId = c_fragmentIdCounter++;
2818 info.m_nodeReceiverGroup = rg;
2819 info.m_callback.m_callbackFunction = 0;
2824 info.m_flags|= FragmentSendInfo::SendNoReleaseSeg;
2832 sections->m_cnt = 0;
2837 if(!
import(tmp, &signal->theData[0], length))
2839 handle_out_of_longsignal_memory(0);
2842 info.m_theDataSection.p = &tmp.p->theData[0];
2843 info.m_theDataSection.sz = length;
2844 tmp.p->theData[length] = tmp.i;
2848 if(c_fragmentIdCounter == 0){
2852 c_fragmentIdCounter = 1;
2868 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
2873 if (0 == (info.m_flags & FragmentSendInfo::SendNoReleaseSeg))
2879 for (Uint32 s = 0; s < 3; s++)
2881 Uint32 sectionI = info.m_sectionPtr[s].m_segmented.i;
2882 if (sectionI != RNIL)
2884 getSection(handle.m_ptr[handle.m_cnt], sectionI);
2885 info.m_sectionPtr[s].m_segmented.i = RNIL;
2886 info.m_sectionPtr[s].m_segmented.p = NULL;
2891 releaseSections(handle);
2895 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
2896 g_sectionSegmentPool.
release(SB_SP_REL_ARG inlineDataI);
2898 info.m_status = FragmentSendInfo::SendComplete;
2905 const Uint32 sigLen = info.m_theDataSection.sz;
2906 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
2909 Uint32 maxSz = info.m_messageSize;
2912 Uint32 secCount = 0;
2913 Uint32 * secNos = &signal->theData[sigLen];
2919 Uint32 splitSectionStartI= RNIL;
2921 Uint32 splitSectionLastSegment= RNIL;
2922 Uint32 splitSectionSz= 0;
2924 enum { Unknown = 0, Full = 1 } loop = Unknown;
2925 for(; secNo >= 0 && secCount < 3; secNo--){
2926 Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i;
2930 info.m_sectionPtr[secNo].m_segmented.i = RNIL;
2933 const Uint32 size = ptrP->m_sz;
2935 ptr[secCount].i = ptrI;
2936 ptr[secCount].p = ptrP;
2937 ptr[secCount].sz =
size;
2938 secNos[secCount] = secNo;
2941 const Uint32 sizeLeft = maxSz - sz;
2942 if(size <= sizeLeft){
2947 lsout(ndbout_c(
"section %d saved as %d", secNo, secCount-1));
2951 const Uint32 overflow = size - sizeLeft;
2952 if(overflow <= SectionSegment::DataLength){
2957 lsout(ndbout_c(
"section %d saved as %d but full over: %d",
2958 secNo, secCount-1, overflow));
2964 if(sizeLeft < SectionSegment::DataLength){
2970 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
2972 lsout(ndbout_c(
"section %d not saved", secNo));
2984 Uint32 sum = SectionSegment::DataLength;
2985 Uint32 prevPtrI = ptrI;
2986 ptrI = ptrP->m_nextSegment;
2987 const Uint32 fill = sizeLeft - SectionSegment::DataLength;
2990 ptrP = g_sectionSegmentPool.
getPtr(ptrI);
2991 ptrI = ptrP->m_nextSegment;
2992 sum += SectionSegment::DataLength;
2995 Uint32 prev = secCount - 1;
3002 splitSectionStartI= ptr[prev].i;
3003 splitSectionStartP= ptr[prev].p;
3004 splitSectionLastSegment= splitSectionStartP->m_lastSegment;
3005 splitSectionSz= splitSectionStartP->m_sz;
3011 splitSectionStartP->m_lastSegment = prevPtrI;
3012 splitSectionStartP->m_sz = sum;
3019 ptrP = g_sectionSegmentPool.
getPtr(ptrI);
3020 ptrP->m_lastSegment = splitSectionLastSegment;
3021 ptrP->m_sz = size - sum;
3026 info.m_sectionPtr[secNo].m_segmented.i = ptrI;
3027 info.m_sectionPtr[secNo].m_segmented.p = ptrP;
3030 lsout(ndbout_c(
"section %d split into %d", secNo, prev));
3034 lsout(ndbout_c(
"loop: %d secNo: %d secCount: %d sz: %d",
3035 loop, secNo, secCount, sz));
3040 secNos[secCount] = info.m_fragmentId;
3042 Uint32 fragInfo = info.m_fragInfo;
3043 info.m_fragInfo = 2;
3047 lsout(ndbout_c(
"Unknown - Full"));
3054 lsout(ndbout_c(
"Unknown - Done"));
3055 info.m_status = FragmentSendInfo::SendComplete;
3056 ndbassert(fragInfo == 2);
3062 signal->header.m_fragmentInfo = fragInfo;
3063 signal->header.m_noOfSections = 0;
3064 sections.m_cnt = secCount;
3066 if (info.m_flags & FragmentSendInfo::SendNoReleaseSeg)
3071 sigLen + secCount + 1,
3072 (JobBufferLevel)info.m_prio,
3089 ndbrequire( splitSectionStartI != RNIL );
3090 ndbrequire( splitSectionStartP != NULL );
3091 ndbrequire( splitSectionLastSegment != RNIL );
3093 splitSectionStartP->m_lastSegment= splitSectionLastSegment;
3094 splitSectionStartP->m_sz= splitSectionSz;
3097 assert(verifySection(splitSectionStartI));
3103 sendSignal(info.m_nodeReceiverGroup,
3106 sigLen + secCount + 1,
3107 (JobBufferLevel)info.m_prio,
3116 g_sectionSegmentPool.
release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3123 GlobalSignalNumber gsn,
3126 JobBufferLevel jbuf,
3128 Uint32 noOfSections,
3129 Uint32 messageSize){
3131 check_sections(signal, signal->header.m_noOfSections, noOfSections);
3133 info.m_sectionPtr[0].m_linear.p = NULL;
3134 info.m_sectionPtr[1].m_linear.p = NULL;
3135 info.m_sectionPtr[2].m_linear.p = NULL;
3137 Uint32 totalSize = 0;
3138 switch(noOfSections){
3140 info.m_sectionPtr[2].m_linear = ptr[2];
3141 totalSize += ptr[2].sz;
3143 info.m_sectionPtr[1].m_linear = ptr[1];
3144 totalSize += ptr[1].sz;
3146 info.m_sectionPtr[0].m_linear = ptr[0];
3147 totalSize += ptr[0].sz;
3150 if(totalSize <= messageSize + SectionSegment::DataLength){
3154 sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections);
3155 info.m_status = FragmentSendInfo::SendComplete;
3167 info.m_status = FragmentSendInfo::SendNotComplete;
3168 info.m_prio = (Uint8)jbuf;
3170 info.m_messageSize = messageSize;
3171 info.m_fragInfo = 1;
3173 info.m_fragmentId = c_fragmentIdCounter++;
3174 info.m_nodeReceiverGroup = rg;
3175 info.m_callback.m_callbackFunction = 0;
3178 if(unlikely(!
import(tmp, &signal->theData[0], length)))
3180 handle_out_of_longsignal_memory(0);
3184 info.m_theDataSection.p = &tmp.p->theData[0];
3185 info.m_theDataSection.sz = length;
3186 tmp.p->theData[length] = tmp.i;
3190 if(c_fragmentIdCounter == 0){
3194 c_fragmentIdCounter = 1;
3204 if (unlikely(info.m_status == FragmentSendInfo::SendCancelled))
3210 Uint32 inlineDataI = info.m_theDataSection.p[info.m_theDataSection.sz];
3211 g_sectionSegmentPool.
release(SB_SP_REL_ARG inlineDataI);
3213 info.m_status = FragmentSendInfo::SendComplete;
3220 const Uint32 sigLen = info.m_theDataSection.sz;
3221 memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen);
3224 Uint32 maxSz = info.m_messageSize;
3227 Uint32 secCount = 0;
3228 Uint32 * secNos = &signal->theData[sigLen];
3231 enum { Unknown = 0, Full = 2 } loop = Unknown;
3232 for(; secNo >= 0 && secCount < 3; secNo--){
3233 Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p;
3237 info.m_sectionPtr[secNo].m_linear.p = NULL;
3238 const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz;
3240 signalPtr[secCount].p = ptrP;
3241 signalPtr[secCount].sz =
size;
3242 secNos[secCount] = secNo;
3245 const Uint32 sizeLeft = maxSz - sz;
3246 if(size <= sizeLeft){
3251 lsout(ndbout_c(
"section %d saved as %d", secNo, secCount-1));
3255 const Uint32 overflow = size - sizeLeft;
3256 if(overflow <= SectionSegment::DataLength){
3261 lsout(ndbout_c(
"section %d saved as %d but full over: %d",
3262 secNo, secCount-1, overflow));
3268 if(sizeLeft < SectionSegment::DataLength){
3274 info.m_sectionPtr[secNo].m_linear.p = ptrP;
3276 lsout(ndbout_c(
"section %d not saved", secNo));
3287 Uint32 sum = sizeLeft;
3288 sum /= SectionSegment::DataLength;
3289 sum *= SectionSegment::DataLength;
3294 Uint32 prev = secCount - 1;
3295 signalPtr[prev].sz = sum;
3300 info.m_sectionPtr[secNo].m_linear.p = ptrP + sum;
3301 info.m_sectionPtr[secNo].m_linear.sz = size - sum;
3304 lsout(ndbout_c(
"section %d split into %d", secNo, prev));
3308 lsout(ndbout_c(
"loop: %d secNo: %d secCount: %d sz: %d",
3309 loop, secNo, secCount, sz));
3314 secNos[secCount] = info.m_fragmentId;
3316 Uint32 fragInfo = info.m_fragInfo;
3317 info.m_fragInfo = 2;
3321 lsout(ndbout_c(
"Unknown - Full"));
3328 lsout(ndbout_c(
"Unknown - Done"));
3329 info.m_status = FragmentSendInfo::SendComplete;
3330 ndbassert(fragInfo == 2);
3336 signal->header.m_noOfSections = 0;
3337 signal->header.m_fragmentInfo = fragInfo;
3339 sendSignal(info.m_nodeReceiverGroup,
3342 sigLen + secCount + 1,
3343 (JobBufferLevel)info.m_prio,
3351 g_sectionSegmentPool.
release(SB_SP_REL_ARG info.m_theDataSection.p[sigLen]);
3356 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3357 GlobalSignalNumber gsn,
3360 JobBufferLevel jbuf,
3363 Uint32 messageSize){
3366 res = c_segmentedFragmentSendList.
seize(tmp);
3380 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3381 c_segmentedFragmentSendList.
release(tmp);
3382 if(c.m_callbackFunction != 0)
3383 execute(signal, c, 0);
3386 tmp.p->m_callback = c;
3388 if(!c_fragSenderRunning)
3391 c_fragSenderRunning =
true;
3393 sig->type = ContinueFragmented::CONTINUE_SENDING;
3394 sig->line = __LINE__;
3395 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3401 GlobalSignalNumber gsn,
3404 JobBufferLevel jbuf,
3407 Uint32 messageSize){
3410 res = c_segmentedFragmentSendList.
seize(tmp);
3424 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3425 c_segmentedFragmentSendList.
release(tmp);
3426 if(c.m_callbackFunction != 0)
3427 execute(signal, c, 0);
3430 tmp.p->m_callback = c;
3432 if(!c_fragSenderRunning)
3435 c_fragSenderRunning =
true;
3437 sig->type = ContinueFragmented::CONTINUE_SENDING;
3438 sig->line = __LINE__;
3439 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3445 SimulatedBlock::TheNULLCallbackFunction(
class Signal*, Uint32, Uint32)
3448 { &SimulatedBlock::TheNULLCallbackFunction, 0 };
3451 SimulatedBlock::sendFragmentedSignal(BlockReference ref,
3452 GlobalSignalNumber gsn,
3455 JobBufferLevel jbuf,
3457 Uint32 noOfSections,
3459 Uint32 messageSize){
3462 res = c_linearFragmentSendList.
seize(tmp);
3476 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3477 c_linearFragmentSendList.
release(tmp);
3478 if(c.m_callbackFunction != 0)
3479 execute(signal, c, 0);
3482 tmp.p->m_callback = c;
3484 if(!c_fragSenderRunning)
3487 c_fragSenderRunning =
true;
3489 sig->type = ContinueFragmented::CONTINUE_SENDING;
3490 sig->line = __LINE__;
3491 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3497 GlobalSignalNumber gsn,
3500 JobBufferLevel jbuf,
3502 Uint32 noOfSections,
3504 Uint32 messageSize){
3507 res = c_linearFragmentSendList.
seize(tmp);
3521 if(tmp.p->m_status == FragmentSendInfo::SendComplete){
3522 c_linearFragmentSendList.
release(tmp);
3523 if(c.m_callbackFunction != 0)
3524 execute(signal, c, 0);
3527 tmp.p->m_callback = c;
3529 if(!c_fragSenderRunning)
3532 c_fragSenderRunning =
true;
3534 sig->type = ContinueFragmented::CONTINUE_SENDING;
3535 sig->line = __LINE__;
3536 sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 2, JBB);
3541 SimulatedBlock::setNodeInfo(NodeId nodeId) {
3542 ndbrequire(nodeId > 0 && nodeId < MAX_NODES);
3543 return globalData.m_nodeInfo[nodeId];
3547 SimulatedBlock::isMultiThreaded()
3549 #ifdef NDBD_MULTITHREADED
3558 SimulatedBlock::execUTIL_CREATE_LOCK_REF(
Signal* signal){
3560 c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal);
3563 void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(
Signal* signal){
3565 c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal);
3568 void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(
Signal* signal){
3570 c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal);
3573 void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(
Signal* signal){
3575 c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal);
3578 void SimulatedBlock::execUTIL_LOCK_REF(
Signal* signal){
3580 c_mutexMgr.execUTIL_LOCK_REF(signal);
3583 void SimulatedBlock::execUTIL_LOCK_CONF(
Signal* signal){
3585 c_mutexMgr.execUTIL_LOCK_CONF(signal);
3588 void SimulatedBlock::execUTIL_UNLOCK_REF(
Signal* signal){
3590 c_mutexMgr.execUTIL_UNLOCK_REF(signal);
3593 void SimulatedBlock::execUTIL_UNLOCK_CONF(
Signal* signal){
3595 c_mutexMgr.execUTIL_UNLOCK_CONF(signal);
3599 SimulatedBlock::ignoreMutexUnlockCallback(
Signal* signal,
3600 Uint32 ptrI, Uint32 retVal){
3601 c_mutexMgr.release(ptrI);
3605 SimulatedBlock::fsRefError(
Signal* signal, Uint32 line,
const char *msg)
3607 const FsRef *fsRef = (
FsRef*)signal->getDataPtr();
3608 Uint32 errorCode = fsRef->errorCode;
3609 Uint32 osErrorCode = fsRef->osErrorCode;
3612 sprintf(msg2,
"%s: %s. OS errno: %u", getBlockName(number()), msg, osErrorCode);
3618 SimulatedBlock::execFSWRITEREF(
Signal* signal)
3620 fsRefError(signal, __LINE__,
"File system write failed");
3624 SimulatedBlock::execFSREADREF(
Signal* signal)
3626 fsRefError(signal, __LINE__,
"File system read failed");
3630 SimulatedBlock::execFSCLOSEREF(
Signal* signal)
3632 fsRefError(signal, __LINE__,
"File system close failed");
3636 SimulatedBlock::execFSOPENREF(
Signal* signal)
3638 fsRefError(signal, __LINE__,
"File system open failed");
3642 SimulatedBlock::execFSREMOVEREF(
Signal* signal)
3644 fsRefError(signal, __LINE__,
"File system remove failed");
3648 SimulatedBlock::execFSSYNCREF(
Signal* signal)
3650 fsRefError(signal, __LINE__,
"File system sync failed");
3654 SimulatedBlock::execFSAPPENDREF(
Signal* signal)
3656 fsRefError(signal, __LINE__,
"File system append failed");
3660 static Ptr<void> * m_empty_global_variables[] = { 0 };
3662 SimulatedBlock::disable_global_variables()
3664 m_global_variables_save = m_global_variables;
3665 m_global_variables = m_empty_global_variables;
3669 SimulatedBlock::enable_global_variables()
3671 if (m_global_variables == m_empty_global_variables)
3673 m_global_variables = m_global_variables_save;
3678 SimulatedBlock::clear_global_variables(){
3688 SimulatedBlock::init_globals_list(
void ** tmp,
size_t cnt){
3689 m_global_variables =
new Ptr<void> * [cnt+1];
3690 for(
size_t i = 0; i<cnt; i++){
3693 m_global_variables[cnt] = 0;
3698 #include "KeyDescriptor.hpp"
3702 Uint32 *dst, Uint32 dstSize,
3703 Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX])
const
3705 const KeyDescriptor * desc = g_key_descriptor_pool.getPtr(tab);
3706 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3711 while (i < noOfKeyAttr)
3715 xfrm_attr(keyAttr.attributeDescriptor, keyAttr.charsetInfo,
3716 src, srcPos, dst, dstPos, dstSize);
3717 keyPartLen[i++] = dstWords;
3718 if (unlikely(dstWords == 0))
3724 for(Uint32 i = 0; i<dstPos; i++)
3726 printf(
"%.8x ", dst[i]);
3734 SimulatedBlock::xfrm_attr(Uint32 attrDesc,
CHARSET_INFO* cs,
3735 const Uint32* src, Uint32 & srcPos,
3736 Uint32* dst, Uint32 & dstPos, Uint32 dstSize)
const
3739 AttributeDescriptor::getArrayType(attrDesc);
3741 AttributeDescriptor::getSizeInBytes(attrDesc);
3743 Uint32 srcWords = ~0;
3744 Uint32 dstWords = ~0;
3745 uchar* dstPtr = (uchar*)&dst[dstPos];
3746 const uchar* srcPtr = (
const uchar*)&src[srcPos];
3754 case NDB_ARRAYTYPE_SHORT_VAR:
3755 len = 1 + srcPtr[0];
3757 case NDB_ARRAYTYPE_MEDIUM_VAR:
3758 len = 2 + srcPtr[0] + (srcPtr[1] << 8);
3764 case NDB_ARRAYTYPE_FIXED:
3767 srcWords = (len + 3) >> 2;
3768 dstWords = srcWords;
3769 memcpy(dstPtr, srcPtr, dstWords << 2);
3773 ndbout_c(
"srcPos: %d dstPos: %d len: %d srcWords: %d dstWords: %d",
3774 srcPos, dstPos, len, srcWords, dstWords);
3776 for(Uint32 i = 0; i<srcWords; i++)
3777 printf(
"%.8x ", src[srcPos + i]);
3790 Uint32 xmul = cs->strxfrm_multiply;
3797 Uint32 dstLen = xmul * (srcBytes - lb);
3798 ndbrequire(dstLen <= ((dstSize - dstPos) << 2));
3800 if (unlikely(n == -1))
3802 while ((n & 3) != 0)
3806 dstWords = (n >> 2);
3807 srcWords = (lb + len + 3) >> 2;
3816 SimulatedBlock::create_distr_key(Uint32 tableId,
3820 keyPartLen[MAX_ATTRIBUTES_IN_INDEX])
const
3822 const KeyDescriptor* desc = g_key_descriptor_pool.getPtr(tableId);
3823 const Uint32 noOfKeyAttr = desc->noOfKeyAttr;
3824 Uint32 noOfDistrKeys = desc->noOfDistrKeys;
3833 while (i < noOfKeyAttr && noOfDistrKeys)
3835 Uint32 attr = desc->keyAttr[
i].attributeDescriptor;
3836 Uint32 len = keyPartLen[
i];
3837 if(AttributeDescriptor::getDKey(attr))
3840 memmove(dst+dstPos, src, len << 2);
3849 while (i < noOfKeyAttr && noOfDistrKeys)
3851 Uint32 attr = desc->keyAttr[
i].attributeDescriptor;
3852 Uint32 len = AttributeDescriptor::getSizeInWords(attr);
3853 ndbrequire(AttributeDescriptor::getArrayType(attr) == NDB_ARRAYTYPE_FIXED);
3854 if(AttributeDescriptor::getDKey(attr))
3857 memmove(dst+dstPos, src, len << 2);
3876 JobBufferLevel prio,
3879 ndbrequire(pathcnt > 0);
3883 Uint32 len = LocalRouteOrd::StaticLen + (2 * pathcnt) + dstcnt;
3884 ndbrequire(len <= 25);
3890 handle.m_cnt = userhandle->m_cnt;
3891 for (Uint32 i = 0; i<handle.m_cnt; i++)
3892 handle.m_ptr[i] = userhandle->m_ptr[i];
3893 userhandle->m_cnt = 0;
3896 if (len + sigLen > 25)
3903 ndbrequire(handle.m_cnt < 3);
3904 handle.m_ptr[2] = handle.m_ptr[1];
3905 handle.m_ptr[1] = handle.m_ptr[0];
3907 if (unlikely(!
import(tmp, signal->theData, sigLen)))
3909 handle_out_of_longsignal_memory(0);
3911 handle.m_ptr[0].p = tmp.p;
3912 handle.m_ptr[0].i = tmp.i;
3913 handle.m_ptr[0].sz = sigLen;
3919 memmove(signal->theData + len, signal->theData, 4 * sigLen);
3924 ord->cnt = (pathcnt << 16) | (dstcnt);
3926 ord->prio = Uint32(prio);
3928 Uint32 * dstptr = ord->path;
3929 for (Uint32 i = 1; i <= pathcnt; i++)
3931 ndbrequire(refToNode(path[i].ref) == 0 ||
3932 refToNode(path[i].ref) == getOwnNodeId());
3934 * dstptr++ = path[
i].ref;
3935 * dstptr++ = Uint32(path[i].prio);
3938 for (Uint32 i = 0; i<dstcnt; i++)
3940 ndbrequire(refToNode(dst[i]) == 0 ||
3941 refToNode(dst[i]) == getOwnNodeId());
3943 * dstptr++ = dst[
i];
3946 sendSignal(path[0].ref, GSN_LOCAL_ROUTE_ORD, signal, len,
3947 path[0].prio, &handle);
3961 if (ERROR_INSERTED(1001))
3968 sendSignalWithDelay(reference(), GSN_LOCAL_ROUTE_ORD, signal, 200,
3969 signal->getLength(), &handle);
3974 Uint32 pathcnt = ord->cnt >> 16;
3975 Uint32 dstcnt = ord->cnt & 0xFFFF;
3976 Uint32 sigLen = signal->getLength();
3984 Uint32 gsn = ord->gsn;
3985 Uint32 prio = ord->prio;
3986 memcpy(signal->theData+25, ord->path, 4*dstcnt);
3988 if (sigLen > LocalRouteOrd::StaticLen + dstcnt)
3994 memmove(signal->theData,
3995 signal->theData + LocalRouteOrd::StaticLen + dstcnt,
3996 4 * (sigLen - (LocalRouteOrd::StaticLen + dstcnt)));
3997 sigLen = sigLen - (LocalRouteOrd::StaticLen + dstcnt);
4005 sigLen = handle.m_ptr[0].sz;
4006 ndbrequire(sigLen <= 25);
4007 copy(signal->theData, handle.m_ptr[0]);
4008 release(handle.m_ptr[0]);
4010 for (Uint32 i = 0; i < handle.m_cnt - 1; i++)
4011 handle.m_ptr[i] = handle.m_ptr[i+1];
4022 for (Uint32 i = 0; i<dstcnt; i++)
4026 JobBufferLevel(prio), &handle);
4028 releaseSections(handle);
4033 sendSignal(signal->theData[25+0], gsn, signal, sigLen,
4034 JobBufferLevel(prio), &handle);
4044 Uint32 ref = ord->path[0];
4045 Uint32 prio = ord->path[1];
4046 Uint32 len = sigLen - 2;
4047 ord->cnt = ((pathcnt - 1) << 16) | dstcnt;
4048 memmove(ord->path, ord->path+2, 4 * (len - LocalRouteOrd::StaticLen));
4049 sendSignal(ref, GSN_LOCAL_ROUTE_ORD, signal, len,
4050 JobBufferLevel(prio), &handle);
4057 SimulatedBlock::debugOutOn()
4061 globalData.testOn &&
4062 globalSignalLoggers.logMatch(number(), mask);
4066 SimulatedBlock::debugOutTag(
char *
buf,
int line)
4069 char instancebuf[40];
4072 sprintf(blockbuf,
"%s", getBlockName(number(),
"UNKNOWN"));
4074 if (instance() != 0)
4075 sprintf(instancebuf,
"/%u", instance());
4076 sprintf(linebuf,
" %d", line);
4078 #ifdef VM_TRACE_TIME
4080 NDB_TICKS t = NdbTick_CurrentMillisecond();
4081 uint s = (t / 1000) % 3600;
4083 sprintf(timebuf,
" - %u.%03u -", s, ms);
4086 sprintf(buf,
"%s%s%s%s ", blockbuf, instancebuf, linebuf, timebuf);
4093 const Uint32 blocks[],
4095 JobBufferLevel prio)
4097 #ifndef NDBD_MULTITHREADED
4099 execute(signal, copy, 0);
4103 Uint32 cnt = mt_get_thread_references_for_blocks(blocks, getThreadId(),
4104 ref, NDB_ARRAY_SIZE(ref));
4109 execute(signal, copy, 0);
4114 ndbrequire(c_syncThreadPool.
seize(ptr));
4116 ptr.p->m_callback = cb;
4118 signal->theData[0] = reference();
4119 signal->theData[1] = ptr.i;
4120 signal->theData[2] = Uint32(prio);
4121 for (Uint32 i = 0; i<cnt; i++)
4123 sendSignal(ref[i], GSN_SYNC_THREAD_REQ, signal, 3, prio);
4129 SimulatedBlock::execSYNC_THREAD_REQ(
Signal* signal)
4132 Uint32 ref = signal->theData[0];
4133 Uint32 prio = signal->theData[2];
4134 sendSignal(ref, GSN_SYNC_THREAD_CONF, signal, signal->getLength(),
4135 JobBufferLevel(prio));
4139 SimulatedBlock::execSYNC_THREAD_CONF(
Signal* signal)
4143 c_syncThreadPool.
getPtr(ptr, signal->theData[1]);
4144 if (ptr.p->m_cnt == 1)
4147 Callback copy = ptr.p->m_callback;
4148 c_syncThreadPool.
release(ptr);
4149 execute(signal, copy, 0);
4156 SimulatedBlock::execSYNC_REQ(
Signal* signal)
4159 Uint32 ref = signal->theData[0];
4160 Uint32 prio = signal->theData[2];
4161 sendSignal(ref, GSN_SYNC_CONF, signal, signal->getLength(),
4162 JobBufferLevel(prio));
4167 const Uint32 blocks[],
4169 JobBufferLevel prio)
4175 ndbrequire(c_syncThreadPool.
seize(ptr));
4177 ptr.p->m_callback = cb;
4180 req->senderData = ptr.i;
4181 req->prio = Uint32(prio);
4192 for (; blocks[len+1] != 0; len++)
4194 req->path[len] = blocks[len+1];
4196 req->pathlen = 1 + len;
4197 req->path[len] = reference();
4198 sendSignal(numberToRef(blocks[0], getOwnNodeId()),
4199 GSN_SYNC_PATH_REQ, signal,
4200 SyncPathReq::SignalLength + (1 + len), prio);
4205 SimulatedBlock::execSYNC_PATH_REQ(
Signal* signal)
4209 if (req->pathlen == 1)
4214 conf->senderData = copy.senderData;
4215 conf->count = copy.count;
4216 sendSignal(copy.path[0], GSN_SYNC_PATH_CONF, signal,
4217 SyncPathConf::SignalLength, JobBufferLevel(copy.prio));
4222 Uint32 ref = numberToRef(req->path[0], getOwnNodeId());
4224 memmove(req->path, req->path + 1, 4 * req->pathlen);
4225 sendSignal(ref, GSN_SYNC_PATH_REQ, signal,
4226 SyncPathReq::SignalLength + (1 + req->pathlen),
4227 JobBufferLevel(req->prio));
4232 SimulatedBlock::execSYNC_PATH_CONF(
Signal* signal)
4238 c_syncThreadPool.
getPtr(ptr, conf.senderData);
4240 if (ptr.p->m_cnt == 0)
4243 ptr.p->m_cnt = conf.count;
4246 if (ptr.p->m_cnt == 1)
4249 Callback copy = ptr.p->m_callback;
4250 c_syncThreadPool.
release(ptr);
4251 execute(signal, copy, 0);
4262 Uint32 ref = signal->getSendersBlockRef();
4277 if (ref == reference() ||
4278 (refToNode(ref) == getOwnNodeId() &&
4279 refToMain(ref) == NDBCNTR))
4286 path[0].ref = QMGR_REF;
4288 path[1].ref = NDBCNTR_REF;
4292 dst[0] = reference();
4295 Uint32 gsn = signal->header.theVerId_signalNumber;
4296 Uint32 len = signal->getLength();
4305 #ifdef NDBD_MULTITHREADED
4307 globalTransporterRegistry.setup_wakeup_socket();
4314 #ifdef NDBD_MULTITHREADED
4317 globalTransporterRegistry.wakeup();
4323 SimulatedBlock::ndbinfo_send_row(
Signal* signal,
4329 assert(row.columns() == Ndbinfo::getTable(req.tableId).columns());
4332 tidai->connectPtr= req.resultData;
4333 tidai->transId[0]= req.transId[0];
4334 tidai->transId[1]= req.transId[1];
4337 ptr[0].p = row.getDataPtr();
4338 ptr[0].sz = row.getLength();
4341 rl.bytes += row.getLength();
4343 sendSignal(req.resultRef, GSN_DBINFO_TRANSID_AI,
4344 signal, TransIdAI::HeaderLength, JBB, ptr, 1);
4349 SimulatedBlock::ndbinfo_send_scan_break(
Signal* signal,
4352 Uint32 data1, Uint32 data2,
4353 Uint32 data3, Uint32 data4)
const
4356 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4357 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4359 conf->returnedRows = rl.rows;
4365 cursor->data[0] = data1;
4366 cursor->data[1] = data2;
4367 cursor->data[2] = data3;
4368 cursor->data[3] = data4;
4371 cursor->totalRows += rl.rows;
4372 cursor->totalBytes += rl.bytes;
4374 Ndbinfo::ScanCursor::setHasMoreData(cursor->
flags,
true);
4376 sendSignal(cursor->senderRef, GSN_DBINFO_SCANCONF, signal,
4377 signal_length, JBB);
4381 SimulatedBlock::ndbinfo_send_scan_conf(
Signal* signal,
4386 const Uint32 signal_length = DbinfoScanReq::SignalLength + req.cursor_sz;
4387 Uint32 sender_ref = req.resultRef;
4388 MEMCOPY_NO_WORDS(conf, &req, signal_length);
4390 conf->returnedRows = rl.rows;
4400 memset(cursor->data, 0,
sizeof(cursor->data));
4403 cursor->totalRows += rl.rows;
4404 cursor->totalBytes += rl.bytes;
4406 Ndbinfo::ScanCursor::setHasMoreData(cursor->
flags,
false);
4408 sender_ref = cursor->senderRef;
4411 sendSignal(sender_ref, GSN_DBINFO_SCANCONF, signal,
4412 signal_length, JBB);
4419 #ifdef NDBD_MULTITHREADED
4420 mt_assert_own_thread(
this);