diff --git a/src/box/applier.cc b/src/box/applier.cc index 69c3a3b529827521c689903d0bdb0561f5c004c4..162bc9d81c5fbaffbe1ac5c579284782b95a51a3 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -1801,8 +1801,7 @@ applier_free(void) { for (int i = 0; i < replication_threads; i++) { struct applier_thread *thread = applier_threads[i]; - tt_pthread_cancel(thread->cord.id); - tt_pthread_join(thread->cord.id, NULL); + cord_cancel_and_join(&thread->cord); free(thread); } free(applier_threads); diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 27c9424b52095976280d1eb8c87694aa98e5dc5f..958babd0ee3b3eea9a8e15cc344dfb30711ba216 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -3302,8 +3302,7 @@ void iproto_free(void) { for (int i = 0; i < iproto_threads_count; i++) { - tt_pthread_cancel(iproto_threads[i].net_cord.id); - tt_pthread_join(iproto_threads[i].net_cord.id, NULL); + cord_cancel_and_join(&iproto_threads[i].net_cord); /* * Close socket descriptor to prevent hot standby instance * failing to bind in case it tries to bind before socket diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 597c85f35f1826af487a9cab235b06041379c30e..3c2c1930e64f94e50eaea288dc0b40df38f56847 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -778,10 +778,8 @@ checkpoint_cancel(struct checkpoint *ckpt) * for it to terminate so as to eliminate the possibility * of use-after-free. */ - if (ckpt->waiting_for_snap_thread) { - tt_pthread_cancel(ckpt->cord.id); - tt_pthread_join(ckpt->cord.id, NULL); - } + if (ckpt->waiting_for_snap_thread) + cord_cancel_and_join(&ckpt->cord); checkpoint_delete(ckpt); } @@ -793,8 +791,7 @@ replica_join_cancel(struct cord *replica_join_cord) * running and wait for it to terminate so as to * eliminate the possibility of use-after-free. 
*/ - tt_pthread_cancel(replica_join_cord->id); - tt_pthread_join(replica_join_cord->id, NULL); + cord_cancel_and_join(replica_join_cord); } static int diff --git a/src/box/relay.cc b/src/box/relay.cc index 86d3fb9e8d8496cb20863e2c0c6d9defc8e497be..0101dd267f2b31fba8f356b185f09e2120c2f401 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -353,9 +353,8 @@ relay_cancel(struct relay *relay) { /* Check that the thread is running first. */ if (relay->cord.id != 0) { - if (tt_pthread_cancel(relay->cord.id) == ESRCH) - return; - tt_pthread_join(relay->cord.id, NULL); + cord_cancel_and_join(&relay->cord); + relay->cord.id = 0; } } @@ -382,6 +381,11 @@ relay_exit(struct relay *relay) static void relay_stop(struct relay *relay) { + /* + * The thread must already be stopped, otherwise it could access the + * data destroyed below. + */ + assert(relay->cord.id == 0); struct relay_gc_msg *gc_msg, *next_gc_msg; stailq_foreach_entry_safe(gc_msg, next_gc_msg, &relay->pending_gc, in_pending) { @@ -393,12 +397,6 @@ relay_stop(struct relay *relay) recovery_delete(relay->r); relay->r = NULL; relay->state = RELAY_STOPPED; - /* - * Needed to track whether relay thread is running or not - * for relay_cancel(). Id is reset to a positive value - * upon cord_create(). - */ - relay->cord.id = 0; /* * If relay is stopped then lag statistics should * be updated on next new ACK packets obtained. 
diff --git a/src/box/vy_run.c b/src/box/vy_run.c index 8ad7835f12aefcd345cb39ce279efb9882d315bd..973cb29e6ab8abae2ef4109b638c540ac322a101 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -166,8 +166,7 @@ vy_run_env_stop_readers(struct vy_run_env *env) { for (int i = 0; i < env->reader_pool_size; i++) { struct vy_run_reader *reader = &env->reader_pool[i]; - tt_pthread_cancel(reader->cord.id); - tt_pthread_join(reader->cord.id, NULL); + cord_cancel_and_join(&reader->cord); } free(env->reader_pool); } diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index 65739345db77478a71e14c2a7581d1dbe7e829fb..64b17d76f59bcbcbf7a89f1291c3807cc4ed79d0 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -365,8 +365,7 @@ vy_worker_pool_stop(struct vy_worker_pool *pool) assert(pool->workers != NULL); for (int i = 0; i < pool->size; i++) { struct vy_worker *worker = &pool->workers[i]; - tt_pthread_cancel(worker->cord.id); - tt_pthread_join(worker->cord.id, NULL); + cord_cancel_and_join(&worker->cord); } free(pool->workers); pool->workers = NULL; diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c index a408cf57980119ffb40b779ae20185c952a5107e..47e1382d7d84e43416d27415febec48c9d6d883f 100644 --- a/src/lib/core/fiber.c +++ b/src/lib/core/fiber.c @@ -50,6 +50,8 @@ extern void cord_on_yield(void); static struct fiber_slice zero_slice = {.warn = 0.0, .err = 0.0}; static struct fiber_slice default_slice = {.warn = 0.5, .err = 1.0}; +/** Number of cord-threads still running right now. 
*/ +static int cord_count = 0; static inline void clock_stat_add_delta(struct clock_stat *stat, uint64_t clock_delta) @@ -1684,6 +1686,8 @@ cord_exit(struct cord *cord) void cord_destroy(struct cord *cord) { + assert(cord->id == 0 || pthread_equal(cord->id, pthread_self())); + cord->id = 0; slab_cache_set_thread(&cord->slabc); if (cord->loop) ev_loop_destroy(cord->loop); @@ -1769,6 +1773,7 @@ cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg) diag_set(SystemError, "failed to create thread"); goto end; } + pm_atomic_fetch_add(&cord_count, 1); res = 0; while (! ct_arg.is_started) tt_pthread_cond_wait(&ct_arg.start_cond, &ct_arg.start_mutex); @@ -1788,10 +1793,16 @@ cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg) int cord_join(struct cord *cord) { - assert(cord() != cord); /* Can't join self. */ + assert(cord() != cord); + assert(cord->id != 0); + void *retval = NULL; int res = tt_pthread_join(cord->id, &retval); if (res == 0) { + int old_cord_count = pm_atomic_fetch_sub(&cord_count, 1); + assert(old_cord_count > 0); + (void)old_cord_count; + cord->id = 0; struct fiber *f = cord->fiber; if (f->f_ret != 0) { assert(!diag_is_empty(&f->diag)); @@ -1842,7 +1853,8 @@ cord_cojoin_wakeup(struct ev_loop *loop, struct ev_async *ev, int revents) int cord_cojoin(struct cord *cord) { - assert(cord() != cord); /* Can't join self. */ + assert(cord() != cord); + assert(cord->id != 0); struct cord_cojoin_ctx ctx; ctx.loop = loop(); @@ -1878,6 +1890,7 @@ cord_cojoin(struct cord *cord) * cord_cojoin_wakeup, waking up this fiber again. 
*/ do { + assert(cord->id != 0); fiber_yield(); } while (!ctx.task_complete); } @@ -1976,6 +1989,29 @@ cord_slab_cache(void) return &cord()->slabc; } +void +cord_cancel_and_join(struct cord *cord) +{ + assert(cord->id != 0); + tt_pthread_cancel(cord->id); + if (tt_pthread_join(cord->id, NULL) != 0) + panic("failed to join a canceled thread"); + int old_cord_count = pm_atomic_fetch_sub(&cord_count, 1); + assert(old_cord_count > 0); + (void)old_cord_count; + /* + * Can't destroy the cord safely. The cancellation could even happen + * before the cord was properly initialized in its own thread. It might + * be fixed if the cord were initialized before its thread is started. + * + * Also, even if the creation were fine, the destruction obviously + * can't free everything. The cord could have some resources allocated + * on the heap with pointers not stored anywhere in struct cord - they + * cannot possibly be located. + */ + memset(cord, 0, sizeof(*cord)); +} + static NOINLINE int check_stack_direction(void *prev_stack_frame) { diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h index 7f1dd305bf5fc3c54b33a5184fca08d8ba9a47da..0f282d8d700de7fc5446f096eb3c54d00805f7f5 100644 --- a/src/lib/core/fiber.h +++ b/src/lib/core/fiber.h @@ -895,6 +895,15 @@ cord_is_main(void); void cord_collect_garbage(struct cord *cord); +/** + * Pthread-cancel the thread and join it in a blocking way, without yielding. + * This is the only possible way if the event loop is already destroyed. + * Should only be used in an emergency, because all the cord resources are + * simply leaked. + */ +void +cord_cancel_and_join(struct cord *cord); + /** * @brief Create a new system fiber. 
* diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c index 8b36710017356b1ef18b1e2c0a5d8c7fb53662b6..d0eb8ffb7cb532df166305a324843ea3136cd002 100644 --- a/test/unit/cbus_hang.c +++ b/test/unit/cbus_hang.c @@ -138,7 +138,7 @@ main_f(va_list ap) tt_pthread_mutex_unlock(&(hang_endpoint.mutex)); /* Hack end */ - tt_pthread_join(canceled_worker.id, NULL); + cord_join(&canceled_worker); unsigned join_timeout = 5; signal(SIGALRM, join_fail); // For exit in a hang case diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc index 5b42a4aae6da1e31e552468b761d736c5b83e99e..add51eea47d768f656fd0a26d5f050a83304924e 100644 --- a/test/unit/fiber.cc +++ b/test/unit/fiber.cc @@ -72,6 +72,15 @@ cancel_dead_f(va_list ap) return 0; } +static int +usleep_f(va_list ap) +{ + (void)ap; + while (true) + usleep(1000); + return 0; +} + static void NOINLINE stack_expand(unsigned long *ret, unsigned long nr_calls) { @@ -376,6 +385,29 @@ cord_cojoin_test(void) footer(); } +static void +cord_cancel_and_join_test(void) +{ + header(); + struct cord tcord; + + /* Join an exited but not yet joined thread. */ + memset(&tcord, 0, sizeof(tcord)); + fail_if(cord_costart(&tcord, "test", noop_f, NULL) != 0); + /* Give the thread some time to exit. */ + fiber_sleep(0.01); + cord_cancel_and_join(&tcord); + + /* Cancel and join a hanging thread. */ + memset(&tcord, 0, sizeof(tcord)); + fail_if(cord_costart(&tcord, "test", usleep_f, NULL) != 0); + /* Give the thread some time to start. 
*/ + fiber_sleep(0.01); + cord_cancel_and_join(&tcord); + + footer(); +} + static void fiber_test_defaults() { @@ -502,6 +534,7 @@ main_f(va_list ap) fiber_flags_respect_test(); fiber_wait_on_deadline_test(); cord_cojoin_test(); + cord_cancel_and_join_test(); fiber_test_defaults(); fiber_test_leak_modes(); ev_break(loop(), EVBREAK_ALL); diff --git a/test/unit/fiber.result b/test/unit/fiber.result index a90a5a32b02b979ae10790e200848106265c7f56..24b7883798160916743bc7a7a341b512d34f70d1 100644 --- a/test/unit/fiber.result +++ b/test/unit/fiber.result @@ -29,6 +29,8 @@ OutOfMemory: Failed to allocate 42 bytes in allocator for exception *** fiber_wait_on_deadline_test: done *** *** cord_cojoin_test *** *** cord_cojoin_test: done *** + *** cord_cancel_and_join_test *** + *** cord_cancel_and_join_test: done *** *** fiber_test_defaults *** *** fiber_test_defaults: done *** *** fiber_test_leak ***