From 93309ba7c8988127194e5bb64bd4552a4627816a Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Mon, 12 Dec 2022 15:14:56 +0300 Subject: [PATCH] refactor(network): remove ConnectionPool::call_and_wait[_timeout] --- src/governor/mod.rs | 79 ++++++++++++++++++++++++-------------------- src/traft/network.rs | 37 --------------------- 2 files changed, 43 insertions(+), 73 deletions(-) diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 64048066a2..02f6fe8267 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -217,7 +217,7 @@ impl Loop { return Continue; } - let res = (|| -> Result<_> { + let res: Result<_> = async { // Promote the replication leader again // because of tarantool bugs if let Some(replicaset) = storage.replicasets.get(replicaset_id)? { @@ -226,19 +226,21 @@ impl Loop { "instance_id" => %replicaset.master_id ); let commit = raft_storage.commit()?.unwrap(); - pool.call_and_wait_timeout( + pool.call( &replicaset.master_id, &replication::promote::Request { term, commit, timeout: Self::SYNC_TIMEOUT, }, - // TODO: don't hard code timeout - Duration::from_secs(3), - )?; + )? + // TODO: don't hard code timeout + .timeout(Duration::from_secs(3)) + .await??; } Ok(()) - })(); + } + .await; if let Err(e) = res { tlog!(Warning, "failed calling rpc::replication::promote: {e}"; @@ -332,8 +334,10 @@ impl Loop { timeout: Self::SYNC_TIMEOUT, }; let replication::promote::Response {} = pool + .call(instance_id, &req)? // TODO: don't hard code the timeout - .call_and_wait_timeout(instance_id, &req, Duration::from_secs(3))?; + .timeout(Duration::from_secs(3)) + .await??; Ok(()) } .await; @@ -613,35 +617,38 @@ impl Loop { timeout: Self::SYNC_TIMEOUT, migration_id: migration.id, }; - let res = pool.call_and_wait(&instance.raft_id, &req); - match res { - Ok(_) => { - let mut ops = UpdateOps::new(); - ops.assign("current_schema_version", migration.id).unwrap(); - let op = OpDML::update( - ClusterwideSpace::Replicaset, - &[replicaset.replicaset_id.clone()], - ops, - ) - .unwrap(); - node.propose_and_wait(op, Duration::MAX).unwrap().unwrap(); - tlog!( - Info, - "Migration {0} applied to replicaset {1}", - migration.id, - replicaset.replicaset_id - ); - } - Err(e) => { - tlog!( - Warning, - "Could not apply migration {0} to replicaset {1}, error: {2}", - migration.id, - replicaset.replicaset_id, - e - ); - return Continue; - } + let res: Result<_> = async { + let rpc::migration::apply::Response {} = pool + .call(&instance.raft_id, &req)? + // TODO: don't hard code timeout + .timeout(Duration::from_secs(3)) + .await??; + let mut ops = UpdateOps::new(); + ops.assign("current_schema_version", migration.id)?; + let op = OpDML::update( + ClusterwideSpace::Replicaset, + &[replicaset.replicaset_id.clone()], + ops, + )?; + node.propose_and_wait(op, Duration::MAX)??; + tlog!( + Info, + "Migration {0} applied to replicaset {1}", + migration.id, + replicaset.replicaset_id + ); + Ok(()) + } + .await; + if let Err(e) = res { + tlog!( + Warning, + "Could not apply migration {0} to replicaset {1}, error: {2}", + migration.id, + replicaset.replicaset_id, + e + ); + return Continue; } } } diff --git a/src/traft/network.rs b/src/traft/network.rs index 3af22f889c..d7b2a77683 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -464,43 +464,6 @@ impl ConnectionPool { self.get_or_create_by_raft_id(msg.to)?.send(msg) } - /// Send a request to instance with `id` (see `IdOfInstance`) and wait for the result. - /// - /// If the request failed, it's a responsibility of the caller - /// to re-send it later. - /// - /// **This function yields.** - #[allow(dead_code)] - pub fn call_and_wait_timeout<R>( - &mut self, - id: &impl IdOfInstance, - req: &R, - timeout: Duration, - ) -> Result<R::Response> - where - R: Request, - { - let (rx, tx) = fiber::Channel::new(1).into_clones(); - id.get_or_create_in(self)? - .rpc(req, move |res| tx.send(res).unwrap()); - rx.recv_timeout(timeout).map_err(|_| Error::Timeout)? - } - - /// Send a request to instance with `id` (see `InstanceId`) and wait for the result. - /// - /// If the request failed, it's a responsibility of the caller - /// to re-send it later. - /// - /// **This function yields.** - #[allow(dead_code)] - #[inline(always)] - pub fn call_and_wait<R>(&mut self, id: &impl IdOfInstance, req: &R) -> Result<R::Response> - where - R: Request, - { - self.call_and_wait_timeout(id, req, Duration::MAX) - } - /// Send a request to instance with `id` (see `IdOfInstance`) returning a /// future. /// -- GitLab