diff --git a/core/fiber.m b/core/fiber.m index 682f68079cffe8aabe8576a7dc785ec314d1a633..0d1290e10e38650ba20b35e395e0bdeb75243e4f 100644 --- a/core/fiber.m +++ b/core/fiber.m @@ -258,45 +258,42 @@ wait_for_child(pid_t pid) } } -/** - * @note: this is a cancellation point. - */ static void -wait_for(int events) +fiber_io_start(int events) { ev_io *io = &fiber->io; - if (io->fd != fiber->fd || io->events != events) { /* events are not monitored */ - if (ev_is_active(io)) - ev_io_stop(io); - ev_io_set(io, fiber->fd, events); - } + assert (!ev_is_active(io)); - if (!ev_is_active(io)) - ev_io_start(io); + ev_io_set(io, fiber->fd, events); + ev_io_start(io); +} + +/** @note: this is a cancellation point. + */ + +static void +fiber_io_yield() +{ + assert(ev_is_active(&fiber->io)); @try { yield(); } @catch (id o) { - ev_io_stop(io); + ev_io_stop(&fiber->io); @throw; } } static void -unwait(int events) +fiber_io_stop(int events) { ev_io *io = &fiber->io; - assert(io->fd == fiber->fd); - - if (!ev_is_active(io)) - return; - if ((io->events & events) == 0) - return; + assert(ev_is_active(io) && io->fd == fiber->fd && (io->events & events)); ev_io_stop(io); } @@ -439,7 +436,7 @@ fiber_zombificate() static void fiber_loop(void *data __attribute__((unused))) { - while (true) { + for (;;) { assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0); @try { fiber->f(fiber->f_data); @@ -593,13 +590,16 @@ fiber_close(void) if (fiber->fd < 0) return 0; - unwait(-1); + /* We don't know if IO is active if there was an error. */ + if (ev_is_active(&fiber->io)) + fiber_io_stop(-1); + int r = close(fiber->fd); - if (r != -1) { - fiber->io.fd = fiber->fd = -1; - fiber->has_peer = false; - fiber->peer_name[0] = 0; - } + + fiber->fd = -1; + fiber->has_peer = false; + fiber->peer_name[0] = 0; + return r; } @@ -686,8 +686,9 @@ fiber_bread(struct tbuf *buf, size_t at_least) ssize_t r; tbuf_ensure(buf, MAX(cfg.readahead, at_least)); + fiber_io_start(EV_READ); for (;;) { - wait_for(EV_READ); + fiber_io_yield(); r = read(fiber->fd, buf->data + buf->len, buf->size - buf->len); if (r > 0) { buf->len += r; @@ -699,7 +700,7 @@ fiber_bread(struct tbuf *buf, size_t at_least) break; } } - unwait(EV_READ); + fiber_io_stop(EV_READ); return r; } @@ -723,8 +724,9 @@ fiber_flush_output(void) struct iovec *iov = iovec(fiber->iov); size_t iov_cnt = fiber->iov_cnt; + fiber_io_start(EV_WRITE); while (iov_cnt > 0) { - wait_for(EV_WRITE); + fiber_io_yield(); bytes += r = writev(fiber->fd, iov, MIN(iov_cnt, IOV_MAX)); if (r <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) @@ -745,7 +747,7 @@ fiber_flush_output(void) } } } - unwait(EV_WRITE); + fiber_io_stop(EV_WRITE); if (r < 0) { size_t rem = 0; @@ -771,11 +773,10 @@ fiber_read(void *buf, size_t count) { ssize_t r, done = 0; - if (count == 0) - return 0; - + fiber_io_start(EV_READ); while (count != done) { - wait_for(EV_READ); + + fiber_io_yield(); if ((r = read(fiber->fd, buf + done, count - done)) <= 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) @@ -785,7 +786,7 @@ fiber_read(void *buf, size_t count) } done += r; } - unwait(EV_READ); + fiber_io_stop(EV_READ); return done; } @@ -800,11 +801,10 @@ fiber_write(const void *buf, size_t count) int r; unsigned int done = 0; - if (count == 0) - return 0; + fiber_io_start(EV_WRITE); while (count != done) { - wait_for(EV_WRITE); + fiber_io_yield(); if ((r = write(fiber->fd, buf + done, count - done)) == -1) { if (errno == EAGAIN || errno == EWOULDBLOCK) continue; @@ -813,7 +813,7 @@ fiber_write(const void *buf, size_t count) } done += r; } - unwait(EV_WRITE); + fiber_io_stop(EV_WRITE); return done; } @@ -825,9 +825,6 @@ fiber_write(const void *buf, size_t count) int fiber_connect(struct sockaddr_in *addr) { - int error; - socklen_t error_size = sizeof(error); - fiber->fd = socket(AF_INET, SOCK_STREAM, 0); if (fiber->fd < 0) goto error; @@ -836,21 +833,28 @@ fiber_connect(struct sockaddr_in *addr) goto error; if (connect(fiber->fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) { + if (errno != EINPROGRESS) goto error; - } - wait_for(EV_WRITE); - if (getsockopt(fiber->fd, SOL_SOCKET, SO_ERROR, &error, &error_size) < 0) - goto error; + fiber_io_start(EV_WRITE); + fiber_io_yield(); + fiber_io_stop(EV_WRITE); - assert(error_size == sizeof(error)); + int error; + socklen_t error_size = sizeof(error); - if (error != 0) { - errno = error; - goto error; + if (getsockopt(fiber->fd, SOL_SOCKET, SO_ERROR, + &error, &error_size) < 0) + goto error; + + assert(error_size == sizeof(error)); + + if (error != 0) { + errno = error; + goto error; + } } - unwait(EV_WRITE); return fiber->fd; @@ -994,7 +998,6 @@ inbox2sock(void *_data __attribute__((unused))) if (fiber_write(out->data, out->len) != out->len) panic("child is dead"); fiber_gc(); - unwait(-1); } } @@ -1143,8 +1146,9 @@ tcp_server_handler(void *data) if (server->on_bind != NULL) server->on_bind(server->data); - while (1) { - wait_for(EV_READ); + fiber_io_start(EV_READ); + for (;;) { + fiber_io_yield(EV_READ); while ((fd = accept(fiber->fd, NULL, NULL)) > 0) { if (set_nonblock(fd) == -1) { @@ -1174,7 +1178,7 @@ tcp_server_handler(void *data) continue; } } - unwait(EV_READ); + fiber_io_stop(EV_READ); } struct fiber *