
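        // Wait until the local schema version catches up with the version in
        // the snapshot. A replicaset master applies schema changes directly,
        // while a read-only replica waits to receive them via tarantool
        // replication before the global table dumps can be applied.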
        loop {
            let v_local = local_schema_version().expect("storage shouldn't fail");
            let v_global = self
                .storage
                .properties
                .global_schema_version()
                .expect("storage shouldn't fail");

            #[rustfmt::skip]
            debug_assert!(v_global <= v_local, "global schema version is only ever increased after local");

            #[rustfmt::skip]
            debug_assert!(v_global <= v_snapshot, "global schema version updates are distributed via raft");

            if v_local > v_snapshot {
                tlog!(
                    Warning,
                    "skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
                    v_local,
                    snapshot_data.schema_version,
                );
                return Ok(None);
            }

            if !self.is_readonly() {
                // Replicaset leader applies the schema changes directly.
                break;
            }

            if v_local == v_snapshot {
                // Replicaset follower has synced schema with the leader,
                // now global space dumps should be handled.
                break;
            }

            tlog!(Debug, "v_local: {v_local}, v_snapshot: {v_snapshot}");
            self.main_loop_status("awaiting replication");
            // Replicaset follower needs to sync with leader via tarantool
            // replication.
            let timeout = MainLoop::TICK * 4;
            fiber::sleep(timeout);
        }

        let mut snapshot_data = snapshot_data;
        if snapshot_data.next_chunk_position.is_some() {
            self.main_loop_status("receiving snapshot");
            let entry_id = RaftEntryId {
                index: snapshot.get_metadata().index,
                term: snapshot.get_metadata().term,
            };
            if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
                // Error has been logged.
                tlog!(Warning, "dropping snapshot data");
                return Err(e);
            }
        }

        Ok(Some(snapshot_data))
    }

    #[inline(always)]
    fn main_loop_status(&self, status: &'static str) {
        if self.status.get().main_loop_status == status {
            return;
        }

        tlog!(Debug, "main_loop_status = '{status}'");
        self.status
            .send_modify(|s| s.main_loop_status = status)
            .expect("status shouldn't ever be borrowed across yields");
    /// Processes a so-called "ready state" of the [`raft::RawNode`].
    ///
    /// This includes:
    /// - Sending messages to other instances (raft nodes);
    /// - Handling raft snapshot:
    ///   - Verifying & waiting until snapshot data can be applied
    ///     (see [`Self::prepare_for_snapshot`] for more details);
    ///   - Persisting snapshot metadata;
    ///   - Compacting the raft log;
    ///   - Restoring local storage contents from the snapshot;
    /// - Persisting uncommitted entries;
    /// - Persisting hard state (term, vote, commit);
    /// - Applying committed entries;
    /// - Notifying pending fibers;
    /// - Waking up the governor loop, so that it can handle any global state
    ///   changes;
    ///
    /// Returns an error if the instance was expelled from the cluster.
    ///
    /// See also:
    ///
    /// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
    /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
    ///
    /// This function yields.
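    ///
    /// A rough sketch of this sequence in terms of the raft-rs API (error
    /// handling, snapshot handling and yields omitted; `handle_messages`,
    /// `persist`, `persist_commit` and `apply` are illustrative helpers, not
    /// the actual methods used below):
    ///
    /// ```ignore
    /// if !raw_node.has_ready() { return; }
    /// let mut ready = raw_node.ready();
    /// handle_messages(ready.take_messages());           // leader only, send ASAP
    /// persist(ready.hs(), ready.entries());             // stable storage first
    /// apply(ready.committed_entries());
    /// handle_messages(ready.take_persisted_messages()); // followers only
    /// let mut light_rd = raw_node.advance(ready);
    /// handle_messages(light_rd.take_messages());        // leader only
    /// persist_commit(light_rd.commit_index());          // leader only
    /// apply(light_rd.committed_entries());
    /// raw_node.advance_apply();
    /// ```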
    fn advance(&mut self) -> traft::Result<()> {
        let mut expelled = false;

        // Handle any unreachable nodes from previous iteration.
        let unreachables = self
            .instance_reachability
            .borrow_mut()
            .take_unreachables_to_report();
        for raft_id in unreachables {
            self.raw_node.report_unreachable(raft_id);

            // TODO: remove infos when instances are expelled.
            let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else {
                continue;
            };
            // NOTE: Raft-rs will not check if the node should be paused until
            // a new raft entry is appended to the log. This means that once an
            // instance goes silent it will still be bombarded with heartbeats
            // until someone proposes an operation. This is a workaround for
            // that particular case.
            // The instance's state would've only changed if it was not in the
            // Snapshot state, so we have to check for that.
            if pr.state == ::raft::ProgressState::Probe {
                pr.pause();
            }
        }

        // Get the `Ready` with `RawNode::ready` interface.
        if !self.raw_node.has_ready() {
            return Ok(());
        }

        let mut ready: raft::Ready = self.raw_node.ready();

        // Apply soft state changes before anything else, so that this info is
        // available for other fibers as soon as main loop yields.
        if let Some(ss) = ready.ss() {
            self.status
                .send_modify(|s| {
                    s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
                    s.raft_state = ss.raft_state.into();
                })
                .expect("status shouldn't ever be borrowed across yields");
        }

        // These messages are only available on leader. Send them out ASAP.
        self.handle_messages(ready.take_messages());
        // Handle read states before applying snapshot which may fail.
        self.handle_read_states(ready.read_states());

        // Raft snapshot has arrived, check if we need to apply it.
        let snapshot = ready.snapshot();
        let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
            // Error was already logged.
            return Ok(());
        };

        // Persist stuff raft wants us to persist.
        let hard_state = ready.hs();
        let entries_to_persist = ready.entries();
        if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() {
            let mut new_term = None;
            let mut new_applied = None;

            if let Err(e) = transaction(|| -> Result<(), Error> {
                self.main_loop_status("persisting hard state, entries and/or snapshot");

                // Raft HardState changed, and we need to persist it.
                if let Some(hard_state) = hard_state {
                    tlog!(Debug, "hard state: {hard_state:?}");
                    self.raft_storage.persist_hard_state(hard_state)?;
                    new_term = Some(hard_state.term);
                }

                // Persist uncommitted entries in the raft log.
                if !entries_to_persist.is_empty() {
                    #[rustfmt::skip]
                    debug_assert!(snapshot.is_empty(), "can't have both the snapshot & log entries");

                    self.raft_storage.persist_entries(entries_to_persist)?;
                }

                if let Some(snapshot_data) = snapshot_data {
                    #[rustfmt::skip]
                    debug_assert!(entries_to_persist.is_empty(), "can't have both the snapshot & log entries");

                    // Persist snapshot metadata and compact the raft log if it wasn't empty.
                    let meta = snapshot.get_metadata();
                    self.raft_storage.handle_snapshot_metadata(meta)?;
                    new_applied = Some(meta.index);

                    // Persist the contents of the global tables from the snapshot data.
                    let is_master = !self.is_readonly();
                    self.storage
                        .apply_snapshot_data(&snapshot_data, is_master)?;

                    // TODO: As long as the snapshot was sent to us in response to
                    // a rejected MsgAppend (which is the only possible case
                    // currently), we will send a MsgAppendResponse back which will
                    // automatically reset our status from Snapshot to Replicate.
                    // But when we implement support for manual snapshot requests,
                    // we will have to also implement sending a MsgSnapStatus,
                    // to reset our status explicitly to avoid the leader ignoring us
                    // indefinitely after that point.
                }
                Ok(())
            }) {
                tlog!(Warning, "dropping raft ready: {ready:#?}");
                panic!("transaction failed: {e}");
            if let Some(new_term) = new_term {
                    .send_modify(|s| s.term = new_term)
                    .expect("status shouldn't ever be borrowed across yields");
            if let Some(new_applied) = new_applied {
                // handle_snapshot_metadata persists applied index, so we update the watch channel
                self.applied
                    .send(new_applied)
                    .expect("applied shouldn't ever be borrowed across yields");
            }

            if hard_state.is_some() {
                crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
            }
            if !entries_to_persist.is_empty() {
                crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_ENTRIES");
            }
        }

        // Apply committed entries.
        let committed_entries = ready.committed_entries();
        if !committed_entries.is_empty() {
            let res = self.handle_committed_entries(committed_entries, &mut expelled);
            if let Err(e) = res {
                tlog!(Warning, "dropping raft ready: {ready:#?}");
                panic!("transaction failed: {e}");
            }

            crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
        }

        // These messages are only available on followers. They must be sent only
        // AFTER the HardState, Entries and Snapshot are persisted
        // to the stable storage.
        self.handle_messages(ready.take_persisted_messages());

        // Advance the Raft. Make it know, that the necessary entries have been persisted.
        // If this is a leader, it may commit some of the newly persisted entries.
        let mut light_rd = self.raw_node.advance(ready);

        // Send new message ASAP. (Only on leader)
        let messages = light_rd.take_messages();
        if !messages.is_empty() {
            debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);

            self.handle_messages(messages);
        }
        // Update commit index. (Only on leader)
        if let Some(commit) = light_rd.commit_index() {
            debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);

            if let Err(e) = transaction(|| -> Result<(), Error> {
                self.main_loop_status("persisting commit index");
                tlog!(Debug, "commit index: {}", commit);

                self.raft_storage.persist_commit(commit)?;

                Ok(())
            }) {
                tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
                panic!("transaction failed: {e}");
            }

            crate::error_injection!(block "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX");
        }

        // These are probably entries which we've just persisted.
        let committed_entries = light_rd.committed_entries();
        if !committed_entries.is_empty() {
            let res = self.handle_committed_entries(committed_entries, &mut expelled);
            if let Err(e) = res {
                panic!("transaction failed: {e}");
            }

            crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
        }

        // Advance the apply index.
        self.raw_node.advance_apply();

        self.main_loop_status("idle");
    /// Check if this is a read only replica. This function is called when we
    /// need to determine if this instance should be changing the schema
    /// definition or if it should instead synchronize with a master.
    ///
    /// Note: it would be a little more reliable to check if the replica is
    /// chosen to be a master by checking master_id in _pico_replicaset, but
    /// currently we cannot do that, because tarantool replication is being
    /// done asynchronously with raft log replication. Basically the instance
    /// needs to know whether it's a replicaset master before it can access the
    /// replicaset info.
    fn is_readonly(&self) -> bool {
        let is_ro: bool = crate::tarantool::eval("return box.info.ro")
            .expect("checking read-onlyness should never fail");
        is_ro
    }

    /// Generates a pair of logical clock and a notification channel.
    /// Logical clock is a unique identifier suitable for tagging
    /// entries in raft log. Notification is broadcasted when the
    /// corresponding entry is committed.
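    ///
    /// A hypothetical usage sketch (assuming the receiver is driven with
    /// `fiber::block_on`; the real callers live elsewhere in this file):
    ///
    /// ```ignore
    /// let (lc, rx) = self.schedule_read_state_waker();
    /// // ... submit a raft read_index request tagged with `lc` ...
    /// let index: RaftIndex = fiber::block_on(rx).expect("sender must not be dropped");
    /// ```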
    #[inline]
    fn schedule_read_state_waker(&mut self) -> (LogicalClock, oneshot::Receiver<RaftIndex>) {
        let (tx, rx) = oneshot::channel();
        let lc = {
            self.lc.inc();
            self.lc
        };
        self.read_state_wakers.insert(lc, tx);
        (lc, rx)
    }
}

/// Return value of [`NodeImpl::handle_committed_normal_entry`], explains what
/// should be done as a result of attempting to apply a given entry.
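///
/// A hypothetical retry loop illustrating the contract (the argument list of
/// `handle_committed_normal_entry` is assumed here, not quoted):
///
/// ```ignore
/// loop {
///     match self.handle_committed_normal_entry(&entry, &mut expelled) {
///         // e.g. the entry is temporarily blocked: yield, then try again
///         ApplyEntryResult::SleepAndRetry => fiber::sleep(MainLoop::TICK),
///         // done with this entry, move on to the next one
///         ApplyEntryResult::EntryApplied => break,
///     }
/// }
/// ```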
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyEntryResult {
    /// This entry failed to apply for some reason, and must be retried later.
    SleepAndRetry,

    /// Entry applied successfully, proceed to next entry.
    EntryApplied,
}

pub struct MainLoop {
    _loop: Option<fiber::JoinHandle<'static, ()>>,
    loop_waker: watch::Sender<()>,
    stop_flag: Rc<Cell<bool>>,
}

struct MainLoopState {
    node_impl: Rc<Mutex<NodeImpl>>,
    next_tick: Instant,
    loop_waker: watch::Receiver<()>,
    stop_flag: Rc<Cell<bool>>,
}

impl MainLoop {
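    /// Period of one raft tick; also the timeout after which an idle
    /// `iter_fn` iteration wakes up anyway.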
    pub const TICK: Duration = Duration::from_millis(100);

    fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
        let (loop_waker_tx, loop_waker_rx) = watch::channel(());
        let stop_flag: Rc<Cell<bool>> = Default::default();
        let state = MainLoopState {
            node_impl,
            next_tick: Instant::now(),
            loop_waker: loop_waker_rx,
            stop_flag: stop_flag.clone(),
        };

        Self {
            _loop: loop_start!("raft_main_loop", Self::iter_fn, state),
            loop_waker: loop_waker_tx,
            stop_flag,
        }
    }

    pub fn wakeup(&self) {
        let _ = self.loop_waker.send(());
    }

    async fn iter_fn(state: &mut MainLoopState) -> ControlFlow<()> {
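        // Wake up on an explicit signal (see `MainLoop::wakeup`), or at the
        // latest after one tick period.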
        let _ = state.loop_waker.changed().timeout(Self::TICK).await;
        if state.stop_flag.take() {
            return ControlFlow::Break(());
        }

        // FIXME: potential deadlock - can't use sync mutex in async fn
        let mut node_impl = state.node_impl.lock(); // yields
        if state.stop_flag.take() {
            return ControlFlow::Break(());
        }

        node_impl
            .read_state_wakers
            .retain(|_, waker| !waker.is_closed());
        let now = Instant::now();
        if now > state.next_tick {
            state.next_tick = now.saturating_add(Self::TICK);
            node_impl.raw_node.tick();
        }

        let res = node_impl.advance(); // yields
        if state.stop_flag.take() {
            return ControlFlow::Break(());
        }

        match res {
            Err(e @ Error::Expelled) => {
                tlog!(Info, "{e}, shutting down");
                crate::tarantool::exit(0);
            }
            Err(e) => {
                tlog!(Error, "error during raft main loop iteration: {e}");
        ControlFlow::Continue(())
    }
}

impl Drop for MainLoop {
    fn drop(&mut self) {
        self.stop_flag.set(true);
        let _ = self.loop_waker.send(());
        self._loop.take().unwrap().join(); // yields
    }
}

pub fn global() -> traft::Result<&'static Node> {
    // Uninitialized raft node is a regular case. This case may take
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

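/// Internal API. Accepts a batch of raft messages from another instance and
/// steps the local raft node with each of them via `Node::step_and_yield`.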
#[proc(packed_args)]
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
    let node = global()?;
    for pb in pbs {
        node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);
    }
    Ok(())
}

/// Internal API. Causes this instance to artificially timeout on waiting
/// for a heartbeat from raft leader. The instance then will start a new
/// election and transition to a 'PreCandidate' state.
///
/// This function yields. It returns when the raft node changes its state.
///
/// Later the instance will likely become a leader, unless there are some
/// impediments, e.g. the loss of quorum or split-vote.
///
/// Example log:
/// ```ignore
///     received MsgTimeoutNow from 3 and starts an election
///         to get leadership., from: 3, term: 4, raft_id: 3
///
///     starting a new election, term: 4, raft_id: 3
///
///     became candidate at term 5, term: 5, raft_id: 3
///
///     broadcasting vote request, to: [4, 1], log_index: 54,
///         log_term: 4, term: 5, type: MsgRequestVote, raft_id: 3
///
///     received votes response, term: 5, type: MsgRequestVoteResponse,
///         approvals: 2, rejections: 0, from: 4, vote: true, raft_id: 3
///
///     became leader at term 5, term: 5, raft_id: 3
/// ```
#[proc(public = false)]
fn proc_raft_promote() -> traft::Result<()> {
    let node = global()?;
    node.timeout_now();
    Ok(())
}