From ad37f51f1bb168ac972d3bde70d161d8ebd880da Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Sat, 10 Oct 2015 17:17:36 +0300 Subject: [PATCH] arm: fiber.cc -> fiber.c (first part) Avoid throwing exceptions from most fiber functions. --- src/box/applier.cc | 11 ++++----- src/box/iproto.cc | 3 +-- src/box/lua/call.cc | 4 +--- src/box/memtx_engine.cc | 20 ++++++++-------- src/box/recovery.cc | 1 + src/box/relay.cc | 2 ++ src/coeio.h | 18 +++----------- src/diag.h | 1 - src/fiber.cc | 52 +++++++++++++++++------------------------ src/fiber.h | 33 ++++++++++++++++++++++---- src/lua/fiber.cc | 2 ++ test/unit/coio.cc | 1 + test/unit/fiber.cc | 2 +- 13 files changed, 77 insertions(+), 73 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 29b82ce537..1d937b4564 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -366,6 +366,7 @@ applier_f(va_list ap) * See: https://github.com/tarantool/tarantool/issues/136 */ fiber_sleep(RECONNECT_DELAY); + fiber_testcancel(); } } @@ -399,12 +400,7 @@ applier_stop(struct applier *applier) const char *uri = uri_format(&applier->uri); say_crit("shutting down applier %s", uri); fiber_cancel(f); - /** - * If the applier died from an exception, don't throw it - * up. - */ - diag_clear(&f->diag); - fiber_join(f); /* doesn't throw due do diag_clear() */ + fiber_join(f); applier_set_state(applier, APPLIER_OFF); applier->reader = NULL; } @@ -414,7 +410,8 @@ applier_wait(struct applier *applier) { assert(applier->reader != NULL); auto fiber_guard = make_scoped_guard([=] { applier->reader = NULL; }); - fiber_join(applier->reader); /* may throw */ + fiber_join(applier->reader); + fiber_testerror(); } struct applier * diff --git a/src/box/iproto.cc b/src/box/iproto.cc index f6c0e31f4e..a0abef1fb5 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -997,8 +997,7 @@ iproto_set_listen(const char *uri) fiber_yield(); if (! diag_is_empty(&msg.diag)) { diag_move(&msg.diag, &fiber()->diag); - Exception *e = (Exception *) diag_last_error(&fiber()->diag); - e->raise(); + fiber_testerror(); } } diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index c012e5ed42..c6c67e04c2 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -464,9 +464,7 @@ execute_c_call(struct func *func, struct request *request, struct obuf *out) } if (rc != 0) { - Exception *e = (Exception *) diag_last_error(&fiber()->diag); - if (e != NULL) - e->raise(); + fiber_testerror(); tnt_raise(ClientError, ER_PROC_C, "unknown procedure error"); } diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 348c1b1350..f4d5bc5a44 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -1113,11 +1113,11 @@ MemtxEngine::waitCheckpoint() assert(m_checkpoint); assert(m_checkpoint->waiting_for_snap_thread); - int result; - try { - /* wait for memtx-part snapshot completion */ - result = cord_cojoin(&m_checkpoint->cord); - } catch (Exception *e) { + /* wait for memtx-part snapshot completion */ + int result = cord_cojoin(&m_checkpoint->cord); + + Exception *e = (Exception *) diag_last_error(&fiber()->diag); + if (e != NULL) { e->log(); result = -1; SystemError *se = type_cast(SystemError, e); @@ -1160,12 +1160,12 @@ MemtxEngine::abortCheckpoint() * An error in the other engine's first phase. */ if (m_checkpoint->waiting_for_snap_thread) { - try { - /* wait for memtx-part snapshot completion */ - cord_cojoin(&m_checkpoint->cord); - } catch (Exception *e) { + /* wait for memtx-part snapshot completion */ + cord_cojoin(&m_checkpoint->cord); + + Exception *e = (Exception *) diag_last_error(&fiber()->diag); + if (e) e->log(); - } m_checkpoint->waiting_for_snap_thread = false; } diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 6381d4f7c7..d3b176d049 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -536,6 +536,7 @@ recovery_stop_local(struct recovery *r) r->watcher = NULL; fiber_cancel(f); fiber_join(f); + fiber_testerror(); } } diff --git a/src/box/relay.cc b/src/box/relay.cc index 82f47471f5..25dd6cf065 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -111,6 +111,7 @@ relay_join(int fd, struct xrow_header *packet, cord_costart(&relay.cord, "join", relay_join_f, &relay); cord_cojoin(&relay.cord); + fiber_testerror(); /** * Call the server-side hook which stores the replica uuid * in _cluster space after sending the last row but before @@ -254,6 +255,7 @@ relay_subscribe(int fd, struct xrow_header *packet, struct cord cord; cord_costart(&cord, "subscribe", relay_subscribe_f, &relay); cord_cojoin(&cord); + fiber_testerror(); } void diff --git a/src/coeio.h b/src/coeio.h index 374d6db4c5..6226bc5e92 100644 --- a/src/coeio.h +++ b/src/coeio.h @@ -40,7 +40,7 @@ #if defined(__cplusplus) extern "C" { -#endif +#endif /* defined(__cplusplus) */ /** * Asynchronous IO Tasks (libeio wrapper) @@ -127,19 +127,7 @@ coio_getaddrinfo(const char *host, const char *port, /** \endcond public */ #if defined(__cplusplus) -} -#endif - -struct cord; -/** - * \brief Yield until \a cord has terminated. - * If \a cord has terminated with an uncaught exception - * **re-throws** this exception in the calling cord/fiber. - * \param cord cord - * \sa pthread_join() - * \return 0 on success - */ -int -cord_cojoin(struct cord *cord); +} /* extern "C" */ +#endif /* defined(__cplusplus) */ #endif /* TARANTOOL_COEIO_H_INCLUDED */ diff --git a/src/diag.h b/src/diag.h index 91db765b56..7427aa1bd4 100644 --- a/src/diag.h +++ b/src/diag.h @@ -178,7 +178,6 @@ diag_last_error(struct diag *diag) return diag->last; } - /** * A helper for tnt_error to avoid cyclic includes (fiber.h and exception.h) * \cond false diff --git a/src/fiber.cc b/src/fiber.cc index 3eb90638fe..8451da94b3 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -216,10 +216,9 @@ fiber_join(struct fiber *fiber) /* Move exception to the caller */ diag_move(&fiber->diag, &fiber()->diag); Exception *e = (Exception *) diag_last_error(&fiber()->diag); - /** Raise exception again, unless it's FiberCancelException */ - if (e != NULL && type_cast(FiberCancelException, e) == NULL) - e->raise(); - fiber_testcancel(); + /** Don't bother with propagation of FiberCancelException */ + if (e != NULL && type_cast(FiberCancelException, e) != NULL) + diag_clear(&fiber()->diag); } /** @@ -307,7 +306,6 @@ fiber_sleep(ev_tstamp delay) if (delay == 0) { ev_idle_stop(loop(), &cord()->idle_event); } - fiber_testcancel(); } void @@ -618,12 +616,15 @@ struct cord_thread_arg pthread_cond_t start_cond; }; +/** + * Cord main thread function. It's not exception-safe, the + * body function must catch all exceptions instead. + */ void *cord_thread_func(void *p) { struct cord_thread_arg *ct_arg = (struct cord_thread_arg *) p; cord() = ct_arg->cord; slab_cache_set_thread(&cord()->slabc); - struct cord *cord = cord(); cord_init(ct_arg->name); /** Can't possibly be the main thread */ assert(cord->id != main_thread_id); @@ -633,21 +634,7 @@ void *cord_thread_func(void *p) ct_arg->is_started = true; tt_pthread_cond_signal(&ct_arg->start_cond); tt_pthread_mutex_unlock(&ct_arg->start_mutex); - void *res; - try { - res = f(arg); - /* - * Clear a possible leftover exception object - * to not confuse the invoker of the thread. - */ - diag_clear(&cord->fiber->diag); - } catch (Exception *) { - /* - * The exception is now available to the caller - * via cord->diag. - */ - res = NULL; - } + void *res = f(arg); /* * cord()->on_exit initially holds NULL. This field is * change-once. @@ -693,17 +680,16 @@ cord_join(struct cord *cord) assert(cord() != cord); /* Can't join self. */ void *retval = NULL; int res = tt_pthread_join(cord->id, &retval); - if (res == 0 && !diag_is_empty(&cord->fiber->diag)) { + if (res == 0) { /* * cord_thread_func guarantees that * cord->exception is only set if the subject cord * has terminated with an uncaught exception, - * transfer it to the caller. + * transfer it to the caller. If there is + * no exception, this clears the caller's + * diagnostics area. */ diag_move(&cord->fiber->diag, &fiber()->diag); - cord_destroy(cord); - Exception *e = (Exception *) diag_last_error(&fiber()->diag); - e->raise(); } cord_destroy(cord); return res; @@ -815,8 +801,13 @@ cord_costart_thread_func(void *arg) { struct costart_ctx ctx = *(struct costart_ctx *) arg; free(arg); + struct fiber *f; - struct fiber *f = fiber_new("main", ctx.run); + try { + f = fiber_new("main", ctx.run); + } catch (...) { + return NULL; + } struct trigger break_ev_loop = { RLIST_LINK_INITIALIZER, break_ev_loop_f, NULL, NULL @@ -831,10 +822,11 @@ cord_costart_thread_func(void *arg) /* The fiber hasn't died right away at start. */ ev_run(loop(), 0); } + /* + * Preserve the exception with which the main fiber + * terminated, if any. + */ diag_move(&f->diag, &fiber()->diag); - Exception *e = (Exception *) diag_last_error(&fiber()->diag); - if (e != NULL && type_cast(FiberCancelException, e) == NULL) - e->raise(); return NULL; } diff --git a/src/fiber.h b/src/fiber.h index 707f9196e5..e8052b6d5e 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -207,10 +207,10 @@ extern __thread struct cord *cord_ptr; /** * Start a cord with the given thread function. * The return value of the function can be collected - * with cord_join(). If the function terminates with - * an exception, the return value is NULL, and cord_join() - * moves the exception from the terminated cord to - * the caller of cord_join(). + * with cord_join(). The function *must catch* all + * exceptions and leave them in the diagnostics + * area, cord_join() moves the exception from the + * terminated cord to the caller of cord_join(). */ int cord_start(struct cord *cord, const char *name, @@ -225,6 +225,23 @@ cord_start(struct cord *cord, const char *name, int cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg); +/** + * Yield until \a cord has terminated. + * + * On success: + * + * If \a cord has terminated with an uncaught exception + * the exception is moved to the current fiber's diagnostics + * area, otherwise the current fiber's diagnostics area is + * cleared. + * @param cord cord + * @sa pthread_join() + * + * @return 0 on success, pthread_join return code on error + */ +int +cord_cojoin(struct cord *cord); + /** * Wait for \a cord to terminate. If \a cord has already * terminated, then returns immediately. @@ -437,6 +454,14 @@ fiber_testcancel(void) tnt_raise(FiberCancelException); } +static inline void +fiber_testerror(void) +{ + Exception *e = (Exception *) diag_last_error(&fiber()->diag); + if (e) + e->raise(); +} + #endif /* defined(__cplusplus) */ #endif /* TARANTOOL_FIBER_H_INCLUDED */ diff --git a/src/lua/fiber.cc b/src/lua/fiber.cc index 0645f2b1ec..a028f4ec86 100644 --- a/src/lua/fiber.cc +++ b/src/lua/fiber.cc @@ -439,6 +439,7 @@ lbox_fiber_sleep(struct lua_State *L) luaL_error(L, "fiber.sleep(delay): bad arguments"); double delay = lua_tonumber(L, 1); fiber_sleep(delay); + fiber_testcancel(); return 0; } @@ -446,6 +447,7 @@ static int lbox_fiber_yield(struct lua_State * /* L */) { fiber_sleep(0); + fiber_testcancel(); return 0; } diff --git a/test/unit/coio.cc b/test/unit/coio.cc index 989d851cdf..aeb1154692 100644 --- a/test/unit/coio.cc +++ b/test/unit/coio.cc @@ -15,6 +15,7 @@ touch_f(va_list ap) fail_unless(rc == 1); fflush(f); fiber_sleep(0.01); + fiber_testcancel(); } } diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc index 76d3e4c8e4..c6a8d8983d 100644 --- a/test/unit/fiber.cc +++ b/test/unit/fiber.cc @@ -64,6 +64,7 @@ fiber_join_test() fiber_wakeup(fiber); try { fiber_join(fiber); + fiber_testerror(); fail("exception not raised", ""); } catch (Exception *e) { note("exception propagated"); @@ -89,7 +90,6 @@ fiber_join_test() fiber_yield(); note("by this time the fiber should be dead already"); fiber_cancel(fiber); - diag_clear(&fiber->diag); fiber_join(fiber); footer(); -- GitLab