diff --git a/src/governor/plan.rs b/src/governor/plan.rs index 2786731aab5734280467137ae2a855170e4ada10..d233993ec6c073cd7aa5a783db2b760592312740 100644 --- a/src/governor/plan.rs +++ b/src/governor/plan.rs @@ -5,9 +5,9 @@ use crate::replicaset::weight; use crate::replicaset::{Replicaset, ReplicasetId}; use crate::storage::{ClusterwideSpace, PropertyName}; use crate::tlog; +use crate::traft::op::Dml; use crate::traft::rpc; use crate::traft::rpc::{replication, sharding, sync, update_instance}; -use crate::traft::OpDML; use crate::traft::Result; use crate::traft::{RaftId, RaftIndex, RaftTerm}; use ::tarantool::space::UpdateOps; @@ -88,7 +88,7 @@ pub(super) fn action_plan<'i>( }; let mut ops = UpdateOps::new(); ops.assign("master_id", &to.instance_id)?; - let op = OpDML::update(ClusterwideSpace::Replicaset, &[&to.replicaset_id], ops)?; + let op = Dml::update(ClusterwideSpace::Replicaset, &[&to.replicaset_id], ops)?; return Ok(TransferMastership { to, rpc, op }.into()); } else { tlog!(Warning, "replicaset master is going offline and no substitution is found"; @@ -157,7 +157,7 @@ pub(super) fn action_plan<'i>( commit, timeout: Loop::SYNC_TIMEOUT, }; - let op = OpDML::insert( + let op = Dml::insert( ClusterwideSpace::Replicaset, &Replicaset { replicaset_id: replicaset_id.clone(), @@ -247,7 +247,7 @@ pub(super) fn action_plan<'i>( commit, timeout: Loop::SYNC_TIMEOUT, }; - let op = OpDML::replace( + let op = Dml::replace( ClusterwideSpace::Property, &(PropertyName::VshardBootstrapped, true), )?; @@ -269,7 +269,7 @@ pub(super) fn action_plan<'i>( weight::State::UpToDate }; uops.assign(weight::State::PATH, state)?; - let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?; + let op = Dml::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?; return Ok(ProposeWeightChanges { op }.into()); } @@ -312,7 +312,7 @@ pub(super) fn action_plan<'i>( for replicaset_id in to_update_weights { let mut uops = UpdateOps::new(); uops.assign(weight::State::PATH, weight::State::UpToDate)?; - let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?; + let op = Dml::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?; ops.push(op); } return Ok(UpdateWeights { targets, rpc, ops }.into()); @@ -347,7 +347,7 @@ pub(super) fn action_plan<'i>( }; let mut ops = UpdateOps::new(); ops.assign("current_schema_version", migration_id)?; - let op = OpDML::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?; + let op = Dml::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?; return Ok(ApplyMigration { target, rpc, op }.into()); } @@ -400,7 +400,7 @@ pub mod stage { pub struct TransferMastership<'i> { pub to: &'i Instance, pub rpc: replication::promote::Request, - pub op: OpDML, + pub op: Dml, } pub struct ReconfigureShardingAndDowngrade<'i> { @@ -419,7 +419,7 @@ pub mod stage { pub master_id: &'i InstanceId, pub replicaset_id: &'i ReplicasetId, pub rpc: replication::promote::Request, - pub op: OpDML, + pub op: Dml, } pub struct Replication<'i> { @@ -441,17 +441,17 @@ pub mod stage { pub struct ShardingBoot<'i> { pub target: &'i InstanceId, pub rpc: sharding::bootstrap::Request, - pub op: OpDML, + pub op: Dml, } pub struct ProposeWeightChanges { - pub op: OpDML, + pub op: Dml, } pub struct UpdateWeights<'i> { pub targets: Vec<&'i InstanceId>, pub rpc: sharding::Request, - pub ops: Vec<OpDML>, + pub ops: Vec<Dml>, } pub struct ToOnline { @@ -461,7 +461,7 @@ pub mod stage { pub struct ApplyMigration<'i> { pub target: &'i Replicaset, pub rpc: rpc::migration::apply::Request, - pub op: OpDML, + pub op: Dml, } } } diff --git a/src/main.rs b/src/main.rs index 419ed19e5cdc9c7873c082c5f0ddce03ec0962e9..05a0baf3d36fa5be0e2a7497dd76baa11d287417 100644 --- a/src/main.rs +++ b/src/main.rs @@ -27,7 +27,8 @@ use crate::instance::grade::TargetGradeVariant; use crate::instance::InstanceId; use crate::tlog::set_log_level; use crate::traft::event::Event; -use crate::traft::{event, node, Migration, OpDML}; +use crate::traft::op::{self, Op}; +use crate::traft::{event, node, Migration}; use crate::traft::{LogicalClock, RaftIndex}; use traft::error::Error; @@ -134,14 +135,13 @@ fn picolib_setup(args: &args::Run) { luamod.set( "raft_propose_nop", tlua::function0(|| { - traft::node::global()?.propose_and_wait(traft::Op::Nop, Duration::from_secs(1)) + traft::node::global()?.propose_and_wait(Op::Nop, Duration::from_secs(1)) }), ); luamod.set( "raft_propose_info", tlua::function1(|x: String| -> traft::Result<()> { - traft::node::global()? - .propose_and_wait(traft::Op::Info { msg: x }, Duration::from_secs(1)) + traft::node::global()?.propose_and_wait(Op::Info { msg: x }, Duration::from_secs(1)) }), ); luamod.set( @@ -185,10 +185,7 @@ fn picolib_setup(args: &args::Run) { |x: String, opts: Option<ProposeEvalOpts>| -> traft::Result<()> { let timeout = opts.and_then(|opts| opts.timeout).unwrap_or(10.0); traft::node::global()? - .propose_and_wait( - traft::OpEvalLua { code: x }, - Duration::from_secs_f64(timeout), - ) + .propose_and_wait(op::EvalLua { code: x }, Duration::from_secs_f64(timeout)) .and_then(|res| res.map_err(Into::into)) }, ), @@ -196,8 +193,7 @@ fn picolib_setup(args: &args::Run) { luamod.set( "raft_return_one", tlua::function1(|timeout: f64| -> traft::Result<u8> { - traft::node::global()? - .propose_and_wait(traft::OpReturnOne, Duration::from_secs_f64(timeout)) + traft::node::global()?.propose_and_wait(op::ReturnOne, Duration::from_secs_f64(timeout)) }), ); // TODO: remove this @@ -383,7 +379,7 @@ fn picolib_setup(args: &args::Run) { "add_migration", tlua::function2(|id: u64, body: String| -> traft::Result<()> { let migration = Migration { id, body }; - let op = OpDML::insert(ClusterwideSpace::Migration, &migration)?; + let op = op::Dml::insert(ClusterwideSpace::Migration, &migration)?; node::global()?.propose_and_wait(op, Duration::MAX)??; Ok(()) }), @@ -392,7 +388,7 @@ fn picolib_setup(args: &args::Run) { luamod.set( "push_schema_version", tlua::function1(|id: u64| -> traft::Result<()> { - let op = OpDML::replace( + let op = op::Dml::replace( ClusterwideSpace::Property, &(PropertyName::DesiredSchemaVersion, id), )?; @@ -417,7 +413,7 @@ fn picolib_setup(args: &args::Run) { return Ok(Some(current_version)); } - let op = OpDML::replace( + let op = op::Dml::replace( ClusterwideSpace::Property, &(PropertyName::DesiredSchemaVersion, target_version), )?; @@ -839,16 +835,16 @@ fn start_boot(args: &args::Run) { }; init_entries_push_op( - traft::OpDML::insert( + op::Dml::insert( ClusterwideSpace::Address, &traft::PeerAddress { raft_id, address }, ) .expect("cannot fail") .into(), ); - init_entries_push_op(traft::OpPersistInstance::new(instance).into()); + init_entries_push_op(traft::op::PersistInstance::new(instance).into()); init_entries_push_op( - OpDML::insert( + op::Dml::insert( ClusterwideSpace::Property, &( PropertyName::ReplicationFactor, @@ -859,7 +855,7 @@ fn start_boot(args: &args::Run) { .into(), ); init_entries_push_op( - OpDML::insert( + op::Dml::insert( ClusterwideSpace::Property, &(PropertyName::DesiredSchemaVersion, 0), ) diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 2fafdb807e0061f0f528d2eae9ddffbc13dff4f3..344a9b5bc1284f56935db074861b5022067725be 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -5,18 +5,17 @@ pub mod event; pub(crate) mod network; pub mod node; pub mod notify; +pub mod op; pub(crate) mod raft_storage; pub mod rpc; pub mod topology; use crate::instance::Instance; -use crate::storage; -use crate::storage::ClusterwideSpace; use crate::stringify_debug; -use crate::util::{AnyWithTypeName, Uppercase}; +use crate::util::Uppercase; use ::raft::prelude as raft; -use ::tarantool::tlua::LuaError; -use ::tarantool::tuple::{Encode, ToTupleBuffer, Tuple, TupleBuffer}; +use ::tarantool::tuple::Encode; +use op::Op; use serde::de::DeserializeOwned; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -72,315 +71,6 @@ impl std::fmt::Display for LogicalClock { } } -////////////////////////////////////////////////////////////////////////////////////////// -/// The operation on the raft state machine. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -#[serde(rename_all = "snake_case")] -#[serde(tag = "kind")] -pub enum Op { - /// No operation. - Nop, - /// Print the message in tarantool log. - Info { msg: String }, - /// Evaluate the code on every instance in cluster. - EvalLua(OpEvalLua), - /// - ReturnOne(OpReturnOne), - /// Update the given instance's entry in [`storage::Instances`]. - PersistInstance(OpPersistInstance), - /// Cluster-wide data modification operation. - /// Should be used to manipulate the cluster-wide configuration. - Dml(OpDML), -} - -impl std::fmt::Display for Op { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - return match self { - Self::Nop => f.write_str("Nop"), - Self::Info { msg } => write!(f, "Info({msg:?})"), - Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), - Self::ReturnOne(_) => write!(f, "ReturnOne"), - Self::PersistInstance(OpPersistInstance(instance)) => { - write!(f, "PersistInstance{}", instance) - } - Self::Dml(OpDML::Insert { space, tuple }) => { - write!(f, "Insert({space}, {})", DisplayAsJson(tuple)) - } - Self::Dml(OpDML::Replace { space, tuple }) => { - write!(f, "Replace({space}, {})", DisplayAsJson(tuple)) - } - Self::Dml(OpDML::Update { space, key, ops }) => { - let key = DisplayAsJson(key); - let ops = DisplayAsJson(&**ops); - write!(f, "Update({space}, {key}, {ops})") - } - Self::Dml(OpDML::Delete { space, key }) => { - write!(f, "Delete({space}, {})", DisplayAsJson(key)) - } - }; - - struct DisplayAsJson<T>(pub T); - - impl std::fmt::Display for DisplayAsJson<&TupleBuffer> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - if let Some(data) = rmp_serde::from_slice::<serde_json::Value>(self.0.as_ref()) - .ok() - .and_then(|v| serde_json::to_string(&v).ok()) - { - return write!(f, "{data}"); - } - - write!(f, "{:?}", self.0) - } - } - - impl std::fmt::Display for DisplayAsJson<&[TupleBuffer]> { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "[")?; - if let Some(elem) = self.0.first() { - write!(f, "{}", DisplayAsJson(elem))?; - } - for elem in self.0.iter().skip(1) { - write!(f, ", {}", DisplayAsJson(elem))?; - } - write!(f, "]") - } - } - } -} - -impl Op { - pub fn on_commit(self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> { - match self { - Self::Nop => Box::new(()), - Self::Info { msg } => { - crate::tlog!(Info, "{msg}"); - Box::new(()) - } - Self::EvalLua(op) => Box::new(op.result()), - Self::ReturnOne(op) => Box::new(op.result()), - Self::PersistInstance(op) => { - let instance = op.result(); - instances.put(&instance).unwrap(); - instance - } - Self::Dml(op) => Box::new(op.result()), - } - } -} - -impl OpResult for Op { - type Result = (); - fn result(self) -> Self::Result {} -} - -impl From<OpReturnOne> for Op { - fn from(op: OpReturnOne) -> Op { - Op::ReturnOne(op) - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct OpReturnOne; - -impl OpResult for OpReturnOne { - type Result = u8; - fn result(self) -> Self::Result { - 1 - } -} - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct OpEvalLua { - pub code: String, -} - -impl OpResult for OpEvalLua { - type Result = StdResult<(), LuaError>; - fn result(self) -> Self::Result { - crate::tarantool::exec(&self.code) - } -} - -impl From<OpEvalLua> for Op { - fn from(op: OpEvalLua) -> Op { - Op::EvalLua(op) - } -} - -pub trait OpResult { - type Result: 'static; - // FIXME: this signature makes it look like result of any operation depends - // only on what is contained within the operation which is almost never true - // And it makes it hard to do anything useful inside this function. - fn result(self) -> Self::Result; -} - -//////////////////////////////////////////////////////////////////////////////// -// OpPersistInstance -//////////////////////////////////////////////////////////////////////////////// - -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub struct OpPersistInstance(pub Box<Instance>); - -impl OpPersistInstance { - pub fn new(instance: Instance) -> Self { - Self(Box::new(instance)) - } -} - -impl OpResult for OpPersistInstance { - type Result = Box<Instance>; - fn result(self) -> Self::Result { - self.0 - } -} - -impl From<OpPersistInstance> for Op { - #[inline] - fn from(op: OpPersistInstance) -> Op { - Op::PersistInstance(op) - } -} - -////////////////////////////////////////////////////////////////////////////////////////// -// OpDML - -/// Cluster-wide data modification operation. -#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] -pub enum OpDML { - Insert { - space: ClusterwideSpace, - #[serde(with = "serde_bytes")] - tuple: TupleBuffer, - }, - Replace { - space: ClusterwideSpace, - #[serde(with = "serde_bytes")] - tuple: TupleBuffer, - }, - Update { - space: ClusterwideSpace, - #[serde(with = "serde_bytes")] - key: TupleBuffer, - #[serde(with = "vec_of_raw_byte_buf")] - ops: Vec<TupleBuffer>, - }, - Delete { - space: ClusterwideSpace, - #[serde(with = "serde_bytes")] - key: TupleBuffer, - }, -} - -impl OpResult for OpDML { - type Result = tarantool::Result<Option<Tuple>>; - fn result(self) -> Self::Result { - match self { - Self::Insert { space, tuple } => space.insert(&tuple).map(Some), - Self::Replace { space, tuple } => space.replace(&tuple).map(Some), - Self::Update { space, key, ops } => space.update(&key, &ops), - Self::Delete { space, key } => space.delete(&key), - } - } -} - -impl From<OpDML> for Op { - fn from(op: OpDML) -> Op { - Op::Dml(op) - } -} - -impl OpDML { - /// Serializes `tuple` and returns an [`OpDML::Insert`] in case of success. - pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { - let res = Self::Insert { - space, - tuple: tuple.to_tuple_buffer()?, - }; - Ok(res) - } - - /// Serializes `tuple` and returns an [`OpDML::Replace`] in case of success. - pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { - let res = Self::Replace { - space, - tuple: tuple.to_tuple_buffer()?, - }; - Ok(res) - } - - /// Serializes `key` and returns an [`OpDML::Update`] in case of success. - pub fn update( - space: ClusterwideSpace, - key: &impl ToTupleBuffer, - ops: impl Into<Vec<TupleBuffer>>, - ) -> tarantool::Result<Self> { - let res = Self::Update { - space, - key: key.to_tuple_buffer()?, - ops: ops.into(), - }; - Ok(res) - } - - /// Serializes `key` and returns an [`OpDML::Delete`] in case of success. - pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { - let res = Self::Delete { - space, - key: key.to_tuple_buffer()?, - }; - Ok(res) - } - - #[rustfmt::skip] - pub fn space(&self) -> &ClusterwideSpace { - match &self { - Self::Insert { space, .. } => space, - Self::Replace { space, .. } => space, - Self::Update { space, .. } => space, - Self::Delete { space, .. } => space, - } - } -} - -mod vec_of_raw_byte_buf { - use super::TupleBuffer; - use serde::de::Error as _; - use serde::ser::SerializeSeq; - use serde::{self, Deserialize, Deserializer, Serializer}; - use serde_bytes::{ByteBuf, Bytes}; - use std::convert::TryFrom; - - pub fn serialize<S>(v: &[TupleBuffer], ser: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - let mut seq = ser.serialize_seq(Some(v.len()))?; - for buf in v { - seq.serialize_element(Bytes::new(buf.as_ref()))?; - } - seq.end() - } - - pub fn deserialize<'de, D>(de: D) -> Result<Vec<TupleBuffer>, D::Error> - where - D: Deserializer<'de>, - { - let tmp = Vec::<ByteBuf>::deserialize(de)?; - // FIXME(gmoshkin): redundant copy happens here, - // because ByteBuf and TupleBuffer are essentially the same struct, - // but there's no easy foolproof way - // to convert a Vec<ByteBuf> to Vec<TupleBuffer> - // because of borrow and drop checkers - let res: tarantool::Result<_> = tmp - .into_iter() - .map(|bb| TupleBuffer::try_from(bb.into_vec())) - .collect(); - res.map_err(D::Error::custom) - } -} - ////////////////////////////////////////////////////////////////////////////////////////// /// Serializable struct representing an address of a member of raft group #[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] diff --git a/src/traft/node.rs b/src/traft/node.rs index a512e23dd62531139e9d2a2ba4de8936836529ff..ef01f7c8764ea803a01dc6fe9b5ec446ae46f380 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -5,27 +5,8 @@ //! - handling configuration changes, //! - processing raft `Ready` - persisting entries, communicating with other raft nodes. -use ::raft::prelude as raft; -use ::raft::Error as RaftError; -use ::raft::StateRole as RaftStateRole; -use ::raft::StorageError; -use ::raft::INVALID_ID; -use ::tarantool::error::{TarantoolError, TransactionError}; -use ::tarantool::fiber; -use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; -use ::tarantool::fiber::r#async::{oneshot, watch}; -use ::tarantool::fiber::Mutex; -use ::tarantool::proc; -use ::tarantool::tlua; -use ::tarantool::transaction::start_transaction; -use std::cell::Cell; -use std::collections::{HashMap, HashSet}; -use std::convert::TryFrom; -use std::rc::Rc; -use std::time::Duration; -use std::time::Instant; - use crate::governor; +use crate::has_grades; use crate::instance::Instance; use crate::kvcell::KVCell; use crate::loop_start; @@ -33,30 +14,45 @@ use crate::r#loop::FlowControl; use crate::storage::ToEntryIter as _; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::stringify_cfunc; -use crate::traft::ContextCoercion as _; -use crate::traft::RaftId; -use crate::traft::RaftIndex; -use crate::traft::RaftTerm; -use crate::traft::{OpDML, OpPersistInstance}; -use crate::unwrap_some_or; -use crate::warn_or_panic; -use protobuf::Message as _; - -use crate::has_grades; use crate::tlog; use crate::traft; use crate::traft::error::Error; use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; +use crate::traft::op::{Dml, Op, OpResult, PersistInstance}; use crate::traft::rpc::{join, update_instance}; use crate::traft::Address; use crate::traft::ConnectionPool; +use crate::traft::ContextCoercion as _; use crate::traft::LogicalClock; -use crate::traft::Op; -use crate::traft::OpResult; +use crate::traft::RaftId; +use crate::traft::RaftIndex; use crate::traft::RaftSpaceAccess; +use crate::traft::RaftTerm; use crate::traft::Topology; +use crate::unwrap_some_or; +use crate::warn_or_panic; +use ::raft::prelude as raft; +use ::raft::Error as RaftError; +use ::raft::StateRole as RaftStateRole; +use ::raft::StorageError; +use ::raft::INVALID_ID; +use ::tarantool::error::{TarantoolError, TransactionError}; +use ::tarantool::fiber; +use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; +use ::tarantool::fiber::r#async::{oneshot, watch}; +use ::tarantool::fiber::Mutex; +use ::tarantool::proc; +use ::tarantool::tlua; +use ::tarantool::transaction::start_transaction; +use protobuf::Message as _; +use std::cell::Cell; +use std::collections::{HashMap, HashSet}; +use std::convert::TryFrom; +use std::rc::Rc; +use std::time::Duration; +use std::time::Instant; type RawNode = raft::RawNode<RaftSpaceAccess>; @@ -188,7 +184,7 @@ impl Node { /// Propose an operation and wait for it's result. /// **This function yields** - pub fn propose_and_wait<T: OpResult + Into<traft::Op>>( + pub fn propose_and_wait<T: OpResult + Into<Op>>( &self, op: T, timeout: Duration, @@ -443,7 +439,7 @@ impl NodeImpl { #[inline] pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError> where - T: Into<traft::Op>, + T: Into<Op>, { let (lc, notify) = self.schedule_notification(); let ctx = traft::EntryContextNormal::new(lc, op.into()); @@ -494,8 +490,8 @@ impl NodeImpl { raft_id: instance.raft_id, address, }; - let op_addr = OpDML::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail"); - let op_instance = OpPersistInstance::new(instance); + let op_addr = Dml::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail"); + let op_instance = PersistInstance::new(instance); // Important! Calling `raw_node.propose()` may result in // `ProposalDropped` error, but the topology has already been // modified. The correct handling of this case should be the @@ -546,7 +542,7 @@ impl NodeImpl { // harmful. Loss of the uncommitted entries could result in // assigning the same `raft_id` to a two different nodes. // - Ok(self.propose_async(OpPersistInstance::new(instance))?) + Ok(self.propose_async(PersistInstance::new(instance))?) } fn propose_conf_change_async( @@ -651,17 +647,17 @@ impl NodeImpl { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); let lc = entry.lc(); let index = entry.index; - let op = entry.into_op().unwrap_or(traft::Op::Nop); + let op = entry.into_op().unwrap_or(Op::Nop); match &op { - traft::Op::PersistInstance(OpPersistInstance(instance)) => { + Op::PersistInstance(PersistInstance(instance)) => { *wake_governor = true; if has_grades!(instance, Expelled -> *) && instance.raft_id == self.raft_id() { // cannot exit during a transaction *expelled = true; } } - traft::Op::Dml(op) + Op::Dml(op) if matches!( op.space(), ClusterwideSpace::Property | ClusterwideSpace::Replicaset diff --git a/src/traft/op.rs b/src/traft/op.rs new file mode 100644 index 0000000000000000000000000000000000000000..9a861cb19233a89ac81a24a0b6517e518594bde4 --- /dev/null +++ b/src/traft/op.rs @@ -0,0 +1,329 @@ +use crate::instance::Instance; +use crate::storage; +use crate::storage::ClusterwideSpace; +use crate::util::AnyWithTypeName; +use ::tarantool::tlua::LuaError; +use ::tarantool::tuple::{ToTupleBuffer, Tuple, TupleBuffer}; +use serde::{Deserialize, Serialize}; + +//////////////////////////////////////////////////////////////////////////////// +// OpResult +//////////////////////////////////////////////////////////////////////////////// + +pub trait OpResult { + type Result: 'static; + // FIXME: this signature makes it look like result of any operation depends + // only on what is contained within the operation which is almost never true + // And it makes it hard to do anything useful inside this function. + fn result(self) -> Self::Result; +} + +//////////////////////////////////////////////////////////////////////////////// +/// The operation on the raft state machine. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +#[serde(tag = "kind")] +pub enum Op { + /// No operation. + Nop, + /// Print the message in tarantool log. + Info { msg: String }, + /// Evaluate the code on every instance in cluster. + EvalLua(EvalLua), + /// + ReturnOne(ReturnOne), + /// Update the given instance's entry in [`storage::Instances`]. + PersistInstance(PersistInstance), + /// Cluster-wide data modification operation. + /// Should be used to manipulate the cluster-wide configuration. + Dml(Dml), +} + +impl std::fmt::Display for Op { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + return match self { + Self::Nop => f.write_str("Nop"), + Self::Info { msg } => write!(f, "Info({msg:?})"), + Self::EvalLua(EvalLua { code }) => write!(f, "EvalLua({code:?})"), + Self::ReturnOne(_) => write!(f, "ReturnOne"), + Self::PersistInstance(PersistInstance(instance)) => { + write!(f, "PersistInstance{}", instance) + } + Self::Dml(Dml::Insert { space, tuple }) => { + write!(f, "Insert({space}, {})", DisplayAsJson(tuple)) + } + Self::Dml(Dml::Replace { space, tuple }) => { + write!(f, "Replace({space}, {})", DisplayAsJson(tuple)) + } + Self::Dml(Dml::Update { space, key, ops }) => { + let key = DisplayAsJson(key); + let ops = DisplayAsJson(&**ops); + write!(f, "Update({space}, {key}, {ops})") + } + Self::Dml(Dml::Delete { space, key }) => { + write!(f, "Delete({space}, {})", DisplayAsJson(key)) + } + }; + + struct DisplayAsJson<T>(pub T); + + impl std::fmt::Display for DisplayAsJson<&TupleBuffer> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + if let Some(data) = rmp_serde::from_slice::<serde_json::Value>(self.0.as_ref()) + .ok() + .and_then(|v| serde_json::to_string(&v).ok()) + { + return write!(f, "{data}"); + } + + write!(f, "{:?}", self.0) + } + } + + impl std::fmt::Display for DisplayAsJson<&[TupleBuffer]> { + fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { + write!(f, "[")?; + if let Some(elem) = self.0.first() { + write!(f, "{}", DisplayAsJson(elem))?; + } + for elem in self.0.iter().skip(1) { + write!(f, ", {}", DisplayAsJson(elem))?; + } + write!(f, "]") + } + } + } +} + +impl Op { + pub fn on_commit(self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> { + match self { + Self::Nop => Box::new(()), + Self::Info { msg } => { + crate::tlog!(Info, "{msg}"); + Box::new(()) + } + Self::EvalLua(op) => Box::new(op.result()), + Self::ReturnOne(op) => Box::new(op.result()), + Self::PersistInstance(op) => { + let instance = op.result(); + instances.put(&instance).unwrap(); + instance + } + Self::Dml(op) => Box::new(op.result()), + } + } +} + +impl OpResult for Op { + type Result = (); + fn result(self) -> Self::Result {} +} + +//////////////////////////////////////////////////////////////////////////////// +// ReturnOne +//////////////////////////////////////////////////////////////////////////////// + +impl From<ReturnOne> for Op { + fn from(op: ReturnOne) -> Op { + Op::ReturnOne(op) + } +} + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct ReturnOne; + +impl OpResult for ReturnOne { + type Result = u8; + fn result(self) -> Self::Result { + 1 + } +} + +//////////////////////////////////////////////////////////////////////////////// +// EvalLua +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct EvalLua { + pub code: String, +} + +impl OpResult for EvalLua { + type Result = Result<(), LuaError>; + fn result(self) -> Self::Result { + crate::tarantool::exec(&self.code) + } +} + +impl From<EvalLua> for Op { + fn from(op: EvalLua) -> Op { + Op::EvalLua(op) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// PersistInstance +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct PersistInstance(pub Box<Instance>); + +impl PersistInstance { + pub fn new(instance: Instance) -> Self { + Self(Box::new(instance)) + } +} + +impl OpResult for PersistInstance { + type Result = Box<Instance>; + fn result(self) -> Self::Result { + self.0 + } +} + +impl From<PersistInstance> for Op { + #[inline] + fn from(op: PersistInstance) -> Op { + Op::PersistInstance(op) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// Dml +//////////////////////////////////////////////////////////////////////////////// + +/// Cluster-wide data modification operation. +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub enum Dml { + Insert { + space: ClusterwideSpace, + #[serde(with = "serde_bytes")] + tuple: TupleBuffer, + }, + Replace { + space: ClusterwideSpace, + #[serde(with = "serde_bytes")] + tuple: TupleBuffer, + }, + Update { + space: ClusterwideSpace, + #[serde(with = "serde_bytes")] + key: TupleBuffer, + #[serde(with = "vec_of_raw_byte_buf")] + ops: Vec<TupleBuffer>, + }, + Delete { + space: ClusterwideSpace, + #[serde(with = "serde_bytes")] + key: TupleBuffer, + }, +} + +impl OpResult for Dml { + type Result = tarantool::Result<Option<Tuple>>; + fn result(self) -> Self::Result { + match self { + Self::Insert { space, tuple } => space.insert(&tuple).map(Some), + Self::Replace { space, tuple } => space.replace(&tuple).map(Some), + Self::Update { space, key, ops } => space.update(&key, &ops), + Self::Delete { space, key } => space.delete(&key), + } + } +} + +impl From<Dml> for Op { + fn from(op: Dml) -> Op { + Op::Dml(op) + } +} + +impl Dml { + /// Serializes `tuple` and returns an [`Dml::Insert`] in case of success. + pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + let res = Self::Insert { + space, + tuple: tuple.to_tuple_buffer()?, + }; + Ok(res) + } + + /// Serializes `tuple` and returns an [`Dml::Replace`] in case of success. + pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> { + let res = Self::Replace { + space, + tuple: tuple.to_tuple_buffer()?, + }; + Ok(res) + } + + /// Serializes `key` and returns an [`Dml::Update`] in case of success. + pub fn update( + space: ClusterwideSpace, + key: &impl ToTupleBuffer, + ops: impl Into<Vec<TupleBuffer>>, + ) -> tarantool::Result<Self> { + let res = Self::Update { + space, + key: key.to_tuple_buffer()?, + ops: ops.into(), + }; + Ok(res) + } + + /// Serializes `key` and returns an [`Dml::Delete`] in case of success. + pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> { + let res = Self::Delete { + space, + key: key.to_tuple_buffer()?, + }; + Ok(res) + } + + #[rustfmt::skip] + pub fn space(&self) -> &ClusterwideSpace { + match &self { + Self::Insert { space, .. } => space, + Self::Replace { space, .. } => space, + Self::Update { space, .. } => space, + Self::Delete { space, .. } => space, + } + } +} + +mod vec_of_raw_byte_buf { + use super::TupleBuffer; + use serde::de::Error as _; + use serde::ser::SerializeSeq; + use serde::{self, Deserialize, Deserializer, Serializer}; + use serde_bytes::{ByteBuf, Bytes}; + use std::convert::TryFrom; + + pub fn serialize<S>(v: &[TupleBuffer], ser: S) -> Result<S::Ok, S::Error> + where + S: Serializer, + { + let mut seq = ser.serialize_seq(Some(v.len()))?; + for buf in v { + seq.serialize_element(Bytes::new(buf.as_ref()))?; + } + seq.end() + } + + pub fn deserialize<'de, D>(de: D) -> Result<Vec<TupleBuffer>, D::Error> + where + D: Deserializer<'de>, + { + let tmp = Vec::<ByteBuf>::deserialize(de)?; + // FIXME(gmoshkin): redundant copy happens here, + // because ByteBuf and TupleBuffer are essentially the same struct, + // but there's no easy foolproof way + // to convert a Vec<ByteBuf> to Vec<TupleBuffer> + // because of borrow and drop checkers + let res: tarantool::Result<_> = tmp + .into_iter() + .map(|bb| TupleBuffer::try_from(bb.into_vec())) + .collect(); + res.map_err(D::Error::custom) + } +}