Skip to content
Snippets Groups Projects
Commit 44ab978b authored by Egor Ivkov's avatar Egor Ivkov
Browse files

extensive ddl tests

parent 594f33d4
No related branches found
No related tags found
No related merge requests found
Pipeline #17974 failed
use crate::storage::TClusterwideSpace as _;
use crate::storage::{ClusterwideSpace, Indexes, Properties, Spaces};
use crate::storage::{Clusterwide, TClusterwideSpace as _};
use crate::storage::{ClusterwideSpace, Indexes, Spaces};
use crate::tlog;
use crate::traft::error::Error as TraftError;
use crate::traft::node;
......@@ -121,7 +121,7 @@ crate::define_rpc_request! {
assert_eq!(entry.term, status.term);
let entry_index = entry.index;
let Some(op) = entry.into_op() else { continue };
req.predicate.check_entry(entry_index, &op, &storage.properties)?;
req.predicate.check_entry(entry_index, &op, &storage)?;
}
}
......@@ -136,7 +136,7 @@ crate::define_rpc_request! {
let Some(EntryContext::Normal(EntryContextNormal { op, .. })) = cx else {
continue;
};
req.predicate.check_entry(entry.index, &op, &storage.properties)?;
req.predicate.check_entry(entry.index, &op, &storage)?;
}
// TODO: apply to limbo first
......@@ -221,7 +221,7 @@ impl Predicate {
&self,
entry_index: RaftIndex,
entry_op: &Op,
properties: &Properties,
storage: &Clusterwide,
) -> std::result::Result<(), Error> {
let error = || Error::ConflictFound {
requested: self.index,
......@@ -265,7 +265,7 @@ impl Predicate {
.expect("keys should convert to tuple")
});
for range in &self.ranges {
if modifies_operable(entry_op, &range.space, properties) {
if modifies_operable(entry_op, &range.space, storage) {
return Err(error());
}
let Some(space) = space(entry_op) else {
......@@ -333,11 +333,19 @@ fn space(op: &Op) -> Option<ClusterwideSpace> {
}
/// Checks if the operation would by its semantics modify `operable` flag of the provided `space`.
fn modifies_operable(op: &Op, space: &str, properties: &Properties) -> bool {
fn modifies_operable(op: &Op, space: &str, storage: &Clusterwide) -> bool {
let ddl_modifies = |ddl: &Ddl| match ddl {
Ddl::CreateSpace { name, .. } => name == space,
Ddl::DropSpace { id: _id } => {
todo!("we should put clusterwide spaces defs into _picodata_space first")
Ddl::DropSpace { id } => {
if let Some(space_def) = storage
.spaces
.get(*id)
.expect("deserialization should not fail")
{
space_def.name == space
} else {
false
}
}
Ddl::CreateIndex { .. } => false,
Ddl::DropIndex { .. } => false,
......@@ -345,7 +353,8 @@ fn modifies_operable(op: &Op, space: &str, properties: &Properties) -> bool {
match op {
Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl),
Op::DdlCommit | Op::DdlAbort => {
if let Some(change) = properties
if let Some(change) = storage
.properties
.pending_schema_change()
.expect("conversion should not fail")
{
......@@ -362,47 +371,120 @@ mod tests {
use serde::Serialize;
use tarantool::tuple::ToTupleBuffer;
use crate::schema::{Distribution, SpaceDef};
use crate::storage::TClusterwideSpace as _;
use crate::storage::{Clusterwide, Properties, PropertyName};
use crate::traft::op::DdlBuilder;
use super::*;
#[::tarantool::test]
fn ddl() {
let clusterwide = Clusterwide::new().unwrap();
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
key_max: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
}],
};
let err = predicate
.check_entry(2, &Op::DdlCommit, &clusterwide.properties)
.unwrap_err();
assert_eq!(
&err.to_string(),
"comparison failed for index 1 as it conflicts with 2"
#[track_caller]
fn test<T: Serialize + ?Sized>(
op: &Op,
space: &str,
key: &T,
storage: &Clusterwide,
) -> std::result::Result<(), Error> {
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![Range {
space: space.into(),
key_min: Bound::Included((key,).to_tuple_buffer().unwrap()),
key_max: Bound::Included((key,).to_tuple_buffer().unwrap()),
}],
};
predicate.check_entry(2, op, storage)
}
let storage = Clusterwide::new().unwrap();
let builder = DdlBuilder::with_schema_version(1);
let space_name = "space1";
let space_id = 1;
let index_id = 1;
let create_space = builder.create_space(
space_id,
space_name.into(),
vec![],
vec![],
Distribution::Global,
);
let drop_space = builder.drop_space(space_id);
let create_index = builder.create_index(space_id, index_id, vec![]);
let drop_index = builder.drop_index(space_id, index_id);
let commit = Op::DdlCommit;
let abort = Op::DdlAbort;
let all = [
&create_space,
&drop_space,
&create_index,
&drop_index,
&commit,
&abort,
];
for op in all {
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::PendingSchemaChange.into(),
&storage
)
.is_err());
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::PendingSchemaVersion.into(),
&storage
)
.is_err());
assert!(test(op, Properties::SPACE_NAME, "unrelated_key", &storage).is_ok());
assert!(test(op, "unrelated_space", "unrelated_key", &storage).is_ok())
}
Predicate {
index: 1,
term: 1,
ranges: vec![],
// `DropSpace` needs `SpaceDef` to get space name
storage
.spaces
.insert(&SpaceDef {
id: space_id,
name: space_name.into(),
operable: true,
distribution: Distribution::Global,
format: vec![],
schema_version: 1,
})
.unwrap();
for op in [&create_space, &drop_space] {
assert!(test(op, space_name, "any_key", &storage).is_err());
}
for op in [&create_index, &drop_index] {
assert!(test(op, space_name, "any_key", &storage).is_ok());
}
// Abort and Commit need a pending schema change to get space name
let Op::DdlPrepare{ ddl: create_space_ddl, ..} = create_space.clone() else {
unreachable!();
};
storage
.properties
.put(PropertyName::PendingSchemaChange, &create_space_ddl)
.unwrap();
for op in [&abort, &commit] {
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::CurrentSchemaVersion.into(),
&storage
)
.is_err());
}
.check_entry(2, &Op::DdlCommit, &clusterwide.properties)
.unwrap();
}
#[::tarantool::test]
......@@ -443,11 +525,7 @@ mod tests {
TestOp::Delete => Dml::Delete { space, key },
}
})
.map(|op| {
predicate
.check_entry(2, &Op::Dml(op), &storage.properties)
.is_err()
})
.map(|op| predicate.check_entry(2, &Op::Dml(op), &storage).is_err())
.collect()
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment