//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries and communicating with other raft nodes.
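//!
//! A rough usage sketch (illustrative only, not a doctest; the `Op` variant and
//! the timeout values are placeholders):
//!
//! ```ignore
//! let node = global()?;
//! // Propose an operation and wait until it's committed.
//! let index = node.propose(traft::Op::Info { msg: "hello".into() }, Duration::from_secs(1))?;
//! // Acquire a read barrier (quorum read).
//! let read_index = node.read_index(Duration::from_secs(1))?;
//! ```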
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::tarantool::transaction::start_transaction;
use std::collections::HashMap;
use std::convert::TryFrom;
use thiserror::Error;
use crate::traft::ContextCoercion as _;
use crate::traft::Peer;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use protobuf::ProtobufEnum as _;
use crate::traft::TopologyRequest;
use crate::traft::{JoinRequest, JoinResponse};
type Notify = fiber::Channel<Result<u64, RaftError>>;
type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>;
#[derive(Debug, Error)]
pub enum Error {
#[error("uninitialized yet")]
Uninitialized,
#[error("timeout")]
Timeout,
#[error("{0}")]
/// cluster_id of the joining peer mismatches the cluster_id of the cluster
#[error("cannot join the instance to the cluster: cluster_id mismatch: cluster_id of the instance = {instance_cluster_id:?}, cluster_id of the cluster = {cluster_cluster_id:?}")]
ClusterIdMismatch {
instance_cluster_id: String,
cluster_cluster_id: String,
    },
}
#[derive(Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    pub id: u64,
    /// `raft_id` of the leader instance
    pub leader_id: Option<u64>,
    pub raft_state: String,
    pub is_ready: bool,
}
/// The heart of the `traft` module - the Node.
pub struct Node {
    _main_loop: fiber::UnitJoinHandle<'static>,
    _join_loop: fiber::UnitJoinHandle<'static>,
    main_inbox: Mailbox<NormalRequest>,
    join_inbox: TopologyMailbox,
    status: Rc<RefCell<Status>>,
    status_cond: Rc<fiber::Cond>,
}
/// A request to the raft main loop.
enum NormalRequest {
/// Propose `raft::prelude::Entry` of `EntryNormal` kind.
/// Make a notification when it's committed.
ProposeNormal { op: traft::Op, notify: Notify },
/// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
    /// Make a notification when the second EntryConfChange is
    /// committed (which corresponds to leaving the joint state).
    ProposeConfChange {
        term: u64,
        peers: Vec<traft::Peer>,
        to_replace: Vec<(u64, traft::Peer)>,
        notify: Notify,
    },
    /// Get a read barrier. In some systems this is also called a "quorum read".
/// Make a notification when index is read.
ReadIndex { notify: Notify },
    /// Start a new raft term.
/// Make a notification when request is processed.
Campaign { notify: Notify },
    /// Handle a message from another raft node.
    Step(raft::Message),
/// Tick the node.
/// Make a notification when request is processed.
    Tick { n_times: u32, notify: Notify },
}
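
// Every request is delivered to the main loop through its `Mailbox` together
// with a `Notify` channel on which the result is reported back. A rough sketch
// of the round trip (this is the pattern the `Node` methods below wrap):
//
//     let (rx, tx) = Notify::new(1).into_clones();
//     main_inbox.send(NormalRequest::ReadIndex { notify: tx });
//     let index = rx.recv_timeout(timeout);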
pub const TICK: Duration = Duration::from_millis(100);
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let status_cond = Rc::new(fiber::Cond::new());
let main_inbox = Mailbox::<NormalRequest>::new();
let join_inbox = TopologyMailbox::new();
let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
        // Initially we are a follower with no known leader and not ready yet.
        let status = Rc::new(RefCell::new(Status {
            id: cfg.id,
            leader_id: None,
            raft_state: "Follower".into(),
            is_ready: false,
        }));
let main_loop_fn = {
let status = status.clone();
let status_cond = status_cond.clone();
let main_inbox = main_inbox.clone();
move || raft_main_loop(main_inbox, status, status_cond, raw_node)
};
let join_loop_fn = {
let main_inbox = main_inbox.clone();
let join_inbox = join_inbox.clone();
move || raft_join_loop(join_inbox, main_inbox)
};
let node = Node {
main_inbox,
join_inbox,
status,
status_cond,
_main_loop: fiber::Builder::new()
.name("raft_main_loop")
.proc(main_loop_fn)
.start()
.unwrap(),
_join_loop: fiber::Builder::new()
.name("raft_join_loop")
.proc(join_loop_fn)
.start()
.unwrap(),
};
// Wait for the node to enter the main loop
pub fn status(&self) -> Status {
self.status.borrow().clone()
}
pub fn mark_as_ready(&self) {
self.status.borrow_mut().is_ready = true;
self.status_cond.broadcast();
}
pub fn wait_status(&self) {
self.status_cond.wait();
pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = Notify::new(1).into_clones();
self.main_inbox
.send(NormalRequest::ReadIndex { notify: tx });
match rx.recv_timeout(timeout) {
Ok(v) => v.map_err(|e| e.into()),
            Err(_) => Err(Error::Timeout),
pub fn propose(&self, op: traft::Op, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = fiber::Channel::new(1).into_clones();
self.main_inbox
.send(NormalRequest::ProposeNormal { op, notify: tx });
match rx.recv_timeout(timeout) {
            Ok(v) => v.map_err(|e| e.into()),
            Err(_) => Err(Error::Timeout),
let (rx, tx) = fiber::Channel::new(1).into_clones();
        let req = NormalRequest::Campaign { notify: tx };
        self.main_inbox.send(req);
if let Some(Err(e)) = rx.recv() {
tlog!(Error, "{e}");
}
pub fn step(&self, msg: raft::Message) {
let req = NormalRequest::Step(msg);
self.main_inbox.send(req);
}
pub fn tick(&self, n_times: u32) {
let (rx, tx) = fiber::Channel::new(1).into_clones();
        let req = NormalRequest::Tick {
            n_times,
            notify: tx,
        };
        self.main_inbox.send(req);
        rx.recv().expect("that's a bug");
    }
pub fn timeout_now(&self) {
self.step(raft::Message {
msg_type: raft::MessageType::MsgTimeoutNow,
..Default::default()
})
pub fn change_topology(&self, req: impl Into<TopologyRequest>) -> Result<u64, RaftError> {
let (rx, tx) = fiber::Channel::new(1).into_clones();
self.join_inbox.send((req.into(), tx));
rx.recv().expect("that's a bug")
fn raft_main_loop(
    main_inbox: Mailbox<NormalRequest>,
    status: Rc<RefCell<Status>>,
    status_cond: Rc<fiber::Cond>,
    mut raw_node: RawNode,
) {
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
.handler_name(".raft_interact")
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
.inactivity_timeout(Duration::from_secs(60))
for peer in Storage::peers().unwrap() {
pool.connect(peer.raft_id, peer.peer_address);
let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = {
let id = Storage::id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
LogicalClock::new(id, gen)
};
let mut joint_state_latch: Option<JointStateLatch> = None;
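    /// Latch tracking an in-flight joint-state transition: `index` is the raft
    /// log index of the proposed conf change entry, and `notify` is signalled
    /// once the second (empty) conf change that leaves the joint state is
    /// committed.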
struct JointStateLatch {
index: u64,
notify: Notify,
}
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
for req in main_inbox.receive_all(Node::TICK) {
match req {
NormalRequest::ProposeNormal { op, notify } => {
lc.inc();
let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
if let Err(e) = raw_node.propose(ctx, vec![]) {
notify.try_send(Err(e)).expect("that's a bug");
} else {
notifications.insert(lc.clone(), notify);
}
NormalRequest::ProposeConfChange {
term,
peers,
notify,
} => {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
#[allow(clippy::never_loop)]
let reason: Option<&str> = loop {
// Raft-rs allows proposing ConfChange from any node, but it may cause
// inconsistent raft_id generation. In picodata only the leader is
// permitted to step a ProposeConfChange message.
let status = raw_node.status();
if status.ss.raft_state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
break None;
};
if let Some(e) = reason {
let e = RaftError::ConfChangeError(e.into());
notify.try_send(Err(e)).expect("that's a bug");
continue;
}
let mut changes = Vec::with_capacity(peers.len());
let mut new_peers: Vec<Peer> = Vec::new();
for peer in &peers {
let change_type = match peer.voter {
true => raft::ConfChangeType::AddNode,
false => raft::ConfChangeType::AddLearnerNode,
};
changes.push(raft::ConfChangeSingle {
change_type,
node_id: peer.raft_id,
..Default::default()
});
new_peers.push(peer.clone());
}
for (old_raft_id, peer) in &to_replace {
changes.push(raft::ConfChangeSingle {
change_type: raft::ConfChangeType::RemoveNode,
node_id: *old_raft_id,
..Default::default()
});
let change_type = match peer.voter {
true => raft::ConfChangeType::AddNode,
false => raft::ConfChangeType::AddLearnerNode,
};
changes.push(raft::ConfChangeSingle {
change_type,
node_id: peer.raft_id,
..Default::default()
});
new_peers.push(peer.clone());
}
let cc = raft::ConfChangeV2 {
changes: changes.into(),
transition: raft::ConfChangeTransition::Implicit,
..Default::default()
};
let ctx = traft::EntryContextConfChange { peers: new_peers }.to_bytes();
let prev_index = raw_node.raft.raft_log.last_index();
if let Err(e) = raw_node.propose_conf_change(ctx, cc) {
notify.try_send(Err(e)).expect("that's a bug");
} else {
                        // If this assert fails, then the current instance
                        // isn't actually a leader (which is impossible in
                        // theory, but we're not sure in practice) and sent
                        // the ConfChange message to the raft network instead
                        // of appending it to the raft log.
let last_index = raw_node.raft.raft_log.last_index();
assert!(last_index == prev_index + 1);
joint_state_latch = Some(JointStateLatch {
index: last_index,
notify,
});
}
}
NormalRequest::ReadIndex { notify } => {
lc.inc();
// read_index puts this context into an Entry,
                    // so we've got to compose a full EntryContext,
                    // even though a single LogicalClock would be enough.
let ctx = traft::EntryContextNormal {
lc: lc.clone(),
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
notifications.insert(lc.clone(), notify);
}
NormalRequest::Campaign { notify } => {
let res = raw_node.campaign().map(|_| 0);
notify.try_send(res).expect("that's a bug");
}
NormalRequest::Step(msg) => {
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
NormalRequest::Tick { n_times, notify } => {
for _ in 0..n_times {
raw_node.tick();
}
notify.try_send(Ok(0)).expect("that's a bug");
let now = Instant::now();
if now > next_tick {
next_tick = now + Node::TICK;
raw_node.tick();
// Get the `Ready` with `RawNode::ready` interface.
if !raw_node.has_ready() {
continue;
}
let mut ready: raft::Ready = raw_node.ready();
fn handle_read_states(
read_states: Vec<raft::ReadState>,
notifications: &mut HashMap<LogicalClock, Notify>,
) {
for rs in read_states {
if let Some(ctx) = traft::EntryContextNormal::read_from_bytes(&rs.request_ctx)
.expect("Abnormal entry in message context")
{
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.try_send(Ok(rs.index)).ok();
fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
) {
for entry in entries
.iter()
.map(|e| traft::Entry::try_from(e).expect("wtf"))
{
match raft::EntryType::from_i32(entry.entry_type) {
Some(raft::EntryType::EntryNormal) => {
use traft::Op::*;
match entry.op() {
None => (),
Some(Nop) => (),
Some(Info { msg }) => tlog!(Info, "{msg}"),
Some(EvalLua { code }) => crate::tarantool::eval(code),
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
// The notification may already have timed out.
// Don't panic. Just ignore `try_send` error.
if let Some(latch) = joint_state_latch {
if entry.index == latch.index {
// It was expected to be a ConfChange entry, but it's
                            // normal. Raft must have overridden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("ignored".into());
// The `raft_join_loop` waits forever and never closes
// the notification channel. Panic if `try_send` fails.
latch.notify.try_send(Err(e)).expect("that's a bug");
}
*joint_state_latch = None;
}
}
Some(entry_type @ raft::EntryType::EntryConfChange)
| Some(entry_type @ raft::EntryType::EntryConfChangeV2) => {
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
Storage::persist_peer_by_instance_id(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
// Beware: this tiny difference in type names
// (`V2` or not `V2`) makes a significant
// difference in `entry.data` binary layout
// and in joint state transitions.
                    let cs = match entry_type {
                        raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch only when leaving the joint state
if cc.changes.is_empty() {
if let Some(latch) = joint_state_latch {
latch
.notify
.try_send(Ok(entry.index))
.expect("that's a bug");
*joint_state_latch = None;
}
}
// ConfChangeTransition::Implicit implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
Storage::persist_conf_state(&cs).unwrap();
}
None => unreachable!(),
Storage::persist_applied(entry.index).unwrap();
}
let mut config_changed = false;
start_transaction(|| -> Result<(), TransactionError> {
            if !ready.messages().is_empty() {
                // Send out the messages coming from the node.
                let messages = ready.take_messages();
                handle_messages(messages, &pool);
            }
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
// Storage::apply_snapshot(ready.snapshot()).unwrap();
}
let committed_entries = ready.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
                &mut pool,
                &mut joint_state_latch,
            );
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
Storage::persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
Storage::persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = match ss.leader_id {
INVALID_ID => None,
id => Some(id),
};
status.raft_state = format!("{:?}", ss.raft_state);
status_cond.broadcast();
}
            if !ready.persisted_messages().is_empty() {
                // Send out the persisted messages coming from the node.
                let messages = ready.take_persisted_messages();
                handle_messages(messages, &pool);
            }
let read_states = ready.take_read_states();
handle_read_states(read_states, &mut notifications);
// Advance the Raft.
let mut light_rd = raw_node.advance(ready);
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
}
            // Send out the messages.
            let messages = light_rd.take_messages();
            handle_messages(messages, &pool);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
                &mut pool,
                &mut joint_state_latch,
            );
// Advance the apply index.
raw_node.advance_apply();
Ok(())
})
.unwrap();
if config_changed {
if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() {
let mut box_cfg = crate::tarantool::cfg().unwrap();
assert_eq!(box_cfg.replication_connect_quorum, 0);
box_cfg.replication =
traft::Storage::box_replication(&peer.replicaset_id, None).unwrap();
crate::tarantool::set_cfg(&box_cfg);
}
}
fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) {
let batch = inbox.receive_all(Duration::MAX);
let ids: Vec<_> = batch.iter().map(|(req, _)| req.instance_id()).collect();
tlog!(Info, "processing batch: {ids:?}");
let term = Storage::term().unwrap().unwrap_or(0);
let mut topology = match Storage::peers() {
Ok(v) => Topology::from_peers(v).with_replication_factor(2),
Err(e) => {
for (_, notify) in batch {
let e = RaftError::ConfChangeError(format!("{e}"));
notify.try_send(Err(e)).ok();
for (req, notify) in &batch {
if let Err(e) = topology.process(req) {
let e = RaftError::ConfChangeError(e);
notify.try_send(Err(e)).expect("that's a bug");
}
let (rx, tx) = fiber::Channel::new(1).into_clones();
main_inbox.send(NormalRequest::ProposeConfChange {
term,
to_replace: topology.to_replace(),
notify: tx,
});
        // main_loop guarantees that every ProposeConfChange will eventually
        // be handled, so there's no need for a timeout.
        // It also guarantees that the notification will arrive only
        // after the node leaves the joint state.
let res = rx.recv().expect("that's a bug");
tlog!(Info, "batch processed: {ids:?}, {res:?}");
for (_, notify) in batch {
// RaftError doesn't implement the Clone trait,
// so we have to be creative.
match &res {
Ok(v) => notify.try_send(Ok(*v)).ok(),
Err(e) => {
let e = RaftError::ConfChangeError(format!("{e}"));
notify.try_send(Err(e)).ok()
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
pub fn global() -> Result<&'static Node, Error> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> {
    let node = global()?;
    for pb in pbs {
node.step(raft::Message::try_from(pb)?);
}
Ok(())
}
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;
if req.cluster_id != cluster_id {
return Err(Box::new(Error::ClusterIdMismatch {
instance_cluster_id: req.cluster_id,
cluster_cluster_id: cluster_id,
}));
}
    let node = global()?;
    node.change_topology(req)?;
let peer = Storage::peer_by_instance_id(&instance_id)?
.ok_or("the peer has misteriously disappeared")?;
let raft_group = Storage::peers()?;
let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?;
Ok(JoinResponse {
peer,
raft_group,
box_replication,
})