From 185341d0d4f20f60885eeebe35067fb740158257 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 27 Aug 2024 19:35:26 +0300
Subject: [PATCH] fix: unify all places where we call compare_and_swap and wait
 for the result

---
 src/rpc/join.rs            | 28 ++++++++++++++--------------
 src/rpc/update_instance.rs | 29 +++++++++++++++--------------
 src/schema.rs              | 14 ++++++++++++--
 3 files changed, 41 insertions(+), 30 deletions(-)

diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index 9338b0de1e..ba924f0152 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -118,7 +118,6 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
             cas::Range::new(ClusterwideTable::Tier),
             cas::Range::new(ClusterwideTable::Replicaset),
         ];
-        // Only in this order - so that when instance exists - address will always be there.
         let cas_req = crate::cas::Request::new(
             Op::BatchDml { ops },
             cas::Predicate {
@@ -129,24 +128,25 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
             ADMIN_ID,
         )?;
         let res = cas::compare_and_swap(&cas_req, deadline.duration_since(fiber::clock()));
-        match res {
-            Ok((index, term)) => {
-                node.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                if term != raft::Storage::term(raft_storage, index)? {
-                    // leader switched - retry
-                    continue;
-                }
-            }
-            Err(err) => {
-                if err.is_retriable() {
-                    // cas error - retry
-                    fiber::sleep(Duration::from_millis(500));
+        let (index, term) = crate::unwrap_ok_or!(res,
+            Err(e) => {
+                if e.is_retriable() {
+                    crate::tlog!(Debug, "local CaS rejected: {e}");
+                    fiber::sleep(Duration::from_millis(250));
                     continue;
                 } else {
-                    return Err(err);
+                    return Err(e);
                 }
             }
+        );
+
+        node.wait_index(index, deadline.duration_since(fiber::clock()))?;
+
+        if term != raft::Storage::term(&node.raft_storage, index)? {
+            // Leader has changed and the entry got rolled back, retry.
+            continue;
         }
+
         node.main_loop.wakeup();
 
         // A joined instance needs to communicate with other nodes.
diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs
index a796b4feb2..ffcab28e06 100644
--- a/src/rpc/update_instance.rs
+++ b/src/rpc/update_instance.rs
@@ -221,27 +221,28 @@ pub fn handle_update_instance_request_in_governor_and_also_wait_too(
             ADMIN_ID,
         )?;
         let res = cas::compare_and_swap(&cas_req, deadline.duration_since(fiber::clock()));
-        match res {
-            Ok((index, term)) => {
-                node.wait_index(index, deadline.duration_since(fiber::clock()))?;
-                if term != raft::Storage::term(raft_storage, index)? {
-                    // leader switched - retry
-                    continue;
-                }
-            }
-            Err(err) => {
+        let (index, term) = crate::unwrap_ok_or!(res,
+            Err(e) => {
                 if req.dont_retry {
-                    return Err(err);
+                    return Err(e);
                 }
-                if err.is_retriable() {
-                    // cas error - retry
-                    fiber::sleep(Duration::from_millis(500));
+                if e.is_retriable() {
+                    crate::tlog!(Debug, "local CaS rejected: {e}");
+                    fiber::sleep(Duration::from_millis(250));
                     continue;
                 } else {
-                    return Err(err);
+                    return Err(e);
                 }
             }
+        );
+
+        node.wait_index(index, deadline.duration_since(fiber::clock()))?;
+
+        if term != raft::Storage::term(raft_storage, index)? {
+            // Leader has changed and the entry got rolled back, retry.
+            continue;
         }
+
         node.main_loop.wakeup();
         drop(guard);
         return Ok(());
diff --git a/src/schema.rs b/src/schema.rs
index 997e6b3dd5..d81bc57909 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -2482,9 +2482,19 @@ pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
             instance_id,
         };
         let req = Request::new(Op::DdlAbort { cause }, predicate, effective_user_id())?;
-        // FIXME: this error handling is wrong, must retry if e.is_retriable()
-        let (index, term) = compare_and_swap(&req, timeout)?;
+        let res = compare_and_swap(&req, timeout);
+        let (index, term) = crate::unwrap_ok_or!(res,
+            Err(e) => {
+                if e.is_retriable() {
+                    continue;
+                } else {
+                    return Err(e);
+                }
+            }
+        );
+
         node.wait_index(index, timeout)?;
+
         if raft::Storage::term(&node.raft_storage, index)? != term {
             // leader switched - retry
             continue;
-- 
GitLab