From 4d3116b0a5ce13bd56ed2aade25a677f40c5830f Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 31 May 2022 17:57:40 +0300 Subject: [PATCH] fix(discovery): fix hanging if some peers don't respond Previously the discovery algorithm would try to reach each known peer sequentially, requiring each request to succeed before the next one could be attempted. This would not work in some cases (see test in previous commit). So the new algorithm instead makes a single attempt to reach each peer within a round, and if some failed they're retried in the next round of requests. This allows overall discovery to succeed in cases when some of the initial peers never respond. Closes #54 --- src/discovery.rs | 115 ++++++++++++++++++++++++++--------------------- src/main.rs | 1 + src/tarantool.rs | 25 +++++++++++ src/util.rs | 5 +++ 4 files changed, 94 insertions(+), 52 deletions(-) create mode 100644 src/util.rs diff --git a/src/discovery.rs b/src/discovery.rs index d023f89b78..a4c0e53622 100644 --- a/src/discovery.rs +++ b/src/discovery.rs @@ -1,14 +1,16 @@ -use ::tarantool::fiber::{mutex::MutexGuard, Mutex}; +use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex}; use ::tarantool::proc; use ::tarantool::uuid::Uuid; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::BTreeSet; use std::error::Error as StdError; +use std::time::{Duration, Instant}; use crate::stringify_cfunc; use crate::tarantool; use crate::traft; +use crate::util::Either::{self, Left, Right}; type Address = String; @@ -53,7 +55,6 @@ pub type Response = State; #[derive(Debug)] pub struct Discovery { // BTreeSet for determinism - pending_request: bool, visited: BTreeSet<Address>, address: Option<Address>, state: State, @@ -65,7 +66,6 @@ impl Discovery { let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect(); assert!(!peers.is_empty(), "peers should not be empty"); Self { - pending_request: false, visited: [].into(), address: 
None, state: State::LeaderElection(LeaderElection { @@ -99,20 +99,18 @@ impl Discovery { &self.state } - fn handle_response(&mut self, response: Response) { + fn handle_response(&mut self, from: Address, response: Response) { + self.visited.insert(from); match (&mut self.state, response) { ( State::LeaderElection(LeaderElection { peers, .. }), - Response::LeaderElection(LeaderElection { - peers: response_peers, - .. - }), + Response::LeaderElection(response), ) => { - if !response_peers.is_subset(peers) { + if !response.peers.is_subset(peers) { // found a new peer self.visited.clear() } - peers.extend(response_peers); + peers.extend(response.peers); if let Some(address) = &self.address { if peers.is_subset(&self.visited) @@ -139,30 +137,23 @@ impl Discovery { } (State::Done(_), _) => {} } - self.pending_request = false; } - fn next(&mut self) -> Option<(Request, Address)> { - if self.pending_request { - return None; - } + fn next_or_role(&self) -> Either<(Request, Vec<Address>), Role> { match &self.state { - State::LeaderElection(le @ LeaderElection { peers, .. 
}) => { - if self.pending_request { - return None; + State::LeaderElection(election) => { + let mut next_peers = election + .peers + .difference(&self.visited) + .cloned() + .collect::<Vec<_>>(); + if next_peers.is_empty() { + next_peers.extend(election.peers.iter().next().cloned()) } - let res = peers.difference(&self.visited).next().cloned(); - let addr = match &res { - Some(addr) => { - self.visited.insert(addr.clone()); - addr - } - None => peers.iter().next().unwrap(), // peers is not empty - }; - self.pending_request = true; - Some((le.clone(), addr.clone())) + assert!(!next_peers.is_empty()); + Left((election.clone(), next_peers)) } - State::Done(_) => None, + State::Done(role) => Right(role.clone()), } } } @@ -179,19 +170,26 @@ pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) { pub fn wait_global() -> Role { loop { - let mut d = discovery().expect("discovery uninitialized"); - if let State::Done(role) = &d.state { - return role.clone(); - } - let step = d.next(); + let d = discovery().expect("discovery uninitialized"); + let (request, curr_peers) = match d.next_or_role() { + Left(l) => l, + Right(role) => break role, + }; drop(d); // release the lock before doing i/o - if let Some((request, address)) = &step { - let fn_name = stringify_cfunc!(proc_discover); - let response = tarantool::net_box_call_retry(address, fn_name, &(request, address)); - discovery() - .expect("discovery uninitialized") - .handle_response(response); + let round_start = Instant::now(); + for address in curr_peers { + if let Some(response) = tarantool::net_box_call_or_log( + &address, + stringify_cfunc!(proc_discover), + (&request, &address), + Duration::from_secs(2), + ) { + discovery() + .expect("discovery deinitialized") + .handle_response(address, response) + } } + sleep(Duration::from_millis(200).saturating_sub(round_start.elapsed())) } } @@ -225,7 +223,7 @@ fn proc_discover<'a>( #[cfg(test)] mod tests { use itertools::Itertools; - use 
std::collections::{BTreeMap, HashMap}; + use std::collections::{BTreeMap, HashMap, HashSet}; use super::*; use rand::prelude::*; @@ -238,34 +236,47 @@ mod tests { let mut done = HashMap::<Address, Role>::new(); let len = instances.len(); let addrs = instances.keys().cloned().collect_vec(); + let mut pending_requests: HashMap<_, _> = addrs + .iter() + .cloned() + .zip(std::iter::repeat(HashSet::new())) + .collect(); let mut rng = rand::thread_rng(); + #[derive(Debug)] enum Event { Request(Address, Request, Address), - Response(Response, Address), + Response(Address, Response, Address), } let mut network: Vec<Event> = [].into(); while done.len() != len { if rng.gen_bool(0.5) { - let addr = addrs.choose(&mut rng).unwrap(); - let discovery = instances.get_mut(addr).unwrap(); - if let Some((request, peer_addr)) = discovery.next() { - network.push(Event::Request(addr.clone(), request, peer_addr)); + let src = addrs.choose(&mut rng).unwrap(); + if !pending_requests.get(src).unwrap().is_empty() { + continue; + } + let discovery = instances.get_mut(src).unwrap(); + if let Left((request, peer_addrs)) = discovery.next_or_role() { + for dst in peer_addrs { + pending_requests.get_mut(src).unwrap().insert(dst.clone()); + network.push(Event::Request(src.clone(), request.clone(), dst)) + } } } else { - match network.pop() { + match dbg!(network.pop()) { Some(Event::Request(src, request, dst)) => { let peer = instances.get_mut(&dst).unwrap(); - let response = peer.handle_request(request, dst).clone(); - network.push(Event::Response(response, src)) + let response = peer.handle_request(request, dst.clone()).clone(); + network.push(Event::Response(dst, response, src)) } - Some(Event::Response(response, dst)) => { + Some(Event::Response(src, response, dst)) => { let peer = instances.get_mut(&dst).unwrap(); - peer.handle_response(response); - if let State::Done(role) = &instances.get_mut(&dst).unwrap().state { + pending_requests.get_mut(&dst).unwrap().remove(&src); + 
peer.handle_response(src, response); + if let State::Done(role) = &peer.state { done.insert(dst.clone(), role.clone()); } } diff --git a/src/main.rs b/src/main.rs index 356cd60ba2..4615c1e78d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -24,6 +24,7 @@ mod mailbox; mod tarantool; mod tlog; mod traft; +mod util; inventory::collect!(InnerTest); diff --git a/src/tarantool.rs b/src/tarantool.rs index 2b16613d79..49f24157b5 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -145,6 +145,7 @@ pub fn set_cfg(cfg: &Cfg) { box_cfg.call_with_args(cfg).unwrap() } +#[track_caller] pub fn eval(code: &str) { let l = lua_state(); l.exec(code).unwrap() @@ -204,3 +205,27 @@ where } } } + +#[inline] +pub fn net_box_call_or_log<Args, Res, Addr>( + address: Addr, + fn_name: &str, + args: Args, + timeout: Duration, +) -> Option<Res> +where + Args: AsTuple, + Addr: std::net::ToSocketAddrs + std::fmt::Display + slog::Value, + Res: serde::de::DeserializeOwned, +{ + match net_box_call(&address, fn_name, &args, timeout) { + Ok(res) => Some(res), + Err(e) => { + crate::tlog!(Warning, "net_box_call failed: {e}"; + "peer" => &address, + "fn" => fn_name, + ); + None + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000000..fcffbf25e7 --- /dev/null +++ b/src/util.rs @@ -0,0 +1,5 @@ +#[derive(Debug, PartialEq, Eq, Hash)] +pub enum Either<L, R> { + Left(L), + Right(R), +} -- GitLab