diff --git a/src/schema.rs b/src/schema.rs
index 333144eebd190f3499fa76b060d95f30520d76be..d0607be591e1bb12d47beb05edbab78d0ba6eb7a 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -1,10 +1,13 @@
+use crate::storage::set_pico_schema_version;
 use crate::traft;
+use crate::traft::op::Ddl;
 use serde::{Deserialize, Serialize};
 use tarantool::{
     index::Metadata as IndexMetadata,
     index::{IndexId, Part},
     schema::space::SpaceMetadata,
     space::{Field, SpaceId},
+    space::{Space, SystemSpace},
     tuple::Encode,
     util::Value,
 };
@@ -134,3 +137,22 @@ impl IndexDef {
         Ok(index_meta)
     }
 }
+
+pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
+    debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
+    let sys_space = Space::from(SystemSpace::Space);
+    let sys_index = Space::from(SystemSpace::Index);
+
+    match *ddl {
+        Ddl::CreateSpace { id, .. } => {
+            sys_index.delete(&[id, 0])?;
+            sys_space.delete(&[id])?;
+            set_pico_schema_version(version)?;
+        }
+        _ => {
+            todo!();
+        }
+    }
+
+    Ok(())
+}
diff --git a/src/traft/node.rs b/src/traft/node.rs
index d6a2c6ae4927d1c0ef5212f91a229f699a3d7e61..440553d2adc7280641a9bc999224f9897b7babeb 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -11,9 +11,10 @@ use crate::instance::Instance;
 use crate::kvcell::KVCell;
 use crate::loop_start;
 use crate::r#loop::FlowControl;
+use crate::schema::ddl_abort_on_master;
 use crate::schema::{IndexDef, SpaceDef};
+use crate::storage::pico_schema_version;
 use crate::storage::ToEntryIter as _;
-use crate::storage::{pico_schema_version, set_pico_schema_version};
 use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName};
 use crate::stringify_cfunc;
 use crate::tarantool::eval as lua_eval;
@@ -798,6 +799,8 @@ impl NodeImpl {
             .pending_schema_version()
             .expect("storage error")
             .expect("granted we don't mess up log compaction, this should not be None");
+        // This instance is catching up to the cluster and must sync
+        // with the replication master on its own.
         if current_version_in_tarantool < pending_version {
             // TODO: check if replication master in _pico_replicaset
             let not_master = lua_eval::<bool>("return box.info.ro").expect("lua error");
@@ -806,6 +809,7 @@
             }
         }
 
+        // Update pico metadata.
         let ddl = storage_properties
             .pending_schema_change()
             .expect("storage error")
@@ -826,6 +830,10 @@
             }
         }
 
+        // Update tarantool metadata.
+        // This instance is catching up to the cluster and is a
+        // replication master, so it must apply the schema change on its
+        // own.
         // FIXME: copy-pasted from above
         if current_version_in_tarantool < pending_version {
             // TODO: check if replication master in _pico_replicaset
@@ -836,9 +844,14 @@
                     .expect("storage error");
                 match resp {
                     ddl_apply::Response::Abort { reason } => {
+                        // There's no risk of bricking the cluster at this point, because
+                        // the governor has already made sure the ddl applies on all
+                        // instances that were active at the time. This instance is just
+                        // catching up to the cluster, so this failure will happen at startup.
                        tlog!(Critical, "failed applying already committed ddl operation: {reason}";
                            "ddl" => ?ddl,
                        );
+                        // TODO: add support to mitigate these failures
                        panic!("abort of a committed operation");
                    }
                    ddl_apply::Response::Ok => {}
@@ -867,18 +880,13 @@ impl NodeImpl {
         if current_version_in_tarantool == pending_version {
             // TODO: check if replication master in _pico_replicaset
             let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error");
-            if is_master {
-                let current_version = storage_properties
-                    .current_schema_version()
-                    .expect("storage error");
-                set_pico_schema_version(current_version).expect("storage error");
-                // TODO: after this current_version must be == pending_version
-            } else {
+            if !is_master {
                 // wait_lsn
                 return true;
             }
         }
 
+        // Update pico metadata.
         let ddl = storage_properties
             .pending_schema_change()
             .expect("storage error")
@@ -893,6 +901,19 @@
             }
         }
 
+        // Update tarantool metadata.
+        // FIXME: copy-pasted from above
+        if current_version_in_tarantool == pending_version {
+            // TODO: check if replication master in _pico_replicaset
+            let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error");
+            if is_master {
+                let current_version = storage_properties
+                    .current_schema_version()
+                    .expect("storage error");
+                ddl_abort_on_master(&ddl, current_version).expect("storage error");
+            }
+        }
+
         storage_properties
             .delete(PropertyName::PendingSchemaChange)
             .expect("storage error");
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 24cae792ee1d5e04518285dcde420fc691ca5cee..8b5f09b36d2ed8255069b5cd6a714ecba742c879 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
 // OpResult
 ////////////////////////////////////////////////////////////////////////////////
 
+// TODO: remove this trait completely.
 pub trait OpResult {
     type Result: 'static;
     // FIXME: this signature makes it look like result of any operation depends
diff --git a/src/traft/rpc/ddl_apply.rs b/src/traft/rpc/ddl_apply.rs
index 9e5dbc1d5bc78259c6ec45ac450a0aacd03fe846..0bf13439932036f76d5ca18c926e80408aa3e0fc 100644
--- a/src/traft/rpc/ddl_apply.rs
+++ b/src/traft/rpc/ddl_apply.rs
@@ -69,6 +69,7 @@ crate::define_rpc_request! {
     }
 }
 
+// TODO: move this to crate::schema maybe?
 pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Result<Response> {
     debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
     let sys_space = Space::from(SystemSpace::Space);
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index ee6ccfe053a54b261b6c07bc1f4746f4fe0ede85..d05c823df48775c1fa1441da5fbcf499edb376a6 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -1,7 +1,7 @@
 from conftest import Cluster
 
 
-def test_ddl_create_space_basic(cluster: Cluster):
+def test_ddl_create_space_bulky(cluster: Cluster):
     # TODO: add 2 more instances, to check that another replicaset is handled
     # correctly
     i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2)
@@ -125,4 +125,41 @@ def test_ddl_create_space_basic(cluster: Cluster):
     assert i4.call("box.space._index:get", [666, 0]) == index_meta
 
 
-# TODO: test schema change applied only on part of instances and failed on some
+def test_ddl_create_space_partial_failure(cluster: Cluster):
+    i1, i2, i3 = cluster.deploy(instance_count=3)
+
+    # Create a space on one instance
+    # which will conflict with the clusterwide space.
+    i3.eval("box.schema.space.create(...)", "space_name_conflict")
+
+    # Propose a space creation which will fail
+    op = dict(
+        kind="ddl_prepare",
+        schema_version=1,
+        ddl=dict(
+            kind="create_space",
+            id=666,
+            name="space_name_conflict",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=[dict(field=1, type="unsigned")],
+            distribution=dict(kind="global"),
+        ),
+    )
+    # TODO: rewrite the test using pico.cas, when it supports ddl
+    prepare_index = i1.call("pico.raft_propose", op)
+    abort_index = prepare_index + 1
+
+    i1.call(".proc_sync_raft", abort_index, (3, 0))
+    i2.call(".proc_sync_raft", abort_index, (3, 0))
+    i3.call(".proc_sync_raft", abort_index, (3, 0))
+
+    # No space was created
+    assert i1.call("box.space._picodata_space:get", 666) is None
+    assert i1.call("box.space._space:get", 666) is None
+    assert i2.call("box.space._picodata_space:get", 666) is None
+    assert i2.call("box.space._space:get", 666) is None
+    assert i3.call("box.space._picodata_space:get", 666) is None
+    assert i3.call("box.space._space:get", 666) is None
+
+    # TODO: add an instance which will conflict with this ddl and make sure it
+    # panics