From 36a59324986a24b3fc77ab7a8f8865d5a43d05d7 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 23 Dec 2024 18:52:24 +0300
Subject: [PATCH 1/4] test: reduce test_large_snapshot flakiness

---
 test/int/test_snapshot.py | 21 ---------------------
 1 file changed, 21 deletions(-)

diff --git a/test/int/test_snapshot.py b/test/int/test_snapshot.py
index 9933c08ed8..825e12ebda 100644
--- a/test/int/test_snapshot.py
+++ b/test/int/test_snapshot.py
@@ -1,7 +1,5 @@
 import time
 
-import pytest
-
 from conftest import Cluster, Retriable
 
 
@@ -68,9 +66,6 @@ def assert_eq(lhs, rhs):
     assert lhs == rhs
 
 
-@pytest.mark.xfail(
-    reason="flaky, see: https://git.picodata.io/core/picodata/-/issues/779"
-)
 def test_large_snapshot(cluster: Cluster):
     i1, i2, i3, i4 = cluster.deploy(instance_count=4)
 
@@ -215,14 +210,6 @@ def test_large_snapshot(cluster: Cluster):
     i4.env["PICODATA_SCRIPT"] = script_path
     i4.start()
 
-    # Wait for i4 to start receiving the snapshot
-    Retriable(10, 60).call(
-        lambda: assert_eq(
-            i4.call(".proc_runtime_info")["internal"]["main_loop_status"],
-            "receiving snapshot",
-        )
-    )
-
     # In the middle of snapshot application propose a new entry
     index = cluster.cas("insert", "_pico_property", ["pokemon", "snap"])
     for i in [i1, i2, i3]:
@@ -234,14 +221,6 @@ def test_large_snapshot(cluster: Cluster):
     i5.env["PICODATA_SCRIPT"] = script_path
     i5.start()
 
-    # Wait for i5 to start receiving the snapshot
-    Retriable(10, 60).call(
-        lambda: assert_eq(
-            i5.call(".proc_runtime_info")["internal"]["main_loop_status"],
-            "receiving snapshot",
-        )
-    )
-
     i1.raft_compact_log()
     i2.raft_compact_log()
     i3.raft_compact_log()
-- 
GitLab


From e7c8582d687a808edc46ae3785808d7b69414231 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 23 Dec 2024 18:52:47 +0300
Subject: [PATCH 2/4] fix: bug in raft snapshot application

There was a hard-to-reproduce bug in our snapshot application code.
We always compact the raft log before applying a snapshot, because the
snapshot replaces the entries and some of the logic in raft-rs seems to
rely on this. The problem was that our compact_log function would never
remove unapplied entries, which makes sense for compaction triggered
automatically by raft log size, but doesn't make sense for a raft
snapshot, as the snapshot already contains the state corresponding to
the newer entries. The fix is simple: don't guard against compacting
unapplied entries when the compaction is done for a raft snapshot.
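
To illustrate the change, here is a rough sketch of the idea (not the
actual picodata code; the function name and signature are simplified):

    // Sketch (simplified): how the `for_snapshot` flag changes the
    // compaction bound. `up_to` is exclusive, `applied` is the index of
    // the last applied raft entry.
    fn compaction_bound(up_to: u64, applied: u64, for_snapshot: bool) -> u64 {
        if for_snapshot {
            // The snapshot already contains the state covered by these
            // entries, so unapplied entries may be dropped as well.
            up_to
        } else {
            // Size-triggered auto compaction must never drop unapplied
            // entries.
            up_to.min(applied + 1)
        }
    }

For example, compaction_bound(15, 10, false) == 11, while
compaction_bound(15, 10, true) == 15.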

We don't add a regression test for this, because implementing one would
be too difficult and would require polluting the code with error
injection logic, which is not a worthwhile trade-off in this case. The
logic will still be exercised, though: this bug was responsible for a
large number of flaky tests, so we should see a significant reduction
in flakiness in tests concerning raft snapshots from now on.
---
 src/luamod.rs             |  2 +-
 src/traft/node.rs         |  2 +-
 src/traft/raft_storage.rs | 20 ++++++++++++++------
 3 files changed, 16 insertions(+), 8 deletions(-)

diff --git a/src/luamod.rs b/src/luamod.rs
index fbd23a301f..83991ec050 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -993,7 +993,7 @@ pub(crate) fn setup() {
         {
             tlua::function1(|up_to: RaftIndex| -> traft::Result<RaftIndex> {
                 let raft_storage = &node::global()?.raft_storage;
-                let ret = transaction(|| raft_storage.compact_log(up_to));
+                let ret = transaction(|| raft_storage.compact_log(up_to, false));
                 Ok(ret?)
             })
         },
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 385ec758a2..4d861a05e2 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -2473,7 +2473,7 @@ impl NodeImpl {
 
         transaction(|| -> traft::Result<()> {
             self.main_loop_status("log auto compaction");
-            self.raft_storage.compact_log(compact_until)?;
+            self.raft_storage.compact_log(compact_until, false)?;
 
             Ok(())
         })?;
diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs
index e92e0875c1..9ace28dc49 100644
--- a/src/traft/raft_storage.rs
+++ b/src/traft/raft_storage.rs
@@ -440,7 +440,7 @@ impl RaftSpaceAccess {
         let meta_index = meta.index;
         // We don't want to have a hole in the log, so we clear everything
         // before applying the snapshot
-        self.compact_log(meta.index + 1)?;
+        self.compact_log(meta.index + 1, true)?;
 
         let compacted_index = self.compacted_index()?;
         #[rustfmt::skip]
@@ -514,16 +514,24 @@ impl RaftSpaceAccess {
     /// raft-state values, so it **should be invoked within a
     /// transaction**.
     ///
+    /// If `for_snapshot` is `false` the `up_to` index is adjusted so that we
+    /// don't compact any entries which haven't been applied yet.
+    ///
     /// # Panics
     ///
     /// In debug mode panics if invoked out of a transaction.
     ///
-    pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> {
+    pub fn compact_log(&self, mut up_to: RaftIndex, for_snapshot: bool) -> tarantool::Result<u64> {
         debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
-        // We cannot drop entries, which weren't applied yet
-        let applied = self.applied()?;
-        let up_to = up_to.min(applied + 1);
+        if for_snapshot {
+            // Ok to delete unapplied entries in case of snapshot,
+            // because snapshot already contains the applied state.
+        } else {
+            // We cannot drop entries, which weren't applied yet
+            let applied = self.applied()?;
+            up_to = up_to.min(applied + 1);
+        }
 
         // IteratorType::LT means tuples are returned in descending order
         let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?;
@@ -998,7 +1006,7 @@ mod tests {
         storage.persist_applied(applied).unwrap();
         let entries =
             |lo, hi| S::entries(&storage, lo, hi, u64::MAX, GetEntriesContext::empty(false));
-        let compact_log = |up_to| transaction(|| storage.compact_log(up_to));
+        let compact_log = |up_to| transaction(|| storage.compact_log(up_to, false));
 
         assert_eq!(S::first_index(&storage), Ok(first));
         assert_eq!(S::last_index(&storage), Ok(last));
-- 
GitLab


From 1161519ad56f7b3036597fd901e39fd091fd2751 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 17 Dec 2024 16:54:15 +0300
Subject: [PATCH 3/4] test: fix CI=1 hack for local runs

---
 test/conftest.py | 10 +---------
 1 file changed, 1 insertion(+), 9 deletions(-)

diff --git a/test/conftest.py b/test/conftest.py
index c14e414055..14e2d0bc1a 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -2184,15 +2184,7 @@ class PgClient:
 
 
 def build_profile() -> str:
-    from_env = os.environ.get("BUILD_PROFILE")
-
-    if "CI" in os.environ:
-        # When running in CI BUILD_PROFILE must always be specified, we rely
-        # this in a couple of tests
-        assert from_env is not None, "BUILD_PROFILE must always be set in CI"
-
-    # When running on a developers machine, priorities the usability
-    return from_env or "dev"
+    return os.environ.get("BUILD_PROFILE", "dev")
 
 
 def get_test_dir():
-- 
GitLab


From 0957624d22e769d17c814c1a06c76224aa882bb4 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 24 Dec 2024 15:29:35 +0300
Subject: [PATCH 4/4] refactor: assert index is <= applied in compact_log
 instead of truncating

We used to automatically truncate the index in compact_log if the
caller requested to compact too many entries. As a result, the
requirement of not compacting any unapplied entries was implicit in our
code base, which is bad because it allows bugs to creep in (like the
one fixed a couple of commits ago).

Now, instead of silently adjusting the index of the last compacted
entry, we assert that it's no greater than the applied index. As a
consequence there's also a minor improvement in the
do_raft_log_auto_compaction function.
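
Roughly, the contract moves from the callee to the caller (simplified
sketch, not the actual code; `compact_requested` is an illustrative
stand-in for the callers, e.g. the Lua API wrapper):

    // Sketch (simplified): compact_log only asserts the invariant.
    fn compact_log(up_to: u64, applied: u64) {
        // Requesting compaction past applied + 1 is now a caller-side bug.
        debug_assert!(up_to <= applied + 1, "up_to: {up_to}, applied: {applied}");
        // ... drop raft log entries with index < up_to ...
    }

    // A caller which previously relied on the silent truncation now
    // clamps explicitly before calling compact_log.
    fn compact_requested(requested: u64, applied: u64) {
        let up_to = requested.min(applied + 1);
        compact_log(up_to, applied);
    }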
---
 src/luamod.rs             |  7 +++--
 src/traft/node.rs         |  8 ++---
 src/traft/raft_storage.rs | 63 ++++++++++++++++++++-------------------
 3 files changed, 42 insertions(+), 36 deletions(-)

diff --git a/src/luamod.rs b/src/luamod.rs
index 83991ec050..85c7db4544 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -992,8 +992,11 @@ pub(crate) fn setup() {
         "},
         {
             tlua::function1(|up_to: RaftIndex| -> traft::Result<RaftIndex> {
-                let raft_storage = &node::global()?.raft_storage;
-                let ret = transaction(|| raft_storage.compact_log(up_to, false));
+                let node = node::global()?;
+                let applied = node.get_index();
+                let up_to = up_to.min(applied + 1);
+                let raft_storage = &node.raft_storage;
+                let ret = transaction(|| raft_storage.compact_log(up_to));
                 Ok(ret?)
             })
         },
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 4d861a05e2..624a1d70fb 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -2420,10 +2420,10 @@ impl NodeImpl {
         let mut last_ddl_finalizer = None;
         let mut last_plugin_op_finalizer = None;
 
-        let last_index = self.raft_storage.last_index()?;
+        let last_applied = self.applied.get();
         let newly_added_entries =
             self.raft_storage
-                .entries(old_last_index + 1, last_index + 1, None)?;
+                .entries(old_last_index + 1, last_applied + 1, None)?;
 
         // Check if there's a finalizer (e.g. DdlCommit, Plugin::Abort, etc.)
         // operation among the newly added entries. The finalizers a relied upon
@@ -2456,7 +2456,7 @@ impl NodeImpl {
         }
 
         // Add 1 because this entry is to be removed.
-        let mut compact_until = last_index + 1;
+        let mut compact_until = last_applied + 1;
         if let Some(index) = last_ddl_finalizer {
             if compact_until > index {
                 tlog!(Debug, "preserving ddl finalizer raft op at index {index}");
@@ -2473,7 +2473,7 @@ impl NodeImpl {
 
         transaction(|| -> traft::Result<()> {
             self.main_loop_status("log auto compaction");
-            self.raft_storage.compact_log(compact_until, false)?;
+            self.raft_storage.compact_log(compact_until)?;
 
             Ok(())
         })?;
diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs
index 9ace28dc49..a10a536b24 100644
--- a/src/traft/raft_storage.rs
+++ b/src/traft/raft_storage.rs
@@ -438,9 +438,20 @@ impl RaftSpaceAccess {
     ///
     pub fn handle_snapshot_metadata(&self, meta: &raft::SnapshotMetadata) -> tarantool::Result<()> {
         let meta_index = meta.index;
+
+        #[cfg(debug_assertions)]
+        let applied = self.applied()?;
+        #[cfg(debug_assertions)]
+        #[rustfmt::skip]
+        debug_assert!(meta_index >= applied, "meta_index: {meta_index}, applied: {applied}");
+        // Must bump the applied index before doing log compaction,
+        // because log compaction only removes applied entries
+        self.persist_applied(meta_index)?;
+        self.persist_conf_state(meta.get_conf_state())?;
+
         // We don't want to have a hole in the log, so we clear everything
         // before applying the snapshot
-        self.compact_log(meta.index + 1, true)?;
+        self.compact_log(meta.index + 1)?;
 
         let compacted_index = self.compacted_index()?;
         #[rustfmt::skip]
@@ -449,12 +460,6 @@ impl RaftSpaceAccess {
         // the coordinates of the last entry which was in our log.
         self.persist_compacted_term(meta.term)?;
         self.persist_compacted_index(meta.index)?;
-
-        let applied = self.applied()?;
-        #[rustfmt::skip]
-        debug_assert!(meta_index >= applied, "meta_index: {meta_index}, applied: {applied}");
-        self.persist_applied(meta.index)?;
-        self.persist_conf_state(meta.get_conf_state())?;
         Ok(())
     }
 
@@ -502,9 +507,13 @@ impl RaftSpaceAccess {
         Ok(count as _)
     }
 
-    /// Trims raft log up to the given index (excluding the index
+    /// Trims raft log up to the given index `up_to` (excluding the index
     /// itself).
     ///
+    /// Note that `up_to` must not be greater than current applied index + 1,
+    /// because it doesn't make sense to compact entries which haven't been
+    /// applied yet.
+    ///
     /// Returns the new `first_index` after log compaction. It may
     /// differ from the requested one if the corresponding entry doesn't
     /// exist (either `up_to > last_index+1` or `up_to < first_index`).
@@ -514,24 +523,20 @@ impl RaftSpaceAccess {
     /// raft-state values, so it **should be invoked within a
     /// transaction**.
     ///
-    /// If `for_snapshot` is `false` the `up_to` index is adjusted so that we
-    /// don't compact any entries which haven't been applied yet.
-    ///
     /// # Panics
     ///
-    /// In debug mode panics if invoked out of a transaction.
+    /// In debug mode panics in case:
+    /// - invoked outside of a transaction
+    /// - `up_to > applied + 1`
     ///
-    pub fn compact_log(&self, mut up_to: RaftIndex, for_snapshot: bool) -> tarantool::Result<u64> {
+    pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> {
         debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
-        if for_snapshot {
-            // Ok to delete unapplied entries in case of snapshot,
-            // because snapshot already contains the applied state.
-        } else {
-            // We cannot drop entries, which weren't applied yet
-            let applied = self.applied()?;
-            up_to = up_to.min(applied + 1);
-        }
+        // We cannot drop entries, which weren't applied yet
+        #[cfg(debug_assertions)]
+        let applied = self.applied()?;
+        #[cfg(debug_assertions)]
+        debug_assert!(up_to <= applied + 1, "up_to: {up_to}, applied: {applied}");
 
         // IteratorType::LT means tuples are returned in descending order
         let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?;
@@ -1006,7 +1011,7 @@ mod tests {
         storage.persist_applied(applied).unwrap();
         let entries =
             |lo, hi| S::entries(&storage, lo, hi, u64::MAX, GetEntriesContext::empty(false));
-        let compact_log = |up_to| transaction(|| storage.compact_log(up_to, false));
+        let compact_log = |up_to| transaction(|| storage.compact_log(up_to));
 
         assert_eq!(S::first_index(&storage), Ok(first));
         assert_eq!(S::last_index(&storage), Ok(last));
@@ -1036,14 +1041,12 @@ mod tests {
         assert_eq!(compact_log(0).unwrap(), first);
         assert_eq!(compact_log(first).unwrap(), first);
 
-        // cannot compact past applied
-        assert_eq!(compact_log(applied + 2).unwrap(), applied + 1);
-
-        storage.persist_applied(last).unwrap();
-
-        // trim to the end
-        assert_eq!(compact_log(u64::MAX).unwrap(), last + 1);
-        assert_eq!(storage.space_raft_log.len().unwrap(), 0);
+        // cannot compact past applied, this call will panic
+        // Note that we could use `#[tarantool::test(should_panic)]` but in this
+        // case a panic would happen inside a tarantool transaction, and
+        // unwinding from within it will leave an unfinished transaction, which
+        // we don't want to have
+        // _ = compact_log(applied + 2);
     }
 
     #[::tarantool::test]
-- 
GitLab