diff --git a/src/box/relay.cc b/src/box/relay.cc index 2a4cb41246646060f500178cc84ec189a191508b..0034f99a043a1a66c7711ad1f23db653e0e4536e 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -408,6 +408,8 @@ relay_schedule_pending_gc(struct relay *relay, const struct vclock *vclock) static void relay_process_wal_event(struct wal_watcher_msg *msg) { + assert((msg->events & (WAL_EVENT_WRITE | WAL_EVENT_ROTATE)) != 0); + struct relay *relay = container_of(msg->watcher, struct relay, wal_watcher); if (relay->state != RELAY_FOLLOW) { @@ -505,7 +507,8 @@ relay_subscribe_f(va_list ap) }; trigger_add(&r->on_close_log, &on_close_log); wal_set_watcher(&relay->wal_watcher, cord_name(cord()), - relay_process_wal_event, cbus_process); + relay_process_wal_event, cbus_process, + WAL_EVENT_WRITE | WAL_EVENT_ROTATE); relay_set_cord_name(relay->io.fd); diff --git a/src/box/wal.c b/src/box/wal.c index b239acd48ca70ce2ec5ba1259ec2556d46a8e55b..30e36d8e0678f37e6791e8e00a82eeb1292b6e04 100644 --- a/src/box/wal.c +++ b/src/box/wal.c @@ -1027,6 +1027,12 @@ wal_watcher_notify(struct wal_watcher *watcher, unsigned events) { assert(!rlist_empty(&watcher->next)); + events &= watcher->event_mask; + if (events == 0) { + /* The watcher isn't interested in this event. */ + return; + } + if (watcher->msg.cmsg.route != NULL) { /* * If the notification message is still en route, @@ -1100,7 +1106,8 @@ wal_watcher_detach(void *arg) void wal_set_watcher(struct wal_watcher *watcher, const char *name, void (*watcher_cb)(struct wal_watcher_msg *), - void (*process_cb)(struct cbus_endpoint *)) + void (*process_cb)(struct cbus_endpoint *), + unsigned event_mask) { assert(journal_is_initialized(&wal_writer_singleton.base)); @@ -1110,6 +1117,7 @@ wal_set_watcher(struct wal_watcher *watcher, const char *name, watcher->msg.events = 0; watcher->msg.cmsg.route = NULL; watcher->pending_events = 0; + watcher->event_mask = event_mask; assert(lengthof(watcher->route) == 2); watcher->route[0] = (struct cmsg_hop) @@ -1130,6 +1138,15 @@ wal_clear_watcher(struct wal_watcher *watcher, wal_watcher_detach, watcher, process_cb); } +/** + * Notify all interested watchers about a WAL event. + * + * XXX: Note, this function iterates over all registered watchers, + * including those that are not interested in the given event. + * This is OK only as long as the number of events/watchers is + * small. If this ever changes, we should consider maintaining + * a separate watcher list per each event type. + */ static void wal_notify_watchers(struct wal_writer *writer, unsigned events) { diff --git a/src/box/wal.h b/src/box/wal.h index b7edcd43f1e82d444299d2ef3429b28ad369926f..e8a4299cc9a7f8e1c3644ebe100aed00b8da77e2 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -95,6 +95,11 @@ struct wal_watcher { struct cmsg_hop route[2]; /** Message sent to notify the watcher. */ struct wal_watcher_msg msg; + /** + * Bit mask of WAL events that this watcher is + * interested in. + */ + unsigned event_mask; /** * Bit mask of WAL events that happened while * the notification message was en route. @@ -126,11 +131,13 @@ struct wal_watcher { * @param process_cb Function called to process cbus messages * while the watcher is being attached or NULL * if the cbus loop is running elsewhere. + * @param event_mask Bit mask of events the watcher is interested in. */ void wal_set_watcher(struct wal_watcher *watcher, const char *name, void (*watcher_cb)(struct wal_watcher_msg *), - void (*process_cb)(struct cbus_endpoint *)); + void (*process_cb)(struct cbus_endpoint *), + unsigned event_mask); /** * Unsubscribe from WAL events.