diff --git a/src/lib.rs b/src/lib.rs index e648d2e14d505da762554b74eee570cde4343c2c..d245689b2d074658c8c892f5286f554a423ebbf6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -843,7 +843,7 @@ fn start_boot(args: &args::Run) { init_entries_push_op( op::Dml::insert( ClusterwideSpace::Property, - &(PropertyName::DesiredSchemaVersion, 0), + &(PropertyName::CurrentSchemaVersion, 0), ) .expect("cannot fail") .into(), diff --git a/src/schema.rs b/src/schema.rs index e354b9145c36bbbc799cbb6ae10e1c3b4bc9d17f..d21eb8a298bf88719ad95bfc0a0d19cb88049991 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -19,6 +19,11 @@ pub struct SpaceDef { } impl Encode for SpaceDef {} +impl SpaceDef { + // Don't forget to update this, if fields of `SpaceDef` change. + pub const FIELD_OPERABLE: usize = 5; +} + /// Defines how to distribute tuples in a space across replicasets. #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] #[serde(rename_all = "snake_case")] @@ -73,3 +78,8 @@ pub struct IndexDef { pub operable: bool, } impl Encode for IndexDef {} + +impl IndexDef { + // Don't forget to update this, if fields of `IndexDef` change. + pub const FIELD_OPERABLE: usize = 6; +} diff --git a/src/storage.rs b/src/storage.rs index 645f66cb1d44b819c6da8c1003dfae6be0af9fc5..7543a5effacfb6dbac2ba77751592ef6f77de668 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,6 +1,7 @@ use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType}; use ::tarantool::msgpack::{ArrayWriter, ValueIter}; -use ::tarantool::space::{FieldType, Space, SpaceId}; +use ::tarantool::space::UpdateOps; +use ::tarantool::space::{FieldType, Space, SpaceId, SystemSpace}; use ::tarantool::tuple::KeyDef; use ::tarantool::tuple::{Decode, DecodeOwned, Encode}; use ::tarantool::tuple::{RawBytes, ToTupleBuffer, Tuple, TupleBuffer}; @@ -617,13 +618,18 @@ impl Properties { } } - #[allow(dead_code)] #[inline] pub fn put(&self, key: PropertyName, value: &impl serde::Serialize) -> tarantool::Result<()> { self.space.put(&(key, value))?; Ok(()) } + #[inline] + pub fn delete(&self, key: PropertyName) -> tarantool::Result<()> { + self.space.delete(&[key])?; + Ok(()) + } + #[inline] pub fn vshard_bootstrapped(&self) -> tarantool::Result<bool> { Ok(self @@ -822,8 +828,7 @@ impl Instances { Ok(()) } - /// Find a instance by `raft_id` and return a single field specified by `F` - /// (see `InstanceFieldDef` & `instance_field` module). + /// Find a instance by `id` (see trait [`InstanceId`]). #[inline(always)] pub fn get(&self, id: &impl InstanceId) -> Result<Instance> { let res = id @@ -1257,12 +1262,31 @@ impl Spaces { } } + #[inline] + pub fn put(&self, space_def: &SpaceDef) -> tarantool::Result<()> { + self.space.replace(space_def)?; + Ok(()) + } + #[inline] pub fn insert(&self, space_def: &SpaceDef) -> tarantool::Result<()> { self.space.insert(space_def)?; Ok(()) } + #[inline] + pub fn update_operable(&self, id: SpaceId, operable: bool) -> tarantool::Result<()> { + let mut ops = UpdateOps::with_capacity(1); + ops.assign(SpaceDef::FIELD_OPERABLE, operable)?; + self.space.update(&[id], ops)?; + Ok(()) + } + + #[inline] + pub fn delete(&self, id: SpaceId) -> tarantool::Result<Option<Tuple>> { + self.space.delete(&[id]) + } + #[inline] pub fn by_name(&self, name: String) -> tarantool::Result<Option<SpaceDef>> { match self.index_name.get(&[name])? { @@ -1322,12 +1346,36 @@ impl Indexes { } } + #[inline] + pub fn put(&self, index_def: &IndexDef) -> tarantool::Result<()> { + self.space.replace(index_def)?; + Ok(()) + } + #[inline] pub fn insert(&self, index_def: &IndexDef) -> tarantool::Result<()> { self.space.insert(index_def)?; Ok(()) } + #[inline] + pub fn update_operable( + &self, + space_id: SpaceId, + index_id: IndexId, + operable: bool, + ) -> tarantool::Result<()> { + let mut ops = UpdateOps::with_capacity(1); + ops.assign(IndexDef::FIELD_OPERABLE, operable)?; + self.space.update(&(space_id, index_id), ops)?; + Ok(()) + } + + #[inline] + pub fn delete(&self, space_id: SpaceId, index_id: IndexId) -> tarantool::Result<Option<Tuple>> { + self.space.delete(&[space_id, index_id]) + } + #[inline] pub fn by_name(&self, space_id: SpaceId, name: String) -> tarantool::Result<Option<IndexDef>> { match self.index_name.get(&(space_id, name))? { @@ -1337,6 +1385,24 @@ impl Indexes { } } +pub fn pico_schema_version() -> tarantool::Result<u64> { + let space_schema = Space::from(SystemSpace::Schema); + let tuple = space_schema.get(&["pico_schema_version"])?; + let mut res = 0; + if let Some(tuple) = tuple { + if let Some(v) = tuple.field(1)? { + res = v; + } + } + Ok(res) +} + +pub fn set_pico_schema_version(v: u64) -> tarantool::Result<()> { + let space_schema = Space::from(SystemSpace::Schema); + space_schema.insert(&("pico_schema_version", v))?; + Ok(()) +} + //////////////////////////////////////////////////////////////////////////////// // tests //////////////////////////////////////////////////////////////////////////////// diff --git a/src/traft/node.rs b/src/traft/node.rs index a8a6f075f8022a51b339b18b05854eb3b2be4252..b63fa8a19f13abce0aad03faf46e207ae5e45625 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -11,16 +11,19 @@ use crate::instance::Instance; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; +use crate::schema::{IndexDef, SpaceDef}; use crate::storage::ToEntryIter as _; +use crate::storage::{pico_schema_version, set_pico_schema_version}; use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName}; use crate::stringify_cfunc; +use crate::tarantool::eval as lua_eval; use crate::tlog; use crate::traft; use crate::traft::error::Error; use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; -use crate::traft::op::{Dml, Op, OpResult, PersistInstance}; +use crate::traft::op::{Ddl, Dml, Op, OpResult, PersistInstance}; use crate::traft::rpc::{join, lsn, update_instance}; use crate::traft::Address; use crate::traft::ConnectionPool; @@ -729,14 +732,6 @@ impl NodeImpl { tlog!(Debug, "applying entry: {op}"; "index" => index); match &op { - Op::DdlCommit => { - // TODO: - // if box.space._schema:get('pico_schema_change') < - // pico.space.property:get('pending_schema_version') - // then - // return true -- wait_lsn - todo!(); - } Op::PersistInstance(PersistInstance(instance)) => { *wake_governor = true; storage_changes.insert(ClusterwideSpace::Instance.into()); @@ -757,8 +752,126 @@ impl NodeImpl { _ => {} } + let storage_properties = &self.storage.properties; + // apply the operation - let result = self.apply_op(op).expect("storage error"); + let mut result = Box::new(()) as Box<dyn AnyWithTypeName>; + match op { + Op::Nop => {} + Op::PersistInstance(op) => { + let instance = op.0; + self.storage.instances.put(&instance).unwrap(); + result = instance as _; + } + Op::Dml(op) => { + let res = match op { + Dml::Insert { space, tuple } => space.insert(&tuple).map(Some), + Dml::Replace { space, tuple } => space.replace(&tuple).map(Some), + Dml::Update { space, key, ops } => space.primary_index().update(&key, &ops), + Dml::Delete { space, key } => space.primary_index().delete(&key), + }; + result = Box::new(res) as _; + } + Op::DdlPrepare { + ddl, + schema_version, + } => { + self.apply_op_ddl_prepare(ddl, schema_version) + .expect("storage error"); + } + Op::DdlCommit => { + let current_version_in_tarantool = pico_schema_version().expect("storage error"); + let pending_version = storage_properties + .pending_schema_version() + .expect("storage error") + .expect("granted we don't mess up log compaction, this should not be None"); + if current_version_in_tarantool < pending_version { + // TODO: check if replication master in _pico_replicaset + let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error"); + if is_master { + set_pico_schema_version(pending_version).expect("storage error"); + // TODO: after this current_version_in_tarantool must be >= pending_version + } else { + // wait_lsn + return true; + } + } + + let ddl = storage_properties + .pending_schema_change() + .expect("storage error") + .expect("granted we don't mess up log compaction, this should not be None"); + match ddl { + Ddl::CreateSpace { id, .. } => { + self.storage + .spaces + .update_operable(id, true) + .expect("storage error"); + self.storage + .indexes + .update_operable(id, 0, true) + .expect("storage error"); + } + _ => { + todo!() + } + } + + storage_properties + .delete(PropertyName::PendingSchemaChange) + .expect("storage error"); + storage_properties + .delete(PropertyName::PendingSchemaVersion) + .expect("storage error"); + storage_properties + .put(PropertyName::CurrentSchemaVersion, &pending_version) + .expect("storage error"); + } + Op::DdlAbort => { + let current_version_in_tarantool = pico_schema_version().expect("storage error"); + let pending_version: u64 = storage_properties + .pending_schema_version() + .expect("storage error") + .expect("granted we don't mess up log compaction, this should not be None"); + // This condition means, schema versions must always increase + // even after an DdlAbort + if current_version_in_tarantool == pending_version { + // TODO: check if replication master in _pico_replicaset + let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error"); + if is_master { + let current_version = storage_properties + .current_schema_version() + .expect("storage error"); + set_pico_schema_version(current_version).expect("storage error"); + // TODO: after this current_version must be == pending_version + } else { + // wait_lsn + return true; + } + } + + let ddl = storage_properties + .pending_schema_change() + .expect("storage error") + .expect("granted we don't mess up log compaction, this should not be None"); + match ddl { + Ddl::CreateSpace { id, .. } => { + self.storage.indexes.delete(id, 0).expect("storage error"); + self.storage.spaces.delete(id).expect("storage error"); + } + _ => { + todo!() + } + } + + storage_properties + .delete(PropertyName::PendingSchemaChange) + .expect("storage error"); + storage_properties + .delete(PropertyName::PendingSchemaVersion) + .expect("storage error"); + } + } if let Some(lc) = &lc { if let Some(notify) = self.notifications.remove(lc) { @@ -779,20 +892,66 @@ impl NodeImpl { false } - fn apply_op(&self, op: Op) -> traft::Result<Box<dyn AnyWithTypeName>> { - let res = match op { - Op::Nop => Box::new(()), - Op::PersistInstance(op) => { - let instance = op.result(); - self.storage.instances.put(&instance).unwrap(); - instance as Box<dyn AnyWithTypeName> + fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); + + match ddl.clone() { + Ddl::CreateSpace { + id, + name, + format, + primary_key, + distribution, + } => { + let space_def = SpaceDef { + id, + name, + distribution, + schema_version, + format, + operable: false, + }; + self.storage.spaces.insert(&space_def)?; + + let index_def = IndexDef { + id: 0, + name: "primary_key".into(), + space_id: id, + schema_version, + parts: primary_key, + operable: false, + // TODO: support other cases + local: true, + }; + self.storage.indexes.insert(&index_def)?; } - Op::Dml(op) => op.result().into_box_dyn_any(), - Op::DdlPrepare { .. } => todo!(), - Op::DdlCommit => todo!(), - Op::DdlAbort => todo!(), - }; - Ok(res) + + Ddl::CreateIndex { + space_id, + index_id, + by_fields, + } => { + let _ = (space_id, index_id, by_fields); + todo!(); + } + Ddl::DropSpace { id } => { + let _ = id; + todo!(); + } + Ddl::DropIndex { index_id, space_id } => { + let _ = (index_id, space_id); + todo!(); + } + } + + self.storage + .properties + .put(PropertyName::PendingSchemaChange, &ddl)?; + self.storage + .properties + .put(PropertyName::PendingSchemaVersion, &schema_version)?; + + Ok(()) } /// Is called during a transaction diff --git a/src/traft/op.rs b/src/traft/op.rs index 2cc353f093f3f66a8ee1923cbddf4ed7ff6877cd..24cae792ee1d5e04518285dcde420fc691ca5cee 100644 --- a/src/traft/op.rs +++ b/src/traft/op.rs @@ -145,9 +145,12 @@ impl std::fmt::Display for Op { } } +// TODO: remove this impl OpResult for Op { type Result = (); - fn result(&self) -> Self::Result {} + fn result(&self) -> Self::Result { + unreachable!() + } } //////////////////////////////////////////////////////////////////////////////// @@ -163,10 +166,11 @@ impl PersistInstance { } } +// TODO: remove this impl OpResult for PersistInstance { type Result = Box<Instance>; fn result(&self) -> Self::Result { - self.0.clone() + unreachable!() } } @@ -221,15 +225,11 @@ pub enum Dml { } } +// TODO: remove this impl OpResult for Dml { type Result = tarantool::Result<Option<Tuple>>; fn result(&self) -> Self::Result { - match self { - Self::Insert { space, tuple } => space.insert(tuple).map(Some), - Self::Replace { space, tuple } => space.replace(tuple).map(Some), - Self::Update { space, key, ops } => space.primary_index().update(key, ops), - Self::Delete { space, key } => space.primary_index().delete(key), - } + unreachable!() } } diff --git a/test/int/test_basics.py b/test/int/test_basics.py index dc010a9d6f39b655558a806747c2ff35fa38451e..715c4bd20ef524b1219324c952edd4419105250a 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -212,7 +212,7 @@ def test_raft_log(instance: Instance): | 1 | 1 |1.0.1|Insert(_pico_peer_address, [1,"127.0.0.1:{p}"])| | 2 | 1 |1.0.2|PersistInstance(i1, 1, r1, Offline(0), {b})| | 3 | 1 |1.0.3|Insert(_pico_property, ["replication_factor",1])| -| 4 | 1 |1.0.4|Insert(_pico_property, ["desired_schema_version",0])| +| 4 | 1 |1.0.4|Insert(_pico_property, ["current_schema_version",0])| | 5 | 1 | |AddNode(1)| | 6 | 2 | |-| | 7 | 2 |1.1.1|PersistInstance(i1, 1, r1, Offline(0) -> Online(1), {b})|