diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 25aefd6474e107fc831b80d14b334e8b938861b3..abea958f977808d41724228cc4b532413509f82a 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -30,7 +30,9 @@ */ #include "recovery.h" +#include "small/rlist.h" #include "scoped_guard.h" +#include "trigger.h" #include "fiber.h" #include "xlog.h" #include "xrow.h" @@ -131,6 +133,7 @@ recovery_new(const char *wal_dirname, bool force_recovery, xdir_check_xc(&r->wal_dir); r->watcher = NULL; + rlist_create(&r->on_close_log); guard.is_active = false; return r; @@ -148,6 +151,7 @@ recovery_close_log(struct recovery *r) r->cursor.name); } xlog_cursor_close(&r->cursor, false); + trigger_run(&r->on_close_log, NULL); } void @@ -155,6 +159,7 @@ recovery_delete(struct recovery *r) { recovery_stop_local(r); + trigger_destroy(&r->on_close_log); xdir_destroy(&r->wal_dir); if (r->cursor.state != XLOG_CURSOR_CLOSED) { /* diff --git a/src/box/recovery.h b/src/box/recovery.h index b72a77ec5dbe605c0683afccbb6007f4648561a0..0e59bdafcda6f19ff3c61d643b17276810843f79 100644 --- a/src/box/recovery.h +++ b/src/box/recovery.h @@ -30,6 +30,7 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ +#include "small/rlist.h" #include "trivia/util.h" #include "third_party/tarantool_ev.h" #include "xlog.h" @@ -54,6 +55,8 @@ struct recovery { * locally or send to the replica. */ struct fiber *watcher; + /** List of triggers invoked when the current WAL is closed. */ + struct rlist on_close_log; }; struct recovery * diff --git a/src/box/relay.cc b/src/box/relay.cc index 81767bec03250bac8b1dccc6a2df775344e1c7ff..39d12126ff635cb44eb094b3bb42796eb13f2b79 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -67,6 +67,18 @@ struct relay_status_msg { struct vclock vclock; }; +/** + * Cbus message to update replica gc state in tx thread. + */ +struct relay_gc_msg { + /** Parent */ + struct cmsg msg; + /** Relay instance */ + struct relay *relay; + /** Vclock signature to advance to */ + int64_t signature; +}; + /** * Cbus message to notify tx thread that relay is stopping. */ @@ -234,8 +246,6 @@ tx_status_update(struct cmsg *msg) { struct relay_status_msg *status = (struct relay_status_msg *)msg; vclock_copy(&status->relay->tx.vclock, &status->vclock); - gc_consumer_advance(status->relay->replica->gc, - vclock_sum(&status->vclock)); static const struct cmsg_hop route[] = { {relay_status_update, NULL} }; @@ -243,6 +253,35 @@ tx_status_update(struct cmsg *msg) cpipe_push(&status->relay->relay_pipe, msg); } +/** + * Update replica gc state in tx thread. + */ +static void +tx_gc_advance(struct cmsg *msg) +{ + struct relay_gc_msg *m = (struct relay_gc_msg *)msg; + gc_consumer_advance(m->relay->replica->gc, m->signature); + free(m); +} + +static void +relay_on_close_log_f(struct trigger *trigger, void * /* event */) +{ + static const struct cmsg_hop route[] = { + {tx_gc_advance, NULL} + }; + struct relay *relay = (struct relay *)trigger->data; + struct relay_gc_msg *m = (struct relay_gc_msg *)malloc(sizeof(*m)); + if (m == NULL) { + say_warn("failed to allocate relay gc message"); + return; + } + cmsg_init(&m->msg, route); + m->relay = relay; + m->signature = vclock_sum(&relay->r->vclock); + cpipe_push(&relay->tx_pipe, &m->msg); +} + static void tx_exit_cb(struct cmsg *msg) { @@ -288,9 +327,17 @@ relay_subscribe_f(va_list ap) cbus_endpoint_create(&relay->endpoint, cord_name(cord()), fiber_schedule_cb, fiber()); cpipe_create(&relay->tx_pipe, "tx"); - - /* Create a guard to detach the relay from cbus on exit */ - auto cbus_guard = make_scoped_guard([&]{ + /* Setup garbage collection trigger. */ + struct trigger on_close_log = { + RLIST_LINK_INITIALIZER, relay_on_close_log_f, relay, NULL + }; + trigger_add(&r->on_close_log, &on_close_log); + /* + * Create a guard to detach the relay from cbus and + * clear the garbage collection trigger on exit. + */ + auto guard = make_scoped_guard([&]{ + trigger_clear(&on_close_log); relay_cbus_detach(relay); }); relay_set_cord_name(relay->io.fd);