diff --git a/src/box/applier.cc b/src/box/applier.cc index 6e1391a1afc13bcab5526bc2a68a3e17acec9dfd..f9b0261d8b24e4ae36698e0d97dee0162bd7abb3 100644 --- a/src/box/applier.cc +++ b/src/box/applier.cc @@ -795,8 +795,11 @@ applier_wait_snapshot(struct applier *applier) xrow_decode_error_xc(&row); } else if (iproto_type_is_promote_request(row.type)) { struct synchro_request req; - if (xrow_decode_synchro(&row, &req) != 0) + struct vclock limbo_vclock; + if (xrow_decode_synchro(&row, &req, + &limbo_vclock) != 0) { diag_raise(); + } if (txn_limbo_process(&txn_limbo, &req) != 0) diag_raise(); } else if (iproto_type_is_raft_request(row.type)) { @@ -1102,7 +1105,7 @@ applier_parse_tx_row(struct applier_tx_row *tx_row) diag_raise(); } } else if (iproto_type_is_synchro_request(type)) { - if (xrow_decode_synchro(row, &tx_row->req.synchro) != 0) { + if (xrow_decode_synchro(row, &tx_row->req.synchro, NULL) != 0) { diag_raise(); } } else if (iproto_type_is_raft_request(type)) { diff --git a/src/box/box.cc b/src/box/box.cc index 29fbc65ebb10e9d0c1e2f2dcea1cb90bfb9198f6..59912cd457100ab4d9dd850574991d007085efc0 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -681,7 +681,7 @@ wal_stream_apply_synchro_row(struct wal_stream *stream, struct xrow_header *row) return -1; } struct synchro_request syn_req; - if (xrow_decode_synchro(row, &syn_req) != 0) { + if (xrow_decode_synchro(row, &syn_req, NULL) != 0) { say_error("couldn't decode a synchro request"); return -1; } diff --git a/src/box/memtx_engine.cc b/src/box/memtx_engine.cc index 8fc4d70c6c7ed1f749a6cfddc6c7492d34e48566..0a9051d8412afabffb6a31f3a15b715f31dc25b1 100644 --- a/src/box/memtx_engine.cc +++ b/src/box/memtx_engine.cc @@ -327,7 +327,8 @@ memtx_engine_recover_synchro(const struct xrow_header *row) { assert(row->type == IPROTO_RAFT_PROMOTE); struct synchro_request req; - if (xrow_decode_synchro(row, &req) != 0) + struct vclock synchro_vclock; + if (xrow_decode_synchro(row, &req, &synchro_vclock) != 0) return -1; /* * Origin id cannot be deduced from row.replica_id in a checkpoint, @@ -767,6 +768,8 @@ struct checkpoint { struct xdir dir; struct raft_request raft; struct synchro_request synchro_state; + /** The limbo confirmed vclock at the moment of checkpoint creation. */ + struct vclock synchro_vclock; /** * Do nothing, just touch the snapshot file - the * checkpoint already exists. @@ -823,7 +826,8 @@ checkpoint_new(const char *snap_dirname, uint64_t snap_io_rate_limit) xdir_create(&ckpt->dir, snap_dirname, SNAP, &INSTANCE_UUID, &opts); vclock_create(&ckpt->vclock); box_raft_checkpoint_local(&ckpt->raft); - txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state); + txn_limbo_checkpoint(&txn_limbo, &ckpt->synchro_state, + &ckpt->synchro_vclock); ckpt->touch = false; return ckpt; } diff --git a/src/box/relay.cc b/src/box/relay.cc index bec8b76bf34d0caf0435d90cf64a144b780977a9..4aa8751358974599548ba4f6ea8b55241b7b7da4 100644 --- a/src/box/relay.cc +++ b/src/box/relay.cc @@ -489,7 +489,8 @@ relay_initial_join(struct iostream *io, uint64_t sync, struct vclock *vclock, struct synchro_request req; struct raft_request raft_req; - txn_limbo_checkpoint(&txn_limbo, &req); + struct vclock limbo_vclock; + txn_limbo_checkpoint(&txn_limbo, &req, &limbo_vclock); box_raft_checkpoint_local(&raft_req); /* Respond to the JOIN request with the current vclock. */ @@ -1279,7 +1280,7 @@ relay_filter_row(struct relay *relay, struct xrow_header *packet) */ if (iproto_type_is_promote_request(packet->type)) { struct synchro_request req; - xrow_decode_synchro(packet, &req); + xrow_decode_synchro(packet, &req, NULL); while (relay->sent_raft_term < req.term) { if (fiber_is_cancelled()) { diag_set(FiberIsCancelled); diff --git a/src/box/txn_limbo.c b/src/box/txn_limbo.c index d78775bcc602f4416285e8232609a515c9f2e43b..e4b77ba7e4a40a864d9c540eb02bdf9ef829e287 100644 --- a/src/box/txn_limbo.c +++ b/src/box/txn_limbo.c @@ -49,6 +49,7 @@ txn_limbo_create(struct txn_limbo *limbo) fiber_cond_create(&limbo->wait_cond); vclock_create(&limbo->vclock); vclock_create(&limbo->promote_term_map); + vclock_create(&limbo->confirmed_vclock); limbo->promote_greatest_term = 0; latch_create(&limbo->promote_latch); limbo->confirmed_lsn = 0; @@ -355,13 +356,16 @@ txn_limbo_wait_complete(struct txn_limbo *limbo, struct txn_limbo_entry *entry) } void -txn_limbo_checkpoint(const struct txn_limbo *limbo, - struct synchro_request *req) +txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req, + struct vclock *vclock) { req->type = IPROTO_RAFT_PROMOTE; req->replica_id = limbo->owner_id; req->lsn = limbo->confirmed_lsn; req->term = limbo->promote_greatest_term; + if (vclock != NULL) + vclock_copy(vclock, &limbo->confirmed_vclock); + req->confirmed_vclock = vclock; } /** Write a request to WAL. */ @@ -408,7 +412,7 @@ synchro_request_write(const struct synchro_request *req) /** Create a request for a specific limbo and write it to WAL. */ static void txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn, - uint64_t term) + uint64_t term, struct vclock *vclock) { assert(lsn >= 0); @@ -417,6 +421,7 @@ txn_limbo_write_synchro(struct txn_limbo *limbo, uint16_t type, int64_t lsn, .replica_id = limbo->owner_id, .lsn = lsn, .term = term, + .confirmed_vclock = vclock, }; synchro_request_write(&req); } @@ -431,7 +436,8 @@ txn_limbo_write_confirm(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->confirmed_lsn = lsn; - txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0); + vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn); + txn_limbo_write_synchro(limbo, IPROTO_RAFT_CONFIRM, lsn, 0, NULL); } /** Confirm all the entries <= @a lsn. */ @@ -503,8 +509,10 @@ txn_limbo_read_confirm(struct txn_limbo *limbo, int64_t lsn) * comparing existing confirm_lsn with the one arriving from a remote * instance. */ - if (limbo->confirmed_lsn < lsn) + if (limbo->confirmed_lsn < lsn) { limbo->confirmed_lsn = lsn; + vclock_follow(&limbo->confirmed_vclock, limbo->owner_id, lsn); + } } /** @@ -518,7 +526,7 @@ txn_limbo_write_rollback(struct txn_limbo *limbo, int64_t lsn) assert(lsn > limbo->confirmed_lsn); assert(!limbo->is_in_rollback); limbo->is_in_rollback = true; - txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0); + txn_limbo_write_synchro(limbo, IPROTO_RAFT_ROLLBACK, lsn, 0, NULL); limbo->is_in_rollback = false; } @@ -572,6 +580,11 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) .origin_id = instance_id, .lsn = lsn, .term = term, + /* + * Confirmed_vclock is only persisted in checkpoints. It doesn't + * appear in WALs and replication. + */ + .confirmed_vclock = NULL, }; if (txn_limbo_req_prepare(limbo, &req) < 0) return -1; @@ -586,19 +599,15 @@ txn_limbo_write_promote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) */ static void txn_limbo_read_promote(struct txn_limbo *limbo, uint32_t replica_id, - uint32_t prev_id, int64_t lsn) + int64_t lsn) { txn_limbo_read_confirm(limbo, lsn); txn_limbo_read_rollback(limbo, lsn + 1); assert(txn_limbo_is_empty(limbo)); limbo->owner_id = replica_id; + limbo->confirmed_lsn = vclock_get(&limbo->confirmed_vclock, + replica_id); box_update_ro_summary(); - /* - * Only nullify confirmed_lsn when the new value is unknown. I.e. when - * prev_id != replica_id. - */ - if (replica_id != prev_id) - limbo->confirmed_lsn = 0; } int @@ -614,6 +623,7 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) .origin_id = instance_id, .lsn = lsn, .term = term, + .confirmed_vclock = NULL, }; if (txn_limbo_req_prepare(limbo, &req) < 0) return -1; @@ -628,9 +638,9 @@ txn_limbo_write_demote(struct txn_limbo *limbo, int64_t lsn, uint64_t term) * @sa txn_limbo_read_promote. */ static void -txn_limbo_read_demote(struct txn_limbo *limbo, uint32_t prev_id, int64_t lsn) +txn_limbo_read_demote(struct txn_limbo *limbo, int64_t lsn) { - return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, prev_id, lsn); + return txn_limbo_read_promote(limbo, REPLICA_ID_NIL, lsn); } void @@ -1108,6 +1118,8 @@ txn_limbo_req_prepare(struct txn_limbo *limbo, assert(limbo->svp_confirmed_lsn == -1); limbo->svp_confirmed_lsn = limbo->confirmed_lsn; limbo->confirmed_lsn = req->lsn; + vclock_reset(&limbo->confirmed_vclock, limbo->owner_id, + req->lsn); break; } /* @@ -1129,6 +1141,8 @@ txn_limbo_req_rollback(struct txn_limbo *limbo, assert(limbo->is_in_rollback); assert(limbo->svp_confirmed_lsn >= 0); limbo->confirmed_lsn = limbo->svp_confirmed_lsn; + vclock_reset(&limbo->confirmed_vclock, limbo->owner_id, + limbo->svp_confirmed_lsn); limbo->svp_confirmed_lsn = -1; limbo->is_in_rollback = false; break; @@ -1184,6 +1198,8 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req) } } } + if (req->confirmed_vclock != NULL) + vclock_copy(&limbo->confirmed_vclock, req->confirmed_vclock); int64_t lsn = req->lsn; switch (req->type) { @@ -1194,11 +1210,10 @@ txn_limbo_req_commit(struct txn_limbo *limbo, const struct synchro_request *req) txn_limbo_read_rollback(limbo, lsn); break; case IPROTO_RAFT_PROMOTE: - txn_limbo_read_promote(limbo, req->origin_id, req->replica_id, - lsn); + txn_limbo_read_promote(limbo, req->origin_id, lsn); break; case IPROTO_RAFT_DEMOTE: - txn_limbo_read_demote(limbo, req->replica_id, lsn); + txn_limbo_read_demote(limbo, lsn); break; default: unreachable(); diff --git a/src/box/txn_limbo.h b/src/box/txn_limbo.h index bf10f3af22c46e29cc07b197cd6602cf51ed5878..fba16a47128bdd69cefe977de2a6e7ba6ebbd0b3 100644 --- a/src/box/txn_limbo.h +++ b/src/box/txn_limbo.h @@ -138,6 +138,11 @@ struct txn_limbo { * except outdated nodes. */ struct vclock promote_term_map; + /** + * A vclock containing biggest known confirmed lsns for each previous + * limbo owner. + */ + struct vclock confirmed_vclock; /** * The biggest PROMOTE term seen by the instance and persisted in WAL. * It is related to raft term, but not the same. Synchronous replication @@ -408,8 +413,8 @@ txn_limbo_wait_empty(struct txn_limbo *limbo, double timeout); * Persist limbo state to a given synchro request. */ void -txn_limbo_checkpoint(const struct txn_limbo *limbo, - struct synchro_request *req); +txn_limbo_checkpoint(const struct txn_limbo *limbo, struct synchro_request *req, + struct vclock *vclock); /** * Write a PROMOTE request, which has the same effect as CONFIRM(@a lsn) and diff --git a/src/box/xrow.c b/src/box/xrow.c index f8978095223e6f94d19acdb4882d88d210304f9b..7a71b1b1a8b0f1a9c16b28d9b0a5bb740015e2bc 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -1252,20 +1252,32 @@ xrow_encode_synchro(struct xrow_header *row, char *body, char *pos = body; - pos = mp_encode_map(pos, - iproto_type_is_promote_request(req->type) ? 3 : 2); + /* Skip one byte for the map. */ + pos++; + uint32_t map_size = 0; pos = mp_encode_uint(pos, IPROTO_REPLICA_ID); pos = mp_encode_uint(pos, req->replica_id); + map_size++; pos = mp_encode_uint(pos, IPROTO_LSN); pos = mp_encode_uint(pos, req->lsn); + map_size++; - if (iproto_type_is_promote_request(req->type)) { + if (req->term != 0) { pos = mp_encode_uint(pos, IPROTO_TERM); pos = mp_encode_uint(pos, req->term); + map_size++; + } + + if (req->confirmed_vclock != NULL) { + pos = mp_encode_uint(pos, IPROTO_VCLOCK); + pos = mp_encode_vclock_ignore0(pos, req->confirmed_vclock); + map_size++; } + mp_encode_map(body, map_size); + assert(pos - body < XROW_BODY_LEN_MAX); memset(row, 0, sizeof(*row)); @@ -1276,7 +1288,8 @@ xrow_encode_synchro(struct xrow_header *row, char *body, } int -xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req, + struct vclock *vclock) { if (row->bodycnt == 0) { diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); @@ -1303,6 +1316,7 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) uint8_t key = mp_decode_uint(&d); if (key < iproto_key_MAX && iproto_key_type[key] != mp_typeof(*d)) { +bad_msgpack: xrow_on_decode_err(row, ER_INVALID_MSGPACK, "request body"); return -1; @@ -1317,6 +1331,13 @@ xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req) case IPROTO_TERM: req->term = mp_decode_uint(&d); break; + case IPROTO_VCLOCK: + if (vclock == NULL) + mp_next(&d); + else if (mp_decode_vclock_ignore0(&d, vclock) != 0) + goto bad_msgpack; + req->confirmed_vclock = vclock; + break; default: mp_next(&d); } diff --git a/src/box/xrow.h b/src/box/xrow.h index cdf6b1d34daaa1bdfad36b713a3027c1512f6047..4f86c465c18860217e92ea820ae3fe3993e33117 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -314,9 +314,14 @@ struct synchro_request { int64_t lsn; /** * The new term the instance issuing this request is in. Only used for - * PROMOTE request. + * PROMOTE and DEMOTE requests. */ uint64_t term; + /** + * Confirmed lsns of all the previous limbo owners. Only used for + * PROMOTE and DEMOTE requests. + */ + struct vclock *confirmed_vclock; }; /** @@ -333,11 +338,13 @@ xrow_encode_synchro(struct xrow_header *row, char *body, * Decode synchronous replication request. * @param row xrow header. * @param[out] req Request parameters. + * @param[out] vclock Storage for request vclock. * @retval -1 on error. * @retval 0 success. */ int -xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req); +xrow_decode_synchro(const struct xrow_header *row, struct synchro_request *req, + struct vclock *vclock); /** * Raft request. It repeats Raft message to the letter, but can be extended in diff --git a/test/unit/xrow.cc b/test/unit/xrow.cc index d07fdf1b38201f4b84924c31e49cbf13dbaa8df6..9b57ecb5ea0a40f74d0491fb2efbc640d545df3b 100644 --- a/test/unit/xrow.cc +++ b/test/unit/xrow.cc @@ -493,7 +493,8 @@ test_xrow_decode_unknown_key(void) struct synchro_request synchro; header.type = IPROTO_RAFT_PROMOTE; - is(xrow_decode_synchro(&header, &synchro), 0, "xrow_decode_synchro"); + is(xrow_decode_synchro(&header, &synchro, NULL), 0, + "xrow_decode_synchro"); struct raft_request raft; header.type = IPROTO_RAFT; @@ -558,7 +559,7 @@ test_xrow_decode_synchro_types(void) header.body[0].iov_len = mp_format(buf, sizeof(buf), "{%u%s}", IPROTO_INSTANCE_UUID, "someuuid"); struct synchro_request synchro; - is(xrow_decode_synchro(&header, &synchro), 0, + is(xrow_decode_synchro(&header, &synchro, NULL), 0, "xrow_decode_synchro correctly handles key types"); check_plan();