// topology.rs
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use crate::traft::instance_uuid;
use crate::traft::replicaset_uuid;
use crate::traft::JoinRequest;
use crate::traft::Peer;
use crate::traft::RaftId;

use raft::INVALID_INDEX;

/// In-memory view of the cluster peers together with the changes
/// accumulated while processing join requests.
pub struct Topology {
    /// Every known peer, keyed by its raft id.
    peers: BTreeMap<RaftId, Peer>,
    /// Peers created or updated by `process`, keyed by raft id.
    diff: BTreeMap<RaftId, Peer>,
    /// Peers that were re-registered under a new raft id: the key is the
    /// new raft id, the value is `(old raft id, peer with the new raft id)`.
    to_replace: BTreeMap<RaftId, (RaftId, Peer)>,
    /// A replicaset with fewer members than this is still offered to
    /// instances that join without naming a replicaset.
    replication_factor: u8,

    /// Largest raft id passed through `put_peer` so far (0 when empty).
    max_raft_id: RaftId,
    /// Index: instance id -> raft id of the most recently stored peer.
    instance_id_map: BTreeMap<String, RaftId>,
    /// Index: replicaset id -> raft ids stored under that replicaset.
    replicaset_map: BTreeMap<String, BTreeSet<RaftId>>,
}

impl Topology {
    pub fn from_peers(mut peers: Vec<Peer>) -> Self {
        let mut ret = Self {
            peers: Default::default(),
            diff: Default::default(),
            to_replace: Default::default(),
            replication_factor: 2,

            max_raft_id: 0,
            instance_id_map: Default::default(),
            replicaset_map: Default::default(),
        };

        for peer in peers.drain(..) {
            ret.put_peer(peer);
        }

        ret
    }

    pub fn with_replication_factor(mut self, replication_factor: u8) -> Self {
        self.replication_factor = replication_factor;
        self
    }

    /// Stores `peer` and refreshes every lookup index derived from it:
    /// the highest seen raft id, the instance-id index and the
    /// replicaset membership set.
    fn put_peer(&mut self, peer: Peer) {
        let raft_id = peer.raft_id;

        self.max_raft_id = self.max_raft_id.max(raft_id);
        self.instance_id_map
            .insert(peer.instance_id.clone(), raft_id);
        self.replicaset_map
            .entry(peer.replicaset_id.clone())
            .or_default()
            .insert(raft_id);

        // The peer itself is moved in last, once the indices hold their own
        // copies of the identifiers.
        self.peers.insert(raft_id, peer);
    }

    /// Looks a peer up by instance id: resolves the raft id through the
    /// instance-id index, then fetches the peer stored under that raft id.
    /// Returns `None` when either lookup misses.
    fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> {
        self.instance_id_map
            .get(instance_id)
            .and_then(|raft_id| self.peers.get(raft_id))
    }

    /// Picks a replicaset id for an instance that did not request one.
    ///
    /// The first replicaset (in id order) holding fewer members than the
    /// replication factor wins. When every replicaset is full, the first
    /// unused name among `r1`, `r2`, ... is returned.
    fn choose_replicaset_id(&self) -> String {
        let capacity = usize::from(self.replication_factor);

        let with_room = self
            .replicaset_map
            .iter()
            .find(|(_, members)| members.len() < capacity)
            .map(|(id, _)| id.clone());
        if let Some(id) = with_room {
            return id;
        }

        // Every known replicaset is full: probe generated names until one
        // is not taken yet.
        let mut n = 1u64;
        loop {
            let candidate = format!("r{n}");
            if !self.replicaset_map.contains_key(&candidate) {
                return candidate;
            }
            n += 1;
        }
    }

    /// Applies one join request to the topology.
    ///
    /// * Unknown instance id: a new peer is created with the next raft id
    ///   (`max_raft_id + 1`) and the requested replicaset, or one chosen by
    ///   `choose_replicaset_id` when none is requested. It is recorded in
    ///   the diff.
    /// * Known instance id: the address and voter flag are taken from the
    ///   request. If `req.voter` is set, the updated peer goes to the diff
    ///   under its current raft id; otherwise it is given the next raft id
    ///   and recorded in the replace list together with the old raft id.
    ///
    /// # Errors
    ///
    /// Returns a message when the instance is already known and the request
    /// names a replicaset different from the one it joined with; the
    /// topology is left unchanged in that case.
    pub fn process(&mut self, req: &JoinRequest) -> Result<(), String> {
        // Work on an owned copy so that `self` is free to be mutated below.
        let known = self.peer_by_instance_id(&req.instance_id).cloned();

        let mut peer = match known {
            Some(existing) => existing,
            None => {
                // First-time join: allocate a raft id and build the peer.
                let raft_id = self.max_raft_id + 1;
                let replicaset_id = req
                    .replicaset_id
                    .clone()
                    .unwrap_or_else(|| self.choose_replicaset_id());
                let rs_uuid = replicaset_uuid(&replicaset_id);

                let fresh = Peer {
                    raft_id,
                    instance_id: req.instance_id.clone(),
                    replicaset_id,
                    commit_index: INVALID_INDEX,
                    instance_uuid: instance_uuid(&req.instance_id),
                    replicaset_uuid: rs_uuid,
                    peer_address: req.advertise_address.clone(),
                    voter: req.voter,
                };

                self.diff.insert(raft_id, fresh.clone());
                self.put_peer(fresh);
                return Ok(());
            }
        };

        // A rejoin may omit the replicaset, but must not change it.
        if let Some(requested) = &req.replicaset_id {
            if requested != &peer.replicaset_id {
                return Err(format!(
                    std::concat!(
                        "{} already joined with a different replicaset_id,",
                        " requested: {},",
                        " existing: {}.",
                    ),
                    req.instance_id, requested, peer.replicaset_id
                ));
            }
        }

        peer.peer_address = req.advertise_address.clone();
        peer.voter = req.voter;

        if req.voter {
            self.diff.insert(peer.raft_id, peer.clone());
        } else {
            // Re-register under a brand new raft id and remember which raft
            // id it supersedes.
            let superseded = peer.raft_id;
            peer.raft_id = self.max_raft_id + 1;
            self.to_replace
                .insert(peer.raft_id, (superseded, peer.clone()));
        }
        self.put_peer(peer);

        Ok(())
    }

    /// Returns copies of all peers created or updated by `process`,
    /// ordered by raft id.
    pub fn diff(&self) -> Vec<Peer> {
        let mut changed = Vec::with_capacity(self.diff.len());
        changed.extend(self.diff.values().cloned());
        changed
    }

    pub fn to_replace(&self) -> Vec<(RaftId, Peer)> {
        self.to_replace.values().cloned().collect()
/// Creates the first peer of a cluster.
///
/// Runs a single voter join request through an empty [`Topology`] and
/// returns the peer that ends up in its diff.
///
/// # Panics
///
/// Panics if processing the request fails or leaves the diff empty.
pub fn initial_peer(
    cluster_id: String,
    instance_id: String,
    replicaset_id: Option<String>,
    advertise_address: String,
) -> Peer {
    let bootstrap = JoinRequest {
        cluster_id,
        instance_id,
        replicaset_id,
        advertise_address,
        voter: true,
    };

    let mut topology = Topology::from_peers(Vec::new());
    topology.process(&bootstrap).unwrap();

    let mut joined = topology.diff();
    joined.pop().unwrap()
}

#[cfg(test)]
mod tests {
    use super::Topology;
    use crate::traft::instance_uuid;
    use crate::traft::replicaset_uuid;
    use crate::traft::JoinRequest;
    use crate::traft::Peer;

    macro_rules! peers {
        [ $( (
            $raft_id:expr,
            $instance_id:literal,
            $replicaset_id:literal,
            $peer_address:literal,
            $voter:literal
        ) ),* $(,)? ] => {
            vec![$(
                Peer {
                    raft_id: $raft_id,
                    peer_address: $peer_address.into(),
                    voter: $voter,
                    instance_id: $instance_id.into(),
                    replicaset_id: $replicaset_id.into(),
                    instance_uuid: instance_uuid($instance_id),
                    replicaset_uuid: replicaset_uuid($replicaset_id),
                    commit_index: raft::INVALID_INDEX,
    // Expands to one `Peer` literal built from
    // `(raft_id, instance_id, replicaset_id, peer_address, voter)`.
    // Both uuids are derived from the corresponding ids and `commit_index`
    // is always `raft::INVALID_INDEX`.
    macro_rules! peer {
        (
            $id:expr,
            $instance:literal,
            $replicaset:literal,
            $address:literal,
            $is_voter:literal
        ) => {
            Peer {
                raft_id: $id,
                instance_id: $instance.into(),
                instance_uuid: instance_uuid($instance),
                replicaset_id: $replicaset.into(),
                replicaset_uuid: replicaset_uuid($replicaset),
                peer_address: $address.into(),
                voter: $is_voter,
                commit_index: raft::INVALID_INDEX,
            }
        };
    }

    // Expands to a borrowed `JoinRequest` literal built from
    // `(instance_id, replicaset_id, advertise_address, voter)`.
    // `replicaset_id` is an `Option<&str>` expression converted to an owned
    // option; `cluster_id` is always "cluster1".
    macro_rules! req {
        (
            $instance_id:literal,
            $replicaset_id:expr,
            $advertise_address:literal,
            $voter:literal
        ) => {
            &JoinRequest {
                cluster_id: "cluster1".into(),
                instance_id: $instance_id.into(),
                // The closure's `&str` annotation lets a bare `None` infer
                // its type.
                replicaset_id: $replicaset_id.map(|v: &str| v.into()),
                advertise_address: $advertise_address.into(),
                voter: $voter,
            }
        };
    }

    // Table-style test driver: seeds a `Topology` from `init`, applies the
    // replication factor, feeds every request through `process` (each one
    // must succeed) and then compares `diff()` and `to_replace()` with the
    // expected values.
    macro_rules! test {
        (
            replication_factor: $factor:literal,
            init: $init:expr,
            req: [ $( $request:expr ),* $(,)?],
            expected_diff: $expected_diff:expr,
            expected_to_replace: $expected_to_replace:expr,
        ) => {
            let seeded = Topology::from_peers($init);
            let mut topology = seeded.with_replication_factor($factor);
            $(
                topology.process($request).unwrap();
            )*

            let actual_diff = topology.diff();
            assert_eq!(actual_diff, $expected_diff);
            let actual_to_replace = topology.to_replace();
            assert_eq!(actual_to_replace, $expected_to_replace);
        };
    }

    #[test]
    fn test_simple() {
        assert_eq!(Topology::from_peers(vec![]).diff(), vec![]);

        let peers = peers![(1, "i1", "R1", "addr:1", true)];
        assert_eq!(Topology::from_peers(peers).diff(), vec![]);

        test!(
            replication_factor: 1,
            init: peers![],
            req: [
                req!("i1", None, "nowhere", true),
                req!("i2", None, "nowhere", true),
            ],
            expected_diff: peers![
                (1, "i1", "r1", "nowhere", true),
                (2, "i2", "r2", "nowhere", true),
            ],
            expected_to_replace: vec![],
            replication_factor: 1,
            init: peers![
                (1, "i1", "R1", "addr:1", true),
                req!("i2", Some("R2"), "addr:2", false),
            ],
            expected_diff: peers![
                (2, "i2", "R2", "addr:2", false),
            ],
            expected_to_replace: vec![],
        );
    }

    #[test]
    fn test_override() {
        test!(
            replication_factor: 1,
            init: peers![
                (1, "i1", "R1", "addr:1", false),
                req!("i1", None, "addr:2", true),
            ],
            expected_diff: peers![
                (1, "i1", "R1", "addr:2", true),
            ],
            expected_to_replace: vec![],
        );
    }

    #[test]
    fn test_batch_overlap() {
        test!(
            replication_factor: 1,
            init: peers![],
            req: [
                req!("i1", Some("R1"), "addr:1", false),
                req!("i1", None, "addr:2", true),
            ],
            expected_diff: peers![
                (1, "i1", "R1", "addr:2", true),
            ],
            expected_to_replace: vec![],

    #[test]
    fn test_replicaset_mismatch() {
        let expected_error = concat!(
            "i3 already joined with a different replicaset_id,",
            " requested: R-B,",
            " existing: R-A.",
        );

        let peers = peers![(3, "i3", "R-A", "x:1", false)];
        let mut topology = Topology::from_peers(peers);
        topology
            .process(req!("i3", Some("R-B"), "x:2", true))
            .map_err(|e| assert_eq!(e, expected_error))
            .unwrap_err();
        assert_eq!(topology.diff(), vec![]);
        assert_eq!(topology.to_replace(), vec![]);

        let peers = peers![(2, "i2", "R-A", "nowhere", true)];
        let mut topology = Topology::from_peers(peers);
        topology
            .process(req!("i3", Some("R-A"), "y:1", false))
            .unwrap();
        topology
            .process(req!("i3", Some("R-B"), "y:2", true))
            .map_err(|e| assert_eq!(e, expected_error))
            .unwrap_err();
        assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]);
        assert_eq!(topology.to_replace(), vec![]);

    // Checks how instances that join without naming a replicaset are
    // distributed when the replication factor is 2.
    #[test]
    fn test_replication_factor() {
        test!(
            replication_factor: 2,
            // "r9" already holds two raft ids, so it is full and must not be
            // offered to the joining instances.
            // NOTE(review): both initial peers use instance id "i9"; the
            // second one looks like it was meant to be "i10" — confirm.
            init: peers![
                (9, "i9", "r9", "nowhere", false),
                (10, "i9", "r9", "nowhere", false),
            ],
            req: [
                req!("i1", None, "addr:1", true),
                req!("i2", None, "addr:2", false),
                req!("i3", None, "addr:3", false),
                req!("i4", None, "addr:4", false),
            ],
            // Raft ids continue after the highest initial id (10); the new
            // instances fill the generated replicasets "r1" and "r2" two at
            // a time.
            expected_diff: peers![
                (11, "i1", "r1", "addr:1", true),
                (12, "i2", "r1", "addr:2", false),
                (13, "i3", "r2", "addr:3", false),
                (14, "i4", "r2", "addr:4", false),
            ],
            expected_to_replace: vec![],
        );
    }

    #[test]
    fn test_replace() {
        test!(
            replication_factor: 2,
            init: peers![
                (1, "i1", "r1", "nowhere", false),
            ],
            req: [
                req!("i1", None, "addr:2", false),
            ],
            expected_diff: peers![],
            expected_to_replace: vec![
                (1, peer!(2, "i1", "r1", "addr:2", false)),
            ],