From d5e479ee221a726b497a141df3bcc6f32066bb11 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Thu, 18 Jan 2018 16:10:01 +0300
Subject: [PATCH] Make struct port abstract

So that it can be used not only for serializing a list of tuples, but
also for serializing a Lua stack that stores output of CALL/EVAL.

Needed for #946
---
 extra/exports          |   1 -
 src/box/box.cc         |  11 +++--
 src/box/box.h          |   5 +-
 src/box/call.c         |  28 ++++-------
 src/box/iproto.cc      |  26 ++++++----
 src/box/lua/misc.cc    |  13 +++--
 src/box/lua/schema.lua |  38 +++++++-------
 src/box/port.c         |  81 ++++++++++++++++++++++--------
 src/box/port.h         | 109 ++++++++++++++++++++++++++++++++---------
 9 files changed, 208 insertions(+), 104 deletions(-)

diff --git a/extra/exports b/extra/exports
index 1f1ca0fbf2..b4a89c23d9 100644
--- a/extra/exports
+++ b/extra/exports
@@ -20,7 +20,6 @@ ibuf_create
 ibuf_reinit
 ibuf_destroy
 ibuf_reserve_slow
-port_create
 port_destroy
 csv_create
 csv_destroy
diff --git a/src/box/box.cc b/src/box/box.cc
index a253d4cdcd..0d4a35a761 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -742,7 +742,7 @@ boxk(int type, uint32_t space_id, const char *format, ...)
 int
 box_return_tuple(box_function_ctx_t *ctx, box_tuple_t *tuple)
 {
-	return port_add_tuple(ctx->port, tuple);
+	return port_tuple_add(ctx->port, tuple);
 }
 
 /* schema_find_id()-like method using only public API */
@@ -812,9 +812,10 @@ box_process1(struct request *request, box_tuple_t **result)
 }
 
 int
-box_select(struct port *port, uint32_t space_id, uint32_t index_id,
+box_select(uint32_t space_id, uint32_t index_id,
 	   int iterator, uint32_t offset, uint32_t limit,
-	   const char *key, const char *key_end)
+	   const char *key, const char *key_end,
+	   struct port *port)
 {
 	(void)key_end;
 
@@ -860,6 +861,7 @@ box_select(struct port *port, uint32_t space_id, uint32_t index_id,
 	int rc = 0;
 	uint32_t found = 0;
 	struct tuple *tuple;
+	port_tuple_create(port);
 	while (found < limit) {
 		rc = iterator_next(it, &tuple);
 		if (rc != 0 || tuple == NULL)
@@ -868,7 +870,7 @@ box_select(struct port *port, uint32_t space_id, uint32_t index_id,
 			offset--;
 			continue;
 		}
-		rc = port_add_tuple(port, tuple);
+		rc = port_tuple_add(port, tuple);
 		if (rc != 0)
 			break;
 		found++;
@@ -876,6 +878,7 @@ box_select(struct port *port, uint32_t space_id, uint32_t index_id,
 	iterator_delete(it);
 
 	if (rc != 0) {
+		port_destroy(port);
 		txn_rollback_stmt();
 		return -1;
 	}
diff --git a/src/box/box.h b/src/box/box.h
index 5d758bc92d..dc5d88679d 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -164,9 +164,10 @@ typedef struct tuple box_tuple_t;
 
 /* box_select is private and used only by FFI */
 API_EXPORT int
-box_select(struct port *port, uint32_t space_id, uint32_t index_id,
+box_select(uint32_t space_id, uint32_t index_id,
 	   int iterator, uint32_t offset, uint32_t limit,
-	   const char *key, const char *key_end);
+	   const char *key, const char *key_end,
+	   struct port *port);
 
 /** \cond public */
 
diff --git a/src/box/call.c b/src/box/call.c
index b47df6a754..9000ec7463 100644
--- a/src/box/call.c
+++ b/src/box/call.c
@@ -104,7 +104,7 @@ box_c_call(struct func *func, struct call_request *request, struct obuf *out)
 
 	/* Create a call context */
 	struct port port;
-	port_create(&port);
+	port_tuple_create(&port);
 	box_function_ctx_t ctx = { &port };
 
 	/* Clear all previous errors */
@@ -127,28 +127,20 @@ box_c_call(struct func *func, struct call_request *request, struct obuf *out)
 	if (iproto_prepare_select(out, &svp) != 0)
 		goto error;
 
+	int count;
 	if (request->header->type == IPROTO_CALL_16) {
 		/* Tarantool < 1.7.1 compatibility */
-		if (port_dump(&port, out) != 0) {
-			obuf_rollback_to_svp(out, &svp);
-			goto error;
-		}
-		iproto_reply_select(out, &svp, request->header->sync,
-				    schema_version, port.size);
+		count = port_dump_16(&port, out);
 	} else {
 		assert(request->header->type == IPROTO_CALL);
-		char *size_buf = (char *)
-			obuf_alloc(out, mp_sizeof_array(port.size));
-		if (size_buf == NULL)
-			goto error;
-		mp_encode_array(size_buf, port.size);
-		if (port_dump(&port, out) != 0) {
-			obuf_rollback_to_svp(out, &svp);
-			goto error;
-		}
-		iproto_reply_select(out, &svp, request->header->sync,
-				    schema_version, 1);
+		count = port_dump(&port, out);
+	}
+	if (count < 0) {
+		obuf_rollback_to_svp(out, &svp);
+		goto error;
 	}
+	iproto_reply_select(out, &svp, request->header->sync,
+			    schema_version, count);
 
 	port_destroy(&port);
 	return 0;
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index f878c8113c..b20f36581d 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -1154,30 +1154,36 @@ tx_process_select(struct cmsg *m)
 	struct obuf *out = msg->connection->tx.p_obuf;
 	struct obuf_svp svp;
 	struct port port;
+	int count;
 	int rc;
 	struct request *req = &msg->dml;
 
 	tx_fiber_init(msg->connection->session, msg->header.sync);
 
-	port_create(&port);
-	auto port_guard = make_scoped_guard([&](){ port_destroy(&port); });
-
 	if (tx_check_schema(msg->header.schema_version))
 		goto error;
 
-	rc = box_select(&port,
-			req->space_id, req->index_id,
+	rc = box_select(req->space_id, req->index_id,
 			req->iterator, req->offset, req->limit,
-			req->key, req->key_end);
-	if (rc < 0 || iproto_prepare_select(out, &svp) != 0)
+			req->key, req->key_end, &port);
+	if (rc < 0)
+		goto error;
+	if (iproto_prepare_select(out, &svp) != 0) {
+		port_destroy(&port);
 		goto error;
-	if (port_dump(&port, out) != 0) {
+	}
+	/*
+	 * SELECT output format has not changed since Tarantool 1.6
+	 */
+	count = port_dump_16(&port, out);
+	port_destroy(&port);
+	if (count < 0) {
 		/* Discard the prepared select. */
 		obuf_rollback_to_svp(out, &svp);
 		goto error;
 	}
-	iproto_reply_select(out, &svp, msg->header.sync, ::schema_version,
-			    port.size);
+	iproto_reply_select(out, &svp, msg->header.sync,
+			    ::schema_version, count);
 	iproto_wpos_create(&msg->wpos, out);
 	return;
 error:
diff --git a/src/box/lua/misc.cc b/src/box/lua/misc.cc
index c049d95bd0..bc76065396 100644
--- a/src/box/lua/misc.cc
+++ b/src/box/lua/misc.cc
@@ -60,11 +60,12 @@ lbox_encode_tuple_on_gc(lua_State *L, int idx, size_t *p_len)
 /** {{{ Lua/C implementation of index:select(): used only by Vinyl **/
 
 static inline void
-lbox_port_to_table(lua_State *L, struct port *port)
+lbox_port_to_table(lua_State *L, struct port *port_base)
 {
+	struct port_tuple *port = port_tuple(port_base);
 	lua_createtable(L, port->size, 0);
-	struct port_entry *entry = port->first;
-	for (size_t i = 0 ; i < port->size; i++) {
+	struct port_tuple_entry *entry = port->first;
+	for (int i = 0 ; i < port->size; i++) {
 		luaT_pushtuple(L, entry->tuple);
 		lua_rawseti(L, -2, i + 1);
 		entry = entry->next;
@@ -90,10 +91,8 @@ lbox_select(lua_State *L)
 	const char *key = lbox_encode_tuple_on_gc(L, 6, &key_len);
 
 	struct port port;
-	port_create(&port);
-	if (box_select((struct port *) &port, space_id, index_id, iterator,
-			offset, limit, key, key + key_len) != 0) {
-		port_destroy(&port);
+	if (box_select(space_id, index_id, iterator, offset, limit,
+		       key, key + key_len, &port) != 0) {
 		return luaT_error(L);
 	}
 
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 2e5b4b4dd6..207e94440b 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -75,28 +75,28 @@ ffi.cdef[[
     int
     box_txn_rollback_to_savepoint(box_txn_savepoint_t *savepoint);
 
-    struct port_entry {
-        struct port_entry *next;
+    struct port_tuple_entry {
+        struct port_tuple_entry *next;
         struct tuple *tuple;
     };
 
-    struct port {
+    struct port_tuple {
+        const struct port_vtab *vtab;
         size_t size;
-        struct port_entry *first;
-        struct port_entry *last;
-        struct port_entry first_entry;
+        struct port_tuple_entry *first;
+        struct port_tuple_entry *last;
+        struct port_tuple_entry first_entry;
     };
 
-    void
-    port_create(struct port *port);
-
     void
     port_destroy(struct port *port);
 
     int
-    box_select(struct port *port, uint32_t space_id, uint32_t index_id,
+    box_select(uint32_t space_id, uint32_t index_id,
                int iterator, uint32_t offset, uint32_t limit,
-               const char *key, const char *key_end);
+               const char *key, const char *key_end,
+               struct port *port);
+
     void password_prepare(const char *password, int len,
                           char *out, int out_len);
 ]]
@@ -994,8 +994,8 @@ local iterator_gen_luac = function(param, state)
 end
 
 -- global struct port instance to use by select()/get()
-local port = ffi.new('struct port')
-local port_entry_t = ffi.typeof('struct port_entry')
+local port_tuple = ffi.new('struct port_tuple')
+local port_tuple_entry_t = ffi.typeof('struct port_tuple_entry')
 
 -- Helper function to check space:method() usage
 local function check_space_arg(space, method)
@@ -1224,16 +1224,16 @@ function box.schema.space.bless(space)
         local key, key_end = tuple_encode(key)
         local iterator, offset, limit = check_select_opts(opts, key + 1 >= key_end)
 
-        builtin.port_create(port)
-        if builtin.box_select(port, index.space_id,
-            index.id, iterator, offset, limit, key, key_end) ~=0 then
-            builtin.port_destroy(port);
+        local port = ffi.cast('struct port *', port_tuple)
+
+        if builtin.box_select(index.space_id, index.id,
+            iterator, offset, limit, key, key_end, port) ~= 0 then
             return box.error()
         end
 
         local ret = {}
-        local entry = port.first
-        for i=1,tonumber(port.size),1 do
+        local entry = port_tuple.first
+        for i=1,tonumber(port_tuple.size),1 do
             ret[i] = tuple_bless(entry.tuple)
             entry = entry.next
         end
diff --git a/src/box/port.c b/src/box/port.c
index 175fa60f4f..03f6be79d3 100644
--- a/src/box/port.c
+++ b/src/box/port.c
@@ -37,26 +37,26 @@
 #include <fiber.h>
 #include "errinj.h"
 
-static struct mempool port_entry_pool;
+static struct mempool port_tuple_entry_pool;
 
 int
-port_add_tuple(struct port *port, struct tuple *tuple)
+port_tuple_add(struct port *base, struct tuple *tuple)
 {
-	struct port_entry *e;
+	struct port_tuple *port = port_tuple(base);
+	struct port_tuple_entry *e;
 	if (port->size == 0) {
 		if (tuple_ref(tuple) != 0)
 			return -1;
 		e = &port->first_entry;
 		port->first = port->last = e;
 	} else {
-		e = (struct port_entry *)
-			mempool_alloc(&port_entry_pool);
+		e = mempool_alloc(&port_tuple_entry_pool);
 		if (e == NULL) {
 			diag_set(OutOfMemory, sizeof(*e), "mempool_alloc", "e");
 			return -1;
 		}
 		if (tuple_ref(tuple) != 0) {
-			mempool_free(&port_entry_pool, e);
+			mempool_free(&port_tuple_entry_pool, e);
 			return -1;
 		}
 		port->last->next = e;
@@ -69,54 +69,95 @@ port_add_tuple(struct port *port, struct tuple *tuple)
 }
 
 void
-port_create(struct port *port)
+port_tuple_create(struct port *base)
 {
+	struct port_tuple *port = (struct port_tuple *)base;
+	port->vtab = &port_tuple_vtab;
 	port->size = 0;
 	port->first = NULL;
 	port->last = NULL;
 }
 
-void
-port_destroy(struct port *port)
+static void
+port_tuple_destroy(struct port *base)
 {
-	struct port_entry *e = port->first;
+	struct port_tuple *port = port_tuple(base);
+	struct port_tuple_entry *e = port->first;
 	if (e == NULL)
 		return;
 	tuple_unref(e->tuple);
 	e = e->next;
 	while (e != NULL) {
-		struct port_entry *cur = e;
+		struct port_tuple_entry *cur = e;
 		e = e->next;
 		tuple_unref(cur->tuple);
-		mempool_free(&port_entry_pool, cur);
+		mempool_free(&port_tuple_entry_pool, cur);
 	}
 }
 
-int
-port_dump(struct port *port, struct obuf *out)
+static int
+port_tuple_dump_16(struct port *base, struct obuf *out)
 {
-	for (struct port_entry *pe = port->first; pe != NULL; pe = pe->next) {
+	struct port_tuple *port = port_tuple(base);
+	struct port_tuple_entry *pe;
+	for (pe = port->first; pe != NULL; pe = pe->next) {
 		if (tuple_to_obuf(pe->tuple, out) != 0)
 			return -1;
-
 		ERROR_INJECT(ERRINJ_PORT_DUMP, {
 			diag_set(OutOfMemory, tuple_size(pe->tuple), "obuf_dup",
 				 "data");
 			return -1;
 		});
 	}
-	return 0;
+	return port->size;
+}
+
+static int
+port_tuple_dump(struct port *base, struct obuf *out)
+{
+	struct port_tuple *port = port_tuple(base);
+	char *size_buf = obuf_alloc(out, mp_sizeof_array(port->size));
+	if (size_buf == NULL)
+		return -1;
+	mp_encode_array(size_buf, port->size);
+	if (port_tuple_dump_16(base, out) < 0)
+		return -1;
+	return 1;
+}
+
+void
+port_destroy(struct port *port)
+{
+	return port->vtab->destroy(port);
+}
+
+int
+port_dump(struct port *port, struct obuf *out)
+{
+	return port->vtab->dump(port, out);
+}
+
+int
+port_dump_16(struct port *port, struct obuf *out)
+{
+	return port->vtab->dump_16(port, out);
 }
 
 void
 port_init(void)
 {
-	mempool_create(&port_entry_pool, &cord()->slabc,
-		       sizeof(struct port_entry));
+	mempool_create(&port_tuple_entry_pool, &cord()->slabc,
+		       sizeof(struct port_tuple_entry));
 }
 
 void
 port_free(void)
 {
-	mempool_destroy(&port_entry_pool);
+	mempool_destroy(&port_tuple_entry_pool);
 }
+
+const struct port_vtab port_tuple_vtab = {
+	.dump = port_tuple_dump,
+	.dump_16 = port_tuple_dump_16,
+	.destroy = port_tuple_destroy,
+};
diff --git a/src/box/port.h b/src/box/port.h
index ea185b5d64..7cf3339b53 100644
--- a/src/box/port.h
+++ b/src/box/port.h
@@ -47,43 +47,116 @@ struct obuf;
  * for every server request. State of the instance is represented
  * by the tuples added to it. E.g.:
  *
- * struct port_iproto *port = port_iproto_new(...)
+ * struct port port;
+ * port_tuple_create(&port);
  * for (tuple in tuples)
- *	port_add_tuple(tuple);
+ *	port_tuple_add(tuple);
+ *
+ * port_dump(&port, obuf);
+ * port_destroy(&port);
  *
  * Beginning with Tarantool 1.5, tuple can have different internal
- * structure and port_add_tuple() requires a double
+ * structure and port_tuple_add() requires a double
  * dispatch: first, by the type of the port the tuple is being
  * added to, second, by the type of the tuple format, since the
  * format defines the internal structure of the tuple.
  */
 
-struct port_entry {
-	struct port_entry *next;
-	struct tuple *tuple;
+struct port;
+
+struct port_vtab {
+	/**
+	 * Dump the content of a port to an output buffer.
+	 * On success returns number of entries dumped.
+	 * On failure sets diag and returns -1.
+	 */
+	int (*dump)(struct port *port, struct obuf *out);
+	/**
+	 * Same as dump(), but use the legacy Tarantool 1.6
+	 * format.
+	 */
+	int (*dump_16)(struct port *port, struct obuf *out);
+	/**
+	 * Destroy a port and release associated resources.
+	 */
+	void (*destroy)(struct port *port);
 };
 
+/**
+ * Abstract port instance. It is supposed to be converted to
+ * a concrete port realization, e.g. port_tuple.
+ */
 struct port {
-	size_t size;
-	struct port_entry *first;
-	struct port_entry *last;
-	struct port_entry first_entry;
+	/** Virtual method table. */
+	const struct port_vtab *vtab;
+	/**
+	 * Implementation dependent content. Needed to declare
+	 * an abstract port instance on stack.
+	 */
+	char pad[48];
+};
+
+struct port_tuple_entry {
+	struct port_tuple_entry *next;
+	struct tuple *tuple;
+};
+
+/**
+ * Port implementation used for storing tuples.
+ */
+struct port_tuple {
+	const struct port_vtab *vtab;
+	int size;
+	struct port_tuple_entry *first;
+	struct port_tuple_entry *last;
+	struct port_tuple_entry first_entry;
 };
+static_assert(sizeof(struct port_tuple) <= sizeof(struct port),
+	      "sizeof(struct port_tuple) must be <= sizeof(struct port)");
+
+extern const struct port_vtab port_tuple_vtab;
+
+/**
+ * Convert an abstract port instance to a tuple port.
+ */
+static inline struct port_tuple *
+port_tuple(struct port *port)
+{
+	assert(port->vtab == &port_tuple_vtab);
+	return (struct port_tuple *)port;
+}
 
+/**
+ * Create a port for storing tuples.
+ */
 void
-port_create(struct port *port);
+port_tuple_create(struct port *port);
+
+/**
+ * Append a tuple to a port.
+ */
+int
+port_tuple_add(struct port *port, struct tuple *tuple);
 
 /**
- * Unref all tuples and free allocated memory
+ * Destroy an abstract port instance.
  */
 void
 port_destroy(struct port *port);
 
+/**
+ * Dump an abstract port instance to an output buffer.
+ * Return number of entries dumped on success, -1 on error.
+ */
 int
 port_dump(struct port *port, struct obuf *out);
 
+/**
+ * Same as port_dump(), but use the legacy Tarantool 1.6
+ * format.
+ */
 int
-port_add_tuple(struct port *port, struct tuple *tuple);
+port_dump_16(struct port *port, struct obuf *out);
 
 void
 port_init(void);
@@ -93,16 +166,6 @@ port_free(void);
 
 #if defined(__cplusplus)
 } /* extern "C" */
-
-#include "diag.h"
-
-static inline void
-port_add_tuple_xc(struct port *port, struct tuple *tuple)
-{
-	if (port_add_tuple(port, tuple) != 0)
-		diag_raise();
-}
-
 #endif /* defined __cplusplus */
 
 #endif /* INCLUDES_TARANTOOL_BOX_PORT_H */
-- 
GitLab