From 4a93f5baceb07caabfdfb803474932c5ab6aa473 Mon Sep 17 00:00:00 2001 From: Nick Zavaritsky <mejedi@gmail.com> Date: Fri, 21 Aug 2015 21:18:30 +0300 Subject: [PATCH] gh-845: coeio-less cord_cojoin --- src/coeio.cc | 24 ----------- src/fiber.cc | 120 ++++++++++++++++++++++++++++++++++++++++++++++++++- src/fiber.h | 3 ++ 3 files changed, 122 insertions(+), 25 deletions(-) diff --git a/src/coeio.cc b/src/coeio.cc index c15cfabf23..627df66c3d 100644 --- a/src/coeio.cc +++ b/src/coeio.cc @@ -347,27 +347,3 @@ coio_getaddrinfo(const char *host, const char *port, return rc; } -static ssize_t -cord_cojoin_cb(va_list ap) -{ - struct cord *cord = va_arg(ap, struct cord *); - void *retval = NULL; - int res = tt_pthread_join(cord->id, &retval); - return res; -} - -int -cord_cojoin(struct cord *cord) -{ - assert(cord() != cord); /* Can't join self. */ - int rc = coio_call(cord_cojoin_cb, cord); - if (rc == 0 && !diag_is_empty(&cord->fiber->diag)) { - diag_move(&cord->fiber->diag, &fiber()->diag); - cord_destroy(cord); - /* re-throw exception in this fiber */ - diag_last_error(&fiber()->diag)->raise(); - } - cord_destroy(cord); - return rc; -} - diff --git a/src/fiber.cc b/src/fiber.cc index 80e3f3a7e0..b6436e8bc1 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -38,6 +38,25 @@ #include "assoc.h" #include "memory.h" #include "trigger.h" +#include <typeinfo> +#include "third_party/pmatomic.h" + +/* + * Defines a handler to be executed on exit from cord's thread func, + * accessible via cord()->on_exit (normally NULL). It is used to + * implement cord_cojoin. + */ +struct cord_on_exit { + void (*callback) (void*); + void *argument; +}; + +/* + * The special value distinct from any valid pointer to cord_on_exit + * structure AND NULL). This value is stored in cord()->on_exit by the + * thread func prior to a thread termination. + */ +#define CORD_ON_EXIT_WONT_RUN ((struct cord_on_exit*)1) static struct cord main_cord; __thread struct cord *cord_ptr = NULL; @@ -548,6 +567,7 @@ void cord_create(struct cord *cord, const char *name) { cord->id = pthread_self(); + cord->on_exit = NULL; cord->loop = cord->id == main_thread_id ? ev_default_loop(EVFLAG_AUTO) : ev_loop_new(EVFLAG_AUTO); slab_cache_create(&cord->slabc, &runtime); @@ -631,10 +651,26 @@ void *cord_thread_func(void *p) */ res = NULL; } + /* + * cord()->on_exit initially holds a NULL value. This field is + * change-once. + * Either a handler installation suceeds (in cord_cojoin) or prior + * to thread exit the thread func discovers that no handler was installed + * so far and it stores CORD_ON_EXIT_WONT_RUN to prevent a future + * handler installation (since a handler won't run anyway). + */ + struct cord_on_exit *handler = NULL; /* expected value */ + bool rc = pm_atomic_compare_exchange_strong(&cord()->on_exit, + &handler, + CORD_ON_EXIT_WONT_RUN); + if (!rc) { + assert(handler); + assert(handler->callback); + handler->callback(handler->argument); + } return res; } - int cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg) { @@ -675,6 +711,88 @@ cord_join(struct cord *cord) return res; } +struct cord_cojoin_state +{ + struct ev_loop *loop; + struct fiber *fiber; + struct ev_async async; + bool task_complete; +}; + +static void +cord_cojoin_on_exit(void *arg) +{ + struct cord_cojoin_state *b = + (struct cord_cojoin_state *)arg; + + assert(b->loop); + ev_async_send(b->loop, &b->async); +} + +static void +cord_cojoin_wakeup(struct ev_loop *loop, struct ev_async *ev, int revents) +{ + assert(ev); + assert(ev->data); + (void)loop; + (void)revents; + + struct cord_cojoin_state *b = (cord_cojoin_state *)ev->data; + + assert(b->fiber); + b->task_complete = true; + fiber_wakeup(b->fiber); +} + +int +cord_cojoin(struct cord *cord) +{ + assert(cord() != cord); /* Can't join self. */ + + struct cord_cojoin_state b; + b.loop = loop(); + b.fiber = fiber(); + b.task_complete = false; + + ev_async_init(&b.async, cord_cojoin_wakeup); + b.async.data = &b; + ev_async_start(loop(), &b.async); + + struct cord_on_exit handler = { cord_cojoin_on_exit, &b }; + + /* + * cord->on_exit initially holds a NULL value. This field is + * change-once. + */ + struct cord_on_exit *prev_handler = NULL; /* expected value */ + bool rc = pm_atomic_compare_exchange_strong(&cord->on_exit, + &prev_handler, &handler); + /* + * A handler installation fails either if the thread did exit or + * if someone is already joining this cord (BUG). + */ + if (!rc) { + /* Assume cord's thread already exited. */ + assert(prev_handler == CORD_ON_EXIT_WONT_RUN); + } else { + /* + * Wait until the thread exits. Prior to exit the thread invokes + * cord_cojoin_on_exit, signaling ev_async, making the event loop + * call cord_cojoin_wakeup, waking up this fiber again. + * + * The fiber is non-cancellable during the wait to avoid + * invalidating the state struct on stack. + */ + bool cancellable = fiber_set_cancellable(false); + fiber_yield(); + assert(b.task_complete); + fiber_set_cancellable(cancellable); + } + + ev_async_stop(loop(), &b.async); + return cord_join(cord); +} + void break_ev_loop_f(struct trigger * /* trigger */, void * /* event */) { diff --git a/src/fiber.h b/src/fiber.h index a1bb55379a..033061dd3a 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -167,6 +167,8 @@ struct fiber { enum { FIBER_CALL_STACK = 16 }; +struct cord_on_exit; + /** * @brief An independent execution unit that can be managed by a separate OS * thread. Each cord consists of fibers to implement cooperative multitasking @@ -184,6 +186,7 @@ struct cord { */ uint32_t max_fid; pthread_t id; + struct cord_on_exit *on_exit; /** A helper hash to map id -> fiber. */ struct mh_i32ptr_t *fiber_registry; /** All fibers */ -- GitLab