Commit 0fa0daf9 authored by Georgy Moshkin

refactor: cleanup build_instance code

parent 291323c2
1 merge request: !1399 Gmoshkin/fix join after expel
@@ -154,20 +154,26 @@ mod tests {
storage.tiers.put(&tier)
}
fn add_instance(storage: &Clusterwide, raft_id: RaftId, instance_name: &str, replicaset_name: &str, state: &State) -> tarantool::Result<Instance> {
let instance = Instance {
fn dummy_instance(raft_id: RaftId, name: &str, replicaset_name: &str, state: &State) -> Instance {
Instance {
raft_id,
name: instance_name.into(),
uuid: format!("{instance_name}-uuid"),
name: name.into(),
uuid: format!("{name}-uuid"),
replicaset_name: replicaset_name.into(),
replicaset_uuid: format!("{replicaset_name}-uuid"),
current_state: *state,
target_state: *state,
failure_domain: FailureDomain::default(),
tier: DEFAULT_TIER.into(),
};
storage.instances.put(&instance)?;
Ok(instance)
}
}
fn add_instance(storage: &Clusterwide, instance: &Instance) -> tarantool::Result<()> {
storage.instances.put(instance)?;
// Ignore error in case replicaset already exists. Good enough for tests
_ = storage.replicasets.put(&Replicaset::with_one_instance(instance));
Ok(())
}
fn replication_names(replicaset_name: &ReplicasetName, storage: &Clusterwide) -> HashSet<RaftId> {
@@ -191,33 +197,33 @@ mod tests {
assert_eq!(i1.target_state, State::new(Offline, 0));
assert_eq!(i1.failure_domain, FailureDomain::default());
assert_eq!(i1.tier, DEFAULT_TIER);
storage.instances.put(&i1).unwrap();
add_instance(&storage, &i1).unwrap();
let i2 = build_instance(None, None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i2.raft_id, 2);
assert_eq!(i2.name, "i2");
assert_eq!(i2.replicaset_name, "r2");
storage.instances.put(&i2).unwrap();
add_instance(&storage, &i2).unwrap();
let i3 = build_instance(None, Some(&ReplicasetName::from("R3")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i3.raft_id, 3);
assert_eq!(i3.name, "i3");
assert_eq!(i3.replicaset_name, "R3");
storage.instances.put(&i3).unwrap();
add_instance(&storage, &i3).unwrap();
let i4 = build_instance(Some(&InstanceName::from("I4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i4.raft_id, 4);
assert_eq!(i4.name, "I4");
assert_eq!(i4.replicaset_name, "r3");
storage.instances.put(&i4).unwrap();
add_instance(&storage, &i4).unwrap();
}
#[::tarantool::test]
fn test_override() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
add_instance(&storage, 2, "i2", "r2-original", &State::new(Expelled, 0)).unwrap();
add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(2, "i2", "r2-original", &State::new(Expelled, 0))).unwrap();
// join::Request with a given instance_name online.
// - It must be an impostor, return an error.
@@ -262,8 +268,8 @@ mod tests {
fn test_instance_name_collision() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
add_instance(&storage, 2, "i3", "r3", &State::new(Online, 1)).unwrap();
add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(2, "i3", "r3", &State::new(Online, 1))).unwrap();
// Attention: i3 has raft_id=2
let instance = build_instance(None, Some(&ReplicasetName::from("r2")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
@@ -288,15 +294,14 @@ mod tests {
fn test_replication_factor() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 9, "i9", "r9", &State::new(Online, 1)).unwrap();
add_instance(&storage, 10, "i10", "r9", &State::new(Online, 1)).unwrap();
add_instance(&storage, &dummy_instance(9, "i9", "r9", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(10, "i10", "r9", &State::new(Online, 1))).unwrap();
let i1 = build_instance(Some(&InstanceName::from("i1")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i1.raft_id, 11);
assert_eq!(i1.name, "i1");
assert_eq!(i1.replicaset_name, "r1");
storage.instances.put(&i1).unwrap();
storage.replicasets.put(&Replicaset::with_one_instance(&i1)).unwrap();
add_instance(&storage, &i1).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11]));
@@ -305,15 +310,14 @@ mod tests {
assert_eq!(i2.name, "i2");
assert_eq!(i2.replicaset_name, "r1");
assert_eq!(i2.replicaset_uuid, i1.replicaset_uuid);
storage.instances.put(&i2).unwrap();
add_instance(&storage, &i2).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11, 12]));
let i3 = build_instance(Some(&InstanceName::from("i3")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i3.raft_id, 13);
assert_eq!(i3.name, "i3");
assert_eq!(i3.replicaset_name, "r2");
storage.instances.put(&i3).unwrap();
storage.replicasets.put(&Replicaset::with_one_instance(&i3)).unwrap();
add_instance(&storage, &i3).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13]));
let i4 = build_instance(Some(&InstanceName::from("i4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
@@ -321,7 +325,7 @@ mod tests {
assert_eq!(i4.name, "i4");
assert_eq!(i4.replicaset_name, "r2");
assert_eq!(i4.replicaset_uuid, i3.replicaset_uuid);
storage.instances.put(&i4).unwrap();
add_instance(&storage, &i4).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13, 14]));
}
@@ -339,7 +343,8 @@ mod tests {
fn test_update_state() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 1, true).unwrap();
let instance = add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
let instance = dummy_instance(1, "i1", "r1", &State::new(Online, 1));
add_instance(&storage, &instance).unwrap();
let existing_fds = HashSet::new();
//
@@ -459,31 +464,31 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Earth, os: BSD}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars, os: BSD}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {os: Arch}, &storage, DEFAULT_TIER).unwrap_err();
assert_eq!(e.to_string(), "missing failure domain names: PLANET");
@@ -492,19 +497,19 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Venus, os: Arch}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Venus, os: Mac}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars, os: Mac}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {}, &storage, DEFAULT_TIER).unwrap_err();
assert_eq!(e.to_string(), "missing failure domain names: OS, PLANET");
@@ -519,7 +524,7 @@ mod tests {
// first instance
//
let instance1 = build_instance(Some(&InstanceName::from("i1")), None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER).unwrap();
storage.instances.put(&instance1).unwrap();
add_instance(&storage, &instance1).unwrap();
let existing_fds = storage.instances.failure_domain_names().unwrap();
assert_eq!(instance1.failure_domain, faildoms! {planet: Earth});
assert_eq!(instance1.replicaset_name, "r1");
@@ -560,7 +565,7 @@ mod tests {
let fd = faildoms! {planet: Mars, owner: Mike};
#[rustfmt::skip]
let instance2 = build_instance(Some(&InstanceName::from("i2")), None, &fd, &storage, DEFAULT_TIER).unwrap();
storage.instances.put(&instance2).unwrap();
add_instance(&storage, &instance2).unwrap();
let existing_fds = storage.instances.failure_domain_names().unwrap();
assert_eq!(instance2.failure_domain, fd);
// doesn't fit into r1
@@ -587,7 +592,7 @@ mod tests {
#[rustfmt::skip]
let instance3_v1 = build_instance(Some(&InstanceName::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage, DEFAULT_TIER)
.unwrap();
storage.instances.put(&instance3_v1).unwrap();
add_instance(&storage, &instance3_v1).unwrap();
assert_eq!(
instance3_v1.failure_domain,
faildoms! {planet: B, owner: V, dimension: C137}
@@ -618,31 +623,31 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Earth}, &storage, first_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, second_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, first_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Pluto}, &storage, third_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Venus}, &storage, third_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {planet: 5}, &storage, "nonexistent_tier").unwrap_err();
assert_eq!(e.to_string(), r#"tier "nonexistent_tier" doesn't exist"#);
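For context, a minimal sketch of how the refactored test helpers compose, using only definitions from the diff above; the exact test body is illustrative, not part of this commit:

// Hypothetical test body showing the new helper split:
// `dummy_instance` only constructs the struct, while `add_instance`
// persists it and also registers a single-instance replicaset, which
// previously required a separate `storage.replicasets.put(...)` call.
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
// Instances produced by `build_instance` go through the same helper,
// so the replicasets storage stays consistent without extra bookkeeping.
let i2 = build_instance(None, None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
add_instance(&storage, &i2).unwrap();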
......
use std::collections::BTreeMap;
use std::time::Duration;
use crate::cas;
use crate::failure_domain::FailureDomain;
use crate::has_states;
@@ -16,7 +13,8 @@ use crate::tier::Tier;
use crate::traft::op::{Dml, Op};
use crate::traft::{self, RaftId};
use crate::traft::{error::Error, node, Address, PeerAddress, Result};
use std::collections::HashSet;
use std::time::Duration;
use tarantool::fiber;
const TIMEOUT: Duration = Duration::from_secs(10);
@@ -154,14 +152,30 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
}
pub fn build_instance(
instance_name: Option<&InstanceName>,
replicaset_name: Option<&ReplicasetName>,
requested_instance_name: Option<&InstanceName>,
requested_replicaset_name: Option<&ReplicasetName>,
failure_domain: &FailureDomain,
storage: &Clusterwide,
tier: &str,
) -> Result<Instance> {
if let Some(id) = instance_name {
if let Ok(existing_instance) = storage.instances.get(id) {
// NOTE: currently we never remove entries from `_pico_instance`, even
// when expelling instances. This means we can get a unique raft_id by
// selecting the max raft_id from _pico_instance and adding one. However,
// in the future we may want to start deleting old instance records, and
// at that point this id may no longer be unique (i.e. it may have already
// belonged to some instance). There don't seem to be any problems with
// this per se, as raft will not allow a simultaneous raft_id conflict,
// but it's a thing to look out for.
let raft_id = storage
.instances
.max_raft_id()
.expect("storage should not fail")
+ 1;
// Resolve instance_name
let instance_name;
if let Some(name) = requested_instance_name {
if let Ok(existing_instance) = storage.instances.get(name) {
let is_expelled = has_states!(existing_instance, Expelled -> *);
if is_expelled {
// The instance was expelled explicitly, it's ok to replace it
@@ -173,10 +187,15 @@ pub fn build_instance(
// joined it has both states Offline, which means it may be
// replaced by another one of the name before it sends a request
// for self activation.
return Err(Error::other(format!("`{id}` is already joined")));
return Err(Error::other(format!("`{name}` is already joined")));
}
}
instance_name = name.clone();
} else {
instance_name = choose_instance_name(raft_id, storage);
}
// Check tier exists
let Some(tier) = storage
.tiers
.by_name(tier)
@@ -185,41 +204,55 @@ pub fn build_instance(
return Err(Error::other(format!(r#"tier "{tier}" doesn't exist"#)));
};
// Check failure domain constraints
let existing_fds = storage
.instances
.failure_domain_names()
.expect("storage should not fail");
failure_domain.check(&existing_fds)?;
// Anyway, `join` always produces a new raft_id.
let raft_id = storage
.instances
.max_raft_id()
.expect("storage should not fail")
+ 1;
let instance_name = instance_name
.cloned()
.unwrap_or_else(|| choose_instance_name(raft_id, storage));
let replicaset_name = match replicaset_name {
Some(replicaset_name) =>
// FIXME: must make sure the replicaset is not Expelled or ToBeExpelled
{
replicaset_name.clone()
}
None => choose_replicaset_name(failure_domain, storage, &tier)?,
};
let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
//
// Resolve replicaset
//
let replicaset_name;
let replicaset_uuid;
if let Some(replicaset) = storage.replicasets.get(&replicaset_name)? {
if replicaset.tier != tier.name {
return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
if let Some(requested_replicaset_name) = requested_replicaset_name {
let replicaset = storage.replicasets.get(requested_replicaset_name)?;
if let Some(replicaset) = replicaset {
if replicaset.tier != tier.name {
return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {requested_replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
}
// FIXME: must make sure the replicaset is not Expelled or ToBeExpelled
// FIXME: must make sure the replicaset's tier is correct
// Join instance to existing replicaset
replicaset_name = requested_replicaset_name.clone();
replicaset_uuid = replicaset.uuid;
} else {
// Create a new replicaset
replicaset_name = requested_replicaset_name.clone();
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
}
replicaset_uuid = replicaset.uuid;
} else {
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
let res = choose_replicaset(failure_domain, storage, &tier)?;
match res {
Ok(replicaset) => {
// Join instance to existing replicaset
replicaset_name = replicaset.name;
replicaset_uuid = replicaset.uuid;
}
Err(new_replicaset_name) => {
// Create a new replicaset
replicaset_name = new_replicaset_name;
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
}
}
}
// Generate a unique instance_uuid
let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
Ok(Instance {
raft_id,
name: instance_name,
@@ -256,57 +289,90 @@ fn choose_instance_name(raft_id: RaftId, storage: &Clusterwide) -> InstanceName
}
}
/// Choose a [`ReplicasetName`] for a new instance given its `failure_domain` and `tier`.
/// Choose a replicaset for the new instance based on `failure_domain`, `tier`
/// and the list of available replicasets and instances in them.
/// FIXME: a couple of problems:
/// - expelled instances are erroneously counted towards replication factor
/// - must ignore replicasets with state ToBeExpelled & Expelled
fn choose_replicaset_name(
fn choose_replicaset(
failure_domain: &FailureDomain,
storage: &Clusterwide,
Tier {
replication_factor,
name: tier_name,
..
}: &Tier,
) -> Result<ReplicasetName> {
// `BTreeMap` is used so that we get a deterministic order of instance addition to replicasets.
tier: &Tier,
) -> Result<Result<Replicaset, ReplicasetName>> {
let replication_factor = tier.replication_factor as _;
// The list of candidate replicasets for the new instance
let mut replicasets = vec![];
// The names of all replicasets in the cluster
let mut all_replicasets = HashSet::new();
for replicaset in storage.replicasets.iter()? {
all_replicasets.insert(replicaset.name.clone());
// TODO: skip expelled replicasets
if replicaset.tier != tier.name {
continue;
}
replicasets.push(SomeInfoAboutReplicaset {
replicaset,
instances: vec![],
});
}
// We sort the array so that we get a deterministic order of instance addition to replicasets.
// E.g. if both "r1" and "r2" are suitable, "r1" will always be preferred.
let mut replicasets: BTreeMap<_, Vec<_>> = BTreeMap::new();
let replication_factor = (*replication_factor).into();
// NOTE: can't use `sort_unstable_by_key` because of borrow checker, yay rust!
replicasets.sort_unstable_by(|lhs, rhs| lhs.replicaset.name.cmp(&rhs.replicaset.name));
for instance in storage
.instances
.all_instances()
.expect("storage should not fail")
.into_iter()
{
replicasets
.entry(instance.replicaset_name.clone())
.or_default()
.push(instance);
if instance.tier != tier.name {
continue;
}
// TODO: skip expelled instances
let index =
replicasets.binary_search_by_key(&&instance.replicaset_name, |i| &i.replicaset.name);
let index = index.expect("replicaset entries should be present for each instance");
replicasets[index].instances.push(instance);
}
'next_replicaset: for (replicaset_name, instances) in replicasets.iter() {
if instances.len() < replication_factor
&& instances
.first()
.expect("should not fail, each replicaset consists of at least one instance")
.tier
== *tier_name
{
for instance in instances {
if instance.failure_domain.intersects(failure_domain) {
continue 'next_replicaset;
}
'next_replicaset: for info in &replicasets {
// TODO: skip replicasets with state ToBeExpelled & Expelled
if info.instances.len() >= replication_factor {
continue 'next_replicaset;
}
for instance in &info.instances {
if instance.failure_domain.intersects(failure_domain) {
continue 'next_replicaset;
}
return Ok(replicaset_name.clone());
}
return Ok(Ok(info.replicaset.clone()));
}
let mut i = 0u64;
loop {
i += 1;
let replicaset_name = ReplicasetName(format!("r{i}"));
if !replicasets.contains_key(&replicaset_name) {
return Ok(replicaset_name);
if !all_replicasets.contains(&replicaset_name) {
// Name is not taken by any existing replicaset, hence it's ok to use
return Ok(Err(replicaset_name));
}
}
struct SomeInfoAboutReplicaset {
replicaset: Replicaset,
instances: Vec<Instance>,
}
fn key_replicaset_name(info: &SomeInfoAboutReplicaset) -> &ReplicasetName {
&info.replicaset.name
}
}
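The refactored `choose_replicaset` nests two results: the outer `Result` propagates storage errors via `?`, while the inner `Ok`/`Err` distinguishes "join this existing replicaset" from "create a new replicaset with this name". A minimal sketch of a caller, mirroring the match in `build_instance` above (variable names assumed from the diff):

// Consuming Result<Result<Replicaset, ReplicasetName>>:
let (replicaset_name, replicaset_uuid) = match choose_replicaset(failure_domain, storage, &tier)? {
    // Some replicaset in this tier has spare capacity (fewer instances
    // than the tier's replication_factor) and no failure-domain
    // intersection with the new instance: join it.
    Ok(replicaset) => (replicaset.name, replicaset.uuid),
    // No candidate fits: create a fresh replicaset under the first free
    // "r{i}" name, with a newly generated uuid.
    Err(new_name) => (new_name, uuid::Uuid::new_v4().to_hyphenated().to_string()),
};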