From bcbe9232c66c2d0d21ab659d09a4bfe8499ee54b Mon Sep 17 00:00:00 2001 From: Serge Petrenko <sergepetrenko@tarantool.org> Date: Thu, 9 Nov 2023 18:50:21 +0300 Subject: [PATCH] replication: persist confirmed vclock on replicas Previously the replicas only persisted the confirmed lsn of the current synchronous transaction queue owner. As soon as the owner changed, the info about which lsn was confirmed by the previous owner was lost. Actually, this info is needed to correctly filter synchro requests coming from the old term, so start tracking confirmed vclock instead of the confirmed lsn on replicas. In-scope of #9138 NO_TEST=covered by the next commit NO_CHANGELOG=internal change @TarantoolBot document Title: Document new IPROTO_RAFT_PROMOTE request field IPROTO_RAFT_PROMOTE and IPROTO_RAFT_DEMOTE requests receive a new key-value pair: IPROTO_VCLOCK : MP_MAP The vclock holds a confirmed vclock of the node sending the request. (cherry picked from commit c4415d4442484d1b8faf83adef3d69e454f61769) --- src/box/applier.cc | 7 ++++-- src/box/box.cc | 2 +- src/box/memtx_engine.cc | 8 +++++-- src/box/relay.cc | 5 ++-- src/box/txn_limbo.c | 51 ++++++++++++++++++++++++++--------------- src/box/txn_limbo.h | 9 ++++++-- src/box/xrow.c | 29 +++++++++++++++++++---- src/box/xrow.h | 11 +++++++-- test/unit/xrow.cc | 5 ++-- 9 files changed, 92 insertions(+), 35 deletions(-) diff --git a/src/box/applier.cc b/src/box/applier.cc index 6e1391a1af..f9b0261d8b 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -795,8 +795,11 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_error_xc(&row); } else if (iproto_type_is_promote_request(row.type)) { struct synchro_request req; - if (xrow_decode_synchro(&row, &req) != 0) + struct vclock limbo_vclock; + if (xrow_decode_synchro(&row, &req, + &limbo_vclock) != 0) { diag_raise(); + } if (txn_limbo_process(&txn_limbo, &req) != 0) diag_raise(); } else if (iproto_type_is_raft_request(row.type)) { @@ -1102,7 +1105,7 @@ 
applier_parse_tx_row(struct applier_tx_row *tx_row) diag_raise(); } } else if (iproto_type_is_synchro_request(type)) { - if (xrow_decode_synchro(row, &tx_row->req.synchro) != 0) { + if (xrow_decode_synchro(row, &tx_row->req.synchro, NULL) != 0) { diag_raise(); } } else if (iproto_type_is_raft_request(type)) { diff --git a/src/box/box.cc b/src/box/box.cc index 29fbc65ebb..59912cd457 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -681,7 +681,7 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) return -1; } struct synchro_request syn_req; - if (xrow_decode_synchro(row, &syn_req) != 0) { + if (xrow_decode_synchro(row, &syn_req, NULL) != 0) { say_error("couldn't decode a synchro request"); return -1; } diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 8fc4d70c6c..0a9051d841 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -327,7 +327,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT_PROMOTE); struct synchro_request req; - if (xrow_decode_synchro(row, &req) != 0) + struct vclock synchro_vclock; + if (xrow_decode_synchro(row, &req, &synchro_vclock) != 0) return -1; /* * Origin id cannot be deduced from row.replica_id in a checkpoint, @@ -767,6 +768,8 @@ struct checkpoint { struct xdir dir; struct raft_request raft; struct synchro_request synchro_state; + /** The limbo confirmed vclock at the moment of checkpoint creation. */ + struct vclock synchro_vclock; /** * Do nothing, just touch the snapshot file - the * checkpoint already exists. 
@@ -823,7 +826,8 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); vclock_create(&ckpt->vclock); box_raft_checkpoint_local(&ckpt->raft); - txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state); + txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state, + &ckpt->synchro_vclock); ckpt->touch = false; return ckpt; } diff --git a/src/box/relay.cc b/src/box/relay.cc index bec8b76bf3..4aa8751358 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -489,7 +489,8 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock, struct synchro_request req; struct raft_request raft_req; - txn_limbo_checkpoint(&txn_limbo, &req); + struct vclock limbo_vclock; + txn_limbo_checkpoint(&txn_limbo, &req, &limbo_vclock); box_raft_checkpoint_local(&raft_req); /* Respond to the JOIN request with the current vclock. */ @@ -1279,7 +1280,7 @@ relay_filter_row(struct relay *relay, struct xrow_header *packet) */ if (iproto_type_is_promote_request(packet->type)) { struct synchro_request req; - xrow_decode_synchro(packet, &req); + xrow_decode_synchro(packet, &req, NULL); while (relay->sent_raft_term < req.term) { if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index d78775bcc6..e4b77ba7e4 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -49,6 +49,7 @@ txn_limbo_create(struct txn_limbo *limbo) fiber_cond_create(&limbo->wait_cond); vclock_create(&limbo->vclock); vclock_create(&limbo->promote_term_map); + vclock_create(&limbo->confirmed_vclock); limbo->promote_greatest_term = 0; latch_create(&limbo->promote_latch); limbo->confirmed_lsn = 0; @@ -355,13 +356,16 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) } void -txn_limbo_checkpoint(const struct txn_limbo *limbo, - struct synchro_request *req) +txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req, + 
struct vclock *vclock) { req->type = IPROTO_RAFT_PROMOTE; req->replica_id = limbo->owner_id; req->lsn = limbo->confirmed_lsn; req->term = limbo->promote_greatest_term; + if (vclock != NULL) + vclock_copy(vclock, &limbo->confirmed_vclock); + req->confirmed_vclock = vclock; } /** Write a request to WAL. */ @@ -408,7 +412,7 @@ synchro_request_write(const struct synchro_request *req) /** Create a request for a specific limbo and write it to WAL. */ static void txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn, - uint64_t term) + uint64_t term, struct vclock *vclock) { assert(lsn >= 0); @@ -417,6 +421,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn, .replica_id = limbo->owner_id, .lsn = lsn, .term = term, + .confirmed_vclock = vclock, }; synchro_request_write(&req); } @@ -431,7 +436,8 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->confirmed_lsn = lsn; - txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0); + vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn); + txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0, NULL); } /** Confirm all the entries <= @a lsn. */ @@ -503,8 +509,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) * comparing existing confirm_lsn with the one arriving from a remote * instance. 
*/ - if (limbo->confirmed_lsn < lsn) + if (limbo->confirmed_lsn < lsn) { limbo->confirmed_lsn = lsn; + vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn); + } } /** @@ -518,7 +526,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->is_in_rollback = true; - txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0); + txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0, NULL); limbo->is_in_rollback = false; } @@ -572,6 +580,11 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) .origin_id = instance_id, .lsn = lsn, .term = term, + /* + * Confirmed_vclock is only persisted in checkpoints. It doesn't + * appear in WALs and replication. + */ + .confirmed_vclock = NULL, }; if (txn_limbo_req_prepare(limbo, &req) < 0) return -1; @@ -586,19 +599,15 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) */ static void txn_limbo_read_promote(struct txn_limbo *limbo, uint32_t replica_id, - uint32_t prev_id, int64_t lsn) + int64_t lsn) { txn_limbo_read_confirm(limbo, lsn); txn_limbo_read_rollback(limbo, lsn + 1); assert(txn_limbo_is_empty(limbo)); limbo->owner_id = replica_id; + limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock, + replica_id); box_update_ro_summary(); - /* - * Only nullify confirmed_lsn when the new value is unknown. I.e. when - * prev_id != replica_id. - */ - if (replica_id != prev_id) - limbo->confirmed_lsn = 0; } int @@ -614,6 +623,7 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) .origin_id = instance_id, .lsn = lsn, .term = term, + .confirmed_vclock = NULL, }; if (txn_limbo_req_prepare(limbo, &req) < 0) return -1; @@ -628,9 +638,9 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) * @sa txn_limbo_read_promote. 
*/ static void -txn_limbo_read_demote(struct txn_limbo *limbo, uint32_t prev_id, int64_t lsn) +txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn) { - return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, prev_id, lsn); + return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn); } void @@ -1108,6 +1118,8 @@ txn_limbo_req_prepare(struct txn_limbo *limbo, assert(limbo->svp_confirmed_lsn == -1); limbo->svp_confirmed_lsn = limbo->confirmed_lsn; limbo->confirmed_lsn = req->lsn; + vclock_reset(&limbo->confirmed_vclock, limbo->owner_id, + req->lsn); break; } /* @@ -1129,6 +1141,8 @@ txn_limbo_req_rollback(struct txn_limbo *limbo, assert(limbo->is_in_rollback); assert(limbo->svp_confirmed_lsn >= 0); limbo->confirmed_lsn = limbo->svp_confirmed_lsn; + vclock_reset(&limbo->confirmed_vclock, limbo->owner_id, + limbo->svp_confirmed_lsn); limbo->svp_confirmed_lsn = -1; limbo->is_in_rollback = false; break; @@ -1184,6 +1198,8 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req) } } } + if (req->confirmed_vclock != NULL) + vclock_copy(&limbo->confirmed_vclock, req->confirmed_vclock); int64_t lsn = req->lsn; switch (req->type) { @@ -1194,11 +1210,10 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req) txn_limbo_read_rollback(limbo, lsn); break; case IPROTO_RAFT_PROMOTE: - txn_limbo_read_promote(limbo, req->origin_id, req->replica_id, - lsn); + txn_limbo_read_promote(limbo, req->origin_id, lsn); break; case IPROTO_RAFT_DEMOTE: - txn_limbo_read_demote(limbo, req->replica_id, lsn); + txn_limbo_read_demote(limbo, lsn); break; default: unreachable(); diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index bf10f3af22..fba16a4712 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -138,6 +138,11 @@ struct txn_limbo { * except outdated nodes. */ struct vclock promote_term_map; + /** + * A vclock containing biggest known confirmed lsns for each previous + * limbo owner. 
+ */ + struct vclock confirmed_vclock; /** * The biggest PROMOTE term seen by the instance and persisted in WAL. * It is related to raft term, but not the same. Synchronous replication @@ -408,8 +413,8 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout); * Persist limbo state to a given synchro request. */ void -txn_limbo_checkpoint(const struct txn_limbo *limbo, - struct synchro_request *req); +txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req, + struct vclock *vclock); /** * Write a PROMOTE request, which has the same effect as CONFIRM(@a lsn) and diff --git a/src/box/xrow.c b/src/box/xrow.c index f897809522..7a71b1b1a8 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1252,20 +1252,32 @@ xrow_encode_synchro(struct xrow_header *row, char *body, char *pos = body; - pos = mp_encode_map(pos, - iproto_type_is_promote_request(req->type) ? 3 : 2); + /* Skip one byte for the map. */ + pos++; + uint32_t map_size = 0; pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); pos = mp_encode_uint(pos, req->replica_id); + map_size++; pos = mp_encode_uint(pos, IPROTO_LSN); pos = mp_encode_uint(pos, req->lsn); + map_size++; - if (iproto_type_is_promote_request(req->type)) { + if (req->term != 0) { pos = mp_encode_uint(pos, IPROTO_TERM); pos = mp_encode_uint(pos, req->term); + map_size++; + } + + if (req->confirmed_vclock != NULL) { + pos = mp_encode_uint(pos, IPROTO_VCLOCK); + pos = mp_encode_vclock_ignore0(pos, req->confirmed_vclock); + map_size++; } + mp_encode_map(body, map_size); + assert(pos - body < XROW_BODY_LEN_MAX); memset(row, 0, sizeof(*row)); @@ -1276,7 +1288,8 @@ xrow_encode_synchro(struct xrow_header *row, char *body, } int -xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req, + struct vclock *vclock) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1303,6 +1316,7 @@ 
xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) uint8_t key = mp_decode_uint(&d); if (key < iproto_key_MAX && iproto_key_type[key] != mp_typeof(*d)) { +bad_msgpack: xrow_on_decode_err(row, ER_INVALID_MSGPACK, "request body"); return -1; @@ -1317,6 +1331,13 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) case IPROTO_TERM: req->term = mp_decode_uint(&d); break; + case IPROTO_VCLOCK: + if (vclock == NULL) + mp_next(&d); + else if (mp_decode_vclock_ignore0(&d, vclock) != 0) + goto bad_msgpack; + req->confirmed_vclock = vclock; + break; default: mp_next(&d); } diff --git a/src/box/xrow.h b/src/box/xrow.h index cdf6b1d34d..4f86c465c1 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -314,9 +314,14 @@ struct synchro_request { int64_t lsn; /** * The new term the instance issuing this request is in. Only used for - * PROMOTE request. + * PROMOTE and DEMOTE requests. */ uint64_t term; + /** + * Confirmed lsns of all the previous limbo owners. Only used for + * PROMOTE and DEMOTE requests. + */ + struct vclock *confirmed_vclock; }; /** @@ -333,11 +338,13 @@ xrow_encode_synchro(struct xrow_header *row, char *body, * Decode synchronous replication request. * @param row xrow header. * @param[out] req Request parameters. + * @param[out] vclock Storage for request vclock. * @retval -1 on error. * @retval 0 success. */ int -xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req, + struct vclock *vclock); /** * Raft request. 
It repeats Raft message to the letter, but can be extended in diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index d07fdf1b38..9b57ecb5ea 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -493,7 +493,8 @@ test_xrow_decode_unknown_key(void) struct synchro_request synchro; header.type = IPROTO_RAFT_PROMOTE; - is(xrow_decode_synchro(&header, &synchro), 0, "xrow_decode_synchro"); + is(xrow_decode_synchro(&header, &synchro, NULL), 0, + "xrow_decode_synchro"); struct raft_request raft; header.type = IPROTO_RAFT; @@ -558,7 +559,7 @@ test_xrow_decode_synchro_types(void) header.body[0].iov_len = mp_format(buf, sizeof(buf), "{%u%s}", IPROTO_INSTANCE_UUID, "someuuid"); struct synchro_request synchro; - is(xrow_decode_synchro(&header, &synchro), 0, + is(xrow_decode_synchro(&header, &synchro, NULL), 0, "xrow_decode_synchro correctly handles key types"); check_plan(); -- GitLab