Skip to content
Snippets Groups Projects
Commit 9b00608e authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

Merge remote-tracking branch 'origin/stable'

Conflicts:
	doc/user/target.db
	src/ipc.cc
	test/box/ipc.test
parents a84cec28 f8449827
No related branches found
No related tags found
No related merge requests found
......@@ -2727,7 +2727,7 @@ for instructions about defining triggers for connect and disconnect events
The channel is garbage collected when no one is using it, as with any
other Lua object.
Object-oriented and functional APIs are equivalent, so <code>channel:put(message)</code>
is the same as <code>box.ipc.channel.put(channel, message)</code>.
is the same as <code>channel:put(channel, message)</code>.
</para>
<variablelist xml:id="box.ipc">
<para>
......@@ -2747,11 +2747,11 @@ for instructions about defining triggers for connect and disconnect events
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.put(<replaceable>channel, message[, timeout]</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:put(<replaceable>channel, message[, timeout]</replaceable>)</emphasis></term>
<listitem>
<para>
Send a message using a channel. If the channel is full,
<code>box.ipc.channel.put()</code>
<code>channel:put()</code>
blocks until there is a free slot in the channel.
</para>
<para>
......@@ -2761,17 +2761,28 @@ for instructions about defining triggers for connect and disconnect events
Returns: If <code>timeout</code> is provided,
and the channel doesn't become empty for the duration
of the timeout,
<code>box.ipc.channel.put()</code>
<code>channel:put()</code>
returns false. Otherwise it returns true.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.get(<replaceable>channel[, timeout]</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:close()</emphasis></term>
<listitem>
<simpara>
Close the channel. All waiters in the channel will be
woken up. All following <code>channel:put()</code>
or <code>channel:get()</code> operations will return
an error (nil).
</simpara>
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">channel:get(<replaceable>channel[, timeout]</replaceable>)</emphasis></term>
<listitem>
<para>
Fetch a message from a channel. If the channel is empty,
<code>box.ipc.channel.get()</code>
<code>channel:get()</code>
blocks until there is a message.
</para>
<para>
......@@ -2781,18 +2792,18 @@ for instructions about defining triggers for connect and disconnect events
Possible errors: If <code>timeout</code> is provided,
and there are no new messages for the duration
of the timeout,
<code>box.ipc.channel.get()</code>
<code>channel:get()</code>
returns error.
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.broadcast(<replaceable>channel, message, timeout</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:broadcast(<replaceable>channel, message, timeout</replaceable>)</emphasis></term>
<listitem>
<para>
If the channel is empty, <code>box.ipc.channel.broadcast()</code> is equivalent to
<code>box.ipc.channel.put()</code>.
Otherwise, <code>box.ipc.channel.broadcast()</code> sends the message to all readers of the
If the channel is empty, <code>channel:broadcast()</code> is equivalent to
<code>channel:put()</code>.
Otherwise, <code>channel:broadcast()</code> sends the message to all readers of the
channel.
</para>
<para>
......@@ -2801,7 +2812,7 @@ for instructions about defining triggers for connect and disconnect events
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.is_empty(<replaceable>channel</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:is_empty(<replaceable>channel</replaceable>)</emphasis></term>
<listitem>
<para>
Check whether the specified channel is empty (has no messages).
......@@ -2815,7 +2826,7 @@ for instructions about defining triggers for connect and disconnect events
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.is_full(<replaceable>channel</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:is_full(<replaceable>channel</replaceable>)</emphasis></term>
<listitem>
<para>
Check whether the specified channel is full.
......@@ -2829,11 +2840,11 @@ for instructions about defining triggers for connect and disconnect events
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.has_readers(<replaceable>channel</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:has_readers(<replaceable>channel</replaceable>)</emphasis></term>
<listitem>
<para>
Check whether the specified channel is empty and has readers waiting
for a message (because they have issued <code>box.ipc.channel.get()</code> and then
for a message (because they have issued <code>channel:get()</code> and then
blocked).
</para>
<para>
......@@ -2845,11 +2856,11 @@ for instructions about defining triggers for connect and disconnect events
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">box.ipc.channel.has_writers(<replaceable>channel</replaceable>)</emphasis></term>
<term><emphasis role="lua">channel:has_writers(<replaceable>channel</replaceable>)</emphasis></term>
<listitem>
<para>
Check whether the specified channel is full and has writers waiting
(because they have issued <code>box.ipc.channel.put()</code> and then blocked
(because they have issued <code>channel:put()</code> and then blocked
due to lack of room).
</para>
<para>
......@@ -2860,6 +2871,16 @@ for instructions about defining triggers for connect and disconnect events
</para>
</listitem>
</varlistentry>
<varlistentry>
<term><emphasis role="lua">channel:is_closed()</emphasis></term>
<listitem>
<simpara>
Return true if the specified channel is already
closed.
Otherwise return false.
</simpara>
</listitem>
</varlistentry>
</variablelist>
<para>
<bridgehead renderas="sect4">Example</bridgehead><programlisting>
......
This diff is collapsed.
......@@ -32,7 +32,10 @@
#include "salad/rlist.h"
struct ipc_channel {
struct rlist readers, writers, bcast;
struct rlist readers, writers;
struct fiber *bcast; /* broadcast waiter */
struct fiber *close; /* close waiter */
bool closed; /* channel is closed */
unsigned size;
unsigned beg;
unsigned count;
......@@ -83,7 +86,9 @@ static void
ipc_channel_create(struct ipc_channel *ch)
{
ch->beg = ch->count = 0;
rlist_create(&ch->bcast);
ch->closed = false;
ch->close = NULL;
ch->bcast = NULL;
rlist_create(&ch->readers);
rlist_create(&ch->writers);
}
......@@ -106,12 +111,15 @@ ipc_channel_destroy(struct ipc_channel *ch)
void *
ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
{
if (ch->closed)
return NULL;
struct fiber *f;
bool first_try = true;
ev_tstamp started = ev_now(loop());
void *res;
/* channel is empty */
while (ch->count == 0) {
/* try to be in FIFO order */
if (first_try) {
rlist_add_tail_entry(&ch->readers, fiber(), state);
......@@ -124,24 +132,30 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
rlist_del_entry(fiber(), state);
/* broadcast messsage wakes us up */
if (!rlist_empty(&ch->bcast)) {
f = rlist_first_entry(&ch->bcast, struct fiber, state);
rlist_del_entry(f, state);
fiber_wakeup(f);
if (ch->bcast) {
fiber_wakeup(ch->bcast);
fiber_testcancel();
fiber_setcancellable(cancellable);
return ch->bcast_msg;
res = ch->bcast_msg;
goto exit;
}
fiber_testcancel();
fiber_setcancellable(cancellable);
timeout -= ev_now(loop()) - started;
if (timeout <= 0)
return NULL;
if (timeout <= 0) {
res = NULL;
goto exit;
}
if (ch->closed) {
res = NULL;
goto exit;
}
}
void *res = ch->item[ch->beg];
res = ch->item[ch->beg];
if (++ch->beg >= ch->size)
ch->beg -= ch->size;
ch->count--;
......@@ -152,6 +166,11 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
fiber_wakeup(f);
}
exit:
if (ch->closed && ch->close) {
fiber_wakeup(ch->close);
ch->close = NULL;
}
return res;
}
......@@ -162,11 +181,60 @@ ipc_channel_get(struct ipc_channel *ch)
return ipc_channel_get_timeout(ch, TIMEOUT_INFINITY);
}
static void
ipc_channel_close_waiter(struct ipc_channel *ch, struct fiber *f)
{
ch->close = fiber();
while (ch->close) {
bool cancellable = fiber_setcancellable(true);
fiber_wakeup(f);
fiber_yield();
ch->close = NULL;
rlist_del_entry(fiber(), state);
fiber_testcancel();
fiber_setcancellable(cancellable);
}
}
void
ipc_channel_close(struct ipc_channel *ch)
{
if (ch->closed)
return;
ch->closed = true;
struct fiber *f;
while(!rlist_empty(&ch->readers)) {
f = rlist_first_entry(&ch->readers, struct fiber, state);
ipc_channel_close_waiter(ch, f);
}
while(!rlist_empty(&ch->writers)) {
f = rlist_first_entry(&ch->writers, struct fiber, state);
ipc_channel_close_waiter(ch, f);
}
if (ch->bcast)
fiber_wakeup(ch->bcast);
}
bool
ipc_channel_is_closed(struct ipc_channel *ch)
{
return ch->closed;
}
int
ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
ev_tstamp timeout)
{
if (ch->closed) {
errno = EBADF;
return -1;
}
bool first_try = true;
int res;
unsigned i;
ev_tstamp started = ev_now(loop());
/* channel is full */
while (ch->count >= ch->size) {
......@@ -189,11 +257,18 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
timeout -= ev_now(loop()) - started;
if (timeout <= 0) {
errno = ETIMEDOUT;
return -1;
res = -1;
goto exit;
}
if (ch->closed) {
errno = EBADF;
res = -1;
goto exit;
}
}
unsigned i = ch->beg;
i = ch->beg;
i += ch->count;
ch->count++;
if (i >= ch->size)
......@@ -206,7 +281,15 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
rlist_del_entry(f, state);
fiber_wakeup(f);
}
return 0;
res = 0;
exit:
if (ch->closed && ch->close) {
int save_errno = errno;
fiber_wakeup(ch->close);
ch->close = NULL;
errno = save_errno;
}
return res;
}
void
......@@ -230,8 +313,12 @@ ipc_channel_has_writers(struct ipc_channel *ch)
int
ipc_channel_broadcast(struct ipc_channel *ch, void *data)
{
/* do nothing at closed channel */
if (ch->closed)
return 0;
/* broadcast in broadcast: marasmus */
if (!rlist_empty(&ch->bcast))
if (ch->bcast)
return 0;
/* there is no reader on channel */
......@@ -248,13 +335,16 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data)
unsigned cnt = 0;
while (!rlist_empty(&ch->readers)) {
if (ch->closed)
break;
f = rlist_first_entry(&ch->readers, struct fiber, state);
ch->bcast_msg = data;
rlist_add_tail_entry(&ch->bcast, fiber(), state);
ch->bcast = fiber();
fiber_wakeup(f);
bool cancellable = fiber_setcancellable(true);
fiber_yield();
ch->bcast = NULL;
rlist_del_entry(fiber(), state);
fiber_testcancel();
fiber_setcancellable(cancellable);
......@@ -263,5 +353,10 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data)
break;
}
if (ch->closed && ch->close) {
fiber_wakeup(ch->close);
ch->close = NULL;
}
return cnt;
}
......@@ -150,7 +150,6 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
void *
ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout);
/**
* @brief return true if channel has reader fibers that wait data
* @param channel
......@@ -165,4 +164,16 @@ ipc_channel_has_readers(struct ipc_channel *ch);
bool
ipc_channel_has_writers(struct ipc_channel *ch);
/**
* @brief close the channel. Wake up readers and writers (if they exist)
*/
void
ipc_channel_close(struct ipc_channel *ch);
/**
* @brief return true if the channel is closed
*/
bool
ipc_channel_is_closed(struct ipc_channel *ch);
#endif /* TARANTOOL_IPC_H_INCLUDED */
......@@ -41,6 +41,8 @@ extern "C" {
static const char channel_lib[] = "box.ipc.channel";
#define BROADCAST_MASK (((size_t)1) << (CHAR_BIT * sizeof(size_t) - 1))
/******************** channel ***************************/
static int
......@@ -131,35 +133,16 @@ lbox_ipc_channel_put(struct lua_State *L)
}
ch = lbox_check_channel(L, -top);
lua_getmetatable(L, -top);
lua_pushstring(L, "rid");
lua_gettable(L, -2);
lua_Integer rid = lua_tointeger(L, -1);
if (rid < 0x7FFFFFFF)
rid++;
else
rid = 1;
lua_pushstring(L, "rid"); /* update object id */
lua_pushnumber(L, rid);
lua_settable(L, -4);
lua_pushnumber(L, rid);
lua_pushvalue(L, 2);
lua_settable(L, -4);
size_t vref = luaL_ref(L, LUA_REGISTRYINDEX);
int retval;
if (ipc_channel_put_timeout(ch, (void *)rid, timeout) == 0) {
if (ipc_channel_put_timeout(ch, (void *)vref, timeout) == 0) {
retval = 1;
} else {
/* put timeout */
/* timed out or closed */
luaL_unref(L, LUA_REGISTRYINDEX, vref);
retval = 0;
lua_pushnumber(L, rid);
lua_pushnil(L);
lua_settable(L, -4);
}
lua_settop(L, top);
......@@ -188,30 +171,20 @@ lbox_ipc_channel_get(struct lua_State *L)
struct ipc_channel *ch = lbox_check_channel(L, 1);
lua_Integer rid = (lua_Integer)ipc_channel_get_timeout(ch, timeout);
size_t vref = (size_t)ipc_channel_get_timeout(ch, timeout);
if (!rid) {
if (!vref) {
/* timed out or closed */
lua_pushnil(L);
return 1;
}
lua_getmetatable(L, 1);
lua_pushstring(L, "broadcast_message");
lua_gettable(L, -2);
if (lua_isnil(L, -1)) { /* common messages */
lua_pop(L, 1); /* nil */
lua_pushnumber(L, rid); /* extract and delete value */
lua_gettable(L, -2);
lua_pushnumber(L, rid);
lua_pushnil(L);
lua_settable(L, -4);
if (vref & BROADCAST_MASK) {
vref &= ~BROADCAST_MASK;
lua_rawgeti(L, LUA_REGISTRYINDEX, vref);
return 1;
}
lua_remove(L, -2); /* cleanup stack (metatable) */
lua_rawgeti(L, LUA_REGISTRYINDEX, vref);
luaL_unref(L, LUA_REGISTRYINDEX, vref);
return 1;
}
......@@ -223,30 +196,17 @@ lbox_ipc_channel_broadcast(struct lua_State *L)
if (lua_gettop(L) != 2)
luaL_error(L, "usage: channel:broadcast(variable)");
ch = lbox_check_channel(L, -2);
ch = lbox_check_channel(L, 1);
if (!ipc_channel_has_readers(ch))
return lbox_ipc_channel_put(L);
lua_getmetatable(L, -2); /* 3 */
lua_pushstring(L, "broadcast_message"); /* 4 */
/* save old value */
lua_pushstring(L, "broadcast_message");
lua_gettable(L, 3); /* 5 */
lua_pushstring(L, "broadcast_message"); /* save object */
lua_pushvalue(L, 2);
lua_settable(L, 3);
int count = ipc_channel_broadcast(ch, (void *)1);
lua_settable(L, 3);
lua_pop(L, 1); /* stack cleanup */
size_t vref = luaL_ref(L, LUA_REGISTRYINDEX);
int count = ipc_channel_broadcast(ch, (void *)(vref | BROADCAST_MASK));
luaL_unref(L, LUA_REGISTRYINDEX, vref);
lua_pushnumber(L, count);
return 1;
}
......@@ -270,6 +230,29 @@ lbox_ipc_channel_has_writers(struct lua_State *L)
return 1;
}
static int
lbox_ipc_channel_close(struct lua_State *L)
{
if (lua_gettop(L) != 1)
luaL_error(L, "usage: channel:close()");
struct ipc_channel *ch = lbox_check_channel(L, 1);
ipc_channel_close(ch);
return 0;
}
static int
lbox_ipc_channel_is_closed(struct lua_State *L)
{
if (lua_gettop(L) != 1)
luaL_error(L, "usage: channel:is_closed()");
struct ipc_channel *ch = lbox_check_channel(L, 1);
if (ipc_channel_is_closed(ch))
lua_pushboolean(L, 1);
else
lua_pushboolean(L, 0);
return 1;
}
void
tarantool_lua_ipc_init(struct lua_State *L)
{
......@@ -282,6 +265,8 @@ tarantool_lua_ipc_init(struct lua_State *L)
{"broadcast", lbox_ipc_channel_broadcast},
{"has_readers", lbox_ipc_channel_has_readers},
{"has_writers", lbox_ipc_channel_has_writers},
{"close", lbox_ipc_channel_close},
{"is_closed", lbox_ipc_channel_is_closed},
{NULL, NULL}
};
luaL_register_type(L, channel_lib, channel_meta);
......@@ -291,7 +276,6 @@ tarantool_lua_ipc_init(struct lua_State *L)
{NULL, NULL}
};
lua_getfield(L, LUA_GLOBALSINDEX, "box");
lua_pushstring(L, "ipc");
......@@ -300,4 +284,3 @@ tarantool_lua_ipc_init(struct lua_State *L)
lua_settable(L, -3);
lua_pop(L, 1);
}
......@@ -299,3 +299,81 @@ buffer
- - tfbr2
- '45'
...
ch = box.ipc.channel(1)
---
...
ch:is_closed()
---
- false
...
passed = false
---
...
type(box.fiber.wrap(function() if ch:get() == nil then passed = true end end))
---
- userdata
...
ch:close()
---
...
passed
---
- true
...
ch:get()
---
- null
...
ch:get()
---
- null
...
ch:put(10)
---
- false
...
ch:is_closed()
---
- true
...
ch = box.ipc.channel(1)
---
...
ch:put(true)
---
- true
...
ch:is_closed()
---
- false
...
passed = false
---
...
type(box.fiber.wrap(function() if ch:put(true) == false then passed = true end end))
---
- userdata
...
ch:close()
---
...
passed
---
- true
...
ch:get()
---
- null
...
ch:get()
---
- null
...
ch:put(10)
---
- false
...
ch:is_closed()
---
- true
...
# encoding: tarantool
#
import sys
exec admin "lua ch = box.ipc.channel()"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua ch:get(.1)"
exec admin "lua ch:put()"
exec admin "lua ch:put('test')"
exec admin "lua ch:get()"
exec admin "lua ch:get('wrong timeout')"
exec admin "lua ch:get(-10)"
exec admin "lua ch:put(234)"
exec admin "lua ch:put(345, .5)"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua buffer = {}"
exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true do table.insert(buffer, ch:get()) end end)"
exec admin "lua box.fiber.resume(tfbr)"
exec admin "lua for i = 1, 10 do print(i, ' ', ch:put(i, 0.1)) end"
exec admin "lua ch:has_readers()"
exec admin "lua ch:has_writers()"
exec admin "lua box.fiber.cancel(tfbr)"
exec admin "lua ch:has_readers()"
exec admin "lua ch:has_writers()"
exec admin "lua ch:put(box.info.pid)"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua ch:get(box.info.pid) == box.info.pid"
exec admin "lua for i, v in pairs(buffer) do print(v) end"
exec admin "lua ch:is_empty()"
exec admin "lua ch:broadcast()"
exec admin "lua ch:broadcast(123)"
exec admin "lua ch:get()"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr - ' .. tostring(v)) end end)"
exec admin "lua box.fiber.resume(tfbr)"
exec admin "lua tfbr2 = box.fiber.create(function() box.fiber.detach() while true do local v = ch:get() table.insert(buffer, 'tfbr2 - ' .. tostring(v)) end end)"
exec admin "lua box.fiber.resume(tfbr2)"
exec admin "lua buffer = {}"
exec admin "lua for i, v in pairs(buffer) do print(v) end"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua ch:put(1)"
exec admin "lua ch:put(2)"
exec admin "lua ch:put(3)"
exec admin "lua ch:put(4)"
exec admin "lua ch:put(5)"
exec admin "lua ch:broadcast('broadcast message!')"
exec admin "lua for i = 35, 45 do print(ch:put(i)) end"
exec admin "lua for i, v in pairs(buffer) do print(v) end"
......@@ -85,3 +85,26 @@ t = {}
for i = 35, 45 do table.insert(t, ch:put(i)) end
t
buffer
ch = box.ipc.channel(1)
ch:is_closed()
passed = false
type(box.fiber.wrap(function() if ch:get() == nil then passed = true end end))
ch:close()
passed
ch:get()
ch:get()
ch:put(10)
ch:is_closed()
ch = box.ipc.channel(1)
ch:put(true)
ch:is_closed()
passed = false
type(box.fiber.wrap(function() if ch:put(true) == false then passed = true end end))
ch:close()
passed
ch:get()
ch:get()
ch:put(10)
ch:is_closed()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment