Skip to content
Snippets Groups Projects
node.rs 54.6 KiB
Newer Older
//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.

use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::TransactionError;
use ::tarantool::fiber;
use ::tarantool::fiber::{Cond, Mutex};
use ::tarantool::proc;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use std::borrow::Cow;
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
use std::cell::Cell;
use std::collections::HashMap;
use std::collections::HashSet;
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
use tarantool::space::UpdateOps;
use crate::kvcell::KVCell;
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
use crate::r#loop::{FlowControl, Loop};
use crate::traft::governor::raft_conf_change;
use crate::traft::storage::ClusterSpace;
use crate::traft::ContextCoercion as _;
use crate::traft::OpDML;
use crate::traft::Peer;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftTerm;
use crate::warn_or_panic;
use ::tarantool::util::IntoClones as _;
use protobuf::Message as _;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::notify::Notify;
use crate::traft::rpc::sharding::cfg::ReplicasetWeights;
use crate::traft::rpc::{replication, sharding, sync};
use crate::traft::storage::StateKey;
use crate::traft::ConnectionPool;
use crate::traft::LogicalClock;
use crate::traft::Topology;
use crate::traft::TopologyRequest;
use crate::traft::{JoinRequest, JoinResponse, UpdatePeerRequest};
use crate::traft::{RaftSpaceAccess, Storage};
use super::OpResult;
use super::{CurrentGrade, TargetGrade};
type RawNode = raft::RawNode<RaftSpaceAccess>;
// The raft state of a node, as exposed by this module (see `Status::raft_state`).
//
// The enum is declared through the project's `define_str_enum!` macro: every
// variant is written next to the string literal associated with it, and the
// trailing line names `UnknownRaftState` as the `FromStr::Err` type.
// NOTE(review): the exact set of impls generated by the macro is defined
// elsewhere — confirm there before relying on anything beyond what is
// spelled out in this invocation.
crate::define_str_enum! {
    pub enum RaftState {
        Follower = "Follower",
        Candidate = "Candidate",
        Leader = "Leader",
        PreCandidate = "PreCandidate",
    }
    FromStr::Err = UnknownRaftState;
}
/// Error carrying a raft state name that isn't recognized.
///
/// It is named as the `FromStr::Err` type in the `RaftState` declaration.
/// The wrapped `String` is the offending name; the error is displayed as
/// `unknown raft state <name>`.
#[derive(thiserror::Error, Debug)]
#[error("unknown raft state {0}")]
pub struct UnknownRaftState(pub String);

impl RaftState {
    /// Returns `true` only when the state is [`RaftState::Leader`].
    pub fn is_leader(&self) -> bool {
        // Every variant is listed explicitly so that adding a new state
        // forces this function to be revisited.
        match self {
            Self::Leader => true,
            Self::Follower | Self::Candidate | Self::PreCandidate => false,
        }
    }
}

impl From<RaftStateRole> for RaftState {
    fn from(role: RaftStateRole) -> Self {
        match role {
            RaftStateRole::Follower => Self::Follower,
            RaftStateRole::Candidate => Self::Candidate,
            RaftStateRole::Leader => Self::Leader,
            RaftStateRole::PreCandidate => Self::PreCandidate,
        }
    }
}

#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    /// `raft_id` of the leader instance
    pub leader_id: Option<RaftId>,
    /// Current term number
    pub term: RaftTerm,
    /// Current raft state
    pub raft_state: RaftState,
    pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
        if requested_term != self.term {
            return Err(Error::TermMismatch {
                requested: requested_term,
                current: self.term,
            });
        }
        Ok(())
    }
}

/// The heart of `traft` module - the Node.
pub struct Node {
    /// RaftId of the Node.
    //
    // It appears twice in the Node: here and in `status.id`.
    // This is a conscious decision.
    // `self.raft_id()` is used in Rust API, and
    // `self.status()` is mostly useful in Lua API.
    raft_id: RaftId,

    node_impl: Rc<Mutex<NodeImpl>>,
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    main_loop: MainLoop,
    _conf_change_loop: fiber::UnitJoinHandle<'static>,
    status: Rc<Cell<Status>>,
impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Node")
            .field("raft_id", &self.raft_id)
            .finish_non_exhaustive()
    }
    /// Initialize the raft node.
    /// **This function yields**
    pub fn new(storage: Storage) -> Result<Self, RaftError> {
        let node_impl = NodeImpl::new(storage.clone())?;
        let raft_id = node_impl.raft_id();
        let status = Rc::new(Cell::new(Status {
            leader_id: None,
            term: traft::INIT_RAFT_TERM,
            raft_state: RaftState::Follower,
        }));
        let node_impl = Rc::new(Mutex::new(node_impl));
        let conf_change_loop_fn = {
            let status = status.clone();
            move || raft_conf_change_loop(status, storage)
        };

        let node = Node {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            main_loop: MainLoop::start(status.clone(), node_impl.clone()), // yields
            _conf_change_loop: fiber::Builder::new()
                .name("raft_conf_change_loop")
                .proc(conf_change_loop_fn)
                .start()
                .unwrap(),
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            node_impl,
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            status,
        };

        // Wait for the node to enter the main loop
        Ok(node)
    pub fn raft_id(&self) -> RaftId {
    pub fn status(&self) -> Status {
        self.status.get()
    /// Wait for the status to be changed.
    /// **This function yields**
    pub fn wait_status(&self) {
        event::wait(Event::StatusChanged).expect("Events system wasn't initialized");
    /// **This function yields**
    pub fn wait_for_read_state(&self, timeout: Duration) -> traft::Result<RaftIndex> {
        let notify = self.raw_operation(|node_impl| node_impl.read_state_async())?;
        notify.recv_timeout::<RaftIndex>(timeout)
    /// Propose an operation and wait for its result.
    /// **This function yields**
    pub fn propose_and_wait<T: OpResult + Into<traft::Op>>(
        &self,
        op: T,
        timeout: Duration,
        let notify = self.raw_operation(|node_impl| node_impl.propose_async(op))?;
        notify.recv_timeout::<T::Result>(timeout)
    /// Become a candidate and wait for a main loop round so that there's a
    /// chance we become the leader.
    /// **This function yields**
    pub fn campaign_and_yield(&self) -> traft::Result<()> {
        self.raw_operation(|node_impl| node_impl.campaign())?;
        // Even though we don't expect a response, we still should let the
        // main_loop do an iteration. Without rescheduling, the Ready state
        // wouldn't be processed, the Status wouldn't be updated, and some
        // assertions may fail (e.g. in `postjoin()` in `main.rs`).
        fiber::reschedule();
        Ok(())
    /// **This function yields**
    pub fn step_and_yield(&self, msg: raft::Message) {
        self.raw_operation(|node_impl| node_impl.step(msg))
            .map_err(|e| tlog!(Error, "{e}"))
            .ok();
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    /// **This function yields**
    pub fn tick_and_yield(&self, n_times: u32) {
        self.raw_operation(|node_impl| node_impl.tick(n_times));
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    /// **This function yields**
    pub fn timeout_now(&self) {
        let raft_id = self.raft_id();
        self.step_and_yield(raft::Message {
            to: raft_id,
            from: raft_id,
            msg_type: raft::MessageType::MsgTimeoutNow,
            ..Default::default()
        })
    /// Processes the topology request and appends [`Op::PersistPeer`]
    /// entry to the raft log (if successful).
    /// Returns the resulting peer when the entry is committed.
    ///
    /// Returns an error if the callee node isn't a raft leader.
    /// **This function yields**
    pub fn handle_topology_request_and_wait(
        &self,
        req: TopologyRequest,
    ) -> traft::Result<Box<traft::Peer>> {
            self.raw_operation(|node_impl| node_impl.process_topology_request_async(req))?;
    /// Only the conf_change_loop on a leader is eligible to call this function.
    ///
    /// **This function yields**
        &self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
        let notify =
            self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
    /// This function **may yield** if `self.node_impl` mutex is acquired.
    fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
        let mut node_impl = self.node_impl.lock();
        let res = f(&mut node_impl);
        drop(node_impl);
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        self.main_loop.wakeup();
    #[inline]
    pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
        self.storage.raft.all_traft_entries()
struct NodeImpl {
    pub notifications: HashMap<LogicalClock, Notify>,
    topology_cache: KVCell<RaftTerm, Topology>,
    joint_state_latch: KVCell<RaftIndex, Notify>,
    pool: ConnectionPool,
impl NodeImpl {
    fn new(storage: Storage) -> Result<Self, RaftError> {
        let box_err = |e| StorageError::Other(Box::new(e));

        let raft_id: RaftId = storage.raft.raft_id().map_err(box_err)?.unwrap();
        let applied: RaftIndex = storage.raft.applied().map_err(box_err)?.unwrap_or(0);
            let gen = storage.raft.gen().unwrap().unwrap_or(0) + 1;
            storage.raft.persist_gen(gen).unwrap();
            LogicalClock::new(raft_id, gen)
        };

        let pool = ConnectionPool::builder(storage.peers.clone())
            .handler_name(stringify_cfunc!(raft_interact))
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            .call_timeout(MainLoop::TICK * 4)
            .connect_timeout(MainLoop::TICK * 4)
            .inactivity_timeout(Duration::from_secs(60))
            .build();

        let cfg = raft::Config {
            id: raft_id,
            applied,
            pre_vote: true,
            ..Default::default()
        };

        let raw_node = RawNode::new(&cfg, storage.raft.clone(), &tlog::root())?;

        Ok(Self {
            raw_node,
            notifications: Default::default(),
            topology_cache: KVCell::new(),
            joint_state_latch: KVCell::new(),
        })
    }

    /// Returns the raft id of this node as stored in the underlying raw node
    /// (`raw_node.raft.id`).
    fn raft_id(&self) -> RaftId {
        self.raw_node.raft.id
    }

    /// Provides mutable access to the Topology struct which reflects
    /// uncommitted state of the cluster. Ensures the node is a leader.
    /// In case it's not — returns an error.
    ///
    /// It's important to access topology through this function so that
    /// new changes are consistent with uncommitted ones.
    fn topology_mut(&mut self) -> Result<&mut Topology, Error> {
        if self.raw_node.raft.state != RaftStateRole::Leader {
            self.topology_cache.take(); // invalidate the cache
        }

        let current_term = self.raw_node.raft.term;

        let topology: Topology = unwrap_some_or! {
            self.topology_cache.take_or_drop(&current_term),
            {
                let peers = self.storage.peers.all_peers()?;
                let replication_factor = self.storage.state.get(StateKey::ReplicationFactor)?.unwrap();
                Topology::from_peers(peers).with_replication_factor(replication_factor)
            }
        };

        Ok(self.topology_cache.insert(current_term, topology))
    }

    /// Issues a `read_index` request to the raw node and returns the
    /// [`Notify`] registered for it (see `schedule_notification`).
    ///
    /// Returns `RaftError::ProposalDropped` right away when the request is
    /// known to be ignored by `raft-rs`: there's no leader, or this node is a
    /// leader that hasn't committed to its current term yet.
    pub fn read_state_async(&mut self) -> Result<Notify, RaftError> {
        // In some states `raft-rs` ignores the ReadIndex request.
        // Detect them up front instead of waiting for the timeout.
        //
        // See for details:
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
        let raft = &self.raw_node.raft;

        // The leader doesn't exist.
        if raft.leader_id == INVALID_ID {
            return Err(RaftError::ProposalDropped);
        }

        // The term has just started.
        let is_leader = raft.state == RaftStateRole::Leader;
        if is_leader && !raft.commit_to_current_term() {
            return Err(RaftError::ProposalDropped);
        }

        let (lc, notify) = self.schedule_notification();
        // `read_index` puts this context into an Entry, so the full
        // EntryContext has to be composed even though a single
        // LogicalClock would be enough.
        let context = traft::EntryContextNormal::new(lc, Op::Nop);
        self.raw_node.read_index(context.to_bytes());
        Ok(notify)
    }

    /// Proposes `op` to the raw node, tagged with a freshly scheduled logical
    /// clock, and returns the [`Notify`] registered for that clock.
    ///
    /// Any error from `raw_node.propose` is returned to the caller.
    pub fn propose_async<T: Into<traft::Op>>(&mut self, op: T) -> Result<Notify, RaftError> {
        let (lc, notify) = self.schedule_notification();
        // The operation travels in the entry context; the entry data is empty.
        let context = traft::EntryContextNormal::new(lc, op).to_bytes();
        let data = Vec::new();
        self.raw_node.propose(context, data)?;
        Ok(notify)
    }

    /// Delegates to `raw_node.campaign()` and returns its result unchanged.
    pub fn campaign(&mut self) -> Result<(), RaftError> {
        self.raw_node.campaign()
    }

    /// Feeds `msg` into the raw node.
    ///
    /// A message whose `to` differs from this node's raft id is ignored and
    /// reported as `Ok(())`.
    pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
        let addressed_to_us = msg.to == self.raft_id();
        if addressed_to_us {
            // TODO check it's not a MsgPropose with op::PersistPeer.
            // TODO check it's not a MsgPropose with ConfChange.
            self.raw_node.step(msg)
        } else {
            Ok(())
        }
    }

    /// Calls `raw_node.tick()` exactly `n_times` times, discarding its result.
    pub fn tick(&mut self, n_times: u32) {
        (0..n_times).for_each(|_| {
            self.raw_node.tick();
        });
    }

    /// Processes the topology request and appends [`Op::PersistPeer`]
    /// entry to the raft log (if successful).
    ///
    /// Returns an error if the callee node isn't a Raft leader.
    ///
    /// **This function yields**
    pub fn process_topology_request_async(
        &mut self,
        req: TopologyRequest,
        let topology = self.topology_mut()?;
        let peer_result = match req {
            TopologyRequest::Join(JoinRequest {
                instance_id,
                replicaset_id,
                advertise_address,
                failure_domain,
                ..
            }) => topology.join(
                instance_id,
                replicaset_id,
                advertise_address,
                failure_domain,
            ),
            TopologyRequest::UpdatePeer(req) => topology.update_peer(req),
        };

        let mut peer = peer_result.map_err(RaftError::ConfChangeError)?;
        peer.commit_index = self.raw_node.raft.raft_log.last_index() + 1;

        let (lc, notify) = self.schedule_notification();
        let ctx = traft::EntryContextNormal::new(lc, Op::persist_peer(peer));

        // Important! Calling `raw_node.propose()` may result in
        // `ProposalDropped` error, but the topology has already been
        // modified. The correct handling of this case should be the
        // following.
        //
        // The `topology_cache` should be preserved. It won't be fully
        // consistent anymore, but that's bearable. (TODO: examine how
        // the particular requests are handled). At least it doesn't
        // much differ from the case of overriding the entry due to a
        // re-election.
        //
        // On the other hand, dropping topology_cache may be much more
        // harmful. Loss of the uncommitted entries could result in
        // assigning the same `raft_id` to a two different nodes.
        //
        self.raw_node.propose(ctx.to_bytes(), vec![])?;
        Ok(notify)
    }

    fn propose_conf_change_async(
        &mut self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
    ) -> Result<Notify, RaftError> {
        // In some states proposing a ConfChange is impossible.
        // Check if there's a reason to reject it.

        // Checking leadership is only needed for the
        // correct latch management. It doesn't affect
        // raft correctness. Checking the instance is a
        // leader makes sure the proposed `ConfChange`
        // is appended to the raft log immediately
        // instead of sending `MsgPropose` over the
        // network.
        if self.raw_node.raft.state != RaftStateRole::Leader {
            return Err(RaftError::ConfChangeError("not a leader".into()));
        }
        if term != self.raw_node.raft.term {
            return Err(RaftError::ConfChangeError("raft term mismatch".into()));
        }
        // Without this check the node would silently ignore the conf change.
        // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
        if self.raw_node.raft.has_pending_conf() {
            return Err(RaftError::ConfChangeError(
                "already has pending confchange".into(),
            ));
        }

        let prev_index = self.raw_node.raft.raft_log.last_index();
        self.raw_node.propose_conf_change(vec![], conf_change)?;

        // Ensure the ConfChange was actually appended to the log.
        // Otherwise it's a problem: current instance isn't actually a
        // leader (which is impossible in theory, but we're not sure in
        // practice) and sent the message to the raft network. It may
        // lead to an inconsistency.
        let last_index = self.raw_node.raft.raft_log.last_index();
        assert_eq!(last_index, prev_index + 1);

        if !self.joint_state_latch.is_empty() {
            warn_or_panic!("joint state latch is locked");
        }

        let (rx, tx) = Notify::new().into_clones();
        self.joint_state_latch.insert(last_index, tx);
        event::broadcast(Event::JointStateEnter);

    /// Is called during a transaction
    fn handle_committed_entries(
        &mut self,
        entries: &[raft::Entry],
        topology_changed: &mut bool,
        expelled: &mut bool,
    ) {
        for entry in entries {
            let entry = match traft::Entry::try_from(entry) {
                Ok(v) => v,
                Err(e) => {
                    tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
                    continue;
                }
            };

            match entry.entry_type {
                raft::EntryType::EntryNormal => {
                    self.handle_committed_normal_entry(entry, topology_changed, expelled)
                }
                raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
                    self.handle_committed_conf_change(entry)
                }
            }
        }

        if let Some(last_entry) = entries.last() {
            if let Err(e) = self.storage.raft.persist_applied(last_entry.index) {
                tlog!(
                    Error,
                    "error persisting applied index: {e}";
                    "index" => last_entry.index
                );
            } else {
                event::broadcast(Event::RaftEntryApplied);
            }
        }
    }

    /// Applies a single committed `EntryNormal`. Is called during a transaction.
    ///
    /// Steps, in order:
    /// 1. run `on_commit` of the entry's operation (`Op::Nop` if there's none);
    /// 2. deliver the result to the notification registered under the entry's
    ///    logical clock, if any;
    /// 3. for `Op::PersistPeer` set `*topology_changed`, and set `*expelled`
    ///    when the peer is this very node and its current grade is `Expelled`;
    /// 4. fail the joint state latch if it was waiting for this entry's index.
    ///
    /// # Panics
    ///
    /// Panics if the entry type isn't `EntryNormal`.
    fn handle_committed_normal_entry(
        &mut self,
        entry: traft::Entry,
        topology_changed: &mut bool,
        expelled: &mut bool,
    ) {
        assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);

        let result = entry
            .op()
            .unwrap_or(&traft::Op::Nop)
            .on_commit(&self.storage.peers);

        let pending = match entry.lc() {
            Some(lc) => self.notifications.remove(lc),
            None => None,
        };
        if let Some(notify) = pending {
            notify.notify_ok_any(result);
        }

        if let Some(traft::Op::PersistPeer { peer }) = entry.op() {
            *topology_changed = true;
            let this_node_expelled =
                peer.current_grade == CurrentGrade::Expelled && peer.raft_id == self.raft_id();
            if this_node_expelled {
                // cannot exit during a transaction
                *expelled = true;
            }
        }

        if let Some(notify) = self.joint_state_latch.take_or_keep(&entry.index) {
            // A ConfChange entry was expected at this index, but a normal one
            // arrived. Raft must have overridden it, or there was a
            // re-election.
            notify.notify_err(RaftError::ConfChangeError("rolled back".into()));
            event::broadcast(Event::JointStateDrop);
        }
    }

    /// Applies a single committed conf change entry (`EntryConfChange` or
    /// `EntryConfChangeV2`). Is called during a transaction.
    ///
    /// The entry data is decoded and applied to the raw node, the joint state
    /// latch is released when appropriate, `Event::Demoted` is raised if this
    /// node has lost its voter status, and the resulting conf state is
    /// persisted.
    ///
    /// # Panics
    ///
    /// Panics if the entry data can't be decoded, if `apply_conf_change`
    /// fails, if reading the voters or persisting the conf state fails, or
    /// if the entry has any other type (`unreachable!`).
    fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
        // Releases the joint state latch (if it's locked): the waiting side
        // gets `Ok(())` and `JointStateLeave` is broadcast.
        let mut latch_unlock = || {
            if let Some(notify) = self.joint_state_latch.take() {
                notify.notify_ok(());
                event::broadcast(Event::JointStateLeave);
            }
        };

        // Beware: a tiny difference in type names (`V2` or not `V2`)
        // makes a significant difference in `entry.data` binary layout and
        // in joint state transitions.

        // `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
        // applied in an instant without entering the joint state.

        // `is_joint` tells whether the node stays in the joint state after
        // this entry; `conf_state` is what `apply_conf_change` returned.
        let (is_joint, conf_state) = match entry.entry_type {
            raft::EntryType::EntryConfChange => {
                let mut cc = raft::ConfChange::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                latch_unlock();

                (false, self.raw_node.apply_conf_change(&cc).unwrap())
            }
            raft::EntryType::EntryConfChangeV2 => {
                let mut cc = raft::ConfChangeV2::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                // Unlock the latch when either of conditions is met:
                // - conf_change will leave the joint state;
                // - or it will be applied without even entering one.
                let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
                if leave_joint {
                    latch_unlock();
                }

                // ConfChangeTransition::Auto implies that at this
                // moment raft-rs will implicitly propose another empty
                // conf change that represents leaving the joint state.
                (!leave_joint, self.raw_node.apply_conf_change(&cc).unwrap())
            }
            _ => unreachable!(),
        };

        // Compare the previously persisted voters with the new ones to find
        // out whether this node has just stopped being a voter.
        let raft_id = &self.raft_id();
        let voters_old = self.storage.raft.voters().unwrap().unwrap_or_default();
        if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) {
            if is_joint {
                // NOTE(review): presumably postpones `Demoted` until
                // `JointStateLeave` is broadcast — confirm in `event`.
                event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok();
            } else {
                event::broadcast(Event::Demoted);
            }
        }

        // Must happen after the comparison above, which reads the old voters.
        self.storage.raft.persist_conf_state(&conf_state).unwrap();
    }

    /// Delivers read indexes to the fibers waiting for them.
    ///
    /// For every read state the request context is decoded; if a notification
    /// is registered under its logical clock, it is removed and notified with
    /// the read state's index. Contexts that decode to nothing are skipped,
    /// and undecodable ones are logged and skipped.
    ///
    /// NOTE(review): this used to be documented as "called during a
    /// transaction", but the call visible in `advance` precedes
    /// `start_transaction` — confirm which one is intended.
    fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
        for rs in read_states {
            let decoded = traft::EntryContextNormal::read_from_bytes(&rs.request_ctx);
            match decoded {
                Ok(Some(ctx)) => {
                    if let Some(notify) = self.notifications.remove(&ctx.lc) {
                        notify.notify_ok(rs.index);
                    }
                }
                Ok(None) => {}
                Err(e) => {
                    tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
                }
            }
        }
    }

    /// Is called during a transaction
    fn handle_messages(&mut self, messages: Vec<raft::Message>) {
            if let Err(e) = self.pool.send(msg) {
    /// Processes a so-called "ready state" of the [`raft::RawNode`].
    ///
    /// This includes:
    /// - Sending messages to other instances (raft nodes);
    /// - Applying committed entries;
    /// - Persisting uncommitted entries;
    /// - Persisting hard state (term, vote, commit);
    /// - Notifying pending fibers;
    ///
    /// See also:
    ///
    /// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
    /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
    ///
    /// This function yields.
    fn advance(&mut self, status: &Cell<Status>, topology_changed: &mut bool, expelled: &mut bool) {
        // Get the `Ready` with `RawNode::ready` interface.
        if !self.raw_node.has_ready() {
            return;
        }

        let mut ready: raft::Ready = self.raw_node.ready();

        // Send out messages to the other nodes.
        self.handle_messages(ready.take_messages());
        // This is a snapshot, we need to apply the snapshot at first.
        if !ready.snapshot().is_empty() {
            unimplemented!();
        }
        if let Some(ss) = ready.ss() {
            let mut s = status.get();
            s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
            s.raft_state = ss.raft_state.into();
            status.set(s);
            event::broadcast(Event::StatusChanged);
        }
        self.handle_read_states(ready.read_states());
        start_transaction(|| -> Result<(), TransactionError> {
            // Apply committed entries.
            self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled);
            // Persist uncommitted entries in the raft log.
            self.storage.raft.persist_entries(ready.entries()).unwrap();
            // Raft HardState changed, and we need to persist it.
            if let Some(hs) = ready.hs() {
                self.storage.raft.persist_hard_state(hs).unwrap();

                let mut s = status.get();
                s.term = hs.term;
                status.set(s);
        // This bunch of messages is special. It must be sent only
        // AFTER the HardState, Entries and Snapshot are persisted
        // to the stable storage.
        self.handle_messages(ready.take_persisted_messages());

        // Advance the Raft.
        let mut light_rd = self.raw_node.advance(ready);

        // Send out messages to the other nodes.
        self.handle_messages(light_rd.take_messages());

        start_transaction(|| -> Result<(), TransactionError> {
            // Update commit index.
            if let Some(commit) = light_rd.commit_index() {
                self.storage.raft.persist_commit(commit).unwrap();
            // Apply committed entries.
            self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled);

        // Advance the apply index.
        self.raw_node.advance_apply();
    #[inline]
    fn cleanup_notifications(&mut self) {
        self.notifications
            .retain(|_, notify: &mut Notify| !notify.is_closed());
    }

    /// Produces a logical clock together with a notification channel.
    ///
    /// The node's logical clock is incremented and its new value serves as a
    /// unique identifier suitable for tagging entries in the raft log. One
    /// clone of a new [`Notify`] is stored in `self.notifications` under that
    /// clock; the other clone is returned to the caller, who is notified when
    /// the corresponding entry is committed.
    #[inline]
    fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
        self.lc.inc();
        let lc = self.lc;

        let (receiver, sender) = Notify::new().into_clones();
        self.notifications.insert(lc, sender);
        (lc, receiver)
    }
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
/// Owner-side handle of the raft main loop.
struct MainLoop {
    /// The running loop. Wrapped in an `Option` so that it can be taken out
    /// and joined when the `MainLoop` is dropped.
    _loop: Option<Loop>,
    /// Condition the loop iteration waits on; broadcast by `wakeup()` and on drop.
    loop_cond: Rc<Cond>,
    /// Set to `true` on drop; the loop iteration checks it and breaks.
    stop_flag: Rc<Cell<bool>>,
}

struct MainLoopArgs {
    status: Rc<Cell<Status>>,
    node_impl: Rc<Mutex<NodeImpl>>,
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
}

/// Mutable state passed to every iteration of the raft main loop.
struct MainLoopState {
    /// The raw node is ticked once the current time passes this instant,
    /// which is then moved forward by `MainLoop::TICK`.
    next_tick: Instant,
    /// Condition the iteration waits on (with a `MainLoop::TICK` timeout);
    /// shared with the owning `MainLoop`.
    loop_cond: Rc<Cond>,
    /// Stop request flag shared with the owning `MainLoop`.
    stop_flag: Rc<Cell<bool>>,
}

impl MainLoop {
    pub const TICK: Duration = Duration::from_millis(100);

    fn start(status: Rc<Cell<Status>>, node_impl: Rc<Mutex<NodeImpl>>) -> Self {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        let loop_cond: Rc<Cond> = Default::default();
        let stop_flag: Rc<Cell<bool>> = Default::default();
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        let args = MainLoopArgs { status, node_impl };
        let initial_state = MainLoopState {
            next_tick: Instant::now(),
            loop_cond: loop_cond.clone(),
            stop_flag: stop_flag.clone(),
        };

        Self {
            // implicit yield
            _loop: Some(Loop::start(
                "raft_main_loop",
                Self::iter_fn,
                args,
                initial_state,
            )),
            loop_cond,
            stop_flag,
        }
    }

    /// Broadcasts `loop_cond`, the condition the loop iteration waits on,
    /// so that the main loop doesn't have to wait for the tick timeout.
    pub fn wakeup(&self) {
        self.loop_cond.broadcast();
    }

    fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl {
        state.loop_cond.wait_timeout(Self::TICK); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }

        let mut node_impl = args.node_impl.lock(); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }
        node_impl.cleanup_notifications();
        let now = Instant::now();
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        if now > state.next_tick {
            state.next_tick = now + Self::TICK;
            node_impl.raw_node.tick();
        let mut topology_changed = false;
        let mut expelled = false;
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        node_impl.advance(&args.status, &mut topology_changed, &mut expelled); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }
        if expelled {
            crate::tarantool::exit(0);
        }

        if topology_changed {
            event::broadcast(Event::TopologyChanged);
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed

        FlowControl::Continue
    }
}

impl Drop for MainLoop {
    fn drop(&mut self) {
        self.stop_flag.set(true);
        self.loop_cond.broadcast();
        self._loop.take().unwrap().join(); // yields
fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
    let mut pool = ConnectionPool::builder(storage.peers.clone())
        .call_timeout(Duration::from_secs(1))
        .connect_timeout(Duration::from_millis(500))
        .inactivity_timeout(Duration::from_secs(60))
        .build();

    // TODO: don't hardcode this
    const SYNC_TIMEOUT: Duration = Duration::from_secs(10);

Georgy Moshkin's avatar
Georgy Moshkin committed
    'governor: loop {
        if !status.get().raft_state.is_leader() {
            event::wait(Event::StatusChanged).expect("Events system must be initialized");
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        let leader_id = status.get().id;
        let peers = storage.peers.all_peers().unwrap();
        let term = status.get().term;
        let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
        let commit = storage.raft.commit().unwrap().unwrap();
        let node = global().expect("must be initialized");
        ////////////////////////////////////////////////////////////////////////
        // conf change
        let voters = storage.raft.voters().unwrap().unwrap_or_default();
        let learners = storage.raft.learners().unwrap().unwrap_or_default();
        if let Some(conf_change) = raft_conf_change(&peers, &voters, &learners) {
            // main_loop guarantees that every ProposeConfChange
            // will eventually be handled, so there's no need for a timeout.
            // It also guarantees that the notification will arrive only
            // after the node leaves the joint state.
            match node.propose_conf_change_and_wait(term, conf_change) {
                Ok(()) => tlog!(Info, "conf_change processed"),
                Err(e) => {
                    tlog!(Warning, "conf_change failed: {e}");
                    fiber::sleep(Duration::from_secs(1));
                }
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        ////////////////////////////////////////////////////////////////////////
        // offline
        let to_offline = peers
            .iter()
            .filter(|peer| peer.current_grade != CurrentGrade::Offline)
            // TODO: process them all, not just the first one
            .find(|peer| peer.target_grade == TargetGrade::Offline);
        if let Some(peer) = to_offline {
            let replicaset_id = &peer.replicaset_id;
            let res = (|| -> traft::Result<_> {
                let replicaset = storage.replicasets.get(replicaset_id)?;
                if replicaset
                    .map(|r| r.master_id == peer.instance_id)
                    .unwrap_or(false)
                {
                    let new_master =
                        maybe_responding(&peers).find(|p| p.replicaset_id == replicaset_id);
                    if let Some(peer) = new_master {
                        let mut ops = UpdateOps::new();
                        ops.assign("master_id", &peer.instance_id)?;
                        node.propose_and_wait(
                            OpDML::update(ClusterSpace::Replicasets, &[replicaset_id], ops)?,
                            // TODO: don't hard code the timeout
                            Duration::from_secs(3),
                            // TODO: these `?` will be processed in the wrong place
                        )??;
                    } else {
                        tlog!(Warning, "the last replica has gone offline";
                            "replicaset_id" => %replicaset_id,
                            "instance_id" => %peer.instance_id,
                        );
                    }
                }

                let reqs = maybe_responding(&peers)
                    .filter(|peer| {
                        peer.current_grade == CurrentGrade::ShardingInitialized
                            || peer.current_grade == CurrentGrade::Online
                    })
                    .map(|peer| {
                        (
                            peer.instance_id.clone(),
                            sharding::Request {