diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 1b87dcb551f884ab8f70083ee254d4fc0c8a7596..e1e7609ef38c1b50f6df9b64c0b48a02541121cb 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -1312,12 +1312,8 @@ MemtxEngine::waitCheckpoint(struct vclock *vclock) /* wait for memtx-part snapshot completion */ int result = cord_cojoin(&m_checkpoint->cord); - - struct error *e = diag_last_error(&fiber()->diag); - if (e != NULL) { - error_log(e); - result = -1; - } + if (result != 0) + error_log(diag_last_error(diag_get())); m_checkpoint->waiting_for_snap_thread = false; return result; @@ -1359,11 +1355,8 @@ MemtxEngine::abortCheckpoint() */ if (m_checkpoint->waiting_for_snap_thread) { /* wait for memtx-part snapshot completion */ - cord_cojoin(&m_checkpoint->cord); - - struct error *e = diag_last_error(&fiber()->diag); - if (e) - error_log(e); + if (cord_cojoin(&m_checkpoint->cord) != 0) + error_log(diag_last_error(diag_get())); m_checkpoint->waiting_for_snap_thread = false; } @@ -1453,8 +1446,8 @@ MemtxEngine::join(struct xstream *stream) /* Send snapshot using a thread */ struct cord cord; cord_costart(&cord, "initial_join", memtx_initial_join_f, &arg); - cord_cojoin(&cord); - diag_raise(); + if (cord_cojoin(&cord) != 0) + diag_raise(); } /** diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 25928564421386541b07e80598c22ca34ed3ec33..224e8ab0ae523a7c5d46ed77e046f55afff8eb3a 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -559,8 +559,8 @@ recovery_stop_local(struct recovery *r) struct fiber *f = r->watcher; r->watcher = NULL; fiber_cancel(f); - fiber_join(f); - diag_raise(); + if (fiber_join(f) != 0) + diag_raise(); } } diff --git a/src/box/relay.cc b/src/box/relay.cc index 77aa0318b5fe2895a60a3b6afc65b4439b3e09f1..8f606afcaccf542c226e43a2037a7e9b03aa0b87 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -124,8 +124,8 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, }); cord_costart(&relay.cord, "final_join", relay_final_join_f, &relay); - cord_cojoin(&relay.cord); - diag_raise(); + if (cord_cojoin(&relay.cord) != 0) + diag_raise(); } static void @@ -231,8 +231,8 @@ relay_subscribe(int fd, uint64_t sync, struct server *server, struct cord cord; cord_costart(&cord, "subscribe", relay_subscribe_f, &relay); - cord_cojoin(&cord); - diag_raise(); + if (cord_cojoin(&cord) != 0) + diag_raise(); } static void diff --git a/src/box/vinyl.c b/src/box/vinyl.c index c2f697891edd4fa61afa48698573e4ff123b2622..547f1a3ce8fa00f03e3fa214a41dfa1174e9f50a 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -9359,8 +9359,6 @@ vy_index_send(struct vy_index *index, vy_send_row_f sendrow, void *ctx) if (rc != 0) break; } - if (rc == 0 && stmt == NULL) - rc = 1; vy_read_iterator_close(&ri); vy_stmt_unref(key); return rc; diff --git a/src/diag.h b/src/diag.h index 57ad223fc0a1c754a3d3ffe1e4d47df8c652beb8..85955ab5adeb740d126b42700ead50bd102ae8a1 100644 --- a/src/diag.h +++ b/src/diag.h @@ -237,8 +237,8 @@ static inline void diag_raise(void) { struct error *e = diag_last_error(diag_get()); - if (e) - error_raise(e); + assert(e != NULL); + error_raise(e); } diff --git a/src/fiber.c b/src/fiber.c index 19bc04c660f348db7984e99c68e3a2e4aef19f65..1c826f507afb8a6dfdd0f27ce63ba486a5c52147 100644 --- a/src/fiber.c +++ b/src/fiber.c @@ -267,7 +267,7 @@ fiber_reschedule(void) fiber_yield(); } -void +int fiber_join(struct fiber *fiber) { assert(fiber->flags & FIBER_IS_JOINABLE); @@ -279,13 +279,20 @@ fiber_join(struct fiber *fiber) assert(fiber_is_dead(fiber)); bool fiber_was_cancelled = fiber->flags & FIBER_IS_CANCELLED; /* Move exception to the caller */ - diag_move(&fiber->diag, &fiber()->diag); + int ret = fiber->f_ret; + if (ret != 0) { + assert(!diag_is_empty(&fiber->diag)); + diag_move(&fiber->diag, &fiber()->diag); + } /** Don't bother with propagation of FiberIsCancelled */ - if (fiber_was_cancelled) + if (fiber_was_cancelled) { diag_clear(&fiber()->diag); + ret = 0; + } /* The fiber is already dead. */ fiber_recycle(fiber); + return ret; } /** @@ -499,7 +506,8 @@ fiber_loop(MAYBE_UNUSED void *data) struct fiber *fiber = fiber(); assert(fiber != NULL && fiber->f != NULL && fiber->fid != 0); - if (fiber_invoke(fiber->f, fiber->f_data) != 0) { + fiber->f_ret = fiber_invoke(fiber->f, fiber->f_data); + if (fiber->f_ret != 0) { struct error *e = diag_last_error(&fiber->diag); /* diag must not be empty on error */ assert(e != NULL || fiber->flags & FIBER_IS_CANCELLED); @@ -907,17 +915,15 @@ cord_join(struct cord *cord) void *retval = NULL; int res = tt_pthread_join(cord->id, &retval); if (res == 0) { - /* - * cord_thread_func guarantees that - * cord->exception is only set if the subject cord - * has terminated with an uncaught exception, - * transfer it to the caller. If there is - * no exception, this clears the caller's - * diagnostics area. - */ - diag_move(&cord->fiber->diag, &fiber()->diag); - } else + struct fiber *f = cord->fiber; + if (f->f_ret != 0) { + assert(!diag_is_empty(&f->diag)); + diag_move(&f->diag, diag_get()); + res = -1; + } + } else { diag_set(SystemError, "failed to join with thread"); + } cord_destroy(cord); return res; } @@ -1054,7 +1060,7 @@ cord_costart_thread_func(void *arg) * terminated, if any. */ assert(fiber_is_dead(f)); - fiber_join(f); + fiber()->f_ret = fiber_join(f); return NULL; } diff --git a/src/fiber.h b/src/fiber.h index 9967161fc58b8a3ad55e6019acf8b5000389bb3e..98d61d0da6bcf9cab571e7965bcbb3bb430eb924 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -188,8 +188,9 @@ fiber_set_joinable(struct fiber *fiber, bool yesno); * @pre FIBER_IS_JOINABLE flag is set. * * \param f fiber to be woken up + * \return fiber function ret code */ -API_EXPORT void +API_EXPORT int fiber_join(struct fiber *f); /** @@ -276,6 +277,7 @@ struct fiber { */ fiber_func f; va_list f_data; + int f_ret; /** Fiber local storage */ void *fls[FIBER_KEY_MAX]; /** Exception which caused this fiber's death. */ @@ -427,7 +429,8 @@ cord_costart(struct cord *cord, const char *name, fiber_func f, void *arg); * @param cord cord * @sa pthread_join() * - * @return 0 on success, -1 if pthread_join failed. + * @return 0 on success, -1 if pthread_join failed or the + * thread function terminated with an exception. */ int cord_cojoin(struct cord *cord); @@ -440,11 +443,8 @@ cord_cojoin(struct cord *cord); * preserves the exception in the caller's cord. * * @param cord cord - * @retval 0 pthread_join succeeded. - * If the thread function terminated with an - * exception, the exception is raised in the - * caller cord. - * @retval -1 pthread_join failed. + * @return 0 on success, -1 if pthread_join failed or the + * thread function terminated with an exception. */ int cord_join(struct cord *cord); diff --git a/test/app-tap/module_api.c b/test/app-tap/module_api.c index 93345765e644a5d204c0942f8f9f0dd349aa0a69..2eb2a046604a4249e8bcbab20a8b085c70a4b584 100644 --- a/test/app-tap/module_api.c +++ b/test/app-tap/module_api.c @@ -204,9 +204,9 @@ test_fiber(lua_State *L) fiber_set_joinable(fiber, true); fiber_start(fiber); fiber_cancel(fiber); - fiber_join(fiber); + int ret = fiber_join(fiber); box_error_t *err = box_error_last(); - lua_pushboolean(L, (int )(err == NULL || box_error_code(err) != 10)); + lua_pushboolean(L, (int)(ret == 0 || box_error_code(err) != 10)); return 1; } diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc index 8806535a5c640e21c2115246b5a1b20ca4825d2c..b8341c0c8a623fce5f673183fadaad3175e3b958 100644 --- a/test/unit/fiber.cc +++ b/test/unit/fiber.cc @@ -67,8 +67,8 @@ fiber_join_test() fiber_set_joinable(fiber, true); fiber_wakeup(fiber); try { - fiber_join(fiber); - diag_raise(); + if (fiber_join(fiber) != 0) + diag_raise(); fail("exception not raised", ""); } catch (Exception *e) { note("exception propagated");