From fb0646847a2e3f24fda70bc69d9fee7dcf55ca7b Mon Sep 17 00:00:00 2001
From: Valentin Syrovatskiy <v.syrovatskiy@picodata.io>
Date: Tue, 23 Aug 2022 12:17:11 +0300
Subject: [PATCH] refactor: add fn raft_sync_raft

---
 src/main.rs        |  1 +
 src/traft/event.rs |  1 +
 src/traft/mod.rs   | 13 ++++++++-
 src/traft/node.rs  | 67 ++++++++++++++++++++++++++++++++++++++++++++--
 4 files changed, 79 insertions(+), 3 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index 05c8198580..dcb067111c 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -281,6 +281,7 @@ fn init_handlers() {
     declare_cfunc!(traft::node::raft_join);
     declare_cfunc!(traft::node::raft_expel_on_leader);
     declare_cfunc!(traft::node::raft_expel);
+    declare_cfunc!(traft::node::raft_sync_raft);
     declare_cfunc!(traft::failover::raft_update_peer);
 }
 
diff --git a/src/traft/event.rs b/src/traft/event.rs
index ddcdc998a1..fed643694a 100644
--- a/src/traft/event.rs
+++ b/src/traft/event.rs
@@ -64,6 +64,7 @@ define_events! {
     StatusChanged, "raft.status-changed";
     TopologyChanged, "raft.topology-changed";
     RaftLoopNeeded, "raft.loop-needed";
+    RaftEntryApplied, "raft.entry-applied";
 }
 
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/traft/mod.rs b/src/traft/mod.rs
index 52b9d22eb6..0f9af4c74f 100644
--- a/src/traft/mod.rs
+++ b/src/traft/mod.rs
@@ -20,6 +20,7 @@ use std::any::Any;
 use std::collections::HashMap;
 use std::convert::TryFrom;
 use std::fmt::{Debug, Display};
+use std::time::Duration;
 use uuid::Uuid;
 
 use protobuf::Message as _;
@@ -574,12 +575,21 @@ impl Encode for ExpelRequest {}
 pub struct ExpelResponse {}
 impl Encode for ExpelResponse {}
 
+#[derive(Clone, Debug, Serialize, Deserialize)]
+pub struct SyncRaftRequest {
+    pub commit: u64,
+    pub timeout: Duration,
+}
+impl Encode for SyncRaftRequest {}
+
 ///////////////////////////////////////////////////////////////////////////////
 /// Activity state of an instance.
 #[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)]
 pub enum Grade {
-    // Instance has gracefully shut down.
+    // Instance has gracefully shut down or has not been started yet.
     Offline,
+    // Instance has synced by commit index.
+    RaftSynced,
     // Instance is active and is handling requests.
     Online,
     // Instance has permanently removed from cluster.
@@ -590,6 +600,7 @@ impl Grade {
     const fn to_str(&self) -> &str {
         match self {
             Self::Offline => "Offline",
+            Self::RaftSynced => "RaftSynced",
             Self::Online => "Online",
             Self::Expelled => "Expelled",
         }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index d8b704f2a0..04bf3dc94b 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -47,7 +47,9 @@ use crate::traft::Op;
 use crate::traft::Storage;
 use crate::traft::Topology;
 use crate::traft::TopologyRequest;
-use crate::traft::{ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, UpdatePeerRequest};
+use crate::traft::{
+    ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, SyncRaftRequest, UpdatePeerRequest,
+};
 
 use super::Grade;
 use super::OpResult;
@@ -477,7 +479,9 @@ fn handle_committed_entries(
                 "error persisting applied index: {e}";
                 "index" => last_entry.index
             );
-        };
+        } else {
+            event::broadcast(Event::RaftEntryApplied);
+        }
     }
 }
 
@@ -997,3 +1001,62 @@ fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::erro
 
     Ok(ExpelResponse {})
 }
+
+// NetBox entrypoint. Run on any node.
+#[proc(packed_args)]
+fn raft_sync_raft(req: SyncRaftRequest) -> Result<(), Box<dyn std::error::Error>> {
+    let deadline = Instant::now() + req.timeout;
+    loop {
+        if Storage::commit().unwrap().unwrap() >= req.commit {
+            return Ok(());
+        }
+
+        let now = Instant::now();
+        if now > deadline {
+            return Err(Box::new(Error::Timeout));
+        }
+
+        event::wait_timeout(Event::RaftEntryApplied, deadline - now)?;
+    }
+}
+
+// Run on Leader
+fn call_raft_sync_raft(promotee: &Peer, commit: u64) -> Result<(), Box<dyn std::error::Error>> {
+    let fn_name = stringify_cfunc!(traft::node::raft_sync_raft);
+    let req = SyncRaftRequest {
+        commit,
+        timeout: Duration::from_secs(10),
+    };
+
+    match crate::tarantool::net_box_call(&promotee.peer_address, fn_name, &req, Duration::MAX) {
+        Ok::<(), _>(_) => Ok(()),
+        Err(e) => Err(Box::new(e)),
+    }
+}
+
+// Run on Leader by topology governor
+fn sync_raft(promotee: &Peer) -> Result<(), Box<dyn std::error::Error>> {
+    let commit = Storage::commit().unwrap().unwrap();
+
+    match call_raft_sync_raft(promotee, commit) {
+        Ok(_) => {
+            let node = global()?;
+
+            let leader_id = node.status().leader_id.ok_or("leader_id not found")?;
+
+            if node.raft_id != leader_id {
+                return Err(Box::from("not a leader"));
+            }
+
+            let instance_id = promotee.instance_id.clone();
+            let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?;
+
+            let req = UpdatePeerRequest::new(instance_id, cluster_id).with_grade(Grade::RaftSynced);
+
+            node.handle_topology_request_and_wait(req.into())?;
+
+            Ok(())
+        }
+        Err(e) => Err(e),
+    }
+}
-- 
GitLab