//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready`: persisting entries, communicating with other raft nodes.
use std::any::Any;
use std::cell::RefCell;
use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use std::time::{Duration, Instant};

use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::INVALID_ID;
use ::tarantool::fiber;
use ::tarantool::transaction::start_transaction;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;

use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::failover;
use crate::traft::Peer;
use crate::traft::TopologyRequest;
use crate::traft::{JoinRequest, JoinResponse, UpdatePeerRequest};
// type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>;
#[derive(Clone)]
struct Notify {
ch: fiber::Channel<Result<Box<dyn Any>, Error>>,
}
impl Notify {
fn new() -> Self {
Self {
ch: fiber::Channel::new(1),
}
}
fn notify_ok_any(&self, res: Box<dyn Any>) {
self.ch.try_send(Ok(res)).ok();
}
fn notify_ok<T: Any>(&self, res: T) {
self.notify_ok_any(Box::new(res));
}
fn notify_err<E: Into<Error>>(&self, err: E) {
self.ch.try_send(Err(err.into())).ok();
}
fn recv_any(self) -> Result<Box<dyn Any>, Error> {
match self.ch.recv() {
Some(v) => v,
None => {
self.ch.close();
Err(Error::Timeout)
}
}
}
fn recv_timeout_any(self, timeout: Duration) -> Result<Box<dyn Any>, Error> {
match self.ch.recv_timeout(timeout) {
Ok(v) => v,
Err(_) => {
self.ch.close();
Err(Error::Timeout)
}
}
}
fn recv_timeout<T: 'static>(self, timeout: Duration) -> Result<T, Error> {
let any: Box<dyn Any> = self.recv_timeout_any(timeout)?;
let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?;
Ok(*boxed)
}
fn recv<T: 'static>(self) -> Result<T, Error> {
let any: Box<dyn Any> = self.recv_any()?;
let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?;
Ok(*boxed)
}
fn is_closed(&self) -> bool {
self.ch.is_closed()
}
}
impl std::fmt::Debug for Notify {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Notify").finish_non_exhaustive()
}
}
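// A minimal usage sketch (not from the original sources) of the `Notify`
// rendezvous used throughout this module: the same channel is cloned into a
// receiving and a sending half via `IntoClones`, the sending half travels
// with the request, and the caller blocks on the receiving half.
#[allow(dead_code)]
fn notify_usage_sketch() {
    let (rx, tx) = Notify::new().into_clones();
    // Whoever completes the operation reports the (boxed) result...
    tx.notify_ok(42_u64);
    // ...and the caller receives it back as a concrete type (or times out).
    let index: u64 = rx.recv_timeout(Duration::from_secs(1)).unwrap();
    assert_eq!(index, 42);
}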
#[derive(Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    pub id: RaftId,
    /// `raft_id` of the leader instance
    pub leader_id: Option<RaftId>,
    /// One of "Follower", "Candidate", "Leader", "PreCandidate"
    pub raft_state: String,
    /// Whether instance has finished its `postjoin`
    /// initialization stage
    pub is_ready: bool,
}
/// The heart of the `traft` module - the Node.
pub struct Node {
    raft_id: RaftId,
    main_inbox: Mailbox<NormalRequest>,
    status: Rc<RefCell<Status>>,
    _main_loop: fiber::UnitJoinHandle<'static>,
    _conf_change_loop: fiber::UnitJoinHandle<'static>,
}
/// A request to the raft main loop.
enum NormalRequest {
    /// Propose `raft::prelude::Entry` of `EntryNormal` kind.
    /// Make a notification when it's committed.
    /// Notify the caller with `Result<Box<_>>`
    /// of the associated type `traft::Op::Result`.
    ///
    ProposeNormal { op: traft::Op, notify: Notify },
    /// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
    /// Make a notification when the second EntryConfChange is
    /// committed (that corresponds to the leaving of a joint state).
    /// Notify the caller with `Result<Box<()>>`.
    ///
    ProposeConfChange {
        conf_change: raft::ConfChangeV2,
        term: u64,
        notify: Notify,
    },
    /// This kind of request is special: it can only be processed on
    /// the leader, and it applies the corresponding `TopologyRequest` to a
    /// cached `struct Topology`. As a result, it proposes the
    /// `Op::PersistPeer` entry to the raft.
    ///
    /// Make a notification when the entry is committed.
    /// Notify the caller with `Result<Box<Peer>>`.
    ///
    HandleTopologyRequest {
        req: TopologyRequest,
        notify: Notify,
    },
    /// Get a read barrier. In some systems it's also called the "quorum read".
    /// Make a notification when the index is read.
    /// Notify the caller with `Result<Box<u64>>`,
    /// holding the resulting `raft_index`.
    ///
    ReadIndex { notify: Notify },
    /// Start a new election campaign.
    /// Make a notification when the request is processed.
    /// Notify the caller with `Result<Box<()>>`.
    ///
    Campaign { notify: Notify },
    /// Handle a message from another raft node.
    ///
    Step(raft::Message),
    /// Tick the raft node `n_times`.
    /// Make a notification when the request is processed.
    /// Notify the caller with `Result<Box<()>>`, with
    /// the `Err` variant unreachable.
    ///
    Tick { n_times: u32, notify: Notify },
}

impl Node {
pub const TICK: Duration = Duration::from_millis(100);
    pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
        let main_inbox = Mailbox::<NormalRequest>::new();
        let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
        let status = Rc::new(RefCell::new(Status {
            id: cfg.id,
            leader_id: None,
            raft_state: "Follower".into(),
            is_ready: false,
        }));

        let main_loop_fn = {
            let status = status.clone();
            let main_inbox = main_inbox.clone();
            move || raft_main_loop(main_inbox, status, raw_node)
        };

        let conf_change_loop_fn = {
            let status = status.clone();
            let main_inbox = main_inbox.clone();
            move || raft_conf_change_loop(status, main_inbox)
        };

        let node = Node {
            raft_id: cfg.id,
            main_inbox,
            status,
            _main_loop: fiber::Builder::new()
                .name("raft_main_loop")
                .proc(main_loop_fn)
                .start()
                .unwrap(),
            _conf_change_loop: fiber::Builder::new()
                .name("raft_conf_change_loop")
                .proc(conf_change_loop_fn)
                .start()
                .unwrap(),
        };

        // Wait for the node to enter the main loop
        node.tick(0);
        Ok(node)
    }
pub fn status(&self) -> Status {
self.status.borrow().clone()
}
pub fn mark_as_ready(&self) {
self.status.borrow_mut().is_ready = true;
event::broadcast(Event::StatusChanged);
    }

    pub fn wait_status(&self) {
        event::wait(Event::StatusChanged).expect("Events system wasn't initialized");
    }
pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = Notify::new().into_clones();
self.main_inbox
            .send(NormalRequest::ReadIndex { notify: tx });
        rx.recv_timeout::<u64>(timeout)
    }
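    /// A usage sketch (not from the original sources): propose an operation
    /// and wait for it to be committed and applied, or fail with a timeout.
    /// `traft::Op::Nop` and the one-second timeout are illustrative
    /// assumptions, as is the `traft::node::global()` call path.
    ///
    /// ```ignore
    /// let node = traft::node::global()?;
    /// // Blocks the current fiber until the entry is committed on a quorum.
    /// node.propose(traft::Op::Nop, Duration::from_secs(1))?;
    /// ```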
pub fn propose<T: OpResult + Into<traft::Op>>(
&self,
op: T,
timeout: Duration,
) -> Result<T::Result, Error> {
let (rx, tx) = Notify::new().into_clones();
self.main_inbox.send(NormalRequest::ProposeNormal {
op: op.into(),
notify: tx,
});
rx.recv_timeout::<T::Result>(timeout)
    }

    pub fn campaign(&self) -> Result<(), Error> {
        let (rx, tx) = Notify::new().into_clones();
        let req = NormalRequest::Campaign { notify: tx };
        self.main_inbox.send(req);
        rx.recv()
    }
pub fn step(&self, msg: raft::Message) {
let req = NormalRequest::Step(msg);
self.main_inbox.send(req);
}
pub fn tick(&self, n_times: u32) {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::Tick {
n_times,
notify: tx,
};
        self.main_inbox.send(req);
        match rx.recv() {
Ok(()) => (),
Err(e) => tlog!(Warning, "{e}"),
}
}
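    /// Makes the instance behave as if its election timeout has expired, so it
    /// starts a new election immediately; in raft this is the mechanism behind
    /// explicit leadership transfer. A hedged call-site sketch (assuming this
    /// module is reachable as `traft::node`):
    ///
    /// ```ignore
    /// traft::node::global()?.timeout_now();
    /// ```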
pub fn timeout_now(&self) {
self.step(raft::Message {
to: self.raft_id,
from: self.raft_id,
msg_type: raft::MessageType::MsgTimeoutNow,
..Default::default()
})
    }

    pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::HandleTopologyRequest { req, notify: tx };
        self.main_inbox.send(req);
        rx.recv::<Peer>()
    }
}

struct JointStateLatch {
    /// Index of the latest ConfChange entry proposed.
    /// Helps detect when the entry is overridden
    /// due to a re-election.
    index: u64,
/// Make a notification when the latch is unlocked.
/// Notification is a `Result<Box<()>>`.
notify: Notify,
}
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
topology_changed: &mut bool,
) {
for entry in &entries {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
tlog!(
Error,
"error parsing (and applying) an entry: {e}, entry = {entry:?}"
);
continue;
}
};
        match entry.entry_type {
            raft::EntryType::EntryNormal => handle_committed_normal_entry(
entry,
notifications,
pool,
joint_state_latch,
topology_changed,
),
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
handle_committed_conf_change(entry, raw_node, joint_state_latch)
}
}
    }

    if let Some(last_entry) = entries.last() {
if let Err(e) = Storage::persist_applied(last_entry.index) {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => last_entry.index
);
};
}
}
fn handle_committed_normal_entry(
entry: traft::Entry,
    notifications: &mut HashMap<LogicalClock, Notify>,
    pool: &mut ConnectionPool,
    joint_state_latch: &mut Option<JointStateLatch>,
    topology_changed: &mut bool,
) {
    assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit();
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(traft::Op::PersistPeer { peer }) = entry.op() {
pool.connect(peer.raft_id, peer.peer_address.clone());
*topology_changed = true;
    }

    if let Some(latch) = joint_state_latch {
if entry.index == latch.index {
// It was expected to be a ConfChange entry, but it's
            // normal. Raft must have overridden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
latch.notify.notify_err(e);
}
}
}
fn handle_committed_conf_change(
entry: traft::Entry,
raw_node: &mut RawNode,
joint_state_latch: &mut Option<JointStateLatch>,
) {
let mut latch_unlock = || {
if let Some(latch) = joint_state_latch {
latch.notify.notify_ok(());
*joint_state_latch = None;
event::broadcast(Event::LeaveJointState);
}
};
// Beware: a tiny difference in type names (`V2` or not `V2`)
// makes a significant difference in `entry.data` binary layout and
// in joint state transitions.
// `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
// applied in an instant without entering the joint state.
let conf_state = match entry.entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
        }
        raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch when either of conditions is met:
// - conf_change will leave the joint state;
// - or it will be applied without even entering one.
if cc.leave_joint() || cc.enter_joint().is_none() {
latch_unlock();
            }

            // ConfChangeTransition::Auto implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
    };

    let raft_id = &raw_node.raft.id;
let voters_old = Storage::voters().unwrap();
if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) {
event::broadcast_when(Event::Demoted, Event::LeaveJointState).ok();
    }

    Storage::persist_conf_state(&conf_state).unwrap();
}
fn handle_read_states(
read_states: Vec<raft::ReadState>,
notifications: &mut HashMap<LogicalClock, Notify>,
) {
for rs in read_states {
let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) {
Ok(Some(v)) => v,
Ok(None) => continue,
Err(_) => {
tlog!(Error, "abnormal entry, read_state = {rs:?}");
continue;
}
};
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.notify_ok(rs.index);
}
}
}
fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
}
}
}
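// Note: the loop below follows the usual raft-rs processing cycle. On every
// iteration it drains the request mailbox, ticks the raft clock, and, if a
// `Ready` is pending, persists entries and hard state and applies committed
// entries inside a transaction, advances the `RawNode`, and processes the
// resulting `LightReady` the same way.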
fn raft_main_loop(
main_inbox: Mailbox<NormalRequest>,
status: Rc<RefCell<Status>>,
mut raw_node: RawNode,
) {
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
.handler_name(".raft_interact")
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
        .inactivity_timeout(Duration::from_secs(60))
        .build();
for peer in Storage::peers().unwrap() {
pool.connect(peer.raft_id, peer.peer_address);
    }

    let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = {
let id = Storage::raft_id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
LogicalClock::new(id, gen)
};
let mut joint_state_latch: Option<JointStateLatch> = None;
let topology_cache = crate::cache::CachedCell::<u64, Topology>::new();
// let mut topology: Option<(u64, Topology)> = None;
    loop {
        // Clean up obsolete notifications
        notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
for req in main_inbox.receive_all(Node::TICK) {
match req {
NormalRequest::ProposeNormal { op, notify } => {
lc.inc();
let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
if let Err(e) = raw_node.propose(ctx, vec![]) {
                        notify.notify_err(e);
                    } else {
notifications.insert(lc.clone(), notify);
}
                }
                NormalRequest::HandleTopologyRequest { req, notify } => {
lc.inc();
let status = raw_node.status();
if status.ss.raft_state != RaftStateRole::Leader {
let e = RaftError::ConfChangeError("not a leader".into());
notify.notify_err(e);
continue;
}
let mut topology =
topology_cache.pop(&raw_node.raft.term).unwrap_or_else(|| {
let peers = Storage::peers().unwrap();
let replication_factor =
Storage::replication_factor().unwrap().unwrap();
Topology::from_peers(peers).with_replication_factor(replication_factor)
});
let peer_result = match req {
TopologyRequest::Join(JoinRequest {
instance_id,
replicaset_id,
                            advertise_address,
                            failure_domains,
}) => topology.join(
instance_id,
replicaset_id,
advertise_address,
failure_domains,
),
TopologyRequest::UpdatePeer(UpdatePeerRequest {
instance_id,
                            health,
                            failure_domains,
}) => topology.update_peer(&instance_id, health, failure_domains),
};
let mut peer = match peer_result {
Ok(peer) => peer,
Err(e) => {
notify.notify_err(RaftError::ConfChangeError(e));
topology_cache.put(raw_node.raft.term, topology);
continue;
}
};
peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
let ctx = traft::EntryContextNormal {
op: traft::Op::PersistPeer { peer },
lc: lc.clone(),
};
if let Err(e) = raw_node.propose(ctx.to_bytes(), vec![]) {
notify.notify_err(e);
} else {
notifications.insert(lc.clone(), notify);
topology_cache.put(raw_node.raft.term, topology);
}
}
                NormalRequest::ProposeConfChange {
                    conf_change,
                    term,
                    notify,
                } => {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
#[allow(clippy::never_loop)]
let reason: Option<&str> = loop {
// Checking leadership is only needed for the
// correct latch management. It doesn't affect
// raft correctness. Checking the instance is a
// leader makes sure the proposed `ConfChange`
// is appended to the raft log immediately
// instead of sending `MsgPropose` over the
// network.
if raw_node.raft.state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
break None;
};
                    if let Some(e) = reason {
                        let e = RaftError::ConfChangeError(e.into());
                        notify.notify_err(e);
                        continue;
                    }

                    let prev_index = raw_node.raft.raft_log.last_index();
                    if let Err(e) = raw_node.propose_conf_change(vec![], conf_change) {
                        notify.notify_err(e);
                    } else {
// oops, current instance isn't actually a leader
// (which is impossible in theory, but we're not
// sure in practice) and sent the ConfChange message
// to the raft network instead of appending it to the
// raft log.
let last_index = raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
joint_state_latch = Some(JointStateLatch {
index: last_index,
notify,
});
}
}
NormalRequest::ReadIndex { notify } => {
lc.inc();
// In some states `raft-rs` ignores the ReadIndex request.
// Check it preliminary, don't wait for the timeout.
//
// See for details:
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
let term_just_started = raw_node.raft.state == RaftStateRole::Leader
&& !raw_node.raft.commit_to_current_term();
if leader_doesnt_exist || term_just_started {
notify.notify_err(RaftError::ProposalDropped);
continue;
}
// read_index puts this context into an Entry,
// so we've got to compose full EntryContext,
// despite single LogicalClock would be enough
let ctx = traft::EntryContextNormal {
lc: lc.clone(),
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
notifications.insert(lc.clone(), notify);
}
NormalRequest::Campaign { notify } => match raw_node.campaign() {
Ok(()) => notify.notify_ok(()),
Err(e) => notify.notify_err(e),
},
                NormalRequest::Step(msg) => {
                    if msg.to != raw_node.raft.id {
continue;
}
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
NormalRequest::Tick { n_times, notify } => {
for _ in 0..n_times {
raw_node.tick();
}
                    notify.notify_ok(());
                }
            }
        }

        let now = Instant::now();
if now > next_tick {
next_tick = now + Node::TICK;
raw_node.tick();
        }

        // Get the `Ready` with `RawNode::ready` interface.
if !raw_node.has_ready() {
continue;
}
let mut ready: raft::Ready = raw_node.ready();
let mut topology_changed = false;
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages come from the node.
                let messages = ready.take_messages();
                handle_messages(messages, &pool);
            }
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
// Storage::apply_snapshot(ready.snapshot()).unwrap();
}
let committed_entries = ready.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
                &mut joint_state_latch,
                &mut topology_changed,
            );
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
Storage::persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
Storage::persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = match ss.leader_id {
INVALID_ID => None,
id => Some(id),
};
status.raft_state = format!("{:?}", ss.raft_state);
event::broadcast(Event::StatusChanged);
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
                let messages = ready.take_persisted_messages();
                handle_messages(messages, &pool);
}
let read_states = ready.take_read_states();
handle_read_states(read_states, &mut notifications);
            Ok(())
        })
        .unwrap();

        // Advance the Raft.
let mut light_rd = raw_node.advance(ready);
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
}
// Send out the messages.
            let messages = light_rd.take_messages();
            handle_messages(messages, &pool);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
                &mut joint_state_latch,
                &mut topology_changed,
            );
// Advance the apply index.
raw_node.advance_apply();
Ok(())
})
.unwrap();
        if topology_changed {
            event::broadcast(Event::TopologyChanged);
if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() {
let mut box_cfg = crate::tarantool::cfg().unwrap();
assert_eq!(box_cfg.replication_connect_quorum, 0);
box_cfg.replication =
traft::Storage::box_replication(&peer.replicaset_id, None).unwrap();
crate::tarantool::set_cfg(&box_cfg);
}
        }
    }
}
fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) {
    loop {
        if status.borrow().raft_state != "Leader" {
            event::wait(Event::StatusChanged).expect("Events system must be initialized");
            continue;
        }
let term = Storage::term().unwrap().unwrap_or(0);
let voter_ids: HashSet<RaftId> = HashSet::from_iter(Storage::voters().unwrap());
let learner_ids: HashSet<RaftId> = HashSet::from_iter(Storage::learners().unwrap());
        let peer_is_active: HashMap<RaftId, bool> = Storage::peers()
            .unwrap()
            .iter()
            .map(|peer| (peer.raft_id, peer.is_active()))
            .collect();
let (active_voters, to_demote): (Vec<RaftId>, Vec<RaftId>) = voter_ids
.iter()
.partition(|id| peer_is_active.get(id).copied().unwrap_or(false));
let active_learners: Vec<RaftId> = learner_ids
.iter()
.copied()
.filter(|id| peer_is_active.get(id).copied().unwrap_or(false))
.collect();
let new_peers: Vec<RaftId> = peer_is_active
.iter()
.map(|(&id, _)| id)
.filter(|id| !voter_ids.contains(id) && !learner_ids.contains(id))
.collect();
let mut changes: Vec<raft::ConfChangeSingle> = Vec::new();
const VOTER: bool = true;
const LEARNER: bool = false;
changes.extend(
to_demote
.into_iter()
.map(|id| conf_change_single(id, LEARNER)),
);
let total_active = active_voters.len() + active_learners.len() + new_peers.len();
let new_peers_to_promote;
match failover::voters_needed(active_voters.len(), total_active) {
0 => {
new_peers_to_promote = 0;
}
pos @ 1..=i64::MAX => {
let pos = pos as usize;
if pos < active_learners.len() {
for &raft_id in &active_learners[0..pos] {
changes.push(conf_change_single(raft_id, VOTER))
}
new_peers_to_promote = 0;
} else {
for &raft_id in &active_learners {
changes.push(conf_change_single(raft_id, VOTER))
}
new_peers_to_promote = pos - active_learners.len();
assert!(new_peers_to_promote <= new_peers.len());
for &raft_id in &new_peers[0..new_peers_to_promote] {
changes.push(conf_change_single(raft_id, VOTER))
}
}
            }
            neg @ i64::MIN..=-1 => {
let neg = -neg as usize;
assert!(neg < active_voters.len());
for &raft_id in &active_voters[0..neg] {
changes.push(conf_change_single(raft_id, LEARNER))
}
new_peers_to_promote = 0;
}
}
for &raft_id in &new_peers[new_peers_to_promote..] {
changes.push(conf_change_single(raft_id, LEARNER))
        }

        if changes.is_empty() {
            event::wait(Event::TopologyChanged).expect("Events system must be initialized");
            continue;
        }
let conf_change = raft::ConfChangeV2 {
transition: raft::ConfChangeTransition::Auto,
changes: changes.into(),
..Default::default()
};
let (rx, tx) = Notify::new().into_clones();
main_inbox.send(NormalRequest::ProposeConfChange {
            conf_change,
            term,
notify: tx,
});
        // main_loop guarantees that every ProposeConfChange will eventually
        // be handled, so there's no need for a timeout.
// It also guarantees that the notification will arrive only
// after the node leaves the joint state.
        match rx.recv() {
            Ok(()) => tlog!(Info, "conf_change processed"),
Err(e) => {
tlog!(Warning, "conf_change failed: {e}");
fiber::sleep(Duration::from_secs(1));
}
        }
    }
}

fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
let change_type = if is_voter {
raft::ConfChangeType::AddNode
} else {
raft::ConfChangeType::AddLearnerNode
};
raft::ConfChangeSingle {
change_type,
node_id,
..Default::default()
}
}
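// A minimal sketch (not from the original sources) of how the conf change
// loop above turns individual promotions/demotions into a single
// `ConfChangeV2`. With `ConfChangeTransition::Auto` raft-rs itself decides
// whether a joint state is required. The raft ids used here are arbitrary.
#[allow(dead_code)]
fn conf_change_v2_sketch() -> raft::ConfChangeV2 {
    let changes = vec![
        conf_change_single(2, true),  // promote raft_id 2 to a voter
        conf_change_single(3, false), // add raft_id 3 as a learner
    ];
    raft::ConfChangeV2 {
        transition: raft::ConfChangeTransition::Auto,
        changes: changes.into(),
        ..Default::default()
    }
}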
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
    }
}

pub fn global() -> Result<&'static Node, Error> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
    let node = global()?;
    for pb in pbs {
node.step(raft::Message::try_from(pb)?);
}
Ok(())
}
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;