diff --git a/test/int/conftest.py b/test/int/conftest.py index 132d9ac571530ea11619b7f35d786d3853ac46ed..4377349b09a43a51fd3a426ce40533a54c2d3e09 100644 --- a/test/int/conftest.py +++ b/test/int/conftest.py @@ -1,308 +1,365 @@ -from contextlib import contextmanager, suppress -from dataclasses import dataclass -from functools import cached_property -import itertools -import json import os -from pathlib import Path +import re +import funcy # type: ignore +import pytest import signal -import socket import subprocess -import time -import pytest -import tarantool -from filelock import FileLock +from shutil import rmtree +from typing import Generator +from pathlib import Path +from contextlib import contextmanager, suppress +from dataclasses import dataclass, field +from tarantool.connection import Connection # type: ignore +from tarantool.error import ( # type: ignore + tnt_strerror, + DatabaseError, +) -@pytest.fixture(scope="session") -def session_data_mutex(tmp_path_factory, worker_id): +# From raft.rs: +# A constant represents invalid id of raft. +# pub const INVALID_ID: u64 = 0; +INVALID_ID = 0 + + +re_xdist_worker_id = re.compile(r"^gw(\d+)$") + + +def xdist_worker_number(worker_id: str) -> int: """ - Returns a context manager with a mutex-guarded session-scoped dict to use - in parallel tests with multiprocessing. + Identify xdist worker by an integer instead of a string. + This is used for parallel testing. + See also: https://pypi.org/project/pytest-xdist/ """ - parallel_test_run = worker_id != "master" + if worker_id == "master": + return 0 - if parallel_test_run: - common_dir_for_each_pytest_run = tmp_path_factory.getbasetemp().parent - data_file: Path = common_dir_for_each_pytest_run / "data.json" - lock_file: Path = data_file.with_suffix(".lock") + match = re_xdist_worker_id.match(worker_id) + if not match: + raise ValueError(worker_id) - @contextmanager - def lock_session_data() -> dict: - with FileLock(lock_file): - if data_file.is_file(): - data = json.loads(data_file.read_text()) - else: - data = {} - yield data - data_file.write_text(json.dumps(data)) + return int(match.group(1)) - return lock_session_data - else: - data = {} - @contextmanager - def d(): - yield data +class TarantoolError(Exception): + """ + Raised when Tarantool responds with an IPROTO_ERROR. + """ - return d + pass -@pytest.fixture -def run_id(session_data_mutex): - with session_data_mutex() as data: - prev_run_id = data.get("run_id", 0) - run_id = prev_run_id + 1 - data["run_id"] = run_id - return run_id +class ReturnError(Exception): + """ + Raised when Tarantool returns `nil, err`. + """ + + pass + + +class MalformedAPI(Exception): + """ + Raised when Tarantool returns some data (IPROTO_OK), + but it's neither `return value` nor `return nil, err`. + + The actual returned data is contained in `self.args`. + """ + + pass + + +def normalize_net_box_result(func): + """ + Convert lua-style responses to be more python-like. + This also fixes some of the connector API inconveniences. + """ + + def inner(*args, **kwargs): + try: + result = func(*args, **kwargs) + except DatabaseError as exc: + if hasattr(exc, "errno"): + # Error handling in Tarantool connector is awful. + # It wraps NetworkError in DatabaseError. + # We, instead, convert it to a native OSError. + strerror = os.strerror(exc.errno) + raise OSError(exc.errno, strerror) from exc + + match exc.args: + case (int(code), arg): + # Error handling in Tarantool connector is awful. + # It returns error codes as raw numbers. 
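+                    # (tnt_strerror() maps such a raw code back to its
+                    # symbolic name, e.g. "ER_PROC_LUA".)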
+                    # Here we format them for easier use in pytest assertions.
+                    raise TarantoolError(tnt_strerror(code)[0], arg) from exc
+                case _:
+                    raise exc from exc
+
+        match result.data:
+            case []:
+                return None
+            case [x]:
+                return x
+            case [None, str(err)]:
+                raise ReturnError(err)
+            case ret:
+                raise MalformedAPI(*ret)
+
+    return inner
+
+
+@dataclass(eq=False, frozen=True)
+class RaftStatus:
+    id: int
+    raft_state: str
+    leader_id: int
+
+    def __eq__(self, other):
+        match other:
+            # assert status == "Follower"
+            # assert status == "Candidate"
+            # assert status == "Leader"
+            # assert status == "PreCandidate"
+            case str(raft_state):
+                return self.raft_state == raft_state
+            # assert status == ("Follower", 1)
+            case (str(raft_state), int(leader_id)):
+                return self.raft_state == raft_state and self.leader_id == leader_id
+            case RaftStatus(_) as other:
+                return (
+                    self.id == other.id
+                    and self.raft_state == other.raft_state
+                    and self.leader_id == other.leader_id
+                )
+
+        return False
+
+    def is_ready(self):
+        return self.leader_id > INVALID_ID
 
 
 @dataclass
 class Instance:
-    id: str
-    cluster_id: str
-    peers: list[str]
-    listen: str
-    address: tuple[str, int]
-    workdir: str
     binary_path: str
-    env: dict[str, str] | None = None
-    replicaset_id: str | None = None
+
+    instance_id: str
+    data_dir: str
+    peers: list[str]
+    host: str
+    port: int
+
+    env: dict[str, str] = field(default_factory=dict)
     process: subprocess.Popen | None = None
+    raft_id: int = INVALID_ID
+
+    @property
+    def listen(self):
+        return f"{self.host}:{self.port}"
 
     @property
     def command(self):
         # fmt: off
         return [
             self.binary_path, "run",
-            "--cluster-id", self.cluster_id,
-            "--instance-id", self.id,
+            "--instance-id", self.instance_id,
+            "--data-dir", self.data_dir,
             "--listen", self.listen,
-            "--peer", ','.join(self.peers),
-            # "--data-dir", self.workdir
+            "--peer", ','.join(self.peers)
         ]
         # fmt: on
 
+    def __repr__(self):
+        return f"Instance({self.instance_id}, listen={self.listen})"
+
     @contextmanager
-    def connection(self):
-        c = tarantool.connect(*self.address)
+    def connection(self, timeout: float):
+        c = Connection(
+            self.host,
+            self.port,
+            socket_timeout=timeout,
+            connection_timeout=timeout,
+        )
         try:
             yield c
         finally:
             c.close()
 
-    def _normalize_net_box_result(self, result):
-        match result.data:
-            case []:
-                return
-            case [x]:
-                return x
-            case [x, *rest]:
-                return (x, *rest)
+    @normalize_net_box_result
+    def call(self, fn, *args, timeout: float = 1):
+        with self.connection(timeout) as conn:
+            return conn.call(fn, args)
 
-    def call(self, fn, *args):
-        with self.connection() as conn:
-            return self._normalize_net_box_result(conn.call(fn, args))
+    @normalize_net_box_result
+    def eval(self, expr, *args, timeout: float = 1):
+        with self.connection(timeout) as conn:
+            return conn.eval(expr, *args)
 
-    def eval(self, expr, *args):
-        with self.connection() as conn:
-            return self._normalize_net_box_result(conn.eval(expr, *args))
+    def killpg(self):
+        """Kill the entire process group"""
+        if self.process is None:
+            # Be idempotent
+            return
 
-    def kill(self):
         pid = self.process.pid
         with suppress(ProcessLookupError, PermissionError):
             os.killpg(pid, signal.SIGKILL)
-            print("killed:,", self)
         with suppress(ChildProcessError):
             os.waitpid(pid, 0)
+        print(f"killed: {self}")
+        self.process = None
+
+    def terminate(self, kill_after_seconds=10):
+        """Terminate the instance gracefully with SIGTERM"""
+        if self.process is None:
+            # Be idempotent
+            return
+
+        # Wake the process up in case it was stopped (SIGSTOP),
+        # otherwise it couldn't handle the SIGTERM
+        with suppress(ProcessLookupError, PermissionError):
+            os.killpg(self.process.pid, signal.SIGCONT)
 
-    def stop(self, kill_after_seconds=10) -> int:
         self.process.terminate()
+
         try:
-            try:
-                return self.process.wait(kill_after_seconds)
-            except subprocess.TimeoutExpired:
-                self.kill()
-                return -1
+            return self.process.wait(timeout=kill_after_seconds)
         finally:
-            self.kill()
+            self.killpg()
 
     def start(self):
+        if self.process:
+            # Be idempotent
+            return
+
         self.process = subprocess.Popen(
             self.command,
-            cwd=self.workdir,
             env=self.env or {},
             stdin=subprocess.DEVNULL,
             start_new_session=True,
         )
+        # Assert a new process group is created
         assert os.getpgid(self.process.pid) == self.process.pid
-        wait_tcp_port(*self.address)
 
-    def restart(self, kill=False):
-        if kill:
-            self.kill()
+    def restart(self, killpg: bool = False, drop_db: bool = False):
+        if killpg:
+            self.killpg()
         else:
-            self.stop()
-        self.start()
+            self.terminate()
 
-    def remove_data(self):
-        for root, dirs, files in os.walk(self.workdir, topdown=False):
-            for name in files:
-                os.remove(os.path.join(root, name))
-            for name in dirs:
-                os.rmdir(os.path.join(root, name))
-
-    def __repr__(self):
-        return f"picodata({' '.join(self.command[1:])})"
+        if drop_db:
+            self.drop_db()
 
+        self.start()
 
-@dataclass(frozen=True)
-class Cluster:
-    id: str
-    instances: list[Instance]
+    def drop_db(self):
+        rmtree(self.data_dir)
 
-    @cached_property
-    def instances_by_id(self) -> dict[str, Instance]:
-        return {i.id: i for i in self.instances}
+    def __raft_status(self) -> RaftStatus:
+        status = self.call("picolib.raft_status")
+        assert isinstance(status, dict)
+        return RaftStatus(**status)
 
-    @cached_property
-    def replicasets(self) -> dict[str, list[Instance]]:
-        rss = {}
-        for i in self.instances:
-            rss.setdefault(i.replicaset_id, []).append(i)
-        return rss
+    def raft_propose_eval(self, lua_code: str, timeout_seconds=2):
+        return self.call(
+            "picolib.raft_propose_eval",
+            timeout_seconds,
+            lua_code,
+        )
 
-    def for_each_instance(self, fn):
-        return list(map(fn, self.instances))
+    def assert_raft_status(self, raft_state, leader_id=None):
+        status = self.__raft_status()
+        if leader_id is None:
+            assert status == raft_state
+        else:
+            assert status == (raft_state, leader_id)
 
-    def stop(self, *args, **kwargs):
-        self.for_each_instance(lambda i: i.stop(*args, **kwargs))
+    @funcy.retry(tries=20, timeout=0.1)
+    def wait_ready(self):
+        status = self.__raft_status()
+        assert status.is_ready()
+        self.raft_id = status.id
 
-    def start(self, *args, **kwargs):
-        self.for_each_instance(lambda i: i.start(*args, **kwargs))
+    @funcy.retry(tries=20, timeout=0.1)
+    def promote_or_fail(self):
+        # Nudge this instance into starting an election immediately,
+        # then make sure it won
+        self.call("picolib.raft_timeout_now")
+        self.assert_raft_status("Leader")
 
-    def kill(self, *args, **kwargs):
-        self.for_each_instance(lambda i: i.kill(*args, **kwargs))
 
-    def restart(self, *args, **kwargs):
-        self.for_each_instance(lambda i: i.restart(*args, **kwargs))
+@dataclass
+class Cluster:
+    binary_path: str
 
-    def remove_data(self, *args, **kwargs):
-        self.for_each_instance(lambda i: i.remove_data(*args, **kwargs))
+    subnet: int
+    data_dir: str
+    instances: list[Instance] = field(default_factory=list)
 
     def __repr__(self):
-        return f"Cluster(id={self.id}, [{', '.join(i.id for i in self.instances)}])"
+        return f'Cluster("127.7.{self.subnet}.1", n={len(self.instances)})'
 
+    def __getitem__(self, item: int) -> Instance:
+        return self.instances[item]
 
-@pytest.fixture(scope="session")
-def compile(session_data_mutex):
-    with session_data_mutex() as data:
-        if data.get("compiled"):
-            return
-        assert subprocess.call(["cargo", "build"]) == 0, "cargo build failed"
+    def deploy(self, *, instance_count: int) -> list[Instance]:
+        assert not self.instances, "Already deployed"
 
-    data["compiled"] = True
+        for i in range(1, instance_count + 1):
+            instance = Instance(
+                binary_path=self.binary_path,
+                instance_id=f"i{i}",
+                data_dir=f"{self.data_dir}/i{i}",
+                host=f"127.7.{self.subnet}.1",
+                port=3300 + i,
+                peers=[f"127.7.{self.subnet}.1:3301"],
+            )
+            self.instances.append(instance)
 
-@pytest.fixture(scope="session")
-def binary_path(compile) -> str:
-    return os.path.realpath(Path(__file__) / "../../../target/debug/picodata")
-
-
-@pytest.fixture
-def run_instance(binary_path, tmpdir, run_id):
-    tmpdir = Path(tmpdir)
-    ports = (30000 + run_id * 100 + i for i in itertools.count())
-
-    instances: list[Instance] = []
-
-    def really_run_instance(
-        *,
-        instance_id: str,
-        cluster_id: str,
-        port: int | None = None,
-        peers: list[str] | None = None,
-        env: dict[str, str] | None = None,
-    ):
-        cluster_dir = tmpdir / cluster_id
-        env = env or {}
-        workdir = cluster_dir / instance_id
-        port = port or next(ports)
-        address = ("127.0.0.1", port)
-        listen = f"127.0.0.1:{port}"
-        peers = peers or [listen]
-
-        with suppress(FileExistsError):
-            cluster_dir.mkdir()
-            workdir.mkdir()
-
-        instance = Instance(
-            cluster_id=cluster_id,
-            id=instance_id,
-            peers=peers,
-            address=address,
-            listen=listen,
-            workdir=str(workdir),
-            binary_path=str(binary_path),
-            env=env,
-        )
+        for instance in self.instances:
+            instance.start()
 
-        instance.start()
-        instances.append(instance)
+        for instance in self.instances:
+            instance.wait_ready()
 
-        return instance
+        return self.instances
 
-    yield really_run_instance
+    def kill_all(self):
+        for instance in self.instances:
+            instance.killpg()
 
-    for instance in instances:
-        instance.stop(kill_after_seconds=2)
+    def terminate(self):
+        for instance in self.instances:
+            instance.terminate()
 
+    def drop_db(self):
+        rmtree(self.data_dir)
 
-@pytest.fixture
-def run_cluster(run_instance, run_id):
-    cluster_ids = (f"cluster-{run_id:03d}-{i:03d}" for i in itertools.count(1))
-
-    def really_run_cluster(cluster_id=None, instance_count=1):
-        cluster_id = cluster_id or next(cluster_ids)
-        instance_numbers = tuple(range(1, instance_count + 1))
-        ports = [40000 + run_id * 1000 + i for i in instance_numbers]
-        peers = [f"127.0.0.1:{p}" for p in ports]
-
-        return Cluster(
-            id=cluster_id,
-            instances=[
-                run_instance(
-                    instance_id=f"i{instance_number}",
-                    peers=peers,
-                    cluster_id=cluster_id,
-                    port=port,
-                )
-                for instance_number, port in zip(instance_numbers, ports)
-            ],
-        )
-    yield really_run_cluster
+@pytest.fixture(scope="session")
+def compile() -> None:
+    # Under pytest-xdist every worker runs this once per session;
+    # concurrent `cargo build` calls serialize on cargo's own build lock.
+    assert subprocess.call(["cargo", "build"]) == 0, "cargo build failed"
 
-def wait_tcp_port(host, port, timeout=10):
-    t = time.monotonic()
-    while time.monotonic() - t < timeout:
-        try:
-            s = socket.create_connection((host, port), timeout=timeout)
-            s.close()
-            return
-        except socket.error:
-            time.sleep(0.02)
-    raise TimeoutError("Port is closed %s:%i" % (host, port))
+@pytest.fixture(scope="session")
+def binary_path(compile) -> str:
+    return os.path.realpath(Path(__file__) / "../../../target/debug/picodata")
 
 @pytest.fixture
-def instance(run_instance, run_id):
-    return run_instance(instance_id="i1", cluster_id=f"cluster-{run_id:03d}-1")
+def cluster(binary_path, tmpdir, worker_id) -> Generator[Cluster, None, None]:
+    subnet = xdist_worker_number(worker_id)
+    assert isinstance(subnet, int)
+    assert 0 <= subnet < 256
+
+    cluster = Cluster(
+        binary_path=binary_path,
+        subnet=subnet,
+        data_dir=tmpdir,
+    )
+    yield cluster
+    cluster.terminate()
+    cluster.drop_db()
 
 
 @pytest.fixture
-def cluster(run_cluster):
-    return run_cluster(instance_count=3)
+def instance(cluster: Cluster) -> Generator[Instance, None, None]:
+    cluster.deploy(instance_count=1)
+    yield cluster[0]
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 6a51f8b274e37942dd6fdd7d382c806142944cd7..aaca2c47fc09b8c0b649f24bd9fca270d9ca2c2b 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -1,95 +1,198 @@
+import errno
 import os
+import funcy  # type: ignore
 import pytest
-import tarantool
+import signal
 
-from conftest import Cluster
-from util import raft_propose_eval, retry_on_network_errors
+from conftest import (
+    xdist_worker_number,
+    Instance,
+    RaftStatus,
+    TarantoolError,
+    ReturnError,
+    MalformedAPI,
+)
 
 
-def process_exists(pid):
-    try:
-        os.kill(pid, 0)
-    except ProcessLookupError:
-        return False
-    else:
-        return True
+def test_xdist_worker_number():
+    assert xdist_worker_number("master") == 0
+    assert xdist_worker_number("gw0") == 0
+    assert xdist_worker_number("gw1") == 1
+    assert xdist_worker_number("gw007") == 7
+    assert xdist_worker_number("gw1024") == 1024
+
+    with pytest.raises(ValueError, match=r"gw"):
+        xdist_worker_number("gw")
+
+    with pytest.raises(ValueError, match=r"xgw8x"):
+        xdist_worker_number("xgw8x")
+
+    with pytest.raises(ValueError, match=r"wtf"):
+        xdist_worker_number("wtf")
+
+
+def test_raft_status():
+    s = RaftStatus(
+        id=1,
+        raft_state="SomeState",
+        leader_id=1,
+    )
+
+    assert (s == "SomeState") is True
+    assert (s == ("SomeState")) is True
+    assert (s == ("SomeState",)) is False
+    assert (s == "OtherState") is False
+
+    assert (s == ("SomeState", 1)) is True
+    assert (s == ("SomeState", -1)) is False
+    assert (s == ("OtherState", 1)) is False
 
+    assert (s == s) is True
+    assert (s == RaftStatus(s.id, s.raft_state, s.leader_id)) is True
+    assert (s == RaftStatus(-1, s.raft_state, s.leader_id)) is False
+    assert (s == RaftStatus(s.id, "OtherState", s.leader_id)) is False
+    assert (s == RaftStatus(s.id, s.raft_state, -1)) is False
 
-def test_instance(instance):
-    assert str(instance)
+    assert RaftStatus(1, "Follower", 0).is_ready() is False
+    assert RaftStatus(1, "Follower", 1).is_ready() is True
 
+
+def test_call_normalization(instance: Instance):
     assert instance.call("tostring", 1) == "1"
     assert instance.call("dostring", "return") is None
     assert instance.call("dostring", "return 1") == 1
-    assert instance.call("dostring", "return 1, nil, 2, '3'") == (1, None, 2, "3")
+    assert instance.call("dostring", "return { }") == []
+    assert instance.call("dostring", "return 's'") == "s"
+    assert instance.call("dostring", "return nil") is None
+    assert instance.call("dostring", "return true") is True
 
-    assert instance.eval("return") is None
-    assert instance.eval("return 1") == 1
-    assert instance.eval("return 1, nil, 2, '3'") == (1, None, 2, "3")
+    with pytest.raises(ReturnError) as exc:
+        instance.call("dostring", "return nil, 'some error'")
+    assert exc.value.args == ("some error",)
 
-    pid = instance.process.pid
-    instance.stop()
-    assert not process_exists(pid)
-    with pytest.raises(tarantool.NetworkError):
-        instance.call("whatever")
-    instance.start()
-    assert instance.eval("return 1") == 1
+    with pytest.raises(MalformedAPI) as exc:
+        instance.call("dostring", "return 'x', 1")
+    assert exc.value.args == ("x", 1)
 
-    pid = instance.process.pid
-    instance.kill()
-    assert not process_exists(pid)
-    with pytest.raises(tarantool.NetworkError):
-        instance.eval("return")
-    instance.remove_data()
-    assert os.listdir(instance.workdir) == []
-    instance.start()
-    retry_on_network_errors(instance.eval)("return")
+    with pytest.raises(TarantoolError) as exc:
+        instance.call("error", "lua exception", 0)
+    assert exc.value.args == ("ER_PROC_LUA", "lua exception")
 
-    pid = instance.process.pid
-    instance.restart()
-    assert not process_exists(pid)
-    retry_on_network_errors(instance.eval)("return")
+    with pytest.raises(TarantoolError) as exc:
+        instance.call("void")
+    assert exc.value.args == ("ER_NO_SUCH_PROC", "Procedure 'void' is not defined")
+
+    # The python connector for tarantool misinterprets timeout errors:
+    # it should raise a TimeoutError, but raises ECONNRESET instead
+    with pytest.raises(OSError) as exc:
+        instance.call("package.loaded.fiber.sleep", 1, timeout=0.1)
+    assert exc.value.errno == errno.ECONNRESET
+
+    with pytest.raises(OSError) as exc:
+        instance.call("os.exit", 0)
+    assert exc.value.errno == errno.ECONNRESET
+
+    instance.terminate()
+    with pytest.raises(OSError) as exc:
+        instance.call("anything")
+    assert exc.value.errno == errno.ECONNREFUSED
+
+
+def test_eval_normalization(instance: Instance):
+    assert instance.eval("return") is None
+    assert instance.eval("return 1") == 1
+    assert instance.eval("return { }") == []
+    assert instance.eval("return 's'") == "s"
+    assert instance.eval("return nil") is None
+    assert instance.eval("return true") is True
 
+    with pytest.raises(ReturnError) as exc:
+        instance.eval("return nil, 'some error'")
+    assert exc.value.args == ("some error",)
 
-def test_single_instance_raft_eval(instance):
-    raft_propose_eval(instance, "_G.success = true")
-    assert instance.eval("return _G.success")
+    with pytest.raises(MalformedAPI) as exc:
+        instance.eval("return 'x', 2")
+    assert exc.value.args == ("x", 2)
 
+    with pytest.raises(TarantoolError) as exc:
+        instance.eval("error('lua exception', 0)")
+    assert exc.value.args == ("ER_PROC_LUA", "lua exception")
 
-def test_cluster(cluster: Cluster):
-    assert cluster.instances
-    assert str(cluster)
+    with pytest.raises(TarantoolError) as exc:
+        instance.eval("return box.schema.space.drop(0, 'void')")
+    assert exc.value.args == ("ER_NO_SUCH_SPACE", "Space 'void' does not exist")
 
-    for instance in cluster.instances:
-        retry_on_network_errors(raft_propose_eval)(instance, "return true")
 
-    cluster.restart()
+def test_process_management(instance: Instance):
+    """
+    Ensure that pytest can kill all subprocesses,
+    even if they hang and never terminate on their own
+    """
 
-    for instance in cluster.instances:
-        retry_on_network_errors(raft_propose_eval)(instance, "return true")
+    assert instance.eval("return 'ok'") == "ok"
+    assert instance.process is not None
+    pid = instance.process.pid
+    pgrp = pid
+
+    class StillAlive(Exception):
+        pass
+
+    @funcy.retry(tries=10, timeout=0.01, errors=StillAlive)
+    def waitpg(pgrp):
+        try:
+            os.killpg(pgrp, 0)
+        except ProcessLookupError:
+            return True
+        else:
+            raise StillAlive
+
+    # SIGSTOP the entire process group so that the picodata child
+    # can't react to the supervisor's termination
+    os.killpg(pgrp, signal.SIGSTOP)
+
+    # Now kill the supervisor
+    os.kill(pid, signal.SIGKILL)
+    os.waitpid(pid, 0)
+
+    # Make sure the supervisor is dead
+    with pytest.raises(ProcessLookupError):
+        os.kill(pid, 0)
 
-    cluster.kill()
-    cluster.remove_data()
+    # Make sure the child is still hanging
+    with pytest.raises(OSError) as exc:
+        instance.eval("return 'ok'", timeout=0.1)
+    assert exc.value.errno == errno.ECONNRESET
+    with pytest.raises(StillAlive):
+        waitpg(pgrp)
 
+    # Kill the remaining child in the process group
+    instance.killpg()
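+
+    # (Both terminate() and killpg() return early once self.process is
+    # None, which is what the idempotency checks below rely on.)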
 
-@pytest.mark.xfail
-def test_propose_eval(run_cluster):
-    cluster: Cluster = run_cluster(instance_count=3)
+    # When the supervisor is killed, the orphaned child is reparented to
+    # a subreaper. Pytest isn't one, so it can't `waitpid` for the child.
+    # Instead, the test polls `os.killpg(pgrp, 0)` until it fails with
+    # `ProcessLookupError`, which means the process group is gone.
 
-    i1, i2, i3 = cluster.instances
+    # Ensure the child is dead
+    waitpg(pgrp)
+    with pytest.raises(ProcessLookupError):
+        os.killpg(pgrp, 0)
+
+    # Check idempotency
+    instance.start()
+    pid1 = instance.process.pid
+    instance.start()
+    pid2 = instance.process.pid
+    assert pid1 == pid2
 
-    assert raft_propose_eval(i1, "_G.x1 = 1") == (None, None)
-    assert raft_propose_eval(i2, "_G.x2 = 2") == (None, None)
-    assert raft_propose_eval(i3, "_G.x3 = 3") == (None, None)
+    instance.terminate()
+    instance.terminate()
+    instance.killpg()
+    instance.killpg()
 
-    assert i1.eval("return rawget(_G, 'x1')") == 1
-    assert i1.eval("return rawget(_G, 'x2')") == 2
-    assert i1.eval("return rawget(_G, 'x3')") == 3
 
-    assert i2.eval("return rawget(_G, 'x1')") == 1
-    assert i2.eval("return rawget(_G, 'x2')") == 2
-    assert i2.eval("return rawget(_G, 'x3')") == 3
+def test_propose_eval(instance: Instance):
+    with pytest.raises(ReturnError, match="timeout"):
+        instance.raft_propose_eval("return", timeout_seconds=0)
 
-    assert i3.eval("return rawget(_G, 'x1')") == 1
-    assert i3.eval("return rawget(_G, 'x2')") == 2
-    assert i3.eval("return rawget(_G, 'x3')") == 3
+    assert instance.raft_propose_eval("_G.success = true")
+    assert instance.eval("return _G.success") is True
diff --git a/test/int/test_couple.py b/test/int/test_couple.py
index 882a15e315ad7a33754db260198f81d1282d714c..79e66f8b43e32cdbb1b21a3b834b7d871939be30 100644
--- a/test/int/test_couple.py
+++ b/test/int/test_couple.py
@@ -1,33 +1,39 @@
+import funcy  # type: ignore
 import pytest
 from conftest import Cluster
-from util import promote_or_fail, raft_propose_eval, assert_raft_status, retry
 
 
-@pytest.mark.skip
-def test_follower_proposal(run_cluster):
-    cluster: Cluster = run_cluster(instance_count=2)
+@funcy.retry(tries=20, timeout=0.1)
+def retry_call(call, *args, **kwargs):
+    return call(*args, **kwargs)
 
-    i1, i2 = cluster.instances
 
-    promote_or_fail(i1)  # Speed up node election
+@pytest.fixture
+def cluster2(cluster: Cluster):
+    cluster.deploy(instance_count=2)
+    return cluster
+
+
+def test_follower_proposal(cluster2: Cluster):
+    i1, i2 = cluster2.instances
+    i1.promote_or_fail()
+
+    i2.assert_raft_status("Follower", leader_id=i1.raft_id)
+    i2.raft_propose_eval("rawset(_G, 'check', box.info.listen)")
 
-    raft_propose_eval(i1, "rawset(_G, 'check', box.info.listen)")
     assert i1.eval("return check") == i1.listen
     assert i2.eval("return check") == i2.listen
 
 
-def test_failover(run_cluster):
-    cluster: Cluster = run_cluster(instance_count=2)
-    i1, i2 = cluster.instances
-
-    promote_or_fail(i1)
+def test_failover(cluster2: Cluster):
+    i1, i2 = cluster2.instances
+    i1.promote_or_fail()
 
-    retry(assert_raft_status)(i2, raft_state="Follower", leader_id=1)
+    retry_call(i2.assert_raft_status, "Follower", leader_id=i1.raft_id)
 
-    @retry
     def do_test():
         i2.eval("picolib.raft_tick(20)")
-        assert_raft_status(i2, raft_state="Leader")
-        assert_raft_status(i1, raft_state="Follower", leader_id=2)
+        i1.assert_raft_status("Follower", leader_id=i2.raft_id)
+        i2.assert_raft_status("Leader")
 
-    do_test()
+    retry_call(do_test)
diff --git a/test/int/util.py b/test/int/util.py
deleted file mode 100644
index 4197b8e9b17e583ef90f632c10c909ec8bd413ab..0000000000000000000000000000000000000000
--- a/test/int/util.py +++ /dev/null @@ -1,54 +0,0 @@ -import tarantool -from conftest import Instance -import funcy - -# Pre-configured retry decorators with an exponential timeout -retry = funcy.retry( - tries=10, - timeout=lambda attempt_number: 0.01 * 2**attempt_number, -) -retry_on_network_errors = funcy.retry( - tries=10, - errors=tarantool.NetworkError, - timeout=lambda attempt_number: 0.01 * 2**attempt_number, -) - - -def raft_propose_eval(instance: Instance, lua_code: str, timeout_seconds=2): - assert isinstance(instance, Instance) - return instance.call( - "picolib.raft_propose_eval", - timeout_seconds, - lua_code, - ) - - -def raft_status(instance): - return instance.call("picolib.raft_status") - - -def assert_covers(covering: dict, covered: dict): - merged = covered | covering - assert merged == covering - - -_IGNORE = object() - - -def assert_raft_status(instance, raft_state, leader_id=_IGNORE): - assert isinstance(instance, Instance) - status = raft_status(instance) - assert isinstance(status, dict) - if leader_id is _IGNORE: - status["raft_state"] == raft_state - else: - assert_covers(status, {"raft_state": raft_state, "leader_id": leader_id}) - - -@retry -def promote_or_fail(instance): - instance.call("picolib.raft_timeout_now") - status = raft_status(instance) - assert isinstance(status, dict) - raft_status_id = status["id"] - assert_raft_status(instance, raft_state="Leader", leader_id=raft_status_id)