diff --git a/changelogs/unreleased/gh-9340-fix-on_rollback-function-args.md b/changelogs/unreleased/gh-9340-fix-on_rollback-function-args.md new file mode 100644 index 0000000000000000000000000000000000000000..168f59869fbb3671b8154b6a5428878be269fcea --- /dev/null +++ b/changelogs/unreleased/gh-9340-fix-on_rollback-function-args.md @@ -0,0 +1,5 @@ +## bugfix/core + +* Fixed a bug when `on_rollback` trigger functions were invoked with an empty + iterator argument if a transaction was aborted by a fiber yield or by a + timeout (gh-9340). diff --git a/src/box/txn.c b/src/box/txn.c index d0fc7cc5d2e36ef86ef8d4aed276959f86b99154..7daf3d33610b68732e8464110a09272891c8eae9 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -382,13 +382,26 @@ txn_update_row_counts(struct txn *txn, struct txn_stmt *stmt, int ofs) } } +/** + * Rollback all statements of `txn' newer than `svp', and run `on_rollback' + * triggers if `run_triggers' is true. + * The statements are rolled back in reverse order. + */ static void -txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) +txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp, + bool run_triggers) { struct txn_stmt *stmt; struct stailq rollback; stailq_cut_tail(&txn->stmts, svp, &rollback); stailq_reverse(&rollback); + if (run_triggers && txn_has_flag(txn, TXN_HAS_TRIGGERS)) { + stmt = stailq_first_entry(&rollback, struct txn_stmt, next); + if (trigger_run(&txn->on_rollback, stmt) != 0) { + diag_log(); + panic("transaction rollback trigger failed"); + } + } stailq_foreach_entry(stmt, &rollback, next) { txn_rollback_one_stmt(txn, stmt); if (stmt->row != NULL) @@ -1116,7 +1129,7 @@ txn_rollback_stmt(struct txn *txn) return; assert(txn->in_sub_stmt > 0); txn->in_sub_stmt--; - txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt]); + txn_rollback_to_svp(txn, txn->sub_stmt_begin[txn->in_sub_stmt], false); } void @@ -1424,7 +1437,7 @@ box_txn_rollback_to_savepoint(box_txn_savepoint_t *svp) diag_set(ClientError, ER_NO_SUCH_SAVEPOINT); return -1; } - txn_rollback_to_svp(txn, svp->stmt); + txn_rollback_to_svp(txn, svp->stmt, false); /* Discard from list all newer savepoints. */ RLIST_HEAD(discard); rlist_cut_before(&discard, &txn->savepoints, &svp->link); @@ -1461,13 +1474,26 @@ txn_on_stop(struct trigger *trigger, void *event) return 0; } +/** + * Transaction rollback timer callback. + * + * If there are `on_rollback' triggers, the transaction can not be rolled back + * completely here, because this callback is invoked outside of the transaction, + * however triggers must be executed in the fiber with the active transaction. + * Thus, the transaction is marked as aborted here, and rolled back at commit. + * As opposed to abort-by-yield, it is OK to postpone the rollback, because in + * memtx the transaction can be aborted by timeout only when MVCC is on. + */ static void txn_on_timeout(ev_loop *loop, ev_timer *watcher, int revents) { (void) loop; (void) revents; struct txn *txn = (struct txn *)watcher->data; - txn_rollback_to_svp(txn, NULL); + if (!txn_has_flag(txn, TXN_HAS_TRIGGERS) || + rlist_empty(&txn->on_rollback)) { + txn_rollback_to_svp(txn, NULL, false); + } txn->status = TXN_ABORTED; txn_set_flags(txn, TXN_IS_ABORTED_BY_TIMEOUT); } @@ -1497,10 +1523,16 @@ txn_on_yield(struct trigger *trigger, void *event) struct txn *txn = in_txn(); assert(txn != NULL); if (txn->status != TXN_ABORTED && !txn_has_flag(txn, TXN_CAN_YIELD)) { - txn_rollback_to_svp(txn, NULL); + txn_rollback_to_svp(txn, NULL, true); txn->status = TXN_ABORTED; txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD); say_warn("Transaction has been aborted by a fiber yield"); + if (txn_has_flag(txn, TXN_HAS_TRIGGERS)) { + /* Can't rollback more than once. */ + trigger_destroy(&txn->on_rollback); + /* Commit won't happen after rollback. */ + trigger_destroy(&txn->on_commit); + } return 0; } if (txn->rollback_timer == NULL && txn->timeout != TIMEOUT_INFINITY) { diff --git a/test/engine-luatest/gh_9340_on_rollback_trigger_args_test.lua b/test/engine-luatest/gh_9340_on_rollback_trigger_args_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..94b8e55bda478baad966a9c14eef09fabf953ba6 --- /dev/null +++ b/test/engine-luatest/gh_9340_on_rollback_trigger_args_test.lua @@ -0,0 +1,173 @@ +local t = require('luatest') +local server = require('luatest.server') + +local function before_all(cg, box_cfg) + cg.server = server:new({box_cfg = box_cfg}) + cg.server:start() + cg.server:exec(function() + -- A trigger that saves everything from `iterator' into `_G.result'. + rawset(_G, 'on_rollback_trigger', function(iterator) + local result = {} + for num, old_tuple, new_tuple, space_id in iterator() do + table.insert(result, { + num = num, space_id = space_id, + old_tuple = old_tuple, new_tuple = new_tuple + }) + end + rawset(_G, 'result', result) + end) + end) +end + +local function after_all(cg) + cg.server:drop() +end + +local function after_each(cg) + cg.server:exec(function() + box.space.test1:drop() + box.space.test2:drop() + end) +end + +-- Enable MVCC to abort transactions by a timeout or by a conflict. +local g_mvcc_on = t.group('gh-9340-mvcc-on', {{engine = 'memtx'}, + {engine = 'vinyl'}}) +g_mvcc_on.before_all(function(cg) + before_all(cg, {memtx_use_mvcc_engine = true}) +end) +g_mvcc_on.after_all(after_all) +g_mvcc_on.after_each(after_each) + +-- Check arguments of the `on_rollback' triggers. +g_mvcc_on.test_trigger_args = function(cg) + cg.server:exec(function(engine) + local fiber = require('fiber') + local s1 = box.schema.space.create('test1', {engine = engine}) + local s2 = box.schema.space.create('test2', {engine = engine}) + s1:create_index('pk') + s2:create_index('pk') + s1:on_replace(function() box.on_rollback(_G.on_rollback_trigger) end) + + -- Check rollback by `box.rollback()'. + box.begin() + s1:insert{1} + s2:insert{2} + box.rollback() + t.assert_equals(_G.result, { + {num = 1, space_id = 513, old_tuple = nil, new_tuple = {2}}, + {num = 2, space_id = 512, old_tuple = nil, new_tuple = {1}}, + }) + + -- Check rollback by a conflict. + box.begin() + s1:insert{3} + s2:insert{4} + local f = fiber.new(s1.insert, s1, {3, 3}) + f:set_joinable(true) + t.assert_equals({f:join()}, {true, {3, 3}}) + local errmsg = 'Transaction has been aborted by conflict' + t.assert_error_msg_equals(errmsg, s1.insert, s1, {33}) + t.assert_error_msg_equals(errmsg, s2.insert, s2, {44}) + t.assert_error_msg_equals(errmsg, box.commit) + t.assert_equals(_G.result, { + {num = 1, space_id = 513, old_tuple = nil, new_tuple = {4}}, + {num = 2, space_id = 512, old_tuple = nil, new_tuple = {3}}, + }) + + -- Check rollback by a timeout. + box.begin({timeout = 0.01}) + s1:insert{5} + s2:insert{6} + fiber.sleep(0.1) + local errmsg = 'Transaction has been aborted by timeout' + t.assert_error_msg_equals(errmsg, s1.insert, s1, {55}) + t.assert_error_msg_equals(errmsg, s2.insert, s2, {66}) + t.assert_error_msg_equals(errmsg, box.commit) + t.assert_equals(_G.result, { + {num = 1, space_id = 513, old_tuple = nil, new_tuple = {6}}, + {num = 2, space_id = 512, old_tuple = nil, new_tuple = {5}}, + }) + + -- Check that rollback of a single statement (without full transaction + -- rollback) doesn't invoke `on_rollback' triggers. + local function error_in_on_replace() + error('err') + end + s2:on_replace(error_in_on_replace) + _G.result = {} + box.begin() + s1:insert{7} + t.assert_error_msg_content_equals('err', s2.insert, s2, {77}) + box.commit() + s2:on_replace(nil, error_in_on_replace) + t.assert_equals(_G.result, {}) + end, {cg.params.engine}) +end + +-- Tests with error injection to force rollback due to WAL failure. +g_mvcc_on.test_trigger_args_debug = function(cg) + t.tarantool.skip_if_not_debug() + cg.server:exec(function(engine) + local s1 = box.schema.space.create('test1', {engine = engine}) + local s2 = box.schema.space.create('test2', {engine = engine}) + s1:create_index('pk') + s2:create_index('pk') + s1:on_replace(function() box.on_rollback(_G.on_rollback_trigger) end) + + -- Check rollback by a WAL I/O error. + box.begin() + s1:insert{1} + s2:insert{2} + box.error.injection.set("ERRINJ_WAL_IO", true) + t.assert_error_msg_equals('Failed to write to disk', box.commit) + box.error.injection.set("ERRINJ_WAL_IO", false) + t.assert_equals(_G.result, { + {num = 1, space_id = 513, old_tuple = nil, new_tuple = {2}}, + {num = 2, space_id = 512, old_tuple = nil, new_tuple = {1}}, + }) + + -- Check that `on_rollback' triggers are called with correct arguments + -- for single-statement transactions as well. + box.error.injection.set("ERRINJ_WAL_IO", true) + t.assert_error_msg_equals('Failed to write to disk', s1.insert, s1, {3}) + box.error.injection.set("ERRINJ_WAL_IO", false) + t.assert_equals(_G.result, { + {num = 1, space_id = 512, old_tuple = nil, new_tuple = {3}}, + }) + end, {cg.params.engine}) +end + +-- Force disable MVCC to abort the transaction by fiber yield. +local g_mvcc_off = t.group('gh-9340-mvcc-off-memtx') +g_mvcc_off.before_all(function(cg) + before_all(cg, {memtx_use_mvcc_engine = false}) +end) +g_mvcc_off.after_all(after_all) +g_mvcc_off.after_each(after_each) + +-- Check arguments of the `on_rollback' triggers. +g_mvcc_off.test_trigger_args = function(cg) + cg.server:exec(function() + local fiber = require('fiber') + local s1 = box.schema.space.create('test1', {engine = 'memtx'}) + local s2 = box.schema.space.create('test2', {engine = 'memtx'}) + s1:create_index('pk') + s2:create_index('pk') + + -- Check rollback by `fiber.yield()'. + box.begin() + box.on_rollback(_G.on_rollback_trigger) + s1:insert{1} + s2:insert{2} + fiber.yield() + local errmsg = 'Transaction has been aborted by a fiber yield' + t.assert_error_msg_equals(errmsg, s1.insert, s1, {11}) + t.assert_error_msg_equals(errmsg, s2.insert, s2, {22}) + t.assert_error_msg_equals(errmsg, box.commit) + t.assert_equals(_G.result, { + {num = 1, space_id = 513, old_tuple = nil, new_tuple = {2}}, + {num = 2, space_id = 512, old_tuple = nil, new_tuple = {1}}, + }) + end) +end