From ada4d6bb813c006cd706abe4600073d439e12848 Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com> Date: Fri, 16 Sep 2022 18:26:20 +0300 Subject: [PATCH] refactor: encapsulate InnerNode initialization --- src/main.rs | 14 ++-------- src/traft/node.rs | 70 ++++++++++++++++++++++++++++++++--------------- 2 files changed, 51 insertions(+), 33 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6d8a8a1ac4..2280f43c23 100644 --- a/src/main.rs +++ b/src/main.rs @@ -755,22 +755,14 @@ fn postjoin(args: &args::Run, storage: RaftSpaceAccess) { box_cfg.replication_connect_quorum = 0; tarantool::set_cfg(&box_cfg); - let raft_id = storage.raft_id().unwrap().unwrap(); - let applied = storage.applied().unwrap().unwrap_or(0); - let raft_cfg = raft::Config { - id: raft_id, - applied, - pre_vote: true, - ..Default::default() - }; - - let node = traft::node::Node::new(&raft_cfg, storage.clone()); + let node = traft::node::Node::new(storage.clone()); let node = node.expect("failed initializing raft node"); traft::node::set_global(node); let node = traft::node::global().unwrap(); + let raft_id = node.raft_id(); let cs = storage.conf_state().unwrap(); - if cs.voters == [raft_cfg.id] { + if cs.voters == [raft_id] { tlog!( Info, concat!( diff --git a/src/traft/node.rs b/src/traft/node.rs index e3305e8609..80cbc7f52c 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -8,6 +8,7 @@ use ::raft::prelude as raft; use ::raft::Error as RaftError; use ::raft::StateRole as RaftStateRole; +use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::error::TransactionError; use ::tarantool::fiber; @@ -71,7 +72,6 @@ pub struct Status { /// The heart of `traft` module - the Node. pub struct Node { - raft_id: RaftId, inner_node: Rc<Mutex<InnerNode>>, pub(super) storage: RaftSpaceAccess, _main_loop: fiber::UnitJoinHandle<'static>, @@ -84,7 +84,7 @@ pub struct Node { impl std::fmt::Debug for Node { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("Node") - .field("raft_id", &self.raft_id) + .field("raft_id", &self.raft_id()) .finish_non_exhaustive() } } @@ -94,27 +94,17 @@ impl Node { /// Initialize the raft node. /// **This function yields** - pub fn new(cfg: &raft::Config, mut storage: RaftSpaceAccess) -> Result<Self, RaftError> { - let raw_node = RawNode::new(cfg, storage.clone(), &tlog::root())?; - - let inner_node = InnerNode { - raw_node, - notifications: Default::default(), - lc: { - let gen = storage.gen().unwrap().unwrap_or(0) + 1; - storage.persist_gen(gen).unwrap(); - LogicalClock::new(cfg.id, gen) - }, - }; - - let inner_node = Rc::new(Mutex::new(inner_node)); + pub fn new(storage: RaftSpaceAccess) -> Result<Self, RaftError> { + let inner_node = InnerNode::new(storage.clone())?; let status = Rc::new(RefCell::new(Status { - id: cfg.id, + id: inner_node.raft_id(), leader_id: None, raft_state: "Follower".into(), is_ready: false, })); + + let inner_node = Rc::new(Mutex::new(inner_node)); let raft_loop_cond = Rc::new(Cond::new()); let main_loop_fn = { @@ -132,7 +122,6 @@ impl Node { }; let node = Node { - raft_id: cfg.id, inner_node, status, raft_loop_cond, @@ -155,6 +144,10 @@ impl Node { Ok(node) } + pub fn raft_id(&self) -> RaftId { + self.status.borrow().id + } + pub fn status(&self) -> Status { self.status.borrow().clone() } @@ -261,9 +254,10 @@ impl Node { /// **This function yields** pub fn timeout_now(&self) { + let raft_id = self.raft_id(); self.step_and_yield(raft::Message { - to: self.raft_id, - from: self.raft_id, + to: raft_id, + from: raft_id, msg_type: raft::MessageType::MsgTimeoutNow, ..Default::default() }) @@ -421,6 +415,38 @@ struct InnerNode { } impl InnerNode { + fn new( + mut storage: RaftSpaceAccess, + // TODO: provide clusterwide space access + ) -> Result<Self, RaftError> { + let box_err = |e| StorageError::Other(Box::new(e)); + + let raft_id: RaftId = storage.raft_id().map_err(box_err)?.unwrap(); + let applied: RaftIndex = storage.applied().map_err(box_err)?.unwrap_or(0); + let cfg = raft::Config { + id: raft_id, + applied, + pre_vote: true, + ..Default::default() + }; + + let raw_node = RawNode::new(&cfg, storage.clone(), &tlog::root())?; + + Ok(Self { + raw_node, + notifications: Default::default(), + lc: { + let gen = storage.gen().unwrap().unwrap_or(0) + 1; + storage.persist_gen(gen).unwrap(); + LogicalClock::new(cfg.id, gen) + }, + }) + } + + fn raft_id(&self) -> RaftId { + self.raw_node.raft.id + } + #[inline] fn cleanup_notifications(&mut self) { self.notifications @@ -1062,7 +1088,7 @@ fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::erro let leader_id = node.status().leader_id.ok_or("leader_id not found")?; - if node.raft_id != leader_id { + if node.raft_id() != leader_id { return Err(Box::from("not a leader")); } @@ -1114,7 +1140,7 @@ fn sync_raft(promotee: &Peer) -> Result<(), Box<dyn std::error::Error>> { let leader_id = node.status().leader_id.ok_or("leader_id not found")?; - if node.raft_id != leader_id { + if node.raft_id() != leader_id { return Err(Box::from("not a leader")); } -- GitLab