From 866777c61f992c9539c6736148346edfb87980d0 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Sun, 3 May 2015 16:50:44 +0300
Subject: [PATCH] ipc: allocate channels fully in Lua memory

This saves one malloc() per channel and tells the GC more
about how much memory is taken by a channel.
---
 src/ipc.cc     | 51 +++++++++++++++++++++++++++++++++++---------------
 src/ipc.h      | 20 ++++++++++++++++++++
 src/lua/ipc.cc | 10 +++++-----
 3 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/src/ipc.cc b/src/ipc.cc
index 637f64435f..3b282666fe 100644
--- a/src/ipc.cc
+++ b/src/ipc.cc
@@ -30,23 +30,15 @@
 #include "fiber.h"
 #include <stdlib.h>
 
-static void
-ipc_channel_create(struct ipc_channel *ch);
-
-static void
-ipc_channel_destroy(struct ipc_channel *ch);
-
 struct ipc_channel *
 ipc_channel_new(unsigned size)
 {
-	if (!size)
+	if (size == 0)
 		size = 1;
 	struct ipc_channel *res = (struct ipc_channel *)
-		malloc(sizeof(struct ipc_channel) + sizeof(void *) * size);
-	if (res == NULL)
-		return NULL;
-	res->size = size;
-	ipc_channel_create(res);
+		malloc(ipc_channel_memsize(size));
+	if (res != NULL)
+		ipc_channel_create(res, size);
 	return res;
 }
 
@@ -57,9 +49,10 @@ ipc_channel_delete(struct ipc_channel *ch)
 	free(ch);
 }
 
-static void
-ipc_channel_create(struct ipc_channel *ch)
+void
+ipc_channel_create(struct ipc_channel *ch, unsigned size)
 {
+	ch->size = size;
 	ch->beg = ch->count = 0;
 	ch->readonly = ch->closed = false;
 	ch->close = NULL;
@@ -68,17 +61,45 @@ ipc_channel_create(struct ipc_channel *ch)
 	rlist_create(&ch->writers);
 }
 
-static void
+void
 ipc_channel_destroy(struct ipc_channel *ch)
 {
+	/**
+	 * XXX: this code is buggy, since the deleted fibers are
+	 * not woken up, but luckily it never gets called.
+	 * As long as channels are only used in Lua, the situation
+	 * that ipc_channel_destroy() is called on a channel which
+	 * has waiters is impossible:
+	 *
+	 * if there is a Lua fiber waiting on a channel, neither
+	 * the channel nor the fiber will ever get collected. The
+	 * fiber Lua stack will keep a reference to the channel
+	 * userdata, and the stack itself is referenced while the
+	 * fiber is waiting.  So, as long as channels are used
+	 * from Lua, only a channel which has no waiters can get
+	 * collected.
+	 *
+	 * The other part of the problem, however, is that such
+	 * orphaned waiters create a garbage collection loop and
+	 * leak memory.
+	 * The only solution, it seems, is to implement some sort
+	 * of shutdown() on a channel, which wakes up all waiters,
+	 * and use it explicitly in Lua.  Waking up waiters in
+	 * __gc/destroy is not a solution, since __gc will simply
+	 * never get called.
+         */
 	while (!rlist_empty(&ch->writers)) {
 		struct fiber *f =
 			rlist_first_entry(&ch->writers, struct fiber, state);
+		say_error("closing a channel which has a write waiter %d %s",
+			  f->fid, fiber_name(f));
 		rlist_del_entry(f, state);
 	}
 	while (!rlist_empty(&ch->readers)) {
 		struct fiber *f =
 			rlist_first_entry(&ch->readers, struct fiber, state);
+		say_error("closing a channel which has a read waiter %d %s",
+			  f->fid, fiber_name(f));
 		rlist_del_entry(f, state);
 	}
 }
diff --git a/src/ipc.h b/src/ipc.h
index e35e56a2c9..7d5a8fa7b6 100644
--- a/src/ipc.h
+++ b/src/ipc.h
@@ -49,6 +49,26 @@ struct ipc_channel {
 	void *item[0];
 };
 
+static inline size_t
+ipc_channel_memsize(unsigned size)
+{
+	return sizeof(struct ipc_channel) + sizeof(void *) * size;
+}
+
+/**
+ * Initialize a channel (the memory should have
+ * been correctly allocated for the channel.
+ */
+void
+ipc_channel_create(struct ipc_channel *ch, unsigned size);
+
+
+/**
+ * Destroy a channel. Does not free allocated memory.
+ */
+void
+ipc_channel_destroy(struct ipc_channel *ch);
+
 /**
  * @brief Allocate and construct new IPC channel
  * @param size of channel
diff --git a/src/lua/ipc.cc b/src/lua/ipc.cc
index 89e7851c57..526d225f40 100644
--- a/src/lua/ipc.cc
+++ b/src/lua/ipc.cc
@@ -58,11 +58,12 @@ lbox_ipc_channel(struct lua_State *L)
 		if (size < 0)
 			luaL_error(L, "fiber.channel(size): negative size");
 	}
-	struct ipc_channel *ch = ipc_channel_new(size);
+	struct ipc_channel *ch = (struct ipc_channel *)
+		lua_newuserdata(L, ipc_channel_memsize(size));
 	if (!ch)
 		luaL_error(L, "fiber.channel: Not enough memory");
+	ipc_channel_create(ch, size);
 
-	void **ptr = (void **) lua_newuserdata(L, sizeof(void *));
 	luaL_getmetatable(L, channel_lib);
 
 	lua_pushstring(L, "rid");	/* first object id */
@@ -70,14 +71,13 @@ lbox_ipc_channel(struct lua_State *L)
 	lua_settable(L, -3);
 
 	lua_setmetatable(L, -2);
-	*ptr = ch;
 	return 1;
 }
 
 static inline struct ipc_channel *
 lbox_check_channel(struct lua_State *L, int narg)
 {
-	return *(struct ipc_channel **) luaL_checkudata(L, narg, channel_lib);
+	return (struct ipc_channel *) luaL_checkudata(L, narg, channel_lib);
 }
 
 static int
@@ -86,7 +86,7 @@ lbox_ipc_channel_gc(struct lua_State *L)
 	if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
 		return 0;
 	struct ipc_channel *ch = lbox_check_channel(L, -1);
-	ipc_channel_delete(ch);
+	ipc_channel_destroy(ch);
 	return 0;
 }
 
-- 
GitLab