Skip to content
Snippets Groups Projects 56.5 KiB
Newer Older
/// Governor loop: drives cluster topology changes while this instance is the
/// raft leader.
///
/// Runs forever. While the instance is not the leader it waits for
/// `Event::StatusChanged`. As the leader, each pass reads all peers and checks
/// the stages below in order:
///
/// - conf change: proposes the raft configuration change computed by
///   `raft_conf_change` and waits for it;
/// - offline: for a peer whose target grade is `Offline` (and current grade is
///   not), reconfigures sharding on the responding `ShardingInitialized` /
///   `Online` peers, then submits an `UpdatePeerRequest` for that peer;
/// - raft sync: for a peer with current grade `Offline` and target `Online`,
///   sends it a `SyncRaftRequest` carrying this node's raft commit index;
/// - replication: for a peer with current grade `RaftSynced` and target
///   `Online`, configures replication on the responding peers of its
///   replicaset and assigns the replicaset a weight if it has none yet;
/// - sharding: for a peer with current grade `Replicated` and target `Online`,
///   initializes sharding on all responding peers (the request to the leader
///   sets `bootstrap` if vshard is not bootstrapped yet);
/// - sharding weights: for a peer with current grade `ShardingInitialized` and
///   target `Online`, pushes new replicaset weights when `get_new_weights`
///   returns some, then submits `UpdatePeerRequest`s for the affected peers.
///
/// A stage that handled a peer restarts the pass with `continue 'governor`.
/// Most failures log a warning and wait (bounded by a timeout) for
/// `Event::TopologyChanged` before retrying.
fn raft_conf_change_loop(status: Rc<Cell<Status>>, storage: Storage) {
    let mut pool = ConnectionPool::builder(storage.peers.clone())

Georgy Moshkin's avatar
Georgy Moshkin committed
    'governor: loop {
        // Only the raft leader drives topology changes; other instances just
        // wait until their raft status changes.
        if !status.get().raft_state.is_leader() {
            event::wait(Event::StatusChanged).expect("Events system must be initialized");
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        let peers = storage.peers.all_peers().unwrap();
        let term = status.get().term;
        let cluster_id = storage.raft.cluster_id().unwrap().unwrap();
        let node = global().expect("must be initialized");
        // conf change
        if let Some(conf_change) = raft_conf_change(&storage.raft, &peers) {
            // main_loop gives the warranty that every ProposeConfChange
            // will sometimes be handled and there's no need in timeout.
            // It also guarantees that the notification will arrive only
            // after the node leaves the joint state.
            match node.propose_conf_change_and_wait(term, conf_change) {
                Ok(()) => tlog!(Info, "conf_change processed"),
                Err(e) => {
                    tlog!(Warning, "conf_change failed: {e}");
Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        // offline
        let to_offline = peers
            .filter(|peer| peer.current_grade != CurrentGrade::Offline)
            // TODO: process them all, not just the first one
            .find(|peer| peer.target_grade == TargetGrade::Offline);
        if let Some(peer) = to_offline {
            let res = (|| -> Result<_, Error> {
                let reqs = maybe_responding(&peers)
                    .filter(|peer| {
                        peer.current_grade == CurrentGrade::ShardingInitialized
                            || peer.current_grade == CurrentGrade::Online
                    .map(|peer| {
                            sharding::Request {
                                leader_and_term: LeaderWithTerm { leader_id, term },
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;
                for (peer_iid, resp) in res {
                    let sharding::Response {} = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "sharding reconfigured on peer";
                        "instance_id" => &*peer_iid,
            if let Err(e) = res {
                tlog!(Warning, "failed to reconfigure sharding: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;

            let instance_id = peer.instance_id.clone();
            let req = UpdatePeerRequest::new(instance_id, cluster_id.clone())
            let res = node.handle_topology_request_and_wait(req.into());
            if let Err(e) = res {
                tlog!(Warning, "failed to set peer offline: {e}";
                    "instance_id" => &*peer.instance_id,

        // raft sync
        // TODO: putting each stage in a different function
        // will make the control flow more readable
        let to_sync = peers
            .find(|peer| peer.has_grades(CurrentGrade::Offline, TargetGrade::Online));
        if let Some(peer) = to_sync {
            let commit = storage.raft.commit().unwrap().unwrap();

            let (rx, tx) = fiber::Channel::new(1).into_clones();
                SyncRaftRequest {
                    timeout: Duration::from_secs(10),
                move |res| tx.send(res).expect("mustn't fail"),
            .expect("shouldn't fail");
            let res = rx.recv().expect("ought not fail");
            let res = res.and_then(|SyncRaftResponse { commit }| {
                // TODO: change `Info` to `Debug`
                tlog!(Info, "peer synced";
                    "commit" => commit,
                    "instance_id" => &*peer.instance_id,

                let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                    .expect("can't be deinitialized")
            match res {
                Ok(peer) => {
                    tlog!(Info, "raft sync processed");
                    debug_assert!(peer.current_grade == CurrentGrade::RaftSynced);
                Err(e) => {
                    tlog!(Warning, "raft sync failed: {e}";
                        "instance_id" => &*peer.instance_id,
                        "peer" => &peer.peer_address,

                    // TODO: don't hard code timeout
                    event::wait_timeout(Event::TopologyChanged, Duration::from_millis(300))

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;

        // replication
        let to_replicate = peers
            // TODO: find all such peers in a given replicaset,
            // not just the first one
            .find(|peer| peer.has_grades(CurrentGrade::RaftSynced, TargetGrade::Online));
        if let Some(peer) = to_replicate {
            let replicaset_id = &peer.replicaset_id;
            let replicaset_iids = maybe_responding(&peers)
                .filter(|peer| &peer.replicaset_id == replicaset_id)
                .map(|peer| peer.instance_id.clone())

            let replicaset_size = replicaset_iids.len();
            let res = (|| -> Result<_, Error> {
                let reqs = replicaset_iids
                    .zip(repeat(replication::Request {
                        leader_and_term: LeaderWithTerm { leader_id, term },
                        replicaset_instances: replicaset_iids.clone(),
                        replicaset_id: replicaset_id.clone(),
                        // TODO: what if someone goes offline/expelled?
                        promote: replicaset_size == 1,
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                for (peer_iid, resp) in res {
                    let replication::Response { lsn } = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "configured replication with peer";
                        "instance_id" => &*peer_iid,
                        "lsn" => lsn,

                let mut req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)
                if replicaset_size == 1 {
                    // TODO: ignore expelled peers
                    // TODO: ignore offline peers
                    req = req.with_is_master(true);

            if let Err(e) = res {
                tlog!(Warning, "failed to configure replication: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;
            let replicaset_weight = storage
                .expect("storage error");
            if replicaset_weight.is_none() {
                if let Err(e) = (|| -> Result<(), Error> {
                    let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
                    let weight = if vshard_bootstrapped { 0. } else { 1. };
                    let mut ops = UpdateOps::new();
                    ops.assign(format!("['value']['{replicaset_id}']"), weight)?;
                    let req = traft::OpDML::update(
                    // TODO: don't hard code the timeout
                    node.propose_and_wait(req, Duration::from_secs(3))??;
                })() {
                    // TODO: what if all replicas have changed their grade
                    // successfully, but the replicaset_weight failed to set?
                    tlog!(Warning, "failed to set replicaset weight: {e}";
                        "replicaset_id" => replicaset_id,

                    // TODO: don't hard code timeout
                    event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                    continue 'governor;

            tlog!(Info, "configured replication"; "replicaset_id" => replicaset_id);

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;

Georgy Moshkin's avatar
Georgy Moshkin committed
            .find(|peer| peer.has_grades(CurrentGrade::Replicated, TargetGrade::Online));
        if let Some(peer) = to_shard {
            let res = (|| -> Result<(), Error> {
                let vshard_bootstrapped = storage.state.vshard_bootstrapped()?;
                let reqs = maybe_responding(&peers).map(|peer| {
                        sharding::Request {
                            leader_and_term: LeaderWithTerm { leader_id, term },
                            bootstrap: !vshard_bootstrapped && peer.raft_id == leader_id,
                // TODO: don't hard code timeout
                let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                for (peer_iid, resp) in res {
                    let sharding::Response {} = resp?;
                    // TODO: change `Info` to `Debug`
                    tlog!(Info, "initialized sharding with peer";
                        "instance_id" => &*peer_iid,
                let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)

                    // TODO: if this fails, it will only rerun next time vshard
                    // gets reconfigured
                            &(StateKey::VshardBootstrapped, true),
                        // TODO: don't hard code the timeout

            if let Err(e) = res {
                tlog!(Warning, "failed to initialize sharding: {e}");
                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;
            tlog!(Info, "sharding is initialized");
Georgy Moshkin's avatar
Georgy Moshkin committed

            continue 'governor;

        // sharding weights
            .find(|peer| peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online));
        if let Some(peer) = to_update_weights {
            let res = if let Some(new_weights) =
                get_new_weights(maybe_responding(&peers), &storage.state)
                (|| -> Result<(), Error> {
                    let peer_ids = maybe_responding(&peers).map(|peer| peer.instance_id.clone());
                    let reqs = {
                        leader_and_term: LeaderWithTerm { leader_id, term },
                        weights: Some(new_weights.clone()),
                    // TODO: don't hard code timeout
                    let res = call_all(&mut pool, reqs, Duration::from_secs(3))?;

                    for (peer_iid, resp) in res {
                        // TODO: change `Info` to `Debug`
                        tlog!(Info, "peer is online"; "instance_id" => &*peer_iid);

                    let req = UpdatePeerRequest::new(peer.instance_id.clone(), cluster_id)

                    // TODO: if this fails, it will only rerun next time vshard
                    // gets reconfigured
                        // TODO: OpDML::update with just the changes
                            &(StateKey::ReplicasetWeights, new_weights),
                        // TODO: don't hard code the timeout
            } else {
                (|| -> Result<(), Error> {
                    let to_online = peers.iter().filter(|peer| {
                        peer.has_grades(CurrentGrade::ShardingInitialized, TargetGrade::Online)
                    for Peer { instance_id, .. } in to_online {
                        let cluster_id = cluster_id.clone();
                        let req = UpdatePeerRequest::new(instance_id.clone(), cluster_id)
                        // TODO: change `Info` to `Debug`
                        tlog!(Info, "peer is online"; "instance_id" => &**instance_id);
            if let Err(e) = res {
                tlog!(Warning, "updating sharding weights failed: {e}");

                // TODO: don't hard code timeout
                event::wait_timeout(Event::TopologyChanged, Duration::from_secs(1)).unwrap();
                continue 'governor;

            tlog!(Info, "sharding is configured");

Georgy Moshkin's avatar
Georgy Moshkin committed
            continue 'governor;
        // Wait for the next topology change before re-evaluating the stages.
        event::wait(Event::TopologyChanged).expect("Events system must be initialized");

    /// Sends every `(peer id, request)` pair from `reqs` through `pool`, then
    /// waits until every callback has delivered its result or until `timeout`
    /// expires.
    ///
    /// The outer `Result` reports the overall wait. Each element of the returned
    /// `Vec` carries the peer id and that peer's individual call result
    /// (presumably one element per request — confirm against the collecting
    /// code).
    ///
    /// # Errors
    /// `Error::Timeout` if not all results arrived within `timeout`.
    fn call_all<R, I>(
        pool: &mut ConnectionPool,
        reqs: impl IntoIterator<Item = (I, R)>,
        timeout: Duration,
    ) -> Result<Vec<(I, Result<R::Response, Error>)>, Error>
        I: traft::network::PeerId + Clone + std::fmt::Debug + 'static,
        // TODO: this counter + condvar machinery is only needed to wait until
        // results of all the calls are ready. There are several ways to refactor this:
        // - we could use a std-style channel that unblocks the reading end
        //   once all the writing ends have dropped
        //   (fiber::Channel cannot do that for now)
        // - using the std Futures we could use futures::join!
        // Those things aren't implemented yet, so this is what we do
        let reqs = reqs.into_iter().collect::<Vec<_>>();
        // NOTE(review): `SENT_COUNT` is a `static mut`, so a single counter is
        // shared by every invocation of `call_all`. If an earlier invocation
        // returned `Error::Timeout` and its outstanding callbacks still fire
        // later, they increment this counter after it has been reset below.
        // A newer invocation could then be signalled before all of *its*
        // responses have arrived. A per-call counter (e.g. an
        // `Rc<Cell<usize>>` cloned into each callback) would avoid this —
        // confirm and fix.
        static mut SENT_COUNT: usize = 0;
        unsafe { SENT_COUNT = 0 };
        let (cond_rx, cond_tx) = Rc::new(fiber::Cond::new()).into_clones();
        let (rx, tx) = fiber::Channel::new(peer_count as _).into_clones();
            let tx = tx.clone();
            let cond_tx = cond_tx.clone();
            let id_copy = id.clone();
                tx.send((id_copy, res)).expect("mustn't fail");
                unsafe { SENT_COUNT += 1 };
                if unsafe { SENT_COUNT } == peer_count {
            .expect("shouldn't fail");
        // Wait, at most `timeout`, for the callback that completes the count to
        // signal the condition (the timeout is already a parameter here).
        if !cond_rx.wait_timeout(timeout) {
            return Err(Error::Timeout);

    /// Computes updated replicaset weights from the weights stored in `state`.
    ///
    /// Counts the peers of each replicaset that may respond. A replicaset
    /// whose count reaches `state.replication_factor()` while its stored weight
    /// is `0.` gets weight `1.`, and `weights_changed` is set. The caller treats
    /// `Some(weights)` as "weights changed". Presumably `None` is returned when
    /// `weights_changed` stays `false` — confirm against the return statement.
    ///
    /// # Panics
    /// If the replication factor or the replicaset weights cannot be read from
    /// `state` ("storage error").
    fn get_new_weights<'p>(
        peers: impl IntoIterator<Item = &'p Peer>,
        state: &State,
    ) -> Option<ReplicasetWeights> {
        let replication_factor = state.replication_factor().expect("storage error");
        let mut replicaset_weights = state.replicaset_weights().expect("storage error");
        let mut replicaset_sizes = HashMap::new();
        let mut weights_changed = false;
        for peer @ Peer { replicaset_id, .. } in peers {
            if !peer.may_respond() {
            let replicaset_size = replicaset_sizes.entry(replicaset_id.clone()).or_insert(0);
            *replicaset_size += 1;
            // NOTE(review): assuming `ReplicasetWeights` is a map, indexing (and the
            // `get_mut(..).unwrap()` below) panics if `replicaset_id` has no stored
            // weight. The replication stage assigns one when it is missing, but a
            // failure there would make this panic — consider `get`/`get_mut` with
            // explicit handling.
            if *replicaset_size >= replication_factor && replicaset_weights[replicaset_id] == 0. {
                weights_changed = true;
                *replicaset_weights.get_mut(replicaset_id).unwrap() = 1.;
    /// Iterates over the peers in `peers` for which `Peer::may_respond()` is true.
    fn maybe_responding(peers: &[Peer]) -> impl Iterator<Item = &Peer> {
        peers.iter().filter(|peer| peer.may_respond())
/// Builds a single-node raft configuration change for `node_id`.
///
/// `is_voter` selects the change type. Presumably it adds `node_id` as a voter
/// when `true` and as a learner otherwise — confirm against the branch bodies.
fn conf_change_single(node_id: RaftId, is_voter: bool) -> raft::ConfChangeSingle {
    let change_type = if is_voter {
    } else {
    raft::ConfChangeSingle {

/// The process-wide raft node. `global()` reads it; presumably `set_global()`
/// installs it.
static mut RAFT_NODE: Option<Box<Node>> = None;

/// Installs `node` as the process-wide raft node returned by `global()`.
///
/// NOTE(review): the message below says a second call is "a leak", so this is
/// meant to be called at most once. The message also names
/// `discovery::set_global()`, which looks copied from another module —
/// confirm and correct the text.
pub fn set_global(node: Node) {
    unsafe {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
            "discovery::set_global() called twice, it's a leak"
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
/// Returns the process-wide raft node.
///
/// # Errors
/// `Error::Uninitialized` if no node has been installed yet.
pub fn global() -> Result<&'static Node, Error> {
    // Uninitialized raft node is a regular case. This case may take
    // place while the instance is executing `start_discover()` function.
    // It has already started listening, but the node is only initialized
    // in `postjoin()`.
    unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)

/// Passes each raft message in `pbs` to the global raft node. The messages
/// presumably come from other nodes; the per-message handling is in the loop
/// body.
///
/// Fails if the global raft node is not initialized yet.
fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
    for pb in pbs {

/// Handles a request from an instance that wants to join the cluster.
///
/// Fails with "cluster_id is not set yet" when this node does not know its
/// cluster id. Rejects the request with `Error::ClusterIdMismatch` if
/// `req.cluster_id` differs from this cluster's id.
///
/// Otherwise it submits the topology change and waits for it, then looks up
/// the addresses of the joined peer's replicaset (`box_replication`) and
/// collects the raft voters (`raft_group`). Presumably all of these go into
/// the returned `JoinResponse`.
fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> {
Yaroslav Dynnikov's avatar
Yaroslav Dynnikov committed
    let node = global()?;
        .ok_or("cluster_id is not set yet")?;
Sergey V's avatar
Sergey V committed

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,

    let peer = node.handle_topology_request_and_wait(req.into())?;
    let box_replication = node
        .replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?;
    // A joined peer needs to communicate with other nodes.
    // Provide it the list of raft voters in response.
    let mut raft_group = vec![];
    for raft_id in {
        match {
            Err(e) => {
                crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e);
            Ok(peer) => raft_group.push(peer),
    Ok(JoinResponse {
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

/// Lua API entrypoint, run on any node.
///
/// Expels the instance `instance_id` via `expel_by_instance_id`. Any error is
/// converted into `traft::error::Error::Other`.
pub fn expel_wrapper(instance_id: InstanceId) -> Result<(), traft::error::Error> {
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
    match expel_by_instance_id(instance_id) {
        Ok(ExpelResponse {}) => Ok(()),
        Err(e) => Err(traft::error::Error::Other(e)),

/// Builds an `ExpelRequest` for `instance_id` together with the cluster id and
/// passes it to `expel`.
///
/// Fails with "cluster_id is not set yet" when the cluster id is unknown.
fn expel_by_instance_id(
    instance_id: InstanceId,
) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
        .ok_or("cluster_id is not set yet")?;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    expel(ExpelRequest {

/// NetBox entrypoint. Run on any node.
///
/// Expels the instance described by `req`. Presumably it delegates to `expel`,
/// which forwards the request to the raft leader — confirm.
fn raft_expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {

/// NetBox entrypoint. Runs on the raft leader only; `expel` calls it there.
/// Don't call it directly, use `raft_expel` instead.
///
/// Presumably it delegates to `expel_on_leader` — confirm.
fn raft_expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {

/// Forwards `req` to the current raft leader by calling
/// `raft_expel_on_leader` on it over net.box.
///
/// Fails if the global raft node is not initialized, if the leader is unknown
/// ("leader_id not found"), or if the net.box call fails.
fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
    let node = global()?;
    let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
    let leader =;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
    let leader_address = leader.peer_address;

    let fn_name = stringify_cfunc!(traft::node::raft_expel_on_leader);

    // NOTE(review): `Duration::MAX` presumably means "no timeout". If the leader
    // hangs, the caller blocks indefinitely — consider a finite timeout.
    match crate::tarantool::net_box_call(&leader_address, fn_name, &req, Duration::MAX) {
        Ok::<traft::ExpelResponse, _>(_resp) => Ok(ExpelResponse {}),
        Err(e) => Err(Box::new(e)),

/// Leader-side part of the expel procedure.
///
/// The request is rejected in these cases:
/// - "cluster_id is not set yet" when the cluster id is unknown;
/// - `Error::ClusterIdMismatch` if `req.cluster_id` differs from this
///   cluster's id;
/// - "leader_id not found" when the leader is unknown;
/// - "not a leader" if this node is not the raft leader.
///
/// Otherwise it builds an `UpdatePeerRequest` for `req.instance_id`.
/// Presumably that request marks the instance as expelled and is submitted to
/// the topology — confirm.
fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> {
        .ok_or("cluster_id is not set yet")?;
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    if req.cluster_id != cluster_id {
        return Err(Box::new(Error::ClusterIdMismatch {
            instance_cluster_id: req.cluster_id,
            cluster_cluster_id: cluster_id,

    let node = global()?;

    let leader_id = node.status().leader_id.ok_or("leader_id not found")?;

    if node.raft_id() != leader_id {
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed
        return Err(Box::from("not a leader"));

    let req2 = UpdatePeerRequest::new(req.instance_id, req.cluster_id)
Valentin Syrovatskiy's avatar
Valentin Syrovatskiy committed

    Ok(ExpelResponse {})

/// NetBox entrypoint. Run on any node.
///
/// Blocks until the raft commit index in this node's storage reaches
/// `req.commit`, or until `req.timeout` elapses. The index is re-checked after
/// each `Event::RaftEntryApplied`. On success it returns the commit index it
/// observed.
///
/// # Errors
/// - `Error::Timeout` if the deadline passes first.
/// - The global raft node is not initialized.
/// - Waiting for the event fails.
///
/// # Panics
/// If the commit index cannot be read from storage (the double `unwrap`).
fn raft_sync_raft(req: SyncRaftRequest) -> Result<SyncRaftResponse, Box<dyn std::error::Error>> {
    let deadline = Instant::now() + req.timeout;
    loop {
        let commit = global()?.storage.raft.commit().unwrap().unwrap();
        if commit >= req.commit {
            return Ok(SyncRaftResponse { commit });

        let now = Instant::now();
        if now > deadline {
            return Err(Box::new(Error::Timeout));

        // Here `now <= deadline`, so `deadline - now` cannot underflow.
        event::wait_timeout(Event::RaftEntryApplied, deadline - now)?;