Skip to content
Snippets Groups Projects
Commit 6068445d authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

feat: raft log compaction

parent 020e9334
No related branches found
No related tags found
1 merge request!491feat: raft log compaction
......@@ -428,6 +428,21 @@ fn picolib_setup(args: &args::Run) {
},
),
);
luamod.set("compact_raft_log", {
    // Value handed back to Lua: how many raft log entries were removed.
    #[derive(tlua::PushInto)]
    struct Output {
        n_deleted: u64,
    }
    // Lua-callable wrapper around `compact_log`. The compaction runs inside
    // a transaction. Note that `up_to` is exclusive: only entries with a
    // smaller index are removed.
    tlua::Function::new(|up_to: u64| -> traft::Result<Output> {
        let this_node = node::global()?;
        let n_deleted = start_transaction(|| -> ::tarantool::Result<u64> {
            this_node.raft_storage.compact_log(up_to)
        })?;
        Ok(Output { n_deleted })
    })
});
}
macro_rules! lua_preload {
......
......@@ -136,6 +136,9 @@ impl RaftSpaceAccess {
fn voters_outgoing(&self) -> _<Vec<RaftId>>;
fn learners_next(&self) -> _<Vec<RaftId>>;
fn auto_leave(&self) -> _<bool>;
fn compacted_term(&self) -> _<RaftTerm>;
fn compacted_index(&self) -> _<RaftIndex>;
}
pub fn conf_state(&self) -> tarantool::Result<raft::ConfState> {
......@@ -160,9 +163,16 @@ impl RaftSpaceAccess {
pub fn entries(&self, low: RaftIndex, high: RaftIndex) -> tarantool::Result<Vec<raft::Entry>> {
// low <= idx < high
let mut ret: Vec<raft::Entry> = vec![];
let iter = self.space_raft_log.select(IteratorType::GE, &(low,))?;
let mut iter = self.space_raft_log.select(IteratorType::GE, &(low,))?;
let Some(tuple) = iter.next() else {
return Ok(vec![]);
};
let first_entry: traft::Entry = tuple.decode()?;
assert_eq!(first_entry.index, low);
let mut ret = Vec::with_capacity((high - low) as _);
ret.push(first_entry.into());
for tuple in iter {
let row: traft::Entry = tuple.decode()?;
if row.index >= high {
......@@ -197,6 +207,9 @@ impl RaftSpaceAccess {
fn persist_voters_outgoing(&self, replace voters_outgoing: &[RaftId]) -> _;
fn persist_learners_next(&self, replace learners_next: &[RaftId]) -> _;
fn persist_auto_leave(&self, replace auto_leave: bool) -> _;
fn persist_compacted_term(&self, replace compacted_term: RaftTerm) -> _;
// Stores a raft log index, so the value is typed as `RaftIndex` (matching the `compacted_index` getter).
fn persist_compacted_index(&self, replace compacted_index: RaftIndex) -> _;
}
pub fn persist_conf_state(&self, cs: &raft::ConfState) -> tarantool::Result<()> {
......@@ -222,6 +235,45 @@ impl RaftSpaceAccess {
}
Ok(())
}
/// Compacts the raft log by removing every entry whose index is
/// strictly less than `up_to`; the entry at `up_to` itself is kept.
///
/// The index and term of the newest removed entry are saved into the
/// `compacted_index` & `compacted_term` raft-state values. Because the
/// raft state and the log are modified together, this **should be
/// invoked within a transaction**.
///
/// Returns the number of entries deleted, which is `0` when the log
/// holds nothing below `up_to`.
pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> {
    // `IteratorType::LT` yields tuples in descending index order, so the
    // very first tuple is the newest entry that is going to be removed.
    let mut doomed = self.space_raft_log.select(IteratorType::LT, &(up_to,))?;
    let newest = match doomed.next() {
        Some(tuple) => tuple,
        None => return Ok(0),
    };

    let newest_index = newest
        .field::<RaftIndex>(Self::FIELD_ENTRY_INDEX)?
        .expect("index is non-nullable");
    let newest_term = newest
        .field::<RaftTerm>(Self::FIELD_ENTRY_TERM)?
        .expect("term is non-nullable");

    // Remember where the log now begins before dropping the entry itself.
    self.persist_compacted_index(newest_index)?;
    self.persist_compacted_term(newest_term)?;
    self.space_raft_log.delete(&(newest_index,))?;

    // The newest entry is already accounted for; walk the older ones.
    let mut n_deleted = 1;
    for older in doomed {
        let older_index = older
            .field::<RaftIndex>(Self::FIELD_ENTRY_INDEX)?
            .expect("index is non-nullable");
        let removed = self.space_raft_log.delete(&(older_index,))?;
        if removed.is_some() {
            n_deleted += 1;
        }
    }
    Ok(n_deleted)
}
}
impl raft::Storage for RaftSpaceAccess {
......@@ -243,37 +295,51 @@ impl raft::Storage for RaftSpaceAccess {
}
fn term(&self, idx: RaftIndex) -> Result<RaftTerm, RaftError> {
if idx == 0 {
return Ok(0);
let compacted_index = self.compacted_index().cvt_err()?.unwrap_or(0);
let compacted_term = self.compacted_term().cvt_err()?.unwrap_or(0);
if idx == compacted_index {
return Ok(compacted_term);
} else if idx < compacted_index {
return Err(RaftError::Store(StorageError::Compacted));
}
// tlog!(Info, "++++++ term {idx}");
let tuple = self.space_raft_log.get(&(idx,)).cvt_err()?;
if let Some(tuple) = tuple {
Ok(tuple
return Ok(tuple
.field(Self::FIELD_ENTRY_TERM)
.cvt_err()?
.expect("term is non-nullable"))
} else {
Err(RaftError::Store(StorageError::Unavailable))
.expect("term is non-nullable"));
}
// Returning an error from this function will most likely result in
// a panic, so just don't do it ok?
//
// Don't even try to understand what the hell is going on inside
// raft-rs, the authors don't even know. They just put panics
// wherever they want, because nobody can stop them.
Ok(0)
}
fn first_index(&self) -> Result<RaftIndex, RaftError> {
// tlog!(Info, "++++++ first_index");
Ok(1)
let compacted_index = self.compacted_index().cvt_err()?.unwrap_or(0);
// Even though entry with that index might not exist, we pretend
// it does, because raft-rs will panic otherwise. Based on the
// doc comments of `raft::Storage::first_index`, 0 entries
// persisted means first index = 1. raft-rs will panic, if you
// return 0.
Ok(1 + compacted_index)
}
fn last_index(&self) -> Result<RaftIndex, RaftError> {
let tuple: Option<Tuple> = self.space_raft_log.primary_key().max(&()).cvt_err()?;
let tuple = self.space_raft_log.primary_key().max(&()).cvt_err()?;
if let Some(t) = tuple {
Ok(t.field(Self::FIELD_ENTRY_INDEX)
.cvt_err()?
.expect("index is non-nullabe"))
} else {
Ok(0)
Ok(self.compacted_index().cvt_err()?.unwrap_or(0))
}
}
......@@ -334,28 +400,30 @@ mod tests {
let storage = RaftSpaceAccess::new().unwrap();
storage.persist_entries(&test_entries).unwrap();
assert_eq!(S::first_index(&storage), Ok(1));
assert_eq!(S::first_index(&storage), Ok(99));
assert_eq!(S::last_index(&storage), Ok(99));
assert_eq!(S::term(&storage, 99), Ok(9));
assert_eq!(S::entries(&storage, 1, 99, u64::MAX), Ok(vec![]));
assert_eq!(S::entries(&storage, 1, 100, u64::MAX), Ok(test_entries));
assert_err!(S::entries(&storage, 1, 1, u64::MAX), "log compacted");
assert_err!(S::entries(&storage, 1, 99, u64::MAX), "log compacted");
assert_eq!(
S::term(&storage, 100).map_err(|e| format!("{e}")),
Err("log unavailable".into())
S::entries(&storage, 99, 99, u64::MAX),
Ok(test_entries.clone())
);
assert_eq!(S::entries(&storage, 99, 9999, u64::MAX), Ok(test_entries));
assert_eq!(S::term(&storage, 100), Ok(0));
let raft_log = Space::find(RaftSpaceAccess::SPACE_RAFT_LOG).unwrap();
raft_log.put(&(1337, 99, 1, "", ())).unwrap();
assert_err!(
S::entries(&storage, 1, 100, u64::MAX),
S::entries(&storage, 99, 100, u64::MAX),
"unknown error Failed to decode tuple: unknown entry type (1337)"
);
raft_log.put(&(0, 99, 1, "", false)).unwrap();
assert_err!(
S::entries(&storage, 1, 100, u64::MAX),
S::entries(&storage, 99, 100, u64::MAX),
concat!(
"unknown error",
" Failed to decode tuple:",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment