From a0e615000c536e3e2853f95018b2d29436320551 Mon Sep 17 00:00:00 2001
From: Ilya Kosarev <i.kosarev@tarantool.org>
Date: Tue, 24 Dec 2019 18:00:20 +0300
Subject: [PATCH] relay: fix vclock obtainment on join

In case of high load vclock used to join replica could be in advance
comparing to an actual WAL. Therefore replica could have missed some
tuples from master. In order to fix this wal_sync is updated so that
now we can obtain up to date vclock on the flushed state using it.

Prerequisites #4160
---
 src/box/relay.cc |  7 +++----
 src/box/vinyl.c  |  4 ++--
 src/box/wal.c    | 23 +++++++++++++++++------
 src/box/wal.h    |  4 +++-
 4 files changed, 25 insertions(+), 13 deletions(-)

diff --git a/src/box/relay.cc b/src/box/relay.cc
index 7ffc79285f..b89632273f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -307,13 +307,12 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock)
 
 	/*
 	 * Sync WAL to make sure that all changes visible from
-	 * the frozen read view are successfully committed.
+	 * the frozen read view are successfully committed and
+	 * obtain corresponding vclock.
 	 */
-	if (wal_sync() != 0)
+	if (wal_sync(vclock) != 0)
 		diag_raise();
 
-	vclock_copy(vclock, &replicaset.vclock);
-
 	/* Respond to the JOIN request with the current vclock. */
 	struct xrow_header row;
 	xrow_encode_vclock_xc(&row, vclock);
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 15a136f810..5f169f09b2 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1087,7 +1087,7 @@ vinyl_space_check_format(struct space *space, struct tuple_format *format)
 	 */
 	int rc;
 	if (need_wal_sync) {
-		rc = wal_sync();
+		rc = wal_sync(NULL);
 		if (rc != 0)
 			goto out;
 	}
@@ -4180,7 +4180,7 @@ vinyl_space_build_index(struct space *src_space, struct index *new_index,
 	 */
 	int rc;
 	if (need_wal_sync) {
-		rc = wal_sync();
+		rc = wal_sync(NULL);
 		if (rc != 0)
 			goto out;
 	}
diff --git a/src/box/wal.c b/src/box/wal.c
index 2b238b7432..0ae66ff32c 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -519,21 +519,27 @@ wal_free(void)
 	wal_writer_destroy(writer);
 }
 
+struct wal_vclock_msg {
+    struct cbus_call_msg base;
+    struct vclock vclock;
+};
+
 static int
-wal_sync_f(struct cbus_call_msg *msg)
+wal_sync_f(struct cbus_call_msg *data)
 {
-	(void)msg;
+	struct wal_vclock_msg *msg = (struct wal_vclock_msg *) data;
 	struct wal_writer *writer = &wal_writer_singleton;
 	if (writer->in_rollback.route != NULL) {
 		/* We're rolling back a failed write. */
 		diag_set(ClientError, ER_WAL_IO);
 		return -1;
 	}
+	vclock_copy(&msg->vclock, &writer->vclock);
 	return 0;
 }
 
 int
-wal_sync(void)
+wal_sync(struct vclock *vclock)
 {
 	ERROR_INJECT(ERRINJ_WAL_SYNC, {
 		diag_set(ClientError, ER_INJECTION, "wal sync");
@@ -541,18 +547,23 @@ wal_sync(void)
 	});
 
 	struct wal_writer *writer = &wal_writer_singleton;
-	if (writer->wal_mode == WAL_NONE)
+	if (writer->wal_mode == WAL_NONE) {
+		if (vclock != NULL)
+			vclock_copy(vclock, &writer->vclock);
 		return 0;
+	}
 	if (!stailq_empty(&writer->rollback)) {
 		/* We're rolling back a failed write. */
 		diag_set(ClientError, ER_WAL_IO);
 		return -1;
 	}
 	bool cancellable = fiber_set_cancellable(false);
-	struct cbus_call_msg msg;
+	struct wal_vclock_msg msg;
 	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
-			   &msg, wal_sync_f, NULL, TIMEOUT_INFINITY);
+			   &msg.base, wal_sync_f, NULL, TIMEOUT_INFINITY);
 	fiber_set_cancellable(cancellable);
+	if (vclock != NULL)
+		vclock_copy(vclock, &msg.vclock);
 	return rc;
 }
 
diff --git a/src/box/wal.h b/src/box/wal.h
index b76b0a41f9..76b44941a7 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -182,9 +182,11 @@ wal_mode();
 /**
  * Wait until all submitted writes are successfully flushed
  * to disk. Returns 0 on success, -1 if write failed.
+ * Corresponding vclock is returned in @a vclock unless it is
+ * NULL.
  */
 int
-wal_sync(void);
+wal_sync(struct vclock *vclock);
 
 struct wal_checkpoint {
 	struct cbus_call_msg base;
-- 
GitLab