From 43a3ce8c7a6c293db87c38ce56599a102d445835 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 26 Sep 2024 16:20:08 +0300 Subject: [PATCH] refactor: extract prepare_update_instance_cas_request function for reusability --- src/rpc/update_instance.rs | 110 ++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 43 deletions(-) diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs index a9704f0a81..e0da50c73e 100644 --- a/src/rpc/update_instance.rs +++ b/src/rpc/update_instance.rs @@ -125,58 +125,28 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) return Err(Error::NoSuchReplicaset { id: replicaset_id.to_string(), id_is_uuid: false }); }; - let existing_fds = storage.instances.failure_domain_names()?; - - let dml = update_instance(&instance, &req, &existing_fds)?; - let Some((dml, version_bump_needed)) = dml else { - // No point in proposing an operation which doesn't change anything. - // Note: if the request tried setting target state Online while it - // was already Online the incarnation will be increased and so - // dml will not be None and this is the intended behaviour. - return Ok(()); - }; - debug_assert!(matches!(dml, Dml::Update { .. }), "{dml:?}"); - debug_assert_eq!(dml.table_id(), ClusterwideTable::Instance.id()); - - let mut ops = vec![]; - ops.push(dml); - - if version_bump_needed && - // Don't bump version if it's already bumped - replicaset.current_config_version == replicaset.target_config_version - { - let mut update_ops = UpdateOps::new(); - #[rustfmt::skip] - update_ops.assign(column_name!(Replicaset, target_config_version), replicaset.target_config_version + 1)?; - #[rustfmt::skip] - let replicaset_dml = Dml::update(ClusterwideTable::Replicaset, &[replicaset_id], update_ops, ADMIN_ID)?; - ops.push(replicaset_dml); - } - let tier = &instance.tier; let tier = storage .tiers .by_name(tier)? .expect("tier for instance should exists"); - if version_bump_needed { - let vshard_bump = Tier::get_vshard_config_version_bump_op_if_needed(&tier)?; - if let Some(dml) = vshard_bump { - ops.push(dml); - } - } - - let op = Op::single_dml_or_batch(ops); + let existing_fds = storage.instances.failure_domain_names()?; - let ranges = vec![ - cas::Range::new(ClusterwideTable::Instance), - cas::Range::new(ClusterwideTable::Address), - cas::Range::new(ClusterwideTable::Tier), - ]; + let Some((op, ranges)) = prepare_update_instance_cas_request( + &req, + &instance, + &replicaset, + &tier, + &existing_fds, + )? + else { + return Ok(()); + }; let predicate = cas::Predicate::with_applied_index(ranges); - let cas_req = crate::cas::Request::new(op, predicate, ADMIN_ID)?; - let res = cas::compare_and_swap_local(&cas_req, deadline)?; + let cas = crate::cas::Request::new(op, predicate, ADMIN_ID)?; + let res = cas::compare_and_swap_local(&cas, deadline)?; if req.dont_retry { res.no_retries()?; } else if let Some(e) = res.into_retriable_error() { @@ -191,6 +161,60 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) } } +pub fn prepare_update_instance_cas_request( + request: &Request, + instance: &Instance, + replicaset: &Replicaset, + tier: &Tier, + existing_fds: &HashSet<Uppercase>, +) -> Result<Option<(Op, Vec<cas::Range>)>> { + debug_assert_eq!(instance.replicaset_id, replicaset.replicaset_id); + debug_assert_eq!(instance.tier, replicaset.tier); + debug_assert_eq!(instance.tier, tier.name); + + let dml = update_instance(instance, request, existing_fds)?; + let Some((dml, version_bump_needed)) = dml else { + // No point in proposing an operation which doesn't change anything. + // Note: if the request tried setting target state Online while it + // was already Online the incarnation will be increased and so + // dml will not be None and this is the intended behaviour. + return Ok(None); + }; + debug_assert!(matches!(dml, Dml::Update { .. }), "{dml:?}"); + debug_assert_eq!(dml.table_id(), ClusterwideTable::Instance.id()); + + let mut ops = vec![]; + ops.push(dml); + + if version_bump_needed && + // Don't bump version if it's already bumped + replicaset.current_config_version == replicaset.target_config_version + { + let mut update_ops = UpdateOps::new(); + #[rustfmt::skip] + update_ops.assign(column_name!(Replicaset, target_config_version), replicaset.target_config_version + 1)?; + #[rustfmt::skip] + let replicaset_dml = Dml::update(ClusterwideTable::Replicaset, &[&replicaset.replicaset_id], update_ops, ADMIN_ID)?; + ops.push(replicaset_dml); + } + + if version_bump_needed { + let vshard_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?; + if let Some(dml) = vshard_bump { + ops.push(dml); + } + } + + let op = Op::single_dml_or_batch(ops); + + let ranges = vec![ + cas::Range::new(ClusterwideTable::Instance), + cas::Range::new(ClusterwideTable::Address), + cas::Range::new(ClusterwideTable::Tier), + ]; + Ok(Some((op, ranges))) +} + /// Returns a pair: /// - A [`Dml::Update`] operation which should be applied to the `_pico_instance` table to satisfy the `req`. /// May be `None` if the request is already satisfied. -- GitLab