From e6a0946b257f97473934bdfa283499bf0ace151d Mon Sep 17 00:00:00 2001 From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io> Date: Mon, 7 Nov 2022 14:25:46 +0300 Subject: [PATCH] feat: add Migrations space and picolib.add_migration() --- src/main.rs | 12 +++++++++- src/traft/mod.rs | 10 ++++++++ src/traft/storage.rs | 48 +++++++++++++++++++++++++++++++++++++- test/int/test_migration.py | 10 ++++++++ 4 files changed, 78 insertions(+), 2 deletions(-) create mode 100644 test/int/test_migration.py diff --git a/src/main.rs b/src/main.rs index 932547dc9f..3200c1c1ba 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ use clap::StructOpt as _; use protobuf::Message as _; use crate::tlog::set_log_level; -use crate::traft::InstanceId; +use crate::traft::{node, InstanceId, Migration, OpDML}; use crate::traft::{LogicalClock, RaftIndex, TargetGrade}; use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; use traft::error::Error; @@ -387,6 +387,16 @@ fn picolib_setup(args: &args::Run) { }, ), ); + + luamod.set( + "add_migration", + tlua::function2(|id: u64, body: String| -> traft::Result<()> { + let migration = Migration { id, body }; + let op = OpDML::insert(ClusterSpace::Migrations, &migration)?; + node::global()?.propose_and_wait(op, Duration::MAX)??; + Ok(()) + }), + ); } fn preload_vshard() { diff --git a/src/traft/mod.rs b/src/traft/mod.rs index cca1291141..034309c17f 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -984,3 +984,13 @@ impl<'a> IntoIterator for &'a FailureDomain { self.data.iter() } } + +//////////////////////////////////////////////////////////////////////////////// +/// Migration +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct Migration { + pub id: u64, + pub body: String, +} + +impl Encode for Migration {} diff --git a/src/traft/storage.rs b/src/traft/storage.rs index e96e34059b..be6df51999 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -14,7 +14,7 @@ use crate::traft::Result; use std::marker::PhantomData; -use super::RaftSpaceAccess; +use super::{Migration, RaftSpaceAccess}; //////////////////////////////////////////////////////////////////////////////// // ClusterSpace @@ -26,6 +26,7 @@ define_str_enum! { Group = "raft_group", State = "cluster_state", Replicasets = "replicasets", + Migrations = "migrations", } FromStr::Err = UnknownClusterSpace; @@ -100,6 +101,7 @@ pub struct Storage { pub peers: Peers, pub replicasets: Replicasets, pub raft: RaftSpaceAccess, + pub migrations: Migrations, } impl Storage { @@ -109,6 +111,7 @@ impl Storage { peers: Peers::new()?, replicasets: Replicasets::new()?, raft: RaftSpaceAccess::new()?, + migrations: Migrations::new()?, }) } } @@ -640,6 +643,49 @@ where } } +//////////////////////////////////////////////////////////////////////////////// +// Migrations +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug)] +pub struct Migrations { + #[allow(dead_code)] + space: Space, +} + +impl Migrations { + const SPACE_NAME: &'static str = ClusterSpace::Migrations.as_str(); + const INDEX_PRIMARY: &'static str = "pk"; + + pub fn new() -> tarantool::Result<Self> { + let space = Space::builder(Self::SPACE_NAME) + .is_local(true) + .is_temporary(false) + .field(("id", FieldType::Unsigned)) + .field(("body", FieldType::String)) + .if_not_exists(true) + .create()?; + + space + .index_builder(Self::INDEX_PRIMARY) + .unique(true) + .part("id") + .if_not_exists(true) + .create()?; + + Ok(Self { space }) + } + + #[allow(dead_code)] + #[inline] + pub fn get(&self, id: u64) -> tarantool::Result<Option<Migration>> { + match self.space.get(&[id])? { + Some(tuple) => tuple.decode().map(Some), + None => Ok(None), + } + } +} + //////////////////////////////////////////////////////////////////////////////// // tests //////////////////////////////////////////////////////////////////////////////// diff --git a/test/int/test_migration.py b/test/int/test_migration.py new file mode 100644 index 0000000000..425f505f13 --- /dev/null +++ b/test/int/test_migration.py @@ -0,0 +1,10 @@ +from conftest import Cluster + + +def test_add_migration(cluster: Cluster): + cluster.deploy(instance_count=2) + i1, i2 = cluster.instances + i1.promote_or_fail() + i1.eval("picolib.add_migration(1, 'migration body')") + migrations_table = i2.call("box.space.migrations:select") + assert [[1, "migration body"]] == migrations_table -- GitLab