From 492bbb08fdbd9cb88b9e4c799be5072bbbd330b4 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 14 Aug 2024 20:32:22 +0300
Subject: [PATCH] refactor: preparations for returning errors from governor

---
 src/governor/mod.rs      | 33 ++++++++++++++-------------
 src/plugin/mod.rs        | 15 ++++++++++---
 src/rpc/ddl_apply.rs     | 31 +++++++++++++++++---------
 src/rpc/enable_plugin.rs | 13 +++++++++--
 src/traft/error.rs       | 48 +++++++++++++++++++++++++++++++++++++++-
 5 files changed, 106 insertions(+), 34 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 6cdefcddf1..201686cdad 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -28,6 +28,7 @@ use crate::storage::Clusterwide;
 use crate::storage::ToEntryIter as _;
 use crate::tlog;
 use crate::traft::error::Error;
+use crate::traft::error::ErrorInfo;
 use crate::traft::network::ConnectionPool;
 use crate::traft::node::global;
 use crate::traft::node::Status;
@@ -178,13 +179,13 @@ impl Loop {
         #[derive(Debug)]
         enum OnError {
             Retry(Error),
-            Abort,
+            Abort(ErrorInfo), // details of the failure which caused the abort
         }
         impl From<OnError> for Error {
             fn from(e: OnError) -> Error {
                 match e {
                     OnError::Retry(e) => e,
-                    OnError::Abort => Error::other("schema change was aborted"),
+                    OnError::Abort(_) => unreachable!("we never convert Abort to Error"),
                 }
             }
         }
@@ -457,11 +458,11 @@ impl Loop {
                                         );
                                         Ok(())
                                     }
-                                    Ok(rpc::ddl_apply::Response::Abort { reason }) => {
-                                        tlog!(Error, "failed to apply schema change on instance: {reason}";
+                                    Ok(rpc::ddl_apply::Response::Abort { cause }) => {
+                                        tlog!(Error, "failed to apply schema change on instance: {cause}";
                                             "instance_id" => %instance_id,
                                         );
-                                        Err(OnError::Abort)
+                                        Err(OnError::Abort(cause))
                                     }
                                     Err(e) => {
                                         tlog!(Warning, "failed calling proc_apply_schema_change: {e}";
@@ -474,7 +475,7 @@ impl Loop {
                         }
                         // TODO: don't hard code timeout
                         let res = try_join_all(fs).timeout(Duration::from_secs(3)).await;
-                        if let Err(TimeoutError::Failed(OnError::Abort)) = res {
+                        if let Err(TimeoutError::Failed(OnError::Abort(_cause))) = res {
                             next_op = Op::DdlAbort;
                             return Ok(());
                         }
@@ -525,14 +526,14 @@ impl Loop {
                                         tlog!(Error, "failed to call proc_load_plugin_dry_run: {e}";
                                             "instance_id" => %instance_id
                                         );
-                                        Err(OnError::Abort)
+                                        Err(e)
                                     }
                                 }
                             });
                         }
 
                         if let Err(e) = try_join_all(fs).timeout(Duration::from_secs(5)).await {
-                            tlog!(Error, "Plugin installation aborted: {e:?}");
+                            tlog!(Error, "Plugin installation aborted: {e}");
                             return Ok(());
                         }
 
@@ -569,22 +570,20 @@ impl Loop {
                             fs.push(async move {
                                 match resp.await {
                                     Ok(rpc::enable_plugin::Response::Ok) => {
-                                        tlog!(Info, "load plugin on instance";
-                                            "instance_id" => %instance_id,
-                                        );
+                                        tlog!(Info, "enable plugin on instance"; "instance_id" => %instance_id);
                                         Ok(())
                                     }
-                                    Ok(rpc::enable_plugin::Response::Abort { reason }) => {
-                                        tlog!(Error, "failed to load plugin at instance: {reason}";
+                                    Ok(rpc::enable_plugin::Response::Abort { cause }) => {
+                                        tlog!(Error, "failed to enable plugin at instance: {cause}";
                                             "instance_id" => %instance_id,
                                         );
-                                        Err(OnError::Abort)
+                                        Err(OnError::Abort(cause))
                                     }
                                     Err(Error::Timeout) => {
-                                        tlog!(Error, "failed to load plugin at instance: timeout";
+                                        tlog!(Error, "failed to enable plugin at instance: timeout";
                                             "instance_id" => %instance_id,
                                         );
-                                        Err(OnError::Abort)
+                                        Err(OnError::Abort(ErrorInfo::timeout(instance_id.clone(), "failed to enable plugin")))
                                     }
                                     Err(e) => {
                                         tlog!(Warning, "failed calling proc_load_plugin: {e}";
@@ -597,7 +596,7 @@ impl Loop {
                         }
 
                         let enable_result = try_join_all(fs).timeout(on_start_timeout.add(Duration::from_secs(1))).await;
-                        if let Err(TimeoutError::Failed(OnError::Abort)) = enable_result {
+                        if let Err(TimeoutError::Failed(OnError::Abort(_cause))) = enable_result {
                             next_op = Some(rollback_op);
                             return Ok(());
                         }
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index f1ab025383..dd4e2a09e7 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -6,6 +6,7 @@ pub mod topology;
 
 use once_cell::unsync;
 use picoplugin::background::ServiceId;
+use picoplugin::error_code::ErrorCode;
 use picoplugin::plugin::interface::ServiceBox;
 use serde::{Deserialize, Serialize};
 use std::fmt::{Display, Formatter};
@@ -14,7 +15,7 @@ use std::io;
 use std::path::{Path, PathBuf};
 use std::rc::Rc;
 use std::time::Duration;
-use tarantool::error::BoxError;
+use tarantool::error::{BoxError, IntoBoxError};
 use tarantool::fiber;
 use tarantool::time::Instant;
 
@@ -61,8 +62,6 @@ pub enum PluginError {
     ManifestNotFound(String, io::Error),
     #[error("Error while parsing manifest `{0}`, reason: {1}")]
     InvalidManifest(String, Box<dyn std::error::Error>),
-    #[error("`{0}` service defenition not found")]
-    ServiceDefenitionNotFound(String),
     #[error("Read plugin_dir: {0}")]
     ReadPluginDir(#[from] io::Error),
     #[error("Invalid shared object file: {0}")]
@@ -99,6 +98,16 @@ pub enum PluginError {
     AmbiguousEnableCandidate,
 }
 
+impl IntoBoxError for PluginError {
+    #[inline(always)]
+    fn error_code(&self) -> u32 {
+        match self {
+            Self::ServiceNotFound { .. } => ErrorCode::NoSuchService as _,
+            _ => ErrorCode::Other as _, // every other variant is reported with the generic code
+        }
+    }
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum PluginCallbackError {
     #[error("on_start: {0}")]
diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs
index 6c6ab79c52..0803ab4f68 100644
--- a/src/rpc/ddl_apply.rs
+++ b/src/rpc/ddl_apply.rs
@@ -7,9 +7,11 @@ use crate::storage::{ddl_rename_function_on_master, Clusterwide};
 use crate::storage::{local_schema_version, set_local_schema_version};
 use crate::tlog;
 use crate::traft::error::Error as TraftError;
+use crate::traft::error::ErrorInfo;
 use crate::traft::node;
 use crate::traft::{RaftIndex, RaftTerm};
 use std::time::Duration;
+use tarantool::error::IntoBoxError;
 use tarantool::error::{BoxError, TarantoolErrorCode};
 use tarantool::transaction::{transaction, TransactionError};
 
@@ -60,7 +62,14 @@ crate::define_rpc_request! {
             Ok(()) => Ok(Response::Ok),
             Err(TransactionError::RolledBack(Error::Aborted(err))) => {
                 tlog!(Warning, "schema change aborted: {err}");
-                Ok(Response::Abort { reason: err})
+
+                let instance_id = node.raft_storage.instance_id()?.expect("should be persisted before procs are defined");
+                let cause = ErrorInfo {
+                    error_code: err.error_code(),
+                    message: err.to_string(),
+                    instance_id,
+                };
+                Ok(Response::Abort { cause })
             }
             Err(err) => {
                 tlog!(Warning, "applying schema change failed: {err}");
@@ -80,7 +89,7 @@ crate::define_rpc_request! {
         Ok,
         /// Schema change failed on this instance and should be aborted on the
         /// whole cluster.
-        Abort { reason: String },
+        Abort { cause: ErrorInfo },
     }
 }
 
@@ -89,7 +98,7 @@ pub enum Error {
     /// Schema change failed on this instance and should be aborted on the
     /// whole cluster.
     #[error("{0}")]
-    Aborted(String),
+    Aborted(TraftError),
 
     #[error("{0}")]
     Other(TraftError),
@@ -127,7 +136,7 @@ pub fn apply_schema_change(
                         tlog!(Error, "{}:{}: {e}", file, line);
                     }
                 }
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e.into()));
             }
         }
 
@@ -139,13 +148,13 @@ pub fn apply_schema_change(
 
             let abort_reason = ddl_drop_space_on_master(id).map_err(Error::Other)?;
             if let Some(e) = abort_reason {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e.into()));
             }
         }
 
         Ddl::CreateProcedure { id, .. } => {
             if let Err(e) = ddl_create_function_on_master(storage, id) {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e));
             }
         }
 
@@ -156,7 +165,7 @@ pub fn apply_schema_change(
 
             let abort_reason = ddl_drop_function_on_master(id).map_err(Error::Other)?;
             if let Some(e) = abort_reason {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e.into()));
             }
         }
 
@@ -166,7 +175,7 @@ pub fn apply_schema_change(
             ..
         } => {
             if let Err(e) = ddl_rename_function_on_master(storage, routine_id, new_name) {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e));
             }
         }
 
@@ -174,7 +183,7 @@ pub fn apply_schema_change(
             space_id, index_id, ..
         } => {
             if let Err(e) = ddl_create_index_on_master(storage, space_id, index_id) {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e));
             }
         }
 
@@ -186,13 +195,13 @@ pub fn apply_schema_change(
             }
 
             if let Err(e) = ddl_drop_index_on_master(space_id, index_id) {
-                return Err(Error::Aborted(e.to_string()));
+                return Err(Error::Aborted(e));
             }
         }
     }
 
     if let Err(e) = set_local_schema_version(version) {
-        return Err(Error::Aborted(e.to_string()));
+        return Err(Error::Aborted(e.into()));
     }
 
     Ok(())
diff --git a/src/rpc/enable_plugin.rs b/src/rpc/enable_plugin.rs
index 8a70fd6feb..07ed7bcb6d 100644
--- a/src/rpc/enable_plugin.rs
+++ b/src/rpc/enable_plugin.rs
@@ -1,9 +1,11 @@
 use crate::plugin::PluginOp;
 use crate::tlog;
 use crate::traft::error::Error;
+use crate::traft::error::ErrorInfo;
 use crate::traft::node;
 use crate::traft::{RaftIndex, RaftTerm};
 use std::time::Duration;
+use tarantool::error::IntoBoxError;
 
 crate::define_rpc_request! {
     /// Forces the target instance to actually enable the plugin locally.
@@ -37,7 +39,14 @@ crate::define_rpc_request! {
              Ok(()) => Ok(Response::Ok),
              Err(err) => {
                 tlog!(Warning, "plugin enabling aborted: {err}");
-                Ok(Response::Abort { reason: err.to_string() })
+
+                let instance_id = node.raft_storage.instance_id()?.expect("should be persisted before procs are defined");
+                let cause = ErrorInfo {
+                    error_code: err.error_code(),
+                    message: err.to_string(),
+                    instance_id,
+                };
+                Ok(Response::Abort { cause })
             }
         }
     }
@@ -53,6 +62,6 @@ crate::define_rpc_request! {
         Ok,
         /// Plugin loaded failed on this instance and should be aborted on the
         /// whole cluster.
-        Abort { reason: String },
+        Abort { cause: ErrorInfo },
     }
 }
diff --git a/src/traft/error.rs b/src/traft/error.rs
index 860bbe2514..fa1e134096 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -4,8 +4,8 @@ use crate::error_code::ErrorCode;
 use crate::instance::InstanceId;
 use crate::plugin::PluginError;
 use crate::traft::{RaftId, RaftTerm};
-use tarantool::error::BoxError;
 use tarantool::error::IntoBoxError;
+use tarantool::error::{BoxError, TarantoolErrorCode};
 use tarantool::fiber::r#async::timeout;
 use tarantool::tlua::LuaError;
 use thiserror::Error;
@@ -217,3 +217,49 @@ impl IntoBoxError for Error {
         }
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+// ErrorInfo
+////////////////////////////////////////////////////////////////////////////////
+
+/// This is a serializable version of [`BoxError`] paired with the id of the instance the error is attributed to.
+///
+/// TODO<https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/221> just make BoxError serializable.
+#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
+pub struct ErrorInfo {
+    pub error_code: u32,
+    pub message: String,
+    pub instance_id: InstanceId,
+}
+
+impl ErrorInfo {
+    #[inline(always)]
+    pub fn timeout(instance_id: InstanceId, message: impl Into<String>) -> Self {
+        Self {
+            error_code: TarantoolErrorCode::Timeout as _, // timeouts always carry tarantool's code
+            message: message.into(),
+            instance_id,
+        }
+    }
+}
+
+impl std::fmt::Display for ErrorInfo {
+    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
+        write!(f, "[instance_id:{}] ", self.instance_id)?;
+        if let Some(c) = ErrorCode::from_i64(self.error_code as _) { // crate-level codes are tried first
+            write!(f, "{c:?}")?;
+        } else if let Some(c) = TarantoolErrorCode::from_i64(self.error_code as _) {
+            write!(f, "{c:?}")?;
+        } else {
+            write!(f, "#{}", self.error_code)?; // unrecognized code: print the raw number
+        }
+        write!(f, ": {}", self.message)
+    }
+}
+
+impl IntoBoxError for ErrorInfo {
+    #[inline]
+    fn error_code(&self) -> u32 {
+        self.error_code
+    }
+}
-- 
GitLab