From 1cfb9d750d7819f19ac4f4453b5edb53da490620 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 23 May 2023 14:48:01 +0300
Subject: [PATCH] fix: remove migrations completely

They're being replaced with ddl create_space, etc.
---
 src/governor/migration.rs        |  77 -------------------
 src/governor/mod.rs              |  34 ---------
 src/governor/plan.rs             |  26 -------
 src/lib.rs                       |  72 +-----------------
 src/rpc/migration.rs             |  41 -----------
 src/rpc/mod.rs                   |   1 -
 src/storage.rs                   | 122 +------------------------------
 src/traft/mod.rs                 |  10 ---
 test/int/test_migration.py       |  70 ------------------
 test/int/test_sql.py             |   7 --
 test/manual/sql/test_sql_perf.py |  21 +++---
 11 files changed, 11 insertions(+), 470 deletions(-)
 delete mode 100644 src/governor/migration.rs
 delete mode 100644 src/rpc/migration.rs
 delete mode 100644 test/int/test_migration.py

diff --git a/src/governor/migration.rs b/src/governor/migration.rs
deleted file mode 100644
index f7aef01358..0000000000
--- a/src/governor/migration.rs
+++ /dev/null
@@ -1,77 +0,0 @@
-use crate::replicaset::Replicaset;
-
-pub(crate) fn get_pending_migration<'r>(
-    mut migration_ids: Vec<u64>,
-    replicasets: &[&'r Replicaset],
-    desired_schema_version: u64,
-) -> Option<(u64, &'r Replicaset)> {
-    migration_ids.sort();
-    for m_id in migration_ids {
-        for r in replicasets {
-            if r.current_schema_version < m_id && m_id <= desired_schema_version {
-                return Some((m_id, r));
-            }
-        }
-    }
-    None
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[allow(non_snake_case)]
-    fn R(rid: &str, mid: u64) -> Replicaset {
-        Replicaset {
-            replicaset_id: rid.into(),
-            replicaset_uuid: Default::default(),
-            master_id: String::default().into(),
-            weight: Default::default(),
-            current_schema_version: mid,
-        }
-    }
-
-    #[test]
-    fn test_waiting_migrations() {
-        let ms = vec![3, 2, 1];
-        let rs = [R("r1", 0), R("r2", 2), R("r3", 1)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        assert_eq!(get_pending_migration(ms.clone(), &rs, 0), None);
-
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 1).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (1, "r1"));
-
-        let rs = [R("r1", 1), R("r2", 2), R("r3", 1)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        assert_eq!(get_pending_migration(ms.clone(), &rs, 1), None);
-
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 2).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (2, "r1"));
-
-        let rs = [R("r1", 2), R("r2", 2), R("r3", 1)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 2).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (2, "r3"));
-
-        let rs = [R("r1", 2), R("r2", 2), R("r3", 2)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        assert_eq!(get_pending_migration(ms.clone(), &rs, 2), None);
-
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 3).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (3, "r1"));
-
-        let rs = [R("r1", 3), R("r2", 2), R("r3", 2)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 99).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (3, "r2"));
-
-        let rs = [R("r1", 3), R("r2", 3), R("r3", 2)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        let (m, r) = get_pending_migration(ms.clone(), &rs, 99).unwrap();
-        assert_eq!((m, &*r.replicaset_id), (3, "r3"));
-
-        let rs = [R("r1", 3), R("r2", 3), R("r3", 3)];
-        let rs = rs.iter().collect::<Vec<_>>();
-        assert_eq!(get_pending_migration(ms.clone(), &rs, 99), None);
-    }
-}
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 100e3944c9..5933ab6667 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -6,7 +6,6 @@
 use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
 use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::fiber::r#async::watch;
-use crate::event::{self, Event};
 use crate::instance::Instance;
 use crate::op::Op;
 use crate::r#loop::FlowControl::{self, Continue};
@@ -25,7 +24,6 @@ use crate::unwrap_ok_or;
 use futures::future::try_join_all;
 
 pub(crate) mod cc;
-pub(crate) mod migration;
 pub(crate) mod plan;
 
 use plan::action_plan;
@@ -59,7 +57,6 @@ impl Loop {
            .iter()
            .map(|rs| (&rs.replicaset_id, rs))
            .collect();
-        let migration_ids = storage.migrations.iter().unwrap().map(|m| m.id).collect();
 
         let term = status.get().term;
         let applied = raft_storage.applied().unwrap().unwrap();
@@ -67,7 +64,6 @@ impl Loop {
         let node = global().expect("must be initialized");
         let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
         let replication_factor = storage.properties.replication_factor().unwrap();
-        let desired_schema_version = storage.properties.desired_schema_version().unwrap();
         let pending_schema_change = storage.properties.pending_schema_change().unwrap();
         let has_pending_schema_change = pending_schema_change.is_some();
 
@@ -79,11 +75,9 @@ impl Loop {
            &voters,
            &learners,
            &replicasets,
-            migration_ids,
            node.raft_id,
            vshard_bootstrapped,
            replication_factor,
-            desired_schema_version,
            has_pending_schema_change,
        );
        let plan = unwrap_ok_or!(plan,
@@ -499,34 +493,6 @@ impl Loop {
                }
            }
 
-            Plan::ApplyMigration(ApplyMigration { target, rpc, op }) => {
-                let migration_id = rpc.migration_id;
-                governor_step! {
-                    "applying migration on a replicaset" [
-                        "replicaset_id" => %target.replicaset_id,
-                        "migration_id" => %migration_id,
-                    ]
-                    async {
-                        pool
-                            .call(&target.master_id, &rpc)?
-                            .timeout(Loop::SYNC_TIMEOUT)
-                            .await?;
-                    }
-                }
-
-                governor_step! {
-                    "proposing replicaset current schema version change" [
-                        "replicaset_id" => %target.replicaset_id,
-                        "schema_version" => %migration_id,
-                    ]
-                    async {
-                        node.propose_and_wait(op, Duration::from_secs(3))??
-                    }
-                }
-
-                event::broadcast(Event::MigrateDone);
-            }
-
            Plan::None => {
                tlog!(Info, "nothing to do, waiting for events to handle");
                _ = waker.changed().await;
diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index bb4b03119e..4e96eeb1b1 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -13,7 +13,6 @@ use ::tarantool::space::UpdateOps;
 use std::collections::HashMap;
 
 use super::cc::raft_conf_change;
-use super::migration::get_pending_migration;
 use super::Loop;
 
 #[allow(clippy::too_many_arguments)]
@@ -25,11 +24,9 @@ pub(super) fn action_plan<'i>(
     voters: &[RaftId],
     learners: &[RaftId],
     replicasets: &HashMap<&ReplicasetId, &'i Replicaset>,
-    migration_ids: Vec<u64>,
     my_raft_id: RaftId,
     vshard_bootstrapped: bool,
     replication_factor: usize,
-    desired_schema_version: u64,
     has_pending_schema_change: bool,
 ) -> Result<Plan<'i>> {
     ////////////////////////////////////////////////////////////////////////////
@@ -360,23 +357,6 @@ pub(super) fn action_plan<'i>(
         return Ok(ApplySchemaChange { rpc, targets }.into());
     }
 
-    ////////////////////////////////////////////////////////////////////////////
-    // migration
-    let replicasets: Vec<_> = replicasets.values().copied().collect();
-    let to_apply = get_pending_migration(migration_ids, &replicasets, desired_schema_version);
-    if let Some((migration_id, target)) = to_apply {
-        let rpc = rpc::migration::apply::Request {
-            term,
-            applied,
-            timeout: Loop::SYNC_TIMEOUT,
-            migration_id,
-        };
-        let mut ops = UpdateOps::new();
-        ops.assign("current_schema_version", migration_id)?;
-        let op = Dml::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?;
-        return Ok(ApplyMigration { target, rpc, op }.into());
-    }
-
     Ok(Plan::None)
 }
 
@@ -488,12 +468,6 @@ pub mod stage {
            pub targets: Vec<&'i InstanceId>,
            pub rpc: rpc::ddl_apply::Request,
        }
-
-        pub struct ApplyMigration<'i> {
-            pub target: &'i Replicaset,
-            pub rpc: rpc::migration::apply::Request,
-            pub op: Dml,
-        }
    }
 }
 
diff --git a/src/lib.rs b/src/lib.rs
index a49d007ba0..0e7a3c0a13 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -13,7 +13,6 @@ use rpc::{join, update_instance};
 use std::convert::TryFrom;
 use std::time::{Duration, Instant};
 use storage::Clusterwide;
-use storage::ToEntryIter as _;
 use storage::{ClusterwideSpace, PropertyName};
 use traft::RaftSpaceAccess;
 use traft::RaftTerm;
@@ -24,9 +23,8 @@
 use crate::instance::grade::TargetGradeVariant;
 use crate::instance::InstanceId;
 use crate::schema::CreateSpaceParams;
 use crate::tlog::set_log_level;
-use crate::traft::event::Event;
+use crate::traft::node;
 use crate::traft::op::{self, Op};
-use crate::traft::{event, node, Migration};
 use crate::traft::{LogicalClock, RaftIndex};
 use traft::error::Error;
@@ -381,74 +379,6 @@ fn picolib_setup(args: &args::Run) {
        ),
    );
 
-    luamod.set(
-        "add_migration",
-        tlua::function2(|id: u64, body: String| -> traft::Result<()> {
-            let migration = Migration { id, body };
-            let op = op::Dml::insert(ClusterwideSpace::Migration, &migration)?;
-            node::global()?.propose_and_wait(op, Duration::MAX)??;
-            Ok(())
-        }),
-    );
-
-    luamod.set(
-        "push_schema_version",
-        tlua::function1(|id: u64| -> traft::Result<()> {
-            let op = op::Dml::replace(
-                ClusterwideSpace::Property,
-                &(PropertyName::DesiredSchemaVersion, id),
-            )?;
-            node::global()?.propose_and_wait(op, Duration::MAX)??;
-            Ok(())
-        }),
-    );
-
-    luamod.set(
-        "migrate",
-        tlua::Function::new(
-            |m_id: Option<u64>, timeout: Option<f64>| -> traft::Result<Option<u64>> {
-                let node = node::global()?;
-
-                let Some(latest) = node.storage.migrations.get_latest()? else {
-                    tlog!(Info, "there are no migrations to apply");
-                    return Ok(None);
-                };
-                let current_version = node.storage.properties.desired_schema_version()?;
-                let target_version = m_id.map(|id| id.min(latest.id)).unwrap_or(latest.id);
-                if target_version <= current_version {
-                    return Ok(Some(current_version));
-                }
-
-                let op = op::Dml::replace(
-                    ClusterwideSpace::Property,
-                    &(PropertyName::DesiredSchemaVersion, target_version),
-                )?;
-                node.propose_and_wait(op, Duration::MAX)??;
-
-                let deadline = {
-                    let timeout = timeout
-                        .map(Duration::from_secs_f64)
-                        .unwrap_or(Duration::MAX);
-                    let now = Instant::now();
-                    now.checked_add(timeout)
-                        .unwrap_or_else(|| now + Duration::from_secs(30 * 365 * 24 * 60 * 60))
-                };
-                while node
-                    .storage
-                    .replicasets
-                    .iter()?
-                    .any(|r| r.current_schema_version < target_version)
-                {
-                    if event::wait_deadline(Event::MigrateDone, deadline)?.is_timeout() {
-                        return Err(Error::Timeout);
-                    }
-                }
-
-                Ok(Some(node.storage.properties.desired_schema_version()?))
-            },
-        ),
-    );
-
    // Trims raft log up to the given index (excluding the index
    // itself). Returns the new `first_index` after the log compaction.
    luamod.set("raft_compact_log", {
diff --git a/src/rpc/migration.rs b/src/rpc/migration.rs
deleted file mode 100644
index d13199d570..0000000000
--- a/src/rpc/migration.rs
+++ /dev/null
@@ -1,41 +0,0 @@
-pub mod apply {
-    use crate::rpc;
-    use crate::traft::{error::Error, node, RaftIndex, RaftTerm, Result};
-    use std::time::Duration;
-    use tarantool::{lua_state, tlua::LuaError};
-
-    crate::define_rpc_request! {
-        fn proc_apply_migration(req: Request) -> Result<Response> {
-            let node = node::global()?;
-            node.status().check_term(req.term)?;
-            rpc::sync::wait_for_index_timeout(req.applied, &node.raft_storage, req.timeout)?;
-
-            let storage = &node.storage;
-
-            let Some(migration) = storage.migrations.get(req.migration_id)? else {
-                return Err(Error::other(format!("migration {0} not found", req.migration_id)));
-            };
-
-            lua_state()
-                .exec_with(
-                    "local ok, err = box.execute(...)
-                    if not ok then
-                        box.error(err)
-                    end",
-                    migration.body,
-                )
-                .map_err(LuaError::from)?;
-
-            Ok(Response {})
-        }
-
-        pub struct Request {
-            pub term: RaftTerm,
-            pub applied: RaftIndex,
-            pub timeout: Duration,
-            pub migration_id: u64,
-        }
-
-        pub struct Response {}
-    }
-}
diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs
index 57dd0b3689..b4dcb3e69e 100644
--- a/src/rpc/mod.rs
+++ b/src/rpc/mod.rs
@@ -18,7 +18,6 @@ pub mod ddl_apply;
 pub mod expel;
 pub mod join;
 pub mod lsn;
-pub mod migration;
 pub mod replication;
 pub mod sharding;
 pub mod sync;
diff --git a/src/storage.rs b/src/storage.rs
index 04c5b96c50..e6388d638f 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -15,7 +15,6 @@
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
 use crate::traft::op::Ddl;
-use crate::traft::Migration;
 use crate::traft::RaftId;
 use crate::traft::Result;
@@ -389,18 +388,6 @@ define_clusterwide_spaces! {
        /// An enumeration of indexes defined for replicaset space.
        pub enum SpaceReplicasetIndex;
    }
-    Migration = "_pico_migration" => {
-        Clusterwide::migrations;
-
-        pub struct Migrations {
-            space: Space,
-            #[primary]
-            index: Index => Id = "id",
-        }
-
-        /// An enumeration of indexes defined for migration space.
-        pub enum SpaceMigrationIndex;
-    }
    Space = "_pico_space" => {
        Clusterwide::spaces;
 
@@ -723,8 +710,6 @@ pub trait TClusterwideSpaceIndex {
 pub enum PropertyName {
     ReplicationFactor = "replication_factor",
     VshardBootstrapped = "vshard_bootstrapped",
-    // TODO: remove this
-    DesiredSchemaVersion = "desired_schema_version",
 
     /// Pending ddl operation which is to be either committed or aborted.
     ///
@@ -817,14 +802,6 @@ impl Properties {
         Ok(res)
     }
 
-    #[inline]
-    pub fn desired_schema_version(&self) -> tarantool::Result<u64> {
-        let res = self
-            .get(PropertyName::DesiredSchemaVersion)?
-            .unwrap_or_default();
-        Ok(res)
-    }
-
     #[inline]
     pub fn pending_schema_change(&self) -> tarantool::Result<Option<Ddl>> {
         self.get(PropertyName::PendingSchemaChange)
@@ -1276,56 +1253,6 @@ where
    }
 }
 
-////////////////////////////////////////////////////////////////////////////////
-// Migrations
-////////////////////////////////////////////////////////////////////////////////
-
-impl Migrations {
-    pub fn new() -> tarantool::Result<Self> {
-        let space = Space::builder(Self::SPACE_NAME)
-            .is_local(true)
-            .is_temporary(false)
-            .field(("id", FieldType::Unsigned))
-            .field(("body", FieldType::String))
-            .if_not_exists(true)
-            .create()?;
-
-        let index = space
-            .index_builder(Self::primary_index().as_str())
-            .unique(true)
-            .part("id")
-            .if_not_exists(true)
-            .create()?;
-
-        Ok(Self { space, index })
-    }
-
-    #[inline]
-    pub fn get(&self, id: u64) -> tarantool::Result<Option<Migration>> {
-        match self.space.get(&[id])? {
-            Some(tuple) => tuple.decode().map(Some),
-            None => Ok(None),
-        }
-    }
-
-    #[inline]
-    pub fn get_latest(&self) -> tarantool::Result<Option<Migration>> {
-        let iter = self.space.select(IteratorType::Req, &())?;
-        let iter = EntryIter::new(iter);
-        let ms = iter.take(1).collect::<Vec<_>>();
-        Ok(ms.first().cloned())
-    }
-}
-
-impl ToEntryIter for Migrations {
-    type Entry = Migration;
-
-    #[inline(always)]
-    fn index_iter(&self) -> Result<IndexIterator> {
-        Ok(self.space.select(IteratorType::All, &())?)
-    }
-}
-
 ////////////////////////////////////////////////////////////////////////////////
 // EntryIter
 ////////////////////////////////////////////////////////////////////////////////
@@ -1752,27 +1679,6 @@ mod tests {
        );
    }
 
-    #[rustfmt::skip]
-    #[::tarantool::test]
-    fn test_storage_migrations() {
-        let migrations = Migrations::new().unwrap();
-
-        assert_eq!(None, migrations.get_latest().unwrap());
-
-        for m in &[
-            (1, "first"),
-            (3, "third"),
-            (2, "second")
-        ] {
-            migrations.space.put(&m).unwrap();
-        }
-
-        assert_eq!(
-            Some(Migration {id: 3, body: "third".to_string()}),
-            migrations.get_latest().unwrap()
-        );
-    }
-
    #[::tarantool::test]
    fn clusterwide_space_index() {
        let storage = Clusterwide::new().unwrap();
@@ -1838,16 +1744,10 @@ mod tests {
        };
        storage.replicasets.space.insert(&r).unwrap();
 
-        storage
-            .migrations
-            .space
-            .insert(&(1, "drop table BANK_ACCOUNTS"))
-            .unwrap();
-
        let snapshot_data = Clusterwide::snapshot_data().unwrap();
        let space_dumps = snapshot_data.space_dumps;
 
-        assert_eq!(space_dumps.len(), 7);
+        assert_eq!(space_dumps.len(), 6);
 
        for space_dump in &space_dumps {
            match &space_dump.space {
@@ -1878,12 +1778,6 @@ mod tests {
                    assert_eq!(replicaset, r);
                }
 
-                s if s == &*ClusterwideSpace::Migration => {
-                    let [migration]: [(i32, String); 1] =
-                        Decode::decode(space_dump.tuples.as_ref()).unwrap();
-                    assert_eq!(migration, (1, "drop table BANK_ACCOUNTS".to_owned()));
-                }
-
                s if s == &*ClusterwideSpace::Space => {
                    let []: [(); 0] = Decode::decode(space_dump.tuples.as_ref()).unwrap();
                }
@@ -1943,16 +1837,6 @@ mod tests {
            tuples,
        });
 
-        let m = Migration {
-            id: 1,
-            body: "drop table BANK_ACCOUNTS".into(),
-        };
-        let tuples = [&m].to_tuple_buffer().unwrap();
-        data.space_dumps.push(SpaceDump {
-            space: ClusterwideSpace::Migration.into(),
-            tuples,
-        });
-
        let raw_data = data.to_tuple_buffer().unwrap();
 
        storage.for_each_space(|s| s.truncate()).unwrap();
@@ -1974,9 +1858,5 @@ mod tests {
        assert_eq!(storage.replicasets.space.len().unwrap(), 1);
        let replicaset = storage.replicasets.get("r1").unwrap().unwrap();
        assert_eq!(replicaset, r);
-
-        assert_eq!(storage.migrations.space.len().unwrap(), 1);
-        let migration = storage.migrations.get(1).unwrap().unwrap();
-        assert_eq!(migration, m);
    }
 }
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 669e9b28c9..aac2a2102c 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -381,13 +381,3 @@ pub fn replicaset_uuid(replicaset_id: &str) -> String {
 fn uuid_v3(name: &str) -> Uuid {
     Uuid::new_v3(&Uuid::nil(), name.as_bytes())
 }
-
-////////////////////////////////////////////////////////////////////////////////
-/// Migration
-#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
-pub struct Migration {
-    pub id: u64,
-    pub body: String,
-}
-
-impl Encode for Migration {}
diff --git a/test/int/test_migration.py b/test/int/test_migration.py
deleted file mode 100644
index 46e4e95dd9..0000000000
--- a/test/int/test_migration.py
+++ /dev/null
@@ -1,70 +0,0 @@
-from conftest import Cluster
-
-
-def test_add_migration(cluster: Cluster):
-    cluster.deploy(instance_count=2)
-    i1, i2 = cluster.instances
-    i1.promote_or_fail()
-    i1.eval("pico.add_migration(1, 'migration body')")
-    assert i2.call("box.space._pico_migration:select") == [[1, "migration body"]]
-
-
-def test_push_schema_version(cluster: Cluster):
-    cluster.deploy(instance_count=2)
-    i1, i2 = cluster.instances
-    i1.promote_or_fail()
-    i1.eval("pico.push_schema_version(3)")
-    key = "desired_schema_version"
-    assert i2.call("box.space._pico_property:select", [key]) == [[key, 3]]
-
-
-def test_apply_migrations(cluster: Cluster):
-    # Scenario: apply migration to cluster
-    #   Given a cluster with added migration
-    #   When it push up desired_schema_version
-    #   Then the migration is applied
-    #   And replicaset's current_schema_version is set to desired_schema_version
-
-    cluster.deploy(instance_count=3)
-    i1 = cluster.instances[0]
-    i1.promote_or_fail()
-    i1.assert_raft_status("Leader")
-
-    def replicaset_schema_versions(instance):
-        return instance.eval(
-            """
-            return box.space._pico_replicaset:pairs()
-                :map(function(replicaset)
-                    return replicaset.current_schema_version
-                end)
-                :totable()
-            """
-        )
-
-    for n, sql in {
-        1: """create table "test_space" ("id" int primary key)""",
-        2: """alter table "test_space" add column "value" varchar(100)""",
-    }.items():
-        i1.call("pico.add_migration", n, sql)
-
-    assert i1.call("pico.migrate", 1) == 1
-
-    for i in cluster.instances:
-        format = i.call("box.space.test_space:format")
-        assert [f["name"] for f in format] == ["id"]
-
-        assert set(replicaset_schema_versions(i)) == {1}
-
-    # idempotent
-    assert i1.call("pico.migrate", 1) == 1
-
-    assert i1.call("pico.migrate", 2) == 2
-
-    for i in cluster.instances:
-        format = i.call("box.space.test_space:format")
-        assert [f["name"] for f in format] == ["id", "value"]
-        assert set(replicaset_schema_versions(i)) == {2}
-
-    # idempotent
-    assert i1.call("pico.migrate", 1) == 2
-    assert i1.call("pico.migrate", 2) == 2
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 42e18b7e1b..8fe090633e 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -1,19 +1,12 @@
-import funcy  # type: ignore
 import pytest
 import re
 
 from conftest import (
     Cluster,
-    Instance,
     ReturnError,
 )
 
 
-@funcy.retry(tries=30, timeout=0.2)
-def apply_migration(i: Instance, n: int):
-    assert i.call("pico.migrate", n) == n
-
-
 def test_pico_sql(cluster: Cluster):
     cluster.deploy(instance_count=1)
     i1 = cluster.instances[0]
diff --git a/test/manual/sql/test_sql_perf.py b/test/manual/sql/test_sql_perf.py
index b6a01f2af8..2b3ecb06c3 100644
--- a/test/manual/sql/test_sql_perf.py
+++ b/test/manual/sql/test_sql_perf.py
@@ -14,18 +14,15 @@ def test_projection(cluster: Cluster):
     i1, i2 = cluster.instances
 
     # Create a sharded space and populate it with data.
-    for n, sql in {
-        1: """create table t(a int, "bucket_id" unsigned, primary key (a));""",
-        2: """create index "bucket_id" on t ("bucket_id");""",
-        3: """create table "_pico_space"("id" int, "distribution" text, primary key("id"));""",
-    }.items():
-        i1.call("pico.add_migration", n, sql)
-    apply_migration(i1, 3)
-
-    space_id = i1.eval("return box.space.T.id")
-    sql = """insert into "_pico_space" values({id}, 'A');""".format(id=space_id)
-    i1.call("pico.add_migration", 4, sql)
-    apply_migration(i1, 4)
+    cluster.create_space(
+        dict(
+            id=895,
+            name="T",
+            format=[dict(name="A", type="integer", is_nullable=False)],
+            primary_key=["A"],
+            distribution=dict(sharding_key=["A"], sharding_fn="murmur3"),
+        )
+    )
 
     row_number = 100
     for n in range(row_number):
--
GitLab
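
Note on the replacement path: the migration workflow removed above is superseded by the
clusterwide DDL path ("ddl create_space, etc." per the subject). For reference, a minimal
sketch of how a test now declares a sharded space, assuming the `Cluster.create_space`
helper from `conftest` behaves as it is used in the test_sql_perf.py hunk above; the test
name, space id, and column values below are illustrative only and not part of this patch:

    from conftest import Cluster

    def test_create_space_instead_of_migration(cluster: Cluster):
        # Deploy a small cluster, as the removed test_migration.py used to do.
        cluster.deploy(instance_count=2)

        # Instead of pico.add_migration() + pico.migrate(), declare the space
        # directly through the DDL-based helper used in test_sql_perf.py.
        cluster.create_space(
            dict(
                id=895,
                name="T",
                format=[dict(name="A", type="integer", is_nullable=False)],
                primary_key=["A"],
                distribution=dict(sharding_key=["A"], sharding_fn="murmur3"),
            )
        )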