Skip to content
Snippets Groups Projects
Commit f9fd4195 authored by Egor Ivkov, committed by Yaroslav Dynnikov
Browse files

refactor: split logic in instance.rs between join and update_instance modules

parent 74644fdd
No related branches found
No related tags found
1 merge request: !633 refactor: split Topology struct logic
Pipeline #22357 passed
......@@ -102,6 +102,23 @@ impl FailureDomain {
.filter(|&&key| self.data.get(key) != other.data.get(key))
.count() as u64
}
/// Check that this failure domain contains all `required_domains`.
///
/// # Errors
///
/// Returns a message of the form `missing failure domain names: A, B`
/// listing every required name for which `contains_name` is false.
/// The names are sorted so the message does not depend on the
/// iteration order of the `HashSet`.
pub fn check(&self, required_domains: &HashSet<Uppercase>) -> Result<(), String> {
    let mut missing: Vec<String> = required_domains
        .iter()
        .filter(|&name| !self.contains_name(name))
        .map(|name| name.to_string())
        .collect();
    if missing.is_empty() {
        return Ok(());
    }
    missing.sort();
    Err(format!(
        "missing failure domain names: {}",
        missing.join(", ")
    ))
}
}
impl std::fmt::Display for FailureDomain {
......
This diff is collapsed.
......@@ -19,7 +19,7 @@ use traft::RaftSpaceAccess;
use protobuf::Message as _;
use crate::args::Address;
use crate::instance::grade::TargetGradeVariant;
use crate::instance::grade::{CurrentGrade, TargetGrade, TargetGradeVariant};
use crate::instance::Instance;
use crate::traft::op;
use crate::traft::LogicalClock;
......@@ -355,9 +355,12 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) {
fn start_boot(args: &args::Run) {
tlog!(Info, ">>>>> start_boot()");
let instance = Instance::initial(
let instance = Instance::new(
None,
args.instance_id.clone(),
args.replicaset_id.clone(),
CurrentGrade::offline(0),
TargetGrade::offline(0),
args.failure_domain(),
);
let raft_id = instance.raft_id;
......
use std::collections::HashSet;
use std::time::Duration;
use crate::cas;
use crate::failure_domain::FailureDomain;
use crate::instance::replication_ids;
use crate::has_grades;
use crate::instance::grade::{CurrentGrade, TargetGrade};
use crate::instance::{Instance, InstanceId};
use crate::replicaset::ReplicasetId;
use crate::storage::ToEntryIter as _;
use crate::storage::{Clusterwide, ToEntryIter as _};
use crate::storage::{ClusterwideSpaceId, PropertyName};
use crate::traft;
use crate::traft::op::{Dml, Op};
use crate::traft::{self, RaftId};
use crate::traft::{error::Error, node, Address, PeerAddress, Result};
use ::tarantool::fiber;
......@@ -74,7 +76,7 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
let deadline = fiber::clock().saturating_add(timeout);
loop {
let instance = Instance::new(
let instance = build_instance(
req.instance_id.as_ref(),
req.replicaset_id.as_ref(),
&req.failure_domain,
......@@ -152,3 +154,105 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
});
}
}
/// Builds the [`Instance`] record for a node that is joining the cluster.
///
/// * `instance_id` / `replicaset_id` — explicit ids requested by the caller;
///   when `None`, an id is picked by `choose_instance_id` /
///   `choose_replicaset_id` respectively.
/// * `failure_domain` — failure domain of the joining node; it is validated
///   against the failure domain names held in the storage cache.
/// * `storage` — clusterwide storage used for all lookups.
///
/// The new instance always receives `max_raft_id + 1` as its raft id and
/// starts with both grades set to `offline(0)`.
///
/// # Errors
///
/// * `"<id> is already joined"` if an instance with the requested id is
///   found in storage and satisfies `has_grades!(instance, Online -> *)`.
///   A failed storage lookup is not treated as an error here.
/// * The message produced by `FailureDomain::check` if the failure domain
///   lacks a required name.
pub fn build_instance(
    instance_id: Option<&InstanceId>,
    replicaset_id: Option<&ReplicasetId>,
    failure_domain: &FailureDomain,
    storage: &Clusterwide,
) -> std::result::Result<Instance, String> {
    if let Some(id) = instance_id {
        let lookup = storage.instances.get(id);
        let already_joined = matches!(
            lookup,
            Ok(instance) if has_grades!(instance, Online -> *)
        );
        if already_joined {
            return Err(format!("{} is already joined", id));
        }
    }

    failure_domain.check(&storage.cache().failure_domain_names)?;

    // Anyway, `join` always produces a new raft_id.
    let raft_id = storage.cache().max_raft_id + 1;

    let instance_id = match instance_id {
        Some(id) => id.clone(),
        None => choose_instance_id(raft_id, storage),
    };
    let replicaset_id = match replicaset_id {
        Some(id) => id.clone(),
        None => choose_replicaset_id(failure_domain, storage),
    };

    Ok(Instance::new(
        Some(raft_id),
        Some(instance_id),
        Some(replicaset_id),
        CurrentGrade::offline(0),
        TargetGrade::offline(0),
        failure_domain.clone(),
    ))
}
/// Choose [`InstanceId`] based on `raft_id`.
///
/// The first candidate is `i{raft_id}`. If it is already present in
/// `storage.instances`, the candidates `i{raft_id}-2`, `i{raft_id}-3`, ...
/// are probed in order and the first unused one is returned.
///
/// # Panics
///
/// Panics if the storage lookup fails.
fn choose_instance_id(raft_id: RaftId, storage: &Clusterwide) -> InstanceId {
    let mut candidate: InstanceId = format!("i{raft_id}").into();
    // The bare `i{raft_id}` form counts as attempt number one, so the first
    // suffixed candidate carries `-2`.
    let mut attempt = 1u64;
    loop {
        let taken = storage
            .instances
            .contains(&candidate)
            .expect("storage should not fail");
        if !taken {
            return candidate;
        }
        attempt += 1;
        candidate = format!("i{raft_id}-{attempt}").into();
    }
}
/// Choose a [`ReplicasetId`] for a new instance given its `failure_domain`.
///
/// Cached replicasets are scanned in iteration order. The first one is
/// returned that both has fewer instances than the configured replication
/// factor and contains no instance whose failure domain `intersects` the
/// given `failure_domain`.
///
/// If no existing replicaset qualifies, a new id of the form `r{N}` is
/// generated, using the smallest `N >= 1` that is not yet in the cache.
///
/// # Panics
///
/// Panics if reading the replication factor or an instance from storage fails.
fn choose_replicaset_id(failure_domain: &FailureDomain, storage: &Clusterwide) -> ReplicasetId {
    for (replicaset_id, instances) in storage.cache().replicasets.iter() {
        let replication_factor = storage
            .properties
            .replication_factor()
            .expect("storage should not fail");
        if instances.len() >= replication_factor {
            // Replicaset is already full.
            continue;
        }
        // Stops at the first member that shares a failure domain with the newcomer.
        let domain_clash = instances.iter().any(|instance_id| {
            let member = storage.instances.get(instance_id).unwrap();
            member.failure_domain.intersects(failure_domain)
        });
        if !domain_clash {
            return replicaset_id.clone();
        }
    }

    // No suitable replicaset exists: allocate the first free `r{N}` name.
    let mut index = 1u64;
    loop {
        let candidate = ReplicasetId(format!("r{index}"));
        if storage.cache().replicasets.get(&candidate).is_none() {
            return candidate;
        }
        index += 1;
    }
}
/// Get ids of instances that are part of a replicaset with `replicaset_id`.
///
/// Returns the raft ids of all instances that the storage cache lists for
/// the replicaset, or an empty set if the cache has no such replicaset.
///
/// # Panics
///
/// Panics if any listed instance cannot be read from storage.
pub fn replication_ids(replicaset_id: &ReplicasetId, storage: &Clusterwide) -> HashSet<RaftId> {
    match storage.cache().replicasets.get(replicaset_id) {
        None => HashSet::new(),
        Some(members) => members
            .iter()
            .map(|id| {
                storage
                    .instances
                    .get(id)
                    .expect("storage should not fail")
                    .raft_id
            })
            .collect(),
    }
}
......@@ -2,9 +2,9 @@ use std::time::Duration;
use crate::cas;
use crate::failure_domain::FailureDomain;
use crate::instance::grade::{CurrentGrade, TargetGradeVariant};
use crate::instance::InstanceId;
use crate::storage::{ClusterwideSpaceId, PropertyName};
use crate::instance::grade::{CurrentGrade, CurrentGradeVariant, Grade, TargetGradeVariant};
use crate::instance::{Instance, InstanceId};
use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
use crate::traft::op::{Dml, Op};
use crate::traft::Result;
use crate::traft::{error::Error, node};
......@@ -97,11 +97,8 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
let deadline = fiber::clock().saturating_add(timeout);
loop {
let instance = storage
.instances
.get(&req.instance_id)?
.update(&req, storage)
.map_err(raft::Error::ConfChangeError)?;
let mut instance = storage.instances.get(&req.instance_id)?;
update_instance(&mut instance, &req, storage).map_err(raft::Error::ConfChangeError)?;
let dml = Dml::replace(ClusterwideSpaceId::Instance, &instance)
.expect("encoding should not fail");
......@@ -142,3 +139,49 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
return Ok(());
}
}
/// Updates existing [`Instance`].
///
/// Applies the optional fields of `req` to `instance` in place:
///
/// * `failure_domain` — validated against the failure domain names in the
///   storage cache, then stored.
/// * `current_grade` — stored as is.
/// * `target_grade` — stored with an incarnation of the previous target
///   incarnation plus one when the requested variant is `Online`, and the
///   instance's current-grade incarnation otherwise.
///
/// # Errors
///
/// * `cannot update expelled instance "<id>"` if the instance's current grade
///   is `Expelled`. The only request accepted for such an instance has no
///   target grade, no failure domain, and a current grade of `Expelled`.
/// * The message produced by `FailureDomain::check` if the new failure
///   domain lacks a required name. `instance` is left unmodified in that case.
pub fn update_instance(
    instance: &mut Instance,
    req: &Request,
    storage: &Clusterwide,
) -> std::result::Result<(), String> {
    if instance.current_grade == CurrentGradeVariant::Expelled {
        let reasserts_expelled = matches!(
            req,
            Request {
                target_grade: None,
                current_grade: Some(current_grade),
                failure_domain: None,
                ..
            } if *current_grade == CurrentGradeVariant::Expelled
        );
        if !reasserts_expelled {
            return Err(format!(
                "cannot update expelled instance \"{}\"",
                instance.instance_id
            ));
        }
    }

    if let Some(new_domain) = &req.failure_domain {
        new_domain.check(&storage.cache().failure_domain_names)?;
        instance.failure_domain = new_domain.clone();
    }

    if let Some(new_current) = req.current_grade {
        instance.current_grade = new_current;
    }

    // Must run after the current grade update above: the non-`Online` branch
    // reads the (possibly just replaced) current-grade incarnation.
    if let Some(variant) = req.target_grade {
        let incarnation = if matches!(variant, TargetGradeVariant::Online) {
            instance.target_grade.incarnation + 1
        } else {
            instance.current_grade.incarnation
        };
        instance.target_grade = Grade {
            variant,
            incarnation,
        };
    }

    Ok(())
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment