From d8ab2e2da31e5a74d4825dd953b7e00b97823716 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 28 Nov 2022 11:27:44 +0300 Subject: [PATCH] fix(join): put peer_addresses into JoinResponse instead of raft_group --- src/main.rs | 4 ++-- src/traft/network.rs | 19 ++++++++++++++----- src/traft/rpc/join.rs | 21 +++++++-------------- 3 files changed, 23 insertions(+), 21 deletions(-) diff --git a/src/main.rs b/src/main.rs index 9f269b9988..9027d7012e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -932,8 +932,8 @@ fn start_join(args: &args::Run, leader_address: String) { let raft_id = resp.peer.raft_id; start_transaction(|| -> Result<(), TntError> { storage.peers.put(&resp.peer).unwrap(); - for peer in resp.raft_group { - storage.peers.put(&peer).unwrap(); + for traft::PeerAddress { raft_id, address } in resp.peer_addresses { + storage.peer_addresses.put(raft_id, &address).unwrap(); } storage.raft.persist_raft_id(raft_id).unwrap(); storage diff --git a/src/traft/network.rs b/src/traft/network.rs index e9cea4b8b5..f3a32533aa 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -57,7 +57,7 @@ pub struct PoolWorker { // primary for worker. raft_id: RaftId, // Store instance_id for the debugging purposes only. - instance_id: InstanceId, + instance_id: Option<InstanceId>, inbox: Queue, fiber: fiber::UnitJoinHandle<'static>, cond: Rc<fiber::Cond>, @@ -66,9 +66,10 @@ pub struct PoolWorker { } impl PoolWorker { + #[inline] pub fn run( raft_id: RaftId, - instance_id: InstanceId, + instance_id: impl Into<Option<InstanceId>>, storage: PeerAddresses, opts: WorkerOptions, ) -> PoolWorker { @@ -76,8 +77,14 @@ impl PoolWorker { let inbox = Mailbox::with_cond(cond.clone()); let stop_flag = Rc::new(Cell::default()); let handler_name = opts.handler_name; + let instance_id = instance_id.into(); let fiber = fiber::Builder::new() - .name(format!("to:{instance_id}")) + .name( + instance_id + .as_ref() + .map(|instance_id| format!("to:{instance_id}")) + .unwrap_or_else(|| format!("to:raft:{raft_id}")), + ) .proc({ let cond = cond.clone(); let inbox = inbox.clone(); @@ -397,14 +404,16 @@ impl ConnectionPool { let instance_id = self .peers .peer_field::<peer_field::InstanceId>(&raft_id) - .map_err(|_| Error::NoPeerWithRaftId(raft_id))?; + .map_err(|_| Error::NoPeerWithRaftId(raft_id)).ok(); let worker = PoolWorker::run( raft_id, instance_id.clone(), self.peer_addresses.clone(), self.worker_options.clone(), ); - self.raft_ids.insert(instance_id, raft_id); + if let Some(instance_id) = instance_id { + self.raft_ids.insert(instance_id, raft_id); + } Ok(entry.insert(worker)) } } diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs index 3c6fada55d..0ec4a3a545 100644 --- a/src/traft/rpc/join.rs +++ b/src/traft/rpc/join.rs @@ -1,4 +1,6 @@ -use crate::traft::{error::Error, node, FailureDomain, InstanceId, Peer, ReplicasetId, Result}; +use crate::traft::{ + error::Error, node, FailureDomain, InstanceId, Peer, PeerAddress, ReplicasetId, Result, +}; crate::define_rpc_request! { fn proc_raft_join(req: Request) -> Result<Response> { @@ -24,21 +26,12 @@ crate::define_rpc_request! { } // A joined peer needs to communicate with other nodes. - // Provide it the list of raft voters in response. - // TODO: return peer_addresses - let mut raft_group = vec![]; - for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() { - match node.storage.peers.get(&raft_id) { - Err(e) => { - crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e); - } - Ok(peer) => raft_group.push(peer), - } - } + // TODO: limit the number of entries sent to reduce response size. + let peer_addresses = node.storage.peer_addresses.iter()?.collect(); Ok(Response { peer, - raft_group, + peer_addresses, box_replication, }) } @@ -55,7 +48,7 @@ crate::define_rpc_request! { /// Response to a [`join::Request`](Request). pub struct Response { pub peer: Box<Peer>, - pub raft_group: Vec<Peer>, + pub peer_addresses: Vec<PeerAddress>, pub box_replication: Vec<String>, // Other parameters necessary for box.cfg() // TODO -- GitLab