Commit 13359a7f authored by Georgy Moshkin

refactor: minimize the number of places where raw_node.lock() is called

parent afaab1d4
Pipeline #9132 passed
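
The change funnels every lock() / drop() / event::broadcast(Event::RaftLoopNeeded) sequence through a single raw_operation helper: callers do their locked work in a closure and, when they expect a reply, return the wait handle out of the closure. Below is a minimal self-contained sketch of that pattern; std::sync::Mutex and the trivial RawNode, Error, and broadcast_raft_loop_needed stand-ins replace the project's fiber-based mutex, raft-rs RawNode, and event machinery:

use std::sync::Mutex;

// Stand-ins for the real types: the project wraps raft-rs's RawNode
// in a fiber-based mutex and wakes the raft main loop through
// event::broadcast(Event::RaftLoopNeeded).
struct RawNode {
    ticks: u32,
}
struct Error;

struct Node {
    raw_node: Mutex<RawNode>,
}

impl Node {
    // The helper this commit introduces: the single place where the
    // lock is taken. Run `f` under the lock, release the lock, then
    // wake the main loop exactly once.
    fn raw_operation<R>(
        &self,
        f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
    ) -> Result<R, Error> {
        let mut raw_node = self.raw_node.lock().unwrap();
        let res = f(&mut *raw_node);
        drop(raw_node);
        broadcast_raft_loop_needed();
        res
    }

    // A fire-and-forget caller in the style of the new `tick`.
    fn tick(&self, n_times: u32) {
        self.raw_operation(|raw_node| {
            raw_node.ticks += n_times;
            Ok(())
        })
        .ok();
    }
}

// Placeholder for event::broadcast(Event::RaftLoopNeeded).
fn broadcast_raft_loop_needed() {}

fn main() {
    let node = Node {
        raw_node: Mutex::new(RawNode { ticks: 0 }),
    };
    node.tick(3);
    assert_eq!(node.raw_node.lock().unwrap().ticks, 3);
}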
@@ -154,31 +154,29 @@ impl Node {
     }
 
     pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
-        let mut raw_node = self.raw_node.lock();
-        // In some states `raft-rs` ignores the ReadIndex request.
-        // Check it preliminary, don't wait for the timeout.
-        //
-        // See for details:
-        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
-        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
-        let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
-        let term_just_started =
-            raw_node.raft.state == RaftStateRole::Leader && !raw_node.raft.commit_to_current_term();
-        if leader_doesnt_exist || term_just_started {
-            return Err(RaftError::ProposalDropped.into());
-        }
-        let (lc, notify) = self.add_notify();
-        // read_index puts this context into an Entry,
-        // so we've got to compose full EntryContext,
-        // despite single LogicalClock would be enough
-        let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
-        raw_node.read_index(ctx.to_bytes());
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv_timeout::<u64>(timeout)
+        self.raw_operation(|raw_node| {
+            // In some states `raft-rs` ignores the ReadIndex request.
+            // Check it preliminary, don't wait for the timeout.
+            //
+            // See for details:
+            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
+            // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
+            let leader_doesnt_exist = raw_node.raft.leader_id == INVALID_ID;
+            let term_just_started = raw_node.raft.state == RaftStateRole::Leader
+                && !raw_node.raft.commit_to_current_term();
+            if leader_doesnt_exist || term_just_started {
+                return Err(RaftError::ProposalDropped.into());
+            }
+            let (lc, notify) = self.add_notify();
+            // read_index puts this context into an Entry,
+            // so we've got to compose full EntryContext,
+            // despite single LogicalClock would be enough
+            let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
+            raw_node.read_index(ctx.to_bytes());
+            Ok(notify)
+        })?
+        .recv_timeout::<u64>(timeout)
     }
 
     pub fn propose<T: OpResult + Into<traft::Op>>(
@@ -186,20 +184,17 @@ impl Node {
         op: T,
         timeout: Duration,
     ) -> Result<T::Result, Error> {
-        let mut raw_node = self.raw_node.lock();
-        let (lc, notify) = self.add_notify();
-        let ctx = traft::EntryContextNormal::new(lc, op);
-        raw_node.propose(ctx.to_bytes(), vec![])?;
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv_timeout::<T::Result>(timeout)
+        self.raw_operation(|raw_node| {
+            let (lc, notify) = self.add_notify();
+            let ctx = traft::EntryContextNormal::new(lc, op);
+            raw_node.propose(ctx.to_bytes(), vec![])?;
+            Ok(notify)
+        })?
+        .recv_timeout::<T::Result>(timeout)
     }
 
     pub fn campaign(&self) -> Result<(), Error> {
-        let mut raw_node = self.raw_node.lock();
-        raw_node.campaign()?;
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| raw_node.campaign().map_err(Into::into))?;
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
         fiber::reschedule();
@@ -207,27 +202,30 @@ impl Node {
     }
 
     pub fn step(&self, msg: raft::Message) {
-        let mut raw_node = self.raw_node.lock();
-        if msg.to != raw_node.raft.id {
-            return;
-        }
-        if let Err(e) = raw_node.step(msg) {
-            tlog!(Error, "{e}");
-        }
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| {
+            if msg.to != raw_node.raft.id {
+                tlog!(Warning, "ignoring message sent to {}", msg.to);
+                return Ok(());
+            }
+            if let Err(e) = raw_node.step(msg) {
+                tlog!(Error, "{e}");
+            }
+            Ok(())
+        })
+        .ok();
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
        fiber::reschedule();
     }
 
     pub fn tick(&self, n_times: u32) {
-        let mut raw_node = self.raw_node.lock();
-        for _ in 0..n_times {
-            raw_node.tick();
-        }
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
+        self.raw_operation(|raw_node| {
+            for _ in 0..n_times {
+                raw_node.tick();
+            }
+            Ok(())
+        })
+        .ok();
         // even though we don't expect a response, we still should let the
         // main_loop do an iteration
         fiber::reschedule();
@@ -243,118 +241,127 @@ impl Node {
     }
 
     pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> {
-        let mut raw_node = self.raw_node.lock();
-        let status = raw_node.status();
-        if status.ss.raft_state != RaftStateRole::Leader {
-            return Err(RaftError::ConfChangeError("not a leader".into()).into());
-        }
-        let mut topology = self
-            .topology_cache
-            .pop(&raw_node.raft.term)
-            .unwrap_or_else(|| {
-                let peers = Storage::peers().unwrap();
-                let replication_factor = Storage::replication_factor().unwrap().unwrap();
-                Topology::from_peers(peers).with_replication_factor(replication_factor)
-            });
-        let peer_result = match req {
-            TopologyRequest::Join(JoinRequest {
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domains,
-                ..
-            }) => topology.join(
-                instance_id,
-                replicaset_id,
-                advertise_address,
-                failure_domains,
-            ),
-            TopologyRequest::UpdatePeer(UpdatePeerRequest {
-                instance_id,
-                health,
-                failure_domains,
-                ..
-            }) => topology.update_peer(&instance_id, health, failure_domains),
-        };
-        let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
-            self.topology_cache.put(raw_node.raft.term, topology);
-            return Err(RaftError::ConfChangeError(e).into());
-        });
-        peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
-        let (lc, notify) = self.add_notify();
-        let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
-        raw_node.propose(ctx.to_bytes(), vec![])?;
-        self.topology_cache.put(raw_node.raft.term, topology);
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        notify.recv::<Peer>()
+        self.raw_operation(|raw_node| {
+            if raw_node.raft.state != RaftStateRole::Leader {
+                return Err(RaftError::ConfChangeError("not a leader".into()).into());
+            }
+            let mut topology = self
+                .topology_cache
+                .pop(&raw_node.raft.term)
+                .unwrap_or_else(|| {
+                    let peers = Storage::peers().unwrap();
+                    let replication_factor = Storage::replication_factor().unwrap().unwrap();
+                    Topology::from_peers(peers).with_replication_factor(replication_factor)
+                });
+            let peer_result = match req {
+                TopologyRequest::Join(JoinRequest {
+                    instance_id,
+                    replicaset_id,
+                    advertise_address,
+                    failure_domains,
+                    ..
+                }) => topology.join(
+                    instance_id,
+                    replicaset_id,
+                    advertise_address,
+                    failure_domains,
+                ),
+                TopologyRequest::UpdatePeer(UpdatePeerRequest {
+                    instance_id,
+                    health,
+                    failure_domains,
+                    ..
+                }) => topology.update_peer(&instance_id, health, failure_domains),
+            };
+            let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
+                self.topology_cache.put(raw_node.raft.term, topology);
+                return Err(RaftError::ConfChangeError(e).into());
+            });
+            peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
+            let (lc, notify) = self.add_notify();
+            let ctx = traft::EntryContextNormal::new(lc, Op::PersistPeer { peer });
+            raw_node.propose(ctx.to_bytes(), vec![])?;
+            self.topology_cache.put(raw_node.raft.term, topology);
+            Ok(notify)
+        })?
+        .recv::<Peer>()
     }
 
     fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> {
-        let mut raw_node = self.raw_node.lock();
-        // In some states proposing a ConfChange is impossible.
-        // Check if there's a reason to reject it.
-        #[allow(clippy::never_loop)]
-        let reason: Option<&str> = loop {
-            // Checking leadership is only needed for the
-            // correct latch management. It doesn't affect
-            // raft correctness. Checking the instance is a
-            // leader makes sure the proposed `ConfChange`
-            // is appended to the raft log immediately
-            // instead of sending `MsgPropose` over the
-            // network.
-            if raw_node.raft.state != RaftStateRole::Leader {
-                break Some("not a leader");
-            }
-            if term != raw_node.raft.term {
-                break Some("raft term mismatch");
-            }
-            // Without this check the node would silently ignore the conf change.
-            // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
-            if raw_node.raft.has_pending_conf() {
-                break Some("already has pending confchange");
-            }
-            break None;
-        };
-        if let Some(e) = reason {
-            return Err(RaftError::ConfChangeError(e.into()).into());
-        }
-        let prev_index = raw_node.raft.raft_log.last_index();
-        raw_node.propose_conf_change(vec![], conf_change)?;
-        // oops, current instance isn't actually a leader
-        // (which is impossible in theory, but we're not
-        // sure in practice) and sent the ConfChange message
-        // to the raft network instead of appending it to the
-        // raft log.
-        let last_index = raw_node.raft.raft_log.last_index();
-        assert_eq!(last_index, prev_index + 1);
-        let (rx, tx) = Notify::new().into_clones();
-        with_joint_state_latch(|joint_state_latch| {
-            assert!(joint_state_latch.take().is_none());
-            joint_state_latch.set(Some(JointStateLatch {
-                index: last_index,
-                notify: tx,
-            }));
-        });
-        drop(raw_node);
-        event::broadcast(Event::RaftLoopNeeded);
-        rx.recv()
+        self.raw_operation(|raw_node| {
+            // In some states proposing a ConfChange is impossible.
+            // Check if there's a reason to reject it.
+            #[allow(clippy::never_loop)]
+            let reason: Option<&str> = loop {
+                // Checking leadership is only needed for the
+                // correct latch management. It doesn't affect
+                // raft correctness. Checking the instance is a
+                // leader makes sure the proposed `ConfChange`
+                // is appended to the raft log immediately
+                // instead of sending `MsgPropose` over the
+                // network.
+                if raw_node.raft.state != RaftStateRole::Leader {
+                    break Some("not a leader");
+                }
+                if term != raw_node.raft.term {
+                    break Some("raft term mismatch");
+                }
+                // Without this check the node would silently ignore the conf change.
+                // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
+                if raw_node.raft.has_pending_conf() {
+                    break Some("already has pending confchange");
+                }
+                break None;
+            };
+            if let Some(e) = reason {
+                return Err(RaftError::ConfChangeError(e.into()).into());
+            }
+            let prev_index = raw_node.raft.raft_log.last_index();
+            raw_node.propose_conf_change(vec![], conf_change)?;
+            // oops, current instance isn't actually a leader
+            // (which is impossible in theory, but we're not
+            // sure in practice) and sent the ConfChange message
+            // to the raft network instead of appending it to the
+            // raft log.
+            let last_index = raw_node.raft.raft_log.last_index();
+            assert_eq!(last_index, prev_index + 1);
+            let (rx, tx) = Notify::new().into_clones();
+            with_joint_state_latch(|joint_state_latch| {
+                assert!(joint_state_latch.take().is_none());
+                joint_state_latch.set(Some(JointStateLatch {
+                    index: last_index,
+                    notify: tx,
+                }));
+            });
+            Ok(rx)
+        })?
+        .recv()
     }
 
+    #[inline]
+    fn raw_operation<R>(
+        &self,
+        f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
+    ) -> Result<R, Error> {
+        let mut raw_node = self.raw_node.lock();
+        let res = f(&mut *raw_node);
+        drop(raw_node);
+        event::broadcast(Event::RaftLoopNeeded);
+        res
+    }
+
     #[inline]
......
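
One detail of the diff above that is easy to miss: every caller that waits for a response (read_index, propose, handle_topology_request, propose_conf_change) returns its Notify from inside the closure, so the blocking recv happens only after raw_operation has dropped the lock and signalled the main loop. A simplified sketch of that caller shape, with an std::sync::mpsc channel standing in for the project's Notify type:

use std::sync::{mpsc, Mutex};
use std::time::Duration;

struct RawNode;
#[derive(Debug)]
struct Error;

struct Node {
    raw_node: Mutex<RawNode>,
}

impl Node {
    fn raw_operation<R>(
        &self,
        f: impl FnOnce(&mut RawNode) -> Result<R, Error>,
    ) -> Result<R, Error> {
        let mut raw_node = self.raw_node.lock().unwrap();
        let res = f(&mut *raw_node);
        drop(raw_node); // the lock is released before anyone blocks
        // event::broadcast(Event::RaftLoopNeeded) would go here.
        res
    }

    // Shaped like the new `read_index`: register the wait handle under
    // the lock, return it from the closure, and block on it only after
    // `raw_operation` has released the lock and woken the main loop.
    fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
        self.raw_operation(|_raw_node| {
            let (tx, rx) = mpsc::channel::<u64>();
            // Real code: raw_node.read_index(ctx.to_bytes()); the raft
            // main loop later resolves the notify. Faked inline here.
            tx.send(42).ok();
            Ok(rx)
        })?
        .recv_timeout(timeout)
        .map_err(|_| Error)
    }
}

fn main() {
    let node = Node {
        raw_node: Mutex::new(RawNode),
    };
    assert_eq!(node.read_index(Duration::from_secs(1)).unwrap(), 42);
}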