Skip to content
Snippets Groups Projects
Commit 0f315f0b authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: don't compact unapplied entries

parent ba555088
No related branches found
No related tags found
No related merge requests found
...@@ -43,4 +43,21 @@ macro_rules! error_injection { ...@@ -43,4 +43,21 @@ macro_rules! error_injection {
$crate::tarantool::exit(69); $crate::tarantool::exit(69);
}; };
}; };
(block $error:expr) => {{
let error = $error;
#[rustfmt::skip]
if $crate::error_injection::is_enabled(error) {
$crate::tlog!(Info, "################################################################");
$crate::tlog!(Info, "ERROR INJECTED '{}': BLOCKING", error);
$crate::tlog!(Info, "################################################################");
while $crate::error_injection::is_enabled(error) {
::tarantool::fiber::sleep(::std::time::Duration::from_millis(100));
}
$crate::tlog!(Info, "################################################################");
$crate::tlog!(Info, "ERROR UNINJECTED '{}': UNBLOCKING", error);
$crate::tlog!(Info, "################################################################");
};
}};
} }
...@@ -1645,6 +1645,8 @@ impl NodeImpl { ...@@ -1645,6 +1645,8 @@ impl NodeImpl {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}"); tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}"); panic!("transaction failed: {e}");
} }
crate::error_injection!(block "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX");
} }
// Apply committed entries. // Apply committed entries.
......
...@@ -8,6 +8,7 @@ use std::cmp::Ordering; ...@@ -8,6 +8,7 @@ use std::cmp::Ordering;
use std::convert::TryFrom as _; use std::convert::TryFrom as _;
use crate::instance::InstanceId; use crate::instance::InstanceId;
use crate::tlog;
use crate::traft; use crate::traft;
use crate::traft::RaftEntryId; use crate::traft::RaftEntryId;
use crate::traft::RaftId; use crate::traft::RaftId;
...@@ -455,9 +456,9 @@ impl RaftSpaceAccess { ...@@ -455,9 +456,9 @@ impl RaftSpaceAccess {
pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> { pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> {
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
// We cannot drop entries, which weren't commited yet // We cannot drop entries, which weren't applied yet
let commit = self.commit()?; let applied = self.applied()?;
let up_to = up_to.min(commit + 1); let up_to = up_to.min(applied + 1);
// IteratorType::LT means tuples are returned in descending order // IteratorType::LT means tuples are returned in descending order
let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?; let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?;
...@@ -477,14 +478,19 @@ impl RaftSpaceAccess { ...@@ -477,14 +478,19 @@ impl RaftSpaceAccess {
self.persist_compacted_index(compacted_index)?; self.persist_compacted_index(compacted_index)?;
self.persist_compacted_term(compacted_term)?; self.persist_compacted_term(compacted_term)?;
let mut num_deleted = 1;
for tuple in iter { for tuple in iter {
let index = tuple let index = tuple
.field::<RaftIndex>(Self::FIELD_ENTRY_INDEX)? .field::<RaftIndex>(Self::FIELD_ENTRY_INDEX)?
.expect("index is non-nullable"); .expect("index is non-nullable");
self.space_raft_log.delete(&(index,))?; self.space_raft_log.delete(&(index,))?;
num_deleted += 1;
} }
Ok(1 + compacted_index) let new_first_index = 1 + compacted_index;
tlog!(Info, "compacted raft log: deleted {num_deleted} entries, new first entry index is {new_first_index}");
Ok(new_first_index)
} }
} }
......
from conftest import Instance from conftest import Instance, Retriable
def test_compaction(instance: Instance): def test_compaction(instance: Instance):
...@@ -33,3 +33,30 @@ def test_compaction(instance: Instance): ...@@ -33,3 +33,30 @@ def test_compaction(instance: Instance):
# Check idempotency # Check idempotency
assert instance.raft_compact_log(2) == applied_index + 1 assert instance.raft_compact_log(2) == applied_index + 1
assert raft_state("compacted_index") == applied_index assert raft_state("compacted_index") == applied_index
def raft_wait_commit_index(i: Instance, expected, timeout=1):
def make_attempt():
commit = i.eval("return box.space._raft_state:get('commit').value")
assert commit == expected
Retriable(timeout=timeout, rps=10).call(make_attempt)
def test_unapplied_entries_arent_compacted(instance: Instance):
i1 = instance
i1.call("pico._inject_error", "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX", True)
# Propose a raft log entry and wait for it to be committed, but not applied.
index = i1.cas("insert", "_pico_property", ["dont", "compact", "me"])
raft_wait_commit_index(i1, index)
assert i1.call("box.space._pico_property:get", "dont") is None
first = i1.raft_compact_log()
assert first == index
i1.call("pico._inject_error", "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX", False)
i1.raft_wait_index(index)
assert i1.call("box.space._pico_property:get", "dont") == ["dont", "compact", "me"]
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment