12 #include "default_engine.h"
18 const void *key,
const size_t nkey,
19 const int flags,
const rel_time_t exptime,
23 const char *key,
const size_t nkey);
/* Staleness threshold used by the update path in this file: an item is only
 * re-queued when it->time < current_time - ITEM_UPDATE_INTERVAL.
 * NOTE(review): expressed in the units of current_time (presumably seconds)
 * -- confirm. */
37 #define ITEM_UPDATE_INTERVAL 60
/* Initial value of the `tries` / `search` countdowns used by the tail scans
 * in this file, i.e. the maximum number of list entries examined per scan. */
42 static const int search_items = 50;
46 memset(engine->items.itemstats, 0,
sizeof(engine->items.itemstats));
55 if (engine->config.use_cas) {
56 ret +=
sizeof(uint64_t);
/* Takes no arguments and keeps its state in a function-local static counter
 * that starts at 0 and persists across calls.  How the counter is advanced
 * and returned is on lines not included in this excerpt. */
63 static uint64_t get_cas_id(
void) {
64 static uint64_t cas_id = 0;
/*
 * Tracing variant of DEBUG_REFCNT: writes one line to stderr describing a
 * reference-count event on an item.
 *
 *   it - pointer to the item; its `refcount` and `iflag` members are read
 *   op - single character tagging the event (call sites in this file pass
 *        '*', '+', '-' and 'F')
 *
 * Expands to the fprintf() call expression, so it is used as a statement
 * followed by a semicolon.  The two trailing characters are 'L' when
 * ITEM_LINKED is set and 'S' when ITEM_SLABBED is set, blank otherwise.
 *
 * The format string carries exactly one conversion per argument, the pointer
 * is printed with %p, and the flags are read from `iflag`, the member the
 * rest of this file uses.
 */
# define DEBUG_REFCNT(it,op) \
                fprintf(stderr, "item %p refcnt(%c) %d %c%c\n", \
                        (void *)(it), (op), (int)(it)->refcount, \
                        ((it)->iflag & ITEM_LINKED) ? 'L' : ' ', \
                        ((it)->iflag & ITEM_SLABBED) ? 'S' : ' ')
/* No-op variant of DEBUG_REFCNT.  Expands to a void expression: neither
 * argument is evaluated, and a call site that forgets its trailing semicolon
 * fails to compile instead of silently turning the next statement into the
 * body of an empty loop. */
# define DEBUG_REFCNT(it,op) ((void)0)
/* Allocation path (only part of the parameter list and body is visible in
 * this excerpt).  Visible strategy, in order: scan the class tail for an item
 * to reuse, ask the slab allocator, evict from the tail, and finally force
 * long-referenced tail items free before retrying the allocator. */
85 const rel_time_t exptime,
/* Bytes needed: item header + key + value, plus a 64-bit CAS slot when the
 * engine is configured with use_cas. */
89 size_t ntotal =
sizeof(
hash_item) + nkey + nbytes;
90 if (engine->config.use_cas) {
91 ntotal +=
sizeof(uint64_t);
/* Class id returned by slabs_clsid() for ntotal; it indexes tails[] and
 * itemstats[] below. */
94 unsigned int id = slabs_clsid(engine, ntotal);
99 int tries = search_items;
/* First scan: follow prev links from the class tail for at most
 * search_items entries, considering unreferenced items (the remainder of the
 * condition is on lines not shown). */
104 for (search = engine->items.tails[
id];
105 tries > 0 && search != NULL;
106 tries--, search=search->prev) {
107 if (search->refcount == 0 &&
/* A reuse is counted twice: globally under stats.lock and per class. */
113 pthread_mutex_lock(&engine->stats.lock);
114 engine->stats.reclaimed++;
115 pthread_mutex_unlock(&engine->stats.lock);
116 engine->items.itemstats[
id].reclaimed++;
/* Tell the slab layer the requested size changes from the old item's total
 * to ntotal, then take the item out of the cache. */
118 slabs_adjust_mem_requested(engine, it->slabs_clsid, ITEM_ntotal(engine, it), ntotal);
119 do_item_unlink(engine, it);
/* Nothing reused: try the slab allocator; the block below runs only when
 * that fails as well. */
127 if (it == NULL && (it = slabs_alloc(engine, ntotal,
id)) == NULL) {
132 tries = search_items;
/* evict_to_free == 0: record an out-of-memory event for this class. */
138 if (engine->config.evict_to_free == 0) {
139 engine->items.itemstats[
id].outofmemory++;
/* The class list has no tail, so there is nothing to evict. */
150 if (engine->items.tails[
id] == 0) {
151 engine->items.itemstats[
id].outofmemory++;
/* Second scan: find an unreferenced item near the tail to push out. */
155 for (search = engine->items.tails[
id]; tries > 0 && search != NULL; tries--, search=search->prev) {
156 if (search->refcount == 0) {
/* Eviction bookkeeping; the branch conditions selecting between the
 * "evicted" and "reclaimed" counters are on lines not shown. */
158 engine->items.itemstats[
id].evicted++;
159 engine->items.itemstats[
id].evicted_time = current_time - search->time;
161 engine->items.itemstats[
id].evicted_nonzero++;
163 pthread_mutex_lock(&engine->stats.lock);
164 engine->stats.evictions++;
165 pthread_mutex_unlock(&engine->stats.lock);
/* Notify the server's stat callback about the key being evicted. */
166 engine->server.stat->
evicting(cookie,
167 item_get_key(search),
170 engine->items.itemstats[
id].reclaimed++;
171 pthread_mutex_lock(&engine->stats.lock);
172 engine->stats.reclaimed++;
173 pthread_mutex_unlock(&engine->stats.lock);
175 do_item_unlink(engine, search);
/* Retry the allocator after unlinking; a further failure is counted as
 * out-of-memory. */
179 it = slabs_alloc(engine, ntotal,
id);
181 engine->items.itemstats[
id].outofmemory++;
/* Last resort ("tail repair"): an item that still has references but whose
 * time is more than TAIL_REPAIR_TIME behind current_time gets its refcount
 * forced to 0 and is unlinked, then the allocator is tried again. */
189 tries = search_items;
190 for (search = engine->items.tails[
id]; tries > 0 && search != NULL; tries--, search=search->prev) {
191 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
192 engine->items.itemstats[
id].tailrepairs++;
193 search->refcount = 0;
194 do_item_unlink(engine, search);
198 it = slabs_alloc(engine, ntotal,
id);
/* Initialise the chunk: it must arrive with class 0, is stamped with the
 * chosen class, and must not already be the head of that class list. */
205 assert(it->slabs_clsid == 0);
207 it->slabs_clsid =
id;
209 assert(it != engine->items.heads[it->slabs_clsid]);
/* Clear list and hash-chain links. */
211 it->next = it->prev = it->h_next = 0;
213 DEBUG_REFCNT(it,
'*');
/* iflag starts as ITEM_WITH_CAS or 0 depending on configuration. */
214 it->
iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
/* Copy the caller's key bytes into the item. */
218 memcpy((
void*)item_get_key(it), key, nkey);
/* Free path (function header is outside this excerpt; presumably item_free):
 * returns an item's memory through slabs_free(). */
224 size_t ntotal = ITEM_ntotal(engine, it);
/* Preconditions: not linked, not at either end of its class list, and no
 * outstanding references. */
226 assert((it->
iflag & ITEM_LINKED) == 0);
227 assert(it != engine->items.heads[it->slabs_clsid]);
228 assert(it != engine->items.tails[it->slabs_clsid]);
229 assert(it->refcount == 0);
/* Keep the class id in a local; it is what gets passed to slabs_free(). */
232 clsid = it->slabs_clsid;
/* Mark the item ITEM_SLABBED before handing it back. */
234 it->
iflag |= ITEM_SLABBED;
235 DEBUG_REFCNT(it,
'F');
236 slabs_free(engine, it, ntotal, clsid);
/* Queue-insert path (function header is outside this excerpt; presumably
 * item_link_q): puts `it` on the doubly linked list of its class and bumps
 * that class's size counter.  Some of the pointer assignments are on lines
 * not shown. */
241 assert(it->slabs_clsid < POWER_LARGEST);
242 assert((it->
iflag & ITEM_SLABBED) == 0);
244 head = &engine->items.heads[it->slabs_clsid];
245 tail = &engine->items.tails[it->slabs_clsid];
/* Invariant: head and tail are either both set or both empty. */
247 assert((*head && *tail) || (*head == 0 && *tail == 0));
/* Back-link the item that now follows `it`. */
250 if (it->next) it->next->prev = it;
/* An insert into an empty list makes `it` the tail too. */
252 if (*tail == 0) *tail = it;
253 engine->items.sizes[it->slabs_clsid]++;
/* Queue-remove path (function header is outside this excerpt; presumably
 * item_unlink_q): takes `it` off the doubly linked list of its class and
 * decrements that class's size counter. */
259 assert(it->slabs_clsid < POWER_LARGEST);
260 head = &engine->items.heads[it->slabs_clsid];
261 tail = &engine->items.tails[it->slabs_clsid];
/* NOTE(review): the two asserts below sit inside branches whose conditions
 * are not visible here; presumably the "it is head" / "it is tail" cases. */
264 assert(it->prev == 0);
268 assert(it->next == 0);
/* A node must never point at itself. */
271 assert(it->next != it);
272 assert(it->prev != it);
/* Splice the neighbours together around `it`. */
274 if (it->next) it->next->prev = it->prev;
275 if (it->prev) it->prev->next = it->next;
276 engine->items.sizes[it->slabs_clsid]--;
/* Link path (function header is outside this excerpt; presumably
 * do_item_link): flags the item as linked, inserts it into the hash table,
 * updates the global counters, stamps a CAS value and queues the item on its
 * class list. */
281 MEMCACHED_ITEM_LINK(item_get_key(it), it->
nkey, it->
nbytes);
/* Precondition: neither ITEM_LINKED nor ITEM_SLABBED is set. */
282 assert((it->
iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
/* Precondition: value size is below 1 MiB. */
283 assert(it->
nbytes < (1024 * 1024));
284 it->
iflag |= ITEM_LINKED;
/* Hash-table insert, keyed by the server core's hash of the item key (the
 * remaining arguments are on lines not shown). */
286 assoc_insert(engine, engine->server.core->
hash(item_get_key(it),
/* Global counters are only touched while holding stats.lock. */
290 pthread_mutex_lock(&engine->stats.lock);
291 engine->stats.curr_bytes += ITEM_ntotal(engine, it);
292 engine->stats.curr_items += 1;
293 engine->stats.total_items += 1;
294 pthread_mutex_unlock(&engine->stats.lock);
/* Stamp the item with the value returned by get_cas_id(). */
297 item_set_cas(NULL, NULL, it, get_cas_id());
299 item_link_q(engine, it);
/* Unlink path (function header is outside this excerpt; presumably
 * do_item_unlink): removes a linked item from the hash table and its class
 * list, and frees it right away when nobody holds a reference. */
305 MEMCACHED_ITEM_UNLINK(item_get_key(it), it->
nkey, it->
nbytes);
/* Everything below is skipped for an item that is not linked. */
306 if ((it->
iflag & ITEM_LINKED) != 0) {
307 it->
iflag &= ~ITEM_LINKED;
/* Global counters are only touched while holding stats.lock. */
308 pthread_mutex_lock(&engine->stats.lock);
309 engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
310 engine->stats.curr_items -= 1;
311 pthread_mutex_unlock(&engine->stats.lock);
312 assoc_delete(engine, engine->server.core->
hash(item_get_key(it),
314 item_get_key(it), it->
nkey);
315 item_unlink_q(engine, it);
/* With references outstanding the memory is left alone here; the release
 * path in this file frees an unlinked item once its refcount is 0. */
316 if (it->refcount == 0) {
317 item_free(engine, it);
/* Release path (function header is outside this excerpt; presumably
 * do_item_release): drops a reference and frees the item once it is both
 * unreferenced and unlinked. */
323 MEMCACHED_ITEM_REMOVE(item_get_key(it), it->
nkey, it->
nbytes);
/* NOTE(review): the statement between this test and the trace call is not
 * visible; presumably the refcount decrement -- confirm. */
324 if (it->refcount != 0) {
326 DEBUG_REFCNT(it,
'-');
/* Free only when no references remain and the item is not linked. */
328 if (it->refcount == 0 && (it->
iflag & ITEM_LINKED) == 0) {
329 item_free(engine, it);
/* Update path (function header is outside this excerpt; presumably
 * do_item_update): refreshes an item's timestamp and re-queues it, at most
 * once per ITEM_UPDATE_INTERVAL. */
335 MEMCACHED_ITEM_UPDATE(item_get_key(it), it->
nkey, it->
nbytes);
/* Skip items whose timestamp is newer than the interval threshold. */
336 if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
337 assert((it->
iflag & ITEM_SLABBED) == 0);
/* Only linked items are re-queued: unlink, restamp with current_time, then
 * link again. */
339 if ((it->
iflag & ITEM_LINKED) != 0) {
340 item_unlink_q(engine, it);
341 it->time = current_time;
342 item_link_q(engine, it);
/* Replace path (function header is outside this excerpt; presumably
 * do_item_replace): unlinks `it`, links `new_it`, and returns whatever
 * do_item_link() returns. */
349 MEMCACHED_ITEM_REPLACE(item_get_key(it), it->
nkey, it->
nbytes,
350 item_get_key(new_it), new_it->
nkey, new_it->
nbytes);
/* The item being replaced must not be flagged ITEM_SLABBED. */
351 assert((it->
iflag & ITEM_SLABBED) == 0);
353 do_item_unlink(engine, it);
354 return do_item_link(engine, new_it);
/* Builds a malloc()ed text dump ("ITEM ..." lines followed by "END") of the
 * items in one class list; returns NULL when the buffer cannot be allocated.
 *
 *   slabs_clsid - class whose list is walked, starting from heads[]
 *   limit       - maximum number of items to show; 0 means no limit
 *   bytes       - out-parameter (its assignment is on lines not shown)
 *
 * NOTE(review): the visible signature has no engine parameter although the
 * body reads engine->items.heads; the enclosing context is not visible here
 * -- confirm this code is actually compiled. */
358 static char *do_item_cachedump(
const unsigned int slabs_clsid,
359 const unsigned int limit,
360 unsigned int *bytes) {
/* Size of the output buffer: 2 MiB. */
362 unsigned int memlimit = 2 * 1024 * 1024;
364 unsigned int bufcurr;
367 unsigned int shown = 0;
371 it = engine->items.heads[slabs_clsid];
373 buffer = malloc((
size_t)memlimit);
374 if (buffer == 0)
return NULL;
378 while (it != NULL && (limit == 0 || shown < limit)) {
/* The key is copied and terminated by hand.
 * NOTE(review): this relies on key_temp holding at least nkey + 1 bytes;
 * its declaration is not visible -- confirm. */
381 strncpy(key_temp, item_get_key(it), it->
nkey);
382 key_temp[it->
nkey] = 0x00;
383 len = snprintf(temp,
sizeof(temp),
"ITEM %s [%d b; %lu s]\r\n",
385 (
unsigned long)it->
exptime + process_started);
/* Check that this line plus 6 reserved bytes (the size of the trailer copy
 * below) still fits in the buffer. */
386 if (bufcurr + len + 6 > memlimit)
388 memcpy(buffer + bufcurr, temp, len);
/* Copies 6 bytes: the five characters of "END\r\n" and the terminating NUL. */
395 memcpy(buffer + bufcurr,
"END\r\n", 6);
/* Per-class statistics (only part of the parameter list is visible): for
 * every class with a non-empty list, stale tail items are first pruned, then
 * one statistic per counter is emitted through add_statistics() under the
 * "items" prefix. */
408 ADD_STAT add_stats,
const void *c) {
411 for (i = 0; i < POWER_LARGEST; i++) {
412 if (engine->items.tails[i] != NULL) {
/* Budget for the pruning pass (the loop header using it is not shown). */
413 int search = search_items;
/* A tail item is stale when a flush point (oldest_live) is set, has been
 * reached, and the item's time is not newer than it, or when the item has a
 * nonzero exptime that is already in the past. */
415 engine->items.tails[i] != NULL &&
416 ((engine->config.oldest_live != 0 &&
417 engine->config.oldest_live <= current_time &&
418 engine->items.tails[i]->time <= engine->config.oldest_live) ||
419 (engine->items.tails[i]->
exptime != 0 &&
420 engine->items.tails[i]->
exptime < current_time))) {
/* Only unreferenced stale items are unlinked. */
422 if (engine->items.tails[i]->refcount == 0) {
423 do_item_unlink(engine, engine->items.tails[i]);
/* Pruning may have emptied the list (the branch body is not shown). */
428 if (engine->items.tails[i] == NULL) {
433 const char *prefix =
"items";
434 add_statistics(c, add_stats, prefix, i,
"number",
"%u",
435 engine->items.sizes[i]);
/* "age" is emitted as the tail item's raw time field. */
436 add_statistics(c, add_stats, prefix, i,
"age",
"%u",
437 engine->items.tails[i]->time);
438 add_statistics(c, add_stats, prefix, i,
"evicted",
439 "%u", engine->items.itemstats[i].evicted);
440 add_statistics(c, add_stats, prefix, i,
"evicted_nonzero",
441 "%u", engine->items.itemstats[i].evicted_nonzero);
442 add_statistics(c, add_stats, prefix, i,
"evicted_time",
443 "%u", engine->items.itemstats[i].evicted_time);
444 add_statistics(c, add_stats, prefix, i,
"outofmemory",
445 "%u", engine->items.itemstats[i].outofmemory);
/* NOTE(review): the doubled ';' on the next two calls is a harmless empty
 * statement. */
446 add_statistics(c, add_stats, prefix, i,
"tailrepairs",
447 "%u", engine->items.itemstats[i].tailrepairs);;
448 add_statistics(c, add_stats, prefix, i,
"reclaimed",
449 "%u", engine->items.itemstats[i].reclaimed);;
/* Item-size histogram (only part of the parameter list is visible): counts
 * items per bucket across all classes and reports each non-empty bucket
 * through add_stats(), using the bucket index times 32 as the key. */
457 ADD_STAT add_stats,
const void *c) {
460 const int num_buckets = 32768;
/* Zero-initialised counters; when the allocation fails the histogram work
 * guarded by the NULL check below is skipped. */
461 unsigned int *histogram = calloc(num_buckets,
sizeof(
int));
463 if (histogram != NULL) {
467 for (i = 0; i < POWER_LARGEST; i++) {
470 int ntotal = ITEM_ntotal(engine, iter);
/* Sizes that are not a multiple of 32 go one bucket higher (the initial
 * computation of `bucket` is on a line not shown). */
472 if ((ntotal % 32) != 0) bucket++;
/* Items whose bucket falls outside the table are not counted. */
473 if (bucket < num_buckets) histogram[bucket]++;
479 for (i = 0; i < num_buckets; i++) {
480 if (histogram[i] != 0) {
481 char key[8], val[32];
483 klen = snprintf(key,
sizeof(key),
"%d", i * 32);
484 vlen = snprintf(val,
sizeof(val),
"%u", histogram[i]);
/* Truncation is checked by assertion only. */
485 assert(klen <
sizeof(key));
486 assert(vlen <
sizeof(val));
487 add_stats(key, klen, val, vlen, c);
/* Lookup path (only part of the parameter list is visible): finds the item
 * for `key` in the hash table, drops it when it has been flushed or has
 * expired, and otherwise refreshes it via do_item_update(). */
496 const char *key,
const size_t nkey) {
498 hash_item *it = assoc_find(engine, engine->server.core->
hash(key,
/* Debug logging when verbose > 2.
 * NOTE(review): key is printed with %s although it is passed together with
 * an explicit length; this assumes it is NUL-terminated -- confirm. */
503 if (engine->config.verbose > 2) {
507 logger->
log(EXTENSION_LOG_DEBUG, NULL,
508 "> NOT FOUND %s", key);
510 logger->
log(EXTENSION_LOG_DEBUG, NULL,
512 (
const char*)item_get_key(it));
/* Flush check: a flush point is set, has been reached, and the item's time
 * is not newer than it. */
517 if (it != NULL && engine->config.oldest_live != 0 &&
518 engine->config.oldest_live <= current_time &&
519 it->time <= engine->config.oldest_live) {
520 do_item_unlink(engine, it);
524 if (it == NULL && was_found) {
527 logger->
log(EXTENSION_LOG_DEBUG, NULL,
" -nuked by flush");
/* Expiry check: a nonzero exptime that is not in the future. */
531 if (it != NULL && it->
exptime != 0 && it->
exptime <= current_time) {
532 do_item_unlink(engine, it);
536 if (it == NULL && was_found) {
539 logger->
log(EXTENSION_LOG_DEBUG, NULL,
" -nuked by expire");
/* Surviving item: trace the reference and refresh its queue position. */
545 DEBUG_REFCNT(it,
'+');
546 do_item_update(engine, it);
/* Stores `it` according to `operation` (ADD, REPLACE, APPEND, PREPEND, CAS
 * and the default store are visible).  The result starts as
 * ENGINE_NOT_STORED and is changed by the branches below; several guards and
 * branch bodies are on lines not included in this excerpt. */
558 static ENGINE_ERROR_CODE do_store_item(
struct default_engine *engine,
560 ENGINE_STORE_OPERATION operation,
561 const void *cookie) {
562 const char *key = item_get_key(it);
564 ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
/* ADD on a key that already exists: the existing item is only refreshed. */
568 if (old_it != NULL && operation == OPERATION_ADD) {
570 do_item_update(engine, old_it);
571 }
/* REPLACE / APPEND / PREPEND without an existing item (branch body is not
 * shown). */
else if (!old_it && (operation == OPERATION_REPLACE
572 || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
575 }
/* CAS store: ENGINE_KEY_ENOENT (its guard is not shown), ENGINE_SUCCESS with
 * the old item replaced when both CAS values match, ENGINE_KEY_EEXISTS
 * otherwise. */
else if (operation == OPERATION_CAS) {
579 stored = ENGINE_KEY_ENOENT;
581 else if (item_get_cas(it) == item_get_cas(old_it)) {
585 do_item_replace(engine, old_it, it);
586 stored = ENGINE_SUCCESS;
588 if (engine->config.verbose > 1) {
591 logger->
log(EXTENSION_LOG_INFO, NULL,
592 "CAS: failure: expected %"PRIu64
", got %"PRIu64
"\n",
593 item_get_cas(old_it),
596 stored = ENGINE_KEY_EEXISTS;
/* APPEND / PREPEND: a nonzero CAS on the incoming item has to match the
 * stored item's CAS, otherwise the result is ENGINE_KEY_EEXISTS. */
603 if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
607 if (item_get_cas(it) != 0) {
609 if (item_get_cas(it) != item_get_cas(old_it)) {
610 stored = ENGINE_KEY_EEXISTS;
/* No CAS conflict: allocate a new item for the combined value. */
614 if (stored == ENGINE_NOT_STORED) {
616 new_it = do_item_alloc(engine, key, it->
nkey,
/* Allocation failure: drop the reference on the old item and give up. */
622 if (new_it == NULL) {
624 if (old_it != NULL) {
625 do_item_release(engine, old_it);
628 return ENGINE_NOT_STORED;
/* APPEND writes old data first and the new data after it; the second pair
 * of copies writes the new data first and the old data after it. */
633 if (operation == OPERATION_APPEND) {
634 memcpy(item_get_data(new_it), item_get_data(old_it), old_it->
nbytes);
635 memcpy(item_get_data(new_it) + old_it->
nbytes, item_get_data(it), it->
nbytes);
638 memcpy(item_get_data(new_it), item_get_data(it), it->
nbytes);
639 memcpy(item_get_data(new_it) + it->
nbytes, item_get_data(old_it), old_it->
nbytes);
/* Still undecided: replace the existing item, or link `it` (the else
 * keyword between the two calls is not shown). */
646 if (stored == ENGINE_NOT_STORED) {
647 if (old_it != NULL) {
648 do_item_replace(engine, old_it, it);
650 do_item_link(engine, it);
653 *cas = item_get_cas(it);
654 stored = ENGINE_SUCCESS;
/* Drop the references taken on the old and the combined item. */
658 if (old_it != NULL) {
659 do_item_release(engine, old_it);
662 if (new_it != NULL) {
663 do_item_release(engine, new_it);
/* On success report the stored item's CAS to the caller. */
666 if (stored == ENGINE_SUCCESS) {
667 *cas = item_get_cas(it);
/* Applies a numeric delta to the decimal text stored in `it` and reports the
 * new CAS through *rcas.  Returns ENGINE_EINVAL when the stored value is too
 * long or not a number, ENGINE_ENOMEM when a replacement item cannot be
 * allocated, ENGINE_SUCCESS otherwise.  The arithmetic itself is on lines
 * not included in this excerpt. */
685 static ENGINE_ERROR_CODE do_add_delta(
struct default_engine *engine,
687 const int64_t delta, uint64_t *rcas,
688 uint64_t *result,
const void *cookie) {
/* The value has to fit in buf with one byte to spare. */
694 if (it->
nbytes >= (
sizeof(buf) - 1)) {
695 return ENGINE_EINVAL;
/* Work on a local copy of the stored bytes. */
698 ptr = item_get_data(it);
699 memcpy(buf, ptr, it->
nbytes);
702 if (!safe_strtoull(buf, &value)) {
703 return ENGINE_EINVAL;
/* Render the updated value back into buf; only a -1 result from snprintf is
 * treated as failure. */
717 if ((res = snprintf(buf,
sizeof(buf),
"%" PRIu64, value)) == -1) {
718 return ENGINE_EINVAL;
/* In-place update when exactly one reference exists and the new text is not
 * longer than the stored value: overwrite, pad the rest with spaces, and
 * stamp a new CAS. */
721 if (it->refcount == 1 && res <= it->nbytes) {
723 memcpy(item_get_data(it), buf, res);
724 memset(item_get_data(it) + res,
' ', it->
nbytes - res);
725 item_set_cas(NULL, NULL, it, get_cas_id());
726 *rcas = item_get_cas(it);
/* Otherwise build a replacement item. */
728 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
/* On allocation failure the existing item is unlinked before ENGINE_ENOMEM
 * is returned. */
732 if (new_it == NULL) {
733 do_item_unlink(engine, it);
734 return ENGINE_ENOMEM;
736 memcpy(item_get_data(new_it), buf, res);
737 do_item_replace(engine, it, new_it);
738 *rcas = item_get_cas(new_it);
/* Drop this function's reference on the replacement item. */
739 do_item_release(engine, new_it);
742 return ENGINE_SUCCESS;
751 const void *key,
size_t nkey,
int flags,
752 rel_time_t exptime,
int nbytes,
const void *cookie) {
755 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
765 const void *key,
const size_t nkey) {
768 it = do_item_get(engine, key, nkey);
779 do_item_release(engine, item);
788 do_item_unlink(engine, item);
/* Increment/decrement entry point working on an item looked up by key.
 * Visible outcomes: ENGINE_KEY_ENOENT (its guard is not shown), creation of
 * a new item holding a formatted value that is stored with OPERATION_ADD,
 * or a delegated do_add_delta() on the existing item. */
792 static ENGINE_ERROR_CODE do_arithmetic(
struct default_engine *engine,
796 const bool increment,
798 const uint64_t delta,
799 const uint64_t initial,
800 const rel_time_t exptime,
804 hash_item *item = do_item_get(engine, key, nkey);
805 ENGINE_ERROR_CODE
ret;
809 return ENGINE_KEY_ENOENT;
/* Creation path: format a value into buffer (the argument being formatted is
 * on a line not shown) and allocate an item of exactly that length. */
812 int len = snprintf(buffer,
sizeof(buffer),
"%"PRIu64,
815 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
817 return ENGINE_ENOMEM;
819 memcpy((
void*)item_get_data(item), buffer, len);
/* Store with ADD semantics; on success the item's CAS is reported. */
820 if ((ret = do_store_item(engine, item, cas,
821 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
823 *cas = item_get_cas(item);
825 do_item_release(engine, item);
/* Existing item: apply the delta, then drop the reference from the lookup. */
828 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
829 do_item_release(engine, item);
839 const bool increment,
841 const uint64_t delta,
842 const uint64_t initial,
843 const rel_time_t exptime,
847 ENGINE_ERROR_CODE
ret;
850 ret = do_arithmetic(engine, cookie, key, nkey, increment,
851 create, delta, initial, exptime, cas,
862 ENGINE_STORE_OPERATION operation,
863 const void *cookie) {
864 ENGINE_ERROR_CODE
ret;
867 ret = do_store_item(engine, item, cas, operation, cookie);
877 hash_item *item = do_item_get(engine, key, nkey);
892 ret = do_touch_item(engine, key, nkey, exptime);
/* Flushes items relative to the point in time `when`: sets
 * engine->config.oldest_live and unlinks items that were touched at or after
 * it.  Locking and the handling of special `when` values are on lines not
 * included in this excerpt. */
900 void item_flush_expired(
struct default_engine *engine, time_t when) {
/* Flush point: `when` converted by the server core's realtime(), minus 1. */
909 engine->config.oldest_live = engine->server.core->
realtime(when) - 1;
912 if (engine->config.oldest_live != 0) {
913 for (i = 0; i < POWER_LARGEST; i++) {
/* Walk each class list from the head; `next` is captured on a line not
 * shown, so unlinking `iter` does not break the walk. */
921 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
922 if (iter->time >= engine->config.oldest_live) {
/* Items flagged ITEM_SLABBED are left alone. */
924 if ((iter->
iflag & ITEM_SLABBED) == 0) {
925 do_item_unlink(engine, iter);
941 unsigned int slabs_clsid,
943 unsigned int *bytes) {
947 ret = do_item_cachedump(slabs_clsid, limit, bytes);
953 ADD_STAT add_stat,
const void *cookie)
956 do_item_stats(engine, add_stat, cookie);
962 ADD_STAT add_stat,
const void *cookie)
965 do_item_stats_sizes(engine, add_stat, cookie);
/* Cursor-link path (function header is outside this excerpt; presumably
 * do_item_link_cursor): appends `cursor` as the new tail of class `ii` and
 * counts it in that class's size.
 * NOTE(review): tails[ii] is dereferenced without a NULL check in the
 * visible lines; the visible call sites test heads[ii] != NULL first. */
972 cursor->slabs_clsid = (uint8_t)ii;
974 cursor->prev = engine->items.tails[ii];
975 engine->items.tails[ii]->next =
cursor;
976 engine->items.tails[ii] =
cursor;
977 engine->items.sizes[ii]++;
980 typedef ENGINE_ERROR_CODE (*ITERFUNC)(
struct default_engine *engine,
/* Cursor walk (only the last parameter is visible; presumably
 * do_item_walk_cursor): moves `cursor` backwards through its class list for
 * up to `steplength` items, calling `itemfunc` on each item passed.  *error
 * receives ENGINE_SUCCESS or the callback's status; the return value is
 * nonzero while there are items left in front of the cursor. */
988 ENGINE_ERROR_CODE *error)
991 *error = ENGINE_SUCCESS;
993 while (cursor->prev != NULL && ii < steplength) {
/* Take the cursor off the list, then re-insert it in front of `ptr` (the
 * assignment of `ptr` and part of the relinking are on lines not shown). */
997 item_unlink_q(engine, cursor);
/* `ptr` is the list head: the cursor will have nothing in front of it. */
1000 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
1002 cursor->prev = NULL;
1005 cursor->prev = ptr->prev;
1006 cursor->prev->next =
cursor;
/* Run the callback on the item just passed. */
1014 *error = itemfunc(engine, ptr, itemdata);
1015 if (*error != ENGINE_SUCCESS) {
1025 return (cursor->prev != NULL);
/* Scrubber callback: counts every visited item and unlinks those that are
 * unreferenced and meet a further condition on a line not shown, counting
 * them as cleaned.  Returns ENGINE_SUCCESS on the visible return path. */
1028 static ENGINE_ERROR_CODE item_scrub(
struct default_engine *engine,
1032 engine->scrubber.visited++;
1034 if (item->refcount == 0 &&
1036 do_item_unlink(engine, item);
1037 engine->scrubber.cleaned++;
1039 return ENGINE_SUCCESS;
1045 ENGINE_ERROR_CODE
ret;
1049 more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
1051 if (ret != ENGINE_SUCCESS) {
/* Thread entry point of the scrubber: visits every class, links a cursor
 * into classes with a non-NULL list head and scrubs them, then records the
 * stop time and clears the running flag.  `arg` is the engine pointer handed
 * to pthread_create() by the start routine in this file. */
1057 static void *item_scubber_main(
void *arg)
1062 for (
int ii = 0; ii < POWER_LARGEST; ++ii) {
/* Empty class (the branch body is not shown). */
1065 if (engine->items.heads[ii] == NULL) {
1069 do_item_link_cursor(engine, &cursor, ii);
1074 item_scrub_class(engine, &cursor);
/* Publish completion while holding scrubber.lock. */
1078 pthread_mutex_lock(&engine->scrubber.lock);
1079 engine->scrubber.stopped = time(NULL);
1080 engine->scrubber.running =
false;
1081 pthread_mutex_unlock(&engine->scrubber.lock);
/* Scrubber start path (function header is outside this excerpt; presumably
 * item_start_scrub): while holding scrubber.lock, and only if no scrub is
 * already running, resets the scrubber counters and launches
 * item_scubber_main on a detached thread. */
1089 pthread_mutex_lock(&engine->scrubber.lock);
1090 if (!engine->scrubber.running) {
1091 engine->scrubber.started = time(NULL);
1092 engine->scrubber.stopped = 0;
1093 engine->scrubber.visited = 0;
1094 engine->scrubber.cleaned = 0;
1095 engine->scrubber.running =
true;
1098 pthread_attr_t attr;
/* Any failure while setting up or creating the thread clears the running
 * flag again.
 * NOTE(review): no pthread_attr_destroy() is visible in this excerpt --
 * confirm. */
1100 if (pthread_attr_init(&attr) != 0 ||
1101 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1102 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1104 engine->scrubber.running =
false;
1109 pthread_mutex_unlock(&engine->scrubber.lock);
/* Cursor-walk callback used by the tap walker: takes a reference on
 * client->it by incrementing its refcount (the assignment of client->it is
 * on lines not shown) and returns ENGINE_SUCCESS. */
1119 static ENGINE_ERROR_CODE item_tap_iterfunc(
struct default_engine *engine,
1124 ++client->it->refcount;
1125 return ENGINE_SUCCESS;
/* Produces the next tap event: steps the client's cursor one item at a time
 * and, when a class is exhausted, moves the cursor to the next class with a
 * non-NULL list head.  Returns TAP_DISCONNECT when there is no client or no
 * item was produced, TAP_MUTATION otherwise. */
1128 static tap_event_t do_item_tap_walker(
struct default_engine *engine,
1129 const void *cookie, item **itm,
1130 void **es, uint16_t *nes, uint8_t *ttl,
1131 uint16_t *flags, uint32_t *seqno,
1135 if (client == NULL) {
1136 return TAP_DISCONNECT;
1147 ENGINE_ERROR_CODE r;
/* A false result means nothing is left in front of the cursor in the
 * current class. */
1149 if (!do_item_walk_cursor(engine, &client->cursor, 1, item_tap_iterfunc, client, &r)) {
1151 bool linked =
false;
/* Look for the next class, after the current one, that has items. */
1152 for (
int ii = client->cursor.slabs_clsid + 1; ii < POWER_LARGEST && !linked; ++ii) {
1153 if (engine->items.heads[ii] != NULL) {
1155 do_item_link_cursor(engine, &client->cursor, ii);
1163 }
/* Repeat while the walk has not produced an item (other exits from the loop
 * are on lines not shown). */
while (client->it == NULL);
1166 return (*itm == NULL) ? TAP_DISCONNECT : TAP_MUTATION;
1170 const void *cookie, item **itm,
1171 void **es, uint16_t *nes, uint8_t *ttl,
1172 uint16_t *flags, uint32_t *seqno,
1178 ret = do_item_tap_walker(engine, cookie, itm, es, nes, ttl, flags, seqno, vbucket);
/* Tap-client setup (function header is outside this excerpt): allocates a
 * zeroed tap_client, gives its cursor a refcount of 1, and links the cursor
 * into the first class with a non-NULL list head.  The handling of an
 * allocation failure is on lines not shown. */
1187 struct tap_client *client = calloc(1,
sizeof(*client));
1188 if (client == NULL) {
1191 client->cursor.refcount = 1;
1194 bool linked =
false;
1195 for (
int ii = 0; ii < POWER_LARGEST && !linked; ++ii) {
1197 if (engine->items.heads[ii] != NULL) {
1199 do_item_link_cursor(engine, &client->cursor, ii);