diff --git a/CHANGELOG.md b/CHANGELOG.md
index daabf7221b92640d8b44f1ba83b043e40da2492e..907aee02f6bdfed548c663aec5eb29530a0aa544 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -65,6 +65,8 @@ with the `YY.MINOR.MICRO` scheme.
 
 - Replicated is no longer a valid instance state.
 
+- Removed stored procedure `.proc_replication_promote`.
+
 ### Lua API
 
 - Update `pico.LUA_API_VERSION`: `3.1.0` -> `4.0.0`
diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index 830da7d6031b7c15c7db1feb570f37c6129d3419..d323b9457a275595b9ef73acb2d08dc450a0f1e4 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -19,7 +19,6 @@ use crate::rpc::enable_service::proc_enable_service;
 use crate::rpc::load_plugin_dry_run::proc_load_plugin_dry_run;
 use crate::rpc::replication::proc_replication;
 use crate::rpc::replication::proc_replication_demote;
-use crate::rpc::replication::proc_replication_promote;
 use crate::rpc::replication::SyncAndPromoteRequest;
 use crate::rpc::sharding::bootstrap::proc_sharding_bootstrap;
 use crate::rpc::sharding::proc_sharding;
@@ -313,24 +312,6 @@ impl Loop {
                     node.propose_and_wait(op, Duration::from_secs(3))?
                 }
            }
-
-            // FIXME: remove this, it will be done on the ConfigureReplication step
-            governor_step! {
-                "promoting new master" [
-                    "new_master_id" => %new_master_id,
-                    "replicaset_id" => %replicaset_id,
-                    "vclock" => ?promotion_vclock,
-                ]
-                async {
-                    let sync_and_promote = SyncAndPromoteRequest {
-                        vclock: promotion_vclock.clone(),
-                        timeout: Loop::SYNC_TIMEOUT,
-                    };
-                    pool.call(new_master_id, proc_name!(proc_replication_promote), &sync_and_promote, Self::SYNC_TIMEOUT)?
-                        .timeout(Self::SYNC_TIMEOUT)
-                        .await?
-                }
-            }
         }
 
         Plan::Downgrade(Downgrade {
@@ -370,6 +351,7 @@ impl Loop {
             targets,
             master_id,
             replicaset_peers,
+            promotion_vclock,
             replication_config_version_actualize,
         }) => {
             set_status(governor_status, "configure replication");
@@ -377,13 +359,30 @@ impl Loop {
                 "configuring replication"
                 async {
                     let mut fs = vec![];
-                    let mut rpc = rpc::replication::Request {
-                        is_master: false,
+                    let mut rpc = rpc::replication::ConfigureReplicationRequest {
+                        // Is only specified for the master replica
+                        sync_and_promote: None,
                         replicaset_peers,
                     };
+
+                    let mut sync_and_promote = None;
+                    if master_id.is_some() {
+                        sync_and_promote = Some(SyncAndPromoteRequest {
+                            vclock: promotion_vclock.clone(),
+                            timeout: Loop::SYNC_TIMEOUT,
+                        });
+                    }
+
                     for instance_id in targets {
                         tlog!(Info, "calling rpc::replication"; "instance_id" => %instance_id);
-                        rpc.is_master = Some(instance_id) == master_id;
+                        rpc.sync_and_promote = None;
+                        if master_id == Some(instance_id) {
+                            let Some(sync) = sync_and_promote.take() else {
+                                unreachable!("sync_and_promote request should only be sent to at most one replica");
+                            };
+                            rpc.sync_and_promote = Some(sync);
+                        }
+
                         let resp = pool.call(instance_id, proc_name!(proc_replication), &rpc, Self::RPC_TIMEOUT)?;
                         fs.push(async move {
                             match resp.await {
diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index b2da103846cbc7e3cd36b3df3630bba0b59fd4b3..4ce57447a1ccd60ed1640bd753c6ce180c33c255 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -25,6 +25,7 @@ use crate::vshard::VshardConfig;
 use crate::warn_or_panic;
 use ::tarantool::space::UpdateOps;
 use std::collections::HashMap;
+use tarantool::vclock::Vclock;
 
 use super::cc::raft_conf_change;
 use super::Loop;
@@ -186,12 +187,13 @@ pub(super) fn action_plan<'i>(
         )?;
         let replication_config_version_actualize = dml;
 
+        let promotion_vclock = &replicaset.promotion_vclock;
         return Ok(ConfigureReplication {
-            // TODO: also send the promotion vclock
             replicaset_id,
             targets,
             master_id,
             replicaset_peers,
+            promotion_vclock,
             replication_config_version_actualize,
         }
         .into());
@@ -741,7 +743,10 @@ pub mod stage {
         /// Request to call [`rpc::replication::proc_replication_demote`] on old master.
         /// It is optional because we don't try demoting the old master if it's already offline.
         pub demote: Option<rpc::replication::DemoteRequest>,
-        /// This instance will be promoted via RPC [`rpc::replication::proc_replication_promote`].
+        /// This is the new master. It will be sent an RPC [`proc_get_vclock`] to set the
+        /// promotion vclock in case the old master is not available.
+        ///
+        /// [`proc_get_vclock`]: crate::sync::proc_get_vclock
         pub new_master_id: &'i InstanceId,
         /// Part of the global DML operation which updates a `_pico_replicaset` record
         /// with the new values for `current_master_id` & `promotion_vclock`.
@@ -765,7 +770,7 @@ pub mod stage {
     }
 
     pub struct ConfigureReplication<'i> {
-        /// This replicaset is being [re]configured. The id is only used for logging.
+        /// This replicaset is being (re)configured. The id is only used for logging.
         pub replicaset_id: &'i ReplicasetId,
         /// These instances belong to one replicaset and will be sent a
         /// request to call [`rpc::replication::proc_replication`].
@@ -773,8 +778,11 @@ pub mod stage {
         /// This instance will also become the replicaset master.
         /// This will be `None` if replicaset's current_master_id != target_master_id.
         pub master_id: Option<&'i InstanceId>,
-        /// This is an explicit list of peer addresses (one for each target).
+        /// This is an explicit list of peer addresses.
         pub replicaset_peers: Vec<String>,
+        /// The value of the `promotion_vclock` column of the given replicaset.
+        /// It's used to synchronize the new master before making it writable.
+        pub promotion_vclock: &'i Vclock,
         /// Global DML operation which updates `current_config_version` in table `_pico_replicaset` for the given replicaset.
         pub replication_config_version_actualize: Dml,
     }
diff --git a/src/rpc/replication.rs b/src/rpc/replication.rs
index 83c6efdfea2fb106977291cfd416e6b828cba319..dd33bf315e58b065127e01c3fa4c4eaa26975857 100644
--- a/src/rpc/replication.rs
+++ b/src/rpc/replication.rs
@@ -17,7 +17,7 @@ crate::define_rpc_request! {
     /// Returns errors in the following cases:
     /// 1. Lua error during call to `box.cfg`
     /// 2. Storage failure
-    fn proc_replication(req: Request) -> Result<Response> {
+    fn proc_replication(req: ConfigureReplicationRequest) -> Result<Response> {
         // TODO: check this configuration is newer then the one currently
         // applied. For this we'll probably need to store the governor's applied
         // index at the moment of request generation in box.space._schema on the
@@ -34,75 +34,73 @@ crate::define_rpc_request! {
         // and ignores it if nothing changed
         set_cfg_field("replication", &replication_cfg)?;
 
-        // We do this everytime because firstly it helps when waking up.
-        // And secondly just in case, it doesn't hurt anyways.
-        if req.is_master {
-            set_cfg_field("read_only", false)?;
+        if let Some(sync) = req.sync_and_promote {
+            crate::error_injection!("TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER" => return Err(Error::Timeout));
+
+            synchronize_and_promote_to_master(sync)?;
+        } else {
+            // Everybody else should be read-only
+            set_cfg_field("read_only", true)?;
         }
 
         Ok(Response {})
     }
 
     /// Request to configure tarantool replication.
-    pub struct Request {
-        /// If the target replica should be made a replicaset `master`.
+    pub struct ConfigureReplicationRequest {
+        /// If this is not `None` the target replica will synchronize and become the new `master`.
         /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
         /// for more.
-        pub is_master: bool,
+        pub sync_and_promote: Option<SyncAndPromoteRequest>,
         /// URIs of all replicas in the replicaset.
         /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#confval-replication)
         /// for more.
         pub replicaset_peers: Vec<String>,
     }
 
-    /// Response to [`replication::Request`].
-    ///
-    /// [`replication::Request`]: Request
+    /// Response to [`ConfigureReplicationRequest`].
     pub struct Response {}
 }
 
-crate::define_rpc_request! {
-    /// Promotes the target instance from read-only replica to master.
-    /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
-    /// for more.
-    ///
-    /// Returns errors in the following cases: See implementation.
-    fn proc_replication_promote(req: SyncAndPromoteRequest) -> Result<SyncAndPromoteResponse> {
-        // TODO: find a way to guard against stale governor requests.
-        crate::sync::wait_vclock(req.vclock, req.timeout)?;
-
-        // XXX: Currently we just change the box.cfg.read_only option of the
-        // instance but at some point we will implement support for
-        // tarantool synchronous transactions then this operation will probably
-        // become more involved.
-        let lua = tarantool::lua_state();
-        let ro_reason: Option<tlua::StringInLua<_>> = lua.eval(
-            "box.cfg { read_only = false }
-            return box.info.ro_reason"
-        )?;
-
-        if let Some(ro_reason) = ro_reason.as_deref() {
-            tlog!(Warning, "failed to promote self to replication leader, reason = {ro_reason}");
-            return Err(Error::other(format!("instance is still in read only mode: {ro_reason}")));
-        }
-
-        // errors ignored because it must be already handled by plugin manager itself
-        _ = node::global()?.plugin_manager.handle_event_sync(PluginEvent::InstancePromote);
-
-        Ok(SyncAndPromoteResponse {})
-    }
-
-    /// Request to promote instance to tarantool replication leader.
-    pub struct SyncAndPromoteRequest {
-        pub vclock: Vclock,
-        pub timeout: Duration,
-    }
+/// Promotes the target instance from read-only replica to master.
+/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
+/// for more.
+///
+/// Returns errors in the following cases: See implementation.
+fn synchronize_and_promote_to_master(req: SyncAndPromoteRequest) -> Result<()> {
+    crate::sync::wait_vclock(req.vclock, req.timeout)?;
+
+    // XXX: Currently we just change the box.cfg.read_only option of the
+    // instance but at some point we will implement support for
+    // tarantool synchronous transactions then this operation will probably
+    // become more involved.
+    let lua = tarantool::lua_state();
+    let ro_reason: Option<tlua::StringInLua<_>> = lua.eval(
+        "box.cfg { read_only = false }
+        return box.info.ro_reason",
+    )?;
+
+    #[rustfmt::skip]
+    if let Some(ro_reason) = ro_reason.as_deref() {
+        tlog!(Warning, "failed to promote self to replication leader, reason = {ro_reason}");
+        return Err(Error::other(format!("instance is still in read only mode: {ro_reason}")));
+    };
+
+    // errors ignored because it must be already handled by plugin manager itself
+    _ = node::global()?
+        .plugin_manager
+        .handle_event_sync(PluginEvent::InstancePromote);
+
+    Ok(())
+}
 
-    /// Response to [`replication::promote::Request`].
-    ///
-    /// [`replication::promote::Request`]: Request
-    pub struct SyncAndPromoteResponse {}
+/// Request to promote instance to tarantool replication leader.
+#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
+pub struct SyncAndPromoteRequest {
+    pub vclock: Vclock,
+    pub timeout: Duration,
 }
+impl ::tarantool::tuple::Encode for SyncAndPromoteRequest {}
 
 crate::define_rpc_request! {
     /// Demotes the target instance from master to read-only replica.
@@ -124,9 +122,7 @@ crate::define_rpc_request! {
     /// Request to promote instance to tarantool replication leader.
     pub struct DemoteRequest {}
 
-    /// Response to [`replication::promote::Request`].
-    ///
-    /// [`replication::promote::Request`]: Request
+    /// Response to [`DemoteRequest`].
     pub struct DemoteResponse {
         pub vclock: Vclock,
     }
diff --git a/test/int/test_replication.py b/test/int/test_replication.py
index 99bdee5e1323a97d4b08351c32f47351db0126b6..5b30202264bd14d3ea1e7bfcdbded60278050547 100644
--- a/test/int/test_replication.py
+++ b/test/int/test_replication.py
@@ -215,13 +215,9 @@ def test_replication_sync_before_master_switchover(cluster: Cluster):
     i4 = cluster.add_instance(wait_online=True, replicaset_id="r99")
     i5 = cluster.add_instance(wait_online=True, replicaset_id="r99")
 
-    # Temporarilly break i5's replication config, so that it's vclock is outdated.
-    print("\x1b[31mbreaking i5's replication config\x1b[0m")
-    i5.eval(
-        """
-        replication_before = box.cfg.replication
-        box.cfg { replication = {} }
-        """
+    # Make sure i5 will not be able to synchronize before promoting
+    i5.call(
+        "pico._inject_error", "TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER", True
     )
 
     # Do some storage modifications, which will need to be replicated.
@@ -249,21 +245,20 @@ def test_replication_sync_before_master_switchover(cluster: Cluster):
 
     master_vclock = get_vclock_without_local(i4)
 
-    # Wait until governor starts switching the replication leader from i4 to i5.
+    # Wait until governor switches the replicaset master from i4 to i5
+    # and tries to reconfigure replication between them, which will require i5 to synchronize first.
     # This will block until i5 synchronizes with old master, which it won't
-    # until we fix it's replication config.
+    # until the injected error is disabled.
     time.sleep(1)  # Just in case, nothing really relies on this sleep
-    wait_governor_status(i1, "transfer replication leader")
+    wait_governor_status(i1, "configure replication")
 
-    assert i5.eval("return box.space.mytable") is None
-    vclock = get_vclock_without_local(i5)
-    assert vclock != master_vclock
     # i5 does not become writable until it synchronizes
     assert i5.eval("return box.info.ro") is True
 
-    # Fix i5's replication config, so it's able to continue synching.
-    print("\x1b[32mfixing i5's replication config\x1b[0m")
-    i5.eval("box.cfg { replication = replication_before }")
+    # Uninject the error, so it's able to continue synching.
+    i5.call(
+        "pico._inject_error", "TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER", False
+    )
 
     # Wait until governor finishes with all the needed changes.
     wait_governor_status(i1, "idle")