Skip to content
Snippets Groups Projects
Commit c6eddf18 authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

Merge branch 'ipc-as-kostya-said'

parents 28dc7d57 1e57c865
No related branches found
No related tags found
No related merge requests found
......@@ -34,15 +34,11 @@
const ev_tstamp IPC_TIMEOUT_INFINITY = 365*86400*100.0;
struct ipc_channel {
struct rlist readers, writers;
unsigned creaders;
unsigned cwriters;
struct rlist readers, writers, bcast;
unsigned size;
unsigned beg;
unsigned count;
void *bcast_msg;
void *item[0];
};
......@@ -75,8 +71,7 @@ void
ipc_channel_init(struct ipc_channel *ch)
{
ch->beg = ch->count = 0;
ch->creaders = 0;
ch->cwriters = 0;
rlist_init(&ch->bcast);
rlist_init(&ch->readers);
rlist_init(&ch->writers);
}
......@@ -99,36 +94,48 @@ ipc_channel_cleanup(struct ipc_channel *ch)
void *
ipc_channel_get_timeout(struct ipc_channel *ch, ev_tstamp timeout)
{
struct fiber *f;
bool first_try = true;
ev_tstamp started = ev_now();
/* channel is empty */
if (ch->count == 0 || ch->creaders >= ch->count) {
rlist_add_tail_entry(&ch->readers, fiber, state);
ch->creaders++;
while (ch->count == 0) {
/* try to be in FIFO order */
if (first_try) {
rlist_add_tail_entry(&ch->readers, fiber, state);
first_try = false;
} else {
rlist_add_entry(&ch->readers, fiber, state);
}
bool cancellable = fiber_setcancellable(true);
bool timed_out = fiber_yield_timeout(timeout);
fiber_yield_timeout(timeout);
rlist_del_entry(fiber, state);
ch->creaders--;
/* broadcast messsage wakes us up */
if (!rlist_empty(&ch->bcast)) {
f = rlist_first_entry(&ch->bcast, struct fiber, state);
rlist_del_entry(f, state);
fiber_wakeup(f);
fiber_testcancel();
fiber_setcancellable(cancellable);
return ch->bcast_msg;
}
fiber_testcancel();
fiber_setcancellable(cancellable);
if (timed_out)
timeout -= ev_now() - started;
if (timeout <= 0)
return NULL;
if (fiber->waiter) {
fiber_wakeup(fiber->waiter);
return ch->bcast_msg;
}
}
assert(ch->count > 0);
void *res = ch->item[ch->beg];
if (++ch->beg >= ch->size)
ch->beg -= ch->size;
ch->count--;
if (!rlist_empty(&ch->writers)) {
struct fiber *f =
rlist_first_entry(&ch->writers, struct fiber, state);
f = rlist_first_entry(&ch->writers, struct fiber, state);
rlist_del_entry(f, state);
fiber_wakeup(f);
}
......@@ -147,28 +154,33 @@ int
ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
ev_tstamp timeout)
{
bool first_try = true;
ev_tstamp started = ev_now();
/* channel is full */
if (ch->count >= ch->size || ch->cwriters >= ch->size - ch->count) {
rlist_add_tail_entry(&ch->writers, fiber, state);
ch->cwriters++;
while (ch->count >= ch->size) {
/* try to be in FIFO order */
if (first_try) {
rlist_add_tail_entry(&ch->writers, fiber, state);
first_try = false;
} else {
rlist_add_entry(&ch->writers, fiber, state);
}
bool cancellable = fiber_setcancellable(true);
bool timed_out = fiber_yield_timeout(timeout);
fiber_yield_timeout(timeout);
rlist_del_entry(fiber, state);
ch->cwriters--;
fiber_testcancel();
fiber_setcancellable(cancellable);
if (timed_out) {
timeout -= ev_now() - started;
if (timeout <= 0) {
errno = ETIMEDOUT;
return -1;
}
}
assert(ch->count < ch->size);
unsigned i = ch->beg;
i += ch->count;
ch->count++;
......@@ -177,8 +189,8 @@ ipc_channel_put_timeout(struct ipc_channel *ch, void *data,
ch->item[i] = data;
if (!rlist_empty(&ch->readers)) {
struct fiber *f =
rlist_first_entry(&ch->readers, struct fiber, state);
struct fiber *f;
f = rlist_first_entry(&ch->readers, struct fiber, state);
rlist_del_entry(f, state);
fiber_wakeup(f);
}
......@@ -194,45 +206,50 @@ ipc_channel_put(struct ipc_channel *ch, void *data)
bool
ipc_channel_has_readers(struct ipc_channel *ch)
{
return ch->creaders > 0;
return !rlist_empty(&ch->readers);
}
bool
ipc_channel_has_writers(struct ipc_channel *ch)
{
return ch->cwriters > 0;
return !rlist_empty(&ch->writers);
}
int
ipc_channel_broadcast(struct ipc_channel *ch, void *data)
{
/* broadcast in broadcast: marasmus */
if (!rlist_empty(&ch->bcast))
return 0;
/* there is no reader on channel */
if (rlist_empty(&ch->readers)) {
ipc_channel_put(ch, data);
return 1;
}
unsigned readers = 0;
struct fiber *f;
int count = 0;
rlist_foreach_entry(f, &ch->readers, state) {
count++;
readers++;
}
for (int i = 0; i < count && !rlist_empty(&ch->readers); i++) {
struct fiber *f =
rlist_first_entry(&ch->readers, struct fiber, state);
rlist_del_entry(f, state);
assert(f->waiter == NULL);
f->waiter = fiber;
unsigned cnt = 0;
while(!rlist_empty(&ch->readers)) {
f = rlist_first_entry(&ch->readers, struct fiber, state);
ch->bcast_msg = data;
rlist_add_tail_entry(&ch->bcast, fiber, state);
fiber_wakeup(f);
bool cancellable = fiber_setcancellable(true);
fiber_yield();
f->waiter = NULL;
rlist_del_entry(fiber, state);
fiber_testcancel();
if (rlist_empty(&ch->readers)) {
count = i;
fiber_setcancellable(cancellable);
/* if any other reader was added don't wake it up */
if (++cnt >= readers)
break;
}
}
return count;
return cnt;
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment