diff --git a/src/traft/node.rs b/src/traft/node.rs index defb2e4723a42d5ed376cd09ae97aba0e727cfd7..efed75fc91ae1a75e1583bb9f0b3a70b09ec7323 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -624,7 +624,7 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor let term = Storage::term().unwrap().unwrap_or(0); let mut topology = match Storage::peers() { - Ok(v) => Topology::from_peers(v), + Ok(v) => Topology::from_peers(v).with_replication_factor(2), Err(e) => { for (_, notify) in batch { let e = RaftError::ConfChangeError(format!("{e}")); diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 1f31b138fe9744a48964db4a5d5a2aea0dacdc4d..3d59c7fc35e3a1e4fe167dba332ca054909a1c25 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -11,6 +11,7 @@ use raft::INVALID_INDEX; pub struct Topology { peers: BTreeMap<RaftId, Peer>, diff: BTreeMap<RaftId, Peer>, + replication_factor: u8, max_raft_id: RaftId, instance_id_map: BTreeMap<String, RaftId>, @@ -22,6 +23,7 @@ impl Topology { let mut ret = Self { peers: Default::default(), diff: Default::default(), + replication_factor: 2, max_raft_id: 0, instance_id_map: Default::default(), @@ -35,6 +37,11 @@ impl Topology { ret } + pub fn with_replication_factor(mut self, replication_factor: u8) -> Self { + self.replication_factor = replication_factor; + self + } + fn put_peer(&mut self, peer: Peer) { self.peers.insert(peer.raft_id, peer.clone()); @@ -52,6 +59,12 @@ impl Topology { } fn choose_replicaset_id(&self) -> String { + for (replicaset_id, peers) in self.replicaset_map.iter() { + if peers.len() < self.replication_factor as usize { + return replicaset_id.clone(); + } + } + let mut i = 0u64; loop { i += 1; @@ -163,11 +176,13 @@ mod tests { macro_rules! test { ( + replication_factor: $replication_factor:literal, init: $peers:expr, req: [ $( $req:expr ),* $(,)?], expected_diff: $expected:expr ) => { - let mut t = Topology::from_peers($peers); + let mut t = Topology::from_peers($peers) + .with_replication_factor($replication_factor); $( t.process($req).unwrap(); )* assert_eq!(t.diff(), $expected); @@ -182,6 +197,7 @@ mod tests { assert_eq!(Topology::from_peers(peers).diff(), vec![]); test!( + replication_factor: 1, init: peers![], req: [ req!("i1", None, "nowhere", true), @@ -194,6 +210,7 @@ mod tests { ); test!( + replication_factor: 1, init: peers![ (1, "i1", "R1", "addr:1", true), ], @@ -209,6 +226,7 @@ mod tests { #[test] fn test_override() { test!( + replication_factor: 1, init: peers![ (1, "i1", "R1", "addr:1", false), ], @@ -224,6 +242,7 @@ mod tests { #[test] fn test_batch_overlap() { test!( + replication_factor: 1, init: peers![], req: [ req!("i1", Some("R1"), "addr:1", false), @@ -262,4 +281,27 @@ mod tests { .unwrap_err(); assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]); } + + #[test] + fn test_replication_factor() { + test!( + replication_factor: 2, + init: peers![ + (9, "i9", "r9", "nowhere", false), + (10, "i9", "r9", "nowhere", false), + ], + req: [ + req!("i1", None, "addr:1", true), + req!("i2", None, "addr:2", false), + req!("i3", None, "addr:3", false), + req!("i4", None, "addr:4", false), + ], + expected_diff: peers![ + (11, "i1", "r1", "addr:1", true), + (12, "i2", "r1", "addr:2", false), + (13, "i3", "r2", "addr:3", false), + (14, "i4", "r2", "addr:4", false), + ] + ); + } }