From cb7f653d4483d31da54294988ff35aac729444c2 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Wed, 31 May 2023 17:45:05 +0300
Subject: [PATCH] test: add a couple of ddl drop space tests

---
 test/conftest.py     |  34 ++--
 test/int/test_ddl.py | 458 ++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 477 insertions(+), 15 deletions(-)

diff --git a/test/conftest.py b/test/conftest.py
index 98b73b72f9..d813c8a661 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -624,9 +624,9 @@ class Instance:
 
     def cas(
         self,
-        dml_kind: Literal["insert", "replace", "delete"],
+        op_kind: Literal["insert", "replace", "delete", "drop_space"],
         space: str | int,
-        tuple: Tuple | List,
+        tuple: Tuple | List | None = None,
         index: int | None = None,
         term: int | None = None,
         ranges: List[CasRange] | None = None,
@@ -666,25 +666,35 @@ class Instance:
             ranges=predicate_ranges,
         )
 
-        if dml_kind in ["insert", "replace"]:
-            dml = dict(
+        if op_kind in ["insert", "replace"]:
+            op = dict(
                 kind="dml",
-                op_kind=dml_kind,
+                op_kind=op_kind,
                 space=space_id,
                 tuple=msgpack.packb(tuple),
             )
-        elif dml_kind == "delete":
-            dml = dict(
+        elif op_kind == "delete":
+            op = dict(
                 kind="dml",
-                op_kind=dml_kind,
+                op_kind=op_kind,
                 space=space_id,
                 key=msgpack.packb(tuple),
             )
+        elif op_kind == "drop_space":
+            op = dict(
+                kind="ddl_prepare",
+                op_kind=op_kind,
+                schema_version=self.next_schema_version(),
+                ddl=dict(
+                    kind="drop_space",
+                    id=space_id,
+                ),
+            )
         else:
-            raise Exception(f"unsupported {dml_kind=}")
+            raise Exception(f"unsupported {op_kind=}")
 
-        eprint(f"CaS:\n {predicate=}\n {dml=}")
-        return self.call(".proc_cas", self.cluster_id, predicate, dml)[0]["index"]
+        eprint(f"CaS:\n {predicate=}\n {op=}")
+        return self.call(".proc_cas", self.cluster_id, predicate, op)[0]["index"]
 
     def next_schema_version(self) -> int:
         t = self.call("box.space._pico_property:get", "next_schema_version")
@@ -1062,7 +1072,7 @@ class Cluster:
 
     def create_space(self, params: dict, timeout: float = 3.0):
         """
-        Creates a space. Waits for all peers to be aware of it.
+        Creates a space. Waits for all online peers to be aware of it.
""" index = self.instances[0].create_space(params, timeout) self.raft_wait_index(index, timeout) diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index 6b9153ec02..87ae1e3e5e 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -1,4 +1,5 @@ import pytest +import time from conftest import Cluster, ReturnError @@ -12,6 +13,7 @@ def test_ddl_abort(cluster: Cluster): # TODO: test manual abort when we have long-running ddls +################################################################################ def test_ddl_create_space_lua(cluster: Cluster): i1, i2 = cluster.deploy(instance_count=2) @@ -95,6 +97,7 @@ def test_ddl_create_space_lua(cluster: Cluster): assert i2.call("box.space._pico_space:get", space_id) == pico_space_def +################################################################################ def test_ddl_create_space_bulky(cluster: Cluster): i1, i2, i3, i4 = cluster.deploy(instance_count=4, init_replication_factor=2) @@ -236,7 +239,7 @@ def test_ddl_create_space_bulky(cluster: Cluster): assert i4.call("box.space._index:get", [space_id, 0]) == tt_pk_def ############################################################################ - # A new replicaset catches up after the fact successfully + # A new replicaset boots up after the fact successfully i5 = cluster.add_instance(wait_online=True, replicaset_id="r3") @@ -257,7 +260,10 @@ def test_ddl_create_space_bulky(cluster: Cluster): assert i6.call("box.space._space:get", space_id) == tt_space_def assert i6.call("box.space._index:get", [space_id, 0]) == tt_pk_def + # TODO: test replica becoming master in the process of catching up + +################################################################################ def test_ddl_create_sharded_space(cluster: Cluster): i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2) @@ -369,6 +375,7 @@ def test_ddl_create_sharded_space(cluster: Cluster): assert i2.call("box.space._index:get", [space_id, 1]) == tt_bucket_id_def +################################################################################ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): i1, i2, i3 = cluster.deploy(instance_count=3) @@ -422,6 +429,7 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): assert i.eval("return box.space._pico_space:get(...).operable", space_id) +################################################################################ def test_ddl_create_space_partial_failure(cluster: Cluster): # i2 & i3 are for quorum i1, i2, i3, i4, i5 = cluster.deploy(instance_count=5) @@ -490,6 +498,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster): assert i4.call("box.space._space:get", space_id) is not None +################################################################################ def test_successful_wakeup_after_ddl(cluster: Cluster): # Manual replicaset distribution. 
i1 = cluster.add_instance(replicaset_id="r1", wait_online=True) @@ -526,7 +535,8 @@ def test_successful_wakeup_after_ddl(cluster: Cluster): assert i3.call("box.space._space:get", space_id) is not None -def test_ddl_from_snapshot_at_boot(cluster: Cluster): +################################################################################ +def test_ddl_create_space_from_snapshot_at_boot(cluster: Cluster): # Second instance is only for quorum i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2) @@ -603,7 +613,8 @@ def test_ddl_from_snapshot_at_boot(cluster: Cluster): assert i4.call("box.space._schema:get", "local_schema_version")[1] == 1 -def test_ddl_from_snapshot_at_catchup(cluster: Cluster): +################################################################################ +def test_ddl_create_space_from_snapshot_at_catchup(cluster: Cluster): # Second instance is only for quorum i1 = cluster.add_instance(wait_online=True, replicaset_id="r1") i2 = cluster.add_instance(wait_online=True, replicaset_id="R2") @@ -665,6 +676,7 @@ def test_ddl_from_snapshot_at_catchup(cluster: Cluster): assert i3.call("box.space._schema:get", "local_schema_version")[1] == 1 +################################################################################ def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster): # For quorum. i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1) @@ -704,3 +716,443 @@ def test_ddl_create_space_at_catchup_with_master_switchover(cluster: Cluster): # A master catches up by snapshot assert i4.call("box.space._space.index.name:get", space_name) is not None + + +################################################################################ +def test_ddl_drop_space_normal(cluster: Cluster): + # 2 replicasets with 2 replicas each + i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2) + + # Set up. + space_name = "things" + cluster.create_space( + dict( + name=space_name, + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is not None + + # Actual behaviour we're testing + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + for i in cluster.instances: + i.raft_wait_index(index_commit) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is None + + # Now we can create another space with the same name. + cluster.create_space( + dict( + name=space_name, + format=[ + dict(name="id", type="unsigned", is_nullable=False), + dict(name="value", type="any", is_nullable=False), + ], + primary_key=["id"], + distribution="global", + ), + ) + + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", space_name) is not None + + +################################################################################ +def test_ddl_drop_space_partial_failure(cluster: Cluster): + # First 3 are fore quorum. + i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=1) + # Test subjects. + i4 = cluster.add_instance(wait_online=True, replicaset_id="R99") + i5 = cluster.add_instance(wait_online=True, replicaset_id="R99") + + # Set up. 
+ space_name = "trinkets" + cluster.create_space( + dict( + name=space_name, + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + index = i1.cas("insert", space_name, [9]) + for i in cluster.instances: + i.raft_wait_index(index) + + # Put a replicaset to sleep. + i4.terminate() + i5.terminate() + + # Ddl fails because all masters must be present. + index = i1.cas("drop_space", space=space_name) + index_commit = index + 1 + with pytest.raises(ReturnError, match="timeout"): + i1.raft_wait_index(index_commit, timeout=3) + + entry, *_ = i1.call( + "box.space._raft_log:select", None, dict(iterator="lt", limit=1) + ) + # Has not yet been finalized + assert entry[4][1][0] == "ddl_prepare" + + # Space is not yet dropped. + assert i1.call("box.space._space.index.name:get", space_name) is not None + assert i2.call("box.space._space.index.name:get", space_name) is not None + assert i3.call("box.space._space.index.name:get", space_name) is not None + + # And no data is lost yet. + assert i1.call("box.space.trinkets:get", 9) == [9] + assert i2.call("box.space.trinkets:get", 9) == [9] + assert i3.call("box.space.trinkets:get", 9) == [9] + + # But the space is marked not operable. + assert not i1.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + assert not i2.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + assert not i3.eval( + "return box.space._pico_space.index.name:get(...).operable", space_name + ) + + # TODO: test manual ddl abort + + # Wakeup the sleeping master. + i4.start() + i4.wait_online() + + # TODO: how do we sync raft log at this point? + time.sleep(2) + + # Now space is dropped. + assert i1.call("box.space._space.index.name:get", space_name) is None + assert i2.call("box.space._space.index.name:get", space_name) is None + assert i3.call("box.space._space.index.name:get", space_name) is None + assert i4.call("box.space._space.index.name:get", space_name) is None + + # And a replica catches up by raft log successfully. + i5.start() + i5.wait_online() + assert i5.call("box.space._space.index.name:get", space_name) is None + + +################################################################################ +def test_ddl_drop_space_by_raft_log_at_catchup(cluster: Cluster): + # i1 is for quorum + i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1) + i2 = cluster.add_instance(wait_online=True, replicaset_id="r99") + # This one will be catching up. + i3 = cluster.add_instance(wait_online=True, replicaset_id="r99") + + # Set up. + cluster.create_space( + dict( + name="replace_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="sharded", + sharding_key=["id"], + sharding_fn="murmur3", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "replace_me") is not None + + cluster.create_space( + dict( + name="drop_me", + format=[dict(name="id", type="unsigned", is_nullable=False)], + primary_key=["id"], + distribution="global", + ), + ) + for i in cluster.instances: + assert i.call("box.space._space.index.name:get", "drop_me") is not None + + # i3 will be catching up. 
+    i3.terminate()
+
+    # Drop the spaces
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    #
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    # Wake up the catching up instance.
+    i3.start()
+    i3.wait_online()
+
+    # The space was dropped.
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+
+    # The space was dropped and a new one was created without conflict.
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_raft_log_at_boot(cluster: Cluster):
+    # These guys are for quorum.
+    i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1)
+
+    #
+    # Set up.
+    #
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "replace_me") is not None
+
+    cluster.create_space(
+        dict(
+            name="drop_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "drop_me") is not None
+
+    #
+    # Drop spaces.
+    #
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    #
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    #
+    # Add a new replicaset.
+    #
+    i3 = cluster.add_instance(wait_online=False, replicaset_id="r99")
+    i4 = cluster.add_instance(wait_online=False, replicaset_id="r99")
+    i3.start()
+    i4.start()
+    i3.wait_online()
+    i4.wait_online()
+
+    #
+    # Both caught up successfully.
+    #
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+    assert i4.call("box.space._space.index.name:get", "drop_me") is None
+
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+    format = i4.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_snapshot_on_replica(cluster: Cluster):
+    # i1 is for quorum
+    i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=1)
+    i2 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+    # This one will be catching up.
+    i3 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+
+    # Set up.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "replace_me") is not None
+
+    cluster.create_space(
+        dict(
+            name="drop_me",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+    for i in cluster.instances:
+        assert i.call("box.space._space.index.name:get", "drop_me") is not None
+
+    # i3 will be catching up.
+    i3.terminate()
+
+    for space_name in ["replace_me", "drop_me"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="replace_me",
+            format=[
+                dict(name="#", type="unsigned", is_nullable=False),
+            ],
+            primary_key=["#"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "replace_me") is not None
+    assert i2.call("box.space._space.index.name:get", "replace_me") is not None
+
+    # Compact raft log to trigger snapshot generation.
+    i1.raft_compact_log()
+    i2.raft_compact_log()
+
+    # Wake up the catching up instance.
+    i3.start()
+    i3.wait_online()
+
+    # The space was dropped.
+    assert i3.call("box.space._space.index.name:get", "drop_me") is None
+
+    # The space was dropped and a new one was created without conflict.
+    format = i3.eval("return box.space[...]:format()", "replace_me")
+    assert [f["name"] for f in format] == ["#"]
+
+
+################################################################################
+def test_ddl_drop_space_by_snapshot_on_master(cluster: Cluster):
+    # These ones are for quorum.
+    i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=1)
+    # This is a replicaset master, who will be following along with the ddl.
+    i3 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+    # This is a replica, who will become master and be catching up.
+    i4 = cluster.add_instance(wait_online=True, replicaset_id="r99")
+
+    # Set up.
+    cluster.create_space(
+        dict(
+            name="space_to_drop",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+    cluster.create_space(
+        dict(
+            name="space_to_replace",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="sharded",
+            sharding_key=["id"],
+            sharding_fn="murmur3",
+        ),
+    )
+
+    for space_name in ["space_to_drop", "space_to_replace"]:
+        for i in cluster.instances:
+            assert i.call("box.space._space.index.name:get", space_name) is not None
+
+    # i4 will be catching up.
+    i4.terminate()
+
+    #
+    # Drop spaces.
+    #
+    for space_name in ["space_to_drop", "space_to_replace"]:
+        index = i1.cas("drop_space", space=space_name)
+        index_commit = index + 1
+        i1.raft_wait_index(index_commit)
+        i2.raft_wait_index(index_commit)
+        i3.raft_wait_index(index_commit)
+        assert i1.call("box.space._space.index.name:get", space_name) is None
+        assert i2.call("box.space._space.index.name:get", space_name) is None
+        assert i3.call("box.space._space.index.name:get", space_name) is None
+
+    # We replace a sharded space with a global one to check indexes were dropped
+    # correctly.
+    cluster.create_space(
+        dict(
+            name="space_to_replace",
+            format=[dict(name="id", type="unsigned", is_nullable=False)],
+            primary_key=["id"],
+            distribution="global",
+        ),
+    )
+
+    assert i1.call("box.space._space.index.name:get", "space_to_replace") is not None
+    assert i2.call("box.space._space.index.name:get", "space_to_replace") is not None
+    assert i3.call("box.space._space.index.name:get", "space_to_replace") is not None
+
+    # Compact raft log to trigger snapshot generation.
+    i1.raft_compact_log()
+    i2.raft_compact_log()
+
+    # Put i3 to sleep to trigger master switchover.
+    i3.terminate()
+
+    # Wake up the catching up instance. i4 has become master and will catch up by snapshot.
+    i4.start()
+    i4.wait_online()
+
+    # The space was dropped.
+    # assert i4.call("box.space._space.index.name:get", "space_to_drop") is None
+    # The space was replaced.
+    assert i4.call("box.space._space.index.name:get", "space_to_replace") is not None
--
GitLab
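
All of the new tests exercise the same core pattern: submit a drop_space DDL through the extended cas helper, wait for the commit entry that the tests expect to land right at the next raft index after the prepare entry, and then check _space on each instance. A minimal sketch of that pattern, assuming the cluster fixture from conftest.py (the helper function and space name below are illustrative, not part of the patch):

    # Sketch only: mirrors the steps the tests above repeat for each space.
    def drop_space_and_wait(cluster, space_name: str):
        i1 = cluster.instances[0]
        # Propose the ddl_prepare entry for drop_space via the extended cas helper.
        index = i1.cas("drop_space", space=space_name)
        # The tests assume the ddl_commit entry follows immediately after.
        index_commit = index + 1
        for i in cluster.instances:
            i.raft_wait_index(index_commit)
        # After commit the space definition is gone on every instance.
        for i in cluster.instances:
            assert i.call("box.space._space.index.name:get", space_name) is None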