From a76eb6ef5fbd136352d01d8e62c1e8429979201a Mon Sep 17 00:00:00 2001 From: mechanik20051988 <mechanik20051988@tarantool.org> Date: Fri, 1 Oct 2021 18:30:10 +0300 Subject: [PATCH] txn: implement timeout for transactions Client code errors or manual mistakes can create transactions that are never closed. Such a transaction effectively becomes a memory leak. Implement a timeout for transactions, after which they are rolled back. Part of #6177 @TarantoolBot document Title: ability to set timeout for transactions was implemented Previously, transactions were never closed until commit or rollback. A timeout for transactions was implemented, after which they are rolled back. For this purpose, an optional table parameter was added to `box.begin`. For example, if a user wants to start a transaction with a timeout of 3 seconds, they should use `box.begin({timeout = 3})`. A new configuration option, `box.cfg.txn_timeout`, was also implemented; it determines the timeout for transactions for which a timeout was not explicitly set. By default this option is set to infinity (TIMEOUT_INFINITY = 365 * 100 * 86400). A new C API function, 'box_txn_set_timeout', was also added to set the timeout for a transaction. 
--- .../gh-6177-implement-txn-timeout.md | 6 + extra/exports | 1 + src/box/box.cc | 26 +++ src/box/box.h | 7 + src/box/errcode.h | 2 + src/box/lua/cfg.cc | 9 + src/box/lua/load_cfg.lua | 3 + src/box/lua/schema.lua | 16 +- src/box/txn.c | 68 +++++- src/box/txn.h | 24 ++ test/app-tap/init_script.result | 1 + test/box/admin.result | 2 + test/box/cfg.result | 4 + test/box/error.result | 2 + test/box/tx_timeout.result | 219 ++++++++++++++++++ test/box/tx_timeout.test.lua | 82 +++++++ 16 files changed, 470 insertions(+), 2 deletions(-) create mode 100644 changelogs/unreleased/gh-6177-implement-txn-timeout.md create mode 100644 test/box/tx_timeout.result create mode 100644 test/box/tx_timeout.test.lua diff --git a/changelogs/unreleased/gh-6177-implement-txn-timeout.md b/changelogs/unreleased/gh-6177-implement-txn-timeout.md new file mode 100644 index 0000000000..206ab81b23 --- /dev/null +++ b/changelogs/unreleased/gh-6177-implement-txn-timeout.md @@ -0,0 +1,6 @@ +## feature/core + +* Implemented a timeout for transactions after + which they are rolled back (gh-6177). + Implemented new C API function 'box_txn_set_timeout' + to set timeout for transaction. 
diff --git a/extra/exports b/extra/exports index 354ae0d4f0..19e23f5db5 100644 --- a/extra/exports +++ b/extra/exports @@ -100,6 +100,7 @@ box_txn_id box_txn_rollback box_txn_rollback_to_savepoint box_txn_savepoint +box_txn_set_timeout box_update box_upsert clock_monotonic diff --git a/src/box/box.cc b/src/box/box.cc index e082e1a3db..20361db3a3 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -91,6 +91,8 @@ struct rmean *rmean_box; double on_shutdown_trigger_timeout = 3.0; +double txn_timeout_default; + struct rlist box_on_shutdown_trigger_list = RLIST_HEAD_INITIALIZER(box_on_shutdown_trigger_list); @@ -1141,6 +1143,18 @@ box_check_iproto_options(void) return 0; } +static double +box_check_txn_timeout(void) +{ + double timeout = cfg_getd_default("txn_timeout", TIMEOUT_INFINITY); + if (timeout <= 0) { + diag_set(ClientError, ER_CFG, "txn_timeout", + "the value must be greather than 0"); + return -1; + } + return timeout; +} + void box_check_config(void) { @@ -1183,6 +1197,8 @@ box_check_config(void) diag_raise(); if (box_check_sql_cache_size(cfg_geti("sql_cache_size")) != 0) diag_raise(); + if (box_check_txn_timeout() < 0) + diag_raise(); } int @@ -2036,6 +2052,16 @@ box_set_crash(void) return 0; } +int +box_set_txn_timeout(void) +{ + double timeout = box_check_txn_timeout(); + if (timeout < 0) + return -1; + txn_timeout_default = timeout; + return 0; +} + /* }}} configuration bindings */ /** diff --git a/src/box/box.h b/src/box/box.h index a9e5176682..14278a2945 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -70,6 +70,12 @@ extern double on_shutdown_trigger_timeout; /** Invoked on box shutdown. */ extern struct rlist box_on_shutdown_trigger_list; +/** + * Timeout during which the transaction must complete, + * otherwise it will be rolled back. 
+ */ +extern double txn_timeout_default; + /* * Initialize box library * @throws C++ exception @@ -264,6 +270,7 @@ void box_set_replication_skip_conflict(void); void box_set_replication_anon(void); void box_set_net_msg_max(void); int box_set_crash(void); +int box_set_txn_timeout(void); int box_set_prepared_stmt_cache_size(void); diff --git a/src/box/errcode.h b/src/box/errcode.h index a6f0966983..ac178f1f19 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -283,6 +283,8 @@ struct errcode_record { /*228 */_(ER_SYNC_QUEUE_FOREIGN, "The synchronous transaction queue belongs to other instance with id %u")\ /*226 */_(ER_UNABLE_TO_PROCESS_IN_STREAM, "Unable to process %s request in stream") \ /*227 */_(ER_UNABLE_TO_PROCESS_OUT_OF_STREAM, "Unable to process %s request out of stream") \ + /*231 */_(ER_TRANSACTION_TIMEOUT, "Transaction has been aborted by timeout") \ + /*232 */_(ER_ACTIVE_TIMER, "Operation is not permitted if timer is already running") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc index e0c5a40021..5ea23917d3 100644 --- a/src/box/lua/cfg.cc +++ b/src/box/lua/cfg.cc @@ -396,6 +396,14 @@ lbox_cfg_set_crash(struct lua_State *L) return 0; } +static int +lbox_cfg_set_txn_timeout(struct lua_State *L) +{ + if (box_set_txn_timeout() != 0) + luaT_error(L); + return 0; +} + void box_lua_cfg_init(struct lua_State *L) { @@ -435,6 +443,7 @@ box_lua_cfg_init(struct lua_State *L) {"cfg_set_net_msg_max", lbox_cfg_set_net_msg_max}, {"cfg_set_sql_cache_size", lbox_set_prepared_stmt_cache_size}, {"cfg_set_crash", lbox_cfg_set_crash}, + {"cfg_set_txn_timeout", lbox_cfg_set_txn_timeout}, {NULL, NULL} }; diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index 13605c4ac9..7395a2e2f3 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -109,6 +109,7 @@ local default_cfg = { feedback_interval = 3600, net_msg_max = 768, sql_cache_size = 5 * 1024 * 1024, + txn_timeout = 
365 * 100 * 86400, } -- cfg variables which are covered by modules @@ -215,6 +216,7 @@ local template_cfg = { feedback_interval = ifdef_feedback('number'), net_msg_max = 'number', sql_cache_size = 'number', + txn_timeout = 'number', } local function normalize_uri(port) @@ -335,6 +337,7 @@ local dynamic_cfg = { replicaset_uuid = check_replicaset_uuid, net_msg_max = private.cfg_set_net_msg_max, sql_cache_size = private.cfg_set_sql_cache_size, + txn_timeout = private.cfg_set_txn_timeout, } -- dynamically settable options, which should be reverted in case diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 1fe2e1c5c0..b0ff416980 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -71,6 +71,8 @@ ffi.cdef[[ box_txn_id(); int box_txn_begin(); + int + box_txn_set_timeout(double timeout); /** \endcond public */ /** \cond public */ int @@ -328,10 +330,22 @@ local function feedback_save_event(event) end end -box.begin = function() +box.begin = function(options) + local timeout + if options then + check_param(options, 'options', 'table') + timeout = options.timeout + if timeout and (type(timeout) ~= "number" or timeout <= 0) then + box.error(box.error.ILLEGAL_PARAMS, + "timeout must be a number greater than 0") + end + end if builtin.box_txn_begin() == -1 then box.error() end + if timeout then + assert(builtin.box_txn_set_timeout(timeout) == 0) + end end box.is_in_txn = builtin.box_txn diff --git a/src/box/txn.c b/src/box/txn.c index 9203d2cff8..a1b0b45142 100644 --- a/src/box/txn.c +++ b/src/box/txn.c @@ -38,6 +38,7 @@ #include "xrow.h" #include "errinj.h" #include "iproto_constants.h" +#include "box.h" double too_long_threshold; @@ -60,13 +61,17 @@ txn_flags_to_error_code(struct txn *txn) return ER_TRANSACTION_CONFLICT; else if (txn_has_flag(txn, TXN_IS_ABORTED_BY_YIELD)) return ER_TRANSACTION_YIELD; + else if (txn_has_flag(txn, TXN_IS_ABORTED_BY_TIMEOUT)) + return ER_TRANSACTION_TIMEOUT; return ER_UNKNOWN; } static inline int 
txn_check_can_continue(struct txn *txn) { - enum txn_flag flags = TXN_IS_CONFLICTED | TXN_IS_ABORTED_BY_YIELD; + enum txn_flag flags = + TXN_IS_CONFLICTED | TXN_IS_ABORTED_BY_YIELD | + TXN_IS_ABORTED_BY_TIMEOUT; if (txn_has_any_of_flags(txn, flags)) { diag_set(ClientError, txn_flags_to_error_code(txn)); return -1; @@ -74,6 +79,13 @@ txn_check_can_continue(struct txn *txn) return 0; } +static inline void +txn_set_timeout(struct txn *txn, double timeout) +{ + assert(timeout > 0); + txn->timeout = timeout; +} + static int txn_add_redo(struct txn *txn, struct txn_stmt *stmt, struct request *request) { @@ -237,6 +249,8 @@ txn_new(void) inline static void txn_free(struct txn *txn) { + if (txn->rollback_timer != NULL) + ev_timer_stop(loop(), txn->rollback_timer); memtx_tx_clean_txn(txn); struct tx_read_tracker *tracker, *tmp; rlist_foreach_entry_safe(tracker, &txn->read_set, @@ -327,6 +341,8 @@ txn_begin(void) rlist_create(&txn->savepoints); memtx_tx_register_tx(txn); txn->fiber = NULL; + txn->timeout = TIMEOUT_INFINITY; + txn->rollback_timer = NULL; fiber_set_txn(fiber(), txn); trigger_create(&txn->fiber_on_yield, txn_on_yield, NULL, NULL); trigger_add(&fiber()->on_yield, &txn->fiber_on_yield); @@ -721,6 +737,11 @@ txn_prepare(struct txn *txn) if (txn_check_can_continue(txn) != 0) return -1; + + if (txn->rollback_timer != NULL) { + ev_timer_stop(loop(), txn->rollback_timer); + txn->rollback_timer = NULL; + } /* * If transaction has been started in SQL, deferred * foreign key constraints must not be violated. 
@@ -1059,6 +1080,7 @@ box_txn_begin(void) } if (txn_begin() == NULL) return -1; + txn_set_timeout(in_txn(), txn_timeout_default); return 0; } @@ -1118,6 +1140,27 @@ box_txn_alloc(size_t size) alignof(union natural_align)); } +int +box_txn_set_timeout(double timeout) +{ + if (timeout <= 0) { + diag_set(ClientError, ER_ILLEGAL_PARAMS, + "timeout must be a number greater than 0"); + return -1; + } + struct txn *txn = in_txn(); + if (txn == NULL) { + diag_set(ClientError, ER_NO_TRANSACTION); + return -1; + } + if (txn->rollback_timer != NULL) { + diag_set(ClientError, ER_ACTIVE_TIMER); + return -1; + } + txn_set_timeout(txn, timeout); + return 0; +} + struct txn_savepoint * txn_savepoint_new(struct txn *txn, const char *name) { @@ -1238,6 +1281,16 @@ txn_on_stop(struct trigger *trigger, void *event) return 0; } +static void +txn_on_timeout(ev_loop *loop, ev_timer *watcher, int revents) +{ + (void) loop; + (void) revents; + struct txn *txn = (struct txn *)watcher->data; + txn_rollback_to_svp(txn, NULL); + txn_set_flags(txn, TXN_IS_ABORTED_BY_TIMEOUT); +} + /** * Memtx yield-in-transaction trigger callback. * @@ -1267,6 +1320,19 @@ txn_on_yield(struct trigger *trigger, void *event) txn_rollback_to_svp(txn, NULL); txn_set_flags(txn, TXN_IS_ABORTED_BY_YIELD); say_warn("Transaction has been aborted by a fiber yield"); + return 0; + } + if (txn->rollback_timer == NULL && txn->timeout != TIMEOUT_INFINITY) { + int size; + txn->rollback_timer = region_alloc_object(&txn->region, + struct ev_timer, + &size); + if (txn->rollback_timer == NULL) + panic("Out of memory on creation of rollback timer"); + ev_timer_init(txn->rollback_timer, txn_on_timeout, + txn->timeout, 0); + txn->rollback_timer->data = txn; + ev_timer_start(loop(), txn->rollback_timer); } return 0; } diff --git a/src/box/txn.h b/src/box/txn.h index 0a13d341fd..b93441a95a 100644 --- a/src/box/txn.h +++ b/src/box/txn.h @@ -96,6 +96,11 @@ enum txn_flag { * committed due to conflict. 
*/ TXN_IS_CONFLICTED = 0x80, + /* + * Transaction has been aborted by timeout so should be + * rolled back at commit. + */ + TXN_IS_ABORTED_BY_TIMEOUT = 0x100, }; enum { @@ -433,6 +438,13 @@ struct txn { struct rlist in_all_txs; /** True in case transaction provides any DDL change. */ bool is_schema_changed; + /** Timeout for transaction, or TIMEOUT_INFINITY if not set. */ + double timeout; + /** + * Timer that is alarmed if the transaction did not have time + * to complete within the timeout specified when it was created. + */ + struct ev_timer *rollback_timer; }; static inline bool @@ -831,6 +843,18 @@ box_txn_rollback(void); API_EXPORT void * box_txn_alloc(size_t size); +/** + * Set @a timeout for transaction, when it expires, transaction + * will be rolled back. + * + * @retval 0 if success + * @retval -1 if timeout is less than or equal to 0, there is + * no current transaction or rollback timer for + * current transaction is already started. + */ +API_EXPORT int +box_txn_set_timeout(double timeout); + /** \endcond public */ typedef struct txn_savepoint box_txn_savepoint_t; diff --git a/test/app-tap/init_script.result b/test/app-tap/init_script.result index e849a33180..aefc506630 100644 --- a/test/app-tap/init_script.result +++ b/test/app-tap/init_script.result @@ -44,6 +44,7 @@ slab_alloc_granularity:8 sql_cache_size:5242880 strip_core:true too_long_threshold:0.5 +txn_timeout:3153600000 vinyl_bloom_fpr:0.05 vinyl_cache:134217728 vinyl_dir:. 
diff --git a/test/box/admin.result b/test/box/admin.result index f3be0869bb..68636c1d61 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -109,6 +109,8 @@ cfg_filter(box.cfg) - true - - too_long_threshold - 0.5 + - - txn_timeout + - 3153600000 - - vinyl_bloom_fpr - 0.05 - - vinyl_cache diff --git a/test/box/cfg.result b/test/box/cfg.result index d2f2a8ddeb..b8c8d5d819 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -97,6 +97,8 @@ cfg_filter(box.cfg) | - true | - - too_long_threshold | - 0.5 + | - - txn_timeout + | - 3153600000 | - - vinyl_bloom_fpr | - 0.05 | - - vinyl_cache @@ -222,6 +224,8 @@ cfg_filter(box.cfg) | - true | - - too_long_threshold | - 0.5 + | - - txn_timeout + | - 3153600000 | - - vinyl_bloom_fpr | - 0.05 | - - vinyl_cache diff --git a/test/box/error.result b/test/box/error.result index bc804197a5..19ea538731 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -449,6 +449,8 @@ t; | 228: box.error.SYNC_QUEUE_FOREIGN | 229: box.error.UNABLE_TO_PROCESS_IN_STREAM | 230: box.error.UNABLE_TO_PROCESS_OUT_OF_STREAM + | 231: box.error.TRANSACTION_TIMEOUT + | 232: box.error.ACTIVE_TIMER | ... test_run:cmd("setopt delimiter ''"); diff --git a/test/box/tx_timeout.result b/test/box/tx_timeout.result new file mode 100644 index 0000000000..a64da87e61 --- /dev/null +++ b/test/box/tx_timeout.result @@ -0,0 +1,219 @@ +test_run = require("test_run").new() +--- +... +fiber = require("fiber") +--- +... +test_run:cmd("create server test with script='box/tx_man.lua'") +--- +- true +... +test_run:cmd("start server test") +--- +- true +... +-- Checks for local transactions +test_run:switch("test") +--- +- true +... +fiber = require("fiber") +--- +... +ffi = require("ffi") +--- +... +ffi.cdef("int box_txn_set_timeout(double timeout);") +--- +... +-- Check error when we try to set timeout, when +-- there is no active transaction +assert(ffi.C.box_txn_set_timeout(5) == -1) +--- +- true +... 
+-- No active transaction +box.error.last() +--- +- No active transaction +... +-- Check error when try to set timeout, when +-- transaction rollback timer is already running +box.begin({timeout = 100}) +--- +... +fiber.yield() +--- +... +assert(ffi.C.box_txn_set_timeout(5) == -1) +--- +- true +... +-- Operation is not permitted if timer is already running +box.error.last() +--- +- Operation is not permitted if timer is already running +... +box.commit() +--- +... +-- Check arguments for 'box.begin' +box.begin(1) +--- +- error: Illegal parameters, options should be a table +... +box.begin({timeout = 0}) +--- +- error: Illegal parameters, timeout must be a number greater than 0 +... +box.begin({timeout = -1}) +--- +- error: Illegal parameters, timeout must be a number greater than 0 +... +box.begin({timeout = "5"}) +--- +- error: Illegal parameters, timeout must be a number greater than 0 +... +-- Check new configuration option 'txn_timeout' +box.cfg({txn_timeout = 0}) +--- +- error: 'Incorrect value for option ''txn_timeout'': the value must be greather than + 0' +... +box.cfg({txn_timeout = -1}) +--- +- error: 'Incorrect value for option ''txn_timeout'': the value must be greather than + 0' +... +box.cfg({txn_timeout = "5"}) +--- +- error: 'Incorrect value for option ''txn_timeout'': should be of type number' +... +s = box.schema.space.create("test") +--- +... +_ = s:create_index("pk") +--- +... +txn_timeout = 0.1 +--- +... +box.cfg({ txn_timeout = txn_timeout }) +--- +... +-- Check that transaction aborted by timeout, which +-- was set by the change of box.cfg.txn_timeout +box.begin() +--- +... +s:replace({1}) +--- +- [1] +... +s:select({}) -- [1] +--- +- - [1] +... +fiber.sleep(txn_timeout + 0.1) +--- +... +s:select({}) --[] +--- +- [] +... +s:replace({2}) +--- +- error: Transaction has been aborted by timeout +... +fiber.yield() +--- +... +s:select({}) -- [] +--- +- [] +... 
+box.commit() -- Transaction has been aborted by timeout +--- +- error: Transaction has been aborted by timeout +... +-- Check that transaction aborted by timeout, which +-- was set by appropriate option in box.begin +box.begin({timeout = txn_timeout}) +--- +... +s:replace({1}) +--- +- [1] +... +s:select({}) -- [1] +--- +- - [1] +... +fiber.sleep(txn_timeout / 2 + 0.1) +--- +... +s:select({}) --[] +--- +- [] +... +s:replace({2}) +--- +- error: Transaction has been aborted by timeout +... +fiber.yield() +--- +... +s:select({}) -- [] +--- +- [] +... +box.commit() -- Transaction has been aborted by timeout +--- +- error: Transaction has been aborted by timeout +... +-- Check that transaction is not rollback until timeout expired. +box.begin({timeout = 1000}) +--- +... +s:replace({1}) +--- +- [1] +... +s:select({1}) -- [1] +--- +- - [1] +... +fiber.sleep(0.1) +--- +... +-- timeout is not expired +s:select({}) -- [1] +--- +- - [1] +... +box.commit() -- Success +--- +... +s:select({}) -- [1] +--- +- - [1] +... +s:drop() +--- +... +test_run:switch("default") +--- +- true +... +test_run:cmd("stop server test") +--- +- true +... +test_run:cmd("cleanup server test") +--- +- true +... +test_run:cmd("delete server test") +--- +- true +... 
diff --git a/test/box/tx_timeout.test.lua b/test/box/tx_timeout.test.lua new file mode 100644 index 0000000000..cbd6bc4de6 --- /dev/null +++ b/test/box/tx_timeout.test.lua @@ -0,0 +1,82 @@ +test_run = require("test_run").new() +fiber = require("fiber") +test_run:cmd("create server test with script='box/tx_man.lua'") +test_run:cmd("start server test") + +-- Checks for local transactions +test_run:switch("test") +fiber = require("fiber") +ffi = require("ffi") +ffi.cdef("int box_txn_set_timeout(double timeout);") + +-- Check error when we try to set timeout, when +-- there is no active transaction +assert(ffi.C.box_txn_set_timeout(5) == -1) +-- No active transaction +box.error.last() + +-- Check error when try to set timeout, when +-- transaction rollback timer is already running +box.begin({timeout = 100}) +fiber.yield() +assert(ffi.C.box_txn_set_timeout(5) == -1) +-- Operation is not permitted if timer is already running +box.error.last() +box.commit() + + +-- Check arguments for 'box.begin' +box.begin(1) +box.begin({timeout = 0}) +box.begin({timeout = -1}) +box.begin({timeout = "5"}) +-- Check new configuration option 'txn_timeout' +box.cfg({txn_timeout = 0}) +box.cfg({txn_timeout = -1}) +box.cfg({txn_timeout = "5"}) + +s = box.schema.space.create("test") +_ = s:create_index("pk") +txn_timeout = 0.1 +box.cfg({ txn_timeout = txn_timeout }) + +-- Check that transaction aborted by timeout, which +-- was set by the change of box.cfg.txn_timeout +box.begin() +s:replace({1}) +s:select({}) -- [1] +fiber.sleep(txn_timeout + 0.1) +s:select({}) --[] +s:replace({2}) +fiber.yield() +s:select({}) -- [] +box.commit() -- Transaction has been aborted by timeout + +-- Check that transaction aborted by timeout, which +-- was set by appropriate option in box.begin +box.begin({timeout = txn_timeout}) +s:replace({1}) +s:select({}) -- [1] +fiber.sleep(txn_timeout / 2 + 0.1) +s:select({}) --[] +s:replace({2}) +fiber.yield() +s:select({}) -- [] +box.commit() -- Transaction has been 
aborted by timeout + +-- Check that transaction is not rollback until timeout expired. +box.begin({timeout = 1000}) +s:replace({1}) +s:select({1}) -- [1] +fiber.sleep(0.1) +-- timeout is not expired +s:select({}) -- [1] +box.commit() -- Success +s:select({}) -- [1] + +s:drop() +test_run:switch("default") + +test_run:cmd("stop server test") +test_run:cmd("cleanup server test") +test_run:cmd("delete server test") -- GitLab