Skip to content
Snippets Groups Projects
Commit 43a3ce8c authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: extract prepare_update_instance_cas_request function for reusability

parent 259ff4ac
No related branches found
No related tags found
1 merge request!1323fix: don't use handle_update_instance_request_and_wait in governor
......@@ -125,58 +125,28 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
return Err(Error::NoSuchReplicaset { id: replicaset_id.to_string(), id_is_uuid: false });
};
let existing_fds = storage.instances.failure_domain_names()?;
let dml = update_instance(&instance, &req, &existing_fds)?;
let Some((dml, version_bump_needed)) = dml else {
// No point in proposing an operation which doesn't change anything.
// Note: if the request tried setting target state Online while it
// was already Online the incarnation will be increased and so
// dml will not be None and this is the intended behaviour.
return Ok(());
};
debug_assert!(matches!(dml, Dml::Update { .. }), "{dml:?}");
debug_assert_eq!(dml.table_id(), ClusterwideTable::Instance.id());
let mut ops = vec![];
ops.push(dml);
if version_bump_needed &&
// Don't bump version if it's already bumped
replicaset.current_config_version == replicaset.target_config_version
{
let mut update_ops = UpdateOps::new();
#[rustfmt::skip]
update_ops.assign(column_name!(Replicaset, target_config_version), replicaset.target_config_version + 1)?;
#[rustfmt::skip]
let replicaset_dml = Dml::update(ClusterwideTable::Replicaset, &[replicaset_id], update_ops, ADMIN_ID)?;
ops.push(replicaset_dml);
}
let tier = &instance.tier;
let tier = storage
.tiers
.by_name(tier)?
.expect("tier for instance should exists");
if version_bump_needed {
let vshard_bump = Tier::get_vshard_config_version_bump_op_if_needed(&tier)?;
if let Some(dml) = vshard_bump {
ops.push(dml);
}
}
let op = Op::single_dml_or_batch(ops);
let existing_fds = storage.instances.failure_domain_names()?;
let ranges = vec![
cas::Range::new(ClusterwideTable::Instance),
cas::Range::new(ClusterwideTable::Address),
cas::Range::new(ClusterwideTable::Tier),
];
let Some((op, ranges)) = prepare_update_instance_cas_request(
&req,
&instance,
&replicaset,
&tier,
&existing_fds,
)?
else {
return Ok(());
};
let predicate = cas::Predicate::with_applied_index(ranges);
let cas_req = crate::cas::Request::new(op, predicate, ADMIN_ID)?;
let res = cas::compare_and_swap_local(&cas_req, deadline)?;
let cas = crate::cas::Request::new(op, predicate, ADMIN_ID)?;
let res = cas::compare_and_swap_local(&cas, deadline)?;
if req.dont_retry {
res.no_retries()?;
} else if let Some(e) = res.into_retriable_error() {
......@@ -191,6 +161,60 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
}
}
pub fn prepare_update_instance_cas_request(
request: &Request,
instance: &Instance,
replicaset: &Replicaset,
tier: &Tier,
existing_fds: &HashSet<Uppercase>,
) -> Result<Option<(Op, Vec<cas::Range>)>> {
debug_assert_eq!(instance.replicaset_id, replicaset.replicaset_id);
debug_assert_eq!(instance.tier, replicaset.tier);
debug_assert_eq!(instance.tier, tier.name);
let dml = update_instance(instance, request, existing_fds)?;
let Some((dml, version_bump_needed)) = dml else {
// No point in proposing an operation which doesn't change anything.
// Note: if the request tried setting target state Online while it
// was already Online the incarnation will be increased and so
// dml will not be None and this is the intended behaviour.
return Ok(None);
};
debug_assert!(matches!(dml, Dml::Update { .. }), "{dml:?}");
debug_assert_eq!(dml.table_id(), ClusterwideTable::Instance.id());
let mut ops = vec![];
ops.push(dml);
if version_bump_needed &&
// Don't bump version if it's already bumped
replicaset.current_config_version == replicaset.target_config_version
{
let mut update_ops = UpdateOps::new();
#[rustfmt::skip]
update_ops.assign(column_name!(Replicaset, target_config_version), replicaset.target_config_version + 1)?;
#[rustfmt::skip]
let replicaset_dml = Dml::update(ClusterwideTable::Replicaset, &[&replicaset.replicaset_id], update_ops, ADMIN_ID)?;
ops.push(replicaset_dml);
}
if version_bump_needed {
let vshard_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?;
if let Some(dml) = vshard_bump {
ops.push(dml);
}
}
let op = Op::single_dml_or_batch(ops);
let ranges = vec![
cas::Range::new(ClusterwideTable::Instance),
cas::Range::new(ClusterwideTable::Address),
cas::Range::new(ClusterwideTable::Tier),
];
Ok(Some((op, ranges)))
}
/// Returns a pair:
/// - A [`Dml::Update`] operation which should be applied to the `_pico_instance` table to satisfy the `req`.
/// May be `None` if the request is already satisfied.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment