From cec405ef00c7a8832f84843380262832223b4a5c Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Wed, 14 Dec 2022 15:59:29 +0300 Subject: [PATCH] refactor(governor): don't pass storages into action_plan --- src/governor/mod.rs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/governor/mod.rs b/src/governor/mod.rs index c8471b4590..ab4cc59d33 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -22,6 +22,7 @@ use crate::traft::rpc::{replication, sharding, sync, update_instance}; use crate::traft::InstanceId; use crate::traft::OpDML; use crate::traft::RaftId; +use crate::traft::RaftIndex; use crate::traft::RaftTerm; use crate::traft::ReplicasetId; use crate::traft::Result; @@ -65,19 +66,19 @@ impl Loop { .collect(); let term = status.get().term; + let commit = raft_storage.commit().unwrap().unwrap(); let cluster_id = raft_storage.cluster_id().unwrap().unwrap(); let node = global().expect("must be initialized"); let plan = action_plan( term, + commit, cluster_id.clone(), instances, &voters, &learners, &replicasets, node.raft_id, - storage, - raft_storage, ); let plan = unwrap_ok_or!(plan, Err(e) => { @@ -213,7 +214,8 @@ impl Loop { .await; if let Err(e) = res { tlog!(Warning, "failed syncing raft log: {e}"; "instance_id" => %instance_id); - event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)).unwrap(); + event::wait_timeout(Event::TopologyChanged, Duration::from_millis(250)) + .unwrap(); return Continue; } } @@ -638,14 +640,13 @@ impl Loop { #[allow(clippy::too_many_arguments)] fn action_plan<'i>( term: RaftTerm, + commit: RaftIndex, cluster_id: String, instances: &'i [Instance], voters: &[RaftId], learners: &[RaftId], replicasets: &HashMap<&ReplicasetId, &'i Replicaset>, my_raft_id: RaftId, - _storage: &Clusterwide, - raft_storage: &RaftSpaceAccess, ) -> Result<Plan<'i>> { //////////////////////////////////////////////////////////////////////////// // conf change @@ -702,7 +703,7 @@ fn action_plan<'i>( if let Some(to) = new_master { let rpc = replication::promote::Request { term, - commit: raft_storage.commit()?.unwrap(), + commit, timeout: Loop::SYNC_TIMEOUT, }; let mut ops = UpdateOps::new(); @@ -729,7 +730,7 @@ fn action_plan<'i>( .collect(); let rpc = sharding::Request { term, - commit: raft_storage.commit()?.unwrap(), + commit, timeout: Loop::SYNC_TIMEOUT, }; let req = update_instance::Request::new(instance_id.clone(), cluster_id) @@ -750,7 +751,7 @@ fn action_plan<'i>( }) = to_sync { let rpc = sync::Request { - commit: raft_storage.commit()?.unwrap(), + commit, timeout: Loop::SYNC_TIMEOUT, }; let req = update_instance::Request::new(instance_id.clone(), cluster_id) -- GitLab