diff --git a/changelogs/unreleased/gh-7343-unserializable-read-tracked-incorrectly-after-rollback.md b/changelogs/unreleased/gh-7343-unserializable-read-tracked-incorrectly-after-rollback.md new file mode 100644 index 0000000000000000000000000000000000000000..62987b78c5ae87edce33e5372a443360fd40fe2c --- /dev/null +++ b/changelogs/unreleased/gh-7343-unserializable-read-tracked-incorrectly-after-rollback.md @@ -0,0 +1,4 @@ +## bugfix/memtx + +* Fixed unserializable reads tracked incorrectly after transaction rollback + (gh-7343). diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index 95ec5f79e78556f1d1c3fde7a4332d9b68335a53..5098a221bc6014d176757ae259dd9ec2b74e4653 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -801,6 +801,7 @@ memtx_tx_story_new(struct space *space, struct tuple *tuple, rlist_create(&story->link[i].nearby_gaps); story->link[i].in_index = space->index[i]; } + story->rollbacked = false; return story; } @@ -1147,28 +1148,31 @@ memtx_tx_story_unlink_both(struct memtx_story *story, uint32_t idx) assert(idx < story->index_count); struct memtx_story_link *link = &story->link[idx]; + struct memtx_story *newer_story = link->newer_story; + struct memtx_story *older_story = link->older_story; if (link->newer_story == NULL) { memtx_tx_story_unlink_top(story, idx); } else { - struct memtx_story *newer_story = link->newer_story; - struct memtx_story *older_story = link->older_story; memtx_tx_story_unlink(newer_story, story, idx); memtx_tx_story_unlink(story, older_story, idx); memtx_tx_story_link(newer_story, older_story, idx); - - /* - * Rebind read trackers in order to conflict - * readers in case of rollback of this txn. 
- */ - struct tx_read_tracker *tracker, *tmp; - uint64_t index_mask = 1ull << (idx & 63); - rlist_foreach_entry_safe(tracker, &story->reader_list, - in_reader_list, tmp) { - if ((tracker->index_mask & index_mask) != 0) { - memtx_tx_track_read_story_slow(tracker->reader, - newer_story, - index_mask); - } + } + /* + * Rebind read trackers in order to conflict + * readers in case of rollback of this txn. + */ + struct memtx_story *rebind_story = + newer_story != NULL ? newer_story : older_story; + struct tx_read_tracker *tracker, *tmp; + uint64_t index_mask = 1ull << (idx & 63); + rlist_foreach_entry_safe(tracker, &story->reader_list, + in_reader_list, tmp) { + if ((tracker->index_mask & index_mask) != 0) { + memtx_tx_track_read_story_slow(tracker->reader, + rebind_story, + index_mask); + rlist_del(&tracker->in_reader_list); + rlist_del(&tracker->in_read_set); } } } @@ -1231,6 +1235,9 @@ memtx_tx_story_full_unlink(struct memtx_story *story) * index. */ if (story->del_psn > 0 && link->in_index != NULL) { + assert(!story->rollbacked || + link->older_story == NULL); + struct index *index = link->in_index; struct tuple *removed, *unused; if (index_replace(index, story->tuple, NULL, @@ -1349,6 +1356,9 @@ memtx_tx_story_is_visible(struct memtx_story *story, struct txn *txn, *is_own_change = false; *visible_tuple = NULL; + if (story->rollbacked) + return false; + int64_t rv_psn = INT64_MAX; if (txn != NULL && txn->rv_psn != 0) rv_psn = txn->rv_psn; @@ -2094,9 +2104,19 @@ memtx_tx_history_rollback_added_story(struct txn_stmt *stmt) assert(stmt->add_story->tuple == stmt->rollback_info.new_tuple); struct memtx_story *story = stmt->add_story; memtx_tx_history_remove_story_del_stmts(story); - for (uint32_t i = 0; i < story->index_count; i++) + for (uint32_t i = 0; i < story->index_count; i++) { + struct memtx_story_link *link = &story->link[i]; + if (link->newer_story == NULL && link->older_story == NULL) { + story->rollbacked = true; + continue; + } 
memtx_tx_story_unlink_both(story, i); + } memtx_tx_story_unlink_added_by(story, stmt); + if (!story->rollbacked) { + assert(rlist_empty(&story->reader_list)); + memtx_tx_story_delete(story); + } } /* @@ -2112,8 +2132,12 @@ memtx_tx_history_rollback_deleted_story(struct txn_stmt *stmt) void memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) { - if (stmt->add_story != NULL) + assert(stmt->txn->psn != 0); + if (stmt->add_story != NULL) { + stmt->add_story->add_psn = stmt->txn->psn; + stmt->add_story->del_psn = stmt->txn->psn; memtx_tx_history_rollback_added_story(stmt); + } if (stmt->del_story != NULL) memtx_tx_history_rollback_deleted_story(stmt); assert(stmt->add_story == NULL && stmt->del_story == NULL); @@ -2125,7 +2149,13 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt) static void memtx_tx_history_remove_added_story(struct txn_stmt *stmt) { - memtx_tx_history_rollback_added_story(stmt); + assert(stmt->add_story != NULL); + assert(stmt->add_story->tuple == stmt->rollback_info.new_tuple); + struct memtx_story *story = stmt->add_story; + memtx_tx_history_remove_story_del_stmts(story); + for (uint32_t i = 0; i < story->index_count; i++) + memtx_tx_story_unlink_both(story, i); + memtx_tx_story_unlink_added_by(story, stmt); } /* @@ -2163,10 +2193,18 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt) * The list begins with several (or zero) of stories that are added by * in-progress transactions, then the list continues with several * (or zero) of prepared stories, which are followed by several - * (or zero) of committed stories. - * If a statement becomes prepared, its story must be moved to the - * point in list exactly between all still in-progress and all already - * prepared. + * (or zero) of committed stories, interleaved by rollbacked stories. 
We + * have the following totally ordered set over tuple stories: + * + * —————————————————————————————————————————————————> serialization time + * |- - - - - - - -|— — — — — -|— — — — — |— — — — — — -|— — — — — — — - + * | No more than | Committed | Prepared | In-progress | One dirty + * | one rollbacked| | | | story in index + * | story | | | | + * |- - - - - - - -|— — — — — -| — — — — —|— — — — — — -|— — — — — — — — + * + * If a statement becomes prepared, the story it adds must be 'sunk' to + * the level of prepared stories. */ struct memtx_story *story = stmt->add_story; uint32_t index_count = story->index_count; @@ -2174,9 +2212,11 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt) struct memtx_story *old_story = story->link[0].older_story; if (stmt->del_story == NULL) - assert(old_story == NULL || old_story->del_psn != 0); + assert(old_story == NULL || old_story->rollbacked || + old_story->del_psn != 0); else - assert(stmt->del_story == story->link[0].older_story); + assert(old_story != NULL && + (old_story->rollbacked || stmt->del_story == old_story)); if (stmt->del_story == NULL) { /* @@ -2210,6 +2250,11 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt) } } + if (old_story != NULL && old_story->rollbacked) { + assert(old_story->link[0].older_story == NULL); + old_story = NULL; + } + if (old_story != NULL) { /* * There can be some transactions that want to delete old_story. @@ -2508,8 +2553,13 @@ memtx_tx_index_invisible_count_slow(struct txn *txn, * A history chain is represented by the top story, which is * stored in index. 
*/ - if (link->in_index == NULL) + if (link->in_index == NULL) { + assert(link->newer_story != NULL || + (story->rollbacked && + link->older_story == NULL)); continue; + } + assert(link->newer_story == NULL); struct tuple *visible = NULL; bool is_prepared_ok = detect_whether_prepared_ok(txn); diff --git a/src/box/memtx_tx.h b/src/box/memtx_tx.h index 9f75ca8c42d6bfa23f9e5cbd4d23add7c664f895..4891b1434724ccf1f5159b069b7f70efa77d3e10 100644 --- a/src/box/memtx_tx.h +++ b/src/box/memtx_tx.h @@ -207,6 +207,14 @@ struct memtx_story { * the story is the only reason why @a tuple cannot be deleted. */ bool tuple_is_retained; + /* + * Transaction that added this story was rollbacked: this story is + * absolutely invisible — its only purpose is to retain the reader list. + * It is present at the end of some history chains and completely + * unlinked from others, which also implies it is not present in the + * corresponding indexes. + */ + bool rollbacked; /** * Link with older and newer stories (and just tuples) for each * index respectively. diff --git a/src/box/txn.c b/src/box/txn.c index 4a9ed1bbe690acbf195d6a09b4983b80037e72c0..a019298c413e6338c7b1dbfa17429ca74f4a491d 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -391,9 +391,30 @@ txn_rollback_one_stmt(struct txn *txn, struct txn_stmt *stmt) } } +/* + * Begins the rollback to savepoint process by assigning a PSN to the + * transaction (rolled back statements require a PSN). + */ +static void +txn_rollback_to_svp_begin(struct txn *txn) +{ + txn->psn = ++txn_last_psn; +} + +/* + * Finishes the rollback to savepoint process by resetting the transaction's + * PSN (since this is not a complete transaction rollback). 
+ */ +static void +txn_rollback_to_svp_finish(struct txn *txn) +{ + txn->psn = 0; +} + static void txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) { + txn_rollback_to_svp_begin(txn); struct txn_stmt *stmt; struct stailq rollback; stailq_cut_tail(&txn->stmts, svp, &rollback); @@ -414,6 +435,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) stmt->space = NULL; stmt->row = NULL; } + txn_rollback_to_svp_finish(txn); } /* @@ -763,6 +785,7 @@ txn_free_or_wakeup(struct txn *txn) void txn_complete_fail(struct txn *txn) { + assert(txn->psn != 0); assert(!txn_has_flag(txn, TXN_IS_DONE)); assert(txn->signature < 0); assert(txn->signature != TXN_SIGNATURE_UNKNOWN); @@ -1225,6 +1248,10 @@ txn_rollback(struct txn *txn) { assert(txn == in_txn()); assert(txn->signature != TXN_SIGNATURE_UNKNOWN); + /* + * Rolled back statements require a PSN. + */ + txn->psn = ++txn_last_psn; txn->status = TXN_ABORTED; trigger_clear(&txn->fiber_on_stop); trigger_clear(&txn->fiber_on_yield); diff --git a/test/box-luatest/gh_7343_unserializable_read_tracked_incorrectly_after_rollback_test.lua b/test/box-luatest/gh_7343_unserializable_read_tracked_incorrectly_after_rollback_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..1d05a67fd1ada6a51423853030da229bbd75a61f --- /dev/null +++ b/test/box-luatest/gh_7343_unserializable_read_tracked_incorrectly_after_rollback_test.lua @@ -0,0 +1,53 @@ +local server = require('test.luatest_helpers.server') +local t = require('luatest') + +local pg = t.group(nil, t.helpers.matrix({op = {'get', 'delete', 'update'}})) + +pg.before_all(function(cg) + cg.server = server:new{ + alias = 'dflt', + box_cfg = {memtx_use_mvcc_engine = true} + } + cg.server:start() + cg.server:exec(function() + box.schema.create_space('s') + box.space.s:create_index('pk') + end) +end) + +pg.after_all(function(cg) + cg.server:drop() +end) + +--[[ +Checks that unserializable reads are tracked correctly after transaction +rollback. 
+]] +pg.test_unserializable_read_after_rollback = function(cg) + local stream1 = cg.server.net_box:new_stream() + local stream2 = cg.server.net_box:new_stream() + local stream3 = cg.server.net_box:new_stream() + + stream1:begin() + stream2:begin() + stream3:begin() + + stream1.space.s:replace{0, 0} + + local args = {{0}} + if cg.params.op == 'update' then + table.insert(args, {{'=', 2, 0}}) + end + stream2.space.s[cg.params.op](stream2.space.s, unpack(args)) + stream2.space.s:replace{1, 0} + + stream1:rollback() + + stream3.space.s:insert{0, 1} + stream3:commit() + + local conflict_err_msg = 'Transaction has been aborted by conflict' + t.assert_error_msg_content_equals(conflict_err_msg, function() + stream2:commit() + end) +end