From 07cd2d2061db45874747f8cb77d92bbdea359330 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 15 Dec 2022 18:55:44 +0300
Subject: [PATCH] refactor(governor): plan for applying migrations

---
 src/governor/migration.rs | 126 ++++++++++++++++++------------------
 src/governor/mod.rs       | 130 ++++++++++++++++++--------------------
 src/storage.rs            |   1 +
 3 files changed, 121 insertions(+), 136 deletions(-)

diff --git a/src/governor/migration.rs b/src/governor/migration.rs
index 6790da5e11..9b4968b4aa 100644
--- a/src/governor/migration.rs
+++ b/src/governor/migration.rs
@@ -1,84 +1,78 @@
-use crate::traft::Migration;
 use crate::traft::Replicaset;
-use crate::traft::ReplicasetId;
 
-pub(crate) fn waiting_migrations<'a>(
-    migrations: &'a mut [Migration],
-    replicasets: &'a [Replicaset],
+pub(crate) fn get_pending_migration<'r>(
+    mut migration_ids: Vec<u64>,
+    replicasets: &[&'r Replicaset],
     desired_schema_version: u64,
-) -> Vec<(u64, Vec<ReplicasetId>)> {
-    migrations.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap());
-    let mut res: Vec<(u64, Vec<ReplicasetId>)> = Vec::new();
-    for m in migrations {
-        let mut rs: Vec<ReplicasetId> = Vec::new();
+) -> Option<(u64, &'r Replicaset)> {
+    migration_ids.sort();
+    for m_id in migration_ids {
         for r in replicasets {
-            if r.current_schema_version < m.id && m.id <= desired_schema_version {
-                rs.push(r.replicaset_id.clone());
+            if r.current_schema_version < m_id && m_id <= desired_schema_version {
+                return Some((m_id, r));
             }
         }
-        if !rs.is_empty() {
-            res.push((m.id, rs));
-        }
     }
-    res
+    None
 }
 
 #[cfg(test)]
 mod tests {
-    use crate::traft::InstanceId;
-    use crate::traft::Migration;
-    use crate::traft::Replicaset;
-    use crate::traft::ReplicasetId;
-
-    use super::waiting_migrations;
-
-    macro_rules! m {
-        ($id:literal, $body:literal) => {
-            Migration {
-                id: $id,
-                body: $body.to_string(),
-            }
-        };
-    }
+    use super::*;
 
-    macro_rules! r {
-        ($id:literal, $schema_version:literal) => {
-            Replicaset {
-                replicaset_id: ReplicasetId($id.to_string()),
-                replicaset_uuid: "".to_string(),
-                master_id: InstanceId("i0".to_string()),
-                current_weight: 1.0,
-                target_weight: 1.0,
-                current_schema_version: $schema_version,
-            }
-        };
-    }
-
-    macro_rules! expect {
-        [$(($migration_id:literal, $($replicaset_id:literal),*)),*] => [vec![
-            $((
-                $migration_id,
-                vec![$(ReplicasetId($replicaset_id.to_string())),*].to_vec()
-            )),*
-        ].to_vec()];
+    #[allow(non_snake_case)]
+    fn R(rid: &str, mid: u64) -> Replicaset {
+        Replicaset {
+            replicaset_id: rid.into(),
+            replicaset_uuid: Default::default(),
+            master_id: String::default().into(),
+            current_weight: Default::default(),
+            target_weight: Default::default(),
+            current_schema_version: mid,
+        }
     }
 
     #[test]
     fn test_waiting_migrations() {
-        let ms = vec![m!(1, "m1"), m!(2, "m2"), m!(3, "m3")].to_vec();
-        let rs = vec![r!("r1", 0), r!("r2", 2), r!("r3", 1)].to_vec();
-        assert_eq!(waiting_migrations(&mut ms.clone(), &rs, 0), expect![]);
-        assert_eq!(
-            waiting_migrations(&mut ms.clone(), &rs, 1),
-            expect![(1, "r1")]
-        );
-        assert_eq!(
-            waiting_migrations(&mut ms.clone(), &rs, 2),
-            expect![(1, "r1"), (2, "r1", "r3")]
-        );
-        assert_eq!(
-            waiting_migrations(&mut ms.clone(), &rs, 3),
-            expect![(1, "r1"), (2, "r1", "r3"), (3, "r1", "r2", "r3")]
-        );
+        let ms = vec![3, 2, 1];
+        let rs = [R("r1", 0), R("r2", 2), R("r3", 1)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        assert_eq!(get_pending_migration(ms.clone(), &rs, 0), None);
+
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 1).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (1, "r1"));
+
+        let rs = [R("r1", 1), R("r2", 2), R("r3", 1)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        assert_eq!(get_pending_migration(ms.clone(), &rs, 1), None);
+
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 2).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (2, "r1"));
+
+        let rs = [R("r1", 2), R("r2", 2), R("r3", 1)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 2).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (2, "r3"));
+
+        let rs = [R("r1", 2), R("r2", 2), R("r3", 2)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        assert_eq!(get_pending_migration(ms.clone(), &rs, 2), None);
+
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 3).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (3, "r1"));
+
+        let rs = [R("r1", 3), R("r2", 2), R("r3", 2)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 99).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (3, "r2"));
+
+        let rs = [R("r1", 3), R("r2", 3), R("r3", 2)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        let (m, r) = get_pending_migration(ms.clone(), &rs, 99).unwrap();
+        assert_eq!((m, &*r.replicaset_id), (3, "r3"));
+
+        let rs = [R("r1", 3), R("r2", 3), R("r3", 3)];
+        let rs = rs.iter().collect::<Vec<_>>();
+        assert_eq!(get_pending_migration(ms.clone(), &rs, 99), None);
     }
 }
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index baf630bbd9..d192b33579 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -37,7 +37,7 @@ pub(crate) mod cc;
 pub(crate) mod migration;
 
 pub(crate) use cc::raft_conf_change;
-pub(crate) use migration::waiting_migrations;
+pub(crate) use migration::get_pending_migration;
 
 macro_rules! governor_step {
     ($desc:literal $([ $($kv:tt)* ])? async { $($body:tt)+ }) => {
@@ -81,6 +81,7 @@ impl Loop {
             .iter()
             .map(|rs| (&rs.replicaset_id, rs))
             .collect();
+        let migration_ids = storage.migrations.iter().unwrap().map(|m| m.id).collect();
 
         let term = status.get().term;
         let commit = raft_storage.commit().unwrap().unwrap();
@@ -88,6 +89,7 @@ impl Loop {
         let node = global().expect("must be initialized");
         let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
         let replication_factor = storage.properties.replication_factor().unwrap();
+        let desired_schema_version = storage.properties.desired_schema_version().unwrap();
 
         let plan = action_plan(
             term,
@@ -97,9 +99,11 @@ impl Loop {
             &voters,
             &learners,
             &replicasets,
+            migration_ids,
             node.raft_id,
             vshard_bootstrapped,
             replication_factor,
+            desired_schema_version,
         );
         let plan = unwrap_ok_or!(plan,
             Err(e) => {
@@ -110,8 +114,6 @@ impl Loop {
             }
         );
 
-        // TODO: remove this once all plans are implemented
-        let mut did_something = true;
         match plan {
             Plan::ConfChange(ConfChange { conf_change }) => {
                 // main_loop gives the warranty that every ProposeConfChange
@@ -416,77 +418,40 @@ impl Loop {
                 }
             }
 
-            Plan::None => {
-                tlog!(Info, "nothing to do");
-                did_something = false;
-            }
-        }
-
-        if did_something {
-            return Continue;
-        }
-
-        ////////////////////////////////////////////////////////////////////////
-        // applying migrations
-        let desired_schema_version = storage.properties.desired_schema_version().unwrap();
-        let replicasets = storage.replicasets.iter().unwrap().collect::<Vec<_>>();
-        let mut migrations = storage.migrations.iter().unwrap().collect::<Vec<_>>();
-        let commit = raft_storage.commit().unwrap().unwrap();
-        for (mid, rids) in waiting_migrations(&mut migrations, &replicasets, desired_schema_version)
-        {
-            let migration = storage.migrations.get(mid).unwrap().unwrap();
-            for rid in rids {
-                let replicaset = storage
-                    .replicasets
-                    .get(rid.to_string().as_str())
-                    .unwrap()
-                    .unwrap();
-                let instance = storage.instances.get(&replicaset.master_id).unwrap();
-                let req = rpc::migration::apply::Request {
-                    term,
-                    commit,
-                    timeout: Self::SYNC_TIMEOUT,
-                    migration_id: migration.id,
-                };
-                let res: Result<_> = async {
-                    let rpc::migration::apply::Response {} = pool
-                        .call(&instance.raft_id, &req)?
-                        // TODO: don't hard code timeout
-                        .timeout(Duration::from_secs(3))
-                        .await??;
-                    let mut ops = UpdateOps::new();
-                    ops.assign("current_schema_version", migration.id)?;
-                    let op = OpDML::update(
-                        ClusterwideSpace::Replicaset,
-                        &[replicaset.replicaset_id.clone()],
-                        ops,
-                    )?;
-                    node.propose_and_wait(op, Duration::MAX)??;
-                    tlog!(
-                        Info,
-                        "Migration {0} applied to replicaset {1}",
-                        migration.id,
-                        replicaset.replicaset_id
-                    );
-                    Ok(())
+            Plan::ApplyMigration(ApplyMigration { target, rpc, op }) => {
+                let migration_id = rpc.migration_id;
+                governor_step! {
+                    "applying migration on a replicaset" [
+                        "replicaset_id" => %target.replicaset_id,
+                        "migration_id" => %migration_id,
+                    ]
+                    async {
+                        pool
+                            .call(&target.master_id, &rpc)?
+                            .timeout(Loop::SYNC_TIMEOUT)
+                            .await??;
+                    }
                 }
-                .await;
-                if let Err(e) = res {
-                    tlog!(
-                        Warning,
-                        "Could not apply migration {0} to replicaset {1}, error: {2}",
-                        migration.id,
-                        replicaset.replicaset_id,
-                        e
-                    );
-                    return Continue;
+
+                governor_step! {
+                    "proposing replicaset current schema version change" [
+                        "replicaset_id" => %target.replicaset_id,
+                        "migration_id" => %migration_id,
+                    ]
+                    async {
+                        node.propose_and_wait(op, Duration::from_secs(3))??
+                    }
                 }
+
+                event::broadcast(Event::MigrateDone);
             }
-        }
-        event::broadcast(Event::MigrateDone);
 
-        event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged])
-            .expect("Events system must be initialized");
+            Plan::None => {
+                tlog!(Info, "nothing to do, waiting for events to handle");
+                event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged])
+                    .expect("Events system must be initialized");
+            }
+        }
 
         Continue
     }
@@ -501,9 +466,11 @@ fn action_plan<'i>(
     voters: &[RaftId],
     learners: &[RaftId],
     replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
+    migration_ids: Vec<u64>,
     my_raft_id: RaftId,
     vshard_bootstrapped: bool,
     replication_factor: usize,
+    desired_schema_version: u64,
 ) -> Result<Plan<'i>> {
     ////////////////////////////////////////////////////////////////////////////
     // conf change
@@ -784,6 +751,23 @@ fn action_plan<'i>(
         return Ok(ToOnline { req }.into());
     }
 
+    ////////////////////////////////////////////////////////////////////////////
+    // migration
+    let replicasets: Vec<_> = replicasets.values().copied().collect();
+    let to_apply = get_pending_migration(migration_ids, &replicasets, desired_schema_version);
+    if let Some((migration_id, target)) = to_apply {
+        let rpc = rpc::migration::apply::Request {
+            term,
+            commit,
+            timeout: Loop::SYNC_TIMEOUT,
+            migration_id,
+        };
+        let mut ops = UpdateOps::new();
+        ops.assign("current_schema_version", migration_id)?;
+        let op = OpDML::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?;
+        return Ok(ApplyMigration { target, rpc, op }.into());
+    }
+
     Ok(Plan::None)
 }
 
@@ -990,5 +974,11 @@ mod actions {
         pub struct ToOnline {
             pub req: update_instance::Request,
         }
+
+        pub struct ApplyMigration<'i> {
+            pub target: &'i Replicaset,
+            pub rpc: rpc::migration::apply::Request,
+            pub op: OpDML,
+        }
     }
 }
diff --git a/src/storage.rs b/src/storage.rs
index 35d4066d7a..4e4eb0fb7b 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -213,6 +213,7 @@ impl Replicasets {
         Ok(Self { space })
     }
 
+    #[allow(unused)]
     #[inline]
     pub fn get(&self, replicaset_id: &str) -> tarantool::Result<Option<Replicaset>> {
         match self.space.get(&[replicaset_id])? {
-- 
GitLab