diff --git a/picolib/lib.rs b/picolib/lib.rs index 3f5dfdbe8574014d6d7c385b3286e9bf31d43194..1dd935cfeef36eb758548b795d70d98e27367fac 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -131,8 +131,7 @@ fn main_run() { applied: traft::Storage::applied().unwrap_or_default(), ..Default::default() }; - let mut node = traft::Node::new(&raft_cfg).unwrap(); - node.start(handle_committed_data); + let node = traft::Node::new(&raft_cfg, handle_committed_data).unwrap(); stash.set_raft_node(node); std::env::var("PICODATA_LISTEN").ok().and_then(|v| { @@ -154,14 +153,9 @@ fn main_run() { fn raft_propose(msg: Message) { let stash = Stash::access(); let raft_ref = stash.raft_node(); - let raft_node = raft_ref.as_ref().unwrap(); - let data: Vec<u8> = msg.into(); - tlog!( - Info, - "propose binary data ({} bytes).......................................", - data.len() - ); - raft_node.borrow_mut().propose(vec![], data).unwrap(); + let raft_node = raft_ref.as_ref().expect("Picodata not running yet"); + tlog!(Info, "propose {:?} ................................", msg); + raft_node.propose(msg.into()); tlog!(Info, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); } diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs index 6565810438f028d3897ee2f1310f1eff8e04d1d8..670f82c61a4f0ac74c4cb15dd3b86c40a0fccfcc 100644 --- a/picolib/traft/node.rs +++ b/picolib/traft/node.rs @@ -1,66 +1,76 @@ -use raft::prelude::*; +use ::tarantool::fiber; +use ::tarantool::util::IntoClones; +use raft::eraftpb::Entry as RaftEntry; +use raft::eraftpb::EntryType as RaftEntryType; +use raft::eraftpb::Message as RaftMessage; use raft::Error as RaftError; -use std::ops::{Deref, DerefMut}; - -use std::cell::RefCell; -use std::rc::Rc; use std::time::Duration; +use std::time::Instant; use super::storage::Storage; use crate::tlog; -use ::tarantool::fiber; -// pub type Node = RawNode<Storage>; type RawNode = raft::RawNode<Storage>; pub struct Node { - raw_node: Rc<RefCell<RawNode>>, - main_loop: Option<fiber::LuaUnitJoinHandle>, + _main_loop: fiber::LuaUnitJoinHandle, + inbox: fiber::Channel<RawRequest>, +} + +#[derive(Clone, Debug)] +enum RawRequest { + Propose(Vec<u8>), + Step(RaftMessage), } impl Node { - pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { + pub const TICK: Duration = Duration::from_millis(100); + + pub fn new(cfg: &raft::Config, handle_committed_data: fn(&[u8])) -> Result<Self, RaftError> { let logger = tlog::root(); - let raw_node = RawNode::new(cfg, Storage, &logger)?; - let raw_node = Rc::from(RefCell::from(raw_node)); - let ret = Node { - raw_node, - main_loop: None, - }; - Ok(ret) - } + let mut raw_node = RawNode::new(cfg, Storage, &logger)?; - pub fn start(&mut self, handle_committed_data: fn(&[u8])) { - assert!(self.main_loop.is_none(), "Raft loop is already started"); + let (tx, rx) = fiber::Channel::new(0).into_clones(); - let raw_node = self.raw_node.clone(); let loop_fn = move || { + let mut next_tick = Instant::now() + Self::TICK; + loop { - fiber::sleep(Duration::from_millis(100)); - // let mut stash: RefMut<Stash> = stash.borrow_mut(); - // let mut raft_node = stash.raft_node.as_mut().unwrap(); - let mut raw_node = raw_node.borrow_mut(); - raw_node.tick(); + use RawRequest::*; + match rx.recv_timeout(Self::TICK) { + Ok(Propose(data)) => { + raw_node.propose(vec![], data).unwrap(); + } + Ok(Step(msg)) => { + raw_node.step(msg).unwrap(); + } + Err(fiber::RecvError::Timeout) => (), + Err(fiber::RecvError::Disconnected) => unreachable!(), + } + + let now = Instant::now(); + if now > next_tick { + next_tick = now + Self::TICK; + raw_node.tick(); + } + on_ready(&mut raw_node, handle_committed_data); } }; - self.main_loop = Some(fiber::defer_proc(loop_fn)); + Ok(Node { + _main_loop: fiber::defer_proc(loop_fn), + inbox: tx, + }) } -} - -impl Deref for Node { - type Target = Rc<RefCell<RawNode>>; - fn deref(&self) -> &Self::Target { - &self.raw_node + pub fn propose(&self, data: Vec<u8>) { + self.inbox.send(RawRequest::Propose(data)).unwrap(); } -} -impl DerefMut for Node { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.raw_node + pub fn step(&self, msg: RaftMessage) { + self.inbox.send(RawRequest::Step(msg)).unwrap(); } } @@ -75,7 +85,7 @@ fn on_ready(raft_group: &mut RawNode, handle_committed_data: fn(&[u8])) { let mut ready: raft::Ready = raft_group.ready(); tlog!(Info, "--- {:?}", ready); - let handle_messages = |msgs: Vec<Message>| { + let handle_messages = |msgs: Vec<RaftMessage>| { for _msg in msgs { tlog!(Info, "--- handle message: {:?}", _msg); // Send messages to other peers. @@ -95,12 +105,12 @@ fn on_ready(raft_group: &mut RawNode, handle_committed_data: fn(&[u8])) { // store.wl().apply_snapshot(snap).unwrap(); } - let handle_committed_entries = |committed_entries: Vec<Entry>| { + let handle_committed_entries = |committed_entries: Vec<RaftEntry>| { for entry in committed_entries { tlog!(Info, "--- committed_entry: {:?}", entry); Storage::persist_applied(entry.index); - if entry.get_entry_type() == EntryType::EntryNormal { + if entry.get_entry_type() == RaftEntryType::EntryNormal { handle_committed_data(entry.get_data()) }