Skip to content
Snippets Groups Projects
Commit e43cc253 authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

gh-306: implement automatic transaction rollback on yield.

parent 10d3c31e
No related branches found
No related tags found
No related merge requests found
......@@ -34,6 +34,8 @@
#include <string.h>
static RLIST_HEAD(engines);
uint32_t engine_flags[BOX_ENGINE_MAX];
int n_engines;
EngineFactory::EngineFactory(const char *engine_name)
:name(engine_name),
......@@ -63,6 +65,8 @@ Engine::Engine(EngineFactory *f)
void engine_register(EngineFactory *engine)
{
rlist_add_entry(&engines, engine, link);
engine->id = ++n_engines;
engine_flags[engine->id] = engine->flags;
}
/** Find factory engine by name. */
......
......@@ -34,6 +34,13 @@
struct space;
struct tuple;
enum engine_flags {
ENGINE_TRANSACTIONAL = 1,
ENGINE_NO_YIELD = 2,
};
extern uint32_t engine_flags[BOX_ENGINE_MAX];
/** Reflects what space_replace() is supposed to do. */
enum engine_recovery_state {
/**
......@@ -117,6 +124,10 @@ class EngineFactory: public Object {
public:
/** Name of the engine. */
const char *name;
/** Engine id. */
uint32_t id;
/** Engine flags */
uint32_t flags;
struct engine_recovery recovery;
/** Used for search for engine by name. */
struct rlist link;
......@@ -162,4 +173,24 @@ EngineFactory *engine_find(const char *name);
/** Shutdown all engine factories. */
void engine_shutdown();
static inline bool
engine_transactional(uint32_t id)
{
assert(id);
return engine_flags[id] & ENGINE_TRANSACTIONAL;
}
static inline bool
engine_no_yield(uint32_t id)
{
assert(id);
return engine_flags[id] & ENGINE_NO_YIELD;
}
static inline uint32_t
engine_id(Engine *engine)
{
return engine->factory->id;
}
#endif /* TARANTOOL_BOX_ENGINE_H_INCLUDED */
......@@ -74,6 +74,7 @@ memtx_recovery_prepare(struct engine_recovery *r)
MemtxFactory::MemtxFactory()
:EngineFactory("memtx")
{
flags = ENGINE_TRANSACTIONAL | ENGINE_NO_YIELD;
memtx_recovery_prepare(&recovery);
}
......
......@@ -37,6 +37,7 @@
#include <wctype.h>
enum {
BOX_ENGINE_MAX = 3, /* + 1 to the actual number of engines */
BOX_SPACE_MAX = INT32_MAX,
BOX_FUNCTION_MAX = 32000,
BOX_INDEX_MAX = 10,
......
......@@ -54,6 +54,16 @@ txn_add_redo(struct txn_stmt *stmt, struct request *request)
stmt->row = row;
}
extern "C" void
txn_on_yield(struct trigger * /* trigger */, void * /* event */)
{
txn_rollback(); /* doesn't throw */
}
struct trigger on_yield = {
rlist_nil, txn_on_yield, NULL, NULL
};
void
txn_replace(struct txn *txn, struct space *space,
struct tuple *old_tuple, struct tuple *new_tuple,
......@@ -73,6 +83,27 @@ txn_replace(struct txn *txn, struct space *space,
tuple_ref(stmt->new_tuple, 1);
}
stmt->space = space;
/**
* Various checks of the used storage engine:
* - check if it supports transactions
* - only one engine can be used in a multi-statement
* transaction
* - memtx doesn't allow yields between statements of
* a transaction. For this engine, set a trigger which
* would roll back the transaction if there is a yield.
*/
if (txn->autocommit == false) {
if (txn->n_stmts == 1) {
txn->engine = engine_id(space->engine);
if (engine_no_yield(txn->engine))
trigger_add(&fiber()->on_yield, &on_yield);
} else if (txn->engine != engine_id(space->engine))
tnt_raise(ClientError, ER_CROSS_ENGINE_TRANSACTION);
if (! engine_transactional(txn->engine)) {
tnt_raise(ClientError, ER_UNSUPPORTED,
space->def.engine_name, "transactions");
}
}
/*
* Run on_replace triggers. For now, disallow mutation
* of tuples in the trigger.
......@@ -139,6 +170,8 @@ txn_commit(struct txn *txn)
{
assert(txn == in_txn());
struct txn_stmt *stmt;
/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
trigger_clear(&on_yield);
rlist_foreach_entry(stmt, &txn->stmts, next) {
if ((!stmt->old_tuple && !stmt->new_tuple) ||
space_is_temporary(stmt->space))
......@@ -218,6 +251,8 @@ txn_rollback()
if (stmt->new_tuple)
tuple_ref(stmt->new_tuple, -1);
}
/* if (!txn->autocommit && txn->n_stmts && engine_no_yield(txn->engine)) */
trigger_clear(&on_yield);
TRASH(txn);
/** Free volatile txn memory. */
fiber_gc();
......
......@@ -66,6 +66,8 @@ struct txn {
* (statement end causes an automatic transaction commit).
*/
bool autocommit;
/** Id of the engine involved in multi-statement transaction. */
uint8_t engine;
};
/* Pointer to the current transaction (if any) */
......
......@@ -130,6 +130,7 @@ enum { TNT_ERRMSG_MAX = 512 };
/* 78 */_(ER_TIMEOUT, 2, "Timeout exceeded") \
/* 79 */_(ER_ACTIVE_TRANSACTION, 2, "Operation is not permitted when there is an active transaction ") \
/* 80 */_(ER_NO_ACTIVE_TRANSACTION, 2, "Operation is not permitted when there is no active transaction ") \
/* 81 */_(ER_CROSS_ENGINE_TRANSACTION, 2, "A multi-statement transaction can not use multiple storage engines") \
/*
* !IMPORTANT! Please follow instructions at start of the file
......
......@@ -190,6 +190,7 @@ t;
- 'box.error.TUPLE_NOT_FOUND : 4'
- 'box.error.DROP_USER : 44'
- 'box.error.SERVER_ID_IS_RO : 66'
- 'box.error.CROSS_ENGINE_TRANSACTION : 81'
- 'box.error.MODIFY_INDEX : 14'
- 'box.error.PASSWORD_MISMATCH : 47'
- 'box.error.NO_SUCH_ENGINE : 57'
......
......@@ -50,12 +50,20 @@ f = fiber.create(sloppy);
...
-- when the sloppy fiber ends, its session has an active transction
-- ensure it's rolled back automatically
f:status();
---
- dead
...
fiber.sleep(0);
---
...
fiber.sleep(0);
---
...
f:status();
---
- dead
...
-- transactions and system spaces
box.begin() box.schema.space.create('test');
---
......@@ -178,3 +186,82 @@ s:select{};
---
- - [1, 'first row']
...
s:truncate();
---
...
--
-- Test that fiber yield causes a transaction rollback
-- but only if the transaction has changed any data
--
-- Test admin console
box.begin();
---
...
-- should be ok - active transaction, and we don't
-- know, maybe it will use sophia engine, which
-- may support yield() in the future, so we don't roll
-- back a transction with no statements.
box.commit();
---
...
box.begin() s:insert{1, 'Must be rolled back'};
---
...
-- error: no active transaction because of yield
box.commit();
---
- error: 'Operation is not permitted when there is no active transaction '
...
-- nothing - the transaction was rolled back
-- Test background fiber
--
s:select{}
function sloppy()
box.begin()
s:insert{1, 'From background fiber'}
end;
---
...
f = fiber.create(sloppy);
---
...
while f:status() == 'running' do
fiber.sleep(0)
end;
---
...
-- When the sloppy fiber ends, its session has an active transction
-- It's rolled back automatically
s:select{};
---
- []
...
function sloppy()
box.begin()
s:insert{1, 'From background fiber'}
fiber.sleep(0)
pcall(box.commit)
t = s:select{}
end;
---
...
f = fiber.create(sloppy);
---
...
while f:status() == 'running' do
fiber.sleep(0)
end;
---
...
t;
---
- []
...
s:select{};
---
- []
...
s:drop();
---
...
--# setopt delimiter ''
......@@ -24,8 +24,10 @@ end;
f = fiber.create(sloppy);
-- when the sloppy fiber ends, its session has an active transction
-- ensure it's rolled back automatically
f:status();
fiber.sleep(0);
fiber.sleep(0);
f:status();
-- transactions and system spaces
box.begin() box.schema.space.create('test');
box.rollback();
......@@ -74,3 +76,48 @@ end;
multi();
t;
s:select{};
s:truncate();
--
-- Test that fiber yield causes a transaction rollback
-- but only if the transaction has changed any data
--
-- Test admin console
box.begin();
-- should be ok - active transaction, and we don't
-- know, maybe it will use sophia engine, which
-- may support yield() in the future, so we don't roll
-- back a transction with no statements.
box.commit();
box.begin() s:insert{1, 'Must be rolled back'};
-- error: no active transaction because of yield
box.commit();
-- nothing - the transaction was rolled back
s:select{}
-- Test background fiber
--
function sloppy()
box.begin()
s:insert{1, 'From background fiber'}
end;
f = fiber.create(sloppy);
while f:status() == 'running' do
fiber.sleep(0)
end;
-- When the sloppy fiber ends, its session has an active transction
-- It's rolled back automatically
s:select{};
function sloppy()
box.begin()
s:insert{1, 'From background fiber'}
fiber.sleep(0)
pcall(box.commit)
t = s:select{}
end;
f = fiber.create(sloppy);
while f:status() == 'running' do
fiber.sleep(0)
end;
t;
s:select{};
s:drop();
--# setopt delimiter ''
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment