From c061893aba1c10bb679b4415cbe65bdf6ef2ba54 Mon Sep 17 00:00:00 2001
From: Nikita Zheleztsov <n.zheleztsov@proton.me>
Date: Wed, 19 Jul 2023 13:08:16 +0300
Subject: [PATCH] relay: fix unterminated final_join thread

Currently if tarantool exits during relay's final join stage,
corresponding thread isn't terminated. This causes the flakiness
of the replicaset_ro_mostly.test.lua.

Let's reuse the same relay, in which subscribe cord is running, for
the final join stage. This way the cord will be cancelled during
replication_free().

Closes #8082

NO_DOC=not user-visible
NO_TEST=fix flaky test
NO_CHANGELOG=not user-visible

Co-authored-by: Sergey Petrenko <sergepetrenko@tarantool.org>

(cherry picked from commit 70a6883663a84becd5291322229afaf8592a7e9d)
---
 src/box/box.cc                              | 12 +++++++++--
 src/box/relay.cc                            | 23 ++++++++++++++-------
 src/box/relay.h                             |  2 +-
 test/replication/anon_register_gap.result   |  4 ++--
 test/replication/anon_register_gap.test.lua |  2 +-
 5 files changed, 29 insertions(+), 14 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index ad280609be..9963b1a2d3 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -3831,6 +3831,13 @@ box_process_register(struct iostream *io, const struct xrow_header *header)
 			  "registration of non-anonymous nodes.");
 	}
 
+	/* Don't allow multiple relays for the same replica */
+	if (replica != NULL &&
+	    relay_get_state(replica->relay) == RELAY_FOLLOW) {
+		tnt_raise(ClientError, ER_CFG, "replication",
+			  "duplicate connection with the same replica UUID");
+	}
+
 	/* See box_process_join() */
 	box_check_writable_xc();
 	struct space *space = space_cache_find_xc(BOX_CLUSTER_ID);
@@ -3873,7 +3880,7 @@ box_process_register(struct iostream *io, const struct xrow_header *header)
 	 * (req.vclock, stop_vclock) so that it gets its
 	 * registration.
 	 */
-	relay_final_join(io, header->sync, &req.vclock, &stop_vclock);
+	relay_final_join(replica, io, header->sync, &req.vclock, &stop_vclock);
 	say_info("final data sent.");
 
 	RegionGuard region_guard(&fiber()->gc);
@@ -4028,7 +4035,8 @@ box_process_join(struct iostream *io, const struct xrow_header *header)
 	 * Final stage: feed replica with WALs in range
 	 * (start_vclock, stop_vclock).
 	 */
-	relay_final_join(io, header->sync, &start_vclock, &stop_vclock);
+	relay_final_join(replica, io, header->sync, &start_vclock,
+			 &stop_vclock);
 	say_info("final data sent.");
 
 	/* Send end of WAL stream marker */
diff --git a/src/box/relay.cc b/src/box/relay.cc
index a231018d60..9f83bfecdd 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -526,17 +526,21 @@ relay_final_join_f(va_list ap)
 }
 
 void
-relay_final_join(struct iostream *io, uint64_t sync,
+relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync,
 		 struct vclock *start_vclock, struct vclock *stop_vclock)
 {
-	struct relay *relay = relay_new(NULL);
-	if (relay == NULL)
-		diag_raise();
+	/*
+	 * As a new thread is started for the final join stage, its cancellation
+	 * should be handled properly during an unexpected shutdown, so, we
+	 * reuse the subscribe relay in order to cancel the final join thread
+	 * during replication_free().
+	 */
+	struct relay *relay = replica->relay;
+	assert(relay->state != RELAY_FOLLOW);
 
 	relay_start(relay, io, sync, relay_send_row, relay_yield, UINT64_MAX);
 	auto relay_guard = make_scoped_guard([=] {
 		relay_stop(relay);
-		relay_delete(relay);
 	});
 	/*
 	 * Save the first vclock as 'received'. Because it was really received.
@@ -1211,8 +1215,12 @@ static void
 relay_send_row(struct xstream *stream, struct xrow_header *packet)
 {
 	struct relay *relay = container_of(stream, struct relay, stream);
+	assert(cord() == &relay->cord);
+	assert(fiber()->f == relay_subscribe_f ||
+	       fiber()->f == relay_final_join_f);
+	bool is_subscribe = fiber()->f == relay_subscribe_f;
 	/* Do not send heartbeats during a final join. */
-	if (relay->replica != NULL)
+	if (is_subscribe)
 		relay_send_heartbeat_on_timeout(relay);
 	if (packet->group_id == GROUP_LOCAL) {
 		/*
@@ -1247,8 +1255,7 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet)
 	 * it). In the latter case packet's LSN is less than or equal to
 	 * local master's LSN at the moment it received 'SUBSCRIBE' request.
 	 */
-	if (relay->replica == NULL ||
-	    packet->replica_id != relay->replica->id ||
+	if (!is_subscribe || packet->replica_id != relay->replica->id ||
 	    packet->lsn <= vclock_get(&relay->local_vclock_at_subscribe,
 				      packet->replica_id)) {
 		struct errinj *inj = errinj(ERRINJ_RELAY_BREAK_LSN,
diff --git a/src/box/relay.h b/src/box/relay.h
index 733d6bc310..5bbc87e46f 100644
--- a/src/box/relay.h
+++ b/src/box/relay.h
@@ -138,7 +138,7 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock,
  * @param sync      sync from incoming JOIN request
  */
 void
-relay_final_join(struct iostream *io, uint64_t sync,
+relay_final_join(struct replica *replica, struct iostream *io, uint64_t sync,
 		 struct vclock *start_vclock, struct vclock *stop_vclock);
 
 /**
diff --git a/test/replication/anon_register_gap.result b/test/replication/anon_register_gap.result
index 24a3548c8b..854b4a1032 100644
--- a/test/replication/anon_register_gap.result
+++ b/test/replication/anon_register_gap.result
@@ -85,9 +85,9 @@ test_run:switch('replica')
 test_run:wait_lsn('replica', 'default')
  | ---
  | ...
-f:status()
+test_run:wait_cond(function() return f:status() == 'dead' end)
  | ---
- | - dead
+ | - true
  | ...
 box.space.test:select{}
  | ---
diff --git a/test/replication/anon_register_gap.test.lua b/test/replication/anon_register_gap.test.lua
index c71576a239..ff5ed06e37 100644
--- a/test/replication/anon_register_gap.test.lua
+++ b/test/replication/anon_register_gap.test.lua
@@ -32,7 +32,7 @@ box.space.test:insert{2}
 
 test_run:switch('replica')
 test_run:wait_lsn('replica', 'default')
-f:status()
+test_run:wait_cond(function() return f:status() == 'dead' end)
 box.space.test:select{}
 
 -- Cleanup
-- 
GitLab