From 835e2d38f6b3ec152890b49579b4549c0a36e2b4 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Fri, 2 Dec 2022 16:04:06 +0300 Subject: [PATCH] refactor: rename Peer -> Instance --- src/governor/cc.rs | 62 ++++---- src/main.rs | 70 +++++---- src/on_shutdown.rs | 16 +- src/storage.rs | 243 ++++++++++++++-------------- src/traft/error.rs | 12 +- src/traft/mod.rs | 48 +++--- src/traft/network.rs | 51 +++--- src/traft/node.rs | 279 +++++++++++++++++---------------- src/traft/rpc/expel.rs | 8 +- src/traft/rpc/join.rs | 12 +- src/traft/rpc/mod.rs | 2 +- src/traft/rpc/replication.rs | 10 +- src/traft/rpc/sharding.rs | 2 +- src/traft/rpc/update_peer.rs | 81 ---------- src/traft/topology.rs | 210 ++++++++++++------------- test/conftest.py | 4 +- test/int/test_basics.py | 26 +-- test/int/test_expelling.py | 18 +-- test/int/test_joining.py | 44 +++--- test/int/test_replication.py | 8 +- test/int/test_shutdown.py | 4 +- test/int/test_uninitialized.py | 4 +- 22 files changed, 582 insertions(+), 632 deletions(-) delete mode 100644 src/traft/rpc/update_peer.rs diff --git a/src/governor/cc.rs b/src/governor/cc.rs index 7f174b7331..b3ec03315c 100644 --- a/src/governor/cc.rs +++ b/src/governor/cc.rs @@ -5,12 +5,12 @@ use std::collections::BTreeMap; use std::collections::BTreeSet; use crate::traft::CurrentGradeVariant; -use crate::traft::Peer; +use crate::traft::Instance; use crate::traft::RaftId; use crate::traft::TargetGradeVariant; struct RaftConf<'a> { - all: BTreeMap<RaftId, &'a Peer>, + all: BTreeMap<RaftId, &'a Instance>, voters: BTreeSet<RaftId>, learners: BTreeSet<RaftId>, } @@ -48,22 +48,23 @@ impl<'a> RaftConf<'a> { } pub(crate) fn raft_conf_change( - peers: &[Peer], + instances: &[Instance], voters: &[RaftId], learners: &[RaftId], ) -> Option<raft::ConfChangeV2> { let mut raft_conf = RaftConf { - all: peers.iter().map(|p| (p.raft_id, p)).collect(), + all: instances.iter().map(|p| (p.raft_id, p)).collect(), voters: voters.iter().cloned().collect(), learners: learners.iter().cloned().collect(), }; let mut changes: Vec<raft::ConfChangeSingle> = vec![]; - let not_expelled = |peer: &&Peer| !peer.is_expelled(); - let target_online = |peer: &&Peer| peer.target_grade == TargetGradeVariant::Online; - let current_online = |peer: &&Peer| peer.current_grade == CurrentGradeVariant::Online; + let not_expelled = |instance: &&Instance| !instance.is_expelled(); + let target_online = |instance: &&Instance| instance.target_grade == TargetGradeVariant::Online; + let current_online = + |instance: &&Instance| instance.current_grade == CurrentGradeVariant::Online; - let cluster_size = peers.iter().filter(not_expelled).count(); + let cluster_size = instances.iter().filter(not_expelled).count(); let voters_needed = match cluster_size { // five and more nodes -> 5 voters 5.. => 5, @@ -77,31 +78,31 @@ pub(crate) fn raft_conf_change( // Remove / replace voters for voter_id in raft_conf.voters.clone().iter() { - let Some(peer) = raft_conf.all.get(voter_id) else { + let Some(instance) = raft_conf.all.get(voter_id) else { // Nearly impossible, but rust forces me to check it. let ccs = raft_conf.change_single(RemoveNode, *voter_id); changes.push(ccs); continue; }; - match peer.target_grade.variant { + match instance.target_grade.variant { TargetGradeVariant::Online => { // Do nothing } TargetGradeVariant::Offline => { // A voter goes offline. Replace it with // another online instance if possible. - let Some(replacement) = peers.iter().find(|peer| { - peer.has_grades(CurrentGradeVariant::Online, TargetGradeVariant::Online) - && !raft_conf.voters.contains(&peer.raft_id) + let Some(replacement) = instances.iter().find(|instance| { + instance.has_grades(CurrentGradeVariant::Online, TargetGradeVariant::Online) + && !raft_conf.voters.contains(&instance.raft_id) }) else { continue }; - let ccs1 = raft_conf.change_single(AddLearnerNode, peer.raft_id); + let ccs1 = raft_conf.change_single(AddLearnerNode, instance.raft_id); let ccs2 = raft_conf.change_single(AddNode, replacement.raft_id); changes.extend_from_slice(&[ccs1, ccs2]); } TargetGradeVariant::Expelled => { // Expelled instance is removed unconditionally. - let ccs = raft_conf.change_single(RemoveNode, peer.raft_id); + let ccs = raft_conf.change_single(RemoveNode, instance.raft_id); changes.push(ccs); } } @@ -116,41 +117,46 @@ pub(crate) fn raft_conf_change( // Remove unknown / expelled learners for learner_id in raft_conf.learners.clone().iter() { - let Some(peer) = raft_conf.all.get(learner_id) else { + let Some(instance) = raft_conf.all.get(learner_id) else { // Nearly impossible, but rust forces me to check it. let ccs = raft_conf.change_single(RemoveNode, *learner_id); changes.push(ccs); continue; }; - match peer.target_grade.variant { + match instance.target_grade.variant { TargetGradeVariant::Online | TargetGradeVariant::Offline => { // Do nothing } TargetGradeVariant::Expelled => { // Expelled instance is removed unconditionally. - let ccs = raft_conf.change_single(RemoveNode, peer.raft_id); + let ccs = raft_conf.change_single(RemoveNode, instance.raft_id); changes.push(ccs); } } } // Promote more voters - for peer in peers.iter().filter(target_online).filter(current_online) { + for instance in instances + .iter() + .filter(target_online) + .filter(current_online) + { if raft_conf.voters.len() >= voters_needed { break; } - if !raft_conf.voters.contains(&peer.raft_id) { - let ccs = raft_conf.change_single(AddNode, peer.raft_id); + if !raft_conf.voters.contains(&instance.raft_id) { + let ccs = raft_conf.change_single(AddNode, instance.raft_id); changes.push(ccs); } } // Promote remaining instances as learners - for peer in peers.iter().filter(not_expelled) { - if !raft_conf.voters.contains(&peer.raft_id) && !raft_conf.learners.contains(&peer.raft_id) + for instance in instances.iter().filter(not_expelled) { + if !raft_conf.voters.contains(&instance.raft_id) + && !raft_conf.learners.contains(&instance.raft_id) { - let ccs = raft_conf.change_single(AddLearnerNode, peer.raft_id); + let ccs = raft_conf.change_single(AddLearnerNode, instance.raft_id); changes.push(ccs); } } @@ -174,7 +180,7 @@ mod tests { use crate::traft::CurrentGradeVariant; use crate::traft::Grade; - use crate::traft::Peer; + use crate::traft::Instance; use crate::traft::RaftId; use crate::traft::TargetGradeVariant; @@ -184,7 +190,7 @@ mod tests { $current_grade:ident -> $target_grade:ident ) => { - Peer { + Instance { raft_id: $raft_id, current_grade: Grade { variant: CurrentGradeVariant::$current_grade, @@ -196,7 +202,7 @@ mod tests { // raft_conf_change doesn't care about incarnations incarnation: 0, }, - ..Peer::default() + ..Instance::default() } }; @@ -226,7 +232,7 @@ mod tests { }}; } - fn cc(p: &[Peer], v: &[RaftId], l: &[RaftId]) -> Option<raft::ConfChangeV2> { + fn cc(p: &[Instance], v: &[RaftId], l: &[RaftId]) -> Option<raft::ConfChangeV2> { let mut cc = super::raft_conf_change(p, v, l)?; cc.changes.sort_by(|l, r| Ord::cmp(&l.node_id, &r.node_id)); Some(cc) diff --git a/src/main.rs b/src/main.rs index c0d687efea..7705c436f4 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,7 +23,7 @@ use crate::tlog::set_log_level; use crate::traft::event::Event; use crate::traft::{event, node, InstanceId, Migration, OpDML}; use crate::traft::{LogicalClock, RaftIndex, TargetGradeVariant}; -use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; +use crate::traft::{UpdateInstanceRequest, UpdateInstanceResponse}; use traft::error::Error; mod app; @@ -82,26 +82,26 @@ fn picolib_setup(args: &args::Run) { ); luamod.set( - "peer_info", + "instance_info", tlua::function1(|iid: Option<String>| -> traft::Result<_> { let node = traft::node::global()?; let iid = iid.unwrap_or(node.raft_storage.instance_id()?.unwrap()); - let peer = node.storage.peers.get(&InstanceId::from(iid))?; + let instance = node.storage.instances.get(&InstanceId::from(iid))?; let peer_address = node .storage .peer_addresses - .get(peer.raft_id)? + .get(instance.raft_id)? .unwrap_or_else(|| "<unknown>".into()); Ok(tlua::AsTable(( - ("raft_id", peer.raft_id), + ("raft_id", instance.raft_id), ("advertise_address", peer_address), - ("instance_id", peer.instance_id.0), - ("instance_uuid", peer.instance_uuid), - ("replicaset_id", peer.replicaset_id), - ("replicaset_uuid", peer.replicaset_uuid), - ("current_grade", peer.current_grade), - ("target_grade", peer.target_grade), + ("instance_id", instance.instance_id.0), + ("instance_uuid", instance.instance_uuid), + ("replicaset_id", instance.replicaset_id), + ("replicaset_uuid", instance.replicaset_uuid), + ("current_grade", instance.current_grade), + ("target_grade", instance.target_grade), ))) }), ); @@ -460,7 +460,7 @@ fn init_handlers() { declare_cfunc!(traft::rpc::expel::proc_expel_on_leader); declare_cfunc!(traft::rpc::expel::redirect::proc_expel_redirect); declare_cfunc!(traft::rpc::sync::proc_sync_raft); - declare_cfunc!(traft::rpc::update_peer::proc_update_peer); + declare_cfunc!(traft::rpc::update_instance::proc_update_instance); declare_cfunc!(traft::rpc::replication::proc_replication); declare_cfunc!(traft::rpc::replication::promote::proc_replication_promote); declare_cfunc!(traft::rpc::sharding::proc_sharding); @@ -750,15 +750,15 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { fn start_boot(args: &args::Run) { tlog!(Info, ">>>>> start_boot()"); - let (peer, address) = traft::topology::initial_peer( + let (instance, address) = traft::topology::initial_instance( args.instance_id.clone(), args.replicaset_id.clone(), args.advertise_address(), args.failure_domain(), ) - .expect("failed adding initial peer"); - let raft_id = peer.raft_id; - let instance_id = peer.instance_id.clone(); + .expect("failed adding initial instance"); + let raft_id = instance.raft_id; + let instance_id = instance.instance_id.clone(); picolib_setup(args); assert!(tarantool::cfg().is_none()); @@ -766,8 +766,8 @@ fn start_boot(args: &args::Run) { let cfg = tarantool::Cfg { listen: None, read_only: false, - instance_uuid: Some(peer.instance_uuid.clone()), - replicaset_uuid: Some(peer.replicaset_uuid.clone()), + instance_uuid: Some(instance.instance_uuid.clone()), + replicaset_uuid: Some(instance.replicaset_uuid.clone()), wal_dir: args.data_dir.clone(), memtx_dir: args.data_dir.clone(), log_level: args.log_level() as u8, @@ -808,7 +808,7 @@ fn start_boot(args: &args::Run) { .expect("cannot fail") .into(), ); - init_entries_push_op(traft::Op::persist_peer(peer)); + init_entries_push_op(traft::Op::persist_instance(instance)); init_entries_push_op( OpDML::insert( ClusterwideSpace::State, @@ -922,8 +922,8 @@ fn start_join(args: &args::Run, leader_address: String) { let cfg = tarantool::Cfg { listen: Some(args.listen.clone()), read_only: resp.box_replication.len() > 1, - instance_uuid: Some(resp.peer.instance_uuid.clone()), - replicaset_uuid: Some(resp.peer.replicaset_uuid.clone()), + instance_uuid: Some(resp.instance.instance_uuid.clone()), + replicaset_uuid: Some(resp.instance.replicaset_uuid.clone()), replication: resp.box_replication.clone(), wal_dir: args.data_dir.clone(), memtx_dir: args.data_dir.clone(), @@ -933,15 +933,15 @@ fn start_join(args: &args::Run, leader_address: String) { let (storage, raft_storage) = init_common(args, &cfg); - let raft_id = resp.peer.raft_id; + let raft_id = resp.instance.raft_id; start_transaction(|| -> Result<(), TntError> { - storage.peers.put(&resp.peer).unwrap(); + storage.instances.put(&resp.instance).unwrap(); for traft::PeerAddress { raft_id, address } in resp.peer_addresses { storage.peer_addresses.put(raft_id, &address).unwrap(); } raft_storage.persist_raft_id(raft_id).unwrap(); raft_storage - .persist_instance_id(&resp.peer.instance_id) + .persist_instance_id(&resp.instance.instance_id) .unwrap(); raft_storage.persist_cluster_id(&args.cluster_id).unwrap(); Ok(()) @@ -990,10 +990,10 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces } loop { - let peer = storage - .peers + let instance = storage + .instances .get(&raft_id) - .expect("peer must be persisted at the time of postjoin"); + .expect("instance must be persisted at the time of postjoin"); let cluster_id = raft_storage .cluster_id() .unwrap() @@ -1008,21 +1008,25 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces continue; }; - tlog!(Info, "initiating self-activation of {}", peer.instance_id); - let req = UpdatePeerRequest::new(peer.instance_id, cluster_id) + tlog!( + Info, + "initiating self-activation of {}", + instance.instance_id + ); + let req = UpdateInstanceRequest::new(instance.instance_id, cluster_id) .with_target_grade(TargetGradeVariant::Online) .with_failure_domain(args.failure_domain()); - // It's necessary to call `proc_update_peer` remotely on a + // It's necessary to call `proc_update_instance` remotely on a // leader over net_box. It always fails otherwise. Only the - // leader is permitted to propose PersistPeer entries. + // leader is permitted to propose PersistInstance entries. let now = Instant::now(); let timeout = Duration::from_secs(10); match rpc::net_box_call(&leader_address, &req, timeout) { - Ok(UpdatePeerResponse::Ok) => { + Ok(UpdateInstanceResponse::Ok) => { break; } - Ok(UpdatePeerResponse::ErrNotALeader) => { + Ok(UpdateInstanceResponse::ErrNotALeader) => { tlog!(Warning, "failed to activate myself: not a leader, retry..."); fiber::sleep(Duration::from_millis(100)); continue; diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index b2ae75a8e8..b3f0bd1346 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -10,7 +10,7 @@ use crate::traft::node; use crate::traft::rpc; use crate::traft::CurrentGradeVariant; use crate::traft::TargetGradeVariant; -use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; +use crate::traft::{UpdateInstanceRequest, UpdateInstanceResponse}; use crate::unwrap_ok_or; pub fn callback() { @@ -29,7 +29,7 @@ pub fn callback() { let raft_id = node.raft_id(); loop { let me = unwrap_ok_or!( - node.storage.peers.get(&raft_id), + node.storage.instances.get(&raft_id), Err(e) => { tlog!(Error, "{e}"); break; @@ -56,8 +56,8 @@ pub fn callback() { let quorum = voters.len() / 2 + 1; let voters_alive = voters .iter() - .filter_map(|raft_id| node.storage.peers.get(raft_id).ok()) - .filter(|peer| peer.current_grade == CurrentGradeVariant::Online) + .filter_map(|raft_id| node.storage.instances.get(raft_id).ok()) + .filter(|instance| instance.current_grade == CurrentGradeVariant::Online) .count(); if voters_alive < quorum { @@ -73,13 +73,13 @@ fn go_offline() -> traft::Result<()> { let node = node::global()?; let raft_id = node.raft_id(); - let peer = node.storage.peers.get(&raft_id)?; + let instance = node.storage.instances.get(&raft_id)?; let cluster_id = node .raft_storage .cluster_id()? .ok_or_else(|| Error::other("missing cluster_id value in storage"))?; - let req = UpdatePeerRequest::new(peer.instance_id, cluster_id) + let req = UpdateInstanceRequest::new(instance.instance_id, cluster_id) .with_target_grade(TargetGradeVariant::Offline); loop { @@ -110,8 +110,8 @@ fn go_offline() -> traft::Result<()> { continue; }; let res = match rpc::net_box_call(&leader_address, &req, Duration::MAX) { - Ok(UpdatePeerResponse::Ok) => Ok(()), - Ok(UpdatePeerResponse::ErrNotALeader) => Err(Error::NotALeader), + Ok(UpdateInstanceResponse::Ok) => Ok(()), + Ok(UpdateInstanceResponse::ErrNotALeader) => Err(Error::NotALeader), Err(e) => Err(e.into()), }; diff --git a/src/storage.rs b/src/storage.rs index 2f7176aae7..c12b5d5ed0 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -84,7 +84,7 @@ impl ClusterwideSpace { #[derive(Clone, Debug)] pub struct Clusterwide { pub state: State, - pub peers: Peers, + pub instances: Instances, pub peer_addresses: PeerAddresses, pub replicasets: Replicasets, pub migrations: Migrations, @@ -94,7 +94,7 @@ impl Clusterwide { pub fn new() -> tarantool::Result<Self> { Ok(Self { state: State::new()?, - peers: Peers::new()?, + instances: Instances::new()?, peer_addresses: PeerAddresses::new()?, replicasets: Replicasets::new()?, migrations: Migrations::new()?, @@ -271,7 +271,7 @@ impl PeerAddresses { const INDEX_RAFT_ID: &'static str = "raft_id"; pub fn new() -> tarantool::Result<Self> { - let space_peers = Space::builder(Self::SPACE_NAME) + let space_instances = Space::builder(Self::SPACE_NAME) .is_local(true) .is_temporary(false) .field(("raft_id", FieldType::Unsigned)) @@ -279,7 +279,7 @@ impl PeerAddresses { .if_not_exists(true) .create()?; - let index_raft_id = space_peers + let index_raft_id = space_instances .index_builder(Self::INDEX_RAFT_ID) .unique(true) .part("raft_id") @@ -287,7 +287,7 @@ impl PeerAddresses { .create()?; Ok(Self { - space: space_peers, + space: space_instances, index_raft_id, }) } @@ -349,55 +349,55 @@ impl Iterator for PeerAddressIter { } //////////////////////////////////////////////////////////////////////////////// -// Peers +// Instance //////////////////////////////////////////////////////////////////////////////// -/// A struct for accessing storage of all the cluster peers. +/// A struct for accessing storage of all the cluster instances. #[derive(Clone, Debug)] -pub struct Peers { +pub struct Instances { space: Space, index_instance_id: Index, index_raft_id: Index, index_replicaset_id: Index, } -impl Peers { +impl Instances { const SPACE_NAME: &'static str = ClusterwideSpace::Instance.as_str(); const INDEX_INSTANCE_ID: &'static str = "instance_id"; const INDEX_RAFT_ID: &'static str = "raft_id"; const INDEX_REPLICASET_ID: &'static str = "replicaset_id"; pub fn new() -> tarantool::Result<Self> { - let space_peers = Space::builder(Self::SPACE_NAME) + let space_instances = Space::builder(Self::SPACE_NAME) .is_local(true) .is_temporary(false) - .format(peer_format()) + .format(instance_format()) .if_not_exists(true) .create()?; - let index_instance_id = space_peers + let index_instance_id = space_instances .index_builder(Self::INDEX_INSTANCE_ID) .unique(true) - .part(peer_field::InstanceId) + .part(instance_field::InstanceId) .if_not_exists(true) .create()?; - let index_raft_id = space_peers + let index_raft_id = space_instances .index_builder(Self::INDEX_RAFT_ID) .unique(true) - .part(peer_field::RaftId) + .part(instance_field::RaftId) .if_not_exists(true) .create()?; - let index_replicaset_id = space_peers + let index_replicaset_id = space_instances .index_builder(Self::INDEX_REPLICASET_ID) .unique(false) - .part(peer_field::ReplicasetId) + .part(instance_field::ReplicasetId) .if_not_exists(true) .create()?; Ok(Self { - space: space_peers, + space: space_instances, index_instance_id, index_raft_id, index_replicaset_id, @@ -405,8 +405,8 @@ impl Peers { } #[inline] - pub fn put(&self, peer: &traft::Peer) -> tarantool::Result<()> { - self.space.replace(peer)?; + pub fn put(&self, instance: &traft::Instance) -> tarantool::Result<()> { + self.space.replace(instance)?; Ok(()) } @@ -417,56 +417,59 @@ impl Peers { Ok(()) } - /// Find a peer by `raft_id` and return a single field specified by `F` - /// (see `PeerFieldDef` & `peer_field` module). + /// Find a instance by `raft_id` and return a single field specified by `F` + /// (see `InstanceFieldDef` & `instance_field` module). #[inline(always)] - pub fn get(&self, id: &impl PeerId) -> Result<traft::Peer> { - let res = id.find_in(self)?.decode().expect("failed to decode peer"); + pub fn get(&self, id: &impl InstanceId) -> Result<traft::Instance> { + let res = id + .find_in(self)? + .decode() + .expect("failed to decode instance"); Ok(res) } - /// Find a peer by `id` (see `PeerId`) and return a single field - /// specified by `F` (see `PeerFieldDef` & `peer_field` module). + /// Find a instance by `id` (see `InstanceId`) and return a single field + /// specified by `F` (see `InstanceFieldDef` & `instance_field` module). #[inline(always)] - pub fn peer_field<F>(&self, id: &impl PeerId) -> Result<F::Type> + pub fn instance_field<F>(&self, id: &impl InstanceId) -> Result<F::Type> where - F: PeerFieldDef, + F: InstanceFieldDef, { let tuple = id.find_in(self)?; let res = F::get_in(&tuple)?; Ok(res) } - /// Return an iterator over all peers. Items of the iterator are - /// specified by `F` (see `PeerFieldDef` & `peer_field` module). + /// Return an iterator over all instances. Items of the iterator are + /// specified by `F` (see `InstanceFieldDef` & `instance_field` module). #[inline(always)] - pub fn peers_fields<F>(&self) -> Result<PeersFields<F>> + pub fn instances_fields<F>(&self) -> Result<InstancesFields<F>> where - F: PeerFieldDef, + F: InstanceFieldDef, { let iter = self.space.select(IteratorType::All, &())?; - Ok(PeersFields::new(iter)) + Ok(InstancesFields::new(iter)) } #[inline] - pub fn iter(&self) -> tarantool::Result<PeerIter> { + pub fn iter(&self) -> tarantool::Result<InstanceIter> { let iter = self.space.select(IteratorType::All, &())?; - Ok(PeerIter::new(iter)) + Ok(InstanceIter::new(iter)) } #[inline] - pub fn all_peers(&self) -> tarantool::Result<Vec<traft::Peer>> { + pub fn all_instances(&self) -> tarantool::Result<Vec<traft::Instance>> { self.space .select(IteratorType::All, &())? .map(|tuple| tuple.decode()) .collect() } - pub fn replicaset_peers(&self, replicaset_id: &str) -> tarantool::Result<PeerIter> { + pub fn replicaset_instances(&self, replicaset_id: &str) -> tarantool::Result<InstanceIter> { let iter = self .index_replicaset_id .select(IteratorType::Eq, &[replicaset_id])?; - Ok(PeerIter::new(iter)) + Ok(InstanceIter::new(iter)) } pub fn replicaset_fields<T>( @@ -474,7 +477,7 @@ impl Peers { replicaset_id: &traft::ReplicasetId, ) -> tarantool::Result<Vec<T::Type>> where - T: PeerFieldDef, + T: InstanceFieldDef, { self.index_replicaset_id .select(IteratorType::Eq, &[replicaset_id])? @@ -484,38 +487,38 @@ impl Peers { } //////////////////////////////////////////////////////////////////////////////// -// PeerField +// InstanceField //////////////////////////////////////////////////////////////////////////////// -macro_rules! define_peer_fields { +macro_rules! define_instance_fields { ($($field:ident: $ty:ty = ($name:literal, $tt_ty:path))+) => { ::tarantool::define_str_enum! { /// An enumeration of raft_space field names - pub enum PeerField { + pub enum InstanceField { $($field = $name,)+ } } - pub mod peer_field { + pub mod instance_field { use super::*; $( /// Helper struct that represents #[doc = stringify!($name)] - /// field of [`Peer`]. + /// field of [`Instance`]. /// /// It's rust type is #[doc = concat!("`", stringify!($ty), "`")] /// and it's tarantool type is #[doc = concat!("`", stringify!($tt_ty), "`")] /// - /// [`Peer`]: crate::traft::Peer + /// [`Instance`]: crate::traft::Instance pub struct $field; - impl PeerFieldDef for $field { + impl InstanceFieldDef for $field { type Type = $ty; fn get_in(tuple: &Tuple) -> tarantool::Result<Self::Type> { - Ok(tuple.try_get($name)?.expect("peer fields aren't nullable")) + Ok(tuple.try_get($name)?.expect("instance fields aren't nullable")) } } @@ -545,7 +548,7 @@ macro_rules! define_peer_fields { )+ } - fn peer_format() -> Vec<::tarantool::space::Field> { + fn instance_format() -> Vec<::tarantool::space::Field> { vec![ $( ::tarantool::space::Field::from(($name, $tt_ty)), )+ ] @@ -553,7 +556,7 @@ macro_rules! define_peer_fields { }; } -define_peer_fields! { +define_instance_fields! { InstanceId : traft::InstanceId = ("instance_id", FieldType::String) InstanceUuid : String = ("instance_uuid", FieldType::String) RaftId : traft::RaftId = ("raft_id", FieldType::Unsigned) @@ -564,7 +567,7 @@ define_peer_fields! { FailureDomain : traft::FailureDomain = ("failure_domain", FieldType::Map) } -impl tarantool::tuple::TupleIndex for PeerField { +impl tarantool::tuple::TupleIndex for InstanceField { fn get_field<'a, T>(self, tuple: &'a Tuple) -> tarantool::Result<Option<T>> where T: tarantool::tuple::Decode<'a>, @@ -573,11 +576,11 @@ impl tarantool::tuple::TupleIndex for PeerField { } } -/// A helper trait for type-safe and efficient access to a Peer's fields +/// A helper trait for type-safe and efficient access to a Instance's fields /// without deserializing the whole tuple. /// /// This trait contains information needed to define and use a given tuple field. -pub trait PeerFieldDef { +pub trait InstanceFieldDef { /// Rust type of the field. /// /// Used when decoding the field. @@ -587,15 +590,15 @@ pub trait PeerFieldDef { fn get_in(tuple: &Tuple) -> tarantool::Result<Self::Type>; } -macro_rules! define_peer_field_def_for_tuples { +macro_rules! define_instance_field_def_for_tuples { () => {}; ($h:ident $($t:ident)*) => { - impl<$h, $($t),*> PeerFieldDef for ($h, $($t),*) + impl<$h, $($t),*> InstanceFieldDef for ($h, $($t),*) where - $h: PeerFieldDef, + $h: InstanceFieldDef, $h::Type: serde::de::DeserializeOwned, $( - $t: PeerFieldDef, + $t: InstanceFieldDef, $t::Type: serde::de::DeserializeOwned, )* { @@ -606,82 +609,82 @@ macro_rules! define_peer_field_def_for_tuples { } } - define_peer_field_def_for_tuples!{ $($t)* } + define_instance_field_def_for_tuples!{ $($t)* } }; } -define_peer_field_def_for_tuples! { +define_instance_field_def_for_tuples! { T0 T1 T2 T3 T4 T5 T6 T7 T8 T9 T10 T11 T12 T13 T14 T15 } //////////////////////////////////////////////////////////////////////////////// -// PeerId +// InstanceId //////////////////////////////////////////////////////////////////////////////// -/// Types implementing this trait can be used to identify a `Peer` when +/// Types implementing this trait can be used to identify a `Instance` when /// accessing storage. -pub trait PeerId: serde::Serialize { - fn find_in(&self, peers: &Peers) -> Result<Tuple>; +pub trait InstanceId: serde::Serialize { + fn find_in(&self, instances: &Instances) -> Result<Tuple>; } -impl PeerId for RaftId { +impl InstanceId for RaftId { #[inline(always)] - fn find_in(&self, peers: &Peers) -> Result<Tuple> { - peers + fn find_in(&self, instances: &Instances) -> Result<Tuple> { + instances .index_raft_id .get(&[self])? - .ok_or(Error::NoPeerWithRaftId(*self)) + .ok_or(Error::NoInstanceWithRaftId(*self)) } } -impl PeerId for traft::InstanceId { +impl InstanceId for traft::InstanceId { #[inline(always)] - fn find_in(&self, peers: &Peers) -> Result<Tuple> { - peers + fn find_in(&self, instances: &Instances) -> Result<Tuple> { + instances .index_instance_id .get(&[self])? - .ok_or_else(|| Error::NoPeerWithInstanceId(self.clone())) + .ok_or_else(|| Error::NoInstanceWithInstanceId(self.clone())) } } //////////////////////////////////////////////////////////////////////////////// -// PeerIter +// InstanceIter //////////////////////////////////////////////////////////////////////////////// -pub struct PeerIter { +pub struct InstanceIter { iter: IndexIterator, } -impl PeerIter { +impl InstanceIter { fn new(iter: IndexIterator) -> Self { Self { iter } } } -impl Iterator for PeerIter { - type Item = traft::Peer; +impl Iterator for InstanceIter { + type Item = traft::Instance; fn next(&mut self) -> Option<Self::Item> { let res = self.iter.next().as_ref().map(Tuple::decode); - res.map(|res| res.expect("peer should decode correctly")) + res.map(|res| res.expect("instance should decode correctly")) } } -impl std::fmt::Debug for PeerIter { +impl std::fmt::Debug for InstanceIter { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PeerIter").finish_non_exhaustive() + f.debug_struct("InstanceIter").finish_non_exhaustive() } } //////////////////////////////////////////////////////////////////////////////// -// PeersFields +// InstancesFields //////////////////////////////////////////////////////////////////////////////// -pub struct PeersFields<F> { +pub struct InstancesFields<F> { iter: IndexIterator, marker: PhantomData<F>, } -impl<F> PeersFields<F> { +impl<F> InstancesFields<F> { fn new(iter: IndexIterator) -> Self { Self { iter, @@ -690,15 +693,15 @@ impl<F> PeersFields<F> { } } -impl<F> Iterator for PeersFields<F> +impl<F> Iterator for InstancesFields<F> where - F: PeerFieldDef, + F: InstanceFieldDef, { type Item = F::Type; fn next(&mut self) -> Option<Self::Item> { let res = self.iter.next().as_ref().map(F::get_in); - res.map(|res| res.expect("peer should decode correctly")) + res.map(|res| res.expect("instance should decode correctly")) } } @@ -792,18 +795,18 @@ macro_rules! assert_err { #[rustfmt::skip] inventory::submit!(crate::InnerTest { - name: "test_storage_peers", + name: "test_storage_instances", body: || { use traft::{CurrentGradeVariant as CurrentGrade, TargetGradeVariant as TargetGrade, InstanceId}; - let storage_peers = Peers::new().unwrap(); - let space_peers = storage_peers.space.clone(); + let storage_instances = Instances::new().unwrap(); + let space_instances = storage_instances.space.clone(); let storage_peer_addresses = PeerAddresses::new().unwrap(); let space_peer_addresses = storage_peer_addresses.space.clone(); let faildom = crate::traft::FailureDomain::from([("a", "b")]); - for peer in vec![ + for instance in vec![ // r1 ("i1", "i1-uuid", 1u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), ("i2", "i2-uuid", 2u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), @@ -813,22 +816,22 @@ inventory::submit!(crate::InnerTest { // r3 ("i5", "i5-uuid", 5u64, "r3", "r3-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,), ] { - space_peers.put(&peer).unwrap(); - let (_, _, raft_id, ..) = peer; + space_instances.put(&instance).unwrap(); + let (_, _, raft_id, ..) = instance; space_peer_addresses.put(&(raft_id, format!("addr:{raft_id}"))).unwrap(); } - let peers = storage_peers.all_peers().unwrap(); + let instance = storage_instances.all_instances().unwrap(); assert_eq!( - peers.iter().map(|p| &p.instance_id).collect::<Vec<_>>(), + instance.iter().map(|p| &p.instance_id).collect::<Vec<_>>(), vec!["i1", "i2", "i3", "i4", "i5"] ); assert_err!( - storage_peers.put(&traft::Peer { + storage_instances.put(&traft::Instance { raft_id: 1, instance_id: "i99".into(), - ..traft::Peer::default() + ..traft::Instance::default() }), format!( concat!( @@ -856,31 +859,31 @@ inventory::submit!(crate::InnerTest { } { - // Check accessing peers by 'raft_id' - assert_eq!(storage_peers.get(&1).unwrap().instance_id, "i1"); - assert_eq!(storage_peers.get(&2).unwrap().instance_id, "i2"); - assert_eq!(storage_peers.get(&3).unwrap().instance_id, "i3"); - assert_eq!(storage_peers.get(&4).unwrap().instance_id, "i4"); - assert_eq!(storage_peers.get(&5).unwrap().instance_id, "i5"); - assert_err!(storage_peers.get(&6), "peer with id 6 not found"); + // Check accessing instances by 'raft_id' + assert_eq!(storage_instances.get(&1).unwrap().instance_id, "i1"); + assert_eq!(storage_instances.get(&2).unwrap().instance_id, "i2"); + assert_eq!(storage_instances.get(&3).unwrap().instance_id, "i3"); + assert_eq!(storage_instances.get(&4).unwrap().instance_id, "i4"); + assert_eq!(storage_instances.get(&5).unwrap().instance_id, "i5"); + assert_err!(storage_instances.get(&6), "instance with id 6 not found"); } { - // Check accessing peers by 'instance_id' - assert_eq!(storage_peers.get(&InstanceId::from("i1")).unwrap().raft_id, 1); - assert_eq!(storage_peers.get(&InstanceId::from("i2")).unwrap().raft_id, 2); - assert_eq!(storage_peers.get(&InstanceId::from("i3")).unwrap().raft_id, 3); - assert_eq!(storage_peers.get(&InstanceId::from("i4")).unwrap().raft_id, 4); - assert_eq!(storage_peers.get(&InstanceId::from("i5")).unwrap().raft_id, 5); + // Check accessing instances by 'instance_id' + assert_eq!(storage_instances.get(&InstanceId::from("i1")).unwrap().raft_id, 1); + assert_eq!(storage_instances.get(&InstanceId::from("i2")).unwrap().raft_id, 2); + assert_eq!(storage_instances.get(&InstanceId::from("i3")).unwrap().raft_id, 3); + assert_eq!(storage_instances.get(&InstanceId::from("i4")).unwrap().raft_id, 4); + assert_eq!(storage_instances.get(&InstanceId::from("i5")).unwrap().raft_id, 5); assert_err!( - storage_peers.get(&InstanceId::from("i6")), - "peer with id \"i6\" not found" + storage_instances.get(&InstanceId::from("i6")), + "instance with id \"i6\" not found" ); } let box_replication = |replicaset_id: &str| -> Vec<traft::Address> { - storage_peers.replicaset_peers(replicaset_id).unwrap() - .map(|peer| storage_peer_addresses.try_get(peer.raft_id).unwrap()) + storage_instances.replicaset_instances(replicaset_id).unwrap() + .map(|instance| storage_peer_addresses.try_get(instance.raft_id).unwrap()) .collect::<Vec<_>>() }; @@ -891,43 +894,43 @@ inventory::submit!(crate::InnerTest { assert_eq!(box_replication("r3"), ["addr:5"]); } - space_peers.index("raft_id").unwrap().drop().unwrap(); + space_instances.index("raft_id").unwrap().drop().unwrap(); assert_err!( - storage_peers.get(&1), + storage_instances.get(&1), concat!( "Tarantool error: NoSuchIndexID: No index #1 is defined", " in space '_picodata_instance'", ) ); - space_peers.index("replicaset_id").unwrap().drop().unwrap(); + space_instances.index("replicaset_id").unwrap().drop().unwrap(); assert_err!( - storage_peers.replicaset_peers(""), + storage_instances.replicaset_instances(""), concat!( "Tarantool error: NoSuchIndexID: No index #2 is defined", " in space '_picodata_instance'", ) ); - space_peers.primary_key().drop().unwrap(); + space_instances.primary_key().drop().unwrap(); assert_err!( - storage_peers.get(&InstanceId::from("i1")), + storage_instances.get(&InstanceId::from("i1")), concat!( "Tarantool error: NoSuchIndexID: No index #0 is defined", " in space '_picodata_instance'", ) ); - space_peers.drop().unwrap(); + space_instances.drop().unwrap(); assert_err!( - storage_peers.all_peers(), + storage_instances.all_instances(), format!( "Tarantool error: NoSuchSpace: Space '{}' does not exist", - space_peers.id(), + space_instances.id(), ) ); } diff --git a/src/traft/error.rs b/src/traft/error.rs index 6a1f67ba32..f0740fbbd6 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -20,13 +20,13 @@ pub enum Error { expected: &'static str, actual: &'static str, }, - /// cluster_id of the joining peer mismatches the cluster_id of the cluster + /// cluster_id of the joining instance mismatches the cluster_id of the cluster #[error("cannot join the instance to the cluster: cluster_id mismatch: cluster_id of the instance = {instance_cluster_id:?}, cluster_id of the cluster = {cluster_cluster_id:?}")] ClusterIdMismatch { instance_cluster_id: String, cluster_cluster_id: String, }, - /// Peer was requested to configure replication with different replicaset. + /// Instance was requested to configure replication with different replicaset. #[error("cannot replicate with different replicaset: expected {instance_rsid:?}, requested {requested_rsid:?}")] ReplicasetIdMismatch { instance_rsid: String, @@ -43,10 +43,10 @@ pub enum Error { Lua(#[from] LuaError), #[error("{0}")] Tarantool(#[from] ::tarantool::error::Error), - #[error("peer with id {0} not found")] - NoPeerWithRaftId(RaftId), - #[error("peer with id \"{0}\" not found")] - NoPeerWithInstanceId(InstanceId), + #[error("instance with id {0} not found")] + NoInstanceWithRaftId(RaftId), + #[error("instance with id \"{0}\" not found")] + NoInstanceWithInstanceId(InstanceId), #[error("address of peer with id {0} not found")] AddressUnknownForRaftId(RaftId), #[error("address of peer with id \"{0}\" not found")] diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 21935d08a9..213b957286 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -31,8 +31,8 @@ pub use network::ConnectionPool; pub use raft_storage::RaftSpaceAccess; pub use rpc::join::Request as JoinRequest; pub use rpc::join::Response as JoinResponse; -pub use rpc::update_peer::Request as UpdatePeerRequest; -pub use rpc::update_peer::Response as UpdatePeerResponse; +pub use rpc::update_instance::Request as UpdateInstanceRequest; +pub use rpc::update_instance::Response as UpdateInstanceResponse; pub use topology::Topology; use self::event::Event; @@ -109,8 +109,8 @@ pub enum Op { EvalLua(OpEvalLua), /// ReturnOne(OpReturnOne), - PersistPeer { - peer: Box<Peer>, + PersistInstance { + instance: Box<Instance>, }, /// Cluster-wide data modification operation. /// Should be used to manipulate the cluster-wide configuration. @@ -124,8 +124,8 @@ impl std::fmt::Display for Op { Self::Info { msg } => write!(f, "Info({msg:?})"), Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), Self::ReturnOne(_) => write!(f, "ReturnOne"), - Self::PersistPeer { peer } => { - write!(f, "PersistPeer{}", peer) + Self::PersistInstance { instance } => { + write!(f, "PersistInstance{}", instance) } Self::Dml(OpDML::Insert { space, tuple }) => { write!(f, "Insert({space}, {})", DisplayAsJson(tuple)) @@ -174,7 +174,7 @@ impl std::fmt::Display for Op { } impl Op { - pub fn on_commit(&self, peers: &storage::Peers) -> Box<dyn AnyWithTypeName> { + pub fn on_commit(&self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> { match self { Self::Nop => Box::new(()), Self::Info { msg } => { @@ -183,9 +183,9 @@ impl Op { } Self::EvalLua(op) => Box::new(op.result()), Self::ReturnOne(op) => Box::new(op.result()), - Self::PersistPeer { peer } => { - peers.put(peer).unwrap(); - Box::new(peer.clone()) + Self::PersistInstance { instance } => { + instances.put(instance).unwrap(); + Box::new(instance.clone()) } Self::Dml(op) => { let res = Box::new(op.result()); @@ -197,9 +197,9 @@ impl Op { } } - pub fn persist_peer(peer: Peer) -> Self { - Self::PersistPeer { - peer: Box::new(peer), + pub fn persist_instance(instance: Instance) -> Self { + Self::PersistInstance { + instance: Box::new(instance), } } } @@ -403,7 +403,7 @@ impl Encode for PeerAddress {} ////////////////////////////////////////////////////////////////////////////////////////// /// Serializable struct representing a member of the raft group. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct Peer { +pub struct Instance { /// Instances are identified by name. pub instance_id: InstanceId, pub instance_uuid: String, @@ -423,11 +423,13 @@ pub struct Peer { /// Instance failure domains. Instances with overlapping failure domains /// must not be in the same replicaset. + // TODO: raft_group space is kinda bloated, maybe we should store some data + // in different spaces/not deserialize the whole tuple every time? pub failure_domain: FailureDomain, } -impl Encode for Peer {} +impl Encode for Instance {} -impl Peer { +impl Instance { pub fn is_online(&self) -> bool { // FIXME: this is probably not what we want anymore matches!(self.current_grade.variant, CurrentGradeVariant::Online) @@ -437,7 +439,7 @@ impl Peer { matches!(self.target_grade.variant, TargetGradeVariant::Expelled) } - /// Peer has a grade that implies it may cooperate. + /// Instance has a grade that implies it may cooperate. /// Currently this means that target_grade is neither Offline or Expelled. #[inline] pub fn may_respond(&self) -> bool { @@ -472,7 +474,7 @@ impl Peer { } } -impl std::fmt::Display for Peer { +impl std::fmt::Display for Instance { #[rustfmt::skip] fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { return write!(f, @@ -608,7 +610,7 @@ impl EntryContextNormal { /// [`EntryContext`] of a conf change entry, either `EntryConfChange` or `EntryConfChangeV2` #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct EntryContextConfChange { - pub peers: Vec<Peer>, + pub instances: Vec<Instance>, } impl Encode for Entry {} @@ -820,7 +822,7 @@ pub trait ContextCoercion: Serialize + DeserializeOwned { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum TopologyRequest { Join(JoinRequest), - UpdatePeer(UpdatePeerRequest), + UpdateInstance(UpdateInstanceRequest), } impl From<JoinRequest> for TopologyRequest { @@ -829,9 +831,9 @@ impl From<JoinRequest> for TopologyRequest { } } -impl From<UpdatePeerRequest> for TopologyRequest { - fn from(a: UpdatePeerRequest) -> Self { - Self::UpdatePeer(a) +impl From<UpdateInstanceRequest> for TopologyRequest { + fn from(a: UpdateInstanceRequest) -> Self { + Self::UpdateInstance(a) } } diff --git a/src/traft/network.rs b/src/traft/network.rs index 4141960855..3dbdde5b53 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -14,7 +14,7 @@ use std::rc::Rc; use std::time::{Duration, Instant}; use crate::mailbox::Mailbox; -use crate::storage::{peer_field, Clusterwide, PeerAddresses, Peers}; +use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses}; use crate::tlog; use crate::traft; use crate::traft::error::Error; @@ -364,7 +364,7 @@ impl ConnectionPoolBuilder { workers: HashMap::new(), raft_ids: HashMap::new(), peer_addresses: self.storage.peer_addresses, - peers: self.storage.peers, + instances: self.storage.instances, } } } @@ -379,7 +379,7 @@ pub struct ConnectionPool { workers: HashMap<RaftId, PoolWorker>, raft_ids: HashMap<InstanceId, RaftId>, peer_addresses: PeerAddresses, - peers: Peers, + instances: Instances, } impl ConnectionPool { @@ -401,9 +401,9 @@ impl ConnectionPool { Entry::Occupied(entry) => Ok(entry.into_mut()), Entry::Vacant(entry) => { let instance_id = self - .peers - .peer_field::<peer_field::InstanceId>(&raft_id) - .map_err(|_| Error::NoPeerWithRaftId(raft_id)) + .instances + .instance_field::<instance_field::InstanceId>(&raft_id) + .map_err(|_| Error::NoInstanceWithRaftId(raft_id)) .ok(); // Check if address of this peer is known. // No need to store the result, @@ -435,9 +435,9 @@ impl ConnectionPool { Entry::Vacant(entry) => { let instance_id = entry.key(); let raft_id = self - .peers - .peer_field::<peer_field::RaftId>(instance_id) - .map_err(|_| Error::NoPeerWithInstanceId(instance_id.clone()))?; + .instances + .instance_field::<instance_field::RaftId>(instance_id) + .map_err(|_| Error::NoInstanceWithInstanceId(instance_id.clone()))?; let worker = PoolWorker::run( raft_id, instance_id.clone(), @@ -460,7 +460,7 @@ impl ConnectionPool { self.get_or_create_by_raft_id(msg.to)?.send(msg) } - /// Send a request to peer with `id` (see `PeerId`) and wait for the result. + /// Send a request to instance with `id` (see `IdOfInstance`) and wait for the result. /// /// If the request failed, it's a responsibility of the caller /// to re-send it later. @@ -469,7 +469,7 @@ impl ConnectionPool { #[allow(dead_code)] pub fn call_and_wait_timeout<R>( &mut self, - id: &impl PeerId, + id: &impl IdOfInstance, req: R, timeout: Duration, ) -> Result<R::Response> @@ -482,7 +482,7 @@ impl ConnectionPool { rx.recv_timeout(timeout).map_err(|_| Error::Timeout)? } - /// Send a request to peer with `id` (see `PeerId`) and wait for the result. + /// Send a request to instance with `id` (see `InstanceId`) and wait for the result. /// /// If the request failed, it's a responsibility of the caller /// to re-send it later. @@ -490,14 +490,14 @@ impl ConnectionPool { /// **This function yields.** #[allow(dead_code)] #[inline(always)] - pub fn call_and_wait<R>(&mut self, id: &impl PeerId, req: R) -> Result<R::Response> + pub fn call_and_wait<R>(&mut self, id: &impl IdOfInstance, req: R) -> Result<R::Response> where R: Request, { self.call_and_wait_timeout(id, req, Duration::MAX) } - /// Send a request to peer with `id` (see `PeerId`) and wait for the result. + /// Send a request to instance with `id` (see `InstanceId`) and wait for the result. /// /// If the request failed, it's a responsibility of the caller /// to re-send it later. @@ -505,7 +505,7 @@ impl ConnectionPool { /// **This function never yields.** pub fn call<R>( &mut self, - id: &impl PeerId, + id: &impl IdOfInstance, req: R, cb: impl FnOnce(Result<R::Response>) + 'static, ) -> Result<()> @@ -526,23 +526,23 @@ impl Drop for ConnectionPool { } //////////////////////////////////////////////////////////////////////////////// -// PeerId +// IdOfInstance //////////////////////////////////////////////////////////////////////////////// -/// Types implementing this trait can be used to identify a `Peer` when +/// Types implementing this trait can be used to identify a `Instance` when /// accessing ConnectionPool. -pub trait PeerId: std::hash::Hash { +pub trait IdOfInstance: std::hash::Hash { fn get_or_create_in<'p>(&self, pool: &'p mut ConnectionPool) -> Result<&'p mut PoolWorker>; } -impl PeerId for RaftId { +impl IdOfInstance for RaftId { #[inline(always)] fn get_or_create_in<'p>(&self, pool: &'p mut ConnectionPool) -> Result<&'p mut PoolWorker> { pool.get_or_create_by_raft_id(*self) } } -impl PeerId for InstanceId { +impl IdOfInstance for InstanceId { #[inline(always)] fn get_or_create_in<'p>(&self, pool: &'p mut ConnectionPool) -> Result<&'p mut PoolWorker> { pool.get_or_create_by_instance_id(self) @@ -590,12 +590,15 @@ inventory::submit!(crate::InnerTest { .build(); let listen: String = l.eval("return box.info.listen").unwrap(); - let peer = traft::Peer { + let instance = traft::Instance { raft_id: 1337, - ..traft::Peer::default() + ..traft::Instance::default() }; - storage.peers.put(&peer).unwrap(); - storage.peer_addresses.put(peer.raft_id, &listen).unwrap(); + storage.instances.put(&instance).unwrap(); + storage + .peer_addresses + .put(instance.raft_id, &listen) + .unwrap(); tlog!(Info, "TEST: connecting {listen}"); // pool.connect(1337, listen); diff --git a/src/traft/node.rs b/src/traft/node.rs index 062d4396b8..5977714efa 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -35,8 +35,8 @@ use crate::storage::{Clusterwide, ClusterwideSpace, StateKey}; use crate::stringify_cfunc; use crate::traft::rpc; use crate::traft::ContextCoercion as _; +use crate::traft::Instance; use crate::traft::OpDML; -use crate::traft::Peer; use crate::traft::RaftId; use crate::traft::RaftIndex; use crate::traft::RaftTerm; @@ -59,7 +59,7 @@ use crate::traft::Op; use crate::traft::RaftSpaceAccess; use crate::traft::Topology; use crate::traft::TopologyRequest; -use crate::traft::{JoinRequest, UpdatePeerRequest}; +use crate::traft::{JoinRequest, UpdateInstanceRequest}; use super::OpResult; use super::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant}; @@ -257,10 +257,10 @@ impl Node { }) } - /// Processes the topology request and appends [`Op::PersistPeer`] + /// Processes the topology request and appends [`Op::PersistInstance`] /// entry to the raft log (if successful). /// - /// Returns the resulting peer when the entry is committed. + /// Returns the resulting instance when the entry is committed. /// /// Returns an error if the callee node isn't a raft leader. /// @@ -268,7 +268,7 @@ impl Node { pub fn handle_topology_request_and_wait( &self, req: TopologyRequest, - ) -> traft::Result<Box<traft::Peer>> { + ) -> traft::Result<Box<traft::Instance>> { let (notify_for_address, notify) = self.raw_operation(|node_impl| node_impl.process_topology_request_async(req))?; notify_for_address.map(Notify::recv_any); @@ -384,16 +384,16 @@ impl NodeImpl { let topology: Topology = unwrap_some_or! { self.topology_cache.take_or_drop(¤t_term), { - let mut peers = vec![]; - for peer @ Peer { raft_id, .. } in self.storage.peers.iter()? { - peers.push((peer, self.storage.peer_addresses.try_get(raft_id)?)) + let mut instances = vec![]; + for instance @ Instance { raft_id, .. } in self.storage.instances.iter()? { + instances.push((instance, self.storage.peer_addresses.try_get(raft_id)?)) } let replication_factor = self .storage .state .get(StateKey::ReplicationFactor)? .ok_or_else(|| Error::other("missing replication_factor value in storage"))?; - Topology::from_peers(peers).with_replication_factor(replication_factor) + Topology::from_instances(instances).with_replication_factor(replication_factor) } }; @@ -444,7 +444,7 @@ impl NodeImpl { return Ok(()); } - // TODO check it's not a MsgPropose with op::PersistPeer. + // TODO check it's not a MsgPropose with op::PersistInstance. // TODO check it's not a MsgPropose with ConfChange. self.raw_node.step(msg) } @@ -455,7 +455,7 @@ impl NodeImpl { } } - /// Processes the topology request and appends [`Op::PersistPeer`] + /// Processes the topology request and appends [`Op::PersistInstance`] /// entry to the raft log (if successful). /// /// Returns an error if the callee node isn't a Raft leader. @@ -468,7 +468,7 @@ impl NodeImpl { let topology = self.topology_mut()?; // FIXME: remove this once we introduce some 'async' stuff let notify_for_address; - let peer = match req { + let instance = match req { TopologyRequest::Join(JoinRequest { instance_id, replicaset_id, @@ -476,7 +476,7 @@ impl NodeImpl { failure_domain, .. }) => { - let (peer, address) = topology + let (instance, address) = topology .join( instance_id, replicaset_id, @@ -485,7 +485,7 @@ impl NodeImpl { ) .map_err(RaftError::ConfChangeError)?; let peer_address = traft::PeerAddress { - raft_id: peer.raft_id, + raft_id: instance.raft_id, address, }; let op = @@ -495,17 +495,17 @@ impl NodeImpl { let ctx = traft::EntryContextNormal::new(lc, op); // Important! Read bellow self.raw_node.propose(ctx.to_bytes(), vec![])?; - peer + instance } - TopologyRequest::UpdatePeer(req) => { + TopologyRequest::UpdateInstance(req) => { notify_for_address = None; topology - .update_peer(req) + .update_instance(req) .map_err(RaftError::ConfChangeError)? } }; let (lc, notify) = self.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, Op::persist_peer(peer)); + let ctx = traft::EntryContextNormal::new(lc, Op::persist_instance(instance)); // Important! Calling `raw_node.propose()` may result in // `ProposalDropped` error, but the topology has already been @@ -630,7 +630,7 @@ impl NodeImpl { let result = entry .op() .unwrap_or(&traft::Op::Nop) - .on_commit(&self.storage.peers); + .on_commit(&self.storage.instances); if let Some(lc) = entry.lc() { if let Some(notify) = self.notifications.remove(lc) { @@ -638,9 +638,10 @@ impl NodeImpl { } } - if let Some(traft::Op::PersistPeer { peer, .. }) = entry.op() { + if let Some(traft::Op::PersistInstance { instance, .. }) = entry.op() { *topology_changed = true; - if peer.current_grade == CurrentGradeVariant::Expelled && peer.raft_id == self.raft_id() + if instance.current_grade == CurrentGradeVariant::Expelled + && instance.raft_id == self.raft_id() { // cannot exit during a transaction *expelled = true; @@ -972,7 +973,7 @@ fn raft_conf_change_loop( continue 'governor; } - let peers = storage.peers.all_peers().unwrap(); + let instances = storage.instances.all_instances().unwrap(); let term = status.get().term; let cluster_id = raft_storage.cluster_id().unwrap().unwrap(); let node = global().expect("must be initialized"); @@ -981,7 +982,7 @@ fn raft_conf_change_loop( // conf change let voters = raft_storage.voters().unwrap().unwrap_or_default(); let learners = raft_storage.learners().unwrap().unwrap_or_default(); - if let Some(conf_change) = raft_conf_change(&peers, &voters, &learners) { + if let Some(conf_change) = raft_conf_change(&instances, &voters, &learners) { // main_loop gives the warranty that every ProposeConfChange // will sometimes be handled and there's no need in timeout. // It also guarantees that the notification will arrive only @@ -996,30 +997,33 @@ fn raft_conf_change_loop( //////////////////////////////////////////////////////////////////////// // offline/expel - let to_offline = peers + let to_offline = instances .iter() - .filter(|peer| peer.current_grade != CurrentGradeVariant::Offline) + .filter(|instance| instance.current_grade != CurrentGradeVariant::Offline) // TODO: process them all, not just the first one - .find(|peer| { - let (target, current) = (peer.target_grade.variant, peer.current_grade.variant); + .find(|instance| { + let (target, current) = ( + instance.target_grade.variant, + instance.current_grade.variant, + ); matches!(target, TargetGradeVariant::Offline) || !matches!(current, CurrentGradeVariant::Expelled) && matches!(target, TargetGradeVariant::Expelled) }); - if let Some(peer) = to_offline { + if let Some(instance) = to_offline { tlog!( Info, "processing {} {} -> {}", - peer.instance_id, - peer.current_grade, - peer.target_grade + instance.instance_id, + instance.current_grade, + instance.target_grade ); // transfer leadership, if we're the one who goes offline - if peer.raft_id == node.raft_id { - if let Some(new_leader) = maybe_responding(&peers).find(|peer| { + if instance.raft_id == node.raft_id { + if let Some(new_leader) = maybe_responding(&instances).find(|instance| { // FIXME: linear search - voters.contains(&peer.raft_id) + voters.contains(&instance.raft_id) }) { tlog!( Info, @@ -1032,19 +1036,19 @@ fn raft_conf_change_loop( } } - let replicaset_id = &peer.replicaset_id; + let replicaset_id = &instance.replicaset_id; // choose a new replicaset master if needed let res = (|| -> traft::Result<_> { let replicaset = storage.replicasets.get(replicaset_id)?; if replicaset - .map(|r| r.master_id == peer.instance_id) + .map(|r| r.master_id == instance.instance_id) .unwrap_or(false) { let new_master = - maybe_responding(&peers).find(|p| p.replicaset_id == replicaset_id); - if let Some(peer) = new_master { + maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id); + if let Some(instance) = new_master { let mut ops = UpdateOps::new(); - ops.assign("master_id", &peer.instance_id)?; + ops.assign("master_id", &instance.instance_id)?; let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?; @@ -1067,18 +1071,18 @@ fn raft_conf_change_loop( // reconfigure vshard storages and routers let res = (|| -> traft::Result<_> { let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&peers) - .filter(|peer| { - peer.current_grade == CurrentGradeVariant::ShardingInitialized - || peer.current_grade == CurrentGradeVariant::Online + let reqs = maybe_responding(&instances) + .filter(|instance| { + instance.current_grade == CurrentGradeVariant::ShardingInitialized + || instance.current_grade == CurrentGradeVariant::Online }) - .map(|peer| { + .map(|instance| { tlog!(Info, "calling rpc::sharding"; - "instance_id" => %peer.instance_id + "instance_id" => %instance.instance_id ); ( - peer.instance_id.clone(), + instance.instance_id.clone(), sharding::Request { term, commit, @@ -1101,34 +1105,36 @@ fn raft_conf_change_loop( continue 'governor; } - // update peer's CurrentGrade - let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id.clone()) - .with_current_grade(peer.target_grade.into()); + // update instance's CurrentGrade + let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id.clone()) + .with_current_grade(instance.target_grade.into()); tlog!(Info, - "handling UpdatePeerRequest"; + "handling UpdateInstanceRequest"; "current_grade" => %req.current_grade.expect("just set"), "instance_id" => %req.instance_id, ); if let Err(e) = node.handle_topology_request_and_wait(req.into()) { tlog!(Warning, - "failed handling UpdatePeerRequest: {e}"; - "instance_id" => %peer.instance_id, + "failed handling UpdateInstanceRequest: {e}"; + "instance_id" => %instance.instance_id, ); // TODO: don't hard code timeout event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); continue 'governor; } - let replicaset_peers = storage - .peers - .replicaset_peers(replicaset_id) + let replicaset_instances = storage + .instances + .replicaset_instances(replicaset_id) .expect("storage error") - .filter(|peer| !peer.is_expelled()) + .filter(|instance| !instance.is_expelled()) .collect::<Vec<_>>(); - let may_respond = replicaset_peers.iter().filter(|peer| peer.may_respond()); + let may_respond = replicaset_instances + .iter() + .filter(|instance| instance.may_respond()); // Check if it makes sense to call box.ctl.promote, // otherwise we risk unpredictable delays - if replicaset_peers.len() / 2 + 1 > may_respond.count() { + if replicaset_instances.len() / 2 + 1 > may_respond.count() { tlog!(Warning, "replicaset lost quorum"; "replicaset_id" => %replicaset_id, @@ -1172,15 +1178,15 @@ fn raft_conf_change_loop( // raft sync // TODO: putting each stage in a different function // will make the control flow more readable - let to_sync = peers.iter().find(|peer| { - peer.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online) - || peer.is_reincarnated() + let to_sync = instances.iter().find(|instance| { + instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online) + || instance.is_reincarnated() }); - if let Some(peer) = to_sync { + if let Some(instance) = to_sync { let (rx, tx) = fiber::Channel::new(1).into_clones(); let commit = raft_storage.commit().unwrap().unwrap(); pool.call( - &peer.raft_id, + &instance.raft_id, sync::Request { commit, timeout: SYNC_TIMEOUT, @@ -1191,25 +1197,27 @@ fn raft_conf_change_loop( let res = rx.recv().expect("ought not fail"); let res = res.and_then(|sync::Response { commit }| { // TODO: change `Info` to `Debug` - tlog!(Info, "peer synced"; + tlog!(Info, "instance synced"; "commit" => commit, - "instance_id" => &*peer.instance_id, + "instance_id" => &*instance.instance_id, ); - let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::raft_synced(peer.target_grade.incarnation)); + let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::raft_synced( + instance.target_grade.incarnation, + )); global() .expect("can't be deinitialized") .handle_topology_request_and_wait(req.into()) }); match res { - Ok(peer) => { + Ok(instance) => { tlog!(Info, "raft sync processed"); - debug_assert!(peer.current_grade == CurrentGradeVariant::RaftSynced); + debug_assert!(instance.current_grade == CurrentGradeVariant::RaftSynced); } Err(e) => { tlog!(Warning, "raft sync failed: {e}"; - "instance_id" => %peer.instance_id, + "instance_id" => %instance.instance_id, ); // TODO: don't hard code timeout @@ -1223,18 +1231,18 @@ fn raft_conf_change_loop( //////////////////////////////////////////////////////////////////////// // replication - let to_replicate = peers + let to_replicate = instances .iter() - // TODO: find all such peers in a given replicaset, + // TODO: find all such instances in a given replicaset, // not just the first one - .find(|peer| { - peer.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online) + .find(|instance| { + instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online) }); - if let Some(peer) = to_replicate { - let replicaset_id = &peer.replicaset_id; - let replicaset_iids = maybe_responding(&peers) - .filter(|peer| peer.replicaset_id == replicaset_id) - .map(|peer| peer.instance_id.clone()) + if let Some(instance) = to_replicate { + let replicaset_id = &instance.replicaset_id; + let replicaset_iids = maybe_responding(&instances) + .filter(|instance| instance.replicaset_id == replicaset_id) + .map(|instance| instance.instance_id.clone()) .collect::<Vec<_>>(); let res = (|| -> traft::Result<_> { @@ -1252,17 +1260,19 @@ fn raft_conf_change_loop( // TODO: don't hard code timeout let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (peer_iid, resp) in res { + for (instance_iid, resp) in res { let replication::Response { lsn } = resp?; // TODO: change `Info` to `Debug` - tlog!(Info, "configured replication with peer"; - "instance_id" => &*peer_iid, + tlog!(Info, "configured replication with instance"; + "instance_id" => &*instance_iid, "lsn" => lsn, ); } - let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::replicated(peer.target_grade.incarnation)); + let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::replicated( + instance.target_grade.incarnation, + )); node.handle_topology_request_and_wait(req.into())?; Ok(()) @@ -1276,23 +1286,23 @@ fn raft_conf_change_loop( let res = (|| -> Result<_, Error> { let master_id = - if let Some(replicaset) = storage.replicasets.get(&peer.replicaset_id)? { + if let Some(replicaset) = storage.replicasets.get(&instance.replicaset_id)? { Cow::Owned(replicaset.master_id) } else { let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; let req = traft::OpDML::insert( ClusterwideSpace::Replicaset, &traft::Replicaset { - replicaset_id: peer.replicaset_id.clone(), - replicaset_uuid: peer.replicaset_uuid.clone(), - master_id: peer.instance_id.clone(), + replicaset_id: instance.replicaset_id.clone(), + replicaset_uuid: instance.replicaset_uuid.clone(), + master_id: instance.instance_id.clone(), weight: if vshard_bootstrapped { 0. } else { 1. }, current_schema_version: 0, }, )?; // TODO: don't hard code the timeout node.propose_and_wait(req, Duration::from_secs(3))??; - Cow::Borrowed(&peer.instance_id) + Cow::Borrowed(&instance.instance_id) }; let commit = raft_storage.commit()?.unwrap(); @@ -1308,7 +1318,7 @@ fn raft_conf_change_loop( )?; tlog!(Debug, "promoted replicaset master"; "instance_id" => %master_id, - "replicaset_id" => %peer.replicaset_id, + "replicaset_id" => %instance.replicaset_id, ); Ok(()) })(); @@ -1325,39 +1335,39 @@ fn raft_conf_change_loop( //////////////////////////////////////////////////////////////////////// // init sharding - let to_shard = peers.iter().find(|peer| { - peer.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online) + let to_shard = instances.iter().find(|instance| { + instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online) }); - if let Some(peer) = to_shard { + if let Some(instance) = to_shard { let res = (|| -> traft::Result<()> { let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&peers).map(|peer| { + let reqs = maybe_responding(&instances).map(|instance| { ( - peer.instance_id.clone(), + instance.instance_id.clone(), sharding::Request { term, commit, timeout: SYNC_TIMEOUT, - bootstrap: !vshard_bootstrapped && peer.raft_id == node.raft_id, + bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id, }, ) }); // TODO: don't hard code timeout let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (peer_iid, resp) in res { + for (instance_iid, resp) in res { let sharding::Response {} = resp?; // TODO: change `Info` to `Debug` - tlog!(Info, "initialized sharding with peer"; - "instance_id" => &*peer_iid, + tlog!(Info, "initialized sharding with instance"; + "instance_id" => &*instance_iid, ); } - let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) + let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::sharding_initialized( - peer.target_grade.incarnation, + instance.target_grade.incarnation, )); node.handle_topology_request_and_wait(req.into())?; @@ -1389,9 +1399,9 @@ fn raft_conf_change_loop( let replicasets = storage.replicasets.iter()?; let masters = replicasets.map(|r| r.master_id).collect::<HashSet<_>>(); let commit = raft_storage.commit()?.unwrap(); - let reqs = maybe_responding(&peers) - .filter(|peer| masters.contains(&peer.instance_id)) - .map(|peer| peer.instance_id.clone()) + let reqs = maybe_responding(&instances) + .filter(|instance| masters.contains(&instance.instance_id)) + .map(|instance| instance.instance_id.clone()) .zip(repeat(replication::promote::Request { term, commit, @@ -1399,9 +1409,9 @@ fn raft_conf_change_loop( })); // TODO: don't hard code timeout let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (peer_iid, resp) in res { + for (instance_iid, resp) in res { resp?; - tlog!(Debug, "promoted replicaset master"; "instance_id" => %peer_iid); + tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_iid); } Ok(()) })(); @@ -1416,15 +1426,15 @@ fn raft_conf_change_loop( //////////////////////////////////////////////////////////////////////// // sharding weights - let to_update_weights = peers.iter().find(|peer| { - peer.has_grades( + let to_update_weights = instances.iter().find(|instance| { + instance.has_grades( CurrentGradeVariant::ShardingInitialized, TargetGradeVariant::Online, ) }); - if let Some(peer) = to_update_weights { + if let Some(instance) = to_update_weights { let res = if let Some(added_weights) = - get_weight_changes(maybe_responding(&peers), &storage) + get_weight_changes(maybe_responding(&instances), &storage) { (|| -> traft::Result<()> { for (replicaset_id, weight) in added_weights { @@ -1441,9 +1451,10 @@ fn raft_conf_change_loop( )??; } - let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone()); + let instance_ids = + maybe_responding(&instances).map(|instance| instance.instance_id.clone()); let commit = raft_storage.commit()?.unwrap(); - let reqs = peer_ids.zip(repeat(sharding::Request { + let reqs = instance_ids.zip(repeat(sharding::Request { term, commit, timeout: SYNC_TIMEOUT, @@ -1452,37 +1463,39 @@ fn raft_conf_change_loop( // TODO: don't hard code timeout let res = call_all(&mut pool, reqs, Duration::from_secs(3))?; - for (peer_iid, resp) in res { + for (instance_iid, resp) in res { resp?; // TODO: change `Info` to `Debug` - tlog!(Info, "peer is online"; "instance_id" => &*peer_iid); + tlog!(Info, "instance is online"; "instance_id" => &*instance_iid); } - let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id) - .with_current_grade(CurrentGrade::online(peer.target_grade.incarnation)); + let req = UpdateInstanceRequest::new(instance.instance_id.clone(), cluster_id) + .with_current_grade(CurrentGrade::online( + instance.target_grade.incarnation, + )); node.handle_topology_request_and_wait(req.into())?; Ok(()) })() } else { (|| -> traft::Result<()> { - let to_online = peers.iter().filter(|peer| { - peer.has_grades( + let to_online = instances.iter().filter(|instance| { + instance.has_grades( CurrentGradeVariant::ShardingInitialized, TargetGradeVariant::Online, ) }); - for Peer { + for Instance { instance_id, target_grade, .. } in to_online { let cluster_id = cluster_id.clone(); - let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id) + let req = UpdateInstanceRequest::new(instance_id.clone(), cluster_id) .with_current_grade(CurrentGrade::online(target_grade.incarnation)); node.handle_topology_request_and_wait(req.into())?; // TODO: change `Info` to `Debug` - tlog!(Info, "peer is online"; "instance_id" => &**instance_id); + tlog!(Info, "instance is online"; "instance_id" => &**instance_id); } Ok(()) })() @@ -1515,14 +1528,14 @@ fn raft_conf_change_loop( .get(rid.to_string().as_str()) .unwrap() .unwrap(); - let peer = storage.peers.get(&replicaset.master_id).unwrap(); + let instance = storage.instances.get(&replicaset.master_id).unwrap(); let req = rpc::migration::apply::Request { term, commit, timeout: SYNC_TIMEOUT, migration_id: migration.id, }; - let res = pool.call_and_wait(&peer.raft_id, req); + let res = pool.call_and_wait(&instance.raft_id, req); match res { Ok(_) => { let mut ops = UpdateOps::new(); @@ -1568,7 +1581,7 @@ fn raft_conf_change_loop( ) -> traft::Result<Vec<(I, traft::Result<R::Response>)>> where R: traft::rpc::Request, - I: traft::network::PeerId + Clone + std::fmt::Debug + 'static, + I: traft::network::IdOfInstance + Clone + std::fmt::Debug + 'static, { // TODO: this crap is only needed to wait until results of all // the calls are ready. There are several ways to rafactor this: @@ -1585,8 +1598,8 @@ fn raft_conf_change_loop( static mut SENT_COUNT: usize = 0; unsafe { SENT_COUNT = 0 }; let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones(); - let peer_count = reqs.len(); - let (rx, tx) = fiber::Channel::new(peer_count as _).into_clones(); + let instance_count = reqs.len(); + let (rx, tx) = fiber::Channel::new(instance_count as _).into_clones(); for (id, req) in reqs { let tx = tx.clone(); let cond_tx = cond_tx.clone(); @@ -1594,7 +1607,7 @@ fn raft_conf_change_loop( pool.call(&id, req, move |res| { tx.send((id_copy, res)).expect("mustn't fail"); unsafe { SENT_COUNT += 1 }; - if unsafe { SENT_COUNT } == peer_count { + if unsafe { SENT_COUNT } == instance_count { cond_tx.signal() } }) @@ -1605,20 +1618,20 @@ fn raft_conf_change_loop( return Err(Error::Timeout); } - Ok(rx.into_iter().take(peer_count).collect()) + Ok(rx.into_iter().take(instance_count).collect()) } #[inline(always)] fn get_weight_changes<'p>( - peers: impl IntoIterator<Item = &'p Peer>, + instances: impl IntoIterator<Item = &'p Instance>, storage: &Clusterwide, ) -> Option<ReplicasetWeights> { let replication_factor = storage.state.replication_factor().expect("storage error"); let replicaset_weights = storage.replicasets.weights().expect("storage error"); let mut replicaset_sizes = HashMap::new(); let mut weight_changes = HashMap::new(); - for peer @ Peer { replicaset_id, .. } in peers { - if !peer.may_respond() { + for instance @ Instance { replicaset_id, .. } in instances { + if !instance.may_respond() { continue; } let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0); @@ -1631,8 +1644,8 @@ fn raft_conf_change_loop( } #[inline(always)] - fn maybe_responding(peers: &[Peer]) -> impl Iterator<Item = &Peer> { - peers.iter().filter(|peer| peer.may_respond()) + fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> { + instances.iter().filter(|instance| instance.may_respond()) } } diff --git a/src/traft/rpc/expel.rs b/src/traft/rpc/expel.rs index 3883176ecb..6cdace745a 100644 --- a/src/traft/rpc/expel.rs +++ b/src/traft/rpc/expel.rs @@ -1,6 +1,6 @@ use crate::traft; use crate::traft::Result; -use crate::traft::{error::Error, node, InstanceId, UpdatePeerRequest}; +use crate::traft::{error::Error, node, InstanceId, UpdateInstanceRequest}; crate::define_rpc_request! { fn proc_expel_on_leader(req: Request) -> Result<Response> { @@ -22,7 +22,7 @@ crate::define_rpc_request! { return Err(Error::NotALeader); } - let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id) + let req2 = UpdateInstanceRequest::new(req.instance_id, req.cluster_id) .with_target_grade(traft::TargetGradeVariant::Expelled); node.handle_topology_request_and_wait(req2.into())?; @@ -32,7 +32,7 @@ crate::define_rpc_request! { /// A request to expel an instance. /// /// This request is only handled by the leader. - /// Use [`redirect::Request`] for automatic redirection from any peer to + /// Use [`redirect::Request`] for automatic redirection from any instance to /// leader. pub struct Request { pub cluster_id: String, @@ -57,7 +57,7 @@ pub mod redirect { /// A request to expel an instance. /// - /// Can be sent to any peer and will be automatically redirected to + /// Can be sent to any instance and will be automatically redirected to /// leader. pub struct Request(pub super::Request); pub struct Response {} diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs index f6acb64d8d..109305c6e6 100644 --- a/src/traft/rpc/join.rs +++ b/src/traft/rpc/join.rs @@ -1,10 +1,10 @@ use crate::traft::{ - error::Error, node, FailureDomain, InstanceId, Peer, PeerAddress, ReplicasetId, Result, + error::Error, node, FailureDomain, Instance, InstanceId, PeerAddress, ReplicasetId, Result, }; #[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)] pub struct OkResponse { - pub peer: Box<Peer>, + pub instance: Box<Instance>, pub peer_addresses: Vec<PeerAddress>, pub box_replication: Vec<String>, // Other parameters necessary for box.cfg() @@ -28,18 +28,18 @@ crate::define_rpc_request! { } match node.handle_topology_request_and_wait(req.into()) { - Ok(peer) => { + Ok(instance) => { let mut box_replication = vec![]; - for replica in node.storage.peers.replicaset_peers(&peer.replicaset_id)? { + for replica in node.storage.instances.replicaset_instances(&instance.replicaset_id)? { box_replication.extend(node.storage.peer_addresses.get(replica.raft_id)?); } - // A joined peer needs to communicate with other nodes. + // A joined instance needs to communicate with other nodes. // TODO: limit the number of entries sent to reduce response size. let peer_addresses = node.storage.peer_addresses.iter()?.collect(); Ok(Response::Ok(OkResponse { - peer, + instance, peer_addresses, box_replication, })) diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs index 8a7b373856..345d398932 100644 --- a/src/traft/rpc/mod.rs +++ b/src/traft/rpc/mod.rs @@ -16,7 +16,7 @@ pub mod migration; pub mod replication; pub mod sharding; pub mod sync; -pub mod update_peer; +pub mod update_instance; /// Types implementing this trait represent an RPC's (remote procedure call) /// arguments. This trait contains information about the request. diff --git a/src/traft/rpc/replication.rs b/src/traft/rpc/replication.rs index 58a6ca45b3..e176260ca0 100644 --- a/src/traft/rpc/replication.rs +++ b/src/traft/rpc/replication.rs @@ -1,4 +1,4 @@ -use crate::storage::peer_field::ReplicasetId; +use crate::storage::instance_field::ReplicasetId; use crate::tarantool::set_cfg_field; use crate::traft::{self, node, RaftIndex, RaftTerm, Result}; use crate::InstanceId; @@ -12,11 +12,11 @@ crate::define_rpc_request! { super::sync::wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?; let storage = &node.storage; - let rsid = storage.peers.peer_field::<ReplicasetId>(&node.raft_id())?; + let rsid = storage.instances.instance_field::<ReplicasetId>(&node.raft_id())?; let mut box_replication = vec![]; - for replica in storage.peers.replicaset_peers(&rsid)? { + for replica in storage.instances.replicaset_instances(&rsid)? { let Some(address) = storage.peer_addresses.get(replica.raft_id)? else { - crate::tlog!(Warning, "address unknown for peer"; + crate::tlog!(Warning, "address unknown for instance"; "raft_id" => replica.raft_id, ); continue; @@ -61,7 +61,7 @@ pub mod promote { Ok(Response {}) } - /// Request to promote peer to tarantool replication leader. + /// Request to promote instance to tarantool replication leader. pub struct Request { pub term: RaftTerm, pub commit: RaftIndex, diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs index 36ca13f7f9..a68e9488f5 100644 --- a/src/traft/rpc/sharding.rs +++ b/src/traft/rpc/sharding.rs @@ -124,7 +124,7 @@ pub mod cfg { .map(|r| (r.replicaset_id.clone(), r)) .collect(); let mut sharding: HashMap<String, Replicaset> = HashMap::new(); - for peer in storage.peers.iter()? { + for peer in storage.instances.iter()? { if !peer.may_respond() { continue; } diff --git a/src/traft/rpc/update_peer.rs b/src/traft/rpc/update_peer.rs deleted file mode 100644 index da3e763206..0000000000 --- a/src/traft/rpc/update_peer.rs +++ /dev/null @@ -1,81 +0,0 @@ -use crate::tlog; -use crate::traft::FailureDomain; -use crate::traft::Result; -use crate::traft::{error::Error, node, InstanceId}; -use crate::traft::{CurrentGrade, TargetGradeVariant}; - -crate::define_rpc_request! { - fn proc_update_peer(req: Request) -> Result<Response> { - let node = node::global()?; - - let cluster_id = node - .raft_storage - .cluster_id()? - .expect("cluster_id is set at boot"); - - if req.cluster_id != cluster_id { - return Err(Error::ClusterIdMismatch { - instance_cluster_id: req.cluster_id, - cluster_cluster_id: cluster_id, - }); - } - - let mut req = req; - let instance_id = &*req.instance_id; - if let Some(current_grade) = req.current_grade.take() { - tlog!(Warning, "attempt to change current_grade by peer"; - "instance_id" => instance_id, - "current_grade" => %current_grade, - ); - } - match node.handle_topology_request_and_wait(req.into()) { - Ok(_) => Ok(Response::Ok {}), - Err(Error::NotALeader) => Ok(Response::ErrNotALeader), - Err(e) => Err(e), - } - } - - /// Request to update the instance in the storage. - #[derive(Default)] - pub struct Request { - pub instance_id: InstanceId, - pub cluster_id: String, - /// Only allowed to be set by leader - pub current_grade: Option<CurrentGrade>, - /// Can be set by peer - pub target_grade: Option<TargetGradeVariant>, - pub failure_domain: Option<FailureDomain>, - } - - /// Response to a [`Request`] - pub enum Response { - Ok, - ErrNotALeader, - } -} - -impl Request { - #[inline] - pub fn new(instance_id: InstanceId, cluster_id: String) -> Self { - Self { - instance_id, - cluster_id, - ..Request::default() - } - } - #[inline] - pub fn with_current_grade(mut self, value: CurrentGrade) -> Self { - self.current_grade = Some(value); - self - } - #[inline] - pub fn with_target_grade(mut self, value: TargetGradeVariant) -> Self { - self.target_grade = Some(value); - self - } - #[inline] - pub fn with_failure_domain(mut self, value: FailureDomain) -> Self { - self.failure_domain = Some(value); - self - } -} diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 0f10029389..b16e441f56 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -4,8 +4,8 @@ use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; use crate::traft::Address; use crate::traft::FailureDomain; -use crate::traft::Peer; -use crate::traft::UpdatePeerRequest; +use crate::traft::Instance; +use crate::traft::UpdateInstanceRequest; use crate::traft::{CurrentGrade, CurrentGradeVariant, Grade, TargetGrade, TargetGradeVariant}; use crate::traft::{InstanceId, RaftId, ReplicasetId}; use crate::util::Uppercase; @@ -15,13 +15,13 @@ pub struct Topology { max_raft_id: RaftId, failure_domain_names: HashSet<Uppercase>, - instance_map: HashMap<InstanceId, (Peer, Address)>, + instance_map: HashMap<InstanceId, (Instance, Address)>, replicaset_map: BTreeMap<ReplicasetId, HashSet<InstanceId>>, } impl Topology { #[inline(always)] - pub fn from_peers(peers: impl IntoIterator<Item = (Peer, Address)>) -> Self { + pub fn from_instances(instances: impl IntoIterator<Item = (Instance, Address)>) -> Self { let mut ret = Self { replication_factor: 1, max_raft_id: 0, @@ -30,8 +30,8 @@ impl Topology { replicaset_map: Default::default(), }; - for (peer, address) in peers { - ret.put_peer(peer, address); + for (instance, address) in instances { + ret.put_instance(instance, address); } ret @@ -42,22 +42,22 @@ impl Topology { self } - fn put_peer(&mut self, peer: Peer, address: Address) { - self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id); + fn put_instance(&mut self, instance: Instance, address: Address) { + self.max_raft_id = std::cmp::max(self.max_raft_id, instance.raft_id); - let instance_id = peer.instance_id.clone(); - let replicaset_id = peer.replicaset_id.clone(); + let instance_id = instance.instance_id.clone(); + let replicaset_id = instance.replicaset_id.clone(); - if let Some((old_peer, ..)) = self.instance_map.remove(&instance_id) { + if let Some((old_instance, ..)) = self.instance_map.remove(&instance_id) { self.replicaset_map - .get_mut(&old_peer.replicaset_id) - .map(|r| r.remove(&old_peer.instance_id)); + .get_mut(&old_instance.replicaset_id) + .map(|r| r.remove(&old_instance.instance_id)); } self.failure_domain_names - .extend(peer.failure_domain.names().cloned()); + .extend(instance.failure_domain.names().cloned()); self.instance_map - .insert(instance_id.clone(), (peer, address)); + .insert(instance_id.clone(), (instance, address)); self.replicaset_map .entry(replicaset_id) .or_default() @@ -82,11 +82,11 @@ impl Topology { } fn choose_replicaset_id(&self, failure_domain: &FailureDomain) -> ReplicasetId { - 'next_replicaset: for (replicaset_id, peers) in self.replicaset_map.iter() { - if peers.len() < self.replication_factor as usize { - for peer_id in peers { - let (peer, ..) = self.instance_map.get(peer_id).unwrap(); - if peer.failure_domain.intersects(failure_domain) { + 'next_replicaset: for (replicaset_id, instances) in self.replicaset_map.iter() { + if instances.len() < self.replication_factor as usize { + for instance_id in instances { + let (instance, ..) = self.instance_map.get(instance_id).unwrap(); + if instance.failure_domain.intersects(failure_domain) { continue 'next_replicaset; } } @@ -126,10 +126,10 @@ impl Topology { replicaset_id: Option<ReplicasetId>, advertise: Address, failure_domain: FailureDomain, - ) -> Result<(Peer, Address), String> { + ) -> Result<(Instance, Address), String> { if let Some(id) = &instance_id { - let existing_peer = self.instance_map.get(id); - if matches!(existing_peer, Some((peer, ..)) if peer.is_online()) { + let existing_instance = self.instance_map.get(id); + if matches!(existing_instance, Some((instance, ..)) if instance.is_online()) { let e = format!("{} is already joined", id); return Err(e); } @@ -145,7 +145,7 @@ impl Topology { replicaset_id.unwrap_or_else(|| self.choose_replicaset_id(&failure_domain)); let replicaset_uuid = replicaset_uuid(&replicaset_id); - let peer = Peer { + let instance = Instance { instance_id, instance_uuid, raft_id, @@ -156,22 +156,22 @@ impl Topology { failure_domain, }; - self.put_peer(peer.clone(), advertise.clone()); - Ok((peer, advertise)) + self.put_instance(instance.clone(), advertise.clone()); + Ok((instance, advertise)) } - pub fn update_peer(&mut self, req: UpdatePeerRequest) -> Result<Peer, String> { + pub fn update_instance(&mut self, req: UpdateInstanceRequest) -> Result<Instance, String> { let this = self as *const Self; - let (peer, ..) = self + let (instance, ..) = self .instance_map .get_mut(&req.instance_id) .ok_or_else(|| format!("unknown instance {}", req.instance_id))?; - if peer.current_grade == CurrentGradeVariant::Expelled + if instance.current_grade == CurrentGradeVariant::Expelled && !matches!( req, - UpdatePeerRequest { + UpdateInstanceRequest { target_grade: None, current_grade: Some(current_grade), failure_domain: None, @@ -180,8 +180,8 @@ impl Topology { ) { return Err(format!( - "cannot update expelled peer \"{}\"", - peer.instance_id + "cannot update expelled instance \"{}\"", + instance.instance_id )); } @@ -190,36 +190,36 @@ impl Topology { // the function unsafe { &*this }.check_required_failure_domain(&fd)?; self.failure_domain_names.extend(fd.names().cloned()); - peer.failure_domain = fd; + instance.failure_domain = fd; } if let Some(value) = req.current_grade { - peer.current_grade = value; + instance.current_grade = value; } if let Some(variant) = req.target_grade { let incarnation = match variant { - TargetGradeVariant::Online => peer.target_grade.incarnation + 1, - _ => peer.current_grade.incarnation, + TargetGradeVariant::Online => instance.target_grade.incarnation + 1, + _ => instance.current_grade.incarnation, }; - peer.target_grade = Grade { + instance.target_grade = Grade { variant, incarnation, }; } - Ok(peer.clone()) + Ok(instance.clone()) } } -/// Create first peer in the cluster -pub fn initial_peer( +/// Create first instance in the cluster +pub fn initial_instance( instance_id: Option<InstanceId>, replicaset_id: Option<ReplicasetId>, advertise: Address, failure_domain: FailureDomain, -) -> Result<(Peer, Address), String> { - let mut topology = Topology::from_peers(vec![]); +) -> Result<(Instance, Address), String> { + let mut topology = Topology::from_instances(vec![]); topology.join(instance_id, replicaset_id, advertise, failure_domain) } @@ -231,8 +231,8 @@ mod tests { use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; use crate::traft::FailureDomain; - use crate::traft::Peer; - use crate::traft::UpdatePeerRequest; + use crate::traft::Instance; + use crate::traft::UpdateInstanceRequest; use crate::traft::{CurrentGrade, Grade, TargetGrade, TargetGradeVariant}; use pretty_assertions::assert_eq; @@ -252,29 +252,29 @@ mod tests { } } - trait ModifyUpdatePeerRequest { - fn modify(self, req: UpdatePeerRequest) -> UpdatePeerRequest; + trait ModifyUpdateInstanceRequest { + fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest; } - impl ModifyUpdatePeerRequest for CurrentGrade { - fn modify(self, req: UpdatePeerRequest) -> UpdatePeerRequest { + impl ModifyUpdateInstanceRequest for CurrentGrade { + fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest { req.with_current_grade(self) } } - impl ModifyUpdatePeerRequest for TargetGradeVariant { - fn modify(self, req: UpdatePeerRequest) -> UpdatePeerRequest { + impl ModifyUpdateInstanceRequest for TargetGradeVariant { + fn modify(self, req: UpdateInstanceRequest) -> UpdateInstanceRequest { req.with_target_grade(self) } } - macro_rules! peers { - [ $( ( $($peer:tt)+ ) ),* $(,)? ] => { - vec![$( (peer!($($peer)+), "who-cares.biz".into()) ),*] + macro_rules! instances { + [ $( ( $($instance:tt)+ ) ),* $(,)? ] => { + vec![$( (instance!($($instance)+), "who-cares.biz".into()) ),*] }; } - macro_rules! peer { + macro_rules! instance { ( $raft_id:expr, $instance_id:literal, @@ -284,7 +284,7 @@ mod tests { $(, $failure_domain:expr)? $(,)? ) => { - Peer { + Instance { raft_id: $raft_id, instance_id: $instance_id.into(), replicaset_id: $replicaset_id.into(), @@ -298,7 +298,7 @@ mod tests { $( let _f = $failure_domain; )? _f }, - .. Peer::default() + .. Instance::default() } }; } @@ -333,9 +333,9 @@ mod tests { $(, $target_grade:expr)?)? $(,)? ) => { - $topology.update_peer( + $topology.update_instance( { - let req = UpdatePeerRequest::new($instance_id.into(), "".into()); + let req = UpdateInstanceRequest::new($instance_id.into(), "".into()); $( let req = $current_grade.modify(req); $( let req = $target_grade.modify(req); )? @@ -352,8 +352,8 @@ mod tests { $instance_id:expr, $failure_domain:expr $(,)? ) => { - $topology.update_peer( - UpdatePeerRequest::new($instance_id.into(), "".into()) + $topology.update_instance( + UpdateInstanceRequest::new($instance_id.into(), "".into()) .with_failure_domain($failure_domain), ) }; @@ -368,41 +368,41 @@ mod tests { #[test] fn test_simple() { - let mut topology = Topology::from_peers(vec![]).with_replication_factor(1); + let mut topology = Topology::from_instances(vec![]).with_replication_factor(1); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (peer!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); assert_eq!( join!(topology, None, Some("R3"), "addr:1").unwrap(), - (peer!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); assert_eq!( join!(topology, Some("I4"), None, "addr:1").unwrap(), - (peer!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); - let mut topology = Topology::from_peers( - peers![(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))] + let mut topology = Topology::from_instances( + instances![(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))] ).with_replication_factor(1); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (peer!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); } #[test] fn test_override() { - let mut topology = Topology::from_peers(peers![ + let mut topology = Topology::from_instances(instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i2", "r2-original", CurrentGrade::offline(0), TargetGrade::offline(0)), ]) @@ -432,7 +432,7 @@ mod tests { // Disruption isn't destructive if auto-expel allows (TODO). assert_eq!( join!(topology, Some("i2"), None, "inactive:2").unwrap(), - (peer!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "inactive:2".into()), + (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "inactive:2".into()), // Attention: generated replicaset_id differs from the // original one, as well as raft_id. // That's a desired behavior. @@ -453,7 +453,7 @@ mod tests { #[test] fn test_instance_id_collision() { - let mut topology = Topology::from_peers(peers![ + let mut topology = Topology::from_instances(instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i3", "r3", CurrentGrade::online(1), TargetGrade::online(1)), // Attention: i3 has raft_id=2 @@ -461,13 +461,13 @@ mod tests { assert_eq!( join!(topology, None, Some("r2"), "addr:2").unwrap(), - (peer!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), + (instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), ); } #[test] fn test_replication_factor() { - let mut topology = Topology::from_peers(peers![ + let mut topology = Topology::from_instances(instances![ (9, "i9", "r9", CurrentGrade::online(1), TargetGrade::online(1)), (10, "i10", "r9", CurrentGrade::online(1), TargetGrade::online(1)), ]) @@ -475,25 +475,25 @@ mod tests { assert_eq!( join!(topology, Some("i1"), None, "addr:1").unwrap(), - (peer!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), ); assert_eq!( join!(topology, Some("i2"), None, "addr:2").unwrap(), - (peer!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), + (instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), ); assert_eq!( join!(topology, Some("i3"), None, "addr:3").unwrap(), - (peer!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:3".into()), + (instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:3".into()), ); assert_eq!( join!(topology, Some("i4"), None, "addr:4").unwrap(), - (peer!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:4".into()), + (instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:4".into()), ); } #[test] fn test_update_grade() { - let mut topology = Topology::from_peers(peers![ + let mut topology = Topology::from_instances(instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), ]) .with_replication_factor(1); @@ -502,55 +502,55 @@ mod tests { // governor has the authority over it assert_eq!( set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // idempotency assert_eq!( set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // TargetGradeVariant::Offline takes incarnation from current grade assert_eq!( set_grade!(topology, "i1", TargetGradeVariant::Offline).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); // TargetGradeVariant::Online increases incarnation assert_eq!( set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // No idempotency, incarnation goes up assert_eq!( set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(2)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(2)), ); // TargetGrade::Expelled takes incarnation from current grade assert_eq!( set_grade!(topology, "i1", TargetGradeVariant::Expelled).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::expelled(0)), + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::expelled(0)), ); - // Peer get's expelled + // Instance get's expelled assert_eq!( set_grade!(topology, "i1", CurrentGrade::expelled(69)).unwrap(), - peer!(1, "i1", "r1", CurrentGrade::expelled(69), TargetGrade::expelled(0)), + instance!(1, "i1", "r1", CurrentGrade::expelled(69), TargetGrade::expelled(0)), ); - // Updating expelled peers isn't allowed + // Updating expelled instances isn't allowed assert_eq!( set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap_err().to_string(), - "cannot update expelled peer \"i1\"", + "cannot update expelled instance \"i1\"", ); } #[test] fn failure_domain() { - let mut t = Topology::from_peers(peers![]).with_replication_factor(3); + let mut t = Topology::from_instances(instances![]).with_replication_factor(3); assert_eq!( join!(t, None, None, "-", faildoms! {planet: Earth}) @@ -633,12 +633,12 @@ mod tests { #[test] fn reconfigure_failure_domain() { - let mut t = Topology::from_peers(peers![]).with_replication_factor(3); + let mut t = Topology::from_instances(instances![]).with_replication_factor(3); // first instance - let (peer, ..) = join!(t, Some("i1"), None, "-", faildoms! {planet: Earth}).unwrap(); - assert_eq!(peer.failure_domain, faildoms! {planet: Earth}); - assert_eq!(peer.replicaset_id, "r1"); + let (instance, ..) = join!(t, Some("i1"), None, "-", faildoms! {planet: Earth}).unwrap(); + assert_eq!(instance.failure_domain, faildoms! {planet: Earth}); + assert_eq!(instance.replicaset_id, "r1"); // reconfigure single instance, fail assert_eq!( @@ -649,9 +649,9 @@ mod tests { ); // reconfigure single instance, success - let peer = set_faildoms!(t, "i1", faildoms! {planet: Mars, owner: Ivan}).unwrap(); - assert_eq!(peer.failure_domain, faildoms! {planet: Mars, owner: Ivan}); - assert_eq!(peer.replicaset_id, "r1"); // same replicaset + let instance = set_faildoms!(t, "i1", faildoms! {planet: Mars, owner: Ivan}).unwrap(); + assert_eq!(instance.failure_domain, faildoms! {planet: Mars, owner: Ivan}); + assert_eq!(instance.replicaset_id, "r1"); // same replicaset // second instance won't be joined without the newly added required // failure domain subdivision of "OWNER" @@ -664,27 +664,27 @@ mod tests { // second instance #[rustfmt::skip] - let (peer, ..) = join!(t, Some("i2"), None, "-", faildoms! {planet: Mars, owner: Mike}) + let (instance, ..) = join!(t, Some("i2"), None, "-", faildoms! {planet: Mars, owner: Mike}) .unwrap(); - assert_eq!(peer.failure_domain, faildoms! {planet: Mars, owner: Mike}); + assert_eq!(instance.failure_domain, faildoms! {planet: Mars, owner: Mike}); // doesn't fit into r1 - assert_eq!(peer.replicaset_id, "r2"); + assert_eq!(instance.replicaset_id, "r2"); // reconfigure second instance, success - let peer = set_faildoms!(t, "i2", faildoms! {planet: Earth, owner: Mike}).unwrap(); - assert_eq!(peer.failure_domain, faildoms! {planet: Earth, owner: Mike}); + let instance = set_faildoms!(t, "i2", faildoms! {planet: Earth, owner: Mike}).unwrap(); + assert_eq!(instance.failure_domain, faildoms! {planet: Earth, owner: Mike}); // replicaset doesn't change automatically - assert_eq!(peer.replicaset_id, "r2"); + assert_eq!(instance.replicaset_id, "r2"); // add instance with new subdivision #[rustfmt::skip] - let (peer, ..) = join!(t, Some("i3"), None, "-", faildoms! {planet: B, owner: V, dimension: C137}) + let (instance, ..) = join!(t, Some("i3"), None, "-", faildoms! {planet: B, owner: V, dimension: C137}) .unwrap(); assert_eq!( - peer.failure_domain, + instance.failure_domain, faildoms! {planet: B, owner: V, dimension: C137} ); - assert_eq!(peer.replicaset_id, "r1"); + assert_eq!(instance.replicaset_id, "r1"); // even though the only instance with failure domain subdivision of // `DIMENSION` is inactive, we can't add an instance without that diff --git a/test/conftest.py b/test/conftest.py index d89af8e82e..67ba65828a 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -216,7 +216,7 @@ class Instance: return f"{self.host}:{self.port}" def current_grade(self): - return self.call("pico.peer_info", self.instance_id)["current_grade"] + return self.call("pico.instance_info", self.instance_id)["current_grade"] def instance_uuid(self): return self.eval("return box.info.uuid") @@ -436,7 +436,7 @@ class Instance: self.raft_id = whoami["raft_id"] self.instance_id = whoami["instance_id"] - myself = self.call("pico.peer_info", self.instance_id) + myself = self.call("pico.instance_info", self.instance_id) assert isinstance(myself, dict) assert isinstance(myself["current_grade"], dict) assert myself["current_grade"]["variant"] == "Online" diff --git a/test/int/test_basics.py b/test/int/test_basics.py index cb781627b4..ffe420e32f 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -184,21 +184,21 @@ def test_whoami(instance: Instance): } -def test_peer_info(instance: Instance): - def peer_info(iid: str | None = None): - return instance.call("pico.peer_info", iid) +def test_instance_info(instance: Instance): + def instance_info(iid: str | None = None): + return instance.call("pico.instance_info", iid) # Don't compare entire structure, a couple of fields is enough - myself = peer_info("i1") + myself = instance_info("i1") assert myself["raft_id"] == 1 assert myself["instance_id"] == "i1" assert myself["replicaset_id"] == "r1" with pytest.raises(ReturnError) as e: - peer_info("i2") - assert e.value.args == ('peer with id "i2" not found',) + instance_info("i2") + assert e.value.args == ('instance with id "i2" not found',) - assert peer_info() == myself + assert instance_info() == myself def test_raft_log(instance: Instance): @@ -218,18 +218,18 @@ def test_raft_log(instance: Instance): |index|term| lc |contents| +-----+----+-----+--------+ | 1 | 1 |1.0.1|Insert(_picodata_peer_address, [1,"127.0.0.1:{p}"])| -| 2 | 1 |1.0.2|PersistPeer(i1, 1, r1, Offline(0), {b})| +| 2 | 1 |1.0.2|PersistInstance(i1, 1, r1, Offline(0), {b})| | 3 | 1 |1.0.3|Insert(_picodata_cluster_state, ["replication_factor",1])| | 4 | 1 |1.0.4|Insert(_picodata_cluster_state, ["desired_schema_version",0])| | 5 | 1 | |AddNode(1)| | 6 | 2 | |-| -| 7 | 2 |1.1.1|PersistPeer(i1, 1, r1, Offline(0) -> Online(1), {b})| -| 8 | 2 |1.1.2|PersistPeer(i1, 1, r1, RaftSynced(1) -> Online(1), {b})| -| 9 | 2 |1.1.3|PersistPeer(i1, 1, r1, Replicated(1) -> Online(1), {b})| +| 7 | 2 |1.1.1|PersistInstance(i1, 1, r1, Offline(0) -> Online(1), {b})| +| 8 | 2 |1.1.2|PersistInstance(i1, 1, r1, RaftSynced(1) -> Online(1), {b})| +| 9 | 2 |1.1.3|PersistInstance(i1, 1, r1, Replicated(1) -> Online(1), {b})| | 10 | 2 |1.1.4|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,0])| -| 11 | 2 |1.1.5|PersistPeer(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})| +| 11 | 2 |1.1.5|PersistInstance(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})| | 12 | 2 |1.1.6|Replace(_picodata_cluster_state, ["vshard_bootstrapped",true])| -| 13 | 2 |1.1.7|PersistPeer(i1, 1, r1, Online(1), {b})| +| 13 | 2 |1.1.7|PersistInstance(i1, 1, r1, Online(1), {b})| +-----+----+-----+--------+ """.format( # noqa: E501 p=instance.port, b="{}" diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py index 138cb96392..71b231a52e 100644 --- a/test/int/test_expelling.py +++ b/test/int/test_expelling.py @@ -8,9 +8,9 @@ def cluster3(cluster: Cluster): return cluster -def assert_peer_expelled(expelled_peer: Instance, instance: Instance): - peer_info = instance.call("pico.peer_info", expelled_peer.instance_id) - grades = peer_info["current_grade"]["variant"], peer_info["target_grade"]["variant"] +def assert_instance_expelled(expelled_instance: Instance, instance: Instance): + info = instance.call("pico.instance_info", expelled_instance.instance_id) + grades = (info["current_grade"]["variant"], info["target_grade"]["variant"]) assert ("Expelled", "Expelled") == grades @@ -28,7 +28,7 @@ def test_expel_follower(cluster3: Cluster): # Scenario: expel a Follower instance by command to Leader # Given a cluster # When a Follower instance expelled from the cluster - # Then the instance marked as expelled in the peers table + # Then the instance marked as expelled in the instances table # And excluded from the voters list i1, i2, i3 = cluster3.instances @@ -38,7 +38,7 @@ def test_expel_follower(cluster3: Cluster): cluster3.expel(i3, i1) - retrying(lambda: assert_peer_expelled(i3, i1)) + retrying(lambda: assert_instance_expelled(i3, i1)) retrying(lambda: assert_voters([i1, i2], i1)) # assert i3.process @@ -49,7 +49,7 @@ def test_expel_leader(cluster3: Cluster): # Scenario: expel a Leader instance by command to itself # Given a cluster # When a Leader instance expelled from the cluster - # Then the instance marked as expelled in the peers table + # Then the instance marked as expelled in the instances table # And excluded from the voters list i1, i2, i3 = cluster3.instances @@ -59,7 +59,7 @@ def test_expel_leader(cluster3: Cluster): cluster3.expel(i1) - retrying(lambda: assert_peer_expelled(i1, i2)) + retrying(lambda: assert_instance_expelled(i1, i2)) retrying(lambda: assert_voters([i2, i3], i2)) # assert i1.process @@ -80,7 +80,7 @@ def test_expel_by_follower(cluster3: Cluster): cluster3.expel(i3, i2) - retrying(lambda: assert_peer_expelled(i3, i1)) + retrying(lambda: assert_instance_expelled(i3, i1)) retrying(lambda: assert_voters([i1, i2], i1)) # assert i3.process @@ -100,7 +100,7 @@ def test_raft_id_after_expel(cluster: Cluster): assert 3 == i3.raft_id cluster.expel(i3, i1) - retrying(lambda: assert_peer_expelled(i3, i1)) + retrying(lambda: assert_instance_expelled(i3, i1)) i4 = cluster.add_instance() assert 4 == i4.raft_id diff --git a/test/int/test_joining.py b/test/int/test_joining.py index 1dcc5709a5..660722184b 100644 --- a/test/int/test_joining.py +++ b/test/int/test_joining.py @@ -22,7 +22,7 @@ def cluster3(cluster: Cluster): def raft_join( - peer: Instance, + instance: Instance, cluster_id: str, instance_id: str, timeout_seconds: float | int, @@ -33,7 +33,7 @@ def raft_join( # invalid address format to eliminate blocking DNS requests. # See https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/81 address = f"nowhere/{instance_id}" - return peer.call( + return instance.call( ".proc_raft_join", cluster_id, instance_id, @@ -56,7 +56,7 @@ def test_request_follower(cluster2: Cluster): expected = [{"ErrNotALeader": {"raft_id": 1, "address": i1.listen}}] actual = raft_join( - peer=i2, cluster_id=cluster2.id, instance_id="fake-0", timeout_seconds=1 + instance=i2, cluster_id=cluster2.id, instance_id="fake-0", timeout_seconds=1 ) assert expected == actual @@ -70,12 +70,12 @@ def test_discovery(cluster3: Cluster): # change leader i2.promote_or_fail() - def req_discover(peer: Instance): + def req_discover(instance: Instance): request = dict(tmp_id="unused", peers=["test:3301"]) - request_to = peer.listen - return peer.call(".proc_discover", request, request_to) + request_to = instance.listen + return instance.call(".proc_discover", request, request_to) - # Run discovery against `--peer i1`. + # Run discovery against `--instance i1`. # It used to be a bootstrap leader, but now it's just a follower. assert req_discover(i1) == [{"Done": {"NonLeader": {"leader": i2.listen}}}] @@ -83,7 +83,7 @@ def test_discovery(cluster3: Cluster): i4 = cluster3.add_instance(peers=[i1.listen]) i4.assert_raft_status("Follower", leader_id=i2.raft_id) - # Run discovery against `--peer i3`. + # Run discovery against `--instance i3`. # It has performed a rebootstrap after discovery, # and now has the discovery module uninitialized. assert req_discover(i3) == [{"Done": {"NonLeader": {"leader": i2.listen}}}] @@ -108,7 +108,7 @@ def test_parallel(cluster3: Cluster): i2.promote_or_fail() i3.assert_raft_status("Follower", leader_id=i2.raft_id) - # Add instance with the first peer being i1 + # Add instance with the first instance being i1 i4 = cluster3.add_instance(peers=[i1.listen, i2.listen, i3.listen]) i4.assert_raft_status("Follower", leader_id=i2.raft_id) @@ -128,7 +128,7 @@ def test_replication(cluster: Cluster): for instance in cluster.instances: with instance.connect(1) as conn: - raft_peer = conn.eval( + raft_instance = conn.eval( "return pico.space.instance:get(...):tomap()", instance.instance_id, )[0] @@ -144,7 +144,7 @@ def test_replication(cluster: Cluster): "target_grade": ["Online", 1], "failure_domain": dict(), } - assert {k: v for k, v in raft_peer.items() if k in expected} == expected + assert {k: v for k, v in raft_instance.items() if k in expected} == expected assert list(space_cluster) == [ [1, i1.instance_uuid()], @@ -191,8 +191,8 @@ def test_init_replication_factor(cluster: Cluster): replicaset_ids = i1.eval( """ return pico.space.instance:pairs() - :map(function(peer) - return peer.replicaset_id + :map(function(instance) + return instance.replicaset_id end) :totable() """ @@ -214,7 +214,7 @@ def test_cluster_id_mismatch(instance: Instance): with pytest.raises(TarantoolError, match=expected_error_re): raft_join( - peer=instance, + instance=instance, cluster_id=wrong_cluster_id, instance_id="whatever", timeout_seconds=1, @@ -224,7 +224,7 @@ def test_cluster_id_mismatch(instance: Instance): @pytest.mark.xfail( run=False, reason=( - "failed reading peer with id `3`: peer with id 3 not found, " + "failed reading instance with id `3`: instance with id 3 not found, " "thread 'main' panicked, src/traft/node.rs:1515:17" ), ) @@ -272,7 +272,7 @@ def test_failure_domains(cluster: Cluster): with pytest.raises(TarantoolError, match="missing failure domain names: PLANET"): raft_join( - peer=i1, + instance=i1, cluster_id=i1.cluster_id, instance_id="x1", failure_domain=dict(os="Arch"), @@ -285,7 +285,7 @@ def test_failure_domains(cluster: Cluster): with pytest.raises(TarantoolError, match="missing failure domain names: OS"): raft_join( - peer=i1, + instance=i1, cluster_id=i1.cluster_id, instance_id="x1", failure_domain=dict(planet="Venus"), @@ -347,8 +347,8 @@ def test_fail_to_join(cluster: Cluster): joined_instances = i1.eval( """ return pico.space.instance:pairs() - :map(function(peer) - return { peer.instance_id, peer.raft_id } + :map(function(instance) + return { instance.instance_id, instance.raft_id } end) :totable() """ @@ -370,9 +370,9 @@ def test_not_a_leader_at_postjoin(cluster: Cluster): i1.eval( """ local args = ... - box.schema.func.drop(".proc_update_peer") - _G[""] = { proc_update_peer = function() - box.schema.func.create(".proc_update_peer", {language="C", if_not_exists=true}) + box.schema.func.drop(".proc_update_instance") + _G[""] = { proc_update_instance = function() + box.schema.func.create(".proc_update_instance", {language="C", if_not_exists=true}) require("net.box").connect(args.addr):call("pico.raft_timeout_now") return {'ErrNotALeader'} end } diff --git a/test/int/test_replication.py b/test/int/test_replication.py index a71cf28805..0a6e622649 100644 --- a/test/int/test_replication.py +++ b/test/int/test_replication.py @@ -24,7 +24,7 @@ def cluster3(cluster: Cluster): def wait_repl_master(i: Instance, other_than=None): repl_master = i.eval( """ - local rid = pico.peer_info(...).replicaset_id + local rid = pico.instance_info(...).replicaset_id return pico.space.replicaset:get(rid).master_id """, i.instance_id, @@ -168,7 +168,7 @@ def test_bucket_rebalancing(cluster: Cluster): def test_bucket_rebalancing_respects_replication_factor(cluster: Cluster): - peer, *_ = cluster.deploy(instance_count=4, init_replication_factor=2) + i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2) # wait for buckets to be rebalanced between 2 replicasets 1500 each for i in cluster.instances: @@ -177,7 +177,7 @@ def test_bucket_rebalancing_respects_replication_factor(cluster: Cluster): # check vshard routes requests to both replicasets reached_instances = set() for bucket_id in [1, 3000]: - info = peer.call("vshard.router.callro", bucket_id, "pico.peer_info") + info = i1.call("vshard.router.callro", bucket_id, "pico.instance_info") reached_instances.add(info["instance_id"]) assert len(reached_instances) == 2 @@ -200,6 +200,6 @@ def test_bucket_rebalancing_respects_replication_factor(cluster: Cluster): # check vshard routes requests to all 3 replicasets reached_instances = set() for bucket_id in [1, 1500, 3000]: - info = peer.call("vshard.router.callro", bucket_id, "pico.peer_info") + info = i1.call("vshard.router.callro", bucket_id, "pico.instance_info") reached_instances.add(info["instance_id"]) assert len(reached_instances) == 3 diff --git a/test/int/test_shutdown.py b/test/int/test_shutdown.py index c43a449835..7fd66823c0 100644 --- a/test/int/test_shutdown.py +++ b/test/int/test_shutdown.py @@ -68,7 +68,7 @@ def test_couple_leader_first(cluster2: Cluster): assert not c1.matched i2.assert_raft_status("Leader") - i1_info = i2.call("pico.peer_info", i1.instance_id) + i1_info = i2.call("pico.instance_info", i1.instance_id) assert i1_info["target_grade"]["variant"] == "Offline" assert i1_info["current_grade"]["variant"] == "Offline" @@ -88,7 +88,7 @@ def test_couple_follower_first(cluster2: Cluster): i2.terminate(kill_after_seconds=1) assert not c2.matched - i2_info = i1.call("pico.peer_info", i2.instance_id) + i2_info = i1.call("pico.instance_info", i2.instance_id) assert i2_info["target_grade"]["variant"] == "Offline" assert i2_info["current_grade"]["variant"] == "Offline" diff --git a/test/int/test_uninitialized.py b/test/int/test_uninitialized.py index d25a2d14a6..e11fdb5388 100644 --- a/test/int/test_uninitialized.py +++ b/test/int/test_uninitialized.py @@ -34,8 +34,8 @@ def test_raft_api(uninitialized_instance: Instance): lambda i: i.call("pico.raft_propose_nop"), lambda i: i.call("pico.raft_propose_info", "who cares"), lambda i: i.call("pico.whoami"), - lambda i: i.call("pico.peer_info", "i1"), - lambda i: i.call("pico.peer_info", "i2"), + lambda i: i.call("pico.instance_info", "i1"), + lambda i: i.call("pico.instance_info", "i2"), ] for f in functions: -- GitLab