diff --git a/src/luamod.rs b/src/luamod.rs index fbd23a301f4813e8dd20f53d76ef8aed84021318..85c7db454442c694a17dfb789010db0aed4816a6 100644 --- a/src/luamod.rs +++ b/src/luamod.rs @@ -992,7 +992,10 @@ pub(crate) fn setup() { "}, { tlua::function1(|up_to: RaftIndex| -> traft::Result<RaftIndex> { - let raft_storage = &node::global()?.raft_storage; + let node = node::global()?; + let applied = node.get_index(); + let up_to = up_to.min(applied + 1); + let raft_storage = &node.raft_storage; let ret = transaction(|| raft_storage.compact_log(up_to)); Ok(ret?) }) diff --git a/src/traft/node.rs b/src/traft/node.rs index 385ec758a287f998a90ed0d41d51a48891c3978b..624a1d70fb633b66c7dcd4e4e6a8cc7d0a6f66a3 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -2420,10 +2420,10 @@ impl NodeImpl { let mut last_ddl_finalizer = None; let mut last_plugin_op_finalizer = None; - let last_index = self.raft_storage.last_index()?; + let last_applied = self.applied.get(); let newly_added_entries = self.raft_storage - .entries(old_last_index + 1, last_index + 1, None)?; + .entries(old_last_index + 1, last_applied + 1, None)?; // Check if there's a finalizer (e.g. DdlCommit, Plugin::Abort, etc.) // operation among the newly added entries. The finalizers a relied upon @@ -2456,7 +2456,7 @@ impl NodeImpl { } // Add 1 because this entry is to be removed. - let mut compact_until = last_index + 1; + let mut compact_until = last_applied + 1; if let Some(index) = last_ddl_finalizer { if compact_until > index { tlog!(Debug, "preserving ddl finalizer raft op at index {index}"); diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index e92e0875c11087aaf8818a09d642d2a449d0ccc6..a10a536b24bb88b88f89209a10e9e36dcefccb77 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -438,6 +438,17 @@ impl RaftSpaceAccess { /// pub fn handle_snapshot_metadata(&self, meta: &raft::SnapshotMetadata) -> tarantool::Result<()> { let meta_index = meta.index; + + #[cfg(debug_assertions)] + let applied = self.applied()?; + #[cfg(debug_assertions)] + #[rustfmt::skip] + debug_assert!(meta_index >= applied, "meta_index: {meta_index}, applied: {applied}"); + // Must bump the applied index before doing log compaction, + // because log compaction only removes applied entries + self.persist_applied(meta_index)?; + self.persist_conf_state(meta.get_conf_state())?; + // We don't want to have a hole in the log, so we clear everything // before applying the snapshot self.compact_log(meta.index + 1)?; @@ -449,12 +460,6 @@ impl RaftSpaceAccess { // the coordinates of the last entry which was in our log. self.persist_compacted_term(meta.term)?; self.persist_compacted_index(meta.index)?; - - let applied = self.applied()?; - #[rustfmt::skip] - debug_assert!(meta_index >= applied, "meta_index: {meta_index}, applied: {applied}"); - self.persist_applied(meta.index)?; - self.persist_conf_state(meta.get_conf_state())?; Ok(()) } @@ -502,9 +507,13 @@ impl RaftSpaceAccess { Ok(count as _) } - /// Trims raft log up to the given index (excluding the index + /// Trims raft log up to the given index `up_to` (excluding the index /// itself). /// + /// Note that `up_to` must not be greater than current applied index + 1, + /// because it doesn't make sense to compact entries which haven't been + /// applied yet. + /// /// Returns the new `first_index` after log compaction. It may /// differ from the requested one if the corresponding entry doesn't /// exist (either `up_to > last_index+1` or `up_to < first_index`). @@ -516,14 +525,18 @@ impl RaftSpaceAccess { /// /// # Panics /// - /// In debug mode panics if invoked out of a transaction. + /// In debug mode panics in case: + /// - invoked outside of a transaction + /// - `up_to > applied + 1` /// pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> { debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() }); // We cannot drop entries, which weren't applied yet + #[cfg(debug_assertions)] let applied = self.applied()?; - let up_to = up_to.min(applied + 1); + #[cfg(debug_assertions)] + debug_assert!(up_to <= applied + 1, "up_to: {up_to}, applied: {applied}"); // IteratorType::LT means tuples are returned in descending order let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?; @@ -1028,14 +1041,12 @@ mod tests { assert_eq!(compact_log(0).unwrap(), first); assert_eq!(compact_log(first).unwrap(), first); - // cannot compact past applied - assert_eq!(compact_log(applied + 2).unwrap(), applied + 1); - - storage.persist_applied(last).unwrap(); - - // trim to the end - assert_eq!(compact_log(u64::MAX).unwrap(), last + 1); - assert_eq!(storage.space_raft_log.len().unwrap(), 0); + // cannot compact past applied, this call will panic + // Note that we could use `#[tarantool::test(should_panic)]` but in this + // case a panic would happen inside a tarantool transaction, and + // unwinding from within it will leave an unfinished transaction, which + // we don't want to have + // _ = compact_log(applied + 2); } #[::tarantool::test] diff --git a/test/conftest.py b/test/conftest.py index c14e4140554855a6438f6d01ecb98129400b4164..14e2d0bc1aec20a0779bd6a567e4347301bd7bcd 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -2184,15 +2184,7 @@ class PgClient: def build_profile() -> str: - from_env = os.environ.get("BUILD_PROFILE") - - if "CI" in os.environ: - # When running in CI BUILD_PROFILE must always be specified, we rely - # this in a couple of tests - assert from_env is not None, "BUILD_PROFILE must always be set in CI" - - # When running on a developers machine, priorities the usability - return from_env or "dev" + return os.environ.get("BUILD_PROFILE", "dev") def get_test_dir(): diff --git a/test/int/test_snapshot.py b/test/int/test_snapshot.py index 9933c08ed8ed51149df0df1e0d7b51aa100e9bbc..825e12ebda97ba66ba81730ae76eb0ce56143893 100644 --- a/test/int/test_snapshot.py +++ b/test/int/test_snapshot.py @@ -1,7 +1,5 @@ import time -import pytest - from conftest import Cluster, Retriable @@ -68,9 +66,6 @@ def assert_eq(lhs, rhs): assert lhs == rhs -@pytest.mark.xfail( - reason="flaky, see: https://git.picodata.io/core/picodata/-/issues/779" -) def test_large_snapshot(cluster: Cluster): i1, i2, i3, i4 = cluster.deploy(instance_count=4) @@ -215,14 +210,6 @@ def test_large_snapshot(cluster: Cluster): i4.env["PICODATA_SCRIPT"] = script_path i4.start() - # Wait for i4 to start receiving the snapshot - Retriable(10, 60).call( - lambda: assert_eq( - i4.call(".proc_runtime_info")["internal"]["main_loop_status"], - "receiving snapshot", - ) - ) - # In the middle of snapshot application propose a new entry index = cluster.cas("insert", "_pico_property", ["pokemon", "snap"]) for i in [i1, i2, i3]: @@ -234,14 +221,6 @@ def test_large_snapshot(cluster: Cluster): i5.env["PICODATA_SCRIPT"] = script_path i5.start() - # Wait for i5 to start receiving the snapshot - Retriable(10, 60).call( - lambda: assert_eq( - i5.call(".proc_runtime_info")["internal"]["main_loop_status"], - "receiving snapshot", - ) - ) - i1.raft_compact_log() i2.raft_compact_log() i3.raft_compact_log()