From b2d6236f06c73dfa99322211ac2ad16476cf9a82 Mon Sep 17 00:00:00 2001
From: Dmitry Ivanov <ivadmi5@gmail.com>
Date: Fri, 1 Dec 2023 14:53:06 +0300
Subject: [PATCH] fix(audit): fix broken instance-related events

This patch reintroduces instance-related events (e.g. `join_instance`)
which were accidentally disabled in a bugfix of the DML handler (see
commit 52ed92378494ba4 for more information) due to the lack of tests.

Furthermore, this patch adds the following events:

* expel_instance
* change_config, which tracks changes in `_pico_property`

This was the original intended change of the patch, but the apparent
breakage of the preexisting events called for action.
---
 src/storage.rs    | 49 ++++++++++++++++++++++++----
 src/traft/node.rs | 81 +++++++++++++++++++++++++++++++++++++----------
 2 files changed, 108 insertions(+), 22 deletions(-)

diff --git a/src/storage.rs b/src/storage.rs
index dfc299ae6f..9782b76a0a 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -962,14 +962,34 @@ impl Clusterwide {
     }
 
     /// Perform the `dml` operation on the local storage.
+    /// Returns a pair of `(old, new)` tuples, where:
+    /// * `old` is a tuple fetched from space before the operation;
+    /// * `new` is a tuple after the operation has completed (possibly `None`).
     #[inline]
-    pub fn do_dml(&self, dml: &Dml) -> tarantool::Result<Option<Tuple>> {
+    pub fn do_dml(&self, dml: &Dml) -> tarantool::Result<(Option<Tuple>, Option<Tuple>)> {
         let space = space_by_id_unchecked(dml.space());
         match dml {
-            Dml::Insert { tuple, .. } => space.insert(tuple).map(Some),
-            Dml::Replace { tuple, .. } => space.replace(tuple).map(Some),
-            Dml::Update { key, ops, .. } => space.update(key, ops),
-            Dml::Delete { key, .. } => space.delete(key),
+            Dml::Insert { tuple, .. } => {
+                // NB: We won't be able to insert if the key exists.
+                let new = space.insert(tuple)?;
+                Ok((None, Some(new)))
+            }
+            Dml::Replace { tuple, .. } => {
+                let key = space.primary_key().extract_key(Tuple::from(tuple));
+                let old = space.get(&key)?;
+                let new = space.replace(tuple)?;
+                Ok((old, Some(new)))
+            }
+            Dml::Update { key, ops, .. } => {
+                let old = space.get(key)?;
+                let new = space.update(key, ops)?;
+                Ok((old, new))
+            }
+            Dml::Delete { key, .. } => {
+                // NB: There'll be no new tuple after this finishes.
+                let old = space.delete(key)?;
+                Ok((old, None))
+            }
         }
     }
 }
@@ -1170,6 +1190,22 @@ impl PropertyName {
 
         Ok(())
     }
+
+    /// Try decoding property's value specifically for audit log.
+    /// Returns `Some(string)` if it should be logged, `None` otherwise.
+    pub fn should_be_audited(&self, tuple: &Tuple) -> Result<Option<String>> {
+        let bad_value = || Error::other("bad value");
+        Ok(match self {
+            Self::PasswordMinLength
+            | Self::MaxLoginAttempts
+            | Self::SnapshotChunkMaxSize
+            | Self::MaxPgPortals => {
+                let v = tuple.field::<usize>(1)?.ok_or_else(bad_value)?;
+                Some(format!("{v}"))
+            }
+            _ => None,
+        })
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1225,7 +1261,8 @@ impl Properties {
                     // Not a builtin property.
                     // Cannot be a wrong type error, because tarantool checks
                     // the format for us.
-                    if old.is_none() { // Insert
+                    if old.is_none() {
+                        // Insert
                         // FIXME: this is currently printed twice
                         tlog!(Warning, "non builtin property inserted into _pico_property, this may be an error in a future version of picodata");
                     }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 4f59fda226..ab4e576517 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -728,32 +728,66 @@ impl NodeImpl {
         match op {
             Op::Nop => {}
             Op::Dml(op) => {
+                let space = op.space().try_into();
                 let res = self.storage.do_dml(&op);
                 match &res {
                     Err(e) => {
                         tlog!(Error, "clusterwide dml failed: {e}");
                     }
-                    Ok(Some(tuple))
-                        if op.space() == ClusterwideTable::Instance.id()
-                        // TODO: do we need to log something into the audit when deleting instances?
-                        && !matches!(op, Dml::Delete { .. }) =>
-                    {
+                    // Handle delete from _pico_property
+                    Ok((Some(old), None)) if space == Ok(ClusterwideTable::Property) => {
+                        assert!(matches!(op, Dml::Delete { .. }));
+
+                        if let Ok(Some(key)) = old.field::<PropertyName>(0) {
+                            crate::audit!(
+                                message: "property `{key}` was deleted",
+                                title: "change_config",
+                                severity: High,
+                            );
+                        }
+                    }
+                    // Handle insert, replace, update in _pico_property
+                    Ok((_old, Some(new))) if space == Ok(ClusterwideTable::Property) => {
+                        // Dml::Delete mandates that new tuple is None.
+                        assert!(false == matches!(op, Dml::Delete { .. }));
+
+                        // Keep in mind that currently we allow invalid names,
+                        // which means that we have to ignore the decoding error.
+                        let key = new.field::<PropertyName>(0).ok().flatten();
+
+                        // Try decoding the value iff the name's known.
+                        let value = key
+                            .map(|p| p.should_be_audited(&new))
+                            .transpose()
+                            .expect("property value decoding should not fail")
+                            .flatten();
+
+                        if let Some((key, value)) = key.zip(value) {
+                            crate::audit!(
+                                message: "property `{key}` was changed to {value}",
+                                title: "change_config",
+                                severity: High,
+                            );
+                        }
+                    }
+                    // Handle insert, replace, update in _pico_instance
+                    Ok((old, Some(new))) if space == Ok(ClusterwideTable::Instance) => {
+                        // Dml::Delete mandates that new tuple is None.
+                        assert!(false == matches!(op, Dml::Delete { .. }));
+
+                        let old: Option<Instance> =
+                            old.as_ref().map(|x| x.decode().expect("must be Instance"));
+
                         // FIXME: we do this prematurely, because the
                         // transaction may still be rolled back for some reason.
-                        let new: Instance = tuple
+                        let new: Instance = new
                             .decode()
                             .expect("tuple already passed format verification");
 
-                        if has_grades!(new, Expelled -> *) && new.raft_id == self.raft_id() {
-                            // cannot exit during a transaction
-                            *expelled = true;
-                        }
-
                         // Check if we're handling a "new node joined" event:
                         // * Either there's no tuple for this node in the storage;
                         // * Or its raft id has changed, meaning it's no longer the same node.
-                        let prev = self.storage.instances.get(&new.instance_id).ok();
-                        if prev.as_ref().map(|x| x.raft_id) != Some(new.raft_id) {
+                        if old.as_ref().map(|x| x.raft_id) != Some(new.raft_id) {
                             let instance_id = &new.instance_id;
                             crate::audit!(
                                 message: "a new instance `{instance_id}` joined the cluster",
@@ -764,7 +798,7 @@ impl NodeImpl {
                             );
                         }
 
-                        if prev.as_ref().map(|x| x.current_grade) != Some(new.current_grade) {
+                        if old.as_ref().map(|x| x.current_grade) != Some(new.current_grade) {
                             let instance_id = &new.instance_id;
                             let grade = &new.current_grade;
                             crate::audit!(
@@ -776,7 +810,7 @@ impl NodeImpl {
                             );
                         }
 
-                        if prev.as_ref().map(|x| x.target_grade) != Some(new.target_grade) {
+                        if old.as_ref().map(|x| x.target_grade) != Some(new.target_grade) {
                             let instance_id = &new.instance_id;
                             let grade = &new.target_grade;
                             crate::audit!(
@@ -787,10 +821,25 @@ impl NodeImpl {
                                 raft_id: %new.raft_id,
                             );
                         }
+
+                        if has_grades!(new, Expelled -> *) {
+                            let instance_id = &new.instance_id;
+                            crate::audit!(
+                                message: "instance `{instance_id}` was expelled from the cluster",
+                                title: "expel_instance",
+                                severity: Low,
+                                instance_id: %instance_id,
+                                raft_id: %new.raft_id,
+                            );
+
+                            if new.raft_id == self.raft_id() {
+                                // cannot exit during a transaction
+                                *expelled = true;
+                            }
+                        }
                     }
                     Ok(_) => {}
                 }
-
                 result = Box::new(res) as _;
             }
             Op::DdlPrepare {
-- 
GitLab