From 624ab51b916dbb33353cfae9432e4a6709c7777a Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Tue, 2 Jun 2015 15:39:56 +0300
Subject: [PATCH] recovery: rewrite recover_remaining_wals()

Don't log exceptions of dead joinable fibers - it will
be logged by the caller, so avoid double logging.

recover_remaining_wals() is now more like an iterator,
vclockset_match() is only called when there is no current WAL.

Add a test case for gaps in LSNs.
---
 src/box/box.cc                        |   6 +-
 src/box/memtx_engine.cc               |   2 +-
 src/box/recovery.cc                   | 164 ++++++++---------
 src/box/recovery.h                    |   4 +-
 src/box/sophia_engine.cc              |   2 +-
 src/box/vclock.c                      |   2 +-
 src/box/vclock.h                      |  21 ++-
 src/box/xlog.cc                       |  25 ++-
 src/box/xlog.h                        |   9 +
 src/fiber.cc                          |   7 +-
 test/unit/fiber.result                |   2 -
 test/unit/vclock.cc                   |  10 +-
 test/xlog/errinj.result               |   1 +
 test/xlog/errinj.test.lua             |   1 +
 test/xlog/lsn_gap.result              |   4 +-
 test/xlog/lsn_gap.test.py             |   2 +-
 test/xlog/missing.result              |   4 +-
 test/xlog/missing.test.py             |   2 +-
 test/xlog/panic.lua                   |  12 ++
 test/xlog/panic_on_lsn_gap.result     | 249 ++++++++++++++++++++++++++
 test/xlog/panic_on_lsn_gap.test.lua   | 112 ++++++++++++
 test/xlog/panic_on_wal_error.result   |  20 ++-
 test/xlog/panic_on_wal_error.test.lua |   8 +
 test/xlog/suite.ini                   |   2 +-
 test/xlog/xlog.lua                    |   2 +-
 25 files changed, 547 insertions(+), 126 deletions(-)
 create mode 100644 test/xlog/panic.lua
 create mode 100644 test/xlog/panic_on_lsn_gap.result
 create mode 100644 test/xlog/panic_on_lsn_gap.test.lua

diff --git a/src/box/box.cc b/src/box/box.cc
index d151548230..fb83de3f3b 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -456,14 +456,14 @@ box_init(void)
 		/* Initialize a new replica */
 		engine_begin_join();
 		replica_bootstrap(recovery);
-		int64_t checkpoint_id = vclock_signature(&recovery->vclock);
+		int64_t checkpoint_id = vclock_sum(&recovery->vclock);
 		engine_checkpoint(checkpoint_id);
 	} else {
 		/* Initialize the first server of a new cluster */
 		recovery_bootstrap(recovery);
 		box_set_cluster_uuid();
 		box_set_server_uuid();
-		int64_t checkpoint_id = vclock_signature(&recovery->vclock);
+		int64_t checkpoint_id = vclock_sum(&recovery->vclock);
 		engine_checkpoint(checkpoint_id);
 	}
 	fiber_gc();
@@ -518,7 +518,7 @@ int
 box_snapshot()
 {
 	/* create snapshot file */
-	int64_t checkpoint_id = vclock_signature(&recovery->vclock);
+	int64_t checkpoint_id = vclock_sum(&recovery->vclock);
 	return engine_checkpoint(checkpoint_id);
 }
 
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index bd905a1ee0..8647dadd43 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -267,7 +267,7 @@ recover_snap(struct recovery_state *r)
 	 */
 	if (res == NULL)
 		tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
-	int64_t signature = vclock_signature(res);
+	int64_t signature = vclock_sum(res);
 
 	struct xlog *snap = xlog_open(&r->snap_dir, signature);
 	auto guard = make_scoped_guard([=]{
diff --git a/src/box/recovery.cc b/src/box/recovery.cc
index 1333f19d68..95c2bcaf1c 100644
--- a/src/box/recovery.cc
+++ b/src/box/recovery.cc
@@ -169,7 +169,6 @@ recovery_new(const char *snap_dirname, const char *wal_dirname,
 
 	r->apply_row = apply_row;
 	r->apply_row_param = apply_row_param;
-	r->signature = -1;
 	r->snap_io_rate_limit = UINT64_MAX;
 
 	xdir_create(&r->snap_dir, snap_dirname, SNAP, &r->server_uuid);
@@ -221,6 +220,14 @@ recovery_setup_panic(struct recovery_state *r, bool on_snap_error,
 static inline void
 recovery_close_log(struct recovery_state *r)
 {
+	if (r->current_wal == NULL)
+		return;
+	if (r->current_wal->eof_read) {
+		say_info("done `%s'", r->current_wal->filename);
+	} else {
+		say_warn("file `%s` wasn't correctly closed",
+			 r->current_wal->filename);
+	}
 	xlog_close(r->current_wal);
 	r->current_wal = NULL;
 }
@@ -263,13 +270,12 @@ recovery_apply_row(struct recovery_state *r, struct xrow_header *row)
 		r->apply_row(r, r->apply_row_param, row);
 }
 
-#define LOG_EOF 0
-
 /**
- * @retval 0 OK, read full xlog.
- * @retval 1 OK, read some but not all rows, or no EOF marker
+ * Read all rows in a file starting from the last position.
+ * Advance the position. If end of file is reached,
+ * set l.eof_read.
  */
-int
+void
 recover_xlog(struct recovery_state *r, struct xlog *l)
 {
 	struct xlog_cursor i;
@@ -281,6 +287,12 @@ recover_xlog(struct recovery_state *r, struct xlog *l)
 	});
 
 	struct xrow_header row;
+	/*
+	 * xlog_cursor_next() returns 1 when
+	 * it can not read more rows. This doesn't mean
+	 * the file is fully read: it's fully read only
+	 * when EOF marker has been read, see i.eof_read
+	 */
 	while (xlog_cursor_next(&i, &row) == 0) {
 		try {
 			recovery_apply_row(r, &row);
@@ -301,15 +313,6 @@ recover_xlog(struct recovery_state *r, struct xlog *l)
 		panic("snapshot `%s' has no EOF marker", l->filename);
 	}
 
-	/*
-	 * xlog_cursor_next() returns 1 when
-	 * it can not read more rows. This doesn't mean
-	 * the file is fully read: it's fully read only
-	 * when EOF marker has been read. This is
-	 * why eof_read is used here to indicate the
-	 * end of log.
-	 */
-	return !i.eof_read;
 }
 
 void
@@ -331,7 +334,8 @@ recovery_bootstrap(struct recovery_state *r)
 	recover_xlog(r, snap);
 }
 
-/** Find out if there are new .xlog files since the current
+/**
+ * Find out if there are new .xlog files since the current
  * LSN, and read them all up.
  *
  * This function will not close r->current_wal if
@@ -342,75 +346,65 @@ recover_remaining_wals(struct recovery_state *r)
 {
 	xdir_scan(&r->wal_dir);
 
-	struct vclock *last_vclock = vclockset_last(&r->wal_dir.index);
-	int64_t last_signature = last_vclock != NULL ?
-		vclock_signature(last_vclock) : -1;
+	struct vclock *last = vclockset_last(&r->wal_dir.index);
+	if (last == NULL) {
+		assert(r->current_wal == NULL);
+		/** Nothing to do. */
+		return;
+	}
+	assert(vclockset_next(&r->wal_dir.index, last) == NULL);
+
 	/* If the caller already opened WAL for us, recover from it first */
-	struct vclock *current_vclock;
+	struct vclock *clock;
 	if (r->current_wal != NULL) {
-		current_vclock = &r->current_wal->vclock;
-		goto recover_current_wal;
+		clock = vclockset_match(&r->wal_dir.index,
+					&r->current_wal->vclock);
+		if (clock != NULL &&
+		    vclock_compare(clock, &r->current_wal->vclock) == 0)
+			goto recover_current_wal;
+		/*
+		 * The current WAL has disappeared under our feet -
+		 * assume anything can happen in production and go
+		 * on.
+		 */
+		assert(false);
 	}
 
-	while (1) {
-		current_vclock =
-			vclockset_match(&r->wal_dir.index, &r->vclock,
-					r->wal_dir.panic_if_error);
-		if (current_vclock == NULL)
-			break; /* No more WALs */
-
-		if (vclock_signature(current_vclock) <= r->signature) {
-			if (r->signature == last_signature)
-				break;
-			say_error("missing xlog between %020lld and %020lld",
-				  (long long) vclock_signature(current_vclock),
-				  (long long) last_signature);
-			if (r->wal_dir.panic_if_error)
-				break;
+	for (clock = vclockset_match(&r->wal_dir.index, &r->vclock);
+	     clock != NULL;
+	     clock = vclockset_next(&r->wal_dir.index, clock)) {
 
-			/* Ignore missing WALs */
-			say_warn("ignoring missing WALs");
-			current_vclock = vclockset_next(&r->wal_dir.index,
-							current_vclock);
-			assert(current_vclock != NULL);
-		}
-		assert(r->current_wal == NULL);
-		try {
-			r->current_wal = xlog_open(&r->wal_dir,
-					   vclock_signature(current_vclock));
-		} catch (XlogError *e) {
+		if (vclock_compare(clock, &r->vclock) > 0) {
+			/**
+			 * The best clock we could find is
+			 * greater or is incomparable with the
+			 * current state of recovery.
+			 */
+			XlogGapError *e =
+				tnt_error(XlogGapError, &r->vclock, clock);
+
+			if (r->wal_dir.panic_if_error)
+				throw e;
 			e->log();
-			break;
+			/* Ignore missing WALs */
+			say_warn("ignoring a gap in LSN");
 		}
+		recovery_close_log(r);
+
+		r->current_wal = xlog_open(&r->wal_dir, vclock_sum(clock));
+
 		say_info("recover from `%s'", r->current_wal->filename);
 
 recover_current_wal:
-		r->signature = vclock_signature(current_vclock);
-		int result = recover_xlog(r, r->current_wal);
-
-		if (result == LOG_EOF) {
-			say_info("done `%s'", r->current_wal->filename);
-			recovery_close_log(r);
-			/* goto find_next_wal; */
-		} else if (r->signature == last_signature) {
-			/* last file is not finished */
-			break;
-		} else {
-			say_warn("WAL `%s` wasn't correctly closed",
-				 r->current_wal->filename);
-			recovery_close_log(r);
-		}
-	}
-
-	/*
-	 * It's not a fatal error when last WAL is empty, but if
-	 * we lose some logs it is a fatal error.
-	 */
-	if (last_signature > r->signature) {
-		tnt_raise(XlogError,
-			  "not all WALs have been successfully read");
+		if (r->current_wal->eof_read == false)
+			recover_xlog(r, r->current_wal);
+		/**
+		 * Keep the last log open to remember recovery
+		 * position. This speeds up recovery in local hot
+		 * standby mode, since we don't have to re-open
+		 * and re-scan the last log in recovery_finalize().
+		 */
 	}
-
 	region_free(&fiber()->gc);
 }
 
@@ -422,15 +416,11 @@ recovery_finalize(struct recovery_state *r, enum wal_mode wal_mode,
 
 	recover_remaining_wals(r);
 
-	if (r->current_wal != NULL) {
-		say_warn("WAL `%s' wasn't correctly closed",
-			 r->current_wal->filename);
-		recovery_close_log(r);
-	}
+	recovery_close_log(r);
 
 	if (vclockset_last(&r->wal_dir.index) != NULL &&
-	    vclock_signature(&r->vclock) ==
-	    vclock_signature(vclockset_last(&r->wal_dir.index))) {
+	    vclock_sum(&r->vclock) ==
+	    vclock_sum(vclockset_last(&r->wal_dir.index))) {
 		/**
 		 * The last log file had zero rows -> bump
 		 * LSN so that we don't stumble over this
@@ -957,8 +947,10 @@ wal_writer_thread(void *worker_args)
 		ev_async_send(writer->txn_loop, &writer->write_event);
 	}
 	(void) tt_pthread_mutex_unlock(&writer->mutex);
-	if (r->current_wal != NULL)
-		recovery_close_log(r);
+	if (r->current_wal != NULL) {
+		xlog_close(r->current_wal);
+		r->current_wal = NULL;
+	}
 	return NULL;
 }
 
@@ -975,7 +967,7 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
 	 */
 	fill_lsn(r, row);
 	if (r->wal_mode == WAL_NONE)
-		return vclock_signature(&r->vclock);
+		return vclock_sum(&r->vclock);
 
 	ERROR_INJECT_RETURN(ERRINJ_WAL_IO);
 
@@ -1011,7 +1003,7 @@ wal_write(struct recovery_state *r, struct xrow_header *row)
 	fiber_set_cancellable(cancellable);
 	if (req->res == -1)
 		return -1;
-	return vclock_signature(&r->vclock);
+	return vclock_sum(&r->vclock);
 }
 
 /* }}} */
@@ -1023,7 +1015,7 @@ recovery_last_checkpoint(struct recovery_state *r)
 {
 	/* recover last snapshot lsn */
 	struct vclock *vclock = vclockset_last(&r->snap_dir.index);
-	return vclock ? vclock_signature(vclock) : -1;
+	return vclock ? vclock_sum(vclock) : -1;
 }
 
 /* }}} */
diff --git a/src/box/recovery.h b/src/box/recovery.h
index 93c5c2eb7e..9aae1746b3 100644
--- a/src/box/recovery.h
+++ b/src/box/recovery.h
@@ -85,8 +85,6 @@ struct recovery_state {
 	struct xlog *current_wal;
 	struct xdir snap_dir;
 	struct xdir wal_dir;
-	/** Used to find missing xlog files */
-	int64_t signature;
 	struct wal_writer *writer;
 	/**
 	 * This is used in local hot standby or replication
@@ -132,7 +130,7 @@ recovery_has_data(struct recovery_state *r)
 	       vclockset_first(&r->wal_dir.index) != NULL;
 }
 void recovery_bootstrap(struct recovery_state *r);
-int
+void
 recover_xlog(struct recovery_state *r, struct xlog *l);
 void recovery_follow_local(struct recovery_state *r,
 			   const char *name,
diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc
index dac9e99ab4..6bccf55785 100644
--- a/src/box/sophia_engine.cc
+++ b/src/box/sophia_engine.cc
@@ -165,7 +165,7 @@ SophiaEngine::join(Relay *relay)
 	struct vclock *res = vclockset_last(&relay->r->snap_dir.index);
 	if (res == NULL)
 		tnt_raise(ClientError, ER_MISSING_SNAPSHOT);
-	int64_t signt = vclock_signature(res);
+	int64_t signt = vclock_sum(res);
 
 	/* get snapshot object */
 	char id[128];
diff --git a/src/box/vclock.c b/src/box/vclock.c
index 2d0cfbe08e..49d62e0d53 100644
--- a/src/box/vclock.c
+++ b/src/box/vclock.c
@@ -181,7 +181,7 @@ vclock_from_string(struct vclock *vclock, const char *str)
 		goto error;
 	end:
 		if (*p == '\0') {
-			vclock->signature = vclock_sum(vclock);
+			vclock->signature = vclock_calc_sum(vclock);
 			return 0;
 		} else if (isblank(*p)) {
 			++p;
diff --git a/src/box/vclock.h b/src/box/vclock.h
index 6c1eb35f8b..e0cbe1952b 100644
--- a/src/box/vclock.h
+++ b/src/box/vclock.h
@@ -131,7 +131,7 @@ vclock_size(const struct vclock *vclock)
 }
 
 static inline int64_t
-vclock_sum(const struct vclock *vclock)
+vclock_calc_sum(const struct vclock *vclock)
 {
 	int64_t sum = 0;
 	struct vclock_iterator it;
@@ -142,7 +142,7 @@ vclock_sum(const struct vclock *vclock)
 }
 
 static inline int64_t
-vclock_signature(const struct vclock *vclock)
+vclock_sum(const struct vclock *vclock)
 {
 	return vclock->signature;
 }
@@ -229,17 +229,16 @@ rb_proto(, vclockset_, vclockset_t, struct vclock);
  * @return a vclock that <= than \a key
  */
 static inline struct vclock *
-vclockset_match(vclockset_t *set, struct vclock *key,
-		bool panic_if_error)
+vclockset_match(vclockset_t *set, struct vclock *key)
 {
 	struct vclock *match = vclockset_psearch(set, key);
 	/**
 	 * vclockset comparator returns 0 for
-	 * incomparable keys. So the match doesn't have to be
+	 * incomparable keys, rendering them equal.
+	 * So the match, even when found, is not necessarily
 	 * strictly preceding the search key, it may be
-	 * incomparable. If this is the case, unwind until
-	 * we get to a key which is strictly below the search
-	 * pattern.
+	 * incomparable. If this is the case, unwind until we get
+	 * to a key which is strictly below the search pattern.
 	 */
 	while (match != NULL) {
 		if (vclock_compare(match, key) <= 0)
@@ -249,10 +248,10 @@ vclockset_match(vclockset_t *set, struct vclock *key,
 	}
 	/*
 	 * There is no xlog which is strictly less than the search
-	 * pattern. Return the fist successor log - it is either
+	 * pattern. Return the first log - it is either
 	 * strictly greater, or incomparable with the key.
 	 */
-	return panic_if_error ? NULL: vclockset_first(set);
+	return vclockset_first(set);
 }
 
 #if defined(__cplusplus)
@@ -293,7 +292,7 @@ vclock_del_server(struct vclock *vclock, uint32_t server_id)
 	assert(vclock_has(vclock, server_id));
 	vclock->lsn[server_id] = 0;
 	vclock->map &= ~(1 << server_id);
-	vclock->signature = vclock_sum(vclock);
+	vclock->signature = vclock_calc_sum(vclock);
 }
 
 #endif /* defined(__cplusplus) */
diff --git a/src/box/xlog.cc b/src/box/xlog.cc
index dbe5cc6418..2b80411426 100644
--- a/src/box/xlog.cc
+++ b/src/box/xlog.cc
@@ -64,6 +64,21 @@ XlogError::XlogError(const char *file, unsigned line,
 	va_end(ap);
 }
 
+XlogGapError::XlogGapError(const char *file, unsigned line,
+			   const struct vclock *from,
+			   const struct vclock *to)
+	:XlogError(file, line, "")
+{
+	char *s_from = vclock_to_string(from);
+	char *s_to = vclock_to_string(to);
+	snprintf(m_errmsg, sizeof(m_errmsg),
+		 "Missing .xlog file between LSN %lld %s and %lld %s",
+		 (long long) vclock_sum(from), s_from ? s_from : "",
+		 (long long) vclock_sum(to), s_to ? s_to : "");
+	free(s_from);
+	free(s_to);
+}
+
 /* {{{ struct xdir */
 
 void
@@ -290,7 +305,7 @@ xdir_scan(struct xdir *dir)
 	struct vclock *vclock = vclockset_first(&dir->index);
 	int i = 0;
 	while (i < s_count || vclock != NULL) {
-		int64_t s_old = vclock ? vclock_signature(vclock) : LLONG_MAX;
+		int64_t s_old = vclock ? vclock_sum(vclock) : LLONG_MAX;
 		int64_t s_new = i < s_count ? signatures[i] : LLONG_MAX;
 		if (s_old < s_new) {
 			/** Remove a deleted file from the index */
@@ -465,6 +480,7 @@ xlog_cursor_close(struct xlog_cursor *i)
 {
 	struct xlog *l = i->log;
 	l->rows += i->row_count;
+	l->eof_read = i->eof_read;
 	/*
 	 * Since we don't close the xlog
 	 * we must rewind it to the last known
@@ -764,7 +780,7 @@ xlog_read_meta(struct xlog *l, int64_t signature)
 	 * the sum of vector clock coordinates must be the same
 	 * as the name of the file.
 	 */
-	int64_t signature_check = vclock_signature(&l->vclock);
+	int64_t signature_check = vclock_sum(&l->vclock);
 	if (signature_check != signature) {
 		tnt_raise(XlogError, "%s: signature check failed",
 			  l->filename);
@@ -796,6 +812,7 @@ xlog_open_stream(struct xdir *dir, int64_t signature, FILE *file, const char *fi
 	l->mode = LOG_READ;
 	l->dir = dir;
 	l->is_inprogress = false;
+	l->eof_read = false;
 	vclock_create(&l->vclock);
 
 	xlog_read_meta(l, signature);
@@ -847,7 +864,7 @@ xlog_create(struct xdir *dir, const struct vclock *vclock)
 	FILE *f = NULL;
 	struct xlog *l = NULL;
 
-	int64_t signature = vclock_signature(vclock);
+	int64_t signature = vclock_sum(vclock);
 	assert(signature >= 0);
 	assert(!tt_uuid_is_nil(dir->server_uuid));
 
@@ -884,6 +901,8 @@ xlog_create(struct xdir *dir, const struct vclock *vclock)
 	l->mode = LOG_WRITE;
 	l->dir = dir;
 	l->is_inprogress = true;
+	/*  Makes no sense, but well. */
+	l->eof_read = false;
 	vclock_copy(&l->vclock, vclock);
 	setvbuf(l->f, NULL, _IONBF, 0);
 	if (xlog_write_meta(l) != 0)
diff --git a/src/box/xlog.h b/src/box/xlog.h
index 56b7a5200f..8ddea55742 100644
--- a/src/box/xlog.h
+++ b/src/box/xlog.h
@@ -45,6 +45,13 @@ struct XlogError: public Exception
 		  const char *format, ...);
 };
 
+struct XlogGapError: public XlogError
+{
+	XlogGapError(const char *file, unsigned line,
+		  const struct vclock *from,
+		  const struct vclock *to);
+};
+
 /* {{{ log dir */
 
 /**
@@ -182,6 +189,8 @@ struct xlog {
 	char filename[PATH_MAX + 1];
 	/** Whether this file has .inprogress suffix. */
 	bool is_inprogress;
+	/** True if eof has been read when reading the log. */
+	bool eof_read;
 	/**
 	 * Text file header: server uuid. We read
 	 * only logs with our own uuid, to avoid situations
diff --git a/src/fiber.cc b/src/fiber.cc
index 22c0d12d68..f83ce7ac1a 100644
--- a/src/fiber.cc
+++ b/src/fiber.cc
@@ -410,7 +410,12 @@ fiber_loop(void *data __attribute__((unused)))
 				 fiber_name(fiber));
 			say_info("fiber `%s': exiting", fiber_name(fiber));
 		} catch (Exception *e) {
-			e->log();
+			/*
+			 * For joinable fibers, it's the business
+			 * of the caller to deal with the error.
+			 */
+			if (! (fiber->flags & FIBER_IS_JOINABLE))
+				e->log();
 		} catch (...) {
 			/* This can only happen in case of a server bug. */
 			say_error("fiber `%s': unknown exception",
diff --git a/test/unit/fiber.result b/test/unit/fiber.result
index 66e6d3e914..e0a1605f8c 100644
--- a/test/unit/fiber.result
+++ b/test/unit/fiber.result
@@ -1,7 +1,5 @@
 (null): fiber `cancel' has been cancelled
 (null): fiber `cancel': exiting
-(null): SystemError Failed to allocate 42 bytes in allocator for exception: Cannot allocate memory
-(null): SystemError Failed to allocate 42 bytes in allocator for exception: Cannot allocate memory
 	*** fiber_join_test ***
 # exception propagated
 # cancel dead has started
diff --git a/test/unit/vclock.cc b/test/unit/vclock.cc
index a73d6abb37..913f5dcbd0 100644
--- a/test/unit/vclock.cc
+++ b/test/unit/vclock.cc
@@ -156,9 +156,9 @@ test_isearch()
 
 	int64_t queries[][NODE_N + 1] = {
 		/* not found (lsns are too old) */
-		{  0,  0, 0, 0, /* => */ INT64_MAX},
-		{  1,  0, 0, 0, /* => */ INT64_MAX},
-		{  5,  0, 0, 0, /* => */ INT64_MAX},
+		{  0,  0, 0, 0, /* => */ 10},
+		{  1,  0, 0, 0, /* => */ 10},
+		{  5,  0, 0, 0, /* => */ 10},
 
 		/* =10.xlog (left bound) */
 		{  10, 0, 0, 0, /* => */ 10},
@@ -230,8 +230,8 @@ test_isearch()
 		}
 
 		int64_t check = *(query + NODE_N);
-		struct vclock *res = vclockset_match(&set, &vclock, true);
-		int64_t value = res != NULL ? vclock_signature(res) : INT64_MAX;
+		struct vclock *res = vclockset_match(&set, &vclock);
+		int64_t value = res != NULL ? vclock_sum(res) : INT64_MAX;
 		is(value, check, "query #%d", q + 1);
 	}
 
diff --git a/test/xlog/errinj.result b/test/xlog/errinj.result
index 84ac4d54f1..28e4ad5f43 100644
--- a/test/xlog/errinj.result
+++ b/test/xlog/errinj.result
@@ -34,6 +34,7 @@ box.space._schema:delete{"key"}
 ---
 - ['key']
 ...
+-- list all the logs
 require('fio').glob("*.xlog")
 ---
 - - 00000000000000000000.xlog
diff --git a/test/xlog/errinj.test.lua b/test/xlog/errinj.test.lua
index 81e17ec91b..8666ae3207 100644
--- a/test/xlog/errinj.test.lua
+++ b/test/xlog/errinj.test.lua
@@ -19,6 +19,7 @@ box.space._schema:insert{"key"}
 --# start server dont_panic 
 box.space._schema:get{"key"}
 box.space._schema:delete{"key"}
+-- list all the logs
 require('fio').glob("*.xlog")
 --# stop server dont_panic 
 --# cleanup server dont_panic
diff --git a/test/xlog/lsn_gap.result b/test/xlog/lsn_gap.result
index 69d1de749e..685fc1efa8 100644
--- a/test/xlog/lsn_gap.result
+++ b/test/xlog/lsn_gap.result
@@ -20,9 +20,9 @@ box.space.test:insert{4, 'fourth tuple'}
 ---
 - [4, 'fourth tuple']
 ...
-check log line for 'ignoring missing WAL'
+check log line for 'ignoring a gap in LSN'
 
-'ignoring missing WAL' exists in server log
+'ignoring a gap in LSN' exists in server log
 
 box.space.test:select{}
 ---
diff --git a/test/xlog/lsn_gap.test.py b/test/xlog/lsn_gap.test.py
index 3409ba3c0b..4ba2664468 100644
--- a/test/xlog/lsn_gap.test.py
+++ b/test/xlog/lsn_gap.test.py
@@ -26,7 +26,7 @@ server.stop()
 os.unlink(wal)
 
 server.start()
-line="ignoring missing WAL"
+line="ignoring a gap in LSN"
 print "check log line for '%s'" % line
 print
 if server.logfile_pos.seek_once(line) >= 0:
diff --git a/test/xlog/missing.result b/test/xlog/missing.result
index 546519abd8..c52f75c68d 100644
--- a/test/xlog/missing.result
+++ b/test/xlog/missing.result
@@ -28,9 +28,9 @@ box.space.test:delete{3}
 ---
 - [3, 'third tuple']
 ...
-check log line for 'ignoring missing WAL'
+check log line for 'ignoring a gap in LSN'
 
-'ignoring missing WAL' exists in server log
+'ignoring a gap in LSN' exists in server log
 
 box.space.test:select{}
 ---
diff --git a/test/xlog/missing.test.py b/test/xlog/missing.test.py
index 2b47f510c3..50c97df4cf 100644
--- a/test/xlog/missing.test.py
+++ b/test/xlog/missing.test.py
@@ -32,7 +32,7 @@ os.unlink(wal)
 # tarantool doesn't issue an LSN for deletes which delete nothing
 # this may lead to infinite recursion at start
 server.start()
-line="ignoring missing WAL"
+line="ignoring a gap in LSN"
 print "check log line for '%s'" % line
 print
 if server.logfile_pos.seek_once(line) >= 0:
diff --git a/test/xlog/panic.lua b/test/xlog/panic.lua
new file mode 100644
index 0000000000..7b47126940
--- /dev/null
+++ b/test/xlog/panic.lua
@@ -0,0 +1,12 @@
+#!/usr/bin/env tarantool
+os = require('os')
+
+box.cfg{
+    listen              = os.getenv("LISTEN"),
+    slab_alloc_arena    = 0.1,
+    pid_file            = "tarantool.pid",
+    panic_on_wal_error  = true,
+    rows_per_wal        = 10
+}
+
+require('console').listen(os.getenv('ADMIN'))
diff --git a/test/xlog/panic_on_lsn_gap.result b/test/xlog/panic_on_lsn_gap.result
new file mode 100644
index 0000000000..aacd7086a9
--- /dev/null
+++ b/test/xlog/panic_on_lsn_gap.result
@@ -0,0 +1,249 @@
+--
+-- we actually need to know what xlogs the server creates,
+-- so start from a clean state
+--
+--
+-- Check how the server is able to find the next
+-- xlog if there are failed writes (lsn gaps).
+--
+--# create server panic with script='xlog/panic.lua'
+--# start server panic
+--# set connection panic
+box.info.vclock
+---
+- - 0
+...
+s = box.space._schema
+---
+...
+-- we need to have at least one record in the
+-- xlog otherwise the server believes that there
+-- is an lsn gap during recovery.
+--
+s:replace{"key", 'test 1'}
+---
+- ['key', 'test 1']
+...
+box.info.vclock
+---
+- - 1
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+---
+- ok
+...
+t = {}
+---
+...
+--
+-- Try to insert rows, so that it's time to
+-- switch WALs. No switch will happen though,
+-- since no writes were made.
+--
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    status, msg = pcall(s.replace, s, {"key"})
+    table.insert(t, msg)
+end;
+---
+...
+--# setopt delimiter ''
+t
+---
+- - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+...
+--
+-- Before restart: oops, our LSN is 11,
+-- even though we didn't insert anything.
+--
+box.info.vclock
+---
+- - 11
+...
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+...
+--# stop server panic
+--# start server panic
+--
+-- after restart: our LSN is the LSN of the
+-- last *written* row, all the failed
+-- rows are gone from lsn counter.
+--
+box.info.vclock
+---
+- - 1
+...
+box.space._schema:select{'key'}
+---
+- - ['key', 'test 1']
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+---
+- ok
+...
+t = {}
+---
+...
+s = box.space._schema
+---
+...
+--
+-- now do the same
+--
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    status, msg = pcall(s.replace, s, {"key"})
+    table.insert(t, msg)
+end;
+---
+...
+--# setopt delimiter ''
+t
+---
+- - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+  - Failed to write to disk
+...
+box.info.vclock
+---
+- - 11
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", false)
+---
+- ok
+...
+--
+-- Write a good row after a series of failed
+-- rows. There is a gap in LSN, correct,
+-- but it's *inside* a single WAL, so doesn't
+-- affect WAL search in recover_remaining_wals()
+--
+s:replace{'key', 'test 2'}
+---
+- ['key', 'test 2']
+...
+--
+-- notice that vclock before and after
+-- server stop is the same -- because it's
+-- recorded in the last row
+--
+box.info.vclock
+---
+- - 12
+...
+--# stop server panic
+--# start server panic
+box.info.vclock
+---
+- - 12
+...
+box.space._schema:select{'key'}
+---
+- - ['key', 'test 2']
+...
+-- list all the logs
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+  - 00000000000000000001.xlog
+...
+-- now insert 10 rows - so that the next
+-- row will need to switch the WAL
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    box.space._schema:replace{"key", 'test 3'}
+end;
+---
+...
+--# setopt delimiter ''
+-- the next insert should switch xlog, but aha - it fails
+-- a new xlog file is created but has 0 rows
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+  - 00000000000000000001.xlog
+  - 00000000000000000012.xlog
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+---
+- ok
+...
+box.space._schema:replace{"key", 'test 3'}
+---
+- error: Failed to write to disk
+...
+box.info.vclock
+---
+- - 23
+...
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+  - 00000000000000000001.xlog
+  - 00000000000000000012.xlog
+  - 00000000000000000022.xlog
+...
+-- and the next one (just to be sure
+box.space._schema:replace{"key", 'test 3'}
+---
+- error: Failed to write to disk
+...
+box.info.vclock
+---
+- - 24
+...
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+  - 00000000000000000001.xlog
+  - 00000000000000000012.xlog
+  - 00000000000000000022.xlog
+...
+box.error.injection.set("ERRINJ_WAL_WRITE", false)
+---
+- ok
+...
+-- then a success
+box.space._schema:replace{"key", 'test 4'}
+---
+- ['key', 'test 4']
+...
+box.info.vclock
+---
+- - 25
+...
+require('fio').glob("*.xlog")
+---
+- - 00000000000000000000.xlog
+  - 00000000000000000001.xlog
+  - 00000000000000000012.xlog
+  - 00000000000000000022.xlog
+...
+-- restart is ok
+--# stop server panic
+--# start server panic
+box.space._schema:select{'key'}
+---
+- - ['key', 'test 4']
+...
+--# stop server panic
+--# cleanup server panic
+--# set connection default
diff --git a/test/xlog/panic_on_lsn_gap.test.lua b/test/xlog/panic_on_lsn_gap.test.lua
new file mode 100644
index 0000000000..bf40d08346
--- /dev/null
+++ b/test/xlog/panic_on_lsn_gap.test.lua
@@ -0,0 +1,112 @@
+--
+-- we actually need to know what xlogs the server creates,
+-- so start from a clean state
+--
+--
+-- Check how the server is able to find the next
+-- xlog if there are failed writes (lsn gaps).
+--
+--# create server panic with script='xlog/panic.lua'
+--# start server panic
+--# set connection panic
+box.info.vclock
+s = box.space._schema
+-- we need to have at least one record in the
+-- xlog otherwise the server believes that there
+-- is an lsn gap during recovery.
+--
+s:replace{"key", 'test 1'}
+box.info.vclock
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+t = {}
+--
+-- Try to insert rows, so that it's time to
+-- switch WALs. No switch will happen though,
+-- since no writes were made.
+--
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    status, msg = pcall(s.replace, s, {"key"})
+    table.insert(t, msg)
+end;
+--# setopt delimiter ''
+t
+--
+-- Before restart: oops, our LSN is 11,
+-- even though we didn't insert anything.
+--
+box.info.vclock
+require('fio').glob("*.xlog")
+--# stop server panic
+--# start server panic
+--
+-- after restart: our LSN is the LSN of the
+-- last *written* row, all the failed
+-- rows are gone from lsn counter.
+--
+box.info.vclock
+box.space._schema:select{'key'}
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+t = {}
+s = box.space._schema
+--
+-- now do the same
+--
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    status, msg = pcall(s.replace, s, {"key"})
+    table.insert(t, msg)
+end;
+--# setopt delimiter ''
+t
+box.info.vclock
+box.error.injection.set("ERRINJ_WAL_WRITE", false)
+--
+-- Write a good row after a series of failed
+-- rows. There is a gap in LSN, correct,
+-- but it's *inside* a single WAL, so doesn't
+-- affect WAL search in recover_remaining_wals()
+--
+s:replace{'key', 'test 2'}
+--
+-- notice that vclock before and after
+-- server stop is the same -- because it's
+-- recorded in the last row
+--
+box.info.vclock
+--# stop server panic
+--# start server panic
+box.info.vclock
+box.space._schema:select{'key'}
+-- list all the logs
+require('fio').glob("*.xlog")
+-- now insert 10 rows - so that the next
+-- row will need to switch the WAL
+--# setopt delimiter ';'
+for i=1,box.cfg.rows_per_wal do
+    box.space._schema:replace{"key", 'test 3'}
+end;
+--# setopt delimiter ''
+-- the next insert should switch xlog, but aha - it fails
+-- a new xlog file is created but has 0 rows
+require('fio').glob("*.xlog")
+box.error.injection.set("ERRINJ_WAL_WRITE", true)
+box.space._schema:replace{"key", 'test 3'}
+box.info.vclock
+require('fio').glob("*.xlog")
+-- and the next one (just to be sure
+box.space._schema:replace{"key", 'test 3'}
+box.info.vclock
+require('fio').glob("*.xlog")
+box.error.injection.set("ERRINJ_WAL_WRITE", false)
+-- then a success
+box.space._schema:replace{"key", 'test 4'}
+box.info.vclock
+require('fio').glob("*.xlog")
+-- restart is ok
+--# stop server panic
+--# start server panic
+box.space._schema:select{'key'}
+--# stop server panic
+--# cleanup server panic
+--# set connection default
diff --git a/test/xlog/panic_on_wal_error.result b/test/xlog/panic_on_wal_error.result
index fe3b84ab47..a54e6c6587 100644
--- a/test/xlog/panic_on_wal_error.result
+++ b/test/xlog/panic_on_wal_error.result
@@ -1,4 +1,21 @@
 -- preparatory stuff
+fio = require('fio')
+---
+...
+glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
+---
+...
+for _, file in pairs(fio.glob(glob)) do fio.unlink(file) end
+---
+...
+glob = fio.pathjoin(box.cfg.wal_dir, '*.snap')
+---
+...
+for _, file in pairs(fio.glob(glob)) do fio.unlink(file) end
+---
+...
+--# stop server default
+--# start server default
 box.schema.user.grant('guest', 'replication')
 ---
 ...
@@ -116,7 +133,7 @@ box.info.replication.status
 ...
 box.info.replication.message
 ---
-- not all WALs have been successfully read
+- 'Missing .xlog file between LSN 6 {1: 6, 2: 0} and 8 {1: 8, 2: 0}'
 ...
 box.space.test:select{}
 ---
@@ -126,6 +143,7 @@ box.space.test:select{}
 --
 --# set connection default 
 --# stop server replica
+--# cleanup server replica
 --
 -- cleanup
 box.space.test:drop()
diff --git a/test/xlog/panic_on_wal_error.test.lua b/test/xlog/panic_on_wal_error.test.lua
index c2a4ab28c4..243c06465e 100644
--- a/test/xlog/panic_on_wal_error.test.lua
+++ b/test/xlog/panic_on_wal_error.test.lua
@@ -1,4 +1,11 @@
 -- preparatory stuff
+fio = require('fio')
+glob = fio.pathjoin(box.cfg.wal_dir, '*.xlog')
+for _, file in pairs(fio.glob(glob)) do fio.unlink(file) end
+glob = fio.pathjoin(box.cfg.wal_dir, '*.snap')
+for _, file in pairs(fio.glob(glob)) do fio.unlink(file) end
+--# stop server default
+--# start server default
 box.schema.user.grant('guest', 'replication')
 _ = box.schema.space.create('test')
 _ = box.space.test:create_index('pk')
@@ -67,6 +74,7 @@ box.space.test:select{}
 --
 --# set connection default 
 --# stop server replica
+--# cleanup server replica
 --
 -- cleanup
 box.space.test:drop()
diff --git a/test/xlog/suite.ini b/test/xlog/suite.ini
index f49b38ac4e..e76c7fc0f0 100644
--- a/test/xlog/suite.ini
+++ b/test/xlog/suite.ini
@@ -4,6 +4,6 @@ description = tarantool write ahead log tests
 script = xlog.lua
 disabled =
 valgrind_disabled =
-release_disabled = errinj.test.lua
+release_disabled = errinj.test.lua, panic_on_lsn_gap.test.lua
 lua_libs =
 use_unix_sockets = True
diff --git a/test/xlog/xlog.lua b/test/xlog/xlog.lua
index 2041feb7c8..3bd8968c20 100644
--- a/test/xlog/xlog.lua
+++ b/test/xlog/xlog.lua
@@ -6,7 +6,7 @@ box.cfg{
     slab_alloc_arena    = 0.1,
     pid_file            = "tarantool.pid",
     panic_on_wal_error  = false,
-    rows_per_wal        = 50
+    rows_per_wal        = 10
 }
 
 require('console').listen(os.getenv('ADMIN'))
-- 
GitLab