diff --git a/src/box/applier.cc b/src/box/applier.cc index 9aa951c3401cc619266017cdf253e88719ccbd5a..127a8c9050922961726bd92d44976d50d15b4e81 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -223,7 +223,8 @@ applier_connect(struct applier *applier) if (row.type != IPROTO_OK) xrow_decode_error_xc(&row); vclock_create(&applier->vclock); - xrow_decode_vclock_xc(&row, &applier->vclock); + xrow_decode_request_vote_xc(&row, &applier->vclock, + &applier->remote_is_ro); } applier_set_state(applier, APPLIER_CONNECTED); diff --git a/src/box/applier.h b/src/box/applier.h index f25d6cb264cf2f60dc1d72610531c5f54d9a1788..392113e2fe56b6f4cdd4b58f4bc9f46becc2b6dd 100644 --- a/src/box/applier.h +++ b/src/box/applier.h @@ -95,6 +95,8 @@ struct applier { uint32_t version_id; /** Remote vclock at time of connect. */ struct vclock vclock; + /** Remote peer mode, true if read-only, default: false */ + bool remote_is_ro; /** Remote address */ union { struct sockaddr addr; diff --git a/src/box/iproto.cc b/src/box/iproto.cc index db5820806e39dcbbe9f49b3059236030148c230c..81938ce0bff83a61c5f93def361215828540bf3a 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -60,6 +60,8 @@ #include "iproto_constants.h" #include "rmean.h" #include "errinj.h" +#include "applier.h" +#include "cfg.h" /* The number of iproto messages in flight */ enum { IPROTO_MSG_MAX = 768 }; @@ -1363,9 +1365,10 @@ tx_process_misc(struct cmsg *m) ::schema_version); break; case IPROTO_REQUEST_VOTE: - iproto_reply_vclock_xc(out, msg->header.sync, - ::schema_version, - &replicaset.vclock); + iproto_reply_request_vote_xc(out, msg->header.sync, + ::schema_version, + &replicaset.vclock, + cfg_geti("read_only")); break; default: unreachable(); diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index cd7b1d03bc6e5504b21ae96c6c5863a09352d05b..3735a91a53e020479cbf71b9a9b9b908fb0904b7 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -40,10 +40,10 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x04 */ MP_DOUBLE, /* IPROTO_TIMESTAMP */ /* 0x05 */ MP_UINT, /* IPROTO_SCHEMA_VERSION */ /* 0x06 */ MP_UINT, /* IPROTO_SERVER_VERSION */ + /* 0x07 */ MP_UINT, /* IPROTO_SERVER_IS_RO */ /* }}} */ /* {{{ unused */ - /* 0x07 */ MP_UINT, /* 0x08 */ MP_UINT, /* 0x09 */ MP_UINT, /* 0x0a */ MP_UINT, diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 9518424851e7a6aabb1e2156d17aafbf8de3b0e0..358122287acf9e04ef6b9211f3215127eefb55bb 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -58,6 +58,7 @@ enum iproto_key { IPROTO_TIMESTAMP = 0x04, IPROTO_SCHEMA_VERSION = 0x05, IPROTO_SERVER_VERSION = 0x06, + IPROTO_SERVER_IS_RO = 0x07, /* Leave a gap for other keys in the header. */ IPROTO_SPACE_ID = 0x10, IPROTO_INDEX_ID = 0x11, diff --git a/src/box/replication.cc b/src/box/replication.cc index 760f8375198a9e26b3af53fa5cb68c75cffa99d2..c704f4ee265b8c08a38e1b8deae9bbe940714a1c 100644 --- a/src/box/replication.cc +++ b/src/box/replication.cc @@ -689,6 +689,14 @@ replicaset_leader(void) replicaset_foreach(replica) { if (replica->applier == NULL) continue; + /** + * While bootstrapping a new cluster, + * read-only replicas shouldn't be considered + * as a leader. + */ + if (replica->applier->remote_is_ro && + replica->applier->vclock.signature == 0) + continue; if (leader == NULL) { leader = replica; continue; diff --git a/src/box/xrow.c b/src/box/xrow.c index f485256450662f4cf1936519a3b629900312ad58..b3f81a86f7f69dd90bd5a4f4eda876339d521048 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -261,14 +261,16 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version) } int -iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version, - const struct vclock *vclock) +iproto_reply_request_vote(struct obuf *out, uint64_t sync, + uint32_t schema_version, const struct vclock *vclock, + bool read_only) { uint32_t replicaset_size = vclock_size(vclock); - size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(1) + + size_t max_size = IPROTO_HEADER_LEN + mp_sizeof_map(2) + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_map(replicaset_size) + replicaset_size * (mp_sizeof_uint(UINT32_MAX) + - mp_sizeof_uint(UINT64_MAX)); + mp_sizeof_uint(UINT64_MAX)) + + mp_sizeof_uint(UINT32_MAX) + mp_sizeof_bool(true); char *buf = obuf_reserve(out, max_size); if (buf == NULL) { @@ -278,7 +280,9 @@ iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version, } char *data = buf + IPROTO_HEADER_LEN; - data = mp_encode_map(data, 1); + data = mp_encode_map(data, 2); + data = mp_encode_uint(data, IPROTO_SERVER_IS_RO); + data = mp_encode_bool(data, read_only); data = mp_encode_uint(data, IPROTO_VCLOCK); data = mp_encode_map(data, replicaset_size); struct vclock_iterator it; @@ -837,7 +841,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id) + uint32_t *version_id, bool *read_only) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -852,6 +856,9 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, return -1; } + /* For backward compatibility initialize read-only with false. */ + if (read_only) + *read_only = false; const char *lsnmap = NULL; d = data; uint32_t map_size = mp_decode_map(&d); @@ -896,6 +903,16 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *version_id = mp_decode_uint(&d); break; + case IPROTO_SERVER_IS_RO: + if (read_only == NULL) + goto skip; + if (mp_typeof(*d) != MP_BOOL) { + diag_set(ClientError, ER_INVALID_MSGPACK, + "invalid STATUS"); + return -1; + } + *read_only = mp_decode_bool(&d); + break; default: skip: mp_next(&d); /* value */ } diff --git a/src/box/xrow.h b/src/box/xrow.h index d407d151ba0e226cd3909d6c3440a36abc56598a..b10bf26d50d66a2daf57f625e465e246c7347337 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -251,6 +251,8 @@ xrow_encode_subscribe(struct xrow_header *row, * @param[out] replicaset_uuid. * @param[out] instance_uuid. * @param[out] vclock. + * @param[out] version_id. + * @param[out] read_only. * * @retval 0 Success. * @retval -1 Memory or format error. @@ -258,7 +260,7 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id); + uint32_t *version_id, bool *read_only); /** * Encode JOIN command. @@ -282,7 +284,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); static inline int xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, + NULL); } /** @@ -307,7 +310,23 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); +} + +/** + * Decode peer vclock and access rights (a response to VOTE command). + * @param row Row to decode. + * @param[out] vclock. + * @param[out] read_only. + * + * @retval 0 Success. + * @retval -1 Memory or format error. + */ +static inline int +xrow_decode_request_vote(struct xrow_header *row, struct vclock *vclock, + bool *read_only) +{ + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, read_only); } /** @@ -374,13 +393,15 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version); * @param sync Request sync. * @param schema_version. * @param vclock. + * @param read_only. * * @retval 0 Success. * @retval -1 Memory error. */ int -iproto_reply_vclock(struct obuf *out, uint64_t sync, uint32_t schema_version, - const struct vclock *vclock); +iproto_reply_request_vote(struct obuf *out, uint64_t sync, + uint32_t schema_version, const struct vclock *vclock, + bool read_only); /** * Write an error packet int output buffer. Doesn't throw if out @@ -571,7 +592,7 @@ xrow_decode_subscribe_xc(struct xrow_header *row, uint32_t *replica_version_id) { if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id) != 0) + vclock, replica_version_id, NULL) != 0) diag_raise(); } @@ -608,6 +629,15 @@ xrow_decode_vclock_xc(struct xrow_header *row, struct vclock *vclock) diag_raise(); } +/** @copydoc xrow_decode_request_vote. */ +static inline void +xrow_decode_request_vote_xc(struct xrow_header *row, struct vclock *vclock, + bool *read_only) +{ + if (xrow_decode_request_vote(row, vclock, read_only) != 0) + diag_raise(); +} + /** @copydoc iproto_reply_ok. */ static inline void iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) @@ -616,12 +646,14 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) diag_raise(); } -/** @copydoc iproto_reply_vclock. */ +/** @copydoc iproto_reply_request_vote_xc. */ static inline void -iproto_reply_vclock_xc(struct obuf *out, uint64_t sync, uint32_t schema_version, - const struct vclock *vclock) +iproto_reply_request_vote_xc(struct obuf *out, uint64_t sync, + uint32_t schema_version, + const struct vclock *vclock, bool read_only) { - if (iproto_reply_vclock(out, sync, schema_version, vclock) != 0) + if (iproto_reply_request_vote(out, sync, schema_version, + vclock, read_only) != 0) diag_raise(); } diff --git a/test/replication/replica_uuid_ro.lua b/test/replication/replica_uuid_ro.lua new file mode 100644 index 0000000000000000000000000000000000000000..dd5dae9e883bb5e4c6c937d5549b9e18b210315a --- /dev/null +++ b/test/replication/replica_uuid_ro.lua @@ -0,0 +1,33 @@ +#!/usr/bin/env tarantool + +-- get instance name from filename (replica_uuid_ro1.lua => replica_uuid_ro1) +local INSTANCE_ID = string.match(arg[0], "%d") +local USER = 'cluster' +local PASSWORD = 'somepassword' +local SOCKET_DIR = require('fio').cwd() +local function instance_uri(instance_id) + --return 'localhost:'..(3310 + instance_id) + return SOCKET_DIR..'/replica_uuid_ro'..instance_id..'.sock'; +end + +-- start console first +require('console').listen(os.getenv('ADMIN')) + +box.cfg({ + instance_uuid = arg[1]; + listen = instance_uri(INSTANCE_ID); +-- log_level = 7; + replication = { + USER..':'..PASSWORD..'@'..instance_uri(1); + USER..':'..PASSWORD..'@'..instance_uri(2); + }; + read_only = (INSTANCE_ID ~= '1' and true or false); +}) + +box.once("bootstrap", function() + local test_run = require('test_run').new() + box.schema.user.create(USER, { password = PASSWORD }) + box.schema.user.grant(USER, 'replication') + box.schema.space.create('test', {engine = test_run:get_cfg('engine')}) + box.space.test:create_index('primary') +end) diff --git a/test/replication/replica_uuid_ro1.lua b/test/replication/replica_uuid_ro1.lua new file mode 120000 index 0000000000000000000000000000000000000000..342d71c57c8898ab1af4a9513d2bb1f3f7ea4962 --- /dev/null +++ b/test/replication/replica_uuid_ro1.lua @@ -0,0 +1 @@ +replica_uuid_ro.lua \ No newline at end of file diff --git a/test/replication/replica_uuid_ro2.lua b/test/replication/replica_uuid_ro2.lua new file mode 120000 index 0000000000000000000000000000000000000000..342d71c57c8898ab1af4a9513d2bb1f3f7ea4962 --- /dev/null +++ b/test/replication/replica_uuid_ro2.lua @@ -0,0 +1 @@ +replica_uuid_ro.lua \ No newline at end of file diff --git a/test/replication/replicaset_ro_mostly.result b/test/replication/replicaset_ro_mostly.result new file mode 100644 index 0000000000000000000000000000000000000000..d753a182d37c78bc426698331a20526c6e1d5d31 --- /dev/null +++ b/test/replication/replicaset_ro_mostly.result @@ -0,0 +1,59 @@ +-- gh-3257 check bootstrap with read-only replica in cluster. +-- Old behaviour: failed, since read-only is chosen by uuid. +test_run = require('test_run').new() +--- +... +SERVERS = {'replica_uuid_ro1', 'replica_uuid_ro2'} +--- +... +uuid = require('uuid') +--- +... +uuid1 = uuid.new() +--- +... +uuid2 = uuid.new() +--- +... +function sort_cmp(a, b) return a.time_low > b.time_low and true or false end +--- +... +function sort(t) table.sort(t, sort_cmp) return t end +--- +... +UUID = sort({uuid1, uuid2}, sort_cmp) +--- +... +create_cluster_cmd1 = 'create server %s with script="replication/%s.lua"' +--- +... +create_cluster_cmd2 = 'start server %s with args="%s", wait_load=False, wait=False' +--- +... +test_run:cmd("setopt delimiter ';'") +--- +- true +... +function create_cluster_uuid(servers, uuids) + for i, name in ipairs(servers) do + test_run:cmd(create_cluster_cmd1:format(name, name)) + test_run:cmd(create_cluster_cmd2:format(name, uuids[i])) + end +end; +--- +... +test_run:cmd("setopt delimiter ''"); +--- +- true +... +-- Deploy a cluster. +create_cluster_uuid(SERVERS, UUID) +--- +... +test_run:wait_fullmesh(SERVERS) +--- +... +-- Cleanup. +test_run:drop_cluster(SERVERS) +--- +... diff --git a/test/replication/replicaset_ro_mostly.test.lua b/test/replication/replicaset_ro_mostly.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..539ca5a136c5e98d9afeb89d8f12cca04984c67a --- /dev/null +++ b/test/replication/replicaset_ro_mostly.test.lua @@ -0,0 +1,30 @@ +-- gh-3257 check bootstrap with read-only replica in cluster. +-- Old behaviour: failed, since read-only is chosen by uuid. +test_run = require('test_run').new() + +SERVERS = {'replica_uuid_ro1', 'replica_uuid_ro2'} + +uuid = require('uuid') +uuid1 = uuid.new() +uuid2 = uuid.new() +function sort_cmp(a, b) return a.time_low > b.time_low and true or false end +function sort(t) table.sort(t, sort_cmp) return t end +UUID = sort({uuid1, uuid2}, sort_cmp) + +create_cluster_cmd1 = 'create server %s with script="replication/%s.lua"' +create_cluster_cmd2 = 'start server %s with args="%s", wait_load=False, wait=False' + +test_run:cmd("setopt delimiter ';'") +function create_cluster_uuid(servers, uuids) + for i, name in ipairs(servers) do + test_run:cmd(create_cluster_cmd1:format(name, name)) + test_run:cmd(create_cluster_cmd2:format(name, uuids[i])) + end +end; +test_run:cmd("setopt delimiter ''"); + +-- Deploy a cluster. +create_cluster_uuid(SERVERS, UUID) +test_run:wait_fullmesh(SERVERS) +-- Cleanup. +test_run:drop_cluster(SERVERS)