Skip to content
Snippets Groups Projects
Commit 56cf737c authored by Georgiy Lebedev's avatar Georgiy Lebedev Committed by Aleksandr Lyapunov
Browse files

memtx: rework transaction rollback

When we rollback a transaction statement, we relink its read trackers
to a newer story in the history chain, if present (6c990a7b), but we do not
handle the case when there is no newer story.

If there is an older story in the history chain, we can relink the
rollbacked story's reader to it, but if the rollbacked story is the
only one left, we need to retain it, because it stores the reader list
needed for conflict resolution — such stories are distinguished by the
rollbacked flag, and there can be no more than one such story located
strictly at the end of a given history chain (which means a story can be
fully unlinked from some indexes and present at the end of others).

There are several nuances we need to account for:

Firstly, such rollbacked stories must be impossible to read from an index:
this is ensured by `memtx_tx_story_is_visible`.

Secondly, rollbacked transactions need to be treated as prepared with
stories that have `add_psn == del_psn`, so that they are correctly deleted
during garbage collection.

After this logical change we have the following partially ordered set over
tuple stories:
———————————————————————————————————————————————————————> serialization time
|- - - - - - - -|— — — — — -|— — — — — |— — — — — — -|— — — — — — — -
| No more than  | Committed | Prepared | In-progress | One dirty
| one rollbacked|           |          |             | story in index
| story         |           |          |             |
|- - - - - - - -|— — — — — -| — — — — —|— — — — — — -|— — — — — — — —

Closes #7343

NO_DOC=bugfix
parent 55e64a8d
No related branches found
No related tags found
No related merge requests found
## bugfix/memtx
* Fixed unserializable reads tracked incorrectly after transaction rollback
(gh-7343).
......@@ -801,6 +801,7 @@ memtx_tx_story_new(struct space *space, struct tuple *tuple,
rlist_create(&story->link[i].nearby_gaps);
story->link[i].in_index = space->index[i];
}
story->rollbacked = false;
return story;
}
......@@ -1147,28 +1148,31 @@ memtx_tx_story_unlink_both(struct memtx_story *story, uint32_t idx)
assert(idx < story->index_count);
struct memtx_story_link *link = &story->link[idx];
struct memtx_story *newer_story = link->newer_story;
struct memtx_story *older_story = link->older_story;
if (link->newer_story == NULL) {
memtx_tx_story_unlink_top(story, idx);
} else {
struct memtx_story *newer_story = link->newer_story;
struct memtx_story *older_story = link->older_story;
memtx_tx_story_unlink(newer_story, story, idx);
memtx_tx_story_unlink(story, older_story, idx);
memtx_tx_story_link(newer_story, older_story, idx);
/*
* Rebind read trackers in order to conflict
* readers in case of rollback of this txn.
*/
struct tx_read_tracker *tracker, *tmp;
uint64_t index_mask = 1ull << (idx & 63);
rlist_foreach_entry_safe(tracker, &story->reader_list,
in_reader_list, tmp) {
if ((tracker->index_mask & index_mask) != 0) {
memtx_tx_track_read_story_slow(tracker->reader,
newer_story,
index_mask);
}
}
/*
* Rebind read trackers in order to conflict
* readers in case of rollback of this txn.
*/
struct memtx_story *rebind_story =
newer_story != NULL ? newer_story : older_story;
struct tx_read_tracker *tracker, *tmp;
uint64_t index_mask = 1ull << (idx & 63);
rlist_foreach_entry_safe(tracker, &story->reader_list,
in_reader_list, tmp) {
if ((tracker->index_mask & index_mask) != 0) {
memtx_tx_track_read_story_slow(tracker->reader,
rebind_story,
index_mask);
rlist_del(&tracker->in_reader_list);
rlist_del(&tracker->in_read_set);
}
}
}
......@@ -1231,6 +1235,9 @@ memtx_tx_story_full_unlink(struct memtx_story *story)
* index.
*/
if (story->del_psn > 0 && link->in_index != NULL) {
assert(!story->rollbacked ||
link->older_story == NULL);
struct index *index = link->in_index;
struct tuple *removed, *unused;
if (index_replace(index, story->tuple, NULL,
......@@ -1349,6 +1356,9 @@ memtx_tx_story_is_visible(struct memtx_story *story, struct txn *txn,
*is_own_change = false;
*visible_tuple = NULL;
if (story->rollbacked)
return false;
int64_t rv_psn = INT64_MAX;
if (txn != NULL && txn->rv_psn != 0)
rv_psn = txn->rv_psn;
......@@ -2094,9 +2104,19 @@ memtx_tx_history_rollback_added_story(struct txn_stmt *stmt)
assert(stmt->add_story->tuple == stmt->rollback_info.new_tuple);
struct memtx_story *story = stmt->add_story;
memtx_tx_history_remove_story_del_stmts(story);
for (uint32_t i = 0; i < story->index_count; i++)
for (uint32_t i = 0; i < story->index_count; i++) {
struct memtx_story_link *link = &story->link[i];
if (link->newer_story == NULL && link->older_story == NULL) {
story->rollbacked = true;
continue;
}
memtx_tx_story_unlink_both(story, i);
}
memtx_tx_story_unlink_added_by(story, stmt);
if (!story->rollbacked) {
assert(rlist_empty(&story->reader_list));
memtx_tx_story_delete(story);
}
}
/*
......@@ -2112,8 +2132,12 @@ memtx_tx_history_rollback_deleted_story(struct txn_stmt *stmt)
void
memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
{
if (stmt->add_story != NULL)
assert(stmt->txn->psn != 0);
if (stmt->add_story != NULL) {
stmt->add_story->add_psn = stmt->txn->psn;
stmt->add_story->del_psn = stmt->txn->psn;
memtx_tx_history_rollback_added_story(stmt);
}
if (stmt->del_story != NULL)
memtx_tx_history_rollback_deleted_story(stmt);
assert(stmt->add_story == NULL && stmt->del_story == NULL);
......@@ -2125,7 +2149,13 @@ memtx_tx_history_rollback_stmt(struct txn_stmt *stmt)
static void
memtx_tx_history_remove_added_story(struct txn_stmt *stmt)
{
memtx_tx_history_rollback_added_story(stmt);
assert(stmt->add_story != NULL);
assert(stmt->add_story->tuple == stmt->rollback_info.new_tuple);
struct memtx_story *story = stmt->add_story;
memtx_tx_history_remove_story_del_stmts(story);
for (uint32_t i = 0; i < story->index_count; i++)
memtx_tx_story_unlink_both(story, i);
memtx_tx_story_unlink_added_by(story, stmt);
}
/*
......@@ -2163,10 +2193,18 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
* The list begins with several (or zero) of stories that are added by
* in-progress transactions, then the list continues with several
* (or zero) of prepared stories, which are followed by several
* (or zero) of committed stories.
* If a statement becomes prepared, its story must be moved to the
* point in list exactly between all still in-progress and all already
* prepared.
* (or zero) of committed stories, interleaved by rollbacked stories. We
* have the following totally ordered set over tuple stories:
*
* —————————————————————————————————————————————————> serialization time
* |- - - - - - - -|— — — — — -|— — — — — |— — — — — — -|— — — — — — — -
* | No more than | Committed | Prepared | In-progress | One dirty
* | one rollbacked| | | | story in index
* | story | | | |
* |- - - - - - - -|— — — — — -| — — — — —|— — — — — — -|— — — — — — — —
*
* If a statement becomes prepared, the story it adds must be 'sunk' to
* the level of prepared stories.
*/
struct memtx_story *story = stmt->add_story;
uint32_t index_count = story->index_count;
......@@ -2174,9 +2212,11 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
struct memtx_story *old_story = story->link[0].older_story;
if (stmt->del_story == NULL)
assert(old_story == NULL || old_story->del_psn != 0);
assert(old_story == NULL || old_story->rollbacked ||
old_story->del_psn != 0);
else
assert(stmt->del_story == story->link[0].older_story);
assert(old_story != NULL &&
(old_story->rollbacked || stmt->del_story == old_story));
if (stmt->del_story == NULL) {
/*
......@@ -2210,6 +2250,11 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
}
}
if (old_story != NULL && old_story->rollbacked) {
assert(old_story->link[0].older_story == NULL);
old_story = NULL;
}
if (old_story != NULL) {
/*
* There can be some transactions that want to delete old_story.
......@@ -2508,8 +2553,13 @@ memtx_tx_index_invisible_count_slow(struct txn *txn,
* A history chain is represented by the top story, which is
* stored in index.
*/
if (link->in_index == NULL)
if (link->in_index == NULL) {
assert(link->newer_story != NULL ||
(story->rollbacked &&
link->older_story == NULL));
continue;
}
assert(link->newer_story == NULL);
struct tuple *visible = NULL;
bool is_prepared_ok = detect_whether_prepared_ok(txn);
......
......@@ -207,6 +207,14 @@ struct memtx_story {
* the story is the only reason why @a tuple cannot be deleted.
*/
bool tuple_is_retained;
/*
* Transaction that added this story was rollbacked: this story is
* absolutely invisible — its only purpose is to retain the reader list.
* It is present at the end of some history chains and completely
* unlinked from others, which also implies it is not present in the
* corresponding indexes.
*/
bool rollbacked;
/**
* Link with older and newer stories (and just tuples) for each
* index respectively.
......
......@@ -391,9 +391,30 @@ txn_rollback_one_stmt(struct txn *txn, struct txn_stmt *stmt)
}
}
/*
* Begins the rollback to savepoint process by assigning a PSN to the
* transaction (rolled back statements require a PSN).
*/
static void
txn_rollback_to_svp_begin(struct txn *txn)
{
txn->psn = ++txn_last_psn;
}
/*
* Finishes the rollback to savepoint process by resetting the transaction's
* PSN (since this not a complete transaction rollback).
*/
static void
txn_rollback_to_svp_finish(struct txn *txn)
{
txn->psn = 0;
}
static void
txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
{
txn_rollback_to_svp_begin(txn);
struct txn_stmt *stmt;
struct stailq rollback;
stailq_cut_tail(&txn->stmts, svp, &rollback);
......@@ -414,6 +435,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp)
stmt->space = NULL;
stmt->row = NULL;
}
txn_rollback_to_svp_finish(txn);
}
/*
......@@ -763,6 +785,7 @@ txn_free_or_wakeup(struct txn *txn)
void
txn_complete_fail(struct txn *txn)
{
assert(txn->psn != 0);
assert(!txn_has_flag(txn, TXN_IS_DONE));
assert(txn->signature < 0);
assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
......@@ -1225,6 +1248,10 @@ txn_rollback(struct txn *txn)
{
assert(txn == in_txn());
assert(txn->signature != TXN_SIGNATURE_UNKNOWN);
/*
* Rolled back statements require a PSN.
*/
txn->psn = ++txn_last_psn;
txn->status = TXN_ABORTED;
trigger_clear(&txn->fiber_on_stop);
trigger_clear(&txn->fiber_on_yield);
......
local server = require('test.luatest_helpers.server')
local t = require('luatest')
local pg = t.group(nil, t.helpers.matrix({op = {'get', 'delete', 'update'}}))
pg.before_all(function(cg)
cg.server = server:new{
alias = 'dflt',
box_cfg = {memtx_use_mvcc_engine = true}
}
cg.server:start()
cg.server:exec(function()
box.schema.create_space('s')
box.space.s:create_index('pk')
end)
end)
pg.after_all(function(cg)
cg.server:drop()
end)
--[[
Checks that unserializable reads are tracked correctly after transaction
rollback.
]]
pg.test_unserializable_read_after_rollback = function(cg)
local stream1 = cg.server.net_box:new_stream()
local stream2 = cg.server.net_box:new_stream()
local stream3 = cg.server.net_box:new_stream()
stream1:begin()
stream2:begin()
stream3:begin()
stream1.space.s:replace{0, 0}
local args = {{0}}
if cg.params.op == 'update' then
table.insert(args, {{'=', 2, 0}})
end
stream2.space.s[cg.params.op](stream2.space.s, unpack(args))
stream2.space.s:replace{1, 0}
stream1:rollback()
stream3.space.s:insert{0, 1}
stream3:commit()
local conflict_err_msg = 'Transaction has been aborted by conflict'
t.assert_error_msg_content_equals(conflict_err_msg, function()
stream2:commit()
end)
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment