Skip to content
Snippets Groups Projects

fix(discovery): fix hanging if some peers don't respond

Merged Georgy Moshkin requested to merge fix/parallel-discovery into master
Files
5
+ 62
52
use ::tarantool::fiber::{mutex::MutexGuard, Mutex};
use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex};
use ::tarantool::proc;
use ::tarantool::uuid::Uuid;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::error::Error as StdError;
use std::time::{Duration, Instant};
use crate::stringify_cfunc;
use crate::tarantool;
use crate::traft;
use crate::unwrap_or;
type Address = String;
@@ -53,7 +55,6 @@ pub type Response = State;
#[derive(Debug)]
pub struct Discovery {
// BTreeSet for determinism
pending_request: bool,
visited: BTreeSet<Address>,
address: Option<Address>,
state: State,
@@ -65,7 +66,6 @@ impl Discovery {
let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect();
assert!(!peers.is_empty(), "peers should not be empty");
Self {
pending_request: false,
visited: [].into(),
address: None,
state: State::LeaderElection(LeaderElection {
@@ -99,20 +99,18 @@ impl Discovery {
&self.state
}
fn handle_response(&mut self, response: Response) {
fn handle_response(&mut self, from: Address, response: Response) {
self.visited.insert(from);
match (&mut self.state, response) {
(
State::LeaderElection(LeaderElection { peers, .. }),
Response::LeaderElection(LeaderElection {
peers: response_peers,
..
}),
Response::LeaderElection(response),
) => {
if !response_peers.is_subset(peers) {
if !response.peers.is_subset(peers) {
// found a new peer
self.visited.clear()
}
peers.extend(response_peers);
peers.extend(response.peers);
if let Some(address) = &self.address {
if peers.is_subset(&self.visited)
@@ -139,30 +137,23 @@ impl Discovery {
}
(State::Done(_), _) => {}
}
self.pending_request = false;
}
fn next(&mut self) -> Option<(Request, Address)> {
if self.pending_request {
return None;
}
fn next_or_role(&self) -> Result<(Request, Vec<Address>), Role> {
match &self.state {
State::LeaderElection(le @ LeaderElection { peers, .. }) => {
if self.pending_request {
return None;
State::LeaderElection(election) => {
let mut next_peers = election
.peers
.difference(&self.visited)
.cloned()
.collect::<Vec<_>>();
if next_peers.is_empty() {
next_peers.extend(election.peers.iter().next().cloned())
}
let res = peers.difference(&self.visited).next().cloned();
let addr = match &res {
Some(addr) => {
self.visited.insert(addr.clone());
addr
}
None => peers.iter().next().unwrap(), // peers is not empty
};
self.pending_request = true;
Some((le.clone(), addr.clone()))
assert!(!next_peers.is_empty());
Ok((election.clone(), next_peers))
}
State::Done(_) => None,
State::Done(role) => Err(role.clone()),
}
}
}
@@ -177,21 +168,27 @@ pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) {
unsafe { DISCOVERY = Some(Box::new(Mutex::new(d))) }
}
const ROUND_TIMEOUT: Duration = Duration::from_millis(200);
pub fn wait_global() -> Role {
loop {
let mut d = discovery().expect("discovery uninitialized");
if let State::Done(role) = &d.state {
return role.clone();
}
let step = d.next();
let d = discovery().expect("discovery uninitialized");
let (request, curr_peers) = unwrap_or!(d.next_or_role(), Err(role) => break role);
drop(d); // release the lock before doing i/o
if let Some((request, address)) = &step {
let fn_name = stringify_cfunc!(proc_discover);
let response = tarantool::net_box_call_retry(address, fn_name, &(request, address));
discovery()
.expect("discovery uninitialized")
.handle_response(response);
let round_start = Instant::now();
for address in curr_peers {
if let Some(response) = tarantool::net_box_call_or_log(
&address,
stringify_cfunc!(proc_discover),
(&request, &address),
ROUND_TIMEOUT,
) {
discovery()
.expect("discovery deinitialized")
.handle_response(address, response)
}
}
sleep(ROUND_TIMEOUT.saturating_sub(round_start.elapsed()));
}
}
@@ -225,7 +222,7 @@ fn proc_discover<'a>(
#[cfg(test)]
mod tests {
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use super::*;
use rand::prelude::*;
@@ -238,34 +235,47 @@ mod tests {
let mut done = HashMap::<Address, Role>::new();
let len = instances.len();
let addrs = instances.keys().cloned().collect_vec();
let mut pending_requests: HashMap<_, _> = addrs
.iter()
.cloned()
.zip(std::iter::repeat(HashSet::new()))
.collect();
let mut rng = rand::thread_rng();
#[derive(Debug)]
enum Event {
Request(Address, Request, Address),
Response(Response, Address),
Response(Address, Response, Address),
}
let mut network: Vec<Event> = [].into();
while done.len() != len {
if rng.gen_bool(0.5) {
let addr = addrs.choose(&mut rng).unwrap();
let discovery = instances.get_mut(addr).unwrap();
if let Some((request, peer_addr)) = discovery.next() {
network.push(Event::Request(addr.clone(), request, peer_addr));
let src = addrs.choose(&mut rng).unwrap();
if !pending_requests.get(src).unwrap().is_empty() {
continue;
}
let discovery = instances.get_mut(src).unwrap();
if let Ok((request, peer_addrs)) = discovery.next_or_role() {
for dst in peer_addrs {
pending_requests.get_mut(src).unwrap().insert(dst.clone());
network.push(Event::Request(src.clone(), request.clone(), dst))
}
}
} else {
match network.pop() {
match dbg!(network.pop()) {
Some(Event::Request(src, request, dst)) => {
let peer = instances.get_mut(&dst).unwrap();
let response = peer.handle_request(request, dst).clone();
network.push(Event::Response(response, src))
let response = peer.handle_request(request, dst.clone()).clone();
network.push(Event::Response(dst, response, src))
}
Some(Event::Response(response, dst)) => {
Some(Event::Response(src, response, dst)) => {
let peer = instances.get_mut(&dst).unwrap();
peer.handle_response(response);
if let State::Done(role) = &instances.get_mut(&dst).unwrap().state {
pending_requests.get_mut(&dst).unwrap().remove(&src);
peer.handle_response(src, response);
if let State::Done(role) = &peer.state {
done.insert(dst.clone(), role.clone());
}
}
Loading