diff --git a/src/traft/mod.rs b/src/traft/mod.rs index cc38a84653f2198f04ef9015e07a28a62b15f601..94db0ff5c89389b4e41497ad1fffbba2825835e2 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -69,7 +69,7 @@ pub struct Peer { pub peer_address: String, pub voter: bool, pub instance_id: String, - // pub replicaset_id: String, + pub replicaset_id: String, pub instance_uuid: String, // pub replicaset_uuid: String, /// `0` means it's not committed yet. diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 7f2b8e58542efc9d64e6a25bf25ece62f3f40e4c..d05266300c6fb853ae6c0fdbc805e9f8666c05b6 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -76,7 +76,7 @@ impl Storage { {name = 'peer_address', type = 'string', is_nullable = false}, {name = 'voter', type = 'boolean', is_nullable = false}, {name = 'instance_id', type = 'string', is_nullable = false}, - -- {name = 'replicaset_id', type = 'string', is_nullable = false}, + {name = 'replicaset_id', type = 'string', is_nullable = false}, {name = 'instance_uuid', type = 'string', is_nullable = false}, -- {name = 'replicaset_uuid', type = 'string', is_nullable = false}, {name = 'commit_index', type = 'unsigned', is_nullable = false}, diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 12df06d78e447407929db5e185a68d902b5f0aa6..1f31b138fe9744a48964db4a5d5a2aea0dacdc4d 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::collections::BTreeSet; use crate::traft::instance_uuid; use crate::traft::JoinRequest; @@ -13,6 +14,7 @@ pub struct Topology { max_raft_id: RaftId, instance_id_map: BTreeMap<String, RaftId>, + replicaset_map: BTreeMap<String, BTreeSet<RaftId>>, } impl Topology { @@ -23,6 +25,7 @@ impl Topology { max_raft_id: 0, instance_id_map: Default::default(), + replicaset_map: Default::default(), }; for peer in peers.drain(..) { @@ -37,6 +40,10 @@ impl Topology { self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id); self.instance_id_map.insert(peer.instance_id, peer.raft_id); + self.replicaset_map + .entry(peer.replicaset_id.clone()) + .or_default() + .insert(peer.raft_id); } fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> { @@ -44,9 +51,33 @@ impl Topology { self.peers.get(raft_id) } + fn choose_replicaset_id(&self) -> String { + let mut i = 0u64; + loop { + i += 1; + let replicaset_id = format!("r{i}"); + if self.replicaset_map.get(&replicaset_id).is_none() { + return replicaset_id; + } + } + } + pub fn process(&mut self, req: &JoinRequest) -> Result<(), String> { if let Some(peer) = self.peer_by_instance_id(&req.instance_id) { - // TODO check replicaset_id didn't change + match &req.replicaset_id { + Some(replicaset_id) if replicaset_id != &peer.replicaset_id => { + let e = format!( + std::concat!( + "{} already joined with a different replicaset_id,", + " requested: {},", + " existing: {}.", + ), + req.instance_id, replicaset_id, peer.replicaset_id + ); + return Err(e); + } + _ => (), + } let mut peer = peer.clone(); peer.peer_address = req.advertise_address.clone(); @@ -58,10 +89,15 @@ impl Topology { return Ok(()); } else { let raft_id = self.max_raft_id + 1; + let replicaset_id = match &req.replicaset_id { + Some(v) => v.clone(), + None => self.choose_replicaset_id(), + }; let peer = Peer { raft_id, instance_id: req.instance_id.clone(), + replicaset_id, commit_index: INVALID_INDEX, instance_uuid: instance_uuid(&req.instance_id), peer_address: req.advertise_address.clone(), @@ -91,6 +127,7 @@ mod tests { [ $( ( $raft_id:expr, $instance_id:literal, + $replicaset_id:literal, $peer_address:literal, $voter:literal ) ),* $(,)? ] => { @@ -100,6 +137,7 @@ mod tests { peer_address: $peer_address.into(), voter: $voter, instance_id: $instance_id.into(), + replicaset_id: $replicaset_id.into(), commit_index: raft::INVALID_INDEX, instance_uuid: instance_uuid($instance_id), } @@ -110,12 +148,13 @@ mod tests { macro_rules! req { ( $instance_id:literal, + $replicaset_id:expr, $advertise_address:literal, $voter:literal ) => { &JoinRequest { instance_id: $instance_id.into(), - replicaset_id: None, + replicaset_id: $replicaset_id.map(|v: &str| v.into()), advertise_address: $advertise_address.into(), voter: $voter, } @@ -139,28 +178,30 @@ mod tests { fn test_simple() { assert_eq!(Topology::from_peers(vec![]).diff(), vec![]); - let peers = peers![(1, "i1", "addr:1", true)]; + let peers = peers![(1, "i1", "R1", "addr:1", true)]; assert_eq!(Topology::from_peers(peers).diff(), vec![]); test!( init: peers![], req: [ - req!("i1", "nowhere", true), + req!("i1", None, "nowhere", true), + req!("i2", None, "nowhere", true), ], expected_diff: peers![ - (1, "i1", "nowhere", true), + (1, "i1", "r1", "nowhere", true), + (2, "i2", "r2", "nowhere", true), ] ); test!( init: peers![ - (1, "i1", "addr:1", true), + (1, "i1", "R1", "addr:1", true), ], req: [ - req!("i2", "addr:2", false), + req!("i2", Some("R2"), "addr:2", false), ], expected_diff: peers![ - (2, "i2", "addr:2", false), + (2, "i2", "R2", "addr:2", false), ] ); } @@ -169,13 +210,13 @@ mod tests { fn test_override() { test!( init: peers![ - (1, "i1", "addr:1", false), + (1, "i1", "R1", "addr:1", false), ], req: [ - req!("i1", "addr:2", true), + req!("i1", None, "addr:2", true), ], expected_diff: peers![ - (1, "i1", "addr:2", true), + (1, "i1", "R1", "addr:2", true), ] ); } @@ -185,12 +226,40 @@ mod tests { test!( init: peers![], req: [ - req!("i1", "addr:1", false), - req!("i1", "addr:2", true), + req!("i1", Some("R1"), "addr:1", false), + req!("i1", None, "addr:2", true), ], expected_diff: peers![ - (1, "i1", "addr:2", true), + (1, "i1", "R1", "addr:2", true), ] ); } + + #[test] + fn test_replicaset_mismatch() { + let expected_error = concat!( + "i3 already joined with a different replicaset_id,", + " requested: R-B,", + " existing: R-A.", + ); + + let peers = peers![(3, "i3", "R-A", "x:1", false)]; + let mut topology = Topology::from_peers(peers); + topology + .process(req!("i3", Some("R-B"), "x:2", true)) + .map_err(|e| assert_eq!(e, expected_error)) + .unwrap_err(); + assert_eq!(topology.diff(), vec![]); + + let peers = peers![(2, "i2", "R-A", "nowhere", true)]; + let mut topology = Topology::from_peers(peers); + topology + .process(req!("i3", Some("R-A"), "y:1", false)) + .unwrap(); + topology + .process(req!("i3", Some("R-B"), "y:2", true)) + .map_err(|e| assert_eq!(e, expected_error)) + .unwrap_err(); + assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]); + } }