diff --git a/src/traft/node.rs b/src/traft/node.rs index 2fe4a36c9d2ec83ad5f94f2a8ff7484d423432f2..301ace677cf2f53d6a469fa94f23a8292a4975de 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -935,7 +935,6 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { let peers = storage.peers.all_peers().unwrap(); let term = status.get().term; let cluster_id = storage.raft.cluster_id().unwrap().unwrap(); - let commit = storage.raft.commit().unwrap().unwrap(); let node = global().expect("must be initialized"); //////////////////////////////////////////////////////////////////////// @@ -994,6 +993,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { } } + let commit = storage.raft.commit()?.unwrap(); let reqs = maybe_responding(&peers) .filter(|peer| { peer.current_grade == CurrentGradeVariant::ShardingInitialized @@ -1070,6 +1070,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { "calling rpc::replication::promote"; "instance_id" => %replicaset.master_id ); + let commit = storage.raft.commit()?.unwrap(); pool.call_and_wait_timeout( &replicaset.master_id, replication::promote::Request { @@ -1103,6 +1104,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { }); if let Some(peer) = to_sync { let (rx, tx) = fiber::Channel::new(1).into_clones(); + let commit = storage.raft.commit().unwrap().unwrap(); pool.call( &peer.raft_id, sync::Request { @@ -1163,6 +1165,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { .collect::<Vec<_>>(); let res = (|| -> traft::Result<_> { + let commit = storage.raft.commit()?.unwrap(); let reqs = replicaset_iids .iter() .cloned() @@ -1219,6 +1222,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { Cow::Borrowed(&peer.instance_id) }; + let commit = storage.raft.commit()?.unwrap(); pool.call_and_wait_timeout( &*master_id, replication::promote::Request { @@ -1254,6 +1258,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { if let Some(peer) = to_shard { let res = (|| -> traft::Result<()> { let vshard_bootstrapped = storage.state.vshard_bootstrapped()?; + let commit = storage.raft.commit()?.unwrap(); let reqs = maybe_responding(&peers).map(|peer| { ( peer.instance_id.clone(), @@ -1310,6 +1315,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { // because of tarantool bugs let replicasets = storage.replicasets.iter()?; let masters = replicasets.map(|r| r.master_id).collect::<HashSet<_>>(); + let commit = storage.raft.commit()?.unwrap(); let reqs = maybe_responding(&peers) .filter(|peer| masters.contains(&peer.instance_id)) .map(|peer| peer.instance_id.clone()) @@ -1359,6 +1365,7 @@ fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) { } let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone()); + let commit = storage.raft.commit()?.unwrap(); let reqs = peer_ids.zip(repeat(sharding::Request { term, commit,