Skip to content
Snippets Groups Projects
Commit e5ad459d authored by Dmitry E. Oboukhov's avatar Dmitry E. Oboukhov
Browse files

fixed race conditions

parent d9e440f6
No related branches found
No related tags found
No related merge requests found
......@@ -31,17 +31,6 @@
#include "fiber.h"
#include <stdlib.h>
#define STAILQ_REMOVE_SAFE(elm, head, type, field) \
{ \
struct type *it; \
STAILQ_FOREACH(it, (head), field) { \
if (it != elm) \
continue; \
STAILQ_REMOVE((head), elm, type, field); \
break; \
} \
}
struct fiber_semaphore {
......@@ -71,7 +60,6 @@ fiber_semaphore_init(struct fiber_semaphore *s, int cnt)
STAILQ_INIT(&s->wakeup);
}
int
fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout)
{
......@@ -96,29 +84,27 @@ fiber_semaphore_down_timeout(struct fiber_semaphore *s, ev_tstamp timeout)
fiber_yield();
}
if (fiber_is_cancelled()) {
s->count++;
STAILQ_REMOVE_SAFE(fiber, &s->fibers, fiber, ifc);
STAILQ_REMOVE_SAFE(fiber, &s->wakeup, fiber, ifc);
fiber_testcancel();
}
fiber_setcancellable(cancellable);
int timeouted = ETIMEDOUT;
struct fiber *f;
STAILQ_FOREACH(f, &s->fibers, ifc) {
STAILQ_FOREACH(f, &s->wakeup, ifc) {
if (f != fiber)
continue;
STAILQ_REMOVE(&s->wakeup, fiber, fiber, ifc);
timeouted = 0;
break;
}
if (timeouted) {
STAILQ_REMOVE(&s->fibers, fiber, fiber, ifc);
s->count++;
STAILQ_REMOVE(&s->fibers, f, fiber, ifc);
return ETIMEDOUT;
}
STAILQ_REMOVE_SAFE(fiber, &s->wakeup, fiber, ifc);
fiber_testcancel();
return 0;
return timeouted;
}
void
......@@ -130,13 +116,16 @@ fiber_semaphore_down(struct fiber_semaphore *s)
void
fiber_semaphore_up(struct fiber_semaphore *s)
{
++s->count;
if (!STAILQ_EMPTY(&s->fibers)) { /* wake up one fiber */
struct fiber *f = STAILQ_FIRST(&s->fibers);
STAILQ_REMOVE_HEAD(&s->fibers, ifc);
STAILQ_INSERT_TAIL(&s->wakeup, f, ifc);
fiber_wakeup(f);
}
s->count++;
if (STAILQ_EMPTY(&s->fibers))
return;
/* wake up one fiber */
struct fiber *f = STAILQ_FIRST(&s->fibers);
STAILQ_REMOVE(&s->fibers, f, fiber, ifc);
STAILQ_INSERT_TAIL(&s->wakeup, f, ifc);
fiber_wakeup(f);
}
int
......@@ -198,7 +187,7 @@ fiber_mutex_islocked(struct fiber_mutex *m)
/**********************************************************************/
struct fiber_channel {
STAILQ_HEAD(, fiber) readers, writers;
STAILQ_HEAD(, fiber) readers, writers, wakeup;
unsigned size;
unsigned beg;
unsigned count;
......@@ -243,6 +232,7 @@ fiber_channel_init(struct fiber_channel *ch)
STAILQ_INIT(&ch->readers);
STAILQ_INIT(&ch->writers);
STAILQ_INIT(&ch->wakeup);
}
void *
......@@ -264,11 +254,21 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
fiber_yield();
}
if (fiber_is_cancelled() || !ch->count) {
STAILQ_REMOVE_SAFE(fiber, &ch->readers, fiber, ifc);
say_info("==== %s(): testcancel", __func__);
fiber_testcancel();
int timeouted = ETIMEDOUT;
struct fiber *f;
STAILQ_FOREACH(f, &ch->wakeup, ifc) {
if (f != fiber)
continue;
STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc);
timeouted = 0;
break;
}
if (timeouted)
STAILQ_REMOVE(&ch->readers, fiber, fiber, ifc);
fiber_testcancel();
fiber_setcancellable(cancellable);
if (ch->bcast) {
......@@ -290,11 +290,10 @@ fiber_channel_get_timeout(struct fiber_channel *ch, ev_tstamp timeout)
if (!STAILQ_EMPTY(&ch->writers)) {
struct fiber *f = STAILQ_FIRST(&ch->writers);
STAILQ_REMOVE_HEAD(&ch->writers, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc);
fiber_wakeup(f);
}
say_info("==== %s() -> %lu", __func__, (unsigned long)res);
return res;
}
......@@ -327,17 +326,24 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
fiber_yield();
}
if (fiber_is_cancelled() || ch->count >= ch->size) {
STAILQ_REMOVE_SAFE(fiber, &ch->writers, fiber, ifc);
fiber_testcancel();
int timeouted = ETIMEDOUT;
struct fiber *f;
STAILQ_FOREACH(f, &ch->wakeup, ifc) {
if (f != fiber)
continue;
STAILQ_REMOVE(&ch->wakeup, fiber, fiber, ifc);
timeouted = 0;
break;
}
if (timeouted)
STAILQ_REMOVE(&ch->writers, fiber, fiber, ifc);
fiber_testcancel();
fiber_setcancellable(cancellable);
if (timeouted)
return timeouted;
}
if (ch->count >= ch->size)
return ETIMEDOUT;
unsigned i = ch->beg;
i += ch->count;
ch->count++;
......@@ -348,6 +354,7 @@ fiber_channel_put_timeout(struct fiber_channel *ch, void *data,
if (!STAILQ_EMPTY(&ch->readers)) {
struct fiber *f = STAILQ_FIRST(&ch->readers);
STAILQ_REMOVE_HEAD(&ch->readers, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc);
fiber_wakeup(f);
}
return 0;
......@@ -374,6 +381,7 @@ fiber_channel_broadcast(struct fiber_channel *ch, void *data)
for (int i = 0; i < count && !STAILQ_EMPTY(&ch->readers); i++) {
struct fiber *f = STAILQ_FIRST(&ch->readers);
STAILQ_REMOVE_HEAD(&ch->readers, ifc);
STAILQ_INSERT_TAIL(&ch->wakeup, f, ifc);
ch->bcast = fiber;
ch->bcast_msg = data;
fiber_wakeup(f);
......
......@@ -255,6 +255,14 @@ lua ch:put(box.info.pid)
---
- true
...
lua ch:is_full()
---
- true
...
lua ch:is_empty()
---
- false
...
lua ch:get(box.info.pid) == box.info.pid
---
- true
......
......@@ -90,6 +90,8 @@ exec admin "lua for i = 1, 10 do ch:put(i) box.fiber.sleep(0.01) end"
exec admin "lua box.fiber.sleep(.5)"
exec admin "lua box.fiber.cancel(tfbr)"
exec admin "lua ch:put(box.info.pid)"
exec admin "lua ch:is_full()"
exec admin "lua ch:is_empty()"
exec admin "lua ch:get(box.info.pid) == box.info.pid"
exec admin "lua for i, v in pairs(buffer) do print(v) end"
......
......@@ -529,6 +529,10 @@ struct { \
QMD_TRACE_ELEM(&(elm)->field); \
} while (0)
#define TAILQ_REMOVE_HEAD(head, field) do { \
TAILQ_REMOVE(head, TAILQ_FIRST(head), field); \
} while (0)
#ifdef _KERNEL
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment