Commit ed95d2d9 authored by Georgy Moshkin

refactor: move version bumping from governor Downgrade to update_instance

Closes #875
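
Before this change, the governor's `Downgrade` plan stage carried two optional DML
operations (`replication_config_version_bump` and `vshard_config_version_bump`),
collected them into a batch, and forwarded them to a governor-only variant of the
update-instance handler. Now `update_instance` itself decides whether a replicaset
config version bump is needed (any change of `target_state`, not only an incarnation
bump for `Online`), so the plan stage, the governor loop, and the RPC handler all lose
the extra plumbing. A minimal sketch of the call shape, using identifiers from the
diff below:

    // before: the governor gathered optional bump ops and forwarded them
    let mut ops = vec![];
    if let Some(bump) = replication_config_version_bump { ops.push(bump); }
    if let Some(bump) = vshard_config_version_bump { ops.push(bump); }
    handle_update_instance_request_in_governor_and_also_wait_too(req, &ops, Loop::UPDATE_INSTANCE_TIMEOUT)?;

    // after: one entry point; version bumps are derived inside update_instance
    handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?;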
parent f4fb6b2c
Pipeline #49354 passed
@@ -22,7 +22,7 @@ use crate::rpc::replication::proc_replication_demote;
 use crate::rpc::replication::SyncAndPromoteRequest;
 use crate::rpc::sharding::bootstrap::proc_sharding_bootstrap;
 use crate::rpc::sharding::proc_sharding;
-use crate::rpc::update_instance::handle_update_instance_request_in_governor_and_also_wait_too;
+use crate::rpc::update_instance::handle_update_instance_request_and_wait;
 use crate::schema::ADMIN_ID;
 use crate::storage::Clusterwide;
 use crate::storage::ClusterwideTable;
@@ -314,11 +314,7 @@ impl Loop {
                 }
             }
-            Plan::Downgrade(Downgrade {
-                req,
-                replication_config_version_bump,
-                vshard_config_version_bump,
-            }) => {
+            Plan::Downgrade(Downgrade { req }) => {
                 set_status(governor_status, "update instance state to offline");
                 tlog!(Info, "downgrading instance {}", req.instance_id);
@@ -330,18 +326,7 @@ impl Loop {
                     "current_state" => %current_state,
                 ]
                 async {
-                    let mut ops = vec![];
-                    if let Some(bump) = replication_config_version_bump {
-                        ops.push(bump);
-                    }
-                    if let Some(bump) = vshard_config_version_bump {
-                        ops.push(bump);
-                    }
-                    handle_update_instance_request_in_governor_and_also_wait_too(
-                        req,
-                        &ops,
-                        Loop::UPDATE_INSTANCE_TIMEOUT,
-                    )?
+                    handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                 }
             }
         }
@@ -476,7 +461,7 @@ impl Loop {
                     "current_state" => %current_state,
                 ]
                 async {
-                    handle_update_instance_request_in_governor_and_also_wait_too(req, &[], Loop::UPDATE_INSTANCE_TIMEOUT)?
+                    handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
                 }
             }
         }
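
After these two hunks both governor call sites go through the same two-argument
helper. The `Downgrade` arm reads roughly as follows (post-image reconstructed from
the diff; the surrounding macro context and indentation are approximate):

    Plan::Downgrade(Downgrade { req }) => {
        set_status(governor_status, "update instance state to offline");
        tlog!(Info, "downgrading instance {}", req.instance_id);
        // ...
        async {
            handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)?
        }
    }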
......
@@ -75,7 +75,6 @@ pub(super) fn action_plan<'i>(
     if let Some(Instance {
         raft_id,
         instance_id,
-        replicaset_id,
         target_state,
         ..
     }) = to_downgrade
@@ -112,26 +111,7 @@ pub(super) fn action_plan<'i>(
         let req = rpc::update_instance::Request::new(instance_id.clone(), cluster_id)
             .with_current_state(*target_state);
-        let replication_config_version_bump =
-            get_replicaset_config_version_bump_op_if_needed(replicasets, replicaset_id);
-        let mut vshard_config_version_bump = None;
-        #[rustfmt::skip]
-        if target_vshard_config_version == current_vshard_config_version {
-            // Only bump the version if it's not already bumped.
-            vshard_config_version_bump = Some(Dml::replace(
-                ClusterwideTable::Property,
-                &(&PropertyName::TargetVshardConfigVersion, target_vshard_config_version + 1),
-                ADMIN_ID,
-            )?);
-        };
-        return Ok(Downgrade {
-            req,
-            replication_config_version_bump,
-            vshard_config_version_bump,
-        }
-        .into());
+        return Ok(Downgrade { req }.into());
     }

     ////////////////////////////////////////////////////////////////////////////
@@ -762,11 +742,6 @@ pub mod stage {
         /// Update instance request which translates into a global DML operation
         /// which updates `current_state` to `Offline` in table `_pico_instance` for a given instance.
        pub req: rpc::update_instance::Request,
-        /// Global DML operation which updates `target_config_version` in table `_pico_replicaset`
-        /// for the replicaset of the instance which is going `Offline`.
-        pub replication_config_version_bump: Option<Dml>,
-        /// Global DML operation which updates `target_vshard_config_version` in table `_pico_property`.
-        pub vshard_config_version_bump: Option<Dml>,
     }

     pub struct ConfigureReplication<'i> {
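
With both `Option<Dml>` fields gone, the plan stage reduces to the request alone
(post-image reconstructed from the diff):

    pub struct Downgrade {
        /// Update instance request which translates into a global DML operation
        /// which updates `current_state` to `Offline` in table `_pico_instance` for a given instance.
        pub req: rpc::update_instance::Request,
    }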
......
@@ -390,7 +390,7 @@ mod tests {
         let mut ops = UpdateOps::new();
         ops.assign("target_state", State::new(Offline, 0)).unwrap();
         assert_eq!(dml, update_instance_dml("i1", ops));
-        assert_eq!(do_bump, false);
+        assert_eq!(do_bump, true, "target state change requires replicaset config version bump");
         storage.do_dml(&dml).unwrap();
         let instance = storage.instances.get(&InstanceId::from("i1")).unwrap();
@@ -405,7 +405,7 @@ mod tests {
         let mut ops = UpdateOps::new();
         ops.assign("target_state", State::new(Online, 1)).unwrap();
         assert_eq!(dml, update_instance_dml("i1", ops));
-        assert_eq!(do_bump, true, "incarnation bump requires replicaset config version bump");
+        assert_eq!(do_bump, true, "target state change requires replicaset config version bump");
         storage.do_dml(&dml).unwrap();
         let instance = storage.instances.get(&InstanceId::from("i1")).unwrap();
@@ -420,7 +420,7 @@ mod tests {
         let mut ops = UpdateOps::new();
         ops.assign("target_state", State::new(Online, 2)).unwrap();
         assert_eq!(dml, update_instance_dml("i1", ops));
-        assert_eq!(do_bump, true, "incarnation bump requires replicaset config version bump");
+        assert_eq!(do_bump, true, "target state change requires replicaset config version bump");
         storage.do_dml(&dml).unwrap();
         let instance = storage.instances.get(&InstanceId::from("i1")).unwrap();
@@ -435,7 +435,7 @@ mod tests {
         let mut ops = UpdateOps::new();
         ops.assign("target_state", State::new(Expelled, 0)).unwrap();
         assert_eq!(dml, update_instance_dml("i1", ops));
-        assert_eq!(do_bump, false);
+        assert_eq!(do_bump, true, "target state change requires replicaset config version bump");
         storage.do_dml(&dml).unwrap();
         let instance = storage.instances.get(&InstanceId::from("i1")).unwrap();
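
All four test hunks encode the new invariant: previously `do_bump` was expected only
when the incarnation grew (the `Online` transitions), while `Offline` and `Expelled`
transitions asserted `false`. Now any change of `target_state` is expected to bump
the replicaset config version:

    // old expectation for Offline/Expelled: no bump
    assert_eq!(do_bump, false);
    // new expectation for every target_state change:
    assert_eq!(do_bump, true, "target state change requires replicaset config version bump");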
......
@@ -100,24 +100,6 @@ impl Request {
 /// **This function yields**
 #[inline(always)]
 pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) -> Result<()> {
-    handle_update_instance_request_in_governor_and_also_wait_too(req, &[], timeout)
-}
-
-/// Processes the [`crate::rpc::update_instance::Request`] and appends
-/// the corresponding operation along with the provided `additional_dml` entry
-/// to the raft log within a single [`Op::BatchDml`] (if successful).
-///
-/// **This function should be used directly only from governor** everywhere else
-/// should use [`handle_update_instance_request_and_wait`] instead.
-///
-/// Returns `Ok(())` when the entry is committed.
-///
-/// **This function yields**
-pub fn handle_update_instance_request_in_governor_and_also_wait_too(
-    req: Request,
-    additional_dml: &[Dml],
-    timeout: Duration,
-) -> Result<()> {
     let node = node::global()?;
     let cluster_id = node.raft_storage.cluster_id()?;
     let storage = &node.storage;
@@ -130,18 +112,6 @@ pub fn handle_update_instance_request_in_governor_and_also_wait_too(
         });
     }

-    #[cfg(debug_assertions)]
-    for op in additional_dml {
-        match ClusterwideTable::from_i64(op.table_id() as _) {
-            Some(ClusterwideTable::Property | ClusterwideTable::Replicaset) => {
-                // Allowed
-            }
-            _ => {
-                panic!("for CaS safety reasons currently we only allow updating _pico_property or _pico_replicaset simultaneously with instance")
-            }
-        }
-    }
-
     let deadline = fiber::clock().saturating_add(timeout);
     loop {
         let instance = storage.instances.get(&req.instance_id)?;
@@ -195,13 +165,6 @@ pub fn handle_update_instance_request_in_governor_and_also_wait_too(
         ops.push(vshard_bump);
     }

-    if !additional_dml.is_empty() {
-        #[rustfmt::skip]
-        debug_assert!(!version_bump_needed, "there can only be 1 replicaset version bump");
-        // TODO: to eliminate redundant copies here we should refactor `BatchDml` and/or `Dml`
-        ops.extend_from_slice(additional_dml);
-    }
-
     let op = Op::single_dml_or_batch(ops);
     let ranges = vec![
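
With `additional_dml` removed, the batch now contains only operations this function
derives itself, so the `#[cfg(debug_assertions)]` table whitelist guarding
caller-supplied DML becomes unnecessary. A sketch of the retained composition,
assuming `ops: Vec<Dml>` as in the surrounding context (`Op::single_dml_or_batch`
appears in the retained lines and presumably emits a plain `Dml` op for a
one-element batch and a `BatchDml` otherwise):

    // bumps computed by this function are pushed onto the batch...
    ops.push(vshard_bump);
    // ...and the batch is wrapped into a single raft log operation
    let op = Op::single_dml_or_batch(ops);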
@@ -272,14 +235,12 @@ pub fn update_instance(
     let mut replication_config_version_bump_needed = false;
     if let Some(variant) = req.target_state {
         let incarnation = match variant {
-            Online => {
-                replication_config_version_bump_needed = true;
-                instance.target_state.incarnation + 1
-            }
+            Online => instance.target_state.incarnation + 1,
             Offline | Expelled => instance.current_state.incarnation,
         };
         let state = State::new(variant, incarnation);
         if state != instance.target_state {
+            replication_config_version_bump_needed = true;
             ops.assign("target_state", state)?;
         }
     }
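
This final hunk is the heart of the refactor: the bump flag moves out of the
`Online` match arm and into the `state != instance.target_state` check, so `Offline`
and `Expelled` transitions now trigger the bump too. Post-image reconstructed from
the diff:

    let mut replication_config_version_bump_needed = false;
    if let Some(variant) = req.target_state {
        let incarnation = match variant {
            // Online always starts a fresh incarnation
            Online => instance.target_state.incarnation + 1,
            // Offline/Expelled keep the current incarnation
            Offline | Expelled => instance.current_state.incarnation,
        };
        let state = State::new(variant, incarnation);
        if state != instance.target_state {
            // any effective change of target_state requires a
            // replicaset config version bump
            replication_config_version_bump_needed = true;
            ops.assign("target_state", state)?;
        }
    }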