From da158b9bf3c09915bb5367a13400165624395422 Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Fri, 14 Jun 2024 19:14:21 +0200
Subject: [PATCH] applier: drop apply_final_join_tx

Can use the regular applier_apply_tx() — they do the same thing.
The latter is just more protective, but that doesn't matter much
in this case if the code takes a few latch locks.

The patch also drops an old test about double-received row panic
during final join. The logic is that absolutely the same situation
could happen during subscribe, but it was always filtered out by
checking replicaset.applier.vclock and skipping duplicate rows.

There doesn't seem to be a reason why final join must be any
different. It is, after all, same subscribe logic but the received
rows go into replica's initial snapshot instead of xlogs. Now it
even uses the same txn processing function applier_apply_tx().

The patch also moves `replication_skip_conflict` option setting
after bootstrap is finished. In theory, final join could deliver
a conflicting row and it must not be ignored. The problem is that
it can't be reproduced anyhow without illegal error injection
(which would corrupt something in an unrealistic way). But let's
move it below bootstrap anyway, for clarity.

Follow-up #10113

NO_DOC=refactoring
NO_CHANGELOG=refactoring
---
 src/box/applier.cc                     | 112 ++++++++-----------------
 src/box/box.cc                         |   6 +-
 test/xlog/panic_on_broken_lsn.result   |  96 ---------------------
 test/xlog/panic_on_broken_lsn.test.lua |  50 -----------
 4 files changed, 38 insertions(+), 226 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index cd16893804..a7874324ff 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -893,9 +893,6 @@ static uint64_t
 applier_read_tx(struct applier *applier, struct stailq *rows,
 		const struct applier_read_ctx *ctx, double timeout);
 
-static int
-apply_final_join_tx(uint32_t replica_id, struct stailq *rows);
-
 /**
  * A helper struct to link xrow objects in a list.
  */
@@ -1011,23 +1008,8 @@ applier_wait_register(struct applier *applier, uint64_t row_count)
 						  next)->row);
 			break;
 		}
-		if (recovery_state < FINISHED_RECOVERY) {
-			/*
-			 * Register during recovery means the joining is still
-			 * in progress and is at its final stage.
-			 */
-			if (apply_final_join_tx(applier->instance_id, &rows) != 0)
-				diag_raise();
-		} else {
-			/*
-			 * Register after recovery means the instance is already
-			 * functional, but is becoming non-anonymous or is
-			 * changing its name. Transactions coming from such
-			 * registration are no different than during subscribe.
-			 */
-			if (applier_apply_tx(applier, &rows) != 0)
-				diag_raise();
-		}
+		if (applier_apply_tx(applier, &rows) != 0)
+			diag_raise();
 	}
 
 	return row_count;
@@ -1407,8 +1389,7 @@ applier_handle_raft_request(struct applier *applier, struct raft_request *req)
 }
 
 static int
-apply_plain_tx(uint32_t replica_id, struct stailq *rows,
-	       bool skip_conflict, bool use_triggers)
+apply_plain_tx(uint32_t replica_id, struct stailq *rows)
 {
 	/*
 	 * Explicitly begin the transaction so that we can
@@ -1425,7 +1406,7 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows,
 	stailq_foreach_entry(item, rows, next) {
 		struct xrow_header *row = &item->row;
 		int res = apply_request(&item->req.dml);
-		if (res != 0 && skip_conflict) {
+		if (res != 0 && replication_skip_conflict) {
 			struct error *e = diag_last_error(diag_get());
 			/*
 			 * In case of ER_TUPLE_FOUND error and enabled
@@ -1475,67 +1456,41 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows,
 	 */
 	if ((item->row.flags & IPROTO_FLAG_WAIT_ACK) != 0)
 		box_txn_make_sync();
-
-	if (use_triggers) {
-		/* We are ready to submit txn to wal. */
-		struct trigger *on_rollback, *on_wal_write;
-		size_t size;
-		on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
-						  &size);
-		on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
-						   &size);
-		if (on_rollback == NULL || on_wal_write == NULL) {
-			diag_set(OutOfMemory, size, "region_alloc_object",
-				 "on_rollback/on_wal_write");
-			goto fail;
-		}
-
-		struct replica_cb_data *rcb;
-		rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
-		if (rcb == NULL) {
-			diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
-			goto fail;
-		}
-
-		trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
-		txn_on_rollback(txn, on_rollback);
-
-		/*
-		 * We use *last* entry timestamp because ack comes up to
-		 * last entry in transaction. Same time this shows more
-		 * precise result because we're interested in how long
-		 * transaction traversed network + remote WAL bundle before
-		 * ack get received.
-		 */
-		rcb->replica_id = replica_id;
-		rcb->txn_last_tm = item->row.tm;
-
-		trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
-		txn_on_wal_write(txn, on_wal_write);
+	size_t size;
+	struct trigger *on_rollback, *on_wal_write;
+	on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback),
+					  &size);
+	on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write),
+					   &size);
+	if (on_rollback == NULL || on_wal_write == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc_object",
+			 "on_rollback/on_wal_write");
+		goto fail;
 	}
-
+	struct replica_cb_data *rcb;
+	rcb = region_alloc_object(&txn->region, typeof(*rcb), &size);
+	if (rcb == NULL) {
+		diag_set(OutOfMemory, size, "region_alloc_object", "rcb");
+		goto fail;
+	}
+	trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL);
+	txn_on_rollback(txn, on_rollback);
+	/*
+	 * We use *last* entry timestamp because ack comes up to last entry in
+	 * transaction. Same time this shows more precise result because we're
+	 * interested in how long transaction traversed network + remote WAL
+	 * bundle before ack get received.
+	 */
+	rcb->replica_id = replica_id;
+	rcb->txn_last_tm = item->row.tm;
+	trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL);
+	txn_on_wal_write(txn, on_wal_write);
 	return txn_commit_try_async(txn);
 fail:
 	txn_abort(txn);
 	return -1;
 }
 
-/** A simpler version of applier_apply_tx() for final join stage. */
-static int
-apply_final_join_tx(uint32_t replica_id, struct stailq *rows)
-{
-	struct applier_tx_row *txr =
-		stailq_first_entry(rows, struct applier_tx_row, next);
-	int rc = 0;
-	if (unlikely(iproto_type_is_synchro_request(txr->row.type))) {
-		rc = apply_synchro_req(replica_id, &txr->row,
-				       &txr->req.synchro);
-	} else {
-		rc = apply_plain_tx(replica_id, rows, false, false);
-	}
-	return rc;
-}
-
 /**
  * We must filter out synchronous rows coming from an instance that fell behind
  * the current synchro queue owner. This includes both synchronous tx rows and
@@ -1723,8 +1678,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows)
 		rc = apply_synchro_req(applier->instance_id, &txr->row,
 				       &txr->req.synchro);
 	} else {
-		rc = apply_plain_tx(applier->instance_id, rows,
-				    replication_skip_conflict, true);
+		rc = apply_plain_tx(applier->instance_id, rows);
 	}
 	if (rc != 0)
 		goto finish;
diff --git a/src/box/box.cc b/src/box/box.cc
index 2141c50bc5..4b79ca6f98 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -5559,7 +5559,6 @@ box_cfg_xc(void)
 	if (box_set_replication_synchro_timeout() != 0)
 		diag_raise();
 	box_set_replication_sync_timeout();
-	box_set_replication_skip_conflict();
 	if (box_check_instance_name(cfg_instance_name) != 0)
 		diag_raise();
 	if (box_set_wal_queue_max_size() != 0)
@@ -5605,6 +5604,11 @@ box_cfg_xc(void)
 		/* Bootstrap a new instance */
 		bootstrap(&is_bootstrap_leader);
 	}
+	/*
+	 * During bootstrap from a remote master try not to ignore the
+	 * conflicts, neither during snapshot fetch, nor join.
+	 */
+	box_set_replication_skip_conflict();
 	replicaset_state = REPLICASET_READY;
 
 	/*
diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result
index 540f562e05..7976d3cc59 100644
--- a/test/xlog/panic_on_broken_lsn.result
+++ b/test/xlog/panic_on_broken_lsn.result
@@ -1,5 +1,4 @@
 -- Issue 3105: Test logging of request with broken lsn before panicking
--- Two cases are covered: Recovery and Joining a new replica
 env = require('test_run')
 ---
 ...
@@ -89,98 +88,3 @@ test_run:cmd('delete server panic')
 ---
 - true
 ...
--- Testing case of panic on joining a new replica
-box.schema.user.grant('guest', 'replication')
----
-...
-_ = box.schema.space.create('test', {id = 9000})
----
-...
-_ = box.space.test:create_index('pk')
----
-...
-box.space.test:auto_increment{'v0'}
----
-- [1, 'v0']
-...
--- Inject a broken LSN in the final join stage.
-lsn = -1
----
-...
-box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
----
-- ok
-...
-fiber = require('fiber')
----
-...
--- Asynchronously run a function that will:
--- 1. Wait for the replica join to start.
--- 2. Make sure that the record about the new replica written to
---    the _cluster space hits the WAL by writing a row to the test
---    space. This is important, because at the next step we need
---    to compute the LSN of the row that is going to be written to
---    the WAL next so we don't want to race with in-progress WAL
---    writes.
--- 3. Inject an error into replication of the next WAL row and write
---    a row to the test space. This row should break replication.
--- 4. Resume the replica join.
-test_run:cmd("setopt delimiter ';'")
----
-- true
-...
-_ = fiber.create(function()
-    test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
-    box.space.test:auto_increment{'v1'}
-    lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
-    box.space.test:auto_increment{'v2'}
-    box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
-end);
----
-...
-test_run:cmd("setopt delimiter ''");
----
-- true
-...
-test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
----
-- true
-...
-test_run:cmd('start server replica with crash_expected=True')
----
-- false
-...
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
----
-- ok
-...
--- Check that log contains the mention of broken LSN and the request printout
-filename = fio.pathjoin(fio.cwd(), 'replica.log')
----
-...
-str = string.format("LSN for 1 is used twice or COMMIT order is broken: confirmed: %d, new: %d, req: .*", lsn, lsn)
----
-...
-found = test_run:grep_log(nil, str, 256, {filename = filename})
----
-...
-(found:gsub('^.*, req: ', ''):gsub('lsn: %d+', 'lsn: <lsn>'))
----
-- '{type: ''INSERT'', replica_id: 1, lsn: <lsn>, space_id: 9000, index_id: 0, tuple:
-  [3, "v2"]}'
-...
-test_run:cmd('cleanup server replica')
----
-- true
-...
-test_run:cmd('delete server replica')
----
-- true
-...
-box.space.test:drop()
----
-...
-box.schema.user.revoke('guest', 'replication')
----
-...
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua
index 811303405d..e949288e7a 100644
--- a/test/xlog/panic_on_broken_lsn.test.lua
+++ b/test/xlog/panic_on_broken_lsn.test.lua
@@ -1,5 +1,4 @@
 -- Issue 3105: Test logging of request with broken lsn before panicking
--- Two cases are covered: Recovery and Joining a new replica
 env = require('test_run')
 test_run = env.new()
 test_run:cleanup_cluster()
@@ -34,52 +33,3 @@ found = test_run:grep_log(nil, str, 256, {filename = filename})
 
 test_run:cmd('cleanup server panic')
 test_run:cmd('delete server panic')
-
--- Testing case of panic on joining a new replica
-box.schema.user.grant('guest', 'replication')
-_ = box.schema.space.create('test', {id = 9000})
-_ = box.space.test:create_index('pk')
-box.space.test:auto_increment{'v0'}
-
--- Inject a broken LSN in the final join stage.
-lsn = -1
-box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true)
-
-fiber = require('fiber')
--- Asynchronously run a function that will:
--- 1. Wait for the replica join to start.
--- 2. Make sure that the record about the new replica written to
---    the _cluster space hits the WAL by writing a row to the test
---    space. This is important, because at the next step we need
---    to compute the LSN of the row that is going to be written to
---    the WAL next so we don't want to race with in-progress WAL
---    writes.
--- 3. Inject an error into replication of the next WAL row and write
---    a row to the test space. This row should break replication.
--- 4. Resume the replica join.
-test_run:cmd("setopt delimiter ';'")
-_ = fiber.create(function()
-    test_run:wait_cond(function() return box.info.replication[2] ~= nil end)
-    box.space.test:auto_increment{'v1'}
-    lsn = box.info.vclock[1]
-    box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1)
-    box.space.test:auto_increment{'v2'}
-    box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false)
-end);
-test_run:cmd("setopt delimiter ''");
-
-test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"')
-test_run:cmd('start server replica with crash_expected=True')
-box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1)
-
--- Check that log contains the mention of broken LSN and the request printout
-filename = fio.pathjoin(fio.cwd(), 'replica.log')
-str = string.format("LSN for 1 is used twice or COMMIT order is broken: confirmed: %d, new: %d, req: .*", lsn, lsn)
-found = test_run:grep_log(nil, str, 256, {filename = filename})
-(found:gsub('^.*, req: ', ''):gsub('lsn: %d+', 'lsn: <lsn>'))
-
-test_run:cmd('cleanup server replica')
-test_run:cmd('delete server replica')
-
-box.space.test:drop()
-box.schema.user.revoke('guest', 'replication')
-- 
GitLab