diff --git a/src/discovery.rs b/src/discovery.rs index d023f89b788d9deb717516667cae0b6a6369a0ad..a4c0e5362249013458dd4275cba01f00ffd02363 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,14 +1,16 @@ -use ::tarantool::fiber::{mutex::MutexGuard, Mutex}; +use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex}; use ::tarantool::proc; use ::tarantool::uuid::Uuid; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::BTreeSet; use std::error::Error as StdError; +use std::time::{Duration, Instant}; use crate::stringify_cfunc; use crate::tarantool; use crate::traft; +use crate::util::Either::{self, Left, Right}; type Address = String; @@ -53,7 +55,6 @@ pub type Response = State; #[derive(Debug)] pub struct Discovery { // BTreeSet for determinism - pending_request: bool, visited: BTreeSet<Address>, address: Option<Address>, state: State, @@ -65,7 +66,6 @@ impl Discovery { let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect(); assert!(!peers.is_empty(), "peers should not be empty"); Self { - pending_request: false, visited: [].into(), address: None, state: State::LeaderElection(LeaderElection { @@ -99,20 +99,18 @@ impl Discovery { &self.state } - fn handle_response(&mut self, response: Response) { + fn handle_response(&mut self, from: Address, response: Response) { + self.visited.insert(from); match (&mut self.state, response) { ( State::LeaderElection(LeaderElection { peers, .. }), - Response::LeaderElection(LeaderElection { - peers: response_peers, - .. - }), + Response::LeaderElection(response), ) => { - if !response_peers.is_subset(peers) { + if !response.peers.is_subset(peers) { // found a new peer self.visited.clear() } - peers.extend(response_peers); + peers.extend(response.peers); if let Some(address) = &self.address { if peers.is_subset(&self.visited) @@ -139,30 +137,23 @@ impl Discovery { } (State::Done(_), _) => {} } - self.pending_request = false; } - fn next(&mut self) -> Option<(Request, Address)> { - if self.pending_request { - return None; - } + fn next_or_role(&self) -> Either<(Request, Vec<Address>), Role> { match &self.state { - State::LeaderElection(le @ LeaderElection { peers, .. }) => { - if self.pending_request { - return None; + State::LeaderElection(election) => { + let mut next_peers = election + .peers + .difference(&self.visited) + .cloned() + .collect::<Vec<_>>(); + if next_peers.is_empty() { + next_peers.extend(election.peers.iter().next().cloned()) } - let res = peers.difference(&self.visited).next().cloned(); - let addr = match &res { - Some(addr) => { - self.visited.insert(addr.clone()); - addr - } - None => peers.iter().next().unwrap(), // peers is not empty - }; - self.pending_request = true; - Some((le.clone(), addr.clone())) + assert!(!next_peers.is_empty()); + Left((election.clone(), next_peers)) } - State::Done(_) => None, + State::Done(role) => Right(role.clone()), } } } @@ -179,19 +170,26 @@ pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) { pub fn wait_global() -> Role { loop { - let mut d = discovery().expect("discovery uninitialized"); - if let State::Done(role) = &d.state { - return role.clone(); - } - let step = d.next(); + let d = discovery().expect("discovery uninitialized"); + let (request, curr_peers) = match d.next_or_role() { + Left(l) => l, + Right(role) => break role, + }; drop(d); // release the lock before doing i/o - if let Some((request, address)) = &step { - let fn_name = stringify_cfunc!(proc_discover); - let response = tarantool::net_box_call_retry(address, fn_name, &(request, address)); - discovery() - .expect("discovery uninitialized") - .handle_response(response); + let round_start = Instant::now(); + for address in curr_peers { + if let Some(response) = tarantool::net_box_call_or_log( + &address, + stringify_cfunc!(proc_discover), + (&request, &address), + Duration::from_secs(2), + ) { + discovery() + .expect("discovery deinitialized") + .handle_response(address, response) + } } + sleep(Duration::from_millis(200).saturating_sub(round_start.elapsed())) } } @@ -225,7 +223,7 @@ fn proc_discover<'a>( #[cfg(test)] mod tests { use itertools::Itertools; - use std::collections::{BTreeMap, HashMap}; + use std::collections::{BTreeMap, HashMap, HashSet}; use super::*; use rand::prelude::*; @@ -238,34 +236,47 @@ mod tests { let mut done = HashMap::<Address, Role>::new(); let len = instances.len(); let addrs = instances.keys().cloned().collect_vec(); + let mut pending_requests: HashMap<_, _> = addrs + .iter() + .cloned() + .zip(std::iter::repeat(HashSet::new())) + .collect(); let mut rng = rand::thread_rng(); + #[derive(Debug)] enum Event { Request(Address, Request, Address), - Response(Response, Address), + Response(Address, Response, Address), } let mut network: Vec<Event> = [].into(); while done.len() != len { if rng.gen_bool(0.5) { - let addr = addrs.choose(&mut rng).unwrap(); - let discovery = instances.get_mut(addr).unwrap(); - if let Some((request, peer_addr)) = discovery.next() { - network.push(Event::Request(addr.clone(), request, peer_addr)); + let src = addrs.choose(&mut rng).unwrap(); + if !pending_requests.get(src).unwrap().is_empty() { + continue; + } + let discovery = instances.get_mut(src).unwrap(); + if let Left((request, peer_addrs)) = discovery.next_or_role() { + for dst in peer_addrs { + pending_requests.get_mut(src).unwrap().insert(dst.clone()); + network.push(Event::Request(src.clone(), request.clone(), dst)) + } } } else { - match network.pop() { + match dbg!(network.pop()) { Some(Event::Request(src, request, dst)) => { let peer = instances.get_mut(&dst).unwrap(); - let response = peer.handle_request(request, dst).clone(); - network.push(Event::Response(response, src)) + let response = peer.handle_request(request, dst.clone()).clone(); + network.push(Event::Response(dst, response, src)) } - Some(Event::Response(response, dst)) => { + Some(Event::Response(src, response, dst)) => { let peer = instances.get_mut(&dst).unwrap(); - peer.handle_response(response); - if let State::Done(role) = &instances.get_mut(&dst).unwrap().state { + pending_requests.get_mut(&dst).unwrap().remove(&src); + peer.handle_response(src, response); + if let State::Done(role) = &peer.state { done.insert(dst.clone(), role.clone()); } } diff --git a/src/main.rs b/src/main.rs index 356cd60ba258efe70591c2f313c3cdf815f73b09..4615c1e78d3e259f526a5e0dcf24d3aa98e5f687 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,7 @@ mod mailbox; mod tarantool; mod tlog; mod traft; +mod util; inventory::collect!(InnerTest); diff --git a/src/tarantool.rs b/src/tarantool.rs index 2b16613d79408e814d888e8426f387936aa0a45c..49f24157b581c2b5283d2b16695344e56ea4fdbd 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -145,6 +145,7 @@ pub fn set_cfg(cfg: &Cfg) { box_cfg.call_with_args(cfg).unwrap() } +#[track_caller] pub fn eval(code: &str) { let l = lua_state(); l.exec(code).unwrap() @@ -204,3 +205,27 @@ where } } } + +#[inline] +pub fn net_box_call_or_log<Args, Res, Addr>( + address: Addr, + fn_name: &str, + args: Args, + timeout: Duration, +) -> Option<Res> +where + Args: AsTuple, + Addr: std::net::ToSocketAddrs + std::fmt::Display + slog::Value, + Res: serde::de::DeserializeOwned, +{ + match net_box_call(&address, fn_name, &args, timeout) { + Ok(res) => Some(res), + Err(e) => { + crate::tlog!(Warning, "net_box_call failed: {e}"; + "peer" => &address, + "fn" => fn_name, + ); + None + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000000000000000000000000000000000000..fcffbf25e75583926b56d5808253a2c91e12f1f4 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,5 @@ +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum Either<L, R> { + Left(L), + Right(R), +}