32 #define WIN32_LEAN_AND_MEAN
35 #undef WIN32_LEAN_AND_MEAN
38 #include <sys/types.h>
40 #include <sys/socket.h>
42 #ifdef HAVE_SYS_TIME_H
45 #include <sys/queue.h>
51 #ifndef HAVE_TAILQFOREACH
52 #include <event-internal.h>
62 #include "evrpc-internal.h"
68 evrpc_init(
struct evhttp *http_server)
77 TAILQ_INIT(&base->registered_rpcs);
78 TAILQ_INIT(&base->input_hooks);
79 TAILQ_INIT(&base->output_hooks);
80 base->http_server = http_server;
91 while ((rpc = TAILQ_FIRST(&base->registered_rpcs)) != NULL) {
92 assert(evrpc_unregister_rpc(base, rpc->uri));
94 while ((hook = TAILQ_FIRST(&base->input_hooks)) != NULL) {
97 while ((hook = TAILQ_FIRST(&base->output_hooks)) != NULL) {
104 evrpc_add_hook(
void *vbase,
110 struct evrpc_hook_list *head = NULL;
114 head = &base->in_hooks;
117 head = &base->out_hooks;
124 assert(hook != NULL);
127 hook->process_arg = cb_arg;
128 TAILQ_INSERT_TAIL(head, hook, next);
134 evrpc_remove_hook_internal(
struct evrpc_hook_list *head,
void *handle)
137 TAILQ_FOREACH(hook, head, next) {
138 if (hook == handle) {
139 TAILQ_REMOVE(head, hook, next);
156 struct evrpc_hook_list *head = NULL;
159 head = &base->in_hooks;
162 head = &base->out_hooks;
168 return (evrpc_remove_hook_internal(head, handle));
172 evrpc_process_hooks(
struct evrpc_hook_list *head,
176 TAILQ_FOREACH(hook, head, next) {
177 if (hook->process(req, evbuf, hook->process_arg) == -1)
184 static void evrpc_pool_schedule(
struct evrpc_pool *pool);
195 evrpc_construct_uri(
const char *uri)
197 char *constructed_uri;
198 int constructed_uri_len;
200 constructed_uri_len = strlen(EVRPC_URI_PREFIX) + strlen(uri) + 1;
201 if ((constructed_uri = malloc(constructed_uri_len)) == NULL)
202 event_err(1,
"%s: failed to register rpc at %s",
204 memcpy(constructed_uri, EVRPC_URI_PREFIX, strlen(EVRPC_URI_PREFIX));
205 memcpy(constructed_uri + strlen(EVRPC_URI_PREFIX), uri, strlen(uri));
206 constructed_uri[constructed_uri_len - 1] =
'\0';
208 return (constructed_uri);
215 char *constructed_uri = evrpc_construct_uri(rpc->uri);
219 rpc->cb_arg = cb_arg;
221 TAILQ_INSERT_TAIL(&base->registered_rpcs, rpc, next);
228 free(constructed_uri);
236 char *registered_uri = NULL;
240 TAILQ_FOREACH(rpc, &base->registered_rpcs, next) {
241 if (strcmp(rpc->uri, name) == 0)
248 TAILQ_REMOVE(&base->registered_rpcs, rpc, next);
250 free((
char *)rpc->uri);
253 registered_uri = evrpc_construct_uri(name);
256 assert(
evhttp_del_cb(base->http_server, registered_uri) == 0);
258 free(registered_uri);
265 struct evrpc *rpc = arg;
269 if (req->type != EVHTTP_REQ_POST ||
270 EVBUFFER_LENGTH(req->input_buffer) <= 0)
278 if (evrpc_process_hooks(&rpc->base->input_hooks,
279 req, req->input_buffer) == -1)
283 if (rpc_state == NULL)
287 rpc_state->request = rpc->request_new();
288 if (rpc_state->request == NULL)
291 rpc_state->rpc = rpc;
293 if (rpc->request_unmarshal(
294 rpc_state->request, req->input_buffer) == -1) {
301 rpc_state->reply = rpc->reply_new();
302 if (rpc_state->reply == NULL)
305 rpc_state->http_req = req;
306 rpc_state->done = evrpc_request_done;
309 rpc->cb(rpc_state, rpc->cb_arg);
314 evrpc_reqstate_free(rpc_state);
323 if (rpc_state != NULL) {
324 struct evrpc *rpc = rpc_state->rpc;
326 if (rpc_state->request != NULL)
327 rpc->request_free(rpc_state->request);
328 if (rpc_state->reply != NULL)
329 rpc->reply_free(rpc_state->reply);
338 struct evrpc *rpc = rpc_state->rpc;
341 if (rpc->reply_complete(rpc_state->reply) == -1) {
346 if ((data = evbuffer_new()) == NULL) {
352 rpc->reply_marshal(data, rpc_state->reply);
355 if (evrpc_process_hooks(&rpc->base->output_hooks,
360 if (evhttp_find_header(req->output_headers,
"Content-Type") == NULL) {
361 evhttp_add_header(req->output_headers,
362 "Content-Type",
"application/octet-stream");
369 evrpc_reqstate_free(rpc_state);
376 evrpc_reqstate_free(rpc_state);
393 TAILQ_INIT(&pool->connections);
394 TAILQ_INIT(&pool->requests);
396 TAILQ_INIT(&pool->input_hooks);
397 TAILQ_INIT(&pool->output_hooks);
419 while ((request = TAILQ_FIRST(&pool->requests)) != NULL) {
420 TAILQ_REMOVE(&pool->requests, request, next);
422 evrpc_request_wrapper_free(request);
425 while ((connection = TAILQ_FIRST(&pool->connections)) != NULL) {
426 TAILQ_REMOVE(&pool->connections, connection, next);
430 while ((hook = TAILQ_FIRST(&pool->input_hooks)) != NULL) {
431 assert(evrpc_remove_hook(pool,
EVRPC_INPUT, hook));
434 while ((hook = TAILQ_FIRST(&pool->output_hooks)) != NULL) {
447 evrpc_pool_add_connection(
struct evrpc_pool *pool,
449 assert(connection->http_server == NULL);
450 TAILQ_INSERT_TAIL(&pool->connections, connection, next);
455 if (pool->base != NULL)
462 if (connection->timeout == -1)
463 connection->timeout = pool->timeout;
470 if (TAILQ_FIRST(&pool->requests) != NULL) {
472 TAILQ_FIRST(&pool->requests);
473 TAILQ_REMOVE(&pool->requests, request, next);
474 evrpc_schedule_request(connection, request);
479 evrpc_pool_set_timeout(
struct evrpc_pool *pool,
int timeout_in_secs)
482 TAILQ_FOREACH(evcon, &pool->connections, next) {
483 evcon->timeout = timeout_in_secs;
485 pool->timeout = timeout_in_secs;
490 static void evrpc_request_timeout(
int,
short,
void *);
497 evrpc_pool_find_connection(
struct evrpc_pool *pool)
500 TAILQ_FOREACH(connection, &pool->connections, next) {
501 if (TAILQ_FIRST(&connection->requests) == NULL)
525 ctx->request_marshal(req->output_buffer, ctx->request);
527 uri = evrpc_construct_uri(ctx->name);
532 ctx->evcon = connection;
535 if (evrpc_process_hooks(&pool->output_hooks,
536 req, req->output_buffer) == -1)
539 if (pool->timeout > 0) {
544 evutil_timerclear(&tv);
545 tv.tv_sec = pool->timeout;
559 memset(&status, 0,
sizeof(status));
560 status.error = EVRPC_STATUS_ERR_UNSTARTED;
561 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
562 evrpc_request_wrapper_free(ctx);
572 evtimer_set(&ctx->ev_timeout, evrpc_request_timeout, ctx);
573 if (pool->base != NULL)
574 event_base_set(pool->base, &ctx->ev_timeout);
577 assert(TAILQ_FIRST(&pool->connections) != NULL);
583 TAILQ_INSERT_TAIL(&pool->requests, ctx, next);
585 evrpc_pool_schedule(pool);
599 event_del(&ctx->ev_timeout);
601 memset(&status, 0,
sizeof(status));
602 status.http_req = req;
607 if (evrpc_process_hooks(&pool->input_hooks,
608 req, req->input_buffer) == -1) {
609 status.error = EVRPC_STATUS_ERR_HOOKABORTED;
612 res = ctx->reply_unmarshal(ctx->reply,
615 status.error = EVRPC_STATUS_ERR_BADPAYLOAD;
619 status.error = EVRPC_STATUS_ERR_TIMEOUT;
624 ctx->reply_clear(ctx->reply);
627 (*ctx->cb)(&status, ctx->request, ctx->reply, ctx->cb_arg);
629 evrpc_request_wrapper_free(ctx);
634 evrpc_pool_schedule(pool);
647 if ((evcon = evrpc_pool_find_connection(pool)) != NULL) {
648 TAILQ_REMOVE(&pool->requests, ctx, next);
649 evrpc_schedule_request(evcon, ctx);
654 evrpc_request_timeout(
int fd,
short what,
void *arg)
658 assert(evcon != NULL);
660 evhttp_connection_fail(evcon, EVCON_HTTP_TIMEOUT);