From ccf0711668fce5fc8dad6a0450d9c111fef25dc7 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 29 Aug 2024 00:07:17 +0300
Subject: [PATCH] feat: add ability to construct cas::Range from a Dml

---
 src/cas.rs        |  23 ++++++++--
 src/storage.rs    | 109 ++++++++++++++++++++++------------------------
 src/traft/node.rs |   7 ++-
 3 files changed, 76 insertions(+), 63 deletions(-)

diff --git a/src/cas.rs b/src/cas.rs
index 5234b1715c..89a7ac8422 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -1,6 +1,7 @@
 use crate::access_control;
 use crate::error_code::ErrorCode;
 use crate::proc_name;
+use crate::storage;
 use crate::storage::Clusterwide;
 use crate::storage::ClusterwideTable;
 use crate::tlog;
@@ -580,14 +581,14 @@ impl Predicate {
                 match op {
                     Dml::Update { key, .. } | Dml::Delete { key, .. } => {
                         let key = Tuple::new(key)?;
-                        let key_def = storage.key_def_for_key(space_id, 0)?;
+                        let key_def = storage::cached_key_def_for_key(space_id, 0)?;
                         if range.contains(&key_def, &key) {
                             return Err(error());
                         }
                     }
                     Dml::Insert { tuple, .. } | Dml::Replace { tuple, .. } => {
                         let tuple = Tuple::new(tuple)?;
-                        let key_def = storage.key_def(space_id, 0)?;
+                        let key_def = storage::cached_key_def(space_id, 0)?;
                         if range.contains(&key_def, &tuple) {
                             return Err(error());
                         }
@@ -625,7 +626,7 @@ impl Predicate {
                     if space != range.table {
                         continue;
                     }
-                    let key_def = storage.key_def_for_key(space, 0)?;
+                    let key_def = storage::cached_key_def_for_key(space, 0)?;
                     for key in schema_related_property_keys() {
                         if range.contains(&key_def, key) {
                             return Err(error());
@@ -746,6 +747,22 @@ impl Range {
         }
     }
 
+    pub fn for_dml(dml: &Dml) -> Result<Self> {
+        let table = dml.table_id();
+        let key = match dml {
+            Dml::Update { key, .. } | Dml::Delete { key, .. } => key.clone(),
+            Dml::Insert { tuple, .. } | Dml::Replace { tuple, .. } => {
+                let key_def = storage::cached_key_def(table, 0)?;
+                let tuple = Tuple::new(tuple)?;
+                key_def.extract_key(&tuple)?
+            }
+        };
+        Ok(Self {
+            table,
+            bounds: Some(RangeBounds::Eq { key }),
+        })
+    }
+
     #[inline(always)]
     pub fn from_parts(
         table: impl Into<SpaceId>,
diff --git a/src/storage.rs b/src/storage.rs
index 9db1dc0b30..aae8c0197f 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -1014,62 +1014,6 @@ impl Clusterwide {
         Ok(())
     }
 
-    /// Return a `KeyDef` to be used for comparing **tuples** of the
-    /// corresponding global space.
-    pub(crate) fn key_def(
-        &self,
-        space_id: SpaceId,
-        index_id: IndexId,
-    ) -> tarantool::Result<Rc<KeyDef>> {
-        static mut KEY_DEF: Option<HashMap<(ClusterwideTable, IndexId), Rc<KeyDef>>> = None;
-        let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) };
-        if let Ok(sys_space_id) = ClusterwideTable::try_from(space_id) {
-            let key_def = match key_defs.entry((sys_space_id, index_id)) {
-                Entry::Occupied(o) => o.into_mut(),
-                Entry::Vacant(v) => {
-                    let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
-                    let key_def = index.meta()?.to_key_def();
-                    // System space definition's never change during a single
-                    // execution, so it's safe to cache these
-                    v.insert(Rc::new(key_def))
-                }
-            };
-            return Ok(key_def.clone());
-        }
-
-        let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
-        let key_def = index.meta()?.to_key_def();
-        Ok(Rc::new(key_def))
-    }
-
-    /// Return a `KeyDef` to be used for comparing **keys** of the
-    /// corresponding global space.
-    pub(crate) fn key_def_for_key(
-        &self,
-        space_id: SpaceId,
-        index_id: IndexId,
-    ) -> tarantool::Result<Rc<KeyDef>> {
-        static mut KEY_DEF: Option<HashMap<(ClusterwideTable, IndexId), Rc<KeyDef>>> = None;
-        let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) };
-        if let Ok(sys_space_id) = ClusterwideTable::try_from(space_id) {
-            let key_def = match key_defs.entry((sys_space_id, index_id)) {
-                Entry::Occupied(o) => o.into_mut(),
-                Entry::Vacant(v) => {
-                    let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
-                    let key_def = index.meta()?.to_key_def_for_key();
-                    // System space definition's never change during a single
-                    // execution, so it's safe to cache these
-                    v.insert(Rc::new(key_def))
-                }
-            };
-            return Ok(key_def.clone());
-        }
-
-        let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
-        let key_def = index.meta()?.to_key_def_for_key();
-        Ok(Rc::new(key_def))
-    }
-
     /// Perform the `dml` operation on the local storage.
     /// When possible, return the new tuple produced by the operation:
     ///   * `Some(tuple)` in case of insert and replace;
@@ -1087,6 +1031,59 @@ impl Clusterwide {
     }
 }
 
+/// Return a `KeyDef` to be used for comparing **tuples** of the corresponding global table.
+///
+/// The return value is cached for system tables.
+pub fn cached_key_def(space_id: SpaceId, index_id: IndexId) -> tarantool::Result<Rc<KeyDef>> {
+    static mut KEY_DEF: Option<HashMap<(ClusterwideTable, IndexId), Rc<KeyDef>>> = None;
+    let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) };
+    if let Ok(sys_space_id) = ClusterwideTable::try_from(space_id) {
+        let key_def = match key_defs.entry((sys_space_id, index_id)) {
+            Entry::Occupied(o) => o.into_mut(),
+            Entry::Vacant(v) => {
+                let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
+                let key_def = index.meta()?.to_key_def();
+                // System space definition's never change during a single
+                // execution, so it's safe to cache these
+                v.insert(Rc::new(key_def))
+            }
+        };
+        return Ok(key_def.clone());
+    }
+
+    let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
+    let key_def = index.meta()?.to_key_def();
+    Ok(Rc::new(key_def))
+}
+
+/// Return a `KeyDef` to be used for comparing **keys** of the corresponding global table.
+///
+/// The return value is cached for system tables.
+pub(crate) fn cached_key_def_for_key(
+    space_id: SpaceId,
+    index_id: IndexId,
+) -> tarantool::Result<Rc<KeyDef>> {
+    static mut KEY_DEF: Option<HashMap<(ClusterwideTable, IndexId), Rc<KeyDef>>> = None;
+    let key_defs = unsafe { KEY_DEF.get_or_insert_with(HashMap::new) };
+    if let Ok(sys_space_id) = ClusterwideTable::try_from(space_id) {
+        let key_def = match key_defs.entry((sys_space_id, index_id)) {
+            Entry::Occupied(o) => o.into_mut(),
+            Entry::Vacant(v) => {
+                let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
+                let key_def = index.meta()?.to_key_def_for_key();
+                // System space definition's never change during a single
+                // execution, so it's safe to cache these
+                v.insert(Rc::new(key_def))
+            }
+        };
+        return Ok(key_def.clone());
+    }
+
+    let index = unsafe { Index::from_ids_unchecked(space_id, index_id) };
+    let key_def = index.meta()?.to_key_def_for_key();
+    Ok(Rc::new(key_def))
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // ClusterwideTable
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 8827ce33c7..710ae5e803 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -23,6 +23,7 @@ use crate::schema::SchemaObjectType;
 use crate::schema::{Distribution, IndexDef, IndexOption, TableDef};
 use crate::sentinel;
 use crate::storage::acl;
+use crate::storage::cached_key_def;
 use crate::storage::ddl_meta_drop_routine;
 use crate::storage::ddl_meta_drop_space;
 use crate::storage::space_by_id;
@@ -791,10 +792,8 @@ impl NodeImpl {
                     Dml::Delete { key, .. } => s.get(key),
                     Dml::Replace { tuple, .. } => {
                         let tuple = Tuple::from(tuple);
-                        let key_def = self
-                            .storage
-                            .key_def(s.id(), 0)
-                            .expect("index for space must be found");
+                        let key_def =
+                            cached_key_def(s.id(), 0).expect("index for space must be found");
                         let key = key_def
                             .extract_key(&tuple)
                             .expect("cas should validate operation before committing a log entry");
-- 
GitLab