Skip to content
Snippets Groups Projects
Commit 4e91fc99 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov Committed by Yaroslav Dynnikov
Browse files

refactor: access RawNode through InnerNode

parent 39daea98
No related branches found
No related tags found
1 merge request!250Move the code here and there
......@@ -174,16 +174,16 @@ impl Node {
/// **This function yields**
pub fn wait_for_read_state(&self, timeout: Duration) -> Result<RaftIndex, Error> {
self.raw_operation(|raw_node| {
self.raw_operation(|inner_node| {
// In some states `raft-rs` ignores the ReadIndex request.
// Check it preliminary, don't wait for the timeout.
//
// See for details:
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
let term_just_started = raw_node.raft.state == RaftStateRole::Leader
&& !raw_node.raft.commit_to_current_term();
let leader_doesnt_exist = inner_node.raw_node.raft.leader_id == INVALID_ID;
let term_just_started = inner_node.raw_node.raft.state == RaftStateRole::Leader
&& !inner_node.raw_node.raft.commit_to_current_term();
if leader_doesnt_exist || term_just_started {
return Err(RaftError::ProposalDropped.into());
}
......@@ -193,7 +193,7 @@ impl Node {
// so we've got to compose full EntryContext,
// despite single LogicalClock would be enough
let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
raw_node.read_index(ctx.to_bytes());
inner_node.raw_node.read_index(ctx.to_bytes());
Ok(notify)
})?
.recv_timeout::<RaftIndex>(timeout)
......@@ -206,10 +206,10 @@ impl Node {
op: T,
timeout: Duration,
) -> Result<T::Result, Error> {
self.raw_operation(|raw_node| {
self.raw_operation(|inner_node| {
let (lc, notify) = self.add_notify();
let ctx = traft::EntryContextNormal::new(lc, op);
raw_node.propose(ctx.to_bytes(), vec![])?;
inner_node.raw_node.propose(ctx.to_bytes(), vec![])?;
Ok(notify)
})?
.recv_timeout::<T::Result>(timeout)
......@@ -219,7 +219,7 @@ impl Node {
/// chance we become the leader.
/// **This function yields**
pub fn campaign_and_yield(&self) -> Result<(), Error> {
self.raw_operation(|raw_node| raw_node.campaign().map_err(Into::into))?;
self.raw_operation(|inner_node| inner_node.raw_node.campaign().map_err(Into::into))?;
// Even though we don't expect a response, we still should let the
// main_loop do an iteration. Without rescheduling, the Ready state
// wouldn't be processed, the Status wouldn't be updated, and some
......@@ -230,13 +230,13 @@ impl Node {
/// **This function yields**
pub fn step_and_yield(&self, msg: raft::Message) {
self.raw_operation(|raw_node| {
if msg.to != raw_node.raft.id {
self.raw_operation(|inner_node| {
if msg.to != inner_node.raw_node.raft.id {
return Ok(());
}
// TODO check it's not a MsgPropose with op::PersistPeer.
// TODO check it's not a MsgPropose with ConfChange.
if let Err(e) = raw_node.step(msg) {
if let Err(e) = inner_node.raw_node.step(msg) {
tlog!(Error, "{e}");
}
Ok(())
......@@ -249,9 +249,9 @@ impl Node {
/// **This function yields**
pub fn tick_and_yield(&self, n_times: u32) {
self.raw_operation(|raw_node| {
self.raw_operation(|inner_node| {
for _ in 0..n_times {
raw_node.tick();
inner_node.raw_node.tick();
}
Ok(())
})
......@@ -281,14 +281,14 @@ impl Node {
&self,
req: TopologyRequest,
) -> Result<traft::Peer, Error> {
self.raw_operation(|raw_node| {
if raw_node.raft.state != RaftStateRole::Leader {
self.raw_operation(|inner_node| {
if inner_node.raw_node.raft.state != RaftStateRole::Leader {
return Err(RaftError::ConfChangeError("not a leader".into()).into());
}
let mut topology = self
.topology_cache
.pop(&raw_node.raft.term)
.pop(&inner_node.raw_node.raft.term)
.unwrap_or_else(|| {
let peers = Storage::peers().unwrap();
let replication_factor = Storage::replication_factor().unwrap().unwrap();
......@@ -312,16 +312,17 @@ impl Node {
};
let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
self.topology_cache.put(raw_node.raft.term, topology);
self.topology_cache.put(inner_node.raw_node.raft.term, topology);
return Err(RaftError::ConfChangeError(e).into());
});
peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
peer.commit_index = inner_node.raw_node.raft.raft_log.last_index() + 1;
let (lc, notify) = self.add_notify();
let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
raw_node.propose(ctx.to_bytes(), vec![])?;
self.topology_cache.put(raw_node.raft.term, topology);
inner_node.raw_node.propose(ctx.to_bytes(), vec![])?;
self.topology_cache
.put(inner_node.raw_node.raft.term, topology);
Ok(notify)
})?
.recv::<Peer>()
......@@ -335,7 +336,7 @@ impl Node {
term: RaftTerm,
conf_change: raft::ConfChangeV2,
) -> Result<(), Error> {
self.raw_operation(|raw_node| {
self.raw_operation(|inner_node| {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
......@@ -348,17 +349,17 @@ impl Node {
// is appended to the raft log immediately
// instead of sending `MsgPropose` over the
// network.
if raw_node.raft.state != RaftStateRole::Leader {
if inner_node.raw_node.raft.state != RaftStateRole::Leader {
break Some("not a leader");
}
if term != raw_node.raft.term {
if term != inner_node.raw_node.raft.term {
break Some("raft term mismatch");
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if raw_node.raft.has_pending_conf() {
if inner_node.raw_node.raft.has_pending_conf() {
break Some("already has pending confchange");
}
......@@ -369,15 +370,17 @@ impl Node {
return Err(RaftError::ConfChangeError(e.into()).into());
}
let prev_index = raw_node.raft.raft_log.last_index();
raw_node.propose_conf_change(vec![], conf_change)?;
let prev_index = inner_node.raw_node.raft.raft_log.last_index();
inner_node
.raw_node
.propose_conf_change(vec![], conf_change)?;
// oops, current instance isn't actually a leader
// (which is impossible in theory, but we're not
// sure in practice) and sent the ConfChange message
// to the raft network instead of appending it to the
// raft log.
let last_index = raw_node.raft.raft_log.last_index();
let last_index = inner_node.raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
let (rx, tx) = Notify::new().into_clones();
......@@ -398,10 +401,10 @@ impl Node {
#[inline]
fn raw_operation<R>(
&self,
f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
f: impl FnOnce(&mut InnerNode) -> Result<R, Error>,
) -> Result<R, Error> {
let mut inner_node = self.inner_node.lock();
let res = f(&mut inner_node.raw_node);
let res = f(&mut *inner_node);
drop(inner_node);
self.raft_loop_cond.broadcast();
res
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment