Skip to content
Snippets Groups Projects
Commit 492bbb08 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: preparations for returning errors from governor

parent ebd291e7
No related branches found
No related tags found
1 merge request!1218return errors from governor to client
...@@ -28,6 +28,7 @@ use crate::storage::Clusterwide; ...@@ -28,6 +28,7 @@ use crate::storage::Clusterwide;
use crate::storage::ToEntryIter as _; use crate::storage::ToEntryIter as _;
use crate::tlog; use crate::tlog;
use crate::traft::error::Error; use crate::traft::error::Error;
use crate::traft::error::ErrorInfo;
use crate::traft::network::ConnectionPool; use crate::traft::network::ConnectionPool;
use crate::traft::node::global; use crate::traft::node::global;
use crate::traft::node::Status; use crate::traft::node::Status;
...@@ -178,13 +179,13 @@ impl Loop { ...@@ -178,13 +179,13 @@ impl Loop {
#[derive(Debug)] #[derive(Debug)]
enum OnError { enum OnError {
Retry(Error), Retry(Error),
Abort, Abort(ErrorInfo),
} }
impl From<OnError> for Error { impl From<OnError> for Error {
fn from(e: OnError) -> Error { fn from(e: OnError) -> Error {
match e { match e {
OnError::Retry(e) => e, OnError::Retry(e) => e,
OnError::Abort => Error::other("schema change was aborted"), OnError::Abort(_) => unreachable!("we never convert Abort to Error"),
} }
} }
} }
...@@ -457,11 +458,11 @@ impl Loop { ...@@ -457,11 +458,11 @@ impl Loop {
); );
Ok(()) Ok(())
} }
Ok(rpc::ddl_apply::Response::Abort { reason }) => { Ok(rpc::ddl_apply::Response::Abort { cause }) => {
tlog!(Error, "failed to apply schema change on instance: {reason}"; tlog!(Error, "failed to apply schema change on instance: {cause}";
"instance_id" => %instance_id, "instance_id" => %instance_id,
); );
Err(OnError::Abort) Err(OnError::Abort(cause))
} }
Err(e) => { Err(e) => {
tlog!(Warning, "failed calling proc_apply_schema_change: {e}"; tlog!(Warning, "failed calling proc_apply_schema_change: {e}";
...@@ -474,7 +475,7 @@ impl Loop { ...@@ -474,7 +475,7 @@ impl Loop {
} }
// TODO: don't hard code timeout // TODO: don't hard code timeout
let res = try_join_all(fs).timeout(Duration::from_secs(3)).await; let res = try_join_all(fs).timeout(Duration::from_secs(3)).await;
if let Err(TimeoutError::Failed(OnError::Abort)) = res { if let Err(TimeoutError::Failed(OnError::Abort(_cause))) = res {
next_op = Op::DdlAbort; next_op = Op::DdlAbort;
return Ok(()); return Ok(());
} }
...@@ -525,14 +526,14 @@ impl Loop { ...@@ -525,14 +526,14 @@ impl Loop {
tlog!(Error, "failed to call proc_load_plugin_dry_run: {e}"; tlog!(Error, "failed to call proc_load_plugin_dry_run: {e}";
"instance_id" => %instance_id "instance_id" => %instance_id
); );
Err(OnError::Abort) Err(e)
} }
} }
}); });
} }
if let Err(e) = try_join_all(fs).timeout(Duration::from_secs(5)).await { if let Err(e) = try_join_all(fs).timeout(Duration::from_secs(5)).await {
tlog!(Error, "Plugin installation aborted: {e:?}"); tlog!(Error, "Plugin installation aborted: {e}");
return Ok(()); return Ok(());
} }
...@@ -569,22 +570,20 @@ impl Loop { ...@@ -569,22 +570,20 @@ impl Loop {
fs.push(async move { fs.push(async move {
match resp.await { match resp.await {
Ok(rpc::enable_plugin::Response::Ok) => { Ok(rpc::enable_plugin::Response::Ok) => {
tlog!(Info, "load plugin on instance"; tlog!(Info, "enable plugin on instance"; "instance_id" => %instance_id);
"instance_id" => %instance_id,
);
Ok(()) Ok(())
} }
Ok(rpc::enable_plugin::Response::Abort { reason }) => { Ok(rpc::enable_plugin::Response::Abort { cause }) => {
tlog!(Error, "failed to load plugin at instance: {reason}"; tlog!(Error, "failed to enable plugin at instance: {cause}";
"instance_id" => %instance_id, "instance_id" => %instance_id,
); );
Err(OnError::Abort) Err(OnError::Abort(cause))
} }
Err(Error::Timeout) => { Err(Error::Timeout) => {
tlog!(Error, "failed to load plugin at instance: timeout"; tlog!(Error, "failed to enable plugin at instance: timeout";
"instance_id" => %instance_id, "instance_id" => %instance_id,
); );
Err(OnError::Abort) Err(OnError::Abort(ErrorInfo::timeout(instance_id.clone(), "failed to enable plugin")))
} }
Err(e) => { Err(e) => {
tlog!(Warning, "failed calling proc_load_plugin: {e}"; tlog!(Warning, "failed calling proc_load_plugin: {e}";
...@@ -597,7 +596,7 @@ impl Loop { ...@@ -597,7 +596,7 @@ impl Loop {
} }
let enable_result = try_join_all(fs).timeout(on_start_timeout.add(Duration::from_secs(1))).await; let enable_result = try_join_all(fs).timeout(on_start_timeout.add(Duration::from_secs(1))).await;
if let Err(TimeoutError::Failed(OnError::Abort)) = enable_result { if let Err(TimeoutError::Failed(OnError::Abort(_cause))) = enable_result {
next_op = Some(rollback_op); next_op = Some(rollback_op);
return Ok(()); return Ok(());
} }
......
...@@ -6,6 +6,7 @@ pub mod topology; ...@@ -6,6 +6,7 @@ pub mod topology;
use once_cell::unsync; use once_cell::unsync;
use picoplugin::background::ServiceId; use picoplugin::background::ServiceId;
use picoplugin::error_code::ErrorCode;
use picoplugin::plugin::interface::ServiceBox; use picoplugin::plugin::interface::ServiceBox;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::fmt::{Display, Formatter}; use std::fmt::{Display, Formatter};
...@@ -14,7 +15,7 @@ use std::io; ...@@ -14,7 +15,7 @@ use std::io;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::rc::Rc; use std::rc::Rc;
use std::time::Duration; use std::time::Duration;
use tarantool::error::BoxError; use tarantool::error::{BoxError, IntoBoxError};
use tarantool::fiber; use tarantool::fiber;
use tarantool::time::Instant; use tarantool::time::Instant;
...@@ -61,8 +62,6 @@ pub enum PluginError { ...@@ -61,8 +62,6 @@ pub enum PluginError {
ManifestNotFound(String, io::Error), ManifestNotFound(String, io::Error),
#[error("Error while parsing manifest `{0}`, reason: {1}")] #[error("Error while parsing manifest `{0}`, reason: {1}")]
InvalidManifest(String, Box<dyn std::error::Error>), InvalidManifest(String, Box<dyn std::error::Error>),
#[error("`{0}` service defenition not found")]
ServiceDefenitionNotFound(String),
#[error("Read plugin_dir: {0}")] #[error("Read plugin_dir: {0}")]
ReadPluginDir(#[from] io::Error), ReadPluginDir(#[from] io::Error),
#[error("Invalid shared object file: {0}")] #[error("Invalid shared object file: {0}")]
...@@ -99,6 +98,16 @@ pub enum PluginError { ...@@ -99,6 +98,16 @@ pub enum PluginError {
AmbiguousEnableCandidate, AmbiguousEnableCandidate,
} }
impl IntoBoxError for PluginError {
#[inline(always)]
fn error_code(&self) -> u32 {
match self {
Self::ServiceNotFound { .. } => ErrorCode::NoSuchService as _,
_ => ErrorCode::Other as _,
}
}
}
#[derive(thiserror::Error, Debug)] #[derive(thiserror::Error, Debug)]
pub enum PluginCallbackError { pub enum PluginCallbackError {
#[error("on_start: {0}")] #[error("on_start: {0}")]
......
...@@ -7,9 +7,11 @@ use crate::storage::{ddl_rename_function_on_master, Clusterwide}; ...@@ -7,9 +7,11 @@ use crate::storage::{ddl_rename_function_on_master, Clusterwide};
use crate::storage::{local_schema_version, set_local_schema_version}; use crate::storage::{local_schema_version, set_local_schema_version};
use crate::tlog; use crate::tlog;
use crate::traft::error::Error as TraftError; use crate::traft::error::Error as TraftError;
use crate::traft::error::ErrorInfo;
use crate::traft::node; use crate::traft::node;
use crate::traft::{RaftIndex, RaftTerm}; use crate::traft::{RaftIndex, RaftTerm};
use std::time::Duration; use std::time::Duration;
use tarantool::error::IntoBoxError;
use tarantool::error::{BoxError, TarantoolErrorCode}; use tarantool::error::{BoxError, TarantoolErrorCode};
use tarantool::transaction::{transaction, TransactionError}; use tarantool::transaction::{transaction, TransactionError};
...@@ -60,7 +62,14 @@ crate::define_rpc_request! { ...@@ -60,7 +62,14 @@ crate::define_rpc_request! {
Ok(()) => Ok(Response::Ok), Ok(()) => Ok(Response::Ok),
Err(TransactionError::RolledBack(Error::Aborted(err))) => { Err(TransactionError::RolledBack(Error::Aborted(err))) => {
tlog!(Warning, "schema change aborted: {err}"); tlog!(Warning, "schema change aborted: {err}");
Ok(Response::Abort { reason: err})
let instance_id = node.raft_storage.instance_id()?.expect("should be persisted before procs are defined");
let cause = ErrorInfo {
error_code: err.error_code(),
message: err.to_string(),
instance_id,
};
Ok(Response::Abort { cause })
} }
Err(err) => { Err(err) => {
tlog!(Warning, "applying schema change failed: {err}"); tlog!(Warning, "applying schema change failed: {err}");
...@@ -80,7 +89,7 @@ crate::define_rpc_request! { ...@@ -80,7 +89,7 @@ crate::define_rpc_request! {
Ok, Ok,
/// Schema change failed on this instance and should be aborted on the /// Schema change failed on this instance and should be aborted on the
/// whole cluster. /// whole cluster.
Abort { reason: String }, Abort { cause: ErrorInfo },
} }
} }
...@@ -89,7 +98,7 @@ pub enum Error { ...@@ -89,7 +98,7 @@ pub enum Error {
/// Schema change failed on this instance and should be aborted on the /// Schema change failed on this instance and should be aborted on the
/// whole cluster. /// whole cluster.
#[error("{0}")] #[error("{0}")]
Aborted(String), Aborted(TraftError),
#[error("{0}")] #[error("{0}")]
Other(TraftError), Other(TraftError),
...@@ -127,7 +136,7 @@ pub fn apply_schema_change( ...@@ -127,7 +136,7 @@ pub fn apply_schema_change(
tlog!(Error, "{}:{}: {e}", file, line); tlog!(Error, "{}:{}: {e}", file, line);
} }
} }
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e.into()));
} }
} }
...@@ -139,13 +148,13 @@ pub fn apply_schema_change( ...@@ -139,13 +148,13 @@ pub fn apply_schema_change(
let abort_reason = ddl_drop_space_on_master(id).map_err(Error::Other)?; let abort_reason = ddl_drop_space_on_master(id).map_err(Error::Other)?;
if let Some(e) = abort_reason { if let Some(e) = abort_reason {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e.into()));
} }
} }
Ddl::CreateProcedure { id, .. } => { Ddl::CreateProcedure { id, .. } => {
if let Err(e) = ddl_create_function_on_master(storage, id) { if let Err(e) = ddl_create_function_on_master(storage, id) {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e));
} }
} }
...@@ -156,7 +165,7 @@ pub fn apply_schema_change( ...@@ -156,7 +165,7 @@ pub fn apply_schema_change(
let abort_reason = ddl_drop_function_on_master(id).map_err(Error::Other)?; let abort_reason = ddl_drop_function_on_master(id).map_err(Error::Other)?;
if let Some(e) = abort_reason { if let Some(e) = abort_reason {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e.into()));
} }
} }
...@@ -166,7 +175,7 @@ pub fn apply_schema_change( ...@@ -166,7 +175,7 @@ pub fn apply_schema_change(
.. ..
} => { } => {
if let Err(e) = ddl_rename_function_on_master(storage, routine_id, new_name) { if let Err(e) = ddl_rename_function_on_master(storage, routine_id, new_name) {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e));
} }
} }
...@@ -174,7 +183,7 @@ pub fn apply_schema_change( ...@@ -174,7 +183,7 @@ pub fn apply_schema_change(
space_id, index_id, .. space_id, index_id, ..
} => { } => {
if let Err(e) = ddl_create_index_on_master(storage, space_id, index_id) { if let Err(e) = ddl_create_index_on_master(storage, space_id, index_id) {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e));
} }
} }
...@@ -186,13 +195,13 @@ pub fn apply_schema_change( ...@@ -186,13 +195,13 @@ pub fn apply_schema_change(
} }
if let Err(e) = ddl_drop_index_on_master(space_id, index_id) { if let Err(e) = ddl_drop_index_on_master(space_id, index_id) {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e));
} }
} }
} }
if let Err(e) = set_local_schema_version(version) { if let Err(e) = set_local_schema_version(version) {
return Err(Error::Aborted(e.to_string())); return Err(Error::Aborted(e.into()));
} }
Ok(()) Ok(())
......
use crate::plugin::PluginOp; use crate::plugin::PluginOp;
use crate::tlog; use crate::tlog;
use crate::traft::error::Error; use crate::traft::error::Error;
use crate::traft::error::ErrorInfo;
use crate::traft::node; use crate::traft::node;
use crate::traft::{RaftIndex, RaftTerm}; use crate::traft::{RaftIndex, RaftTerm};
use std::time::Duration; use std::time::Duration;
use tarantool::error::IntoBoxError;
crate::define_rpc_request! { crate::define_rpc_request! {
/// Forces the target instance to actually enable the plugin locally. /// Forces the target instance to actually enable the plugin locally.
...@@ -37,7 +39,14 @@ crate::define_rpc_request! { ...@@ -37,7 +39,14 @@ crate::define_rpc_request! {
Ok(()) => Ok(Response::Ok), Ok(()) => Ok(Response::Ok),
Err(err) => { Err(err) => {
tlog!(Warning, "plugin enabling aborted: {err}"); tlog!(Warning, "plugin enabling aborted: {err}");
Ok(Response::Abort { reason: err.to_string() })
let instance_id = node.raft_storage.instance_id()?.expect("should be persisted before procs are defined");
let cause = ErrorInfo {
error_code: err.error_code(),
message: err.to_string(),
instance_id,
};
Ok(Response::Abort { cause })
} }
} }
} }
...@@ -53,6 +62,6 @@ crate::define_rpc_request! { ...@@ -53,6 +62,6 @@ crate::define_rpc_request! {
Ok, Ok,
/// Plugin loaded failed on this instance and should be aborted on the /// Plugin loaded failed on this instance and should be aborted on the
/// whole cluster. /// whole cluster.
Abort { reason: String }, Abort { cause: ErrorInfo },
} }
} }
...@@ -4,8 +4,8 @@ use crate::error_code::ErrorCode; ...@@ -4,8 +4,8 @@ use crate::error_code::ErrorCode;
use crate::instance::InstanceId; use crate::instance::InstanceId;
use crate::plugin::PluginError; use crate::plugin::PluginError;
use crate::traft::{RaftId, RaftTerm}; use crate::traft::{RaftId, RaftTerm};
use tarantool::error::BoxError;
use tarantool::error::IntoBoxError; use tarantool::error::IntoBoxError;
use tarantool::error::{BoxError, TarantoolErrorCode};
use tarantool::fiber::r#async::timeout; use tarantool::fiber::r#async::timeout;
use tarantool::tlua::LuaError; use tarantool::tlua::LuaError;
use thiserror::Error; use thiserror::Error;
...@@ -217,3 +217,49 @@ impl IntoBoxError for Error { ...@@ -217,3 +217,49 @@ impl IntoBoxError for Error {
} }
} }
} }
////////////////////////////////////////////////////////////////////////////////
// ErrorInfo
////////////////////////////////////////////////////////////////////////////////
/// This is a serializable version of [`BoxError`].
///
/// TODO<https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/221> just make BoxError serializable.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq)]
pub struct ErrorInfo {
pub error_code: u32,
pub message: String,
pub instance_id: InstanceId,
}
impl ErrorInfo {
#[inline(always)]
pub fn timeout(instance_id: InstanceId, message: impl Into<String>) -> Self {
Self {
error_code: TarantoolErrorCode::Timeout as _,
message: message.into(),
instance_id,
}
}
}
impl std::fmt::Display for ErrorInfo {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "[instance_id:{}] ", self.instance_id)?;
if let Some(c) = ErrorCode::from_i64(self.error_code as _) {
write!(f, "{c:?}")?;
} else if let Some(c) = TarantoolErrorCode::from_i64(self.error_code as _) {
write!(f, "{c:?}")?;
} else {
write!(f, "#{}", self.error_code)?;
}
write!(f, ": {}", self.message)
}
}
impl IntoBoxError for ErrorInfo {
#[inline]
fn error_code(&self) -> u32 {
self.error_code
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment