Newer
Older
//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::tarantool::transaction::start_transaction;
use ::tarantool::tuple::AsTuple;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryFrom;
use thiserror::Error;
use crate::traft::ContextCoercion as _;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use protobuf::ProtobufEnum as _;
type Notify = fiber::Channel<Result<u64, RaftError>>;
#[derive(Debug, Error)]
pub enum Error {
#[error("uninitialized yet")]
Uninitialized,
#[error("timeout")]
Timeout,
#[error("{0}")]
#[derive(Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
pub id: u64,
pub leader_id: u64,
pub raft_state: String,
/// The heart of `traft` module - the Node.
_main_loop: fiber::UnitJoinHandle<'static>,
_join_loop: fiber::UnitJoinHandle<'static>,
main_inbox: Mailbox<NormalRequest>,
join_inbox: Mailbox<(JoinRequest, Notify)>,
/// A request to the raft main loop.
/// Propose `raft::prelude::Entry` of `EntryNormal` kind.
/// Make a notification when it's committed.
ProposeNormal { op: traft::Op, notify: Notify },
/// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
/// Make a notification when it's committed.
ProposeConfChange {
peers: Vec<traft::Peer>,
notify: Notify,
},
/// Get a read barrier. In some systems it's also called the "quorum read".
/// Make a notification when index is read.
ReadIndex { notify: Notify },
/// Start a new raft term.
/// Make a notification when request is processed.
Campaign { notify: Notify },
/// Handle a message from another raft node.
/// Tick the node.
/// Make a notification when request is processed.
Tick { n_times: u32, notify: Notify },
pub const TICK: Duration = Duration::from_millis(100);
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let status_cond = Rc::new(fiber::Cond::new());
let main_inbox = Mailbox::<NormalRequest>::new();
let join_inbox = Mailbox::<(JoinRequest, Notify)>::new();
let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
id: cfg.id,
leader_id: 0,
raft_state: "Follower".into(),
}));
let main_loop_fn = {
let status = status.clone();
let status_cond = status_cond.clone();
let main_inbox = main_inbox.clone();
move || raft_main_loop(main_inbox, status, status_cond, raw_node)
};
let join_loop_fn = {
let main_inbox = main_inbox.clone();
let join_inbox = join_inbox.clone();
move || raft_join_loop(join_inbox, main_inbox)
};
let node = Node {
main_inbox,
join_inbox,
status,
status_cond,
_main_loop: fiber::start_proc(main_loop_fn),
_join_loop: fiber::start_proc(join_loop_fn),
};
// Wait for the node to enter the main loop
pub fn status(&self) -> Status {
self.status.borrow().clone()
}
pub fn wait_status(&self) {
self.status_cond.wait();
pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = Notify::new(1).into_clones();
self.main_inbox
.send(NormalRequest::ReadIndex { notify: tx });
match rx.recv_timeout(timeout) {
Ok(v) => v.map_err(|e| e.into()),
Err(_) => {
}
pub fn propose(&self, op: traft::Op, timeout: Duration) -> Result<u64, Error> {
let (rx, tx) = fiber::Channel::new(1).into_clones();
self.main_inbox
.send(NormalRequest::ProposeNormal { op, notify: tx });
match rx.recv_timeout(timeout) {
Ok(v) => v.map_err(|e| e.into()),
let (rx, tx) = fiber::Channel::new(1).into_clones();
let req = NormalRequest::Campaign { notify: tx };
if let Some(Err(e)) = rx.recv() {
tlog!(Error, "{e}");
}
/// Hands a raft message over to the main loop to be fed into the raw node.
///
/// This is fire-and-forget: the request carries no notification channel,
/// so the caller does not wait for the message to be processed.
pub fn step(&self, msg: raft::Message) {
    self.main_inbox.send(NormalRequest::Step(msg));
}
/// Ticks the raft node `n_times` times and waits until the main loop has
/// processed the request.
///
/// The main loop calls `raw_node.tick()` `n_times` times and then replies
/// on the request's `notify` channel.
pub fn tick(&self, n_times: u32) {
    let (rx, tx) = fiber::Channel::new(1).into_clones();
    let req = NormalRequest::Tick {
        n_times,
        notify: tx,
    };
    // Previously the request was built but never delivered, making `tick`
    // a no-op. Deliver it and wait for the acknowledgement, like `campaign`.
    self.main_inbox.send(req);
    rx.recv();
}
pub fn timeout_now(&self) {
self.step(raft::Message {
msg_type: raft::MessageType::MsgTimeoutNow,
..Default::default()
})
pub fn join_one(&self, req: JoinRequest, timeout: Duration) -> Result<u64, RaftError> {
let (rx, tx) = fiber::Channel::new(1).into_clones();
self.join_inbox.send((req, tx));
match rx.recv_timeout(timeout) {
Ok(Ok(commit_index)) => Ok(commit_index),
Ok(Err(e)) => Err(e),
Err(_) => Err(RaftError::ConfChangeError("timeout".into())),
}
fn raft_main_loop(
main_inbox: Mailbox<NormalRequest>,
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
.handler_name(".raft_interact")
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
.inactivity_timeout(Duration::from_secs(60))
for peer in Storage::peers().unwrap() {
pool.connect(peer.raft_id, peer.peer_address);
let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = {
let id = Storage::id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
LogicalClock::new(id, gen)
};
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
for req in main_inbox.recv_timeout(Node::TICK) {
match req {
NormalRequest::ProposeNormal { op, notify } => {
lc.inc();
let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
if let Err(e) = raw_node.propose(ctx, vec![]) {
notify.try_send(Err(e)).expect("that's a bug");
} else {
notifications.insert(lc.clone(), notify);
}
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
NormalRequest::ProposeConfChange { peers, notify } => {
lc.inc();
let mut changes = Vec::with_capacity(peers.len());
for peer in &peers {
let change_type = match peer.voter {
true => raft::ConfChangeType::AddNode,
false => raft::ConfChangeType::AddLearnerNode,
};
changes.push(raft::ConfChangeSingle {
change_type,
node_id: peer.raft_id,
..Default::default()
});
}
let cc = raft::ConfChangeV2 {
changes: changes.into(),
transition: raft::ConfChangeTransition::Implicit,
..Default::default()
};
let ctx = traft::EntryContextConfChange {
lc: lc.clone(),
peers,
}
.to_bytes();
if let Err(e) = raw_node.propose_conf_change(ctx, cc) {
notify.try_send(Err(e)).expect("that's a bug");
} else {
notifications.insert(lc.clone(), notify);
}
}
NormalRequest::ReadIndex { notify } => {
lc.inc();
// read_index puts this context into an Entry,
// so we've got to compose full EntryContext,
// despite single LogicalClock would be enough
let ctx = traft::EntryContextNormal {
lc: lc.clone(),
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
notifications.insert(lc.clone(), notify);
}
NormalRequest::Campaign { notify } => {
let res = raw_node.campaign().map(|_| 0);
notify.try_send(res).expect("that's a bug");
}
NormalRequest::Step(msg) => {
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
NormalRequest::Tick { n_times, notify } => {
for _ in 0..n_times {
raw_node.tick();
}
notify.try_send(Ok(0)).expect("that's a bug");
let now = Instant::now();
if now > next_tick {
next_tick = now + Node::TICK;
raw_node.tick();
// Get the `Ready` with `RawNode::ready` interface.
if !raw_node.has_ready() {
continue;
}
let mut ready: raft::Ready = raw_node.ready();
fn handle_read_states(
read_states: Vec<raft::ReadState>,
notifications: &mut HashMap<LogicalClock, Notify>,
) {
for rs in read_states {
if let Some(ctx) = traft::EntryContextNormal::read_from_bytes(&rs.request_ctx)
.expect("Abnormal entry in message context")
{
if let Some(notify) = notifications.remove(&ctx.lc) {
notify.try_send(Ok(rs.index)).ok();
fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
tlog!(Error, "{e}");
fn handle_committed_entries(
entries: Vec<raft::Entry>,
notifications: &mut HashMap<LogicalClock, Notify>,
raw_node: &mut RawNode,
pool: &mut ConnectionPool,
) {
for entry in entries
.iter()
.map(|e| traft::Entry::try_from(e).expect("wtf"))
{
match raft::EntryType::from_i32(entry.entry_type) {
Some(raft::EntryType::EntryNormal) => {
use traft::Op::*;
match entry.op() {
None => (),
Some(Nop) => (),
Some(Info { msg }) => tlog!(Info, "{msg}"),
Some(EvalLua { code }) => crate::tarantool::eval(code),
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
}
Some(entry_type @ raft::EntryType::EntryConfChange)
| Some(entry_type @ raft::EntryType::EntryConfChangeV2) => {
if let Some(lc) = entry.lc() {
if let Some(notify) = notifications.remove(lc) {
notify.try_send(Ok(entry.index)).ok();
}
for peer in entry.iter_peers() {
let peer = traft::Peer {
commit_index: entry.index,
..peer.clone()
Storage::persist_peer(&peer).unwrap();
pool.connect(peer.raft_id, peer.peer_address);
let cs = match entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
raw_node.apply_conf_change(&cc).unwrap()
}
_ => unreachable!(),
};
Storage::persist_conf_state(&cs).unwrap();
}
None => unreachable!(),
Storage::persist_applied(entry.index).unwrap();
}
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages that come from the node.
let messages = ready.take_messages();
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
// Storage::apply_snapshot(ready.snapshot()).unwrap();
}
let committed_entries = ready.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
);
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
Storage::persist_entries(e).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
Storage::persist_hard_state(hs).unwrap();
}
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = ss.leader_id;
status.raft_state = format!("{:?}", ss.raft_state);
status_cond.broadcast();
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages that come from the node.
let messages = ready.take_persisted_messages();
}
let read_states = ready.take_read_states();
handle_read_states(read_states, &mut notifications);
// Advance the Raft.
let mut light_rd = raw_node.advance(ready);
start_transaction(|| -> Result<(), TransactionError> {
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
}
// Send out the messages.
let messages = light_rd.take_messages();
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut raw_node,
&mut pool,
);
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
// Advance the apply index.
raw_node.advance_apply();
Ok(())
})
.unwrap();
}
}
/// Request from an instance that wants to join the raft group.
///
/// Accepted by the `raft_join` stored procedure and batched by the join loop
/// into a single configuration change.
// NOTE(review): serde field order presumably defines the tuple layout
// produced via `AsTuple`; do not reorder fields without checking callers.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JoinRequest {
    /// Identifier of the joining instance; used to find an already-known
    /// peer so it can keep its existing `raft_id`.
    pub instance_id: String,
    /// NOTE(review): not validated yet (see the TODO in the join loop).
    pub replicaset_id: Option<String>,
    /// Address stored as the peer's `peer_address`; raft messages are sent
    /// to the instance through a connection to this address.
    pub advertise_address: String,
    /// `true` to join as a voter (`AddNode`), `false` to join as a learner
    /// (`AddLearnerNode`).
    pub voter: bool,
}
// Allows the request to be encoded as a tarantool tuple.
impl AsTuple for JoinRequest {}
/// Reply to a [`JoinRequest`], returned by the `raft_join` stored procedure.
// NOTE(review): as with `JoinRequest`, field order presumably defines the
// tuple layout; do not reorder without checking the receiving side.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct JoinResponse {
    /// The peer record stored for the joining instance (includes its `raft_id`).
    pub peer: traft::Peer,
    /// All peers present in this node's storage when the reply was built.
    pub raft_group: Vec<traft::Peer>,
    /// Replication sources for `box.cfg()`.
    /// NOTE(review): currently always empty (`raft_join` sends `vec![]`).
    pub box_replication: Vec<String>,
    // TODO add later:
    // Other parameters necessary for box.cfg()
    // pub read_only: bool,
}
// Allows the response to be encoded as a tarantool tuple.
impl AsTuple for JoinResponse {}
fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<NormalRequest>) {
loop {
let batch = inbox.recv_timeout(Duration::MAX);
// TODO check leadership, else continue
let (rx, tx) = fiber::Channel::new(1).into_clones();
let mut peers = Vec::<traft::Peer>::new();
// TODO generate raft_id
let mut max_id = Storage::max_peer_id().unwrap();
for (req, notify) in &batch {
// Check if the client is already joined.
// Be greedy when giving instances a new raft_id. We don't want
// re-bootstrapped instances to affect quorum for no reason.
let maybe_peer = match Storage::peer_by_instance_id(&req.instance_id) {
Ok(v) => v,
Err(e) => {
notify.try_send(Err(e.into())).expect("that's a bug");
continue;
}
};
let raft_id = match maybe_peer {
Some(peer) => peer.raft_id,
None => {
max_id += 1;
max_id
}
};
// TODO check replicaset_id didn't change
peers.push(traft::Peer {
raft_id,
peer_address: req.advertise_address.clone(),
voter: req.voter,
instance_id: req.instance_id.clone(),
commit_index: 0,
});
main_inbox.send(NormalRequest::ProposeConfChange { peers, notify: tx });
match rx.recv_timeout(Duration::from_millis(200)) {
Ok(Ok(v)) => {
for (_, notify) in &batch {
notify.try_send(Ok(v)).expect("that's a bug");
}
}
for (_, notify) in &batch {
let e = RaftError::ConfChangeError(format!("{e:?}"));
notify.try_send(Err(e)).expect("that's a bug");
}
}
Err(_) => {
for (_, notify) in &batch {
let e = RaftError::ConfChangeError("timeout".into());
notify.try_send(Err(e)).expect("that's a bug");
}
}
/// Global raft node handle, read through `global()`.
///
/// `None` means the node has not been initialized yet; `set_global()` asserts
/// it is still `None` before running, so it must be called at most once.
static mut RAFT_NODE: Option<&'static Node> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
pub fn global() -> Result<&'static Node, Error> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE }.ok_or(Error::Uninitialized)
}
#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> {
for pb in pbs {
node.step(raft::Message::try_from(pb)?);
}
Ok(())
}
#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
tlog!(Warning, "join_one({req:?})");
node.join_one(req.clone(), Duration::MAX)?;
let resp = JoinResponse {
peer: Storage::peer_by_instance_id(&req.instance_id)?.unwrap(),
raft_group: Storage::peers()?,
box_replication: vec![],
};
tlog!(Warning, "resp: {resp:?}");
Ok(resp)
}