diff --git a/src/box/box.cc b/src/box/box.cc
index 51ac718c7a2bee7083317881f5b985bd045066a5..bf1a72ecccb7368f575628568d9c44a6b82751ac 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -3683,6 +3683,17 @@ tx_prio_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
 	(void) loop;
 	(void) events;
 	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+#ifndef NDEBUG
+	/*
+	 * The sleep is legal because it is not a fiber sleep. It puts the
+	 * entire thread to sleep to simulate it being slow. It can happen in
+	 * reality if the thread for some reason isn't scheduled for too long.
+	 */
+	struct errinj *inj = errinj(ERRINJ_TX_DELAY_PRIO_ENDPOINT,
+				    ERRINJ_DOUBLE);
+	if (inj->dparam != 0)
+		usleep(inj->dparam * 1000000);
+#endif
 	cbus_process(endpoint);
 }
 
diff --git a/src/box/txn.c b/src/box/txn.c
index f5060c69ebe79a506f5a09eb6ea9b15d2b71f3fe..2087567b1347771e066e19c5fdb9dcf0908f82ef 100644
--- a/src/box/txn.c
+++ b/src/box/txn.c
@@ -443,6 +443,7 @@ txn_new(void)
 	rlist_create(&txn->in_all_txs);
 	txn->space_on_replace_triggers_depth = 0;
 	txn->acquired_region_used = 0;
+	txn->limbo_entry = NULL;
 	return txn;
 }
 
@@ -452,6 +453,7 @@ txn_new(void)
 inline static void
 txn_free(struct txn *txn)
 {
+	assert(txn->limbo_entry == NULL);
 	if (txn->rollback_timer != NULL)
 		ev_timer_stop(loop(), txn->rollback_timer);
 	memtx_tx_clean_txn(txn);
@@ -749,6 +751,11 @@ txn_complete_fail(struct txn *txn)
 	assert(!txn_has_flag(txn, TXN_IS_DONE));
 	assert(txn->signature < 0);
 	assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
+	if (txn->limbo_entry != NULL) {
+		assert(txn_has_flag(txn, TXN_WAIT_SYNC));
+		txn_limbo_abort(&txn_limbo, txn->limbo_entry);
+		txn->limbo_entry = NULL;
+	}
 	txn->status = TXN_ABORTED;
 	struct txn_stmt *stmt;
 	stailq_reverse(&txn->stmts);
@@ -839,10 +846,22 @@ txn_on_journal_write(struct journal_entry *entry)
 	}
 	if (txn_has_flag(txn, TXN_HAS_TRIGGERS))
 		txn_run_wal_write_triggers(txn);
-	if (!txn_has_flag(txn, TXN_WAIT_SYNC))
+	if (!txn_has_flag(txn, TXN_WAIT_SYNC)) {
 		txn_complete_success(txn);
-	else if (txn->fiber != NULL)
-		fiber_wakeup(txn->fiber);
+	} else {
+		int64_t lsn;
+		/*
+		 * XXX: this is quite ugly. We need a more reliable way to get
+		 * the synchro LSN.
+		 */
+		if (txn->n_applier_rows > 0)
+			lsn = entry->rows[txn->n_applier_rows - 1]->lsn;
+		else
+			lsn = entry->rows[entry->n_rows - 1]->lsn;
+		txn_limbo_assign_lsn(&txn_limbo, txn->limbo_entry, lsn);
+		if (txn->fiber != NULL)
+			fiber_wakeup(txn->fiber);
+	}
 finish:
 	fiber_set_txn(fiber(), NULL);
 	fiber_set_user(fiber(), orig_creds);
@@ -1045,23 +1064,6 @@ txn_commit_nop(struct txn *txn)
 	return false;
 }
 
-/*
- * A trigger called on tx rollback due to a failed WAL write,
- * when tx is waiting for confirmation.
- */
-static int
-txn_limbo_on_rollback(struct trigger *trig, void *event)
-{
-	(void) event;
-	struct txn *txn = (struct txn *) event;
-	/* Check whether limbo has performed the cleanup. */
-	if (!txn_has_flag(txn, TXN_WAIT_SYNC))
-		return 0;
-	struct txn_limbo_entry *entry = (struct txn_limbo_entry *) trig->data;
-	txn_limbo_abort(&txn_limbo, entry);
-	return 0;
-}
-
 int
 txn_commit_try_async(struct txn *txn)
 {
@@ -1084,47 +1086,12 @@ txn_commit_try_async(struct txn *txn)
 		goto rollback;
 
 	bool is_sync = txn_has_flag(txn, TXN_WAIT_SYNC);
-	struct txn_limbo_entry *limbo_entry;
 	if (is_sync) {
-		/*
-		 * We'll need this trigger for sync transactions later,
-		 * but allocation failure is inappropriate after the entry
-		 * is sent to journal, so allocate early.
-		 */
-		size_t size;
-		struct trigger *trig =
-			tx_region_alloc_object(txn, TX_OBJECT_TRIGGER, &size);
-		if (trig == NULL) {
-			diag_set(OutOfMemory, size, "tx_region_alloc_object",
-				 "trig");
-			goto rollback;
-		}
-
 		/* See txn_commit(). */
 		uint32_t origin_id = req->rows[0]->replica_id;
-		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
-		if (limbo_entry == NULL)
+		txn->limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
+		if (txn->limbo_entry == NULL)
 			goto rollback;
-
-		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
-			int64_t lsn = req->rows[txn->n_applier_rows - 1]->lsn;
-			/*
-			 * Can't tell whether it is local or not -
-			 * async commit is used both by applier
-			 * and during recovery. Use general LSN
-			 * assignment to let the limbo rule this
-			 * out.
-			 */
-			txn_limbo_assign_lsn(&txn_limbo, limbo_entry, lsn);
-		}
-
-		/*
-		 * Set a trigger to abort waiting for confirm on
-		 * WAL write failure.
-		 */
-		trigger_create(trig, txn_limbo_on_rollback,
-			       limbo_entry, NULL);
-		txn_on_rollback(txn, trig);
 	}
 
 	fiber_set_txn(fiber(), NULL);
@@ -1183,6 +1150,7 @@ txn_commit(struct txn *txn)
 		limbo_entry = txn_limbo_append(&txn_limbo, origin_id, txn);
 		if (limbo_entry == NULL)
 			goto rollback_abort;
+		txn->limbo_entry = limbo_entry;
 	}
 
 	fiber_set_txn(fiber(), NULL);
@@ -1193,17 +1161,15 @@ txn_commit(struct txn *txn)
 		goto rollback_io;
 	}
 	if (txn_has_flag(txn, TXN_WAIT_SYNC)) {
+		assert(limbo_entry->lsn > 0);
+		/*
+		 * XXX: the ACK should be done on the WAL write too. But it can
+		 * trigger another WAL write. Can't be done until that works
+		 * asynchronously.
+		 */
 		if (txn_has_flag(txn, TXN_WAIT_ACK)) {
-			int64_t lsn = req->rows[req->n_rows - 1]->lsn;
-			/*
-			 * Use local LSN assignment. Because
-			 * blocking commit is used by local
-			 * transactions only.
-			 */
-			txn_limbo_assign_local_lsn(&txn_limbo, limbo_entry,
-						   lsn);
-			/* Local WAL write is a first 'ACK'. */
-			txn_limbo_ack(&txn_limbo, txn_limbo.owner_id, lsn);
+			txn_limbo_ack(&txn_limbo, txn_limbo.owner_id,
+				      limbo_entry->lsn);
 		}
 		if (txn_limbo_wait_complete(&txn_limbo, limbo_entry) < 0)
 			goto rollback;
@@ -1217,8 +1183,6 @@ txn_commit(struct txn *txn)
 
 rollback_io:
 	diag_log();
-	if (txn_has_flag(txn, TXN_WAIT_SYNC))
-		txn_limbo_abort(&txn_limbo, limbo_entry);
 rollback_abort:
 	txn->signature = TXN_SIGNATURE_ABORT;
 rollback:
diff --git a/src/box/txn.h b/src/box/txn.h
index a0fb38953fe08c9e29ec41920da5e6398b7b6f9e..a269b5f6d7321fafe230875f2e8f3ec6e2615bf9 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -541,6 +541,10 @@ struct txn {
 	 * to complete within the timeout specified when it was created.
 	 */
 	struct ev_timer *rollback_timer;
+	/**
+	 * For synchronous transactions - their context in the synchro queue.
+	 */
+	struct txn_limbo_entry *limbo_entry;
 	/**
 	 * Nesting level of space on_replace triggers for current txn.
 	 */
diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c
index bc6d0562d389b18199ec796cd936db820b87f8c6..b1626b83c8362258566209e806228422c381fdd3 100644
--- a/src/box/txn_limbo.c
+++ b/src/box/txn_limbo.c
@@ -170,8 +170,14 @@ txn_limbo_assign_remote_lsn(struct txn_limbo *limbo,
 	assert(limbo->owner_id != instance_id);
 	assert(entry->lsn == -1);
 	assert(lsn > 0);
-	assert(txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	(void) limbo;
+	/*
+	 * Same as with the local LSN assignment, it is given after a WAL
+	 * write. But for remotely received transactions it doesn't matter so
+	 * far. They don't need ACKs - they wait for explicit confirmations.
+	 * That will become a problem when ACKs are needed for anything else
+	 * and when local txns become optionally non-blocking.
+	 */
 	entry->lsn = lsn;
 }
 
@@ -183,7 +189,6 @@ txn_limbo_assign_local_lsn(struct txn_limbo *limbo,
 	assert(limbo->owner_id == instance_id);
 	assert(entry->lsn == -1);
 	assert(lsn > 0);
-	assert(txn_has_flag(entry->txn, TXN_WAIT_ACK));
 	entry->lsn = lsn;
 
 	/*
@@ -267,6 +272,7 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry)
 	rlist_foreach_entry_safe_reverse(e, &limbo->queue, in_queue, tmp) {
 		e->txn->signature = TXN_SIGNATURE_QUORUM_TIMEOUT;
+		e->txn->limbo_entry = NULL;
 		txn_limbo_abort(limbo, e);
 		txn_clear_flags(e->txn, TXN_WAIT_SYNC | TXN_WAIT_ACK);
 		txn_complete_fail(e->txn);
@@ -432,10 +438,12 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn)
 			 * the txn to get a crash on any usage attempt instead
 			 * of potential undefined behaviour.
 			 */
+			e->txn->limbo_entry = NULL;
 			e->txn = NULL;
 			continue;
 		}
 		e->is_commit = true;
+		e->txn->limbo_entry = NULL;
 		txn_limbo_remove(limbo, e);
 		txn_clear_flags(e->txn, TXN_WAIT_SYNC | TXN_WAIT_ACK);
 		/*
@@ -488,6 +496,7 @@ txn_limbo_read_rollback(struct txn_limbo *limbo, int64_t lsn)
 		 */
 		assert(e->txn->signature >= 0);
 		e->txn->signature = TXN_SIGNATURE_SYNC_ROLLBACK;
+		e->txn->limbo_entry = NULL;
 		txn_complete_fail(e->txn);
 		if (e == last_rollback)
 			break;
@@ -604,7 +613,6 @@ txn_limbo_ack(struct txn_limbo *limbo, uint32_t replica_id, int64_t lsn)
 		 * after all the previous sync transactions are.
 		 */
 		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			assert(e->lsn == -1);
 			continue;
 		} else if (e->lsn <= prev_lsn) {
 			continue;
@@ -901,7 +909,6 @@ txn_limbo_on_parameters_change(struct txn_limbo *limbo)
 	rlist_foreach_entry(e, &limbo->queue, in_queue) {
 		assert(e->ack_count <= VCLOCK_MAX);
 		if (!txn_has_flag(e->txn, TXN_WAIT_ACK)) {
-			assert(e->lsn == -1);
 			continue;
 		} else if (e->ack_count < replication_synchro_quorum) {
 			continue;
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index 2c8edfa58a610fbff21b3338693153153c52f929..19bbbac170967b28a8966f1a6e3dc17e76fa9be1 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -169,6 +169,7 @@ struct errinj {
 	_(ERRINJ_XLOG_META, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_XLOG_READ, ERRINJ_INT, {.iparam = -1}) \
 	_(ERRINJ_XLOG_RENAME_DELAY, ERRINJ_BOOL, {.bparam = false}) \
+	_(ERRINJ_TX_DELAY_PRIO_ENDPOINT, ERRINJ_DOUBLE, {.dparam = 0}) \
 
 ENUM0(errinj_id, ERRINJ_LIST);
 extern struct errinj errinjs[];
diff --git a/test/box/errinj.result b/test/box/errinj.result
index 577f71a564487b8329b9ef303e6e5de50589cfb6..54b769e7011b94ad7c9a1541805274302b7b01ad 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -100,6 +100,7 @@ evals
   - ERRINJ_TUPLE_FIELD_COUNT_LIMIT: -1
   - ERRINJ_TUPLE_FORMAT_COUNT: -1
   - ERRINJ_TXN_COMMIT_ASYNC: false
+  - ERRINJ_TX_DELAY_PRIO_ENDPOINT: 0
   - ERRINJ_VYRUN_DATA_READ: false
   - ERRINJ_VY_COMPACTION_DELAY: false
   - ERRINJ_VY_DELAY_PK_LOOKUP: false
diff --git a/test/replication-luatest/gh_6842_qsync_applier_order_test.lua b/test/replication-luatest/gh_6842_qsync_applier_order_test.lua
index 88f640e7643d9037655e348eb3b43d784419f8bb..1a2a37e7cd934ba971e800bd54a328671d541134 100644
--- a/test/replication-luatest/gh_6842_qsync_applier_order_test.lua
+++ b/test/replication-luatest/gh_6842_qsync_applier_order_test.lua
@@ -95,6 +95,30 @@ local function wait_synchro_owner(server, owner_id)
     end, {owner_id})
 end
 
+--
+-- Wait until the server sees that the synchro queue has the given length.
+--
+local function wait_synchro_len(server, len_arg)
+    luatest.helpers.retrying({timeout = wait_timeout}, server.exec, server,
+                             function(len)
+        if box.info.synchro.queue.len ~= len then
+            error('Waiting for queue len')
+        end
+    end, {len_arg})
+end
+
+--
+-- Wait until the server sees the synchro queue as busy.
+--
+local function wait_synchro_is_busy(server)
+    luatest.helpers.retrying({timeout = wait_timeout}, server.exec, server,
+                             function()
+        if not box.info.synchro.queue.busy then
+            error('Waiting for busy queue')
+        end
+    end)
+end
+
 --
 -- Server 1 was a synchro queue owner. Then it receives a foreign PROMOTE which
 -- goes to WAL but is not applied yet. Server 1 during that tries to make a
@@ -162,3 +186,91 @@ g.test_local_txn_during_remote_promote = function(g)
     end)
     luatest.assert_equals(content, {{3}}, 'synchro transactions work')
 end
+
+--
+-- Server 1 was the synchro queue owner. It starts a synchro txn. The txn is
+-- written to WAL but not reported to TX yet. The relay manages to send it.
+-- Server 2 receives the txn and makes a PROMOTE which includes this txn.
+--
+-- Then the PROMOTE is delivered to server 1 and goes to WAL too. Then the txn
+-- and PROMOTE WAL writes are processed by the TX thread in a single batch
+-- without yields.
+--
+-- The bug was that the txn's synchro queue entry didn't get an LSN right after
+-- the WAL write, and PROMOTE couldn't process synchro entries without an LSN.
+--
+g.test_remote_promote_during_local_txn_including_it = function(g)
+    -- Start synchro txns on server 1.
+    local fids = g.server1:exec(function()
+        box.ctl.promote()
+        local s = box.schema.create_space('test', {is_sync = true})
+        s:create_index('pk')
+        box.cfg{
+            -- To hang the instance's own transactions in the synchro queue.
+            replication_synchro_quorum = 3,
+        }
+        box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', true)
+        local fiber = require('fiber')
+        -- More than one transaction to ensure that it works not just for one.
+        local f1 = fiber.new(s.replace, s, {1})
+        f1:set_joinable(true)
+        local f2 = fiber.new(s.replace, s, {2})
+        f2:set_joinable(true)
+        return {f1:id(), f2:id()}
+    end)
+
+    -- The txns are delivered to server 2.
+    wait_synchro_len(g.server2, #fids)
+
+    -- Server 2 confirms the txns and sends server 1 a PROMOTE covering them.
+    g.server2:exec(function()
+        box.cfg{
+            replication_synchro_quorum = 1,
+            -- To avoid waiting for a confirm from server 1. Just confirm via
+            -- the promotion ASAP.
+            replication_synchro_timeout = 0.001,
+        }
+        box.ctl.promote()
+        box.cfg{
+            replication_synchro_quorum = 2,
+            replication_synchro_timeout = 1000,
+        }
+    end)
+
+    -- Server 1 receives the foreign PROMOTE. Now the local txns and a remote
+    -- PROMOTE are being written to WAL.
+    wait_synchro_is_busy(g.server1)
+
+    local rows = g.server1:exec(function(fids)
+        -- Simulate the TX thread being slow to increase the likelihood of the
+        -- txn and PROMOTE WAL writes being processed by TX in a single batch.
+        box.error.injection.set('ERRINJ_TX_DELAY_PRIO_ENDPOINT', 0.1)
+        -- Let them finish their WAL writes.
+        box.error.injection.set('ERRINJ_RELAY_FASTER_THAN_TX', false)
+
+        local fiber = require('fiber')
+        for _, fid in pairs(fids) do
+            fiber.find(fid):join()
+        end
+        box.error.injection.set('ERRINJ_TX_DELAY_PRIO_ENDPOINT', 0)
+        return box.space.test:select()
+    end, {fids})
+    -- The txns were confirmed by another instance even before they were
+    -- confirmed locally.
+    luatest.assert_equals(rows, {{1}, {2}}, 'txn was confirmed')
+
+    local rows2 = g.server2:exec(function()
+        return box.space.test:select()
+    end)
+    luatest.assert_equals(rows2, rows, 'on instance 2 too')
+
+    -- Synchronous replication is functional - the new owner can use the queue.
+    g.server2:exec(function()
+        box.space.test:replace{3}
+    end)
+    g.server1:wait_vclock_of(g.server2)
+    rows = g.server1:exec(function()
+        return box.space.test:select{}
+    end)
+    luatest.assert_equals(rows, {{1}, {2}, {3}}, 'synchro transactions work')
+end
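
A minimal usage sketch of the new injection, for illustration only - it
restates what the diff above already does. The injection name and the
seconds-to-usleep conversion (usleep(dparam * 1000000)) come from tx_prio_cb()
in box.cc, and box.error.injection.set is the standard test entry point, as
used in the Lua test:

    -- While the double parameter is non-zero, tx_prio_cb() puts the whole TX
    -- thread to sleep for that many seconds before processing the endpoint,
    -- so pending WAL-write completions accumulate and are then handled in a
    -- single batch without yields.
    box.error.injection.set('ERRINJ_TX_DELAY_PRIO_ENDPOINT', 0.1)
    -- ... run the scenario that needs batched TX processing ...
    box.error.injection.set('ERRINJ_TX_DELAY_PRIO_ENDPOINT', 0)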