14 #include "default_engine.h" 
   15 #include "memcached/util.h" 
   16 #include "memcached/config_parser.h" 
   19 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
 
   20                                             const char* config_str);
 
   23 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
 
   30                                                const rel_time_t exptime);
 
   31 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
 
   38 static void default_item_release(
ENGINE_HANDLE* handle, 
const void *cookie,
 
   46 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
 
   51 static void default_reset_stats(
ENGINE_HANDLE* handle, 
const void *cookie);
 
   56                                        ENGINE_STORE_OPERATION operation,
 
   58 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
 
   65                                             const uint64_t initial,
 
   66                                             const rel_time_t exptime,
 
   71                                        const void* cookie, time_t when);
 
   72 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
 
   74 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
 
   79 static ENGINE_ERROR_CODE default_tap_notify(
ENGINE_HANDLE* handle,
 
   81                                             void *engine_specific,
 
   85                                             tap_event_t tap_event,
 
  101                                              const void* userdata,
 
  104 static void default_handle_disconnect(
const void *cookie,
 
  105                                       ENGINE_EVENT_TYPE 
type,
 
  106                                       const void *event_data,
 
  107                                       const void *cb_data);
 
  115                               uint16_t vbid, vbucket_state_t 
to) {
 
  117     vi.c = e->vbucket_infos[vbid];
 
  119     e->vbucket_infos[vbid] = vi.c;
 
  122 static vbucket_state_t get_vbucket_state(
struct default_engine *e,
 
  125     vi.c = e->vbucket_infos[vbid];
 
  129 static bool handled_vbucket(
struct default_engine *e, uint16_t vbid) {
 
  130     return e->config.ignore_vbucket
 
  131         || (get_vbucket_state(e, vbid) == vbucket_state_active);
 
  135 #define VBUCKET_GUARD(e, v) if (!handled_vbucket(e, v)) { return ENGINE_NOT_MY_VBUCKET; } 
  137 static bool get_item_info(
ENGINE_HANDLE *handle, 
const void *cookie,
 
  140 static const char const * vbucket_state_name(vbucket_state_t s) {
 
  141     static const char const * vbucket_states[] = {
 
  142         [vbucket_state_active] = 
"active",
 
  143         [vbucket_state_replica] = 
"replica",
 
  144         [vbucket_state_pending] = 
"pending",
 
  145         [vbucket_state_dead] = 
"dead" 
  147     if (is_valid_vbucket_state_t(s)) {
 
  148         return vbucket_states[s];
 
  150         return "Illegal vbucket state";
 
  155                                   GET_SERVER_API get_server_api,
 
  158    if (interface != 1 || api == NULL) {
 
  159       return ENGINE_ENOTSUP;
 
  163    if (engine == NULL) {
 
  164       return ENGINE_ENOMEM;
 
  172          .get_info = default_get_info,
 
  173          .initialize = default_initialize,
 
  174          .destroy = default_destroy,
 
  175          .allocate = default_item_allocate,
 
  176          .remove = default_item_delete,
 
  177          .release = default_item_release,
 
  179          .get_stats = default_get_stats,
 
  180          .reset_stats = default_reset_stats,
 
  181          .store = default_store,
 
  182          .arithmetic = default_arithmetic,
 
  183          .flush = default_flush,
 
  184          .unknown_command = default_unknown_command,
 
  185          .tap_notify = default_tap_notify,
 
  186          .get_tap_iterator = default_get_tap_iterator,
 
  187          .item_set_cas = item_set_cas,
 
  188          .get_item_info = get_item_info
 
  191       .get_server_api = get_server_api,
 
  197          .lock = PTHREAD_MUTEX_INITIALIZER
 
  199       .cache_lock = PTHREAD_MUTEX_INITIALIZER,
 
  201          .lock = PTHREAD_MUTEX_INITIALIZER,
 
  207          .evict_to_free = 
true,
 
  208          .maxbytes = 64 * 1024 * 1024,
 
  209          .preallocate = 
false,
 
  212          .item_size_max= 1024 * 1024,
 
  215          .lock = PTHREAD_MUTEX_INITIALIZER,
 
  218          .lock = PTHREAD_MUTEX_INITIALIZER,
 
  221       .info.engine_info = {
 
  222            .description = 
"Default engine v0.1",
 
  225                [0].feature = ENGINE_FEATURE_LRU
 
  230    *engine = default_engine;
 
  231    engine->tap_connections.clients = calloc(default_engine.tap_connections.size, 
sizeof(
void*));
 
  232    if (engine->tap_connections.clients == NULL) {
 
  234        return ENGINE_ENOMEM;
 
  237    return ENGINE_SUCCESS;
 
  240 static inline struct default_engine* get_handle(
ENGINE_HANDLE* handle) {
 
  241    return (
struct default_engine*)handle;
 
  244 static inline hash_item* get_real_item(item* item) {
 
  249     return &get_handle(handle)->info.engine_info;
 
  252 static ENGINE_ERROR_CODE default_initialize(
ENGINE_HANDLE* handle,
 
  253                                             const char* config_str) {
 
  254    struct default_engine* se = get_handle(handle);
 
  256    ENGINE_ERROR_CODE 
ret = initalize_configuration(se, config_str);
 
  257    if (ret != ENGINE_SUCCESS) {
 
  262    if (se->config.use_cas) {
 
  266    ret = assoc_init(se);
 
  267    if (ret != ENGINE_SUCCESS) {
 
  271    ret = slabs_init(se, se->config.maxbytes, se->config.factor,
 
  272                     se->config.preallocate);
 
  273    if (ret != ENGINE_SUCCESS) {
 
  277    se->server.callback->
register_callback(handle, ON_DISCONNECT, default_handle_disconnect, handle);
 
  279    return ENGINE_SUCCESS;
 
  282 static void default_destroy(
ENGINE_HANDLE* handle, 
const bool force) {
 
  284    struct default_engine* se = get_handle(handle);
 
  288       pthread_mutex_destroy(&se->stats.lock);
 
  289       pthread_mutex_destroy(&se->slabs.
lock);
 
  291       free(se->tap_connections.clients);
 
  296 static ENGINE_ERROR_CODE default_item_allocate(
ENGINE_HANDLE* handle,
 
  303                                                const rel_time_t exptime) {
 
  304    struct default_engine* engine = get_handle(handle);
 
  305    size_t ntotal = 
sizeof(
hash_item) + nkey + nbytes;
 
  306    if (engine->config.use_cas) {
 
  307       ntotal += 
sizeof(uint64_t);
 
  309    unsigned int id = slabs_clsid(engine, ntotal);
 
  315    it = item_alloc(engine, key, nkey, flags, engine->server.core->
realtime(exptime),
 
  320       return ENGINE_SUCCESS;
 
  322       return ENGINE_ENOMEM;
 
  326 static ENGINE_ERROR_CODE default_item_delete(
ENGINE_HANDLE* handle,
 
  333    struct default_engine* engine = get_handle(handle);
 
  334    VBUCKET_GUARD(engine, vbucket);
 
  336    hash_item *it = item_get(engine, key, nkey);
 
  338       return ENGINE_KEY_ENOENT;
 
  341    if (cas == 0 || cas == item_get_cas(it)) {
 
  342       item_unlink(engine, it);
 
  343       item_release(engine, it);
 
  345       return ENGINE_KEY_EEXISTS;
 
  348    return ENGINE_SUCCESS;
 
  354    item_release(get_handle(handle), get_real_item(item));
 
  363    struct default_engine *engine = get_handle(handle);
 
  364    VBUCKET_GUARD(engine, vbucket);
 
  366    *item = item_get(engine, key, nkey);
 
  368       return ENGINE_SUCCESS;
 
  370       return ENGINE_KEY_ENOENT;
 
  374 static void stats_vbucket(
struct default_engine *e,
 
  376                           const void *cookie) {
 
  377     for (
int i = 0; 
i < NUM_VBUCKETS; 
i++) {
 
  378         vbucket_state_t state = get_vbucket_state(e, 
i);
 
  379         if (state != vbucket_state_dead) {
 
  381             snprintf(buf, 
sizeof(buf), 
"vb_%d", 
i);
 
  382             const char * state_name = vbucket_state_name(state);
 
  383             add_stat(buf, strlen(buf), state_name, strlen(state_name), cookie);
 
  388 static ENGINE_ERROR_CODE default_get_stats(
ENGINE_HANDLE* handle,
 
  390                                            const char* stat_key,
 
  394    struct default_engine* engine = get_handle(handle);
 
  395    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 
  397    if (stat_key == NULL) {
 
  401       pthread_mutex_lock(&engine->stats.lock);
 
  402       len = sprintf(val, 
"%"PRIu64, (uint64_t)engine->stats.evictions);
 
  403       add_stat(
"evictions", 9, val, len, cookie);
 
  404       len = sprintf(val, 
"%"PRIu64, (uint64_t)engine->stats.curr_items);
 
  405       add_stat(
"curr_items", 10, val, len, cookie);
 
  406       len = sprintf(val, 
"%"PRIu64, (uint64_t)engine->stats.total_items);
 
  407       add_stat(
"total_items", 11, val, len, cookie);
 
  408       len = sprintf(val, 
"%"PRIu64, (uint64_t)engine->stats.curr_bytes);
 
  409       add_stat(
"bytes", 5, val, len, cookie);
 
  410       len = sprintf(val, 
"%"PRIu64, engine->stats.reclaimed);
 
  411       add_stat(
"reclaimed", 9, val, len, cookie);
 
  412       len = sprintf(val, 
"%"PRIu64, (uint64_t)engine->config.maxbytes);
 
  413       add_stat(
"engine_maxbytes", 15, val, len, cookie);
 
  414       pthread_mutex_unlock(&engine->stats.lock);
 
  415    } 
else if (strncmp(stat_key, 
"slabs", 5) == 0) {
 
  416       slabs_stats(engine, add_stat, cookie);
 
  417    } 
else if (strncmp(stat_key, 
"items", 5) == 0) {
 
  418       item_stats(engine, add_stat, cookie);
 
  419    } 
else if (strncmp(stat_key, 
"sizes", 5) == 0) {
 
  420       item_stats_sizes(engine, add_stat, cookie);
 
  421    } 
else if (strncmp(stat_key, 
"vbucket", 7) == 0) {
 
  422       stats_vbucket(engine, add_stat, cookie);
 
  423    } 
else if (strncmp(stat_key, 
"scrub", 5) == 0) {
 
  427       pthread_mutex_lock(&engine->scrubber.lock);
 
  428       if (engine->scrubber.running) {
 
  429          add_stat(
"scrubber:status", 15, 
"running", 7, cookie);
 
  431          add_stat(
"scrubber:status", 15, 
"stopped", 7, cookie);
 
  434       if (engine->scrubber.started != 0) {
 
  435          if (engine->scrubber.stopped != 0) {
 
  436             time_t diff = engine->scrubber.started - engine->scrubber.stopped;
 
  437             len = sprintf(val, 
"%"PRIu64, (uint64_t)diff);
 
  438             add_stat(
"scrubber:last_run", 17, val, len, cookie);
 
  441          len = sprintf(val, 
"%"PRIu64, engine->scrubber.visited);
 
  442          add_stat(
"scrubber:visited", 16, val, len, cookie);
 
  443          len = sprintf(val, 
"%"PRIu64, engine->scrubber.cleaned);
 
  444          add_stat(
"scrubber:cleaned", 16, val, len, cookie);
 
  446       pthread_mutex_unlock(&engine->scrubber.lock);
 
  448       ret = ENGINE_KEY_ENOENT;
 
  454 static ENGINE_ERROR_CODE default_store(
ENGINE_HANDLE* handle,
 
  458                                        ENGINE_STORE_OPERATION operation,
 
  460     struct default_engine *engine = get_handle(handle);
 
  461     VBUCKET_GUARD(engine, vbucket);
 
  462     return store_item(engine, get_real_item(item), cas, operation,
 
  466 static ENGINE_ERROR_CODE default_arithmetic(
ENGINE_HANDLE* handle,
 
  470                                             const bool increment,
 
  472                                             const uint64_t delta,
 
  473                                             const uint64_t initial,
 
  474                                             const rel_time_t exptime,
 
  478    struct default_engine *engine = get_handle(handle);
 
  479    VBUCKET_GUARD(engine, vbucket);
 
  481    return arithmetic(engine, cookie, key, nkey, increment,
 
  482                      create, delta, initial, engine->server.core->
realtime(exptime), cas,
 
  486 static ENGINE_ERROR_CODE default_flush(
ENGINE_HANDLE* handle,
 
  487                                        const void* cookie, time_t when) {
 
  488    item_flush_expired(get_handle(handle), when);
 
  490    return ENGINE_SUCCESS;
 
  493 static void default_reset_stats(
ENGINE_HANDLE* handle, 
const void *cookie) {
 
  494    struct default_engine *engine = get_handle(handle);
 
  495    item_stats_reset(engine);
 
  497    pthread_mutex_lock(&engine->stats.lock);
 
  498    engine->stats.evictions = 0;
 
  499    engine->stats.reclaimed = 0;
 
  500    engine->stats.total_items = 0;
 
  501    pthread_mutex_unlock(&engine->stats.lock);
 
  504 static ENGINE_ERROR_CODE initalize_configuration(
struct default_engine *se,
 
  505                                                  const char *cfg_str) {
 
  506    ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 
  508    se->config.vb0 = 
true;
 
  510    if (cfg_str != NULL) {
 
  514            .value.dt_bool = &se->config.use_cas },
 
  517            .value.dt_size = &se->config.verbose },
 
  520            .value.dt_bool = &se->config.evict_to_free },
 
  521          { .key = 
"cache_size",
 
  523            .value.dt_size = &se->config.maxbytes },
 
  524          { .key = 
"preallocate",
 
  526            .value.dt_bool = &se->config.preallocate },
 
  528            .datatype = DT_FLOAT,
 
  529            .value.dt_float = &se->config.factor },
 
  530          { .key = 
"chunk_size",
 
  532            .value.dt_size = &se->config.chunk_size },
 
  533          { .key = 
"item_size_max",
 
  535            .value.dt_size = &se->config.item_size_max },
 
  536          { .key = 
"ignore_vbucket",
 
  538            .value.dt_bool = &se->config.ignore_vbucket },
 
  541            .value.dt_bool = &se->config.vb0 },
 
  542          { .key = 
"config_file",
 
  543            .datatype = DT_CONFIGFILE },
 
  547       ret = se->server.core->
parse_config(cfg_str, items, stderr);
 
  550    if (se->config.vb0) {
 
  551        set_vbucket_state(se, 0, vbucket_state_active);
 
  554    return ENGINE_SUCCESS;
 
  557 static bool set_vbucket(
struct default_engine *e,
 
  561     size_t bodylen = ntohl(req->message.header.request.bodylen)
 
  562         - ntohs(req->message.header.request.keylen);
 
  563     if (bodylen != 
sizeof(vbucket_state_t)) {
 
  564         const char *
msg = 
"Incorrect packet format";
 
  565         return response(NULL, 0, NULL, 0, msg, strlen(msg),
 
  566                         PROTOCOL_BINARY_RAW_BYTES,
 
  567                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
 
  569     vbucket_state_t state;
 
  570     memcpy(&state, &req->message.body.state, 
sizeof(state));
 
  571     state = ntohl(state);
 
  573     if (!is_valid_vbucket_state_t(state)) {
 
  574         const char *msg = 
"Invalid vbucket state";
 
  575         return response(NULL, 0, NULL, 0, msg, strlen(msg),
 
  576                         PROTOCOL_BINARY_RAW_BYTES,
 
  577                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
 
  580     set_vbucket_state(e, ntohs(req->message.header.request.vbucket), state);
 
  581     return response(NULL, 0, NULL, 0, &state, 
sizeof(state),
 
  582                     PROTOCOL_BINARY_RAW_BYTES,
 
  583                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
 
  586 static bool get_vbucket(
struct default_engine *e,
 
  590     vbucket_state_t state;
 
  591     state = get_vbucket_state(e, ntohs(req->message.header.request.vbucket));
 
  592     state = ntohl(state);
 
  594     return response(NULL, 0, NULL, 0, &state, 
sizeof(state),
 
  595                     PROTOCOL_BINARY_RAW_BYTES,
 
  596                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
 
  599 static bool rm_vbucket(
struct default_engine *e,
 
  603     set_vbucket_state(e, ntohs(req->request.vbucket), vbucket_state_dead);
 
  604     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  605                     PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
 
  608 static bool scrub_cmd(
struct default_engine *e,
 
  614     if (!item_start_scrub(e)) {
 
  615         res = PROTOCOL_BINARY_RESPONSE_EBUSY;
 
  618     return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  622 static bool touch(
struct default_engine *e, 
const void *cookie,
 
  625     if (request->request.extlen != 4 || request->request.keylen == 0) {
 
  626         return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  627                         PROTOCOL_BINARY_RESPONSE_EINVAL, 0, cookie);
 
  631     void *key = t->bytes + 
sizeof(t->bytes);
 
  632     uint32_t exptime = ntohl(t->message.body.expiration);
 
  633     uint16_t nkey = ntohs(request->request.keylen);
 
  635     hash_item *item = touch_item(e, key, nkey,
 
  638         if (request->request.opcode == PROTOCOL_BINARY_CMD_GATQ) {
 
  641             return response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  642                             PROTOCOL_BINARY_RESPONSE_KEY_ENOENT, 0, cookie);
 
  646         if (request->request.opcode == PROTOCOL_BINARY_CMD_TOUCH) {
 
  647             ret = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  648                            PROTOCOL_BINARY_RESPONSE_SUCCESS, 0, cookie);
 
  650             ret = response(NULL, 0, &item->
flags, 
sizeof(item->
flags),
 
  651                            item_get_data(item), item->
nbytes,
 
  652                            PROTOCOL_BINARY_RAW_BYTES,
 
  653                            PROTOCOL_BINARY_RESPONSE_SUCCESS,
 
  654                            item_get_cas(item), cookie);
 
  656         item_release(e, item);
 
  661 static ENGINE_ERROR_CODE default_unknown_command(
ENGINE_HANDLE* handle,
 
  666     struct default_engine* e = get_handle(handle);
 
  669     switch(request->request.opcode) {
 
  670     case PROTOCOL_BINARY_CMD_SCRUB:
 
  671         sent = scrub_cmd(e, cookie, request, response);
 
  673     case PROTOCOL_BINARY_CMD_DEL_VBUCKET:
 
  674         sent = rm_vbucket(e, cookie, request, response);
 
  676     case PROTOCOL_BINARY_CMD_SET_VBUCKET:
 
  677         sent = set_vbucket(e, cookie, (
void*)request, response);
 
  679     case PROTOCOL_BINARY_CMD_GET_VBUCKET:
 
  680         sent = get_vbucket(e, cookie, (
void*)request, response);
 
  682     case PROTOCOL_BINARY_CMD_TOUCH:
 
  683     case PROTOCOL_BINARY_CMD_GAT:
 
  684     case PROTOCOL_BINARY_CMD_GATQ:
 
  685         sent = touch(e, cookie, request, response);
 
  688         sent = response(NULL, 0, NULL, 0, NULL, 0, PROTOCOL_BINARY_RAW_BYTES,
 
  689                         PROTOCOL_BINARY_RESPONSE_UNKNOWN_COMMAND, 0, cookie);
 
  694         return ENGINE_SUCCESS;
 
  696         return ENGINE_FAILED;
 
  701 uint64_t item_get_cas(
const hash_item* item)
 
  703     if (item->
iflag & ITEM_WITH_CAS) {
 
  704         return *(uint64_t*)(item + 1);
 
  710                   item* item, uint64_t val)
 
  713     if (it->
iflag & ITEM_WITH_CAS) {
 
  714         *(uint64_t*)(it + 1) = val;
 
  718 const void* item_get_key(
const hash_item* item)
 
  720     char *ret = (
void*)(item + 1);
 
  721     if (item->
iflag & ITEM_WITH_CAS) {
 
  722         ret += 
sizeof(uint64_t);
 
  728 char* item_get_data(
const hash_item* item)
 
  730     return ((
char*)item_get_key(item)) + item->
nkey;
 
  733 uint8_t item_get_clsid(
const hash_item* item)
 
  738 static bool get_item_info(
ENGINE_HANDLE *handle, 
const void *cookie,
 
  742     if (item_info->nvalue < 1) {
 
  745     item_info->cas = item_get_cas(it);
 
  749     item_info->clsid = it->slabs_clsid;
 
  751     item_info->nvalue = 1;
 
  752     item_info->
key = item_get_key(it);
 
  753     item_info->value[0].iov_base = item_get_data(it);
 
  754     item_info->value[0].iov_len = it->
nbytes;
 
  758 static ENGINE_ERROR_CODE default_tap_notify(
ENGINE_HANDLE* handle,
 
  760                                             void *engine_specific,
 
  764                                             tap_event_t tap_event,
 
  774     struct default_engine* engine = get_handle(handle);
 
  775     vbucket_state_t state;
 
  777     ENGINE_ERROR_CODE ret = ENGINE_SUCCESS;
 
  785         return default_flush(handle, cookie, 0);
 
  788         return default_item_delete(handle, cookie, key, nkey, cas, vbucket);
 
  793             ret = default_item_allocate(handle, cookie, &it, key, nkey, ndata, flags, exptime);
 
  798                 return ENGINE_TMPFAIL;
 
  803         memcpy(item_get_data(it), data, ndata);
 
  805         item_set_cas(handle, cookie, it, cas);
 
  806         ret = default_store(handle, cookie, it, &cas, OPERATION_SET, vbucket);
 
  807         if (ret == ENGINE_EWOULDBLOCK) {
 
  810             item_release(engine, it);
 
  815     case TAP_VBUCKET_SET:
 
  816         if (nengine != 
sizeof(vbucket_state_t)) {
 
  818             return ENGINE_DISCONNECT;
 
  821         memcpy(&state, engine_specific, nengine);
 
  822         state = (vbucket_state_t)ntohl(state);
 
  824         if (!is_valid_vbucket_state_t(state)) {
 
  825             return ENGINE_DISCONNECT;
 
  828         set_vbucket_state(engine, vbucket, state);
 
  829         return ENGINE_SUCCESS;
 
  834         engine->server.log->get_logger()->
log(EXTENSION_LOG_DEBUG, cookie,
 
  835                     "Ignoring unknown tap event: %x", tap_event);
 
  846                                              const void* userdata,
 
  848     struct default_engine* engine = get_handle(handle);
 
  850     if ((flags & TAP_CONNECT_FLAG_TAKEOVER_VBUCKETS)) { 
 
  854     pthread_mutex_lock(&engine->tap_connections.lock);
 
  856     for (ii = 0; ii < engine->tap_connections.size; ++ii) {
 
  857         if (engine->tap_connections.clients[ii] == NULL) {
 
  858             engine->tap_connections.clients[ii] = cookie;
 
  862     pthread_mutex_unlock(&engine->tap_connections.lock);
 
  863     if (ii == engine->tap_connections.size) {
 
  868     if (!initialize_item_tap_walker(engine, cookie)) {
 
  870         pthread_mutex_lock(&engine->tap_connections.lock);
 
  871         engine->tap_connections.clients[ii] = NULL;
 
  872         pthread_mutex_unlock(&engine->tap_connections.lock);
 
  876     return item_tap_walker;
 
  879 static void default_handle_disconnect(
const void *cookie,
 
  880                                       ENGINE_EVENT_TYPE 
type,
 
  881                                       const void *event_data,
 
  882                                       const void *cb_data) {
 
  883     struct default_engine *engine = (
struct default_engine*)cb_data;
 
  884     pthread_mutex_lock(&engine->tap_connections.lock);
 
  886     for (ii = 0; ii < engine->tap_connections.size; ++ii) {
 
  887         if (engine->tap_connections.clients[ii] == cookie) {
 
  892     pthread_mutex_unlock(&engine->tap_connections.lock);