diff --git a/src/box/lua/load_cfg.lua b/src/box/lua/load_cfg.lua index dd93c9a6c286e398710e969c2d5c02d7d90cc5ae..5bc522f85787c3986c0674c42efb7bd061383ecd 100644 --- a/src/box/lua/load_cfg.lua +++ b/src/box/lua/load_cfg.lua @@ -29,7 +29,7 @@ local default_sophia_cfg = { local default_cfg = { listen = nil, slab_alloc_arena = 1.0, - slab_alloc_minimal = 64, + slab_alloc_minimal = 16, slab_alloc_maximal = 1024 * 1024, slab_alloc_factor = 1.1, work_dir = nil, diff --git a/src/ipc.cc b/src/ipc.cc index 53bc65350441986c4eb55650152db61ac77693b5..637f64435f32f7e70a3f5d3e27e2581e5d5ef5b6 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -61,7 +61,7 @@ static void ipc_channel_create(struct ipc_channel *ch) { ch->beg = ch->count = 0; - ch->closed = false; + ch->readonly = ch->closed = false; ch->close = NULL; ch->bcast = NULL; rlist_create(&ch->readers); @@ -95,6 +95,8 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) void *res; /* channel is empty */ while (ch->count == 0) { + if (ch->readonly) + return NULL; /* try to be in FIFO order */ if (first_try) { rlist_add_tail_entry(&ch->readers, fiber(), state); @@ -121,7 +123,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) goto exit; } - if (ch->closed) { + if (ch->readonly) { res = NULL; goto exit; } @@ -139,7 +141,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) } exit: - if (ch->closed && ch->close) { + if (ch->readonly && ch->close) { fiber_wakeup(ch->close); ch->close = NULL; } @@ -168,18 +170,18 @@ ipc_channel_close_waiter(struct ipc_channel *ch, struct fiber *f) } void -ipc_channel_close(struct ipc_channel *ch) +ipc_channel_shutdown(struct ipc_channel *ch) { - if (ch->closed) + if (ch->readonly) return; - ch->closed = true; + ch->readonly = true; struct fiber *f; - while(!rlist_empty(&ch->readers)) { + while (!rlist_empty(&ch->readers)) { f = rlist_first_entry(&ch->readers, struct fiber, state); ipc_channel_close_waiter(ch, f); } - while(!rlist_empty(&ch->writers)) { + while (!rlist_empty(&ch->writers)) { f = rlist_first_entry(&ch->writers, struct fiber, state); ipc_channel_close_waiter(ch, f); } @@ -187,11 +189,23 @@ ipc_channel_close(struct ipc_channel *ch) fiber_wakeup(ch->bcast); } +void +ipc_channel_close(struct ipc_channel *ch) +{ + if (ch->closed) + return; + assert(ch->readonly); + assert(ch->count == 0); + assert(rlist_empty(&ch->readers)); + assert(rlist_empty(&ch->writers)); + assert(ch->bcast == NULL); + ch->closed = true; +} int ipc_channel_put_timeout(struct ipc_channel *ch, void *data, ev_tstamp timeout) { - if (ch->closed) { + if (ch->readonly) { errno = EBADF; return -1; } @@ -223,7 +237,7 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data, goto exit; } - if (ch->closed) { + if (ch->readonly) { errno = EBADF; res = -1; goto exit; @@ -245,7 +259,7 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data, } res = 0; exit: - if (ch->closed && ch->close) { + if (ch->readonly && ch->close) { int save_errno = errno; fiber_wakeup(ch->close); ch->close = NULL; @@ -264,7 +278,7 @@ int ipc_channel_broadcast(struct ipc_channel *ch, void *data) { /* do nothing at closed channel */ - if (ch->closed) + if (ch->readonly) return 0; /* broadcast in broadcast: marasmus */ @@ -285,7 +299,7 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data) unsigned cnt = 0; while (!rlist_empty(&ch->readers)) { - if (ch->closed) + if (ch->readonly) break; f = rlist_first_entry(&ch->readers, struct fiber, state); @@ -301,7 +315,7 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data) break; } - if (ch->closed && ch->close) { + if (ch->readonly && ch->close) { fiber_wakeup(ch->close); ch->close = NULL; } diff --git a/src/ipc.h b/src/ipc.h index 0b38ca361bdcdbd4199eabb040331af44e6968af..e35e56a2c9ea8011dc56fb0e9950b8a1362b13a6 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -40,6 +40,7 @@ struct ipc_channel { struct rlist readers, writers; struct fiber *bcast; /* broadcast waiter */ struct fiber *close; /* close waiter */ + bool readonly; /* channel is for read only */ bool closed; /* channel is closed */ unsigned size; unsigned beg; @@ -208,13 +209,31 @@ ipc_channel_count(struct ipc_channel *ch) } /** - * @brief close the channel. Wake up readers and writers (if they exist) + * @brief shutdown channel for writing. + * Wake up readers and writers (if they exist) + */ +void +ipc_channel_shutdown(struct ipc_channel *ch); + +/** + * @brief return true if the channel is closed for writing + */ +static inline bool +ipc_channel_is_readonly(struct ipc_channel *ch) +{ + return ch->readonly; +} + +/** + * @brief close the channel. + * @pre ipc_channel_is_readonly(ch) && ipc_channel_is_empty(ch) */ void ipc_channel_close(struct ipc_channel *ch); /** - * @brief return true if the channel is closed + * @brief return true if the channel is closed for both + * for reading and writing. */ static inline bool ipc_channel_is_closed(struct ipc_channel *ch) diff --git a/src/lib/small/slab_arena.c b/src/lib/small/slab_arena.c index f9e28236cae162258c872e8085a625dd153464b5..a233e538fc0bde3f6a7605c710b1f483c6c15874 100644 --- a/src/lib/small/slab_arena.c +++ b/src/lib/small/slab_arena.c @@ -61,17 +61,28 @@ mmap_checked(size_t size, size_t align, int flags) /* The size must be a multiple of alignment */ assert((size & (align - 1)) == 0); /* - * mmap twice the requested amount to be able to align - * the mapped address. - * @todo all mappings except the first are likely to - * be aligned already. Find out if trying to map - * optimistically exactly the requested amount and fall - * back to double-size mapping is a viable strategy. + * All mappings except the first are likely to + * be aligned already. Be optimistic by trying + * to map exactly the requested amount. */ - void *map = mmap(NULL, size + align, PROT_READ | PROT_WRITE, + void *map = mmap(NULL, size, PROT_READ | PROT_WRITE, flags | MAP_ANONYMOUS, -1, 0); if (map == MAP_FAILED) return NULL; + if (((intptr_t) map & (align - 1)) == 0) + return map; + munmap_checked(map, size); + + /* + * mmap twice the requested amount to be able to align + * the mapped address. This can lead to virtual memory + * fragmentation depending on the kernels allocation + * strategy. + */ + map = mmap(NULL, size + align, PROT_READ | PROT_WRITE, + flags | MAP_ANONYMOUS, -1, 0); + if (map == MAP_FAILED) + return NULL; /* Align the mapped address around slab size. */ size_t offset = (intptr_t) map & (align - 1); diff --git a/src/lua/ipc.cc b/src/lua/ipc.cc index 9b4470c32f666c7fef6af0f5b5bd83d771b18449..89e7851c577a7c2034a1760b4c09d77128e413b4 100644 --- a/src/lua/ipc.cc +++ b/src/lua/ipc.cc @@ -256,6 +256,20 @@ lbox_ipc_channel_close(struct lua_State *L) if (lua_gettop(L) != 1) luaL_error(L, "usage: channel:close()"); struct ipc_channel *ch = lbox_check_channel(L, 1); + + /* Shutdown the channel for writing and wakeup waiters */ + ipc_channel_shutdown(ch); + + /* Discard all remaining items*/ + while (!ipc_channel_is_empty(ch)) { + /* Never yields because channel is not empty */ + size_t vref = (size_t)ipc_channel_get_timeout(ch, 0); + assert(vref); + assert((vref & BROADCAST_MASK) == 0); + /* Unref lua items */ + luaL_unref(L, LUA_REGISTRYINDEX, vref); + } + /* Close the channel */ ipc_channel_close(ch); return 0; } diff --git a/test/app/float_value.result b/test/app/float_value.result index f592757013664b3edab859e613102ffcabc88a88..4fd16c37bd6d54aa142a3bcc6b0765919d933d86 100644 --- a/test/app/float_value.result +++ b/test/app/float_value.result @@ -10,7 +10,7 @@ box.cfg 9 logger_nonblock:true 10 snap_dir:. 11 coredump:false -12 slab_alloc_minimal:64 +12 slab_alloc_minimal:16 13 sophia_dir:. 14 wal_mode:write 15 wal_dir:. diff --git a/test/app/init_script.result b/test/app/init_script.result index 03ef3f3517410971252831ec43c0bbcb6899bec2..dc7f846591c7a41d79e030e02046b727620f5385 100644 --- a/test/app/init_script.result +++ b/test/app/init_script.result @@ -14,7 +14,7 @@ box.cfg 9 logger_nonblock:true 10 snap_dir:. 11 coredump:false -12 slab_alloc_minimal:64 +12 slab_alloc_minimal:16 13 sophia_dir:. 14 wal_mode:write 15 rows_per_wal:500000 diff --git a/test/box/admin.result b/test/box/admin.result index 30217aee9a5eb3ee37f3f58fc503faebccd5c6c8..7c3ef8b360bcc2325670073e00ddfb1d4922b1f2 100644 --- a/test/box/admin.result +++ b/test/box/admin.result @@ -34,7 +34,7 @@ box.cfg logger_nonblock: true snap_dir: . coredump: false - slab_alloc_minimal: 64 + slab_alloc_minimal: 16 sophia_dir: . wal_mode: write wal_dir: . diff --git a/test/box/cfg.result b/test/box/cfg.result index bab6b13c6faaf3f2946f6bd057ae1b0d58f6e06a..a6849cc41ce108a9045a448baf5a5aa70acfd192 100644 --- a/test/box/cfg.result +++ b/test/box/cfg.result @@ -20,7 +20,7 @@ t - 'logger_nonblock: true' - 'snap_dir: .' - 'coredump: false' - - 'slab_alloc_minimal: 64' + - 'slab_alloc_minimal: 16' - 'sophia_dir: .' - 'wal_mode: write' - 'wal_dir: .' @@ -54,7 +54,7 @@ t - 'logger_nonblock: true' - 'snap_dir: .' - 'coredump: false' - - 'slab_alloc_minimal: 64' + - 'slab_alloc_minimal: 16' - 'sophia_dir: .' - 'wal_mode: write' - 'wal_dir: .' diff --git a/test/box/ipc.result b/test/box/ipc.result index fca439ac2ce004a2d6fa10b0533e22ea0b07fca8..4c9b42e59b19c32eb9620551ca2d7eb755ecace6 100644 --- a/test/box/ipc.result +++ b/test/box/ipc.result @@ -458,3 +458,62 @@ count > 2000, #res, res; - true ... --# setopt delimiter '' +-- +-- gh-756: channel:close() leaks memory +-- +ffi = require('ffi') +--- +... +ffi.cdef[[ struct gh756 { int k; }; ]] +--- +... +ct = ffi.metatype('struct gh756', { __gc = function() refs = refs - 1; end }) +--- +... +-- create 10 objects and put they to a channel +refs = 10 +--- +... +ch = fiber.channel(refs) +--- +... +for i=1,refs do ch:put(ffi.new(ct, i)) end +--- +... +-- get an object from the channel, run GC and check the number of objects +ch:get().k == 1 +--- +- true +... +collectgarbage('collect') +--- +- 0 +... +refs +--- +- 9 +... +ch:get().k == 2 +--- +- true +... +collectgarbage('collect') +--- +- 0 +... +refs +--- +- 8 +... +-- close the channel and check the number of objects +ch:close() +--- +... +collectgarbage('collect') +--- +- 0 +... +refs -- must be zero +--- +- 0 +... diff --git a/test/box/ipc.test.lua b/test/box/ipc.test.lua index bcd3130e8aa0bd2dc31d990c165caef01fff0951..3b60b03be0a66e20d2a5973bdfc6d95cb0dbbaf4 100644 --- a/test/box/ipc.test.lua +++ b/test/box/ipc.test.lua @@ -157,3 +157,29 @@ for i = 1, 100 do fiber.sleep(0.01) if count > 2000 then break end end; count > 2000, #res, res; --# setopt delimiter '' + +-- +-- gh-756: channel:close() leaks memory +-- + +ffi = require('ffi') +ffi.cdef[[ struct gh756 { int k; }; ]] +ct = ffi.metatype('struct gh756', { __gc = function() refs = refs - 1; end }) + +-- create 10 objects and put they to a channel +refs = 10 +ch = fiber.channel(refs) +for i=1,refs do ch:put(ffi.new(ct, i)) end + +-- get an object from the channel, run GC and check the number of objects +ch:get().k == 1 +collectgarbage('collect') +refs +ch:get().k == 2 +collectgarbage('collect') +refs + +-- close the channel and check the number of objects +ch:close() +collectgarbage('collect') +refs -- must be zero diff --git a/test/wal_off/oom.result b/test/wal_off/oom.result index 4619a92bebec234a0a9fde5c86725e68e0698ce8..f547528c4d156b01e85f7aa23f41a1eb4e1e9fea 100644 --- a/test/wal_off/oom.result +++ b/test/wal_off/oom.result @@ -15,11 +15,11 @@ while true do i = i + 1 end; --- -- error: Failed to allocate 24643 bytes in slab allocator for tuple +- error: Failed to allocate 25031 bytes in slab allocator for tuple ... space:len(); --- -- 6155 +- 6252 ... i = 1; --- @@ -29,11 +29,11 @@ while true do i = i + 1 end; --- -- error: Failed to allocate 5187 bytes in slab allocator for tuple +- error: Failed to allocate 4167 bytes in slab allocator for tuple ... space:len(); --- -- 7446 +- 7288 ... i = 1; --- @@ -43,12 +43,12 @@ while true do i = i + 1 end; --- -- error: Failed to allocate 2751 bytes in slab allocator for tuple +- error: Failed to allocate 2123 bytes in slab allocator for tuple ... --# setopt delimiter '' space:len() --- -- 8128 +- 7813 ... space.index['primary']:get{0} ---