Skip to content
Snippets Groups Projects

cas predicate tests and minor features

Merged Егор Ивков requested to merge cas-tests into master
Files
2
+ 253
61
use crate::storage::ClusterwideSpace;
use crate::storage::{Clusterwide, TClusterwideSpace as _};
use crate::storage::{ClusterwideSpace, Indexes, Spaces};
use crate::tlog;
use crate::traft::error::Error as TraftError;
use crate::traft::node;
use crate::traft::op::{Dml, Op};
use crate::traft::op::{Ddl, Dml, Op};
use crate::traft::Result;
use crate::traft::{EntryContext, EntryContextNormal};
use crate::traft::{RaftIndex, RaftTerm};
@@ -17,10 +18,13 @@ use tarantool::tuple::{KeyDef, Tuple, TupleBuffer};
use once_cell::sync::Lazy;
const PROHIBITED_SPACES: &[&str] = &[Spaces::SPACE_NAME, Indexes::SPACE_NAME];
crate::define_rpc_request! {
fn proc_cas(req: Request) -> Result<Response> {
let node = node::global()?;
let raft_storage = &node.raft_storage;
let storage = &node.storage;
let cluster_id = raft_storage
.cluster_id()?
.expect("cluster_id is set on boot");
@@ -76,6 +80,14 @@ crate::define_rpc_request! {
let last_persisted = raft::Storage::last_index(raft_storage)?;
assert!(last_persisted <= last);
// Check if ranges in predicate contain prohibited spaces.
for range in &req.predicate.ranges {
if PROHIBITED_SPACES.contains(&range.space.as_str())
{
return Err(Error::SpaceNotAllowed { space: range.space.clone() }.into())
}
}
// It's tempting to just use `raft_log.entries()` here and only
// write the body of the loop once, but this would mean
// converting entries from our storage representation to raft-rs
@@ -109,7 +121,7 @@ crate::define_rpc_request! {
assert_eq!(entry.term, status.term);
let entry_index = entry.index;
let Some(op) = entry.into_op() else { continue };
req.predicate.check_entry(entry_index, &op)?;
req.predicate.check_entry(entry_index, &op, storage)?;
}
}
@@ -124,7 +136,7 @@ crate::define_rpc_request! {
let Some(EntryContext::Normal(EntryContextNormal { op, .. })) = cx else {
continue;
};
req.predicate.check_entry(entry.index, &op)?;
req.predicate.check_entry(entry.index, &op, storage)?;
}
// TODO: apply to limbo first
@@ -179,11 +191,14 @@ pub enum Error {
/// Checking the predicate revealed a collision.
#[error("comparison failed for index {requested} as it conflicts with {conflict_index}")]
Rejected {
ConflictFound {
requested: RaftIndex,
conflict_index: RaftIndex,
},
#[error("space {space} is prohibited for use in a predicate")]
SpaceNotAllowed { space: String },
/// An error related to `key_def` operation arised from tarantool
/// depths while checking the predicate.
#[error("failed comparing predicate ranges: {0}")]
@@ -202,12 +217,14 @@ pub struct Predicate {
}
impl Predicate {
/// Checks if `entry_op` changes anything within the ranges specified in the predicate.
pub fn check_entry(
&self,
entry_index: RaftIndex,
entry_op: &Op,
storage: &Clusterwide,
) -> std::result::Result<(), Error> {
let error = || Error::Rejected {
let error = || Error::ConflictFound {
requested: self.index,
conflict_index: entry_index,
};
@@ -249,6 +266,9 @@ impl Predicate {
.expect("keys should convert to tuple")
});
for range in &self.ranges {
if modifies_operable(entry_op, &range.space, storage) {
return Err(error());
}
let Some(space) = space(entry_op) else {
continue
};
@@ -313,78 +333,213 @@ fn space(op: &Op) -> Option<ClusterwideSpace> {
}
}
/// Checks if the operation would by its semantics modify `operable` flag of the provided `space`.
fn modifies_operable(op: &Op, space: &str, storage: &Clusterwide) -> bool {
let ddl_modifies = |ddl: &Ddl| match ddl {
Ddl::CreateSpace { name, .. } => name == space,
Ddl::DropSpace { id } => {
if let Some(space_def) = storage
.spaces
.get(*id)
.expect("deserialization should not fail")
{
space_def.name == space
} else {
false
}
}
Ddl::CreateIndex { .. } => false,
Ddl::DropIndex { .. } => false,
};
match op {
Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl),
Op::DdlCommit | Op::DdlAbort => {
if let Some(change) = storage
.properties
.pending_schema_change()
.expect("conversion should not fail")
{
ddl_modifies(&change)
} else {
false
}
}
_ => false,
}
}
/// Predicate tests based on the CaS Design Document.
mod tests {
use serde::Serialize;
use tarantool::tuple::ToTupleBuffer;
use crate::storage::{Clusterwide, Properties, PropertyName, TClusterwideSpace};
use crate::schema::{Distribution, SpaceDef};
use crate::storage::TClusterwideSpace as _;
use crate::storage::{Clusterwide, Properties, PropertyName};
use crate::traft::op::DdlBuilder;
use super::*;
#[::tarantool::test]
fn check_ddl_predicate() {
Clusterwide::new().unwrap();
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
key_max: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
}],
};
let err = predicate.check_entry(2, &Op::DdlCommit).unwrap_err();
assert_eq!(
&err.to_string(),
"comparison failed for index 1 as it conflicts with 2"
fn ddl() {
#[track_caller]
fn test<T: Serialize + ?Sized>(
op: &Op,
space: &str,
key: &T,
storage: &Clusterwide,
) -> std::result::Result<(), Error> {
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![Range {
space: space.into(),
key_min: Bound::Included((key,).to_tuple_buffer().unwrap()),
key_max: Bound::Included((key,).to_tuple_buffer().unwrap()),
}],
};
predicate.check_entry(2, op, storage)
}
let storage = Clusterwide::new().unwrap();
let builder = DdlBuilder::with_schema_version(1);
let space_name = "space1";
let space_id = 1;
let index_id = 1;
let create_space = builder.create_space(
space_id,
space_name.into(),
vec![],
vec![],
Distribution::Global,
);
let drop_space = builder.drop_space(space_id);
let create_index = builder.create_index(space_id, index_id, vec![]);
let drop_index = builder.drop_index(space_id, index_id);
let commit = Op::DdlCommit;
let abort = Op::DdlAbort;
let all = [
&create_space,
&drop_space,
&create_index,
&drop_index,
&commit,
&abort,
];
for op in all {
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::PendingSchemaChange.into(),
&storage
)
.is_err());
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::PendingSchemaVersion.into(),
&storage
)
.is_err());
assert!(test(op, Properties::SPACE_NAME, "unrelated_key", &storage).is_ok());
assert!(test(op, "unrelated_space", "unrelated_key", &storage).is_ok())
}
Predicate {
index: 1,
term: 1,
ranges: vec![],
// `DropSpace` needs `SpaceDef` to get space name
storage
.spaces
.insert(&SpaceDef {
id: space_id,
name: space_name.into(),
operable: true,
distribution: Distribution::Global,
format: vec![],
schema_version: 1,
})
.unwrap();
for op in [&create_space, &drop_space] {
assert!(test(op, space_name, "any_key", &storage).is_err());
}
for op in [&create_index, &drop_index] {
assert!(test(op, space_name, "any_key", &storage).is_ok());
}
// Abort and Commit need a pending schema change to get space name
let Op::DdlPrepare{ ddl: create_space_ddl, ..} = create_space.clone() else {
unreachable!();
};
storage
.properties
.put(PropertyName::PendingSchemaChange, &create_space_ddl)
.unwrap();
for op in [&abort, &commit] {
assert!(test(
op,
Properties::SPACE_NAME,
PropertyName::CurrentSchemaVersion.into(),
&storage
)
.is_err());
}
.check_entry(2, &Op::DdlCommit)
.unwrap();
}
#[::tarantool::test]
fn check_bounds() {
fn dml() {
#[derive(Debug)]
enum TestOp {
Insert,
Replace,
Update,
Delete,
}
#[track_caller]
fn test(range: Range, test_cases: &[&str]) -> Vec<bool> {
fn test<T: Serialize>(
op: &TestOp,
range: &Range,
test_cases: &[T],
storage: &Clusterwide,
) -> Vec<bool> {
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![range],
ranges: vec![range.clone()],
};
test_cases
.iter()
.map(|&case| (String::from(case),).to_tuple_buffer().unwrap())
.map(|case| (case,).to_tuple_buffer().unwrap())
.map(|key| {
predicate
.check_entry(
2,
&Op::Dml(Dml::Delete {
space: ClusterwideSpace::Property,
key,
}),
)
.is_err()
let space = ClusterwideSpace::Property;
match op {
TestOp::Insert => Dml::Insert { space, tuple: key },
TestOp::Replace => Dml::Replace { space, tuple: key },
TestOp::Update => Dml::Update {
space,
key,
ops: vec![],
},
TestOp::Delete => Dml::Delete { space, key },
}
})
.map(|op| predicate.check_entry(2, &Op::Dml(op), storage).is_err())
.collect()
}
Clusterwide::new().unwrap();
let storage = Clusterwide::new().unwrap();
let test_cases = ["a", "b", "c", "d", "e"];
let ops = &[
TestOp::Insert,
TestOp::Replace,
TestOp::Update,
TestOp::Delete,
];
// For range ["b", "d"]
// Test cases a,b,c,d,e
@@ -394,8 +549,10 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, true, false]);
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, true, false], "{op:?}");
}
// For range ("b", "d"]
// Test cases a,b,c,d,e
@@ -405,8 +562,10 @@ mod tests {
key_min: Bound::Excluded(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, false, true, true, false]);
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![false, false, true, true, false], "{op:?}");
}
// For range ["b", "d")
// Test cases a,b,c,d,e
@@ -416,8 +575,10 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Excluded(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, false, false]);
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, false, false], "{op:?}");
}
// For range _, "d"]
// Test cases a,b,c,d,e
@@ -427,8 +588,10 @@ mod tests {
key_min: Bound::Unbounded,
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![true, true, true, true, false]);
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![true, true, true, true, false], "{op:?}");
}
// For range ["b", _
// Test cases a,b,c,d,e
@@ -438,7 +601,36 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Unbounded,
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, true, true]);
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, true, true], "{op:?}");
}
// For range _, _
// Test cases a,b,c,d,e
// Error 1,1,1,1,1
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Unbounded,
key_max: Bound::Unbounded,
};
for op in ops {
let errors = test(op, &range, &test_cases, &storage);
assert_eq!(errors, vec![true, true, true, true, true], "{op:?}");
}
// Different space
// For range _, _
// Test cases a
// Error 0
let range = Range {
space: Spaces::SPACE_NAME.into(),
key_min: Bound::Unbounded,
key_max: Bound::Unbounded,
};
for op in ops {
let errors = test(op, &range, &[1], &storage);
assert_eq!(errors, vec![false], "{op:?}");
}
}
}
Loading