
Compare revisions

Changes are shown as if the source revision was being merged into the target revision.

Commits on Source (13)
......@@ -3,7 +3,7 @@
url = ../tarantool-module.git
[submodule "tarantool-sys"]
path = tarantool-sys
url = ../../tarantool.git
url = ../tarantool.git
ignore = dirty
[submodule "vshard"]
path = vshard
......
......@@ -8,6 +8,12 @@ with the `YY.MINOR.MICRO` scheme.
<img src="https://img.shields.io/badge/calver-YY.MINOR.MICRO-22bfda.svg">
## 24.7.0 - Unreleased
### CLI
- `picodata expel` takes instance uuid instead of instance name.
## [24.6.1] - 2024-10-28
### Configuration
......@@ -120,6 +126,7 @@ with the `YY.MINOR.MICRO` scheme.
support WAIT APPLIED (GLOBALLY | LOCALLY) options, allowing users to wait for operations to be
committed across all replicasets or only on the current one
- EXPLAIN estimates query buckets
- SQL supports `COALESCE` function
### Fixes
......
......@@ -3457,7 +3457,6 @@ dependencies = [
"hash32",
"itertools",
"lazy_static",
"opentelemetry",
"pest",
"pest_derive",
"pretty_assertions 1.4.1",
......@@ -3465,7 +3464,6 @@ dependencies = [
"rmp",
"rmp-serde",
"rmpv",
"sbroad-proc",
"serde",
"serde_bytes",
"serde_yaml",
......@@ -3475,14 +3473,6 @@ dependencies = [
"uuid 1.11.0",
]
[[package]]
name = "sbroad-proc"
version = "0.1.0"
dependencies = [
"quote 1.0.37",
"syn 1.0.109",
]
[[package]]
name = "schannel"
version = "0.1.26"
......
......@@ -89,9 +89,9 @@ make build-release-pkg
%doc README.md
%{!?_licensedir:%global license %doc}
%if "%{?_build_vendor}" == "alt"
%doc docs/licenses/eula_en.txt docs/licenses/eula_ru.txt AUTHORS
%doc doc/licenses/eula_en.txt doc/licenses/eula_ru.txt AUTHORS
%else
%license docs/licenses/eula_en.txt docs/licenses/eula_ru.txt AUTHORS
%license doc/licenses/eula_en.txt doc/licenses/eula_ru.txt AUTHORS
%endif
%changelog
......
Subproject commit 73e6815a3399c085fcaff3bca7df86463b284bde
Subproject commit c144e8f4a4fb1602888a6c8907945a07f8943093
use crate::address::{HttpAddress, IprotoAddress};
use crate::config::{ByteSize, DEFAULT_USERNAME};
use crate::info::PICODATA_VERSION;
use crate::instance::InstanceName;
use crate::util::Uppercase;
use clap::Parser;
use std::borrow::Cow;
......@@ -360,9 +359,9 @@ pub struct Expel {
/// Name of the cluster from which the instance should be expelled.
pub cluster_name: String,
#[clap(value_name = "INSTANCE_NAME")]
/// Name of the instance to expel.
pub instance_name: InstanceName,
#[clap(value_name = "INSTANCE_UUID")]
/// UUID of the instance to expel.
pub instance_uuid: String,
#[clap(
long = "peer",
......
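Below is a minimal, self-contained sketch of how the new positional `INSTANCE_UUID` argument parses with clap's derive API; the struct is reduced to just the changed field, and the uuid value is a made-up placeholder:

use clap::Parser;

/// Reduced sketch of the `Expel` arguments; the real struct also carries
/// `cluster_name`, `--peer`, password and auth options not shown here.
#[derive(Parser, Debug)]
struct Expel {
    /// UUID of the instance to expel.
    #[clap(value_name = "INSTANCE_UUID")]
    instance_uuid: String,
}

fn main() {
    // Placeholder uuid, for illustration only.
    let args = Expel::parse_from(["expel", "68d4a766-4144-3248-aeb4-e212356716e4"]);
    assert_eq!(args.instance_uuid, "68d4a766-4144-3248-aeb4-e212356716e4");
}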
......@@ -18,7 +18,7 @@ pub async fn tt_expel(args: args::Expel) -> Result<(), Error> {
let req = ExpelRequest {
cluster_name: args.cluster_name,
instance_name: args.instance_name.clone(),
instance_uuid: args.instance_uuid.clone(),
};
fiber::block_on(client.call(crate::proc_name!(proc_expel_redirect), &req))
.map_err(|e| Error::other(format!("Failed to expel instance: {e}")))?;
......@@ -26,7 +26,7 @@ pub async fn tt_expel(args: args::Expel) -> Result<(), Error> {
tlog!(
Info,
"Instance {} successfully expelled",
args.instance_name
args.instance_uuid
);
Ok(())
......
......@@ -19,6 +19,8 @@ use crate::storage::{ClusterwideTable, PropertyName};
use crate::sync::GetVclockRpc;
use crate::tier::Tier;
use crate::tlog;
use crate::traft::error::Error;
use crate::traft::error::IdOfInstance;
use crate::traft::op::Dml;
use crate::traft::op::Op;
#[allow(unused_imports)]
......@@ -62,7 +64,7 @@ pub(super) fn action_plan<'i>(
.iter()
.find(|instance| instance.raft_id == my_raft_id)
else {
return Err(crate::traft::error::Error::NoSuchInstance(Ok(my_raft_id)));
return Err(Error::NoSuchInstance(IdOfInstance::RaftId(my_raft_id)));
};
if has_states!(this_instance, * -> Offline) || has_states!(this_instance, * -> Expelled) {
let mut new_leader = None;
......@@ -165,26 +167,13 @@ pub(super) fn action_plan<'i>(
////////////////////////////////////////////////////////////////////////////
// configure replication
let replicaset_to_configure = replicasets
.values()
.find(|replicaset| replicaset.current_config_version != replicaset.target_config_version);
if let Some(replicaset) = replicaset_to_configure {
if let Some((replicaset, targets, replicaset_peers)) =
get_replicaset_to_configure(instances, peer_addresses, replicasets)
{
// Targets must not be empty, otherwise we would bump the version
// without actually calling the RPC.
debug_assert!(!targets.is_empty());
let replicaset_name = &replicaset.name;
let mut targets = Vec::new();
let mut replicaset_peers = Vec::new();
for instance in instances {
if instance.replicaset_name != replicaset_name {
continue;
}
if let Some(address) = peer_addresses.get(&instance.raft_id) {
replicaset_peers.push(address.clone());
} else {
warn_or_panic!("replica `{}` address unknown, will be excluded from box.cfg.replication of replicaset `{replicaset_name}`", instance.name);
}
if instance.may_respond() {
targets.push(&instance.name);
}
}
let mut master_name = None;
if replicaset.current_master_name == replicaset.target_master_name {
......@@ -207,20 +196,14 @@ pub(super) fn action_plan<'i>(
let cas = cas::Request::new(dml, predicate, ADMIN_ID)?;
let replication_config_version_actualize = cas;
if !targets.is_empty() {
return Ok(ConfigureReplication {
replicaset_name,
targets,
master_name,
replicaset_peers,
replication_config_version_actualize,
}
.into());
} else {
#[rustfmt::skip]
tlog!(Warning, "all replicas in {replicaset_name} are offline, skipping replication configuration");
// Fall through, look for other things to do
return Ok(ConfigureReplication {
replicaset_name,
targets,
master_name,
replicaset_peers,
replication_config_version_actualize,
}
.into());
}
////////////////////////////////////////////////////////////////////////////
......@@ -1159,6 +1142,55 @@ pub mod stage {
}
}
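/// Finds the first replicaset whose replication configuration is out of date
/// (current_config_version != target_config_version) and which has at least
/// one instance that may respond to the configuration RPC.
///
/// Returns the replicaset together with the RPC target instance names and the
/// addresses to be put into box.cfg.replication, or `None` if there is
/// nothing to configure.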
fn get_replicaset_to_configure<'i>(
instances: &'i [Instance],
peer_addresses: &'i HashMap<RaftId, String>,
replicasets: &HashMap<&ReplicasetName, &'i Replicaset>,
) -> Option<(&'i Replicaset, Vec<&'i InstanceName>, Vec<String>)> {
for replicaset in replicasets.values() {
if replicaset.current_config_version == replicaset.target_config_version {
// Already configured
continue;
}
let replicaset_name = &replicaset.name;
let mut rpc_targets = Vec::new();
let mut replication_peers = Vec::new();
for instance in instances {
if has_states!(instance, Expelled -> *) {
// Expelled instances are ignored for everything,
// we only store them for history
continue;
}
if instance.replicaset_name != replicaset_name {
continue;
}
let instance_name = &instance.name;
if let Some(address) = peer_addresses.get(&instance.raft_id) {
replication_peers.push(address.clone());
} else {
warn_or_panic!("replica `{instance_name}` address unknown, will be excluded from box.cfg.replication of replicaset `{replicaset_name}`");
}
if instance.may_respond() {
rpc_targets.push(instance_name);
}
}
if !rpc_targets.is_empty() {
return Some((replicaset, rpc_targets, replication_peers));
}
#[rustfmt::skip]
tlog!(Warning, "all replicas in {replicaset_name} are offline, skipping replication configuration");
}
// No replication configuration needed
None
}
/// Checks if there's a replicaset whose master is offline and tries to find a
/// replica to promote.
///
......@@ -1234,6 +1266,10 @@ fn get_replicaset_state_change<'i>(
let mut replicaset_sizes = HashMap::new();
for instance in maybe_responding(instances) {
let instance_name = &instance.name;
if has_states!(instance, Expelled -> *) {
continue;
}
let replicaset_name = &instance.replicaset_name;
let tier = &instance.tier;
......
......@@ -94,7 +94,10 @@ impl Instance {
// If instance is going offline ungracefully it will likely not respond
has_states!(self, * -> not Offline) &&
// If instance has already been expelled it will definitely not respond
has_states!(self, not Expelled -> *)
has_states!(self, not Expelled -> *) &&
// If instance is currently offline and is being expelled, the above
// rules don't work, but the instance is definitely not going to respond
!has_states!(self, Offline -> Expelled)
}
#[inline]
......@@ -154,20 +157,26 @@ mod tests {
storage.tiers.put(&tier)
}
fn add_instance(storage: &Clusterwide, raft_id: RaftId, instance_name: &str, replicaset_name: &str, state: &State) -> tarantool::Result<Instance> {
let instance = Instance {
fn dummy_instance(raft_id: RaftId, name: &str, replicaset_name: &str, state: &State) -> Instance {
Instance {
raft_id,
name: instance_name.into(),
uuid: format!("{instance_name}-uuid"),
name: name.into(),
uuid: format!("{name}-uuid"),
replicaset_name: replicaset_name.into(),
replicaset_uuid: format!("{replicaset_name}-uuid"),
current_state: *state,
target_state: *state,
failure_domain: FailureDomain::default(),
tier: DEFAULT_TIER.into(),
};
storage.instances.put(&instance)?;
Ok(instance)
}
}
fn add_instance(storage: &Clusterwide, instance: &Instance) -> tarantool::Result<()> {
storage.instances.put(instance)?;
// Ignore error in case replicaset already exists. Good enough for tests
_ = storage.replicasets.put(&Replicaset::with_one_instance(instance));
Ok(())
}
fn replication_names(replicaset_name: &ReplicasetName, storage: &Clusterwide) -> HashSet<RaftId> {
......@@ -191,33 +200,33 @@ mod tests {
assert_eq!(i1.target_state, State::new(Offline, 0));
assert_eq!(i1.failure_domain, FailureDomain::default());
assert_eq!(i1.tier, DEFAULT_TIER);
storage.instances.put(&i1).unwrap();
add_instance(&storage, &i1).unwrap();
let i2 = build_instance(None, None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i2.raft_id, 2);
assert_eq!(i2.name, "i2");
assert_eq!(i2.replicaset_name, "r2");
storage.instances.put(&i2).unwrap();
add_instance(&storage, &i2).unwrap();
let i3 = build_instance(None, Some(&ReplicasetName::from("R3")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i3.raft_id, 3);
assert_eq!(i3.name, "i3");
assert_eq!(i3.replicaset_name, "R3");
storage.instances.put(&i3).unwrap();
add_instance(&storage, &i3).unwrap();
let i4 = build_instance(Some(&InstanceName::from("I4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i4.raft_id, 4);
assert_eq!(i4.name, "I4");
assert_eq!(i4.replicaset_name, "r3");
storage.instances.put(&i4).unwrap();
add_instance(&storage, &i4).unwrap();
}
#[::tarantool::test]
fn test_override() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
add_instance(&storage, 2, "i2", "r2-original", &State::new(Expelled, 0)).unwrap();
add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(2, "i2", "r2-original", &State::new(Expelled, 0))).unwrap();
// join::Request with a given instance_name online.
// - It must be an impostor, return an error.
......@@ -262,8 +271,8 @@ mod tests {
fn test_instance_name_collision() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
add_instance(&storage, 2, "i3", "r3", &State::new(Online, 1)).unwrap();
add_instance(&storage, &dummy_instance(1, "i1", "r1", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(2, "i3", "r3", &State::new(Online, 1))).unwrap();
// Attention: i3 has raft_id=2
let instance = build_instance(None, Some(&ReplicasetName::from("r2")), &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
......@@ -288,15 +297,14 @@ mod tests {
fn test_replication_factor() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 2, true).unwrap();
add_instance(&storage, 9, "i9", "r9", &State::new(Online, 1)).unwrap();
add_instance(&storage, 10, "i10", "r9", &State::new(Online, 1)).unwrap();
add_instance(&storage, &dummy_instance(9, "i9", "r9", &State::new(Online, 1))).unwrap();
add_instance(&storage, &dummy_instance(10, "i10", "r9", &State::new(Online, 1))).unwrap();
let i1 = build_instance(Some(&InstanceName::from("i1")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i1.raft_id, 11);
assert_eq!(i1.name, "i1");
assert_eq!(i1.replicaset_name, "r1");
storage.instances.put(&i1).unwrap();
storage.replicasets.put(&Replicaset::with_one_instance(&i1)).unwrap();
add_instance(&storage, &i1).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11]));
......@@ -305,15 +313,14 @@ mod tests {
assert_eq!(i2.name, "i2");
assert_eq!(i2.replicaset_name, "r1");
assert_eq!(i2.replicaset_uuid, i1.replicaset_uuid);
storage.instances.put(&i2).unwrap();
add_instance(&storage, &i2).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r1"), &storage), HashSet::from([11, 12]));
let i3 = build_instance(Some(&InstanceName::from("i3")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
assert_eq!(i3.raft_id, 13);
assert_eq!(i3.name, "i3");
assert_eq!(i3.replicaset_name, "r2");
storage.instances.put(&i3).unwrap();
storage.replicasets.put(&Replicaset::with_one_instance(&i3)).unwrap();
add_instance(&storage, &i3).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13]));
let i4 = build_instance(Some(&InstanceName::from("i4")), None, &FailureDomain::default(), &storage, DEFAULT_TIER).unwrap();
......@@ -321,7 +328,7 @@ mod tests {
assert_eq!(i4.name, "i4");
assert_eq!(i4.replicaset_name, "r2");
assert_eq!(i4.replicaset_uuid, i3.replicaset_uuid);
storage.instances.put(&i4).unwrap();
add_instance(&storage, &i4).unwrap();
assert_eq!(replication_names(&ReplicasetName::from("r2"), &storage), HashSet::from([13, 14]));
}
......@@ -339,7 +346,8 @@ mod tests {
fn test_update_state() {
let storage = Clusterwide::for_tests();
add_tier(&storage, DEFAULT_TIER, 1, true).unwrap();
let instance = add_instance(&storage, 1, "i1", "r1", &State::new(Online, 1)).unwrap();
let instance = dummy_instance(1, "i1", "r1", &State::new(Online, 1));
add_instance(&storage, &instance).unwrap();
let existing_fds = HashSet::new();
//
......@@ -459,31 +467,31 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Earth, os: BSD}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars, os: BSD}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {os: Arch}, &storage, DEFAULT_TIER).unwrap_err();
assert_eq!(e.to_string(), "missing failure domain names: PLANET");
......@@ -492,19 +500,19 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Venus, os: Arch}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Venus, os: Mac}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars, os: Mac}, &storage, DEFAULT_TIER)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {}, &storage, DEFAULT_TIER).unwrap_err();
assert_eq!(e.to_string(), "missing failure domain names: OS, PLANET");
......@@ -519,7 +527,7 @@ mod tests {
// first instance
//
let instance1 = build_instance(Some(&InstanceName::from("i1")), None, &faildoms! {planet: Earth}, &storage, DEFAULT_TIER).unwrap();
storage.instances.put(&instance1).unwrap();
add_instance(&storage, &instance1).unwrap();
let existing_fds = storage.instances.failure_domain_names().unwrap();
assert_eq!(instance1.failure_domain, faildoms! {planet: Earth});
assert_eq!(instance1.replicaset_name, "r1");
......@@ -560,7 +568,7 @@ mod tests {
let fd = faildoms! {planet: Mars, owner: Mike};
#[rustfmt::skip]
let instance2 = build_instance(Some(&InstanceName::from("i2")), None, &fd, &storage, DEFAULT_TIER).unwrap();
storage.instances.put(&instance2).unwrap();
add_instance(&storage, &instance2).unwrap();
let existing_fds = storage.instances.failure_domain_names().unwrap();
assert_eq!(instance2.failure_domain, fd);
// doesn't fit into r1
......@@ -587,7 +595,7 @@ mod tests {
#[rustfmt::skip]
let instance3_v1 = build_instance(Some(&InstanceName::from("i3")), None, &faildoms! {planet: B, owner: V, dimension: C137}, &storage, DEFAULT_TIER)
.unwrap();
storage.instances.put(&instance3_v1).unwrap();
add_instance(&storage, &instance3_v1).unwrap();
assert_eq!(
instance3_v1.failure_domain,
faildoms! {planet: B, owner: V, dimension: C137}
......@@ -618,31 +626,31 @@ mod tests {
build_instance(None, None, &faildoms! {planet: Earth}, &storage, first_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, second_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r2");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Mars}, &storage, first_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r1");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Pluto}, &storage, third_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let instance =
build_instance(None, None, &faildoms! {planet: Venus}, &storage, third_tier)
.unwrap();
assert_eq!(instance.replicaset_name, "r3");
storage.instances.put(&instance).unwrap();
add_instance(&storage, &instance).unwrap();
let e = build_instance(None, None, &faildoms! {planet: 5}, &storage, "noexistent_tier").unwrap_err();
assert_eq!(e.to_string(), r#"tier "noexistent_tier" doesn't exist"#);
......
......@@ -712,12 +712,14 @@ pub(crate) fn setup() {
(nil, string) in case of an error
"},
tlua::function1(|instance_name: InstanceName| -> traft::Result<bool> {
let raft_storage = &traft::node::global()?.raft_storage;
let node = traft::node::global()?;
let raft_storage = &node.raft_storage;
let instance = node.storage.instances.get(&instance_name)?;
let cluster_name = raft_storage.cluster_name()?;
fiber::block_on(rpc::network_call_to_leader(
crate::proc_name!(rpc::expel::proc_expel),
&rpc::expel::Request {
instance_name,
instance_uuid: instance.uuid,
cluster_name,
},
))?;
......
use std::time::Duration;
use crate::instance::InstanceName;
use crate::instance::StateVariant::*;
use crate::instance::{Instance, StateVariant::*};
use crate::rpc;
use crate::rpc::update_instance::handle_update_instance_request_and_wait;
use crate::traft::error::IdOfInstance;
use crate::traft::Result;
use crate::traft::{error::Error, node};
......@@ -34,7 +34,12 @@ crate::define_rpc_request! {
});
}
let req = rpc::update_instance::Request::new(req.instance_name, req.cluster_name)
let instance = node.storage.instances.by_uuid(&req.instance_uuid)?;
let Some(Instance { name, .. }) = instance else {
return Err(Error::NoSuchInstance(IdOfInstance::Uuid(req.instance_uuid)));
};
let req = rpc::update_instance::Request::new(name, req.cluster_name)
.with_target_state(Expelled);
handle_update_instance_request_and_wait(req, TIMEOUT)?;
......@@ -46,7 +51,7 @@ crate::define_rpc_request! {
/// Use [`redirect::Request`] for automatic redirection from any instance to leader.
pub struct Request {
pub cluster_name: String,
pub instance_name: InstanceName,
pub instance_uuid: String,
}
pub struct Response {}
......
use std::collections::BTreeMap;
use std::time::Duration;
use crate::cas;
use crate::failure_domain::FailureDomain;
use crate::has_states;
......@@ -9,14 +6,17 @@ use crate::instance::StateVariant::*;
use crate::instance::{Instance, InstanceName};
use crate::replicaset::Replicaset;
use crate::replicaset::ReplicasetName;
use crate::replicaset::ReplicasetState;
use crate::schema::ADMIN_ID;
use crate::storage::ClusterwideTable;
use crate::storage::{Clusterwide, ToEntryIter as _};
use crate::tier::Tier;
use crate::tlog;
use crate::traft::op::{Dml, Op};
use crate::traft::{self, RaftId};
use crate::traft::{error::Error, node, Address, PeerAddress, Result};
use std::collections::HashSet;
use std::time::Duration;
use tarantool::fiber;
const TIMEOUT: Duration = Duration::from_secs(10);
......@@ -35,7 +35,10 @@ crate::define_rpc_request! {
/// 4. Compare and swap request to commit new instance and its address failed
/// with an error that cannot be retried.
fn proc_raft_join(req: Request) -> Result<Response> {
handle_join_request_and_wait(req, TIMEOUT)
let res = handle_join_request_and_wait(req, TIMEOUT)?;
tlog!(Info, "new instance joined the cluster: {:?}", res.instance);
Ok(res)
}
/// Request to join the cluster.
......@@ -103,14 +106,18 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
.expect("encoding should not fail"),
);
if storage
.replicasets
.get(&instance.replicaset_name)?
.is_none()
{
let res = storage.replicasets.by_uuid_raw(&instance.replicaset_uuid);
if let Err(Error::NoSuchReplicaset { .. }) = res {
let replicaset = Replicaset::with_one_instance(&instance);
ops.push(
Dml::insert(ClusterwideTable::Replicaset, &replicaset, ADMIN_ID)
// NOTE: we use replace instead of insert, because at the
// moment the primary key in _pico_replicaset is the replicaset_name (name),
// but in here we may be creating a new replicaset with
// the name of a previously expelled replicaset.
// The new replicaset will have a new unique uuid, so once we
// make the uuid the primary key, we can switch back to using
// insert here.
Dml::replace(ClusterwideTable::Replicaset, &replicaset, ADMIN_ID)
.expect("encoding should not fail"),
);
}
......@@ -135,13 +142,14 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
// A joined instance needs to communicate with other nodes.
// TODO: limit the number of entries sent to reduce response size.
let peer_addresses = node.storage.peer_addresses.iter()?.collect();
let mut replication_addresses = storage.peer_addresses.addresses_by_ids(
storage
.instances
.replicaset_instances(&instance.replicaset_name)
.expect("storage should not fail")
.map(|i| i.raft_id),
)?;
let replicas = storage
.instances
.replicaset_instances(&instance.replicaset_name)
.expect("storage should not fail")
// Ignore expelled instances
.filter(|i| !has_states!(i, Expelled -> *))
.map(|i| i.raft_id);
let mut replication_addresses = storage.peer_addresses.addresses_by_ids(replicas)?;
replication_addresses.insert(req.advertise_address.clone());
drop(guard);
......@@ -154,14 +162,30 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R
}
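/// Builds the `Instance` record for a joining instance: picks the next
/// raft_id, resolves the requested (or auto-generated) instance name, checks
/// the tier and failure domain constraints and chooses an existing replicaset
/// or a name for a new one.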
pub fn build_instance(
instance_name: Option<&InstanceName>,
replicaset_name: Option<&ReplicasetName>,
requested_instance_name: Option<&InstanceName>,
requested_replicaset_name: Option<&ReplicasetName>,
failure_domain: &FailureDomain,
storage: &Clusterwide,
tier: &str,
) -> Result<Instance> {
if let Some(id) = instance_name {
if let Ok(existing_instance) = storage.instances.get(id) {
// NOTE: currently we don't ever remove entries from `_pico_instance` even
// when expelling instances. This makes it so we can get a unique raft_id by
// selecting max raft_id from _pico_instance and adding one. However in the
// future we may want to start deleting old instance records and at that
// point we may face a problem of this id not being unique (i.e. belonging
// to an instance). There don't seem to be any problems with this per se,
// as raft will not allow there to be a simultaneous raft_id conflict, but
// it's just a thing to look out for.
let raft_id = storage
.instances
.max_raft_id()
.expect("storage should not fail")
+ 1;
// Resolve instance_name
let instance_name;
if let Some(name) = requested_instance_name {
if let Ok(existing_instance) = storage.instances.get(name) {
let is_expelled = has_states!(existing_instance, Expelled -> *);
if is_expelled {
// The instance was expelled explicitly, it's ok to replace it
......@@ -173,10 +197,15 @@ pub fn build_instance(
// joined it has both states Offline, which means it may be
// replaced by another one of the name before it sends a request
// for self activation.
return Err(Error::other(format!("`{id}` is already joined")));
return Err(Error::other(format!("`{name}` is already joined")));
}
}
instance_name = name.clone();
} else {
instance_name = choose_instance_name(raft_id, storage);
}
// Check tier exists
let Some(tier) = storage
.tiers
.by_name(tier)
......@@ -185,41 +214,61 @@ pub fn build_instance(
return Err(Error::other(format!(r#"tier "{tier}" doesn't exist"#)));
};
// Check failure domain constraints
let existing_fds = storage
.instances
.failure_domain_names()
.expect("storage should not fail");
failure_domain.check(&existing_fds)?;
// Anyway, `join` always produces a new raft_id.
let raft_id = storage
.instances
.max_raft_id()
.expect("storage should not fail")
+ 1;
let instance_name = instance_name
.cloned()
.unwrap_or_else(|| choose_instance_name(raft_id, storage));
let replicaset_name = match replicaset_name {
Some(replicaset_name) =>
// FIXME: must make sure the replicaset is not Expelled or ToBeExpelled
{
replicaset_name.clone()
}
None => choose_replicaset_name(failure_domain, storage, &tier)?,
};
let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
//
// Resolve replicaset
//
let replicaset_name;
let replicaset_uuid;
if let Some(replicaset) = storage.replicasets.get(&replicaset_name)? {
if replicaset.tier != tier.name {
return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
if let Some(requested_replicaset_name) = requested_replicaset_name {
let replicaset = storage.replicasets.get(requested_replicaset_name)?;
match replicaset {
Some(replicaset) if replicaset.state != ReplicasetState::Expelled => {
if replicaset.tier != tier.name {
return Err(Error::other(format!("tier mismatch: instance {instance_name} is from tier: '{}', but replicaset {requested_replicaset_name} is from tier: '{}'", tier.name, replicaset.tier)));
}
if replicaset.state == ReplicasetState::ToBeExpelled {
#[rustfmt::skip]
return Err(Error::other("cannot join replicaset which is being expelled"));
}
// Join instance to existing replicaset
replicaset_name = requested_replicaset_name.clone();
replicaset_uuid = replicaset.uuid;
}
// Replicaset doesn't exist or was expelled
_ => {
// Create a new replicaset
replicaset_name = requested_replicaset_name.clone();
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
}
}
replicaset_uuid = replicaset.uuid;
} else {
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
let res = choose_replicaset(failure_domain, storage, &tier)?;
match res {
Ok(replicaset) => {
// Join instance to existing replicaset
replicaset_name = replicaset.name;
replicaset_uuid = replicaset.uuid;
}
Err(new_replicaset_name) => {
// Create a new replicaset
replicaset_name = new_replicaset_name;
replicaset_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
}
}
}
// Generate a unique instance_uuid
let instance_uuid = uuid::Uuid::new_v4().to_hyphenated().to_string();
Ok(Instance {
raft_id,
name: instance_name,
......@@ -256,57 +305,103 @@ fn choose_instance_name(raft_id: RaftId, storage: &Clusterwide) -> InstanceName
}
}
/// Choose a [`ReplicasetName`] for a new instance given its `failure_domain` and `tier`.
/// FIXME: a couple of problems:
/// - expelled instances are erroneously counted towards replication factor
/// - must ignore replicasets with state ToBeExpelled & Expelled
fn choose_replicaset_name(
/// Choose a replicaset for the new instance based on `failure_domain`, `tier`
/// and the list of available replicasets and instances in them.
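/// Returns `Ok(Ok(replicaset))` if a suitable existing replicaset was found,
/// or `Ok(Err(name))` with the name to use for a brand new replicaset.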
fn choose_replicaset(
failure_domain: &FailureDomain,
storage: &Clusterwide,
Tier {
replication_factor,
name: tier_name,
..
}: &Tier,
) -> Result<ReplicasetName> {
// `BTreeMap` is used so that we get a deterministic order of instance addition to replicasets.
tier: &Tier,
) -> Result<Result<Replicaset, ReplicasetName>> {
let replication_factor = tier.replication_factor as _;
// The list of candidate replicasets for the new instance
let mut replicasets = vec![];
// The list of names of all replicasets in the cluster
let mut all_replicasets = HashSet::new();
for replicaset in storage.replicasets.iter()? {
all_replicasets.insert(replicaset.name.clone());
if replicaset.tier != tier.name {
continue;
}
if replicaset.state == ReplicasetState::ToBeExpelled {
continue;
}
if replicaset.state == ReplicasetState::Expelled {
// NOTE: we could allow automatically reusing old expelled
// replicasets, i.e. reusing the name but generating a new uuid, but
// it's not clear why we would do this.
continue;
}
replicasets.push(SomeInfoAboutReplicaset {
replicaset,
instances: vec![],
});
}
// We sort the array so that we get a deterministic order of instance addition to replicasets.
// E.g. if both "r1" and "r2" are suitable, "r1" will always be preferred.
let mut replicasets: BTreeMap<_, Vec<_>> = BTreeMap::new();
let replication_factor = (*replication_factor).into();
// NOTE: can't use `sort_unstable_by_key` because of borrow checker, yay rust!
replicasets.sort_unstable_by(|lhs, rhs| lhs.replicaset.name.cmp(&rhs.replicaset.name));
for instance in storage
.instances
.all_instances()
.expect("storage should not fail")
.into_iter()
{
replicasets
.entry(instance.replicaset_name.clone())
.or_default()
.push(instance);
if instance.tier != tier.name {
continue;
}
if has_states!(instance, Expelled -> *) {
// Expelled instances are ignored
continue;
}
let index =
replicasets.binary_search_by_key(&&instance.replicaset_name, |i| &i.replicaset.name);
let Ok(index) = index else {
debug_assert!(all_replicasets.contains(&instance.replicaset_name));
// Replicaset is skipped for some reason, so this instance's info is
// not going to be used
continue;
};
replicasets[index].instances.push(instance);
}
'next_replicaset: for (replicaset_name, instances) in replicasets.iter() {
if instances.len() < replication_factor
&& instances
.first()
.expect("should not fail, each replicaset consists of at least one instance")
.tier
== *tier_name
{
for instance in instances {
if instance.failure_domain.intersects(failure_domain) {
continue 'next_replicaset;
}
'next_replicaset: for info in &replicasets {
// TODO: skip replicasets with state ToBeExpelled & Expelled
if info.instances.len() >= replication_factor {
continue 'next_replicaset;
}
for instance in &info.instances {
if instance.failure_domain.intersects(failure_domain) {
continue 'next_replicaset;
}
return Ok(replicaset_name.clone());
}
return Ok(Ok(info.replicaset.clone()));
}
let mut i = 0u64;
loop {
i += 1;
let replicaset_name = ReplicasetName(format!("r{i}"));
if !replicasets.contains_key(&replicaset_name) {
return Ok(replicaset_name);
if !all_replicasets.contains(&replicaset_name) {
// Not found, hence the generated name is free to use
return Ok(Err(replicaset_name));
}
}
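/// A candidate replicaset together with the instances currently assigned to it.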
struct SomeInfoAboutReplicaset {
replicaset: Replicaset,
instances: Vec<Instance>,
}
}
......@@ -9,6 +9,7 @@ use crate::replicaset::Replicaset;
use crate::schema::ADMIN_ID;
use crate::storage::ClusterwideTable;
use crate::tier::Tier;
use crate::tlog;
use crate::traft::op::{Dml, Op};
use crate::traft::Result;
use crate::traft::{error::Error, node};
......@@ -35,8 +36,11 @@ crate::define_rpc_request! {
/// with an error that cannot be retried.
fn proc_update_instance(req: Request) -> Result<Response> {
if req.current_state.is_some() {
return Err(Error::Other("Changing current state through Proc API is not allowed.".into()));
tlog!(Warning, "invalid request to update current state: {req:?}");
return Err(Error::Other("Changing current state through Proc API is not allowed.".into()));
}
tlog!(Debug, "got update instance request: {req:?}");
handle_update_instance_request_and_wait(req, TIMEOUT)?;
Ok(Response {})
}
......
......@@ -6,9 +6,7 @@ use sbroad::backend::sql::ir::PatternWithParams;
use sbroad::backend::sql::space::TableGuard;
use sbroad::errors::{Action, Entity, SbroadError};
use sbroad::executor::bucket::Buckets;
use sbroad::executor::engine::helpers::storage::{
unprepare, DQLStorageReturnFormat, StorageMetadata,
};
use sbroad::executor::engine::helpers::storage::{unprepare, StorageMetadata, StorageReturnFormat};
use sbroad::executor::engine::helpers::vshard::{get_random_bucket, CacheInfo};
use sbroad::executor::engine::helpers::{
self, execute_first_cacheable_request, execute_second_cacheable_request, read_or_prepare,
......@@ -247,7 +245,7 @@ impl Vshard for StorageRuntime {
let boxed_bytes: Box<dyn Any> = read_or_prepare::<Self, <Self as QueryCache>::Mutex>(
&mut locked_cache,
&mut info,
&DQLStorageReturnFormat::Raw,
&StorageReturnFormat::DqlRaw,
)?;
let bytes = boxed_bytes.downcast::<Vec<u8>>().map_err(|e| {
SbroadError::Invalid(
......
......@@ -42,7 +42,7 @@ use crate::tarantool::box_schema_version;
use crate::tier::Tier;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::error::{Error, IdOfInstance};
use crate::traft::op::Ddl;
use crate::traft::op::Dml;
use crate::traft::RaftEntryId;
......@@ -1844,6 +1844,16 @@ impl Instances {
Ok(res)
}
#[inline(always)]
pub fn by_uuid(&self, uuid: &str) -> Result<Option<Instance>> {
// FIXME: temporary, should be replaced by a direct get-by-uuid lookup (uuid should become the primary key)
let result = self
.all_instances()?
.into_iter()
.find(|instance| instance.uuid == uuid);
Ok(result)
}
/// Checks if an instance with `name` (see trait [`InstanceName`]) is present.
#[inline]
pub fn contains(&self, name: &impl InstanceName) -> Result<bool> {
......@@ -1927,7 +1937,7 @@ impl InstanceName for RaftId {
instances
.index_raft_id
.get(&[self])?
.ok_or(Error::NoSuchInstance(Ok(*self)))
.ok_or(Error::NoSuchInstance(IdOfInstance::RaftId(*self)))
}
}
......@@ -1937,7 +1947,7 @@ impl InstanceName for instance::InstanceName {
instances
.index_instance_name
.get(&[self])?
.ok_or_else(|| Error::NoSuchInstance(Err(self.clone())))
.ok_or_else(|| Error::NoSuchInstance(IdOfInstance::Name(self.clone())))
}
}
......
......@@ -99,8 +99,8 @@ pub enum Error {
Lua(#[from] LuaError),
#[error("{0}")]
Tarantool(#[from] ::tarantool::error::Error),
#[error("instance with {} not found", DisplayIdOfInstance(.0))]
NoSuchInstance(Result<RaftId, InstanceName>),
#[error("instance with {} not found", *.0)]
NoSuchInstance(IdOfInstance),
#[error("replicaset with {} \"{name}\" not found", if *.id_is_uuid { "uuid" } else { "name" })]
NoSuchReplicaset { name: String, id_is_uuid: bool },
#[error("tier with name \"{0}\" not found")]
......@@ -149,13 +149,20 @@ pub enum Error {
Other(Box<dyn std::error::Error>),
}
struct DisplayIdOfInstance<'a>(pub &'a Result<RaftId, InstanceName>);
impl std::fmt::Display for DisplayIdOfInstance<'_> {
#[derive(Debug)]
pub enum IdOfInstance {
RaftId(RaftId),
Name(InstanceName),
Uuid(String),
}
impl std::fmt::Display for IdOfInstance {
#[inline]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
match self.0 {
Ok(raft_id) => write!(f, "raft_id {raft_id}"),
Err(instance_name) => write!(f, "name \"{instance_name}\""),
match self {
IdOfInstance::RaftId(raft_id) => write!(f, "raft_id {raft_id}"),
IdOfInstance::Name(name) => write!(f, "name \"{name}\""),
IdOfInstance::Uuid(uuid) => write!(f, "uuid \"{uuid}\""),
}
}
}
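// With the `#[error("instance with {} not found", *.0)]` attribute above,
// e.g. `Error::NoSuchInstance(IdOfInstance::Uuid(uuid))` now renders as
// `instance with uuid "<uuid>" not found`.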
......
......@@ -1278,6 +1278,25 @@ class Instance:
# Make it so we can call Instance.start later
self.process = None
def instance_info(self, timeout: int | float = 10) -> dict[str, Any]:
"""Call .proc_instance_info on the instance
and update the related properties on this object.
"""
info = self.call(".proc_instance_info", timeout=timeout)
assert isinstance(info, dict)
assert isinstance(info["raft_id"], int)
self.raft_id = info["raft_id"]
assert isinstance(info["name"], str)
self.name = info["name"]
assert isinstance(info["replicaset_name"], str)
self.replicaset_name = info["replicaset_name"]
return info
def wait_online(
self, timeout: int | float = 30, rps: int | float = 5, expected_incarnation=None
):
......@@ -1298,16 +1317,7 @@ class Instance:
raise ProcessDead("process was not started")
def fetch_current_state() -> Tuple[str, int]:
self.check_process_alive()
myself = self.call(".proc_instance_info")
assert isinstance(myself, dict)
assert isinstance(myself["raft_id"], int)
self.raft_id = myself["raft_id"]
assert isinstance(myself["name"], str)
self.name = myself["name"]
myself = self.instance_info()
assert isinstance(myself["current_state"], dict)
return (
......@@ -1448,7 +1458,7 @@ class Instance:
expected_status: str,
old_step_counter: int | None = None,
timeout: int | float = 10,
):
) -> int:
assert expected_status != "not a leader", "use another function"
def impl():
......@@ -1457,12 +1467,15 @@ class Instance:
if actual_status == "not a leader":
raise NotALeader("not a leader")
step_counter = info["governor_step_counter"]
if old_step_counter:
assert old_step_counter != info["governor_step_counter"]
assert old_step_counter != step_counter
assert actual_status == expected_status
Retriable(timeout=timeout, rps=1, fatal=NotALeader).call(impl)
return step_counter
return Retriable(timeout=timeout, rps=1, fatal=NotALeader).call(impl)
def promote_or_fail(self):
attempt = 0
......@@ -1613,7 +1626,7 @@ class Cluster:
self,
wait_online=True,
peers: list[str] | None = None,
instance_name: str | bool = True,
name: str | bool = True,
replicaset_name: str | None = None,
failure_domain=dict(),
init_replication_factor: int | None = None,
......@@ -1624,26 +1637,26 @@ class Cluster:
"""Add an `Instance` into the list of instances of the cluster and wait
for it to attain Online grade unless `wait_online` is `False`.
`instance_name` specifies how the instance's name is generated in the
`name` specifies how the instance's name is generated in the
following way:
- if `instance_name` is a string, it will be used as a value for the
- if `name` is a string, it will be used as a value for the
`--instance-name` command-line option.
- If `instance_name` is `True` (default), the `--instance-name` command-line
- If `name` is `True` (default), the `--instance-name` command-line
option will be generated by pytest according to the instance's
sequence number in the cluster.
- If `instance_name` is `False`, the instance will be started
- If `name` is `False`, the instance will be started
without the `--instance-name` command-line option and the particular value
will be generated by the cluster.
"""
i = 1 + len(self.instances)
generated_instance_name: str | None
match instance_name:
case str() as iid:
generated_instance_name = iid
match name:
case str() as given_name:
generated_instance_name = given_name
case True:
generated_instance_name = f"i{i}"
case False:
......@@ -1694,7 +1707,7 @@ class Cluster:
def fail_to_add_instance(
self,
peers=None,
instance_name: str | bool = True,
name: str | bool = True,
failure_domain=dict(),
init_replication_factor: int | None = None,
tier: str = "storage",
......@@ -1702,7 +1715,7 @@ class Cluster:
instance = self.add_instance(
wait_online=False,
peers=peers,
instance_name=instance_name,
name=name,
failure_domain=failure_domain,
init_replication_factor=init_replication_factor,
tier=tier,
......@@ -1734,7 +1747,8 @@ class Cluster:
):
peer = peer if peer else target
assert self.service_password_file, "cannot expel without pico_service password"
assert target.name, "cannot expel without target instance name"
target_info = peer.call(".proc_instance_info", target.name)
target_uuid = target_info["uuid"]
# fmt: off
command: list[str] = [
......@@ -1743,7 +1757,7 @@ class Cluster:
"--cluster-name", target.cluster_name or "",
"--password-file", self.service_password_file,
"--auth-type", "chap-sha1",
target.name,
target_uuid,
]
# fmt: on
......@@ -2131,7 +2145,7 @@ def cargo_build(with_webui: bool = False) -> None:
eprint("Skipping cargo build")
return
features = ["error_injection", "sbroad-core/tracing"]
features = ["error_injection"]
if with_webui:
features.append("webui")
......
......@@ -381,7 +381,7 @@ def test_join_expel_instance(cluster: Cluster):
events = audit_i1.events()
audit = os.path.join(cluster.data_dir, "i2", "audit.log")
i2 = cluster.add_instance(instance_name="i2", audit=audit)
i2 = cluster.add_instance(name="i2", audit=audit)
join_instance = take_until_title(events, "join_instance")
assert join_instance is not None
......@@ -423,7 +423,7 @@ def test_join_connect_instance(cluster: Cluster):
i1 = cluster.add_instance(audit=audit)
audit = os.path.join(cluster.data_dir, "i2", "audit.log")
i2 = cluster.add_instance(instance_name="i2", audit=audit)
i2 = cluster.add_instance(name="i2", audit=audit)
i2.terminate()
events = AuditFile(i2.audit_flag_value).events()
......
......@@ -19,7 +19,7 @@ instance:
memory: 42069
"""
)
instance = cluster.add_instance(instance_name=False, wait_online=False)
instance = cluster.add_instance(name=False, wait_online=False)
instance.start()
instance.wait_online()
......@@ -124,7 +124,7 @@ instance:
def test_default_path_to_config_file(cluster: Cluster):
instance = cluster.add_instance(instance_name=False, wait_online=False)
instance = cluster.add_instance(name=False, wait_online=False)
# By default ./config.yaml will be used in the instance's current working directory
work_dir = cluster.data_dir + "/work-dir"
......
......@@ -137,3 +137,95 @@ def test_expel_timeout(cluster: Cluster):
cli.expect_exact("CRITICAL: connect timeout")
cli.expect_exact(pexpect.EOF)
def test_join_replicaset_after_expel(cluster: Cluster):
cluster.set_config_file(
yaml="""
cluster:
name: test
tier:
raft:
replication_factor: 1
storage:
replication_factor: 2
"""
)
cluster.set_service_password("secret")
[leader] = cluster.deploy(instance_count=1, tier="raft")
# Deploy a cluster with at least one full replicaset
storage1 = cluster.add_instance(name="storage1", wait_online=True, tier="storage")
assert storage1.replicaset_name == "r2"
storage2 = cluster.add_instance(name="storage2", wait_online=True, tier="storage")
assert storage2.replicaset_name == "r2"
storage3 = cluster.add_instance(name="storage3", wait_online=True, tier="storage")
assert storage3.replicaset_name == "r3"
# Expel one of the replicas in the full replicaset, wait until the change is finalized
counter = leader.governor_step_counter()
cluster.expel(storage2, peer=leader)
leader.wait_governor_status("idle", old_step_counter=counter)
# Add another instance, it should be assigned to the no-longer-full replicaset
storage4 = cluster.add_instance(name="storage4", wait_online=True, tier="storage")
assert storage4.replicaset_name == "r2"
# Attempt to expel an offline replicaset
storage3.terminate()
cluster.expel(storage3, peer=leader)
# Offline replicasets aren't allowed to be expelled,
# so the cluster is blocked attempting to rebalance
counter = leader.wait_governor_status("transfer buckets from replicaset")
# The replicaset is in the process of being expelled
[[r3_state, r3_old_uuid]] = leader.sql(
""" SELECT state, "uuid" FROM _pico_replicaset WHERE name = 'r3' """
)
assert r3_state == "to-be-expelled"
# Add another instance
storage5 = cluster.add_instance(name="storage5", tier="storage", wait_online=False)
storage5.start()
# NOTE: wait_online doesn't work because bucket rebalancing has higher priority
leader.wait_governor_status(
"transfer buckets from replicaset", old_step_counter=counter
)
# Update the fields on the object
storage5.instance_info()
# Instance is added to a new replicaset because 'r3' is not yet available
assert storage5.replicaset_name == "r4"
# Try adding an instance to 'r3' directly, which is not allowed
storage6 = cluster.add_instance(
name="storage6", replicaset_name="r3", tier="storage", wait_online=False
)
lc = log_crawler(storage6, "cannot join replicaset which is being expelled")
storage6.fail_to_start()
lc.wait_matched()
# Wake up the expelled instance so that the replicaset is finally expelled
storage3.start()
# The buckets are finally able to be rebalanced
leader.wait_governor_status("idle")
# The replicaset is finally expelled
[[r3_state]] = leader.sql(
""" SELECT state FROM _pico_replicaset WHERE name = 'r3' """
)
assert r3_state == "expelled"
# Now it's ok to reuse the 'r3' replicaset name, but it will be a different replicaset
cluster.add_instance(replicaset_name="r3", tier="storage", wait_online=True)
# The new replicaset is created
[[r3_state, r3_new_uuid]] = leader.sql(
""" SELECT state, "uuid" FROM _pico_replicaset WHERE name = 'r3' """
)
assert r3_state != "expelled"
assert r3_old_uuid != r3_new_uuid