diff --git a/src/box/applier.cc b/src/box/applier.cc index 6d77d72f24517dd43c108d05c8c14c99f2b0a370..18780dd2b36940c07e55b9b27c465c6e2e8e0422 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -369,6 +369,200 @@ applier_connection_init(struct iostream *io, const struct uri *uri, } } +static void +applier_run_ballot_triggers(struct applier *applier, bool success) +{ + trigger_run(&applier->on_ballot_update, &success); +} + +/** + * Perform an IPROTO_VOTE exchange: send a request and decode the response. + * Fall back to this method if master isn't aware of "internal.ballot" event. + */ +static void +applier_get_ballot_from_vote(struct iostream *io, struct xrow_header *row, + struct ibuf *ibuf, struct ballot *ballot) +{ + RegionGuard guard(&fiber()->gc); + xrow_encode_vote(row); + coio_write_xrow(io, row); + coio_read_xrow(io, ibuf, row); + if (row->type == IPROTO_OK) { + xrow_decode_ballot_xc(row, ballot); + return; + } + xrow_decode_error(row); + struct diag *diag = diag_get(); + struct error *e = diag_last_error(diag); + /* + * Master isn't aware of IPROTO_VOTE request. + * It's OK - we can proceed without it. + */ + if (box_error_code(e) == ER_UNKNOWN_REQUEST_TYPE) { + diag_clear(diag); + return; + } + error_raise(e); +} + +/** + * Perform one round of IPROTO_WATCH("internal.ballot") exchange: send the + * request / acknowledge that the previous notification is accepted, wait for + * the response. 
+ */ +static void +applier_get_ballot_from_event(struct iostream *io, struct xrow_header *row, + struct ibuf *ibuf, struct ballot *ballot, + bool *is_empty) +{ + RegionGuard guard(&fiber()->gc); + xrow_encode_watch_key(row, box_ballot_event_key, IPROTO_WATCH); + coio_write_xrow(io, row); + coio_read_xrow(io, ibuf, row); + if (row->type != IPROTO_EVENT) { + xrow_decode_error_xc(row); + unreachable(); + } + struct watch_request watch_request; + xrow_decode_watch_xc(row, &watch_request); + if (watch_request.data == NULL) + tnt_raise(ClientError, ER_NO_SUCH_EVENT, box_ballot_event_key); + xrow_decode_ballot_event_xc(&watch_request, ballot, is_empty); +} + +/** Get applier ballot via the newest of the available methods. */ +static void +applier_watch_ballot(struct applier *applier) +{ + struct xrow_header row; + struct iostream io; + struct iostream_ctx io_ctx; + struct ibuf ibuf; + iostream_ctx_clear(&io_ctx); + if (iostream_ctx_create(&io_ctx, IOSTREAM_CLIENT, &applier->uri) != 0) + diag_raise(); + iostream_clear(&io); + ibuf_create(&ibuf, &cord()->slabc, 1024); + auto guard = make_scoped_guard([&] { + if (iostream_is_initialized(&io)) + iostream_close(&io); + iostream_ctx_destroy(&io_ctx); + ibuf_destroy(&ibuf); + }); + + struct greeting greeting; + + applier->addr_len = sizeof(applier->addrstorage); + applier_connection_init(&io, &applier->uri, &applier->addr, + &applier->addr_len, &io_ctx, &greeting); + if (!iproto_features_test(&applier->features, + IPROTO_FEATURE_WATCHERS)) { + goto try_vote; + } + try { + while (true) { + bool is_empty; + applier_get_ballot_from_event(&io, &row, &ibuf, + &applier->ballot, + &is_empty); + /* + * Some events received right after master start + * might be empty. Ignore those. 
+ */ + if (is_empty) + continue; + applier_run_ballot_triggers(applier, true); + } + } catch (ClientError *e) { + if (e->errcode() != ER_NO_SUCH_EVENT) + throw; + } +try_vote: + applier_get_ballot_from_vote(&io, &row, &ibuf, + &applier->ballot); + applier_run_ballot_triggers(applier, true); +} + +static int +applier_ballot_watcher_f(va_list ap) +{ + (void)ap; + struct applier *applier = (struct applier *)fiber()->f_arg; + while (true) { + try { + applier_watch_ballot(applier); + break; + } catch (TimedOut *) { + fiber_sleep(replication_reconnect_interval()); + diag_clear(diag_get()); + } catch (FiberIsCancelled *) { + diag_clear(diag_get()); + return 0; + } catch (Exception *) { + diag_log(); + applier_run_ballot_triggers(applier, false); + diag_clear(diag_get()); + break; + } + } + applier->ballot_watcher = NULL; + return 0; +} + +/** Trigger data for waiting for the first ballot update. */ +struct applier_ballot_data { + /** Diagnostics set in case of error. */ + struct diag diag; + /** The fiber waiting for the ballot update. */ + struct fiber *fiber; + /** Set once the trigger has fired, on success or failure. */ + bool done; +}; + +static void +applier_ballot_data_create(struct applier_ballot_data *data) +{ + diag_create(&data->diag); + data->fiber = fiber(); + data->done = false; +} + +static int +applier_on_first_ballot_update_f(struct trigger *trigger, void *event) +{ + struct applier_ballot_data *data = + (struct applier_ballot_data *)trigger->data; + bool success = *(bool *)event; + data->done = true; + if (!success) + diag_move(diag_get(), &data->diag); + fiber_wakeup(data->fiber); + return 0; +} + +/** Wait for the first ballot update; raise on failure or cancellation. 
*/ +static void +applier_wait_first_ballot(struct applier *applier) +{ + struct trigger on_ballot_update; + struct applier_ballot_data data; + applier_ballot_data_create(&data); + trigger_create(&on_ballot_update, applier_on_first_ballot_update_f, + &data, NULL); + trigger_add(&applier->on_ballot_update, &on_ballot_update); + while (!data.done && !fiber_is_cancelled()) { + fiber_yield(); + } + trigger_clear(&on_ballot_update); + if (fiber_is_cancelled()) + tnt_raise(FiberIsCancelled); + assert(data.done); + if (!diag_is_empty(&data.diag)) { + diag_move(&data.diag, diag_get()); + diag_raise(); + } +} + /** * Connect to a remote host and authenticate the client. */ @@ -428,6 +622,7 @@ applier_connect(struct applier *applier) method_default = auth_method_by_name( id.auth_type, id.auth_type_len); } + applier->features = id.features; } else { xrow_decode_error(&row); diag_log(); @@ -438,26 +633,12 @@ applier_connect(struct applier *applier) if (method_default == NULL) method_default = AUTH_METHOD_DEFAULT; - /* - * Send an IPROTO_VOTE request to fetch the master's ballot - * before proceeding to "join". It will be used for leader - * election on bootstrap. - */ - xrow_encode_vote(&row); - coio_write_xrow(io, &row); - coio_read_xrow(io, ibuf, &row); - if (row.type == IPROTO_OK) { - xrow_decode_ballot_xc(&row, &applier->ballot); - } else try { - xrow_decode_error_xc(&row); - } catch (ClientError *e) { - if (e->errcode() != ER_UNKNOWN_REQUEST_TYPE) - e->raise(); - /* - * Master isn't aware of IPROTO_VOTE request. - * It's OK - we can proceed without it. - */ - } + applier->ballot_watcher = fiber_new_system("applier_ballot_watcher", + applier_ballot_watcher_f); + applier->ballot_watcher->f_arg = applier; + fiber_wakeup(applier->ballot_watcher); + + applier_wait_first_ballot(applier); applier_set_state(applier, APPLIER_CONNECTED); @@ -2014,12 +2195,30 @@ applier_thread_data_create(struct applier *applier, return 0; } +/** Cancel the ballot watcher fiber, if any, and reset the pointer to it. 
*/ +static void +applier_unwatch_ballot(struct applier *applier) +{ + if (applier->ballot_watcher != NULL) { + assert((applier->ballot_watcher->flags & + FIBER_IS_JOINABLE) == 0); + /* Non-joinable fibers are automatically recycled. */ + fiber_cancel(applier->ballot_watcher); + applier->ballot_watcher = NULL; + } +} + /** * Execute and process SUBSCRIBE request (follow updates from a master). */ static void applier_subscribe(struct applier *applier) { + /* + * Applier doesn't need ballot updates once it subscribes. They were + * needed only during bootstrap or join. + */ + applier_unwatch_ballot(applier); /* Send SUBSCRIBE request */ struct iostream *io = &applier->io; struct ibuf *ibuf = &applier->ibuf; @@ -2168,6 +2367,7 @@ applier_disconnect(struct applier *applier, enum applier_state state) applier_set_state(applier, state); if (iostream_is_initialized(&applier->io)) iostream_close(&applier->io); + applier_unwatch_ballot(applier); /* Clear all unparsed input. */ ibuf_reinit(&applier->ibuf); } @@ -2383,6 +2583,7 @@ applier_new(struct uri *uri) uri_move(&applier->uri, uri); applier->last_row_time = ev_monotonic_now(loop()); rlist_create(&applier->on_state); + rlist_create(&applier->on_ballot_update); fiber_cond_create(&applier->resume_cond); diag_create(&applier->diag); @@ -2398,6 +2599,7 @@ applier_delete(struct applier *applier) ibuf_destroy(&applier->ibuf); uri_destroy(&applier->uri); trigger_destroy(&applier->on_state); + trigger_destroy(&applier->on_ballot_update); diag_destroy(&applier->diag); free(applier); } diff --git a/src/box/applier.h b/src/box/applier.h index 30ad83d179a19c0ae46c17b78422b08c8adc37e7..baec9a443a2d7b39ec8d8b0f31e58b7e23d5f3b0 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -161,8 +161,12 @@ struct applier { struct uri uri; /** Remote version encoded as a number, see version_id() macro */ uint32_t version_id; + /** Remote instance features. */ + struct iproto_features features; /** Remote ballot at the time of connect. 
*/ struct ballot ballot; + /** The fiber responsible for ballot updates. */ + struct fiber *ballot_watcher; /** Last requested vclock sync. */ uint64_t last_vclock_sync; /** Remote address */ @@ -180,6 +184,8 @@ struct applier { struct ibuf ibuf; /** Triggers invoked on state change */ struct rlist on_state; + /** Triggers invoked on ballot update. */ + struct rlist on_ballot_update; /** * Set if the applier was paused (see applier_pause()) and is now * waiting on resume_cond to be resumed (see applier_resume()). diff --git a/src/box/errcode.h b/src/box/errcode.h index 74e2cfbb5bd24dfa2341824ef7f0f089fb9bb43f..0153b0c673b14d62f8dcb132140f826d6a657686 100644 --- a/src/box/errcode.h +++ b/src/box/errcode.h @@ -312,6 +312,7 @@ struct errcode_record { /*257 */_(ER_AUTH_DELAY, "Too many authentication attempts") \ /*258 */_(ER_AUTH_REQUIRED, "Authentication required") \ /*259 */_(ER_SQL_SEQ_SCAN, "Scanning is not allowed for %s") \ + /*260 */_(ER_NO_SUCH_EVENT, "Unknown event %s") \ /* * !IMPORTANT! 
Please follow instructions at start of the file diff --git a/src/box/xrow.c b/src/box/xrow.c index 289338b20e3168c35c05d29954338fe3ed62ff58..9e278cf773d473a4f6d9f6242ecab3da55e140d8 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1481,6 +1481,23 @@ xrow_decode_call(const struct xrow_header *row, struct call_request *request) return 0; } +void +xrow_encode_watch_key(struct xrow_header *row, const char *key, uint16_t type) +{ + memset(row, 0, sizeof(*row)); + size_t size = mp_sizeof_map(1) + + mp_sizeof_uint(IPROTO_EVENT_KEY) + + mp_sizeof_str(strlen(key)); + char *buf = xregion_alloc(&fiber()->gc, size); + row->body[0].iov_base = buf; + buf = mp_encode_map(buf, 1); + buf = mp_encode_uint(buf, IPROTO_EVENT_KEY); + buf = mp_encode_str0(buf, key); + row->body[0].iov_len = buf - (char *)row->body[0].iov_base; + row->bodycnt = 1; + row->type = type; +} + int xrow_decode_watch(const struct xrow_header *row, struct watch_request *request) { @@ -1825,6 +1842,25 @@ mp_decode_ballot(const char *data, const char *end, return 0; } +int +xrow_decode_ballot_event(const struct watch_request *req, + struct ballot *ballot, bool *is_empty) +{ + assert(req->data != NULL); + assert(req->data_end > req->data); + /* + * Note that, contrary to xrow_decode_ballot(), we do not nullify the + * ballot here. If some of the fields are omitted in the event, their + * previous values hold. + */ + if (mp_decode_ballot(req->data, req->data_end, ballot, is_empty) < 0) { + diag_set(ClientError, ER_INVALID_MSGPACK, "packet body"); + dump_row_hex(req->data, req->data_end); + return -1; + } + return 0; +} + /** * A template which can represent any replication request - join, register, * subscribe, etc. 
All fields are optional - when left NULL, they are not diff --git a/src/box/xrow.h b/src/box/xrow.h index f03450fb3ed08c80cd24dbfb1ac64d8ac5abf45c..04d749e664f32175a02f7a94fa09d6d4a21025fa 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -409,6 +409,15 @@ struct watch_request { int xrow_decode_watch(const struct xrow_header *row, struct watch_request *request); +/** + * Encode a WATCH/UNWATCH request. + * @param[out] row Row to encode to. + * @param key The key to start/stop watching for. + * @param type Request type (WATCH or UNWATCH). + */ +void +xrow_encode_watch_key(struct xrow_header *row, const char *key, uint16_t type); + /** * AUTH request */ @@ -495,6 +504,16 @@ mp_encode_ballot(char *data, const struct ballot *ballot); int xrow_decode_ballot(const struct xrow_header *row, struct ballot *ballot); +/** + * Decode ballot as received in response to an IPROTO_WATCH request. + * @param req A decoded notification. + * @param[out] ballot Where to store the decoded ballot. + * @param[out] is_empty Whether the ballot is empty. + */ +int +xrow_decode_ballot_event(const struct watch_request *req, + struct ballot *ballot, bool *is_empty); + /** * Encode an instance vote request. * @param row[out] Row to encode into. @@ -1037,6 +1056,15 @@ xrow_decode_auth_xc(const struct xrow_header *row, diag_raise(); } +/** @copydoc xrow_decode_watch. */ +static inline void +xrow_decode_watch_xc(const struct xrow_header *row, + struct watch_request *request) +{ + if (xrow_decode_watch(row, request) != 0) + diag_raise(); +} + /** @copydoc xrow_decode_ballot. */ static inline void xrow_decode_ballot_xc(const struct xrow_header *row, struct ballot *ballot) @@ -1045,6 +1073,15 @@ xrow_decode_ballot_xc(const struct xrow_header *row, struct ballot *ballot) diag_raise(); } +/** @copydoc xrow_decode_ballot_event. 
*/ +static inline void +xrow_decode_ballot_event_xc(const struct watch_request *req, + struct ballot *ballot, bool *is_empty) +{ + if (xrow_decode_ballot_event(req, ballot, is_empty) != 0) + diag_raise(); +} + /** @copydoc xrow_decode_subscribe. */ static inline void xrow_decode_subscribe_xc(const struct xrow_header *row, diff --git a/test/box/error.result b/test/box/error.result index 7c9b9c94bd25e75a01e8c2971ef467c72917995d..edb7fa0f305240bebc840eae36bfa092dfe4e170 100644 --- a/test/box/error.result +++ b/test/box/error.result @@ -478,6 +478,7 @@ t; | 257: box.error.AUTH_DELAY | 258: box.error.AUTH_REQUIRED | 259: box.error.SQL_SEQ_SCAN + | 260: box.error.NO_SUCH_EVENT | ... test_run:cmd("setopt delimiter ''");