From 295399ce3d73c72d4d7dd802156af1c75f9c50b7 Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Mon, 31 Oct 2016 18:20:15 +0300 Subject: [PATCH] fiber: return fiber function return code from fiber_join and cord_join To be used as an indicator of exception. From now on one should write if (fiber_join(...)) // or cord_join(...) diag_raise(); instead of fiber_join(...); diag_raise(); Now we can make diag_raise() assert that diag is not empty. Closes #1847 --- src/box/memtx_engine.cc | 19 ++++++------------- src/box/recovery.cc | 4 ++-- src/box/relay.cc | 8 ++++---- src/box/vinyl.c | 2 -- src/diag.h | 4 ++-- src/fiber.c | 36 +++++++++++++++++++++--------------- src/fiber.h | 14 +++++++------- test/app-tap/module_api.c | 4 ++-- test/unit/fiber.cc | 4 ++-- 9 files changed, 46 insertions(+), 49 deletions(-) diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 1b87dcb551..e1e7609ef3 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -1312,12 +1312,8 @@ MemtxEngine::waitCheckpoint(struct vclock *vclock) /* wait for memtx-part snapshot completion */ int result = cord_cojoin(&m_checkpoint->cord); - - struct error *e = diag_last_error(&fiber()->diag); - if (e != NULL) { - error_log(e); - result = -1; - } + if (result != 0) + error_log(diag_last_error(diag_get())); m_checkpoint->waiting_for_snap_thread = false; return result; @@ -1359,11 +1355,8 @@ MemtxEngine::abortCheckpoint() */ if (m_checkpoint->waiting_for_snap_thread) { /* wait for memtx-part snapshot completion */ - cord_cojoin(&m_checkpoint->cord); - - struct error *e = diag_last_error(&fiber()->diag); - if (e) - error_log(e); + if (cord_cojoin(&m_checkpoint->cord) != 0) + error_log(diag_last_error(diag_get())); m_checkpoint->waiting_for_snap_thread = false; } @@ -1453,8 +1446,8 @@ MemtxEngine::join(struct xstream *stream) /* Send snapshot using a thread */ struct cord cord; cord_costart(&cord, "initial_join", memtx_initial_join_f, &arg); - cord_cojoin(&cord); - diag_raise(); + if (cord_cojoin(&cord) != 0) + diag_raise(); } /** diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 2592856442..224e8ab0ae 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -559,8 +559,8 @@ recovery_stop_local(struct recovery *r) struct fiber *f = r->watcher; r->watcher = NULL; fiber_cancel(f); - fiber_join(f); - diag_raise(); + if (fiber_join(f) != 0) + diag_raise(); } } diff --git a/src/box/relay.cc b/src/box/relay.cc index 77aa0318b5..8f606afcac 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -124,8 +124,8 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, }); cord_costart(&relay.cord, "final_join", relay_final_join_f, &relay); - cord_cojoin(&relay.cord); - diag_raise(); + if (cord_cojoin(&relay.cord) != 0) + diag_raise(); } static void @@ -231,8 +231,8 @@ relay_subscribe(int fd, uint64_t sync, struct server *server, struct cord cord; cord_costart(&cord, "subscribe", relay_subscribe_f, &relay); - cord_cojoin(&cord); - diag_raise(); + if (cord_cojoin(&cord) != 0) + diag_raise(); } static void diff --git a/src/box/vinyl.c b/src/box/vinyl.c index c2f697891e..547f1a3ce8 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -9359,8 +9359,6 @@ vy_index_send(struct vy_index *index, vy_send_row_f sendrow, void *ctx) if (rc != 0) break; } - if (rc == 0 && stmt == NULL) - rc = 1; vy_read_iterator_close(&ri); vy_stmt_unref(key); return rc; diff --git a/src/diag.h b/src/diag.h index 57ad223fc0..85955ab5ad 100644 --- a/src/diag.h +++ b/src/diag.h @@ -237,8 +237,8 @@ static inline void diag_raise(void) { struct error *e = diag_last_error(diag_get()); - if (e) - error_raise(e); + assert(e != NULL); + error_raise(e); } diff --git a/src/fiber.c b/src/fiber.c index 19bc04c660..1c826f507a 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -267,7 +267,7 @@ fiber_reschedule(void) fiber_yield(); } -void +int fiber_join(struct fiber *fiber) { assert(fiber->flags & FIBER_IS_JOINABLE); @@ -279,13 +279,20 @@ fiber_join(struct fiber *fiber) assert(fiber_is_dead(fiber)); bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED; /* Move exception to the caller */ - diag_move(&fiber->diag, &fiber()->diag); + int ret = fiber->f_ret; + if (ret != 0) { + assert(!diag_is_empty(&fiber->diag)); + diag_move(&fiber->diag, &fiber()->diag); + } /** Don't bother with propagation of FiberIsCancelled */ - if (fiber_was_cancelled) + if (fiber_was_cancelled) { diag_clear(&fiber()->diag); + ret = 0; + } /* The fiber is already dead. */ fiber_recycle(fiber); + return ret; } /** @@ -499,7 +506,8 @@ fiber_loop(MAYBE_UNUSED void *data) struct fiber *fiber = fiber(); assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0); - if (fiber_invoke(fiber->f, fiber->f_data) != 0) { + fiber->f_ret = fiber_invoke(fiber->f, fiber->f_data); + if (fiber->f_ret != 0) { struct error *e = diag_last_error(&fiber->diag); /* diag must not be empty on error */ assert(e != NULL || fiber->flags & FIBER_IS_CANCELLED); @@ -907,17 +915,15 @@ cord_join(struct cord *cord) void *retval = NULL; int res = tt_pthread_join(cord->id, &retval); if (res == 0) { - /* - * cord_thread_func guarantees that - * cord->exception is only set if the subject cord - * has terminated with an uncaught exception, - * transfer it to the caller. If there is - * no exception, this clears the caller's - * diagnostics area. - */ - diag_move(&cord->fiber->diag, &fiber()->diag); - } else + struct fiber *f = cord->fiber; + if (f->f_ret != 0) { + assert(!diag_is_empty(&f->diag)); + diag_move(&f->diag, diag_get()); + res = -1; + } + } else { diag_set(SystemError, "failed to join with thread"); + } cord_destroy(cord); return res; } @@ -1054,7 +1060,7 @@ cord_costart_thread_func(void *arg) * terminated, if any. */ assert(fiber_is_dead(f)); - fiber_join(f); + fiber()->f_ret = fiber_join(f); return NULL; } diff --git a/src/fiber.h b/src/fiber.h index 9967161fc5..98d61d0da6 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -188,8 +188,9 @@ fiber_set_joinable(struct fiber *fiber, bool yesno); * @pre FIBER_IS_JOINABLE flag is set. * * \param f fiber to be woken up + * \return fiber function ret code */ -API_EXPORT void +API_EXPORT int fiber_join(struct fiber *f); /** @@ -276,6 +277,7 @@ struct fiber { */ fiber_func f; va_list f_data; + int f_ret; /** Fiber local storage */ void *fls[FIBER_KEY_MAX]; /** Exception which caused this fiber's death. */ @@ -427,7 +429,8 @@ cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg); * @param cord cord * @sa pthread_join() * - * @return 0 on success, -1 if pthread_join failed. + * @return 0 on success, -1 if pthread_join failed or the + * thread function terminated with an exception. */ int cord_cojoin(struct cord *cord); @@ -440,11 +443,8 @@ cord_cojoin(struct cord *cord); * preserves the exception in the caller's cord. * * @param cord cord - * @retval 0 pthread_join succeeded. - * If the thread function terminated with an - * exception, the exception is raised in the - * caller cord. - * @retval -1 pthread_join failed. + * @return 0 on success, -1 if pthread_join failed or the + * thread function terminated with an exception. */ int cord_join(struct cord *cord); diff --git a/test/app-tap/module_api.c b/test/app-tap/module_api.c index 93345765e6..2eb2a04660 100644 --- a/test/app-tap/module_api.c +++ b/test/app-tap/module_api.c @@ -204,9 +204,9 @@ test_fiber(lua_State *L) fiber_set_joinable(fiber, true); fiber_start(fiber); fiber_cancel(fiber); - fiber_join(fiber); + int ret = fiber_join(fiber); box_error_t *err = box_error_last(); - lua_pushboolean(L, (int )(err == NULL || box_error_code(err) != 10)); + lua_pushboolean(L, (int)(ret == 0 || box_error_code(err) != 10)); return 1; } diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc index 8806535a5c..b8341c0c8a 100644 --- a/test/unit/fiber.cc +++ b/test/unit/fiber.cc @@ -67,8 +67,8 @@ fiber_join_test() fiber_set_joinable(fiber, true); fiber_wakeup(fiber); try { - fiber_join(fiber); - diag_raise(); + if (fiber_join(fiber) != 0) + diag_raise(); fail("exception not raised", ""); } catch (Exception *e) { note("exception propagated"); -- GitLab