From a9a48cbdb7bcf6283a1963a8174994bed6ec14f5 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 28 Nov 2023 19:13:05 +0300
Subject: [PATCH] fix: don't compact unapplied entries

---
 src/error_injection.rs      | 17 +++++++++++++++++
 src/traft/node.rs           |  2 ++
 src/traft/raft_storage.rs   | 14 ++++++++++----
 test/int/test_compaction.py | 29 ++++++++++++++++++++++++++++-
 4 files changed, 57 insertions(+), 5 deletions(-)

diff --git a/src/error_injection.rs b/src/error_injection.rs
index d43607e41b..0f6ef7fafc 100644
--- a/src/error_injection.rs
+++ b/src/error_injection.rs
@@ -43,4 +43,21 @@ macro_rules! error_injection {
             $crate::tarantool::exit(69);
         };
     };
+    (block $error:expr) => {{
+        let error = $error;
+        #[rustfmt::skip]
+        if $crate::error_injection::is_enabled(error) {
+            $crate::tlog!(Info, "################################################################");
+            $crate::tlog!(Info, "ERROR INJECTED '{}': BLOCKING", error);
+            $crate::tlog!(Info, "################################################################");
+
+            while $crate::error_injection::is_enabled(error) {
+                ::tarantool::fiber::sleep(::std::time::Duration::from_millis(100));
+            }
+
+            $crate::tlog!(Info, "################################################################");
+            $crate::tlog!(Info, "ERROR UNINJECTED '{}': UNBLOCKING", error);
+            $crate::tlog!(Info, "################################################################");
+        };
+    }};
 }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index ff84def900..159974ef0d 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1645,6 +1645,8 @@ impl NodeImpl {
                 tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
                 panic!("transaction failed: {e}");
             }
+
+            crate::error_injection!(block "BLOCK_AFTER_PERSIST_COMMIT_INDEX");
         }
 
         // Apply committed entries.
diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs
index a37e94ecb2..634d384dd2 100644
--- a/src/traft/raft_storage.rs
+++ b/src/traft/raft_storage.rs
@@ -8,6 +8,7 @@ use std::cmp::Ordering;
 use std::convert::TryFrom as _;
 
 use crate::instance::InstanceId;
+use crate::tlog;
 use crate::traft;
 use crate::traft::RaftEntryId;
 use crate::traft::RaftId;
@@ -455,9 +456,9 @@ impl RaftSpaceAccess {
     pub fn compact_log(&self, up_to: RaftIndex) -> tarantool::Result<u64> {
         debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
 
-        // We cannot drop entries, which weren't commited yet
-        let commit = self.commit()?;
-        let up_to = up_to.min(commit + 1);
+        // We cannot drop entries, which weren't applied yet
+        let applied = self.applied()?;
+        let up_to = up_to.min(applied + 1);
 
         // IteratorType::LT means tuples are returned in descending order
         let mut iter = self.space_raft_log.select(IteratorType::LT, &(up_to,))?;
@@ -477,14 +478,19 @@ impl RaftSpaceAccess {
         self.persist_compacted_index(compacted_index)?;
         self.persist_compacted_term(compacted_term)?;
 
+        let mut num_deleted = 1;
         for tuple in iter {
             let index = tuple
                 .field::<RaftIndex>(Self::FIELD_ENTRY_INDEX)?
                 .expect("index is non-nullable");
             self.space_raft_log.delete(&(index,))?;
+            num_deleted += 1;
         }
 
-        Ok(1 + compacted_index)
+        let new_first_index = 1 + compacted_index;
+        tlog!(Info, "compacted raft log: deleted {num_deleted} entries, new first entry index is {new_first_index}");
+
+        Ok(new_first_index)
     }
 }
 
diff --git a/test/int/test_compaction.py b/test/int/test_compaction.py
index 517eb674cb..29435266c5 100644
--- a/test/int/test_compaction.py
+++ b/test/int/test_compaction.py
@@ -1,4 +1,4 @@
-from conftest import Instance
+from conftest import Instance, Retriable
 
 
 def test_compaction(instance: Instance):
@@ -33,3 +33,30 @@ def test_compaction(instance: Instance):
     # Check idempotency
     assert instance.raft_compact_log(2) == applied_index + 1
     assert raft_state("compacted_index") == applied_index
+
+
+def raft_wait_commit_index(i: Instance, expected, timeout=1):
+    def make_attempt():
+        commit = i.eval("return box.space._raft_state:get('commit').value")
+        assert commit == expected
+
+    Retriable(timeout=timeout, rps=10).call(make_attempt)
+
+
+def test_unapplied_entries_arent_compacted(instance: Instance):
+    i1 = instance
+
+    i1.call("pico._inject_error", "BLOCK_AFTER_PERSIST_COMMIT_INDEX", True)
+
+    # Propose a raft log entry and wait for it to be committed, but not applied.
+    index = i1.cas("insert", "_pico_property", ["dont", "compact", "me"])
+    raft_wait_commit_index(i1, index)
+
+    assert i1.call("box.space._pico_property:get", "dont") is None
+
+    first = i1.raft_compact_log()
+    assert first == index
+
+    i1.call("pico._inject_error", "BLOCK_AFTER_PERSIST_COMMIT_INDEX", False)
+    i1.raft_wait_index(index)
+    assert i1.call("box.space._pico_property:get", "dont") == ["dont", "compact", "me"]
-- 
GitLab