Skip to content
Snippets Groups Projects
Commit a12b3d45 authored by Roman Tsisyk's avatar Roman Tsisyk
Browse files

Fix #756: channel:close() leaks memory

parent 7a8668d8
No related branches found
No related tags found
No related merge requests found
......@@ -61,7 +61,7 @@ static void
ipc_channel_create(struct ipc_channel *ch)
{
ch->beg = ch->count = 0;
ch->closed = false;
ch->readonly = ch->closed = false;
ch->close = NULL;
ch->bcast = NULL;
rlist_create(&ch->readers);
......@@ -95,6 +95,8 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
void *res;
/* channel is empty */
while (ch->count == 0) {
if (ch->readonly)
return NULL;
/* try to be in FIFO order */
if (first_try) {
rlist_add_tail_entry(&ch->readers, fiber(), state);
......@@ -121,7 +123,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
goto exit;
}
if (ch->closed) {
if (ch->readonly) {
res = NULL;
goto exit;
}
......@@ -139,7 +141,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
}
exit:
if (ch->closed && ch->close) {
if (ch->readonly && ch->close) {
fiber_wakeup(ch->close);
ch->close = NULL;
}
......@@ -168,11 +170,11 @@ ipc_channel_close_waiter(struct ipc_channel *ch, struct fiber *f)
}
void
ipc_channel_close(struct ipc_channel *ch)
ipc_channel_shutdown(struct ipc_channel *ch)
{
if (ch->closed)
if (ch->readonly)
return;
ch->closed = true;
ch->readonly = true;
struct fiber *f;
while(!rlist_empty(&ch->readers)) {
......@@ -187,11 +189,23 @@ ipc_channel_close(struct ipc_channel *ch)
fiber_wakeup(ch->bcast);
}
void
ipc_channel_close(struct ipc_channel *ch)
{
if (ch->closed)
return;
assert(ch->readonly);
assert(ch->count == 0);
assert(rlist_empty(&ch->readers));
assert(rlist_empty(&ch->writers));
assert(ch->bcast == NULL);
ch->closed = true;
}
int
ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
ev_tstamp timeout)
{
if (ch->closed) {
if (ch->readonly) {
errno = EBADF;
return -1;
}
......@@ -223,7 +237,7 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
goto exit;
}
if (ch->closed) {
if (ch->readonly) {
errno = EBADF;
res = -1;
goto exit;
......@@ -245,7 +259,7 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
}
res = 0;
exit:
if (ch->closed && ch->close) {
if (ch->readonly && ch->close) {
int save_errno = errno;
fiber_wakeup(ch->close);
ch->close = NULL;
......@@ -264,7 +278,7 @@ int
ipc_channel_broadcast(struct ipc_channel *ch, void *data)
{
/* do nothing at closed channel */
if (ch->closed)
if (ch->readonly)
return 0;
/* broadcast in broadcast: marasmus */
......@@ -285,7 +299,7 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data)
unsigned cnt = 0;
while (!rlist_empty(&ch->readers)) {
if (ch->closed)
if (ch->readonly)
break;
f = rlist_first_entry(&ch->readers, struct fiber, state);
......@@ -301,7 +315,7 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data)
break;
}
if (ch->closed && ch->close) {
if (ch->readonly && ch->close) {
fiber_wakeup(ch->close);
ch->close = NULL;
}
......
......@@ -40,6 +40,7 @@ struct ipc_channel {
struct rlist readers, writers;
struct fiber *bcast; /* broadcast waiter */
struct fiber *close; /* close waiter */
bool readonly; /* channel is for read only */
bool closed; /* channel is closed */
unsigned size;
unsigned beg;
......@@ -208,13 +209,31 @@ ipc_channel_count(struct ipc_channel *ch)
}
/**
* @brief close the channel. Wake up readers and writers (if they exist)
* @brief shutdown channel for writing.
* Wake up readers and writers (if they exist)
*/
void
ipc_channel_shutdown(struct ipc_channel *ch);
/**
* @brief return true if the channel is closed for writing
*/
static inline bool
ipc_channel_is_readonly(struct ipc_channel *ch)
{
return ch->readonly;
}
/**
* @brief close the channel.
* @pre ipc_channel_is_readonly(ch) && ipc_channel_is_empty(ch)
*/
void
ipc_channel_close(struct ipc_channel *ch);
/**
* @brief return true if the channel is closed
* @brief return true if the channel is closed for both
* for reading and writing.
*/
static inline bool
ipc_channel_is_closed(struct ipc_channel *ch)
......
......@@ -256,6 +256,20 @@ lbox_ipc_channel_close(struct lua_State *L)
if (lua_gettop(L) != 1)
luaL_error(L, "usage: channel:close()");
struct ipc_channel *ch = lbox_check_channel(L, 1);
/* Shutdown the channel for writing and wakeup waiters */
ipc_channel_shutdown(ch);
/* Discard all remaining items*/
while (!ipc_channel_is_empty(ch)) {
/* Never yields because channel is not empty */
size_t vref = (size_t)ipc_channel_get_timeout(ch, 0);
assert(vref);
assert((vref & BROADCAST_MASK) == 0);
/* Unref lua items */
luaL_unref(L, LUA_REGISTRYINDEX, vref);
}
/* Close the channel */
ipc_channel_close(ch);
return 0;
}
......
......@@ -458,3 +458,62 @@ count > 2000, #res, res;
- true
...
--# setopt delimiter ''
--
-- gh-756: channel:close() leaks memory
--
ffi = require('ffi')
---
...
ffi.cdef[[ struct gh756 { int k; }; ]]
---
...
ct = ffi.metatype('struct gh756', { __gc = function() refs = refs - 1; end })
---
...
-- create 10 objects and put they to a channel
refs = 10
---
...
ch = fiber.channel(refs)
---
...
for i=1,refs do ch:put(ffi.new(ct, i)) end
---
...
-- get an object from the channel, run GC and check the number of objects
ch:get().k == 1
---
- true
...
collectgarbage('collect')
---
- 0
...
refs
---
- 9
...
ch:get().k == 2
---
- true
...
collectgarbage('collect')
---
- 0
...
refs
---
- 8
...
-- close the channel and check the number of objects
ch:close()
---
...
collectgarbage('collect')
---
- 0
...
refs -- must be zero
---
- 0
...
......@@ -157,3 +157,29 @@ for i = 1, 100 do fiber.sleep(0.01) if count > 2000 then break end end;
count > 2000, #res, res;
--# setopt delimiter ''
--
-- gh-756: channel:close() leaks memory
--
ffi = require('ffi')
ffi.cdef[[ struct gh756 { int k; }; ]]
ct = ffi.metatype('struct gh756', { __gc = function() refs = refs - 1; end })
-- create 10 objects and put they to a channel
refs = 10
ch = fiber.channel(refs)
for i=1,refs do ch:put(ffi.new(ct, i)) end
-- get an object from the channel, run GC and check the number of objects
ch:get().k == 1
collectgarbage('collect')
refs
ch:get().k == 2
collectgarbage('collect')
refs
-- close the channel and check the number of objects
ch:close()
collectgarbage('collect')
refs -- must be zero
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment