From d5c2817030ba20997a831f1949135b323be4c074 Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Sat, 16 Apr 2022 03:55:13 +0300
Subject: [PATCH] Fix tests

---
 src/main.rs              | 47 ++++++++++++++++------------------------
 src/tarantool.rs         |  7 ++++--
 src/traft/node.rs        | 31 +++++++++++++++++---------
 test/helper/picodata.lua |  2 +-
 test/threesome_test.lua  | 26 +++++++++++++++-------
 5 files changed, 64 insertions(+), 49 deletions(-)

diff --git a/src/main.rs b/src/main.rs
index b51752d3f4..eef8a290d6 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -52,38 +52,38 @@ fn picolib_setup(args: &args::Run) {
     );
     luamod.set(
         "raft_tick",
-        tlua::function1(|n_times: u32| {
-            traft::node::global().expect("uninitialized").tick(n_times);
+        tlua::function1(|n_times: u32| -> Result<(), traft::node::Error> {
+            Ok(traft::node::global()?.tick(n_times))
         }),
     );
     luamod.set(
         "raft_read_index",
-        tlua::function1(|timeout: f64| {
-            traft::node::global()
-                .expect("uninitialized")
-                .read_index(Duration::from_secs_f64(timeout))
+        tlua::function1(|timeout: f64| -> Result<u64, traft::node::Error> {
+            traft::node::global()?.read_index(Duration::from_secs_f64(timeout))
         }),
     );
     luamod.set(
         "raft_propose_info",
-        tlua::function1(|x: String| {
-            traft::node::global()
-                .expect("uninitialized")
-                .propose(traft::Op::Info { msg: x }, Duration::from_secs(1))
+        tlua::function1(|x: String| -> Result<u64, traft::node::Error> {
+            traft::node::global()?.propose(traft::Op::Info { msg: x }, Duration::from_secs(1))
         }),
     );
     luamod.set(
         "raft_timeout_now",
-        tlua::function0(|| traft::node::global().expect("uninitialized").timeout_now()),
+        tlua::function0(|| -> Result<(), traft::node::Error> {
+            Ok(traft::node::global()?.timeout_now())
+        }),
     );
     luamod.set(
         "raft_propose_eval",
-        tlua::function2(|timeout: f64, x: String| {
-            traft::node::global().expect("uninitialized").propose(
-                traft::Op::EvalLua { code: x },
-                Duration::from_secs_f64(timeout),
-            )
-        }),
+        tlua::function2(
+            |timeout: f64, x: String| -> Result<u64, traft::node::Error> {
+                traft::node::global()?.propose(
+                    traft::Op::EvalLua { code: x },
+                    Duration::from_secs_f64(timeout),
+                )
+            },
+        ),
     );
     {
         l.exec(
@@ -434,16 +434,7 @@ fn start_join(leader_uri: String, supervisor: Supervisor) {
 
     use traft::node::raft_join;
     let fn_name = stringify_cfunc!(raft_join);
-    let timeout = Duration::from_secs_f32(1.5);
-    let resp: traft::node::JoinResponse =
-        tarantool::net_box_call(&leader_uri, fn_name, &req, timeout).unwrap_or_else(|e| {
-            tlog!(Warning, "net_box_call failed: {e}";
-                "peer" => &leader_uri,
-                "fn" => fn_name,
-            );
-
-            panic!();
-        });
+    let resp: traft::node::JoinResponse = tarantool::net_box_call_retry(&leader_uri, fn_name, &req);
 
     picolib_setup(&args);
     assert!(tarantool::cfg().is_none());
@@ -532,7 +523,7 @@ fn postjoin(supervisor: Supervisor) {
     }
 
     loop {
-        let timeout = Duration::from_secs(1);
+        let timeout = Duration::from_millis(220);
         let me = traft::Storage::peer_by_raft_id(raft_id)
             .unwrap()
             .expect("peer not found");
diff --git a/src/tarantool.rs b/src/tarantool.rs
index 62b6788a35..389da9f35d 100644
--- a/src/tarantool.rs
+++ b/src/tarantool.rs
@@ -186,7 +186,7 @@ where
 pub fn net_box_call_retry<Args, Res, Addr>(address: Addr, fn_name: &str, args: &Args) -> Res
 where
     Args: AsTuple,
-    Addr: std::net::ToSocketAddrs + std::fmt::Display,
+    Addr: std::net::ToSocketAddrs + std::fmt::Display + slog::Value,
     Res: serde::de::DeserializeOwned,
 {
     loop {
@@ -195,7 +195,10 @@ where
         match net_box_call(&address, fn_name, args, timeout) {
             Ok(v) => break v,
             Err(e) => {
-                crate::tlog!(Warning, "could not connect to {}: {}", address, e);
+                crate::tlog!(Warning, "net_box_call failed: {e}";
+                    "peer" => &address,
+                    "fn" => fn_name,
+                );
                 fiber::sleep(timeout.saturating_sub(now.elapsed()))
             }
         }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index ab952e103e..e3630c3ef6 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -210,7 +210,6 @@ fn raft_main_loop(
         .inactivity_timeout(Duration::from_secs(60))
         .build();
 
-    // This is a temporary hack until fair joining is implemented
     for peer in Storage::peers().unwrap() {
         pool.connect(peer.raft_id, peer.peer_address);
     }
@@ -544,19 +543,24 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor
 
         main_inbox.send(NormalRequest::ProposeConfChange { peers, notify: tx });
 
-        let res: Result<u64, RaftError> = rx.recv_timeout(Duration::MAX).unwrap();
-        match res {
-            Ok(v) => {
+        match rx.recv_timeout(Duration::from_millis(200)) {
+            Ok(Ok(v)) => {
                 for (_, notify) in &batch {
                     notify.try_send(Ok(v)).expect("that's a bug");
                 }
             }
-            Err(e) => {
+            Ok(Err(e)) => {
                 for (_, notify) in &batch {
                     let e = RaftError::ConfChangeError(format!("{e:?}"));
                     notify.try_send(Err(e)).expect("that's a bug");
                 }
             }
+            Err(_) => {
+                for (_, notify) in &batch {
+                    let e = RaftError::ConfChangeError("timeout".into());
+                    notify.try_send(Err(e)).expect("that's a bug");
+                }
+            }
         }
     }
 }
@@ -565,17 +569,25 @@ static mut RAFT_NODE: Option<&'static Node> = None;
 
 pub fn set_global(node: Node) {
     unsafe {
+        assert!(
+            RAFT_NODE.is_none(),
+            "discovery::set_global() called twice, it's a leak"
+        );
         RAFT_NODE = Some(Box::leak(Box::new(node)));
     }
 }
 
-pub fn global() -> Option<&'static Node> {
-    unsafe { RAFT_NODE }
+pub fn global() -> Result<&'static Node, Error> {
+    // Uninitialized raft node is a regular case. This case may take
+    // place while the instance is executing `start_discover()` function.
+    // It has already started listening, but the node is only initialized
+    // in `postjoin()`.
+    unsafe { RAFT_NODE }.ok_or(Error::Uninitialized)
 }
 
 #[proc(packed_args)]
 fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> {
-    let node = unsafe { RAFT_NODE }.ok_or(Error::Uninitialized)?;
+    let node = global()?;
     for pb in pbs {
         node.step(raft::Message::try_from(pb)?);
     }
@@ -584,8 +596,7 @@ fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> {
 
 #[proc(packed_args)]
 fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> {
-    let node = unsafe { RAFT_NODE }
-        .ok_or_else(|| RaftError::ConfChangeError("Uninitialized yet".into()))?;
+    let node = global()?;
 
     tlog!(Warning, "join_one({req:?})");
     node.join_one(req.clone(), Duration::MAX)?;
diff --git a/test/helper/picodata.lua b/test/helper/picodata.lua
index 4bd355d148..efc299e132 100644
--- a/test/helper/picodata.lua
+++ b/test/helper/picodata.lua
@@ -174,8 +174,8 @@ end
 -- Raise an exception if promotion fails.
 function Picodata:promote_or_fail()
     checks('Picodata')
-    self:connect():call('picolib.raft_timeout_now')
     return luatest.helpers.retrying({}, function()
+        self:connect():call('picolib.raft_timeout_now')
         self:assert_raft_status("Leader", self.id)
     end)
 end
diff --git a/test/threesome_test.lua b/test/threesome_test.lua
index 4d9d43a275..352ccc0fa9 100644
--- a/test/threesome_test.lua
+++ b/test/threesome_test.lua
@@ -59,11 +59,21 @@ local function propose_state_change(srv, value)
 end
 
 g.test_log_rollback = function()
+    -- TODO
+    -- Этот тест стал некорректен с появлением фазы дискавери.
+    -- Инстансы i2 и i3 не смогут стартануть, т.к. одним из пиров
+    -- является дохлый i1.
+    -- Тем не менее, сам тест удалять не следует. Данная
+    -- проблема требует пересмотреть лишь подход к созданию причин
+    -- для ролбека рафт лога. Но основная задача теста (проверка
+    -- поведения пикодаты при ролбеке) остается актуальной.
+    t.skip('Fix me')
+
     -- Speed up node election
     g.cluster.i1:promote_or_fail()
     h.retrying({}, function()
-        g.cluster.i2:assert_raft_status("Follower", 1)
-        g.cluster.i3:assert_raft_status("Follower", 1)
+        g.cluster.i2:assert_raft_status("Follower", g.cluster.i1.id)
+        g.cluster.i3:assert_raft_status("Follower", g.cluster.i1.id)
     end)
 
     t.assert(
@@ -74,9 +84,10 @@ g.test_log_rollback = function()
     g.cluster.i2:stop()
     g.cluster.i3:stop()
 
+    -- No operations can be committed, i1 is alone.
     t.assert_equals(
         {propose_state_change(g.cluster.i1, "i1 lost the quorum")},
-        {nil, "foo"}
+        {nil, "timeout"}
     )
 
     -- And now i2 + i3 can't reach i1.
@@ -87,7 +98,7 @@ g.test_log_rollback = function()
     -- Help I2 to become a new leader.
     g.cluster.i2:promote_or_fail()
     h.retrying({}, function()
-        g.cluster.i3:assert_raft_status("Follower", 2)
+        g.cluster.i3:assert_raft_status("Follower", g.cluster.i2.id)
     end)
 
     t.assert(
@@ -97,12 +108,11 @@ g.test_log_rollback = function()
     -- Now i1 has an uncommitted, but persisted entry that should be rolled back.
     g.cluster.i1:start()
     h.retrying({}, function()
-        g.cluster.i1:assert_raft_status("Follower", 2)
+        g.cluster.i1:assert_raft_status("Follower", g.cluster.i2.id)
     end)
 
-    t.assert_equals(
-        propose_state_change(g.cluster.i1, "i1 is alive again"),
-        true
+    t.assert(
+        propose_state_change(g.cluster.i1, "i1 is alive again")
     )
 end
 
-- 
GitLab