//! # The wait_index policy
//!
//! Note that stored procedures in this module do not do [`Node::wait_index`].
//! This is different from all other stored procedures called by the governor
//! in [`governor::Loop::iter_fn`]
//! (for example [`rpc::sharding::proc_sharding`] calls wait_index at the
//! start, and so do most other procs).
//!
//! The surface-level reason for this difference is that raft_main_loop
//! ([`NodeImpl::advance`]) in some cases needs the tarantool replication
//! to be configured before it can advance the raft replication (which advances
//! the applied index which `wait_index` is waiting for). The specific place
//! where the deadlock would happen is [`NodeImpl::prepare_for_snapshot`]
//! (see the "awaiting replication" status).
//!
//! The deeper reason is how our DDL is implemented. We have a
//! [`Op::DdlPrepare`] raft operation, which when applied in
//! [`NodeImpl::handle_committed_normal_entry`] is handled differently on
//! replicaset master vs read-only replica. Masters apply the changes (create
//! table, etc.) directly to the storage engine, while the read-only replicas
//! are simply waiting for the master to transfer to them the storage state via
//! tarantool replication. This means that raft_main_loop in some cases depends
//! on the tarantool replication being configured, and hence configuring the
//! tarantool replication (which [`proc_replication`] is responsible for) must
//! not depend on raft_main_loop ([`Node::wait_index`]).
//!
//! As for [`proc_replication_sync`] and [`proc_replication_demote`], they also
//! must not depend on `wait_index`, for a related reason. These procs are part
//! of the replicaset master switchover step of the governor loop
//! (see [`plan::stage::Plan::ReplicasetMasterConsistentSwitchover`]).
//! And this step must also be done before we can advance the raft_main_loop,
//! because otherwise the instance would not know if it should apply the DDL
//! itself or wait for the tarantool replication.
//!
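//! For contrast, here is a hedged sketch (not actual code from this module;
//! the `applied` and `timeout` field names are illustrative) of how a typical
//! governor-called proc such as [`rpc::sharding::proc_sharding`] starts,
//! versus what the procs below do instead:
//!
//! ```ignore
//! // Typical proc: wait for the raft applied index, then check the term.
//! let node = node::global()?;
//! node.wait_index(req.applied, req.timeout)?;
//! node.status().check_term(req.term)?;
//!
//! // Procs in this module: skip wait_index, only check the term.
//! let node = node::global()?;
//! node.status().check_term(req.term)?;
//! ```
//!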
#[allow(unused_imports)]
use crate::governor;
#[allow(unused_imports)]
use crate::governor::plan;
use crate::pico_service::pico_service_password;
use crate::plugin::PluginEvent;
#[allow(unused_imports)]
use crate::rpc;
use crate::schema::PICO_SERVICE_USER_NAME;
use crate::tarantool::set_cfg_field;
use crate::tlog;
use crate::traft::error::Error;
#[allow(unused_imports)]
use crate::traft::node::{Node, NodeImpl};
#[allow(unused_imports)]
use crate::traft::op::Op;
use crate::traft::{node, RaftTerm, Result};
use std::time::Duration;
use tarantool::tlua;
use tarantool::vclock::Vclock;

crate::define_rpc_request! {
    /// Configures replication on the target replica.
    /// Specifies addresses of all the replicas in the replicaset
    /// and whether the target instance should be a replicaset master.
    ///
    /// Returns errors in the following cases:
    /// 1. Lua error during call to `box.cfg`
    /// 2. Storage failure
    fn proc_replication(req: ConfigureReplicationRequest) -> Result<Response> {
        let node = node::global()?;
        // Must not call node.wait_index(...) here. See doc-comments at the top
        // of the file for explanation.
        node.status().check_term(req.term)?;

        // TODO: check that this configuration is newer than the one currently
        // applied. For this we'll probably need to store the governor's applied
        // index at the moment of request generation in box.space._schema on the
        // requestee. And if the new request has an index less than the one in
        // our _schema, then we ignore it.
        let mut replication_cfg = Vec::with_capacity(req.replicaset_peers.len());
        let password = pico_service_password();
        for address in &req.replicaset_peers {
            replication_cfg.push(format!("{PICO_SERVICE_USER_NAME}:{password}@{address}"))
        }

        // box.cfg checks if the replication config is already the same
        // and ignores it if nothing changed.
        set_cfg_field("replication", &replication_cfg)?;

        if req.is_master {
            promote_to_master()?;
        } else {
            // Everybody else should be read-only.
            set_cfg_field("read_only", true)?;
        }

        Ok(Response {})
    }

    /// Request to configure tarantool replication.
    pub struct ConfigureReplicationRequest {
        /// Current term of the sender.
        pub term: RaftTerm,

        /// If this is `true` the target replica will become the new `master`.
        /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
        /// for more.
        pub is_master: bool,

        /// URIs of all replicas in the replicaset.
        /// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#confval-replication)
        /// for more.
        pub replicaset_peers: Vec<String>,
    }

    /// Response to [`ConfigureReplicationRequest`].
    pub struct Response {}
}

crate::define_rpc_request! {
    /// Waits until the instance catches up on tarantool replication,
    /// i.e. until its vclock reaches the one specified in the request.
    fn proc_replication_sync(req: ReplicationSyncRequest) -> Result<ReplicationSyncResponse> {
        let node = node::global()?;
        // Must not call node.wait_index(...) here. See doc-comments at the top
        // of the file for explanation.
        node.status().check_term(req.term)?;

        debug_assert!(is_read_only()?);

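        // Error injection point used in tests to simulate a timeout while
        // syncing before promotion to master.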
        crate::error_injection!("TIMEOUT_WHEN_SYNCHING_BEFORE_PROMOTION_TO_MASTER" => return Err(Error::Timeout));

        // Wait until replication progresses.
        crate::sync::wait_vclock(req.vclock, req.timeout)?;

        Ok(ReplicationSyncResponse {})
    }

    /// Request to wait until instance synchronizes tarantool replication.
    pub struct ReplicationSyncRequest {
        /// Current term of the sender.
        pub term: RaftTerm,

        /// Wait until instance progresses replication past this vclock value.
        pub vclock: Vclock,

        /// Wait for this long.
        pub timeout: Duration,
    }

    pub struct ReplicationSyncResponse {}
}

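/// Returns `true` if the instance is currently in read-only mode
/// (the value of `box.info.ro`).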
fn is_read_only() -> Result<bool> {
    let lua = tarantool::lua_state();
    let ro = lua.eval("return box.info.ro")?;
    Ok(ro)
}

/// Promotes the target instance from read-only replica to master.
/// See [tarantool documentation](https://www.tarantool.io/en/doc/latest/reference/configuration/#cfg-basic-read-only)
/// for more.
///
/// Returns an error if the call to `box.cfg` fails or if the instance is
/// still in read-only mode afterwards (see `box.info.ro_reason`).
fn promote_to_master() -> Result<()> {
    // XXX: Currently we just change the box.cfg.read_only option of the
    // instance, but at some point we will implement support for tarantool
    // synchronous transactions, and then this operation will probably become
    // more involved.
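
    // Remember the previous state so that the plugin event below is only
    // fired on an actual read-only -> master transition.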
    let was_read_only = is_read_only()?;

    let lua = tarantool::lua_state();
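    // `box.info.ro_reason` is nil once the instance has actually become
    // writable, otherwise it names why the instance is still read-only
    // (e.g. "config", "orphan", "synchro", "election").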
    let ro_reason: Option<tlua::StringInLua<_>> = lua.eval(
        "box.cfg { read_only = false }
        return box.info.ro_reason",
    )?;

    #[rustfmt::skip]
    if let Some(ro_reason) = ro_reason.as_deref() {
        tlog!(Warning, "failed to promote self to replication leader, reason = {ro_reason}");
        return Err(Error::other(format!("instance is still in read only mode: {ro_reason}")));
    };

    if was_read_only {
        // Errors are ignored because they are already handled by the plugin manager itself.
        _ = node::global()?
            .plugin_manager
            .handle_event_sync(PluginEvent::InstancePromote);
    }

    Ok(())
}

crate::define_rpc_request! {
    /// Demotes the target instance from master to read-only replica.
    ///
    /// Returns errors in the following cases:
    /// 1. Lua error during call to `box.cfg`
    fn proc_replication_demote(req: DemoteRequest) -> Result<DemoteResponse> {
        let node = node::global()?;
        // Must not call node.wait_index(...) here. See doc-comments at the top
        // of the file for explanation.
        node.status().check_term(req.term)?;

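        // Remember the previous state so that the plugin event below is only
        // fired on an actual master -> read-only transition.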
        let was_read_only = is_read_only()?;

        crate::tarantool::exec("box.cfg { read_only = true }")?;

        if !was_read_only {
            // Errors are ignored because they are already handled by the plugin manager itself.
            _ = node::global()?
                .plugin_manager
                .handle_event_sync(PluginEvent::InstanceDemote);
        }

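        // Component 0 of the vclock tracks local (non-replicated) changes,
        // so it is excluded from the vclock reported to the caller.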
        let vclock = Vclock::current();
        let vclock = vclock.ignore_zero();
        Ok(DemoteResponse { vclock })
    }

    /// Request to demote the target instance from tarantool replication
    /// leader to read-only replica.
    pub struct DemoteRequest {
        pub term: RaftTerm,
    }

    /// Response to [`DemoteRequest`].
    pub struct DemoteResponse {
        pub vclock: Vclock,
    }
}