From 2a0c4f2bb1370bfd065fa8470a130c895f8812f4 Mon Sep 17 00:00:00 2001 From: Serge Petrenko <sergepetrenko@tarantool.org> Date: Thu, 8 Dec 2022 15:18:34 +0300 Subject: [PATCH] replication: make replica subscribe to master's ballot Previously replicas chose the remote master to boot from by comparing master ballots, which are received in response to an IPROTO_VOTE request right on connection init. Such information is not enough in some scenarios. For example, when implementing anonymous replicas and retrying replica join, we had to restart all connections in order to get the latest ballot information. Let's change that: make the replica subscribe to the built-in "internal.ballot" event instead of relying on the request-response scheme of IPROTO_VOTE. Now replicas always have up-to-date ballot information and there is no need to reinitialize connections to update the ballots. Introduce a new fiber running in the tx thread for this purpose: the applier ballot watcher. The fiber subscribes to the "internal.ballot" event and watches it all the time while the connection to the master is alive. In case the master isn't aware of the IPROTO_WATCH request or of the "internal.ballot" event, the old behaviour is also implemented: the ballot watcher simply waits for the IPROTO_VOTE response and exits. The ballot watcher is started whenever the replica tries to connect or reconnect to the remote master and is cancelled whenever its parent connection to the master is closed. We do not put much effort into restarting the fiber and retrying to connect in case it fails. For now ballot info is only used during bootstrap, and not trying to keep the fiber alive at all costs simplifies the code quite a lot. Later on ballot subscriptions will play a more significant role in choosing the bootstrap leader: replicas will re-check remote ballots every now and then during the bootstrap leader election. 
Part-of #5272 NO_CHANGELOG=internal change NO_TEST=tested by existing replication tests NO_DOC=internal change --- src/box/applier.cc | 242 ++++++++++++++++++++++++++++++++++++++---- src/box/applier.h | 6 ++ src/box/errcode.h | 1 + src/box/xrow.c | 36 +++++++ src/box/xrow.h | 37 +++++++ test/box/error.result | 1 + 6 files changed, 303 insertions(+), 20 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 6d77d72f24..18780dd2b3 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -369,6 +369,200 @@ applier_connection_init(struct iostream *io, const struct uri *uri, } } +static void +applier_run_ballot_triggers(struct applier *applier, bool success) +{ + trigger_run(&applier->on_ballot_update, &success); +} + +/** + * Perform an IPROTO_VOTE exchange: send a request and decode the response. + * Fall back to this method if master isn't aware of "internal.ballot" event. + */ +static void +applier_get_ballot_from_vote(struct iostream *io, struct xrow_header *row, + struct ibuf *ibuf, struct ballot *ballot) +{ + RegionGuard guard(&fiber()->gc); + xrow_encode_vote(row); + coio_write_xrow(io, row); + coio_read_xrow(io, ibuf, row); + if (row->type == IPROTO_OK) { + xrow_decode_ballot_xc(row, ballot); + return; + } + xrow_decode_error(row); + struct diag *diag = diag_get(); + struct error *e = diag_last_error(diag); + /* + * Master isn't aware of IPROTO_VOTE request. + * It's OK - we can proceed without it. + */ + if (box_error_code(e) == ER_UNKNOWN_REQUEST_TYPE) { + diag_clear(diag); + return; + } + error_raise(e); +} + +/** + * Perform one round of IPROTO_WATCH("internal.ballot") exchange: send the + * request / acknowledge that the previous notification is accepted, wait for + * the response.
+ */ +static void +applier_get_ballot_from_event(struct iostream *io, struct xrow_header *row, + struct ibuf *ibuf, struct ballot *ballot, + bool *is_empty) +{ + RegionGuard guard(&fiber()->gc); + xrow_encode_watch_key(row, box_ballot_event_key, IPROTO_WATCH); + coio_write_xrow(io, row); + coio_read_xrow(io, ibuf, row); + if (row->type != IPROTO_EVENT) { + xrow_decode_error_xc(row); + unreachable(); + } + struct watch_request watch_request; + xrow_decode_watch_xc(row, &watch_request); + if (watch_request.data == NULL) + tnt_raise(ClientError, ER_NO_SUCH_EVENT, box_ballot_event_key); + xrow_decode_ballot_event_xc(&watch_request, ballot, is_empty); +} + +/** Get the remote ballot: IPROTO_WATCH if supported, else IPROTO_VOTE. */ +static void +applier_watch_ballot(struct applier *applier) +{ + struct xrow_header row; + struct iostream io; + struct iostream_ctx io_ctx; + struct ibuf ibuf; + iostream_ctx_clear(&io_ctx); + if (iostream_ctx_create(&io_ctx, IOSTREAM_CLIENT, &applier->uri) != 0) + diag_raise(); + iostream_clear(&io); + ibuf_create(&ibuf, &cord()->slabc, 1024); + auto guard = make_scoped_guard([&] { + if (iostream_is_initialized(&io)) + iostream_close(&io); + iostream_ctx_destroy(&io_ctx); + ibuf_destroy(&ibuf); + }); + + struct greeting greeting; + + applier->addr_len = sizeof(applier->addrstorage); + applier_connection_init(&io, &applier->uri, &applier->addr, + &applier->addr_len, &io_ctx, &greeting); + if (!iproto_features_test(&applier->features, + IPROTO_FEATURE_WATCHERS)) { + goto try_vote; + } + try { + while (true) { + bool is_empty; + applier_get_ballot_from_event(&io, &row, &ibuf, + &applier->ballot, + &is_empty); + /* + * Some events received right after master start + * might be empty. Ignore those.
+ */ + if (is_empty) + continue; + applier_run_ballot_triggers(applier, true); + } + } catch (ClientError *e) { + if (e->errcode() != ER_NO_SUCH_EVENT) + throw; + } +try_vote: + applier_get_ballot_from_vote(&io, &row, &ibuf, + &applier->ballot); + applier_run_ballot_triggers(applier, true); +} + +static int +applier_ballot_watcher_f(va_list ap) +{ + (void)ap; + struct applier *applier = (struct applier *)fiber()->f_arg; + while (true) { + try { + applier_watch_ballot(applier); + break; + } catch (TimedOut *) { + fiber_sleep(replication_reconnect_interval()); + diag_clear(diag_get()); + } catch (FiberIsCancelled *) { + diag_clear(diag_get()); + return 0; + } catch (Exception *) { + diag_log(); + applier_run_ballot_triggers(applier, false); + diag_clear(diag_get()); + break; + } + } + applier->ballot_watcher = NULL; + return 0; +} + +/** Trigger data for waiting for the first ballot update. */ +struct applier_ballot_data { + /** Diagnostics set in case of error. */ + struct diag diag; + /** The fiber waiting for the ballot update. */ + struct fiber *fiber; + /** Set once the update attempt is over, be it a success or not. */ + bool done; +}; + +static void +applier_ballot_data_create(struct applier_ballot_data *data) +{ + diag_create(&data->diag); + data->fiber = fiber(); + data->done = false; +} + +static int +applier_on_first_ballot_update_f(struct trigger *trigger, void *event) +{ + struct applier_ballot_data *data = + (struct applier_ballot_data *)trigger->data; + bool success = *(bool *)event; + data->done = true; + if (!success) + diag_move(diag_get(), &data->diag); + fiber_wakeup(data->fiber); + return 0; +} + +/** Wait for the first ballot update; re-raise the watcher's error, if any.
*/ +static void +applier_wait_first_ballot(struct applier *applier) +{ + struct trigger on_ballot_update; + struct applier_ballot_data data; + applier_ballot_data_create(&data); + trigger_create(&on_ballot_update, applier_on_first_ballot_update_f, + &data, NULL); + trigger_add(&applier->on_ballot_update, &on_ballot_update); + while (!data.done && !fiber_is_cancelled()) { + fiber_yield(); + } + trigger_clear(&on_ballot_update); + if (fiber_is_cancelled()) + tnt_raise(FiberIsCancelled); + assert(data.done); + if (!diag_is_empty(&data.diag)) { + diag_move(&data.diag, diag_get()); + diag_raise(); + } +} + /** * Connect to a remote host and authenticate the client. */ @@ -428,6 +622,7 @@ applier_connect(struct applier *applier) method_default = auth_method_by_name( id.auth_type, id.auth_type_len); } + applier->features = id.features; } else { xrow_decode_error(&row); diag_log(); @@ -438,26 +633,12 @@ applier_connect(struct applier *applier) if (method_default == NULL) method_default = AUTH_METHOD_DEFAULT; - /* - * Send an IPROTO_VOTE request to fetch the master's ballot - * before proceeding to "join". It will be used for leader - * election on bootstrap. - */ - xrow_encode_vote(&row); - coio_write_xrow(io, &row); - coio_read_xrow(io, ibuf, &row); - if (row.type == IPROTO_OK) { - xrow_decode_ballot_xc(&row, &applier->ballot); - } else try { - xrow_decode_error_xc(&row); - } catch (ClientError *e) { - if (e->errcode() != ER_UNKNOWN_REQUEST_TYPE) - e->raise(); - /* - * Master isn't aware of IPROTO_VOTE request. - * It's OK - we can proceed without it. - */ - } + applier->ballot_watcher = fiber_new_system("applier_ballot_watcher", + applier_ballot_watcher_f); + applier->ballot_watcher->f_arg = applier; + fiber_wakeup(applier->ballot_watcher); + + applier_wait_first_ballot(applier); applier_set_state(applier, APPLIER_CONNECTED); @@ -2014,12 +2195,30 @@ applier_thread_data_create(struct applier *applier, return 0; } +/** Cancel the ballot watcher fiber, if there is one.
*/ +static void +applier_unwatch_ballot(struct applier *applier) +{ + if (applier->ballot_watcher != NULL) { + assert((applier->ballot_watcher->flags & + FIBER_IS_JOINABLE) == 0); + /* Non-joinable fibers are automatically recycled. */ + fiber_cancel(applier->ballot_watcher); + applier->ballot_watcher = NULL; + } +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ static void applier_subscribe(struct applier *applier) { + /* + * Applier doesn't need ballot updates once it subscribes. They were + * needed only during bootstrap or join. + */ + applier_unwatch_ballot(applier); /* Send SUBSCRIBE request */ struct iostream *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -2168,6 +2367,7 @@ applier_disconnect(struct applier *applier, enum applier_state state) applier_set_state(applier, state); if (iostream_is_initialized(&applier->io)) iostream_close(&applier->io); + applier_unwatch_ballot(applier); /* Clear all unparsed input. */ ibuf_reinit(&applier->ibuf); } @@ -2383,6 +2583,7 @@ applier_new(struct uri *uri) uri_move(&applier->uri, uri); applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); + rlist_create(&applier->on_ballot_update); fiber_cond_create(&applier->resume_cond); diag_create(&applier->diag); @@ -2398,6 +2599,7 @@ applier_delete(struct applier *applier) ibuf_destroy(&applier->ibuf); uri_destroy(&applier->uri); trigger_destroy(&applier->on_state); + trigger_destroy(&applier->on_ballot_update); diag_destroy(&applier->diag); free(applier); } diff --git a/src/box/applier.h b/src/box/applier.h index 30ad83d179..baec9a443a 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -161,8 +161,12 @@ struct applier { struct uri uri; /** Remote version encoded as a number, see version_id() macro */ uint32_t version_id; + /** Remote instance features. */ + struct iproto_features features; /** Remote ballot at the time of connect.
*/ struct ballot ballot; + /** The fiber responsible for ballot updates. */ + struct fiber *ballot_watcher; /** Last requested vclock sync. */ uint64_t last_vclock_sync; /** Remote address */ @@ -180,6 +184,8 @@ struct applier { struct ibuf ibuf; /** Triggers invoked on state change */ struct rlist on_state; + /** Triggers invoked on ballot update. Event is a bool: success. */ + struct rlist on_ballot_update; /** * Set if the applier was paused (see applier_pause()) and is now * waiting on resume_cond to be resumed (see applier_resume()). diff --git a/src/box/errcode.h b/src/box/errcode.h index 74e2cfbb5b..0153b0c673 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -312,6 +312,7 @@ struct errcode_record { /*257 */_(ER_AUTH_DELAY, "Too many authentication attempts") \ /*258 */_(ER_AUTH_REQUIRED, "Authentication required") \ /*259 */_(ER_SQL_SEQ_SCAN, "Scanning is not allowed for %s") \ + /*260 */_(ER_NO_SUCH_EVENT, "Unknown event %s") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/box/xrow.c b/src/box/xrow.c index 289338b20e..9e278cf773 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1481,6 +1481,23 @@ xrow_decode_call(const struct xrow_header *row, struct call_request *request) return 0; } +void +xrow_encode_watch_key(struct xrow_header *row, const char *key, uint16_t type) +{ + memset(row, 0, sizeof(*row)); + size_t size = mp_sizeof_map(1) + + mp_sizeof_uint(IPROTO_EVENT_KEY) + + mp_sizeof_str(strlen(key)); + char *buf = xregion_alloc(&fiber()->gc, size); + row->body[0].iov_base = buf; + buf = mp_encode_map(buf, 1); + buf = mp_encode_uint(buf, IPROTO_EVENT_KEY); + buf = mp_encode_str0(buf, key); + row->body[0].iov_len = buf - (char *)row->body[0].iov_base; + row->bodycnt = 1; + row->type = type; +} + int xrow_decode_watch(const struct xrow_header *row, struct watch_request *request) { @@ -1825,6 +1842,25 @@ mp_decode_ballot(const char *data, const char *end, return 0; } +int +xrow_decode_ballot_event(const struct watch_request *req, + 
struct ballot *ballot, bool *is_empty) +{ + assert(req->data != NULL); + assert(req->data_end > req->data); + /* + * Note that unlike xrow_decode_ballot() we do not nullify the + * ballot here. If some of the fields are omitted in the event, their + * previous values hold. + */ + if (mp_decode_ballot(req->data, req->data_end, ballot, is_empty) < 0) { + diag_set(ClientError, ER_INVALID_MSGPACK, "packet body"); + dump_row_hex(req->data, req->data_end); + return -1; + } + return 0; +} + /** * A template which can represent any replication request - join, register, * subscribe, etc. All fields are optional - when left NULL, they are not diff --git a/src/box/xrow.h b/src/box/xrow.h index f03450fb3e..04d749e664 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -409,6 +409,15 @@ struct watch_request { int xrow_decode_watch(const struct xrow_header *row, struct watch_request *request); +/** + * Encode a WATCH/UNWATCH request. + * @param[out] row Row to encode to. Body is allocated on the fiber region. + * @param key The key to start/stop watching for. + * @param type Request type (WATCH or UNWATCH). + */ +void +xrow_encode_watch_key(struct xrow_header *row, const char *key, uint16_t type); + /** * AUTH request */ @@ -495,6 +504,16 @@ mp_encode_ballot(char *data, const struct ballot *ballot); int xrow_decode_ballot(const struct xrow_header *row, struct ballot *ballot); +/** + * Decode ballot as received in response to an IPROTO_WATCH request. + * @param req A decoded notification. Its data must not be NULL. + * @param[out] ballot Where to store the decoded ballot. + * @param[out] is_empty Whether the ballot is empty. + */ +int +xrow_decode_ballot_event(const struct watch_request *req, + struct ballot *ballot, bool *is_empty); + /** * Encode an instance vote request. * @param row[out] Row to encode into. @@ -1037,6 +1056,15 @@ xrow_decode_auth_xc(const struct xrow_header *row, diag_raise(); } +/** @copydoc xrow_decode_watch.
*/ +static inline void +xrow_decode_watch_xc(const struct xrow_header *row, + struct watch_request *request) +{ + if (xrow_decode_watch(row, request) != 0) + diag_raise(); +} + /** @copydoc xrow_decode_ballot. */ static inline void xrow_decode_ballot_xc(const struct xrow_header *row, struct ballot *ballot) @@ -1045,6 +1073,15 @@ xrow_decode_ballot_xc(const struct xrow_header *row, struct ballot *ballot) diag_raise(); } +/** @copydoc xrow_decode_ballot_event. */ +static inline void +xrow_decode_ballot_event_xc(const struct watch_request *req, + struct ballot *ballot, bool *is_empty) +{ + if (xrow_decode_ballot_event(req, ballot, is_empty) != 0) + diag_raise(); +} + /** @copydoc xrow_decode_subscribe. */ static inline void xrow_decode_subscribe_xc(const struct xrow_header *row, diff --git a/test/box/error.result b/test/box/error.result index 7c9b9c94bd..edb7fa0f30 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -478,6 +478,7 @@ t; | 257: box.error.AUTH_DELAY | 258: box.error.AUTH_REQUIRED | 259: box.error.SQL_SEQ_SCAN + | 260: box.error.NO_SUCH_EVENT | ... test_run:cmd("setopt delimiter ''"); -- GitLab