From 6bbfa9c8ca73cdc642410a3af70cb860b6542b14 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Wed, 28 Feb 2024 18:51:58 +0300 Subject: [PATCH] feat: --config parameter to specify picodata configuration file Also here we refactor all the places where we checked command line parameters, because now PicodataConfig is the main source of this information. --- src/address.rs | 2 +- src/bootstrap_entries.rs | 10 +- src/cli/args.rs | 412 +++------------- src/cli/connect.rs | 3 +- src/cli/run.rs | 26 +- src/cli/test.rs | 2 +- src/config.rs | 926 +++++++++++++++++++++++++++++++++++ src/failure_domain.rs | 5 +- src/lib.rs | 163 +++--- src/luamod.rs | 46 +- src/traft/error.rs | 8 + src/util.rs | 28 +- test/int/test_config_file.py | 23 + 13 files changed, 1180 insertions(+), 474 deletions(-) create mode 100644 src/config.rs diff --git a/src/address.rs b/src/address.rs index 05089f281e..85abde8396 100644 --- a/src/address.rs +++ b/src/address.rs @@ -4,7 +4,7 @@ use tarantool::tlua; const DEFAULT_HOST: &str = "localhost"; const DEFAULT_PORT: &str = "3301"; -#[derive(Debug, Clone, PartialEq, Eq, tlua::Push)] +#[derive(Debug, Clone, PartialEq, Eq, tlua::Push, tlua::PushInto)] pub struct Address { pub user: Option<String>, pub host: String, diff --git a/src/bootstrap_entries.rs b/src/bootstrap_entries.rs index f740164c1b..15d6f17c9e 100644 --- a/src/bootstrap_entries.rs +++ b/src/bootstrap_entries.rs @@ -1,7 +1,7 @@ use ::raft::prelude as raft; use protobuf::Message; -use crate::cli::args; +use crate::config::PicodataConfig; use crate::instance::Instance; use crate::schema; use crate::schema::ADMIN_ID; @@ -13,7 +13,11 @@ use crate::tier::Tier; use crate::traft; use crate::traft::op; -pub(super) fn prepare(args: &args::Run, instance: &Instance, tiers: &[Tier]) -> Vec<raft::Entry> { +pub(super) fn prepare( + config: &PicodataConfig, + instance: &Instance, + tiers: &[Tier], +) -> Vec<raft::Entry> { let mut init_entries = Vec::new(); let mut 
init_entries_push_op = |dml: tarantool::Result<op::Dml>| { @@ -38,7 +42,7 @@ pub(super) fn prepare(args: &args::Run, instance: &Instance, tiers: &[Tier]) -> ClusterwideTable::Address, &traft::PeerAddress { raft_id: instance.raft_id, - address: args.advertise_address(), + address: config.instance.advertise_address().to_host_port(), }, ADMIN_ID, )); diff --git a/src/cli/args.rs b/src/cli/args.rs index e301cfa72c..fbf0838c15 100644 --- a/src/cli/args.rs +++ b/src/cli/args.rs @@ -1,8 +1,6 @@ use crate::address::Address; -use crate::failure_domain::FailureDomain; +use crate::config::DEFAULT_USERNAME; use crate::instance::InstanceId; -use crate::replicaset::ReplicasetId; -use crate::tier::DEFAULT_TIER; use crate::util::Uppercase; use clap::Parser; use std::borrow::Cow; @@ -30,29 +28,29 @@ pub enum Picodata { #[derive(Debug, Parser, tlua::Push, PartialEq)] #[clap(about = "Run the picodata instance")] pub struct Run { - #[clap( - long, - value_name = "NAME", - default_value = "demo", - env = "PICODATA_CLUSTER_ID" - )] + #[clap(long, value_name = "NAME", env = "PICODATA_CLUSTER_ID")] /// Name of the cluster. The instance will refuse /// to join a cluster with a different name. - pub cluster_id: String, + /// + /// By default this will be "demo". + pub cluster_id: Option<String>, - #[clap( - long, - value_name = "PATH", - default_value = ".", - env = "PICODATA_DATA_DIR" - )] - /// Here the instance persists all of its data - pub data_dir: String, + #[clap(long, value_name = "PATH", env = "PICODATA_DATA_DIR")] + /// Here the instance persists all of its data. + /// + /// By default this is the current working directory ("."). + pub data_dir: Option<String>, + + #[clap(long, value_name = "PATH", env = "PICODATA_CONFIG_FILE")] + /// Path to configuration file in yaml format. + /// + /// By default the "config.yaml" in the data directory is used. + pub config: Option<String>, #[clap(long, value_name = "NAME", env = "PICODATA_INSTANCE_ID")] /// Name of the instance. 
/// If not defined, it'll be generated automatically. - pub instance_id: Option<InstanceId>, + pub instance_id: Option<String>, #[clap( long = "advertise", @@ -67,21 +65,23 @@ pub struct Run { short = 'l', long = "listen", value_name = "[HOST][:PORT]", - default_value = "localhost:3301", env = "PICODATA_LISTEN" )] - /// Socket bind address - pub listen: Address, + /// Socket bind address. + /// + /// By default "localhost:3301" is used. + pub listen: Option<Address>, #[clap( long = "peer", value_name = "[HOST][:PORT]", require_value_delimiter = true, use_value_delimiter = true, - default_value = "localhost:3301", env = "PICODATA_PEER" )] /// Address(es) of other instance(s) + /// + /// By default "localhost:3301" is used. pub peers: Vec<Address>, #[clap( @@ -102,22 +102,24 @@ pub struct Run { pub failure_domain: Vec<(Uppercase, Uppercase)>, #[clap(long, value_name = "NAME", env = "PICODATA_REPLICASET_ID")] - /// Name of the replicaset - pub replicaset_id: Option<ReplicasetId>, + /// Name of the replicaset. + /// + /// If not specified, a replicaset will be automatically chosen based on the + /// failure domain settings. + pub replicaset_id: Option<String>, - #[clap(long, arg_enum, default_value = "info", env = "PICODATA_LOG_LEVEL")] - /// Log level - log_level: LogLevel, + #[clap(long, arg_enum, env = "PICODATA_LOG_LEVEL")] + /// Log level. + /// + /// By default "info" is used. + pub log_level: Option<LogLevel>, - #[clap( - long, - default_value = "1", - env = "PICODATA_INIT_REPLICATION_FACTOR", - group = "init_cfg" - )] + #[clap(long, env = "PICODATA_INIT_REPLICATION_FACTOR", group = "init_cfg")] /// Total number of replicas (copies of data) for each replicaset. /// It makes sense only when starting cluster without --init-cfg option. - pub init_replication_factor: u8, + /// + /// By default 1 is used. 
+ pub init_replication_factor: Option<u8>, #[clap(long, value_name = "PATH", env = "PICODATA_SCRIPT")] /// A path to a lua script that will be executed at postjoin stage. @@ -141,7 +143,9 @@ pub struct Run { /// `picodata admin`. Unlike connecting via `picodata connect` /// console communication occurs in plain text /// and always operates under the admin account. - /// Default value: <data_dir>/admin.sock + /// + /// By default the "admin.sock" in the data directory is used. + // TODO: rename to admin_socket pub admin_sock: Option<String>, #[clap( @@ -154,9 +158,11 @@ pub struct Run { /// Path to `some_plugin_name.so` pub plugins: Vec<String>, + #[clap(long = "tier", value_name = "TIER", env = "PICODATA_INSTANCE_TIER")] /// Name of the tier to which the instance will belong. - #[clap(long = "tier", value_name = "TIER", default_value = DEFAULT_TIER, env = "PICODATA_INSTANCE_TIER")] - pub tier: String, + /// + /// By default "default" is used. + pub tier: Option<String>, /// Filepath to configuration file in yaml format. #[clap( @@ -167,6 +173,7 @@ pub struct Run { )] pub init_cfg: Option<String>, + #[clap(long = "audit", value_name = "PATH", env = "PICODATA_AUDIT_LOG")] /// Configuration for the audit log. /// Valid options: /// @@ -182,7 +189,6 @@ pub struct Run { /// /// picodata run --audit 'syslog:' /// - #[clap(long = "audit", value_name = "PATH", env = "PICODATA_AUDIT_LOG")] pub audit: Option<String>, #[clap(long = "shredding", env = "PICODATA_SHREDDING")] @@ -208,37 +214,37 @@ pub struct Run { /// pub log: Option<String>, - #[clap( - long = "memtx-memory", - env = "PICODATA_MEMTX_MEMORY", - default_value = "67108864" - )] + #[clap(long = "memtx-memory", env = "PICODATA_MEMTX_MEMORY")] /// The amount of memory in bytes to allocate for the database engine. - pub memtx_memory: u64, + /// + /// By default 67'108'864 is used. + pub memtx_memory: Option<u64>, - /// Path to a plain-text file with a password for the system user "pico_service". 
- /// This password will be used for internal communication among instances of - /// picodata, so it must be the same on all instances. #[clap( long = "service-password-file", value_name = "PATH", env = "PICODATA_SERVICE_PASSWORD_FILE" )] + /// Path to a plain-text file with a password for the system user "pico_service". + /// This password will be used for internal communication among instances of + /// picodata, so it must be the same on all instances. pub service_password_file: Option<String>, } // Copy enum because clap:ArgEnum can't be derived for the foreign SayLevel. -#[derive(Debug, Copy, Clone, tlua::Push, PartialEq, clap::ArgEnum)] -#[clap(rename_all = "lower")] -enum LogLevel { - Fatal, - System, - Error, - Crit, - Warn, - Info, - Verbose, - Debug, +tarantool::define_str_enum! { + #[derive(clap::ArgEnum)] + #[clap(rename_all = "lower")] + pub enum LogLevel { + Fatal = "fatal", + System = "system", + Error = "error", + Crit = "crit", + Warn = "warn", + Info = "info", + Verbose = "verbose", + Debug = "debug", + } } impl From<LogLevel> for SayLevel { @@ -271,30 +277,6 @@ impl Run { Ok(args) } - - pub fn admin_sock(&self) -> String { - match &self.admin_sock { - Some(path) => path.clone(), - None => self.data_dir.clone() + "/admin.sock", - } - } - - pub fn advertise_address(&self) -> String { - let Address { host, port, .. } = self.advertise_address.as_ref().unwrap_or(&self.listen); - format!("{host}:{port}") - } - - pub fn log_level(&self) -> SayLevel { - self.log_level.into() - } - - pub fn failure_domain(&self) -> FailureDomain { - FailureDomain::from( - self.failure_domain - .iter() - .map(|(k, v)| (k.clone(), v.clone())), - ) - } } //////////////////////////////////////////////////////////////////////////////// @@ -473,267 +455,3 @@ impl Admin { Ok(vec![current_exe()?]) } } - -pub const DEFAULT_USERNAME: &str = "guest"; - -#[cfg(test)] -mod tests { - use super::*; - - macro_rules! 
parse { - ($subcmd:ty, $($arg:literal),*) => {{ - let args = vec![stringify!($subcmd), $($arg),*]; - <$subcmd>::try_parse_from(args).unwrap() - }} - } - - struct EnvDump(Vec<(String, String)>); - impl EnvDump { - fn new() -> Self { - let dump: Vec<(String, String)> = std::env::vars() - .filter(|(k, _)| k.starts_with("PICODATA_")) - .collect(); - for (k, _) in &dump { - std::env::remove_var(k); - } - Self(dump) - } - } - - impl Drop for EnvDump { - fn drop(&mut self) { - for (k, v) in self.0.drain(..) { - std::env::set_var(k, v); - } - } - } - - #[test] - fn test_parse() { - let _env_dump = EnvDump::new(); - - std::env::set_var("PICODATA_INSTANCE_ID", "instance-id-from-env"); - { - let parsed = parse![Run,]; - assert_eq!(parsed.instance_id, Some("instance-id-from-env".into())); - assert_eq!( - parsed.peers.as_ref(), - vec![Address { - user: None, - host: "localhost".into(), - port: "3301".into() - }] - ); - assert_eq!( - parsed.listen, - Address { - user: None, - host: "localhost".into(), - port: "3301".into() - } - ); // default - assert_eq!(parsed.advertise_address(), "localhost:3301"); // default - assert_eq!(parsed.log_level(), SayLevel::Info); // default - assert_eq!(parsed.failure_domain(), FailureDomain::default()); // default - - let parsed = parse![Run, "--instance-id", "instance-id-from-args"]; - assert_eq!(parsed.instance_id, Some("instance-id-from-args".into())); - - let parsed = parse![Run, "--instance-id", ""]; - assert_eq!(parsed.instance_id, Some("".into())); - } - - std::env::set_var("PICODATA_PEER", "peer-from-env"); - { - let parsed = parse![Run,]; - assert_eq!( - parsed.peers.as_ref(), - vec![Address { - user: None, - host: "peer-from-env".into(), - port: "3301".into() - }] - ); - - let parsed = parse![Run, "--peer", "peer-from-args"]; - assert_eq!( - parsed.peers.as_ref(), - vec![Address { - user: None, - host: "peer-from-args".into(), - port: "3301".into() - }] - ); - - let parsed = parse![Run, "--peer", ":3302"]; - assert_eq!( - 
parsed.peers.as_ref(), - vec![Address { - user: None, - host: "localhost".into(), - port: "3302".into() - }] - ); - - let parsed = parse![Run, "--peer", "p1", "--peer", "p2,p3"]; - assert_eq!( - parsed.peers.as_ref(), - vec![ - Address { - user: None, - host: "p1".into(), - port: "3301".into() - }, - Address { - user: None, - host: "p2".into(), - port: "3301".into() - }, - Address { - user: None, - host: "p3".into(), - port: "3301".into() - } - ] - ); - } - - std::env::set_var("PICODATA_INSTANCE_ID", ""); - { - let parsed = parse![Run,]; - assert_eq!(parsed.instance_id, Some("".into())); - } - - std::env::remove_var("PICODATA_INSTANCE_ID"); - { - let parsed = parse![Run,]; - assert_eq!(parsed.instance_id, None); - } - - std::env::set_var("PICODATA_LISTEN", "listen-from-env"); - { - let parsed = parse![Run,]; - assert_eq!( - parsed.listen, - Address { - user: None, - host: "listen-from-env".into(), - port: "3301".into() - } - ); - assert_eq!(parsed.advertise_address(), "listen-from-env:3301"); - - let parsed = parse![Run, "-l", "listen-from-args"]; - assert_eq!( - parsed.listen, - Address { - user: None, - host: "listen-from-args".into(), - port: "3301".into() - } - ); - assert_eq!(parsed.advertise_address(), "listen-from-args:3301"); - } - - std::env::set_var("PICODATA_ADVERTISE", "advertise-from-env"); - { - let parsed = parse![Run,]; - assert_eq!( - parsed.listen, - Address { - user: None, - host: "listen-from-env".into(), - port: "3301".into() - } - ); - assert_eq!(parsed.advertise_address(), "advertise-from-env:3301"); - - let parsed = parse![Run, "-l", "listen-from-args"]; - assert_eq!( - parsed.listen, - Address { - user: None, - host: "listen-from-args".into(), - port: "3301".into() - } - ); - assert_eq!(parsed.advertise_address(), "advertise-from-env:3301"); - - let parsed = parse![Run, "--advertise", "advertise-from-args"]; - assert_eq!( - parsed.listen, - Address { - user: None, - host: "listen-from-env".into(), - port: "3301".into() - } - ); - 
assert_eq!(parsed.advertise_address(), "advertise-from-args:3301"); - } - - std::env::set_var("PICODATA_LOG_LEVEL", "verbose"); - { - let parsed = parse![Run,]; - assert_eq!(parsed.log_level(), SayLevel::Verbose); - - let parsed = parse![Run, "--log-level", "warn"]; - assert_eq!(parsed.log_level(), SayLevel::Warn); - } - - std::env::set_var("PICODATA_FAILURE_DOMAIN", "k1=env1,k2=env2"); - { - let parsed = parse![Run,]; - assert_eq!( - parsed.failure_domain(), - FailureDomain::from([("K1", "ENV1"), ("K2", "ENV2")]) - ); - - let parsed = parse![Run, "--failure-domain", "k1=arg1,k1=arg1-again"]; - assert_eq!( - parsed.failure_domain(), - FailureDomain::from([("K1", "ARG1-AGAIN")]) - ); - - let parsed = parse![ - Run, - "--failure-domain", - "k2=arg2", - "--failure-domain", - "k3=arg3,k4=arg4" - ]; - assert_eq!( - parsed.failure_domain(), - FailureDomain::from([("K2", "ARG2"), ("K3", "ARG3"), ("K4", "ARG4")]) - ); - } - - { - let parsed = parse![Run,]; - assert_eq!(parsed.init_replication_factor, 1); - - let parsed = parse![Run, "--init-replication-factor", "7"]; - assert_eq!(parsed.init_replication_factor, 7); - - std::env::set_var("PICODATA_INIT_REPLICATION_FACTOR", "9"); - let parsed = parse![Run,]; - assert_eq!(parsed.init_replication_factor, 9); - } - - { - let parsed = parse![Run, "-i"]; - assert!(parsed.interactive_mode); - - let parsed = parse![Run, "--interactive"]; - assert!(parsed.interactive_mode); - - let parsed = parse![Run,]; - assert!(!parsed.interactive_mode); - } - - { - std::env::set_var("PICODATA_USER", "batman"); - let parsed = parse!(Connect, "somewhere:3301"); - assert_eq!(parsed.user, "batman"); - } - } -} diff --git a/src/cli/connect.rs b/src/cli/connect.rs index e127d5a8b4..d891ee45b3 100644 --- a/src/cli/connect.rs +++ b/src/cli/connect.rs @@ -4,10 +4,11 @@ use std::str::FromStr; use tarantool::network::{AsClient, Client, Config}; use crate::address::Address; +use crate::config::DEFAULT_USERNAME; use crate::tarantool_main; use 
crate::util::{prompt_password, unwrap_or_terminate}; -use super::args::{self, DEFAULT_USERNAME}; +use super::args; use super::console::{Command, Console, ReplError, SpecialCommand}; use comfy_table::{ContentArrangement, Table}; use serde::{Deserialize, Serialize}; diff --git a/src/cli/run.rs b/src/cli/run.rs index fdd81ed392..5edfcff224 100644 --- a/src/cli/run.rs +++ b/src/cli/run.rs @@ -4,14 +4,24 @@ use nix::sys::wait::{waitpid, WaitStatus}; use nix::unistd::{self, fork, ForkResult}; use tarantool::fiber; +use crate::cli::args; +use crate::config::PicodataConfig; use crate::{ipc, tarantool_main, tlog, Entrypoint, IpcMessage}; -use super::args; - pub fn main(args: args::Run) -> ! { + let tt_args = args.tt_args().unwrap(); + + let res = PicodataConfig::init(args); + let config = crate::unwrap_ok_or!(res, + Err(e) => { + tlog!(Error, "{e}"); + std::process::exit(1); + } + ); + // Set the log level as soon as possible to not miss any messages during // initialization. - tlog::set_log_level(args.log_level()); + tlog::set_log_level(config.instance.log_level()); // Tarantool implicitly parses some environment variables. // We don't want them to affect the behavior and thus filter them out. @@ -80,9 +90,9 @@ pub fn main(args: args::Run) -> ! { drop(to_child); let rc = tarantool_main!( - args.tt_args().unwrap(), - callback_data: (entrypoint, args, to_parent, from_parent), - callback_data_type: (Entrypoint, args::Run, ipc::Sender<IpcMessage>, ipc::Fd), + tt_args, + callback_data: (entrypoint, config, to_parent, from_parent), + callback_data_type: (Entrypoint, PicodataConfig, ipc::Sender<IpcMessage>, ipc::Fd), callback_body: { // We don't want a child to live without a supervisor. // @@ -104,7 +114,7 @@ pub fn main(args: args::Run) -> ! 
{ }); std::mem::forget(fuse.start()); - if let Err(e) = entrypoint.exec(args, to_parent) { + if let Err(e) = entrypoint.exec(config, to_parent) { tlog!(Critical, "{e}"); std::process::exit(1); } @@ -169,7 +179,7 @@ pub fn main(args: args::Run) -> ! { if let Ok(msg) = msg { entrypoint = msg.next_entrypoint; if msg.drop_db { - rm_tarantool_files(&args.data_dir); + rm_tarantool_files(&config.instance.data_dir()); } } else { let rc = match status { diff --git a/src/cli/test.rs b/src/cli/test.rs index 3c1ad2f6c3..a0ee3a1bdc 100644 --- a/src/cli/test.rs +++ b/src/cli/test.rs @@ -1,4 +1,4 @@ -use crate::args; +use crate::cli::args; use crate::ipc; use crate::tarantool_main; use ::tarantool::test::TestCase; diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000000..8a7aebbfd5 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,926 @@ +use crate::address::Address; +use crate::cli::args; +use crate::failure_domain::FailureDomain; +use crate::instance::InstanceId; +use crate::replicaset::ReplicasetId; +use crate::storage; +use crate::traft::error::Error; +use crate::traft::RaftSpaceAccess; +use std::collections::HashMap; +use tarantool::log::SayLevel; +use tarantool::tlua; + +pub const DEFAULT_USERNAME: &str = "guest"; +pub const DEFAULT_LISTEN_HOST: &str = "localhost"; +pub const DEFAULT_IPROTO_PORT: &str = "3301"; + +//////////////////////////////////////////////////////////////////////////////// +// PicodataConfig +//////////////////////////////////////////////////////////////////////////////// + +#[derive( + PartialEq, + Default, + Debug, + Clone, + serde::Deserialize, + serde::Serialize, + tlua::Push, + tlua::PushInto, +)] +#[serde(deny_unknown_fields)] +pub struct PicodataConfig { + // TODO: add a flag for each parameter specifying where it came from, i.e.: + // - default value + // - configuration file + // - command line arguments + // - environment variable + // - persisted storage (because some of the values are read from the storage on restart) + 
#[serde(default)] + pub cluster: ClusterConfig, + + #[serde(default)] + pub instance: InstanceConfig, + // TODO: currently this doesn't compile, because serde_json::Value doesn't implement tlua::Push + // But if we had this, we could report a warning if unknown fields were specified + // #[serde(flatten)] + // pub unknown_sections: HashMap<String, serde_json::Value>, +} + +impl PicodataConfig { + // TODO: + // fn default() -> Self + // which returns an instance of config with all the default parameters. + // Also add a command to generate a default config from command line. + pub fn init(args: args::Run) -> Result<Self, Error> { + let mut config = None; + if let Some(args_path) = &args.config { + config = Some(Self::read_yaml_file(args_path)?); + } + let mut config = config.unwrap_or_default(); + + config.set_from_args(args); + + config.validate_common()?; + + Ok(config) + } + + #[inline] + pub fn read_yaml_file(path: &str) -> Result<Self, Error> { + let contents = std::fs::read_to_string(path) + .map_err(|e| Error::other(format!("can't read from '{path}': {e}")))?; + Self::read_yaml_contents(&contents) + } + + #[inline] + pub fn read_yaml_contents(contents: &str) -> Result<Self, Error> { + let config: Self = serde_yaml::from_str(contents).map_err(Error::invalid_configuration)?; + Ok(config) + } + + pub fn set_from_args(&mut self, args: args::Run) { + // TODO: add forbid_conflicts_with_args so that it's considered an error + // if a parameter is specified both in the config and in the command line + // arguments + + if let Some(data_dir) = args.data_dir { + self.instance.data_dir = Some(data_dir); + } + + if let Some(service_password_file) = args.service_password_file { + self.instance.service_password_file = Some(service_password_file); + } + + if let Some(cluster_id) = args.cluster_id { + self.cluster.cluster_id = Some(cluster_id.clone()); + self.instance.cluster_id = Some(cluster_id); + } + + if let Some(instance_id) = args.instance_id { + 
self.instance.instance_id = Some(instance_id); + } + + if let Some(replicaset_id) = args.replicaset_id { + self.instance.replicaset_id = Some(replicaset_id); + } + + if let Some(tier) = args.tier { + self.instance.tier = Some(tier); + } + + // TODO: remove this + if let Some(init_cfg) = args.init_cfg { + self.cluster.init_cfg = Some(init_cfg); + } + + if let Some(init_replication_factor) = args.init_replication_factor { + self.cluster.default_replication_factor = Some(init_replication_factor); + } + + if !args.failure_domain.is_empty() { + let map = args.failure_domain.into_iter(); + self.instance.failure_domain = Some(FailureDomain::from(map)); + } + + if let Some(address) = args.advertise_address { + self.instance.advertise_address = Some(address); + } + + if let Some(listen) = args.listen { + self.instance.listen = Some(listen); + } + + if let Some(http_listen) = args.http_listen { + self.instance.http_listen = Some(http_listen); + } + + if !args.peers.is_empty() { + self.instance.peers = Some(args.peers); + } + + if let Some(log_level) = args.log_level { + self.instance.log_level = Some(log_level); + } + + if let Some(log_destination) = args.log { + self.instance.log = Some(log_destination); + } + + if let Some(audit_destination) = args.audit { + self.instance.audit = Some(audit_destination); + } + + if !args.plugins.is_empty() { + self.instance.plugins = Some(args.plugins); + } + + if let Some(script) = args.script { + self.instance.deprecated_script = Some(script); + } + + if args.shredding { + self.instance.shredding = Some(true); + } + + if let Some(memtx_memory) = args.memtx_memory { + self.instance.memtx_memory = Some(memtx_memory); + } + + // TODO: the rest + } + + /// Does basic config validation. This function checks constraints + /// applicable for all configuration updates. + /// + /// Does *NOT* set default values, that is the responsibility of getter + /// methods. 
+ /// + /// Checks specific to reloading a config file on an initialized instance are + /// done in [`Self::validate_reload`]. + fn validate_common(&self) -> Result<(), Error> { + if let (Some(cci), Some(ici)) = (&self.cluster.cluster_id, &self.instance.cluster_id) { + if cci != ici { + return Err(Error::InvalidConfiguration(format!( + "`cluster.cluster_id` ({cci}) conflicts with `instance.cluster_id` ({ici})", + ))); + } + } + + Ok(()) + } + + #[allow(unused)] + fn validate_reload(&self) -> Result<(), Error> { + todo!() + } + + #[inline] + pub fn cluster_id(&self) -> &str { + match (&self.instance.cluster_id, &self.cluster.cluster_id) { + (Some(instance_cluster_id), Some(cluster_cluster_id)) => { + assert_eq!(instance_cluster_id, cluster_cluster_id); + instance_cluster_id + } + (Some(instance_cluster_id), None) => instance_cluster_id, + (None, Some(cluster_cluster_id)) => cluster_cluster_id, + (None, None) => "demo", + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +// ClusterConfig +//////////////////////////////////////////////////////////////////////////////// + +#[derive( + PartialEq, + Default, + Debug, + Clone, + serde::Deserialize, + serde::Serialize, + tlua::Push, + tlua::PushInto, +)] +#[serde(deny_unknown_fields)] +pub struct ClusterConfig { + pub cluster_id: Option<String>, + + // TODO: remove this + pub init_cfg: Option<String>, + + /// Replication factor which is used for tiers which didn't specify one + /// explicitly. For default value see [`Self::default_replication_factor()`]. 
+ pub default_replication_factor: Option<u8>, +} + +impl ClusterConfig { + #[inline] + pub fn default_replication_factor(&self) -> u8 { + self.default_replication_factor.unwrap_or(1) + } +} + +//////////////////////////////////////////////////////////////////////////////// +// InstanceConfig +//////////////////////////////////////////////////////////////////////////////// + +#[derive( + PartialEq, + Default, + Debug, + Clone, + serde::Deserialize, + serde::Serialize, + tlua::Push, + tlua::PushInto, +)] +#[serde(deny_unknown_fields)] +pub struct InstanceConfig { + pub data_dir: Option<String>, + pub service_password_file: Option<String>, + + pub cluster_id: Option<String>, + pub instance_id: Option<String>, + pub replicaset_id: Option<String>, + pub tier: Option<String>, + pub failure_domain: Option<FailureDomain>, + + // TODO: should this be in `cluster` section? + pub peers: Option<Vec<Address>>, + pub advertise_address: Option<Address>, + pub listen: Option<Address>, + pub http_listen: Option<Address>, + pub admin_socket: Option<String>, + + // TODO: + // - more nested sections! + // - dynamic parameters should be marked dynamic! 
+ // - separate config file for common parameters + pub plugins: Option<Vec<String>>, + pub deprecated_script: Option<String>, + + pub audit: Option<String>, + pub log_level: Option<args::LogLevel>, + pub log: Option<String>, + pub log_format: Option<String>, + + // pub background: Option<bool>, + // pub pid_file: Option<String>, + + // + pub shredding: Option<bool>, + // pub core_strip: Option<bool>, + // pub core_dump: Option<bool>, + + // pub force_recovery: Option<bool>, + // pub too_long_threshold: Option<f64>, + // pub hot_standby: Option<bool>, + // pub wal_queue_max_size: Option<u64>, + // pub wal_max_size: Option<u64>, + // pub wal_mode: Option<String>, + // pub checkpoint_wal_threshold: Option<f64>, + // pub wal_cleanup_delay: Option<f64>, + // pub wal_dir_rescan_delay: Option<f64>, + + // + pub memtx_memory: Option<u64>, + // pub memtx_min_tuple_size: Option<u64>, + // pub memtx_use_mvcc_engine: Option<bool>, + // pub memtx_allocator: Option<String>, + // pub memtx_checkpoint_count: Option<u64>, + // pub memtx_checkpoint_interval: Option<f64>, + + // pub slab_alloc_factor: Option<u64>, + // pub slab_alloc_granularity: Option<u8>, + + // pub vinyl_cache: Option<u64>, + // pub vinyl_write_threads: Option<u8>, + // pub vinyl_defer_deletes: Option<bool>, + // pub vinyl_run_count_per_level: Option<u64>, + // pub vinyl_run_size_ratio: Option<f64>, + // pub vinyl_memory: Option<u64>, + // pub vinyl_read_threads: Option<u8>, + // pub vinyl_page_size: Option<u64>, + // pub vinyl_bloom_fpr: Option<f64>, + // pub vinyl_timeout: Option<f64>, + + // pub txn_isolation: Option<String>, + // pub txn_timeout: Option<f64>, + + // pub auth_type: Option<String>, + // pub iproto_msg_max: Option<u64>, + // pub iproto_readahead: Option<u64>, + // pub iproto_threads: Option<u8>, + + // Not configurable currently + // pub read_only: Option<bool>, + // pub bootstrap_strategy: Option<String>, + // pub replication_anon: Option<bool>, + // pub replication_connect_timeout: 
Option<f64>, + // pub replication_skip_conflict: Option<bool>, + // pub replication_sync_lag: Option<f64>, + // pub replication_sync_timeout: Option<f64>, + // pub replication_synchro_quorum: Option<String>, + // pub replication_synchro_timeout: Option<f64>, + // pub replication_timeout: Option<f64>, + // pub replication_threads: Option<u8>, + // pub election_fencing_mode: Option<String>, + // pub election_mode: Option<String>, + // pub election_timeout: Option<f64>, + + // TODO: correct type + // pub metrics: Option<()>, + + // pub feedback_enabled: Option<bool>, + // pub feedback_interval: Option<f64>, + // pub feedback_host: Option<String>, + // pub feedback_send_metrics: Option<bool>, + // pub feedback_metrics_collect_interval: Option<f64>, + // pub feedback_metrics_limit: Option<u64>, + // pub feedback_crashinfo: Option<bool>, + + // pub sql_cache_size: Option<u64>, + + // pub worker_pool_threads: Option<u8>, +} + +impl InstanceConfig { + #[inline] + pub fn data_dir(&self) -> String { + self.data_dir.clone().unwrap_or_else(|| ".".to_owned()) + } + + #[inline] + pub fn instance_id(&self) -> Option<InstanceId> { + self.instance_id.as_deref().map(InstanceId::from) + } + + #[inline] + pub fn replicaset_id(&self) -> Option<ReplicasetId> { + self.replicaset_id.as_deref().map(ReplicasetId::from) + } + + #[inline] + pub fn tier(&self) -> &str { + self.tier.as_deref().unwrap_or("default") + } + + #[inline] + pub fn failure_domain(&self) -> FailureDomain { + self.failure_domain.clone().unwrap_or_default() + } + + #[inline] + pub fn peers(&self) -> Vec<Address> { + match &self.peers { + Some(peers) if !peers.is_empty() => peers.clone(), + _ => vec![Address { + user: None, + host: DEFAULT_LISTEN_HOST.into(), + port: DEFAULT_IPROTO_PORT.into(), + }], + } + } + + #[inline] + pub fn advertise_address(&self) -> Address { + if let Some(advertise_address) = &self.advertise_address { + advertise_address.clone() + } else { + self.listen() + } + } + + #[inline] + pub fn 
listen(&self) -> Address { + if let Some(listen) = &self.listen { + listen.clone() + } else { + Address { + user: None, + host: DEFAULT_LISTEN_HOST.into(), + port: DEFAULT_IPROTO_PORT.into(), + } + } + } + + #[inline] + pub fn admin_socket(&self) -> String { + if let Some(admin_socket) = &self.admin_socket { + admin_socket.to_owned() + } else { + format!("{}/admin.sock", self.data_dir()) + } + } + + #[inline] + pub fn log_level(&self) -> SayLevel { + if let Some(level) = self.log_level { + level.into() + } else { + SayLevel::Info + } + } + + // TODO: give a default value for audit + // pub fn audit(&self) -> String {} + + #[inline] + pub fn shredding(&self) -> bool { + self.shredding.unwrap_or(false) + } + + // TODO + // pub core_strip: Option<bool>, + // pub core_dump: Option<bool>, + // pub force_recovery: Option<bool>, + + // pub net_msg_max: Option<u64>, + // pub too_long_threshold: Option<f64>, + + #[inline] + pub fn memtx_memory(&self) -> u64 { + self.memtx_memory.unwrap_or(64 * 1024 * 1024) + } + + // TODO + // pub memtx_min_tuple_size: Option<u64>, + // pub memtx_use_mvcc_engine: Option<bool>, + // pub memtx_allocator: Option<String>, + // pub memtx_checkpoint_count: Option<u64>, + // pub memtx_checkpoint_interval: Option<f64>, + + // pub slab_alloc_factor: Option<u64>, + + // pub vinyl_cache: Option<u64>, + // pub vinyl_write_threads: Option<u8>, + // pub vinyl_defer_deletes: Option<bool>, + // pub vinyl_run_count_per_level: Option<u64>, + // pub vinyl_run_size_ratio: Option<f64>, + // pub vinyl_memory: Option<u64>, + // pub vinyl_read_threads: Option<u8>, + // pub vinyl_page_size: Option<u64>, + // pub vinyl_bloom_fpr: Option<f64>, + + // pub replication_skip_conflict: Option<bool>, + // pub wal_queue_max_size: Option<u64>, + // pub replication_anon: Option<bool>, + // pub replication_sync_lag: Option<f64>, + // pub wal_max_size: Option<u64>, + // pub background: Option<bool>, + // pub feedback_send_metrics: Option<bool>, + // pub txn_isolation: 
Option<String>, + // pub replication_synchro_quorum: Option<String>, + // pub wal_mode: Option<String>, + // pub checkpoint_wal_threshold: Option<u64>, + // pub replication_sync_timeout: Option<f64>, + // pub readahead: Option<u64>, + + // pub feedback_host: Option<String>, + // pub feedback_metrics_collect_interval: Option<f64>, + // // FIXME TODO: + // pub metrics: Option<()>, + // pub feedback_crashinfo: Option<bool>, + // pub feedback_enabled: Option<bool>, + // pub feedback_interval: Option<f64>, + // pub feedback_metrics_limit: Option<u64>, + + // pub replication_connect_timeout: Option<f64>, + // pub replication_timeout: Option<f64>, + // pub auth_type: Option<String>, + // pub election_timeout: Option<f64>, + // pub election_fencing_mode: Option<String>, + // pub election_mode: Option<String>, + // pub wal_cleanup_delay: Option<f64>, + // pub vinyl_timeout: Option<f64>, + // pub bootstrap_strategy: Option<String>, + // pub worker_pool_threads: Option<u8>, + // pub txn_timeout: Option<f64>, + // pub slab_alloc_granularity: Option<u8>, + // pub replication_synchro_timeout: Option<f64>, + // pub hot_standby: Option<bool>, + // pub read_only: Option<bool>, + // pub replication_threads: Option<u8>, + // pub iproto_threads: Option<u8>, + // pub wal_dir_rescan_delay: Option<f64>, + // pub sql_cache_size: Option<u64>, +} + +pub fn deserialize_map_forbid_duplicate_keys<'de, D, K, V>( + des: D, +) -> Result<HashMap<K, V>, D::Error> +where + D: serde::Deserializer<'de>, + K: serde::Deserialize<'de> + std::hash::Hash + Eq + std::fmt::Display, + V: serde::Deserialize<'de>, +{ + use std::collections::hash_map::Entry; + use std::marker::PhantomData; + struct Visitor<K, V>(PhantomData<(K, V)>); + + impl<'de, K, V> serde::de::Visitor<'de> for Visitor<K, V> + where + K: serde::Deserialize<'de> + std::hash::Hash + Eq + std::fmt::Display, + V: serde::Deserialize<'de>, + { + type Value = HashMap<K, V>; + + fn expecting(&self, formatter: &mut std::fmt::Formatter) -> 
std::fmt::Result { + formatter.write_str("a map with unique keys") + } + + fn visit_map<A>(self, mut map: A) -> Result<Self::Value, A::Error> + where + A: serde::de::MapAccess<'de>, + { + let mut res = HashMap::with_capacity(map.size_hint().unwrap_or(16)); + while let Some((key, value)) = map.next_entry::<K, V>()? { + match res.entry(key) { + Entry::Occupied(e) => { + return Err(serde::de::Error::custom(format!( + "duplicate key `{}` found", + e.key() + ))); + } + Entry::Vacant(e) => { + e.insert(value); + } + } + } + + Ok(res) + } + } + + des.deserialize_map(Visitor::<K, V>(PhantomData)) +} + +//////////////////////////////////////////////////////////////////////////////// +// tests +//////////////////////////////////////////////////////////////////////////////// + +#[cfg(test)] +mod tests { + use super::*; + use crate::util::on_scope_exit; + use clap::Parser as _; + + #[test] + fn config_from_yaml() { + // + // An empty config, while useless, is still valid + // + let yaml = r###" +"###; + let config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + assert_eq!(config, PicodataConfig::default()); + + let yaml = r###" +cluster: + cluster_id: foobar + +instance: + instance_id: voter1 +"###; + + let config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + dbg!(config); + } + + #[test] + fn cluster_id_is_required() { + let yaml = r###" +cluster: + cluster_id: foo +instance: + cluster_id: bar +"###; + let config = PicodataConfig::read_yaml_contents(&yaml.trim()).unwrap(); + let err = config.validate_common().unwrap_err(); + assert_eq!(err.to_string(), "invalid configuration: `cluster.cluster_id` (foo) conflicts with `instance.cluster_id` (bar)") + } + + #[test] + fn spaces_in_addresses() { + let yaml = r###" +instance: + listen: kevin: <- spacey +"###; + let err = PicodataConfig::read_yaml_contents(&yaml.trim_start()).unwrap_err(); + #[rustfmt::skip] + assert_eq!(err.to_string(), "invalid configuration: mapping values are not allowed in this context at line 2 
column 19"); + + let yaml = r###" +instance: + listen: kevin-> :spacey # <- some more trailing space +"###; + let config = PicodataConfig::read_yaml_contents(&yaml.trim_start()).unwrap(); + let listen = config.instance.listen.unwrap(); + assert_eq!(listen.host, "kevin-> "); + assert_eq!(listen.port, "spacey"); + + let yaml = r###" +instance: + listen: kevin-> <-spacey +"###; + let config = PicodataConfig::read_yaml_contents(&yaml.trim_start()).unwrap(); + let listen = config.instance.listen.unwrap(); + assert_eq!(listen.host, "kevin-> <-spacey"); + assert_eq!(listen.port, "3301"); + } + + #[rustfmt::skip] + #[test] + fn parameter_source_precedence() { + // Save environment before test, to avoid breaking something unrelated + let env_before: Vec<_> = std::env::vars() + .filter(|(k, _)| k.starts_with("PICODATA_")) + .collect(); + for (k, _) in &env_before { + std::env::remove_var(k); + } + let _guard = on_scope_exit(|| { + for (k, v) in env_before { + std::env::set_var(k, v); + } + }); + + // + // Defaults + // + { + let mut config = PicodataConfig::default(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!( + config.instance.peers(), + vec![Address { + user: None, + host: "localhost".into(), + port: "3301".into(), + }] + ); + assert_eq!(config.instance.instance_id(), None); + assert_eq!(config.instance.listen().to_host_port(), "localhost:3301"); + assert_eq!(config.instance.advertise_address().to_host_port(), "localhost:3301"); + assert_eq!(config.instance.log_level(), SayLevel::Info); + assert!(config.instance.failure_domain().data.is_empty()); + } + + // + // Precedence: command line > env > config + // + { + let yaml = r###" +instance: + instance_id: I-CONFIG +"###; + + // only config + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.instance_id().unwrap(), "I-CONFIG"); + 
+ // env > config + std::env::set_var("PICODATA_INSTANCE_ID", "I-ENVIRON"); + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.instance_id().unwrap(), "I-ENVIRON"); + + // command line > env + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run", "--instance-id=I-COMMANDLINE"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.instance_id().unwrap(), "I-COMMANDLINE"); + } + + // + // peers parsing + // + { + // empty config means default + let yaml = r###" +instance: + peers: +"###; + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!( + config.instance.peers(), + vec![ + Address { + user: None, + host: "localhost".into(), + port: "3301".into(), + } + ] + ); + + // only config + let yaml = r###" +instance: + peers: + - bobbert:420 + - tomathan:69 +"###; + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!( + config.instance.peers(), + vec![ + Address { + user: None, + host: "bobbert".into(), + port: "420".into(), + }, + Address { + user: None, + host: "tomathan".into(), + port: "69".into(), + } + ] + ); + + // env > config + std::env::set_var("PICODATA_PEER", "oops there's a space over here -> <-:13, maybe we should at least strip these:37"); + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!( + config.instance.peers(), + vec![ + Address { + user: None, + host: "oops there's a space over here -> <-".into(), + port: "13".into(), + }, + Address { + user: None, + host: " maybe we should at 
least strip these".into(), + port: "37".into(), + } + ] + ); + + // command line > env + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run", + "--peer", "one:1", + "--peer", "two:2, <- same problem here,:3,4" + ]).unwrap(); + config.set_from_args(args); + + assert_eq!( + config.instance.peers(), + vec![ + Address { + user: None, + host: "one".into(), + port: "1".into(), + }, + Address { + user: None, + host: "two".into(), + port: "2".into(), + }, + Address { + user: None, + host: " <- same problem here".into(), + port: "3301".into(), + }, + Address { + user: None, + host: "localhost".into(), + port: "3".into(), + }, + Address { + user: None, + host: "4".into(), + port: "3301".into(), + } + ] + ); + } + + // + // Advertise = listen unless specified explicitly + // + { + std::env::set_var("PICODATA_LISTEN", "L-ENVIRON"); + let mut config = PicodataConfig::read_yaml_contents("").unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.listen().to_host_port(), "L-ENVIRON:3301"); + assert_eq!(config.instance.advertise_address().to_host_port(), "L-ENVIRON:3301"); + + let yaml = r###" +instance: + advertise_address: A-CONFIG +"###; + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.listen().to_host_port(), "L-ENVIRON:3301"); + assert_eq!(config.instance.advertise_address().to_host_port(), "A-CONFIG:3301"); + + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run", "-l", "L-COMMANDLINE"]).unwrap(); + config.set_from_args(args); + + assert_eq!(config.instance.listen().to_host_port(), "L-COMMANDLINE:3301"); + assert_eq!(config.instance.advertise_address().to_host_port(), "A-CONFIG:3301"); + } + + // + // Failure domain is parsed correctly + // + 
{ + // FIXME: duplicate keys in failure-domain should be a hard error + // config + let yaml = r###" +instance: + failure_domain: + kconf1: vconf1 + kconf2: vconf2 + kconf2: vconf2-replaced +"###; + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + assert_eq!( + config.instance.failure_domain(), + FailureDomain::from([("KCONF1", "VCONF1"), ("KCONF2", "VCONF2-REPLACED")]) + ); + + // environment + std::env::set_var("PICODATA_FAILURE_DOMAIN", "kenv1=venv1,kenv2=venv2,kenv2=venv2-replaced"); + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run"]).unwrap(); + config.set_from_args(args); + assert_eq!( + config.instance.failure_domain(), + FailureDomain::from([("KENV1", "VENV1"), ("KENV2", "VENV2-REPLACED")]) + ); + + // command line + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run", "--failure-domain", "karg1=varg1,karg1=varg1-replaced"]).unwrap(); + config.set_from_args(args); + assert_eq!( + config.instance.failure_domain(), + FailureDomain::from([("KARG1", "VARG1-REPLACED")]) + ); + + // command line more complex + let mut config = PicodataConfig::read_yaml_contents(&yaml).unwrap(); + let args = args::Run::try_parse_from(["run", + "--failure-domain", "foo=1", + "--failure-domain", "bar=2,baz=3" + ]).unwrap(); + config.set_from_args(args); + assert_eq!( + config.instance.failure_domain(), + FailureDomain::from([ + ("FOO", "1"), + ("BAR", "2"), + ("BAZ", "3"), + ]) + ); + } + } +} diff --git a/src/failure_domain.rs b/src/failure_domain.rs index 6cc9809935..22ca88ec0f 100644 --- a/src/failure_domain.rs +++ b/src/failure_domain.rs @@ -2,6 +2,7 @@ use crate::stringify_debug; use crate::traft::Distance; use crate::util::Uppercase; use std::collections::{HashMap, HashSet}; +use tarantool::tlua; /// Failure domain of a given instance. 
/// @@ -31,7 +32,9 @@ use std::collections::{HashMap, HashSet}; /// Failure domains are case-insensitive. Components are converted to /// upprcase implicitly. /// -#[derive(Default, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)] +#[derive( + Default, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize, tlua::Push, tlua::PushInto, +)] pub struct FailureDomain { #[serde(flatten)] pub data: HashMap<Uppercase, Uppercase>, diff --git a/src/lib.rs b/src/lib.rs index b5f3c6ca20..1a45d4bb50 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,7 +22,6 @@ use traft::RaftSpaceAccess; use crate::access_control::user_by_id; use crate::address::Address; -use crate::cli::args; use crate::cli::init_cfg::InitCfg; use crate::instance::Grade; use crate::instance::GradeVariant::*; @@ -33,7 +32,8 @@ use crate::schema::PICO_SERVICE_USER_NAME; use crate::tier::{Tier, DEFAULT_TIER}; use crate::traft::error::Error; use crate::traft::op; -use crate::util::{effective_user_id, listen_admin_console}; +use crate::util::effective_user_id; +use config::PicodataConfig; mod access_control; pub mod address; @@ -41,6 +41,7 @@ pub mod audit; mod bootstrap_entries; pub mod cas; pub mod cli; +pub mod config; pub mod discovery; pub mod error_injection; pub mod failure_domain; @@ -507,17 +508,17 @@ pub enum Entrypoint { impl Entrypoint { pub fn exec( self, - args: cli::args::Run, + config: PicodataConfig, to_supervisor: ipc::Sender<IpcMessage>, ) -> Result<(), Error> { - if let Some(filename) = &args.service_password_file { + if let Some(filename) = &config.instance.service_password_file { pico_service::read_pico_service_password_from_file(filename)?; } match self { - Self::StartDiscover => start_discover(&args, to_supervisor)?, - Self::StartBoot => start_boot(&args)?, - Self::StartJoin { leader_address } => start_join(&args, leader_address)?, + Self::StartDiscover => start_discover(&config, to_supervisor)?, + Self::StartBoot => start_boot(&config)?, + Self::StartJoin { leader_address } => 
start_join(&config, leader_address)?, } Ok(()) @@ -538,15 +539,15 @@ pub struct IpcMessage { /// - `start_boot` /// - `start_join` /// -fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> (Clusterwide, RaftSpaceAccess) { - std::fs::create_dir_all(&args.data_dir).unwrap(); +fn init_common(config: &PicodataConfig, cfg: &tarantool::Cfg) -> (Clusterwide, RaftSpaceAccess) { + std::fs::create_dir_all(config.instance.data_dir()).unwrap(); // See doc comments in tlog.rs for explanation. tlog::set_core_logger_is_initialized(); tarantool::set_cfg(cfg); - if args.shredding { + if config.instance.shredding() { tarantool::xlog_set_remove_file_impl(); } @@ -579,10 +580,13 @@ fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> (Clusterwide, RaftSpac (storage.clone(), raft_storage) } -fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) -> Result<(), Error> { +fn start_discover( + config: &PicodataConfig, + to_supervisor: ipc::Sender<IpcMessage>, +) -> Result<(), Error> { tlog!(Info, "entering discovery phase"); - luamod::setup(args); + luamod::setup(config); assert!(tarantool::cfg().is_none()); // Don't try to guess instance and replicaset uuids now, @@ -590,29 +594,25 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) -> R let mut cfg = tarantool::Cfg { listen: None, read_only: false, - log: args.log.clone(), - wal_dir: args.data_dir.clone(), - memtx_dir: args.data_dir.clone(), - vinyl_dir: args.data_dir.clone(), - log_level: args.log_level() as u8, - memtx_memory: args.memtx_memory, + log: config.instance.log.clone(), + wal_dir: config.instance.data_dir(), + memtx_dir: config.instance.data_dir(), + vinyl_dir: config.instance.data_dir(), + log_level: config.instance.log_level() as u8, + memtx_memory: config.instance.memtx_memory(), ..Default::default() }; - let (storage, raft_storage) = init_common(args, &cfg); - discovery::init_global( - args.peers - .iter() - .map(|Address { host, port, .. 
}| format!("{host}:{port}")), - ); + let (storage, raft_storage) = init_common(config, &cfg); + discovery::init_global(config.instance.peers().iter().map(|a| a.to_host_port())); - cfg.listen = Some(format!("{}:{}", args.listen.host, args.listen.port)); + cfg.listen = Some(config.instance.listen().to_host_port()); tarantool::set_cfg(&cfg); // TODO assert traft::Storage::instance_id == (null || args.instance_id) if raft_storage.raft_id().unwrap().is_some() { tarantool::set_cfg_field("read_only", true).unwrap(); - return postjoin(args, storage, raft_storage); + return postjoin(config, storage, raft_storage); } let role = discovery::wait_global(); @@ -640,10 +640,10 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) -> R } } -fn start_boot(args: &args::Run) -> Result<(), Error> { +fn start_boot(config: &PicodataConfig) -> Result<(), Error> { tlog!(Info, "entering cluster bootstrap phase"); - let init_cfg = match &args.init_cfg { + let init_cfg = match &config.cluster.init_cfg { Some(path) => InitCfg::try_from_yaml_file(path).map_err(Error::other)?, None => { tlog!(Info, "init-cfg wasn't set"); @@ -651,61 +651,63 @@ fn start_boot(args: &args::Run) -> Result<(), Error> { Info, "filling init-cfg with default tier `{}` using replication-factor={}", DEFAULT_TIER, - args.init_replication_factor + config.cluster.default_replication_factor() ); - let tier = vec![Tier::with_replication_factor(args.init_replication_factor)]; + let tier = vec![Tier::with_replication_factor( + config.cluster.default_replication_factor(), + )]; InitCfg { tier } } }; let tiers = init_cfg.tier; - let Some(current_instance_tier) = tiers.iter().find(|tier| tier.name == args.tier).cloned() + let my_tier_name = config.instance.tier(); + let Some(current_instance_tier) = tiers.iter().find(|tier| tier.name == my_tier_name).cloned() else { return Err(Error::other(format!( - "tier '{}' for current instance is not found in init-cfg", - args.tier + "tier '{my_tier_name}' for current 
instance is not found in init-cfg", ))); }; let instance = Instance::new( None, - args.instance_id.clone(), - args.replicaset_id.clone(), + config.instance.instance_id.clone(), + config.instance.replicaset_id.clone(), Grade::new(Offline, 0), Grade::new(Offline, 0), - args.failure_domain(), + config.instance.failure_domain(), &current_instance_tier.name, ); let raft_id = instance.raft_id; let instance_id = instance.instance_id.clone(); - luamod::setup(args); + luamod::setup(config); assert!(tarantool::cfg().is_none()); let cfg = tarantool::Cfg { listen: None, read_only: false, - log: args.log.clone(), + log: config.instance.log.clone(), instance_uuid: Some(instance.instance_uuid.clone()), replicaset_uuid: Some(instance.replicaset_uuid.clone()), - wal_dir: args.data_dir.clone(), - memtx_dir: args.data_dir.clone(), - vinyl_dir: args.data_dir.clone(), - log_level: args.log_level() as u8, - memtx_memory: args.memtx_memory, + wal_dir: config.instance.data_dir(), + memtx_dir: config.instance.data_dir(), + vinyl_dir: config.instance.data_dir(), + log_level: config.instance.log_level() as u8, + memtx_memory: config.instance.memtx_memory(), ..Default::default() }; - let (storage, raft_storage) = init_common(args, &cfg); + let (storage, raft_storage) = init_common(config, &cfg); let cs = raft::ConfState { voters: vec![raft_id], ..Default::default() }; - let bootstrap_entries = bootstrap_entries::prepare(args, &instance, &tiers); + let bootstrap_entries = bootstrap_entries::prepare(config, &instance, &tiers); let hs = raft::HardState { term: traft::INIT_RAFT_TERM, @@ -719,7 +721,9 @@ fn start_boot(args: &args::Run) -> Result<(), Error> { raft_storage .persist_tier(&current_instance_tier.name) .unwrap(); - raft_storage.persist_cluster_id(&args.cluster_id).unwrap(); + raft_storage + .persist_cluster_id(config.cluster_id()) + .unwrap(); raft_storage.persist_entries(&bootstrap_entries).unwrap(); raft_storage.persist_conf_state(&cs).unwrap(); raft_storage.persist_hard_state(&hs).unwrap(); @@ 
-727,9 +731,9 @@ fn start_boot(args: &args::Run) -> Result<(), Error> { }) .unwrap(); - postjoin(args, storage, raft_storage)?; + postjoin(config, storage, raft_storage)?; - let db_name = &args.cluster_id; + let db_name = config.cluster_id(); let instance_id = instance.instance_id.as_ref(); crate::audit!( message: "a new database `{db_name}` was created", @@ -743,16 +747,16 @@ fn start_boot(args: &args::Run) -> Result<(), Error> { Ok(()) } -fn start_join(args: &args::Run, instance_address: String) -> Result<(), Error> { - tlog!(Info, ">>>>> start_join({instance_address})"); +fn start_join(config: &PicodataConfig, instance_address: String) -> Result<(), Error> { + tlog!(Info, "joining cluster, peer address: {instance_address}"); let req = join::Request { - cluster_id: args.cluster_id.clone(), - instance_id: args.instance_id.clone(), - replicaset_id: args.replicaset_id.clone(), - advertise_address: args.advertise_address(), - failure_domain: args.failure_domain(), - tier: args.tier.clone(), + cluster_id: config.cluster_id().into(), + instance_id: config.instance.instance_id().map(From::from), + replicaset_id: config.instance.replicaset_id().map(From::from), + advertise_address: config.instance.advertise_address().to_host_port(), + failure_domain: config.instance.failure_domain(), + tier: config.instance.tier().into(), }; // Arch memo. 
@@ -787,7 +791,7 @@ fn start_join(args: &args::Run, instance_address: String) -> Result<(), Error> { } }; - luamod::setup(args); + luamod::setup(config); assert!(tarantool::cfg().is_none()); let mut replication_cfg = Vec::with_capacity(resp.box_replication.len()); @@ -796,21 +800,21 @@ fn start_join(args: &args::Run, instance_address: String) -> Result<(), Error> { } let cfg = tarantool::Cfg { - listen: Some(format!("{}:{}", args.listen.host, args.listen.port)), + listen: Some(config.instance.listen().to_host_port()), read_only: resp.box_replication.len() > 1, - log: args.log.clone(), + log: config.instance.log.clone(), instance_uuid: Some(resp.instance.instance_uuid.clone()), replicaset_uuid: Some(resp.instance.replicaset_uuid.clone()), replication: replication_cfg, - wal_dir: args.data_dir.clone(), - memtx_dir: args.data_dir.clone(), - vinyl_dir: args.data_dir.clone(), - log_level: args.log_level() as u8, - memtx_memory: args.memtx_memory, + wal_dir: config.instance.data_dir(), + memtx_dir: config.instance.data_dir(), + vinyl_dir: config.instance.data_dir(), + log_level: config.instance.log_level() as u8, + memtx_memory: config.instance.memtx_memory(), ..Default::default() }; - let (storage, raft_storage) = init_common(args, &cfg); + let (storage, raft_storage) = init_common(config, &cfg); let raft_id = resp.instance.raft_id; transaction(|| -> Result<(), TntError> { @@ -822,29 +826,33 @@ fn start_join(args: &args::Run, instance_address: String) -> Result<(), Error> { raft_storage .persist_instance_id(&resp.instance.instance_id) .unwrap(); - raft_storage.persist_cluster_id(&args.cluster_id).unwrap(); - raft_storage.persist_tier(&args.tier).unwrap(); + raft_storage + .persist_cluster_id(config.cluster_id()) + .unwrap(); + raft_storage.persist_tier(config.instance.tier()).unwrap(); Ok(()) }) .unwrap(); - postjoin(args, storage, raft_storage) + postjoin(config, storage, raft_storage) } fn postjoin( - args: &args::Run, + config: &PicodataConfig, storage: Clusterwide, 
raft_storage: RaftSpaceAccess, ) -> Result<(), Error> { tlog!(Info, "entering post-join phase"); - if let Some(config) = &args.audit { + if let Some(config) = &config.instance.audit { audit::init(config, &raft_storage); } - PluginList::global_init(&args.plugins); + if let Some(plugins) = &config.instance.plugins { + PluginList::global_init(plugins); + } - if let Some(addr) = &args.http_listen { + if let Some(addr) = &config.instance.http_listen { start_http_server(addr); if cfg!(feature = "webui") { tlog!(Info, "Web UI is enabled"); @@ -855,7 +863,7 @@ fn postjoin( start_webui(); } // Execute postjoin script if present - if let Some(ref script) = args.script { + if let Some(ref script) = config.instance.deprecated_script { let l = ::tarantool::lua_state(); l.exec_with("dofile(...)", script) .unwrap_or_else(|err| panic!("failed to execute postjoin script: {err}")) @@ -882,10 +890,13 @@ fn postjoin( assert!(node.status().raft_state.is_leader()); } - box_cfg.listen = Some(format!("{}:{}", args.listen.host, args.listen.port)); + box_cfg.listen = Some(config.instance.listen().to_host_port()); tarantool::set_cfg(&box_cfg); - listen_admin_console(args)?; + // Start admin console + let socket_uri = util::validate_and_complete_unix_socket_path(&config.instance.admin_socket())?; + let lua = ::tarantool::lua_state(); + lua.exec_with(r#"require('console').listen(...)"#, &socket_uri)?; if let Err(e) = tarantool::on_shutdown(move || fiber::block_on(on_shutdown::callback(PluginList::get()))) @@ -934,7 +945,7 @@ fn postjoin( ); let req = update_instance::Request::new(instance.instance_id, cluster_id) .with_target_grade(Online) - .with_failure_domain(args.failure_domain()); + .with_failure_domain(config.instance.failure_domain()); let now = Instant::now(); let fut = rpc::network_call(&leader_address, &req).timeout(activation_deadline - now); match fiber::block_on(fut) { diff --git a/src/luamod.rs b/src/luamod.rs index a21d408135..012deefb8e 100644 --- a/src/luamod.rs +++ 
b/src/luamod.rs @@ -1,9 +1,8 @@ //! Lua API exported as `_G.pico` //! -use std::time::Duration; - use crate::cas::{self, compare_and_swap}; +use crate::config::PicodataConfig; use crate::instance::InstanceId; use crate::schema::{self, CreateTableParams, ADMIN_ID}; use crate::traft::error::Error; @@ -12,7 +11,7 @@ use crate::traft::{self, node, RaftIndex, RaftTerm}; use crate::util::str_eq; use crate::util::INFINITY; use crate::util::{duration_from_secs_f64_clamped, effective_user_id}; -use crate::{args, rpc, sync, tlog}; +use crate::{rpc, sync, tlog}; use ::tarantool::fiber; use ::tarantool::session; use ::tarantool::tlua; @@ -21,6 +20,7 @@ use ::tarantool::transaction::transaction; use ::tarantool::tuple::Decode; use ::tarantool::vclock::Vclock; use indoc::indoc; +use std::time::Duration; #[inline(always)] fn luamod_set<V>(l: &LuaThread, name: &str, help: &str, value: V) @@ -41,7 +41,7 @@ fn luamod_set_help_only(l: &LuaThread, name: &str, help: &str) { help_table.set(name, help); } -pub(crate) fn setup(args: &args::Run) { +pub(crate) fn setup(config: &PicodataConfig) { let l = ::tarantool::lua_state(); l.exec(include_str!("luamod.lua")).unwrap(); @@ -95,29 +95,43 @@ pub(crate) fn setup(args: &args::Run) { "3.1.0", ); + let config = config.clone(); luamod_set( &l, - "args", + "config", indoc! {" - pico.args + pico.get_config() ========= - A Lua table (not a function) containing the command-line arguments - specified at the instance startup. The content of the table is not - strictly defined and may depend on circumstances. + Returns a Lua table containing picodata configuration which is + gathered from the configuration file, environment variables and/or + command line arguments. + + The content of the table is not strictly defined and may depend on circumstances. + + Returns: + + (table) Example: - picodata> pico.args + picodata> pico.get_config() --- - - log_level: info - listen: localhost:3301 - data_dir: . 
- peers: - - localhost:3301 + - cluster: + cluster_id: demo + - instance: + log_level: info + listen: localhost:3301 + data_dir: . + peers: + - localhost:3301 ... "}, - args, + tlua::Function::new(move || -> PicodataConfig { + // FIXME: currently it only contains explicitly specified parameters, + // but default parameters are omitted + config.clone() + }), ); luamod_set( diff --git a/src/traft/error.rs b/src/traft/error.rs index 68e48a05ac..85fd9dcf7b 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -77,6 +77,9 @@ pub enum Error { #[error("storage corrupted: failed to decode field '{field}' from table '{table}'")] StorageCorrupted { table: String, field: String }, + #[error("invalid configuration: {0}")] + InvalidConfiguration(String), + #[error("{0}")] Other(Box<dyn std::error::Error>), } @@ -90,6 +93,11 @@ impl Error { Self::Other(error.into()) } + #[inline(always)] + pub fn invalid_configuration(msg: impl ToString) -> Self { + Self::InvalidConfiguration(msg.to_string()) + } + /// Temporary solution until proc_cas returns structured errors #[inline(always)] pub fn is_cas_err(&self) -> bool { diff --git a/src/util.rs b/src/util.rs index 2b69578896..8c69779fbf 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,4 +1,3 @@ -use crate::cli::args; use crate::traft::error::Error; use nix::sys::termios::{tcgetattr, tcsetattr, LocalFlags, SetArg::TCSADRAIN}; use std::any::{Any, TypeId}; @@ -498,13 +497,11 @@ pub fn prompt_password(prompt: &str) -> Result<String, std::io::Error> { } //////////////////////////////////////////////////////////////////////////////// -/// Validate unix socket uri via lua uri module +/// Returns a unix socket uri from the given file path. /// -/// Doesn't change path in case of absolute path. -/// To relative path `./` prepended. +/// Non-absolute paths are prepended with `./`. 
/// -/// Return None in case of incorrect path -/// Return Some(`value`) with `unix/:` and, probably, `./` prepended to `value` +/// Returns an error in case validation using lua `uri` module fails. pub fn validate_and_complete_unix_socket_path(socket_path: &str) -> Result<String, Error> { let l = ::tarantool::lua_state(); let path = std::path::Path::new(socket_path); @@ -525,20 +522,6 @@ pub fn validate_and_complete_unix_socket_path(socket_path: &str) -> Result<Strin Ok(console_sock) } -//////////////////////////////////////////////////////////////////////////////// -/// Starts admin console. -/// -/// Returns Err in case of problems with socket path. -pub fn listen_admin_console(args: &args::Run) -> Result<(), Error> { - let lua = ::tarantool::lua_state(); - - let validated_path = validate_and_complete_unix_socket_path(&args.admin_sock())?; - - lua.exec_with(r#"require('console').listen(...)"#, &validated_path)?; - - Ok(()) -} - //////////////////////////////////////////////////////////////////////////////// /// Like unwrap(), but instead of a panic it logs /// the error in picodata format and calls exit() @@ -746,6 +729,11 @@ where // ... 
//////////////////////////////////////////////////////////////////////////////// +#[inline(always)] +pub fn file_exists(path: &str) -> bool { + std::fs::metadata(path).is_ok() +} + #[inline] pub(crate) fn effective_user_id() -> UserId { session::euid().expect("infallible in picodata") diff --git a/test/int/test_config_file.py b/test/int/test_config_file.py index 4db9b01203..24bf78c7ba 100644 --- a/test/int/test_config_file.py +++ b/test/int/test_config_file.py @@ -1,6 +1,29 @@ from conftest import Cluster, log_crawler +def test_config_works(cluster: Cluster): + instance = cluster.add_instance(instance_id=False, wait_online=False) + config_path = cluster.data_dir + "/config.yaml" + with open(config_path, "w") as f: + f.write( + """ +instance: + instance-id: from-config + replicaset-id: with-love + memtx-memory: 42069 + """ + ) + instance.env["PICODATA_CONFIG_FILE"] = config_path + instance.start() + instance.wait_online() + + info = instance.call(".proc_instance_info") + assert info["instance_id"] == "from-config" + assert info["replicaset_id"] == "with-love" + + assert instance.eval("return box.cfg.memtx_memory") == 42069 + + def test_run_init_cfg_enoent(cluster: Cluster): i1 = cluster.add_instance(wait_online=False) i1.env.update({"PICODATA_INIT_CFG": "./unexisting_dir/trash.yaml"}) -- GitLab