From 3638d3368890f04e9c3da6f2280070e7e4a70199 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 27 Aug 2024 19:00:43 +0300
Subject: [PATCH] fix: CaS requests are always reconstructed before being retried

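Previously `compare_and_swap_async` retried failed requests in an
internal loop: it waited for the leader id to become known, slept and
retried when the leader address could not be resolved, and re-sent the
very same request whenever the error was a "not a leader" one. Now the
function makes a single attempt and returns the error to the caller,
so the CaS request is always reconstructed by the caller before it is
retried.

A new error code `LeaderUnknown` is introduced (and marked retriable
for CaS) for the case when the raft leader id is not yet known. The
now unused helpers `is_retriable_cas_err` and `is_not_leader_err` are
removed.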
---
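With the internal retry loop gone, callers are expected to rebuild the
request and retry retriable errors themselves. A minimal sketch of such
a caller-side loop follows; `build_cas_request()` is a hypothetical
stand-in for however the caller constructs its `cas::Request`, and
`node` is the caller's handle to the raft node, whereas
`compare_and_swap_async`, `Error::is_retriable`, `wait_status` and
`tlog!` are taken from the existing code:

    // A sketch only: `build_cas_request()` stands for however the caller
    // constructs its `cas::Request`; it runs on every iteration, so the
    // request is reconstructed before each retry.
    let (index, term) = loop {
        let request = build_cas_request()?;
        match cas::compare_and_swap_async(&request).await {
            Ok((index, term)) => break (index, term),
            Err(e) if e.is_retriable() => {
                tlog!(Warning, "CaS request failed: {e}, retrying...");
                // Wait until the raft status (leader id / term) changes
                // before reconstructing and retrying the request.
                node.wait_status();
            }
            Err(e) => return Err(e),
        }
    };

Keeping the request construction inside the loop is what guarantees the
invariant from the subject line: every retry goes through a freshly
built request.
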
 picoplugin/src/error_code.rs |  7 ++++-
 src/cas.rs                   | 56 +++++++++---------------------------
 src/traft/error.rs           | 17 +----------
 3 files changed, 21 insertions(+), 59 deletions(-)

diff --git a/picoplugin/src/error_code.rs b/picoplugin/src/error_code.rs
index 9620504031..5d3bcc31a3 100644
--- a/picoplugin/src/error_code.rs
+++ b/picoplugin/src/error_code.rs
@@ -62,6 +62,8 @@ tarantool::define_enum_with_introspection! {
         NoSuchInstance = 10016,
         NoSuchReplicaset = 10017,
 
+        LeaderUnknown = 10018,
+
         /// Not an actual error code, just designates the start of the range.
         UserDefinedErrorCodesStart = 20000,
         // Plugin writers should use error codes in this range
@@ -80,9 +82,12 @@ impl ErrorCode {
     #[inline]
     pub fn is_retriable_for_cas(&self) -> bool {
         match *self {
+            // Raft leader is currently unknown, e.g. a leader change is in progress.
+            // The client should synchronize and retry the request.
+            ErrorCode::LeaderUnknown
             // Raft leader has changed since the CaS request was generated.
             // The client should synchronize and retry the request.
-            ErrorCode::NotALeader
+            | ErrorCode::NotALeader
             // Raft term has changed since the CaS request was generated.
             // The client should synchronize and retry the request.
             | ErrorCode::TermMismatch
diff --git a/src/cas.rs b/src/cas.rs
index 7a90862260..d415b39647 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -14,7 +14,6 @@ use crate::traft::op::{Ddl, Dml, Op};
 use crate::traft::EntryContext;
 use crate::traft::Result;
 use crate::traft::{RaftIndex, RaftTerm};
-use crate::unwrap_ok_or;
 
 use ::raft::prelude as raft;
 use ::raft::Error as RaftError;
@@ -25,7 +24,6 @@ use crate::schema::ADMIN_ID;
 use tarantool::error::Error as TntError;
 use tarantool::error::TarantoolErrorCode;
 use tarantool::fiber;
-use tarantool::fiber::r#async::sleep;
 use tarantool::fiber::r#async::timeout::IntoTimeout;
 use tarantool::session::UserId;
 use tarantool::space::{Space, SpaceId};
@@ -78,47 +76,21 @@ pub async fn compare_and_swap_async(request: &Request) -> traft::Result<(RaftInd
         }
     }
 
-    loop {
-        let Some(leader_id) = node.status().leader_id else {
-            tlog!(
-                Warning,
-                "leader id is unknown, waiting for status change..."
-            );
-            node.wait_status();
-            continue;
-        };
-        let leader_address = unwrap_ok_or!(
-            node.storage.peer_addresses.try_get(leader_id),
-            Err(e) => {
-                tlog!(Warning, "failed getting leader address: {e}");
-                tlog!(Info, "going to retry in a while...");
-                sleep(Duration::from_millis(250)).await;
-                continue;
-            }
-        );
-        let resp = if leader_id == node.raft_id {
-            // cas has to be called locally in cases when listen ports are closed,
-            // for example on shutdown
-            proc_cas_local(request)
-        } else {
-            rpc::network_call(&leader_address, proc_name!(proc_cas), request)
-                .await
-                .map_err(TraftError::from)
-        };
-        match resp {
-            Ok(Response { index, term }) => return Ok((index, term)),
-            Err(e) => {
-                tlog!(Warning, "{e}");
-                if e.is_not_leader_err() {
-                    tlog!(Info, "going to retry in a while...");
-                    node.wait_status();
-                    continue;
-                } else {
-                    return Err(e);
-                }
-            }
-        }
+    let Some(leader_id) = node.status().leader_id else {
+        return Err(TraftError::LeaderUnknown);
+    };
+
+    let i_am_leader = leader_id == node.raft_id;
+    if i_am_leader {
+        // CaS has to be called locally when the listen ports are closed,
+        // for example during shutdown.
+        let resp = proc_cas_local(request)?;
+        return Ok((resp.index, resp.term));
     }
+
+    let leader_address = node.storage.peer_addresses.try_get(leader_id)?;
+    let resp = rpc::network_call(&leader_address, proc_name!(proc_cas), request).await?;
+    Ok((resp.index, resp.term))
 }
 
 /// Performs a clusterwide compare and swap operation.
diff --git a/src/traft/error.rs b/src/traft/error.rs
index 05197b01e2..ac05e8bb05 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -145,6 +145,7 @@ impl Error {
             // `IntoBoxError` for `sbroad::errors::SbroadError` and
             // uncomment the following line:
             // Self::Sbroad(e) => e.error_code(),
+            Self::LeaderUnknown => ErrorCode::LeaderUnknown as _,
             Self::NotALeader => ErrorCode::NotALeader as _,
             Self::TermMismatch { .. } => ErrorCode::TermMismatch as _,
             Self::NoSuchInstance(_) => ErrorCode::NoSuchInstance as _,
@@ -167,22 +168,6 @@ impl Error {
         Self::InvalidConfiguration(msg.to_string())
     }
 
-    // FIXME: remove this function, replace it with `is_retriable` everywhere it's used
-    #[inline]
-    pub fn is_retriable_cas_err(&self) -> bool {
-        matches!(
-            ErrorCode::try_from(self.error_code()),
-            Ok(ErrorCode::RaftLogCompacted)
-                | Ok(ErrorCode::CasConflictFound)
-                | Ok(ErrorCode::CasEntryTermMismatch)
-        )
-    }
-
-    #[inline(always)]
-    pub fn is_not_leader_err(&self) -> bool {
-        self.error_code() == ErrorCode::NotALeader as u32
-    }
-
     #[inline]
     pub fn is_retriable(&self) -> bool {
         let code = self.error_code();
-- 
GitLab