Skip to content
Snippets Groups Projects
Commit 110f2057 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix(governor): used to synchronize with wrong commit index sometimes

parent 65444759
No related branches found
No related tags found
1 merge request!361Test/sharding is failsafe
......@@ -935,7 +935,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
let peers = storage.peers.all_peers().unwrap();
let term = status.get().term;
let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
let commit = storage.raft.commit().unwrap().unwrap();
let node = global().expect("must be initialized");
////////////////////////////////////////////////////////////////////////
......@@ -994,6 +993,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
}
}
let commit = storage.raft.commit()?.unwrap();
let reqs = maybe_responding(&peers)
.filter(|peer| {
peer.current_grade == CurrentGradeVariant::ShardingInitialized
......@@ -1070,6 +1070,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
"calling rpc::replication::promote";
"instance_id" => %replicaset.master_id
);
let commit = storage.raft.commit()?.unwrap();
pool.call_and_wait_timeout(
&replicaset.master_id,
replication::promote::Request {
......@@ -1103,6 +1104,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
});
if let Some(peer) = to_sync {
let (rx, tx) = fiber::Channel::new(1).into_clones();
let commit = storage.raft.commit().unwrap().unwrap();
pool.call(
&peer.raft_id,
sync::Request {
......@@ -1163,6 +1165,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
.collect::<Vec<_>>();
let res = (|| -> traft::Result<_> {
let commit = storage.raft.commit()?.unwrap();
let reqs = replicaset_iids
.iter()
.cloned()
......@@ -1219,6 +1222,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
Cow::Borrowed(&peer.instance_id)
};
let commit = storage.raft.commit()?.unwrap();
pool.call_and_wait_timeout(
&*master_id,
replication::promote::Request {
......@@ -1254,6 +1258,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
if let Some(peer) = to_shard {
let res = (|| -> traft::Result<()> {
let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
let commit = storage.raft.commit()?.unwrap();
let reqs = maybe_responding(&peers).map(|peer| {
(
peer.instance_id.clone(),
......@@ -1310,6 +1315,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
// because of tarantool bugs
let replicasets = storage.replicasets.iter()?;
let masters = replicasets.map(|r| r.master_id).collect::<HashSet<_>>();
let commit = storage.raft.commit()?.unwrap();
let reqs = maybe_responding(&peers)
.filter(|peer| masters.contains(&peer.instance_id))
.map(|peer| peer.instance_id.clone())
......@@ -1359,6 +1365,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
}
let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
let commit = storage.raft.commit()?.unwrap();
let reqs = peer_ids.zip(repeat(sharding::Request {
term,
commit,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment