diff --git a/src/ipc.m b/src/ipc.m index 9ece7799591f4f89e8cc45227ef1881c8ddee9f5..aed588e3528bcec1d00dea3e5c477cbaf2eef7f6 100644 --- a/src/ipc.m +++ b/src/ipc.m @@ -34,15 +34,11 @@ const ev_tstamp IPC_TIMEOUT_INFINITY = 365*86400*100.0; struct ipc_channel { - struct rlist readers, writers; - unsigned creaders; - unsigned cwriters; + struct rlist readers, writers, bcast; unsigned size; unsigned beg; unsigned count; - void *bcast_msg; - void *item[0]; }; @@ -75,8 +71,7 @@ void ipc_channel_init(struct ipc_channel *ch) { ch->beg = ch->count = 0; - ch->creaders = 0; - ch->cwriters = 0; + rlist_init(&ch->bcast); rlist_init(&ch->readers); rlist_init(&ch->writers); } @@ -99,36 +94,48 @@ ipc_channel_cleanup(struct ipc_channel *ch) void * ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) { + struct fiber *f; + bool first_try = true; + ev_tstamp started = ev_now(); /* channel is empty */ - if (ch->count == 0 || ch->creaders >= ch->count) { - rlist_add_tail_entry(&ch->readers, fiber, state); - ch->creaders++; + while (ch->count == 0) { + + /* try to be in FIFO order */ + if (first_try) { + rlist_add_tail_entry(&ch->readers, fiber, state); + first_try = false; + } else { + rlist_add_entry(&ch->readers, fiber, state); + } bool cancellable = fiber_setcancellable(true); - bool timed_out = fiber_yield_timeout(timeout); + fiber_yield_timeout(timeout); rlist_del_entry(fiber, state); - ch->creaders--; + + /* broadcast messsage wakes us up */ + if (!rlist_empty(&ch->bcast)) { + f = rlist_first_entry(&ch->bcast, struct fiber, state); + rlist_del_entry(f, state); + fiber_wakeup(f); + fiber_testcancel(); + fiber_setcancellable(cancellable); + return ch->bcast_msg; + } fiber_testcancel(); fiber_setcancellable(cancellable); - if (timed_out) + timeout -= ev_now() - started; + if (timeout <= 0) return NULL; - - if (fiber->waiter) { - fiber_wakeup(fiber->waiter); - return ch->bcast_msg; - } } - assert(ch->count > 0); void *res = ch->item[ch->beg]; if (++ch->beg >= ch->size) ch->beg -= ch->size; ch->count--; if (!rlist_empty(&ch->writers)) { - struct fiber *f = - rlist_first_entry(&ch->writers, struct fiber, state); + f = rlist_first_entry(&ch->writers, struct fiber, state); rlist_del_entry(f, state); fiber_wakeup(f); } @@ -147,28 +154,33 @@ int ipc_channel_put_timeout(struct ipc_channel *ch, void *data, ev_tstamp timeout) { + bool first_try = true; + ev_tstamp started = ev_now(); /* channel is full */ - if (ch->count >= ch->size || ch->cwriters >= ch->size - ch->count) { - - rlist_add_tail_entry(&ch->writers, fiber, state); - ch->cwriters++; + while (ch->count >= ch->size) { + + /* try to be in FIFO order */ + if (first_try) { + rlist_add_tail_entry(&ch->writers, fiber, state); + first_try = false; + } else { + rlist_add_entry(&ch->writers, fiber, state); + } bool cancellable = fiber_setcancellable(true); - bool timed_out = fiber_yield_timeout(timeout); + fiber_yield_timeout(timeout); rlist_del_entry(fiber, state); - ch->cwriters--; fiber_testcancel(); fiber_setcancellable(cancellable); - if (timed_out) { + timeout -= ev_now() - started; + if (timeout <= 0) { errno = ETIMEDOUT; return -1; } } - assert(ch->count < ch->size); - unsigned i = ch->beg; i += ch->count; ch->count++; @@ -177,8 +189,8 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data, ch->item[i] = data; if (!rlist_empty(&ch->readers)) { - struct fiber *f = - rlist_first_entry(&ch->readers, struct fiber, state); + struct fiber *f; + f = rlist_first_entry(&ch->readers, struct fiber, state); rlist_del_entry(f, state); fiber_wakeup(f); } @@ -194,45 +206,50 @@ ipc_channel_put(struct ipc_channel *ch, void *data) bool ipc_channel_has_readers(struct ipc_channel *ch) { - return ch->creaders > 0; + return !rlist_empty(&ch->readers); } bool ipc_channel_has_writers(struct ipc_channel *ch) { - return ch->cwriters > 0; + return !rlist_empty(&ch->writers); } int ipc_channel_broadcast(struct ipc_channel *ch, void *data) { + /* broadcast in broadcast: marasmus */ + if (!rlist_empty(&ch->bcast)) + return 0; + + /* there is no reader on channel */ if (rlist_empty(&ch->readers)) { ipc_channel_put(ch, data); return 1; } + unsigned readers = 0; struct fiber *f; - int count = 0; rlist_foreach_entry(f, &ch->readers, state) { - count++; + readers++; } - for (int i = 0; i < count && !rlist_empty(&ch->readers); i++) { - struct fiber *f = - rlist_first_entry(&ch->readers, struct fiber, state); - rlist_del_entry(f, state); - assert(f->waiter == NULL); - f->waiter = fiber; + unsigned cnt = 0; + while(!rlist_empty(&ch->readers)) { + f = rlist_first_entry(&ch->readers, struct fiber, state); + ch->bcast_msg = data; + rlist_add_tail_entry(&ch->bcast, fiber, state); fiber_wakeup(f); + bool cancellable = fiber_setcancellable(true); fiber_yield(); - f->waiter = NULL; + rlist_del_entry(fiber, state); fiber_testcancel(); - if (rlist_empty(&ch->readers)) { - count = i; + fiber_setcancellable(cancellable); + /* if any other reader was added don't wake it up */ + if (++cnt >= readers) break; - } } - return count; + return cnt; }