Skip to content
Snippets Groups Projects
Commit eeba8949 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

refactor: add network::Request struct instead of a unnamed tuple

parent 1005cde2
No related branches found
No related tags found
1 merge request!619Refactor/network improvements
......@@ -53,8 +53,27 @@ impl Default for WorkerOptions {
// PoolWorker
////////////////////////////////////////////////////////////////////////////////
type Callback = Box<dyn FnOnce(Result<Tuple>)>;
type Queue = Mailbox<(Callback, &'static str, TupleBuffer)>;
struct Request {
proc: &'static str,
args: TupleBuffer,
on_result: Box<dyn FnOnce(Result<Tuple>)>,
}
impl Request {
#[inline(always)]
fn new<H>(proc: &'static str, args: TupleBuffer, on_result: H) -> Self
where
H: FnOnce(Result<Tuple>) + 'static,
{
Self {
proc,
args,
on_result: Box::new(on_result),
}
}
}
type Queue = Mailbox<Request>;
pub struct PoolWorker {
// Despite instances are usually identified by `instance_id` in
......@@ -139,9 +158,9 @@ impl PoolWorker {
let mut client_ver: usize = 0;
let mut futures = VecDeque::new();
loop {
let messages = inbox.try_receive_n(max_concurrent_fut - futures.len());
// If there are no new messages and no messages are being sent - wait.
if messages.is_empty() && futures.is_empty() {
let requests = inbox.try_receive_n(max_concurrent_fut - futures.len());
// If there are no new requests and no requests are being sent - wait.
if requests.is_empty() && futures.is_empty() {
inbox_ready
.changed()
.await
......@@ -149,15 +168,18 @@ impl PoolWorker {
continue;
}
// Generate futures for new messages.
for (callback, proc, request) in messages {
// Generate futures for new requests.
for request in requests {
let client = client.clone();
futures.push_back((
client_ver,
callback,
Box::pin(
async move { client.call(proc, &request).timeout(call_timeout).await },
),
request.on_result,
Box::pin(async move {
client
.call(request.proc, &request.args)
.timeout(call_timeout)
.await
}),
));
}
......@@ -170,7 +192,7 @@ impl PoolWorker {
while cursor < futures.len() {
let poll_result = Future::poll(futures[cursor].2.as_mut(), cx);
if let Poll::Ready(result) = poll_result {
let (client_ver, callback, _) = futures.remove(cursor).unwrap();
let (client_ver, on_result, _) = futures.remove(cursor).unwrap();
if let Err(timeout::Error::Failed(network::Error::Protocol(_))) | Ok(_) =
result
{
......@@ -184,7 +206,7 @@ impl PoolWorker {
None => highest_client_ver = Some(client_ver),
}
}
callback(result.map_err(Error::from));
on_result(result.map_err(Error::from));
has_ready = true;
} else {
cursor += 1;
......@@ -220,7 +242,7 @@ impl PoolWorker {
),
};
self.inbox
.send((Box::new(on_result), self.handler_name, args));
.send(Request::new(self.handler_name, args, on_result));
if self.inbox_ready.send(()).is_err() {
tlog!(Warning, "failed sending request to peer, worker loop receiver dropped";
"raft_id" => raft_id,
......@@ -236,7 +258,7 @@ impl PoolWorker {
/// - in case peer was disconnected
/// - in case response failed to deserialize
/// - in case peer responded with an error
pub fn rpc<R>(&mut self, request: &R, cb: impl FnOnce(Result<R::Response>) + 'static)
pub fn rpc<R>(&self, request: &R, cb: impl FnOnce(Result<R::Response>) + 'static)
where
R: rpc::RequestArgs,
{
......@@ -248,11 +270,10 @@ impl PoolWorker {
let ((res,),) = tuple.decode()?;
Ok(res)
};
self.inbox.send((
Box::new(move |res| cb(convert_result(res))),
R::PROC_NAME,
args,
));
self.inbox
.send(Request::new(R::PROC_NAME, args, move |res| {
cb(convert_result(res))
}));
if self.inbox_ready.send(()).is_err() {
tlog!(
Warning,
......@@ -269,7 +290,7 @@ impl PoolWorker {
/// - in case response failed to deserialize
/// - in case peer responded with an error
pub fn rpc_raw<Args, Response>(
&mut self,
&self,
proc: &'static str,
args: &Args,
cb: impl FnOnce(Result<Response>) + 'static,
......@@ -286,7 +307,7 @@ impl PoolWorker {
Ok(res)
};
self.inbox
.send((Box::new(move |res| cb(convert_result(res))), proc, args));
.send(Request::new(proc, args, move |res| cb(convert_result(res))));
if self.inbox_ready.send(()).is_err() {
tlog!(
Warning,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment