From 7f3221f66e2fbd34d3ffee69c35f8848a0fd8982 Mon Sep 17 00:00:00 2001
From: godzie44 <godzie@yandex.ru>
Date: Fri, 17 May 2024 10:42:30 +0300
Subject: [PATCH] feat(plugins): add data migration mechanism for plugins

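Plugins can now ship data migrations. A plugin manifest may list
migration files (`.db` extension) under the new `migration` key. Each
file contains SQL statements split into an UP section, applied on
plugin installation, and a DOWN section, applied on plugin removal,
marked by `-- pico.UP` and `-- pico.DOWN` annotation comments:

    -- pico.UP
    CREATE TABLE author (...);

    -- pico.DOWN
    DROP TABLE author;

Migration state is tracked in the `_pico_plugin` system table via the
new `migration_list` and `migration_progress` fields. If applying an UP
section fails, the DOWN sections of the failed file and of all
previously applied files are executed and the plugin installation is
rolled back.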
---
 src/governor/plan.rs                          |   4 +
 src/luamod.rs                                 |   2 +-
 src/plugin/migration.rs                       | 502 ++++++++++++++++++
 src/plugin/mod.rs                             |  50 +-
 src/schema.rs                                 |  17 +-
 src/sql.rs                                    |   4 +
 test/conftest.py                              |   1 +
 test/int/test_basics.py                       |   4 +-
 test/int/test_plugin.py                       | 123 ++++-
 test/testplug/testplug_w_migration/author.db  |   7 +
 test/testplug/testplug_w_migration/book.db    |   9 +
 .../testplug_w_migration/manifest.yaml        |  10 +
 12 files changed, 715 insertions(+), 18 deletions(-)
 create mode 100644 src/plugin/migration.rs
 create mode 100644 test/testplug/testplug_w_migration/author.db
 create mode 100644 test/testplug/testplug_w_migration/book.db
 create mode 100644 test/testplug/testplug_w_migration/manifest.yaml

diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index 7f17b9ec99..050564d0c2 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -430,6 +430,10 @@ pub(super) fn action_plan<'i>(
             Some(plugin) if plugin.enabled => {
                 // plugin already exists - do nothing
             }
+            Some(plugin) if plugin.migration_list.len() as i32 - 1 != plugin.migration_progress => {
+                // migrations are only partially applied - do nothing
+                tlog!(Error, "Trying to enable a partially installed plugin (migrations are not fully applied)");
+            }
             None => {
                 // plugin isn't installed - do nothing
                 tlog!(Error, "Trying to enable a non-installed plugin");
diff --git a/src/luamod.rs b/src/luamod.rs
index c8f451d797..b681c5161f 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -1754,7 +1754,7 @@ pub(crate) fn setup() {
                         timeout = duration_from_secs_f64_clamped(t);
                     }
                 }
-                plugin::remove_plugin(&name, timeout)
+                plugin::remove_plugin(&name, timeout, false)
             })
         },
     );
diff --git a/src/plugin/migration.rs b/src/plugin/migration.rs
new file mode 100644
index 0000000000..5611877ea2
--- /dev/null
+++ b/src/plugin/migration.rs
@@ -0,0 +1,502 @@
+use crate::cas::Range;
+use crate::plugin::{do_plugin_cas, PLUGIN_DIR};
+use crate::schema::{PluginDef, ADMIN_ID};
+use crate::storage::ClusterwideTable;
+use crate::traft::node;
+use crate::traft::op::{Dml, Op};
+use crate::{error_injection, sql, tlog, traft};
+use sbroad::backend::sql::ir::PatternWithParams;
+use std::fs::File;
+use std::io;
+use std::io::{BufRead, BufReader, ErrorKind};
+use std::path::{Path, PathBuf};
+use tarantool::space::UpdateOps;
+use tarantool::time::Instant;
+
+const MIGRATION_FILE_EXT: &str = "db";
+
+#[derive(thiserror::Error, Debug)]
+pub enum Error {
+    #[error("File `{0}` invalid extension, `.db` expected")]
+    Extension(String),
+
+    #[error("Error while open migration file `{0}`: {1}")]
+    File(String, io::Error),
+
+    #[error("Invalid migration file format: {0}")]
+    InvalidMigrationFormat(&'static str),
+
+    #[error("Error while apply UP command `{0}`: {1}")]
+    Up(String, String),
+
+    #[error("Update migration progress: {0}")]
+    UpdateProgress(String),
+}
+
+#[derive(Debug, PartialEq)]
+enum Annotation {
+    Up,
+    Down,
+}
+
+#[derive(Debug, PartialEq)]
+enum MigrationLine {
+    Comment(String),
+    Annotation(Annotation),
+    String(String),
+}
+
+/// Parse migration data line by line.
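+///
+/// A migration file consists of SQL statements grouped under `-- pico.UP`
+/// and `-- pico.DOWN` annotations, for example (taken from the test
+/// fixture `author.db`):
+///
+/// ```text
+/// -- pico.UP
+/// CREATE TABLE author (id INTEGER NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id)) USING memtx DISTRIBUTED BY (id);
+///
+/// -- pico.DOWN
+/// DROP TABLE author;
+/// ```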
+struct MigrationLineParser<B: BufRead> {
+    inner: io::Lines<B>,
+}
+
+impl<B: BufRead> MigrationLineParser<B> {
+    fn new(lines: io::Lines<B>) -> Self {
+        Self { inner: lines }
+    }
+}
+
+impl MigrationLineParser<BufReader<File>> {
+    /// Construct a parser from a `.db` migration file.
+    fn from_file<P: AsRef<Path>>(filename: P) -> Result<Self, Error> {
+        let file_path = filename.as_ref();
+        let file = File::open(file_path)
+            .map_err(|e| Error::File(file_path.to_string_lossy().to_string(), e))?;
+        let lines = io::BufReader::new(file).lines();
+        Ok(Self::new(lines))
+    }
+}
+
+impl<B: BufRead> Iterator for MigrationLineParser<B> {
+    type Item = MigrationLine;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        fn extract_comment(line: &str) -> Option<&str> {
+            let (prefix, comment) = line.trim().split_once("--")?;
+            if !prefix.is_empty() {
+                return None;
+            };
+            Some(comment.trim())
+        }
+
+        for line in self.inner.by_ref() {
+            let line = match line {
+                Err(_) => {
+                    // ignore unreadable (e.g. non-UTF-8) lines
+                    continue;
+                }
+                Ok(line) => {
+                    if line.is_empty() {
+                        continue;
+                    }
+
+                    let maybe_comment = extract_comment(&line);
+
+                    if let Some(comment) = maybe_comment {
+                        match comment {
+                            "pico.UP" => MigrationLine::Annotation(Annotation::Up),
+                            "pico.DOWN" => MigrationLine::Annotation(Annotation::Down),
+                            _ => MigrationLine::Comment(comment.to_string()),
+                        }
+                    } else {
+                        MigrationLine::String(line)
+                    }
+                }
+            };
+            return Some(line);
+        }
+
+        None
+    }
+}
+
+/// Apply SQL from a migration file to the cluster.
+trait SqlApplier {
+    fn apply(&self, sql: &str) -> traft::Result<()>;
+}
+
+/// By default, SQL is applied via SBroad.
+struct SBroadApplier;
+
+impl SqlApplier for SBroadApplier {
+    fn apply(&self, sql: &str) -> traft::Result<()> {
+        let mut params = PatternWithParams::new(sql.to_string(), vec![]);
+        params.tracer = Some("stat".to_string());
+        sql::dispatch_sql_query(params.into()).map(|_| ())
+    }
+}
+
+fn up_single_file<B: BufRead>(
+    mut migration_iter: MigrationLineParser<B>,
+    applier: impl SqlApplier,
+) -> Result<(), Error> {
+    for line in migration_iter.by_ref() {
+        match line {
+            MigrationLine::Comment(_) => continue,
+            MigrationLine::Annotation(Annotation::Up) => break,
+            _ => {
+                return Err(Error::InvalidMigrationFormat(
+                    "no pico.UP annotation found at start of file",
+                ))
+            }
+        }
+    }
+
+    for line in migration_iter {
+        match line {
+            MigrationLine::Comment(_) => continue,
+            MigrationLine::Annotation(Annotation::Down) => return Ok(()),
+            MigrationLine::Annotation(Annotation::Up) => {
+                return Err(Error::InvalidMigrationFormat(
+                    "only single pico.UP annotation allowed",
+                ))
+            }
+            MigrationLine::String(sql) => {
+                if let Err(e) = applier.apply(&sql) {
+                    return Err(Error::Up(sql, e.to_string()));
+                }
+            }
+        }
+    }
+
+    Ok(())
+}
+
+/// Apply the UP part of the migration files. If applying any file fails,
+/// a rollback happens: the DOWN part is applied for the file that triggered
+/// the error and for all previously applied files.
+///
+/// # Arguments
+///
+/// * `plugin_name`: name of the plugin the migrations belong to
+/// * `migrations`: list of migration file names
+/// * `deadline`: deadline for applying the migrations
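+///
+/// After each successfully applied file, its 0-based index is persisted
+/// to the `migration_progress` field of `_pico_plugin` via a CaS update,
+/// so a partially applied migration can be detected afterwards.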
+pub fn up(
+    plugin_name: &str,
+    migrations: &[String],
+    deadline: Instant,
+) -> crate::plugin::Result<()> {
+    // check that all migration files exist and have the expected extension
+    let mut migration_files = vec![];
+    let plugin_dir = PLUGIN_DIR.with(|dir| dir.lock().clone()).join(plugin_name);
+    for file in migrations {
+        let migration_path = plugin_dir.join(file);
+
+        if error_injection::is_enabled("PLUGIN_MIGRATION_FIRST_FILE_INVALID_EXT") {
+            return Err(Error::Extension(file.to_string()).into());
+        }
+        if migration_path
+            .extension()
+            .and_then(|os_str| os_str.to_str())
+            != Some(MIGRATION_FILE_EXT)
+        {
+            return Err(Error::Extension(file.to_string()).into());
+        }
+
+        if !migration_path.exists() {
+            return Err(Error::File(
+                file.to_string(),
+                io::Error::new(ErrorKind::NotFound, "file not found"),
+            )
+            .into());
+        }
+
+        migration_files.push(migration_path.clone());
+    }
+
+    fn handle_err(to_revert: &[PathBuf]) {
+        let it = to_revert.iter().rev();
+        for f in it {
+            let iter = match MigrationLineParser::from_file(f) {
+                Ok(mi) => mi,
+                Err(e) => {
+                    tlog!(Error, "Rollback DOWN migration error: {e}");
+                    continue;
+                }
+            };
+
+            down_single_file(iter, SBroadApplier);
+        }
+    }
+
+    let node = node::global().expect("node must be already initialized");
+    for (num, db_file) in migration_files.iter().enumerate() {
+        if num == 1 && error_injection::is_enabled("PLUGIN_MIGRATION_SECOND_FILE_APPLY_ERROR") {
+            handle_err(&migration_files[..num + 1]);
+            return Err(Error::Up("".to_string(), "".to_string()).into());
+        }
+
+        let migration_iter = MigrationLineParser::from_file(db_file)?;
+        if let Err(e) = up_single_file(migration_iter, SBroadApplier) {
+            handle_err(&migration_files[..num + 1]);
+            return Err(e.into());
+        }
+
+        let mut enable_ops = UpdateOps::new();
+        enable_ops
+            .assign(PluginDef::FIELD_MIGRATION_PROGRESS, num)
+            .expect("serialization cannot fail");
+        let update_dml = Dml::update(
+            ClusterwideTable::Plugin,
+            &[plugin_name],
+            enable_ops,
+            ADMIN_ID,
+        )?;
+        let ranges = vec![Range::new(ClusterwideTable::Plugin).eq([plugin_name])];
+
+        if let Err(e) = do_plugin_cas(node, Op::Dml(update_dml), ranges, None, deadline) {
+            handle_err(&migration_files[..num + 1]);
+            return Err(Error::UpdateProgress(e.to_string()).into());
+        }
+    }
+
+    Ok(())
+}
+
+fn down_single_file<B: BufRead>(migration_iter: MigrationLineParser<B>, applier: impl SqlApplier) {
+    // skip everything until the pico.DOWN annotation is reached
+    let migration_iter = migration_iter
+        .skip_while(|line| !matches!(line, MigrationLine::Annotation(Annotation::Down)))
+        .skip(1);
+
+    for line in migration_iter {
+        match line {
+            MigrationLine::Comment(_) => continue,
+            MigrationLine::Annotation(_) => {
+                let e = Error::InvalidMigrationFormat(
+                    "only single pico.UP/pico.DOWN annotation allowed",
+                );
+                tlog!(Error, "Error while apply DOWN command: {e}");
+            }
+            MigrationLine::String(sql) => {
+                if let Err(e) = applier.apply(&sql) {
+                    tlog!(Error, "Error while apply DOWN command `{sql}`: {e}");
+                }
+            }
+        }
+    }
+}
+
+/// Apply the DOWN part of the migration files.
+///
+/// # Arguments
+///
+/// * `plugin_name`: name of the plugin the migrations belong to
+/// * `migrations`: list of migration file names
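+///
+/// Files are processed in reverse order. Errors are logged but do not
+/// interrupt processing of the remaining files.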
+pub fn down(plugin_name: &str, migrations: &[String]) {
+    let migrations = migrations.iter().rev();
+    for db_file in migrations {
+        let plugin_dir = PLUGIN_DIR.with(|dir| dir.lock().clone()).join(plugin_name);
+        let migration_path = plugin_dir.join(db_file);
+        let migration_iter = match MigrationLineParser::from_file(&migration_path) {
+            Ok(mi) => mi,
+            Err(e) => {
+                tlog!(Error, "Rollback DOWN migration error: {e}");
+                continue;
+            }
+        };
+
+        down_single_file(migration_iter, SBroadApplier);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::cell::RefCell;
+    use std::io::{BufRead, BufReader};
+    use std::rc::Rc;
+
+    #[test]
+    fn test_migration_line_parser() {
+        struct TestCase {
+            migration_data: &'static str,
+            expected_lines: Vec<MigrationLine>,
+        }
+        let test_cases = vec![
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+sql_command_1
+"#,
+                expected_lines: vec![
+                    MigrationLine::Annotation(Annotation::Up),
+                    MigrationLine::String("sql_command_1".to_string()),
+                ],
+            },
+            TestCase {
+                migration_data: r#"
+-- test comment
+
+-- pico.UP
+-- pico.DOWN
+
+sql_command_1
+-- test comment
+"#,
+                expected_lines: vec![
+                    MigrationLine::Comment("test comment".to_string()),
+                    MigrationLine::Annotation(Annotation::Up),
+                    MigrationLine::Annotation(Annotation::Down),
+                    MigrationLine::String("sql_command_1".to_string()),
+                    MigrationLine::Comment("test comment".to_string()),
+                ],
+            },
+        ];
+
+        for tc in test_cases {
+            let lines = BufReader::new(tc.migration_data.as_bytes()).lines();
+            let parser = MigrationLineParser::new(lines);
+            let parsing_res = parser.collect::<Vec<_>>();
+
+            assert_eq!(parsing_res, tc.expected_lines);
+        }
+    }
+
+    struct BufApplier {
+        poison_line: Option<&'static str>,
+        buf: Rc<RefCell<Vec<String>>>,
+    }
+
+    impl SqlApplier for BufApplier {
+        fn apply(&self, sql: &str) -> crate::traft::Result<()> {
+            if let Some(p) = self.poison_line {
+                if p == sql {
+                    return Err(crate::traft::error::Error::Other("test error".into()));
+                }
+            }
+            self.buf.borrow_mut().push(sql.to_string());
+            Ok(())
+        }
+    }
+
+    #[test]
+    fn test_migration_up() {
+        struct TestCase {
+            migration_data: &'static str,
+            poison_line: Option<&'static str>,
+            expected_applied_commands: Vec<&'static str>,
+            error: bool,
+        }
+        let test_cases = vec![
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+sql_command_1
+sql_command_2
+sql_command_3
+"#,
+                poison_line: None,
+                expected_applied_commands: vec!["sql_command_1", "sql_command_2", "sql_command_3"],
+                error: false,
+            },
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+sql_command_1
+sql_command_2
+sql_command_3
+"#,
+                poison_line: Some("sql_command_2"),
+                expected_applied_commands: vec!["sql_command_1"],
+                error: true,
+            },
+            TestCase {
+                migration_data: r#"
+-- pico.U
+sql_command_1
+"#,
+                poison_line: None,
+                expected_applied_commands: vec![],
+                error: true,
+            },
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+sql_command_1
+-- pico.UP
+sql_command_2
+"#,
+                poison_line: None,
+                expected_applied_commands: vec!["sql_command_1"],
+                error: true,
+            },
+        ];
+
+        for tc in test_cases {
+            let lines = BufReader::new(tc.migration_data.as_bytes()).lines();
+            let iter = MigrationLineParser::new(lines);
+            let buf = Rc::new(RefCell::new(vec![]));
+            let applier = BufApplier {
+                buf: buf.clone(),
+                poison_line: tc.poison_line,
+            };
+            let result = up_single_file(iter, applier);
+
+            assert_eq!(tc.error, result.is_err());
+            assert_eq!(
+                buf.borrow().iter().map(|s| s.as_str()).collect::<Vec<_>>(),
+                tc.expected_applied_commands
+            );
+        }
+    }
+
+    #[test]
+    fn test_migration_down() {
+        struct TestCase {
+            migration_data: &'static str,
+            poison_line: Option<&'static str>,
+            expected_applied_commands: Vec<&'static str>,
+        }
+        let test_cases = vec![
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+sql_command_1
+-- pico.DOWN
+sql_command_2
+sql_command_3
+"#,
+                poison_line: None,
+                expected_applied_commands: vec!["sql_command_2", "sql_command_3"],
+            },
+            TestCase {
+                migration_data: r#"
+-- pico.UP
+-- pico.DOWN
+sql_command_1
+sql_command_2
+sql_command_3
+"#,
+                poison_line: Some("sql_command_2"),
+                expected_applied_commands: vec!["sql_command_1", "sql_command_3"],
+            },
+            TestCase {
+                migration_data: r#"
+-- pico.DOWN
+sql_command_1
+-- pico.DOWN
+sql_command_2
+"#,
+                poison_line: None,
+                expected_applied_commands: vec!["sql_command_1", "sql_command_2"],
+            },
+        ];
+
+        for tc in test_cases {
+            let lines = BufReader::new(tc.migration_data.as_bytes()).lines();
+            let iter = MigrationLineParser::new(lines);
+            let buf = Rc::new(RefCell::new(vec![]));
+            let applier = BufApplier {
+                buf: buf.clone(),
+                poison_line: tc.poison_line,
+            };
+            down_single_file(iter, applier);
+            assert_eq!(
+                buf.borrow().iter().map(|s| s.as_str()).collect::<Vec<_>>(),
+                tc.expected_applied_commands
+            );
+        }
+    }
+}
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index 03e56fe006..9afc125597 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -1,4 +1,5 @@
 pub mod manager;
+pub mod migration;
 pub mod topology;
 
 use crate::schema::{PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey, ADMIN_ID};
@@ -24,7 +25,7 @@ use crate::traft::node::Node;
 use crate::traft::op::{Dml, Op};
 use crate::traft::{node, RaftIndex};
 use crate::util::effective_user_id;
-use crate::{cas, error_injection, traft};
+use crate::{cas, error_injection, tlog, traft};
 
 const DEFAULT_PLUGIN_DIR: &'static str = "/usr/share/picodata/";
 
@@ -80,6 +81,8 @@ pub enum PluginError {
     TopologyError(String),
     #[error("Found more than one service factory for `{0}` ver. `{1}`")]
     ServiceCollision(String, String),
+    #[error(transparent)]
+    Migration(#[from] migration::Error),
 }
 
 #[derive(thiserror::Error, Debug)]
@@ -122,6 +125,9 @@ pub struct Manifest {
     pub version: String,
     /// Plugin services
     services: Vec<ServiceManifest>,
+    /// List of plugin migration files, applied in order on install.
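+    ///
+    /// Listed in `manifest.yaml`, for example:
+    ///
+    /// ```yaml
+    /// migration:
+    ///   - author.db
+    ///   - book.db
+    /// ```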
+    #[serde(default)]
+    pub migration: Vec<String>,
 }
 
 impl Manifest {
@@ -160,6 +166,8 @@ impl Manifest {
                 .collect(),
             version: self.version.to_string(),
             description: self.description.to_string(),
+            migration_list: self.migration.clone(),
+            migration_progress: -1,
         }
     }
 
@@ -173,7 +181,6 @@ impl Manifest {
                 tiers: vec![],
                 configuration: srv.default_configuration.clone(),
                 version: self.version.to_string(),
-                schema_version: 0,
                 description: srv.description.to_string(),
             })
             .collect()
@@ -412,7 +419,7 @@ pub fn install_plugin(name: &str, timeout: Duration) -> traft::Result<()> {
 
     let dml = Dml::replace(
         ClusterwideTable::Property,
-        &(&PropertyName::PluginInstall, manifest),
+        &(&PropertyName::PluginInstall, &manifest),
         effective_user_id(),
     )?;
 
@@ -429,8 +436,20 @@ pub fn install_plugin(name: &str, timeout: Duration) -> traft::Result<()> {
         index = node.read_index(deadline.duration_since(Instant::now()))?;
     }
 
-    if !node.storage.plugin.contains(name)? {
+    let Some(plugin) = node.storage.plugin.get(name)? else {
         return Err(PluginError::InstallationAborted.into());
+    };
+
+    if error_injection::is_enabled("PLUGIN_MIGRATION_CLIENT_DOWN") {
+        return Ok(());
+    }
+
+    let migration_result = migration::up(&plugin.name, &manifest.migration, deadline);
+    if let Err(e) = migration_result {
+        if let Err(err) = remove_plugin(&plugin.name, Duration::from_secs(2), true) {
+            tlog!(Error, "rollback plugin installation error: {err}");
+        }
+        return Err(e.into());
     }
 
     Ok(())
@@ -550,10 +569,17 @@ pub fn disable_plugin(plugin_name: &str, timeout: Duration) -> traft::Result<()>
 }
 
 /// Remove plugin: clear records from `_pico_plugin` and `_pico_service` system tables.
-pub fn remove_plugin(plugin_name: &str, timeout: Duration) -> traft::Result<()> {
+///
+/// # Arguments
+///
+/// * `plugin_name`: plugin name to remove
+/// * `timeout`: operation timeout
+/// * `force`: if true, remove the plugin without applying DOWN migrations
+pub fn remove_plugin(plugin_name: &str, timeout: Duration, force: bool) -> traft::Result<()> {
     let deadline = Instant::now().saturating_add(timeout);
 
     let node = node::global()?;
+    let maybe_plugin = node.storage.plugin.get(plugin_name)?;
     // we check this condition on any instance, this will allow
     // to return an error in most situations, but there are still
     // situations when instance is a follower and has not yet received up-to-date
@@ -561,8 +587,7 @@ pub fn remove_plugin(plugin_name: &str, timeout: Duration) -> traft::Result<()>
     // the error will not be returned to client and raft op
     // must be applied on instances correctly (op should ignore removing if
     // plugin exists and enabled)
-    let plugin_exists_and_enabled =
-        node.storage.plugin.get(plugin_name)?.map(|p| p.enabled) == Some(true);
+    let plugin_exists_and_enabled = maybe_plugin.as_ref().map(|p| p.enabled) == Some(true);
     if plugin_exists_and_enabled && !error_injection::is_enabled("PLUGIN_EXIST_AND_ENABLED") {
         return Err(RemoveOfEnabledPlugin.into());
     }
@@ -579,8 +604,15 @@ pub fn remove_plugin(plugin_name: &str, timeout: Duration) -> traft::Result<()>
         ],
         None,
         deadline,
-    )
-    .map(|_| ())
+    )?;
+
+    if !force {
+        if let Some(plugin) = maybe_plugin {
+            migration::down(&plugin.name, &plugin.migration_list);
+        }
+    }
+
+    Ok(())
 }
 
 #[derive(Serialize, Deserialize, PartialEq, Clone, Debug)]
diff --git a/src/schema.rs b/src/schema.rs
index 85f1e79438..8241fe20e5 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -433,6 +433,10 @@ pub struct PluginDef {
     pub version: String,
     /// Plugin description
     pub description: String,
+    /// List of migration files.
+    pub migration_list: Vec<String>,
+    /// 0-based index of the last applied migration file (-1 if no migrations have been applied).
+    pub migration_progress: i32,
 }
 
 impl Encode for PluginDef {}
@@ -440,6 +444,8 @@ impl Encode for PluginDef {}
 impl PluginDef {
     /// Index (0-based) of field "enable" in the _pico_plugin table format.
     pub const FIELD_ENABLE: usize = 1;
+    /// Index (0-based) of field "migration_progress" in the _pico_plugin table format.
+    pub const FIELD_MIGRATION_PROGRESS: usize = 6;
 
     /// Format of the _pico_plugin global table.
     #[inline(always)]
@@ -451,6 +457,8 @@ impl PluginDef {
             Field::from(("services", FieldType::Array)),
             Field::from(("version", FieldType::String)),
             Field::from(("description", FieldType::String)),
+            Field::from(("migration_list", FieldType::Array)),
+            Field::from(("migration_progress", FieldType::Integer)),
         ]
     }
 
@@ -462,6 +470,8 @@ impl PluginDef {
             services: vec!["service_1".to_string(), "service_2".to_string()],
             version: "0.0.1".into(),
             description: "description".to_string(),
+            migration_list: vec![],
+            migration_progress: -1,
         }
     }
 }
@@ -484,9 +494,6 @@ pub struct ServiceDef {
     pub tiers: Vec<String>,
     /// Current service configuration.
     pub configuration: rmpv::Value,
-    /// Schema version.
-    // FIXME: for future improvements
-    pub schema_version: u64,
     /// Plugin description
     pub description: String,
 }
@@ -504,7 +511,6 @@ impl ServiceDef {
             Field::from(("version", FieldType::String)),
             Field::from(("tiers", FieldType::Array)),
             Field::from(("configuration", FieldType::Any)),
-            Field::from(("schema_version", FieldType::Unsigned)),
             Field::from(("description", FieldType::String)),
         ]
     }
@@ -517,7 +523,6 @@ impl ServiceDef {
             version: "0.0.1".into(),
             tiers: vec!["t1".to_string(), "t2".to_string()],
             configuration: rmpv::Value::Boolean(false),
-            schema_version: 1,
             description: "description".to_string(),
         }
     }
@@ -2624,6 +2629,8 @@ mod test {
         let tuple_data = p.to_tuple_buffer().unwrap();
         let format = PluginDef::format();
         crate::util::check_tuple_matches_format(tuple_data.as_ref(), &format, "PluginDef::format");
+        assert_eq!(format[PluginDef::FIELD_ENABLE].name, "enabled");
+        assert_eq!(format[PluginDef::FIELD_MIGRATION_PROGRESS].name, "migration_progress");
     }
 
     #[test]
diff --git a/src/sql.rs b/src/sql.rs
index ef671b4147..7e97897205 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -282,6 +282,10 @@ pub fn with_tracer(ctx: Context, tracer_kind: TracerKind) -> Context {
 /// Dispatches a query to the cluster.
 #[proc(packed_args)]
 pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result<Tuple> {
+    dispatch_sql_query(encoded_params)
+}
+
+pub fn dispatch_sql_query(encoded_params: EncodedPatternWithParams) -> traft::Result<Tuple> {
     let mut params = PatternWithParams::try_from(encoded_params)?;
     let id = params.clone_id();
     let mut ctx = params.extract_context();
diff --git a/test/conftest.py b/test/conftest.py
index 3631ca37d4..f345a7508f 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -1713,6 +1713,7 @@ def binary_path(cargo_build: None) -> str:
         f"{test_dir}/testplug/testplug_broken_manifest_3/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_small/libtestplug.{ext}",
         f"{test_dir}/testplug/testplug_small_svc2/libtestplug.{ext}",
+        f"{test_dir}/testplug/testplug_w_migration/libtestplug.{ext}",
     ]
     for destination in destinations:
         eprint(f"Copying '{source}' to '{destination}'")
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 4d27b09200..ce37029038 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -452,9 +452,9 @@ Insert({_pico_index}, [{_pico_tier},0,"_pico_tier_name","tree",[{{"unique":true}
 Insert({_pico_table}, [{_pico_routine},"_pico_routine",{{"Global":null}},[{{"field_type":"unsigned","is_nullable":false,"name":"id"}},{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"string","is_nullable":false,"name":"kind"}},{{"field_type":"array","is_nullable":false,"name":"params"}},{{"field_type":"array","is_nullable":false,"name":"returns"}},{{"field_type":"string","is_nullable":false,"name":"language"}},{{"field_type":"string","is_nullable":false,"name":"body"}},{{"field_type":"string","is_nullable":false,"name":"security"}},{{"field_type":"boolean","is_nullable":false,"name":"operable"}},{{"field_type":"unsigned","is_nullable":false,"name":"schema_version"}},{{"field_type":"unsigned","is_nullable":false,"name":"owner"}}],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_routine},0,"_pico_routine_id","tree",[{{"unique":true}}],[["id",null,null,null,null]],true,0,1]),
 Insert({_pico_index}, [{_pico_routine},1,"_pico_routine_name","tree",[{{"unique":true}}],[["name",null,null,null,null]],true,0,1]),
-Insert({_pico_table}, [{_pico_plugin},"_pico_plugin",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"boolean","is_nullable":false,"name":"enabled"}},{{"field_type":"array","is_nullable":false,"name":"services"}},{{"field_type":"string","is_nullable":false,"name":"version"}},{{"field_type":"string","is_nullable":false,"name":"description"}}],0,true,"memtx",1]),
+Insert({_pico_table}, [{_pico_plugin},"_pico_plugin",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"boolean","is_nullable":false,"name":"enabled"}},{{"field_type":"array","is_nullable":false,"name":"services"}},{{"field_type":"string","is_nullable":false,"name":"version"}},{{"field_type":"string","is_nullable":false,"name":"description"}},{{"field_type":"array","is_nullable":false,"name":"migration_list"}},{{"field_type":"integer","is_nullable":false,"name":"migration_progress"}}],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_plugin},0,"_pico_plugin_name","tree",[{{"unique":true}}],[["name",null,null,null,null]],true,0,1]),
-Insert({_pico_table}, [{_pico_service},"_pico_service",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"plugin_name"}},{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"string","is_nullable":false,"name":"version"}},{{"field_type":"array","is_nullable":false,"name":"tiers"}},{{"field_type":"any","is_nullable":false,"name":"configuration"}},{{"field_type":"unsigned","is_nullable":false,"name":"schema_version"}},{{"field_type":"string","is_nullable":false,"name":"description"}}],0,true,"memtx",1]),
+Insert({_pico_table}, [{_pico_service},"_pico_service",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"plugin_name"}},{{"field_type":"string","is_nullable":false,"name":"name"}},{{"field_type":"string","is_nullable":false,"name":"version"}},{{"field_type":"array","is_nullable":false,"name":"tiers"}},{{"field_type":"any","is_nullable":false,"name":"configuration"}},{{"field_type":"string","is_nullable":false,"name":"description"}}],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_service},0,"_pico_service_name","tree",[{{"unique":true}}],[["plugin_name",null,null,null,null],["name",null,null,null,null],["version",null,null,null,null]],true,0,1]),
 Insert({_pico_table}, [{_pico_service_route},"_pico_service_route",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"instance_id"}},{{"field_type":"string","is_nullable":false,"name":"plugin_name"}},{{"field_type":"string","is_nullable":false,"name":"service_name"}},{{"field_type":"boolean","is_nullable":false,"name":"poison"}}],0,true,"memtx",1]),
 Insert({_pico_index}, [{_pico_service_route},0,"_pico_service_routing_key","tree",[{{"unique":true}}],[["instance_id",null,null,null,null],["plugin_name",null,null,null,null],["service_name",null,null,null,null]],true,0,1])
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index 50fa2b992f..d212d2be3d 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -1,6 +1,7 @@
 import pytest
 import time
-from conftest import Cluster, ReturnError, retrying, Instance
+from conftest import Cluster, ReturnError, retrying, Instance, TarantoolError
+from typing import Any, Optional
 
 _3_SEC = 3
 _DEFAULT_CFG = {"foo": True, "bar": 101, "baz": ["one", "two", "three"]}
@@ -13,6 +14,8 @@ _PLUGIN_SMALL = "testplug_small"
 _PLUGIN_SMALL_SERVICES = ["testservice_1"]
 _PLUGIN_SMALL_SERVICES_SVC2 = ["testservice_2"]
 _DEFAULT_TIER = "default"
+_PLUGIN_WITH_MIGRATION = "testplug_w_migration"
+_PLUGIN_WITH_MIGRATION_SERVICES = ["testservice_2"]
 
 
 # ---------------------------------- Test helper classes {-----------------------------------------
@@ -33,6 +36,8 @@ class PluginReflection:
     installed: bool = False
     # if True - assert_synced checks that plugin are enabled
     enabled: bool = False
+    # plugin data: table name -> expected tuples (None if the table must not exist)
+    data: dict[str, Optional[list[Any]]] = {}
 
     def __init__(self, name: str, services: list[str], *instances):
         """Create reflection with empty topology"""
@@ -68,6 +73,10 @@ class PluginReflection:
         self.instances.append(i)
         return self
 
+    def set_data(self, data: dict[str, Optional[list[Any]]]):
+        self.data = data
+        return self
+
     def assert_synced(self):
         """Assert that plugin reflection and plugin state in cluster are synchronized.
         This means that system tables `_pico_plugin`, `_pico_service` and `_pico_service_route`
@@ -102,6 +111,20 @@ class PluginReflection:
                 )
                 assert routes == expected_services
 
+    def assert_data_synced(self):
+        for table in self.data:
+            data = []
+
+            for i in self.instances:
+                if self.data[table] is None:
+                    with pytest.raises(TarantoolError, match="attempt to index field"):
+                        i.eval(f"return box.space.{table}:select()")
+                else:
+                    data += i.eval(f"return box.space.{table}:select()")
+
+            if self.data[table] is not None:
+                assert sorted(data) == sorted(self.data[table])
+
     @staticmethod
     def assert_cb_called(service, callback, called_times, *instances):
         for i in instances:
@@ -559,6 +582,101 @@ def test_plugin_not_enable_if_on_start_timeout(cluster: Cluster):
     plugin_ref.assert_cb_called("testservice_1", "on_stop", 2, i1, i2)
 
 
+# -------------------------- migration tests -------------------------------------
+
+
+_DATA = {
+    "AUTHOR": [
+        [1, "Alexander Pushkin"],
+        [2, "Alexander Blok"],
+    ],
+    "BOOK": [
+        [1, "Ruslan and Ludmila"],
+        [2, "The Tale of Tsar Saltan"],
+        [3, "The Twelve"],
+        [4, "The Lady Unknown"],
+    ],
+}
+
+_NO_DATA: dict[str, None] = {
+    "AUTHOR": None,
+    "BOOK": None,
+}
+
+
+def test_migration_on_plugin_install(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+    expected_state = PluginReflection(
+        _PLUGIN_WITH_MIGRATION, _PLUGIN_WITH_MIGRATION_SERVICES, i1, i2
+    )
+
+    i1.call("pico.install_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(True).set_data(_DATA)
+    expected_state.assert_synced()
+    expected_state.assert_data_synced()
+
+    i1.call("pico.remove_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(False).set_data(_NO_DATA)
+    expected_state.assert_synced()
+    expected_state.assert_data_synced()
+
+
+def test_migration_file_invalid_ext(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+    expected_state = PluginReflection(
+        _PLUGIN_WITH_MIGRATION, _PLUGIN_WITH_MIGRATION_SERVICES, i1, i2
+    )
+
+    # the first file in the migration list has an invalid extension
+    i1.call("pico._inject_error", "PLUGIN_MIGRATION_FIRST_FILE_INVALID_EXT", True)
+
+    with pytest.raises(ReturnError, match="invalid extension"):
+        i1.call("pico.install_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(False)
+    expected_state.assert_synced()
+
+
+def test_migration_apply_err(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+    expected_state = PluginReflection(
+        _PLUGIN_WITH_MIGRATION, _PLUGIN_WITH_MIGRATION_SERVICES, i1, i2
+    )
+
+    # the second file in the migration list fails to apply
+    i1.call("pico._inject_error", "PLUGIN_MIGRATION_SECOND_FILE_APPLY_ERROR", True)
+
+    with pytest.raises(ReturnError, match="Error while apply UP command"):
+        i1.call("pico.install_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(False).set_data(_NO_DATA)
+    expected_state.assert_synced()
+    expected_state.assert_data_synced()
+
+
+def test_migration_client_down(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2)
+    expected_state = PluginReflection(
+        _PLUGIN_WITH_MIGRATION, _PLUGIN_WITH_MIGRATION_SERVICES, i1, i2
+    )
+
+    # the client goes down while migrations are being applied
+    i1.call("pico._inject_error", "PLUGIN_MIGRATION_CLIENT_DOWN", True)
+
+    i1.call("pico.install_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(True)
+    expected_state.assert_synced()
+
+    with pytest.raises(ReturnError, match="Error while enable the plugin"):
+        i1.call("pico.enable_plugin", _PLUGIN_WITH_MIGRATION)
+
+    i1.call("pico.remove_plugin", _PLUGIN_WITH_MIGRATION, timeout=5)
+    expected_state = expected_state.install(False).set_data(_NO_DATA)
+    expected_state.assert_synced()
+    expected_state.assert_data_synced()
+
+
+# -------------------------- configuration tests -------------------------------------
+
+
 def test_config_validation(cluster: Cluster):
     i1, i2 = cluster.deploy(instance_count=2)
     plugin_ref = PluginReflection.default(i1, i2)
@@ -707,6 +825,9 @@ def test_instance_service_poison_and_healthy_then(cluster: Cluster):
     plugin_ref.assert_config("testservice_1", _NEW_CFG_2, i1, i2)
 
 
+# -------------------------- leader change test -----------------------------------
+
+
 def test_on_leader_change(cluster: Cluster):
     i1 = cluster.add_instance(replicaset_id="r1", wait_online=True)
     i2 = cluster.add_instance(replicaset_id="r1", wait_online=True)
diff --git a/test/testplug/testplug_w_migration/author.db b/test/testplug/testplug_w_migration/author.db
new file mode 100644
index 0000000000..f9eb141c0a
--- /dev/null
+++ b/test/testplug/testplug_w_migration/author.db
@@ -0,0 +1,7 @@
+-- pico.UP
+CREATE TABLE author (id INTEGER NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id)) USING memtx DISTRIBUTED BY (id);
+INSERT INTO author (id, name) VALUES (1, 'Alexander Pushkin');
+INSERT INTO author (id, name) VALUES (2, 'Alexander Blok');
+
+-- pico.DOWN
+DROP TABLE author;
diff --git a/test/testplug/testplug_w_migration/book.db b/test/testplug/testplug_w_migration/book.db
new file mode 100644
index 0000000000..cd4fa145ec
--- /dev/null
+++ b/test/testplug/testplug_w_migration/book.db
@@ -0,0 +1,9 @@
+-- pico.UP
+CREATE TABLE book (id INTEGER NOT NULL, name TEXT NOT NULL, PRIMARY KEY (id)) USING memtx DISTRIBUTED BY (id);
+INSERT INTO book (id, name) VALUES (1, 'Ruslan and Ludmila');
+INSERT INTO book (id, name) VALUES (2, 'The Tale of Tsar Saltan');
+INSERT INTO book (id, name) VALUES (3, 'The Twelve');
+INSERT INTO book (id, name) VALUES (4, 'The Lady Unknown');
+
+-- pico.DOWN
+DROP TABLE book;
diff --git a/test/testplug/testplug_w_migration/manifest.yaml b/test/testplug/testplug_w_migration/manifest.yaml
new file mode 100644
index 0000000000..e0cfcd1758
--- /dev/null
+++ b/test/testplug/testplug_w_migration/manifest.yaml
@@ -0,0 +1,10 @@
+description: plugin for test purposes
+name: testplug_w_migration
+version: 0.1.0
+services:
+  - name: testservice_2
+    description: testservice_2 descr
+    default_configuration:
+migration:
+  - author.db
+  - book.db
-- 
GitLab