From 289f9a5a962050d87a6a6aade1e382c25f8ef852 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 13 Dec 2022 12:09:15 +0300 Subject: [PATCH] refactor(governor): only promote new master if it's chosen --- src/governor/mod.rs | 131 ++++++++++++++++++++------------------------ 1 file changed, 58 insertions(+), 73 deletions(-) diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 2ee10b2a0a..71a961d56f 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -111,37 +111,72 @@ impl Loop { } let replicaset_id = &instance.replicaset_id; - // choose a new replicaset master if needed - let res = (|| -> Result<_> { - let replicaset = storage.replicasets.get(replicaset_id)?; - if replicaset - .map(|r| r.master_id == instance.instance_id) - .unwrap_or(false) - { - let new_master = - maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id); - if let Some(instance) = new_master { - let mut ops = UpdateOps::new(); - ops.assign("master_id", &instance.instance_id)?; - - let op = - OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?; - tlog!(Info, "proposing replicaset master change"; "op" => ?op); - // TODO: don't hard code the timeout - node.propose_and_wait(op, Duration::from_secs(3))??; - } else { - tlog!(Info, "skip proposing replicaset master change"); - } + let mut new_master = None; + // choose a new replicaset master if needed and promote it + let res: Result<_> = async { + match storage.replicasets.get(replicaset_id)? { + Some(replicaset) if replicaset.master_id == instance.instance_id => {} + _ => return Ok(()), } + new_master = + maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id); + let Some(new_master) = new_master else { + return Ok(()); + }; + tlog!(Info, "calling rpc::replication::promote"; + "instance_id" => %new_master.instance_id, + ); + let req = replication::promote::Request { + term, + commit: raft_storage.commit()?.unwrap(), + timeout: Self::SYNC_TIMEOUT, + }; + pool.call(&new_master.instance_id, &req)? + // TODO: don't hard code timeout + .timeout(Duration::from_secs(3)) + .await??; + + Ok(()) + } + .await; + if let Err(e) = res { + tlog!(Warning, + "failed calling rpc::replication::promote: {e}"; + "replicaset_id" => %replicaset_id, + ); + // TODO: don't hard code timeout + event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap(); + return Continue; + } + + // update replicaset entry if needed + let res: Result<_> = async { + let Some(new_master) = new_master else { return Ok(()); }; + tlog!(Info, "proposing replicaset master change"; + "master_id" => %new_master.instance_id, + "replicaset_id" => %replicaset_id, + ); + + let mut ops = UpdateOps::new(); + ops.assign("master_id", &new_master.instance_id)?; + + let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?; + // TODO: don't hard code the timeout + node.propose_and_wait(op, Duration::from_secs(3))??; Ok(()) - })(); + } + .await; if let Err(e) = res { tlog!(Warning, "failed proposing replicaset master change: {e}"); // TODO: don't hard code timeout - event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap(); + event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap(); return Continue; } + if new_master.is_none() { + tlog!(Info, "skip proposing replicaset master change"); + } + // reconfigure vshard storages and routers let res: Result<_> = async { let commit = raft_storage.commit()?.unwrap(); @@ -197,56 +232,6 @@ impl Loop { return Continue; } - let replicaset_instances = storage - .instances - .replicaset_instances(replicaset_id) - .expect("storage error") - .filter(|instance| !instance.is_expelled()) - .collect::<Vec<_>>(); - let may_respond = replicaset_instances - .iter() - .filter(|instance| instance.may_respond()); - // Check if it makes sense to call box.ctl.promote, - // otherwise we risk unpredictable delays - if replicaset_instances.len() / 2 + 1 > may_respond.count() { - tlog!(Warning, - "replicaset lost quorum"; - "replicaset_id" => %replicaset_id, - ); - return Continue; - } - - let res: Result<_> = async { - // Promote the replication leader again - // because of tarantool bugs - if let Some(replicaset) = storage.replicasets.get(replicaset_id)? { - tlog!(Info, - "calling rpc::replication::promote"; - "instance_id" => %replicaset.master_id - ); - let commit = raft_storage.commit()?.unwrap(); - pool.call( - &replicaset.master_id, - &replication::promote::Request { - term, - commit, - timeout: Self::SYNC_TIMEOUT, - }, - )? - // TODO: don't hard code timeout - .timeout(Duration::from_secs(3)) - .await??; - } - Ok(()) - } - .await; - if let Err(e) = res { - tlog!(Warning, - "failed calling rpc::replication::promote: {e}"; - "replicaset_id" => %replicaset_id, - ); - } - return Continue; } -- GitLab