From c1f30ad5551d70c2fc3d654bb65e29a71d17765a Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Tue, 8 May 2012 10:45:59 +0400
Subject: [PATCH] Extract members pertaining to local recovery into a separate
 data struct.

Start extracting recovery members/functions related to local
recovery to a separate data structure (recover_remaining_wals()
mess still remains).
---
 include/log_io.h |   8 +-
 src/log_io.m     | 197 +++++++++++++++++++++++++++++------------------
 2 files changed, 126 insertions(+), 79 deletions(-)

diff --git a/include/log_io.h b/include/log_io.h
index 7ff8f9e2d6..4d002983cd 100644
--- a/include/log_io.h
+++ b/include/log_io.h
@@ -91,16 +91,16 @@ struct log_io {
 	struct log_io_class *class;
 	FILE *f;
 
-	ev_stat stat;
 	enum log_mode mode;
 	size_t rows;
-	size_t retry;
+	int retry;
 	char filename[PATH_MAX + 1];
 
 	bool is_inprogress;
 };
 
 struct wal_writer;
+struct wal_watcher;
 
 struct recovery_state {
 	i64 lsn, confirmed_lsn;
@@ -115,14 +115,14 @@ struct recovery_state {
 	struct log_io_class *snap_class;
 	struct log_io_class *wal_class;
 	struct wal_writer *writer;
+	struct wal_watcher *watcher;
+	struct fiber *remote_recovery;
 
 	/* row_handler will be presented by most recent format of data
 	   log_io_class->reader is responsible of converting data from old format */
 	row_handler *row_handler;
 	struct sockaddr_in remote_addr;
-	struct fiber *remote_recovery;
 
-	ev_timer wal_timer;
 	ev_tstamp recovery_lag, recovery_last_update_tstamp;
 
 	int snap_io_rate_limit;
diff --git a/src/log_io.m b/src/log_io.m
index 9c84823fa5..11dd7a828d 100644
--- a/src/log_io.m
+++ b/src/log_io.m
@@ -146,6 +146,28 @@ static struct wal_writer wal_writer;
 
 static struct tbuf *row_reader_v11(FILE *f, struct palloc_pool *pool);
 
+/**
+ * This is used in local hot standby or replication
+ * relay mode: look for changes in the wal_dir and apply them
+ * locally or send to the replica.
+ */
+struct wal_watcher {
+	/**
+	 * Rescan the WAL directory in search of new WAL files
+	 * every wal_dir_rescan_delay seconds.
+	 */
+	ev_timer dir_timer;
+	/**
+	 * When the latest WAL does not contain an EOF marker,
+	 * re-read its tail on every change in file metadata.
+	 */
+	ev_stat stat;
+	/** Path to the file being watched with 'stat'. */
+	char filename[PATH_MAX+1];
+};
+
+static struct wal_watcher wal_watcher;
+
 struct log_io_iter {
 	struct tarantool_coro coro;
 	struct log_io *log;
@@ -194,7 +216,6 @@ confirm_lsn(struct recovery_state *r, i64 lsn)
 
 
 /** Wait until the given LSN makes its way to disk. */
-
 void
 recovery_wait_lsn(struct recovery_state *r, i64 lsn)
 {
@@ -633,8 +654,6 @@ log_io_close(struct log_io **lptr)
 			say_error("can't write eof_marker");
 	}
 
-	if (ev_is_active(&l->stat))
-		ev_stat_stop(&l->stat);
 	r = fclose(l->f);
 	if (r < 0)
 		say_error("can't close");
@@ -759,8 +778,7 @@ error:
 
 
 static struct log_io *
-log_io_open_for_read(struct recovery_state *recover,
-		     struct log_io_class *class,
+log_io_open_for_read(struct log_io_class *class,
 		     i64 lsn, int suffix,
 		     const char *filename)
 {
@@ -772,7 +790,6 @@ log_io_open_for_read(struct recovery_state *recover,
 		return NULL;
 	}
 	l->mode = LOG_READ;
-	l->stat.data = recover;
 	l->is_inprogress = suffix == -1 ? true : false;
 
 	/* when filename is not null it is forced open for debug reading */
@@ -811,8 +828,7 @@ error:
  * and sets errno.
  */
 struct log_io *
-log_io_open_for_write(struct recovery_state *recover,
-		      struct log_io_class *class, i64 lsn, int suffix)
+log_io_open_for_write(struct log_io_class *class, i64 lsn, int suffix)
 {
 	struct log_io *l = NULL;
 	int fd;
@@ -827,7 +843,6 @@ log_io_open_for_write(struct recovery_state *recover,
 	}
 	l->mode = LOG_WRITE;
 	l->class = class;
-	l->stat.data = recover;
 
 	assert(lsn > 0);
 
@@ -901,7 +916,7 @@ read_log(const char *filename,
 		return -1;
 	}
 
-	l = log_io_open_for_read(NULL, c, 0, 0, filename);
+	l = log_io_open_for_read(c, 0, 0, filename);
 	iter_open(l, &i, read_rows);
 	while ((row = iter_inner(&i, (void *)1)))
 		h(state, row);
@@ -938,7 +953,7 @@ recover_snap(struct recovery_state *r)
 			return -1;
 		}
 
-		snap = log_io_open_for_read(r, r->snap_class, lsn, 0, NULL);
+		snap = log_io_open_for_read(r->snap_class, lsn, 0, NULL);
 		if (snap == NULL) {
 			say_error("can't find/open snapshot");
 			return -1;
@@ -1073,11 +1088,12 @@ recover_remaining_wals(struct recovery_state *r)
 		 * one last time. */
 		if (r->current_wal != NULL) {
 			if (r->current_wal->retry++ < 3) {
-				say_warn("try reread `%s' despite newer WAL exists",
-					 r->current_wal->filename);
+				say_warn("`%s' has no EOF marker, yet a newer WAL file exists:"
+					 "trying to read (attempt %d)",
+					 r->current_wal->filename, r->current_wal->retry);
 				goto recover_current_wal;
 			} else {
-				say_warn("wal `%s' wasn't correctly closed",
+				say_warn("WAL `%s' wasn't correctly closed",
 					 r->current_wal->filename);
 				log_io_close(&r->current_wal);
 			}
@@ -1085,14 +1101,14 @@ recover_remaining_wals(struct recovery_state *r)
 
 		/* TODO: find a better way of finding the next xlog */
 		current_lsn = r->confirmed_lsn + 1;
-		next_wal = log_io_open_for_read(r, r->wal_class, current_lsn,
+		next_wal = log_io_open_for_read(r->wal_class, current_lsn,
 						0, NULL);
 		/*
 		 * When doing final recovery, and dealing with the
 		 * last file, try opening .<suffix>.inprogress.
 		 */
 		if (next_wal == NULL && r->finalize && current_lsn == wal_greatest_lsn) {
-			next_wal = log_io_open_for_read(r, r->wal_class, current_lsn, -1, NULL);
+			next_wal = log_io_open_for_read(r->wal_class, current_lsn, -1, NULL);
 			if (next_wal == NULL) {
 				char *filename =
 					format_filename(NULL, r->wal_class, current_lsn, -1);
@@ -1142,7 +1158,7 @@ recover_current_wal:
 
 	/*
 	 * It's not a fatal error when last WAL is empty, but if
-	 * we lost some logs it is a fatal error.
+	 * we lose some logs it is a fatal error.
 	 */
 	if (wal_greatest_lsn > r->confirmed_lsn + 1) {
 		say_error("not all WALs have been successfully read");
@@ -1198,7 +1214,7 @@ recover(struct recovery_state *r, i64 lsn)
 		/* No WALs to recover from. */
 		goto out;
 	}
-	r->current_wal = log_io_open_for_read(r, r->wal_class, wal_lsn, 0, NULL);
+	r->current_wal = log_io_open_for_read(r->wal_class, wal_lsn, 0, NULL);
 	if (r->current_wal == NULL)
 		goto out;
 	if (recover_remaining_wals(r) < 0)
@@ -1208,31 +1224,46 @@ out:
 	prelease(fiber->gc_pool);
 }
 
-static void recovery_follow_file(ev_stat *w, int revents __attribute__((unused)));
+static void recovery_rescan_file(ev_stat *w, int revents __attribute__((unused)));
+
+static void
+recovery_watch_file(struct wal_watcher *watcher, struct log_io *wal)
+{
+	strncpy(watcher->filename, wal->filename, PATH_MAX);
+	ev_stat_init(&watcher->stat, recovery_rescan_file, watcher->filename, 0.);
+	ev_stat_start(&watcher->stat);
+}
 
 static void
-recovery_follow_dir(ev_timer *w, int revents __attribute__((unused)))
+recovery_stop_file(struct wal_watcher *watcher)
+{
+	ev_stat_stop(&watcher->stat);
+}
+
+static void
+recovery_rescan_dir(ev_timer *w, int revents __attribute__((unused)))
 {
 	struct recovery_state *r = w->data;
-	struct log_io *wal = r->current_wal;
+	struct wal_watcher *watcher = r->watcher;
+	struct log_io *save_current_wal = r->current_wal;
+
 	int result = recover_remaining_wals(r);
 	if (result < 0)
 		panic("recover failed: %i", result);
-
-	/* recover_remaining_wals found new wal */
-	if (r->current_wal != NULL && wal != r->current_wal) {
-		ev_stat *stat = &r->current_wal->stat;
-		ev_stat_init(stat, recovery_follow_file, r->current_wal->filename, 0.);
-		ev_stat_start(stat);
+	if (save_current_wal != r->current_wal) {
+		if (save_current_wal != NULL)
+			recovery_stop_file(watcher);
+		if (r->current_wal != NULL)
+			recovery_watch_file(watcher, r->current_wal);
 	}
 }
 
 static void
-recovery_follow_file(ev_stat *w, int revents __attribute__((unused)))
+recovery_rescan_file(ev_stat *w, int revents __attribute__((unused)))
 {
 	struct recovery_state *r = w->data;
-	int result;
-	result = recover_wal(r, r->current_wal);
+	struct wal_watcher *watcher = r->watcher;
+	int result = recover_wal(r, r->current_wal);
 	if (result < 0)
 		panic("recover failed");
 	if (result == LOG_EOF) {
@@ -1240,21 +1271,42 @@ recovery_follow_file(ev_stat *w, int revents __attribute__((unused)))
 			 r->current_wal->filename,
 			 r->confirmed_lsn);
 		log_io_close(&r->current_wal);
-		recovery_follow_dir((ev_timer *)w, 0);
+		recovery_stop_file(watcher);
+		/* Don't wait for wal_dir_rescan_delay. */
+		recovery_rescan_dir(&watcher->dir_timer, 0);
 	}
 }
 
 void
 recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay)
 {
-	ev_timer_init(&r->wal_timer, recovery_follow_dir,
+	assert(r->watcher == NULL);
+	assert(r->writer == NULL);
+
+	struct wal_watcher  *watcher = r->watcher= &wal_watcher;
+
+	ev_timer_init(&watcher->dir_timer, recovery_rescan_dir,
 		      wal_dir_rescan_delay, wal_dir_rescan_delay);
-	ev_timer_start(&r->wal_timer);
-	if (r->current_wal != NULL) {
-		ev_stat *stat = &r->current_wal->stat;
-		ev_stat_init(stat, recovery_follow_file, r->current_wal->filename, 0.);
-		ev_stat_start(stat);
-	}
+	watcher->dir_timer.data = watcher->stat.data = r;
+	ev_timer_start(&watcher->dir_timer);
+	/*
+	 * recover() leaves the current wal open if it has no
+	 * EOF marker.
+	 */
+	if (r->current_wal != NULL)
+		recovery_watch_file(watcher, r->current_wal);
+}
+
+static void
+recovery_stop_local(struct recovery_state *r)
+{
+	struct wal_watcher *watcher = r->watcher;
+	assert(ev_is_active(&watcher->dir_timer));
+	ev_timer_stop(&watcher->dir_timer);
+	if (ev_is_active(&watcher->stat))
+		ev_stat_stop(&watcher->stat);
+
+	r->watcher = NULL;
 }
 
 void
@@ -1262,15 +1314,10 @@ recovery_finalize(struct recovery_state *r)
 {
 	int result;
 
-	r->finalize = true;
-
-	if (ev_is_active(&r->wal_timer))
-		ev_timer_stop(&r->wal_timer);
+	if (r->watcher)
+		recovery_stop_local(r);
 
-	if (r->current_wal != NULL) {
-		if (ev_is_active(&r->current_wal->stat))
-			ev_stat_stop(&r->current_wal->stat);
-	}
+	r->finalize = true;
 
 	result = recover_remaining_wals(r);
 	if (result < 0)
@@ -1426,35 +1473,35 @@ static void *wal_writer_thread(void *worker_args);
  *         points to a newly created WAL writer.
  */
 static int
-wal_writer_start(struct recovery_state *state)
+wal_writer_start(struct recovery_state *r)
 {
-	assert(state->writer == NULL);
+	assert(r->writer == NULL);
+	assert(r->watcher == NULL);
 	assert(wal_writer.is_shutdown == false);
 	assert(STAILQ_EMPTY(&wal_writer.input));
 	assert(STAILQ_EMPTY(&wal_writer.output));
 
 	/* I. Initialize the state. */
 	wal_writer_init(&wal_writer);
-	state->writer = &wal_writer;
+	r->writer = &wal_writer;
 
 	ev_async_start(&wal_writer.async);
 
 	/* II. Start the thread. */
 
-	if (tt_pthread_create(&wal_writer.thread, NULL, wal_writer_thread,
-			      state)) {
+	if (tt_pthread_create(&wal_writer.thread, NULL, wal_writer_thread, r)) {
 		wal_writer_destroy(&wal_writer);
-		state->writer = NULL;
+		r->writer = NULL;
 		return -1;
 	}
 	return 0;
 }
 
 /** Stop and destroy the writer thread (at shutdown). */
-static int
-wal_writer_stop(struct recovery_state *state)
+void
+wal_writer_stop(struct recovery_state *r)
 {
-	struct wal_writer *writer = state->writer;
+	struct wal_writer *writer = r->writer;
 
 	/* Stop the worker thread. */
 
@@ -1463,18 +1510,15 @@ wal_writer_stop(struct recovery_state *state)
 	tt_pthread_cond_signal(&writer->cond);
 	tt_pthread_mutex_unlock(&writer->mutex);
 
-	if (pthread_join(writer->thread, NULL) != 0)
-		goto error;
+	if (pthread_join(writer->thread, NULL) != 0) {
+		/* We can't recover from this in any reasonable way. */
+		panic_syserror("WAL writer: thread join failed");
+	}
 
 	ev_async_stop(&writer->async);
 	wal_writer_destroy(writer);
 
-	state->writer = NULL;
-	return 0;
-error:
-	/* We can't recover from this in any reasonable way. */
-	panic_syserror("WAL writer: thread join failed");
-	return -1;
+	r->writer = NULL;
 }
 
 /**
@@ -1508,7 +1552,7 @@ write_to_disk(struct recovery_state *r, struct wal_write_request *req)
 	if (r->current_wal == NULL) {
 		/* Open WAL with '.inprogress' suffix. */
 		r->current_wal =
-			log_io_open_for_write(r, r->wal_class, req->lsn, -1);
+			log_io_open_for_write(r->wal_class, req->lsn, -1);
 	}
 	else if (r->current_wal->rows == 1) {
 		/* rename WAL after first successful write to name
@@ -1682,7 +1726,6 @@ recovery_init(const char *snap_dirname, const char *wal_dirname,
 	if (rows_per_wal <= 1)
 		panic("unacceptable value of 'rows_per_wal'");
 
-	r->wal_timer.data = r;
 	r->row_handler = row_handler;
 	r->remote_recovery = NULL;
 
@@ -1719,22 +1762,26 @@ recovery_update_io_rate_limit(double new_limit)
 void
 recovery_free()
 {
-	struct recovery_state *recovery = recovery_state;
-	if (recovery == NULL)
+	struct recovery_state *r = recovery_state;
+	if (r == NULL)
 		return;
-	if (recovery->writer)
-		wal_writer_stop(recovery);
 
-	v11_class_free(recovery->snap_class);
-	v11_class_free(recovery->wal_class);
-	if (recovery->current_wal) {
+	if (r->watcher)
+		recovery_stop_local(r);
+
+	if (r->writer)
+		wal_writer_stop(r);
+
+	v11_class_free(r->snap_class);
+	v11_class_free(r->wal_class);
+	if (r->current_wal) {
 		/*
 		 * Possible if shutting down a replication
 		 * relay or if error during startup.
 		 */
-		log_io_close(&recovery->current_wal);
+		log_io_close(&r->current_wal);
 	}
-	assert(recovery->previous_wal == NULL);
+	assert(r->previous_wal == NULL);
 
 	recovery_state = NULL;
 }
@@ -1835,7 +1882,7 @@ snapshot_save(struct recovery_state *r, void (*f) (struct log_io_iter *))
 
 	memset(&i, 0, sizeof(i));
 
-	snap = log_io_open_for_write(r, r->snap_class, r->confirmed_lsn, -1);
+	snap = log_io_open_for_write(r->snap_class, r->confirmed_lsn, -1);
 	if (snap == NULL)
 		panic_status(errno, "Failed to save snapshot: failed to open file in write mode.");
 
-- 
GitLab