From 11b8766a2285eb718e75527c3ad10148d0fb5533 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Mon, 12 Dec 2022 12:23:37 +0300
Subject: [PATCH] refactor(network): async fn ConnectionPool::call

---
 src/governor/mod.rs  | 138 ++++++++++++++++---------------------------
 src/traft/error.rs   |   7 +++
 src/traft/network.rs |  36 +++++++----
 3 files changed, 84 insertions(+), 97 deletions(-)

diff --git a/src/governor/mod.rs b/src/governor/mod.rs
index fb3c25b2a6..64048066a2 100644
--- a/src/governor/mod.rs
+++ b/src/governor/mod.rs
@@ -1,18 +1,16 @@
 use std::collections::HashMap;
 use std::iter::repeat;
-use std::rc::Rc;
 use std::time::Duration;
 
 use ::tarantool::fiber;
+use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::fiber::r#async::watch;
 use ::tarantool::space::UpdateOps;
-use ::tarantool::util::IntoClones as _;
 
 use crate::event::{self, Event};
 use crate::r#loop::FlowControl::{self, Continue};
 use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
 use crate::tlog;
-use crate::traft::error::Error;
 use crate::traft::network::{ConnectionPool, IdOfInstance};
 use crate::traft::node::global;
 use crate::traft::node::Status;
@@ -26,6 +24,8 @@ use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant};
 use crate::traft::{Instance, Replicaset};
 use crate::unwrap_ok_or;
 
+use futures::future::join_all;
+
 pub(crate) mod cc;
 pub(crate) mod migration;
 
@@ -143,7 +143,7 @@ impl Loop {
             }
 
             // reconfigure vshard storages and routers
-            let res = (|| -> Result<_> {
+            let res: Result<_> = async {
                 let commit = raft_storage.commit()?.unwrap();
                 let reqs = maybe_responding(&instances)
                     .filter(|instance| {
@@ -166,12 +166,13 @@ impl Loop {
                     )
                 });
                 // TODO: don't hard code timeout
-                let res = call_all(pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
                 for (_, resp) in res {
                     let sharding::Response {} = resp?;
                 }
                 Ok(())
-            })();
+            }
+            .await;
             if let Err(e) = res {
                 tlog!(Warning, "failed calling rpc::sharding: {e}");
                 // TODO: don't hard code timeout
@@ -227,7 +228,7 @@ impl Loop {
             let commit = raft_storage.commit()?.unwrap();
             pool.call_and_wait_timeout(
                 &replicaset.master_id,
-                replication::promote::Request {
+                &replication::promote::Request {
                     term,
                     commit,
                     timeout: Self::SYNC_TIMEOUT,
@@ -256,47 +257,33 @@ impl Loop {
             instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online)
                 || instance.is_reincarnated()
         });
-        if let Some(instance) = to_sync {
-            let (rx, tx) = fiber::Channel::new(1).into_clones();
-            let commit = raft_storage.commit().unwrap().unwrap();
-            pool.call(
-                &instance.raft_id,
-                sync::Request {
-                    commit,
+        if let Some(Instance {
+            instance_id,
+            target_grade,
+            ..
+        }) = to_sync
+        {
+            // TODO: change `Info` to `Debug`
+            tlog!(Info, "syncing raft log"; "instance_id" => %instance_id);
+            let res = async {
+                let req = sync::Request {
+                    commit: raft_storage.commit().unwrap().unwrap(),
                     timeout: Self::SYNC_TIMEOUT,
-                },
-                move |res| tx.send(res).expect("mustn't fail"),
-            )
-            .expect("shouldn't fail");
-            let res = rx.recv().expect("ought not fail");
-            let res = res.and_then(|sync::Response { commit }| {
+                };
+                let sync::Response { commit } = pool.call(instance_id, &req)?.await?;
                 // TODO: change `Info` to `Debug`
-                tlog!(Info, "instance synced";
-                    "commit" => commit,
-                    "instance_id" => &*instance.instance_id,
-                );
+                tlog!(Info, "instance's commit index is {commit}"; "instance_id" => %instance_id);
 
-                let req = update_instance::Request::new(instance.instance_id.clone(), cluster_id)
-                    .with_current_grade(CurrentGrade::raft_synced(
-                        instance.target_grade.incarnation,
-                    ));
-                global()
-                    .expect("can't be deinitialized")
-                    .handle_update_instance_request_and_wait(req)
-            });
-            match res {
-                Ok(()) => {
-                    tlog!(Info, "raft sync processed");
-                }
-                Err(e) => {
-                    tlog!(Warning, "raft sync failed: {e}";
-                        "instance_id" => %instance.instance_id,
-                    );
+                let req = update_instance::Request::new(instance_id.clone(), cluster_id)
+                    .with_current_grade(CurrentGrade::raft_synced(target_grade.incarnation));
+                node.handle_update_instance_request_and_wait(req)
+            }
+            .await;
+            if let Err(e) = res {
+                tlog!(Warning, "failed syncing raft log: {e}"; "instance_id" => %instance_id);
 
-                    // TODO: don't hard code timeout
-                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
-                        .unwrap();
-                }
+                // TODO: don't hard code timeout
+                event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300)).unwrap();
             }
 
             return Continue;
@@ -346,7 +333,7 @@ impl Loop {
                 };
                 let replication::promote::Response {} = pool
                     // TODO: don't hard code the timeout
-                    .call_and_wait_timeout(instance_id, req, Duration::from_secs(3))?;
+                    .call_and_wait_timeout(instance_id, &req, Duration::from_secs(3))?;
                 Ok(())
             }
             .await;
@@ -406,7 +393,7 @@ impl Loop {
                 .map(|instance| instance.instance_id.clone())
                 .collect::<Vec<_>>();
 
-            let res = (|| -> Result<_> {
+            let res: Result<_> = async {
                 let commit = raft_storage.commit()?.unwrap();
                 let reqs = replicaset_iids
                     .iter()
@@ -419,7 +406,7 @@ impl Loop {
                         replicaset_id: replicaset_id.clone(),
                     }));
                 // TODO: don't hard code timeout
-                let res = call_all(pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
 
                 for (instance_id, resp) in res {
                     let replication::Response { lsn } = resp?;
@@ -437,7 +424,8 @@ impl Loop {
                 node.handle_update_instance_request_and_wait(req)?;
 
                 Ok(())
-            })();
+            }
+            .await;
             if let Err(e) = res {
                 tlog!(Warning, "failed to configure replication: {e}");
                 // TODO: don't hard code timeout
@@ -456,7 +444,7 @@ impl Loop {
             instance.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online)
         });
         if let Some(instance) = to_shard {
-            let res = (|| -> Result<()> {
+            let res: Result<_> = async {
                 let vshard_bootstrapped = storage.properties.vshard_bootstrapped()?;
                 let commit = raft_storage.commit()?.unwrap();
                 let reqs = maybe_responding(&instances).map(|instance| {
@@ -471,7 +459,7 @@ impl Loop {
                     )
                 });
                 // TODO: don't hard code timeout
-                let res = call_all(pool, reqs, Duration::from_secs(3))?;
+                let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
 
                 for (instance_id, resp) in res {
                     let sharding::Response {} = resp?;
@@ -502,7 +490,8 @@ impl Loop {
                 }
 
                 Ok(())
-            })();
+            }
+            .await;
             if let Err(e) = res {
                 tlog!(Warning, "failed to initialize sharding: {e}");
                 // TODO: don't hard code timeout
@@ -527,7 +516,7 @@ impl Loop {
             let res = if let Some(added_weights) =
                 get_weight_changes(maybe_responding(&instances), storage)
             {
-                (|| -> Result<()> {
+                async {
                     for (replicaset_id, weight) in added_weights {
                         let mut ops = UpdateOps::new();
                         ops.assign("weight", weight)?;
@@ -548,7 +537,7 @@ impl Loop {
                         bootstrap: false,
                     }));
                     // TODO: don't hard code timeout
-                    let res = call_all(pool, reqs, Duration::from_secs(3))?;
+                    let res = call_all(pool, reqs, Duration::from_secs(3)).await?;
 
                     for (instance_id, resp) in res {
                         resp?;
@@ -563,7 +552,8 @@ impl Loop {
                     ));
                     node.handle_update_instance_request_and_wait(req)?;
                     Ok(())
-                })()
+                }
+                .await
             } else {
                 (|| -> Result<()> {
                     let to_online = instances.iter().filter(|instance| {
@@ -623,7 +613,7 @@ impl Loop {
                         timeout: Self::SYNC_TIMEOUT,
                         migration_id: migration.id,
                     };
-                    let res = pool.call_and_wait(&instance.raft_id, req);
+                    let res = pool.call_and_wait(&instance.raft_id, &req);
                     match res {
                         Ok(_) => {
                             let mut ops = UpdateOps::new();
@@ -703,7 +693,7 @@ struct State {
 }
 
 #[allow(clippy::type_complexity)]
-fn call_all<R, I>(
+async fn call_all<R, I>(
     pool: &mut ConnectionPool,
     reqs: impl IntoIterator<Item = (I, R)>,
     timeout: Duration,
@@ -712,42 +702,18 @@ where
     R: rpc::Request,
     I: IdOfInstance + 'static,
 {
-    // TODO: this crap is only needed to wait until results of all
-    // the calls are ready. There are several ways to rafactor this:
-    // - we could use a std-style channel that unblocks the reading end
-    //   once all the writing ends have dropped
-    //   (fiber::Channel cannot do that for now)
-    // - using the std Futures we could use futures::join!
-    //
-    // Those things aren't implemented yet, so this is what we do
     let reqs = reqs.into_iter().collect::<Vec<_>>();
     if reqs.is_empty() {
         return Ok(vec![]);
     }
-    static mut SENT_COUNT: usize = 0;
-    unsafe { SENT_COUNT = 0 };
-    let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones();
-    let instance_count = reqs.len();
-    let (rx, tx) = fiber::Channel::new(instance_count as _).into_clones();
+    let mut fs = vec![];
+    let mut ids = vec![];
     for (id, req) in reqs {
-        let tx = tx.clone();
-        let cond_tx = cond_tx.clone();
-        let id_copy = id.clone();
-        pool.call(&id, req, move |res| {
-            tx.send((id_copy, res)).expect("mustn't fail");
-            unsafe { SENT_COUNT += 1 };
-            if unsafe { SENT_COUNT } == instance_count {
-                cond_tx.signal()
-            }
-        })
-        .expect("shouldn't fail");
+        fs.push(pool.call(&id, &req)?);
+        ids.push(id);
     }
-    // TODO: don't hard code timeout
-    if !cond_rx.wait_timeout(timeout) {
-        return Err(Error::Timeout);
-    }
-
-    Ok(rx.into_iter().take(instance_count).collect())
+    let responses = join_all(fs).timeout(timeout).await?;
+    Ok(ids.into_iter().zip(responses).collect())
 }
 
 #[inline(always)]
diff --git a/src/traft/error.rs b/src/traft/error.rs
index f0740fbbd6..64009cc365 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -1,5 +1,6 @@
 use crate::traft::InstanceId;
 use crate::traft::{RaftId, RaftTerm};
+use ::tarantool::fiber::r#async::timeout::Expired;
 use ::tarantool::tlua::LuaError;
 use raft::StorageError;
 use rmp_serde::decode::Error as RmpDecodeError;
@@ -66,6 +67,12 @@ impl Error {
     }
 }
 
+impl From<Expired> for Error {
+    fn from(_: Expired) -> Self {
+        Self::Timeout
+    }
+}
+
 #[derive(Debug, Error)]
 pub enum CoercionError {
     #[error("unknown entry type ({0})")]
diff --git a/src/traft/network.rs b/src/traft/network.rs
index cad8cc5851..3af22f889c 100644
--- a/src/traft/network.rs
+++ b/src/traft/network.rs
@@ -1,5 +1,6 @@
 use ::raft::prelude as raft;
 use ::tarantool::fiber;
+use ::tarantool::fiber::r#async::oneshot;
 use ::tarantool::net_box::promise::Promise;
 use ::tarantool::net_box::promise::TryGet;
 use ::tarantool::net_box::Conn;
@@ -7,9 +8,12 @@ use ::tarantool::net_box::ConnOptions;
 use ::tarantool::tuple::Decode;
 use ::tarantool::tuple::{RawByteBuf, ToTupleBuffer, TupleBuffer};
 use ::tarantool::util::IntoClones;
+use futures::future::poll_fn;
+use futures::Future;
 
 use std::cell::Cell;
 use std::collections::{hash_map::Entry, HashMap};
 use std::io;
+use std::pin::Pin;
 use std::rc::Rc;
 use std::time::{Duration, Instant};
@@ -291,7 +295,7 @@ impl PoolWorker {
     /// - in case peer was disconnected
     /// - in case response failed to deserialize
     /// - in case peer responded with an error
-    pub fn rpc<R>(&mut self, request: R, cb: impl FnOnce(Result<R::Response>) + 'static)
+    pub fn rpc<R>(&mut self, request: &R, cb: impl FnOnce(Result<R::Response>) + 'static)
     where
         R: Request,
     {
@@ -470,7 +474,7 @@ impl ConnectionPool {
     pub fn call_and_wait_timeout<R>(
         &mut self,
         id: &impl IdOfInstance,
-        req: R,
+        req: &R,
         timeout: Duration,
     ) -> Result<R::Response>
     where
@@ -490,30 +494,40 @@ impl ConnectionPool {
     /// **This function yields.**
     #[allow(dead_code)]
     #[inline(always)]
-    pub fn call_and_wait<R>(&mut self, id: &impl IdOfInstance, req: R) -> Result<R::Response>
+    pub fn call_and_wait<R>(&mut self, id: &impl IdOfInstance, req: &R) -> Result<R::Response>
     where
         R: Request,
     {
         self.call_and_wait_timeout(id, req, Duration::MAX)
     }
 
-    /// Send a request to instance with `id` (see `InstanceId`) and wait for the result.
+    /// Send a request to instance with `id` (see `IdOfInstance`) returning a
+    /// future.
     ///
     /// If the request failed, it's a responsibility of the caller
    /// to re-send it later.
-    ///
-    /// **This function never yields.**
     pub fn call<R>(
         &mut self,
         id: &impl IdOfInstance,
-        req: R,
-        cb: impl FnOnce(Result<R::Response>) + 'static,
-    ) -> Result<()>
+        req: &R,
+    ) -> Result<impl Future<Output = Result<R::Response>>>
     where
         R: Request,
     {
-        id.get_or_create_in(self)?.rpc(req, cb);
-        Ok(())
+        let (tx, mut rx) = oneshot::channel();
+        id.get_or_create_in(self)?.rpc(req, move |res| {
+            // TODO: maybe log a warning if receiver was dropped
+            let _ = tx.send(res);
+        });
+
+        // We use an explicit type implementing Future instead of defining an
+        // async fn, because we need to tell rust explicitly that the `id` &
+        // `req` arguments are not borrowed by the returned future.
+        let f = poll_fn(move |cx| {
+            let rx = Pin::new(&mut rx);
+            Future::poll(rx, cx).map(|r| r.unwrap_or_else(|_| Err(Error::other("disconnected"))))
+        });
+        Ok(f)
     }
 }
 
-- 
GitLab
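
A note on the pattern this patch introduces. The core of the refactor is the callback-to-future bridge in `ConnectionPool::call`: the RPC callback pushes its result into a oneshot channel, and the returned future resolves once the receiver fires, so a batch of calls can be started first and only then awaited together (`call_all` does exactly that via `join_all` plus a timeout). Below is a minimal, self-contained sketch of that bridge under stated assumptions: it uses only the `futures` crate instead of the tarantool fiber runtime, `spawn_rpc` is a hypothetical stand-in for `PoolWorker::rpc` (not part of the patched code), the timeout wrapper is omitted, and an `async move` block takes the place of the patch's `poll_fn` (the block moves only the receiver, so it borrows neither `id` nor `req`).

    use futures::channel::oneshot;
    use futures::executor::block_on;
    use futures::future::join_all;
    use std::future::Future;

    // Hypothetical stand-in for the callback-style `PoolWorker::rpc`:
    // it eventually invokes `cb` with the result of the call.
    fn spawn_rpc(req: u32, cb: impl FnOnce(Result<u32, String>) + 'static) {
        // For the sketch, answer immediately.
        cb(Ok(req * 2));
    }

    // Bridge the callback API into a future, mirroring the shape of the
    // new `ConnectionPool::call`: the callback sends its result through
    // a oneshot channel and the future resolves when the receiver fires.
    fn call(req: u32) -> impl Future<Output = Result<u32, String>> {
        let (tx, rx) = oneshot::channel();
        spawn_rpc(req, move |res| {
            // The receiver may already be dropped; ignore that case
            // (the patch leaves a TODO to log a warning here).
            let _ = tx.send(res);
        });
        async move {
            // A dropped sender maps to a "disconnected" error, like
            // `Error::other("disconnected")` in the patch.
            rx.await.unwrap_or_else(|_| Err("disconnected".into()))
        }
    }

    fn main() {
        // Like `call_all`: create every future first, then await them
        // together, so all requests are in flight concurrently.
        let calls = (1..=3).map(call).collect::<Vec<_>>();
        let responses = block_on(join_all(calls));
        println!("{responses:?}"); // [Ok(2), Ok(4), Ok(6)]
    }

As in `call_all`, every future is created before any of them is awaited, and `join_all` preserves input order, which is what makes zipping `ids` back with `responses` correct in the patched code.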