diff --git a/src/traft/event.rs b/src/traft/event.rs index de475024996cdb8040d3a5d2b2a471137258365e..09b964f649a5ba13a76049ac29db658274ace8a8 100644 --- a/src/traft/event.rs +++ b/src/traft/event.rs @@ -25,7 +25,7 @@ pub type BoxResult<T> = std::result::Result<T, Box<dyn std::error::Error>>; JointStateLeave = "raft.joint-state-leave", JointStateDrop = "raft.joint-state-drop", RaftLoopNeeded = "raft.loop-needed", - CommitIndexPersisted = "raft.commit-index-persisted", + EntryApplied = "raft.entry-applied", MigrateDone = "picodata.migrate-done", } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 454a8ca282a49f822a6feef2290624591385abd2..94ce364effe26876b1cb102bfa4dc9ae90afb102 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -694,6 +694,7 @@ impl NodeImpl { } let res = self.raft_storage.persist_applied(entry_index); + event::broadcast(Event::EntryApplied); if let Err(e) = res { tlog!( Error, @@ -1305,7 +1306,6 @@ impl NodeImpl { // Raft HardState changed, and we need to persist it. if let Some(hs) = ready.hs() { self.raft_storage.persist_hard_state(hs).unwrap(); - event::broadcast(Event::CommitIndexPersisted); if let Err(e) = self.status.send_modify(|s| s.term = hs.term) { tlog!(Warning, "failed updating current term: {e}"; "term" => hs.term) } @@ -1331,7 +1331,6 @@ impl NodeImpl { // Update commit index. if let Some(commit) = light_rd.commit_index() { self.raft_storage.persist_commit(commit).unwrap(); - event::broadcast(Event::CommitIndexPersisted); } // Apply committed entries. diff --git a/src/traft/rpc/sync.rs b/src/traft/rpc/sync.rs index 2e6a71125806b4819a14fa3dcd6ab4322ae44b8c..2e190576eca7307082b04e95af09dea76af44a19 100644 --- a/src/traft/rpc/sync.rs +++ b/src/traft/rpc/sync.rs @@ -36,7 +36,7 @@ pub fn wait_for_index_timeout( } if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { - event::wait_timeout(event::Event::CommitIndexPersisted, timeout)?; + event::wait_timeout(event::Event::EntryApplied, timeout)?; } else { return Err(Error::Timeout); }