19 #include <ndb_global.h>
20 #include <NdbThread.h>
21 #include <my_pthread.h>
24 #include <NdbCondition.h>
26 #ifdef HAVE_LINUX_SCHEDULING
30 #include <sys/types.h>
33 #include <sys/syscall.h>
34 #elif defined HAVE_SOLARIS_AFFINITY
35 #include <sys/types.h>
37 #include <sys/processor.h>
38 #include <sys/procset.h>
/* Scheduling priority bounds and the priority value computed by get_prio(). */
41 static int g_min_prio = 0;
42 static int g_max_prio = 0;
43 static int g_prio = 0;
/*
  Mutex held around thread start-up: taken in NdbThread_Create() before
  pthread_create() and in ndb_thread_wrapper() before it signals
  g_ndb_thread_condition.
*/
45 static NdbMutex *g_ndb_thread_mutex = 0;
48 #ifdef NDB_SHM_TRANSPORTER
/* Signal number that NdbThread_set_shm_sigmask() adds to its signal mask. */
49 int g_ndb_shm_signum= 0;
/*
  High-priority settings. The policy and priority are assigned in
  NdbThread_SetHighPrioProperties() and applied by NdbThread_Create() when
  f_high_prio_set is non-zero and NDB_THREAD_PRIO_HIGH is requested.
*/
52 static int f_high_prio_set = 0;
53 static int f_high_prio_policy;
54 static int f_high_prio_prio;
/*
  NOTE(review): the following lines look like part of the thread descriptor
  declaration (the rest of this file accesses ->tid, ->func and ->object);
  confirm against the full declaration.
*/
60 #if defined HAVE_SOLARIS_AFFINITY
62 #elif defined HAVE_LINUX_SCHEDULING
/* Entry function; ndb_thread_wrapper() calls it as ss->func(ss->object). */
66 NDB_THREAD_FUNC * func;
/*
  Block or unblock the shared-memory transporter signal (g_ndb_shm_signum)
  for the calling thread using pthread_sigmask().
  NOTE(review): presumably SIG_BLOCK is used when `block` is true and
  SIG_UNBLOCK otherwise - confirm against the branch condition.
*/
70 #ifdef NDB_SHM_TRANSPORTER
71 void NdbThread_set_shm_sigmask(my_bool
block)
77 sigaddset(&mask, g_ndb_shm_signum);
79 pthread_sigmask(SIG_BLOCK, &mask, 0);
81 pthread_sigmask(SIG_UNBLOCK, &mask, 0);
/*
  Record the kernel-level id of the calling thread in thr->tid:
  _lwp_self() on Solaris, the gettid system call on Linux.
*/
91 #if defined HAVE_SOLARIS_AFFINITY
92 thr->tid = _lwp_self();
93 #elif defined HAVE_LINUX_SCHEDULING
94 thr->tid = syscall(SYS_gettid);
/* syscall() reports failure as -1; that case is checked explicitly here. */
95 if (thr->tid == (pid_t)-1)
/*
  Return the stored thread id (thr->tid) converted to int. Both the Solaris
  and the Linux branch return the same expression.
*/
110 #if defined HAVE_SOLARIS_AFFINITY
111 return (
int)thr->tid;
112 #elif defined HAVE_LINUX_SCHEDULING
113 return (
int)thr->tid;
/*
  Thread start routine. It sets up the signal mask of the new thread,
  signals g_ndb_thread_condition while holding g_ndb_thread_mutex, and then
  runs the user-supplied function as ss->func(ss->object).
  NOTE(review): `ss` is presumably `_ss` cast to the thread descriptor -
  confirm.
*/
120 ndb_thread_wrapper(
void* _ss){
123 DBUG_ENTER(
"ndb_thread_wrapper");
124 #ifdef NDB_SHM_TRANSPORTER
/* Block the shared-memory transporter signal in this thread. */
125 NdbThread_set_shm_sigmask(TRUE);
128 #ifdef HAVE_PTHREAD_SIGMASK
/* Block the signals collected in `mask` for this thread. */
136 pthread_sigmask(SIG_BLOCK, &mask, 0);
/*
  Start-up handshake: NdbThread_Create() waits on this condition (under the
  same mutex) until the descriptor's `inited` field becomes non-zero.
*/
144 NdbMutex_Lock(g_ndb_thread_mutex);
146 NdbCondition_Signal(g_ndb_thread_condition);
147 NdbMutex_Unlock(g_ndb_thread_mutex);
/* Run the thread body; its result is kept in `ret`. */
148 ret= (* ss->func)(ss->object);
/*
  Build a thread descriptor for the calling thread without starting a new
  one. The descriptor is zeroed, given a name (either `name` or the literal
  "main"), bound to the current thread handle and marked as initialised.

  name   Thread name copied into the descriptor.
         NOTE(review): "main" is presumably used when `name` is NULL -
         confirm against the branch condition.

  The DBUG_ENTER tag names this function; it previously carried the tag of
  NdbThread_Create, which mislabelled debug traces.
*/
159 NdbThread_CreateObject(
const char *
name)
162 DBUG_ENTER(
"NdbThread_CreateObject");
165 if (tmpThread == NULL)
168 bzero(tmpThread,
sizeof(* tmpThread));
/*
  NOTE(review): the full buffer size is passed as the copy limit; verify that
  strnmov() leaves room for a terminating NUL when the name fills the buffer.
*/
171 strnmov(tmpThread->thread_name, name,
sizeof(tmpThread->thread_name));
175 strnmov(tmpThread->thread_name,
"main",
sizeof(tmpThread->thread_name));
/* Identify the current thread: pthread handle if available, else the pid. */
178 #ifdef HAVE_PTHREAD_SELF
179 tmpThread->thread = pthread_self();
180 #elif defined HAVE_GETPID
181 tmpThread->thread = getpid();
/* The thread already runs, so no start-up handshake is needed. */
184 tmpThread->inited = 1;
/*
  Create a descriptor for a new thread and start it through
  ndb_thread_wrapper-style start-up: the function waits until the new thread
  has marked the descriptor as initialised before returning it.

  p_thread_func  Function the new thread runs; checked against NULL.
  p_thread_arg   Argument stored in the descriptor for that function.
  _stack_size    Requested stack size; 0 selects a default of 64K. Both the
                 default and a caller value are scaled by SIZEOF_CHARP/4.
  p_thread_name  Name copied into the descriptor.
  thread_prio    NDB_THREAD_PRIO_HIGH applies the configured high-priority
                 policy when f_high_prio_set is non-zero.

  Returns the descriptor through DBUG_RETURN.

  The sched_param argument to bzero() and pthread_setschedparam() is the
  address of the local `param`; the source had a mis-encoded character in
  place of "&param".
*/
190 NdbThread_Create(NDB_THREAD_FUNC *p_thread_func,
191 NDB_THREAD_ARG *p_thread_arg,
192 const NDB_THREAD_STACKSIZE _stack_size,
193 const char* p_thread_name,
194 NDB_THREAD_PRIO thread_prio)
198 pthread_attr_t thread_attr;
199 NDB_THREAD_STACKSIZE thread_stack_size;
201 DBUG_ENTER(
"NdbThread_Create");
/* Stack size scales with pointer width (SIZEOF_CHARP/4). */
204 if (_stack_size == 0)
205 thread_stack_size = 64 * 1024 * SIZEOF_CHARP/4;
207 thread_stack_size = _stack_size * SIZEOF_CHARP/4;
211 if (p_thread_func == NULL)
215 if (tmpThread == NULL)
218 DBUG_PRINT(
"info",(
"thread_name: %s", p_thread_name));
/*
  NOTE(review): the full buffer size is passed as the copy limit; verify that
  strnmov() leaves room for a terminating NUL when the name fills the buffer.
*/
220 strnmov(tmpThread->thread_name,p_thread_name,
sizeof(tmpThread->thread_name));
222 pthread_attr_init(&thread_attr);
/* Never request less stack than the platform minimum. */
223 #ifdef PTHREAD_STACK_MIN
224 if (thread_stack_size < PTHREAD_STACK_MIN)
225 thread_stack_size = PTHREAD_STACK_MIN;
227 DBUG_PRINT(
"info", (
"stack_size: %llu", (ulonglong)thread_stack_size));
228 pthread_attr_setstacksize(&thread_attr, thread_stack_size);
229 #ifdef USE_PTHREAD_EXTRAS
231 pthread_attr_setguardsize(&thread_attr, 2048);
/* Threads are created joinable so NdbThread_WaitFor() can pthread_join(). */
234 #ifdef PTHREAD_CREATE_JOINABLE
235 pthread_attr_setdetachstate(&thread_attr, PTHREAD_CREATE_JOINABLE);
/* inited stays 0 until the new thread completes its start-up handshake. */
237 tmpThread->inited = 0;
238 tmpThread->func= p_thread_func;
239 tmpThread->object= p_thread_arg;
/* The mutex is taken before pthread_create() and held into the wait below. */
241 NdbMutex_Lock(g_ndb_thread_mutex);
242 result = pthread_create(&tmpThread->thread,
247 pthread_attr_destroy(&thread_attr);
/*
  NOTE(review): presumably the pthread_create() failure path - the
  descriptor is freed and the mutex released; confirm the guarding condition.
*/
251 NdbMem_Free((
char *)tmpThread);
252 NdbMutex_Unlock(g_ndb_thread_mutex);
256 if (thread_prio == NDB_THREAD_PRIO_HIGH && f_high_prio_set)
258 #ifdef HAVE_PTHREAD_SETSCHEDPARAM
259 struct sched_param param;
260 bzero(&param,
sizeof(param));
261 param.sched_priority = f_high_prio_prio;
/* A failure to raise the priority is reported with perror(). */
262 if (pthread_setschedparam(tmpThread->thread, f_high_prio_policy, &param))
263 perror(
"pthread_setschedparam failed");
/*
  Wait, re-checking after each timed wait, until the new thread has set
  inited. NOTE(review): the timeout argument 100 is presumably milliseconds -
  confirm against NdbCondition_WaitTimeout().
*/
269 NdbCondition_WaitTimeout(g_ndb_thread_condition, g_ndb_thread_mutex, 100);
270 }
while (tmpThread->inited == 0);
272 NdbMutex_Unlock(g_ndb_thread_mutex);
274 DBUG_PRINT(
"exit",(
"ret: 0x%lx", (
long) tmpThread));
275 DBUG_RETURN(tmpThread);
/*
  Dispose of a thread descriptor.

  p_thread  Address of the descriptor pointer; nothing is done for the
            descriptor when *p_thread is NULL.
*/
279 void NdbThread_Destroy(
struct NdbThread** p_thread)
281 DBUG_ENTER(
"NdbThread_Destroy");
282 if (*p_thread != NULL){
/* Trace the descriptor address (cast to long for the %lx format). */
283 DBUG_PRINT(
"enter",(
"*p_thread: 0x%lx", (
long) *p_thread));
/*
  Join a thread started with NdbThread_Create().

  p_wait_thread  Descriptor of the thread to wait for; a NULL descriptor and
                 a zero thread handle are both checked before joining.
  status         Receives the thread's exit value from pthread_join().
*/
291 int NdbThread_WaitFor(
struct NdbThread* p_wait_thread,
void** status)
295 if (p_wait_thread == NULL)
298 if (p_wait_thread->thread == 0)
301 result = pthread_join(p_wait_thread->thread, status);
/*
  Terminate the calling thread.

  status  Exit value handed to pthread_exit().
*/
307 void NdbThread_Exit(
void *status)
310 pthread_exit(status);
/*
  Set the requested concurrency level.

  level  Value passed to pthread_setconcurrency(); this call is only
         compiled when USE_PTHREAD_EXTRAS is defined.
*/
314 int NdbThread_SetConcurrencyLevel(
int level)
316 #ifdef USE_PTHREAD_EXTRAS
317 return pthread_setconcurrency(level);
/*
  Look up the highest priority for a scheduling policy.

  policy  Scheduling policy passed to sched_get_priority_max(); that lookup
          is only compiled when HAVE_SCHED_GET_PRIORITY_MAX is defined.
*/
325 get_max_prio(
int policy)
328 #ifdef HAVE_SCHED_GET_PRIORITY_MAX
329 max_prio = sched_get_priority_max(policy);
/*
  Look up the lowest priority for a scheduling policy.

  policy  Scheduling policy passed to sched_get_priority_min(); that lookup
          is only compiled when HAVE_SCHED_GET_PRIORITY_MIN is defined.
*/
341 get_min_prio(
int policy)
344 #ifdef HAVE_SCHED_GET_PRIORITY_MIN
345 min_prio = sched_get_priority_min(policy);
/*
  Compute the priority to use for a thread under the given policy and cache
  the policy's bounds in g_min_prio / g_max_prio.

  rt_prio, high_prio  Flags from the caller.
                      NOTE(review): presumably these select between the
                      "+3" and "+1" offsets below - confirm.
  policy              Scheduling policy whose priority range is queried.
*/
354 get_prio(my_bool rt_prio, my_bool high_prio,
int policy)
360 g_max_prio = get_max_prio(policy);
361 g_min_prio = get_min_prio(policy);
/* The chosen priority is expressed as a small offset above the minimum. */
372 g_prio = g_min_prio + 3;
374 g_prio = g_min_prio + 1;
375 if (g_prio < g_min_prio)
/*
  Change the scheduling policy and priority of a thread.

  pThread  Descriptor of the thread to adjust. The Linux variant addresses it
           by pThread->tid via sched_setscheduler(); the pthread variant by
           pThread->thread via pthread_setschedparam().

  In both variants the priority comes from get_prio() and SCHED_OTHER is one
  of the policies that can be selected.
*/
381 NdbThread_SetScheduler(
struct NdbThread* pThread,
386 #if defined HAVE_LINUX_SCHEDULING
387 int ret, policy, prio;
388 struct sched_param loc_sched_param;
392 prio = get_prio(rt_prio, high_prio, policy);
396 policy = SCHED_OTHER;
399 bzero(&loc_sched_param,
sizeof(loc_sched_param));
400 loc_sched_param.sched_priority = prio;
401 ret= sched_setscheduler(pThread->tid, policy, &loc_sched_param);
/*
  NOTE(review): this branch tests HAVE_PTHREAD_SET_SCHEDPARAM, while the other
  pthread_setschedparam() guards in this file test HAVE_PTHREAD_SETSCHEDPARAM.
  If the build only defines the latter, this branch is never compiled -
  confirm which macro the build system provides.
*/
404 #elif defined HAVE_PTHREAD_SET_SCHEDPARAM
409 int ret, policy, prio;
410 struct sched_param loc_sched_param;
414 prio = get_prio(rt_prio, high_prio, policy);
418 policy = SCHED_OTHER;
421 bzero(&loc_sched_param,
sizeof(loc_sched_param));
422 loc_sched_param.sched_priority = prio;
423 ret= pthread_setschedparam(pThread->thread, policy, &loc_sched_param);
/*
  Bind a thread to a single CPU.

  pThread  Descriptor of the thread; its tid field identifies the thread to
           the operating system in both variants.
  cpu_id   CPU to bind to.

  Linux: cpu_id is added to a cpu set that is applied with
  sched_setaffinity(). Solaris: the LWP is bound with processor_bind().
*/
434 NdbThread_LockCPU(
struct NdbThread* pThread, Uint32 cpu_id)
437 #if defined HAVE_LINUX_SCHEDULING
452 CPU_SET(cpu_id, &cpu_set);
453 ret= sched_setaffinity(pThread->tid,
sizeof(cpu_set), &cpu_set);
456 #elif defined HAVE_SOLARIS_AFFINITY
/* The final NULL means the previous binding is not returned. */
465 ret= processor_bind(P_LWPID, pThread->tid, cpu_id, NULL);
/*
  Thread-specific storage keys, an array of NDB_THREAD_TLS_MAX entries indexed
  by NDB_THREAD_TLS value. It is declared through pthread_key(), which takes
  a type as its first argument and therefore has to be a macro supplied by an
  included header.
*/
474 static pthread_key(
void*, tls_keys[NDB_THREAD_TLS_MAX]);
/*
  Return the calling thread's value for a thread-specific storage slot.

  key  Slot selector; used directly as an index into tls_keys.
*/
476 void *NdbThread_GetTlsKey(NDB_THREAD_TLS key)
478 return pthread_getspecific(tls_keys[key]);
/*
  Store a value in the calling thread's thread-specific storage slot.

  key    Slot selector; used directly as an index into tls_keys.
  value  Pointer to associate with the slot. The result of
         pthread_setspecific() is not examined.
*/
481 void NdbThread_SetTlsKey(NDB_THREAD_TLS key,
void *value)
483 pthread_setspecific(tls_keys[key], value);
/*
  Module set-up: create the start-up mutex and condition used by
  NdbThread_Create() and ndb_thread_wrapper(), and create the
  thread-specific storage keys for the JAM and THREAD slots (no destructor
  is registered). The results of pthread_key_create() are not examined.
*/
489 g_ndb_thread_mutex = NdbMutex_Create();
490 g_ndb_thread_condition = NdbCondition_Create();
491 pthread_key_create(&(tls_keys[NDB_THREAD_TLS_JAM]), NULL);
492 pthread_key_create(&(tls_keys[NDB_THREAD_TLS_THREAD]), NULL);
/*
  Module tear-down: destroy the start-up mutex and condition, each only if
  it was created.
*/
499 if (g_ndb_thread_mutex)
501 NdbMutex_Destroy(g_ndb_thread_mutex);
504 if (g_ndb_thread_condition)
506 NdbCondition_Destroy(g_ndb_thread_condition);
/*
  Parse a high-priority specification and store the resulting scheduling
  policy and priority in f_high_prio_policy / f_high_prio_prio.

  spec  Text of the form "<policy>[,<priority>]". Leading spaces and tabs
        are skipped. The policy names recognised here are "fifo" and "rr".
        NOTE(review): presumably a specification with more than one comma
        is rejected - confirm against the handling of the second strchr().
*/
511 NdbThread_SetHighPrioProperties(
const char * spec)
/* Skip leading blanks. */
526 while ((* spec ==
' ') || (*spec ==
'\t'))
/* Locate the optional ",<priority>" part in the working copy. */
536 prio = strchr(copy,
',');
543 if (prio && strchr(prio,
','))
/* The policy names are only mapped when pthread_setschedparam() exists. */
552 #ifdef HAVE_PTHREAD_SETSCHEDPARAM
555 if (strcmp(
"fifo", copy) == 0)
558 f_high_prio_policy = SCHED_FIFO;
562 if (strcmp(
"rr", copy) == 0)
565 f_high_prio_policy = SCHED_RR;
/*
  NOTE(review): 50 is presumably the default used when no priority is given -
  confirm against the guarding condition.
*/
574 f_high_prio_prio = 50;
/* An explicit priority is parsed as a base-10 integer. */
578 long p = strtol(prio, &endptr, 10);
584 f_high_prio_prio = (int)p;