From 9b4ab9fe0ef051630b658e002402874a8de7417b Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov@tarantool.org>
Date: Mon, 1 Nov 2021 19:28:30 +0300
Subject: [PATCH] iproto: use iostream abstraction

Instead of writing to the socket fd directly using sio, we wrap it in
iostream. This will allow us to use complex communication protocols in
iproto.

One thing that should be noted about this patch is how we handle
ev_io_start when we need to wait for the socket to become readable or
writable. Since iostream_write can block because it wants to read from
the socket and iostream_read can block because it wants to write to the
socket, we might need to update input/output events before ev_io_start.
Since ev_io events can't be updated while ev_io is active, we need to
stop ev_io for this.
---
 src/box/iproto.cc | 75 ++++++++++++++++++++++++++---------------------
 src/box/xrow.c    |  9 +++---
 src/box/xrow.h    |  5 ++--
 3 files changed, 49 insertions(+), 40 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index c6075ab308..fdf55e11e0 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -45,7 +45,7 @@
 #include "say.h"
 #include "sio.h"
 #include "evio.h"
-#include "coio.h"
+#include "iostream.h"
 #include "scoped_guard.h"
 #include "memory.h"
 #include "random.h"
@@ -558,6 +558,8 @@ struct iproto_connection
 	 * connections.
 	 */
 	int long_poll_count;
+	/** I/O stream used for communication with the client. */
+	struct iostream io;
 	struct ev_io input;
 	struct ev_io output;
 	/** Logical session. */
@@ -652,18 +654,19 @@ struct iproto_connection
 static inline const char *
 iproto_connection_name(const struct iproto_connection *con)
 {
-	return sio_socketname(con->input.fd);
+	return sio_socketname(con->io.fd);
 }
 
 #ifdef NDEBUG
-#define iproto_write_error(fd, e, schema_version, sync)                         \
-	iproto_do_write_error(fd, e, schema_version, sync);
+#define iproto_write_error(io, e, schema_version, sync)                         \
+	iproto_do_write_error(io, e, schema_version, sync);
 #else
-#define iproto_write_error(fd, e, schema_version, sync) do {                    \
+#define iproto_write_error(io, e, schema_version, sync) do {                    \
+	int fd = (io)->fd;                                                      \
 	int flags = fcntl(fd, F_GETFL, 0);                                      \
 	if (flags >= 0)                                                         \
 		fcntl(fd, F_SETFL, flags & (~O_NONBLOCK));                      \
-	iproto_do_write_error(fd, e, schema_version, sync);                     \
+	iproto_do_write_error(io, e, schema_version, sync);                     \
 	if (flags >= 0)                                                         \
 		fcntl(fd, F_SETFL, flags);                                      \
 } while (0);
@@ -877,13 +880,12 @@ iproto_connection_close(struct iproto_connection *con)
 		/* Clears all pending events. */
 		ev_io_stop(con->loop, &con->input);
 		ev_io_stop(con->loop, &con->output);
-		int fd = con->input.fd;
 		/*
 		 * Invalidate fd to prevent undefined behavior in case
 		 * we mistakenly try to use it after this point.
 		 */
 		con->input.fd = con->output.fd = -1;
-		close(fd);
+		iostream_close(&con->io);
 		/*
 		 * Discard unparsed data, to recycle the
 		 * connection in net_send_msg() as soon as all
@@ -1202,7 +1204,7 @@ iproto_connection_resume(struct iproto_connection *con)
 	 */
 	if (iproto_enqueue_batch(con, con->p_ibuf) != 0) {
 		struct error *e = box_error_last();
-		iproto_write_error(con->input.fd, e, ::schema_version, 0);
+		iproto_write_error(&con->io, e, ::schema_version, 0);
 		error_log(e);
 		iproto_connection_close(con);
 	}
@@ -1240,7 +1242,7 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 {
 	struct iproto_connection *con =
 		(struct iproto_connection *) watcher->data;
-	int fd = con->input.fd;
+	struct iostream *io = &con->io;
 	assert(con->state == IPROTO_CONNECTION_ALIVE);
 	assert(rlist_empty(&con->in_stop_list));
 	assert(loop == con->loop);
@@ -1262,10 +1264,15 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 			return;
 		}
 		/* Read input. */
-		int nrd = sio_read(fd, in->wpos, ibuf_unused(in));
+		ssize_t nrd = iostream_read(io, in->wpos, ibuf_unused(in));
 		if (nrd < 0) {                  /* Socket is not ready. */
-			if (! sio_wouldblock(errno))
+			if (nrd == IOSTREAM_ERROR)
 				diag_raise();
+			int events = iostream_status_to_events(nrd);
+			if (con->input.events != events) {
+				ev_io_stop(loop, &con->input);
+				ev_io_set(&con->input, con->input.fd, events);
+			}
 			ev_io_start(loop, &con->input);
 			return;
 		}
@@ -1285,18 +1292,16 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 			diag_raise();
 	} catch (Exception *e) {
 		/* Best effort at sending the error message to the client. */
-		iproto_write_error(fd, e, ::schema_version, 0);
+		iproto_write_error(io, e, ::schema_version, 0);
 		e->log();
 		iproto_connection_close(con);
 	}
 }
 
 /** writev() to the socket and handle the result. */
-
 static int
 iproto_flush(struct iproto_connection *con)
 {
-	int fd = con->output.fd;
 	struct obuf *obuf = con->wpos.obuf;
 	struct obuf_svp obuf_end = obuf_create_svp(obuf);
 	struct obuf_svp *begin = &con->wpos.svp;
@@ -1335,9 +1340,8 @@ iproto_flush(struct iproto_connection *con)
 	/* *Overwrite* iov_len of the last pos as it may be garbage. */
 	iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1);
 
-	ssize_t nwr = sio_writev(fd, iov, iovcnt);
-
-	if (nwr > 0) {
+	ssize_t nwr = iostream_writev(&con->io, iov, iovcnt);
+	if (nwr >= 0) {
 		/* Count statistics */
 		rmean_collect(con->iproto_thread->rmean, IPROTO_SENT, nwr);
 		if (begin->used + nwr == end->used) {
@@ -1351,8 +1355,8 @@ iproto_flush(struct iproto_connection *con)
 		begin->iov_len = advance == 0 ? begin->iov_len + offset: offset;
 		begin->pos += advance;
 		assert(begin->pos <= end->pos);
-		return -1;
-	} else if (nwr < 0 && ! sio_wouldblock(errno)) {
+		return IOSTREAM_WANT_WRITE;
+	} else if (nwr == IOSTREAM_ERROR) {
 		/*
 		 * Don't close the connection on write error. Log the error and
 		 * don't write to the socket anymore. Continue processing
@@ -1364,7 +1368,7 @@ iproto_flush(struct iproto_connection *con)
 		*begin = *end;
 		return 0;
 	}
-	return -1;
+	return nwr;
 }
 
 static void
@@ -1376,6 +1380,11 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher,
 	int rc;
 	while ((rc = iproto_flush(con)) <= 0) {
 		if (rc != 0) {
+			int events = iostream_status_to_events(rc);
+			if (con->output.events != events) {
+				ev_io_stop(loop, &con->output);
+				ev_io_set(&con->output, con->output.fd, events);
+			}
 			ev_io_start(loop, &con->output);
 			return;
 		}
@@ -1398,6 +1407,7 @@ iproto_connection_new(struct iproto_thread *iproto_thread, int fd)
 	con->iproto_thread = iproto_thread;
 	con->input.data = con->output.data = con;
 	con->loop = loop();
+	iostream_create(&con->io, fd);
 	ev_io_init(&con->input, iproto_connection_on_input, fd, EV_READ);
 	ev_io_init(&con->output, iproto_connection_on_output, fd, EV_WRITE);
 	ibuf_create(&con->ibuf[0], cord_slab_cache(), iproto_readahead);
@@ -1430,6 +1440,7 @@ static inline void
 iproto_connection_delete(struct iproto_connection *con)
 {
 	assert(iproto_connection_is_idle(con));
+	assert(!iostream_is_initialized(&con->io));
 	assert(con->session == NULL);
 	assert(con->state == IPROTO_CONNECTION_DESTROYED);
 	/*
@@ -2311,8 +2322,7 @@ tx_process_replication(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
 	struct iproto_connection *con = msg->connection;
-	struct iostream io;
-	iostream_create(&io, con->input.fd);
+	struct iostream *io = &con->io;
 	assert(!in_txn());
 	try {
 		switch (msg->header.type) {
@@ -2322,13 +2332,13 @@ tx_process_replication(struct cmsg *m)
 			 * the lambda in the beginning of the block
 			 * will re-activate the watchers for us.
 			 */
-			box_process_join(&io, &msg->header);
+			box_process_join(io, &msg->header);
 			break;
 		case IPROTO_FETCH_SNAPSHOT:
-			box_process_fetch_snapshot(&io, &msg->header);
+			box_process_fetch_snapshot(io, &msg->header);
 			break;
 		case IPROTO_REGISTER:
-			box_process_register(&io, &msg->header);
+			box_process_register(io, &msg->header);
 			break;
 		case IPROTO_SUBSCRIBE:
 			/*
@@ -2337,7 +2347,7 @@ tx_process_replication(struct cmsg *m)
 			 * the write watcher will be re-activated
 			 * the same way as for JOIN.
 			 */
-			box_process_subscribe(&io, &msg->header);
+			box_process_subscribe(io, &msg->header);
 			break;
 		default:
 			unreachable();
@@ -2350,10 +2360,8 @@ tx_process_replication(struct cmsg *m)
 		  * written row. Do not push it on top.
 		  */
 	} catch (Exception *e) {
-		iproto_write_error(con->input.fd, e, ::schema_version,
-				   msg->header.sync);
+		iproto_write_error(io, e, ::schema_version, msg->header.sync);
 	}
-	iostream_destroy(&io);
 	tx_end_msg(msg);
 }
 
@@ -2523,14 +2531,13 @@ net_send_greeting(struct cmsg *m)
 	struct iproto_connection *con = msg->connection;
 	if (msg->close_connection) {
 		struct obuf *out = msg->wpos.obuf;
-		int64_t nwr = sio_writev(con->output.fd, out->iov,
-					 obuf_iovcnt(out));
-
+		int64_t nwr = iostream_writev(&con->io, out->iov,
+					      obuf_iovcnt(out));
 		if (nwr > 0) {
 			/* Count statistics. */
 			rmean_collect(con->iproto_thread->rmean,
 				      IPROTO_SENT, nwr);
-		} else if (nwr < 0 && ! sio_wouldblock(errno)) {
+		} else if (nwr == IOSTREAM_ERROR) {
 			diag_log();
 		}
 		assert(iproto_connection_is_idle(con));
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 632f449a65..eb52d2f3b6 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -36,6 +36,7 @@
 #include <base64.h>
 
 #include "fiber.h"
+#include "iostream.h"
 #include "version.h"
 #include "tt_static.h"
 #include "error.h"
@@ -609,8 +610,8 @@ iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync,
 }
 
 void
-iproto_do_write_error(int fd, const struct error *e, uint32_t schema_version,
-		      uint64_t sync)
+iproto_do_write_error(struct iostream *io, const struct error *e,
+		      uint32_t schema_version, uint64_t sync)
 {
 	bool is_error = false;
 	struct mpstream stream;
@@ -637,8 +638,8 @@ iproto_do_write_error(int fd, const struct error *e, uint32_t schema_version,
 	ssize_t unused;
 
 	ERROR_INJECT_YIELD(ERRINJ_IPROTO_WRITE_ERROR_DELAY);
-	unused = write(fd, header, sizeof(header));
-	unused = write(fd, payload, payload_size);
+	unused = iostream_write(io, header, sizeof(header));
+	unused = iostream_write(io, payload, payload_size);
 	(void) unused;
 cleanup:
 	region_truncate(region, region_svp);
diff --git a/src/box/xrow.h b/src/box/xrow.h
index 03bf4f27d2..e5907a49cd 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -56,6 +56,7 @@ enum {
 	IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7,
 };
 
+struct iostream;
 struct region;
 
 struct xrow_header {
@@ -803,8 +804,8 @@ iproto_send_event(struct obuf *out, const char *key, size_t key_len,
 
 /** Write error directly to a socket. */
 void
-iproto_do_write_error(int fd, const struct error *e, uint32_t schema_version,
-		      uint64_t sync);
+iproto_do_write_error(struct iostream *io, const struct error *e,
+		      uint32_t schema_version, uint64_t sync);
 
 enum {
 	/* Maximal length of protocol name in handshake */
-- 
GitLab