diff --git a/src/box/applier.cc b/src/box/applier.cc index 556502bfee143b8a965ef15cefdd7e14e30fbe30..02486acaeb5decac91b2d25f265e5fbc9ad79991 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -218,14 +218,12 @@ applier_connect(struct applier *applier) * It will be used for leader election on bootstrap. */ if (applier->version_id >= version_id(1, 7, 7)) { - xrow_encode_request_vote(&row); + xrow_encode_vote(&row); coio_write_xrow(coio, &row); coio_read_xrow(coio, ibuf, &row); if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); - vclock_create(&applier->vclock); - xrow_decode_request_vote_xc(&row, &applier->vclock, - &applier->remote_is_ro); + xrow_decode_ballot_xc(&row, &applier->ballot); } applier_set_state(applier, APPLIER_CONNECTED); diff --git a/src/box/applier.h b/src/box/applier.h index c33562cc84acbc2969d0bb86ef8164eeeeafd9f4..fbf1685e78ae0d7b8adab5d82abc3eb2dc6795eb 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -43,7 +43,7 @@ #include "tt_uuid.h" #include "uri.h" -#include "vclock.h" +#include "xrow.h" struct xstream; @@ -94,10 +94,8 @@ struct applier { struct uri uri; /** Remote version encoded as a number, see version_id() macro */ uint32_t version_id; - /** Remote vclock at time of connect. */ - struct vclock vclock; - /** Remote peer mode, true if read-only, default: false */ - bool remote_is_ro; + /** Remote ballot (read-only flag and vclock) at time of connect. 
*/ + struct ballot ballot; /** Remote address */ union { struct sockaddr addr; diff --git a/src/box/box.cc b/src/box/box.cc index b07eefa86cf728c0da43fa8dac921500ba7b9412..a3348fef5202d5f0cc481b9427f435268408e751 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1564,6 +1564,13 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) replica_version_id); } +void +box_process_vote(struct ballot *ballot) +{ + ballot->is_ro = cfg_geti("read_only") != 0; + vclock_copy(&ballot->vclock, &replicaset.vclock); +} + /** Insert a new cluster into _schema */ static void box_set_replicaset_uuid(const struct tt_uuid *replicaset_uuid) diff --git a/src/box/box.h b/src/box/box.h index 182e1b720166599ce265b52222ac0109b7080faa..e2e06d9776c8fa10aef699c707bd35fe90f43dc3 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -163,6 +163,9 @@ box_process_join(struct ev_io *io, struct xrow_header *header); void box_process_subscribe(struct ev_io *io, struct xrow_header *header); +void +box_process_vote(struct ballot *ballot); + /** * Check Lua configuration before initialization or * in case of a configuration change. diff --git a/src/box/iproto.cc b/src/box/iproto.cc index cba81a227fc0ac15cd2e1fa3fa6469fdcaadd833..04b66ee14f5579a8579a1e6cc6a7a112fe840be7 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -1158,7 +1158,8 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, cmsg_init(&msg->base, subscribe_route); *stop_input = true; break; - case IPROTO_REQUEST_VOTE: + case IPROTO_REQUEST_VOTE: /* deprecated. 
*/ + case IPROTO_VOTE: cmsg_init(&msg->base, misc_route); break; case IPROTO_AUTH: @@ -1526,6 +1527,7 @@ tx_process_misc(struct cmsg *m) goto error; try { + struct ballot ballot; switch (msg->header.type) { case IPROTO_AUTH: box_process_auth(&msg->auth, con->salt); @@ -1542,6 +1544,11 @@ tx_process_misc(struct cmsg *m) &replicaset.vclock, cfg_geti("read_only")); break; + case IPROTO_VOTE: + box_process_vote(&ballot); + iproto_reply_vote_xc(out, &ballot, msg->header.sync, + ::schema_version); + break; default: unreachable(); } diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 3bc965bde308610a9e2625864b130eeaf27c69f9..3cd91cc4d1e9c4786baec531ac0ba8a3d4476838 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -87,6 +87,7 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x27 */ MP_STR, /* IPROTO_EXPR */ /* 0x28 */ MP_ARRAY, /* IPROTO_OPS */ /* 0x29 */ MP_BOOL, /* IPROTO_SERVER_IS_RO */ + /* 0x2a */ MP_MAP, /* IPROTO_BALLOT */ /* }}} */ }; @@ -168,7 +169,7 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "expression", /* 0x27 */ "operations", /* 0x28 */ "server is ro", /* 0x29 */ - NULL, /* 0x2a */ + "status", /* 0x2a */ NULL, /* 0x2b */ NULL, /* 0x2c */ NULL, /* 0x2d */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index ccbf2da53c62e41d510994a941306425be2dfe59..a4f0f3b01e565611dce04521176bda751615f0cc 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -78,12 +78,18 @@ enum iproto_key { IPROTO_EXPR = 0x27, /* EVAL */ IPROTO_OPS = 0x28, /* UPSERT but not UPDATE ops, because of legacy */ IPROTO_SERVER_IS_RO = 0x29, + IPROTO_BALLOT = 0x2a, /* Leave a gap between request keys and response keys */ IPROTO_DATA = 0x30, IPROTO_ERROR = 0x31, IPROTO_KEY_MAX }; +enum iproto_ballot_key { + IPROTO_BALLOT_IS_RO = 0x01, + IPROTO_BALLOT_VCLOCK = 0x02, +}; + #define bit(c) (1ULL<<IPROTO_##c) #define IPROTO_HEAD_BMAP (bit(REQUEST_TYPE) | bit(SYNC) | bit(REPLICA_ID) |\ @@ -155,8 
+161,13 @@ enum iproto_type { IPROTO_JOIN = 65, /** Replication SUBSCRIBE command */ IPROTO_SUBSCRIBE = 66, - /** Vote request command for master election */ + /** + * Vote request command for master election + * DEPRECATED: use IPROTO_VOTE instead + */ IPROTO_REQUEST_VOTE = 67, + /** Instance ballot request command, replaces IPROTO_REQUEST_VOTE */ + IPROTO_VOTE = 68, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/replication.cc b/src/box/replication.cc index c1e176984d15c7a3f930fa3b9218e474b1fb99cf..c4d6e6f2690072fc725489731fa9441237c32161 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -732,7 +732,8 @@ replicaset_round(bool skip_ro) { struct replica *leader = NULL; replicaset_foreach(replica) { - if (replica->applier == NULL) + struct applier *applier = replica->applier; + if (applier == NULL) continue; /** * While bootstrapping a new cluster, read-only @@ -741,7 +742,7 @@ replicaset_round(bool skip_ro) * replicas since there is still a possibility * that all replicas exist in cluster table. */ - if (skip_ro && replica->applier->remote_is_ro) + if (skip_ro && applier->ballot.is_ro) continue; if (leader == NULL) { leader = replica; @@ -753,8 +754,8 @@ replicaset_round(bool skip_ro) * with the same vclock, prefer the one with * the lowest uuid. 
*/ - int cmp = vclock_compare(&replica->applier->vclock, - &leader->applier->vclock); + int cmp = vclock_compare(&applier->ballot.vclock, + &leader->applier->ballot.vclock); if (cmp < 0) continue; if (cmp == 0 && tt_uuid_compare(&replica->uuid, diff --git a/src/box/xrow.c b/src/box/xrow.c index 56197d0e4b27622dc365dfe46ad23e4b389ee972..499379bcf7f98e0d8b0f0976bf1cdde282608536 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -343,6 +343,42 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync, return 0; } +int +iproto_reply_vote(struct obuf *out, const struct ballot *ballot, + uint64_t sync, uint32_t schema_version) +{ + size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(2) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(ballot->is_ro) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_vclock(&ballot->vclock); + + char *buf = obuf_reserve(out, max_size); + if (buf == NULL) { + diag_set(OutOfMemory, max_size, + "obuf_alloc", "buf"); + return -1; + } + + char *data = buf + IPROTO_HEADER_LEN; + data = mp_encode_map(data, 1); + data = mp_encode_uint(data, IPROTO_BALLOT); + data = mp_encode_map(data, 2); + data = mp_encode_uint(data, IPROTO_BALLOT_IS_RO); + data = mp_encode_bool(data, ballot->is_ro); + data = mp_encode_uint(data, IPROTO_BALLOT_VCLOCK); + data = mp_encode_vclock(data, &ballot->vclock); + size_t size = data - buf; + assert(size <= max_size); + + iproto_header_encode(buf, IPROTO_OK, sync, schema_version, + size - IPROTO_HEADER_LEN); + + char *ptr = obuf_alloc(out, size); + (void) ptr; /* read only by the assert below, unused under NDEBUG */ + assert(ptr == buf); + return 0; +} + int iproto_reply_error(struct obuf *out, const struct error *e, uint64_t sync, uint32_t schema_version) @@ -847,10 +883,76 @@ xrow_decode_error(struct xrow_header *row) } void -xrow_encode_request_vote(struct xrow_header *row) +xrow_encode_vote(struct xrow_header *row) { memset(row, 0, sizeof(*row)); - row->type = IPROTO_REQUEST_VOTE; + row->type = IPROTO_VOTE; +} + +int +xrow_decode_ballot(struct 
xrow_header *row, struct ballot *ballot) +{ + /* Defaults reported when the reply carries no ballot. */ + ballot->is_ro = false; + vclock_create(&ballot->vclock); + + if (row->bodycnt == 0) + goto err; + assert(row->bodycnt == 1); + + const char *data = (const char *) row->body[0].iov_base; + const char *end = data + row->body[0].iov_len; + const char *tmp = data; + if (mp_check(&tmp, end) != 0 || mp_typeof(*data) != MP_MAP) + goto err; + + /* Find BALLOT key. */ + bool found = false; + uint32_t map_size = mp_decode_map(&data); + for (uint32_t i = 0; i < map_size; i++) { + if (mp_typeof(*data) != MP_UINT) { + mp_next(&data); /* key */ + mp_next(&data); /* value */ + continue; + } + if (mp_decode_uint(&data) == IPROTO_BALLOT) { + found = true; + break; + } + mp_next(&data); /* skip the value of an unrelated key */ + } + if (!found) + return 0; + if (mp_typeof(*data) != MP_MAP) + goto err; /* the ballot itself must be a map */ + + /* Decode BALLOT map. */ + map_size = mp_decode_map(&data); + for (uint32_t i = 0; i < map_size; i++) { + if (mp_typeof(*data) != MP_UINT) { + mp_next(&data); /* key */ + mp_next(&data); /* value */ + continue; + } + uint64_t key = mp_decode_uint(&data); /* full width: no aliasing of large keys */ + switch (key) { + case IPROTO_BALLOT_IS_RO: + if (mp_typeof(*data) != MP_BOOL) + goto err; + ballot->is_ro = mp_decode_bool(&data); + break; + case IPROTO_BALLOT_VCLOCK: + if (mp_decode_vclock(&data, &ballot->vclock) != 0) + goto err; + break; + default: + mp_next(&data); /* unknown ballot field: skip its value */ + } + } + return 0; +err: + diag_set(ClientError, ER_INVALID_MSGPACK, "packet body"); + return -1; } int diff --git a/src/box/xrow.h b/src/box/xrow.h index 92ea3c97fdd51a77752bef565704c24a08c1c31d..4653aff2df070de4d8104ddede0814205b48c2d5 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -30,19 +30,19 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. 
*/ +#include <stdbool.h> #include <stdint.h> #include <stddef.h> #include <sys/uio.h> /* struct iovec */ #include "tt_uuid.h" #include "diag.h" +#include "vclock.h" #if defined(__cplusplus) extern "C" { #endif -struct vclock; - enum { XROW_HEADER_IOVMAX = 1, XROW_BODY_IOVMAX = 2, @@ -223,12 +223,28 @@ xrow_encode_auth(struct xrow_header *row, const char *salt, size_t salt_len, const char *login, size_t login_len, const char *password, size_t password_len); +/** Instance status sent in reply to IPROTO_VOTE. */ +struct ballot { + /** Set if the instance is running in read-only mode. */ + bool is_ro; + /** Current instance vclock. */ + struct vclock vclock; +}; + /** - * Encode a vote request for master election. + * Decode ballot response to IPROTO_VOTE from MessagePack. + * @param row Row to decode. + * @param[out] ballot Where to store the decoded ballot. + */ +int +xrow_decode_ballot(struct xrow_header *row, struct ballot *ballot); + +/** + * Encode an instance vote request. * @param row[out] Row to encode into. */ void -xrow_encode_request_vote(struct xrow_header *row); +xrow_encode_vote(struct xrow_header *row); /** * Encode SUBSCRIBE command. @@ -314,22 +330,6 @@ xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); } -/** - * Decode peer vclock and access rights (a response to VOTE command). - * @param row Row to decode. - * @param[out] vclock. - * @param[out] read_only. - * - * @retval 0 Success. - * @retval -1 Memory or format error. - */ -static inline int -xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock, - bool *read_only) -{ - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only); -} - /** * Encode a heartbeat message. * @param row[out] Row to encode into. @@ -388,7 +388,7 @@ int iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version); /** - * Encode iproto header with IPROTO_OK response code + * DEPRECATED. Encode iproto header with IPROTO_OK response code 
* and vclock in the body. * @param out Encode to. * @param sync Request sync. @@ -404,6 +404,20 @@ iproto_reply_request_vote(struct obuf *out, uint64_t sync, uint32_t schema_version, const struct vclock *vclock, bool read_only); +/** + * Encode a reply to an IPROTO_VOTE request. + * @param out Buffer to write to. + * @param ballot Instance ballot to encode. + * @param sync Request sync. + * @param schema_version Actual schema version. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +iproto_reply_vote(struct obuf *out, const struct ballot *ballot, + uint64_t sync, uint32_t schema_version); + /** * Write an error packet int output buffer. Doesn't throw if out * of memory @@ -585,6 +599,14 @@ xrow_encode_auth_xc(struct xrow_header *row, const char *salt, size_t salt_len, diag_raise(); } +/** @copydoc xrow_decode_ballot. */ +static inline void +xrow_decode_ballot_xc(struct xrow_header *row, struct ballot *ballot) +{ + if (xrow_decode_ballot(row, ballot) != 0) + diag_raise(); +} + /** @copydoc xrow_encode_subscribe. */ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, @@ -642,15 +664,6 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock) diag_raise(); } -/** @copydoc xrow_decode_request_vote. */ -static inline void -xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock, - bool *read_only) -{ - if (xrow_decode_request_vote(row, vclock, read_only) != 0) - diag_raise(); -} - /** @copydoc iproto_reply_ok. */ static inline void iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) @@ -659,7 +672,7 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) diag_raise(); } -/** @copydoc iproto_reply_request_vote_xc. */ +/** @copydoc iproto_reply_request_vote. 
*/ static inline void iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync, uint32_t schema_version, @@ -670,6 +683,15 @@ iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync, diag_raise(); } +/** @copydoc iproto_reply_vote. */ +static inline void +iproto_reply_vote_xc(struct obuf *out, const struct ballot *status, + uint64_t sync, uint32_t schema_version) +{ + if (iproto_reply_vote(out, status, sync, schema_version) != 0) + diag_raise(); +} + #endif #endif /* TARANTOOL_XROW_H_INCLUDED */