From d40e24f315435f45cc98490d25b765ed4a6fc6e3 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 13 Aug 2024 00:16:58 +0300
Subject: [PATCH] refactor: cleanup CaS code in plugins

---
 src/plugin/mod.rs | 92 ++++++++++++++++++++++-------------------------
 1 file changed, 42 insertions(+), 50 deletions(-)

diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index ad107241c1..bb33b8d237 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -19,7 +19,7 @@ use tarantool::error::BoxError;
 use tarantool::fiber;
 use tarantool::time::Instant;
 
-use crate::cas::{compare_and_swap, Range};
+use crate::cas::Range;
 use crate::info::InstanceInfo;
 use crate::plugin::migration::MigrationInfo;
 use crate::plugin::PluginError::{PluginNotFound, RemoveOfEnabledPlugin};
@@ -29,6 +29,7 @@ use crate::traft::node::Node;
 use crate::traft::op::PluginRaftOp;
 use crate::traft::op::{Dml, Op};
 use crate::traft::{node, RaftIndex};
+use crate::unwrap_ok_or;
 use crate::util::effective_user_id;
 use crate::{cas, error_injection, tlog, traft};
 
@@ -383,34 +384,30 @@ fn do_routing_table_cas(
             ops: dml_ops.clone(),
         };
 
-        let req = crate::cas::Request::new(
-            op,
-            cas::Predicate {
-                index: raft_storage.applied()?,
-                term: raft_storage.term()?,
-                ranges: ranges.clone(),
-            },
-            ADMIN_ID,
-        )?;
-        let res = cas::compare_and_swap(&req, deadline.duration_since(fiber::clock()));
-        match res {
-            Ok((index, term)) => {
-                node.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                if term != raft::Storage::term(raft_storage, index)? {
-                    // leader switched - retry
-                    continue;
-                }
-            }
-            Err(err) => {
-                if err.is_retriable() {
-                    // cas error - retry
-                    fiber::sleep(Duration::from_millis(500));
+        let predicate = cas::Predicate {
+            index: raft_storage.applied()?,
+            term: raft_storage.term()?,
+            ranges: ranges.clone(),
+        };
+        let req = crate::cas::Request::new(op.clone(), predicate, ADMIN_ID)?;
+        let res = cas::compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
+        let (index, term) = unwrap_ok_or!(res,
+            Err(e) => {
+                if e.is_retriable() {
                     continue;
                 } else {
-                    return Err(err);
+                    return Err(e);
                 }
             }
+        );
+
+        node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
+
+        if term != raft::Storage::term(&node.raft_storage, index)? {
+            // Leader has changed and the entry got rolled back, retry.
+            continue;
         }
+
         return Ok(());
     }
 }
@@ -485,8 +482,6 @@ fn do_plugin_cas(
     try_again_condition: Option<fn(&Node) -> traft::Result<bool>>,
     deadline: Instant,
 ) -> traft::Result<RaftIndex> {
-    let raft_storage = &node.raft_storage;
-
     loop {
         let index = node.read_index(deadline.duration_since(Instant::now_fiber()))?;
         if let Some(try_again_condition) = try_again_condition {
@@ -498,34 +493,31 @@ fn do_plugin_cas(
 
         // FIXME: preconditions & operation must be recomputed on each retry
 
-        let req = crate::cas::Request::new(
-            op.clone(),
-            cas::Predicate {
-                index: raft_storage.applied()?,
-                term: raft_storage.term()?,
-                ranges: ranges.clone(),
-            },
-            // FIXME: access rules will be implemented in future release
-            effective_user_id(),
-        )?;
-        let cas_result = compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
-        match cas_result {
-            Ok((index, term)) => {
-                node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
-                if term != raft::Storage::term(raft_storage, index)? {
-                    // leader switched - retry
-                    continue;
-                }
-            }
-            Err(err) => {
-                if err.is_retriable() {
-                    // cas error - retry
-                    fiber::sleep(Duration::from_millis(500));
+        let term = raft::Storage::term(&node.raft_storage, index)?;
+        let predicate = cas::Predicate {
+            index,
+            term,
+            ranges: ranges.clone(),
+        };
+        // FIXME: access rules will be implemented in future release
+        let current_user = effective_user_id();
+        let req = crate::cas::Request::new(op.clone(), predicate, current_user)?;
+        let res = cas::compare_and_swap(&req, deadline.duration_since(Instant::now_fiber()));
+        let (index, term) = unwrap_ok_or!(res,
+            Err(e) => {
+                if e.is_retriable() {
                     continue;
                 } else {
-                    return Err(err);
+                    return Err(e);
                 }
             }
+        );
+
+        node.wait_index(index, deadline.duration_since(Instant::now_fiber()))?;
+
+        if term != raft::Storage::term(&node.raft_storage, index)? {
+            // Leader has changed and the entry got rolled back, retry.
+            continue;
         }
 
         return Ok(index);
-- 
GitLab