Skip to content
Snippets Groups Projects
Commit 93309ba7 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor(network): remove ConnectionPool::call_and_wait[_timeout]

parent 11b8766a
No related branches found
No related tags found
1 merge request!424refactor(network): async fn ConnectionPool::call
Pipeline #14263 passed
......@@ -217,7 +217,7 @@ impl Loop {
return Continue;
}
let res = (|| -> Result<_> {
let res: Result<_> = async {
// Promote the replication leader again
// because of tarantool bugs
if let Some(replicaset) = storage.replicasets.get(replicaset_id)? {
......@@ -226,19 +226,21 @@ impl Loop {
"instance_id" => %replicaset.master_id
);
let commit = raft_storage.commit()?.unwrap();
pool.call_and_wait_timeout(
pool.call(
&replicaset.master_id,
&replication::promote::Request {
term,
commit,
timeout: Self::SYNC_TIMEOUT,
},
// TODO: don't hard code timeout
Duration::from_secs(3),
)?;
)?
// TODO: don't hard code timeout
.timeout(Duration::from_secs(3))
.await??;
}
Ok(())
})();
}
.await;
if let Err(e) = res {
tlog!(Warning,
"failed calling rpc::replication::promote: {e}";
......@@ -332,8 +334,10 @@ impl Loop {
timeout: Self::SYNC_TIMEOUT,
};
let replication::promote::Response {} = pool
.call(instance_id, &req)?
// TODO: don't hard code the timeout
.call_and_wait_timeout(instance_id, &req, Duration::from_secs(3))?;
.timeout(Duration::from_secs(3))
.await??;
Ok(())
}
.await;
......@@ -613,35 +617,38 @@ impl Loop {
timeout: Self::SYNC_TIMEOUT,
migration_id: migration.id,
};
let res = pool.call_and_wait(&instance.raft_id, &req);
match res {
Ok(_) => {
let mut ops = UpdateOps::new();
ops.assign("current_schema_version", migration.id).unwrap();
let op = OpDML::update(
ClusterwideSpace::Replicaset,
&[replicaset.replicaset_id.clone()],
ops,
)
.unwrap();
node.propose_and_wait(op, Duration::MAX).unwrap().unwrap();
tlog!(
Info,
"Migration {0} applied to replicaset {1}",
migration.id,
replicaset.replicaset_id
);
}
Err(e) => {
tlog!(
Warning,
"Could not apply migration {0} to replicaset {1}, error: {2}",
migration.id,
replicaset.replicaset_id,
e
);
return Continue;
}
let res: Result<_> = async {
let rpc::migration::apply::Response {} = pool
.call(&instance.raft_id, &req)?
// TODO: don't hard code timeout
.timeout(Duration::from_secs(3))
.await??;
let mut ops = UpdateOps::new();
ops.assign("current_schema_version", migration.id)?;
let op = OpDML::update(
ClusterwideSpace::Replicaset,
&[replicaset.replicaset_id.clone()],
ops,
)?;
node.propose_and_wait(op, Duration::MAX)??;
tlog!(
Info,
"Migration {0} applied to replicaset {1}",
migration.id,
replicaset.replicaset_id
);
Ok(())
}
.await;
if let Err(e) = res {
tlog!(
Warning,
"Could not apply migration {0} to replicaset {1}, error: {2}",
migration.id,
replicaset.replicaset_id,
e
);
return Continue;
}
}
}
......
......@@ -464,43 +464,6 @@ impl ConnectionPool {
self.get_or_create_by_raft_id(msg.to)?.send(msg)
}
/// Send a request to instance with `id` (see `IdOfInstance`) and wait for the result.
///
/// If the request failed, it's a responsibility of the caller
/// to re-send it later.
///
/// **This function yields.**
#[allow(dead_code)]
pub fn call_and_wait_timeout<R>(
&mut self,
id: &impl IdOfInstance,
req: &R,
timeout: Duration,
) -> Result<R::Response>
where
R: Request,
{
let (rx, tx) = fiber::Channel::new(1).into_clones();
id.get_or_create_in(self)?
.rpc(req, move |res| tx.send(res).unwrap());
rx.recv_timeout(timeout).map_err(|_| Error::Timeout)?
}
/// Send a request to instance with `id` (see `InstanceId`) and wait for the result.
///
/// If the request failed, it's a responsibility of the caller
/// to re-send it later.
///
/// **This function yields.**
#[allow(dead_code)]
#[inline(always)]
pub fn call_and_wait<R>(&mut self, id: &impl IdOfInstance, req: &R) -> Result<R::Response>
where
R: Request,
{
self.call_and_wait_timeout(id, req, Duration::MAX)
}
/// Send a request to instance with `id` (see `IdOfInstance`) returning a
/// future.
///
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment