Skip to content
Snippets Groups Projects 93.4 KiB
Newer Older
//! This module encapsulates most of the application-specific logic.
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.

use crate::access_control::user_by_id;
use crate::config::PicodataConfig;
use crate::has_states;
use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::proc_name;
use crate::reachability::instance_reachability_manager;
use crate::reachability::InstanceReachabilityManagerRef;
use crate::rpc::snapshot::proc_raft_snapshot_next_chunk;
use crate::schema::RoutineDef;
use crate::schema::RoutineKind;
use crate::schema::SchemaObjectType;
use crate::schema::{Distribution, IndexDef, IndexOption, TableDef};
use crate::storage::acl;
use crate::storage::ddl_meta_drop_routine;
use crate::storage::ddl_meta_drop_space;
use crate::storage::space_by_id;
use crate::storage::SnapshotData;
use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable};
Georgy Moshkin's avatar
Georgy Moshkin committed
use crate::storage::{local_schema_version, set_local_schema_version};
use crate::storage::{Clusterwide, ClusterwideTable, PropertyName};
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::network::WorkerOptions;
use crate::traft::op::PluginRaftOp;
use crate::traft::op::{Acl, Ddl, Dml, Op};
use crate::traft::ConnectionPool;
use crate::traft::LogicalClock;
use crate::traft::RaftEntryId;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::unwrap_ok_or;
use crate::unwrap_some_or;
use crate::warn_or_panic;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::BoxError;
use ::tarantool::error::TarantoolErrorCode;
use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::index::FieldType as IFT;
use ::tarantool::index::{IndexType, Part};
use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::time::Instant;
use ::tarantool::tlua;
use ::tarantool::transaction::transaction;
use ::tarantool::tuple::{Decode, Tuple};
use protobuf::Message as _;
godzie44's avatar
godzie44 committed
use crate::plugin::manager::PluginManager;
use crate::plugin::PluginAsyncEvent;
use std::cell::Cell;
use std::collections::HashMap;
use std::convert::TryFrom;
use std::ops::ControlFlow;
use std::rc::Rc;
use std::time::Duration;
type RawNode = raft::RawNode<RaftSpaceAccess>;
::tarantool::define_str_enum! {
    pub enum RaftState {
        Follower = "Follower",
        Candidate = "Candidate",
        Leader = "Leader",
        PreCandidate = "PreCandidate",

impl RaftState {
    /// Returns `true` if this state is [`RaftState::Leader`].
    pub fn is_leader(&self) -> bool {
        matches!(self, Self::Leader)

/// Converts a `::raft::StateRole` into the [`RaftState`] variant of the same name.
impl From<RaftStateRole> for RaftState {
    fn from(role: RaftStateRole) -> Self {
        // One-to-one mapping: every role has an identically named counterpart.
        match role {
            RaftStateRole::Follower => Self::Follower,
            RaftStateRole::Candidate => Self::Candidate,
            RaftStateRole::Leader => Self::Leader,
            RaftStateRole::PreCandidate => Self::PreCandidate,

#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
    /// `raft_id` of the current instance
    /// `raft_id` of the leader instance
    pub leader_id: Option<RaftId>,
    /// Current term number
    pub term: RaftTerm,
    /// Current raft state
    pub raft_state: RaftState,
    /// Current state of the main loop.
    /// Set this before yielding from [`NodeImpl::advance`].
    pub main_loop_status: &'static str,
    /// Checks `requested_term` against the term recorded in this status.
    ///
    /// # Errors
    /// Returns [`Error::TermMismatch`] carrying both the requested and the
    /// current term if the two differ.
    pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
        if requested_term != self.term {
            return Err(Error::TermMismatch {
                requested: requested_term,
                current: self.term,

/// The heart of `traft` module - the Node.
pub struct Node {
    /// RaftId of the Node.
    // It appears twice in the Node: here and in ``.
    // This is a conscious decision.
    // `self.raft_id()` is used in Rust API, and
    // `self.status()` is mostly useful in Lua API.
    node_impl: Rc<Mutex<NodeImpl>>,
    pub(crate) storage: Clusterwide,
    pub(crate) raft_storage: RaftSpaceAccess,
    pub(crate) governor_loop: governor::Loop,
    pub(crate) sentinel_loop: sentinel::Loop,
    pub(crate) pool: Rc<ConnectionPool>,
    status: watch::Receiver<Status>,
    applied: watch::Receiver<RaftIndex>,

    /// Should be locked during join and update instance request
    /// to avoid costly cas conflicts during concurrent requests.
    pub instances_update: Mutex<()>,
godzie44's avatar
godzie44 committed
    /// Manage plugins loaded or must be loaded at this node.
    pub plugin_manager: Rc<PluginManager>,
impl std::fmt::Debug for Node {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            .field("raft_id", &self.raft_id)
static mut RAFT_NODE: Option<Box<Node>> = None;

impl Node {
    /// Initialize the global raft node singleton. Returns a reference to it.
    /// Returns an error in case of storage failure.
    /// **This function yields**
    pub fn init(
        storage: Clusterwide,
        raft_storage: RaftSpaceAccess,
    ) -> Result<&'static Self, Error> {
        if unsafe { RAFT_NODE.is_some() } {
            return Err(Error::other("raft node is already initialized"));

            raft_msg_handler: proc_name!(proc_raft_interact),
            call_timeout: MainLoop::TICK.saturating_mul(4),
        let mut pool = ConnectionPool::new(storage.clone(), opts);
        let instance_reachability = instance_reachability_manager(storage.clone());
        pool.instance_reachability = instance_reachability.clone();
        let plugin_manager = Rc::new(PluginManager::new(storage.clone()));
godzie44's avatar
godzie44 committed
        let node_impl = NodeImpl::new(
        let raft_id = node_impl.raft_id();
        let status = node_impl.status.subscribe();
        let applied = node_impl.applied.subscribe();
        let node_impl = Rc::new(Mutex::new(node_impl));
        // Raft main loop accesses the global node reference,
        // so it must be initialized before the main loop starts.
        let guard = crate::util::NoYieldsGuard::new();

        let node = Node {
            main_loop: MainLoop::start(node_impl.clone()),
            governor_loop: governor::Loop::start(
            sentinel_loop: sentinel::Loop::start(
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            instances_update: Mutex::new(()),
godzie44's avatar
godzie44 committed
        unsafe { RAFT_NODE = Some(Box::new(node)) };
        let node = global().expect("just initialized it");


        // Wait for the node to enter the main loop
    pub fn raft_id(&self) -> RaftId {
    pub fn status(&self) -> Status {
    pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {

    /// Wait for the status to be changed.
    /// **This function yields**
    pub fn wait_status(&self) {
    /// Returns current applied [`RaftIndex`].
    pub fn get_index(&self) -> RaftIndex {

    /// Performs the quorum read operation.
    ///
    /// It works the following way:
    /// 1. The instance forwards a request (`MsgReadIndex`) to a raft
    ///    leader. In case there's no leader at the moment, the function
    ///    returns `Err(ProposalDropped)`.
    /// 2. Raft leader tracks its `commit_index` and broadcasts a
    ///    heartbeat to followers to make certain that it's still a
    ///    leader.
    /// 3. As soon as the heartbeat is acknowledged by the quorum, the
    ///    function returns that index.
    /// 4. The instance awaits when the index is applied. If timeout
    ///    expires beforehand, the function returns `Err(Timeout)`.
    ///
    /// Returns current applied [`RaftIndex`].
    ///
    /// **This function yields**
    pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> {
        // Fix the deadline up front so that the final `wait_index` call only
        // gets whatever is left of `timeout`.
        let deadline = fiber::clock().saturating_add(timeout);
        let rx = self.raw_operation(|node_impl| node_impl.read_index_async())?;
        // Any error while awaiting the read index is reported as `Error::Timeout`.
        // NOTE(review): this wait is given the full `timeout`, not the time
        // remaining until `deadline`, so time spent in `raw_operation` is not
        // subtracted here -- confirm this is intended.
        let index: RaftIndex = fiber::block_on(rx.timeout(timeout)).map_err(|_| Error::Timeout)?;
        self.wait_index(index, deadline.duration_since(fiber::clock()))

    /// Waits for [`RaftIndex`] to be applied to the storage locally.
    /// Returns current applied [`RaftIndex`]. It can be equal to or
    /// greater than the target one. If timeout expires beforehand, the
    /// function returns `Err(Timeout)`.
    /// **This function yields**
    // TODO: this should also take a term and return an error if the term
    // changes, because it means that leader has changed and the entry got
    // rolled back.
    pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
Georgy Moshkin's avatar
Georgy Moshkin committed
        tlog!(Debug, "waiting for applied index {target}");
        let mut applied = self.applied.clone();
        let deadline = fiber::clock().saturating_add(timeout);
        fiber::block_on(async {
            loop {
                let current = self.get_index();
                if current >= target {
                        "done waiting for applied index {target}, current: {current}"
                    return Ok(current);
                let timeout = deadline.duration_since(fiber::clock());
                let res = applied.changed().timeout(timeout).await;
                if let Err(TimeoutError::Expired) = res {
                        "failed waiting for applied index {target}: timeout, current: {current}"
                    return Err(Error::Timeout);
    /// Propose an operation and wait for its result.
    ///
    /// Proposes `op` via [`NodeImpl::propose_async`] and then calls
    /// [`Node::wait_index`] with the index of the proposed entry and the
    /// given `timeout`. Errors from either step are propagated to the caller.
    ///
    /// NOTE: only the index is awaited; the term of the proposed entry is not
    /// verified yet (see the TODO below).
    ///
    /// **This function yields**
    pub fn propose_and_wait(&self, op: impl Into<Op>, timeout: Duration) -> traft::Result<()> {
        let entry_id = self.raw_operation(|node_impl| node_impl.propose_async(op))?;
        self.wait_index(entry_id.index, timeout)?;
        // TODO: check entry_id.term
    /// Become a candidate and wait for a main loop round so that there's a
    /// chance we become the leader.
    /// **This function yields**
    pub fn campaign_and_yield(&self) -> traft::Result<()> {
        self.raw_operation(|node_impl| node_impl.campaign())?;
        // Even though we don't expect a response, we still should let the
        // main_loop do an iteration. Without rescheduling, the Ready state
        // wouldn't be processed, the Status wouldn't be updated, and some
        // assertions may fail (e.g. in `postjoin()` in ``).
    /// **This function yields**
    pub fn step_and_yield(&self, msg: raft::Message) {
        self.raw_operation(|node_impl| node_impl.step(msg))
            .map_err(|e| tlog!(Error, "{e}"))
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
    /// **This function yields**
    pub fn tick_and_yield(&self, n_times: u32) {
        self.raw_operation(|node_impl| node_impl.tick(n_times));
        // even though we don't expect a response, we still should let the
        // main_loop do an iteration
    /// Only the conf_change_loop on a leader is eligible to call this function.
    /// **This function yields**
    pub(crate) fn propose_conf_change_and_wait(
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
        let notify =
            self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
    /// Attempt to transfer leadership to a given node and yield.
    /// **This function yields**
    pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
        self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id));

    /// Locks `self.node_impl` and runs `f` with mutable access to the
    /// [`NodeImpl`].
    ///
    /// This function **may yield** if `self.node_impl` mutex is acquired.
    fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
        let mut node_impl = self.node_impl.lock();
        let res = f(&mut node_impl);
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
godzie44's avatar
godzie44 committed

    pub fn is_readonly(&self) -> bool {
        let is_ro: bool = crate::tarantool::eval("return")
            .expect("checking read-onlyness should never fail");
godzie44's avatar
godzie44 committed
pub(crate) struct NodeImpl {
    pub read_state_wakers: HashMap<LogicalClock, oneshot::Sender<RaftIndex>>,
    joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
    storage: Clusterwide,
    raft_storage: RaftSpaceAccess,
    status: watch::Sender<Status>,
    applied: watch::Sender<RaftIndex>,
    instance_reachability: InstanceReachabilityManagerRef,
godzie44's avatar
godzie44 committed
    plugin_manager: Rc<PluginManager>,
impl NodeImpl {
        storage: Clusterwide,
        raft_storage: RaftSpaceAccess,
godzie44's avatar
godzie44 committed
        plugin_manager: Rc<PluginManager>,
    ) -> Result<Self, RaftError> {
        let box_err = |e| StorageError::Other(Box::new(e));

        let raft_id: RaftId = raft_storage
            .expect("raft_id should be set by the time the node is being initialized");
        let applied: RaftIndex = raft_storage.applied().map_err(box_err)?;
            let gen = raft_storage.gen().unwrap() + 1;
            LogicalClock::new(raft_id, gen)

        let cfg = raft::Config {
            id: raft_id,
            pre_vote: true,
            // XXX: this value is pretty random, we should really do some
            // testing to determine the best value for it.
            max_size_per_msg: 64,
        let raw_node = RawNode::new(&cfg, raft_storage.clone(), tlog::root())?;
        let (status, _) = watch::channel(Status {
            id: raft_id,
            leader_id: None,
            term: traft::INIT_RAFT_TERM,
            raft_state: RaftState::Follower,
            main_loop_status: "idle",
        let (applied, _) = watch::channel(applied);
            read_state_wakers: Default::default(),
            joint_state_latch: KVCell::new(),
            instance_reachability: pool.instance_reachability.clone(),
godzie44's avatar
godzie44 committed

    fn raft_id(&self) -> RaftId {

    pub fn read_index_async(&mut self) -> Result<oneshot::Receiver<RaftIndex>, RaftError> {
        // In some states `raft-rs` ignores the ReadIndex request.
        // Check for this up front instead of waiting for the timeout.
        // See for details:
        // - <>
        // - <>
        let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
        let term_just_started = // ...
            self.raw_node.raft.state == RaftStateRole::Leader
godzie44's avatar
godzie44 committed
                && !self.raw_node.raft.commit_to_current_term();

        if leader_doesnt_exist || term_just_started {
            return Err(RaftError::ProposalDropped);

        let (lc, rx) = self.schedule_read_state_waker();
        // read_index puts this context into an Entry,
        // so we've got to compose full EntryContext,
        // despite single LogicalClock would be enough
        let ctx = traft::ReadStateContext { lc };
    /// **Doesn't yield**
    /// Returns id of the proposed entry, which can be used to await its
    /// application.
    /// Returns an error if current instance is not a raft leader, because
    /// followers should propose raft log entries via [`proc_cas`].
    /// NOTE: the proposed entry may still be dropped, and the entry at that
    /// index may be some other one. It's the caller's responsibility to verify
    /// which entry got committed.
    /// [`proc_cas`]: crate::cas::proc_cas
    pub fn propose_async<T>(&mut self, op: T) -> Result<RaftEntryId, Error>
        T: Into<Op>,
        if self.raw_node.raft.state != RaftStateRole::Leader {
            return Err(Error::NotALeader);

        let index_before = self.raw_node.raft.raft_log.last_index();
        let term = self.raw_node.raft.term;

        let context = traft::EntryContext::Op(op.into());

        // Check resulting raft log entry does not exceed the maximum tuple size limit.
        let entry = context.to_raft_entry();
        let tuple_size = traft::Entry::tuple_size(index_before + 1, term, &[], &entry.context);
        if tuple_size > PicodataConfig::max_tuple_size() {
            return Err(BoxError::new(
                format!("tuple size {tuple_size} exceeds the allowed limit"),
        // Copy-pasted from `raft::raw_node::RawNode::propose`
        let mut m = raft::Message::default();
        m.from =;

        let index = self.raw_node.raft.raft_log.last_index();
        debug_assert!(index == index_before + 1);

        let entry_id = RaftEntryId { term, index };

    pub fn campaign(&mut self) -> Result<(), RaftError> {

    pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
        if != self.raft_id() {
            return Ok(());

        // TODO check it's not a MsgPropose with op::Dml for updating _pico_instance.
        // TODO check it's not a MsgPropose with ConfChange.

    pub fn tick(&mut self, n_times: u32) {
        for _ in 0..n_times {

    /// Proposes a `ConfChangeV2` on the local raft node and registers a
    /// oneshot sender in `joint_state_latch` under the index of the newly
    /// appended log entry.
    ///
    /// `term` is the raft term the caller believes to be current; the
    /// proposal is rejected if it differs from the node's actual term.
    ///
    /// # Errors
    /// Returns [`RaftError::ConfChangeError`] if this instance is not the
    /// raft leader, if `term` doesn't match the current raft term, or if
    /// there is already a pending conf change. Errors from
    /// `propose_conf_change` are propagated as is.
    ///
    /// # Panics
    /// Panics if the last index of the raft log did not grow by exactly one
    /// as a result of the proposal.
    // NOTE(review): presumably the receiver half of the oneshot channel is
    // what gets returned on success -- confirm against the end of the function.
    fn propose_conf_change_async(
        &mut self,
        term: RaftTerm,
        conf_change: raft::ConfChangeV2,
    ) -> Result<oneshot::Receiver<Result<(), RaftError>>, RaftError> {
        // In some states proposing a ConfChange is impossible.
        // Check if there's a reason to reject it.

        // Checking leadership is only needed for the
        // correct latch management. It doesn't affect
        // raft correctness. Checking the instance is a
        // leader makes sure the proposed `ConfChange`
        // is appended to the raft log immediately
        // instead of sending `MsgPropose` over the
        // network.
        if self.raw_node.raft.state != RaftStateRole::Leader {
            return Err(RaftError::ConfChangeError("not a leader".into()));
        if term != self.raw_node.raft.term {
            return Err(RaftError::ConfChangeError("raft term mismatch".into()));
        // Without this check the node would silently ignore the conf change.
        // See
        if self.raw_node.raft.has_pending_conf() {
            return Err(RaftError::ConfChangeError(
                "already has pending confchange".into(),

        // Remember the log position so we can verify below that the proposal
        // was appended locally.
        let prev_index = self.raw_node.raft.raft_log.last_index();
        self.raw_node.propose_conf_change(vec![], conf_change)?;

        // Ensure the ConfChange was actually appended to the log.
        // Otherwise it's a problem: current instance isn't actually a
        // leader (which is impossible in theory, but we're not sure in
        // practice) and sent the message to the raft network. It may
        // lead to an inconsistency.
        let last_index = self.raw_node.raft.raft_log.last_index();
        assert_eq!(last_index, prev_index + 1);

        // The latch is expected to be free at this point; a latch that is
        // still held is reported via `warn_or_panic!`.
        if !self.joint_state_latch.is_empty() {
            warn_or_panic!("joint state latch is locked");

        // Register the sender under the index of the appended entry.
        let (tx, rx) = oneshot::channel();
        self.joint_state_latch.insert(last_index, tx);

    fn handle_committed_entries(
        &mut self,
        entries: &[raft::Entry],
    ) -> traft::Result<()> {
        let mut entries = entries.iter().peekable();

        while let Some(&entry) = entries.peek() {
            let entry = match traft::Entry::try_from(entry) {
                Ok(v) => v,
                Err(e) => {
                    tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
            let mut apply_entry_result = EntryApplied;
            let mut new_applied = None;
            transaction(|| -> tarantool::Result<()> {
                self.main_loop_status("handling committed entries");

                let entry_index = entry.index;
                match entry.entry_type {
                    raft::EntryType::EntryNormal => {
                        apply_entry_result = self.handle_committed_normal_entry(entry, expelled);
                        if apply_entry_result != EntryApplied {
                            return Ok(());
                    raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
                let res = self.raft_storage.persist_applied(entry_index);
                if let Err(e) = res {
                        "error persisting applied index: {e}";
                        "index" => entry_index
                new_applied = Some(entry_index);
            if let Some(new_applied) = new_applied {
                    .expect("applied shouldn't ever be borrowed across yields");

            match apply_entry_result {
                SleepAndRetry => {
                    self.main_loop_status("blocked by raft entry");
                EntryApplied => {
                    // Actually advance the iterator.
                    let _ =;
    fn wake_governor_if_needed(&self, op: &Op) {
        let wake_governor = match &op {
            Op::Dml(op) => dml_is_governor_wakeup_worthy(op),
            Op::BatchDml { ops } => ops.iter().any(dml_is_governor_wakeup_worthy),
            Op::DdlPrepare { .. } => true,
            _ => false,

        fn dml_is_governor_wakeup_worthy(op: &Dml) -> bool {
                    | ClusterwideTable::Replicaset
                    | ClusterwideTable::Instance)

        // NOTE: this may be premature, because the dml may fail to apply and/or
        // the transaction may be rolled back, but we ignore this for the sake
        // of simplicity, as nothing bad can happen if governor makes another
        // idle iteration.
        if wake_governor {
            let res = global()
                .expect("node must be initialized by this point")
            if let Err(e) = res {
                tlog!(Warning, "failed waking up governor: {e}");

    fn handle_dml_entry(&self, op: &Dml, expelled: &mut bool) {
        let space =;

        // In order to implement the audit log events, we have to compare
        // tuples from certain system spaces before and after a DML operation.
        // In theory, we could move this code to `do_dml`, but in practice we
        // cannot do that just yet, because we aren't allowed to universally call
        // `extract_key` for arbitrary tuple/space pairs due to insufficient validity
        // checks on its side -- it might just crash for user input.
        // Remember, we're not the only ones using CaS; users may call it for their
        // own spaces, thus emitting unrestricted (and unsafe) DML records.
        // TODO: merge this into `do_dml` once `box_tuple_extract_key` is fixed.
        let old = match space {
            Ok(s @ (ClusterwideTable::Property | ClusterwideTable::Instance)) => {
                let s = space_by_id("system space must exist");
                match &op {
                    // There may be no previous version for inserts.
                    Dml::Insert { .. } => Ok(None),
                    Dml::Update { key, .. } => s.get(key),
                    Dml::Delete { key, .. } => s.get(key),
                    Dml::Replace { tuple, .. } => {
                        let tuple = Tuple::from(tuple);
                        let index = s.primary_key();
                        // Safety: safe as long as `tuple` has the correct format
                        // for the `index`, which should be checked via cas before
                        // this log entry is committed.
                        // TODO: rewrite using the safe `KeyDef::extract_key` alternative
                        let key = unsafe { index.extract_key(tuple) };
            _ => Ok(None),

        // Perform DML and combine both tuple versions into a pair.
        let res = old.and_then(|old| {
            let new =;
            Ok((old, new))

        let initiator = op.initiator();
        let initiator_def = user_by_id(initiator).expect("user must exist");

        match &res {
            Err(e) => {
                tlog!(Error, "clusterwide dml failed: {e}");
            // Handle insert, replace, update in _pico_instance
            Ok((old, Some(new))) if space == Ok(ClusterwideTable::Instance) => {
                // Dml::Delete mandates that new tuple is None.
                assert!(!matches!(op, Dml::Delete { .. }));

                let old: Option<Instance> =
                    old.as_ref().map(|x| x.decode().expect("must be Instance"));

                // FIXME: we do this prematurely, because the
                // transaction may still be rolled back for some reason.
                let new: Instance = new
                    .expect("tuple already passed format verification");

                // Check if we're handling a "new node joined" event:
                // * Either there's no tuple for this node in the storage;
                // * Or its raft id has changed, meaning it's no longer the same node.
                // WARN: this condition will not pass on the joining instance
                // as it preemptively puts itself into `_pico_instance` table.
                // Locally it's logged in src/
                if old.as_ref().map(|x| x.raft_id) != Some(new.raft_id) {
                    let instance_id = &new.instance_id;
                        message: "a new instance `{instance_id}` joined the cluster",
                        title: "join_instance",
                        severity: Low,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        initiator: &,
                        message: "local database created on `{instance_id}`",
                        title: "create_local_db",
                        severity: Low,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        initiator: &,
                if old.as_ref().map(|x| x.current_state) != Some(new.current_state) {
                    let instance_id = &new.instance_id;
                    let state = &new.current_state;
                        message: "current state of instance `{instance_id}` changed to {state}",
                        title: "change_current_state",
                        severity: Medium,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        new_state: %state,
                        initiator: &,

                if old.as_ref().map(|x| x.target_state) != Some(new.target_state) {
                    let instance_id = &new.instance_id;
                    let state = &new.target_state;
                        message: "target state of instance `{instance_id}` changed to {state}",
                        title: "change_target_state",
                        severity: Low,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        new_state: %state,
                        initiator: &,

                if has_states!(new, Expelled -> *) {
                    let instance_id = &new.instance_id;
                        message: "instance `{instance_id}` was expelled from the cluster",
                        title: "expel_instance",
                        severity: Low,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        initiator: &,
                        message: "local database dropped on `{instance_id}`",
                        title: "drop_local_db",
                        severity: Low,
                        instance_id: %instance_id,
                        raft_id: %new.raft_id,
                        initiator: &,

                    if new.raft_id == self.raft_id() {
                        // cannot exit during a transaction
                        *expelled = true;
            Ok(_) => {}

    /// Is called during a transaction
    fn handle_committed_normal_entry(
        &mut self,
        entry: traft::Entry,
        expelled: &mut bool,
        assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
        let op = entry.into_op().unwrap_or(Op::Nop);
        tlog!(Debug, "applying entry: {op}"; "index" => index);

        let storage_properties = &;

        // apply the operation
        match op {
            Op::Nop => {}
            Op::BatchDml { ref ops } => {
                for op in ops {
                    self.handle_dml_entry(op, expelled);
                self.handle_dml_entry(&op, expelled);
            Op::DdlPrepare {
            } => {
                self.apply_op_ddl_prepare(ddl, schema_version)
                    .expect("storage should not fail");
                let v_local = local_schema_version().expect("storage should not fail");
                let v_pending = storage_properties
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");
                let ddl = storage_properties
                    .expect("storage should not fail")
                    .expect("granted we don't mess up log compaction, this should not be None");

                // This instance is catching up to the cluster.
                if v_local < v_pending {
                    if self.is_readonly() {
                        return SleepAndRetry;
                        // Master applies schema change at this point.
                        let res = rpc::ddl_apply::apply_schema_change(
                        match res {
                            Err(rpc::ddl_apply::Error::Other(err)) => {
                                panic!("storage should not fail, but failed with: {err}")
                            Err(rpc::ddl_apply::Error::Aborted(reason)) => {
                                tlog!(Warning, "failed applying committed ddl operation: {reason}";
                                    "ddl" => ?ddl,
                                return SleepAndRetry;
                // Update pico metadata.
                        ddl_meta_space_update_operable(&, id, true)
                            .expect("storage shouldn't fail");
                        let initiator_def = user_by_id(owner).expect("user must exist");

                            message: "created table `{name}`",
                            title: "create_table",
                            severity: Medium,