Skip to content
Snippets Groups Projects
plan.rs 54.2 KiB
Newer Older
use super::conf_change::raft_conf_change;
use crate::has_states;
use crate::instance::state::StateVariant;
use crate::instance::{Instance, InstanceName};
use crate::plugin::PluginIdentifier;
use crate::plugin::PluginOp;
use crate::plugin::TopologyUpdateOpKind;
use crate::replicaset::ReplicasetState;
use crate::replicaset::WeightOrigin;
use crate::replicaset::{Replicaset, ReplicasetName};
use crate::rpc::update_instance::prepare_update_instance_cas_request;
use crate::schema::{
    PluginConfigRecord, PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey, ADMIN_ID,
};
use crate::storage::{ClusterwideTable, PropertyName};
use crate::sync::GetVclockRpc;
use crate::tier::Tier;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::error::IdOfInstance;
use crate::traft::op::Dml;
use crate::traft::op::Op;
#[allow(unused_imports)]
use crate::traft::op::PluginRaftOp;
use crate::traft::Result;
use crate::traft::{RaftId, RaftIndex, RaftTerm};
use ::tarantool::space::UpdateOps;
use tarantool::vclock::Vclock;

#[allow(clippy::too_many_arguments)]
pub(super) fn action_plan<'i>(
    term: RaftTerm,
    instances: &'i [Instance],
    existing_fds: &HashSet<Uppercase>,
    peer_addresses: &'i HashMap<RaftId, String>,
    voters: &[RaftId],
    learners: &[RaftId],
    replicasets: &HashMap<&ReplicasetName, &'i Replicaset>,
    my_raft_id: RaftId,
    has_pending_schema_change: bool,
    plugins: &HashMap<PluginIdentifier, PluginDef>,
    services: &HashMap<PluginIdentifier, Vec<&'i ServiceDef>>,
    plugin_op: Option<&'i PluginOp>,
    sync_timeout: std::time::Duration,
) -> Result<Plan<'i>> {
    // This function is specifically extracted, to separate the task
    // construction from any IO and/or other yielding operations.
    #[cfg(debug_assertions)]
    let _guard = crate::util::NoYieldsGuard::new();

    ////////////////////////////////////////////////////////////////////////////
    // transfer raft leadership
    let Some(this_instance) = instances
        .iter()
        .find(|instance| instance.raft_id == my_raft_id)
    else {
        return Err(Error::NoSuchInstance(IdOfInstance::RaftId(my_raft_id)));
    };
    if has_states!(this_instance, * -> Offline) || has_states!(this_instance, * -> Expelled) {
        let mut new_leader = None;
        for instance in instances {
            if has_states!(instance, * -> Offline) || has_states!(instance, * -> Expelled) {
                continue;
            }

            if !voters.contains(&instance.raft_id) {
                continue;
            }

            new_leader = Some(instance);
            break;
        }
        if let Some(new_leader) = new_leader {
            return Ok(TransferLeadership { to: new_leader }.into());
        } else {
            tlog!(Warning, "leader is going offline and no substitution is found";
                "leader_raft_id" => my_raft_id,
                "voters" => ?voters,
            );
        }
    }

    ////////////////////////////////////////////////////////////////////////////
    // conf change
    if let Some(conf_change) = raft_conf_change(instances, voters, learners, tiers) {
        return Ok(ConfChange { conf_change }.into());
    }

    ////////////////////////////////////////////////////////////////////////////
    // instance going offline non-gracefully
    // TODO: it should be possible to move this step to where Expell is handled
    let to_downgrade = instances
        .iter()
        .find(|instance| has_states!(instance, not Offline -> Offline));
    if let Some(instance) = to_downgrade {
        let instance_name = &instance.name;
        let new_current_state = instance.target_state.variant.as_str();

        let replicaset = *replicasets
            .get(&instance.replicaset_name)
            .expect("replicaset info is always present");
        let tier = *tiers
            .get(&*instance.tier)
            .expect("tier info is always present");

        // TODO: if this is replication leader, we should demote it and wait
        // until someone is promoted in it's place (which implies synchronizing
        // vclocks). Except that we can't do this reliably, as tarantool will
        // stop accepting incoming connections once the on_shutdown even happens.
        // This means that we can't reliably send rpc requests to instances with
        // target state Offline and basically there's nothing we can do about
        // such instances.
        //
        // Therefore basically the user should never expect that turning off a
        // replication leader is safe. Instead it should always first transfer
        // the replication leadership to another instance.

        let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
            .with_current_state(instance.target_state);
        let cas_parameters =
            prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;
        let (ops, ranges) = cas_parameters.expect("already check current state is different");
        let predicate = cas::Predicate::new(applied, ranges);
        let op = Op::single_dml_or_batch(ops);
        let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
        return Ok(Downgrade {
    ////////////////////////////////////////////////////////////////////////////
    // update target replicaset master
    let new_target_master = get_new_replicaset_master_if_needed(instances, replicasets);
    if let Some((to, replicaset)) = new_target_master {
        debug_assert_eq!(to.replicaset_name, replicaset.name);
        let mut ops = UpdateOps::new();
        ops.assign(column_name!(Replicaset, target_master_name), &to.name)?;
        let dml = Dml::update(
            cas::Range::new(ClusterwideTable::Instance).eq([&to.name]),
            cas::Range::new(ClusterwideTable::Instance).eq([&replicaset.target_master_name]),
        let predicate = cas::Predicate::new(applied, ranges);
        let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
        return Ok(UpdateTargetReplicasetMaster { cas }.into());
    ////////////////////////////////////////////////////////////////////////////
    if let Some((replicaset, targets, replicaset_peers)) =
        get_replicaset_to_configure(instances, peer_addresses, replicasets)
    {
        // Targets must not be empty, otherwise we would bump the version
        // without actually calling the RPC.
        debug_assert!(!targets.is_empty());
        let replicaset_name = &replicaset.name;
        let mut master_name = None;
        if replicaset.current_master_name == replicaset.target_master_name {
            master_name = Some(&replicaset.current_master_name);
        let mut ops = UpdateOps::new();
        ops.assign(
            column_name!(Replicaset, current_config_version),
            replicaset.target_config_version,
        )?;
        let dml = Dml::update(
            ClusterwideTable::Replicaset,
        // Implicit ranges are sufficient
        let predicate = cas::Predicate::new(applied, []);
        let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
        let replication_config_version_actualize = cas;
        return Ok(ConfigureReplication {
            replicaset_name,
            targets,
            master_name,
            replicaset_peers,
            replication_config_version_actualize,
    ////////////////////////////////////////////////////////////////////////////
    // replicaset master switchover
    //
    // This must be done after instances have (re)configured replication
    // because master switchover requires synchronizing via tarantool replication.
    let new_current_master = replicasets
        .values()
        .find(|r| r.current_master_name != r.target_master_name);
    if let Some(r) = new_current_master {
        let replicaset_name = &r.name;
        let old_master_name = &r.current_master_name;
        let new_master_name = &r.target_master_name;
        let promotion_vclock = &r.promotion_vclock;
        let mut replicaset_dml = UpdateOps::new();
        replicaset_dml.assign(
            column_name!(Replicaset, current_master_name),
            new_master_name,
        )?;
            get_replicaset_config_version_bump_op_if_needed(replicasets, replicaset_name)
        let tier_name = &r.tier;
        let tier = tiers
            .get(tier_name.as_str())
            .expect("tier for instance should exists");

        let vshard_config_version_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?;
        if let Some(bump) = vshard_config_version_bump {
        let ranges = vec![
            // We make a decision based on this instance's state so the operation
            // should fail in case there's a change to it in the uncommitted log
            cas::Range::new(ClusterwideTable::Instance).eq([old_master_name]),
        ];

        let old_master_may_respond = instances
            .iter()
            .find(|i| i.name == old_master_name)
            .map(|i| i.may_respond());
        if let Some(true) = old_master_may_respond {
            let demote_rpc = rpc::replication::DemoteRequest { term };
            let sync_rpc = rpc::replication::ReplicationSyncRequest {
                vclock: promotion_vclock.clone(),
                timeout: sync_timeout,
            };

            let master_actualize_dml = Dml::update(
                ClusterwideTable::Replicaset,
                &[replicaset_name],
                replicaset_dml,
                ADMIN_ID,
            )?;

            return Ok(ReplicasetMasterConsistentSwitchover {
                replicaset_name,
                old_master_name,
                demote_rpc,
                new_master_name,
                sync_rpc,
                promotion_vclock,
                master_actualize_dml,
                bump_dml,
                ranges,
            }
            .into());
        } else {
            let get_vclock_rpc = GetVclockRpc {};

            return Ok(ReplicasetMasterFailover {
                old_master_name,
                new_master_name,
                get_vclock_rpc,
                replicaset_name,
                replicaset_dml,
                bump_dml,
                ranges,
            }
            .into());
    ////////////////////////////////////////////////////////////////////////////
    // proposing automatic replicaset state & weight change
    let to_change_weights = get_replicaset_state_change(instances, replicasets, tiers);
    if let Some((replicaset_name, tier, need_to_update_weight)) = to_change_weights {
        let mut uops = UpdateOps::new();
            uops.assign(column_name!(Replicaset, weight), 1.)?;
        uops.assign(column_name!(Replicaset, state), ReplicasetState::Ready)?;
        let dml = Dml::update(
        let mut ranges = vec![];
        let mut ops = vec![];
        ranges.push(cas::Range::for_dml(&dml)?);
        ops.push(dml);

        let vshard_config_version_bump = Tier::get_vshard_config_version_bump_op_if_needed(tier)?;
        if let Some(bump) = vshard_config_version_bump {
            ranges.push(cas::Range::for_dml(&bump)?);
            ops.push(bump);
        let op = Op::single_dml_or_batch(ops);
        let predicate = cas::Predicate::new(applied, ranges);
        let cas = cas::Request::new(op, predicate, ADMIN_ID)?;

        return Ok(ProposeReplicasetStateChanges { cas }.into());
    for (&tier_name, &tier) in tiers.iter() {
        ////////////////////////////////////////////////////////////////////////////
        // update current vshard config
        let mut first_ready_replicaset = None;
        if !tier.vshard_bootstrapped {
            first_ready_replicaset =
                get_first_ready_replicaset_in_tier(instances, replicasets, tier_name);
        }
        // Note: the following is a hack stemming from the fact that we have to work around vshard's weird quirks.
        // Everything having to deal with bootstrapping vshard should be removed completely once we migrate to our custom sharding solution.
        //
        // Vshard will fail if we configure it with all replicaset weights set to 0.
        // But we don't set a replicaset's weight until it's filled up to the replication factor.
        // So we wait until at least one replicaset is filled (i.e. `first_ready_replicaset.is_some()`).
        //
        // Also if vshard has already been bootstrapped, the user can mess this up by setting all replicasets' weights to 0,
        // which will break vshard configuration, but this will be the user's fault probably, not sure we can do something about it
        let ok_to_configure_vshard = tier.vshard_bootstrapped || first_ready_replicaset.is_some();
        if ok_to_configure_vshard
            && tier.current_vshard_config_version != tier.target_vshard_config_version
        {
            // Note at this point all the instances should have their replication configured,
            // so it's ok to configure sharding for them
            let targets = maybe_responding(instances)
                .map(|instance| &instance.name)
                .collect();
            let rpc = rpc::sharding::Request {
                term,
                applied,
                timeout: sync_timeout,
            let mut uops = UpdateOps::new();
            uops.assign(
                column_name!(Tier, current_vshard_config_version),
                tier.target_vshard_config_version,
            )?;
            let bump = Dml::update(ClusterwideTable::Tier, &[tier_name], uops, ADMIN_ID)?;
            let ranges = vec![cas::Range::for_dml(&bump)?];
            let predicate = cas::Predicate::new(applied, ranges);
            let cas = cas::Request::new(bump, predicate, ADMIN_ID)?;

            return Ok(UpdateCurrentVshardConfig {
                targets,
                rpc,
                cas,
                tier_name: tier_name.into(),
            }
            .into());
        }
    ////////////////////////////////////////////////////////////////////////////
    // bootstrap sharding on each tier
    for (&tier_name, &tier) in tiers.iter() {
        if tier.vshard_bootstrapped {
            continue;
        }

        if let Some(r) = get_first_ready_replicaset_in_tier(instances, replicasets, tier_name) {
            debug_assert!(
                !tier.vshard_bootstrapped,
                "bucket distribution only needs to be bootstrapped once"
            );
            let target = &r.current_master_name;
            let tier_name = &r.tier;
            let rpc = rpc::sharding::bootstrap::Request {
                term,
                applied,
                timeout: sync_timeout,
                tier: tier_name.into(),
            };

            let mut uops = UpdateOps::new();
            uops.assign(column_name!(Tier, vshard_bootstrapped), true)?;

            let dml = Dml::update(ClusterwideTable::Tier, &[tier_name], uops, ADMIN_ID)?;

            let ranges = vec![cas::Range::for_dml(&dml)?];
            let predicate = cas::Predicate::new(applied, ranges);
            let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;

            return Ok(ShardingBoot {
                target,
                rpc,
                cas,
                tier_name: tier_name.into(),
            }
            .into());
    ////////////////////////////////////////////////////////////////////////////
    // expel replicaset
    if let Some((master, replicaset, tier)) =
        get_replicaset_being_expelled(instances, replicasets, tiers)
    {
        #[rustfmt::skip]
        debug_assert!(has_states!(master, not Expelled -> Expelled), "{} -> {}", master.current_state, master.target_state);

        let master_name = &master.name;
        let target = &replicaset.current_master_name;
        debug_assert_eq!(master_name, target);

        let replicaset_name = replicaset.name.clone();

        let rpc = rpc::sharding::WaitBucketCountRequest {
            term,
            applied,
            timeout: sync_timeout,
            expected_bucket_count: 0,
        };

        // Mark last instance as expelled
        let req = rpc::update_instance::Request::new(master_name.clone(), cluster_name)
            .with_current_state(master.target_state);
        let update_instance =
            prepare_update_instance_cas_request(&req, master, replicaset, tier, existing_fds)?;
        let (mut ops, ranges) = update_instance.expect("already checked target state != current");

        // Mark replicaset as expelled
        let mut update_ops = UpdateOps::new();
        update_ops.assign(column_name!(Replicaset, state), ReplicasetState::Expelled)?;
        let dml = Dml::update(
            ClusterwideTable::Replicaset,
            &[&replicaset_name],
            update_ops,
            ADMIN_ID,
        )?;
        ops.push(dml);

        let predicate = cas::Predicate::new(applied, ranges);
        let dml = Op::BatchDml { ops };
        let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
        return Ok(ExpelReplicaset {
            replicaset_name,
            target,
            rpc,
            cas,
        }
        .into());
    }

    ////////////////////////////////////////////////////////////////////////////
    // prepare a replicaset for expel
    if let Some((tier, replicaset)) = get_replicaset_to_expel(instances, replicasets, tiers) {
        // Master switchover happens on a governor step with higher priority
        debug_assert_eq!(
            replicaset.current_master_name,
            replicaset.target_master_name
        );
        let replicaset_name = replicaset.name.clone();

        let mut update_ops = UpdateOps::new();
        update_ops.assign(column_name!(Replicaset, weight), 0.0)?;
        #[rustfmt::skip]
        update_ops.assign(column_name!(Replicaset, state), ReplicasetState::ToBeExpelled)?;

        let mut ops = vec![];
        let dml = Dml::update(
            ClusterwideTable::Replicaset,
            &[&replicaset_name],
            update_ops,
            ADMIN_ID,
        )?;
        ops.push(dml);

        if let Some(bump) = Tier::get_vshard_config_version_bump_op_if_needed(tier)? {
            ops.push(bump);
        }

        let ranges = vec![
            // Decision was made based on this instance's state so we must make sure it was up to date.
            cas::Range::new(ClusterwideTable::Instance).eq([&replicaset.current_master_name]),
            // The rest of the ranges are implicit.
        ];

        let predicate = cas::Predicate::new(applied, ranges);
        let dml = Op::BatchDml { ops };
        let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
        let replicaset_name = replicaset.name.clone();
        return Ok(PrepareReplicasetForExpel {
            replicaset_name,
            cas,
        }
        .into());
    }

    ////////////////////////////////////////////////////////////////////////////
    // expel instance
    let target = instances
        .iter()
        .find(|instance| has_states!(instance, not Expelled -> Expelled));
    if let Some(instance) = target {
        let instance_name = &instance.name;
        let new_current_state = instance.target_state.variant.as_str();

        let replicaset = *replicasets
            .get(&instance.replicaset_name)
            .expect("replicaset info is always present");
        let tier = *tiers
            .get(&*instance.tier)
            .expect("tier info is always present");

        let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
            .with_current_state(instance.target_state);
        let cas_parameters =
            prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;
        let (ops, ranges) = cas_parameters.expect("already check current state is different");
        let predicate = cas::Predicate::new(applied, ranges);
        let op = Op::single_dml_or_batch(ops);
        let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
        return Ok(Downgrade {
    ////////////////////////////////////////////////////////////////////////////
    // to online
        .find(|instance| has_states!(instance, not Online -> Online) || instance.is_reincarnated());
    if let Some(instance) = to_online {
        let instance_name = &instance.name;
        let target_state = instance.target_state;
        debug_assert_eq!(target_state.variant, StateVariant::Online);
        let new_current_state = target_state.variant.as_str();

        let replicaset = *replicasets
            .get(&instance.replicaset_name)
            .expect("replicaset info is always present");
        let tier = *tiers
            .get(&*instance.tier)
            .expect("tier info is always present");

        let plugin_rpc = rpc::enable_all_plugins::Request {
            term,
            applied,
            timeout: sync_timeout,
        let req = rpc::update_instance::Request::new(instance_name.clone(), cluster_name)
            .with_current_state(target_state);
        let cas_parameters =
            prepare_update_instance_cas_request(&req, instance, replicaset, tier, existing_fds)?;

        let (ops, ranges) = cas_parameters.expect("already check current state is different");
        let predicate = cas::Predicate::new(applied, ranges);
        let op = Op::single_dml_or_batch(ops);
        let cas = cas::Request::new(op, predicate, ADMIN_ID)?;
        return Ok(ToOnline {
    ////////////////////////////////////////////////////////////////////////////
    // ddl
    if has_pending_schema_change {
        let targets = rpc::replicasets_masters(replicasets, instances);
        let rpc = rpc::ddl_apply::Request {
            term,
            applied,
            timeout: sync_timeout,
        };
        return Ok(ApplySchemaChange { rpc, targets }.into());
    }

godzie44's avatar
godzie44 committed
    ////////////////////////////////////////////////////////////////////////////
    // install plugin
    if let Some(PluginOp::CreatePlugin {
        manifest,
        inherit_topology,
    }) = plugin_op
    {
        let ident = manifest.plugin_identifier();
        if plugins.get(&ident).is_some() {
                "received a request to install a plugin which is already installed {ident:?}"
            );
        }
        let mut targets = Vec::with_capacity(instances.len());
        for i in maybe_responding(instances) {
        let rpc = rpc::load_plugin_dry_run::Request {
            term,
            applied,
            timeout: sync_timeout,
        let plugin_def = manifest.plugin_def();
        let mut ranges = vec![];
        let mut ops = vec![];

        let dml = Dml::replace(ClusterwideTable::Plugin, &plugin_def, ADMIN_ID)?;
        ranges.push(cas::Range::for_dml(&dml)?);
        ops.push(dml);
        let ident = plugin_def.into_identifier();
        for mut service_def in manifest.service_defs() {
            if let Some(service_topology) = inherit_topology.get(&service_def.name) {
                service_def.tiers = service_topology.clone();
            }
            let dml = Dml::replace(ClusterwideTable::Service, &service_def, ADMIN_ID)?;
            ranges.push(cas::Range::for_dml(&dml)?);
            ops.push(dml);

            let config = manifest
                .get_default_config(&service_def.name)
                .expect("configuration should exist");
            let config_records =
                PluginConfigRecord::from_config(&ident, &service_def.name, config.clone())?;

            for config_rec in config_records {
                let dml = Dml::replace(ClusterwideTable::PluginConfig, &config_rec, ADMIN_ID)?;
                ranges.push(cas::Range::for_dml(&dml)?);
                ops.push(dml);
            ClusterwideTable::Property,
            &[PropertyName::PendingPluginOperation],
            ADMIN_ID,
        )?;
        ranges.push(cas::Range::for_dml(&dml)?);
        ops.push(dml);
        let success_dml = Op::BatchDml { ops };
godzie44's avatar
godzie44 committed
        }

    ////////////////////////////////////////////////////////////////////////////
    // enable plugin
    if let Some(PluginOp::EnablePlugin {
        plugin,
        timeout: on_start_timeout,
        let service_defs = services.get(plugin).map(|v| &**v).unwrap_or(&[]);

        let targets = maybe_responding(instances).map(|i| &i.name).collect();
        let rpc = rpc::enable_plugin::Request {
godzie44's avatar
godzie44 committed
            term,
            applied,
            timeout: sync_timeout,
godzie44's avatar
godzie44 committed
        };
        let mut success_dml = vec![];
        let mut enable_ops = UpdateOps::new();
        enable_ops.assign(column_name!(PluginDef, enabled), true)?;
            ClusterwideTable::Plugin,
            &[&plugin.name, &plugin.version],
        )?;
        ranges.push(cas::Range::for_dml(&dml)?);
        success_dml.push(dml);
        for i in instances {
                if !svc.tiers.contains(&i.tier) {
                    continue;
                    ClusterwideTable::ServiceRouteTable,
                    &ServiceRouteItem::new_healthy(i.name.clone(), plugin, &svc.name),
                )?;
                ranges.push(cas::Range::for_dml(&dml)?);
                success_dml.push(dml);
godzie44's avatar
godzie44 committed
        }
            ClusterwideTable::Property,
            &[PropertyName::PendingPluginOperation],
            ADMIN_ID,
        )?;
        ranges.push(cas::Range::for_dml(&dml)?);
        success_dml.push(dml);
        let success_dml = Op::BatchDml { ops: success_dml };

        return Ok(EnablePlugin {
godzie44's avatar
godzie44 committed
            rpc,
            targets,
            on_start_timeout: *on_start_timeout,
godzie44's avatar
godzie44 committed
        }
        .into());
    }

    ////////////////////////////////////////////////////////////////////////////
    // update service tiers
    if let Some(PluginOp::AlterServiceTiers {
        plugin,
        service,
        tier,
        kind,
    }) = plugin_op
    {
godzie44's avatar
godzie44 committed
        let mut enable_targets = Vec::with_capacity(instances.len());
        let mut disable_targets = Vec::with_capacity(instances.len());
        let mut on_success_dml = vec![];
godzie44's avatar
godzie44 committed

        let plugin_def = plugins
            .get(plugin)
            .expect("operation for non existent plugin");
        let service_def = *services
            .get(plugin)
            .expect("operation for non existent service")
            .iter()
            .find(|s| &s.name == service)
            .expect("operation for non existent service");

Georgy Moshkin's avatar
Georgy Moshkin committed
        let mut new_service_def = service_def.clone();
        let new_tiers = &mut new_service_def.tiers;
            TopologyUpdateOpKind::Add => {
                if new_tiers.iter().all(|t| t != tier) {
                    new_tiers.push(tier.clone());
                }
            }
            TopologyUpdateOpKind::Remove => {
                new_tiers.retain(|t| t != tier);
Georgy Moshkin's avatar
Georgy Moshkin committed
        let old_tiers = &service_def.tiers;
godzie44's avatar
godzie44 committed

        // note: no need to enable/disable service and update routing table if plugin disabled
        if plugin_def.enabled {
            for i in maybe_responding(instances) {
godzie44's avatar
godzie44 committed
                // if instance in both new and old tiers - do nothing
                if new_tiers.contains(&i.tier) && old_tiers.contains(&i.tier) {
                    continue;
                }

                if new_tiers.contains(&i.tier) {
                    enable_targets.push(&i.name);
godzie44's avatar
godzie44 committed
                        ClusterwideTable::ServiceRouteTable,
                        &ServiceRouteItem::new_healthy(i.name.clone(), plugin, &service_def.name),
godzie44's avatar
godzie44 committed
                        ADMIN_ID,
                    )?;
                    ranges.push(cas::Range::for_dml(&dml)?);
                    on_success_dml.push(dml);
godzie44's avatar
godzie44 committed
                }

                if old_tiers.contains(&i.tier) {
                    disable_targets.push(&i.name);
godzie44's avatar
godzie44 committed
                    let key = ServiceRouteKey {
                        instance_name: &i.name,
                        plugin_name: &plugin.name,
                        plugin_version: &plugin.version,
godzie44's avatar
godzie44 committed
                        service_name: &service_def.name,
                    };
                    let dml = Dml::delete(ClusterwideTable::ServiceRouteTable, &key, ADMIN_ID)?;
                    ranges.push(cas::Range::for_dml(&dml)?);
                    on_success_dml.push(dml);
        let dml = Dml::replace(ClusterwideTable::Service, &new_service_def, ADMIN_ID)?;
        ranges.push(cas::Range::for_dml(&dml)?);
        on_success_dml.push(dml);
godzie44's avatar
godzie44 committed

            ClusterwideTable::Property,
            &[PropertyName::PendingPluginOperation],
            ADMIN_ID,
        )?;
        ranges.push(cas::Range::for_dml(&dml)?);
        on_success_dml.push(dml);
        let success_dml = Op::BatchDml {
            ops: on_success_dml,
        };

godzie44's avatar
godzie44 committed
        let enable_rpc = rpc::enable_service::Request {
            term,
            applied,
            timeout: sync_timeout,
godzie44's avatar
godzie44 committed
        };
        let disable_rpc = rpc::disable_service::Request {
            term,
            applied,
            timeout: sync_timeout,
        return Ok(AlterServiceTiers {
godzie44's avatar
godzie44 committed
            enable_targets,
            disable_targets,
            enable_rpc,
            disable_rpc,
godzie44's avatar
godzie44 committed
        }
        .into());
    }
godzie44's avatar
godzie44 committed

    ////////////////////////////////////////////////////////////////////////////
    // no action needed
    Ok(Plan::None)
}

macro_rules! define_plan {
    (
        $(
            pub struct $stage:ident $(<$lt:tt>)? {
                $(
                    $(#[$field_meta:meta])*
                    pub $field:ident: $field_ty:ty,
                )+
            }
        )+
    ) => {
        $(
            pub struct $stage $(<$lt>)? {
                $(
                    $(#[$field_meta])*
                    pub $field: $field_ty,
                )+
            }

            impl<'i> From<$stage $(<$lt>)?> for Plan<'i> {
                fn from(s: $stage $(<$lt>)?) -> Self {
                    Self::$stage(s)
                }
            }
        )+

        pub enum Plan<'i> {
            None,
            $(
                $stage ( $stage $(<$lt>)? ),
            )+
        }

    }
}

use stage::*;
godzie44's avatar
godzie44 committed

pub mod stage {
    use super::*;
godzie44's avatar
godzie44 committed
    use std::time::Duration;

    define_plan! {
        pub struct ConfChange {
            pub conf_change: raft::prelude::ConfChangeV2,
        }

        pub struct UpdateCurrentVshardConfig<'i> {
            /// Instances to send the `rpc` request to.
            pub targets: Vec<&'i InstanceName>,
            /// Request to call [`rpc::sharding::proc_sharding`] on `targets`.
            pub rpc: rpc::sharding::Request,
            /// Global DML operation which updates `current_vshard_config_version` in corresponding record of table `_pico_tier`.
            pub cas: cas::Request,
            /// Tier name to which the vshard configuration applies
            pub tier_name: String,
        pub struct TransferLeadership<'i> {
            /// This instance should be nominated as next raft leader.
            pub to: &'i Instance,
        }

        pub struct UpdateTargetReplicasetMaster {
            /// Global DML operation which updates `target_master_name` in table `_pico_replicaset`.
            pub cas: cas::Request,
        pub struct ReplicasetMasterFailover<'i> {
            /// This replicaset is changing it's master.
            pub replicaset_name: &'i ReplicasetName,

            /// This instance was master, but is now non responsive. Name only used for logging.
            pub old_master_name: &'i InstanceName,

            /// Request to call [`proc_get_vclock`] on the new master. This
            /// vclock is going to be peristed as `promotion_vclock` (if it's
            /// not already equal) of the given replicaset.
            ///
            /// [`proc_get_vclock`]: crate::sync::proc_get_vclock
            pub get_vclock_rpc: GetVclockRpc,

            /// This is the new master. It will be sent a RPC [`proc_get_vclock`] to set the
            /// promotion vclock in case the old master is not available.
            ///
            /// [`proc_get_vclock`]: crate::sync::proc_get_vclock
            pub new_master_name: &'i InstanceName,
            /// Part of the global DML operation which updates a `_pico_replicaset` record
            /// with the new values for `current_master_name` & `promotion_vclock`.
            /// Note: it is only the part of the operation, because we don't know the promotion_vclock yet.
            pub replicaset_dml: UpdateOps,

            /// Optional operations to bump versions of replication and sharding configs.
            pub bump_dml: Vec<Dml>,

            /// Cas ranges for the operation.
            pub ranges: Vec<cas::Range>,
        }

        pub struct ReplicasetMasterConsistentSwitchover<'i> {
            /// This replicaset is changing it's master.
            pub replicaset_name: &'i ReplicasetName,

            /// This instance will be demoted.
            pub old_master_name: &'i InstanceName,

            /// Request to call [`rpc::replication::proc_replication_demote`] on old master.
            pub demote_rpc: rpc::replication::DemoteRequest,

            /// This is the new master. It will be sent a RPC [`proc_get_vclock`] to set the
            /// promotion vclock in case the old master is not available.
            ///
            /// [`proc_get_vclock`]: crate::sync::proc_get_vclock
            pub new_master_name: &'i InstanceName,

            /// Request to call [`rpc::replication::proc_replication_sync`] on new master.
            pub sync_rpc: rpc::replication::ReplicationSyncRequest,

            /// Current `promotion_vclock` value of given replicaset.
            pub promotion_vclock: &'i Vclock,

            /// Global DML operation which updates a `_pico_replicaset` record
            /// with the new values for `current_master_name`.
            /// Note: this dml will only be applied if after RPC requests it
            /// turns out that the old master's vclock is not behind the current
            /// promotion vclock. Otherwise the vclock from old master is going
            /// to be persisted as the new `promotion_vclock`.
            pub master_actualize_dml: Dml,

            /// Optional operations to bump versions of replication and sharding configs.