From 855d0067e85e09b014c09fabac9dd1f8d4f70164 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Fri, 24 Jul 2015 20:28:07 +0300
Subject: [PATCH] wal: use cbus as much as possible for wal interconnect

Use staged mutex lock/unlock, cool off the WAL mutex
---
 src/box/wal.cc | 184 ++++++++++++++++++++++++++-----------------------
 src/box/wal.h  |   3 +-
 src/cbus.cc    |   7 ++
 src/cbus.h     |  19 +++++
 4 files changed, 123 insertions(+), 90 deletions(-)

diff --git a/src/box/wal.cc b/src/box/wal.cc
index 2ecb657aff..de7be5060f 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -42,17 +42,12 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL };
  * WAL writer - maintain a Write Ahead Log for every change
  * in the data state.
  */
-
-/* Context of the WAL writer thread. */
-STAILQ_HEAD(wal_fifo, wal_request);
-
 struct wal_writer
 {
-	struct wal_fifo input;
-	struct wal_fifo commit;
 	struct cord cord;
+	struct cpipe tx_pipe;
+	struct cpipe wal_pipe;
 	struct cbus tx_wal_bus;
-	ev_async write_event;
 	int rows_per_wal;
 	struct fio_batch *batch;
 	bool is_shutdown;
@@ -61,11 +56,23 @@ struct wal_writer
 	struct vclock vclock;
 };
 
-static struct wal_writer wal_writer;
+static void
+wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */)
+{
+	struct cpipe *pipe = (struct cpipe *) watcher->data;
 
+	cbus_lock(pipe->bus);
+	bool input_was_empty = STAILQ_EMPTY(&pipe->pipe);
+	STAILQ_CONCAT(&pipe->pipe, &pipe->input);
+	cbus_unlock(pipe->bus);
+
+	if (input_was_empty)
+		cbus_signal(pipe->bus);
+	pipe->n_input = 0;
+}
 /**
  * A commit watcher callback is invoked whenever there
- * are requests in wal_writer->commit. This callback is
+ * are requests in wal_writer->tx_pipe.pipe. This callback is
  * associated with an internal WAL writer watcher and is
  * invoked in the front-end main event loop.
  *
@@ -79,33 +86,36 @@ static struct wal_writer wal_writer;
  * call in the writer thread loop).
  */
 static void
-wal_schedule_queue(struct wal_fifo *queue)
+tx_schedule_queue(struct cmsg_fifo *queue)
 {
 	/*
 	 * Can't use STAILQ_FOREACH since fiber_call()
 	 * destroys the list entry.
 	 */
-	struct wal_request *req, *tmp;
-	STAILQ_FOREACH_SAFE(req, queue, wal_fifo_entry, tmp)
-		fiber_call(req->fiber);
+	struct cmsg *m, *tmp;
+	STAILQ_FOREACH_SAFE(m, queue, fifo, tmp)
+		fiber_call(((struct wal_request *) m)->fiber);
 }
 
 static void
-wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */)
+tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 {
 	struct wal_writer *writer = (struct wal_writer *) watcher->data;
-	struct wal_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
-	struct wal_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
+	struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
+	struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
 
+	bool is_rollback;
 	cbus_lock(&writer->tx_wal_bus);
-	STAILQ_CONCAT(&commit, &writer->commit);
-	if (writer->is_rollback) {
-		STAILQ_CONCAT(&rollback, &writer->input);
-		writer->is_rollback = false;
-	}
+	STAILQ_CONCAT(&commit, &writer->tx_pipe.pipe);
+	is_rollback = writer->is_rollback;
+	if (is_rollback)
+		STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe);
+	writer->is_rollback = false;
 	cbus_unlock(&writer->tx_wal_bus);
+	if (is_rollback)
+		STAILQ_CONCAT(&rollback, &writer->wal_pipe.input);
 
-	wal_schedule_queue(&commit);
+	tx_schedule_queue(&commit);
 	/*
 	 * Perform a cascading abort of all transactions which
 	 * depend on the transaction which failed to get written
@@ -113,8 +123,8 @@ wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 	 * in reverse order, performing a playback of the
 	 * in-memory database state.
 	 */
-	STAILQ_REVERSE(&rollback, wal_request, wal_fifo_entry);
-	wal_schedule_queue(&rollback);
+	STAILQ_REVERSE(&rollback, cmsg, fifo);
+	tx_schedule_queue(&rollback);
 }
 
 /**
@@ -128,12 +138,9 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock,
 {
 	cbus_create(&writer->tx_wal_bus);
 
-	STAILQ_INIT(&writer->input);
-	STAILQ_INIT(&writer->commit);
+	cpipe_create(&writer->tx_pipe);
+	cpipe_set_fetch_cb(&writer->tx_pipe, tx_fetch_output, writer);
 
-	ev_async_init(&writer->write_event, wal_schedule);
-	writer->write_event.data = writer;
-	writer->txn_loop = loop();
 	writer->rows_per_wal = rows_per_wal;
 
 	writer->batch = fio_batch_alloc(sysconf(_SC_IOV_MAX));
@@ -150,12 +157,13 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock,
 static void
 wal_writer_destroy(struct wal_writer *writer)
 {
+	cpipe_destroy(&writer->tx_pipe);
 	cbus_destroy(&writer->tx_wal_bus);
 	free(writer->batch);
 }
 
 /** WAL writer thread routine. */
-static void *wal_writer_thread(void *worker_args);
+static void wal_writer_f(va_list ap);
 
 /**
  * Initialize WAL writer, start the thread.
@@ -175,23 +183,25 @@ wal_writer_start(struct recovery_state *r, int rows_per_wal)
 	assert(r->writer == NULL);
 	assert(r->current_wal == NULL);
 	assert(rows_per_wal > 1);
-	assert(! wal_writer.is_shutdown);
-	assert(STAILQ_EMPTY(&wal_writer.input));
-	assert(STAILQ_EMPTY(&wal_writer.commit));
 
-	/* I. Initialize the state. */
-	wal_writer_init(&wal_writer, &r->vclock, rows_per_wal);
-	r->writer = &wal_writer;
+	static struct wal_writer wal_writer;
 
-	ev_async_start(wal_writer.txn_loop, &wal_writer.write_event);
+	struct wal_writer *writer = &wal_writer;
+	r->writer = writer;
+
+	/* I. Initialize the state. */
+	wal_writer_init(writer, &r->vclock, rows_per_wal);
 
 	/* II. Start the thread. */
 
-	if (cord_start(&wal_writer.cord, "wal", wal_writer_thread, r)) {
-		wal_writer_destroy(&wal_writer);
+	if (cord_costart(&writer->cord, "wal", wal_writer_f, r)) {
+		wal_writer_destroy(writer);
 		r->writer = NULL;
 		return -1;
 	}
+	cbus_join(&writer->tx_wal_bus, &writer->tx_pipe);
+	cpipe_set_flush_cb(&writer->wal_pipe, wal_flush_input,
+			   &writer->wal_pipe);
 	return 0;
 }
 
@@ -205,37 +215,18 @@ wal_writer_stop(struct recovery_state *r)
 
 	cbus_lock(&writer->tx_wal_bus);
 	writer->is_shutdown= true;
-	cbus_signal(&writer->tx_wal_bus);
 	cbus_unlock(&writer->tx_wal_bus);
+	cbus_signal(&writer->tx_wal_bus);
 	if (cord_join(&writer->cord)) {
 		/* We can't recover from this in any reasonable way. */
 		panic_syserror("WAL writer: thread join failed");
 	}
 
-	ev_async_stop(writer->txn_loop, &writer->write_event);
 	wal_writer_destroy(writer);
 
 	r->writer = NULL;
 }
 
-/**
- * Pop a bulk of requests to write to disk to process.
- * Block on the condition only if we have no other work to
- * do. Loop in case of a spurious wakeup.
- */
-void
-wal_writer_pop(struct wal_writer *writer, struct wal_fifo *input)
-{
-	while (! writer->is_shutdown)
-	{
-		if (! writer->is_rollback && ! STAILQ_EMPTY(&writer->input)) {
-			STAILQ_CONCAT(input, &writer->input);
-			break;
-		}
-		cbus_wait_signal(&writer->tx_wal_bus);
-	}
-}
-
 /**
  * If there is no current WAL, try to open it, and close the
  * previous WAL. We close the previous WAL only after opening
@@ -310,7 +301,7 @@ wal_fill_batch(struct xlog *wal, struct fio_batch *batch, int rows_per_wal,
 			iovcnt += xlog_encode_row(*row, iov);
 		}
 		fio_batch_add(batch, iovcnt);
-		req = STAILQ_NEXT(req, wal_fifo_entry);
+		req = (struct wal_request *) STAILQ_NEXT(req, fifo);
 	}
 	return req;
 }
@@ -338,20 +329,41 @@ wal_write_batch(struct xlog *wal, struct fio_batch *batch,
 			      req->rows[req->n_rows - 1]->server_id,
 			      req->rows[req->n_rows - 1]->lsn);
 		req->res = 0;
-		req = STAILQ_NEXT(req, wal_fifo_entry);
+		req = (struct wal_request *) STAILQ_NEXT(req, fifo);
 	}
 	return req;
 }
 
+/**
+ * Pop a bulk of requests to write to disk to process.
+ * Block on the condition only if we have no other work to
+ * do. Loop in case of a spurious wakeup.
+ */
+void
+wal_writer_pop(struct wal_writer *writer)
+{
+	while (! writer->is_shutdown)
+	{
+		if (! writer->is_rollback &&
+		    ! STAILQ_EMPTY(&writer->wal_pipe.pipe)) {
+			STAILQ_CONCAT(&writer->wal_pipe.output,
+				      &writer->wal_pipe.pipe);
+			break;
+		}
+		cbus_wait_signal(&writer->tx_wal_bus);
+	}
+}
+
+
 static void
 wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer,
-		  struct wal_fifo *input, struct wal_fifo *commit,
-		  struct wal_fifo *rollback)
+		  struct cmsg_fifo *input, struct cmsg_fifo *commit,
+		  struct cmsg_fifo *rollback)
 {
 	struct xlog **wal = &r->current_wal;
 	struct fio_batch *batch = writer->batch;
 
-	struct wal_request *req = STAILQ_FIRST(input);
+	struct wal_request *req = (struct wal_request *) STAILQ_FIRST(input);
 	struct wal_request *write_end = req;
 
 	while (req) {
@@ -369,29 +381,33 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer,
 		req = write_end;
 	}
 	fiber_gc();
-	STAILQ_SPLICE(input, write_end, wal_fifo_entry, rollback);
+	STAILQ_SPLICE(input, write_end, fifo, rollback);
 	STAILQ_CONCAT(commit, input);
 }
 
 /** WAL writer thread main loop.  */
-static void *
-wal_writer_thread(void *worker_args)
+static void
+wal_writer_f(va_list ap)
 {
-	struct recovery_state *r = (struct recovery_state *) worker_args;
+	struct recovery_state *r = va_arg(ap, struct recovery_state *);
 	struct wal_writer *writer = r->writer;
-	struct wal_fifo input = STAILQ_HEAD_INITIALIZER(input);
-	struct wal_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
-	struct wal_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
+
+	cpipe_create(&writer->wal_pipe);
+	cbus_join(&writer->tx_wal_bus, &writer->wal_pipe);
+
+	struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
+	struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
 
 	cbus_lock(&writer->tx_wal_bus);
 	while (! writer->is_shutdown) {
-		wal_writer_pop(writer, &input);
+		wal_writer_pop(writer);
 		cbus_unlock(&writer->tx_wal_bus);
 
-		wal_write_to_disk(r, writer, &input, &commit, &rollback);
+		wal_write_to_disk(r, writer, &writer->wal_pipe.output,
+				  &commit, &rollback);
 
 		cbus_lock(&writer->tx_wal_bus);
-		STAILQ_CONCAT(&writer->commit, &commit);
+		STAILQ_CONCAT(&writer->tx_pipe.pipe, &commit);
 		if (! STAILQ_EMPTY(&rollback)) {
 			/*
 			 * Begin rollback: create a rollback queue
@@ -400,17 +416,18 @@ wal_writer_thread(void *worker_args)
 			 * input queue.
 			 */
 			writer->is_rollback = true;
-			STAILQ_CONCAT(&rollback, &writer->input);
-			STAILQ_CONCAT(&writer->input, &rollback);
+			STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe);
+			STAILQ_CONCAT(&writer->wal_pipe.pipe, &rollback);
 		}
-		ev_async_send(writer->txn_loop, &writer->write_event);
+		ev_async_send(writer->tx_pipe.consumer,
+			      &writer->tx_pipe.fetch_output);
 	}
 	cbus_unlock(&writer->tx_wal_bus);
 	if (r->current_wal != NULL) {
 		xlog_close(r->current_wal);
 		r->current_wal = NULL;
 	}
-	return NULL;
+	cpipe_destroy(&writer->wal_pipe);
 }
 
 /**
@@ -430,16 +447,7 @@ wal_write(struct recovery_state *r, struct wal_request *req)
 	req->fiber = fiber();
 	req->res = -1;
 
-	cbus_lock(&writer->tx_wal_bus);
-
-	bool input_was_empty = STAILQ_EMPTY(&writer->input);
-	STAILQ_INSERT_TAIL(&writer->input, req, wal_fifo_entry);
-
-	if (input_was_empty)
-		cbus_signal(&writer->tx_wal_bus);
-
-	cbus_unlock(&writer->tx_wal_bus);
-
+	cpipe_push(&writer->wal_pipe, req);
 	/**
 	 * It's not safe to spuriously wakeup this fiber
 	 * since in that case it will ignore a possible
diff --git a/src/box/wal.h b/src/box/wal.h
index f2bea13e1b..42ac8f2c8f 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -40,8 +40,7 @@ enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX };
 /** String constants for the supported modes. */
 extern const char *wal_mode_STRS[];
 
-struct wal_request {
-	STAILQ_ENTRY(wal_request) wal_fifo_entry;
+struct wal_request: public cmsg {
 	/* Auxiliary. */
 	int64_t res;
 	struct fiber *fiber;
diff --git a/src/cbus.cc b/src/cbus.cc
index bf8115d281..0d07ddae34 100644
--- a/src/cbus.cc
+++ b/src/cbus.cc
@@ -66,6 +66,13 @@ cpipe_create(struct cpipe *pipe)
 	ev_async_start(pipe->consumer, &pipe->fetch_output);
 }
 
+void
+cpipe_destroy(struct cpipe *pipe)
+{
+	assert(loop() == pipe->consumer);
+	ev_async_stop(pipe->consumer, &pipe->fetch_output);
+}
+
 static void
 cpipe_join(struct cpipe *pipe, struct cbus *bus, struct cpipe *peer)
 {
diff --git a/src/cbus.h b/src/cbus.h
index 98d6c3990c..02b87bc3a6 100644
--- a/src/cbus.h
+++ b/src/cbus.h
@@ -148,6 +148,9 @@ struct cpipe {
 void
 cpipe_create(struct cpipe *pipe);
 
+void
+cpipe_destroy(struct cpipe *pipe);
+
 /**
  * Reset the default fetch output callback with a custom one.
  */
@@ -164,6 +167,22 @@ cpipe_set_fetch_cb(struct cpipe *pipe, ev_async_cb fetch_output_cb,
 	pipe->fetch_output.data = data;
 }
 
+/**
+ * Reset the default flush input callback with a custom one.
+ */
+static inline void
+cpipe_set_flush_cb(struct cpipe *pipe, ev_async_cb flush_input_cb,
+		   void *data)
+{
+	assert(loop() == pipe->producer);
+	/*
+	 * According to libev documentation, you can set cb at
+	 * virtually any time, modulo threads.
+	 */
+	ev_set_cb(&pipe->flush_input, flush_input_cb);
+	pipe->flush_input.data = data;
+}
+
 /**
  * Pop a single message from the staged output area. If
  * the output is empty, returns NULL. There may be messages
-- 
GitLab