Skip to content
Snippets Groups Projects
Commit 6d0874fd authored by Egor Ivkov's avatar Egor Ivkov Committed by Егор Ивков
Browse files

feat: fail cas if space operable changes in unseen entries

parent bdc2cc07
No related branches found
No related tags found
1 merge request!525cas predicate tests and minor features
use crate::storage::TClusterwideSpace as _;
use crate::storage::{ClusterwideSpace, Indexes, Spaces};
use crate::storage::{ClusterwideSpace, Indexes, Properties, Spaces};
use crate::storage::{PropertyName, TClusterwideSpace as _};
use crate::tlog;
use crate::traft::error::Error as TraftError;
use crate::traft::node;
use crate::traft::op::{Dml, Op};
use crate::traft::op::{Ddl, Dml, Op};
use crate::traft::Result;
use crate::traft::{EntryContext, EntryContextNormal};
use crate::traft::{RaftIndex, RaftTerm};
......@@ -24,6 +24,7 @@ crate::define_rpc_request! {
fn proc_cas(req: Request) -> Result<Response> {
let node = node::global()?;
let raft_storage = &node.raft_storage;
let storage = &node.storage;
let cluster_id = raft_storage
.cluster_id()?
.expect("cluster_id is set on boot");
......@@ -120,7 +121,7 @@ crate::define_rpc_request! {
assert_eq!(entry.term, status.term);
let entry_index = entry.index;
let Some(op) = entry.into_op() else { continue };
req.predicate.check_entry(entry_index, &op)?;
req.predicate.check_entry(entry_index, &op, &storage.properties)?;
}
}
......@@ -135,7 +136,7 @@ crate::define_rpc_request! {
let Some(EntryContext::Normal(EntryContextNormal { op, .. })) = cx else {
continue;
};
req.predicate.check_entry(entry.index, &op)?;
req.predicate.check_entry(entry.index, &op, &storage.properties)?;
}
// TODO: apply to limbo first
......@@ -220,6 +221,7 @@ impl Predicate {
&self,
entry_index: RaftIndex,
entry_op: &Op,
properties: &Properties,
) -> std::result::Result<(), Error> {
let error = || Error::ConflictFound {
requested: self.index,
......@@ -263,6 +265,9 @@ impl Predicate {
.expect("keys should convert to tuple")
});
for range in &self.ranges {
if modifies_operable(entry_op, &range.space, properties) {
return Err(error());
}
let Some(space) = space(entry_op) else {
continue
};
......@@ -327,6 +332,32 @@ fn space(op: &Op) -> Option<ClusterwideSpace> {
}
}
/// Checks if the operation would by its semantics modify `operable` flag of the provided `space`.
fn modifies_operable(op: &Op, space: &str, properties: &Properties) -> bool {
let ddl_modifies = |ddl: &Ddl| match ddl {
Ddl::CreateSpace { name, .. } => name == space,
Ddl::DropSpace { id: _id } => {
todo!("we should put clusterwide spaces defs into _picodata_space first")
}
Ddl::CreateIndex { .. } => false,
Ddl::DropIndex { .. } => false,
};
match op {
Op::DdlPrepare { ddl, .. } => ddl_modifies(ddl),
Op::DdlCommit | Op::DdlAbort => {
if let Some(change) = properties
.get::<Ddl>(PropertyName::PendingSchemaChange)
.expect("conversion should not fail")
{
ddl_modifies(&change)
} else {
false
}
}
_ => false,
}
}
mod tests {
use tarantool::tuple::ToTupleBuffer;
......@@ -337,7 +368,7 @@ mod tests {
#[::tarantool::test]
fn check_ddl_predicate() {
Clusterwide::new().unwrap();
let clusterwide = Clusterwide::new().unwrap();
let predicate = Predicate {
index: 1,
......@@ -356,7 +387,9 @@ mod tests {
),
}],
};
let err = predicate.check_entry(2, &Op::DdlCommit).unwrap_err();
let err = predicate
.check_entry(2, &Op::DdlCommit, &clusterwide.properties)
.unwrap_err();
assert_eq!(
&err.to_string(),
"comparison failed for index 1 as it conflicts with 2"
......@@ -367,14 +400,14 @@ mod tests {
term: 1,
ranges: vec![],
}
.check_entry(2, &Op::DdlCommit)
.check_entry(2, &Op::DdlCommit, &clusterwide.properties)
.unwrap();
}
#[::tarantool::test]
fn check_bounds() {
#[track_caller]
fn test(range: Range, test_cases: &[&str]) -> Vec<bool> {
fn test(range: Range, test_cases: &[&str], storage: &Clusterwide) -> Vec<bool> {
let predicate = Predicate {
index: 1,
term: 1,
......@@ -391,13 +424,14 @@ mod tests {
space: ClusterwideSpace::Property,
key,
}),
&storage.properties,
)
.is_err()
})
.collect()
}
Clusterwide::new().unwrap();
let storage = Clusterwide::new().unwrap();
let test_cases = ["a", "b", "c", "d", "e"];
......@@ -409,7 +443,7 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
let errors = test(range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, true, false]);
// For range ("b", "d"]
......@@ -420,7 +454,7 @@ mod tests {
key_min: Bound::Excluded(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
let errors = test(range, &test_cases, &storage);
assert_eq!(errors, vec![false, false, true, true, false]);
// For range ["b", "d")
......@@ -431,7 +465,7 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Excluded(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
let errors = test(range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, false, false]);
// For range _, "d"]
......@@ -442,7 +476,7 @@ mod tests {
key_min: Bound::Unbounded,
key_max: Bound::Included(("d",).to_tuple_buffer().unwrap()),
};
let errors = test(range, &test_cases);
let errors = test(range, &test_cases, &storage);
assert_eq!(errors, vec![true, true, true, true, false]);
// For range ["b", _
......@@ -453,7 +487,7 @@ mod tests {
key_min: Bound::Included(("b",).to_tuple_buffer().unwrap()),
key_max: Bound::Unbounded,
};
let errors = test(range, &test_cases);
let errors = test(range, &test_cases, &storage);
assert_eq!(errors, vec![false, true, true, true, true]);
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment