From b49bb45896f9b5ee36eeb0bc42ef533f7c03109e Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Mon, 3 Jul 2017 12:39:53 +0300 Subject: [PATCH] relay: fix potential thread hang on exit To make sure there is no status message pending in the tx pipe, relay_cbus_detach() waits on relay->status_cond before proceeding to relay destruction. The status_cond is signaled by the status message completion routine (relay_status_update()) handled by cbus on the relay's side. The problem is by the time we call relay_cbus_detach(), the cbus loop has been stopped (see relay_subscribe_f()), i.e. there's no one to process the message that is supposed to signal status_cond. That means, if there happens to be a status message en route when the relay is stopped, the relay thread will hang forever. To fix this issue, let's introduce a new helper function, cbus_flush(), which blocks the caller until all cbus messages queued on a pipe have been processed, and use it in relay_cbus_detach() to wait for in-flight status messages to complete. Apart from source and destination pipes, this new function takes a callback to be used for processing incoming cbus messages, so it can be used even if the loop that is supposed to invoke cbus_process() stopped. --- src/box/relay.cc | 10 +--------- src/cbus.c | 46 ++++++++++++++++++++++++++++++++++++++++++++++ src/cbus.h | 9 +++++++++ 3 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/box/relay.cc b/src/box/relay.cc index d6acc993fe..81767bec03 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -104,8 +104,6 @@ struct relay { struct cpipe relay_pipe; /** Status message */ struct relay_status_msg status_msg; - /** A condition to signal when status message is handled. */ - struct ipc_cond status_cond; /** Relay exit orchestration message */ struct relay_exit_msg exit_msg; @@ -226,8 +224,6 @@ static void relay_status_update(struct cmsg *msg) { msg->route = NULL; - struct relay_status_msg *status_msg = (struct relay_status_msg *)msg; - ipc_cond_signal(&status_msg->relay->status_cond); } /** @@ -259,9 +255,7 @@ relay_cbus_detach(struct relay *relay) { say_crit("exiting the relay loop"); /* Check that we have no in-flight status message */ - while (relay->status_msg.msg.route != NULL) { - ipc_cond_wait(&relay->status_cond); - } + cbus_flush(&relay->tx_pipe, &relay->relay_pipe, cbus_process); static const struct cmsg_hop exit_route[] = { {tx_exit_cb, NULL} @@ -277,7 +271,6 @@ relay_cbus_detach(struct relay *relay) */ cpipe_destroy(&relay->tx_pipe); cbus_endpoint_destroy(&relay->endpoint, cbus_process); - ipc_cond_destroy(&relay->status_cond); } /** @@ -292,7 +285,6 @@ relay_subscribe_f(va_list ap) struct recovery *r = relay->r; coio_enable(); relay->stream.write = relay_send_row; - ipc_cond_create(&relay->status_cond); cbus_endpoint_create(&relay->endpoint, cord_name(cord()), fiber_schedule_cb, fiber()); cpipe_create(&relay->tx_pipe, "tx"); diff --git a/src/cbus.c b/src/cbus.c index 2c1270e143..5f7048eef8 100644 --- a/src/cbus.c +++ b/src/cbus.c @@ -398,6 +398,52 @@ cbus_call(struct cpipe *callee, struct cpipe *caller, struct cbus_call_msg *msg, return rc; } +struct cbus_flush_msg { + struct cmsg cmsg; + bool complete; + struct ipc_cond cond; +}; + +static void +cbus_flush_perform(struct cmsg *cmsg) +{ + (void)cmsg; +} + +static void +cbus_flush_complete(struct cmsg *cmsg) +{ + struct cbus_flush_msg *msg = container_of(cmsg, + struct cbus_flush_msg, cmsg); + msg->complete = true; + ipc_cond_signal(&msg->cond); +} + +void +cbus_flush(struct cpipe *callee, struct cpipe *caller, + void (*process_cb)(struct cbus_endpoint *endpoint)) +{ + struct cmsg_hop route[] = { + {cbus_flush_perform, caller}, + {cbus_flush_complete, NULL}, + }; + struct cbus_flush_msg msg; + + cmsg_init(&msg.cmsg, route); + msg.complete = false; + ipc_cond_create(&msg.cond); + + cpipe_push(callee, &msg.cmsg); + + while (true) { + if (process_cb != NULL) + process_cb(caller->endpoint); + if (msg.complete) + break; + ipc_cond_wait(&msg.cond); + } +} + void cbus_process(struct cbus_endpoint *endpoint) { diff --git a/src/cbus.h b/src/cbus.h index 2cfa0de2f0..9431946561 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -370,6 +370,15 @@ cbus_call(struct cpipe *callee, struct cpipe *caller, struct cbus_call_msg *msg, cbus_call_f func, cbus_call_f free_cb, double timeout); +/** + * Block until all messages queued in a pipe have been processed. + * Done by submitting a dummy message to the pipe and waiting + * until it is complete. + */ +void +cbus_flush(struct cpipe *callee, struct cpipe *caller, + void (*process_cb)(struct cbus_endpoint *)); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ -- GitLab