18 #include "mt_thr_config.hpp"
19 #include <kernel/ndb_limits.h>
20 #include "../../common/util/parse_mask.hpp"
22 #ifndef TEST_MT_THR_CONFIG
23 #define SUPPORT_CPU_SET 0
25 #define SUPPORT_CPU_SET 1
31 {
"main", THRConfig::T_MAIN, 1, 1 },
32 {
"ldm", THRConfig::T_LDM, 1, MAX_NDBMT_LQH_THREADS },
33 {
"recv", THRConfig::T_RECV, 1, 1 },
34 {
"rep", THRConfig::T_REP, 1, 1 },
35 {
"io", THRConfig::T_IO, 1, 1 }
40 {
"count", THRConfig::Param::S_UNSIGNED },
41 {
"cpubind", THRConfig::Param::S_BITMASK },
42 {
"cpuset", THRConfig::Param::S_BITMASK }
51 getMaxEntries(Uint32
type)
53 for (Uint32
i = 0;
i<NDB_ARRAY_SIZE(m_entries);
i++)
55 if (m_entries[
i].m_type == type)
56 return m_entries[
i].m_max_cnt;
63 getEntryName(Uint32 type)
65 for (Uint32
i = 0;
i<NDB_ARRAY_SIZE(m_entries);
i++)
67 if (m_entries[
i].m_type == type)
68 return m_entries[
i].m_name;
75 getEntryType(
const char * type)
77 for (Uint32
i = 0;
i<NDB_ARRAY_SIZE(m_entries);
i++)
79 if (strcasecmp(type, m_entries[
i].m_name) == 0)
83 return THRConfig::T_END;
86 THRConfig::THRConfig()
91 THRConfig::~THRConfig()
96 THRConfig::setLockExecuteThreadToCPU(
const char * mask)
98 int res = parse_mask(mask, m_LockExecuteThreadToCPU);
101 m_err_msg.
assfmt(
"failed to parse 'LockExecuteThreadToCPU=%s' "
110 THRConfig::setLockIoThreadsToCPU(
unsigned val)
112 m_LockIoThreadsToCPU.set(val);
117 THRConfig::add(T_Type t)
121 tmp.m_bind_type = T_Thread::B_UNBOUND;
122 tmp.m_no = m_threads[t].size();
123 m_threads[t].push_back(tmp);
127 THRConfig::do_parse(
unsigned MaxNoOfExecutionThreads,
128 unsigned __ndbmt_lqh_threads,
129 unsigned __ndbmt_classic)
143 Uint32 lqhthreads = 0;
144 switch(MaxNoOfExecutionThreads){
160 if (__ndbmt_lqh_threads)
162 lqhthreads = __ndbmt_lqh_threads;
169 for(Uint32
i = 0;
i < lqhthreads;
i++)
180 if (m_LockIoThreadsToCPU.count() == 1)
182 m_threads[T_IO][0].m_bind_type = T_Thread::B_CPU_BOUND;
183 m_threads[T_IO][0].m_bind_no = m_LockIoThreadsToCPU.getBitNo(0);
185 else if (m_LockIoThreadsToCPU.count() > 1)
187 unsigned no = createCpuSet(m_LockIoThreadsToCPU);
188 m_threads[T_IO][0].m_bind_type = T_Thread::B_CPUSET_BOUND;
189 m_threads[T_IO][0].m_bind_no = no;
195 for (
unsigned i = 0;
i<m_cpu_sets.size();
i++)
197 for (
unsigned j =
i + 1; j < m_cpu_sets.size(); j++)
199 if (m_cpu_sets[
i].overlaps(m_cpu_sets[j]))
201 m_err_msg.
assfmt(
"Overlapping cpuset's [ %s ] and [ %s ]",
202 m_cpu_sets[
i].str().c_str(),
203 m_cpu_sets[j].str().c_str());
212 for (
unsigned i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
214 for (
unsigned j = 0; j < m_threads[
i].size(); j++)
216 if (m_threads[
i][j].m_bind_type == T_Thread::B_CPU_BOUND)
218 unsigned cpu = m_threads[
i][j].m_bind_no;
219 for (
unsigned k = 0; k<m_cpu_sets.size(); k++)
221 if (m_cpu_sets[k].
get(cpu))
223 m_err_msg.
assfmt(
"Overlapping cpubind %u with cpuset [ %s ]",
225 m_cpu_sets[k].str().c_str());
237 for (
unsigned i = 0;
i<m_cpu_sets.size();
i++)
239 for (
unsigned j = 0; j < m_cpu_sets[
i].count(); j++)
241 m_LockExecuteThreadToCPU.clear(m_cpu_sets[
i].getBitNo(j));
245 unsigned cnt_unbound = 0;
246 for (
unsigned i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
248 for (
unsigned j = 0; j < m_threads[
i].size(); j++)
250 if (m_threads[
i][j].m_bind_type == T_Thread::B_CPU_BOUND)
252 unsigned cpu = m_threads[
i][j].m_bind_no;
253 m_LockExecuteThreadToCPU.clear(cpu);
255 else if (m_threads[
i][j].m_bind_type == T_Thread::B_UNBOUND)
262 if (m_threads[T_IO][0].m_bind_type == T_Thread::B_UNBOUND)
270 if (m_LockExecuteThreadToCPU.count())
276 unsigned cnt = mask.count();
277 unsigned num_threads = cnt_unbound;
278 bool isMtLqh = !m_classic;
280 if (cnt < num_threads)
282 m_info_msg.
assfmt(
"WARNING: Too few CPU's specified with "
283 "LockExecuteThreadToCPU. Only %u specified "
284 " but %u was needed, this may cause contention.\n",
288 if (cnt >= num_threads)
290 m_info_msg.
appfmt(
"Assigning each thread its own CPU\n");
292 for (
unsigned i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
296 for (
unsigned j = 0; j < m_threads[
i].size(); j++)
298 if (m_threads[
i][j].m_bind_type == T_Thread::B_UNBOUND)
300 m_threads[
i][j].m_bind_type = T_Thread::B_CPU_BOUND;
301 m_threads[
i][j].m_bind_no = mask.getBitNo(no);
309 unsigned cpu = mask.getBitNo(0);
310 m_info_msg.
appfmt(
"Assigning all threads to CPU %u\n", cpu);
311 for (
unsigned i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
315 bind_unbound(m_threads[
i], cpu);
320 unsigned unbound_ldm = count_unbound(m_threads[T_LDM]);
321 if (cnt > unbound_ldm)
326 m_info_msg.
append(
"Assigning LQH threads to dedicated CPU(s) and "
327 "other threads will share remaining\n");
328 unsigned cpu = mask.find(0);
329 for (
unsigned i = 0;
i < m_threads[T_LDM].size();
i++)
331 if (m_threads[T_LDM][
i].m_bind_type == T_Thread::B_UNBOUND)
333 m_threads[T_LDM][
i].m_bind_type = T_Thread::B_CPU_BOUND;
334 m_threads[T_LDM][
i].m_bind_no = cpu;
336 cpu = mask.find(cpu + 1);
341 bind_unbound(m_threads[T_MAIN], cpu);
342 bind_unbound(m_threads[T_REP], cpu);
343 if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
347 bind_unbound(m_threads[T_RECV], cpu);
353 unsigned cpu = mask.find(0);
354 m_info_msg.
appfmt(
"Assigning LQH threads round robin to CPU(s) and "
355 "other threads will share CPU %u\n", cpu);
356 bind_unbound(m_threads[T_MAIN], cpu);
357 bind_unbound(m_threads[T_REP], cpu);
358 bind_unbound(m_threads[T_RECV], cpu);
362 for (
unsigned i = 0;
i < m_threads[T_LDM].size();
i++)
364 if (m_threads[T_LDM][
i].m_bind_type == T_Thread::B_UNBOUND)
366 m_threads[T_LDM][
i].m_bind_type = T_Thread::B_CPU_BOUND;
367 m_threads[T_LDM][
i].m_bind_no = cpu;
368 if ((cpu = mask.find(cpu + 1)) == mask.NotFound)
378 unsigned cpu = mask.find(0);
379 m_info_msg.
appfmt(
"Assigning LQH thread to CPU %u and "
380 "other threads will share\n", cpu);
381 bind_unbound(m_threads[T_LDM], cpu);
382 cpu = mask.find(cpu + 1);
383 bind_unbound(m_threads[T_MAIN], cpu);
384 bind_unbound(m_threads[T_RECV], cpu);
395 for (
unsigned i = 0;
i < vec.size();
i++)
397 if (vec[
i].m_bind_type == T_Thread::B_UNBOUND)
406 for (
unsigned i = 0;
i < vec.size();
i++)
408 if (vec[
i].m_bind_type == T_Thread::B_UNBOUND)
410 vec[
i].m_bind_type = T_Thread::B_CPU_BOUND;
411 vec[
i].m_bind_no = cpu;
422 for (
unsigned i = 0;
i< NDB_ARRAY_SIZE(m_threads);
i++)
424 if (m_threads[
i].
size() > getMaxEntries(
i))
426 m_err_msg.
assfmt(
"Too many instances(%u) of %s max supported: %u",
437 if (m_threads[T_LDM].
size() == 3)
439 m_err_msg.
assfmt(
"No of LDM-instances can be 1,2,4. Specified: %u",
440 m_threads[T_LDM].
size());
448 THRConfig::getConfigString()
450 m_cfg_string.
clear();
451 const char * sep =
"";
452 for (
unsigned i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
454 if (m_threads[
i].
size())
456 const char *
name = getEntryName(
i);
459 for (
unsigned j = 0; j < m_threads[
i].size(); j++)
463 m_cfg_string.
append(name);
464 if (m_threads[
i][j].m_bind_type != T_Thread::B_UNBOUND)
466 m_cfg_string.
append(
"={");
467 if (m_threads[
i][j].m_bind_type == T_Thread::B_CPU_BOUND)
469 m_cfg_string.
appfmt(
"cpubind=%u", m_threads[
i][j].m_bind_no);
471 else if (m_threads[
i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
473 m_cfg_string.
appfmt(
"cpuset=%s",
474 m_cpu_sets[m_threads[
i][j].m_bind_no].str().c_str());
482 for (
unsigned j = 0; j < m_threads[
i].size(); j++)
484 if (m_threads[
i][j].m_bind_type != T_Thread::B_UNBOUND)
488 m_cfg_string.
append(name);
489 m_cfg_string.
append(
"={");
490 if (m_threads[
i][j].m_bind_type == T_Thread::B_CPU_BOUND)
492 m_cfg_string.
appfmt(
"cpubind=%u", m_threads[
i][j].m_bind_no);
494 else if (m_threads[
i][j].m_bind_type == T_Thread::B_CPUSET_BOUND)
496 m_cfg_string.
appfmt(
"cpuset=%s",
497 m_cpu_sets[m_threads[
i][j].m_bind_no].str().c_str());
505 return m_cfg_string.
c_str();
509 THRConfig::getThreadCount()
const
513 for (Uint32
i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
517 cnt += m_threads[
i].size();
524 THRConfig::getThreadCount(T_Type type)
const
526 for (Uint32
i = 0;
i < NDB_ARRAY_SIZE(m_threads);
i++)
528 if (
i == (Uint32)
type)
530 return m_threads[
i].size();
537 THRConfig::getErrorMessage()
const
539 if (m_err_msg.
empty())
541 return m_err_msg.
c_str();
545 THRConfig::getInfoMessage()
const
547 if (m_info_msg.
empty())
549 return m_info_msg.
c_str();
554 skipblank(
char * str)
556 while (isspace(* str))
562 THRConfig::find_type(
char *& str)
564 str = skipblank(str);
569 m_err_msg.
assfmt(
"empty thread specification");
573 while(isalpha(* end))
578 Uint32 t = getEntryType(name);
581 m_err_msg.
assfmt(
"unknown thread type '%s'", name);
592 const char * string_val;
593 unsigned unsigned_val;
599 parseUnsigned(
char *& str,
unsigned * dst)
601 str = skipblank(str);
604 long val = strtol(str, &endptr, 0);
607 if (val < 0 || Int64(val) > 0xFFFFFFFF)
612 *dst = (unsigned)val;
620 str = skipblank(str);
621 size_t len = strspn(str,
"0123456789-, ");
625 while (isspace(str[len-1]))
627 if (str[len-1] ==
',')
629 char save = str[len];
631 int res = parse_mask(str, *mask);
641 const char *
const save = str;
644 str = skipblank(str);
647 for (; idx < NDB_ARRAY_SIZE(m_params); idx++)
650 #if ! SUPPORT_CPU_SET
651 if (idx == IX_CPUSET)
655 if (strncasecmp(str, m_params[idx].name, strlen(m_params[idx].name)) == 0)
657 str += strlen(m_params[idx].name);
662 if (idx == NDB_ARRAY_SIZE(m_params))
664 err.
assfmt(
"Unknown param near: '%s'", str);
668 if (values[idx].found ==
true)
670 err.
assfmt(
"Param '%s' found twice", m_params[idx].name);
674 str = skipblank(str);
677 err.
assfmt(
"Missing '=' after %s in '%s'", m_params[idx].name, save);
681 str = skipblank(str);
684 switch(m_params[idx].type){
685 case THRConfig::Param::S_UNSIGNED:
686 res = parseUnsigned(str, &values[idx].unsigned_val);
688 case THRConfig::Param::S_BITMASK:
689 res = parseBitmask(str, &values[idx].mask_val);
692 err.
assfmt(
"Internal error, unknown type for param: '%s'",
698 err.
assfmt(
"Unable to parse %s=%s", m_params[idx].name, str);
701 values[idx].found =
true;
702 str = skipblank(str);
709 err.
assfmt(
"Unable to parse near '%s'", str);
720 str = skipblank(str);
732 int len = (int)strlen(str);
733 m_err_msg.
assfmt(
"Invalid format near: '%.*s'",
734 (len > 10) ? 10 : len, str);
739 str = skipblank(str);
752 while (* str && (* str) !=
'}')
765 values[IX_COUNT].unsigned_val = 1;
766 int res = parseParams(start, values, m_err_msg);
774 if (values[IX_CPUBOUND].found && values[IX_CPUSET].found)
776 m_err_msg.
assfmt(
"Both cpuset and cpubind specified!");
780 unsigned cnt = values[IX_COUNT].unsigned_val;
781 const int index = m_threads[
type].size();
782 for (
unsigned i = 0;
i < cnt;
i++)
787 assert(m_threads[type].
size() == index + cnt);
788 if (values[IX_CPUSET].found)
791 unsigned no = createCpuSet(mask);
792 for (
unsigned i = 0;
i < cnt;
i++)
794 m_threads[
type][index+
i].m_bind_type = T_Thread::B_CPUSET_BOUND;
795 m_threads[
type][index+
i].m_bind_no = no;
798 else if (values[IX_CPUBOUND].found)
801 if (mask.count() < cnt)
803 m_err_msg.
assfmt(
"%s: trying to bind %u threads to %u cpus [%s]",
810 for (
unsigned i = 0;
i < cnt;
i++)
812 m_threads[
type][index+
i].m_bind_type = T_Thread::B_CPU_BOUND;
813 m_threads[
type][index+
i].m_bind_no = mask.getBitNo(
i % mask.count());
822 THRConfig::find_next(
char *& str)
824 str = skipblank(str);
830 else if (* str ==
',')
836 int len = (int)strlen(str);
837 m_err_msg.
assfmt(
"Invalid format near: '%.*s'",
838 (len > 10) ? 10 : len, str);
846 char * ptr = (
char*)str.c_str();
849 Uint32 type = find_type(ptr);
856 int ret = find_next(ptr);
864 for (Uint32
i = 0;
i < T_END;
i++)
866 while (m_threads[
i].
size() < m_entries[
i].m_min_cnt)
876 for (
unsigned i = 0;
i < m_cpu_sets.size();
i++)
877 if (m_cpu_sets[
i].equal(mask))
880 m_cpu_sets.push_back(mask);
881 return m_cpu_sets.size() - 1;
887 #ifndef TEST_MT_THR_CONFIG
888 #include <BlockNumbers.h>
889 #include <NdbThread.h>
893 findBlock(Uint32 blockNo,
const unsigned short list[],
unsigned cnt)
895 for (Uint32
i = 0;
i < cnt;
i++)
897 if (blockToMain(list[
i]) == blockNo)
898 return blockToInstance(list[i]);
904 THRConfigApplier::find_thread(
const unsigned short instancelist[],
unsigned cnt)
const
907 if ((instanceNo = findBlock(SUMA, instancelist, cnt)) >= 0)
909 return &m_threads[T_REP][instanceNo];
911 else if ((instanceNo = findBlock(CMVMI, instancelist, cnt)) >= 0)
913 return &m_threads[T_RECV][instanceNo];
915 else if ((instanceNo = findBlock(DBDIH, instancelist, cnt)) >= 0)
917 return &m_threads[T_MAIN][instanceNo];
919 else if ((instanceNo = findBlock(DBLQH, instancelist, cnt)) >= 0)
921 return &m_threads[T_LDM][instanceNo - 1];
928 const unsigned short list[],
unsigned cnt)
const
930 const T_Thread*
thr = find_thread(list, cnt);
932 str.
appfmt(
"(%s) ", getEntryName(thr->m_type));
933 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
935 str.
appfmt(
"cpu: %u ", thr->m_bind_no);
937 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
939 str.
appfmt(
"cpuset: [ %s ] ", m_cpu_sets[thr->m_bind_no].str().c_str());
944 THRConfigApplier::create_cpusets()
950 THRConfigApplier::do_bind(
NdbThread* thread,
951 const unsigned short list[],
unsigned cnt)
953 const T_Thread* thr = find_thread(list, cnt);
954 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
956 int res = NdbThread_LockCPU(thread, thr->m_bind_no);
963 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
972 THRConfigApplier::do_bind_io(
NdbThread* thread)
974 const T_Thread* thr = &m_threads[T_IO][0];
975 if (thr->m_bind_type == T_Thread::B_CPU_BOUND)
977 int res = NdbThread_LockCPU(thread, thr->m_bind_no);
984 else if (thr->m_bind_type == T_Thread::B_CPUSET_BOUND)
993 #ifdef TEST_MT_THR_CONFIG
995 #include <NdbTap.hpp>
997 TAPTEST(mt_thr_config)
1001 OK(tmp.do_parse(8, 0, 0) == 0);
1011 "ldm={count=3},ldm",
1012 "ldm={cpubind=1-2,5,count=3},ldm",
1013 "ldm={ cpubind = 1- 2, 5 , count = 3 },ldm",
1014 "ldm={count=3,cpubind=1-2,5 }, ldm",
1015 "ldm={cpuset=1-3,count=3 },ldm",
1020 const char * fail [] =
1023 "ldm={cpubind= 1 , cpuset=2 },ldm",
1024 "ldm={count=4,cpubind=1-3},ldm",
1025 "main,main,ldm,ldm",
1026 "main={ keso=88, count=23},ldm,ldm",
1027 "main={ cpuset=1-3 }, ldm={cpuset=3-4}",
1028 "main={ cpuset=1-3 }, ldm={cpubind=2}",
1032 for (Uint32
i = 0; ok[
i];
i++)
1035 int res = tmp.do_parse(ok[
i]);
1036 printf(
"do_parse(%s) => %s - %s\n", ok[i],
1037 res == 0 ?
"OK" :
"FAIL",
1038 res == 0 ?
"" : tmp.getErrorMessage());
1043 OK(check.do_parse(out.c_str()) == 0);
1044 OK(strcmp(out.c_str(), check.getConfigString()) == 0);
1048 for (Uint32 i = 0; fail[
i]; i++)
1051 int res = tmp.do_parse(fail[i]);
1052 printf(
"do_parse(%s) => %s - %s\n", fail[i],
1053 res == 0 ?
"OK" :
"FAIL",
1054 res == 0 ?
"" : tmp.getErrorMessage());
1068 "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7}",
1072 "main={cpubind=5},ldm={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},recv={cpubind=5},rep={cpubind=5}",
1076 "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=2},ldm={cpubind=3},recv={cpubind=1},rep={cpubind=1}",
1080 "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=2},recv={cpubind=1},rep={cpubind=1}",
1083 "ldm={count=4},io={cpubind=8}",
1084 "main={cpubind=1},ldm={cpubind=2},ldm={cpubind=3},ldm={cpubind=4},ldm={cpubind=5},recv={cpubind=6},rep={cpubind=7},io={cpubind=8}",
1087 "ldm={count=4,cpubind=1,4,5,6}",
1088 "main={cpubind=2},ldm={cpubind=1},ldm={cpubind=4},ldm={cpubind=5},ldm={cpubind=6},recv={cpubind=3},rep={cpubind=7}",
1094 for (
unsigned i = 0; t[
i]; i+= 3)
1097 tmp.setLockExecuteThreadToCPU(t[i+0]);
1098 int res = tmp.do_parse(t[i+1]);
1099 int ok = strcmp(tmp.getConfigString(), t[i+2]) == 0;
1100 printf(
"mask: %s conf: %s => %s(%s) - %s - %s\n",
1103 res == 0 ?
"OK" :
"FAIL",
1104 res == 0 ?
"" : tmp.getErrorMessage(),
1105 tmp.getConfigString(),
1106 ok == 1 ?
"CORRECT" :
"INCORRECT");