Skip to content
Snippets Groups Projects
Commit 289f9a5a authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Georgy Moshkin
Browse files

refactor(governor): only promote new master if it's chosen

parent d291744c
No related branches found
No related tags found
1 merge request!428refactor(governor): only promote new master if it's chosen
Pipeline #14315 passed
......@@ -111,37 +111,72 @@ impl Loop {
}
let replicaset_id = &instance.replicaset_id;
// choose a new replicaset master if needed
let res = (|| -> Result<_> {
let replicaset = storage.replicasets.get(replicaset_id)?;
if replicaset
.map(|r| r.master_id == instance.instance_id)
.unwrap_or(false)
{
let new_master =
maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id);
if let Some(instance) = new_master {
let mut ops = UpdateOps::new();
ops.assign("master_id", &instance.instance_id)?;
let op =
OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?;
tlog!(Info, "proposing replicaset master change"; "op" => ?op);
// TODO: don't hard code the timeout
node.propose_and_wait(op, Duration::from_secs(3))??;
} else {
tlog!(Info, "skip proposing replicaset master change");
}
let mut new_master = None;
// choose a new replicaset master if needed and promote it
let res: Result<_> = async {
match storage.replicasets.get(replicaset_id)? {
Some(replicaset) if replicaset.master_id == instance.instance_id => {}
_ => return Ok(()),
}
new_master =
maybe_responding(&instances).find(|p| p.replicaset_id == replicaset_id);
let Some(new_master) = new_master else {
return Ok(());
};
tlog!(Info, "calling rpc::replication::promote";
"instance_id" => %new_master.instance_id,
);
let req = replication::promote::Request {
term,
commit: raft_storage.commit()?.unwrap(),
timeout: Self::SYNC_TIMEOUT,
};
pool.call(&new_master.instance_id, &req)?
// TODO: don't hard code timeout
.timeout(Duration::from_secs(3))
.await??;
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning,
"failed calling rpc::replication::promote: {e}";
"replicaset_id" => %replicaset_id,
);
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap();
return Continue;
}
// update replicaset entry if needed
let res: Result<_> = async {
let Some(new_master) = new_master else { return Ok(()); };
tlog!(Info, "proposing replicaset master change";
"master_id" => %new_master.instance_id,
"replicaset_id" => %replicaset_id,
);
let mut ops = UpdateOps::new();
ops.assign("master_id", &new_master.instance_id)?;
let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], ops)?;
// TODO: don't hard code the timeout
node.propose_and_wait(op, Duration::from_secs(3))??;
Ok(())
})();
}
.await;
if let Err(e) = res {
tlog!(Warning, "failed proposing replicaset master change: {e}");
// TODO: don't hard code timeout
event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap();
return Continue;
}
if new_master.is_none() {
tlog!(Info, "skip proposing replicaset master change");
}
// reconfigure vshard storages and routers
let res: Result<_> = async {
let commit = raft_storage.commit()?.unwrap();
......@@ -197,56 +232,6 @@ impl Loop {
return Continue;
}
let replicaset_instances = storage
.instances
.replicaset_instances(replicaset_id)
.expect("storage error")
.filter(|instance| !instance.is_expelled())
.collect::<Vec<_>>();
let may_respond = replicaset_instances
.iter()
.filter(|instance| instance.may_respond());
// Check if it makes sense to call box.ctl.promote,
// otherwise we risk unpredictable delays
if replicaset_instances.len() / 2 + 1 > may_respond.count() {
tlog!(Warning,
"replicaset lost quorum";
"replicaset_id" => %replicaset_id,
);
return Continue;
}
let res: Result<_> = async {
// Promote the replication leader again
// because of tarantool bugs
if let Some(replicaset) = storage.replicasets.get(replicaset_id)? {
tlog!(Info,
"calling rpc::replication::promote";
"instance_id" => %replicaset.master_id
);
let commit = raft_storage.commit()?.unwrap();
pool.call(
&replicaset.master_id,
&replication::promote::Request {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
},
)?
// TODO: don't hard code timeout
.timeout(Duration::from_secs(3))
.await??;
}
Ok(())
}
.await;
if let Err(e) = res {
tlog!(Warning,
"failed calling rpc::replication::promote: {e}";
"replicaset_id" => %replicaset_id,
);
}
return Continue;
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment