diff --git a/src/ipc.m b/src/ipc.m index 27d507c5da4fb061d3b09535bdd4d7f8ae2d7f3b..889f57926e2f42f002c57b8e2e8adaf43eee7a87 100644 --- a/src/ipc.m +++ b/src/ipc.m @@ -35,6 +35,8 @@ const ev_tstamp IPC_TIMEOUT_INFINITY = 365*86400*100.0; struct ipc_channel { struct rlist readers, writers; + unsigned creaders; + unsigned cwriters; unsigned size; unsigned beg; unsigned count; @@ -73,6 +75,8 @@ void ipc_channel_init(struct ipc_channel *ch) { ch->beg = ch->count = 0; + ch->creaders = 0; + ch->cwriters = 0; rlist_init(&ch->readers); rlist_init(&ch->writers); } @@ -96,12 +100,13 @@ void * ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) { /* channel is empty */ - if (!ch->count) { + if (ch->count == 0 || ch->creaders >= ch->count) { rlist_add_tail_entry(&ch->readers, fiber, state); + ch->creaders++; bool cancellable = fiber_setcancellable(true); - bool timed_out = fiber_yield_timeout(timeout); rlist_del_entry(fiber, state); + ch->creaders--; fiber_testcancel(); fiber_setcancellable(cancellable); @@ -115,6 +120,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) } } + assert(ch->count > 0); void *res = ch->item[ch->beg]; if (++ch->beg >= ch->size) ch->beg -= ch->size; @@ -127,6 +133,7 @@ ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout) fiber_wakeup(f); } + return res; } @@ -141,13 +148,15 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data, ev_tstamp timeout) { /* channel is full */ - if (ch->count >= ch->size) { + if (ch->count >= ch->size || ch->cwriters >= ch->size - ch->count) { rlist_add_tail_entry(&ch->writers, fiber, state); + ch->cwriters++; bool cancellable = fiber_setcancellable(true); bool timed_out = fiber_yield_timeout(timeout); rlist_del_entry(fiber, state); + ch->cwriters--; fiber_testcancel(); fiber_setcancellable(cancellable); @@ -156,9 +165,10 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data, errno = ETIMEDOUT; return -1; } - } + assert(ch->count < ch->size); + unsigned i = ch->beg; i += ch->count; ch->count++;