From 9b8fe42b41d25b01c7aa4df3afc3fb2e59dc59a3 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Mon, 7 Sep 2015 00:31:19 +0300
Subject: [PATCH] replication: refactor replica.cc

* log messages correctly in case of SocketError
* a step towards making replica_connect() reusable
  from the outside.
---
 src/box/box.cc     |  4 +--
 src/box/replica.cc | 87 ++++++++++++++++++++++------------------------
 src/box/replica.h  |  8 +++--
 src/coio.h         |  6 ++++
 4 files changed, 54 insertions(+), 51 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index f2678bb7de..ed2e3a85d8 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -708,10 +708,10 @@ box_init(void)
 		/* Add a surrogate server id for snapshot rows */
 		vclock_add_server(&recovery->vclock, 0);
 
-		/* Bootstrap from replica */
+		/* Bootstrap from a remote master */
 		replica = replica_new(source);
 		replica_start(replica, recovery);
-		replica_join(replica); /* throws on failure */
+		replica_wait(replica); /* throws on failure */
 
 		int64_t checkpoint_id = vclock_sum(&recovery->vclock);
 		engine_checkpoint(checkpoint_id);
diff --git a/src/box/replica.cc b/src/box/replica.cc
index 01206312b6..cabb5ee46e 100644
--- a/src/box/replica.cc
+++ b/src/box/replica.cc
@@ -92,11 +92,13 @@ replica_write_row(struct ev_io *coio, const struct xrow_header *row)
 /**
  * Connect to a remote host and authenticate the client.
  */
-static void
-replica_connect(struct replica *replica, struct ev_io *coio,
-		struct iobuf *iobuf)
+void
+replica_connect(struct replica *replica)
 {
-	assert(replica->io.fd < 0);
+	struct ev_io *coio = &replica->io;
+	struct iobuf *iobuf = replica->iobuf;
+	if (coio->fd >= 0)
+		return;
 	char greeting[IPROTO_GREETING_SIZE];
 
 	struct uri *uri = &replica->uri;
@@ -145,13 +147,14 @@ replica_connect(struct replica *replica, struct ev_io *coio,
  * Execute and process JOIN request (bootstrap the server).
  */
 static void
-replica_join(struct replica *replica, struct recovery_state *r,
-	     struct ev_io *coio, struct iobuf *iobuf)
+replica_join(struct replica *replica, struct recovery_state *r)
 {
 	say_info("downloading a snapshot from %s",
 		 sio_strfaddr(&replica->addr, replica->addr_len));
 
 	/* Send JOIN request */
+	struct ev_io *coio = &replica->io;
+	struct iobuf *iobuf = replica->iobuf;
 	struct xrow_header row;
 	xrow_encode_join(&row, &r->server_uuid);
 	replica_write_row(coio, &row);
@@ -191,10 +194,11 @@ replica_join(struct replica *replica, struct recovery_state *r,
  * Execute and process SUBSCRIBE request (follow updates from a master).
  */
 static void
-replica_subscribe(struct replica *replica, struct recovery_state *r,
-		  struct ev_io *coio, struct iobuf *iobuf)
+replica_subscribe(struct replica *replica, struct recovery_state *r)
 {
 	/* Send SUBSCRIBE request */
+	struct ev_io *coio = &replica->io;
+	struct iobuf *iobuf = replica->iobuf;
 	struct xrow_header row;
 	xrow_encode_subscribe(&row, &cluster_id, &r->server_uuid, &r->vclock);
 	replica_write_row(coio, &row);
@@ -230,6 +234,8 @@ replica_subscribe(struct replica *replica, struct recovery_state *r,
 static inline void
 replica_log_exception(struct replica *replica, Exception *e)
 {
+	if (type_cast(FiberCancelException, e))
+		return;
 	if (replica->warning_said)
 		return;
 	switch (replica->state) {
@@ -250,34 +256,41 @@ replica_log_exception(struct replica *replica, Exception *e)
 		break;
 	}
 	e->log();
+	if (type_cast(SocketError, e))
+		say_info("will retry every %i second", RECONNECT_DELAY);
 	replica->warning_said = true;
 }
 
+static inline void
+replica_disconnect(struct replica *replica, Exception *e,
+		   enum replica_state state)
+{
+	replica_log_exception(replica, e);
+	coio_close(loop(), &replica->io);
+	iobuf_reset(replica->iobuf);
+	replica_set_state(replica, state);
+	fiber_gc();
+}
+
 static void
 replica_f(va_list ap)
 {
 	struct replica *replica = va_arg(ap, struct replica *);
 	struct recovery_state *r = va_arg(ap, struct recovery_state *);
-	struct ev_io *coio = &replica->io;
-	struct iobuf *iobuf = iobuf_new();
-	ev_loop *loop = loop();
-
-	coio_init(coio, coio->fd); /* re-use connection if any */
 
 	/* Re-connect loop */
 	while (true) {
 		try {
-			if (coio->fd < 0)
-				replica_connect(replica, coio, iobuf);
+			replica_connect(replica);
 			/*
 			 * Execute JOIN if this is a bootstrap, and
 			 * there is no snapshot, and SUBSCRIBE
 			 * otherwise.
 			 */
 			if (r->writer == NULL) {
-				replica_join(replica, r, coio, iobuf);
+				replica_join(replica, r);
 			} else {
-				replica_subscribe(replica, r, coio, iobuf);
+				replica_subscribe(replica, r);
 				/*
 				 * subscribe() has an infinite
 				 * loop which is stoppable only
@@ -285,33 +298,20 @@ replica_f(va_list ap)
 				 */
 				assert(0);
 			}
-			ev_io_stop(loop(), coio);
+			ev_io_stop(loop(), &replica->io);
+			iobuf_reset(replica->iobuf);
 			/* Don't close the socket */
 			return;
 		} catch (ClientError *e) {
-			replica_log_exception(replica, e);
-			evio_close(loop, coio);
-			iobuf_delete(iobuf);
-			replica_set_state(replica, REPLICA_STOPPED);
+			replica_disconnect(replica, e, REPLICA_STOPPED);
 			throw;
 		} catch (FiberCancelException *e) {
-			evio_close(loop, coio);
-			iobuf_delete(iobuf);
-			replica_set_state(replica, REPLICA_OFF);
+			replica_disconnect(replica, e, REPLICA_OFF);
 			throw;
 		} catch (SocketError *e) {
-			replica_log_exception(replica, e);
-			evio_close(loop, coio);
-			replica_set_state(replica, REPLICA_DISCONNECTED);
+			replica_disconnect(replica, e, REPLICA_DISCONNECTED);
 			/* fall through */
 		}
-
-		if (!replica->warning_said)
-			say_info("will retry every %i second", RECONNECT_DELAY);
-		replica->warning_said = true;
-		iobuf_reset(iobuf);
-		fiber_gc();
-
 		/* Put fiber_sleep() out of catch block.
 		 *
 		 * This is done to avoid situation, when two or more
@@ -325,14 +325,7 @@ replica_f(va_list ap)
 		 *
 		 * See: https://github.com/tarantool/tarantool/issues/136
 		*/
-
-		try {
-			fiber_sleep(RECONNECT_DELAY);
-		} catch (FiberCancelException *e) {
-			/* Cleanup resources on fiber_cancel() */
-			iobuf_delete(iobuf);
-			throw;
-		}
+		fiber_sleep(RECONNECT_DELAY);
 	}
 }
 
@@ -377,7 +370,7 @@ replica_stop(struct replica *replica)
 }
 
 void
-replica_join(struct replica *replica)
+replica_wait(struct replica *replica)
 {
 	assert(replica->reader != NULL);
 	auto fiber_guard = make_scoped_guard([=] { replica->reader = NULL; });
@@ -393,7 +386,8 @@ replica_new(const char *uri)
 		tnt_raise(OutOfMemory, sizeof(*replica), "malloc",
 			  "struct replica");
 	}
-	replica->io.fd = -1;
+	coio_init(&replica->io, -1);
+	replica->iobuf = iobuf_new();
 
 	/* uri_parse() sets pointers to replica->source buffer */
 	snprintf(replica->source, sizeof(replica->source), "%s", uri);
@@ -408,6 +402,7 @@ void
 replica_delete(struct replica *replica)
 {
 	assert(replica->reader == NULL);
-	evio_close(loop(), &replica->io);
+	iobuf_delete(replica->iobuf);
+	coio_close(loop(), &replica->io);
 	free(replica);
 }
diff --git a/src/box/replica.h b/src/box/replica.h
index 95e061a727..77962226eb 100644
--- a/src/box/replica.h
+++ b/src/box/replica.h
@@ -71,8 +71,10 @@ struct replica {
 		struct sockaddr_storage addrstorage;
 	};
 	socklen_t addr_len;
-	/* save master fd to re-use a connection between JOIN and SUBSCRIBE */
+	/** Save master fd to re-use a connection between JOIN and SUBSCRIBE */
 	struct ev_io io;
+	/** Input/output buffer for buffered IO */
+	struct iobuf *iobuf;
 };
 
 /**
@@ -100,14 +102,14 @@ replica_stop(struct replica *replica);
 
 /**
  * Wait replication client to finish and rethrow exception (if any).
- * Use this function to wait bootstrap.
+ * Use this function to wait until bootstrap.
  *
  * \post This function keeps a open connection in io->fd.
  * \sa replica_start()
  * \sa fiber_join()
  */
 void
-replica_join(struct replica *replica);
+replica_wait(struct replica *replica);
 
 /**
  * Allocate an instance of replica object, create replica and initialize
diff --git a/src/coio.h b/src/coio.h
index 9509ffeca1..3fdc8579d7 100644
--- a/src/coio.h
+++ b/src/coio.h
@@ -68,6 +68,12 @@ coio_accept(struct ev_io *coio, struct sockaddr *addr, socklen_t addrlen,
 void
 coio_init(struct ev_io *coio, int fd);
 
+static inline void
+coio_close(ev_loop *loop, struct ev_io *coio)
+{
+	return evio_close(loop, coio);
+}
+
 ssize_t
 coio_read_ahead_timeout(struct ev_io *coio, void *buf, size_t sz, size_t bufsiz,
 		        ev_tstamp timeout);
-- 
GitLab