Newer
Older
import funcy # type: ignore
import time
from conftest import (
Cluster,
)
def test_sharding_reinitializes_on_restart(cluster: Cluster):
[i1] = cluster.deploy(instance_count=1)
assert i1.call("vshard.router.info") is not None
incarnation = i1.eval("return pico.instance_info().current_grade.incarnation")
# Instance silently dies without it's grade being updated
i1.kill()
# Instance restarts, it's incarnation is updated and governor reconfigures
# all the subsystems
i1.start()
i1.wait_online(expected_incarnation=incarnation + 1)
# Vshard is configured even though the configuration didn't change
assert i1.call("vshard.router.info") is not None
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def test_bucket_discovery_single(instance: Instance):
@funcy.retry(tries=30, timeout=0.5)
def wait_buckets_awailable(i: Instance, expected: int):
assert i.call("vshard.router.info")["bucket"]["available_rw"] == expected
wait_buckets_awailable(instance, 3000)
def wait_has_buckets(c: Cluster, i: Instance, expected_active: int):
@funcy.retry(tries=30, timeout=0.5)
def buckets_active(i: Instance):
return i.call("vshard.storage.info")["bucket"]["active"]
tries = 4
previous_active = None
while True:
for j in c.instances:
j.eval(
"""
if vshard.storage.internal.rebalancer_fiber ~= nil then
vshard.storage.rebalancer_wakeup()
end
"""
)
actual_active = buckets_active(i)
if actual_active == expected_active:
return
if actual_active == previous_active:
if tries > 0:
tries -= 1
else:
print("vshard.storage.info.bucket.active stopped changing")
assert actual_active == expected_active
previous_active = actual_active
time.sleep(0.5)
def test_bucket_discovery_respects_replication_factor(cluster: Cluster):
i1 = cluster.add_instance(init_replication_factor=2)
time.sleep(0.5)
assert i1.eval("return rawget(_G, 'vshard') == nil")
assert i1.call("box.space._pico_property:get", "vshard_bootstrapped") is None
i2 = cluster.add_instance(replicaset_id="r2")
time.sleep(0.5)
assert i2.eval("return rawget(_G, 'vshard') == nil")
assert i2.call("box.space._pico_property:get", "vshard_bootstrapped") is None
i3 = cluster.add_instance(replicaset_id="r1")
time.sleep(0.5)
assert i3.call("box.space._pico_property:get", "vshard_bootstrapped")[1]
wait_has_buckets(cluster, i3, 3000)
def test_automatic_bucket_rebalancing(cluster: Cluster):
i1 = cluster.add_instance(replicaset_id="r1")
wait_has_buckets(cluster, i1, 3000)
i2 = cluster.add_instance(replicaset_id="r2")
wait_has_buckets(cluster, i2, 1500)
i3 = cluster.add_instance(replicaset_id="r3")
wait_has_buckets(cluster, i3, 1000)
# Set one of the replicaset's weight to 0, to trigger rebalancing from it.
index = cluster.cas(
"update",
"_pico_replicaset",
key=["r1"],
ops=[("=", "weight", 0.0), ("=", "weight_origin", "user")],
)
cluster.raft_wait_index(index)
# This instnace now has no buckets
wait_has_buckets(cluster, i1, 0)
# Set another replicaset's weight to 0.5, to showcase weights are respected.
index = cluster.cas(
"update",
"_pico_replicaset",
key=["r2"],
ops=[("=", "weight", 0.5), ("=", "weight_origin", "user")],
)
cluster.raft_wait_index(index)
# Now i3 has twice as many buckets, because r3.weight == r2.weight * 2
wait_has_buckets(cluster, i2, 1000)
wait_has_buckets(cluster, i3, 2000)
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
def test_bucket_rebalancing_respects_replication_factor(cluster: Cluster):
i1, *_ = cluster.deploy(instance_count=4, init_replication_factor=2)
# wait for buckets to be rebalanced between 2 replicasets 1500 each
for i in cluster.instances:
wait_has_buckets(cluster, i, 1500)
# check vshard routes requests to both replicasets
reached_instances = set()
for bucket_id in [1, 3000]:
info = i1.call("vshard.router.callro", bucket_id, "pico.instance_info")
reached_instances.add(info["instance_id"])
assert len(reached_instances) == 2
# add an instance to a new replicaset
i5 = cluster.add_instance(wait_online=True)
# buckets do not start rebalancing until new replicaset is full
wait_has_buckets(cluster, i5, 0)
for i in cluster.instances:
if i.instance_id != i5.instance_id:
wait_has_buckets(cluster, i, 1500)
# add another instance to new replicaset, it's now full
cluster.add_instance(wait_online=True)
# buckets now must be rebalanced between 3 replicasets 1000 each
for i in cluster.instances:
wait_has_buckets(cluster, i, 1000)
# check vshard routes requests to all 3 replicasets
reached_instances = set()
for bucket_id in [1, 1500, 3000]:
info = i1.call("vshard.router.callro", bucket_id, "pico.instance_info")
reached_instances.add(info["instance_id"])
assert len(reached_instances) == 3
def get_vshards_opinion_about_replicaset_masters(i: Instance):
return i.eval(
"""
local res = {}
for _, r in pairs(vshard.router.static.replicasets) do
res[r.uuid] = r.master.name
end
return res
"""
)
def wait_current_vshard_config_changed(
peer: Instance, old_current_vshard_config, timeout=5
):
def impl():
new_current_vshard_config = peer.eval(
"return box.space._pico_property:get('current_vshard_config').value",
)
assert new_current_vshard_config != old_current_vshard_config
Retriable(timeout=timeout, rps=10).call(impl)
def test_vshard_updates_on_master_change(cluster: Cluster):
i1 = cluster.add_instance(replicaset_id="r1", wait_online=True)
i2 = cluster.add_instance(replicaset_id="r1", wait_online=True)
i3 = cluster.add_instance(replicaset_id="r2", wait_online=True)
i4 = cluster.add_instance(replicaset_id="r2", wait_online=True)
r1_uuid = i1.eval("return box.space._pico_replicaset:get('r1').replicaset_uuid")
r2_uuid = i1.eval("return box.space._pico_replicaset:get('r2').replicaset_uuid")
for i in cluster.instances:
replicaset_masters = get_vshards_opinion_about_replicaset_masters(i)
# The first instance in the replicaset becomes it's master
assert replicaset_masters[r1_uuid] == i1.instance_id
assert replicaset_masters[r2_uuid] == i3.instance_id
old_vshard_config = i1.eval(
"return box.space._pico_property:get('current_vshard_config').value"
)
cluster.cas(
"update",
"_pico_replicaset",
key=["r1"],
ops=[("=", "target_master_id", i2.instance_id)],
)
index = cluster.cas(
"update",
"_pico_replicaset",
key=["r2"],
ops=[("=", "target_master_id", i4.instance_id)],
)
cluster.raft_wait_index(index)
# Wait for governor to change current master and update the vshard config.
wait_current_vshard_config_changed(i1, old_vshard_config)
for i in cluster.instances:
replicaset_masters = get_vshards_opinion_about_replicaset_masters(i)
# Now the chosen instances are replicaset masters
assert replicaset_masters[r1_uuid] == i2.instance_id
assert replicaset_masters[r2_uuid] == i4.instance_id