diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index fcea7d14d60253c1f133c071bca4c6e8725357c0..286e84c28db4ee846369af51b9f6d8daecfc8415 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -1294,7 +1294,14 @@ send_join_header(struct xstream *stream, const struct vclock *vclock) struct xrow_header row; /* Encoding replication request uses fiber()->gc region. */ RegionGuard region_guard(&fiber()->gc); - xrow_encode_vclock_ignore0(&row, vclock); + /* + * Vclock is encoded with 0th component, as in case of checkpoint + * join it corresponds to the vclock of the checkpoint, where 0th + * component is essential, as otherwise signature won't be correct. + * Client sends this vclock in IPROTO_CURSOR, when he wants to + * continue fetching from the same checkpoint. + */ + xrow_encode_vclock(&row, vclock); xstream_write(stream, &row); } diff --git a/src/box/xrow.c b/src/box/xrow.c index c327027ba4914bd9bd93797b2316b3cb63f002d0..e7f1dcf20f8e155b791d9d3e1b108669aca869dc 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -73,15 +73,22 @@ mp_sizeof_vclock_ignore0(const struct vclock *vclock) mp_sizeof_uint(UINT64_MAX)); } +static inline uint32_t +mp_sizeof_vclock(const struct vclock *vclock) +{ + uint32_t size = vclock_size(vclock); + return mp_sizeof_map(size) + size * (mp_sizeof_uint(UINT32_MAX) + + mp_sizeof_uint(UINT64_MAX)); +} + static inline char * -mp_encode_vclock_ignore0(char *data, const struct vclock *vclock) +mp_encode_vclock_impl(char *data, const struct vclock *vclock, bool ignore0) { - data = mp_encode_map(data, vclock_size_ignore0(vclock)); struct vclock_iterator it; vclock_iterator_init(&it, vclock); struct vclock_c replica; replica = vclock_iterator_next(&it); - if (replica.id == 0) + if (replica.id == 0 && ignore0) replica = vclock_iterator_next(&it); for ( ; replica.id < VCLOCK_MAX; replica = vclock_iterator_next(&it)) { data = mp_encode_uint(data, replica.id); @@ -90,8 +97,22 @@ mp_encode_vclock_ignore0(char *data, const struct vclock *vclock) return data; } +static inline char * +mp_encode_vclock_ignore0(char *data, const struct vclock *vclock) +{ + data = mp_encode_map(data, vclock_size_ignore0(vclock)); + return mp_encode_vclock_impl(data, vclock, true); +} + +static inline char * +mp_encode_vclock(char *data, const struct vclock *vclock) +{ + data = mp_encode_map(data, vclock_size(vclock)); + return mp_encode_vclock_impl(data, vclock, false); +} + static int -mp_decode_vclock_ignore0(const char **data, struct vclock *vclock) +mp_decode_vclock(const char **data, struct vclock *vclock) { vclock_create(vclock); if (mp_typeof(**data) != MP_MAP) @@ -104,16 +125,21 @@ mp_decode_vclock_ignore0(const char **data, struct vclock *vclock) if (mp_typeof(**data) != MP_UINT) return -1; int64_t lsn = mp_decode_uint(data); - /* - * Skip vclock[0] coming from the remote - * instances. - */ - if (lsn > 0 && id != 0) + if (lsn > 0) vclock_follow(vclock, id, lsn); } return 0; } +static int +mp_decode_vclock_ignore0(const char **data, struct vclock *vclock) +{ + if (mp_decode_vclock(data, vclock) != 0) + return -1; + vclock_reset(vclock, 0, 0); + return 0; +} + /** * If log_level is 'verbose' or greater, * dump the corrupted row contents in hex to the log. @@ -2059,6 +2085,8 @@ struct replication_request { char *instance_name; /** IPROTO_VCLOCK. */ struct vclock *vclock_ignore0; + /** IPROTO_VCLOCK. */ + struct vclock *vclock; /** IPROTO_ID_FILTER. */ uint32_t *id_filter; /** IPROTO_SERVER_VERSION. */ @@ -2075,8 +2103,14 @@ xrow_encode_replication_request(struct xrow_header *row, { memset(row, 0, sizeof(*row)); size_t size = XROW_BODY_LEN_MAX; - if (req->vclock_ignore0 != NULL) + if (req->vclock_ignore0 != NULL) { size += mp_sizeof_vclock_ignore0(req->vclock_ignore0); + assert(req->vclock == NULL); + } + if (req->vclock != NULL) { + size += mp_sizeof_vclock(req->vclock); + assert(req->vclock_ignore0 == NULL); + } char *buf = xregion_alloc(&fiber()->gc, size); /* Skip one byte for future map header. */ char *data = buf + 1; @@ -2106,6 +2140,11 @@ xrow_encode_replication_request(struct xrow_header *row, data = mp_encode_uint(data, IPROTO_VCLOCK); data = mp_encode_vclock_ignore0(data, req->vclock_ignore0); } + if (req->vclock != NULL) { + ++map_size; + data = mp_encode_uint(data, IPROTO_VCLOCK); + data = mp_encode_vclock(data, req->vclock); + } if (req->version_id != NULL) { ++map_size; data = mp_encode_uint(data, IPROTO_SERVER_VERSION); @@ -2542,6 +2581,15 @@ xrow_encode_vclock_ignore0(struct xrow_header *row, const struct vclock *vclock) xrow_encode_replication_request(row, &base_req, IPROTO_OK); } +void +xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock) +{ + const struct replication_request base_req = { + .vclock = (struct vclock *)vclock, + }; + xrow_encode_replication_request(row, &base_req, IPROTO_OK); +} + int xrow_decode_vclock_ignore0(const struct xrow_header *row, struct vclock *vclock) { diff --git a/src/box/xrow.h b/src/box/xrow.h index a80081edd855566a60ef7f40fc42e8654fe4b639..8b9dc7d4ad1c0246e1aa9dd89bae5f9c5f7fef89 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -732,6 +732,10 @@ void xrow_encode_vclock_ignore0(struct xrow_header *row, const struct vclock *vclock); +/** Encode vclock including 0th component. */ +void +xrow_encode_vclock(struct xrow_header *row, const struct vclock *vclock); + /** Decode vclock ignoring 0th component. */ int xrow_decode_vclock_ignore0(const struct xrow_header *row,