Skip to content
Snippets Groups Projects
Commit 4d3116b0 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix(discovery): fix hanging if some peers don't respond

Previously the discovery algorithm would try to reach each known peer
sequentially requiring each consequent request to succeed until the next
one can be attempted. This would not work in some cases (see test in
previous commit).

So the new algorithm instead makes a single attempt to reach each peer
within a round, and if some failed they're retried in the next round of
requests. This allows overall discovery to succeed in cases when some
of the initial peers never respond.

Closes #54
parent 39498cb6
No related branches found
No related tags found
1 merge request!127fix(discovery): fix hanging if some peers don't respond
Pipeline #6248 passed
use ::tarantool::fiber::{mutex::MutexGuard, Mutex};
use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex};
use ::tarantool::proc;
use ::tarantool::uuid::Uuid;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::collections::BTreeSet;
use std::error::Error as StdError;
use std::time::{Duration, Instant};
use crate::stringify_cfunc;
use crate::tarantool;
use crate::traft;
use crate::util::Either::{self, Left, Right};
type Address = String;
......@@ -53,7 +55,6 @@ pub type Response = State;
#[derive(Debug)]
pub struct Discovery {
// BTreeSet for determinism
pending_request: bool,
visited: BTreeSet<Address>,
address: Option<Address>,
state: State,
......@@ -65,7 +66,6 @@ impl Discovery {
let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect();
assert!(!peers.is_empty(), "peers should not be empty");
Self {
pending_request: false,
visited: [].into(),
address: None,
state: State::LeaderElection(LeaderElection {
......@@ -99,20 +99,18 @@ impl Discovery {
&self.state
}
fn handle_response(&mut self, response: Response) {
fn handle_response(&mut self, from: Address, response: Response) {
self.visited.insert(from);
match (&mut self.state, response) {
(
State::LeaderElection(LeaderElection { peers, .. }),
Response::LeaderElection(LeaderElection {
peers: response_peers,
..
}),
Response::LeaderElection(response),
) => {
if !response_peers.is_subset(peers) {
if !response.peers.is_subset(peers) {
// found a new peer
self.visited.clear()
}
peers.extend(response_peers);
peers.extend(response.peers);
if let Some(address) = &self.address {
if peers.is_subset(&self.visited)
......@@ -139,30 +137,23 @@ impl Discovery {
}
(State::Done(_), _) => {}
}
self.pending_request = false;
}
fn next(&mut self) -> Option<(Request, Address)> {
if self.pending_request {
return None;
}
fn next_or_role(&self) -> Either<(Request, Vec<Address>), Role> {
match &self.state {
State::LeaderElection(le @ LeaderElection { peers, .. }) => {
if self.pending_request {
return None;
State::LeaderElection(election) => {
let mut next_peers = election
.peers
.difference(&self.visited)
.cloned()
.collect::<Vec<_>>();
if next_peers.is_empty() {
next_peers.extend(election.peers.iter().next().cloned())
}
let res = peers.difference(&self.visited).next().cloned();
let addr = match &res {
Some(addr) => {
self.visited.insert(addr.clone());
addr
}
None => peers.iter().next().unwrap(), // peers is not empty
};
self.pending_request = true;
Some((le.clone(), addr.clone()))
assert!(!next_peers.is_empty());
Left((election.clone(), next_peers))
}
State::Done(_) => None,
State::Done(role) => Right(role.clone()),
}
}
}
......@@ -179,19 +170,26 @@ pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) {
pub fn wait_global() -> Role {
loop {
let mut d = discovery().expect("discovery uninitialized");
if let State::Done(role) = &d.state {
return role.clone();
}
let step = d.next();
let d = discovery().expect("discovery uninitialized");
let (request, curr_peers) = match d.next_or_role() {
Left(l) => l,
Right(role) => break role,
};
drop(d); // release the lock before doing i/o
if let Some((request, address)) = &step {
let fn_name = stringify_cfunc!(proc_discover);
let response = tarantool::net_box_call_retry(address, fn_name, &(request, address));
discovery()
.expect("discovery uninitialized")
.handle_response(response);
let round_start = Instant::now();
for address in curr_peers {
if let Some(response) = tarantool::net_box_call_or_log(
&address,
stringify_cfunc!(proc_discover),
(&request, &address),
Duration::from_secs(2),
) {
discovery()
.expect("discovery deinitialized")
.handle_response(address, response)
}
}
sleep(Duration::from_millis(200).saturating_sub(round_start.elapsed()))
}
}
......@@ -225,7 +223,7 @@ fn proc_discover<'a>(
#[cfg(test)]
mod tests {
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};
use super::*;
use rand::prelude::*;
......@@ -238,34 +236,47 @@ mod tests {
let mut done = HashMap::<Address, Role>::new();
let len = instances.len();
let addrs = instances.keys().cloned().collect_vec();
let mut pending_requests: HashMap<_, _> = addrs
.iter()
.cloned()
.zip(std::iter::repeat(HashSet::new()))
.collect();
let mut rng = rand::thread_rng();
#[derive(Debug)]
enum Event {
Request(Address, Request, Address),
Response(Response, Address),
Response(Address, Response, Address),
}
let mut network: Vec<Event> = [].into();
while done.len() != len {
if rng.gen_bool(0.5) {
let addr = addrs.choose(&mut rng).unwrap();
let discovery = instances.get_mut(addr).unwrap();
if let Some((request, peer_addr)) = discovery.next() {
network.push(Event::Request(addr.clone(), request, peer_addr));
let src = addrs.choose(&mut rng).unwrap();
if !pending_requests.get(src).unwrap().is_empty() {
continue;
}
let discovery = instances.get_mut(src).unwrap();
if let Left((request, peer_addrs)) = discovery.next_or_role() {
for dst in peer_addrs {
pending_requests.get_mut(src).unwrap().insert(dst.clone());
network.push(Event::Request(src.clone(), request.clone(), dst))
}
}
} else {
match network.pop() {
match dbg!(network.pop()) {
Some(Event::Request(src, request, dst)) => {
let peer = instances.get_mut(&dst).unwrap();
let response = peer.handle_request(request, dst).clone();
network.push(Event::Response(response, src))
let response = peer.handle_request(request, dst.clone()).clone();
network.push(Event::Response(dst, response, src))
}
Some(Event::Response(response, dst)) => {
Some(Event::Response(src, response, dst)) => {
let peer = instances.get_mut(&dst).unwrap();
peer.handle_response(response);
if let State::Done(role) = &instances.get_mut(&dst).unwrap().state {
pending_requests.get_mut(&dst).unwrap().remove(&src);
peer.handle_response(src, response);
if let State::Done(role) = &peer.state {
done.insert(dst.clone(), role.clone());
}
}
......
......@@ -24,6 +24,7 @@ mod mailbox;
mod tarantool;
mod tlog;
mod traft;
mod util;
inventory::collect!(InnerTest);
......
......@@ -145,6 +145,7 @@ pub fn set_cfg(cfg: &Cfg) {
box_cfg.call_with_args(cfg).unwrap()
}
#[track_caller]
pub fn eval(code: &str) {
let l = lua_state();
l.exec(code).unwrap()
......@@ -204,3 +205,27 @@ where
}
}
}
#[inline]
pub fn net_box_call_or_log<Args, Res, Addr>(
address: Addr,
fn_name: &str,
args: Args,
timeout: Duration,
) -> Option<Res>
where
Args: AsTuple,
Addr: std::net::ToSocketAddrs + std::fmt::Display + slog::Value,
Res: serde::de::DeserializeOwned,
{
match net_box_call(&address, fn_name, &args, timeout) {
Ok(res) => Some(res),
Err(e) => {
crate::tlog!(Warning, "net_box_call failed: {e}";
"peer" => &address,
"fn" => fn_name,
);
None
}
}
}
#[derive(Debug, PartialEq, Eq, Hash)]
pub enum Either<L, R> {
Left(L),
Right(R),
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment