diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 7dd84db70260591fcf97636bcfeee8ff5b7e1291..fb3c25b2a60e4710fb3e509052f0fbe51fc49a80 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -1,4 +1,3 @@ -use std::borrow::Cow; use std::collections::HashMap; use std::iter::repeat; use std::rc::Rc; @@ -25,6 +24,7 @@ use crate::traft::OpDML; use crate::traft::Result; use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant}; use crate::traft::{Instance, Replicaset}; +use crate::unwrap_ok_or; pub(crate) mod cc; pub(crate) mod migration; @@ -302,6 +302,94 @@ impl Loop { return Continue; } + //////////////////////////////////////////////////////////////////////// + // create new replicaset + if let Some(to_create_replicaset) = instances + .iter() + .filter(|instance| { + instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online) + }) + .find_map( + |instance| match storage.replicasets.get(&instance.replicaset_id) { + Err(e) => Some(Err(e)), + Ok(None) => Some(Ok(instance)), + Ok(_) => None, + }, + ) + { + // TODO: what if this is not actually the replicaset bootstrap leader? + let Instance { + instance_id, + replicaset_id, + replicaset_uuid, + .. + } = unwrap_ok_or!(to_create_replicaset, + Err(e) => { + tlog!(Warning, "{e}"); + + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)).unwrap(); + return Continue; + } + ); + + // TODO: change `Info` to `Debug` + tlog!(Info, "promoting new replicaset master"; + "master_id" => %instance_id, + "replicaset_id" => %replicaset_id, + ); + let res: Result<_> = async { + let req = replication::promote::Request { + term, + commit: raft_storage.commit()?.unwrap(), + timeout: Self::SYNC_TIMEOUT, + }; + let replication::promote::Response {} = pool + // TODO: don't hard code the timeout + .call_and_wait_timeout(instance_id, req, Duration::from_secs(3))?; + Ok(()) + } + .await; + if let Err(e) = res { + tlog!(Warning, "failed promoting new replicaset master: {e}"; + "master_id" => %instance_id, + "replicaset_id" => %replicaset_id, + ); + return Continue; + } + + // TODO: change `Info` to `Debug` + tlog!(Info, "creating new replicaset"; + "master_id" => %instance_id, + "replicaset_id" => %replicaset_id, + ); + let res: Result<_> = async { + let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?; + let req = OpDML::insert( + ClusterwideSpace::Replicaset, + &Replicaset { + replicaset_id: replicaset_id.clone(), + replicaset_uuid: replicaset_uuid.clone(), + master_id: instance_id.clone(), + weight: if vshard_bootstrapped { 0. } else { 1. }, + current_schema_version: 0, + }, + )?; + // TODO: don't hard code the timeout + node.propose_and_wait(req, Duration::from_secs(3))??; + Ok(()) + } + .await; + if let Err(e) = res { + tlog!(Warning, "failed creating new replicaset: {e}"; + "replicaset_id" => %replicaset_id, + ); + return Continue; + } + + return Continue; + } + //////////////////////////////////////////////////////////////////////// // replication let to_replicate = instances @@ -357,50 +445,6 @@ impl Loop { return Continue; } - let res = (|| -> Result<_> { - let master_id = - if let Some(replicaset) = storage.replicasets.get(&instance.replicaset_id)? { - Cow::Owned(replicaset.master_id) - } else { - let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?; - let req = OpDML::insert( - ClusterwideSpace::Replicaset, - &Replicaset { - replicaset_id: instance.replicaset_id.clone(), - replicaset_uuid: instance.replicaset_uuid.clone(), - master_id: instance.instance_id.clone(), - weight: if vshard_bootstrapped { 0. } else { 1. }, - current_schema_version: 0, - }, - )?; - // TODO: don't hard code the timeout - node.propose_and_wait(req, Duration::from_secs(3))??; - Cow::Borrowed(&instance.instance_id) - }; - - let commit = raft_storage.commit()?.unwrap(); - pool.call_and_wait_timeout( - &*master_id, - replication::promote::Request { - term, - commit, - timeout: Self::SYNC_TIMEOUT, - }, - // TODO: don't hard code timeout - Duration::from_secs(3), - )?; - tlog!(Debug, "promoted replicaset master"; - "instance_id" => %master_id, - "replicaset_id" => %instance.replicaset_id, - ); - Ok(()) - })(); - if let Err(e) = res { - tlog!(Warning, "failed to promote replicaset master: {e}"; - "replicaset_id" => %replicaset_id, - ); - } - tlog!(Info, "configured replication"; "replicaset_id" => %replicaset_id); return Continue; diff --git a/test/int/test_basics.py b/test/int/test_basics.py index 01086399914b2b9e122bcd63bc60e4c58e946545..c7c6f394406e83bfe432a9b32d1260e4f6487f82 100644 --- a/test/int/test_basics.py +++ b/test/int/test_basics.py @@ -225,8 +225,8 @@ def test_raft_log(instance: Instance): | 6 | 2 | |-| | 7 | 2 |1.1.1|PersistInstance(i1, 1, r1, Offline(0) -> Online(1), {b})| | 8 | 2 |1.1.2|PersistInstance(i1, 1, r1, RaftSynced(1) -> Online(1), {b})| -| 9 | 2 |1.1.3|PersistInstance(i1, 1, r1, Replicated(1) -> Online(1), {b})| -| 10 | 2 |1.1.4|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,0])| +| 9 | 2 |1.1.3|Insert(_picodata_replicaset, ["r1","e0df68c5-e7f9-395f-86b3-30ad9e1b7b07","i1",1.0,0])| +| 10 | 2 |1.1.4|PersistInstance(i1, 1, r1, Replicated(1) -> Online(1), {b})| | 11 | 2 |1.1.5|PersistInstance(i1, 1, r1, ShardingInitialized(1) -> Online(1), {b})| | 12 | 2 |1.1.6|Replace(_picodata_property, ["vshard_bootstrapped",true])| | 13 | 2 |1.1.7|PersistInstance(i1, 1, r1, Online(1), {b})|