From aaf9d433164ccd64fc1685249e671af4fe7c8682 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 24 Nov 2023 20:15:36 +0300
Subject: [PATCH] fix: verify dml in proc_cas before it is added to raft log

---
 CHANGELOG.md         |   1 +
 src/cas.rs           |  20 ++++-
 src/storage.rs       | 191 ++++++++++++++++++++++++++++++++++---------
 src/traft/node.rs    |  10 +--
 test/int/test_cas.py |  60 ++++++++++++++
 5 files changed, 235 insertions(+), 47 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 0625a7e355..10b190c5a6 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -108,6 +108,7 @@ with the `YY.0M.MICRO` scheme.
 - Add `pico.change_password()`
 - Add `pico.wait_ddl_finalize()`
 - Make `pico.cas` follow access control rules
+- `pico.cas` now verifies dml operations before applying them
 - Change `pico.raft_log()` arguments
 - Make `opts.timeout` optional in most functions
 
diff --git a/src/cas.rs b/src/cas.rs
index 9e6f9bdc4d..997db280bc 100644
--- a/src/cas.rs
+++ b/src/cas.rs
@@ -27,6 +27,7 @@ use tarantool::session;
 use tarantool::session::UserId;
 use tarantool::space::{Space, SpaceId};
 use tarantool::tlua;
+use tarantool::transaction;
 use tarantool::tuple::{KeyDef, ToTupleBuffer, Tuple, TupleBuffer};
 
 /// This spaces cannot be changed directly dy a [`Dml`] operation. They have
@@ -274,9 +275,24 @@ fn proc_cas_local(req: Request) -> Result<Response> {
         // Note: audit log record is automatically emmitted,
         // because it is hooked into AccessDenied error creation
         box_access_check_space(dml.space(), PrivType::Write)?;
-    }
 
-    // TODO: apply to limbo first
+        // Check if the requested dml is applicable to the local storage.
+        // This will run the required on_replace triggers which will check among
+        // other things conformity to space format, user defined constraints etc.
+        //
+        // FIXME: this works for explicit constraints which would be directly
+        // violated by the operation, but there are still some cases where an
+        // invalid dml operation would be proposed to the raft log, which would
+        // fail to apply, e.g. if there's a number of dml operations in flight
+        // which conflict via the secondary key.
+        // To fix these cases we would need to implement the so-called "limbo".
+        // See https://git.picodata.io/picodata/picodata/picodata/-/issues/368
+        transaction::begin()?;
+        let res = storage.do_dml(dml);
+        transaction::rollback().expect("can't fail");
+        // Return the error if it happened. Ignore the tuple if there was one.
+        _ = res?;
+    }
 
     // Don't wait for the proposal to be accepted, instead return the index
     // to the requestor, so that they can wait for it.
diff --git a/src/storage.rs b/src/storage.rs
index 69ef531907..ef26f7c4b7 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -26,6 +26,7 @@ use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
 use crate::traft::op::Ddl;
+use crate::traft::op::Dml;
 use crate::traft::RaftEntryId;
 use crate::traft::RaftId;
 use crate::traft::RaftIndex;
@@ -941,45 +942,16 @@ impl Clusterwide {
         Ok(Rc::new(key_def))
     }
 
-    pub(crate) fn insert(
-        &self,
-        space_id: SpaceId,
-        tuple: &TupleBuffer,
-    ) -> tarantool::Result<Tuple> {
-        let space = space_by_id_unchecked(space_id);
-        let res = space.insert(tuple)?;
-        Ok(res)
-    }
-
-    pub(crate) fn replace(
-        &self,
-        space_id: SpaceId,
-        tuple: &TupleBuffer,
-    ) -> tarantool::Result<Tuple> {
-        let space = space_by_id_unchecked(space_id);
-        let res = space.replace(tuple)?;
-        Ok(res)
-    }
-
-    pub(crate) fn update(
-        &self,
-        space_id: SpaceId,
-        key: &TupleBuffer,
-        ops: &[TupleBuffer],
-    ) -> tarantool::Result<Option<Tuple>> {
-        let space = space_by_id_unchecked(space_id);
-        let res = space.update(key, ops)?;
-        Ok(res)
-    }
-
-    pub(crate) fn delete(
-        &self,
-        space_id: SpaceId,
-        key: &TupleBuffer,
-    ) -> tarantool::Result<Option<Tuple>> {
-        let space = space_by_id_unchecked(space_id);
-        let res = space.delete(key)?;
-        Ok(res)
+    /// Apply `dml` to its target space in local storage; returns the tuple from tarantool, if any.
+    #[inline]
+    pub fn do_dml(&self, dml: &Dml) -> tarantool::Result<Option<Tuple>> {
+        let space = space_by_id_unchecked(dml.space());
+        match dml {
+            Dml::Insert { tuple, .. } => space.insert(tuple).map(Some),
+            Dml::Replace { tuple, .. } => space.replace(tuple).map(Some),
+            Dml::Update { key, ops, .. } => space.update(key, ops),
+            Dml::Delete { key, .. } => space.delete(key),
+        }
     }
 }
 
@@ -1114,6 +1086,73 @@ impl From<ClusterwideTable> for SpaceId {
     }
 }
 
+impl PropertyName {
+    /// Returns `true` if this property cannot be deleted. These are usually the
+    /// values only updated by picodata and some subsystems rely on them always
+    /// being set (or not being unset).
+    #[inline(always)]
+    fn must_not_delete(&self) -> bool {
+        matches!(
+            self,
+            Self::VshardBootstrapped | Self::GlobalSchemaVersion | Self::NextSchemaVersion
+        )
+    }
+
+    /// Verify type of the property value being inserted (or replaced) in the
+    /// _pico_property table. `self` is the first field of the `new` tuple.
+    #[inline]
+    fn verify_new_tuple(&self, new: &Tuple) -> Result<()> {
+        // TODO: some of these properties are only supposed to be updated by
+        // picodata. Maybe for these properties we should check the effective
+        // user id and if it's not admin we deny the change. We'll have to set
+        // effective user id to the one who requested the dml in proc_cas.
+
+        let map_err = |e: TntError| -> Error {
+            // Make the msgpack decoding error message a little bit more human readable.
+            match e {
+                TntError::Decode { error, .. } => {
+                    Error::other(format!("incorrect type of property {self}: {error}"))
+                }
+                e => e.into(),
+            }
+        };
+
+        match self {
+            Self::VshardBootstrapped => {
+                // Check it's a bool. Uses `map_err` like every other arm for a readable error.
+                _ = new.field::<bool>(1).map_err(map_err)?;
+            }
+            Self::NextSchemaVersion
+            | Self::PendingSchemaVersion
+            | Self::GlobalSchemaVersion
+            | Self::PasswordMinLength
+            | Self::MaxLoginAttempts
+            | Self::MaxPgPortals
+            | Self::SnapshotChunkMaxSize => {
+                // Check it's an unsigned integer.
+                _ = new.field::<u64>(1).map_err(map_err)?;
+            }
+            Self::AutoOfflineTimeout
+            | Self::MaxHeartbeatPeriod
+            | Self::SnapshotReadViewCloseTimeout => {
+                // Check it's a floating point number.
+                // NOTE: serde implicitly converts integers to floats for us here.
+                _ = new.field::<f64>(1).map_err(map_err)?;
+            }
+            Self::CurrentVshardConfig | Self::TargetVshardConfig => {
+                // Check it decodes into VshardConfig.
+                _ = new.field::<VshardConfig>(1).map_err(map_err)?;
+            }
+            Self::PendingSchemaChange => {
+                // Check it decodes into Ddl.
+                _ = new.field::<Ddl>(1).map_err(map_err)?;
+            }
+        }
+
+        Ok(())
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // Properties
 ////////////////////////////////////////////////////////////////////////////////
@@ -1142,9 +1181,53 @@ impl Properties {
             .if_not_exists(true)
             .create()?;
 
+        on_replace(space.id(), Self::on_replace)?;
+
         Ok(Self { space, index })
     }
 
+    /// Trigger callback for _pico_property: returns `Err` to reject an invalid change of a builtin property.
+    pub fn on_replace(old: Option<Tuple>, new: Option<Tuple>) -> Result<()> {
+        match (old, new) {
+            (Some(old), None) => {
+                // Delete: only `old` is present.
+                let Ok(Some(key)) = old.field::<PropertyName>(0) else {
+                    // Not a builtin property, so there is nothing to protect.
+                    return Ok(());
+                };
+
+                if key.must_not_delete() {
+                    return Err(Error::other(format!("property {key} cannot be deleted")));
+                }
+            }
+            (old, Some(new)) => {
+                // Insert (`old` is `None`) or Replace/Update (`old` is `Some`).
+                let Ok(Some(key)) = new.field::<PropertyName>(0) else {
+                    // Not a builtin property.
+                    // Cannot be a wrong type error, because tarantool checks
+                    // the format for us.
+                    if old.is_none() { // Insert
+                        // FIXME: this is currently printed twice
+                        tlog!(Warning, "non builtin property inserted into _pico_property, this may be an error in a future version of picodata");
+                    }
+                    return Ok(());
+                };
+
+                key.verify_new_tuple(&new)?;
+
+                let field_count = new.len();
+                if field_count != 2 {
+                    return Err(Error::other(format!(
+                        "too many fields: got {field_count}, expected 2"
+                    )));
+                }
+            }
+            (None, None) => unreachable!(),
+        }
+
+        Ok(())
+    }
+
     #[inline]
     pub fn get<T>(&self, key: PropertyName) -> tarantool::Result<Option<T>>
     where
@@ -3016,6 +3099,36 @@ pub fn set_local_schema_version(v: u64) -> tarantool::Result<()> {
 
 pub const SPACE_ID_INTERNAL_MAX: u32 = 1024;
 
+////////////////////////////////////////////////////////////////////////////////
+// misc
+////////////////////////////////////////////////////////////////////////////////
+
+/// Add an on_replace trigger for space with `space_id` so that `cb` is called
+/// each time the space is updated. An `Err` returned by `cb` is reported via `tlua::error!`.
+#[inline]
+pub fn on_replace<F>(space_id: SpaceId, mut cb: F) -> tarantool::Result<()>
+where
+    F: FnMut(Option<Tuple>, Option<Tuple>) -> Result<()> + 'static,
+{
+    // FIXME: rewrite using ffi-api when it's available
+    // See: https://git.picodata.io/picodata/picodata/tarantool-module/-/issues/130
+    let lua = ::tarantool::lua_state();
+    let on_replace = tlua::Function::new(
+        move |old: Option<Tuple>, new: Option<Tuple>, lua: tlua::LuaState| {
+            if let Err(e) = cb(old, new) {
+                tlua::error!(lua, "{}", e);
+            }
+        },
+    );
+    lua.exec_with(
+        "local space_id, callback = ...
+        box.space[space_id]:on_replace(callback)",
+        (space_id, on_replace),
+    )
+    .map_err(tlua::LuaError::from)?;
+    Ok(())
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // tests
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 502a4b48ed..a6999b2794 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -733,12 +733,10 @@ impl NodeImpl {
         match op {
             Op::Nop => {}
             Op::Dml(op) => {
-                let res = match &op {
-                    Dml::Insert { table, tuple } => self.storage.insert(*table, tuple).map(Some),
-                    Dml::Replace { table, tuple } => self.storage.replace(*table, tuple).map(Some),
-                    Dml::Update { table, key, ops } => self.storage.update(*table, key, ops),
-                    Dml::Delete { table, key } => self.storage.delete(*table, key),
-                };
+                let res = self.storage.do_dml(&op);
+                if let Err(e) = &res {
+                    tlog!(Error, "clusterwide dml failed: {e}");
+                }
                 result = Box::new(res) as _;
             }
             Op::DdlPrepare {
diff --git a/test/int/test_cas.py b/test/int/test_cas.py
index 9ca0d14aba..7952be58b5 100644
--- a/test/int/test_cas.py
+++ b/test/int/test_cas.py
@@ -96,6 +96,66 @@ def test_cas_errors(instance: Instance):
             + "in a predicate",
         )
 
+    # Field type error
+    with pytest.raises(TarantoolError) as error:
+        instance.cas(
+            "insert",
+            "_pico_property",
+            [1, 2, 3],
+        )
+    assert error.value.args == (
+        "ER_PROC_C",
+        "tarantool error: FieldType: Tuple field 1 (key) type does not match one required by operation: expected string, got unsigned",  # noqa: E501
+    )
+
+    # Delete of undeletable property
+    with pytest.raises(TarantoolError) as error:
+        instance.cas(
+            "delete",
+            "_pico_property",
+            ["next_schema_version"],
+        )
+    assert error.value.args == (
+        "ER_PROC_C",
+        "tarantool error: ProcLua: property next_schema_version cannot be deleted",  # noqa: E501
+    )
+
+    # Incorrect type of builtin property
+    with pytest.raises(TarantoolError) as error:
+        instance.cas(
+            "replace",
+            "_pico_property",
+            ["global_schema_version", "this is not a version number"],
+        )
+    assert error.value.args == (
+        "ER_PROC_C",
+        """tarantool error: ProcLua: incorrect type of property global_schema_version: invalid type: string "this is not a version number", expected u64""",  # noqa: E501
+    )
+
+    # Too many values for builtin property
+    with pytest.raises(TarantoolError) as error:
+        instance.cas(
+            "replace",
+            "_pico_property",
+            ["password_min_length", 13, 37],
+        )
+    assert error.value.args == (
+        "ER_PROC_C",
+        "tarantool error: ProcLua: too many fields: got 3, expected 2",
+    )
+
+    # Not enough values for builtin property
+    with pytest.raises(TarantoolError) as error:
+        instance.cas(
+            "replace",
+            "_pico_property",
+            ["auto_offline_timeout"],
+        )
+    assert error.value.args == (
+        "ER_PROC_C",
+        "tarantool error: FieldMissing: Tuple field 2 (value) required by space format is missing",  # noqa: E501
+    )
+
 
 def test_cas_predicate(instance: Instance):
     instance.raft_compact_log()
-- 
GitLab