diff --git a/src/rpc/ddl_apply.rs b/src/rpc/ddl_apply.rs index 6300904ed7e65795ea05f9d7e6a51b2d783871f5..629525a8a9b38f0eb8644b9b3b5a0bd55bd039c2 100644 --- a/src/rpc/ddl_apply.rs +++ b/src/rpc/ddl_apply.rs @@ -3,13 +3,12 @@ use crate::storage::Clusterwide; use crate::storage::{ddl_create_space_on_master, ddl_drop_space_on_master}; use crate::storage::{local_schema_version, set_local_schema_version}; use crate::tlog; -use crate::traft::error::Error; +use crate::traft::error::Error as TraftError; use crate::traft::node; -use crate::traft::Result; use crate::traft::{RaftIndex, RaftTerm}; use std::time::Duration; -use tarantool::error::{TarantoolError, TarantoolErrorCode}; -use tarantool::ffi::tarantool as ffi; +use tarantool::error::TarantoolErrorCode; +use tarantool::transaction::{transaction, TransactionError}; crate::define_rpc_request! { /// Forces the target instance to actually apply the pending schema change locally. @@ -24,14 +23,15 @@ crate::define_rpc_request! { /// 4. Request has an incorrect term - leader changed /// 5. The procedure was called on a read_only instance /// 6. Failed to apply the schema change - fn proc_apply_schema_change(req: Request) -> Result<Response> { + fn proc_apply_schema_change(req: Request) -> crate::traft::Result<Response> { let node = node::global()?; node.wait_index(req.applied, req.timeout)?; node.status().check_term(req.term)?; let storage = &node.storage; - let pending_schema_version = storage.properties.pending_schema_version()?.ok_or_else(|| Error::other("pending schema version not found"))?; + let pending_schema_version = storage.properties.pending_schema_version()? + .ok_or_else(|| TraftError::other("pending schema version not found"))?; // Already applied. if local_schema_version()? >= pending_schema_version { return Ok(Response::Ok); @@ -45,35 +45,25 @@ crate::define_rpc_request! { return Err(e.into()); } - let ddl = storage.properties.pending_schema_change()?.ok_or_else(|| Error::other("pending schema change not found"))?; + let ddl = storage.properties.pending_schema_change()? + .ok_or_else(|| TraftError::other("pending schema change not found"))?; - // FIXME: start_transaction api is awful, it would be too ugly to - // use here in the state it's currently in - let rc = unsafe { ffi::box_txn_begin() }; - assert_eq!(rc, 0, "we're not in a transaction currently"); // TODO: transaction may have already started, if we're in a process of // creating a big index. If governor sends a repeat rpc request to us we // should handle this correctly - let res = apply_schema_change(storage, &ddl, pending_schema_version, false); + let res = transaction(|| apply_schema_change(storage, &ddl, pending_schema_version, false)); match res { - Ok(Response::Abort { .. }) | Err(_) => { - let rc = unsafe { ffi::box_txn_rollback() }; - if rc != 0 { - let e = TarantoolError::last(); - tlog!(Warning, "failed to rollback transaction: {e}"); - } + Ok(()) => Ok(Response::Ok), + Err(TransactionError::RolledBack(Error::Aborted(err))) => { + tlog!(Warning, "schema change aborted: {err}"); + Ok(Response::Abort { reason: err}) } - Ok(Response::Ok) => { - let rc = unsafe { ffi::box_txn_commit() }; - if rc != 0 { - let e = TarantoolError::last(); - tlog!(Warning, "failed to commit transaction: {e}"); - } + Err(err) => { + tlog!(Warning, "applying schema change failed: {err}"); + Err(err.into()) } } - - res } pub struct Request { @@ -91,6 +81,17 @@ crate::define_rpc_request! { } } +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// Schema change failed on this instance and should be aborted on the + /// whole cluster. + #[error("{0}")] + Aborted(String), + + #[error("{0}")] + Other(TraftError), +} + /// Applies the schema change described by `ddl` to the tarantool storage. This /// function is only called on replicaset masters, other replicas get the /// changes via tarantool replication. @@ -111,34 +112,26 @@ pub fn apply_schema_change( ddl: &Ddl, version: u64, is_commit: bool, -) -> Result<Response> { +) -> Result<(), Error> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); match *ddl { Ddl::CreateSpace { id, .. } => { - let abort_reason = ddl_create_space_on_master(storage, id)?; + let abort_reason = ddl_create_space_on_master(storage, id).map_err(Error::Other)?; if let Some(e) = abort_reason { - // We return Ok(error) because currently this is the only - // way to report an application level error. - return Ok(Response::Abort { - reason: e.to_string(), - }); + return Err(Error::Aborted(e.to_string())); } } Ddl::DropSpace { id } => { if !is_commit { // Space is only dropped on commit. - return Ok(Response::Ok); + return Ok(()); } - let abort_reason = ddl_drop_space_on_master(id)?; + let abort_reason = ddl_drop_space_on_master(id).map_err(Error::Other)?; if let Some(e) = abort_reason { - // We return Ok(error) because currently this is the only - // way to report an application level error. - return Ok(Response::Abort { - reason: e.to_string(), - }); + return Err(Error::Aborted(e.to_string())); } } @@ -148,12 +141,8 @@ pub fn apply_schema_change( } if let Err(e) = set_local_schema_version(version) { - // We return Ok(error) because currently this is the only - // way to report an application level error. - return Ok(Response::Abort { - reason: e.to_string(), - }); + return Err(Error::Aborted(e.to_string())); } - Ok(Response::Ok) + Ok(()) } diff --git a/src/traft/node.rs b/src/traft/node.rs index d4c36872e66dd831e55e935dd6f83563e48437b7..31b9367f64dbb93cbb269a64bc49f86a5782f9fe 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -870,17 +870,17 @@ impl NodeImpl { schema_version, } => { self.apply_op_ddl_prepare(ddl, schema_version) - .expect("storage error"); + .expect("storage should not fail"); } Op::DdlCommit => { - let v_local = local_schema_version().expect("storage error"); + let v_local = local_schema_version().expect("storage should not fail"); let v_pending = storage_properties .pending_schema_version() - .expect("storage error") + .expect("storage should not fail") .expect("granted we don't mess up log compaction, this should not be None"); let ddl = storage_properties .pending_schema_change() - .expect("storage error") + .expect("storage should not fail") .expect("granted we don't mess up log compaction, this should not be None"); // This instance is catching up to the cluster. @@ -889,21 +889,23 @@ impl NodeImpl { return SleepAndRetry; } else { // Master applies schema change at this point. - let resp = rpc::ddl_apply::apply_schema_change( + let res = rpc::ddl_apply::apply_schema_change( &self.storage, &ddl, v_pending, true, - ) - .expect("storage error"); - match resp { - rpc::ddl_apply::Response::Abort { reason } => { + ); + match res { + Err(rpc::ddl_apply::Error::Other(err)) => { + panic!("storage should not fail, but failed with: {err}") + } + Err(rpc::ddl_apply::Error::Aborted(reason)) => { tlog!(Warning, "failed applying committed ddl operation: {reason}"; "ddl" => ?ddl, ); return SleepAndRetry; } - rpc::ddl_apply::Response::Ok => {} + Ok(()) => {} } } } @@ -926,23 +928,23 @@ impl NodeImpl { storage_properties .delete(PropertyName::PendingSchemaChange) - .expect("storage error"); + .expect("storage should not fail"); storage_properties .delete(PropertyName::PendingSchemaVersion) - .expect("storage error"); + .expect("storage should not fail"); storage_properties .put(PropertyName::GlobalSchemaVersion, &v_pending) - .expect("storage error"); + .expect("storage should not fail"); } Op::DdlAbort => { - let v_local = local_schema_version().expect("storage error"); + let v_local = local_schema_version().expect("storage should not fail"); let v_pending: u64 = storage_properties .pending_schema_version() - .expect("storage error") + .expect("storage should not fail") .expect("granted we don't mess up log compaction, this should not be None"); let ddl = storage_properties .pending_schema_change() - .expect("storage error") + .expect("storage should not fail") .expect("granted we don't mess up log compaction, this should not be None"); // This condition means, schema versions must always increase // even after an DdlAbort @@ -952,8 +954,8 @@ impl NodeImpl { } else { let v_global = storage_properties .global_schema_version() - .expect("storage error"); - ddl_abort_on_master(&ddl, v_global).expect("storage error"); + .expect("storage should not fail"); + ddl_abort_on_master(&ddl, v_global).expect("storage should not fail"); } } @@ -975,14 +977,14 @@ impl NodeImpl { storage_properties .delete(PropertyName::PendingSchemaChange) - .expect("storage error"); + .expect("storage should not fail"); storage_properties .delete(PropertyName::PendingSchemaVersion) - .expect("storage error"); + .expect("storage should not fail"); } Op::Acl(acl) => { - let v_local = local_schema_version().expect("storage error"); + let v_local = local_schema_version().expect("storage shoudl not fail"); let v_pending = acl.schema_version(); if v_local < v_pending { if self.is_readonly() { @@ -1019,7 +1021,7 @@ impl NodeImpl { .expect("revoking a privilege shouldn't fail"); } } - set_local_schema_version(v_pending).expect("storage error"); + set_local_schema_version(v_pending).expect("storage should not fail"); } } @@ -1056,10 +1058,10 @@ impl NodeImpl { storage_properties .put(PropertyName::GlobalSchemaVersion, &v_pending) - .expect("storage error"); + .expect("storage should not fail"); storage_properties .put(PropertyName::NextSchemaVersion, &(v_pending + 1)) - .expect("storage error"); + .expect("storage should not fail"); } }