diff --git a/include/fiber.h b/include/fiber.h index 9c7de99c6b6d9e2deaea127d60196e0c5cdac376..0f4e1827131242035048b46d74c5c993dc1a87a5 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -40,16 +40,19 @@ #include "exception.h" #include "palloc.h" +#include <rlist.h> #define FIBER_NAME_MAXLEN 32 -#define FIBER_READING_INBOX 0x1 +#define FIBER_READING_INBOX (1 << 0) /** This fiber can be cancelled synchronously. */ -#define FIBER_CANCELLABLE 0x2 +#define FIBER_CANCELLABLE (1 << 1) /** Indicates that a fiber has been cancelled. */ -#define FIBER_CANCEL 0x4 +#define FIBER_CANCEL (1 << 2) /** This fiber was created via stored procedures API. */ -#define FIBER_USER_MODE 0x8 +#define FIBER_USER_MODE (1 << 3) +/** This fiber was marked as ready for wake up */ +#define FIBER_READY (1 << 4) /** This is thrown by fiber_* API calls when the fiber is * cancelled. @@ -59,7 +62,6 @@ @end struct fiber { - ev_async async; #ifdef ENABLE_BACKTRACE void *last_stack_frame; #endif @@ -73,7 +75,9 @@ struct fiber { ev_child cw; SLIST_ENTRY(fiber) link, zombie_link; - STAILQ_ENTRY(fiber) ifc; /* inter fiber communication */ + + struct rlist ready; /* wakeup queue */ + struct rlist ifc; /* inter fiber communication */ /* ASCIIZ name of this fiber. */ char name[FIBER_NAME_MAXLEN]; @@ -123,7 +127,6 @@ bool fiber_setcancellable(bool enable); void fiber_sleep(ev_tstamp s); struct tbuf; void fiber_info(struct tbuf *out); -void -fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))); +void fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))); #endif /* TARANTOOL_FIBER_H_INCLUDED */ diff --git a/include/ifc.h b/include/ifc.h index e77784a25ad85011a4c59b759772bd80987b9bf4..112571cdce33ae9b1c3b0b5f0ac3928d0dc8c822 100644 --- a/include/ifc.h +++ b/include/ifc.h @@ -31,33 +31,33 @@ #define TARANTOOL_IFC_H_INCLUDED #include <tarantool_ev.h> -#include "third_party/queue.h" /** @brief SEMAPHORES */ -struct fiber_semaphore; +struct ifc_semaphore; + /** @brief allocator @return malloced semaphore @code - struct fiber_semaphore *s = fiber_semaphore_alloc(); + struct ifc_semaphore *s = ifc_semaphore_alloc(); @endcode */ -struct fiber_semaphore *fiber_semaphore_alloc(void); +struct ifc_semaphore *ifc_semaphore_alloc(void); /** @brief init semaphore @param s semaphore @param cnt initial value of semaphore @code - struct fiber_semaphore *s = fiber_semaphore_alloc(); - fiber_semaphore_init(s); + struct ifc_semaphore *s = ifc_semaphore_alloc(); + ifc_semaphore_init(s); @endcode */ -void fiber_semaphore_init(struct fiber_semaphore *s, int cnt); +void ifc_semaphore_init(struct ifc_semaphore *s, int cnt); /** @brief down semafore @@ -66,25 +66,25 @@ fiber (if semaphore counter < 0) until the other fiber increments the counter. @param s semaphore @code - fiber_semaphore_down(s); + ifc_semaphore_down(s); // do something - fiber_semaphore_up(s); + ifc_semaphore_up(s); @endcode */ -void fiber_semaphore_down(struct fiber_semaphore *s); +void ifc_semaphore_down(struct ifc_semaphore *s); /** @brief up semaphore @detail increment semaphore's counter and unlock one locked fiber @param s semaphore @code - fiber_semaphore_down(s); + ifc_semaphore_down(s); // do something - fiber_semaphore_up(s); + ifc_semaphore_up(s); @endcode */ -void fiber_semaphore_up(struct fiber_semaphore *s); +void ifc_semaphore_up(struct ifc_semaphore *s); /** @brief down semaphore in timeout @@ -96,7 +96,7 @@ increments the counter or timeout exceeded. @return 0 if success @return ETIMEDOUT if timeout exceeded */ -int fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout); +int ifc_semaphore_down_timeout(struct ifc_semaphore *s, ev_tstamp timeout); /** @brief try to down semaphore @@ -104,14 +104,14 @@ int fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout); @param s semaphore @return 0 if success */ -int fiber_semaphore_trydown(struct fiber_semaphore *s); +int ifc_semaphore_trydown(struct ifc_semaphore *s); /** @brief get semaphore's counter @param s semaphore @return semaphore's counter */ -int fiber_semaphore_counter(struct fiber_semaphore *s); +int ifc_semaphore_counter(struct ifc_semaphore *s); @@ -119,26 +119,26 @@ int fiber_semaphore_counter(struct fiber_semaphore *s); @brief MUTEXES */ -struct fiber_mutex; +struct ifc_mutex; /** @brief allocate new mutex @return malloced mutex structure @code - struct fiber_mutex *m = fiber_mutex_alloc(); + struct ifc_mutex *m = ifc_mutex_alloc(); @endcode */ -struct fiber_mutex *fiber_mutex_alloc(void); +struct ifc_mutex *ifc_mutex_alloc(void); /** @brief init mutex @param m mutex @code - struct fiber_mutex *m = fiber_mutex_alloc(); - fiber_mutex_init(m); + struct ifc_mutex *m = ifc_mutex_alloc(); + ifc_mutex_init(m); @endcode */ -void fiber_mutex_init(struct fiber_mutex *m); +void ifc_mutex_init(struct ifc_mutex *m); /** @brief lock mutex @@ -146,12 +146,12 @@ void fiber_mutex_init(struct fiber_mutex *m); the other fiber unlock the mutex. @param m mutex @code - fiber_mutex_lock(m); + ifc_mutex_lock(m); // do something - fiber_mutex_unlock(m); + ifc_mutex_unlock(m); @endcode */ -void fiber_mutex_lock(struct fiber_mutex *m); +void ifc_mutex_lock(struct ifc_mutex *m); /** @brief lock mutex in timeout @@ -161,24 +161,24 @@ unlock the mutex. @param mutex @param timeout @code - fiber_mutex_lock(m); + ifc_mutex_lock(m); // do something - fiber_mutex_unlock(m); + ifc_mutex_unlock(m); @endcode */ -int fiber_mutex_lock_timeout(struct fiber_mutex *m, ev_tstamp timeout); +int ifc_mutex_lock_timeout(struct ifc_mutex *m, ev_tstamp timeout); /** @brief unlock mutex @detail unlock one locked fiber @param mutex @code - fiber_mutex_lock(m); + ifc_mutex_lock(m); // do something - fiber_mutex_unlock(m); + ifc_mutex_unlock(m); @endcode */ -void fiber_mutex_unlock(struct fiber_mutex *m); +void ifc_mutex_unlock(struct ifc_mutex *m); /** @brief try to lock mutex @@ -186,7 +186,7 @@ void fiber_mutex_unlock(struct fiber_mutex *m); @return 0 if mutex locked @return ETIMEDOUT if mutex is already locked */ -int fiber_mutex_trylock(struct fiber_mutex *m); +int ifc_mutex_trylock(struct ifc_mutex *m); /** @brief check if mutex is locked @@ -194,34 +194,34 @@ int fiber_mutex_trylock(struct fiber_mutex *m); @return 0 if mutex is free @return 1 if mutex is locked */ -int fiber_mutex_islocked(struct fiber_mutex *m); +int ifc_mutex_islocked(struct ifc_mutex *m); + /** @brief CHANNELS */ +struct ifc_channel; - -struct fiber_channel; /** @brief allocator @param size @return malloced channel (or NULL) @code - struct fiber_channel *ch = fiber_channel_alloc(10); + struct ifc_channel *ch = ifc_channel_alloc(10); @endcode */ -struct fiber_channel *fiber_channel_alloc(unsigned size); +struct ifc_channel *ifc_channel_alloc(unsigned size); /** @brief init channel @param channel @code - struct fiber_channel *ch = fiber_channel_alloc(10); - fiber_channel_init(ch); + struct ifc_channel *ch = ifc_channel_alloc(10); + ifc_channel_init(ch); @endcode */ -void fiber_channel_init(struct fiber_channel *ch); +void ifc_channel_init(struct ifc_channel *ch); /** @brief put data into channel @@ -229,30 +229,29 @@ void fiber_channel_init(struct fiber_channel *ch); @param channel @param data @code - fiber_channel_put(ch, "message"); + ifc_channel_put(ch, "message"); @endcode */ -void fiber_channel_put(struct fiber_channel *ch, void *data); +void ifc_channel_put(struct ifc_channel *ch, void *data); /** @brief get data from channel @detail lock current fiber if channel is empty @param channel -@return data that was put into channel by fiber_channel_put +@return data that was put into channel by ifc_channel_put @code - char *msg = fiber_channel_get(ch); + char *msg = ifc_channel_get(ch); @endcode */ -void *fiber_channel_get(struct fiber_channel *ch); - +void *ifc_channel_get(struct ifc_channel *ch); /** -@brief wake up all fibers that sleep by fiber_channel_get and send message to them +@brief wake up all fibers that sleep by ifc_channel_get and send message to them @param channel @param data @return count of fibers received the message */ -int fiber_channel_broadcast(struct fiber_channel *ch, void *data); +int ifc_channel_broadcast(struct ifc_channel *ch, void *data); /** @brief check if channel is empty @@ -260,11 +259,11 @@ int fiber_channel_broadcast(struct fiber_channel *ch, void *data); @return 1 (TRUE) if channel is empty @return 0 otherwise @code - if (!fiber_channel_isempty(ch)) - char *msg = fiber_channel_get(ch); + if (!ifc_channel_isempty(ch)) + char *msg = ifc_channel_get(ch); @endcode */ -int fiber_channel_isempty(struct fiber_channel *ch); +int ifc_channel_isempty(struct ifc_channel *ch); /** @brief check if channel is full @@ -272,11 +271,12 @@ int fiber_channel_isempty(struct fiber_channel *ch); @return 1 (TRUE) if channel is full @return 0 otherwise @code - if (!fiber_channel_isfull(ch)) - fiber_channel_put(ch, "message"); + if (!ifc_channel_isfull(ch)) + ifc_channel_put(ch, "message"); @endcode */ -int fiber_channel_isfull(struct fiber_channel *ch); + +int ifc_channel_isfull(struct ifc_channel *ch); /** @brief put data into channel in timeout @@ -286,14 +286,15 @@ int fiber_channel_isfull(struct fiber_channel *ch); @return 0 if success @return ETIMEDOUT if timeout exceeded @code - if (fiber_channel_put_timeout(ch, "message", 0.25) == 0) + if (ifc_channel_put_timeout(ch, "message", 0.25) == 0) return "ok"; else return "timeout exceeded"; @endcode */ -int fiber_channel_put_timeout(struct fiber_channel *ch, - void *data, ev_tstamp timeout); +int +ifc_channel_put_timeout(struct ifc_channel *ch, void *data, ev_tstamp timeout); + /** @brief get data into channel in timeout @param channel @@ -302,13 +303,26 @@ int fiber_channel_put_timeout(struct fiber_channel *ch, @return NULL if timeout exceeded @code do { - char *msg = fiber_channel_get_timeout(ch, 0.5); + char *msg = ifc_channel_get_timeout(ch, 0.5); printf("message: %p\n", msg); } until(msg); return msg; @endcode */ -void *fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout); +void *ifc_channel_get_timeout(struct ifc_channel *ch, ev_tstamp timeout); + + +/** +@brief return true if channel has reader fibers that wait data +@param channel +*/ +int ifc_channel_has_readers(struct ifc_channel *ch); + +/** +@brief return true if channel has writer fibers that wait data +@param channel +*/ +int ifc_channel_has_writers(struct ifc_channel *ch); #endif /* TARANTOOL_IFC_H_INCLUDED */ diff --git a/include/rlist.h b/include/rlist.h index e24f1b5d319e7f4be91ec866b8694355e5891f82..4b38d4ab903458a4f9f8444d03f69e8d49752026 100644 --- a/include/rlist.h +++ b/include/rlist.h @@ -151,6 +151,13 @@ rlist_empty(struct rlist *item) #define rlist_add_tail_entry(head, item, member) \ rlist_add_tail((head), &(item)->member) + +/** + * delete entry from list + */ +#define rlist_del_entry(item, member) \ + rlist_del(&((item)->member)) + /** * foreach through list */ diff --git a/src/fiber.m b/src/fiber.m index a240543017d3d62f95cdc89229809c5ecd1f3d9f..f1f6542bb8058466a35c6bdd647b48663f0259e2 100644 --- a/src/fiber.m +++ b/src/fiber.m @@ -41,10 +41,12 @@ #include <stat.h> #include <pickle.h> #include "iobuf.h" +#include <rlist.h> @implementation FiberCancelException @end + enum { FIBER_CALL_STACK = 16 }; static struct fiber sched; @@ -55,6 +57,9 @@ static __thread uint32_t last_used_fid; static __thread struct mh_i32ptr_t *fibers_registry; __thread SLIST_HEAD(, fiber) fibers, zombie_fibers; +static RLIST_HEAD(ready_fibers); +static ev_async ready_async; + static void update_last_stack_frame(struct fiber *fiber) { @@ -80,6 +85,8 @@ fiber_call(struct fiber *callee, ...) callee->csw++; + fiber->flags &= ~FIBER_READY; + va_start(fiber->f_data, callee); coro_transfer(&caller->coro.ctx, &callee->coro.ctx); va_end(fiber->f_data); @@ -94,7 +101,12 @@ fiber_call(struct fiber *callee, ...) void fiber_wakeup(struct fiber *f) { - ev_async_send(&f->async); + if (f->flags & FIBER_READY) + return; + f->flags |= FIBER_READY; + if (rlist_empty(&ready_fibers)) + ev_async_send(&ready_async); + rlist_add_tail(&ready_fibers, &f->ready); } /** Cancel the subject fiber. @@ -247,6 +259,17 @@ fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))) fiber_call(watcher->data); } +static void +fiber_ready_async(void) +{ + while(!rlist_empty(&ready_fibers)) { + struct fiber *f = + rlist_first_entry(&ready_fibers, struct fiber, ready); + rlist_del_entry(f, ready); + fiber_call(f); + } +} + struct fiber * fiber_find(int fid) { @@ -373,15 +396,15 @@ fiber_create(const char *name, void (*f) (va_list)) fiber->gc_pool = palloc_create_pool(""); fiber_alloc(fiber); - ev_async_init(&fiber->async, (void *)fiber_schedule); - ev_async_start(&fiber->async); ev_init(&fiber->timer, (void *)fiber_schedule); ev_init(&fiber->cw, (void *)fiber_schedule); - fiber->async.data = fiber->timer.data = fiber->cw.data = fiber; + fiber->timer.data = fiber->cw.data = fiber; SLIST_INSERT_HEAD(&fibers, fiber, link); + rlist_init(&fiber->ready); } + fiber->f = f; while (++last_used_fid <= 100) ; /* fids from 0 to 100 are reserved */ fiber->fid = last_used_fid; @@ -407,7 +430,7 @@ fiber_destroy(struct fiber *f) if (strcmp(f->name, "sched") == 0) return; - ev_async_stop(&f->async); + rlist_del(&f->ready); palloc_destroy_pool(f->gc_pool); tarantool_coro_destroy(&f->coro); } @@ -460,11 +483,15 @@ fiber_init(void) last_used_fid = 100; iobuf_init_readahead(cfg.readahead); + + ev_async_init(&ready_async, (void *)fiber_ready_async); + ev_async_start(&ready_async); } void fiber_free(void) { + ev_async_stop(&ready_async); /* Only clean up if initialized. */ if (fibers_registry) { fiber_destroy_all(); diff --git a/src/ifc.m b/src/ifc.m index 2066fcdf1efe5b129f9ea014fc950df7ae8dfdb8..c7b1cdb776024b99b4fb69e3c1bd0c5a904772c7 100644 --- a/src/ifc.m +++ b/src/ifc.m @@ -30,38 +30,38 @@ #include "ifc.h" #include "fiber.h" #include <stdlib.h> +#include <rlist.h> - -struct fiber_semaphore { +struct ifc_semaphore { int count; - STAILQ_HEAD(, fiber) fibers, wakeup; + struct rlist fibers, wakeup; }; -struct fiber_semaphore * -fiber_semaphore_alloc(void) +struct ifc_semaphore * +ifc_semaphore_alloc(void) { - return malloc(sizeof(struct fiber_semaphore)); + return malloc(sizeof(struct ifc_semaphore)); } int -fiber_semaphore_counter(struct fiber_semaphore *s) +ifc_semaphore_counter(struct ifc_semaphore *s) { return s->count; } void -fiber_semaphore_init(struct fiber_semaphore *s, int cnt) +ifc_semaphore_init(struct ifc_semaphore *s, int cnt) { s->count = cnt; - STAILQ_INIT(&s->fibers); - STAILQ_INIT(&s->wakeup); + rlist_init(&s->fibers); + rlist_init(&s->wakeup); } int -fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) +ifc_semaphore_down_timeout(struct ifc_semaphore *s, ev_tstamp timeout) { int count = --s->count; @@ -71,7 +71,7 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) if (timeout < 0) timeout = 0; - STAILQ_INSERT_TAIL(&s->fibers, fiber, ifc); + rlist_add_tail_entry(&s->fibers, fiber, ifc); bool cancellable = fiber_setcancellable(true); @@ -89,105 +89,102 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) int timeouted = ETIMEDOUT; struct fiber *f; - STAILQ_FOREACH(f, &s->wakeup, ifc) { + rlist_foreach_entry(f, &s->wakeup, ifc) { if (f != fiber) continue; - STAILQ_REMOVE(&s->wakeup, fiber, fiber, ifc); timeouted = 0; break; } - if (timeouted) { - STAILQ_REMOVE(&s->fibers, fiber, fiber, ifc); + if (timeouted) s->count++; - } + rlist_del_entry(fiber, ifc); fiber_testcancel(); - return timeouted; } void -fiber_semaphore_down(struct fiber_semaphore *s) +ifc_semaphore_down(struct ifc_semaphore *s) { - fiber_semaphore_down_timeout(s, 0); + ifc_semaphore_down_timeout(s, 0); } void -fiber_semaphore_up(struct fiber_semaphore *s) +ifc_semaphore_up(struct ifc_semaphore *s) { s->count++; - if (STAILQ_EMPTY(&s->fibers)) + if (rlist_empty(&s->fibers)) return; /* wake up one fiber */ - struct fiber *f = STAILQ_FIRST(&s->fibers); - STAILQ_REMOVE(&s->fibers, f, fiber, ifc); - STAILQ_INSERT_TAIL(&s->wakeup, f, ifc); + struct fiber *f = rlist_first_entry(&s->fibers, struct fiber, ifc); + rlist_del_entry(f, ifc); + rlist_add_tail_entry(&s->wakeup, f, ifc); fiber_wakeup(f); } int -fiber_semaphore_trydown(struct fiber_semaphore *s) +ifc_semaphore_trydown(struct ifc_semaphore *s) { if (s->count <= 0) return s->count - 1; - fiber_semaphore_down(s); + ifc_semaphore_down(s); return 0; } -struct fiber_mutex { - struct fiber_semaphore semaphore; +struct ifc_mutex { + struct ifc_semaphore semaphore; }; -struct fiber_mutex * -fiber_mutex_alloc(void) +struct ifc_mutex * +ifc_mutex_alloc(void) { - return malloc(sizeof(struct fiber_mutex)); + return malloc(sizeof(struct ifc_mutex)); } void -fiber_mutex_init(struct fiber_mutex *m) +ifc_mutex_init(struct ifc_mutex *m) { - fiber_semaphore_init(&m->semaphore, 1); + ifc_semaphore_init(&m->semaphore, 1); } void -fiber_mutex_lock(struct fiber_mutex *m) +ifc_mutex_lock(struct ifc_mutex *m) { - fiber_semaphore_down(&m->semaphore); + ifc_semaphore_down(&m->semaphore); } int -fiber_mutex_lock_timeout(struct fiber_mutex *m, ev_tstamp timeout) +ifc_mutex_lock_timeout(struct ifc_mutex *m, ev_tstamp timeout) { - return fiber_semaphore_down_timeout(&m->semaphore, timeout); + return ifc_semaphore_down_timeout(&m->semaphore, timeout); } void -fiber_mutex_unlock(struct fiber_mutex *m) +ifc_mutex_unlock(struct ifc_mutex *m) { - fiber_semaphore_up(&m->semaphore); + ifc_semaphore_up(&m->semaphore); } int -fiber_mutex_trylock(struct fiber_mutex *m) +ifc_mutex_trylock(struct ifc_mutex *m) { - return fiber_semaphore_trydown(&m->semaphore); + return ifc_semaphore_trydown(&m->semaphore); } int -fiber_mutex_islocked(struct fiber_mutex *m) +ifc_mutex_islocked(struct ifc_mutex *m) { return m->semaphore.count <= 0; } /**********************************************************************/ -struct fiber_channel { - STAILQ_HEAD(, fiber) readers, writers, wakeup; +struct ifc_channel { + struct rlist readers, writers, wakeup; unsigned size; unsigned beg; unsigned count; @@ -199,50 +196,50 @@ struct fiber_channel { } __attribute__((packed)); int -fiber_channel_isempty(struct fiber_channel *ch) +ifc_channel_isempty(struct ifc_channel *ch) { return ch->count == 0; } int -fiber_channel_isfull(struct fiber_channel *ch) +ifc_channel_isfull(struct ifc_channel *ch) { return ch->count >= ch->size; } -struct fiber_channel * -fiber_channel_alloc(unsigned size) +struct ifc_channel * +ifc_channel_alloc(unsigned size) { if (!size) size = 1; - struct fiber_channel *res = - malloc(sizeof(struct fiber_channel) + sizeof(void *) * size); + struct ifc_channel *res = + malloc(sizeof(struct ifc_channel) + sizeof(void *) * size); if (res) res->size = size; return res; } void -fiber_channel_init(struct fiber_channel *ch) +ifc_channel_init(struct ifc_channel *ch) { ch->beg = ch->count = 0; ch->bcast = NULL; - STAILQ_INIT(&ch->readers); - STAILQ_INIT(&ch->writers); - STAILQ_INIT(&ch->wakeup); + rlist_init(&ch->readers); + rlist_init(&ch->writers); + rlist_init(&ch->wakeup); } void * -fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) +ifc_channel_get_timeout(struct ifc_channel *ch, ev_tstamp timeout) { if (timeout < 0) timeout = 0; /* channel is empty */ if (!ch->count) { - STAILQ_INSERT_TAIL(&ch->readers, fiber, ifc); + rlist_add_tail_entry(&ch->readers, fiber, ifc); bool cancellable = fiber_setcancellable(true); if (timeout) { @@ -255,18 +252,7 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) } - int timeouted = ETIMEDOUT; - struct fiber *f; - STAILQ_FOREACH(f, &ch->wakeup, ifc) { - if (f != fiber) - continue; - STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc); - timeouted = 0; - break; - } - if (timeouted) - STAILQ_REMOVE(&ch->readers, fiber, fiber, ifc); - + rlist_del_entry(fiber, ifc); fiber_testcancel(); fiber_setcancellable(cancellable); @@ -287,10 +273,11 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) ch->beg -= ch->size; ch->count--; - if (!STAILQ_EMPTY(&ch->writers)) { - struct fiber *f = STAILQ_FIRST(&ch->writers); - STAILQ_REMOVE_HEAD(&ch->writers, ifc); - STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); + if (!rlist_empty(&ch->writers)) { + struct fiber *f = + rlist_first_entry(&ch->writers, struct fiber, ifc); + rlist_del_entry(f, ifc); + rlist_add_tail_entry(&ch->wakeup, f, ifc); fiber_wakeup(f); } @@ -298,23 +285,22 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) } void * -fiber_channel_get(struct fiber_channel *ch) +ifc_channel_get(struct ifc_channel *ch) { - return fiber_channel_get_timeout(ch, 0); + return ifc_channel_get_timeout(ch, 0); } int -fiber_channel_put_timeout(struct fiber_channel *ch, void *data, +ifc_channel_put_timeout(struct ifc_channel *ch, void *data, ev_tstamp timeout) { - say_info("==== %s(%lu)", __func__, (unsigned long)data); if (timeout < 0) timeout = 0; /* channel is full */ if (ch->count >= ch->size) { - STAILQ_INSERT_TAIL(&ch->writers, fiber, ifc); + rlist_add_tail_entry(&ch->writers, fiber, ifc); bool cancellable = fiber_setcancellable(true); if (timeout) { @@ -326,24 +312,15 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data, fiber_yield(); } - int timeouted = ETIMEDOUT; - struct fiber *f; - STAILQ_FOREACH(f, &ch->wakeup, ifc) { - if (f != fiber) - continue; - STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc); - timeouted = 0; - break; - } - if (timeouted) - STAILQ_REMOVE(&ch->writers, fiber, fiber, ifc); + rlist_del_entry(fiber, ifc); fiber_testcancel(); fiber_setcancellable(cancellable); - if (timeouted) - return timeouted; } + if (ch->count >= ch->size) + return ETIMEDOUT; + unsigned i = ch->beg; i += ch->count; ch->count++; @@ -351,44 +328,60 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data, i -= ch->size; ch->item[i] = data; - if (!STAILQ_EMPTY(&ch->readers)) { - struct fiber *f = STAILQ_FIRST(&ch->readers); - STAILQ_REMOVE_HEAD(&ch->readers, ifc); - STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); + if (!rlist_empty(&ch->readers)) { + struct fiber *f = + rlist_first_entry(&ch->readers, struct fiber, ifc); + rlist_del_entry(f, ifc); + rlist_add_tail_entry(&ch->wakeup, f, ifc); fiber_wakeup(f); } return 0; } void -fiber_channel_put(struct fiber_channel *ch, void *data) +ifc_channel_put(struct ifc_channel *ch, void *data) { - fiber_channel_put_timeout(ch, data, 0); + ifc_channel_put_timeout(ch, data, 0); } int -fiber_channel_broadcast(struct fiber_channel *ch, void *data) +ifc_channel_has_readers(struct ifc_channel *ch) { - if (STAILQ_EMPTY(&ch->readers)) - return 0; + return !rlist_empty(&ch->readers); +} + +int +ifc_channel_has_writers(struct ifc_channel *ch) +{ + return !rlist_empty(&ch->writers); +} + +int +ifc_channel_broadcast(struct ifc_channel *ch, void *data) +{ + if (rlist_empty(&ch->readers)) { + ifc_channel_put(ch, data); + return 1; + } struct fiber *f; int count = 0; - STAILQ_FOREACH(f, &ch->readers, ifc) { + rlist_foreach_entry(f, &ch->readers, ifc) { count++; } - for (int i = 0; i < count && !STAILQ_EMPTY(&ch->readers); i++) { - struct fiber *f = STAILQ_FIRST(&ch->readers); - STAILQ_REMOVE_HEAD(&ch->readers, ifc); - STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); + for (int i = 0; i < count && !rlist_empty(&ch->readers); i++) { + struct fiber *f = + rlist_first_entry(&ch->readers, struct fiber, ifc); + rlist_del_entry(f, ifc); + rlist_add_tail_entry(&ch->wakeup, f, ifc); ch->bcast = fiber; ch->bcast_msg = data; fiber_wakeup(f); fiber_yield(); ch->bcast = NULL; fiber_testcancel(); - if (STAILQ_EMPTY(&ch->readers)) { + if (rlist_empty(&ch->readers)) { count = i; break; } diff --git a/src/lua/lua_ifc.m b/src/lua/lua_ifc.m index b73e8c10609854f7c69799e30c3cfdd204de3cb9..3808c6e5d7ac5528790f30a398d1a2e0f794165c 100644 --- a/src/lua/lua_ifc.m +++ b/src/lua/lua_ifc.m @@ -44,16 +44,16 @@ static const char channel_lib[] = "box.ifc.channel"; /******************** semaphore ***************************/ static int -lbox_fiber_semaphore(struct lua_State *L) +lbox_ifc_semaphore(struct lua_State *L) { say_info(":%s()", __func__); if (lua_gettop(L) != 1 || !lua_isnumber(L, 1)) luaL_error(L, "fiber.semaphore(count): bad arguments"); - struct fiber_semaphore *sm = fiber_semaphore_alloc(); + struct ifc_semaphore *sm = ifc_semaphore_alloc(); if (!sm) luaL_error(L, "fiber.semaphore: Not enough memory"); - fiber_semaphore_init(sm, lua_tointeger(L, -1)); + ifc_semaphore_init(sm, lua_tointeger(L, -1)); @@ -64,30 +64,30 @@ lbox_fiber_semaphore(struct lua_State *L) return 1; } -static inline struct fiber_semaphore * +static inline struct ifc_semaphore * lbox_check_semaphore(struct lua_State *L, int narg) { return * (void **) luaL_checkudata(L, narg, semaphore_lib); } static int -lbox_fiber_semaphore_up(struct lua_State *L) +lbox_ifc_semaphore_up(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: semaphore:up()"); - struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); - fiber_semaphore_up(sm); - lua_pushnumber(L, fiber_semaphore_counter(sm)); + struct ifc_semaphore *sm = lbox_check_semaphore(L, -1); + ifc_semaphore_up(sm); + lua_pushnumber(L, ifc_semaphore_counter(sm)); return 1; } static int -lbox_fiber_semaphore_down(struct lua_State *L) +lbox_ifc_semaphore_down(struct lua_State *L) { lua_Number timeout; - struct fiber_semaphore *sm; + struct ifc_semaphore *sm; int top = lua_gettop(L); @@ -105,7 +105,7 @@ lbox_fiber_semaphore_down(struct lua_State *L) sm = lbox_check_semaphore(L, -top); - if (fiber_semaphore_down_timeout(sm, timeout) == 0) + if (ifc_semaphore_down_timeout(sm, timeout) == 0) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -113,38 +113,38 @@ lbox_fiber_semaphore_down(struct lua_State *L) } static int -lbox_fiber_semaphore_counter(struct lua_State *L) +lbox_ifc_semaphore_counter(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: semaphore()"); - struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); + struct ifc_semaphore *sm = lbox_check_semaphore(L, -1); - lua_pushnumber(L, fiber_semaphore_counter(sm)); + lua_pushnumber(L, ifc_semaphore_counter(sm)); return 1; } static int -lbox_fiber_semaphore_gc(struct lua_State *L) +lbox_ifc_semaphore_gc(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) return 0; - struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); + struct ifc_semaphore *sm = lbox_check_semaphore(L, -1); free(sm); return 0; } static int -lbox_fiber_trydown(struct lua_State *L) +lbox_ifc_trydown(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: semaphore:trydown()"); - struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); - if (fiber_semaphore_trydown(sm) == 0) + struct ifc_semaphore *sm = lbox_check_semaphore(L, -1); + if (ifc_semaphore_trydown(sm) == 0) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -154,13 +154,13 @@ lbox_fiber_trydown(struct lua_State *L) /******************** mutex ***************************/ static int -lbox_fiber_mutex(struct lua_State *L) +lbox_ifc_mutex(struct lua_State *L) { say_info(":%s()", __func__); - struct fiber_mutex *m = fiber_mutex_alloc(); + struct ifc_mutex *m = ifc_mutex_alloc(); if (!m) luaL_error(L, "fiber.mutex: Not enough memory"); - fiber_mutex_init(m); + ifc_mutex_init(m); void **ptr = lua_newuserdata(L, sizeof(void *)); luaL_getmetatable(L, mutex_lib); @@ -169,7 +169,7 @@ lbox_fiber_mutex(struct lua_State *L) return 1; } -static inline struct fiber_mutex * +static inline struct ifc_mutex * lbox_check_mutex(struct lua_State *L, int narg) { return * (void **) luaL_checkudata(L, narg, mutex_lib); @@ -177,34 +177,34 @@ lbox_check_mutex(struct lua_State *L, int narg) static int -lbox_fiber_mutex_gc(struct lua_State *L) +lbox_ifc_mutex_gc(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) return 0; - struct fiber_mutex *m = lbox_check_mutex(L, -1); + struct ifc_mutex *m = lbox_check_mutex(L, -1); free(m); return 0; } static int -lbox_fiber_mutex_locked(struct lua_State *L) +lbox_ifc_mutex_locked(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: mutex()"); - struct fiber_mutex *m = lbox_check_mutex(L, -1); + struct ifc_mutex *m = lbox_check_mutex(L, -1); - lua_pushboolean(L, fiber_mutex_islocked(m)); + lua_pushboolean(L, ifc_mutex_islocked(m)); return 1; } static int -lbox_fiber_mutex_lock(struct lua_State *L) +lbox_ifc_mutex_lock(struct lua_State *L) { int top = lua_gettop(L); if (top < 1 || top > 2 || !lua_isuserdata(L, -top)) luaL_error(L, "usage: mutex:lock([timeout])"); - struct fiber_mutex *m = lbox_check_mutex(L, -top); + struct ifc_mutex *m = lbox_check_mutex(L, -top); lua_Number timeout; if (top == 2) { if (!lua_isnumber(L, -1)) @@ -214,7 +214,7 @@ lbox_fiber_mutex_lock(struct lua_State *L) timeout = 0; } - if (fiber_mutex_lock_timeout(m, timeout) == 0) + if (ifc_mutex_lock_timeout(m, timeout) == 0) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -223,25 +223,25 @@ lbox_fiber_mutex_lock(struct lua_State *L) static int -lbox_fiber_mutex_unlock(struct lua_State *L) +lbox_ifc_mutex_unlock(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: mutex:unlock()"); - struct fiber_mutex *m = lbox_check_mutex(L, -1); + struct ifc_mutex *m = lbox_check_mutex(L, -1); - fiber_mutex_unlock(m); + ifc_mutex_unlock(m); return 0; } static int -lbox_fiber_mutex_trylock(struct lua_State *L) +lbox_ifc_mutex_trylock(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: mutex:trylock()"); - struct fiber_mutex *m = lbox_check_mutex(L, -1); + struct ifc_mutex *m = lbox_check_mutex(L, -1); - lua_pushboolean(L, fiber_mutex_trylock(m) == 0); + lua_pushboolean(L, ifc_mutex_trylock(m) == 0); return 1; } @@ -249,7 +249,7 @@ lbox_fiber_mutex_trylock(struct lua_State *L) /******************** channel ***************************/ static int -lbox_fiber_channel(struct lua_State *L) +lbox_ifc_channel(struct lua_State *L) { say_info(":%s()", __func__); @@ -263,10 +263,10 @@ lbox_fiber_channel(struct lua_State *L) if (size < 0) luaL_error(L, "fiber.channel(size): negative size"); } - struct fiber_channel *ch = fiber_channel_alloc(size); + struct ifc_channel *ch = ifc_channel_alloc(size); if (!ch) luaL_error(L, "fiber.channel: Not enough memory"); - fiber_channel_init(ch); + ifc_channel_init(ch); void **ptr = lua_newuserdata(L, sizeof(void *)); luaL_getmetatable(L, channel_lib); @@ -280,7 +280,7 @@ lbox_fiber_channel(struct lua_State *L) return 1; } -static inline struct fiber_channel * +static inline struct ifc_channel * lbox_check_channel(struct lua_State *L, int narg) { return * (void **) luaL_checkudata(L, narg, channel_lib); @@ -288,23 +288,23 @@ lbox_check_channel(struct lua_State *L, int narg) static int -lbox_fiber_channel_gc(struct lua_State *L) +lbox_ifc_channel_gc(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) return 0; - struct fiber_channel *ch = lbox_check_channel(L, -1); + struct ifc_channel *ch = lbox_check_channel(L, -1); free(ch); return 0; } static int -lbox_fiber_channel_isfull(struct lua_State *L) +lbox_ifc_channel_isfull(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: channel:is_full()"); - struct fiber_channel *ch = lbox_check_channel(L, -1); - if (fiber_channel_isfull(ch)) + struct ifc_channel *ch = lbox_check_channel(L, -1); + if (ifc_channel_isfull(ch)) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -312,12 +312,12 @@ lbox_fiber_channel_isfull(struct lua_State *L) } static int -lbox_fiber_channel_isempty(struct lua_State *L) +lbox_ifc_channel_isempty(struct lua_State *L) { if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) luaL_error(L, "usage: channel:is_empty()"); - struct fiber_channel *ch = lbox_check_channel(L, -1); - if (fiber_channel_isempty(ch)) + struct ifc_channel *ch = lbox_check_channel(L, -1); + if (ifc_channel_isempty(ch)) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -325,11 +325,11 @@ lbox_fiber_channel_isempty(struct lua_State *L) } static int -lbox_fiber_channel_put(struct lua_State *L) +lbox_ifc_channel_put(struct lua_State *L) { double timeout; int top = lua_gettop(L); - struct fiber_channel *ch; + struct ifc_channel *ch; switch(top) { case 2: @@ -370,7 +370,7 @@ lbox_fiber_channel_put(struct lua_State *L) lua_settop(L, top); - if (fiber_channel_put_timeout(ch, (void *)rid, timeout) == 0) + if (ifc_channel_put_timeout(ch, (void *)rid, timeout) == 0) lua_pushboolean(L, 1); else lua_pushboolean(L, 0); @@ -378,7 +378,7 @@ lbox_fiber_channel_put(struct lua_State *L) } static int -lbox_fiber_channel_get(struct lua_State *L) +lbox_ifc_channel_get(struct lua_State *L) { int top = lua_gettop(L); double timeout; @@ -396,9 +396,9 @@ lbox_fiber_channel_get(struct lua_State *L) timeout = 0; } - struct fiber_channel *ch = lbox_check_channel(L, 1); + struct ifc_channel *ch = lbox_check_channel(L, 1); - lua_Integer rid = (lua_Integer)fiber_channel_get_timeout(ch, timeout); + lua_Integer rid = (lua_Integer)ifc_channel_get_timeout(ch, timeout); if (!rid) { lua_pushnil(L); @@ -428,15 +428,18 @@ lbox_fiber_channel_get(struct lua_State *L) static int -lbox_fiber_channel_broadcast(struct lua_State *L) +lbox_ifc_channel_broadcast(struct lua_State *L) { - struct fiber_channel *ch; + struct ifc_channel *ch; if (lua_gettop(L) != 2) luaL_error(L, "usage: channel:broadcast(variable)"); ch = lbox_check_channel(L, -2); + if (!ifc_channel_has_readers(ch)) + return lbox_ifc_channel_put(L); + lua_getmetatable(L, -2); /* 3 */ lua_pushstring(L, "broadcast_message"); /* 4 */ @@ -449,7 +452,7 @@ lbox_fiber_channel_broadcast(struct lua_State *L) lua_pushvalue(L, 2); lua_settable(L, 3); - int count = fiber_channel_broadcast(ch, (void *)1); + int count = ifc_channel_broadcast(ch, (void *)1); lua_settable(L, 3); @@ -460,16 +463,35 @@ lbox_fiber_channel_broadcast(struct lua_State *L) } +static int +lbox_ifc_channel_has_readers(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "usage: channel:has_readers()"); + struct ifc_channel *ch = lbox_check_channel(L, -1); + lua_pushboolean(L, ifc_channel_has_readers(ch)); + return 1; +} + +static int +lbox_ifc_channel_has_writers(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "usage: channel:has_writers()"); + struct ifc_channel *ch = lbox_check_channel(L, -1); + lua_pushboolean(L, ifc_channel_has_writers(ch)); + return 1; +} void ifc_lua_init(struct lua_State *L) { static const struct luaL_reg semaphore_meta[] = { - {"__call", lbox_fiber_semaphore_counter}, - {"__gc", lbox_fiber_semaphore_gc}, - {"up", lbox_fiber_semaphore_up}, - {"down", lbox_fiber_semaphore_down}, - {"trydown", lbox_fiber_trydown}, + {"__call", lbox_ifc_semaphore_counter}, + {"__gc", lbox_ifc_semaphore_gc}, + {"up", lbox_ifc_semaphore_up}, + {"down", lbox_ifc_semaphore_down}, + {"trydown", lbox_ifc_trydown}, {NULL, NULL} }; @@ -477,23 +499,25 @@ ifc_lua_init(struct lua_State *L) static const struct luaL_reg mutex_meta[] = { - {"__gc", lbox_fiber_mutex_gc}, - {"__call", lbox_fiber_mutex_locked}, - {"locked", lbox_fiber_mutex_locked}, - {"lock", lbox_fiber_mutex_lock}, - {"unlock", lbox_fiber_mutex_unlock}, - {"trylock", lbox_fiber_mutex_trylock}, + {"__gc", lbox_ifc_mutex_gc}, + {"__call", lbox_ifc_mutex_locked}, + {"locked", lbox_ifc_mutex_locked}, + {"lock", lbox_ifc_mutex_lock}, + {"unlock", lbox_ifc_mutex_unlock}, + {"trylock", lbox_ifc_mutex_trylock}, {NULL, NULL} }; tarantool_lua_register_type(L, mutex_lib, mutex_meta); static const struct luaL_reg channel_meta[] = { - {"__gc", lbox_fiber_channel_gc}, - {"is_full", lbox_fiber_channel_isfull}, - {"is_empty", lbox_fiber_channel_isempty}, - {"put", lbox_fiber_channel_put}, - {"get", lbox_fiber_channel_get}, - {"broadcast", lbox_fiber_channel_broadcast}, + {"__gc", lbox_ifc_channel_gc}, + {"is_full", lbox_ifc_channel_isfull}, + {"is_empty", lbox_ifc_channel_isempty}, + {"put", lbox_ifc_channel_put}, + {"get", lbox_ifc_channel_get}, + {"broadcast", lbox_ifc_channel_broadcast}, + {"has_readers", lbox_ifc_channel_has_readers}, + {"has_writers", lbox_ifc_channel_has_writers}, {NULL, NULL} }; tarantool_lua_register_type(L, channel_lib, channel_meta); @@ -501,9 +525,9 @@ ifc_lua_init(struct lua_State *L) static const struct luaL_reg ifc_meta[] = { - {"semaphore", lbox_fiber_semaphore}, - {"mutex", lbox_fiber_mutex}, - {"channel", lbox_fiber_channel}, + {"semaphore", lbox_ifc_semaphore}, + {"mutex", lbox_ifc_mutex}, + {"channel", lbox_ifc_channel}, {NULL, NULL} }; diff --git a/test/box/ifc.result b/test/box/ifc.result index b80dee8700c01abc2ef3ded7bb361d43ff127095..56be613355dac9e58b6900193401665be022dd55 100644 --- a/test/box/ifc.result +++ b/test/box/ifc.result @@ -248,9 +248,25 @@ lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end lua box.fiber.sleep(.5) --- ... +lua ch:has_readers() +--- + - true +... +lua ch:has_writers() +--- + - false +... lua box.fiber.cancel(tfbr) --- ... +lua ch:has_readers() +--- + - false +... +lua ch:has_writers() +--- + - false +... lua ch:put(box.info.pid) --- - true @@ -281,13 +297,21 @@ lua for i, v in pairs(buffer) do print(v) end 9 10 ... +lua ch:is_empty() +--- + - true +... lua ch:broadcast() --- error: 'usage: channel:broadcast(variable)' ... lua ch:broadcast(123) --- - - 0 + - true +... +lua ch:get() +--- + - 123 ... lua ch:is_full() --- diff --git a/test/box/ifc.test b/test/box/ifc.test index 9eb11ba116586ed405fb82319509173a0d00091b..0ab55c5241435745406adf5abeb1e08bab90b30b 100644 --- a/test/box/ifc.test +++ b/test/box/ifc.test @@ -88,15 +88,21 @@ exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true exec admin "lua box.fiber.resume(tfbr)" exec admin "lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end" exec admin "lua box.fiber.sleep(.5)" +exec admin "lua ch:has_readers()" +exec admin "lua ch:has_writers()" exec admin "lua box.fiber.cancel(tfbr)" +exec admin "lua ch:has_readers()" +exec admin "lua ch:has_writers()" exec admin "lua ch:put(box.info.pid)" exec admin "lua ch:is_full()" exec admin "lua ch:is_empty()" exec admin "lua ch:get(box.info.pid) == box.info.pid" exec admin "lua for i, v in pairs(buffer) do print(v) end" +exec admin "lua ch:is_empty()" exec admin "lua ch:broadcast()" exec admin "lua ch:broadcast(123)" +exec admin "lua ch:get()" exec admin "lua ch:is_full()" exec admin "lua ch:is_empty()"