From e5b615456398cea90495559d0b19535604a258d1 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Sat, 22 Oct 2022 20:34:07 +0300
Subject: [PATCH] fix(governor): stop updating CurrentGrade for peers that
 don't need it

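Previously, after each governor stage (replication, sharding init,
sharding weights) the governor issued an UpdatePeerRequest with the new
CurrentGrade for every peer that responded, even though only the peer
that triggered the stage needed its grade updated. Each stage now
updates the grade of that single peer. The replication stage is also
restructured into a closure returning Result, so all of its errors are
handled in one place instead of being matched per call.
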
---
 src/traft/node.rs | 125 +++++++++++++++++++++-------------------------
 1 file changed, 56 insertions(+), 69 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index c1966072ed..b26339ac29 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -36,8 +36,8 @@ use crate::traft::Peer;
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
 use crate::traft::RaftTerm;
+use crate::unwrap_some_or;
 use crate::warn_or_panic;
-use crate::{unwrap_ok_or, unwrap_some_or};
 use ::tarantool::util::IntoClones as _;
 use protobuf::Message as _;
 use std::iter::FromIterator as _;
@@ -1143,10 +1143,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // replication
-        // TODO: putting each stage in a different function
-        // will make error handling much easier
         let to_replicate = peers
             .iter()
+            // TODO: find all such peers in a given replicaset,
+            // not just the first one
             .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
         if let Some(peer) = to_replicate {
             let replicaset_id = &peer.replicaset_id;
@@ -1156,59 +1156,47 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                 .collect::<Vec<_>>();
 
             let replicaset_size = replicaset_iids.len();
-            let reqs = replicaset_iids
-                .iter()
-                .cloned()
-                .zip(repeat(replication::Request {
-                    leader_and_term: LeaderWithTerm { leader_id, term },
-                    replicaset_instances: replicaset_iids.clone(),
-                    replicaset_id: replicaset_id.clone(),
-                    // TODO: what if someone goes offline/expelled?
-                    promote: replicaset_size == 1,
-                }));
-            // TODO: don't hard code timeout
-            let res = call_all(&mut pool, reqs, Duration::from_secs(3));
-            let res = unwrap_ok_or!(res,
-                Err(e) => {
-                    tlog!(Warning, "failed to configure replication: {e}");
-                    continue 'governor;
+            let res = (|| -> Result<_, Error> {
+                let reqs = replicaset_iids
+                    .iter()
+                    .cloned()
+                    .zip(repeat(replication::Request {
+                        leader_and_term: LeaderWithTerm { leader_id, term },
+                        replicaset_instances: replicaset_iids.clone(),
+                        replicaset_id: replicaset_id.clone(),
+                        // TODO: what if someone goes offline/expelled?
+                        promote: replicaset_size == 1,
+                    }));
+                // TODO: don't hard code timeout
+                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
+
+                for (peer_iid, resp) in res {
+                    let replication::Response { lsn } = resp?;
+                    // TODO: change `Info` to `Debug`
+                    tlog!(Info, "configured replication with peer";
+                        "instance_id" => &*peer_iid,
+                        "lsn" => lsn,
+                    );
                 }
-            );
-
-            for (peer_iid, resp) in res {
-                let cluster_id = cluster_id.clone();
-                let peer_iid_2 = peer_iid.clone();
-                let res = resp.and_then(move |replication::Response { lsn }| {
-                    let mut req = UpdatePeerRequest::new(peer_iid_2, cluster_id)
-                        .with_current_grade(CurrentGrade::Replicated);
-                    if replicaset_size == 1 {
-                        // TODO: ignore expelled peers
-                        // TODO: ignore offline peers
-                        req = req.with_is_master(true);
-                    }
-                    node.handle_topology_request_and_wait(req.into())
-                        .map(|_| lsn)
-                });
-                match res {
-                    Ok(lsn) => {
-                        // TODO: change `Info` to `Debug`
-                        tlog!(Info, "configured replication with peer";
-                            "instance_id" => &*peer_iid,
-                            "lsn" => lsn,
-                        );
-                    }
-                    Err(e) => {
-                        tlog!(Warning, "failed to configure replication: {e}";
-                            "instance_id" => &*peer_iid,
-                        );
-
-                        // TODO: don't hard code timeout
-                        event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1))
-                            .unwrap();
-                        continue 'governor;
-                    }
+
+                let mut req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
+                    .with_current_grade(CurrentGrade::Replicated);
+                if replicaset_size == 1 {
+                    // TODO: ignore expelled peers
+                    // TODO: ignore offline peers
+                    req = req.with_is_master(true);
                 }
+                node.handle_topology_request_and_wait(req.into())?;
+
+                Ok(())
+            })();
+            if let Err(e) = res {
+                tlog!(Warning, "failed to configure replication: {e}");
+                // TODO: don't hard code timeout
+                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
+                continue 'governor;
             }
+
             let replicaset_weight = storage
                 .state
                 .replicaset_weight(replicaset_id)
@@ -1247,10 +1235,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // init sharding
-        let need_sharding = peers
+        let to_shard = peers
             .iter()
-            .any(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
-        if need_sharding {
+            .find(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
+        if let Some(peer) = to_shard {
             let res = (|| -> Result<(), Error> {
                 let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
                 let reqs = maybe_responding(&peers).map(|peer| {
@@ -1268,9 +1256,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
                 for (peer_iid, resp) in res {
                     let sharding::Response {} = resp?;
-                    let req = UpdatePeerRequest::new(peer_iid.clone(), cluster_id.clone())
-                        .with_current_grade(CurrentGrade::ShardingInitialized);
-                    node.handle_topology_request_and_wait(req.into())?;
 
                     // TODO: change `Info` to `Debug`
                     tlog!(Info, "initialized sharding with peer";
@@ -1278,6 +1263,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     );
                 }
 
+                let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
+                    .with_current_grade(CurrentGrade::ShardingInitialized);
+                node.handle_topology_request_and_wait(req.into())?;
+
                 if !vshard_bootstrapped {
                     // TODO: if this fails, it will only rerun next time vshard
                     // gets reconfigured
@@ -1307,10 +1296,10 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
 
         ////////////////////////////////////////////////////////////////////////
         // sharding weights
-        let maybe_need_weights_update = peers
+        let to_update_weights = peers
             .iter()
-            .any(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
-        if maybe_need_weights_update {
+            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
+        if let Some(peer) = to_update_weights {
             let res = if let Some(new_weights) =
                 get_new_weights(maybe_responding(&peers), &storage.state)
             {
@@ -1325,17 +1314,15 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
                     let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
 
                     for (peer_iid, resp) in res {
-                        let cluster_id = cluster_id.clone();
-                        let peer_iid_2 = peer_iid.clone();
-                        resp.and_then(move |sharding::Response {}| {
-                            let req = UpdatePeerRequest::new(peer_iid_2, cluster_id)
-                                .with_current_grade(CurrentGrade::Online);
-                            node.handle_topology_request_and_wait(req.into())
-                        })?;
+                        resp?;
                         // TODO: change `Info` to `Debug`
                         tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
                     }
 
+                    let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
+                        .with_current_grade(CurrentGrade::Online);
+                    node.handle_topology_request_and_wait(req.into())?;
+
                     // TODO: if this fails, it will only rerun next time vshard
                     // gets reconfigured
                     node.propose_and_wait(
-- 
GitLab