From 0ff9eff7fcdb8816e7da71d52acecc2b220f931f Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Thu, 7 Jul 2022 21:49:26 +0300
Subject: [PATCH] refactor: Mailbox<NormalRequest> -> Mutex<RawNode>

---
 src/main.rs        |   8 +-
 src/traft/event.rs |   1 +
 src/traft/mod.rs   |   2 +-
 src/traft/node.rs  | 493 ++++++++++++++++++---------------------------
 4 files changed, 206 insertions(+), 298 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index c78e922514..1a3be7c5f4 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -584,6 +584,8 @@ fn postjoin(args: &args::Run) {
 
     let node = traft::node::Node::new(&raft_cfg);
     let node = node.expect("failed initializing raft node");
+    traft::node::set_global(node);
+    let node = traft::node::global().unwrap();
 
     let cs = traft::Storage::conf_state().unwrap();
     if cs.voters == [raft_cfg.id] {
@@ -597,12 +599,8 @@ fn postjoin(args: &args::Run) {
 
         node.tick(1); // apply configuration, if any
         node.campaign().ok(); // trigger election immediately
-        assert_eq!(node.status().raft_state, "Leader");
     }
 
-    traft::node::set_global(node);
-    let node = traft::node::global().unwrap();
-
     box_cfg.listen = Some(args.listen.clone());
     tarantool::set_cfg(&box_cfg);
 
@@ -616,7 +614,7 @@ fn postjoin(args: &args::Run) {
 
     loop {
         let timeout = Duration::from_secs(1);
-        if let Err(e) = traft::node::global().unwrap().read_index(timeout) {
+        if let Err(e) = node.read_index(timeout) {
             tlog!(Warning, "unable to get a read barrier: {e}");
             continue;
         } else {
diff --git a/src/traft/event.rs b/src/traft/event.rs
index 2bea914b22..b3687172df 100644
--- a/src/traft/event.rs
+++ b/src/traft/event.rs
@@ -60,6 +60,7 @@ define_events! {
     LeaveJointState, "raft.leave-joint-state";
     StatusChanged, "raft.status-changed";
     TopologyChanged, "raft.topology-changed";
+    RaftLoopNeeded, "raft.loop-needed";
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index e3b7439d6f..8a375d4d06 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -38,7 +38,7 @@ pub type ReplicasetId = String;
 /// - `count` is a simple in-memory counter. It's cheap to increment because it's volatile.
 /// - `gen` should be persisted upon LogicalClock initialization to ensure the uniqueness.
 /// - `id` corresponds to `raft_id` of the instance (that is already unique across nodes).
-#[derive(Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
+#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, Hash, PartialEq, Eq)]
 pub struct LogicalClock {
     id: u64,
     gen: u64,
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 7d97540480..bc7e0e26bc 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -11,6 +11,7 @@ use ::raft::StateRole as RaftStateRole;
 use ::raft::INVALID_ID;
 use ::tarantool::error::TransactionError;
 use ::tarantool::fiber;
+use ::tarantool::fiber::Mutex;
 use ::tarantool::proc;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
@@ -30,7 +31,7 @@ use ::tarantool::util::IntoClones as _;
 use protobuf::Message as _;
 use std::iter::FromIterator as _;
 
-use crate::mailbox::Mailbox;
+use crate::cache::CachedCell;
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
@@ -63,86 +64,31 @@ pub struct Status {
 }
 
 /// The heart of `traft` module - the Node.
-#[derive(Debug)]
 pub struct Node {
+    raw_node: Rc<Mutex<RawNode>>,
     raft_id: RaftId,
     _main_loop: fiber::UnitJoinHandle<'static>,
     _conf_change_loop: fiber::UnitJoinHandle<'static>,
-    main_inbox: Mailbox<NormalRequest>,
-    // join_inbox: TopologyMailbox,
     status: Rc<RefCell<Status>>,
+    notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>,
+    topology_cache: CachedCell<u64, Topology>,
+    lc: Cell<Option<LogicalClock>>,
 }
 
-/// A request to the raft main loop.
-#[derive(Clone, Debug)]
-enum NormalRequest {
-    /// Propose `raft::prelude::Entry` of `EntryNormal` kind.
-    ///
-    /// Make a notification when it's committed.
-    /// Notify the caller with `Result<Box<_>>`,
-    /// of the associated type `traft::Op::Result`.
-    ///
-    ProposeNormal { op: traft::Op, notify: Notify },
-
-    /// Propose `raft::prelude::Entry` of `EntryConfChange` kind.
-    ///
-    /// Make a notification when the second EntryConfChange is
-    /// committed (that corresponds to the leaving of a joint state).
-    /// Notify the caller with `Result<Box<()>>`.
-    ///
-    ProposeConfChange {
-        conf_change: raft::ConfChangeV2,
-        term: u64,
-        notify: Notify,
-    },
-
-    /// This kind of requests is special: it can only be processed on
-    /// the leader, and implies corresponding `TopologyRequest` to a
-    /// cached `struct Topology`. As a result, it proposes the
-    /// `Op::PersistPeer` entry to the raft.
-    ///
-    /// Make a notification when the entry is committed.
-    /// Notify the caller with `Result<Box<Peer>>`.
-    ///
-    HandleTopologyRequest {
-        req: TopologyRequest,
-        notify: Notify,
-    },
-
-    /// Get a read barrier. In some systems it's also called the "quorum read".
-    ///
-    /// Make a notification when index is read.
-    /// Notify the caller with `Result<Box<u64>>`,
-    /// holding the resulting `raft_index`.
-    ///
-    ReadIndex { notify: Notify },
-
-    /// Start a new raft term.
-    ///
-    /// Make a notification when request is processed.
-    /// Notify the caller with `Result<Box<()>>`
-    ///
-    Campaign { notify: Notify },
-
-    /// Handle message from anoher raft node.
-    ///
-    Step(raft::Message),
-
-    /// Tick the node.
-    ///
-    /// Make a notification when request is processed.
-    /// Notify the caller with `Result<(Box<()>)>`, with
-    /// the `Err` variant unreachable
-    ///
-    Tick { n_times: u32, notify: Notify },
+impl std::fmt::Debug for Node {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("Node")
+            .field("raft_id", &self.raft_id)
+            .finish_non_exhaustive()
+    }
 }
 
 impl Node {
     pub const TICK: Duration = Duration::from_millis(100);
 
     pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
-        let main_inbox = Mailbox::<NormalRequest>::new();
         let raw_node = RawNode::new(cfg, Storage, &tlog::root())?;
+        let raw_node = Rc::new(Mutex::new(raw_node));
         let status = Rc::new(RefCell::new(Status {
             id: cfg.id,
             leader_id: None,
@@ -150,21 +96,24 @@ impl Node {
             is_ready: false,
         }));
 
+        let notifications = Rc::new(RefCell::new(HashMap::new()));
+
         let main_loop_fn = {
             let status = status.clone();
-            let main_inbox = main_inbox.clone();
-            move || raft_main_loop(main_inbox, status, raw_node)
+            let raw_node = raw_node.clone();
+            let notifications = notifications.clone();
+            move || raft_main_loop(status, raw_node, notifications)
         };
 
         let conf_change_loop_fn = {
             let status = status.clone();
-            let main_inbox = main_inbox.clone();
-            move || raft_conf_change_loop(status, main_inbox)
+            move || raft_conf_change_loop(status)
         };
 
         let node = Node {
+            raw_node,
             raft_id: cfg.id,
-            main_inbox,
+            notifications,
             status,
             _main_loop: fiber::Builder::new()
                 .name("raft_main_loop")
@@ -176,6 +125,13 @@ impl Node {
                 .proc(conf_change_loop_fn)
                 .start()
                 .unwrap(),
+            topology_cache: CachedCell::new(),
+            lc: {
+                let id = Storage::raft_id().unwrap().unwrap();
+                let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
+                Storage::persist_gen(gen).unwrap();
+                Cell::new(Some(LogicalClock::new(id, gen)))
+            },
         };
 
         // Wait for the node to enter the main loop
@@ -197,9 +153,23 @@ impl Node {
     }
 
     pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> {
+        let mut raw_node = self.raw_node.lock();
+
         let (rx, tx) = Notify::new().into_clones();
-        self.main_inbox
-            .send(NormalRequest::ReadIndex { notify: tx });
+        let lc = self.next_lc();
+        self.notifications.borrow_mut().insert(lc, tx);
+
+        // read_index puts this context into an Entry,
+        // so we've got to compose full EntryContext,
+        // despite single LogicalClock would be enough
+        let ctx = traft::EntryContextNormal {
+            lc,
+            op: traft::Op::Nop,
+        }
+        .to_bytes();
+        raw_node.read_index(ctx);
+        drop(raw_node);
+        event::broadcast(Event::RaftLoopNeeded);
         rx.recv_timeout::<u64>(timeout)
     }
 
@@ -208,37 +178,39 @@ impl Node {
         op: T,
         timeout: Duration,
     ) -> Result<T::Result, Error> {
+        let mut raw_node = self.raw_node.lock();
         let (rx, tx) = Notify::new().into_clones();
-        self.main_inbox.send(NormalRequest::ProposeNormal {
-            op: op.into(),
-            notify: tx,
-        });
+        let lc = self.next_lc();
+        self.notifications.borrow_mut().insert(lc, tx);
+        let ctx = traft::EntryContextNormal { lc, op: op.into() }.to_bytes();
+        raw_node.propose(ctx, vec![])?;
+        event::broadcast(Event::RaftLoopNeeded);
+        drop(raw_node);
         rx.recv_timeout::<T::Result>(timeout)
     }
 
     pub fn campaign(&self) -> Result<(), Error> {
-        let (rx, tx) = Notify::new().into_clones();
-        let req = NormalRequest::Campaign { notify: tx };
-        self.main_inbox.send(req);
-        rx.recv::<()>()
+        Ok(self.raw_node.lock().campaign()?)
     }
 
     pub fn step(&self, msg: raft::Message) {
-        let req = NormalRequest::Step(msg);
-        self.main_inbox.send(req);
+        let mut raw_node = self.raw_node.lock();
+        if msg.to != raw_node.raft.id {
+            return;
+        }
+        if let Err(e) = raw_node.step(msg) {
+            tlog!(Error, "{e}");
+        }
+        drop(raw_node);
     }
 
     pub fn tick(&self, n_times: u32) {
-        let (rx, tx) = Notify::new().into_clones();
-        let req = NormalRequest::Tick {
-            n_times,
-            notify: tx,
-        };
-        self.main_inbox.send(req);
-        match rx.recv() {
-            Ok(()) => (),
-            Err(e) => tlog!(Warning, "{e}"),
+        let mut raw_node = self.raw_node.lock();
+        for _ in 0..n_times {
+            raw_node.tick();
         }
+        event::broadcast(Event::RaftLoopNeeded);
+        drop(raw_node);
     }
 
     pub fn timeout_now(&self) {
@@ -251,12 +223,132 @@ impl Node {
     }
 
     pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> {
-        let (rx, tx) = Notify::new().into_clones();
-        let req = NormalRequest::HandleTopologyRequest { req, notify: tx };
+        let mut raw_node = self.raw_node.lock();
+
+        let status = raw_node.status();
+        if status.ss.raft_state != RaftStateRole::Leader {
+            return Err(RaftError::ConfChangeError("not a leader".into()).into());
+        }
+
+        let mut topology = self
+            .topology_cache
+            .pop(&raw_node.raft.term)
+            .unwrap_or_else(|| {
+                let peers = Storage::peers().unwrap();
+                let replication_factor = Storage::replication_factor().unwrap().unwrap();
+                Topology::from_peers(peers).with_replication_factor(replication_factor)
+            });
+
+        let peer_result = match req {
+            TopologyRequest::Join(JoinRequest {
+                instance_id,
+                replicaset_id,
+                advertise_address,
+                failure_domains,
+                ..
+            }) => topology.join(
+                instance_id,
+                replicaset_id,
+                advertise_address,
+                failure_domains,
+            ),
+
+            TopologyRequest::UpdatePeer(UpdatePeerRequest {
+                instance_id,
+                health,
+                failure_domains,
+                ..
+            }) => topology.update_peer(&instance_id, health, failure_domains),
+        };
+
+        let mut peer = crate::unwrap_ok_or!(peer_result, Err(e) => {
+            self.topology_cache.put(raw_node.raft.term, topology);
+            return Err(RaftError::ConfChangeError(e).into());
+        });
 
-        self.main_inbox.send(req);
+        peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
+
+        let lc = self.next_lc();
+        let ctx = traft::EntryContextNormal {
+            op: traft::Op::PersistPeer { peer },
+            lc,
+        };
+
+        let (rx, tx) = Notify::new().into_clones();
+        self.notifications.borrow_mut().insert(lc, tx);
+        raw_node.propose(ctx.to_bytes(), vec![])?;
+        self.topology_cache.put(raw_node.raft.term, topology);
+        event::broadcast(Event::RaftLoopNeeded);
+        drop(raw_node);
         rx.recv::<Peer>()
     }
+
+    fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> {
+        let mut raw_node = self.raw_node.lock();
+        // In some states proposing a ConfChange is impossible.
+        // Check if there's a reason to reject it.
+
+        #[allow(clippy::never_loop)]
+        let reason: Option<&str> = loop {
+            // Checking leadership is only needed for the
+            // correct latch management. It doesn't affect
+            // raft correctness. Checking the instance is a
+            // leader makes sure the proposed `ConfChange`
+            // is appended to the raft log immediately
+            // instead of sending `MsgPropose` over the
+            // network.
+            if raw_node.raft.state != RaftStateRole::Leader {
+                break Some("not a leader");
+            }
+
+            if term != raw_node.raft.term {
+                break Some("raft term mismatch");
+            }
+
+            // Without this check the node would silently ignore the conf change.
+            // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
+            if raw_node.raft.has_pending_conf() {
+                break Some("already has pending confchange");
+            }
+
+            break None;
+        };
+
+        if let Some(e) = reason {
+            return Err(RaftError::ConfChangeError(e.into()).into());
+        }
+
+        let prev_index = raw_node.raft.raft_log.last_index();
+        raw_node.propose_conf_change(vec![], conf_change)?;
+
+        // oops, current instance isn't actually a leader
+        // (which is impossible in theory, but we're not
+        // sure in practice) and sent the ConfChange message
+        // to the raft network instead of appending it to the
+        // raft log.
+        let last_index = raw_node.raft.raft_log.last_index();
+        assert_eq!(last_index, prev_index + 1);
+
+        let (rx, tx) = Notify::new().into_clones();
+        with_joint_state_latch(|joint_state_latch| {
+            assert!(joint_state_latch.take().is_none());
+            joint_state_latch.set(Some(JointStateLatch {
+                index: last_index,
+                notify: tx,
+            }));
+        });
+
+        event::broadcast(Event::RaftLoopNeeded);
+        drop(raw_node);
+        rx.recv()
+    }
+
+    fn next_lc(&self) -> LogicalClock {
+        let mut lc = self.lc.take().expect("it's always Some");
+        lc.inc();
+        self.lc.set(Some(lc));
+        lc
+    }
 }
 
 #[derive(Debug)]
@@ -441,9 +533,9 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) {
 }
 
 fn raft_main_loop(
-    main_inbox: Mailbox<NormalRequest>,
     status: Rc<RefCell<Status>>,
-    mut raw_node: RawNode,
+    raw_node: Rc<Mutex<RawNode>>,
+    notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>,
 ) {
     let mut next_tick = Instant::now() + Node::TICK;
     let mut pool = ConnectionPool::builder()
@@ -457,192 +549,15 @@ fn raft_main_loop(
         pool.connect(peer.raft_id, peer.peer_address);
     }
 
-    let mut notifications: HashMap<LogicalClock, Notify> = HashMap::new();
-    let mut lc = {
-        let id = Storage::raft_id().unwrap().unwrap();
-        let gen = Storage::gen().unwrap().unwrap_or(0) + 1;
-        Storage::persist_gen(gen).unwrap();
-        LogicalClock::new(id, gen)
-    };
-
-    let topology_cache = crate::cache::CachedCell::<u64, Topology>::new();
-    // let mut topology: Option<(u64, Topology)> = None;
-
     loop {
         // Clean up obsolete notifications
-        notifications.retain(|_, notify: &mut Notify| !notify.is_closed());
-
-        for req in main_inbox.receive_all(Node::TICK) {
-            match req {
-                NormalRequest::ProposeNormal { op, notify } => {
-                    lc.inc();
-
-                    let ctx = traft::EntryContextNormal { lc: lc.clone(), op }.to_bytes();
-                    if let Err(e) = raw_node.propose(ctx, vec![]) {
-                        notify.notify_err(e);
-                    } else {
-                        notifications.insert(lc.clone(), notify);
-                    }
-                }
-                NormalRequest::HandleTopologyRequest { req, notify } => {
-                    lc.inc();
-
-                    let status = raw_node.status();
-                    if status.ss.raft_state != RaftStateRole::Leader {
-                        let e = RaftError::ConfChangeError("not a leader".into());
-                        notify.notify_err(e);
-                        continue;
-                    }
-
-                    let mut topology =
-                        topology_cache.pop(&raw_node.raft.term).unwrap_or_else(|| {
-                            let peers = Storage::peers().unwrap();
-                            let replication_factor =
-                                Storage::replication_factor().unwrap().unwrap();
-                            Topology::from_peers(peers).with_replication_factor(replication_factor)
-                        });
-
-                    let peer_result = match req {
-                        TopologyRequest::Join(JoinRequest {
-                            instance_id,
-                            replicaset_id,
-                            advertise_address,
-                            failure_domains,
-                            ..
-                        }) => topology.join(
-                            instance_id,
-                            replicaset_id,
-                            advertise_address,
-                            failure_domains,
-                        ),
-
-                        TopologyRequest::UpdatePeer(UpdatePeerRequest {
-                            instance_id,
-                            health,
-                            failure_domains,
-                            ..
-                        }) => topology.update_peer(&instance_id, health, failure_domains),
-                    };
-
-                    let mut peer = match peer_result {
-                        Ok(peer) => peer,
-                        Err(e) => {
-                            notify.notify_err(RaftError::ConfChangeError(e));
-                            topology_cache.put(raw_node.raft.term, topology);
-                            continue;
-                        }
-                    };
-
-                    peer.commit_index = raw_node.raft.raft_log.last_index() + 1;
-
-                    let ctx = traft::EntryContextNormal {
-                        op: traft::Op::PersistPeer { peer },
-                        lc: lc.clone(),
-                    };
-
-                    if let Err(e) = raw_node.propose(ctx.to_bytes(), vec![]) {
-                        notify.notify_err(e);
-                    } else {
-                        notifications.insert(lc.clone(), notify);
-                        topology_cache.put(raw_node.raft.term, topology);
-                    }
-                }
-                NormalRequest::ProposeConfChange {
-                    conf_change,
-                    term,
-                    notify,
-                } => {
-                    // In some states proposing a ConfChange is impossible.
-                    // Check if there's a reason to reject it.
-
-                    #[allow(clippy::never_loop)]
-                    let reason: Option<&str> = loop {
-                        // Checking leadership is only needed for the
-                        // correct latch management. It doesn't affect
-                        // raft correctness. Checking the instance is a
-                        // leader makes sure the proposed `ConfChange`
-                        // is appended to the raft log immediately
-                        // instead of sending `MsgPropose` over the
-                        // network.
-                        if raw_node.raft.state != RaftStateRole::Leader {
-                            break Some("not a leader");
-                        }
-
-                        if term != raw_node.raft.term {
-                            break Some("raft term mismatch");
-                        }
-
-                        // Without this check the node would silently ignore the conf change.
-                        // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
-                        if raw_node.raft.has_pending_conf() {
-                            break Some("already has pending confchange");
-                        }
-
-                        break None;
-                    };
-
-                    if let Some(e) = reason {
-                        let e = RaftError::ConfChangeError(e.into());
-                        notify.notify_err(e);
-                        continue;
-                    }
+        notifications
+            .borrow_mut()
+            .retain(|_, notify: &mut Notify| !notify.is_closed());
 
-                    let prev_index = raw_node.raft.raft_log.last_index();
-                    if let Err(e) = raw_node.propose_conf_change(vec![], conf_change) {
-                        notify.notify_err(e);
-                    } else {
-                        // oops, current instance isn't actually a leader
-                        // (which is impossible in theory, but we're not
-                        // sure in practice) and sent the ConfChange message
-                        // to the raft network instead of appending it to the
-                        // raft log.
-                        let last_index = raw_node.raft.raft_log.last_index();
-                        assert_eq!(last_index, prev_index + 1);
-
-                        with_joint_state_latch(|joint_state_latch| {
-                            assert!(joint_state_latch.take().is_none());
-                            joint_state_latch.set(Some(JointStateLatch {
-                                index: last_index,
-                                notify,
-                            }));
-                        });
-                    }
-                }
-                NormalRequest::ReadIndex { notify } => {
-                    lc.inc();
-
-                    // read_index puts this context into an Entry,
-                    // so we've got to compose full EntryContext,
-                    // despite single LogicalClock would be enough
-                    let ctx = traft::EntryContextNormal {
-                        lc: lc.clone(),
-                        op: traft::Op::Nop,
-                    }
-                    .to_bytes();
-                    raw_node.read_index(ctx);
-                    notifications.insert(lc.clone(), notify);
-                }
-                NormalRequest::Campaign { notify } => match raw_node.campaign() {
-                    Ok(()) => notify.notify_ok(()),
-                    Err(e) => notify.notify_err(e),
-                },
-                NormalRequest::Step(msg) => {
-                    if msg.to != raw_node.raft.id {
-                        continue;
-                    }
-                    if let Err(e) = raw_node.step(msg) {
-                        tlog!(Error, "{e}");
-                    }
-                }
-                NormalRequest::Tick { n_times, notify } => {
-                    for _ in 0..n_times {
-                        raw_node.tick();
-                    }
-                    notify.notify_ok(());
-                }
-            }
-        }
+        event::wait_timeout(Event::RaftLoopNeeded, Node::TICK).expect("Events must be initialized");
 
+        let mut raw_node = raw_node.lock();
         let now = Instant::now();
         if now > next_tick {
             next_tick = now + Node::TICK;
@@ -673,7 +588,7 @@ fn raft_main_loop(
             let committed_entries = ready.take_committed_entries();
             handle_committed_entries(
                 committed_entries,
-                &mut notifications,
+                &mut *notifications.borrow_mut(),
                 &mut raw_node,
                 &mut pool,
                 &mut topology_changed,
@@ -708,7 +623,7 @@ fn raft_main_loop(
             }
 
             let read_states = ready.take_read_states();
-            handle_read_states(read_states, &mut notifications);
+            handle_read_states(read_states, &mut *notifications.borrow_mut());
 
             Ok(())
         })
@@ -731,7 +646,7 @@ fn raft_main_loop(
             let committed_entries = light_rd.take_committed_entries();
             handle_committed_entries(
                 committed_entries,
-                &mut notifications,
+                &mut *notifications.borrow_mut(),
                 &mut raw_node,
                 &mut pool,
                 &mut topology_changed,
@@ -756,7 +671,7 @@ fn raft_main_loop(
     }
 }
 
-fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<NormalRequest>) {
+fn raft_conf_change_loop(status: Rc<RefCell<Status>>) {
     loop {
         if status.borrow().raft_state != "Leader" {
             event::wait(Event::StatusChanged).expect("Events system must be initialized");
@@ -849,18 +764,12 @@ fn raft_conf_change_loop(status: Rc<RefCell<Status>>, main_inbox: Mailbox<Normal
             ..Default::default()
         };
 
-        let (rx, tx) = Notify::new().into_clones();
-        main_inbox.send(NormalRequest::ProposeConfChange {
-            term,
-            conf_change,
-            notify: tx,
-        });
-
+        let node = global().expect("must be initialized");
         // main_loop gives the warranty that every ProposeConfChange
         // will sometimes be handled and there's no need in timeout.
         // It also guarantees that the notification will arrive only
         // after the node leaves the joint state.
-        match rx.recv() {
+        match node.propose_conf_change(term, conf_change) {
             Ok(()) => tlog!(Info, "conf_change processed"),
             Err(e) => tlog!(Warning, "conf_change failed: {e}"),
         }
-- 
GitLab