From e43cc253195170351289f0a4fd539fe629632fae Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Fri, 18 Jul 2014 02:15:36 +0400
Subject: [PATCH] gh-306: implement automatic transaction rollback on yield.

---
 src/box/engine.cc             |  4 ++
 src/box/engine.h              | 31 +++++++++++++
 src/box/engine_memtx.cc       |  1 +
 src/box/key_def.h             |  1 +
 src/box/txn.cc                | 35 ++++++++++++++
 src/box/txn.h                 |  2 +
 src/errcode.h                 |  1 +
 test/box/misc.result          |  1 +
 test/box/transaction.result   | 87 +++++++++++++++++++++++++++++++++++
 test/box/transaction.test.lua | 47 +++++++++++++++++++
 10 files changed, 210 insertions(+)

diff --git a/src/box/engine.cc b/src/box/engine.cc
index 8efcc17516..2772a1e325 100644
--- a/src/box/engine.cc
+++ b/src/box/engine.cc
@@ -34,6 +34,8 @@
 #include <string.h>
 
 static RLIST_HEAD(engines);
+uint32_t engine_flags[BOX_ENGINE_MAX];
+int n_engines;
 
 EngineFactory::EngineFactory(const char *engine_name)
 	:name(engine_name),
@@ -63,6 +65,8 @@ Engine::Engine(EngineFactory *f)
 void engine_register(EngineFactory *engine)
 {
 	rlist_add_entry(&engines, engine, link);
+	engine->id = ++n_engines;
+	engine_flags[engine->id] = engine->flags;
 }
 
 /** Find factory engine by name. */
diff --git a/src/box/engine.h b/src/box/engine.h
index 9a04d59a86..8d9d3b2cc9 100644
--- a/src/box/engine.h
+++ b/src/box/engine.h
@@ -34,6 +34,13 @@
 struct space;
 struct tuple;
 
+enum engine_flags {
+	ENGINE_TRANSACTIONAL = 1,
+	ENGINE_NO_YIELD = 2,
+};
+
+extern uint32_t engine_flags[BOX_ENGINE_MAX];
+
 /** Reflects what space_replace() is supposed to do. */
 enum engine_recovery_state {
 	/**
@@ -117,6 +124,10 @@ class EngineFactory: public Object {
 public:
 	/** Name of the engine. */
 	const char *name;
+	/** Engine id. */
+	uint32_t id;
+	/** Engine flags */
+	uint32_t flags;
 	struct engine_recovery recovery;
 	/** Used for search for engine by name. */
 	struct rlist link;
@@ -162,4 +173,24 @@ EngineFactory *engine_find(const char *name);
 /** Shutdown all engine factories. */
 void engine_shutdown();
 
+static inline bool
+engine_transactional(uint32_t id)
+{
+	assert(id);
+	return engine_flags[id] & ENGINE_TRANSACTIONAL;
+}
+
+static inline bool
+engine_no_yield(uint32_t id)
+{
+	assert(id);
+	return engine_flags[id] & ENGINE_NO_YIELD;
+}
+
+static inline uint32_t
+engine_id(Engine *engine)
+{
+	return engine->factory->id;
+}
+
 #endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */
diff --git a/src/box/engine_memtx.cc b/src/box/engine_memtx.cc
index cb8fab1c7d..23a74253c0 100644
--- a/src/box/engine_memtx.cc
+++ b/src/box/engine_memtx.cc
@@ -74,6 +74,7 @@ memtx_recovery_prepare(struct engine_recovery *r)
 MemtxFactory::MemtxFactory()
 	:EngineFactory("memtx")
 {
+	flags = ENGINE_TRANSACTIONAL | ENGINE_NO_YIELD;
 	memtx_recovery_prepare(&recovery);
 }
 
diff --git a/src/box/key_def.h b/src/box/key_def.h
index 96028f7a5b..6ead7f7f8d 100644
--- a/src/box/key_def.h
+++ b/src/box/key_def.h
@@ -37,6 +37,7 @@
 #include <wctype.h>
 
 enum {
+	BOX_ENGINE_MAX = 3, /* + 1 to the actual number of engines */
 	BOX_SPACE_MAX = INT32_MAX,
 	BOX_FUNCTION_MAX = 32000,
 	BOX_INDEX_MAX = 10,
diff --git a/src/box/txn.cc b/src/box/txn.cc
index f4167ee0fc..16c633591f 100644
--- a/src/box/txn.cc
+++ b/src/box/txn.cc
@@ -54,6 +54,16 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request)
 	stmt->row = row;
 }
 
+extern "C" void
+txn_on_yield(struct trigger * /* trigger */, void * /* event */)
+{
+	txn_rollback(); /* doesn't throw */
+}
+
+struct trigger on_yield = {
+	rlist_nil, txn_on_yield, NULL, NULL
+};
+
 void
 txn_replace(struct txn *txn, struct space *space,
 	    struct tuple *old_tuple, struct tuple *new_tuple,
@@ -73,6 +83,27 @@ txn_replace(struct txn *txn, struct space *space,
 		tuple_ref(stmt->new_tuple, 1);
 	}
 	stmt->space = space;
+	/**
+	 * Various checks of the used storage engine:
+	 * - check if it supports transactions
+	 * - only one engine can be used in a multi-statement
+	 *   transaction
+	 * - memtx doesn't allow yields between statements of
+	 *   a transaction. For this engine, set a trigger which
+	 *   would roll back the transaction if there is a yield.
+	 */
+	if (txn->autocommit == false) {
+		if (txn->n_stmts == 1) {
+			txn->engine = engine_id(space->engine);
+			if (engine_no_yield(txn->engine))
+				trigger_add(&fiber()->on_yield, &on_yield);
+		} else if (txn->engine != engine_id(space->engine))
+			tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION);
+		if (! engine_transactional(txn->engine)) {
+			tnt_raise(ClientError, ER_UNSUPPORTED,
+				  space->def.engine_name, "transactions");
+		}
+	}
 	/*
 	 * Run on_replace triggers. For now, disallow mutation
 	 * of tuples in the trigger.
@@ -139,6 +170,8 @@ txn_commit(struct txn *txn)
 {
 	assert(txn == in_txn());
 	struct txn_stmt *stmt;
+	/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
+		trigger_clear(&on_yield);
 	rlist_foreach_entry(stmt, &txn->stmts, next) {
 		if ((!stmt->old_tuple && !stmt->new_tuple) ||
 		    space_is_temporary(stmt->space))
@@ -218,6 +251,8 @@ txn_rollback()
 		if (stmt->new_tuple)
 			tuple_ref(stmt->new_tuple, -1);
 	}
+	/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
+		trigger_clear(&on_yield);
 	TRASH(txn);
 	/** Free volatile txn memory. */
 	fiber_gc();
diff --git a/src/box/txn.h b/src/box/txn.h
index 3f9870b5fb..2856abcc47 100644
--- a/src/box/txn.h
+++ b/src/box/txn.h
@@ -66,6 +66,8 @@ struct txn {
 	 * (statement end causes an automatic transaction commit).
 	 */
 	bool autocommit;
+	/** Id of the engine involved in multi-statement transaction. */
+	uint8_t engine;
 };
 
 /* Pointer to the current transaction (if any) */
diff --git a/src/errcode.h b/src/errcode.h
index 9b233d82b9..4da99c6572 100644
--- a/src/errcode.h
+++ b/src/errcode.h
@@ -130,6 +130,7 @@ enum { TNT_ERRMSG_MAX = 512 };
 	/* 78 */_(ER_TIMEOUT,			2, "Timeout exceeded") \
 	/* 79 */_(ER_ACTIVE_TRANSACTION,	2, "Operation is not permitted when there is an active transaction ") \
 	/* 80 */_(ER_NO_ACTIVE_TRANSACTION,	2, "Operation is not permitted when there is no active transaction ") \
+	/* 81 */_(ER_CROSS_ENGINE_TRANSACTION,	2, "A multi-statement transaction can not use multiple storage engines") \
 
 /*
  * !IMPORTANT! Please follow instructions at start of the file
diff --git a/test/box/misc.result b/test/box/misc.result
index 882661f71b..351a4edfc5 100644
--- a/test/box/misc.result
+++ b/test/box/misc.result
@@ -190,6 +190,7 @@ t;
   - 'box.error.TUPLE_NOT_FOUND : 4'
   - 'box.error.DROP_USER : 44'
   - 'box.error.SERVER_ID_IS_RO : 66'
+  - 'box.error.CROSS_ENGINE_TRANSACTION : 81'
   - 'box.error.MODIFY_INDEX : 14'
   - 'box.error.PASSWORD_MISMATCH : 47'
   - 'box.error.NO_SUCH_ENGINE : 57'
diff --git a/test/box/transaction.result b/test/box/transaction.result
index 1cf1fa1d15..a0dbea0e80 100644
--- a/test/box/transaction.result
+++ b/test/box/transaction.result
@@ -50,12 +50,20 @@ f = fiber.create(sloppy);
 ...
 -- when the sloppy fiber ends, its session has an active transction
 -- ensure it's rolled back automatically
+f:status();
+---
+- dead
+...
 fiber.sleep(0);
 ---
 ...
 fiber.sleep(0);
 ---
 ...
+f:status();
+---
+- dead
+...
 -- transactions and system spaces
 box.begin() box.schema.space.create('test');
 ---
@@ -178,3 +186,82 @@ s:select{};
 ---
 - - [1, 'first row']
 ...
+s:truncate();
+---
+...
+--
+-- Test that fiber yield causes a transaction rollback
+-- but only if the transaction has changed any data
+--
+-- Test admin console
+box.begin();
+---
+...
+-- should be ok - active transaction, and we don't
+-- know, maybe it will use sophia engine, which
+-- may support yield() in the future, so we don't roll
+-- back a transction with no statements.
+box.commit();
+---
+...
+box.begin() s:insert{1, 'Must be rolled back'};
+---
+...
+-- error: no active transaction because of yield
+box.commit();
+---
+- error: 'Operation is not permitted when there is no active transaction '
+...
+-- nothing - the transaction was rolled back
+-- Test background fiber
+--
+s:select{}
+function sloppy()
+    box.begin()
+    s:insert{1, 'From background fiber'}
+end;
+---
+...
+f = fiber.create(sloppy);
+---
+...
+while f:status() == 'running' do
+    fiber.sleep(0)
+end;
+---
+...
+-- When the sloppy fiber ends, its session has an active transction
+-- It's rolled back automatically
+s:select{};
+---
+- []
+...
+function sloppy()
+    box.begin()
+    s:insert{1, 'From background fiber'}
+    fiber.sleep(0)
+    pcall(box.commit)
+    t = s:select{}
+end;
+---
+...
+f = fiber.create(sloppy);
+---
+...
+while f:status() == 'running' do
+    fiber.sleep(0)
+end;
+---
+...
+t;
+---
+- []
+...
+s:select{};
+---
+- []
+...
+s:drop();
+---
+...
+--# setopt delimiter ''
diff --git a/test/box/transaction.test.lua b/test/box/transaction.test.lua
index 07260e2f8c..9d296148ba 100644
--- a/test/box/transaction.test.lua
+++ b/test/box/transaction.test.lua
@@ -24,8 +24,10 @@ end;
 f = fiber.create(sloppy);
 -- when the sloppy fiber ends, its session has an active transction
 -- ensure it's rolled back automatically
+f:status();
 fiber.sleep(0);
 fiber.sleep(0);
+f:status();
 -- transactions and system spaces
 box.begin() box.schema.space.create('test');
 box.rollback();
@@ -74,3 +76,48 @@ end;
 multi();
 t;
 s:select{};
+s:truncate();
+--
+-- Test that fiber yield causes a transaction rollback
+-- but only if the transaction has changed any data
+--
+-- Test admin console
+box.begin();
+-- should be ok - active transaction, and we don't
+-- know, maybe it will use sophia engine, which
+-- may support yield() in the future, so we don't roll
+-- back a transction with no statements.
+box.commit();
+box.begin() s:insert{1, 'Must be rolled back'};
+-- error: no active transaction because of yield
+box.commit();
+-- nothing - the transaction was rolled back
+s:select{}
+-- Test background fiber
+--
+function sloppy()
+    box.begin()
+    s:insert{1, 'From background fiber'}
+end;
+f = fiber.create(sloppy);
+while f:status() == 'running' do
+    fiber.sleep(0)
+end;
+-- When the sloppy fiber ends, its session has an active transction
+-- It's rolled back automatically
+s:select{};
+function sloppy()
+    box.begin()
+    s:insert{1, 'From background fiber'}
+    fiber.sleep(0)
+    pcall(box.commit)
+    t = s:select{}
+end;
+f = fiber.create(sloppy);
+while f:status() == 'running' do
+    fiber.sleep(0)
+end;
+t;
+s:select{};
+s:drop();
+--# setopt delimiter ''
-- 
GitLab