Skip to content
Snippets Groups Projects
Verified Commit c47cc10f authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

feature: support replication_factor

Address `replication_factor` when choosing `relicaset_id` for a new
instance. It dosn't consider `failure_domain` yet, but takes into
account the number of instances.

Close https://git.picodata.io/picodata/picodata/picodata/-/issues/68
parent 00b2749e
No related branches found
No related tags found
1 merge request!109feature: support replication_factor
Pipeline #4982 passed
......@@ -624,7 +624,7 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor
let term = Storage::term().unwrap().unwrap_or(0);
let mut topology = match Storage::peers() {
Ok(v) => Topology::from_peers(v),
Ok(v) => Topology::from_peers(v).with_replication_factor(2),
Err(e) => {
for (_, notify) in batch {
let e = RaftError::ConfChangeError(format!("{e}"));
......
......@@ -11,6 +11,7 @@ use raft::INVALID_INDEX;
pub struct Topology {
peers: BTreeMap<RaftId, Peer>,
diff: BTreeMap<RaftId, Peer>,
replication_factor: u8,
max_raft_id: RaftId,
instance_id_map: BTreeMap<String, RaftId>,
......@@ -22,6 +23,7 @@ impl Topology {
let mut ret = Self {
peers: Default::default(),
diff: Default::default(),
replication_factor: 2,
max_raft_id: 0,
instance_id_map: Default::default(),
......@@ -35,6 +37,11 @@ impl Topology {
ret
}
pub fn with_replication_factor(mut self, replication_factor: u8) -> Self {
self.replication_factor = replication_factor;
self
}
fn put_peer(&mut self, peer: Peer) {
self.peers.insert(peer.raft_id, peer.clone());
......@@ -52,6 +59,12 @@ impl Topology {
}
fn choose_replicaset_id(&self) -> String {
for (replicaset_id, peers) in self.replicaset_map.iter() {
if peers.len() < self.replication_factor as usize {
return replicaset_id.clone();
}
}
let mut i = 0u64;
loop {
i += 1;
......@@ -163,11 +176,13 @@ mod tests {
macro_rules! test {
(
replication_factor: $replication_factor:literal,
init: $peers:expr,
req: [ $( $req:expr ),* $(,)?],
expected_diff: $expected:expr
) => {
let mut t = Topology::from_peers($peers);
let mut t = Topology::from_peers($peers)
.with_replication_factor($replication_factor);
$( t.process($req).unwrap(); )*
assert_eq!(t.diff(), $expected);
......@@ -182,6 +197,7 @@ mod tests {
assert_eq!(Topology::from_peers(peers).diff(), vec![]);
test!(
replication_factor: 1,
init: peers![],
req: [
req!("i1", None, "nowhere", true),
......@@ -194,6 +210,7 @@ mod tests {
);
test!(
replication_factor: 1,
init: peers![
(1, "i1", "R1", "addr:1", true),
],
......@@ -209,6 +226,7 @@ mod tests {
#[test]
fn test_override() {
test!(
replication_factor: 1,
init: peers![
(1, "i1", "R1", "addr:1", false),
],
......@@ -224,6 +242,7 @@ mod tests {
#[test]
fn test_batch_overlap() {
test!(
replication_factor: 1,
init: peers![],
req: [
req!("i1", Some("R1"), "addr:1", false),
......@@ -262,4 +281,27 @@ mod tests {
.unwrap_err();
assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]);
}
#[test]
fn test_replication_factor() {
test!(
replication_factor: 2,
init: peers![
(9, "i9", "r9", "nowhere", false),
(10, "i9", "r9", "nowhere", false),
],
req: [
req!("i1", None, "addr:1", true),
req!("i2", None, "addr:2", false),
req!("i3", None, "addr:3", false),
req!("i4", None, "addr:4", false),
],
expected_diff: peers![
(11, "i1", "r1", "addr:1", true),
(12, "i2", "r1", "addr:2", false),
(13, "i3", "r2", "addr:3", false),
(14, "i4", "r2", "addr:4", false),
]
);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment