#include <ndb_global.h>

#include <VMSignal.hpp>
#include <kernel_types.h>

#include <SignalLoggerManager.hpp>
#include <SimulatedBlock.hpp>
#include <ErrorHandlingMacros.hpp>
#include <GlobalData.hpp>
#include <WatchDog.hpp>
#include <TransporterDefinitions.hpp>
#include "FastScheduler.hpp"

#include <DebuggerNames.hpp>
#include <signaldata/StopForCrash.hpp>
#include "TransporterCallbackKernel.hpp"

#include <portlib/ndb_prefetch.h>
SimulatedBlock*
GlobalData::mt_getBlock(BlockNumber blockNo, Uint32 instanceNo)
{
  SimulatedBlock* b = getBlock(blockNo);
  if (b != 0 && instanceNo != 0)
    b = b->getInstance(instanceNo);
  return b;
}
#define memcpy __builtin_memcpy

static const Uint32 MAX_SIGNALS_BEFORE_SEND = 200;

static const Uint32 MAX_SIGNALS_PER_JB = 100;

static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_RECEIVER = 2;
static const Uint32 MAX_SIGNALS_BEFORE_FLUSH_OTHER = 20;
static const Uint32 MAX_SIGNALS_BEFORE_WAKEUP = 128;
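/*
 * Batching heuristics: signals headed for another thread are buffered
 * locally and made visible (with a possible wakeup) only in batches,
 * since flush and wakeup are comparatively expensive.  The receiver
 * thread flushes far more eagerly (every 2 signals) than the worker
 * threads (every 20), presumably to keep latency low for incoming
 * traffic; wakeups are batched up to 128 signals.
 */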
#define MAX_BLOCK_INSTANCES (1 + MAX_NDBMT_LQH_WORKERS + 1) // main+lqh+extra
#define NUM_MAIN_THREADS 2 // except receiver
#define MAX_THREADS (NUM_MAIN_THREADS + MAX_NDBMT_LQH_THREADS + 1)

#define MAX_INSTANCES_PER_THREAD (16 + 8 * MAX_NDBMT_LQH_THREADS)

static Uint32 num_lqh_workers = 0;
static Uint32 num_lqh_threads = 0;
static Uint32 num_threads = 0;
static Uint32 receiver_thread_no = 0;

#define NO_SEND_THREAD (MAX_THREADS + 1)

#define MIN_SIGNALS_PER_PAGE (thr_job_buffer::SIZE / 32)
struct mt_lock_stat
{
  const void * m_ptr;
  char * m_name;
  Uint32 m_contended_count;
  Uint32 m_spin_count;
};
static void register_lock(const void * ptr, const char * name);
static mt_lock_stat * lookup_lock(const void * ptr);
#if defined(HAVE_LINUX_FUTEX) && defined(NDB_HAVE_XCNG)

#include <unistd.h>
#include <sys/syscall.h>
#include <sys/types.h>

#define FUTEX_WAIT 0
#define FUTEX_WAKE 1
#define FUTEX_REQUEUE 3
#define FUTEX_CMP_REQUEUE 4
#define FUTEX_WAKE_OP 5

static inline int
futex_wait(volatile unsigned * addr, int val, const struct timespec * timeout)
{
  return syscall(SYS_futex,
                 addr, FUTEX_WAIT, val, timeout, 0, 0) == 0 ? 0 : errno;
}

static inline int
futex_wake(volatile unsigned * addr)
{
  return syscall(SYS_futex, addr, FUTEX_WAKE, 1, 0, 0, 0) == 0 ? 0 : errno;
}
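/*
 * thr_wait: one wait object per thread.  A thread that runs out of work
 * parks itself in yield() below, and producers call wakeup() after
 * publishing new signals.  This variant implements the wait directly on
 * a futex word holding FS_RUNNING/FS_SLEEPING.
 */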
struct thr_wait
{
  volatile unsigned m_futex_state;
  enum { FS_RUNNING = 0, FS_SLEEPING = 1 };
  thr_wait() { xcng(&m_futex_state, FS_RUNNING);}
  void init() {}
};

static inline bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(struct thr_data*), struct thr_data* check_arg)
{
  volatile unsigned * val = &wait->m_futex_state;
  int old = xcng(val, thr_wait::FS_SLEEPING);
  assert(old == thr_wait::FS_RUNNING);

  /* Recheck condition before sleeping, to avoid lost wakeups. */
  bool waited = (*check_callback)(check_arg);
  if (waited)
  {
    struct timespec timeout;
    timeout.tv_sec = 0;
    timeout.tv_nsec = nsec;
    futex_wait(val, thr_wait::FS_SLEEPING, &timeout);
  }
  xcng(val, thr_wait::FS_RUNNING);
  return waited;
}

static inline int
wakeup(struct thr_wait* wait)
{
  volatile unsigned * val = &wait->m_futex_state;
  if (xcng(val, thr_wait::FS_RUNNING) == thr_wait::FS_SLEEPING)
    return futex_wake(val);
  return 0;
}
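/*
 * Portable fallback for platforms without futex and atomic exchange:
 * the same yield()/wakeup() contract implemented with an NdbMutex and
 * NdbCondition pair.  m_need_wakeup avoids signalling the condition
 * variable when no thread is sleeping.
 */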
#else

#include <NdbMutex.h>
#include <NdbCondition.h>

struct thr_wait
{
  bool m_need_wakeup;
  NdbMutex *m_mutex;
  NdbCondition *m_cond;
  thr_wait() : m_need_wakeup(false), m_mutex(0), m_cond(0) {}
  void init() {
    m_mutex = NdbMutex_Create();
    m_cond = NdbCondition_Create();
  }
};

static inline bool
yield(struct thr_wait* wait, const Uint32 nsec,
      bool (*check_callback)(struct thr_data*), struct thr_data* check_arg)
{
  struct timespec end;
  NdbCondition_ComputeAbsTime(&end, nsec/1000000);
  NdbMutex_Lock(wait->m_mutex);

  Uint32 waits = 0;
  /* May get spurious wakeups: always recheck the condition predicate. */
  while ((*check_callback)(check_arg))
  {
    wait->m_need_wakeup = true;
    waits++;
    if (NdbCondition_WaitTimeoutAbs(wait->m_cond,
                                    wait->m_mutex, &end) == ETIMEDOUT)
    {
      wait->m_need_wakeup = false;
      break;
    }
  }
  NdbMutex_Unlock(wait->m_mutex);
  return (waits > 0);
}

static inline int
wakeup(struct thr_wait* wait)
{
  NdbMutex_Lock(wait->m_mutex);
  if (wait->m_need_wakeup)
  {
    wait->m_need_wakeup = false;
    NdbCondition_Signal(wait->m_cond);
  }
  NdbMutex_Unlock(wait->m_mutex);
  return 0;
}

#endif
#ifdef NDB_HAVE_XCNG

template <unsigned SZ>
struct thr_spin_lock
{
  thr_spin_lock(const char * name = 0)
  {
    m_lock = 0;
    register_lock(this, name);
  }

  union {
    volatile Uint32 m_lock;
    char pad[SZ];
  };
};

static
void
lock_slow(void * sl, volatile unsigned * val)
{
  mt_lock_stat* s = lookup_lock(sl); // lookup before spinning
loop:
  Uint32 spins = 0;
  do {
    spins++;
    cpu_pause();
  } while (* val == 1);

  if (unlikely(xcng(val, 1) != 0))
    goto loop;

  if (s)
  {
    s->m_spin_count += spins;
    Uint32 count = ++s->m_contended_count;
    Uint32 freq = (count > 10000 ? 5000 : (count > 20 ? 200 : 1));

    if ((count % freq) == 0)
      printf("%s waiting for lock, contentions: %u spins: %u\n",
             s->m_name, count, s->m_spin_count);
  }
}
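/*
 * The lock()/unlock()/trylock() helpers below take the uncontended fast
 * path with a single atomic exchange; only on contention do they fall
 * through to lock_slow(), which spins and maintains the contention
 * statistics printed above.
 */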
template <unsigned SZ>
static inline void
lock(struct thr_spin_lock<SZ>* sl)
{
  volatile unsigned* val = &sl->m_lock;
  if (likely(xcng(val, 1) == 0))
    return;

  lock_slow(sl, val);
}

template <unsigned SZ>
static inline void
unlock(struct thr_spin_lock<SZ>* sl)
{
  /* Make all our stores visible before the lock release is. */
  mem_barrier();
  sl->m_lock = 0;
}

template <unsigned SZ>
static inline int
trylock(struct thr_spin_lock<SZ>* sl)
{
  volatile unsigned* val = &sl->m_lock;
  return xcng(val, 1);
}
#else /* No atomic exchange available: map the spinlock onto a mutex. */

#define thr_spin_lock thr_mutex

#endif

template <unsigned SZ>
struct thr_mutex
{
  thr_mutex(const char * name = 0) {
    NdbMutex_Init(&m_mutex);
    register_lock(this, name);
  }

  union {
    NdbMutex m_mutex;
    char pad[SZ];
  };
};

template <unsigned SZ>
static inline void lock(struct thr_mutex<SZ>* sl) {
  NdbMutex_Lock(&sl->m_mutex);
}

template <unsigned SZ>
static inline void unlock(struct thr_mutex<SZ>* sl) {
  NdbMutex_Unlock(&sl->m_mutex);
}

template <unsigned SZ>
static inline int trylock(struct thr_mutex<SZ>* sl) {
  return NdbMutex_Trylock(&sl->m_mutex);
}
template<typename T>
struct thr_safe_pool
{
  thr_safe_pool(const char * name) : m_free_list(0), m_cnt(0), m_lock(name) {}

  T* m_free_list;
  Uint32 m_cnt;
  thr_spin_lock<NDB_CL - (sizeof(void*) + sizeof(Uint32))> m_lock;

  T* seize(Ndbd_mem_manager *mm, Uint32 rg) {
    T* ret = 0;
    lock(&m_lock);
    if (m_free_list)
    {
      m_cnt--;
      ret = m_free_list;
      m_free_list = ret->m_next;
    }
    unlock(&m_lock);
    if (ret == 0)
    {
      Uint32 dummy;
      ret = reinterpret_cast<T*>
        (mm->alloc_page(rg, &dummy,
                        Ndbd_mem_manager::NDB_ZONE_ANY));
    }
    return ret;
  }

  void release(Ndbd_mem_manager *mm, Uint32 rg, T* t) {
    lock(&m_lock);
    t->m_next = m_free_list;
    m_free_list = t;
    m_cnt++;
    unlock(&m_lock);
  }

  void release_list(Ndbd_mem_manager *mm, Uint32 rg,
                    T* head, T* tail, Uint32 cnt) {
    lock(&m_lock);
    tail->m_next = m_free_list;
    m_free_list = head;
    m_cnt += cnt;
    unlock(&m_lock);
  }
};
template<typename T>
class thread_local_pool
{
public:
  thread_local_pool(thr_safe_pool<T> *global_pool, unsigned max_free) :
    m_max_free(max_free),
    m_free(0),
    m_freelist(0),
    m_global_pool(global_pool)
  {
  }

  T *seize(Ndbd_mem_manager *mm, Uint32 rg) {
    T *tmp = m_freelist;
    if (tmp)
    {
      m_freelist = tmp->m_next;
      m_free--;
    }
    else
      tmp = m_global_pool->seize(mm, rg);
    validate();
    return tmp;
  }

  void release(Ndbd_mem_manager *mm, Uint32 rg, T *t) {
    unsigned free = m_free;
    if (free < m_max_free)
    {
      m_free = free + 1;
      t->m_next = m_freelist;
      m_freelist = t;
    }
    else
      m_global_pool->release(mm, rg, t);
    validate();
  }

  /* Release to the local pool even if it gets "too" full (wrt m_max_free). */
  void release_local(T *t) {
    m_free++;
    t->m_next = m_freelist;
    m_freelist = t;
    validate();
  }

  void validate() const {
#ifdef VM_TRACE
    Uint32 cnt = 0;
    for (T* t = m_freelist; t; t = t->m_next)
      cnt++;
    assert(cnt == m_free);
#endif
  }

  /* Trim the local free list back down to m_max_free entries. */
  void release_global(Ndbd_mem_manager *mm, Uint32 rg) {
    unsigned cnt = 0;
    unsigned free = m_free;
    Uint32 maxfree = m_max_free;

    T* head = m_freelist;
    T* tail = m_freelist;
    if (free > maxfree)
    {
      cnt++;
      free--;
      while (free > maxfree)
      {
        cnt++;
        free--;
        tail = tail->m_next;
      }
      assert(free == maxfree);
      m_free = free;
      m_freelist = tail->m_next;
      m_global_pool->release_list(mm, rg, head, tail, cnt);
    }
    validate();
  }

  void release_all(Ndbd_mem_manager *mm, Uint32 rg) {
    T* head = m_freelist;
    T* tail = m_freelist;
    if (tail)
    {
      unsigned cnt = 1;
      while (tail->m_next != 0)
      {
        cnt++;
        tail = tail->m_next;
      }
      m_global_pool->release_list(mm, rg, head, tail, cnt);
      m_free = 0;
      m_freelist = 0;
    }
    validate();
  }

  void set_pool(thr_safe_pool<T> * pool) { m_global_pool = pool; }

private:
  unsigned m_max_free;
  unsigned m_free;
  T *m_freelist;
  thr_safe_pool<T> *m_global_pool;
};
struct thr_job_buffer // 32k
{
  static const unsigned SIZE = 8190;

  /* Amount of signal data in m_data[]; written by producer, read by
     consumer. */
  Uint32 m_len;
  /* Whether this buffer held prio A or prio B signals (used by signal
     dumping). */
  Uint32 m_prioa;
  union {
    Uint32 m_data[SIZE];
    thr_job_buffer * m_next; // For free-list
  };
};
static inline unsigned
calc_fifo_used(Uint32 ri, Uint32 wi, Uint32 sz)
{
  return (wi >= ri) ? wi - ri : (sz - ri) + wi;
}
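/*
 * Example: with sz == 8, ri == 6 and wi == 2 the occupied slots are
 * 6, 7, 0, 1, so used == (8 - 6) + 2 == 4.  The queues below always
 * leave at least one slot unused, so a full FIFO and an empty one can
 * be told apart.
 */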
struct thr_job_queue_head
{
  unsigned m_read_index;  // Read/written by consumer, read by producer
  unsigned m_write_index; // Read/written by producer, read by consumer

  Uint32 used() const;
};

struct thr_job_queue
{
  static const unsigned SIZE = 31;

  struct thr_job_queue_head* m_head;
  struct thr_job_buffer* m_buffers[SIZE];
};

inline Uint32
thr_job_queue_head::used() const
{
  return calc_fifo_used(m_read_index, m_write_index, thr_job_queue::SIZE);
}
struct thr_jb_write_state
{
  /* Buffer and position in it to insert the next signal into. */
  Uint32 m_write_index;
  Uint32 m_write_pos;

  /* Thread-local copy of thr_job_queue::m_buffers[m_write_index]. */
  thr_job_buffer *m_write_buffer;

  /* Number of signals inserted since last flush to the queue head. */
  Uint32 m_pending_signals;

  /* Number of signals inserted since last wakeup of the consumer. */
  Uint32 m_pending_signals_wakeup;
};

struct thr_jb_read_state
{
  /* Index into thr_job_queue::m_buffers[] of the buffer being executed. */
  Uint32 m_read_index;
  /* Position in m_read_buffer->m_data[] of the next signal to execute. */
  Uint32 m_read_pos;
  /* Thread-local copy of thr_job_queue::m_buffers[m_read_index]. */
  thr_job_buffer *m_read_buffer;
  /* Thread-local copy of m_read_buffer->m_len. */
  Uint32 m_read_end;
  /* Thread-local copy of the queue head's m_write_index. */
  Uint32 m_write_index;

  bool is_empty() const
  {
    assert(m_read_index != m_write_index || m_read_pos <= m_read_end);
    return (m_read_index == m_write_index) && (m_read_pos >= m_read_end);
  }
};
struct thr_tq
{
  static const unsigned SQ_SIZE = 512;
  static const unsigned LQ_SIZE = 512;
  static const unsigned PAGES = 32 * (SQ_SIZE + LQ_SIZE) / 8192;

  Uint32 * m_delayed_signals[PAGES];
  Uint32 m_next_free;
  Uint32 m_next_timer;
  Uint32 m_current_time;
  Uint32 m_cnt[2];
  Uint32 m_short_queue[SQ_SIZE];
  Uint32 m_long_queue[LQ_SIZE];
};
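/*
 * Delayed-signal bookkeeping: signals are stored in 32-word slots inside
 * the m_delayed_signals pages and referenced from two sorted queues,
 * m_short_queue for short delays and m_long_queue for longer ones (see
 * senddelay() below).  Each queue entry packs the slot index in the
 * upper halfword and the 16-bit expiry time in the lower halfword.
 */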
#define THR_FREE_BUF_MAX 32
#define THR_FREE_BUF_MIN 12
#define THR_FREE_BUF_BATCH 6
struct thr_send_page
{
  static const Uint32 PGSIZE = 32768;
#if SIZEOF_CHARP == 4
  static const Uint32 HEADER_SIZE = 8;
#else
  static const Uint32 HEADER_SIZE = 12;
#endif

  static Uint32 max_bytes() {
    return PGSIZE - offsetof(thr_send_page, m_data);
  }

  /* Next page in single linked list. */
  thr_send_page* m_next;

  /* Bytes of send data available in this page. */
  Uint16 m_bytes;

  /* Start of unsent data. */
  Uint16 m_start;

  /* Data; real size extends to the end of the page. */
  char m_data[1];
};
struct thr_send_buffer
{
  thr_send_page* m_first_page;
  thr_send_page* m_last_page;
};

struct thr_send_queue
{
  unsigned m_write_index;
#if SIZEOF_CHARP == 8
  unsigned m_unused;
  thr_send_page* m_buffers[7];
  static const unsigned SIZE = 7;
#else
  thr_send_page* m_buffers[15];
  static const unsigned SIZE = 15;
#endif
};
struct thr_data
{
  thr_data() : m_jba_write_lock("jbalock"),
               m_send_buffer_pool(0, THR_FREE_BUF_MAX) {}

  thr_wait m_waiter;
  unsigned m_thr_no;

  NDB_TICKS m_time;
  struct thr_tq m_tq;

  /* Prio A signal incoming queue, with lock shared by all producers. */
  struct thr_job_queue m_jba;
  struct thr_job_queue_head m_jba_head;
  struct thr_spin_lock<64> m_jba_write_lock;

  /* Prio B signal incoming queues, one per source thread. */
  struct thr_job_queue m_in_queue[MAX_THREADS];
  struct thr_job_queue_head m_in_queue_head[MAX_THREADS];

  struct thr_jb_read_state m_jba_read_state;
  struct thr_jb_read_state m_read_states[MAX_THREADS];
  struct thr_jb_write_state m_write_states[MAX_THREADS];

  /* Thread-local FIFO of free job buffers. */
  struct thr_job_buffer* m_free_fifo[THR_FREE_BUF_MAX];
  Uint32 m_first_free;
  Uint32 m_first_unused;

  /* Spare buffer handed to insert_signal() in case the current one fills. */
  struct thr_job_buffer* m_next_buffer;

  /* Signal execution quotas, recomputed by update_sched_config(). */
  Uint32 m_max_signals_per_jb;
  Uint32 m_max_exec_signals;

  Uint32 m_watchdog_counter;

  Uint32 m_prioa_count;
  Uint32 m_prioa_size;
  Uint32 m_priob_count;
  Uint32 m_priob_size;

  /* Nodes with pending data in our thread-local send buffers. */
  Bitmask<(MAX_NTRANSPORTERS+31)/32> m_pending_send_mask;
  Uint8 m_pending_send_nodes[MAX_NTRANSPORTERS];
  Uint32 m_pending_send_count;

  struct thr_send_buffer m_send_buffers[MAX_NTRANSPORTERS];
  thread_local_pool<thr_send_page> m_send_buffer_pool;

  Uint32 m_instance_count;
  BlockNumber m_instance_list[MAX_INSTANCES_PER_THREAD];

  SectionSegmentPool::Cache m_sectionPoolCache;

  EmulatedJamBuffer m_jam;

  NdbThread* m_thread;
  pthread_t m_thr_id;
  Uint32 m_cpu;
};
class mt_send_handle : public TransporterSendBufferHandle
{
  struct thr_data * m_selfptr;
public:
  mt_send_handle(thr_data* ptr) : m_selfptr(ptr) {}
  virtual ~mt_send_handle() {}

  virtual Uint32 *getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max);
  virtual Uint32 updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio);
  virtual bool forceSend(NodeId node);
};

struct trp_callback : public TransporterCallbackKernel
{
  trp_callback() {}

  /* Callback interface. */
  int checkJobBuffer();
  void reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes);
  void lock_transporter(NodeId node);
  void unlock_transporter(NodeId node);
  Uint32 get_bytes_to_send_iovec(NodeId node, struct iovec *dst, Uint32 max);
  Uint32 bytes_sent(NodeId node, Uint32 bytes);
  bool has_data_to_send(NodeId node);
  void reset_send_buffer(NodeId node, bool should_be_empty);
};
#include <NdbMutex.h>
#include <NdbCondition.h>

struct thr_repository
{
  thr_repository()
    : m_receive_lock("recvlock"),
      m_section_lock("sectionlock"),
      m_mem_manager_lock("memmanagerlock"),
      m_jb_pool("jobbufferpool"),
      m_sb_pool("sendbufferpool")
    {}

  struct thr_spin_lock<64> m_receive_lock;
  struct thr_spin_lock<64> m_section_lock;
  struct thr_spin_lock<64> m_mem_manager_lock;
  thr_safe_pool<thr_job_buffer> m_jb_pool;
  thr_safe_pool<thr_send_page> m_sb_pool;
  Ndbd_mem_manager * m_mm;
  unsigned m_thread_count;
  struct thr_data m_thread[MAX_THREADS];

  /* The buffers that are to be sent, one per node. */
  struct send_buffer
  {
    /* Protects m_buffer and the actual send to the transporter. */
    struct thr_spin_lock<8> m_send_lock;

    /* Set when a thread wanting to send found the lock taken. */
    Uint32 m_force_send;

    /* Which thread is currently holding m_send_lock. */
    Uint32 m_send_thread;

    /* Pages of send data, linked in from the threads' send queues. */
    struct thr_send_buffer m_buffer;

    /* Bytes of send data pending for this node. */
    Uint32 m_bytes;

    /* Read index(es) into m_thread_send_buffers[], one per thread. */
    Uint32 m_read_index[MAX_THREADS];
  } m_send_buffers[MAX_NTRANSPORTERS];

  /* The buffers published by the threads. */
  thr_send_queue m_thread_send_buffers[MAX_NTRANSPORTERS][MAX_THREADS];

  /* Used to synchronize threads during crash / trace dumps. */
  NdbMutex stop_for_crash_mutex;
  NdbCondition stop_for_crash_cond;
  Uint32 stopped_threads;
};

static struct thr_repository g_thr_repository;
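/*
 * The send path is two-staged: each thread publishes filled
 * thr_send_page chains into its own slot of m_thread_send_buffers, and
 * whichever thread performs the actual send first links all published
 * chains into the per-node m_send_buffers[node].m_buffer under
 * m_send_lock (see link_thread_send_buffers() below).
 */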
static inline Uint32
fifo_used_pages(struct thr_data* selfptr)
{
  return calc_fifo_used(selfptr->m_first_unused,
                        selfptr->m_first_free,
                        THR_FREE_BUF_MAX);
}

static void
job_buffer_full(struct thr_data* selfptr)
{
  ndbout_c("job buffer full");
  abort();
}

static void
out_of_job_buffer(struct thr_data* selfptr)
{
  ndbout_c("out of job buffer");
  abort();
}
static thr_job_buffer*
seize_buffer(struct thr_repository* rep, int thr_no, bool prioa)
{
  thr_job_buffer* jb;
  thr_data* selfptr = rep->m_thread + thr_no;
  Uint32 first_free = selfptr->m_first_free;
  Uint32 first_unused = selfptr->m_first_unused;

  /*
   * An empty FIFO is denoted by m_first_free == m_first_unused, so the
   * FIFO array is never completely full.
   */
  Uint32 buffers = (first_free > first_unused ?
                    first_unused + THR_FREE_BUF_MAX - first_free :
                    first_unused - first_free);
  if (unlikely(buffers <= THR_FREE_BUF_MIN))
  {
    /*
     * Almost all used, allocate another batch from the global pool.  Put
     * the new buffers at the head of the fifo, so as not to needlessly
     * fill up the CPU cache with them.
     */
    Uint32 cnt = 0;
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
    assert(batch + THR_FREE_BUF_MIN < THR_FREE_BUF_MAX);
    do {
      jb = rep->m_jb_pool.seize(rep->m_mm, RG_JOBBUFFER);
      if (unlikely(jb == 0))
      {
        if (unlikely(cnt == 0))
        {
          out_of_job_buffer(selfptr);
        }
        break;
      }
      jb->m_len = 0;
      jb->m_prioa = false;
      first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
      selfptr->m_free_fifo[first_free] = jb;
      cnt++;
    } while (cnt < batch);
    selfptr->m_first_free = first_free;
  }

  jb= selfptr->m_free_fifo[first_free];
  selfptr->m_first_free = (first_free + 1) % THR_FREE_BUF_MAX;
  jb->m_len = 0;
  jb->m_prioa = prioa;
  return jb;
}
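/*
 * release_buffer() below keeps released buffers (and thus recently
 * executed signal data) in the free fifo as history for crash trace
 * dumps.  A nearly empty prio B buffer is merged into the previously
 * released one, so the fifo retains as much signal history as possible.
 */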
static void
release_buffer(struct thr_repository* rep, int thr_no, thr_job_buffer* jb)
{
  struct thr_data* selfptr = rep->m_thread + thr_no;
  Uint32 first_free = selfptr->m_first_free;
  Uint32 first_unused = selfptr->m_first_unused;

  Uint32 last_free = (first_unused ? first_unused : THR_FREE_BUF_MAX) - 1;
  thr_job_buffer *last_jb = selfptr->m_free_fifo[last_free];
  Uint32 len1, len2;

  if (!jb->m_prioa &&
      first_free != first_unused &&
      !last_jb->m_prioa &&
      (len2 = jb->m_len) <= (thr_job_buffer::SIZE / 4) &&
      (len1 = last_jb->m_len) + len2 <= thr_job_buffer::SIZE)
  {
    /*
     * The buffer being released is fairly empty, and its data fits in the
     * previously released buffer.  Move the data there, and put the (now
     * empty) buffer at the front of the fifo for immediate reuse.
     */
    memcpy(last_jb->m_data + len1, jb->m_data, len2*sizeof(jb->m_data[0]));
    last_jb->m_len = len1 + len2;
    jb->m_len = 0;
    first_free = (first_free ? first_free : THR_FREE_BUF_MAX) - 1;
    selfptr->m_free_fifo[first_free] = jb;
    selfptr->m_first_free = first_free;
  }
  else
  {
    /* Just insert at the end of the fifo. */
    selfptr->m_free_fifo[first_unused] = jb;
    first_unused = (first_unused + 1) % THR_FREE_BUF_MAX;
    selfptr->m_first_unused = first_unused;
  }

  if (unlikely(first_unused == first_free))
  {
    /* FIFO full, release a batch back to the global pool. */
    Uint32 batch = THR_FREE_BUF_MAX / THR_FREE_BUF_BATCH;
    assert(batch < THR_FREE_BUF_MAX);
    do {
      rep->m_jb_pool.release(rep->m_mm, RG_JOBBUFFER,
                             selfptr->m_free_fifo[first_free]);
      first_free = (first_free + 1) % THR_FREE_BUF_MAX;
      batch--;
    } while (batch > 0);
    selfptr->m_first_free = first_free;
  }
}
static Uint32
scan_queue(struct thr_data* selfptr, Uint32 cnt, Uint32 end, Uint32* ptr)
{
  Uint32 thr_no = selfptr->m_thr_no;
  Uint32 **pages = selfptr->m_tq.m_delayed_signals;
  Uint32 free = selfptr->m_tq.m_next_free;
  Uint32* save = ptr;
  for (Uint32 i = 0; i < cnt; i++, ptr++)
  {
    Uint32 val = * ptr;
    if ((val & 0xFFFF) <= end)
    {
      Uint32 idx = val >> 16;
      Uint32 buf = idx >> 8;
      Uint32 pos = 32 * (idx & 0xFF);

      Uint32* page = * (pages + buf);

      const SignalHeader* s = reinterpret_cast<SignalHeader*>(page + pos);
      const Uint32 *data = page + pos + (sizeof(*s)>>2);
      if (0)
        ndbout_c("found %p val: %d end: %d", s, val & 0xFFFF, end);
      sendprioa(thr_no, s, data,
                data + s->theLength);
      * (page + pos) = free;
      free = idx;
    }
    else if (i > 0)
    {
      selfptr->m_tq.m_next_free = free;
      memmove(save, ptr, 4 * (cnt - i));
      return i;
    }
    else
    {
      return 0;
    }
  }
  selfptr->m_tq.m_next_free = free;
  return cnt;
}
static void
handle_time_wrap(struct thr_data* selfptr)
{
  Uint32 i;
  struct thr_tq * tq = &selfptr->m_tq;
  Uint32 cnt0 = tq->m_cnt[0];
  Uint32 cnt1 = tq->m_cnt[1];
  Uint32 tmp0 = scan_queue(selfptr, cnt0, 32767, tq->m_short_queue);
  Uint32 tmp1 = scan_queue(selfptr, cnt1, 32767, tq->m_long_queue);
  cnt0 -= tmp0;
  cnt1 -= tmp1;
  tq->m_cnt[0] = cnt0;
  tq->m_cnt[1] = cnt1;
  for (i = 0; i<cnt0; i++)
  {
    assert((tq->m_short_queue[i] & 0xFFFF) > 32767);
    tq->m_short_queue[i] -= 32767;
  }
  for (i = 0; i<cnt1; i++)
  {
    assert((tq->m_long_queue[i] & 0xFFFF) > 32767);
    tq->m_long_queue[i] -= 32767;
  }
}
static void
scan_time_queues_impl(struct thr_data* selfptr, NDB_TICKS now)
{
  struct thr_tq * tq = &selfptr->m_tq;
  NDB_TICKS last = selfptr->m_time;

  Uint32 curr = tq->m_current_time;
  Uint32 cnt0 = tq->m_cnt[0];
  Uint32 cnt1 = tq->m_cnt[1];

  assert(now > last);
  Uint64 diff = now - last;
  Uint32 step = (Uint32)((diff > 20) ? 20 : diff);
  Uint32 end = (curr + step);
  if (end >= 32767)
  {
    handle_time_wrap(selfptr);
    cnt0 = tq->m_cnt[0];
    cnt1 = tq->m_cnt[1];
    end -= 32767;
  }

  Uint32 tmp0 = scan_queue(selfptr, cnt0, end, tq->m_short_queue);
  Uint32 tmp1 = scan_queue(selfptr, cnt1, end, tq->m_long_queue);

  tq->m_current_time = end;
  tq->m_cnt[0] = cnt0 - tmp0;
  tq->m_cnt[1] = cnt1 - tmp1;
  selfptr->m_time = last + step;
}

static inline void
scan_time_queues(struct thr_data* selfptr, NDB_TICKS now)
{
  if (selfptr->m_time != now)
    scan_time_queues_impl(selfptr, now);
}
static Uint32*
get_free_slot(struct thr_repository* rep,
              struct thr_data* selfptr,
              Uint32* idxptr)
{
  struct thr_tq * tq = &selfptr->m_tq;
  Uint32 idx = tq->m_next_free;
retry:
  Uint32 buf = idx >> 8;
  Uint32 pos = idx & 0xFF;

  if (idx != RNIL)
  {
    Uint32* page = * (tq->m_delayed_signals + buf);
    Uint32* ptr = page + (32 * pos);
    tq->m_next_free = * ptr;
    * idxptr = idx;
    return ptr;
  }

  Uint32 thr_no = selfptr->m_thr_no;
  for (Uint32 i = 0; i<thr_tq::PAGES; i++)
  {
    if (tq->m_delayed_signals[i] == 0)
    {
      struct thr_job_buffer *jb = seize_buffer(rep, thr_no, false);
      Uint32 * page = reinterpret_cast<Uint32*>(jb);
      tq->m_delayed_signals[i] = page;

      ndbout_c("saving %p at %p (%d)", page, tq->m_delayed_signals+i, i);

      /*
       * Init page, so that the first word of each 32-word slot links the
       * slots into a free list.
       */
      for (Uint32 j = 0; j<255; j ++)
      {
        page[j * 32] = (i << 8) + (j + 1);
      }
      page[255*32] = RNIL;
      tq->m_next_free = (i << 8);
      goto retry;
    }
  }
  abort();
  return NULL;
}
void
senddelay(Uint32 thr_no, const SignalHeader* s, Uint32 delay)
{
  struct thr_repository* rep = &g_thr_repository;
  struct thr_data * selfptr = rep->m_thread + thr_no;
  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
  unsigned siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;

  Uint32 max;
  Uint32 * cntptr;
  Uint32 * queueptr;

  Uint32 alarm = selfptr->m_tq.m_current_time + delay;
  Uint32 nexttimer = selfptr->m_tq.m_next_timer;
  if (delay < 100)
  {
    cntptr = selfptr->m_tq.m_cnt + 0;
    queueptr = selfptr->m_tq.m_short_queue;
    max = thr_tq::SQ_SIZE;
  }
  else
  {
    cntptr = selfptr->m_tq.m_cnt + 1;
    queueptr = selfptr->m_tq.m_long_queue;
    max = thr_tq::LQ_SIZE;
  }

  Uint32 idx;
  Uint32* ptr = get_free_slot(rep, selfptr, &idx);
  memcpy(ptr, s, 4*siglen);

  if (0)
    ndbout_c("now: %d alarm: %d send %s from %s to %s delay: %d idx: %x %p",
             selfptr->m_tq.m_current_time,
             alarm,
             getSignalName(s->theVerId_signalNumber),
             getBlockName(refToBlock(s->theSendersBlockRef)),
             getBlockName(s->theReceiversBlockNumber),
             delay,
             idx, ptr);

  Uint32 i;
  Uint32 cnt = *cntptr;
  Uint32 newentry = (idx << 16) | (alarm & 0xFFFF);

  *cntptr = cnt + 1;
  selfptr->m_tq.m_next_timer = alarm < nexttimer ? alarm : nexttimer;

  if (cnt == 0)
  {
    queueptr[0] = newentry;
    return;
  }
  else if (cnt < max)
  {
    for (i = 0; i<cnt; i++)
    {
      Uint32 save = queueptr[i];
      if ((save & 0xFFFF) > alarm)
      {
        memmove(queueptr+i+1, queueptr+i, 4*(cnt - i));
        queueptr[i] = newentry;
        return;
      }
    }
    assert(i == cnt);
    queueptr[i] = newentry;
    return;
  }
  else
  {
    abort();
  }
}
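/*
 * Both timer queues are kept sorted on expiry time by the insertion
 * above, so scan_queue() can stop at the first entry that has not yet
 * expired.
 */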
static inline void
flush_write_state_self(thr_job_queue_head *q_head, thr_jb_write_state *w)
{
  /*
   * Writing to ourself: simply update write references; no memory
   * barrier or wakeup needed.
   */
  w->m_write_buffer->m_len = w->m_write_pos;
  q_head->m_write_index = w->m_write_index;
  w->m_pending_signals_wakeup = 0;
  w->m_pending_signals = 0;
}

static inline void
flush_write_state_other(thr_data *dstptr, thr_job_queue_head *q_head,
                        thr_jb_write_state *w)
{
  /*
   * Two write memory barriers: assigning m_len makes signal data visible
   * to the other thread, and assigning m_write_index makes new buffers
   * visible.
   */
  wmb();
  w->m_write_buffer->m_len = w->m_write_pos;
  wmb();
  q_head->m_write_index = w->m_write_index;

  w->m_pending_signals_wakeup += w->m_pending_signals;
  w->m_pending_signals = 0;

  if (w->m_pending_signals_wakeup >= MAX_SIGNALS_BEFORE_WAKEUP)
  {
    w->m_pending_signals_wakeup = 0;
    wakeup(&(dstptr->m_waiter));
  }
}

static inline void
flush_write_state(const thr_data *selfptr, thr_data *dstptr,
                  thr_job_queue_head *q_head, thr_jb_write_state *w)
{
  if (dstptr == selfptr)
  {
    flush_write_state_self(q_head, w);
  }
  else
  {
    flush_write_state_other(dstptr, q_head, w);
  }
}
static void
flush_jbb_write_state(thr_data *selfptr)
{
  Uint32 thr_count = g_thr_repository.m_thread_count;
  Uint32 self = selfptr->m_thr_no;

  thr_jb_write_state *w = selfptr->m_write_states;
  thr_data *thrptr = g_thr_repository.m_thread;
  for (Uint32 thr_no = 0; thr_no < thr_count; thr_no++, thrptr++, w++)
  {
    if (w->m_pending_signals || w->m_pending_signals_wakeup)
    {
      w->m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
      thr_job_queue_head *q_head = thrptr->m_in_queue_head + self;
      flush_write_state(selfptr, thrptr, q_head, w);
    }
  }
}
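/*
 * check_job_buffers() below implements the receiver thread's
 * backpressure: with MIN_SIGNALS_PER_PAGE == 8190/32 == 255, minfree is
 * ceil(1024/255) == 5 pages, so receiving pauses whenever any in-queue
 * written by the receiver has less than about five free pages left.
 */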
static bool
check_job_buffers(struct thr_repository* rep)
{
  const Uint32 minfree = (1024 + MIN_SIGNALS_PER_PAGE - 1)/MIN_SIGNALS_PER_PAGE;
  unsigned thr_no = receiver_thread_no;
  const thr_data *thrptr = rep->m_thread;
  for (unsigned i = 0; i<num_threads; i++, thrptr++)
  {
    /*
     * Note: m_read_index is read without lock (and updated by a different
     * thread), but since that thread can only consume signals, the result
     * here is always conservative.
     */
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
    unsigned ri = q_head->m_read_index;
    unsigned wi = q_head->m_write_index;
    unsigned busy = (wi >= ri) ? wi - ri : (thr_job_queue::SIZE - ri) + wi;
    if (1 + minfree + busy >= thr_job_queue::SIZE)
    {
      return true;
    }
  }
  return false;
}
static Uint32
compute_max_signals_to_execute(Uint32 thr_no)
{
  Uint32 minfree = thr_job_queue::SIZE;
  const struct thr_repository* rep = &g_thr_repository;
  const thr_data *thrptr = rep->m_thread;

  for (unsigned i = 0; i<num_threads; i++, thrptr++)
  {
    const thr_job_queue_head *q_head = thrptr->m_in_queue_head + thr_no;
    unsigned ri = q_head->m_read_index;
    unsigned wi = q_head->m_write_index;
    unsigned free = (wi < ri) ? ri - wi : (thr_job_queue::SIZE + ri) - wi;

    assert(free <= thr_job_queue::SIZE);

    if (free < minfree)
      minfree = free;
  }

#define SAFETY 2

  if (minfree >= (1 + SAFETY))
  {
    return (3 + (minfree - (1 + SAFETY)) * MIN_SIGNALS_PER_PAGE) / 4;
  }
  else
  {
    return 0;
  }
}
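/*
 * Example: with thr_job_queue::SIZE == 31 and all of this thread's
 * out-queues empty, minfree == 31, giving a quota of
 * (3 + (31 - 3) * 255) / 4 == 1785 signals; once only 1 + SAFETY slots
 * remain the quota drops to 0 and the thread must wait for its
 * consumers to catch up.
 */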
#ifdef NDBMT_RAND_YIELD
static Uint32 g_rand_yield = 0;
static void
rand_yield(Uint32 limit, void* ptr0, void * ptr1)
{
  UintPtr tmp = UintPtr(ptr0) + UintPtr(ptr1);
  Uint8* tmpptr = (Uint8*)&tmp;
  Uint32 sum = g_rand_yield;
  for (Uint32 i = 0; i<sizeof(tmp); i++)
    sum = 33 * sum + tmpptr[i];

  if ((sum % 100) < limit)
  {
    g_rand_yield++;
    sched_yield();
  }
}
#else
static inline void rand_yield(Uint32 limit, void* ptr0, void * ptr1) {}
#endif
void
trp_callback::reportSendLen(NodeId nodeId, Uint32 count, Uint64 bytes)
{
  SignalT<3> signalT;
  Signal &signal = * new (&signalT) Signal(0);
  memset(&signal.header, 0, sizeof(signal.header));

  signal.header.theLength = 3;
  signal.header.theSendersSignalId = 0;
  signal.header.theSendersBlockRef = numberToRef(0, globalData.ownId);
  signal.theData[0] = NDB_LE_SendBytesStatistic;
  signal.theData[1] = nodeId;
  signal.theData[2] = (Uint32)(bytes/count);
  signal.header.theVerId_signalNumber = GSN_EVENT_REP;
  signal.header.theReceiversBlockNumber = CMVMI;
  sendlocal(g_thr_repository.m_send_buffers[nodeId].m_send_thread,
            &signalT.header, signalT.theData, NULL);
}
void
trp_callback::lock_transporter(NodeId node)
{
  struct thr_repository* rep = &g_thr_repository;
  /*
   * Take the send lock first, so we never hold the receive lock while
   * blocking on the send lock.
   */
  lock(&rep->m_send_buffers[node].m_send_lock);
  lock(&rep->m_receive_lock);
}

void
trp_callback::unlock_transporter(NodeId node)
{
  struct thr_repository* rep = &g_thr_repository;
  unlock(&rep->m_receive_lock);
  unlock(&rep->m_send_buffers[node].m_send_lock);
}

int
trp_callback::checkJobBuffer()
{
  struct thr_repository* rep = &g_thr_repository;
  if (unlikely(check_job_buffers(rep)))
  {
    do
    {
      /* Wait for the job buffers to drain before receiving more. */
#if defined HAVE_SCHED_YIELD
      sched_yield();
#elif defined _WIN32
      SwitchToThread();
#else
      NdbSleep_MilliSleep(0);
#endif
    } while (check_job_buffers(rep));
  }

  return 0;
}
static Uint32
link_thread_send_buffers(thr_repository::send_buffer * sb, Uint32 node)
{
  Uint32 ri[MAX_THREADS];
  Uint32 wi[MAX_THREADS];
  thr_send_queue * src = g_thr_repository.m_thread_send_buffers[node];
  for (unsigned thr = 0; thr < num_threads; thr++)
  {
    ri[thr] = sb->m_read_index[thr];
    wi[thr] = src[thr].m_write_index;
  }

  Uint64 sentinel[thr_send_page::HEADER_SIZE >> 1];
  thr_send_page* sentinel_page = new (&sentinel[0]) thr_send_page;
  sentinel_page->m_next = 0;

  struct thr_send_buffer tmp;
  tmp.m_first_page = sentinel_page;
  tmp.m_last_page = sentinel_page;

  Uint32 bytes = 0;
  for (unsigned thr = 0; thr < num_threads; thr++, src++)
  {
    Uint32 r = ri[thr];
    Uint32 w = wi[thr];
    if (r != w)
    {
      rmb();
      while (r != w)
      {
        thr_send_page * p = src->m_buffers[r];
        assert(p->m_start == 0);
        bytes += p->m_bytes;
        tmp.m_last_page->m_next = p;
        while (p->m_next != 0)
        {
          p = p->m_next;
          assert(p->m_start == 0);
          bytes += p->m_bytes;
        }
        tmp.m_last_page = p;
        assert(tmp.m_last_page != 0);
        r = (r + 1) % thr_send_queue::SIZE;
      }
      sb->m_read_index[thr] = r;
    }
  }

  if (bytes)
  {
    if (sb->m_bytes)
    {
      assert(sb->m_buffer.m_first_page != 0);
      assert(sb->m_buffer.m_last_page != 0);
      sb->m_buffer.m_last_page->m_next = tmp.m_first_page->m_next;
      sb->m_buffer.m_last_page = tmp.m_last_page;
    }
    else
    {
      assert(sb->m_buffer.m_first_page == 0);
      assert(sb->m_buffer.m_last_page == 0);
      sb->m_buffer.m_first_page = tmp.m_first_page->m_next;
      sb->m_buffer.m_last_page = tmp.m_last_page;
    }
    sb->m_bytes += bytes;
  }

  return sb->m_bytes;
}
Uint32
trp_callback::get_bytes_to_send_iovec(NodeId node,
                                      struct iovec *dst, Uint32 max)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;
  Uint32 bytes = link_thread_send_buffers(sb, node);
  if (max == 0 || bytes == 0)
    return 0;

  /* Walk the linked list and fill in the iovecs. */
  Uint32 pos = 0;
  thr_send_page * p = sb->m_buffer.m_first_page;
  do {
    dst[pos].iov_len = p->m_bytes;
    dst[pos].iov_base = p->m_data + p->m_start;
    assert(p->m_start + p->m_bytes <= p->max_bytes());
    pos++;
    max--;
    p = p->m_next;
  } while (max && p != 0);

  return pos;
}
static void
release_list(thread_local_pool<thr_send_page>* pool,
             thr_send_page* head, thr_send_page * tail)
{
  while (head != tail)
  {
    thr_send_page * tmp = head;
    head = head->m_next;
    pool->release_local(tmp);
  }
  pool->release_local(tail);
}

static Uint32
bytes_sent(thread_local_pool<thr_send_page>* pool,
           thr_repository::send_buffer* sb, Uint32 bytes)
{
  assert(bytes);

  Uint32 remain = bytes;
  thr_send_page * prev = 0;
  thr_send_page * curr = sb->m_buffer.m_first_page;

  while (remain && remain >= curr->m_bytes)
  {
    prev = curr;
    remain -= curr->m_bytes;
    curr = curr->m_next;
  }

  Uint32 total_bytes = sb->m_bytes;
  if (total_bytes == bytes)
  {
    /* Everything was released. */
    release_list(pool, sb->m_buffer.m_first_page, sb->m_buffer.m_last_page);
    sb->m_buffer.m_first_page = 0;
    sb->m_buffer.m_last_page = 0;
    sb->m_bytes = 0;
    return 0;
  }
  else if (remain)
  {
    /* Half a page was released. */
    curr->m_start += remain;
    assert(curr->m_bytes > remain);
    curr->m_bytes -= remain;
    if (prev)
    {
      release_list(pool, sb->m_buffer.m_first_page, prev);
    }
  }
  else
  {
    /* A number of full pages was released. */
    if (prev)
    {
      release_list(pool, sb->m_buffer.m_first_page, prev);
    }
  }

  sb->m_buffer.m_first_page = curr;
  sb->m_bytes -= bytes;

  return sb->m_bytes;
}
Uint32
trp_callback::bytes_sent(NodeId node, Uint32 bytes)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
  Uint32 thr_no = sb->m_send_thread;
  assert(thr_no != NO_SEND_THREAD);
  return ::bytes_sent(&g_thr_repository.m_thread[thr_no].m_send_buffer_pool,
                      sb, bytes);
}

bool
trp_callback::has_data_to_send(NodeId node)
{
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers + node;

  Uint32 thr_no = sb->m_send_thread;
  assert(thr_no != NO_SEND_THREAD);
  if (sb->m_bytes > 0 || sb->m_force_send)
    return true;

  thr_send_queue * dst = g_thr_repository.m_thread_send_buffers[node]+thr_no;

  return sb->m_read_index[thr_no] != dst->m_write_index;
}

void
trp_callback::reset_send_buffer(NodeId node, bool should_be_empty)
{
  struct thr_repository *rep = &g_thr_repository;
  thr_repository::send_buffer * sb = g_thr_repository.m_send_buffers+node;
  struct iovec v[32];

  thread_local_pool<thr_send_page> pool(&rep->m_sb_pool, 0);

  lock(&sb->m_send_lock);

  for (;;)
  {
    Uint32 count = get_bytes_to_send_iovec(node, v, sizeof(v)/sizeof(v[0]));
    if (count == 0)
      break;
    assert(!should_be_empty); // Got data when it should be empty
    Uint32 bytes = 0;
    for (Uint32 i = 0; i < count; i++)
      bytes += v[i].iov_len;

    /* Drop the data by marking it as sent. */
    ::bytes_sent(&pool, sb, bytes);
  }

  unlock(&sb->m_send_lock);

  pool.release_all(rep->m_mm, RG_TRANSPORTER_BUFFERS);
}
static inline void
register_pending_send(thr_data *selfptr, Uint32 nodeId)
{
  /* Mark that this node has pending send data. */
  if (!selfptr->m_pending_send_mask.get(nodeId))
  {
    selfptr->m_pending_send_mask.set(nodeId, 1);
    Uint32 i = selfptr->m_pending_send_count;
    selfptr->m_pending_send_nodes[i] = nodeId;
    selfptr->m_pending_send_count = i + 1;
  }
}
/* Publish this thread's locally prepared send buffer for one node. */
static void
flush_send_buffer(thr_data* selfptr, Uint32 node)
{
  Uint32 thr_no = selfptr->m_thr_no;
  thr_send_buffer * src = selfptr->m_send_buffers + node;
  thr_repository* rep = &g_thr_repository;

  if (src->m_first_page == 0)
  {
    return;
  }
  assert(src->m_last_page != 0);

  thr_send_queue * dst = rep->m_thread_send_buffers[node]+thr_no;
  thr_repository::send_buffer* sb = rep->m_send_buffers+node;

  Uint32 wi = dst->m_write_index;
  Uint32 next = (wi + 1) % thr_send_queue::SIZE;
  Uint32 ri = sb->m_read_index[thr_no];

  if (unlikely(next == ri))
  {
    /* Ring full: drain it into the shared per-node buffer. */
    lock(&sb->m_send_lock);
    link_thread_send_buffers(sb, node);
    unlock(&sb->m_send_lock);
  }

  dst->m_buffers[wi] = src->m_first_page;
  wmb();
  dst->m_write_index = next;

  src->m_first_page = 0;
  src->m_last_page = 0;
}
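/*
 * The per-(node, thread) ring used above is single-producer,
 * single-consumer: the owning thread writes m_buffers[] and then
 * m_write_index (with a write barrier in between), while the sending
 * thread only ever advances m_read_index, so the fast path needs no
 * lock.
 */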
/*
 * Used when a send buffer gets full, to force an emergency send,
 * hopefully freeing up buffer space for the next signal.
 */
bool
mt_send_handle::forceSend(NodeId nodeId)
{
  struct thr_repository *rep = &g_thr_repository;
  struct thr_data *selfptr = m_selfptr;
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + nodeId;

  do
  {
    sb->m_force_send = 0;
    lock(&sb->m_send_lock);
    sb->m_send_thread = selfptr->m_thr_no;
    globalTransporterRegistry.performSend(nodeId);
    sb->m_send_thread = NO_SEND_THREAD;
    unlock(&sb->m_send_lock);
  } while (sb->m_force_send);

  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                             RG_TRANSPORTER_BUFFERS);
  return true;
}
static void
try_send(thr_data * selfptr, Uint32 node)
{
  struct thr_repository *rep = &g_thr_repository;
  struct thr_repository::send_buffer * sb = rep->m_send_buffers + node;

  do
  {
    if (trylock(&sb->m_send_lock) != 0)
    {
      /* Somebody else is sending; ask them to send our data too. */
      sb->m_force_send = 1;
      return;
    }

    sb->m_force_send = 0;
    sb->m_send_thread = selfptr->m_thr_no;
    globalTransporterRegistry.performSend(node);
    sb->m_send_thread = NO_SEND_THREAD;
    unlock(&sb->m_send_lock);
  } while (sb->m_force_send);

  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                             RG_TRANSPORTER_BUFFERS);
}
/*
 * Flush the thread-local send buffers to the per-node send queues.  The
 * flushed contents are then piggybacked when some thread does do_send()
 * to the same node, giving larger messages and fewer OS send calls.
 */
static void
do_flush(struct thr_data* selfptr)
{
  Uint32 i;
  Uint32 count = selfptr->m_pending_send_count;
  Uint8 *nodes = selfptr->m_pending_send_nodes;

  for (i = 0; i < count; i++)
  {
    flush_send_buffer(selfptr, nodes[i]);
  }
}
/*
 * Send any pending data to remote nodes.  With must_send false, only try
 * to take the send lock, skipping nodes where it would block; with
 * must_send true, always take the lock.  Returns the number of nodes
 * that still have pending sends.
 */
static Uint32
do_send(struct thr_data* selfptr, bool must_send)
{
  Uint32 i;
  Uint32 count = selfptr->m_pending_send_count;
  Uint8 *nodes = selfptr->m_pending_send_nodes;
  struct thr_repository* rep = &g_thr_repository;

  if (count == 0)
  {
    return 0; // send-buffers empty
  }

  /* Clear the pending list. */
  selfptr->m_pending_send_mask.clear();
  selfptr->m_pending_send_count = 0;

  for (i = 0; i < count; i++)
  {
    Uint32 node = nodes[i];
    selfptr->m_watchdog_counter = 6;

    flush_send_buffer(selfptr, node);

    thr_repository::send_buffer * sb = rep->m_send_buffers + node;

    if (!must_send)
    {
      if (trylock(&sb->m_send_lock) != 0)
      {
        register_pending_send(selfptr, node);
        continue;
      }
    }
    else
    {
      lock(&sb->m_send_lock);
    }

    sb->m_force_send = 0;
    sb->m_send_thread = selfptr->m_thr_no;
    int res = globalTransporterRegistry.performSend(node);
    sb->m_send_thread = NO_SEND_THREAD;
    unlock(&sb->m_send_lock);
    if (res)
    {
      register_pending_send(selfptr, node);
    }
  }

  selfptr->m_send_buffer_pool.release_global(rep->m_mm,
                                             RG_TRANSPORTER_BUFFERS);

  return selfptr->m_pending_send_count;
}
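/*
 * mt_send_handle::getWritePtr()/updateWritePtr() below let the
 * transporter marshal a signal directly into the thread-local send page,
 * flushing the current chain and appending a fresh page from the pool
 * when there is no room left.
 */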
Uint32 *
mt_send_handle::getWritePtr(NodeId node, Uint32 len, Uint32 prio, Uint32 max)
{
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
  thr_send_page * p = b->m_last_page;
  if ((p != 0) && (p->m_bytes + p->m_start + len <= thr_send_page::max_bytes()))
  {
    return (Uint32*)(p->m_data + p->m_start + p->m_bytes);
  }
  else if (p != 0)
  {
    /* Page is full: publish it and try to send. */
    flush_send_buffer(m_selfptr, node);
    try_send(m_selfptr, node);
  }

  if ((p = m_selfptr->m_send_buffer_pool.seize(g_thr_repository.m_mm,
                                               RG_TRANSPORTER_BUFFERS)) != 0)
  {
    p->m_bytes = 0;
    p->m_start = 0;
    p->m_next = 0;
    b->m_first_page = b->m_last_page = p;
    return (Uint32*)p->m_data;
  }
  return 0;
}

Uint32
mt_send_handle::updateWritePtr(NodeId node, Uint32 lenBytes, Uint32 prio)
{
  struct thr_send_buffer * b = m_selfptr->m_send_buffers+node;
  thr_send_page * p = b->m_last_page;
  p->m_bytes += lenBytes;
  return p->m_bytes;
}
static inline bool
insert_signal(thr_job_queue *q, thr_jb_write_state *w, bool prioa,
              const SignalHeader* sh, const Uint32 *data,
              const Uint32 secPtr[3], thr_job_buffer *new_buffer)
{
  Uint32 write_pos = w->m_write_pos;
  Uint32 datalen = sh->theLength;
  assert(w->m_write_buffer == q->m_buffers[w->m_write_index]);
  memcpy(w->m_write_buffer->m_data + write_pos, sh, sizeof(*sh));
  write_pos += (sizeof(*sh) >> 2);
  memcpy(w->m_write_buffer->m_data + write_pos, data, 4*datalen);
  write_pos += datalen;
  const Uint32 *p= secPtr;
  for (Uint32 i = 0; i < sh->m_noOfSections; i++)
    w->m_write_buffer->m_data[write_pos++] = *p++;
  w->m_pending_signals++;

#if SIZEOF_CHARP == 8
  /* Align to 8-byte boundary, to ensure aligned copies. */
  write_pos= (write_pos+1) & ~((Uint32)1);
#endif

  /*
   * Make sure there is always room for at least one signal in the current
   * buffer, so one insert is always possible without adding a new buffer.
   */
  if (likely(write_pos + 32 <= thr_job_buffer::SIZE))
  {
    w->m_write_pos = write_pos;
    return false;
  }
  else
  {
    /*
     * Need a write memory barrier here, as the signal data is made
     * visible to the other thread by the update of m_len.
     */
    wmb();
    w->m_write_buffer->m_len = write_pos;
    Uint32 write_index = (w->m_write_index + 1) % thr_job_queue::SIZE;

    /* A completely full job buffer is fatal. */
    if (unlikely(write_index == q->m_head->m_read_index))
    {
      job_buffer_full(0);
    }
    new_buffer->m_len = 0;
    new_buffer->m_prioa = prioa;
    q->m_buffers[write_index] = new_buffer;
    wmb();
    w->m_write_index = write_index;
    w->m_write_pos = 0;
    w->m_write_buffer = new_buffer;
    return true;                // new_buffer was consumed
  }
}
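/*
 * Consumers poll the producers' published state with read_jbb_state()
 * below: reload the queue's m_write_index first and, after a dependent
 * read barrier, the length of the buffer being read, so a reader never
 * sees an m_len newer than the index it is paired with.
 */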
static void
read_jbb_state(thr_data *selfptr, Uint32 count)
{
  thr_jb_read_state *r = selfptr->m_read_states;
  const thr_job_queue *q = selfptr->m_in_queue;
  for (Uint32 i = 0; i < count; i++,r++,q++)
  {
    Uint32 read_index = r->m_read_index;

    /*
     * Optimization: only reload when possibly empty, avoiding a cache
     * reload of the shared thr_job_queue_head.
     */
    if (r->m_write_index == read_index)
    {
      r->m_write_index = q->m_head->m_write_index;
      read_barrier_depends();
      r->m_read_end = q->m_buffers[read_index]->m_len;
    }
  }
}

static bool
read_jba_state(thr_data *selfptr)
{
  thr_jb_read_state *r = &(selfptr->m_jba_read_state);
  r->m_write_index = selfptr->m_jba_head.m_write_index;
  read_barrier_depends();
  r->m_read_end = selfptr->m_jba.m_buffers[r->m_read_index]->m_len;
  return r->is_empty();
}
/* Check all job queues; return true only if all are empty. */
static bool
check_queues_empty(thr_data *selfptr)
{
  Uint32 thr_count = g_thr_repository.m_thread_count;
  bool empty = read_jba_state(selfptr);
  if (!empty)
    return false;

  read_jbb_state(selfptr, thr_count);
  const thr_jb_read_state *r = selfptr->m_read_states;
  for (Uint32 i = 0; i < thr_count; i++,r++)
  {
    if (!r->is_empty())
      return false;
  }
  return true;
}
/*
 * Execute at most max_signals signals from one job queue, updating the
 * local read state as appropriate.  Returns the number of signals
 * actually executed.
 */
static Uint32
execute_signals(thr_data *selfptr, thr_job_queue *q, thr_jb_read_state *r,
                Signal *sig, Uint32 max_signals, Uint32 *signalIdCounter)
{
  Uint32 num_signals;
  Uint32 read_index = r->m_read_index;
  Uint32 write_index = r->m_write_index;
  Uint32 read_pos = r->m_read_pos;
  Uint32 read_end = r->m_read_end;
  Uint32 *watchDogCounter = &selfptr->m_watchdog_counter;

  if (read_index == write_index && read_pos >= read_end)
    return 0;          // empty read_state

  thr_job_buffer *read_buffer = r->m_read_buffer;

  for (num_signals = 0; num_signals < max_signals; num_signals++)
  {
    while (read_pos >= read_end)
    {
      if (read_index == write_index)
      {
        /* No more signals available for now. */
        return num_signals;
      }
      /* Move to the next buffer. */
      read_index = (read_index + 1) % thr_job_queue::SIZE;
      release_buffer(&g_thr_repository, selfptr->m_thr_no, read_buffer);
      read_buffer = q->m_buffers[read_index];
      read_pos = 0;
      read_end = read_buffer->m_len;
      /* Update thread-local read state. */
      r->m_read_index = q->m_head->m_read_index = read_index;
      r->m_read_buffer = read_buffer;
      r->m_read_pos = read_pos;
      r->m_read_end = read_end;
    }

    /* Prefetches found (by profiling) to reduce cache misses. */
    NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 16);
    NDB_PREFETCH_WRITE ((Uint32 *)&sig->header + 16);

    /* Now execute the signal. */
    SignalHeader* s =
      reinterpret_cast<SignalHeader*>(read_buffer->m_data + read_pos);
    Uint32 seccnt = s->m_noOfSections;
    Uint32 siglen = (sizeof(*s)>>2) + s->theLength;
    if (siglen > 16)
    {
      NDB_PREFETCH_READ (read_buffer->m_data + read_pos + 32);
    }
    Uint32 bno = blockToMain(s->theReceiversBlockNumber);
    Uint32 ino = blockToInstance(s->theReceiversBlockNumber);
    SimulatedBlock* block = globalData.mt_getBlock(bno, ino);
    assert(block != 0);

    Uint32 gsn = s->theVerId_signalNumber;
    *watchDogCounter = 1;
    /* Must update the original buffer so signal dump will see it. */
    s->theSignalId = (*signalIdCounter)++;
    memcpy(&sig->header, s, 4*siglen);
    sig->m_sectionPtrI[0] = read_buffer->m_data[read_pos + siglen + 0];
    sig->m_sectionPtrI[1] = read_buffer->m_data[read_pos + siglen + 1];
    sig->m_sectionPtrI[2] = read_buffer->m_data[read_pos + siglen + 2];

    read_pos += siglen + seccnt;
#if SIZEOF_CHARP == 8
    /* Handle 8-byte alignment. */
    read_pos = (read_pos + 1) & ~((Uint32)1);
#endif

    /* Update just before execute, so a signal dump knows how far we got. */
    r->m_read_pos = read_pos;

#ifdef VM_TRACE
    if (globalData.testOn)
    {
      SegmentedSectionPtr ptr[3];
      ptr[0].i = sig->m_sectionPtrI[0];
      ptr[1].i = sig->m_sectionPtrI[1];
      ptr[2].i = sig->m_sectionPtrI[2];
      ::getSections(seccnt, ptr);
      globalSignalLoggers.executeSignal(*s, 0, &sig->theData[0],
                                        globalData.ownId, ptr, seccnt);
    }
#endif

    block->executeFunction(gsn, sig);
  }

  return num_signals;
}
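/*
 * run_job_buffers() below makes one round over all in-queues.  Before
 * each queue it re-checks the prio A job buffer, so prio A signals can
 * overtake prio B execution between batches.
 */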
static Uint32
run_job_buffers(thr_data *selfptr, Signal *sig, Uint32 *signalIdCounter)
{
  Uint32 thr_count = g_thr_repository.m_thread_count;
  Uint32 signal_count = 0;

  read_jbb_state(selfptr, thr_count);
  /* Load barrier: see prio A signals sent later than loaded prio B ones. */
  rmb();

  thr_job_queue *queue = selfptr->m_in_queue;
  thr_jb_read_state *read_state = selfptr->m_read_states;
  for (Uint32 send_thr_no = 0; send_thr_no < thr_count;
       send_thr_no++,queue++,read_state++)
  {
    /* Read the prio A state often, to avoid starvation of prio A. */
    bool jba_empty = read_jba_state(selfptr);
    if (!jba_empty)
    {
      static Uint32 max_prioA = thr_job_queue::SIZE * thr_job_buffer::SIZE;
      signal_count += execute_signals(selfptr, &(selfptr->m_jba),
                                      &(selfptr->m_jba_read_state), sig,
                                      max_prioA, signalIdCounter);
    }

    /* Now execute prio B signals from one thread. */
    Uint32 perjb = selfptr->m_max_signals_per_jb;
    signal_count += execute_signals(selfptr, queue, read_state,
                                    sig, perjb, signalIdCounter);
  }

  return signal_count;
}
struct thr_map_entry {
  enum { NULL_THR_NO = 0xFF };
  Uint8 thr_no;
  thr_map_entry() : thr_no(NULL_THR_NO) {}
};

static struct thr_map_entry thr_map[NO_OF_BLOCKS][MAX_BLOCK_INSTANCES];

static inline Uint32
block2ThreadId(Uint32 block, Uint32 instance)
{
  assert(block >= MIN_BLOCK_NO && block <= MAX_BLOCK_NO);
  Uint32 index = block - MIN_BLOCK_NO;
  assert(instance < MAX_BLOCK_INSTANCES);
  const thr_map_entry& entry = thr_map[index][instance];
  assert(entry.thr_no < num_threads);
  return entry.thr_no;
}
static void
add_thr_map(Uint32 main, Uint32 instance, Uint32 thr_no)
{
  assert(main == blockToMain(main));
  Uint32 index = main - MIN_BLOCK_NO;
  assert(index < NO_OF_BLOCKS);
  assert(instance < MAX_BLOCK_INSTANCES);

  SimulatedBlock* b = globalData.getBlock(main, instance);
  require(b != 0);

  /* Block number including instance. */
  Uint32 block = numberToBlock(main, instance);

  require(thr_no < num_threads);
  struct thr_repository* rep = &g_thr_repository;
  thr_data* thr_ptr = rep->m_thread + thr_no;

  /* Add to the thread's instance list, checking for duplicates. */
  Uint32 i;
  for (i = 0; i < thr_ptr->m_instance_count; i++)
    require(thr_ptr->m_instance_list[i] != block);

  require(thr_ptr->m_instance_count < MAX_INSTANCES_PER_THREAD);
  thr_ptr->m_instance_list[thr_ptr->m_instance_count++] = block;

  SimulatedBlock::ThreadContext ctx;
  ctx.threadId = thr_no;
  ctx.jamBuffer = &thr_ptr->m_jam;
  ctx.watchDogCounter = &thr_ptr->m_watchdog_counter;
  ctx.sectionPoolCache = &thr_ptr->m_sectionPoolCache;
  b->assignToThread(ctx);

  /* Create the entry mapping block to thread. */
  thr_map_entry& entry = thr_map[index][instance];
  require(entry.thr_no == thr_map_entry::NULL_THR_NO);
  entry.thr_no = thr_no;
}
/* Static assignment of main instances (before the first signal). */
void
mt_init_thr_map()
{
  /* Keep the mt-classic block assignments in MT LQH. */
  const Uint32 thr_GLOBAL = 0;
  const Uint32 thr_LOCAL = 1;
  const Uint32 thr_RECEIVER = receiver_thread_no;

  add_thr_map(BACKUP, 0, thr_LOCAL);
  add_thr_map(DBTC, 0, thr_GLOBAL);
  add_thr_map(DBDIH, 0, thr_GLOBAL);
  add_thr_map(DBLQH, 0, thr_LOCAL);
  add_thr_map(DBACC, 0, thr_LOCAL);
  add_thr_map(DBTUP, 0, thr_LOCAL);
  add_thr_map(DBDICT, 0, thr_GLOBAL);
  add_thr_map(NDBCNTR, 0, thr_GLOBAL);
  add_thr_map(QMGR, 0, thr_GLOBAL);
  add_thr_map(NDBFS, 0, thr_GLOBAL);
  add_thr_map(CMVMI, 0, thr_RECEIVER);
  add_thr_map(TRIX, 0, thr_GLOBAL);
  add_thr_map(DBUTIL, 0, thr_GLOBAL);
  add_thr_map(SUMA, 0, thr_LOCAL);
  add_thr_map(DBTUX, 0, thr_LOCAL);
  add_thr_map(TSMAN, 0, thr_LOCAL);
  add_thr_map(LGMAN, 0, thr_LOCAL);
  add_thr_map(PGMAN, 0, thr_LOCAL);
  add_thr_map(RESTORE, 0, thr_LOCAL);
  add_thr_map(DBINFO, 0, thr_LOCAL);
  add_thr_map(DBSPJ, 0, thr_GLOBAL);
}
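/*
 * Worker instances (instance != 0) are mapped round-robin onto the LQH
 * threads by add_lqh_worker_thr_map() below: with 4 LQH threads, e.g.
 * DBLQH instances 1..8 land on threads 2,3,4,5,2,3,4,5 (threads 0 and 1
 * being the two main threads).
 */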
static void
add_lqh_worker_thr_map(Uint32 block, Uint32 instance)
{
  require(instance != 0);
  Uint32 i = instance - 1;
  Uint32 thr_no = NUM_MAIN_THREADS + i % num_lqh_threads;
  add_thr_map(block, instance, thr_no);
}

/* Extra workers are assigned to the thread of the main instance. */
static void
add_extra_worker_thr_map(Uint32 block, Uint32 instance)
{
  require(instance != 0);
  Uint32 thr_no = block2ThreadId(block, 0);
  add_thr_map(block, instance, thr_no);
}
/*
 * Create the duplicate entries needed so that a sender does not need to
 * know how many instances actually exist in this node.
 */
void
mt_finalize_thr_map()
{
  for (Uint32 b = 0; b < NO_OF_BLOCKS; b++)
  {
    Uint32 bno = b + MIN_BLOCK_NO;
    Uint32 cnt = 0;
    while (cnt < MAX_BLOCK_INSTANCES &&
           thr_map[b][cnt].thr_no != thr_map_entry::NULL_THR_NO)
      cnt++;

    if (cnt != MAX_BLOCK_INSTANCES)
    {
      SimulatedBlock * main = globalData.getBlock(bno, 0);
      for (Uint32 i = cnt; i < MAX_BLOCK_INSTANCES; i++)
      {
        Uint32 dup = (cnt == 1) ? 0 : 1 + ((i - 1) % (cnt - 1));
        if (thr_map[b][i].thr_no == thr_map_entry::NULL_THR_NO)
        {
          thr_map[b][i] = thr_map[b][dup];
          main->addInstance(globalData.getBlock(bno, dup), i);
        }
        else
        {
          /* Extra PGMAN worker is its own instance. */
          require(bno == PGMAN);
        }
      }
    }
  }
}
static void reportSignalStats(Uint32 self, Uint32 a_count, Uint32 a_size,
                              Uint32 b_count, Uint32 b_size)
{
  SignalT<6> sT;
  Signal *s= new (&sT) Signal(0);

  memset(&s->header, 0, sizeof(s->header));
  s->header.theLength = 6;
  s->header.theSendersSignalId = 0;
  s->header.theSendersBlockRef = numberToRef(0, 0);
  s->header.theVerId_signalNumber = GSN_EVENT_REP;
  s->header.theReceiversBlockNumber = CMVMI;
  s->theData[0] = NDB_LE_MTSignalStatistics;
  s->theData[1] = self;
  s->theData[2] = a_count;
  s->theData[3] = a_size;
  s->theData[4] = b_count;
  s->theData[5] = b_size;
  sendlocal(self, &s->header, s->theData,
            NULL);
}
static inline void
update_sched_stats(thr_data *selfptr)
{
  if(selfptr->m_prioa_count + selfptr->m_priob_count >= 2000000)
  {
    reportSignalStats(selfptr->m_thr_no,
                      selfptr->m_prioa_count,
                      selfptr->m_prioa_size,
                      selfptr->m_priob_count,
                      selfptr->m_priob_size);
    selfptr->m_prioa_count = 0;
    selfptr->m_prioa_size = 0;
    selfptr->m_priob_count = 0;
    selfptr->m_priob_size = 0;

#if 0
    Uint32 thr_no = selfptr->m_thr_no;
    ndbout_c("--- %u fifo: %u jba: %u global: %u",
             thr_no,
             fifo_used_pages(selfptr),
             selfptr->m_jba_head.used(),
             g_thr_repository.m_free_list.m_cnt);
    for (Uint32 i = 0; i<num_threads; i++)
    {
      ndbout_c("  %u-%u : %u",
               thr_no, i, selfptr->m_in_queue_head[i].used());
    }
#endif
  }
}
static void
init_thread(thr_data *selfptr)
{
  selfptr->m_waiter.init();
  selfptr->m_jam.theEmulatedJamIndex = 0;
  selfptr->m_jam.theEmulatedJamBlockNumber = 0;
  bzero(selfptr->m_jam.theEmulatedJam, sizeof(selfptr->m_jam.theEmulatedJam));
  NdbThread_SetTlsKey(NDB_THREAD_TLS_JAM, &selfptr->m_jam);
  NdbThread_SetTlsKey(NDB_THREAD_TLS_THREAD, selfptr);

  unsigned thr_no = selfptr->m_thr_no;
  globalEmulatorData.theWatchDog->
    registerWatchedThread(&selfptr->m_watchdog_counter, thr_no);

  while(selfptr->m_thread == 0)
    NdbSleep_MilliSleep(30);

  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
  BaseString tmp;
  tmp.appfmt("thr: %u ", thr_no);

  int tid = NdbThread_GetTid(selfptr->m_thread);
  if (tid != -1)
  {
    tmp.appfmt("tid: %u ", tid);
  }

  conf.appendInfo(tmp,
                  selfptr->m_instance_list, selfptr->m_instance_count);
  int res = conf.do_bind(selfptr->m_thread,
                         selfptr->m_instance_list, selfptr->m_instance_count);
  if (res < 0)
  {
    tmp.appfmt("err: %d ", -res);
  }

  selfptr->m_thr_id = pthread_self();

  for (Uint32 i = 0; i < selfptr->m_instance_count; i++)
  {
    BlockReference block = selfptr->m_instance_list[i];
    Uint32 main = blockToMain(block);
    Uint32 instance = blockToInstance(block);
    tmp.appfmt("%s(%u) ", getBlockName(main), instance);
  }
  printf("%s\n", tmp.c_str());
  fflush(stdout);
}
/*
 * Align the signal buffer for better cache performance, and skew it a
 * little per thread to avoid cache pollution.
 */
#define SIGBUF_SIZE (sizeof(Signal) + 63 + 256 * MAX_THREADS)

static Signal *
aligned_signal(unsigned char signal_buf[SIGBUF_SIZE], unsigned thr_no)
{
  UintPtr sigtmp= (UintPtr)signal_buf;
  sigtmp= (sigtmp+63) & (~(UintPtr)63);
  sigtmp+= thr_no*256;
  return (Signal *)sigtmp;
}

Uint32 receiverThreadId;
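/*
 * The receiver thread below alternates between executing its own job
 * buffers and polling the transporters, and only performs a receive
 * when check_job_buffers() confirms that the other threads' in-queues
 * still have room.
 */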
extern "C" void *
mt_receiver_thread_main(void *thr_arg)
{
  unsigned char signal_buf[SIGBUF_SIZE];
  Signal *signal;
  struct thr_repository* rep = &g_thr_repository;
  struct thr_data* selfptr = (struct thr_data *)thr_arg;
  unsigned thr_no = selfptr->m_thr_no;
  Uint32& watchDogCounter = selfptr->m_watchdog_counter;
  Uint32 thrSignalId = 0;
  bool has_received = false;

  init_thread(selfptr);
  receiverThreadId = thr_no;
  signal = aligned_signal(signal_buf, thr_no);

  while (globalData.theRestartFlag != perform_stop)
  {
    static int cnt = 0;

    update_sched_stats(selfptr);

    if (cnt == 0)
    {
      watchDogCounter = 5;
      globalTransporterRegistry.update_connections();
    }
    cnt = (cnt + 1) & 15;

    watchDogCounter = 2;

    NDB_TICKS now = NdbTick_CurrentMillisecond();
    scan_time_queues(selfptr, now);

    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);

    if (sum || has_received)
    {
      watchDogCounter = 6;
      flush_jbb_write_state(selfptr);
    }

    do_send(selfptr, TRUE);

    watchDogCounter = 7;

    has_received = false;
    if (globalTransporterRegistry.pollReceive(1))
    {
      if (check_job_buffers(rep) == 0)
      {
        watchDogCounter = 8;
        lock(&rep->m_receive_lock);
        globalTransporterRegistry.performReceive();
        unlock(&rep->m_receive_lock);
        has_received = true;
      }
    }
  }

  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
  return NULL;                  // Return value not currently used
}
static void
sendpacked(struct thr_data* thr_ptr, Signal* signal)
{
  Uint32 i;
  for (i = 0; i < thr_ptr->m_instance_count; i++)
  {
    BlockReference block = thr_ptr->m_instance_list[i];
    Uint32 main = blockToMain(block);
    Uint32 instance = blockToInstance(block);
    SimulatedBlock* b = globalData.getBlock(main, instance);
    assert(b != 0 && b->getThreadId() == thr_ptr->m_thr_no);
    b->executeFunction(GSN_SEND_PACKED, signal);
  }
}
/*
 * Callback used by yield() to recheck the "job buffer full" condition
 * before going to sleep.  Returns true if any out-queue is still full.
 */
static bool
check_job_buffer_full(thr_data *selfptr)
{
  Uint32 thr_no = selfptr->m_thr_no;
  Uint32 tmp = compute_max_signals_to_execute(thr_no);
#if 0
  Uint32 perjb = tmp / g_thr_repository.m_thread_count;
  if (perjb == 0)
    return true;
  return false;
#else
  if (tmp < g_thr_repository.m_thread_count)
    return true;
  return false;
#endif
}
/*
 * Recompute the signal execution quota.  If an out-queue is full, sleep
 * (in up to ten 1 ms steps) until consumers catch up.  Returns true if
 * we slept, so the caller re-reads the clock.
 */
static bool
update_sched_config(struct thr_data* selfptr, Uint32 pending_send)
{
  Uint32 sleeploop = 0;
  Uint32 thr_no = selfptr->m_thr_no;
loop:
  Uint32 tmp = compute_max_signals_to_execute(thr_no);
  Uint32 perjb = tmp / g_thr_repository.m_thread_count;

  if (perjb > MAX_SIGNALS_PER_JB)
    perjb = MAX_SIGNALS_PER_JB;

  selfptr->m_max_exec_signals = tmp;
  selfptr->m_max_signals_per_jb = perjb;

  if (unlikely(perjb == 0))
  {
    sleeploop++;
    if (sleeploop == 10)
    {
      /* We have slept for ~1 s; try running anyway. */
      selfptr->m_max_signals_per_jb = 1;
      ndbout_c("%u - sleeploop 10!!", selfptr->m_thr_no);
      return true;
    }

    if (pending_send)
    {
      /* About to sleep, _must_ send now. */
      pending_send = do_send(selfptr, TRUE);
    }

    const Uint32 wait = 1000000;    /* 1 ms */
    yield(&selfptr->m_waiter, wait, check_job_buffer_full, selfptr);
    goto loop;
  }

  return sleeploop > 0;
}
extern "C" void *
mt_job_thread_main(void *thr_arg)
{
  unsigned char signal_buf[SIGBUF_SIZE];
  Signal *signal;
  const Uint32 nowait = 10 * 1000000;    /* 10 ms */
  Uint32 thrSignalId = 0;

  struct thr_data* selfptr = (struct thr_data *)thr_arg;
  init_thread(selfptr);
  Uint32& watchDogCounter = selfptr->m_watchdog_counter;

  unsigned thr_no = selfptr->m_thr_no;
  signal = aligned_signal(signal_buf, thr_no);

  /* Avoid false watchdog alarms caused by race condition. */
  watchDogCounter = 1;

  Uint32 pending_send = 0;
  Uint32 send_sum = 0;
  int loops = 0;
  int maxloops = 10; /* Loops before reading clock, adapted to 1 ms freq. */
  NDB_TICKS now = selfptr->m_time;

  while (globalData.theRestartFlag != perform_stop)
  {
    loops++;
    update_sched_stats(selfptr);

    watchDogCounter = 2;
    scan_time_queues(selfptr, now);

    Uint32 sum = run_job_buffers(selfptr, signal, &thrSignalId);

    watchDogCounter = 1;
    signal->header.m_noOfSections = 0; /* valgrind */
    sendpacked(selfptr, signal);

    if (sum)
    {
      watchDogCounter = 6;
      flush_jbb_write_state(selfptr);
      send_sum += sum;

      if (send_sum > MAX_SIGNALS_BEFORE_SEND)
      {
        /* Try to send, but skip nodes in case of lock contention. */
        pending_send = do_send(selfptr, FALSE);
        send_sum = 0;
      }
      else
      {
        /* Publish the send buffers for other threads to pick up. */
        do_flush(selfptr);
      }
    }
    else
    {
      /* No signals processed; prepare to sleep and wait for more. */
      if (pending_send || send_sum > 0)
      {
        /* About to sleep, _must_ send now. */
        pending_send = do_send(selfptr, TRUE);
        send_sum = 0;
      }

      if (pending_send == 0)
      {
        bool waited = yield(&selfptr->m_waiter, nowait, check_queues_empty,
                            selfptr);
        if (waited)
        {
          /* Update current time after sleeping. */
          now = NdbTick_CurrentMillisecond();
          loops = 0;
        }
      }
    }

    /*
     * Check if we have executed enough signals, and if so recompute how
     * many signals to execute.
     */
    if (sum >= selfptr->m_max_exec_signals)
    {
      if (update_sched_config(selfptr, pending_send))
      {
        /* Update current time after sleeping. */
        now = NdbTick_CurrentMillisecond();
        loops = 0;
      }
    }
    else
    {
      selfptr->m_max_exec_signals -= sum;
    }

    /* Adaptively read the system time about once per millisecond. */
    if (loops > maxloops)
    {
      now = NdbTick_CurrentMillisecond();
      Uint64 diff = now - selfptr->m_time;

      /* Adjust 'maxloops' to achieve a clock reading frequency of 1 ms. */
      if (diff < 1)
        maxloops += ((maxloops/10) + 1); /* No change: read less often. */
      else if (diff > 1 && maxloops > 1)
        maxloops -= ((maxloops/10) + 1); /* Overslept: read more often. */

      loops = 0;
    }
  }

  globalEmulatorData.theWatchDog->unregisterWatchedThread(thr_no);
  return NULL;                  // Return value not currently used
}
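/*
 * sendlocal()/sendprioa() below are the intra-node send entry points:
 * prio B signals go through the sender's per-destination write state and
 * are flushed in batches, while prio A signals are inserted directly
 * into the destination's jba queue under its write lock.
 */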
void
sendlocal(Uint32 self, const SignalHeader *s, const Uint32 *data,
          const Uint32 secPtr[3])
{
  Uint32 block = blockToMain(s->theReceiversBlockNumber);
  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);

  /*
   * Max number of signals to put into the job buffer before flushing it
   * to the other thread; found to be reasonable by benchmarking.
   */
  Uint32 MAX_SIGNALS_BEFORE_FLUSH = (self == receiver_thread_no) ?
    MAX_SIGNALS_BEFORE_FLUSH_RECEIVER :
    MAX_SIGNALS_BEFORE_FLUSH_OTHER;

  Uint32 dst = block2ThreadId(block, instance);
  struct thr_repository* rep = &g_thr_repository;
  struct thr_data * selfptr = rep->m_thread + self;
  assert(pthread_equal(selfptr->m_thr_id, pthread_self()));
  struct thr_data * dstptr = rep->m_thread + dst;

  selfptr->m_priob_count++;
  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
  selfptr->m_priob_size += siglen;

  thr_job_queue *q = dstptr->m_in_queue + self;
  thr_jb_write_state *w = selfptr->m_write_states + dst;
  if (insert_signal(q, w, false, s, data, secPtr, selfptr->m_next_buffer))
  {
    selfptr->m_next_buffer = seize_buffer(rep, self, false);
  }
  if (w->m_pending_signals >= MAX_SIGNALS_BEFORE_FLUSH)
    flush_write_state(selfptr, dstptr, q->m_head, w);
}
void
sendprioa(Uint32 self, const SignalHeader *s, const Uint32 *data,
          const Uint32 secPtr[3])
{
  Uint32 block = blockToMain(s->theReceiversBlockNumber);
  Uint32 instance = blockToInstance(s->theReceiversBlockNumber);

  Uint32 dst = block2ThreadId(block, instance);
  struct thr_repository* rep = &g_thr_repository;
  struct thr_data *selfptr = rep->m_thread + self;
  assert(s->theVerId_signalNumber == GSN_START_ORD ||
         pthread_equal(selfptr->m_thr_id, pthread_self()));
  struct thr_data *dstptr = rep->m_thread + dst;

  selfptr->m_prioa_count++;
  Uint32 siglen = (sizeof(*s) >> 2) + s->theLength + s->m_noOfSections;
  selfptr->m_prioa_size += siglen;

  thr_job_queue *q = &(dstptr->m_jba);
  thr_jb_write_state w;

  lock(&dstptr->m_jba_write_lock);

  Uint32 index = q->m_head->m_write_index;
  w.m_write_index = index;
  thr_job_buffer *buffer = q->m_buffers[index];
  w.m_write_buffer = buffer;
  w.m_write_pos = buffer->m_len;
  w.m_pending_signals = 0;
  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
  bool buf_used = insert_signal(q, &w, true, s, data, secPtr,
                                selfptr->m_next_buffer);
  flush_write_state(selfptr, dstptr, q->m_head, &w);

  unlock(&dstptr->m_jba_write_lock);

  if (buf_used)
    selfptr->m_next_buffer = seize_buffer(rep, self, true);
}
SendStatus
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
               const Uint32 * data, NodeId nodeId,
               const LinearSectionPtr ptr[3])
{
  thr_repository *rep = &g_thr_repository;
  thr_data *selfptr = rep->m_thread + self;
  SendStatus ss;

  mt_send_handle handle(selfptr);
  register_pending_send(selfptr, nodeId);
  /* prepareSend() is lock-free, as we have per-thread send buffers. */
  ss = globalTransporterRegistry.prepareSend(&handle,
                                             sh, prio, data, nodeId, ptr);
  return ss;
}

SendStatus
mt_send_remote(Uint32 self, const SignalHeader *sh, Uint8 prio,
               const Uint32 *data, NodeId nodeId,
               class SectionSegmentPool * thePool,
               const SegmentedSectionPtr ptr[3])
{
  thr_repository *rep = &g_thr_repository;
  thr_data *selfptr = rep->m_thread + self;
  SendStatus ss;

  mt_send_handle handle(selfptr);
  register_pending_send(selfptr, nodeId);
  ss = globalTransporterRegistry.prepareSend(&handle,
                                             sh, prio, data, nodeId,
                                             *thePool, ptr);
  return ss;
}
/*
 * Send a prio A STOP_FOR_CRASH signal to a thread.  Works when called
 * from any thread; the receiving thread exits immediately after
 * executing the signal.
 */
static void
sendprioa_STOP_FOR_CRASH(const struct thr_data *selfptr, Uint32 dst)
{
  SignalT<StopForCrash::SignalLength> signalT;
  struct thr_repository* rep = &g_thr_repository;
  /*
   * As this will be the last signal executed by the other thread, it does
   * not matter which buffer is used if the current buffer fills up: the
   * data in it will never be read.
   */
  static thr_job_buffer dummy_buffer;

  /* Pick any instance running in the target thread. */
  struct thr_data * dstptr = rep->m_thread + dst;
  Uint32 bno = dstptr->m_instance_list[0];

  memset(&signalT.header, 0, sizeof(SignalHeader));
  signalT.header.theVerId_signalNumber = GSN_STOP_FOR_CRASH;
  signalT.header.theReceiversBlockNumber = bno;
  signalT.header.theSendersBlockRef = 0;
  signalT.header.theTrace = 0;
  signalT.header.theSendersSignalId = 0;
  signalT.header.theSignalId = 0;
  signalT.header.theLength = StopForCrash::SignalLength;
  StopForCrash * stopForCrash = CAST_PTR(StopForCrash, &signalT.theData[0]);
  stopForCrash->flags = 0;

  thr_job_queue *q = &(dstptr->m_jba);
  thr_jb_write_state w;

  lock(&dstptr->m_jba_write_lock);

  Uint32 index = q->m_head->m_write_index;
  w.m_write_index = index;
  thr_job_buffer *buffer = q->m_buffers[index];
  w.m_write_buffer = buffer;
  w.m_write_pos = buffer->m_len;
  w.m_pending_signals = 0;
  w.m_pending_signals_wakeup = MAX_SIGNALS_BEFORE_WAKEUP;
  insert_signal(q, &w, true, &signalT.header, signalT.theData, NULL,
                &dummy_buffer);
  flush_write_state(selfptr, dstptr, q->m_head, &w);

  unlock(&dstptr->m_jba_write_lock);
}
static void
queue_init(struct thr_tq* tq)
{
  tq->m_next_timer = 0;
  tq->m_current_time = 0;
  tq->m_next_free = RNIL;
  tq->m_cnt[0] = tq->m_cnt[1] = 0;
  bzero(tq->m_delayed_signals, sizeof(tq->m_delayed_signals));
}
static void
thr_init(struct thr_repository* rep, struct thr_data *selfptr,
         unsigned int cnt, unsigned thr_no)
{
  Uint32 i;

  selfptr->m_thr_no = thr_no;
  selfptr->m_max_signals_per_jb = MAX_SIGNALS_PER_JB;
  selfptr->m_max_exec_signals = 0;
  selfptr->m_first_free = 0;
  selfptr->m_first_unused = 0;

  {
    char buf[100];
    BaseString::snprintf(buf, sizeof(buf), "jbalock thr: %u", thr_no);
    register_lock(&selfptr->m_jba_write_lock, buf);
  }
  selfptr->m_jba_head.m_read_index = 0;
  selfptr->m_jba_head.m_write_index = 0;
  selfptr->m_jba.m_head = &selfptr->m_jba_head;
  thr_job_buffer *buffer = seize_buffer(rep, thr_no, true);
  selfptr->m_jba.m_buffers[0] = buffer;
  selfptr->m_jba_read_state.m_read_index = 0;
  selfptr->m_jba_read_state.m_read_buffer = buffer;
  selfptr->m_jba_read_state.m_read_pos = 0;
  selfptr->m_jba_read_state.m_read_end = 0;
  selfptr->m_jba_read_state.m_write_index = 0;
  selfptr->m_next_buffer = seize_buffer(rep, thr_no, false);
  selfptr->m_send_buffer_pool.set_pool(&rep->m_sb_pool);

  for (i = 0; i<cnt; i++)
  {
    selfptr->m_in_queue_head[i].m_read_index = 0;
    selfptr->m_in_queue_head[i].m_write_index = 0;
    selfptr->m_in_queue[i].m_head = &selfptr->m_in_queue_head[i];
    buffer = seize_buffer(rep, thr_no, false);
    selfptr->m_in_queue[i].m_buffers[0] = buffer;
    selfptr->m_read_states[i].m_read_index = 0;
    selfptr->m_read_states[i].m_read_buffer = buffer;
    selfptr->m_read_states[i].m_read_pos = 0;
    selfptr->m_read_states[i].m_read_end = 0;
    selfptr->m_read_states[i].m_write_index = 0;
  }
  queue_init(&selfptr->m_tq);

  selfptr->m_prioa_count = 0;
  selfptr->m_prioa_size = 0;
  selfptr->m_priob_count = 0;
  selfptr->m_priob_size = 0;

  selfptr->m_pending_send_count = 0;
  selfptr->m_pending_send_mask.clear();

  selfptr->m_instance_count = 0;
  for (i = 0; i < MAX_INSTANCES_PER_THREAD; i++)
    selfptr->m_instance_list[i] = 0;

  bzero(&selfptr->m_send_buffers, sizeof(selfptr->m_send_buffers));

  selfptr->m_thread = 0;
  selfptr->m_cpu = NO_LOCK_CPU;
}
/* Must run after init of all m_in_queues is done. */
static void
thr_init2(struct thr_repository* rep, struct thr_data *selfptr,
          unsigned int cnt, unsigned thr_no)
{
  for (Uint32 i = 0; i<cnt; i++)
  {
    selfptr->m_write_states[i].m_write_index = 0;
    selfptr->m_write_states[i].m_write_pos = 0;
    selfptr->m_write_states[i].m_write_buffer =
      rep->m_thread[i].m_in_queue[thr_no].m_buffers[0];
    selfptr->m_write_states[i].m_pending_signals = 0;
    selfptr->m_write_states[i].m_pending_signals_wakeup = 0;
  }
}
static void
send_buffer_init(Uint32 node, thr_repository::send_buffer * sb)
{
  char buf[100];
  BaseString::snprintf(buf, sizeof(buf), "send lock node %d", node);
  register_lock(&sb->m_send_lock, buf);
  sb->m_force_send = 0;
  sb->m_send_thread = NO_SEND_THREAD;
  sb->m_bytes = 0;
  bzero(&sb->m_buffer, sizeof(sb->m_buffer));
  bzero(sb->m_read_index, sizeof(sb->m_read_index));
}

static void
rep_init(struct thr_repository* rep, unsigned int cnt, Ndbd_mem_manager *mm)
{
  rep->m_mm = mm;

  rep->m_thread_count = cnt;
  for (unsigned int i = 0; i<cnt; i++)
  {
    thr_init(rep, rep->m_thread + i, cnt, i);
  }
  for (unsigned int i = 0; i<cnt; i++)
  {
    thr_init2(rep, rep->m_thread + i, cnt, i);
  }

  rep->stopped_threads = 0;
  NdbMutex_Init(&rep->stop_for_crash_mutex);
  NdbCondition_Init(&rep->stop_for_crash_cond);

  for (int i = 0 ; i < MAX_NTRANSPORTERS; i++)
  {
    send_buffer_init(i, rep->m_send_buffers+i);
  }

  bzero(rep->m_thread_send_buffers, sizeof(rep->m_thread_send_buffers));
}
#include "ThreadConfig.hpp"
#include <signaldata/StartOrd.hpp>

Uint32
compute_jb_pages(struct EmulatorData * ed)
{
  Uint32 cnt = NUM_MAIN_THREADS + globalData.ndbMtLqhThreads + 1;

  Uint32 perthread = 0;

  /* Each thread can have thr_job_queue::SIZE pages queued to each other
     thread... */
  perthread += cnt * (1 + thr_job_queue::SIZE);

  /* ...plus one prio A queue of the same size... */
  perthread += (1 + thr_job_queue::SIZE);

  /* ...plus up to THR_FREE_BUF_MAX pages in its local free list. */
  perthread += THR_FREE_BUF_MAX;

  Uint32 tot = cnt * perthread;

  return tot;
}
ThreadConfig::ThreadConfig()
{
}

ThreadConfig::~ThreadConfig()
{
}

/*
 * The initialization is done here rather than in the constructor, since
 * the global memory manager is not available at constructor time.
 */
void
ThreadConfig::init()
{
  num_lqh_workers = globalData.ndbMtLqhWorkers;
  num_lqh_threads = globalData.ndbMtLqhThreads;
  num_threads = NUM_MAIN_THREADS + num_lqh_threads + 1;
  require(num_threads <= MAX_THREADS);
  receiver_thread_no = num_threads - 1;

  ndbout << "NDBMT: num_threads=" << num_threads << endl;

  ::rep_init(&g_thr_repository, num_threads,
             globalEmulatorData.m_mem_manager);
}
static void
setcpuaffinity(struct thr_repository* rep)
{
  THRConfigApplier & conf = globalEmulatorData.theConfiguration->m_thr_config;
  conf.create_cpusets();
  if (conf.getInfoMessage())
  {
    printf("%s", conf.getInfoMessage());
    fflush(stdout);
  }
}
void
ThreadConfig::ipControlLoop(NdbThread* pThis, Uint32 thread_index)
{
  unsigned int thr_no;
  struct thr_repository* rep = &g_thr_repository;

  /* Assign threads to CPUs. */
  setcpuaffinity(rep);

  /*
   * Start all execution threads, except for the receiver thread, which
   * runs in the main thread.
   */
  for (thr_no = 0; thr_no < num_threads; thr_no++)
  {
    rep->m_thread[thr_no].m_time = NdbTick_CurrentMillisecond();

    if (thr_no == receiver_thread_no)
      continue;                 // Will run in the main thread.

    rep->m_thread[thr_no].m_thread =
      NdbThread_Create(mt_job_thread_main,
                       (void **)(rep->m_thread + thr_no),
                       1024*1024,
                       "execute thread",
                       NDB_THREAD_PRIO_MEAN);
    require(rep->m_thread[thr_no].m_thread != NULL);
  }

  /* Now run the receiver loop directly in this thread. */
  rep->m_thread[receiver_thread_no].m_thread = pThis;
  mt_receiver_thread_main(&(rep->m_thread[receiver_thread_no]));

  /* Wait for all threads to shut down. */
  for (thr_no = 0; thr_no < num_threads; thr_no++)
  {
    if (thr_no == receiver_thread_no)
      continue;
    void *dummy_return_status;
    NdbThread_WaitFor(rep->m_thread[thr_no].m_thread, &dummy_return_status);
    NdbThread_Destroy(&(rep->m_thread[thr_no].m_thread));
  }
}

int
ThreadConfig::doStart(NodeState::StartLevel startLevel)
{
  SignalT<3> signalT;
  memset(&signalT.header, 0, sizeof(SignalHeader));

  signalT.header.theVerId_signalNumber   = GSN_START_ORD;
  signalT.header.theReceiversBlockNumber = CMVMI;
  signalT.header.theSendersBlockRef      = 0;
  signalT.header.theTrace                = 0;
  signalT.header.theSignalId             = 0;
  signalT.header.theLength               = StartOrd::SignalLength;

  StartOrd * startOrd = CAST_PTR(StartOrd, &signalT.theData[0]);
  startOrd->restartInfo = 0;

  sendprioa(block2ThreadId(CMVMI, 0), &signalT.header, signalT.theData, 0);
  return 0;
}
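/*
 * wrap_compare() below orders signal ids in a circular 32-bit id space:
 * for example wrap_compare(0xFFFFFFF0, 0x10) < 0, since 0x10 is "newer"
 * than 0xFFFFFFF0 modulo 2^32.
 */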
static int
wrap_compare(Uint32 a, Uint32 b)
{
  /* Avoid dependencies on undefined C/C++ integer overflow semantics. */
  if (a >= 0x80000000)
    if (b >= 0x80000000)
      return (int)(a & 0x7fffffff) - (int)(b & 0x7fffffff);
    else
      return (a - b) >= 0x80000000 ? -1 : 1;
  else
    if (b >= 0x80000000)
      return (b - a) >= 0x80000000 ? 1 : -1;
    else
      return (int)a - (int)b;
}
Uint32
FastScheduler::traceDumpGetNumThreads()
{
  return num_threads;
}

bool
FastScheduler::traceDumpGetJam(Uint32 thr_no, Uint32 & jamBlockNumber,
                               const Uint32 * & thrdTheEmulatedJam,
                               Uint32 & thrdTheEmulatedJamIndex)
{
  if (thr_no >= num_threads)
    return false;

#ifdef NO_EMULATED_JAM
  jamBlockNumber = 0;
  thrdTheEmulatedJam = NULL;
  thrdTheEmulatedJamIndex = 0;
#else
  const EmulatedJamBuffer *jamBuffer = &g_thr_repository.m_thread[thr_no].m_jam;
  thrdTheEmulatedJam = jamBuffer->theEmulatedJam;
  thrdTheEmulatedJamIndex = jamBuffer->theEmulatedJamIndex;
  jamBlockNumber = jamBuffer->theEmulatedJamBlockNumber;
#endif
  return true;
}
void
FastScheduler::traceDumpPrepare(NdbShutdownType& nst)
{
  /*
   * We are about to generate trace files for all threads, so we want to
   * stop all signal processing first; otherwise the signal buffers could
   * change while dumping, giving inconsistent results.  Each thread is
   * sent a prio A STOP_FOR_CRASH signal, and we wait (bounded) for them
   * to report back, taking care not to send to ourself.
   */
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
  /* selfptr might be NULL, or point to the thread that crashed. */

  Uint32 waitFor_count = 0;
  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
  g_thr_repository.stopped_threads = 0;

  for (Uint32 thr_no = 0; thr_no < num_threads; thr_no++)
  {
    if (selfptr != NULL && selfptr->m_thr_no == thr_no)
    {
      /* This is our own thread; it has already stopped processing. */
      continue;
    }

    sendprioa_STOP_FOR_CRASH(selfptr, thr_no);

    waitFor_count++;
  }

  static const Uint32 max_wait_seconds = 2;
  NDB_TICKS start = NdbTick_CurrentMillisecond();
  while (g_thr_repository.stopped_threads < waitFor_count)
  {
    NdbCondition_WaitTimeout(&g_thr_repository.stop_for_crash_cond,
                             &g_thr_repository.stop_for_crash_mutex,
                             10);
    NDB_TICKS now = NdbTick_CurrentMillisecond();
    if (now > start + max_wait_seconds * 1000)
      break;                    // Give up.
  }
  if (g_thr_repository.stopped_threads < waitFor_count)
  {
    if (nst != NST_ErrorInsert)
    {
      nst = NST_Watchdog; // Make this abort fast.
    }
    ndbout_c("Warning: %d thread(s) did not stop before starting crash dump.",
             waitFor_count - g_thr_repository.stopped_threads);
  }
  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);
}
void mt_execSTOP_FOR_CRASH()
{
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);
  require(selfptr != NULL);

  NdbMutex_Lock(&g_thr_repository.stop_for_crash_mutex);
  g_thr_repository.stopped_threads++;
  NdbCondition_Signal(&g_thr_repository.stop_for_crash_cond);
  NdbMutex_Unlock(&g_thr_repository.stop_for_crash_mutex);

  globalEmulatorData.theWatchDog->unregisterWatchedThread(selfptr->m_thr_no);

  pthread_exit(NULL);
}
void
FastScheduler::dumpSignalMemory(Uint32 thr_no, FILE* out)
{
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  thr_data *selfptr = reinterpret_cast<thr_data *>(value);
  const thr_repository *rep = &g_thr_repository;
  /*
   * selfptr might be NULL, or point to the thread doing the crash jump.
   * If non-null, update the watchdog counter while dumping.
   */
  Uint32 *watchDogCounter;
  if (selfptr)
    watchDogCounter = &selfptr->m_watchdog_counter;
  else
    watchDogCounter = NULL;

  /*
   * We want to dump signals from last executed to first executed, so we
   * first compute the correct output sequence, stored in signalSequence[].
   */
  static const Uint32 MAX_SIGNALS_TO_DUMP = 4096;
  struct {
    const SignalHeader *ptr;
    bool prioa;
  } signalSequence[MAX_SIGNALS_TO_DUMP];
  Uint32 seq_start = 0;
  Uint32 seq_end = 0;

  const thr_data *thr_ptr = &rep->m_thread[thr_no];
  if (watchDogCounter)
    *watchDogCounter = 4;

  /*
   * Collect the job buffers that may hold signal history: the released
   * buffers in the free fifo, plus the executed parts of the active
   * prio B and prio A buffers.
   */
  Uint32 num_jbs = 0;
  struct {
    const thr_job_buffer *m_jb;
    Uint32 m_pos;
    Uint32 m_max;
  } jbs[THR_FREE_BUF_MAX + MAX_THREADS + 1];

  /* Load released buffers. */
  Uint32 idx = thr_ptr->m_first_free;
  while (idx != thr_ptr->m_first_unused)
  {
    const thr_job_buffer *q = thr_ptr->m_free_fifo[idx];
    if (q->m_len > 0)
    {
      jbs[num_jbs].m_jb = q;
      jbs[num_jbs].m_pos = 0;
      jbs[num_jbs].m_max = q->m_len;
      num_jbs++;
    }
    idx = (idx + 1) % THR_FREE_BUF_MAX;
  }
  /* Load any active prio B buffers. */
  for (Uint32 thr_no = 0; thr_no < rep->m_thread_count; thr_no++)
  {
    const thr_job_queue *q = thr_ptr->m_in_queue + thr_no;
    const thr_jb_read_state *r = thr_ptr->m_read_states + thr_no;
    Uint32 read_pos = r->m_read_pos;
    if (read_pos > 0)
    {
      jbs[num_jbs].m_jb = q->m_buffers[r->m_read_index];
      jbs[num_jbs].m_pos = 0;
      jbs[num_jbs].m_max = read_pos;
      num_jbs++;
    }
  }
  /* Load any active prio A buffer. */
  {
    const thr_jb_read_state *r = &thr_ptr->m_jba_read_state;
    Uint32 read_pos = r->m_read_pos;
    if (read_pos > 0)
    {
      jbs[num_jbs].m_jb = thr_ptr->m_jba.m_buffers[r->m_read_index];
      jbs[num_jbs].m_pos = 0;
      jbs[num_jbs].m_max = read_pos;
      num_jbs++;
    }
  }

  /* Merge the signals from all buffers into one list, ordered by id. */
  while (num_jbs > 0)
  {
    if (watchDogCounter)
      *watchDogCounter = 4;

    /* Find the job buffer with the smallest signal id. */
    Uint32 idx_min = 0;
    const Uint32 *p = jbs[idx_min].m_jb->m_data + jbs[idx_min].m_pos;
    const SignalHeader *s_min = reinterpret_cast<const SignalHeader*>(p);
    Uint32 sid_min = s_min->theSignalId;

    for (Uint32 i = 1; i < num_jbs; i++)
    {
      p = jbs[i].m_jb->m_data + jbs[i].m_pos;
      const SignalHeader *s = reinterpret_cast<const SignalHeader*>(p);
      Uint32 sid = s->theSignalId;
      if (wrap_compare(sid, sid_min) < 0)
      {
        idx_min = i;
        s_min = s;
        sid_min = sid;
      }
    }

    /* Found the next signal; put it in the ordered cyclic buffer. */
    signalSequence[seq_end].ptr = s_min;
    signalSequence[seq_end].prioa = jbs[idx_min].m_jb->m_prioa;
    Uint32 siglen =
      (sizeof(SignalHeader)>>2) + s_min->m_noOfSections + s_min->theLength;
#if SIZEOF_CHARP == 8
    /* Align to 8-byte boundary, to ensure aligned copies. */
    siglen= (siglen+1) & ~((Uint32)1);
#endif
    jbs[idx_min].m_pos += siglen;
    if (jbs[idx_min].m_pos >= jbs[idx_min].m_max)
    {
      /* We are done with this job buffer. */
      num_jbs--;
      jbs[idx_min] = jbs[num_jbs];
    }
    seq_end = (seq_end + 1) % MAX_SIGNALS_TO_DUMP;
    /* Drop old signals if too many in history. */
    if (seq_end == seq_start)
      seq_start = (seq_start + 1) % MAX_SIGNALS_TO_DUMP;
  }

  /* Now print the signals in order. */
  fprintf(out, "\n");
  bool first_one = true;
  bool out_of_signals = false;
  Uint32 lastSignalId = 0;
  while (seq_end != seq_start)
  {
    if (watchDogCounter)
      *watchDogCounter = 4;

    if (seq_end == 0)
      seq_end = MAX_SIGNALS_TO_DUMP;
    seq_end--;
    SignalT<25> signalT;
    Signal &signal = * new (&signalT) Signal(0);
    const SignalHeader *s = signalSequence[seq_end].ptr;
    unsigned siglen = (sizeof(*s)>>2) + s->theLength;
    if (siglen > 25)
      siglen = 25;              // Sanity check.
    memcpy(&signal.header, s, 4*siglen);
    /* The instance number in a trace file is confusing if not MT LQH. */
    if (num_lqh_workers == 0)
      signal.header.theReceiversBlockNumber &= NDBMT_BLOCK_MASK;

    const Uint32 *posptr = reinterpret_cast<const Uint32 *>(s);
    signal.m_sectionPtrI[0] = posptr[siglen + 0];
    signal.m_sectionPtrI[1] = posptr[siglen + 1];
    signal.m_sectionPtrI[2] = posptr[siglen + 2];
    bool prioa = signalSequence[seq_end].prioa;

    /* Make sure to display clearly when there is a gap in the dump. */
    if (!first_one && !out_of_signals && (s->theSignalId + 1) != lastSignalId)
    {
      out_of_signals = true;
      fprintf(out, "\n\n\nNo more prio %s signals, rest of dump will be "
              "incomplete.\n\n\n\n", prioa ? "B" : "A");
    }
    first_one = false;
    lastSignalId = s->theSignalId;

    fprintf(out, "--------------- Signal ----------------\n");
    Uint32 prio = (prioa ? JBA : JBB);
    SignalLoggerManager::printSignalHeader(out, signal.header, prio,
                                           globalData.ownId, true);
    SignalLoggerManager::printSignalData(out, signal.header,
                                         &signal.theData[0]);
  }
  fflush(out);
}
int
FastScheduler::traceDumpGetCurrentThread()
{
  void *value= NdbThread_GetTlsKey(NDB_THREAD_TLS_THREAD);
  const thr_data *selfptr = reinterpret_cast<const thr_data *>(value);

  /* selfptr might be NULL, or point to the thread that crashed. */
  if (selfptr == 0)
  {
    return -1;
  }
  else
  {
    return (int)selfptr->m_thr_no;
  }
}
void
mt_section_lock()
{
  lock(&(g_thr_repository.m_section_lock));
}

void
mt_section_unlock()
{
  unlock(&(g_thr_repository.m_section_lock));
}

void
mt_mem_manager_init()
{
}

void
mt_mem_manager_lock()
{
  lock(&(g_thr_repository.m_mem_manager_lock));
}

void
mt_mem_manager_unlock()
{
  unlock(&(g_thr_repository.m_mem_manager_lock));
}
static Vector<mt_lock_stat> g_locks;

static void
register_lock(const void * ptr, const char * name)
{
  if (name == 0)
    return;

  mt_lock_stat* arr = g_locks.getBase();
  for (size_t i = 0; i<g_locks.size(); i++)
  {
    if (arr[i].m_ptr == ptr)
    {
      if (arr[i].m_name)
      {
        free(arr[i].m_name);
      }
      arr[i].m_name = strdup(name);
      return;
    }
  }

  mt_lock_stat ln;
  ln.m_ptr = ptr;
  ln.m_name = strdup(name);
  ln.m_contended_count = 0;
  ln.m_spin_count = 0;
  g_locks.push_back(ln);
}
static mt_lock_stat *
lookup_lock(const void * ptr)
{
  mt_lock_stat* arr = g_locks.getBase();
  for (size_t i = 0; i<g_locks.size(); i++)
  {
    if (arr[i].m_ptr == ptr)
      return arr + i;
  }

  return 0;
}
Uint32
mt_get_thread_references_for_blocks(const Uint32 blocks[], Uint32 threadId,
                                    Uint32 dst[], Uint32 len)
{
  Uint32 cnt = 0;
  Bitmask<(MAX_THREADS+31)/32> mask;
  mask.set(threadId);
  for (Uint32 i = 0; blocks[i] != 0; i++)
  {
    Uint32 block = blocks[i];
    /* Find each thread that has an instance of this block. */
    assert(block == blockToMain(block));
    Uint32 index = block - MIN_BLOCK_NO;
    for (Uint32 instance = 0; instance < MAX_BLOCK_INSTANCES; instance++)
    {
      Uint32 thr_no = thr_map[index][instance].thr_no;
      if (thr_no == thr_map_entry::NULL_THR_NO)
        break;

      if (mask.get(thr_no))
        continue;

      mask.set(thr_no);
      require(cnt < len);
      dst[cnt++] = numberToRef(block, instance, 0);
    }
  }
  return cnt;
}
void
mt_wakeup(class SimulatedBlock* block)
{
  Uint32 thr_no = block->getThreadId();
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;
  wakeup(&thrptr->m_waiter);
}
#ifdef VM_TRACE
void
mt_assert_own_thread(SimulatedBlock* block)
{
  Uint32 thr_no = block->getThreadId();
  thr_data *thrptr = g_thr_repository.m_thread + thr_no;

  if (unlikely(pthread_equal(thrptr->m_thr_id, pthread_self()) == 0))
  {
    fprintf(stderr, "mt_assert_own_thread() - assertion-failure\n");
    fflush(stderr);
    abort();
  }
}
#endif