#![allow(unknown_lints)]
#![allow(clippy::too_many_arguments)]
#![allow(clippy::let_and_return)]
#![allow(clippy::needless_return)]
#![allow(clippy::needless_late_init)]
#![allow(clippy::unwrap_or_default)]
#![allow(clippy::redundant_static_lifetimes)]
use serde::{Deserialize, Serialize};

use ::raft::prelude as raft;
use ::tarantool::error::Error as TntError;
use ::tarantool::fiber::r#async::timeout::{self, IntoTimeout};
use ::tarantool::time::Instant;
use ::tarantool::tlua;
use ::tarantool::transaction::transaction;
use ::tarantool::{fiber, session};
use rpc::{join, update_instance};
use sql::otm::SqlStatTables;
use std::time::Duration;
use storage::Clusterwide;
use traft::RaftSpaceAccess;

use crate::access_control::user_by_id;
use crate::address::Address;
use crate::instance::Grade;
use crate::instance::GradeVariant::*;
use crate::instance::Instance;
use crate::plugin::*;
use crate::schema::ADMIN_ID;
use crate::schema::PICO_SERVICE_USER_NAME;
use crate::traft::error::Error;
use crate::traft::op;
use crate::util::effective_user_id;
use config::PicodataConfig;

mod access_control;
pub mod address;
pub mod audit;
mod bootstrap_entries;
pub mod cas;
pub mod cli;
pub mod config;
pub mod discovery;
pub mod error_injection;
pub mod failure_domain;
pub mod governor;
pub mod http_server;
pub mod info;
pub mod instance;
pub mod introspection;
pub mod ipc;
pub mod kvcell;
pub mod r#loop;
mod luamod;
pub mod mailbox;
pub mod on_shutdown;
mod pico_service;
pub mod plugin;
pub mod reachability;
pub mod replicaset;
pub mod rpc;
pub mod schema;
pub mod sentinel;
pub mod sql;
pub mod storage;
pub mod sync;
pub mod tarantool;
pub mod tier;
pub mod tlog;
pub mod traft;
pub mod util;
pub mod vshard;
pub mod yaml_value;

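// Registers a Lua module in `package.preload`: the module source is embedded
// into the binary at compile time via `include_str!` and is compiled lazily on
// the first `require`.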
macro_rules! lua_preload {
    ($lua:ident, $module:literal, $path_prefix:literal, $path:literal) => {
        $lua.exec_with(
            "local module, path, code = ...
            package.preload[module] = load(code, '@'..path)",
            ($module, $path, include_str!(concat!($path_prefix, $path))),
        )
        .unwrap();
    };
}

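/// Preloads the bundled vshard Lua modules so that `require('vshard...')`
/// resolves to the sources embedded in the binary.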
fn preload_vshard() {
    let lua = ::tarantool::lua_state();

    macro_rules! preload {
        ($module:literal, $path:literal) => {
            lua_preload!(lua, $module, "../vshard/", $path)
        };
    }

    preload!("vshard", "vshard/init.lua");
    preload!("vshard.cfg", "vshard/cfg.lua");
    preload!("vshard.consts", "vshard/consts.lua");
    preload!("vshard.error", "vshard/error.lua");
    preload!("vshard.hash", "vshard/hash.lua");
    preload!("vshard.heap", "vshard/heap.lua");
    preload!("vshard.registry", "vshard/registry.lua");
    preload!("vshard.replicaset", "vshard/replicaset.lua");
    preload!("vshard.rlist", "vshard/rlist.lua");
    preload!("vshard.router", "vshard/router/init.lua");
    preload!("vshard.storage", "vshard/storage/init.lua");
    preload!("vshard.storage.ref", "vshard/storage/ref.lua");
    preload!(
        "vshard.storage.reload_evolution",
        "vshard/storage/reload_evolution.lua"
    );
    preload!("vshard.storage.sched", "vshard/storage/sched.lua");
    preload!("vshard.util", "vshard/util.lua");
    preload!("vshard.version", "vshard/version.lua");
}

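/// Preloads the sbroad and pgproto Lua modules and exposes their entry points
/// as `pico.*` functions with execute permission granted to the `public` role.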
fn init_sbroad() {
    let lua = ::tarantool::lua_state();

    macro_rules! preload {
        ($module:literal, $path:literal) => {
            lua_preload!(lua, $module, "../", $path);
        };
    }

    preload!("pgproto", "src/sql/pgproto.lua");
    preload!("sbroad", "src/sql/init.lua");
    preload!("sbroad.helper", "sbroad/sbroad-core/src/helper.lua");
    preload!(
        "sbroad.core-router",
        "sbroad/sbroad-core/src/core-router.lua"
    );
    preload!(
        "sbroad.core-storage",
        "sbroad/sbroad-core/src/core-storage.lua"
    );

    for (module, func) in &[
        ("sbroad", "sql"),
        ("pgproto", "pg_bind"),
        ("pgproto", "pg_close_stmt"),
        ("pgproto", "pg_close_portal"),
        ("pgproto", "pg_describe_stmt"),
        ("pgproto", "pg_describe_portal"),
        ("pgproto", "pg_execute"),
        ("pgproto", "pg_parse"),
        ("pgproto", "pg_statements"),
        ("pgproto", "pg_portals"),
        ("pgproto", "pg_close_client_stmts"),
        ("pgproto", "pg_close_client_portals"),
    ] {
        let program = format!(
            r#"
            _G.pico.{func} = require('{module}').{func};
            box.schema.func.create('pico.{func}', {{if_not_exists = true}});
            box.schema.role.grant('public', 'execute', 'function', 'pico.{func}',
                {{if_not_exists = true}})
            "#
        );
        lua.exec(&program).unwrap();
    }
}

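// C entry point of the `http` library (linked as `httpd`), used below to
// preload the `http.lib` module.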
#[link(name = "httpd")]
extern "C" {
    fn luaopen_http_lib(lua_state: tlua::LuaState) -> libc::c_int;
}

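/// Preloads the bundled `http` Lua library: the C part via `luaopen_http_lib`
/// and the Lua parts from the sources embedded in the binary.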
fn preload_http() {
    let lua = ::tarantool::lua_state();

    // Load C part of the library
    lua.exec_with(
        "package.preload['http.lib'] = ...;",
        tlua::CFunction::new(luaopen_http_lib),
    )
    .unwrap();

    macro_rules! preload {
        ($module:literal, $path:literal) => {
            lua_preload!(lua, $module, "../http/", $path);
        };
    }

    preload!("http.server", "http/server.lua");
    preload!("http.codes", "http/codes.lua");
    preload!("http.mime_types", "http/mime_types.lua");
}

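/// Starts the http server on the given address and registers the public API
/// routes (`api/v1/tiers`, `api/v1/cluster`). The server handle is stored in
/// `_G.pico.httpd`.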
fn start_http_server(Address { host, port, .. }: &Address) {
    tlog!(Info, "starting http server at {host}:{port}");
    let lua = ::tarantool::lua_state();
    lua.exec_with(
        r#"
        local host, port = ...;
        local httpd = require('http.server').new(host, port);
        httpd:start();
        _G.pico.httpd = httpd
        "#,
        (host, port),
    )
    .expect("failed to start http server");
    lua.exec_with(
        "pico.httpd:route({method = 'GET', path = 'api/v1/tiers' }, ...)",
        tlua::Function::new(|| -> _ {
            http_server::wrap_api_result!(http_server::http_api_tiers())
        }),
    )
    .expect("failed to add route api/v1/tiers to http server");
    lua.exec_with(
        "pico.httpd:route({method = 'GET', path = 'api/v1/cluster' }, ...)",
        tlua::Function::new(|| -> _ {
            http_server::wrap_api_result!(http_server::http_api_cluster())
        }),
    )
    .expect("failed to add route api/v1/cluster to http server")
}

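/// Registers http routes serving the Web UI bundle embedded into the binary at
/// build time. `index.html` is additionally served at `/` and at the `/*path`
/// wildcard.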
#[cfg(feature = "webui")]
fn start_webui() {
    use ::tarantool::tlua::PushInto;
    use std::collections::HashMap;

    /// This structure is used to check that `json` contains valid data
    /// at deserialization.
    #[derive(Deserialize, Debug, PushInto)]
    struct File {
        body: String,
        mime: String,
    }

    let lua = ::tarantool::lua_state();
    let bundle = include_str!(env!("WEBUI_BUNDLE"));
    let bundle: HashMap<String, File> =
        serde_json::from_str(bundle).expect("failed to parse Web UI bundle");
    if !bundle.contains_key("index.html") {
        panic!("'index.html' not present in Web UI bundle")
    }
    lua.exec_with(
        "local bundle = ...;
        for filename, file in pairs(bundle) do
            local handler = function ()
                return {
                    status = 200,
                    headers = { ['content-type'] = file['mime'] },
                    body = file['body'],
                }
            end
            if filename == 'index.html' then
                pico.httpd:route({path = '/', method = 'GET'}, handler);
                pico.httpd:route({path = '/*path', method = 'GET'}, handler);
            end
            pico.httpd:route({path = '/' .. filename, method = 'GET'}, handler);
        end",
        bundle,
    )
    .expect("failed to add Web UI routes")
}

/// Initializes Tarantool stored procedures.
///
/// Those are used for inter-instance communication
/// (discovery, rpc, public proc api).
fn init_handlers() {
    let lua = ::tarantool::lua_state();
    for proc in ::tarantool::proc::all_procs().iter() {
        lua.exec_with(
            "local name, is_public = ...
            local proc_name = '.' .. name
            box.schema.func.create(proc_name, {language = 'C', if_not_exists = true})
            if is_public then
                box.schema.role.grant('public', 'execute', 'function', proc_name, {if_not_exists = true})
            end
            ",
            (proc.name(), proc.is_public()),
        )
        .expect("this shouldn't fail");
    }

    lua.exec(
        r#"
        box.schema.role.grant('public', 'execute', 'function', '.dispatch_query', {if_not_exists = true})
        "#).expect("grant execute on .dispatch_query to public should never fail");

    lua.exec(
        r#"
        box.schema.func.create('pico.cas', {if_not_exists = true});
        box.schema.role.grant('public', 'execute', 'function', 'pico.cas', {if_not_exists = true})
    "#,
    )
    .expect("pico.cas registration should never fail");
}

/// Sets interactive prompt to display `picodata>`.
fn set_console_prompt() {
    tarantool::exec(
        r#"
        local console = require('console')

        console.on_start(function(self)
            self.prompt = "picodata"
        end)
        "#,
    )
    .expect("setting prompt should never fail")
}

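/// Replaces the interactive console's SQL executor with `pico.sql`.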
fn redirect_interactive_sql() {
    tarantool::exec(
        r#"
        local console = require('console')
        assert(pico.sql)
        console.set_sql_executor(pico.sql)
        "#,
    )
    .expect("overriding sql executor shouldn't fail")
}

/// Sets a check that will be performed when a user is logging in.
/// Checks whether the user has exceeded the maximum number of login attempts and whether the user is blocked.
///
/// Also see [`storage::PropertyName::MaxLoginAttempts`].
fn set_login_check(storage: Clusterwide) {
    const MAX_ATTEMPTS_EXCEEDED: &str = "Maximum number of login attempts exceeded";
    const NO_LOGIN_PRIVILEGE: &str = "User does not have login privilege";

    enum Verdict {
        AuthOk,
        AuthFail,
        UnknownUser,
        UserBlocked(&'static str),
    }

    // Determines the outcome of an authentication attempt.
    let compute_auth_verdict = move |user_name: String, successful_authentication: bool| {
        use std::collections::hash_map::Entry;

        // If the user is pico service (used for internal communication) we don't perform any additional checks.
        // Map result to print audit message, tarantool handles auth automatically.
        //
        // The reason for not performing the checks is twofold:
        // 1. We might not have the user or required privileges in _pico_* spaces yet.
        // 2. We should never block the pico service user, or instances would lose the ability
        //    to communicate with each other.
        if user_name == PICO_SERVICE_USER_NAME {
            if successful_authentication {
                return Verdict::AuthOk;
            } else {
                return Verdict::AuthFail;
            }
        }

        // Switch to admin to access system spaces.
        let admin_guard = session::su(ADMIN_ID).expect("switching to admin should not fail");
        let Some(user) = storage
            .users
            .by_name(&user_name)
            .expect("accessing storage should not fail")
        else {
            // Prevent DoS attacks by first checking whether the user exists.
            // If it doesn't, we shouldn't even bother tracking its attempts.
            // Too many hashmap records will cause a global OOM event.
            debug_assert!(!successful_authentication);
            return Verdict::UnknownUser;
        };
        let max_login_attempts = storage
            .properties
            .max_login_attempts()
            .expect("accessing storage should not fail");
        if storage
            .privileges
            .get(user.id, "universe", 0, "login")
            .expect("storage should not fail")
            .is_none()
        {
            // User does not have login privilege so should not be allowed to connect.
            return Verdict::UserBlocked(NO_LOGIN_PRIVILEGE);
        }
        drop(admin_guard);

        // Borrowing will not panic as there are no yields while it's borrowed
        let mut attempts = storage.login_attempts.borrow_mut();
        match attempts.entry(user_name) {
            Entry::Occupied(e) if *e.get() >= max_login_attempts => {
                // The account is suspended until instance is restarted
                // or user is unlocked with `grant session` operation.
                //
                // See [`crate::storage::global_grant_privilege`]
                Verdict::UserBlocked(MAX_ATTEMPTS_EXCEEDED)
            }
            Entry::Occupied(mut e) => {
                if successful_authentication {
                    // Forget about previous failures
                    e.remove();
                    Verdict::AuthOk
                } else {
                    *e.get_mut() += 1;
                    Verdict::AuthFail
                }
            }
            Entry::Vacant(e) => {
                if successful_authentication {
                    Verdict::AuthOk
                } else {
                    // Remember the failure, but don't raise an error yet
                    e.insert(1);
                    Verdict::AuthFail
                }
            }
        }
    };

    let lua = ::tarantool::lua_state();
    lua.exec_with(
        "box.session.on_auth(...)",
        tlua::function3(move |user: String, status: bool, lua: tlua::LuaState| {
            match compute_auth_verdict(user.clone(), status) {
                Verdict::AuthOk => {
                    crate::audit!(
                        message: "successfully authenticated user `{user}`",
                        title: "auth_ok",
                        severity: High,
                        user: &user,
                        initiator: &user,
                        verdict: "user is not blocked",
                    );
                }
                Verdict::AuthFail => {
                    crate::audit!(
                        message: "failed to authenticate user `{user}`",
                        title: "auth_fail",
                        severity: High,
                        user: &user,
                        initiator: &user,
                        verdict: "user is not blocked",
                    );
                }
                Verdict::UnknownUser => {
                    crate::audit!(
                        message: "failed to authenticate unknown user `{user}`",
                        title: "auth_fail",
                        severity: High,
                        user: &user,
                        initiator: &user,
                        verdict: "user is not blocked",
                    );
                }
                Verdict::UserBlocked(err) => {
                    crate::audit!(
                        message: "failed to authenticate user `{user}`",
                        title: "auth_fail",
                        severity: High,
                        user: &user,
                        initiator: &user,
                        verdict: format_args!("{err}; user blocked"),
                    );

                    // Raises an error instead of returning it as a function result.
                    // This is the behavior required by `on_auth` trigger to drop the connection
                    // even if auth was successful. If auth failed the connection will be dropped automatically.
                    //
                    // All the drop implementations are called, no need to clean anything up.
                    tlua::error!(lua, "{}", err);
                }
            }
        }),
    )
    .expect("setting on auth trigger should not fail")
}

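/// Sets a `box.session.on_access_denied` trigger that records an `access_denied`
/// audit event for the effective user whenever a privilege check fails.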
fn set_on_access_denied_audit_trigger() {
    let lua = ::tarantool::lua_state();

    lua.exec_with(
        "box.session.on_access_denied(...)",
        tlua::function4(
            move |privilege: String,
                  object_type: String, // dummy comment praising rustfmt
                  object: String,
                  _lua: tlua::LuaState| {
                let effective_user = effective_user_id();
                let privilege = privilege.to_lowercase();

                // We do not have a box.session.user() equivalent that returns an id straight away,
                // so we look the user up by id.
                // Note: since we're in a user context we may not have access to the user space.
                let user = session::with_su(ADMIN_ID, || {
                    user_by_id(effective_user).expect("user exists").name
                })
                .expect("must be able to su into admin");
                crate::audit!(
                    message: "{privilege} access denied \
                        on {object_type} `{object}` \
                        for user `{user}`",
                    title: "access_denied",
                    severity: Medium,
                    privilege: &privilege,
                    object: &object,
                    object_type: &object_type,
                    user: &user,
                    initiator: &user,
                );
            },
        ),
    )
    .expect("setting on auth trigger should not fail")
}

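/// Startup phase of an instance. `start_discover` either proceeds to postjoin
/// (on restart) or reports the next entrypoint (`StartBoot` or `StartJoin`) to
/// the supervisor via `IpcMessage` and exits, so that the process can be
/// re-executed with that entrypoint.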
#[allow(clippy::enum_variant_names)]
#[derive(Debug, Serialize, Deserialize)]
pub enum Entrypoint {
    StartDiscover,
    StartBoot,
    StartJoin { leader_address: String },
}

impl Entrypoint {
    pub fn exec(
        self,
        config: PicodataConfig,
        to_supervisor: ipc::Sender<IpcMessage>,
    ) -> Result<(), Error> {
        match self {
            Self::StartDiscover => start_discover(&config, to_supervisor)?,
            Self::StartBoot => start_boot(&config)?,
            Self::StartJoin { leader_address } => start_join(&config, leader_address)?,
        }

        Ok(())
    }
}

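/// Message sent to the supervisor process: which entrypoint to restart into and
/// whether the local database should be dropped before doing so.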
#[derive(Debug, Serialize, Deserialize)]
pub struct IpcMessage {
    pub next_entrypoint: Entrypoint,
    pub drop_db: bool,
}

/// Performs tarantool initialization by calling `box.cfg` for the first time.
///
/// This function is called from:
///
/// - `start_discover`
/// - `start_boot`
/// - `start_join`
///
fn init_common(
    config: &PicodataConfig,
    cfg: &tarantool::Cfg,
) -> Result<(Clusterwide, RaftSpaceAccess), Error> {
    std::fs::create_dir_all(config.instance.data_dir()).unwrap();

    if let Some(log_config) = &cfg.log {
        tlog!(Info, "switching to log configuration: {log_config}");
    }
    // See doc comments in tlog.rs for explanation.
    tlog::set_core_logger_is_initialized(true);

    if let Err(e) = tarantool::set_cfg(cfg) {
        let tnt_err = ::tarantool::error::TarantoolError::last();
        let err_ty = tnt_err.error_type();
        if err_ty == "XlogError" {
            if let Some(config) = &config.instance.audit {
                // Init log with stab values for raft_id and gen.
                // We need to log the 'integrity_violation' event and
                // there is nowhere to take raft state from at the moment.
                audit::init(config, 0, 0);
                crate::audit!(
                    message: "integrity violation detected",
                    title: "integrity_violation",
                    severity: High,
                    error: format!("{err_ty}: {}", tnt_err.message()),
                )
            }
        }
        tlog::set_core_logger_is_initialized(false);
        return Err(Error::other(format!("core initialization failed: {e}")));
    }

    if config.instance.shredding() {
        tarantool::xlog_set_remove_file_impl();
    }

    // Load Lua libraries
    preload_vshard();
    preload_http();
    init_sbroad();

    tarantool::exec(
        "require 'compat' {
            c_func_iproto_multireturn = 'new',
        }",
    )
    .unwrap();

    set_console_prompt();
    redirect_interactive_sql();
    init_handlers();

    let storage = Clusterwide::try_get(true).expect("storage initialization should never fail");
    // Init sbroad statistics tables: they must be created
    // after the global tables, so that they don't occupy the ids reserved for them.
    let _ = SqlStatTables::get_or_init();
    schema::init_user_pico_service();

    set_login_check(storage.clone());
    set_on_access_denied_audit_trigger();
    let raft_storage =
        RaftSpaceAccess::new().expect("raft storage initialization should never fail");
    Ok((storage.clone(), raft_storage))
}

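/// Discovery phase: starts tarantool with a minimal configuration, then either
/// proceeds to `postjoin` if the instance has already been initialized, or runs
/// cluster discovery to decide between `start_boot` (this instance bootstraps
/// the cluster) and `start_join`.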
fn start_discover(
    config: &PicodataConfig,
    to_supervisor: ipc::Sender<IpcMessage>,
) -> Result<(), Error> {
    tlog!(Info, "entering discovery phase");

    luamod::setup(config);
    assert!(tarantool::cfg().is_none());

    let cfg = tarantool::Cfg::for_discovery(config);
    let (storage, raft_storage) = init_common(config, &cfg)?;
    discovery::init_global(config.instance.peers().iter().map(|a| a.to_host_port()));

    if let Some(raft_id) = raft_storage.raft_id()? {
        // This is a restart, go to postjoin immediately.
        tarantool::set_cfg_field("read_only", true)?;
        let instance_id = raft_storage
            .instance_id()?
            .expect("instance_id should be already set");
        postjoin(config, storage, raft_storage)?;
        crate::audit!(
            message: "local database recovered on `{instance_id}`",
            title: "recover_local_db",
            severity: Low,
            instance_id: %instance_id,
            raft_id: %raft_id,
            initiator: "admin",
        );
        crate::audit!(
            message: "local database connected on `{instance_id}`",
            title: "connect_local_db",
            severity: Low,
            instance_id: %instance_id,
            raft_id: %raft_id,
            initiator: "admin",
        );
        return Ok(());
    }

    // Start listening only after we've checked whether this is a restart.
    // The postjoin phase has its own idea of when to start listening.
    tarantool::set_cfg_field("listen", config.instance.listen().to_host_port())
        .expect("setting listen port shouldn't fail");

    let role = discovery::wait_global();
    match role {
        discovery::Role::Leader { .. } => {
            let next_entrypoint = Entrypoint::StartBoot {};
            let msg = IpcMessage {
                next_entrypoint,
                drop_db: true,
            };
            to_supervisor.send(&msg);
            std::process::exit(0);
        }
        discovery::Role::NonLeader { leader } => {
            let next_entrypoint = Entrypoint::StartJoin {
                leader_address: leader,
            };
            let msg = IpcMessage {
                next_entrypoint,
                drop_db: true,
            };
            to_supervisor.send(&msg);
            std::process::exit(0);
        }
    }
}

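/// Cluster bootstrap phase: creates the initial instance record, persists the
/// bootstrap raft state (single voter, prepared log entries) and proceeds to
/// `postjoin`.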
fn start_boot(config: &PicodataConfig) -> Result<(), Error> {
    tlog!(Info, "entering cluster bootstrap phase");

    let tiers = config.cluster.tiers();
    let my_tier_name = config.instance.tier();
    if !tiers.contains_key(my_tier_name) {
        return Err(Error::other(format!(
            "invalid configuration: current instance is assigned tier '{my_tier_name}' which is not defined in the configuration file",
        )));
    };

    let instance = Instance::new(
        None,
        config.instance.instance_id.clone(),
        config.instance.replicaset_id.clone(),
        Grade::new(Offline, 0),
        Grade::new(Offline, 0),
        config.instance.failure_domain(),
        my_tier_name,
    );
    let raft_id = instance.raft_id;
    let instance_id = instance.instance_id.clone();

    luamod::setup(config);
    assert!(tarantool::cfg().is_none());

    let cfg = tarantool::Cfg::for_cluster_bootstrap(config, &instance);
    let (storage, raft_storage) = init_common(config, &cfg)?;

    let cs = raft::ConfState {
        voters: vec![raft_id],
        ..Default::default()
    };

    let bootstrap_entries = bootstrap_entries::prepare(config, &instance, &tiers);

    let hs = raft::HardState {
        term: traft::INIT_RAFT_TERM,
        commit: bootstrap_entries.len() as _,
        ..Default::default()
    };

    transaction(|| -> Result<(), TntError> {
        raft_storage.persist_raft_id(raft_id).unwrap();
        raft_storage.persist_instance_id(&instance_id).unwrap();
        raft_storage.persist_tier(my_tier_name).unwrap();
        raft_storage
            .persist_cluster_id(config.cluster_id())
            .unwrap();
        raft_storage.persist_entries(&bootstrap_entries).unwrap();
        raft_storage.persist_conf_state(&cs).unwrap();
        raft_storage.persist_hard_state(&hs).unwrap();
        Ok(())
    })
    .unwrap();

    postjoin(config, storage, raft_storage)?;
    // In this case `create_local_db` is logged in postjoin
    crate::audit!(
        message: "local database connected on `{instance_id}`",
        title: "connect_local_db",
        severity: Low,
        instance_id: %instance_id,
        raft_id: %raft_id,
        initiator: "admin",
    );

    Ok(())
}

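/// Join phase: sends a join request to the given peer, persists the returned
/// instance record, peer addresses and raft metadata, then proceeds to
/// `postjoin`.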
fn start_join(config: &PicodataConfig, instance_address: String) -> Result<(), Error> {
    tlog!(Info, "joining cluster, peer address: {instance_address}");

    let req = join::Request {
        cluster_id: config.cluster_id().into(),
        instance_id: config.instance.instance_id().map(From::from),
        replicaset_id: config.instance.replicaset_id().map(From::from),
        advertise_address: config.instance.advertise_address().to_host_port(),
        failure_domain: config.instance.failure_domain(),
        tier: config.instance.tier().into(),
    };

    // Arch memo.
    // - There must be no timeouts. Retrying may lead to flooding the
    //   topology with phantom instances. No worry, specifying a
    //   particular `instance_id` for every instance protects from that
    //   flood.
    // - It's fine to retry "connection refused" errors.
    let resp: rpc::join::Response = loop {
        let now = Instant::now();
        // TODO: exponential delay
        let timeout = Duration::from_secs(1);
        match fiber::block_on(rpc::network_call(&instance_address, &req)) {
            Ok(resp) => {
                break resp;
            }
            Err(TntError::ConnectionClosed(e)) => {
                tlog!(Warning, "join request failed: {e}, retrying...");
                fiber::sleep(timeout.saturating_sub(now.elapsed()));
                continue;
            }
            Err(TntError::IO(e)) => {
                tlog!(Warning, "join request failed: {e}, retrying...");
                fiber::sleep(timeout.saturating_sub(now.elapsed()));
                continue;
            }
            Err(e) => {
                return Err(Error::other(format!(
                    "join request failed: {e}, shutting down..."
                )));
            }
        }
    };

    luamod::setup(config);
    assert!(tarantool::cfg().is_none());

    let cfg = tarantool::Cfg::for_instance_join(config, &resp);
    let (storage, raft_storage) = init_common(config, &cfg)?;

    let raft_id = resp.instance.raft_id;
    transaction(|| -> Result<(), TntError> {
        storage.instances.put(&resp.instance).unwrap();
        for traft::PeerAddress { raft_id, address } in resp.peer_addresses {
            storage.peer_addresses.put(raft_id, &address).unwrap();
        }
        raft_storage.persist_raft_id(raft_id).unwrap();
        raft_storage
            .persist_instance_id(&resp.instance.instance_id)
            .unwrap();
        raft_storage
            .persist_cluster_id(config.cluster_id())
            .unwrap();
        raft_storage.persist_tier(config.instance.tier()).unwrap();
        Ok(())
    })
    .unwrap();

    let instance_id = resp.instance.instance_id;
    postjoin(config, storage, raft_storage)?;
    crate::audit!(
        message: "local database created on `{instance_id}`",
        title: "create_local_db",
        severity: Low,
        instance_id: %instance_id,
        raft_id: %raft_id,
        initiator: "admin",
    );
    crate::audit!(
        message: "local database connected on `{instance_id}`",
        title: "connect_local_db",
        severity: Low,
        instance_id: %instance_id,
        raft_id: %raft_id,
        initiator: "admin",
    );
    Ok(())
}
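
/// Common final phase of every entrypoint: initializes the audit log, plugins,
/// http server and the raft node, starts the admin console, then activates the
/// instance by asking the leader to set its target grade to `Online`.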
fn postjoin(
    config: &PicodataConfig,
    storage: Clusterwide,
    raft_storage: RaftSpaceAccess,
) -> Result<(), Error> {
    tlog!(Info, "entering post-join phase");

    config.validate_storage(&storage, &raft_storage)?;

    if let Some(config) = &config.instance.audit {
        let raft_id = raft_storage
            .raft_id()
            .expect("failed to get raft_id for audit log")
            .expect("found zero raft_id during audit log init");
        let gen = raft_storage.gen().expect("failed to get gen for audit log");
        audit::init(config, raft_id, gen);
    }

    if let Some(plugins) = &config.instance.plugins {
        PluginList::global_init(plugins);
    }

    if let Some(addr) = &config.instance.http_listen {
        start_http_server(addr);
        if cfg!(feature = "webui") {
            tlog!(Info, "Web UI is enabled");
        } else {
            tlog!(Info, "Web UI is disabled");
        }
        #[cfg(feature = "webui")]
        start_webui();
    }
    // Execute postjoin script if present
    if let Some(ref script) = config.instance.deprecated_script {
        let l = ::tarantool::lua_state();
        l.exec_with("dofile(...)", script)
            .unwrap_or_else(|err| panic!("failed to execute postjoin script: {err}"))
    }

    // Reset the quorum BEFORE initializing the raft node.
    // Otherwise it may get stuck on the `box.cfg({replication})` call.
    tarantool::set_cfg_field("replication_connect_quorum", 0)
        .expect("changing replication_connect_quorum shouldn't fail");

    let node = traft::node::Node::init(storage.clone(), raft_storage.clone());
    let node = node.expect("failed initializing raft node");
    let raft_id = node.raft_id();

    let cs = raft_storage.conf_state().unwrap();
    if cs.voters == [raft_id] {
        #[rustfmt::skip]
        tlog!(Info, "this is the only voter in cluster, triggering election immediately");

        node.tick_and_yield(1); // apply configuration, if any
        node.campaign_and_yield().ok(); // trigger election immediately
        assert!(node.status().raft_state.is_leader());
    }

    tarantool::set_cfg_field("listen", config.instance.listen().to_host_port())
        .expect("changing listen port shouldn't fail");

    // Start admin console
    let socket_uri = util::validate_and_complete_unix_socket_path(&config.instance.admin_socket())?;
    let lua = ::tarantool::lua_state();
    lua.exec_with(r#"require('console').listen(...)"#, &socket_uri)?;

    if let Err(e) =
        tarantool::on_shutdown(move || fiber::block_on(on_shutdown::callback(PluginList::get())))
    {
        tlog!(Error, "failed setting on_shutdown trigger: {e}");
    }

    // We will shut down if we don't receive confirmation of the target grade
    // change from the leader before this time.
    let activation_deadline = Instant::now().saturating_add(Duration::from_secs(10));

    // This will be doubled on each retry.
    let mut retry_timeout = Duration::from_millis(250);

    // Activates instance
    loop {
        let Ok(instance) = storage.instances.get(&raft_id) else {
            // This can happen if for example a snapshot arrives
            // and we truncate _pico_instance (read uncommitted btw).
            // In this case we also just wait some more.
            let timeout = Duration::from_millis(100);
            fiber::sleep(timeout);
            continue;
        };
        let cluster_id = raft_storage
            .cluster_id()
            .expect("storage should never fail");
        // Doesn't have to be leader - can be any online peer
        let leader_id = node.status().leader_id;
        let leader_address = leader_id.and_then(|id| storage.peer_addresses.try_get(id).ok());
        let Some(leader_address) = leader_address else {
            // FIXME: don't hard code timeout
            let timeout = Duration::from_millis(250);
            tlog!(
                Debug,
                "leader address is still unkown, retrying in {timeout:?}"
            );
            fiber::sleep(timeout);
            continue;
        };

        tlog!(
            Info,
            "initiating self-activation of {}",
            instance.instance_id
        );
        let req = update_instance::Request::new(instance.instance_id, cluster_id)
            .with_target_grade(Online)
            .with_failure_domain(config.instance.failure_domain());
        let now = Instant::now();
        let fut = rpc::network_call(&leader_address, &req).timeout(activation_deadline - now);
        match fiber::block_on(fut) {
            Ok(update_instance::Response {}) => {
                break;
            }
            Err(timeout::Error::Failed(TntError::Tcp(e))) => {
                let timeout = retry_timeout.saturating_sub(now.elapsed());
                retry_timeout *= 2;
                #[rustfmt::skip]
                tlog!(Warning, "failed to activate myself: {e}, retrying in {timeout:.02?}...");
                fiber::sleep(timeout);
                continue;
            }
            Err(timeout::Error::Failed(TntError::IO(e))) => {
                let timeout = retry_timeout.saturating_sub(now.elapsed());
                retry_timeout *= 2;
                #[rustfmt::skip]
                tlog!(Warning, "failed to activate myself: {e}, retrying in {timeout:.02?}...");
                fiber::sleep(timeout);
                continue;
            }
            Err(e) => {
                return Err(Error::other(format!(
                    "failed to activate myself: {e}, shutting down..."
                )));
            }
        }
    }

    // Wait for target grade to change to Online, so that sentinel doesn't send
    // a redundant update instance request.
    // Otherwise incarnations grow by 2 every time.
    let timeout = Duration::from_secs(10);
    let deadline = fiber::clock().saturating_add(timeout);
    loop {
        if let Ok(instance) = storage.instances.get(&raft_id) {
            if has_grades!(instance, * -> Online) {
                tlog!(Info, "self-activated successfully");
                break;
            }
        } else {
            // This can happen if for example a snapshot arrives
            // and we truncate _pico_instance (read uncommitted btw).
            // In this case we also just wait some more.
        }
        if fiber::clock() > deadline {
            tlog!(
                Warning,
                "didn't receive confirmation of self activation in time"
            );
            break;
        }
        let index = node.get_index();
        _ = node.wait_index(index + 1, deadline.duration_since(fiber::clock()));
    }

    node.sentinel_loop.on_self_activate();

    PluginList::get().iter().for_each(|plugin| {
        plugin.start();
    });

    Ok(())
}