                        "failed creating index '{}': {e}",
                        primary_key_def.name
                    );
                }

                match distribution {
                    Distribution::Global => {
                        // Nothing else is needed
                    }
                    Distribution::ShardedByField { .. } => {
                        todo!()
                    }
                    Distribution::ShardedImplicitly { .. } => {
                        // TODO: if primary key is not the first field or
                        // there's some space between key parts, we want
                        // bucket_id to go closer to the beginning of the tuple,
                        // but this will require updating the primary key part
                        // indexes, so somebody should do that at some point.
                        let bucket_id_index = last_pk_part_index + 1;
                        format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());

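                        // A non-unique secondary index on bucket_id: many
                        // tuples share the same bucket, the sharding subsystem
                        // only needs it to look tuples up by bucket.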
                        let bucket_id_def = IndexDef {
                            id: 1,
                            name: "bucket_id".into(),
                            space_id: id,
                            schema_version,
                            parts: vec![Part::field(bucket_id_index)
                                .field_type(IFT::Unsigned)
                                .is_nullable(false)],
                            operable: false,
                            unique: false,
                            // TODO: support other cases
                            local: true,
                        };
                        let res = self.storage.indexes.insert(&bucket_id_def);
                        if let Err(e) = res {
                            // Ignore the error for now, let governor deal with it.
                            tlog!(
                                Warning,
                                "failed creating index '{}': {e}",
                                bucket_id_def.name
                            );
                        }
                    }
                }

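                // The space is created as non-operable here; it only becomes
                // operable once the corresponding DdlCommit is applied.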
                let space_def = SpaceDef {
                    id,
                    name,
                    distribution,
                    schema_version,
                    format,
                    operable: false,
                };
                let res = self.storage.spaces.insert(&space_def);
                if let Err(e) = res {
                    // Ignore the error for now, let governor deal with it.
                    tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
                }
            }

            Ddl::CreateIndex {
                space_id,
                index_id,
                by_fields,
            } => {
                let _ = (space_id, index_id, by_fields);
                todo!();
            }
            Ddl::DropSpace { id } => {
                ddl_meta_space_update_operable(&self.storage, id, false)
                    .expect("storage shouldn't fail");
            }

            Ddl::DropIndex { index_id, space_id } => {
                let _ = (index_id, space_id);
                todo!();
            }
        }

        self.storage
            .properties
            .put(PropertyName::PendingSchemaChange, &ddl)?;
        self.storage
            .properties
            .put(PropertyName::PendingSchemaVersion, &schema_version)?;
        self.storage
            .properties
            .put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;

        Ok(())
    }

    /// Is called during a transaction
    fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
        let mut latch_unlock = || {
            if let Some(notify) = self.joint_state_latch.take() {
                let _ = notify.send(Ok(()));
                event::broadcast(Event::JointStateLeave);
            }
        };

        // Beware: a tiny difference in type names (`V2` or not `V2`)
        // makes a significant difference in `entry.data` binary layout and
        // in joint state transitions.

        // `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
        // applied in an instant without entering the joint state.

        let conf_state = match entry.entry_type {
            raft::EntryType::EntryConfChange => {
                let mut cc = raft::ConfChange::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                latch_unlock();

                self.raw_node.apply_conf_change(&cc).unwrap()
            }
            raft::EntryType::EntryConfChangeV2 => {
                let mut cc = raft::ConfChangeV2::default();
                cc.merge_from_bytes(&entry.data).unwrap();

                // Unlock the latch when either of conditions is met:
                // - conf_change will leave the joint state;
                // - or it will be applied without even entering one.
                let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
                if leave_joint {
                    latch_unlock();
                }

                // ConfChangeTransition::Auto implies that at this
                // moment raft-rs will implicitly propose another empty
                // conf change that represents leaving the joint state.
                self.raw_node.apply_conf_change(&cc).unwrap()
            }
            _ => unreachable!(),
        };

        self.raft_storage.persist_conf_state(&conf_state).unwrap();
    }

    /// Is called during a transaction
    fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
        for rs in read_states {
            if rs.request_ctx.is_empty() {
                continue;
            }
            let ctx = crate::unwrap_ok_or!(
                traft::EntryContextNormal::from_bytes(&rs.request_ctx),
                Err(e) => {
                    tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
                    continue;
                }
            );

            if let Some(notify) = self.notifications.remove(&ctx.lc) {
                notify.notify_ok(rs.index);
            }
        }
    }

    /// Is called during a transaction
    fn handle_messages(&mut self, messages: Vec<raft::Message>) {
        if messages.is_empty() {
            return;
        }

        self.main_loop_status("sending raft messages");
        let mut sent_count = 0;
        let mut skip_count = 0;

        let instance_reachability = self.instance_reachability.borrow();
        for msg in messages {
            if msg.msg_type == raft::MessageType::MsgHeartbeat
                && !instance_reachability.should_send_heartbeat_this_tick(msg.to)
            {
                skip_count += 1;
                continue;
            }

            sent_count += 1;
            if let Err(e) = self.pool.send(msg) {
                tlog!(Error, "{e}");
            }
        }

        tlog!(
            Debug,
            "done sending messages, sent: {sent_count}, skipped: {skip_count}"
        );
    }

    fn fetch_chunkwise_snapshot(
        &self,
        snapshot_data: &mut SnapshotData,
        entry_id: RaftEntryId,
    ) -> traft::Result<()> {
        #[rustfmt::skip]
        let mut position = snapshot_data.next_chunk_position
            .expect("shouldn't be None if this function is called");
        let space_dumps = &mut snapshot_data.space_dumps;
        #[cfg(debug_assertions)]
        let mut last_space_id = 0;
        #[cfg(debug_assertions)]
        let mut last_space_tuple_count = 0;

        loop {
            self.main_loop_status("receiving snapshot");

            let Some(leader_id) = self.status.get().leader_id else {
                tlog!(Warning, "leader id is unknown while trying to request next snapshot chunk");
                return Err(Error::LeaderUnknown);
            };

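            // Debug-only sanity check: the position we're about to request
            // must agree with the number of tuples already received for the
            // current space.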
            #[cfg(debug_assertions)]
            {
                let last = space_dumps.last().expect("should not be empty");

                if last.space_id != last_space_id {
                    last_space_tuple_count = 0;
                }
                last_space_id = last.space_id;

                let mut tuples = last.tuples.as_ref();
                let count = rmp::decode::read_array_len(&mut tuples)
                    .expect("space dump should contain a msgpack array");
                last_space_tuple_count += count;

                assert_eq!(last_space_id, position.space_id);
                assert_eq!(last_space_tuple_count, position.tuple_offset);
            }
            let req = rpc::snapshot::Request { entry_id, position };

            const SNAPSHOT_CHUNK_REQUEST_TIMEOUT: Duration = Duration::from_secs(60);
            tlog!(Debug, "requesting next snapshot chunk";
                "entry_id" => %entry_id,
                "position" => %position,
            );

            let fut = self
                .pool
                .call(&leader_id, &req, SNAPSHOT_CHUNK_REQUEST_TIMEOUT);
            let fut = unwrap_ok_or!(fut,
                Err(e) => {
                    tlog!(Warning, "failed requesting next snapshot chunk: {e}");
                    self.main_loop_status("error when receiving snapshot");
                    fiber::sleep(MainLoop::TICK * 4);
                    continue;
                }
            );

            let resp = fiber::block_on(fut);
            let mut resp = unwrap_ok_or!(resp,
                Err(e) => {
                    let msg = e.to_string();
                    if msg.contains("read view not available") {
                        tlog!(Warning, "aborting snapshot retrieval: {e}");
                        return Err(e);
                    }

                    tlog!(Warning, "failed requesting next snapshot chunk: {e}");
                    self.main_loop_status("error when receiving snapshot");
                    fiber::sleep(MainLoop::TICK * 4);
                    continue;
                }
            );
            space_dumps.append(&mut resp.snapshot_data.space_dumps);

            position = unwrap_some_or!(resp.snapshot_data.next_chunk_position, {
                tlog!(Debug, "received final snapshot chunk");
                break;
            });
        }

        Ok(())
    }

    #[inline(always)]
    fn main_loop_status(&self, status: &'static str) {
        tlog!(Debug, "main_loop_status = '{status}'");
        self.status
            .send_modify(|s| s.main_loop_status = status)
            .expect("status shouldn't ever be borrowed across yields");
    }

    /// Processes a so-called "ready state" of the [`raft::RawNode`].
    ///
    /// This includes:
    /// - Sending messages to other instances (raft nodes);
    /// - Applying committed entries;
    /// - Persisting uncommitted entries;
    /// - Persisting hard state (term, vote, commit);
    /// - Notifying pending fibers;
    ///
    /// See also:
    ///
    /// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
    /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
    ///
    /// This function yields.
    fn advance(&mut self, wake_governor: &mut bool, expelled: &mut bool) {
        // Handle any unreachable nodes from previous iteration.
        let unreachables = self
            .instance_reachability
            .borrow_mut()
            .take_unreachables_to_report();
        for raft_id in unreachables {
            self.raw_node.report_unreachable(raft_id);

            // TODO: remove infos when instances are expelled.
            let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else {
                continue;
            };
            // NOTE: Raft-rs will not check if the node should be paused until
            // a new raft entry is appended to the log. This means that once an
            // instance goes silent it will still be bombarded with heartbeats
            // until someone proposes an operation. This is a workaround for
            // that particular case.
            // The instance's state would've only changed if it was not in the
            // Snapshot state, so we have to check for that.
            if pr.state == ::raft::ProgressState::Probe {
                pr.pause();
            }
        }

        // Get the `Ready` with `RawNode::ready` interface.
        if !self.raw_node.has_ready() {
            return;
        }

        let mut ready: raft::Ready = self.raw_node.ready();

        // Apply soft state changes before anything else, so that this info is
        // available for other fibers as soon as the main loop yields.
        if let Some(ss) = ready.ss() {
            self.status
                .send_modify(|s| {
                    s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
                    s.raft_state = ss.raft_state.into();
                })
                .expect("status shouldn't ever be borrowed across yields");
        }

        // Send out messages to the other nodes.
        self.handle_messages(ready.take_messages());
        // Handle read states before applying snapshot which may fail.
        self.handle_read_states(ready.read_states());

        // If there is a snapshot, it has to be applied first.
        let snapshot = ready.snapshot();
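        // Decide whether the snapshot can be applied right now: a read-only
        // replica must first let its local schema catch up with the
        // replicaset master via tarantool replication (see the loop below).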
        let snapshot_data = (|| -> Option<SnapshotData> {
            if snapshot.is_empty() {
                return None;
            }
            let snapshot_data = crate::unwrap_ok_or!(
                SnapshotData::decode(snapshot.get_data()),
                Err(e) => {
                    tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
                    return None;
                }
            );

            let v_snapshot = snapshot_data.schema_version;
            loop {
                let v_local = local_schema_version().expect("storage shouldn't fail");
                let v_global = self
                    .storage
                    .properties
                    .global_schema_version()
                    .expect("storage shouldn't fail");
                assert!(
                    v_global <= v_local,
                    "global schema version is only ever increased after local"
                );
                assert!(
                    v_global <= v_snapshot,
                    "global schema version updates are distributed via raft"
                );

                if v_local > v_snapshot {
                    tlog!(
                        Warning,
                        "skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
                        v_local,
                        snapshot_data.schema_version,
                    );
                    return None;
                }

                if !self.is_readonly() {
                    // Replicaset leader applies the schema changes directly.
                    return Some(snapshot_data);
                }

                if v_local == v_snapshot {
                    // Replicaset follower has synced schema with the leader,
                    // now global space dumps should be handled.
                    return Some(snapshot_data);
                }

                self.main_loop_status("awaiting replication");
                // Replicaset follower needs to sync with leader via tarantool
                // replication.
                let timeout = MainLoop::TICK * 4;
                fiber::sleep(timeout);
            }
        })();

        if let Some(mut snapshot_data) = snapshot_data {
            if snapshot_data.next_chunk_position.is_some() {
                self.main_loop_status("receiving snapshot");
                let entry_id = RaftEntryId {
                    index: snapshot.get_metadata().index,
                    term: snapshot.get_metadata().term,
                };
                if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
                    // Error has been logged.
                    _ = e;
                    tlog!(Warning, "dropping snapshot data");
                    return;
                }
            }

            self.main_loop_status("applying snapshot");

            if let Err(e) = transaction(|| -> traft::Result<()> {
                let meta = snapshot.get_metadata();
                self.raft_storage.handle_snapshot_metadata(meta)?;
                // FIXME: apply_snapshot_data calls truncate on clusterwide
                // spaces and even though they're all local spaces doing
                // truncate on them is not allowed on read_only instances.
                // Related issue in tarantool:
                // https://github.com/tarantool/tarantool/issues/5616
                let is_readonly = self.is_readonly();
                if is_readonly {
                    crate::tarantool::eval("box.cfg { read_only = false }")?;
                }
                let res = self
                    .storage
                    .apply_snapshot_data(&snapshot_data, !is_readonly);
                if is_readonly {
                    crate::tarantool::exec("box.cfg { read_only = true }")?;
                }
                #[allow(clippy::let_unit_value)]
                let _ = res?;

                // TODO: As long as the snapshot was sent to us in response to
                // a rejected MsgAppend (which is the only possible case
                // currently), we will send a MsgAppendResponse back which will
                // automatically reset our status from Snapshot to Replicate.
                // But when we implement support for manual snapshot requests,
                // we will have to also implement sending a MsgSnapStatus,
                // to reset our status explicitly to avoid the leader ignoring us
                // indefinitely after that point.
                Ok(())
            }) {
                tlog!(Warning, "dropping raft ready: {ready:#?}");
                panic!("transaction failed: {e}, {}", TarantoolError::last());
            }
        }

        // Apply committed entries.
        let res = self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled);
        if let Err(e) = res {
            tlog!(Warning, "dropping raft ready: {ready:#?}");
            panic!("transaction failed: {e}, {}", TarantoolError::last());
        }
        if let Err(e) = transaction(|| -> Result<(), &str> {
            if !ready.entries().is_empty() || ready.hs().is_some() {
                self.main_loop_status("persisting entries");
            }

            // Persist uncommitted entries in the raft log.
            self.raft_storage.persist_entries(ready.entries()).unwrap();
            // Raft HardState changed, and we need to persist it.
            if let Some(hs) = ready.hs() {
                self.raft_storage.persist_hard_state(hs).unwrap();
                self.status
                    .send_modify(|s| s.term = hs.term)
                    .expect("status shouldn't ever be borrowed across yields");
            }

            Ok(())
        }) {
            tlog!(Warning, "dropping raft ready: {ready:#?}");
            panic!("transaction failed: {e}");
        }

        // This bunch of messages is special. It must be sent only
        // AFTER the HardState, Entries and Snapshot are persisted
        // to the stable storage.
        self.handle_messages(ready.take_persisted_messages());

        // Advance the Raft.
        let mut light_rd = self.raw_node.advance(ready);

        // Send out messages to the other nodes.
        self.handle_messages(light_rd.take_messages());

        // Update commit index.
        if let Some(commit) = light_rd.commit_index() {
            self.raft_storage.persist_commit(commit).unwrap();
        }
        let res =
            self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled);
        if let Err(e) = res {
            tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
            panic!("transaction failed: {e}, {}", TarantoolError::last());
        }

        // Advance the apply index.
        self.raw_node.advance_apply();

        self.main_loop_status("idle");
    }

    #[allow(dead_code)]
    fn check_vclock_and_sleep(&mut self) -> traft::Result<()> {
        assert!(self.raw_node.raft.state != RaftStateRole::Leader);
        let my_id = self.raw_node.raft.id;

        let my_instance_info = self.storage.instances.get(&my_id)?;
        let replicaset_id = my_instance_info.replicaset_id;
        let replicaset = self.storage.replicasets.get(&replicaset_id)?;
        let replicaset = replicaset.ok_or_else(|| {
            Error::other(format!("replicaset info for id {replicaset_id} not found"))
        })?;
        if replicaset.master_id == my_instance_info.instance_id {
            return Err(Error::other(
                "check_vclock_and_sleep called on replicaset master",
            ));
        }

        let master = self.storage.instances.get(&replicaset.master_id)?;
        let master_vclock = fiber::block_on(sync::call_get_vclock(&self.pool, &master.raft_id))?;
        let local_vclock = Vclock::current();
        if matches!(
            local_vclock.partial_cmp(&master_vclock),
            None | Some(Ordering::Less)
        ) {
            tlog!(Info, "blocking raft loop until replication progresses";
                "master_vclock" => ?master_vclock,
                "local_vclock" => ?local_vclock,
            );
            fiber::sleep(MainLoop::TICK * 4);
        }

        Ok(())
    }

    /// Check if this is a read only replica. This function is called when we
    /// need to determine if this instance should be changing the schema
    /// definition or if it should instead synchronize with a master.
    ///
    /// Note: it would be a little more reliable to check if the replica is
    /// chosen to be a master by checking master_id in _pico_replicaset, but
    /// currently we cannot do that, because tarantool replication is done
    /// asynchronously with raft log replication. Basically, an instance needs
    /// to know it's a replicaset master before it can access the replicaset
    /// info.
    fn is_readonly(&self) -> bool {
        let is_ro: bool = crate::tarantool::eval("return box.info.ro")
            .expect("checking read-onlyness should never fail");
        is_ro
    }

    #[inline]
    fn cleanup_notifications(&mut self) {
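        // Drop notifications whose receiver side is gone, so the map doesn't
        // accumulate entries nobody is waiting for anymore.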
        self.notifications.retain(|_, notify| !notify.is_closed());
    }

    /// Generates a pair of logical clock and a notification channel.
    /// Logical clock is a unique identifier suitable for tagging
    /// entries in raft log. Notification is broadcasted when the
    /// corresponding entry is committed.
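    ///
    /// A rough usage sketch (the wait call below is illustrative, not the
    /// exact API of `Notify`):
    ///
    /// ```ignore
    /// let (lc, notify) = node_impl.schedule_notification();
    /// // Tag the proposed raft entry with `lc` via its entry context, then
    /// // block the calling fiber until that entry is committed, e.g.:
    /// // let index: RaftIndex = notify.recv_timeout(timeout)?;
    /// ```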
    #[inline]
    fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
        let (tx, rx) = notification();
        let lc = {
            self.lc.inc();
            self.lc
        };
        self.notifications.insert(lc, tx);
        (lc, rx)
    }
}

/// Return value of [`NodeImpl::handle_committed_normal_entry`], explains what
/// should be done as a result of attempting to apply a given entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyEntryResult {
    /// This entry failed to apply for some reason, and must be retried later.
    SleepAndRetry,

    /// Entry applied successfully, proceed to next entry.
    EntryApplied,
}

pub struct MainLoop {
    _loop: Option<fiber::JoinHandle<'static, ()>>,
    loop_waker: watch::Sender<()>,
    stop_flag: Rc<Cell<bool>>,
}

struct MainLoopState {
    node_impl: Rc<Mutex<NodeImpl>>,
    next_tick: Instant,
    loop_waker: watch::Receiver<()>,
    stop_flag: Rc<Cell<bool>>,
}

impl MainLoop {
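    /// Raft main loop period: `iter_fn` wakes up at least once per tick to
    /// drive `RawNode::tick` and process any pending ready state.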
    pub const TICK: Duration = Duration::from_millis(100);

    fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
        let (loop_waker_tx, loop_waker_rx) = watch::channel(());
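        // Shared with the loop fiber: `Drop for MainLoop` sets this flag so
        // that `iter_fn` breaks out of the loop on its next wakeup.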
        let stop_flag: Rc<Cell<bool>> = Default::default();
        let state = MainLoopState {
            node_impl,
            loop_waker: loop_waker_rx,
            next_tick: Instant::now(),
            stop_flag: stop_flag.clone(),
        };

        Self {
            // implicit yield
            _loop: loop_start!("raft_main_loop", Self::iter_fn, state),
            loop_waker: loop_waker_tx,
            stop_flag,
        }
    }

    pub fn wakeup(&self) {
        let _ = self.loop_waker.send(());
    }

    async fn iter_fn(state: &mut MainLoopState) -> FlowControl {
        let _ = state.loop_waker.changed().timeout(Self::TICK).await;
        if state.stop_flag.take() {
            return FlowControl::Break;
        }

        // FIXME: potential deadlock - can't use sync mutex in async fn
        let mut node_impl = state.node_impl.lock(); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }
        node_impl.cleanup_notifications();
        let now = Instant::now();
        if now > state.next_tick {
            state.next_tick = now.saturating_add(Self::TICK);
            node_impl.raw_node.tick();
        }

        let mut wake_governor = false;
        let mut expelled = false;
        node_impl.advance(&mut wake_governor, &mut expelled); // yields
        if state.stop_flag.take() {
            return FlowControl::Break;
        }
        if expelled {
            crate::tarantool::exit(0);
        }

        if wake_governor {
            if let Err(e) = async { global()?.governor_loop.wakeup() }.await {
                tlog!(Warning, "failed waking up governor: {e}");
            }
        }

        FlowControl::Continue
    }
}

impl Drop for MainLoop {
    fn drop(&mut self) {
        self.stop_flag.set(true);
        let _ = self.loop_waker.send(());
        self._loop.take().unwrap().join(); // yields
    }
}

static mut RAFT_NODE: Option<Box<Node>> = None;

pub fn set_global(node: Node) {
    unsafe {
        assert!(
            RAFT_NODE.is_none(),
            "discovery::set_global() called twice, it's a leak"
        );
        RAFT_NODE = Some(Box::new(node));
    }
}

pub fn global() -> traft::Result<&'static Node> {
    // Uninitialized raft node is a regular case. This case may take
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

#[proc(packed_args)]
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
    let node = global()?;
    for pb in pbs {
        node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);