diff --git a/src/main.rs b/src/main.rs index 80f9cde94fccb4ab795094a5913bd6703c4a48f1..257aa94bbeb75b46fdb5d585d6028a5e44d5fae5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -75,6 +75,7 @@ fn picolib_setup(args: args::Run) { // // Export public API luamod.set("run", tlua::function0(move || start(&args))); + luamod.set("raft_status", tlua::function0(raft_status)); luamod.set( "raft_propose_info", tlua::function1(|x: String| raft_propose(Message::Info { msg: x })), @@ -184,6 +185,13 @@ fn start(args: &args::Run) { ); } +fn raft_status() -> traft::Status { + let stash = Stash::access(); + let raft_ref = stash.raft_node(); + let raft_node = raft_ref.as_ref().expect("Picodata not running yet"); + raft_node.status() +} + fn raft_propose(msg: Message) { let stash = Stash::access(); let raft_ref = stash.raft_node(); diff --git a/src/traft.rs b/src/traft.rs index 30e96a55ec94f57181c50c295f801a06f6dccbda..3561a38881522d7a73253f9ea5326d38b1e05626 100644 --- a/src/traft.rs +++ b/src/traft.rs @@ -4,6 +4,7 @@ mod storage; pub use network::ConnectionPool; pub use node::Node; +pub use node::Status; use serde::{Deserialize, Serialize}; use std::convert::TryFrom; pub use storage::Storage; diff --git a/src/traft/node.rs b/src/traft/node.rs index 7c82569c65c31c57b6b9fbd3e859fd286e72d950..85b7f13f92ff0e72efb65f35df3bc92eb9eee11a 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -1,9 +1,12 @@ use ::raft::prelude as raft; use ::raft::Error as RaftError; use ::tarantool::fiber; +use ::tarantool::tlua; use ::tarantool::util::IntoClones; +use std::cell::RefCell; use std::collections::HashMap; use std::convert::TryFrom; +use std::rc::Rc; use std::time::Duration; use std::time::Instant; @@ -16,9 +19,17 @@ use crate::traft::Storage; type RawNode = raft::RawNode<Storage>; type Notify = fiber::Channel<()>; +#[derive(Clone, Debug, tlua::Push, tlua::PushInto)] +pub struct Status { + id: u64, + leader_id: u64, + raft_state: String, +} + pub struct Node { _main_loop: fiber::LuaUnitJoinHandle<'static>, inbox: fiber::Channel<Request>, + status: Rc<RefCell<Status>>, } #[derive(Clone, Debug)] @@ -34,14 +45,25 @@ impl Node { pub fn new(cfg: &raft::Config, on_commit: fn(&[u8])) -> Result<Self, RaftError> { let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; let (inbox, inbox_clone) = fiber::Channel::new(0).into_clones(); - let loop_fn = move || raft_main(inbox_clone, raw_node, on_commit); + let (status, status_clone) = Rc::new(RefCell::new(Status { + id: cfg.id, + leader_id: 0, + raft_state: "Follower".into(), + })) + .into_clones(); + let loop_fn = move || raft_main(inbox_clone, status_clone, raw_node, on_commit); Ok(Node { inbox, + status, _main_loop: fiber::defer_proc(loop_fn), }) } + pub fn status(&self) -> Status { + self.status.borrow().clone() + } + pub fn propose(&self, data: impl Into<Vec<u8>>) { let req = Request::Propose { data: data.into() }; self.inbox.send(req).unwrap(); @@ -80,7 +102,12 @@ impl Node { } } -fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: fn(&[u8])) { +fn raft_main( + inbox: fiber::Channel<Request>, + status: Rc<RefCell<Status>>, + mut raw_node: RawNode, + on_commit: fn(&[u8]), +) { let mut next_tick = Instant::now() + Node::TICK; let mut pool = ConnectionPool::with_timeout(Node::TICK * 4); @@ -187,6 +214,12 @@ fn raft_main(inbox: fiber::Channel<Request>, mut raw_node: RawNode, on_commit: f Storage::persist_hard_state(hs).unwrap(); } + if let Some(ss) = ready.ss() { + let mut status = status.borrow_mut(); + status.leader_id = ss.leader_id; + status.raft_state = format!("{:?}", ss.raft_state); + } + if !ready.persisted_messages().is_empty() { // Send out the persisted messages come from the node. handle_messages(ready.take_persisted_messages()); diff --git a/test/couple_test.lua b/test/couple_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..d65b282110a915f2e42deb863d247b8faa3662cf --- /dev/null +++ b/test/couple_test.lua @@ -0,0 +1,74 @@ +local t = require('luatest') +local h = require('test.helper') +local g = t.group() + +local fio = require('fio') + +g.before_all(function() + g.data_dir = fio.tempdir() + local peer = {'127.0.0.1:13301', '127.0.0.1:13302'} + + g.cluster = { + i1 = h.Picodata:new({ + name = 'i1', + data_dir = g.data_dir .. '/i1', + listen = '127.0.0.1:13301', + peer = peer, + env = {PICODATA_RAFT_ID = "1"}, + }), + i2 = h.Picodata:new({ + name = 'i2', + data_dir = g.data_dir .. '/i2', + listen = '127.0.0.1:13302', + peer = peer, + env = {PICODATA_RAFT_ID = "2"}, + }), + } + + for _, node in pairs(g.cluster) do + node:start() + end +end) + +g.after_all(function() + for _, node in pairs(g.cluster) do + node:stop() + end + fio.rmtree(g.data_dir) +end) + +g.test = function() + -- Speed up node election + g.cluster.i1:interact({ + msg_type = "MsgTimeoutNow", + to = 1, + from = 0, + }) + + h.retrying({}, function() + t.assert_equals( + g.cluster.i2:connect():call('picolib.raft_status'), + { + id = 2, + leader_id = 1, + raft_state = "Follower", + } + ) + end) + + t.assert_equals( + g.cluster.i2:connect():call( + 'picolib.raft_propose_eval', + {1, '_G.check = box.info.listen'} + ), + true + ) + t.assert_equals( + g.cluster.i1:connect():eval('return check'), + '127.0.0.1:13301' + ) + t.assert_equals( + g.cluster.i2:connect():eval('return check'), + '127.0.0.1:13302' + ) +end diff --git a/test/single_test.lua b/test/single_test.lua index 7f4130467774159566332947c8eac799e4d37e02..470372faf8b6f4c892fa8c38ae9cc0fa04acdcfc 100644 --- a/test/single_test.lua +++ b/test/single_test.lua @@ -24,6 +24,15 @@ end) g.test = function() local conn = g.node:connect() + t.assert_equals( + conn:call('picolib.raft_status'), + { + id = 1, + leader_id = 0, + raft_state = "Follower", + } + ) + t.assert_equals( conn:call('picolib.raft_propose_eval', {1, 'return'}), false -- No leader is elected yet @@ -36,6 +45,17 @@ g.test = function() from = 0, }) + h.retrying({}, function() + t.assert_equals( + conn:call('picolib.raft_status'), + { + id = 1, + leader_id = 1, + raft_state = "Leader", + } + ) + end) + t.assert_equals( conn:call('picolib.raft_propose_eval', {0, 'return'}), false -- Timeout