Skip to content
Snippets Groups Projects
Commit ba555088 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

test: implement basic error injection & add some tests for main loop

parent ca3a811a
No related branches found
No related tags found
No related merge requests found
......@@ -73,6 +73,7 @@ nix = { git = "https://git.picodata.io/picodata/picodata/nix.git", branch = "fix
[features]
load_test = []
webui = []
error_injection = []
[lib]
doctest = true
......
use std::collections::HashSet;
/// Names of the errors which are currently injected (enabled).
///
/// Holds `None` until the first error is enabled. The set is guarded by a
/// mutex instead of being a `static mut`, so no `unsafe` code is needed and
/// the functions below stay sound no matter which thread calls them.
static INJECTED_ERRORS: std::sync::Mutex<Option<HashSet<String>>> = std::sync::Mutex::new(None);

/// Locks the registry of injected errors.
///
/// A poisoned lock is recovered rather than propagated: the guarded value is
/// a plain set of names, which cannot be left in an inconsistent state by a
/// panicking holder.
fn injected_errors() -> std::sync::MutexGuard<'static, Option<HashSet<String>>> {
    INJECTED_ERRORS
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
}

/// Enables or disables the injected error with the given name.
///
/// * `error` - name of the error, e.g. `"EXIT_AFTER_RAFT_PERSISTS_ENTRIES"`.
/// * `enable` - `true` to start injecting the error, `false` to stop.
///
/// Enabling an already enabled error, or disabling an error which was never
/// enabled, is a no-op.
pub fn enable(error: &str, enable: bool) {
    // The lock is only held for the duration of this single set operation.
    let mut registry = injected_errors();
    if enable {
        registry
            .get_or_insert_with(HashSet::new)
            .insert(error.to_owned());
    } else if let Some(enabled) = registry.as_mut() {
        enabled.remove(error);
    }
}

/// Returns `true` if the error with the given name is currently enabled.
///
/// Returns `false` for any name as long as no error has ever been enabled.
pub fn is_enabled(error: &str) -> bool {
    let registry = injected_errors();
    registry
        .as_ref()
        .map_or(false, |enabled| enabled.contains(error))
}
/// No-op flavour of `error_injection!`, used when the `error_injection`
/// cargo feature is turned off.
///
/// Accepts an arbitrary token sequence and expands to nothing, so the call
/// sites compile unchanged and add no code to the build.
#[cfg(not(feature = "error_injection"))]
#[macro_export]
macro_rules! error_injection {
    // Swallow whatever was passed: error injection is disabled.
    ($($ignored:tt)*) => {};
}
/// Active flavour of `error_injection!`, compiled only when the
/// `error_injection` cargo feature is enabled.
///
/// The only supported form is `error_injection!(exit <error>)`: if
/// `$crate::error_injection::is_enabled(<error>)` returns `true`, a banner is
/// logged at `Info` level and `$crate::tarantool::exit(69)` is called.
/// Otherwise the expansion does nothing.
///
/// Note that `$error` is expanded twice (once for the check and once for the
/// log message), so it should be a side-effect free expression such as a
/// string literal.
#[cfg(feature = "error_injection")]
#[macro_export]
macro_rules! error_injection {
// `exit` form: terminate the process if the named error is enabled.
(exit $error:expr) => {
#[rustfmt::skip]
if $crate::error_injection::is_enabled($error) {
// Make the injected exit easy to spot in the log.
$crate::tlog!(Info, "################################################################");
$crate::tlog!(Info, "ERROR INJECTED '{}': EXITING", $error);
$crate::tlog!(Info, "################################################################");
// NOTE(review): 69 looks like an arbitrary marker exit code; confirm that nothing depends on its value.
$crate::tarantool::exit(69);
};
};
}
......@@ -36,6 +36,7 @@ mod bootstrap_entries;
pub mod cas;
pub mod cli;
pub mod discovery;
pub mod error_injection;
pub mod failure_domain;
pub mod governor;
pub mod instance;
......
......@@ -1446,4 +1446,23 @@ pub(crate) fn setup(args: &args::Run) {
crate::traft::error::is_retriable_error_message(&msg)
}),
);
#[cfg(feature = "error_injection")]
luamod_set(
&l,
"_inject_error",
indoc! {"
pico._inject_error(error, enable)
Internal API, see src/luamod.rs for the details.
Params:
1. error (string)
2. enable (bool)
"},
tlua::Function::new(|error: String, enable: bool| {
crate::error_injection::enable(&error, enable);
}),
);
}
......@@ -1592,6 +1592,13 @@ impl NodeImpl {
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
if hard_state.is_some() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
}
if !entries_to_persist.is_empty() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_ENTRIES");
}
}
// Apply committed entries.
......@@ -1602,6 +1609,8 @@ impl NodeImpl {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
}
// These messages are only available on followers. They must be sent only
......@@ -1646,6 +1655,8 @@ impl NodeImpl {
if let Err(e) = res {
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
}
// Advance the apply index.
......
......@@ -679,6 +679,19 @@ class Instance:
self.kill()
raise e from e
def wait_process_stopped(self, timeout: int = 5):
    """Wait until the instance's process exits on its own.

    Does nothing if there is no process attached to this instance.
    Otherwise waits up to `timeout` for the process to exit, then waits
    until both of its output streams are closed, and finally forgets
    the process by resetting `self.process` to `None`.
    """
    proc = self.process
    if proc is not None:
        # FIXME: copy-pasted from above
        proc.wait(timeout)

        # The output is handled in separate threads, so poll until both
        # streams are reported as closed.
        while not (proc.stdout.closed and proc.stderr.closed):  # type: ignore
            time.sleep(0.1)

        self.process = None
def restart(self, kill: bool = False, remove_data: bool = False):
if kill:
self.kill()
......@@ -733,6 +746,7 @@ class Instance:
case _:
raise TypeError("space must be str or int")
# FIXME: this method's parameters are out of sync with Cluster.cas
def cas(
self,
op_kind: Literal["insert", "replace", "delete"],
......@@ -968,12 +982,21 @@ class Instance:
See `crate::traft::node::Node::wait_index`.
"""
return self.call(
"pico.raft_wait_index",
target,
timeout, # this timeout is passed as an argument
timeout=timeout + 1, # this timeout is for network call
)
result = None
def make_attempt():
nonlocal result
result = self.call(
"pico.raft_wait_index",
target,
timeout, # this timeout is passed as an argument
timeout=timeout + 1, # this timeout is for network call
)
Retriable(timeout=timeout + 1, rps=10).call(make_attempt)
assert result is not None
return result
def get_vclock(self) -> int:
"""Get current vclock"""
......@@ -1347,7 +1370,9 @@ class PortalStorage:
# Session-scoped fixture: the binary is built once and its path is reused.
@pytest.fixture(scope="session")
def binary_path() -> str:
"""Path to the picodata binary, e.g. "./target/debug/picodata"."""
# NOTE(review): both a plain `cargo build` and a build with the
# `error_injection` feature appear here. The first one looks like the
# removed line of the diff this text was rendered from -- confirm that only
# the feature-enabled build is meant to remain.
assert subprocess.call(["cargo", "build"]) == 0, "cargo build failed"
# Build with error injection compiled in, so tests can call `pico._inject_error`.
assert (
subprocess.call(["cargo", "build", "--features", "error_injection"]) == 0
), "cargo build failed"
# Ask cargo for its target directory instead of hard-coding "./target".
metadata = subprocess.check_output(["cargo", "metadata", "--format-version=1"])
target = json.loads(metadata)["target_directory"]
return os.path.realpath(os.path.join(target, "debug/picodata"))
......
......@@ -119,3 +119,63 @@ def test_restart_both(cluster2: Cluster):
i2.raft_wait_index(index)
assert i1.eval("return box.space._pico_property:get('check')")[1] is True
assert i2.eval("return box.space._pico_property:get('check')")[1] is True
def test_exit_after_persist_before_commit(cluster2: Cluster):
    """An instance which exits right after persisting raft log entries
    restarts successfully and eventually applies the pending entry."""
    leader, victim = cluster2.instances

    # The first instance must be the raft leader.
    leader.promote_or_fail()

    victim.call("pico._inject_error", "EXIT_AFTER_RAFT_PERSISTS_ENTRIES", True)

    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])

    # Because of the injected error the instance exits as soon as the raft
    # log entries are persisted, i.e. before they are committed to raft.
    victim.wait_process_stopped()

    # After a restart the instance catches up and applies the entry.
    victim.start()
    victim.raft_wait_index(index)

    stored = victim.eval("return box.space._pico_property:get('foo').value")
    assert stored == "bar"
def test_exit_after_commit_before_apply(cluster2: Cluster):
    """An instance which exits right after persisting the raft hard state
    restarts successfully and eventually applies the pending entry."""
    leader, victim = cluster2.instances

    # The first instance must be the raft leader.
    leader.promote_or_fail()

    victim.call("pico._inject_error", "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE", True)

    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])

    # Because of the injected error the instance exits as soon as the hard
    # state is persisted, i.e. before the entries reach the local storage.
    victim.wait_process_stopped()

    # After a restart the instance catches up and applies the entry.
    victim.start()
    victim.raft_wait_index(index)

    stored = victim.eval("return box.space._pico_property:get('foo').value")
    assert stored == "bar"
def test_exit_after_apply(cluster2: Cluster):
    """An instance which exits right after applying a committed entry
    restarts successfully and still has the entry applied."""
    leader, victim = cluster2.instances

    # The first instance must be the raft leader.
    leader.promote_or_fail()

    victim.call(
        "pico._inject_error", "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES", True
    )

    index = cluster2.cas("insert", "_pico_property", ["foo", "bar"])

    # Because of the injected error the instance exits as soon as the
    # committed entry is applied, i.e. before raft-rs is told about it.
    victim.wait_process_stopped()

    # After a restart the instance catches up and the entry is in place.
    victim.start()
    victim.raft_wait_index(index)

    stored = victim.eval("return box.space._pico_property:get('foo').value")
    assert stored == "bar"
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment