From 12d19dc5e1f4bbb51b918e22f542c051a4a87dad Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 6 Dec 2022 10:33:58 +0300
Subject: [PATCH] refactor(governor): governor loop is also a Loop struct
 similar to MainLoop

---
 src/governor/mod.rs | 130 ++++++++++++++++++++++++++++----------------
 src/traft/node.rs   |  21 +++----
 2 files changed, 90 insertions(+), 61 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 3d525509d3..cbca4c6ec9 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -10,6 +10,7 @@ use ::tarantool::space::UpdateOps;
 use ::tarantool::util::IntoClones as _;
 
 use crate::event::{self, Event};
+use crate::r#loop::FlowControl::{self, Continue};
 use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
 use crate::tlog;
 use crate::traft::error::Error;
@@ -31,24 +32,20 @@ pub(crate) mod migration;
 pub(crate) use cc::raft_conf_change;
 pub(crate) use migration::waiting_migrations;
 
-pub(crate) fn governor_loop(
-    status: Rc<Cell<Status>>,
-    storage: Clusterwide,
-    raft_storage: RaftSpaceAccess,
-) {
-    let mut pool = ConnectionPool::builder(storage.clone())
-        .call_timeout(Duration::from_secs(1))
-        .connect_timeout(Duration::from_millis(500))
-        .inactivity_timeout(Duration::from_secs(60))
-        .build();
-
-    // TODO: don't hardcode this
+impl Loop {
     const SYNC_TIMEOUT: Duration = Duration::from_secs(10);
 
-    'governor: loop {
+    async fn iter_fn(
+        Args {
+            status,
+            storage,
+            raft_storage,
+        }: &Args,
+        State { pool }: &mut State,
+    ) -> FlowControl {
         if !status.get().raft_state.is_leader() {
             event::wait(Event::StatusChanged).expect("Events system must be initialized");
-            continue 'governor;
+            return Continue;
         }
 
         let instances = storage.instances.all_instances().unwrap();
@@ -70,7 +67,7 @@ pub(crate) fn governor_loop(
                 tlog!(Warning, "failed proposing conf_change: {e}");
                 fiber::sleep(Duration::from_secs(1));
             }
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -110,7 +107,7 @@ pub(crate) fn governor_loop(
                     );
                     node.transfer_leadership_and_yield(new_leader.raft_id);
                     event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                    continue 'governor;
+                    return Continue;
                 }
             }
 
@@ -143,7 +140,7 @@ pub(crate) fn governor_loop(
                 tlog!(Warning, "failed proposing replicaset master change: {e}");
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             // reconfigure vshard storages and routers
@@ -164,13 +161,13 @@ pub(crate) fn governor_loop(
                             sharding::Request {
                                 term,
                                 commit,
-                                timeout: SYNC_TIMEOUT,
+                                timeout: Self::SYNC_TIMEOUT,
                                 bootstrap: false,
                             },
                         )
                     });
                 // TODO: don't hard code timeout
-                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3))?;
                 for (_, resp) in res {
                     let sharding::Response {} = resp?;
                 }
@@ -180,13 +177,12 @@ pub(crate) fn governor_loop(
                 tlog!(Warning, "failed calling rpc::sharding: {e}");
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             // update instance's CurrentGrade
-            let req =
-                update_instance::Request::new(instance.instance_id.clone(), cluster_id.clone())
-                    .with_current_grade(instance.target_grade.into());
+            let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
+                .with_current_grade(instance.target_grade.into());
             tlog!(Info,
                 "handling update_instance::Request";
                 "current_grade" => %req.current_grade.expect("just set"),
@@ -199,7 +195,7 @@ pub(crate) fn governor_loop(
                 );
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             let replicaset_instances = storage
@@ -218,7 +214,7 @@ pub(crate) fn governor_loop(
                     "replicaset lost quorum";
                     "replicaset_id" => %replicaset_id,
                 );
-                continue 'governor;
+                return Continue;
             }
 
             let res = (|| -> Result<_> {
@@ -235,7 +231,7 @@ pub(crate) fn governor_loop(
                         replication::promote::Request {
                             term,
                             commit,
-                            timeout: SYNC_TIMEOUT,
+                            timeout: Self::SYNC_TIMEOUT,
                         },
                         // TODO: don't hard code timeout
                         Duration::from_secs(3),
@@ -250,7 +246,7 @@ pub(crate) fn governor_loop(
                 );
             }
 
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -268,7 +264,7 @@ pub(crate) fn governor_loop(
                 &instance.raft_id,
                 sync::Request {
                     commit,
-                    timeout: SYNC_TIMEOUT,
+                    timeout: Self::SYNC_TIMEOUT,
                 },
                 move |res| tx.send(res).expect("mustn't fail"),
             )
@@ -304,7 +300,7 @@ pub(crate) fn governor_loop(
                 }
             }
 
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -331,12 +327,12 @@ pub(crate) fn governor_loop(
                     .zip(repeat(replication::Request {
                         term,
                         commit,
-                        timeout: SYNC_TIMEOUT,
+                        timeout: Self::SYNC_TIMEOUT,
                         replicaset_instances: replicaset_iids.clone(),
                         replicaset_id: replicaset_id.clone(),
                     }));
                 // TODO: don't hard code timeout
-                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3))?;
 
                 for (instance_id, resp) in res {
                     let replication::Response { lsn } = resp?;
@@ -359,7 +355,7 @@ pub(crate) fn governor_loop(
                 tlog!(Warning, "failed to configure replication: {e}");
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             let res = (|| -> Result<_> {
@@ -389,7 +385,7 @@ pub(crate) fn governor_loop(
                     replication::promote::Request {
                         term,
                         commit,
-                        timeout: SYNC_TIMEOUT,
+                        timeout: Self::SYNC_TIMEOUT,
                     },
                     // TODO: don't hard code timeout
                     Duration::from_secs(3),
@@ -408,7 +404,7 @@ pub(crate) fn governor_loop(
 
             tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id);
 
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -426,13 +422,13 @@ pub(crate) fn governor_loop(
                         sharding::Request {
                             term,
                             commit,
-                            timeout: SYNC_TIMEOUT,
+                            timeout: Self::SYNC_TIMEOUT,
                             bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id,
                         },
                     )
                 });
                 // TODO: don't hard code timeout
-                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3))?;
 
                 for (instance_id, resp) in res {
                     let sharding::Response {} = resp?;
@@ -468,7 +464,7 @@ pub(crate) fn governor_loop(
                 tlog!(Warning, "failed to initialize sharding: {e}");
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             let res = (|| -> Result<()> {
@@ -483,10 +479,10 @@ pub(crate) fn governor_loop(
                     .zip(repeat(replication::promote::Request {
                         term,
                         commit,
-                        timeout: SYNC_TIMEOUT,
+                        timeout: Self::SYNC_TIMEOUT,
                     }));
                 // TODO: don't hard code timeout
-                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3))?;
                 for (instance_id, resp) in res {
                     resp?;
                     tlog!(Debug, "promoted replicaset master"; "instance_id" => %instance_id);
@@ -499,7 +495,7 @@ pub(crate) fn governor_loop(
 
             tlog!(Info, "sharding is initialized");
 
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -512,7 +508,7 @@ pub(crate) fn governor_loop(
         });
         if let Some(instance) = to_update_weights {
             let res = if let Some(added_weights) =
-                get_weight_changes(maybe_responding(&instances), &storage)
+                get_weight_changes(maybe_responding(&instances), storage)
             {
                 (|| -> Result<()> {
                     for (replicaset_id, weight) in added_weights {
@@ -531,11 +527,11 @@ pub(crate) fn governor_loop(
                     let reqs = instance_ids.zip(repeat(sharding::Request {
                         term,
                         commit,
-                        timeout: SYNC_TIMEOUT,
+                        timeout: Self::SYNC_TIMEOUT,
                         bootstrap: false,
                     }));
                     // TODO: don't hard code timeout
-                    let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+                    let res = call_all(pool, reqs, Duration::from_secs(3))?;
 
                     for (instance_id, resp) in res {
                         resp?;
@@ -580,12 +576,12 @@ pub(crate) fn governor_loop(
 
                 // TODO: don't hard code timeout
                 event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
-                continue 'governor;
+                return Continue;
             }
 
             tlog!(Info, "sharding is configured");
 
-            continue 'governor;
+            return Continue;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -607,7 +603,7 @@ pub(crate) fn governor_loop(
                 let req = rpc::migration::apply::Request {
                     term,
                     commit,
-                    timeout: SYNC_TIMEOUT,
+                    timeout: Self::SYNC_TIMEOUT,
                     migration_id: migration.id,
                 };
                 let res = pool.call_and_wait(&instance.raft_id, req);
@@ -637,7 +633,7 @@ pub(crate) fn governor_loop(
                             replicaset.replicaset_id,
                             e
                         );
-                        continue 'governor;
+                        return Continue;
                     }
                 }
             }
@@ -646,7 +642,47 @@ pub(crate) fn governor_loop(
 
         event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged])
             .expect("Events system must be initialized");
+
+        Continue
     }
+
+    pub fn start(
+        status: Rc<Cell<Status>>,
+        storage: Clusterwide,
+        raft_storage: RaftSpaceAccess,
+    ) -> Self {
+        let args = Args {
+            status,
+            storage,
+            raft_storage,
+        };
+
+        let state = State {
+            pool: ConnectionPool::builder(args.storage.clone())
+                .call_timeout(Duration::from_secs(1))
+                .connect_timeout(Duration::from_millis(500))
+                .inactivity_timeout(Duration::from_secs(60))
+                .build(),
+        };
+
+        Self {
+            _loop: crate::loop_start!("governor_loop", Self::iter_fn, args, state),
+        }
+    }
+}
+
+pub struct Loop {
+    _loop: Option<fiber::UnitJoinHandle<'static>>,
+}
+
+struct Args {
+    status: Rc<Cell<Status>>,
+    storage: Clusterwide,
+    raft_storage: RaftSpaceAccess,
+}
+
+struct State {
+    pool: ConnectionPool,
 }
 
 #[allow(clippy::type_complexity)]
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 78b4913d95..097e9f561d 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -25,7 +25,7 @@ use std::rc::Rc;
 use std::time::Duration;
 use std::time::Instant;
 
-use crate::governor::governor_loop;
+use crate::governor;
 use crate::kvcell::KVCell;
 use crate::loop_start;
 use crate::r#loop::FlowControl;
@@ -122,7 +122,7 @@ pub struct Node {
     pub(crate) storage: Clusterwide,
     pub(crate) raft_storage: RaftSpaceAccess,
     main_loop: MainLoop,
-    _conf_change_loop: fiber::UnitJoinHandle<'static>,
+    _governor_loop: governor::Loop,
     status: Rc<Cell<Status>>,
 }
 
@@ -150,21 +150,14 @@ impl Node {
 
         let node_impl = Rc::new(Mutex::new(node_impl));
 
-        let governor_loop_fn = {
-            let status = status.clone();
-            let storage = storage.clone();
-            let raft_storage = raft_storage.clone();
-            move || governor_loop(status, storage, raft_storage)
-        };
-
         let node = Node {
             raft_id,
             main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields
-            _conf_change_loop: fiber::Builder::new()
-                .name("governor_loop")
-                .proc(governor_loop_fn)
-                .start()
-                .unwrap(),
+            _governor_loop: governor::Loop::start(
+                status.clone(),
+                storage.clone(),
+                raft_storage.clone(),
+            ),
             node_impl,
             storage,
             raft_storage,
-- 
GitLab