From ffaf847f967795fc4b8e826ae332c8f0fc116a6d Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 6 Nov 2024 20:22:37 +0300
Subject: [PATCH] fix: report error when sending RPC to expelled replicaset

---
 picodata-plugin/src/error_code.rs |  3 +++
 src/plugin/rpc/client.rs          | 30 ++++++++++++++++++++++++++++++
 src/replicaset.rs                 |  6 +++++-
 test/conftest.py                  |  2 ++
 4 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/picodata-plugin/src/error_code.rs b/picodata-plugin/src/error_code.rs
index 8c837546a2..905f145214 100644
--- a/picodata-plugin/src/error_code.rs
+++ b/picodata-plugin/src/error_code.rs
@@ -70,6 +70,9 @@ tarantool::define_enum_with_introspection! {
         // Instance in question was expelled from the cluster.
         InstanceExpelled = 10020,
 
+        // Replicaset in question was expelled from the cluster.
+        ReplicasetExpelled = 10021,
+
         /// Not an actual error code, just designates the start of the range.
         UserDefinedErrorCodesStart = 20000,
         // Plugin writers should use error codes in this range
diff --git a/src/plugin/rpc/client.rs b/src/plugin/rpc/client.rs
index 3b6bd5c6a2..22a15fac17 100644
--- a/src/plugin/rpc/client.rs
+++ b/src/plugin/rpc/client.rs
@@ -3,6 +3,7 @@ use crate::has_states;
 use crate::instance::InstanceName;
 use crate::plugin::{rpc, PluginIdentifier};
 use crate::replicaset::Replicaset;
+use crate::replicaset::ReplicasetState;
 use crate::schema::ServiceRouteItem;
 use crate::schema::ServiceRouteKey;
 use crate::tlog;
@@ -19,6 +20,7 @@ use tarantool::error::Error as TntError;
 use tarantool::error::IntoBoxError;
 use tarantool::error::TarantoolErrorCode;
 use tarantool::fiber;
+use tarantool::tuple::Tuple;
 use tarantool::tuple::TupleBuffer;
 use tarantool::tuple::{RawByteBuf, RawBytes};
 use tarantool::uuid::Uuid;
@@ -227,6 +229,7 @@ fn resolve_rpc_target(
     use FfiSafeRpcTargetSpecifier as Target;
 
     let mut instance_name = None;
+    let mut replicaset_tuple = None;
     match target {
         Target::InstanceName(iid) => {
             // SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
@@ -248,6 +251,7 @@ fn resolve_rpc_target(
                 return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
             };
             instance_name = Some(master_name);
+            replicaset_tuple = Some(tuple);
         }
 
         &Target::BucketId {
@@ -344,6 +348,7 @@ fn resolve_rpc_target(
             };
 
             tier_and_replicaset_uuid = Some((found_tier, found_replicaset_uuid));
+            replicaset_tuple = Some(tuple);
         }
 
         &Target::BucketId {
@@ -408,6 +413,8 @@ fn resolve_rpc_target(
             return Ok(my_instance_name);
         }
 
+        check_replicaset_is_not_expelled(node, &replicaset_uuid, replicaset_tuple)?;
+
         #[rustfmt::skip]
         return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {ident}.{service}")).into());
     } else {
@@ -479,3 +486,26 @@ fn check_route_to_instance(
     }
     Ok(())
 }
+
+fn check_replicaset_is_not_expelled(
+    node: &Node,
+    uuid: &str,
+    maybe_tuple: Option<Tuple>,
+) -> Result<(), Error> {
+    let tuple;
+    if let Some(t) = maybe_tuple {
+        tuple = t;
+    } else {
+        tuple = node.storage.replicasets.by_uuid_raw(uuid)?;
+    }
+
+    let state = tuple.field(Replicaset::FIELD_STATE)?;
+    let state: ReplicasetState = state.expect("replicaset should always have a state column");
+
+    if state == ReplicasetState::Expelled {
+        #[rustfmt::skip]
+        return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {uuid} was expelled")).into());
+    }
+
+    Ok(())
+}
diff --git a/src/replicaset.rs b/src/replicaset.rs
index 6711756994..bbcfc64cdb 100644
--- a/src/replicaset.rs
+++ b/src/replicaset.rs
@@ -67,7 +67,7 @@ pub struct Replicaset {
 impl Encode for Replicaset {}
 
 impl Replicaset {
-    /// Index of field "replicaset_uuid" in the table _pico_replicaset format.
+    /// Index of field "uuid" in the table _pico_replicaset format.
     ///
     /// Index of first field is 0.
     pub const FIELD_REPLICASET_UUID: u32 = 1;
@@ -78,6 +78,9 @@ impl Replicaset {
     /// Index of field "tier" in the table _pico_replicaset format.
     pub const FIELD_TIER: u32 = 4;
 
+    /// Index of field "state" in the table _pico_replicaset format.
+    pub const FIELD_STATE: u32 = 7;
+
     #[inline]
     pub fn with_one_instance(master: &Instance) -> Replicaset {
         Replicaset {
@@ -215,5 +218,6 @@ mod tests {
             "target_master_name"
         );
         assert_eq!(format[Replicaset::FIELD_TIER as usize].name, "tier");
+        assert_eq!(format[Replicaset::FIELD_STATE as usize].name, "state");
     }
 }
diff --git a/test/conftest.py b/test/conftest.py
index a6adc80e37..2b4a42707a 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -76,6 +76,8 @@ class ErrorCode:
     NoSuchReplicaset = 10017
     LeaderUnknown = 10018
     PluginError = 10019
+    InstanceExpelled = 10020
+    ReplicasetExpelled = 10021
 
     # Make sure this matches this list in
     # picodata_plugin::error_code::ErrorCode::is_retriable_for_cas
-- 
GitLab