diff --git a/src/main.rs b/src/main.rs index 4ee86fc07eda10a593782923f77b0d2bd5c1807f..40b59e92cd005a27341f5eae05798ff182e6b579 100644 --- a/src/main.rs +++ b/src/main.rs @@ -461,6 +461,7 @@ fn init_handlers() { declare_cfunc!(traft::rpc::replication::proc_replication); declare_cfunc!(traft::rpc::replication::promote::proc_replication_promote); declare_cfunc!(traft::rpc::sharding::proc_sharding); + declare_cfunc!(traft::rpc::migration::apply::proc_apply_migration); } fn rm_tarantool_files(data_dir: &str) { diff --git a/src/traft/event.rs b/src/traft/event.rs index 0d2c7dede1d614aad432c17dadbdb8642f6fe982..83c039664111549684bc67289c19fc33cc96dc4e 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -28,6 +28,7 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; TopologyChanged = "raft.topology-changed", RaftLoopNeeded = "raft.loop-needed", RaftEntryApplied = "raft.entry-applied", + ClusterStateChanged = "picodata.cluster-state-updated", } } diff --git a/src/traft/governor.rs b/src/traft/governor.rs index 7f174b7331f237b81a7a4c204967d0f6230ece8f..7e4a32c81c7ca30b2d80ce7ceb917b529e0192dc 100644 --- a/src/traft/governor.rs +++ b/src/traft/governor.rs @@ -9,6 +9,10 @@ use crate::traft::Peer; use crate::traft::RaftId; use crate::traft::TargetGradeVariant; +use super::Migration; +use super::Replicaset; +use super::ReplicasetId; + struct RaftConf<'a> { all: BTreeMap<RaftId, &'a Peer>, voters: BTreeSet<RaftId>, @@ -168,6 +172,27 @@ pub(crate) fn raft_conf_change( Some(conf_change) } +pub(crate) fn waiting_migrations<'a>( + migrations: &'a mut [Migration], + replicasets: &'a [Replicaset], + desired_schema_version: u64, +) -> Vec<(u64, Vec<ReplicasetId>)> { + migrations.sort_by(|a, b| a.id.partial_cmp(&b.id).unwrap()); + let mut res: Vec<(u64, Vec<ReplicasetId>)> = Vec::new(); + for m in migrations { + let mut rs: Vec<ReplicasetId> = Vec::new(); + for r in replicasets { + if r.current_schema_version < m.id && m.id <= desired_schema_version { + rs.push(r.replicaset_id.clone()); + } + } + if !rs.is_empty() { + res.push((m.id, rs)); + } + } + res +} + #[cfg(test)] mod tests { use ::raft::prelude as raft; @@ -367,4 +392,57 @@ mod tests { None ); } + + use crate::traft::{InstanceId, Migration, Replicaset, ReplicasetId}; + + use super::waiting_migrations; + + macro_rules! m { + ($id:literal, $body:literal) => { + Migration { + id: $id, + body: $body.to_string(), + } + }; + } + + macro_rules! r { + ($id:literal, $schema_version:literal) => { + Replicaset { + replicaset_id: ReplicasetId($id.to_string()), + replicaset_uuid: "".to_string(), + master_id: InstanceId("i0".to_string()), + weight: 1.0, + current_schema_version: $schema_version, + } + }; + } + + macro_rules! expect { + [$(($migration_id:literal, $($replicaset_id:literal),*)),*] => [vec![ + $(( + $migration_id, + vec![$(ReplicasetId($replicaset_id.to_string())),*].to_vec() + )),* + ].to_vec()]; + } + + #[test] + fn test_waiting_migrations() { + let ms = vec![m!(1, "m1"), m!(2, "m2"), m!(3, "m3")].to_vec(); + let rs = vec![r!("r1", 0), r!("r2", 2), r!("r3", 1)].to_vec(); + assert_eq!(waiting_migrations(&mut ms.clone(), &rs, 0), expect![]); + assert_eq!( + waiting_migrations(&mut ms.clone(), &rs, 1), + expect![(1, "r1")] + ); + assert_eq!( + waiting_migrations(&mut ms.clone(), &rs, 2), + expect![(1, "r1"), (2, "r1", "r3")] + ); + assert_eq!( + waiting_migrations(&mut ms.clone(), &rs, 3), + expect![(1, "r1"), (2, "r1", "r3"), (3, "r1", "r2", "r3")] + ); + } } diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 108275b2f8ee254778783985458f8aec632f7c91..71c5479213c2efa10957b7b00da8af9be8a4d8ce 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -36,6 +36,8 @@ use storage::ClusterSpace; pub use storage::Storage; pub use topology::Topology; +use self::event::Event; + pub type RaftId = u64; pub type RaftTerm = u64; pub type RaftIndex = u64; @@ -185,7 +187,13 @@ impl Op { peers.put(peer).unwrap(); Box::new(peer.clone()) } - Self::Dml(op) => Box::new(op.result()), + Self::Dml(op) => { + let res = Box::new(op.result()); + if op.space() == &ClusterSpace::State { + event::broadcast(Event::ClusterStateChanged); + } + res + } } } @@ -329,6 +337,16 @@ impl OpDML { }; Ok(res) } + + #[rustfmt::skip] + pub fn space(&self) -> &ClusterSpace { + match &self { + Self::Insert { space, .. } => space, + Self::Replace { space, .. } => space, + Self::Update { space, .. } => space, + Self::Delete { space, .. } => space, + } + } } mod vec_of_raw_byte_buf { diff --git a/src/traft/node.rs b/src/traft/node.rs index 301ace677cf2f53d6a469fa94f23a8292a4975de..b4b7de542b99b846eac6e54ff924c1529a7541b3 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -30,7 +30,8 @@ use tarantool::space::UpdateOps; use crate::kvcell::KVCell; use crate::r#loop::{FlowControl, Loop}; use crate::stringify_cfunc; -use crate::traft::governor::raft_conf_change; +use crate::traft::governor::{raft_conf_change, waiting_migrations}; +use crate::traft::rpc; use crate::traft::storage::ClusterSpace; use crate::traft::ContextCoercion as _; use crate::traft::OpDML; @@ -1423,7 +1424,63 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { continue 'governor; } - event::wait(Event::TopologyChanged).expect("Events system must be initialized"); + //////////////////////////////////////////////////////////////////////// + // applying migrations + let desired_schema_version = storage.state.desired_schema_version().unwrap(); + let replicasets = storage.replicasets.iter().unwrap().collect::<Vec<_>>(); + let mut migrations = storage.migrations.iter().unwrap().collect::<Vec<_>>(); + let commit = storage.raft.commit().unwrap().unwrap(); + for (mid, rids) in waiting_migrations(&mut migrations, &replicasets, desired_schema_version) + { + let migration = storage.migrations.get(mid).unwrap().unwrap(); + for rid in rids { + let replicaset = storage + .replicasets + .get(rid.to_string().as_str()) + .unwrap() + .unwrap(); + let peer = storage.peers.get(&replicaset.master_id).unwrap(); + let req = rpc::migration::apply::Request { + term, + commit, + timeout: SYNC_TIMEOUT, + migration_id: migration.id, + }; + let res = pool.call_and_wait(&peer.raft_id, req); + match res { + Ok(_) => { + let mut ops = UpdateOps::new(); + ops.assign("current_schema_version", migration.id).unwrap(); + let op = OpDML::update( + ClusterSpace::Replicasets, + &[replicaset.replicaset_id.clone()], + ops, + ) + .unwrap(); + node.propose_and_wait(op, Duration::MAX).unwrap().unwrap(); + tlog!( + Info, + "Migration {0} applied to replicaset {1}", + migration.id, + replicaset.replicaset_id + ); + } + Err(e) => { + tlog!( + Warning, + "Could not apply migration {0} to replicaset {1}, error: {2}", + migration.id, + replicaset.replicaset_id, + e + ); + continue 'governor; + } + } + } + } + + event::wait_any(&[Event::TopologyChanged, Event::ClusterStateChanged]) + .expect("Events system must be initialized"); } #[allow(clippy::type_complexity)] diff --git a/src/traft/rpc/migration.rs b/src/traft/rpc/migration.rs new file mode 100644 index 0000000000000000000000000000000000000000..a7ee2482c5563fbba35a58f9402e7c438212d03e --- /dev/null +++ b/src/traft/rpc/migration.rs @@ -0,0 +1,33 @@ +pub mod apply { + use crate::traft::{error::Error, node, rpc::sync, RaftIndex, RaftTerm, Result}; + use std::time::Duration; + + crate::define_rpc_request! { + fn proc_apply_migration(req: Request) -> Result<Response> { + let node = node::global()?; + node.status().check_term(req.term)?; + sync::wait_for_index_timeout(req.commit, &node.storage.raft, req.timeout)?; + + let storage = &node.storage; + + match storage.migrations.get(req.migration_id)? { + Some(migration) => { + match crate::tarantool::exec(migration.body.as_str()) { + Ok(_) => Ok(Response{}), + Err(e) => Err(e.into()) + } + } + None => Err(Error::other(format!("Migration {0} not found", req.migration_id))), + } + } + + pub struct Request { + pub term: RaftTerm, + pub commit: RaftIndex, + pub timeout: Duration, + pub migration_id: u64, + } + + pub struct Response {} + } +} diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs index 15eb371a09d07dddc7b2baebf155f26224197f8f..57f09852a2681958408f3fa6577af38e2e890d48 100644 --- a/src/traft/rpc/mod.rs +++ b/src/traft/rpc/mod.rs @@ -11,6 +11,7 @@ use std::time::Duration; use serde::de::DeserializeOwned; pub mod expel; +pub mod migration; pub mod replication; pub mod sharding; pub mod sync; diff --git a/src/traft/storage.rs b/src/traft/storage.rs index eadda6be32a7107c1c3cafcd8f2a3ce0b00ef228..4c8a806afd553d979b8ac206675dfa8f047a212b 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -168,9 +168,8 @@ impl State { Ok(res) } - #[allow(dead_code)] #[inline] - pub fn desired_schema_version(&self) -> tarantool::Result<usize> { + pub fn desired_schema_version(&self) -> tarantool::Result<u64> { let res = self .get(StateKey::DesiredSchemaVersion)? .unwrap_or_default(); @@ -675,6 +674,33 @@ impl Migrations { None => Ok(None), } } + + pub fn iter(&self) -> Result<MigrationIter> { + let iter = self.space.select(IteratorType::All, &())?; + Ok(iter.into()) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// MigrationtIter +//////////////////////////////////////////////////////////////////////////////// + +pub struct MigrationIter { + iter: IndexIterator, +} + +impl From<IndexIterator> for MigrationIter { + fn from(iter: IndexIterator) -> Self { + Self { iter } + } +} + +impl Iterator for MigrationIter { + type Item = traft::Migration; + fn next(&mut self) -> Option<Self::Item> { + let res = self.iter.next().as_ref().map(Tuple::decode); + res.map(|res| res.expect("migration should decode correctly")) + } } //////////////////////////////////////////////////////////////////////////////// diff --git a/test/int/test_migration.py b/test/int/test_migration.py index 18cce7f08507a1b21b31962dc71ef37b50bcb629..812ef465c6d1fcb91cd6c63a22d4b7f1b4fa4d55 100644 --- a/test/int/test_migration.py +++ b/test/int/test_migration.py @@ -1,4 +1,5 @@ from conftest import Cluster +import funcy # type: ignore def test_add_migration(cluster: Cluster): @@ -17,3 +18,49 @@ def test_push_schema_version(cluster: Cluster): i1.eval("picolib.push_schema_version(3)") key = "desired_schema_version" assert [[key, 3]] == i2.call("box.space.cluster_state:select", [key]) + + +def test_apply_migrations(cluster: Cluster): + # Scenario: apply migration to cluster + # Given a cluster with added migration + # When it push up desired_schema_version + # Then the migration is applied + # And replicaset's current_schema_version is set to desired_schema_version + + cluster.deploy(instance_count=3) + i1, _, _ = cluster.instances + i1.promote_or_fail() + i1.assert_raft_status("Leader") + i1.call( + "picolib.add_migration", + 1, + """ + box.schema.space.create('test_space', { + format = { + {name = 'id', type = 'unsigned'}, + {name = 'value', type = 'string'}, + } + }) + """, + ) + i1.call( + "picolib.add_migration", + 2, + "box.space.test_space:create_index('pk', {parts = {'id'}})", + ) + + i1.call("picolib.push_schema_version", 2) + + @funcy.retry(tries=30, timeout=0.2) # type: ignore + def assert_space_insert(conn): + assert conn.insert("test_space", [1, "foo"]) + + @funcy.retry(tries=30, timeout=0.2) # type: ignore + def assert_replicaset_version(conn): + position = conn.schema.get_field("replicasets", "current_schema_version")["id"] + assert [2, 2, 2] == [tuple[position] for tuple in conn.select("replicasets")] + + for i in cluster.instances: + with i.connect(timeout=1) as conn: + assert_space_insert(conn) + assert_replicaset_version(conn)