18 #include "FastScheduler.hpp"
19 #include "RefConvert.hpp"
21 #include "Emulator.hpp"
22 #include "VMSignal.hpp"
24 #include <SignalLoggerManager.hpp>
25 #include <BlockNumbers.h>
26 #include <GlobalSignalNumbers.h>
27 #include <signaldata/EventReport.hpp>
28 #include "LongSignal.hpp"
// Tuning constants for how many signals one FastScheduler::doJob() call handles.
// Floor applied to the per-call loop limit in doJob().
31 #define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
// NOTE(review): not referenced in the visible part of this file.
32 #define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
// Margin added to getBOccupancy() when doJob() computes its minimum loop count.
33 #define EXTRA_SIGNALS_PER_DO_JOB 32
// Constructor: calls newBuffer() on job buffers 0..3 with the sizes
// JBASIZE, JBBSIZE, JBCSIZE and JBDSIZE respectively.
// NOTE(review): any further statements of the constructor are not visible
// in this excerpt.
35 FastScheduler::FastScheduler()
39 theJobBuffers[0].newBuffer(JBASIZE);
40 theJobBuffers[1].newBuffer(JBBSIZE);
41 theJobBuffers[2].newBuffer(JBCSIZE);
42 theJobBuffers[3].newBuffer(JBDSIZE);
// Destructor. NOTE(review): the body is not visible in this excerpt.
46 FastScheduler::~FastScheduler()
// Resets the scheduler bookkeeping kept in globalData and in this object,
// and clears every job buffer.
51 FastScheduler::clear()
// LEVEL_IDLE makes the doJob() loop condition (tHighPrio < LEVEL_IDLE) false.
56 globalData.highestAvailablePrio = LEVEL_IDLE;
// Both send-packed flags start at 0; they are read in sendPacked() and
// set in activateSendPacked().
57 globalData.sendPackedActivated = 0;
58 globalData.activateSendPacked = 0;
// Clear each of the JB_LEVELS job buffers.
59 for (i = 0; i < JB_LEVELS; i++){
60 theJobBuffers[
i].clear();
// JobCounter indexes theJobPriority in doJob(); JobLap is incremented there
// once per retrieved job.
62 globalData.JobCounter = 0;
63 globalData.JobLap = 0;
// Initial loop limit; doJob() raises any limit below
// MIN_NUMBER_OF_SIG_PER_DO_JOB up to that minimum.
64 globalData.loopMax = 32;
65 globalData.VMSignals[0].header.theSignalId = 0;
// Statistics counters that doJob() accumulates and periodically reports.
67 theDoJobTotalCounter = 0;
68 theDoJobCallCounter = 0;
// Sets globalData.sendPackedActivated to 1, clears
// globalData.activateSendPacked and raises the doJob() loop limit
// (globalData.loopMax) to 2048.
72 FastScheduler::activateSendPacked()
74 globalData.sendPackedActivated = 1;
75 globalData.activateSendPacked = 0;
76 globalData.loopMax = 2048;
// Runs one scheduling pass: retrieves signals one at a time from the job
// buffer of the current globalData.highestAvailablePrio level until the
// loop limit is reached or the level is no longer below LEVEL_IDLE, then
// updates the doJob statistics counters.
// NOTE(review): this excerpt is missing several original lines of the
// function (declarations of loopCount / b / diff, braces, and the statement
// that actually executes the signal), so the comments below describe only
// the statements that are visible.
86 FastScheduler::doJob()
// Minimum loop count: getBOccupancy() plus a fixed margin.
89 Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
90 Uint32 TloopMax = (Uint32)globalData.loopMax;
// NOTE(review): the body of this branch is not visible here.
91 if (TminLoops < TloopMax) {
// Never use a loop limit below MIN_NUMBER_OF_SIG_PER_DO_JOB.
94 if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
95 TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
97 register Signal* signal = getVMSignals();
98 register Uint32 tHighPrio= globalData.highestAvailablePrio;
// Inner loop: continue while a level below LEVEL_IDLE is available and the
// loop limit has not been reached.
100 while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
// retrieve() returns the signal number in the upper 16 bits and the
// receiver block number in the low bits of one word.
103 register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
105 register BlockNumber reg_bnr = gsnbnr & NDBMT_BLOCK_MASK;
106 register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
107 globalData.incrementWatchDogCounter(1);
// Record the priority of this job in theJobPriority. JobCounter wraps
// within 0..4095 (& 4095); JobLap is incremented for every job.
109 Uint32 tJobCounter = globalData.JobCounter;
110 Uint64 tJobLap = globalData.JobLap;
112 theJobPriority[tJobCounter] = (Uint8)tHighPrio;
113 globalData.JobCounter = (tJobCounter + 1) & 4095;
114 globalData.JobLap = tJobLap + 1;
// First of two NdbTick_CurrentMicrosecond() samples taken in this loop.
119 NdbTick_CurrentMicrosecond(&ms1, &us1);
120 b->m_currentGsn = reg_gsn;
// With globalData.testOn set, fill in the header fields and hand the header
// to the signal logger.
125 if (globalData.testOn) {
126 signal->header.theVerId_signalNumber = reg_gsn;
127 signal->header.theReceiversBlockNumber = reg_bnr;
129 globalSignalLoggers.executeSignal(signal->header,
// Second time sample; `diff` (computed on a line not visible here) is passed
// to b->addTime() together with the signal number.
138 NdbTick_CurrentMicrosecond(&ms2, &us2);
144 b->addTime(reg_gsn, diff);
146 tHighPrio = globalData.highestAvailablePrio;
149 globalData.highestAvailablePrio = tHighPrio;
154 tHighPrio = globalData.highestAvailablePrio;
// NOTE(review): the statements guarded by these two conditions are not
// visible in this excerpt; the assert requires loopCount == TloopMax here.
155 if(getBOccupancy() > MAX_OCCUPANCY)
157 if(loopCount != TloopMax)
159 assert( loopCount == TloopMax );
// Outer loop repeats while getBOccupancy() is above MAX_OCCUPANCY, or while
// loop budget remains and a level below LEVEL_IDLE is available.
162 }
while ((getBOccupancy() > MAX_OCCUPANCY) ||
163 ((loopCount < TloopMax) &&
164 (tHighPrio < LEVEL_IDLE)));
// Statistics: after every 8192 calls, report the mean loop count
// (total >> 13, i.e. total / 8192) and reset both counters.
166 theDoJobCallCounter ++;
167 theDoJobTotalCounter += loopCount;
168 if (theDoJobCallCounter == 8192) {
169 reportDoJobStatistics(theDoJobTotalCounter >> 13);
170 theDoJobCallCounter = 0;
171 theDoJobTotalCounter = 0;
// Obtains the VM signal object through getVMSignals().
// NOTE(review): what is done with the signal is not visible in this excerpt.
177 FastScheduler::postPoll()
179 Signal * signal = getVMSignals();
// Dispatches on the two send-packed flags in globalData:
//  - sendPackedActivated == 1: fetches the VM signal object (the statements
//    that use it are not visible in this excerpt);
//  - otherwise activateSendPacked is tested against 0.
// NOTE(review): original lines between the `else if` test and the
// activateSendPacked() call are missing here, so which branch actually
// reaches activateSendPacked() cannot be determined from this excerpt.
184 void FastScheduler::sendPacked()
186 if (globalData.sendPackedActivated == 1) {
191 Signal * signal = getVMSignals();
197 }
else if (globalData.activateSendPacked == 0) {
200 activateSendPacked();
// Takes the entry at the read pointer out of this job buffer and copies its
// header, data words and three section pointers into *signal.
// NOTE(review): the return type, the declarations of `buf` and `gsnbnr`,
// the pointer/counter increments, any else-branches and the return
// statement are on lines missing from this excerpt.
206 APZJobBuffer::retrieve(
Signal* signal)
208 Uint32 tOccupancy = theOccupancy;
209 Uint32 myRPtr = rPtr;
// Branch-free wrap: cond is 0 when the incremented pointer equals bufSize
// and all-ones otherwise, so `myRPtr & cond` below yields 0 on wrap-around.
212 Uint32 cond = (++myRPtr == bufSize) - 1;
213 Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
// The copy below is guarded by a non-empty buffer and a non-zero receiver
// block number.
215 if (tOccupancy != 0) {
216 if (tRecBlockNo != 0) {
// Advance the read pointer and drop the occupancy by one.
218 rPtr = myRPtr & cond;
219 theOccupancy = tOccupancy - 1;
// Pack the signal number into the upper 16 bits, block number in the low bits.
220 gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
222 Uint32 tSignalId = globalData.theSignalId;
223 Uint32 tLength = buf.header.theLength;
224 Uint32 tFirstData = buf.theDataRegister[0];
225 signal->header = buf.header;
// After the header has been copied out, stamp the buffer entry with the
// current global signal id and advance the global id.
228 buf.header.theSignalId = tSignalId;
229 globalData.theSignalId = tSignalId + 1;
231 Uint32* tDataRegPtr = &buf.theDataRegister[0];
232 Uint32* tSigDataPtr = signal->getDataPtrSend();
233 *tSigDataPtr = tFirstData;
// Copy the remaining data words four at a time.
// NOTE(review): the pointer and counter advances are on lines not visible
// here; reading four words per iteration may touch words past tLength.
236 Uint32 tLengthCopied = 1;
237 while (tLengthCopied < tLength) {
238 Uint32 tData0 = tDataRegPtr[0];
239 Uint32 tData1 = tDataRegPtr[1];
240 Uint32 tData2 = tDataRegPtr[2];
241 Uint32 tData3 = tDataRegPtr[3];
246 tSigDataPtr[0] = tData0;
247 tSigDataPtr[1] = tData1;
248 tSigDataPtr[2] = tData2;
249 tSigDataPtr[3] = tData3;
// The three words stored right after the data (offset theLength) are copied
// into signal->m_sectionPtrI.
253 tSigDataPtr = signal->m_sectionPtrI;
254 tDataRegPtr = buf.theDataRegister + buf.header.theLength;
255 Uint32 ptr0 = * tDataRegPtr ++;
256 Uint32 ptr1 = * tDataRegPtr ++;
257 Uint32 ptr2 = * tDataRegPtr ++;
258 * tSigDataPtr ++ = ptr0;
259 * tSigDataPtr ++ = ptr1;
260 * tSigDataPtr ++ = ptr2;
// Invoke NDB_PREFETCH_READ on the entry at rPtr and on the address 64 bytes
// past its start.
267 NDB_PREFETCH_READ((
void*)&buffer[rPtr]);
268 NDB_PREFETCH_READ((
void*)(((
char*)&buffer[rPtr]) + 64));
// Copies *signal into the buffer entry `buf`: the header (with gsn and bnr
// written as signal number and receiver block number) followed by the data
// words.
// NOTE(review): the return type, the remaining parameter(s) (including the
// declaration of `buf`) and the loop's pointer/counter advances are on
// lines missing from this excerpt.
283 APZJobBuffer::signal2buffer(
Signal* signal,
284 BlockNumber bnr, GlobalSignalNumber gsn,
287 Uint32 tSignalId = globalData.theSignalId;
288 Uint32 tFirstData = signal->theData[0];
// Word count to copy: data length plus the number of sections.
289 Uint32 tLength = signal->header.theLength + signal->header.m_noOfSections;
// Remember the entry's own signal id so it survives the header overwrite.
290 Uint32 tSigId = buf.header.theSignalId;
292 buf.header = signal->header;
293 buf.header.theVerId_signalNumber = gsn;
294 buf.header.theReceiversBlockNumber = bnr;
// Sender's signal id is recorded as the current global signal id minus one.
295 buf.header.theSendersSignalId = tSignalId - 1;
296 buf.header.theSignalId = tSigId;
297 buf.theDataRegister[0] = tFirstData;
// Copy the remaining words four at a time, starting at index 1.
// NOTE(review): reading four words per iteration may touch words past
// tLength; the loop advances are not visible here.
299 Uint32 tLengthCopied = 1;
300 Uint32* tSigDataPtr = &signal->theData[1];
301 Uint32* tDataRegPtr = &buf.theDataRegister[1];
302 while (tLengthCopied < tLength) {
303 Uint32 tData0 = tSigDataPtr[0];
304 Uint32 tData1 = tSigDataPtr[1];
305 Uint32 tData2 = tSigDataPtr[2];
306 Uint32 tData3 = tSigDataPtr[3];
311 tDataRegPtr[0] = tData0;
312 tDataRegPtr[1] = tData1;
313 tDataRegPtr[2] = tData2;
314 tDataRegPtr[3] = tData3;
// Tail of a job-buffer write routine: appends an entry made of data words
// from theData followed by the three section pointers in secPtrI.
// NOTE(review): the line carrying the function name and first parameter(s),
// the declaration of `buf`, the header assignment and the branch taken when
// the buffer is full are all missing from this excerpt.
321 const Uint32 *
const theData,
const Uint32 secPtrI[3]){
322 Uint32 tOccupancy = theOccupancy + 1;
323 Uint32 myWPtr = wPtr;
// Only write when the new occupancy stays below bufSize.
326 if (tOccupancy < bufSize) {
// Branch-free wrap of the write pointer: cond is 0 exactly when the
// incremented pointer equals bufSize, all-ones otherwise.
327 Uint32 cond = (++myWPtr == bufSize) - 1;
328 wPtr = myWPtr & cond;
329 theOccupancy = tOccupancy;
// Copy `len` 32-bit data words, then the 3 section pointers right after them.
332 const Uint32 len = buf.header.theLength;
333 memcpy(buf.theDataRegister, theData, 4 * len);
334 memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
// Invoke NDB_PREFETCH_WRITE on the entry at wPtr and on the address 64 bytes
// past its start.
340 NDB_PREFETCH_WRITE((
void*)&buffer[wPtr]);
341 NDB_PREFETCH_WRITE((
void*)(((
char*)&buffer[wPtr]) + 64));
// Constructor: starts with no storage (bufSize 0, buffer and memRef NULL).
// NOTE(review): the constructor body is not visible in this excerpt.
346 APZJobBuffer::APZJobBuffer()
347 : bufSize(0), buffer(NULL), memRef(NULL)
// Destructor. NOTE(review): the body is not visible in this excerpt.
352 APZJobBuffer::~APZJobBuffer()
// Called by the FastScheduler constructor with the per-level buffer size.
// NOTE(review): the body is not visible in this excerpt.
358 APZJobBuffer::newBuffer(
int size)
// Called for every job buffer from FastScheduler::clear().
// NOTE(review): the body is not visible in this excerpt.
371 APZJobBuffer::clear()
// Forward declaration; print_restart() is defined further down in this file
// and is used by FastScheduler::dumpSignalMemory().
383 void print_restart(FILE * output,
Signal* signal, Uint32 aLevel);
// Writes recorded signals to `output`: for each trace slot it looks up the
// job's priority level in theJobPriority, fetches the signal from that
// level's job buffer with retrieveDump() and prints it via print_restart().
// NOTE(review): local declarations, else-branches, the loop head and the
// statements that step tJob / ReadPtr are on lines missing from this
// excerpt; thr_no is not used in the visible lines.
385 void FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE * output)
396 fprintf(output,
"\n");
// JobLap > 4095 means more jobs have been recorded than JobCounter's
// 0..4095 range can index (doJob() masks JobCounter with 4095).
398 if (globalData.JobLap > 4095) {
399 if (globalData.JobCounter != 0)
400 tJob = globalData.JobCounter - 1;
403 tLastJob = globalData.JobCounter;
// NOTE(review): the statement guarded by this test is not visible here.
405 if (globalData.JobCounter == 0)
408 tJob = globalData.JobCounter - 1;
// Snapshot the read pointer of each of the four job buffers.
412 ReadPtr[0] = theJobBuffers[0].getReadPtr();
413 ReadPtr[1] = theJobBuffers[1].getReadPtr();
414 ReadPtr[2] = theJobBuffers[2].getReadPtr();
415 ReadPtr[3] = theJobBuffers[3].getReadPtr();
418 unsigned char tLevel = theJobPriority[tJob];
419 globalData.incrementWatchDogCounter(4);
// A read position of 0 wraps to the last slot of that level's buffer.
420 if (ReadPtr[tLevel] == 0)
421 ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
425 theJobBuffers[tLevel].
retrieveDump(signal, ReadPtr[tLevel]);
// Keep only the NDBMT_BLOCK_MASK bits of the receiver block number before
// printing.
427 signal->header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;
428 print_restart(output, signal, tLevel);
// Loop ends once tJob reaches tLastJob.
435 }
while (tJob != tLastJob);
// Raises the NDBD_EXIT_WRONG_PRIO_LEVEL error through ERROR_SET with the
// message "Wrong Priority Level".
440 FastScheduler::prio_level_error()
442 ERROR_SET(ecError, NDBD_EXIT_WRONG_PRIO_LEVEL,
443 "Wrong Priority Level",
"FastScheduler.C");
// Raises the NDBD_EXIT_BLOCK_JBUFCONGESTION error ("Job Buffer Full").
// NOTE(review): the header of the enclosing function is missing from this
// excerpt.
449 ERROR_SET(ecError, NDBD_EXIT_BLOCK_JBUFCONGESTION,
450 "Job Buffer Full",
"APZJobBuffer.C");
// Raises the NDBD_EXIT_BLOCK_BNR_ZERO error ("Block Number Zero").
// NOTE(review): the header of the enclosing function is missing from this
// excerpt.
456 ERROR_SET(ecError, NDBD_EXIT_BLOCK_BNR_ZERO,
457 "Block Number Zero",
"FastScheduler.C");
// Prints one signal to `output`, starting with a separator line.
// NOTE(review): most of the body is missing from this excerpt; the last
// visible line is the tail of a call that receives &signal->theData[0].
461 print_restart(FILE * output,
Signal* signal, Uint32 aLevel)
463 fprintf(output,
"--------------- Signal ----------------\n");
471 &signal->theData[0]);
// Trace-dump hook taking an unnamed NdbShutdownType reference.
// NOTE(review): the body is not visible in this excerpt.
475 FastScheduler::traceDumpPrepare(NdbShutdownType&)
// Trace-dump hook. NOTE(review): return type and body are not visible in
// this excerpt.
481 FastScheduler::traceDumpGetNumThreads()
// Trace-dump hook. NOTE(review): return type and body are not visible in
// this excerpt.
487 FastScheduler::traceDumpGetCurrentThread()
// Returns the emulated-jam trace through the three reference out-parameters.
// With NO_EMULATED_JAM defined, the jam pointer is NULL and the index 0;
// the other visible assignments copy the values from `jamBuffer`.
// NOTE(review): the #else/#endif lines, the statement that obtains
// `jamBuffer`, and any use of thr_no are missing from this excerpt.
493 FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
494 const Uint32 * & thrdTheEmulatedJam,
495 Uint32 & thrdTheEmulatedJamIndex)
500 #ifdef NO_EMULATED_JAM
502 thrdTheEmulatedJam = NULL;
503 thrdTheEmulatedJamIndex = 0;
507 thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
508 thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
509 jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
// Builds a two-word GSN_EVENT_REP signal addressed to block CMVMI, carrying
// tMeanLoopCount in theData[1], and submits it with execute() at level JBA.
// NOTE(review): the declarations of `signal` and `secPtr` and the
// assignment of theData[0] are on lines missing from this excerpt.
522 FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
// Zero the whole header before filling in the fields that matter.
525 memset(&signal.header, 0,
sizeof(signal.header));
526 signal.header.theLength = 2;
527 signal.header.theSendersSignalId = 0;
528 signal.header.theSendersBlockRef = numberToRef(0, 0);
529 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
530 signal.header.theReceiversBlockNumber = CMVMI;
533 signal.theData[1] = tMeanLoopCount;
536 execute(&signal.header, JBA, signal.theData, secPtr);
// Builds a six-word GSN_EVENT_REP signal addressed to block CMVMI and
// submits it with execute() at level JBA. Visible payload words:
//   theData[1] = expired_time
//   theData[2] = extra_constant
//   theData[3] = *tot_exec_time / *no_exec_loops
//   theData[4] = *no_extra_loops
//   theData[5] = *tot_extra_time / *no_extra_loops, or 0 when there were
//                no extra loops
// NOTE(review): the declarations of `signal` and `secPtr`, the assignment
// of theData[0], the `else` keyword before the final theData[5] assignment
// and the statements between it and execute() are on lines missing from
// this excerpt.
540 FastScheduler::reportThreadConfigLoop(Uint32 expired_time,
541 Uint32 extra_constant,
542 Uint32 *no_exec_loops,
543 Uint32 *tot_exec_time,
544 Uint32 *no_extra_loops,
545 Uint32 *tot_extra_time)
// Zero the whole header before filling in the fields that matter.
549 memset(&signal.header, 0,
sizeof(signal.header));
550 signal.header.theLength = 6;
551 signal.header.theSendersSignalId = 0;
552 signal.header.theSendersBlockRef = numberToRef(0, 0);
553 signal.header.theVerId_signalNumber = GSN_EVENT_REP;
554 signal.header.theReceiversBlockNumber = CMVMI;
557 signal.theData[1] = expired_time;
558 signal.theData[2] = extra_constant;
// NOTE(review): unlike the *no_extra_loops division below, this division
// has no visible guard against *no_exec_loops == 0 - confirm the caller
// guarantees a non-zero count.
559 signal.theData[3] = (*tot_exec_time)/(*no_exec_loops);
560 signal.theData[4] = *no_extra_loops;
561 if (*no_extra_loops > 0)
562 signal.theData[5] = (*tot_extra_time)/(*no_extra_loops);
564 signal.theData[5] = 0;
572 execute(&signal.header, JBA, signal.theData, secPtr);
// File-local mutex operated on by mt_mem_manager_init(),
// mt_mem_manager_lock() and mt_mem_manager_unlock().
575 static NdbMutex g_mm_mutex;
// Initialises g_mm_mutex with NdbMutex_Init(); its return value is not
// used in the visible line.
578 mt_mem_manager_init()
580 NdbMutex_Init(&g_mm_mutex);
// Acquires g_mm_mutex with NdbMutex_Lock().
584 mt_mem_manager_lock()
586 NdbMutex_Lock(&g_mm_mutex);
// Releases g_mm_mutex with NdbMutex_Unlock().
590 mt_mem_manager_unlock()
592 NdbMutex_Unlock(&g_mm_mutex);