Skip to content
Snippets Groups Projects

cas predicate tests and minor features

Merged Егор Ивков requested to merge cas-tests into master
Files
2
+ 265
116
use crate::storage::ClusterwideSpace;
use crate::storage::{Clusterwide, TClusterwideSpace as _};
use crate::storage::{ClusterwideSpace, Indexes, Spaces};
use crate::tlog;
use crate::traft::error::Error as TraftError;
use crate::traft::node;
use crate::traft::op::{Dml, Op};
use crate::traft::op::{Ddl, Dml, Op};
use crate::traft::Result;
use crate::traft::{EntryContext, EntryContextNormal};
use crate::traft::{RaftIndex, RaftTerm};
@@ -13,14 +14,17 @@ use ::raft::StorageError;
use tarantool::error::Error as TntError;
use tarantool::tlua;
use tarantool::tuple::{KeyDef, Tuple, TupleBuffer};
use tarantool::tuple::{KeyDef, ToTupleBuffer, Tuple, TupleBuffer};
use once_cell::sync::Lazy;
const PROHIBITED_SPACES: &[&str] = &[Spaces::SPACE_NAME, Indexes::SPACE_NAME];
crate::define_rpc_request! {
fn proc_cas(req: Request) -> Result<Response> {
let node = node::global()?;
let raft_storage = &node.raft_storage;
let storage = &node.storage;
let cluster_id = raft_storage
.cluster_id()?
.expect("cluster_id is set on boot");
@@ -76,6 +80,14 @@ crate::define_rpc_request! {
let last_persisted = raft::Storage::last_index(raft_storage)?;
assert!(last_persisted <= last);
// Check if ranges in predicate contain prohibited spaces.
for range in &req.predicate.ranges {
if PROHIBITED_SPACES.contains(&range.space.as_str())
{
return Err(Error::SpaceNotAllowed { space: range.space.clone() }.into())
}
}
// It's tempting to just use `raft_log.entries()` here and only
// write the body of the loop once, but this would mean
// converting entries from our storage representation to raft-rs
@@ -109,7 +121,7 @@ crate::define_rpc_request! {
assert_eq!(entry.term, status.term);
let entry_index = entry.index;
let Some(op) = entry.into_op() else { continue };
req.predicate.check_entry(entry_index, &op)?;
req.predicate.check_entry(entry_index, &op, storage)?;
}
}
@@ -124,7 +136,7 @@ crate::define_rpc_request! {
let Some(EntryContext::Normal(EntryContextNormal { op, .. })) = cx else {
continue;
};
req.predicate.check_entry(entry.index, &op)?;
req.predicate.check_entry(entry.index, &op, storage)?;
}
// TODO: apply to limbo first
@@ -179,11 +191,14 @@ pub enum Error {
/// Checking the predicate revealed a collision.
#[error("comparison failed for index {requested} as it conflicts with {conflict_index}")]
Rejected {
ConflictFound {
requested: RaftIndex,
conflict_index: RaftIndex,
},
#[error("space {space} is prohibited for use in a predicate")]
SpaceNotAllowed { space: String },
/// An error related to `key_def` operation arised from tarantool
/// depths while checking the predicate.
#[error("failed comparing predicate ranges: {0}")]
@@ -202,12 +217,14 @@ pub struct Predicate {
}
impl Predicate {
/// Checks if `entry_op` changes anything within the ranges specified in the predicate.
pub fn check_entry(
&self,
entry_index: RaftIndex,
entry_op: &Op,
storage: &Clusterwide,
) -> std::result::Result<(), Error> {
let error = || Error::Rejected {
let error = || Error::ConflictFound {
requested: self.index,
conflict_index: entry_index,
};
@@ -249,6 +266,9 @@ impl Predicate {
.expect("keys should convert to tuple")
});
for range in &self.ranges {
if modifies_operable(entry_op, &range.space, storage) {
return Err(error());
}
let Some(space) = space(entry_op) else {
continue
};
@@ -286,6 +306,7 @@ impl Predicate {
}
}
/// A range of keys used as an argument for a [`Predicate`].
#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize, tlua::LuaRead)]
pub struct Range {
pub space: String,
@@ -293,6 +314,62 @@ pub struct Range {
pub key_max: Bound,
}
impl Range {
/// Creates new unbounded range in `space`. Use other methods to restrict it.
///
/// # Example
/// ```
/// use picodata::traft::rpc::cas::Range;
///
/// // Creates a range for tuples with keys from 1 (excluding) to 10 (excluding)
/// let range = Range::new("my_space").gt((1,)).lt((10,));
/// ```
pub fn new(space: impl ToString) -> Self {
Self {
space: space.to_string(),
key_min: Bound::Unbounded,
key_max: Bound::Unbounded,
}
}
/// Add a "greater than" restriction.
pub fn gt(mut self, key: impl ToTupleBuffer) -> Self {
let tuple = key.to_tuple_buffer().expect("cannot fail");
self.key_min = Bound::Excluded(tuple);
self
}
/// Add a "greater or equal" restriction.
pub fn ge(mut self, key: impl ToTupleBuffer) -> Self {
let tuple = key.to_tuple_buffer().expect("cannot fail");
self.key_min = Bound::Included(tuple);
self
}
/// Add a "less than" restriction.
pub fn lt(mut self, key: impl ToTupleBuffer) -> Self {
let tuple = key.to_tuple_buffer().expect("cannot fail");
self.key_max = Bound::Excluded(tuple);
self
}
/// Add a "less or equal" restriction.
pub fn le(mut self, key: impl ToTupleBuffer) -> Self {
let tuple = key.to_tuple_buffer().expect("cannot fail");
self.key_max = Bound::Included(tuple);
self
}
/// Add a "equal" restriction.
pub fn eq(mut self, key: impl ToTupleBuffer) -> Self {
let tuple = key.to_tuple_buffer().expect("cannot fail");
self.key_min = Bound::Included(tuple.clone());
self.key_max = Bound::Included(tuple);
self
}
}
/// A bound for keys.
#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize, tlua::LuaRead)]
#[serde(rename_all = "snake_case", tag = "kind", content = "value")]
pub enum Bound {
@@ -313,132 +390,204 @@ fn space(op: &Op) -> Option<ClusterwideSpace> {
}
}
/// Checks if the operation would by its semantics modify `operable` flag of the provided `space`.
fn modifies_operable(op: &Op, space: &str, storage: &Clusterwide) -> bool {
let ddl_modifies = |ddl: &Ddl| match ddl {
Ddl::CreateSpace { name, .. } => name == space,
Ddl::DropSpace { id } => {
if let Some(space_def) = storage
.spaces
.get(*id)
.expect("deserialization should not fail")
{
space_def.name == space
} else {
false
}
}
Ddl::CreateIndex { .. } => false,
Ddl::DropIndex { .. } => false,
};
match op {
Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl),
Op::DdlCommit | Op::DdlAbort => {
if let Some(change) = storage
.properties
.pending_schema_change()
.expect("conversion should not fail")
{
ddl_modifies(&change)
} else {
false
}
}
_ => false,
}
}
/// Predicate tests based on the CaS Design Document.
mod tests {
use tarantool::tuple::ToTupleBuffer;
use crate::storage::{Clusterwide, Properties, PropertyName, TClusterwideSpace};
use crate::schema::{Distribution, SpaceDef};
use crate::storage::TClusterwideSpace as _;
use crate::storage::{Clusterwide, Properties, PropertyName};
use crate::traft::op::DdlBuilder;
use super::*;
#[::tarantool::test]
fn check_ddl_predicate() {
Clusterwide::new().unwrap();
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
key_max: Bound::Included(
(PropertyName::PendingSchemaChange,)
.to_tuple_buffer()
.unwrap(),
),
}],
};
let err = predicate.check_entry(2, &Op::DdlCommit).unwrap_err();
assert_eq!(
&err.to_string(),
"comparison failed for index 1 as it conflicts with 2"
);
Predicate {
index: 1,
term: 1,
ranges: vec![],
}
.check_entry(2, &Op::DdlCommit)
.unwrap();
}
fn ddl() {
let storage = Clusterwide::new().unwrap();
#[::tarantool::test]
fn check_bounds() {
#[track_caller]
fn test(range: Range, test_cases: &[&str]) -> Vec<bool> {
let t = |op: &Op, range: Range| -> std::result::Result<(), Error> {
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![range],
};
test_cases
.iter()
.map(|&case| (String::from(case),).to_tuple_buffer().unwrap())
.map(|key| {
predicate
.check_entry(
2,
&Op::Dml(Dml::Delete {
space: ClusterwideSpace::Property,
key,
}),
)
.is_err()
})
.collect()
}
predicate.check_entry(2, op, &storage)
};
Clusterwide::new().unwrap();
let builder = DdlBuilder::with_schema_version(1);
let test_cases = ["a", "b", "c", "d", "e"];
let space_name = "space1";
let space_id = 1;
let index_id = 1;
// For range ["b", "d"]
// Test cases a,b,c,d,e
// Error 0,1,1,1,0
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, true, false]);
// For range ("b", "d"]
// Test cases a,b,c,d,e
// Error 0,0,1,1,0
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Excluded(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, false, true, true, false]);
// For range ["b", "d")
// Test cases a,b,c,d,e
// Error 0,1,1,0,0
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Excluded(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, false, false]);
// For range _, "d"]
// Test cases a,b,c,d,e
// Error 1,1,1,1,0
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Unbounded,
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
let create_space = builder.create_space(
space_id,
space_name.into(),
vec![],
vec![],
Distribution::Global,
);
let drop_space = builder.drop_space(space_id);
let create_index = builder.create_index(space_id, index_id, vec![]);
let drop_index = builder.drop_index(space_id, index_id);
let commit = Op::DdlCommit;
let abort = Op::DdlAbort;
let props = Properties::SPACE_NAME;
let pending_schema_change = (PropertyName::PendingSchemaChange.to_string(),);
let pending_schema_version = (PropertyName::PendingSchemaChange.to_string(),);
let current_schema_version = (PropertyName::CurrentSchemaVersion.to_string(),);
// create_space
assert!(t(&create_space, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&create_space, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&create_space, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&create_space, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&create_space, Range::new(space_name).eq(("any_key",))).is_err());
// drop_space
// `DropSpace` needs `SpaceDef` to get space name
storage
.spaces
.insert(&SpaceDef {
id: space_id,
name: space_name.into(),
operable: true,
distribution: Distribution::Global,
format: vec![],
schema_version: 1,
})
.unwrap();
assert!(t(&drop_space, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&drop_space, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&drop_space, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&drop_space, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&drop_space, Range::new(space_name).eq(("any_key",))).is_err());
// create_index
assert!(t(&create_index, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&create_index, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&create_index, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&create_index, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&create_index, Range::new(space_name).eq(("any_key",))).is_ok());
// drop_index
assert!(t(&drop_index, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&drop_index, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&drop_index, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&drop_index, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&drop_index, Range::new(space_name).eq(("any_key",))).is_ok());
// Abort and Commit need a pending schema change to get space name
let Op::DdlPrepare{ ddl: create_space_ddl, ..} = create_space else {
unreachable!();
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![true, true, true, true, false]);
// For range ["b", _
// Test cases a,b,c,d,e
// Error 0,1,1,1,1
let range = Range {
space: Properties::SPACE_NAME.into(),
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Unbounded,
storage
.properties
.put(PropertyName::PendingSchemaChange, &create_space_ddl)
.unwrap();
// commit
assert!(t(&commit, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&commit, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&commit, Range::new(props).eq(&current_schema_version)).is_err());
assert!(t(&commit, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&commit, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&commit, Range::new(space_name).eq(("any_key",))).is_err());
// abort
assert!(t(&abort, Range::new(props).eq(&pending_schema_change)).is_err());
assert!(t(&abort, Range::new(props).eq(&pending_schema_version)).is_err());
assert!(t(&abort, Range::new(props).eq(&current_schema_version)).is_err());
assert!(t(&abort, Range::new(props).eq(("another_key",))).is_ok());
assert!(t(&abort, Range::new("another_space").eq(("any_key",))).is_ok());
assert!(t(&abort, Range::new(space_name).eq(("any_key",))).is_err());
}
#[::tarantool::test]
fn dml() {
let key = (12,).to_tuple_buffer().unwrap();
let tuple = (12, "twelve").to_tuple_buffer().unwrap();
let storage = Clusterwide::new().unwrap();
let test = |op: &Dml, range: Range| {
let predicate = Predicate {
index: 1,
term: 1,
ranges: vec![range],
};
predicate.check_entry(2, &Op::Dml(op.clone()), &storage)
};
let errors = test(range, &test_cases);
assert_eq!(errors, vec![false, true, true, true, true]);
let space = ClusterwideSpace::Space;
let ops = &[
Dml::Insert {
space,
tuple: tuple.clone(),
},
Dml::Replace { space, tuple },
Dml::Update {
space,
key: key.clone(),
ops: vec![],
},
Dml::Delete { space, key },
];
let space = ClusterwideSpace::Space.to_string();
for op in ops {
assert!(test(op, Range::new(&space)).is_err());
assert!(test(op, Range::new(&space).le((12,))).is_err());
assert!(test(op, Range::new(&space).ge((12,))).is_err());
assert!(test(op, Range::new(&space).eq((12,))).is_err());
assert!(test(op, Range::new(&space).lt((12,))).is_ok());
assert!(test(op, Range::new(&space).le((11,))).is_ok());
assert!(test(op, Range::new(&space).gt((12,))).is_ok());
assert!(test(op, Range::new(&space).ge((13,))).is_ok());
assert!(test(op, Range::new("other_space")).is_ok());
}
}
}
Loading