diff --git a/src/schema.rs b/src/schema.rs index d0607be591e1bb12d47beb05edbab78d0ba6eb7a..1417a7cf7e321c90c03b1e236d72e7a5900d06ee 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -122,7 +122,7 @@ impl IndexDef { // Don't forget to update this, if fields of `IndexDef` change. pub const FIELD_OPERABLE: usize = 6; - pub fn to_index_metadata(&self) -> traft::Result<IndexMetadata> { + pub fn to_index_metadata(&self) -> IndexMetadata { use tarantool::index::IndexType; let index_meta = IndexMetadata { @@ -134,7 +134,7 @@ impl IndexDef { parts: self.parts.clone(), }; - Ok(index_meta) + index_meta } } diff --git a/src/storage.rs b/src/storage.rs index 84ce0836156a792142767ccfa3b550b75336aa0b..b781244af76bb628bd86719ace7c9172b536a476 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1,3 +1,4 @@ +use ::tarantool::error::Error as TntError; use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType}; use ::tarantool::msgpack::{ArrayWriter, ValueIter}; use ::tarantool::space::UpdateOps; @@ -412,16 +413,61 @@ define_clusterwide_spaces! { } impl Clusterwide { - pub fn apply_snapshot_data(&self, raw_data: &[u8]) -> tarantool::Result<()> { + pub fn apply_snapshot_data(&self, raw_data: &[u8], is_master: bool) -> Result<()> { let data = SnapshotData::decode(raw_data)?; for space_dump in &data.space_dumps { let space = self.space(space_dump.space); space.truncate()?; let tuples = space_dump.tuples.as_ref(); - for tuple in ValueIter::from_array(tuples)? { + for tuple in ValueIter::from_array(tuples).map_err(TntError::from)? { space.insert(RawBytes::new(tuple))?; } } + + if !is_master { + return Ok(()); + } + + let sys_space = Space::from(SystemSpace::Space); + let sys_index = Space::from(SystemSpace::Index); + let mut new_pico_schema_version = pico_schema_version()?; + + for space_def in self.spaces.iter()? { + if !space_def.operable { + // This means a ddl operation wasn't committed yet. We probably + // don't want unfinished ddl artifacts comming over the snapshot + // so this will likely never happen. + crate::warn_or_panic!( + "unfinished ddl operation arrived via snapshot: {space_def:?}" + ); + continue; + } + + let Some(index_def) = self.indexes.get(space_def.id, 0)? else { + crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}"); + continue; + }; + + // XXX: this logic is duplicated in proc_apply_schema_change, but + // the code is so small, it doesn't seem forth it extracting it for + // now + let space_meta = space_def.to_space_metadata()?; + let index_meta = index_def.to_index_metadata(); + + sys_space.replace(&space_meta)?; + sys_index.replace(&index_meta)?; + if space_def.schema_version > new_pico_schema_version { + new_pico_schema_version = space_def.schema_version; + } + } + + // TODO: secondary indexes + + set_pico_schema_version(new_pico_schema_version)?; + + // TODO: check if a space exists here in box.space._space, but doesn't + // exist in pico._space, then delete it + Ok(()) } } @@ -1296,6 +1342,15 @@ impl Spaces { } } +impl ToEntryIter for Spaces { + type Entry = SpaceDef; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) + } +} + //////////////////////////////////////////////////////////////////////////////// // Indexes //////////////////////////////////////////////////////////////////////////////// @@ -1757,7 +1812,7 @@ mod tests { let raw_data = data.to_tuple_buffer().unwrap(); storage.for_each_space(|s| s.truncate()).unwrap(); - if let Err(e) = storage.apply_snapshot_data(raw_data.as_ref()) { + if let Err(e) = storage.apply_snapshot_data(raw_data.as_ref(), true) { println!("{e}"); panic!(); } diff --git a/src/traft/error.rs b/src/traft/error.rs index b3e74cb8d7237fdd573dc0b1f1c8a1997ea703f8..8e4aad3479f6646974220797b75d74d23ce8f5fc 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -98,6 +98,12 @@ impl From<::tarantool::network::Error> for Error { } } +impl From<::tarantool::error::TransactionError> for Error { + fn from(err: ::tarantool::error::TransactionError) -> Self { + Self::Tarantool(err.into()) + } +} + #[derive(Debug, Error)] pub enum CoercionError { #[error("unknown entry type ({0})")] diff --git a/src/traft/node.rs b/src/traft/node.rs index 961db7576a48788940a2ef101fc58504848981d8..167c42527944c778489f002cf43d6d9957dd794a 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -1126,7 +1126,7 @@ impl NodeImpl { // This is a snapshot, we need to apply the snapshot at first. let snapshot = ready.snapshot(); if !snapshot.is_empty() { - if let Err(e) = start_transaction(|| -> tarantool::Result<()> { + if let Err(e) = start_transaction(|| -> traft::Result<()> { let meta = snapshot.get_metadata(); self.raft_storage.handle_snapshot_metadata(meta)?; // FIXME: apply_snapshot_data calls truncate on clusterwide @@ -1138,7 +1138,9 @@ impl NodeImpl { if is_readonly { crate::tarantool::eval("box.cfg { read_only = false }")?; } - let res = self.storage.apply_snapshot_data(snapshot.get_data()); + let res = self + .storage + .apply_snapshot_data(snapshot.get_data(), !is_readonly); if is_readonly { crate::tarantool::exec("box.cfg { read_only = true }")?; } diff --git a/src/traft/rpc/ddl_apply.rs b/src/traft/rpc/ddl_apply.rs index 706b8cb41ee4637320339cd81033113c3e52a34d..6b1e5629e94a43d8d23c335fd30c704f419a21a7 100644 --- a/src/traft/rpc/ddl_apply.rs +++ b/src/traft/rpc/ddl_apply.rs @@ -95,7 +95,7 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re )) })?; // TODO: set index parts from space format - let index_meta = index_info.to_index_metadata()?; + let index_meta = index_info.to_index_metadata(); let res = (|| -> tarantool::Result<()> { sys_space.insert(&space_meta)?; diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index d05c823df48775c1fa1441da5fbcf499edb376a6..f23164cbfbe7b7368f36a01350da7dc646818390 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -54,6 +54,7 @@ def test_ddl_create_space_bulky(cluster: Cluster): distribution=dict(kind="global"), ), ) + # TODO: rewrite the test using pico.cas, when it supports ddl i1.call("pico.raft_propose", op) # Schema version was updated @@ -163,3 +164,64 @@ def test_ddl_create_space_partial_failure(cluster: Cluster): # TODO: add instance which will conflict with this ddl and make sure it # panics + + +def test_ddl_from_snapshot(cluster: Cluster): + # Second instance is only for quorum + i1, i2 = cluster.deploy(instance_count=2) + + i1.assert_raft_status("Leader") + + # TODO: check other ddl operations + # Propose a space creation which will succeed + op = dict( + kind="ddl_prepare", + schema_version=1, + ddl=dict( + kind="create_space", + id=666, + name="stuff", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=[dict(field=1, type="unsigned")], + distribution=dict(kind="global"), + ), + ) + # TODO: rewrite the test using pico.cas, when it supports ddl + ret = i1.call("pico.raft_propose", op) + + # Make sure everyone is synchronized + i1.call(".proc_sync_raft", ret, (3, 0)) + i2.call(".proc_sync_raft", ret, (3, 0)) + + space_meta = [ + 666, + 1, + "stuff", + "memtx", + 0, + dict(), + [dict(name="id", type="unsigned", is_nullable=False)], + ] + assert i1.call("box.space._space:get", 666) == space_meta + assert i2.call("box.space._space:get", 666) == space_meta + + index_meta = [ + 666, + 0, + "primary_key", + "tree", + dict(), + [[1, "unsigned", None, None, None]], + ] + assert i1.call("box.space._index:get", [666, 0]) == index_meta + assert i2.call("box.space._index:get", [666, 0]) == index_meta + + # Compact the log to trigger snapshot for the newcommer + i1.raft_compact_log() + i2.raft_compact_log() + + i3 = cluster.add_instance(wait_online=True) + + # Check space was created from the snapshot data + assert i3.call("box.space._space:get", 666) == space_meta + assert i3.call("box.space._index:get", [666, 0]) == index_meta