From e030aa41b995022ea8637a39069c26638c162af3 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 7 Jul 2022 21:49:26 +0300 Subject: [PATCH] refactor: Mailbox<NormalRequest> -> Mutex<RawNode> --- src/main.rs | 8 +- src/traft/event.rs | 1 + src/traft/mod.rs | 2 +- src/traft/node.rs | 493 ++++++++++++++++++--------------------------- 4 files changed, 206 insertions(+), 298 deletions(-) diff --git a/src/main.rs b/src/main.rs index 1e315819ed..9f456f2c8e 100644 --- a/src/main.rs +++ b/src/main.rs @@ -585,6 +585,8 @@ fn postjoin(args: &args::Run) { let node = traft::node::Node::new(&raft_cfg); let node = node.expect("failed initializing raft node"); + traft::node::set_global(node); + let node = traft::node::global().unwrap(); let cs = traft::Storage::conf_state().unwrap(); if cs.voters == [raft_cfg.id] { @@ -598,12 +600,8 @@ fn postjoin(args: &args::Run) { node.tick(1); // apply configuration, if any node.campaign().ok(); // trigger election immediately - assert_eq!(node.status().raft_state, "Leader"); } - traft::node::set_global(node); - let node = traft::node::global().unwrap(); - box_cfg.listen = Some(args.listen.clone()); tarantool::set_cfg(&box_cfg); @@ -617,7 +615,7 @@ fn postjoin(args: &args::Run) { loop { let timeout = Duration::from_secs(1); - if let Err(e) = traft::node::global().unwrap().read_index(timeout) { + if let Err(e) = node.read_index(timeout) { tlog!(Warning, "unable to get a read barrier: {e}"); continue; } else { diff --git a/src/traft/event.rs b/src/traft/event.rs index 2bea914b22..b3687172df 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -60,6 +60,7 @@ define_events! { LeaveJointState, "raft.leave-joint-state"; StatusChanged, "raft.status-changed"; TopologyChanged, "raft.topology-changed"; + RaftLoopNeeded, "raft.loop-needed"; } //////////////////////////////////////////////////////////////////////////////// diff --git a/src/traft/mod.rs b/src/traft/mod.rs index e3b7439d6f..8a375d4d06 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -38,7 +38,7 @@ pub type ReplicasetId = String; /// - `count` is a simple in-memory counter. It's cheap to increment because it's volatile. /// - `gen` should be persisted upon LogicalClock initialization to ensure the uniqueness. /// - `id` corresponds to `raft_id` of the instance (that is already unique across nodes). -#[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)] +#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)] pub struct LogicalClock { id: u64, gen: u64, diff --git a/src/traft/node.rs b/src/traft/node.rs index 7d97540480..bc7e0e26bc 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -11,6 +11,7 @@ use ::raft::StateRole as RaftStateRole; use ::raft::INVALID_ID; use ::tarantool::error::TransactionError; use ::tarantool::fiber; +use ::tarantool::fiber::Mutex; use ::tarantool::proc; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; @@ -30,7 +31,7 @@ use ::tarantool::util::IntoClones as _; use protobuf::Message as _; use std::iter::FromIterator as _; -use crate::mailbox::Mailbox; +use crate::cache::CachedCell; use crate::tlog; use crate::traft; use crate::traft::error::Error; @@ -63,86 +64,31 @@ pub struct Status { } /// The heart of `traft` module - the Node. 
-#[derive(Debug)] pub struct Node { + raw_node: Rc<Mutex<RawNode>>, raft_id: RaftId, _main_loop: fiber::UnitJoinHandle<'static>, _conf_change_loop: fiber::UnitJoinHandle<'static>, - main_inbox: Mailbox<NormalRequest>, - // join_inbox: TopologyMailbox, status: Rc<RefCell<Status>>, + notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, + topology_cache: CachedCell<u64, Topology>, + lc: Cell<Option<LogicalClock>>, } -/// A request to the raft main loop. -#[derive(Clone, Debug)] -enum NormalRequest { - /// Propose `raft::prelude::Entry` of `EntryNormal` kind. - /// - /// Make a notification when it's committed. - /// Notify the caller with `Result<Box<_>>`, - /// of the associated type `traft::Op::Result`. - /// - ProposeNormal { op: traft::Op, notify: Notify }, - - /// Propose `raft::prelude::Entry` of `EntryConfChange` kind. - /// - /// Make a notification when the second EntryConfChange is - /// committed (that corresponds to the leaving of a joint state). - /// Notify the caller with `Result<Box<()>>`. - /// - ProposeConfChange { - conf_change: raft::ConfChangeV2, - term: u64, - notify: Notify, - }, - - /// This kind of requests is special: it can only be processed on - /// the leader, and implies corresponding `TopologyRequest` to a - /// cached `struct Topology`. As a result, it proposes the - /// `Op::PersistPeer` entry to the raft. - /// - /// Make a notification when the entry is committed. - /// Notify the caller with `Result<Box<Peer>>`. - /// - HandleTopologyRequest { - req: TopologyRequest, - notify: Notify, - }, - - /// Get a read barrier. In some systems it's also called the "quorum read". - /// - /// Make a notification when index is read. - /// Notify the caller with `Result<Box<u64>>`, - /// holding the resulting `raft_index`. - /// - ReadIndex { notify: Notify }, - - /// Start a new raft term. - /// - /// Make a notification when request is processed. - /// Notify the caller with `Result<Box<()>>` - /// - Campaign { notify: Notify }, - - /// Handle message from anoher raft node. - /// - Step(raft::Message), - - /// Tick the node. - /// - /// Make a notification when request is processed. 
- /// Notify the caller with `Result<(Box<()>)>`, with - /// the `Err` variant unreachable - /// - Tick { n_times: u32, notify: Notify }, +impl std::fmt::Debug for Node { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("Node") + .field("raft_id", &self.raft_id) + .finish_non_exhaustive() + } } impl Node { pub const TICK: Duration = Duration::from_millis(100); pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { - let main_inbox = Mailbox::<NormalRequest>::new(); let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; + let raw_node = Rc::new(Mutex::new(raw_node)); let status = Rc::new(RefCell::new(Status { id: cfg.id, leader_id: None, @@ -150,21 +96,24 @@ impl Node { is_ready: false, })); + let notifications = Rc::new(RefCell::new(HashMap::new())); + let main_loop_fn = { let status = status.clone(); - let main_inbox = main_inbox.clone(); - move || raft_main_loop(main_inbox, status, raw_node) + let raw_node = raw_node.clone(); + let notifications = notifications.clone(); + move || raft_main_loop(status, raw_node, notifications) }; let conf_change_loop_fn = { let status = status.clone(); - let main_inbox = main_inbox.clone(); - move || raft_conf_change_loop(status, main_inbox) + move || raft_conf_change_loop(status) }; let node = Node { + raw_node, raft_id: cfg.id, - main_inbox, + notifications, status, _main_loop: fiber::Builder::new() .name("raft_main_loop") @@ -176,6 +125,13 @@ impl Node { .proc(conf_change_loop_fn) .start() .unwrap(), + topology_cache: CachedCell::new(), + lc: { + let id = Storage::raft_id().unwrap().unwrap(); + let gen = Storage::gen().unwrap().unwrap_or(0) + 1; + Storage::persist_gen(gen).unwrap(); + Cell::new(Some(LogicalClock::new(id, gen))) + }, }; // Wait for the node to enter the main loop @@ -197,9 +153,23 @@ impl Node { } pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> { + let mut raw_node = self.raw_node.lock(); + let (rx, tx) = Notify::new().into_clones(); - self.main_inbox - .send(NormalRequest::ReadIndex { notify: tx }); + let lc = self.next_lc(); + self.notifications.borrow_mut().insert(lc, tx); + + // read_index puts this context into an Entry, + // so we've got to compose full EntryContext, + // despite single LogicalClock would be enough + let ctx = traft::EntryContextNormal { + lc, + op: traft::Op::Nop, + } + .to_bytes(); + raw_node.read_index(ctx); + drop(raw_node); + event::broadcast(Event::RaftLoopNeeded); rx.recv_timeout::<u64>(timeout) } @@ -208,37 +178,39 @@ impl Node { op: T, timeout: Duration, ) -> Result<T::Result, Error> { + let mut raw_node = self.raw_node.lock(); let (rx, tx) = Notify::new().into_clones(); - self.main_inbox.send(NormalRequest::ProposeNormal { - op: op.into(), - notify: tx, - }); + let lc = self.next_lc(); + self.notifications.borrow_mut().insert(lc, tx); + let ctx = traft::EntryContextNormal { lc, op: op.into() }.to_bytes(); + raw_node.propose(ctx, vec![])?; + event::broadcast(Event::RaftLoopNeeded); + drop(raw_node); rx.recv_timeout::<T::Result>(timeout) } pub fn campaign(&self) -> Result<(), Error> { - let (rx, tx) = Notify::new().into_clones(); - let req = NormalRequest::Campaign { notify: tx }; - self.main_inbox.send(req); - rx.recv::<()>() + Ok(self.raw_node.lock().campaign()?) 
} pub fn step(&self, msg: raft::Message) { - let req = NormalRequest::Step(msg); - self.main_inbox.send(req); + let mut raw_node = self.raw_node.lock(); + if msg.to != raw_node.raft.id { + return; + } + if let Err(e) = raw_node.step(msg) { + tlog!(Error, "{e}"); + } + drop(raw_node); } pub fn tick(&self, n_times: u32) { - let (rx, tx) = Notify::new().into_clones(); - let req = NormalRequest::Tick { - n_times, - notify: tx, - }; - self.main_inbox.send(req); - match rx.recv() { - Ok(()) => (), - Err(e) => tlog!(Warning, "{e}"), + let mut raw_node = self.raw_node.lock(); + for _ in 0..n_times { + raw_node.tick(); } + event::broadcast(Event::RaftLoopNeeded); + drop(raw_node); } pub fn timeout_now(&self) { @@ -251,12 +223,132 @@ impl Node { } pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> { - let (rx, tx) = Notify::new().into_clones(); - let req = NormalRequest::HandleTopologyRequest { req, notify: tx }; + let mut raw_node = self.raw_node.lock(); + + let status = raw_node.status(); + if status.ss.raft_state != RaftStateRole::Leader { + return Err(RaftError::ConfChangeError("not a leader".into()).into()); + } + + let mut topology = self + .topology_cache + .pop(&raw_node.raft.term) + .unwrap_or_else(|| { + let peers = Storage::peers().unwrap(); + let replication_factor = Storage::replication_factor().unwrap().unwrap(); + Topology::from_peers(peers).with_replication_factor(replication_factor) + }); + + let peer_result = match req { + TopologyRequest::Join(JoinRequest { + instance_id, + replicaset_id, + advertise_address, + failure_domains, + .. + }) => topology.join( + instance_id, + replicaset_id, + advertise_address, + failure_domains, + ), + + TopologyRequest::UpdatePeer(UpdatePeerRequest { + instance_id, + health, + failure_domains, + .. + }) => topology.update_peer(&instance_id, health, failure_domains), + }; + + let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => { + self.topology_cache.put(raw_node.raft.term, topology); + return Err(RaftError::ConfChangeError(e).into()); + }); - self.main_inbox.send(req); + peer.commit_index = raw_node.raft.raft_log.last_index() + 1; + + let lc = self.next_lc(); + let ctx = traft::EntryContextNormal { + op: traft::Op::PersistPeer { peer }, + lc, + }; + + let (rx, tx) = Notify::new().into_clones(); + self.notifications.borrow_mut().insert(lc, tx); + raw_node.propose(ctx.to_bytes(), vec![])?; + self.topology_cache.put(raw_node.raft.term, topology); + event::broadcast(Event::RaftLoopNeeded); + drop(raw_node); rx.recv::<Peer>() } + + fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> { + let mut raw_node = self.raw_node.lock(); + // In some states proposing a ConfChange is impossible. + // Check if there's a reason to reject it. + + #[allow(clippy::never_loop)] + let reason: Option<&str> = loop { + // Checking leadership is only needed for the + // correct latch management. It doesn't affect + // raft correctness. Checking the instance is a + // leader makes sure the proposed `ConfChange` + // is appended to the raft log immediately + // instead of sending `MsgPropose` over the + // network. + if raw_node.raft.state != RaftStateRole::Leader { + break Some("not a leader"); + } + + if term != raw_node.raft.term { + break Some("raft term mismatch"); + } + + // Without this check the node would silently ignore the conf change. 
+ // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026 + if raw_node.raft.has_pending_conf() { + break Some("already has pending confchange"); + } + + break None; + }; + + if let Some(e) = reason { + return Err(RaftError::ConfChangeError(e.into()).into()); + } + + let prev_index = raw_node.raft.raft_log.last_index(); + raw_node.propose_conf_change(vec![], conf_change)?; + + // oops, current instance isn't actually a leader + // (which is impossible in theory, but we're not + // sure in practice) and sent the ConfChange message + // to the raft network instead of appending it to the + // raft log. + let last_index = raw_node.raft.raft_log.last_index(); + assert_eq!(last_index, prev_index + 1); + + let (rx, tx) = Notify::new().into_clones(); + with_joint_state_latch(|joint_state_latch| { + assert!(joint_state_latch.take().is_none()); + joint_state_latch.set(Some(JointStateLatch { + index: last_index, + notify: tx, + })); + }); + + event::broadcast(Event::RaftLoopNeeded); + drop(raw_node); + rx.recv() + } + + fn next_lc(&self) -> LogicalClock { + let mut lc = self.lc.take().expect("it's always Some"); + lc.inc(); + self.lc.set(Some(lc)); + lc + } } #[derive(Debug)] @@ -441,9 +533,9 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { } fn raft_main_loop( - main_inbox: Mailbox<NormalRequest>, status: Rc<RefCell<Status>>, - mut raw_node: RawNode, + raw_node: Rc<Mutex<RawNode>>, + notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, ) { let mut next_tick = Instant::now() + Node::TICK; let mut pool = ConnectionPool::builder() @@ -457,192 +549,15 @@ fn raft_main_loop( pool.connect(peer.raft_id, peer.peer_address); } - let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new(); - let mut lc = { - let id = Storage::raft_id().unwrap().unwrap(); - let gen = Storage::gen().unwrap().unwrap_or(0) + 1; - Storage::persist_gen(gen).unwrap(); - LogicalClock::new(id, gen) - }; - - let topology_cache = crate::cache::CachedCell::<u64, Topology>::new(); - // let mut topology: Option<(u64, Topology)> = None; - loop { // Clean up obsolete notifications - notifications.retain(|_, notify: &mut Notify| !notify.is_closed()); - - for req in main_inbox.receive_all(Node::TICK) { - match req { - NormalRequest::ProposeNormal { op, notify } => { - lc.inc(); - - let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes(); - if let Err(e) = raw_node.propose(ctx, vec![]) { - notify.notify_err(e); - } else { - notifications.insert(lc.clone(), notify); - } - } - NormalRequest::HandleTopologyRequest { req, notify } => { - lc.inc(); - - let status = raw_node.status(); - if status.ss.raft_state != RaftStateRole::Leader { - let e = RaftError::ConfChangeError("not a leader".into()); - notify.notify_err(e); - continue; - } - - let mut topology = - topology_cache.pop(&raw_node.raft.term).unwrap_or_else(|| { - let peers = Storage::peers().unwrap(); - let replication_factor = - Storage::replication_factor().unwrap().unwrap(); - Topology::from_peers(peers).with_replication_factor(replication_factor) - }); - - let peer_result = match req { - TopologyRequest::Join(JoinRequest { - instance_id, - replicaset_id, - advertise_address, - failure_domains, - .. - }) => topology.join( - instance_id, - replicaset_id, - advertise_address, - failure_domains, - ), - - TopologyRequest::UpdatePeer(UpdatePeerRequest { - instance_id, - health, - failure_domains, - .. 
- }) => topology.update_peer(&instance_id, health, failure_domains), - }; - - let mut peer = match peer_result { - Ok(peer) => peer, - Err(e) => { - notify.notify_err(RaftError::ConfChangeError(e)); - topology_cache.put(raw_node.raft.term, topology); - continue; - } - }; - - peer.commit_index = raw_node.raft.raft_log.last_index() + 1; - - let ctx = traft::EntryContextNormal { - op: traft::Op::PersistPeer { peer }, - lc: lc.clone(), - }; - - if let Err(e) = raw_node.propose(ctx.to_bytes(), vec![]) { - notify.notify_err(e); - } else { - notifications.insert(lc.clone(), notify); - topology_cache.put(raw_node.raft.term, topology); - } - } - NormalRequest::ProposeConfChange { - conf_change, - term, - notify, - } => { - // In some states proposing a ConfChange is impossible. - // Check if there's a reason to reject it. - - #[allow(clippy::never_loop)] - let reason: Option<&str> = loop { - // Checking leadership is only needed for the - // correct latch management. It doesn't affect - // raft correctness. Checking the instance is a - // leader makes sure the proposed `ConfChange` - // is appended to the raft log immediately - // instead of sending `MsgPropose` over the - // network. - if raw_node.raft.state != RaftStateRole::Leader { - break Some("not a leader"); - } - - if term != raw_node.raft.term { - break Some("raft term mismatch"); - } - - // Without this check the node would silently ignore the conf change. - // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026 - if raw_node.raft.has_pending_conf() { - break Some("already has pending confchange"); - } - - break None; - }; - - if let Some(e) = reason { - let e = RaftError::ConfChangeError(e.into()); - notify.notify_err(e); - continue; - } + notifications + .borrow_mut() + .retain(|_, notify: &mut Notify| !notify.is_closed()); - let prev_index = raw_node.raft.raft_log.last_index(); - if let Err(e) = raw_node.propose_conf_change(vec![], conf_change) { - notify.notify_err(e); - } else { - // oops, current instance isn't actually a leader - // (which is impossible in theory, but we're not - // sure in practice) and sent the ConfChange message - // to the raft network instead of appending it to the - // raft log. 
- let last_index = raw_node.raft.raft_log.last_index(); - assert_eq!(last_index, prev_index + 1); - - with_joint_state_latch(|joint_state_latch| { - assert!(joint_state_latch.take().is_none()); - joint_state_latch.set(Some(JointStateLatch { - index: last_index, - notify, - })); - }); - } - } - NormalRequest::ReadIndex { notify } => { - lc.inc(); - - // read_index puts this context into an Entry, - // so we've got to compose full EntryContext, - // despite single LogicalClock would be enough - let ctx = traft::EntryContextNormal { - lc: lc.clone(), - op: traft::Op::Nop, - } - .to_bytes(); - raw_node.read_index(ctx); - notifications.insert(lc.clone(), notify); - } - NormalRequest::Campaign { notify } => match raw_node.campaign() { - Ok(()) => notify.notify_ok(()), - Err(e) => notify.notify_err(e), - }, - NormalRequest::Step(msg) => { - if msg.to != raw_node.raft.id { - continue; - } - if let Err(e) = raw_node.step(msg) { - tlog!(Error, "{e}"); - } - } - NormalRequest::Tick { n_times, notify } => { - for _ in 0..n_times { - raw_node.tick(); - } - notify.notify_ok(()); - } - } - } + event::wait_timeout(Event::RaftLoopNeeded, Node::TICK).expect("Events must be initialized"); + let mut raw_node = raw_node.lock(); let now = Instant::now(); if now > next_tick { next_tick = now + Node::TICK; @@ -673,7 +588,7 @@ fn raft_main_loop( let committed_entries = ready.take_committed_entries(); handle_committed_entries( committed_entries, - &mut notifications, + &mut *notifications.borrow_mut(), &mut raw_node, &mut pool, &mut topology_changed, @@ -708,7 +623,7 @@ fn raft_main_loop( } let read_states = ready.take_read_states(); - handle_read_states(read_states, &mut notifications); + handle_read_states(read_states, &mut *notifications.borrow_mut()); Ok(()) }) @@ -731,7 +646,7 @@ fn raft_main_loop( let committed_entries = light_rd.take_committed_entries(); handle_committed_entries( committed_entries, - &mut notifications, + &mut *notifications.borrow_mut(), &mut raw_node, &mut pool, &mut topology_changed, @@ -756,7 +671,7 @@ fn raft_main_loop( } } -fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) { +fn raft_conf_change_loop(status: Rc<RefCell<Status>>) { loop { if status.borrow().raft_state != "Leader" { event::wait(Event::StatusChanged).expect("Events system must be initialized"); @@ -849,18 +764,12 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<Normal ..Default::default() }; - let (rx, tx) = Notify::new().into_clones(); - main_inbox.send(NormalRequest::ProposeConfChange { - term, - conf_change, - notify: tx, - }); - + let node = global().expect("must be initialized"); // main_loop gives the warranty that every ProposeConfChange // will sometimes be handled and there's no need in timeout. // It also guarantees that the notification will arrive only // after the node leaves the joint state. - match rx.recv() { + match node.propose_conf_change(term, conf_change) { Ok(()) => tlog!(Info, "conf_change processed"), Err(e) => tlog!(Warning, "conf_change failed: {e}"), } -- GitLab
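
As an illustration only (not part of the patch): the call pattern that results from this refactor, condensed from the postjoin() and Node hunks above. The `should_bootstrap` flag stands in for the `cs.voters == [raft_cfg.id]` check, and error handling is elided.

    // The node now lives behind a global slot; every later caller shares
    // the same Rc<Mutex<RawNode>> through traft::node::global().
    let node = traft::node::Node::new(&raft_cfg).expect("failed initializing raft node");
    traft::node::set_global(node);
    let node = traft::node::global().unwrap();

    if should_bootstrap {
        node.tick(1);          // apply configuration, if any
        node.campaign().ok();  // trigger election immediately
    }

    // read_index() locks the RawNode directly, registers a Notify keyed by a
    // LogicalClock, wakes the main loop via Event::RaftLoopNeeded, and only
    // then blocks on the notification with the lock released.
    let timeout = std::time::Duration::from_secs(1);
    match node.read_index(timeout) {
        Ok(index) => tlog!(Info, "read barrier at raft index {index}"),
        Err(e) => tlog!(Warning, "unable to get a read barrier: {e}"),
    }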