From d461caf3d00c091d7bb5796ce9b3f312b0411193 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Thu, 18 Jan 2018 17:00:21 +0300
Subject: [PATCH] call: separate function invocation from result encoding

The iproto subsystem switches between two output buffers once in a while
in order to reclaim memory so passing a pointer to the output buffer
directly to box_process_call() or box_process_eval() is incorrect in
case the called function yields. To fix that, let's make these functions
return the CALL/EVAL result in a port object, which then can then be
encoded in msgpack with port_dump().

Needed for #946
---
 src/box/call.c     |  47 ++++-------------
 src/box/call.h     |   5 +-
 src/box/iproto.cc  |  44 ++++++++++++++--
 src/box/lua/call.c | 122 +++++++++++++++++++++++++++++----------------
 src/box/lua/call.h |   9 ++--
 5 files changed, 136 insertions(+), 91 deletions(-)

diff --git a/src/box/call.c b/src/box/call.c
index 9000ec7463..d3d6877a2a 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -98,14 +98,13 @@ access_check_func(const char *name, uint32_t name_len, struct func **funcp)
 }
 
 static int
-box_c_call(struct func *func, struct call_request *request, struct obuf *out)
+box_c_call(struct func *func, struct call_request *request, struct port *port)
 {
 	assert(func != NULL && func->def->language == FUNC_LANGUAGE_C);
 
 	/* Create a call context */
-	struct port port;
-	port_tuple_create(&port);
-	box_function_ctx_t ctx = { &port };
+	port_tuple_create(port);
+	box_function_ctx_t ctx = { port };
 
 	/* Clear all previous errors */
 	diag_clear(&fiber()->diag);
@@ -119,36 +118,10 @@ box_c_call(struct func *func, struct call_request *request, struct obuf *out)
 			/* Stored procedure forget to set diag  */
 			diag_set(ClientError, ER_PROC_C, "unknown error");
 		}
-		goto error;
-	}
-
-	/* Push results to obuf */
-	struct obuf_svp svp;
-	if (iproto_prepare_select(out, &svp) != 0)
-		goto error;
-
-	int count;
-	if (request->header->type == IPROTO_CALL_16) {
-		/* Tarantool < 1.7.1 compatibility */
-		count = port_dump_16(&port, out);
-	} else {
-		assert(request->header->type == IPROTO_CALL);
-		count = port_dump(&port, out);
-	}
-	if (count < 0) {
-		obuf_rollback_to_svp(out, &svp);
-		goto error;
+		port_destroy(port);
+		return -1;
 	}
-	iproto_reply_select(out, &svp, request->header->sync,
-			    schema_version, count);
-
-	port_destroy(&port);
 	return 0;
-
-error:
-	txn_rollback();
-	port_destroy(&port);
-	return -1;
 }
 
 int
@@ -170,7 +143,7 @@ box_func_reload(const char *name)
 }
 
 int
-box_process_call(struct call_request *request, struct obuf *out)
+box_process_call(struct call_request *request, struct port *port)
 {
 	rmean_collect(rmean_box, IPROTO_CALL, 1);
 	/**
@@ -216,9 +189,9 @@ box_process_call(struct call_request *request, struct obuf *out)
 
 	int rc;
 	if (func && func->def->language == FUNC_LANGUAGE_C) {
-		rc = box_c_call(func, request, out);
+		rc = box_c_call(func, request, port);
 	} else {
-		rc = box_lua_call(request, out);
+		rc = box_lua_call(request, port);
 	}
 	/* Restore the original user */
 	if (orig_credentials)
@@ -240,13 +213,13 @@ box_process_call(struct call_request *request, struct obuf *out)
 }
 
 int
-box_process_eval(struct call_request *request, struct obuf *out)
+box_process_eval(struct call_request *request, struct port *port)
 {
 	rmean_collect(rmean_box, IPROTO_EVAL, 1);
 	/* Check permissions */
 	if (access_check_universe(PRIV_X) != 0)
 		return -1;
-	if (box_lua_eval(request, out) != 0) {
+	if (box_lua_eval(request, port) != 0) {
 		txn_rollback();
 		return -1;
 	}
diff --git a/src/box/call.h b/src/box/call.h
index b334626ae0..eabba69763 100644
--- a/src/box/call.h
+++ b/src/box/call.h
@@ -35,7 +35,6 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
-struct obuf;
 struct port;
 struct call_request;
 
@@ -47,10 +46,10 @@ int
 box_func_reload(const char *name);
 
 int
-box_process_call(struct call_request *request, struct obuf *out);
+box_process_call(struct call_request *request, struct port *port);
 
 int
-box_process_eval(struct call_request *request, struct obuf *out);
+box_process_eval(struct call_request *request, struct port *port);
 
 #if defined(__cplusplus)
 } /* extern "C" */
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index b20f36581d..47506ebaf7 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1194,7 +1194,6 @@ static void
 tx_process_call(struct cmsg *m)
 {
 	struct iproto_msg *msg = tx_accept_msg(m);
-	struct obuf *out = msg->connection->tx.p_obuf;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
@@ -1202,13 +1201,15 @@ tx_process_call(struct cmsg *m)
 		goto error;
 
 	int rc;
+	struct port port;
+
 	switch (msg->header.type) {
 	case IPROTO_CALL:
 	case IPROTO_CALL_16:
-		rc = box_process_call(&msg->call, out);
+		rc = box_process_call(&msg->call, &port);
 		break;
 	case IPROTO_EVAL:
-		rc = box_process_eval(&msg->call, out);
+		rc = box_process_eval(&msg->call, &port);
 		break;
 	default:
 		unreachable();
@@ -1217,6 +1218,43 @@ tx_process_call(struct cmsg *m)
 	if (rc != 0)
 		goto error;
 
+	/*
+	 * Add all elements returned by the function to iproto.
+	 *
+	 * To allow clients to understand a complex return from
+	 * a procedure, we are compatible with SELECT protocol,
+	 * and return the number of return values first, and
+	 * then each return value as a tuple.
+	 *
+	 * (!) Please note that a save point for output buffer
+	 * must be taken only after finishing executing of Lua
+	 * function because Lua can yield and leave the
+	 * buffer in inconsistent state (a parallel request
+	 * from the same connection will break the protocol).
+	 */
+
+	int count;
+	struct obuf *out;
+	struct obuf_svp svp;
+
+	out = msg->connection->tx.p_obuf;
+	if (iproto_prepare_select(out, &svp) != 0) {
+		port_destroy(&port);
+		goto error;
+	}
+
+	if (msg->header.type == IPROTO_CALL_16)
+		count = port_dump_16(&port, out);
+	else
+		count = port_dump(&port, out);
+	port_destroy(&port);
+	if (count < 0) {
+		obuf_rollback_to_svp(out, &svp);
+		goto error;
+	}
+
+	iproto_reply_select(out, &svp, msg->header.sync,
+			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
 error:
diff --git a/src/box/lua/call.c b/src/box/lua/call.c
index fb5bb26be5..be13812aaa 100644
--- a/src/box/lua/call.c
+++ b/src/box/lua/call.c
@@ -36,12 +36,11 @@
 #include "lua/utils.h"
 #include "lua/msgpack.h"
 
-#include "box/txn.h"
 #include "box/xrow.h"
-#include "box/iproto_constants.h"
+#include "box/port.h"
 #include "box/lua/tuple.h"
-#include "box/schema.h"
 #include "small/obuf.h"
+#include "trivia/util.h"
 
 /**
  * A helper to find a Lua function by name and put it
@@ -345,6 +344,12 @@ encode_lua_call(lua_State *L)
 		lua_topointer(L, -1);
 	lua_pop(L, 1);
 
+	/*
+	 * Add all elements from Lua stack to the buffer.
+	 *
+	 * TODO: forbid explicit yield from __serialize or __index here
+	 */
+
 	struct mpstream stream;
 	mpstream_init(&stream, ctx->out, obuf_reserve_cb, obuf_alloc_cb,
 		      luamp_error, L);
@@ -359,68 +364,101 @@ encode_lua_call(lua_State *L)
 	return 0;
 }
 
+/**
+ * Port for storing the result of a Lua CALL/EVAL.
+ */
+struct port_lua {
+	const struct port_vtab *vtab;
+	/** Lua state that stores the result. */
+	struct lua_State *L;
+	/** Reference to L in tarantool_L. */
+	int ref;
+};
+static_assert(sizeof(struct port_lua) <= sizeof(struct port),
+	      "sizeof(struct port_lua) must be <= sizeof(struct port)");
+
+static const struct port_vtab port_lua_vtab;
+
 static inline int
-box_process_lua(struct call_request *request, struct obuf *out, lua_CFunction handler)
+port_lua_do_dump(struct port *base, bool call_16, struct obuf *out)
+{
+	struct port_lua *port = (struct port_lua *)base;
+	assert(port->vtab == &port_lua_vtab);
+
+	struct lua_State *L = port->L;
+	struct encode_lua_call_ctx ctx = { out, call_16, 0 };
+	lua_pushlightuserdata(L, &ctx);
+	if (luaT_call(L, lua_gettop(L) - 1, 0) != 0)
+		return -1;
+
+	return ctx.count;
+}
+
+static int
+port_lua_dump(struct port *base, struct obuf *out)
+{
+	return port_lua_do_dump(base, false, out);
+}
+
+static int
+port_lua_dump_16(struct port *base, struct obuf *out)
+{
+	return port_lua_do_dump(base, true, out);
+}
+
+static void
+port_lua_destroy(struct port *base)
+{
+	struct port_lua *port = (struct port_lua *)base;
+	assert(port->vtab == &port_lua_vtab);
+
+	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, port->ref);
+}
+
+static const struct port_vtab port_lua_vtab = {
+	.dump = port_lua_dump,
+	.dump_16 = port_lua_dump_16,
+	.destroy = port_lua_destroy,
+};
+
+static inline int
+box_process_lua(struct call_request *request, struct port *base,
+		lua_CFunction handler)
 {
 	lua_State *L = lua_newthread(tarantool_L);
 	int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
 
 	/*
 	 * Push the encoder function first - values returned by
-	 * the handler will be passed to it as arguments.
+	 * the handler will be passed to it as arguments, see
+	 * port_lua_dump().
 	 */
 	lua_pushcfunction(L, encode_lua_call);
 
 	lua_pushcfunction(L, handler);
 	lua_pushlightuserdata(L, request);
-	if (luaT_call(L, 1, LUA_MULTRET) != 0)
-		goto error;
-
-	/*
-	 * Add all elements from Lua stack to iproto.
-	 *
-	 * (!) Please note that a save point for output buffer
-	 * must be taken only after finishing executing of Lua
-	 * function because Lua can yield and leave the
-	 * buffer in inconsistent state (a parallel request
-	 * from the same connection will break the protocol).
-	 *
-	 * TODO: forbid explicit yield from __serialize or __index here
-	 */
-	struct obuf_svp svp;
-	if (iproto_prepare_select(out, &svp) != 0)
-		goto error;
-
-	struct encode_lua_call_ctx ctx = { out, false, 0 };
-	if (request->header->type == IPROTO_CALL_16)
-		ctx.call_16 = true;
-
-	lua_pushlightuserdata(L, &ctx);
-	if (luaT_call(L, lua_gettop(L) - 1, 0) != 0) {
-		obuf_rollback_to_svp(out, &svp);
-		goto error;
+	if (luaT_call(L, 1, LUA_MULTRET) != 0) {
+		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+		return -1;
 	}
 
-	iproto_reply_select(out, &svp, request->header->sync,
-			    schema_version, ctx.count);
-
-	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
+	struct port_lua *port = (struct port_lua *)base;
+	port->vtab = &port_lua_vtab;
+	port->L = L;
+	port->ref = coro_ref;
 	return 0;
-error:
-	luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
-	return -1;
 }
 
 int
-box_lua_call(struct call_request *request, struct obuf *out)
+box_lua_call(struct call_request *request, struct port *port)
 {
-	return box_process_lua(request, out, execute_lua_call);
+	return box_process_lua(request, port, execute_lua_call);
 }
 
 int
-box_lua_eval(struct call_request *request, struct obuf *out)
+box_lua_eval(struct call_request *request, struct port *port)
 {
-	return box_process_lua(request, out, execute_lua_eval);
+	return box_process_lua(request, port, execute_lua_eval);
 }
 
 static int
diff --git a/src/box/lua/call.h b/src/box/lua/call.h
index 655ff893c6..0542123da6 100644
--- a/src/box/lua/call.h
+++ b/src/box/lua/call.h
@@ -31,9 +31,6 @@
  * SUCH DAMAGE.
  */
 
-#include <stdint.h>
-#include "trivia/util.h"
-
 #if defined(__cplusplus)
 extern "C" {
 #endif /* defined(__cplusplus) */
@@ -43,18 +40,18 @@ struct lua_State;
 void
 box_lua_call_init(struct lua_State *L);
 
+struct port;
 struct call_request;
-struct obuf;
 
 /**
  * Invoke a Lua stored procedure from the binary protocol
  * (implementation of 'CALL' command code).
  */
 int
-box_lua_call(struct call_request *request, struct obuf *out);
+box_lua_call(struct call_request *request, struct port *port);
 
 int
-box_lua_eval(struct call_request *request, struct obuf *out);
+box_lua_eval(struct call_request *request, struct port *port);
 
 #if defined(__cplusplus)
 } /* extern "C" */
-- 
GitLab