12 #include <arpa/inet.h>
14 #include "default_engine.h"
15 #include "memcached/util.h"
16 #include "memcached/config_parser.h"
18 #define CMD_SET_VBUCKET 0x83
19 #define CMD_GET_VBUCKET 0x84
20 #define CMD_DEL_VBUCKET 0x85
23 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
24 const char* config_str);
25 static void default_destroy(
ENGINE_HANDLE* handle,
bool force);
26 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
33 const rel_time_t exptime);
34 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
41 static void default_item_release(
ENGINE_HANDLE* handle,
const void *cookie,
49 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
54 static void default_reset_stats(
ENGINE_HANDLE* handle,
const void *cookie);
59 ENGINE_STORE_OPERATION operation,
61 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
68 const uint64_t initial,
69 const rel_time_t exptime,
74 const void* cookie, time_t when);
75 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
78 const void* client,
size_t nclient,
80 const void* userdata,
size_t nuserdata);
81 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
92 uint16_t vbid,
enum vbucket_state
to) {
94 vi.c = e->vbucket_infos[vbid];
96 e->vbucket_infos[vbid] = vi.c;
99 static enum vbucket_state get_vbucket_state(
struct default_engine *e,
102 vi.c = e->vbucket_infos[vbid];
106 static bool handled_vbucket(
struct default_engine *e, uint16_t vbid) {
107 return e->config.ignore_vbucket
108 || (get_vbucket_state(e, vbid) == VBUCKET_STATE_ACTIVE);
112 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; }
114 static bool get_item_info(
ENGINE_HANDLE *handle,
const void *cookie,
117 static const char const * vbucket_state_name(
enum vbucket_state s) {
118 static const char const * vbucket_states[] = {
119 "dead",
"active",
"replica",
"pending"
121 return vbucket_states[s];
125 GET_SERVER_API get_server_api,
128 if (interface != 1 || api == NULL) {
129 return ENGINE_ENOTSUP;
133 if (engine == NULL) {
134 return ENGINE_ENOMEM;
142 .get_info = default_get_info,
143 .initialize = default_initialize,
144 .destroy = default_destroy,
145 .allocate = default_item_allocate,
146 .remove = default_item_delete,
147 .release = default_item_release,
149 .get_stats = default_get_stats,
150 .reset_stats = default_reset_stats,
151 .store = default_store,
152 .arithmetic = default_arithmetic,
153 .flush = default_flush,
154 .unknown_command = default_unknown_command,
155 .item_set_cas = item_set_cas,
156 .get_item_info = get_item_info,
157 .get_tap_iterator = get_tap_iterator
160 .get_server_api = get_server_api,
166 .lock = PTHREAD_MUTEX_INITIALIZER
168 .cache_lock = PTHREAD_MUTEX_INITIALIZER,
170 .lock = PTHREAD_MUTEX_INITIALIZER,
176 .evict_to_free =
true,
177 .maxbytes = 64 * 1024 * 1024,
178 .preallocate =
false,
181 .item_size_max= 1024 * 1024,
184 .lock = PTHREAD_MUTEX_INITIALIZER,
196 *engine = default_engine;
199 return ENGINE_SUCCESS;
206 static inline hash_item* get_real_item(item* item) {
211 return &get_handle(handle)->info.engine_info;
214 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
215 const char* config_str) {
218 ENGINE_ERROR_CODE
ret = initalize_configuration(se, config_str);
219 if (ret != ENGINE_SUCCESS) {
224 if (se->config.use_cas) {
228 ret = assoc_init(se);
229 if (ret != ENGINE_SUCCESS) {
233 ret = slabs_init(se, se->config.maxbytes, se->config.factor,
234 se->config.preallocate);
235 if (ret != ENGINE_SUCCESS) {
239 return ENGINE_SUCCESS;
242 static void default_destroy(
ENGINE_HANDLE* handle,
bool force) {
247 pthread_mutex_destroy(&se->stats.lock);
248 pthread_mutex_destroy(&se->slabs.
lock);
254 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
261 const rel_time_t exptime) {
263 size_t ntotal =
sizeof(
hash_item) + nkey + nbytes;
264 if (engine->config.use_cas) {
265 ntotal +=
sizeof(uint64_t);
267 unsigned int id = slabs_clsid(engine, ntotal);
273 it = item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
277 return ENGINE_SUCCESS;
279 return ENGINE_ENOMEM;
283 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
291 VBUCKET_GUARD(engine, vbucket);
293 hash_item *it = item_get(engine, key, nkey);
295 return ENGINE_KEY_ENOENT;
298 if (cas == 0 || cas == item_get_cas(it)) {
299 item_unlink(engine, it);
300 item_release(engine, it);
302 return ENGINE_KEY_EEXISTS;
305 return ENGINE_SUCCESS;
311 item_release(get_handle(handle), get_real_item(item));
321 VBUCKET_GUARD(engine, vbucket);
323 *item = item_get(engine, key, nkey);
325 return ENGINE_SUCCESS;
327 return ENGINE_KEY_ENOENT;
333 const void *cookie) {
334 for (
int i = 0;
i < NUM_VBUCKETS;
i++) {
335 enum vbucket_state state = get_vbucket_state(e,
i);
336 if (state != VBUCKET_STATE_DEAD) {
338 snprintf(buf,
sizeof(buf),
"vb_%d",
i);
339 const char * state_name = vbucket_state_name(state);
340 add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
345 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
347 const char* stat_key,
352 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
354 if (stat_key == NULL) {
358 pthread_mutex_lock(&engine->stats.lock);
359 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.evictions);
360 add_stat(
"evictions", 9, val, len, cookie);
361 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.curr_items);
362 add_stat(
"curr_items", 10, val, len, cookie);
363 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.total_items);
364 add_stat(
"total_items", 11, val, len, cookie);
365 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.curr_bytes);
366 add_stat(
"bytes", 5, val, len, cookie);
367 len = sprintf(val,
"%"PRIu64, engine->stats.reclaimed);
368 add_stat(
"reclaimed", 9, val, len, cookie);
369 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->config.maxbytes);
370 add_stat(
"engine_maxbytes", 15, val, len, cookie);
371 pthread_mutex_unlock(&engine->stats.lock);
372 }
else if (strncmp(stat_key,
"slabs", 5) == 0) {
373 slabs_stats(engine, add_stat, cookie);
374 }
else if (strncmp(stat_key,
"items", 5) == 0) {
375 item_stats(engine, add_stat, cookie);
376 }
else if (strncmp(stat_key,
"sizes", 5) == 0) {
377 item_stats_sizes(engine, add_stat, cookie);
378 }
else if (strncmp(stat_key,
"vbucket", 7) == 0) {
379 stats_vbucket(engine, add_stat, cookie);
380 }
else if (strncmp(stat_key,
"scrub", 5) == 0) {
384 pthread_mutex_lock(&engine->scrubber.lock);
385 if (engine->scrubber.running) {
386 add_stat(
"scrubber:status", 15,
"running", 7, cookie);
388 add_stat(
"scrubber:status", 15,
"stopped", 7, cookie);
391 if (engine->scrubber.started != 0) {
392 if (engine->scrubber.stopped != 0) {
393 time_t diff = engine->scrubber.started - engine->scrubber.stopped;
394 len = sprintf(val,
"%"PRIu64, (uint64_t)diff);
395 add_stat(
"scrubber:last_run", 17, val, len, cookie);
398 len = sprintf(val,
"%"PRIu64, engine->scrubber.visited);
399 add_stat(
"scrubber:visited", 16, val, len, cookie);
400 len = sprintf(val,
"%"PRIu64, engine->scrubber.cleaned);
401 add_stat(
"scrubber:cleaned", 16, val, len, cookie);
403 pthread_mutex_unlock(&engine->scrubber.lock);
405 ret = ENGINE_KEY_ENOENT;
411 static ENGINE_ERROR_CODE default_store(
ENGINE_HANDLE* handle,
415 ENGINE_STORE_OPERATION operation,
418 VBUCKET_GUARD(engine, vbucket);
419 return store_item(engine, get_real_item(item), cas, operation,
423 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
427 const bool increment,
429 const uint64_t delta,
430 const uint64_t initial,
431 const rel_time_t exptime,
436 VBUCKET_GUARD(engine, vbucket);
438 return arithmetic(engine, cookie, key, nkey, increment,
439 create, delta, initial, exptime, cas,
443 static ENGINE_ERROR_CODE default_flush(
ENGINE_HANDLE* handle,
444 const void* cookie, time_t when) {
445 item_flush_expired(get_handle(handle), when);
447 return ENGINE_SUCCESS;
450 static void default_reset_stats(
ENGINE_HANDLE* handle,
const void *cookie) {
452 item_stats_reset(engine);
454 pthread_mutex_lock(&engine->stats.lock);
455 engine->stats.evictions = 0;
456 engine->stats.reclaimed = 0;
457 engine->stats.total_items = 0;
458 pthread_mutex_unlock(&engine->stats.lock);
462 const void *cookie, item **itm,
void **es,
463 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
464 uint32_t *seqno, uint16_t *vbucket) {
469 const void *cookie, item **itm,
void **es,
470 uint16_t *nes, uint8_t *ttl, uint16_t *flags,
471 uint32_t *seqno, uint16_t *vbucket) {
472 return TAP_DISCONNECT;
476 const void* client,
size_t nclient,
478 const void* userdata,
size_t nuserdata) {
480 if ((flags & TAP_CONNECT_FLAG_DUMP)
481 || (flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) {
482 rv = tap_always_disconnect;
487 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
488 const char *cfg_str) {
489 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
491 se->config.vb0 =
true;
493 if (cfg_str != NULL) {
497 .value.dt_bool = &se->config.use_cas },
500 .value.dt_size = &se->config.verbose },
503 .value.dt_bool = &se->config.evict_to_free },
504 { .key =
"cache_size",
506 .value.dt_size = &se->config.maxbytes },
507 { .key =
"preallocate",
509 .value.dt_bool = &se->config.preallocate },
511 .datatype = DT_FLOAT,
512 .value.dt_float = &se->config.factor },
513 { .key =
"chunk_size",
515 .value.dt_size = &se->config.chunk_size },
516 { .key =
"item_size_max",
518 .value.dt_size = &se->config.item_size_max },
519 { .key =
"ignore_vbucket",
521 .value.dt_bool = &se->config.ignore_vbucket },
524 .value.dt_bool = &se->config.vb0 },
525 { .key =
"config_file",
526 .datatype = DT_CONFIGFILE },
530 ret = se->server.core->
parse_config(cfg_str, items, stderr);
533 if (se->config.vb0) {
534 set_vbucket_state(se, 0, VBUCKET_STATE_ACTIVE);
537 return ENGINE_SUCCESS;
551 int keylen = ntohs(req->message.header.request.keylen);
552 if (keylen >= (
int)
sizeof(keyz)) {
553 *msg =
"Key is too large.";
554 return PROTOCOL_BINARY_RESPONSE_EINVAL;
556 memcpy(keyz, ((
char*)request) +
sizeof(req->message.header), keylen);
560 size_t bodylen = ntohl(req->message.header.request.bodylen)
561 - ntohs(req->message.header.request.keylen);
562 if (bodylen >=
sizeof(valz)) {
563 *msg =
"Value is too large.";
564 return PROTOCOL_BINARY_RESPONSE_EINVAL;
566 memcpy(valz, (
char*)request +
sizeof(req->message.header)
568 valz[bodylen] = 0x00;
573 enum vbucket_state state;
574 if (strcmp(valz,
"active") == 0) {
575 state = VBUCKET_STATE_ACTIVE;
576 }
else if(strcmp(valz,
"replica") == 0) {
577 state = VBUCKET_STATE_REPLICA;
578 }
else if(strcmp(valz,
"pending") == 0) {
579 state = VBUCKET_STATE_PENDING;
580 }
else if(strcmp(valz,
"dead") == 0) {
581 state = VBUCKET_STATE_DEAD;
583 *msg =
"Invalid state.";
584 return PROTOCOL_BINARY_RESPONSE_EINVAL;
587 uint32_t vbucket = 0;
588 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
589 *msg =
"Value out of range.";
590 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
592 set_vbucket_state(e, (uint16_t)vbucket, state);
608 int keylen = ntohs(req->message.header.request.keylen);
609 if (keylen >= (
int)
sizeof(keyz)) {
610 *msg =
"Key is too large.";
611 return PROTOCOL_BINARY_RESPONSE_EINVAL;
613 memcpy(keyz, ((
char*)request) +
sizeof(req->message.header), keylen);
618 uint32_t vbucket = 0;
619 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
620 *msg =
"Value out of range.";
621 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
623 *msg = vbucket_state_name(get_vbucket_state(e, (uint16_t)vbucket));
639 int keylen = ntohs(req->message.header.request.keylen);
640 if (keylen >= (
int)
sizeof(keyz)) {
641 *msg =
"Key is too large.";
642 return PROTOCOL_BINARY_RESPONSE_EINVAL;
644 memcpy(keyz, ((
char*)request) +
sizeof(req->message.header), keylen);
649 uint32_t vbucket = 0;
650 if (!safe_strtoul(keyz, &vbucket) || vbucket > NUM_VBUCKETS) {
651 *msg =
"Value out of range.";
652 rv = PROTOCOL_BINARY_RESPONSE_EINVAL;
654 set_vbucket_state(e, (uint16_t)vbucket, VBUCKET_STATE_DEAD);
664 return item_start_scrub(e) ? PROTOCOL_BINARY_RESPONSE_SUCCESS
665 : PROTOCOL_BINARY_RESPONSE_EBUSY;
668 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
676 const char *msg = NULL;
678 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND;
680 switch(request->request.opcode) {
681 case PROTOCOL_BINARY_CMD_SCRUB:
682 res = scrub_cmd(e, request, &msg);
684 case CMD_DEL_VBUCKET:
685 res = rm_vbucket(e, request, &msg);
687 case CMD_SET_VBUCKET:
688 res = set_vbucket(e, request, &msg);
690 case CMD_GET_VBUCKET:
691 res = get_vbucket(e, request, &msg);
700 size_t msg_size = msg ? strlen(msg) : 0;
701 sent = response(NULL, 0, NULL, 0,
702 msg, (uint16_t)msg_size,
703 PROTOCOL_BINARY_RAW_BYTES,
704 (uint16_t)res, 0, cookie);
706 sent = response(NULL, 0, NULL, 0, NULL, 0,
707 PROTOCOL_BINARY_RAW_BYTES,
708 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
712 return ENGINE_SUCCESS;
714 return ENGINE_FAILED;
719 uint64_t item_get_cas(
const hash_item* item)
721 if (item->
iflag & ITEM_WITH_CAS) {
722 return *(uint64_t*)(item + 1);
728 item* item, uint64_t val)
731 if (it->
iflag & ITEM_WITH_CAS) {
732 *(uint64_t*)(it + 1) = val;
736 const void* item_get_key(
const hash_item* item)
738 char *ret = (
void*)(item + 1);
739 if (item->
iflag & ITEM_WITH_CAS) {
740 ret +=
sizeof(uint64_t);
746 char* item_get_data(
const hash_item* item)
748 return ((
char*)item_get_key(item)) + item->
nkey;
751 uint8_t item_get_clsid(
const hash_item* item)
756 static bool get_item_info(
ENGINE_HANDLE *handle,
const void *cookie,
760 if (item_info->nvalue < 1) {
763 item_info->cas = item_get_cas(it);
767 item_info->clsid = it->slabs_clsid;
769 item_info->nvalue = 1;
770 item_info->
key = item_get_key(it);
771 item_info->value[0].iov_base = item_get_data(it);
772 item_info->value[0].iov_len = it->
nbytes;