From 3261c56225cf30c20274ba8abf798ed72ec75c35 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 28 Nov 2023 21:00:46 +0300
Subject: [PATCH] test: implement basic error injection & add some tests for
 main loop

---
 .gitlab-ci.yml          |  2 +-
 Cargo.toml              |  1 +
 src/error_injection.rs  | 46 +++++++++++++++++++++++++++++++
 src/lib.rs              |  1 +
 src/luamod.rs           | 19 +++++++++++++
 src/traft/node.rs       | 11 ++++++++
 test/conftest.py        | 36 ++++++++++++++++++++-----
 test/int/test_couple.py | 60 +++++++++++++++++++++++++++++++++++++++++
 8 files changed, 168 insertions(+), 8 deletions(-)
 create mode 100644 src/error_injection.rs

diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml
index 8aed0f5f83..942cfb9e0f 100644
--- a/.gitlab-ci.yml
+++ b/.gitlab-ci.yml
@@ -227,7 +227,7 @@ test-mac-m1:
 
     - cargo fmt -- -v --check
     - cargo clippy --version
-    - cargo clippy --features webui -- --deny clippy::all --no-deps
+    - cargo clippy --all-features -- --deny clippy::all --no-deps
     # - |
     #   # Pipenv install
     #   ci-log-section start "pipenv-install" Installing pip dependencies ...
diff --git a/Cargo.toml b/Cargo.toml
index f762f3fe9e..e1228884dd 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -73,6 +73,7 @@ nix = { git = "https://git.picodata.io/picodata/picodata/nix.git", branch = "fix
 [features]
 load_test = []
 webui = []
+error_injection = []
 
 [lib]
 doctest = true
diff --git a/src/error_injection.rs b/src/error_injection.rs
new file mode 100644
index 0000000000..d43607e41b
--- /dev/null
+++ b/src/error_injection.rs
@@ -0,0 +1,46 @@
+use std::cell::RefCell;
+use std::collections::HashSet;
+
+thread_local! {
+    /// Names of the injected errors currently enabled on this thread.
+    static INJECTED_ERRORS: RefCell<HashSet<String>> = RefCell::default();
+}
+
+/// Enables (or disables, if `enable` is `false`) the injected error `error`.
+pub fn enable(error: &str, enable: bool) {
+    INJECTED_ERRORS.with(|errors| {
+        let mut errors = errors.borrow_mut();
+        if enable {
+            errors.insert(error.into());
+        } else {
+            errors.remove(error);
+        }
+    });
+}
+
+/// Returns `true` if the injected error `error` is enabled on this thread.
+pub fn is_enabled(error: &str) -> bool {
+    INJECTED_ERRORS.with(|errors| errors.borrow().contains(error))
+}
+
+/// Expands to nothing: the `error_injection` feature is disabled.
+#[cfg(not(feature = "error_injection"))]
+#[macro_export]
+macro_rules! error_injection {
+    ($($t:tt)*) => {};
+}
+
+/// `error_injection!(exit NAME)`: exit with code 69 if error `NAME` is enabled.
+#[cfg(feature = "error_injection")]
+#[macro_export]
+macro_rules! error_injection {
+    (exit $error:expr) => {
+        #[rustfmt::skip]
+        if $crate::error_injection::is_enabled($error) {
+            $crate::tlog!(Info, "################################################################");
+            $crate::tlog!(Info, "ERROR INJECTED '{}': EXITING", $error);
+            $crate::tlog!(Info, "################################################################");
+            $crate::tarantool::exit(69);
+        };
+    };
+}
diff --git a/src/lib.rs b/src/lib.rs
index e312d5440d..5b1d73b247 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -36,6 +36,7 @@ mod bootstrap_entries;
 pub mod cas;
 pub mod cli;
 pub mod discovery;
+pub mod error_injection;
 pub mod failure_domain;
 pub mod governor;
 pub mod instance;
diff --git a/src/luamod.rs b/src/luamod.rs
index fbdcc2808d..af93531365 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -1446,4 +1446,23 @@ pub(crate) fn setup(args: &args::Run) {
             crate::traft::error::is_retriable_error_message(&msg)
         }),
     );
+
+    #[cfg(feature = "error_injection")]
+    luamod_set(
+        &l,
+        "_inject_error",
+        indoc! {"
+        pico._inject_error(error, enable)
+
+        Internal API, see src/luamod.rs for the details.
+
+        Params:
+
+            1. error (string)
+            2. enable (bool)
+        "},
+        tlua::Function::new(|error: String, enable: bool| {
+            crate::error_injection::enable(&error, enable);
+        }),
+    );
 }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index af369367cf..2c62d951cf 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1592,6 +1592,13 @@ impl NodeImpl {
                     .send(new_applied)
                     .expect("applied shouldn't ever be borrowed across yields");
             }
+
+            if hard_state.is_some() {
+                crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
+            }
+            if !entries_to_persist.is_empty() {
+                crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_ENTRIES");
+            }
         }
 
         // Apply committed entries.
@@ -1602,6 +1609,8 @@ impl NodeImpl {
                 tlog!(Warning, "dropping raft ready: {ready:#?}");
                 panic!("transaction failed: {e}");
             }
+
+            crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
         }
 
         // These messages are only available on followers. They must be sent only
@@ -1646,6 +1655,8 @@ impl NodeImpl {
             if let Err(e) = res {
                 panic!("transaction failed: {e}");
             }
+
+            crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
         }
 
         // Advance the apply index.
diff --git a/test/conftest.py b/test/conftest.py
index 7463a29795..d37e65f22c 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -679,6 +679,19 @@ class Instance:
             self.kill()
             raise e from e
 
+    def wait_process_stopped(self, timeout: int = 5):
+        """Wait for the process to exit; `timeout` is passed to `process.wait`."""
+        if self.process is None:
+            return
+
+        # FIXME: copy-pasted from above
+        self.process.wait(timeout)
+        # Wait for all the output to be handled in the separate threads
+        # NOTE: `timeout` does not apply to this loop
+        while not self.process.stdout.closed or not self.process.stderr.closed:  # type: ignore
+            time.sleep(0.1)
+        self.process = None
+
     def restart(self, kill: bool = False, remove_data: bool = False):
         if kill:
             self.kill()
@@ -733,6 +746,7 @@ class Instance:
             case _:
                 raise TypeError("space must be str or int")
 
+    # FIXME: this method's parameters are out of sync with Cluster.cas
     def cas(
         self,
         op_kind: Literal["insert", "replace", "delete"],
@@ -968,12 +982,18 @@ class Instance:
         See `crate::traft::node::Node::wait_index`.
         """
 
-        return self.call(
-            "pico.raft_wait_index",
-            target,
-            timeout,  # this timeout is passed as an argument
-            timeout=timeout + 1,  # this timeout is for network call
-        )
+        def make_attempt():
+            return self.call(
+                "pico.raft_wait_index",
+                target,
+                timeout,  # this timeout is passed as an argument
+                timeout=timeout + 1,  # this timeout is for network call
+            )
+
+        index = Retriable(timeout=timeout + 1, rps=10).call(make_attempt)
+
+        assert index is not None
+        return index
 
     def get_vclock(self) -> int:
         """Get current vclock"""
@@ -1347,7 +1367,9 @@ class PortalStorage:
 @pytest.fixture(scope="session")
 def binary_path() -> str:
     """Path to the picodata binary, e.g. "./target/debug/picodata"."""
-    assert subprocess.call(["cargo", "build"]) == 0, "cargo build failed"
+    assert (
+        subprocess.call(["cargo", "build", "--features", "error_injection"]) == 0
+    ), "cargo build failed"
     metadata = subprocess.check_output(["cargo", "metadata", "--format-version=1"])
     target = json.loads(metadata)["target_directory"]
     return os.path.realpath(os.path.join(target, "debug/picodata"))
diff --git a/test/int/test_couple.py b/test/int/test_couple.py
index 3e75687e8c..0674e7a385 100644
--- a/test/int/test_couple.py
+++ b/test/int/test_couple.py
@@ -119,3 +119,63 @@ def test_restart_both(cluster2: Cluster):
     i2.raft_wait_index(index)
     assert i1.eval("return box.space._pico_property:get('check')")[1] is True
     assert i2.eval("return box.space._pico_property:get('check')")[1] is True
+
+
+def test_exit_after_persist_before_commit(cluster2: Cluster):
+    """i2 exits right after persisting raft entries, restarts and catches up."""
+    [i1, i2] = cluster2.instances
+    # Make sure i1 is raft leader
+    i1.promote_or_fail()
+
+    i2.call("pico._inject_error", "EXIT_AFTER_RAFT_PERSISTS_ENTRIES", True)
+
+    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])
+
+    # The injected error forces the instance to exit right after persisting the
+    # raft log entries, but before committing them to raft.
+    i2.wait_process_stopped()
+
+    # Instance restarts successfully and the entry is eventually applied.
+    i2.start()
+    i2.raft_wait_index(index)
+    assert i2.eval("return box.space._pico_property:get('foo').value") == "bar"
+
+
+def test_exit_after_commit_before_apply(cluster2: Cluster):
+    """i2 exits right after persisting the hard state, restarts and catches up."""
+    [i1, i2] = cluster2.instances
+    # Make sure i1 is raft leader
+    i1.promote_or_fail()
+
+    i2.call("pico._inject_error", "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE", True)
+
+    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])
+
+    # The injected error forces the instance to exit right after persisting the
+    # hard state, but before applying the entries to the local storage.
+    i2.wait_process_stopped()
+
+    # Instance restarts successfully and the entry is eventually applied.
+    i2.start()
+    i2.raft_wait_index(index)
+    assert i2.eval("return box.space._pico_property:get('foo').value") == "bar"
+
+
+def test_exit_after_apply(cluster2: Cluster):
+    """i2 exits right after handling committed entries, then restarts cleanly."""
+    [i1, i2] = cluster2.instances
+    # Make sure i1 is raft leader
+    i1.promote_or_fail()
+
+    i2.call("pico._inject_error", "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES", True)
+
+    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])
+
+    # The injected error forces the instance to exit right after applying the
+    # committed entry, but before letting raft-rs know about it.
+    i2.wait_process_stopped()
+
+    # Instance restarts successfully and the applied entry is still in place.
+    i2.start()
+    i2.raft_wait_index(index)
+    assert i2.eval("return box.space._pico_property:get('foo').value") == "bar"
-- 
GitLab