From d9e440f6e083bdbc815074b5c46e3a2629df3589 Mon Sep 17 00:00:00 2001
From: "Dmitry E. Oboukhov" <unera@debian.org>
Date: Thu, 11 Oct 2012 18:08:54 +0400
Subject: [PATCH] channel:broadcast

---
 include/ifc.h       |   9 ++
 src/ifc.m           | 242 ++++++++++++++++++++------------------------
 src/ifc_lua.m       |  62 +++++++++---
 test/box/ifc.result | 116 ++++++++++++++++++++-
 test/box/ifc.test   |  30 +++++-
 5 files changed, 310 insertions(+), 149 deletions(-)

diff --git a/include/ifc.h b/include/ifc.h
index fe8795603d..e77784a25a 100644
--- a/include/ifc.h
+++ b/include/ifc.h
@@ -245,6 +245,15 @@ void fiber_channel_put(struct fiber_channel *ch, void *data);
 */
 void *fiber_channel_get(struct fiber_channel *ch);
 
+
+/**
+@brief wake up all fibers that sleep by fiber_channel_get and send message to them
+@param channel
+@param data
+@return count of fibers received the message
+*/
+int fiber_channel_broadcast(struct fiber_channel *ch, void *data);
+
 /**
 @brief check if channel is empty
 @param channel
diff --git a/src/ifc.m b/src/ifc.m
index 22658e1cd5..3a20bc5acb 100644
--- a/src/ifc.m
+++ b/src/ifc.m
@@ -31,19 +31,24 @@
 #include "fiber.h"
 #include <stdlib.h>
 
+#define STAILQ_REMOVE_SAFE(elm, head, type, field)			\
+	{								\
+		struct type *it;					\
+		STAILQ_FOREACH(it, (head), field) {			\
+			if (it != elm)					\
+				continue;				\
+			STAILQ_REMOVE((head), elm, type, field);	\
+			break;						\
+		}							\
+	}
+
+
 
 struct fiber_semaphore {
 	int count;
-	STAILQ_HEAD(, fiber) fibers;
-	ev_async async;
+	STAILQ_HEAD(, fiber) fibers, wakeup;
 };
 
-static void
-ifc_timeout_signal(ev_watcher *watcher, int event __attribute__((unused)))
-{
-	struct fiber *f = watcher->data;
-	fiber_call(f);
-}
 
 struct fiber_semaphore *
 fiber_semaphore_alloc(void)
@@ -51,20 +56,6 @@ fiber_semaphore_alloc(void)
 	return malloc(sizeof(struct fiber_semaphore));
 }
 
-static void
-fiber_semaphore_signal(ev_watcher *watcher, int event __attribute__((unused)))
-{
-	struct fiber_semaphore *s = watcher->data;
-	assert(!STAILQ_EMPTY(&s->fibers));
-
-	struct fiber *f = STAILQ_FIRST(&s->fibers);
-	STAILQ_REMOVE_HEAD(&s->fibers, ifc);
-
-	if (STAILQ_EMPTY(&s->fibers))
-		ev_async_stop(&s->async);
-
-	fiber_call(f);
-}
 
 int
 fiber_semaphore_counter(struct fiber_semaphore *s)
@@ -77,57 +68,56 @@ fiber_semaphore_init(struct fiber_semaphore *s, int cnt)
 {
 	s->count = cnt;
 	STAILQ_INIT(&s->fibers);
-	ev_async_init(&s->async, (void *)fiber_semaphore_signal);
-	s->async.data = s;
+	STAILQ_INIT(&s->wakeup);
 }
 
 
 int
 fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout)
 {
-	if (--s->count >= 0)
+	int count = --s->count;
+
+	if (count >= 0)		/* semaphore is still unlocked */
 		return 0;
 
 	if (timeout < 0)
 		timeout = 0;
 
-	if (STAILQ_EMPTY(&s->fibers))
-		ev_async_start(&s->async);
 	STAILQ_INSERT_TAIL(&s->fibers, fiber, ifc);
 
-	ev_timer timer;
+	bool cancellable = fiber_setcancellable(true);
+
 	if (timeout) {
-		ev_timer_init(&timer, (void *)ifc_timeout_signal, timeout, 0);
-		timer.data = fiber;
-		ev_timer_start(&timer);
+		ev_timer_set(&fiber->timer, timeout, 0);
+		ev_timer_start(&fiber->timer);
+		fiber_yield();
+		ev_timer_stop(&fiber->timer);
+	} else {
+		fiber_yield();
 	}
 
-	bool cancellable = fiber_setcancellable(true);
-	fiber_yield();
-
-	if (timeout)
-		ev_timer_stop(&timer);
 
 	if (fiber_is_cancelled()) {
 		s->count++;
-		struct fiber *f;
-		STAILQ_FOREACH(f, &s->fibers, ifc) {
-			if (f == fiber) {
-				STAILQ_REMOVE(&s->fibers, f, fiber, ifc);
-				if (STAILQ_EMPTY(&s->fibers))
-					ev_async_stop(&s->async);
-				break;
-			}
-		}
+		STAILQ_REMOVE_SAFE(fiber, &s->fibers, fiber, ifc);
+		STAILQ_REMOVE_SAFE(fiber, &s->wakeup, fiber, ifc);
 		fiber_testcancel();
 	}
+
 	fiber_setcancellable(cancellable);
 
-	if (s->count < 0) {
+
+	struct fiber *f;
+	STAILQ_FOREACH(f, &s->fibers, ifc) {
+		if (f != fiber)
+			continue;
 		s->count++;
+		STAILQ_REMOVE(&s->fibers, f, fiber, ifc);
 		return ETIMEDOUT;
 	}
 
+	STAILQ_REMOVE_SAFE(fiber, &s->wakeup, fiber, ifc);
+
 	return 0;
 }
 
@@ -141,8 +131,12 @@ void
 fiber_semaphore_up(struct fiber_semaphore *s)
 {
 	++s->count;
-	if (!STAILQ_EMPTY(&s->fibers))	/* wake up one fiber */
-		ev_async_send(&s->async);
+	if (!STAILQ_EMPTY(&s->fibers)) {	/* wake up one fiber */
+		struct fiber *f = STAILQ_FIRST(&s->fibers);
+		STAILQ_REMOVE_HEAD(&s->fibers, ifc);
+		STAILQ_INSERT_TAIL(&s->wakeup, f, ifc);
+		fiber_wakeup(f);
+	}
 }
 
 int
@@ -209,7 +203,8 @@ struct fiber_channel {
 	unsigned beg;
 	unsigned count;
 
-	ev_async rasync, wasync;
+	void *bcast_msg;
+	struct fiber *bcast;
 
 	void *item[0];
 } __attribute__((packed));
@@ -226,36 +221,6 @@ fiber_channel_isfull(struct fiber_channel *ch)
 	return ch->count >= ch->size;
 }
 
-static void
-fiber_channel_rsignal(ev_watcher *watcher, int event __attribute__((unused)))
-{
-	struct fiber_channel *ch = watcher->data;
-	assert(!STAILQ_EMPTY(&ch->readers));
-
-	struct fiber *f = STAILQ_FIRST(&ch->readers);
-	STAILQ_REMOVE_HEAD(&ch->readers, ifc);
-
-	if (STAILQ_EMPTY(&ch->readers))
-		ev_async_stop(&ch->rasync);
-
-	fiber_call(f);
-}
-
-static void
-fiber_channel_wsignal(ev_watcher *watcher, int event __attribute__((unused)))
-{
-	struct fiber_channel *ch = watcher->data;
-	assert(!STAILQ_EMPTY(&ch->writers));
-
-	struct fiber *f = STAILQ_FIRST(&ch->writers);
-	STAILQ_REMOVE_HEAD(&ch->writers, ifc);
-
-	if (STAILQ_EMPTY(&ch->writers))
-		ev_async_stop(&ch->wasync);
-
-	fiber_call(f);
-}
-
 
 struct fiber_channel *
 fiber_channel_alloc(unsigned size)
@@ -274,13 +239,10 @@ fiber_channel_init(struct fiber_channel *ch)
 {
 
 	ch->beg = ch->count = 0;
+	ch->bcast = NULL;
 
 	STAILQ_INIT(&ch->readers);
 	STAILQ_INIT(&ch->writers);
-	ev_async_init(&ch->rasync, (void *)fiber_channel_rsignal);
-	ch->rasync.data = ch;
-	ev_async_init(&ch->wasync, (void *)fiber_channel_wsignal);
-	ch->wasync.data = ch;
 }
 
 void *
@@ -290,40 +252,30 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
 		timeout = 0;
 	/* channel is empty */
 	if (!ch->count) {
-		if (STAILQ_EMPTY(&ch->readers))
-			ev_async_start(&ch->rasync);
 		STAILQ_INSERT_TAIL(&ch->readers, fiber, ifc);
 		bool cancellable = fiber_setcancellable(true);
 
-		ev_timer timer;
 		if (timeout) {
-			ev_timer_init(&timer,
-				(void *)ifc_timeout_signal, timeout, 0);
-			ev_timer_start(&timer);
-			timer.data = fiber;
+			ev_timer_set(&fiber->timer, timeout, 0);
+			ev_timer_start(&fiber->timer);
+			fiber_yield();
+			ev_timer_stop(&fiber->timer);
+		} else {
+			fiber_yield();
 		}
 
-		fiber_yield();
-
-		if (timeout)
-			ev_timer_stop(&timer);
-
 		if (fiber_is_cancelled() || !ch->count) {
-			struct fiber *f;
-			STAILQ_FOREACH(f, &ch->readers, ifc) {
-				if (f != fiber)
-					continue;
-
-				STAILQ_REMOVE(&ch->readers, f, fiber, ifc);
-
-				if (STAILQ_EMPTY(&ch->readers))
-					ev_async_stop(&ch->rasync);
-
-			}
+			STAILQ_REMOVE_SAFE(fiber, &ch->readers, fiber, ifc);
+			say_info("==== %s(): testcancel", __func__);
 			fiber_testcancel();
 		}
 		fiber_setcancellable(cancellable);
 
+		if (ch->bcast) {
+			fiber_wakeup(ch->bcast);
+			ch->bcast = NULL;
+			return ch->bcast_msg;
+		}
 	}
 
 	/* timeout */
@@ -335,8 +287,13 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
 		ch->beg -= ch->size;
 	ch->count--;
 
-	if (!STAILQ_EMPTY(&ch->writers))
-		ev_async_send(&ch->wasync);
+	if (!STAILQ_EMPTY(&ch->writers)) {
+		struct fiber *f = STAILQ_FIRST(&ch->writers);
+		STAILQ_REMOVE_HEAD(&ch->writers, ifc);
+		fiber_wakeup(f);
+	}
+
+	say_info("==== %s() -> %lu", __func__, (unsigned long)res);
 
 	return res;
 }
@@ -351,43 +308,28 @@ int
 fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
 							ev_tstamp timeout)
 {
+	say_info("==== %s(%lu)", __func__, (unsigned long)data);
 	if (timeout < 0)
 		timeout = 0;
 
 	/* channel is full */
 	if (ch->count >= ch->size) {
-		if (STAILQ_EMPTY(&ch->writers))
-			ev_async_start(&ch->wasync);
-
 
 		STAILQ_INSERT_TAIL(&ch->writers, fiber, ifc);
 
-		ev_timer timer;
+		bool cancellable = fiber_setcancellable(true);
 		if (timeout) {
-			ev_timer_init(&timer,
-				(void *)ifc_timeout_signal, timeout, 0);
-			ev_timer_start(&timer);
-			timer.data = fiber;
+			ev_timer_set(&fiber->timer, timeout, 0);
+			ev_timer_start(&fiber->timer);
+			fiber_yield();
+			ev_timer_stop(&fiber->timer);
+		} else {
+			fiber_yield();
 		}
 
-		bool cancellable = fiber_setcancellable(true);
-		fiber_yield();
-
-		if (timeout)
-			ev_timer_stop(&timer);
 
 		if (fiber_is_cancelled() || ch->count >= ch->size) {
-			struct fiber *f;
-			STAILQ_FOREACH(f, &ch->writers, ifc) {
-				if (f != fiber)
-					continue;
-
-				STAILQ_REMOVE(&ch->writers, f, fiber, ifc);
-
-				if (STAILQ_EMPTY(&ch->writers))
-					ev_async_stop(&ch->wasync);
-
-			}
+			STAILQ_REMOVE_SAFE(fiber, &ch->writers, fiber, ifc);
 			fiber_testcancel();
 		}
 		fiber_setcancellable(cancellable);
@@ -403,8 +345,11 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
 		i -= ch->size;
 
 	ch->item[i] = data;
-	if (!STAILQ_EMPTY(&ch->readers))
-		ev_async_send(&ch->rasync);
+	if (!STAILQ_EMPTY(&ch->readers)) {
+		struct fiber *f = STAILQ_FIRST(&ch->readers);
+		STAILQ_REMOVE_HEAD(&ch->readers, ifc);
+		fiber_wakeup(f);
+	}
 	return 0;
 }
 
@@ -414,3 +359,32 @@ fiber_channel_put(struct fiber_channel *ch, void *data)
 	fiber_channel_put_timeout(ch, data, 0);
 }
 
+int
+fiber_channel_broadcast(struct fiber_channel *ch, void *data)
+{
+	if (STAILQ_EMPTY(&ch->readers))
+		return 0;
+
+	struct fiber *f;
+	int count = 0;
+	STAILQ_FOREACH(f, &ch->readers, ifc) {
+		count++;
+	}
+
+	for (int i = 0; i < count && !STAILQ_EMPTY(&ch->readers); i++) {
+		struct fiber *f = STAILQ_FIRST(&ch->readers);
+		STAILQ_REMOVE_HEAD(&ch->readers, ifc);
+		ch->bcast = fiber;
+		ch->bcast_msg = data;
+		fiber_wakeup(f);
+		fiber_yield();
+		ch->bcast = NULL;
+		fiber_testcancel();
+		if (STAILQ_EMPTY(&ch->readers)) {
+			count = i;
+			break;
+		}
+	}
+
+	return count;
+}
diff --git a/src/ifc_lua.m b/src/ifc_lua.m
index b347cf1726..0d2f82b71c 100644
--- a/src/ifc_lua.m
+++ b/src/ifc_lua.m
@@ -121,8 +121,6 @@ lbox_fiber_semaphore_counter(struct lua_State *L)
 
 	struct fiber_semaphore *sm = lbox_check_semaphore(L, -1);
 
-
-	lua_pop(L, -1);
 	lua_pushnumber(L, fiber_semaphore_counter(sm));
 	return 1;
 }
@@ -389,16 +387,16 @@ lbox_fiber_channel_get(struct lua_State *L)
 		luaL_error(L, "usage: channel:get([timeout])");
 
 	if (top == 2) {
-		if (!lua_isnumber(L, -1))
+		if (!lua_isnumber(L, 2))
 			luaL_error(L, "timeout must be number");
-		timeout = lua_tonumber(L, -1);
+		timeout = lua_tonumber(L, 2);
 		if (timeout < 0)
 			luaL_error(L, "wrong timeout");
 	} else {
 		timeout = 0;
 	}
 
-	struct fiber_channel *ch = lbox_check_channel(L, -top);
+	struct fiber_channel *ch = lbox_check_channel(L, 1);
 
 	lua_Integer rid = (lua_Integer)fiber_channel_get_timeout(ch, timeout);
 
@@ -407,20 +405,59 @@ lbox_fiber_channel_get(struct lua_State *L)
 		return 1;
 	}
 
-	lua_getmetatable(L, -1);
-	lua_pushnumber(L, rid);
+	lua_getmetatable(L, 1);
+
+	lua_pushstring(L, "broadcast_message");
 	lua_gettable(L, -2);
 
-	lua_pushnumber(L, rid);
-	lua_pushnil(L);
-	lua_settable(L, -4);
+	if (lua_isnil(L, -1)) {	/* common messages */
+		lua_pop(L, 1);		/* nil */
 
-	lua_remove(L, -2);	/* cleanup stack */
+		lua_pushnumber(L, rid);		/* extract and delete value */
+		lua_gettable(L, -2);
+
+		lua_pushnumber(L, rid);
+		lua_pushnil(L);
+		lua_settable(L, -4);
+	}
+
+	lua_remove(L, -2);	/* cleanup stack (metatable) */
 	return 1;
 }
 
 
 
+static int
+lbox_fiber_channel_broadcast(struct lua_State *L)
+{
+	struct fiber_channel *ch;
+
+	if (lua_gettop(L) != 2)
+		luaL_error(L, "usage: channel:broadcast(variable)");
+
+	ch = lbox_check_channel(L, -2);
+
+	lua_getmetatable(L, -2);			/* 3 */
+
+	lua_pushstring(L, "broadcast_message");		/* 4 */
+
+	/* save old value */
+	lua_pushstring(L, "broadcast_message");
+	lua_gettable(L, 3);				/* 5 */
+
+	lua_pushstring(L, "broadcast_message");		/* save object */
+	lua_pushvalue(L, 2);
+	lua_settable(L, 3);
+
+	int count = fiber_channel_broadcast(ch, (void *)1);
+
+	lua_settable(L, 3);
+
+	lua_pop(L, 1);		/* stack cleanup */
+	lua_pushnumber(L, count);
+
+	return 1;
+}
 
 
 
@@ -456,6 +493,7 @@ fiber_ifc_lua_init(struct lua_State *L)
 		{"is_empty",	lbox_fiber_channel_isempty},
 		{"put",		lbox_fiber_channel_put},
 		{"get",		lbox_fiber_channel_get},
+		{"broadcast",	lbox_fiber_channel_broadcast},
 		{NULL, NULL}
 	};
 	tarantool_lua_register_type(L, channel_lib, channel_meta);
@@ -476,6 +514,6 @@ fiber_ifc_lua_init(struct lua_State *L)
 	lua_newtable(L);			/* box.ifc table */
 	luaL_register(L, NULL, ifc_meta);
 	lua_settable(L, -3);
-	lua_pop(L, -1);
+	lua_pop(L, 1);
 }
 
diff --git a/test/box/ifc.result b/test/box/ifc.result
index 8594790add..c33c5c1737 100644
--- a/test/box/ifc.result
+++ b/test/box/ifc.result
@@ -105,6 +105,7 @@ lua  for k, v in pairs(state) do print(k, ': ', v) end
 ---
 1: tfbr started
 2: tfbrd started
+3: tfbr
 ...
 ================ mutexes =======================
 lua m = box.ifc.mutex()
@@ -250,9 +251,13 @@ lua box.fiber.sleep(.5)
 lua box.fiber.cancel(tfbr)
 ---
 ...
-lua ch:put(box.info.pid) > 0
+lua ch:put(box.info.pid)
 ---
-error: '[string "return ch:put(box.info.pid) > 0"]:1: attempt to compare number with boolean'
+ - true
+...
+lua ch:get(box.info.pid) == box.info.pid
+---
+ - true
 ...
 lua for i, v in pairs(buffer) do print(v) end
 ---
@@ -268,3 +273,110 @@ lua for i, v in pairs(buffer) do print(v) end
 9
 10
 ...
+lua ch:broadcast()
+---
+error: 'usage: channel:broadcast(variable)'
+...
+lua ch:broadcast(123)
+---
+ - 0
+...
+lua ch:is_full()
+---
+ - false
+...
+lua ch:is_empty()
+---
+ - true
+...
+lua tfbr = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr  - ' .. tostring(v)) end end)
+---
+...
+lua box.fiber.resume(tfbr)
+---
+...
+lua tfbr2 = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr2 - ' .. tostring(v)) end end)
+---
+...
+lua box.fiber.resume(tfbr2)
+---
+...
+lua buffer = {}
+---
+...
+lua box.fiber.sleep(2)
+---
+...
+lua for i, v in pairs(buffer) do print(v) end
+---
+...
+lua ch:is_full()
+---
+ - false
+...
+lua ch:is_empty()
+---
+ - true
+...
+lua ch:put(1)
+---
+ - true
+...
+lua ch:put(2)
+---
+ - true
+...
+lua ch:put(3)
+---
+ - true
+...
+lua ch:put(4)
+---
+ - true
+...
+lua ch:put(5)
+---
+ - true
+...
+lua box.fiber.sleep(0.5)
+---
+...
+lua ch:broadcast('broadcast message!')
+---
+ - 2
+...
+lua for i = 35, 45 do print(ch:put(i)) end
+---
+true
+true
+true
+true
+true
+true
+true
+true
+true
+true
+true
+...
+lua for i, v in pairs(buffer) do print(v) end
+---
+tfbr  - 1
+tfbr2 - 2
+tfbr  - 3
+tfbr2 - 4
+tfbr  - 5
+tfbr2 - broadcast message!
+tfbr  - broadcast message!
+tfbr2 - 35
+tfbr  - 36
+tfbr2 - 37
+tfbr  - 38
+tfbr2 - 39
+tfbr  - 40
+tfbr2 - 41
+tfbr  - 42
+tfbr2 - 43
+tfbr  - 44
+tfbr2 - 45
+...
diff --git a/test/box/ifc.test b/test/box/ifc.test
index 0a61091bef..15a8d790a3 100644
--- a/test/box/ifc.test
+++ b/test/box/ifc.test
@@ -89,7 +89,35 @@ exec admin "lua box.fiber.resume(tfbr)"
 exec admin "lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end"
 exec admin "lua box.fiber.sleep(.5)"
 exec admin "lua box.fiber.cancel(tfbr)"
-exec admin "lua ch:put(box.info.pid) > 0"
+exec admin "lua ch:put(box.info.pid)"
+exec admin "lua ch:get(box.info.pid) == box.info.pid"
 exec admin "lua for i, v in pairs(buffer) do print(v) end"
 
+exec admin "lua ch:broadcast()"
+exec admin "lua ch:broadcast(123)"
+
+exec admin "lua ch:is_full()"
+exec admin "lua ch:is_empty()"
+exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr  - ' .. tostring(v)) end end)"
+exec admin "lua box.fiber.resume(tfbr)"
+exec admin "lua tfbr2 = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr2 - ' .. tostring(v)) end end)"
+exec admin "lua box.fiber.resume(tfbr2)"
+
+exec admin "lua buffer = {}"
+exec admin "lua box.fiber.sleep(2)"
+
+exec admin "lua for i, v in pairs(buffer) do print(v) end"
+exec admin "lua ch:is_full()"
+exec admin "lua ch:is_empty()"
+exec admin "lua ch:put(1)"
+exec admin "lua ch:put(2)"
+exec admin "lua ch:put(3)"
+exec admin "lua ch:put(4)"
+exec admin "lua ch:put(5)"
+exec admin "lua box.fiber.sleep(0.5)"
+exec admin "lua ch:broadcast('broadcast message!')"
+exec admin "lua for i = 35, 45 do print(ch:put(i)) end"
+
+
+exec admin "lua for i, v in pairs(buffer) do print(v) end"
 
-- 
GitLab