Skip to content
Snippets Groups Projects
node.rs 34.8 KiB
Newer Older
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
    for pb in pbs {
        node.step(raft::Message::try_from(pb)?);
    }
    Ok(())
}

#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
Sergey V's avatar
Sergey V committed
    let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,
        }));
    }

    let peer = node.handle_topology_request(req.into())?;
    let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?;
    // A joined peer needs to communicate with other nodes.
    // Provide it the list of raft voters in response.
    let mut raft_group = vec![];
    for raft_id in Storage::voters()?.into_iter() {
        if let Some(peer) = Storage::peer_by_raft_id(raft_id)? {
            raft_group.push(peer);
        } else {
            crate::warn_or_panic!("peer missing in storage, raft_id: {}", raft_id);
        }
    }

    Ok(JoinResponse {
        peer,
        raft_group,
        box_replication,
    })