//! This module incapsulates most of the application-specific logics.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.

use crate::governor;
use crate::has_grades;
use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::r#loop::FlowControl;
use crate::rpc;
use crate::schema::ddl_abort_on_master;
use crate::schema::{Distribution, IndexDef, SpaceDef};
use crate::storage::pico_schema_version;
use crate::storage::ToEntryIter as _;
use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
use crate::stringify_cfunc;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::notify::{notification, Notifier, Notify};
use crate::traft::op::{Ddl, Dml, Op, OpResult, PersistInstance};
use crate::traft::Address;
use crate::traft::ConnectionPool;
use crate::traft::ContextCoercion as _;
use crate::traft::LogicalClock;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::traft::Topology;
use crate::unwrap_some_or;
use crate::util::instant_saturating_add;
use crate::util::AnyWithTypeName;
use crate::warn_or_panic;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::index::FieldType as IFT;
use ::tarantool::index::Part;
use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::space::SpaceId;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use protobuf::Message as _;
use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;

type RawNode = raft::RawNode<RaftSpaceAccess>;

::tarantool::define_str_enum! {
    pub enum RaftState {
        Follower = "Follower",
        Candidate = "Candidate",
        Leader = "Leader",
        PreCandidate = "PreCandidate",
    }
}

impl RaftState {
    pub fn is_leader(&self) -> bool {
        matches!(self, Self::Leader)
    }
}

impl From<RaftStateRole> for RaftState {
    fn from(role: RaftStateRole) -> Self {
        match role {
            RaftStateRole::Follower => Self::Follower,
            RaftStateRole::Candidate => Self::Candidate,
            RaftStateRole::Leader => Self::Leader,
            RaftStateRole::PreCandidate => Self::PreCandidate,
        }
    }
}

#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    pub id: RaftId,
    /// `raft_id` of the leader instance
    pub leader_id: Option<RaftId>,
    /// Current term number
    pub term: RaftTerm,
    /// Current raft state
    pub raft_state: RaftState,
}

impl Status {
    pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
        if requested_term != self.term {
            return Err(Error::TermMismatch {
                requested: requested_term,
                current: self.term,
            });
        }
        Ok(())
    }
}

type StorageWatchers = HashMap<SpaceId, watch::Sender<()>>;
type StorageChanges = HashSet<SpaceId>;

/// The heart of `traft` module - the Node.
pub struct Node {
    /// RaftId of the Node.
    //
    // It appears twice in the Node: here and in `status.id`.
    // This is a concious decision.
    // `self.raft_id()` is used in Rust API, and
    // `self.status()` is mostly useful in Lua API.
    pub(crate) raft_id: RaftId,

    node_impl: Rc<Mutex<NodeImpl>>,
    pub(crate) storage: Clusterwide,
    pub(crate) raft_storage: RaftSpaceAccess,
    pub(crate) main_loop: MainLoop,
    pub(crate) governor_loop: governor::Loop,
    status: watch::Receiver<Status>,
    watchers: Rc<Mutex<StorageWatchers>>,
}

impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Node")
            .field("raft_id", &self.raft_id)
            .finish_non_exhaustive()
    }
}

impl Node {
    /// Initialize the raft node.
    /// **This function yields**
    pub fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> {
        let node_impl = NodeImpl::new(storage.clone(), raft_storage.clone())?;

        let raft_id = node_impl.raft_id();
        let status = node_impl.status.subscribe();

        let node_impl = Rc::new(Mutex::new(node_impl));
        let watchers = Rc::new(Mutex::new(HashMap::new()));

        let node = Node {
            raft_id,
            main_loop: MainLoop::start(node_impl.clone(), watchers.clone()), // yields
            governor_loop: governor::Loop::start(
                status.clone(),
                storage.clone(),
                raft_storage.clone(),
            ),
            node_impl,
            storage,
            raft_storage,
            status,
            watchers,
        };

        // Wait for the node to enter the main loop
        node.tick_and_yield(0);
        Ok(node)
    }

    pub fn raft_id(&self) -> RaftId {
        self.raft_id
    }

    pub fn status(&self) -> Status {
        self.status.get()
    }

    pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {
        self.node_impl.lock()
    }

    /// Wait for the status to be changed.
    /// **This function yields**
    pub fn wait_status(&self) {
        fiber::block_on(self.status.clone().changed()).unwrap();
    }

    /// **This function yields**
    pub fn wait_for_read_state(&self, timeout: Duration) -> traft::Result<RaftIndex> {
        let notify = self.raw_operation(|node_impl| node_impl.read_state_async())?;
        fiber::block_on(notify.recv_timeout::<RaftIndex>(timeout))
    }

    /// Propose an operation and wait for it's result.
    /// **This function yields**
    pub fn propose_and_wait<T: OpResult + Into<Op>>(
        &self,
        op: T,
        timeout: Duration,
    ) -> traft::Result<T::Result> {
        let notify = self.raw_operation(|node_impl| node_impl.propose_async(op))?;
        fiber::block_on(notify.recv_timeout::<T::Result>(timeout))
    }

    /// Become a candidate and wait for a main loop round so that there's a
    /// chance we become the leader.
    /// **This function yields**
    pub fn campaign_and_yield(&self) -> traft::Result<()> {
        self.raw_operation(|node_impl| node_impl.campaign())?;
        // Even though we don't expect a response, we still should let the
        // main_loop do an iteration. Without rescheduling, the Ready state
        // wouldn't be processed, the Status wouldn't be updated, and some
        // assertions may fail (e.g. in `postjoin()` in `main.rs`).
        fiber::reschedule();
        Ok(())
    }

    /// **This function yields**
    pub fn step_and_yield(&self, msg: raft::Message) {
        self.raw_operation(|node_impl| node_impl.step(msg))
            .map_err(|e| tlog!(Error, "{e}"))
            .ok();
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    }

    /// **This function yields**
    pub fn tick_and_yield(&self, n_times: u32) {
        self.raw_operation(|node_impl| node_impl.tick(n_times));
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    }

    /// **This function yields**
    pub fn timeout_now(&self) {
        let raft_id = self.raft_id();
        self.step_and_yield(raft::Message {
            to: raft_id,
            from: raft_id,
            msg_type: raft::MessageType::MsgTimeoutNow,
            ..Default::default()
        })
    }

    /// Processes the [`rpc::join::Request`] and appends necessary
    /// entries to the raft log (if successful).
    ///
    /// Returns the resulting [`Instance`] when the entry is committed.
    ///
    /// Returns an error if the callee node isn't a raft leader.
    ///
    /// **This function yields**
    pub fn handle_join_request_and_wait(
        &self,
        req: rpc::join::Request,
    ) -> traft::Result<(Box<Instance>, HashSet<Address>)> {
        let (notify_addr, notify_instance, replication_addresses) =
            self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?;
        fiber::block_on(async {
            let (addr, instance) = futures::join!(notify_addr.recv_any(), notify_instance.recv());
            addr?;
            instance.map(|i| (Box::new(i), replication_addresses))
        })
    }

    /// Processes the [`rpc::update_instance::Request`] and appends
    /// [`Op::PersistInstance`] entry to the raft log (if successful).
    ///
    /// Returns `Ok(())` when the entry is committed.
    ///
    /// Returns an error if the callee node isn't a raft leader.
    ///
    /// **This function yields**
    pub fn handle_update_instance_request_and_wait(
        &self,
        req: rpc::update_instance::Request,
    ) -> traft::Result<()> {
        let notify =
            self.raw_operation(|node_impl| node_impl.process_update_instance_request_async(req))?;
        fiber::block_on(notify.recv_any())?;
        Ok(())
    }

    /// Only the conf_change_loop on a leader is eligible to call this function.
    ///
    /// **This function yields**
    pub(crate) fn propose_conf_change_and_wait(
        &self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
    ) -> traft::Result<()> {
        let notify =
            self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
        fiber::block_on(notify).unwrap()?;
        Ok(())
    }

    /// Attempt to transfer leadership to a given node and yield.
    ///
    /// **This function yields**
    pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
        self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id));
        fiber::reschedule();
    }

    /// This function **may yield** if `self.node_impl` mutex is acquired.
    #[inline]
    #[track_caller]
    fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
        let mut node_impl = self.node_impl.lock();
        let res = f(&mut node_impl);
        drop(node_impl);
        self.main_loop.wakeup();
        res
    }

    #[inline]
    pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
        self.raft_storage.all_traft_entries()
    }

    /// Returns a watch which will be notified when a clusterwide space is
    /// modified via the specified `index`.
    ///
    /// You can also pass a [`ClusterwideSpace`] in which case the space's
    /// primary index will be used.
    #[inline(always)]
    pub fn storage_watcher(&self, space: impl Into<SpaceId>) -> watch::Receiver<()> {
        use std::collections::hash_map::Entry;
        let mut watchers = self.watchers.lock();
        match watchers.entry(space.into()) {
            Entry::Vacant(entry) => {
                let (tx, rx) = watch::channel(());
                entry.insert(tx);
                rx
            }
            Entry::Occupied(entry) => entry.get().subscribe(),
        }
    }
}

pub(crate) struct NodeImpl {
    pub raw_node: RawNode,
    pub notifications: HashMap<LogicalClock, Notifier>,
    topology_cache: KVCell<RaftTerm, Topology>,
    joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
    storage: Clusterwide,
    raft_storage: RaftSpaceAccess,
    pool: ConnectionPool,
    lc: LogicalClock,
    status: watch::Sender<Status>,
}

impl NodeImpl {
    fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> {
        let box_err = |e| StorageError::Other(Box::new(e));

        let raft_id: RaftId = raft_storage.raft_id().map_err(box_err)?.unwrap();
        let applied: RaftIndex = raft_storage.applied().map_err(box_err)?.unwrap_or(0);
        let lc = {
            let gen = raft_storage.gen().unwrap().unwrap_or(0) + 1;
            raft_storage.persist_gen(gen).unwrap();
            LogicalClock::new(raft_id, gen)
        };

        let pool = ConnectionPool::builder(storage.clone())
            .handler_name(stringify_cfunc!(proc_raft_interact))
            .call_timeout(MainLoop::TICK.saturating_mul(4))
            .build();

        let cfg = raft::Config {
            id: raft_id,
            applied,
            pre_vote: true,
            ..Default::default()
        };

        let raw_node = RawNode::new(&cfg, raft_storage.clone(), &tlog::root())?;

        let (status, _) = watch::channel(Status {
            id: raft_id,
            leader_id: None,
            term: traft::INIT_RAFT_TERM,
            raft_state: RaftState::Follower,
        });

        Ok(Self {
            raw_node,
            notifications: Default::default(),
            topology_cache: KVCell::new(),
            joint_state_latch: KVCell::new(),
            storage,
            raft_storage,
            pool,
            lc,
            status,
        })
    }

    fn raft_id(&self) -> RaftId {
        self.raw_node.raft.id
    }

    /// Provides mutable access to the Topology struct which reflects
    /// uncommitted state of the cluster. Ensures the node is a leader.
    /// In case it's not — returns an error.
    ///
    /// It's important to access topology through this function so that
    /// new changes are consistent with uncommitted ones.
    fn topology_mut(&mut self) -> Result<&mut Topology, Error> {
        if self.raw_node.raft.state != RaftStateRole::Leader {
            self.topology_cache.take(); // invalidate the cache
            return Err(Error::NotALeader);
        }

        let current_term = self.raw_node.raft.term;

        let topology: Topology = unwrap_some_or! {
            self.topology_cache.take_or_drop(&current_term),
            {
                let mut instances = vec![];
                for instance @ Instance { raft_id, .. } in self.storage.instances.iter()? {
                    instances.push((instance, self.storage.peer_addresses.try_get(raft_id)?))
                }
                let replication_factor = self
                    .storage
                    .properties
                    .get(PropertyName::ReplicationFactor)?
                    .ok_or_else(|| Error::other("missing replication_factor value in storage"))?;
                Topology::new(instances).with_replication_factor(replication_factor)
            }
        };

        Ok(self.topology_cache.insert(current_term, topology))
    }

    pub fn read_state_async(&mut self) -> Result<Notify, RaftError> {
        // In some states `raft-rs` ignores the ReadIndex request.
        // Check it preliminary, don't wait for the timeout.
        //
        // See for details:
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
        let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
        let term_just_started = // ...
            self.raw_node.raft.state == RaftStateRole::Leader
            && !self.raw_node.raft.commit_to_current_term();

        if leader_doesnt_exist || term_just_started {
            return Err(RaftError::ProposalDropped);
        }

        let (lc, notify) = self.schedule_notification();
        // read_index puts this context into an Entry,
        // so we've got to compose full EntryContext,
        // despite single LogicalClock would be enough
        let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
        self.raw_node.read_index(ctx.to_bytes());
        Ok(notify)
    }

    /// **Doesn't yield**
    #[inline]
    // TODO: rename and document
    pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
    where
        T: Into<Op>,
    {
        let (lc, notify) = self.schedule_notification();
        let ctx = traft::EntryContextNormal::new(lc, op.into());
        self.raw_node.propose(ctx.to_bytes(), vec![])?;
        Ok(notify)
    }

    /// Proposes a raft entry to be appended to the log and returns raft index
    /// at which it is expected to be committed unless it gets rejected.
    ///
    /// **Doesn't yield**
    pub fn propose(&mut self, op: Op) -> Result<RaftIndex, RaftError> {
        self.lc.inc();
        let ctx = traft::EntryContextNormal::new(self.lc, op);
        self.raw_node.propose(ctx.to_bytes(), vec![])?;
        let index = self.raw_node.raft.raft_log.last_index();
        Ok(index)
    }

    pub fn campaign(&mut self) -> Result<(), RaftError> {
        self.raw_node.campaign()
    }

    pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
        if msg.to != self.raft_id() {
            return Ok(());
        }

        // TODO check it's not a MsgPropose with op::PersistInstance.
        // TODO check it's not a MsgPropose with ConfChange.
        self.raw_node.step(msg)
    }

    pub fn tick(&mut self, n_times: u32) {
        for _ in 0..n_times {
            self.raw_node.tick();
        }
    }

    /// Processes the [`rpc::join::Request`] and appends necessary
    /// entries to the raft log (if successful).
    ///
    /// Returns an error if the callee node isn't a Raft leader.
    ///
    /// **This function doesn't yield**
    pub fn process_join_request_async(
        &mut self,
        req: rpc::join::Request,
    ) -> traft::Result<(Notify, Notify, HashSet<Address>)> {
        let topology = self.topology_mut()?;
        let (instance, address, replication_addresses) = topology
            .join(
                req.instance_id,
                req.replicaset_id,
                req.advertise_address,
                req.failure_domain,
            )
            .map_err(RaftError::ConfChangeError)?;
        let peer_address = traft::PeerAddress {
            raft_id: instance.raft_id,
            address,
        };
        let op_addr = Dml::replace(ClusterwideSpaceId::Address, &peer_address).expect("can't fail");
        let op_instance = PersistInstance::new(instance);
        // Important! Calling `raw_node.propose()` may result in
        // `ProposalDropped` error, but the topology has already been
        // modified. The correct handling of this case should be the
        // following.
        //
        // The `topology_cache` should be preserved. It won't be fully
        // consistent anymore, but that's bearable. (TODO: examine how
        // the particular requests are handled). At least it doesn't
        // much differ from the case of overriding the entry due to a
        // re-election.
        //
        // On the other hand, dropping topology_cache may be much more
        // harmful. Loss of the uncommitted entries could result in
        // assigning the same `raft_id` to a two different nodes.
        Ok((
            self.propose_async(op_addr)?,
            self.propose_async(op_instance)?,
            replication_addresses,
        ))
    }

    /// Processes the [`rpc::update_instance::Request`] and appends
    /// [`Op::PersistInstance`] entry to the raft log (if successful).
    ///
    /// Returns an error if the callee node isn't a Raft leader.
    ///
    /// **This function doesn't yield**
    pub fn process_update_instance_request_async(
        &mut self,
        req: rpc::update_instance::Request,
    ) -> traft::Result<Notify> {
        let topology = self.topology_mut()?;
        let instance = topology
            .update_instance(req)
            .map_err(RaftError::ConfChangeError)?;
        // Important! Calling `raw_node.propose()` may result in
        // `ProposalDropped` error, but the topology has already been
        // modified. The correct handling of this case should be the
        // following.
        //
        // The `topology_cache` should be preserved. It won't be fully
        // consistent anymore, but that's bearable. (TODO: examine how
        // the particular requests are handled). At least it doesn't
        // much differ from the case of overriding the entry due to a
        // re-election.
        //
        // On the other hand, dropping topology_cache may be much more
        // harmful. Loss of the uncommitted entries could result in
        // assigning the same `raft_id` to a two different nodes.
        //
        Ok(self.propose_async(PersistInstance::new(instance))?)
    }

    fn propose_conf_change_async(
        &mut self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
    ) -> Result<oneshot::Receiver<Result<(), RaftError>>, RaftError> {
        // In some states proposing a ConfChange is impossible.
        // Check if there's a reason to reject it.

        // Checking leadership is only needed for the
        // correct latch management. It doesn't affect
        // raft correctness. Checking the instance is a
        // leader makes sure the proposed `ConfChange`
        // is appended to the raft log immediately
        // instead of sending `MsgPropose` over the
        // network.
        if self.raw_node.raft.state != RaftStateRole::Leader {
            return Err(RaftError::ConfChangeError("not a leader".into()));
        }

        if term != self.raw_node.raft.term {
            return Err(RaftError::ConfChangeError("raft term mismatch".into()));
        }

        // Without this check the node would silently ignore the conf change.
        // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
        if self.raw_node.raft.has_pending_conf() {
            return Err(RaftError::ConfChangeError(
                "already has pending confchange".into(),
            ));
        }

        let prev_index = self.raw_node.raft.raft_log.last_index();
        self.raw_node.propose_conf_change(vec![], conf_change)?;

        // Ensure the ConfChange was actually appended to the log.
        // Otherwise it's a problem: current instance isn't actually a
        // leader (which is impossible in theory, but we're not sure in
        // practice) and sent the message to the raft network. It may
        // lead to an inconsistency.
        let last_index = self.raw_node.raft.raft_log.last_index();
        assert_eq!(last_index, prev_index + 1);

        if !self.joint_state_latch.is_empty() {
            warn_or_panic!("joint state latch is locked");
        }

        let (tx, rx) = oneshot::channel();
        self.joint_state_latch.insert(last_index, tx);
        event::broadcast(Event::JointStateEnter);

        Ok(rx)
    }

    /// Is called during a transaction
    ///
    /// Returns `true` if wait_lsn is needed in `advance`.
    fn handle_committed_entries(
        &mut self,
        entries: &[raft::Entry],
        wake_governor: &mut bool,
        expelled: &mut bool,
        storage_changes: &mut StorageChanges,
    ) -> traft::Result<()> {
        let mut entries = entries.iter().peekable();

        while let Some(&entry) = entries.peek() {
            let entry = match traft::Entry::try_from(entry) {
                Ok(v) => v,
                Err(e) => {
                    tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
                    continue;
                }
            };

            let mut wait_lsn = false;
            start_transaction(|| -> tarantool::Result<()> {
                let entry_index = entry.index;
                match entry.entry_type {
                    raft::EntryType::EntryNormal => {
                        wait_lsn = self.handle_committed_normal_entry(
                            entry,
                            wake_governor,
                            expelled,
                            storage_changes,
                        );
                        if wait_lsn {
                            return Ok(());
                        }
                    }
                    raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
                        self.handle_committed_conf_change(entry)
                    }
                }

                let res = self.raft_storage.persist_applied(entry_index);
                event::broadcast(Event::EntryApplied);
                if let Err(e) = res {
                    tlog!(
                        Error,
                        "error persisting applied index: {e}";
                        "index" => entry_index
                    );
                }

                Ok(())
            })?;

            if wait_lsn {
                // TODO: this shouldn't ever happen for a raft leader,
                // but what if it does?
                // TODO: What if about we get elected leader after wait_lsn?
                if let Err(e) = self.wait_lsn() {
                    let timeout = MainLoop::TICK;
                    tlog!(
                        Warning,
                        "failed syncing with replication master: {e}, retrying in {:?}...",
                        timeout
                    );
                    fiber::sleep(timeout);
                }
                continue;
            }

            // Actually advance the iterator.
            let _ = entries.next();
        }

        Ok(())
    }

    /// Is called during a transaction
    ///
    /// Returns `true` if wait_lsn is needed in `advance`.
    fn handle_committed_normal_entry(
        &mut self,
        entry: traft::Entry,
        wake_governor: &mut bool,
        expelled: &mut bool,
        storage_changes: &mut StorageChanges,
    ) -> bool {
        assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
        let lc = entry.lc();
        let index = entry.index;
        let op = entry.into_op().unwrap_or(Op::Nop);
        tlog!(Debug, "applying entry: {op}"; "index" => index);

        match &op {
            Op::PersistInstance(PersistInstance(instance)) => {
                *wake_governor = true;
                storage_changes.insert(ClusterwideSpaceId::Instance.into());
                if has_grades!(instance, Expelled -> *) && instance.raft_id == self.raft_id() {
                    // cannot exit during a transaction
                    *expelled = true;
                }
            }
            Op::Dml(op) => {
                let space = op.space();
                if space == ClusterwideSpaceId::Property as SpaceId
                    || space == ClusterwideSpaceId::Replicaset as SpaceId
                {
                    *wake_governor = true;
                }
                storage_changes.insert(space);
            }
            Op::DdlPrepare { .. } => {
                *wake_governor = true;
            }
            _ => {}
        }

        let storage_properties = &self.storage.properties;

        // apply the operation
        let mut result = Box::new(()) as Box<dyn AnyWithTypeName>;
        match op {
            Op::Nop => {}
            Op::PersistInstance(op) => {
                let instance = op.0;
                self.storage.instances.put(&instance).unwrap();
                result = instance as _;
            }
            Op::Dml(op) => {
                let res = match &op {
                    Dml::Insert { space, tuple } => self.storage.insert(*space, tuple).map(Some),
                    Dml::Replace { space, tuple } => self.storage.replace(*space, tuple).map(Some),
                    Dml::Update { space, key, ops } => self.storage.update(*space, key, ops),
                    Dml::Delete { space, key } => self.storage.delete(*space, key),
                };
                result = Box::new(res) as _;
            }
            Op::DdlPrepare {
                ddl,
                schema_version,
            } => {
                self.apply_op_ddl_prepare(ddl, schema_version)
                    .expect("storage error");
            }
            Op::DdlCommit => {
                let current_version_in_tarantool = pico_schema_version().expect("storage error");
                let pending_version = storage_properties
                    .pending_schema_version()
                    .expect("storage error")
                    .expect("granted we don't mess up log compaction, this should not be None");
                // This instance is catching up to the cluster and must sync
                // with replication master on it's own.
                if current_version_in_tarantool < pending_version {
                    let is_master = self.is_replicaset_master().expect("storage_error");
                    if !is_master {
                        return true; // wait_lsn
                    }
                }

                // Update pico metadata.
                let ddl = storage_properties
                    .pending_schema_change()
                    .expect("storage error")
                    .expect("granted we don't mess up log compaction, this should not be None");
                match ddl {
                    Ddl::CreateSpace { id, .. } => {
                        self.storage
                            .spaces
                            .update_operable(id, true)
                            .expect("storage error");
                        self.storage
                            .indexes
                            .update_operable(id, 0, true)
                            .expect("storage error");
                        // For now we just assume that during space creation index with id 1
                        // exists if and only if it is a bucket_id index.
                        let res = self.storage.indexes.update_operable(id, 1, true);
                        // TODO: maybe we should first check if this index
                        // exists or check the space definition if this should
                        // be done, but for now we just ignore the error "no such index"
                        let _ = res;
                    }
                    _ => {
                        todo!()
                    }
                }

                // Update tarantool metadata.
                // This instance is catching up to the cluster and is a
                // replication master, so it must apply the schema change on it's
                // own.
                // FIXME: copy-pasted from above
                if current_version_in_tarantool < pending_version {
                    let is_master = self.is_replicaset_master().expect("storage_error");
                    if is_master {
                        let resp = rpc::ddl_apply::apply_schema_change(
                            &self.storage,
                            &ddl,
                            pending_version,
                        )
                        .expect("storage error");
                        match resp {
                            rpc::ddl_apply::Response::Abort { reason } => {
                                // There's no risk to brick the cluster at this point because
                                // the governor would have made sure the ddl applies on all
                                // active (at the time) instances. This instance is just catching
                                // up to the cluster and this failure will be at startup time.
                                tlog!(Critical, "failed applying already committed ddl operation: {reason}";
                                    "ddl" => ?ddl,
                                );
                                // TODO: add support to mitigate these failures
                                panic!("abort of a committed operation");
                            }
                            rpc::ddl_apply::Response::Ok => {}
                        }
                    }
                }

                storage_properties
                    .delete(PropertyName::PendingSchemaChange)
                    .expect("storage error");
                storage_properties
                    .delete(PropertyName::PendingSchemaVersion)
                    .expect("storage error");
                storage_properties
                    .put(PropertyName::CurrentSchemaVersion, &pending_version)
                    .expect("storage error");
            }
            Op::DdlAbort => {
                let current_version_in_tarantool = pico_schema_version().expect("storage error");
                let pending_version: u64 = storage_properties
                    .pending_schema_version()
                    .expect("storage error")
                    .expect("granted we don't mess up log compaction, this should not be None");
                // This condition means, schema versions must always increase
                // even after an DdlAbort
                if current_version_in_tarantool == pending_version {
                    let is_master = self.is_replicaset_master().expect("storage_error");
                    if !is_master {
                        // wait_lsn
                        return true;
                    }
                }

                // Update pico metadata.
                let ddl = storage_properties
                    .pending_schema_change()
                    .expect("storage error")
                    .expect("granted we don't mess up log compaction, this should not be None");
                match ddl {
                    Ddl::CreateSpace { id, .. } => {
                        self.storage.indexes.delete(id, 0).expect("storage error");
                        self.storage.spaces.delete(id).expect("storage error");
                    }
                    _ => {
                        todo!()
                    }
                }

                // Update tarantool metadata.
                // FIXME: copy-pasted from above
                if current_version_in_tarantool == pending_version {
                    let is_master = self.is_replicaset_master().expect("storage_error");
                    if is_master {
                        let current_version = storage_properties
                            .current_schema_version()
                            .expect("storage error");
                        ddl_abort_on_master(&ddl, current_version).expect("storage error");
                    }
                }

                storage_properties
                    .delete(PropertyName::PendingSchemaChange)
                    .expect("storage error");
                storage_properties
                    .delete(PropertyName::PendingSchemaVersion)
                    .expect("storage error");
            }
        }

        if let Some(lc) = &lc {
            if let Some(notify) = self.notifications.remove(lc) {
                notify.notify_ok_any(result);
            }
        }

        if let Some(notify) = self.joint_state_latch.take_or_keep(&index) {
            // It was expected to be a ConfChange entry, but it's
            // normal. Raft must have overriden it, or there was
            // a re-election.
            let e = RaftError::ConfChangeError("rolled back".into());

            let _ = notify.send(Err(e));
            event::broadcast(Event::JointStateDrop);
        }

        false
    }

    fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> {
        debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });

        match ddl.clone() {
            Ddl::CreateSpace {
                id,
                name,
                mut format,
                mut primary_key,
                distribution,
            } => {
                use ::tarantool::util::NumOrStr::*;

                let mut last_pk_part_index = 0;
                for pk_part in &mut primary_key {
                    let (index, field) = match &pk_part.field {
                        Num(index) => {
                            if *index as usize >= format.len() {
                                // Ddl prepare operations should be verified before being proposed,
                                // so this shouldn't ever happen. But ignoring this is safe anyway,
                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
                                tlog!(
                                    Warning,
                                    "invalid primary key part: field index {index} is out of bound"
                                );
                                continue;
                            }
                            (*index, &format[*index as usize])
                        }
                        Str(name) => {
                            let field_index = format.iter().zip(0..).find(|(f, _)| f.name == *name);
                            let Some((field, index)) = field_index else {
                                // Ddl prepare operations should be verified before being proposed,
                                // so this shouldn't ever happen. But ignoring this is safe anyway,
                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
                                tlog!(Warning, "invalid primary key part: field '{name}' not found");
                                continue;
                            };
                            // We store all index parts as field indexes.
                            pk_part.field = Num(index);
                            (index, field)
                        }
                    };
                    let Some(field_type) =
                        crate::schema::try_space_field_type_to_index_field_type(field.field_type) else
                    {
                        // Ddl prepare operations should be verified before being proposed,
                        // so this shouldn't ever happen. But ignoring this is safe anyway,
                        // because proc_apply_schema_change will catch the error and ddl will be aborted.
                        tlog!(Warning, "invalid primary key part: field type {} cannot be part of an index", field.field_type);
                        continue;
                    };
                    // We overwrite the one provided in the request because
                    // there's no reason for it to be there, we know the type
                    // right here.
                    pk_part.r#type = Some(field_type);
                    pk_part.is_nullable = Some(field.is_nullable);
                    last_pk_part_index = last_pk_part_index.max(index);
                }

                let primary_key_def = IndexDef {
                    id: 0,
                    name: "primary_key".into(),
                    space_id: id,
                    schema_version,
                    parts: primary_key,
                    operable: false,
                    // TODO: support other cases
                    unique: true,
                    local: true,
                };
                let res = self.storage.indexes.insert(&primary_key_def);
                if let Err(e) = res {
                    // Ignore the error for now, let governor deal with it.
                    tlog!(
                        Warning,
                        "failed creating index '{}': {e}",
                        primary_key_def.name
                    );
                }

                match distribution {
                    Distribution::Global => {
                        // Nothing else is needed
                    }
                    Distribution::ShardedByField { .. } => {
                        todo!()
                    }
                    Distribution::ShardedImplicitly { .. } => {
                        // TODO: if primary key is not the first field or
                        // there's some space between key parts, we want
                        // bucket_id to go closer to the beginning of the tuple,
                        // but this will require to update primary key part
                        // indexes, so somebody should do that at some point.
                        let bucket_id_index = last_pk_part_index + 1;
                        format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());

                        let bucket_id_def = IndexDef {
                            id: 1,
                            name: "bucket_id".into(),
                            space_id: id,
                            schema_version,
                            parts: vec![Part::field(bucket_id_index)
                                .field_type(IFT::Unsigned)
                                .is_nullable(false)],
                            operable: false,
                            unique: false,
                            // TODO: support other cases
                            local: true,
                        };
                        let res = self.storage.indexes.insert(&bucket_id_def);
                        if let Err(e) = res {
                            // Ignore the error for now, let governor deal with it.
                            tlog!(
                                Warning,
                                "failed creating index '{}': {e}",
                                bucket_id_def.name
                            );
                        }
                    }
                }

                let space_def = SpaceDef {
                    id,
                    name,
                    distribution,
                    schema_version,
                    format,
                    operable: false,
                };
                let res = self.storage.spaces.insert(&space_def);
                if let Err(e) = res {
                    // Ignore the error for now, let governor deal with it.
                    tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
                }
            }

            Ddl::CreateIndex {
                space_id,
                index_id,
                by_fields,
            } => {
                let _ = (space_id, index_id, by_fields);
                todo!();
            }
            Ddl::DropSpace { id } => {
                let _ = id;
                todo!();
            }
            Ddl::DropIndex { index_id, space_id } => {
                let _ = (index_id, space_id);
                todo!();
            }
        }

        self.storage
            .properties
            .put(PropertyName::PendingSchemaChange, &ddl)?;
        self.storage
            .properties
            .put(PropertyName::PendingSchemaVersion, &schema_version)?;
        self.storage
            .properties
            .put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;

        Ok(())
    }

    /// Is called during a transaction
    fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
        let mut latch_unlock = || {
            if let Some(notify) = self.joint_state_latch.take() {
                let _ = notify.send(Ok(()));
                event::broadcast(Event::JointStateLeave);
            }
        };

        // Beware: a tiny difference in type names (`V2` or not `V2`)
        // makes a significant difference in `entry.data` binary layout and
        // in joint state transitions.

        // `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
        // applied in an instant without entering the joint state.

        let (is_joint, conf_state) = match entry.entry_type {
            raft::EntryType::EntryConfChange => {
                let mut cc = raft::ConfChange::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                latch_unlock();

                (false, self.raw_node.apply_conf_change(&cc).unwrap())
            }
            raft::EntryType::EntryConfChangeV2 => {
                let mut cc = raft::ConfChangeV2::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                // Unlock the latch when either of conditions is met:
                // - conf_change will leave the joint state;
                // - or it will be applied without even entering one.
                let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
                if leave_joint {
                    latch_unlock();
                }

                // ConfChangeTransition::Auto implies that at this
                // moment raft-rs will implicitly propose another empty
                // conf change that represents leaving the joint state.
                (!leave_joint, self.raw_node.apply_conf_change(&cc).unwrap())
            }
            _ => unreachable!(),
        };

        let raft_id = &self.raft_id();
        let voters_old = self.raft_storage.voters().unwrap().unwrap_or_default();
        if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) {
            if is_joint {
                event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok();
            } else {
                event::broadcast(Event::Demoted);
            }
        }

        self.raft_storage.persist_conf_state(&conf_state).unwrap();
    }

    /// Is called during a transaction
    fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
        for rs in read_states {
            if rs.request_ctx.is_empty() {
                continue;
            }
            let ctx = crate::unwrap_ok_or!(
                traft::EntryContextNormal::from_bytes(&rs.request_ctx),
                Err(e) => {
                    tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
                    continue;
                }
            );

            if let Some(notify) = self.notifications.remove(&ctx.lc) {
                notify.notify_ok(rs.index);
            }
        }
    }

    /// Is called during a transaction
    fn handle_messages(&mut self, messages: Vec<raft::Message>) {
        for msg in messages {
            if let Err(e) = self.pool.send(msg) {
                tlog!(Error, "{e}");
            }
        }
    }

    /// Processes a so-called "ready state" of the [`raft::RawNode`].
    ///
    /// This includes:
    /// - Sending messages to other instances (raft nodes);
    /// - Applying committed entries;
    /// - Persisting uncommitted entries;
    /// - Persisting hard state (term, vote, commit);
    /// - Notifying pending fibers;
    ///
    /// See also:
    ///
    /// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
    /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
    ///
    /// This function yields.
    fn advance(
        &mut self,
        wake_governor: &mut bool,
        expelled: &mut bool,
        storage_changes: &mut StorageChanges,
    ) {
        // Get the `Ready` with `RawNode::ready` interface.
        if !self.raw_node.has_ready() {
            return;
        }

        let mut ready: raft::Ready = self.raw_node.ready();

        // Send out messages to the other nodes.
        self.handle_messages(ready.take_messages());

        // This is a snapshot, we need to apply the snapshot at first.
        let snapshot = ready.snapshot();
        if !snapshot.is_empty() {
            if let Err(e) = start_transaction(|| -> traft::Result<()> {
                let meta = snapshot.get_metadata();
                self.raft_storage.handle_snapshot_metadata(meta)?;
                // FIXME: apply_snapshot_data calls truncate on clusterwide
                // spaces and even though they're all local spaces doing
                // truncate on them is not allowed on read_only instances.
                // Related issue in tarantool:
                // https://github.com/tarantool/tarantool/issues/5616
                let is_readonly: bool = crate::tarantool::eval("return box.info.ro")?;
                if is_readonly {
                    crate::tarantool::eval("box.cfg { read_only = false }")?;
                }
                let res = self
                    .storage
                    .apply_snapshot_data(snapshot.get_data(), !is_readonly);
                if is_readonly {
                    crate::tarantool::exec("box.cfg { read_only = true }")?;
                }
                #[allow(clippy::let_unit_value)]
                let _ = res?;

                // TODO: As long as the snapshot was sent to us in response to
                // a rejected MsgAppend (which is the only possible case
                // currently), we will send a MsgAppendResponse back which will
                // automatically reset our status from Snapshot to Replicate.
                // But when we implement support for manual snapshot requests,
                // we will have to also implement sending a MsgSnapStatus,
                // to reset out status explicitly to avoid leader ignoring us
                // indefinitely after that point.
                Ok(())
            }) {
                tlog!(Warning, "dropping raft ready: {ready:#?}");
                panic!("transaction failed: {e}, {}", TarantoolError::last());
            }
        }

        if let Some(ss) = ready.ss() {
            if let Err(e) = self.status.send_modify(|s| {
                s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
                s.raft_state = ss.raft_state.into();
            }) {
                tlog!(Warning, "failed updating node status: {e}";
                    "leader_id" => ss.leader_id,
                    "raft_state" => ?ss.raft_state,
                )
            }
        }

        self.handle_read_states(ready.read_states());

        // Apply committed entries.
        let res = self.handle_committed_entries(
            ready.committed_entries(),
            wake_governor,
            expelled,
            storage_changes,
        );
        if let Err(e) = res {
            tlog!(Warning, "dropping raft ready: {ready:#?}");
            panic!("transaction failed: {e}, {}", TarantoolError::last());
        }

        if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
            // Persist uncommitted entries in the raft log.
            self.raft_storage.persist_entries(ready.entries()).unwrap();

            // Raft HardState changed, and we need to persist it.
            if let Some(hs) = ready.hs() {
                self.raft_storage.persist_hard_state(hs).unwrap();
                if let Err(e) = self.status.send_modify(|s| s.term = hs.term) {
                    tlog!(Warning, "failed updating current term: {e}"; "term" => hs.term)
                }
            }

            Ok(())
        }) {
            tlog!(Warning, "dropping raft ready: {ready:#?}");
            panic!("transaction failed: {e}, {}", TarantoolError::last());
        }

        // This bunch of messages is special. It must be sent only
        // AFTER the HardState, Entries and Snapshot are persisted
        // to the stable storage.
        self.handle_messages(ready.take_persisted_messages());

        // Advance the Raft.
        let mut light_rd = self.raw_node.advance(ready);

        // Send out messages to the other nodes.
        self.handle_messages(light_rd.take_messages());

        // Update commit index.
        if let Some(commit) = light_rd.commit_index() {
            self.raft_storage.persist_commit(commit).unwrap();
        }

        // Apply committed entries.
        let res = self.handle_committed_entries(
            light_rd.committed_entries(),
            wake_governor,
            expelled,
            storage_changes,
        );
        if let Err(e) = res {
            tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
            panic!("transaction failed: {e}, {}", TarantoolError::last());
        }

        // Advance the apply index.
        self.raw_node.advance_apply();
    }

    fn wait_lsn(&mut self) -> traft::Result<()> {
        assert!(self.raw_node.raft.state != RaftStateRole::Leader);
        let my_id = self.raw_node.raft.id;

        let my_instance_info = self.storage.instances.get(&my_id)?;
        let replicaset_id = my_instance_info.replicaset_id;
        let replicaset = self.storage.replicasets.get(&replicaset_id)?;
        let replicaset = replicaset.ok_or_else(|| {
            Error::other(format!("replicaset info for id {replicaset_id} not found"))
        })?;
        if replicaset.master_id == my_instance_info.instance_id {
            return Err(Error::other("wait_lsn called on replicaset master"));
        }
        let master = self.storage.instances.get(&replicaset.master_id)?;
        let master_uuid = master.instance_uuid;

        let resp = fiber::block_on(self.pool.call(&master.raft_id, &rpc::lsn::Request {})?)?;
        let target_lsn = resp.lsn;

        let mut current_lsn = None;

        let replication: HashMap<u64, ReplicationInfo> =
            crate::tarantool::eval("return box.info.replication")?;
        for r in replication.values() {
            if r.uuid != master_uuid {
                continue;
            }
            current_lsn = Some(r.lsn);
            break;
        }

        let current_lsn = unwrap_some_or!(current_lsn, {
            return Err(Error::other(format!(
                "replication info is unavailable for instance with uuid \"{master_uuid}\""
            )));
        });

        if current_lsn < target_lsn {
            tlog!(Info, "blocking raft loop until replication progresses";
                "target_lsn" => target_lsn,
                "current_lsn" => current_lsn,
            );
            fiber::sleep(MainLoop::TICK * 4);
        }

        return Ok(());

        #[derive(tlua::LuaRead)]
        struct ReplicationInfo {
            lsn: u64,
            uuid: String,
        }
    }

    fn is_replicaset_master(&self) -> traft::Result<bool> {
        let my_raft_id = self.raw_node.raft.id;
        let my_instance_info = self.storage.instances.get(&my_raft_id)?;
        let replicaset_id = my_instance_info.replicaset_id;
        let replicaset = self.storage.replicasets.get(&replicaset_id)?;
        let res = if let Some(replicaset) = replicaset {
            my_instance_info.instance_id == replicaset.master_id
        } else {
            // Replicaset wasn't initialized yet, fallback to lua eval
            let is_ro: bool = crate::tarantool::eval("return box.info.ro")?;
            !is_ro
        };
        Ok(res)
    }

    #[inline]
    fn cleanup_notifications(&mut self) {
        self.notifications.retain(|_, notify| !notify.is_closed());
    }

    /// Generates a pair of logical clock and a notification channel.
    /// Logical clock is a unique identifier suitable for tagging
    /// entries in raft log. Notification is broadcasted when the
    /// corresponding entry is committed.
    #[inline]
    fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
        let (tx, rx) = notification();
        let lc = {
            self.lc.inc();
            self.lc
        };
        self.notifications.insert(lc, tx);
        (lc, rx)
    }
}

pub(crate) struct MainLoop {
    _loop: Option<fiber::UnitJoinHandle<'static>>,
    loop_waker: watch::Sender<()>,
    stop_flag: Rc<Cell<bool>>,
}

struct MainLoopArgs {
    node_impl: Rc<Mutex<NodeImpl>>,
}

struct MainLoopState {
    next_tick: Instant,
    loop_waker: watch::Receiver<()>,
    stop_flag: Rc<Cell<bool>>,
    watchers: Rc<Mutex<StorageWatchers>>,
}

impl MainLoop {
    pub const TICK: Duration = Duration::from_millis(100);

    fn start(node_impl: Rc<Mutex<NodeImpl>>, watchers: Rc<Mutex<StorageWatchers>>) -> Self {
        let (loop_waker_tx, loop_waker_rx) = watch::channel(());
        let stop_flag: Rc<Cell<bool>> = Default::default();

        let args = MainLoopArgs { node_impl };
        let initial_state = MainLoopState {
            next_tick: Instant::now(),
            loop_waker: loop_waker_rx,
            stop_flag: stop_flag.clone(),
            watchers,
        };

        Self {
            // implicit yield
            _loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state),
            loop_waker: loop_waker_tx,
            stop_flag,
        }
    }

    pub fn wakeup(&self) {
        let _ = self.loop_waker.send(());
    }

    async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl {
        let _ = state.loop_waker.changed().timeout(Self::TICK).await;
        if state.stop_flag.take() {
            return FlowControl::Break;
        }

        let mut node_impl = args.node_impl.lock(); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }

        node_impl.cleanup_notifications();

        let now = Instant::now();
        if now > state.next_tick {
            state.next_tick = instant_saturating_add(now, Self::TICK);
            node_impl.raw_node.tick();
        }

        let mut wake_governor = false;
        let mut expelled = false;
        let mut storage_changes = StorageChanges::new();
        node_impl.advance(&mut wake_governor, &mut expelled, &mut storage_changes); // yields
        drop(node_impl);
        if state.stop_flag.take() {
            return FlowControl::Break;
        }

        {
            // node_impl lock must be dropped before this to avoid deadlocking
            let mut watchers = state.watchers.lock();
            for index in storage_changes {
                if let Some(tx) = watchers.get(&index) {
                    let res = tx.send(());
                    if res.is_err() {
                        watchers.remove(&index);
                    }
                }
            }
        }

        if expelled {
            crate::tarantool::exit(0);
        }

        if wake_governor {
            if let Err(e) = async { global()?.governor_loop.wakeup() }.await {
                tlog!(Warning, "failed waking up governor: {e}");
            }
        }

        FlowControl::Continue
    }
}

impl Drop for MainLoop {
    fn drop(&mut self) {
        self.stop_flag.set(true);
        let _ = self.loop_waker.send(());
        self._loop.take().unwrap().join(); // yields
    }
}

static mut RAFT_NODE: Option<Box<Node>> = None;

pub fn set_global(node: Node) {
    unsafe {
        assert!(
            RAFT_NODE.is_none(),
            "discovery::set_global() called twice, it's a leak"
        );
        RAFT_NODE = Some(Box::new(node));
    }
}

pub fn global() -> traft::Result<&'static Node> {
    // Uninitialized raft node is a regular case. This case may take
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

#[proc(packed_args)]
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
    let node = global()?;
    for pb in pbs {
        node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);
    }
    Ok(())
}