Commit 0a8f88d2 authored by Georgy Moshkin

fix: master is now promoted when configuring the replication

parent 53d0899e
Merge request !1235: use config version to trigger replication configuration instead of state Replicated
Pipeline #49292 failed
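
In short: the dedicated "promoting new master" governor step (and the stored
procedure `.proc_replication_promote` behind it) is removed. Instead, the
ConfigureReplication step now carries the replicaset's `promotion_vclock`, and
the one target designated as master receives a `sync_and_promote` payload
inside the ordinary `proc_replication` request. A minimal sketch of the new
fan-out, condensed from the hunks below (names as in the diff; error handling
and response collection omitted):

    let mut rpc = rpc::replication::ConfigureReplicationRequest {
        // Only the designated master ever gets a promotion payload.
        sync_and_promote: None,
        replicaset_peers,
    };
    for instance_id in targets {
        rpc.sync_and_promote = None;
        if master_id == Some(instance_id) {
            rpc.sync_and_promote = Some(SyncAndPromoteRequest {
                vclock: promotion_vclock.clone(),
                timeout: Loop::SYNC_TIMEOUT,
            });
        }
        // Every target reconfigures box.cfg.replication; the master also
        // syncs up to `vclock` and clears read_only, all others stay read-only.
        pool.call(instance_id, proc_name!(proc_replication), &rpc, Self::RPC_TIMEOUT)?;
    }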
@@ -65,6 +65,8 @@ with the `YY.MINOR.MICRO` scheme.
- Replicated is no longer a valid instance state.
- Removed stored procedure `.proc_replication_promote`.
### Lua API
- Update `pico.LUA_API_VERSION`: `3.1.0` -> `4.0.0`
......
@@ -19,7 +19,6 @@ use crate::rpc::enable_service::proc_enable_service;
use crate::rpc::load_plugin_dry_run::proc_load_plugin_dry_run;
use crate::rpc::replication::proc_replication;
use crate::rpc::replication::proc_replication_demote;
use crate::rpc::replication::proc_replication_promote;
use crate::rpc::replication::SyncAndPromoteRequest;
use crate::rpc::sharding::bootstrap::proc_sharding_bootstrap;
use crate::rpc::sharding::proc_sharding;
@@ -313,24 +312,6 @@ impl Loop {
node.propose_and_wait(op, Duration::from_secs(3))?
}
}
// FIXME: remove this, it will be done on the ConfigureReplication step
governor_step! {
"promoting new master" [
"new_master_id" => %new_master_id,
"replicaset_id" => %replicaset_id,
"vclock" => ?promotion_vclock,
]
async {
let sync_and_promote = SyncAndPromoteRequest {
vclock: promotion_vclock.clone(),
timeout: Loop::SYNC_TIMEOUT,
};
pool.call(new_master_id, proc_name!(proc_replication_promote), &sync_and_promote, Self::SYNC_TIMEOUT)?
.timeout(Self::SYNC_TIMEOUT)
.await?
}
}
}
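
With this step gone, the master-switchover stage above only demotes the old
master and persists the new `promotion_vclock`; the new master is no longer
contacted here, and its promotion is deferred to the ConfigureReplication
step below.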
Plan::Downgrade(Downgrade {
@@ -370,6 +351,7 @@ impl Loop {
targets,
master_id,
replicaset_peers,
promotion_vclock,
replication_config_version_actualize,
}) => {
set_status(governor_status, "configure replication");
@@ -377,13 +359,30 @@
"configuring replication"
async {
let mut fs = vec![];
let mut rpc = rpc::replication::Request {
is_master: false,
let mut rpc = rpc::replication::ConfigureReplicationRequest {
// Is only specified for the master replica
sync_and_promote: None,
replicaset_peers,
};
let mut sync_and_promote = None;
if master_id.is_some() {
sync_and_promote = Some(SyncAndPromoteRequest {
vclock: promotion_vclock.clone(),
timeout: Loop::SYNC_TIMEOUT,
});
}
for instance_id in targets {
tlog!(Info, "calling rpc::replication"; "instance_id" => %instance_id);
rpc.is_master = Some(instance_id) == master_id;
rpc.sync_and_promote = None;
if master_id == Some(instance_id) {
let Some(sync) = sync_and_promote.take() else {
unreachable!("sync_and_promote request should only be sent to at most one replica");
};
rpc.sync_and_promote = Some(sync);
}
let resp = pool.call(instance_id, proc_name!(proc_replication), &rpc, Self::RPC_TIMEOUT)?;
fs.push(async move {
match resp.await {
......
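
Note that the request object is reused across loop iterations, which is why
`sync_and_promote` is reset to `None` at the top of each one. The
`Option::take` plus `unreachable!` pair encodes the invariant that the
promotion payload is built once and handed to at most one target, the
designated master.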
@@ -25,6 +25,7 @@ use crate::vshard::VshardConfig;
use crate::warn_or_panic;
use ::tarantool::space::UpdateOps;
use std::collections::HashMap;
use tarantool::vclock::Vclock;
use super::cc::raft_conf_change;
use super::Loop;
@@ -186,12 +187,13 @@ pub(super) fn action_plan<'i>(
)?;
let replication_config_version_actualize = dml;
let promotion_vclock = &replicaset.promotion_vclock;
return Ok(ConfigureReplication {
// TODO: also send the promotion vclock
replicaset_id,
targets,
master_id,
replicaset_peers,
promotion_vclock,
replication_config_version_actualize,
}
.into());
@@ -741,7 +743,10 @@ pub mod stage {
/// Request to call [`rpc::replication::proc_replication_demote`] on the old master.
/// It is optional because we don't try demoting the old master if it's already offline.
pub demote: Option<rpc::replication::DemoteRequest>,
/// This instance will be promoted via RPC [`rpc::replication::proc_replication_promote`].
/// This is the new master. It will be sent an RPC [`proc_get_vclock`] to set the
/// promotion vclock in case the old master is not available.
///
/// [`proc_get_vclock`]: crate::sync::proc_get_vclock
pub new_master_id: &'i InstanceId,
/// Part of the global DML operation which updates a `_pico_replicaset` record
/// with the new values for `current_master_id` & `promotion_vclock`.
@@ -765,7 +770,7 @@
}
pub struct ConfigureReplication<'i> {
/// This replicaset is being [re]configured. The id is only used for logging.
/// This replicaset is being (re)configured. The id is only used for logging.
pub replicaset_id: &'i ReplicasetId,
/// These instances belong to one replicaset and will be sent a
/// request to call [`rpc::replication::proc_replication`].
@@ -773,8 +778,11 @@
/// This instance will also become the replicaset master.
/// This will be `None` if the replicaset's current_master_id != target_master_id.
pub master_id: Option<&'i InstanceId>,
/// This is an explicit list of peer addresses (one for each target).
/// This is an explicit list of peer addresses.
pub replicaset_peers: Vec<String>,
/// The value of the `promotion_vclock` column of the given replicaset.
/// It's used to synchronize the new master before making it writable.
pub promotion_vclock: &'i Vclock,
/// Global DML operation which updates `current_config_version` in table `_pico_replicaset` for the given replicaset.
pub replication_config_version_actualize: Dml,
}
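
Why the new master has to synchronize before becoming writable:
`promotion_vclock` records how far the old master had gotten, and the
candidate may only clear `read_only` once its own vclock has caught up
component-wise, otherwise committed writes could be lost. A toy illustration
of that ordering check (plain maps standing in for the crate's `Vclock` type;
all values invented):

    use std::collections::HashMap;

    // Toy stand-in for a vclock: replica id -> last applied LSN.
    type ToyVclock = HashMap<u32, u64>;

    // `a` has caught up to `b` if every component of `b` is <= its
    // counterpart in `a`.
    fn caught_up(a: &ToyVclock, b: &ToyVclock) -> bool {
        b.iter().all(|(id, lsn)| a.get(id).copied().unwrap_or(0) >= *lsn)
    }

    fn main() {
        let promotion_vclock = ToyVclock::from([(1, 100), (2, 42)]);
        let synced = ToyVclock::from([(1, 100), (2, 45)]);
        let lagging = ToyVclock::from([(1, 99), (2, 45)]);
        assert!(caught_up(&synced, &promotion_vclock)); // may clear read_only
        assert!(!caught_up(&lagging, &promotion_vclock)); // must keep waiting
    }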
......
@@ -17,7 +17,7 @@ crate::define_rpc_request! {
/// Returns errors in the following cases:
/// 1. Lua error during call to `box.cfg`
/// 2. Storage failure
fn proc_replication(req: Request) -> Result<Response> {
fn proc_replication(req: ConfigureReplicationRequest) -> Result<Response> {
// TODO: check this configuration is newer than the one currently
// applied. For this we'll probably need to store the governor's applied
// index at the moment of request generation in box.space._schema on the
@@ -34,75 +34,73 @@
// and ignores it if nothing changed
set_cfg_field("replication", &replication_cfg)?;
// We do this every time because, firstly, it helps when waking up.
// And secondly, just in case: it doesn't hurt anyway.
if req.is_master {
set_cfg_field("read_only", false)?;
if let Some(sync) = req.sync_and_promote {
crate::error_injection!("TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER" => return Err(Error::Timeout));
synchronize_and_promote_to_master(sync)?;
} else {
// Everybody else should be read-only
set_cfg_field("read_only", true)?;
}
Ok(Response {})
}
/// Request to configure tarantool replication.
pub struct Request {
/// If the target replica should be made a replicaset `master`.
pub struct ConfigureReplicationRequest {
/// If this is not `None` the target replica will synchronize and become the new `master`.
/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
/// for more.
pub is_master: bool,
pub sync_and_promote: Option<SyncAndPromoteRequest>,
/// URIs of all replicas in the replicaset.
/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#confval-replication)
/// for more.
pub replicaset_peers: Vec<String>,
}
/// Response to [`replication::Request`].
///
/// [`replication::Request`]: Request
/// Response to [`ConfigureReplicationRequest`].
pub struct Response {}
}
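
For concreteness, the two shapes of request a target can receive (the URIs and
the timeout value are invented for illustration; the governor actually uses
`Loop::SYNC_TIMEOUT`):

    // An ordinary replica: (re)configure peers, stay read-only.
    let replica_req = ConfigureReplicationRequest {
        sync_and_promote: None,
        replicaset_peers: vec!["host4:3301".into(), "host5:3301".into()],
    };

    // The designated master: same peers, plus the promotion payload.
    let master_req = ConfigureReplicationRequest {
        sync_and_promote: Some(SyncAndPromoteRequest {
            vclock: promotion_vclock.clone(),
            timeout: Duration::from_secs(10),
        }),
        replicaset_peers: vec!["host4:3301".into(), "host5:3301".into()],
    };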
crate::define_rpc_request! {
/// Promotes the target instance from read-only replica to master.
/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
/// for more.
///
/// Returns errors in the following cases: see the implementation.
fn proc_replication_promote(req: SyncAndPromoteRequest) -> Result<SyncAndPromoteResponse> {
// TODO: find a way to guard against stale governor requests.
crate::sync::wait_vclock(req.vclock, req.timeout)?;
// XXX: Currently we just change the box.cfg.read_only option of the
// instance, but at some point we will implement support for tarantool
// synchronous transactions, and then this operation will probably
// become more involved.
let lua = tarantool::lua_state();
let ro_reason: Option<tlua::StringInLua<_>> = lua.eval(
"box.cfg { read_only = false }
return box.info.ro_reason"
)?;
if let Some(ro_reason) = ro_reason.as_deref() {
tlog!(Warning, "failed to promote self to replication leader, reason = {ro_reason}");
return Err(Error::other(format!("instance is still in read only mode: {ro_reason}")));
}
// errors are ignored because they must already be handled by the plugin manager itself
_ = node::global()?.plugin_manager.handle_event_sync(PluginEvent::InstancePromote);
Ok(SyncAndPromoteResponse {})
}
/// Request to promote instance to tarantool replication leader.
pub struct SyncAndPromoteRequest {
pub vclock: Vclock,
pub timeout: Duration,
}
/// Promotes the target instance from read-only replica to master.
/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
/// for more.
///
/// Returns errors in the following cases: see the implementation.
fn synchronize_and_promote_to_master(req: SyncAndPromoteRequest) -> Result<()> {
crate::sync::wait_vclock(req.vclock, req.timeout)?;
// XXX: Currently we just change the box.cfg.read_only option of the
// instance, but at some point we will implement support for tarantool
// synchronous transactions, and then this operation will probably
// become more involved.
let lua = tarantool::lua_state();
let ro_reason: Option<tlua::StringInLua<_>> = lua.eval(
"box.cfg { read_only = false }
return box.info.ro_reason",
)?;
#[rustfmt::skip]
if let Some(ro_reason) = ro_reason.as_deref() {
tlog!(Warning, "failed to promote self to replication leader, reason = {ro_reason}");
return Err(Error::other(format!("instance is still in read only mode: {ro_reason}")));
};
// errors are ignored because they must already be handled by the plugin manager itself
_ = node::global()?
.plugin_manager
.handle_event_sync(PluginEvent::InstancePromote);
Ok(())
}
/// Response to [`replication::promote::Request`].
///
/// [`replication::promote::Request`]: Request
pub struct SyncAndPromoteResponse {}
/// Request to promote instance to tarantool replication leader.
#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
pub struct SyncAndPromoteRequest {
pub vclock: Vclock,
pub timeout: Duration,
}
impl ::tarantool::tuple::Encode for SyncAndPromoteRequest {}
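
Presumably the `Serialize`/`Deserialize` derives and the explicit `Encode`
impl are spelled out by hand here because `SyncAndPromoteRequest` is no longer
a top-level RPC request generated through `define_rpc_request!`; it now
travels embedded as a field of `ConfigureReplicationRequest`.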
crate::define_rpc_request! {
/// Demotes the target instance from master to read-only replica.
@@ -124,9 +122,7 @@ crate::define_rpc_request! {
/// Request to demote the instance from tarantool replication leader.
pub struct DemoteRequest {}
/// Response to [`replication::promote::Request`].
///
/// [`replication::promote::Request`]: Request
/// Response to [`DemoteRequest`].
pub struct DemoteResponse {
pub vclock: Vclock,
}
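
Where the promotion vclock comes from, as inferred from the stage docs above
(the helper names here are hypothetical):

    // The vclock is taken from the old master's demote response when it is
    // reachable, otherwise from the new master itself via
    // crate::sync::proc_get_vclock; it is then persisted as
    // `promotion_vclock` in _pico_replicaset and later shipped back in
    // SyncAndPromoteRequest.
    let promotion_vclock = match demote_old_master(old_master_id) {
        Ok(DemoteResponse { vclock }) => vclock,
        Err(_) => get_vclock_of(new_master_id)?, // old master offline
    };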
......
@@ -215,13 +215,9 @@ def test_replication_sync_before_master_switchover(cluster: Cluster):
i4 = cluster.add_instance(wait_online=True, replicaset_id="r99")
i5 = cluster.add_instance(wait_online=True, replicaset_id="r99")
# Temporarily break i5's replication config, so that its vclock is outdated.
print("\x1b[31mbreaking i5's replication config\x1b[0m")
i5.eval(
"""
replication_before = box.cfg.replication
box.cfg { replication = {} }
"""
# Make sure i5 will not be able to synchronize before promoting
i5.call(
"pico._inject_error", "TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER", True
)
# Do some storage modifications, which will need to be replicated.
@@ -249,21 +245,20 @@ def test_replication_sync_before_master_switchover(cluster: Cluster):
master_vclock = get_vclock_without_local(i4)
# Wait until governor starts switching the replication leader from i4 to i5.
# Wait until governor switches the replicaset master from i4 to i5
# and tries to reconfigure replication between them, which will require i5 to synchronize first.
# This will block until i5 synchronizes with the old master, which it won't
# until we fix its replication config.
# until the injected error is disabled.
time.sleep(1) # Just in case, nothing really relies on this sleep
wait_governor_status(i1, "transfer replication leader")
wait_governor_status(i1, "configure replication")
assert i5.eval("return box.space.mytable") is None
vclock = get_vclock_without_local(i5)
assert vclock != master_vclock
# i5 does not become writable until it synchronizes
assert i5.eval("return box.info.ro") is True
# Fix i5's replication config, so it's able to continue syncing.
print("\x1b[32mfixing i5's replication config\x1b[0m")
i5.eval("box.cfg { replication = replication_before }")
# Uninject the error, so it's able to continue syncing.
i5.call(
"pico._inject_error", "TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER", False
)
# Wait until governor finishes with all the needed changes.
wait_governor_status(i1, "idle")
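
The reworked test drives the same scenario through error injection instead of
silently breaking `box.cfg.replication` behind the governor's back: i5 is
forced to fail the pre-promotion sync, the test observes that it stays
read-only with a stale vclock while the governor sits in "configure
replication", and only after the injection is disabled does the cluster
settle back to "idle".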
......