From 3f322af6817deeaa424d88ba63e9c9c78c3770a2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 1 Dec 2023 12:47:15 +0300
Subject: [PATCH] refactor: cleanup up waking up governor & getting expelled

---
 src/traft/error.rs |   2 +
 src/traft/node.rs  | 100 +++++++++++++++++++++++++++++----------------
 2 files changed, 66 insertions(+), 36 deletions(-)

diff --git a/src/traft/error.rs b/src/traft/error.rs
index dbad2b1ffa..2bf3ce04b5 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -14,6 +14,8 @@ pub enum Error {
     Uninitialized,
     #[error("timeout")]
     Timeout,
+    #[error("current instance is expelled from the cluster")]
+    Expelled,
     #[error("{0}")]
     Raft(#[from] raft::Error),
     #[error("downcast error: expected {expected:?}, actual: {actual:?}")]
diff --git a/src/traft/node.rs b/src/traft/node.rs
index ce51519e93..4f59fda226 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -611,7 +611,6 @@ impl NodeImpl {
     fn handle_committed_entries(
         &mut self,
         entries: &[raft::Entry],
-        wake_governor: &mut bool,
         expelled: &mut bool,
     ) -> traft::Result<()> {
         let mut entries = entries.iter().peekable();
@@ -633,8 +632,7 @@ impl NodeImpl {
                 let entry_index = entry.index;
                 match entry.entry_type {
                     raft::EntryType::EntryNormal => {
-                        apply_entry_result =
-                            self.handle_committed_normal_entry(entry, wake_governor, expelled);
+                        apply_entry_result = self.handle_committed_normal_entry(entry, expelled);
                         if apply_entry_result != EntryApplied {
                             return Ok(());
                         }
@@ -680,11 +678,39 @@ impl NodeImpl {
         Ok(())
     }
 
+    fn wake_governor_if_needed(&self, op: &Op) {
+        let wake_governor = match &op {
+            Op::Dml(op) => {
+                matches!(
+                    op.space().try_into(),
+                    Ok(ClusterwideTable::Property
+                        | ClusterwideTable::Replicaset
+                        | ClusterwideTable::Instance)
+                )
+            }
+            Op::DdlPrepare { .. } => true,
+            _ => false,
+        };
+
+        // NOTE: this may be premature, because the dml may fail to apply and/or
+        // the transaction may be rolled back, but we ignore this for the sake
+        // of simplicity, as nothing bad can happen if governor makes another
+        // idle iteration.
+        if wake_governor {
+            let res = global()
+                .expect("node must be initialized by this point")
+                .governor_loop
+                .wakeup();
+            if let Err(e) = res {
+                tlog!(Warning, "failed waking up governor: {e}");
+            }
+        }
+    }
+
     /// Is called during a transaction
     fn handle_committed_normal_entry(
         &mut self,
         entry: traft::Entry,
-        wake_governor: &mut bool,
         expelled: &mut bool,
     ) -> ApplyEntryResult {
         assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
@@ -693,21 +719,7 @@ impl NodeImpl {
         let op = entry.into_op().unwrap_or(Op::Nop);
         tlog!(Debug, "applying entry: {op}"; "index" => index);
 
-        match &op {
-            Op::Dml(op) => match op.space().try_into() {
-                Ok(ClusterwideTable::Property | ClusterwideTable::Replicaset) => {
-                    *wake_governor = true;
-                }
-                Ok(ClusterwideTable::Instance) => {
-                    *wake_governor = true;
-                }
-                _ => {}
-            },
-            Op::DdlPrepare { .. } => {
-                *wake_governor = true;
-            }
-            _ => {}
-        }
+        self.wake_governor_if_needed(&op);
 
         let storage_properties = &self.storage.properties;
 
@@ -1478,10 +1490,20 @@ impl NodeImpl {
     ///
     /// This includes:
     /// - Sending messages to other instances (raft nodes);
-    /// - Applying committed entries;
+    /// - Handling raft snapshot:
+    ///   - Verifying & waiting until snapshot data can be applied
+    ///     (see [`Self::prepare_for_snapshot`] for more details);
+    ///   - Persisting snapshot metadata;
+    ///   - Compacting the raft log;
+    ///   - Restoring local storage contents from the snapshot;
     /// - Persisting uncommitted entries;
     /// - Persisting hard state (term, vote, commit);
+    /// - Applying committed entries;
     /// - Notifying pending fibers;
+    /// - Waking up the governor loop, so that it can handle any global state
+    ///   changes;
+    ///
+    /// Returns an error if the instance was expelled from the cluster.
     ///
     /// See also:
     ///
@@ -1489,7 +1511,7 @@ impl NodeImpl {
     /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
     ///
     /// This function yields.
-    fn advance(&mut self, wake_governor: &mut bool, expelled: &mut bool) {
+    fn advance(&mut self) -> traft::Result<()> {
         // Handle any unreachable nodes from previous iteration.
         let unreachables = self
             .instance_reachability
@@ -1516,9 +1538,11 @@ impl NodeImpl {
 
         // Get the `Ready` with `RawNode::ready` interface.
         if !self.raw_node.has_ready() {
-            return;
+            return Ok(());
         }
 
+        let mut expelled = false;
+
         let mut ready: raft::Ready = self.raw_node.ready();
 
         // Apply soft state changes before anything else, so that this info is
@@ -1542,7 +1566,7 @@ impl NodeImpl {
         let snapshot = ready.snapshot();
         let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
             // Error was already logged
-            return;
+            return Ok(());
         };
 
         // Persist stuff raft wants us to persist.
@@ -1624,7 +1648,7 @@ impl NodeImpl {
         // Apply committed entries.
         let committed_entries = ready.committed_entries();
         if !committed_entries.is_empty() {
-            let res = self.handle_committed_entries(committed_entries, wake_governor, expelled);
+            let res = self.handle_committed_entries(committed_entries, &mut expelled);
             if let Err(e) = res {
                 tlog!(Warning, "dropping raft ready: {ready:#?}");
                 panic!("transaction failed: {e}");
@@ -1673,7 +1697,7 @@ impl NodeImpl {
         // These are probably entries which we've just persisted.
         let committed_entries = light_rd.committed_entries();
         if !committed_entries.is_empty() {
-            let res = self.handle_committed_entries(committed_entries, wake_governor, expelled);
+            let res = self.handle_committed_entries(committed_entries, &mut expelled);
             if let Err(e) = res {
                 panic!("transaction failed: {e}");
             }
@@ -1685,6 +1709,12 @@ impl NodeImpl {
         self.raw_node.advance_apply();
 
         self.main_loop_status("idle");
+
+        if expelled {
+            return Err(Error::Expelled);
+        }
+
+        Ok(())
     }
 
     #[allow(dead_code)]
@@ -1796,7 +1826,6 @@ impl MainLoop {
         };
 
         Self {
-            // implicit yield
             _loop: loop_start!("raft_main_loop", Self::iter_fn, state),
             loop_waker: loop_waker_tx,
             stop_flag,
@@ -1827,22 +1856,21 @@ impl MainLoop {
             node_impl.raw_node.tick();
         }
 
-        let mut wake_governor = false;
-        let mut expelled = false;
-        node_impl.advance(&mut wake_governor, &mut expelled); // yields
+        let res = node_impl.advance(); // yields
         drop(node_impl);
         if state.stop_flag.take() {
             return FlowControl::Break;
         }
 
-        if expelled {
-            crate::tarantool::exit(0);
-        }
-
-        if wake_governor {
-            if let Err(e) = async { global()?.governor_loop.wakeup() }.await {
-                tlog!(Warning, "failed waking up governor: {e}");
+        match res {
+            Err(e @ Error::Expelled) => {
+                tlog!(Info, "{e}, shutting down");
+                crate::tarantool::exit(0);
+            }
+            Err(e) => {
+                tlog!(Error, "error during raft main loop iteration: {e}");
             }
+            Ok(()) => {}
         }
 
         FlowControl::Continue
-- 
GitLab