Skip to content
Snippets Groups Projects
Commit 038fe692 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files
parent 20ff337b
No related branches found
No related tags found
1 merge request!4Implement simplest raft node
......@@ -20,7 +20,7 @@ use raft::prelude::*;
#[derive(Default)]
struct Stash {
raft_node: Option<RawNode<MemStorage>>,
raft_node: Option<tarantool::RaftNode>,
raft_loop: Option<fiber::LuaUnitJoinHandle>,
}
......@@ -126,9 +126,6 @@ fn main_run() {
// raft_main();
}
use raft::eraftpb::ConfState;
use raft::storage::MemStorage;
fn get_stash(stash: &Rc<RefCell<Stash>>) {
let stash: Ref<Stash> = stash.borrow();
println!("{:?}", stash);
......@@ -139,7 +136,7 @@ fn raft_init(stash: &Rc<RefCell<Stash>>) {
// Create a storage for Raft, and here we just use a simple memory storage.
// You need to build your own persistent storage in your production.
// Please check the Storage trait in src/storage.rs to see how to implement one.
let storage = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![])));
let storage = tarantool::NodeStorage::new();
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
......@@ -165,7 +162,7 @@ fn raft_init(stash: &Rc<RefCell<Stash>>) {
};
// Create the Raft node.
let r = RawNode::new(&cfg, storage, &logger).unwrap();
let r = tarantool::RaftNode::new(&cfg, storage, &logger).unwrap();
stash.borrow_mut().raft_node = Some(r);
let loop_fn = {
......@@ -186,20 +183,23 @@ fn raft_init(stash: &Rc<RefCell<Stash>>) {
}
fn on_ready(
raft_group: &mut RawNode<MemStorage>,
raft_group: &mut tarantool::RaftNode,
logger: &slog::Logger,
) {
if !raft_group.has_ready() {
return;
}
let store = raft_group.raft.raft_log.store.clone();
let store = raft_group.raft.raft_log.store.0.clone();
info!(logger, "vvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvvv");
// Get the `Ready` with `RawNode::ready` interface.
let mut ready: Ready = raft_group.ready();
println!("{:?}", ready);
info!(logger, "--- {:?}", ready);
let handle_messages = |msgs: Vec<Message>| {
for _msg in msgs {
info!(logger, "--- handle message: {:?}", _msg);
// Send messages to other peers.
}
};
......@@ -211,12 +211,15 @@ fn on_ready(
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
store.wl().apply_snapshot(ready.snapshot().clone()).unwrap();
let snap = ready.snapshot().clone();
info!(logger, "--- apply_snapshot: {:?}", snap);
store.wl().apply_snapshot(snap).unwrap();
}
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
info!(logger, "--- committed_entry: {:?}", entry);
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;
......@@ -227,8 +230,7 @@ fn on_ready(
}
if entry.get_entry_type() == EntryType::EntryNormal {
let key = entry.data.get(0).unwrap();
info!(logger, "Inserted key: {}", key);
// let key = entry.data.get(0).unwrap();
// if let Some(value) = cbs.remove(key) {
// }
}
......@@ -240,12 +242,19 @@ fn on_ready(
if !ready.entries().is_empty() {
// Append entries to the Raft log.
store.wl().append(ready.entries()).unwrap();
let entries = ready.entries();
for entry in entries {
info!(logger, "--- uncommitted_entry: {:?}", entry);
}
store.wl().append(entries).unwrap();
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
store.wl().set_hardstate(hs.clone());
let hs = hs.clone();
info!(logger, "--- hard_state: {:?}", hs);
store.wl().set_hardstate(hs);
}
if !ready.persisted_messages().is_empty() {
......@@ -253,6 +262,8 @@ fn on_ready(
handle_messages(ready.take_persisted_messages());
}
info!(logger, "ADVANCE -----------------------------------------");
// Advance the Raft.
let mut light_rd = raft_group.advance(ready);
// Update commit index.
......@@ -265,11 +276,15 @@ fn on_ready(
handle_committed_entries(light_rd.take_committed_entries());
// Advance the apply index.
raft_group.advance_apply();
info!(logger, "^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^");
}
#[no_mangle]
fn raft_propose(stash: &Rc<RefCell<Stash>>, key: u8) {
let mut stash: RefMut<Stash> = stash.borrow_mut();
let raft_node = stash.raft_node.as_mut().unwrap();
let logger = slog::Logger::root(tarantool::SlogDrain, o!());
info!(logger, "propose {} .......................................", key);
raft_node.propose(vec![], vec![key]).unwrap();
info!(logger, ",,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,");
}
......@@ -133,3 +133,76 @@ mod slog {
}
}
}
pub use self::raft::RaftNode;
pub use self::raft::NodeStorage;
mod raft {
use slog::{debug, o};
use raft::prelude::*;
use raft::Error as RaftError;
use raft::storage::MemStorage;
use raft::eraftpb::ConfState;
pub struct NodeStorage(pub MemStorage);
pub type RaftNode = RawNode<NodeStorage>;
impl NodeStorage {
pub fn new() -> Self {
let stor = MemStorage::new_with_conf_state(ConfState::from((vec![1], vec![])));
Self(stor)
}
}
impl Storage for NodeStorage {
fn initial_state(&self) -> Result<RaftState, RaftError> {
let ret = self.0.initial_state();
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ initial_state() -> {:?}", ret);
ret
}
fn entries(
&self,
low: u64,
high: u64,
max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>, RaftError> {
let max_size: Option<u64> = max_size.into();
let ret = self.0.entries(low, high, max_size);
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ entries(low={}, high={}, max_size={:?}) -> {:?}",
low, high, max_size, ret
);
ret
}
fn term(&self, idx: u64) -> Result<u64, RaftError> {
let ret = self.0.term(idx);
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ term(idx={}) -> {:?}", idx, ret);
ret
}
fn first_index(&self) -> Result<u64, RaftError> {
let ret = self.0.first_index();
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ first_index() -> {:?}", ret);
ret
}
fn last_index(&self) -> Result<u64, RaftError> {
let ret = self.0.last_index();
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ last_index() -> {:?}", ret);
ret
}
fn snapshot(&self, request_index: u64) -> Result<Snapshot, RaftError> {
let ret = self.0.snapshot(request_index);
let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
debug!(logger, "+++ snapshot(idx={}) -> {:?}", request_index, ret);
ret
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment