diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 89a9d76ae68ebde64227966ed59a0b1ce0ce6237..c6075ab30800064bc8ee733d016710a22d57b5da 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -751,6 +751,28 @@ iproto_msg_new(struct iproto_connection *con) return msg; } +/** + * Signal input unless it's blocked on I/O or stopped. + */ +static inline void +iproto_connection_feed_input(struct iproto_connection *con) +{ + assert(con->state == IPROTO_CONNECTION_ALIVE); + if (!ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) + ev_feed_event(con->loop, &con->input, EV_CUSTOM); +} + +/** + * Signal output unless it's blocked on I/O. + */ +static inline void +iproto_connection_feed_output(struct iproto_connection *con) +{ + assert(con->state == IPROTO_CONNECTION_ALIVE); + if (!ev_is_active(&con->output)) + ev_feed_event(con->loop, &con->output, EV_CUSTOM); +} + /** * A connection is idle when the client is gone * and there are no outstanding msgs in the msg queue. @@ -1158,7 +1180,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * requests, keep reading input, if only to avoid * a deadlock on this connection. */ - ev_feed_event(con->loop, &con->input, EV_READ); + iproto_connection_feed_input(con); } cpipe_flush_input(&con->iproto_thread->tx_pipe); return 0; @@ -1357,10 +1379,7 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, ev_io_start(loop, &con->output); return; } - if (!ev_is_active(&con->input) && - rlist_empty(&con->in_stop_list)) { - ev_feed_event(loop, &con->input, EV_READ); - } + iproto_connection_feed_input(con); } if (ev_is_active(&con->output)) ev_io_stop(con->loop, &con->output); @@ -1745,9 +1764,8 @@ net_discard_input(struct cmsg *m) msg->p_ibuf->rpos += msg->len; msg->len = 0; con->long_poll_count++; - if (con->state == IPROTO_CONNECTION_ALIVE && - !ev_is_active(&con->input) && rlist_empty(&con->in_stop_list)) - ev_feed_event(con->loop, &con->input, EV_READ); + if (con->state == IPROTO_CONNECTION_ALIVE) + iproto_connection_feed_input(con); } static void @@ -2406,8 +2424,7 @@ net_send_msg(struct cmsg *m) con->wend = msg->wpos; if (con->state == IPROTO_CONNECTION_ALIVE) { - if (! ev_is_active(&con->output)) - ev_feed_event(con->loop, &con->output, EV_WRITE); + iproto_connection_feed_output(con); } else if (iproto_connection_is_idle(con)) { iproto_connection_close(con); } @@ -2529,7 +2546,7 @@ net_send_greeting(struct cmsg *m) */ assert(con->state == IPROTO_CONNECTION_ALIVE); /* Handshake OK, start reading input. */ - ev_feed_event(con->loop, &con->output, EV_WRITE); + iproto_connection_feed_output(con); iproto_msg_delete(msg); } @@ -2640,9 +2657,8 @@ iproto_process_push(struct cmsg *m) container_of(kharon, struct iproto_connection, kharon); con->wend = kharon->wpos; kharon->wpos = con->wpos; - if (con->state == IPROTO_CONNECTION_ALIVE && - !ev_is_active(&con->output)) - ev_feed_event(con->loop, &con->output, EV_WRITE); + if (con->state == IPROTO_CONNECTION_ALIVE) + iproto_connection_feed_output(con); } /**