Commit 2b949e64 authored by Georgy Moshkin

refactor: Mailbox<NormalRequest> -> Mutex<RawNode>

parent d49e760c
Pipeline #9110 failed
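The commit replaces mailbox-based request passing with direct access to the raft node behind a fiber `Mutex`. Below is an illustrative analogue of the two designs using std primitives (the real code uses tarantool fibers and a fiber-aware `Mutex`; all names in this sketch are hypothetical stand-ins):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

struct RawNode { term: u64 }
enum Request { Tick }

// Before: every operation is serialized through a channel (the "mailbox")
// and executed by the single loop that owns the RawNode.
fn tick_via_mailbox(inbox: &mpsc::Sender<Request>) {
    inbox.send(Request::Tick).unwrap(); // the main loop calls raw_node.tick() later
}

// After: callers mutate the shared RawNode directly under a mutex and merely
// signal the loop so it processes the resulting Ready state promptly.
fn tick_via_mutex(raw_node: &Arc<Mutex<RawNode>>, wake_loop: impl Fn()) {
    let mut node = raw_node.lock().unwrap();
    node.term += 1; // stands in for raw_node.tick()
    drop(node);     // release the lock before signalling
    wake_loop();    // stands in for event::broadcast(Event::RaftLoopNeeded)
}
```

This avoids a round-trip through the main-loop fiber for every request and lets callers observe propose errors synchronously instead of through a notify channel.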
@@ -585,6 +585,8 @@ fn postjoin(args: &args::Run) {
let node = traft::node::Node::new(&raft_cfg);
let node = node.expect("failed initializing raft node");
traft::node::set_global(node);
let node = traft::node::global().unwrap();
let cs = traft::Storage::conf_state().unwrap();
if cs.voters == [raft_cfg.id] {
@@ -598,12 +600,8 @@ fn postjoin(args: &args::Run) {
node.tick(1); // apply configuration, if any
node.campaign().ok(); // trigger election immediately
assert_eq!(node.status().raft_state, "Leader");
}
traft::node::set_global(node);
let node = traft::node::global().unwrap();
box_cfg.listen = Some(args.listen.clone());
tarantool::set_cfg(&box_cfg);
@@ -617,7 +615,7 @@ fn postjoin(args: &args::Run) {
loop {
let timeout = Duration::from_secs(1);
if let Err(e) = traft::node::global().unwrap().read_index(timeout) {
if let Err(e) = node.read_index(timeout) {
tlog!(Warning, "unable to get a read barrier: {e}");
continue;
} else {
@@ -60,6 +60,7 @@ define_events! {
LeaveJointState, "raft.leave-joint-state";
StatusChanged, "raft.status-changed";
TopologyChanged, "raft.topology-changed";
RaftLoopNeeded, "raft.loop-needed";
}
////////////////////////////////////////////////////////////////////////////////
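The new `RaftLoopNeeded` event is the wake-up side of the refactoring: after mutating the `RawNode`, callers broadcast it so the main loop processes the resulting `Ready` state without waiting out a full tick. A hedged sketch of that protocol, modelled with a `Condvar` (the real code uses this crate's event module; the spurious-wakeup loop is omitted for brevity):

```rust
use std::sync::{Condvar, Mutex};
use std::time::Duration;

// Stand-in for the event machinery: a flag plus a condition variable.
struct LoopSignal { flag: Mutex<bool>, cv: Condvar }

impl LoopSignal {
    // Writer side: called after mutating the RawNode under its mutex.
    fn broadcast(&self) {
        *self.flag.lock().unwrap() = true;
        self.cv.notify_all();
    }

    // Main-loop side: wake on a signal, or after one tick interval at the latest.
    fn wait_timeout(&self, tick: Duration) {
        let guard = self.flag.lock().unwrap();
        let (mut flag, _) = self.cv.wait_timeout(guard, tick).unwrap();
        *flag = false;
    }
}
```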
@@ -38,7 +38,7 @@ pub type ReplicasetId = String;
/// - `count` is a simple in-memory counter. It's cheap to increment because it's volatile.
/// - `gen` should be persisted upon LogicalClock initialization to ensure uniqueness.
/// - `id` corresponds to `raft_id` of the instance (that is already unique across nodes).
#[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct LogicalClock {
id: u64,
gen: u64,
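`Copy` is added because the `Node` now keeps its clock in a `Cell<Option<LogicalClock>>` and threads the same `lc` value into both the notifications map and the entry context; with `Copy` those uses need no explicit `.clone()`. A minimal sketch mirroring `Node::next_lc` further below (the `count` field and `inc` are simplified stand-ins):

```rust
use std::cell::Cell;

#[derive(Copy, Clone, Debug, Default, Hash, PartialEq, Eq)]
struct LogicalClock { id: u64, gen: u64, count: u64 }

impl LogicalClock {
    fn inc(&mut self) { self.count += 1; }
}

// With Copy, the returned lc can be inserted into the notifications map
// *and* embedded into the EntryContext without any cloning boilerplate.
fn next_lc(slot: &Cell<Option<LogicalClock>>) -> LogicalClock {
    let mut lc = slot.take().expect("it's always Some");
    lc.inc();
    slot.set(Some(lc));
    lc
}
```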
@@ -11,6 +11,7 @@ use ::raft::StateRole as RaftStateRole;
use ::raft::INVALID_ID;
use ::tarantool::error::TransactionError;
use ::tarantool::fiber;
use ::tarantool::fiber::Mutex;
use ::tarantool::proc;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
@@ -30,7 +31,7 @@ use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use std::iter::FromIterator as _;
use crate::mailbox::Mailbox;
use crate::cache::CachedCell;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
@@ -63,86 +64,31 @@ pub struct Status {
}
/// The heart of the `traft` module - the Node.
#[derive(Debug)]
pub struct Node {
raw_node: Rc<Mutex<RawNode>>,
raft_id: RaftId,
_main_loop: fiber::UnitJoinHandle<'static>,
_conf_change_loop: fiber::UnitJoinHandle<'static>,
main_inbox: Mailbox<NormalRequest>,
// join_inbox: TopologyMailbox,
status: Rc<RefCell<Status>>,
notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>,
topology_cache: CachedCell<u64, Topology>,
lc: Cell<Option<LogicalClock>>,
}
/// A request to the raft main loop.
#[derive(Clone, Debug)]
enum NormalRequest {
/// Propose `raft::prelude::Entry` of `EntryNormal` kind.
///
/// Make a notification when it's committed.
/// Notify the caller with `Result<Box<_>>`
/// of the associated type `traft::Op::Result`.
///
ProposeNormal { op: traft::Op, notify: Notify },
/// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
///
/// Make a notification when the second EntryConfChange is
/// committed (which corresponds to leaving the joint state).
/// Notify the caller with `Result<Box<()>>`.
///
ProposeConfChange {
conf_change: raft::ConfChangeV2,
term: u64,
notify: Notify,
},
/// This kind of request is special: it can only be processed on
/// the leader, and it applies the corresponding `TopologyRequest` to a
/// cached `struct Topology`. As a result, it proposes the
/// `Op::PersistPeer` entry to the raft.
///
/// Make a notification when the entry is committed.
/// Notify the caller with `Result<Box<Peer>>`.
///
HandleTopologyRequest {
req: TopologyRequest,
notify: Notify,
},
/// Get a read barrier. In some systems it's also called the "quorum read".
///
/// Make a notification when the index is read.
/// Notify the caller with `Result<Box<u64>>`,
/// holding the resulting `raft_index`.
///
ReadIndex { notify: Notify },
/// Start a new raft term.
///
/// Make a notification when the request is processed.
/// Notify the caller with `Result<Box<()>>`
///
Campaign { notify: Notify },
/// Handle a message from another raft node.
///
Step(raft::Message),
/// Tick the node.
///
/// Make a notification when the request is processed.
/// Notify the caller with `Result<Box<()>>`;
/// the `Err` variant is unreachable.
///
Tick { n_times: u32, notify: Notify },
impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Node")
.field("raft_id", &self.raft_id)
.finish_non_exhaustive()
}
}
impl Node {
pub const TICK: Duration = Duration::from_millis(100);
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let main_inbox = Mailbox::<NormalRequest>::new();
let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
let raw_node = Rc::new(Mutex::new(raw_node));
let status = Rc::new(RefCell::new(Status {
id: cfg.id,
leader_id: None,
@@ -150,21 +96,24 @@ impl Node {
is_ready: false,
}));
let notifications = Rc::new(RefCell::new(HashMap::new()));
let main_loop_fn = {
let status = status.clone();
let main_inbox = main_inbox.clone();
move || raft_main_loop(main_inbox, status, raw_node)
let raw_node = raw_node.clone();
let notifications = notifications.clone();
move || raft_main_loop(status, raw_node, notifications)
};
let conf_change_loop_fn = {
let status = status.clone();
let main_inbox = main_inbox.clone();
move || raft_conf_change_loop(status, main_inbox)
move || raft_conf_change_loop(status)
};
let node = Node {
raw_node,
raft_id: cfg.id,
main_inbox,
notifications,
status,
_main_loop: fiber::Builder::new()
.name("raft_main_loop")
@@ -176,6 +125,13 @@ impl Node {
.proc(conf_change_loop_fn)
.start()
.unwrap(),
topology_cache: CachedCell::new(),
lc: {
let id = Storage::raft_id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
Cell::new(Some(LogicalClock::new(id, gen)))
},
};
// Wait for the node to enter the main loop
@@ -197,9 +153,23 @@ impl Node {
}
pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
let mut raw_node = self.raw_node.lock();
let (rx, tx) = Notify::new().into_clones();
self.main_inbox
.send(NormalRequest::ReadIndex { notify: tx });
let lc = self.next_lc();
self.notifications.borrow_mut().insert(lc, tx);
// read_index puts this context into an Entry,
// so we have to compose a full EntryContext,
// even though a single LogicalClock would suffice
let ctx = traft::EntryContextNormal {
lc,
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
drop(raw_node);
event::broadcast(Event::RaftLoopNeeded);
rx.recv_timeout::<u64>(timeout)
}
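`read_index` illustrates the rendezvous that replaces the mailbox: the caller registers a `Notify` keyed by its `LogicalClock`, and the main loop resolves it once a `ReadState` carrying the same clock in its context comes back. A sketch of the resolving side, with an mpsc channel standing in for `Notify` (types are hypothetical stand-ins):

```rust
use std::collections::HashMap;
use std::sync::mpsc;

type Lc = (u64, u64, u64); // stand-in for LogicalClock (id, gen, count)

// Called by the main loop when raft yields a ReadState whose context
// decodes to the given lc; wakes the fiber parked in recv_timeout.
fn resolve_read_state(
    notifications: &mut HashMap<Lc, mpsc::Sender<u64>>,
    lc: Lc,
    raft_index: u64,
) {
    if let Some(tx) = notifications.remove(&lc) {
        let _ = tx.send(raft_index);
    }
}
```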
@@ -208,37 +178,39 @@ impl Node {
op: T,
timeout: Duration,
) -> Result<T::Result, Error> {
let mut raw_node = self.raw_node.lock();
let (rx, tx) = Notify::new().into_clones();
self.main_inbox.send(NormalRequest::ProposeNormal {
op: op.into(),
notify: tx,
});
let lc = self.next_lc();
self.notifications.borrow_mut().insert(lc, tx);
let ctx = traft::EntryContextNormal { lc, op: op.into() }.to_bytes();
raw_node.propose(ctx, vec![])?;
event::broadcast(Event::RaftLoopNeeded);
drop(raw_node);
rx.recv_timeout::<T::Result>(timeout)
}
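Note the `drop(raw_node)` before `rx.recv_timeout(...)` in both methods above. Fibers are cooperative, so parking while still holding the mutex would deadlock: the main loop needs the same lock to commit the entry that resolves the notification. A std-based analogue of the discipline (names are hypothetical):

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

fn propose_and_wait(node: &Arc<Mutex<u64>>, rx: &mpsc::Receiver<u64>) -> u64 {
    let mut guard = node.lock().unwrap();
    *guard += 1;       // stands in for raw_node.propose(ctx, vec![])
    drop(guard);       // release BEFORE blocking, or the loop can't progress
    rx.recv().unwrap() // park until the entry is committed
}
```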
pub fn campaign(&self) -> Result<(), Error> {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::Campaign { notify: tx };
self.main_inbox.send(req);
rx.recv::<()>()
Ok(self.raw_node.lock().campaign()?)
}
pub fn step(&self, msg: raft::Message) {
let req = NormalRequest::Step(msg);
self.main_inbox.send(req);
let mut raw_node = self.raw_node.lock();
if msg.to != raw_node.raft.id {
return;
}
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
drop(raw_node);
}
pub fn tick(&self, n_times: u32) {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::Tick {
n_times,
notify: tx,
};
self.main_inbox.send(req);
match rx.recv() {
Ok(()) => (),
Err(e) => tlog!(Warning, "{e}"),
let mut raw_node = self.raw_node.lock();
for _ in 0..n_times {
raw_node.tick();
}
event::broadcast(Event::RaftLoopNeeded);
drop(raw_node);
}
pub fn timeout_now(&self) {
@@ -251,12 +223,132 @@ impl Node {
}
pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> {
let (rx, tx) = Notify::new().into_clones();
let req = NormalRequest::HandleTopologyRequest { req, notify: tx };
let mut raw_node = self.raw_node.lock();
let status = raw_node.status();
if status.ss.raft_state != RaftStateRole::Leader {
return Err(RaftError::ConfChangeError("not a leader".into()).into());
}
let mut topology = self
.topology_cache
.pop(&raw_node.raft.term)
.unwrap_or_else(|| {
let peers = Storage::peers().unwrap();
let replication_factor = Storage::replication_factor().unwrap().unwrap();
Topology::from_peers(peers).with_replication_factor(replication_factor)
});
let peer_result = match req {
TopologyRequest::Join(JoinRequest {
instance_id,
replicaset_id,
advertise_address,
failure_domains,
..
}) => topology.join(
instance_id,
replicaset_id,
advertise_address,
failure_domains,
),
TopologyRequest::UpdatePeer(UpdatePeerRequest {
instance_id,
health,
failure_domains,
..
}) => topology.update_peer(&instance_id, health, failure_domains),
};
let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
self.topology_cache.put(raw_node.raft.term, topology);
return Err(RaftError::ConfChangeError(e).into());
});
self.main_inbox.send(req);
peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
let lc = self.next_lc();
let ctx = traft::EntryContextNormal {
op: traft::Op::PersistPeer { peer },
lc,
};
let (rx, tx) = Notify::new().into_clones();
self.notifications.borrow_mut().insert(lc, tx);
raw_node.propose(ctx.to_bytes(), vec![])?;
self.topology_cache.put(raw_node.raft.term, topology);
event::broadcast(Event::RaftLoopNeeded);
drop(raw_node);
rx.recv::<Peer>()
}
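`handle_topology_request` relies on `CachedCell<u64, Topology>`: the topology is cached per raft term, popped for mutation, and put back afterwards, so a term change implicitly invalidates it. A hedged sketch of the contract assumed here (the real `crate::cache::CachedCell` may differ):

```rust
use std::cell::RefCell;

struct CachedCell<K, V>(RefCell<Option<(K, V)>>);

impl<K: PartialEq, V> CachedCell<K, V> {
    fn new() -> Self { Self(RefCell::new(None)) }

    // Yields the cached value only if it was stored under the same key;
    // a mismatched key (e.g. a new raft term) drops the stale value.
    fn pop(&self, key: &K) -> Option<V> {
        let (k, v) = self.0.borrow_mut().take()?;
        if k == *key { Some(v) } else { None }
    }

    fn put(&self, key: K, value: V) {
        *self.0.borrow_mut() = Some((key, value));
    }
}
```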
fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> {
let mut raw_node = self.raw_node.lock();
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
#[allow(clippy::never_loop)]
let reason: Option<&str> = loop {
// Checking leadership is only needed for
// correct latch management. It doesn't affect
// raft correctness. Checking that the instance
// is a leader ensures the proposed `ConfChange`
// is appended to the raft log immediately
// instead of being sent as `MsgPropose` over
// the network.
if raw_node.raft.state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
break None;
};
if let Some(e) = reason {
return Err(RaftError::ConfChangeError(e.into()).into());
}
let prev_index = raw_node.raft.raft_log.last_index();
raw_node.propose_conf_change(vec![], conf_change)?;
// If the current instance isn't actually a leader
// (impossible in theory, but not guaranteed in practice),
// it has sent the ConfChange as a message over the raft
// network instead of appending it to the raft log.
// The assert below catches that case.
let last_index = raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
let (rx, tx) = Notify::new().into_clones();
with_joint_state_latch(|joint_state_latch| {
assert!(joint_state_latch.take().is_none());
joint_state_latch.set(Some(JointStateLatch {
index: last_index,
notify: tx,
}));
});
event::broadcast(Event::RaftLoopNeeded);
drop(raw_node);
rx.recv()
}
fn next_lc(&self) -> LogicalClock {
let mut lc = self.lc.take().expect("it's always Some");
lc.inc();
self.lc.set(Some(lc));
lc
}
}
#[derive(Debug)]
@@ -441,9 +533,9 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
}
fn raft_main_loop(
main_inbox: Mailbox<NormalRequest>,
status: Rc<RefCell<Status>>,
mut raw_node: RawNode,
raw_node: Rc<Mutex<RawNode>>,
notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>,
) {
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
@@ -457,192 +549,15 @@ fn raft_main_loop(
pool.connect(peer.raft_id, peer.peer_address);
}
let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
let mut lc = {
let id = Storage::raft_id().unwrap().unwrap();
let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
Storage::persist_gen(gen).unwrap();
LogicalClock::new(id, gen)
};
let topology_cache = crate::cache::CachedCell::<u64, Topology>::new();
// let mut topology: Option<(u64, Topology)> = None;
loop {
// Clean up obsolete notifications
notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
for req in main_inbox.receive_all(Node::TICK) {
match req {
NormalRequest::ProposeNormal { op, notify } => {
lc.inc();
let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
if let Err(e) = raw_node.propose(ctx, vec![]) {
notify.notify_err(e);
} else {
notifications.insert(lc.clone(), notify);
}
}
NormalRequest::HandleTopologyRequest { req, notify } => {
lc.inc();
let status = raw_node.status();
if status.ss.raft_state != RaftStateRole::Leader {
let e = RaftError::ConfChangeError("not a leader".into());
notify.notify_err(e);
continue;
}
let mut topology =
topology_cache.pop(&raw_node.raft.term).unwrap_or_else(|| {
let peers = Storage::peers().unwrap();
let replication_factor =
Storage::replication_factor().unwrap().unwrap();
Topology::from_peers(peers).with_replication_factor(replication_factor)
});
let peer_result = match req {
TopologyRequest::Join(JoinRequest {
instance_id,
replicaset_id,
advertise_address,
failure_domains,
..
}) => topology.join(
instance_id,
replicaset_id,
advertise_address,
failure_domains,
),
TopologyRequest::UpdatePeer(UpdatePeerRequest {
instance_id,
health,
failure_domains,
..
}) => topology.update_peer(&instance_id, health, failure_domains),
};
let mut peer = match peer_result {
Ok(peer) => peer,
Err(e) => {
notify.notify_err(RaftError::ConfChangeError(e));
topology_cache.put(raw_node.raft.term, topology);
continue;
}
};
peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
let ctx = traft::EntryContextNormal {
op: traft::Op::PersistPeer { peer },
lc: lc.clone(),
};
if let Err(e) = raw_node.propose(ctx.to_bytes(), vec![]) {
notify.notify_err(e);
} else {
notifications.insert(lc.clone(), notify);
topology_cache.put(raw_node.raft.term, topology);
}
}
NormalRequest::ProposeConfChange {
conf_change,
term,
notify,
} => {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
#[allow(clippy::never_loop)]
let reason: Option<&str> = loop {
// Checking leadership is only needed for
// correct latch management. It doesn't affect
// raft correctness. Checking that the instance
// is a leader ensures the proposed `ConfChange`
// is appended to the raft log immediately
// instead of being sent as `MsgPropose` over
// the network.
if raw_node.raft.state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
break None;
};
if let Some(e) = reason {
let e = RaftError::ConfChangeError(e.into());
notify.notify_err(e);
continue;
}
notifications
.borrow_mut()
.retain(|_, notify: &mut Notify| !notify.is_closed());
let prev_index = raw_node.raft.raft_log.last_index();
if let Err(e) = raw_node.propose_conf_change(vec![], conf_change) {
notify.notify_err(e);
} else {
// If the current instance isn't actually a leader
// (impossible in theory, but not guaranteed in practice),
// it has sent the ConfChange as a message over the raft
// network instead of appending it to the raft log.
// The assert below catches that case.
let last_index = raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
with_joint_state_latch(|joint_state_latch| {
assert!(joint_state_latch.take().is_none());
joint_state_latch.set(Some(JointStateLatch {
index: last_index,
notify,
}));
});
}
}
NormalRequest::ReadIndex { notify } => {
lc.inc();
// read_index puts this context into an Entry,
// so we have to compose a full EntryContext,
// even though a single LogicalClock would suffice
let ctx = traft::EntryContextNormal {
lc: lc.clone(),
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
notifications.insert(lc.clone(), notify);
}
NormalRequest::Campaign { notify } => match raw_node.campaign() {
Ok(()) => notify.notify_ok(()),
Err(e) => notify.notify_err(e),
},
NormalRequest::Step(msg) => {
if msg.to != raw_node.raft.id {
continue;
}
if let Err(e) = raw_node.step(msg) {
tlog!(Error, "{e}");
}
}
NormalRequest::Tick { n_times, notify } => {
for _ in 0..n_times {
raw_node.tick();
}
notify.notify_ok(());
}
}
}
event::wait_timeout(Event::RaftLoopNeeded, Node::TICK).expect("Events must be initialized");
let mut raw_node = raw_node.lock();
let now = Instant::now();
if now > next_tick {
next_tick = now + Node::TICK;
@@ -673,7 +588,7 @@ fn raft_main_loop(
let committed_entries = ready.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut *notifications.borrow_mut(),
&mut raw_node,
&mut pool,
&mut topology_changed,
@@ -708,7 +623,7 @@ fn raft_main_loop(
}
let read_states = ready.take_read_states();
handle_read_states(read_states, &mut notifications);
handle_read_states(read_states, &mut *notifications.borrow_mut());
Ok(())
})
@@ -731,7 +646,7 @@ fn raft_main_loop(
let committed_entries = light_rd.take_committed_entries();
handle_committed_entries(
committed_entries,
&mut notifications,
&mut *notifications.borrow_mut(),
&mut raw_node,
&mut pool,
&mut topology_changed,
@@ -756,7 +671,7 @@ fn raft_main_loop(
}
}
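Condensed, the loop's new control flow is: park on the event (bounded by one tick), then lock, tick if due, and process `Ready` if any. A stand-alone skeleton with stub types (hypothetical, for shape only; the stubs stand in for the real event and raft-rs APIs):

```rust
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

const TICK: Duration = Duration::from_millis(100);

struct RawNode;
impl RawNode {
    fn tick(&mut self) {}
    fn has_ready(&self) -> bool { false }
}

// Stand-in for event::wait_timeout(Event::RaftLoopNeeded, Node::TICK).
fn wait_raft_loop_needed(timeout: Duration) {
    std::thread::sleep(timeout);
}

fn main_loop(raw_node: Arc<Mutex<RawNode>>) {
    let mut next_tick = Instant::now() + TICK;
    loop {
        wait_raft_loop_needed(TICK); // woken early by broadcasts
        let mut raw_node = raw_node.lock().unwrap();
        let now = Instant::now();
        if now > next_tick {
            next_tick = now + TICK;
            raw_node.tick();
        }
        if !raw_node.has_ready() {
            continue; // nothing changed; release the lock and park again
        }
        // ... persist entries, send messages, resolve notifications ...
    }
}
```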
fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) {
fn raft_conf_change_loop(status: Rc<RefCell<Status>>) {
loop {
if status.borrow().raft_state != "Leader" {
event::wait(Event::StatusChanged).expect("Events system must be initialized");
@@ -849,18 +764,12 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) {
..Default::default()
};
let (rx, tx) = Notify::new().into_clones();
main_inbox.send(NormalRequest::ProposeConfChange {
term,
conf_change,
notify: tx,
});
let node = global().expect("must be initialized");
// main_loop guarantees that every ProposeConfChange
// will eventually be handled, so no timeout is needed.
// It also guarantees that the notification arrives only
// after the node leaves the joint state.
match rx.recv() {
match node.propose_conf_change(term, conf_change) {
Ok(()) => tlog!(Info, "conf_change processed"),
Err(e) => tlog!(Warning, "conf_change failed: {e}"),
}