Skip to content
Snippets Groups Projects
node.rs 63.9 KiB
Newer Older
//! This module incapsulates most of the application-specific logics.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.

use crate::has_grades;
use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::r#loop::FlowControl;
use crate::reachability::InstanceReachabilityManager;
use crate::schema::{Distribution, IndexDef, SpaceDef};
use crate::storage::acl;
use crate::storage::ddl_meta_drop_space;
use crate::storage::SnapshotData;
use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
Georgy Moshkin's avatar
Georgy Moshkin committed
use crate::storage::{local_schema_version, set_local_schema_version};
use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
use crate::sync;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::network::WorkerOptions;
use crate::traft::notify::{notification, Notifier, Notify};
Georgy Moshkin's avatar
Georgy Moshkin committed
use crate::traft::op::{Acl, Ddl, Dml, Op, OpResult};
use crate::traft::ConnectionPool;
use crate::traft::ContextCoercion as _;
use crate::traft::LogicalClock;
use crate::traft::RaftEntryId;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::unwrap_ok_or;
use crate::unwrap_some_or;
use crate::util::AnyWithTypeName;
use crate::warn_or_panic;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::TarantoolError;
use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::index::FieldType as IFT;
use ::tarantool::index::Part;
use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::time::Instant;
use ::tarantool::tlua;
use ::tarantool::transaction::transaction;
use ::tarantool::tuple::Decode;
use ::tarantool::vclock::Vclock;
use protobuf::Message as _;
use std::cell::Cell;
use std::cmp::Ordering;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
type RawNode = raft::RawNode<RaftSpaceAccess>;
::tarantool::define_str_enum! {
    pub enum RaftState {
        Follower = "Follower",
        Candidate = "Candidate",
        Leader = "Leader",
        PreCandidate = "PreCandidate",
    }
}

impl RaftState {
    pub fn is_leader(&self) -> bool {
        matches!(self, Self::Leader)
    }
}

impl From<RaftStateRole> for RaftState {
    fn from(role: RaftStateRole) -> Self {
        match role {
            RaftStateRole::Follower => Self::Follower,
            RaftStateRole::Candidate => Self::Candidate,
            RaftStateRole::Leader => Self::Leader,
            RaftStateRole::PreCandidate => Self::PreCandidate,
        }
    }
}

#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    /// `raft_id` of the leader instance
    pub leader_id: Option<RaftId>,
    /// Current term number
    pub term: RaftTerm,
    /// Current raft state
    pub raft_state: RaftState,
    /// Current state of the main loop.
    ///
    /// Set this before yielding from [`NodeImpl::advance`].
    pub main_loop_status: &'static str,
    pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
        if requested_term != self.term {
            return Err(Error::TermMismatch {
                requested: requested_term,
                current: self.term,
            });
        }
        Ok(())
    }
}

/// The heart of `traft` module - the Node.
pub struct Node {
    /// RaftId of the Node.
    //
    // It appears twice in the Node: here and in `status.id`.
    // This is a concious decision.
    // `self.raft_id()` is used in Rust API, and
    // `self.status()` is mostly useful in Lua API.
    node_impl: Rc<Mutex<NodeImpl>>,
    pub(crate) storage: Clusterwide,
    pub(crate) raft_storage: RaftSpaceAccess,
    pub(crate) governor_loop: governor::Loop,
    pub(crate) sentinel_loop: sentinel::Loop,
    status: watch::Receiver<Status>,

    /// Should be locked during join and update instance request
    /// to avoid costly cas conflicts during concurrent requests.
    pub instances_update: Mutex<()>,
impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Node")
            .field("raft_id", &self.raft_id)
            .finish_non_exhaustive()
    }
    /// Initialize the raft node.
    /// **This function yields**
    pub fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> {
        let opts = WorkerOptions {
            raft_msg_handler: stringify_cfunc!(proc_raft_interact),
            call_timeout: MainLoop::TICK.saturating_mul(4),
            ..Default::default()
        };
        let mut pool = ConnectionPool::new(storage.clone(), opts);
        let instance_reachability = Rc::new(RefCell::new(InstanceReachabilityManager::new(
            storage.clone(),
        )));
        pool.instance_reachability = instance_reachability.clone();
        let node_impl = NodeImpl::new(pool.clone(), storage.clone(), raft_storage.clone())?;
        let raft_id = node_impl.raft_id();
        let status = node_impl.status.subscribe();
        let node_impl = Rc::new(Mutex::new(node_impl));

        let node = Node {
            main_loop: MainLoop::start(node_impl.clone()), // yields
            governor_loop: governor::Loop::start(
                pool.clone(),
                status.clone(),
                storage.clone(),
                raft_storage.clone(),
            ),
            sentinel_loop: sentinel::Loop::start(
                status.clone(),
                storage.clone(),
                raft_storage.clone(),
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            node_impl,
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            status,
            instances_update: Mutex::new(()),
        };

        // Wait for the node to enter the main loop
        Ok(node)
    pub fn raft_id(&self) -> RaftId {
    pub fn status(&self) -> Status {
        self.status.get()
    pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {
        self.node_impl.lock()
    }

    /// Wait for the status to be changed.
    /// **This function yields**
    pub fn wait_status(&self) {
        fiber::block_on(self.status.clone().changed()).unwrap();
    /// Returns current applied [`RaftIndex`].
    pub fn get_index(&self) -> RaftIndex {
        self.raft_storage
            .applied()
            .expect("reading from memtx should never fail")
    }

    /// Performs the quorum read operation.
    ///
    /// If works the following way:
    ///
    /// 1. The instance forwards a request (`MsgReadIndex`) to a raft
    ///    leader. In case there's no leader at the moment, the function
    ///    returns `Err(ProposalDropped)`.
    /// 2. Raft leader tracks its `commit_index` and broadcasts a
    ///    heartbeat to followers to make certain that it's still a
    ///    leader.
    /// 3. As soon as the heartbeat is acknowlenged by the quorum, the
    ///    function returns that index.
    /// 4. The instance awaits when the index is applied. If timeout
    ///    expires beforehand, the function returns `Err(Timeout)`.
    ///
    /// Returns current applied [`RaftIndex`].
    ///
    /// **This function yields**
    pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> {
        let deadline = fiber::clock().saturating_add(timeout);

        let notify = self.raw_operation(|node_impl| node_impl.read_index_async())?;
        let index: RaftIndex = fiber::block_on(notify.recv_timeout(timeout))?;

        self.wait_index(index, deadline.duration_since(fiber::clock()))
    }

    /// Waits for [`RaftIndex`] to be applied to the storage locally.
    ///
    /// Returns current applied [`RaftIndex`]. It can be equal to or
    /// greater than the target one. If timeout expires beforehand, the
    /// function returns `Err(Timeout)`.
    ///
    /// **This function yields**
    #[inline]
    pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
        let deadline = fiber::clock().saturating_add(timeout);
        loop {
            let current = self.get_index();
            if current >= target {
                return Ok(current);
            }

            if event::wait_deadline(event::Event::EntryApplied, deadline)?.is_timeout() {
                return Err(Error::Timeout);
            }
        }
    /// Propose an operation and wait for it's result.
    /// **This function yields**
    pub fn propose_and_wait<T: OpResult + Into<Op>>(
        &self,
        op: T,
        timeout: Duration,
        let notify = self.raw_operation(|node_impl| node_impl.propose_async(op))?;
        fiber::block_on(notify.recv_timeout::<T::Result>(timeout))
    /// Become a candidate and wait for a main loop round so that there's a
    /// chance we become the leader.
    /// **This function yields**
    pub fn campaign_and_yield(&self) -> traft::Result<()> {
        self.raw_operation(|node_impl| node_impl.campaign())?;
        // Even though we don't expect a response, we still should let the
        // main_loop do an iteration. Without rescheduling, the Ready state
        // wouldn't be processed, the Status wouldn't be updated, and some
        // assertions may fail (e.g. in `postjoin()` in `main.rs`).
        fiber::reschedule();
        Ok(())
    /// **This function yields**
    pub fn step_and_yield(&self, msg: raft::Message) {
        self.raw_operation(|node_impl| node_impl.step(msg))
            .map_err(|e| tlog!(Error, "{e}"))
            .ok();
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    /// **This function yields**
    pub fn tick_and_yield(&self, n_times: u32) {
        self.raw_operation(|node_impl| node_impl.tick(n_times));
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
        fiber::reschedule();
    /// **This function yields**
    pub fn timeout_now(&self) {
        let raft_id = self.raft_id();
        self.step_and_yield(raft::Message {
            to: raft_id,
            from: raft_id,
            msg_type: raft::MessageType::MsgTimeoutNow,
            ..Default::default()
        })
    /// Only the conf_change_loop on a leader is eligible to call this function.
    ///
    /// **This function yields**
    pub(crate) fn propose_conf_change_and_wait(
        &self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
        let notify =
            self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
        fiber::block_on(notify).unwrap()?;
        Ok(())
    /// Attempt to transfer leadership to a given node and yield.
    ///
    /// **This function yields**
    pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
        self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id));
        fiber::reschedule();
    }

    /// This function **may yield** if `self.node_impl` mutex is acquired.
    fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
        let mut node_impl = self.node_impl.lock();
        let res = f(&mut node_impl);
        drop(node_impl);
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        self.main_loop.wakeup();
    #[inline]
    pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
        self.raft_storage.all_traft_entries()
pub(crate) struct NodeImpl {
    pub notifications: HashMap<LogicalClock, Notifier>,
    joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
    storage: Clusterwide,
    raft_storage: RaftSpaceAccess,
    status: watch::Sender<Status>,
    instance_reachability: Rc<RefCell<InstanceReachabilityManager>>,
impl NodeImpl {
        storage: Clusterwide,
        raft_storage: RaftSpaceAccess,
    ) -> Result<Self, RaftError> {
        let box_err = |e| StorageError::Other(Box::new(e));

        let raft_id: RaftId = raft_storage
            .raft_id()
            .map_err(box_err)?
            .expect("raft_id should be set by the time the node is being initialized");
        let applied: RaftIndex = raft_storage.applied().map_err(box_err)?;
            let gen = raft_storage.gen().unwrap() + 1;
            raft_storage.persist_gen(gen).unwrap();
            LogicalClock::new(raft_id, gen)
        };

        let cfg = raft::Config {
            id: raft_id,
            applied,
            pre_vote: true,
            ..Default::default()
        };

        let raw_node = RawNode::new(&cfg, raft_storage.clone(), &tlog::root())?;
        let (status, _) = watch::channel(Status {
            id: raft_id,
            leader_id: None,
            term: traft::INIT_RAFT_TERM,
            raft_state: RaftState::Follower,
            main_loop_status: "idle",
        Ok(Self {
            raw_node,
            notifications: Default::default(),
            joint_state_latch: KVCell::new(),
            instance_reachability: pool.instance_reachability.clone(),
        })
    }

    fn raft_id(&self) -> RaftId {
        self.raw_node.raft.id
    }

    pub fn read_index_async(&mut self) -> Result<Notify, RaftError> {
        // In some states `raft-rs` ignores the ReadIndex request.
        // Check it preliminary, don't wait for the timeout.
        //
        // See for details:
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
        // - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
        let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
        let term_just_started = // ...
            self.raw_node.raft.state == RaftStateRole::Leader
            && !self.raw_node.raft.commit_to_current_term();

        if leader_doesnt_exist || term_just_started {
            return Err(RaftError::ProposalDropped);
        }

        let (lc, notify) = self.schedule_notification();
        // read_index puts this context into an Entry,
        // so we've got to compose full EntryContext,
        // despite single LogicalClock would be enough
        let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
        self.raw_node.read_index(ctx.to_bytes());
        Ok(notify)
    }

    /// **Doesn't yield**
    #[inline]
    // TODO: rename and document
    pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
    where
        T: Into<Op>,
    {
        let (lc, notify) = self.schedule_notification();
        let ctx = traft::EntryContextNormal::new(lc, op.into());
        self.raw_node.propose(ctx.to_bytes(), vec![])?;
        Ok(notify)
    }

    /// Proposes a raft entry to be appended to the log and returns raft index
    /// at which it is expected to be committed unless it gets rejected.
    ///
    /// **Doesn't yield**
    pub fn propose(&mut self, op: Op) -> Result<RaftIndex, RaftError> {
        self.lc.inc();
        let ctx = traft::EntryContextNormal::new(self.lc, op);
        self.raw_node.propose(ctx.to_bytes(), vec![])?;
        let index = self.raw_node.raft.raft_log.last_index();
        Ok(index)
    }

    pub fn campaign(&mut self) -> Result<(), RaftError> {
        self.raw_node.campaign()
    }

    pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
        if msg.to != self.raft_id() {
            return Ok(());
        }

        // TODO check it's not a MsgPropose with op::Dml for updating _pico_instance.
        // TODO check it's not a MsgPropose with ConfChange.
        self.raw_node.step(msg)
    }

    pub fn tick(&mut self, n_times: u32) {
        for _ in 0..n_times {
            self.raw_node.tick();
        }
    }

    fn propose_conf_change_async(
        &mut self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
    ) -> Result<oneshot::Receiver<Result<(), RaftError>>, RaftError> {
        // In some states proposing a ConfChange is impossible.
        // Check if there's a reason to reject it.

        // Checking leadership is only needed for the
        // correct latch management. It doesn't affect
        // raft correctness. Checking the instance is a
        // leader makes sure the proposed `ConfChange`
        // is appended to the raft log immediately
        // instead of sending `MsgPropose` over the
        // network.
        if self.raw_node.raft.state != RaftStateRole::Leader {
            return Err(RaftError::ConfChangeError("not a leader".into()));
        }
        if term != self.raw_node.raft.term {
            return Err(RaftError::ConfChangeError("raft term mismatch".into()));
        }
        // Without this check the node would silently ignore the conf change.
        // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
        if self.raw_node.raft.has_pending_conf() {
            return Err(RaftError::ConfChangeError(
                "already has pending confchange".into(),
            ));
        }

        let prev_index = self.raw_node.raft.raft_log.last_index();
        self.raw_node.propose_conf_change(vec![], conf_change)?;

        // Ensure the ConfChange was actually appended to the log.
        // Otherwise it's a problem: current instance isn't actually a
        // leader (which is impossible in theory, but we're not sure in
        // practice) and sent the message to the raft network. It may
        // lead to an inconsistency.
        let last_index = self.raw_node.raft.raft_log.last_index();
        assert_eq!(last_index, prev_index + 1);

        if !self.joint_state_latch.is_empty() {
            warn_or_panic!("joint state latch is locked");
        }

        let (tx, rx) = oneshot::channel();
        self.joint_state_latch.insert(last_index, tx);
        event::broadcast(Event::JointStateEnter);

    /// Is called during a transaction
    fn handle_committed_entries(
        &mut self,
        entries: &[raft::Entry],
    ) -> traft::Result<()> {
        let mut entries = entries.iter().peekable();

        while let Some(&entry) = entries.peek() {
            let entry = match traft::Entry::try_from(entry) {
                Ok(v) => v,
                Err(e) => {
                    tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
            let mut apply_entry_result = EntryApplied;
            transaction(|| -> tarantool::Result<()> {
                self.main_loop_status("handling committed entries");

                let entry_index = entry.index;
                match entry.entry_type {
                    raft::EntryType::EntryNormal => {
                        apply_entry_result =
                            self.handle_committed_normal_entry(entry, wake_governor, expelled);
                        if apply_entry_result != EntryApplied {
                            return Ok(());
                        }
                    }
                    raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
                        self.handle_committed_conf_change(entry)
                    }
                let res = self.raft_storage.persist_applied(entry_index);
                event::broadcast(Event::EntryApplied);
                if let Err(e) = res {
                    tlog!(
                        Error,
                        "error persisting applied index: {e}";
                        "index" => entry_index
                    );
                }

                Ok(())
            })?;

            match apply_entry_result {
                SleepAndRetry => {
                    self.main_loop_status("blocked by raft entry");
                    continue;
                }
                EntryApplied => {
                    // Actually advance the iterator.
                    let _ = entries.next();
    }

    /// Is called during a transaction
    fn handle_committed_normal_entry(
        &mut self,
        entry: traft::Entry,
        assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
        let lc = entry.lc();
        let index = entry.index;
        let op = entry.into_op().unwrap_or(Op::Nop);
        tlog!(Debug, "applying entry: {op}"; "index" => index);
                let space = op.space();
                if space == ClusterwideSpace::Property.id()
                    || space == ClusterwideSpace::Replicaset.id()
                } else if space == ClusterwideSpace::Instance.id() {
                    *wake_governor = true;
                    let instance = match op {
                        Dml::Insert { tuple, .. } => Some(tuple),
                        Dml::Replace { tuple, .. } => Some(tuple),
                        Dml::Update { .. } => None,
                        Dml::Delete { .. } => None,
                    };
                    if let Some(instance) = instance {
                        let instance: Instance = rmp_serde::from_slice(instance.as_ref())
                            .expect("should be a valid instance tuple");
                        if has_grades!(instance, Expelled -> *)
                            && instance.raft_id == self.raft_id()
                        {
                            // cannot exit during a transaction
                            *expelled = true;
                        }
                    }
            Op::DdlPrepare { .. } => {
                *wake_governor = true;
            }
        let storage_properties = &self.storage.properties;

        let mut result = Box::new(()) as Box<dyn AnyWithTypeName>;
        match op {
            Op::Nop => {}
            Op::Dml(op) => {
                let res = match &op {
                    Dml::Insert { table, tuple } => self.storage.insert(*table, tuple).map(Some),
                    Dml::Replace { table, tuple } => self.storage.replace(*table, tuple).map(Some),
                    Dml::Update { table, key, ops } => self.storage.update(*table, key, ops),
                    Dml::Delete { table, key } => self.storage.delete(*table, key),
                };
                result = Box::new(res) as _;
            }
            Op::DdlPrepare {
                ddl,
                schema_version,
            } => {
                self.apply_op_ddl_prepare(ddl, schema_version)
                    .expect("storage should not fail");
                let v_local = local_schema_version().expect("storage should not fail");
                let v_pending = storage_properties
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");
                let ddl = storage_properties
                    .pending_schema_change()
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");

                // This instance is catching up to the cluster.
                if v_local < v_pending {
                    if self.is_readonly() {
                        return SleepAndRetry;
                        // Master applies schema change at this point.
                        let res = rpc::ddl_apply::apply_schema_change(
                        );
                        match res {
                            Err(rpc::ddl_apply::Error::Other(err)) => {
                                panic!("storage should not fail, but failed with: {err}")
                            }
                            Err(rpc::ddl_apply::Error::Aborted(reason)) => {
                                tlog!(Warning, "failed applying committed ddl operation: {reason}";
                                    "ddl" => ?ddl,
                                );
                                return SleepAndRetry;
                            }
                // Update pico metadata.
                match ddl {
                    Ddl::CreateSpace { id, .. } => {
                        ddl_meta_space_update_operable(&self.storage, id, true)
                            .expect("storage shouldn't fail");
                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                    _ => {
                        todo!()
                    }
                }

                storage_properties
                    .delete(PropertyName::PendingSchemaChange)
                    .expect("storage should not fail");
                storage_properties
                    .delete(PropertyName::PendingSchemaVersion)
                    .expect("storage should not fail");
                    .put(PropertyName::GlobalSchemaVersion, &v_pending)
                    .expect("storage should not fail");
                let v_local = local_schema_version().expect("storage should not fail");
                let v_pending: u64 = storage_properties
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");
                let ddl = storage_properties
                    .pending_schema_change()
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");
                // This condition means, schema versions must always increase
                // even after an DdlAbort
                if v_local == v_pending {
                    if self.is_readonly() {
                        return SleepAndRetry;
                    } else {
                        let v_global = storage_properties
                            .global_schema_version()
                            .expect("storage should not fail");
                        ddl_abort_on_master(&ddl, v_global).expect("storage should not fail");
                // Update pico metadata.
                match ddl {
                    Ddl::CreateSpace { id, .. } => {
                        ddl_meta_drop_space(&self.storage, id).expect("storage shouldn't fail");
                        ddl_meta_space_update_operable(&self.storage, id, true)
                            .expect("storage shouldn't fail");
                    _ => {
                        todo!()
                    }
                }

                storage_properties
                    .delete(PropertyName::PendingSchemaChange)
                    .expect("storage should not fail");
                storage_properties
                    .delete(PropertyName::PendingSchemaVersion)
                    .expect("storage should not fail");
            Op::Acl(acl) => {
                let v_local = local_schema_version().expect("storage shoudl not fail");
                let v_pending = acl.schema_version();
Georgy Moshkin's avatar
Georgy Moshkin committed
                if v_local < v_pending {
                    if self.is_readonly() {
                        // Wait for tarantool replication with master to progress.
                        return SleepAndRetry;
                    } else {
                        match &acl {
                            Acl::CreateUser { user_def } => {
                                acl::on_master_create_user(user_def)
                                    .expect("creating user shouldn't fail");
                            }
Georgy Moshkin's avatar
Georgy Moshkin committed
                            Acl::ChangeAuth { user_id, auth, .. } => {
                                acl::on_master_change_user_auth(*user_id, auth)
Georgy Moshkin's avatar
Georgy Moshkin committed
                                    .expect("changing user auth shouldn't fail");
                            }
                            Acl::DropUser { user_id, .. } => {
                                acl::on_master_drop_user(*user_id)
Georgy Moshkin's avatar
Georgy Moshkin committed
                                    .expect("droping user shouldn't fail");
                            Acl::CreateRole { role_def } => {
                                acl::on_master_create_role(role_def)
                                    .expect("creating role shouldn't fail");
                            }
                            Acl::DropRole { role_id, .. } => {
                                acl::on_master_drop_role(*role_id)
                                    .expect("droping role shouldn't fail");
                            }
                            Acl::GrantPrivilege { priv_def } => {
                                acl::on_master_grant_privilege(priv_def)
                                    .expect("granting a privilege shouldn't fail");
                            }
                            Acl::RevokePrivilege { priv_def } => {
                                acl::on_master_revoke_privilege(priv_def)
                                    .expect("revoking a privilege shouldn't fail");
                            }
                        }
                        set_local_schema_version(v_pending).expect("storage should not fail");
                match &acl {
                    Acl::CreateUser { user_def } => {
                        acl::global_create_user(&self.storage, user_def)
                            .expect("persisting a user definition shouldn't fail");
                    }
Georgy Moshkin's avatar
Georgy Moshkin committed
                    Acl::ChangeAuth { user_id, auth, .. } => {
                        acl::global_change_user_auth(&self.storage, *user_id, auth)
Georgy Moshkin's avatar
Georgy Moshkin committed
                            .expect("changing user definition shouldn't fail");
                    }
                    Acl::DropUser { user_id, .. } => {
                        acl::global_drop_user(&self.storage, *user_id)
                            .expect("droping a user definition shouldn't fail");
                    }
                    Acl::CreateRole { role_def } => {
                        acl::global_create_role(&self.storage, role_def)
                            .expect("persisting a role definition shouldn't fail");
                    }
                    Acl::DropRole { role_id, .. } => {
                        acl::global_drop_role(&self.storage, *role_id)
                            .expect("droping a role definition shouldn't fail");
                    }
                    Acl::GrantPrivilege { priv_def } => {
                        acl::global_grant_privilege(&self.storage, priv_def)
                            .expect("persiting a privilege definition shouldn't fail");
                    }
                    Acl::RevokePrivilege { priv_def } => {
                        acl::global_revoke_privilege(&self.storage, priv_def)
                            .expect("removing a privilege definition shouldn't fail");
Georgy Moshkin's avatar
Georgy Moshkin committed
                    }
                }

                storage_properties
                    .put(PropertyName::GlobalSchemaVersion, &v_pending)
                    .expect("storage should not fail");
Georgy Moshkin's avatar
Georgy Moshkin committed
                storage_properties
                    .put(PropertyName::NextSchemaVersion, &(v_pending + 1))
                    .expect("storage should not fail");

        if let Some(lc) = &lc {
            if let Some(notify) = self.notifications.remove(lc) {
                notify.notify_ok_any(result);
            }
        }

        if let Some(notify) = self.joint_state_latch.take_or_keep(&index) {
            // It was expected to be a ConfChange entry, but it's
            // normal. Raft must have overriden it, or there was
            // a re-election.
            let e = RaftError::ConfChangeError("rolled back".into());
            let _ = notify.send(Err(e));
            event::broadcast(Event::JointStateDrop);
        }
    fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> {
        debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });

        match ddl.clone() {
            Ddl::CreateSpace {
                id,
                name,
                use ::tarantool::util::NumOrStr::*;

                let mut last_pk_part_index = 0;
                for pk_part in &mut primary_key {
                    let (index, field) = match &pk_part.field {
                        Num(index) => {
                            if *index as usize >= format.len() {
                                // Ddl prepare operations should be verified before being proposed,
                                // so this shouldn't ever happen. But ignoring this is safe anyway,
                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
                                tlog!(
                                    Warning,
                                    "invalid primary key part: field index {index} is out of bound"
                                );
                                continue;
                            }
                            (*index, &format[*index as usize])
                        }
                        Str(name) => {
                            let field_index = format.iter().zip(0..).find(|(f, _)| f.name == *name);
                            let Some((field, index)) = field_index else {
                                // Ddl prepare operations should be verified before being proposed,
                                // so this shouldn't ever happen. But ignoring this is safe anyway,
                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
Georgy Moshkin's avatar
Georgy Moshkin committed
                                tlog!(
                                    Warning,
                                    "invalid primary key part: field '{name}' not found"
                                );
                                continue;
                            };
                            // We store all index parts as field indexes.
                            pk_part.field = Num(index);
                            (index, field)
                        }
                    };
                    let Some(field_type) =
Georgy Moshkin's avatar
Georgy Moshkin committed
                        crate::schema::try_space_field_type_to_index_field_type(field.field_type)
                    else {
                        // Ddl prepare operations should be verified before being proposed,
                        // so this shouldn't ever happen. But ignoring this is safe anyway,
                        // because proc_apply_schema_change will catch the error and ddl will be aborted.
Georgy Moshkin's avatar
Georgy Moshkin committed
                        tlog!(
                            Warning,
                            "invalid primary key part: field type {} cannot be part of an index",
                            field.field_type
                        );
                        continue;
                    };
                    // We overwrite the one provided in the request because
                    // there's no reason for it to be there, we know the type
                    // right here.
                    pk_part.r#type = Some(field_type);
                    pk_part.is_nullable = Some(field.is_nullable);
                    last_pk_part_index = last_pk_part_index.max(index);
                }
                    id: 0,
                    name: "primary_key".into(),
                    space_id: id,
                    schema_version,
                    parts: primary_key,
                    operable: false,
                    // TODO: support other cases
                    unique: true,
                let res = self.storage.indexes.insert(&primary_key_def);
                if let Err(e) = res {
                    // Ignore the error for now, let governor deal with it.
                    tlog!(
                        Warning,