From 09590071fafc57f9c2b8cd7f06689608240314cf Mon Sep 17 00:00:00 2001 From: Egor Ivkov <e.o.ivkov@gmail.com> Date: Tue, 22 Aug 2023 17:54:40 +0300 Subject: [PATCH] fix: speed up instance join --- src/rpc/join.rs | 2 ++ src/rpc/update_instance.rs | 2 ++ src/traft/node.rs | 5 +++++ test/manual/test_scaling.py | 5 +++-- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/src/rpc/join.rs b/src/rpc/join.rs index 1f2876d1a8..fecdca2dc3 100644 --- a/src/rpc/join.rs +++ b/src/rpc/join.rs @@ -66,6 +66,7 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R let cluster_id = node.raft_storage.cluster_id()?; let storage = &node.storage; let raft_storage = &node.raft_storage; + let guard = node.instances_update.lock(); if req.cluster_id != cluster_id { return Err(Error::ClusterIdMismatch { @@ -147,6 +148,7 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R // A joined instance needs to communicate with other nodes. // TODO: limit the number of entries sent to reduce response size. let peer_addresses = node.storage.peer_addresses.iter()?.collect(); + drop(guard); return Ok(Response { instance: instance.into(), peer_addresses, diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs index d109f450cf..0b2671c999 100644 --- a/src/rpc/update_instance.rs +++ b/src/rpc/update_instance.rs @@ -87,6 +87,7 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) let cluster_id = node.raft_storage.cluster_id()?; let storage = &node.storage; let raft_storage = &node.raft_storage; + let guard = node.instances_update.lock(); if req.cluster_id != cluster_id { return Err(Error::ClusterIdMismatch { @@ -136,6 +137,7 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) } } node.main_loop.wakeup(); + drop(guard); return Ok(()); } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 1b6403abf1..c8a3798622 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -143,6 +143,10 @@ pub struct Node { pub(crate) governor_loop: governor::Loop, status: watch::Receiver<Status>, watchers: Rc<Mutex<StorageWatchers>>, + + /// Should be locked during join and update instance request + /// to avoid costly cas conflicts during concurrent requests. + pub instances_update: Mutex<()>, } impl std::fmt::Debug for Node { @@ -187,6 +191,7 @@ impl Node { raft_storage, status, watchers, + instances_update: Mutex::new(()), }; // Wait for the node to enter the main loop diff --git a/test/manual/test_scaling.py b/test/manual/test_scaling.py index dce9500915..d90ea40b9e 100644 --- a/test/manual/test_scaling.py +++ b/test/manual/test_scaling.py @@ -1,7 +1,8 @@ +# mypy: disable-error-code="import" import funcy # type: ignore import time -from matplotlib import pyplot # type: ignore -import matplotlib # type: ignore +from matplotlib import pyplot +import matplotlib from conftest import ( Cluster, -- GitLab