From 0fa0daf9558ac97b91a2b603708f10ee1f9344dd Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Tue, 22 Oct 2024 18:42:08 +0300
Subject: [PATCH] refactor: cleanup build_instance code

---
 src/instance.rs |  85 +++++++++++----------
 src/rpc/join.rs | 192 ++++++++++++++++++++++++++++++++----------------
 2 files changed, 174 insertions(+), 103 deletions(-)

diff --git a/src/instance.rs b/src/instance.rs
index 3abb2ab81c..95ae2b1947 100644
--- a/src/instance.rs
+++ b/src/instance.rs
@@ -154,20 +154,26 @@ mod tests {
         storage.tiers.put(&tier)
     }
 
-    fn add_instance(storage: &Clusterwide, raft_id: RaftId, instance_name: &str, replicaset_name: &str, state: &State) -> tarantool::Result<Instance> {
-        let instance = Instance {
+    fn dummy_instance(raft_id: RaftId, name: &str, replicaset_name: &str, state: &State) -> Instance {
+        Instance {
             raft_id,
-            name: instance_name.into(),
-            uuid: format!("{instance_name}-uuid"),
+            name: name.into(),
+            uuid: format!("{name}-uuid"),
             replicaset_name: replicaset_name.into(),
             replicaset_uuid: format!("{replicaset_name}-uuid"),
             current_state: *state,
             target_state: *state,
             failure_domain: FailureDomain::default(),
             tier: DEFAULT_TIER.into(),
-        };
-        storage.instances.put(&instance)?;
-        Ok(instance)
+        }
+    }
+
+    fn add_instance(storage: &Clusterwide, instance: &Instance) -> tarantool::Result<()> {
+        storage.instances.put(instance)?;
+        // Ignore error in case replicaset already exists. Good enough for tests
+        _ = storage.replicasets.put(&Replicaset::with_one_instance(instance));
+
+        Ok(())
+    }
 
     fn replication_names(replicaset_name: &ReplicasetName, storage: &Clusterwide) -> HashSet<RaftId> {
@@ -191,33 +197,33 @@ mod tests {
         assert_eq!(i1.target_state, State::new(Offline, 0));
         assert_eq!(i1.failure_domain, FailureDomain::default());
         assert_eq!(i1.tier, DEFAULT_TIER);
-        storage.instances.put(&i1).unwrap();
+        add_instance(&storage, &i1).unwrap();
 
         let i2 = build_instance(None, None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
         assert_eq!(i2.raft_id, 2);
         assert_eq!(i2.name, "i2");
         assert_eq!(i2.replicaset_name, "r2");
-        storage.instances.put(&i2).unwrap();
+        add_instance(&storage, &i2).unwrap();
 
         let i3 = build_instance(None, Some(&ReplicasetName::from("R3")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
         assert_eq!(i3.raft_id, 3);
         assert_eq!(i3.name, "i3");
         assert_eq!(i3.replicaset_name, "R3");
-        storage.instances.put(&i3).unwrap();
+        add_instance(&storage, &i3).unwrap();
 
         let i4 = build_instance(Some(&InstanceName::from("I4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
         assert_eq!(i4.raft_id, 4);
         assert_eq!(i4.name, "I4");
         assert_eq!(i4.replicaset_name, "r3");
-        storage.instances.put(&i4).unwrap();
+        add_instance(&storage, &i4).unwrap();
     }
 
     #[::tarantool::test]
     fn test_override() {
         let storage = Clusterwide::for_tests();
         add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
-        add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
-        add_instance(&storage, 2, "i2", "r2-original", &State::new(Expelled, 0)).unwrap();
+        add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
+        add_instance(&storage, &dummy_instance(2, "i2", "r2-original", &State::new(Expelled, 0))).unwrap();
 
         // join::Request with a given instance_name online.
         // - It must be an impostor, return an error.
@@ -262,8 +268,8 @@ mod tests {
     fn test_instance_name_collision() {
         let storage = Clusterwide::for_tests();
         add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
-        add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
-        add_instance(&storage, 2, "i3", "r3", &State::new(Online, 1)).unwrap();
+        add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
+        add_instance(&storage, &dummy_instance(2, "i3", "r3", &State::new(Online, 1))).unwrap();
         // Attention: i3 has raft_id=2
 
         let instance = build_instance(None, Some(&ReplicasetName::from("r2")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
@@ -288,15 +294,14 @@ mod tests {
     fn test_replication_factor() {
         let storage = Clusterwide::for_tests();
         add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
-        add_instance(&storage, 9, "i9", "r9", &State::new(Online, 1)).unwrap();
-        add_instance(&storage, 10, "i10", "r9", &State::new(Online, 1)).unwrap();
+        add_instance(&storage, &dummy_instance(9, "i9", "r9", &State::new(Online, 1))).unwrap();
+        add_instance(&storage, &dummy_instance(10, "i10", "r9", &State::new(Online, 1))).unwrap();
 
         let i1 = build_instance(Some(&InstanceName::from("i1")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
         assert_eq!(i1.raft_id, 11);
         assert_eq!(i1.name, "i1");
         assert_eq!(i1.replicaset_name, "r1");
-        storage.instances.put(&i1).unwrap();
-        storage.replicasets.put(&Replicaset::with_one_instance(&i1)).unwrap();
+        add_instance(&storage, &i1).unwrap();
         assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11]));
@@ -305,15 +310,14 @@ mod tests {
         assert_eq!(i2.name, "i2");
         assert_eq!(i2.replicaset_name, "r1");
         assert_eq!(i2.replicaset_uuid, i1.replicaset_uuid);
-        storage.instances.put(&i2).unwrap();
+        add_instance(&storage, &i2).unwrap();
         assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11, 12]));
 
         let i3 = build_instance(Some(&InstanceName::from("i3")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
         assert_eq!(i3.raft_id, 13);
         assert_eq!(i3.name, "i3");
         assert_eq!(i3.replicaset_name, "r2");
-        storage.instances.put(&i3).unwrap();
-        storage.replicasets.put(&Replicaset::with_one_instance(&i3)).unwrap();
+        add_instance(&storage, &i3).unwrap();
         assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13]));
 
         let i4 = build_instance(Some(&InstanceName::from("i4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
@@ -321,7 +325,7 @@ mod tests {
         assert_eq!(i4.name, "i4");
         assert_eq!(i4.replicaset_name, "r2");
         assert_eq!(i4.replicaset_uuid, i3.replicaset_uuid);
-        storage.instances.put(&i4).unwrap();
+        add_instance(&storage, &i4).unwrap();
         assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13, 14]));
     }
@@ -339,7 +343,8 @@ mod tests {
     fn test_update_state() {
         let storage = Clusterwide::for_tests();
         add_tier(&storage, DEFAULT_TIER, 1, true).unwrap();
-        let instance = add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
+        let instance = dummy_instance(1, "i1", "r1", &State::new(Online, 1));
+        add_instance(&storage, &instance).unwrap();
         let existing_fds = HashSet::new();
 
         //
@@ -459,31 +464,31 @@ mod tests {
             build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r1");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r2");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Mars}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r1");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Earth, os: BSD}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r3");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Mars, os: BSD}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r2");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let e = build_instance(None, None, &faildoms! {os: Arch}, &storage, DEFAULT_TIER).unwrap_err();
         assert_eq!(e.to_string(), "missing failure domain names: PLANET");
@@ -492,19 +497,19 @@ mod tests {
             build_instance(None, None, &faildoms! {planet: Venus, os: Arch}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r1");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Venus, os: Mac}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r2");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Mars, os: Mac}, &storage, DEFAULT_TIER)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r3");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let e = build_instance(None, None, &faildoms! {}, &storage, DEFAULT_TIER).unwrap_err();
         assert_eq!(e.to_string(), "missing failure domain names: OS, PLANET");
@@ -519,7 +524,7 @@ mod tests {
         // first instance
         //
         let instance1 = build_instance(Some(&InstanceName::from("i1")), None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER).unwrap();
-        storage.instances.put(&instance1).unwrap();
+        add_instance(&storage, &instance1).unwrap();
         let existing_fds = storage.instances.failure_domain_names().unwrap();
         assert_eq!(instance1.failure_domain, faildoms! {planet: Earth});
         assert_eq!(instance1.replicaset_name, "r1");
@@ -560,7 +565,7 @@ mod tests {
         let fd = faildoms! {planet: Mars, owner: Mike};
         #[rustfmt::skip]
         let instance2 = build_instance(Some(&InstanceName::from("i2")), None, &fd, &storage, DEFAULT_TIER).unwrap();
-        storage.instances.put(&instance2).unwrap();
+        add_instance(&storage, &instance2).unwrap();
         let existing_fds = storage.instances.failure_domain_names().unwrap();
         assert_eq!(instance2.failure_domain, fd);
         // doesn't fit into r1
@@ -587,7 +592,7 @@ mod tests {
         #[rustfmt::skip]
         let instance3_v1 = build_instance(Some(&InstanceName::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage, DEFAULT_TIER)
             .unwrap();
-        storage.instances.put(&instance3_v1).unwrap();
+        add_instance(&storage, &instance3_v1).unwrap();
         assert_eq!(
             instance3_v1.failure_domain,
             faildoms! {planet: B, owner: V, dimension: C137}
         );
@@ -618,31 +623,31 @@ mod tests {
             build_instance(None, None, &faildoms! {planet: Earth}, &storage, first_tier)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r1");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Mars}, &storage, second_tier)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r2");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Mars}, &storage, first_tier)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r1");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Pluto}, &storage, third_tier)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r3");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let instance =
             build_instance(None, None, &faildoms! {planet: Venus}, &storage, third_tier)
                 .unwrap();
         assert_eq!(instance.replicaset_name, "r3");
-        storage.instances.put(&instance).unwrap();
+        add_instance(&storage, &instance).unwrap();
 
         let e = build_instance(None, None, &faildoms! {planet: 5}, &storage, "noexistent_tier").unwrap_err();
         assert_eq!(e.to_string(), r#"tier "noexistent_tier" doesn't exist"#);
diff --git a/src/rpc/join.rs b/src/rpc/join.rs
index e042158122..d2a41b1607 100644
--- a/src/rpc/join.rs
+++ b/src/rpc/join.rs
@@ -1,6 +1,3 @@
-use std::collections::BTreeMap;
-use std::time::Duration;
-
 use crate::cas;
 use crate::failure_domain::FailureDomain;
 use crate::has_states;
@@ -16,7 +13,8 @@
 use crate::tier::Tier;
 use crate::traft::op::{Dml, Op};
 use crate::traft::{self, RaftId};
 use crate::traft::{error::Error, node, Address, PeerAddress, Result};
-
+use std::collections::HashSet;
+use std::time::Duration;
 use tarantool::fiber;
 
 const TIMEOUT: Duration = Duration::from_secs(10);
@@ -154,14 +152,30 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
 }
 
 pub fn build_instance(
-    instance_name: Option<&InstanceName>,
-    replicaset_name: Option<&ReplicasetName>,
+    requested_instance_name: Option<&InstanceName>,
+    requested_replicaset_name: Option<&ReplicasetName>,
     failure_domain: &FailureDomain,
     storage: &Clusterwide,
     tier: &str,
 ) -> Result<Instance> {
-    if let Some(id) = instance_name {
-        if let Ok(existing_instance) = storage.instances.get(id) {
+    // NOTE: currently we don't ever remove entries from `_pico_instance`, even
+    // when expelling instances. This means we can get a unique raft_id by
+    // selecting the max raft_id from _pico_instance and adding one. However,
+    // in the future we may want to start deleting old instance records, and
+    // at that point we may face the problem of this id not being unique (i.e.
+    // it may have previously belonged to a removed instance). There don't
+    // seem to be any problems with this per se, as raft will not allow a
+    // simultaneous raft_id conflict, but it's just a thing to look out for.
+    let raft_id = storage
+        .instances
+        .max_raft_id()
+        .expect("storage should not fail")
+        + 1;
+
+    // Resolve instance_name
+    let instance_name;
+    if let Some(name) = requested_instance_name {
+        if let Ok(existing_instance) = storage.instances.get(name) {
             let is_expelled = has_states!(existing_instance, Expelled -> *);
             if is_expelled {
                 // The instance was expelled explicitly, it's ok to replace it
@@ -173,10 +187,15 @@
                 // joined it has both states Offline, which means it may be
                 // replaced by another one of the same name before it sends a
                 // request for self activation.
-                return Err(Error::other(format!("`{id}` is already joined")));
+                return Err(Error::other(format!("`{name}` is already joined")));
             }
         }
+        instance_name = name.clone();
+    } else {
+        instance_name = choose_instance_name(raft_id, storage);
     }
+
+    // Check tier exists
     let Some(tier) = storage
         .tiers
         .by_name(tier)
@@ -185,41 +204,55 @@
         return Err(Error::other(format!(r#"tier "{tier}" doesn't exist"#)));
     };
 
+    // Check failure domain constraints
     let existing_fds = storage
         .instances
         .failure_domain_names()
         .expect("storage should not fail");
     failure_domain.check(&existing_fds)?;
 
-    // Anyway, `join` always produces a new raft_id.
-    let raft_id = storage
-        .instances
-        .max_raft_id()
-        .expect("storage should not fail")
-        + 1;
-    let instance_name = instance_name
-        .cloned()
-        .unwrap_or_else(|| choose_instance_name(raft_id, storage));
-    let replicaset_name = match replicaset_name {
-        Some(replicaset_name) =>
-        // FIXME: must make sure the replicaset is not Expelled or ToBeExpelled
-        {
-            replicaset_name.clone()
-        }
-        None => choose_replicaset_name(failure_domain, storage, &tier)?,
-    };
-
-    let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
+    //
+    // Resolve replicaset
+    //
+    let replicaset_name;
     let replicaset_uuid;
-    if let Some(replicaset) = storage.replicasets.get(&replicaset_name)? {
-        if replicaset.tier != tier.name {
-            return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
+    if let Some(requested_replicaset_name) = requested_replicaset_name {
+        let replicaset = storage.replicasets.get(requested_replicaset_name)?;
+        if let Some(replicaset) = replicaset {
+            if replicaset.tier != tier.name {
+                return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {requested_replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
+            }
+
+            // FIXME: must make sure the replicaset is not Expelled or ToBeExpelled
+            // FIXME: must make sure the replicaset's tier is correct
+
+            // Join instance to existing replicaset
+            replicaset_name = requested_replicaset_name.clone();
+            replicaset_uuid = replicaset.uuid;
+        } else {
+            // Create a new replicaset
+            replicaset_name = requested_replicaset_name.clone();
+            replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
         }
-        replicaset_uuid = replicaset.uuid;
     } else {
-        replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
+        let res = choose_replicaset(failure_domain, storage, &tier)?;
+        match res {
+            Ok(replicaset) => {
+                // Join instance to existing replicaset
+                replicaset_name = replicaset.name;
+                replicaset_uuid = replicaset.uuid;
+            }
+            Err(new_replicaset_name) => {
+                // Create a new replicaset
+                replicaset_name = new_replicaset_name;
+                replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
+            }
+        }
     }
 
+    // Generate a unique instance_uuid
+    let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
+
     Ok(Instance {
         raft_id,
         name: instance_name,
@@ -256,57 +289,90 @@ fn choose_instance_name(raft_id: RaftId, storage: &Clusterwide) -> InstanceName
     }
 }
 
-/// Choose a [`ReplicasetName`] for a new instance given its `failure_domain` and `tier`.
+/// Choose a replicaset for the new instance based on `failure_domain`, `tier`
+/// and the list of available replicasets and instances in them.
 /// FIXME: a couple of problems:
 /// - expelled instances are erroneously counted towards replication factor
 /// - must ignore replicasets with state ToBeExpelled & Expelled
-fn choose_replicaset_name(
+fn choose_replicaset(
     failure_domain: &FailureDomain,
     storage: &Clusterwide,
-    Tier {
-        replication_factor,
-        name: tier_name,
-        ..
-    }: &Tier,
-) -> Result<ReplicasetName> {
-    // `BTreeMap` is used so that we get a determenistic order of instance addition to replicasets.
+    tier: &Tier,
+) -> Result<Result<Replicaset, ReplicasetName>> {
+    let replication_factor = tier.replication_factor as _;
+
+    // The list of candidate replicasets for the new instance
+    let mut replicasets = vec![];
+    // The list of names of all replicasets in the cluster
+    let mut all_replicasets = HashSet::new();
+
+    for replicaset in storage.replicasets.iter()? {
+        all_replicasets.insert(replicaset.name.clone());
+
+        // TODO: skip expelled replicasets
+        if replicaset.tier != tier.name {
+            continue;
+        }
+
+        replicasets.push(SomeInfoAboutReplicaset {
+            replicaset,
+            instances: vec![],
+        });
+    }
+    // We sort the array so that we get a deterministic order of instance addition to replicasets.
    // E.g. if both "r1" and "r2" are suitable, "r1" will always be preferred.
+    // NOTE: can't use `sort_unstable_by_key` because of borrow checker, yay rust!
+    replicasets.sort_unstable_by(|lhs, rhs| lhs.replicaset.name.cmp(&rhs.replicaset.name));
+
     for instance in storage
         .instances
         .all_instances()
         .expect("storage should not fail")
         .into_iter()
     {
-        replicasets
-            .entry(instance.replicaset_name.clone())
-            .or_default()
-            .push(instance);
+        if instance.tier != tier.name {
+            continue;
+        }
+
+        // TODO: skip expelled instances
+        let index =
+            replicasets.binary_search_by_key(&&instance.replicaset_name, |i| &i.replicaset.name);
+        let index = index.expect("replicaset entries should be present for each instance");
+        replicasets[index].instances.push(instance);
     }
-    'next_replicaset: for (replicaset_name, instances) in replicasets.iter() {
-        if instances.len() < replication_factor
-            && instances
-                .first()
-                .expect("should not fail, each replicaset consists of at least one instance")
-                .tier
-                == *tier_name
-        {
-            for instance in instances {
-                if instance.failure_domain.intersects(failure_domain) {
-                    continue 'next_replicaset;
-                }
+
+    'next_replicaset: for info in &replicasets {
+        // TODO: skip replicasets with state ToBeExpelled & Expelled
+
+        if info.instances.len() >= replication_factor {
+            continue 'next_replicaset;
+        }
+
+        for instance in &info.instances {
+            if instance.failure_domain.intersects(failure_domain) {
+                continue 'next_replicaset;
             }
-            return Ok(replicaset_name.clone());
         }
+
+        return Ok(Ok(info.replicaset.clone()));
     }
 
     let mut i = 0u64;
     loop {
         i += 1;
         let replicaset_name = ReplicasetName(format!("r{i}"));
-        if !replicasets.contains_key(&replicaset_name) {
-            return Ok(replicaset_name);
+        if !all_replicasets.contains(&replicaset_name) {
+            // Not found, hence this name is ok
+            return Ok(Err(replicaset_name));
        }
    }
+
+    struct SomeInfoAboutReplicaset {
+        replicaset: Replicaset,
+        instances: Vec<Instance>,
+    }
+
+    fn key_replicaset_name(info: &SomeInfoAboutReplicaset) -> &ReplicasetName {
+        &info.replicaset.name
+    }
 }
-- 
GitLab
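Note on the `Result<Result<Replicaset, ReplicasetName>>` signature introduced for `choose_replicaset`: the outer `Result` propagates storage errors, while the inner `Result` distinguishes the two success outcomes. Inner `Ok` means "join this existing replicaset"; inner `Err` carries the name to use when creating a brand-new replicaset. Below is a minimal, self-contained sketch of that convention; the types are simplified stand-ins for illustration, not the real picodata definitions.

    // Sketch of the nested-Result convention (simplified stand-in types).
    #[derive(Clone, Debug)]
    struct Replicaset {
        name: String,
        uuid: String,
    }

    type StorageError = String;

    /// Outer `Err`: storage failure. Inner `Ok`: join this existing replicaset.
    /// Inner `Err`: no candidate fits; create a new replicaset with this name.
    fn choose_replicaset_sketch(
        candidates: &[(Replicaset, usize)], // (replicaset, current instance count)
        replication_factor: usize,
    ) -> Result<Result<Replicaset, String>, StorageError> {
        for (replicaset, instance_count) in candidates {
            if *instance_count < replication_factor {
                return Ok(Ok(replicaset.clone()));
            }
        }
        // Every candidate is full: pick the first free "r{i}" name,
        // mirroring the loop at the end of the patched function.
        let mut i = 0u64;
        loop {
            i += 1;
            let name = format!("r{i}");
            if !candidates.iter().any(|(r, _)| r.name == name) {
                return Ok(Err(name));
            }
        }
    }

    fn main() {
        let full = (
            Replicaset { name: "r1".into(), uuid: "uuid-1".into() },
            2, // already at the replication factor
        );
        match choose_replicaset_sketch(&[full], 2).unwrap() {
            Ok(existing) => println!("join existing replicaset {}", existing.name),
            Err(new_name) => println!("create new replicaset {new_name}"),
        }
    }

Compared with the old `Result<ReplicasetName>` return, this makes the "join existing" and "create new" outcomes explicit in the type, so the caller cannot forget to generate a fresh uuid for a new replicaset.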
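The other notable change is replacing the `BTreeMap` grouping with a pre-sorted `Vec` plus `binary_search_by_key`, which keeps the deterministic iteration order while letting each entry carry the full `Replicaset` record alongside its instances. A small sketch of that pattern, again with simplified stand-in types:

    // Sketch of grouping items under pre-collected, sorted keys via binary search.
    fn main() {
        struct Group {
            name: String,
            members: Vec<String>,
        }

        let mut groups: Vec<Group> = ["r2", "r1"]
            .iter()
            .map(|name| Group { name: name.to_string(), members: vec![] })
            .collect();

        // Sort once so that iteration order is deterministic ("r1" before "r2")
        // and membership lookups can use binary search.
        groups.sort_unstable_by(|lhs, rhs| lhs.name.cmp(&rhs.name));

        let instances = [("i1", "r1"), ("i2", "r2"), ("i3", "r1")];
        for (instance, group_name) in instances {
            let index = groups
                .binary_search_by_key(&group_name, |g| g.name.as_str())
                .expect("a group entry should exist for each member");
            groups[index].members.push(instance.to_string());
        }

        for group in &groups {
            println!("{}: {:?}", group.name, group.members);
        }
    }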