From 44804d0112b28eab3ef5de6942e257b888a032c7 Mon Sep 17 00:00:00 2001 From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io> Date: Wed, 15 Jun 2022 02:57:38 +0300 Subject: [PATCH] feat: rebootstrap Follower in a cluster of 3+ --- src/traft/node.rs | 28 +++++++++++++- src/traft/storage.rs | 15 ++++++++ src/traft/topology.rs | 79 +++++++++++++++++++++++++++++++++++----- test/int/test_joining.py | 18 ++++++++- 4 files changed, 127 insertions(+), 13 deletions(-) diff --git a/src/traft/node.rs b/src/traft/node.rs index 883cf98d95..122c7cf5f0 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -24,6 +24,7 @@ use std::time::Instant; use thiserror::Error; use crate::traft::ContextCoercion as _; +use crate::traft::Peer; use ::tarantool::util::IntoClones as _; use protobuf::Message as _; use protobuf::ProtobufEnum as _; @@ -90,6 +91,7 @@ enum NormalRequest { ProposeConfChange { term: u64, peers: Vec<traft::Peer>, + to_replace: Vec<(u64, traft::Peer)>, notify: Notify, }, @@ -293,6 +295,7 @@ fn raft_main_loop( NormalRequest::ProposeConfChange { term, peers, + to_replace, notify, } => { // In some states proposing a ConfChange is impossible. 
@@ -328,6 +331,7 @@ fn raft_main_loop( } let mut changes = Vec::with_capacity(peers.len()); + let mut new_peers: Vec<Peer> = Vec::new(); for peer in &peers { let change_type = match peer.voter { true => raft::ConfChangeType::AddNode, @@ -338,6 +342,25 @@ fn raft_main_loop( node_id: peer.raft_id, ..Default::default() }); + new_peers.push(peer.clone()); + } + + for (old_raft_id, peer) in &to_replace { + changes.push(raft::ConfChangeSingle { + change_type: raft::ConfChangeType::RemoveNode, + node_id: *old_raft_id, + ..Default::default() + }); + let change_type = match peer.voter { + true => raft::ConfChangeType::AddNode, + false => raft::ConfChangeType::AddLearnerNode, + }; + changes.push(raft::ConfChangeSingle { + change_type, + node_id: peer.raft_id, + ..Default::default() + }); + new_peers.push(peer.clone()); } let cc = raft::ConfChangeV2 { @@ -346,7 +369,7 @@ fn raft_main_loop( ..Default::default() }; - let ctx = traft::EntryContextConfChange { peers }.to_bytes(); + let ctx = traft::EntryContextConfChange { peers: new_peers }.to_bytes(); let prev_index = raw_node.raft.raft_log.last_index(); if let Err(e) = raw_node.propose_conf_change(ctx, cc) { @@ -485,7 +508,7 @@ fn raft_main_loop( commit_index: entry.index, ..peer.clone() }; - Storage::persist_peer(&peer).unwrap(); + Storage::persist_peer_by_instance_id(&peer).unwrap(); pool.connect(peer.raft_id, peer.peer_address); } @@ -649,6 +672,7 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor main_inbox.send(NormalRequest::ProposeConfChange { term, peers: topology.diff(), + to_replace: topology.to_replace(), notify: tx, }); diff --git a/src/traft/storage.rs b/src/traft/storage.rs index e5fd49c24c..a23956ac76 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -273,6 +273,21 @@ impl Storage { Ok(()) } + pub fn persist_peer_by_instance_id(peer: &traft::Peer) -> Result<(), StorageError> { + if let Some(peer) = Self::peer_by_instance_id(&peer.instance_id)? 
{ + Self::delete_peer(peer.raft_id)?; + } + Self::persist_peer(peer) + } + + pub fn delete_peer(raft_id: u64) -> Result<(), StorageError> { + Storage::space(RAFT_GROUP)? + .delete(&[raft_id]) + .map_err(box_err!())?; + + Ok(()) + } + pub fn entries(low: u64, high: u64) -> Result<Vec<raft::Entry>, StorageError> { // idx \in [low, high) let mut ret: Vec<raft::Entry> = vec![]; diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 49f71bafdc..b4433dd3c4 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -12,6 +12,7 @@ use raft::INVALID_INDEX; pub struct Topology { peers: BTreeMap<RaftId, Peer>, diff: BTreeMap<RaftId, Peer>, + to_replace: BTreeMap<RaftId, (RaftId, Peer)>, replication_factor: u8, max_raft_id: RaftId, @@ -24,6 +25,7 @@ impl Topology { let mut ret = Self { peers: Default::default(), diff: Default::default(), + to_replace: Default::default(), replication_factor: 2, max_raft_id: 0, @@ -97,7 +99,15 @@ impl Topology { peer.peer_address = req.advertise_address.clone(); peer.voter = req.voter; - self.diff.insert(peer.raft_id, peer.clone()); + if req.voter { + self.diff.insert(peer.raft_id, peer.clone()); + } else { + let old_raft_id = peer.raft_id; + peer.raft_id = self.max_raft_id + 1; + + self.to_replace + .insert(peer.raft_id, (old_raft_id, peer.clone())); + } self.put_peer(peer); return Ok(()); @@ -127,8 +137,12 @@ impl Topology { Ok(()) } - pub fn diff(self) -> Vec<Peer> { - self.diff.into_values().collect() + pub fn diff(&self) -> Vec<Peer> { + self.diff.clone().into_values().collect() + } + + pub fn to_replace(&self) -> Vec<(RaftId, Peer)> { + self.to_replace.clone().into_values().collect() } } @@ -163,6 +177,27 @@ mod tests { }; } + macro_rules! 
peer { + ( + $raft_id:expr, + $instance_id:literal, + $replicaset_id:literal, + $peer_address:literal, + $voter:literal + ) => { + Peer { + raft_id: $raft_id, + peer_address: $peer_address.into(), + voter: $voter, + instance_id: $instance_id.into(), + replicaset_id: $replicaset_id.into(), + instance_uuid: instance_uuid($instance_id), + replicaset_uuid: replicaset_uuid($replicaset_id), + commit_index: raft::INVALID_INDEX, + } + }; + } + macro_rules! req { ( $instance_id:literal, @@ -185,13 +220,15 @@ mod tests { replication_factor: $replication_factor:literal, init: $peers:expr, req: [ $( $req:expr ),* $(,)?], - expected_diff: $expected:expr + expected_diff: $expected:expr, + expected_to_replace: $expected_to_replace:expr, ) => { let mut t = Topology::from_peers($peers) .with_replication_factor($replication_factor); $( t.process($req).unwrap(); )* assert_eq!(t.diff(), $expected); + assert_eq!(t.to_replace(), $expected_to_replace); }; } @@ -212,7 +249,8 @@ mod tests { expected_diff: peers![ (1, "i1", "r1", "nowhere", true), (2, "i2", "r2", "nowhere", true), - ] + ], + expected_to_replace: vec![], ); test!( @@ -225,7 +263,8 @@ mod tests { ], expected_diff: peers![ (2, "i2", "R2", "addr:2", false), - ] + ], + expected_to_replace: vec![], ); } @@ -241,7 +280,8 @@ mod tests { ], expected_diff: peers![ (1, "i1", "R1", "addr:2", true), - ] + ], + expected_to_replace: vec![], ); } @@ -256,7 +296,8 @@ mod tests { ], expected_diff: peers![ (1, "i1", "R1", "addr:2", true), - ] + ], + expected_to_replace: vec![], ); } @@ -275,6 +316,7 @@ mod tests { .map_err(|e| assert_eq!(e, expected_error)) .unwrap_err(); assert_eq!(topology.diff(), vec![]); + assert_eq!(topology.to_replace(), vec![]); let peers = peers![(2, "i2", "R-A", "nowhere", true)]; let mut topology = Topology::from_peers(peers); @@ -286,6 +328,7 @@ mod tests { .map_err(|e| assert_eq!(e, expected_error)) .unwrap_err(); assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]); + 
assert_eq!(topology.to_replace(), vec![]); } #[test] @@ -307,7 +350,25 @@ mod tests { (12, "i2", "r1", "addr:2", false), (13, "i3", "r2", "addr:3", false), (14, "i4", "r2", "addr:4", false), - ] + ], + expected_to_replace: vec![], + ); + } + + #[test] + fn test_replace() { + test!( + replication_factor: 2, + init: peers![ + (1, "i1", "r1", "nowhere", false), + ], + req: [ + req!("i1", None, "addr:2", false), + ], + expected_diff: peers![], + expected_to_replace: vec![ + (1, peer!(2, "i1", "r1", "addr:2", false)), + ], ); } } diff --git a/test/int/test_joining.py b/test/int/test_joining.py index 56a9f8d982..c4e437c193 100644 --- a/test/int/test_joining.py +++ b/test/int/test_joining.py @@ -135,13 +135,13 @@ def test_uuids(cluster2: Cluster): timeout_seconds=1, ) - # Two consequent requests must obtain same raft_id and instance_id + # Two consecutive requests must obtain the same instance_id but different raft_id fake_peer_1 = join()[0]["peer"] fake_peer_2 = join()[0]["peer"] assert fake_peer_1["instance_id"] == "fake" assert fake_peer_2["instance_id"] == "fake" - assert fake_peer_1["raft_id"] == fake_peer_2["raft_id"] + assert fake_peer_2["raft_id"] == fake_peer_1["raft_id"] + 1 assert fake_peer_1["instance_uuid"] == fake_peer_2["instance_uuid"] @@ -248,3 +248,17 @@ def test_cluster_id_mismatch(instance: Instance): instance_id="whatever", timeout_seconds=1, ) + + +def test_rebootstrap_follower(cluster3: Cluster): + # Scenario: rebootstrap a follower in a cluster of 3+ + # Given a cluster of 3 instances + # When i3 is down + # And i3 data dir is removed + # And i3 is started with the same command-line arguments as the first time + # Then i3 should become a follower + + i1, i2, i3 = cluster3.instances + i3.restart(remove_data=True) + i3.wait_ready() + i3.assert_raft_status("Follower") -- GitLab