14 #include "default_engine.h"
15 #include "memcached/util.h"
16 #include "memcached/config_parser.h"
19 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
20 const char* config_str);
23 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
30 const rel_time_t exptime);
31 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
38 static void default_item_release(
ENGINE_HANDLE* handle,
const void *cookie,
46 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
51 static void default_reset_stats(
ENGINE_HANDLE* handle,
const void *cookie);
56 ENGINE_STORE_OPERATION operation,
58 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
65 const uint64_t initial,
66 const rel_time_t exptime,
71 const void* cookie, time_t when);
72 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
74 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
79 static ENGINE_ERROR_CODE default_tap_notify(
ENGINE_HANDLE* handle,
81 void *engine_specific,
85 tap_event_t tap_event,
101 const void* userdata,
104 static void default_handle_disconnect(
const void *cookie,
105 ENGINE_EVENT_TYPE
type,
106 const void *event_data,
107 const void *cb_data);
115 uint16_t vbid, vbucket_state_t
to) {
117 vi.c = e->vbucket_infos[vbid];
119 e->vbucket_infos[vbid] = vi.c;
/*
 * Return the vbucket_state_t recorded for vbucket `vbid`.
 * Only part of the body is visible in this view: the entry
 * e->vbucket_infos[vbid] is copied into `vi.c` (`vi` is declared on a
 * line not shown here).
 */
122 static vbucket_state_t get_vbucket_state(
struct default_engine *e,
125 vi.c = e->vbucket_infos[vbid];
129 static bool handled_vbucket(
struct default_engine *e, uint16_t vbid) {
130 return e->config.ignore_vbucket
131 || (get_vbucket_state(e, vbid) == vbucket_state_active);
/*
 * Early-return guard: leave the enclosing function with
 * ENGINE_NOT_MY_VBUCKET when vbucket `v` is not handled by engine `e`.
 *
 * Wrapped in do { } while (0) so the macro expands to exactly one
 * statement and can be used safely as the body of an unbraced if/else.
 * Callers must terminate the invocation with a semicolon.
 */
#define VBUCKET_GUARD(e, v) \
    do { \
        if (!handled_vbucket(e, v)) { \
            return ENGINE_NOT_MY_VBUCKET; \
        } \
    } while (0)
137 static bool get_item_info(
ENGINE_HANDLE *handle,
const void *cookie,
140 static const char const * vbucket_state_name(vbucket_state_t s) {
141 static const char const * vbucket_states[] = {
142 [vbucket_state_active] =
"active",
143 [vbucket_state_replica] =
"replica",
144 [vbucket_state_pending] =
"pending",
145 [vbucket_state_dead] =
"dead"
147 if (is_valid_vbucket_state_t(s)) {
148 return vbucket_states[s];
150 return "Illegal vbucket state";
155 GET_SERVER_API get_server_api,
158 if (interface != 1 || api == NULL) {
159 return ENGINE_ENOTSUP;
163 if (engine == NULL) {
164 return ENGINE_ENOMEM;
172 .get_info = default_get_info,
173 .initialize = default_initialize,
174 .destroy = default_destroy,
175 .allocate = default_item_allocate,
176 .remove = default_item_delete,
177 .release = default_item_release,
179 .get_stats = default_get_stats,
180 .reset_stats = default_reset_stats,
181 .store = default_store,
182 .arithmetic = default_arithmetic,
183 .flush = default_flush,
184 .unknown_command = default_unknown_command,
185 .tap_notify = default_tap_notify,
186 .get_tap_iterator = default_get_tap_iterator,
187 .item_set_cas = item_set_cas,
188 .get_item_info = get_item_info
191 .get_server_api = get_server_api,
197 .lock = PTHREAD_MUTEX_INITIALIZER
199 .cache_lock = PTHREAD_MUTEX_INITIALIZER,
201 .lock = PTHREAD_MUTEX_INITIALIZER,
207 .evict_to_free =
true,
208 .maxbytes = 64 * 1024 * 1024,
209 .preallocate =
false,
212 .item_size_max= 1024 * 1024,
215 .lock = PTHREAD_MUTEX_INITIALIZER,
218 .lock = PTHREAD_MUTEX_INITIALIZER,
221 .info.engine_info = {
222 .description =
"Default engine v0.1",
225 [0].feature = ENGINE_FEATURE_LRU
230 *engine = default_engine;
231 engine->tap_connections.clients = calloc(default_engine.tap_connections.size,
sizeof(
void*));
232 if (engine->tap_connections.clients == NULL) {
234 return ENGINE_ENOMEM;
237 return ENGINE_SUCCESS;
240 static inline struct default_engine* get_handle(
ENGINE_HANDLE* handle) {
241 return (
struct default_engine*)handle;
244 static inline hash_item* get_real_item(item* item) {
249 return &get_handle(handle)->info.engine_info;
/*
 * Engine "initialize" entry point.
 *
 * Visible steps, in order: apply the configuration string via
 * initalize_configuration(), set up the hash table with assoc_init(),
 * set up the slab allocator with slabs_init() using the configured
 * maxbytes / factor / preallocate values, then register
 * default_handle_disconnect for ON_DISCONNECT events with the engine
 * handle as callback data. Each step's result is compared against
 * ENGINE_SUCCESS (the failure branches are not shown in this view).
 * Returns ENGINE_SUCCESS once every step has completed.
 */
252 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
253 const char* config_str) {
254 struct default_engine* se = get_handle(handle);
256 ENGINE_ERROR_CODE
ret = initalize_configuration(se, config_str);
257 if (ret != ENGINE_SUCCESS) {
262 if (se->config.use_cas) {
266 ret = assoc_init(se);
267 if (ret != ENGINE_SUCCESS) {
271 ret = slabs_init(se, se->config.maxbytes, se->config.factor,
272 se->config.preallocate);
273 if (ret != ENGINE_SUCCESS) {
277 se->server.callback->
register_callback(handle, ON_DISCONNECT, default_handle_disconnect, handle);
279 return ENGINE_SUCCESS;
/*
 * Engine "destroy" entry point: tear down the engine instance behind
 * `handle`.
 *
 * The visible lines destroy the stats and slabs mutexes and free the
 * tap_connections.clients array.
 * NOTE(review): the other mutexes initialised for the instance are not
 * destroyed in the lines visible here - confirm against the full file.
 */
282 static void default_destroy(
ENGINE_HANDLE* handle,
const bool force) {
284 struct default_engine* se = get_handle(handle);
288 pthread_mutex_destroy(&se->stats.lock);
289 pthread_mutex_destroy(&se->slabs.
lock);
291 free(se->tap_connections.clients);
/*
 * Engine "allocate" entry point: reserve a new item for `key`.
 *
 * The required size is the hash_item header plus key and value bytes,
 * plus a uint64_t when CAS is enabled in the configuration; that size is
 * used to look up a slab class with slabs_clsid(). The item is then
 * obtained from item_alloc(), with the expiry converted through the
 * server core's realtime().
 * Returns ENGINE_SUCCESS or ENGINE_ENOMEM (the conditions selecting
 * between them are on lines not shown in this view).
 */
296 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
303 const rel_time_t exptime) {
304 struct default_engine* engine = get_handle(handle);
305 size_t ntotal =
sizeof(
hash_item) + nkey + nbytes;
306 if (engine->config.use_cas) {
307 ntotal +=
sizeof(uint64_t);
309 unsigned int id = slabs_clsid(engine, ntotal);
315 it = item_alloc(engine, key, nkey, flags, engine->server.core->
realtime(exptime),
320 return ENGINE_SUCCESS;
322 return ENGINE_ENOMEM;
326 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
333 struct default_engine* engine = get_handle(handle);
334 VBUCKET_GUARD(engine, vbucket);
336 hash_item *it = item_get(engine, key, nkey);
338 return ENGINE_KEY_ENOENT;
341 if (cas == 0 || cas == item_get_cas(it)) {
342 item_unlink(engine, it);
343 item_release(engine, it);
345 return ENGINE_KEY_EEXISTS;
348 return ENGINE_SUCCESS;
354 item_release(get_handle(handle), get_real_item(item));
363 struct default_engine *engine = get_handle(handle);
364 VBUCKET_GUARD(engine, vbucket);
366 *item = item_get(engine, key, nkey);
368 return ENGINE_SUCCESS;
370 return ENGINE_KEY_ENOENT;
/*
 * Emit one stat per vbucket that is not in the dead state.
 *
 * Walks every vbucket id from 0 to NUM_VBUCKETS - 1; for each one whose
 * state differs from vbucket_state_dead it reports a stat named
 * "vb_<id>" whose value is the state name from vbucket_state_name(),
 * through the add_stat callback together with `cookie`.
 */
374 static void stats_vbucket(
struct default_engine *e,
376 const void *cookie) {
377 for (
int i = 0;
i < NUM_VBUCKETS;
i++) {
378 vbucket_state_t state = get_vbucket_state(e,
i);
379 if (state != vbucket_state_dead) {
381 snprintf(buf,
sizeof(buf),
"vb_%d",
i);
382 const char * state_name = vbucket_state_name(state);
383 add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
388 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
390 const char* stat_key,
394 struct default_engine* engine = get_handle(handle);
395 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
397 if (stat_key == NULL) {
401 pthread_mutex_lock(&engine->stats.lock);
402 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.evictions);
403 add_stat(
"evictions", 9, val, len, cookie);
404 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.curr_items);
405 add_stat(
"curr_items", 10, val, len, cookie);
406 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.total_items);
407 add_stat(
"total_items", 11, val, len, cookie);
408 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->stats.curr_bytes);
409 add_stat(
"bytes", 5, val, len, cookie);
410 len = sprintf(val,
"%"PRIu64, engine->stats.reclaimed);
411 add_stat(
"reclaimed", 9, val, len, cookie);
412 len = sprintf(val,
"%"PRIu64, (uint64_t)engine->config.maxbytes);
413 add_stat(
"engine_maxbytes", 15, val, len, cookie);
414 pthread_mutex_unlock(&engine->stats.lock);
415 }
else if (strncmp(stat_key,
"slabs", 5) == 0) {
416 slabs_stats(engine, add_stat, cookie);
417 }
else if (strncmp(stat_key,
"items", 5) == 0) {
418 item_stats(engine, add_stat, cookie);
419 }
else if (strncmp(stat_key,
"sizes", 5) == 0) {
420 item_stats_sizes(engine, add_stat, cookie);
421 }
else if (strncmp(stat_key,
"vbucket", 7) == 0) {
422 stats_vbucket(engine, add_stat, cookie);
423 }
else if (strncmp(stat_key,
"scrub", 5) == 0) {
427 pthread_mutex_lock(&engine->scrubber.lock);
428 if (engine->scrubber.running) {
429 add_stat(
"scrubber:status", 15,
"running", 7, cookie);
431 add_stat(
"scrubber:status", 15,
"stopped", 7, cookie);
434 if (engine->scrubber.started != 0) {
435 if (engine->scrubber.stopped != 0) {
436 time_t diff = engine->scrubber.started - engine->scrubber.stopped;
437 len = sprintf(val,
"%"PRIu64, (uint64_t)diff);
438 add_stat(
"scrubber:last_run", 17, val, len, cookie);
441 len = sprintf(val,
"%"PRIu64, engine->scrubber.visited);
442 add_stat(
"scrubber:visited", 16, val, len, cookie);
443 len = sprintf(val,
"%"PRIu64, engine->scrubber.cleaned);
444 add_stat(
"scrubber:cleaned", 16, val, len, cookie);
446 pthread_mutex_unlock(&engine->scrubber.lock);
448 ret = ENGINE_KEY_ENOENT;
/*
 * Engine "store" entry point.
 *
 * Rejects the request through VBUCKET_GUARD when the vbucket is not
 * handled by this engine; otherwise forwards the item (unwrapped with
 * get_real_item()), the CAS pointer and the store operation to
 * store_item() and returns its result.
 */
454 static ENGINE_ERROR_CODE default_store(
ENGINE_HANDLE* handle,
458 ENGINE_STORE_OPERATION operation,
460 struct default_engine *engine = get_handle(handle);
461 VBUCKET_GUARD(engine, vbucket);
462 return store_item(engine, get_real_item(item), cas, operation,
/*
 * Engine "arithmetic" (increment / decrement) entry point.
 *
 * Rejects the request through VBUCKET_GUARD when the vbucket is not
 * handled by this engine; otherwise delegates to arithmetic(), passing
 * the expiry through the server core's realtime() first, and returns
 * its result.
 */
466 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
470 const bool increment,
472 const uint64_t delta,
473 const uint64_t initial,
474 const rel_time_t exptime,
478 struct default_engine *engine = get_handle(handle);
479 VBUCKET_GUARD(engine, vbucket);
481 return arithmetic(engine, cookie, key, nkey, increment,
482 create, delta, initial, engine->server.core->
realtime(exptime), cas,
486 static ENGINE_ERROR_CODE default_flush(
ENGINE_HANDLE* handle,
487 const void* cookie, time_t when) {
488 item_flush_expired(get_handle(handle), when);
490 return ENGINE_SUCCESS;
493 static void default_reset_stats(
ENGINE_HANDLE* handle,
const void *cookie) {
494 struct default_engine *engine = get_handle(handle);
495 item_stats_reset(engine);
497 pthread_mutex_lock(&engine->stats.lock);
498 engine->stats.evictions = 0;
499 engine->stats.reclaimed = 0;
500 engine->stats.total_items = 0;
501 pthread_mutex_unlock(&engine->stats.lock);
/*
 * Apply the engine configuration string to se->config.
 *
 * config.vb0 defaults to true. When cfg_str is non-NULL, a table of
 * config items is built that binds option keys (for example
 * "cache_size", "preallocate", "chunk_size", "item_size_max",
 * "ignore_vbucket", "config_file") to the matching se->config fields,
 * and the string is parsed with the server core's parse_config(), which
 * reports problems on stderr. Afterwards, if vb0 is still set, vbucket 0
 * is marked active.
 *
 * NOTE(review): the value returned by parse_config() is stored in `ret`
 * but the function returns ENGINE_SUCCESS unconditionally in the lines
 * visible here, so a parse failure is not propagated - confirm whether
 * that is intended.
 */
504 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
505 const char *cfg_str) {
506 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
508 se->config.vb0 =
true;
510 if (cfg_str != NULL) {
514 .value.dt_bool = &se->config.use_cas },
517 .value.dt_size = &se->config.verbose },
520 .value.dt_bool = &se->config.evict_to_free },
521 { .key =
"cache_size",
523 .value.dt_size = &se->config.maxbytes },
524 { .key =
"preallocate",
526 .value.dt_bool = &se->config.preallocate },
528 .datatype = DT_FLOAT,
529 .value.dt_float = &se->config.factor },
530 { .key =
"chunk_size",
532 .value.dt_size = &se->config.chunk_size },
533 { .key =
"item_size_max",
535 .value.dt_size = &se->config.item_size_max },
536 { .key =
"ignore_vbucket",
538 .value.dt_bool = &se->config.ignore_vbucket },
541 .value.dt_bool = &se->config.vb0 },
542 { .key =
"config_file",
543 .datatype = DT_CONFIGFILE },
547 ret = se->server.core->
parse_config(cfg_str, items, stderr);
550 if (se->config.vb0) {
551 set_vbucket_state(se, 0, vbucket_state_active);
554 return ENGINE_SUCCESS;
/*
 * Handle a "set vbucket" request.
 *
 * The body length (total body minus key length, both converted from
 * network order) must equal sizeof(vbucket_state_t); otherwise an EINVAL
 * response carrying "Incorrect packet format" is sent. The state is
 * copied out of the request body, converted with ntohl() and validated;
 * an invalid state yields an EINVAL response carrying "Invalid vbucket
 * state". On success the state is stored for the vbucket named in the
 * request header and a SUCCESS response is sent.
 * Returns whatever the response callback returns.
 *
 * NOTE(review): the success response echoes `state` after the ntohl()
 * conversion, i.e. in host byte order - confirm this is what clients
 * expect.
 */
557 static bool set_vbucket(
struct default_engine *e,
561 size_t bodylen = ntohl(req->message.header.request.bodylen)
562 - ntohs(req->message.header.request.keylen);
563 if (bodylen !=
sizeof(vbucket_state_t)) {
564 const char *
msg =
"Incorrect packet format";
565 return response(NULL, 0, NULL, 0, msg, strlen(msg),
566 PROTOCOL_BINARY_RAW_BYTES,
567 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
569 vbucket_state_t state;
570 memcpy(&state, &req->message.body.state,
sizeof(state));
571 state = ntohl(state);
573 if (!is_valid_vbucket_state_t(state)) {
574 const char *msg =
"Invalid vbucket state";
575 return response(NULL, 0, NULL, 0, msg, strlen(msg),
576 PROTOCOL_BINARY_RAW_BYTES,
577 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
580 set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
581 return response(NULL, 0, NULL, 0, &state,
sizeof(state),
582 PROTOCOL_BINARY_RAW_BYTES,
583 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
/*
 * Handle a "get vbucket" request: look up the state of the vbucket named
 * in the request header and send it back as the body of a SUCCESS
 * response. Returns whatever the response callback returns.
 *
 * NOTE(review): ntohl() is applied to a host-order value before sending;
 * htonl() would state the intent (host to network) more clearly.
 */
586 static bool get_vbucket(
struct default_engine *e,
590 vbucket_state_t state;
591 state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
592 state = ntohl(state);
594 return response(NULL, 0, NULL, 0, &state,
sizeof(state),
595 PROTOCOL_BINARY_RAW_BYTES,
596 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
/*
 * Handle a "delete vbucket" request: mark the vbucket named in the
 * request header as dead and send an empty SUCCESS response.
 * Returns whatever the response callback returns.
 */
599 static bool rm_vbucket(
struct default_engine *e,
603 set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
604 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
605 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
/*
 * Handle a "scrub" request: try to start the scrubber with
 * item_start_scrub(); if that fails the response status becomes EBUSY.
 * An empty response carrying the status is then sent, and the response
 * callback's return value is returned.
 */
608 static bool scrub_cmd(
struct default_engine *e,
614 if (!item_start_scrub(e)) {
615 res = PROTOCOL_BINARY_RESPONSE_EBUSY;
618 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
/*
 * Handle the TOUCH / GAT / GATQ requests.
 *
 * The request must carry exactly 4 bytes of extras and a non-empty key,
 * otherwise an EINVAL response is sent. The key starts directly after
 * the fixed-size part of the touch packet (t->bytes); the expiration and
 * key length are converted from network order and passed to
 * touch_item(). The visible lines also show a KEY_ENOENT response and a
 * check for the GATQ opcode ahead of it (the surrounding conditions are
 * not shown in this view). For TOUCH an empty SUCCESS response is sent;
 * otherwise the response carries the item's flags as extras, its data as
 * body and its CAS. The item reference is released afterwards.
 */
622 static bool touch(
struct default_engine *e,
const void *cookie,
625 if (request->request.extlen != 4 || request->request.keylen == 0) {
626 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
627 PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
631 void *key = t->bytes +
sizeof(t->bytes);
632 uint32_t exptime = ntohl(t->message.body.expiration);
633 uint16_t nkey = ntohs(request->request.keylen);
635 hash_item *item = touch_item(e, key, nkey,
638 if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
641 return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
642 PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
646 if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
647 ret = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
648 PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
650 ret = response(NULL, 0, &item->
flags,
sizeof(item->
flags),
651 item_get_data(item), item->
nbytes,
652 PROTOCOL_BINARY_RAW_BYTES,
653 PROTOCOL_BINARY_RESPONSE_SUCCESS,
654 item_get_cas(item), cookie);
656 item_release(e, item);
/*
 * Engine "unknown_command" entry point: dispatch binary-protocol opcodes
 * the core does not handle itself.
 *
 * SCRUB, DEL_VBUCKET, SET_VBUCKET, GET_VBUCKET and TOUCH / GAT / GATQ
 * are routed to their handlers; the remaining visible branch answers
 * with UNKNOWN_COMMAND. Each handler's boolean result is kept in `sent`;
 * the function returns ENGINE_SUCCESS or ENGINE_FAILED (the selecting
 * condition is not shown in this view - presumably based on `sent`).
 */
661 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
666 struct default_engine* e = get_handle(handle);
669 switch(request->request.opcode) {
670 case PROTOCOL_BINARY_CMD_SCRUB:
671 sent = scrub_cmd(e, cookie, request, response);
673 case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
674 sent = rm_vbucket(e, cookie, request, response);
676 case PROTOCOL_BINARY_CMD_SET_VBUCKET:
677 sent = set_vbucket(e, cookie, (
void*)request, response);
679 case PROTOCOL_BINARY_CMD_GET_VBUCKET:
680 sent = get_vbucket(e, cookie, (
void*)request, response);
682 case PROTOCOL_BINARY_CMD_TOUCH:
683 case PROTOCOL_BINARY_CMD_GAT:
684 case PROTOCOL_BINARY_CMD_GATQ:
685 sent = touch(e, cookie, request, response);
688 sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
689 PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
694 return ENGINE_SUCCESS;
696 return ENGINE_FAILED;
/*
 * Return the CAS value of `item`.
 * When the item carries the ITEM_WITH_CAS flag, the CAS is the uint64_t
 * stored immediately after the hash_item header. The value returned for
 * items without the flag is on a line not shown in this view.
 */
701 uint64_t item_get_cas(
const hash_item* item)
703 if (item->
iflag & ITEM_WITH_CAS) {
704 return *(uint64_t*)(item + 1);
710 item* item, uint64_t val)
713 if (it->
iflag & ITEM_WITH_CAS) {
714 *(uint64_t*)(it + 1) = val;
/*
 * Locate the key bytes of `item`.
 * The key position starts immediately after the hash_item header and is
 * advanced past a uint64_t when the item carries the ITEM_WITH_CAS flag.
 * The return statement is not shown in this view (presumably `ret`).
 */
718 const void* item_get_key(
const hash_item* item)
720 char *ret = (
void*)(item + 1);
721 if (item->
iflag & ITEM_WITH_CAS) {
722 ret +=
sizeof(uint64_t);
728 char* item_get_data(
const hash_item* item)
730 return ((
char*)item_get_key(item)) + item->
nkey;
733 uint8_t item_get_clsid(
const hash_item* item)
/*
 * Engine "get_item_info" entry point: describe an item to the core.
 *
 * The caller must provide room for at least one value vector
 * (item_info->nvalue >= 1). The visible lines fill in the item's CAS,
 * slab class id, key pointer and a single iovec covering the item's data
 * (base from item_get_data(), length it->nbytes), and set nvalue to 1.
 */
738 static bool get_item_info(
ENGINE_HANDLE *handle,
const void *cookie,
742 if (item_info->nvalue < 1) {
745 item_info->cas = item_get_cas(it);
749 item_info->clsid = it->slabs_clsid;
751 item_info->nvalue = 1;
752 item_info->
key = item_get_key(it);
753 item_info->value[0].iov_base = item_get_data(it);
754 item_info->value[0].iov_len = it->
nbytes;
/*
 * Engine "tap_notify" entry point: apply an incoming TAP event.
 *
 * Visible behaviour per branch (most case labels are on lines not shown
 * in this view):
 *  - one branch flushes through default_flush() with time 0;
 *  - one branch deletes the key through default_item_delete();
 *  - one branch allocates an item, copies `ndata` bytes of `data` into
 *    it, sets its CAS, stores it with OPERATION_SET and releases the
 *    item; ENGINE_TMPFAIL and a check for ENGINE_EWOULDBLOCK appear on
 *    this path;
 *  - TAP_VBUCKET_SET requires engine_specific to be exactly
 *    sizeof(vbucket_state_t) bytes holding a valid state in network
 *    order, otherwise ENGINE_DISCONNECT is returned; a valid state is
 *    stored for `vbucket`;
 *  - anything else is logged at debug level as an unknown tap event.
 */
758 static ENGINE_ERROR_CODE default_tap_notify(
ENGINE_HANDLE* handle,
760 void *engine_specific,
764 tap_event_t tap_event,
774 struct default_engine* engine = get_handle(handle);
775 vbucket_state_t state;
777 ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
785 return default_flush(handle, cookie, 0);
788 return default_item_delete(handle, cookie, key, nkey, cas, vbucket);
793 ret = default_item_allocate(handle, cookie, &it, key, nkey, ndata, flags, exptime);
798 return ENGINE_TMPFAIL;
803 memcpy(item_get_data(it), data, ndata);
805 item_set_cas(handle, cookie, it, cas);
806 ret = default_store(handle, cookie, it, &cas, OPERATION_SET, vbucket);
807 if (ret == ENGINE_EWOULDBLOCK) {
810 item_release(engine, it);
815 case TAP_VBUCKET_SET:
816 if (nengine !=
sizeof(vbucket_state_t)) {
818 return ENGINE_DISCONNECT;
821 memcpy(&state, engine_specific, nengine);
822 state = (vbucket_state_t)ntohl(state);
824 if (!is_valid_vbucket_state_t(state)) {
825 return ENGINE_DISCONNECT;
828 set_vbucket_state(engine, vbucket, state);
829 return ENGINE_SUCCESS;
834 engine->server.log->get_logger()->
log(EXTENSION_LOG_DEBUG, cookie,
835 "Ignoring unknown tap event: %x", tap_event);
846 const void* userdata,
848 struct default_engine* engine = get_handle(handle);
850 if ((flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) {
854 pthread_mutex_lock(&engine->tap_connections.lock);
856 for (ii = 0; ii < engine->tap_connections.size; ++ii) {
857 if (engine->tap_connections.clients[ii] == NULL) {
858 engine->tap_connections.clients[ii] = cookie;
862 pthread_mutex_unlock(&engine->tap_connections.lock);
863 if (ii == engine->tap_connections.size) {
868 if (!initialize_item_tap_walker(engine, cookie)) {
870 pthread_mutex_lock(&engine->tap_connections.lock);
871 engine->tap_connections.clients[ii] = NULL;
872 pthread_mutex_unlock(&engine->tap_connections.lock);
876 return item_tap_walker;
/*
 * ON_DISCONNECT callback registered by default_initialize().
 *
 * cb_data is the engine instance. While holding the tap_connections
 * lock, the clients table is scanned for the slot that holds the
 * disconnecting `cookie` (what is done with the matching slot is on
 * lines not shown in this view).
 */
879 static void default_handle_disconnect(
const void *cookie,
880 ENGINE_EVENT_TYPE
type,
881 const void *event_data,
882 const void *cb_data) {
883 struct default_engine *engine = (
struct default_engine*)cb_data;
884 pthread_mutex_lock(&engine->tap_connections.lock);
886 for (ii = 0; ii < engine->tap_connections.size; ++ii) {
887 if (engine->tap_connections.clients[ii] == cookie) {
892 pthread_mutex_unlock(&engine->tap_connections.lock);