From 9a71e8ee1a42495b7732e063d16f6789439b8aa4 Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Mon, 5 Dec 2022 16:01:41 +0100
Subject: [PATCH] fiber: introduce cord_cancel_and_join()

It is a wrapper around pthread cancel and join. It was repeated
many times and was dangerous, because left cord.id set. An
accidental attempt to cord_join/cojoin() such cord would lead to
UB then.

The patch introduces a function which encapsulates the blocking
cancellation. It is going to be used in a next patch to count the
number of cords in the process. Which in turn is needed for a new
test.

The counter is atomic in case some cords would be created not by
the main cord.

There are now also more sanity checks against accidental attempts
to join the same cord twice.

Needed for #7743

NO_DOC=internal
NO_CHANGELOG=internal
---
 src/box/applier.cc      |  3 +--
 src/box/iproto.cc       |  3 +--
 src/box/memtx_engine.cc |  9 +++------
 src/box/relay.cc        | 16 +++++++---------
 src/box/vy_run.c        |  3 +--
 src/box/vy_scheduler.c  |  3 +--
 src/lib/core/fiber.c    | 40 ++++++++++++++++++++++++++++++++++++++--
 src/lib/core/fiber.h    |  9 +++++++++
 test/unit/cbus_hang.c   |  2 +-
 test/unit/fiber.cc      | 33 +++++++++++++++++++++++++++++++++
 test/unit/fiber.result  |  2 ++
 11 files changed, 97 insertions(+), 26 deletions(-)

diff --git a/src/box/applier.cc b/src/box/applier.cc
index 69c3a3b529..162bc9d81c 100644
--- a/src/box/applier.cc
+++ b/src/box/applier.cc
@@ -1801,8 +1801,7 @@ applier_free(void)
 {
 	for (int i = 0; i < replication_threads; i++) {
 		struct applier_thread *thread = applier_threads[i];
-		tt_pthread_cancel(thread->cord.id);
-		tt_pthread_join(thread->cord.id, NULL);
+		cord_cancel_and_join(&thread->cord);
 		free(thread);
 	}
 	free(applier_threads);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 27c9424b52..958babd0ee 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -3302,8 +3302,7 @@ void
 iproto_free(void)
 {
 	for (int i = 0; i < iproto_threads_count; i++) {
-		tt_pthread_cancel(iproto_threads[i].net_cord.id);
-		tt_pthread_join(iproto_threads[i].net_cord.id, NULL);
+		cord_cancel_and_join(&iproto_threads[i].net_cord);
 		/*
 		 * Close socket descriptor to prevent hot standby instance
 		 * failing to bind in case it tries to bind before socket
diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc
index 597c85f35f..3c2c1930e6 100644
--- a/src/box/memtx_engine.cc
+++ b/src/box/memtx_engine.cc
@@ -778,10 +778,8 @@ checkpoint_cancel(struct checkpoint *ckpt)
 	 * for it to terminate so as to eliminate the possibility
 	 * of use-after-free.
 	 */
-	if (ckpt->waiting_for_snap_thread) {
-		tt_pthread_cancel(ckpt->cord.id);
-		tt_pthread_join(ckpt->cord.id, NULL);
-	}
+	if (ckpt->waiting_for_snap_thread)
+		cord_cancel_and_join(&ckpt->cord);
 	checkpoint_delete(ckpt);
 }
 
@@ -793,8 +791,7 @@ replica_join_cancel(struct cord *replica_join_cord)
 	 * running and wait for it to terminate so as to
 	 * eliminate the possibility of use-after-free.
 	 */
-	tt_pthread_cancel(replica_join_cord->id);
-	tt_pthread_join(replica_join_cord->id, NULL);
+	cord_cancel_and_join(replica_join_cord);
 }
 
 static int
diff --git a/src/box/relay.cc b/src/box/relay.cc
index 86d3fb9e8d..0101dd267f 100644
--- a/src/box/relay.cc
+++ b/src/box/relay.cc
@@ -353,9 +353,8 @@ relay_cancel(struct relay *relay)
 {
 	/* Check that the thread is running first. */
 	if (relay->cord.id != 0) {
-		if (tt_pthread_cancel(relay->cord.id) == ESRCH)
-			return;
-		tt_pthread_join(relay->cord.id, NULL);
+		cord_cancel_and_join(&relay->cord);
+		relay->cord.id = 0;
 	}
 }
 
@@ -382,6 +381,11 @@ relay_exit(struct relay *relay)
 static void
 relay_stop(struct relay *relay)
 {
+	/*
+	 * The thread has to be already stopped or it could use the destroyed
+	 * data below.
+	 */
+	assert(relay->cord.id == 0);
 	struct relay_gc_msg *gc_msg, *next_gc_msg;
 	stailq_foreach_entry_safe(gc_msg, next_gc_msg,
 				  &relay->pending_gc, in_pending) {
@@ -393,12 +397,6 @@ relay_stop(struct relay *relay)
 		recovery_delete(relay->r);
 	relay->r = NULL;
 	relay->state = RELAY_STOPPED;
-	/*
-	 * Needed to track whether relay thread is running or not
-	 * for relay_cancel(). Id is reset to a positive value
-	 * upon cord_create().
-	 */
-	relay->cord.id = 0;
 	/*
 	 * If relay is stopped then lag statistics should
 	 * be updated on next new ACK packets obtained.
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 8ad7835f12..973cb29e6a 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -166,8 +166,7 @@ vy_run_env_stop_readers(struct vy_run_env *env)
 {
 	for (int i = 0; i < env->reader_pool_size; i++) {
 		struct vy_run_reader *reader = &env->reader_pool[i];
-		tt_pthread_cancel(reader->cord.id);
-		tt_pthread_join(reader->cord.id, NULL);
+		cord_cancel_and_join(&reader->cord);
 	}
 	free(env->reader_pool);
 }
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 65739345db..64b17d76f5 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -365,8 +365,7 @@ vy_worker_pool_stop(struct vy_worker_pool *pool)
 	assert(pool->workers != NULL);
 	for (int i = 0; i < pool->size; i++) {
 		struct vy_worker *worker = &pool->workers[i];
-		tt_pthread_cancel(worker->cord.id);
-		tt_pthread_join(worker->cord.id, NULL);
+		cord_cancel_and_join(&worker->cord);
 	}
 	free(pool->workers);
 	pool->workers = NULL;
diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
index a408cf5798..47e1382d7d 100644
--- a/src/lib/core/fiber.c
+++ b/src/lib/core/fiber.c
@@ -50,6 +50,8 @@ extern void cord_on_yield(void);
 
 static struct fiber_slice zero_slice = {.warn = 0.0, .err = 0.0};
 static struct fiber_slice default_slice = {.warn = 0.5, .err = 1.0};
+/** Number of cord-threads still running right now. */
+static int cord_count = 0;
 
 static inline void
 clock_stat_add_delta(struct clock_stat *stat, uint64_t clock_delta)
@@ -1684,6 +1686,8 @@ cord_exit(struct cord *cord)
 void
 cord_destroy(struct cord *cord)
 {
+	assert(cord->id == 0 || pthread_equal(cord->id, pthread_self()));
+	cord->id = 0;
 	slab_cache_set_thread(&cord->slabc);
 	if (cord->loop)
 		ev_loop_destroy(cord->loop);
@@ -1769,6 +1773,7 @@ cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg)
 		diag_set(SystemError, "failed to create thread");
 		goto end;
 	}
+	pm_atomic_fetch_add(&cord_count, 1);
 	res = 0;
 	while (! ct_arg.is_started)
 		tt_pthread_cond_wait(&ct_arg.start_cond, &ct_arg.start_mutex);
@@ -1788,10 +1793,16 @@ cord_start(struct cord *cord, const char *name, void *(*f)(void *), void *arg)
 int
 cord_join(struct cord *cord)
 {
-	assert(cord() != cord); /* Can't join self. */
+	assert(cord() != cord);
+	assert(cord->id != 0);
+
 	void *retval = NULL;
 	int res = tt_pthread_join(cord->id, &retval);
 	if (res == 0) {
+		int old_cord_count = pm_atomic_fetch_sub(&cord_count, 1);
+		assert(old_cord_count > 0);
+		(void)old_cord_count;
+		cord->id = 0;
 		struct fiber *f = cord->fiber;
 		if (f->f_ret != 0) {
 			assert(!diag_is_empty(&f->diag));
@@ -1842,7 +1853,8 @@ cord_cojoin_wakeup(struct ev_loop *loop, struct ev_async *ev, int revents)
 int
 cord_cojoin(struct cord *cord)
 {
-	assert(cord() != cord); /* Can't join self. */
+	assert(cord() != cord);
+	assert(cord->id != 0);
 
 	struct cord_cojoin_ctx ctx;
 	ctx.loop = loop();
@@ -1878,6 +1890,7 @@ cord_cojoin(struct cord *cord)
 		 * cord_cojoin_wakeup, waking up this fiber again.
 		 */
 		do {
+			assert(cord->id != 0);
 			fiber_yield();
 		} while (!ctx.task_complete);
 	}
@@ -1976,6 +1989,29 @@ cord_slab_cache(void)
 	return &cord()->slabc;
 }
 
+void
+cord_cancel_and_join(struct cord *cord)
+{
+	assert(cord->id != 0);
+	tt_pthread_cancel(cord->id);
+	if (tt_pthread_join(cord->id, NULL) != 0)
+		panic("failed to join a canceled thread");
+	int old_cord_count = pm_atomic_fetch_sub(&cord_count, 1);
+	assert(old_cord_count > 0);
+	(void)old_cord_count;
+	/*
+	 * Can't destroy the cord safely. The cancellation could even happen
+	 * before the cord was properly initialized in its own thread. It might
+	 * be fixed if cord would be initialized before its thread is started.
+	 *
+	 * Also obviously even if the creation would be fine, the destruction
+	 * can't free everything. The cord could have some resources allocated
+	 * on the heap with pointers not stored anywhere in struct cord - they
+	 * can't be possibly located.
+	 */
+	memset(cord, 0, sizeof(*cord));
+}
+
 static NOINLINE int
 check_stack_direction(void *prev_stack_frame)
 {
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index 7f1dd305bf..0f282d8d70 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -895,6 +895,15 @@ cord_is_main(void);
 void
 cord_collect_garbage(struct cord *cord);
 
+/**
+ * Pthread-cancel the thread and join it in a blocking way, without yielding.
+ * That way is the only possible one if the event loop is already destroyed.
+ * Should only be used as an emergency, because all the cord resources simply
+ * leak.
+ */
+void
+cord_cancel_and_join(struct cord *cord);
+
 /**
  * @brief Create a new system fiber.
  *
diff --git a/test/unit/cbus_hang.c b/test/unit/cbus_hang.c
index 8b36710017..d0eb8ffb7c 100644
--- a/test/unit/cbus_hang.c
+++ b/test/unit/cbus_hang.c
@@ -138,7 +138,7 @@ main_f(va_list ap)
 	tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
 	/* Hack end */
 
-	tt_pthread_join(canceled_worker.id, NULL);
+	cord_join(&canceled_worker);
 
 	unsigned join_timeout = 5;
 	signal(SIGALRM, join_fail); // For exit in a hang case
diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc
index 5b42a4aae6..add51eea47 100644
--- a/test/unit/fiber.cc
+++ b/test/unit/fiber.cc
@@ -72,6 +72,15 @@ cancel_dead_f(va_list ap)
 	return 0;
 }
 
+static int
+usleep_f(va_list ap)
+{
+	(void)ap;
+	while (true)
+		usleep(1000);
+	return 0;
+}
+
 static void NOINLINE
 stack_expand(unsigned long *ret, unsigned long nr_calls)
 {
@@ -376,6 +385,29 @@ cord_cojoin_test(void)
 	footer();
 }
 
+static void
+cord_cancel_and_join_test(void)
+{
+	header();
+	struct cord tcord;
+
+	/* Join an exited but not yet joined thread. */
+	memset(&tcord, 0, sizeof(tcord));
+	fail_if(cord_costart(&tcord, "test", noop_f, NULL) != 0);
+	/* Give the thread some time to exit. */
+	fiber_sleep(0.01);
+	cord_cancel_and_join(&tcord);
+
+	/* Cancel and join a hanging thread. */
+	memset(&tcord, 0, sizeof(tcord));
+	fail_if(cord_costart(&tcord, "test", usleep_f, NULL) != 0);
+	/* Give the thread some time to start. */
+	fiber_sleep(0.01);
+	cord_cancel_and_join(&tcord);
+
+	footer();
+}
+
 static void
 fiber_test_defaults()
 {
@@ -502,6 +534,7 @@ main_f(va_list ap)
 	fiber_flags_respect_test();
 	fiber_wait_on_deadline_test();
 	cord_cojoin_test();
+	cord_cancel_and_join_test();
 	fiber_test_defaults();
 	fiber_test_leak_modes();
 	ev_break(loop(), EVBREAK_ALL);
diff --git a/test/unit/fiber.result b/test/unit/fiber.result
index a90a5a32b0..24b7883798 100644
--- a/test/unit/fiber.result
+++ b/test/unit/fiber.result
@@ -29,6 +29,8 @@ OutOfMemory: Failed to allocate 42 bytes in allocator for exception
 	*** fiber_wait_on_deadline_test: done ***
 	*** cord_cojoin_test ***
 	*** cord_cojoin_test: done ***
+	*** cord_cancel_and_join_test ***
+	*** cord_cancel_and_join_test: done ***
 	*** fiber_test_defaults ***
 	*** fiber_test_defaults: done ***
 	*** fiber_test_leak ***
-- 
GitLab