Skip to content
Snippets Groups Projects
Commit d40e24f3 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: cleanup CaS code in plugins

parent dca8812f
No related branches found
No related tags found
1 merge request!1209plugin system cleanup
......@@ -19,7 +19,7 @@ use tarantool::error::BoxError;
use tarantool::fiber;
use tarantool::time::Instant;
use crate::cas::{compare_and_swap, Range};
use crate::cas::Range;
use crate::info::InstanceInfo;
use crate::plugin::migration::MigrationInfo;
use crate::plugin::PluginError::{PluginNotFound, RemoveOfEnabledPlugin};
......@@ -29,6 +29,7 @@ use crate::traft::node::Node;
use crate::traft::op::PluginRaftOp;
use crate::traft::op::{Dml, Op};
use crate::traft::{node, RaftIndex};
use crate::unwrap_ok_or;
use crate::util::effective_user_id;
use crate::{cas, error_injection, tlog, traft};
......@@ -383,34 +384,30 @@ fn do_routing_table_cas(
ops: dml_ops.clone(),
};
let req = crate::cas::Request::new(
op,
cas::Predicate {
index: raft_storage.applied()?,
term: raft_storage.term()?,
ranges: ranges.clone(),
},
ADMIN_ID,
)?;
let res = cas::compare_and_swap(&req, deadline.duration_since(fiber::clock()));
match res {
Ok((index, term)) => {
node.wait_index(index, deadline.duration_since(fiber::clock()))?;
if term != raft::Storage::term(raft_storage, index)? {
// leader switched - retry
continue;
}
}
Err(err) => {
if err.is_retriable() {
// cas error - retry
fiber::sleep(Duration::from_millis(500));
let predicate = cas::Predicate {
index: raft_storage.applied()?,
term: raft_storage.term()?,
ranges: ranges.clone(),
};
let req = crate::cas::Request::new(op.clone(), predicate, ADMIN_ID)?;
let res = cas::compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
let (index, term) = unwrap_ok_or!(res,
Err(e) => {
if e.is_retriable() {
continue;
} else {
return Err(err);
return Err(e);
}
}
);
node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
if term != raft::Storage::term(&node.raft_storage, index)? {
// Leader has changed and the entry got rolled back, retry.
continue;
}
return Ok(());
}
}
......@@ -485,8 +482,6 @@ fn do_plugin_cas(
try_again_condition: Option<fn(&Node) -> traft::Result<bool>>,
deadline: Instant,
) -> traft::Result<RaftIndex> {
let raft_storage = &node.raft_storage;
loop {
let index = node.read_index(deadline.duration_since(Instant::now_fiber()))?;
if let Some(try_again_condition) = try_again_condition {
......@@ -498,34 +493,31 @@ fn do_plugin_cas(
// FIXME: preconditions & operation must be recomputed on each retry
let req = crate::cas::Request::new(
op.clone(),
cas::Predicate {
index: raft_storage.applied()?,
term: raft_storage.term()?,
ranges: ranges.clone(),
},
// FIXME: access rules will be implemented in future release
effective_user_id(),
)?;
let cas_result = compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
match cas_result {
Ok((index, term)) => {
node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
if term != raft::Storage::term(raft_storage, index)? {
// leader switched - retry
continue;
}
}
Err(err) => {
if err.is_retriable() {
// cas error - retry
fiber::sleep(Duration::from_millis(500));
let term = raft::Storage::term(&node.raft_storage, index)?;
let predicate = cas::Predicate {
index,
term,
ranges: ranges.clone(),
};
// FIXME: access rules will be implemented in future release
let current_user = effective_user_id();
let req = crate::cas::Request::new(op.clone(), predicate, current_user)?;
let res = cas::compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
let (index, term) = unwrap_ok_or!(res,
Err(e) => {
if e.is_retriable() {
continue;
} else {
return Err(err);
return Err(e);
}
}
);
node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
if term != raft::Storage::term(&node.raft_storage, index)? {
// Leader has changed and the entry got rolled back, retry.
continue;
}
return Ok(index);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment