Skip to content
Snippets Groups Projects
Commit 8f63bab1 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

Incapsulate raft internals

parent 5ab3ad27
No related branches found
No related tags found
1 merge request!6Refactor raft
use slog::{debug, info, o}; use slog::{debug, info, o};
use std::os::raw::c_int; use std::os::raw::c_int;
use ::tarantool::tlua; use ::tarantool::tlua;
use std::time::Duration;
mod tarantool; mod tarantool;
mod raft; mod traft;
use ::tarantool::fiber;
pub struct InnerTest { pub struct InnerTest {
pub name: &'static str, pub name: &'static str,
...@@ -19,15 +17,13 @@ use std::rc::Rc; ...@@ -19,15 +17,13 @@ use std::rc::Rc;
#[derive(Default)] #[derive(Default)]
struct Stash { struct Stash {
raft_node: Option<raft::Node>, raft_node: Option<traft::Node>,
raft_loop: Option<fiber::LuaUnitJoinHandle>,
} }
impl std::fmt::Debug for Stash { impl std::fmt::Debug for Stash {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("") f.debug_struct("")
.field("raft_node", &self.raft_node.is_some()) .field("raft_node", &self.raft_node.is_some())
.field("raft_loop", &self.raft_loop.is_some())
.finish() .finish()
} }
} }
...@@ -107,7 +103,10 @@ fn main_run(stash: &Rc<RefCell<Stash>>) { ...@@ -107,7 +103,10 @@ fn main_run(stash: &Rc<RefCell<Stash>>) {
tarantool::set_cfg(&cfg); tarantool::set_cfg(&cfg);
raft_init(&stash); traft::Storage::init_schema();
let mut node = traft::Node::new(&traft::Config::new(1)).unwrap();
node.start();
stash.borrow_mut().raft_node = Some(node);
std::env::var("PICODATA_LISTEN").ok().and_then(|v| { std::env::var("PICODATA_LISTEN").ok().and_then(|v| {
cfg.listen = Some(v.clone()); cfg.listen = Some(v.clone());
...@@ -134,158 +133,12 @@ fn get_stash(stash: &Rc<RefCell<Stash>>) { ...@@ -134,158 +133,12 @@ fn get_stash(stash: &Rc<RefCell<Stash>>) {
println!("{:?}", stash); println!("{:?}", stash);
} }
// A simple example about how to use the Raft library in Rust.
fn raft_init(stash: &Rc<RefCell<Stash>>) {
raft::Storage::init_schema();
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
// Create the configuration for the Raft node.
let cfg = raft::Config {
// The unique ID for the Raft node.
id: 1,
// Election tick is for how long the follower may campaign again after
// it doesn't receive any message from the leader.
election_tick: 10,
// Heartbeat tick is for how long the leader needs to send
// a heartbeat to keep alive.
heartbeat_tick: 3,
// The max size limits the max size of each appended message. Mostly, 1 MB is enough.
max_size_per_msg: 1024 * 1024 * 1024,
// Max inflight msgs that the leader sends messages to follower without
// receiving ACKs.
max_inflight_msgs: 256,
// The Raft applied index.
// You need to save your applied index when you apply the committed Raft logs.
applied: 0,
..Default::default()
};
// Create the Raft node.
let r = raft::Node::new(&cfg, raft::Storage, &logger).unwrap();
stash.borrow_mut().raft_node = Some(r);
let loop_fn = {
let stash = stash.clone();
move || {
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
loop {
fiber::sleep(Duration::from_millis(100));
let mut stash: RefMut<Stash> = stash.borrow_mut();
let mut raft_node = stash.raft_node.as_mut().unwrap();
raft_node.tick();
on_ready(&mut raft_node, &logger);
}
}
};
stash.borrow_mut().raft_loop = Some(fiber::defer_proc(loop_fn));
}
fn on_ready(
raft_group: &mut raft::Node,
logger: &slog::Logger,
) {
if !raft_group.has_ready() {
return;
}
info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
// Get the `Ready` with `RawNode::ready` interface.
let mut ready: raft::Ready = raft_group.ready();
info!(logger, "--- {:?}", ready);
let handle_messages = |msgs: Vec<raft::Message>| {
for _msg in msgs {
info!(logger, "--- handle message: {:?}", _msg);
// Send messages to other peers.
}
};
if !ready.messages().is_empty() {
// Send out the messages come from the node.
handle_messages(ready.take_messages());
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
let snap = ready.snapshot().clone();
info!(logger, "--- apply_snapshot: {:?}", snap);
unimplemented!();
// store.wl().apply_snapshot(snap).unwrap();
}
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<raft::Entry>| {
for entry in committed_entries {
info!(logger, "--- committed_entry: {:?}", entry);
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;
if entry.data.is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
continue;
}
if entry.get_entry_type() == raft::EntryType::EntryNormal {
// let key = entry.data.get(0).unwrap();
// if let Some(value) = cbs.remove(key) {
// }
}
// TODO: handle EntryConfChange
}
};
handle_committed_entries(ready.take_committed_entries());
if !ready.entries().is_empty() {
// Append entries to the Raft log.
let entries = ready.entries();
for entry in entries {
info!(logger, "--- uncommitted_entry: {:?}", entry);
}
raft::Storage::persist_entries(entries).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
info!(logger, "--- hard_state: {:?}", hs);
raft::Storage::persist_hard_state(&hs).unwrap();
// store.wl().set_hardstate(hs);
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
handle_messages(ready.take_persisted_messages());
}
info!(logger, "ADVANCE -----------------------------------------");
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
info!(logger, "--- {:?}", light_rd);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
raft::Storage::persist_commit(commit).unwrap();
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
}
#[no_mangle] #[no_mangle]
fn raft_propose(stash: &Rc<RefCell<Stash>>, key: u8) { fn raft_propose(stash: &Rc<RefCell<Stash>>, key: u8) {
let mut stash: RefMut<Stash> = stash.borrow_mut(); let mut stash: RefMut<Stash> = stash.borrow_mut();
let raft_node = stash.raft_node.as_mut().unwrap(); let raft_node = stash.raft_node.as_mut().unwrap();
let logger = slog::Logger::root(tarantool::SlogDrain, o!()); let logger = slog::Logger::root(tarantool::SlogDrain, o!());
info!(logger, "propose {} .......................................", key); info!(logger, "propose {} .......................................", key);
raft_node.propose(vec![], vec![key]).unwrap(); raft_node.borrow_mut().propose(vec![], vec![key]).unwrap();
info!(logger, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,"); info!(logger, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
} }
mod node;
mod storage;
pub use node::Node;
pub use storage::Storage;
pub use raft::Ready;
pub use raft::Config;
pub use raft::eraftpb::Message;
pub use raft::eraftpb::Entry;
// pub use raft::prelude::*;
use slog::{
info,
};
use std::ops::{Deref, DerefMut};
use raft::prelude::*;
use raft::Error as RaftError;
use std::cell::RefCell;
use std::rc::Rc;
use std::time::Duration;
use super::storage::Storage;
use crate::tarantool::SlogDrain;
use ::tarantool::fiber;
// pub type Node = RawNode<Storage>;
type RawNode = raft::RawNode<Storage>;
pub struct Node {
logger: slog::Logger,
raw_node: Rc<RefCell<RawNode>>,
main_loop: Option<fiber::LuaUnitJoinHandle>,
}
impl Node {
pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> {
let logger = slog::Logger::root(SlogDrain, slog::o!());
let raw_node = RawNode::new(cfg, Storage, &logger)?;
let raw_node = Rc::from(RefCell::from(raw_node));
let ret = Node {logger, raw_node, main_loop: None};
Ok(ret)
}
pub fn start(&mut self) {
assert!(self.main_loop.is_none(), "Raft loop is already started");
let logger = self.logger.clone();
let raw_node = self.raw_node.clone();
let loop_fn = move || {
loop {
fiber::sleep(Duration::from_millis(100));
// let mut stash: RefMut<Stash> = stash.borrow_mut();
// let mut raft_node = stash.raft_node.as_mut().unwrap();
let mut raw_node = raw_node.borrow_mut();
raw_node.tick();
on_ready(&mut raw_node, &logger);
}
};
self.main_loop = Some(fiber::defer_proc(loop_fn));
}
}
impl Deref for Node {
type Target = Rc<RefCell<RawNode>>;
fn deref(&self) -> &Self::Target {
&self.raw_node
}
}
impl DerefMut for Node {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.raw_node
}
}
fn on_ready(
raft_group: &mut RawNode,
logger: &slog::Logger,
) {
if !raft_group.has_ready() {
return;
}
info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
// Get the `Ready` with `RawNode::ready` interface.
let mut ready: raft::Ready = raft_group.ready();
info!(logger, "--- {:?}", ready);
let handle_messages = |msgs: Vec<Message>| {
for _msg in msgs {
info!(logger, "--- handle message: {:?}", _msg);
// Send messages to other peers.
}
};
if !ready.messages().is_empty() {
// Send out the messages come from the node.
handle_messages(ready.take_messages());
}
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
let snap = ready.snapshot().clone();
info!(logger, "--- apply_snapshot: {:?}", snap);
unimplemented!();
// store.wl().apply_snapshot(snap).unwrap();
}
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
info!(logger, "--- committed_entry: {:?}", entry);
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;
if entry.data.is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
continue;
}
// if entry.get_entry_type() == traft::EntryType::EntryNormal {
// let key = entry.data.get(0).unwrap();
// if let Some(value) = cbs.remove(key) {
// }
// }
// TODO: handle EntryConfChange
}
};
handle_committed_entries(ready.take_committed_entries());
if !ready.entries().is_empty() {
// Append entries to the Raft log.
let entries = ready.entries();
for entry in entries {
info!(logger, "--- uncommitted_entry: {:?}", entry);
}
Storage::persist_entries(entries).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
info!(logger, "--- hard_state: {:?}", hs);
Storage::persist_hard_state(&hs).unwrap();
// store.wl().set_hardstate(hs);
}
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
handle_messages(ready.take_persisted_messages());
}
info!(logger, "ADVANCE -----------------------------------------");
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
info!(logger, "--- {:?}", light_rd);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
}
// Send out the messages.
handle_messages(light_rd.take_messages());
// Apply all committed entries.
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
}
...@@ -6,10 +6,9 @@ use ::tarantool::space::Space; ...@@ -6,10 +6,9 @@ use ::tarantool::space::Space;
use ::tarantool::tuple::Tuple; use ::tarantool::tuple::Tuple;
use ::tarantool::index::IteratorType; use ::tarantool::index::IteratorType;
pub use raft::prelude::*; use raft::prelude::*;
pub use raft::Error as RaftError; use raft::Error as RaftError;
pub type Node = RawNode<Storage>;
pub struct Storage; pub struct Storage;
impl Storage { impl Storage {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment