diff --git a/src/traft/node.rs b/src/traft/node.rs index 8d3d66e7b21571b7b8d94df5deedd606a95e9c22..e12ce518869c43bf5ca6828f8f803aebd3adf3c3 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -71,8 +71,8 @@ pub struct Status { /// The heart of `traft` module - the Node. pub struct Node { - raw_node: Rc<Mutex<RawNode>>, raft_id: RaftId, + inner_node: Rc<Mutex<InnerNode>>, pub(super) storage: RaftSpaceAccess, _main_loop: fiber::UnitJoinHandle<'static>, _conf_change_loop: fiber::UnitJoinHandle<'static>, @@ -98,7 +98,10 @@ impl Node { /// **This function yields** pub fn new(cfg: &raft::Config, mut storage: RaftSpaceAccess) -> Result<Self, RaftError> { let raw_node = RawNode::new(cfg, storage.clone(), &tlog::root())?; - let raw_node = Rc::new(Mutex::new(raw_node)); + + let inner_node = InnerNode { raw_node }; + let inner_node = Rc::new(Mutex::new(inner_node)); + let status = Rc::new(RefCell::new(Status { id: cfg.id, leader_id: None, @@ -110,11 +113,11 @@ impl Node { let main_loop_fn = { let status = status.clone(); - let raw_node = raw_node.clone(); + let inner_node = inner_node.clone(); let storage = storage.clone(); let raft_loop_cond = raft_loop_cond.clone(); let notifications = notifications.clone(); - move || raft_main_loop(status, raw_node, storage, raft_loop_cond, notifications) + move || raft_main_loop(status, inner_node, storage, raft_loop_cond, notifications) }; let conf_change_loop_fn = { @@ -124,8 +127,8 @@ impl Node { }; let node = Node { - raw_node, raft_id: cfg.id, + inner_node, notifications, status, raft_loop_cond, @@ -397,9 +400,9 @@ impl Node { &self, f: impl FnOnce(&mut RawNode) -> Result<R, Error>, ) -> Result<R, Error> { - let mut raw_node = self.raw_node.lock(); - let res = f(&mut *raw_node); - drop(raw_node); + let mut inner_node = self.inner_node.lock(); + let res = f(&mut inner_node.raw_node); + drop(inner_node); self.raft_loop_cond.broadcast(); res } @@ -426,6 +429,10 @@ impl Node { } } +struct InnerNode { + pub raw_node: RawNode, +} + #[derive(Debug)] struct JointStateLatch { /// Index of the latest ConfChange entry proposed. @@ -639,7 +646,7 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { fn raft_main_loop( status: Rc<RefCell<Status>>, - raw_node: Rc<Mutex<RawNode>>, + inner_node: Rc<Mutex<InnerNode>>, mut storage: RaftSpaceAccess, raft_loop_cond: Rc<Cond>, notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, @@ -664,7 +671,7 @@ fn raft_main_loop( raft_loop_cond.wait_timeout(Node::TICK); - let mut raw_node = raw_node.lock(); + let raw_node: &mut RawNode = &mut inner_node.lock().raw_node; let now = Instant::now(); if now > next_tick { next_tick = now + Node::TICK; @@ -697,7 +704,7 @@ fn raft_main_loop( handle_committed_entries( committed_entries, &mut *notifications.borrow_mut(), - &mut raw_node, + raw_node, &mut storage, &mut pool, &mut topology_changed, @@ -758,7 +765,7 @@ fn raft_main_loop( handle_committed_entries( committed_entries, &mut *notifications.borrow_mut(), - &mut raw_node, + raw_node, &mut storage, &mut pool, &mut topology_changed,