From 6bdb67c6a962d1521732f5452d7cca5298faab1d Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Tue, 21 Apr 2015 17:32:54 +0300
Subject: [PATCH] sophia-integration: review fixes

Remove Engine::finish().
Encapsulate tuple reference management into Engine.

Sophia::rollbackStatement is not implemented.

long-run tests don't pass for sophia.
---
 src/box/engine.cc        | 11 +++----
 src/box/engine.h         | 27 +++++++--------
 src/box/lua/call.cc      |  1 -
 src/box/memtx_engine.cc  | 33 +++++++++----------
 src/box/memtx_engine.h   |  6 ++--
 src/box/request.cc       |  6 ++--
 src/box/sophia_engine.cc | 38 +++++----------------
 src/box/sophia_engine.h  |  5 ++-
 src/box/sophia_index.cc  |  8 ++---
 src/box/txn.cc           | 71 ++++++++++++----------------------------
 src/box/txn.h            | 47 +++++++++++++-------------
 11 files changed, 92 insertions(+), 161 deletions(-)

diff --git a/src/box/engine.cc b/src/box/engine.cc
index c459c627ec..338142a293 100644
--- a/src/box/engine.cc
+++ b/src/box/engine.cc
@@ -45,19 +45,16 @@ Engine::Engine(const char *engine_name)
 void Engine::init()
 {}
 
-void Engine::begin(struct txn*, struct space*)
+void Engine::beginStatement(struct txn *)
 {}
 
-void Engine::commit(struct txn*)
+void Engine::commit(struct txn *)
 {}
 
-void Engine::rollback(struct txn*)
+void Engine::rollback(struct txn *)
 {}
 
-void Engine::rollbackStmt(struct txn_stmt*)
-{}
-
-void Engine::finish(struct txn*, bool)
+void Engine::rollbackStatement(struct txn_stmt *)
 {}
 
 void Engine::initSystemSpace(struct space * /* space */)
diff --git a/src/box/engine.h b/src/box/engine.h
index c8570c5c2f..d64ed77399 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -105,24 +105,19 @@ class Engine: public Object {
 
 	virtual void join(Relay *) = 0;
 	/**
-	 * Engine specific transaction life-cycle routines.
-	 */
-	virtual void begin(struct txn *, struct space *);
+	 * Begin a new statement in an existing or new
+	 * transaction.
+	 * We use a single call to save a virtual method call
+	 * since it's always clear from txn whether it's
+	 * autocommit mode or not, the first statement or
+	 * a subsequent statement.  Effectively it means that
+	 * transaction in the engine begins with the first
+	 * statement.
+	 */
+	virtual void beginStatement(struct txn *);
 	virtual void commit(struct txn *);
-	virtual void rollbackStmt(struct txn_stmt *);
+	virtual void rollbackStatement(struct txn_stmt *);
 	virtual void rollback(struct txn *);
-	/**
-	 * Called at the end of a transaction, if a transaction
-	 * has committed. Is an
-	 * artifact of autocommit mode, when commit
-	 * happens *before* the result is sent to the client,
-	 * and is used to dereference tuples which otherwise
-	 * could have been freed already at commit.
-	 * Has nothing to do in case of rollback, since
-	 * in that case the client doesn't get any tuples
-	 * back, so they can be freed already in rollback.
-	 */
-	virtual void finish(struct txn *, bool);
 	/**
 	 * Recover the engine to a checkpoint it has.
 	 * After that the engine will be given rows
diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc
index dd218d6e2d..7cae56b639 100644
--- a/src/box/lua/call.cc
+++ b/src/box/lua/call.cc
@@ -368,7 +368,6 @@ lbox_commit(lua_State * /* L */)
 		txn_rollback();
 		throw;
 	}
-	txn_finish(txn, true);
 	return 0;
 }
 
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index a51fd2f29a..53f8c20410 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -109,6 +109,7 @@ memtx_replace_build_next(struct space *space, struct tuple *old_tuple,
 		      "from snapshot");
 	}
 	space->index[0]->buildNext(new_tuple);
+	tuple_ref(new_tuple);
 	return NULL; /* replace found no old tuple */
 }
 
@@ -120,7 +121,10 @@ struct tuple *
 memtx_replace_primary_key(struct space *space, struct tuple *old_tuple,
 			  struct tuple *new_tuple, enum dup_replace_mode mode)
 {
-	return space->index[0]->replace(old_tuple, new_tuple, mode);
+	struct tuple *dup = space->index[0]->replace(old_tuple, new_tuple, mode);
+	if (new_tuple)
+		tuple_ref(new_tuple);
+	return dup;
 }
 
 static struct tuple *
@@ -145,7 +149,6 @@ memtx_replace_all_keys(struct space *space, struct tuple *old_tuple,
 			Index *index = space->index[i];
 			index->replace(old_tuple, new_tuple, DUP_INSERT);
 		}
-		return old_tuple;
 	} catch (Exception *e) {
 		/* Rollback all changes */
 		for (; i > 0; i--) {
@@ -154,9 +157,9 @@ memtx_replace_all_keys(struct space *space, struct tuple *old_tuple,
 		}
 		throw;
 	}
-
-	assert(false);
-	return NULL;
+	if (new_tuple)
+		tuple_ref(new_tuple);
+	return old_tuple;
 }
 
 static void
@@ -433,7 +436,7 @@ MemtxEngine::keydefCheck(struct space *space, struct key_def *key_def)
 }
 
 void
-MemtxEngine::rollbackStmt(struct txn_stmt *stmt)
+MemtxEngine::rollbackStatement(struct txn_stmt *stmt)
 {
 	if (stmt->old_tuple || stmt->new_tuple)
 	{
@@ -455,23 +458,19 @@ MemtxEngine::rollback(struct txn *txn)
 				      stmt->old_tuple, DUP_INSERT);
 		}
 	}
+	rlist_foreach_entry(stmt, &txn->stmts, next) {
+		if (stmt->new_tuple)
+			tuple_unref(stmt->new_tuple);
+	}
 }
 
 void
-MemtxEngine::finish(struct txn *txn, bool commit)
+MemtxEngine::commit(struct txn *txn)
 {
 	struct txn_stmt *stmt;
-	if (commit) {
-		rlist_foreach_entry(stmt, &txn->stmts, next) {
-			if (stmt->old_tuple)
-				tuple_unref(stmt->old_tuple);
-		}
-		return;
-	}
-	/* rollback */
 	rlist_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->new_tuple)
-			tuple_unref(stmt->new_tuple);
+		if (stmt->old_tuple)
+			tuple_unref(stmt->old_tuple);
 	}
 }
 
diff --git a/src/box/memtx_engine.h b/src/box/memtx_engine.h
index 46f61775be..a12cd0606b 100644
--- a/src/box/memtx_engine.h
+++ b/src/box/memtx_engine.h
@@ -46,9 +46,9 @@ struct MemtxEngine: public Engine {
 	virtual void dropPrimaryKey(struct space *space);
 	virtual bool needToBuildSecondaryKey(struct space *space);
 	virtual void keydefCheck(struct space *space, struct key_def *key_def);
-	virtual void rollbackStmt(struct txn_stmt*);
-	virtual void rollback(struct txn*);
-	virtual void finish(struct txn*, bool);
+	virtual void rollbackStatement(struct txn_stmt *stmt);
+	virtual void rollback(struct txn *txn);
+	virtual void commit(struct txn *txn);
 	virtual void beginJoin();
 	virtual void recoverToCheckpoint(int64_t lsn);
 	virtual void endRecovery();
diff --git a/src/box/request.cc b/src/box/request.cc
index e79ffbd22b..b43bf195a7 100644
--- a/src/box/request.cc
+++ b/src/box/request.cc
@@ -54,7 +54,7 @@ static void
 execute_replace(struct request *request, struct port *port)
 {
 	struct space *space = space_cache_find(request->space_id);
-	struct txn *txn = txn_begin_stmt(request, space);
+	struct txn *txn = txn_begin_stmt(request, space->handler->engine);
 
 	access_check_space(space, PRIV_W);
 	struct tuple *new_tuple = tuple_new(space->format, request->tuple,
@@ -77,7 +77,7 @@ static void
 execute_update(struct request *request, struct port *port)
 {
 	struct space *space = space_cache_find(request->space_id);
-	struct txn *txn = txn_begin_stmt(request, space);
+	struct txn *txn = txn_begin_stmt(request, space->handler->engine);
 
 	access_check_space(space, PRIV_W);
 	Index *pk = index_find(space, 0);
@@ -118,7 +118,7 @@ static void
 execute_delete(struct request *request, struct port *port)
 {
 	struct space *space = space_cache_find(request->space_id);
-	struct txn *txn = txn_begin_stmt(request, space);
+	struct txn *txn = txn_begin_stmt(request, space->handler->engine);
 
 	access_check_space(space, PRIV_W);
 
diff --git a/src/box/sophia_engine.cc b/src/box/sophia_engine.cc
index f5f807fe30..d17197b3fd 100644
--- a/src/box/sophia_engine.cc
+++ b/src/box/sophia_engine.cc
@@ -299,20 +299,14 @@ SophiaEngine::keydefCheck(struct space *space, struct key_def *key_def)
 }
 
 void
-SophiaEngine::begin(struct txn *txn, struct space *space)
+SophiaEngine::beginStatement(struct txn *txn)
 {
-	assert(space->handler->engine == this);
+	assert(txn->engine_tx == NULL || txn->n_stmts != 1);
 	if (txn->n_stmts == 1) {
-		assert(txn->engine_tx == NULL);
-		SophiaIndex *index = (SophiaIndex *)index_find(space, 0);
-		(void) index;
-		assert(index->db != NULL);
 		txn->engine_tx = sp_begin(env);
 		if (txn->engine_tx == NULL)
 			sophia_raise(env);
-		return;
 	}
-	assert(txn->engine_tx != NULL);
 }
 
 void
@@ -343,38 +337,22 @@ SophiaEngine::commit(struct txn *txn)
 	}
 	rc = sp_commit(txn->engine_tx);
 	if (rc == -1) {
-		panic("sophia commit failed: txn->signature = %" PRIu64, txn->signature);
+		panic("sophia commit failed: txn->signature = %"
+		      PRIu64, txn->signature);
 	}
 }
 
 void
-SophiaEngine::rollbackStmt(struct txn_stmt *stmt)
+SophiaEngine::rollbackStatement(struct txn_stmt *)
 {
-	if (stmt->old_tuple)
-		tuple_unref(stmt->old_tuple);
-	if (stmt->new_tuple)
-		tuple_unref(stmt->new_tuple);
+	panic("not implemented");
 }
 
 void
 SophiaEngine::rollback(struct txn *txn)
 {
-	if (txn->engine_tx == NULL)
-		return;
-	sp_destroy(txn->engine_tx);
-	txn->engine_tx = NULL;
-}
-
-void
-SophiaEngine::finish(struct txn *txn, bool)
-{
-	struct txn_stmt *stmt;
-	rlist_foreach_entry(stmt, &txn->stmts, next) {
-		if (stmt->old_tuple)
-			tuple_unref(stmt->old_tuple);
-		if (stmt->new_tuple)
-			tuple_unref(stmt->new_tuple);
-	}
+	if (txn->engine_tx)
+		sp_destroy(txn->engine_tx);
 }
 
 void
diff --git a/src/box/sophia_engine.h b/src/box/sophia_engine.h
index 8f5c521b65..ab78441c59 100644
--- a/src/box/sophia_engine.h
+++ b/src/box/sophia_engine.h
@@ -37,11 +37,10 @@ struct SophiaEngine: public Engine {
 	virtual Index *createIndex(struct key_def *);
 	virtual void dropIndex(Index*);
 	virtual void keydefCheck(struct space *space, struct key_def *f);
-	virtual void begin(struct txn *txn, struct space *space);
+	virtual void beginStatement(struct txn *txn);
 	virtual void commit(struct txn *txn);
-	virtual void rollbackStmt(struct txn_stmt *stmt);
+	virtual void rollbackStatement(struct txn_stmt *stmt);
 	virtual void rollback(struct txn *txn);
-	virtual void finish(struct txn *txn, bool is_commit);
 	virtual void beginJoin();
 	virtual void recoverToCheckpoint(int64_t);
 	virtual void endRecovery();
diff --git a/src/box/sophia_index.cc b/src/box/sophia_index.cc
index f4b9b2d276..f3b4d102cc 100644
--- a/src/box/sophia_index.cc
+++ b/src/box/sophia_index.cc
@@ -252,7 +252,6 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple,
 	/* delete */
 	if (old_tuple && new_tuple == NULL) {
 		sophia_write(env, db, tx, SOPHIA_DELETE, key_def, old_tuple);
-		tuple_ref(old_tuple);
 		return old_tuple;
 	}
 
@@ -273,11 +272,8 @@ SophiaIndex::replace(struct tuple *old_tuple, struct tuple *new_tuple,
 		struct tuple *dup_tuple =
 			sophia_read(env, db, tx, key, keysize, space->format);
 		if (dup_tuple) {
-			tuple_ref(dup_tuple);
-			int error = 0;
-			if (tuple_compare(dup_tuple, new_tuple, key_def) == 0)
-				error = 1;
-			tuple_unref(dup_tuple);
+			int error = tuple_compare(dup_tuple, new_tuple, key_def) == 0;
+			tuple_delete(dup_tuple);
 			if (error) {
 				struct space *sp =
 					space_cache_find(key_def->space_id);
diff --git a/src/box/txn.cc b/src/box/txn.cc
index cb4fc3e74c..ab5f2d4a33 100644
--- a/src/box/txn.cc
+++ b/src/box/txn.cc
@@ -83,10 +83,7 @@ txn_replace(struct txn *txn, struct space *space,
 	 * another transaction in rollback().
 	 */
 	stmt->old_tuple = space_replace(space, old_tuple, new_tuple, mode);
-	if (new_tuple) {
-		stmt->new_tuple = new_tuple;
-		tuple_ref(stmt->new_tuple);
-	}
+	stmt->new_tuple = new_tuple;
 	stmt->space = space;
 
 	/*
@@ -144,54 +141,33 @@ txn_begin(bool autocommit)
 		rlist_nil, txn_on_yield_or_stop, NULL, NULL
 	};
 	txn->autocommit = autocommit;
-	txn->engine_tx = NULL;
 	fiber_set_txn(fiber(), txn);
 	return txn;
 }
 
-static void
-txn_engine_begin_stmt(struct txn *txn, struct space *space)
-{
-	assert(txn->n_stmts >= 1);
-	/**
-	 * Notify storage engine about the transaction.
-	 * Ensure various storage engine constraints:
-	 * a. check if it supports multi-statement transactions
-	 * b. only one engine can be used in a multi-statement
-	 *    transaction
-	 */
-	Engine *engine = space->handler->engine;
-	if (txn->n_stmts == 1) {
-		/* First statement. */
-		txn->engine = engine;
-	} else {
-		if (txn->engine->id != engine_id(space->handler))
-			tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION);
-	}
-	engine->begin(txn, space);
-}
-
 struct txn *
-txn_begin_stmt(struct request *request, struct space *space)
+txn_begin_stmt(struct request *request, Engine *engine)
 {
 	struct txn *txn = in_txn();
 	if (txn == NULL)
 		txn = txn_begin(true);
+
+	if (txn->engine == NULL) {
+		assert(txn->n_stmts == 0);
+		txn->engine = engine;
+	} else if (txn->engine != engine) {
+		/**
+		 * Only one engine can be used in
+		 * a multi-statement transaction currently.
+		 */
+		tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION);
+	}
 	struct txn_stmt *stmt = txn_stmt_new(txn);
 	txn_add_redo(stmt, request);
-	txn_engine_begin_stmt(txn, space);
+	engine->beginStatement(txn);
 	return txn;
 }
 
-void
-txn_commit_stmt(struct txn *txn)
-{
-	if (txn->autocommit) {
-		txn_commit(txn);
-		txn_finish(txn, true);
-	}
-}
-
 void
 txn_commit(struct txn *txn)
 {
@@ -220,18 +196,10 @@ txn_commit(struct txn *txn)
 		txn->signature = res;
 	}
 
+	trigger_run(&txn->on_commit, txn); /* must not throw. */
 	/* xxx: engine commit may throw on conflict or error */
 	if (txn->engine)
 		txn->engine->commit(txn);
-
-	trigger_run(&txn->on_commit, txn); /* must not throw. */
-}
-
-void
-txn_finish(struct txn *txn, bool commit)
-{
-	if (txn->engine)
-		txn->engine->finish(txn, commit);
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
@@ -253,7 +221,7 @@ txn_rollback_stmt()
 	if (txn->autocommit)
 		return txn_rollback();
 	struct txn_stmt *stmt = txn_stmt(txn);
-	txn->engine->rollbackStmt(stmt);
+	txn->engine->rollbackStatement(stmt);
 	stmt->old_tuple = NULL;
 	stmt->new_tuple = NULL;
 	stmt->space = NULL;
@@ -266,13 +234,16 @@ txn_rollback()
 	struct txn *txn = in_txn();
 	if (txn == NULL)
 		return;
+	trigger_run(&txn->on_rollback, txn); /* must not throw. */
 	if (txn->engine)
 		txn->engine->rollback(txn);
-	trigger_run(&txn->on_rollback, txn); /* must not throw. */
 	/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
 		trigger_clear(&txn->fiber_on_yield);
 		trigger_clear(&txn->fiber_on_stop);
-	txn_finish(txn, false);
+	TRASH(txn);
+	/** Free volatile txn memory. */
+	fiber_gc();
+	fiber_set_txn(fiber(), NULL);
 }
 
 void
diff --git a/src/box/txn.h b/src/box/txn.h
index 9a205fa1e1..1798d9773e 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -86,50 +86,47 @@ in_txn()
 }
 
 /**
- * Start a new statement. If no current transaction,
- * start a new transaction with autocommit = true.
+ * Start a transaction explicitly.
+ * @pre no transaction is active
  */
 struct txn *
-txn_begin_stmt(struct request *request, struct space *space);
+txn_begin(bool autocommit);
 
 /**
- * End a statement. In autocommit mode, end
- * the current transaction as well.
+ * Commit a transaction.
+ * @pre txn == in_txn()
  */
 void
-txn_commit_stmt(struct txn *txn);
+txn_commit(struct txn *txn);
 
-/**
- * Rollback a statement. In autocommit mode,
- * rolls back the entire transaction.
- */
+/** Rollback a transaction, if any. */
 void
-txn_rollback_stmt();
+txn_rollback();
 
 /**
- * Start a transaction explicitly.
- * @pre no transaction is active
+ * Start a new statement. If no current transaction,
+ * start a new transaction with autocommit = true.
  */
 struct txn *
-txn_begin(bool autocommit);
+txn_begin_stmt(struct request *request, Engine *engine);
 
 /**
- * Commit a transaction. txn_finish must be called after that.
- * @pre txn == in_txn()
+ * End a statement. In autocommit mode, end
+ * the current transaction as well.
  */
-void
-txn_commit(struct txn *txn);
+static inline void
+txn_commit_stmt(struct txn *txn)
+{
+	if (txn->autocommit)
+		txn_commit(txn);
+}
 
 /**
- * Finish a transaction. Must be called after txn_commit.
- * @pre txn == in_txn()
+ * Rollback a statement. In autocommit mode,
+ * rolls back the entire transaction.
  */
 void
-txn_finish(struct txn *txn, bool commit);
-
-/** Rollback a transaction, if any. */
-void
-txn_rollback();
+txn_rollback_stmt();
 
 /**
  * Raise an error if this is a multi-statement
-- 
GitLab