277 #include <waiting_threads.h>
278 #include <m_string.h>
/* Success counter; increment_success_stats() bumps it through incr(). */
295 uint32 wt_success_stats;
/*
  incr(VAR, LOCK): add one to VAR.

  With SAFE_STATISTICS defined, the visible lines take LOCK with
  my_atomic_rwlock_wrlock(), add 1 to VAR with my_atomic_add32() and
  release LOCK again.  The last visible definition is a plain (VAR)++
  that ignores LOCK.
  NOTE(review): the #else/#endif separating the two definitions are not
  part of this excerpt; presumably the second one is the fallback branch.
*/
299 #ifdef SAFE_STATISTICS
300 #define incr(VAR, LOCK) \
302 my_atomic_rwlock_wrlock(&(LOCK)); \
303 my_atomic_add32(&(VAR), 1); \
304 my_atomic_rwlock_wrunlock(&(LOCK)); \
307 #define incr(VAR,LOCK) do { (VAR)++; } while(0)
/* Adds one to wt_success_stats via incr(), naming success_stats_lock as the lock. */
310 static void increment_success_stats()
312 incr(wt_success_stats, success_stats_lock);
/*
  Cycle statistics helper taking a search depth and a slot selector.

  Only the clamp is visible in this excerpt: a depth at or above
  WT_CYCLE_STATS is folded to WT_CYCLE_STATS.  How depth and slot are
  used afterwards is not shown here.
*/
315 static void increment_cycle_stats(uint depth, uint slot)
317 if (depth >= WT_CYCLE_STATS)
318 depth= WT_CYCLE_STATS;
/*
  Wait statistics helper taking the time waited and a return code.

  Only two statements are visible in this excerpt; what is done with
  their results is not shown.
*/
322 static void increment_wait_stats(ulonglong waited,
int ret)
/* ret is compared with ETIMEDOUT; the body of this branch is not visible. */
325 if ((ret) == ETIMEDOUT)
/*
  Empty-bodied scan: i stops at the first index for which waited/10 is
  not greater than wt_wait_table[i], or at WT_WAIT_STATS if there is none.
*/
328 for (i= 0; i < WT_WAIT_STATS && waited/10 >
wt_wait_table[
i]; i++) ;
/*
  Two-valued state tag.  Code elsewhere in this file reads it as
  rc->state and compares it with ACTIVE.
*/
357 enum { ACTIVE, FREE } state;
/*
  NOTE(review): the enclosing declaration and most of the lines guarded by
  this #ifdef are not part of this excerpt.
*/
366 #ifdef WT_RWLOCKS_USE_MUTEXES
/*
  Bit-fields: a 15-bit counter and a 1-bit flag.  write_locked is
  accessed as rc->lock.write_locked by the lock helpers in this file.
*/
401 uint pending_writers: 15;
402 uint write_locked: 1;
/*
  Lock helpers compiled when WT_RWLOCKS_USE_MUTEXES is defined.  They work
  on the write_locked flag and the readers counter inside rc->lock.

  The function headers are not part of this excerpt, so the comments below
  describe only the visible statements.
  NOTE(review): the trace messages use "%ld" with an argument cast to
  (ulong); wt_thd_will_wait_for() uses "%lu" for the same kind of value.
*/
411 #ifdef WT_RWLOCKS_USE_MUTEXES
/* Asserts that the lock has neither a writer nor any readers. */
419 DBUG_ASSERT(rc->lock.write_locked == 0);
420 DBUG_ASSERT(rc->lock.readers == 0);
/*
  Read path: trace the attempt, loop while write_locked is set (the loop
  body is not visible), then trace the acquisition.
*/
426 DBUG_PRINT(
"wt", (
"TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
428 while (rc->lock.write_locked)
432 DBUG_PRINT(
"wt", (
"LOCK resid=%ld for READ", (ulong)rc->id.value));
/*
  Write path: trace the attempt, loop while there is a writer or any
  reader (loop body not visible), then set write_locked.
*/
436 DBUG_PRINT(
"wt", (
"TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
438 while (rc->lock.write_locked || rc->lock.readers)
440 rc->lock.write_locked= 1;
442 DBUG_PRINT(
"wt", (
"LOCK resid=%ld for WRITE", (ulong)rc->id.value));
/*
  Unlock path: a set write_locked flag is cleared; otherwise the readers
  counter is decremented and tested for reaching zero (the action taken in
  either case beyond these lines is not visible).
*/
446 DBUG_PRINT(
"wt", (
"UNLOCK resid=%ld", (ulong)rc->id.value));
448 if (rc->lock.write_locked)
450 rc->lock.write_locked= 0;
453 else if (--rc->lock.readers == 0)
/*
  Lock helpers that forward to the rwlock primitives on rc->lock:
  my_rwlock_init(), rwlock_destroy(), rw_rdlock(), rw_wrlock() and
  rw_unlock().  Each lock/unlock call is bracketed by trace messages.

  The function headers are not part of this excerpt.
  NOTE(review): presumably this is the alternative to the
  WT_RWLOCKS_USE_MUTEXES variant; the preprocessor line separating the two
  is not visible.  The trace messages use "%ld" with a (ulong) argument.
*/
460 my_rwlock_init(&rc->lock, 0);
464 rwlock_destroy(&rc->lock);
/* Read lock: trace before and after rw_rdlock(). */
468 DBUG_PRINT(
"wt", (
"TRYLOCK resid=%ld for READ", (ulong)rc->id.value));
469 rw_rdlock(&rc->lock);
470 DBUG_PRINT(
"wt", (
"LOCK resid=%ld for READ", (ulong)rc->id.value));
/* Write lock: trace before and after rw_wrlock(). */
474 DBUG_PRINT(
"wt", (
"TRYLOCK resid=%ld for WRITE", (ulong)rc->id.value));
475 rw_wrlock(&rc->lock);
476 DBUG_PRINT(
"wt", (
"LOCK resid=%ld for WRITE", (ulong)rc->id.value));
/* Unlock: trace, then rw_unlock(). */
480 DBUG_PRINT(
"wt", (
"UNLOCK resid=%ld", (ulong)rc->id.value));
481 rw_unlock(&rc->lock);
/*
  Initializer for a resource object; wt_init() installs it as
  reshash.alloc.constructor.

  Visible work: zero-fill *rc, then set up rc->owners as a dynamic array
  whose elements are WT_THD pointers.  How rc is derived from arg is not
  part of this excerpt.
*/
497 static void wt_resource_init(uchar *arg)
500 DBUG_ENTER(
"wt_resource_init");
502 memset(rc, 0,
sizeof(*rc));
505 my_init_dynamic_array(&rc->owners,
sizeof(
WT_THD *), 0, 5);
/*
  Finalizer for a resource object; wt_init() installs it as
  reshash.alloc.destructor.

  Visible work: assert that the owners array is empty, destroy the
  resource lock with rc_rwlock_destroy() and release the owners array.
  How rc is derived from arg is not part of this excerpt.
*/
515 static void wt_resource_destroy(uchar *arg)
518 DBUG_ENTER(
"wt_resource_destroy");
520 DBUG_ASSERT(rc->owners.elements == 0);
521 rc_rwlock_destroy(rc);
523 delete_dynamic(&rc->owners);
/*
  Module start-up (function header not visible; name taken from the
  DBUG_ENTER tag).
*/
529 DBUG_ENTER(
"wt_init");
/*
  The constructor must not already be wt_resource_init.
  NOTE(review): presumably a guard against initializing twice.
*/
530 DBUG_ASSERT(reshash.alloc.constructor != wt_resource_init);
/*
  Create the resource hash for WT_RESOURCE-sized elements with the
  LF_HASH_UNIQUE flag and a key length of sizeof_WT_RESOURCE_ID, then hook
  in the per-element constructor and destructor.
*/
532 lf_hash_init(&reshash,
sizeof(
WT_RESOURCE), LF_HASH_UNIQUE, 0,
533 sizeof_WT_RESOURCE_ID, 0, 0);
534 reshash.alloc.constructor= wt_resource_init;
535 reshash.alloc.destructor= wt_resource_destroy;
/*
  Fill wt_wait_table by interpolating linearly between from and to in the
  log domain and exponentiating: entry 0 is exp(from) and the last entry
  is exp(to), i.e. 60e6 before truncation to ulonglong.  The declaration
  of from is not part of this excerpt.
*/
550 double to= log(60e6);
551 for (i= 0; i < WT_WAIT_STATS; i++)
553 wt_wait_table[
i]= (ulonglong)exp((to-from)/(WT_WAIT_STATS-1)*i+from);
/* Initialize the three locks that wt_end() destroys. */
557 my_atomic_rwlock_init(&cycle_stats_lock);
558 my_atomic_rwlock_init(&success_stats_lock);
559 my_atomic_rwlock_init(&wait_stats_lock);
/*
  Module shutdown (function header not visible; name taken from the
  DBUG_ENTER tag).  Asserts that reshash.count is zero, destroys the hash
  and then the three statistics locks.
*/
565 DBUG_ENTER(
"wt_end");
567 DBUG_ASSERT(reshash.count == 0);
568 lf_hash_destroy(&reshash);
569 my_atomic_rwlock_destroy(&cycle_stats_lock);
570 my_atomic_rwlock_destroy(&success_stats_lock);
571 my_atomic_rwlock_destroy(&wait_stats_lock);
595 const ulong *dl,
const ulong *tl)
/*
  Per-thread set-up (the start of the parameter list is not visible; name
  taken from the DBUG_ENTER tag).

  The four settings are stored as given: dl and tl are pointers, and the
  stored fields are dereferenced at use sites elsewhere in this file
  (e.g. *thd->timeout_short), so the thread keeps references rather than
  copies.  ds and ts are declared outside this excerpt.
*/
597 DBUG_ENTER(
"wt_thd_lazy_init");
600 thd->deadlock_search_depth_short= ds;
601 thd->timeout_short= ts;
602 thd->deadlock_search_depth_long= dl;
603 thd->timeout_long= tl;
/* my_resources is a dynamic array whose elements are WT_RESOURCE pointers. */
605 my_init_dynamic_array(&thd->my_resources,
sizeof(
WT_RESOURCE *), 0, 5);
607 thd->name= my_thread_name();
/*
  Makes sure thd has hash pins.

  When thd->pins is 0, pins are requested from reshash with
  lf_hash_get_pins(); the visible lines also set thd->name.

  Returns non-zero when thd->pins is still 0 afterwards, zero otherwise.
  Callers in this file treat a non-zero result as failure.
*/
620 static int fix_thd_pins(
WT_THD *thd)
622 if (unlikely(thd->pins == 0))
624 thd->pins= lf_hash_get_pins(&reshash);
626 thd->name= my_thread_name();
629 return thd->pins == 0;
/*
  Tears down the per-thread state.

  Asserts that the thread holds no resources and is not waiting for one,
  returns its pins with lf_hash_put_pins() and frees the my_resources
  array.
  NOTE(review): lines between the assertions and lf_hash_put_pins() are
  not part of this excerpt, so any guard around that call is not shown.
*/
632 void wt_thd_destroy(
WT_THD *thd)
634 DBUG_ENTER(
"wt_thd_destroy");
636 DBUG_ASSERT(thd->my_resources.elements == 0);
637 DBUG_ASSERT(thd->waiting_for == 0);
640 lf_hash_put_pins(thd->pins);
642 delete_dynamic(&thd->my_resources);
/*
  Byte-wise comparison of two resource ids (function header not visible).

  The compile-time check pins the type member of WT_RESOURCE_ID at offset
  sizeof(ulonglong).  The result is that of memcmp() over the first
  sizeof_WT_RESOURCE_ID bytes of a and b.
*/
654 compile_time_assert(offsetof(WT_RESOURCE_ID,
type) ==
sizeof(ulonglong));
655 return memcmp(a, b, sizeof_WT_RESOURCE_ID);
/*
  Fragment of the victim-selection helper (function header not visible;
  callers in this file invoke change_victim(candidate, arg)).

  Visible logic: when found has a smaller weight than the current
  arg->victim, the resource the current victim is waiting for is unlocked
  with rc_unlock().  The statements that replace the victim are not part
  of this excerpt.
*/
673 if (found->weight < arg->
victim->weight)
677 rc_unlock(arg->
victim->waiting_for);
/*
  Recursive search through the owners of the resource blocker is waiting
  for (function header not visible; name taken from the DBUG_ENTER tag).
  Many lines, including braces and some conditions, are missing from this
  excerpt; the comments below cover only what is visible.

  shared_ptr addresses blocker->waiting_for and is re-read through a
  volatile-qualified pointer each time it is dereferenced.
*/
691 WT_RESOURCE *rc, *
volatile *shared_ptr= &blocker->waiting_for;
695 DBUG_ENTER(
"deadlock_search");
696 DBUG_PRINT(
"wt", (
"enter: thd=%s, blocker=%s, depth=%u",
697 arg->
thd->name, blocker->name, depth));
/* Early exit with WT_DEPTH_EXCEEDED; the guarding condition is not visible. */
705 DBUG_PRINT(
"wt", (
"exit: WT_DEPTH_EXCEEDED (early)"));
706 DBUG_RETURN(WT_DEPTH_EXCEEDED);
/*
  Pin rc in pin slot 0 and repeat while *shared_ptr no longer equals rc
  and LF_BACKOFF evaluates true.
  NOTE(review): this looks like the tail of a do/while loop whose opening
  lines are not part of this excerpt.
*/
717 lf_pin(arg->
thd->pins, 0, rc);
718 }
while (rc != *shared_ptr && LF_BACKOFF);
722 DBUG_PRINT(
"wt", (
"exit: OK (early)"));
/*
  Re-check that rc is still ACTIVE and still the resource blocker waits
  for.  Pin slot 0 is released at two visible places below.
*/
727 if (rc->state != ACTIVE || *shared_ptr != rc)
731 lf_unpin(arg->
thd->pins, 0);
735 lf_unpin(arg->
thd->pins, 0);
/*
  First pass over the owners: look for arg->thd itself.  On a match the
  cycle statistics are updated with the current depth and a flag telling
  whether arg->max_depth equals the thread's long search depth.
*/
756 for (i= 0; i < rc->owners.elements; i++)
758 cursor= *dynamic_element(&rc->owners, i,
WT_THD**);
771 if (cursor == arg->
thd)
774 increment_cycle_stats(depth, arg->
max_depth ==
775 *arg->
thd->deadlock_search_depth_long);
/*
  Second pass: recurse into every owner one level deeper.  A
  WT_DEPTH_EXCEEDED result is recorded in ret.  Setting i to the element
  count after change_victim() ends the loop; the case label that leads to
  that call is not visible.
*/
780 for (i= 0; i < rc->owners.elements; i++)
782 cursor= *dynamic_element(&rc->owners, i,
WT_THD**);
783 switch (deadlock_search(arg, cursor, depth+1)) {
786 case WT_DEPTH_EXCEEDED:
787 ret= WT_DEPTH_EXCEEDED;
791 change_victim(cursor, arg);
792 i= rc->owners.elements;
/* Exit trace: any non-zero ret other than WT_DEPTH_EXCEEDED is printed as WT_DEADLOCK. */
831 DBUG_PRINT(
"wt", (
"exit: %s",
832 ret == WT_DEPTH_EXCEEDED ?
"WT_DEPTH_EXCEEDED" :
833 ret ?
"WT_DEADLOCK" :
"OK"));
/*
  Runs deadlock_search() starting at blocker and post-processes the result.

  depth must be 0 or 1 (asserted below).  Callers in this file pass a
  search-depth limit as the fourth argument; that parameter's declaration
  is not part of this excerpt.  Several lines of the body, including the
  return statement, are missing here.
*/
856 static int deadlock(
WT_THD *thd,
WT_THD *blocker, uint depth,
861 DBUG_ENTER(
"deadlock");
862 DBUG_ASSERT(depth < 2);
863 ret= deadlock_search(&arg, blocker, depth);
/*
  A search that ran out of depth is counted in the WT_CYCLE_STATS bucket,
  with a flag telling whether max_depth equals the long search depth.
*/
864 if (ret == WT_DEPTH_EXCEEDED)
866 increment_cycle_stats(WT_CYCLE_STATS, max_depth ==
867 *thd->deadlock_search_depth_long);
/* For a deadlock found with a non-zero depth, blocker is also offered as a victim candidate. */
874 if (ret == WT_DEADLOCK && depth)
875 change_victim(blocker, &arg);
/*
  With depth 0 and an OK result, an empty owners array on
  arg.last_locked_rc is handled specially; the branch body is not visible
  apart from the assertion that thd and blocker are the same thread.
*/
888 if (depth == 0 && ret == WT_OK && arg.
last_locked_rc->owners.elements == 0)
890 DBUG_ASSERT(thd == blocker);
/*
  When the chosen victim is another thread, trace it and unlock the
  resource it is waiting for.  The lines between the trace and the unlock
  are not part of this excerpt.
*/
897 if (ret == WT_DEADLOCK && arg.
victim != thd)
899 DBUG_PRINT(
"wt", (
"killing %s", arg.
victim->name));
902 rc_unlock(arg.
victim->waiting_for);
/*
  Releases a resource that nobody uses any more (function header not
  visible; name taken from the DBUG_ENTER tag).  rc must be ACTIVE.
*/
918 DBUG_ENTER(
"unlock_lock_and_free_resource");
920 DBUG_ASSERT(rc->state == ACTIVE);
/* A resource that still has owners or waiters is only traced here; the rest of the branch is not visible. */
922 if (rc->owners.elements || rc->waiter_count)
924 DBUG_PRINT(
"wt", (
"nothing to do, %u owners, %u waiters",
925 rc->owners.elements, rc->waiter_count));
/* Pins are needed for the hash delete below; the failure branch body is not visible. */
930 if (fix_thd_pins(thd))
939 keylen= sizeof_WT_RESOURCE_ID;
/* The returned value is non-zero exactly when lf_hash_delete() returns -1. */
951 DBUG_RETURN(lf_hash_delete(&reshash, thd->pins, key, keylen) == -1);
/*
  Ends thd's wait on a resource.

  Asserts that rc has waiters and is ACTIVE, then hands rc to
  unlock_lock_and_free_resource().  Returns WT_DEADLOCK when thd->killed is
  set or that call returned non-zero, WT_OK otherwise.
  NOTE(review): the "_locked" suffix suggests the caller already holds the
  resource lock, but no locking call is visible here.  How rc is obtained,
  and the lines between the assertions and the call, are not part of this
  excerpt.
*/
961 static int stop_waiting_locked(
WT_THD *thd)
965 DBUG_ENTER(
"stop_waiting_locked");
967 DBUG_ASSERT(rc->waiter_count);
968 DBUG_ASSERT(rc->state == ACTIVE);
971 ret= unlock_lock_and_free_resource(thd, rc);
972 DBUG_RETURN((thd->killed || ret) ? WT_DEADLOCK : WT_OK);
/*
  Wrapper around stop_waiting_locked(): the only visible statement stores
  that function's result in ret.  Whatever is done before the call, and the
  return statement, are not part of this excerpt.
*/
980 static int stop_waiting(
WT_THD *thd)
984 DBUG_ENTER(
"stop_waiting");
993 ret= stop_waiting_locked(thd);
1012 const WT_RESOURCE_ID *resid)
/*
  Registers that thd is about to wait for the resource resid, which
  blocker owns (the start of the parameter list is not visible; name taken
  from the DBUG_ENTER tag).

  Every visible failure path returns WT_DEADLOCK.  Many lines, including
  braces and the final return, are missing from this excerpt.
*/
1016 DBUG_ENTER(
"wt_thd_will_wait_for");
1020 DBUG_PRINT(
"wt", (
"enter: thd=%s, blocker=%s, resid=%lu",
1021 thd->name, blocker->name, (ulong)resid->value));
/* Without pins the hash cannot be used, so give up. */
1023 if (fix_thd_pins(thd))
1024 DBUG_RETURN(WT_DEADLOCK);
/* thd is not waiting yet: find the resource in the hash or insert it. */
1026 if (thd->waiting_for == 0)
1033 keylen= sizeof_WT_RESOURCE_ID;
1036 DBUG_PRINT(
"wt", (
"first blocker"));
/*
  Search until an entry comes back.  On a miss, a zero-filled template is
  inserted; a result of -1 from lf_hash_insert() is a hard failure.
*/
1039 while ((rc= lf_hash_search(&reshash, thd->pins, key, keylen)) == 0)
1043 DBUG_PRINT(
"wt", (
"failed to find rc in hash, inserting"));
1044 memset(&tmp, 0,
sizeof(tmp));
1048 if (lf_hash_insert(&reshash, thd->pins, &tmp) == -1)
1049 DBUG_RETURN(WT_DEADLOCK);
/* MY_ERRPTR from the search is also treated as a failure. */
1059 if (rc == MY_ERRPTR)
1060 DBUG_RETURN(WT_DEADLOCK);
/* NOTE(review): "%p" is given rc without a cast to void *. */
1062 DBUG_PRINT(
"wt", (
"found in hash rc=%p", rc));
/*
  An entry that is not ACTIVE is unpinned and, per the trace text, the
  lookup is retried; the retry jump itself is not visible.
*/
1065 if (rc->state != ACTIVE)
1067 DBUG_PRINT(
"wt", (
"but it's not active, retrying"));
1070 lf_hash_search_unpin(thd->pins);
/* Usable entry: drop the search pin and remember what thd waits for. */
1074 lf_hash_search_unpin(thd->pins);
1075 thd->waiting_for= rc;
/* thd was already waiting: the resource must be the same one as resid. */
1081 DBUG_ASSERT(thd->waiting_for->id.type == resid->type);
1082 DBUG_ASSERT(resid->type->compare(&thd->waiting_for->id, resid) == 0);
1083 DBUG_PRINT(
"wt", (
"adding another blocker"));
1089 rc= thd->waiting_for;
1091 DBUG_ASSERT(rc->waiter_count);
1092 DBUG_ASSERT(rc->state == ACTIVE);
/* Abort path whose guarding condition is not part of this excerpt. */
1096 stop_waiting_locked(thd);
1097 DBUG_RETURN(WT_DEADLOCK);
/*
  Look for blocker among the current owners.
  NOTE(review): the statement under the inner if is not visible; presumably
  it leaves the loop, so that i >= elements below means "not found".
*/
1104 for (i= 0; i < rc->owners.elements; i++)
1105 if (*dynamic_element(&rc->owners, i,
WT_THD**) == blocker)
1107 if (i >= rc->owners.elements)
/*
  Record the relation in both directions: rc goes into
  blocker->my_resources and blocker into rc->owners.  If the second push
  fails, the first is undone with pop_dynamic() before giving up.
*/
1109 if (push_dynamic(&blocker->my_resources, (
void*)&rc))
1111 stop_waiting_locked(thd);
1112 DBUG_RETURN(WT_DEADLOCK);
1114 if (push_dynamic(&rc->owners, (
void*)&blocker))
1116 pop_dynamic(&blocker->my_resources);
1117 stop_waiting_locked(thd);
1118 DBUG_RETURN(WT_DEADLOCK);
/* Deadlock check at depth 1 with the short depth limit; anything but WT_OK is reported as WT_DEADLOCK. */
1123 if (deadlock(thd, blocker, 1, *thd->deadlock_search_depth_short) != WT_OK)
1126 DBUG_RETURN(WT_DEADLOCK);
/*
  Body fragments of the timed wait (function header not visible; name
  taken from the DBUG_ENTER tag).  ret starts out as WT_TIMEOUT.  The
  actual wait calls and most branch bodies are missing from this excerpt.
*/
1141 int ret= WT_TIMEOUT;
1143 ulonglong before, after, starttime;
1145 DBUG_ENTER(
"wt_thd_cond_timedwait");
/* NOTE(review): "%p" is given rc without a cast to void *. */
1146 DBUG_PRINT(
"wt", (
"enter: thd=%s, rc=%p", thd->name, rc));
/*
  NOTE(review): any conditional structure around these two lines is not
  visible; as shown, the mutex is checked against rc->cond_mutex and then
  stored there.
*/
1150 DBUG_ASSERT(rc->cond_mutex == mutex);
1152 rc->cond_mutex= mutex;
/* Take the start timestamp; before is kept for the wait statistics. */
1156 before= starttime= my_getsystime();
/* NOTE(review): Windows API call; the preprocessor guard around it is not visible. */
1170 GetSystemTimeAsFileTime((PFILETIME)&starttime);
1174 if (rc->owners.elements == 0)
/*
  First deadline: starttime plus the short timeout scaled by 1000.
  NOTE(review): presumably a microsecond-to-nanosecond conversion - confirm
  the unit of timeout_short.
*/
1178 set_timespec_time_nsec(timeout, starttime, (*thd->timeout_short)*1000ULL);
1179 if (ret == WT_TIMEOUT && !thd->killed)
/*
  Still timed out and not killed: run the full-depth deadlock check from
  thd itself.  WT_FREE_TO_GO and non-OK results are handled in branches
  whose bodies are not visible.  With an OK result, a long timeout larger
  than the short one yields a second deadline.
*/
1181 if (ret == WT_TIMEOUT && !thd->killed)
1183 int r= deadlock(thd, thd, 0, *thd->deadlock_search_depth_long);
1184 if (r == WT_FREE_TO_GO)
1186 else if (r != WT_OK)
1188 else if (*thd->timeout_long > *thd->timeout_short)
1190 set_timespec_time_nsec(timeout, starttime, (*thd->timeout_long)*1000ULL);
/* Wait is over: measure it, end the wait and update the statistics. */
1195 after= my_getsystime();
1196 if (stop_waiting(thd) == WT_DEADLOCK)
1198 increment_wait_stats(after-before, ret);
1200 increment_success_stats();
/*
  Releases resources owned by thd (function header not visible; name taken
  from the DBUG_ENTER tag).  Several lines, including the one that loads rc
  from thd->my_resources, are missing from this excerpt.
*/
1216 DBUG_ENTER(
"wt_thd_release");
1218 for (i= 0; i < thd->my_resources.elements; i++)
/* A null resid matches every resource; otherwise only the one that compares equal. */
1221 if (!resid || (resid->type->compare(&rc->id, resid) == 0))
1230 DBUG_ASSERT(rc->state == ACTIVE);
/*
  Find thd among the owners of rc (the assertion requires it to be there)
  and remove that entry.
  NOTE(review): the statement under the inner if is not visible; presumably
  it leaves the loop.
*/
1231 for (j= 0; j < rc->owners.elements; j++)
1232 if (*dynamic_element(&rc->owners, j,
WT_THD**) == thd)
1234 DBUG_ASSERT(j < rc->owners.elements);
1235 delete_dynamic_element(&rc->owners, j);
/* The branch taken when no owner is left is not visible. */
1236 if (rc->owners.elements == 0)
1244 unlock_lock_and_free_resource(thd, rc);
/*
  Entry i is removed from the thread's list.  The final reset empties the
  whole list; the condition under which it runs is not visible.
*/
1247 delete_dynamic_element(&thd->my_resources, i);
1253 reset_dynamic(&thd->my_resources);