From 3f7971ea76b969e614b38682513aca3a93658e2a Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Thu, 13 Nov 2014 15:50:37 +0300
Subject: [PATCH] gh-578: review fixes

Fix obvious coding bugs in cord_cojoin/cord_join().

Remove unnecessary constants & arguments.
Keep an eye on the include dependencies becoming more messy.
Clear the O_NONBLOCK flag in the join thread, which
uses non-blocking API.
Suspend I/O in the main thread event loop for the
duration of replication join/subscribe.
---
 src/box/box.cc         |  9 ++---
 src/box/iproto.cc      | 22 +++-------
 src/box/recovery.cc    |  2 +-
 src/box/recovery.h     | 45 +++++++++++++++++----
 src/box/replica.cc     |  1 -
 src/box/replica.h      | 20 +--------
 src/box/replication.cc | 19 +++++----
 src/box/replication.h  | 16 +-------
 src/exception.cc       | 27 +++++++++++++
 src/exception.h        |  6 +++
 src/fiber.cc           | 92 +++++++++++++++++++++---------------------
 src/fiber.h            | 49 +++++++++++-----------
 src/tt_pthread.h       |  2 +-
 13 files changed, 166 insertions(+), 144 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index de3f1c33ca..d6c744896c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -27,15 +27,13 @@
  * SUCH DAMAGE.
  */
 #include "box/box.h"
-#include <arpa/inet.h>
-#include <sys/wait.h>
 
-#include <errcode.h>
-#include "recovery.h"
-#include "log_io.h"
 #include <say.h>
 #include "iproto.h"
+#include "iproto_constants.h"
+#include "recovery.h"
 #include "replication.h"
+#include "replica.h"
 #include <stat.h>
 #include <tarantool.h>
 #include "tuple.h"
@@ -53,7 +51,6 @@
 #include "user_cache.h"
 #include "cfg.h"
 #include "iobuf.h"
-#include "iproto_constants.h"
 
 static void process_ro(struct port *port, struct request *request);
 static void process_rw(struct port *port, struct request *request);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 4b561b5231..c184871f4b 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -49,18 +49,6 @@
 #include "iproto_constants.h"
 #include "user_def.h"
 
-class IprotoConnectionShutdown: public Exception
-{
-public:
-	IprotoConnectionShutdown(const char *file, int line)
-		:Exception(file, line) {}
-	virtual void log() const;
-};
-
-void
-IprotoConnectionShutdown::log() const
-{}
-
 /* {{{ iproto_request - declaration */
 
 struct iproto_connection;
@@ -558,8 +546,6 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher,
 		 */
 		if (!ev_is_active(&con->input))
 			ev_feed_event(loop, &con->input, EV_READ);
-	} catch (IprotoConnectionShutdown *e) {
-		iproto_connection_shutdown(con);
 	} catch (Exception *e) {
 		e->log();
 		iproto_connection_close(con);
@@ -711,13 +697,17 @@ iproto_process_admin(struct iproto_request *ireq)
 					  ireq->header.sync);
 			break;
 		case IPROTO_JOIN:
+			ev_io_stop(con->loop, &con->input);
+			ev_io_stop(con->loop, &con->output);
 			box_process_join(con->input.fd, &ireq->header);
-			/* TODO: check requests in `con; queue */
+			/* TODO: check requests in `con' queue */
 			iproto_connection_shutdown(con);
 			return;
 		case IPROTO_SUBSCRIBE:
+			ev_io_stop(con->loop, &con->input);
+			ev_io_stop(con->loop, &con->output);
 			box_process_subscribe(con->input.fd, &ireq->header);
-			/* TODO: check requests in `con; queue */
+			/* TODO: check requests in `con' queue */
 			iproto_connection_shutdown(con);
 			return;
 		default:
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 38c9cec289..cbfa110c2f 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -856,7 +856,7 @@ wal_writer_stop(struct recovery_state *r)
 	writer->is_shutdown= true;
 	(void) tt_pthread_cond_signal(&writer->cond);
 	(void) tt_pthread_mutex_unlock(&writer->mutex);
-	if (cord_rawjoin(&writer->cord, NULL, NULL)) {
+	if (cord_join(&writer->cord)) {
 		/* We can't recover from this in any reasonable way. */
 		panic_syserror("WAL writer: thread join failed");
 	}
diff --git a/src/box/recovery.h b/src/box/recovery.h
index d84ed940e3..4f1628064a 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -29,15 +29,14 @@
  * SUCH DAMAGE.
  */
 #include <stdbool.h>
+#include <netinet/in.h>
 
 #include "trivia/util.h"
 #include "third_party/tarantool_ev.h"
 #include "log_io.h"
 #include "vclock.h"
 #include "tt_uuid.h"
-#include "replica.h"
-#include "replication.h"
-#include "small/region.h"
+#include "uri.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -63,18 +62,50 @@ enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX };
 /** String constants for the supported modes. */
 extern const char *wal_mode_STRS[];
 
+/** State of a replication relay. */
+struct relay {
+	/** Replica connection */
+	int sock;
+	/* Request type - SUBSCRIBE or JOIN */
+	uint32_t type;
+	/* Request sync */
+	uint64_t sync;
+	/* Only used in SUBSCRIBE request */
+	uint32_t server_id;
+	struct vclock vclock;
+};
+
+enum { REMOTE_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
+
+/** State of a replication connection to the master */
+struct remote {
+	struct fiber *reader;
+	ev_tstamp recovery_lag, recovery_last_update_tstamp;
+	bool warning_said;
+	char source[REMOTE_SOURCE_MAXLEN];
+	struct uri uri;
+	union {
+		struct sockaddr addr;
+		struct sockaddr_storage addrstorage;
+	};
+	socklen_t addr_len;
+};
+
 struct recovery_state {
 	struct vclock vclock;
-	/* The WAL we're currently reading/writing from/to. */
+	/** The WAL we're currently reading/writing from/to. */
 	struct log_io *current_wal;
 	struct log_dir snap_dir;
 	struct log_dir wal_dir;
-	int64_t signature; /* used to find missing xlog files */
+	/** Used to find missing xlog files */
+	int64_t signature;
 	struct wal_writer *writer;
 	struct wal_watcher *watcher;
 	union {
-		struct remote remote; /* slave->master state  */
-		struct relay relay;   /* master->slave state */
+		/** slave->master state */
+		struct remote remote;
+		/** master->slave state */
+		struct relay relay;
 	};
 	/**
 	 * row_handler is a module callback invoked during initial
diff --git a/src/box/replica.cc b/src/box/replica.cc
index 727a70999b..e26c04d357 100644
--- a/src/box/replica.cc
+++ b/src/box/replica.cc
@@ -42,7 +42,6 @@
 #include "msgpuck/msgpuck.h"
 #include "box/cluster.h"
 #include "iproto_constants.h"
-#include "box/session.h"
 
 static const int RECONNECT_DELAY = 1.0;
 
diff --git a/src/box/replica.h b/src/box/replica.h
index 0883226353..2832e89c6f 100644
--- a/src/box/replica.h
+++ b/src/box/replica.h
@@ -28,26 +28,8 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
-#include <netinet/in.h>
-#include "tarantool_ev.h"
-#include <uri.h>
-
-enum { REMOTE_SOURCE_MAXLEN = 1024 }; /* enough to fit URI with passwords */
-
-/** Master connection */
-struct remote {
-	struct fiber *reader;
-	ev_tstamp recovery_lag, recovery_last_update_tstamp;
-	bool warning_said;
-	char source[REMOTE_SOURCE_MAXLEN];
-	struct uri uri;
-	union {
-		struct sockaddr addr;
-		struct sockaddr_storage addrstorage;
-	};
-	socklen_t addr_len;
-};
 
+struct recovery_state;
 /** Connect to a master and request a snapshot.
  * Raises an exception on error.
  *
diff --git a/src/box/replication.cc b/src/box/replication.cc
index cb26169785..4638df1f35 100644
--- a/src/box/replication.cc
+++ b/src/box/replication.cc
@@ -40,6 +40,7 @@
 #include <limits.h>
 #include <fcntl.h>
 
+#include "tarantool.h"
 #include "fiber.h"
 #include "recovery.h"
 #include "log_io.h"
@@ -49,7 +50,6 @@
 #include "box/schema.h"
 #include "box/vclock.h"
 #include "scoped_guard.h"
-#include "cfg.h"
 
 /** Replication topology
  * ----------------------
@@ -206,6 +206,7 @@ replication_join_thread(void *arg)
 
 	/* Turn off the non-blocking mode, if any. */
 	int nonblock = sio_getfl(r->relay.sock) & O_NONBLOCK;
+	sio_setfl(r->relay.sock, O_NONBLOCK, 0);
 	auto socket_guard = make_scoped_guard([=]{
 		/* Restore non-blocking mode */
 		sio_setfl(r->relay.sock, O_NONBLOCK, nonblock);
@@ -229,9 +230,10 @@ replication_join_thread(void *arg)
 void
 replication_join(int fd, struct xrow_header *packet)
 {
-	struct recovery_state *r = recovery_new(cfg_gets("snap_dir"),
-			cfg_gets("wal_dir"), replication_relay_send_row, NULL,
-			NULL, INT32_MAX);
+	struct recovery_state *r;
+	r = recovery_new(cfg_snap_dir, cfg_wal_dir,
+			 replication_relay_send_row,
+			 NULL, NULL, INT32_MAX);
 	auto recovery_guard = make_scoped_guard([&]{
 		recovery_delete(r);
 	});
@@ -249,7 +251,7 @@ replication_join(int fd, struct xrow_header *packet)
 
 	struct cord cord;
 	cord_start(&cord, name, replication_join_thread, r);
-	cord_join(&cord, NULL);
+	cord_cojoin(&cord);
 }
 
 /** Replication acceptor fiber handler. */
@@ -745,9 +747,10 @@ replication_relay_loop(struct relay *relay)
 	sio_setfl(relay->sock, O_NONBLOCK, 0);
 
 	/* Initialize the recovery process */
-	struct recovery_state *r = recovery_new(cfg_snap_dir, cfg_wal_dir,
-		      replication_relay_send_row,
-		      NULL, NULL, INT32_MAX);
+	struct recovery_state *r;
+	r = recovery_new(cfg_snap_dir, cfg_wal_dir,
+			 replication_relay_send_row,
+			 NULL, NULL, INT32_MAX);
 	r->relay = *relay; /* copy relay state to recovery */
 
 	int rc = EXIT_SUCCESS;
diff --git a/src/box/replication.h b/src/box/replication.h
index 678235a27f..3798b2a033 100644
--- a/src/box/replication.h
+++ b/src/box/replication.h
@@ -28,21 +28,7 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
-#include <tarantool.h>
-#include "trivia/util.h"
-#include "vclock.h"
-
-struct relay {
-	/** Replica connection */
-	int sock;
-	/* Request type - SUBSCRIBE or JOIN */
-	uint32_t type;
-	/* Request sync */
-	uint64_t sync;
-	/* Only used in SUBSCRIBE request */
-	uint32_t server_id;
-	struct vclock vclock;
-};
+struct xrow_header;
 
 /**
  * Pre-fork replication spawner process.
diff --git a/src/exception.cc b/src/exception.cc
index d42b42cdbc..aefc9f26e5 100644
--- a/src/exception.cc
+++ b/src/exception.cc
@@ -66,6 +66,33 @@ Exception::operator new(size_t size)
 	throw cord->exception;
 }
 
+void
+Exception::init(struct cord *cord)
+{
+	cord->exception = NULL;
+	cord->exception_size = 0;
+}
+
+void
+Exception::cleanup(struct cord *cord)
+{
+	if (cord->exception != NULL && cord->exception != &out_of_memory) {
+		cord->exception->~Exception();
+		free(cord->exception);
+	}
+	Exception::init(cord);
+}
+
+void
+Exception::move(struct cord *from, struct cord *to)
+{
+	Exception::cleanup(to);
+	to->exception = from->exception;
+	to->exception_size = from->exception_size;
+	Exception::init(from);
+}
+
+
 void
 Exception::operator delete(void * /* ptr */)
 {
diff --git a/src/exception.h b/src/exception.h
index 3cf645e399..d9afb096db 100644
--- a/src/exception.h
+++ b/src/exception.h
@@ -33,6 +33,7 @@
 #include "errcode.h"
 #include "say.h"
 
+struct cord;
 
 class Exception: public Object {
 public:
@@ -53,6 +54,11 @@ class Exception: public Object {
 	virtual void log() const = 0;
 	virtual ~Exception() {}
 
+	static void init(struct cord *cord);
+	/** Clear the last error saved in the current thread's TLS */
+	static void cleanup(struct cord *cord);
+	/** Move an exception from one thread to another. */
+	static void move(struct cord *from, struct cord *to);
 protected:
 	Exception(const char *file, unsigned line);
 	/* The copy constructor is needed for C++ throw */
diff --git a/src/fiber.cc b/src/fiber.cc
index a93683b520..b8b2dd07d8 100644
--- a/src/fiber.cc
+++ b/src/fiber.cc
@@ -513,18 +513,6 @@ fiber_destroy_all(struct cord *cord)
 		fiber_destroy(f);
 }
 
-static void
-cord_set_exception(struct cord *cord, Exception *e)
-{
-	/* Cleanup memory allocated for exceptions */
-	if (cord->exception == NULL || cord->exception == &out_of_memory)
-		return;
-	cord->exception->~Exception();
-	free(cord->exception);
-	cord->exception = e;
-	cord->exception_size = 0; /* force realloc() on next throw */
-}
-
 void
 cord_create(struct cord *cord, const char *name)
 {
@@ -546,6 +534,7 @@ cord_create(struct cord *cord, const char *name)
 
 	cord->sp = cord->stack;
 	cord->max_fid = 100;
+	Exception::init(cord);
 
 	ev_async_init(&cord->ready_async, fiber_ready_async);
 	ev_async_start(cord->loop, &cord->ready_async);
@@ -563,7 +552,7 @@ cord_destroy(struct cord *cord)
 	}
 	slab_cache_destroy(&cord->slabc);
 	ev_loop_destroy(cord->loop);
-	cord_set_exception(cord, NULL);
+	Exception::cleanup(cord);
 }
 
 struct cord_thread_arg
@@ -580,28 +569,38 @@ struct cord_thread_arg
 void *cord_thread_func(void *p)
 {
 	struct cord_thread_arg *ct_arg = (struct cord_thread_arg *) p;
-	struct cord *cord = cord() = ct_arg->cord;
+	cord() = ct_arg->cord;
+	struct cord *cord = cord();
 	cord_create(cord, ct_arg->name);
+	/** Can't possibly be the main thread */
+	assert(cord->id != main_thread_id);
 	tt_pthread_mutex_lock(&ct_arg->start_mutex);
 	void *(*f)(void *) = ct_arg->f;
 	void *arg = ct_arg->arg;
 	ct_arg->is_started = true;
 	tt_pthread_cond_signal(&ct_arg->start_cond);
 	tt_pthread_mutex_unlock(&ct_arg->start_mutex);
-
+	void *res;
 	try {
-		return f(arg);
+		res = f(arg);
+		/*
+		 * Clear a possible leftover exception object
+		 * to not confuse the invoker of the thread.
+		 */
+		Exception::cleanup(cord);
 	} catch (Exception *) {
-		if (cord->id == main_thread_id)
-			throw;
-		return CORD_EXCEPTION;
+		/*
+		 * The exception is now available to the caller
+		 * via cord->exception.
+		 */
+		res = NULL;
 	}
+	return res;
 }
 
 int
 cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg)
 {
-	memset(cord, 0, sizeof(*cord));
 	int res = -1;
 	struct cord_thread_arg ct_arg = { cord, name, f, arg, false,
 		PTHREAD_MUTEX_INITIALIZER, PTHREAD_COND_INITIALIZER };
@@ -619,48 +618,47 @@ cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg)
 }
 
 int
-cord_rawjoin(struct cord *cord, void **retval, struct Exception **exception)
+cord_join(struct cord *cord)
 {
-	void *ret = NULL;
-	int res = tt_pthread_join(cord->id, retval);
-	if (res != 0) {
-		/* We can't recover from this in any reasonable way. */
+	assert(cord() != cord); /* Can't join self. */
+	void *retval = NULL;
+	int res = tt_pthread_join(cord->id, &retval);
+	if (res == 0 && cord->exception) {
+		/*
+		 * cord_thread_func guarantees that
+		 * cord->exception is only set if the subject cord
+		 * has terminated with an uncaught exception,
+		 * transfer it to the caller.
+		 */
+		Exception::move(cord, cord());
 		cord_destroy(cord);
-		return res;
-	}
-	if (ret == CORD_EXCEPTION && exception != NULL) {
-		assert(cord->exception != NULL);
-		*exception = cord->exception; /* will be destroyed by caller */
-		cord->exception = NULL;
+		cord()->exception->raise();
 	}
-	if (retval)
-		*retval = ret;
 	cord_destroy(cord);
 	return res;
 }
 
 ssize_t
-cord_join_cb(va_list ap)
+cord_cojoin_cb(va_list ap)
 {
 	struct cord *cord = va_arg(ap, struct cord *);
-	void **retval = va_arg(ap, void **);
-	Exception **exception = va_arg(ap, Exception **);
-	return cord_rawjoin(cord, retval, exception);
+	void *retval = NULL;
+	int res = tt_pthread_join(cord->id, &retval);
+	return res;
 }
 
 int
-cord_join(struct cord *cord, void **retval)
-{
-	Exception *exception = NULL;
-	int rc = coeio_custom(cord_join_cb, TIMEOUT_INFINITY, cord, retval,
-			      &exception);
-	if (rc != 0)
-		return rc;
-	if (exception) {
-		cord_set_exception(cord, exception);
+cord_cojoin(struct cord *cord)
+{
+	assert(cord() != cord); /* Can't join self. */
+	int rc = coeio_custom(cord_cojoin_cb, TIMEOUT_INFINITY, cord);
+	if (rc == 0 && cord->exception) {
+		Exception::move(cord, cord());
+		cord_destroy(cord);
 		cord()->exception->raise(); /* re-throw exception from cord */
 	}
-	return 0;
+	cord_destroy(cord);
+	return rc;
 }
 
 void
diff --git a/src/fiber.h b/src/fiber.h
index c0d8d880c0..c6433d0b90 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -173,42 +173,45 @@ extern __thread struct cord *cord_ptr;
 #define fiber() cord()->fiber
 #define loop() (cord()->loop)
 
+/**
+ * Start a cord with the given thread function.
+ * The return value of the function can be collected
+ * with cord_join(). If the function terminates with
+ * an exception, the return value is NULL, and cord_join()
+ * moves the exception from the terminated cord to
+ * the caller of cord_join().
+ */
 int
 cord_start(struct cord *cord, const char *name,
 	   void *(*f)(void *), void *arg);
 
-#define CORD_EXCEPTION ((void *) -2)
-#define CORD_CANCELLED PTHREAD_CANCELED
-
 /**
- * \brief Synchronously wait for \a cord to terminate. If \a cord has already
- * terminated, then returns immediately.Safe to use from raw pthreads.
- * Doesn't throw exceptions.
- * \param cord cord
- * \param[out] retval exit status of the target cord, CORD_EXCEPTION if \a cord
- * was terminated due to an exception or CORD_CANCELLED if \a cord was
- * cancelled by pthread_cancel().
- * \param[out] exception if not NULL then a double pointer to original
- * exception caused \a cord to stop. It is your responsibility to free
- * allocated memory for this exception using free(3).
- * \sa pthread_join()
- * \return 0 on sucess
+ * Wait for \a cord to terminate. If \a cord has already
+ * terminated, then returns immediately.
+ *
+ * @post If the subject cord terminated with an exception,
+ * preserves the exception in the caller's cord.
+ *
+ * @param cord cord
+ * @retval  0  pthread_join succeeded.
+ *             If the thread function terminated with an
+ *             exception, the exception is raised in the
+ *             caller cord.
+ * @retval -1   pthread_join failed.
  */
 int
-cord_rawjoin(struct cord *cord, void **retval, struct Exception **exception);
+cord_join(struct cord *cord);
 
 /**
- * \brief Yield until \a cord terminated. If \a cord has already terminated,
- * then returns immediately. If \a cord was terminated due to an exception
- * when **re-throws** this exception in the calling cord/fiber.
+ * \brief Yield until \a cord has terminated.
+ * If \a cord has terminated with an uncaught exception
+ * **re-throws** this exception in the calling cord/fiber.
  * \param cord cord
- * \param retval[out] exit status of the target cord or CORD_CANCELLED if
- * \a cord was cancelled.
  * \sa pthread_join()
- * \return 0 on sucess
+ * \return 0 on success
  */
 int
-cord_join(struct cord *cord, void **retval);
+cord_cojoin(struct cord *cord);
 
 static inline void
 cord_set_name(const char *name)
diff --git a/src/tt_pthread.h b/src/tt_pthread.h
index c0a6683651..6f83a950e9 100644
--- a/src/tt_pthread.h
+++ b/src/tt_pthread.h
@@ -46,7 +46,7 @@
 
 #define tt_pthread_error(e)			\
 	if (e != 0)				\
-		say_error("%s error %d", __func__, e);\
+		say_syserror("%s error %d", __func__, e);\
 	assert(e == 0);				\
 	e
 
-- 
GitLab