diff --git a/src/ipc.cc b/src/ipc.cc index d8165c7a73fa013e1688d4c78b1a2d3386c5783a..53bc65350441986c4eb55650152db61ac77693b5 100644 --- a/src/ipc.cc +++ b/src/ipc.cc @@ -29,19 +29,6 @@ #include "ipc.h" #include "fiber.h" #include <stdlib.h> -#include "salad/rlist.h" - -struct ipc_channel { - struct rlist readers, writers; - struct fiber *bcast; /* broadcast waiter */ - struct fiber *close; /* close waiter */ - bool closed; /* channel is closed */ - unsigned size; - unsigned beg; - unsigned count; - void *bcast_msg; - void *item[0]; -}; static void ipc_channel_create(struct ipc_channel *ch); @@ -49,18 +36,6 @@ ipc_channel_create(struct ipc_channel *ch); static void ipc_channel_destroy(struct ipc_channel *ch); -bool -ipc_channel_is_empty(struct ipc_channel *ch) -{ - return ch->count == 0; -} - -bool -ipc_channel_is_full(struct ipc_channel *ch) -{ - return ch->count >= ch->size; -} - struct ipc_channel * ipc_channel_new(unsigned size) { @@ -212,12 +187,6 @@ ipc_channel_close(struct ipc_channel *ch) fiber_wakeup(ch->bcast); } -bool -ipc_channel_is_closed(struct ipc_channel *ch) -{ - return ch->closed; -} - int ipc_channel_put_timeout(struct ipc_channel *ch, void *data, ev_tstamp timeout) @@ -291,18 +260,6 @@ ipc_channel_put(struct ipc_channel *ch, void *data) ipc_channel_put_timeout(ch, data, TIMEOUT_INFINITY); } -bool -ipc_channel_has_readers(struct ipc_channel *ch) -{ - return !rlist_empty(&ch->readers); -} - -bool -ipc_channel_has_writers(struct ipc_channel *ch) -{ - return !rlist_empty(&ch->writers); -} - int ipc_channel_broadcast(struct ipc_channel *ch, void *data) { diff --git a/src/ipc.h b/src/ipc.h index 90ea5c3784a0ad4badad7a6ea5bddbf004580019..0b38ca361bdcdbd4199eabb040331af44e6968af 100644 --- a/src/ipc.h +++ b/src/ipc.h @@ -30,12 +30,23 @@ */ #include <stdbool.h> #include <tarantool_ev.h> +#include "salad/rlist.h" /** * @brief CHANNELS */ -struct ipc_channel; +struct ipc_channel { + struct rlist readers, writers; + struct fiber *bcast; /* broadcast waiter */ + struct fiber *close; /* close waiter */ + bool closed; /* channel is closed */ + unsigned size; + unsigned beg; + unsigned count; + void *bcast_msg; + void *item[0]; +}; /** * @brief Allocate and construct new IPC channel @@ -99,8 +110,11 @@ ipc_channel_broadcast(struct ipc_channel *ch, void *data); * char *msg = ipc_channel_get(ch); * @endcode */ -bool -ipc_channel_is_empty(struct ipc_channel *ch); +static inline bool +ipc_channel_is_empty(struct ipc_channel *ch) +{ + return ch->count == 0; +} /** * @brief check if channel is full @@ -112,8 +126,11 @@ ipc_channel_is_empty(struct ipc_channel *ch); * ipc_channel_put(ch, "message"); * @endcode */ -bool -ipc_channel_is_full(struct ipc_channel *ch); +static inline bool +ipc_channel_is_full(struct ipc_channel *ch) +{ + return ch->count >= ch->size; +} /** * @brief put data into channel in timeout @@ -154,15 +171,41 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout); * @brief return true if channel has reader fibers that wait data * @param channel */ -bool -ipc_channel_has_readers(struct ipc_channel *ch); +static inline bool +ipc_channel_has_readers(struct ipc_channel *ch) +{ + return !rlist_empty(&ch->readers); +} /** * @brief return true if channel has writer fibers that wait data * @param channel */ -bool -ipc_channel_has_writers(struct ipc_channel *ch); +static inline bool +ipc_channel_has_writers(struct ipc_channel *ch) +{ + return !rlist_empty(&ch->writers); +} + +/** + * @brief return channel size + * @param channel + */ +static inline unsigned +ipc_channel_size(struct ipc_channel *ch) +{ + return ch->size; +} + +/** + * @brief return the number of items + * @param channel + */ +static inline unsigned +ipc_channel_count(struct ipc_channel *ch) +{ + return ch->count; +} /** * @brief close the channel. Wake up readers and writers (if they exist) @@ -173,7 +216,10 @@ ipc_channel_close(struct ipc_channel *ch); /** * @brief return true if the channel is closed */ -bool -ipc_channel_is_closed(struct ipc_channel *ch); +static inline bool +ipc_channel_is_closed(struct ipc_channel *ch) +{ + return ch->closed; +} #endif /* TARANTOOL_IPC_H_INCLUDED */ diff --git a/src/lua/ipc.cc b/src/lua/ipc.cc index 8ba6ad10fd8979d780497b415b0d4dd62b40f6dd..40fade0ccc8e0f85d2f10c23ef90df930a7a8845 100644 --- a/src/lua/ipc.cc +++ b/src/lua/ipc.cc @@ -230,6 +230,26 @@ lbox_ipc_channel_has_writers(struct lua_State *L) return 1; } +static int +lbox_ipc_channel_size(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "usage: channel:has_writers()"); + struct ipc_channel *ch = lbox_check_channel(L, -1); + lua_pushinteger(L, ipc_channel_size(ch)); + return 1; +} + +static int +lbox_ipc_channel_count(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "usage: channel:has_writers()"); + struct ipc_channel *ch = lbox_check_channel(L, -1); + lua_pushinteger(L, ipc_channel_count(ch)); + return 1; +} + static int lbox_ipc_channel_close(struct lua_State *L) { @@ -265,6 +285,8 @@ tarantool_lua_ipc_init(struct lua_State *L) {"broadcast", lbox_ipc_channel_broadcast}, {"has_readers", lbox_ipc_channel_has_readers}, {"has_writers", lbox_ipc_channel_has_writers}, + {"count", lbox_ipc_channel_count}, + {"size", lbox_ipc_channel_size}, {"close", lbox_ipc_channel_close}, {"is_closed", lbox_ipc_channel_is_closed}, {NULL, NULL} diff --git a/test/box/ipc.result b/test/box/ipc.result index 9f98fbb22f11ea25129ad85656cdab2583edbb46..fca439ac2ce004a2d6fa10b0533e22ea0b07fca8 100644 --- a/test/box/ipc.result +++ b/test/box/ipc.result @@ -4,6 +4,14 @@ fiber = require('fiber') ch = fiber.channel() --- ... +ch:size() +--- +- 1 +... +ch:count() +--- +- 0 +... ch:is_full() --- - false @@ -20,6 +28,10 @@ ch:put() --- - error: 'usage: channel:put(var [, timeout])' ... +ch:count() +--- +- 0 +... ch:put('test') --- - true @@ -44,6 +56,10 @@ ch:put(345, .5) --- - false ... +ch:count() +--- +- 1 +... ch:is_full() --- - true @@ -116,10 +132,18 @@ ch:has_writers() --- - false ... +ch:count() +--- +- 0 +... ch:put(box.info.pid) --- - true ... +ch:count() +--- +- 1 +... ch:is_full() --- - true diff --git a/test/box/ipc.test.lua b/test/box/ipc.test.lua index fa2829234ceff1cc2e2ab755b3adb0b7211fd619..bcd3130e8aa0bd2dc31d990c165caef01fff0951 100644 --- a/test/box/ipc.test.lua +++ b/test/box/ipc.test.lua @@ -1,16 +1,20 @@ fiber = require('fiber') ch = fiber.channel() +ch:size() +ch:count() ch:is_full() ch:is_empty() ch:get(.1) ch:put() +ch:count() ch:put('test') ch:get() ch:get('wrong timeout') ch:get(-10) ch:put(234) ch:put(345, .5) +ch:count() ch:is_full() ch:is_empty() buffer = {} @@ -34,7 +38,9 @@ fiber.cancel(tfbr) ch:has_readers() ch:has_writers() +ch:count() ch:put(box.info.pid) +ch:count() ch:is_full() ch:is_empty() ch:get(box.info.pid) == box.info.pid