From 71146eefe07de82e8c908303c0a6559902493f94 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 5 Dec 2024 16:40:20 +0300 Subject: [PATCH] refactor: clean up handle_dml_entry --- src/traft/node.rs | 192 +++++++++++++++++++++++++--------------------- 1 file changed, 104 insertions(+), 88 deletions(-) diff --git a/src/traft/node.rs b/src/traft/node.rs index 1ff3888ff6..563e858617 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -6,6 +6,7 @@ //! - processing raft `Ready` - persisting entries, communicating with other raft nodes. use crate::access_control::user_by_id; +use crate::access_control::UserMetadata; use crate::config::PicodataConfig; use crate::governor; use crate::has_states; @@ -814,109 +815,44 @@ impl NodeImpl { let new = self.storage.do_dml(op)?; Ok((old, new)) }); - - let initiator = op.initiator(); - let initiator_def = user_by_id(initiator).expect("user must exist"); - - match &res { + let (old, new) = match res { + Ok(v) => v, Err(e) => { tlog!(Error, "clusterwide dml failed: {e}"); return false; } - // Handle insert, replace, update in _pico_instance - Ok((old, Some(new))) if space == Ok(ClusterwideTable::Instance) => { - // Dml::Delete mandates that new tuple is None. - assert!(!matches!(op, Dml::Delete { .. })); + }; + + let Ok(space) = space else { + // Not a builtin system table, nothing left to do here + return true; + }; + + let initiator = op.initiator(); + let initiator_def = user_by_id(initiator).expect("user must exist"); + // FIXME: all of this should be done only after the transaction is committed + // See <https://git.picodata.io/core/picodata/-/issues/1149> + if let Some(new) = &new { + // Dml::Delete mandates that new tuple is None. + assert!(!matches!(op, Dml::Delete { .. })); + + // Handle insert, replace, update in _pico_instance + if space == ClusterwideTable::Instance { let old: Option<Instance> = old.as_ref().map(|x| x.decode().expect("must be Instance")); - // FIXME: we do this prematurely, because the - // transaction may still be rolled back for some reason. let new: Instance = new .decode() .expect("tuple already passed format verification"); - // Check if we're handling a "new node joined" event: - // * Either there's no tuple for this node in the storage; - // * Or its raft id has changed, meaning it's no longer the same node. - // WARN: this condition will not pass on the joining instance - // as it preemptively puts itself into `_pico_instance` table. - // Locally it's logged in src/lib.rs. - if old.as_ref().map(|x| x.raft_id) != Some(new.raft_id) { - let instance_name = &new.name; - crate::audit!( - message: "a new instance `{instance_name}` joined the cluster", - title: "join_instance", - severity: Low, - instance_name: %instance_name, - raft_id: %new.raft_id, - initiator: &initiator_def.name, - ); - crate::audit!( - message: "local database created on `{instance_name}`", - title: "create_local_db", - severity: Low, - instance_name: %instance_name, - raft_id: %new.raft_id, - initiator: &initiator_def.name, - ); - } - - if old.as_ref().map(|x| x.current_state) != Some(new.current_state) { - let instance_name = &new.name; - let state = &new.current_state; - crate::audit!( - message: "current state of instance `{instance_name}` changed to {state}", - title: "change_current_state", - severity: Medium, - instance_name: %instance_name, - raft_id: %new.raft_id, - new_state: %state, - initiator: &initiator_def.name, - ); - } + do_audit_logging_for_instance_update(old.as_ref(), &new, &initiator_def); - if old.as_ref().map(|x| x.target_state) != Some(new.target_state) { - let instance_name = &new.name; - let state = &new.target_state; - crate::audit!( - message: "target state of instance `{instance_name}` changed to {state}", - title: "change_target_state", - severity: Low, - instance_name: %instance_name, - raft_id: %new.raft_id, - new_state: %state, - initiator: &initiator_def.name, - ); - } - - if has_states!(new, Expelled -> *) { - let instance_name = &new.name; - crate::audit!( - message: "instance `{instance_name}` was expelled from the cluster", - title: "expel_instance", - severity: Low, - instance_name: %instance_name, - raft_id: %new.raft_id, - initiator: &initiator_def.name, - ); - crate::audit!( - message: "local database dropped on `{instance_name}`", - title: "drop_local_db", - severity: Low, - instance_name: %instance_name, - raft_id: %new.raft_id, - initiator: &initiator_def.name, - ); - - if new.raft_id == self.raft_id() { - // cannot exit during a transaction - *expelled = true; - } + if has_states!(new, Expelled -> *) && new.raft_id == self.raft_id() { + // cannot exit during a transaction + *expelled = true; } } - Ok(_) => {} } true @@ -2636,3 +2572,83 @@ fn proc_raft_promote() -> traft::Result<()> { node.campaign_and_yield()?; Ok(()) } + +fn do_audit_logging_for_instance_update( + old: Option<&Instance>, + new: &Instance, + initiator_def: &UserMetadata, +) { + // Check if we're handling a "new node joined" event: + // * Either there's no tuple for this node in the storage; + // * Or its raft id has changed, meaning it's no longer the same node. + // WARN: this condition will not pass on the joining instance + // as it preemptively puts itself into `_pico_instance` table. + // Locally it's logged in src/lib.rs. + if old.map(|x| x.raft_id) != Some(new.raft_id) { + let instance_name = &new.name; + crate::audit!( + message: "a new instance `{instance_name}` joined the cluster", + title: "join_instance", + severity: Low, + instance_name: %instance_name, + raft_id: %new.raft_id, + initiator: &initiator_def.name, + ); + crate::audit!( + message: "local database created on `{instance_name}`", + title: "create_local_db", + severity: Low, + instance_name: %instance_name, + raft_id: %new.raft_id, + initiator: &initiator_def.name, + ); + } + + if old.map(|x| x.current_state) != Some(new.current_state) { + let instance_name = &new.name; + let state = &new.current_state; + crate::audit!( + message: "current state of instance `{instance_name}` changed to {state}", + title: "change_current_state", + severity: Medium, + instance_name: %instance_name, + raft_id: %new.raft_id, + new_state: %state, + initiator: &initiator_def.name, + ); + } + + if old.map(|x| x.target_state) != Some(new.target_state) { + let instance_name = &new.name; + let state = &new.target_state; + crate::audit!( + message: "target state of instance `{instance_name}` changed to {state}", + title: "change_target_state", + severity: Low, + instance_name: %instance_name, + raft_id: %new.raft_id, + new_state: %state, + initiator: &initiator_def.name, + ); + } + + if has_states!(new, Expelled -> *) { + let instance_name = &new.name; + crate::audit!( + message: "instance `{instance_name}` was expelled from the cluster", + title: "expel_instance", + severity: Low, + instance_name: %instance_name, + raft_id: %new.raft_id, + initiator: &initiator_def.name, + ); + crate::audit!( + message: "local database dropped on `{instance_name}`", + title: "drop_local_db", + severity: Low, + instance_name: %instance_name, + raft_id: %new.raft_id, + initiator: &initiator_def.name, + ); + } +} -- GitLab