Skip to content
Snippets Groups Projects
Commit e9015074 authored by Aleksandr Lyapunov's avatar Aleksandr Lyapunov Committed by Aleksandr Lyapunov
Browse files

memtx: simplify and refactor memtx_tx_history_prepare_stmt

Almost completely rewrite, simplify and comment this part of code.

Part of #8648
Part of #8654

NO_DOC=refactoring
NO_TEST=refactoring
NO_CHANGELOG=refactoring
parent 63da3bed
No related branches found
No related tags found
No related merge requests found
......@@ -1403,6 +1403,17 @@ memtx_tx_story_full_unlink_on_space_delete(struct memtx_story *story)
}
}
/**
* Find a top story in chain of @a story by index @a ind.
*/
static struct memtx_story *
memtx_tx_story_find_top(struct memtx_story *story, uint32_t ind)
{
while (story->link[ind].newer_story != NULL)
story = story->link[ind].newer_story;
return story;
}
/**
* Unlink @a story from all chains and remove corresponding tuple from
* indexes if necessary: used in garbage collection step and preserves the top
......@@ -2166,25 +2177,6 @@ memtx_tx_history_remove_story_del_stmts(struct memtx_story *story)
}
}
/*
* Push story down the history chain to the level of prepared stories in each
* index.
*/
static void
memtx_tx_history_sink_story(struct memtx_story *story)
{
for (uint32_t i = 0; i < story->index_count; ) {
struct memtx_story *old_story = story->link[i].older_story;
if (old_story == NULL || old_story->add_psn != 0 ||
old_story->add_stmt == NULL) {
/* Old story is absent or prepared or committed. */
i++; /* Go to the next index. */
continue;
}
memtx_tx_story_reorder(story, old_story, i);
}
}
/*
* Rollback addition of story by statement.
*/
......@@ -2280,6 +2272,41 @@ memtx_tx_history_remove_stmt(struct txn_stmt *stmt)
stmt->engine_savepoint = NULL;
}
/**
* Abort or send to read view readers of @a story, except the transaction
* @a writer that is actually deletes the story.
*/
static void
memtx_tx_handle_conflict_story_readers(struct memtx_story *story,
struct txn *writer)
{
struct tx_read_tracker *tracker, *tmp;
rlist_foreach_entry_safe(tracker, &story->reader_list,
in_reader_list, tmp) {
if (tracker->reader == writer)
continue;
memtx_tx_handle_conflict(writer, tracker->reader);
}
}
/**
* Abort or send to read view readers of @a story, except the transaction
* @a writer that is actually deletes the story.
*/
static void
memtx_tx_handle_conflict_gap_readers(struct memtx_story *top_story,
uint32_t ind, struct txn *writer)
{
assert(top_story->link[ind].newer_story == NULL);
struct gap_item_base *item, *tmp;
rlist_foreach_entry_safe(item, &top_story->link[ind].read_gaps,
in_read_gaps, tmp) {
if (item->txn == writer || item->type != GAP_INPLACE)
continue;
memtx_tx_handle_conflict(writer, item->txn);
}
}
/**
* Helper of memtx_tx_history_prepare_stmt. Do the job in case when
* stmt->add_story != NULL, that is REPLACE, INSERT, UPDATE etc.
......@@ -2287,7 +2314,8 @@ memtx_tx_history_remove_stmt(struct txn_stmt *stmt)
static void
memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
{
assert(stmt->add_story != NULL);
struct memtx_story *story = stmt->add_story;
assert(story != NULL);
/**
* History of a key in an index can consist of several stories.
* The list of stories is started with a dirty tuple that is in index.
......@@ -2306,154 +2334,163 @@ memtx_tx_history_prepare_insert_stmt(struct txn_stmt *stmt)
* If a statement becomes prepared, the story it adds must be 'sunk' to
* the level of prepared stories.
*/
struct memtx_story *story = stmt->add_story;
uint32_t index_count = story->index_count;
memtx_tx_history_sink_story(story);
for (uint32_t i = 0; i < story->index_count; ) {
struct memtx_story *old_story = story->link[i].older_story;
if (old_story == NULL || old_story->add_psn != 0 ||
old_story->add_stmt == NULL) {
/* Old story is absent or prepared or committed. */
i++; /* Go to the next index. */
continue;
}
memtx_tx_story_reorder(story, old_story, i);
}
/* Consistency asserts. */
{
assert(story->del_stmt == NULL ||
story->del_stmt->next_in_del_list == NULL);
struct memtx_story *old_story = story->link[0].older_story;
if (stmt->del_story == NULL)
assert(old_story == NULL || old_story->del_psn != 0);
else
assert(old_story == stmt->del_story);
(void)old_story;
}
/*
* Set newer (in-progress) statements in the primary chain to delete
* this story.
*/
if (stmt->del_story == NULL) {
/*
* This statement replaced nothing. That means that before
* this preparation there was no visible tuple in index, and
* now there is.
* There also can be some in-progress transactions that think
* that the replaced nothing. Some of them must be aborted
* (for example inserts, that must replace nothing), other
* must be told that they replace this tuple now.
* There could be some in-progress transactions that think
* that the replaced nothing. They must be told that they
* replace this tuple now.
*/
struct memtx_story_link *link = &story->link[0];
while (link->newer_story != NULL) {
struct memtx_story *test = link->newer_story;
link = &test->link[0];
struct txn_stmt *test_stmt = test->add_stmt;
if (test_stmt->txn == stmt->txn)
continue;
if (test_stmt->is_own_change &&
test_stmt->del_story == NULL)
continue;
if (test_stmt->del_story != NULL) {
assert(test_stmt->del_story->add_stmt->txn
== test_stmt->txn);
struct memtx_story *test_story;
for (test_story = story->link[0].newer_story;
test_story != NULL;
test_story = test_story->link[0].newer_story) {
struct txn_stmt *test_stmt = test_story->add_stmt;
if (test_stmt->is_own_change)
continue;
}
assert(test_stmt->txn != stmt->txn);
assert(test_stmt->del_story == NULL);
assert(test_stmt->txn->psn == 0);
memtx_tx_story_link_deleted_by(story, test_stmt);
}
/* Now link is in top of chain, where gap records are stored. */
struct gap_item_base *item_base, *tmp;
rlist_foreach_entry_safe(item_base, &link->read_gaps,
in_read_gaps, tmp) {
if (item_base->txn == stmt->txn ||
item_base->type != GAP_INPLACE)
continue;
memtx_tx_handle_conflict(stmt->txn, item_base->txn);
}
}
struct memtx_story *old_story = story->link[0].older_story;
if (stmt->del_story == NULL)
assert(old_story == NULL || old_story->del_psn != 0);
else
assert(old_story != NULL && stmt->del_story == old_story);
if (old_story != NULL && old_story->del_psn != 0) {
assert(stmt->del_story == NULL);
old_story = NULL;
}
if (old_story != NULL) {
} else {
/*
* There can be some transactions that want to delete old_story.
* It can be this transaction.
* All other transactions must be aborted or relinked to delete
* this tuple.
* This statement replaced older story. That means that before
* this preparation there was another visible tuple in this
* place of index.
* There could be some in-progress transactions that think
* they deleted or replaced that another tuple. They must be
* told that they replace this tuple now.
*/
struct txn_stmt **from = &old_story->del_stmt;
struct txn_stmt **to = &story->del_stmt;
while (*to != NULL)
to = &((*to)->next_in_del_list);
struct txn_stmt **from = &stmt->del_story->del_stmt;
while (*from != NULL) {
struct txn_stmt *test_stmt = *from;
assert(test_stmt->del_story == old_story);
if (test_stmt->txn == stmt->txn) {
assert(test_stmt == stmt ||
test_stmt->add_story == NULL);
/*
* Statement from the same transaction. Go to
* the next statement.
*/
assert(test_stmt->del_story == stmt->del_story);
if (test_stmt == stmt) {
/* Leave this statement, go to the next. */
from = &test_stmt->next_in_del_list;
continue;
}
assert(test_stmt->txn != stmt->txn);
assert(test_stmt->txn->psn == 0);
/* Unlink from old list in any case. */
/* Unlink from old story list. */
*from = test_stmt->next_in_del_list;
test_stmt->next_in_del_list = NULL;
test_stmt->del_story = NULL;
/* Link to story's list. */
test_stmt->del_story = story;
*to = test_stmt;
to = &test_stmt->next_in_del_list;
memtx_tx_story_link_deleted_by(story, test_stmt);
}
}
/* Handle main conflicts. */
if (stmt->del_story != NULL) {
/*
* The story stmt->del_story ends by now. Every TX that
* depend on it must go to read view or be aborted.
*/
memtx_tx_handle_conflict_story_readers(stmt->del_story,
stmt->txn);
} else {
/*
* A tuple is inserted. Every TX that depends on absence of
* a tuple (in any index) must go to read view or be aborted.
* We check only primary index here, we will check all other
* indexes below.
*/
struct memtx_story *top_story =
memtx_tx_story_find_top(story, 0);
memtx_tx_handle_conflict_gap_readers(top_story, 0, stmt->txn);
}
/* Handle conflicts in the secondary indexes. */
for (uint32_t i = 1; i < story->index_count; i++) {
struct memtx_story_link *link = &story->link[i];
while (link->newer_story != NULL) {
struct memtx_story *test = link->newer_story;
link = &test->link[i];
struct txn_stmt *test_stmt = test->add_stmt;
/*
* Handle secondary cross-write conflict. This case is too
* complicated and deserves an explanation with example.
* Imagine a space with primary index (pk) by the first field
* and secondary index (sk) by the second field.
* Imagine then three in-progress transactions that executes
* replaces {1, 1, 1}, {2, 1, 2} and {1, 1, 3} correspondingly.
* What must happen when the first transaction commits?
* Both other transactions intersect the current in the sk.
* But the second transaction with {2, 1, 2} must be aborted
* (or sent to read view) because of conflict: it now introduces
* duplicate insertion to the sk.
* On the other hand the third transactions with {1, 1, 3} has
* a right to live since it tends to overwrite {1, 1, 1} in
* both pk and sk.
* To handle those conflicts in general we must scan chains
* towards the top and check insert statements.
*/
struct memtx_story *newer_story = story;
while (newer_story->link[i].newer_story != NULL) {
newer_story = newer_story->link[i].newer_story;
struct txn_stmt *test_stmt = newer_story->add_stmt;
/* Don't conflict own changes. */
if (test_stmt->txn == stmt->txn)
continue;
/*
* Ignore case when other TX executes insert after
* precedence delete.
*/
if (test_stmt->is_own_change &&
test_stmt->del_story == NULL)
continue;
if (test_stmt->del_story == story)
continue;
memtx_tx_handle_conflict(stmt->txn, test_stmt->txn);
/*
* Note that it's a secondary index and there's no
* need to call memtx_tx_story_link_deleted_by since
* all is done by the primary story chain.
* Ignore the case when other TX overwrites in both
* primary and secondary index.
*/
}
/* Now link is in top of chain, where gap records are stored. */
struct gap_item_base *item_base, *tmp;
rlist_foreach_entry_safe(item_base, &link->read_gaps,
in_read_gaps, tmp) {
if (item_base->txn == stmt->txn ||
item_base->type != GAP_INPLACE)
continue;
memtx_tx_handle_conflict(stmt->txn, item_base->txn);
}
}
for (uint32_t i = 0; i < index_count; i++) {
old_story = story->link[i].older_story;
if (old_story == NULL)
continue;
struct tx_read_tracker *tracker;
rlist_foreach_entry(tracker, &old_story->reader_list,
in_reader_list) {
if (tracker->reader == stmt->txn)
if (test_stmt->del_story == story)
continue;
memtx_tx_handle_conflict(stmt->txn, tracker->reader);
memtx_tx_handle_conflict(stmt->txn, test_stmt->txn);
}
/*
* We have already checked gap readers before for the case
* of insertion to the primary index.
* In any (replace or insert) case we must handle gap readers
* in the secondary indexes as well since in all kinds of
* statements can insert new value to secondary index.
* Note that newer_story is in top of chain due to previous
* manipulations.
*/
memtx_tx_handle_conflict_gap_readers(newer_story, i, stmt->txn);
}
for (uint32_t i = 0; i < index_count; i++) {
for (struct memtx_story *read_story = story;
read_story != NULL;
read_story = read_story->link[i].newer_story) {
struct tx_read_tracker *tracker;
rlist_foreach_entry(tracker, &read_story->reader_list,
in_reader_list) {
if (tracker->reader == stmt->txn)
continue;
memtx_tx_handle_conflict(stmt->txn,
tracker->reader);
}
}
}
/* Finally set PSNs in stories to mark them add/delete as prepared. */
stmt->add_story->add_psn = stmt->txn->psn;
if (stmt->del_story != NULL)
stmt->del_story->del_psn = stmt->txn->psn;
}
/**
......@@ -2466,55 +2503,59 @@ memtx_tx_history_prepare_delete_stmt(struct txn_stmt *stmt)
assert(stmt->add_story == NULL);
assert(stmt->del_story != NULL);
struct memtx_story *story = stmt->del_story;
/*
* There can be some transactions that want to delete old_story.
* All other transactions must be aborted.
* There can be other transactions that want to delete old_story.
* Since the story ends, all of them must be unlinked from the story.
*/
struct txn_stmt **itr = &story->del_stmt;
while (*itr != NULL) {
struct txn_stmt *test_stmt = *itr;
assert(test_stmt->del_story == story);
if (test_stmt->txn == stmt->txn) {
assert(test_stmt == stmt ||
test_stmt->add_story == NULL);
/*
* Statement from the same transaction. Go to the next
* statement.
*/
itr = &test_stmt->next_in_del_list;
struct txn_stmt **from = &stmt->del_story->del_stmt;
while (*from != NULL) {
struct txn_stmt *test_stmt = *from;
assert(test_stmt->del_story == stmt->del_story);
if (test_stmt == stmt) {
/* Leave this statement, go to the next. */
from = &test_stmt->next_in_del_list;
continue;
}
assert(test_stmt->txn != stmt->txn);
assert(test_stmt->del_story == stmt->del_story);
assert(test_stmt->txn->psn == 0);
/* Unlink from old list in any case. */
*itr = test_stmt->next_in_del_list;
/* Unlink from old story list. */
*from = test_stmt->next_in_del_list;
test_stmt->next_in_del_list = NULL;
test_stmt->del_story = NULL;
}
struct tx_read_tracker *tracker;
rlist_foreach_entry(tracker, &story->reader_list, in_reader_list) {
if (tracker->reader == stmt->txn)
continue;
memtx_tx_handle_conflict(stmt->txn, tracker->reader);
}
/*
* The story stmt->del_story ends by now. Every TX that
* depend on it must go to read view or be aborted.
*/
memtx_tx_handle_conflict_story_readers(stmt->del_story, stmt->txn);
/* Finally set PSN in story to mark its deletion as prepared. */
stmt->del_story->del_psn = stmt->txn->psn;
}
void
memtx_tx_history_prepare_stmt(struct txn_stmt *stmt)
{
assert(stmt->txn->psn != 0);
assert(stmt->space != NULL);
if (stmt->space->def->opts.is_ephemeral)
assert(stmt->add_story == NULL && stmt->del_story == NULL);
if (stmt->add_story != NULL) {
/*
* Note that both add_story and del_story can be NULL in cases:
* * The space is is_ephemeral.
* * It's an initial recovery.
* * It's a deletion from space by key that was not found in the space.
* In all these cases nothing must be done in MVCC engine.
*/
if (stmt->add_story != NULL)
memtx_tx_history_prepare_insert_stmt(stmt);
} else if (stmt->del_story != NULL) {
else if (stmt->del_story != NULL)
memtx_tx_history_prepare_delete_stmt(stmt);
}
if (stmt->add_story != NULL)
stmt->add_story->add_psn = stmt->txn->psn;
if (stmt->del_story != NULL)
stmt->del_story->del_psn = stmt->txn->psn;
memtx_tx_story_gc();
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment