diff --git a/src/main.rs b/src/main.rs index 809d4e7cf7c9d2e2c8ff668983213a8133097971..093d9f0b4874f2f50a95fb2164c211a0ee45f705 100644 --- a/src/main.rs +++ b/src/main.rs @@ -137,7 +137,7 @@ fn init_handlers() { declare_cfunc!(discovery::proc_discover); declare_cfunc!(traft::node::raft_interact); declare_cfunc!(traft::node::raft_join); - declare_cfunc!(traft::failover::raft_deactivate); + declare_cfunc!(traft::failover::raft_set_active); } fn rm_tarantool_files(data_dir: &str) { diff --git a/src/traft/failover.rs b/src/traft/failover.rs index ca68f1f44d5fb3132d69b928f38641455fb97e08..77c41fdaaff2958492e35e6154810d35c241ffa4 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -19,15 +19,14 @@ pub fn on_shutdown() { } let peer = Storage::peer_by_raft_id(raft_id).unwrap().unwrap(); - let req = SetActiveRequest { - instance_id: peer.instance_id, - cluster_id: Storage::cluster_id() + let req = SetActiveRequest::deactivate( + peer.instance_id, + Storage::cluster_id() .unwrap() .expect("cluster_id must be present"), - is_active: false, - }; + ); - let fn_name = stringify_cfunc!(raft_deactivate); + let fn_name = stringify_cfunc!(raft_set_active); // will run until we get successfully deactivate or tarantool shuts down // the on_shutdown fiber (after 3 secs) loop { @@ -51,7 +50,7 @@ pub fn on_shutdown() { } #[proc(packed_args)] -fn raft_deactivate(req: SetActiveRequest) -> Result<SetActiveResponse, Box<dyn std::error::Error>> { +fn raft_set_active(req: SetActiveRequest) -> Result<SetActiveResponse, Box<dyn std::error::Error>> { let node = node::global()?; let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 08d05ab053c3b3aeadc62849926dbab5f9f9aa5a..a28645cbf1f9541e33299f4d5142b0e8c3e0b7ee 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -13,6 +13,7 @@ use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::any::Any; use std::convert::TryFrom; +use std::fmt::Display; use uuid::Uuid; use protobuf::Message as _; @@ -143,12 +144,16 @@ pub struct Peer { /// Index in the raft log. `0` means it's not committed yet. pub commit_index: u64, - /// Is this instance active. Instances become inactive when they shut down. - pub is_active: bool, + /// The state of this instance's activity. + pub health: Health, } impl AsTuple for Peer {} -impl Peer {} +impl Peer { + pub fn is_active(&self) -> bool { + matches!(self.health, Health::Online) + } +} ////////////////////////////////////////////////////////////////////////////////////////// /// Serializable representation of `raft::prelude::Entry`. @@ -426,18 +431,69 @@ pub struct JoinResponse { } impl AsTuple for JoinResponse {} +/////////////////////////////////////////////////////////////////////////////// +/// [`SetActiveRequest`] kind tag +#[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)] +pub enum Health { + // Instance is active and is handling requests. + Online, + // Instance has gracefully shut down. + Offline, +} + +impl Health { + const fn to_str(&self) -> &str { + match self { + Self::Online => "Online", + Self::Offline => "Offline", + } + } +} + +impl Display for Health { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + f.write_str(self.to_str()) + } +} + +impl Default for Health { + fn default() -> Self { + Self::Offline + } +} + /////////////////////////////////////////////////////////////////////////////// /// Request to deactivate the instance. #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SetActiveRequest { + pub kind: Health, pub cluster_id: String, pub instance_id: String, - pub is_active: bool, } impl AsTuple for SetActiveRequest {} +impl SetActiveRequest { + #[inline] + pub fn activate(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { + Self { + kind: Health::Online, + instance_id: instance_id.into(), + cluster_id: cluster_id.into(), + } + } + + #[inline] + pub fn deactivate(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { + Self { + kind: Health::Offline, + instance_id: instance_id.into(), + cluster_id: cluster_id.into(), + } + } +} + /////////////////////////////////////////////////////////////////////////////// -/// Response to a [`DeactivateRequest`] +/// Response to a [`SetActiveRequest`] #[derive(Clone, Debug, Serialize, Deserialize)] pub struct SetActiveResponse { pub peer: Peer, diff --git a/src/traft/node.rs b/src/traft/node.rs index b8f580222bf794e1382aee02462f78640709115d..c614ece8a672cda2bb03055d35bc544feba980e2 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -608,10 +608,8 @@ fn raft_main_loop( }) => topology.join(instance_id, replicaset_id, advertise_address), TopologyRequest::SetActive(SetActiveRequest { - instance_id, - is_active: active, - .. - }) => topology.set_active(&instance_id, active), + instance_id, kind, .. + }) => topology.set_active(&instance_id, kind), }; let mut peer = match peer_result { @@ -904,7 +902,7 @@ fn raft_conf_change_loop( let peers: HashMap<RaftId, bool> = Storage::peers() .unwrap() .iter() - .map(|peer| (peer.raft_id, peer.is_active)) + .map(|peer| (peer.raft_id, peer.is_active())) .collect(); let mut changes: Vec<raft::ConfChangeSingle> = Vec::new(); diff --git a/src/traft/storage.rs b/src/traft/storage.rs index bbef207a9d584b25f0ed439af2db12f1f087cb36..0e5ddab29fbd1f140a886d4f56ea68891e0093f0 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -79,7 +79,7 @@ impl Storage { {name = 'replicaset_id', type = 'string', is_nullable = false}, {name = 'replicaset_uuid', type = 'string', is_nullable = false}, {name = 'commit_index', type = 'unsigned', is_nullable = false}, - {name = 'is_active', type = 'boolean', is_nullable = false}, + {name = 'is_active', type = 'string', is_nullable = false}, } }) @@ -545,8 +545,8 @@ inventory::submit!(crate::InnerTest { " Tarantool error:", " TupleFound:", " Duplicate key exists in unique index \"pk\" in space \"raft_state\"", - " with old tuple - [\"id\", 16]", - " and new tuple - [\"id\", 32]" + " with old tuple - [\"raft_id\", 16]", + " and new tuple - [\"raft_id\", 32]" ) ); @@ -572,17 +572,21 @@ inventory::submit!(crate::InnerTest { inventory::submit!(crate::InnerTest { name: "test_storage_peers", body: || { + use traft::Health::{Offline, Online}; + let mut raft_group = Storage::space(RAFT_GROUP).unwrap(); for peer in vec![ // r1 - ("i1", "i1-uuid", 1u64, "addr:1", "r1", "r1-uuid", 1u64, true), - ("i2", "i2-uuid", 2u64, "addr:2", "r1", "r1-uuid", 2, true), + ( + "i1", "i1-uuid", 1u64, "addr:1", "r1", "r1-uuid", 1u64, Online, + ), + ("i2", "i2-uuid", 2u64, "addr:2", "r1", "r1-uuid", 2, Online), // r2 - ("i3", "i3-uuid", 3u64, "addr:3", "r2", "r2-uuid", 10, true), - ("i4", "i4-uuid", 4u64, "addr:4", "r2", "r2-uuid", 10, true), + ("i3", "i3-uuid", 3u64, "addr:3", "r2", "r2-uuid", 10, Online), + ("i4", "i4-uuid", 4u64, "addr:4", "r2", "r2-uuid", 10, Online), // r3 - ("i5", "i5-uuid", 5u64, "addr:5", "r3", "r3-uuid", 10, true), + ("i5", "i5-uuid", 5u64, "addr:5", "r3", "r3-uuid", 10, Online), ] { raft_group.put(&peer).unwrap(); } @@ -599,16 +603,20 @@ inventory::submit!(crate::InnerTest { instance_id: "i99".into(), ..Default::default() }), - concat!( - "unknown error", - " Tarantool error:", - " TupleFound: Duplicate key exists", - " in unique index \"raft_id\"", - " in space \"raft_group\"", - " with old tuple", - " - [\"i1\", \"i1-uuid\", 1, \"addr:1\", \"r1\", \"r1-uuid\", 1, true]", - " and new tuple", - " - [\"i99\", \"\", 1, \"\", \"\", \"\", 0, false]" + format!( + concat!( + "unknown error", + " Tarantool error:", + " TupleFound: Duplicate key exists", + " in unique index \"raft_id\"", + " in space \"raft_group\"", + " with old tuple", + r#" - ["i1", "i1-uuid", 1, "addr:1", "r1", "r1-uuid", 1, "{on}"]"#, + " and new tuple", + r#" - ["i99", "", 1, "", "", "", 0, "{off}"]"#, + ), + on = Online, + off = Offline, ) ); diff --git a/src/traft/topology.rs b/src/traft/topology.rs index b9461cdd77ca6c09aba2267e07537a5451e6c85d..93c56e680ca48dd1c8d49719f55e83ffe67e7b4a 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -3,6 +3,7 @@ use std::collections::BTreeSet; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; +use crate::traft::Health; use crate::traft::Peer; use crate::traft::{InstanceId, RaftId, ReplicasetId}; @@ -88,7 +89,7 @@ impl Topology { if let Some(id) = instance_id.as_ref() { let existing_peer: Option<&Peer> = self.instance_map.get(id); - if matches!(existing_peer, Some(peer) if peer.is_active) { + if matches!(existing_peer, Some(peer) if peer.is_active()) { let e = format!("{} is already joined", id); return Err(e); } @@ -112,7 +113,7 @@ impl Topology { // Mark instance already active when it joins. // It prevents a disruption in case of the // instance_id collision. - is_active: true, + health: Health::Online, }; self.put_peer(peer.clone()); @@ -134,13 +135,13 @@ impl Topology { Ok(peer.clone()) } - pub fn set_active(&mut self, instance_id: &str, active: bool) -> Result<Peer, String> { + pub fn set_active(&mut self, instance_id: &str, health: Health) -> Result<Peer, String> { let mut peer = self .instance_map .get_mut(instance_id) .ok_or_else(|| format!("unknown instance {}", instance_id))?; - peer.is_active = active; + peer.health = health; Ok(peer.clone()) } } @@ -165,22 +166,20 @@ mod tests { use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; + use crate::traft::Health::{Offline, Online}; use crate::traft::Peer; use pretty_assertions::assert_eq; - const INACTIVE: bool = false; - const ACTIVE: bool = true; - macro_rules! peers { [ $( ( $raft_id:expr, $instance_id:literal, $replicaset_id:literal, $peer_address:literal, - $is_active:expr $(,)? + $health:expr $(,)? ) ),* $(,)? ] => { vec![$( - peer!($raft_id, $instance_id, $replicaset_id, $peer_address, $is_active) + peer!($raft_id, $instance_id, $replicaset_id, $peer_address, $health) ),*] }; } @@ -191,7 +190,7 @@ mod tests { $instance_id:literal, $replicaset_id:literal, $peer_address:literal, - $is_active:expr $(,)? + $health:expr $(,)? ) => { Peer { raft_id: $raft_id, @@ -201,7 +200,7 @@ mod tests { instance_uuid: instance_uuid($instance_id), replicaset_uuid: replicaset_uuid($replicaset_id), commit_index: raft::INVALID_INDEX, - is_active: $is_active, + health: $health, } }; } @@ -227,38 +226,38 @@ mod tests { assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - peer!(1, "i1", "r1", "addr:1", ACTIVE) + peer!(1, "i1", "r1", "addr:1", Online) ); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - peer!(2, "i2", "r2", "addr:1", ACTIVE) + peer!(2, "i2", "r2", "addr:1", Online) ); assert_eq!( join!(topology, None, Some("R3"), "addr:1").unwrap(), - peer!(3, "i3", "R3", "addr:1", ACTIVE) + peer!(3, "i3", "R3", "addr:1", Online) ); assert_eq!( join!(topology, Some("I4"), None, "addr:1").unwrap(), - peer!(4, "I4", "r3", "addr:1", ACTIVE) + peer!(4, "I4", "r3", "addr:1", Online) ); - let mut topology = Topology::from_peers(peers![(1, "i1", "r1", "addr:1", ACTIVE)]) + let mut topology = Topology::from_peers(peers![(1, "i1", "r1", "addr:1", Online)]) .with_replication_factor(1); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - peer!(2, "i2", "r2", "addr:1", ACTIVE) + peer!(2, "i2", "r2", "addr:1", Online) ); } #[test] fn test_override_existing() { let mut topology = Topology::from_peers(peers![ - (1, "i1", "R1", "active:1", ACTIVE), - (2, "i2", "R2", "inactive:1", INACTIVE), + (1, "i1", "R1", "active:1", Online), + (2, "i2", "R2", "inactive:1", Offline), ]); assert_eq!( @@ -270,7 +269,7 @@ mod tests { assert_eq!( join!(topology, Some("i2"), None, "inactive:2").unwrap(), - peer!(3, "i2", "R1", "inactive:2", ACTIVE), + peer!(3, "i2", "R1", "inactive:2", Online), ); } @@ -280,7 +279,7 @@ mod tests { assert_eq!( join!(topology, Some("i1"), Some("R1"), "addr:1").unwrap(), - peer!(1, "i1", "R1", "addr:1", ACTIVE), + peer!(1, "i1", "R1", "addr:1", Online), ); assert_eq!( @@ -293,7 +292,7 @@ mod tests { #[test] fn test_replicaset_mismatch() { - let mut topology = Topology::from_peers(peers![(3, "i3", "R-A", "x:1", ACTIVE)]); + let mut topology = Topology::from_peers(peers![(3, "i3", "R-A", "x:1", Online)]); assert_eq!( join!(topology, Some("i3"), Some("R-B"), "x:2") .unwrap_err() @@ -301,10 +300,10 @@ mod tests { "i3 is already joined", ); - let mut topology = Topology::from_peers(vec![peer!(2, "i2", "R-A", "nowhere", ACTIVE)]); + let mut topology = Topology::from_peers(vec![peer!(2, "i2", "R-A", "nowhere", Online)]); assert_eq!( join!(topology, Some("i3"), Some("R-A"), "y:1").unwrap(), - peer!(3, "i3", "R-A", "y:1", ACTIVE), + peer!(3, "i3", "R-A", "y:1", Online), ); assert_eq!( join!(topology, Some("i3"), Some("R-B"), "y:2") @@ -317,57 +316,57 @@ mod tests { #[test] fn test_replication_factor() { let mut topology = Topology::from_peers(peers![ - (9, "i9", "r9", "nowhere", ACTIVE), - (10, "i10", "r9", "nowhere", ACTIVE), + (9, "i9", "r9", "nowhere", Online), + (10, "i10", "r9", "nowhere", Online), ]) .with_replication_factor(2); assert_eq!( join!(topology, Some("i1"), None, "addr:1").unwrap(), - peer!(11, "i1", "r1", "addr:1", ACTIVE), + peer!(11, "i1", "r1", "addr:1", Online), ); assert_eq!( join!(topology, Some("i2"), None, "addr:2").unwrap(), - peer!(12, "i2", "r1", "addr:2", ACTIVE), + peer!(12, "i2", "r1", "addr:2", Online), ); assert_eq!( join!(topology, Some("i3"), None, "addr:3").unwrap(), - peer!(13, "i3", "r2", "addr:3", ACTIVE), + peer!(13, "i3", "r2", "addr:3", Online), ); assert_eq!( join!(topology, Some("i4"), None, "addr:4").unwrap(), - peer!(14, "i4", "r2", "addr:4", ACTIVE), + peer!(14, "i4", "r2", "addr:4", Online), ); } #[test] fn test_set_active() { let mut topology = Topology::from_peers(peers![ - (1, "i1", "r1", "nowhere", ACTIVE), - (2, "i2", "r2", "nowhere", INACTIVE), + (1, "i1", "r1", "nowhere", Online), + (2, "i2", "r2", "nowhere", Offline), ]) .with_replication_factor(1); assert_eq!( - topology.set_active("i1", INACTIVE).unwrap(), - peer!(1, "i1", "r1", "nowhere", INACTIVE), + topology.set_active("i1", Offline).unwrap(), + peer!(1, "i1", "r1", "nowhere", Offline), ); // idempotency assert_eq!( - topology.set_active("i1", INACTIVE).unwrap(), - peer!(1, "i1", "r1", "nowhere", INACTIVE), + topology.set_active("i1", Offline).unwrap(), + peer!(1, "i1", "r1", "nowhere", Offline), ); assert_eq!( - topology.set_active("i2", ACTIVE).unwrap(), - peer!(2, "i2", "r2", "nowhere", ACTIVE), + topology.set_active("i2", Online).unwrap(), + peer!(2, "i2", "r2", "nowhere", Online), ); // idempotency assert_eq!( - topology.set_active("i2", ACTIVE).unwrap(), - peer!(2, "i2", "r2", "nowhere", ACTIVE), + topology.set_active("i2", Online).unwrap(), + peer!(2, "i2", "r2", "nowhere", Online), ); } }