Skip to content
Snippets Groups Projects
Commit d8ab2e2d authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix(join): put peer_addresses into JoinResponse instead of raft_group

parent 83eaa895
No related branches found
No related tags found
1 merge request!389Refactor/storage/peer addresses
......@@ -932,8 +932,8 @@ fn start_join(args: &args::Run, leader_address: String) {
let raft_id = resp.peer.raft_id;
start_transaction(|| -> Result<(), TntError> {
storage.peers.put(&resp.peer).unwrap();
for peer in resp.raft_group {
storage.peers.put(&peer).unwrap();
for traft::PeerAddress { raft_id, address } in resp.peer_addresses {
storage.peer_addresses.put(raft_id, &address).unwrap();
}
storage.raft.persist_raft_id(raft_id).unwrap();
storage
......
......@@ -57,7 +57,7 @@ pub struct PoolWorker {
// primary for worker.
raft_id: RaftId,
// Store instance_id for the debugging purposes only.
instance_id: InstanceId,
instance_id: Option<InstanceId>,
inbox: Queue,
fiber: fiber::UnitJoinHandle<'static>,
cond: Rc<fiber::Cond>,
......@@ -66,9 +66,10 @@ pub struct PoolWorker {
}
impl PoolWorker {
#[inline]
pub fn run(
raft_id: RaftId,
instance_id: InstanceId,
instance_id: impl Into<Option<InstanceId>>,
storage: PeerAddresses,
opts: WorkerOptions,
) -> PoolWorker {
......@@ -76,8 +77,14 @@ impl PoolWorker {
let inbox = Mailbox::with_cond(cond.clone());
let stop_flag = Rc::new(Cell::default());
let handler_name = opts.handler_name;
let instance_id = instance_id.into();
let fiber = fiber::Builder::new()
.name(format!("to:{instance_id}"))
.name(
instance_id
.as_ref()
.map(|instance_id| format!("to:{instance_id}"))
.unwrap_or_else(|| format!("to:raft:{raft_id}")),
)
.proc({
let cond = cond.clone();
let inbox = inbox.clone();
......@@ -397,14 +404,16 @@ impl ConnectionPool {
let instance_id = self
.peers
.peer_field::<peer_field::InstanceId>(&raft_id)
.map_err(|_| Error::NoPeerWithRaftId(raft_id))?;
.map_err(|_| Error::NoPeerWithRaftId(raft_id)).ok();
let worker = PoolWorker::run(
raft_id,
instance_id.clone(),
self.peer_addresses.clone(),
self.worker_options.clone(),
);
self.raft_ids.insert(instance_id, raft_id);
if let Some(instance_id) = instance_id {
self.raft_ids.insert(instance_id, raft_id);
}
Ok(entry.insert(worker))
}
}
......
use crate::traft::{error::Error, node, FailureDomain, InstanceId, Peer, ReplicasetId, Result};
use crate::traft::{
error::Error, node, FailureDomain, InstanceId, Peer, PeerAddress, ReplicasetId, Result,
};
crate::define_rpc_request! {
fn proc_raft_join(req: Request) -> Result<Response> {
......@@ -24,21 +26,12 @@ crate::define_rpc_request! {
}
// A joined peer needs to communicate with other nodes.
// Provide it the list of raft voters in response.
// TODO: return peer_addresses
let mut raft_group = vec![];
for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() {
match node.storage.peers.get(&raft_id) {
Err(e) => {
crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e);
}
Ok(peer) => raft_group.push(peer),
}
}
// TODO: limit the number of entries sent to reduce response size.
let peer_addresses = node.storage.peer_addresses.iter()?.collect();
Ok(Response {
peer,
raft_group,
peer_addresses,
box_replication,
})
}
......@@ -55,7 +48,7 @@ crate::define_rpc_request! {
/// Response to a [`join::Request`](Request).
pub struct Response {
pub peer: Box<Peer>,
pub raft_group: Vec<Peer>,
pub peer_addresses: Vec<PeerAddress>,
pub box_replication: Vec<String>,
// Other parameters necessary for box.cfg()
// TODO
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment