diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index 09023cd6517f39fcfd5a1bf5c15610879c91e15a..50f95243ec3fda7e95b45828848540d9ba7c1587 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -161,18 +161,21 @@ impl RaftSpaceAccess { }) } + /// Returns a slice of log entries in the range `[low, high)`. + /// It also can be thought of as `[first_index, last_index+1)`. + /// + /// As distinct from [`raft::Storage::entries`], this function + /// doesn't perform bound checks. In case some entries are missing, + /// the returned vec would be shorter than expected. + /// + /// # Panics + /// + /// Panics if `high` < `low`. + /// pub fn entries(&self, low: RaftIndex, high: RaftIndex) -> tarantool::Result<Vec<raft::Entry>> { - // low <= idx < high - let mut iter = self.space_raft_log.select(IteratorType::GE, &(low,))?; - let Some(tuple) = iter.next() else { - return Ok(vec![]); - }; - - let first_entry: traft::Entry = tuple.decode()?; - assert_eq!(first_entry.index, low); + let iter = self.space_raft_log.select(IteratorType::GE, &(low,))?; + let mut ret = Vec::with_capacity((high - low) as usize); - let mut ret = Vec::with_capacity((high - low) as _); - ret.push(first_entry.into()); for tuple in iter { let row: traft::Entry = tuple.decode()?; if row.index >= high { @@ -284,16 +287,49 @@ impl raft::Storage for RaftSpaceAccess { Ok(ret) } + /// Returns a slice of log entries in the range `[low, high)`. + /// + /// As distinct from [`RaftSpaceAccess::entries`] this function + /// returns either all entries or an error. + /// + /// Returns `Err(Compacted)` if `low <= compacted_index`. Notice + /// that an entry for `compacted_index` doesn't exist. + /// + /// Returns `Err(Unavailable)` in case at least one entry in the + /// range is missing (either `high > last_index` or it's really + /// missing). Raft-rs will panic in this case, but it's fair. + /// + /// The result is handled in raft_log.rs:583 (`RaftLog::slice`). + /// + /// # Panics + /// + /// Panics if `high` < `low`. + /// fn entries( &self, low: RaftIndex, high: RaftIndex, _max_size: impl Into<Option<u64>>, ) -> Result<Vec<raft::Entry>, RaftError> { - // tlog!(Info, "++++++ entries {low} {high}"); - Ok(self.entries(low, high).cvt_err()?) + if low <= self.compacted_index().cvt_err()?.unwrap_or(0) { + return Err(RaftError::Store(StorageError::Compacted)); + } + + let ret = self.entries(low, high).cvt_err()?; + if ret.len() < (high - low) as usize { + return Err(RaftError::Store(StorageError::Unavailable)); + } + Ok(ret) } + /// Returns the term of entry `idx`. + /// + /// The valid range is `[compacted_index, last_index()]`, that's why + /// we also persist `compacted_term`. + /// + /// For `idx < compacted_index` the functon returns `Err(Compacted)`. + /// For `idx > last_index()` the functon returns `Err(Unavailable)`. + /// fn term(&self, idx: RaftIndex) -> Result<RaftTerm, RaftError> { let compacted_index = self.compacted_index().cvt_err()?.unwrap_or(0); let compacted_term = self.compacted_term().cvt_err()?.unwrap_or(0); @@ -301,6 +337,8 @@ impl raft::Storage for RaftSpaceAccess { if idx == compacted_index { return Ok(compacted_term); } else if idx < compacted_index { + // Returning `Err(Compacted)` is safe. It's handled in + // raft_log.rs:134 (`RaftLog::term`) return Err(RaftError::Store(StorageError::Compacted)); } @@ -312,25 +350,32 @@ impl raft::Storage for RaftSpaceAccess { .expect("term is non-nullable")); } - // Returning an error from this function will most likely result in - // a panic, so just don't do it ok? - // - // Don't even try to understand what the hell is going on inside - // raft-rs, the authors don't even know. They just put panics - // wherever they want, because nobody can stop them. - Ok(0) + // Returning `Err(Unavailable)` is safe. It's handled in + // raft_log.rs:134 (`RaftLog::term`) + Err(RaftError::Store(StorageError::Unavailable)) } + /// Returns `compacted_index` plus 1. + /// + /// The naming is actually misleading, as returned index might not + /// exist. And that means `last_index < first_index` and indicates + /// that the log was compacted. Raft-rs treats it the same way. + /// + /// Empty log is considered compacted at index 0 with term 0. + /// fn first_index(&self) -> Result<RaftIndex, RaftError> { let compacted_index = self.compacted_index().cvt_err()?.unwrap_or(0); - // Even though entry with that index might not exist, we pretend - // it does, because raft-rs will panic otherwise. Based on the - // doc comments of `raft::Storage::first_index`, 0 entries - // persisted means first index = 1. raft-rs will panic, if you - // return 0. Ok(1 + compacted_index) } + /// Returns the last index which term is available. + /// + /// And again, the naming is actually misleading. The entry itself + /// might not exist if the whole log was compacted or is actually + /// empty. That case also implies `last_index < first_index`. + /// + /// Empty log is considered compacted at index 0 with term 0. + /// fn last_index(&self) -> Result<RaftIndex, RaftError> { let tuple = self.space_raft_log.primary_key().max(&()).cvt_err()?; @@ -375,8 +420,16 @@ macro_rules! assert_err { mod tests { use super::*; + fn dummy_entry(index: RaftIndex, term: RaftTerm) -> raft::Entry { + raft::Entry { + term, + index, + ..Default::default() + } + } + #[::tarantool::test] - fn test_storage2() { + fn test_cloning() { // It's fine to create two access objects let s1 = RaftSpaceAccess::new().unwrap(); let s2 = RaftSpaceAccess::new().unwrap(); @@ -389,41 +442,82 @@ mod tests { } #[::tarantool::test] - fn test_storage2_log() { + fn test_log() { use ::raft::Storage as S; - let test_entries = vec![raft::Entry { - term: 9u64, - index: 99u64, - ..Default::default() - }]; + // Part 1. No log, no snapshot. let storage = RaftSpaceAccess::new().unwrap(); - storage.persist_entries(&test_entries).unwrap(); - assert_eq!(S::first_index(&storage), Ok(99)); + assert_eq!(S::first_index(&storage), Ok(1)); + assert_eq!(S::last_index(&storage), Ok(0)); + + assert_eq!(S::term(&storage, 0), Ok(0)); + assert_err!(S::term(&storage, 1), "log unavailable"); + + // Part 2. Whole log was compacted. + storage.persist_compacted_term(9).unwrap(); + storage.persist_compacted_index(99).unwrap(); + + assert_eq!(S::first_index(&storage), Ok(100)); assert_eq!(S::last_index(&storage), Ok(99)); + + assert_err!(S::term(&storage, 0), "log compacted"); assert_eq!(S::term(&storage, 99), Ok(9)); - assert_err!(S::entries(&storage, 1, 1, u64::MAX), "log compacted"); - assert_err!(S::entries(&storage, 1, 99, u64::MAX), "log compacted"); + assert_err!(S::term(&storage, 100), "log unavailable"); + + // Part 3. Add some new entries. + let test_entries = vec![ + dummy_entry(100, 10), + dummy_entry(101, 10), + dummy_entry(102, 10), + ]; + storage.persist_entries(&test_entries).unwrap(); + + let (first, last) = (100, 102); + assert_eq!(S::first_index(&storage), Ok(first)); + assert_eq!(S::last_index(&storage), Ok(last)); + + assert_eq!(S::term(&storage, first - 1), Ok(9)); + assert_eq!(S::term(&storage, first), Ok(10)); + assert_eq!(S::term(&storage, last), Ok(10)); + assert_err!(S::term(&storage, last + 1), "log unavailable"); + + assert_err!( + S::entries(&storage, first - 1, last + 1, u64::MAX), + "log compacted" + ); + + assert_err!( + S::entries(&storage, first, last + 2, u64::MAX), + "log unavailable" + ); + // This is what raft-rs literally does, see raft_log.rs:388 + // (`RaftLog::entries`) assert_eq!( - S::entries(&storage, 99, 99, u64::MAX), + S::entries(&storage, first, last + 1, u64::MAX), Ok(test_entries.clone()) ); - assert_eq!(S::entries(&storage, 99, 9999, u64::MAX), Ok(test_entries)); - assert_eq!(S::term(&storage, 100), Ok(0)); + } + + #[::tarantool::test] + fn test_damaged_storage() { + use ::raft::Storage as S; - let raft_log = Space::find(RaftSpaceAccess::SPACE_RAFT_LOG).unwrap(); + let storage = RaftSpaceAccess::new().unwrap(); + let raft_log = storage.space_raft_log.clone(); + let term = 666; - raft_log.put(&(1337, 99, 1, "", ())).unwrap(); + let (first, last) = (1, 1); + raft_log.put(&(1337, first, term, "", ())).unwrap(); assert_err!( - S::entries(&storage, 99, 100, u64::MAX), + S::entries(&storage, first, last + 1, u64::MAX), "unknown error Failed to decode tuple: unknown entry type (1337)" ); - raft_log.put(&(0, 99, 1, "", false)).unwrap(); + raft_log.put(&(0, first, term, "", false)).unwrap(); assert_err!( - S::entries(&storage, 99, 100, u64::MAX), + S::entries(&storage, first, last + 1, u64::MAX), concat!( "unknown error", " Failed to decode tuple:", @@ -432,9 +526,14 @@ mod tests { ) ); + raft_log.put(&(0, 1, term, "", ())).unwrap(); + // skip index 2 + raft_log.put(&(0, 3, term, "", ())).unwrap(); + assert_err!(S::entries(&storage, 1, 4, u64::MAX), "log unavailable"); + raft_log.primary_key().drop().unwrap(); assert_err!( - S::entries(&storage, 1, 100, u64::MAX), + S::entries(&storage, first, last + 1, u64::MAX), concat!( "unknown error", " Tarantool error:", @@ -445,7 +544,7 @@ mod tests { raft_log.drop().unwrap(); assert_err!( - S::entries(&storage, 1, 100, u64::MAX), + S::entries(&storage, first, last + 1, u64::MAX), format!( concat!( "unknown error", @@ -459,7 +558,44 @@ mod tests { } #[::tarantool::test] - fn test_storage2_hard_state() { + fn test_log_compaction() { + use ::raft::Storage as S; + + let storage = RaftSpaceAccess::new().unwrap(); + + let (first, last) = (1, 10); + for i in first..=last { + storage.persist_entries(&vec![dummy_entry(i, i)]).unwrap(); + } + let entries = |lo, hi| S::entries(&storage, lo, hi, u64::MAX); + + assert_eq!(S::first_index(&storage), Ok(first)); + assert_eq!(S::last_index(&storage), Ok(last)); + assert_eq!(entries(first, last + 1).unwrap().len(), 10); + + let first = 6; + storage.compact_log(6).unwrap(); + assert_eq!(S::first_index(&storage), Ok(first)); + assert_eq!(S::last_index(&storage), Ok(last)); + assert_eq!(S::term(&storage, 5), Ok(5)); + assert_eq!(S::term(&storage, first), Ok(6)); + assert_err!(S::term(&storage, 4), "log compacted"); + assert_err!(entries(1, last + 1), "log compacted"); + assert_eq!(entries(first, last + 1).unwrap().len(), 5); + + let tuple = storage + .space_raft_log + .primary_key() + .min(&()) + .unwrap() + .unwrap(); + + let first_entry = tuple.decode::<traft::Entry>().unwrap(); + assert_eq!(first_entry.index, 6); + } + + #[::tarantool::test] + fn test_hard_state() { use ::raft::Storage as S; let storage = RaftSpaceAccess::new().unwrap(); @@ -480,7 +616,7 @@ mod tests { } #[::tarantool::test] - fn test_storage2_conf_state() { + fn test_conf_state() { use ::raft::Storage as S; let storage = RaftSpaceAccess::new().unwrap(); @@ -504,7 +640,7 @@ mod tests { } #[::tarantool::test] - fn test_storage2_other_state() { + fn test_other_state() { let storage = RaftSpaceAccess::new().unwrap(); let raft_state = Space::find(RaftSpaceAccess::SPACE_RAFT_STATE).unwrap();