diff --git a/src/box/applier.cc b/src/box/applier.cc index a9b6cacbc5076227ff6e118e39c4a2ec7f4abc8d..2a6fe8b245e0a456fc33ec28ab5f1c3548f49ea5 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -447,12 +447,26 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_vclock_xc(&row, &replicaset.vclock); } + coio_read_xrow(coio, ibuf, &row); + if (row.type == IPROTO_JOIN_META) { + /* Read additional metadata. Empty at the moment. */ + do { + coio_read_xrow(coio, ibuf, &row); + if (iproto_type_is_error(row.type)) { + xrow_decode_error_xc(&row); + } else if (row.type != IPROTO_JOIN_SNAPSHOT) { + tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, + (uint32_t)row.type); + } + } while (row.type != IPROTO_JOIN_SNAPSHOT); + coio_read_xrow(coio, ibuf, &row); + } + /* * Receive initial data. */ uint64_t row_count = 0; while (true) { - coio_read_xrow(coio, ibuf, &row); applier->last_row_time = ev_monotonic_now(loop()); if (iproto_type_is_dml(row.type)) { if (apply_snapshot_row(&row) != 0) @@ -477,6 +491,7 @@ applier_wait_snapshot(struct applier *applier) tnt_raise(ClientError, ER_UNKNOWN_REQUEST_TYPE, (uint32_t) row.type); } + coio_read_xrow(coio, ibuf, &row); } return row_count; diff --git a/src/box/box.cc b/src/box/box.cc index 692c5274ace1c94a968312ff4f566cffb8d780b7..9dbeee5db44383310c86dd338e4db4c2a93716df 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -2458,7 +2458,7 @@ box_process_fetch_snapshot(struct ev_io *io, struct xrow_header *header) /* Send the snapshot data to the instance. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, 0); say_info("read-view sent."); /* Remember master's vclock after the last request */ @@ -2606,7 +2606,7 @@ box_process_join(struct ev_io *io, struct xrow_header *header) /* Decode JOIN request */ struct tt_uuid instance_uuid = uuid_nil; - uint32_t replica_version_id; + uint32_t replica_version_id = 0; xrow_decode_join_xc(header, &instance_uuid, &replica_version_id); /* Check that bootstrap has been finished */ @@ -2656,7 +2656,8 @@ box_process_join(struct ev_io *io, struct xrow_header *header) * Initial stream: feed replica with dirty data from engines. */ struct vclock start_vclock; - relay_initial_join(io->fd, header->sync, &start_vclock); + relay_initial_join(io->fd, header->sync, &start_vclock, + replica_version_id); say_info("initial data sent."); /** diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index ea7290da6d57bea344ffc597895b66f329b9ebff..758cd002c9f82241abcab027d28d4d19f1bce55b 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -262,6 +262,8 @@ enum iproto_type { IPROTO_FETCH_SNAPSHOT = 69, /** REGISTER request to leave anonymous replication. */ IPROTO_REGISTER = 70, + IPROTO_JOIN_META = 71, + IPROTO_JOIN_SNAPSHOT = 72, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/relay.cc b/src/box/relay.cc index 60f527b7f7332e9a929d2abfbfaa02446e084210..7f2dc368f6cce41539b26d6a9b57c49a98d135c0 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -392,7 +392,8 @@ relay_set_cord_name(int fd) } void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id) { struct relay *relay = relay_new(NULL); if (relay == NULL) @@ -432,6 +433,22 @@ relay_initial_join(int fd, uint64_t sync, struct vclock *vclock) row.sync = sync; coio_write_xrow(&relay->io, &row); + /* + * Version is present starting with 2.7.3, 2.8.2, 2.9.1 + * All these versions know of additional META stage of initial join. + */ + if (replica_version_id > 0) { + /* Mark the beginning of the metadata stream. */ + xrow_encode_type(&row, IPROTO_JOIN_META); + xstream_write(&relay->stream, &row); + + /* Empty at the moment. */ + + /* Mark the end of the metadata stream. */ + xrow_encode_type(&row, IPROTO_JOIN_SNAPSHOT); + xstream_write(&relay->stream, &row); + } + /* Send read view to the replica. */ engine_join_xc(&ctx, &relay->stream); } diff --git a/src/box/relay.h b/src/box/relay.h index 615ffb75d3d86d35414fcbacfee2fd3231d1d69e..112428ae8fed0f4d717590824b0818e720663e27 100644 --- a/src/box/relay.h +++ b/src/box/relay.h @@ -116,9 +116,11 @@ relay_push_raft(struct relay *relay, const struct raft_request *req); * @param fd client connection * @param sync sync from incoming JOIN request * @param vclock[out] vclock of the read view sent to the replica + * @param replica_version_id peer's version */ void -relay_initial_join(int fd, uint64_t sync, struct vclock *vclock); +relay_initial_join(int fd, uint64_t sync, struct vclock *vclock, + uint32_t replica_version_id); /** * Send final JOIN rows to the replica. diff --git a/src/box/xrow.c b/src/box/xrow.c index 5c5da480856dc7476ece4c2ee367a362172dc63d..8ab8b27687df0d18b529c8df3bf640541adb1160 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1730,6 +1730,13 @@ xrow_encode_timestamp(struct xrow_header *row, uint32_t replica_id, double tm) row->tm = tm; } +void +xrow_encode_type(struct xrow_header *row, uint16_t type) +{ + memset(row, 0, sizeof(*row)); + row->type = type; +} + void greeting_encode(char *greetingbuf, uint32_t version_id, const struct tt_uuid *uuid, const char *salt, uint32_t salt_len) diff --git a/src/box/xrow.h b/src/box/xrow.h index 30d6b8639c28633395b105254386c22aa177bfee..c6e8ed0fd019fc8d7952d72b912bf9906b4c2156 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -570,6 +570,14 @@ xrow_decode_subscribe_response(struct xrow_header *row, void xrow_encode_timestamp(struct xrow_header *row, uint32_t replica_id, double tm); +/** + * Encode any bodyless message. + * @param row[out] Row to encode into. + * @param type Message type. + */ +void +xrow_encode_type(struct xrow_header *row, uint16_t type); + /** * Fast encode xrow header using the specified header fields. * It is faster than the xrow_header_encode, because uses