From 3cf669c0883c17632a6dda7ead1e998e82380d6d Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 1 Dec 2022 14:34:56 +0300 Subject: [PATCH] refactor(op): add OpPersistPeer and impl OpResult for it --- src/main.rs | 4 +-- src/traft/mod.rs | 90 +++++++++++++++++++++++++++++------------------ src/traft/node.rs | 34 +++++++++--------- 3 files changed, 76 insertions(+), 52 deletions(-) diff --git a/src/main.rs b/src/main.rs index 5e8f2990e6..2989821208 100644 --- a/src/main.rs +++ b/src/main.rs @@ -307,7 +307,7 @@ fn picolib_setup(args: &args::Run) { entry.term.to_string(), entry .lc() - .map(ToString::to_string) + .map(|lc| lc.to_string()) .unwrap_or_else(String::new), entry.payload().to_string(), ]; @@ -829,7 +829,7 @@ fn start_boot(args: &args::Run) { .expect("cannot fail") .into(), ); - init_entries_push_op(traft::Op::persist_peer(peer)); + init_entries_push_op(traft::OpPersistPeer::new(peer).into()); init_entries_push_op( OpDML::insert( ClusterwideSpace::State, diff --git a/src/traft/mod.rs b/src/traft/mod.rs index ccbf6b579e..da3f149481 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -102,16 +102,13 @@ pub enum Op { /// No operation. Nop, /// Print the message in tarantool log. - Info { - msg: String, - }, + Info { msg: String }, /// Evaluate the code on every instance in cluster. EvalLua(OpEvalLua), /// ReturnOne(OpReturnOne), - PersistPeer { - peer: Box<Peer>, - }, + /// Update the given peer's entry in [`storage::Peers`]. + PersistPeer(OpPersistPeer), /// Cluster-wide data modification operation. /// Should be used to manipulate the cluster-wide configuration. Dml(OpDML), @@ -124,7 +121,7 @@ impl std::fmt::Display for Op { Self::Info { msg } => write!(f, "Info({msg:?})"), Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), Self::ReturnOne(_) => write!(f, "ReturnOne"), - Self::PersistPeer { peer } => { + Self::PersistPeer(OpPersistPeer(peer)) => { write!(f, "PersistPeer{}", peer) } Self::Dml(OpDML::Insert { space, tuple }) => { @@ -174,7 +171,7 @@ impl std::fmt::Display for Op { } impl Op { - pub fn on_commit(&self, peers: &storage::Peers) -> Box<dyn AnyWithTypeName> { + pub fn on_commit(self, peers: &storage::Peers) -> Box<dyn AnyWithTypeName> { match self { Self::Nop => Box::new(()), Self::Info { msg } => { @@ -183,30 +180,24 @@ impl Op { } Self::EvalLua(op) => Box::new(op.result()), Self::ReturnOne(op) => Box::new(op.result()), - Self::PersistPeer { peer } => { - peers.put(peer).unwrap(); - Box::new(peer.clone()) + Self::PersistPeer(op) => { + let peer = op.result(); + peers.put(&peer).unwrap(); + peer } Self::Dml(op) => { - let res = Box::new(op.result()); if op.space() == &ClusterwideSpace::State { event::broadcast(Event::ClusterStateChanged); } - res + Box::new(op.result()) } } } - - pub fn persist_peer(peer: Peer) -> Self { - Self::PersistPeer { - peer: Box::new(peer), - } - } } impl OpResult for Op { type Result = (); - fn result(&self) -> Self::Result {} + fn result(self) -> Self::Result {} } impl From<OpReturnOne> for Op { @@ -220,7 +211,7 @@ pub struct OpReturnOne; impl OpResult for OpReturnOne { type Result = u8; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { 1 } } @@ -232,7 +223,7 @@ pub struct OpEvalLua { impl OpResult for OpEvalLua { type Result = StdResult<(), LuaError>; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { crate::tarantool::exec(&self.code) } } @@ -245,7 +236,37 @@ impl From<OpEvalLua> for Op { pub trait OpResult { type Result: 'static; - fn result(&self) -> Self::Result; + // FIXME: this signature makes it look like result of any operation depends + // only on what is contained within the operation which is almost never true + // And it makes it hard to do anything useful inside this function. + fn result(self) -> Self::Result; +} + +//////////////////////////////////////////////////////////////////////////////// +// OpPersistPeer +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)] +pub struct OpPersistPeer(pub Box<Peer>); + +impl OpPersistPeer { + pub fn new(peer: Peer) -> Self { + Self(Box::new(peer)) + } +} + +impl OpResult for OpPersistPeer { + type Result = Box<Peer>; + fn result(self) -> Self::Result { + self.0 + } +} + +impl From<OpPersistPeer> for Op { + #[inline] + fn from(op: OpPersistPeer) -> Op { + Op::PersistPeer(op) + } } ////////////////////////////////////////////////////////////////////////////////////////// @@ -280,12 +301,12 @@ pub enum OpDML { impl OpResult for OpDML { type Result = tarantool::Result<Option<Tuple>>; - fn result(&self) -> Self::Result { + fn result(self) -> Self::Result { match self { - Self::Insert { space, tuple } => space.insert(tuple).map(Some), - Self::Replace { space, tuple } => space.replace(tuple).map(Some), - Self::Update { space, key, ops } => space.update(key, ops), - Self::Delete { space, key } => space.delete(key), + Self::Insert { space, tuple } => space.insert(&tuple).map(Some), + Self::Replace { space, tuple } => space.replace(&tuple).map(Some), + Self::Update { space, key, ops } => space.update(&key, &ops), + Self::Delete { space, key } => space.delete(&key), } } } @@ -619,18 +640,19 @@ impl ContextCoercion for EntryContextConfChange {} impl Entry { /// Returns the logical clock value if it's an `EntryNormal`. - pub fn lc(&self) -> Option<&LogicalClock> { + pub fn lc(&self) -> Option<LogicalClock> { match &self.context { - Some(EntryContext::Normal(v)) => Some(&v.lc), + Some(EntryContext::Normal(v)) => Some(v.lc), Some(EntryContext::ConfChange(_)) => None, None => None, } } - /// Returns the contained `Op` if it's an `EntryNormal`. - fn op(&self) -> Option<&Op> { - match &self.context { - Some(EntryContext::Normal(v)) => Some(&v.op), + /// Returns the contained `Op` if it's an `EntryNormal` + /// consuming `self` by value. + fn into_op(self) -> Option<Op> { + match self.context { + Some(EntryContext::Normal(v)) => Some(v.op), Some(EntryContext::ConfChange(_)) => None, None => None, } diff --git a/src/traft/node.rs b/src/traft/node.rs index d91f9fa044..c89149d4c6 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -37,11 +37,11 @@ use crate::storage::{Clusterwide, ClusterwideSpace, StateKey}; use crate::stringify_cfunc; use crate::traft::rpc; use crate::traft::ContextCoercion as _; -use crate::traft::OpDML; use crate::traft::Peer; use crate::traft::RaftId; use crate::traft::RaftIndex; use crate::traft::RaftTerm; +use crate::traft::{OpDML, OpPersistPeer}; use crate::unwrap_some_or; use crate::warn_or_panic; use ::tarantool::util::IntoClones as _; @@ -278,7 +278,7 @@ impl Node { // FIXME: this error should be handled let _ = notify.recv_any().await; } - notify.recv().await + notify.recv().await.map(Box::new) }) } @@ -438,7 +438,7 @@ impl NodeImpl { T: Into<traft::Op>, { let (lc, notify) = self.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, op); + let ctx = traft::EntryContextNormal::new(lc, op.into()); self.raw_node.propose(ctx.to_bytes(), vec![])?; Ok(notify) } @@ -513,7 +513,7 @@ impl NodeImpl { } }; let (lc, notify) = self.schedule_notification(); - let ctx = traft::EntryContextNormal::new(lc, Op::persist_peer(peer)); + let ctx = traft::EntryContextNormal::new(lc, OpPersistPeer::new(peer)); // Important! Calling `raw_node.propose()` may result in // `ProposalDropped` error, but the topology has already been @@ -635,18 +635,11 @@ impl NodeImpl { expelled: &mut bool, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); - let result = entry - .op() - .unwrap_or(&traft::Op::Nop) - .on_commit(&self.storage.peers); + let lc = entry.lc(); + let index = entry.index; + let op = entry.into_op().unwrap_or(traft::Op::Nop); - if let Some(lc) = entry.lc() { - if let Some(notify) = self.notifications.remove(lc) { - notify.notify_ok_any(result); - } - } - - if let Some(traft::Op::PersistPeer { peer, .. }) = entry.op() { + if let traft::Op::PersistPeer(OpPersistPeer(peer)) = &op { *topology_changed = true; if peer.current_grade == CurrentGradeVariant::Expelled && peer.raft_id == self.raft_id() { @@ -655,7 +648,16 @@ impl NodeImpl { } } - if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) { + // apply the operation + let result = op.on_commit(&self.storage.peers); + + if let Some(lc) = &lc { + if let Some(notify) = self.notifications.remove(lc) { + notify.notify_ok_any(result); + } + } + + if let Some(notify) = self.joint_state_latch.take_or_keep(&index) { // It was expected to be a ConfChange entry, but it's // normal. Raft must have overriden it, or there was // a re-election. -- GitLab