//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::tarantool::fiber;
use ::tarantool::transaction::start_transaction;
use std::any::{type_name, Any};
use std::cell::RefCell;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::{Duration, Instant};
use thiserror::Error;
use crate::traft::ContextCoercion as _;
use crate::traft::Peer;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use crate::traft::TopologyRequest;
use crate::traft::{JoinRequest, JoinResponse};
type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>;
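/// A single-use notification channel between fibers.
///
/// `Notify` wraps a `fiber::Channel` of capacity 1. The usual pattern is to
/// split one into two clones, e.g. `let (rx, tx) = Notify::new().into_clones();`:
/// the `tx` side is handed to a worker fiber (the raft main loop or join loop),
/// while the `rx` side blocks in `recv`/`recv_timeout` until the corresponding
/// request has been processed.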
#[derive(Clone)]
struct Notify {
ch: fiber::Channel<Result<Box<dyn Any>, Error>>,
}
impl Notify {
fn new() -> Self {
Self {
ch: fiber::Channel::new(1),
}
}
fn notify_ok_any(&self, res: Box<dyn Any>) {
self.ch.try_send(Ok(res)).ok();
}
fn notify_ok<T: Any>(&self, res: T) {
println!("notify_ok {}", type_name::<T>());
self.notify_ok_any(Box::new(res));
}
fn notify_err<E: Into<Error>>(&self, err: E) {
self.ch.try_send(Err(err.into())).ok();
}
fn recv_any(self) -> Result<Box<dyn Any>, Error> {
match self.ch.recv() {
Some(v) => v,
None => {
self.ch.close();
Err(Error::Timeout)
}
}
}
fn recv_timeout_any(self, timeout: Duration) -> Result<Box<dyn Any>, Error> {
match self.ch.recv_timeout(timeout) {
Ok(v) => v,
Err(_) => {
self.ch.close();
Err(Error::Timeout)
}
}
}
fn recv_timeout<T: 'static>(self, timeout: Duration) -> Result<T, Error> {
let any: Box<dyn Any> = self.recv_timeout_any(timeout)?;
let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?;
Ok(*boxed)
}
fn recv<T: 'static>(self) -> Result<T, Error> {
let any: Box<dyn Any> = self.recv_any()?;
let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?;
Ok(*boxed)
}
fn is_closed(&self) -> bool {
self.ch.is_closed()
}
}
impl std::fmt::Debug for Notify {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.debug_struct("Notify").finish_non_exhaustive()
}
}
#[derive(Debug, Error)]
pub enum Error {
#[error("uninitialized yet")]
Uninitialized,
#[error("timeout")]
Timeout,
#[error("{0}")]
#[error("downcast error")]
DowncastError,
/// cluster_id of the joining peer mismatches the cluster_id of the cluster
#[error("cannot join the instance to the cluster: cluster_id mismatch: cluster_id of the instance = {instance_cluster_id:?}, cluster_id of the cluster = {cluster_cluster_id:?}")]
ClusterIdMismatch {
instance_cluster_id: String,
cluster_cluster_id: String,
},
}
#[derive(Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
/// `raft_id` of the current instance
pub id: u64,
/// `raft_id` of the leader instance
pub leader_id: Option<u64>,
pub raft_state: String,
pub is_ready: bool,
}
/// The heart of `traft` module - the Node.
pub struct Node {
_main_loop: fiber::UnitJoinHandle<'static>,
_join_loop: fiber::UnitJoinHandle<'static>,
main_inbox: Mailbox<NormalRequest>,
join_inbox: TopologyMailbox,
status: Rc<RefCell<Status>>,
status_cond: Rc<fiber::Cond>,
}
/// A request to the raft main loop.
enum NormalRequest {
/// Propose `raft::prelude::Entry` of `EntryNormal` kind.
/// Make a notification when it's committed.
ProposeNormal { op: traft::Op, notify: Notify },
/// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
/// Make a notification when the second EntryConfChange is
/// committed (that corresponds to the leaving of a joint state).
ProposeConfChange {
term: u64,
peers: Vec<traft::Peer>,
to_replace: Vec<(u64, traft::Peer)>,
notify: Notify,
},
/// Get a read barrier. In some systems it's also called the "quorum read".
/// Make a notification when the index is read.
ReadIndex { notify: Notify },
/// Start a new raft term.
/// Make a notification when the request is processed.
Campaign { notify: Notify },
/// Handle a message from another raft node.
Step(raft::Message),
/// Tick the node.
/// Make a notification when the request is processed.
Tick { n_times: u32, notify: Notify },
}
impl Node {
pub const TICK: Duration = Duration::from_millis(100);
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let status_cond = Rc::new(fiber::Cond::new());
let main_inbox = Mailbox::<NormalRequest>::new();
let join_inbox = TopologyMailbox::new();
let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
let status = Rc::new(RefCell::new(Status {
id: cfg.id,
leader_id: None,
raft_state: "Follower".into(),
is_ready: false,
}));
let main_loop_fn = {
let status = status.clone();
let status_cond = status_cond.clone();
let main_inbox = main_inbox.clone();
move || raft_main_loop(main_inbox, status, status_cond, raw_node)
};
let join_loop_fn = {
let main_inbox = main_inbox.clone();
let join_inbox = join_inbox.clone();
move || raft_join_loop(join_inbox, main_inbox)
};
let node = Node {
main_inbox,
join_inbox,
status,
status_cond,
_main_loop: fiber::Builder::new()
.name("raft_main_loop")
.proc(main_loop_fn)
.start()
.unwrap(),
_join_loop: fiber::Builder::new()
.name("raft_join_loop")
.proc(join_loop_fn)
.start()
.unwrap(),
};
// Wait for the node to enter the main loop
node.tick(0);
Ok(node)
}
pub fn status(&self) -> Status {
self.status.borrow().clone()
}
pub fn mark_as_ready(&self) {
self.status.borrow_mut().is_ready = true;
self.status_cond.broadcast();
}
pub fn wait_status(&self) {
self.status_cond.wait();
}
pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = Notify::new().into_clones();
self.main_inbox
.send(NormalRequest::ReadIndex { notify: tx });
rx.recv_timeout::<u64>(timeout)
}
pub fn propose<T: OpResult + Into<traft::Op>>(
&self,
op: T,
timeout: Duration,
) -> Result<T::Result, Error> {
let (rx, tx) = Notify::new().into_clones();
self.main_inbox.send(NormalRequest::ProposeNormal {
op: op.into(),
notify: tx,
});
rx.recv_timeout::<T::Result>(timeout)
}
pub fn campaign(&self) {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::Campaign { notify: tx };
self.main_inbox.send(req);
if let Err(e) = rx.recv_any() {
tlog!(Error, "{e}");
}
}
pub fn step(&self, msg: raft::Message) {
let req = NormalRequest::Step(msg);
self.main_inbox.send(req);
}
pub fn tick(&self, n_times: u32) {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::Tick {
n_times,
notify: tx,
};
self.main_inbox.send(req);
rx.recv_any().ok();
}
pub fn timeout_now(&self) {
self.step(raft::Message {
msg_type: raft::MessageType::MsgTimeoutNow,
..Default::default()
})
}
pub fn change_topology(&self, req: impl Into<TopologyRequest>) -> Result<traft::RaftId, Error> {
let (rx, tx) = Notify::new().into_clones();
self.join_inbox.send((req.into(), tx));
rx.recv()
}
}
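/// Tracks a proposed `ConfChangeV2` until the raft node leaves the joint
/// state. `index` is the raft log index of the proposed entry; `notify` fires
/// either when the empty conf change that leaves the joint state is committed
/// or when the proposal gets rolled back.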
struct JointStateLatch {
index: u64,
notify: Notify,
}
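/// Applies a batch of committed raft entries: normal entries fire the
/// notifications of the requests that proposed them, conf change entries
/// update the peer storage and the raft configuration. Finally the index of
/// the last applied entry is persisted.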
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
config_changed: &mut bool,
) {
for entry in &entries {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
tlog!(
Error,
"error parsing (and applying) an entry: {e}, entry = {entry:?}"
);
continue;
}
};
match entry.entry_type {
raft::EntryType::EntryNormal => {
handle_committed_normal_entry(entry, notifications, joint_state_latch)
}
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
handle_committed_conf_change(
entry,
raw_node,
pool,
joint_state_latch,
config_changed,
)
}
}
}
if let Some(last_entry) = entries.last() {
if let Err(e) = Storage::persist_applied(last_entry.index) {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => last_entry.index
);
};
}
}
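/// Applies a single committed `EntryNormal`: executes the contained op,
/// notifies the waiting fiber (if any) and resolves the joint state latch with
/// an error in case the expected conf change entry was overridden by a normal one.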
fn handle_committed_normal_entry(
entry: traft::Entry,
notifications: &mut HashMap<LogicalClock, Notify>,
joint_state_latch: &mut Option<JointStateLatch>,
) {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit();
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(latch) = joint_state_latch {
if entry.index == latch.index {
// It was expected to be a ConfChange entry, but it's
// normal. Raft must have overridden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
latch.notify.notify_err(e);
}
*joint_state_latch = None;
}
}
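/// Applies a committed conf change entry: persists the peers it carries,
/// connects to them, applies the configuration to the raw node and persists
/// the resulting `ConfState`. An empty `ConfChangeV2` marks the exit from the
/// joint state and releases the joint state latch.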
fn handle_committed_conf_change(
entry: traft::Entry,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
joint_state_latch: &mut Option<JointStateLatch>,
config_changed: &mut bool,
) {
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
};
Storage::persist_peer_by_instance_id(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
}
// Beware: this tiny difference in type names
// (`V2` or not `V2`) makes a significant
// difference in `entry.data` binary layout
// and in joint state transitions.
let conf_state;
if entry.entry_type == raft::EntryType::EntryConfChange {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
*config_changed = true;
conf_state = raw_node.apply_conf_change(&cc).unwrap();
} else if entry.entry_type == raft::EntryType::EntryConfChangeV2 {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch only when leaving the joint state
if cc.changes.is_empty() {
if let Some(latch) = joint_state_latch {
latch.notify.notify_ok(entry.index);
*joint_state_latch = None;
*config_changed = true;
}
}
// ConfChangeTransition::Implicit implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
conf_state = raw_node.apply_conf_change(&cc).unwrap()
} else {
unreachable!();
};
Storage::persist_conf_state(&conf_state).unwrap();
}
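/// Resolves pending `ReadIndex` requests: for every read state whose context
/// carries a known `LogicalClock`, notifies the waiting fiber with the read index.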
fn handle_read_states(
read_states: Vec<raft::ReadState>,
notifications: &mut HashMap<LogicalClock, Notify>,
) {
for rs in read_states {
let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) {
Ok(Some(v)) => v,
Ok(None) => continue,
Err(_) => {
tlog!(Error, "abnormal entry, read_state = {rs:?}");
continue;
}
};
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.notify_ok(rs.index);
}
}
}
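/// Sends outgoing raft messages to their recipients via the connection pool,
/// logging (but otherwise ignoring) delivery errors.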
fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
}
}
}
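/// The main raft fiber. On every iteration it drains the inbox of
/// `NormalRequest`s, ticks the raft node once per `Node::TICK`, and then
/// drives the `Ready` state machine: persisting entries and hard state in a
/// transaction, applying committed entries and sending out messages.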
fn raft_main_loop(
main_inbox: Mailbox<NormalRequest>,
status: Rc<RefCell<Status>>,
status_cond: Rc<fiber::Cond>,
mut raw_node: RawNode,
) {
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
.handler_name(".raft_interact")
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
.inactivity_timeout(Duration::from_secs(60))
.build();
for peer in Storage::peers().unwrap() {
pool.connect(peer.raft_id, peer.peer_address);
}
let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = {
let id = Storage::id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
LogicalClock::new(id, gen)
};
let mut joint_state_latch: Option<JointStateLatch> = None;
loop {
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
for req in main_inbox.receive_all(Node::TICK) {
match req {
NormalRequest::ProposeNormal { op, notify } => {
lc.inc();
let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
if let Err(e) = raw_node.propose(ctx, vec![]) {
notify.notify_err(e);
} else {
notifications.insert(lc.clone(), notify);
}
}
NormalRequest::ProposeConfChange {
term,
peers,
to_replace,
notify,
} => {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
#[allow(clippy::never_loop)]
let reason: Option<&str> = loop {
// Raft-rs allows proposing ConfChange from any node, but it may cause
// inconsistent raft_id generation. In picodata only the leader is
// permitted to step a ProposeConfChange message.
let status = raw_node.status();
if status.ss.raft_state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
break None;
};
if let Some(e) = reason {
let e = RaftError::ConfChangeError(e.into());
notify.notify_err(e);
continue;
}
let mut changes = Vec::with_capacity(peers.len());
let mut new_peers: Vec<Peer> = Vec::new();
for peer in &peers {
let change_type = match peer.voter {
true => raft::ConfChangeType::AddNode,
false => raft::ConfChangeType::AddLearnerNode,
};
changes.push(raft::ConfChangeSingle {
change_type,
node_id: peer.raft_id,
..Default::default()
});
new_peers.push(peer.clone());
}
for (old_raft_id, peer) in &to_replace {
changes.push(raft::ConfChangeSingle {
change_type: raft::ConfChangeType::RemoveNode,
node_id: *old_raft_id,
..Default::default()
});
let change_type = match peer.voter {
true => raft::ConfChangeType::AddNode,
false => raft::ConfChangeType::AddLearnerNode,
};
changes.push(raft::ConfChangeSingle {
change_type,
node_id: peer.raft_id,
..Default::default()
});
new_peers.push(peer.clone());
}
let cc = raft::ConfChangeV2 {
changes: changes.into(),
transition: raft::ConfChangeTransition::Implicit,
..Default::default()
};
let ctx = traft::EntryContextConfChange { peers: new_peers }.to_bytes();
let prev_index = raw_node.raft.raft_log.last_index();
if let Err(e) = raw_node.propose_conf_change(ctx, cc) {
notify.notify_err(e);
} else {
// oops, current instance isn't actually a leader
// (which is impossible in theory, but we're not
// sure in practice) and sent the ConfChange message
// to the raft network instead of appending it to the
// raft log.
let last_index = raw_node.raft.raft_log.last_index();
assert!(last_index == prev_index + 1);
joint_state_latch = Some(JointStateLatch {
index: last_index,
notify,
});
}
}
NormalRequest::ReadIndex { notify } => {
lc.inc();
// read_index puts this context into an Entry,
// so we've got to compose a full EntryContext,
// even though a single LogicalClock would be enough
let ctx = traft::EntryContextNormal {
lc: lc.clone(),
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
notifications.insert(lc.clone(), notify);
}
NormalRequest::Campaign { notify } => match raw_node.campaign() {
Ok(()) => notify.notify_ok(()),
Err(e) => notify.notify_err(e),
},
NormalRequest::Step(msg) => {
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
NormalRequest::Tick { n_times, notify } => {
for _ in 0..n_times {
raw_node.tick();
}
notify.notify_ok(());
}
}
}
let now = Instant::now();
if now > next_tick {
next_tick = now + Node::TICK;
raw_node.tick();
}
// Get the `Ready` with `RawNode::ready` interface.
if !raw_node.has_ready() {
continue;
}
let mut ready: raft::Ready = raw_node.ready();
let mut config_changed = false;
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages that come from the node.
let messages = ready.take_messages();
handle_messages(messages, &pool);
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
// Storage::apply_snapshot(ready.snapshot()).unwrap();
}
let committed_entries = ready.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
&mut joint_state_latch,
&mut config_changed,
);
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
Storage::persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
Storage::persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = match ss.leader_id {
INVALID_ID => None,
id => Some(id),
};
status.raft_state = format!("{:?}", ss.raft_state);
status_cond.broadcast();
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages that come from the node.
let messages = ready.take_persisted_messages();
handle_messages(messages, &pool);
}
let read_states = ready.take_read_states();
handle_read_states(read_states, &mut notifications);
Ok(())
})
.unwrap();
// Advance the Raft.
let mut light_rd = raw_node.advance(ready);
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
}
// Send out the messages.
let messages = light_rd.take_messages();
handle_messages(messages, &pool);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
&mut joint_state_latch,
&mut config_changed,
);
// Advance the apply index.
raw_node.advance_apply();
Ok(())
})
.unwrap();
if config_changed {
if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() {
let mut box_cfg = crate::tarantool::cfg().unwrap();
assert_eq!(box_cfg.replication_connect_quorum, 0);
box_cfg.replication =
traft::Storage::box_replication(&peer.replicaset_id, None).unwrap();
crate::tarantool::set_cfg(&box_cfg);
}
}
}
}
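/// The join fiber. It batches incoming `TopologyRequest`s, runs them through
/// the `Topology` state machine and proposes the resulting peer changes as a
/// single `ConfChangeV2` to the main loop. Each requester is notified with its
/// `raft_id` once the cluster has left the joint state.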
fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) {
loop {
let batch = inbox.receive_all(Duration::MAX);
let term = Storage::term().unwrap().unwrap_or(0);
let mut topology = match Storage::peers() {
Ok(v) => Topology::from_peers(v).with_replication_factor(2),
Err(e) => {
for (_, notify) in batch {
let e = RaftError::ConfChangeError(format!("{e}"));
notify.notify_err(e);
}
continue;
}
};
let mut topology_results = vec![];
for (req, notify) in batch {
match topology.process(req) {
Ok(peer) => {
topology_results.push((notify, peer));
}
Err(e) => {
let e = RaftError::ConfChangeError(e);
notify.notify_err(e);
}
}
}
let topology_diff = topology.diff();
let topology_to_replace = topology.to_replace();
let mut ids: Vec<String> = vec![];
for peer in &topology_diff {
ids.push(peer.instance_id.clone());
}
for (_, peer) in &topology_to_replace {
ids.push(peer.instance_id.clone());
}
tlog!(Info, "processing batch: {ids:?}");
let (rx, tx) = Notify::new().into_clones();
main_inbox.send(NormalRequest::ProposeConfChange {
term,
peers: topology_diff,
to_replace: topology_to_replace,
notify: tx,
});
// main_loop guarantees that every ProposeConfChange
// will eventually be handled, so there's no need for a timeout.
// It also guarantees that the notification will arrive only
// after the node leaves the joint state.
let res = rx.recv::<u64>();
tlog!(Info, "batch processed: {ids:?}, {res:?}");
for (notify, peer) in topology_results {
match &res {
Ok(_) => notify.notify_ok(peer.raft_id),
Err(e) => {
// RaftError doesn't implement the Clone trait,
// so we have to be creative.
let e = RaftError::ConfChangeError(format!("{e}"));
notify.notify_err(e);
}
}
}
}
}
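/// The global singleton holding this instance's raft node. It is set once via
/// `set_global()` and accessed through `global()`.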
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
}
}
pub fn global() -> Result<&'static Node, Error> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing the `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
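/// Stored procedure that accepts a batch of raft messages from other
/// instances and steps them into the local raft node.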
#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> {
let node = global()?;
for pb in pbs {
node.step(raft::Message::try_from(pb)?);
}
Ok(())
}
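/// Stored procedure handling a `JoinRequest` from a joining instance: it
/// validates the `cluster_id`, proposes the topology change and responds with
/// the data the new peer needs to bootstrap.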
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;
if req.cluster_id != cluster_id {
return Err(Box::new(Error::ClusterIdMismatch {
instance_cluster_id: req.cluster_id,
cluster_cluster_id: cluster_id,
}));
}
let node = global()?;
let raft_id = node.change_topology(req)?;
let peer = Storage::peer_by_raft_id(raft_id)?.ok_or("the peer has mysteriously disappeared")?;
let raft_group = Storage::peers()?;
let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?;
Ok(JoinResponse {
peer,
raft_group,
box_replication,
})
}