Skip to content
Snippets Groups Projects
node.rs 56.5 KiB
Newer Older
fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
    let mut pool = ConnectionPool::builder(storage.peers.clone())
        .call_timeout(Duration::from_secs(1))
        .connect_timeout(Duration::from_millis(500))
        .inactivity_timeout(Duration::from_secs(60))
        .build();

Georgy Moshkin's avatar
Georgy Moshkin committed
    'governor: loop {
        if !status.get().raft_state.is_leader() {
            event::wait(Event::StatusChanged).expect("Events system must be initialized");
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        let peers = storage.peers.all_peers().unwrap();
        let term = status.get().term;
        let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
        let node = global().expect("must be initialized");
        ////////////////////////////////////////////////////////////////////////
        // conf change
        if let Some(conf_change) = raft_conf_change(&storage.raft, &peers) {
            // main_loop guarantees that every ProposeConfChange
            // will eventually be handled, so no timeout is needed.
            // It also guarantees that the notification will arrive only
            // after the node leaves the joint state.
            match node.propose_conf_change_and_wait(term, conf_change) {
                Ok(()) => tlog!(Info, "conf_change processed"),
                Err(e) => {
                    tlog!(Warning, "conf_change failed: {e}");
                    fiber::sleep(Duration::from_secs(1));
                }
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        ////////////////////////////////////////////////////////////////////////
        // offline
        let to_offline = peers
            .iter()
            .filter(|peer| peer.current_grade != CurrentGrade::Offline)
            // TODO: process them all, not just the first one
            .find(|peer| peer.target_grade == TargetGrade::Offline);
        if let Some(peer) = to_offline {
            let res = (|| -> Result<_, Error> {
                let reqs = maybe_responding(&peers)
                    .filter(|peer| {
                        peer.current_grade == CurrentGrade::ShardingInitialized
                            || peer.current_grade == CurrentGrade::Online
                    })
                    .map(|peer| {
                        (
                            peer.instance_id.clone(),
                            sharding::Request {
                                leader_and_term: LeaderWithTerm { leader_id, term },
                                ..Default::default()
                            },
                        )
                    });
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
                for (peer_iid, resp) in res {
                    let sharding::Response {} = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "sharding reconfigured on peer";
                        "instance_id" => &*peer_iid,
                    );
                }
                Ok(())
            })();
            if let Err(e) = res {
                tlog!(Warning, "failed to reconfigure sharding: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;
            }

            let instance_id = peer.instance_id.clone();
            let req = UpdatePeerRequest::new(instance_id, cluster_id.clone())
                .with_current_grade(CurrentGrade::Offline);
            let res = node.handle_topology_request_and_wait(req.into());
            if let Err(e) = res {
                tlog!(Warning, "failed to set peer offline: {e}";
                    "instance_id" => &*peer.instance_id,
                );
            }
        }

        ////////////////////////////////////////////////////////////////////////
        // raft sync
        // TODO: putting each stage in a different function
        // will make the control flow more readable
        let to_sync = peers
            .iter()
            .find(|peer| peer.has_grades(CurrentGrade::Offline, TargetGrade::Online));
        if let Some(peer) = to_sync {
            let commit = storage.raft.commit().unwrap().unwrap();

            let (rx, tx) = fiber::Channel::new(1).into_clones();
            pool.call(
                &peer.raft_id,
                SyncRaftRequest {
                    commit,
                    timeout: Duration::from_secs(10),
                },
                move |res| tx.send(res).expect("mustn't fail"),
            )
            .expect("shouldn't fail");
            let res = rx.recv().expect("ought not fail");
            let res = res.and_then(|SyncRaftResponse { commit }| {
                // TODO: change `Info` to `Debug`
                tlog!(Info, "peer synced";
                    "commit" => commit,
                    "instance_id" => &*peer.instance_id,
                );

                let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                    .with_current_grade(CurrentGrade::RaftSynced);
                global()
                    .expect("can't be deinitialized")
                    .handle_topology_request_and_wait(req.into())
            });
            match res {
                Ok(peer) => {
                    tlog!(Info, "raft sync processed");
                    debug_assert!(peer.current_grade == CurrentGrade::RaftSynced);
                }
                Err(e) => {
                    tlog!(Warning, "raft sync failed: {e}";
                        "instance_id" => &*peer.instance_id,
                        "peer" => &peer.peer_address,
                    );

                    // TODO: don't hard code timeout
                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))
                        .unwrap();
                }
            }

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        }

        ////////////////////////////////////////////////////////////////////////
        // replication
        let to_replicate = peers
            .iter()
            // TODO: find all such peers in a given replicaset,
            // not just the first one
            .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
        if let Some(peer) = to_replicate {
            let replicaset_id = &peer.replicaset_id;
            let replicaset_iids = maybe_responding(&peers)
                .filter(|peer| &peer.replicaset_id == replicaset_id)
                .map(|peer| peer.instance_id.clone())
                .collect::<Vec<_>>();

            let replicaset_size = replicaset_iids.len();
            let res = (|| -> Result<_, Error> {
                let reqs = replicaset_iids
                    .iter()
                    .cloned()
                    .zip(repeat(replication::Request {
                        leader_and_term: LeaderWithTerm { leader_id, term },
                        replicaset_instances: replicaset_iids.clone(),
                        replicaset_id: replicaset_id.clone(),
                        // TODO: what if someone goes offline/expelled?
                        promote: replicaset_size == 1,
                    }));
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                for (peer_iid, resp) in res {
                    let replication::Response { lsn } = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "configured replication with peer";
                        "instance_id" => &*peer_iid,
                        "lsn" => lsn,
                    );

                let mut req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                    .with_current_grade(CurrentGrade::Replicated);
                if replicaset_size == 1 {
                    // TODO: ignore expelled peers
                    // TODO: ignore offline peers
                    req = req.with_is_master(true);
                node.handle_topology_request_and_wait(req.into())?;

                Ok(())
            })();
            if let Err(e) = res {
                tlog!(Warning, "failed to configure replication: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;
            let replicaset_weight = storage
                .state
                .replicaset_weight(replicaset_id)
                .expect("storage error");
            if replicaset_weight.is_none() {
                if let Err(e) = (|| -> Result<(), Error> {
                    let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
                    let weight = if vshard_bootstrapped { 0. } else { 1. };
                    let mut ops = UpdateOps::new();
                    ops.assign(format!("['value']['{replicaset_id}']"), weight)?;
                    let req = traft::OpDML::update(
                        ClusterSpace::State,
                        &[StateKey::ReplicasetWeights],
                        ops,
                    )?;
                    // TODO: don't hard code the timeout
                    node.propose_and_wait(req, Duration::from_secs(3))??;
                    Ok(())
                })() {
                    // TODO: what if all replicas have changed their grade
                    // successfully, but the replicaset_weight failed to set?
                    tlog!(Warning, "failed to set replicaset weight: {e}";
                        "replicaset_id" => replicaset_id,
                    );

                    // TODO: don't hard code timeout
                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                    continue 'governor;
                }
            }

            tlog!(Info, "configured replication"; "replicaset_id" => replicaset_id);

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;

        ////////////////////////////////////////////////////////////////////////
Georgy Moshkin's avatar
Georgy Moshkin committed
            .iter()
            .find(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
        if let Some(peer) = to_shard {
            let res = (|| -> Result<(), Error> {
                let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
                let reqs = maybe_responding(&peers).map(|peer| {
                    (
                        peer.instance_id.clone(),
                        sharding::Request {
                            leader_and_term: LeaderWithTerm { leader_id, term },
                            bootstrap: !vshard_bootstrapped && peer.raft_id == leader_id,
                            ..Default::default()
                        },
                    )
                });
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                for (peer_iid, resp) in res {
                    let sharding::Response {} = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "initialized sharding with peer";
                        "instance_id" => &*peer_iid,
                    );
                let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                    .with_current_grade(CurrentGrade::ShardingInitialized);
                node.handle_topology_request_and_wait(req.into())?;

                    // TODO: if this fails, it will only rerun next time vshard
                    // gets reconfigured
                    node.propose_and_wait(
                        traft::OpDML::replace(
                            ClusterSpace::State,
                            &(StateKey::VshardBootstrapped, true),
                        )?,
                        // TODO: don't hard code the timeout
                        Duration::from_secs(3),
                    )??;
                }

                Ok(())
            })();
            if let Err(e) = res {
                tlog!(Warning, "failed to initialize sharding: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;
            tlog!(Info, "sharding is initialized");
Georgy Moshkin's avatar
Georgy Moshkin committed

            continue 'governor;
        }

        ////////////////////////////////////////////////////////////////////////
        // sharding weights
            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
        if let Some(peer) = to_update_weights {
            let res = if let Some(new_weights) =
                get_new_weights(maybe_responding(&peers), &storage.state)
            {
                (|| -> Result<(), Error> {
                    let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
                    let reqs = peer_ids.zip(repeat(sharding::Request {
                        leader_and_term: LeaderWithTerm { leader_id, term },
                        weights: Some(new_weights.clone()),
                        ..Default::default()
                    }));
                    // TODO: don't hard code timeout
                    let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                    for (peer_iid, resp) in res {
                        // TODO: change `Info` to `Debug`
                        tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);
                    }

                    let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                        .with_current_grade(CurrentGrade::Online);
                    node.handle_topology_request_and_wait(req.into())?;

                    // TODO: if this fails, it will only rerun next time vshard
                    // gets reconfigured
                    node.propose_and_wait(
                        // TODO: OpDML::update with just the changes
                        traft::OpDML::replace(
                            ClusterSpace::State,
                            &(StateKey::ReplicasetWeights, new_weights),
                        )?,
                        // TODO: don't hard code the timeout
                        Duration::from_secs(3),
                    )??;
                    Ok(())
                })()
            } else {
                (|| -> Result<(), Error> {
                    let to_online = peers.iter().filter(|peer| {
                        peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
                    });
                    for Peer { instance_id, .. } in to_online {
                        let cluster_id = cluster_id.clone();
                        let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
                            .with_current_grade(CurrentGrade::Online);
                        node.handle_topology_request_and_wait(req.into())?;
                        // TODO: change `Info` to `Debug`
                        tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
                    }
                    Ok(())
                })()
            };
            if let Err(e) = res {
                tlog!(Warning, "updating sharding weights failed: {e}");

                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;

            tlog!(Info, "sharding is configured");

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        event::wait(Event::TopologyChanged).expect("Events system must be initialized");

    #[allow(clippy::type_complexity)]
    fn call_all<R, I>(
        pool: &mut ConnectionPool,
        reqs: impl IntoIterator<Item = (I, R)>,
        timeout: Duration,
    ) -> Result<Vec<(I, Result<R::Response, Error>)>, Error>
    where
        I: traft::network::PeerId + Clone + std::fmt::Debug + 'static,
    {
        // TODO: this crap is only needed to wait until results of all
        // the calls are ready. There are several ways to refactor this:
        // - we could use a std-style channel that unblocks the reading end
        //   once all the writing ends have dropped
        //   (fiber::Channel cannot do that for now)
        // - using the std Futures we could use futures::join!
        //
        // Those things aren't implemented yet, so this is what we do
        let reqs = reqs.into_iter().collect::<Vec<_>>();
        static mut SENT_COUNT: usize = 0;
        unsafe { SENT_COUNT = 0 };
        let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones();
        let (rx, tx) = fiber::Channel::new(peer_count as _).into_clones();
            let tx = tx.clone();
            let cond_tx = cond_tx.clone();
            let id_copy = id.clone();
                tx.send((id_copy, res)).expect("mustn't fail");
                unsafe { SENT_COUNT += 1 };
                if unsafe { SENT_COUNT } == peer_count {
                    cond_tx.signal()
                }
            })
            .expect("shouldn't fail");
        }
        // TODO: don't hard code timeout
        if !cond_rx.wait_timeout(timeout) {
            return Err(Error::Timeout);
        }

        Ok(rx.into_iter().take(peer_count).collect())
    }
    #[inline(always)]
    fn get_new_weights<'p>(
        peers: impl IntoIterator<Item = &'p Peer>,
        state: &State,
    ) -> Option<ReplicasetWeights> {
        let replication_factor = state.replication_factor().expect("storage error");
        let mut replicaset_weights = state.replicaset_weights().expect("storage error");
        let mut replicaset_sizes = HashMap::new();
        let mut weights_changed = false;
        for peer @ Peer { replicaset_id, .. } in peers {
            if !peer.may_respond() {
                continue;
            }
            let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
            *replicaset_size += 1;
            if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
                weights_changed = true;
                *replicaset_weights.get_mut(replicaset_id).unwrap() = 1.;
            }
        }
        weights_changed.then_some(replicaset_weights)
    }

    /// Lazily yields, in slice order, only those `peers` for which
    /// `may_respond()` returns `true`.
    #[inline(always)]
    fn maybe_responding(peers: &[Peer]) -> impl Iterator<Item = &Peer> {
        peers.iter().filter(|candidate| candidate.may_respond())
    }
/// Builds a single raft configuration change that adds `node_id` to the
/// group: as a full node (`AddNode`) when `is_voter` is true, otherwise as a
/// learner (`AddLearnerNode`). All remaining fields keep their defaults.
fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
    raft::ConfChangeSingle {
        change_type: match is_voter {
            true => raft::ConfChangeType::AddNode,
            false => raft::ConfChangeType::AddLearnerNode,
        },
        node_id,
        ..Default::default()
    }
}

// Process-wide slot for the raft node. `set_global()` asserts that it is
// still empty before use, and `global()` hands out a shared reference to
// its contents (or an error while it is `None`).
static mut RAFT_NODE: Option<Box<Node>> = None;

pub fn set_global(node: Node) {
    unsafe {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
        assert!(
            RAFT_NODE.is_none(),
            "discovery::set_global() called twice, it's a leak"
        );
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
pub fn global() -> Result<&'static Node, Error> {
    // Uninitialized raft node is a regular case. This case may take
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}

#[proc(packed_args)]
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
    for pb in pbs {
        node.step_and_yield(raft::Message::try_from(pb)?);
    }
    Ok(())
}

#[proc(packed_args)]
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
        .cluster_id()?
        .ok_or("cluster_id is not set yet")?;
Sergey V's avatar
Sergey V committed

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,
        }));
    }

    let peer = node.handle_topology_request_and_wait(req.into())?;
    let box_replication = node
        .replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?;
    // A joined peer needs to communicate with other nodes.
    // Provide it the list of raft voters in response.
    let mut raft_group = vec![];
    for raft_id in node.storage.raft.voters()?.unwrap_or_default().into_iter() {
        match node.storage.peers.get(&raft_id) {
            Err(e) => {
                crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e);
            }
            Ok(peer) => raft_group.push(peer),
    Ok(JoinResponse {
        peer,
        raft_group,
        box_replication,
    })
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

// Lua API entrypoint, run on any node.
pub fn expel_wrapper(instance_id: InstanceId) -> Result<(), traft::error::Error> {
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
    match expel_by_instance_id(instance_id) {
        Ok(ExpelResponse {}) => Ok(()),
        Err(e) => Err(traft::error::Error::Other(e)),
    }
}

fn expel_by_instance_id(
    instance_id: InstanceId,
) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
        .cluster_id()?
        .ok_or("cluster_id is not set yet")?;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    expel(ExpelRequest {
        instance_id,
        cluster_id,
    })
}

// NetBox entrypoint. Run on any node.
//
// Thin wrapper: hands the request over to `expel()`, which forwards it to
// the current raft leader, and returns its result unchanged.
#[proc(packed_args)]
fn raft_expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
    expel(req)
}

// NetBox entrypoint. Must be run on the leader only.
// Don't call it directly, use `raft_expel` instead.
//
// Thin wrapper: returns the result of `expel_on_leader()` unchanged.
#[proc(packed_args)]
fn raft_expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
    expel_on_leader(req)
}

fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
    let node = global()?;
    let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
    let leader = node.storage.peers.get(&leader_id).unwrap();
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
    let leader_address = leader.peer_address;

    let fn_name = stringify_cfunc!(traft::node::raft_expel_on_leader);

    match crate::tarantool::net_box_call(&leader_address, fn_name, &req, Duration::MAX) {
        Ok::<traft::ExpelResponse, _>(_resp) => Ok(ExpelResponse {}),
        Err(e) => Err(Box::new(e)),
    }
}

fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
        .cluster_id()?
        .ok_or("cluster_id is not set yet")?;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,
        }));
    }

    let node = global()?;

    let leader_id = node.status().leader_id.ok_or("leader_id not found")?;

    if node.raft_id() != leader_id {
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
        return Err(Box::from("not a leader"));
    }

    let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
        .with_current_grade(CurrentGrade::Expelled);
    node.handle_topology_request_and_wait(req2.into())?;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    Ok(ExpelResponse {})
}

// NetBox entrypoint. Run on any node.
//
// Blocks until the local raft commit index is at least `req.commit`, then
// responds with the commit index that was observed. Between checks it waits
// for `Event::RaftEntryApplied`, never longer than the time left until the
// deadline (`req.timeout` counted from the moment of the call).
//
// Returns an error if:
// - the global raft node is not initialized,
// - the commit index cannot be read from storage or is not set yet,
// - the deadline passes before the requested index is reached
//   (`Error::Timeout`),
// - waiting for the event fails.
#[proc(packed_args)]
fn raft_sync_raft(req: SyncRaftRequest) -> Result<SyncRaftResponse, Box<dyn std::error::Error>> {
    let deadline = Instant::now() + req.timeout;
    loop {
        // Storage failures and a missing commit index are reported to the
        // remote caller instead of panicking inside the entrypoint.
        let commit = global()?
            .storage
            .raft
            .commit()?
            .ok_or("commit index is not set yet")?;
        if commit >= req.commit {
            return Ok(SyncRaftResponse { commit });
        }

        let now = Instant::now();
        if now > deadline {
            return Err(Box::new(Error::Timeout));
        }

        event::wait_timeout(Event::RaftEntryApplied, deadline - now)?;
    }
}