Skip to content
Snippets Groups Projects
Commit 3cf669c0 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor(op): add OpPersistPeer and impl OpResult for it

parent 96988707
No related branches found
No related tags found
No related merge requests found
...@@ -307,7 +307,7 @@ fn picolib_setup(args: &args::Run) { ...@@ -307,7 +307,7 @@ fn picolib_setup(args: &args::Run) {
entry.term.to_string(), entry.term.to_string(),
entry entry
.lc() .lc()
.map(ToString::to_string) .map(|lc| lc.to_string())
.unwrap_or_else(String::new), .unwrap_or_else(String::new),
entry.payload().to_string(), entry.payload().to_string(),
]; ];
...@@ -829,7 +829,7 @@ fn start_boot(args: &args::Run) { ...@@ -829,7 +829,7 @@ fn start_boot(args: &args::Run) {
.expect("cannot fail") .expect("cannot fail")
.into(), .into(),
); );
init_entries_push_op(traft::Op::persist_peer(peer)); init_entries_push_op(traft::OpPersistPeer::new(peer).into());
init_entries_push_op( init_entries_push_op(
OpDML::insert( OpDML::insert(
ClusterwideSpace::State, ClusterwideSpace::State,
......
...@@ -102,16 +102,13 @@ pub enum Op { ...@@ -102,16 +102,13 @@ pub enum Op {
/// No operation. /// No operation.
Nop, Nop,
/// Print the message in tarantool log. /// Print the message in tarantool log.
Info { Info { msg: String },
msg: String,
},
/// Evaluate the code on every instance in cluster. /// Evaluate the code on every instance in cluster.
EvalLua(OpEvalLua), EvalLua(OpEvalLua),
/// ///
ReturnOne(OpReturnOne), ReturnOne(OpReturnOne),
PersistPeer { /// Update the given peer's entry in [`storage::Peers`].
peer: Box<Peer>, PersistPeer(OpPersistPeer),
},
/// Cluster-wide data modification operation. /// Cluster-wide data modification operation.
/// Should be used to manipulate the cluster-wide configuration. /// Should be used to manipulate the cluster-wide configuration.
Dml(OpDML), Dml(OpDML),
...@@ -124,7 +121,7 @@ impl std::fmt::Display for Op { ...@@ -124,7 +121,7 @@ impl std::fmt::Display for Op {
Self::Info { msg } => write!(f, "Info({msg:?})"), Self::Info { msg } => write!(f, "Info({msg:?})"),
Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"), Self::EvalLua(OpEvalLua { code }) => write!(f, "EvalLua({code:?})"),
Self::ReturnOne(_) => write!(f, "ReturnOne"), Self::ReturnOne(_) => write!(f, "ReturnOne"),
Self::PersistPeer { peer } => { Self::PersistPeer(OpPersistPeer(peer)) => {
write!(f, "PersistPeer{}", peer) write!(f, "PersistPeer{}", peer)
} }
Self::Dml(OpDML::Insert { space, tuple }) => { Self::Dml(OpDML::Insert { space, tuple }) => {
...@@ -174,7 +171,7 @@ impl std::fmt::Display for Op { ...@@ -174,7 +171,7 @@ impl std::fmt::Display for Op {
} }
impl Op { impl Op {
pub fn on_commit(&self, peers: &storage::Peers) -> Box<dyn AnyWithTypeName> { pub fn on_commit(self, peers: &storage::Peers) -> Box<dyn AnyWithTypeName> {
match self { match self {
Self::Nop => Box::new(()), Self::Nop => Box::new(()),
Self::Info { msg } => { Self::Info { msg } => {
...@@ -183,30 +180,24 @@ impl Op { ...@@ -183,30 +180,24 @@ impl Op {
} }
Self::EvalLua(op) => Box::new(op.result()), Self::EvalLua(op) => Box::new(op.result()),
Self::ReturnOne(op) => Box::new(op.result()), Self::ReturnOne(op) => Box::new(op.result()),
Self::PersistPeer { peer } => { Self::PersistPeer(op) => {
peers.put(peer).unwrap(); let peer = op.result();
Box::new(peer.clone()) peers.put(&peer).unwrap();
peer
} }
Self::Dml(op) => { Self::Dml(op) => {
let res = Box::new(op.result());
if op.space() == &ClusterwideSpace::State { if op.space() == &ClusterwideSpace::State {
event::broadcast(Event::ClusterStateChanged); event::broadcast(Event::ClusterStateChanged);
} }
res Box::new(op.result())
} }
} }
} }
pub fn persist_peer(peer: Peer) -> Self {
Self::PersistPeer {
peer: Box::new(peer),
}
}
} }
impl OpResult for Op { impl OpResult for Op {
type Result = (); type Result = ();
fn result(&self) -> Self::Result {} fn result(self) -> Self::Result {}
} }
impl From<OpReturnOne> for Op { impl From<OpReturnOne> for Op {
...@@ -220,7 +211,7 @@ pub struct OpReturnOne; ...@@ -220,7 +211,7 @@ pub struct OpReturnOne;
impl OpResult for OpReturnOne { impl OpResult for OpReturnOne {
type Result = u8; type Result = u8;
fn result(&self) -> Self::Result { fn result(self) -> Self::Result {
1 1
} }
} }
...@@ -232,7 +223,7 @@ pub struct OpEvalLua { ...@@ -232,7 +223,7 @@ pub struct OpEvalLua {
impl OpResult for OpEvalLua { impl OpResult for OpEvalLua {
type Result = StdResult<(), LuaError>; type Result = StdResult<(), LuaError>;
fn result(&self) -> Self::Result { fn result(self) -> Self::Result {
crate::tarantool::exec(&self.code) crate::tarantool::exec(&self.code)
} }
} }
...@@ -245,7 +236,37 @@ impl From<OpEvalLua> for Op { ...@@ -245,7 +236,37 @@ impl From<OpEvalLua> for Op {
pub trait OpResult { pub trait OpResult {
type Result: 'static; type Result: 'static;
fn result(&self) -> Self::Result; // FIXME: this signature makes it look like result of any operation depends
// only on what is contained within the operation which is almost never true
// And it makes it hard to do anything useful inside this function.
fn result(self) -> Self::Result;
}
////////////////////////////////////////////////////////////////////////////////
// OpPersistPeer
////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct OpPersistPeer(pub Box<Peer>);
impl OpPersistPeer {
pub fn new(peer: Peer) -> Self {
Self(Box::new(peer))
}
}
impl OpResult for OpPersistPeer {
type Result = Box<Peer>;
fn result(self) -> Self::Result {
self.0
}
}
impl From<OpPersistPeer> for Op {
#[inline]
fn from(op: OpPersistPeer) -> Op {
Op::PersistPeer(op)
}
} }
////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////
...@@ -280,12 +301,12 @@ pub enum OpDML { ...@@ -280,12 +301,12 @@ pub enum OpDML {
impl OpResult for OpDML { impl OpResult for OpDML {
type Result = tarantool::Result<Option<Tuple>>; type Result = tarantool::Result<Option<Tuple>>;
fn result(&self) -> Self::Result { fn result(self) -> Self::Result {
match self { match self {
Self::Insert { space, tuple } => space.insert(tuple).map(Some), Self::Insert { space, tuple } => space.insert(&tuple).map(Some),
Self::Replace { space, tuple } => space.replace(tuple).map(Some), Self::Replace { space, tuple } => space.replace(&tuple).map(Some),
Self::Update { space, key, ops } => space.update(key, ops), Self::Update { space, key, ops } => space.update(&key, &ops),
Self::Delete { space, key } => space.delete(key), Self::Delete { space, key } => space.delete(&key),
} }
} }
} }
...@@ -619,18 +640,19 @@ impl ContextCoercion for EntryContextConfChange {} ...@@ -619,18 +640,19 @@ impl ContextCoercion for EntryContextConfChange {}
impl Entry { impl Entry {
/// Returns the logical clock value if it's an `EntryNormal`. /// Returns the logical clock value if it's an `EntryNormal`.
pub fn lc(&self) -> Option<&LogicalClock> { pub fn lc(&self) -> Option<LogicalClock> {
match &self.context { match &self.context {
Some(EntryContext::Normal(v)) => Some(&v.lc), Some(EntryContext::Normal(v)) => Some(v.lc),
Some(EntryContext::ConfChange(_)) => None, Some(EntryContext::ConfChange(_)) => None,
None => None, None => None,
} }
} }
/// Returns the contained `Op` if it's an `EntryNormal`. /// Returns the contained `Op` if it's an `EntryNormal`
fn op(&self) -> Option<&Op> { /// consuming `self` by value.
match &self.context { fn into_op(self) -> Option<Op> {
Some(EntryContext::Normal(v)) => Some(&v.op), match self.context {
Some(EntryContext::Normal(v)) => Some(v.op),
Some(EntryContext::ConfChange(_)) => None, Some(EntryContext::ConfChange(_)) => None,
None => None, None => None,
} }
......
...@@ -37,11 +37,11 @@ use crate::storage::{Clusterwide, ClusterwideSpace, StateKey}; ...@@ -37,11 +37,11 @@ use crate::storage::{Clusterwide, ClusterwideSpace, StateKey};
use crate::stringify_cfunc; use crate::stringify_cfunc;
use crate::traft::rpc; use crate::traft::rpc;
use crate::traft::ContextCoercion as _; use crate::traft::ContextCoercion as _;
use crate::traft::OpDML;
use crate::traft::Peer; use crate::traft::Peer;
use crate::traft::RaftId; use crate::traft::RaftId;
use crate::traft::RaftIndex; use crate::traft::RaftIndex;
use crate::traft::RaftTerm; use crate::traft::RaftTerm;
use crate::traft::{OpDML, OpPersistPeer};
use crate::unwrap_some_or; use crate::unwrap_some_or;
use crate::warn_or_panic; use crate::warn_or_panic;
use ::tarantool::util::IntoClones as _; use ::tarantool::util::IntoClones as _;
...@@ -278,7 +278,7 @@ impl Node { ...@@ -278,7 +278,7 @@ impl Node {
// FIXME: this error should be handled // FIXME: this error should be handled
let _ = notify.recv_any().await; let _ = notify.recv_any().await;
} }
notify.recv().await notify.recv().await.map(Box::new)
}) })
} }
...@@ -438,7 +438,7 @@ impl NodeImpl { ...@@ -438,7 +438,7 @@ impl NodeImpl {
T: Into<traft::Op>, T: Into<traft::Op>,
{ {
let (lc, notify) = self.schedule_notification(); let (lc, notify) = self.schedule_notification();
let ctx = traft::EntryContextNormal::new(lc, op); let ctx = traft::EntryContextNormal::new(lc, op.into());
self.raw_node.propose(ctx.to_bytes(), vec![])?; self.raw_node.propose(ctx.to_bytes(), vec![])?;
Ok(notify) Ok(notify)
} }
...@@ -513,7 +513,7 @@ impl NodeImpl { ...@@ -513,7 +513,7 @@ impl NodeImpl {
} }
}; };
let (lc, notify) = self.schedule_notification(); let (lc, notify) = self.schedule_notification();
let ctx = traft::EntryContextNormal::new(lc, Op::persist_peer(peer)); let ctx = traft::EntryContextNormal::new(lc, OpPersistPeer::new(peer));
// Important! Calling `raw_node.propose()` may result in // Important! Calling `raw_node.propose()` may result in
// `ProposalDropped` error, but the topology has already been // `ProposalDropped` error, but the topology has already been
...@@ -635,18 +635,11 @@ impl NodeImpl { ...@@ -635,18 +635,11 @@ impl NodeImpl {
expelled: &mut bool, expelled: &mut bool,
) { ) {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let result = entry let lc = entry.lc();
.op() let index = entry.index;
.unwrap_or(&traft::Op::Nop) let op = entry.into_op().unwrap_or(traft::Op::Nop);
.on_commit(&self.storage.peers);
if let Some(lc) = entry.lc() { if let traft::Op::PersistPeer(OpPersistPeer(peer)) = &op {
if let Some(notify) = self.notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(traft::Op::PersistPeer { peer, .. }) = entry.op() {
*topology_changed = true; *topology_changed = true;
if peer.current_grade == CurrentGradeVariant::Expelled && peer.raft_id == self.raft_id() if peer.current_grade == CurrentGradeVariant::Expelled && peer.raft_id == self.raft_id()
{ {
...@@ -655,7 +648,16 @@ impl NodeImpl { ...@@ -655,7 +648,16 @@ impl NodeImpl {
} }
} }
if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) { // apply the operation
let result = op.on_commit(&self.storage.peers);
if let Some(lc) = &lc {
if let Some(notify) = self.notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(notify) = self.joint_state_latch.take_or_keep(&index) {
// It was expected to be a ConfChange entry, but it's // It was expected to be a ConfChange entry, but it's
// normal. Raft must have overriden it, or there was // normal. Raft must have overriden it, or there was
// a re-election. // a re-election.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment