Commit d291744c authored by Georgy Moshkin

fix(governor): don't bootstrap vshard until replication factor is satisfied

NOTE: there's still a bug: we set each new replicaset's weight to 1
(even for replicasets which don't yet satisfy the replication factor)
before the bucket distribution is bootstrapped. We have to, because
vshard.*.cfg requires at least one replicaset with a non-zero weight.

A potential solution would be to only configure vshard once a replicaset
is filled up.
parent 93309ba7
Merge request !429: fix(governor): don't bootstrap vshard until replication factor is satisfied
Pipeline #14313 passed
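As an illustration of the gating idea behind this change (and of the follow-up suggested in the NOTE above), here is a minimal, self-contained Rust sketch of "find the first replicaset that satisfies the replication factor". It is only a sketch, not the code from the diff below: the Instance struct, the may_respond flag and the plain usize replication_factor are simplified stand-ins for the real cluster types, and the replicaset ids in main are made up.

use std::collections::HashMap;

struct Instance {
    replicaset_id: String,
    may_respond: bool,
}

/// Returns the id of the first replicaset with at least `replication_factor`
/// responding instances, i.e. the first one that is "filled up" and may be
/// given a non-zero vshard weight / used to bootstrap the bucket distribution.
fn first_full_replicaset_id(instances: &[Instance], replication_factor: usize) -> Option<String> {
    let mut sizes: HashMap<&str, usize> = HashMap::new();
    for instance in instances.iter().filter(|i| i.may_respond) {
        let size = sizes.entry(&instance.replicaset_id).or_insert(0);
        *size += 1;
        if *size >= replication_factor {
            return Some(instance.replicaset_id.clone());
        }
    }
    None
}

fn main() {
    let instances = vec![
        Instance { replicaset_id: "r1".into(), may_respond: true },
        Instance { replicaset_id: "r2".into(), may_respond: true },
        Instance { replicaset_id: "r1".into(), may_respond: true },
    ];
    // With replication_factor = 2, "r1" is the first full replicaset, so it is
    // the one the governor may bootstrap and give a non-zero weight.
    assert_eq!(first_full_replicaset_id(&instances, 2), Some("r1".to_string()));
    // With replication_factor = 3 no replicaset is full yet, so bootstrapping
    // (and, per the NOTE above, ideally also the weight assignment) is postponed.
    assert_eq!(first_full_replicaset_id(&instances, 3), None);
}

In the actual change the analogous check is get_first_full_replicaset below, which additionally consults cluster-wide storage for the vshard_bootstrapped property and the configured replication factor.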
@@ -161,7 +161,6 @@ impl Loop {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
bootstrap: false,
},
)
});
@@ -449,7 +448,6 @@ impl Loop {
});
if let Some(instance) = to_shard {
let res: Result<_> = async {
let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?;
let commit = raft_storage.commit()?.unwrap();
let reqs = maybe_responding(&instances).map(|instance| {
(
@@ -458,7 +456,6 @@ impl Loop {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
bootstrap: !vshard_bootstrapped && instance.raft_id == node.raft_id,
},
)
});
@@ -480,19 +477,6 @@ impl Loop {
));
node.handle_update_instance_request_and_wait(req)?;
if !vshard_bootstrapped {
// TODO: if this fails, it will only rerun next time vshard
// gets reconfigured
node.propose_and_wait(
OpDML::replace(
ClusterwideSpace::Property,
&(PropertyName::VshardBootstrapped, true),
)?,
// TODO: don't hard code the timeout
Duration::from_secs(3),
)??;
}
Ok(())
}
.await;
@@ -508,6 +492,59 @@ impl Loop {
return Continue;
}
////////////////////////////////////////////////////////////////////////
// bootstrap sharding
let to_bootstrap = get_first_full_replicaset(&instances, storage);
if let Err(e) = to_bootstrap {
tlog!(
Warning,
"failed checking if bucket bootstrapping is needed: {e}"
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
return Continue;
}
if let Ok(Some(Replicaset { master_id, .. })) = to_bootstrap {
// TODO: change `Info` to `Debug`
tlog!(Info, "bootstrapping bucket distribution";
"instance_id" => %master_id,
);
let res: Result<_> = async {
let req = sharding::bootstrap::Request {
term,
commit: raft_storage.commit()?.unwrap(),
timeout: Self::SYNC_TIMEOUT,
};
pool.call(&master_id, &req)?
// TODO: don't hard code timeout
.timeout(Duration::from_secs(3))
.await??;
let op = OpDML::replace(
ClusterwideSpace::Property,
&(PropertyName::VshardBootstrapped, true),
)?;
// TODO: don't hard code timeout
node.propose_and_wait(op, Duration::from_secs(3))??;
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning, "failed bootstrapping bucket distribution: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
return Continue;
}
// TODO: change `Info` to `Debug`
tlog!(Info, "bootstrapped bucket distribution";
"instance_id" => %master_id,
);
return Continue;
};
////////////////////////////////////////////////////////////////////////
// sharding weights
let to_update_weights = instances.iter().find(|instance| {
@@ -538,7 +575,6 @@ impl Loop {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
bootstrap: false,
}));
// TODO: don't hard code timeout
let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
@@ -748,6 +784,31 @@ fn get_weight_changes<'p>(
(!weight_changes.is_empty()).then_some(weight_changes)
}
#[inline(always)]
fn get_first_full_replicaset(
instances: &[Instance],
storage: &Clusterwide,
) -> Result<Option<Replicaset>> {
if storage.properties.vshard_bootstrapped()? {
return Ok(None);
}
let replication_factor = storage.properties.replication_factor()?;
let mut replicaset_sizes = HashMap::new();
let mut full_replicaset_id = None;
for Instance { replicaset_id, .. } in maybe_responding(instances) {
let replicaset_size = replicaset_sizes.entry(replicaset_id).or_insert(0);
*replicaset_size += 1;
if *replicaset_size >= replication_factor {
full_replicaset_id = Some(replicaset_id);
}
}
let Some(replicaset_id) = full_replicaset_id else { return Ok(None); };
let res = storage.replicasets.get(replicaset_id)?;
Ok(res)
}
#[inline(always)]
fn maybe_responding(instances: &[Instance]) -> impl Iterator<Item = &Instance> {
instances.iter().filter(|instance| instance.may_respond())
......
use ::tarantool::tlua;
use crate::traft::rpc::sync::wait_for_index_timeout;
use crate::traft::Result;
use crate::traft::{node, RaftIndex, RaftTerm};
@@ -9,7 +10,7 @@ crate::define_rpc_request! {
fn proc_sharding(req: Request) -> Result<Response> {
let node = node::global()?;
node.status().check_term(req.term)?;
super::sync::wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
let storage = &node.storage;
let cfg = cfg::Cfg::from_storage(storage)?;
@@ -32,10 +33,6 @@ crate::define_rpc_request! {
)
.map_err(tlua::LuaError::from)?;
if req.bootstrap {
lua.exec("vshard.router.bootstrap()")?;
}
// After reconfiguring, vshard leaves behind net.box.connection objects,
// which try reconnecting every 0.5 seconds. Garbage collecting them helps.
lua.exec("collectgarbage()")?;
@@ -49,7 +46,6 @@ crate::define_rpc_request! {
pub term: RaftTerm,
pub commit: RaftIndex,
pub timeout: Duration,
pub bootstrap: bool,
}
/// Response to [`sharding::Request`].
@@ -58,6 +54,35 @@ crate::define_rpc_request! {
pub struct Response {}
}
pub mod bootstrap {
use super::*;
crate::define_rpc_request! {
fn proc_sharding_bootstrap(req: Request) -> Result<Response> {
let node = node::global()?;
node.status().check_term(req.term)?;
wait_for_index_timeout(req.commit, &node.raft_storage, req.timeout)?;
::tarantool::lua_state().exec("vshard.router.bootstrap()")?;
Ok(Response {})
}
/// Request to bootstrap bucket distribution.
#[derive(Default)]
pub struct Request {
pub term: RaftTerm,
pub commit: RaftIndex,
pub timeout: Duration,
}
/// Response to [`sharding::bootstrap::Request`].
///
/// [`sharding::bootstrap::Request`]: Request
pub struct Response {}
}
}
#[rustfmt::skip]
pub mod cfg {
use crate::storage::Clusterwide;
......
@@ -137,24 +137,35 @@ def test_bucket_discovery_single(instance: Instance):
wait_buckets_awailable(instance, 3000)
@funcy.retry(tries=30, timeout=0.2)
def wait_has_buckets(i: Instance, expected_active: int):
i.call("vshard.storage.rebalancer_wakeup")
storage_info = i.call("vshard.storage.info")
assert expected_active == storage_info["bucket"]["active"]
@pytest.mark.xfail(
run=True,
reason=(
"currently we bootstrap vshard even before the first replicaset is filled, "
"but we shouldn't"
"currently we set non zero weights for all replicasets before bootstrap, "
"even those which don't satisfy the replication factor"
),
)
def test_bucket_discovery_respects_replication_factor(cluster: Cluster):
i1, *_ = cluster.deploy(instance_count=1, init_replication_factor=2)
time.sleep(1)
time.sleep(0.5)
assert 0 == i1.call("vshard.router.info")["bucket"]["available_rw"]
assert None is i1.call("pico.space.property:get", "vshard_bootstrapped")
i2 = cluster.add_instance(replicaset_id="r2")
time.sleep(0.5)
assert 0 == i2.call("vshard.router.info")["bucket"]["available_rw"]
assert None is i2.call("pico.space.property:get", "vshard_bootstrapped")
@funcy.retry(tries=30, timeout=0.2)
def wait_has_buckets(i: Instance, expected_active: int):
i.call("vshard.storage.rebalancer_wakeup")
storage_info = i.call("vshard.storage.info")
assert expected_active == storage_info["bucket"]["active"]
i3 = cluster.add_instance(replicaset_id="r1")
time.sleep(0.5)
assert i3.call("pico.space.property:get", "vshard_bootstrapped")[1]
wait_has_buckets(i3, 3000)
def test_bucket_rebalancing(cluster: Cluster):
......