use super::conf_change::raft_conf_change;

use crate::cas;
use crate::column_name;
use crate::has_states;
use crate::instance::state::StateVariant;
use crate::instance::{Instance, InstanceName};
use crate::plugin::PluginIdentifier;
use crate::plugin::PluginOp;
use crate::plugin::TopologyUpdateOpKind;
use crate::replicaset::ReplicasetState;
use crate::replicaset::WeightOrigin;
use crate::replicaset::{Replicaset, ReplicasetName};
use crate::rpc;
use crate::rpc::update_instance::prepare_update_instance_cas_request;
use crate::schema::{
PluginConfigRecord, PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey, ADMIN_ID,
};
use crate::storage::{ClusterwideTable, PropertyName};
use crate::sync::GetVclockRpc;
use crate::tier::Tier;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::error::IdOfInstance;
use crate::traft::op::Dml;
use crate::traft::op::Op;
use crate::traft::op::PluginRaftOp;
use crate::traft::Result;
use crate::traft::{RaftId, RaftIndex, RaftTerm};
use crate::util::Uppercase;
use crate::warn_or_panic;
use std::collections::HashMap;
use std::collections::HashSet;
use tarantool::space::UpdateOps;
use tarantool::vclock::Vclock;
#[allow(clippy::too_many_arguments)]
pub(super) fn action_plan<'i>(
term: RaftTerm,
applied: RaftIndex,
cluster_name: String,
my_raft_id: RaftId,
instances: &'i [Instance],
existing_fds: &HashSet<Uppercase>,
peer_addresses: &'i HashMap<RaftId, String>,
voters: &[RaftId],
learners: &[RaftId],
replicasets: &HashMap<&ReplicasetName, &'i Replicaset>,
tiers: &HashMap<&str, &'i Tier>,
has_pending_schema_change: bool,
plugins: &HashMap<PluginIdentifier, PluginDef>,
services: &HashMap<PluginIdentifier, Vec<&'i ServiceDef>>,
plugin_op: Option<&'i PluginOp>,
sync_timeout: std::time::Duration,
) -> Result<Plan<'i>> {
// This function is specifically extracted to separate the task construction
// from any IO and/or other yielding operations.
#[cfg(debug_assertions)]
let _guard = crate::util::NoYieldsGuard::new();
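// The guard makes debug builds panic if a fiber yield sneaks in here:
// the arguments above are a snapshot of the clusterwide state, and a
// yield could let that snapshot go stale while the plan is being built.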
////////////////////////////////////////////////////////////////////////////
// transfer raft leadership
let Some(this_instance) = instances
.iter()
.find(|instance| instance.raft_id == my_raft_id)
else {
return Err(Error::NoSuchInstance(IdOfInstance::RaftId(my_raft_id)));
};
if has_states!(this_instance, * -> Offline) || has_states!(this_instance, * -> Expelled) {
let mut new_leader = None;
for instance in instances {
if has_states!(instance, * -> Offline) || has_states!(instance, * -> Expelled) {
continue;
}
if !voters.contains(&instance.raft_id) {
continue;
}
new_leader = Some(instance);
break;
}
if let Some(new_leader) = new_leader {
return Ok(TransferLeadership { to: new_leader }.into());
} else {
tlog!(Warning, "leader is going offline and no substitution is found";
"leader_raft_id" => my_raft_id,
"voters" => ?voters,
);
}
}
////////////////////////////////////////////////////////////////////////////
// conf change
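// `raft_conf_change` compares the current raft configuration (voters,
// learners) with the one implied by the instance states and returns a
// `ConfChangeV2` only when they diverge; `None` means nothing to do.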
if let Some(conf_change) = raft_conf_change(instances, voters, learners, tiers) {
return Ok(ConfChange { conf_change }.into());
}
////////////////////////////////////////////////////////////////////////////
// instance going offline non-gracefully
// TODO: it should be possible to move this step to where Expel is handled
let to_downgrade = instances
.iter()
.find(|instance| has_states!(instance, not Offline -> Offline));
if let Some(instance) = to_downgrade {
let instance_name = &instance.name;
let new_current_state = instance.target_state.variant.as_str();
let replicaset = *replicasets
.get(&instance.replicaset_name)
.expect("replicaset info is always present");
let tier = *tiers
.get(&*instance.tier)
.expect("tier info is always present");
// TODO: if this is the replication leader, we should demote it and wait
// until someone is promoted in its place (which implies synchronizing
// vclocks). Except that we can't do this reliably, as tarantool will
// stop accepting incoming connections once the on_shutdown event happens.
// This means that we can't reliably send rpc requests to instances with
// target state Offline and basically there's nothing we can do about
// such instances.
//
// Therefore basically the user should never expect that turning off a
// replication leader is safe. Instead it should always first transfer
// the replication leadership to another instance.
let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
.with_current_state(instance.target_state);
let cas_parameters =
prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;
let (ops, ranges) = cas_parameters.expect("already checked current state is different");
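// The CaS predicate ties this decision to the raft log: if any entry
// past `applied` touches the given ranges, the state this plan was
// based on has changed and the request is rejected.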
let predicate = cas::Predicate::new(applied, ranges);
let op = Op::single_dml_or_batch(ops);
let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
return Ok(Downgrade {
new_current_state,
cas,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// update target replicaset master
let new_target_master = get_new_replicaset_master_if_needed(instances, replicasets);

if let Some((to, replicaset)) = new_target_master {
debug_assert_eq!(to.replicaset_name, replicaset.name);
let mut ops = UpdateOps::new();
ops.assign(column_name!(Replicaset, target_master_name), &to.name)?;
let dml = Dml::update(
ClusterwideTable::Replicaset,
&[&to.replicaset_name],
ops,
ADMIN_ID,
)?;

let ranges = vec![
cas::Range::new(ClusterwideTable::Instance).eq([&to.name]),
cas::Range::new(ClusterwideTable::Instance).eq([&replicaset.target_master_name]),
];
let predicate = cas::Predicate::new(applied, ranges);
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
return Ok(UpdateTargetReplicasetMaster { cas }.into());
}
////////////////////////////////////////////////////////////////////////////
// configure replication
if let Some((replicaset, targets, replicaset_peers)) =
get_replicaset_to_configure(instances, peer_addresses, replicasets)
{
// Targets must not be empty, otherwise we would bump the version
// without actually calling the RPC.
debug_assert!(!targets.is_empty());
let replicaset_name = &replicaset.name;

let mut master_name = None;
if replicaset.current_master_name == replicaset.target_master_name {
master_name = Some(&replicaset.current_master_name);

}
let mut ops = UpdateOps::new();

ops.assign(
column_name!(Replicaset, current_config_version),
replicaset.target_config_version,
)?;
let dml = Dml::update(
ClusterwideTable::Replicaset,
&[replicaset_name],
ops,
ADMIN_ID,
)?;
// Implicit ranges are sufficient
let predicate = cas::Predicate::new(applied, []);
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
let replication_config_version_actualize = cas;
return Ok(ConfigureReplication {
replicaset_name,
targets,
master_name,
replicaset_peers,
replication_config_version_actualize,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// replicaset master switchover
//
// This must be done after instances have (re)configured replication
// because master switchover requires synchronizing via tarantool replication.
let new_current_master = replicasets
.values()
.find(|r| r.current_master_name != r.target_master_name);
if let Some(r) = new_current_master {
let replicaset_name = &r.name;
let old_master_name = &r.current_master_name;
let new_master_name = &r.target_master_name;
let promotion_vclock = &r.promotion_vclock;
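// The promotion vclock is the replication position the new master must
// reach before the switchover is finalized; see the docs on
// `ReplicasetMasterConsistentSwitchover` below.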
let mut replicaset_dml = UpdateOps::new();
replicaset_dml.assign(
column_name!(Replicaset, current_master_name),
new_master_name,
)?;
let mut bump_dml = vec![];
if let Some(bump) =
get_replicaset_config_version_bump_op_if_needed(replicasets, replicaset_name)
{
bump_dml.push(bump);
}
let tier_name = &r.tier;
let tier = tiers
.get(tier_name.as_str())
.expect("tier for instance should exist");
let vshard_config_version_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?;
if let Some(bump) = vshard_config_version_bump {
bump_dml.push(bump);
}
let ranges = vec![
// We make a decision based on this instance's state so the operation
// should fail in case there's a change to it in the uncommitted log
cas::Range::new(ClusterwideTable::Instance).eq([old_master_name]),
];
let old_master_may_respond = instances
.iter()
.find(|i| i.name == *old_master_name)
.map(|i| i.may_respond());
if let Some(true) = old_master_may_respond {
let demote_rpc = rpc::replication::DemoteRequest { term };
let sync_rpc = rpc::replication::ReplicationSyncRequest {
term,
vclock: promotion_vclock.clone(),
timeout: sync_timeout,
};
let master_actualize_dml = Dml::update(
ClusterwideTable::Replicaset,
&[replicaset_name],
replicaset_dml,
ADMIN_ID,
)?;
return Ok(ReplicasetMasterConsistentSwitchover {
replicaset_name,
old_master_name,
demote_rpc,
new_master_name,
sync_rpc,
promotion_vclock,
master_actualize_dml,
bump_dml,
ranges,
}
.into());
} else {
let get_vclock_rpc = GetVclockRpc {};
return Ok(ReplicasetMasterFailover {
old_master_name,
new_master_name,
get_vclock_rpc,
replicaset_name,
replicaset_dml,
bump_dml,
ranges,
}
.into());
}
}
////////////////////////////////////////////////////////////////////////////
// proposing automatic replicaset state & weight change
let to_change_weights = get_replicaset_state_change(instances, replicasets, tiers);
if let Some((replicaset_name, tier, need_to_update_weight)) = to_change_weights {
let mut uops = UpdateOps::new();
if need_to_update_weight {
uops.assign(column_name!(Replicaset, weight), 1.)?;
}
uops.assign(column_name!(Replicaset, state), ReplicasetState::Ready)?;
let dml = Dml::update(
ClusterwideTable::Replicaset,
&[&replicaset_name],
uops,
ADMIN_ID,
)?;
let mut ranges = vec![];
let mut ops = vec![];
ranges.push(cas::Range::for_dml(&dml)?);
ops.push(dml);
let vshard_config_version_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?;
if let Some(bump) = vshard_config_version_bump {
ranges.push(cas::Range::for_dml(&bump)?);
ops.push(bump);
}
let op = Op::single_dml_or_batch(ops);
let predicate = cas::Predicate::new(applied, ranges);
let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
return Ok(ProposeReplicasetStateChanges { cas }.into());
}
////////////////////////////////////////////////////////////////////////////
// update current vshard config
for (&tier_name, &tier) in tiers.iter() {
let mut first_ready_replicaset = None;
if !tier.vshard_bootstrapped {
first_ready_replicaset =
get_first_ready_replicaset_in_tier(instances, replicasets, tier_name);
}
// Note: the following is a hack stemming from the fact that we have to work around vshard's weird quirks.
// Everything having to deal with bootstrapping vshard should be removed completely once we migrate to our custom sharding solution.
//
// Vshard will fail if we configure it with all replicaset weights set to 0.
// But we don't set a replicaset's weight until it's filled up to the replication factor.
// So we wait until at least one replicaset is filled (i.e. `first_ready_replicaset.is_some()`).
//
// Also if vshard has already been bootstrapped, the user can mess this up by setting all replicasets' weights to 0,
// which will break the vshard configuration. That would be the user's fault though; it's not clear we can do anything about it.
let ok_to_configure_vshard = tier.vshard_bootstrapped || first_ready_replicaset.is_some();
if ok_to_configure_vshard
&& tier.current_vshard_config_version != tier.target_vshard_config_version
{
// Note at this point all the instances should have their replication configured,
// so it's ok to configure sharding for them
let targets = maybe_responding(instances)
.map(|instance| &instance.name)
.collect();
let rpc = rpc::sharding::Request {
term,
applied,
};
let mut uops = UpdateOps::new();
uops.assign(
column_name!(Tier, current_vshard_config_version),
tier.target_vshard_config_version,
)?;
let bump = Dml::update(ClusterwideTable::Tier, &[tier_name], uops, ADMIN_ID)?;
let ranges = vec![cas::Range::for_dml(&bump)?];
let predicate = cas::Predicate::new(applied, ranges);
let cas = cas::Request::new(bump, predicate, ADMIN_ID)?;
return Ok(UpdateCurrentVshardConfig {
targets,
rpc,
cas,
tier_name: tier_name.into(),
}
.into());
}
}
////////////////////////////////////////////////////////////////////////////
// bootstrap sharding on each tier
for (&tier_name, &tier) in tiers.iter() {
if tier.vshard_bootstrapped {
continue;
}
if let Some(r) = get_first_ready_replicaset_in_tier(instances, replicasets, tier_name) {
debug_assert!(
!tier.vshard_bootstrapped,
"bucket distribution only needs to be bootstrapped once"
);
let target = &r.current_master_name;
let tier_name = &r.tier;
let rpc = rpc::sharding::bootstrap::Request {
term,
applied,
tier: tier_name.into(),
};
let mut uops = UpdateOps::new();
uops.assign(column_name!(Tier, vshard_bootstrapped), true)?;
let dml = Dml::update(ClusterwideTable::Tier, &[tier_name], uops, ADMIN_ID)?;
let ranges = vec![cas::Range::for_dml(&dml)?];
let predicate = cas::Predicate::new(applied, ranges);
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
return Ok(ShardingBoot {
target,
rpc,
cas,
tier_name: tier_name.into(),
}
.into());
}
}
////////////////////////////////////////////////////////////////////////////
// expel replicaset
if let Some((master, replicaset, tier)) =
get_replicaset_being_expelled(instances, replicasets, tiers)
{
#[rustfmt::skip]
debug_assert!(has_states!(master, not Expelled -> Expelled), "{} -> {}", master.current_state, master.target_state);
let master_name = &master.name;
let target = &replicaset.current_master_name;
debug_assert_eq!(master_name, target);
let replicaset_name = replicaset.name.clone();
let rpc = rpc::sharding::WaitBucketCountRequest {
term,
applied,
timeout: sync_timeout,
expected_bucket_count: 0,
};
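// The "prepare a replicaset for expel" step below has already zeroed the
// replicaset's weight, so here we only wait until the vshard rebalancer
// drains its bucket count to 0.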
// Mark last instance as expelled
let req = rpc::update_instance::Request::new(master_name.clone(), cluster_name)
.with_current_state(master.target_state);
let update_instance =
prepare_update_instance_cas_request(&req, master, replicaset, tier, existing_fds)?;
let (mut ops, ranges) = update_instance.expect("already checked target state != current");
// Mark replicaset as expelled
let mut update_ops = UpdateOps::new();
update_ops.assign(column_name!(Replicaset, state), ReplicasetState::Expelled)?;
let dml = Dml::update(
ClusterwideTable::Replicaset,
&[&replicaset_name],
update_ops,
ADMIN_ID,
)?;
ops.push(dml);
let predicate = cas::Predicate::new(applied, ranges);
let dml = Op::BatchDml { ops };
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
return Ok(ExpelReplicaset {
replicaset_name,
target,
rpc,
cas,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// prepare a replicaset for expel
if let Some((tier, replicaset)) = get_replicaset_to_expel(instances, replicasets, tiers) {
// Master switchover happens on a governor step with higher priority
debug_assert_eq!(
replicaset.current_master_name,
replicaset.target_master_name
);
let replicaset_name = replicaset.name.clone();
let mut update_ops = UpdateOps::new();
update_ops.assign(column_name!(Replicaset, weight), 0.0)?;
#[rustfmt::skip]
update_ops.assign(column_name!(Replicaset, state), ReplicasetState::ToBeExpelled)?;
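// Weight 0 makes the vshard rebalancer migrate all buckets off this
// replicaset; the "expel replicaset" step above waits for that to finish.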
let mut ops = vec![];
let dml = Dml::update(
ClusterwideTable::Replicaset,
&[&replicaset_name],
update_ops,
ADMIN_ID,
)?;
ops.push(dml);
if let Some(bump) = Tier::get_vshard_config_version_bump_op_if_needed(tier)? {
ops.push(bump);
}
let ranges = vec![
// Decision was made based on this instance's state so we must make sure it was up to date.
cas::Range::new(ClusterwideTable::Instance).eq([&replicaset.current_master_name]),
// The rest of the ranges are implicit.
];
let predicate = cas::Predicate::new(applied, ranges);
let dml = Op::BatchDml { ops };
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
return Ok(PrepareReplicasetForExpel {
replicaset_name,
cas,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// expel instance
let target = instances
.iter()
.find(|instance| has_states!(instance, not Expelled -> Expelled));
if let Some(instance) = target {
let instance_name = &instance.name;
let new_current_state = instance.target_state.variant.as_str();
let replicaset = *replicasets
.get(&instance.replicaset_name)
.expect("replicaset info is always present");
let tier = *tiers
.get(&*instance.tier)
.expect("tier info is always present");
let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
.with_current_state(instance.target_state);
let cas_parameters =
prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;
let (ops, ranges) = cas_parameters.expect("already checked current state is different");
let predicate = cas::Predicate::new(applied, ranges);
let op = Op::single_dml_or_batch(ops);
let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
return Ok(Downgrade {
new_current_state,
cas,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// to online
let to_online = instances
.iter()
.find(|instance| has_states!(instance, not Online -> Online) || instance.is_reincarnated());
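// An instance is "reincarnated" when its target incarnation has been
// bumped (e.g. after a restart), so an instance that is already Online
// may still need to go through the to-online flow again.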
if let Some(instance) = to_online {
let instance_name = &instance.name;
let target_state = instance.target_state;
debug_assert_eq!(target_state.variant, StateVariant::Online);
let new_current_state = target_state.variant.as_str();
let replicaset = *replicasets
.get(&instance.replicaset_name)
.expect("replicaset info is always present");
let tier = *tiers
.get(&*instance.tier)
.expect("tier info is always present");
let plugin_rpc = rpc::enable_all_plugins::Request {
term,
applied,
};
let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
.with_current_state(target_state);
let cas_parameters =
prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;
let (ops, ranges) = cas_parameters.expect("already checked current state is different");
let predicate = cas::Predicate::new(applied, ranges);
let op = Op::single_dml_or_batch(ops);
let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
// (stage name and field set follow the pattern of the other governor steps)
return Ok(ToOnline {
target: instance_name,
new_current_state,
plugin_rpc,
cas,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// ddl
if has_pending_schema_change {

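// Schema changes are applied on each replicaset master; replicas receive
// them through tarantool replication.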
let targets = rpc::replicasets_masters(replicasets, instances);
let rpc = rpc::ddl_apply::Request {
term,
applied,
};
return Ok(ApplySchemaChange { rpc, targets }.into());
}
////////////////////////////////////////////////////////////////////////////
// create plugin
if let Some(PluginOp::CreatePlugin {
manifest,
inherit_topology,
}) = plugin_op
{
let ident = manifest.plugin_identifier();
if plugins.contains_key(&ident) {
warn_or_panic!(
"received a request to install a plugin which is already installed {ident:?}"
);
}
let mut targets = Vec::with_capacity(instances.len());
for i in maybe_responding(instances) {
targets.push(&i.name);
}
let rpc = rpc::load_plugin_dry_run::Request {
term,
applied,
};
let plugin_def = manifest.plugin_def();
let mut ranges = vec![];
let mut ops = vec![];
let dml = Dml::replace(ClusterwideTable::Plugin, &plugin_def, ADMIN_ID)?;
ranges.push(cas::Range::for_dml(&dml)?);
ops.push(dml);
let ident = plugin_def.into_identifier();
for mut service_def in manifest.service_defs() {
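// `inherit_topology` maps service names to tier lists that override the
// manifest defaults (presumably carried over from an existing plugin version).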
if let Some(service_topology) = inherit_topology.get(&service_def.name) {
service_def.tiers = service_topology.clone();
}
let dml = Dml::replace(ClusterwideTable::Service, &service_def, ADMIN_ID)?;
ranges.push(cas::Range::for_dml(&dml)?);
ops.push(dml);
let config = manifest
.get_default_config(&service_def.name)
.expect("configuration should exist");
let config_records =
PluginConfigRecord::from_config(&ident, &service_def.name, config.clone())?;
for config_rec in config_records {
let dml = Dml::replace(ClusterwideTable::PluginConfig, &config_rec, ADMIN_ID)?;
ranges.push(cas::Range::for_dml(&dml)?);
ops.push(dml);
}
}
let dml = Dml::delete(
ClusterwideTable::Property,
&[PropertyName::PendingPluginOperation],
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
ops.push(dml);
let success_dml = Op::BatchDml { ops };
// (field set assumed from the locals constructed above)
return Ok(CreatePlugin {
targets,
rpc,
success_dml,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// enable plugin
if let Some(PluginOp::EnablePlugin {
plugin,
timeout: on_start_timeout,
}) = plugin_op
{
let service_defs = services.get(plugin).map(|v| &**v).unwrap_or(&[]);
let targets = maybe_responding(instances).map(|i| &i.name).collect();
let rpc = rpc::enable_plugin::Request {
term,
applied,
// (field assumed: the timeout from the plugin op is forwarded to the RPC)
timeout: *on_start_timeout,
};
let mut ranges = vec![];
let mut success_dml = vec![];
let mut enable_ops = UpdateOps::new();

enable_ops.assign(column_name!(PluginDef, enabled), true)?;
let dml = Dml::update(
ClusterwideTable::Plugin,
&[&plugin.name, &plugin.version],
enable_ops,
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
success_dml.push(dml);
for svc in service_defs {
for i in maybe_responding(instances) {
if !svc.tiers.contains(&i.tier) {
continue;
}
let dml = Dml::replace(
ClusterwideTable::ServiceRouteTable,
&ServiceRouteItem::new_healthy(i.name.clone(), plugin, &svc.name),
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
success_dml.push(dml);
}
}
let dml = Dml::delete(
ClusterwideTable::Property,
&[PropertyName::PendingPluginOperation],
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
success_dml.push(dml);
let success_dml = Op::BatchDml { ops: success_dml };
// (field set assumed by analogy with the other plugin stages)
return Ok(EnablePlugin {
targets,
rpc,
success_dml,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// update service tiers
if let Some(PluginOp::AlterServiceTiers {
plugin,
service,
tier,
kind,
}) = plugin_op
{
let mut enable_targets = Vec::with_capacity(instances.len());
let mut disable_targets = Vec::with_capacity(instances.len());
let mut on_success_dml = vec![];
let mut ranges = vec![];
let plugin_def = plugins
.get(plugin)
.expect("operation for non existent plugin");
let service_def = *services
.get(plugin)
.expect("operation for non existent service")
.iter()
.find(|s| &s.name == service)
.expect("operation for non existent service");
let old_tiers = &service_def.tiers;
let mut new_service_def = service_def.clone();
let new_tiers = &mut new_service_def.tiers;
match kind {
TopologyUpdateOpKind::Add => {
if new_tiers.iter().all(|t| t != tier) {
new_tiers.push(tier.clone());
}
}
TopologyUpdateOpKind::Remove => {
new_tiers.retain(|t| t != tier);
}
}
// note: no need to enable/disable service and update routing table if plugin disabled
if plugin_def.enabled {
for i in maybe_responding(instances) {
// if instance in both new and old tiers - do nothing
if new_tiers.contains(&i.tier) && old_tiers.contains(&i.tier) {
continue;
}
if new_tiers.contains(&i.tier) {
enable_targets.push(&i.name);
let dml = Dml::replace(
ClusterwideTable::ServiceRouteTable,
&ServiceRouteItem::new_healthy(i.name.clone(), plugin, &service_def.name),
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
on_success_dml.push(dml);
}
if old_tiers.contains(&i.tier) {
disable_targets.push(&i.name);
// (key fields besides plugin name & version are assumed)
let key = ServiceRouteKey {
instance_name: &i.name,
plugin_name: &plugin.name,
plugin_version: &plugin.version,
service_name: &service_def.name,
};
let dml = Dml::delete(ClusterwideTable::ServiceRouteTable, &key, ADMIN_ID)?;
ranges.push(cas::Range::for_dml(&dml)?);
on_success_dml.push(dml);
}
}
}
let dml = Dml::replace(ClusterwideTable::Service, &new_service_def, ADMIN_ID)?;
ranges.push(cas::Range::for_dml(&dml)?);
on_success_dml.push(dml);
let dml = Dml::delete(
ClusterwideTable::Property,
&[PropertyName::PendingPluginOperation],
ADMIN_ID,
)?;
ranges.push(cas::Range::for_dml(&dml)?);
on_success_dml.push(dml);
let success_dml = Op::BatchDml {
ops: on_success_dml,
};
let enable_rpc = rpc::enable_service::Request {
term,
applied,
};
let disable_rpc = rpc::disable_service::Request {
term,
applied,
};
return Ok(AlterServiceTiers {
enable_targets,
disable_targets,
enable_rpc,
disable_rpc,
success_dml,
// (`ranges` assumed to belong to this stage, since it's accumulated above)
ranges,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
// no action needed
Ok(Plan::None)
}
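/// Generates one struct per governor stage plus the `Plan` enum with a
/// variant per stage, and the `From` impls backing the `.into()` calls above.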
macro_rules! define_plan {
(
$(
pub struct $stage:ident $(<$lt:tt>)? {
$(
$(#[$field_meta:meta])*
pub $field:ident: $field_ty:ty,
)+
}
)+
) => {
$(
pub struct $stage $(<$lt>)? {
$(
$(#[$field_meta])*
pub $field: $field_ty,
)+
}
impl<'i> From<$stage $(<$lt>)?> for Plan<'i> {
fn from(s: $stage $(<$lt>)?) -> Self {
Self::$stage(s)
}
}
)+
pub enum Plan<'i> {
None,
$(
$stage ( $stage $(<$lt>)? ),
)+
}
}
}
use stage::*;
pub mod stage {
use super::*;
define_plan! {
pub struct ConfChange {
pub conf_change: raft::prelude::ConfChangeV2,
}
pub struct UpdateCurrentVshardConfig<'i> {
/// Instances to send the `rpc` request to.
pub targets: Vec<&'i InstanceName>,
/// Request to call [`rpc::sharding::proc_sharding`] on `targets`.
pub rpc: rpc::sharding::Request,
/// Global DML operation which updates `current_vshard_config_version` in corresponding record of table `_pico_tier`.
pub cas: cas::Request,
/// Tier name to which the vshard configuration applies
pub tier_name: String,
}
pub struct TransferLeadership<'i> {
/// This instance should be nominated as next raft leader.
pub to: &'i Instance,
}
pub struct UpdateTargetReplicasetMaster {
/// Global DML operation which updates `target_master_name` in table `_pico_replicaset`.
pub cas: cas::Request,
}
pub struct ReplicasetMasterFailover<'i> {
/// This replicaset is changing its master.
pub replicaset_name: &'i ReplicasetName,
/// This instance was the master, but is now non-responsive. The name is only used for logging.
pub old_master_name: &'i InstanceName,
/// Request to call [`proc_get_vclock`] on the new master. This
/// vclock is going to be persisted as the `promotion_vclock` of the
/// given replicaset (if it's not already equal to it).
///
/// [`proc_get_vclock`]: crate::sync::proc_get_vclock
pub get_vclock_rpc: GetVclockRpc,
/// This is the new master. It will be sent an RPC [`proc_get_vclock`] to set the
/// promotion vclock in case the old master is not available.
///
/// [`proc_get_vclock`]: crate::sync::proc_get_vclock
pub new_master_name: &'i InstanceName,
/// Part of the global DML operation which updates a `_pico_replicaset` record
/// with the new values for `current_master_name` & `promotion_vclock`.
/// Note: it is only part of the operation, because we don't know the promotion_vclock yet.
pub replicaset_dml: UpdateOps,
/// Optional operations to bump versions of replication and sharding configs.
pub bump_dml: Vec<Dml>,
/// Cas ranges for the operation.
pub ranges: Vec<cas::Range>,
}
pub struct ReplicasetMasterConsistentSwitchover<'i> {
/// This replicaset is changing its master.
pub replicaset_name: &'i ReplicasetName,
/// This instance will be demoted.
pub old_master_name: &'i InstanceName,
/// Request to call [`rpc::replication::proc_replication_demote`] on old master.
pub demote_rpc: rpc::replication::DemoteRequest,
/// This is the new master. It will be sent the `sync_rpc` request to make
/// sure it has caught up to the promotion vclock before the switchover
/// is finalized.
pub new_master_name: &'i InstanceName,
/// Request to call [`rpc::replication::proc_replication_sync`] on new master.
pub sync_rpc: rpc::replication::ReplicationSyncRequest,
/// Current `promotion_vclock` value of given replicaset.
pub promotion_vclock: &'i Vclock,
/// Global DML operation which updates a `_pico_replicaset` record
/// with the new values for `current_master_name`.
/// Note: this DML is only applied if, after the RPC requests, the old
/// master's vclock turns out not to be behind the current promotion
/// vclock. Otherwise the vclock from the old master is going to be
/// persisted as the new `promotion_vclock`.
pub master_actualize_dml: Dml,
/// Optional operations to bump versions of replication and sharding configs.
pub bump_dml: Vec<Dml>,