28 #include <sys/types.h>
34 #ifdef HAVE_SYS_TIME_H
55 void bufferevent_read_pressure_cb(
struct evbuffer *,
size_t,
size_t,
void *);
58 bufferevent_add(
struct event *ev,
int timeout)
63 evutil_timerclear(&tv);
68 return (event_add(ev, ptv));
77 bufferevent_read_pressure_cb(
struct evbuffer *
buf,
size_t old,
size_t now,
84 if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) {
85 evbuffer_setcb(buf, NULL, NULL);
87 if (bufev->enabled & EV_READ)
88 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
93 bufferevent_readcb(
int fd,
short event,
void *arg)
97 short what = EVBUFFER_READ;
101 if (event == EV_TIMEOUT) {
102 what |= EVBUFFER_TIMEOUT;
110 if (bufev->wm_read.high != 0) {
111 howmuch = bufev->wm_read.high - EVBUFFER_LENGTH(bufev->input);
114 struct evbuffer *buf = bufev->input;
115 event_del(&bufev->ev_read);
117 bufferevent_read_pressure_cb, bufev);
122 res = evbuffer_read(bufev->input, fd, howmuch);
124 if (errno == EAGAIN || errno == EINTR)
127 what |= EVBUFFER_ERROR;
128 }
else if (res == 0) {
130 what |= EVBUFFER_EOF;
136 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
139 len = EVBUFFER_LENGTH(bufev->input);
140 if (bufev->wm_read.low != 0 && len < bufev->wm_read.low)
142 if (bufev->wm_read.high != 0 && len >= bufev->wm_read.high) {
143 struct evbuffer *buf = bufev->input;
144 event_del(&bufev->ev_read);
147 evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev);
151 if (bufev->readcb != NULL)
152 (*bufev->readcb)(bufev, bufev->cbarg);
156 bufferevent_add(&bufev->ev_read, bufev->timeout_read);
160 (*bufev->errorcb)(bufev, what, bufev->cbarg);
164 bufferevent_writecb(
int fd,
short event,
void *arg)
168 short what = EVBUFFER_WRITE;
170 if (event == EV_TIMEOUT) {
171 what |= EVBUFFER_TIMEOUT;
175 if (EVBUFFER_LENGTH(bufev->output)) {
176 res = evbuffer_write(bufev->output, fd);
181 if (errno == EAGAIN ||
183 errno == EINPROGRESS)
186 what |= EVBUFFER_ERROR;
192 }
else if (res == 0) {
194 what |= EVBUFFER_EOF;
200 if (EVBUFFER_LENGTH(bufev->output) != 0)
201 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
207 if (bufev->writecb != NULL &&
208 EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low)
209 (*bufev->writecb)(bufev, bufev->cbarg);
214 if (EVBUFFER_LENGTH(bufev->output) != 0)
215 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
219 (*bufev->errorcb)(bufev, what, bufev->cbarg);
234 bufferevent_new(
int fd, evbuffercb readcb, evbuffercb writecb,
235 everrorcb errorcb,
void *cbarg)
239 if ((bufev = calloc(1,
sizeof(
struct bufferevent))) == NULL)
242 if ((bufev->input = evbuffer_new()) == NULL) {
247 if ((bufev->output = evbuffer_new()) == NULL) {
248 evbuffer_free(bufev->input);
253 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
254 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
256 bufferevent_setcb(bufev, readcb, writecb, errorcb, cbarg);
263 bufev->enabled = EV_WRITE;
270 evbuffercb readcb, evbuffercb writecb, everrorcb errorcb,
void *cbarg)
272 bufev->readcb = readcb;
273 bufev->writecb = writecb;
274 bufev->errorcb = errorcb;
276 bufev->cbarg = cbarg;
282 event_del(&bufev->ev_read);
283 event_del(&bufev->ev_write);
285 event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev);
286 event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev);
287 if (bufev->ev_base != NULL) {
288 event_base_set(bufev->ev_base, &bufev->ev_read);
289 event_base_set(bufev->ev_base, &bufev->ev_write);
298 if (event_priority_set(&bufev->ev_read, priority) == -1)
300 if (event_priority_set(&bufev->ev_write, priority) == -1)
311 event_del(&bufev->ev_read);
312 event_del(&bufev->ev_write);
314 evbuffer_free(bufev->input);
315 evbuffer_free(bufev->output);
330 res = evbuffer_add(bufev->output, data, size);
336 if (size > 0 && (bufev->enabled & EV_WRITE))
337 bufferevent_add(&bufev->ev_write, bufev->timeout_write);
347 res = bufferevent_write(bufev, buf->buffer, buf->off);
349 evbuffer_drain(buf, buf->off);
357 struct evbuffer *buf = bufev->input;
363 memcpy(data, buf->buffer, size);
366 evbuffer_drain(buf, size);
374 if (event & EV_READ) {
375 if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1)
378 if (event & EV_WRITE) {
379 if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1)
383 bufev->enabled |= event;
390 if (event & EV_READ) {
391 if (event_del(&bufev->ev_read) == -1)
394 if (event & EV_WRITE) {
395 if (event_del(&bufev->ev_write) == -1)
399 bufev->enabled &= ~event;
409 int timeout_read,
int timeout_write) {
410 bufev->timeout_read = timeout_read;
411 bufev->timeout_write = timeout_write;
413 if (event_pending(&bufev->ev_read, EV_READ, NULL))
414 bufferevent_add(&bufev->ev_read, timeout_read);
415 if (event_pending(&bufev->ev_write, EV_WRITE, NULL))
416 bufferevent_add(&bufev->ev_write, timeout_write);
425 size_t lowmark,
size_t highmark)
427 if (events & EV_READ) {
428 bufev->wm_read.low = lowmark;
429 bufev->wm_read.high = highmark;
432 if (events & EV_WRITE) {
433 bufev->wm_write.low = lowmark;
434 bufev->wm_write.high = highmark;
438 bufferevent_read_pressure_cb(bufev->input,
439 0, EVBUFFER_LENGTH(bufev->input), bufev);
447 bufev->ev_base = base;
449 res = event_base_set(base, &bufev->ev_read);
453 res = event_base_set(base, &bufev->ev_write);