From 577ef3a79f8bd552ce1381724abc47a19025d6b8 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Mon, 9 Apr 2012 20:30:46 +0400
Subject: [PATCH] Thread-based WAL writer: fix the bug with out of order
 confirm_lsn().

---
 core/log_io.m | 83 ++++++++++++++++++++++++++++++++++++---------------
 core/say.m    |  2 +-
 2 files changed, 60 insertions(+), 25 deletions(-)

diff --git a/core/log_io.m b/core/log_io.m
index 345901a9cb..c0e2b66421 100644
--- a/core/log_io.m
+++ b/core/log_io.m
@@ -67,10 +67,11 @@ struct recovery_state *recovery_state;
 
 struct wal_writer
 {
-	STAILQ_HEAD(wal_fifo, wal_write_request) input;
+	STAILQ_HEAD(wal_fifo, wal_write_request) input, output;
 	pthread_t thread;
 	pthread_mutex_t mutex;
 	pthread_cond_t cond;
+	ev_async async;
 	bool is_shutdown;
 };
 
@@ -120,7 +121,7 @@ confirm_lsn(struct recovery_state *r, i64 lsn)
 
 		return 0;
 	} else {
-		say_warn("lsn double confirmed:%" PRIi64, r->confirmed_lsn);
+		say_warn("lsn double confirmed:%" PRIi64, lsn);
 	}
 
 	return -1;
@@ -1211,6 +1212,29 @@ wal_writer_init_once()
 	pthread_atfork(NULL, NULL, wal_writer_child);
 }
 
+static void
+wal_writer_schedule(ev_watcher *watcher, int event __attribute__((unused)))
+{
+	struct wal_writer *writer = watcher->data;
+	struct wal_fifo output;
+
+	tt_pthread_mutex_lock(&writer->mutex);
+	output = writer->output;
+	STAILQ_INIT(&writer->output);
+	tt_pthread_mutex_unlock(&writer->mutex);
+
+	/*
+	 * Can't use STAILQ_FOREACH since fiber_call()
+	 * destroys the list entry.
+	 */
+	struct wal_write_request *req = STAILQ_FIRST(&output);
+	while (req) {
+		struct fiber *f = req->fiber;
+		req = STAILQ_NEXT(req, wal_fifo_entry);
+		fiber_call(f);
+	}
+}
+
 static void
 wal_writer_init(struct wal_writer *writer)
 {
@@ -1238,6 +1262,10 @@ wal_writer_init(struct wal_writer *writer)
 	tt_pthread_condattr_destroy(&clock_monotonic);
 
 	STAILQ_INIT(&writer->input);
+	STAILQ_INIT(&writer->output);
+
+	ev_async_init(&writer->async, (void *) wal_writer_schedule);
+	writer->async.data = writer;
 
 	tt_pthread_once(&wal_writer_once, wal_writer_init_once);
 }
@@ -1265,11 +1293,15 @@ wal_writer_start(struct recovery_state *state)
 {
 	assert(state->writer == NULL);
 	assert(wal_writer.is_shutdown == false);
+	assert(STAILQ_EMPTY(&wal_writer.input));
+	assert(STAILQ_EMPTY(&wal_writer.output));
 
 	/* I. Initialize the state. */
 	wal_writer_init(&wal_writer);
 	state->writer = &wal_writer;
 
+	ev_async_start(&wal_writer.async);
+
 	/* II. Start the thread. */
 
 	if (pthread_create(&wal_writer.thread, NULL, wal_writer_thread,
@@ -1295,22 +1327,26 @@ wal_writer_stop(struct recovery_state *state)
 	tt_pthread_cond_signal(&writer->cond);
 	tt_pthread_mutex_unlock(&writer->mutex);
 
-	if (pthread_join(writer->thread, NULL) == 0) {
-		wal_writer_destroy(writer);
-		return 0;
-	}
-	say_syserror("WAL writer: thread join failed");
+	if (pthread_join(writer->thread, NULL) != 0)
+		goto error;
+
+	ev_async_stop(&writer->async);
+	wal_writer_destroy(writer);
+	return 0;
+error:
+	/* We can't recover from this in any reasonable way. */
+	panic_syserror("WAL writer: thread join failed");
 	return -1;
 }
 
 struct wal_fifo
-wal_writer_pop(struct wal_writer *writer, bool wait)
+wal_writer_pop(struct wal_writer *writer, bool input_was_empty)
 {
 	struct wal_fifo input;
 	do {
 		input = writer->input;
 		STAILQ_INIT(&writer->input);
-		if (STAILQ_EMPTY(&input) == false || wait == false)
+		if (STAILQ_EMPTY(&input) == false || input_was_empty == false)
 			break;
 		tt_pthread_cond_wait(&writer->cond, &writer->mutex);
 	} while (writer->is_shutdown == false);
@@ -1413,35 +1449,32 @@ wal_writer_thread(void *worker_args)
 {
 	struct recovery_state *r = worker_args;
 	struct wal_writer *writer = r->writer;
-	struct wal_fifo output = STAILQ_HEAD_INITIALIZER(output);
+	bool input_was_empty = true;
 	struct wal_write_request *req;
 
 	tt_pthread_mutex_lock(&writer->mutex);
 	while (writer->is_shutdown == false) {
-		struct wal_fifo input =
-			wal_writer_pop(writer, STAILQ_EMPTY(&output));
+		struct wal_fifo input = wal_writer_pop(writer, input_was_empty);
 		pthread_mutex_unlock(&writer->mutex);
 		/*
-		 * Check the old list of fibers to wakeup *here*
-		 * since we needed a membar for its out_lsn's to
+		 * Wake up fibers waiting on the old list *here*
+		 * since we need a membar for request out_lsn's to
 		 * sync up.
 		 */
-		STAILQ_FOREACH(req, &output, wal_fifo_entry) {
-			/*
-			 * @todo:
-			 * Even though wal_write() is not
-			 * a cancellation point, check the fiber
-			 * wasn't cancelled and recycled.
-			 * */
-			fiber_wakeup(req->fiber);
-		}
+		if (input_was_empty == false)
+			ev_async_send(&writer->async);
+
 		STAILQ_FOREACH(req, &input, wal_fifo_entry) {
 			(void) write_to_disk(r, req);
 		}
-		output = input;
+		input_was_empty = STAILQ_EMPTY(&input);
 		tt_pthread_mutex_lock(&writer->mutex);
+		STAILQ_CONCAT(&writer->output, &input);
 	}
 	tt_pthread_mutex_unlock(&writer->mutex);
+
+	if (input_was_empty == false)
+		ev_async_send(&writer->async);
 	write_to_disk(r, NULL);
 	return NULL;
 }
@@ -1478,6 +1511,8 @@ wal_write(struct recovery_state *r, u16 tag, u16 op, u64 cookie,
 
 	fiber_yield();
 
+	assert(req->out_lsn == 0 || (req->lsn == lsn && req->out_lsn == lsn));
+
 	return req->out_lsn == 0 ? -1 : 0;
 }
 
diff --git a/core/say.m b/core/say.m
index fd3d517439..fdbe0e5667 100644
--- a/core/say.m
+++ b/core/say.m
@@ -116,7 +116,7 @@ vsay(int level, const char *filename, int line, const char *error, const char *f
 	const char *peer_name = fiber_peer_name(fiber);
 	size_t p = 0, len = PIPE_BUF;
 	const char *f;
-	static char buf[PIPE_BUF];
+	static __thread char buf[PIPE_BUF];
 
 	if (booting) {
 		fprintf(stderr, "%s: ", binary_filename);
-- 
GitLab