diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index a1ce4fff08428a9a87d916214bf8a5680142c540..f5ace9268f3c1e14aaacc2164b9b19170b383e4e 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -436,8 +436,9 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn, /* Only roll back the changes if they were made. */ if (stmt->engine_savepoint == NULL) - index_count = 0; - else if (memtx_space->replace == memtx_space_replace_all_keys) + return; + + if (memtx_space->replace == memtx_space_replace_all_keys) index_count = space->index_count; else if (memtx_space->replace == memtx_space_replace_primary_key) index_count = 1; @@ -455,16 +456,12 @@ memtx_engine_rollback_statement(struct engine *engine, struct txn *txn, panic("failed to rollback change"); } } - /** Reset to old bsize, if it was changed. */ - if (stmt->engine_savepoint != NULL) - memtx_space_update_bsize(space, stmt->new_tuple, - stmt->old_tuple); - if (stmt->new_tuple) + memtx_space_update_bsize(space, stmt->new_tuple, stmt->old_tuple); + if (stmt->old_tuple != NULL) + tuple_ref(stmt->old_tuple); + if (stmt->new_tuple != NULL) tuple_unref(stmt->new_tuple); - - stmt->old_tuple = NULL; - stmt->new_tuple = NULL; } static void @@ -480,17 +477,6 @@ memtx_engine_rollback(struct engine *engine, struct txn *txn) memtx_engine_rollback_statement(engine, txn, stmt); } -static void -memtx_engine_commit(struct engine *engine, struct txn *txn) -{ - (void)engine; - struct txn_stmt *stmt; - stailq_foreach_entry(stmt, &txn->stmts, next) { - if (stmt->old_tuple) - tuple_unref(stmt->old_tuple); - } -} - static int memtx_engine_bootstrap(struct engine *engine) { @@ -973,7 +959,7 @@ static const struct engine_vtab memtx_engine_vtab = { /* .begin = */ memtx_engine_begin, /* .begin_statement = */ memtx_engine_begin_statement, /* .prepare = */ memtx_engine_prepare, - /* .commit = */ memtx_engine_commit, + /* .commit = */ generic_engine_commit, /* .rollback_statement = */ memtx_engine_rollback_statement, /* .rollback = */ memtx_engine_rollback, /* .bootstrap = */ memtx_engine_bootstrap, diff --git a/src/box/memtx_space.c b/src/box/memtx_space.c index 26d704821814c68d9bfee57f65cc0c331bee1bc5..08ae0daa3d34074776a1f02958fb0b3a5785cbb2 100644 --- a/src/box/memtx_space.c +++ b/src/box/memtx_space.c @@ -127,6 +127,7 @@ memtx_space_replace_build_next(struct space *space, struct tuple *old_tuple, if (index_build_next(space->index[0], new_tuple) != 0) return -1; memtx_space_update_bsize(space, NULL, new_tuple); + tuple_ref(new_tuple); return 0; } @@ -144,6 +145,8 @@ memtx_space_replace_primary_key(struct space *space, struct tuple *old_tuple, new_tuple, mode, &old_tuple) != 0) return -1; memtx_space_update_bsize(space, old_tuple, new_tuple); + if (new_tuple != NULL) + tuple_ref(new_tuple); *result = old_tuple; return 0; } @@ -273,6 +276,8 @@ memtx_space_replace_all_keys(struct space *space, struct tuple *old_tuple, } memtx_space_update_bsize(space, old_tuple, new_tuple); + if (new_tuple != NULL) + tuple_ref(new_tuple); *result = old_tuple; return 0; @@ -338,11 +343,9 @@ memtx_space_execute_replace(struct space *space, struct txn *txn, if (stmt->new_tuple == NULL) return -1; tuple_ref(stmt->new_tuple); - struct tuple *old_tuple; - if (memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple, - mode, &old_tuple) != 0) + if (memtx_space->replace(space, NULL, stmt->new_tuple, + mode, &stmt->old_tuple) != 0) return -1; - stmt->old_tuple = old_tuple; stmt->engine_savepoint = stmt; /** The new tuple is referenced by the primary key. */ *result = stmt->new_tuple; @@ -363,14 +366,13 @@ memtx_space_execute_delete(struct space *space, struct txn *txn, uint32_t part_count = mp_decode_array(&key); if (exact_key_validate(pk->def->key_def, key, part_count) != 0) return -1; - if (index_get(pk, key, part_count, &stmt->old_tuple) != 0) + struct tuple *old_tuple; + if (index_get(pk, key, part_count, &old_tuple) != 0) return -1; - struct tuple *old_tuple = NULL; - if (stmt->old_tuple != NULL && - memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple, - DUP_REPLACE_OR_INSERT, &old_tuple) != 0) + if (old_tuple != NULL && + memtx_space->replace(space, old_tuple, NULL, + DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0) return -1; - stmt->old_tuple = old_tuple; stmt->engine_savepoint = stmt; *result = stmt->old_tuple; return 0; @@ -390,17 +392,18 @@ memtx_space_execute_update(struct space *space, struct txn *txn, uint32_t part_count = mp_decode_array(&key); if (exact_key_validate(pk->def->key_def, key, part_count) != 0) return -1; - if (index_get(pk, key, part_count, &stmt->old_tuple) != 0) + struct tuple *old_tuple; + if (index_get(pk, key, part_count, &old_tuple) != 0) return -1; - if (stmt->old_tuple == NULL) { + if (old_tuple == NULL) { *result = NULL; return 0; } /* Update the tuple; legacy, request ops are in request->tuple */ uint32_t new_size = 0, bsize; - const char *old_data = tuple_data_range(stmt->old_tuple, &bsize); + const char *old_data = tuple_data_range(old_tuple, &bsize); const char *new_data = tuple_update_execute(region_aligned_alloc_cb, &fiber()->gc, request->tuple, request->tuple_end, @@ -414,12 +417,9 @@ memtx_space_execute_update(struct space *space, struct txn *txn, if (stmt->new_tuple == NULL) return -1; tuple_ref(stmt->new_tuple); - struct tuple *old_tuple = NULL; - if (stmt->old_tuple != NULL && - memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple, - DUP_REPLACE, &old_tuple) != 0) + if (memtx_space->replace(space, old_tuple, stmt->new_tuple, + DUP_REPLACE, &stmt->old_tuple) != 0) return -1; - stmt->old_tuple = old_tuple; stmt->engine_savepoint = stmt; *result = stmt->new_tuple; return 0; @@ -453,10 +453,11 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn, mp_decode_array(&key); /* Try to find the tuple by primary key. */ - if (index_get(index, key, part_count, &stmt->old_tuple) != 0) + struct tuple *old_tuple; + if (index_get(index, key, part_count, &old_tuple) != 0) return -1; - if (stmt->old_tuple == NULL) { + if (old_tuple == NULL) { /** * Old tuple was not found. A write optimized * engine may only know this after commit, so @@ -486,8 +487,7 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn, tuple_ref(stmt->new_tuple); } else { uint32_t new_size = 0, bsize; - const char *old_data = tuple_data_range(stmt->old_tuple, - &bsize); + const char *old_data = tuple_data_range(old_tuple, &bsize); /* * Update the tuple. * tuple_upsert_execute() fails on totally wrong @@ -514,14 +514,13 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn, struct index *pk = space->index[0]; if (!key_update_can_be_skipped(pk->def->key_def->column_mask, column_mask) && - tuple_compare(stmt->old_tuple, stmt->new_tuple, + tuple_compare(old_tuple, stmt->new_tuple, pk->def->key_def) != 0) { /* Primary key is changed: log error and do nothing. */ diag_set(ClientError, ER_CANT_UPDATE_PRIMARY_KEY, pk->def->name, space_name(space)); diag_log(); tuple_unref(stmt->new_tuple); - stmt->old_tuple = NULL; stmt->new_tuple = NULL; } } @@ -531,12 +530,10 @@ memtx_space_execute_upsert(struct space *space, struct txn *txn, * we checked this case explicitly and skipped the upsert * above. */ - struct tuple *old_tuple = NULL; if (stmt->new_tuple != NULL && - memtx_space->replace(space, stmt->old_tuple, stmt->new_tuple, - DUP_REPLACE_OR_INSERT, &old_tuple) != 0) + memtx_space->replace(space, old_tuple, stmt->new_tuple, + DUP_REPLACE_OR_INSERT, &stmt->old_tuple) != 0) return -1; - stmt->old_tuple = old_tuple; stmt->engine_savepoint = stmt; /* Return nothing: UPSERT does not return data. */ return 0; diff --git a/src/box/txn.c b/src/box/txn.c index cb301015c36631e52046258e1c9de3046f8de03e..fbce612a5103780b5d32e1579cc878bf962f9068 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -99,6 +99,15 @@ txn_stmt_new(struct txn *txn) return stmt; } +static inline void +txn_stmt_unref_tuples(struct txn_stmt *stmt) +{ + if (stmt->old_tuple != NULL) + tuple_unref(stmt->old_tuple); + if (stmt->new_tuple != NULL) + tuple_unref(stmt->new_tuple); +} + static void txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) { @@ -116,6 +125,7 @@ txn_rollback_to_svp(struct txn *txn, struct stailq_entry *svp) txn->n_rows--; stmt->row = NULL; } + txn_stmt_unref_tuples(stmt); } } @@ -325,6 +335,10 @@ txn_commit(struct txn *txn) if (txn->engine != NULL) engine_commit(txn->engine, txn); + struct txn_stmt *stmt; + stailq_foreach_entry(stmt, &txn->stmts, next) + txn_stmt_unref_tuples(stmt); + TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); @@ -362,6 +376,11 @@ txn_rollback() } if (txn->engine) engine_rollback(txn->engine, txn); + + struct txn_stmt *stmt; + stailq_foreach_entry(stmt, &txn->stmts, next) + txn_stmt_unref_tuples(stmt); + TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); diff --git a/src/box/vinyl.c b/src/box/vinyl.c index f02fa638802cdb455cec66f100ebadccf882d6ec..374c52525ba5cc24e38f9fd7917122fed1afcb63 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -2324,17 +2324,6 @@ vinyl_space_execute_upsert(struct space *space, struct txn *txn, return vy_upsert(env, tx, stmt, space, request); } -static inline void -txn_stmt_unref_tuples(struct txn_stmt *stmt) -{ - if (stmt->old_tuple) - tuple_unref(stmt->old_tuple); - if (stmt->new_tuple) - tuple_unref(stmt->new_tuple); - stmt->old_tuple = NULL; - stmt->new_tuple = NULL; -} - static int vinyl_engine_begin(struct engine *engine, struct txn *txn) { @@ -2442,11 +2431,7 @@ vinyl_engine_commit(struct engine *engine, struct txn *txn) /* We can't abort the transaction at this point, use force. */ vy_quota_force_use(&env->quota, mem_used_after - mem_used_before); - struct txn_stmt *stmt; - stailq_foreach_entry(stmt, &txn->stmts, next) - txn_stmt_unref_tuples(stmt); txn->engine_tx = NULL; - if (!txn->is_autocommit) trigger_clear(&txn->fiber_on_stop); } @@ -2461,11 +2446,7 @@ vinyl_engine_rollback(struct engine *engine, struct txn *txn) vy_tx_rollback(tx); - struct txn_stmt *stmt; - stailq_foreach_entry(stmt, &txn->stmts, next) - txn_stmt_unref_tuples(stmt); txn->engine_tx = NULL; - if (!txn->is_autocommit) trigger_clear(&txn->fiber_on_stop); } @@ -2489,7 +2470,6 @@ vinyl_engine_rollback_statement(struct engine *engine, struct txn *txn, struct vy_tx *tx = txn->engine_tx; assert(tx != NULL); vy_tx_rollback_to_savepoint(tx, stmt->engine_savepoint); - txn_stmt_unref_tuples(stmt); } /* }}} Public API of transaction control */ @@ -3290,7 +3270,10 @@ vinyl_space_apply_initial_join_row(struct space *space, struct request *request) else vy_tx_rollback(tx); - txn_stmt_unref_tuples(&stmt); + if (stmt.old_tuple != NULL) + tuple_unref(stmt.old_tuple); + if (stmt.new_tuple != NULL) + tuple_unref(stmt.new_tuple); size_t mem_used_after = lsregion_used(&env->mem_env.allocator); assert(mem_used_after >= mem_used_before);