diff --git a/src/box/relay.cc b/src/box/relay.cc index d6acc993fe470de95bf54754d4d112ac8a6f4beb..81767bec03250bac8b1dccc6a2df775344e1c7ff 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -104,8 +104,6 @@ struct relay { struct cpipe relay_pipe; /** Status message */ struct relay_status_msg status_msg; - /** A condition to signal when status message is handled. */ - struct ipc_cond status_cond; /** Relay exit orchestration message */ struct relay_exit_msg exit_msg; @@ -226,8 +224,6 @@ static void relay_status_update(struct cmsg *msg) { msg->route = NULL; - struct relay_status_msg *status_msg = (struct relay_status_msg *)msg; - ipc_cond_signal(&status_msg->relay->status_cond); } /** @@ -259,9 +255,7 @@ relay_cbus_detach(struct relay *relay) { say_crit("exiting the relay loop"); /* Check that we have no in-flight status message */ - while (relay->status_msg.msg.route != NULL) { - ipc_cond_wait(&relay->status_cond); - } + cbus_flush(&relay->tx_pipe, &relay->relay_pipe, cbus_process); static const struct cmsg_hop exit_route[] = { {tx_exit_cb, NULL} @@ -277,7 +271,6 @@ relay_cbus_detach(struct relay *relay) */ cpipe_destroy(&relay->tx_pipe); cbus_endpoint_destroy(&relay->endpoint, cbus_process); - ipc_cond_destroy(&relay->status_cond); } /** @@ -292,7 +285,6 @@ relay_subscribe_f(va_list ap) struct recovery *r = relay->r; coio_enable(); relay->stream.write = relay_send_row; - ipc_cond_create(&relay->status_cond); cbus_endpoint_create(&relay->endpoint, cord_name(cord()), fiber_schedule_cb, fiber()); cpipe_create(&relay->tx_pipe, "tx"); diff --git a/src/cbus.c b/src/cbus.c index 2c1270e14300c471b97f222905ae8fd1783b6e0d..5f7048eef84e8bf26cb64a43609c6c3bae4f864b 100644 --- a/src/cbus.c +++ b/src/cbus.c @@ -398,6 +398,52 @@ cbus_call(struct cpipe *callee, struct cpipe *caller, struct cbus_call_msg *msg, return rc; } +struct cbus_flush_msg { + struct cmsg cmsg; + bool complete; + struct ipc_cond cond; +}; + +static void +cbus_flush_perform(struct cmsg *cmsg) +{ + (void)cmsg; +} + +static void +cbus_flush_complete(struct cmsg *cmsg) +{ + struct cbus_flush_msg *msg = container_of(cmsg, + struct cbus_flush_msg, cmsg); + msg->complete = true; + ipc_cond_signal(&msg->cond); +} + +void +cbus_flush(struct cpipe *callee, struct cpipe *caller, + void (*process_cb)(struct cbus_endpoint *endpoint)) +{ + struct cmsg_hop route[] = { + {cbus_flush_perform, caller}, + {cbus_flush_complete, NULL}, + }; + struct cbus_flush_msg msg; + + cmsg_init(&msg.cmsg, route); + msg.complete = false; + ipc_cond_create(&msg.cond); + + cpipe_push(callee, &msg.cmsg); + + while (true) { + if (process_cb != NULL) + process_cb(caller->endpoint); + if (msg.complete) + break; + ipc_cond_wait(&msg.cond); + } +} + void cbus_process(struct cbus_endpoint *endpoint) { diff --git a/src/cbus.h b/src/cbus.h index 2cfa0de2f0749809eb42f13eb8f5c24a15c72952..9431946561130a3a3dd69c8ec6f2c9349cd9afc6 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -370,6 +370,15 @@ cbus_call(struct cpipe *callee, struct cpipe *caller, struct cbus_call_msg *msg, cbus_call_f func, cbus_call_f free_cb, double timeout); +/** + * Block until all messages queued in a pipe have been processed. + * Done by submitting a dummy message to the pipe and waiting + * until it is complete. + */ +void +cbus_flush(struct cpipe *callee, struct cpipe *caller, + void (*process_cb)(struct cbus_endpoint *)); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */