Skip to content
Snippets Groups Projects
discovery.rs 11.17 KiB
use ::tarantool::fiber::{mutex::MutexGuard, sleep, Mutex};
use ::tarantool::proc;
use ::tarantool::uuid::Uuid;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::error::Error as StdError;
use std::time::{Duration, Instant};

use crate::stringify_cfunc;
use crate::tarantool;
use crate::traft;
use crate::util::Either::{self, Left, Right};

type Address = String;

#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Role {
    Leader { address: Address },
    NonLeader { leader: Address },
}

impl Role {
    fn new(address: Address, is_leader: bool) -> Self {
        if is_leader {
            Self::Leader { address }
        } else {
            Self::NonLeader { leader: address }
        }
    }

    fn leader_address(&self) -> &Address {
        match self {
            Self::Leader { address } => address,
            Self::NonLeader { leader } => leader,
        }
    }
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LeaderElection {
    tmp_id: String,
    peers: BTreeSet<Address>,
}

#[derive(Clone, Debug, Serialize, Deserialize)]
pub enum State {
    LeaderElection(LeaderElection),
    Done(Role),
}

pub type Request = LeaderElection;
pub type Response = State;

#[derive(Debug)]
pub struct Discovery {
    // BTreeSet for determinism
    visited: BTreeSet<Address>,
    address: Option<Address>,
    state: State,
}

impl Discovery {
    fn new(tmp_id: impl Into<String>, peers: impl IntoIterator<Item = impl Into<Address>>) -> Self {
        // BTree and sorting for deterministic results and simpler asserts tests.
        let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect();
        assert!(!peers.is_empty(), "peers should not be empty");
        Self {
            visited: [].into(),
            address: None,
            state: State::LeaderElection(LeaderElection {
                tmp_id: tmp_id.into(),
                peers,
            }),
        }
    }

    fn handle_request(&mut self, request: Request, to: Address) -> &Response {
        match &mut self.state {
            State::Done(_) => {} // done we are
            State::LeaderElection(LeaderElection { tmp_id, peers }) => {
                if !request.peers.is_subset(peers) {
                    // found a new peer
                    self.visited.clear()
                }
                peers.extend(request.peers);

                if tmp_id == &request.tmp_id {
                    match &self.address {
                        Some(address) if address != &to => {
                            todo!("current peer is reachable by multiple addresses")
                        }
                        Some(_) => {}
                        None => self.address = Some(to),
                    };
                }
            }
        }
        &self.state
    }

    fn handle_response(&mut self, from: Address, response: Response) {
        self.visited.insert(from);
        match (&mut self.state, response) {
            (
                State::LeaderElection(LeaderElection { peers, .. }),
                Response::LeaderElection(response),
            ) => {
                if !response.peers.is_subset(peers) {
                    // found a new peer
                    self.visited.clear()
                }
                peers.extend(response.peers);

                if let Some(address) = &self.address {
                    if peers.is_subset(&self.visited)
                        && peers
                            .iter()
                            .next()
                            .expect("not expected peer_addresses to be empty")
                            == address
                    {
                        self.state = State::Done(Role::Leader {
                            address: address.clone(),
                        });
                        self.visited.clear();
                        self.address = None;
                    }
                }
            }
            (State::LeaderElection { .. }, Response::Done(role)) => {
                self.state = State::Done(Role::NonLeader {
                    leader: role.leader_address().into(),
                });
                self.visited.clear();
                self.address = None;
            }
            (State::Done(_), _) => {}
        }
    }
    fn next_or_role(&self) -> Either<(Request, Vec<Address>), Role> {
        match &self.state {
            State::LeaderElection(election) => {
                let mut next_peers = election
                    .peers
                    .difference(&self.visited)
                    .cloned()
                    .collect::<Vec<_>>();
                if next_peers.is_empty() {
                    next_peers.extend(election.peers.iter().next().cloned())
                }
                assert!(!next_peers.is_empty());
                Left((election.clone(), next_peers))
            }
            State::Done(role) => Right(role.clone()),
        }
    }
}
static mut DISCOVERY: Option<Box<Mutex<Discovery>>> = None;

fn discovery() -> Option<MutexGuard<'static, Discovery>> {
    unsafe { DISCOVERY.as_ref() }.map(|d| d.lock())
}

pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) {
    let d = Discovery::new(Uuid::random().to_string(), peers);
    unsafe { DISCOVERY = Some(Box::new(Mutex::new(d))) }
}

pub fn wait_global() -> Role {
    loop {
        let d = discovery().expect("discovery uninitialized");
        let (request, curr_peers) = match d.next_or_role() {
            Left(l) => l,
            Right(role) => break role,
        };
        drop(d); // release the lock before doing i/o
        let round_start = Instant::now();
        for address in curr_peers {
            if let Some(response) = tarantool::net_box_call_or_log(
                &address,
                stringify_cfunc!(proc_discover),
                (&request, &address),
                Duration::from_secs(2),
            ) {
                discovery()
                    .expect("discovery deinitialized")
                    .handle_response(address, response)
            }
        }
        sleep(Duration::from_millis(200).saturating_sub(round_start.elapsed()))
    }
}

#[proc]
fn proc_discover<'a>(request: Request, request_to: Address) -> Result<Response, Box<dyn StdError>> {
    let ready_ids = traft::node::global().ok().and_then(|node| {
        let status = node.status();
        status
            .leader_id
            .map(|leader_id| (&node.storage.peer_addresses, leader_id, status.id))
    });
    if let Some((peers_addresses, leader_id, id)) = ready_ids {
        let leader_address = peers_addresses.try_get(leader_id)?;
        Ok(Response::Done(Role::new(leader_address, leader_id == id)))
    } else {
        let mut discovery = discovery();
        let discovery = discovery.as_mut().ok_or("discovery uninitialized")?;
        Ok(discovery.handle_request(request, request_to).clone())
    }
}

#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, HashMap, HashSet};

    use super::*;
    use rand::prelude::*;

    fn run(
        instances: impl IntoIterator<Item = (impl Into<Address>, Discovery)>,
    ) -> HashMap<Address, Role> {
        let mut instances: BTreeMap<Address, Discovery> =
            instances.into_iter().map(|(k, v)| (k.into(), v)).collect();
        let mut done = HashMap::<Address, Role>::new();
        let len = instances.len();
        let addrs = instances.keys().cloned().collect::<Vec<_>>();
        let mut pending_requests: HashMap<_, _> = addrs
            .iter()
            .cloned()
            .zip(std::iter::repeat(HashSet::new()))
            .collect();

        let mut rng = rand::thread_rng();

        #[derive(Debug)]
        enum Event {
            Request(Address, Request, Address),
            Response(Address, Response, Address),
        }

        let mut network: Vec<Event> = [].into();

        while done.len() != len {
            if rng.gen_bool(0.5) {
                let src = addrs.choose(&mut rng).unwrap();
                if !pending_requests.get(src).unwrap().is_empty() {
                    continue;
                }
                let discovery = instances.get_mut(src).unwrap();
                if let Left((request, peer_addrs)) = discovery.next_or_role() {
                    for dst in peer_addrs {
                        pending_requests.get_mut(src).unwrap().insert(dst.clone());
                        network.push(Event::Request(src.clone(), request.clone(), dst))
                    }
                }
            } else {
                match network.pop() {
                    Some(Event::Request(src, request, dst)) => {
                        let peer = instances.get_mut(&dst).unwrap();
                        let response = peer.handle_request(request, dst.clone()).clone();
                        network.push(Event::Response(dst, response, src))
                    }
                    Some(Event::Response(src, response, dst)) => {
                        let peer = instances.get_mut(&dst).unwrap();
                        pending_requests.get_mut(&dst).unwrap().remove(&src);
                        peer.handle_response(src, response);
                        if let State::Done(role) = &peer.state {
                            done.insert(dst.clone(), role.clone());
                        }
                    }
                    None => {}
                };
            }
        }

        done
    }

    #[test]
    fn test_discovery_1() {
        for _ in 0..999 {
            let instances = [
                ("host1:1", Discovery::new("1", ["host1:1"])),
                ("host2:2", Discovery::new("2", ["host1:1"])),
                ("host3:3", Discovery::new("3", ["host1:1"])),
            ];
            let res = run(instances);
            let first = res.values().next().unwrap().leader_address();
            assert!(
                res.values().map(Role::leader_address).all(|la| la == first),
                "multiple leaders: {:#?}",
                res
            );
        }
    }

    #[test]
    fn test_discovery_2() {
        for _ in 0..999 {
            let instances = [
                ("host1:1", Discovery::new("1", ["host2:2"])),
                ("host2:2", Discovery::new("2", ["host2:2"])),
                ("host3:3", Discovery::new("3", ["host2:2"])),
            ];
            let res = run(instances);
            let first = res.values().next().unwrap().leader_address();
            assert!(
                res.values().map(Role::leader_address).all(|la| la == first),
                "multiple leaders: {:#?}",
                res
            );
        }
    }

    #[test]
    fn test_discovery_3() {
        for _ in 0..999 {
            let instances = [
                (
                    "host1:1",
                    Discovery::new("1", ["host1:1", "host2:2", "host3:3"]),
                ),
                ("host2:2", Discovery::new("2", ["host2:2", "host3:3"])),
                ("host3:3", Discovery::new("3", ["host3:3"])),
            ];
            let res = run(instances);
            let first = res.values().next().unwrap().leader_address();
            assert!(
                res.values().map(Role::leader_address).all(|la| la == first),
                "multiple leaders: {:#?}",
                res
            );
        }
    }
}