Skip to content
Snippets Groups Projects
Commit c754c7df authored by Dmitry E. Oboukhov's avatar Dmitry E. Oboukhov
Browse files

renamed all fiber_* functions to ifc_*

all functions use rlist: avoid spare foreaches
parent b152776a
No related branches found
No related tags found
No related merge requests found
...@@ -40,16 +40,19 @@ ...@@ -40,16 +40,19 @@
#include "exception.h" #include "exception.h"
#include "palloc.h" #include "palloc.h"
#include <rlist.h>
#define FIBER_NAME_MAXLEN 32 #define FIBER_NAME_MAXLEN 32
#define FIBER_READING_INBOX 0x1 #define FIBER_READING_INBOX (1 << 0)
/** This fiber can be cancelled synchronously. */ /** This fiber can be cancelled synchronously. */
#define FIBER_CANCELLABLE 0x2 #define FIBER_CANCELLABLE (1 << 1)
/** Indicates that a fiber has been cancelled. */ /** Indicates that a fiber has been cancelled. */
#define FIBER_CANCEL 0x4 #define FIBER_CANCEL (1 << 2)
/** This fiber was created via stored procedures API. */ /** This fiber was created via stored procedures API. */
#define FIBER_USER_MODE 0x8 #define FIBER_USER_MODE (1 << 3)
/** This fiber was marked as ready for wake up */
#define FIBER_READY (1 << 4)
/** This is thrown by fiber_* API calls when the fiber is /** This is thrown by fiber_* API calls when the fiber is
* cancelled. * cancelled.
...@@ -59,7 +62,6 @@ ...@@ -59,7 +62,6 @@
@end @end
struct fiber { struct fiber {
ev_async async;
#ifdef ENABLE_BACKTRACE #ifdef ENABLE_BACKTRACE
void *last_stack_frame; void *last_stack_frame;
#endif #endif
...@@ -73,7 +75,9 @@ struct fiber { ...@@ -73,7 +75,9 @@ struct fiber {
ev_child cw; ev_child cw;
SLIST_ENTRY(fiber) link, zombie_link; SLIST_ENTRY(fiber) link, zombie_link;
STAILQ_ENTRY(fiber) ifc; /* inter fiber communication */
struct rlist ready; /* wakeup queue */
struct rlist ifc; /* inter fiber communication */
/* ASCIIZ name of this fiber. */ /* ASCIIZ name of this fiber. */
char name[FIBER_NAME_MAXLEN]; char name[FIBER_NAME_MAXLEN];
...@@ -123,7 +127,6 @@ bool fiber_setcancellable(bool enable); ...@@ -123,7 +127,6 @@ bool fiber_setcancellable(bool enable);
void fiber_sleep(ev_tstamp s); void fiber_sleep(ev_tstamp s);
struct tbuf; struct tbuf;
void fiber_info(struct tbuf *out); void fiber_info(struct tbuf *out);
void void fiber_schedule(ev_watcher *watcher, int event __attribute__((unused)));
fiber_schedule(ev_watcher *watcher, int event __attribute__((unused)));
#endif /* TARANTOOL_FIBER_H_INCLUDED */ #endif /* TARANTOOL_FIBER_H_INCLUDED */
...@@ -31,33 +31,33 @@ ...@@ -31,33 +31,33 @@
#define TARANTOOL_IFC_H_INCLUDED #define TARANTOOL_IFC_H_INCLUDED
#include <tarantool_ev.h> #include <tarantool_ev.h>
#include "third_party/queue.h"
/** /**
@brief SEMAPHORES @brief SEMAPHORES
*/ */
struct fiber_semaphore; struct ifc_semaphore;
/** /**
@brief allocator @brief allocator
@return malloced semaphore @return malloced semaphore
@code @code
struct fiber_semaphore *s = fiber_semaphore_alloc(); struct ifc_semaphore *s = ifc_semaphore_alloc();
@endcode @endcode
*/ */
struct fiber_semaphore *fiber_semaphore_alloc(void); struct ifc_semaphore *ifc_semaphore_alloc(void);
/** /**
@brief init semaphore @brief init semaphore
@param s semaphore @param s semaphore
@param cnt initial value of semaphore @param cnt initial value of semaphore
@code @code
struct fiber_semaphore *s = fiber_semaphore_alloc(); struct ifc_semaphore *s = ifc_semaphore_alloc();
fiber_semaphore_init(s); ifc_semaphore_init(s);
@endcode @endcode
*/ */
void fiber_semaphore_init(struct fiber_semaphore *s, int cnt); void ifc_semaphore_init(struct ifc_semaphore *s, int cnt);
/** /**
@brief down semafore @brief down semafore
...@@ -66,25 +66,25 @@ fiber (if semaphore counter < 0) until the other fiber ...@@ -66,25 +66,25 @@ fiber (if semaphore counter < 0) until the other fiber
increments the counter. increments the counter.
@param s semaphore @param s semaphore
@code @code
fiber_semaphore_down(s); ifc_semaphore_down(s);
// do something // do something
fiber_semaphore_up(s); ifc_semaphore_up(s);
@endcode @endcode
*/ */
void fiber_semaphore_down(struct fiber_semaphore *s); void ifc_semaphore_down(struct ifc_semaphore *s);
/** /**
@brief up semaphore @brief up semaphore
@detail increment semaphore's counter and unlock one locked fiber @detail increment semaphore's counter and unlock one locked fiber
@param s semaphore @param s semaphore
@code @code
fiber_semaphore_down(s); ifc_semaphore_down(s);
// do something // do something
fiber_semaphore_up(s); ifc_semaphore_up(s);
@endcode @endcode
*/ */
void fiber_semaphore_up(struct fiber_semaphore *s); void ifc_semaphore_up(struct ifc_semaphore *s);
/** /**
@brief down semaphore in timeout @brief down semaphore in timeout
...@@ -96,7 +96,7 @@ increments the counter or timeout exceeded. ...@@ -96,7 +96,7 @@ increments the counter or timeout exceeded.
@return 0 if success @return 0 if success
@return ETIMEDOUT if timeout exceeded @return ETIMEDOUT if timeout exceeded
*/ */
int fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout); int ifc_semaphore_down_timeout(struct ifc_semaphore *s, ev_tstamp timeout);
/** /**
@brief try to down semaphore @brief try to down semaphore
...@@ -104,14 +104,14 @@ int fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout); ...@@ -104,14 +104,14 @@ int fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout);
@param s semaphore @param s semaphore
@return 0 if success @return 0 if success
*/ */
int fiber_semaphore_trydown(struct fiber_semaphore *s); int ifc_semaphore_trydown(struct ifc_semaphore *s);
/** /**
@brief get semaphore's counter @brief get semaphore's counter
@param s semaphore @param s semaphore
@return semaphore's counter @return semaphore's counter
*/ */
int fiber_semaphore_counter(struct fiber_semaphore *s); int ifc_semaphore_counter(struct ifc_semaphore *s);
...@@ -119,26 +119,26 @@ int fiber_semaphore_counter(struct fiber_semaphore *s); ...@@ -119,26 +119,26 @@ int fiber_semaphore_counter(struct fiber_semaphore *s);
@brief MUTEXES @brief MUTEXES
*/ */
struct fiber_mutex; struct ifc_mutex;
/** /**
@brief allocate new mutex @brief allocate new mutex
@return malloced mutex structure @return malloced mutex structure
@code @code
struct fiber_mutex *m = fiber_mutex_alloc(); struct ifc_mutex *m = ifc_mutex_alloc();
@endcode @endcode
*/ */
struct fiber_mutex *fiber_mutex_alloc(void); struct ifc_mutex *ifc_mutex_alloc(void);
/** /**
@brief init mutex @brief init mutex
@param m mutex @param m mutex
@code @code
struct fiber_mutex *m = fiber_mutex_alloc(); struct ifc_mutex *m = ifc_mutex_alloc();
fiber_mutex_init(m); ifc_mutex_init(m);
@endcode @endcode
*/ */
void fiber_mutex_init(struct fiber_mutex *m); void ifc_mutex_init(struct ifc_mutex *m);
/** /**
@brief lock mutex @brief lock mutex
...@@ -146,12 +146,12 @@ void fiber_mutex_init(struct fiber_mutex *m); ...@@ -146,12 +146,12 @@ void fiber_mutex_init(struct fiber_mutex *m);
the other fiber unlock the mutex. the other fiber unlock the mutex.
@param m mutex @param m mutex
@code @code
fiber_mutex_lock(m); ifc_mutex_lock(m);
// do something // do something
fiber_mutex_unlock(m); ifc_mutex_unlock(m);
@endcode @endcode
*/ */
void fiber_mutex_lock(struct fiber_mutex *m); void ifc_mutex_lock(struct ifc_mutex *m);
/** /**
@brief lock mutex in timeout @brief lock mutex in timeout
...@@ -161,24 +161,24 @@ unlock the mutex. ...@@ -161,24 +161,24 @@ unlock the mutex.
@param mutex @param mutex
@param timeout @param timeout
@code @code
fiber_mutex_lock(m); ifc_mutex_lock(m);
// do something // do something
fiber_mutex_unlock(m); ifc_mutex_unlock(m);
@endcode @endcode
*/ */
int fiber_mutex_lock_timeout(struct fiber_mutex *m, ev_tstamp timeout); int ifc_mutex_lock_timeout(struct ifc_mutex *m, ev_tstamp timeout);
/** /**
@brief unlock mutex @brief unlock mutex
@detail unlock one locked fiber @detail unlock one locked fiber
@param mutex @param mutex
@code @code
fiber_mutex_lock(m); ifc_mutex_lock(m);
// do something // do something
fiber_mutex_unlock(m); ifc_mutex_unlock(m);
@endcode @endcode
*/ */
void fiber_mutex_unlock(struct fiber_mutex *m); void ifc_mutex_unlock(struct ifc_mutex *m);
/** /**
@brief try to lock mutex @brief try to lock mutex
...@@ -186,7 +186,7 @@ void fiber_mutex_unlock(struct fiber_mutex *m); ...@@ -186,7 +186,7 @@ void fiber_mutex_unlock(struct fiber_mutex *m);
@return 0 if mutex locked @return 0 if mutex locked
@return ETIMEDOUT if mutex is already locked @return ETIMEDOUT if mutex is already locked
*/ */
int fiber_mutex_trylock(struct fiber_mutex *m); int ifc_mutex_trylock(struct ifc_mutex *m);
/** /**
@brief check if mutex is locked @brief check if mutex is locked
...@@ -194,34 +194,34 @@ int fiber_mutex_trylock(struct fiber_mutex *m); ...@@ -194,34 +194,34 @@ int fiber_mutex_trylock(struct fiber_mutex *m);
@return 0 if mutex is free @return 0 if mutex is free
@return 1 if mutex is locked @return 1 if mutex is locked
*/ */
int fiber_mutex_islocked(struct fiber_mutex *m); int ifc_mutex_islocked(struct ifc_mutex *m);
/** /**
@brief CHANNELS @brief CHANNELS
*/ */
struct ifc_channel;
struct fiber_channel;
/** /**
@brief allocator @brief allocator
@param size @param size
@return malloced channel (or NULL) @return malloced channel (or NULL)
@code @code
struct fiber_channel *ch = fiber_channel_alloc(10); struct ifc_channel *ch = ifc_channel_alloc(10);
@endcode @endcode
*/ */
struct fiber_channel *fiber_channel_alloc(unsigned size); struct ifc_channel *ifc_channel_alloc(unsigned size);
/** /**
@brief init channel @brief init channel
@param channel @param channel
@code @code
struct fiber_channel *ch = fiber_channel_alloc(10); struct ifc_channel *ch = ifc_channel_alloc(10);
fiber_channel_init(ch); ifc_channel_init(ch);
@endcode @endcode
*/ */
void fiber_channel_init(struct fiber_channel *ch); void ifc_channel_init(struct ifc_channel *ch);
/** /**
@brief put data into channel @brief put data into channel
...@@ -229,30 +229,29 @@ void fiber_channel_init(struct fiber_channel *ch); ...@@ -229,30 +229,29 @@ void fiber_channel_init(struct fiber_channel *ch);
@param channel @param channel
@param data @param data
@code @code
fiber_channel_put(ch, "message"); ifc_channel_put(ch, "message");
@endcode @endcode
*/ */
void fiber_channel_put(struct fiber_channel *ch, void *data); void ifc_channel_put(struct ifc_channel *ch, void *data);
/** /**
@brief get data from channel @brief get data from channel
@detail lock current fiber if channel is empty @detail lock current fiber if channel is empty
@param channel @param channel
@return data that was put into channel by fiber_channel_put @return data that was put into channel by ifc_channel_put
@code @code
char *msg = fiber_channel_get(ch); char *msg = ifc_channel_get(ch);
@endcode @endcode
*/ */
void *fiber_channel_get(struct fiber_channel *ch); void *ifc_channel_get(struct ifc_channel *ch);
/** /**
@brief wake up all fibers that sleep by fiber_channel_get and send message to them @brief wake up all fibers that sleep by ifc_channel_get and send message to them
@param channel @param channel
@param data @param data
@return count of fibers received the message @return count of fibers received the message
*/ */
int fiber_channel_broadcast(struct fiber_channel *ch, void *data); int ifc_channel_broadcast(struct ifc_channel *ch, void *data);
/** /**
@brief check if channel is empty @brief check if channel is empty
...@@ -260,11 +259,11 @@ int fiber_channel_broadcast(struct fiber_channel *ch, void *data); ...@@ -260,11 +259,11 @@ int fiber_channel_broadcast(struct fiber_channel *ch, void *data);
@return 1 (TRUE) if channel is empty @return 1 (TRUE) if channel is empty
@return 0 otherwise @return 0 otherwise
@code @code
if (!fiber_channel_isempty(ch)) if (!ifc_channel_isempty(ch))
char *msg = fiber_channel_get(ch); char *msg = ifc_channel_get(ch);
@endcode @endcode
*/ */
int fiber_channel_isempty(struct fiber_channel *ch); int ifc_channel_isempty(struct ifc_channel *ch);
/** /**
@brief check if channel is full @brief check if channel is full
...@@ -272,11 +271,12 @@ int fiber_channel_isempty(struct fiber_channel *ch); ...@@ -272,11 +271,12 @@ int fiber_channel_isempty(struct fiber_channel *ch);
@return 1 (TRUE) if channel is full @return 1 (TRUE) if channel is full
@return 0 otherwise @return 0 otherwise
@code @code
if (!fiber_channel_isfull(ch)) if (!ifc_channel_isfull(ch))
fiber_channel_put(ch, "message"); ifc_channel_put(ch, "message");
@endcode @endcode
*/ */
int fiber_channel_isfull(struct fiber_channel *ch);
int ifc_channel_isfull(struct ifc_channel *ch);
/** /**
@brief put data into channel in timeout @brief put data into channel in timeout
...@@ -286,14 +286,15 @@ int fiber_channel_isfull(struct fiber_channel *ch); ...@@ -286,14 +286,15 @@ int fiber_channel_isfull(struct fiber_channel *ch);
@return 0 if success @return 0 if success
@return ETIMEDOUT if timeout exceeded @return ETIMEDOUT if timeout exceeded
@code @code
if (fiber_channel_put_timeout(ch, "message", 0.25) == 0) if (ifc_channel_put_timeout(ch, "message", 0.25) == 0)
return "ok"; return "ok";
else else
return "timeout exceeded"; return "timeout exceeded";
@endcode @endcode
*/ */
int fiber_channel_put_timeout(struct fiber_channel *ch, int
void *data, ev_tstamp timeout); ifc_channel_put_timeout(struct ifc_channel *ch, void *data, ev_tstamp timeout);
/** /**
@brief get data into channel in timeout @brief get data into channel in timeout
@param channel @param channel
...@@ -302,13 +303,26 @@ int fiber_channel_put_timeout(struct fiber_channel *ch, ...@@ -302,13 +303,26 @@ int fiber_channel_put_timeout(struct fiber_channel *ch,
@return NULL if timeout exceeded @return NULL if timeout exceeded
@code @code
do { do {
char *msg = fiber_channel_get_timeout(ch, 0.5); char *msg = ifc_channel_get_timeout(ch, 0.5);
printf("message: %p\n", msg); printf("message: %p\n", msg);
} until(msg); } until(msg);
return msg; return msg;
@endcode @endcode
*/ */
void *fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout); void *ifc_channel_get_timeout(struct ifc_channel *ch, ev_tstamp timeout);
/**
@brief return true if channel has reader fibers that wait data
@param channel
*/
int ifc_channel_has_readers(struct ifc_channel *ch);
/**
@brief return true if channel has writer fibers that wait data
@param channel
*/
int ifc_channel_has_writers(struct ifc_channel *ch);
#endif /* TARANTOOL_IFC_H_INCLUDED */ #endif /* TARANTOOL_IFC_H_INCLUDED */
...@@ -151,6 +151,13 @@ rlist_empty(struct rlist *item) ...@@ -151,6 +151,13 @@ rlist_empty(struct rlist *item)
#define rlist_add_tail_entry(head, item, member) \ #define rlist_add_tail_entry(head, item, member) \
rlist_add_tail((head), &(item)->member) rlist_add_tail((head), &(item)->member)
/**
* delete entry from list
*/
#define rlist_del_entry(item, member) \
rlist_del(&((item)->member))
/** /**
* foreach through list * foreach through list
*/ */
......
...@@ -41,10 +41,12 @@ ...@@ -41,10 +41,12 @@
#include <stat.h> #include <stat.h>
#include <pickle.h> #include <pickle.h>
#include "iobuf.h" #include "iobuf.h"
#include <rlist.h>
@implementation FiberCancelException @implementation FiberCancelException
@end @end
enum { FIBER_CALL_STACK = 16 }; enum { FIBER_CALL_STACK = 16 };
static struct fiber sched; static struct fiber sched;
...@@ -55,6 +57,9 @@ static __thread uint32_t last_used_fid; ...@@ -55,6 +57,9 @@ static __thread uint32_t last_used_fid;
static __thread struct mh_i32ptr_t *fibers_registry; static __thread struct mh_i32ptr_t *fibers_registry;
__thread SLIST_HEAD(, fiber) fibers, zombie_fibers; __thread SLIST_HEAD(, fiber) fibers, zombie_fibers;
static RLIST_HEAD(ready_fibers);
static ev_async ready_async;
static void static void
update_last_stack_frame(struct fiber *fiber) update_last_stack_frame(struct fiber *fiber)
{ {
...@@ -80,6 +85,8 @@ fiber_call(struct fiber *callee, ...) ...@@ -80,6 +85,8 @@ fiber_call(struct fiber *callee, ...)
callee->csw++; callee->csw++;
fiber->flags &= ~FIBER_READY;
va_start(fiber->f_data, callee); va_start(fiber->f_data, callee);
coro_transfer(&caller->coro.ctx, &callee->coro.ctx); coro_transfer(&caller->coro.ctx, &callee->coro.ctx);
va_end(fiber->f_data); va_end(fiber->f_data);
...@@ -94,7 +101,12 @@ fiber_call(struct fiber *callee, ...) ...@@ -94,7 +101,12 @@ fiber_call(struct fiber *callee, ...)
void void
fiber_wakeup(struct fiber *f) fiber_wakeup(struct fiber *f)
{ {
ev_async_send(&f->async); if (f->flags & FIBER_READY)
return;
f->flags |= FIBER_READY;
if (rlist_empty(&ready_fibers))
ev_async_send(&ready_async);
rlist_add_tail(&ready_fibers, &f->ready);
} }
/** Cancel the subject fiber. /** Cancel the subject fiber.
...@@ -247,6 +259,17 @@ fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))) ...@@ -247,6 +259,17 @@ fiber_schedule(ev_watcher *watcher, int event __attribute__((unused)))
fiber_call(watcher->data); fiber_call(watcher->data);
} }
static void
fiber_ready_async(void)
{
while(!rlist_empty(&ready_fibers)) {
struct fiber *f =
rlist_first_entry(&ready_fibers, struct fiber, ready);
rlist_del_entry(f, ready);
fiber_call(f);
}
}
struct fiber * struct fiber *
fiber_find(int fid) fiber_find(int fid)
{ {
...@@ -373,15 +396,15 @@ fiber_create(const char *name, void (*f) (va_list)) ...@@ -373,15 +396,15 @@ fiber_create(const char *name, void (*f) (va_list))
fiber->gc_pool = palloc_create_pool(""); fiber->gc_pool = palloc_create_pool("");
fiber_alloc(fiber); fiber_alloc(fiber);
ev_async_init(&fiber->async, (void *)fiber_schedule);
ev_async_start(&fiber->async);
ev_init(&fiber->timer, (void *)fiber_schedule); ev_init(&fiber->timer, (void *)fiber_schedule);
ev_init(&fiber->cw, (void *)fiber_schedule); ev_init(&fiber->cw, (void *)fiber_schedule);
fiber->async.data = fiber->timer.data = fiber->cw.data = fiber; fiber->timer.data = fiber->cw.data = fiber;
SLIST_INSERT_HEAD(&fibers, fiber, link); SLIST_INSERT_HEAD(&fibers, fiber, link);
rlist_init(&fiber->ready);
} }
fiber->f = f; fiber->f = f;
while (++last_used_fid <= 100) ; /* fids from 0 to 100 are reserved */ while (++last_used_fid <= 100) ; /* fids from 0 to 100 are reserved */
fiber->fid = last_used_fid; fiber->fid = last_used_fid;
...@@ -407,7 +430,7 @@ fiber_destroy(struct fiber *f) ...@@ -407,7 +430,7 @@ fiber_destroy(struct fiber *f)
if (strcmp(f->name, "sched") == 0) if (strcmp(f->name, "sched") == 0)
return; return;
ev_async_stop(&f->async); rlist_del(&f->ready);
palloc_destroy_pool(f->gc_pool); palloc_destroy_pool(f->gc_pool);
tarantool_coro_destroy(&f->coro); tarantool_coro_destroy(&f->coro);
} }
...@@ -460,11 +483,15 @@ fiber_init(void) ...@@ -460,11 +483,15 @@ fiber_init(void)
last_used_fid = 100; last_used_fid = 100;
iobuf_init_readahead(cfg.readahead); iobuf_init_readahead(cfg.readahead);
ev_async_init(&ready_async, (void *)fiber_ready_async);
ev_async_start(&ready_async);
} }
void void
fiber_free(void) fiber_free(void)
{ {
ev_async_stop(&ready_async);
/* Only clean up if initialized. */ /* Only clean up if initialized. */
if (fibers_registry) { if (fibers_registry) {
fiber_destroy_all(); fiber_destroy_all();
......
...@@ -30,38 +30,38 @@ ...@@ -30,38 +30,38 @@
#include "ifc.h" #include "ifc.h"
#include "fiber.h" #include "fiber.h"
#include <stdlib.h> #include <stdlib.h>
#include <rlist.h>
struct ifc_semaphore {
struct fiber_semaphore {
int count; int count;
STAILQ_HEAD(, fiber) fibers, wakeup; struct rlist fibers, wakeup;
}; };
struct fiber_semaphore * struct ifc_semaphore *
fiber_semaphore_alloc(void) ifc_semaphore_alloc(void)
{ {
return malloc(sizeof(struct fiber_semaphore)); return malloc(sizeof(struct ifc_semaphore));
} }
int int
fiber_semaphore_counter(struct fiber_semaphore *s) ifc_semaphore_counter(struct ifc_semaphore *s)
{ {
return s->count; return s->count;
} }
void void
fiber_semaphore_init(struct fiber_semaphore *s, int cnt) ifc_semaphore_init(struct ifc_semaphore *s, int cnt)
{ {
s->count = cnt; s->count = cnt;
STAILQ_INIT(&s->fibers); rlist_init(&s->fibers);
STAILQ_INIT(&s->wakeup); rlist_init(&s->wakeup);
} }
int int
fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) ifc_semaphore_down_timeout(struct ifc_semaphore *s, ev_tstamp timeout)
{ {
int count = --s->count; int count = --s->count;
...@@ -71,7 +71,7 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) ...@@ -71,7 +71,7 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout)
if (timeout < 0) if (timeout < 0)
timeout = 0; timeout = 0;
STAILQ_INSERT_TAIL(&s->fibers, fiber, ifc); rlist_add_tail_entry(&s->fibers, fiber, ifc);
bool cancellable = fiber_setcancellable(true); bool cancellable = fiber_setcancellable(true);
...@@ -89,105 +89,102 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout) ...@@ -89,105 +89,102 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout)
int timeouted = ETIMEDOUT; int timeouted = ETIMEDOUT;
struct fiber *f; struct fiber *f;
STAILQ_FOREACH(f, &s->wakeup, ifc) { rlist_foreach_entry(f, &s->wakeup, ifc) {
if (f != fiber) if (f != fiber)
continue; continue;
STAILQ_REMOVE(&s->wakeup, fiber, fiber, ifc);
timeouted = 0; timeouted = 0;
break; break;
} }
if (timeouted) { if (timeouted)
STAILQ_REMOVE(&s->fibers, fiber, fiber, ifc);
s->count++; s->count++;
}
rlist_del_entry(fiber, ifc);
fiber_testcancel(); fiber_testcancel();
return timeouted; return timeouted;
} }
void void
fiber_semaphore_down(struct fiber_semaphore *s) ifc_semaphore_down(struct ifc_semaphore *s)
{ {
fiber_semaphore_down_timeout(s, 0); ifc_semaphore_down_timeout(s, 0);
} }
void void
fiber_semaphore_up(struct fiber_semaphore *s) ifc_semaphore_up(struct ifc_semaphore *s)
{ {
s->count++; s->count++;
if (STAILQ_EMPTY(&s->fibers)) if (rlist_empty(&s->fibers))
return; return;
/* wake up one fiber */ /* wake up one fiber */
struct fiber *f = STAILQ_FIRST(&s->fibers); struct fiber *f = rlist_first_entry(&s->fibers, struct fiber, ifc);
STAILQ_REMOVE(&s->fibers, f, fiber, ifc); rlist_del_entry(f, ifc);
STAILQ_INSERT_TAIL(&s->wakeup, f, ifc); rlist_add_tail_entry(&s->wakeup, f, ifc);
fiber_wakeup(f); fiber_wakeup(f);
} }
int int
fiber_semaphore_trydown(struct fiber_semaphore *s) ifc_semaphore_trydown(struct ifc_semaphore *s)
{ {
if (s->count <= 0) if (s->count <= 0)
return s->count - 1; return s->count - 1;
fiber_semaphore_down(s); ifc_semaphore_down(s);
return 0; return 0;
} }
struct fiber_mutex { struct ifc_mutex {
struct fiber_semaphore semaphore; struct ifc_semaphore semaphore;
}; };
struct fiber_mutex * struct ifc_mutex *
fiber_mutex_alloc(void) ifc_mutex_alloc(void)
{ {
return malloc(sizeof(struct fiber_mutex)); return malloc(sizeof(struct ifc_mutex));
} }
void void
fiber_mutex_init(struct fiber_mutex *m) ifc_mutex_init(struct ifc_mutex *m)
{ {
fiber_semaphore_init(&m->semaphore, 1); ifc_semaphore_init(&m->semaphore, 1);
} }
void void
fiber_mutex_lock(struct fiber_mutex *m) ifc_mutex_lock(struct ifc_mutex *m)
{ {
fiber_semaphore_down(&m->semaphore); ifc_semaphore_down(&m->semaphore);
} }
int int
fiber_mutex_lock_timeout(struct fiber_mutex *m, ev_tstamp timeout) ifc_mutex_lock_timeout(struct ifc_mutex *m, ev_tstamp timeout)
{ {
return fiber_semaphore_down_timeout(&m->semaphore, timeout); return ifc_semaphore_down_timeout(&m->semaphore, timeout);
} }
void void
fiber_mutex_unlock(struct fiber_mutex *m) ifc_mutex_unlock(struct ifc_mutex *m)
{ {
fiber_semaphore_up(&m->semaphore); ifc_semaphore_up(&m->semaphore);
} }
int int
fiber_mutex_trylock(struct fiber_mutex *m) ifc_mutex_trylock(struct ifc_mutex *m)
{ {
return fiber_semaphore_trydown(&m->semaphore); return ifc_semaphore_trydown(&m->semaphore);
} }
int int
fiber_mutex_islocked(struct fiber_mutex *m) ifc_mutex_islocked(struct ifc_mutex *m)
{ {
return m->semaphore.count <= 0; return m->semaphore.count <= 0;
} }
/**********************************************************************/ /**********************************************************************/
struct fiber_channel { struct ifc_channel {
STAILQ_HEAD(, fiber) readers, writers, wakeup; struct rlist readers, writers, wakeup;
unsigned size; unsigned size;
unsigned beg; unsigned beg;
unsigned count; unsigned count;
...@@ -199,50 +196,50 @@ struct fiber_channel { ...@@ -199,50 +196,50 @@ struct fiber_channel {
} __attribute__((packed)); } __attribute__((packed));
int int
fiber_channel_isempty(struct fiber_channel *ch) ifc_channel_isempty(struct ifc_channel *ch)
{ {
return ch->count == 0; return ch->count == 0;
} }
int int
fiber_channel_isfull(struct fiber_channel *ch) ifc_channel_isfull(struct ifc_channel *ch)
{ {
return ch->count >= ch->size; return ch->count >= ch->size;
} }
struct fiber_channel * struct ifc_channel *
fiber_channel_alloc(unsigned size) ifc_channel_alloc(unsigned size)
{ {
if (!size) if (!size)
size = 1; size = 1;
struct fiber_channel *res = struct ifc_channel *res =
malloc(sizeof(struct fiber_channel) + sizeof(void *) * size); malloc(sizeof(struct ifc_channel) + sizeof(void *) * size);
if (res) if (res)
res->size = size; res->size = size;
return res; return res;
} }
void void
fiber_channel_init(struct fiber_channel *ch) ifc_channel_init(struct ifc_channel *ch)
{ {
ch->beg = ch->count = 0; ch->beg = ch->count = 0;
ch->bcast = NULL; ch->bcast = NULL;
STAILQ_INIT(&ch->readers); rlist_init(&ch->readers);
STAILQ_INIT(&ch->writers); rlist_init(&ch->writers);
STAILQ_INIT(&ch->wakeup); rlist_init(&ch->wakeup);
} }
void * void *
fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) ifc_channel_get_timeout(struct ifc_channel *ch, ev_tstamp timeout)
{ {
if (timeout < 0) if (timeout < 0)
timeout = 0; timeout = 0;
/* channel is empty */ /* channel is empty */
if (!ch->count) { if (!ch->count) {
STAILQ_INSERT_TAIL(&ch->readers, fiber, ifc); rlist_add_tail_entry(&ch->readers, fiber, ifc);
bool cancellable = fiber_setcancellable(true); bool cancellable = fiber_setcancellable(true);
if (timeout) { if (timeout) {
...@@ -255,18 +252,7 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) ...@@ -255,18 +252,7 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
} }
int timeouted = ETIMEDOUT; rlist_del_entry(fiber, ifc);
struct fiber *f;
STAILQ_FOREACH(f, &ch->wakeup, ifc) {
if (f != fiber)
continue;
STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc);
timeouted = 0;
break;
}
if (timeouted)
STAILQ_REMOVE(&ch->readers, fiber, fiber, ifc);
fiber_testcancel(); fiber_testcancel();
fiber_setcancellable(cancellable); fiber_setcancellable(cancellable);
...@@ -287,10 +273,11 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) ...@@ -287,10 +273,11 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
ch->beg -= ch->size; ch->beg -= ch->size;
ch->count--; ch->count--;
if (!STAILQ_EMPTY(&ch->writers)) { if (!rlist_empty(&ch->writers)) {
struct fiber *f = STAILQ_FIRST(&ch->writers); struct fiber *f =
STAILQ_REMOVE_HEAD(&ch->writers, ifc); rlist_first_entry(&ch->writers, struct fiber, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); rlist_del_entry(f, ifc);
rlist_add_tail_entry(&ch->wakeup, f, ifc);
fiber_wakeup(f); fiber_wakeup(f);
} }
...@@ -298,23 +285,22 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout) ...@@ -298,23 +285,22 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
} }
void * void *
fiber_channel_get(struct fiber_channel *ch) ifc_channel_get(struct ifc_channel *ch)
{ {
return fiber_channel_get_timeout(ch, 0); return ifc_channel_get_timeout(ch, 0);
} }
int int
fiber_channel_put_timeout(struct fiber_channel *ch, void *data, ifc_channel_put_timeout(struct ifc_channel *ch, void *data,
ev_tstamp timeout) ev_tstamp timeout)
{ {
say_info("==== %s(%lu)", __func__, (unsigned long)data);
if (timeout < 0) if (timeout < 0)
timeout = 0; timeout = 0;
/* channel is full */ /* channel is full */
if (ch->count >= ch->size) { if (ch->count >= ch->size) {
STAILQ_INSERT_TAIL(&ch->writers, fiber, ifc); rlist_add_tail_entry(&ch->writers, fiber, ifc);
bool cancellable = fiber_setcancellable(true); bool cancellable = fiber_setcancellable(true);
if (timeout) { if (timeout) {
...@@ -326,24 +312,15 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data, ...@@ -326,24 +312,15 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
fiber_yield(); fiber_yield();
} }
int timeouted = ETIMEDOUT; rlist_del_entry(fiber, ifc);
struct fiber *f;
STAILQ_FOREACH(f, &ch->wakeup, ifc) {
if (f != fiber)
continue;
STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc);
timeouted = 0;
break;
}
if (timeouted)
STAILQ_REMOVE(&ch->writers, fiber, fiber, ifc);
fiber_testcancel(); fiber_testcancel();
fiber_setcancellable(cancellable); fiber_setcancellable(cancellable);
if (timeouted)
return timeouted;
} }
if (ch->count >= ch->size)
return ETIMEDOUT;
unsigned i = ch->beg; unsigned i = ch->beg;
i += ch->count; i += ch->count;
ch->count++; ch->count++;
...@@ -351,44 +328,60 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data, ...@@ -351,44 +328,60 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
i -= ch->size; i -= ch->size;
ch->item[i] = data; ch->item[i] = data;
if (!STAILQ_EMPTY(&ch->readers)) { if (!rlist_empty(&ch->readers)) {
struct fiber *f = STAILQ_FIRST(&ch->readers); struct fiber *f =
STAILQ_REMOVE_HEAD(&ch->readers, ifc); rlist_first_entry(&ch->readers, struct fiber, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); rlist_del_entry(f, ifc);
rlist_add_tail_entry(&ch->wakeup, f, ifc);
fiber_wakeup(f); fiber_wakeup(f);
} }
return 0; return 0;
} }
void void
fiber_channel_put(struct fiber_channel *ch, void *data) ifc_channel_put(struct ifc_channel *ch, void *data)
{ {
fiber_channel_put_timeout(ch, data, 0); ifc_channel_put_timeout(ch, data, 0);
} }
int int
fiber_channel_broadcast(struct fiber_channel *ch, void *data) ifc_channel_has_readers(struct ifc_channel *ch)
{ {
if (STAILQ_EMPTY(&ch->readers)) return !rlist_empty(&ch->readers);
return 0; }
int
ifc_channel_has_writers(struct ifc_channel *ch)
{
return !rlist_empty(&ch->writers);
}
int
ifc_channel_broadcast(struct ifc_channel *ch, void *data)
{
if (rlist_empty(&ch->readers)) {
ifc_channel_put(ch, data);
return 1;
}
struct fiber *f; struct fiber *f;
int count = 0; int count = 0;
STAILQ_FOREACH(f, &ch->readers, ifc) { rlist_foreach_entry(f, &ch->readers, ifc) {
count++; count++;
} }
for (int i = 0; i < count && !STAILQ_EMPTY(&ch->readers); i++) { for (int i = 0; i < count && !rlist_empty(&ch->readers); i++) {
struct fiber *f = STAILQ_FIRST(&ch->readers); struct fiber *f =
STAILQ_REMOVE_HEAD(&ch->readers, ifc); rlist_first_entry(&ch->readers, struct fiber, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc); rlist_del_entry(f, ifc);
rlist_add_tail_entry(&ch->wakeup, f, ifc);
ch->bcast = fiber; ch->bcast = fiber;
ch->bcast_msg = data; ch->bcast_msg = data;
fiber_wakeup(f); fiber_wakeup(f);
fiber_yield(); fiber_yield();
ch->bcast = NULL; ch->bcast = NULL;
fiber_testcancel(); fiber_testcancel();
if (STAILQ_EMPTY(&ch->readers)) { if (rlist_empty(&ch->readers)) {
count = i; count = i;
break; break;
} }
......
...@@ -44,16 +44,16 @@ static const char channel_lib[] = "box.ifc.channel"; ...@@ -44,16 +44,16 @@ static const char channel_lib[] = "box.ifc.channel";
/******************** semaphore ***************************/ /******************** semaphore ***************************/
static int static int
lbox_fiber_semaphore(struct lua_State *L) lbox_ifc_semaphore(struct lua_State *L)
{ {
say_info(":%s()", __func__); say_info(":%s()", __func__);
if (lua_gettop(L) != 1 || !lua_isnumber(L, 1)) if (lua_gettop(L) != 1 || !lua_isnumber(L, 1))
luaL_error(L, "fiber.semaphore(count): bad arguments"); luaL_error(L, "fiber.semaphore(count): bad arguments");
struct fiber_semaphore *sm = fiber_semaphore_alloc(); struct ifc_semaphore *sm = ifc_semaphore_alloc();
if (!sm) if (!sm)
luaL_error(L, "fiber.semaphore: Not enough memory"); luaL_error(L, "fiber.semaphore: Not enough memory");
fiber_semaphore_init(sm, lua_tointeger(L, -1)); ifc_semaphore_init(sm, lua_tointeger(L, -1));
...@@ -64,30 +64,30 @@ lbox_fiber_semaphore(struct lua_State *L) ...@@ -64,30 +64,30 @@ lbox_fiber_semaphore(struct lua_State *L)
return 1; return 1;
} }
static inline struct fiber_semaphore * static inline struct ifc_semaphore *
lbox_check_semaphore(struct lua_State *L, int narg) lbox_check_semaphore(struct lua_State *L, int narg)
{ {
return * (void **) luaL_checkudata(L, narg, semaphore_lib); return * (void **) luaL_checkudata(L, narg, semaphore_lib);
} }
static int static int
lbox_fiber_semaphore_up(struct lua_State *L) lbox_ifc_semaphore_up(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: semaphore:up()"); luaL_error(L, "usage: semaphore:up()");
struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); struct ifc_semaphore *sm = lbox_check_semaphore(L, -1);
fiber_semaphore_up(sm); ifc_semaphore_up(sm);
lua_pushnumber(L, fiber_semaphore_counter(sm)); lua_pushnumber(L, ifc_semaphore_counter(sm));
return 1; return 1;
} }
static int static int
lbox_fiber_semaphore_down(struct lua_State *L) lbox_ifc_semaphore_down(struct lua_State *L)
{ {
lua_Number timeout; lua_Number timeout;
struct fiber_semaphore *sm; struct ifc_semaphore *sm;
int top = lua_gettop(L); int top = lua_gettop(L);
...@@ -105,7 +105,7 @@ lbox_fiber_semaphore_down(struct lua_State *L) ...@@ -105,7 +105,7 @@ lbox_fiber_semaphore_down(struct lua_State *L)
sm = lbox_check_semaphore(L, -top); sm = lbox_check_semaphore(L, -top);
if (fiber_semaphore_down_timeout(sm, timeout) == 0) if (ifc_semaphore_down_timeout(sm, timeout) == 0)
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -113,38 +113,38 @@ lbox_fiber_semaphore_down(struct lua_State *L) ...@@ -113,38 +113,38 @@ lbox_fiber_semaphore_down(struct lua_State *L)
} }
static int static int
lbox_fiber_semaphore_counter(struct lua_State *L) lbox_ifc_semaphore_counter(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: semaphore()"); luaL_error(L, "usage: semaphore()");
struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); struct ifc_semaphore *sm = lbox_check_semaphore(L, -1);
lua_pushnumber(L, fiber_semaphore_counter(sm)); lua_pushnumber(L, ifc_semaphore_counter(sm));
return 1; return 1;
} }
static int static int
lbox_fiber_semaphore_gc(struct lua_State *L) lbox_ifc_semaphore_gc(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
return 0; return 0;
struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); struct ifc_semaphore *sm = lbox_check_semaphore(L, -1);
free(sm); free(sm);
return 0; return 0;
} }
static int static int
lbox_fiber_trydown(struct lua_State *L) lbox_ifc_trydown(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: semaphore:trydown()"); luaL_error(L, "usage: semaphore:trydown()");
struct fiber_semaphore *sm = lbox_check_semaphore(L, -1); struct ifc_semaphore *sm = lbox_check_semaphore(L, -1);
if (fiber_semaphore_trydown(sm) == 0) if (ifc_semaphore_trydown(sm) == 0)
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -154,13 +154,13 @@ lbox_fiber_trydown(struct lua_State *L) ...@@ -154,13 +154,13 @@ lbox_fiber_trydown(struct lua_State *L)
/******************** mutex ***************************/ /******************** mutex ***************************/
static int static int
lbox_fiber_mutex(struct lua_State *L) lbox_ifc_mutex(struct lua_State *L)
{ {
say_info(":%s()", __func__); say_info(":%s()", __func__);
struct fiber_mutex *m = fiber_mutex_alloc(); struct ifc_mutex *m = ifc_mutex_alloc();
if (!m) if (!m)
luaL_error(L, "fiber.mutex: Not enough memory"); luaL_error(L, "fiber.mutex: Not enough memory");
fiber_mutex_init(m); ifc_mutex_init(m);
void **ptr = lua_newuserdata(L, sizeof(void *)); void **ptr = lua_newuserdata(L, sizeof(void *));
luaL_getmetatable(L, mutex_lib); luaL_getmetatable(L, mutex_lib);
...@@ -169,7 +169,7 @@ lbox_fiber_mutex(struct lua_State *L) ...@@ -169,7 +169,7 @@ lbox_fiber_mutex(struct lua_State *L)
return 1; return 1;
} }
static inline struct fiber_mutex * static inline struct ifc_mutex *
lbox_check_mutex(struct lua_State *L, int narg) lbox_check_mutex(struct lua_State *L, int narg)
{ {
return * (void **) luaL_checkudata(L, narg, mutex_lib); return * (void **) luaL_checkudata(L, narg, mutex_lib);
...@@ -177,34 +177,34 @@ lbox_check_mutex(struct lua_State *L, int narg) ...@@ -177,34 +177,34 @@ lbox_check_mutex(struct lua_State *L, int narg)
static int static int
lbox_fiber_mutex_gc(struct lua_State *L) lbox_ifc_mutex_gc(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
return 0; return 0;
struct fiber_mutex *m = lbox_check_mutex(L, -1); struct ifc_mutex *m = lbox_check_mutex(L, -1);
free(m); free(m);
return 0; return 0;
} }
static int static int
lbox_fiber_mutex_locked(struct lua_State *L) lbox_ifc_mutex_locked(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: mutex()"); luaL_error(L, "usage: mutex()");
struct fiber_mutex *m = lbox_check_mutex(L, -1); struct ifc_mutex *m = lbox_check_mutex(L, -1);
lua_pushboolean(L, fiber_mutex_islocked(m)); lua_pushboolean(L, ifc_mutex_islocked(m));
return 1; return 1;
} }
static int static int
lbox_fiber_mutex_lock(struct lua_State *L) lbox_ifc_mutex_lock(struct lua_State *L)
{ {
int top = lua_gettop(L); int top = lua_gettop(L);
if (top < 1 || top > 2 || !lua_isuserdata(L, -top)) if (top < 1 || top > 2 || !lua_isuserdata(L, -top))
luaL_error(L, "usage: mutex:lock([timeout])"); luaL_error(L, "usage: mutex:lock([timeout])");
struct fiber_mutex *m = lbox_check_mutex(L, -top); struct ifc_mutex *m = lbox_check_mutex(L, -top);
lua_Number timeout; lua_Number timeout;
if (top == 2) { if (top == 2) {
if (!lua_isnumber(L, -1)) if (!lua_isnumber(L, -1))
...@@ -214,7 +214,7 @@ lbox_fiber_mutex_lock(struct lua_State *L) ...@@ -214,7 +214,7 @@ lbox_fiber_mutex_lock(struct lua_State *L)
timeout = 0; timeout = 0;
} }
if (fiber_mutex_lock_timeout(m, timeout) == 0) if (ifc_mutex_lock_timeout(m, timeout) == 0)
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -223,25 +223,25 @@ lbox_fiber_mutex_lock(struct lua_State *L) ...@@ -223,25 +223,25 @@ lbox_fiber_mutex_lock(struct lua_State *L)
static int static int
lbox_fiber_mutex_unlock(struct lua_State *L) lbox_ifc_mutex_unlock(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: mutex:unlock()"); luaL_error(L, "usage: mutex:unlock()");
struct fiber_mutex *m = lbox_check_mutex(L, -1); struct ifc_mutex *m = lbox_check_mutex(L, -1);
fiber_mutex_unlock(m); ifc_mutex_unlock(m);
return 0; return 0;
} }
static int static int
lbox_fiber_mutex_trylock(struct lua_State *L) lbox_ifc_mutex_trylock(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: mutex:trylock()"); luaL_error(L, "usage: mutex:trylock()");
struct fiber_mutex *m = lbox_check_mutex(L, -1); struct ifc_mutex *m = lbox_check_mutex(L, -1);
lua_pushboolean(L, fiber_mutex_trylock(m) == 0); lua_pushboolean(L, ifc_mutex_trylock(m) == 0);
return 1; return 1;
} }
...@@ -249,7 +249,7 @@ lbox_fiber_mutex_trylock(struct lua_State *L) ...@@ -249,7 +249,7 @@ lbox_fiber_mutex_trylock(struct lua_State *L)
/******************** channel ***************************/ /******************** channel ***************************/
static int static int
lbox_fiber_channel(struct lua_State *L) lbox_ifc_channel(struct lua_State *L)
{ {
say_info(":%s()", __func__); say_info(":%s()", __func__);
...@@ -263,10 +263,10 @@ lbox_fiber_channel(struct lua_State *L) ...@@ -263,10 +263,10 @@ lbox_fiber_channel(struct lua_State *L)
if (size < 0) if (size < 0)
luaL_error(L, "fiber.channel(size): negative size"); luaL_error(L, "fiber.channel(size): negative size");
} }
struct fiber_channel *ch = fiber_channel_alloc(size); struct ifc_channel *ch = ifc_channel_alloc(size);
if (!ch) if (!ch)
luaL_error(L, "fiber.channel: Not enough memory"); luaL_error(L, "fiber.channel: Not enough memory");
fiber_channel_init(ch); ifc_channel_init(ch);
void **ptr = lua_newuserdata(L, sizeof(void *)); void **ptr = lua_newuserdata(L, sizeof(void *));
luaL_getmetatable(L, channel_lib); luaL_getmetatable(L, channel_lib);
...@@ -280,7 +280,7 @@ lbox_fiber_channel(struct lua_State *L) ...@@ -280,7 +280,7 @@ lbox_fiber_channel(struct lua_State *L)
return 1; return 1;
} }
static inline struct fiber_channel * static inline struct ifc_channel *
lbox_check_channel(struct lua_State *L, int narg) lbox_check_channel(struct lua_State *L, int narg)
{ {
return * (void **) luaL_checkudata(L, narg, channel_lib); return * (void **) luaL_checkudata(L, narg, channel_lib);
...@@ -288,23 +288,23 @@ lbox_check_channel(struct lua_State *L, int narg) ...@@ -288,23 +288,23 @@ lbox_check_channel(struct lua_State *L, int narg)
static int static int
lbox_fiber_channel_gc(struct lua_State *L) lbox_ifc_channel_gc(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
return 0; return 0;
struct fiber_channel *ch = lbox_check_channel(L, -1); struct ifc_channel *ch = lbox_check_channel(L, -1);
free(ch); free(ch);
return 0; return 0;
} }
static int static int
lbox_fiber_channel_isfull(struct lua_State *L) lbox_ifc_channel_isfull(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: channel:is_full()"); luaL_error(L, "usage: channel:is_full()");
struct fiber_channel *ch = lbox_check_channel(L, -1); struct ifc_channel *ch = lbox_check_channel(L, -1);
if (fiber_channel_isfull(ch)) if (ifc_channel_isfull(ch))
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -312,12 +312,12 @@ lbox_fiber_channel_isfull(struct lua_State *L) ...@@ -312,12 +312,12 @@ lbox_fiber_channel_isfull(struct lua_State *L)
} }
static int static int
lbox_fiber_channel_isempty(struct lua_State *L) lbox_ifc_channel_isempty(struct lua_State *L)
{ {
if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1)) if (lua_gettop(L) != 1 || !lua_isuserdata(L, 1))
luaL_error(L, "usage: channel:is_empty()"); luaL_error(L, "usage: channel:is_empty()");
struct fiber_channel *ch = lbox_check_channel(L, -1); struct ifc_channel *ch = lbox_check_channel(L, -1);
if (fiber_channel_isempty(ch)) if (ifc_channel_isempty(ch))
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -325,11 +325,11 @@ lbox_fiber_channel_isempty(struct lua_State *L) ...@@ -325,11 +325,11 @@ lbox_fiber_channel_isempty(struct lua_State *L)
} }
static int static int
lbox_fiber_channel_put(struct lua_State *L) lbox_ifc_channel_put(struct lua_State *L)
{ {
double timeout; double timeout;
int top = lua_gettop(L); int top = lua_gettop(L);
struct fiber_channel *ch; struct ifc_channel *ch;
switch(top) { switch(top) {
case 2: case 2:
...@@ -370,7 +370,7 @@ lbox_fiber_channel_put(struct lua_State *L) ...@@ -370,7 +370,7 @@ lbox_fiber_channel_put(struct lua_State *L)
lua_settop(L, top); lua_settop(L, top);
if (fiber_channel_put_timeout(ch, (void *)rid, timeout) == 0) if (ifc_channel_put_timeout(ch, (void *)rid, timeout) == 0)
lua_pushboolean(L, 1); lua_pushboolean(L, 1);
else else
lua_pushboolean(L, 0); lua_pushboolean(L, 0);
...@@ -378,7 +378,7 @@ lbox_fiber_channel_put(struct lua_State *L) ...@@ -378,7 +378,7 @@ lbox_fiber_channel_put(struct lua_State *L)
} }
static int static int
lbox_fiber_channel_get(struct lua_State *L) lbox_ifc_channel_get(struct lua_State *L)
{ {
int top = lua_gettop(L); int top = lua_gettop(L);
double timeout; double timeout;
...@@ -396,9 +396,9 @@ lbox_fiber_channel_get(struct lua_State *L) ...@@ -396,9 +396,9 @@ lbox_fiber_channel_get(struct lua_State *L)
timeout = 0; timeout = 0;
} }
struct fiber_channel *ch = lbox_check_channel(L, 1); struct ifc_channel *ch = lbox_check_channel(L, 1);
lua_Integer rid = (lua_Integer)fiber_channel_get_timeout(ch, timeout); lua_Integer rid = (lua_Integer)ifc_channel_get_timeout(ch, timeout);
if (!rid) { if (!rid) {
lua_pushnil(L); lua_pushnil(L);
...@@ -428,15 +428,18 @@ lbox_fiber_channel_get(struct lua_State *L) ...@@ -428,15 +428,18 @@ lbox_fiber_channel_get(struct lua_State *L)
static int static int
lbox_fiber_channel_broadcast(struct lua_State *L) lbox_ifc_channel_broadcast(struct lua_State *L)
{ {
struct fiber_channel *ch; struct ifc_channel *ch;
if (lua_gettop(L) != 2) if (lua_gettop(L) != 2)
luaL_error(L, "usage: channel:broadcast(variable)"); luaL_error(L, "usage: channel:broadcast(variable)");
ch = lbox_check_channel(L, -2); ch = lbox_check_channel(L, -2);
if (!ifc_channel_has_readers(ch))
return lbox_ifc_channel_put(L);
lua_getmetatable(L, -2); /* 3 */ lua_getmetatable(L, -2); /* 3 */
lua_pushstring(L, "broadcast_message"); /* 4 */ lua_pushstring(L, "broadcast_message"); /* 4 */
...@@ -449,7 +452,7 @@ lbox_fiber_channel_broadcast(struct lua_State *L) ...@@ -449,7 +452,7 @@ lbox_fiber_channel_broadcast(struct lua_State *L)
lua_pushvalue(L, 2); lua_pushvalue(L, 2);
lua_settable(L, 3); lua_settable(L, 3);
int count = fiber_channel_broadcast(ch, (void *)1); int count = ifc_channel_broadcast(ch, (void *)1);
lua_settable(L, 3); lua_settable(L, 3);
...@@ -460,16 +463,35 @@ lbox_fiber_channel_broadcast(struct lua_State *L) ...@@ -460,16 +463,35 @@ lbox_fiber_channel_broadcast(struct lua_State *L)
} }
static int
lbox_ifc_channel_has_readers(struct lua_State *L)
{
if (lua_gettop(L) != 1)
luaL_error(L, "usage: channel:has_readers()");
struct ifc_channel *ch = lbox_check_channel(L, -1);
lua_pushboolean(L, ifc_channel_has_readers(ch));
return 1;
}
static int
lbox_ifc_channel_has_writers(struct lua_State *L)
{
if (lua_gettop(L) != 1)
luaL_error(L, "usage: channel:has_writers()");
struct ifc_channel *ch = lbox_check_channel(L, -1);
lua_pushboolean(L, ifc_channel_has_writers(ch));
return 1;
}
void void
ifc_lua_init(struct lua_State *L) ifc_lua_init(struct lua_State *L)
{ {
static const struct luaL_reg semaphore_meta[] = { static const struct luaL_reg semaphore_meta[] = {
{"__call", lbox_fiber_semaphore_counter}, {"__call", lbox_ifc_semaphore_counter},
{"__gc", lbox_fiber_semaphore_gc}, {"__gc", lbox_ifc_semaphore_gc},
{"up", lbox_fiber_semaphore_up}, {"up", lbox_ifc_semaphore_up},
{"down", lbox_fiber_semaphore_down}, {"down", lbox_ifc_semaphore_down},
{"trydown", lbox_fiber_trydown}, {"trydown", lbox_ifc_trydown},
{NULL, NULL} {NULL, NULL}
}; };
...@@ -477,23 +499,25 @@ ifc_lua_init(struct lua_State *L) ...@@ -477,23 +499,25 @@ ifc_lua_init(struct lua_State *L)
static const struct luaL_reg mutex_meta[] = { static const struct luaL_reg mutex_meta[] = {
{"__gc", lbox_fiber_mutex_gc}, {"__gc", lbox_ifc_mutex_gc},
{"__call", lbox_fiber_mutex_locked}, {"__call", lbox_ifc_mutex_locked},
{"locked", lbox_fiber_mutex_locked}, {"locked", lbox_ifc_mutex_locked},
{"lock", lbox_fiber_mutex_lock}, {"lock", lbox_ifc_mutex_lock},
{"unlock", lbox_fiber_mutex_unlock}, {"unlock", lbox_ifc_mutex_unlock},
{"trylock", lbox_fiber_mutex_trylock}, {"trylock", lbox_ifc_mutex_trylock},
{NULL, NULL} {NULL, NULL}
}; };
tarantool_lua_register_type(L, mutex_lib, mutex_meta); tarantool_lua_register_type(L, mutex_lib, mutex_meta);
static const struct luaL_reg channel_meta[] = { static const struct luaL_reg channel_meta[] = {
{"__gc", lbox_fiber_channel_gc}, {"__gc", lbox_ifc_channel_gc},
{"is_full", lbox_fiber_channel_isfull}, {"is_full", lbox_ifc_channel_isfull},
{"is_empty", lbox_fiber_channel_isempty}, {"is_empty", lbox_ifc_channel_isempty},
{"put", lbox_fiber_channel_put}, {"put", lbox_ifc_channel_put},
{"get", lbox_fiber_channel_get}, {"get", lbox_ifc_channel_get},
{"broadcast", lbox_fiber_channel_broadcast}, {"broadcast", lbox_ifc_channel_broadcast},
{"has_readers", lbox_ifc_channel_has_readers},
{"has_writers", lbox_ifc_channel_has_writers},
{NULL, NULL} {NULL, NULL}
}; };
tarantool_lua_register_type(L, channel_lib, channel_meta); tarantool_lua_register_type(L, channel_lib, channel_meta);
...@@ -501,9 +525,9 @@ ifc_lua_init(struct lua_State *L) ...@@ -501,9 +525,9 @@ ifc_lua_init(struct lua_State *L)
static const struct luaL_reg ifc_meta[] = { static const struct luaL_reg ifc_meta[] = {
{"semaphore", lbox_fiber_semaphore}, {"semaphore", lbox_ifc_semaphore},
{"mutex", lbox_fiber_mutex}, {"mutex", lbox_ifc_mutex},
{"channel", lbox_fiber_channel}, {"channel", lbox_ifc_channel},
{NULL, NULL} {NULL, NULL}
}; };
......
...@@ -248,9 +248,25 @@ lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end ...@@ -248,9 +248,25 @@ lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end
lua box.fiber.sleep(.5) lua box.fiber.sleep(.5)
--- ---
... ...
lua ch:has_readers()
---
- true
...
lua ch:has_writers()
---
- false
...
lua box.fiber.cancel(tfbr) lua box.fiber.cancel(tfbr)
--- ---
... ...
lua ch:has_readers()
---
- false
...
lua ch:has_writers()
---
- false
...
lua ch:put(box.info.pid) lua ch:put(box.info.pid)
--- ---
- true - true
...@@ -281,13 +297,21 @@ lua for i, v in pairs(buffer) do print(v) end ...@@ -281,13 +297,21 @@ lua for i, v in pairs(buffer) do print(v) end
9 9
10 10
... ...
lua ch:is_empty()
---
- true
...
lua ch:broadcast() lua ch:broadcast()
--- ---
error: 'usage: channel:broadcast(variable)' error: 'usage: channel:broadcast(variable)'
... ...
lua ch:broadcast(123) lua ch:broadcast(123)
--- ---
- 0 - true
...
lua ch:get()
---
- 123
... ...
lua ch:is_full() lua ch:is_full()
--- ---
......
...@@ -88,15 +88,21 @@ exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true ...@@ -88,15 +88,21 @@ exec admin "lua tfbr = box.fiber.create(function() box.fiber.detach() while true
exec admin "lua box.fiber.resume(tfbr)" exec admin "lua box.fiber.resume(tfbr)"
exec admin "lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end" exec admin "lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end"
exec admin "lua box.fiber.sleep(.5)" exec admin "lua box.fiber.sleep(.5)"
exec admin "lua ch:has_readers()"
exec admin "lua ch:has_writers()"
exec admin "lua box.fiber.cancel(tfbr)" exec admin "lua box.fiber.cancel(tfbr)"
exec admin "lua ch:has_readers()"
exec admin "lua ch:has_writers()"
exec admin "lua ch:put(box.info.pid)" exec admin "lua ch:put(box.info.pid)"
exec admin "lua ch:is_full()" exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()" exec admin "lua ch:is_empty()"
exec admin "lua ch:get(box.info.pid) == box.info.pid" exec admin "lua ch:get(box.info.pid) == box.info.pid"
exec admin "lua for i, v in pairs(buffer) do print(v) end" exec admin "lua for i, v in pairs(buffer) do print(v) end"
exec admin "lua ch:is_empty()"
exec admin "lua ch:broadcast()" exec admin "lua ch:broadcast()"
exec admin "lua ch:broadcast(123)" exec admin "lua ch:broadcast(123)"
exec admin "lua ch:get()"
exec admin "lua ch:is_full()" exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()" exec admin "lua ch:is_empty()"
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment