diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index 50f95243ec3fda7e95b45828848540d9ba7c1587..3d647589ef5dbc2c4c6d0e865152e56c72f960b3 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -215,7 +215,14 @@ impl RaftSpaceAccess { fn persist_compacted_index(&self, replace compacted_index: RaftTerm) -> _; } + /// Persists the `ConfState` (voters, learners, etc.). + /// + /// # Panics + /// + /// In debug mode panics if invoked out of a transaction. + /// pub fn persist_conf_state(&self, cs: &raft::ConfState) -> tarantool::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); self.persist_voters(&cs.voters)?; self.persist_learners(&cs.learners)?; self.persist_voters_outgoing(&cs.voters_outgoing)?; @@ -224,7 +231,14 @@ impl RaftSpaceAccess { Ok(()) } + /// Persists the `HardState` (term, vote, and commit index). + /// + /// # Panics + /// + /// In debug mode panics if invoked out of a transaction. + /// pub fn persist_hard_state(&self, hs: &raft::HardState) -> tarantool::Result<()> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); self.persist_term(hs.term)?; self.persist_vote(hs.vote)?; self.persist_commit(hs.commit)?; @@ -248,7 +262,12 @@ impl RaftSpaceAccess { /// /// Returns the number of entries deleted. /// + /// # Panics + /// + /// In debug mode panics if invoked out of a transaction. + /// pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> { + debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); // IteratorType::LT means tuples are returned in descending order let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?; @@ -419,6 +438,7 @@ macro_rules! assert_err { mod tests { use super::*; + use ::tarantool::transaction::start_transaction; fn dummy_entry(index: RaftIndex, term: RaftTerm) -> raft::Entry { raft::Entry { @@ -455,8 +475,12 @@ mod tests { assert_err!(S::term(&storage, 1), "log unavailable"); // Part 2. Whole log was compacted. - storage.persist_compacted_term(9).unwrap(); - storage.persist_compacted_index(99).unwrap(); + start_transaction(|| -> tarantool::Result<()> { + storage.persist_compacted_term(9)?; + storage.persist_compacted_index(99)?; + Ok(()) + }) + .unwrap(); assert_eq!(S::first_index(&storage), Ok(100)); assert_eq!(S::last_index(&storage), Ok(99)); @@ -568,13 +592,14 @@ mod tests { storage.persist_entries(&vec![dummy_entry(i, i)]).unwrap(); } let entries = |lo, hi| S::entries(&storage, lo, hi, u64::MAX); + let compact_log = |up_to| start_transaction(|| storage.compact_log(up_to)); assert_eq!(S::first_index(&storage), Ok(first)); assert_eq!(S::last_index(&storage), Ok(last)); assert_eq!(entries(first, last + 1).unwrap().len(), 10); let first = 6; - storage.compact_log(6).unwrap(); + assert_eq!(compact_log(6).unwrap(), 5); assert_eq!(S::first_index(&storage), Ok(first)); assert_eq!(S::last_index(&storage), Ok(last)); assert_eq!(S::term(&storage, 5), Ok(5)); @@ -592,6 +617,10 @@ mod tests { let first_entry = tuple.decode::<traft::Entry>().unwrap(); assert_eq!(first_entry.index, 6); + + // check idempotency + assert_eq!(compact_log(0).unwrap(), 0); + assert_eq!(compact_log(first).unwrap(), 0); } #[::tarantool::test] @@ -611,7 +640,7 @@ mod tests { commit: 98, ..Default::default() }; - storage.persist_hard_state(&hs).unwrap(); + start_transaction(|| storage.persist_hard_state(&hs)).unwrap(); assert_eq!(S::initial_state(&storage).unwrap().hard_state, hs); } @@ -635,7 +664,7 @@ mod tests { ..Default::default() }; - storage.persist_conf_state(&cs).unwrap(); + start_transaction(|| storage.persist_conf_state(&cs)).unwrap(); assert_eq!(S::initial_state(&storage).unwrap().conf_state, cs); }