17 #include "config_static.h"
19 #include "memcached/extension_loggers.h"
20 #include "utilities/engine_loader.h"
38 #define INNODB_MEMCACHED
40 static inline void item_set_cas(
const void *cookie, item *it, uint64_t cas) {
45 #define SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
46 thread_stats->slab_stats[info.clsid].slab_op++;
48 #define THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
49 thread_stats->thread_op++;
51 #define THREAD_GUTS2(conn, thread_stats, slab_op, thread_op) \
52 thread_stats->slab_op++; \
53 thread_stats->thread_op++;
55 #define SLAB_THREAD_GUTS(conn, thread_stats, slab_op, thread_op) \
56 SLAB_GUTS(conn, thread_stats, slab_op, thread_op) \
57 THREAD_GUTS(conn, thread_stats, slab_op, thread_op)
59 #define STATS_INCR1(GUTS, conn, slab_op, thread_op, key, nkey) { \
60 struct independent_stats *independent_stats = get_independent_stats(conn); \
61 struct thread_stats *thread_stats = \
62 &independent_stats->thread_stats[conn->thread->index]; \
63 topkeys_t *topkeys = independent_stats->topkeys; \
64 pthread_mutex_lock(&thread_stats->mutex); \
65 GUTS(conn, thread_stats, slab_op, thread_op); \
66 pthread_mutex_unlock(&thread_stats->mutex); \
67 TK(topkeys, slab_op, key, nkey, current_time); \
70 #define STATS_INCR(conn, op, key, nkey) \
71 STATS_INCR1(THREAD_GUTS, conn, op, op, key, nkey)
73 #define SLAB_INCR(conn, op, key, nkey) \
74 STATS_INCR1(SLAB_GUTS, conn, op, op, key, nkey)
76 #define STATS_TWO(conn, slab_op, thread_op, key, nkey) \
77 STATS_INCR1(THREAD_GUTS2, conn, slab_op, thread_op, key, nkey)
79 #define SLAB_TWO(conn, slab_op, thread_op, key, nkey) \
80 STATS_INCR1(SLAB_THREAD_GUTS, conn, slab_op, thread_op, key, nkey)
82 #define STATS_HIT(conn, op, key, nkey) \
83 SLAB_TWO(conn, op##_hits, cmd_##op, key, nkey)
85 #define STATS_MISS(conn, op, key, nkey) \
86 STATS_TWO(conn, op##_misses, cmd_##op, key, nkey)
88 #define STATS_NOKEY(conn, op) { \
89 struct thread_stats *thread_stats = \
90 get_thread_stats(conn); \
91 pthread_mutex_lock(&thread_stats->mutex); \
93 pthread_mutex_unlock(&thread_stats->mutex); \
96 #define STATS_NOKEY2(conn, op1, op2) { \
97 struct thread_stats *thread_stats = \
98 get_thread_stats(conn); \
99 pthread_mutex_lock(&thread_stats->mutex); \
100 thread_stats->op1++; \
101 thread_stats->op2++; \
102 pthread_mutex_unlock(&thread_stats->mutex); \
105 #define STATS_ADD(conn, op, amt) { \
106 struct thread_stats *thread_stats = \
107 get_thread_stats(conn); \
108 pthread_mutex_lock(&thread_stats->mutex); \
109 thread_stats->op += amt; \
110 pthread_mutex_unlock(&thread_stats->mutex); \
113 volatile sig_atomic_t memcached_shutdown;
123 volatile rel_time_t current_time;
128 static SOCKET new_socket(
struct addrinfo *ai);
129 static int try_read_command(
conn *c);
133 ENGINE_EVENT_TYPE
type,
134 EVENT_CALLBACK cb,
const void *cb_data);
135 enum try_read_result {
137 READ_NO_DATA_RECEIVED,
142 static enum try_read_result try_read_network(
conn *c);
143 static enum try_read_result try_read_udp(
conn *c);
146 static void stats_init(
void);
147 static void server_stats(
ADD_STAT add_stats,
conn *c,
bool aggregate);
148 static void process_stat_settings(
ADD_STAT add_stats,
void *c);
152 static void settings_init(
void);
155 static void event_handler(
const int fd,
const short which,
void *arg);
156 static void complete_nread(
conn *c);
157 static char *process_command(
conn *c,
char *command);
158 static void write_and_free(
conn *c,
char *
buf,
int bytes);
159 static int ensure_iov_space(
conn *c);
160 static int add_iov(
conn *c,
const void *
buf,
int len);
161 static int add_msghdr(
conn *c);
165 static void set_current_time(
void);
172 static time_t process_started;
175 static conn *listen_conn = NULL;
176 static int udp_socket[100];
177 static int num_udp_socket;
183 enum transmit_result {
190 static enum transmit_result transmit(
conn *c);
192 #define REALTIME_MAXDELTA 60*60*24*30
195 static void perform_callbacks(ENGINE_EVENT_TYPE
type,
200 h->cb(c, type, data, h->cb_data);
209 static rel_time_t realtime(
const time_t exptime) {
212 if (exptime == 0)
return 0;
214 if (exptime > REALTIME_MAXDELTA) {
221 if (exptime <= process_started)
222 return (rel_time_t)1;
223 return (rel_time_t)(exptime - process_started);
225 return (rel_time_t)(exptime + current_time);
232 static time_t abstime(
const rel_time_t exptime)
234 return process_started + exptime;
237 static void stats_init(
void) {
238 stats.daemon_conns = 0;
239 stats.rejected_conns = 0;
245 static void stats_reset(
const void *cookie) {
246 struct conn *
conn = (
struct conn*)cookie;
248 stats.rejected_conns = 0;
249 stats.total_conns = 0;
250 stats_prefix_clear();
252 threadlocal_stats_reset(get_independent_stats(conn)->
thread_stats);
256 static void settings_init(
void) {
263 settings.maxbytes = 64 * 1024 * 1024;
276 settings.reqs_per_event = DEFAULT_REQS_PER_EVENT;
278 settings.binding_protocol = negotiating_prot;
279 settings.item_size_max = 1024 * 1024;
282 settings.extensions.logger = get_stderr_logger();
290 static int add_msghdr(conn *c)
296 if (c->msgsize == c->msgused) {
297 msg = realloc(c->msglist, c->msgsize * 2 *
sizeof(
struct msghdr));
304 msg = c->msglist + c->msgused;
308 memset(msg, 0,
sizeof(
struct msghdr));
310 msg->msg_iov = &c->iov[c->iovused];
312 if (c->request_addr_size > 0) {
313 msg->msg_name = &c->request_addr;
314 msg->msg_namelen = c->request_addr_size;
320 if (IS_UDP(c->transport)) {
322 return add_iov(c, NULL, UDP_HEADER_SIZE);
328 static const char *prot_text(
enum protocol prot) {
329 char *rv =
"unknown";
337 case negotiating_prot:
338 rv =
"auto-negotiate";
345 pthread_mutex_t mutex;
348 uint64_t num_disable;
351 static bool is_listen_disabled(
void) {
353 pthread_mutex_lock(&listen_state.mutex);
354 ret = listen_state.disabled;
355 pthread_mutex_unlock(&listen_state.mutex);
359 static uint64_t get_listen_disabled_num(
void) {
361 pthread_mutex_lock(&listen_state.mutex);
362 ret = listen_state.num_disable;
363 pthread_mutex_unlock(&listen_state.mutex);
367 static void disable_listen(
void) {
368 pthread_mutex_lock(&listen_state.mutex);
369 listen_state.disabled =
true;
370 listen_state.count = 10;
371 ++listen_state.num_disable;
372 pthread_mutex_unlock(&listen_state.mutex);
375 for (next = listen_conn; next; next = next->next) {
376 update_event(next, 0);
377 if (listen(next->sfd, 1) != 0) {
378 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
385 void safe_close(SOCKET sfd) {
386 if (sfd != INVALID_SOCKET) {
388 while ((rval = closesocket(sfd)) == SOCKET_ERROR &&
389 (errno == EINTR || errno == EAGAIN)) {
393 if (rval == SOCKET_ERROR) {
394 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
395 "Failed to close socket %d (%s)!!\n", (
int)sfd,
402 if (is_listen_disabled()) {
426 static bool conn_reset_buffersize(conn *c) {
429 if (c->
rsize != DATA_BUFFER_SIZE) {
430 void *ptr = malloc(DATA_BUFFER_SIZE);
434 c->
rsize = DATA_BUFFER_SIZE;
440 if (c->wsize != DATA_BUFFER_SIZE) {
441 void *ptr = malloc(DATA_BUFFER_SIZE);
445 c->wsize = DATA_BUFFER_SIZE;
507 static int conn_constructor(
void *buffer,
void *unused1,
int unused2) {
508 (void)unused1; (void)unused2;
511 memset(c, 0,
sizeof(*c));
512 MEMCACHED_CONN_CREATE(c);
514 if (!conn_reset_buffersize(c)) {
521 settings.extensions.logger->
log(EXTENSION_LOG_WARNING,
523 "Failed to allocate buffers for connection\n");
528 stats.conn_structs++;
540 static void conn_destructor(
void *buffer,
void *unused) {
551 stats.conn_structs--;
555 conn *conn_new(
const SOCKET sfd, STATE_FUNC init_state,
556 const int event_flags,
557 const int read_buffer_size,
enum network_transport transport,
559 conn *c = cache_alloc(conn_cache);
564 assert(c->thread == NULL);
566 if (c->
rsize < read_buffer_size) {
567 void *mem = malloc(read_buffer_size);
569 c->
rsize = read_buffer_size;
573 assert(c->thread == NULL);
574 cache_free(conn_cache, c);
579 c->transport = transport;
580 c->protocol =
settings.binding_protocol;
586 c->request_addr_size =
sizeof(c->request_addr);
588 c->request_addr_size = 0;
592 if (init_state == conn_listening) {
593 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
594 "<%d server listening (%s)\n", sfd,
595 prot_text(c->protocol));
596 }
else if (IS_UDP(transport)) {
597 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
598 "<%d server listening (udp)\n", sfd);
599 }
else if (c->protocol == negotiating_prot) {
600 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
601 "<%d new auto-negotiating client connection\n",
603 }
else if (c->protocol == ascii_prot) {
604 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
605 "<%d new ascii client connection.\n", sfd);
606 }
else if (c->protocol == binary_prot) {
607 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
608 "<%d new binary client connection.\n", sfd);
610 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
611 "<%d new unknown (%d) client connection\n",
618 c->state = init_state;
622 c->
rbytes = c->wbytes = 0;
627 c->suffixcurr = c->suffixlist;
637 c->write_and_free = 0;
642 event_set(&c->event, sfd, event_flags, event_handler, (
void *)c);
643 event_base_set(base, &c->event);
644 c->ev_flags = event_flags;
646 if (!register_event(c, timeout)) {
647 assert(c->thread == NULL);
648 cache_free(conn_cache, c);
656 c->aiostat = ENGINE_SUCCESS;
657 c->ewouldblock =
false;
660 MEMCACHED_CONN_ALLOCATE(c->sfd);
662 perform_callbacks(ON_CONNECT, NULL, c);
667 static void conn_cleanup(conn *c) {
676 for (; c->ileft > 0; c->ileft--,c->icurr++) {
681 if (c->suffixleft != 0) {
682 for (; c->suffixleft > 0; c->suffixleft--, c->suffixcurr++) {
683 cache_free(c->thread->suffix_cache, *(c->suffixcurr));
687 if (c->write_and_free) {
688 free(c->write_and_free);
689 c->write_and_free = 0;
693 sasl_dispose(&c->sasl_conn);
697 if (c->engine_storage) {
702 c->engine_storage = NULL;
703 c->tap_iterator = NULL;
705 assert(c->next == NULL);
707 c->sfd = INVALID_SOCKET;
708 c->tap_nack_mode =
false;
711 void conn_close(conn *c) {
713 assert(c->sfd == INVALID_SOCKET);
720 LOCK_THREAD(c->thread);
722 if (
settings.verbose > 1 && list_contains(c->thread->pending_io, c)) {
723 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
724 "Current connection was in the pending-io list.. Nuking it\n");
726 c->thread->pending_io = list_remove(c->thread->pending_io, c);
727 c->thread->pending_close = list_remove(c->thread->pending_close, c);
728 UNLOCK_THREAD(c->thread);
737 conn_reset_buffersize(c);
738 assert(c->thread == NULL);
739 cache_free(conn_cache, c);
750 static void conn_shrink(conn *c) {
753 if (IS_UDP(c->transport))
762 newbuf = (
char *)realloc((
void *)c->
rbuf, DATA_BUFFER_SIZE);
766 c->
rsize = DATA_BUFFER_SIZE;
772 if (c->isize > ITEM_LIST_HIGHWAT) {
773 item **newbuf = (item**) realloc((
void *)c->ilist,
ITEM_LIST_INITIAL *
sizeof(c->ilist[0]));
781 if (c->msgsize > MSG_LIST_HIGHWAT) {
790 if (c->iovsize > IOV_LIST_HIGHWAT) {
803 const char *state_text(STATE_FUNC state) {
804 if (state == conn_listening) {
805 return "conn_listening";
806 }
else if (state == conn_new_cmd) {
807 return "conn_new_cmd";
808 }
else if (state == conn_waiting) {
809 return "conn_waiting";
810 }
else if (state == conn_read) {
812 }
else if (state == conn_parse_cmd) {
813 return "conn_parse_cmd";
814 }
else if (state == conn_write) {
816 }
else if (state == conn_nread) {
818 }
else if (state == conn_swallow) {
819 return "conn_swallow";
820 }
else if (state == conn_closing) {
821 return "conn_closing";
822 }
else if (state == conn_mwrite) {
823 return "conn_mwrite";
824 }
else if (state == conn_ship_log) {
825 return "conn_ship_log";
826 }
else if (state == conn_add_tap_client) {
827 return "conn_add_tap_client";
828 }
else if (state == conn_setup_tap_stream) {
829 return "conn_setup_tap_stream";
830 }
else if (state == conn_pending_close) {
831 return "conn_pending_close";
832 }
else if (state == conn_immediate_close) {
833 return "conn_immediate_close";
844 void conn_set_state(conn *c, STATE_FUNC state) {
847 if (state != c->state) {
854 if (c->thread == tap_thread) {
855 if (state == conn_waiting) {
857 state = conn_ship_log;
861 if (
settings.verbose > 2 || c->state == conn_closing
862 || c->state == conn_add_tap_client) {
863 settings.extensions.logger->
log(EXTENSION_LOG_DETAIL, c,
864 "%d: going from %s to %s\n",
865 c->sfd, state_text(c->state),
871 if (state == conn_write || state == conn_mwrite) {
872 MEMCACHED_PROCESS_COMMAND_END(c->sfd, c->
wbuf, c->wbytes);
883 static int ensure_iov_space(conn *c) {
886 if (c->iovused >= c->iovsize) {
888 struct iovec *new_iov = (
struct iovec *)realloc(c->iov,
889 (c->iovsize * 2) *
sizeof(
struct iovec));
896 for (i = 0, iovnum = 0; i < c->msgused; i++) {
897 c->msglist[
i].msg_iov = &c->iov[iovnum];
898 iovnum += c->msglist[
i].msg_iovlen;
913 static int add_iov(conn *c,
const void *
buf,
int len) {
921 m = &c->msglist[c->msgused - 1];
927 limit_to_mtu = IS_UDP(c->transport) || (1 == c->msgused);
930 if (m->msg_iovlen == IOV_MAX ||
931 (limit_to_mtu && c->msgbytes >= UDP_MAX_PAYLOAD_SIZE)) {
933 m = &c->msglist[c->msgused - 1];
936 if (ensure_iov_space(c) != 0)
940 if (limit_to_mtu && len + c->msgbytes > UDP_MAX_PAYLOAD_SIZE) {
941 leftover = len + c->msgbytes - UDP_MAX_PAYLOAD_SIZE;
947 m = &c->msglist[c->msgused - 1];
948 m->msg_iov[m->msg_iovlen].iov_base = (
void *)buf;
949 m->msg_iov[m->msg_iovlen].iov_len = len;
955 buf = ((
char *)buf) + len;
957 }
while (leftover > 0);
966 static int build_udp_headers(conn *c) {
972 if (c->msgused > c->hdrsize) {
975 new_hdrbuf = realloc(c->hdrbuf, c->msgused * 2 * UDP_HEADER_SIZE);
977 new_hdrbuf = malloc(c->msgused * 2 * UDP_HEADER_SIZE);
980 c->hdrbuf = (
unsigned char *)new_hdrbuf;
981 c->hdrsize = c->msgused * 2;
985 for (i = 0; i < c->msgused; i++) {
986 c->msglist[
i].msg_iov[0].iov_base = (
void*)hdr;
987 c->msglist[
i].msg_iov[0].iov_len = UDP_HEADER_SIZE;
988 *hdr++ = c->request_id / 256;
989 *hdr++ = c->request_id % 256;
992 *hdr++ = c->msgused / 256;
993 *hdr++ = c->msgused % 256;
996 assert((
void *) hdr == (caddr_t)c->msglist[i].msg_iov[0].iov_base + UDP_HEADER_SIZE);
1003 static void out_string(conn *c,
const char *str) {
1010 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
1011 ">%d NOREPLY %s\n", c->sfd, str);
1014 if (c->sbytes > 0) {
1015 conn_set_state(c, conn_swallow);
1017 conn_set_state(c, conn_new_cmd);
1023 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
1024 ">%d %s\n", c->sfd, str);
1034 if ((len + 2) > c->wsize) {
1036 str =
"SERVER_ERROR output line too long";
1040 memcpy(c->
wbuf, str, len);
1041 memcpy(c->
wbuf + len,
"\r\n", 2);
1042 c->wbytes = len + 2;
1045 conn_set_state(c, conn_write);
1047 if (c->sbytes > 0) {
1060 static void complete_update_ascii(conn *c) {
1067 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
1068 "%d: Failed to get item info\n",
1070 out_string(c,
"SERVER_ERROR failed to get item details");
1075 ENGINE_ERROR_CODE ret = c->aiostat;
1076 c->aiostat = ENGINE_SUCCESS;
1077 if (ret == ENGINE_SUCCESS) {
1082 #ifdef ENABLE_DTRACE
1083 switch (c->store_op) {
1085 MEMCACHED_COMMAND_ADD(c->sfd, info.
key, info.
nkey,
1086 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1088 case OPERATION_REPLACE:
1089 MEMCACHED_COMMAND_REPLACE(c->sfd, info.
key, info.
nkey,
1090 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1092 case OPERATION_APPEND:
1093 MEMCACHED_COMMAND_APPEND(c->sfd, info.
key, info.
nkey,
1094 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1096 case OPERATION_PREPEND:
1097 MEMCACHED_COMMAND_PREPEND(c->sfd, info.
key, info.
nkey,
1098 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1101 MEMCACHED_COMMAND_SET(c->sfd, info.
key, info.
nkey,
1102 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1105 MEMCACHED_COMMAND_CAS(c->sfd, info.
key, info.
nkey, info.
nbytes, c->cas);
1111 case ENGINE_SUCCESS:
1112 out_string(c,
"STORED");
1114 case ENGINE_KEY_EEXISTS:
1115 out_string(c,
"EXISTS");
1117 case ENGINE_KEY_ENOENT:
1118 out_string(c,
"NOT_FOUND");
1120 case ENGINE_NOT_STORED:
1121 out_string(c,
"NOT_STORED");
1123 case ENGINE_DISCONNECT:
1124 c->state = conn_closing;
1126 case ENGINE_ENOTSUP:
1127 out_string(c,
"SERVER_ERROR not supported");
1130 out_string(c,
"SERVER_ERROR out of memory");
1132 case ENGINE_TMPFAIL:
1133 out_string(c,
"SERVER_ERROR temporary failure");
1136 out_string(c,
"CLIENT_ERROR invalid arguments");
1139 out_string(c,
"CLIENT_ERROR value too big");
1141 case ENGINE_EACCESS:
1142 out_string(c,
"CLIENT_ERROR access control violation");
1144 case ENGINE_NOT_MY_VBUCKET:
1145 out_string(c,
"SERVER_ERROR not my vbucket");
1148 out_string(c,
"SERVER_ERROR failure");
1150 case ENGINE_EWOULDBLOCK:
1151 c->ewouldblock =
true;
1153 case ENGINE_WANT_MORE:
1155 c->state = conn_closing;
1159 out_string(c,
"SERVER_ERROR internal");
1162 if (c->store_op == OPERATION_CAS) {
1164 case ENGINE_SUCCESS:
1165 SLAB_INCR(c, cas_hits, info.
key, info.
nkey);
1167 case ENGINE_KEY_EEXISTS:
1168 SLAB_INCR(c, cas_badval, info.
key, info.
nkey);
1170 case ENGINE_KEY_ENOENT:
1171 STATS_NOKEY(c, cas_misses);
1177 SLAB_INCR(c, cmd_set, info.
key, info.
nkey);
1180 if (!c->ewouldblock) {
1190 static void* binary_get_request(conn *c) {
1191 char *ret = c->
rcurr;
1192 ret -= (
sizeof(c->binary_header) + c->binary_header.request.keylen +
1193 c->binary_header.request.extlen);
1195 assert(ret >= c->
rbuf);
1202 static char* binary_get_key(conn *c) {
1203 return c->
rcurr - (c->binary_header.request.keylen);
1219 static ssize_t key_to_printable_buffer(
char *dest,
size_t destsz,
1220 int client,
bool from_client,
1225 ssize_t nw = snprintf(dest, destsz,
"%c%d %s ", from_client ?
'>' :
'<',
1231 char *ptr = dest + nw;
1233 if (nkey > destsz) {
1237 for (ssize_t ii = 0; ii < nkey; ++ii, ++key, ++ptr) {
1238 if (isgraph(*key)) {
1261 static ssize_t bytes_to_output_string(
char *dest,
size_t destsz,
1262 int client,
bool from_client,
1267 ssize_t nw = snprintf(dest, destsz,
"%c%d %s", from_client ?
'>' :
'<',
1274 for (ssize_t ii = 0; ii <
size; ++ii) {
1276 if ((nw = snprintf(dest + offset, destsz - offset,
"\n%c%d ",
1277 from_client ?
'>' :
'<', client)) == -1) {
1282 if ((nw = snprintf(dest + offset, destsz - offset,
1283 " 0x%02x", (
unsigned char)data[ii])) == -1) {
1289 if ((nw = snprintf(dest + offset, destsz - offset,
"\n")) == -1) {
1296 static void add_bin_header(conn *c, uint16_t err, uint8_t hdr_len, uint16_t key_len, uint32_t body_len) {
1304 if (add_msghdr(c) != 0) {
1306 out_string(c,
"SERVER_ERROR out of memory");
1312 header->response.magic = (uint8_t)PROTOCOL_BINARY_RES;
1313 header->response.opcode = c->binary_header.request.opcode;
1314 header->response.keylen = (uint16_t)htons(key_len);
1316 header->response.extlen = (uint8_t)hdr_len;
1317 header->response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES;
1318 header->response.status = (uint16_t)htons(err);
1320 header->response.bodylen = htonl(body_len);
1321 header->response.opaque = c->opaque;
1322 header->response.cas = htonll(c->cas);
1326 if (bytes_to_output_string(buffer,
sizeof(buffer), c->sfd,
false,
1327 "Writing bin response:",
1328 (
const char*)header->bytes,
1329 sizeof(header->bytes)) != -1) {
1330 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
1335 add_iov(c, c->
wbuf,
sizeof(header->response));
1348 case ENGINE_SUCCESS:
1349 return PROTOCOL_BINARY_RESPONSE_SUCCESS;
1350 case ENGINE_KEY_ENOENT:
1351 return PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1352 case ENGINE_KEY_EEXISTS:
1353 return PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1355 return PROTOCOL_BINARY_RESPONSE_ENOMEM;
1356 case ENGINE_TMPFAIL:
1357 return PROTOCOL_BINARY_RESPONSE_ETMPFAIL;
1358 case ENGINE_NOT_STORED:
1359 return PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1361 return PROTOCOL_BINARY_RESPONSE_EINVAL;
1362 case ENGINE_ENOTSUP:
1363 return PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED;
1365 return PROTOCOL_BINARY_RESPONSE_E2BIG;
1366 case ENGINE_NOT_MY_VBUCKET:
1367 return PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET;
1369 ret = PROTOCOL_BINARY_RESPONSE_EINTERNAL;
1377 char buffer[1024] = { [
sizeof(buffer) - 1] =
'\0' };
1380 case PROTOCOL_BINARY_RESPONSE_SUCCESS:
1383 case PROTOCOL_BINARY_RESPONSE_ENOMEM:
1384 len = snprintf(buffer,
sizeof(buffer),
"Out of memory");
1386 case PROTOCOL_BINARY_RESPONSE_ETMPFAIL:
1387 len = snprintf(buffer,
sizeof(buffer),
"Temporary failure");
1389 case PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND:
1390 len = snprintf(buffer,
sizeof(buffer),
"Unknown command");
1392 case PROTOCOL_BINARY_RESPONSE_KEY_ENOENT:
1393 len = snprintf(buffer,
sizeof(buffer),
"Not found");
1395 case PROTOCOL_BINARY_RESPONSE_EINVAL:
1396 len = snprintf(buffer,
sizeof(buffer),
"Invalid arguments");
1398 case PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS:
1399 len = snprintf(buffer,
sizeof(buffer),
"Data exists for key");
1401 case PROTOCOL_BINARY_RESPONSE_E2BIG:
1402 len = snprintf(buffer,
sizeof(buffer),
"Too large");
1404 case PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL:
1405 len = snprintf(buffer,
sizeof(buffer),
1406 "Non-numeric server-side value for incr or decr");
1408 case PROTOCOL_BINARY_RESPONSE_NOT_STORED:
1409 len = snprintf(buffer,
sizeof(buffer),
"Not stored");
1411 case PROTOCOL_BINARY_RESPONSE_AUTH_ERROR:
1412 len = snprintf(buffer,
sizeof(buffer),
"Auth failure");
1414 case PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED:
1415 len = snprintf(buffer,
sizeof(buffer),
"Not supported");
1417 case PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET:
1418 len = snprintf(buffer,
sizeof(buffer),
1419 "I'm not responsible for this vbucket");
1423 len = snprintf(buffer,
sizeof(buffer),
"UNHANDLED ERROR (%d)", err);
1424 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
1425 ">%d UNHANDLED ERROR: %d\n", c->sfd, err);
1431 sizeof(buffer) - len - 3);
1434 memcpy(buffer + len,
": ", 2);
1439 if (err != PROTOCOL_BINARY_RESPONSE_SUCCESS &&
settings.verbose > 1) {
1440 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
1441 ">%d Writing an error: %s\n", c->sfd,
1445 add_bin_header(c, err, 0, 0, len);
1447 add_iov(c, buffer, len);
1449 conn_set_state(c, conn_mwrite);
1451 c->sbytes = swallow;
1459 static void write_bin_response(conn *c,
void *d,
int hlen,
int keylen,
int dlen) {
1460 if (!c->noreply || c->cmd == PROTOCOL_BINARY_CMD_GET ||
1461 c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1462 add_bin_header(c, 0, hlen, keylen, dlen);
1464 add_iov(c, d, dlen);
1466 conn_set_state(c, conn_mwrite);
1469 conn_set_state(c, conn_new_cmd);
1474 static void complete_incr_bin(conn *c) {
1479 assert(c->wsize >=
sizeof(*rsp));
1482 uint64_t delta = ntohll(req->message.body.delta);
1483 uint64_t initial = ntohll(req->message.body.initial);
1484 rel_time_t expiration = ntohl(req->message.body.expiration);
1485 char *key = binary_get_key(c);
1486 size_t nkey = c->binary_header.request.keylen;
1487 bool incr = (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT ||
1488 c->cmd == PROTOCOL_BINARY_CMD_INCREMENTQ);
1493 nw = key_to_printable_buffer(buffer,
sizeof(buffer), c->sfd,
true,
1494 incr ?
"INCR" :
"DECR", key, nkey);
1496 if (snprintf(buffer + nw,
sizeof(buffer) - nw,
1497 " %" PRIu64
", %" PRIu64
", %" PRIu64
"\n",
1498 delta, initial, (uint64_t)expiration) != -1) {
1499 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
"%s",
1505 ENGINE_ERROR_CODE ret = c->aiostat;
1506 c->aiostat = ENGINE_SUCCESS;
1507 if (ret == ENGINE_SUCCESS) {
1510 req->message.body.expiration != 0xffffffff,
1511 delta, initial, expiration,
1513 &rsp->message.body.value,
1514 c->binary_header.request.vbucket);
1518 case ENGINE_SUCCESS:
1519 rsp->message.body.value = htonll(rsp->message.body.value);
1520 write_bin_response(c, &rsp->message.body, 0, 0,
1521 sizeof (rsp->message.body.value));
1523 STATS_INCR(c, incr_hits, key, nkey);
1525 STATS_INCR(c, decr_hits, key, nkey);
1528 case ENGINE_KEY_EEXISTS:
1529 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1531 case ENGINE_KEY_ENOENT:
1532 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1533 if (c->cmd == PROTOCOL_BINARY_CMD_INCREMENT) {
1534 STATS_INCR(c, incr_misses, key, nkey);
1536 STATS_INCR(c, decr_misses, key, nkey);
1540 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1542 case ENGINE_TMPFAIL:
1543 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1546 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_DELTA_BADVAL, 0);
1548 case ENGINE_NOT_STORED:
1549 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_STORED, 0);
1551 case ENGINE_DISCONNECT:
1552 c->state = conn_closing;
1554 case ENGINE_ENOTSUP:
1555 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1557 case ENGINE_NOT_MY_VBUCKET:
1558 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1560 case ENGINE_EWOULDBLOCK:
1561 c->ewouldblock =
true;
1568 static void complete_update_bin(conn *c) {
1576 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
1577 "%d: Failed to get item info\n",
1579 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1583 ENGINE_ERROR_CODE ret = c->aiostat;
1584 c->aiostat = ENGINE_SUCCESS;
1585 if (ret == ENGINE_SUCCESS) {
1587 it, &c->cas, c->store_op,
1588 c->binary_header.request.vbucket);
1591 #ifdef ENABLE_DTRACE
1594 MEMCACHED_COMMAND_ADD(c->sfd, info.
key, info.
nkey,
1595 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1597 case OPERATION_REPLACE:
1598 MEMCACHED_COMMAND_REPLACE(c->sfd, info.
key, info.
nkey,
1599 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1601 case OPERATION_APPEND:
1602 MEMCACHED_COMMAND_APPEND(c->sfd, info.
key, info.
nkey,
1603 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1605 case OPERATION_PREPEND:
1606 MEMCACHED_COMMAND_PREPEND(c->sfd, info.
key, info.
nkey,
1607 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1610 MEMCACHED_COMMAND_SET(c->sfd, info.
key, info.
nkey,
1611 (ret == ENGINE_SUCCESS) ? info.
nbytes : -1, c->cas);
1617 case ENGINE_SUCCESS:
1619 write_bin_response(c, NULL, 0, 0, 0);
1621 case ENGINE_KEY_EEXISTS:
1622 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
1624 case ENGINE_KEY_ENOENT:
1625 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1628 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1630 case ENGINE_TMPFAIL:
1631 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1633 case ENGINE_EWOULDBLOCK:
1634 c->ewouldblock =
true;
1636 case ENGINE_DISCONNECT:
1637 c->state = conn_closing;
1639 case ENGINE_ENOTSUP:
1640 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1642 case ENGINE_NOT_MY_VBUCKET:
1643 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1646 if (c->store_op == OPERATION_ADD) {
1647 eno = PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS;
1648 }
else if(c->store_op == OPERATION_REPLACE) {
1649 eno = PROTOCOL_BINARY_RESPONSE_KEY_ENOENT;
1651 eno = PROTOCOL_BINARY_RESPONSE_NOT_STORED;
1653 write_bin_packet(c, eno, 0);
1656 if (c->store_op == OPERATION_CAS) {
1658 case ENGINE_SUCCESS:
1659 SLAB_INCR(c, cas_hits, info.
key, info.
nkey);
1661 case ENGINE_KEY_EEXISTS:
1662 SLAB_INCR(c, cas_badval, info.
key, info.
nkey);
1664 case ENGINE_KEY_ENOENT:
1665 STATS_NOKEY(c, cas_misses);
1671 SLAB_INCR(c, cmd_set, info.
key, info.
nkey);
1674 if (!c->ewouldblock) {
1681 static void process_bin_get(conn *c) {
1685 char* key = binary_get_key(c);
1686 size_t nkey = c->binary_header.request.keylen;
1690 if (key_to_printable_buffer(buffer,
sizeof(buffer), c->sfd,
true,
1691 "GET", key, nkey) != -1) {
1692 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
"%s\n",
1697 ENGINE_ERROR_CODE ret = c->aiostat;
1698 c->aiostat = ENGINE_SUCCESS;
1699 if (ret == ENGINE_SUCCESS) {
1701 c->binary_header.request.vbucket);
1709 case ENGINE_SUCCESS:
1712 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
1713 "%d: Failed to get item info\n",
1715 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
1720 bodylen =
sizeof(rsp->message.body) + info.
nbytes;
1722 STATS_HIT(c,
get, key, nkey);
1724 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1728 add_bin_header(c, 0,
sizeof(rsp->message.body), keylen, bodylen);
1729 rsp->message.header.response.cas = htonll(info.cas);
1732 rsp->message.body.flags = info.
flags;
1733 add_iov(c, &rsp->message.body,
sizeof(rsp->message.body));
1735 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1736 add_iov(c, info.
key, nkey);
1739 add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
1740 conn_set_state(c, conn_mwrite);
1744 case ENGINE_KEY_ENOENT:
1745 STATS_MISS(c,
get, key, nkey);
1747 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
1750 conn_set_state(c, conn_new_cmd);
1752 if (c->cmd == PROTOCOL_BINARY_CMD_GETK) {
1754 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT,
1756 memcpy(ofs, key, nkey);
1757 add_iov(c, ofs, nkey);
1758 conn_set_state(c, conn_mwrite);
1760 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1764 case ENGINE_EWOULDBLOCK:
1765 c->ewouldblock =
true;
1767 case ENGINE_DISCONNECT:
1768 c->state = conn_closing;
1770 case ENGINE_TMPFAIL:
1772 case ENGINE_ENOTSUP:
1773 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1775 case ENGINE_NOT_MY_VBUCKET:
1776 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
1780 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
1781 "Unknown error code: %d\n", ret);
1785 if (
settings.detail_enabled && ret != ENGINE_EWOULDBLOCK) {
1786 stats_prefix_record_get(key, nkey, ret == ENGINE_SUCCESS);
1790 static void append_bin_stats(
const char *key,
const uint16_t klen,
1791 const char *val,
const uint32_t vlen,
1793 char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1794 uint32_t bodylen = klen + vlen;
1796 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
1797 .response.opcode = PROTOCOL_BINARY_CMD_STAT,
1798 .response.keylen = (uint16_t)htons(klen),
1799 .response.datatype = (uint8_t)PROTOCOL_BINARY_RAW_BYTES,
1800 .response.bodylen = htonl(bodylen),
1801 .response.opaque = c->opaque
1804 memcpy(buf, header.bytes,
sizeof(header.response));
1805 buf +=
sizeof(header.response);
1808 memcpy(buf, key, klen);
1812 memcpy(buf, val, vlen);
1816 c->dynamic_buffer.offset +=
sizeof(header.response) + bodylen;
1824 static void append_ascii_stats(
const char *key,
const uint16_t klen,
1825 const char *val,
const uint32_t vlen,
1827 char *pos = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
1828 uint32_t nbytes = 5;
1830 if (klen == 0 && vlen == 0) {
1831 memcpy(pos,
"END\r\n", 5);
1833 memcpy(pos,
"STAT ", 5);
1834 memcpy(pos + nbytes, key, klen);
1839 memcpy(pos + nbytes, val, vlen);
1842 memcpy(pos + nbytes,
"\r\n", 2);
1846 c->dynamic_buffer.offset += nbytes;
1849 static bool grow_dynamic_buffer(conn *c,
size_t needed) {
1850 size_t nsize = c->dynamic_buffer.size;
1851 size_t available = nsize - c->dynamic_buffer.offset;
1855 if (c->dynamic_buffer.buffer == NULL) {
1857 available = c->dynamic_buffer.size = c->dynamic_buffer.offset = 0;
1860 while (needed > available) {
1863 available = nsize - c->dynamic_buffer.offset;
1866 if (nsize != c->dynamic_buffer.size) {
1867 char *ptr = realloc(c->dynamic_buffer.buffer, nsize);
1869 c->dynamic_buffer.buffer = ptr;
1870 c->dynamic_buffer.size = nsize;
1879 static void append_stats(
const char *key,
const uint16_t klen,
1880 const char *val,
const uint32_t vlen,
1884 if (klen == 0 && vlen > 0) {
1888 conn *c = (conn*)cookie;
1890 if (c->protocol == binary_prot) {
1892 if (!grow_dynamic_buffer(c, needed)) {
1895 append_bin_stats(key, klen, val, vlen, c);
1897 size_t needed = vlen + klen + 10;
1898 if (!grow_dynamic_buffer(c, needed)) {
1901 append_ascii_stats(key, klen, val, vlen, c);
1904 assert(c->dynamic_buffer.offset <= c->dynamic_buffer.size);
1907 static void process_bin_stat(conn *c) {
1908 char *subcommand = binary_get_key(c);
1909 size_t nkey = c->binary_header.request.keylen;
1913 if (key_to_printable_buffer(buffer,
sizeof(buffer), c->sfd,
true,
1914 "STATS", subcommand, nkey) != -1) {
1915 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
"%s\n",
1920 ENGINE_ERROR_CODE ret = c->aiostat;
1921 c->aiostat = ENGINE_SUCCESS;
1922 c->ewouldblock =
false;
1924 if (ret == ENGINE_SUCCESS) {
1928 if (ret == ENGINE_SUCCESS) {
1929 server_stats(&append_stats, c,
false);
1931 }
else if (strncmp(subcommand,
"reset", 5) == 0) {
1934 }
else if (strncmp(subcommand,
"settings", 8) == 0) {
1935 process_stat_settings(&append_stats, c);
1936 }
else if (strncmp(subcommand,
"detail", 6) == 0) {
1937 char *subcmd_pos = subcommand + 6;
1939 if (strncmp(subcmd_pos,
" dump", 5) == 0) {
1941 char *dump_buf = stats_prefix_dump(&len);
1942 if (dump_buf == NULL || len <= 0) {
1943 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1946 append_stats(
"detailed", strlen(
"detailed"), dump_buf, len, c);
1949 }
else if (strncmp(subcmd_pos,
" on", 3) == 0) {
1951 }
else if (strncmp(subcmd_pos,
" off", 4) == 0) {
1954 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1958 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1961 }
else if (strncmp(subcommand,
"aggregate", 9) == 0) {
1962 server_stats(&append_stats, c,
true);
1963 }
else if (strncmp(subcommand,
"topkeys", 7) == 0) {
1964 topkeys_t *tk = get_independent_stats(c)->topkeys;
1966 topkeys_stats(tk, c, current_time, append_stats);
1968 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1979 case ENGINE_SUCCESS:
1980 append_stats(NULL, 0, NULL, 0, c);
1981 write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
1982 c->dynamic_buffer.buffer = NULL;
1985 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, 0);
1987 case ENGINE_TMPFAIL:
1988 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ETMPFAIL, 0);
1990 case ENGINE_KEY_ENOENT:
1991 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
1993 case ENGINE_DISCONNECT:
1994 c->state = conn_closing;
1996 case ENGINE_ENOTSUP:
1997 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
1999 case ENGINE_EWOULDBLOCK:
2000 c->ewouldblock =
true;
2003 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2007 static void bin_read_chunk(conn *c,
enum bin_substates next_substate, uint32_t chunk) {
2009 c->substate = next_substate;
2015 size_t nsize = c->
rsize;
2018 while (size > nsize) {
2022 if (nsize != c->
rsize) {
2024 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2025 "%d: Need to grow buffer from %lu to %lu\n",
2026 c->sfd, (
unsigned long)c->
rsize, (
unsigned long)nsize);
2028 char *newm = realloc(c->
rbuf, nsize);
2031 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2032 "%d: Failed to grow buffer.. closing connection\n",
2035 conn_set_state(c, conn_closing);
2048 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2049 "%d: Repack input buffer\n",
2057 conn_set_state(c, conn_nread);
2060 static void bin_read_key(conn *c,
enum bin_substates next_substate,
int extra) {
2061 bin_read_chunk(c, next_substate, c->keylen + extra);
2066 static void handle_binary_protocol_error(conn *c) {
2067 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
2069 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2070 "%d: Protocol error (opcode %02x), close connection\n",
2071 c->sfd, c->binary_header.request.opcode);
2076 static void init_sasl_conn(conn *c) {
2078 if (!c->sasl_conn) {
2079 int result=sasl_server_new(
"memcached",
2080 NULL, NULL, NULL, NULL,
2081 NULL, 0, &c->sasl_conn);
2082 if (result != SASL_OK) {
2084 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2085 "%d: Failed to initialize SASL conn.\n",
2088 c->sasl_conn = NULL;
2093 static void get_auth_data(
const void *cookie,
auth_data_t *data) {
2094 conn *c = (conn*)cookie;
2096 sasl_getprop(c->sasl_conn, SASL_USERNAME, (
void*)&data->username);
2098 sasl_getprop(c->sasl_conn, ISASL_CONFIG, (
void*)&data->config);
2104 static void bin_list_sasl_mechs(conn *c) {
2106 const char *result_string = NULL;
2107 unsigned int string_length = 0;
2108 int result=sasl_listmech(c->sasl_conn, NULL,
2112 &result_string, &string_length,
2114 if (result != SASL_OK) {
2117 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2118 "%d: Failed to list SASL mechanisms.\n",
2121 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2124 write_bin_response(c, (
char*)result_string, 0, 0, string_length);
2134 static void process_bin_sasl_auth(conn *c) {
2135 assert(c->binary_header.request.extlen == 0);
2137 int nkey = c->binary_header.request.keylen;
2138 int vlen = c->binary_header.request.bodylen - nkey;
2140 if (nkey > MAX_SASL_MECH_LEN) {
2141 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, vlen);
2146 char *key = binary_get_key(c);
2149 size_t buffer_size =
sizeof(
struct sasl_tmp) + nkey + vlen + 2;
2152 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
2159 memcpy(data->data, key, nkey);
2162 c->
ritem = data->data + nkey;
2164 conn_set_state(c, conn_nread);
2165 c->substate = bin_reading_sasl_auth_data;
2168 static void process_bin_complete_sasl_auth(conn *c) {
2169 const char *out = NULL;
2170 unsigned int outlen = 0;
2175 int nkey = c->binary_header.request.keylen;
2176 int vlen = c->binary_header.request.bodylen - nkey;
2180 memcpy(mech, stmp->data, nkey);
2184 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2185 "%d: mech: ``%s'' with %d bytes of data\n", c->sfd, mech, vlen);
2188 const char *challenge = vlen == 0 ? NULL : (stmp->data + nkey);
2193 case PROTOCOL_BINARY_CMD_SASL_AUTH:
2194 result = sasl_server_start(c->sasl_conn, mech,
2198 case PROTOCOL_BINARY_CMD_SASL_STEP:
2199 result = sasl_server_step(c->sasl_conn,
2208 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2209 "%d: Unhandled command %d with challenge %s\n",
2210 c->sfd, c->cmd, challenge);
2220 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2221 "%d: sasl result code: %d\n",
2227 write_bin_response(c,
"Authenticated", 0, 0, strlen(
"Authenticated"));
2229 get_auth_data(c, &data);
2230 perform_callbacks(ON_AUTH, (
const void*)&data, c);
2231 STATS_NOKEY(c, auth_cmds);
2234 add_bin_header(c, PROTOCOL_BINARY_RESPONSE_AUTH_CONTINUE, 0, 0, outlen);
2236 add_iov(c, out, outlen);
2238 conn_set_state(c, conn_mwrite);
2243 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2244 "%d: Unknown sasl response: %d\n",
2247 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2248 STATS_NOKEY2(c, auth_cmds, auth_errors);
2252 static bool authenticated(conn *c) {
2256 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
2257 case PROTOCOL_BINARY_CMD_SASL_AUTH:
2258 case PROTOCOL_BINARY_CMD_SASL_STEP:
2259 case PROTOCOL_BINARY_CMD_VERSION:
2264 const void *uname = NULL;
2265 sasl_getprop(c->sasl_conn, SASL_USERNAME, &uname);
2271 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2272 "%d: authenticated() in cmd 0x%02x is %s\n",
2273 c->sfd, c->cmd, rv ?
"true" :
"false");
2279 static bool binary_response_handler(
const void *key, uint16_t keylen,
2280 const void *ext, uint8_t extlen,
2281 const void *body, uint32_t bodylen,
2282 uint8_t datatype, uint16_t status,
2283 uint64_t cas,
const void *cookie)
2285 conn *c = (conn*)cookie;
2288 if (!grow_dynamic_buffer(c, needed)) {
2290 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
2291 "<%d ERROR: Failed to allocate memory for response\n",
2297 char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
2299 .response.magic = (uint8_t)PROTOCOL_BINARY_RES,
2300 .response.opcode = c->binary_header.request.opcode,
2301 .response.keylen = (uint16_t)htons(keylen),
2302 .response.extlen = extlen,
2303 .response.datatype = datatype,
2304 .response.status = (uint16_t)htons(status),
2305 .response.bodylen = htonl(bodylen + keylen + extlen),
2306 .response.opaque = c->opaque,
2307 .response.cas = htonll(cas),
2310 memcpy(buf, header.bytes,
sizeof(header.response));
2311 buf +=
sizeof(header.response);
2314 memcpy(buf, ext, extlen);
2319 memcpy(buf, key, keylen);
2324 memcpy(buf, body, bodylen);
2327 c->dynamic_buffer.offset += needed;
2339 uint64_t checkpoint_start;
2340 uint64_t checkpoint_end;
2344 uint64_t vbucket_set;
2348 pthread_mutex_t mutex;
2351 }
tap_stats = { .mutex = PTHREAD_MUTEX_INITIALIZER };
2353 static void ship_tap_log(conn *c) {
2354 assert(c->thread->type == TAP);
2358 if (add_msghdr(c) != 0) {
2360 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2361 "%d: Failed to create output headers. Shutting down tap connection\n", c->sfd);
2363 conn_set_state(c, conn_closing);
2369 bool more_data =
true;
2370 bool send_data =
false;
2371 bool disconnect =
false;
2376 c->icurr = c->ilist;
2390 tap_event_t
event = c->tap_iterator(
settings.engine.v0, c, &it,
2391 &engine, &nengine, &ttl,
2392 &tap_flags, &seqno, &vbucket);
2400 .mutation.message.header.request.magic = (uint8_t)PROTOCOL_BINARY_REQ,
2403 msg.opaque.message.header.request.opaque = htonl(seqno);
2404 msg.opaque.message.body.tap.enginespecific_length = htons(nengine);
2405 msg.opaque.message.body.tap.ttl = ttl;
2406 msg.opaque.message.body.tap.flags = htons(tap_flags);
2407 msg.opaque.message.header.request.extlen = 8;
2408 msg.opaque.message.header.request.vbucket = htons(vbucket);
2414 msg.noop.message.header.request.opcode = PROTOCOL_BINARY_CMD_NOOP;
2415 msg.noop.message.header.request.extlen = 0;
2416 msg.noop.message.header.request.bodylen = htonl(0);
2417 memcpy(c->wcurr, msg.noop.bytes,
sizeof(msg.noop.bytes));
2418 add_iov(c, c->wcurr,
sizeof(msg.noop.bytes));
2419 c->wcurr +=
sizeof(msg.noop.bytes);
2420 c->wbytes +=
sizeof(msg.noop.bytes);
2425 case TAP_CHECKPOINT_START:
2426 case TAP_CHECKPOINT_END:
2430 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2431 "%d: Failed to get item info\n", c->sfd);
2435 c->ilist[c->ileft++] = it;
2437 if (
event == TAP_CHECKPOINT_START) {
2438 msg.mutation.message.header.request.opcode =
2439 PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START;
2443 }
else if (
event == TAP_CHECKPOINT_END) {
2444 msg.mutation.message.header.request.opcode =
2445 PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END;
2449 }
else if (
event == TAP_MUTATION) {
2450 msg.mutation.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_MUTATION;
2456 msg.mutation.message.header.request.cas = htonll(info.cas);
2457 msg.mutation.message.header.request.keylen = htons(info.
nkey);
2458 msg.mutation.message.header.request.extlen = 16;
2460 bodylen = 16 + info.
nkey + nengine;
2461 if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2464 msg.mutation.message.header.request.bodylen = htonl(bodylen);
2465 msg.mutation.message.body.item.flags = htonl(info.
flags);
2466 msg.mutation.message.body.item.expiration = htonl(info.
exptime);
2467 msg.mutation.message.body.tap.enginespecific_length = htons(nengine);
2468 msg.mutation.message.body.tap.ttl = ttl;
2469 msg.mutation.message.body.tap.flags = htons(tap_flags);
2470 memcpy(c->wcurr, msg.mutation.bytes,
sizeof(msg.mutation.bytes));
2472 add_iov(c, c->wcurr,
sizeof(msg.mutation.bytes));
2473 c->wcurr +=
sizeof(msg.mutation.bytes);
2474 c->wbytes +=
sizeof(msg.mutation.bytes);
2477 memcpy(c->wcurr, engine, nengine);
2478 add_iov(c, c->wcurr, nengine);
2479 c->wcurr += nengine;
2480 c->wbytes += nengine;
2483 add_iov(c, info.
key, info.
nkey);
2484 if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2485 add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
2493 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2494 "%d: Failed to get item info\n", c->sfd);
2498 c->ilist[c->ileft++] = it;
2499 msg.delete.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_DELETE;
2500 msg.delete.message.header.request.cas = htonll(info.cas);
2501 msg.delete.message.header.request.keylen = htons(info.
nkey);
2503 bodylen = 8 + info.
nkey + nengine;
2504 if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2507 msg.delete.message.header.request.bodylen = htonl(bodylen);
2509 memcpy(c->wcurr, msg.delete.bytes,
sizeof(msg.delete.bytes));
2510 add_iov(c, c->wcurr,
sizeof(msg.delete.bytes));
2511 c->wcurr +=
sizeof(msg.delete.bytes);
2512 c->wbytes +=
sizeof(msg.delete.bytes);
2515 memcpy(c->wcurr, engine, nengine);
2516 add_iov(c, c->wcurr, nengine);
2517 c->wcurr += nengine;
2518 c->wbytes += nengine;
2521 add_iov(c, info.
key, info.
nkey);
2522 if ((tap_flags & TAP_FLAG_NO_VALUE) == 0) {
2523 add_iov(c, info.value[0].iov_base, info.value[0].iov_len);
2531 case TAP_DISCONNECT:
2535 case TAP_VBUCKET_SET:
2540 if (
event == TAP_OPAQUE) {
2541 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_OPAQUE;
2546 }
else if (
event == TAP_FLUSH) {
2547 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_FLUSH;
2551 }
else if (
event == TAP_VBUCKET_SET) {
2552 msg.flush.message.header.request.opcode = PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET;
2553 msg.flush.message.body.tap.flags = htons(tap_flags);
2559 msg.flush.message.header.request.bodylen = htonl(8 + nengine);
2560 memcpy(c->wcurr, msg.flush.bytes,
sizeof(msg.flush.bytes));
2561 add_iov(c, c->wcurr,
sizeof(msg.flush.bytes));
2562 c->wcurr +=
sizeof(msg.flush.bytes);
2563 c->wbytes +=
sizeof(msg.flush.bytes);
2565 memcpy(c->wcurr, engine, nengine);
2566 add_iov(c, c->wcurr, nengine);
2567 c->wcurr += nengine;
2568 c->wbytes += nengine;
2574 }
while (more_data);
2576 c->ewouldblock =
false;
2578 conn_set_state(c, conn_mwrite);
2586 conn_set_state(c, conn_closing);
2590 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2591 "%d: No more items in tap log.. waiting\n",
2594 c->ewouldblock =
true;
2599 static void process_bin_unknown_packet(conn *c) {
2600 void *packet = c->
rcurr - (c->binary_header.request.bodylen +
2601 sizeof(c->binary_header));
2603 ENGINE_ERROR_CODE ret = c->aiostat;
2604 c->aiostat = ENGINE_SUCCESS;
2605 c->ewouldblock =
false;
2607 if (ret == ENGINE_SUCCESS) {
2609 binary_response_handler);
2612 if (ret == ENGINE_SUCCESS) {
2613 if (c->dynamic_buffer.buffer != NULL) {
2614 write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
2615 c->dynamic_buffer.buffer = NULL;
2617 conn_set_state(c, conn_new_cmd);
2619 }
else if (ret == ENGINE_ENOTSUP) {
2620 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0);
2621 }
else if (ret == ENGINE_EWOULDBLOCK) {
2622 c->ewouldblock =
true;
2625 conn_set_state(c, conn_closing);
2629 static void process_bin_tap_connect(conn *c) {
2630 char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
2631 sizeof(c->binary_header)));
2633 const char *key = packet +
sizeof(req->bytes);
2634 const char *data = key + c->binary_header.request.keylen;
2636 size_t ndata = c->binary_header.request.bodylen -
2637 c->binary_header.request.extlen -
2638 c->binary_header.request.keylen;
2640 if (c->binary_header.request.extlen == 4) {
2641 flags = ntohl(req->message.body.flags);
2643 if (flags & TAP_CONNECT_FLAG_BACKFILL) {
2646 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2647 "%d: ERROR: Invalid tap connect message\n",
2649 conn_set_state(c, conn_closing);
2658 if (
settings.verbose && c->binary_header.request.keylen > 0) {
2660 int len = c->binary_header.request.keylen;
2661 if (len >=
sizeof(buffer)) {
2662 len =
sizeof(buffer) - 1;
2664 memcpy(buffer, key, len);
2666 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
2667 "%d: Trying to connect with named tap connection: <%s>\n",
2672 settings.engine.v0, c, key, c->binary_header.request.keylen,
2673 flags, data, ndata);
2675 if (iterator == NULL) {
2676 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
2677 "%d: FATAL: The engine does not support tap\n",
2679 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
2682 c->tap_iterator = iterator;
2683 c->which = EV_WRITE;
2684 conn_set_state(c, conn_ship_log);
2688 static void process_bin_tap_packet(tap_event_t
event, conn *c) {
2690 char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
2691 sizeof(c->binary_header)));
2693 uint16_t nengine = ntohs(tap->message.body.tap.enginespecific_length);
2694 uint16_t tap_flags = ntohs(tap->message.body.tap.flags);
2695 uint32_t seqno = ntohl(tap->message.header.request.opaque);
2696 uint8_t ttl = tap->message.body.tap.ttl;
2698 char *engine_specific = packet +
sizeof(tap->bytes);
2699 char *key = engine_specific + nengine;
2700 uint16_t nkey = c->binary_header.request.keylen;
2701 char *data = key + nkey;
2703 uint32_t exptime = 0;
2704 uint32_t ndata = c->binary_header.request.bodylen - nengine - nkey - 8;
2706 if (event == TAP_MUTATION || event == TAP_CHECKPOINT_START ||
2707 event == TAP_CHECKPOINT_END) {
2709 flags = ntohl(mutation->message.body.item.flags);
2710 exptime = ntohl(mutation->message.body.item.expiration);
2716 ENGINE_ERROR_CODE ret = c->aiostat;
2717 if (ret == ENGINE_SUCCESS) {
2719 engine_specific, nengine,
2724 ntohll(tap->message.header.request.cas),
2726 c->binary_header.request.vbucket);
2730 case ENGINE_DISCONNECT:
2731 conn_set_state(c, conn_closing);
2733 case ENGINE_EWOULDBLOCK:
2734 c->ewouldblock =
true;
2737 if ((tap_flags & TAP_FLAG_ACK) ||
2738 (ret != ENGINE_SUCCESS && c->tap_nack_mode))
2740 write_bin_packet(c, engine_error_2_protocol_error(ret), 0);
2742 conn_set_state(c, conn_new_cmd);
2747 static void process_bin_tap_ack(conn *c) {
2749 char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
2750 sizeof(c->binary_header)));
2752 uint32_t seqno = ntohl(rsp->message.header.response.opaque);
2753 uint16_t status = ntohs(rsp->message.header.response.status);
2754 char *key = packet +
sizeof(rsp->bytes);
2756 ENGINE_ERROR_CODE ret = ENGINE_DISCONNECT;
2759 TAP_ACK, seqno, key,
2760 c->binary_header.request.keylen, 0, 0,
2764 if (ret == ENGINE_DISCONNECT) {
2765 conn_set_state(c, conn_closing);
2767 conn_set_state(c, conn_ship_log);
2774 static void process_bin_noop_response(conn *c) {
2776 conn_set_state(c, conn_new_cmd);
2779 static void process_bin_verbosity(conn *c) {
2780 char *packet = (c->
rcurr - (c->binary_header.request.bodylen +
2781 sizeof(c->binary_header)));
2783 uint32_t
level = (uint32_t)ntohl(req->message.body.level);
2784 if (level > MAX_VERBOSITY_LEVEL) {
2785 level = MAX_VERBOSITY_LEVEL;
2788 perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
2789 write_bin_response(c, NULL, 0, 0, 0);
2792 static void process_bin_packet(conn *c) {
2794 switch (c->binary_header.request.opcode) {
2795 case PROTOCOL_BINARY_CMD_TAP_CONNECT:
2799 conn_set_state(c, conn_add_tap_client);
2801 case PROTOCOL_BINARY_CMD_TAP_MUTATION:
2805 process_bin_tap_packet(TAP_MUTATION, c);
2807 case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
2811 process_bin_tap_packet(TAP_CHECKPOINT_START, c);
2813 case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
2817 process_bin_tap_packet(TAP_CHECKPOINT_END, c);
2819 case PROTOCOL_BINARY_CMD_TAP_DELETE:
2823 process_bin_tap_packet(TAP_DELETION, c);
2825 case PROTOCOL_BINARY_CMD_TAP_FLUSH:
2829 process_bin_tap_packet(TAP_FLUSH, c);
2831 case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
2835 process_bin_tap_packet(TAP_OPAQUE, c);
2837 case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
2841 process_bin_tap_packet(TAP_VBUCKET_SET, c);
2843 case PROTOCOL_BINARY_CMD_VERBOSITY:
2844 process_bin_verbosity(c);
2847 process_bin_unknown_packet(c);
2853 typedef void (*RESPONSE_HANDLER)(conn*);
2858 static RESPONSE_HANDLER response_handlers[256] = {
2859 [PROTOCOL_BINARY_CMD_NOOP] = process_bin_noop_response,
2860 [PROTOCOL_BINARY_CMD_TAP_MUTATION] = process_bin_tap_ack,
2861 [PROTOCOL_BINARY_CMD_TAP_DELETE] = process_bin_tap_ack,
2862 [PROTOCOL_BINARY_CMD_TAP_FLUSH] = process_bin_tap_ack,
2863 [PROTOCOL_BINARY_CMD_TAP_OPAQUE] = process_bin_tap_ack,
2864 [PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET] = process_bin_tap_ack,
2865 [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START] = process_bin_tap_ack,
2866 [PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END] = process_bin_tap_ack
2869 static void dispatch_bin_command(conn *c) {
2870 int protocol_error = 0;
2872 int extlen = c->binary_header.request.extlen;
2873 uint16_t keylen = c->binary_header.request.keylen;
2874 uint32_t bodylen = c->binary_header.request.bodylen;
2876 if (
settings.require_sasl && !authenticated(c)) {
2877 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_AUTH_ERROR, 0);
2882 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->
rcurr, c->
rbytes);
2887 handle_binary_protocol_error(c);
2892 case PROTOCOL_BINARY_CMD_SETQ:
2893 c->cmd = PROTOCOL_BINARY_CMD_SET;
2895 case PROTOCOL_BINARY_CMD_ADDQ:
2896 c->cmd = PROTOCOL_BINARY_CMD_ADD;
2898 case PROTOCOL_BINARY_CMD_REPLACEQ:
2899 c->cmd = PROTOCOL_BINARY_CMD_REPLACE;
2901 case PROTOCOL_BINARY_CMD_DELETEQ:
2902 c->cmd = PROTOCOL_BINARY_CMD_DELETE;
2904 case PROTOCOL_BINARY_CMD_INCREMENTQ:
2905 c->cmd = PROTOCOL_BINARY_CMD_INCREMENT;
2907 case PROTOCOL_BINARY_CMD_DECREMENTQ:
2908 c->cmd = PROTOCOL_BINARY_CMD_DECREMENT;
2910 case PROTOCOL_BINARY_CMD_QUITQ:
2911 c->cmd = PROTOCOL_BINARY_CMD_QUIT;
2913 case PROTOCOL_BINARY_CMD_FLUSHQ:
2914 c->cmd = PROTOCOL_BINARY_CMD_FLUSH;
2916 case PROTOCOL_BINARY_CMD_APPENDQ:
2917 c->cmd = PROTOCOL_BINARY_CMD_APPEND;
2919 case PROTOCOL_BINARY_CMD_PREPENDQ:
2920 c->cmd = PROTOCOL_BINARY_CMD_PREPEND;
2922 case PROTOCOL_BINARY_CMD_GETQ:
2923 c->cmd = PROTOCOL_BINARY_CMD_GET;
2925 case PROTOCOL_BINARY_CMD_GETKQ:
2926 c->cmd = PROTOCOL_BINARY_CMD_GETK;
2933 case PROTOCOL_BINARY_CMD_VERSION:
2934 if (extlen == 0 && keylen == 0 && bodylen == 0) {
2935 write_bin_response(c, VERSION, 0, 0, strlen(VERSION));
2940 case PROTOCOL_BINARY_CMD_FLUSH:
2941 if (keylen == 0 && bodylen == extlen && (extlen == 0 || extlen == 4)) {
2942 bin_read_key(c, bin_read_flush_exptime, extlen);
2947 case PROTOCOL_BINARY_CMD_NOOP:
2948 if (extlen == 0 && keylen == 0 && bodylen == 0) {
2949 write_bin_response(c, NULL, 0, 0, 0);
2954 case PROTOCOL_BINARY_CMD_SET:
2955 case PROTOCOL_BINARY_CMD_ADD:
2956 case PROTOCOL_BINARY_CMD_REPLACE:
2957 if (extlen == 8 && keylen != 0 && bodylen >= (keylen + 8)) {
2958 bin_read_key(c, bin_reading_set_header, 8);
2963 case PROTOCOL_BINARY_CMD_GETQ:
2964 case PROTOCOL_BINARY_CMD_GET:
2965 case PROTOCOL_BINARY_CMD_GETKQ:
2966 case PROTOCOL_BINARY_CMD_GETK:
2967 if (extlen == 0 && bodylen == keylen && keylen > 0) {
2968 bin_read_key(c, bin_reading_get_key, 0);
2973 case PROTOCOL_BINARY_CMD_DELETE:
2974 if (keylen > 0 && extlen == 0 && bodylen == keylen) {
2975 bin_read_key(c, bin_reading_del_header, extlen);
2980 case PROTOCOL_BINARY_CMD_INCREMENT:
2981 case PROTOCOL_BINARY_CMD_DECREMENT:
2982 if (keylen > 0 && extlen == 20 && bodylen == (keylen + extlen)) {
2983 bin_read_key(c, bin_reading_incr_header, 20);
2988 case PROTOCOL_BINARY_CMD_APPEND:
2989 case PROTOCOL_BINARY_CMD_PREPEND:
2990 if (keylen > 0 && extlen == 0) {
2991 bin_read_key(c, bin_reading_set_header, 0);
2996 case PROTOCOL_BINARY_CMD_STAT:
2998 bin_read_key(c, bin_reading_stat, 0);
3003 case PROTOCOL_BINARY_CMD_QUIT:
3004 if (keylen == 0 && extlen == 0 && bodylen == 0) {
3005 write_bin_response(c, NULL, 0, 0, 0);
3008 conn_set_state(c, conn_closing);
3014 case PROTOCOL_BINARY_CMD_TAP_CONNECT:
3016 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
3018 bin_read_chunk(c, bin_reading_packet,
3019 c->binary_header.request.bodylen);
3022 case PROTOCOL_BINARY_CMD_TAP_MUTATION:
3023 case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_START:
3024 case PROTOCOL_BINARY_CMD_TAP_CHECKPOINT_END:
3025 case PROTOCOL_BINARY_CMD_TAP_DELETE:
3026 case PROTOCOL_BINARY_CMD_TAP_FLUSH:
3027 case PROTOCOL_BINARY_CMD_TAP_OPAQUE:
3028 case PROTOCOL_BINARY_CMD_TAP_VBUCKET_SET:
3030 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, bodylen);
3032 bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
3036 case PROTOCOL_BINARY_CMD_SASL_LIST_MECHS:
3037 if (extlen == 0 && keylen == 0 && bodylen == 0) {
3038 bin_list_sasl_mechs(c);
3043 case PROTOCOL_BINARY_CMD_SASL_AUTH:
3044 case PROTOCOL_BINARY_CMD_SASL_STEP:
3045 if (extlen == 0 && keylen != 0) {
3046 bin_read_key(c, bin_reading_sasl_auth, 0);
3052 case PROTOCOL_BINARY_CMD_VERBOSITY:
3053 if (extlen == 4 && keylen == 0 && bodylen == 4) {
3054 bin_read_chunk(c, bin_reading_packet,
3055 c->binary_header.request.bodylen);
3062 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND,
3065 bin_read_chunk(c, bin_reading_packet, c->binary_header.request.bodylen);
3070 handle_binary_protocol_error(c);
3073 static void process_bin_update(conn *c) {
3082 key = binary_get_key(c);
3083 nkey = c->binary_header.request.keylen;
3086 req->message.body.flags = req->message.body.flags;
3087 rel_time_t expiration = ntohl(req->message.body.expiration);
3089 vlen = c->binary_header.request.bodylen - (nkey + c->binary_header.request.extlen);
3094 if (c->cmd == PROTOCOL_BINARY_CMD_ADD) {
3096 }
else if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
3103 nw = key_to_printable_buffer(buffer,
sizeof(buffer), c->sfd,
true,
3107 if (snprintf(buffer + nw,
sizeof(buffer) - nw,
3108 " Value len is %d\n", vlen)) {
3109 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
"%s",
3116 stats_prefix_record_set(key, nkey);
3119 ENGINE_ERROR_CODE ret = c->aiostat;
3120 c->aiostat = ENGINE_SUCCESS;
3121 c->ewouldblock =
false;
3124 if (ret == ENGINE_SUCCESS) {
3128 req->message.body.flags,
3133 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
3139 case ENGINE_SUCCESS:
3140 item_set_cas(c, it, c->binary_header.request.cas);
3143 case PROTOCOL_BINARY_CMD_ADD:
3144 c->store_op = OPERATION_ADD;
3146 case PROTOCOL_BINARY_CMD_SET:
3147 c->store_op = OPERATION_SET;
3149 case PROTOCOL_BINARY_CMD_REPLACE:
3150 c->store_op = OPERATION_REPLACE;
3156 if (c->binary_header.request.cas != 0) {
3157 c->store_op = OPERATION_CAS;
3161 c->
ritem = info.value[0].iov_base;
3163 conn_set_state(c, conn_nread);
3164 c->substate = bin_read_set_value;
3166 case ENGINE_EWOULDBLOCK:
3167 c->ewouldblock =
true;
3169 case ENGINE_DISCONNECT:
3170 c->state = conn_closing;
3173 if (ret == ENGINE_E2BIG) {
3174 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
3176 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
3184 if (c->cmd == PROTOCOL_BINARY_CMD_SET) {
3187 ntohll(req->message.header.request.cas),
3188 c->binary_header.request.vbucket);
3196 static void process_bin_append_prepend(conn *c) {
3204 key = binary_get_key(c);
3205 nkey = c->binary_header.request.keylen;
3206 vlen = c->binary_header.request.bodylen - nkey;
3209 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
3210 "Value len is %d\n", vlen);
3214 stats_prefix_record_set(key, nkey);
3217 ENGINE_ERROR_CODE ret = c->aiostat;
3218 c->aiostat = ENGINE_SUCCESS;
3219 c->ewouldblock =
false;
3222 if (ret == ENGINE_SUCCESS) {
3229 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINTERNAL, 0);
3235 case ENGINE_SUCCESS:
3236 item_set_cas(c, it, c->binary_header.request.cas);
3239 case PROTOCOL_BINARY_CMD_APPEND:
3240 c->store_op = OPERATION_APPEND;
3242 case PROTOCOL_BINARY_CMD_PREPEND:
3243 c->store_op = OPERATION_PREPEND;
3250 c->
ritem = info.value[0].iov_base;
3252 conn_set_state(c, conn_nread);
3253 c->substate = bin_read_set_value;
3255 case ENGINE_EWOULDBLOCK:
3256 c->ewouldblock =
true;
3258 case ENGINE_DISCONNECT:
3259 c->state = conn_closing;
3262 if (ret == ENGINE_E2BIG) {
3263 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_E2BIG, vlen);
3265 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_ENOMEM, vlen);
3272 static void process_bin_flush(conn *c) {
3276 if (c->binary_header.request.extlen ==
sizeof(req->message.body)) {
3277 exptime = ntohl(req->message.body.expiration);
3281 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
3282 "%d: flush %ld", c->sfd,
3286 ENGINE_ERROR_CODE
ret;
3289 if (ret == ENGINE_SUCCESS) {
3290 write_bin_response(c, NULL, 0, 0, 0);
3291 }
else if (ret == ENGINE_ENOTSUP) {
3292 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_SUPPORTED, 0);
3294 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
3296 STATS_NOKEY(c, cmd_flush);
3299 static void process_bin_delete(conn *c) {
3302 char* key = binary_get_key(c);
3303 size_t nkey = c->binary_header.request.keylen;
3309 if (key_to_printable_buffer(buffer,
sizeof(buffer), c->sfd,
true,
3310 "DELETE", key, nkey) != -1) {
3311 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
"%s\n",
3316 ENGINE_ERROR_CODE ret = c->aiostat;
3317 c->aiostat = ENGINE_SUCCESS;
3318 c->ewouldblock =
false;
3320 if (ret == ENGINE_SUCCESS) {
3322 stats_prefix_record_delete(key, nkey);
3325 ntohll(req->message.header.request.cas),
3326 c->binary_header.request.vbucket);
3332 case ENGINE_SUCCESS:
3333 write_bin_response(c, NULL, 0, 0, 0);
3334 SLAB_INCR(c, delete_hits, key, nkey);
3336 case ENGINE_KEY_EEXISTS:
3337 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_EEXISTS, 0);
3339 case ENGINE_KEY_ENOENT:
3340 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0);
3341 STATS_INCR(c, delete_misses, key, nkey);
3343 case ENGINE_NOT_MY_VBUCKET:
3344 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_NOT_MY_VBUCKET, 0);
3346 case ENGINE_EWOULDBLOCK:
3347 c->ewouldblock =
true;
3350 write_bin_packet(c, PROTOCOL_BINARY_RESPONSE_EINVAL, 0);
3354 static void complete_nread_binary(conn *c) {
3356 assert(c->cmd >= 0);
3358 switch(c->substate) {
3359 case bin_reading_set_header:
3360 if (c->cmd == PROTOCOL_BINARY_CMD_APPEND ||
3361 c->cmd == PROTOCOL_BINARY_CMD_PREPEND) {
3362 process_bin_append_prepend(c);
3364 process_bin_update(c);
3367 case bin_read_set_value:
3368 complete_update_bin(c);
3370 case bin_reading_get_key:
3373 case bin_reading_stat:
3374 process_bin_stat(c);
3376 case bin_reading_del_header:
3377 process_bin_delete(c);
3379 case bin_reading_incr_header:
3380 complete_incr_bin(c);
3382 case bin_read_flush_exptime:
3383 process_bin_flush(c);
3385 case bin_reading_sasl_auth:
3386 process_bin_sasl_auth(c);
3388 case bin_reading_sasl_auth_data:
3389 process_bin_complete_sasl_auth(c);
3391 case bin_reading_packet:
3392 if (c->binary_header.request.magic == PROTOCOL_BINARY_RES) {
3394 handler = response_handlers[c->binary_header.request.opcode];
3398 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
3399 "%d: ERROR: Unsupported response packet received: %u\n",
3400 c->sfd, (
unsigned int)c->binary_header.request.opcode);
3401 conn_set_state(c, conn_closing);
3404 process_bin_packet(c);
3408 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
3409 "Not handling substate %d\n", c->substate);
3414 static void reset_cmd_handler(conn *c) {
3418 c->substate = bin_no_state;
3419 if(c->
item != NULL) {
3425 conn_set_state(c, conn_parse_cmd);
3427 conn_set_state(c, conn_waiting);
3431 static ENGINE_ERROR_CODE ascii_response_handler(
const void *cookie,
3435 conn *c = (conn*)cookie;
3436 if (!grow_dynamic_buffer(c, nbytes)) {
3438 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
3439 "<%d ERROR: Failed to allocate memory for response\n",
3442 return ENGINE_ENOMEM;
3445 char *buf = c->dynamic_buffer.buffer + c->dynamic_buffer.offset;
3446 memcpy(buf, dta, nbytes);
3447 c->dynamic_buffer.offset += nbytes;
3449 return ENGINE_SUCCESS;
3452 static void complete_nread_ascii(conn *c) {
3454 c->ewouldblock =
false;
3456 ascii_response_handler)) {
3457 case ENGINE_SUCCESS:
3458 if (c->dynamic_buffer.buffer != NULL) {
3459 write_and_free(c, c->dynamic_buffer.buffer,
3460 c->dynamic_buffer.offset);
3461 c->dynamic_buffer.buffer = NULL;
3463 conn_set_state(c, conn_new_cmd);
3466 case ENGINE_EWOULDBLOCK:
3467 c->ewouldblock =
true;
3469 case ENGINE_DISCONNECT:
3471 conn_set_state(c, conn_closing);
3474 complete_update_ascii(c);
3478 static void complete_nread(conn *c) {
3480 assert(c->protocol == ascii_prot
3481 || c->protocol == binary_prot);
3483 if (c->protocol == ascii_prot) {
3484 complete_nread_ascii(c);
3485 }
else if (c->protocol == binary_prot) {
3486 complete_nread_binary(c);
3490 #define COMMAND_TOKEN 0
3491 #define SUBCOMMAND_TOKEN 1
3494 #define MAX_TOKENS 30
3513 static size_t tokenize_command(
char *command,
token_t *tokens,
const size_t max_tokens) {
3517 assert(command != NULL && tokens != NULL && max_tokens > 1);
3519 for (s = e = command; ntokens < max_tokens - 1; ++e) {
3522 tokens[ntokens].value = s;
3523 tokens[ntokens].length = e - s;
3529 else if (*e ==
'\0') {
3531 tokens[ntokens].value = s;
3532 tokens[ntokens].length = e - s;
3544 tokens[ntokens].value = *e ==
'\0' ? NULL : e;
3545 tokens[ntokens].length = 0;
3551 static void detokenize(
token_t *tokens,
int ntokens,
char **out,
int *nbytes) {
3556 for (i = 0; i < ntokens; ++
i) {
3557 nb += tokens[
i].length;
3560 buf = malloc(nb *
sizeof(
char));
3563 for (i = 0; i < ntokens; ++
i) {
3564 memcpy(p, tokens[i].value, tokens[i].length);
3565 p += tokens[
i].length;
3577 static void write_and_free(conn *c,
char *buf,
int bytes) {
3579 c->write_and_free =
buf;
3582 conn_set_state(c, conn_write);
3585 out_string(c,
"SERVER_ERROR out of memory writing stats");
3589 static inline bool set_noreply_maybe(conn *c,
token_t *tokens,
size_t ntokens)
3591 int noreply_index = ntokens - 2;
3600 if (tokens[noreply_index].value
3601 && strcmp(tokens[noreply_index].value,
"noreply") == 0) {
3607 void append_stat(
const char *
name,
ADD_STAT add_stats, conn *c,
3608 const char *
fmt, ...) {
3609 char val_str[STAT_VAL_LEN];
3619 vlen = vsnprintf(val_str,
sizeof(val_str) - 1, fmt, ap);
3622 add_stats(name, strlen(name), val_str, vlen, c);
3625 inline static void process_stats_detail(conn *c,
const char *command) {
3629 if (strcmp(command,
"on") == 0) {
3631 out_string(c,
"OK");
3633 else if (strcmp(command,
"off") == 0) {
3635 out_string(c,
"OK");
3637 else if (strcmp(command,
"dump") == 0) {
3639 char *
stats = stats_prefix_dump(&len);
3640 write_and_free(c, stats, len);
3643 out_string(c,
"CLIENT_ERROR usage: stats detail on|off|dump");
3647 out_string(c,
"CLIENT_ERROR detailed stats disabled");
3651 static void aggregate_callback(
void *in,
void *out) {
3654 threadlocal_stats_aggregate(in_independent_stats->thread_stats,
3659 static void server_stats(
ADD_STAT add_stats, conn *c,
bool aggregate) {
3660 pid_t pid = getpid();
3661 rel_time_t now = current_time;
3672 threadlocal_stats_aggregate(get_independent_stats(c)->
thread_stats,
3681 getrusage(RUSAGE_SELF, &usage);
3688 APPEND_STAT(
"time",
"%ld", now + (
long)process_started);
3690 APPEND_STAT(
"libevent",
"%s", event_get_version());
3691 APPEND_STAT(
"pointer_size",
"%d", (
int)(8 *
sizeof(
void *)));
3694 append_stat(
"rusage_user", add_stats, c,
"%ld.%06ld",
3695 (
long)usage.ru_utime.tv_sec,
3696 (
long)usage.ru_utime.tv_usec);
3697 append_stat(
"rusage_system", add_stats, c,
"%ld.%06ld",
3698 (
long)usage.ru_stime.tv_sec,
3699 (
long)usage.ru_stime.tv_usec);
3725 APPEND_STAT(
"accepting_conns",
"%u", is_listen_disabled() ? 0 : 1);
3726 APPEND_STAT(
"listen_disabled_num",
"%"PRIu64, get_listen_disabled_num());
3727 APPEND_STAT(
"rejected_conns",
"%" PRIu64, (
unsigned long long)
stats.rejected_conns);
3740 if (ts.sent.connect) {
3741 APPEND_STAT(
"tap_connect_sent",
"%"PRIu64, ts.sent.connect);
3743 if (ts.sent.mutation) {
3744 APPEND_STAT(
"tap_mutation_sent",
"%"PRIu64, ts.sent.mutation);
3746 if (ts.sent.checkpoint_start) {
3747 APPEND_STAT(
"tap_checkpoint_start_sent",
"%"PRIu64, ts.sent.checkpoint_start);
3749 if (ts.sent.checkpoint_end) {
3750 APPEND_STAT(
"tap_checkpoint_end_sent",
"%"PRIu64, ts.sent.checkpoint_end);
3752 if (ts.sent.delete) {
3753 APPEND_STAT(
"tap_delete_sent",
"%"PRIu64, ts.sent.delete);
3755 if (ts.sent.flush) {
3756 APPEND_STAT(
"tap_flush_sent",
"%"PRIu64, ts.sent.flush);
3758 if (ts.sent.opaque) {
3759 APPEND_STAT(
"tap_opaque_sent",
"%"PRIu64, ts.sent.opaque);
3761 if (ts.sent.vbucket_set) {
3763 ts.sent.vbucket_set);
3765 if (ts.received.connect) {
3766 APPEND_STAT(
"tap_connect_received",
"%"PRIu64, ts.received.connect);
3768 if (ts.received.mutation) {
3769 APPEND_STAT(
"tap_mutation_received",
"%"PRIu64, ts.received.mutation);
3771 if (ts.received.checkpoint_start) {
3772 APPEND_STAT(
"tap_checkpoint_start_received",
"%"PRIu64, ts.received.checkpoint_start);
3774 if (ts.received.checkpoint_end) {
3775 APPEND_STAT(
"tap_checkpoint_end_received",
"%"PRIu64, ts.received.checkpoint_end);
3777 if (ts.received.delete) {
3778 APPEND_STAT(
"tap_delete_received",
"%"PRIu64, ts.received.delete);
3780 if (ts.received.flush) {
3781 APPEND_STAT(
"tap_flush_received",
"%"PRIu64, ts.received.flush);
3783 if (ts.received.opaque) {
3784 APPEND_STAT(
"tap_opaque_received",
"%"PRIu64, ts.received.opaque);
3786 if (ts.received.vbucket_set) {
3787 APPEND_STAT(
"tap_vbucket_set_received",
"%"PRIu64,
3788 ts.received.vbucket_set);
3792 static void process_stat_settings(
ADD_STAT add_stats,
void *c) {
3811 settings.detail_enabled ?
"yes" :
"no");
3813 settings.allow_detailed ?
"yes" :
"no");
3819 prot_text(
settings.binding_protocol));
3828 #elif defined(ENABLE_SASL)
3848 APPEND_STAT(
"ascii_extension",
"%s", ptr->get_name(ptr->cookie));
3852 static char *process_stat(conn *c,
token_t *tokens,
const size_t ntokens) {
3853 const char *subcommand = tokens[SUBCOMMAND_TOKEN].value;
3854 c->dynamic_buffer.offset = 0;
3857 ENGINE_ERROR_CODE ret = c->aiostat;
3858 c->aiostat = ENGINE_SUCCESS;
3859 c->ewouldblock =
false;
3860 if (ret == ENGINE_SUCCESS) {
3861 server_stats(&append_stats, c,
false);
3863 NULL, 0, &append_stats);
3864 if (ret == ENGINE_EWOULDBLOCK) {
3865 c->ewouldblock =
true;
3866 return c->
rcurr + 5;
3869 }
else if (strcmp(subcommand,
"reset") == 0) {
3871 out_string(c,
"RESET");
3873 }
else if (strcmp(subcommand,
"detail") == 0) {
3876 process_stats_detail(c,
"");
3878 process_stats_detail(c, tokens[2].value);
3882 }
else if (strcmp(subcommand,
"settings") == 0) {
3883 process_stat_settings(&append_stats, c);
3884 }
else if (strcmp(subcommand,
"cachedump") == 0) {
3886 unsigned int bytes = 0,
id,
limit = 0;
3889 out_string(c,
"CLIENT_ERROR bad command line");
3893 if (!safe_strtoul(tokens[2].value, &
id) ||
3894 !safe_strtoul(tokens[3].value, &limit)) {
3895 out_string(c,
"CLIENT_ERROR bad command line format");
3899 if (
id >= POWER_LARGEST) {
3900 out_string(c,
"CLIENT_ERROR Illegal slab id");
3905 buf = item_cachedump(
id, limit, &bytes);
3907 write_and_free(c, buf, bytes);
3909 }
else if (strcmp(subcommand,
"aggregate") == 0) {
3910 server_stats(&append_stats, c,
true);
3911 }
else if (strcmp(subcommand,
"topkeys") == 0) {
3912 topkeys_t *tk = get_independent_stats(c)->topkeys;
3914 topkeys_stats(tk, c, current_time, append_stats);
3916 out_string(c,
"ERROR");
3922 ENGINE_ERROR_CODE ret = c->aiostat;
3923 c->aiostat = ENGINE_SUCCESS;
3924 c->ewouldblock =
false;
3925 if (ret == ENGINE_SUCCESS) {
3928 detokenize(&tokens[1], ntokens - 2, &buf, &nb);
3935 case ENGINE_SUCCESS:
3936 append_stats(NULL, 0, NULL, 0, c);
3937 write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
3938 c->dynamic_buffer.buffer = NULL;
3941 out_string(c,
"SERVER_ERROR out of memory writing stats");
3943 case ENGINE_DISCONNECT:
3944 c->state = conn_closing;
3946 case ENGINE_ENOTSUP:
3947 out_string(c,
"SERVER_ERROR not supported");
3949 case ENGINE_EWOULDBLOCK:
3950 c->ewouldblock =
true;
3951 return tokens[SUBCOMMAND_TOKEN].value;
3953 out_string(c,
"ERROR");
3961 append_stats(NULL, 0, NULL, 0, c);
3963 if (c->dynamic_buffer.buffer == NULL) {
3964 out_string(c,
"SERVER_ERROR out of memory writing stats");
3966 write_and_free(c, c->dynamic_buffer.buffer, c->dynamic_buffer.offset);
3967 c->dynamic_buffer.buffer = NULL;
3978 static char *get_suffix_buffer(conn *c) {
3979 if (c->suffixleft == c->suffixsize) {
3980 char **new_suffix_list;
3981 size_t sz =
sizeof(
char*) * c->suffixsize * 2;
3983 new_suffix_list = realloc(c->suffixlist, sz);
3984 if (new_suffix_list) {
3986 c->suffixlist = new_suffix_list;
3989 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
3990 "=%d Failed to resize suffix buffer\n", c->sfd);
3997 char *suffix = cache_alloc(c->thread->suffix_cache);
3998 if (suffix != NULL) {
3999 *(c->suffixlist + c->suffixleft) = suffix;
4007 static inline char* process_get_command(conn *c,
token_t *tokens,
size_t ntokens,
bool return_cas) {
4012 token_t *key_token = &tokens[KEY_TOKEN];
4016 while(key_token->length != 0) {
4018 key = key_token->value;
4019 nkey = key_token->length;
4022 out_string(c,
"CLIENT_ERROR bad command line format");
4026 ENGINE_ERROR_CODE ret = c->aiostat;
4027 c->aiostat = ENGINE_SUCCESS;
4029 if (ret == ENGINE_SUCCESS) {
4034 case ENGINE_EWOULDBLOCK:
4035 c->ewouldblock =
true;
4039 case ENGINE_SUCCESS:
4041 case ENGINE_KEY_ENOENT:
4048 stats_prefix_record_get(key, nkey, NULL != it);
4056 out_string(c,
"SERVER_ERROR error getting item data");
4060 if (i >= c->isize) {
4061 item **new_list = realloc(c->ilist,
sizeof(item *) * c->isize * 2);
4064 c->ilist = new_list;
4072 char *suffix = get_suffix_buffer(c);
4073 if (suffix == NULL) {
4074 out_string(c,
"SERVER_ERROR out of memory rebuilding suffix");
4078 int suffix_len = snprintf(suffix, SUFFIX_SIZE,
4079 " %u %u\r\n", htonl(info.
flags),
4090 MEMCACHED_COMMAND_GET(c->sfd, info.
key, info.
nkey,
4095 char *cas = get_suffix_buffer(c);
4097 out_string(c,
"SERVER_ERROR out of memory making CAS suffix");
4101 int cas_len = snprintf(cas, SUFFIX_SIZE,
" %"PRIu64
"\r\n",
4103 if (add_iov(c,
"VALUE ", 6) != 0 ||
4104 add_iov(c, info.
key, info.
nkey) != 0 ||
4105 add_iov(c, suffix, suffix_len - 2) != 0 ||
4106 add_iov(c, cas, cas_len) != 0 ||
4107 add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
4108 add_iov(c,
"\r\n", 2) != 0)
4116 if (add_iov(c,
"VALUE ", 6) != 0 ||
4117 add_iov(c, info.
key, info.
nkey) != 0 ||
4118 add_iov(c, suffix, suffix_len) != 0 ||
4119 add_iov(c, info.value[0].iov_base, info.value[0].iov_len) != 0 ||
4120 add_iov(c,
"\r\n", 2) != 0)
4129 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4130 ">%d sending key %s\n",
4135 STATS_HIT(c,
get, key, nkey);
4136 *(c->ilist +
i) = it;
4140 STATS_MISS(c,
get, key, nkey);
4141 MEMCACHED_COMMAND_GET(c->sfd, key, nkey, -1, 0);
4151 if(key_token->value != NULL) {
4152 ntokens = tokenize_command(key_token->value, tokens, MAX_TOKENS);
4156 }
while(key_token->value != NULL);
4158 c->icurr = c->ilist;
4160 c->suffixcurr = c->suffixlist;
4163 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4164 ">%d END\n", c->sfd);
4172 if (key_token->value != NULL || add_iov(c,
"END\r\n", 5) != 0
4173 || (IS_UDP(c->transport) && build_udp_headers(c) != 0)) {
4174 out_string(c,
"SERVER_ERROR out of memory writing get response");
4177 conn_set_state(c, conn_mwrite);
4184 static void process_update_command(conn *c,
token_t *tokens,
const size_t ntokens, ENGINE_STORE_OPERATION store_op,
bool handle_cas) {
4188 int32_t exptime_int = 0;
4191 uint64_t req_cas_id=0;
4196 set_noreply_maybe(c, tokens, ntokens);
4199 out_string(c,
"CLIENT_ERROR bad command line format");
4203 key = tokens[KEY_TOKEN].value;
4204 nkey = tokens[KEY_TOKEN].length;
4206 if (! (safe_strtoul(tokens[2].value, (uint32_t *)&flags)
4207 && safe_strtol(tokens[3].value, &exptime_int)
4208 && safe_strtol(tokens[4].value, (int32_t *)&vlen))) {
4209 out_string(c,
"CLIENT_ERROR bad command line format");
4214 exptime = exptime_int;
4218 if (!safe_strtoull(tokens[5].value, &req_cas_id)) {
4219 out_string(c,
"CLIENT_ERROR bad command line format");
4225 out_string(c,
"CLIENT_ERROR bad command line format");
4230 stats_prefix_record_set(key, nkey);
4233 ENGINE_ERROR_CODE ret = c->aiostat;
4234 c->aiostat = ENGINE_SUCCESS;
4235 c->ewouldblock =
false;
4237 if (ret == ENGINE_SUCCESS) {
4240 vlen, htonl(flags), exptime);
4245 case ENGINE_SUCCESS:
4246 item_set_cas(c, it, req_cas_id);
4249 out_string(c,
"SERVER_ERROR error getting item data");
4253 c->
ritem = info.value[0].iov_base;
4255 c->store_op = store_op;
4256 conn_set_state(c, conn_nread);
4258 case ENGINE_EWOULDBLOCK:
4259 c->ewouldblock =
true;
4261 case ENGINE_DISCONNECT:
4262 c->state = conn_closing;
4265 if (ret == ENGINE_E2BIG) {
4266 out_string(c,
"SERVER_ERROR object too large for cache");
4268 out_string(c,
"SERVER_ERROR out of memory storing object");
4272 c->sbytes = vlen + 2;
4276 if (store_op == OPERATION_SET) {
4282 static char* process_arithmetic_command(conn *c,
token_t *tokens,
const size_t ntokens,
const bool incr) {
4290 set_noreply_maybe(c, tokens, ntokens);
4293 out_string(c,
"CLIENT_ERROR bad command line format");
4297 key = tokens[KEY_TOKEN].value;
4298 nkey = tokens[KEY_TOKEN].length;
4300 if (!safe_strtoull(tokens[2].value, &delta)) {
4301 out_string(c,
"CLIENT_ERROR invalid numeric delta argument");
4305 ENGINE_ERROR_CODE ret = c->aiostat;
4306 c->aiostat = ENGINE_SUCCESS;
4309 if (ret == ENGINE_SUCCESS) {
4311 incr,
false, delta, 0, 0, &cas,
4317 case ENGINE_SUCCESS:
4319 STATS_INCR(c, incr_hits, key, nkey);
4321 STATS_INCR(c, decr_hits, key, nkey);
4323 snprintf(temp,
sizeof(temp),
"%"PRIu64, result);
4324 out_string(c, temp);
4326 case ENGINE_KEY_ENOENT:
4328 STATS_INCR(c, incr_misses, key, nkey);
4330 STATS_INCR(c, decr_misses, key, nkey);
4332 out_string(c,
"NOT_FOUND");
4335 out_string(c,
"SERVER_ERROR out of memory");
4337 case ENGINE_TMPFAIL:
4338 out_string(c,
"SERVER_ERROR temporary failure");
4341 out_string(c,
"CLIENT_ERROR cannot increment or decrement non-numeric value");
4343 case ENGINE_NOT_STORED:
4344 out_string(c,
"SERVER_ERROR failed to store item");
4346 case ENGINE_DISCONNECT:
4347 c->state = conn_closing;
4349 case ENGINE_ENOTSUP:
4350 out_string(c,
"SERVER_ERROR not supported");
4352 case ENGINE_EWOULDBLOCK:
4353 c->ewouldblock =
true;
4362 static char *process_delete_command(conn *c,
token_t *tokens,
4363 const size_t ntokens) {
4370 bool hold_is_zero = strcmp(tokens[KEY_TOKEN+1].value,
"0") == 0;
4371 bool sets_noreply = set_noreply_maybe(c, tokens, ntokens);
4372 bool valid = (ntokens == 4 && (hold_is_zero || sets_noreply))
4373 || (ntokens == 5 && hold_is_zero && sets_noreply);
4375 out_string(c,
"CLIENT_ERROR bad command line format. "
4376 "Usage: delete <key> [noreply]");
4381 key = tokens[KEY_TOKEN].value;
4382 nkey = tokens[KEY_TOKEN].length;
4385 out_string(c,
"CLIENT_ERROR bad command line format");
4389 ENGINE_ERROR_CODE ret = c->aiostat;
4390 c->aiostat = ENGINE_SUCCESS;
4391 c->ewouldblock =
false;
4392 if (ret == ENGINE_SUCCESS) {
4400 case ENGINE_SUCCESS:
4401 out_string(c,
"DELETED");
4402 SLAB_INCR(c, delete_hits, key, nkey);
4404 case ENGINE_EWOULDBLOCK:
4405 c->ewouldblock =
true;
4407 case ENGINE_TMPFAIL:
4408 out_string(c,
"SERVER_ERROR temporary failure");
4411 out_string(c,
"NOT_FOUND");
4412 STATS_INCR(c, delete_misses, key, nkey);
4415 if (ret != ENGINE_EWOULDBLOCK &&
settings.detail_enabled) {
4416 stats_prefix_record_delete(key, nkey);
4421 static char *process_bind_command(conn *c,
token_t *tokens,
4422 const size_t ntokens) {
4429 out_string(c,
"CLIENT_ERROR bad command line format. "
4430 "Usage: bind <table_id_name>");
4434 name = tokens[KEY_TOKEN].value;
4435 name_len = tokens[KEY_TOKEN].length;
4438 out_string(c,
"CLIENT_ERROR bad command line format");
4442 ENGINE_ERROR_CODE ret = c->aiostat;
4443 c->aiostat = ENGINE_SUCCESS;
4444 c->ewouldblock =
false;
4445 if (ret == ENGINE_SUCCESS) {
4453 case ENGINE_SUCCESS:
4454 out_string(c,
"SUCCEED");
4456 case ENGINE_EWOULDBLOCK:
4457 c->ewouldblock =
true;
4459 case ENGINE_TMPFAIL:
4461 out_string(c,
"NOT_FOUND");
4467 static void process_verbosity_command(conn *c,
token_t *tokens,
const size_t ntokens) {
4472 set_noreply_maybe(c, tokens, ntokens);
4473 if (c->noreply && ntokens == 3) {
4476 out_string(c,
"ERROR");
4480 if (safe_strtoul(tokens[1].value, &level)) {
4481 settings.verbose = level > MAX_VERBOSITY_LEVEL ? MAX_VERBOSITY_LEVEL :
level;
4482 perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
4483 out_string(c,
"OK");
4485 out_string(c,
"ERROR");
4489 static char* process_command(conn *c,
char *command) {
4498 MEMCACHED_PROCESS_COMMAND_START(c->sfd, c->
rcurr, c->
rbytes);
4501 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4502 "<%d %s\n", c->sfd, command);
4510 if (c->ewouldblock) {
4515 c->ewouldblock =
false;
4520 if (add_msghdr(c) != 0) {
4521 out_string(c,
"SERVER_ERROR out of memory preparing response");
4526 ntokens = tokenize_command(command, tokens, MAX_TOKENS);
4528 ((strcmp(tokens[COMMAND_TOKEN].value,
"get") == 0) ||
4529 (strcmp(tokens[COMMAND_TOKEN].value,
"bget") == 0))) {
4531 ret = process_get_command(c, tokens, ntokens,
false);
4533 }
else if ((ntokens == 6 || ntokens == 7) &&
4534 ((strcmp(tokens[COMMAND_TOKEN].value,
"add") == 0 && (comm = (
int)OPERATION_ADD)) ||
4535 (strcmp(tokens[COMMAND_TOKEN].value,
"set") == 0 && (comm = (
int)OPERATION_SET)) ||
4536 (strcmp(tokens[COMMAND_TOKEN].value,
"replace") == 0 && (comm = (
int)OPERATION_REPLACE)) ||
4537 (strcmp(tokens[COMMAND_TOKEN].value,
"prepend") == 0 && (comm = (
int)OPERATION_PREPEND)) ||
4538 (strcmp(tokens[COMMAND_TOKEN].value,
"append") == 0 && (comm = (
int)OPERATION_APPEND)) )) {
4540 process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm,
false);
4542 }
else if ((ntokens == 7 || ntokens == 8) && (strcmp(tokens[COMMAND_TOKEN].value,
"cas") == 0 && (comm = (
int)OPERATION_CAS))) {
4544 process_update_command(c, tokens, ntokens, (ENGINE_STORE_OPERATION)comm,
true);
4546 }
else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value,
"incr") == 0)) {
4548 ret = process_arithmetic_command(c, tokens, ntokens, 1);
4550 }
else if (ntokens >= 3 && (strcmp(tokens[COMMAND_TOKEN].value,
"gets") == 0)) {
4552 ret = process_get_command(c, tokens, ntokens,
true);
4554 }
else if ((ntokens == 4 || ntokens == 5) && (strcmp(tokens[COMMAND_TOKEN].value,
"decr") == 0)) {
4556 ret = process_arithmetic_command(c, tokens, ntokens, 0);
4558 }
else if (ntokens >= 3 && ntokens <= 5 && (strcmp(tokens[COMMAND_TOKEN].value,
"delete") == 0)) {
4560 ret = process_delete_command(c, tokens, ntokens);
4562 }
else if (ntokens == 3 && (strcmp(tokens[COMMAND_TOKEN].value,
"bind") == 0)) {
4564 ret = process_bind_command(c, tokens, ntokens);
4566 }
else if (ntokens >= 2 && (strcmp(tokens[COMMAND_TOKEN].value,
"stats") == 0)) {
4568 ret = process_stat(c, tokens, ntokens);
4570 }
else if (ntokens >= 2 && ntokens <= 4 && (strcmp(tokens[COMMAND_TOKEN].value,
"flush_all") == 0)) {
4573 set_noreply_maybe(c, tokens, ntokens);
4575 if (ntokens == (c->noreply ? 3 : 2)) {
4578 exptime = strtol(tokens[1].value, NULL, 10);
4579 if(errno == ERANGE) {
4580 out_string(c,
"CLIENT_ERROR bad command line format");
4585 ENGINE_ERROR_CODE ret = c->aiostat;
4586 c->aiostat = ENGINE_SUCCESS;
4587 c->ewouldblock =
false;
4588 if (ret == ENGINE_SUCCESS) {
4593 case ENGINE_SUCCESS:
4594 out_string(c,
"OK");
4596 case ENGINE_ENOTSUP:
4597 out_string(c,
"SERVER_ERROR not supported");
4599 case ENGINE_EWOULDBLOCK:
4600 c->ewouldblock =
true;
4601 return c->
rcurr + 9;
4603 out_string(c,
"SERVER_ERROR failed to flush cache");
4606 if (ret != ENGINE_EWOULDBLOCK) {
4607 STATS_NOKEY(c, cmd_flush);
4611 }
else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value,
"version") == 0)) {
4613 out_string(c,
"VERSION " VERSION);
4615 }
else if (ntokens == 2 && (strcmp(tokens[COMMAND_TOKEN].value,
"quit") == 0)) {
4617 conn_set_state(c, conn_closing);
4619 }
else if ((ntokens == 3 || ntokens == 4) && (strcmp(tokens[COMMAND_TOKEN].value,
"verbosity") == 0)) {
4620 process_verbosity_command(c, tokens, ntokens);
4621 }
else if (
settings.extensions.ascii != NULL) {
4627 if (ntokens == MAX_TOKENS) {
4628 out_string(c,
"ERROR too many arguments");
4632 if (tokens[ntokens - 1].length == 0) {
4637 for (cmd =
settings.extensions.ascii; cmd != NULL; cmd = cmd->
next) {
4638 if (cmd->
accept(cmd->
cookie, c, ntokens, tokens, &nbytes, &ptr)) {
4644 out_string(c,
"ERROR unknown command");
4645 }
else if (nbytes == 0) {
4647 ascii_response_handler)) {
4648 case ENGINE_SUCCESS:
4649 if (c->dynamic_buffer.buffer != NULL) {
4650 write_and_free(c, c->dynamic_buffer.buffer,
4651 c->dynamic_buffer.offset);
4652 c->dynamic_buffer.buffer = NULL;
4654 conn_set_state(c, conn_new_cmd);
4657 case ENGINE_EWOULDBLOCK:
4658 c->ewouldblock =
true;
4659 ret = tokens[KEY_TOKEN].value;;
4661 case ENGINE_DISCONNECT:
4663 conn_set_state(c, conn_closing);
4671 conn_set_state(c, conn_nread);
4674 out_string(c,
"ERROR");
4682 static int try_read_command(conn *c) {
4687 if (c->protocol == negotiating_prot || c->transport == udp_transport) {
4688 if ((
unsigned char)c->
rbuf[0] == (
unsigned char)PROTOCOL_BINARY_REQ) {
4689 c->protocol = binary_prot;
4691 c->protocol = ascii_prot;
4695 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4696 "%d: Client using the %s protocol\n", c->sfd,
4697 prot_text(c->protocol));
4701 if (c->protocol == binary_prot) {
4703 if (c->
rbytes <
sizeof(c->binary_header)) {
4708 if (((
long)(c->
rcurr)) % 8 != 0) {
4713 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4714 "%d: Realign input buffer\n", c->sfd);
4725 nw = bytes_to_output_string(buffer,
sizeof(buffer), c->sfd,
4726 true,
"Read binary protocol data:",
4727 (
const char*)req->bytes,
4728 sizeof(req->bytes));
4730 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
4735 c->binary_header = *req;
4736 c->binary_header.request.keylen = ntohs(req->request.keylen);
4737 c->binary_header.request.bodylen = ntohl(req->request.bodylen);
4738 c->binary_header.request.vbucket = ntohs(req->request.vbucket);
4739 c->binary_header.request.cas = ntohll(req->request.cas);
4742 if (c->binary_header.request.magic != PROTOCOL_BINARY_REQ &&
4743 !(c->binary_header.request.magic == PROTOCOL_BINARY_RES &&
4744 response_handlers[c->binary_header.request.opcode])) {
4746 if (c->binary_header.request.magic != PROTOCOL_BINARY_RES) {
4747 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
4748 "%d: Invalid magic: %x\n", c->sfd,
4749 c->binary_header.request.magic);
4751 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
4752 "%d: ERROR: Unsupported response packet received: %u\n",
4753 c->sfd, (
unsigned int)c->binary_header.request.opcode);
4757 conn_set_state(c, conn_closing);
4764 if (add_msghdr(c) != 0) {
4765 out_string(c,
"SERVER_ERROR out of memory");
4769 c->cmd = c->binary_header.request.opcode;
4770 c->keylen = c->binary_header.request.keylen;
4771 c->opaque = c->binary_header.request.opaque;
4775 dispatch_bin_command(c);
4777 c->
rbytes -=
sizeof(c->binary_header);
4778 c->
rcurr +=
sizeof(c->binary_header);
4781 char *el, *cont, *left, lb;
4794 char *ptr = c->
rcurr;
4795 while (*ptr ==
' ') {
4799 if (ptr - c->
rcurr > 100 ||
4800 (strncmp(ptr,
"get ", 4) && strncmp(ptr,
"gets ", 5))) {
4802 conn_set_state(c, conn_closing);
4810 if ((el - c->
rcurr) > 1 && *(el - 1) ==
'\r') {
4819 LOCK_THREAD(thread);
4820 left = process_command(c, c->
rcurr);
4821 if (c->ewouldblock) {
4822 unregister_event(c);
4824 UNLOCK_THREAD(thread);
4832 assert (left <= el);
4834 int count = strlen(c->
rcurr);
4835 if ((c->
rcurr + count) == left) {
4839 left -= (count + 1);
4841 assert(cont >= c->
rcurr);
4842 if (cont > c->
rcurr) {
4843 memmove(cont, c->
rcurr, count);
4848 while ((left = memchr(left,
'\0', el - left)) != NULL) {
4866 static enum try_read_result try_read_udp(conn *c) {
4871 c->request_addr_size =
sizeof(c->request_addr);
4872 res = recvfrom(c->sfd, c->
rbuf, c->
rsize,
4873 0, (
struct sockaddr *)&c->request_addr, &c->request_addr_size);
4875 unsigned char *buf = (
unsigned char *)c->
rbuf;
4876 STATS_ADD(c, bytes_read, res);
4879 c->request_id = buf[0] * 256 + buf[1];
4882 if (buf[4] != 0 || buf[5] != 1) {
4883 out_string(c,
"SERVER_ERROR multi-packet request not supported");
4884 return READ_NO_DATA_RECEIVED;
4889 memmove(c->
rbuf, c->
rbuf + 8, res);
4893 return READ_DATA_RECEIVED;
4895 return READ_NO_DATA_RECEIVED;
4910 static enum try_read_result try_read_network(conn *c) {
4911 enum try_read_result gotdata = READ_NO_DATA_RECEIVED;
4924 if (num_allocs == 4) {
4928 char *new_rbuf = realloc(c->
rbuf, c->
rsize * 2);
4931 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
4932 "Couldn't realloc input buffer\n");
4935 out_string(c,
"SERVER_ERROR out of memory reading request");
4937 return READ_MEMORY_ERROR;
4944 res = recv(c->sfd, c->
rbuf + c->
rbytes, avail, 0);
4946 STATS_ADD(c, bytes_read, res);
4947 gotdata = READ_DATA_RECEIVED;
4959 if (errno == EAGAIN || errno == EWOULDBLOCK) {
4968 bool register_event(conn *c,
struct timeval *timeout) {
4970 assert(!c->registered_in_libevent);
4973 if (event_add(&c->event, timeout) == -1) {
4974 settings.extensions.logger->
log(EXTENSION_LOG_WARNING,
4976 "Failed to add connection to libevent: %s",
4982 c->registered_in_libevent =
true;
4988 bool unregister_event(conn *c) {
4990 assert(c->registered_in_libevent);
4993 if (event_del(&c->event) == -1) {
4998 c->registered_in_libevent =
false;
5005 bool update_event(conn *c,
const int new_flags) {
5009 if (c->ev_flags == new_flags)
5012 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
5013 "Updated event for %d to read=%s, write=%s\n",
5014 c->sfd, (new_flags & EV_READ ?
"yes" :
"no"),
5015 (new_flags & EV_WRITE ?
"yes" :
"no"));
5017 if (!unregister_event(c)) {
5021 event_set(&c->event, c->sfd, new_flags, event_handler, (
void *)c);
5022 event_base_set(base, &c->event);
5023 c->ev_flags = new_flags;
5025 return register_event(c, NULL);
5037 static enum transmit_result transmit(conn *c) {
5040 if (c->msgcurr < c->msgused &&
5041 c->msglist[c->msgcurr].msg_iovlen == 0) {
5045 if (c->msgcurr < c->msgused) {
5047 struct msghdr *m = &c->msglist[c->msgcurr];
5049 res = sendmsg(c->sfd, m, 0);
5051 STATS_ADD(c, bytes_written, res);
5055 while (m->msg_iovlen > 0 && res >= m->msg_iov->iov_len) {
5056 res -= m->msg_iov->iov_len;
5064 m->msg_iov->iov_base = (caddr_t)m->msg_iov->iov_base + res;
5065 m->msg_iov->iov_len -= res;
5067 return TRANSMIT_INCOMPLETE;
5069 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5070 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5072 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
5073 "Couldn't update event\n");
5075 conn_set_state(c, conn_closing);
5076 return TRANSMIT_HARD_ERROR;
5078 return TRANSMIT_SOFT_ERROR;
5083 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
5084 "Failed to write, and not due to blocking: %s",
5088 if (IS_UDP(c->transport))
5089 conn_set_state(c, conn_read);
5091 conn_set_state(c, conn_closing);
5092 return TRANSMIT_HARD_ERROR;
5094 return TRANSMIT_COMPLETE;
5098 bool conn_listening(conn *c)
5101 struct sockaddr_storage addr;
5102 socklen_t addrlen =
sizeof(addr);
5104 if ((sfd = accept(c->sfd, (
struct sockaddr *)&addr, &addrlen)) == -1) {
5105 if (errno == EMFILE) {
5107 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5108 "Too many open connections\n");
5111 }
else if (errno != EAGAIN && errno != EWOULDBLOCK) {
5112 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
5113 "Failed to accept new client: %s\n",
5121 int curr_conns = ++
stats.curr_conns;
5124 if (curr_conns >=
settings.maxconns) {
5126 ++
stats.rejected_conns;
5130 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5131 "Too many open connections\n");
5138 if (evutil_make_socket_nonblocking(sfd) == -1) {
5143 dispatch_conn_new(sfd, conn_new_cmd, EV_READ | EV_PERSIST,
5144 DATA_BUFFER_SIZE, tcp_transport);
5160 bool conn_ship_log(conn *c) {
5163 if (c->sfd == INVALID_SOCKET) {
5167 short mask = EV_READ | EV_PERSIST | EV_WRITE;
5169 if (c->which & EV_READ || c->
rbytes > 0) {
5171 if (try_read_command(c) == 0) {
5172 conn_set_state(c, conn_read);
5175 conn_set_state(c, conn_read);
5189 c->nevents =
settings.reqs_per_tap_event;
5190 }
else if (c->which & EV_WRITE) {
5192 if (c->nevents >= 0) {
5193 LOCK_THREAD(c->thread);
5194 c->ewouldblock =
false;
5196 if (c->ewouldblock) {
5197 mask = EV_READ | EV_PERSIST;
5201 UNLOCK_THREAD(c->thread);
5205 if (!update_event(c, mask)) {
5207 settings.extensions.logger->
log(EXTENSION_LOG_INFO,
5208 c,
"Couldn't update event\n");
5210 conn_set_state(c, conn_closing);
5216 bool conn_waiting(conn *c) {
5217 if (!update_event(c, EV_READ | EV_PERSIST)) {
5219 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5220 "Couldn't update event\n");
5222 conn_set_state(c, conn_closing);
5225 conn_set_state(c, conn_read);
5229 bool conn_read(conn *c) {
5230 int res = IS_UDP(c->transport) ? try_read_udp(c) : try_read_network(c);
5232 case READ_NO_DATA_RECEIVED:
5233 conn_set_state(c, conn_waiting);
5235 case READ_DATA_RECEIVED:
5236 conn_set_state(c, conn_parse_cmd);
5239 conn_set_state(c, conn_closing);
5241 case READ_MEMORY_ERROR:
5249 bool conn_parse_cmd(conn *c) {
5250 if (try_read_command(c) == 0) {
5252 conn_set_state(c, conn_waiting);
5255 return !c->ewouldblock;
5258 bool conn_new_cmd(conn *c) {
5261 if (c->nevents >= 0) {
5262 reset_cmd_handler(c);
5264 STATS_NOKEY(c, conn_yields);
5272 if (!update_event(c, EV_WRITE | EV_PERSIST)) {
5274 settings.extensions.logger->
log(EXTENSION_LOG_INFO,
5275 c,
"Couldn't update event\n");
5277 conn_set_state(c, conn_closing);
5288 bool conn_swallow(conn *c) {
5291 if (c->sbytes == 0) {
5292 conn_set_state(c, conn_new_cmd);
5298 uint32_t tocopy = c->
rbytes > c->sbytes ? c->sbytes : c->
rbytes;
5299 c->sbytes -= tocopy;
5306 res = recv(c->sfd, c->
rbuf, c->
rsize > c->sbytes ? c->sbytes : c->
rsize, 0);
5308 STATS_ADD(c, bytes_read, res);
5313 conn_set_state(c, conn_closing);
5316 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5317 if (!update_event(c, EV_READ | EV_PERSIST)) {
5319 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5320 "Couldn't update event\n");
5322 conn_set_state(c, conn_closing);
5328 if (errno != ENOTCONN && errno != ECONNRESET) {
5330 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5331 "Failed to read, and not due to blocking (%s)\n",
5335 conn_set_state(c, conn_closing);
5341 bool conn_nread(conn *c) {
5347 bool block = c->ewouldblock =
false;
5354 if (c->ewouldblock) {
5355 unregister_event(c);
5379 STATS_ADD(c, bytes_read, res);
5388 conn_set_state(c, conn_closing);
5392 #ifdef INNODB_MEMCACHED
5397 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK || !errno)) {
5399 if (res == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
5401 if (!update_event(c, EV_READ | EV_PERSIST)) {
5403 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5404 "Couldn't update event\n");
5406 conn_set_state(c, conn_closing);
5412 if (errno != ENOTCONN && errno != ECONNRESET) {
5414 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
5415 "Failed to read, and not due to blocking:\n"
5417 "rcurr=%lx ritem=%lx rbuf=%lx rlbytes=%d rsize=%d\n",
5418 errno, strerror(errno),
5422 conn_set_state(c, conn_closing);
5426 bool conn_write(conn *c) {
5432 if (c->iovused == 0 || (IS_UDP(c->transport) && c->iovused == 1)) {
5433 if (add_iov(c, c->wcurr, c->wbytes) != 0) {
5435 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5436 "Couldn't build response\n");
5438 conn_set_state(c, conn_closing);
5443 return conn_mwrite(c);
5446 bool conn_mwrite(conn *c) {
5447 if (IS_UDP(c->transport) && c->msgcurr == 0 && build_udp_headers(c) != 0) {
5449 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5450 "Failed to build UDP headers\n");
5452 conn_set_state(c, conn_closing);
5456 switch (transmit(c)) {
5457 case TRANSMIT_COMPLETE:
5458 if (c->state == conn_mwrite) {
5459 while (c->ileft > 0) {
5460 item *it = *(c->icurr);
5465 while (c->suffixleft > 0) {
5466 char *suffix = *(c->suffixcurr);
5467 cache_free(c->thread->suffix_cache, suffix);
5472 if(c->protocol == binary_prot) {
5475 conn_set_state(c, conn_new_cmd);
5477 }
else if (c->state == conn_write) {
5478 if (c->write_and_free) {
5479 free(c->write_and_free);
5480 c->write_and_free = 0;
5485 settings.extensions.logger->
log(EXTENSION_LOG_INFO, c,
5486 "Unexpected state %d\n", c->state);
5488 conn_set_state(c, conn_closing);
5492 case TRANSMIT_INCOMPLETE:
5493 case TRANSMIT_HARD_ERROR:
5496 case TRANSMIT_SOFT_ERROR:
5503 bool conn_pending_close(conn *c) {
5504 assert(c->sfd == INVALID_SOCKET);
5505 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
5506 "Awaiting clients to release the cookie (pending close for %p)",
5508 LOCK_THREAD(c->thread);
5509 c->thread->pending_io = list_remove(c->thread->pending_io, c);
5510 if (!list_contains(c->thread->pending_close, c)) {
5511 enlist_conn(c, &c->thread->pending_close);
5513 UNLOCK_THREAD(c->thread);
5519 perform_callbacks(ON_DISCONNECT, NULL, c);
5525 return c->state != conn_pending_close;
5528 bool conn_immediate_close(conn *c) {
5529 settings.extensions.logger->
log(EXTENSION_LOG_DETAIL, c,
5530 "Immediate close of %p",
5532 perform_callbacks(ON_DISCONNECT, NULL, c);
5538 bool conn_closing(conn *c) {
5539 if (IS_UDP(c->transport)) {
5545 unregister_event(c);
5547 c->sfd = INVALID_SOCKET;
5549 if (c->refcount > 1) {
5550 conn_set_state(c, conn_pending_close);
5552 conn_set_state(c, conn_immediate_close);
5557 bool conn_add_tap_client(conn *c) {
5561 assert(orig_thread);
5562 assert(orig_thread != tp);
5564 c->ewouldblock =
true;
5566 unregister_event(c);
5568 LOCK_THREAD(orig_thread);
5570 orig_thread->pending_io = list_remove(orig_thread->pending_io, c);
5571 orig_thread->pending_close = list_remove(orig_thread->pending_close, c);
5575 conn_set_state(c, conn_setup_tap_stream);
5576 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
5577 "Moving %d conn from %p to %p\n",
5578 c->sfd, c->thread, tp);
5580 c->event.ev_base = tp->base;
5581 assert(c->next == NULL);
5582 assert(c->list_state == 0);
5583 enlist_conn(c, &tp->pending_io);
5587 UNLOCK_THREAD(orig_thread);
5594 bool conn_setup_tap_stream(conn *c) {
5595 process_bin_tap_connect(c);
5599 void event_handler(
const int fd,
const short which,
void *arg) {
5605 if (memcached_shutdown) {
5606 event_base_loopbreak(c->event.ev_base);
5615 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, c,
5616 "Catastrophic: event fd doesn't match conn fd!\n");
5622 perform_callbacks(ON_SWITCH_CONN, c, c);
5624 c->nevents =
settings.reqs_per_event;
5625 if (c->state == conn_ship_log) {
5626 c->nevents =
settings.reqs_per_tap_event;
5632 const size_t max_items = 256;
5633 conn *pending_close[max_items];
5634 size_t n_pending_close = 0;
5637 if (thr->pending_close && thr->last_checked != current_time) {
5638 assert(!has_cycle(thr->pending_close));
5639 thr->last_checked = current_time;
5641 n_pending_close = list_to_array(pending_close, max_items,
5642 &thr->pending_close);
5649 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, c,
5650 "%d - Running task: (%s)\n",
5651 c->sfd, state_text(c->state));
5652 }
while (c->state(c));
5654 while (c->state(c)) {
5660 if (n_pending_close > 0) {
5661 for (
size_t i = 0; i < n_pending_close; ++
i) {
5662 conn *ce = pending_close[
i];
5663 if (ce->refcount == 1) {
5664 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
5665 "OK, time to nuke: %p\n",
5669 LOCK_THREAD(ce->thread);
5670 enlist_conn(ce, &ce->thread->pending_close);
5671 UNLOCK_THREAD(ce->thread);
5678 finalize_list(pending_close, n_pending_close);
5683 static void dispatch_event_handler(
int fd,
short which,
void *arg) {
5685 ssize_t nr = recv(fd, buffer,
sizeof(buffer), 0);
5687 if (nr != -1 && is_listen_disabled()) {
5688 bool enable =
false;
5689 pthread_mutex_lock(&listen_state.mutex);
5690 listen_state.count -= nr;
5691 if (listen_state.count <= 0) {
5693 listen_state.disabled =
false;
5695 pthread_mutex_unlock(&listen_state.mutex);
5698 for (next = listen_conn; next; next = next->next) {
5699 update_event(next, EV_READ | EV_PERSIST);
5700 if (listen(next->sfd,
settings.backlog) != 0) {
5701 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5712 static SOCKET new_socket(
struct addrinfo *ai) {
5715 sfd = socket(ai->ai_family, ai->ai_socktype, ai->ai_protocol);
5716 if (sfd == INVALID_SOCKET) {
5717 return INVALID_SOCKET;
5720 if (evutil_make_socket_nonblocking(sfd) == -1) {
5722 return INVALID_SOCKET;
5732 static void maximize_sndbuf(
const int sfd) {
5733 socklen_t intsize =
sizeof(int);
5739 if (getsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (
void *)&old_size, &intsize) != 0) {
5741 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5742 "getsockopt(SO_SNDBUF): %s",
5751 max = MAX_SENDBUF_SIZE;
5753 while (min <= max) {
5754 avg = ((
unsigned int)(min + max)) / 2;
5755 if (setsockopt(sfd, SOL_SOCKET, SO_SNDBUF, (
void *)&avg, intsize) == 0) {
5764 settings.extensions.logger->
log(EXTENSION_LOG_DEBUG, NULL,
5765 "<%d send buffer was %d, now %d\n", sfd, old_size, last_good);
5780 static int server_socket(
const char *interface,
5782 enum network_transport transport,
5783 FILE *portnumber_file) {
5785 struct linger ling = {0, 0};
5788 struct addrinfo hints = { .ai_flags = AI_PASSIVE,
5789 .ai_family = AF_UNSPEC };
5790 char port_buf[NI_MAXSERV];
5796 hints.ai_socktype = IS_UDP(transport) ? SOCK_DGRAM : SOCK_STREAM;
5801 snprintf(port_buf,
sizeof(port_buf),
"%d", port);
5802 error= getaddrinfo(interface, port_buf, &hints, &ai);
5804 if (error != EAI_SYSTEM) {
5805 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5806 "getaddrinfo(): %s\n", gai_strerror(error));
5808 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5809 "getaddrinfo(): %s\n", strerror(error));
5814 for (next= ai; next; next= next->ai_next) {
5815 conn *listen_conn_add;
5816 if ((sfd = new_socket(next)) == INVALID_SOCKET) {
5824 if (next->ai_family == AF_INET6) {
5825 error = setsockopt(sfd, IPPROTO_IPV6, IPV6_V6ONLY, (
char *) &flags,
sizeof(flags));
5827 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5828 "setsockopt(IPV6_V6ONLY): %s",
5836 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (
void *)&flags,
sizeof(flags));
5837 if (IS_UDP(transport)) {
5838 maximize_sndbuf(sfd);
5839 udp_socket[num_udp_socket] = sfd;
5842 error = setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (
void *)&flags,
sizeof(flags));
5844 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5845 "setsockopt(SO_KEEPALIVE): %s",
5849 error = setsockopt(sfd, SOL_SOCKET, SO_LINGER, (
void *)&ling,
sizeof(ling));
5851 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5852 "setsockopt(SO_LINGER): %s",
5856 error = setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (
void *)&flags,
sizeof(flags));
5858 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5859 "setsockopt(TCP_NODELAY): %s",
5864 if (bind(sfd, next->ai_addr, next->ai_addrlen) == SOCKET_ERROR) {
5865 if (errno != EADDRINUSE) {
5866 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5877 if (!IS_UDP(transport) && listen(sfd,
settings.backlog) == SOCKET_ERROR) {
5878 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5885 if (portnumber_file != NULL &&
5886 (next->ai_addr->sa_family == AF_INET ||
5887 next->ai_addr->sa_family == AF_INET6)) {
5889 struct sockaddr_in in;
5890 struct sockaddr_in6 in6;
5892 socklen_t len =
sizeof(my_sockaddr);
5893 if (getsockname(sfd, (
struct sockaddr*)&my_sockaddr, &len)==0) {
5894 if (next->ai_addr->sa_family == AF_INET) {
5895 fprintf(portnumber_file,
"%s INET: %u\n",
5896 IS_UDP(transport) ?
"UDP" :
"TCP",
5897 ntohs(my_sockaddr.in.sin_port));
5899 fprintf(portnumber_file,
"%s INET6: %u\n",
5900 IS_UDP(transport) ?
"UDP" :
"TCP",
5901 ntohs(my_sockaddr.in6.sin6_port));
5907 if (IS_UDP(transport)) {
5910 for (c = 0; c <
settings.num_threads_per_udp; c++) {
5912 dispatch_conn_new(sfd, conn_read, EV_READ | EV_PERSIST,
5913 UDP_READ_BUFFER_SIZE, transport);
5916 ++
stats.daemon_conns;
5920 if (!(listen_conn_add = conn_new(sfd, conn_listening,
5921 EV_READ | EV_PERSIST, 1,
5922 transport, main_base, NULL))) {
5923 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5924 "failed to create listening connection\n");
5927 listen_conn_add->next = listen_conn;
5928 listen_conn = listen_conn_add;
5931 ++
stats.daemon_conns;
5939 return success == 0;
5942 static int server_sockets(
int port,
enum network_transport transport,
5943 FILE *portnumber_file) {
5945 return server_socket(
settings.inter, port, transport, portnumber_file);
5950 char *list = strdup(
settings.inter);
5953 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5954 "Failed to allocate memory for parsing server interface string\n");
5957 for (
char *p = strtok_r(list,
";,", &b);
5959 p = strtok_r(NULL,
";,", &b)) {
5960 int the_port = port;
5962 char *s = strchr(p,
':');
5966 if (!safe_strtol(s, &the_port)) {
5967 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5968 "Invalid port number: \"%s\"", s);
5972 if (strcmp(p,
"*") == 0) {
5975 ret |= server_socket(p, the_port, transport, portnumber_file);
5982 static int new_socket_unix(
void) {
5985 if ((sfd = socket(AF_UNIX, SOCK_STREAM, 0)) == INVALID_SOCKET) {
5986 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
5987 "socket(AF_UNIX, SOCK_STREAM, 0): %s",
5989 return INVALID_SOCKET;
5992 if (evutil_make_socket_nonblocking(sfd) == -1) {
5994 return INVALID_SOCKET;
6000 static int server_socket_unix(
const char *path,
int access_mask) {
6002 struct linger ling = {0, 0};
6003 struct sockaddr_un addr;
6012 if ((sfd = new_socket_unix()) == -1) {
6019 if (lstat(path, &tstat) == 0) {
6020 if (S_ISSOCK(tstat.st_mode))
6024 setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (
void *)&flags,
sizeof(flags));
6025 setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (
void *)&flags,
sizeof(flags));
6026 setsockopt(sfd, SOL_SOCKET, SO_LINGER, (
void *)&ling,
sizeof(ling));
6032 memset(&addr, 0,
sizeof(addr));
6034 addr.sun_family = AF_UNIX;
6035 strncpy(addr.sun_path, path,
sizeof(addr.sun_path) - 1);
6036 assert(strcmp(addr.sun_path, path) == 0);
6037 old_umask = umask( ~(access_mask&0777));
6038 if (bind(sfd, (
struct sockaddr *)&addr,
sizeof(addr)) == -1) {
6039 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6047 if (listen(sfd,
settings.backlog) == -1) {
6048 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6054 if (!(listen_conn = conn_new(sfd, conn_listening,
6055 EV_READ | EV_PERSIST, 1,
6056 local_transport, main_base, NULL))) {
6057 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6058 "failed to create listening connection\n");
6062 ++
stats.daemon_conns;
6068 static struct event clockevent;
6071 static void set_current_time(
void) {
6074 gettimeofday(&timer, NULL);
6075 current_time = (rel_time_t) (timer.tv_sec - process_started);
6078 static void clock_handler(
const int fd,
const short which,
void *arg) {
6079 struct timeval t = {.tv_sec = 1, .tv_usec = 0};
6080 static bool initialized =
false;
6082 if (memcached_shutdown) {
6083 event_base_loopbreak(main_base);
6095 event_base_set(main_base, &clockevent);
6101 static void usage(
void) {
6102 printf(PACKAGE
" " VERSION
"\n");
6103 printf(
"-p <num> TCP port number to listen on (default: 11211)\n"
6104 "-U <num> UDP port number to listen on (default: 11211, 0 is off)\n"
6105 "-s <file> UNIX socket path to listen on (disables network support)\n"
6106 "-a <mask> access mask for UNIX socket, in octal (default: 0700)\n"
6107 "-l <addr> interface to listen on (default: INADDR_ANY, all addresses)\n"
6108 " <addr> may be specified as host:port. If you don't specify\n"
6109 " a port number, the value you specified with -p or -U is\n"
6110 " used. You may specify multiple addresses separated by comma\n"
6111 " or by using -l multiple times\n"
6112 "-d run as a daemon\n"
6113 "-r maximize core file limit\n"
6114 "-u <username> assume identity of <username> (only when run as root)\n"
6115 "-m <num> max memory to use for items in megabytes (default: 64 MB)\n"
6116 "-M return error on memory exhausted (rather than removing items)\n"
6117 "-c <num> max simultaneous connections (default: 1000)\n"
6118 "-k lock down all paged memory. Note that there is a\n"
6119 " limit on how much memory you may lock. Trying to\n"
6120 " allocate more than that would fail, so be sure you\n"
6121 " set the limit correctly for the user you started\n"
6122 " the daemon with (not for -u <username> user;\n"
6123 " under sh this is done with 'ulimit -S -l NUM_KB').\n"
6124 "-v verbose (print errors/warnings while in event loop)\n"
6125 "-vv very verbose (also print client commands/reponses)\n"
6126 "-vvv extremely verbose (also print internal state transitions)\n"
6127 "-h print this help and exit\n"
6128 "-i print memcached and libevent license\n"
6129 "-P <file> save PID in <file>, only used with -d option\n"
6130 "-f <factor> chunk size growth factor (default: 1.25)\n"
6131 "-n <bytes> minimum space allocated for key+value+flags (default: 48)\n");
6132 printf(
"-L Try to use large memory pages (if available). Increasing\n"
6133 " the memory page size could reduce the number of TLB misses\n"
6134 " and improve the performance. In order to get large pages\n"
6135 " from the OS, memcached will allocate the total item-cache\n"
6136 " in one large chunk.\n");
6137 printf(
"-D <char> Use <char> as the delimiter between key prefixes and IDs.\n"
6138 " This is used for per-prefix stats reporting. The default is\n"
6139 " \":\" (colon). If this option is specified, stats collection\n"
6140 " is turned on automatically; if not, then it may be turned on\n"
6141 " by sending the \"stats detail on\" command to the server.\n");
6142 printf(
"-t <num> number of threads to use (default: 4)\n");
6143 printf(
"-R Maximum number of requests per event, limits the number of\n"
6144 " requests process for a given connection to prevent \n"
6145 " starvation (default: 20)\n");
6146 printf(
"-C Disable use of CAS\n");
6147 printf(
"-b Set the backlog queue limit (default: 1024)\n");
6148 printf(
"-B Binding protocol - one of ascii, binary, or auto (default)\n");
6149 printf(
"-I Override the size of each slab page. Adjusts max item size\n"
6150 " (default: 1mb, min: 1k, max: 128m)\n");
6151 printf(
"-q Disable detailed stats commands\n");
6153 printf(
"-S Require SASL authentication\n");
6155 printf(
"-X module,cfg Load the module and initialize it with the config\n");
6156 printf(
"-E engine Load engine as the storage engine\n");
6157 printf(
"-e config Pass config as configuration options to the storage engine\n");
6158 printf(
"\nEnvironment variables:\n"
6159 "MEMCACHED_PORT_FILENAME File to write port information to\n"
6160 "MEMCACHED_TOP_KEYS Number of top keys to keep track of\n"
6161 "MEMCACHED_REQS_TAP_EVENT Similar to -R but for tap_ship_log\n");
6163 static void usage_license(
void) {
6164 printf(PACKAGE
" " VERSION
"\n\n");
6166 "Copyright (c) 2003, Danga Interactive, Inc. <http://www.danga.com/>\n"
6167 "All rights reserved.\n"
6169 "Redistribution and use in source and binary forms, with or without\n"
6170 "modification, are permitted provided that the following conditions are\n"
6173 " * Redistributions of source code must retain the above copyright\n"
6174 "notice, this list of conditions and the following disclaimer.\n"
6176 " * Redistributions in binary form must reproduce the above\n"
6177 "copyright notice, this list of conditions and the following disclaimer\n"
6178 "in the documentation and/or other materials provided with the\n"
6181 " * Neither the name of the Danga Interactive nor the names of its\n"
6182 "contributors may be used to endorse or promote products derived from\n"
6183 "this software without specific prior written permission.\n"
6185 "THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS\n"
6186 "\"AS IS\" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT\n"
6187 "LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR\n"
6188 "A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT\n"
6189 "OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,\n"
6190 "SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT\n"
6191 "LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
6192 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
6193 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
6194 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE\n"
6195 "OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
6198 "This product includes software developed by Niels Provos.\n"
6202 "Copyright 2000-2003 Niels Provos <provos@citi.umich.edu>\n"
6203 "All rights reserved.\n"
6205 "Redistribution and use in source and binary forms, with or without\n"
6206 "modification, are permitted provided that the following conditions\n"
6208 "1. Redistributions of source code must retain the above copyright\n"
6209 " notice, this list of conditions and the following disclaimer.\n"
6210 "2. Redistributions in binary form must reproduce the above copyright\n"
6211 " notice, this list of conditions and the following disclaimer in the\n"
6212 " documentation and/or other materials provided with the distribution.\n"
6213 "3. All advertising materials mentioning features or use of this software\n"
6214 " must display the following acknowledgement:\n"
6215 " This product includes software developed by Niels Provos.\n"
6216 "4. The name of the author may not be used to endorse or promote products\n"
6217 " derived from this software without specific prior written permission.\n"
6219 "THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR\n"
6220 "IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES\n"
6221 "OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.\n"
6222 "IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT,\n"
6223 "INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT\n"
6224 "NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,\n"
6225 "DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY\n"
6226 "THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT\n"
6227 "(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF\n"
6228 "THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.\n"
6234 static void save_pid(
const char *pid_file) {
6237 if (access(pid_file, F_OK) == 0) {
6238 if ((fp = fopen(pid_file,
"r")) != NULL) {
6240 if (fgets(buffer,
sizeof(buffer), fp) != NULL) {
6242 if (safe_strtoul(buffer, &pid) &&
kill((pid_t)pid, 0) == 0) {
6243 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6244 "WARNING: The pid file contained the following (running) pid: %u\n", pid);
6251 if ((fp = fopen(pid_file,
"w")) == NULL) {
6252 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6253 "Could not open the pid file %s for writing: %s\n",
6254 pid_file, strerror(errno));
6258 fprintf(fp,
"%ld\n", (
long)getpid());
6259 if (fclose(fp) == -1) {
6260 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6261 "Could not close the pid file %s: %s\n",
6262 pid_file, strerror(errno));
6266 static void remove_pidfile(
const char *pid_file) {
6267 if (pid_file != NULL) {
6268 if (unlink(pid_file) != 0) {
6269 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6270 "Could not remove the pid file %s: %s\n",
6271 pid_file, strerror(errno));
6276 #ifndef HAVE_SIGIGNORE
6277 static int sigignore(
int sig) {
6278 struct sigaction sa = { .sa_handler = SIG_IGN, .sa_flags = 0 };
6280 if (sigemptyset(&sa.sa_mask) == -1 ||
sigaction(sig, &sa, 0) == -1) {
6287 static void sigterm_handler(
int sig) {
6288 assert(sig == SIGTERM || sig == SIGINT);
6289 memcached_shutdown = 1;
6292 static int install_sigterm_handler(
void) {
6293 struct sigaction sa = {.sa_handler = sigterm_handler, .sa_flags = 0};
6295 if (sigemptyset(&sa.sa_mask) == -1 ||
sigaction(SIGTERM, &sa, 0) == -1 ||
6307 static int enable_large_pages(
void) {
6308 #if defined(HAVE_GETPAGESIZES) && defined(HAVE_MEMCNTL)
6311 int avail = getpagesizes(sizes, 32);
6313 size_t max = sizes[0];
6314 struct memcntl_mha arg = {0};
6317 for (ii = 1; ii < avail; ++ii) {
6318 if (max < sizes[ii]) {
6324 arg.mha_pagesize = max;
6325 arg.mha_cmd = MHA_MAPSIZE_BSSBRK;
6327 if (memcntl(0, 0, MC_HAT_ADVISE, (caddr_t)&arg, 0, 0) == -1) {
6328 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6329 "Failed to set large pages: %s\nWill use default page size\n",
6335 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6336 "Failed to get supported pagesizes: %s\nWill use default page size\n",
6346 static const char* get_server_version(
void) {
6350 static void store_engine_specific(
const void *cookie,
6351 void *engine_data) {
6352 conn *c = (conn*)cookie;
6353 c->engine_storage = engine_data;
6356 static void *get_engine_specific(
const void *cookie) {
6357 conn *c = (conn*)cookie;
6358 return c->engine_storage;
6361 static int get_socket_fd(
const void *cookie) {
6362 conn *c = (conn *)cookie;
6366 static void set_tap_nack_mode(
const void *cookie,
bool enable) {
6367 conn *c = (conn *)cookie;
6368 c->tap_nack_mode = enable;
6371 static void reserve_cookie(
const void *cookie) {
6372 conn *c = (conn *)cookie;
6376 static void release_cookie(
const void *cookie) {
6377 conn *c = (conn *)cookie;
6381 static int num_independent_stats(
void) {
6385 static void *new_independent_stats(
void) {
6387 int nrecords = num_independent_stats();
6390 independent_stats->topkeys = topkeys_init(
settings.topkeys);
6391 for (ii = 0; ii < nrecords; ii++)
6392 pthread_mutex_init(&independent_stats->thread_stats[ii].mutex, NULL);
6393 return independent_stats;
6396 static void release_independent_stats(
void *
stats) {
6398 int nrecords = num_independent_stats();
6399 struct independent_stats *independent_stats = stats;
6400 if (independent_stats->topkeys)
6401 topkeys_free(independent_stats->topkeys);
6402 for (ii = 0; ii < nrecords; ii++)
6403 pthread_mutex_destroy(&independent_stats->thread_stats[ii].mutex);
6404 free(independent_stats);
6407 static inline struct independent_stats *get_independent_stats(conn *c) {
6408 struct independent_stats *independent_stats;
6411 if (independent_stats == NULL)
6412 independent_stats = default_independent_stats;
6414 independent_stats = default_independent_stats;
6416 return independent_stats;
6419 static inline struct thread_stats *get_thread_stats(conn *c) {
6420 struct independent_stats *independent_stats = get_independent_stats(c);
6421 assert(c->thread->index < num_independent_stats());
6422 return &independent_stats->thread_stats[c->thread->index];
6426 ENGINE_EVENT_TYPE type,
6427 EVENT_CALLBACK cb,
const void *cb_data) {
6433 h->cb_data = cb_data;
6434 h->next = engine_event_handlers[
type];
6435 engine_event_handlers[
type] = h;
6438 static rel_time_t get_current_time(
void)
6440 return current_time;
6443 static void count_eviction(
const void *cookie,
const void *key,
const int nkey) {
6444 topkeys_t *tk = get_independent_stats((conn*)cookie)->topkeys;
6445 TK(tk, evictions, key, nkey, get_current_time());
6455 static ENGINE_ERROR_CODE internal_arithmetic(
ENGINE_HANDLE* handle,
6459 const bool increment,
6461 const uint64_t delta,
6462 const uint64_t initial,
6463 const rel_time_t exptime,
6472 ENGINE_ERROR_CODE
ret;
6473 ret = e->
get(handle, cookie, &it, key, nkey, vbucket);
6475 if (ret == ENGINE_SUCCESS) {
6479 e->
release(handle, cookie, it);
6480 return ENGINE_FAILED;
6485 if (info.value[0].iov_len > (
sizeof(value) - 1)) {
6486 e->
release(handle, cookie, it);
6487 return ENGINE_EINVAL;
6490 memcpy(value, info.value[0].iov_base, info.value[0].iov_len);
6491 value[info.value[0].iov_len] =
'\0';
6494 if (!safe_strtoull(value, &val)) {
6495 e->
release(handle, cookie, it);
6496 return ENGINE_EINVAL;
6509 size_t nb = snprintf(value,
sizeof(value),
"%"PRIu64, val);
6512 if (e->
allocate(handle, cookie, &nit, key,
6513 nkey, nb, info.
flags, info.
exptime) != ENGINE_SUCCESS) {
6514 e->
release(handle, cookie, it);
6515 return ENGINE_ENOMEM;
6520 e->
release(handle, cookie, it);
6521 e->
release(handle, cookie, nit);
6522 return ENGINE_FAILED;
6525 memcpy(i2.value[0].iov_base, value, nb);
6527 ret = e->
store(handle, cookie, nit, cas, OPERATION_CAS, vbucket);
6528 e->
release(handle, cookie, it);
6529 e->
release(handle, cookie, nit);
6530 }
else if (ret == ENGINE_KEY_ENOENT && create) {
6532 size_t nb = snprintf(value,
sizeof(value),
"%"PRIu64
"\r\n", initial);
6534 if (e->
allocate(handle, cookie, &it, key, nkey, nb, 0, exptime) != ENGINE_SUCCESS) {
6535 e->
release(handle, cookie, it);
6536 return ENGINE_ENOMEM;
6541 e->
release(handle, cookie, it);
6542 return ENGINE_FAILED;
6545 memcpy(info.value[0].iov_base, value, nb);
6546 ret = e->
store(handle, cookie, it, cas, OPERATION_CAS, vbucket);
6547 e->
release(handle, cookie, it);
6551 if (ret == ENGINE_KEY_EEXISTS) {
6552 return internal_arithmetic(handle, cookie, key, nkey, increment, create, delta,
6553 initial, exptime, cas, result, vbucket);
6568 if (extension == NULL) {
6577 if (ptr == extension) {
6582 settings.extensions.daemons = extension;
6585 settings.extensions.logger = extension;
6588 if (
settings.extensions.ascii != NULL) {
6590 for (last =
settings.extensions.ascii; last->
next != NULL;
6591 last = last->
next) {
6592 if (last == extension) {
6596 if (last == extension) {
6599 last->
next = extension;
6602 settings.extensions.ascii = extension;
6626 while (ptr != NULL && ptr != extension) {
6631 if (ptr != NULL && prev != NULL) {
6635 if (
settings.extensions.daemons == ptr) {
6641 if (
settings.extensions.logger == extension) {
6642 if (get_stderr_logger() == extension) {
6643 settings.extensions.logger = get_null_logger();
6645 settings.extensions.logger = get_stderr_logger();
6654 while (ptr != NULL && ptr != extension) {
6659 if (ptr != NULL && prev != NULL) {
6663 if (
settings.extensions.ascii == ptr) {
6682 return settings.extensions.daemons;
6695 #ifdef INNODB_MEMCACHED
6696 void shutdown_server(
void) {
6698 static void shutdown_server(
void) {
6700 #ifdef INNODB_MEMCACHED
6703 while (listen_conn) {
6704 conn_closing(listen_conn);
6705 listen_conn = listen_conn->next;
6708 for (i = 0; i < num_udp_socket; i++) {
6709 safe_close(udp_socket[i]);
6712 memcached_shutdown = 1;
6715 #ifdef INNODB_MEMCACHED
6716 bool shutdown_complete(
void)
6718 return(memcached_shutdown == 2);
6727 static EXTENSION_LOG_LEVEL get_log_level(
void)
6729 EXTENSION_LOG_LEVEL
ret;
6731 case 0: ret = EXTENSION_LOG_WARNING;
break;
6732 case 1: ret = EXTENSION_LOG_INFO;
break;
6733 case 2: ret = EXTENSION_LOG_DEBUG;
break;
6735 ret = EXTENSION_LOG_DETAIL;
6740 static void set_log_level(EXTENSION_LOG_LEVEL severity)
6743 case EXTENSION_LOG_WARNING:
settings.verbose = 0;
break;
6744 case EXTENSION_LOG_INFO:
settings.verbose = 1;
break;
6745 case EXTENSION_LOG_DEBUG:
settings.verbose = 2;
break;
6751 static void get_config_append_stats(
const char *key,
const uint16_t klen,
6752 const char *val,
const uint32_t vlen,
6755 if (klen == 0 || vlen == 0) {
6759 char *pos = (
char*)cookie;
6760 size_t nbytes = strlen(pos);
6762 if ((nbytes + klen + vlen + 3) > 1024) {
6767 memcpy(pos + nbytes, key, klen);
6771 memcpy(pos + nbytes, val, vlen);
6773 memcpy(pos + nbytes,
";", 2);
6779 process_stat_settings(get_config_append_stats, config);
6780 int rval = parse_config(config, items, NULL);
6794 .realtime = realtime,
6796 .get_current_time = get_current_time,
6797 .parse_config = parse_config,
6798 .shutdown = shutdown_server,
6799 .get_config = get_config
6804 .store_engine_specific = store_engine_specific,
6805 .get_engine_specific = get_engine_specific,
6806 .get_socket_fd = get_socket_fd,
6807 .set_tap_nack_mode = set_tap_nack_mode,
6808 .notify_io_complete = notify_io_complete,
6809 .reserve = reserve_cookie,
6810 .release = release_cookie
6815 .release_stats = release_independent_stats,
6816 .evicting = count_eviction
6820 .get_logger = get_logger,
6821 .get_level = get_log_level,
6822 .set_level = set_log_level
6826 .unregister_extension = unregister_extension,
6827 .get_extension = get_extension
6832 .perform_callbacks = perform_callbacks,
6838 .stat = &server_stat_api,
6839 .extension = &extension_api,
6840 .callback = &callback_api,
6841 .log = &server_log_api,
6842 .cookie = &server_cookie_api
6845 if (rv.engine == NULL) {
6859 static bool load_extension(
const char *soname,
const char *
config) {
6860 if (soname == NULL) {
6868 } funky = {.initialize = NULL };
6870 void *handle = dlopen(soname, RTLD_NOW | RTLD_LOCAL);
6871 if (handle == NULL) {
6872 const char *msg = dlerror();
6873 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6874 "Failed to open library \"%s\": %s\n",
6875 soname, msg ? msg :
"unknown error");
6879 void *symbol = dlsym(handle,
"memcached_extensions_initialize");
6880 if (symbol == NULL) {
6881 const char *msg = dlerror();
6882 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6883 "Could not find symbol \"memcached_extensions_initialize\" in %s: %s\n",
6884 soname, msg ? msg :
"unknown error");
6887 funky.voidptr = symbol;
6892 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6893 "Failed to initalize extensions from %s. Error code: %d\n",
6900 settings.extensions.logger->
log(EXTENSION_LOG_INFO, NULL,
6901 "Loaded extensions from: %s\n", soname);
6911 static bool sanitycheck(
void) {
6913 const char *ever = event_get_version();
6915 if (strncmp(ever,
"1.", 2) == 0) {
6917 if ((ever[2] ==
'1' || ever[2] ==
'2') && !isdigit(ever[3])) {
6918 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
6919 "You are using libevent %s.\nPlease upgrade to"
6920 " a more recent version (1.3 or newer)\n",
6921 event_get_version());
6930 #ifdef INNODB_MEMCACHED
6933 my_strdupl(
const char* str,
int len)
6935 char* s = (
char*) malloc(len + 1);
6937 return((
char*) memcpy(s, str, len));
6944 daemon_memcached_make_option(
char*
option,
int* option_argc,
6945 char*** option_argv)
6947 static const char* sep =
" ";
6954 my_str = my_strdupl(option, strlen(option));
6956 for (opt_str = strtok_r(my_str, sep, &last);
6958 opt_str = strtok_r(NULL, sep, &last)) {
6966 *option_argv = (
char**) malloc((num_arg + 1)
6967 *
sizeof(**option_argv));
6969 for (opt_str = strtok_r(my_str, sep, &last);
6971 opt_str = strtok_r(NULL, sep, &last)) {
6972 (*option_argv)[
i] = my_strdupl(opt_str, strlen(opt_str));
6976 assert(i == num_arg + 1);
6978 *option_argc = (num_arg + 1);
6988 unsigned int eng_r_batch_size;
6989 unsigned int eng_w_batch_size;
6994 #ifdef INNODB_MEMCACHED
6995 void* daemon_memcached_main(
void *p) {
6997 int main (
int argc,
char **argv) {
7000 bool lock_memory =
false;
7001 bool do_daemonize =
false;
7002 bool preallocate =
false;
7004 char *username = NULL;
7005 char *pid_file = NULL;
7011 bool protocol_specified =
false;
7012 bool tcp_specified =
false;
7013 bool udp_specified =
false;
7016 const char *engine_config = NULL;
7017 char old_options[1024] = { [0] =
'\0' };
7018 char *old_opts = old_options;
7019 #ifdef INNODB_MEMCACHED
7020 int option_argc = 0;
7021 char** option_argv = NULL;
7024 if (m_config->m_engine_library) {
7025 engine = m_config->m_engine_library;
7030 my_eng_config.
cb_ptr = m_config->m_innodb_api_cb;
7031 my_eng_config.eng_r_batch_size = m_config->m_r_batch_size;
7032 my_eng_config.eng_w_batch_size = m_config->m_w_batch_size;
7033 my_eng_config.enable_binlog = m_config->m_enable_binlog;
7035 engine_config = (
const char *) (&my_eng_config);
7038 engine =
"default_engine.so";
7041 engine =
"default_engine.so";
7044 memcached_shutdown = 0;
7046 if (!sanitycheck()) {
7054 process_started = time(0) - 2;
7058 initialize_sockets();
7064 fprintf(stderr,
"Failed to initialize log system\n");
7068 if (m_config->m_mem_option) {
7069 daemon_memcached_make_option(m_config->m_mem_option,
7074 #ifdef INNODB_MEMCACHED
7076 if (option_argc > 0 && option_argv) {
7080 while (-1 != (c = getopt(option_argc, option_argv,
7115 settings.access= strtol(optarg,NULL,8);
7120 udp_specified =
true;
7124 tcp_specified =
true;
7130 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
7131 old_opts += sprintf(old_opts,
"cache_size=%lu;",
7136 old_opts += sprintf(old_opts,
"eviction=false;");
7152 perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
7158 do_daemonize =
true;
7164 settings.reqs_per_event = atoi(optarg);
7165 if (
settings.reqs_per_event <= 0) {
7166 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7167 "Number of requests per event must be greater than 0\n");
7180 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7181 "Factor must be greater than 1\n");
7184 old_opts += sprintf(old_opts,
"factor=%f;",
7188 settings.chunk_size = atoi(optarg);
7190 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7191 "Chunk size must be greater than 0\n");
7194 old_opts += sprintf(old_opts,
"chunk_size=%u;",
7198 settings.num_threads = atoi(optarg);
7200 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7201 "Number of threads must be greater than 0\n");
7209 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7210 "WARNING: Setting a high number of worker"
7211 "threads is not recommended.\n"
7212 " Set this value to the number of cores in"
7213 " your machine or less.\n");
7217 settings.prefix_delimiter = optarg[0];
7221 if (enable_large_pages() == 0) {
7223 old_opts += sprintf(old_opts,
"preallocate=true;");
7233 protocol_specified =
true;
7234 if (strcmp(optarg,
"auto") == 0) {
7235 settings.binding_protocol = negotiating_prot;
7236 }
else if (strcmp(optarg,
"binary") == 0) {
7237 settings.binding_protocol = binary_prot;
7238 }
else if (strcmp(optarg,
"ascii") == 0) {
7239 settings.binding_protocol = ascii_prot;
7241 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7242 "Invalid value for binding protocol: %s\n"
7243 " -- should be one of auto, binary, or ascii\n", optarg);
7248 unit = optarg[strlen(optarg)-1];
7249 if (unit ==
'k' || unit ==
'm' ||
7250 unit ==
'K' || unit ==
'M') {
7251 optarg[strlen(optarg)-1] =
'\0';
7252 size_max = atoi(optarg);
7253 if (unit ==
'k' || unit ==
'K')
7255 if (unit ==
'm' || unit ==
'M')
7256 size_max *= 1024 * 1024;
7259 settings.item_size_max = atoi(optarg);
7261 if (
settings.item_size_max < 1024) {
7262 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7263 "Item max size cannot be less than 1024 bytes.\n");
7266 if (
settings.item_size_max > 1024 * 1024 * 128) {
7267 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7268 "Cannot set item size limit higher than 128 mb.\n");
7271 if (
settings.item_size_max > 1024 * 1024) {
7272 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7273 "WARNING: Setting item max size above 1MB is not"
7275 " Raising this limit increases the minimum memory requirements\n"
7276 " and will decrease your memory efficiency.\n"
7280 old_opts += sprintf(old_opts,
"item_size_max=%zu;",
7283 old_opts += sprintf(old_opts,
"item_size_max=%lu;", (
long unsigned)
7299 # ifdef ENABLE_MEMCACHED_SASL
7300 # ifndef SASL_ENABLED
7301 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7302 "This server is not built with SASL support.\n");
7310 char *ptr = strchr(optarg,
',');
7315 if (!load_extension(optarg, ptr)) {
7324 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7325 "Illegal argument \"%c\"\n", c);
7334 while (-1 != (c = getopt(argc, argv,
7369 settings.access= strtol(optarg,NULL,8);
7374 udp_specified =
true;
7378 tcp_specified =
true;
7384 settings.maxbytes = ((size_t)atoi(optarg)) * 1024 * 1024;
7385 old_opts += sprintf(old_opts,
"cache_size=%lu;",
7390 old_opts += sprintf(old_opts,
"eviction=false;");
7406 perform_callbacks(ON_LOG_LEVEL, NULL, NULL);
7410 size_t len = strlen(
settings.inter) + strlen(optarg) + 2;
7411 char *p = malloc(len);
7413 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7414 "Failed to allocate memory\n");
7417 snprintf(p, len,
"%s,%s",
settings.inter, optarg);
7425 do_daemonize =
true;
7431 settings.reqs_per_event = atoi(optarg);
7432 if (
settings.reqs_per_event <= 0) {
7433 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7434 "Number of requests per event must be greater than 0\n");
7447 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7448 "Factor must be greater than 1\n");
7451 old_opts += sprintf(old_opts,
"factor=%f;",
7455 settings.chunk_size = atoi(optarg);
7457 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7458 "Chunk size must be greater than 0\n");
7461 old_opts += sprintf(old_opts,
"chunk_size=%u;",
7465 settings.num_threads = atoi(optarg);
7467 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7468 "Number of threads must be greater than 0\n");
7476 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7477 "WARNING: Setting a high number of worker"
7478 "threads is not recommended.\n"
7479 " Set this value to the number of cores in"
7480 " your machine or less.\n");
7484 settings.prefix_delimiter = optarg[0];
7488 if (enable_large_pages() == 0) {
7490 old_opts += sprintf(old_opts,
"preallocate=true;");
7500 protocol_specified =
true;
7501 if (strcmp(optarg,
"auto") == 0) {
7502 settings.binding_protocol = negotiating_prot;
7503 }
else if (strcmp(optarg,
"binary") == 0) {
7504 settings.binding_protocol = binary_prot;
7505 }
else if (strcmp(optarg,
"ascii") == 0) {
7506 settings.binding_protocol = ascii_prot;
7508 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7509 "Invalid value for binding protocol: %s\n"
7510 " -- should be one of auto, binary, or ascii\n", optarg);
7515 unit = optarg[strlen(optarg)-1];
7516 if (unit ==
'k' || unit ==
'm' ||
7517 unit ==
'K' || unit ==
'M') {
7518 optarg[strlen(optarg)-1] =
'\0';
7519 size_max = atoi(optarg);
7520 if (unit ==
'k' || unit ==
'K')
7522 if (unit ==
'm' || unit ==
'M')
7523 size_max *= 1024 * 1024;
7526 settings.item_size_max = atoi(optarg);
7528 if (
settings.item_size_max < 1024) {
7529 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7530 "Item max size cannot be less than 1024 bytes.\n");
7533 if (
settings.item_size_max > 1024 * 1024 * 128) {
7534 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7535 "Cannot set item size limit higher than 128 mb.\n");
7538 if (
settings.item_size_max > 1024 * 1024) {
7539 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7540 "WARNING: Setting item max size above 1MB is not"
7542 " Raising this limit increases the minimum memory requirements\n"
7543 " and will decrease your memory efficiency.\n"
7547 old_opts += sprintf(old_opts,
"item_size_max=%zu;",
7550 old_opts += sprintf(old_opts,
"item_size_max=%lu;", (
long unsigned)
7558 engine_config = optarg;
7564 #ifndef SASL_ENABLED
7565 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7566 "This server is not built with SASL support.\n");
7573 char *ptr = strchr(optarg,
',');
7578 if (!load_extension(optarg, ptr)) {
7587 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7588 "Illegal argument \"%c\"\n", c);
7594 if (getenv(
"MEMCACHED_REQS_TAP_EVENT") != NULL) {
7595 settings.reqs_per_tap_event = atoi(getenv(
"MEMCACHED_REQS_TAP_EVENT"));
7598 if (
settings.reqs_per_tap_event <= 0) {
7599 settings.reqs_per_tap_event = DEFAULT_REQS_PER_TAP_EVENT;
7603 if (install_sigterm_handler() != 0) {
7604 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7605 "Failed to install SIGTERM handler\n");
7609 char *topkeys_env = getenv(
"MEMCACHED_TOP_KEYS");
7610 if (topkeys_env != NULL) {
7611 settings.topkeys = atoi(topkeys_env);
7618 if (!protocol_specified) {
7619 settings.binding_protocol = binary_prot;
7621 if (
settings.binding_protocol == negotiating_prot) {
7622 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7623 "ERROR: You cannot use auto-negotiating protocol while requiring SASL.\n");
7626 if (
settings.binding_protocol == ascii_prot) {
7627 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7628 "ERROR: You cannot use only ASCII protocol while requiring SASL.\n");
7634 if (tcp_specified && !udp_specified) {
7636 }
else if (udp_specified && !tcp_specified) {
7650 struct rlimit rlim_new;
7655 if (getrlimit(RLIMIT_CORE, &rlim) == 0) {
7656 rlim_new.rlim_cur = rlim_new.rlim_max = RLIM_INFINITY;
7657 if (setrlimit(RLIMIT_CORE, &rlim_new)!= 0) {
7659 rlim_new.rlim_cur = rlim_new.rlim_max = rlim.rlim_max;
7660 (void)setrlimit(RLIMIT_CORE, &rlim_new);
7669 if ((getrlimit(RLIMIT_CORE, &rlim) != 0) || rlim.rlim_cur == 0) {
7670 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7671 "failed to ensure corefile creation\n");
7681 if (getrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7682 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7683 "failed to getrlimit number of files\n");
7687 if (rlim.rlim_cur < maxfiles)
7688 rlim.rlim_cur = maxfiles;
7689 if (rlim.rlim_max < rlim.rlim_cur)
7690 rlim.rlim_max = rlim.rlim_cur;
7691 if (setrlimit(RLIMIT_NOFILE, &rlim) != 0) {
7692 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7693 "failed to set rlimit for open files. Try running as"
7694 " root or requesting smaller maxconns value.\n");
7705 nfiles +=
settings.num_threads * 2;
7709 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7710 "Configuratioin error. \n"
7711 "You specified %d connections, but the system will use at "
7712 "least %d\nconnection structures to start.\n",
7718 if (getuid() == 0 || geteuid() == 0) {
7719 if (username == 0 || *username ==
'\0') {
7720 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7721 "can't run as root without the -u switch\n");
7724 if ((pw = getpwnam(username)) == 0) {
7725 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7726 "can't find the user %s to switch to\n", username);
7729 if (setgid(pw->pw_gid) < 0 || setuid(pw->pw_uid) < 0) {
7730 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7731 "failed to assume identity of user %s: %s\n", username,
7744 if (sigignore(SIGHUP) == -1) {
7745 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7746 "Failed to ignore SIGHUP: ", strerror(errno));
7748 if (daemonize(maxcore,
settings.verbose) == -1) {
7749 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7750 "failed to daemon() in order to daemonize\n");
7757 #ifdef HAVE_MLOCKALL
7758 int res = mlockall(MCL_CURRENT | MCL_FUTURE);
7760 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7761 "warning: -k invalid, mlockall() failed: %s\n",
7765 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7766 "warning: -k invalid, mlockall() not supported on this platform. proceeding without.\n");
7771 main_base = event_init();
7775 if (!load_engine(engine,get_server_api,
settings.extensions.logger,&engine_handle)) {
7780 if(!init_engine(engine_handle,engine_config,
settings.extensions.logger)) {
7781 #ifdef INNODB_MEMCACHED
7790 log_engine_details(engine_handle,
settings.extensions.logger);
7801 if (!(conn_cache = cache_create(
"conn",
sizeof(conn),
sizeof(
void*),
7802 conn_constructor, conn_destructor))) {
7803 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7804 "Failed to create connection cache\n");
7808 default_independent_stats = new_independent_stats();
7815 if (sigignore(SIGPIPE) == -1) {
7816 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7817 "failed to ignore SIGPIPE; sigaction");
7823 thread_init(
settings.num_threads, main_base, dispatch_event_handler);
7826 clock_handler(0, 0, 0);
7831 vperror(
"failed to listen on UNIX socket: %s",
settings.socketpath);
7840 const char *portnumber_filename = getenv(
"MEMCACHED_PORT_FILENAME");
7841 char temp_portnumber_filename[PATH_MAX];
7842 FILE *portnumber_file = NULL;
7844 if (portnumber_filename != NULL) {
7845 snprintf(temp_portnumber_filename,
7846 sizeof(temp_portnumber_filename),
7847 "%s.lck", portnumber_filename);
7849 portnumber_file = fopen(temp_portnumber_filename,
"a");
7850 if (portnumber_file == NULL) {
7851 settings.extensions.logger->
log(EXTENSION_LOG_WARNING, NULL,
7852 "Failed to open \"%s\": %s\n",
7853 temp_portnumber_filename, strerror(errno));
7859 vperror(
"failed to listen on TCP port %d",
settings.port);
7860 #ifdef INNODB_MEMCACHED
7879 vperror(
"failed to listen on UDP port %d",
settings.udpport);
7883 if (portnumber_file) {
7884 fclose(portnumber_file);
7885 rename(temp_portnumber_filename, portnumber_filename);
7889 if (pid_file != NULL) {
7897 event_base_loop(main_base, 0);
7900 settings.extensions.logger->
log(EXTENSION_LOG_INFO, NULL,
7901 "Initiating shutdown\n");
7913 remove_pidfile(pid_file);
7918 memcached_shutdown = 2;
7920 return EXIT_SUCCESS;