Skip to content
Snippets Groups Projects
Commit 8aafc110 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

refactor: implement joint_state_latch using KVCell

parent bd16c9fe
No related branches found
No related tags found
1 merge request!252Implement joint_state_latch using KVCell
Pipeline #12223 canceled
...@@ -16,7 +16,7 @@ use ::tarantool::fiber::{Cond, Mutex}; ...@@ -16,7 +16,7 @@ use ::tarantool::fiber::{Cond, Mutex};
use ::tarantool::proc; use ::tarantool::proc;
use ::tarantool::tlua; use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction; use ::tarantool::transaction::start_transaction;
use std::cell::{Cell, RefCell}; use std::cell::RefCell;
use std::collections::HashMap; use std::collections::HashMap;
use std::collections::HashSet; use std::collections::HashSet;
use std::convert::TryFrom; use std::convert::TryFrom;
...@@ -33,6 +33,7 @@ use crate::traft::RaftId; ...@@ -33,6 +33,7 @@ use crate::traft::RaftId;
use crate::traft::RaftIndex; use crate::traft::RaftIndex;
use crate::traft::RaftTerm; use crate::traft::RaftTerm;
use crate::unwrap_some_or; use crate::unwrap_some_or;
use crate::warn_or_panic;
use ::tarantool::util::IntoClones as _; use ::tarantool::util::IntoClones as _;
use protobuf::Message as _; use protobuf::Message as _;
use std::iter::FromIterator as _; use std::iter::FromIterator as _;
...@@ -279,6 +280,7 @@ struct NodeImpl { ...@@ -279,6 +280,7 @@ struct NodeImpl {
pub raw_node: RawNode, pub raw_node: RawNode,
pub notifications: HashMap<LogicalClock, Notify>, pub notifications: HashMap<LogicalClock, Notify>,
topology_cache: KVCell<RaftTerm, Topology>, topology_cache: KVCell<RaftTerm, Topology>,
joint_state_latch: KVCell<RaftIndex, Notify>,
storage: RaftSpaceAccess, storage: RaftSpaceAccess,
pool: ConnectionPool, pool: ConnectionPool,
lc: LogicalClock, lc: LogicalClock,
...@@ -320,6 +322,7 @@ impl NodeImpl { ...@@ -320,6 +322,7 @@ impl NodeImpl {
raw_node, raw_node,
notifications: Default::default(), notifications: Default::default(),
topology_cache: KVCell::new(), topology_cache: KVCell::new(),
joint_state_latch: KVCell::new(),
storage, storage,
pool, pool,
lc, lc,
...@@ -516,15 +519,14 @@ impl NodeImpl { ...@@ -516,15 +519,14 @@ impl NodeImpl {
let last_index = self.raw_node.raft.raft_log.last_index(); let last_index = self.raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1); assert_eq!(last_index, prev_index + 1);
if !self.joint_state_latch.is_empty() {
warn_or_panic!("joint state latch is locked");
}
let (rx, tx) = Notify::new().into_clones(); let (rx, tx) = Notify::new().into_clones();
with_joint_state_latch(|joint_state_latch| { self.joint_state_latch.insert(last_index, tx);
assert!(joint_state_latch.take().is_none()); event::broadcast(Event::JointStateEnter);
event::broadcast(Event::JointStateEnter);
joint_state_latch.set(Some(JointStateLatch {
index: last_index,
notify: tx,
}));
});
Ok(rx) Ok(rx)
} }
...@@ -591,33 +593,24 @@ impl NodeImpl { ...@@ -591,33 +593,24 @@ impl NodeImpl {
} }
} }
with_joint_state_latch(|joint_state_latch| { if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) {
if let Some(latch) = joint_state_latch.take() { // It was expected to be a ConfChange entry, but it's
if entry.index != latch.index { // normal. Raft must have overriden it, or there was
joint_state_latch.set(Some(latch)); // a re-election.
return; let e = RaftError::ConfChangeError("rolled back".into());
}
// It was expected to be a ConfChange entry, but it's
// normal. Raft must have overriden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
latch.notify.notify_err(e); notify.notify_err(e);
event::broadcast(Event::JointStateDrop); event::broadcast(Event::JointStateDrop);
} }
});
} }
/// Is called during a transaction /// Is called during a transaction
fn handle_committed_conf_change(&mut self, entry: traft::Entry) { fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
let latch_unlock = || { let mut latch_unlock = || {
with_joint_state_latch(|joint_state_latch| { if let Some(notify) = self.joint_state_latch.take() {
if let Some(latch) = joint_state_latch.take() { notify.notify_ok(());
latch.notify.notify_ok(()); event::broadcast(Event::JointStateLeave);
event::broadcast(Event::JointStateLeave); }
}
});
}; };
// Beware: a tiny difference in type names (`V2` or not `V2`) // Beware: a tiny difference in type names (`V2` or not `V2`)
...@@ -802,28 +795,6 @@ impl NodeImpl { ...@@ -802,28 +795,6 @@ impl NodeImpl {
} }
} }
#[derive(Debug)]
struct JointStateLatch {
/// Index of the latest ConfChange entry proposed.
/// Helps detecting when the entry is overridden
/// due to a re-election.
index: RaftIndex,
/// Make a notification when the latch is unlocked.
/// Notification is a `Result<Box<()>>`.
notify: Notify,
}
fn with_joint_state_latch<F, R>(f: F) -> R
where
F: FnOnce(&Cell<Option<JointStateLatch>>) -> R,
{
thread_local! {
static JOINT_STATE_LATCH: Cell<Option<JointStateLatch>> = Cell::new(None);
}
JOINT_STATE_LATCH.with(f)
}
fn raft_main_loop( fn raft_main_loop(
status: Rc<RefCell<Status>>, status: Rc<RefCell<Status>>,
node_impl: Rc<Mutex<NodeImpl>>, node_impl: Rc<Mutex<NodeImpl>>,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment