From 6eabde5a9cff058c7693f5718aaf2311aef6c270 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 8 Jul 2022 16:46:04 +0300
Subject: [PATCH] refactor: minimize the amount of places raw_node.lock() is
 called in

---
 src/traft/node.rs | 298 +++++++++++++++++++++++-----------------------
 1 file changed, 152 insertions(+), 146 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index 5613dea786..89480211f1 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -154,31 +154,29 @@ impl Node {
     }
 
     pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
-        let mut raw_node = self.raw_node.lock();
-
-        // In some states `raft-rs` ignores the ReadIndex request.
-        // Check it preliminary, don't wait for the timeout.
-        //
-        // See for details:
-        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
-        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
-        let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
-        let term_just_started =
-            raw_node.raft.state == RaftStateRole::Leader && !raw_node.raft.commit_to_current_term();
-        if leader_doesnt_exist || term_just_started {
-            return Err(RaftError::ProposalDropped.into());
-        }
-
-        let (lc, notify) = self.add_notify();
+        self.raw_operation(|raw_node| {
+            // In some states `raft-rs` ignores the ReadIndex request.
+            // Check it preliminary, don't wait for the timeout.
+            //
+            // See for details:
+            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
+            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
+            let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
+            let term_just_started = raw_node.raft.state == RaftStateRole::Leader
+                && !raw_node.raft.commit_to_current_term();
+            if leader_doesnt_exist || term_just_started {
+                return Err(RaftError::ProposalDropped.into());
+            }
 
-        // read_index puts this context into an Entry,
-        // so we've got to compose full EntryContext,
-        // despite single LogicalClock would be enough
-        let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
-        raw_node.read_index(ctx.to_bytes());
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv_timeout::<u64>(timeout)
+            let (lc, notify) = self.add_notify();
+            // read_index puts this context into an Entry,
+            // so we've got to compose full EntryContext,
+            // despite single LogicalClock would be enough
+            let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
+            raw_node.read_index(ctx.to_bytes());
+            Ok(notify)
+        })?
+        .recv_timeout::<u64>(timeout)
     }
 
     pub fn propose<T: OpResult + Into<traft::Op>>(
@@ -186,20 +184,17 @@ impl Node {
         op: T,
         timeout: Duration,
     ) -> Result<T::Result, Error> {
-        let mut raw_node = self.raw_node.lock();
-        let (lc, notify) = self.add_notify();
-        let ctx = traft::EntryContextNormal::new(lc, op);
-        raw_node.propose(ctx.to_bytes(), vec![])?;
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv_timeout::<T::Result>(timeout)
+        self.raw_operation(|raw_node| {
+            let (lc, notify) = self.add_notify();
+            let ctx = traft::EntryContextNormal::new(lc, op);
+            raw_node.propose(ctx.to_bytes(), vec![])?;
+            Ok(notify)
+        })?
+        .recv_timeout::<T::Result>(timeout)
     }
 
     pub fn campaign(&self) -> Result<(), Error> {
-        let mut raw_node = self.raw_node.lock();
-        raw_node.campaign()?;
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| raw_node.campaign().map_err(Into::into))?;
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
         fiber::reschedule();
@@ -207,27 +202,29 @@ impl Node {
     }
 
     pub fn step(&self, msg: raft::Message) {
-        let mut raw_node = self.raw_node.lock();
-        if msg.to != raw_node.raft.id {
-            return;
-        }
-        if let Err(e) = raw_node.step(msg) {
-            tlog!(Error, "{e}");
-        }
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| {
+            if msg.to != raw_node.raft.id {
+                return Ok(());
+            }
+            if let Err(e) = raw_node.step(msg) {
+                tlog!(Error, "{e}");
+            }
+            Ok(())
+        })
+        .ok();
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
        fiber::reschedule();
     }
 
     pub fn tick(&self, n_times: u32) {
-        let mut raw_node = self.raw_node.lock();
-        for _ in 0..n_times {
-            raw_node.tick();
-        }
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| {
+            for _ in 0..n_times {
+                raw_node.tick();
+            }
+            Ok(())
+        })
+        .ok();
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
         fiber::reschedule();
@@ -243,118 +240,127 @@ impl Node {
     }
 
     pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> {
-        let mut raw_node = self.raw_node.lock();
-
-        let status = raw_node.status();
-        if status.ss.raft_state != RaftStateRole::Leader {
-            return Err(RaftError::ConfChangeError("not a leader".into()).into());
-        }
+        self.raw_operation(|raw_node| {
+            if raw_node.raft.state != RaftStateRole::Leader {
+                return Err(RaftError::ConfChangeError("not a leader".into()).into());
+            }
 
-        let mut topology = self
-            .topology_cache
-            .pop(&raw_node.raft.term)
-            .unwrap_or_else(|| {
-                let peers = Storage::peers().unwrap();
-                let replication_factor = Storage::replication_factor().unwrap().unwrap();
-                Topology::from_peers(peers).with_replication_factor(replication_factor)
+            let mut topology = self
+                .topology_cache
+                .pop(&raw_node.raft.term)
+                .unwrap_or_else(|| {
+                    let peers = Storage::peers().unwrap();
+                    let replication_factor = Storage::replication_factor().unwrap().unwrap();
+                    Topology::from_peers(peers).with_replication_factor(replication_factor)
+                });
+
+            let peer_result = match req {
+                TopologyRequest::Join(JoinRequest {
+                    instance_id,
+                    replicaset_id,
+                    advertise_address,
+                    failure_domain,
+                    ..
+                }) => topology.join(
+                    instance_id,
+                    replicaset_id,
+                    advertise_address,
+                    failure_domain,
+                ),
+
+                TopologyRequest::UpdatePeer(UpdatePeerRequest {
+                    instance_id,
+                    health,
+                    failure_domain,
+                    ..
+                }) => topology.update_peer(&instance_id, health, failure_domain),
+            };
+
+            let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
+                self.topology_cache.put(raw_node.raft.term, topology);
+                return Err(RaftError::ConfChangeError(e).into());
            });
 
-        let peer_result = match req {
-            TopologyRequest::Join(JoinRequest {
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domain,
-                ..
-            }) => topology.join(
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domain,
-            ),
-
-            TopologyRequest::UpdatePeer(UpdatePeerRequest {
-                instance_id,
-                health,
-                failure_domain,
-                ..
-            }) => topology.update_peer(&instance_id, health, failure_domain),
-        };
+            peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
 
-        let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
+            let (lc, notify) = self.add_notify();
+            let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
+            raw_node.propose(ctx.to_bytes(), vec![])?;
             self.topology_cache.put(raw_node.raft.term, topology);
-            return Err(RaftError::ConfChangeError(e).into());
-        });
-
-        peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
-
-        let (lc, notify) = self.add_notify();
-        let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
-        raw_node.propose(ctx.to_bytes(), vec![])?;
-        self.topology_cache.put(raw_node.raft.term, topology);
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv::<Peer>()
+            Ok(notify)
+        })?
+        .recv::<Peer>()
     }
 
     fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> {
-        let mut raw_node = self.raw_node.lock();
-        // In some states proposing a ConfChange is impossible.
-        // Check if there's a reason to reject it.
-
-        #[allow(clippy::never_loop)]
-        let reason: Option<&str> = loop {
-            // Checking leadership is only needed for the
-            // correct latch management. It doesn't affect
-            // raft correctness. Checking the instance is a
-            // leader makes sure the proposed `ConfChange`
-            // is appended to the raft log immediately
-            // instead of sending `MsgPropose` over the
-            // network.
-            if raw_node.raft.state != RaftStateRole::Leader {
-                break Some("not a leader");
-            }
-
-            if term != raw_node.raft.term {
-                break Some("raft term mismatch");
-            }
-
-            // Without this check the node would silently ignore the conf change.
-            // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
-            if raw_node.raft.has_pending_conf() {
-                break Some("already has pending confchange");
-            }
+        self.raw_operation(|raw_node| {
+            // In some states proposing a ConfChange is impossible.
+            // Check if there's a reason to reject it.
+
+            #[allow(clippy::never_loop)]
+            let reason: Option<&str> = loop {
+                // Checking leadership is only needed for the
+                // correct latch management. It doesn't affect
+                // raft correctness. Checking the instance is a
+                // leader makes sure the proposed `ConfChange`
+                // is appended to the raft log immediately
+                // instead of sending `MsgPropose` over the
+                // network.
+                if raw_node.raft.state != RaftStateRole::Leader {
+                    break Some("not a leader");
+                }
 
-            break None;
-        };
+                if term != raw_node.raft.term {
+                    break Some("raft term mismatch");
+                }
 
-        if let Some(e) = reason {
-            return Err(RaftError::ConfChangeError(e.into()).into());
-        }
+                // Without this check the node would silently ignore the conf change.
+                // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
+                if raw_node.raft.has_pending_conf() {
+                    break Some("already has pending confchange");
+                }
 
-        let prev_index = raw_node.raft.raft_log.last_index();
-        raw_node.propose_conf_change(vec![], conf_change)?;
+                break None;
+            };
 
-        // oops, current instance isn't actually a leader
-        // (which is impossible in theory, but we're not
-        // sure in practice) and sent the ConfChange message
-        // to the raft network instead of appending it to the
-        // raft log.
-        let last_index = raw_node.raft.raft_log.last_index();
-        assert_eq!(last_index, prev_index + 1);
+            if let Some(e) = reason {
+                return Err(RaftError::ConfChangeError(e.into()).into());
+            }
 
-        let (rx, tx) = Notify::new().into_clones();
-        with_joint_state_latch(|joint_state_latch| {
-            assert!(joint_state_latch.take().is_none());
-            joint_state_latch.set(Some(JointStateLatch {
-                index: last_index,
-                notify: tx,
-            }));
-        });
-
+            let prev_index = raw_node.raft.raft_log.last_index();
+            raw_node.propose_conf_change(vec![], conf_change)?;
+
+            // oops, current instance isn't actually a leader
+            // (which is impossible in theory, but we're not
+            // sure in practice) and sent the ConfChange message
+            // to the raft network instead of appending it to the
+            // raft log.
+            let last_index = raw_node.raft.raft_log.last_index();
+            assert_eq!(last_index, prev_index + 1);
+
+            let (rx, tx) = Notify::new().into_clones();
+            with_joint_state_latch(|joint_state_latch| {
+                assert!(joint_state_latch.take().is_none());
+                joint_state_latch.set(Some(JointStateLatch {
+                    index: last_index,
+                    notify: tx,
+                }));
+            });
+            Ok(rx)
+        })?
+        .recv()
+    }
+
+    #[inline]
+    fn raw_operation<R>(
+        &self,
+        f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
+    ) -> Result<R, Error> {
+        let mut raw_node = self.raw_node.lock();
+        let res = f(&mut *raw_node);
         drop(raw_node);
         event::broadcast(Event::RaftLoopNeeded);
-        rx.recv()
+        res
     }
 
     #[inline]
-- 
GitLab
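The shape this refactor converges on (take the lock, run a closure against the RawNode, drop the guard, then wake the raft main loop) can be sketched in isolation. What follows is a minimal approximation, not picodata's actual code: std::sync::Mutex stands in for the crate's fiber-aware mutex, a stub RawNode stands in for raft-rs's, and broadcast_raft_loop_needed() is a hypothetical placeholder for event::broadcast(Event::RaftLoopNeeded).

use std::sync::Mutex;

// Stand-ins for raft::RawNode and the crate's Error type; the real
// ones come from raft-rs and picodata.
struct RawNode {
    ticks: u64,
}

#[derive(Debug)]
struct Error;

// Hypothetical stand-in for event::broadcast(Event::RaftLoopNeeded).
fn broadcast_raft_loop_needed() {}

struct Node {
    raw_node: Mutex<RawNode>,
}

impl Node {
    // The single choke point for the lock: acquire, run `f`, release,
    // then signal the main loop. Callers can no longer forget the
    // drop + broadcast pair, and the guard is always released before
    // this function returns, so nobody blocks while holding it.
    fn raw_operation<R>(
        &self,
        f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
    ) -> Result<R, Error> {
        let mut raw_node = self.raw_node.lock().expect("mutex poisoned");
        let res = f(&mut *raw_node);
        drop(raw_node);
        broadcast_raft_loop_needed();
        res
    }

    // Mirrors the patch's tick(): all work on the RawNode happens
    // inside the closure, so the lock scope is exactly the closure.
    fn tick(&self, n_times: u32) {
        self.raw_operation(|raw_node| {
            for _ in 0..n_times {
                raw_node.ticks += 1;
            }
            Ok(())
        })
        .ok();
    }
}

fn main() {
    let node = Node {
        raw_node: Mutex::new(RawNode { ticks: 0 }),
    };
    node.tick(3);
    assert_eq!(node.raw_node.lock().unwrap().ticks, 3);
}

Note how the methods in the patch that do expect a response (read_index, propose, handle_topology_request, propose_conf_change) return their Notify out of the closure and only call recv or recv_timeout on it after raw_operation has dropped the lock and broadcast the event, so no caller ever sleeps on a notification channel while still holding raw_node.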