diff --git a/src/main.rs b/src/main.rs index 26bcf0909040a55316f3aa90996810d10b63cd91..ae8189dd7bc80786963c3034bcc13093e438ed16 100644 --- a/src/main.rs +++ b/src/main.rs @@ -752,7 +752,7 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { fn start_boot(args: &args::Run) { tlog!(Info, ">>>>> start_boot()"); - let (instance, address) = traft::topology::initial_instance( + let (instance, address, _) = traft::topology::initial_instance( args.instance_id.clone(), args.replicaset_id.clone(), args.advertise_address(), diff --git a/src/traft/node.rs b/src/traft/node.rs index d08c5609c89be5d136feccedb1eab6da1aeba03b..0efe038ed5945be7b8d31b34d5c6f09169b92862 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -18,7 +18,7 @@ use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; use std::cell::Cell; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::rc::Rc; use std::time::Duration; @@ -47,6 +47,7 @@ use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; use crate::traft::rpc::{join, update_instance}; +use crate::traft::Address; use crate::traft::ConnectionPool; use crate::traft::CurrentGradeVariant; use crate::traft::LogicalClock; @@ -247,13 +248,13 @@ impl Node { pub fn handle_join_request_and_wait( &self, req: join::Request, - ) -> traft::Result<Box<traft::Instance>> { - let (notify_addr, notify_instance) = + ) -> traft::Result<(Box<Instance>, HashSet<Address>)> { + let (notify_addr, notify_instance, replication_addresses) = self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?; fiber::block_on(async { let (addr, instance) = futures::join!(notify_addr.recv_any(), notify_instance.recv()); addr?; - instance.map(Box::new) + instance.map(|i| (Box::new(i), replication_addresses)) }) } @@ -477,9 +478,9 @@ impl NodeImpl { pub fn process_join_request_async( &mut self, req: join::Request, - ) -> traft::Result<(Notify, Notify)> { + ) -> traft::Result<(Notify, Notify, HashSet<Address>)> { let topology = self.topology_mut()?; - let (instance, address) = topology + let (instance, address, replication_addresses) = topology .join( req.instance_id, req.replicaset_id, @@ -510,6 +511,7 @@ impl NodeImpl { Ok(( self.propose_async(op_addr)?, self.propose_async(op_instance)?, + replication_addresses, )) } diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs index b3f95f1ddc6e488586c966e7bb73e0759d22ab8d..e4c822242f9a8245409532d0c767c27a0f22ccb9 100644 --- a/src/traft/rpc/join.rs +++ b/src/traft/rpc/join.rs @@ -1,12 +1,13 @@ use crate::traft::{ - error::Error, node, FailureDomain, Instance, InstanceId, PeerAddress, ReplicasetId, Result, + error::Error, node, Address, FailureDomain, Instance, InstanceId, PeerAddress, ReplicasetId, + Result, }; #[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)] pub struct OkResponse { pub instance: Box<Instance>, pub peer_addresses: Vec<PeerAddress>, - pub box_replication: Vec<String>, + pub box_replication: Vec<Address>, // Other parameters necessary for box.cfg() // TODO } @@ -28,12 +29,7 @@ crate::define_rpc_request! { } match node.handle_join_request_and_wait(req) { - Ok(instance) => { - let mut box_replication = vec![]; - for replica in node.storage.instances.replicaset_instances(&instance.replicaset_id)? { - box_replication.extend(node.storage.peer_addresses.get(replica.raft_id)?); - } - + Ok((instance, replication_addresses)) => { // A joined instance needs to communicate with other nodes. // TODO: limit the number of entries sent to reduce response size. let peer_addresses = node.storage.peer_addresses.iter()?.collect(); @@ -41,7 +37,7 @@ crate::define_rpc_request! { Ok(Response::Ok(OkResponse { instance, peer_addresses, - box_replication, + box_replication: replication_addresses.into_iter().collect(), })) }, diff --git a/src/traft/topology.rs b/src/traft/topology.rs index aca7f95b32cc5f06de1233fb7d428d39b2585f2c..f8cdfe5faf370eb7d0cabc59432f9c8e71522164 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -10,6 +10,7 @@ use crate::traft::{CurrentGrade, CurrentGradeVariant, Grade, TargetGrade, Target use crate::traft::{InstanceId, RaftId, ReplicasetId}; use crate::util::Uppercase; +#[derive(Debug)] pub struct Topology { replication_factor: u8, max_raft_id: RaftId, @@ -126,7 +127,7 @@ impl Topology { replicaset_id: Option<ReplicasetId>, advertise: Address, failure_domain: FailureDomain, - ) -> Result<(Instance, Address), String> { + ) -> Result<(Instance, Address, HashSet<Address>), String> { if let Some(id) = &instance_id { let existing_instance = self.instance_map.get(id); if matches!(existing_instance, Some((instance, ..)) if instance.is_online()) { @@ -149,7 +150,7 @@ impl Topology { instance_id, instance_uuid, raft_id, - replicaset_id, + replicaset_id: replicaset_id.clone(), replicaset_uuid, current_grade: CurrentGrade::offline(0), target_grade: TargetGrade::offline(0), @@ -157,7 +158,14 @@ impl Topology { }; self.put_instance(instance.clone(), advertise.clone()); - Ok((instance, advertise)) + + let replication_ids = self.replicaset_map.get(&replicaset_id).unwrap(); + let replication_addresses = replication_ids + .iter() + .map(|id| self.instance_map.get(id).unwrap().1.clone()) + .collect(); + + Ok((instance, advertise, replication_addresses)) } pub fn update_instance(&mut self, req: update_instance::Request) -> Result<Instance, String> { @@ -218,7 +226,7 @@ pub fn initial_instance( replicaset_id: Option<ReplicasetId>, advertise: Address, failure_domain: FailureDomain, -) -> Result<(Instance, Address), String> { +) -> Result<(Instance, Address, HashSet<Address>), String> { let mut topology = Topology::new(vec![]); topology.join(instance_id, replicaset_id, advertise, failure_domain) } @@ -226,6 +234,8 @@ pub fn initial_instance( #[rustfmt::skip] #[cfg(test)] mod tests { + use std::collections::HashSet; + use super::Topology; use crate::traft::instance_uuid; @@ -303,6 +313,10 @@ mod tests { }; } + macro_rules! addresses { + [$($address:literal),*] => [HashSet::from([$($address.to_string()),*])] + } + macro_rules! join { ( $topology:expr, @@ -372,22 +386,22 @@ mod tests { assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); assert_eq!( join!(topology, None, Some("R3"), "addr:1").unwrap(), - (instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); assert_eq!( join!(topology, Some("I4"), None, "addr:1").unwrap(), - (instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); let mut topology = Topology::new( @@ -396,7 +410,7 @@ mod tests { assert_eq!( join!(topology, None, None, "addr:1").unwrap(), - (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); } @@ -432,7 +446,7 @@ mod tests { // Disruption isn't destructive if auto-expel allows (TODO). assert_eq!( join!(topology, Some("i2"), None, "inactive:2").unwrap(), - (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "inactive:2".into()), + (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "inactive:2".into(), addresses!["who-cares.biz", "inactive:2"]), // Attention: generated replicaset_id differs from the // original one, as well as raft_id. // That's a desired behavior. @@ -461,7 +475,7 @@ mod tests { assert_eq!( join!(topology, None, Some("r2"), "addr:2").unwrap(), - (instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), + (instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into(), addresses!["addr:2"]), ); } @@ -475,19 +489,19 @@ mod tests { assert_eq!( join!(topology, Some("i1"), None, "addr:1").unwrap(), - (instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into()), + (instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), ); assert_eq!( join!(topology, Some("i2"), None, "addr:2").unwrap(), - (instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into()), + (instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into(), addresses!["addr:1", "addr:2"]), ); assert_eq!( join!(topology, Some("i3"), None, "addr:3").unwrap(), - (instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:3".into()), + (instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:3".into(), addresses!["addr:3"]), ); assert_eq!( join!(topology, Some("i4"), None, "addr:4").unwrap(), - (instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:4".into()), + (instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:4".into(), addresses!["addr:3", "addr:4"]), ); }