Skip to content
Snippets Groups Projects
Commit 605accc2 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

feat: support ddl create space in snapshot

parent e3dc8a74
No related branches found
No related tags found
1 merge request!516OpDdl create space
......@@ -122,7 +122,7 @@ impl IndexDef {
// Don't forget to update this, if fields of `IndexDef` change.
pub const FIELD_OPERABLE: usize = 6;
pub fn to_index_metadata(&self) -> traft::Result<IndexMetadata> {
pub fn to_index_metadata(&self) -> IndexMetadata {
use tarantool::index::IndexType;
let index_meta = IndexMetadata {
......@@ -134,7 +134,7 @@ impl IndexDef {
parts: self.parts.clone(),
};
Ok(index_meta)
index_meta
}
}
......
use ::tarantool::error::Error as TntError;
use ::tarantool::index::{Index, IndexId, IndexIterator, IteratorType};
use ::tarantool::msgpack::{ArrayWriter, ValueIter};
use ::tarantool::space::UpdateOps;
......@@ -412,16 +413,61 @@ define_clusterwide_spaces! {
}
impl Clusterwide {
pub fn apply_snapshot_data(&self, raw_data: &[u8]) -> tarantool::Result<()> {
pub fn apply_snapshot_data(&self, raw_data: &[u8], is_master: bool) -> Result<()> {
let data = SnapshotData::decode(raw_data)?;
for space_dump in &data.space_dumps {
let space = self.space(space_dump.space);
space.truncate()?;
let tuples = space_dump.tuples.as_ref();
for tuple in ValueIter::from_array(tuples)? {
for tuple in ValueIter::from_array(tuples).map_err(TntError::from)? {
space.insert(RawBytes::new(tuple))?;
}
}
if !is_master {
return Ok(());
}
let sys_space = Space::from(SystemSpace::Space);
let sys_index = Space::from(SystemSpace::Index);
let mut new_pico_schema_version = pico_schema_version()?;
for space_def in self.spaces.iter()? {
if !space_def.operable {
// This means a ddl operation wasn't committed yet. We probably
// don't want unfinished ddl artifacts comming over the snapshot
// so this will likely never happen.
crate::warn_or_panic!(
"unfinished ddl operation arrived via snapshot: {space_def:?}"
);
continue;
}
let Some(index_def) = self.indexes.get(space_def.id, 0)? else {
crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}");
continue;
};
// XXX: this logic is duplicated in proc_apply_schema_change, but
// the code is so small, it doesn't seem forth it extracting it for
// now
let space_meta = space_def.to_space_metadata()?;
let index_meta = index_def.to_index_metadata();
sys_space.replace(&space_meta)?;
sys_index.replace(&index_meta)?;
if space_def.schema_version > new_pico_schema_version {
new_pico_schema_version = space_def.schema_version;
}
}
// TODO: secondary indexes
set_pico_schema_version(new_pico_schema_version)?;
// TODO: check if a space exists here in box.space._space, but doesn't
// exist in pico._space, then delete it
Ok(())
}
}
......@@ -1296,6 +1342,15 @@ impl Spaces {
}
}
impl ToEntryIter for Spaces {
type Entry = SpaceDef;
#[inline(always)]
fn index_iter(&self) -> Result<IndexIterator> {
Ok(self.space.select(IteratorType::All, &())?)
}
}
////////////////////////////////////////////////////////////////////////////////
// Indexes
////////////////////////////////////////////////////////////////////////////////
......@@ -1757,7 +1812,7 @@ mod tests {
let raw_data = data.to_tuple_buffer().unwrap();
storage.for_each_space(|s| s.truncate()).unwrap();
if let Err(e) = storage.apply_snapshot_data(raw_data.as_ref()) {
if let Err(e) = storage.apply_snapshot_data(raw_data.as_ref(), true) {
println!("{e}");
panic!();
}
......
......@@ -98,6 +98,12 @@ impl From<::tarantool::network::Error> for Error {
}
}
impl From<::tarantool::error::TransactionError> for Error {
fn from(err: ::tarantool::error::TransactionError) -> Self {
Self::Tarantool(err.into())
}
}
#[derive(Debug, Error)]
pub enum CoercionError {
#[error("unknown entry type ({0})")]
......
......@@ -1126,7 +1126,7 @@ impl NodeImpl {
// This is a snapshot, we need to apply the snapshot at first.
let snapshot = ready.snapshot();
if !snapshot.is_empty() {
if let Err(e) = start_transaction(|| -> tarantool::Result<()> {
if let Err(e) = start_transaction(|| -> traft::Result<()> {
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
// FIXME: apply_snapshot_data calls truncate on clusterwide
......@@ -1138,7 +1138,9 @@ impl NodeImpl {
if is_readonly {
crate::tarantool::eval("box.cfg { read_only = false }")?;
}
let res = self.storage.apply_snapshot_data(snapshot.get_data());
let res = self
.storage
.apply_snapshot_data(snapshot.get_data(), !is_readonly);
if is_readonly {
crate::tarantool::exec("box.cfg { read_only = true }")?;
}
......
......@@ -95,7 +95,7 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
))
})?;
// TODO: set index parts from space format
let index_meta = index_info.to_index_metadata()?;
let index_meta = index_info.to_index_metadata();
let res = (|| -> tarantool::Result<()> {
sys_space.insert(&space_meta)?;
......
......@@ -54,6 +54,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
distribution=dict(kind="global"),
),
)
# TODO: rewrite the test using pico.cas, when it supports ddl
i1.call("pico.raft_propose", op)
# Schema version was updated
......@@ -163,3 +164,64 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
# TODO: add instance which will conflict with this ddl and make sure it
# panics
def test_ddl_from_snapshot(cluster: Cluster):
# Second instance is only for quorum
i1, i2 = cluster.deploy(instance_count=2)
i1.assert_raft_status("Leader")
# TODO: check other ddl operations
# Propose a space creation which will succeed
op = dict(
kind="ddl_prepare",
schema_version=1,
ddl=dict(
kind="create_space",
id=666,
name="stuff",
format=[dict(name="id", type="unsigned", is_nullable=False)],
primary_key=[dict(field=1, type="unsigned")],
distribution=dict(kind="global"),
),
)
# TODO: rewrite the test using pico.cas, when it supports ddl
ret = i1.call("pico.raft_propose", op)
# Make sure everyone is synchronized
i1.call(".proc_sync_raft", ret, (3, 0))
i2.call(".proc_sync_raft", ret, (3, 0))
space_meta = [
666,
1,
"stuff",
"memtx",
0,
dict(),
[dict(name="id", type="unsigned", is_nullable=False)],
]
assert i1.call("box.space._space:get", 666) == space_meta
assert i2.call("box.space._space:get", 666) == space_meta
index_meta = [
666,
0,
"primary_key",
"tree",
dict(),
[[1, "unsigned", None, None, None]],
]
assert i1.call("box.space._index:get", [666, 0]) == index_meta
assert i2.call("box.space._index:get", [666, 0]) == index_meta
# Compact the log to trigger snapshot for the newcommer
i1.raft_compact_log()
i2.raft_compact_log()
i3 = cluster.add_instance(wait_online=True)
# Check space was created from the snapshot data
assert i3.call("box.space._space:get", 666) == space_meta
assert i3.call("box.space._index:get", [666, 0]) == index_meta
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment