Newer
Older
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
for pb in pbs {
node.step(raft::Message::try_from(pb)?);
}
Ok(())
}
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;
if req.cluster_id != cluster_id {
return Err(Box::new(Error::ClusterIdMismatch {
instance_cluster_id: req.cluster_id,
cluster_cluster_id: cluster_id,
}));
}
let peer = node.handle_topology_request(req.into())?;
let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?;
// A joined peer needs to communicate with other nodes.
// Provide it the list of raft voters in response.
let mut raft_group = vec![];
for raft_id in Storage::voters()?.into_iter() {
if let Some(peer) = Storage::peer_by_raft_id(raft_id)? {
raft_group.push(peer);
} else {
crate::warn_or_panic!("peer missing in storage, raft_id: {}", raft_id);
}
}
Ok(JoinResponse {
peer,
raft_group,
box_replication,
})