diff --git a/src/box/engine.cc b/src/box/engine.cc index c459c627ecd13e92da946325fd53dbeace77f7a6..338142a293ed2b80a1ccde6cefa53af58b56373a 100644 --- a/src/box/engine.cc +++ b/src/box/engine.cc @@ -45,19 +45,16 @@ Engine::Engine(const char *engine_name) void Engine::init() {} -void Engine::begin(struct txn*, struct space*) +void Engine::beginStatement(struct txn *) {} -void Engine::commit(struct txn*) +void Engine::commit(struct txn *) {} -void Engine::rollback(struct txn*) +void Engine::rollback(struct txn *) {} -void Engine::rollbackStmt(struct txn_stmt*) -{} - -void Engine::finish(struct txn*, bool) +void Engine::rollbackStatement(struct txn_stmt *) {} void Engine::initSystemSpace(struct space * /* space */) diff --git a/src/box/engine.h b/src/box/engine.h index c8570c5c2f0a3c3b7a40b03a7a8eb09b0ab9c310..d64ed7739985a228779fb246f43ab56889c9117f 100644 --- a/src/box/engine.h +++ b/src/box/engine.h @@ -105,24 +105,19 @@ class Engine: public Object { virtual void join(Relay *) = 0; /** - * Engine specific transaction life-cycle routines. - */ - virtual void begin(struct txn *, struct space *); + * Begin a new statement in an existing or new + * transaction. + * We use a single call to save a virtual method call + * since it's always clear from txn whether it's + * autocommit mode or not, the first statement or + * a subsequent statement. Effectively it means that + * transaction in the engine begins with the first + * statement. + */ + virtual void beginStatement(struct txn *); virtual void commit(struct txn *); - virtual void rollbackStmt(struct txn_stmt *); + virtual void rollbackStatement(struct txn_stmt *); virtual void rollback(struct txn *); - /** - * Called at the end of a transaction, if a transaction - * has committed. Is an - * artifact of autocommit mode, when commit - * happens *before* the result is sent to the client, - * and is used to dereference tuples which otherwise - * could have been freed already at commit. - * Has nothing to do in case of rollback, since - * in that case the client doesn't get any tuples - * back, so they can be freed already in rollback. - */ - virtual void finish(struct txn *, bool); /** * Recover the engine to a checkpoint it has. * After that the engine will be given rows diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index dd218d6e2df2b42ae0ad8ea2bfd5e89f43fc3319..7cae56b639345566098bfa842ca0fce4857f2109 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -368,7 +368,6 @@ lbox_commit(lua_State * /* L */) txn_rollback(); throw; } - txn_finish(txn, true); return 0; } diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index a51fd2f29a269d1f98655959b301f3d5b7d68aa5..53f8c204102ee82199ad382934e1b88374946878 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -109,6 +109,7 @@ memtx_replace_build_next(struct space *space, struct tuple *old_tuple, "from snapshot"); } space->index[0]->buildNext(new_tuple); + tuple_ref(new_tuple); return NULL; /* replace found no old tuple */ } @@ -120,7 +121,10 @@ struct tuple * memtx_replace_primary_key(struct space *space, struct tuple *old_tuple, struct tuple *new_tuple, enum dup_replace_mode mode) { - return space->index[0]->replace(old_tuple, new_tuple, mode); + struct tuple *dup = space->index[0]->replace(old_tuple, new_tuple, mode); + if (new_tuple) + tuple_ref(new_tuple); + return dup; } static struct tuple * @@ -145,7 +149,6 @@ memtx_replace_all_keys(struct space *space, struct tuple *old_tuple, Index *index = space->index[i]; index->replace(old_tuple, new_tuple, DUP_INSERT); } - return old_tuple; } catch (Exception *e) { /* Rollback all changes */ for (; i > 0; i--) { @@ -154,9 +157,9 @@ memtx_replace_all_keys(struct space *space, struct tuple *old_tuple, } throw; } - - assert(false); - return NULL; + if (new_tuple) + tuple_ref(new_tuple); + return old_tuple; } static void @@ -433,7 +436,7 @@ MemtxEngine::keydefCheck(struct space *space, struct key_def *key_def) } void -MemtxEngine::rollbackStmt(struct txn_stmt *stmt) +MemtxEngine::rollbackStatement(struct txn_stmt *stmt) { if (stmt->old_tuple || stmt->new_tuple) { @@ -455,23 +458,19 @@ MemtxEngine::rollback(struct txn *txn) stmt->old_tuple, DUP_INSERT); } } + rlist_foreach_entry(stmt, &txn->stmts, next) { + if (stmt->new_tuple) + tuple_unref(stmt->new_tuple); + } } void -MemtxEngine::finish(struct txn *txn, bool commit) +MemtxEngine::commit(struct txn *txn) { struct txn_stmt *stmt; - if (commit) { - rlist_foreach_entry(stmt, &txn->stmts, next) { - if (stmt->old_tuple) - tuple_unref(stmt->old_tuple); - } - return; - } - /* rollback */ rlist_foreach_entry(stmt, &txn->stmts, next) { - if (stmt->new_tuple) - tuple_unref(stmt->new_tuple); + if (stmt->old_tuple) + tuple_unref(stmt->old_tuple); } } diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h index 46f61775bed4d8f80bbc894bfd3e0249af765ac2..a12cd0606befcc10e966df81aa3d0a37be2d905e 100644 --- a/src/box/memtx_engine.h +++ b/src/box/memtx_engine.h @@ -46,9 +46,9 @@ struct MemtxEngine: public Engine { virtual void dropPrimaryKey(struct space *space); virtual bool needToBuildSecondaryKey(struct space *space); virtual void keydefCheck(struct space *space, struct key_def *key_def); - virtual void rollbackStmt(struct txn_stmt*); - virtual void rollback(struct txn*); - virtual void finish(struct txn*, bool); + virtual void rollbackStatement(struct txn_stmt *stmt); + virtual void rollback(struct txn *txn); + virtual void commit(struct txn *txn); virtual void beginJoin(); virtual void recoverToCheckpoint(int64_t lsn); virtual void endRecovery(); diff --git a/src/box/request.cc b/src/box/request.cc index e79ffbd22bf81d8ae9988c0d86b99e1ea1e38144..b43bf195a76fec10fe3c27534384c9689b9aab92 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -54,7 +54,7 @@ static void execute_replace(struct request *request, struct port *port) { struct space *space = space_cache_find(request->space_id); - struct txn *txn = txn_begin_stmt(request, space); + struct txn *txn = txn_begin_stmt(request, space->handler->engine); access_check_space(space, PRIV_W); struct tuple *new_tuple = tuple_new(space->format, request->tuple, @@ -77,7 +77,7 @@ static void execute_update(struct request *request, struct port *port) { struct space *space = space_cache_find(request->space_id); - struct txn *txn = txn_begin_stmt(request, space); + struct txn *txn = txn_begin_stmt(request, space->handler->engine); access_check_space(space, PRIV_W); Index *pk = index_find(space, 0); @@ -118,7 +118,7 @@ static void execute_delete(struct request *request, struct port *port) { struct space *space = space_cache_find(request->space_id); - struct txn *txn = txn_begin_stmt(request, space); + struct txn *txn = txn_begin_stmt(request, space->handler->engine); access_check_space(space, PRIV_W); diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc index f5f807fe309b4edf4edfb6480f730c39f08ce76f..d17197b3fda20e5d3e24bf94aa8b763459c20b06 100644 --- a/src/box/sophia_engine.cc +++ b/src/box/sophia_engine.cc @@ -299,20 +299,14 @@ SophiaEngine::keydefCheck(struct space *space, struct key_def *key_def) } void -SophiaEngine::begin(struct txn *txn, struct space *space) +SophiaEngine::beginStatement(struct txn *txn) { - assert(space->handler->engine == this); + assert(txn->engine_tx == NULL || txn->n_stmts != 1); if (txn->n_stmts == 1) { - assert(txn->engine_tx == NULL); - SophiaIndex *index = (SophiaIndex *)index_find(space, 0); - (void) index; - assert(index->db != NULL); txn->engine_tx = sp_begin(env); if (txn->engine_tx == NULL) sophia_raise(env); - return; } - assert(txn->engine_tx != NULL); } void @@ -343,38 +337,22 @@ SophiaEngine::commit(struct txn *txn) } rc = sp_commit(txn->engine_tx); if (rc == -1) { - panic("sophia commit failed: txn->signature = %" PRIu64, txn->signature); + panic("sophia commit failed: txn->signature = %" + PRIu64, txn->signature); } } void -SophiaEngine::rollbackStmt(struct txn_stmt *stmt) +SophiaEngine::rollbackStatement(struct txn_stmt *) { - if (stmt->old_tuple) - tuple_unref(stmt->old_tuple); - if (stmt->new_tuple) - tuple_unref(stmt->new_tuple); + panic("not implemented"); } void SophiaEngine::rollback(struct txn *txn) { - if (txn->engine_tx == NULL) - return; - sp_destroy(txn->engine_tx); - txn->engine_tx = NULL; -} - -void -SophiaEngine::finish(struct txn *txn, bool) -{ - struct txn_stmt *stmt; - rlist_foreach_entry(stmt, &txn->stmts, next) { - if (stmt->old_tuple) - tuple_unref(stmt->old_tuple); - if (stmt->new_tuple) - tuple_unref(stmt->new_tuple); - } + if (txn->engine_tx) + sp_destroy(txn->engine_tx); } void diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h index 8f5c521b65f2141ab6c314932da1ceb70ac18724..ab78441c59e92dc1df6b8816ba3fc9e74aaa4319 100644 --- a/src/box/sophia_engine.h +++ b/src/box/sophia_engine.h @@ -37,11 +37,10 @@ struct SophiaEngine: public Engine { virtual Index *createIndex(struct key_def *); virtual void dropIndex(Index*); virtual void keydefCheck(struct space *space, struct key_def *f); - virtual void begin(struct txn *txn, struct space *space); + virtual void beginStatement(struct txn *txn); virtual void commit(struct txn *txn); - virtual void rollbackStmt(struct txn_stmt *stmt); + virtual void rollbackStatement(struct txn_stmt *stmt); virtual void rollback(struct txn *txn); - virtual void finish(struct txn *txn, bool is_commit); virtual void beginJoin(); virtual void recoverToCheckpoint(int64_t); virtual void endRecovery(); diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc index f4b9b2d276eae68ec1793c7e32f5d4778b2fba6a..f3b4d102cc73a70c50bf431f03c5d107839cf447 100644 --- a/src/box/sophia_index.cc +++ b/src/box/sophia_index.cc @@ -252,7 +252,6 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple, /* delete */ if (old_tuple && new_tuple == NULL) { sophia_write(env, db, tx, SOPHIA_DELETE, key_def, old_tuple); - tuple_ref(old_tuple); return old_tuple; } @@ -273,11 +272,8 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple, struct tuple *dup_tuple = sophia_read(env, db, tx, key, keysize, space->format); if (dup_tuple) { - tuple_ref(dup_tuple); - int error = 0; - if (tuple_compare(dup_tuple, new_tuple, key_def) == 0) - error = 1; - tuple_unref(dup_tuple); + int error = tuple_compare(dup_tuple, new_tuple, key_def) == 0; + tuple_delete(dup_tuple); if (error) { struct space *sp = space_cache_find(key_def->space_id); diff --git a/src/box/txn.cc b/src/box/txn.cc index cb4fc3e74cac66052234c662691c6327e0176c87..ab5f2d4a33faafbc748eaded6cdeeb021d022b07 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -83,10 +83,7 @@ txn_replace(struct txn *txn, struct space *space, * another transaction in rollback(). */ stmt->old_tuple = space_replace(space, old_tuple, new_tuple, mode); - if (new_tuple) { - stmt->new_tuple = new_tuple; - tuple_ref(stmt->new_tuple); - } + stmt->new_tuple = new_tuple; stmt->space = space; /* @@ -144,54 +141,33 @@ txn_begin(bool autocommit) rlist_nil, txn_on_yield_or_stop, NULL, NULL }; txn->autocommit = autocommit; - txn->engine_tx = NULL; fiber_set_txn(fiber(), txn); return txn; } -static void -txn_engine_begin_stmt(struct txn *txn, struct space *space) -{ - assert(txn->n_stmts >= 1); - /** - * Notify storage engine about the transaction. - * Ensure various storage engine constraints: - * a. check if it supports multi-statement transactions - * b. only one engine can be used in a multi-statement - * transaction - */ - Engine *engine = space->handler->engine; - if (txn->n_stmts == 1) { - /* First statement. */ - txn->engine = engine; - } else { - if (txn->engine->id != engine_id(space->handler)) - tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION); - } - engine->begin(txn, space); -} - struct txn * -txn_begin_stmt(struct request *request, struct space *space) +txn_begin_stmt(struct request *request, Engine *engine) { struct txn *txn = in_txn(); if (txn == NULL) txn = txn_begin(true); + + if (txn->engine == NULL) { + assert(txn->n_stmts == 0); + txn->engine = engine; + } else if (txn->engine != engine) { + /** + * Only one engine can be used in + * a multi-statement transaction currently. + */ + tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION); + } struct txn_stmt *stmt = txn_stmt_new(txn); txn_add_redo(stmt, request); - txn_engine_begin_stmt(txn, space); + engine->beginStatement(txn); return txn; } -void -txn_commit_stmt(struct txn *txn) -{ - if (txn->autocommit) { - txn_commit(txn); - txn_finish(txn, true); - } -} - void txn_commit(struct txn *txn) { @@ -220,18 +196,10 @@ txn_commit(struct txn *txn) txn->signature = res; } + trigger_run(&txn->on_commit, txn); /* must not throw. */ /* xxx: engine commit may throw on conflict or error */ if (txn->engine) txn->engine->commit(txn); - - trigger_run(&txn->on_commit, txn); /* must not throw. */ -} - -void -txn_finish(struct txn *txn, bool commit) -{ - if (txn->engine) - txn->engine->finish(txn, commit); TRASH(txn); /** Free volatile txn memory. */ fiber_gc(); @@ -253,7 +221,7 @@ txn_rollback_stmt() if (txn->autocommit) return txn_rollback(); struct txn_stmt *stmt = txn_stmt(txn); - txn->engine->rollbackStmt(stmt); + txn->engine->rollbackStatement(stmt); stmt->old_tuple = NULL; stmt->new_tuple = NULL; stmt->space = NULL; @@ -266,13 +234,16 @@ txn_rollback() struct txn *txn = in_txn(); if (txn == NULL) return; + trigger_run(&txn->on_rollback, txn); /* must not throw. */ if (txn->engine) txn->engine->rollback(txn); - trigger_run(&txn->on_rollback, txn); /* must not throw. */ /* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */ trigger_clear(&txn->fiber_on_yield); trigger_clear(&txn->fiber_on_stop); - txn_finish(txn, false); + TRASH(txn); + /** Free volatile txn memory. */ + fiber_gc(); + fiber_set_txn(fiber(), NULL); } void diff --git a/src/box/txn.h b/src/box/txn.h index 9a205fa1e1454fbc8aa61cd19a4031f4689767bd..1798d9773e719e85d44cc564e6d50f008ec27a55 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -86,50 +86,47 @@ in_txn() } /** - * Start a new statement. If no current transaction, - * start a new transaction with autocommit = true. + * Start a transaction explicitly. + * @pre no transaction is active */ struct txn * -txn_begin_stmt(struct request *request, struct space *space); +txn_begin(bool autocommit); /** - * End a statement. In autocommit mode, end - * the current transaction as well. + * Commit a transaction. + * @pre txn == in_txn() */ void -txn_commit_stmt(struct txn *txn); +txn_commit(struct txn *txn); -/** - * Rollback a statement. In autocommit mode, - * rolls back the entire transaction. - */ +/** Rollback a transaction, if any. */ void -txn_rollback_stmt(); +txn_rollback(); /** - * Start a transaction explicitly. - * @pre no transaction is active + * Start a new statement. If no current transaction, + * start a new transaction with autocommit = true. */ struct txn * -txn_begin(bool autocommit); +txn_begin_stmt(struct request *request, Engine *engine); /** - * Commit a transaction. txn_finish must be called after that. - * @pre txn == in_txn() + * End a statement. In autocommit mode, end + * the current transaction as well. */ -void -txn_commit(struct txn *txn); +static inline void +txn_commit_stmt(struct txn *txn) +{ + if (txn->autocommit) + txn_commit(txn); +} /** - * Finish a transaction. Must be called after txn_commit. - * @pre txn == in_txn() + * Rollback a statement. In autocommit mode, + * rolls back the entire transaction. */ void -txn_finish(struct txn *txn, bool commit); - -/** Rollback a transaction, if any. */ -void -txn_rollback(); +txn_rollback_stmt(); /** * Raise an error if this is a multi-statement