12 #include "default_engine.h"
18 const void *key,
const size_t nkey,
19 const int flags,
const rel_time_t exptime,
23 const char *key,
const size_t nkey);
37 #define ITEM_UPDATE_INTERVAL 60
39 void cache_set_initial_cas_id(uint64_t cas);
40 static uint64_t cas_id;
44 memset(engine->items.itemstats, 0,
sizeof(engine->items.itemstats));
53 if (engine->config.use_cas) {
54 ret +=
sizeof(uint64_t);
61 void cache_set_initial_cas_id(uint64_t cas) {
66 static uint64_t get_cas_id(
void) {
72 # define DEBUG_REFCNT(it,op) \
73 fprintf(stderr, "item %x refcnt(%c) %d %c%c%c\n", \
74 it, op, it->refcount, \
75 (it->it_flags & ITEM_LINKED) ? 'L' : ' ', \
76 (it->it_flags & ITEM_SLABBED) ? 'S' : ' ')
78 # define DEBUG_REFCNT(it,op) while(0)
87 const rel_time_t exptime,
91 size_t ntotal =
sizeof(
hash_item) + nkey + nbytes;
92 if (engine->config.use_cas) {
93 ntotal +=
sizeof(uint64_t);
96 unsigned int id = slabs_clsid(engine, ntotal);
106 for (search = engine->items.tails[
id];
107 tries > 0 && search != NULL;
108 tries--, search=search->prev) {
109 if (search->refcount == 0 &&
115 pthread_mutex_lock(&engine->stats.lock);
116 engine->stats.reclaimed++;
117 pthread_mutex_unlock(&engine->stats.lock);
118 engine->items.itemstats[
id].reclaimed++;
120 do_item_unlink(engine, it);
128 if (it == NULL && (it = slabs_alloc(engine, ntotal,
id)) == NULL) {
139 if (engine->config.evict_to_free == 0) {
140 engine->items.itemstats[
id].outofmemory++;
151 if (engine->items.tails[
id] == 0) {
152 engine->items.itemstats[
id].outofmemory++;
156 for (search = engine->items.tails[
id]; tries > 0 && search != NULL; tries--, search=search->prev) {
157 if (search->refcount == 0) {
159 engine->items.itemstats[
id].evicted++;
160 engine->items.itemstats[
id].evicted_time = current_time - search->time;
162 engine->items.itemstats[
id].evicted_nonzero++;
164 pthread_mutex_lock(&engine->stats.lock);
165 engine->stats.evictions++;
166 pthread_mutex_unlock(&engine->stats.lock);
167 engine->server.stat->
evicting(cookie,
168 item_get_key(search),
171 engine->items.itemstats[
id].reclaimed++;
172 pthread_mutex_lock(&engine->stats.lock);
173 engine->stats.reclaimed++;
174 pthread_mutex_unlock(&engine->stats.lock);
176 do_item_unlink(engine, search);
180 it = slabs_alloc(engine, ntotal,
id);
182 engine->items.itemstats[
id].outofmemory++;
191 for (search = engine->items.tails[
id]; tries > 0 && search != NULL; tries--, search=search->prev) {
192 if (search->refcount != 0 && search->time + TAIL_REPAIR_TIME < current_time) {
193 engine->items.itemstats[
id].tailrepairs++;
194 search->refcount = 0;
195 do_item_unlink(engine, search);
199 it = slabs_alloc(engine, ntotal,
id);
206 assert(it->slabs_clsid == 0);
208 it->slabs_clsid =
id;
210 assert(it != engine->items.heads[it->slabs_clsid]);
212 it->next = it->prev = it->h_next = 0;
214 DEBUG_REFCNT(it,
'*');
215 it->
iflag = engine->config.use_cas ? ITEM_WITH_CAS : 0;
219 memcpy((
void*)item_get_key(it), key, nkey);
225 size_t ntotal = ITEM_ntotal(engine, it);
227 assert((it->
iflag & ITEM_LINKED) == 0);
228 assert(it != engine->items.heads[it->slabs_clsid]);
229 assert(it != engine->items.tails[it->slabs_clsid]);
230 assert(it->refcount == 0);
233 clsid = it->slabs_clsid;
235 it->
iflag |= ITEM_SLABBED;
236 DEBUG_REFCNT(it,
'F');
237 slabs_free(engine, it, ntotal, clsid);
242 assert(it->slabs_clsid < POWER_LARGEST);
243 assert((it->
iflag & ITEM_SLABBED) == 0);
245 head = &engine->items.heads[it->slabs_clsid];
246 tail = &engine->items.tails[it->slabs_clsid];
248 assert((*head && *tail) || (*head == 0 && *tail == 0));
251 if (it->next) it->next->prev = it;
253 if (*tail == 0) *tail = it;
254 engine->items.sizes[it->slabs_clsid]++;
260 assert(it->slabs_clsid < POWER_LARGEST);
261 head = &engine->items.heads[it->slabs_clsid];
262 tail = &engine->items.tails[it->slabs_clsid];
265 assert(it->prev == 0);
269 assert(it->next == 0);
272 assert(it->next != it);
273 assert(it->prev != it);
275 if (it->next) it->next->prev = it->prev;
276 if (it->prev) it->prev->next = it->next;
277 engine->items.sizes[it->slabs_clsid]--;
282 MEMCACHED_ITEM_LINK(item_get_key(it), it->
nkey, it->
nbytes);
283 assert((it->
iflag & (ITEM_LINKED|ITEM_SLABBED)) == 0);
284 assert(it->
nbytes < (1024 * 1024));
285 it->
iflag |= ITEM_LINKED;
287 assoc_insert(engine, engine->server.core->
hash(item_get_key(it),
291 pthread_mutex_lock(&engine->stats.lock);
292 engine->stats.curr_bytes += ITEM_ntotal(engine, it);
293 engine->stats.curr_items += 1;
294 engine->stats.total_items += 1;
295 pthread_mutex_unlock(&engine->stats.lock);
298 item_set_cas(NULL, NULL, it, get_cas_id());
300 item_link_q(engine, it);
306 MEMCACHED_ITEM_UNLINK(item_get_key(it), it->
nkey, it->
nbytes);
307 if ((it->
iflag & ITEM_LINKED) != 0) {
308 it->
iflag &= ~ITEM_LINKED;
309 pthread_mutex_lock(&engine->stats.lock);
310 engine->stats.curr_bytes -= ITEM_ntotal(engine, it);
311 engine->stats.curr_items -= 1;
312 pthread_mutex_unlock(&engine->stats.lock);
313 assoc_delete(engine, engine->server.core->
hash(item_get_key(it),
315 item_get_key(it), it->
nkey);
316 item_unlink_q(engine, it);
317 if (it->refcount == 0) {
318 item_free(engine, it);
324 MEMCACHED_ITEM_REMOVE(item_get_key(it), it->
nkey, it->
nbytes);
325 if (it->refcount != 0) {
327 DEBUG_REFCNT(it,
'-');
329 if (it->refcount == 0 && (it->
iflag & ITEM_LINKED) == 0) {
330 item_free(engine, it);
336 MEMCACHED_ITEM_UPDATE(item_get_key(it), it->
nkey, it->
nbytes);
337 if (it->time < current_time - ITEM_UPDATE_INTERVAL) {
338 assert((it->
iflag & ITEM_SLABBED) == 0);
340 if ((it->
iflag & ITEM_LINKED) != 0) {
341 item_unlink_q(engine, it);
342 it->time = current_time;
343 item_link_q(engine, it);
350 MEMCACHED_ITEM_REPLACE(item_get_key(it), it->
nkey, it->
nbytes,
351 item_get_key(new_it), new_it->
nkey, new_it->
nbytes);
352 assert((it->
iflag & ITEM_SLABBED) == 0);
354 do_item_unlink(engine, it);
355 return do_item_link(engine, new_it);
359 static char *do_item_cachedump(
const unsigned int slabs_clsid,
360 const unsigned int limit,
361 unsigned int *bytes) {
363 unsigned int memlimit = 2 * 1024 * 1024;
365 unsigned int bufcurr;
368 unsigned int shown = 0;
372 it = engine->items.heads[slabs_clsid];
374 buffer = malloc((
size_t)memlimit);
375 if (buffer == 0)
return NULL;
379 while (it != NULL && (limit == 0 || shown < limit)) {
382 strncpy(key_temp, item_get_key(it), it->
nkey);
383 key_temp[it->
nkey] = 0x00;
384 len = snprintf(temp,
sizeof(temp),
"ITEM %s [%d b; %lu s]\r\n",
386 (
unsigned long)it->
exptime + process_started);
387 if (bufcurr + len + 6 > memlimit)
389 memcpy(buffer + bufcurr, temp, len);
396 memcpy(buffer + bufcurr,
"END\r\n", 6);
409 ADD_STAT add_stats,
const void *c) {
411 for (i = 0; i < POWER_LARGEST; i++) {
412 if (engine->items.tails[i] != NULL) {
413 const char *prefix =
"items";
414 add_statistics(c, add_stats, prefix, i,
"number",
"%u",
415 engine->items.sizes[i]);
416 add_statistics(c, add_stats, prefix, i,
"age",
"%u",
417 engine->items.tails[i]->time);
418 add_statistics(c, add_stats, prefix, i,
"evicted",
419 "%u", engine->items.itemstats[i].evicted);
420 add_statistics(c, add_stats, prefix, i,
"evicted_nonzero",
421 "%u", engine->items.itemstats[i].evicted_nonzero);
422 add_statistics(c, add_stats, prefix, i,
"evicted_time",
423 "%u", engine->items.itemstats[i].evicted_time);
424 add_statistics(c, add_stats, prefix, i,
"outofmemory",
425 "%u", engine->items.itemstats[i].outofmemory);
426 add_statistics(c, add_stats, prefix, i,
"tailrepairs",
427 "%u", engine->items.itemstats[i].tailrepairs);;
428 add_statistics(c, add_stats, prefix, i,
"reclaimed",
429 "%u", engine->items.itemstats[i].reclaimed);;
437 ADD_STAT add_stats,
const void *c) {
440 const int num_buckets = 32768;
441 unsigned int *histogram = calloc(num_buckets,
sizeof(
int));
443 if (histogram != NULL) {
447 for (i = 0; i < POWER_LARGEST; i++) {
450 int ntotal = ITEM_ntotal(engine, iter);
452 if ((ntotal % 32) != 0) bucket++;
453 if (bucket < num_buckets) histogram[bucket]++;
459 for (i = 0; i < num_buckets; i++) {
460 if (histogram[i] != 0) {
461 char key[8], val[32];
463 klen = snprintf(key,
sizeof(key),
"%d", i * 32);
464 vlen = snprintf(val,
sizeof(val),
"%u", histogram[i]);
465 assert(klen <
sizeof(key));
466 assert(vlen <
sizeof(val));
467 add_stats(key, klen, val, vlen, c);
476 const char *key,
const size_t nkey) {
478 hash_item *it = assoc_find(engine, engine->server.core->
hash(key,
483 if (engine->config.verbose > 2) {
485 fprintf(stderr,
"> NOT FOUND %s", key);
487 fprintf(stderr,
"> FOUND KEY %s", (
const char*)item_get_key(it));
492 if (it != NULL && engine->config.oldest_live != 0 &&
493 engine->config.oldest_live <= current_time &&
494 it->time <= engine->config.oldest_live) {
495 do_item_unlink(engine, it);
499 if (it == NULL && was_found) {
500 fprintf(stderr,
" -nuked by flush");
504 if (it != NULL && it->
exptime != 0 && it->
exptime <= current_time) {
505 do_item_unlink(engine, it);
509 if (it == NULL && was_found) {
510 fprintf(stderr,
" -nuked by expire");
516 DEBUG_REFCNT(it,
'+');
517 do_item_update(engine, it);
520 if (engine->config.verbose > 2)
521 fprintf(stderr,
"\n");
532 static ENGINE_ERROR_CODE do_store_item(
struct default_engine *engine,
534 ENGINE_STORE_OPERATION operation,
535 const void *cookie) {
536 const char *key = item_get_key(it);
538 ENGINE_ERROR_CODE stored = ENGINE_NOT_STORED;
542 if (old_it != NULL && operation == OPERATION_ADD) {
544 do_item_update(engine, old_it);
545 }
else if (!old_it && (operation == OPERATION_REPLACE
546 || operation == OPERATION_APPEND || operation == OPERATION_PREPEND))
549 }
else if (operation == OPERATION_CAS) {
553 stored = ENGINE_KEY_ENOENT;
555 else if (item_get_cas(it) == item_get_cas(old_it)) {
559 do_item_replace(engine, old_it, it);
560 stored = ENGINE_SUCCESS;
562 if (engine->config.verbose > 1) {
564 "CAS: failure: expected %"PRIu64
", got %"PRIu64
"\n",
565 item_get_cas(old_it),
568 stored = ENGINE_KEY_EEXISTS;
575 if (operation == OPERATION_APPEND || operation == OPERATION_PREPEND) {
579 if (item_get_cas(it) != 0) {
581 if (item_get_cas(it) != item_get_cas(old_it)) {
582 stored = ENGINE_KEY_EEXISTS;
586 if (stored == ENGINE_NOT_STORED) {
588 new_it = do_item_alloc(engine, key, it->
nkey,
594 if (new_it == NULL) {
596 if (old_it != NULL) {
597 do_item_release(engine, old_it);
600 return ENGINE_NOT_STORED;
605 if (operation == OPERATION_APPEND) {
606 memcpy(item_get_data(new_it), item_get_data(old_it), old_it->
nbytes);
607 memcpy(item_get_data(new_it) + old_it->
nbytes - 2 , item_get_data(it), it->
nbytes);
610 memcpy(item_get_data(new_it), item_get_data(it), it->
nbytes);
611 memcpy(item_get_data(new_it) + it->
nbytes - 2 , item_get_data(old_it), old_it->
nbytes);
618 if (stored == ENGINE_NOT_STORED) {
619 if (old_it != NULL) {
620 do_item_replace(engine, old_it, it);
622 do_item_link(engine, it);
625 *cas = item_get_cas(it);
626 stored = ENGINE_SUCCESS;
630 if (old_it != NULL) {
631 do_item_release(engine, old_it);
634 if (new_it != NULL) {
635 do_item_release(engine, new_it);
638 if (stored == ENGINE_SUCCESS) {
639 *cas = item_get_cas(it);
657 static ENGINE_ERROR_CODE do_add_delta(
struct default_engine *engine,
659 const int64_t delta, uint64_t *rcas,
660 uint64_t *result,
const void *cookie) {
665 ptr = item_get_data(it);
667 if (!safe_strtoull(ptr, &value)) {
668 return ENGINE_EINVAL;
683 if ((res = snprintf(buf,
sizeof(buf),
"%" PRIu64
"\r\n", value)) == -1) {
684 return ENGINE_EINVAL;
686 hash_item *new_it = do_item_alloc(engine, item_get_key(it),
691 do_item_unlink(engine, it);
692 return ENGINE_ENOMEM;
694 memcpy(item_get_data(new_it), buf, res);
695 do_item_replace(engine, it, new_it);
696 *rcas = item_get_cas(new_it);
697 do_item_release(engine, new_it);
699 return ENGINE_SUCCESS;
708 const void *key,
size_t nkey,
int flags,
709 rel_time_t exptime,
int nbytes,
const void *cookie) {
712 it = do_item_alloc(engine, key, nkey, flags, exptime, nbytes, cookie);
722 const void *key,
const size_t nkey) {
725 it = do_item_get(engine, key, nkey);
736 do_item_release(engine, item);
745 do_item_unlink(engine, item);
749 static ENGINE_ERROR_CODE do_arithmetic(
struct default_engine *engine,
753 const bool increment,
755 const uint64_t delta,
756 const uint64_t initial,
757 const rel_time_t exptime,
761 hash_item *item = do_item_get(engine, key, nkey);
762 ENGINE_ERROR_CODE
ret;
766 return ENGINE_KEY_ENOENT;
769 int len = snprintf(buffer,
sizeof(buffer),
"%"PRIu64
"\r\n",
772 item = do_item_alloc(engine, key, nkey, 0, exptime, len, cookie);
774 return ENGINE_ENOMEM;
776 memcpy((
void*)item_get_data(item), buffer, len);
777 if ((ret = do_store_item(engine, item, cas,
778 OPERATION_ADD, cookie)) == ENGINE_SUCCESS) {
780 *cas = item_get_cas(item);
782 do_item_release(engine, item);
785 ret = do_add_delta(engine, item, increment, delta, cas, result, cookie);
786 do_item_release(engine, item);
796 const bool increment,
798 const uint64_t delta,
799 const uint64_t initial,
800 const rel_time_t exptime,
804 ENGINE_ERROR_CODE
ret;
807 ret = do_arithmetic(engine, cookie, key, nkey, increment,
808 create, delta, initial, exptime, cas,
819 ENGINE_STORE_OPERATION operation,
820 const void *cookie) {
821 ENGINE_ERROR_CODE
ret;
824 ret = do_store_item(engine, item, cas, operation, cookie);
832 void item_flush_expired(
struct default_engine *engine, time_t when) {
841 engine->config.oldest_live = engine->server.core->
realtime(when) - 1;
844 if (engine->config.oldest_live != 0) {
845 for (i = 0; i < POWER_LARGEST; i++) {
853 for (iter = engine->items.heads[i]; iter != NULL; iter = next) {
854 if (iter->time >= engine->config.oldest_live) {
856 if ((iter->
iflag & ITEM_SLABBED) == 0) {
857 do_item_unlink(engine, iter);
873 unsigned int slabs_clsid,
875 unsigned int *bytes) {
879 ret = do_item_cachedump(slabs_clsid, limit, bytes);
885 ADD_STAT add_stat,
const void *cookie)
888 do_item_stats(engine, add_stat, cookie);
894 ADD_STAT add_stat,
const void *cookie)
897 do_item_stats_sizes(engine, add_stat, cookie);
904 cursor->slabs_clsid = (uint8_t)ii;
906 cursor->prev = engine->items.tails[ii];
907 engine->items.tails[ii]->next =
cursor;
908 engine->items.tails[ii] =
cursor;
909 engine->items.sizes[ii]++;
912 typedef ENGINE_ERROR_CODE (*ITERFUNC)(
struct default_engine *engine,
920 ENGINE_ERROR_CODE *error)
923 *error = ENGINE_SUCCESS;
925 while (cursor->prev != NULL && ii < steplength) {
929 item_unlink_q(engine, cursor);
933 if (ptr == engine->items.heads[cursor->slabs_clsid]) {
937 cursor->prev = ptr->prev;
938 cursor->prev->next =
cursor;
944 *error = itemfunc(engine, ptr, itemdata);
945 if (*error != ENGINE_SUCCESS) {
958 static ENGINE_ERROR_CODE item_scrub(
struct default_engine *engine,
962 engine->scrubber.visited++;
964 if (item->refcount == 0 &&
966 do_item_unlink(engine, item);
967 engine->scrubber.cleaned++;
969 return ENGINE_SUCCESS;
975 ENGINE_ERROR_CODE
ret;
979 more = do_item_walk_cursor(engine, cursor, 200, item_scrub, NULL, &ret);
981 if (ret != ENGINE_SUCCESS) {
987 static void *item_scubber_main(
void *arg)
992 for (
int ii = 0; ii < POWER_LARGEST; ++ii) {
995 if (engine->items.heads[ii] == NULL) {
999 do_item_link_cursor(engine, &cursor, ii);
1004 item_scrub_class(engine, &cursor);
1008 pthread_mutex_lock(&engine->scrubber.lock);
1009 engine->scrubber.stopped = time(NULL);
1010 engine->scrubber.running =
false;
1011 pthread_mutex_unlock(&engine->scrubber.lock);
1019 pthread_mutex_lock(&engine->scrubber.lock);
1020 if (!engine->scrubber.running) {
1021 engine->scrubber.started = time(NULL);
1022 engine->scrubber.stopped = 0;
1023 engine->scrubber.visited = 0;
1024 engine->scrubber.cleaned = 0;
1025 engine->scrubber.running =
true;
1028 pthread_attr_t attr;
1030 if (pthread_attr_init(&attr) != 0 ||
1031 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED) != 0 ||
1032 pthread_create(&t, &attr, item_scubber_main, engine) != 0)
1034 engine->scrubber.running =
false;
1039 pthread_mutex_unlock(&engine->scrubber.lock);