diff --git a/src/main.rs b/src/main.rs index 7fd0c293f4877f88bd181431a02a80f3dd9777a5..26bcf0909040a55316f3aa90996810d10b63cd91 100644 --- a/src/main.rs +++ b/src/main.rs @@ -287,7 +287,7 @@ fn picolib_setup(args: &args::Run) { entry.term.to_string(), entry .lc() - .map(ToString::to_string) + .map(|lc| lc.to_string()) .unwrap_or_else(String::new), entry.payload().to_string(), ]; @@ -810,7 +810,7 @@ fn start_boot(args: &args::Run) { .expect("cannot fail") .into(), ); - init_entries_push_op(traft::Op::persist_instance(instance)); + init_entries_push_op(traft::OpPersistInstance::new(instance).into()); init_entries_push_op( OpDML::insert( ClusterwideSpace::Property, diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 9b4de85718d974b553efce4186dda8c4af7617fa..64c18c2230bc60e88784c2ffae2f55483b59ff13 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -99,16 +99,13 @@ pub enum Op { /// No operation. Nop, /// Print the message in tarantool log. - Info { - msg: String, - }, + Info { msg: String }, /// Evaluate the code on every instance in cluster. EvalLua(OpEvalLua), /// ReturnOne(OpReturnOne), - PersistInstance { - instance: Box<Instance>, - }, + /// Update the given instance's entry in [`storage::Instances`]. + PersistInstance(OpPersistInstance), /// Cluster-wide data modification operation. /// Should be used to manipulate the cluster-wide configuration. Dml(OpDML), @@ -121,7 +118,7 @@ impl std::fmt::Display for Op { Self::Info { msg } => write!(f, "Info({msg:?})"), Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), Self::ReturnOne(_) => write!(f, "ReturnOne"), - Self::PersistInstance { instance } => { + Self::PersistInstance(OpPersistInstance(instance)) => { write!(f, "PersistInstance{}", instance) } Self::Dml(OpDML::Insert { space, tuple }) => { @@ -171,7 +168,7 @@ impl std::fmt::Display for Op { } impl Op { - pub fn on_commit(&self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> { + pub fn on_commit(self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> { match self { Self::Nop => Box::new(()), Self::Info { msg } => { @@ -180,30 +177,24 @@ impl Op { } Self::EvalLua(op) => Box::new(op.result()), Self::ReturnOne(op) => Box::new(op.result()), - Self::PersistInstance { instance } => { - instances.put(instance).unwrap(); - Box::new(instance.clone()) + Self::PersistInstance(op) => { + let instance = op.result(); + instances.put(&instance).unwrap(); + instance } Self::Dml(op) => { - let res = Box::new(op.result()); if op.space() == &ClusterwideSpace::Property { event::broadcast(Event::ClusterStateChanged); } - res + Box::new(op.result()) } } } - - pub fn persist_instance(instance: Instance) -> Self { - Self::PersistInstance { - instance: Box::new(instance), - } - } } impl OpResult for Op { type Result = (); - fn result(&self) -> Self::Result {} + fn result(self) -> Self::Result {} } impl From<OpReturnOne> for Op { @@ -217,7 +208,7 @@ pub struct OpReturnOne; impl OpResult for OpReturnOne { type Result = u8; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { 1 } } @@ -229,7 +220,7 @@ pub struct OpEvalLua { impl OpResult for OpEvalLua { type Result = StdResult<(), LuaError>; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { crate::tarantool::exec(&self.code) } } @@ -242,7 +233,37 @@ impl From<OpEvalLua> for Op { pub trait OpResult { type Result: 'static; - fn result(&self) -> Self::Result; + // FIXME: this signature makes it look like result of any operation depends + // only on what is contained within the operation which is almost never true + // And it makes it hard to do anything useful inside this function. + fn result(self) -> Self::Result; +} + +//////////////////////////////////////////////////////////////////////////////// +// OpPersistInstance +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct OpPersistInstance(pub Box<Instance>); + +impl OpPersistInstance { + pub fn new(instance: Instance) -> Self { + Self(Box::new(instance)) + } +} + +impl OpResult for OpPersistInstance { + type Result = Box<Instance>; + fn result(self) -> Self::Result { + self.0 + } +} + +impl From<OpPersistInstance> for Op { + #[inline] + fn from(op: OpPersistInstance) -> Op { + Op::PersistInstance(op) + } } ////////////////////////////////////////////////////////////////////////////////////////// @@ -277,12 +298,12 @@ pub enum OpDML { impl OpResult for OpDML { type Result = tarantool::Result<Option<Tuple>>; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { match self { - Self::Insert { space, tuple } => space.insert(tuple).map(Some), - Self::Replace { space, tuple } => space.replace(tuple).map(Some), - Self::Update { space, key, ops } => space.update(key, ops), - Self::Delete { space, key } => space.delete(key), + Self::Insert { space, tuple } => space.insert(&tuple).map(Some), + Self::Replace { space, tuple } => space.replace(&tuple).map(Some), + Self::Update { space, key, ops } => space.update(&key, &ops), + Self::Delete { space, key } => space.delete(&key), } } } @@ -616,18 +637,19 @@ impl ContextCoercion for EntryContextConfChange {} impl Entry { /// Returns the logical clock value if it's an `EntryNormal`. - pub fn lc(&self) -> Option<&LogicalClock> { + pub fn lc(&self) -> Option<LogicalClock> { match &self.context { - Some(EntryContext::Normal(v)) => Some(&v.lc), + Some(EntryContext::Normal(v)) => Some(v.lc), Some(EntryContext::ConfChange(_)) => None, None => None, } } - /// Returns the contained `Op` if it's an `EntryNormal`. - fn op(&self) -> Option<&Op> { - match &self.context { - Some(EntryContext::Normal(v)) => Some(&v.op), + /// Returns the contained `Op` if it's an `EntryNormal` + /// consuming `self` by value. + fn into_op(self) -> Option<Op> { + match self.context { + Some(EntryContext::Normal(v)) => Some(v.op), Some(EntryContext::ConfChange(_)) => None, None => None, } diff --git a/src/traft/node.rs b/src/traft/node.rs index bde296384f2aa27b5eed0526423c5ae892c847cd..5293017a2cc2a8a86c99f1db5290191d7426ed41 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -38,10 +38,10 @@ use crate::stringify_cfunc; use crate::traft::rpc; use crate::traft::ContextCoercion as _; use crate::traft::Instance; -use crate::traft::OpDML; use crate::traft::RaftId; use crate::traft::RaftIndex; use crate::traft::RaftTerm; +use crate::traft::{OpDML, OpPersistInstance}; use crate::unwrap_some_or; use crate::warn_or_panic; use ::tarantool::util::IntoClones as _; @@ -277,7 +277,7 @@ impl Node { // FIXME: this error should be handled let _ = notify.recv_any().await; } - notify.recv().await + notify.recv().await.map(Box::new) }) } @@ -437,7 +437,7 @@ impl NodeImpl { T: Into<traft::Op>, { let (lc, notify) = self.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, op); + let ctx = traft::EntryContextNormal::new(lc, op.into()); self.raw_node.propose(ctx.to_bytes(), vec![])?; Ok(notify) } @@ -512,7 +512,7 @@ impl NodeImpl { } }; let (lc, notify) = self.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, Op::persist_instance(instance)); + let ctx = traft::EntryContextNormal::new(lc, OpPersistInstance::new(instance)); // Important! Calling `raw_node.propose()` may result in // `ProposalDropped` error, but the topology has already been @@ -634,18 +634,11 @@ impl NodeImpl { expelled: &mut bool, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); - let result = entry - .op() - .unwrap_or(&traft::Op::Nop) - .on_commit(&self.storage.instances); + let lc = entry.lc(); + let index = entry.index; + let op = entry.into_op().unwrap_or(traft::Op::Nop); - if let Some(lc) = entry.lc() { - if let Some(notify) = self.notifications.remove(lc) { - notify.notify_ok_any(result); - } - } - - if let Some(traft::Op::PersistInstance { instance, .. }) = entry.op() { + if let traft::Op::PersistInstance(OpPersistInstance(instance)) = &op { *topology_changed = true; if instance.current_grade == CurrentGradeVariant::Expelled && instance.raft_id == self.raft_id() @@ -655,7 +648,16 @@ impl NodeImpl { } } - if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) { + // apply the operation + let result = op.on_commit(&self.storage.instances); + + if let Some(lc) = &lc { + if let Some(notify) = self.notifications.remove(lc) { + notify.notify_ok_any(result); + } + } + + if let Some(notify) = self.joint_state_latch.take_or_keep(&index) { // It was expected to be a ConfChange entry, but it's // normal. Raft must have overriden it, or there was // a re-election.