Skip to content
Snippets Groups Projects
Commit afaab1d4 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: EntryContextNormal::new + Node::add_notify

parent f85c3c61
No related branches found
No related tags found
No related merge requests found
......@@ -256,6 +256,13 @@ pub struct EntryContextNormal {
pub op: Op,
}
impl EntryContextNormal {
#[inline]
pub fn new(lc: LogicalClock, op: impl Into<Op>) -> Self {
Self { lc, op: op.into() }
}
}
/// [`EntryContext`] of a conf change entry, either `EntryConfChange` or `EntryConfChangeV2`
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct EntryContextConfChange {
......
......@@ -41,6 +41,7 @@ use crate::traft::failover;
use crate::traft::notify::Notify;
use crate::traft::ConnectionPool;
use crate::traft::LogicalClock;
use crate::traft::Op;
use crate::traft::Storage;
use crate::traft::Topology;
use crate::traft::TopologyRequest;
......@@ -168,22 +169,16 @@ impl Node {
return Err(RaftError::ProposalDropped.into());
}
let (rx, tx) = Notify::new().into_clones();
let lc = self.next_lc();
self.notifications.borrow_mut().insert(lc, tx);
let (lc, notify) = self.add_notify();
// read_index puts this context into an Entry,
// so we've got to compose full EntryContext,
// despite single LogicalClock would be enough
let ctx = traft::EntryContextNormal {
lc,
op: traft::Op::Nop,
}
.to_bytes();
raw_node.read_index(ctx);
let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
raw_node.read_index(ctx.to_bytes());
drop(raw_node);
event::broadcast(Event::RaftLoopNeeded);
rx.recv_timeout::<u64>(timeout)
notify.recv_timeout::<u64>(timeout)
}
pub fn propose<T: OpResult + Into<traft::Op>>(
......@@ -192,14 +187,12 @@ impl Node {
timeout: Duration,
) -> Result<T::Result, Error> {
let mut raw_node = self.raw_node.lock();
let (rx, tx) = Notify::new().into_clones();
let lc = self.next_lc();
self.notifications.borrow_mut().insert(lc, tx);
let ctx = traft::EntryContextNormal { lc, op: op.into() }.to_bytes();
raw_node.propose(ctx, vec![])?;
let (lc, notify) = self.add_notify();
let ctx = traft::EntryContextNormal::new(lc, op);
raw_node.propose(ctx.to_bytes(), vec![])?;
drop(raw_node);
event::broadcast(Event::RaftLoopNeeded);
rx.recv_timeout::<T::Result>(timeout)
notify.recv_timeout::<T::Result>(timeout)
}
pub fn campaign(&self) -> Result<(), Error> {
......@@ -295,19 +288,13 @@ impl Node {
peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
let lc = self.next_lc();
let ctx = traft::EntryContextNormal {
op: traft::Op::PersistPeer { peer },
lc,
};
let (rx, tx) = Notify::new().into_clones();
self.notifications.borrow_mut().insert(lc, tx);
let (lc, notify) = self.add_notify();
let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
raw_node.propose(ctx.to_bytes(), vec![])?;
self.topology_cache.put(raw_node.raft.term, topology);
drop(raw_node);
event::broadcast(Event::RaftLoopNeeded);
rx.recv::<Peer>()
notify.recv::<Peer>()
}
fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> {
......@@ -370,12 +357,21 @@ impl Node {
rx.recv()
}
#[inline]
fn next_lc(&self) -> LogicalClock {
let mut lc = self.lc.take().expect("it's always Some");
let mut lc = self.lc.get().expect("it's always Some");
lc.inc();
self.lc.set(Some(lc));
lc
}
#[inline]
fn add_notify(&self) -> (LogicalClock, Notify) {
let (rx, tx) = Notify::new().into_clones();
let lc = self.next_lc();
self.notifications.borrow_mut().insert(lc, tx);
(lc, rx)
}
}
#[derive(Debug)]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment