From 3de8fa7608e6c66e2494efe017eb607267eb4f80 Mon Sep 17 00:00:00 2001
From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io>
Date: Tue, 15 Nov 2022 15:21:16 +0300
Subject: [PATCH] feat: applying migrations by Governor

---
 src/main.rs                |  1 +
 src/traft/event.rs         |  1 +
 src/traft/governor.rs      | 78 ++++++++++++++++++++++++++++++++++++++
 src/traft/mod.rs           | 20 +++++++++-
 src/traft/node.rs          | 61 ++++++++++++++++++++++++++++-
 src/traft/rpc/migration.rs | 33 ++++++++++++++++
 src/traft/rpc/mod.rs       |  1 +
 src/traft/storage.rs       | 30 ++++++++++++++-
 test/int/test_migration.py | 47 +++++++++++++++++++++++
 9 files changed, 267 insertions(+), 5 deletions(-)
 create mode 100644 src/traft/rpc/migration.rs

diff --git a/src/main.rs b/src/main.rs
index 4ee86fc07e..40b59e92cd 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -461,6 +461,7 @@ fn init_handlers() {
     declare_cfunc!(traft::rpc::replication::proc_replication);
     declare_cfunc!(traft::rpc::replication::promote::proc_replication_promote);
     declare_cfunc!(traft::rpc::sharding::proc_sharding);
+    declare_cfunc!(traft::rpc::migration::apply::proc_apply_migration);
 }
 
 fn rm_tarantool_files(data_dir: &str) {
diff --git a/src/traft/event.rs b/src/traft/event.rs
index 0d2c7dede1..83c0396641 100644
--- a/src/traft/event.rs
+++ b/src/traft/event.rs
@@ -28,6 +28,7 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
         TopologyChanged = "raft.topology-changed",
         RaftLoopNeeded = "raft.loop-needed",
         RaftEntryApplied = "raft.entry-applied",
+        ClusterStateChanged = "picodata.cluster-state-updated",
     }
 }
 
diff --git a/src/traft/governor.rs b/src/traft/governor.rs
index 7f174b7331..7e4a32c81c 100644
--- a/src/traft/governor.rs
+++ b/src/traft/governor.rs
@@ -9,6 +9,10 @@ use crate::traft::Peer;
 use crate::traft::RaftId;
 use crate::traft::TargetGradeVariant;
 
+use super::Migration;
+use super::Replicaset;
+use super::ReplicasetId;
+
 struct RaftConf<'a> {
     all: BTreeMap<RaftId, &'a Peer>,
     voters: BTreeSet<RaftId>,
@@ -168,6 +172,40 @@ pub(crate) fn raft_conf_change(
     Some(conf_change)
 }
 
+/// Computes which migrations still have to be applied to which replicasets.
+///
+/// `migrations` is sorted in place by `id` in ascending order. A migration is
+/// considered only if its `id` does not exceed `desired_schema_version`; for
+/// each such migration the ids of all replicasets whose
+/// `current_schema_version` is lower than the migration `id` are collected.
+///
+/// Returns `(migration_id, replicaset_ids)` pairs ordered by `migration_id`.
+/// Migrations that no replicaset is waiting for are omitted.
+pub(crate) fn waiting_migrations(
+    migrations: &mut [Migration],
+    replicasets: &[Replicaset],
+    desired_schema_version: u64,
+) -> Vec<(u64, Vec<ReplicasetId>)> {
+    migrations.sort_by_key(|m| m.id);
+    migrations
+        .iter()
+        // Migrations above the desired version must not be applied yet.
+        .filter(|m| m.id <= desired_schema_version)
+        .filter_map(|m| {
+            let lagging: Vec<ReplicasetId> = replicasets
+                .iter()
+                .filter(|r| r.current_schema_version < m.id)
+                .map(|r| r.replicaset_id.clone())
+                .collect();
+            if lagging.is_empty() {
+                None
+            } else {
+                Some((m.id, lagging))
+            }
+        })
+        .collect()
+}
+
 #[cfg(test)]
 mod tests {
     use ::raft::prelude as raft;
@@ -367,4 +392,63 @@ mod tests {
             None
         );
     }
+
+    use crate::traft::{InstanceId, Migration, Replicaset, ReplicasetId};
+
+    use super::waiting_migrations;
+
+    // Builds a `Migration` from an id and a body.
+    macro_rules! m {
+        ($id:literal, $body:literal) => {
+            Migration {
+                id: $id,
+                body: $body.to_string(),
+            }
+        };
+    }
+
+    // Builds a `Replicaset` from an id and a current schema version;
+    // the remaining fields get fixed placeholder values.
+    macro_rules! r {
+        ($id:literal, $schema_version:literal) => {
+            Replicaset {
+                replicaset_id: ReplicasetId($id.to_string()),
+                replicaset_uuid: "".to_string(),
+                master_id: InstanceId("i0".to_string()),
+                weight: 1.0,
+                current_schema_version: $schema_version,
+            }
+        };
+    }
+
+    // Builds the expected result of `waiting_migrations` from
+    // `(migration_id, replicaset_id, ...)` groups.
+    macro_rules! expect {
+        [$(($migration_id:literal, $($replicaset_id:literal),*)),*] => [vec![
+            $((
+                $migration_id,
+                vec![$(ReplicasetId($replicaset_id.to_string())),*]
+            )),*
+        ]];
+    }
+
+    #[test]
+    fn test_waiting_migrations() {
+        let ms = vec![m!(1, "m1"), m!(2, "m2"), m!(3, "m3")];
+        let rs = vec![r!("r1", 0), r!("r2", 2), r!("r3", 1)];
+        // `waiting_migrations` sorts its input in place, so pass a fresh copy.
+        assert_eq!(waiting_migrations(&mut ms.clone(), &rs, 0), expect![]);
+        assert_eq!(
+            waiting_migrations(&mut ms.clone(), &rs, 1),
+            expect![(1, "r1")]
+        );
+        assert_eq!(
+            waiting_migrations(&mut ms.clone(), &rs, 2),
+            expect![(1, "r1"), (2, "r1", "r3")]
+        );
+        assert_eq!(
+            waiting_migrations(&mut ms.clone(), &rs, 3),
+            expect![(1, "r1"), (2, "r1", "r3"), (3, "r1", "r2", "r3")]
+        );
+    }
 }
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 108275b2f8..71c5479213 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -36,6 +36,8 @@ use storage::ClusterSpace;
 pub use storage::Storage;
 pub use topology::Topology;
 
+use self::event::Event;
+
 pub type RaftId = u64;
 pub type RaftTerm = u64;
 pub type RaftIndex = u64;
@@ -185,7 +187,13 @@ impl Op {
                 peers.put(peer).unwrap();
                 Box::new(peer.clone())
             }
-            Self::Dml(op) => Box::new(op.result()),
+            Self::Dml(op) => {
+                let res = Box::new(op.result());
+                if op.space() == &ClusterSpace::State {
+                    event::broadcast(Event::ClusterStateChanged);
+                }
+                res
+            }
         }
     }
 
@@ -329,6 +337,16 @@ impl OpDML {
         };
         Ok(res)
     }
+
+    /// Returns the `space` field of whichever variant this operation is.
+    pub fn space(&self) -> &ClusterSpace {
+        match self {
+            Self::Insert { space, .. }
+            | Self::Replace { space, .. }
+            | Self::Update { space, .. }
+            | Self::Delete { space, .. } => space,
+        }
+    }
 }
 
 mod vec_of_raw_byte_buf {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 301ace677c..b4b7de542b 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -30,7 +30,8 @@ use tarantool::space::UpdateOps;
 use crate::kvcell::KVCell;
 use crate::r#loop::{FlowControl, Loop};
 use crate::stringify_cfunc;
-use crate::traft::governor::raft_conf_change;
+use crate::traft::governor::{raft_conf_change, waiting_migrations};
+use crate::traft::rpc;
 use crate::traft::storage::ClusterSpace;
 use crate::traft::ContextCoercion as _;
 use crate::traft::OpDML;
@@ -1423,7 +1424,63 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
             continue 'governor;
         }
 
-        event::wait(Event::TopologyChanged).expect("Events system must be initialized");
+        ////////////////////////////////////////////////////////////////////////
+        // applying migrations
+        let desired_schema_version = storage.state.desired_schema_version().unwrap();
+        let replicasets = storage.replicasets.iter().unwrap().collect::<Vec<_>>();
+        let mut migrations = storage.migrations.iter().unwrap().collect::<Vec<_>>();
+        let commit = storage.raft.commit().unwrap().unwrap();
+        for (mid, rids) in waiting_migrations(&mut migrations, &replicasets, desired_schema_version)
+        {
+            let migration = storage.migrations.get(mid).unwrap().unwrap();
+            for rid in rids {
+                let replicaset = storage
+                    .replicasets
+                    .get(rid.to_string().as_str())
+                    .unwrap()
+                    .unwrap();
+                let peer = storage.peers.get(&replicaset.master_id).unwrap();
+                let req = rpc::migration::apply::Request {
+                    term,
+                    commit,
+                    timeout: SYNC_TIMEOUT,
+                    migration_id: migration.id,
+                };
+                let res = pool.call_and_wait(&peer.raft_id, req);
+                match res {
+                    Ok(_) => {
+                        let mut ops = UpdateOps::new();
+                        ops.assign("current_schema_version", migration.id).unwrap();
+                        let op = OpDML::update(
+                            ClusterSpace::Replicasets,
+                            &[replicaset.replicaset_id.clone()],
+                            ops,
+                        )
+                        .unwrap();
+                        node.propose_and_wait(op, Duration::MAX).unwrap().unwrap();
+                        tlog!(
+                            Info,
+                            "Migration {0} applied to replicaset {1}",
+                            migration.id,
+                            replicaset.replicaset_id
+                        );
+                    }
+                    Err(e) => {
+                        tlog!(
+                            Warning,
+                            "Could not apply migration {0} to replicaset {1}, error: {2}",
+                            migration.id,
+                            replicaset.replicaset_id,
+                            e
+                        );
+                        continue 'governor;
+                    }
+                }
+            }
+        }
+
+        event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged])
+            .expect("Events system must be initialized");
     }
 
     #[allow(clippy::type_complexity)]
diff --git a/src/traft/rpc/migration.rs b/src/traft/rpc/migration.rs
new file mode 100644
index 0000000000..a7ee2482c5
--- /dev/null
+++ b/src/traft/rpc/migration.rs
@@ -0,0 +1,45 @@
+/// RPC that makes an instance execute a migration from its local storage.
+pub mod apply {
+    use crate::traft::{error::Error, node, rpc::sync, RaftIndex, RaftTerm, Result};
+    use std::time::Duration;
+
+    crate::define_rpc_request! {
+        // Handler steps:
+        // 1. validate `req.term` with `check_term`;
+        // 2. call `sync::wait_for_index_timeout` for `req.commit`, bounded
+        //    by `req.timeout`;
+        // 3. load the migration `req.migration_id` from the local storage;
+        // 4. execute its body with `crate::tarantool::exec`.
+        // The first failing step aborts the request with its error.
+        fn proc_apply_migration(req: Request) -> Result<Response> {
+            let node = node::global()?;
+            node.status().check_term(req.term)?;
+            sync::wait_for_index_timeout(req.commit, &node.storage.raft, req.timeout)?;
+
+            let migration = node
+                .storage
+                .migrations
+                .get(req.migration_id)?
+                .ok_or_else(|| {
+                    Error::other(format!("Migration {0} not found", req.migration_id))
+                })?;
+
+            crate::tarantool::exec(migration.body.as_str())
+                .map(|_| Response {})
+                .map_err(Into::into)
+        }
+
+        pub struct Request {
+            // Raft term checked by the handler before doing anything else.
+            pub term: RaftTerm,
+            // Raft index passed to `wait_for_index_timeout` by the handler.
+            pub commit: RaftIndex,
+            // Time limit passed to that wait.
+            pub timeout: Duration,
+            // Id of the migration to execute.
+            pub migration_id: u64,
+        }
+
+        pub struct Response {}
+    }
+}
diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs
index 15eb371a09..57f09852a2 100644
--- a/src/traft/rpc/mod.rs
+++ b/src/traft/rpc/mod.rs
@@ -11,6 +11,7 @@ use std::time::Duration;
 use serde::de::DeserializeOwned;
 
 pub mod expel;
+pub mod migration;
 pub mod replication;
 pub mod sharding;
 pub mod sync;
diff --git a/src/traft/storage.rs b/src/traft/storage.rs
index eadda6be32..4c8a806afd 100644
--- a/src/traft/storage.rs
+++ b/src/traft/storage.rs
@@ -168,9 +168,8 @@ impl State {
         Ok(res)
     }
 
-    #[allow(dead_code)]
     #[inline]
-    pub fn desired_schema_version(&self) -> tarantool::Result<usize> {
+    pub fn desired_schema_version(&self) -> tarantool::Result<u64> {
         let res = self
             .get(StateKey::DesiredSchemaVersion)?
             .unwrap_or_default();
@@ -675,6 +674,45 @@ impl Migrations {
             None => Ok(None),
         }
     }
+
+    /// Returns an iterator over the tuples selected from this storage's
+    /// space with `IteratorType::All`, decoded as migrations.
+    pub fn iter(&self) -> Result<MigrationIter> {
+        let tuples = self.space.select(IteratorType::All, &())?;
+        Ok(MigrationIter::from(tuples))
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// MigrationIter
+////////////////////////////////////////////////////////////////////////////////
+
+/// Iterator adapter that decodes each tuple yielded by the wrapped
+/// [`IndexIterator`] into a [`traft::Migration`].
+pub struct MigrationIter {
+    iter: IndexIterator,
+}
+
+impl From<IndexIterator> for MigrationIter {
+    fn from(iter: IndexIterator) -> Self {
+        Self { iter }
+    }
+}
+
+impl Iterator for MigrationIter {
+    type Item = traft::Migration;
+
+    /// Decodes and returns the next migration, or `None` once the wrapped
+    /// iterator is exhausted.
+    ///
+    /// # Panics
+    ///
+    /// Panics if a tuple cannot be decoded as a migration.
+    fn next(&mut self) -> Option<Self::Item> {
+        let tuple = self.iter.next()?;
+        let migration = tuple.decode().expect("migration should decode correctly");
+        Some(migration)
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/test/int/test_migration.py b/test/int/test_migration.py
index 18cce7f085..812ef465c6 100644
--- a/test/int/test_migration.py
+++ b/test/int/test_migration.py
@@ -1,4 +1,5 @@
 from conftest import Cluster
+import funcy  # type: ignore
 
 
 def test_add_migration(cluster: Cluster):
@@ -17,3 +18,49 @@ def test_push_schema_version(cluster: Cluster):
     i1.eval("picolib.push_schema_version(3)")
     key = "desired_schema_version"
     assert [[key, 3]] == i2.call("box.space.cluster_state:select", [key])
+
+
+def test_apply_migrations(cluster: Cluster):
+    # Scenario: apply migrations to a cluster
+    #   Given a cluster with added migrations
+    #   When desired_schema_version is pushed up
+    #   Then the migrations are applied
+    #   And every replicaset's current_schema_version reaches that version
+
+    cluster.deploy(instance_count=3)
+    i1, _, _ = cluster.instances
+    i1.promote_or_fail()
+    i1.assert_raft_status("Leader")
+    i1.call(
+        "picolib.add_migration",
+        1,
+        """
+        box.schema.space.create('test_space', {
+            format = {
+                {name = 'id', type = 'unsigned'},
+                {name = 'value', type = 'string'},
+            }
+        })
+        """,
+    )
+    i1.call(
+        "picolib.add_migration",
+        2,
+        "box.space.test_space:create_index('pk', {parts = {'id'}})",
+    )
+
+    i1.call("picolib.push_schema_version", 2)
+
+    @funcy.retry(tries=30, timeout=0.2)  # type: ignore
+    def assert_space_insert(conn):
+        assert conn.insert("test_space", [1, "foo"])
+
+    @funcy.retry(tries=30, timeout=0.2)  # type: ignore
+    def assert_replicaset_version(conn):
+        position = conn.schema.get_field("replicasets", "current_schema_version")["id"]
+        assert [2, 2, 2] == [row[position] for row in conn.select("replicasets")]
+
+    for i in cluster.instances:
+        with i.connect(timeout=1) as conn:
+            assert_space_insert(conn)
+            assert_replicaset_version(conn)
-- 
GitLab