Skip to content
Snippets Groups Projects
Commit 3638d336 authored by Georgy Moshkin
Browse files

fix: cas requests are always reconstructed before retrying

parent 8a74d022
No related branches found
No related tags found
1 merge request: !1246 "cleanup a bunch of stuff related to compare and swap requests"
......@@ -62,6 +62,8 @@ tarantool::define_enum_with_introspection! {
NoSuchInstance = 10016,
NoSuchReplicaset = 10017,
LeaderUnknown = 10018,
/// Not an actual error code, just designates the start of the range.
UserDefinedErrorCodesStart = 20000,
// Plugin writers should use error codes in this range
......@@ -80,9 +82,12 @@ impl ErrorCode {
#[inline]
pub fn is_retriable_for_cas(&self) -> bool {
match *self {
// Raft leader is in the middle of being changed.
// The client should synchronize and retry the request.
ErrorCode::LeaderUnknown
// Raft leader has changed since the CaS request was generated.
// The client should synchronize and retry the request.
ErrorCode::NotALeader
| ErrorCode::NotALeader
// Raft term has changed since the CaS request was generated.
// The client should synchronize and retry the request.
| ErrorCode::TermMismatch
......
......@@ -14,7 +14,6 @@ use crate::traft::op::{Ddl, Dml, Op};
use crate::traft::EntryContext;
use crate::traft::Result;
use crate::traft::{RaftIndex, RaftTerm};
use crate::unwrap_ok_or;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
......@@ -25,7 +24,6 @@ use crate::schema::ADMIN_ID;
use tarantool::error::Error as TntError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::fiber::r#async::sleep;
use tarantool::fiber::r#async::timeout::IntoTimeout;
use tarantool::session::UserId;
use tarantool::space::{Space, SpaceId};
......@@ -78,47 +76,21 @@ pub async fn compare_and_swap_async(request: &Request) -> traft::Result<(RaftInd
}
}
loop {
let Some(leader_id) = node.status().leader_id else {
tlog!(
Warning,
"leader id is unknown, waiting for status change..."
);
node.wait_status();
continue;
};
let leader_address = unwrap_ok_or!(
node.storage.peer_addresses.try_get(leader_id),
Err(e) => {
tlog!(Warning, "failed getting leader address: {e}");
tlog!(Info, "going to retry in a while...");
sleep(Duration::from_millis(250)).await;
continue;
}
);
let resp = if leader_id == node.raft_id {
// cas has to be called locally in cases when listen ports are closed,
// for example on shutdown
proc_cas_local(request)
} else {
rpc::network_call(&leader_address, proc_name!(proc_cas), request)
.await
.map_err(TraftError::from)
};
match resp {
Ok(Response { index, term }) => return Ok((index, term)),
Err(e) => {
tlog!(Warning, "{e}");
if e.is_not_leader_err() {
tlog!(Info, "going to retry in a while...");
node.wait_status();
continue;
} else {
return Err(e);
}
}
}
let Some(leader_id) = node.status().leader_id else {
return Err(TraftError::LeaderUnknown);
};
let i_am_leader = leader_id == node.raft_id;
if i_am_leader {
// cas has to be called locally in cases when listen ports are closed,
// for example on shutdown
let resp = proc_cas_local(request)?;
return Ok((resp.index, resp.term));
}
let leader_address = node.storage.peer_addresses.try_get(leader_id)?;
let resp = rpc::network_call(&leader_address, proc_name!(proc_cas), request).await?;
Ok((resp.index, resp.term))
}
/// Performs a clusterwide compare and swap operation.
......
......@@ -145,6 +145,7 @@ impl Error {
// `IntoBoxError` for `sbroad::errors::SbroadError` and
// uncomment the following line:
// Self::Sbroad(e) => e.error_code(),
Self::LeaderUnknown => ErrorCode::LeaderUnknown as _,
Self::NotALeader => ErrorCode::NotALeader as _,
Self::TermMismatch { .. } => ErrorCode::TermMismatch as _,
Self::NoSuchInstance(_) => ErrorCode::NoSuchInstance as _,
......@@ -167,22 +168,6 @@ impl Error {
Self::InvalidConfiguration(msg.to_string())
}
/// Reports whether this error's code converts to one of the `ErrorCode`
/// variants `RaftLogCompacted`, `CasConflictFound` or `CasEntryTermMismatch`.
///
/// A code for which `ErrorCode::try_from` fails yields `false`.
// FIXME: remove this function, replace it with `is_retriable` everywhere it's used
#[inline]
pub fn is_retriable_cas_err(&self) -> bool {
    // Convert the raw numeric code first, then test the typed result.
    let code = ErrorCode::try_from(self.error_code());
    matches!(
        code,
        Ok(ErrorCode::RaftLogCompacted
            | ErrorCode::CasConflictFound
            | ErrorCode::CasEntryTermMismatch)
    )
}
/// Reports whether this error's numeric code equals `ErrorCode::NotALeader`.
#[inline(always)]
pub fn is_not_leader_err(&self) -> bool {
    let code = self.error_code();
    code == ErrorCode::NotALeader as u32
}
#[inline]
pub fn is_retriable(&self) -> bool {
let code = self.error_code();
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment