Skip to content
Snippets Groups Projects
Commit b38bd228 authored by Georgy Moshkin
Browse files

refactor: use a watch channel instead of Event::EntryApplied

parent 1e69a241
No related branches found
No related tags found
1 merge request: !765 "fix: used to persist applied index before commit index"
...@@ -4,7 +4,6 @@ use ::tarantool::fiber; ...@@ -4,7 +4,6 @@ use ::tarantool::fiber;
use crate::has_grades; use crate::has_grades;
use crate::tlog; use crate::tlog;
use crate::traft::event;
use crate::traft::node; use crate::traft::node;
use crate::unwrap_ok_or; use crate::unwrap_ok_or;
...@@ -54,7 +53,8 @@ pub async fn callback(plugin_list: &'static [Plugin]) { ...@@ -54,7 +53,8 @@ pub async fn callback(plugin_list: &'static [Plugin]) {
break; break;
} }
if let Err(e) = event::wait_timeout(event::Event::EntryApplied, Duration::MAX) { let applied = node.get_index();
if let Err(e) = node.wait_index(applied + 1, Duration::MAX) {
tlog!(Warning, "failed to shutdown gracefully: {e}"); tlog!(Warning, "failed to shutdown gracefully: {e}");
} }
} }
......
...@@ -29,7 +29,7 @@ use crate::storage::SPACE_ID_INTERNAL_MAX; ...@@ -29,7 +29,7 @@ use crate::storage::SPACE_ID_INTERNAL_MAX;
use crate::storage::{ClusterwideTable, PropertyName}; use crate::storage::{ClusterwideTable, PropertyName};
use crate::traft::error::Error; use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op}; use crate::traft::op::{Ddl, Op};
use crate::traft::{self, event, node, RaftIndex}; use crate::traft::{self, node, RaftIndex};
use crate::util::effective_user_id; use crate::util::effective_user_id;
//////////////////////////////////////////////////////////////////////////////// ////////////////////////////////////////////////////////////////////////////////
...@@ -887,11 +887,12 @@ pub fn wait_for_ddl_commit( ...@@ -887,11 +887,12 @@ pub fn wait_for_ddl_commit(
prepare_commit: RaftIndex, prepare_commit: RaftIndex,
timeout: Duration, timeout: Duration,
) -> traft::Result<RaftIndex> { ) -> traft::Result<RaftIndex> {
let raft_storage = &node::global()?.raft_storage; let node = node::global()?;
let raft_storage = &node.raft_storage;
let deadline = fiber::clock().saturating_add(timeout); let deadline = fiber::clock().saturating_add(timeout);
let last_seen = prepare_commit; let last_seen = prepare_commit;
loop { loop {
let cur_applied = raft_storage.applied()?; let cur_applied = node.get_index();
let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?; let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?;
for entry in new_entries { for entry in new_entries {
if entry.entry_type != raft::prelude::EntryType::EntryNormal { if entry.entry_type != raft::prelude::EntryType::EntryNormal {
...@@ -906,9 +907,7 @@ pub fn wait_for_ddl_commit( ...@@ -906,9 +907,7 @@ pub fn wait_for_ddl_commit(
} }
} }
if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() { node.wait_index(cur_applied + 1, deadline.duration_since(fiber::clock()))?;
return Err(Error::Timeout);
}
} }
} }
......
...@@ -24,7 +24,6 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; ...@@ -24,7 +24,6 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>;
JointStateEnter = "raft.joint-state-enter", JointStateEnter = "raft.joint-state-enter",
JointStateLeave = "raft.joint-state-leave", JointStateLeave = "raft.joint-state-leave",
JointStateDrop = "raft.joint-state-drop", JointStateDrop = "raft.joint-state-drop",
EntryApplied = "raft.entry-applied",
} }
} }
......
...@@ -52,6 +52,7 @@ use ::raft::INVALID_ID; ...@@ -52,6 +52,7 @@ use ::raft::INVALID_ID;
use ::tarantool::error::TarantoolError; use ::tarantool::error::TarantoolError;
use ::tarantool::fiber; use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard; use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch}; use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex; use ::tarantool::fiber::Mutex;
...@@ -148,6 +149,7 @@ pub struct Node { ...@@ -148,6 +149,7 @@ pub struct Node {
pub(crate) governor_loop: governor::Loop, pub(crate) governor_loop: governor::Loop,
pub(crate) sentinel_loop: sentinel::Loop, pub(crate) sentinel_loop: sentinel::Loop,
status: watch::Receiver<Status>, status: watch::Receiver<Status>,
applied: watch::Receiver<RaftIndex>,
/// Should be locked during join and update instance request /// Should be locked during join and update instance request
/// to avoid costly cas conflicts during concurrent requests. /// to avoid costly cas conflicts during concurrent requests.
...@@ -183,6 +185,7 @@ impl Node { ...@@ -183,6 +185,7 @@ impl Node {
let raft_id = node_impl.raft_id(); let raft_id = node_impl.raft_id();
let status = node_impl.status.subscribe(); let status = node_impl.status.subscribe();
let applied = node_impl.applied.subscribe();
let node_impl = Rc::new(Mutex::new(node_impl)); let node_impl = Rc::new(Mutex::new(node_impl));
...@@ -206,6 +209,7 @@ impl Node { ...@@ -206,6 +209,7 @@ impl Node {
storage, storage,
raft_storage, raft_storage,
status, status,
applied,
instances_update: Mutex::new(()), instances_update: Mutex::new(()),
}; };
...@@ -214,29 +218,32 @@ impl Node { ...@@ -214,29 +218,32 @@ impl Node {
Ok(node) Ok(node)
} }
#[inline(always)]
pub fn raft_id(&self) -> RaftId { pub fn raft_id(&self) -> RaftId {
self.raft_id self.raft_id
} }
#[inline(always)]
pub fn status(&self) -> Status { pub fn status(&self) -> Status {
self.status.get() self.status.get()
} }
#[inline(always)]
pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> { pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {
self.node_impl.lock() self.node_impl.lock()
} }
/// Wait for the status to be changed. /// Wait for the status to be changed.
/// **This function yields** /// **This function yields**
#[inline(always)]
pub fn wait_status(&self) { pub fn wait_status(&self) {
fiber::block_on(self.status.clone().changed()).unwrap(); fiber::block_on(self.status.clone().changed()).unwrap();
} }
/// Returns current applied [`RaftIndex`]. /// Returns current applied [`RaftIndex`].
#[inline(always)]
pub fn get_index(&self) -> RaftIndex { pub fn get_index(&self) -> RaftIndex {
self.raft_storage self.applied.get()
.applied()
.expect("reading from memtx should never fail")
} }
/// Performs the quorum read operation. /// Performs the quorum read operation.
...@@ -276,25 +283,30 @@ impl Node { ...@@ -276,25 +283,30 @@ impl Node {
#[inline] #[inline]
pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> { pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
tlog!(Debug, "waiting for applied index {target}"); tlog!(Debug, "waiting for applied index {target}");
let mut applied = self.applied.clone();
let deadline = fiber::clock().saturating_add(timeout); let deadline = fiber::clock().saturating_add(timeout);
loop { fiber::block_on(async {
let current = self.get_index(); loop {
if current >= target { let current = self.get_index();
tlog!( if current >= target {
Debug, tlog!(
"done waiting for applied index {target}, current: {current}" Debug,
); "done waiting for applied index {target}, current: {current}"
return Ok(current); );
} return Ok(current);
}
if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() { let timeout = deadline.duration_since(fiber::clock());
tlog!( let res = applied.changed().timeout(timeout).await;
Debug, if let Err(TimeoutError::Expired) = res {
"failed waiting for applied index {target}: timeout, current: {current}" tlog!(
); Debug,
return Err(Error::Timeout); "failed waiting for applied index {target}: timeout, current: {current}"
);
return Err(Error::Timeout);
}
} }
} })
} }
/// Propose an operation and wait for its result. /// Propose an operation and wait for its result.
...@@ -398,6 +410,7 @@ pub(crate) struct NodeImpl { ...@@ -398,6 +410,7 @@ pub(crate) struct NodeImpl {
pool: Rc<ConnectionPool>, pool: Rc<ConnectionPool>,
lc: LogicalClock, lc: LogicalClock,
status: watch::Sender<Status>, status: watch::Sender<Status>,
applied: watch::Sender<RaftIndex>,
instance_reachability: Rc<RefCell<InstanceReachabilityManager>>, instance_reachability: Rc<RefCell<InstanceReachabilityManager>>,
} }
...@@ -436,6 +449,7 @@ impl NodeImpl { ...@@ -436,6 +449,7 @@ impl NodeImpl {
raft_state: RaftState::Follower, raft_state: RaftState::Follower,
main_loop_status: "idle", main_loop_status: "idle",
}); });
let (applied, _) = watch::channel(applied);
Ok(Self { Ok(Self {
raw_node, raw_node,
...@@ -447,6 +461,7 @@ impl NodeImpl { ...@@ -447,6 +461,7 @@ impl NodeImpl {
pool, pool,
lc, lc,
status, status,
applied,
}) })
} }
...@@ -614,7 +629,6 @@ impl NodeImpl { ...@@ -614,7 +629,6 @@ impl NodeImpl {
} }
let res = self.raft_storage.persist_applied(entry_index); let res = self.raft_storage.persist_applied(entry_index);
event::broadcast(Event::EntryApplied);
if let Err(e) = res { if let Err(e) = res {
tlog!( tlog!(
Error, Error,
...@@ -622,6 +636,8 @@ impl NodeImpl { ...@@ -622,6 +636,8 @@ impl NodeImpl {
"index" => entry_index "index" => entry_index
); );
} }
#[rustfmt::skip]
self.applied.send(entry_index).expect("applied shouldn't ever be borrowed across yields");
Ok(()) Ok(())
})?; })?;
...@@ -1496,6 +1512,10 @@ impl NodeImpl { ...@@ -1496,6 +1512,10 @@ impl NodeImpl {
if let Err(e) = transaction(|| -> traft::Result<()> { if let Err(e) = transaction(|| -> traft::Result<()> {
let meta = snapshot.get_metadata(); let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?; self.raft_storage.handle_snapshot_metadata(meta)?;
// handle_snapshot_metadata persists applied index, so we update the watch channel
#[rustfmt::skip]
self.applied.send(meta.index).expect("applied shouldn't ever be borrowed across yields");
let is_master = !self.is_readonly(); let is_master = !self.is_readonly();
self.storage self.storage
.apply_snapshot_data(&snapshot_data, is_master)?; .apply_snapshot_data(&snapshot_data, is_master)?;
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment