diff --git a/src/box/applier.cc b/src/box/applier.cc index cd16893804a1b3d18a8f7fcd1b7aefa866d09c6f..a7874324ff1480ab24cc4601fc9a06505f858be0 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -893,9 +893,6 @@ static uint64_t applier_read_tx(struct applier *applier, struct stailq *rows, const struct applier_read_ctx *ctx, double timeout); -static int -apply_final_join_tx(uint32_t replica_id, struct stailq *rows); - /** * A helper struct to link xrow objects in a list. */ @@ -1011,23 +1008,8 @@ applier_wait_register(struct applier *applier, uint64_t row_count) next)->row); break; } - if (recovery_state < FINISHED_RECOVERY) { - /* - * Register during recovery means the joining is still - * in progress and is at its final stage. - */ - if (apply_final_join_tx(applier->instance_id, &rows) != 0) - diag_raise(); - } else { - /* - * Register after recovery means the instance is already - * functional, but is becoming non-anonymous or is - * changing its name. Transactions coming from such - * registration are no different than during subscribe. 
- */ - if (applier_apply_tx(applier, &rows) != 0) - diag_raise(); - } + if (applier_apply_tx(applier, &rows) != 0) + diag_raise(); } return row_count; @@ -1407,8 +1389,7 @@ applier_handle_raft_request(struct applier *applier, struct raft_request *req) } static int -apply_plain_tx(uint32_t replica_id, struct stailq *rows, - bool skip_conflict, bool use_triggers) +apply_plain_tx(uint32_t replica_id, struct stailq *rows) { /* * Explicitly begin the transaction so that we can @@ -1425,7 +1406,7 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows, stailq_foreach_entry(item, rows, next) { struct xrow_header *row = &item->row; int res = apply_request(&item->req.dml); - if (res != 0 && skip_conflict) { + if (res != 0 && replication_skip_conflict) { struct error *e = diag_last_error(diag_get()); /* * In case of ER_TUPLE_FOUND error and enabled @@ -1475,67 +1456,41 @@ apply_plain_tx(uint32_t replica_id, struct stailq *rows, */ if ((item->row.flags & IPROTO_FLAG_WAIT_ACK) != 0) box_txn_make_sync(); - - if (use_triggers) { - /* We are ready to submit txn to wal. */ - struct trigger *on_rollback, *on_wal_write; - size_t size; - on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback), - &size); - on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write), - &size); - if (on_rollback == NULL || on_wal_write == NULL) { - diag_set(OutOfMemory, size, "region_alloc_object", - "on_rollback/on_wal_write"); - goto fail; - } - - struct replica_cb_data *rcb; - rcb = region_alloc_object(&txn->region, typeof(*rcb), &size); - if (rcb == NULL) { - diag_set(OutOfMemory, size, "region_alloc_object", "rcb"); - goto fail; - } - - trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL); - txn_on_rollback(txn, on_rollback); - - /* - * We use *last* entry timestamp because ack comes up to - * last entry in transaction. 
Same time this shows more - * precise result because we're interested in how long - * transaction traversed network + remote WAL bundle before - * ack get received. - */ - rcb->replica_id = replica_id; - rcb->txn_last_tm = item->row.tm; - - trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL); - txn_on_wal_write(txn, on_wal_write); + size_t size; + struct trigger *on_rollback, *on_wal_write; + on_rollback = region_alloc_object(&txn->region, typeof(*on_rollback), + &size); + on_wal_write = region_alloc_object(&txn->region, typeof(*on_wal_write), + &size); + if (on_rollback == NULL || on_wal_write == NULL) { + diag_set(OutOfMemory, size, "region_alloc_object", + "on_rollback/on_wal_write"); + goto fail; } - + struct replica_cb_data *rcb; + rcb = region_alloc_object(&txn->region, typeof(*rcb), &size); + if (rcb == NULL) { + diag_set(OutOfMemory, size, "region_alloc_object", "rcb"); + goto fail; + } + trigger_create(on_rollback, applier_txn_rollback_cb, NULL, NULL); + txn_on_rollback(txn, on_rollback); + /* + * We use *last* entry timestamp because ack comes up to last entry in + * transaction. Same time this shows more precise result because we're + * interested in how long transaction traversed network + remote WAL + * bundle before ack get received. + */ + rcb->replica_id = replica_id; + rcb->txn_last_tm = item->row.tm; + trigger_create(on_wal_write, applier_txn_wal_write_cb, rcb, NULL); + txn_on_wal_write(txn, on_wal_write); return txn_commit_try_async(txn); fail: txn_abort(txn); return -1; } -/** A simpler version of applier_apply_tx() for final join stage. 
*/ -static int -apply_final_join_tx(uint32_t replica_id, struct stailq *rows) -{ - struct applier_tx_row *txr = - stailq_first_entry(rows, struct applier_tx_row, next); - int rc = 0; - if (unlikely(iproto_type_is_synchro_request(txr->row.type))) { - rc = apply_synchro_req(replica_id, &txr->row, - &txr->req.synchro); - } else { - rc = apply_plain_tx(replica_id, rows, false, false); - } - return rc; -} - /** * We must filter out synchronous rows coming from an instance that fell behind * the current synchro queue owner. This includes both synchronous tx rows and @@ -1723,8 +1678,7 @@ applier_apply_tx(struct applier *applier, struct stailq *rows) rc = apply_synchro_req(applier->instance_id, &txr->row, &txr->req.synchro); } else { - rc = apply_plain_tx(applier->instance_id, rows, - replication_skip_conflict, true); + rc = apply_plain_tx(applier->instance_id, rows); } if (rc != 0) goto finish; diff --git a/src/box/box.cc b/src/box/box.cc index 2141c50bc5036a9c97128e64a6c226e48b1a0af5..4b79ca6f981cbbb9efd497fa2e8734af6748331f 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -5559,7 +5559,6 @@ box_cfg_xc(void) if (box_set_replication_synchro_timeout() != 0) diag_raise(); box_set_replication_sync_timeout(); - box_set_replication_skip_conflict(); if (box_check_instance_name(cfg_instance_name) != 0) diag_raise(); if (box_set_wal_queue_max_size() != 0) @@ -5605,6 +5604,11 @@ box_cfg_xc(void) /* Bootstrap a new instance */ bootstrap(&is_bootstrap_leader); } + /* + * During bootstrap from a remote master try not to ignore the + * conflicts, neither during snapshot fetch, nor join. 
+ */ + box_set_replication_skip_conflict(); replicaset_state = REPLICASET_READY; /* diff --git a/test/xlog/panic_on_broken_lsn.result b/test/xlog/panic_on_broken_lsn.result index 540f562e057a424ae74c7bd81eb644918f8b25b9..7976d3cc596320b40066cb6d8a934d01e2f27be0 100644 --- a/test/xlog/panic_on_broken_lsn.result +++ b/test/xlog/panic_on_broken_lsn.result @@ -1,5 +1,4 @@ -- Issue 3105: Test logging of request with broken lsn before panicking --- Two cases are covered: Recovery and Joining a new replica env = require('test_run') --- ... @@ -89,98 +88,3 @@ test_run:cmd('delete server panic') --- - true ... --- Testing case of panic on joining a new replica -box.schema.user.grant('guest', 'replication') ---- -... -_ = box.schema.space.create('test', {id = 9000}) ---- -... -_ = box.space.test:create_index('pk') ---- -... -box.space.test:auto_increment{'v0'} ---- -- [1, 'v0'] -... --- Inject a broken LSN in the final join stage. -lsn = -1 ---- -... -box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true) ---- -- ok -... -fiber = require('fiber') ---- -... --- Asynchronously run a function that will: --- 1. Wait for the replica join to start. --- 2. Make sure that the record about the new replica written to --- the _cluster space hits the WAL by writing a row to the test --- space. This is important, because at the next step we need --- to compute the LSN of the row that is going to be written to --- the WAL next so we don't want to race with in-progress WAL --- writes. --- 3. Inject an error into replication of the next WAL row and write --- a row to the test space. This row should break replication. --- 4. Resume the replica join. -test_run:cmd("setopt delimiter ';'") ---- -- true -... 
-_ = fiber.create(function() - test_run:wait_cond(function() return box.info.replication[2] ~= nil end) - box.space.test:auto_increment{'v1'} - lsn = box.info.vclock[1] - box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1) - box.space.test:auto_increment{'v2'} - box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false) -end); ---- -... -test_run:cmd("setopt delimiter ''"); ---- -- true -... -test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"') ---- -- true -... -test_run:cmd('start server replica with crash_expected=True') ---- -- false -... -box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1) ---- -- ok -... --- Check that log contains the mention of broken LSN and the request printout -filename = fio.pathjoin(fio.cwd(), 'replica.log') ---- -... -str = string.format("LSN for 1 is used twice or COMMIT order is broken: confirmed: %d, new: %d, req: .*", lsn, lsn) ---- -... -found = test_run:grep_log(nil, str, 256, {filename = filename}) ---- -... -(found:gsub('^.*, req: ', ''):gsub('lsn: %d+', 'lsn: <lsn>')) ---- -- '{type: ''INSERT'', replica_id: 1, lsn: <lsn>, space_id: 9000, index_id: 0, tuple: - [3, "v2"]}' -... -test_run:cmd('cleanup server replica') ---- -- true -... -test_run:cmd('delete server replica') ---- -- true -... -box.space.test:drop() ---- -... -box.schema.user.revoke('guest', 'replication') ---- -... 
diff --git a/test/xlog/panic_on_broken_lsn.test.lua b/test/xlog/panic_on_broken_lsn.test.lua index 811303405d43eedb433285c11b1e29aee06b2d02..e949288e7a28eb47cee51b6aecd2969c0e216e0a 100644 --- a/test/xlog/panic_on_broken_lsn.test.lua +++ b/test/xlog/panic_on_broken_lsn.test.lua @@ -1,5 +1,4 @@ -- Issue 3105: Test logging of request with broken lsn before panicking --- Two cases are covered: Recovery and Joining a new replica env = require('test_run') test_run = env.new() test_run:cleanup_cluster() @@ -34,52 +33,3 @@ found = test_run:grep_log(nil, str, 256, {filename = filename}) test_run:cmd('cleanup server panic') test_run:cmd('delete server panic') - --- Testing case of panic on joining a new replica -box.schema.user.grant('guest', 'replication') -_ = box.schema.space.create('test', {id = 9000}) -_ = box.space.test:create_index('pk') -box.space.test:auto_increment{'v0'} - --- Inject a broken LSN in the final join stage. -lsn = -1 -box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", true) - -fiber = require('fiber') --- Asynchronously run a function that will: --- 1. Wait for the replica join to start. --- 2. Make sure that the record about the new replica written to --- the _cluster space hits the WAL by writing a row to the test --- space. This is important, because at the next step we need --- to compute the LSN of the row that is going to be written to --- the WAL next so we don't want to race with in-progress WAL --- writes. --- 3. Inject an error into replication of the next WAL row and write --- a row to the test space. This row should break replication. --- 4. Resume the replica join. 
-test_run:cmd("setopt delimiter ';'") -_ = fiber.create(function() - test_run:wait_cond(function() return box.info.replication[2] ~= nil end) - box.space.test:auto_increment{'v1'} - lsn = box.info.vclock[1] - box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", lsn + 1) - box.space.test:auto_increment{'v2'} - box.error.injection.set("ERRINJ_REPLICA_JOIN_DELAY", false) -end); -test_run:cmd("setopt delimiter ''"); - -test_run:cmd('create server replica with rpl_master=default, script="xlog/replica.lua"') -test_run:cmd('start server replica with crash_expected=True') -box.error.injection.set("ERRINJ_RELAY_BREAK_LSN", -1) - --- Check that log contains the mention of broken LSN and the request printout -filename = fio.pathjoin(fio.cwd(), 'replica.log') -str = string.format("LSN for 1 is used twice or COMMIT order is broken: confirmed: %d, new: %d, req: .*", lsn, lsn) -found = test_run:grep_log(nil, str, 256, {filename = filename}) -(found:gsub('^.*, req: ', ''):gsub('lsn: %d+', 'lsn: <lsn>')) - -test_run:cmd('cleanup server replica') -test_run:cmd('delete server replica') - -box.space.test:drop() -box.schema.user.revoke('guest', 'replication')