From da8d792edd87eb4facf9e92d4a0e06e390f2fa3d Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 22 Dec 2022 17:27:53 +0300
Subject: [PATCH] feat(node): Node::storage_watcher for notifications on
 storage changes

---
 src/governor/mod.rs |   8 --
 src/on_shutdown.rs  |   4 +-
 src/storage.rs      | 291 ++++++++++++++++++++++++++++++++++++--------
 src/traft/node.rs   |  97 ++++++++++++---
 src/traft/op.rs     |  53 +++++---
 5 files changed, 360 insertions(+), 93 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index ad514e7bc9..5d99ba1912 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -495,14 +495,6 @@ impl Loop {
     pub fn wakeup(&self) -> Result<()> {
         self.waker.send(()).map_err(|_| Error::GovernorStopped)
     }
-
-    pub async fn awoken(&self) -> Result<()> {
-        self.waker
-            .subscribe()
-            .changed()
-            .await
-            .map_err(|_| Error::GovernorStopped)
-    }
 }
 
 pub struct Loop {
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index d64d9259ee..a25b8d8467 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -4,6 +4,7 @@ use ::tarantool::fiber;
 
 use crate::has_grades;
 use crate::instance::grade::TargetGradeVariant;
+use crate::storage::ClusterwideSpace;
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
@@ -26,6 +27,7 @@ pub async fn callback() {
     // 2. Meanwhile, wait until either it succeeds or there is no quorum.
     let node = node::global().unwrap();
     let raft_id = node.raft_id();
+    let mut instances_watcher = node.storage_watcher(ClusterwideSpace::Instance);
     loop {
         let me = unwrap_ok_or!(
             node.storage.instances.get(&raft_id),
@@ -64,7 +66,7 @@ pub async fn callback() {
             break;
         }
 
-        if let Err(e) = node.governor_loop.awoken().await {
+        if let Err(e) = instances_watcher.changed().await {
             tlog!(Warning, "failed to shutdown gracefully: {e}");
         }
     }
diff --git a/src/storage.rs b/src/storage.rs
index cd9a5513ca..700e45c1e5 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -31,10 +31,11 @@ use std::marker::PhantomData;
 impl ClusterwideSpace {
     #[inline]
     fn get(&self) -> tarantool::Result<Space> {
-        Space::find(self.as_str()).ok_or_else(|| {
+        Space::find_cached(self.as_str()).ok_or_else(|| {
             tarantool::set_error!(
                 tarantool::error::TarantoolErrorCode::NoSuchSpace,
-                "no such space \"{self}\""
+                "no such space \"{}\"",
+                self
             );
             tarantool::error::TarantoolError::last().into()
         })
@@ -49,6 +50,50 @@ impl ClusterwideSpace {
     pub fn replace(&self, tuple: &impl ToTupleBuffer) -> tarantool::Result<Tuple> {
         self.get()?.replace(tuple)
     }
+}
+
+/// Types implementing this trait represent clusterwide spaces.
+pub trait TClusterwideSpace {
+    type Index: TClusterwideSpaceIndex;
+    const SPACE_NAME: &'static str;
+
+    #[inline(always)]
+    fn primary_index() -> Self::Index {
+        Self::Index::primary()
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// ClusterwideSpaceIndex
+////////////////////////////////////////////////////////////////////////////////
+
+/// A type alias for getting the enumeration of indexes for a clusterwide space.
+pub type IndexOf<T> = <T as TClusterwideSpace>::Index;
+
+/// An index of a clusterwide space.
+#[derive(Copy, Clone, Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq, Hash)]
+pub enum ClusterwideSpaceIndex {
+    Property(IndexOf<Properties>),
+    Instance(IndexOf<Instances>),
+    Address(IndexOf<PeerAddresses>),
+    Replicaset(IndexOf<Replicasets>),
+    Migration(IndexOf<Migrations>),
+}
+
+impl ClusterwideSpaceIndex {
+    #[inline]
+    fn get(&self) -> tarantool::Result<Index> {
+        let space = self.space().get()?;
+        let index_name = self.index_name();
+        let Some(index) = space.index_cached(index_name) else {
+            tarantool::set_error!(
+                tarantool::error::TarantoolErrorCode::NoSuchIndexName,
+                "no such index \"{}\"", index_name
+            );
+            return Err(tarantool::error::TarantoolError::last().into());
+        };
+        Ok(index)
+    }
 
     #[inline]
     pub fn update(
@@ -63,6 +108,114 @@ impl ClusterwideSpace {
     pub fn delete(&self, key: &impl ToTupleBuffer) -> tarantool::Result<Option<Tuple>> {
         self.get()?.delete(key)
     }
+
+    pub const fn space(&self) -> ClusterwideSpace {
+        match self {
+            Self::Property(_) => ClusterwideSpace::Property,
+            Self::Instance(_) => ClusterwideSpace::Instance,
+            Self::Address(_) => ClusterwideSpace::Address,
+            Self::Replicaset(_) => ClusterwideSpace::Replicaset,
+            Self::Migration(_) => ClusterwideSpace::Migration,
+        }
+    }
+
+    pub const fn index_name(&self) -> &'static str {
+        match self {
+            Self::Property(idx) => idx.as_str(),
+            Self::Instance(idx) => idx.as_str(),
+            Self::Address(idx) => idx.as_str(),
+            Self::Replicaset(idx) => idx.as_str(),
+            Self::Migration(idx) => idx.as_str(),
+        }
+    }
+
+    pub fn is_primary(&self) -> bool {
+        match self {
+            Self::Property(idx) => idx.is_primary(),
+            Self::Instance(idx) => idx.is_primary(),
+            Self::Address(idx) => idx.is_primary(),
+            Self::Replicaset(idx) => idx.is_primary(),
+            Self::Migration(idx) => idx.is_primary(),
+        }
+    }
+}
+
+impl From<ClusterwideSpace> for ClusterwideSpaceIndex {
+    fn from(space: ClusterwideSpace) -> Self {
+        match space {
+            ClusterwideSpace::Property => Self::Property(Properties::primary_index()),
+            ClusterwideSpace::Instance => Self::Instance(Instances::primary_index()),
+            ClusterwideSpace::Address => Self::Address(PeerAddresses::primary_index()),
+            ClusterwideSpace::Replicaset => Self::Replicaset(Replicasets::primary_index()),
+            ClusterwideSpace::Migration => Self::Migration(Migrations::primary_index()),
+        }
+    }
+}
+
+impl std::fmt::Display for ClusterwideSpaceIndex {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        if self.is_primary() {
+            write!(f, "{}", self.space())
+        } else {
+            write!(f, "{}.index.{}", self.space(), self.index_name())
+        }
+    }
+}
+
+pub trait TClusterwideSpaceIndex {
+    fn primary() -> Self;
+    fn is_primary(&self) -> bool;
+}
+
+macro_rules! define_clusterwide_space_struct {
+    (
+        $space_name:path;
+
+        $(#[$space_meta:meta])*
+        pub struct $space:ident {
+            $(
+                $(#[$field_meta:meta])*
+                $field:ident : $field_ty:ty,
+            )+
+        }
+
+        $(#[$index_meta:meta])*
+        pub enum $index:ident {
+            #[primary]
+            $primary_index_var:ident = $primary_index_name:expr,
+            $( $index_var:ident = $index_name:expr, )*
+        }
+    ) => {
+        $(#[$space_meta])*
+        pub struct $space {
+            $( $(#[$field_meta])* $field: $field_ty,)+
+        }
+
+        ::tarantool::define_str_enum! {
+            $(#[$index_meta])*
+            pub enum $index {
+                $primary_index_var = $primary_index_name,
+                $( $index_var = $index_name, )*
+            }
+        }
+
+        impl TClusterwideSpace for $space {
+            type Index = $index;
+            const SPACE_NAME: &'static str = $space_name.as_str();
+        }
+
+        impl TClusterwideSpaceIndex for IndexOf<$space> {
+            #[inline(always)]
+            fn primary() -> Self {
+                Self::$primary_index_var
+            }
+
+            #[inline(always)]
+            fn is_primary(&self) -> bool {
+                matches!(self, Self::$primary_index_var)
+            }
+        }
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -107,16 +260,23 @@ impl Clusterwide {
 // Properties
 ////////////////////////////////////////////////////////////////////////////////
 
-/// A struct for accessing storage of the cluster-wide key-value properties
-#[derive(Clone, Debug)]
-pub struct Properties {
-    space: Space,
+define_clusterwide_space_struct! {
+    ClusterwideSpace::Property;
+
+    /// A struct for accessing storage of the cluster-wide key-value properties
+    #[derive(Clone, Debug)]
+    pub struct Properties {
+        space: Space,
+    }
+
+    /// An enumeration of indexes defined for property space.
+    pub enum SpacePropertyIndex {
+        #[primary]
+        Key = "key",
+    }
 }
 
 impl Properties {
-    const SPACE_NAME: &'static str = ClusterwideSpace::Property.as_str();
-    const INDEX_PRIMARY: &'static str = "pk";
-
     pub fn new() -> tarantool::Result<Self> {
         let space = Space::builder(Self::SPACE_NAME)
             .is_local(true)
@@ -127,7 +287,7 @@ impl Properties {
             .create()?;
 
         space
-            .index_builder(Self::INDEX_PRIMARY)
+            .index_builder(Self::primary_index().as_str())
             .unique(true)
             .part("key")
             .if_not_exists(true)
@@ -182,16 +342,24 @@ impl Properties {
 // Replicasets
 ////////////////////////////////////////////////////////////////////////////////
 
-/// A struct for accessing replicaset info from storage
-#[derive(Clone, Debug)]
-pub struct Replicasets {
-    space: Space,
+define_clusterwide_space_struct! {
+    ClusterwideSpace::Replicaset;
+
+    /// A struct for accessing replicaset info from storage
+    #[derive(Clone, Debug)]
+    pub struct Replicasets {
+        space: Space,
+    }
+
+
+    /// An enumeration of indexes defined for replicaset space.
+    pub enum SpaceReplicasetIndex {
+        #[primary]
+        ReplicasetId = "replicaset_id",
+    }
 }
 
 impl Replicasets {
-    const SPACE_NAME: &'static str = ClusterwideSpace::Replicaset.as_str();
-    const INDEX_PRIMARY: &'static str = "pk";
-
     pub fn new() -> tarantool::Result<Self> {
         let space = Space::builder(Self::SPACE_NAME)
             .is_local(true)
@@ -201,7 +369,7 @@ impl Replicasets {
             .create()?;
 
         space
-            .index_builder(Self::INDEX_PRIMARY)
+            .index_builder(Self::primary_index().as_str())
             .unique(true)
             .part("replicaset_id")
             .if_not_exists(true)
@@ -233,18 +401,23 @@ impl ToEntryIter for Replicasets {
 // PeerAddresses
 ////////////////////////////////////////////////////////////////////////////////
 
-/// A struct for accessing storage of peer addresses.
-#[derive(Clone, Debug)]
-pub struct PeerAddresses {
-    space: Space,
-    #[allow(dead_code)]
-    index_raft_id: Index,
+define_clusterwide_space_struct! {
+    ClusterwideSpace::Address;
+
+    /// A struct for accessing storage of peer addresses.
+    #[derive(Clone, Debug)]
+    pub struct PeerAddresses {
+        space: Space,
+    }
+
+    /// An enumeration of indexes defined for peer address space.
+    pub enum SpacePeerAddressIndex {
+        #[primary]
+        RaftId = "raft_id",
+    }
 }
 
 impl PeerAddresses {
-    const SPACE_NAME: &'static str = ClusterwideSpace::Address.as_str();
-    const INDEX_RAFT_ID: &'static str = "raft_id";
-
     pub fn new() -> tarantool::Result<Self> {
         let space_instances = Space::builder(Self::SPACE_NAME)
             .is_local(true)
@@ -254,8 +427,8 @@ impl PeerAddresses {
             .if_not_exists(true)
             .create()?;
 
-        let index_raft_id = space_instances
-            .index_builder(Self::INDEX_RAFT_ID)
+        space_instances
+            .index_builder(Self::primary_index().as_str())
             .unique(true)
             .part("raft_id")
             .if_not_exists(true)
@@ -263,7 +436,6 @@ impl PeerAddresses {
 
         Ok(Self {
             space: space_instances,
-            index_raft_id,
         })
     }
 
@@ -306,21 +478,29 @@ impl ToEntryIter for PeerAddresses {
 // Instance
 ////////////////////////////////////////////////////////////////////////////////
 
-/// A struct for accessing storage of all the cluster instances.
-#[derive(Clone, Debug)]
-pub struct Instances {
-    space: Space,
-    index_instance_id: Index,
-    index_raft_id: Index,
-    index_replicaset_id: Index,
+define_clusterwide_space_struct! {
+    ClusterwideSpace::Instance;
+
+    /// A struct for accessing storage of all the cluster instances.
+    #[derive(Clone, Debug)]
+    pub struct Instances {
+        space: Space,
+        index_instance_id: Index,
+        index_raft_id: Index,
+        index_replicaset_id: Index,
+    }
+
+    /// An enumeration of indexes defined for instance space.
+    #[allow(clippy::enum_variant_names)]
+    pub enum SpaceInstanceIndex {
+        #[primary]
+        InstanceId = "instance_id",
+        RaftId = "raft_id",
+        ReplicasetId = "replicaset_id",
+    }
 }
 
 impl Instances {
-    const SPACE_NAME: &'static str = ClusterwideSpace::Instance.as_str();
-    const INDEX_INSTANCE_ID: &'static str = "instance_id";
-    const INDEX_RAFT_ID: &'static str = "raft_id";
-    const INDEX_REPLICASET_ID: &'static str = "replicaset_id";
-
     pub fn new() -> tarantool::Result<Self> {
         let space_instances = Space::builder(Self::SPACE_NAME)
             .is_local(true)
@@ -330,21 +510,21 @@ impl Instances {
             .create()?;
 
         let index_instance_id = space_instances
-            .index_builder(Self::INDEX_INSTANCE_ID)
+            .index_builder(IndexOf::<Self>::InstanceId.as_str())
             .unique(true)
             .part(instance_field::InstanceId)
             .if_not_exists(true)
             .create()?;
 
         let index_raft_id = space_instances
-            .index_builder(Self::INDEX_RAFT_ID)
+            .index_builder(IndexOf::<Self>::RaftId.as_str())
             .unique(true)
             .part(instance_field::RaftId)
             .if_not_exists(true)
             .create()?;
 
         let index_replicaset_id = space_instances
-            .index_builder(Self::INDEX_REPLICASET_ID)
+            .index_builder(IndexOf::<Self>::ReplicasetId.as_str())
             .unique(false)
             .part(instance_field::ReplicasetId)
             .if_not_exists(true)
@@ -641,15 +821,22 @@ where
 // Migrations
 ////////////////////////////////////////////////////////////////////////////////
 
-#[derive(Clone, Debug)]
-pub struct Migrations {
-    space: Space,
+define_clusterwide_space_struct! {
+    ClusterwideSpace::Migration;
+
+    #[derive(Clone, Debug)]
+    pub struct Migrations {
+        space: Space,
+    }
+
+    /// An enumeration of indexes defined for migration space.
+    pub enum SpaceMigrationIndex {
+        #[primary]
+        Id = "id",
+    }
 }
 
 impl Migrations {
-    const SPACE_NAME: &'static str = ClusterwideSpace::Migration.as_str();
-    const INDEX_PRIMARY: &'static str = "pk";
-
     pub fn new() -> tarantool::Result<Self> {
         let space = Space::builder(Self::SPACE_NAME)
             .is_local(true)
@@ -660,7 +847,7 @@ impl Migrations {
             .create()?;
 
         space
-            .index_builder(Self::INDEX_PRIMARY)
+            .index_builder(Self::primary_index().as_str())
             .unique(true)
             .part("id")
             .if_not_exists(true)
diff --git a/src/traft/node.rs b/src/traft/node.rs
index ef01f7c876..58c70871d5 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -12,7 +12,7 @@ use crate::kvcell::KVCell;
 use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::storage::ToEntryIter as _;
-use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
+use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName};
 use crate::stringify_cfunc;
 use crate::tlog;
 use crate::traft;
@@ -106,6 +106,9 @@ impl Status {
     }
 }
 
+type StorageWatchers = HashMap<ClusterwideSpaceIndex, watch::Sender<()>>;
+type StorageChanges = HashSet<ClusterwideSpaceIndex>;
+
 /// The heart of `traft` module - the Node.
 pub struct Node {
     /// RaftId of the Node.
@@ -122,6 +125,7 @@ pub struct Node {
     main_loop: MainLoop,
     pub(crate) governor_loop: governor::Loop,
     status: watch::Receiver<Status>,
+    watchers: Rc<Mutex<StorageWatchers>>,
 }
 
 impl std::fmt::Debug for Node {
@@ -142,10 +146,11 @@ impl Node {
         let status = node_impl.status.subscribe();
 
         let node_impl = Rc::new(Mutex::new(node_impl));
+        let watchers = Rc::new(Mutex::new(HashMap::new()));
 
         let node = Node {
             raft_id,
-            main_loop: MainLoop::start(node_impl.clone()), // yields
+            main_loop: MainLoop::start(node_impl.clone(), watchers.clone()), // yields
             governor_loop: governor::Loop::start(
                 status.clone(),
                 storage.clone(),
@@ -155,6 +160,7 @@ impl Node {
             storage,
             raft_storage,
             status,
+            watchers,
         };
 
         // Wait for the node to enter the main loop
@@ -311,6 +317,30 @@ impl Node {
     pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
         self.raft_storage.all_traft_entries()
     }
+
+    /// Returns a watch which will be notified when a clusterwide space is
+    /// modified via the specified `index`.
+    ///
+    /// You can also pass a [`ClusterwideSpace`] in which case the space's
+    /// primary index will be used.
+    #[inline(always)]
+    pub fn storage_watcher(&self, index: impl Into<ClusterwideSpaceIndex>) -> watch::Receiver<()> {
+        self.storage_watcher_impl(index.into())
+    }
+
+    /// A non generic version for optimization.
+    fn storage_watcher_impl(&self, index: ClusterwideSpaceIndex) -> watch::Receiver<()> {
+        use std::collections::hash_map::Entry;
+        let mut watchers = self.watchers.lock();
+        match watchers.entry(index) {
+            Entry::Vacant(entry) => {
+                let (tx, rx) = watch::channel(());
+                entry.insert(tx);
+                rx
+            }
+            Entry::Occupied(entry) => entry.get().subscribe(),
+        }
+    }
 }
 
 struct NodeImpl {
@@ -604,6 +634,7 @@ impl NodeImpl {
         entries: &[raft::Entry],
         wake_governor: &mut bool,
         expelled: &mut bool,
+        storage_changes: &mut StorageChanges,
     ) {
         for entry in entries {
             let entry = match traft::Entry::try_from(entry) {
@@ -615,9 +646,12 @@ impl NodeImpl {
             };
 
             match entry.entry_type {
-                raft::EntryType::EntryNormal => {
-                    self.handle_committed_normal_entry(entry, wake_governor, expelled)
-                }
+                raft::EntryType::EntryNormal => self.handle_committed_normal_entry(
+                    entry,
+                    wake_governor,
+                    expelled,
+                    storage_changes,
+                ),
                 raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
                     self.handle_committed_conf_change(entry)
                 }
@@ -643,6 +677,7 @@ impl NodeImpl {
         entry: traft::Entry,
         wake_governor: &mut bool,
         expelled: &mut bool,
+        storage_changes: &mut StorageChanges,
     ) {
         assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
         let lc = entry.lc();
@@ -652,18 +687,20 @@ impl NodeImpl {
         match &op {
             Op::PersistInstance(PersistInstance(instance)) => {
                 *wake_governor = true;
+                storage_changes.insert(ClusterwideSpace::Instance.into());
                 if has_grades!(instance, Expelled -> *) && instance.raft_id == self.raft_id() {
                     // cannot exit during a transaction
                     *expelled = true;
                 }
             }
-            Op::Dml(op)
+            Op::Dml(op) => {
                 if matches!(
                     op.space(),
                     ClusterwideSpace::Property | ClusterwideSpace::Replicaset
-                ) =>
-            {
-                *wake_governor = true;
+                ) {
+                    *wake_governor = true;
+                }
+                storage_changes.insert(op.index());
             }
             _ => {}
         }
@@ -788,7 +825,12 @@ impl NodeImpl {
     /// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
     ///
     /// This function yields.
-    fn advance(&mut self, wake_governor: &mut bool, expelled: &mut bool) {
+    fn advance(
+        &mut self,
+        wake_governor: &mut bool,
+        expelled: &mut bool,
+        storage_changes: &mut StorageChanges,
+    ) {
         // Get the `Ready` with `RawNode::ready` interface.
         if !self.raw_node.has_ready() {
             return;
@@ -820,7 +862,12 @@ impl NodeImpl {
 
         if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
             // Apply committed entries.
-            self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled);
+            self.handle_committed_entries(
+                ready.committed_entries(),
+                wake_governor,
+                expelled,
+                storage_changes,
+            );
 
             // Persist uncommitted entries in the raft log.
             self.raft_storage.persist_entries(ready.entries()).unwrap();
@@ -857,7 +904,12 @@ impl NodeImpl {
             }
 
             // Apply committed entries.
-            self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled);
+            self.handle_committed_entries(
+                light_rd.committed_entries(),
+                wake_governor,
+                expelled,
+                storage_changes,
+            );
 
             Ok(())
         }) {
@@ -904,12 +956,13 @@ struct MainLoopState {
     next_tick: Instant,
     loop_waker: watch::Receiver<()>,
     stop_flag: Rc<Cell<bool>>,
+    watchers: Rc<Mutex<StorageWatchers>>,
 }
 
 impl MainLoop {
     pub const TICK: Duration = Duration::from_millis(100);
 
-    fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
+    fn start(node_impl: Rc<Mutex<NodeImpl>>, watchers: Rc<Mutex<StorageWatchers>>) -> Self {
         let (loop_waker_tx, loop_waker_rx) = watch::channel(());
         let stop_flag: Rc<Cell<bool>> = Default::default();
 
@@ -918,6 +971,7 @@ impl MainLoop {
             next_tick: Instant::now(),
             loop_waker: loop_waker_rx,
             stop_flag: stop_flag.clone(),
+            watchers,
         };
 
         Self {
@@ -953,11 +1007,26 @@ impl MainLoop {
 
         let mut wake_governor = false;
         let mut expelled = false;
-        node_impl.advance(&mut wake_governor, &mut expelled); // yields
+        let mut storage_changes = StorageChanges::new();
+        node_impl.advance(&mut wake_governor, &mut expelled, &mut storage_changes); // yields
+        drop(node_impl);
         if state.stop_flag.take() {
             return FlowControl::Break;
         }
 
+        {
+            // node_impl lock must be dropped before this to avoid deadlocking
+            let mut watchers = state.watchers.lock();
+            for index in storage_changes {
+                if let Some(tx) = watchers.get(&index) {
+                    let res = tx.send(());
+                    if res.is_err() {
+                        watchers.remove(&index);
+                    }
+                }
+            }
+        }
+
         if expelled {
             crate::tarantool::exit(0);
         }
diff --git a/src/traft/op.rs b/src/traft/op.rs
index 9a861cb192..6941f53e5e 100644
--- a/src/traft/op.rs
+++ b/src/traft/op.rs
@@ -1,6 +1,6 @@
 use crate::instance::Instance;
 use crate::storage;
-use crate::storage::ClusterwideSpace;
+use crate::storage::{ClusterwideSpace, ClusterwideSpaceIndex};
 use crate::util::AnyWithTypeName;
 use ::tarantool::tlua::LuaError;
 use ::tarantool::tuple::{ToTupleBuffer, Tuple, TupleBuffer};
@@ -55,13 +55,13 @@ impl std::fmt::Display for Op {
             Self::Dml(Dml::Replace { space, tuple }) => {
                 write!(f, "Replace({space}, {})", DisplayAsJson(tuple))
             }
-            Self::Dml(Dml::Update { space, key, ops }) => {
+            Self::Dml(Dml::Update { index, key, ops }) => {
                 let key = DisplayAsJson(key);
                 let ops = DisplayAsJson(&**ops);
-                write!(f, "Update({space}, {key}, {ops})")
+                write!(f, "Update({index}, {key}, {ops})")
             }
-            Self::Dml(Dml::Delete { space, key }) => {
-                write!(f, "Delete({space}, {})", DisplayAsJson(key))
+            Self::Dml(Dml::Delete { index, key }) => {
+                write!(f, "Delete({index}, {})", DisplayAsJson(key))
             }
         };
 
@@ -207,14 +207,14 @@ pub enum Dml {
         tuple: TupleBuffer,
     },
     Update {
-        space: ClusterwideSpace,
+        index: ClusterwideSpaceIndex,
         #[serde(with = "serde_bytes")]
         key: TupleBuffer,
         #[serde(with = "vec_of_raw_byte_buf")]
         ops: Vec<TupleBuffer>,
     },
     Delete {
-        space: ClusterwideSpace,
+        index: ClusterwideSpaceIndex,
         #[serde(with = "serde_bytes")]
         key: TupleBuffer,
     },
@@ -226,8 +226,8 @@ impl OpResult for Dml {
         match self {
             Self::Insert { space, tuple } => space.insert(&tuple).map(Some),
             Self::Replace { space, tuple } => space.replace(&tuple).map(Some),
-            Self::Update { space, key, ops } => space.update(&key, &ops),
-            Self::Delete { space, key } => space.delete(&key),
+            Self::Update { index, key, ops } => index.update(&key, &ops),
+            Self::Delete { index, key } => index.delete(&key),
         }
     }
 }
@@ -240,6 +240,7 @@ impl From<Dml> for Op {
 
 impl Dml {
     /// Serializes `tuple` and returns an [`Dml::Insert`] in case of success.
+    #[inline(always)]
     pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
         let res = Self::Insert {
             space,
@@ -249,6 +250,7 @@ impl Dml {
     }
 
     /// Serializes `tuple` and returns an [`Dml::Replace`] in case of success.
+    #[inline(always)]
     pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
         let res = Self::Replace {
             space,
@@ -258,13 +260,14 @@ impl Dml {
     }
 
     /// Serializes `key` and returns an [`Dml::Update`] in case of success.
+    #[inline(always)]
     pub fn update(
-        space: ClusterwideSpace,
+        index: impl Into<ClusterwideSpaceIndex>,
         key: &impl ToTupleBuffer,
         ops: impl Into<Vec<TupleBuffer>>,
     ) -> tarantool::Result<Self> {
         let res = Self::Update {
-            space,
+            index: index.into(),
             key: key.to_tuple_buffer()?,
             ops: ops.into(),
         };
@@ -272,21 +275,35 @@ impl Dml {
     }
 
     /// Serializes `key` and returns an [`Dml::Delete`] in case of success.
-    pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> {
+    #[inline(always)]
+    pub fn delete(
+        index: impl Into<ClusterwideSpaceIndex>,
+        key: &impl ToTupleBuffer,
+    ) -> tarantool::Result<Self> {
         let res = Self::Delete {
-            space,
+            index: index.into(),
             key: key.to_tuple_buffer()?,
         };
         Ok(res)
     }
 
     #[rustfmt::skip]
-    pub fn space(&self) -> &ClusterwideSpace {
+    pub fn space(&self) -> ClusterwideSpace {
+        match &self {
+            Self::Insert { space, .. } => *space,
+            Self::Replace { space, .. } => *space,
+            Self::Update { index, .. } => index.space(),
+            Self::Delete { index, .. } => index.space(),
+        }
+    }
+
+    #[rustfmt::skip]
+    pub fn index(&self) -> ClusterwideSpaceIndex {
         match &self {
-            Self::Insert { space, .. } => space,
-            Self::Replace { space, .. } => space,
-            Self::Update { space, .. } => space,
-            Self::Delete { space, .. } => space,
+            Self::Insert { space, .. } => (*space).into(),
+            Self::Replace { space, .. } => (*space).into(),
+            Self::Update { index, .. } => *index,
+            Self::Delete { index, .. } => *index,
         }
     }
 }
-- 
GitLab