From 9d6449d415be15ac30c788c2ae27f91dc3cc4a68 Mon Sep 17 00:00:00 2001
From: Diana Tikhonova <d.tihonova@picodata.io>
Date: Thu, 30 Jan 2025 14:26:08 +0300
Subject: [PATCH] test: init test_topology_management_with_failures

---
 src/discovery.rs        |   2 +
 src/governor/mod.rs     |   7 +
 test/int/test_basics.py | 283 ++++++++++++++++++++++++++++++++++++++++
 3 files changed, 292 insertions(+)

diff --git a/src/discovery.rs b/src/discovery.rs
index a4eea1551d..bd2671b26b 100644
--- a/src/discovery.rs
+++ b/src/discovery.rs
@@ -210,6 +210,7 @@ pub async fn wait_global_async() -> Role {
 
 #[proc]
 fn proc_discover<'a>(request: Request, request_to: Address) -> Result<Response, Box<dyn StdError>> {
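+    // Test-only fault injection point: blocks handling of proc_discover while
+    // the "BLOCK_DISCOVERY_START" injection is armed by a test.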
+    crate::error_injection!(block "BLOCK_DISCOVERY_START");
     let ready_ids = traft::node::global().ok().and_then(|node| {
         let status = node.status();
         status
@@ -218,6 +219,7 @@ fn proc_discover<'a>(request: Request, request_to: Address) -> Result<Response,
     });
     if let Some((peers_addresses, leader_id, id)) = ready_ids {
         let leader_address = peers_addresses.try_get(leader_id)?;
+        crate::error_injection!(block "BLOCK_AFTER_DISCOVERY");
         Ok(Response::Done(Role::new(leader_address, leader_id == id)))
     } else {
         let mut discovery = discovery();
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index ee2cce073d..9b73169bb4 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -237,6 +237,7 @@ impl Loop {
                     tlog!(Warning, "failed proposing conf_change: {e}");
                     fiber::sleep(Loop::RETRY_TIMEOUT);
                 }
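+                // Test-only pause point: lets tests freeze the governor loop
+                // right after a raft conf change has been proposed.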
+                crate::error_injection!(block "BLOCK_AFTER_CONF_CHANGE");
             }
 
             Plan::TransferLeadership(TransferLeadership { to }) => {
@@ -484,6 +485,8 @@ impl Loop {
                     }
                 }
 
+                crate::error_injection!(block "BLOCK_AFTER_CONFIGURE_REPLICATION");
+
                 governor_step! {
                     "actualizing replicaset configuration version" [
                         "replicaset_name" => %replicaset_name,
@@ -585,6 +588,8 @@ impl Loop {
                     }
                 }
 
+                crate::error_injection!(block "BLOCK_AFTER_ENABLE_PLUGINS");
+
                 governor_step! {
                     "handling instance state change" [
                         "instance_name" => %target,
@@ -908,6 +913,8 @@ impl Loop {
                     }
                 }
 
+                crate::error_injection!(block "BLOCK_AFTER_VSHARD_UPDATE");
+
                 governor_step! {
                     "updating current vshard config"
                     async {
diff --git a/test/int/test_basics.py b/test/int/test_basics.py
index 1f666487d7..a13d9e97d6 100644
--- a/test/int/test_basics.py
+++ b/test/int/test_basics.py
@@ -1,6 +1,7 @@
 import os
 import pytest
 import signal
+import random
 
 from conftest import (
     Cluster,
@@ -926,3 +927,285 @@ def test_stale_governor_replication_requests(cluster: Cluster):
         i2.assert_raft_status("Leader")
 
     Retriable(timeout=10).call(check_replication)
+
+
+def test_topology_management_with_failures(cluster: Cluster, seed):
+    print(f"Running topology management test with seed: {seed}")
+
+    rng = random.Random(seed)
+
+    # create a cluster
+    rf = rng.choice([1, 2, 3])
+    nodes_count = rng.choice([3, 5, 6])
+    instances = cluster.deploy(instance_count=nodes_count, init_replication_factor=rf)
+
+    leader = instances[0]
+
+    def stabilize_leader():
+        # Let failures propagate so that Retriable actually retries them.
+        leader.promote_or_fail()
+        leader.wait_online()
+
+    Retriable(timeout=120).call(stabilize_leader)
+
+    # create a table with test data
+    leader.sql(
+        """
+        CREATE TABLE test_table(
+            id INT PRIMARY KEY,
+            value TEXT
+        )
+        """
+    )
+
+    def verify_table_created():
+        for instance in instances:
+            tables = instance.sql("SELECT name FROM _pico_table")
+            assert any(name == "test_table" for [name] in tables)
+
+    Retriable(timeout=120).call(verify_table_created)
+
+    # add many entries so that they are distributed across different shards
+    for i in range(1000):
+        leader.sql("INSERT INTO test_table VALUES(?, ?)", i, f"value_{i}")
+
+    # remove a random node from the cluster
+    instance_to_expel = rng.choice([i for i in instances if i != leader])
+
+    def expel_instance():
+        # Let cluster.expel raise so that Retriable retries on failure instead
+        # of treating a swallowed error as success.
+        cluster.expel(instance_to_expel)
+
+    Retriable(timeout=60).call(expel_instance)
+    nodes_count -= 1
+
+    # add a new node with an injected error, randomly selected
+    injection_type = rng.choice(
+        [
+            "BLOCK_AFTER_CONF_CHANGE",
+            "BLOCK_AFTER_CONFIGURE_REPLICATION",
+            "BLOCK_AFTER_VSHARD_UPDATE",
+            "BLOCK_AFTER_ENABLE_PLUGINS",
+            "BLOCK_DISCOVERY_START",
+            "BLOCK_AFTER_DISCOVERY",
+        ]
+    )
+
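+    # arm the injection on the leader and watch its log for the moment the
+    # blocked code path is actually reached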
+    injection_hit = log_crawler(leader, f"ERROR INJECTION '{injection_type}': BLOCKING")
+    leader.call("pico._inject_error", injection_type, True)
+
+    new_instance = cluster.add_instance(wait_online=False)
+    new_instance.start()
+    nodes_count += 1
+
+    # wait for the error injection to trigger
+    injection_hit.wait_matched()
+
+    # perform operations during the pause:
+    # create a topology event
+    another_instance = cluster.add_instance(wait_online=False)
+    another_instance.start()
+    nodes_count += 1
+
+    instances += [new_instance, another_instance]
+
+    # create a schema event
+    def create_table():
+        # Best effort: while the governor is blocked by the injection the DDL
+        # may fail to commit in time, so tolerate errors here. The schema is
+        # verified on every instance later, after the injection is disabled.
+        try:
+            leader.sql("CREATE TABLE test_table2 (id INT PRIMARY KEY)")
+            index = leader.raft_get_index()
+            cluster.raft_wait_index(index, timeout=60)
+        except Exception:
+            pass
+
+    create_table()
+
+    leader.wait_online(timeout=90)
+
+    # modify data
+    def insert_data():
+        # Raise on failure so that Retriable retries instead of giving up.
+        leader.sql("INSERT INTO test_table VALUES(?, ?)", 1001, "new_value")
+
+    Retriable(timeout=120).call(insert_data)
+
+    # remove the entire data folder of the suspended node
+    new_instance.remove_data()
+
+    # remove the suspended node from the cluster
+    def expel_new_instance():
+        cluster.expel(new_instance, peer=leader)
+
+    Retriable(timeout=60).call(expel_new_instance)
+    nodes_count -= 1
+
+    # another_instance cannot finish joining while the error injection is
+    # still armed, so only wait for the instances expected to be online now.
+    expected_online = [
+        i
+        for i in instances
+        if i not in (instance_to_expel, new_instance, another_instance)
+    ]
+
+    def wait_all_instances():
+        # Raise on failure so that Retriable keeps retrying.
+        for instance in expected_online:
+            instance.eval("return true")
+
+    Retriable(timeout=30).call(wait_all_instances)
+
+    # restart a random number of remaining active nodes
+    active_instances = [
+        i
+        for i in instances
+        if i not in (instance_to_expel, leader, new_instance)
+    ]
+    restart_count = rng.randint(1, len(active_instances) - 1)
+    for instance in rng.sample(active_instances, restart_count):
+        instance.restart()
+
+    def wait_quorum():
+        # Assert (rather than return a bool) so that Retriable retries until
+        # a majority of the restarted instances responds again.
+        online = 0
+        for instance in active_instances:
+            try:
+                instance.eval("return true")
+                online += 1
+            except Exception:
+                pass
+        assert online >= len(active_instances) // 2 + 1
+
+    # restart the raft leader node
+    leader.restart()
+
+    Retriable(timeout=120).call(wait_quorum)
+
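+    # make sure the restarted leader becomes the raft leader again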
+    def wait_leader_election():
+        leader.promote_or_fail()
+
+    Retriable(timeout=120).call(wait_leader_election)
+
+    # remove the lock
+    def disable_injection():
+        leader.call("pico._inject_error", injection_type, False)
+
+    Retriable(timeout=30).call(disable_injection)
+
+    # restart the suspended node
+    new_instance.restart()
+
+    Retriable(timeout=30).call(wait_all_instances)
+
+    # check the cluster state
+    def check_cluster():
+        def verify_cluster_state():
+            # Expect exactly one raft leader among the online instances and
+            # everyone else following it; assert so that Retriable retries.
+            raft_leader = None
+            for instance in [leader] + active_instances:
+                status = instance.call(".proc_raft_info")
+                if status["state"] == "Leader":
+                    assert raft_leader is None, "more than one raft leader"
+                    raft_leader = instance
+
+            assert raft_leader is not None, "no raft leader found"
+
+            followers = 0
+            for instance in [leader] + active_instances:
+                if instance != raft_leader:
+                    instance.assert_raft_status(
+                        "Follower", leader_id=raft_leader.raft_id
+                    )
+                    followers += 1
+
+            assert followers == nodes_count - 1
+
+        Retriable(timeout=120).call(verify_cluster_state)
+
+        for i, instance in enumerate(active_instances):
+            instance.check_process_alive()
+            instance.wait_online()
+
+            def check_table_created():
+                tables = instance.sql("SELECT name FROM _pico_table")
+                assert any(name == "test_table2" for [name] in tables)
+
+            Retriable(timeout=40).call(check_table_created)
+
+            def insert_into_new_table():
+                # Distinct id range per instance to avoid primary key collisions.
+                unique_id = i * 1000 + rng.randint(0, 999)
+                instance.sql("INSERT INTO test_table2 VALUES(?)", unique_id)
+
+            Retriable(timeout=20).call(insert_into_new_table)
+
+            def check_data():
+                # The data written before and during the blockage must still
+                # be readable and the table must accept new writes.
+                test_id = 3000 + i * 1000 + rng.randint(0, 999)
+                instance.sql(
+                    "INSERT INTO test_table VALUES(?, ?)", test_id, "test_write"
+                )
+
+                [[count]] = instance.sql("SELECT count(*) FROM test_table")
+                assert count >= 1000
+
+            Retriable(timeout=60).call(check_data)
+
+    Retriable(timeout=240).call(check_cluster)
-- 
GitLab