From 6e25c341465e0c2acdbc6c2c308fd7dd32d897e9 Mon Sep 17 00:00:00 2001 From: Kurdakov Alexander <kusancho12@gmail.com> Date: Fri, 14 Feb 2025 23:14:56 +0300 Subject: [PATCH] feat: scopes for parameters in _pico_db_config Motivation: some of parameters from _pico_db_config should be different in different tiers. Added scopes to all parameters from `_pico_db_config`. There are two scopes right now - tier and global. Parameters with scope tier can be different on different tiers. For example `ALTER SYSTEM SET parameter_with_scope_tier FOR ALL TIERS` `ALTER SYSTEM SET parameter_with_scope_tier FOR TIER default`. Parameters with scope global are the same on each instance. --- CHANGELOG.md | 7 + pico_proc_macro/src/lib.rs | 25 +++- sbroad/sbroad-core/src/frontend/sql.rs | 13 +- sbroad/sbroad-core/src/ir/node.rs | 10 +- src/bootstrap_entries.rs | 18 +-- src/config.rs | 148 ++++++++++++++----- src/governor/mod.rs | 6 +- src/introspection.rs | 12 ++ src/lib.rs | 10 +- src/sql.rs | 115 ++++++++++----- src/storage.rs | 194 +++++++++++++++++++------ src/traft/node.rs | 58 ++++---- test/int/test_basics.py | 47 +++--- test/int/test_runtime_configuration.py | 143 +++++++++++++++++- test/int/test_sql.py | 88 +++++------ 15 files changed, 643 insertions(+), 251 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 439bc40df8..2737bd3970 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -59,6 +59,13 @@ with the `YY.MINOR.MICRO` scheme. - New field `_pico_property.system_catalog_version` representing version of a system catalog. It may not be changed at every release, so this is not autoincrementing value. +- Added scopes to all parameters from `_pico_db_config`. There are two scopesright now - `tier` +and `global`. Parameters with scope `tier` +can be different on different tiers. +For example `ALTER SYSTEM SET parameter_with_scope_tier FOR ALL TIERS` or +`ALTER SYSTEM SET parameter_with_scope_tier FOR TIER default`. +Parameters with scope `global` are the same on each instance. + ### CLI - `picodata expel` takes instance uuid instead of instance name. diff --git a/pico_proc_macro/src/lib.rs b/pico_proc_macro/src/lib.rs index 32fbd10597..96a3e13c1c 100644 --- a/pico_proc_macro/src/lib.rs +++ b/pico_proc_macro/src/lib.rs @@ -1,4 +1,5 @@ use quote::quote; +use syn::LitStr; macro_rules! unwrap_or_compile_error { ($expr:expr) => { @@ -146,6 +147,12 @@ fn generate_body_for_field_infos(context: &Context) -> proc_macro2::TokenStream for field in &context.fields { let name = &field.name; + let mut scope = quote! { None }; + if let Some(s) = &field.attrs.scope { + let lit = LitStr::new(s, proc_macro2::Span::call_site()); + scope = quote! { Some(#lit) } + } + #[allow(non_snake_case)] let Type = &field.field.ty; @@ -153,6 +160,7 @@ fn generate_body_for_field_infos(context: &Context) -> proc_macro2::TokenStream code.extend(quote! { #crate_::introspection::FieldInfo { name: #name, + scope: #scope, nested_fields: &[], }, }); @@ -160,6 +168,7 @@ fn generate_body_for_field_infos(context: &Context) -> proc_macro2::TokenStream code.extend(quote! { #crate_::introspection::FieldInfo { name: #name, + scope: #scope, nested_fields: #Type::FIELD_INFOS, }, }); @@ -697,6 +706,12 @@ struct FieldAttrs { /// The provided expression must have type `picodata::config::SbroadType`. /// The user must make sure that `config_default` is not conflicting with `sbroad_type`. sbroad_type: Option<syn::Expr>, + + /// Scope of parameter. There is only two scopes - Tier and Global. + /// + /// Parameters with Tier scope can be set for specific tier with `FOR TIER 'tier_name'` + /// syntax. + scope: Option<String>, } impl FieldAttrs { @@ -734,10 +749,18 @@ impl FieldAttrs { input.parse::<syn::Token![=]>()?; result.sbroad_type = Some(input.parse::<syn::Expr>()?); + } else if ident == "scope" { + if result.scope.is_some() { + return Err(syn::Error::new(ident.span(), "duplicate `scope` specified")); + } + + input.parse::<syn::Token![=]>()?; + let scope = input.parse::<syn::Ident>()?; + result.scope = Some(scope.to_string()); } else { return Err(syn::Error::new( ident.span(), - format!("unknown attribute argument `{ident}`, expected one of `ignore`, `nested`, `config_default`, `sbroad_type`"), + format!("unknown attribute argument `{ident}`, expected one of `ignore`, `nested`, `config_default`, `sbroad_type`, `scope`"), )); } diff --git a/sbroad/sbroad-core/src/frontend/sql.rs b/sbroad/sbroad-core/src/frontend/sql.rs index 2ad0fa7473..0faed2e979 100644 --- a/sbroad/sbroad-core/src/frontend/sql.rs +++ b/sbroad/sbroad-core/src/frontend/sql.rs @@ -6,7 +6,9 @@ use crate::ir::node::ddl::DdlOwned; use crate::ir::node::deallocate::Deallocate; use crate::ir::node::tcl::Tcl; -use crate::ir::node::{Alias, LocalTimestamp, Reference, ReferenceAsteriskSource}; +use crate::ir::node::{ + Alias, AlterSystemTierPart, LocalTimestamp, Reference, ReferenceAsteriskSource, +}; use crate::ir::relation::Type; use ahash::{AHashMap, AHashSet}; use core::panic; @@ -394,13 +396,16 @@ fn parse_alter_system<M: Metadata>( .expect("Expected mandatory child node under AlterSystemTier."); let tier_node_child = ast.nodes.get_node(*tier_node_child_id)?; match tier_node_child.rule { - Rule::AlterSystemTiersAll => None, + Rule::AlterSystemTiersAll => Some(AlterSystemTierPart::AllTiers), Rule::AlterSystemTierSingle => { let node_child_id = tier_node_child .children .first() .expect("Child node expected under AlterSystemTierSingle."); - Some(parse_identifier(ast, *node_child_id)?) + Some(AlterSystemTierPart::Tier(parse_identifier( + ast, + *node_child_id, + )?)) } _ => panic!("Unexpected rule met under AlterSystemTier."), } @@ -410,7 +415,7 @@ fn parse_alter_system<M: Metadata>( Ok(AlterSystem { ty, - tier_name, + tier_part: tier_name, timeout: get_default_timeout(), }) } diff --git a/sbroad/sbroad-core/src/ir/node.rs b/sbroad/sbroad-core/src/ir/node.rs index 6d988857ac..9202d91dac 100644 --- a/sbroad/sbroad-core/src/ir/node.rs +++ b/sbroad/sbroad-core/src/ir/node.rs @@ -985,12 +985,16 @@ pub struct SetParam { #[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] pub struct AlterSystem { pub ty: AlterSystemType, - /// In case of None, ALTER is supposed - /// to be executed on all tiers. - pub tier_name: Option<SmolStr>, + pub tier_part: Option<AlterSystemTierPart>, pub timeout: Decimal, } +#[derive(Clone, Debug, Deserialize, PartialEq, Eq, Serialize)] +pub enum AlterSystemTierPart { + AllTiers, + Tier(SmolStr), +} + impl From<AlterSystem> for NodeAligned { fn from(value: AlterSystem) -> Self { Self::Node136(Node136::AlterSystem(value)) diff --git a/src/bootstrap_entries.rs b/src/bootstrap_entries.rs index 606e9e84bd..c8ca88a668 100644 --- a/src/bootstrap_entries.rs +++ b/src/bootstrap_entries.rs @@ -5,19 +5,20 @@ use ::tarantool::msgpack; use crate::access_control::validate_password; use crate::config::PicodataConfig; +use crate::config::{self}; use crate::info::PICODATA_VERSION; use crate::instance::Instance; use crate::replicaset::Replicaset; use crate::schema; use crate::schema::{ADMIN_ID, GUEST_ID}; use crate::storage::PropertyName; +use crate::storage::{self}; use crate::storage::{Clusterwide, TClusterwideTable}; use crate::tier::Tier; use crate::tlog; use crate::traft; use crate::traft::error::Error; use crate::traft::op; -use crate::{config, storage}; use std::collections::HashMap; use std::env; use tarantool::auth::{AuthData, AuthDef, AuthMethod}; @@ -128,19 +129,14 @@ pub(super) fn prepare( .expect("serialization cannot fail"), ); + let tier_names = tiers.keys().map(AsRef::as_ref).collect::<Vec<_>>(); + // // Populate "_pico_db_config" with initial values for cluster-wide properties // - for (name, default) in config::get_defaults_for_all_alter_system_parameters() { - #[rustfmt::skip] - ops.push( - op::Dml::insert( - storage::DbConfig::TABLE_ID, - &(name, default), - ADMIN_ID, - ) - .expect("serialization cannot fail")); - } + let db_config_entries = config::get_defaults_for_all_alter_system_parameters(&tier_names)?; + + ops.extend(db_config_entries); let context = traft::EntryContext::Op(op::Op::BatchDml { ops }); init_entries.push( diff --git a/src/config.rs b/src/config.rs index 2e1b0a0a0f..5c6e19ceef 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,8 +8,9 @@ use crate::introspection::FieldInfo; use crate::introspection::Introspection; use crate::pgproto; use crate::replicaset::ReplicasetName; +use crate::schema::ADMIN_ID; use crate::sql::value_type_str; -use crate::storage; +use crate::storage::{self, DbConfig, TClusterwideTable}; use crate::system_parameter_name; use crate::tarantool::set_cfg_field; use crate::tier::Tier; @@ -17,6 +18,7 @@ use crate::tier::TierConfig; use crate::tier::DEFAULT_TIER; use crate::tlog; use crate::traft::error::Error; +use crate::traft::op::Dml; use crate::traft::RaftSpaceAccess; use crate::util::edit_distance; use crate::util::file_exists; @@ -1661,6 +1663,7 @@ pub struct AlterSystemParameters { /// Corresponds to `box.cfg.checkpoint_count`. #[introspection(sbroad_type = SbroadType::Unsigned)] #[introspection(config_default = 2)] + #[introspection(scope = tier)] pub memtx_checkpoint_count: u64, /// The interval in seconds between actions by the checkpoint daemon. If the @@ -1673,6 +1676,7 @@ pub struct AlterSystemParameters { /// Corresponds to `box.cfg.checkpoint_interval`. #[introspection(config_default = 3600)] #[introspection(sbroad_type = SbroadType::Unsigned)] + #[introspection(scope = tier)] pub memtx_checkpoint_interval: u64, /// To handle messages, Tarantool allocates fibers. To prevent fiber @@ -1702,9 +1706,39 @@ pub struct AlterSystemParameters { /// Corresponds to `box.cfg.net_msg_max` #[introspection(sbroad_type = SbroadType::Unsigned)] #[introspection(config_default = 0x300)] + #[introspection(scope = tier)] pub iproto_net_msg_max: u64, } +impl AlterSystemParameters { + pub const FIELD_NAME: u32 = 0; + pub const FIELD_SCOPE: u32 = 1; + pub const FIELD_VALUE: u32 = 2; + + /// Returns error in case of `parameter_name` not in existing parameters. + fn has_scope(parameter_name: &str, scope: &str) -> Result<bool, Error> { + for field_info in Self::FIELD_INFOS { + if field_info.name == parameter_name { + let scope_value = field_info.scope.unwrap_or(DbConfig::GLOBAL_SCOPE); + return Ok(scope_value == scope); + } + } + + Err(Error::other(format!( + "No such parameter with name '{}'", + parameter_name + ))) + } + + pub fn has_scope_global(parameter_name: &str) -> Result<bool, Error> { + Self::has_scope(parameter_name, "") + } + + pub fn has_scope_tier(parameter_name: &str) -> Result<bool, Error> { + Self::has_scope(parameter_name, "tier") + } +} + /// A special macro helper for referring to alter system parameters thoroughout /// the codebase. It makes sure the parameter with this name exists and returns /// it's name. @@ -1741,9 +1775,8 @@ pub fn validate_alter_system_parameter_value<'v>( let expected_type = DerivedType::new(expected_type); let Ok(casted_value) = value.cast_and_encode(&expected_type) else { let actual_type = value_type_str(value); - return Err(Error::other(format!( - "invalid value for '{name}' expected {expected_type}, got {actual_type}", - ))); + #[rustfmt::skip] + return Err(Error::other(format!("invalid value for '{name}' expected {expected_type}, got {actual_type}",))); }; // Not sure how I feel about this... @@ -1794,8 +1827,12 @@ pub fn get_default_value_of_alter_system_parameter(name: &str) -> Option<rmpv::V Some(default) } -/// Returns an array of pairs (parameter name, default value). -pub fn get_defaults_for_all_alter_system_parameters() -> Vec<(String, rmpv::Value)> { +/// `tiers` is a list of names of all tiers in cluster. +/// Returns an array of dmls that replacing all entries in _pico_db_config +/// with default values for every possible scope. +pub fn get_defaults_for_all_alter_system_parameters( + tier_names: &[&str], +) -> Result<Vec<Dml>, Error> { // NOTE: we need an instance of this struct because of how `Introspection::get_field_default_value_as_rmpv` // works. We allow default values to refer to other fields of the struct // which were actuall provided (see `InstanceConfig::advertise_address` for example). @@ -1810,15 +1847,33 @@ pub fn get_defaults_for_all_alter_system_parameters() -> Vec<(String, rmpv::Valu // the system table change. let parameters = AlterSystemParameters::default(); - let mut result = Vec::with_capacity(AlterSystemParameters::FIELD_INFOS.len()); + let mut dmls = vec![]; + + let global_scope = vec![DbConfig::GLOBAL_SCOPE]; + + // This function called from two places: persisting bootstrap entries - `start_boot`, where + // initiator is ADMIN, and ALTER SYSTEM which allowed only for ADMIN. + let initiator = ADMIN_ID; + for name in &leaf_field_paths::<AlterSystemParameters>() { let default = parameters .get_field_default_value_as_rmpv(name) .expect("paths are correct"); let default = default.expect("default must be specified explicitly for all parameters"); - result.push((name.clone(), default)); + + let scope_values = if AlterSystemParameters::has_scope_tier(&name)? { + tier_names + } else { + &global_scope + }; + + for scope_value in scope_values { + #[rustfmt::skip] + dmls.push(Dml::replace(DbConfig::TABLE_ID, &(&name, scope_value, &default), initiator,)?); + } } - result + + Ok(dmls) } /// Non-persistent apply of parameter from _pico_db_config @@ -1829,46 +1884,61 @@ pub fn get_defaults_for_all_alter_system_parameters() -> Vec<(String, rmpv::Valu /// Panic in following cases: /// - tuple is not key-value with predefined schema /// - while applying via box.cfg -pub fn apply_parameter(tuple: Tuple) { +pub fn apply_parameter(tuple: Tuple, current_tier: &str) { let name = tuple - .field::<&str>(0) - .expect("there is always 2 fields in _pico_db_config tuple") + .field::<&str>(AlterSystemParameters::FIELD_NAME) + .expect("there is always 3 fields in _pico_db_config tuple") .expect("key is always present and it's type string"); - // set dynamic parameters - if name == system_parameter_name!(memtx_checkpoint_count) { - let value = tuple - .field::<u64>(1) - .expect("there is always 2 fields in _pico_db_config tuple") - .expect("type already checked"); - - set_cfg_field("checkpoint_count", value).expect("changing checkpoint_count shouldn't fail"); - } else if name == system_parameter_name!(memtx_checkpoint_interval) { - let value = tuple - .field::<f64>(1) - .expect("there is always 2 fields in _pico_db_config tuple") - .expect("type already checked"); - - set_cfg_field("checkpoint_interval", value) - .expect("changing checkpoint_interval shouldn't fail"); - } else if name == system_parameter_name!(iproto_net_msg_max) { - let value = tuple - .field::<u64>(1) - .expect("there is always 2 fields in _pico_db_config tuple") - .expect("type already checked"); - - set_cfg_field("net_msg_max", value).expect("changing net_msg_max shouldn't fail"); + if AlterSystemParameters::has_scope_tier(name) + .expect("apply_parameter called only with existing names") + { + // There is only two scopes: clusterwide(global) and tier. + // Clusterwide represented as empty string, and scope tier represented by it's name. + let target_tier_name = tuple + .field::<&str>(AlterSystemParameters::FIELD_SCOPE) + .expect("there is always 3 fields in _pico_db_config tuple") + .expect("key is always present and it's type string"); + + if target_tier_name != current_tier { + return; + } + + if name == system_parameter_name!(memtx_checkpoint_count) { + let value = tuple + .field::<u64>(AlterSystemParameters::FIELD_VALUE) + .expect("there is always 3 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("checkpoint_count", value) + .expect("changing checkpoint_count shouldn't fail"); + } else if name == system_parameter_name!(memtx_checkpoint_interval) { + let value = tuple + .field::<f64>(AlterSystemParameters::FIELD_VALUE) + .expect("there is always 3 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("checkpoint_interval", value) + .expect("changing checkpoint_interval shouldn't fail"); + } else if name == system_parameter_name!(iproto_net_msg_max) { + let value = tuple + .field::<u64>(AlterSystemParameters::FIELD_VALUE) + .expect("there is always 3 fields in _pico_db_config tuple") + .expect("type already checked"); + + set_cfg_field("net_msg_max", value).expect("changing net_msg_max shouldn't fail"); + } } else if name == system_parameter_name!(pg_portal_max) { let value = tuple - .field::<usize>(1) - .expect("there is always 2 fields in _pico_db_config tuple") + .field::<usize>(AlterSystemParameters::FIELD_VALUE) + .expect("there is always 3 fields in _pico_db_config tuple") .expect("type already checked"); // Cache the value. MAX_PG_PORTALS.store(value, Ordering::Relaxed); } else if name == system_parameter_name!(pg_statement_max) { let value = tuple - .field::<usize>(1) - .expect("there is always 2 fields in _pico_db_config tuple") + .field::<usize>(AlterSystemParameters::FIELD_VALUE) + .expect("there is always 3 fields in _pico_db_config tuple") .expect("type already checked"); // Cache the value. MAX_PG_STATEMENTS.store(value, Ordering::Relaxed); diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 40ee931198..d7fcd422b4 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -72,19 +72,19 @@ impl Loop { let v: f64 = storage .db_config - .get_or_default(crate::system_parameter_name!(governor_raft_op_timeout)) + .governor_raft_op_timeout() .expect("storage should never ever fail"); let raft_op_timeout = Duration::from_secs_f64(v); let v: f64 = storage .db_config - .get_or_default(crate::system_parameter_name!(governor_common_rpc_timeout)) + .governor_common_rpc_timeout() .expect("storage should never ever fail"); let rpc_timeout = Duration::from_secs_f64(v); let v: f64 = storage .db_config - .get_or_default(crate::system_parameter_name!(governor_plugin_rpc_timeout)) + .governor_plugin_rpc_timeout() .expect("storage should never ever fail"); let plugin_rpc_timeout = Duration::from_secs_f64(v); diff --git a/src/introspection.rs b/src/introspection.rs index 2750dec91b..09a51b7990 100644 --- a/src/introspection.rs +++ b/src/introspection.rs @@ -237,12 +237,14 @@ pub trait Introspection { #[derive(PartialEq, Eq, Hash)] pub struct FieldInfo { pub name: &'static str, + pub scope: Option<&'static str>, pub nested_fields: &'static [FieldInfo], } impl std::fmt::Debug for FieldInfo { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!(f, "{:?}", self.name)?; + write!(f, "{:?}", self.scope)?; if !self.nested_fields.is_empty() { write!(f, ": {:?}", self.nested_fields)?; } @@ -427,6 +429,7 @@ mod test { s: String, #[introspection(config_default = &["this", "also", "works"])] #[introspection(sbroad_type = SbroadType::Array)] + #[introspection(scope = tier)] v: Vec<String>, #[introspection(nested)] #[introspection(sbroad_type = SbroadType::Map)] @@ -442,6 +445,7 @@ mod test { #[introspection(sbroad_type = SbroadType::String)] a: String, #[introspection(config_default = format!("{}, but type safety is missing unfortunately", self.a))] + #[introspection(scope = tier)] b: i64, #[introspection(nested)] empty: Empty, @@ -737,33 +741,41 @@ mod test { &[ FieldInfo { name: "x", + scope: None, nested_fields: &[] }, FieldInfo { name: "y", + scope: None, nested_fields: &[] }, FieldInfo { name: "s", + scope: None, nested_fields: &[] }, FieldInfo { name: "v", + scope: Some("tier"), nested_fields: &[] }, FieldInfo { name: "struct", + scope: None, nested_fields: &[ FieldInfo { name: "a", + scope: None, nested_fields: &[] }, FieldInfo { name: "b", + scope: Some("tier"), nested_fields: &[] }, FieldInfo { name: "empty", + scope: None, nested_fields: &[] }, ] diff --git a/src/lib.rs b/src/lib.rs index 1336e59685..b231cbf26c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -628,9 +628,9 @@ fn set_on_access_denied_audit_trigger() { } /// Apply all dynamic parameters from `_pico_db_config` via box.cfg -fn reapply_dynamic_parameters(storage: &Clusterwide) -> Result<()> { +fn reapply_dynamic_parameters(storage: &Clusterwide, current_tier: &str) -> Result<()> { for parameter in storage.db_config.iter()? { - apply_parameter(parameter); + apply_parameter(parameter, current_tier); } Ok(()) @@ -1020,7 +1020,11 @@ fn postjoin( config.validate_storage(&storage, &raft_storage)?; - reapply_dynamic_parameters(&storage)?; + let current_tier_name = raft_storage + .tier()? + .expect("tier for instance should exists"); + + reapply_dynamic_parameters(&storage, ¤t_tier_name)?; if let Some(config) = &config.instance.audit { let raft_id = raft_storage diff --git a/src/sql.rs b/src/sql.rs index 5bc269c4bc..0b9a3ec227 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -3,6 +3,7 @@ use crate::access_control::access_check_plugin_system; use crate::access_control::{validate_password, UserMetadataKind}; use crate::cas::Predicate; +use crate::config::AlterSystemParameters; use crate::schema::{ wait_for_ddl_commit, CreateIndexParams, CreateProcParams, CreateTableParams, DistributionParam, Field, IndexOption, PrivilegeDef, PrivilegeType, RenameRoutineParams, RoutineDef, @@ -11,9 +12,9 @@ use crate::schema::{ }; use crate::sql::router::RouterRuntime; use crate::sql::storage::StorageRuntime; -use crate::storage::{space_by_name, TClusterwideTable}; +use crate::storage::{space_by_name, DbConfig, TClusterwideTable, ToEntryIter}; use crate::sync::wait_for_index_globally; -use crate::traft::error::{self, Error, Unsupported}; +use crate::traft::error::{self, Error}; use crate::traft::node::Node as TraftNode; use crate::traft::op::{Acl as OpAcl, Ddl as OpDdl, Dml, DmlKind, Op}; use crate::traft::{self, node}; @@ -36,13 +37,13 @@ use sbroad::ir::node::block::Block; use sbroad::ir::node::ddl::{Ddl, DdlOwned}; use sbroad::ir::node::expression::ExprOwned; use sbroad::ir::node::relational::Relational; -use sbroad::ir::node::NodeId; use sbroad::ir::node::{ AlterSystem, AlterUser, Constant, CreateIndex, CreateProc, CreateRole, CreateTable, CreateUser, Delete, DropIndex, DropProc, DropRole, DropTable, DropUser, GrantPrivilege, Insert, Node as IrNode, NodeOwned, Procedure, RenameRoutine, RevokePrivilege, ScanRelation, SetParam, Update, }; +use sbroad::ir::node::{AlterSystemTierPart, NodeId}; use tarantool::decimal::Decimal; use crate::plugin::{InheritOpts, PluginIdentifier, TopologyUpdateOpKind}; @@ -1131,55 +1132,101 @@ fn acl_ir_node_to_op_or_result( } } +#[rustfmt::skip] fn alter_system_ir_node_to_op_or_result( + storage: &Clusterwide, ty: &AlterSystemType, - tier_name: Option<&str>, + tier_part: Option<&AlterSystemTierPart>, current_user: UserId, ) -> traft::Result<ControlFlow<ConsumerResult, Op>> { - if tier_name.is_some() { - // TODO: Should be resolved as a part of - // https://git.picodata.io/picodata/picodata/picodata/-/issues/867. - return Err(Error::Unsupported(Unsupported::new( - String::from("specifying tier name in alter system"), - Some(String::from("use 'all tiers' instead")), - ))); - } - let table = crate::storage::DbConfig::TABLE_ID; let initiator = current_user; + match ty { AlterSystemType::AlterSystemSet { param_name, param_value, } => { - let casted_value = - crate::config::validate_alter_system_parameter_value(param_name, param_value)?; + let casted_value = crate::config::validate_alter_system_parameter_value(param_name, param_value)?; + + let mut dmls = Vec::new(); + + if AlterSystemParameters::has_scope_tier(param_name)? { + let Some(tier_part) = tier_part else { + return Err(Error::other("can't set tiered parameter without specifing tier")); + }; + + let tiers = storage.tiers.iter()?.collect::<Vec<_>>(); + + match tier_part { + AlterSystemTierPart::AllTiers => { + for tier in tiers.iter() { + dmls.push(Dml::replace(table,&(param_name, &tier.name, &casted_value), initiator)?); + } + } + AlterSystemTierPart::Tier(tier_name) => { + let Some(tier) = tiers.iter().find(|tier| tier.name == *tier_name) else { + return Err(Error::other(format!("specified tier '{tier_name}' doesn't exist"))); + }; + + dmls.push(Dml::replace(table,&(param_name, &tier.name, &casted_value), initiator)?); + } + } + } else { + if let Some(AlterSystemTierPart::Tier(tier)) = tier_part { + return Err(Error::other(format!("can't set parameter with global scope for tier '{tier}'"))); + }; - let dml = Dml::replace(table, &(param_name, casted_value), initiator)?; + dmls.push(Dml::replace(table,&(param_name, DbConfig::GLOBAL_SCOPE, casted_value), initiator)?); + } - Ok(Continue(Op::Dml(dml))) + Ok(Continue(Op::BatchDml{ ops: dmls })) } AlterSystemType::AlterSystemReset { param_name } => { match param_name { + // reset one Some(param_name) => { - // reset one - let Some(default_value) = - crate::config::get_default_value_of_alter_system_parameter(param_name) - else { + let Some(default_value) = crate::config::get_default_value_of_alter_system_parameter(param_name) else { return Err(Error::other(format!("unknown parameter: '{param_name}'"))); }; - let dml = Dml::replace(table, &(param_name, default_value), initiator)?; - Ok(Continue(Op::Dml(dml))) + let mut dmls = Vec::new(); + + if AlterSystemParameters::has_scope_tier(param_name)? { + let Some(tier_part) = tier_part else { + return Err(Error::other("can't reset tiered parameter without specifing tier")); + }; + + let tiers = storage.tiers.iter()?.collect::<Vec<_>>(); + + match tier_part { + AlterSystemTierPart::AllTiers => { + for tier in tiers.iter() { + dmls.push(Dml::replace(table,&(param_name, &tier.name, &default_value), initiator)?); + } + } + AlterSystemTierPart::Tier(tier_name) => { + let Some(tier) = tiers.iter().find(|tier| tier.name == *tier_name) else { + return Err(Error::other(format!("specified tier '{tier_name}' doesn't exist"))); + }; + + dmls.push(Dml::replace(table,&(param_name, &tier.name, &default_value), initiator)?); + } + } + } else { + if let Some(AlterSystemTierPart::Tier(tier)) = tier_part { + return Err(Error::other(format!("can't reset parameter with global scope for tier '{tier}'"))); + }; + + dmls.push(Dml::replace(table,&(param_name, DbConfig::GLOBAL_SCOPE, default_value), initiator)?); + } + + Ok(Continue(Op::BatchDml { ops: dmls })) } + // reset all None => { - // reset all - let defaults = crate::config::get_defaults_for_all_alter_system_parameters(); - let mut dmls = Vec::with_capacity(defaults.len()); - for (param_name, default_value) in defaults { - let dml = Dml::replace(table, &(param_name, default_value), initiator)?; - dmls.push(dml); - } + let tiers = storage.tiers.iter()?.map(|tier| tier.name).collect::<Vec<_>>(); + let dmls = crate::config::get_defaults_for_all_alter_system_parameters(&tiers.iter().map(String::as_str).collect::<Vec<_>>())?; Ok(Continue(Op::BatchDml { ops: dmls })) } } @@ -1195,12 +1242,8 @@ fn ddl_ir_node_to_op_or_result( storage: &Clusterwide, ) -> traft::Result<ControlFlow<ConsumerResult, Op>> { match ddl { - DdlOwned::AlterSystem(AlterSystem { ty, tier_name, .. }) => { - alter_system_ir_node_to_op_or_result( - ty, - tier_name.as_ref().map(|s| s.as_str()), - current_user, - ) + DdlOwned::AlterSystem(AlterSystem { ty, tier_part, .. }) => { + alter_system_ir_node_to_op_or_result(storage, ty, tier_part.as_ref(), current_user) } DdlOwned::CreateTable(CreateTable { name, diff --git a/src/storage.rs b/src/storage.rs index 096f6157a8..cbf2f2f329 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -14,7 +14,7 @@ use tarantool::tuple::KeyDef; use tarantool::tuple::{RawBytes, Tuple}; use tarantool::util::NumOrStr; -use crate::config; +use crate::config::{self, AlterSystemParameters}; use crate::failure_domain::FailureDomain; use crate::info::PICODATA_VERSION; use crate::instance::{self, Instance}; @@ -2939,7 +2939,8 @@ impl PluginConfig { #[derive(Debug, Clone)] pub struct DbConfig { pub space: Space, - pub index: Index, + pub primary: Index, // by key + scope + pub secondary: Index, // by key } impl TClusterwideTable for DbConfig { @@ -2950,26 +2951,45 @@ impl TClusterwideTable for DbConfig { use tarantool::space::Field; vec![ Field::from(("key", FieldType::String)), + Field::from(("scope", FieldType::String)), Field::from(("value", FieldType::Any)), ] } fn index_definitions() -> Vec<IndexDef> { - vec![IndexDef { - table_id: Self::TABLE_ID, - // Primary index - id: 0, - name: "_pico_db_config_key".into(), - ty: IndexType::Tree, - opts: vec![IndexOption::Unique(true)], - parts: vec![Part::from(("key", IndexFieldType::String)).is_nullable(false)], - operable: true, - // This means the local schema is already up to date and main loop doesn't need to do anything - schema_version: INITIAL_SCHEMA_VERSION, - }] + vec![ + IndexDef { + table_id: Self::TABLE_ID, + // Primary index + id: 0, + name: "_pico_db_config_pk".into(), + ty: IndexType::Tree, + opts: vec![IndexOption::Unique(true)], + parts: vec![ + Part::from(("key", IndexFieldType::String)).is_nullable(false), + Part::from(("scope", IndexFieldType::String)).is_nullable(false), + ], + operable: true, + // This means the local schema is already up to date and main loop doesn't need to do anything + schema_version: INITIAL_SCHEMA_VERSION, + }, + IndexDef { + table_id: Self::TABLE_ID, + id: 1, + name: "_pico_db_config_key".into(), + ty: IndexType::Tree, + opts: vec![IndexOption::Unique(false)], + parts: vec![Part::from(("key", IndexFieldType::String)).is_nullable(false)], + operable: true, + // This means the local schema is already up to date and main loop doesn't need to do anything + schema_version: INITIAL_SCHEMA_VERSION, + }, + ] } } impl DbConfig { + pub const GLOBAL_SCOPE: &str = ""; + pub fn new() -> tarantool::Result<Self> { let space = Space::builder(Self::TABLE_NAME) .id(Self::TABLE_ID) @@ -2978,73 +2998,116 @@ impl DbConfig { .if_not_exists(true) .create()?; - let index = space - .index_builder("_pico_db_config_key") + let primary = space + .index_builder("_pico_db_config_pk") .unique(true) .part("key") + .part("scope") .if_not_exists(true) .create()?; - Ok(Self { space, index }) + let secondary = space + .index_builder("_pico_db_config_key") + .unique(false) + .part("key") + .if_not_exists(true) + .create()?; + + Ok(Self { + space, + primary, + secondary, + }) } #[inline] - pub fn get_or_default<T>(&self, key: &'static str) -> tarantool::Result<T> + pub fn get_or_default<T>(&self, key: &'static str, target_scope: &str) -> tarantool::Result<T> where T: DecodeOwned, T: serde::de::DeserializeOwned, { - if let Some(t) = self.space.get(&[key])? { - if let Some(res) = t.field(1)? { + let global_scope = AlterSystemParameters::has_scope_global(key) + .map_err(|err| ::tarantool::error::Error::other(err.to_string()))?; + + for (index, tuple) in self.by_key(key)?.enumerate() { + // parameters with global scope are unique by it's name + if cfg!(debug_assertions) && global_scope { + assert!(index < 1); + } + + let res: T = tuple.field(AlterSystemParameters::FIELD_VALUE)?.expect(""); + + if global_scope { + return Ok(res); + } + + let current_scope: &str = tuple.field(AlterSystemParameters::FIELD_SCOPE)?.expect(""); + + if target_scope == current_scope { return Ok(res); } } + // it's not unreachable since some of parameters may be needed before filling _pico_db_config + let value = config::get_default_value_of_alter_system_parameter(key) .expect("parameter name is validated using system_parameter_name! macro"); let res = rmpv::ext::from_value(value).expect("default value type is correct"); Ok(res) } - #[inline] - pub fn get<T>(&self, key: &'static str) -> tarantool::Result<Option<T>> - where - T: DecodeOwned, - { - match self.space.get(&[key])? { - Some(t) => t.field(1), - None => Ok(None), - } + #[inline(always)] + pub fn by_key(&self, key: &str) -> tarantool::Result<EntryIter<Tuple, MP_SERDE>> { + let iter = self.secondary.select(IteratorType::Eq, &[key])?; + Ok(EntryIter::new(iter)) } #[inline] pub fn auth_password_length_min(&self) -> tarantool::Result<usize> { - self.get_or_default(system_parameter_name!(auth_password_length_min)) + self.get_or_default( + system_parameter_name!(auth_password_length_min), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn auth_password_enforce_uppercase(&self) -> tarantool::Result<bool> { - self.get_or_default(system_parameter_name!(auth_password_enforce_uppercase)) + self.get_or_default( + system_parameter_name!(auth_password_enforce_uppercase), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn auth_password_enforce_lowercase(&self) -> tarantool::Result<bool> { - self.get_or_default(system_parameter_name!(auth_password_enforce_lowercase)) + self.get_or_default( + system_parameter_name!(auth_password_enforce_lowercase), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn auth_password_enforce_digits(&self) -> tarantool::Result<bool> { - self.get_or_default(system_parameter_name!(auth_password_enforce_digits)) + self.get_or_default( + system_parameter_name!(auth_password_enforce_digits), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn auth_password_enforce_specialchars(&self) -> tarantool::Result<bool> { - self.get_or_default(system_parameter_name!(auth_password_enforce_specialchars)) + self.get_or_default( + system_parameter_name!(auth_password_enforce_specialchars), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn auth_login_attempt_max(&self) -> tarantool::Result<usize> { - self.get_or_default(system_parameter_name!(auth_login_attempt_max)) + self.get_or_default( + system_parameter_name!(auth_login_attempt_max), + Self::GLOBAL_SCOPE, + ) } #[inline] @@ -3054,7 +3117,8 @@ impl DbConfig { return Ok(cached); } - let res = self.get_or_default(system_parameter_name!(pg_statement_max))?; + let res = + self.get_or_default(system_parameter_name!(pg_statement_max), Self::GLOBAL_SCOPE)?; // Cache the value. config::MAX_PG_STATEMENTS.store(res, Ordering::Relaxed); @@ -3068,7 +3132,7 @@ impl DbConfig { return Ok(cached); } - let res = self.get_or_default(system_parameter_name!(pg_portal_max))?; + let res = self.get_or_default(system_parameter_name!(pg_portal_max), Self::GLOBAL_SCOPE)?; // Cache the value. config::MAX_PG_PORTALS.store(res, Ordering::Relaxed); @@ -3077,46 +3141,80 @@ impl DbConfig { #[inline] pub fn raft_snapshot_chunk_size_max(&self) -> tarantool::Result<usize> { - self.get_or_default(system_parameter_name!(raft_snapshot_chunk_size_max)) + self.get_or_default( + system_parameter_name!(raft_snapshot_chunk_size_max), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn raft_snapshot_read_view_close_timeout(&self) -> tarantool::Result<Duration> { #[rustfmt::skip] - let res: f64 = self.get_or_default(system_parameter_name!(raft_snapshot_read_view_close_timeout))?; + let res: f64 = self.get_or_default(system_parameter_name!(raft_snapshot_read_view_close_timeout), Self::GLOBAL_SCOPE)?; Ok(Duration::from_secs_f64(res)) } #[inline] pub fn governor_auto_offline_timeout(&self) -> tarantool::Result<Duration> { #[rustfmt::skip] - let res: f64 = self.get_or_default(system_parameter_name!(governor_auto_offline_timeout))?; + let res: f64 = self.get_or_default(system_parameter_name!(governor_auto_offline_timeout), Self::GLOBAL_SCOPE)?; Ok(Duration::from_secs_f64(res)) } #[inline] pub fn sql_motion_row_max(&self) -> tarantool::Result<u64> { - self.get_or_default(system_parameter_name!(sql_motion_row_max)) + self.get_or_default( + system_parameter_name!(sql_motion_row_max), + Self::GLOBAL_SCOPE, + ) } #[inline] pub fn sql_vdbe_opcode_max(&self) -> tarantool::Result<u64> { - self.get_or_default(system_parameter_name!(sql_vdbe_opcode_max)) + self.get_or_default( + system_parameter_name!(sql_vdbe_opcode_max), + Self::GLOBAL_SCOPE, + ) } #[inline] - pub fn memtx_checkpoint_count(&self) -> tarantool::Result<u64> { - self.get_or_default(system_parameter_name!(memtx_checkpoint_count)) + pub fn raft_wal_size_max(&self) -> tarantool::Result<u64> { + self.get_or_default( + system_parameter_name!(raft_wal_size_max), + Self::GLOBAL_SCOPE, + ) + } + + #[inline] + pub fn raft_wal_count_max(&self) -> tarantool::Result<u64> { + self.get_or_default( + system_parameter_name!(raft_wal_count_max), + Self::GLOBAL_SCOPE, + ) } #[inline] - pub fn memtx_checkpoint_interval(&self) -> tarantool::Result<u64> { - self.get_or_default(system_parameter_name!(memtx_checkpoint_interval)) + pub fn governor_raft_op_timeout(&self) -> tarantool::Result<f64> { + self.get_or_default( + system_parameter_name!(governor_raft_op_timeout), + Self::GLOBAL_SCOPE, + ) } #[inline] - pub fn iproto_net_msg_max(&self) -> tarantool::Result<u64> { - self.get_or_default(system_parameter_name!(iproto_net_msg_max)) + pub fn governor_common_rpc_timeout(&self) -> tarantool::Result<f64> { + self.get_or_default( + system_parameter_name!(governor_common_rpc_timeout), + Self::GLOBAL_SCOPE, + ) + } + + #[inline] + pub fn governor_plugin_rpc_timeout(&self) -> tarantool::Result<f64> { + self.get_or_default( + system_parameter_name!(governor_plugin_rpc_timeout), + Self::GLOBAL_SCOPE, + ) } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 8988338266..3f6fb08f17 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -36,9 +36,9 @@ use crate::storage::schema::ddl_meta_drop_space; use crate::storage::schema::ddl_meta_space_update_operable; use crate::storage::snapshot::SnapshotData; use crate::storage::space_by_id; +use crate::storage::DbConfig; use crate::storage::{self, Clusterwide, PropertyName, TClusterwideTable}; use crate::storage::{local_schema_version, set_local_schema_version}; -use crate::system_parameter_name; use crate::tlog; use crate::topology_cache::TopologyCache; use crate::traft; @@ -725,7 +725,7 @@ impl NodeImpl { } }; - let mut apply_entry_result = EntryApplied(None); + let mut apply_entry_result = EntryApplied(vec![]); let mut new_applied = None; transaction(|| -> tarantool::Result<()> { self.main_loop_status("handling committed entries"); @@ -769,14 +769,17 @@ impl NodeImpl { fiber::sleep(timeout); continue; } - EntryApplied(None) => { + EntryApplied(dmls) if dmls.is_empty() => { // Actually advance the iterator. let _ = entries.next(); } - EntryApplied(Some(AppliedDml { table, new_tuple })) => { + EntryApplied(dmls) => { + let current_tier = self.topology_cache.my_tier_name(); // currently only parameters from _pico_db_config processed outside of transaction (here) - debug_assert!(table == storage::DbConfig::TABLE_ID); - apply_parameter(new_tuple); + for AppliedDml { table, new_tuple } in dmls { + debug_assert!(table == DbConfig::TABLE_ID); + apply_parameter(new_tuple, current_tier); + } // Actually advance the iterator. let _ = entries.next(); @@ -967,21 +970,25 @@ impl NodeImpl { self.wake_governor_if_needed(&op); let storage_properties = &self.storage.properties; - let mut res = ApplyEntryResult::EntryApplied(None); + let mut res = ApplyEntryResult::EntryApplied(vec![]); // apply the operation match op { Op::Nop => {} - Op::BatchDml { ref ops } => { - for op in ops { - match self.handle_dml_entry(op, expelled) { - Ok(applied_dml) => res = EntryApplied(applied_dml), + Op::BatchDml { ops } => { + let mut applied_dmls = Vec::with_capacity(ops.len()); + for op in ops.into_iter() { + match self.handle_dml_entry(&op, expelled) { + Ok(Some(applied_dml)) => applied_dmls.push(applied_dml), Err(()) => return SleepAndRetry, + _ => (), } } + + res = EntryApplied(applied_dmls); } Op::Dml(op) => match self.handle_dml_entry(&op, expelled) { - Ok(applied_dml) => res = EntryApplied(applied_dml), + Ok(applied_dml) => res = EntryApplied(Vec::from_iter(applied_dml)), Err(()) => return SleepAndRetry, }, Op::DdlPrepare { @@ -1345,7 +1352,7 @@ impl NodeImpl { if let Some(plugin) = maybe_plugin { if plugin.enabled { warn_or_panic!("Op::DropPlugin for an enabled plugin"); - return EntryApplied(None); + return EntryApplied(Vec::new()); } let services = self @@ -2170,6 +2177,7 @@ impl NodeImpl { // Persist stuff raft wants us to persist. let hard_state = ready.hs(); let entries_to_persist = ready.entries(); + if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() { let mut new_term = None; let mut new_applied = None; @@ -2250,11 +2258,6 @@ impl NodeImpl { } if received_snapshot { - // apply changed dynamic parameters - for changed_parameter in changed_parameters { - apply_parameter(Tuple::try_from_slice(&changed_parameter)?); - } - // Need to reload the whole topology cache. We could be more // clever about it and only update the records which changed, // but at the moment this would require doing a full scan and @@ -2265,6 +2268,13 @@ impl NodeImpl { self.topology_cache .full_reload(&self.storage) .expect("schema upgrade not supported yet"); + + let current_tier = self.topology_cache.my_tier_name(); + + // apply changed dynamic parameters + for changed_parameter in changed_parameters { + apply_parameter(Tuple::try_from_slice(&changed_parameter)?, current_tier); + } } if hard_state.is_some() { @@ -2397,10 +2407,7 @@ impl NodeImpl { fn do_raft_log_auto_compaction(&self, old_last_index: RaftIndex) -> traft::Result<()> { let mut compaction_needed = false; - let max_size: u64 = self - .storage - .db_config - .get_or_default(system_parameter_name!(raft_wal_size_max))?; + let max_size: u64 = self.storage.db_config.raft_wal_size_max()?; let current_size = self.raft_storage.raft_log_bsize()?; if current_size > max_size { #[rustfmt::skip] @@ -2408,10 +2415,7 @@ impl NodeImpl { compaction_needed = true; } - let max_count: u64 = self - .storage - .db_config - .get_or_default(system_parameter_name!(raft_wal_count_max))?; + let max_count: u64 = self.storage.db_config.raft_wal_count_max()?; let current_count = self.raft_storage.raft_log_count()?; if current_count > max_count { #[rustfmt::skip] @@ -2516,7 +2520,7 @@ enum ApplyEntryResult { /// Entry applied to persistent storage successfully and should be /// applied to non-persistent outside of transaction, proceed to next entry. - EntryApplied(Option<AppliedDml>), + EntryApplied(Vec<AppliedDml>), } pub(crate) struct MainLoop { diff --git a/test/int/test_basics.py b/test/int/test_basics.py index 403c29a2c6..419f136d26 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -344,27 +344,27 @@ Insert(_pico_property, ["global_schema_version",0]), Insert(_pico_property, ["next_schema_version",1]), Insert(_pico_property, ["system_catalog_version","25.1.0"]), Insert(_pico_property, ["cluster_version","{picodata_version}"]), -Insert(_pico_db_config, ["auth_password_length_min",8]), -Insert(_pico_db_config, ["auth_password_enforce_uppercase",true]), -Insert(_pico_db_config, ["auth_password_enforce_lowercase",true]), -Insert(_pico_db_config, ["auth_password_enforce_digits",true]), -Insert(_pico_db_config, ["auth_password_enforce_specialchars",false]), -Insert(_pico_db_config, ["auth_login_attempt_max",4]), -Insert(_pico_db_config, ["pg_statement_max",1024]), -Insert(_pico_db_config, ["pg_portal_max",1024]), -Insert(_pico_db_config, ["raft_snapshot_chunk_size_max",16777216]), -Insert(_pico_db_config, ["raft_snapshot_read_view_close_timeout",86400]), -Insert(_pico_db_config, ["raft_wal_size_max",67108864]), -Insert(_pico_db_config, ["raft_wal_count_max",64]), -Insert(_pico_db_config, ["governor_auto_offline_timeout",30.0]), -Insert(_pico_db_config, ["governor_raft_op_timeout",3.0]), -Insert(_pico_db_config, ["governor_common_rpc_timeout",3.0]), -Insert(_pico_db_config, ["governor_plugin_rpc_timeout",10.0]), -Insert(_pico_db_config, ["sql_vdbe_opcode_max",45000]), -Insert(_pico_db_config, ["sql_motion_row_max",5000]), -Insert(_pico_db_config, ["memtx_checkpoint_count",2]), -Insert(_pico_db_config, ["memtx_checkpoint_interval",3600]), -Insert(_pico_db_config, ["iproto_net_msg_max",768]))| +Replace(_pico_db_config, ["auth_password_length_min","",8]), +Replace(_pico_db_config, ["auth_password_enforce_uppercase","",true]), +Replace(_pico_db_config, ["auth_password_enforce_lowercase","",true]), +Replace(_pico_db_config, ["auth_password_enforce_digits","",true]), +Replace(_pico_db_config, ["auth_password_enforce_specialchars","",false]), +Replace(_pico_db_config, ["auth_login_attempt_max","",4]), +Replace(_pico_db_config, ["pg_statement_max","",1024]), +Replace(_pico_db_config, ["pg_portal_max","",1024]), +Replace(_pico_db_config, ["raft_snapshot_chunk_size_max","",16777216]), +Replace(_pico_db_config, ["raft_snapshot_read_view_close_timeout","",86400]), +Replace(_pico_db_config, ["raft_wal_size_max","",67108864]), +Replace(_pico_db_config, ["raft_wal_count_max","",64]), +Replace(_pico_db_config, ["governor_auto_offline_timeout","",30.0]), +Replace(_pico_db_config, ["governor_raft_op_timeout","",3.0]), +Replace(_pico_db_config, ["governor_common_rpc_timeout","",3.0]), +Replace(_pico_db_config, ["governor_plugin_rpc_timeout","",10.0]), +Replace(_pico_db_config, ["sql_vdbe_opcode_max","",45000]), +Replace(_pico_db_config, ["sql_motion_row_max","",5000]), +Replace(_pico_db_config, ["memtx_checkpoint_count","default",2]), +Replace(_pico_db_config, ["memtx_checkpoint_interval","default",3600]), +Replace(_pico_db_config, ["iproto_net_msg_max","default",768]))| | 0 | 1 |BatchDml( Insert(_pico_user, [0,"guest",0,["md5","md5084e0343a0486ff05530df6c705c8bb4"],1,"user"]), Insert(_pico_privilege, [1,0,"login","universe",0,0]), @@ -433,8 +433,9 @@ Insert(_pico_table, [{_pico_plugin_migration},"_pico_plugin_migration",{{"Global Insert(_pico_index, [{_pico_plugin_migration},0,"_pico_plugin_migration_primary_key","tree",[{{"unique":true}}],[["plugin_name","string",null,false,null],["migration_file","string",null,false,null]],true,0]), Insert(_pico_table, [{_pico_plugin_config},"_pico_plugin_config",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"plugin"}},{{"field_type":"string","is_nullable":false,"name":"version"}},{{"field_type":"string","is_nullable":false,"name":"entity"}},{{"field_type":"string","is_nullable":false,"name":"key"}},{{"field_type":"any","is_nullable":true,"name":"value"}}],0,true,"memtx",1,""]), Insert(_pico_index, [{_pico_plugin_config},0,"_pico_plugin_config_pk","tree",[{{"unique":true}}],[["plugin","string",null,false,null],["version","string",null,false,null],["entity","string",null,false,null],["key","string",null,false,null]],true,0]), -Insert(_pico_table, [{_pico_db_config},"_pico_db_config",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"key"}},{{"field_type":"any","is_nullable":false,"name":"value"}}],0,true,"memtx",1,""]), -Insert(_pico_index, [{_pico_db_config},0,"_pico_db_config_key","tree",[{{"unique":true}}],[["key","string",null,false,null]],true,0]) +Insert(_pico_table, [{_pico_db_config},"_pico_db_config",{{"Global":null}},[{{"field_type":"string","is_nullable":false,"name":"key"}},{{"field_type":"string","is_nullable":false,"name":"scope"}},{{"field_type":"any","is_nullable":false,"name":"value"}}],0,true,"memtx",1,""]), +Insert(_pico_index, [{_pico_db_config},0,"_pico_db_config_pk","tree",[{{"unique":true}}],[["key","string",null,false,null],["scope","string",null,false,null]],true,0]), +Insert(_pico_index, [{_pico_db_config},1,"_pico_db_config_key","tree",[{{"unique":false}}],[["key","string",null,false,null]],true,0]) )| | 0 | 1 |AddNode(1)| | 0 | 2 |-| diff --git a/test/int/test_runtime_configuration.py b/test/int/test_runtime_configuration.py index 3d46ed8e97..76275374f1 100644 --- a/test/int/test_runtime_configuration.py +++ b/test/int/test_runtime_configuration.py @@ -12,9 +12,9 @@ def test_set_via_alter_system(cluster: Cluster): assert box_config["checkpoint_count"] == 2 # picodata parameters names are slightly different from the tarantools - instance.sql("ALTER SYSTEM SET iproto_net_msg_max TO 100") - instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 100") - instance.sql("ALTER SYSTEM SET memtx_checkpoint_count TO 100") + instance.sql("ALTER SYSTEM SET iproto_net_msg_max TO 100 FOR ALL TIERS") + instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 100 FOR ALL TIERS") + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count TO 100 FOR ALL TIERS") # parameters values changed box_config = instance.eval("return box.cfg") @@ -52,13 +52,92 @@ def test_set_via_alter_system(cluster: Cluster): ): instance.sql("ALTER SYSTEM SET memtx_checkpoint_count = -1") + # can't specify non existent parameter + with pytest.raises( + TarantoolError, + match="""unknown parameter: \'non_existing_name\'""", + ): + instance.sql("ALTER SYSTEM SET non_existing_name = -1") + + # can't specify non existent parameter + with pytest.raises( + TarantoolError, + match="""unknown parameter: \'non_existing_name\'""", + ): + instance.sql("ALTER SYSTEM SET non_existing_name = -1 FOR TIER non_existent") + + # can't specify non existent parameter + with pytest.raises( + TarantoolError, + match="""unknown parameter: \'non_existing_name\'""", + ): + instance.sql("ALTER SYSTEM SET non_existing_name = -1 FOR ALL TIERS") + + # can't specify no tier for tiered parameter + with pytest.raises( + TarantoolError, + match="""can\'t set tiered parameter without specifing tier""", + ): + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count = 10") + + # can't specify tier for global parameter + with pytest.raises( + TarantoolError, + match="""can\'t set parameter with global scope for tier \'default\'""", + ): + instance.sql("ALTER SYSTEM SET raft_wal_count_max = 10 FOR TIER default") + + # can't specify tier for global parameter + with pytest.raises( + TarantoolError, + match="""can\'t set parameter with global scope for tier \'default\'""", + ): + instance.sql("ALTER SYSTEM SET pg_statement_max = 10 FOR TIER default") + + # but it's ok to specify `for all tiers` for parameter with global scope + instance.sql("ALTER SYSTEM SET pg_statement_max = 2000 FOR ALL TIERS") + + # but it's ok to specify `for all tiers` for parameter with tier scope + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count = 200 FOR ALL TIERS") + + # can't specify non existent tier + with pytest.raises( + TarantoolError, + match="""specified tier \'non_existent\' doesn\'t exist""", + ): + instance.sql("ALTER SYSTEM SET memtx_checkpoint_count = 200 FOR TIER non_existent") + + # reset part + + # can't specify tier for global parameter in reset too + with pytest.raises( + TarantoolError, + match="""can\'t reset parameter with global scope for tier \'default\'""", + ): + instance.sql("ALTER SYSTEM RESET raft_wal_count_max FOR TIER default") + + # but it's ok to use both `FOR ALL TIERS` and nothing + instance.sql("ALTER SYSTEM RESET raft_wal_count_max FOR ALL TIERS") + instance.sql("ALTER SYSTEM RESET raft_wal_count_max") + + # can't specify no tier for tiered parameter in reset too + with pytest.raises( + TarantoolError, + match="""can\'t reset tiered parameter without specifing tier""", + ): + instance.sql("ALTER SYSTEM RESET memtx_checkpoint_interval") + + # but it's ok to use both `FOR ALL TIERS` and valid tier + instance.sql("ALTER SYSTEM RESET memtx_checkpoint_interval FOR ALL TIERS") + instance.sql("ALTER SYSTEM RESET memtx_checkpoint_interval FOR TIER default") + def test_snapshot_and_dynamic_parameters(cluster: Cluster): i1, i2, _ = cluster.deploy(instance_count=3) i2.kill() - i1.sql("ALTER SYSTEM SET iproto_net_msg_max = 100") + i1.sql("ALTER SYSTEM SET iproto_net_msg_max = 100 FOR ALL TIERS") # Trigger raft log compaction i1.sql("ALTER SYSTEM SET raft_wal_count_max TO 1") @@ -70,3 +149,59 @@ def test_snapshot_and_dynamic_parameters(cluster: Cluster): for catched_up_by_snapshot in [i2, i4]: box_config = catched_up_by_snapshot.eval("return box.cfg") assert box_config["net_msg_max"] == 100 + + +def test_set_parameters_with_tier_scope(cluster: Cluster): + cluster.set_config_file( + yaml=""" +cluster: + name: test + tier: + red: + blue: +""" + ) + + red_instance = cluster.add_instance(tier="red") + blue_instance = cluster.add_instance(tier="blue") + + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 3600 + red_config = red_instance.eval("return box.cfg") + assert red_config["checkpoint_interval"] == 3600 + + red_instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 100 FOR TIER blue") + + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 100 + red_config = red_instance.eval("return box.cfg") + assert red_config["checkpoint_interval"] == 3600 + + blue_instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 10 FOR TIER red") + + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 100 + red_config = red_instance.eval("return box.cfg") + assert red_config["checkpoint_interval"] == 10 + + # check that reset works + red_instance.sql("ALTER SYSTEM RESET memtx_checkpoint_interval FOR TIER blue") + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 3600 + + # check that reset for all tiers works + red_instance.sql("ALTER SYSTEM RESET memtx_checkpoint_interval FOR ALL TIERS") + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 3600 + red_config = red_instance.eval("return box.cfg") + assert red_config["checkpoint_interval"] == 3600 + + # check that reset all works + red_instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 10 FOR TIER blue") + red_instance.sql("ALTER SYSTEM SET memtx_checkpoint_interval TO 20 FOR TIER red") + + red_instance.sql("ALTER SYSTEM RESET ALL") + blue_config = blue_instance.eval("return box.cfg") + assert blue_config["checkpoint_interval"] == 3600 + red_config = red_instance.eval("return box.cfg") + assert red_config["checkpoint_interval"] == 3600 diff --git a/test/int/test_sql.py b/test/int/test_sql.py index 18aee720f3..21ed88389e 100644 --- a/test/int/test_sql.py +++ b/test/int/test_sql.py @@ -267,6 +267,7 @@ def test_read_from_system_tables(cluster: Cluster): ) assert data["metadata"] == [ {"name": "key", "type": "string"}, + {"name": "scope", "type": "string"}, {"name": "value", "type": "any"}, ] # Ignore values for the sake of stability @@ -5252,69 +5253,69 @@ def test_alter_system_property(cluster: Cluster): i1 = cluster.instances[0] non_default_prop = [ - ("auth_password_length_min", 10), - ("auth_password_enforce_digits", True), - ("auth_password_enforce_uppercase", True), - ("auth_password_enforce_lowercase", True), - ("auth_password_enforce_specialchars", False), - ("governor_auto_offline_timeout", 12), - ("auth_login_attempt_max", 4), - ("pg_statement_max", 1024), - ("pg_portal_max", 1024), - ("raft_snapshot_chunk_size_max", 1500), - ("raft_snapshot_read_view_close_timeout", 12312.4), - ("auth_password_enforce_uppercase", False), - ("auth_password_enforce_lowercase", False), - ("auth_password_enforce_specialchars", True), - ("auth_login_attempt_max", 8), - ("pg_statement_max", 4096), - ("pg_portal_max", 2048), - ("governor_raft_op_timeout", 10), - ("governor_common_rpc_timeout", 10), - ("governor_plugin_rpc_timeout", 20), + ("auth_password_length_min", "", 10), + ("auth_password_enforce_digits", "", True), + ("auth_password_enforce_uppercase", "", True), + ("auth_password_enforce_lowercase", "", True), + ("auth_password_enforce_specialchars", "", False), + ("governor_auto_offline_timeout", "", 12), + ("auth_login_attempt_max", "", 4), + ("pg_statement_max", "", 1024), + ("pg_portal_max", "", 1024), + ("raft_snapshot_chunk_size_max", "", 1500), + ("raft_snapshot_read_view_close_timeout", "", 12312.4), + ("auth_password_enforce_uppercase", "", False), + ("auth_password_enforce_lowercase", "", False), + ("auth_password_enforce_specialchars", "", True), + ("auth_login_attempt_max", "", 8), + ("pg_statement_max", "", 4096), + ("pg_portal_max", "", 2048), + ("governor_raft_op_timeout", "", 10), + ("governor_common_rpc_timeout", "", 10), + ("governor_plugin_rpc_timeout", "", 20), ] default_prop = [] - for index, (prop, value) in enumerate(non_default_prop): + for index, (prop, scope, value) in enumerate(non_default_prop): data = i1.sql(f""" select * from "_pico_db_config" where "key" = '{prop}' """) - default_prop.append(data[0][1]) + default_prop.append(data[0][2]) # check simple setting data = i1.sql(f""" alter system set "{prop}" to {value} """) assert data["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == value + assert data[0][2] == value # change back to non default value for check of reset data = i1.sql(f""" alter system set "{prop}" to default """) assert data["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == default_prop[index] + assert data[0][2] == default_prop[index] # change back to data = i1.sql(f""" alter system set "{prop}" to {value} """) assert data["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == value + assert data[0][2] == value # check reset to default data = i1.sql(f""" alter system reset "{prop}" """) assert data["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == default_prop[-1] + assert data[0][2] == default_prop[-1] # change back to non default value for later check of reset all data = i1.sql(f""" alter system set "{prop}" to {value} """) assert data["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == value + assert data[0][2] == value # check reset all data = i1.sql(""" alter system reset all """) assert data["row_count"] == 1 - for (prop, _), default in zip(non_default_prop, default_prop): + for (prop, _, _), default in zip(non_default_prop, default_prop): data = i1.sql(""" select * from "_pico_db_config" where "key" = ? """, prop) - assert data[0][1] == default + assert data[0][2] == default def test_alter_system_property_errors(cluster: Cluster): @@ -5323,33 +5324,33 @@ def test_alter_system_property_errors(cluster: Cluster): # check valid insertion (int) data = i1.sql(""" select * from "_pico_db_config" where "key" = 'governor_auto_offline_timeout' """) - assert data == [["governor_auto_offline_timeout", 30]] + assert data == [["governor_auto_offline_timeout", "", 30]] dml = i1.sql( """ - alter system set "governor_auto_offline_timeout" to 3 + alter system set "governor_auto_offline_timeout" to 3 for all tiers """ ) assert dml["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = 'governor_auto_offline_timeout' """) - assert data == [["governor_auto_offline_timeout", 3]] + assert data == [["governor_auto_offline_timeout", "", 3]] # check valid insertion (bool) data = i1.sql(""" select * from "_pico_db_config" where "key" = 'auth_password_enforce_digits' """) - assert data == [["auth_password_enforce_digits", True]] + assert data == [["auth_password_enforce_digits", "", True]] dml = i1.sql( """ - alter system set "auth_password_enforce_digits" to false + alter system set "auth_password_enforce_digits" to false for all tiers """ ) assert dml["row_count"] == 1 data = i1.sql(""" select * from "_pico_db_config" where "key" = 'auth_password_enforce_digits' """) - assert data == [["auth_password_enforce_digits", False]] + assert data == [["auth_password_enforce_digits", "", False]] # such property does not exist with pytest.raises(TarantoolError, match="unknown parameter: 'invalid_parameter_name'"): dml = i1.sql( """ - alter system set "invalid_parameter_name" to 3 + alter system set "invalid_parameter_name" to 3 for all tiers """ ) @@ -5360,7 +5361,7 @@ def test_alter_system_property_errors(cluster: Cluster): ): dml = i1.sql( """ - alter system set "auth_password_enforce_digits" to 3 + alter system set "auth_password_enforce_digits" to 3 for all tiers """ ) @@ -5368,18 +5369,7 @@ def test_alter_system_property_errors(cluster: Cluster): with pytest.raises(TarantoolError, match="unknown parameter: 'next_schema_version'"): dml = i1.sql( """ - alter system set "next_schema_version" to 3 - """ - ) - - # properties may be changed only globally yet - with pytest.raises( - TarantoolError, - match="unsupported action/entity: specifying tier name in alter system, use 'all tiers' instead", - ): - dml = i1.sql( - """ - alter system set "auto_offline_timeout" to true for tier foo + alter system set "next_schema_version" to 3 for all tiers """ ) -- GitLab