diff --git a/src/box/applier.cc b/src/box/applier.cc index ecfe0771b4121537a92820f8117a120a3789fced..4b9a105159f424854f4cda8203db30329898130a 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -867,7 +867,7 @@ applier_subscribe(struct applier *applier) vclock_create(&vclock); vclock_copy(&vclock, &replicaset.vclock); xrow_encode_subscribe_xc(&row, &REPLICASET_UUID, &INSTANCE_UUID, - &vclock, replication_anon); + &vclock, replication_anon, 0); coio_write_xrow(coio, &row); /* Read SUBSCRIBE response */ diff --git a/src/box/box.cc b/src/box/box.cc index 5850894deb2ba4cc15da9e5f5bfc9cb4131122da..09dd67ab47362e80ebca45b26a4039d68cdf4c29 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1787,8 +1787,9 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) uint32_t replica_version_id; vclock_create(&replica_clock); bool anon; - xrow_decode_subscribe_xc(header, NULL, &replica_uuid, - &replica_clock, &replica_version_id, &anon); + uint32_t id_filter; + xrow_decode_subscribe_xc(header, NULL, &replica_uuid, &replica_clock, + &replica_version_id, &anon, &id_filter); /* Forbid connection to itself */ if (tt_uuid_is_equal(&replica_uuid, &INSTANCE_UUID)) @@ -1871,7 +1872,7 @@ box_process_subscribe(struct ev_io *io, struct xrow_header *header) * indefinitely). 
*/ relay_subscribe(replica, io->fd, header->sync, &replica_clock, - replica_version_id); + replica_version_id, id_filter); } void diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index b66c05c0648df9b81eadd7d7ddff5bc636626cf7..f9d413a31daeb485de2d45477092553c86dcead3 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -125,6 +125,7 @@ enum iproto_key { IPROTO_STMT_ID = 0x43, /* Leave a gap between SQL keys and additional request keys */ IPROTO_REPLICA_ANON = 0x50, + IPROTO_ID_FILTER = 0x51, IPROTO_KEY_MAX }; diff --git a/src/box/relay.cc b/src/box/relay.cc index b89632273f7d96d0512a415992ba01f0a564e3e4..95245a3cfa6cf2c340e08d291f0756f4ae10ec61 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -109,6 +109,13 @@ struct relay { struct vclock recv_vclock; /** Replicatoin slave version. */ uint32_t version_id; + /** + * A filter of replica ids whose rows should be ignored. + * Each set filter bit corresponds to a replica id whose + * rows shouldn't be relayed. The list of ids to ignore + * is passed by the replica on subscribe. + */ + uint32_t id_filter; /** * Local vclock at the moment of subscribe, used to check * dataset on the other side and send missing data rows if any. @@ -676,7 +683,8 @@ relay_subscribe_f(va_list ap) /** Replication acceptor fiber handler. 
*/ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, - struct vclock *replica_clock, uint32_t replica_version_id) + struct vclock *replica_clock, uint32_t replica_version_id, + uint32_t replica_id_filter) { assert(replica->anon || replica->id != REPLICA_ID_NIL); struct relay *relay = replica->relay; @@ -705,6 +713,8 @@ relay_subscribe(struct replica *replica, int fd, uint64_t sync, vclock_copy(&relay->tx.vclock, replica_clock); relay->version_id = replica_version_id; + relay->id_filter = replica_id_filter; + int rc = cord_costart(&relay->cord, "subscribe", relay_subscribe_f, relay); if (rc == 0) @@ -763,6 +773,9 @@ relay_send_row(struct xstream *stream, struct xrow_header *packet) packet->group_id = GROUP_DEFAULT; packet->bodycnt = 0; } + /* Skip rows from instances the replica asked to filter out. */ + if ((relay->id_filter & (1U << packet->replica_id)) != 0) + return; /* * We're feeding a WAL, thus responding to FINAL JOIN or SUBSCRIBE * request. If this is FINAL JOIN (i.e. relay->replica is NULL), diff --git a/src/box/relay.h b/src/box/relay.h index e1782d78f93913094215d8bfa8ffed17a54ee515..0632fa91275f998e2adb003a97bdc5b9ec9bf3a1 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -124,6 +124,7 @@ relay_final_join(int fd, uint64_t sync, struct vclock *start_vclock, */ void relay_subscribe(struct replica *replica, int fd, uint64_t sync, - struct vclock *replica_vclock, uint32_t replica_version_id); + struct vclock *replica_vclock, uint32_t replica_version_id, + uint32_t replica_id_filter); #endif /* TARANTOOL_REPLICATION_RELAY_H_INCLUDED */ diff --git a/src/box/xrow.c b/src/box/xrow.c index 968c3a202b710dc5e160cfcaed048ccc4419f1b0..60204900478c2a517b967469b25053c4a52bdc19 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1194,7 +1194,8 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon) + const struct vclock *vclock, bool anon, + 
uint32_t id_filter) { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX + mp_sizeof_vclock(vclock); @@ -1204,7 +1205,8 @@ xrow_encode_subscribe(struct xrow_header *row, return -1; } char *data = buf; - data = mp_encode_map(data, 5); + int filter_size = __builtin_popcount(id_filter); + data = mp_encode_map(data, filter_size != 0 ? 6 : 5); data = mp_encode_uint(data, IPROTO_CLUSTER_UUID); data = xrow_encode_uuid(data, replicaset_uuid); data = mp_encode_uint(data, IPROTO_INSTANCE_UUID); @@ -1215,6 +1217,17 @@ xrow_encode_subscribe(struct xrow_header *row, data = mp_encode_uint(data, tarantool_version_id()); data = mp_encode_uint(data, IPROTO_REPLICA_ANON); data = mp_encode_bool(data, anon); + if (filter_size != 0) { + data = mp_encode_uint(data, IPROTO_ID_FILTER); + data = mp_encode_array(data, filter_size); + struct bit_iterator it; + bit_iterator_init(&it, &id_filter, sizeof(id_filter), + true); + for (size_t id = bit_iterator_next(&it); id < VCLOCK_MAX; + id = bit_iterator_next(&it)) { + data = mp_encode_uint(data, id); + } + } assert(data <= buf + size); row->body[0].iov_base = buf; row->body[0].iov_len = (data - buf); @@ -1226,7 +1239,8 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id, bool *anon) + uint32_t *version_id, bool *anon, + uint32_t *id_filter) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1244,6 +1258,8 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, if (anon) *anon = false; + if (id_filter) + *id_filter = 0; d = data; uint32_t map_size = mp_decode_map(&d); for (uint32_t i = 0; i < map_size; i++) { @@ -1301,6 +1317,24 @@ xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, } *anon = mp_decode_bool(&d); break; + case IPROTO_ID_FILTER: + if (id_filter == NULL) + goto skip; + if 
(mp_typeof(*d) != MP_ARRAY) { +id_filter_decode_err: xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, + "invalid ID_FILTER"); + return -1; + } + uint32_t len = mp_decode_array(&d); + for (uint32_t i = 0; i < len; ++i) { + if (mp_typeof(*d) != MP_UINT) + goto id_filter_decode_err; + uint64_t val = mp_decode_uint(&d); + if (val >= VCLOCK_MAX) + goto id_filter_decode_err; + *id_filter |= (uint32_t)1 << val; + } + break; default: skip: mp_next(&d); /* value */ } diff --git a/src/box/xrow.h b/src/box/xrow.h index 0973c497d438ea6a8cf6f7ae79f493b0cd2117bc..2a0a9c8526a8b2c965c9f776a1fdeb86495b534a 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -48,7 +48,7 @@ enum { XROW_BODY_IOVMAX = 2, XROW_IOVMAX = XROW_HEADER_IOVMAX + XROW_BODY_IOVMAX, XROW_HEADER_LEN_MAX = 52, - XROW_BODY_LEN_MAX = 128, + XROW_BODY_LEN_MAX = 256, IPROTO_HEADER_LEN = 28, /** 7 = sizeof(iproto_body_bin). */ IPROTO_SELECT_HEADER_LEN = IPROTO_HEADER_LEN + 7, @@ -322,6 +322,8 @@ xrow_encode_register(struct xrow_header *row, * @param instance_uuid Instance uuid. * @param vclock Replication clock. * @param anon Whether it is an anonymous subscribe request or not. + * @param id_filter A bitmask of replica ids to skip rows from + * when feeding a replica (bit N stands for replica id N). * * @retval 0 Success. * @retval -1 Memory error. @@ -330,7 +332,8 @@ int xrow_encode_subscribe(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon); + const struct vclock *vclock, bool anon, + uint32_t id_filter); /** * Decode SUBSCRIBE command. @@ -340,6 +343,8 @@ xrow_encode_subscribe(struct xrow_header *row, * @param[out] vclock. * @param[out] version_id. * @param[out] anon Whether it is an anonymous subscribe. + * @param[out] id_filter A bitmask of ids to skip rows from when + * feeding a replica (bit N stands for replica id N). * * @retval 0 Success. * @retval -1 Memory or format error. 
@@ -347,7 +352,8 @@ xrow_encode_subscribe(struct xrow_header *row, int xrow_decode_subscribe(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *version_id, bool *anon); + uint32_t *version_id, bool *anon, + uint32_t *id_filter); /** * Encode JOIN command. @@ -371,7 +377,8 @@ xrow_encode_join(struct xrow_header *row, const struct tt_uuid *instance_uuid); static inline int xrow_decode_join(struct xrow_header *row, struct tt_uuid *instance_uuid) { - return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, NULL, NULL, NULL, + NULL); } /** @@ -386,7 +393,8 @@ static inline int xrow_decode_register(struct xrow_header *row, struct tt_uuid *instance_uuid, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, NULL); + return xrow_decode_subscribe(row, NULL, instance_uuid, vclock, NULL, + NULL, NULL); } /** @@ -411,7 +419,7 @@ xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); static inline int xrow_decode_vclock(struct xrow_header *row, struct vclock *vclock) { - return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL); + return xrow_decode_subscribe(row, NULL, NULL, vclock, NULL, NULL, NULL); } /** @@ -442,7 +450,8 @@ xrow_decode_subscribe_response(struct xrow_header *row, struct tt_uuid *replicaset_uuid, struct vclock *vclock) { - return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, NULL); + return xrow_decode_subscribe(row, replicaset_uuid, NULL, vclock, NULL, + NULL, NULL); } /** @@ -817,10 +826,11 @@ static inline void xrow_encode_subscribe_xc(struct xrow_header *row, const struct tt_uuid *replicaset_uuid, const struct tt_uuid *instance_uuid, - const struct vclock *vclock, bool anon) + const struct vclock *vclock, bool anon, + uint32_t id_filter) { if (xrow_encode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, anon) != 0) + 
vclock, anon, id_filter) != 0) diag_raise(); } @@ -828,11 +838,13 @@ xrow_encode_subscribe_xc(struct xrow_header *row, static inline void xrow_decode_subscribe_xc(struct xrow_header *row, struct tt_uuid *replicaset_uuid, - struct tt_uuid *instance_uuid, struct vclock *vclock, - uint32_t *replica_version_id, bool *anon) + struct tt_uuid *instance_uuid, struct vclock *vclock, + uint32_t *replica_version_id, bool *anon, + uint32_t *id_filter) { if (xrow_decode_subscribe(row, replicaset_uuid, instance_uuid, - vclock, replica_version_id, anon) != 0) + vclock, replica_version_id, anon, + id_filter) != 0) diag_raise(); }