From 056deb2cf0488486fd667c6161eda8f7bdbf2096 Mon Sep 17 00:00:00 2001
From: Georgy Kirichenko <georgy@tarantool.org>
Date: Tue, 25 Dec 2018 23:51:57 +0300
Subject: [PATCH] replication: promote tx vclock only after successful wal
 write

Applier used to promote the vclock prior to applying the row. This led
to a situation where a master's row would be skipped forever if an
error occurred while applying it. However, some errors are transient,
and we might be able to successfully apply the same row later.

While we're at it, make the wal writer the only one responsible for
advancing the replicaset vclock. It was already doing so for rows
coming from the local instance. Besides making the code cleaner, this
lets us advance the vclock directly from the wal batch reply and get
rid of unnecessary checks of whether the applier or wal has already
advanced the vclock (a simplified sketch of the new flow follows the
diffstat below).

Closes #2283
Prerequisite #980
---
 src/box/applier.cc                          | 42 +++++---------
 src/box/wal.c                               | 43 ++++----------
 test/replication/skip_conflict_row.result   | 63 +++++++++++++++++++++
 test/replication/skip_conflict_row.test.lua | 20 +++++++
 4 files changed, 110 insertions(+), 58 deletions(-)
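
Note: below is a minimal, self-contained sketch of the ordering this
patch establishes, using simplified stand-ins rather than the real
Tarantool API (struct vclock, wal_write_row() and apply_row() here are
illustrative only). The point: the applier no longer promotes the
replicaset vclock before applying a row; the vclock is copied from the
wal writer's reply only after a successful write, so a row that fails
with a transient error can be retried instead of being skipped forever.

#include <stdbool.h>
#include <stdint.h>
#include <stdio.h>

#define VCLOCK_MAX 32

struct vclock {
	int64_t lsn[VCLOCK_MAX];
};

/* The tx-side replicaset vclock and the wal writer's own vclock. */
static struct vclock replicaset_vclock;
static struct vclock wal_vclock;

/* Stand-in for a WAL write; fails once per LSN to model a transient
 * error (e.g. out of disk space that is later freed). */
static bool
wal_write_row(uint32_t replica_id, int64_t lsn)
{
	static int64_t failed_lsn = -1;
	if (failed_lsn != lsn) {
		failed_lsn = lsn;
		return false;		/* transient failure */
	}
	wal_vclock.lsn[replica_id] = lsn;
	return true;
}

/* Applier path: filter already-applied rows against the tx vclock,
 * write through WAL, and let the WAL reply drive vclock promotion. */
static void
apply_row(uint32_t replica_id, int64_t lsn)
{
	if (replicaset_vclock.lsn[replica_id] >= lsn)
		return;			/* duplicate, already applied */
	if (!wal_write_row(replica_id, lsn)) {
		/* The old code would have promoted the vclock before
		 * this point, skipping the row forever; now the vclock
		 * is untouched and the master can resend the row. */
		printf("row {%u, %lld}: error, vclock not advanced\n",
		       replica_id, (long long)lsn);
		return;
	}
	/* Mirrors tx_schedule_commit(): tx copies the vclock reported
	 * back by the wal thread after a successful batch. */
	replicaset_vclock = wal_vclock;
	printf("row {%u, %lld}: applied, vclock advanced\n",
	       replica_id, (long long)lsn);
}

int
main(void)
{
	apply_row(1, 1);	/* first attempt fails... */
	apply_row(1, 1);	/* ...the retry succeeds */
	return 0;
}

With the old ordering, the first apply_row() call would have bumped the
vclock before the failed write, and the retry would have been filtered
out as a duplicate. The real applier additionally takes a per-replica
latch to order concurrent appliers in a full-mesh topology; that is
omitted here for brevity.
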

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 512d05dfa1..7f37fe2ee0 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -530,34 +530,18 @@ applier_subscribe(struct applier *applier)
 
 		applier->lag = ev_now(loop()) - row.tm;
 		applier->last_row_time = ev_monotonic_now(loop());
-
+		struct replica *replica = replica_by_id(row.replica_id);
+		struct latch *latch = (replica ? &replica->order_latch :
+				       &replicaset.applier.order_latch);
+		/*
+		 * In a full mesh topology, the same set of changes
+		 * may arrive via two concurrently running appliers.
+		 * Hence we need a latch to strictly order all changes
+		 * that belong to the same server id.
+		 */
+		latch_lock(latch);
 		if (vclock_get(&replicaset.vclock, row.replica_id) < row.lsn) {
-			/**
-			 * Promote the replica set vclock before
-			 * applying the row. If there is an
-			 * exception (conflict) applying the row,
-			 * the row is skipped when the replication
-			 * is resumed.
-			 */
-			vclock_follow_xrow(&replicaset.vclock, &row);
-			struct replica *replica = replica_by_id(row.replica_id);
-			struct latch *latch = (replica ? &replica->order_latch :
-					       &replicaset.applier.order_latch);
-			/*
-			 * In a full mesh topology, the same set
-			 * of changes may arrive via two
-			 * concurrently running appliers. Thanks
-			 * to vclock_follow() above, the first row
-			 * in the set will be skipped - but the
-			 * remaining may execute out of order,
-			 * when the following xstream_write()
-			 * yields on WAL. Hence we need a latch to
-			 * strictly order all changes which belong
-			 * to the same server id.
-			 */
-			latch_lock(latch);
 			int res = xstream_write(applier->subscribe_stream, &row);
-			latch_unlock(latch);
 			if (res != 0) {
 				struct error *e = diag_last_error(diag_get());
 				/**
@@ -568,10 +552,14 @@ applier_subscribe(struct applier *applier)
 				    box_error_code(e) == ER_TUPLE_FOUND &&
 				    replication_skip_conflict)
 					diag_clear(diag_get());
-				else
+				else {
+					latch_unlock(latch);
 					diag_raise();
+				}
 			}
 		}
+		latch_unlock(latch);
+
 		if (applier->state == APPLIER_SYNC ||
 		    applier->state == APPLIER_FOLLOW)
 			fiber_cond_signal(&applier->writer_cond);
diff --git a/src/box/wal.c b/src/box/wal.c
index 7b09a032ac..cdcaabc008 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -171,6 +171,8 @@ struct wal_msg {
 	 * be rolled back.
 	 */
 	struct stailq rollback;
+	/** vclock after the batch is processed. */
+	struct vclock vclock;
 };
 
 /**
@@ -209,6 +211,7 @@ wal_msg_create(struct wal_msg *batch)
 	batch->approx_len = 0;
 	stailq_create(&batch->commit);
 	stailq_create(&batch->rollback);
+	vclock_create(&batch->vclock);
 }
 
 static struct wal_msg *
@@ -284,6 +287,8 @@ tx_schedule_commit(struct cmsg *msg)
 		/* Closes the input valve. */
 		stailq_concat(&writer->rollback, &batch->rollback);
 	}
+	/* Update the tx vclock to the latest vclock written by wal. */
+	vclock_copy(&replicaset.vclock, &batch->vclock);
 	tx_schedule_queue(&batch->commit);
 }
 
@@ -1023,6 +1028,12 @@ wal_write_to_disk(struct cmsg *msg)
 		error_log(error);
 		diag_clear(diag_get());
 	}
+	/*
+	 * Remember the vclock of the last successfully written row so
+	 * that we can update replicaset.vclock once this message gets
+	 * back to tx.
+	 */
+	vclock_copy(&wal_msg->vclock, &writer->vclock);
 	/*
 	 * We need to start rollback from the first request
 	 * following the last committed request. If
@@ -1154,31 +1165,6 @@ wal_write(struct journal *journal, struct journal_entry *entry)
 	bool cancellable = fiber_set_cancellable(false);
 	fiber_yield(); /* Request was inserted. */
 	fiber_set_cancellable(cancellable);
-	if (entry->res > 0) {
-		struct xrow_header **last = entry->rows + entry->n_rows - 1;
-		while (last >= entry->rows) {
-			/*
-			 * Find last row from local instance id
-			 * and promote vclock.
-			 */
-			if ((*last)->replica_id == instance_id) {
-				/*
-				 * In master-master configuration, during sudden
-				 * power-loss, if the data have not been written
-				 * to WAL but have already been sent to others,
-				 * they will send the data back. In this case
-				 * vclock has already been promoted by applier.
-				 */
-				if (vclock_get(&replicaset.vclock,
-					       instance_id) < (*last)->lsn) {
-					vclock_follow_xrow(&replicaset.vclock,
-							   *last);
-				}
-				break;
-			}
-			--last;
-		}
-	}
 	return entry->res;
 }
 
@@ -1188,12 +1174,7 @@ wal_write_in_wal_mode_none(struct journal *journal,
 {
 	struct wal_writer *writer = (struct wal_writer *) journal;
 	wal_assign_lsn(&writer->vclock, entry->rows, entry->rows + entry->n_rows);
-	int64_t old_lsn = vclock_get(&replicaset.vclock, instance_id);
-	int64_t new_lsn = vclock_get(&writer->vclock, instance_id);
-	if (new_lsn > old_lsn) {
-		/* There were local writes, promote vclock. */
-		vclock_follow(&replicaset.vclock, instance_id, new_lsn);
-	}
+	vclock_copy(&replicaset.vclock, &writer->vclock);
 	return vclock_sum(&writer->vclock);
 }
 
diff --git a/test/replication/skip_conflict_row.result b/test/replication/skip_conflict_row.result
index 6ca13b4724..bcbbbcc34e 100644
--- a/test/replication/skip_conflict_row.result
+++ b/test/replication/skip_conflict_row.result
@@ -82,6 +82,69 @@ box.info.status
 ---
 - running
 ...
+-- gh-2283: test that if replication_skip_conflict is off, vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+---
+- true
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+box.space.test:insert{3}
+---
+- [3]
+...
+lsn1 = box.info.vclock[1]
+---
+...
+test_run:cmd("switch default")
+---
+- true
+...
+box.space.test:insert{3, 3}
+---
+- [3, 3]
+...
+box.space.test:insert{4}
+---
+- [4]
+...
+test_run:cmd("switch replica")
+---
+- true
+...
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+---
+- true
+...
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+box.info.replication[1].upstream.status
+---
+- stopped
+...
+test_run:cmd("switch default")
+---
+- true
+...
+test_run:cmd("restart server replica")
+---
+- true
+...
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+---
+- Duplicate key exists in unique index 'primary' in space 'test'
+...
+test_run:cmd("switch default")
+---
+- true
+...
 -- cleanup
 test_run:cmd("stop server replica")
 ---
diff --git a/test/replication/skip_conflict_row.test.lua b/test/replication/skip_conflict_row.test.lua
index 4406ced957..3a9076b397 100644
--- a/test/replication/skip_conflict_row.test.lua
+++ b/test/replication/skip_conflict_row.test.lua
@@ -28,6 +28,26 @@ box.space.test:select()
 test_run:cmd("switch default")
 box.info.status
 
+-- gh-2283: test that if replication_skip_conflict is off, vclock
+-- is not advanced on errors.
+test_run:cmd("restart server replica")
+test_run:cmd("switch replica")
+box.space.test:insert{3}
+lsn1 = box.info.vclock[1]
+test_run:cmd("switch default")
+box.space.test:insert{3, 3}
+box.space.test:insert{4}
+test_run:cmd("switch replica")
+-- lsn is not promoted
+lsn1 == box.info.vclock[1]
+box.info.replication[1].upstream.message
+box.info.replication[1].upstream.status
+test_run:cmd("switch default")
+test_run:cmd("restart server replica")
+-- applier is not in follow state
+box.info.replication[1].upstream.message
+test_run:cmd("switch default")
+
 -- cleanup
 test_run:cmd("stop server replica")
 test_run:cmd("cleanup server replica")
-- 
GitLab