/*
 * File-scope state for the worker-thread subsystem (memcached-style).
 * NOTE(review): this listing is fragmentary — the numbers embedded at the
 * start of each line are original source line numbers and are
 * non-contiguous, so code between them is missing.  All code is left
 * byte-identical; only comments are added.
 */
18 #define ITEMS_PER_ALLOC 64
/* Scratch sink used to drain bytes from a notify pipe (see the recv()
 * calls in the libevent callbacks below). */
20 static char devnull[8192];
21 extern volatile sig_atomic_t memcached_shutdown;
27 STATE_FUNC init_state;
30 enum network_transport transport;
44 pthread_mutex_t conn_lock = PTHREAD_MUTEX_INITIALIZER;
/* Guards global statistics; taken via STATS_LOCK()/STATS_UNLOCK(). */
47 static pthread_mutex_t stats_lock;
/* Guards the CQ_ITEM freelist used by cqi_new()/cqi_free(). */
51 static pthread_mutex_t cqi_freelist_lock;
61 static pthread_t *thread_ids;
/* Number of worker threads that finished initialization; thread_init()
 * waits on init_cond (under init_lock) until this reaches nthreads. */
67 static int init_count = 0;
68 static pthread_mutex_t init_lock;
69 static pthread_cond_t init_cond;
/* Callbacks servicing a general worker's / the tap thread's notify pipe. */
72 static void thread_libevent_process(
int fd,
short which,
void *arg);
73 static void libevent_tap_process(
int fd,
short which,
void *arg);
/* Initialize a connection queue: set up its mutex and condition variable.
 * NOTE(review): truncated listing — the end of the function (head/tail
 * initialization, closing brace) is not shown. */
78 static void cq_init(
CQ *cq) {
79 pthread_mutex_init(&cq->lock, NULL);
80 pthread_cond_init(&cq->cond, NULL);
/* Fragment of a queue-pop routine: under cq->lock, advance cq->head past
 * the item being removed.
 * NOTE(review): the enclosing signature, the empty-queue check and the
 * return statement fall in lines missing from this listing. */
93 pthread_mutex_lock(&cq->lock);
96 cq->head = item->next;
100 pthread_mutex_unlock(&cq->lock);
/* Append an item to the connection queue and signal one waiting consumer.
 * NOTE(review): original lines 113-116 are missing; presumably they set
 * cq->head in the empty-queue branch, update cq->tail, and clear
 * item->next — confirm against the full source. */
108 static void cq_push(
CQ *cq,
CQ_ITEM *item) {
111 pthread_mutex_lock(&cq->lock);
112 if (NULL == cq->tail)
115 cq->tail->next = item;
117 pthread_cond_signal(&cq->cond);
118 pthread_mutex_unlock(&cq->lock);
/* Obtain a CQ_ITEM: pop from the freelist when possible; otherwise
 * malloc() a batch of ITEMS_PER_ALLOC items, keep one and chain the rest
 * onto the freelist.
 * NOTE(review): truncated — the NULL checks, the declaration of i, and the
 * return statements are in missing lines. */
124 static CQ_ITEM *cqi_new(
void) {
126 pthread_mutex_lock(&cqi_freelist_lock);
129 cqi_freelist = item->next;
131 pthread_mutex_unlock(&cqi_freelist_lock);
137 item = malloc(
sizeof(
CQ_ITEM) * ITEMS_PER_ALLOC);
/* Link items 1..N-1 into a chain; item[0] is presumably returned to the
 * caller (the return is in a missing line — verify). */
146 for (i = 2; i < ITEMS_PER_ALLOC; i++)
147 item[i - 1].next = &item[i];
/* Splice the new chain onto the front of the shared freelist. */
149 pthread_mutex_lock(&cqi_freelist_lock);
150 item[ITEMS_PER_ALLOC - 1].next = cqi_freelist;
151 cqi_freelist = &item[1];
152 pthread_mutex_unlock(&cqi_freelist_lock);
/* Return a CQ_ITEM to the global freelist (LIFO push under the lock).
 * NOTE(review): the line storing item back into cqi_freelist (orig. 165)
 * is missing from this listing. */
162 static void cqi_free(
CQ_ITEM *item) {
163 pthread_mutex_lock(&cqi_freelist_lock);
164 item->next = cqi_freelist;
166 pthread_mutex_unlock(&cqi_freelist_lock);
/* Spawn a thread running func(arg), storing its id in *id; logs a warning
 * when pthread_create() fails.
 * NOTE(review): truncated — the declarations of attr/ret and the
 * failure-exit path are in missing lines. */
173 static void create_worker(
void *(*func)(
void *),
void *arg, pthread_t *
id) {
177 pthread_attr_init(&attr);
179 if ((ret = pthread_create(
id, &attr, func, arg)) != 0) {
180 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
181 "Can't create thread: %s\n",
/* Fragment of create_notification_pipe(): build a socketpair into
 * me->notify[], set TCP_NODELAY and SO_REUSEADDR on both ends, and make
 * both ends non-blocking; each failure is logged.
 * NOTE(review): the setsockopt() return values are ignored — presumably
 * best-effort (TCP_NODELAY has no effect on AF_UNIX pairs); confirm this
 * is intentional in the full source.  The declaration of `flags` and the
 * function's return statements are in missing lines. */
191 if (evutil_socketpair(SOCKETPAIR_AF, SOCK_STREAM, 0,
192 (
void*)me->notify) == SOCKET_ERROR) {
193 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
194 "Can't create notify pipe: %s",
199 for (
int j = 0; j < 2; ++j) {
201 setsockopt(me->notify[j], IPPROTO_TCP,
202 TCP_NODELAY, (
void *)&flags,
sizeof(flags));
203 setsockopt(me->notify[j], SOL_SOCKET,
204 SO_REUSEADDR, (
void *)&flags,
sizeof(flags));
207 if (evutil_make_socket_nonblocking(me->notify[j]) == -1) {
208 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
209 "Failed to enable non-blocking: %s",
/* Configure the dispatcher pseudo-thread: zero its state, record the main
 * event base and the calling thread's id, create its notify pipe, and
 * register dispatcher_callback as the EV_READ|EV_PERSIST handler for the
 * pipe's read end.
 * NOTE(review): event_set() is given &dispatcher_callback — the address of
 * a stack parameter — as the callback argument; that pointer dangles once
 * this function returns.  The notify-pipe callbacks visible in this file
 * never dereference their arg's pointee, but verify the real
 * dispatcher_callback does not either. */
217 static void setup_dispatcher(
struct event_base *main_base,
218 void (*dispatcher_callback)(
int,
short,
void *))
220 memset(&dispatcher_thread, 0,
sizeof(dispatcher_thread));
221 dispatcher_thread.type = DISPATCHER;
222 dispatcher_thread.base = main_base;
223 dispatcher_thread.thread_id = pthread_self();
224 if (!create_notification_pipe(&dispatcher_thread)) {
228 event_set(&dispatcher_thread.notify_event, dispatcher_thread.notify[0],
229 EV_READ | EV_PERSIST, dispatcher_callback, &dispatcher_callback);
230 event_base_set(dispatcher_thread.base, &dispatcher_thread.notify_event);
232 if (event_add(&dispatcher_thread.notify_event, 0) == -1) {
233 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
234 "Can't monitor libevent notify pipe\n");
/* Fragment of setup_thread(): initialize one worker's state — thread type
 * (TAP or GENERAL), its own libevent base, the notify-pipe read event
 * (tap threads use libevent_tap_process, others thread_libevent_process),
 * the new-connection queue, the per-thread mutex and the suffix cache.
 * Every failure path logs a warning; the signature, exit/return
 * statements and several closing braces are in missing lines. */
243 me->type = tap ? TAP : GENERAL;
244 me->base = event_init();
246 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
247 "Can't allocate event base\n");
/* Listen for notifications from other threads on this thread's pipe. */
252 event_set(&me->notify_event, me->notify[0],
253 EV_READ | EV_PERSIST,
254 tap ? libevent_tap_process : thread_libevent_process, me);
255 event_base_set(me->base, &me->notify_event);
257 if (event_add(&me->notify_event, 0) == -1) {
258 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
259 "Can't monitor libevent notify pipe\n");
264 me->new_conn_queue = malloc(
sizeof(
struct conn_queue));
265 if (me->new_conn_queue == NULL) {
266 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
267 "Failed to allocate memory for connection queue");
270 cq_init(me->new_conn_queue);
273 if ((pthread_mutex_init(&me->mutex, NULL) != 0)) {
274 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
275 "Failed to initialize mutex: %s\n",
280 me->suffix_cache = cache_create(
"suffix", SUFFIX_SIZE,
sizeof(
char*),
282 if (me->suffix_cache == NULL) {
283 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
284 "Failed to create suffix cache\n");
/* Worker thread entry point: announce initialization complete (under
 * init_lock, signalling init_cond for thread_init() to observe) and then
 * run this thread's libevent loop until it is broken.
 * NOTE(review): the increment of init_count (orig. 300) and the return
 * statement are in missing lines. */
292 static void *worker_libevent(
void *arg) {
299 pthread_mutex_lock(&init_lock);
301 pthread_cond_signal(&init_cond);
302 pthread_mutex_unlock(&init_lock);
304 event_base_loop(me->base, 0);
/* Count how many times connection c appears in the singly-linked `list`.
 * NOTE(review): the counter declaration, the comparison against c inside
 * the loop, and the return are in missing lines. */
308 int number_of_pending(
conn *c,
conn *list) {
310 for (; list; list = list->next) {
/* Notify-pipe callback for a GENERAL worker thread.  Visible phases:
 *  1. drain the pipe into devnull (log if recv() fails);
 *  2. break this thread's event loop when memcached_shutdown is set;
 *  3. pop queued new connections and hand each to conn_new(), logging and
 *     closing the socket on failure (UDP gets a distinct message);
 *  4. swap out me->pending_io under me->mutex and, for each pending conn,
 *     re-register its event and drive its state machine.
 * NOTE(review): truncated — variable declarations, cqi_free() of popped
 * items, and several braces/statements are in missing lines; code left
 * byte-identical. */
322 static void thread_libevent_process(
int fd,
short which,
void *arg) {
324 assert(me->type == GENERAL);
327 if (recv(fd, devnull,
sizeof(devnull), 0) == -1) {
329 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
330 "Can't read from libevent pipe: %s\n",
335 if (memcached_shutdown) {
336 event_base_loopbreak(me->base);
340 while ((item = cq_pop(me->new_conn_queue)) != NULL) {
341 conn *c = conn_new(item->sfd, item->init_state, item->event_flags,
342 item->read_buffer_size, item->transport, me->base,
345 if (IS_UDP(item->transport)) {
346 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
347 "Can't listen for events on UDP socket\n");
351 settings.extensions.logger->
log(EXTENSION_LOG_INFO, NULL,
352 "Can't listen for events on fd %d\n",
355 closesocket(item->sfd);
358 assert(c->thread == NULL);
/* Snapshot and clear pending_io while holding the thread mutex, then
 * process the snapshot without the lock. */
364 pthread_mutex_lock(&me->mutex);
365 conn* pending = me->pending_io;
366 me->pending_io = NULL;
367 pthread_mutex_unlock(&me->mutex);
368 while (pending != NULL) {
370 assert(me == c->thread);
371 pending = pending->next;
373 register_event(c, 0);
381 while (c->state(c)) {
387 extern volatile rel_time_t current_time;
/* Detect a cycle in a conn list: two fast pointers advance two nodes per
 * iteration while slowNode advances one; a match means a cycle.
 * NOTE(review): the `return true`/`return false` statements are in
 * missing lines. */
389 bool has_cycle(
conn *c) {
393 conn *slowNode, *fastNode1, *fastNode2;
394 slowNode = fastNode1 = fastNode2 = c;
395 while (slowNode && (fastNode1 = fastNode2->next) && (fastNode2 = fastNode1->next)) {
396 if (slowNode == fastNode1 || slowNode == fastNode2) {
399 slowNode = slowNode->next;
/* Linear scan: true iff needle is an element of the haystack list.
 * NOTE(review): the return statements are in missing lines. */
404 bool list_contains(
conn *haystack,
conn *needle) {
405 for (; haystack; haystack = haystack -> next) {
406 if (needle == haystack) {
/* Fragment of list_remove(): when the head is the needle, splice it out
 * (saving its tail in rv); otherwise recurse on the tail and relink.
 * NOTE(review): the signature, the NULL-haystack base case, the clearing
 * of needle->next and the return statements are in missing lines. */
418 if (haystack == needle) {
419 conn *rv = needle->next;
424 haystack->next = list_remove(haystack->next, needle);
/* Move up to max_items-1 conns from the list *l into dest[], clearing each
 * moved node's next pointer and marking it LIST_STATE_PROCESSING; the
 * count of moved items is returned.
 * NOTE(review): truncated — the declaration of n_items, the assignment
 * dest[n_items] = *l (orig. 432) and the return are in missing lines;
 * statement order here is significant, code left untouched. */
429 size_t list_to_array(
conn **dest,
size_t max_items,
conn **l) {
431 for (; *l && n_items < max_items - 1; ++n_items) {
433 *l = dest[n_items]->next;
434 dest[n_items]->next = NULL;
435 dest[n_items]->list_state |= LIST_STATE_PROCESSING;
/* Put connection c on one of its thread's lists (pending_io or
 * pending_close).  If a list walker currently owns c
 * (LIST_STATE_PROCESSING set), only record a request flag; finalize_list()
 * performs the actual enlist later.
 * NOTE(review): the declaration of thr, the actual list push for the
 * non-processing branch and the else keyword are in missing lines. */
440 void enlist_conn(
conn *c,
conn **list) {
442 assert(list == &thr->pending_io || list == &thr->pending_close);
443 if ((c->list_state & LIST_STATE_PROCESSING) == 0) {
444 assert(!list_contains(thr->pending_close, c));
445 assert(!list_contains(thr->pending_io, c));
446 assert(c->next == NULL);
449 assert(list_contains(*list, c));
450 assert(!has_cycle(*list));
452 c->list_state |= (list == &thr->pending_io ?
453 LIST_STATE_REQ_PENDING_IO :
454 LIST_STATE_REQ_PENDING_CLOSE);
/* Second phase after list_to_array(): clear each conn's PROCESSING flag,
 * re-enlist conns that requested pending_io/pending_close while they were
 * being processed (skipping conns whose socket is already closed), then
 * reset the list_state flags. */
458 void finalize_list(
conn **list,
size_t items) {
459 for (
size_t i = 0; i < items; i++) {
460 list[
i]->list_state &= ~LIST_STATE_PROCESSING;
461 if (list[i]->sfd != INVALID_SOCKET) {
462 if (list[i]->list_state & LIST_STATE_REQ_PENDING_IO) {
463 enlist_conn(list[i], &list[i]->thread->pending_io);
464 }
else if (list[i]->list_state & LIST_STATE_REQ_PENDING_CLOSE) {
465 enlist_conn(list[i], &list[i]->thread->pending_close);
468 list[
i]->list_state = 0;
/* Notify-pipe callback for the TAP thread.  Visible phases:
 *  1. drain the pipe into devnull (log on recv() failure);
 *  2. break the event loop when memcached_shutdown is set;
 *  3. at most once per second (me->last_checked vs current_time), snapshot
 *     up to max_items of me->pending_close into a local array;
 *  4. snapshot me->pending_io likewise and, per conn (under its thread
 *     lock), log, re-register its event and run its state machine;
 *  5. destroy pending-close conns whose refcount dropped to 1, re-enlist
 *     the rest;
 *  6. finalize both snapshots via finalize_list().
 * NOTE(review): heavily truncated — locking around the snapshots, the
 * conn destruction call, and many braces are in missing lines; code left
 * byte-identical. */
473 static void libevent_tap_process(
int fd,
short which,
void *arg) {
475 assert(me->type == TAP);
477 if (recv(fd, devnull,
sizeof(devnull), 0) == -1) {
479 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
480 "Can't read from libevent pipe: %s\n",
485 if (memcached_shutdown) {
486 event_base_loopbreak(me->base);
491 const size_t max_items = 256;
493 conn *pending_close[max_items];
494 size_t n_pending_close = 0;
496 if (me->pending_close && me->last_checked != current_time) {
497 assert(!has_cycle(me->pending_close));
498 me->last_checked = current_time;
500 n_pending_close = list_to_array(pending_close, max_items,
505 conn *pending_io[max_items];
506 size_t n_items = list_to_array(pending_io, max_items, &me->pending_io);
509 for (
size_t i = 0; i < n_items; ++
i) {
510 conn *c = pending_io[
i];
512 assert(c->thread == me);
514 LOCK_THREAD(c->thread);
515 assert(me == c->thread);
516 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
517 "Processing tap pending_io for %d\n", c->sfd);
520 register_event(c, NULL);
528 while (c->state(c)) {
534 if (n_pending_close > 0) {
535 for (
size_t i = 0; i < n_pending_close; ++
i) {
536 conn *ce = pending_close[
i];
537 if (ce->refcount == 1) {
538 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
539 "OK, time to nuke: %p\n",
541 assert(ce->next == NULL);
/* Still referenced elsewhere: keep it on pending_close for a later pass. */
545 enlist_conn(ce, &me->pending_close);
552 finalize_list(pending_io, n_items);
553 finalize_list(pending_close, n_pending_close);
/* Fragment of is_thread_me(): compare the calling thread with
 * thr->thread_id.  Two variants appear back-to-back — the struct-member
 * form (tid.p / tid.x, presumably a WIN32 pthreads emulation) and the
 * direct comparison; the separating #ifdef/#else lines are missing.
 * NOTE(review): comparing pthread_t with `==` is non-portable — POSIX
 * requires pthread_equal(); verify against the full source. */
559 pthread_t tid = pthread_self();
560 return(tid.p == thr->thread_id.p && tid.x == thr->thread_id.x);
562 return pthread_self() == thr->thread_id;
/* Engine callback: an asynchronous operation for `cookie` (a conn*) has
 * completed with `status`.  Visible behavior:
 *  - warn and bail on a NULL cookie;
 *  - tap-thread ENGINE_DISCONNECT: under the thread lock, unregister and
 *    close the socket, force conn_immediate_close, and notify the owning
 *    thread if it is not the caller;
 *  - otherwise ignore conns already closing/pending/immediate-close;
 *  - record status in conn->aiostat; on ENGINE_DISCONNECT move the conn
 *    from pending_io to pending_close (conn_closing), else enlist it on
 *    pending_io when it is on neither list.
 * NOTE(review): heavily truncated — lock acquisition for the non-tap path,
 * the declaration of thr, the notify_thread() calls at the tail and many
 * braces are in missing lines; code left byte-identical. */
566 void notify_io_complete(
const void *cookie, ENGINE_ERROR_CODE status)
568 if (cookie == NULL) {
569 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
570 "notify_io_complete called without a valid cookie (status %x)\n",
575 struct conn *
conn = (
struct conn *)cookie;
577 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
578 "Got notify from %d, status %x\n",
590 if (status == ENGINE_DISCONNECT && conn->thread == tap_thread) {
591 LOCK_THREAD(conn->thread);
592 if (conn->sfd != INVALID_SOCKET) {
593 unregister_event(conn);
594 safe_close(conn->sfd);
595 conn->sfd = INVALID_SOCKET;
598 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
599 "Immediate close of %p\n",
601 conn_set_state(conn, conn_immediate_close);
603 if (!is_thread_me(conn->thread)) {
605 notify_thread(conn->thread);
608 UNLOCK_THREAD(conn->thread);
619 if (thr == NULL || (conn->state == conn_closing ||
620 conn->state == conn_pending_close ||
621 conn->state == conn_immediate_close)) {
628 if (thr != conn->thread || !conn->ewouldblock) {
634 conn->aiostat = status;
639 if (status == ENGINE_DISCONNECT) {
640 conn->state = conn_closing;
642 thr->pending_io = list_remove(thr->pending_io, conn);
643 if (number_of_pending(conn, thr->pending_close) == 0) {
644 enlist_conn(conn, &thr->pending_close);
647 if (number_of_pending(conn, thr->pending_io) +
648 number_of_pending(conn, thr->pending_close) == 0) {
649 if (thr->pending_io == NULL) {
652 enlist_conn(conn, &thr->pending_io);
/* Index of the worker that received the previous connection; used for
 * round-robin selection below. */
664 static int last_thread = -1;
/* Dispatch a newly accepted socket to a worker thread: choose the next
 * thread round-robin, fill a CQ_ITEM with the connection parameters, push
 * it onto that thread's queue and poke its notify pipe.
 * NOTE(review): the cqi_new() allocation, selection of `thread`, the
 * item->sfd assignment and the update of last_thread are in missing
 * lines. */
671 void dispatch_conn_new(SOCKET sfd, STATE_FUNC init_state,
int event_flags,
672 int read_buffer_size,
enum network_transport transport) {
674 int tid = (last_thread + 1) %
settings.num_threads;
681 item->init_state = init_state;
682 item->event_flags = event_flags;
683 item->read_buffer_size = read_buffer_size;
684 item->transport = transport;
686 cq_push(thread->new_conn_queue, item);
688 MEMCACHED_CONN_DISPATCH(sfd, (uintptr_t)thread->thread_id);
689 notify_thread(thread);
/* True when the calling thread is the dispatcher (listen) thread.  Two
 * comparison variants appear back-to-back (struct-member form presumably
 * for WIN32 pthreads; the #ifdef separators are in missing lines).
 * NOTE(review): `==` on pthread_t is non-portable — POSIX prescribes
 * pthread_equal(); verify against the full source. */
695 int is_listen_thread() {
697 pthread_t tid = pthread_self();
698 return(tid.p == dispatcher_thread.thread_id.p && tid.x == dispatcher_thread.thread_id.x);
700 return pthread_self() == dispatcher_thread.thread_id;
/* Wake the dispatcher thread by poking its notify pipe. */
704 void notify_dispatcher(
void) {
705 notify_thread(&dispatcher_thread);
/* Global statistics lock wrappers.  The STATS_LOCK() signature line
 * (orig. 710) is missing from this listing. */
711 pthread_mutex_lock(&stats_lock);
714 void STATS_UNLOCK() {
715 pthread_mutex_unlock(&stats_lock);
/* Fragment of threadlocal_stats_clear(): zero the scalar counters and the
 * per-slab-class stats array of one thread_stats.
 * NOTE(review): the signature and the first resets (orig. 718-719,
 * presumably cmd_get/cmd_set or similar) are in missing lines. */
720 stats->get_misses = 0;
721 stats->delete_misses = 0;
722 stats->incr_misses = 0;
723 stats->decr_misses = 0;
724 stats->incr_hits = 0;
725 stats->decr_hits = 0;
726 stats->cas_misses = 0;
727 stats->bytes_written = 0;
728 stats->bytes_read = 0;
729 stats->cmd_flush = 0;
730 stats->conn_yields = 0;
731 stats->auth_cmds = 0;
732 stats->auth_errors = 0;
734 memset(stats->slab_stats, 0,
735 sizeof(
struct slab_stats) * MAX_NUMBER_OF_SLAB_CLASSES);
/* Fragment of threadlocal_stats_reset(): clear every worker's thread-local
 * stats, holding that worker's stats mutex across each clear.
 * NOTE(review): the signature and declaration of ii are in missing
 * lines. */
740 for (ii = 0; ii <
settings.num_threads; ++ii) {
741 pthread_mutex_lock(&thread_stats[ii].mutex);
742 threadlocal_stats_clear(&thread_stats[ii]);
743 pthread_mutex_unlock(&thread_stats[ii].mutex);
/* Sum every worker's thread-local counters — scalar fields and the
 * per-slab-class sub-stats — into *stats, holding each worker's stats
 * mutex while its values are read.
 * NOTE(review): the declarations of ii and sid are in missing lines. */
747 void threadlocal_stats_aggregate(
struct thread_stats *thread_stats,
struct thread_stats *
stats) {
749 for (ii = 0; ii <
settings.num_threads; ++ii) {
750 pthread_mutex_lock(&thread_stats[ii].mutex);
752 stats->cmd_get += thread_stats[ii].cmd_get;
753 stats->get_misses += thread_stats[ii].get_misses;
754 stats->delete_misses += thread_stats[ii].delete_misses;
755 stats->decr_misses += thread_stats[ii].decr_misses;
756 stats->incr_misses += thread_stats[ii].incr_misses;
757 stats->decr_hits += thread_stats[ii].decr_hits;
758 stats->incr_hits += thread_stats[ii].incr_hits;
759 stats->cas_misses += thread_stats[ii].cas_misses;
760 stats->bytes_read += thread_stats[ii].bytes_read;
761 stats->bytes_written += thread_stats[ii].bytes_written;
762 stats->cmd_flush += thread_stats[ii].cmd_flush;
763 stats->conn_yields += thread_stats[ii].conn_yields;
764 stats->auth_cmds += thread_stats[ii].auth_cmds;
765 stats->auth_errors += thread_stats[ii].auth_errors;
767 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
768 stats->slab_stats[sid].cmd_set +=
769 thread_stats[ii].slab_stats[sid].cmd_set;
770 stats->slab_stats[sid].get_hits +=
771 thread_stats[ii].slab_stats[sid].get_hits;
772 stats->slab_stats[sid].delete_hits +=
773 thread_stats[ii].slab_stats[sid].delete_hits;
774 stats->slab_stats[sid].cas_hits +=
775 thread_stats[ii].slab_stats[sid].cas_hits;
776 stats->slab_stats[sid].cas_badval +=
777 thread_stats[ii].slab_stats[sid].cas_badval;
780 pthread_mutex_unlock(&thread_stats[ii].mutex);
/* Collapse the per-slab-class stats of one thread_stats into a single
 * slab_stats `out`.
 * NOTE(review): the zeroing of out->cmd_set / get_hits / cas_hits /
 * cas_badval (around orig. 786-792) and the declaration of sid are in
 * missing lines. */
784 void slab_stats_aggregate(
struct thread_stats *stats,
struct slab_stats *out) {
789 out->delete_hits = 0;
793 for (sid = 0; sid < MAX_NUMBER_OF_SLAB_CLASSES; sid++) {
794 out->cmd_set += stats->slab_stats[sid].cmd_set;
795 out->get_hits += stats->slab_stats[sid].get_hits;
796 out->delete_hits += stats->slab_stats[sid].delete_hits;
797 out->cas_hits += stats->slab_stats[sid].cas_hits;
798 out->cas_badval += stats->slab_stats[sid].cas_badval;
/* Initialize the threading subsystem: create the global locks and
 * condition variables, set up the dispatcher, create a notify pipe and
 * per-thread state for each worker (the last one becoming the tap
 * thread), start all workers via create_worker(), and block on init_cond
 * until every worker has registered in init_count.
 * NOTE(review): the allocation of `threads`, the assignment of nthreads
 * from nthr, and several error-exit paths are in missing lines.  Also
 * note two error styles appear for allocation failure: logger for
 * `threads`, perror() for `thread_ids` — confirm intent in full source. */
808 void thread_init(
int nthr,
struct event_base *main_base,
809 void (*dispatcher_callback)(
int,
short,
void *)) {
813 pthread_mutex_init(&stats_lock, NULL);
814 pthread_mutex_init(&init_lock, NULL);
815 pthread_cond_init(&init_cond, NULL);
817 pthread_mutex_init(&cqi_freelist_lock, NULL);
822 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
823 "Can't allocate thread descriptors: %s",
827 thread_ids = calloc(nthreads,
sizeof(pthread_t));
829 perror(
"Can't allocate thread descriptors");
833 setup_dispatcher(main_base, dispatcher_callback);
835 for (i = 0; i < nthreads; i++) {
836 if (!create_notification_pipe(&threads[i])) {
839 threads[
i].index =
i;
/* The last worker is flagged as the tap thread by setup_thread(). */
841 setup_thread(&threads[i], i == (nthreads - 1));
845 for (i = 0; i < nthreads; i++) {
846 create_worker(worker_libevent, &threads[i], &thread_ids[i]);
847 threads[
i].thread_id = thread_ids[
i];
850 tap_thread = &threads[nthreads - 1];
/* Wait for all workers to announce readiness (see worker_libevent). */
853 pthread_mutex_lock(&init_lock);
854 while (init_count < nthreads) {
855 pthread_cond_wait(&init_cond, &init_lock);
857 pthread_mutex_unlock(&init_lock);
/* Shut the workers down: poke each notify pipe (the libevent callbacks
 * observe memcached_shutdown and break their loops), join every worker,
 * then close both ends of each notify pipe. */
860 void threads_shutdown(
void)
862 for (
int ii = 0; ii < nthreads; ++ii) {
863 notify_thread(&threads[ii]);
864 pthread_join(thread_ids[ii], NULL);
866 for (
int ii = 0; ii < nthreads; ++ii) {
867 safe_close(threads[ii].notify[0]);
868 safe_close(threads[ii].notify[1]);
/* Fragment of notify_thread(): wake a thread by writing one byte to the
 * write end of its notify pipe; log a warning on failure, with a distinct
 * message for the tap thread.
 * NOTE(review): the enclosing signature and the final argument of the log
 * calls (presumably strerror/errno text) are in missing lines. */
873 if (send(thread->notify[1],
"", 1, 0) != 1) {
874 if (thread == tap_thread) {
875 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
876 "Failed to notify TAP thread: %s",
879 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
880 "Failed to notify thread: %s",