From b9f7204b5e0d10b443c6f198e9f7f04e0d16a867 Mon Sep 17 00:00:00 2001
From: mechanik20051988 <mechanik20051988@tarantool.org>
Date: Fri, 1 Oct 2021 19:11:10 +0300
Subject: [PATCH] iproto: implement timeout for iproto transactions

Implement a timeout for iproto transactions, the same as for local
transactions. If the timeout is not specified in the client request,
it is set to box.cfg.txn_timeout, which is configured on the server side.

Closes #6177

@TarantoolBot document
Title: ability to set timeout for iproto transactions was implemented
A new `IPROTO_TIMEOUT` key (`0x56`) has been added. Currently it is used
to set a timeout for transactions over iproto streams. It is stored in the
body of an `IPROTO_BEGIN` request. To specify a timeout using net.box
(3s, for example), call `stream:begin({timeout = 3})`.
---
 ...ent-txn-timeout-for-iproto-transactions.md |   8 +
 src/box/iproto.cc                             |  11 +
 src/box/iproto_constants.c                    |   2 +
 src/box/iproto_constants.h                    |   2 +
 src/box/lua/net_box.c                         |  29 +--
 src/box/lua/net_box.lua                       |  17 +-
 src/box/xrow.c                                |  38 ++++
 src/box/xrow.h                                |  22 ++
 test/box/net.box_tx_timeout.result            | 204 ++++++++++++++++++
 test/box/net.box_tx_timeout.test.lua          |  74 +++++++
 10 files changed, 392 insertions(+), 15 deletions(-)
 create mode 100644 changelogs/unreleased/gh-6177-implement-txn-timeout-for-iproto-transactions.md
 create mode 100644 test/box/net.box_tx_timeout.result
 create mode 100644 test/box/net.box_tx_timeout.test.lua

diff --git a/changelogs/unreleased/gh-6177-implement-txn-timeout-for-iproto-transactions.md b/changelogs/unreleased/gh-6177-implement-txn-timeout-for-iproto-transactions.md
new file mode 100644
index 0000000000..2376076544
--- /dev/null
+++ b/changelogs/unreleased/gh-6177-implement-txn-timeout-for-iproto-transactions.md
@@ -0,0 +1,8 @@
+## feature/core
+
+* Implemented a timeout for iproto transactions, after
+  which they are rolled back (gh-6177).
+  Added a new `IPROTO_TIMEOUT` key (`0x56`), which is
+  used to set a timeout for transactions over iproto
+  streams. It is stored in the body of an `IPROTO_BEGIN`
+  request.
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 91498e7345..e4d9f8c72c 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -321,6 +321,8 @@ struct iproto_msg
 		struct id_request id;
 		/* SQL request, if this is the EXECUTE/PREPARE request. */
 		struct sql_request sql;
+		/* BEGIN request */
+		struct begin_request begin;
 		/** In case of iproto parse error, saved diagnostics. */
 		struct diag diag;
 	};
@@ -1522,6 +1524,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend,
 		cmsg_init(&msg->base, iproto_thread->dml_route[type]);
 		break;
 	case IPROTO_BEGIN:
+		if (xrow_decode_begin(&msg->header, &msg->begin) != 0)
+			goto error;
 		cmsg_init(&msg->base, iproto_thread->begin_route);
 		break;
 	case IPROTO_COMMIT:
@@ -1876,6 +1880,13 @@ tx_process_begin(struct cmsg *m)
 	if (box_txn_begin() != 0)
 		goto error;
 
+	if (msg->begin.timeout != 0 &&
+	    box_txn_set_timeout(msg->begin.timeout) != 0) {
+		int rc = box_txn_rollback();
+		assert(rc == 0);
+		(void)rc;
+		goto error;
+	}
 	out = msg->connection->tx.p_obuf;
 	iproto_reply_ok(out, msg->header.sync, ::schema_version);
 	iproto_wpos_create(&msg->wpos, out);
diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c
index 453fc63a4d..e3ffb0ab92 100644
--- a/src/box/iproto_constants.c
+++ b/src/box/iproto_constants.c
@@ -149,6 +149,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] =
 	/* 0x53 */	MP_UINT, /* IPROTO_TERM */
 	/* 0x54 */	MP_UINT, /* IPROTO_VERSION */
 	/* 0x55 */	MP_ARRAY, /* IPROTO_FEATURES */
+	/* 0x56 */	MP_DOUBLE, /* IPROTO_TIMEOUT */
 	/* }}} */
 };
 
@@ -282,6 +283,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = {
 	"term",             /* 0x53 */
 	"version",          /* 0x54 */
 	"features",         /* 0x55 */
+	"timeout",          /* 0x56 */
 };
 
 const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = {
diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h
index 01df6762c9..f616e66b2f 100644
--- a/src/box/iproto_constants.h
+++ b/src/box/iproto_constants.h
@@ -143,6 +143,8 @@ enum iproto_key {
 	IPROTO_VERSION = 0x54,
 	/** Protocol features. */
 	IPROTO_FEATURES = 0x55,
+	/** Operation timeout. Specific to request type. */
+	IPROTO_TIMEOUT = 0x56,
 	/*
 	 * Be careful to not extend iproto_key values over 0x7f.
 	 * iproto_keys are encoded in msgpack as positive fixnum, which ends at
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index f23e6919d5..c0104585c8 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -1004,15 +1004,13 @@ netbox_encode_unprepare(lua_State *L, int idx, struct mpstream *stream,
 }
 
 static inline void
-netbox_encode_txn(lua_State *L, enum iproto_type type, int idx,
-		  struct mpstream *stream, uint64_t sync,
-		  uint64_t stream_id)
+netbox_encode_commit_or_rollback(lua_State *L, enum iproto_type type, int idx,
+				 struct mpstream *stream, uint64_t sync,
+				 uint64_t stream_id)
 {
 	(void)L;
 	(void) idx;
-	assert(type == IPROTO_BEGIN ||
-	       type == IPROTO_COMMIT ||
-	       type == IPROTO_ROLLBACK);
+	assert(type == IPROTO_COMMIT || type == IPROTO_ROLLBACK);
 	size_t svp = netbox_begin_encode(stream, sync, type, stream_id);
 	netbox_end_encode(stream, svp);
 }
@@ -1021,24 +1019,31 @@ static void
 netbox_encode_begin(struct lua_State *L, int idx, struct mpstream *stream,
 		    uint64_t sync, uint64_t stream_id)
 {
-	return netbox_encode_txn(L, IPROTO_BEGIN, idx, stream,
-				 sync, stream_id);
+	size_t svp = netbox_begin_encode(stream, sync, IPROTO_BEGIN, stream_id);
+	if (!lua_isnoneornil(L, idx)) {
+		assert(lua_type(L, idx) == LUA_TNUMBER);
+		double timeout = lua_tonumber(L, idx);
+		mpstream_encode_map(stream, 1);
+		mpstream_encode_uint(stream, IPROTO_TIMEOUT);
+		mpstream_encode_double(stream, timeout);
+	}
+	netbox_end_encode(stream, svp);
 }
 
 static void
 netbox_encode_commit(struct lua_State *L, int idx, struct mpstream *stream,
 		     uint64_t sync, uint64_t stream_id)
 {
-	return netbox_encode_txn(L, IPROTO_COMMIT, idx, stream,
-				 sync, stream_id);
+	return netbox_encode_commit_or_rollback(L, IPROTO_COMMIT, idx, stream,
+						sync, stream_id);
 }
 
 static void
 netbox_encode_rollback(struct lua_State *L, int idx, struct mpstream *stream,
 		       uint64_t sync, uint64_t stream_id)
 {
-	return netbox_encode_txn(L, IPROTO_ROLLBACK, idx, stream,
-				 sync, stream_id);
+	return netbox_encode_commit_or_rollback(L, IPROTO_ROLLBACK, idx, stream,
+						sync, stream_id);
 }
 
 static void
diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index be527fa481..c1383261e0 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -826,10 +826,21 @@ local function stream_new_stream(stream)
     return stream._conn:new_stream()
 end
 
-local function stream_begin(stream, opts)
+local function stream_begin(stream, txn_opts, netbox_opts)
     check_remote_arg(stream, 'begin')
-    local res = stream:_request(M_BEGIN, opts, nil, stream._stream_id)
-    if opts and opts.is_async then
+    local timeout
+    if txn_opts then
+        if type(txn_opts) ~= 'table' then
+            error("txn_opts should be a table")
+        end
+        timeout = txn_opts.timeout
+        if timeout and (type(timeout) ~= "number" or timeout <= 0) then
+            error("timeout must be a number greater than 0")
+        end
+    end
+    local res = stream:_request(M_BEGIN, netbox_opts, nil,
+                                stream._stream_id, timeout)
+    if netbox_opts and netbox_opts.is_async then
         return res
     end
 end
diff --git a/src/box/xrow.c b/src/box/xrow.c
index 12e9670019..825067728d 100644
--- a/src/box/xrow.c
+++ b/src/box/xrow.c
@@ -1416,6 +1416,44 @@ xrow_decode_error(struct xrow_header *row)
 	box_error_set(__FILE__, __LINE__, code, error);
 }
 
+int
+xrow_decode_begin(const struct xrow_header *row, struct begin_request *request)
+{
+	assert(row->type == IPROTO_BEGIN);
+	memset(request, 0, sizeof(*request));
+
+	/* No body means no options: timeout stays zero (not specified). */
+	if (row->bodycnt == 0)
+		return 0;
+
+	const char *d = row->body[0].iov_base;
+	if (mp_typeof(*d) != MP_MAP)
+		goto bad_msgpack;
+
+	uint32_t map_size = mp_decode_map(&d);
+	for (uint32_t i = 0; i < map_size; ++i) {
+		if (mp_typeof(*d) != MP_UINT)
+			goto bad_msgpack;
+		uint64_t key = mp_decode_uint(&d);
+		if (key >= IPROTO_KEY_MAX ||
+		    mp_typeof(*d) != iproto_key_type[key])
+			goto bad_msgpack; /* Bad key or value type. */
+		switch (key) {
+		case IPROTO_TIMEOUT:
+			request->timeout = mp_decode_double(&d);
+			break;
+		default: /* Not a BEGIN option: skip its value. */
+			mp_next(&d);
+			break;
+		}
+	}
+	return 0;
+
+bad_msgpack:
+	xrow_on_decode_err(row, ER_INVALID_MSGPACK, "request body");
+	return -1;
+}
+
 void
 xrow_encode_vote(struct xrow_header *row)
 {
diff --git a/src/box/xrow.h b/src/box/xrow.h
index b7acedc932..762b6e36b6 100644
--- a/src/box/xrow.h
+++ b/src/box/xrow.h
@@ -845,6 +845,28 @@ xrow_to_iovec(const struct xrow_header *row, struct iovec *out);
 void
 xrow_decode_error(struct xrow_header *row);
 
+/**
+ * Decoded options of an IPROTO_BEGIN request.
+ */
+struct begin_request {
+	/**
+	 * Transaction timeout in seconds: when it expires, the transaction
+	 * is rolled back. Zero means unset; otherwise must be greater than 0.
+	 */
+	double timeout;
+};
+
+/**
+ * Decode the body of an IPROTO_BEGIN request into @a request.
+ * @param row Request row; a row without a body leaves @a request zeroed.
+ * @param[out] request Decoded BEGIN options.
+ *
+ * @retval  0 Success.
+ * @retval -1 Invalid MsgPack in the request body.
+ */
+int
+xrow_decode_begin(const struct xrow_header *row, struct begin_request *request);
+
 /**
  * Update vclock with the next LSN value for given replica id.
  * The function will cause panic if the next LSN happens to be
diff --git a/test/box/net.box_tx_timeout.result b/test/box/net.box_tx_timeout.result
new file mode 100644
index 0000000000..3493e062db
--- /dev/null
+++ b/test/box/net.box_tx_timeout.result
@@ -0,0 +1,204 @@
+test_run = require("test_run").new()
+---
+...
+net_box = require("net.box")
+---
+...
+fiber = require("fiber")
+---
+...
+test_run:cmd("create server test with script='box/tx_man.lua'")
+---
+- true
+...
+test_run:cmd("start server test")
+---
+- true
+...
+test_run:cmd("push filter '(.builtin/.*.lua):[0-9]+' to '\\1:<line>'")
+---
+- true
+...
+test_run:switch("test")
+---
+- true
+...
+s = box.schema.space.create("test")
+---
+...
+_ = s:create_index("pk")
+---
+...
+txn_timeout = 0.5
+---
+...
+box.cfg({ txn_timeout = txn_timeout })
+---
+...
+box.schema.user.grant("guest", "super")
+---
+...
+test_run:switch("default")
+---
+- true
+...
+-- Checks for remote transactions
+server_addr = test_run:eval("test", "return box.cfg.listen")[1]
+---
+...
+txn_timeout = test_run:eval("test", "return box.cfg.txn_timeout")[1]
+---
+...
+conn = net_box.connect(server_addr)
+---
+...
+stream = conn:new_stream()
+---
+...
+space = stream.space.test
+---
+...
+-- Check invalid timeout for transaction using raw request
+conn:_request(net_box._method.begin, nil, nil, stream._stream_id, -1)
+---
+- error: Illegal parameters, timeout must be a number greater than 0
+...
+-- Check arguments for 'stream:begin'
+stream:begin(1)
+---
+- error: 'builtin/box/net_box.lua:<line>: txn_opts should be a table'
+...
+stream:begin({timeout = 0})
+---
+- error: 'builtin/box/net_box.lua:<line>: timeout must be a number greater than 0'
+...
+stream:begin({timeout = -1})
+---
+- error: 'builtin/box/net_box.lua:<line>: timeout must be a number greater than 0'
+...
+stream:begin({timeout = "5"})
+---
+- error: 'builtin/box/net_box.lua:<line>: timeout must be a number greater than 0'
+...
+-- Check that transaction aborted by timeout, which
+-- was set by the change of box.cfg.txn_timeout on server
+stream:begin()
+---
+...
+space:replace({1})
+---
+- [1]
+...
+space:select({}) -- [1]
+---
+- - [1]
+...
+fiber.sleep(txn_timeout + 0.1)
+---
+...
+space:select({}) -- []
+---
+- []
+...
+space:replace({2})
+---
+- error: Transaction has been aborted by timeout
+...
+fiber.yield()
+---
+...
+space:select({}) -- []
+---
+- []
+...
+stream:commit() -- transaction was aborted by timeout
+---
+- error: Transaction has been aborted by timeout
+...
+-- Check that transaction aborted by timeout, which
+-- was set by appropriate option in stream:begin
+stream:begin({timeout = txn_timeout})
+---
+...
+space:replace({1})
+---
+- [1]
+...
+space:select({}) -- [1]
+---
+- - [1]
+...
+fiber.sleep(txn_timeout + 0.1)
+---
+...
+space:select({}) -- []
+---
+- []
+...
+space:replace({2})
+---
+- error: Transaction has been aborted by timeout
+...
+fiber.yield()
+---
+...
+space:select({}) -- []
+---
+- []
+...
+stream:commit() -- transaction was aborted by timeout
+---
+- error: Transaction has been aborted by timeout
+...
+-- Check that transaction is not rollback until timeout expired.
+stream:begin({timeout = 1000})
+---
+...
+space:replace({1})
+---
+- [1]
+...
+space:select({}) -- [1]
+---
+- - [1]
+...
+fiber.sleep(0.1)
+---
+...
+space:select({}) -- [1]
+---
+- - [1]
+...
+stream:commit() --Success
+---
+...
+test_run:switch("test")
+---
+- true
+...
+box.schema.user.revoke("guest", "super")
+---
+...
+s:select() -- [1]
+---
+- - [1]
+...
+s:drop()
+---
+...
+test_run:switch("default")
+---
+- true
+...
+test_run:cmd("stop server test")
+---
+- true
+...
+test_run:cmd("cleanup server test")
+---
+- true
+...
+test_run:cmd("delete server test")
+---
+- true
+...
diff --git a/test/box/net.box_tx_timeout.test.lua b/test/box/net.box_tx_timeout.test.lua
new file mode 100644
index 0000000000..1247575683
--- /dev/null
+++ b/test/box/net.box_tx_timeout.test.lua
@@ -0,0 +1,74 @@
+test_run = require("test_run").new()
+net_box = require("net.box")
+fiber = require("fiber")
+test_run:cmd("create server test with script='box/tx_man.lua'")
+test_run:cmd("start server test")
+
+test_run:cmd("push filter '(.builtin/.*.lua):[0-9]+' to '\\1:<line>'")
+
+test_run:switch("test")
+s = box.schema.space.create("test")
+_ = s:create_index("pk")
+txn_timeout = 0.5
+box.cfg({ txn_timeout = txn_timeout })
+box.schema.user.grant("guest", "super")
+test_run:switch("default")
+
+-- Checks for remote transactions
+server_addr = test_run:eval("test", "return box.cfg.listen")[1]
+txn_timeout = test_run:eval("test", "return box.cfg.txn_timeout")[1]
+conn = net_box.connect(server_addr)
+stream = conn:new_stream()
+space = stream.space.test
+
+-- Check invalid timeout for transaction using raw request
+conn:_request(net_box._method.begin, nil, nil, stream._stream_id, -1)
+
+-- Check arguments for 'stream:begin'
+stream:begin(1)
+stream:begin({timeout = 0})
+stream:begin({timeout = -1})
+stream:begin({timeout = "5"})
+
+-- Check that transaction aborted by timeout, which
+-- was set by the change of box.cfg.txn_timeout on server
+stream:begin()
+space:replace({1})
+space:select({}) -- [1]
+fiber.sleep(txn_timeout + 0.1)
+space:select({}) -- []
+space:replace({2})
+fiber.yield()
+space:select({}) -- []
+stream:commit() -- transaction was aborted by timeout
+
+-- Check that transaction aborted by timeout, which
+-- was set by appropriate option in stream:begin
+stream:begin({timeout = txn_timeout})
+space:replace({1})
+space:select({}) -- [1]
+fiber.sleep(txn_timeout + 0.1)
+space:select({}) -- []
+space:replace({2})
+fiber.yield()
+space:select({}) -- []
+stream:commit() -- transaction was aborted by timeout
+
+-- Check that transaction is not rollback until timeout expired.
+stream:begin({timeout = 1000})
+space:replace({1})
+space:select({}) -- [1]
+fiber.sleep(0.1)
+space:select({}) -- [1]
+stream:commit() --Success
+
+
+test_run:switch("test")
+box.schema.user.revoke("guest", "super")
+s:select() -- [1]
+s:drop()
+test_run:switch("default")
+
+test_run:cmd("stop server test")
+test_run:cmd("cleanup server test")
+test_run:cmd("delete server test")
-- 
GitLab