Skip to content
Snippets Groups Projects
Verified Commit 00b2749e authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

feature: support replicaset_id

- Choose it in the topology module if it's not provided in a
  `JoinRequest`.
- Persist it in the `raft_group` space.
- Respond with an error if a `JoinRequest` from an already joined instance
  contains a different `replicaset_id`.
- In `JoinResponse` it's transferred automatically.

Part of https://git.picodata.io/picodata/picodata/picodata/-/issues/51
parent 19f1f622
No related branches found
No related tags found
1 merge request!108feature: support replicaset_id
Pipeline #4980 passed
...@@ -69,7 +69,7 @@ pub struct Peer { ...@@ -69,7 +69,7 @@ pub struct Peer {
pub peer_address: String, pub peer_address: String,
pub voter: bool, pub voter: bool,
pub instance_id: String, pub instance_id: String,
// pub replicaset_id: String, pub replicaset_id: String,
pub instance_uuid: String, pub instance_uuid: String,
// pub replicaset_uuid: String, // pub replicaset_uuid: String,
/// `0` means it's not committed yet. /// `0` means it's not committed yet.
......
...@@ -76,7 +76,7 @@ impl Storage { ...@@ -76,7 +76,7 @@ impl Storage {
{name = 'peer_address', type = 'string', is_nullable = false}, {name = 'peer_address', type = 'string', is_nullable = false},
{name = 'voter', type = 'boolean', is_nullable = false}, {name = 'voter', type = 'boolean', is_nullable = false},
{name = 'instance_id', type = 'string', is_nullable = false}, {name = 'instance_id', type = 'string', is_nullable = false},
-- {name = 'replicaset_id', type = 'string', is_nullable = false}, {name = 'replicaset_id', type = 'string', is_nullable = false},
{name = 'instance_uuid', type = 'string', is_nullable = false}, {name = 'instance_uuid', type = 'string', is_nullable = false},
-- {name = 'replicaset_uuid', type = 'string', is_nullable = false}, -- {name = 'replicaset_uuid', type = 'string', is_nullable = false},
{name = 'commit_index', type = 'unsigned', is_nullable = false}, {name = 'commit_index', type = 'unsigned', is_nullable = false},
......
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::BTreeSet;
use crate::traft::instance_uuid; use crate::traft::instance_uuid;
use crate::traft::JoinRequest; use crate::traft::JoinRequest;
...@@ -13,6 +14,7 @@ pub struct Topology { ...@@ -13,6 +14,7 @@ pub struct Topology {
max_raft_id: RaftId, max_raft_id: RaftId,
instance_id_map: BTreeMap<String, RaftId>, instance_id_map: BTreeMap<String, RaftId>,
replicaset_map: BTreeMap<String, BTreeSet<RaftId>>,
} }
impl Topology { impl Topology {
...@@ -23,6 +25,7 @@ impl Topology { ...@@ -23,6 +25,7 @@ impl Topology {
max_raft_id: 0, max_raft_id: 0,
instance_id_map: Default::default(), instance_id_map: Default::default(),
replicaset_map: Default::default(),
}; };
for peer in peers.drain(..) { for peer in peers.drain(..) {
...@@ -37,6 +40,10 @@ impl Topology { ...@@ -37,6 +40,10 @@ impl Topology {
self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id); self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id);
self.instance_id_map.insert(peer.instance_id, peer.raft_id); self.instance_id_map.insert(peer.instance_id, peer.raft_id);
self.replicaset_map
.entry(peer.replicaset_id.clone())
.or_default()
.insert(peer.raft_id);
} }
fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> { fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> {
...@@ -44,9 +51,33 @@ impl Topology { ...@@ -44,9 +51,33 @@ impl Topology {
self.peers.get(raft_id) self.peers.get(raft_id)
} }
/// Picks a replicaset id that is not yet present in `replicaset_map`.
///
/// Candidates are generated as `r1`, `r2`, `r3`, ... and the first one
/// that is not a key of `self.replicaset_map` is returned.
fn choose_replicaset_id(&self) -> String {
    let mut seq = 1u64;
    loop {
        let candidate = format!("r{seq}");
        if !self.replicaset_map.contains_key(&candidate) {
            return candidate;
        }
        // The candidate is taken, try the next number.
        seq += 1;
    }
}
pub fn process(&mut self, req: &JoinRequest) -> Result<(), String> { pub fn process(&mut self, req: &JoinRequest) -> Result<(), String> {
if let Some(peer) = self.peer_by_instance_id(&req.instance_id) { if let Some(peer) = self.peer_by_instance_id(&req.instance_id) {
// TODO check replicaset_id didn't change match &req.replicaset_id {
Some(replicaset_id) if replicaset_id != &peer.replicaset_id => {
let e = format!(
std::concat!(
"{} already joined with a different replicaset_id,",
" requested: {},",
" existing: {}.",
),
req.instance_id, replicaset_id, peer.replicaset_id
);
return Err(e);
}
_ => (),
}
let mut peer = peer.clone(); let mut peer = peer.clone();
peer.peer_address = req.advertise_address.clone(); peer.peer_address = req.advertise_address.clone();
...@@ -58,10 +89,15 @@ impl Topology { ...@@ -58,10 +89,15 @@ impl Topology {
return Ok(()); return Ok(());
} else { } else {
let raft_id = self.max_raft_id + 1; let raft_id = self.max_raft_id + 1;
let replicaset_id = match &req.replicaset_id {
Some(v) => v.clone(),
None => self.choose_replicaset_id(),
};
let peer = Peer { let peer = Peer {
raft_id, raft_id,
instance_id: req.instance_id.clone(), instance_id: req.instance_id.clone(),
replicaset_id,
commit_index: INVALID_INDEX, commit_index: INVALID_INDEX,
instance_uuid: instance_uuid(&req.instance_id), instance_uuid: instance_uuid(&req.instance_id),
peer_address: req.advertise_address.clone(), peer_address: req.advertise_address.clone(),
...@@ -91,6 +127,7 @@ mod tests { ...@@ -91,6 +127,7 @@ mod tests {
[ $( ( [ $( (
$raft_id:expr, $raft_id:expr,
$instance_id:literal, $instance_id:literal,
$replicaset_id:literal,
$peer_address:literal, $peer_address:literal,
$voter:literal $voter:literal
) ),* $(,)? ] => { ) ),* $(,)? ] => {
...@@ -100,6 +137,7 @@ mod tests { ...@@ -100,6 +137,7 @@ mod tests {
peer_address: $peer_address.into(), peer_address: $peer_address.into(),
voter: $voter, voter: $voter,
instance_id: $instance_id.into(), instance_id: $instance_id.into(),
replicaset_id: $replicaset_id.into(),
commit_index: raft::INVALID_INDEX, commit_index: raft::INVALID_INDEX,
instance_uuid: instance_uuid($instance_id), instance_uuid: instance_uuid($instance_id),
} }
...@@ -110,12 +148,13 @@ mod tests { ...@@ -110,12 +148,13 @@ mod tests {
macro_rules! req { macro_rules! req {
( (
$instance_id:literal, $instance_id:literal,
$replicaset_id:expr,
$advertise_address:literal, $advertise_address:literal,
$voter:literal $voter:literal
) => { ) => {
&JoinRequest { &JoinRequest {
instance_id: $instance_id.into(), instance_id: $instance_id.into(),
replicaset_id: None, replicaset_id: $replicaset_id.map(|v: &str| v.into()),
advertise_address: $advertise_address.into(), advertise_address: $advertise_address.into(),
voter: $voter, voter: $voter,
} }
...@@ -139,28 +178,30 @@ mod tests { ...@@ -139,28 +178,30 @@ mod tests {
fn test_simple() { fn test_simple() {
assert_eq!(Topology::from_peers(vec![]).diff(), vec![]); assert_eq!(Topology::from_peers(vec![]).diff(), vec![]);
let peers = peers![(1, "i1", "addr:1", true)]; let peers = peers![(1, "i1", "R1", "addr:1", true)];
assert_eq!(Topology::from_peers(peers).diff(), vec![]); assert_eq!(Topology::from_peers(peers).diff(), vec![]);
test!( test!(
init: peers![], init: peers![],
req: [ req: [
req!("i1", "nowhere", true), req!("i1", None, "nowhere", true),
req!("i2", None, "nowhere", true),
], ],
expected_diff: peers![ expected_diff: peers![
(1, "i1", "nowhere", true), (1, "i1", "r1", "nowhere", true),
(2, "i2", "r2", "nowhere", true),
] ]
); );
test!( test!(
init: peers![ init: peers![
(1, "i1", "addr:1", true), (1, "i1", "R1", "addr:1", true),
], ],
req: [ req: [
req!("i2", "addr:2", false), req!("i2", Some("R2"), "addr:2", false),
], ],
expected_diff: peers![ expected_diff: peers![
(2, "i2", "addr:2", false), (2, "i2", "R2", "addr:2", false),
] ]
); );
} }
...@@ -169,13 +210,13 @@ mod tests { ...@@ -169,13 +210,13 @@ mod tests {
fn test_override() { fn test_override() {
test!( test!(
init: peers![ init: peers![
(1, "i1", "addr:1", false), (1, "i1", "R1", "addr:1", false),
], ],
req: [ req: [
req!("i1", "addr:2", true), req!("i1", None, "addr:2", true),
], ],
expected_diff: peers![ expected_diff: peers![
(1, "i1", "addr:2", true), (1, "i1", "R1", "addr:2", true),
] ]
); );
} }
...@@ -185,12 +226,40 @@ mod tests { ...@@ -185,12 +226,40 @@ mod tests {
test!( test!(
init: peers![], init: peers![],
req: [ req: [
req!("i1", "addr:1", false), req!("i1", Some("R1"), "addr:1", false),
req!("i1", "addr:2", true), req!("i1", None, "addr:2", true),
], ],
expected_diff: peers![ expected_diff: peers![
(1, "i1", "addr:2", true), (1, "i1", "R1", "addr:2", true),
] ]
); );
} }
/// A join request for an already known instance must be rejected when it
/// carries a `replicaset_id` different from the stored one, and the
/// rejected request must not show up in the topology diff.
#[test]
fn test_replicaset_mismatch() {
    let expected_error = concat!(
        "i3 already joined with a different replicaset_id,",
        " requested: R-B,",
        " existing: R-A.",
    );

    // Case 1: the instance is part of the initial peer list.
    let initial = peers![(3, "i3", "R-A", "x:1", false)];
    let mut topology = Topology::from_peers(initial);
    let err = topology
        .process(req!("i3", Some("R-B"), "x:2", true))
        .unwrap_err();
    assert_eq!(err, expected_error);
    // Nothing changed, so the diff stays empty.
    assert_eq!(topology.diff(), vec![]);

    // Case 2: the instance joined via a request earlier in the same batch.
    let initial = peers![(2, "i2", "R-A", "nowhere", true)];
    let mut topology = Topology::from_peers(initial);
    topology
        .process(req!("i3", Some("R-A"), "y:1", false))
        .unwrap();
    let err = topology
        .process(req!("i3", Some("R-B"), "y:2", true))
        .unwrap_err();
    assert_eq!(err, expected_error);
    // Only the first (accepted) request is reflected in the diff.
    assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]);
}
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment