From 91166ca82db0f9a12b05700e960637ee7b809067 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov@tarantool.org> Date: Mon, 1 Nov 2021 15:31:04 +0300 Subject: [PATCH] iproto: add helpers to signal input and output It's better than using ev_feed_event and ev_is_active directly. Also, let's use EV_CUSTOM instead EV_READ/EV_WRITE for signaling, to emphasize that this is an artificial event, which has nothing to do with fd read/write readiness. It's okay, because input/output callbacks don't use events at all. --- src/box/iproto.cc | 44 ++++++++++++++++++++++++++++++-------------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 89a9d76ae6..c6075ab308 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -751,6 +751,28 @@ iproto_msg_new(struct iproto_connection *con) return msg; } +/** + * Signal input unless it's blocked on I/O or stopped. + */ +static inline void +iproto_connection_feed_input(struct iproto_connection *con) +{ + assert(con->state == IPROTO_CONNECTION_ALIVE); + if (!ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) + ev_feed_event(con->loop, &con->input, EV_CUSTOM); +} + +/** + * Signal output unless it's blocked on I/O. + */ +static inline void +iproto_connection_feed_output(struct iproto_connection *con) +{ + assert(con->state == IPROTO_CONNECTION_ALIVE); + if (!ev_is_active(&con->output)) + ev_feed_event(con->loop, &con->output, EV_CUSTOM); +} + /** * A connection is idle when the client is gone * and there are no outstanding msgs in the msg queue. @@ -1158,7 +1180,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * requests, keep reading input, if only to avoid * a deadlock on this connection. */ - ev_feed_event(con->loop, &con->input, EV_READ); + iproto_connection_feed_input(con); } cpipe_flush_input(&con->iproto_thread->tx_pipe); return 0; @@ -1357,10 +1379,7 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, ev_io_start(loop, &con->output); return; } - if (!ev_is_active(&con->input) && - rlist_empty(&con->in_stop_list)) { - ev_feed_event(loop, &con->input, EV_READ); - } + iproto_connection_feed_input(con); } if (ev_is_active(&con->output)) ev_io_stop(con->loop, &con->output); @@ -1745,9 +1764,8 @@ net_discard_input(struct cmsg *m) msg->p_ibuf->rpos += msg->len; msg->len = 0; con->long_poll_count++; - if (con->state == IPROTO_CONNECTION_ALIVE && - !ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) - ev_feed_event(con->loop, &con->input, EV_READ); + if (con->state == IPROTO_CONNECTION_ALIVE) + iproto_connection_feed_input(con); } static void @@ -2406,8 +2424,7 @@ net_send_msg(struct cmsg *m) con->wend = msg->wpos; if (con->state == IPROTO_CONNECTION_ALIVE) { - if (! ev_is_active(&con->output)) - ev_feed_event(con->loop, &con->output, EV_WRITE); + iproto_connection_feed_output(con); } else if (iproto_connection_is_idle(con)) { iproto_connection_close(con); } @@ -2529,7 +2546,7 @@ net_send_greeting(struct cmsg *m) */ assert(con->state == IPROTO_CONNECTION_ALIVE); /* Handshake OK, start reading input. */ - ev_feed_event(con->loop, &con->output, EV_WRITE); + iproto_connection_feed_output(con); iproto_msg_delete(msg); } @@ -2640,9 +2657,8 @@ iproto_process_push(struct cmsg *m) container_of(kharon, struct iproto_connection, kharon); con->wend = kharon->wpos; kharon->wpos = con->wpos; - if (con->state == IPROTO_CONNECTION_ALIVE && - !ev_is_active(&con->output)) - ev_feed_event(con->loop, &con->output, EV_WRITE); + if (con->state == IPROTO_CONNECTION_ALIVE) + iproto_connection_feed_output(con); } /** -- GitLab