diff --git a/src/box/applier.cc b/src/box/applier.cc index 0b674d0e862e8991b44986289d583252a8efa6c9..67160211b1c1a2a64cac7bd52c0b477523756ca5 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -516,7 +516,7 @@ applier_new(const char *uri, struct xstream *join_stream, applier->subscribe_stream = subscribe_stream; applier->last_row_time = ev_now(loop()); rlist_create(&applier->on_state); - ipc_channel_create(&applier->pause, 0); + fiber_channel_create(&applier->pause, 0); return applier; } @@ -527,7 +527,7 @@ applier_delete(struct applier *applier) assert(applier->reader == NULL); iobuf_delete(applier->iobuf); assert(applier->io.fd == -1); - ipc_channel_destroy(&applier->pause); + fiber_channel_destroy(&applier->pause); trigger_destroy(&applier->on_state); free(applier); } @@ -537,7 +537,7 @@ applier_resume(struct applier *applier) { assert(!fiber_is_dead(applier->reader)); void *data = NULL; - ipc_channel_put_xc(&applier->pause, data); + fiber_channel_put_xc(&applier->pause, data); } static inline void @@ -545,14 +545,14 @@ applier_pause(struct applier *applier) { /* Sleep until applier_resume() wake us up */ void *data; - ipc_channel_get_xc(&applier->pause, &data); + fiber_channel_get_xc(&applier->pause, &data); } struct applier_on_state { struct trigger base; struct applier *applier; enum applier_state desired_state; - struct ipc_channel wakeup; + struct fiber_channel wakeup; }; /** Used by applier_connect_all() */ @@ -571,7 +571,7 @@ applier_on_state_f(struct trigger *trigger, void *event) return; /* Wake up waiter */ - ipc_channel_put_xc(&on_state->wakeup, applier); + fiber_channel_put_xc(&on_state->wakeup, applier); applier_pause(applier); } @@ -583,7 +583,7 @@ applier_add_on_state(struct applier *applier, { trigger_create(&trigger->base, applier_on_state_f, NULL, NULL); trigger->applier = applier; - ipc_channel_create(&trigger->wakeup, 0); + fiber_channel_create(&trigger->wakeup, 0); trigger->desired_state = desired_state; trigger_add(&applier->on_state, &trigger->base); } @@ -591,7 +591,7 @@ applier_add_on_state(struct applier *applier, static inline void applier_clear_on_state(struct applier_on_state *trigger) { - ipc_channel_destroy(&trigger->wakeup); + fiber_channel_destroy(&trigger->wakeup); trigger_clear(&trigger->base); } @@ -599,7 +599,7 @@ static inline int applier_wait_for_state(struct applier_on_state *trigger, double timeout) { void *data = NULL; - if (ipc_channel_get_timeout(&trigger->wakeup, &data, timeout) != 0) + if (fiber_channel_get_timeout(&trigger->wakeup, &data, timeout) != 0) return -1; /* ER_TIMEOUT */ struct applier *applier = trigger->applier; diff --git a/src/box/applier.h b/src/box/applier.h index 6012b505bc1be8fdd376e3094ef0b553abf84e5d..b4b0d9ff5a8f751e5de07512fae7b0897b192d98 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -100,7 +100,7 @@ struct applier { /** Triggers invoked on state change */ struct rlist on_state; /** Channel used by applier_connect_all() and applier_resume() */ - struct ipc_channel pause; + struct fiber_channel pause; /** xstream to process rows during initial JOIN */ struct xstream *join_stream; /** xstream to process rows during final JOIN and SUBSCRIBE */ diff --git a/src/box/vinyl.c b/src/box/vinyl.c index b698c7ef6608759d8461bfccc17fbbb5c6466312..58b41a392101365296e5dd4411fbe8cdb47371a2 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -598,7 +598,7 @@ struct vy_scheduler { /** * There is no pending tasks for workers, so scheduler * needs to create one, or we want to shutdown the - * scheduler. Scheduler is a fiber in TX, so ev_async + ipc_channel + * scheduler. Scheduler is a fiber in TX, so ev_async + fiber_channel * are used here instead of pthread_cond_t. */ struct ev_async scheduler_async; diff --git a/src/fiber_channel.c b/src/fiber_channel.c index 43312de8a03fde43720a9aadd7329328390f13b0..6f1afda653eb564ace353c7357eb2a44ddc9fe0c 100644 --- a/src/fiber_channel.c +++ b/src/fiber_channel.c @@ -34,11 +34,11 @@ #include "fiber.h" -enum ipc_wait_status { - IPC_WAIT_READER, /* A reader is waiting for writer */ - IPC_WAIT_WRITER, /* A writer waiting for reader. */ - IPC_WAIT_DONE, /* Wait is done, message sent/received. */ - IPC_WAIT_CLOSED /* Wait is aborted, the channel is closed. */ +enum fiber_channel_wait_status { + FIBER_CHANNEL_WAIT_READER, /* A reader is waiting for writer */ + FIBER_CHANNEL_WAIT_WRITER, /* A writer waiting for reader. */ + FIBER_CHANNEL_WAIT_DONE, /* Wait is done, message sent/received. */ + FIBER_CHANNEL_WAIT_CLOSED /* Wait is aborted, the channel is closed. */ }; /** @@ -47,11 +47,11 @@ enum ipc_wait_status { */ struct ipc_wait_pad { struct ipc_msg *msg; - enum ipc_wait_status status; + enum fiber_channel_wait_status status; }; void -ipc_channel_create(struct ipc_channel *ch, uint32_t size) +fiber_channel_create(struct fiber_channel *ch, uint32_t size) { ch->size = size; ch->count = 0; @@ -64,22 +64,23 @@ ipc_channel_create(struct ipc_channel *ch, uint32_t size) } -struct ipc_channel * -ipc_channel_new(uint32_t size) +struct fiber_channel * +fiber_channel_new(uint32_t size) { - struct ipc_channel *res = (struct ipc_channel *) - malloc(ipc_channel_memsize(size)); + struct fiber_channel *res = (struct fiber_channel *) + malloc(fiber_channel_memsize(size)); if (res == NULL) { diag_set(OutOfMemory, size, - "malloc", "struct ipc_channel"); + "malloc", "struct fiber_channel"); return NULL; } - ipc_channel_create(res, size); + fiber_channel_create(res, size); return res; } bool -ipc_channel_has_waiter(struct ipc_channel *ch, enum ipc_wait_status status) +fiber_channel_has_waiter(struct fiber_channel *ch, + enum fiber_channel_wait_status status) { if (rlist_empty(&ch->waiters)) return false; @@ -90,15 +91,15 @@ ipc_channel_has_waiter(struct ipc_channel *ch, enum ipc_wait_status status) } bool -ipc_channel_has_readers(struct ipc_channel *ch) +fiber_channel_has_readers(struct fiber_channel *ch) { - return ipc_channel_has_waiter(ch, IPC_WAIT_READER); + return fiber_channel_has_waiter(ch, FIBER_CHANNEL_WAIT_READER); } bool -ipc_channel_has_writers(struct ipc_channel *ch) +fiber_channel_has_writers(struct fiber_channel *ch) { - return ipc_channel_has_waiter(ch, IPC_WAIT_WRITER); + return fiber_channel_has_waiter(ch, FIBER_CHANNEL_WAIT_WRITER); } /** @@ -107,7 +108,7 @@ ipc_channel_has_writers(struct ipc_channel *ch) * @pre The buffer has space for a message. */ static inline void -ipc_channel_buffer_push(struct ipc_channel *ch, struct ipc_msg *msg) +fiber_channel_buffer_push(struct fiber_channel *ch, struct ipc_msg *msg) { assert(ch->count < ch->size); /* Find an empty slot in the ring buffer. */ @@ -119,7 +120,7 @@ ipc_channel_buffer_push(struct ipc_channel *ch, struct ipc_msg *msg) } static inline struct ipc_msg * -ipc_channel_buffer_pop(struct ipc_channel *ch) +fiber_channel_buffer_pop(struct fiber_channel *ch) { assert(ch->count > 0); struct ipc_msg *msg = ch->buf[ch->beg]; @@ -130,7 +131,8 @@ ipc_channel_buffer_pop(struct ipc_channel *ch) } static inline void -ipc_channel_waiter_wakeup(struct fiber *f, enum ipc_wait_status status) +fiber_channel_waiter_wakeup(struct fiber *f, + enum fiber_channel_wait_status status) { struct ipc_wait_pad *pad = (struct ipc_wait_pad *) fiber_get_key(f, FIBER_KEY_MSG); @@ -141,7 +143,7 @@ ipc_channel_waiter_wakeup(struct fiber *f, enum ipc_wait_status status) */ pad->status = status; /* - * ipc_channel allows an asynchronous cancel. If a fiber + * fiber_channel allows an asynchronous cancel. If a fiber * is cancelled while waiting on a timeout, it is done via * fiber_wakeup(), which modifies fiber->state link. * This ensures that a fiber is never on two "state" @@ -161,7 +163,7 @@ ipc_channel_waiter_wakeup(struct fiber *f, enum ipc_wait_status status) int -ipc_channel_check_wait(struct ipc_channel *ch, ev_tstamp start_time, +fiber_channel_check_wait(struct fiber_channel *ch, ev_tstamp start_time, ev_tstamp timeout) { /* @@ -191,34 +193,34 @@ ipc_channel_check_wait(struct ipc_channel *ch, ev_tstamp start_time, } void -ipc_channel_close(struct ipc_channel *ch) +fiber_channel_close(struct fiber_channel *ch) { if (ch->is_closed) return; while (ch->count) { - struct ipc_msg *msg = ipc_channel_buffer_pop(ch); + struct ipc_msg *msg = fiber_channel_buffer_pop(ch); msg->destroy(msg); } struct fiber *f; while (! rlist_empty(&ch->waiters)) { f = rlist_first_entry(&ch->waiters, struct fiber, state); - ipc_channel_waiter_wakeup(f, IPC_WAIT_CLOSED); + fiber_channel_waiter_wakeup(f, FIBER_CHANNEL_WAIT_CLOSED); } ch->is_closed = true; } void -ipc_channel_destroy(struct ipc_channel *ch) +fiber_channel_destroy(struct fiber_channel *ch) { - ipc_channel_close(ch); + fiber_channel_close(ch); } void -ipc_channel_delete(struct ipc_channel *ch) +fiber_channel_delete(struct fiber_channel *ch) { - ipc_channel_destroy(ch); + fiber_channel_destroy(ch); free(ch); } @@ -254,7 +256,7 @@ ipc_value_delete(struct ipc_msg *msg) } int -ipc_channel_put_timeout(struct ipc_channel *ch, +fiber_channel_put_timeout(struct fiber_channel *ch, void *data, ev_tstamp timeout) { @@ -262,18 +264,18 @@ ipc_channel_put_timeout(struct ipc_channel *ch, if (value == NULL) return -1; value->data = data; - int rc = ipc_channel_put_msg_timeout(ch, &value->base, timeout); + int rc = fiber_channel_put_msg_timeout(ch, &value->base, timeout); if (rc < 0) ipc_value_delete(&value->base); return rc; } int -ipc_channel_get_timeout(struct ipc_channel *ch, void **data, +fiber_channel_get_timeout(struct fiber_channel *ch, void **data, ev_tstamp timeout) { struct ipc_value *value; - int rc = ipc_channel_get_msg_timeout(ch, (struct ipc_msg **) &value, + int rc = fiber_channel_get_msg_timeout(ch, (struct ipc_msg **) &value, timeout); if (rc < 0) return rc; @@ -283,7 +285,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, void **data, } int -ipc_channel_put_msg_timeout(struct ipc_channel *ch, +fiber_channel_put_msg_timeout(struct fiber_channel *ch, struct ipc_msg *msg, ev_tstamp timeout) { @@ -297,7 +299,7 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, * only if there is no reader try to put a message * into the channel buffer. */ - if (ipc_channel_has_readers(ch)) { + if (fiber_channel_has_readers(ch)) { /** * There is a reader, push the message * immediately. @@ -320,7 +322,7 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, pad->msg = msg; - ipc_channel_waiter_wakeup(f, IPC_WAIT_DONE); + fiber_channel_waiter_wakeup(f, FIBER_CHANNEL_WAIT_DONE); return 0; } if (ch->count < ch->size) { @@ -337,7 +339,7 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, diag_set(ChannelIsClosed); return -1; } - ipc_channel_buffer_push(ch, msg); + fiber_channel_buffer_push(ch, msg); return 0; } /** @@ -346,12 +348,12 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, */ struct fiber *f = fiber(); - if (ipc_channel_check_wait(ch, start_time, timeout)) + if (fiber_channel_check_wait(ch, start_time, timeout)) return -1; /* Prepare a wait pad. */ struct ipc_wait_pad pad; - pad.status = IPC_WAIT_WRITER; + pad.status = FIBER_CHANNEL_WAIT_WRITER; pad.msg = msg; fiber_set_key(f, FIBER_KEY_MSG, &pad); @@ -370,7 +372,7 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, rlist_del_entry(f, state); fiber_set_key(f, FIBER_KEY_MSG, NULL); - if (pad.status == IPC_WAIT_CLOSED) { + if (pad.status == FIBER_CHANNEL_WAIT_CLOSED) { /* * The channel is closed. Do not touch * the channel object. It might be gone @@ -379,14 +381,14 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, diag_set(ChannelIsClosed); return -1; } - if (pad.status == IPC_WAIT_DONE) + if (pad.status == FIBER_CHANNEL_WAIT_DONE) return 0; /* OK, someone took the message. */ timeout -= ev_now(loop()) - start_time; } } int -ipc_channel_get_msg_timeout(struct ipc_channel *ch, +fiber_channel_get_msg_timeout(struct fiber_channel *ch, struct ipc_msg **msg, ev_tstamp timeout) { @@ -409,9 +411,9 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, */ assert(ch->is_closed == false); - *msg = ipc_channel_buffer_pop(ch); + *msg = fiber_channel_buffer_pop(ch); - if (ipc_channel_has_writers(ch)) { + if (fiber_channel_has_writers(ch)) { /* * Move a waiting writer, if any, * from the wait list to the tail @@ -424,12 +426,13 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, struct ipc_wait_pad *pad = (struct ipc_wait_pad *) fiber_get_key(f, FIBER_KEY_MSG); - ipc_channel_buffer_push(ch, pad->msg); - ipc_channel_waiter_wakeup(f, IPC_WAIT_DONE); + fiber_channel_buffer_push(ch, pad->msg); + fiber_channel_waiter_wakeup(f, + FIBER_CHANNEL_WAIT_DONE); } return 0; } - if (ipc_channel_has_writers(ch)) { + if (fiber_channel_has_writers(ch)) { /** * There is no buffered messages, *but* * there is a writer. This is only @@ -446,10 +449,10 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, (struct ipc_wait_pad *) fiber_get_key(f, FIBER_KEY_MSG); *msg = pad->msg; - ipc_channel_waiter_wakeup(f, IPC_WAIT_DONE); + fiber_channel_waiter_wakeup(f, FIBER_CHANNEL_WAIT_DONE); return 0; } - if (ipc_channel_check_wait(ch, start_time, timeout)) + if (fiber_channel_check_wait(ch, start_time, timeout)) return -1; f = fiber(); /** @@ -457,7 +460,7 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, * Have to wait. */ struct ipc_wait_pad pad; - pad.status = IPC_WAIT_READER; + pad.status = FIBER_CHANNEL_WAIT_READER; fiber_set_key(f, FIBER_KEY_MSG, &pad); if (first_try) { rlist_add_tail_entry(&ch->waiters, f, state); @@ -473,11 +476,11 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, */ rlist_del_entry(f, state); fiber_set_key(f, FIBER_KEY_MSG, NULL); - if (pad.status == IPC_WAIT_CLOSED) { + if (pad.status == FIBER_CHANNEL_WAIT_CLOSED) { diag_set(ChannelIsClosed); return -1; } - if (pad.status == IPC_WAIT_DONE) { + if (pad.status == FIBER_CHANNEL_WAIT_DONE) { *msg = pad.msg; return 0; } diff --git a/src/fiber_channel.h b/src/fiber_channel.h index 90438042c2ebf3fb5dc2e6c6380494ae0891d8fb..50c286463b5d5c8cc6c2a214919bb957202e1a93 100644 --- a/src/fiber_channel.h +++ b/src/fiber_channel.h @@ -119,11 +119,11 @@ ipc_value_new(); * Channel structure has a fixed size. If a channel is created * with a buffer, the buffer must be allocated in a continuous * memory chunk, directly after the channel itself. - * ipc_channel_memsize() can be used to find out the amount + * fiber_channel_memsize() can be used to find out the amount * of memory necessary to store a channel, given the desired * buffer size. */ -struct ipc_channel { +struct fiber_channel { /** Channel buffer size, if the channel is buffered. */ uint32_t size; /** The number of messages in the buffer. */ @@ -148,9 +148,9 @@ struct ipc_channel { * buffer size. */ static inline size_t -ipc_channel_memsize(uint32_t size) +fiber_channel_memsize(uint32_t size) { - return sizeof(struct ipc_channel) + sizeof(struct ipc_msg *) * size; + return sizeof(struct fiber_channel) + sizeof(struct ipc_msg *) * size; } /** @@ -158,11 +158,11 @@ ipc_channel_memsize(uint32_t size) * been correctly allocated for the channel). */ void -ipc_channel_create(struct ipc_channel *ch, uint32_t size); +fiber_channel_create(struct fiber_channel *ch, uint32_t size); /** Destroy a channel. Does not free allocated memory. */ void -ipc_channel_destroy(struct ipc_channel *ch); +fiber_channel_destroy(struct fiber_channel *ch); /** * Allocate and construct a channel. @@ -172,11 +172,11 @@ ipc_channel_destroy(struct ipc_channel *ch); * @param size of the channel buffer * @return new channel * @code - * struct ipc_channel *ch = ipc_channel_new(10); + * struct fiber_channel *ch = fiber_channel_new(10); * @endcode */ -struct ipc_channel * -ipc_channel_new(uint32_t size); +struct fiber_channel * +fiber_channel_new(uint32_t size); /** * Destroy and free an IPC channel. @@ -184,7 +184,7 @@ ipc_channel_new(uint32_t size); * @param ch channel */ void -ipc_channel_delete(struct ipc_channel *ch); +fiber_channel_delete(struct fiber_channel *ch); /** * Check if the channel buffer is empty. @@ -197,12 +197,12 @@ ipc_channel_delete(struct ipc_channel *ch); * @retval false otherwise * * @code - * if (!ipc_channel_is_empty(ch)) - * ipc_channel_get(ch, ...); + * if (!fiber_channel_is_empty(ch)) + * fiber_channel_get(ch, ...); * @endcode */ static inline bool -ipc_channel_is_empty(struct ipc_channel *ch) +fiber_channel_is_empty(struct fiber_channel *ch) { return ch->count == 0; } @@ -217,12 +217,12 @@ ipc_channel_is_empty(struct ipc_channel *ch) * * @return false otherwise * @code - * if (!ipc_channel_is_full(ch)) - * ipc_channel_put(ch, "message"); + * if (!fiber_channel_is_full(ch)) + * fiber_channel_put(ch, "message"); * @endcode */ static inline bool -ipc_channel_is_full(struct ipc_channel *ch) +fiber_channel_is_full(struct fiber_channel *ch) { return ch->count >= ch->size; } @@ -233,7 +233,7 @@ ipc_channel_is_full(struct ipc_channel *ch) * a custom destructor. */ int -ipc_channel_put_msg_timeout(struct ipc_channel *ch, +fiber_channel_put_msg_timeout(struct fiber_channel *ch, struct ipc_msg *msg, ev_tstamp timeout); @@ -251,7 +251,7 @@ ipc_channel_put_msg_timeout(struct ipc_channel *ch, * */ int -ipc_channel_put_timeout(struct ipc_channel *ch, +fiber_channel_put_timeout(struct fiber_channel *ch, void *data, ev_tstamp timeout); @@ -266,14 +266,14 @@ ipc_channel_put_timeout(struct ipc_channel *ch, * @param data * * @code - * ipc_channel_put(ch, "message"); + * fiber_channel_put(ch, "message"); * @endcode * @return -1 if the channel is closed */ static inline int -ipc_channel_put(struct ipc_channel *ch, void *data) +fiber_channel_put(struct fiber_channel *ch, void *data) { - return ipc_channel_put_timeout(ch, data, TIMEOUT_INFINITY); + return fiber_channel_put_timeout(ch, data, TIMEOUT_INFINITY); } /** @@ -281,7 +281,7 @@ ipc_channel_put(struct ipc_channel *ch, void *data) * The caller is responsible for message destruction. */ int -ipc_channel_get_msg_timeout(struct ipc_channel *ch, +fiber_channel_get_msg_timeout(struct fiber_channel *ch, struct ipc_msg **msg, ev_tstamp timeout); /** @@ -295,13 +295,13 @@ ipc_channel_get_msg_timeout(struct ipc_channel *ch, * @code * do { * struct ipc_msg *msg; - * int rc = ipc_channel_get_timeout(ch, 0.5, ); + * int rc = fiber_channel_get_timeout(ch, 0.5, ); * printf("message: %p\n", msg); * } while (msg); * @endcode */ int -ipc_channel_get_timeout(struct ipc_channel *ch, +fiber_channel_get_timeout(struct fiber_channel *ch, void **data, ev_tstamp timeout); @@ -313,9 +313,9 @@ ipc_channel_get_timeout(struct ipc_channel *ch, * @return 0 on success, -1 on error */ static inline int -ipc_channel_get(struct ipc_channel *ch, void **data) +fiber_channel_get(struct fiber_channel *ch, void **data) { - return ipc_channel_get_timeout(ch, data, TIMEOUT_INFINITY); + return fiber_channel_get_timeout(ch, data, TIMEOUT_INFINITY); } /** @@ -323,18 +323,18 @@ ipc_channel_get(struct ipc_channel *ch, void **data) * for new messages. */ bool -ipc_channel_has_readers(struct ipc_channel *ch); +fiber_channel_has_readers(struct fiber_channel *ch); /** * Check if the channel has writer fibers that wait * for readers. */ bool -ipc_channel_has_writers(struct ipc_channel *ch); +fiber_channel_has_writers(struct fiber_channel *ch); /** Channel buffer size. */ static inline uint32_t -ipc_channel_size(struct ipc_channel *ch) +fiber_channel_size(struct fiber_channel *ch) { return ch->size; } @@ -345,7 +345,7 @@ ipc_channel_size(struct ipc_channel *ch) * if the buffer is full. */ static inline uint32_t -ipc_channel_count(struct ipc_channel *ch) +fiber_channel_count(struct fiber_channel *ch) { return ch->count; } @@ -355,14 +355,14 @@ ipc_channel_count(struct ipc_channel *ch) * and wakes up all readers and writers. */ void -ipc_channel_close(struct ipc_channel *ch); +fiber_channel_close(struct fiber_channel *ch); /** * True if the channel is closed for both for reading * and writing. */ static inline bool -ipc_channel_is_closed(struct ipc_channel *ch) +fiber_channel_is_closed(struct fiber_channel *ch) { return ch->is_closed; } @@ -374,30 +374,30 @@ ipc_channel_is_closed(struct ipc_channel *ch) #include "fiber.h" struct IpcChannelGuard { - struct ipc_channel *ch; + struct fiber_channel *ch; IpcChannelGuard(uint32_t size) { - ch = ipc_channel_new(size); + ch = fiber_channel_new(size); if (ch == NULL) diag_raise(); } ~IpcChannelGuard() { - ipc_channel_delete(ch); + fiber_channel_delete(ch); } }; static inline void -ipc_channel_get_xc(struct ipc_channel *ch, void **data) +fiber_channel_get_xc(struct fiber_channel *ch, void **data) { - if (ipc_channel_get(ch, data) != 0) + if (fiber_channel_get(ch, data) != 0) diag_raise(); } static inline void -ipc_channel_put_xc(struct ipc_channel *ch, void *data) +fiber_channel_put_xc(struct fiber_channel *ch, void *data) { - if (ipc_channel_put(ch, data) != 0) + if (fiber_channel_put(ch, data) != 0) diag_raise(); } diff --git a/src/lua/fiber_channel.c b/src/lua/fiber_channel.c index 68b79affad7a5a37fb494156156cc6873a9f1965..de6d3770d64585bf1bcb40aab1fc7b0854369c5a 100644 --- a/src/lua/fiber_channel.c +++ b/src/lua/fiber_channel.c @@ -48,10 +48,8 @@ luaL_error(lua_State *L, const char *fmt, ...); static const char channel_typename[] = "fiber.channel"; -/******************** channel ***************************/ - static int -lbox_ipc_channel(struct lua_State *L) +luaT_fiber_channel(struct lua_State *L) { lua_Integer size = 0; @@ -65,11 +63,11 @@ lbox_ipc_channel(struct lua_State *L) luaL_error(L, "fiber.channel(size): bad arguments"); } - struct ipc_channel *ch = (struct ipc_channel *) - lua_newuserdata(L, ipc_channel_memsize(size)); + struct fiber_channel *ch = (struct fiber_channel *) + lua_newuserdata(L, fiber_channel_memsize(size)); if (ch == NULL) luaL_error(L, "fiber.channel: not enough memory"); - ipc_channel_create(ch, size); + fiber_channel_create(ch, size); luaL_getmetatable(L, channel_typename); @@ -77,40 +75,41 @@ lbox_ipc_channel(struct lua_State *L) return 1; } -static inline struct ipc_channel * -lbox_check_channel(struct lua_State *L, int index, const char *source) +static inline struct fiber_channel * +luaT_checkfiberchannel(struct lua_State *L, int index, const char *source) { assert(index > 0); if (index > lua_gettop(L)) luaL_error(L, "usage: %s", source); /* Note: checkudata errs on mismatch, no point in checking res */ - return (struct ipc_channel *) luaL_checkudata(L, index, channel_typename); + return (struct fiber_channel *) luaL_checkudata(L, index, + channel_typename); } static int -lbox_ipc_channel_gc(struct lua_State *L) +luaT_fiber_channel_gc(struct lua_State *L) { - struct ipc_channel *ch = (struct ipc_channel *) + struct fiber_channel *ch = (struct fiber_channel *) luaL_checkudata(L, -1, channel_typename); if (ch) - ipc_channel_destroy(ch); + fiber_channel_destroy(ch); return 0; } static int -lbox_ipc_channel_is_full(struct lua_State *L) +luaT_fiber_channel_is_full(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, "channel:is_full()"); - lua_pushboolean(L, ipc_channel_is_full(ch)); + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:is_full()"); + lua_pushboolean(L, fiber_channel_is_full(ch)); return 1; } static int -lbox_ipc_channel_is_empty(struct lua_State *L) +luaT_fiber_channel_is_empty(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:is_empty()"); - lua_pushboolean(L, ipc_channel_is_empty(ch)); + lua_pushboolean(L, fiber_channel_is_empty(ch)); return 1; } @@ -123,12 +122,12 @@ lua_ipc_value_destroy(struct ipc_msg *base) } static int -lbox_ipc_channel_put(struct lua_State *L) +luaT_fiber_channel_put(struct lua_State *L) { static const char usage[] = "channel:put(var [, timeout])"; int rc = -1; - struct ipc_channel *ch = - lbox_check_channel(L, 1, usage); + struct fiber_channel *ch = + luaT_checkfiberchannel(L, 1, usage); ev_tstamp timeout; /* val */ @@ -154,7 +153,7 @@ lbox_ipc_channel_put(struct lua_State *L) lua_pushvalue(L, 2); value->i = luaL_ref(L, LUA_REGISTRYINDEX); - rc = ipc_channel_put_msg_timeout(ch, &value->base, timeout); + rc = fiber_channel_put_msg_timeout(ch, &value->base, timeout); if (rc) { value->base.destroy(&value->base); #if 0 @@ -171,11 +170,11 @@ lbox_ipc_channel_put(struct lua_State *L) } static int -lbox_ipc_channel_get(struct lua_State *L) +luaT_fiber_channel_get(struct lua_State *L) { static const char usage[] = "channel:get([timeout])"; - struct ipc_channel *ch = - lbox_check_channel(L, 1, usage); + struct fiber_channel *ch = + luaT_checkfiberchannel(L, 1, usage); ev_tstamp timeout; /* timeout (optional) */ @@ -190,7 +189,7 @@ lbox_ipc_channel_get(struct lua_State *L) } struct ipc_value *value; - if (ipc_channel_get_msg_timeout(ch, (struct ipc_msg **) &value, + if (fiber_channel_get_msg_timeout(ch, (struct ipc_msg **) &value, timeout)) { #if 0 /* Treat everything except timeout as error. */ @@ -208,65 +207,65 @@ lbox_ipc_channel_get(struct lua_State *L) } static int -lbox_ipc_channel_has_readers(struct lua_State *L) +luaT_fiber_channel_has_readers(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:has_readers()"); - lua_pushboolean(L, ipc_channel_has_readers(ch)); + lua_pushboolean(L, fiber_channel_has_readers(ch)); return 1; } static int -lbox_ipc_channel_has_writers(struct lua_State *L) +luaT_fiber_channel_has_writers(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:has_writers()"); - lua_pushboolean(L, ipc_channel_has_writers(ch)); + lua_pushboolean(L, fiber_channel_has_writers(ch)); return 1; } static int -lbox_ipc_channel_size(struct lua_State *L) +luaT_fiber_channel_size(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, "channel:size()"); - lua_pushinteger(L, ipc_channel_size(ch)); + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:size()"); + lua_pushinteger(L, fiber_channel_size(ch)); return 1; } static int -lbox_ipc_channel_count(struct lua_State *L) +luaT_fiber_channel_count(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, "channel:count()"); - lua_pushinteger(L, ipc_channel_count(ch)); + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:count()"); + lua_pushinteger(L, fiber_channel_count(ch)); return 1; } static int -lbox_ipc_channel_close(struct lua_State *L) +luaT_fiber_channel_close(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, "channel:close()"); + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:close()"); /* Shutdown the channel for writing and wakeup waiters */ - ipc_channel_close(ch); + fiber_channel_close(ch); return 0; } static int -lbox_ipc_channel_is_closed(struct lua_State *L) +luaT_fiber_channel_is_closed(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, "channel:is_closed()"); - lua_pushboolean(L, ipc_channel_is_closed(ch)); + lua_pushboolean(L, fiber_channel_is_closed(ch)); return 1; } static int -lbox_ipc_channel_to_string(struct lua_State *L) +luaT_fiber_channel_to_string(struct lua_State *L) { - struct ipc_channel *ch = lbox_check_channel(L, 1, ""); - if (ipc_channel_is_closed(ch)) { + struct fiber_channel *ch = luaT_checkfiberchannel(L, 1, ""); + if (fiber_channel_is_closed(ch)) { lua_pushstring(L, "channel: closed"); } else { - lua_pushfstring(L, "channel: %d", (int)ipc_channel_count(ch)); + lua_pushfstring(L, "channel: %d", (int)fiber_channel_count(ch)); } return 1; } @@ -275,24 +274,24 @@ void tarantool_lua_fiber_channel_init(struct lua_State *L) { static const struct luaL_Reg channel_meta[] = { - {"__gc", lbox_ipc_channel_gc}, - {"__tostring", lbox_ipc_channel_to_string}, - {"is_full", lbox_ipc_channel_is_full}, - {"is_empty", lbox_ipc_channel_is_empty}, - {"put", lbox_ipc_channel_put}, - {"get", lbox_ipc_channel_get}, - {"has_readers", lbox_ipc_channel_has_readers}, - {"has_writers", lbox_ipc_channel_has_writers}, - {"count", lbox_ipc_channel_count}, - {"size", lbox_ipc_channel_size}, - {"close", lbox_ipc_channel_close}, - {"is_closed", lbox_ipc_channel_is_closed}, + {"__gc", luaT_fiber_channel_gc}, + {"__tostring", luaT_fiber_channel_to_string}, + {"is_full", luaT_fiber_channel_is_full}, + {"is_empty", luaT_fiber_channel_is_empty}, + {"put", luaT_fiber_channel_put}, + {"get", luaT_fiber_channel_get}, + {"has_readers", luaT_fiber_channel_has_readers}, + {"has_writers", luaT_fiber_channel_has_writers}, + {"count", luaT_fiber_channel_count}, + {"size", luaT_fiber_channel_size}, + {"close", luaT_fiber_channel_close}, + {"is_closed", luaT_fiber_channel_is_closed}, {NULL, NULL} }; luaL_register_type(L, channel_typename, channel_meta); static const struct luaL_Reg ipc_lib[] = { - {"channel", lbox_ipc_channel}, + {"channel", luaT_fiber_channel}, {NULL, NULL} }; diff --git a/test/unit/fiber_channel.cc b/test/unit/fiber_channel.cc index 936014b1728918cb2325c5a90098e3527d1650d8..90c73a58126b58654d7bfe7bc787e9f8585bb383 100644 --- a/test/unit/fiber_channel.cc +++ b/test/unit/fiber_channel.cc @@ -6,73 +6,73 @@ int status; void -ipc_basic() +fiber_channel_basic() { header(); plan(10); - struct ipc_channel *channel = ipc_channel_new(1); - ok(channel != NULL, "ipc_channel_new()"); + struct fiber_channel *channel = fiber_channel_new(1); + ok(channel != NULL, "fiber_channel_new()"); - ok(ipc_channel_size(channel) == 1, "ipc_channel_size()"); + ok(fiber_channel_size(channel) == 1, "fiber_channel_size()"); - ok(ipc_channel_count(channel) == 0, "ipc_channel_count()"); + ok(fiber_channel_count(channel) == 0, "fiber_channel_count()"); - ok(ipc_channel_is_full(channel) == false, "ipc_channel_is_full()"); + ok(fiber_channel_is_full(channel) == false, "fiber_channel_is_full()"); - ok(ipc_channel_is_empty(channel) == true, "ipc_channel_is_empty()"); + ok(fiber_channel_is_empty(channel) == true, "fiber_channel_is_empty()"); char dummy; - ipc_channel_put(channel, &dummy); + fiber_channel_put(channel, &dummy); - ok(ipc_channel_size(channel) == 1, "ipc_channel_size(1)"); + ok(fiber_channel_size(channel) == 1, "fiber_channel_size(1)"); - ok(ipc_channel_count(channel) == 1, "ipc_channel_count(1)"); + ok(fiber_channel_count(channel) == 1, "fiber_channel_count(1)"); - ok(ipc_channel_is_full(channel) == true, "ipc_channel_is_full(1)"); + ok(fiber_channel_is_full(channel) == true, "fiber_channel_is_full(1)"); - ok(ipc_channel_is_empty(channel) == false, "ipc_channel_is_empty(1)"); + ok(fiber_channel_is_empty(channel) == false, "fiber_channel_is_empty(1)"); void *ptr; - ipc_channel_get(channel, &ptr); - ok(ptr == &dummy, "ipc_channel_get()"); + fiber_channel_get(channel, &ptr); + ok(ptr == &dummy, "fiber_channel_get()"); - ipc_channel_delete(channel); + fiber_channel_delete(channel); footer(); status = check_plan(); } void -ipc_get() +fiber_channel_get() { header(); plan(7); - struct ipc_channel *channel = ipc_channel_new(1); + struct fiber_channel *channel = fiber_channel_new(1); char dummy; - ok(ipc_channel_put_timeout(channel, &dummy, 0) == 0, - "ipc_channel_put(0)"); - ok(ipc_channel_put_timeout(channel, &dummy, 0) == -1, - "ipc_channel_put_timeout(0)"); + ok(fiber_channel_put_timeout(channel, &dummy, 0) == 0, + "fiber_channel_put(0)"); + ok(fiber_channel_put_timeout(channel, &dummy, 0) == -1, + "fiber_channel_put_timeout(0)"); void *ptr; - ipc_channel_get(channel, &ptr); - ok(ptr == &dummy, "ipc_channel_get(0)"); - ok(ipc_channel_put_timeout(channel, &dummy, 0.01) == 0, - "ipc_channel_put_timeout(1)"); - ipc_channel_get(channel, &ptr); - ok(ptr == &dummy, "ipc_channel_get(1)"); + fiber_channel_get(channel, &ptr); + ok(ptr == &dummy, "fiber_channel_get(0)"); + ok(fiber_channel_put_timeout(channel, &dummy, 0.01) == 0, + "fiber_channel_put_timeout(1)"); + fiber_channel_get(channel, &ptr); + ok(ptr == &dummy, "fiber_channel_get(1)"); - ipc_channel_close(channel); + fiber_channel_close(channel); - ok(ipc_channel_put(channel, &dummy) == -1, "ipc_channel_put(closed)"); + ok(fiber_channel_put(channel, &dummy) == -1, "fiber_channel_put(closed)"); - ok(ipc_channel_get(channel, &ptr) == -1, "ipc_channel_get(closed)"); + ok(fiber_channel_get(channel, &ptr) == -1, "fiber_channel_get(closed)"); - ipc_channel_delete(channel); + fiber_channel_delete(channel); footer(); status = check_plan(); @@ -82,8 +82,8 @@ int main_f(va_list ap) { (void) ap; - ipc_basic(); - ipc_get(); + fiber_channel_basic(); + fiber_channel_get(); ev_break(loop(), EVBREAK_ALL); return 0; } diff --git a/test/unit/fiber_channel.result b/test/unit/fiber_channel.result index 92667aab5f72f8560dbaa0d77fcecad5a2a56669..b03949d8c78746f9c44a2c6201f63f367d4d3b0a 100644 --- a/test/unit/fiber_channel.result +++ b/test/unit/fiber_channel.result @@ -1,23 +1,23 @@ - *** ipc_basic *** + *** fiber_channel_basic *** 1..10 -ok 1 - ipc_channel_new() -ok 2 - ipc_channel_size() -ok 3 - ipc_channel_count() -ok 4 - ipc_channel_is_full() -ok 5 - ipc_channel_is_empty() -ok 6 - ipc_channel_size(1) -ok 7 - ipc_channel_count(1) -ok 8 - ipc_channel_is_full(1) -ok 9 - ipc_channel_is_empty(1) -ok 10 - ipc_channel_get() - *** ipc_basic: done *** - *** ipc_get *** +ok 1 - fiber_channel_new() +ok 2 - fiber_channel_size() +ok 3 - fiber_channel_count() +ok 4 - fiber_channel_is_full() +ok 5 - fiber_channel_is_empty() +ok 6 - fiber_channel_size(1) +ok 7 - fiber_channel_count(1) +ok 8 - fiber_channel_is_full(1) +ok 9 - fiber_channel_is_empty(1) +ok 10 - fiber_channel_get() + *** fiber_channel_basic: done *** + *** fiber_channel_get *** 1..7 -ok 1 - ipc_channel_put(0) -ok 2 - ipc_channel_put_timeout(0) -ok 3 - ipc_channel_get(0) -ok 4 - ipc_channel_put_timeout(1) -ok 5 - ipc_channel_get(1) -ok 6 - ipc_channel_put(closed) -ok 7 - ipc_channel_get(closed) - *** ipc_get: done *** +ok 1 - fiber_channel_put(0) +ok 2 - fiber_channel_put_timeout(0) +ok 3 - fiber_channel_get(0) +ok 4 - fiber_channel_put_timeout(1) +ok 5 - fiber_channel_get(1) +ok 6 - fiber_channel_put(closed) +ok 7 - fiber_channel_get(closed) + *** fiber_channel_get: done *** diff --git a/test/unit/fiber_channel_stress.cc b/test/unit/fiber_channel_stress.cc index b487c7d3aa63a7dd25171cd44ca43d92f8293eb6..98aec697bd39bcc2f92b889c2136f6c1ef4dd243 100644 --- a/test/unit/fiber_channel_stress.cc +++ b/test/unit/fiber_channel_stress.cc @@ -10,21 +10,21 @@ enum { static int push_f(va_list ap) { - struct ipc_channel *channel = va_arg(ap, struct ipc_channel *); + struct fiber_channel *channel = va_arg(ap, struct fiber_channel *); for (int i = 0; i < ITERATIONS; i++) - ipc_channel_put(channel, NULL); + fiber_channel_put(channel, NULL); return 0; } static int pop_f(va_list ap) { - struct ipc_channel *channel = va_arg(ap, struct ipc_channel *); + struct fiber_channel *channel = va_arg(ap, struct fiber_channel *); for (int i = 0; i < ITERATIONS; i++) { void *ptr; - ipc_channel_get(channel, &ptr); + fiber_channel_get(channel, &ptr); } return 0; } @@ -37,12 +37,12 @@ main_f(va_list ap) fiber_set_joinable(push, true); struct fiber *pop = fiber_new_xc("pop_f", pop_f); fiber_set_joinable(pop, true); - struct ipc_channel *channel = ipc_channel_new(1); + struct fiber_channel *channel = fiber_channel_new(1); fiber_start(push, channel); fiber_start(pop, channel); fiber_join(push); fiber_join(pop); - ipc_channel_delete(channel); + fiber_channel_delete(channel); ev_break(loop(), EVBREAK_ALL); footer(); return 0;