diff --git a/picolib/lib.rs b/picolib/lib.rs index 9f71c348ad41f9ebdc623dba35caf030e15dd9cd..800f9a5523a6911c0b1f3d7d6a1c997ea2adb672 100644 --- a/picolib/lib.rs +++ b/picolib/lib.rs @@ -1,10 +1,8 @@ use slog::{debug, info, o}; use std::os::raw::c_int; use ::tarantool::tlua; -use std::time::Duration; mod tarantool; -mod raft; -use ::tarantool::fiber; +mod traft; pub struct InnerTest { pub name: &'static str, @@ -19,15 +17,13 @@ use std::rc::Rc; #[derive(Default)] struct Stash { - raft_node: Option<raft::Node>, - raft_loop: Option<fiber::LuaUnitJoinHandle>, + raft_node: Option<traft::Node>, } impl std::fmt::Debug for Stash { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("") .field("raft_node", &self.raft_node.is_some()) - .field("raft_loop", &self.raft_loop.is_some()) .finish() } } @@ -107,7 +103,10 @@ fn main_run(stash: &Rc<RefCell<Stash>>) { tarantool::set_cfg(&cfg); - raft_init(&stash); + traft::Storage::init_schema(); + let mut node = traft::Node::new(&traft::Config::new(1)).unwrap(); + node.start(); + stash.borrow_mut().raft_node = Some(node); std::env::var("PICODATA_LISTEN").ok().and_then(|v| { cfg.listen = Some(v.clone()); @@ -134,158 +133,12 @@ fn get_stash(stash: &Rc<RefCell<Stash>>) { println!("{:?}", stash); } -// A simple example about how to use the Raft library in Rust. -fn raft_init(stash: &Rc<RefCell<Stash>>) { - raft::Storage::init_schema(); - let logger = slog::Logger::root(tarantool::SlogDrain, o!()); - - // Create the configuration for the Raft node. - let cfg = raft::Config { - // The unique ID for the Raft node. - id: 1, - // Election tick is for how long the follower may campaign again after - // it doesn't receive any message from the leader. - election_tick: 10, - // Heartbeat tick is for how long the leader needs to send - // a heartbeat to keep alive. - heartbeat_tick: 3, - // The max size limits the max size of each appended message. Mostly, 1 MB is enough. - max_size_per_msg: 1024 * 1024 * 1024, - // Max inflight msgs that the leader sends messages to follower without - // receiving ACKs. - max_inflight_msgs: 256, - // The Raft applied index. - // You need to save your applied index when you apply the committed Raft logs. - applied: 0, - ..Default::default() - }; - - // Create the Raft node. - let r = raft::Node::new(&cfg, raft::Storage, &logger).unwrap(); - stash.borrow_mut().raft_node = Some(r); - - let loop_fn = { - let stash = stash.clone(); - move || { - let logger = slog::Logger::root(tarantool::SlogDrain, o!()); - loop { - fiber::sleep(Duration::from_millis(100)); - let mut stash: RefMut<Stash> = stash.borrow_mut(); - let mut raft_node = stash.raft_node.as_mut().unwrap(); - raft_node.tick(); - on_ready(&mut raft_node, &logger); - } - } - }; - - stash.borrow_mut().raft_loop = Some(fiber::defer_proc(loop_fn)); -} - -fn on_ready( - raft_group: &mut raft::Node, - logger: &slog::Logger, -) { - if !raft_group.has_ready() { - return; - } - - info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"); - - // Get the `Ready` with `RawNode::ready` interface. - let mut ready: raft::Ready = raft_group.ready(); - info!(logger, "--- {:?}", ready); - - let handle_messages = |msgs: Vec<raft::Message>| { - for _msg in msgs { - info!(logger, "--- handle message: {:?}", _msg); - // Send messages to other peers. - } - }; - - if !ready.messages().is_empty() { - // Send out the messages come from the node. - handle_messages(ready.take_messages()); - } - - if !ready.snapshot().is_empty() { - // This is a snapshot, we need to apply the snapshot at first. - let snap = ready.snapshot().clone(); - info!(logger, "--- apply_snapshot: {:?}", snap); - unimplemented!(); - // store.wl().apply_snapshot(snap).unwrap(); - } - - let mut _last_apply_index = 0; - let mut handle_committed_entries = |committed_entries: Vec<raft::Entry>| { - for entry in committed_entries { - info!(logger, "--- committed_entry: {:?}", entry); - // Mostly, you need to save the last apply index to resume applying - // after restart. Here we just ignore this because we use a Memory storage. - _last_apply_index = entry.index; - - if entry.data.is_empty() { - // Emtpy entry, when the peer becomes Leader it will send an empty entry. - continue; - } - - if entry.get_entry_type() == raft::EntryType::EntryNormal { - // let key = entry.data.get(0).unwrap(); - // if let Some(value) = cbs.remove(key) { - // } - } - - // TODO: handle EntryConfChange - } - }; - handle_committed_entries(ready.take_committed_entries()); - - if !ready.entries().is_empty() { - // Append entries to the Raft log. - let entries = ready.entries(); - for entry in entries { - info!(logger, "--- uncommitted_entry: {:?}", entry); - } - - raft::Storage::persist_entries(entries).unwrap(); - } - - if let Some(hs) = ready.hs() { - // Raft HardState changed, and we need to persist it. - // let hs = hs.clone(); - info!(logger, "--- hard_state: {:?}", hs); - raft::Storage::persist_hard_state(&hs).unwrap(); - // store.wl().set_hardstate(hs); - } - - if !ready.persisted_messages().is_empty() { - // Send out the persisted messages come from the node. - handle_messages(ready.take_persisted_messages()); - } - - info!(logger, "ADVANCE -----------------------------------------"); - - // Advance the Raft. - let mut light_rd = raft_group.advance(ready); - info!(logger, "--- {:?}", light_rd); - // Update commit index. - if let Some(commit) = light_rd.commit_index() { - raft::Storage::persist_commit(commit).unwrap(); - } - // Send out the messages. - handle_messages(light_rd.take_messages()); - // Apply all committed entries. - handle_committed_entries(light_rd.take_committed_entries()); - // Advance the apply index. - raft_group.advance_apply(); - info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"); -} - #[no_mangle] fn raft_propose(stash: &Rc<RefCell<Stash>>, key: u8) { let mut stash: RefMut<Stash> = stash.borrow_mut(); let raft_node = stash.raft_node.as_mut().unwrap(); let logger = slog::Logger::root(tarantool::SlogDrain, o!()); info!(logger, "propose {} .......................................", key); - raft_node.propose(vec![], vec![key]).unwrap(); + raft_node.borrow_mut().propose(vec![], vec![key]).unwrap(); info!(logger, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); } diff --git a/picolib/traft/mod.rs b/picolib/traft/mod.rs new file mode 100644 index 0000000000000000000000000000000000000000..cc0afbf74c1e9a2eaec9422be734cd6bcc6c44f8 --- /dev/null +++ b/picolib/traft/mod.rs @@ -0,0 +1,11 @@ +mod node; +mod storage; + +pub use node::Node; +pub use storage::Storage; + +pub use raft::Ready; +pub use raft::Config; +pub use raft::eraftpb::Message; +pub use raft::eraftpb::Entry; +// pub use raft::prelude::*; diff --git a/picolib/traft/node.rs b/picolib/traft/node.rs new file mode 100644 index 0000000000000000000000000000000000000000..3298305feacc8780b7579757192a7e1291bfaa90 --- /dev/null +++ b/picolib/traft/node.rs @@ -0,0 +1,167 @@ +use slog::{ + info, +}; +use std::ops::{Deref, DerefMut}; +use raft::prelude::*; +use raft::Error as RaftError; + +use std::cell::RefCell; +use std::rc::Rc; + +use std::time::Duration; + + +use super::storage::Storage; +use crate::tarantool::SlogDrain; +use ::tarantool::fiber; + +// pub type Node = RawNode<Storage>; +type RawNode = raft::RawNode<Storage>; + +pub struct Node { + logger: slog::Logger, + raw_node: Rc<RefCell<RawNode>>, + main_loop: Option<fiber::LuaUnitJoinHandle>, +} + +impl Node { + pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { + let logger = slog::Logger::root(SlogDrain, slog::o!()); + let raw_node = RawNode::new(cfg, Storage, &logger)?; + let raw_node = Rc::from(RefCell::from(raw_node)); + let ret = Node {logger, raw_node, main_loop: None}; + Ok(ret) + } + + pub fn start(&mut self) { + assert!(self.main_loop.is_none(), "Raft loop is already started"); + + let logger = self.logger.clone(); + let raw_node = self.raw_node.clone(); + let loop_fn = move || { + loop { + fiber::sleep(Duration::from_millis(100)); + // let mut stash: RefMut<Stash> = stash.borrow_mut(); + // let mut raft_node = stash.raft_node.as_mut().unwrap(); + let mut raw_node = raw_node.borrow_mut(); + raw_node.tick(); + on_ready(&mut raw_node, &logger); + } + }; + + self.main_loop = Some(fiber::defer_proc(loop_fn)); + } +} + +impl Deref for Node { + type Target = Rc<RefCell<RawNode>>; + + fn deref(&self) -> &Self::Target { + &self.raw_node + } +} + +impl DerefMut for Node { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.raw_node + } +} + +fn on_ready( + raft_group: &mut RawNode, + logger: &slog::Logger, +) { + if !raft_group.has_ready() { + return; + } + + info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv"); + + // Get the `Ready` with `RawNode::ready` interface. + let mut ready: raft::Ready = raft_group.ready(); + info!(logger, "--- {:?}", ready); + + let handle_messages = |msgs: Vec<Message>| { + for _msg in msgs { + info!(logger, "--- handle message: {:?}", _msg); + // Send messages to other peers. + } + }; + + if !ready.messages().is_empty() { + // Send out the messages come from the node. + handle_messages(ready.take_messages()); + } + + if !ready.snapshot().is_empty() { + // This is a snapshot, we need to apply the snapshot at first. + let snap = ready.snapshot().clone(); + info!(logger, "--- apply_snapshot: {:?}", snap); + unimplemented!(); + // store.wl().apply_snapshot(snap).unwrap(); + } + + let mut _last_apply_index = 0; + let mut handle_committed_entries = |committed_entries: Vec<Entry>| { + for entry in committed_entries { + info!(logger, "--- committed_entry: {:?}", entry); + // Mostly, you need to save the last apply index to resume applying + // after restart. Here we just ignore this because we use a Memory storage. + _last_apply_index = entry.index; + + if entry.data.is_empty() { + // Emtpy entry, when the peer becomes Leader it will send an empty entry. + continue; + } + + // if entry.get_entry_type() == traft::EntryType::EntryNormal { + // let key = entry.data.get(0).unwrap(); + // if let Some(value) = cbs.remove(key) { + // } + // } + + // TODO: handle EntryConfChange + } + }; + handle_committed_entries(ready.take_committed_entries()); + + if !ready.entries().is_empty() { + // Append entries to the Raft log. + let entries = ready.entries(); + for entry in entries { + info!(logger, "--- uncommitted_entry: {:?}", entry); + } + + Storage::persist_entries(entries).unwrap(); + } + + if let Some(hs) = ready.hs() { + // Raft HardState changed, and we need to persist it. + // let hs = hs.clone(); + info!(logger, "--- hard_state: {:?}", hs); + Storage::persist_hard_state(&hs).unwrap(); + // store.wl().set_hardstate(hs); + } + + if !ready.persisted_messages().is_empty() { + // Send out the persisted messages come from the node. + handle_messages(ready.take_persisted_messages()); + } + + info!(logger, "ADVANCE -----------------------------------------"); + + // Advance the Raft. + let mut light_rd = raft_group.advance(ready); + info!(logger, "--- {:?}", light_rd); + // Update commit index. + if let Some(commit) = light_rd.commit_index() { + Storage::persist_commit(commit).unwrap(); + } + // Send out the messages. + handle_messages(light_rd.take_messages()); + // Apply all committed entries. + handle_committed_entries(light_rd.take_committed_entries()); + // Advance the apply index. + raft_group.advance_apply(); + info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^"); +} diff --git a/picolib/raft.rs b/picolib/traft/storage.rs similarity index 98% rename from picolib/raft.rs rename to picolib/traft/storage.rs index f7e22741049d3118cd1191951db92178de1fba8b..b3861160e3f5a8695e57379259cb5813eef9023a 100644 --- a/picolib/raft.rs +++ b/picolib/traft/storage.rs @@ -6,10 +6,9 @@ use ::tarantool::space::Space; use ::tarantool::tuple::Tuple; use ::tarantool::index::IteratorType; -pub use raft::prelude::*; -pub use raft::Error as RaftError; +use raft::prelude::*; +use raft::Error as RaftError; -pub type Node = RawNode<Storage>; pub struct Storage; impl Storage {