From 1f0486fcc9bacd2202b9247d3b71b94d1b060ad2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 20 Oct 2022 16:50:08 +0300
Subject: [PATCH] feat(governor): ignore offline & expelled peers
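
The governor loop used to send rpc requests (replication and sharding
configuration) to every peer, including ones that are offline or being
expelled and hence cannot respond. Introduce `Peer::may_respond` and
skip such peers when broadcasting rpc requests, when composing the
vshard config and when recomputing replicaset weights.

Also garbage collect stale net.box connections after each vshard
reconfiguration and fetch cluster_id once per governor loop iteration
instead of separately in every branch.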

---
 src/traft/mod.rs          |  6 ++++
 src/traft/node.rs         | 65 ++++++++++++++++++++---------------
 src/traft/rpc/sharding.rs |  8 ++++++
 3 files changed, 45 insertions(+), 34 deletions(-)

diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index f8498548bb..2edf448c3b 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -461,6 +461,12 @@ impl Peer {
         matches!(self.current_grade, CurrentGrade::Online)
     }
 
+    /// Peer has a grade that implies it may cooperate.
+    /// Currently this means that target_grade is neither Offline nor Expelled.
+    pub fn may_respond(&self) -> bool {
+        self.target_grade != TargetGrade::Offline && self.target_grade != TargetGrade::Expelled
+    }
+
     pub fn has_grades(&self, current: CurrentGrade, target: TargetGrade) -> bool {
         self.current_grade == current && self.target_grade == target
     }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index dce2ff8667..b8408531bb 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -51,7 +51,6 @@ use crate::traft::failover;
 use crate::traft::notify::Notify;
 use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
 use crate::traft::rpc::{replication, sharding};
-use crate::traft::storage::peer_field;
 use crate::traft::storage::{State, StateKey};
 use crate::traft::ConnectionPool;
 use crate::traft::LogicalClock;
@@ -1008,6 +1007,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         let leader_id = status.get().id;
         let peers = storage.peers.all_peers().unwrap();
         let term = storage.raft.term().unwrap().unwrap_or(0);
+        let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
         let node = global().expect("must be initialized");
 
         ////////////////////////////////////////////////////////////////////////
@@ -1034,10 +1034,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             .filter(|peer| peer.current_grade != CurrentGrade::Offline)
             .find(|peer| peer.target_grade == TargetGrade::Offline);
         if let Some(peer) = to_offline {
-            // TODO: everybody needs to rerun vshard.*.cfg
             let instance_id = peer.instance_id.clone();
-            let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
-            let req = UpdatePeerRequest::new(instance_id, cluster_id)
+            let req = UpdatePeerRequest::new(instance_id, cluster_id.clone())
                 .with_current_grade(CurrentGrade::Offline);
             let res = node.handle_topology_request_and_wait(req.into());
             if let Err(e) = res {
@@ -1045,7 +1043,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     "instance_id" => &*peer.instance_id,
                 );
             }
-            continue 'governor;
         }
 
         ////////////////////////////////////////////////////////////////////////
@@ -1075,7 +1072,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     "commit" => commit,
                     "instance_id" => &*peer.instance_id,
                 );
-                let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
 
                 let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                     .with_current_grade(CurrentGrade::RaftSynced);
@@ -1112,21 +1108,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
         if let Some(peer) = to_replicate {
             let replicaset_id = &peer.replicaset_id;
-            let replicaset_iids = unwrap_ok_or!(
-                // TODO: filter out Offline & Expelled peers
-                // TODO: use `peers` instead
-                storage.peers.replicaset_fields::<peer_field::InstanceId>(replicaset_id),
-                Err(e) => {
-                    tlog!(Warning, "failed reading replicaset instances: {e}";
-                        "replicaset_id" => replicaset_id,
-                    );
-
-                    // TODO: don't hard code timeout
-                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
-                        .unwrap();
-                    continue 'governor;
-                }
-            );
+            let replicaset_iids = maybe_responding(&peers)
+                .filter(|peer| &peer.replicaset_id == replicaset_id)
+                .map(|peer| peer.instance_id.clone())
+                .collect::<Vec<_>>();
 
             let replicaset_size = replicaset_iids.len();
             let reqs = replicaset_iids
@@ -1147,7 +1132,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 }
             );
 
-            let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
             for (peer_iid, resp) in res {
                 let cluster_id = cluster_id.clone();
                 let peer_iid_2 = peer_iid.clone();
@@ -1226,8 +1210,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         if need_sharding {
             let res = (|| -> Result<(), Error> {
                 let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
-                // TODO: filter out Offline & Expelled peers
-                let reqs = peers.iter().map(|peer| {
+                let reqs = maybe_responding(&peers).map(|peer| {
                     (
                         peer.instance_id.clone(),
                         sharding::Request {
@@ -1241,10 +1224,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 // TODO: don't hard code timeout
                 let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
 
-                let cluster_id = storage
-                    .raft
-                    .cluster_id()?
-                    .expect("no cluster_id in storage");
                 for (peer_iid, resp) in res {
                     let sharding::Response {} = resp?;
                     let req = UpdatePeerRequest::new(peer_iid.clone(), cluster_id.clone())
@@ -1258,6 +1237,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 }
 
                 if !vshard_bootstrapped {
+                    // TODO: if this fails, it will only be retried the next
+                    // time vshard gets reconfigured
                     node.propose_and_wait(
                         traft::OpDML::replace(
                             ClusterSpace::State,
@@ -1288,9 +1269,11 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             .iter()
             .any(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
         if maybe_need_weights_update {
-            let res = if let Some(new_weights) = get_new_weights(&peers, &storage.state) {
+            let res = if let Some(new_weights) =
+                get_new_weights(maybe_responding(&peers), &storage.state)
+            {
                 (|| -> Result<(), Error> {
-                    let peer_ids = peers.iter().map(|peer| peer.instance_id.clone());
+                    let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
                     let reqs = peer_ids.zip(repeat(sharding::Request {
                         leader_id,
                         term,
@@ -1300,7 +1283,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     // TODO: don't hard code timeout
                     let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
 
-                    let cluster_id = storage.raft.cluster_id()?.unwrap();
                     for (peer_iid, resp) in res {
                         let cluster_id = cluster_id.clone();
                         let peer_iid_2 = peer_iid.clone();
@@ -1313,6 +1295,8 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                         tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
                     }
 
+                    // TODO: if this fails, it will only be retried the next
+                    // time vshard gets reconfigured
                     node.propose_and_wait(
                         // TODO: OpDML::update with just the changes
                         traft::OpDML::replace(
@@ -1326,7 +1310,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 })()
             } else {
                 (|| -> Result<(), Error> {
-                    let cluster_id = storage.raft.cluster_id()?.unwrap();
                     let to_online = peers.iter().filter(|peer| {
                         peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
                     });
@@ -1402,12 +1385,20 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         Ok(rx.into_iter().take(peer_count).collect())
     }
 
-    fn get_new_weights(peers: &[Peer], state: &State) -> Option<ReplicasetWeights> {
+    #[inline(always)]
+    fn get_new_weights<'p>(
+        peers: impl IntoIterator<Item = &'p Peer>,
+        state: &State,
+    ) -> Option<ReplicasetWeights> {
         let replication_factor = state.replication_factor().expect("storage error");
         let mut replicaset_weights = state.replicaset_weights().expect("storage error");
         let mut replicaset_sizes = HashMap::new();
         let mut weights_changed = false;
-        for Peer { replicaset_id, .. } in peers {
+        for peer @ Peer { replicaset_id, .. } in peers {
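+            // Only peers that may respond count towards the replicaset size.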
+            if !peer.may_respond() {
+                continue;
+            }
             let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
             *replicaset_size += 1;
             if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
@@ -1417,6 +1408,12 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
         }
         weights_changed.then_some(replicaset_weights)
     }
+
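+    /// Iterator over peers that may still respond to rpc requests.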
+    #[inline(always)]
+    fn maybe_responding(peers: &[Peer]) -> impl Iterator<Item = &Peer> {
+        peers.iter().filter(|peer| peer.may_respond())
+    }
 }
 
 fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs
index 1a3920ebdb..081483d41b 100644
--- a/src/traft/rpc/sharding.rs
+++ b/src/traft/rpc/sharding.rs
@@ -36,6 +36,10 @@ fn proc_sharding(req: Request) -> Result<Response, Error> {
         lua.exec("vshard.router.bootstrap()")?;
     }
 
+    // After reconfiguring, vshard leaves behind net.box.connection objects,
+    // which try reconnecting every 0.5 seconds. Garbage collecting them helps.
+    lua.exec("collectgarbage()")?;
+
     Ok(Response {})
 }
 
@@ -124,6 +128,10 @@ pub mod cfg {
         pub fn new(peers: &Peers, replicaset_weights: ReplicasetWeights) -> Result<Self, Error> {
             let mut sharding: HashMap<String, Replicaset> = HashMap::new();
             for peer in peers.iter()? {
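+                // Don't include offline & expelled peers in the vshard config.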
+                if !peer.may_respond() {
+                    continue;
+                }
                 let replicaset_id = peer.replicaset_id;
                 let replicaset = sharding.entry(peer.replicaset_uuid).or_insert_with(||
                     Replicaset::with_weight(replicaset_weights.get(&replicaset_id).copied())
-- 
GitLab