From 539aee3d07c9abe654108cd9ed960efc6182c856 Mon Sep 17 00:00:00 2001
From: Alexander Turenko <alexander.turenko@tarantool.org>
Date: Wed, 15 May 2019 23:07:35 +0300
Subject: [PATCH] iproto: init coio watcher before join/subscribe

box_process_join() and box_process_subscribe() use coio_write_xrow(),
which calls coio_writev_timeout() under hood. If a socket will block at
write() the function calls ev_io_start() to wake the fiber up when the
socket will be ready to write. This code assumes that the watcher
(struct ev_io) is initialized as coio watcher, i.e. coio_create() has
been called.

The reason why the code works before is that coio_write_xrow() in
box_process_{join,subscribe}() writes a small piece of data and so the
situation when a socket write buffer has less free space then needed is
rare.

Fixes #4110.
---
 src/box/box.h     | 16 ++++++++++++++++
 src/box/iproto.cc |  6 ++++--
 2 files changed, 20 insertions(+), 2 deletions(-)

diff --git a/src/box/box.h b/src/box/box.h
index 53d88ab717..ddcfbe2e59 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -172,9 +172,25 @@ box_reset_stat(void);
 void
 box_process_auth(struct auth_request *request, const char *salt);
 
+/**
+ * Join a replica.
+ *
+ * Register a replica and feed it with data.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param JOIN packet header
+ */
 void
 box_process_join(struct ev_io *io, struct xrow_header *header);
 
+/**
+ * Subscribe a replica.
+ *
+ * Perform necessary checks and start a relay thread.
+ *
+ * \param io coio watcher (initialized with coio_create())
+ * \param SUBSCRIBE packet header
+ */
 void
 box_process_subscribe(struct ev_io *io, struct xrow_header *header);
 
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 02b558ede5..8f899fed8c 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1693,6 +1693,8 @@ tx_process_join_subscribe(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
+	struct ev_io io;
+	coio_create(&io, con->input.fd);
 	try {
 		switch (msg->header.type) {
 		case IPROTO_JOIN:
@@ -1701,7 +1703,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the lambda in the beginning of the block
 			 * will re-activate the watchers for us.
 			 */
-			box_process_join(&con->input, &msg->header);
+			box_process_join(&io, &msg->header);
 			break;
 		case IPROTO_SUBSCRIBE:
 			/*
@@ -1710,7 +1712,7 @@ tx_process_join_subscribe(struct cmsg *m)
 			 * the write watcher will be re-activated
 			 * the same way as for JOIN.
 			 */
-			box_process_subscribe(&con->input, &msg->header);
+			box_process_subscribe(&io, &msg->header);
 			break;
 		default:
 			unreachable();
-- 
GitLab