From bd6fb06a140fc4babc7fb1c1bee818149007d00b Mon Sep 17 00:00:00 2001
From: Ilya Verbin <iverbin@tarantool.org>
Date: Wed, 18 May 2022 19:19:00 +0300
Subject: [PATCH] core: allow spurious wakeups in cbus_call

Currently it's possible to wakeup a fiber, which is waiting for `cbus_call`
completion, using Tarantool C API. This will cause a misleading `TimedOut`
error. This patch reworks `cbus_call` in such a way that it yields until
a completion flag is set.

Part of #7166

NO_DOC=refactoring
NO_CHANGELOG=refactoring
---
 src/box/iproto.cc          |   2 -
 src/box/vy_run.c           |   2 -
 src/box/wal.c              |  14 ---
 src/lib/core/cbus.c        |  16 ++--
 src/lib/core/fiber.c       |   7 ++
 src/lib/core/fiber.h       |   7 ++
 test/unit/CMakeLists.txt   |   3 +
 test/unit/cbus_call.c      | 178 +++++++++++++++++++++++++++++++++++++
 test/unit/cbus_call.result |   7 ++
 9 files changed, 210 insertions(+), 26 deletions(-)
 create mode 100644 test/unit/cbus_call.c
 create mode 100644 test/unit/cbus_call.result

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 40154d7fe3..871cd7b059 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -3147,10 +3147,8 @@ static inline int
 iproto_do_cfg(struct iproto_thread *iproto_thread, struct iproto_cfg_msg *msg)
 {
 	msg->iproto_thread = iproto_thread;
-	bool prev = fiber_set_cancellable(false);
 	int rc = cbus_call(&iproto_thread->net_pipe, &iproto_thread->tx_pipe,
 			   msg, iproto_do_cfg_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(prev);
 	return rc;
 }
 
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 0eb1250417..d55dd29ebd 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -226,10 +226,8 @@ vy_run_env_coio_call(struct vy_run_env *env, struct cbus_call_msg *msg,
 	env->next_reader %= env->reader_pool_size;
 
 	/* Post the task to the reader thread. */
-	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&reader->reader_pipe, &reader->tx_pipe,
 			   msg, func, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 	if (rc != 0)
 		return -1;
 
diff --git a/src/box/wal.c b/src/box/wal.c
index 8ad9fed0a5..57742632e7 100644
--- a/src/box/wal.c
+++ b/src/box/wal.c
@@ -632,11 +632,9 @@ wal_sync(struct vclock *vclock)
 		diag_set(ClientError, ER_CASCADE_ROLLBACK);
 		return -1;
 	}
-	bool cancellable = fiber_set_cancellable(false);
 	struct wal_vclock_msg msg;
 	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 			   &msg.base, wal_sync_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 	if (vclock != NULL)
 		vclock_copy(vclock, &msg.vclock);
 	return rc;
@@ -694,11 +692,9 @@ wal_begin_checkpoint(struct wal_checkpoint *checkpoint)
 		diag_set(ClientError, ER_CASCADE_ROLLBACK);
 		return -1;
 	}
-	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 			   &checkpoint->base, wal_begin_checkpoint_f, NULL,
 			   TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 	if (rc != 0)
 		return -1;
 	return 0;
@@ -734,11 +730,9 @@ wal_commit_checkpoint(struct wal_checkpoint *checkpoint)
 		vclock_copy(&writer->checkpoint_vclock, &checkpoint->vclock);
 		return;
 	}
-	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &checkpoint->base, wal_commit_checkpoint_f, NULL,
 		  TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 }
 
 struct wal_set_checkpoint_threshold_msg {
@@ -764,11 +758,9 @@ wal_set_checkpoint_threshold(int64_t threshold)
 		return;
 	struct wal_set_checkpoint_threshold_msg msg;
 	msg.checkpoint_threshold = threshold;
-	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 		  &msg.base, wal_set_checkpoint_threshold_f, NULL,
 		  TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 }
 
 void
@@ -819,10 +811,8 @@ wal_collect_garbage(const struct vclock *vclock)
 		return;
 	struct wal_gc_msg msg;
 	msg.vclock = vclock;
-	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg.base,
 		  wal_collect_garbage_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 }
 
 static void
@@ -1386,11 +1376,9 @@ wal_write_vy_log(struct journal_entry *entry)
 	struct wal_writer *writer = &wal_writer_singleton;
 	struct wal_write_vy_log_msg msg;
 	msg.entry= entry;
-	bool cancellable = fiber_set_cancellable(false);
 	int rc = cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe,
 			   &msg.base, wal_write_vy_log_f, NULL,
 			   TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 	return rc;
 }
 
@@ -1408,10 +1396,8 @@ wal_rotate_vy_log(void)
 {
 	struct wal_writer *writer = &wal_writer_singleton;
 	struct cbus_call_msg msg;
-	bool cancellable = fiber_set_cancellable(false);
 	cbus_call(&writer->wal_pipe, &writer->tx_prio_pipe, &msg,
 		  wal_rotate_vy_log_f, NULL, TIMEOUT_INFINITY);
-	fiber_set_cancellable(cancellable);
 }
 
 static void
diff --git a/src/lib/core/cbus.c b/src/lib/core/cbus.c
index 06acef732c..954d29371c 100644
--- a/src/lib/core/cbus.c
+++ b/src/lib/core/cbus.c
@@ -433,15 +433,15 @@ cbus_call(struct cpipe *callee, struct cpipe *caller, struct cbus_call_msg *msg,
 
 	cpipe_push(callee, cmsg(msg));
 
-	fiber_yield_timeout(timeout);
-	if (msg->complete == false) {           /* timed out or cancelled */
-		msg->caller = NULL;
-		if (fiber_is_cancelled())
-			diag_set(FiberIsCancelled);
-		else
+	ev_tstamp deadline = ev_monotonic_now(loop()) + timeout;
+	do {
+		bool exceeded = fiber_yield_deadline(deadline);
+		if (exceeded) {
 			diag_set(TimedOut);
-		return -1;
-	}
+			return -1;
+		}
+	} while (!msg->complete);
+
 	if ((rc = msg->rc))
 		diag_move(&msg->diag, &fiber()->diag);
 	return rc;
diff --git a/src/lib/core/fiber.c b/src/lib/core/fiber.c
index 19e73d7036..38cb3b18d7 100644
--- a/src/lib/core/fiber.c
+++ b/src/lib/core/fiber.c
@@ -761,6 +761,13 @@ fiber_yield_timeout(ev_tstamp delay)
 	return state.timed_out;
 }
 
+bool
+fiber_yield_deadline(ev_tstamp deadline)
+{
+	ev_tstamp timeout = deadline - ev_monotonic_now(loop());
+	return fiber_yield_timeout(timeout);
+}
+
 /**
  * Yield the current fiber to events in the event loop.
  */
diff --git a/src/lib/core/fiber.h b/src/lib/core/fiber.h
index 4ed5c1d81e..5ff959304f 100644
--- a/src/lib/core/fiber.h
+++ b/src/lib/core/fiber.h
@@ -865,6 +865,13 @@ fiber_checkstack(void);
 bool
 fiber_yield_timeout(ev_tstamp delay);
 
+/**
+ * Yield and check for deadline.
+ * Return true if deadline exceeded.
+ */
+bool
+fiber_yield_deadline(ev_tstamp deadline);
+
 void
 fiber_destroy_all(struct cord *cord);
 
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 71b31cca9b..7c8a0d7032 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -125,6 +125,9 @@ target_link_libraries(cbus_stress.test core stat)
 add_executable(cbus.test cbus.c core_test_utils.c)
 target_link_libraries(cbus.test core unit stat)
 
+add_executable(cbus_call.test cbus_call.c core_test_utils.c)
+target_link_libraries(cbus_call.test core unit stat)
+
 include(CheckSymbolExists)
 check_symbol_exists(__GLIBC__ features.h GLIBC_USED)
 if (GLIBC_USED)
diff --git a/test/unit/cbus_call.c b/test/unit/cbus_call.c
new file mode 100644
index 0000000000..4f59797825
--- /dev/null
+++ b/test/unit/cbus_call.c
@@ -0,0 +1,178 @@
+#include "exception.h"
+#include "memory.h"
+#include "cbus.h"
+#include "unit.h"
+
+static struct fiber *caller_fiber;
+static struct cpipe pipe_to_callee;
+static struct cpipe pipe_to_caller;
+
+static int
+func(struct cbus_call_msg *msg)
+{
+	usleep(100000);
+	return 0;
+}
+
+/** Check ordinary cbus_call, nothing special. */
+static void
+test_cbus_call(void)
+{
+	struct cbus_call_msg msg;
+	int rc = cbus_call(&pipe_to_callee, &pipe_to_caller, &msg, func, NULL,
+			   TIMEOUT_INFINITY);
+	is(rc, 0, "cbus_call ordinary");
+}
+
+static int
+empty(struct cbus_call_msg *msg)
+{
+	return 0;
+}
+
+/** Block until previously called func have been completed. */
+static void
+barrier(void)
+{
+	struct cbus_call_msg msg;
+	int rc = cbus_call(&pipe_to_callee, &pipe_to_caller, &msg, empty, NULL,
+			   TIMEOUT_INFINITY);
+	fail_if(rc != 0);
+}
+
+/** Set cbus_call timeout to 10 ms, while func runs for 100 ms. */
+static void
+test_cbus_call_timeout(void)
+{
+	struct cbus_call_msg msg;
+	int rc = cbus_call(&pipe_to_callee, &pipe_to_caller, &msg, func, NULL,
+			   0.01);
+	struct error *err = diag_last_error(diag_get());
+	bool pass = (rc == -1) && err && (err->type == &type_TimedOut);
+	ok(pass, "cbus_call timeout");
+	barrier();
+}
+
+static int
+waker_fn(va_list ap)
+{
+	fiber_sleep(0.05);
+	fiber_wakeup(caller_fiber);
+	return 0;
+}
+
+/** Check that cbus_call is not interrupted by fiber_wakeup. */
+static void
+test_cbus_call_wakeup(void)
+{
+	struct fiber *waker_fiber = fiber_new("waker", waker_fn);
+	fail_if(waker_fiber == NULL);
+	fiber_wakeup(waker_fiber);
+
+	struct cbus_call_msg msg;
+	int rc = cbus_call(&pipe_to_callee, &pipe_to_caller, &msg, func, NULL,
+			   TIMEOUT_INFINITY);
+	is(rc, 0, "cbus_call wakeup");
+	barrier();
+}
+
+static int
+canceler_fn(va_list ap)
+{
+	fiber_sleep(0.05);
+	fiber_cancel(caller_fiber);
+	return 0;
+}
+
+/** Check that cbus_call is not interrupted by fiber_cancel. */
+static void
+test_cbus_call_cancel(void)
+{
+	struct fiber *canceler_fiber = fiber_new("canceler", canceler_fn);
+	fail_if(canceler_fiber == NULL);
+	fiber_wakeup(canceler_fiber);
+
+	struct cbus_call_msg msg;
+	int rc = cbus_call(&pipe_to_callee, &pipe_to_caller, &msg, func, NULL,
+			   TIMEOUT_INFINITY);
+	is(rc, 0, "cbus_call cancel");
+	barrier();
+}
+
+static void
+caller_cb(struct ev_loop *loop, ev_watcher *watcher, int events)
+{
+	struct cbus_endpoint *endpoint = (struct cbus_endpoint *)watcher->data;
+	cbus_process(endpoint);
+}
+
+static int
+callee_fn(va_list ap)
+{
+	struct cbus_endpoint endpoint;
+	cpipe_create(&pipe_to_caller, "caller");
+	cbus_endpoint_create(&endpoint, "callee", fiber_schedule_cb, fiber());
+	cbus_loop(&endpoint);
+	cbus_endpoint_destroy(&endpoint, cbus_process);
+	cpipe_destroy(&pipe_to_caller);
+	return 0;
+}
+
+static void
+callee_start(struct cord *c)
+{
+	fail_if(cord_costart(c, "callee", callee_fn, NULL) != 0);
+	cpipe_create(&pipe_to_callee, "callee");
+}
+
+static void
+callee_stop(struct cord *c)
+{
+	cbus_stop_loop(&pipe_to_callee);
+	cpipe_destroy(&pipe_to_callee);
+	fail_if(cord_join(c) != 0);
+}
+
+static int
+caller_fn(va_list ap)
+{
+	test_cbus_call();
+	test_cbus_call_timeout();
+	test_cbus_call_wakeup();
+	test_cbus_call_cancel();
+
+	ev_break(loop(), EVBREAK_ALL);
+	return 0;
+}
+
+int
+main(void)
+{
+	struct cord callee_cord;
+	struct cbus_endpoint endpoint;
+
+	header();
+	plan(4);
+
+	memory_init();
+	fiber_init(fiber_c_invoke);
+	cbus_init();
+	cbus_endpoint_create(&endpoint, "caller", caller_cb, &endpoint);
+	callee_start(&callee_cord);
+
+	caller_fiber = fiber_new("caller", caller_fn);
+	fail_if(caller_fiber == NULL);
+	fiber_wakeup(caller_fiber);
+
+	ev_run(loop(), 0);
+
+	callee_stop(&callee_cord);
+	cbus_endpoint_destroy(&endpoint, cbus_process);
+	cbus_free();
+	fiber_free();
+	memory_free();
+
+	int rc = check_plan();
+	footer();
+	return rc;
+}
diff --git a/test/unit/cbus_call.result b/test/unit/cbus_call.result
new file mode 100644
index 0000000000..c0230c5c89
--- /dev/null
+++ b/test/unit/cbus_call.result
@@ -0,0 +1,7 @@
+	*** main ***
+1..4
+ok 1 - cbus_call ordinary
+ok 2 - cbus_call timeout
+ok 3 - cbus_call wakeup
+ok 4 - cbus_call cancel
+	*** main: done ***
-- 
GitLab