From c2ae3ade0868492cb69e3c234a332fb2064bd107 Mon Sep 17 00:00:00 2001 From: Kurdakov Alexander <kusancho12@gmail.com> Date: Fri, 13 Dec 2024 20:31:43 +0300 Subject: [PATCH] feat: move out some parameters from config file to _pico_db_config - checkpoint_interval - checkpoint_count - max_concurrent_messages max_concurrent_messages were renamed to iproto_net_msg_max --- CHANGELOG.md | 8 + sbroad/sbroad-core/src/ir/value.rs | 16 ++ src/config.rs | 233 ++++++++++++++----------- src/lib.rs | 13 ++ src/sql.rs | 15 +- src/storage.rs | 54 +++--- src/storage/snapshot.rs | 32 +++- src/tarantool.rs | 3 - src/traft/node.rs | 76 +++++--- test/int/test_basics.py | 5 +- test/int/test_config_file.py | 14 -- test/int/test_runtime_configuration.py | 72 ++++++++ test/int/test_sql.py | 3 + test/pgproto/storage_limits_test.py | 1 - 14 files changed, 363 insertions(+), 182 deletions(-) create mode 100644 test/int/test_runtime_configuration.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 37dfe6ac2c..c8e6f82f0c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,14 @@ with the `YY.MINOR.MICRO` scheme. PICODATA_PLUGIN_DIR -> PICODATA_SHARE_DIR, config: instance.plugin_dir -> instance.share_dir) +- The following parameters has been moved from configuration file to `_pico_db_config` system table: + + - checkpoint_interval + - checkpoint_count + - max_concurrent_messages + +- `max_concurrent_messages` parameter is renamed to `net_msg_max` + ### CLI - `picodata expel` takes instance uuid instead of instance name. diff --git a/sbroad/sbroad-core/src/ir/value.rs b/sbroad/sbroad-core/src/ir/value.rs index 18d85399fb..732524a205 100644 --- a/sbroad/sbroad-core/src/ir/value.rs +++ b/sbroad/sbroad-core/src/ir/value.rs @@ -962,6 +962,22 @@ pub enum EncodedValue<'v> { Owned(LuaValue), } +impl<'v> EncodedValue<'v> { + /// Try to convert to double underlying value. + pub fn double(&self) -> Option<f64> { + match &self { + EncodedValue::Ref(msg_pack_value) => match msg_pack_value { + MsgPackValue::Double(value) => Some(**value), + _ => None, + }, + EncodedValue::Owned(lua_value) => match lua_value { + LuaValue::Double(value) => Some(*value), + _ => None, + }, + } + } +} + impl<'v> From<MsgPackValue<'v>> for EncodedValue<'v> { fn from(value: MsgPackValue<'v>) -> Self { EncodedValue::Ref(value) diff --git a/src/config.rs b/src/config.rs index 6ebe838f91..0920085ab6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -9,7 +9,10 @@ use crate::introspection::FieldInfo; use crate::introspection::Introspection; use crate::pgproto; use crate::replicaset::ReplicasetName; +use crate::sql::value_type_str; use crate::storage; +use crate::system_parameter_name; +use crate::tarantool::set_cfg_field; use crate::tier::Tier; use crate::tier::TierConfig; use crate::tier::DEFAULT_TIER; @@ -18,6 +21,7 @@ use crate::traft::error::Error; use crate::traft::RaftSpaceAccess; use crate::util::edit_distance; use crate::util::file_exists; +use sbroad::ir::value::{EncodedValue, Value}; use serde_yaml::Value as YamlValue; use std::collections::HashMap; use std::convert::{From, Into}; @@ -28,8 +32,6 @@ use std::path::PathBuf; use std::str::FromStr; use std::sync::atomic::{AtomicUsize, Ordering}; use tarantool::log::SayLevel; -use tarantool::tlua; -use tarantool::tuple::RawBytes; use tarantool::tuple::Tuple; /// This reexport is used in the derive macro for Introspection. @@ -788,6 +790,16 @@ Using configuration file '{args_path}'."); } } } + + // For the tests + pub(crate) fn init_for_tests() { + let config = Box::new(Self::with_defaults()); + // Safe, because we only initialize config once in a single thread. + unsafe { + assert!(GLOBAL_CONFIG.is_none()); + let _ = GLOBAL_CONFIG.insert(config); + }; + } } /// Gets the logging configuration from all possible sources and initializes the core logger. @@ -1140,10 +1152,6 @@ pub struct InstanceConfig { #[introspection(nested)] pub vinyl: VinylSection, - #[serde(default)] - #[introspection(nested)] - pub iproto: IprotoSection, - #[serde(default)] #[introspection(nested)] pub pg: pgproto::Config, @@ -1351,27 +1359,6 @@ pub struct MemtxSection { /// Corresponds to `box.cfg.memtx_memory`. #[introspection(config_default = "64M")] pub memory: Option<ByteSize>, - - /// The maximum number of snapshots that are stored in the memtx_dir - /// directory. If the number of snapshots after creating a new one exceeds - /// this value, the Tarantool garbage collector deletes old snapshots. If - /// the option is set to zero, the garbage collector does not delete old - /// snapshots. - /// - /// Corresponds to `box.cfg.checkpoint_count`. - #[introspection(config_default = 2)] - pub checkpoint_count: Option<u64>, - - /// The interval in seconds between actions by the checkpoint daemon. If the - /// option is set to a value greater than zero, and there is activity that - /// causes change to a database, then the checkpoint daemon calls - /// box.snapshot() every checkpoint_interval seconds, creating a new - /// snapshot file each time. If the option is set to zero, the checkpoint - /// daemon is disabled. - /// - /// Corresponds to `box.cfg.checkpoint_interval`. - #[introspection(config_default = 3600.0)] - pub checkpoint_interval: Option<f64>, } tarantool::define_str_enum! { @@ -1457,52 +1444,6 @@ pub struct VinylSection { pub timeout: Option<f32>, } -//////////////////////////////////////////////////////////////////////////////// -// IprotoSection -//////////////////////////////////////////////////////////////////////////////// - -#[derive( - PartialEq, - Default, - Debug, - Clone, - serde::Deserialize, - serde::Serialize, - tlua::Push, - tlua::PushInto, - Introspection, -)] -#[serde(deny_unknown_fields)] -pub struct IprotoSection { - /// To handle messages, Tarantool allocates fibers. To prevent fiber - /// overhead from affecting the whole system, Tarantool restricts how many - /// messages the fibers handle, so that some pending requests are blocked. - /// - /// On powerful systems, increase net_msg_max and the scheduler will - /// immediately start processing pending requests. - /// - /// On weaker systems, decrease net_msg_max and the overhead may decrease - /// although this may take some time because the scheduler must wait until - /// already-running requests finish. - /// - /// When net_msg_max is reached, Tarantool suspends processing of incoming - /// packages until it has processed earlier messages. This is not a direct - /// restriction of the number of fibers that handle network messages, rather - /// it is a system-wide restriction of channel bandwidth. This in turn - /// causes restriction of the number of incoming network messages that the - /// transaction processor thread handles, and therefore indirectly affects - /// the fibers that handle network messages. (The number of fibers is - /// smaller than the number of messages because messages can be released as - /// soon as they are delivered, while incoming requests might not be - /// processed until some time after delivery.) - /// - /// On typical systems, the default value (768) is correct. - /// - /// Corresponds to `box.cfg.net_msg_max` - #[introspection(config_default = 0x300)] - pub max_concurrent_messages: Option<u64>, -} - //////////////////////////////////////////////////////////////////////////////// // LogSection //////////////////////////////////////////////////////////////////////////////// @@ -1673,6 +1614,58 @@ pub struct AlterSystemParameters { #[introspection(sbroad_type = SbroadType::Unsigned)] #[introspection(config_default = 5000)] pub sql_motion_row_max: u64, + + /// The maximum number of snapshots that are stored in the memtx_dir + /// directory. If the number of snapshots after creating a new one exceeds + /// this value, the Tarantool garbage collector deletes old snapshots. If + /// the option is set to zero, the garbage collector does not delete old + /// snapshots. + /// + /// Corresponds to `box.cfg.checkpoint_count`. + #[introspection(sbroad_type = SbroadType::Unsigned)] + #[introspection(config_default = 2)] + pub memtx_checkpoint_count: u64, + + /// The interval in seconds between actions by the checkpoint daemon. If the + /// option is set to a value greater than zero, and there is activity that + /// causes change to a database, then the checkpoint daemon calls + /// box.snapshot() every checkpoint_interval seconds, creating a new + /// snapshot file each time. If the option is set to zero, the checkpoint + /// daemon is disabled. + /// + /// Corresponds to `box.cfg.checkpoint_interval`. + #[introspection(config_default = 3600)] + #[introspection(sbroad_type = SbroadType::Unsigned)] + pub memtx_checkpoint_interval: u64, + + /// To handle messages, Tarantool allocates fibers. To prevent fiber + /// overhead from affecting the whole system, Tarantool restricts how many + /// messages the fibers handle, so that some pending requests are blocked. + /// + /// On powerful systems, increase net_msg_max and the scheduler will + /// immediately start processing pending requests. + /// + /// On weaker systems, decrease net_msg_max and the overhead may decrease + /// although this may take some time because the scheduler must wait until + /// already-running requests finish. + /// + /// When net_msg_max is reached, Tarantool suspends processing of incoming + /// packages until it has processed earlier messages. This is not a direct + /// restriction of the number of fibers that handle network messages, rather + /// it is a system-wide restriction of channel bandwidth. This in turn + /// causes restriction of the number of incoming network messages that the + /// transaction processor thread handles, and therefore indirectly affects + /// the fibers that handle network messages. (The number of fibers is + /// smaller than the number of messages because messages can be released as + /// soon as they are delivered, while incoming requests might not be + /// processed until some time after delivery.) + /// + /// On typical systems, the default value (768) is correct. + /// + /// Corresponds to `box.cfg.net_msg_max` + #[introspection(sbroad_type = SbroadType::Unsigned)] + #[introspection(config_default = 0x300)] + pub iproto_net_msg_max: u64, } /// A special macro helper for referring to alter system parameters thoroughout @@ -1700,43 +1693,33 @@ pub static MAX_PG_STATEMENTS: AtomicUsize = AtomicUsize::new(0); /// 0 means that the value must be read from the table. pub static MAX_PG_PORTALS: AtomicUsize = AtomicUsize::new(0); -pub fn validate_alter_system_parameter_tuple(name: &str, tuple: &Tuple) -> Result<(), Error> { +pub fn validate_alter_system_parameter_value<'v>( + name: &str, + value: &'v Value, +) -> Result<EncodedValue<'v>, Error> { let Some(expected_type) = get_type_of_alter_system_parameter(name) else { return Err(Error::other(format!("unknown parameter: '{name}'"))); }; - let field_count = tuple.len(); - if field_count != 2 { - #[rustfmt::skip] - return Err(Error::other(format!("too many fields: got {field_count}, expected 2"))); - } - - let raw_field = tuple.field::<&RawBytes>(1)?.expect("value is not nullable"); - crate::util::check_msgpack_matches_type(raw_field, expected_type)?; + let Ok(casted_value) = value.cast_and_encode(&expected_type) else { + let actual_type = value_type_str(value); + return Err(Error::other(format!( + "invalid value for '{name}' expected {expected_type}, got {actual_type}", + ))); + }; // Not sure how I feel about this... if name.ends_with("_timeout") { - let value = tuple.field::<f64>(1)?.expect("type already checked"); + let value = casted_value + .double() + .expect("unreachable, already casted to double"); + if value < 0.0 { - #[rustfmt::skip] return Err(Error::other("timeout value cannot be negative")); } } - // TODO: implement caching for all `config::AlterSystemParameters`. - if name == system_parameter_name!(max_pg_portals) { - let value = tuple.field::<usize>(1)?.expect("type already checked"); - // Cache the value. - MAX_PG_PORTALS.store(value, Ordering::Relaxed); - } - - if name == system_parameter_name!(max_pg_statements) { - let value = tuple.field::<usize>(1)?.expect("type already checked"); - // Cache the value. - MAX_PG_STATEMENTS.store(value, Ordering::Relaxed); - } - - Ok(()) + Ok(casted_value) } /// Returns `None` if there's no such parameter. @@ -1800,6 +1783,60 @@ pub fn get_defaults_for_all_alter_system_parameters() -> Vec<(String, rmpv::Valu result } +/// Non-persistent apply of parameter from _pico_db_config +/// represented by key-value tuple. +/// +/// In case of dynamic parameter, apply parameter via box.cfg. +/// +/// Panic in following cases: +/// - tuple is not key-value with predefined schema +/// - while applying via box.cfg +pub fn apply_parameter(tuple: Tuple) { + let name = tuple + .field::<&str>(0) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("key is always present and it's type string"); + + // set dynamic parameters + if name == system_parameter_name!(memtx_checkpoint_count) { + let value = tuple + .field::<u64>(1) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("checkpoint_count", value).expect("changing checkpoint_count shouldn't fail"); + } else if name == system_parameter_name!(memtx_checkpoint_interval) { + let value = tuple + .field::<f64>(1) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("checkpoint_interval", value) + .expect("changing checkpoint_interval shouldn't fail"); + } else if name == system_parameter_name!(iproto_net_msg_max) { + let value = tuple + .field::<u64>(1) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("net_msg_max", value).expect("changing net_msg_max shouldn't fail"); + } else if name == system_parameter_name!(max_pg_portals) { + let value = tuple + .field::<usize>(1) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("type already checked"); + // Cache the value. + MAX_PG_PORTALS.store(value, Ordering::Relaxed); + } else if name == system_parameter_name!(max_pg_statements) { + let value = tuple + .field::<usize>(1) + .expect("there is always 2 fields in _pico_db_config tuple") + .expect("type already checked"); + // Cache the value. + MAX_PG_STATEMENTS.store(value, Ordering::Relaxed); + } +} + //////////////////////////////////////////////////////////////////////////////// // deserialize_map_forbid_duplicate_keys //////////////////////////////////////////////////////////////////////////////// diff --git a/src/lib.rs b/src/lib.rs index 2c72a5b28b..de688e5692 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,9 @@ #![allow(clippy::vec_init_then_push)] #![allow(clippy::unused_io_amount)] #![allow(clippy::bool_assert_comparison)] +use config::apply_parameter; use serde::{Deserialize, Serialize}; +use storage::ToEntryIter; use ::raft::prelude as raft; use ::tarantool::error::Error as TntError; @@ -558,6 +560,15 @@ fn set_on_access_denied_audit_trigger() { .expect("setting on auth trigger should not fail") } +/// Apply all dynamic parameters from `_pico_db_config` via box.cfg +fn reapply_dynamic_parameters(storage: &Clusterwide) -> Result<()> { + for parameter in storage.db_config.iter()? { + apply_parameter(parameter); + } + + Ok(()) +} + #[allow(clippy::enum_variant_names)] #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub enum Entrypoint { @@ -939,6 +950,8 @@ fn postjoin( config.validate_storage(&storage, &raft_storage)?; + reapply_dynamic_parameters(&storage)?; + if let Some(config) = &config.instance.audit { let raft_id = raft_storage .raft_id() diff --git a/src/sql.rs b/src/sql.rs index d54ae5521f..74d38be3c0 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -1154,16 +1154,9 @@ fn alter_system_ir_node_to_op_or_result( param_name, param_value, } => { - let Some(expected_type) = crate::config::get_type_of_alter_system_parameter(param_name) - else { - return Err(Error::other(format!("unknown parameter: '{param_name}'"))); - }; - let Ok(casted_value) = param_value.cast_and_encode(&expected_type) else { - let actual_type = value_type_str(param_value); - return Err(Error::other(format!( - "invalid value for '{param_name}' expected {expected_type}, got {actual_type}", - ))); - }; + let casted_value = + crate::config::validate_alter_system_parameter_value(param_name, param_value)?; + let dml = Dml::replace(table, &(param_name, casted_value), initiator)?; Ok(Continue(Op::Dml(dml))) @@ -1770,7 +1763,7 @@ fn do_dml_on_global_tbl(mut query: Query<RouterRuntime>) -> traft::Result<Consum } // TODO: move this to sbroad -fn value_type_str(value: &Value) -> &'static str { +pub(crate) fn value_type_str(value: &Value) -> &'static str { match value { Value::Boolean { .. } => "boolean", Value::Decimal { .. } => "decimal", diff --git a/src/storage.rs b/src/storage.rs index 2b08d55d98..745bf70803 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -3072,8 +3072,6 @@ impl DbConfig { .if_not_exists(true) .create()?; - on_replace(space.id(), Self::on_replace)?; - Ok(Self { space, index }) } @@ -3120,26 +3118,6 @@ impl DbConfig { Ok(res) } - /// Callback which is called when data in _pico_db_config is updated. - pub fn on_replace(old: Option<Tuple>, new: Option<Tuple>) -> Result<()> { - _ = old; - - // Both `reset` and `set` (in context of alter system) command - // is insert operation, so `new` always is not none. - let new = new.expect("can't "); - let key = new - .field::<&str>(0) - .expect("key has type string") - .expect("key is not nullable"); - - // We can skip type checking (`get_type_of_alter_system_parameter`), - // because it was verified in sql.rs and via alter system - // we can't insert non whitelisted parameters in _pico_db_config. - config::validate_alter_system_parameter_tuple(key, &new)?; - - Ok(()) - } - #[inline] pub fn get<T>(&self, key: &'static str) -> tarantool::Result<Option<T>> where @@ -3237,16 +3215,36 @@ impl DbConfig { #[inline] pub fn sql_motion_row_max(&self) -> tarantool::Result<u64> { - #[rustfmt::skip] - let res: u64 = self.get_or_default(system_parameter_name!(sql_motion_row_max))?; - Ok(res) + self.get_or_default(system_parameter_name!(sql_motion_row_max)) } #[inline] pub fn sql_vdbe_opcode_max(&self) -> tarantool::Result<u64> { - #[rustfmt::skip] - let res: u64 = self.get_or_default(system_parameter_name!(sql_vdbe_opcode_max))?; - Ok(res) + self.get_or_default(system_parameter_name!(sql_vdbe_opcode_max)) + } + + #[inline] + pub fn memtx_checkpoint_count(&self) -> tarantool::Result<u64> { + self.get_or_default(system_parameter_name!(memtx_checkpoint_count)) + } + + #[inline] + pub fn memtx_checkpoint_interval(&self) -> tarantool::Result<u64> { + self.get_or_default(system_parameter_name!(memtx_checkpoint_interval)) + } + + #[inline] + pub fn iproto_net_msg_max(&self) -> tarantool::Result<u64> { + self.get_or_default(system_parameter_name!(iproto_net_msg_max)) + } +} + +impl ToEntryIter<MP_SERDE> for DbConfig { + type Entry = Tuple; + + #[inline(always)] + fn index_iter(&self) -> tarantool::Result<IndexIterator> { + self.space.select(IteratorType::All, &()) } } diff --git a/src/storage/snapshot.rs b/src/storage/snapshot.rs index 13baeb88a7..b2f40c6c84 100644 --- a/src/storage/snapshot.rs +++ b/src/storage/snapshot.rs @@ -296,9 +296,15 @@ impl Clusterwide { /// /// This should be called within a transaction. /// + /// Returns changed dynamic parameters as key-value tuples from `_pico_db_config`. + /// #[allow(rustdoc::private_intra_doc_links)] /// [`NodeImpl::advance`]: crate::traft::node::NodeImpl::advance - pub fn apply_snapshot_data(&self, data: &SnapshotData, is_master: bool) -> Result<()> { + pub fn apply_snapshot_data( + &self, + data: &SnapshotData, + is_master: bool, + ) -> Result<Vec<Vec<u8>>> { debug_assert!(unsafe { ::tarantool::ffi::tarantool::box_txn() }); // These need to be saved before we truncate the corresponding spaces. @@ -306,6 +312,9 @@ impl Clusterwide { let mut old_user_versions = HashMap::new(); let mut old_priv_versions = HashMap::new(); + let mut old_dynamic_parameters = HashSet::new(); + let mut changed_parameters = Vec::new(); + for def in self.tables.iter()? { old_table_versions.insert(def.id, def.schema_version); } @@ -317,6 +326,10 @@ impl Clusterwide { old_priv_versions.insert(def, schema_version); } + for parameter in self.db_config.iter()? { + old_dynamic_parameters.insert(parameter.to_vec()); + } + // There can be multiple space dumps for a given space in SnapshotData, // because snapshots arrive in chunks. So when restoring space dumps we // shouldn't truncate spaces which previously already restored chunk of @@ -409,6 +422,13 @@ impl Clusterwide { tuples_count += tuples.len().expect("ValueIter::from_array sets the length"); for tuple in tuples { + // We do not support removing tuples from _pico_db_config, but if we did we would have + // to keep track of which tuples were removed in the snapshot. Keep this in mind if + // in the future some ALTER SYSTEM parameters are removed and/or renamed + if space_id == PICO_DB_CONFIG && !old_dynamic_parameters.contains(tuple) { + changed_parameters.push(tuple.to_vec()) + } + space.insert(RawBytes::new(tuple))?; } // Debug: restoring rest of snapshot data took 5.9393054s [4194323 tuples] @@ -445,7 +465,7 @@ impl Clusterwide { tuples_count ); - return Ok(()); + return Ok(changed_parameters); const SCHEMA_DEFINITION_SPACES: &[SpaceId] = &[ ClusterwideTable::Table.id(), @@ -453,6 +473,8 @@ impl Clusterwide { ClusterwideTable::User.id(), ClusterwideTable::Privilege.id(), ]; + + const PICO_DB_CONFIG: SpaceId = ClusterwideTable::DbConfig.id(); } /// Updates local storage of the given schema entity in accordance with @@ -819,6 +841,7 @@ impl std::fmt::Display for SnapshotPosition { mod tests { use super::*; + use crate::config::PicodataConfig; use crate::instance::Instance; use crate::replicaset::Replicaset; use tarantool::transaction::transaction; @@ -866,7 +889,8 @@ mod tests { }); storage.for_each_space(|s| s.truncate()).unwrap(); - if let Err(e) = transaction(|| -> Result<()> { storage.apply_snapshot_data(&data, true) }) { + + if let Err(e) = transaction(|| -> Result<_> { storage.apply_snapshot_data(&data, true) }) { println!("{e}"); panic!(); } @@ -909,6 +933,8 @@ mod tests { let r = Replicaset::for_tests(); storage.replicasets.space.insert(&r).unwrap(); + PicodataConfig::init_for_tests(); + let (snapshot_data, _) = storage .first_snapshot_data_chunk(Default::default(), Default::default()) .unwrap(); diff --git a/src/tarantool.rs b/src/tarantool.rs index befd2d13e1..77e7e29375 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -350,9 +350,6 @@ impl Cfg { ("wal_dir", config_parameter_path!(instance.instance_dir)), ("memtx_dir", config_parameter_path!(instance.instance_dir)), ("vinyl_dir", config_parameter_path!(instance.instance_dir)), - ("checkpoint_count", config_parameter_path!(instance.memtx.checkpoint_count)), - ("checkpoint_interval", config_parameter_path!(instance.memtx.checkpoint_interval)), - ("net_msg_max", config_parameter_path!(instance.iproto.max_concurrent_messages)), ("vinyl_bloom_fpr", config_parameter_path!(instance.vinyl.bloom_fpr)), ("vinyl_run_count_per_level", config_parameter_path!(instance.vinyl.run_count_per_level)), ("vinyl_run_size_ratio", config_parameter_path!(instance.vinyl.run_size_ratio)), diff --git a/src/traft/node.rs b/src/traft/node.rs index b613ecb3e8..0deac60ef6 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -7,6 +7,7 @@ use crate::access_control::user_by_id; use crate::access_control::UserMetadata; +use crate::config::apply_parameter; use crate::config::PicodataConfig; use crate::governor; use crate::has_states; @@ -460,6 +461,12 @@ pub(crate) struct NodeImpl { plugin_manager: Rc<PluginManager>, } +#[derive(Debug, Clone, PartialEq)] +struct AppliedDml { + table: ClusterwideTable, + new_tuple: Tuple, +} + impl NodeImpl { fn new( pool: Rc<ConnectionPool>, @@ -686,7 +693,7 @@ impl NodeImpl { } }; - let mut apply_entry_result = EntryApplied; + let mut apply_entry_result = EntryApplied(None); let mut new_applied = None; transaction(|| -> tarantool::Result<()> { self.main_loop_status("handling committed entries"); @@ -695,7 +702,7 @@ impl NodeImpl { match entry.entry_type { raft::EntryType::EntryNormal => { apply_entry_result = self.handle_committed_normal_entry(entry, expelled); - if apply_entry_result != EntryApplied { + if !matches!(apply_entry_result, EntryApplied(_)) { return Ok(()); } } @@ -730,7 +737,15 @@ impl NodeImpl { fiber::sleep(timeout); continue; } - EntryApplied => { + EntryApplied(None) => { + // Actually advance the iterator. + let _ = entries.next(); + } + EntryApplied(Some(AppliedDml { table, new_tuple })) => { + // currently only parameters from _pico_db_config processed outside of transaction (here) + debug_assert!(table == ClusterwideTable::DbConfig); + apply_parameter(new_tuple); + // Actually advance the iterator. let _ = entries.next(); } @@ -776,8 +791,8 @@ impl NodeImpl { /// Actions needed when applying a DML entry. /// - /// Returns `true` if entry was applied successfully. - fn handle_dml_entry(&self, op: &Dml, expelled: &mut bool) -> bool { + /// Returns Ok(_) if entry was applied successfully + fn handle_dml_entry(&self, op: &Dml, expelled: &mut bool) -> Result<Option<AppliedDml>, ()> { let space = op.space().try_into(); // In order to implement the audit log events, we have to compare @@ -829,13 +844,13 @@ impl NodeImpl { Ok(v) => v, Err(e) => { tlog!(Error, "clusterwide dml failed: {e}"); - return false; + return Err(()); } }; let Ok(space) = space else { // Not a builtin system table, nothing left to do here - return true; + return Ok(None); }; // FIXME: all of this should be done only after the transaction is committed @@ -899,10 +914,18 @@ impl NodeImpl { self.topology_cache.update_service_route(old, new); } + ClusterwideTable::DbConfig => { + let new_tuple = new.expect("can't delete tuple from _pico_db_config"); + return Ok(Some(AppliedDml { + table: ClusterwideTable::DbConfig, + new_tuple, + })); + } + _ => {} } - true + Ok(None) } /// Is called during a transaction @@ -919,24 +942,23 @@ impl NodeImpl { self.wake_governor_if_needed(&op); let storage_properties = &self.storage.properties; + let mut res = ApplyEntryResult::EntryApplied(None); // apply the operation match op { Op::Nop => {} Op::BatchDml { ref ops } => { for op in ops { - let ok = self.handle_dml_entry(op, expelled); - if !ok { - return SleepAndRetry; + match self.handle_dml_entry(op, expelled) { + Ok(applied_dml) => res = EntryApplied(applied_dml), + Err(()) => return SleepAndRetry, } } } - Op::Dml(op) => { - let ok = self.handle_dml_entry(&op, expelled); - if !ok { - return SleepAndRetry; - } - } + Op::Dml(op) => match self.handle_dml_entry(&op, expelled) { + Ok(applied_dml) => res = EntryApplied(applied_dml), + Err(()) => return SleepAndRetry, + }, Op::DdlPrepare { ddl, schema_version, @@ -1298,7 +1320,7 @@ impl NodeImpl { if let Some(plugin) = maybe_plugin { if plugin.enabled { warn_or_panic!("Op::DropPlugin for an enabled plugin"); - return EntryApplied; + return EntryApplied(None); } let services = self @@ -1515,7 +1537,7 @@ impl NodeImpl { let _ = notify.send(Err(e)); } - EntryApplied + res } fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> { @@ -2127,6 +2149,7 @@ impl NodeImpl { let mut new_term = None; let mut new_applied = None; let mut received_snapshot = false; + let mut changed_parameters = Vec::new(); if let Err(e) = transaction(|| -> Result<(), Error> { self.main_loop_status("persisting hard state, entries and/or snapshot"); @@ -2165,7 +2188,8 @@ impl NodeImpl { } else { // Persist the contents of the global tables from the snapshot data. let is_master = !self.is_readonly(); - self.storage + changed_parameters = self + .storage .apply_snapshot_data(&snapshot_data, is_master)?; new_applied = Some(meta.index); received_snapshot = true; @@ -2201,6 +2225,11 @@ impl NodeImpl { } if received_snapshot { + // apply changed dynamic parameters + for changed_parameter in changed_parameters { + apply_parameter(Tuple::try_from_slice(&changed_parameter)?); + } + // Need to reload the whole topology cache. We could be more // clever about it and only update the records which changed, // but at the moment this would require doing a full scan and @@ -2455,13 +2484,14 @@ impl NodeImpl { /// Return value of [`NodeImpl::handle_committed_normal_entry`], explains what should be /// done as result of attempting to apply a given entry. -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, PartialEq)] enum ApplyEntryResult { /// This entry failed to apply for some reason, and must be retried later. SleepAndRetry, - /// Entry applied successfully, proceed to next entry. - EntryApplied, + /// Entry applied to persistent storage successfully and should be + /// applied to non-persistent outside of transaction, proceed to next entry. + EntryApplied(Option<AppliedDml>), } pub(crate) struct MainLoop { diff --git a/test/int/test_basics.py b/test/int/test_basics.py index 1e5d146dd1..d49618f24f 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -364,7 +364,10 @@ Insert(_pico_db_config, ["governor_raft_op_timeout",3.0]), Insert(_pico_db_config, ["governor_common_rpc_timeout",3.0]), Insert(_pico_db_config, ["governor_plugin_rpc_timeout",10.0]), Insert(_pico_db_config, ["sql_vdbe_opcode_max",45000]), -Insert(_pico_db_config, ["sql_motion_row_max",5000]))| +Insert(_pico_db_config, ["sql_motion_row_max",5000]), +Insert(_pico_db_config, ["memtx_checkpoint_count",2]), +Insert(_pico_db_config, ["memtx_checkpoint_interval",3600]), +Insert(_pico_db_config, ["iproto_net_msg_max",768]))| | 0 | 1 |BatchDml( Insert(_pico_user, [0,"guest",0,["md5","md5084e0343a0486ff05530df6c705c8bb4"],1,"user"]), Insert(_pico_privilege, [1,0,"login","universe",0,0]), diff --git a/test/int/test_config_file.py b/test/int/test_config_file.py index 2bf8c02231..269be81493 100644 --- a/test/int/test_config_file.py +++ b/test/int/test_config_file.py @@ -106,8 +106,6 @@ instance: peer=dict(value=[f"{host}:{port}"], source="config_file"), memtx=dict( memory=dict(value="64M", source="default"), - checkpoint_count=dict(value=2, source="default"), - checkpoint_interval=dict(value=3600.0, source="default"), ), vinyl=dict( memory=dict(value="128M", source="default"), @@ -122,9 +120,6 @@ instance: write_threads=dict(value=4, source="default"), timeout=dict(value=60.0, source="default"), ), - iproto=dict( - max_concurrent_messages=dict(value=768, source="default"), - ), pg=dict( # pg is enabled by default, so listen should be set listen=dict(source="default", value="127.0.0.1:4327"), @@ -351,15 +346,10 @@ instance: memtx: memory: 2G - checkpoint_count: 8 - checkpoint_interval: 1800 vinyl: memory: 600M cache: 300M - - iproto: - max_concurrent_messages: 0x600 """ ) @@ -395,14 +385,10 @@ instance: assert box_cfg["log_format"] == "json" assert box_cfg["memtx_memory"] == 2147483648 - assert box_cfg["checkpoint_count"] == 8 - assert box_cfg["checkpoint_interval"] == 1800 assert box_cfg["vinyl_memory"] == 629145600 assert box_cfg["vinyl_cache"] == 314572800 - assert box_cfg["net_msg_max"] == 0x600 - def test_picodata_default_config(cluster: Cluster): # Check generating the default config diff --git a/test/int/test_runtime_configuration.py b/test/int/test_runtime_configuration.py new file mode 100644 index 0000000000..529723abfc --- /dev/null +++ b/test/int/test_runtime_configuration.py @@ -0,0 +1,72 @@ +import pytest +from conftest import Cluster, TarantoolError + + +def test_set_via_alter_system(cluster: Cluster): + instance = cluster.add_instance() + + # default values + box_config = instance.eval("return box.cfg") + assert box_config["net_msg_max"] == 768 + assert box_config["checkpoint_interval"] == 3600 + assert box_config["checkpoint_count"] == 2 + + # picodata parameters names are slightly different from the tarantools + instance.sql("ALTER SYSTEM SET iproto_net_msg_max TO 100") + instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 100") + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count TO 100") + + # parameters values changed + box_config = instance.eval("return box.cfg") + assert box_config["net_msg_max"] == 100 + assert box_config["checkpoint_interval"] == 100 + assert box_config["checkpoint_count"] == 100 + + # box settings isn't persistent, so it should be reapplied + instance.restart() + instance.wait_online() + + # parameters values are correct even after restart + box_config = instance.eval("return box.cfg") + assert box_config["net_msg_max"] == 100 + assert box_config["checkpoint_interval"] == 100 + assert box_config["checkpoint_count"] == 100 + + # bad values for parameters shouldn't pass validation + # stage before creating DML from ir node + with pytest.raises( + TarantoolError, + match="""invalid value for 'iproto_net_msg_max' expected unsigned, got integer""", + ): + instance.sql("ALTER SYSTEM SET iproto_net_msg_max = -1") + + with pytest.raises( + TarantoolError, + match="""invalid value for 'memtx_checkpoint_interval' expected unsigned, got decimal""", + ): + instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval = -1.0") + + with pytest.raises( + TarantoolError, + match="""invalid value for 'memtx_checkpoint_count' expected unsigned, got integer""", + ): + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count = -1") + + +def test_snapshot_and_dynamic_parameters(cluster: Cluster): + i1, i2, _ = cluster.deploy(instance_count=3) + + i2.kill() + + i1.sql("ALTER SYSTEM SET iproto_net_msg_max = 100") + + # Trigger raft log compaction + i1.sql("ALTER SYSTEM SET cluster_wal_max_count TO 1") + + # Add a new instance and restart `i2`, which catches up by raft snapshot + i2.start_and_wait() + i4 = cluster.add_instance() + + for catched_up_by_snapshot in [i2, i4]: + box_config = catched_up_by_snapshot.eval("return box.cfg") + assert box_config["net_msg_max"] == 100 diff --git a/test/int/test_sql.py b/test/int/test_sql.py index fda8055bb5..34603613d5 100644 --- a/test/int/test_sql.py +++ b/test/int/test_sql.py @@ -280,10 +280,13 @@ def test_read_from_system_tables(cluster: Cluster): "governor_common_rpc_timeout", "governor_plugin_rpc_timeout", "governor_raft_op_timeout", + "iproto_net_msg_max", "max_heartbeat_period", "max_login_attempts", "max_pg_portals", "max_pg_statements", + "memtx_checkpoint_count", + "memtx_checkpoint_interval", "password_enforce_digits", "password_enforce_lowercase", "password_enforce_specialchars", diff --git a/test/pgproto/storage_limits_test.py b/test/pgproto/storage_limits_test.py index 95c4856967..e0d23c042a 100644 --- a/test/pgproto/storage_limits_test.py +++ b/test/pgproto/storage_limits_test.py @@ -42,7 +42,6 @@ def test_statement_storage_config_limit(postgres: Postgres): conn = setup_pg8000_env(postgres) max_pg_statements = 32 - # not sure about not using `?`` postgres.instance.sql( f"ALTER SYSTEM SET max_pg_statements = {max_pg_statements}", sudo=True, -- GitLab