From 008046f1eb685f0b4e448382054c9f7e56d2a82d Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Sat, 28 Apr 2012 17:37:14 +0400
Subject: [PATCH] Follow up on wal-fsync0-merge branch.

Fix bug
https://bugs.launchpad.net/tarantool/+bug/983094
"Valgrind tests were broken by WAL writer patch"

Keep track of WAL files opened for write
in recovery

Extend atfork handler to clear the wals,
to avoid collision with reuse of recovery
to save a snapshot.

Move some WAL-specific configuration settings
from wal_class to recovery, to avoid layers
of indirection when doing RELOAD CONFIGURATION
---
 include/log_io.h    |  19 +++---
 src/log_io.m        | 139 +++++++++++++++++++++++++-------------------
 src/log_io_remote.m |   6 +-
 src/replication.m   |   8 ++-
 src/tarantool.m     |   4 ++
 5 files changed, 103 insertions(+), 73 deletions(-)

diff --git a/include/log_io.h b/include/log_io.h
index 0450d48b32..c2236c5c41 100644
--- a/include/log_io.h
+++ b/include/log_io.h
@@ -57,9 +57,6 @@ struct log_io_class {
 	row_reader *reader;
 	u64 marker, eof_marker;
 	size_t marker_size, eof_marker_size;
-	size_t rows_per_file;
-	/* wal_fsync_delay value for the log class. */
-	double fsync_delay;
 	bool panic_if_error;
 
 	/* Additional flags to apply at open(2) to write. */
@@ -107,8 +104,14 @@ struct wal_writer;
 
 struct recovery_state {
 	i64 lsn, confirmed_lsn;
-
-	struct log_io *current_wal;	/* the WAL we'r currently reading/writing from/to */
+	/* The WAL we're currently reading/writing from/to. */
+	struct log_io *current_wal;
+	/*
+	 * When opening the next WAL, we want to first open
+	 * a new file before closing the previous one. Thus
+	 * we save the old WAL here.
+	 */
+	struct log_io *previous_wal;
 	struct log_io_class *snap_class;
 	struct log_io_class *wal_class;
 	struct wal_writer *writer;
@@ -123,6 +126,8 @@ struct recovery_state {
 	ev_tstamp recovery_lag, recovery_last_update_tstamp;
 
 	int snap_io_rate_limit;
+	int rows_per_wal;
+	double wal_fsync_delay;
 	u64 cookie;
 	struct wait_lsn wait_lsn;
 
@@ -173,8 +178,8 @@ struct tbuf *convert_to_v11(struct tbuf *orig, u16 tag, u64 cookie, i64 lsn);
 
 void recovery_init(const char *snap_dirname, const char *xlog_dirname,
 		   row_handler row_handler,
-		   int rows_per_file, const char *wal_mode,
-		   double fsync_delay,
+		   int rows_per_wal, const char *wal_mode,
+		   double wal_fsync_delay,
 		   int flags, void *data);
 void recovery_update_mode(const char *wal_mode, double fsync_delay);
 void recovery_free();
diff --git a/src/log_io.m b/src/log_io.m
index 043f78a0b5..37e898bb84 100644
--- a/src/log_io.m
+++ b/src/log_io.m
@@ -169,8 +169,6 @@ v11_class(struct log_io_class *c)
 	c->marker_size = sizeof(marker_v11);
 	c->eof_marker = eof_marker_v11;
 	c->eof_marker_size = sizeof(eof_marker_v11);
-
-	c->fsync_delay = 0;
 }
 
 static void
@@ -581,6 +579,26 @@ log_io_close(struct log_io **lptr)
 	return r;
 }
 
+/** Free log_io memory and destroy it cleanly, without side
+ * effects (for use in the atfork handler).
+ */
+static void
+log_io_atfork(struct log_io **lptr)
+{
+	struct log_io *l = *lptr;
+	if (l) {
+		/*
+		 * Close the file descriptor STDIO buffer does not
+		 * make its way into the respective file in
+		 * fclose().
+		 */
+		close(fileno(l->f));
+		fclose(l->f);
+		free(l);
+		*lptr = NULL;
+	}
+}
+
 static int
 log_io_flush(struct log_io *l)
 {
@@ -1205,15 +1223,20 @@ recover_finalize(struct recovery_state *r)
 }
 
 /**
- * A pthread_atfork() callback for the child.  We fork
- * to save a snapshot, and in the child the writer
- * thread is not necessary and not present.  Make sure
- * that atexit() handlers do not try to stop the
- * non-existent thread.
+ * A pthread_atfork() callback for a child process. Today we only
+ * fork the master process to save a snapshot, and in the child
+ * the WAL writer thread is not necessary and not present.
  */
 static void
 wal_writer_child()
 {
+	log_io_atfork(&recovery_state->current_wal);
+	log_io_atfork(&recovery_state->previous_wal);
+	/*
+	 * Make sure that atexit() handlers in the child do
+	 * not try to stop the non-existent thread.
+	 * The writer is not used in the child.
+	 */
 	recovery_state->writer = NULL;
 }
 
@@ -1389,55 +1412,38 @@ wal_writer_pop(struct wal_writer *writer, bool input_was_empty)
 static int
 write_to_disk(struct recovery_state *r, struct wal_write_request *req)
 {
-	static struct log_io *wal = NULL, *wal_to_close = NULL;
 	static ev_tstamp last_flush = 0;
-	bool is_bulk_end = req == NULL || STAILQ_NEXT(req, wal_fifo_entry) == NULL;
-
-#if 0
-	/* we're not running inside ev_loop, so update ev_now manually */
-	ev_now_update();
-#endif
+	bool is_bulk_end = STAILQ_NEXT(req, wal_fifo_entry) == NULL;
 
-	/* caller requested termination */
-	if (req == NULL) {
-		if (wal != NULL)
-			log_io_close(&wal);
-#if 0
-		if (wal_to_close != NULL)
-			log_io_close(&wal_to_close);
-		recover_free((struct recovery_state*)_state);
-#endif
-		return 0;
-	}
-
-	if (wal == NULL) {
+	if (r->current_wal == NULL) {
 		int unused;
 		/* Open WAL with '.inprogress' suffix. */
-		wal = log_io_open_for_write(r, r->wal_class, req->lsn, -1,
-					    &unused);
+		r->current_wal =
+			log_io_open_for_write(r, r->wal_class, req->lsn, -1,
+					      &unused);
 	}
-	else if (wal->rows == 1) {
+	else if (r->current_wal->rows == 1) {
 		/* rename WAL after first successful write to name
 		 * without inprogress suffix*/
-		if (log_io_inprogress_rename(wal->filename) != 0) {
-			say_error("can't rename inprogress wal");
+		if (log_io_inprogress_rename(r->current_wal->filename) != 0) {
+			say_error("can't rename inprogress WAL");
 			goto fail;
 		}
 	}
 	/*
-	 * Close the file *after* we write the first record
-	 * into the new WAL, since this is when replication
-	 * relays get an inotify alarm (when we close the file),
-	 * and try to reopen the next WAL. In other words,
-	 * make sure that replication realys try to open the
-	 * next WAL only when it exists.
+	 * Close the file *after* we create the new WAL, since
+	 * this is when replication relays get an inotify alarm
+	 * (when we close the file), and try to reopen the next
+	 * WAL. In other words, make sure that replication relays
+	 * try to open the next WAL only when it exists.
 	 */
-	if (wal_to_close != NULL) {
-		if (log_io_close(&wal_to_close) != 0)
+	if (r->previous_wal != NULL) {
+		if (log_io_close(&r->previous_wal) != 0)
 			goto fail;
 	}
+	struct log_io *wal = r->current_wal;
 	if (wal == NULL) {
-		say_syserror("can't open wal");
+		say_syserror("can't open WAL");
 		goto fail;
 	}
 	req->marker = marker_v11;
@@ -1451,30 +1457,30 @@ write_to_disk(struct recovery_state *r, struct wal_write_request *req)
 	sz += sizeof(req->marker) + sizeof(req->header_crc32c) + req->len;
 	/* Write the request. */
 	if (fwrite(&req->marker, sz, 1, wal->f) != 1) {
-		say_syserror("can't write row header to wal");
+		say_syserror("can't write row header to WAL");
 		goto fail;
 	}
 
-	/* flush stdio buffer to keep replication in sync */
+	/* Flush stdio buffer to keep replication in sync. */
 	if (is_bulk_end && fflush(wal->f) < 0) {
-		say_syserror("can't flush wal");
+		say_syserror("can't flush WAL");
 		goto fail;
 	}
 
-	if (wal->class->fsync_delay > 0 &&
-	    ev_now() - last_flush >= wal->class->fsync_delay) {
+	if (r->wal_fsync_delay > 0 &&
+	    ev_now() - last_flush >= r->wal_fsync_delay) {
 		if (log_io_flush(wal) < 0) {
-			say_syserror("can't flush wal");
+			say_syserror("can't flush WAL");
 			goto fail;
 		}
 		last_flush = ev_now();
 	}
 
 	wal->rows++;
-	if (wal->class->rows_per_file <= wal->rows ||
-	    (req->lsn + 1) % wal->class->rows_per_file == 0) {
-		wal_to_close = wal;
-		wal = NULL;
+	if (r->rows_per_wal <= wal->rows ||
+	    (req->lsn + 1) % r->rows_per_wal == 0) {
+		r->previous_wal = r->current_wal;
+		r->current_wal = NULL;
 	}
 
 	req->out_lsn = req->lsn;
@@ -1494,6 +1500,9 @@ wal_writer_thread(void *worker_args)
 	bool input_was_empty = true;
 	struct wal_write_request *req;
 
+	assert(r->current_wal == NULL);
+	assert(r->previous_wal == NULL);
+
 	tt_pthread_mutex_lock(&writer->mutex);
 	while (writer->is_shutdown == false) {
 		struct wal_fifo input = wal_writer_pop(writer, input_was_empty);
@@ -1519,9 +1528,12 @@ wal_writer_thread(void *worker_args)
 	 * we were able to awake all fibers waiting on the
 	 * previous pack.
 	 */
+	if (r->current_wal != NULL)
+		log_io_close(&r->current_wal);
+	if (r->previous_wal != NULL)
+		log_io_close(&r->previous_wal);
 	if (input_was_empty == false)
 		ev_async_send(&writer->async);
-	write_to_disk(r, NULL);
 	return NULL;
 }
 
@@ -1570,15 +1582,16 @@ wal_write(struct recovery_state *r, u16 tag, u16 op, u64 cookie,
 
 void
 recovery_init(const char *snap_dirname, const char *wal_dirname,
-	     row_handler row_handler, int rows_per_file,
-	     const char *wal_mode, double fsync_delay, int flags, void *data)
+	      row_handler row_handler, int rows_per_wal,
+	      const char *wal_mode, double wal_fsync_delay,
+	      int flags, void *data)
 {
 	assert(recovery_state == NULL);
 	recovery_state = p0alloc(eter_pool, sizeof(struct recovery_state));
 	struct recovery_state *r = recovery_state;
 
-	if (rows_per_file <= 1)
-		panic("unacceptable value of 'rows_per_file'");
+	if (rows_per_wal <= 1)
+		panic("unacceptable value of 'rows_per_wal'");
 
 	r->wal_timer.data = r;
 	r->row_handler = row_handler;
@@ -1588,8 +1601,8 @@ recovery_init(const char *snap_dirname, const char *wal_dirname,
 	r->snap_class = snapshot_class_create(snap_dirname);
 
 	r->wal_class = xlog_class_create(wal_dirname);
-	r->wal_class->rows_per_file = rows_per_file;
-	r->wal_class->fsync_delay = fsync_delay;
+	r->rows_per_wal = rows_per_wal;
+	r->wal_fsync_delay = wal_fsync_delay;
 	r->wal_class->open_wflags = strcasecmp(wal_mode, "fsync") ? 0 : WAL_SYNC_FLAG;
 	wait_lsn_clear(&r->wait_lsn);
 
@@ -1608,7 +1621,7 @@ recovery_update_mode(const char *mode, double fsync_delay)
 	 * to it whenever there is a next lock/unlock of
 	 * wal_writer->mutex.
 	 */
-	r->wal_class->fsync_delay = fsync_delay;
+	r->wal_fsync_delay = fsync_delay;
 }
 
 void
@@ -1622,8 +1635,14 @@ recovery_free()
 
 	v11_class_free(recovery->snap_class);
 	v11_class_free(recovery->wal_class);
-	if (recovery->current_wal)
+	if (recovery->current_wal) {
+		/*
+		 * Possible if shutting down a replication
+		 * relay or if error during startup.
+		 */
 		log_io_close(&recovery->current_wal);
+	}
+	assert(recovery->previous_wal == NULL);
 
 	recovery_state = NULL;
 }
diff --git a/src/log_io_remote.m b/src/log_io_remote.m
index cb96d3ddb5..d099afb6a1 100644
--- a/src/log_io_remote.m
+++ b/src/log_io_remote.m
@@ -40,7 +40,7 @@
 #include <pickle.h>
 
 static int
-default_remote_row_handler(struct recovery_state *r, struct tbuf *row);
+remote_apply_row(struct recovery_state *r, struct tbuf *row);
 
 static struct tbuf *
 remote_row_reader_v11()
@@ -134,7 +134,7 @@ pull_from_remote(void *state)
 		r->recovery_lag = ev_now() - row_v11(row)->tm;
 		r->recovery_last_update_tstamp = ev_now();
 
-		if (default_remote_row_handler(r, row) < 0) {
+		if (remote_apply_row(r, row) < 0) {
 			fiber_close();
 			continue;
 		}
@@ -144,7 +144,7 @@ pull_from_remote(void *state)
 }
 
 static int
-default_remote_row_handler(struct recovery_state *r, struct tbuf *row)
+remote_apply_row(struct recovery_state *r, struct tbuf *row)
 {
 	struct tbuf *data;
 	i64 lsn = row_v11(row)->lsn;
diff --git a/src/replication.m b/src/replication.m
index b40ee701ea..efc782d9a4 100644
--- a/src/replication.m
+++ b/src/replication.m
@@ -139,9 +139,11 @@ static int
 replication_relay_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t);
 
 
-/*-----------------------------------------------------------------------------*/
-/* replication module                                                          */
-/*-----------------------------------------------------------------------------*/
+/*
+ * ------------------------------------------------------------------------
+ * replication module
+ * ------------------------------------------------------------------------
+ */
 
 /** Check replication module configuration. */
 int
diff --git a/src/tarantool.m b/src/tarantool.m
index 01b21b93db..deb7f83bfc 100644
--- a/src/tarantool.m
+++ b/src/tarantool.m
@@ -279,6 +279,10 @@ snapshot(void *ev, int events __attribute__((unused)))
 	fiber_set_name(fiber, "dumper");
 	set_proc_title("dumper (%" PRIu32 ")", getppid());
 
+	/*
+	 * Safety: make sure we don't double-write
+	 * parent stdio buffers at exit().
+	 */
 	close_all_xcpt(1, sayfd);
 	snapshot_save(recovery_state, mod_snapshot);
 
-- 
GitLab