diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 8501db575da31a52b0c461b1eba1080b3cf477d9..1fc3c1bb723fcf37d969b0d52501ff54b51e41bc 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -436,6 +436,9 @@ pub struct Peer { pub replicaset_id: String, pub replicaset_uuid: String, + /// Signifies whether this instance is a master of it's replicaset or not. + pub is_master: bool, + /// Index of the most recent raft log entry that persisted this peer. /// `0` means it's not committed yet. pub commit_index: RaftIndex, @@ -937,6 +940,8 @@ define_peer_change! { TargetGrade(TargetGrade), #[setter = with_failure_domain, field = failure_domain] FailureDomain(FailureDomain), + #[setter = with_is_master, field = is_master] + IsMaster(bool), } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 6f0f93dc2eefc0576ffaa6fc4572b450fd75708f..089cc48adb969ed76fefb361a3c80487fe4fdde8 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -1162,8 +1162,13 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { let cluster_id = cluster_id.clone(); let peer_iid_2 = peer_iid.clone(); let res = resp.and_then(move |replication::Response { lsn }| { - let req = UpdatePeerRequest::new(peer_iid_2, cluster_id) + let mut req = UpdatePeerRequest::new(peer_iid_2, cluster_id) .with_current_grade(CurrentGrade::Replicated); + if replicaset_size == 1 { + // TODO: ignore expelled peers + // TODO: ignore offline peers + req = req.with_is_master(true); + } node.handle_topology_request_and_wait(req.into()) .map(|_| lsn) }); diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 337317fb3bf12c75580584ed328cfd0cacf186ea..01917891e053725ce76bb789caa42105c971ca7d 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -424,6 +424,7 @@ define_peer_fields! { PeerAddress : String = ("peer_address", FieldType::String) ReplicasetId : String = ("replicaset_id", FieldType::String) ReplicasetUuid : String = ("replicaset_uuid", FieldType::String) + IsMaster : bool = ("is_master", FieldType::Boolean) CommitIndex : RaftIndex = ("commit_index", FieldType::Unsigned) CurrentGrade : traft::CurrentGrade = ("current_grade", FieldType::String) TargetGrade : traft::TargetGrade = ("target_grade", FieldType::String) @@ -537,13 +538,13 @@ inventory::submit!(crate::InnerTest { for peer in vec![ // r1 - ("i1", "i1-uuid", 1u64, "addr:1", "r1", "r1-uuid", 1u64, CurrentGrade::Online, TargetGrade::Online, &faildom,), - ("i2", "i2-uuid", 2u64, "addr:2", "r1", "r1-uuid", 2, CurrentGrade::Online, TargetGrade::Online, &faildom,), + ("i1", "i1-uuid", 1u64, "addr:1", "r1", "r1-uuid", false, 1u64, CurrentGrade::Online, TargetGrade::Online, &faildom,), + ("i2", "i2-uuid", 2u64, "addr:2", "r1", "r1-uuid", false, 2, CurrentGrade::Online, TargetGrade::Online, &faildom,), // r2 - ("i3", "i3-uuid", 3u64, "addr:3", "r2", "r2-uuid", 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), - ("i4", "i4-uuid", 4u64, "addr:4", "r2", "r2-uuid", 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), + ("i3", "i3-uuid", 3u64, "addr:3", "r2", "r2-uuid", false, 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), + ("i4", "i4-uuid", 4u64, "addr:4", "r2", "r2-uuid", false, 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), // r3 - ("i5", "i5-uuid", 5u64, "addr:5", "r3", "r3-uuid", 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), + ("i5", "i5-uuid", 5u64, "addr:5", "r3", "r3-uuid", false, 10, CurrentGrade::Online, TargetGrade::Online, &faildom,), ] { raft_group.put(&peer).unwrap(); } @@ -567,9 +568,9 @@ inventory::submit!(crate::InnerTest { " in unique index \"raft_id\"", " in space \"raft_group\"", " with old tuple", - r#" - ["i1", "i1-uuid", 1, "addr:1", "r1", "r1-uuid", 1, "{gon}", "{tgon}", {{"A": "B"}}]"#, + r#" - ["i1", "i1-uuid", 1, "addr:1", "r1", "r1-uuid", false, 1, "{gon}", "{tgon}", {{"A": "B"}}]"#, " and new tuple", - r#" - ["i99", "", 1, "", "", "", 0, "{goff}", "{tgon}", {{}}]"#, + r#" - ["i99", "", 1, "", "", "", false, 0, "{goff}", "{tgon}", {{}}]"#, ), gon = CurrentGrade::Online, goff = CurrentGrade::Offline, diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 69677fdbdee577bc365b9ea2d424dfccb92b00c0..7cd2741feb85b6059deedb5c21bae54dd1b03ee8 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -169,6 +169,7 @@ impl Topology { current_grade: CurrentGrade::Offline, target_grade: TargetGrade::Offline, failure_domain, + ..Peer::default() }; self.put_peer(peer.clone()); @@ -290,6 +291,7 @@ mod tests { $( let _f = $failure_domain; )? _f }, + .. Peer::default() } }; } diff --git a/test/int/test_joining.py b/test/int/test_joining.py index edf59cac2275cf846cafef0e450a8ba94687e22a..5f0606abfbdc9eaecb0711319a27f6dce19857a6 100644 --- a/test/int/test_joining.py +++ b/test/int/test_joining.py @@ -136,25 +136,24 @@ def test_replication(cluster: Cluster): for instance in cluster.instances: with instance.connect(1) as conn: - raft_peer = conn.select("raft_group", [instance.instance_id])[0] + raft_peer = conn.eval( + "return box.space.raft_group:get(...):tomap()", + instance.instance_id, + )[0] space_cluster = conn.select("_cluster") - # Field 6 is commit_index. - # It's unperdictable - erase it before comparison. - raft_peer[6] = None - - assert raft_peer == [ - instance.instance_id, - instance.eval("return box.info.uuid"), - instance.raft_id, - instance.eval("return box.info.listen"), - "r1", - instance.eval("return box.info.cluster.uuid"), - None, - "Online", - "Online", - dict(), - ] + expected = { + "instance_id": instance.instance_id, + "instance_uuid": instance.eval("return box.info.uuid"), + "raft_id": instance.raft_id, + "peer_address": instance.eval("return box.info.listen"), + "replicaset_id": "r1", + "replicaset_uuid": instance.eval("return box.info.cluster.uuid"), + "current_grade": "Online", + "target_grade": "Online", + "failure_domain": dict(), + } + assert {k: v for k, v in raft_peer.items() if k in expected} == expected assert list(space_cluster) == [ [1, i1.instance_uuid()],