Skip to content
Snippets Groups Projects

refactor: traft Storage

Merged Alexey Protsenko requested to merge refactor/traft-storage into master
All threads resolved!
1 file
+ 41
24
Compare changes
  • Side-by-side
  • Inline
+ 41
24
@@ -11,12 +11,17 @@ use raft::prelude::HardState as RaftHardState;
use raft::prelude::RaftState;
use raft::prelude::Snapshot as RaftSnapshot;
use raft::Error as RaftError;
use serde::de::DeserializeOwned;
use serde::Serialize;
mod entry_row;
use entry_row::RaftEntryRow;
pub struct Storage;
pub const SPACE_RAFT_STATE: &'static str = "raft_state";
pub const SPACE_RAFT_LOG: &'static str = "raft_log";
impl Storage {
pub fn init_schema() {
crate::tarantool::eval(
@@ -75,43 +80,56 @@ impl Storage {
);
}
fn persist_raft_state<T: Serialize>(key: &str, value: T) {
Space::find(SPACE_RAFT_STATE)
.unwrap()
.replace(&(key, value))
.unwrap();
}
fn raft_state<T: DeserializeOwned>(key: &str) -> Option<T> {
Space::find(SPACE_RAFT_STATE)?
.get(&(key,))
.unwrap()?
.field(1)
.unwrap()
}
pub fn term() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("term",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
Storage::raft_state("term")
}
pub fn vote() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("vote",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
Storage::raft_state("vote")
}
pub fn commit() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("commit",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
Storage::raft_state("commit")
}
pub fn persist_commit(commit: u64) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("commit", commit)).unwrap();
pub fn applied() -> Option<u64> {
Storage::raft_state("applied")
}
pub fn applied() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("applied",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
pub fn persist_commit(commit: u64) {
Storage::persist_raft_state("commit", commit)
}
pub fn persist_applied(applied: u64) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("applied", applied)).unwrap();
Storage::persist_raft_state("applied", applied)
}
pub fn persist_term(term: u64) {
Storage::persist_raft_state("term", term)
}
pub fn persist_vote(vote: u64) {
Storage::persist_raft_state("vote", vote)
}
pub fn entries(low: u64, high: u64) -> Vec<RaftEntry> {
let mut ret: Vec<RaftEntry> = vec![];
let space = Space::find("raft_log").unwrap();
let space = Space::find(SPACE_RAFT_LOG).unwrap();
let iter = space
.primary_key()
.select(IteratorType::GE, &(low,))
@@ -129,7 +147,7 @@ impl Storage {
}
pub fn persist_entries(entries: &Vec<RaftEntry>) {
let mut space = Space::find("raft_log").unwrap();
let mut space = Space::find(SPACE_RAFT_LOG).unwrap();
for entry in entries {
let row: RaftEntryRow = entry.try_into().unwrap();
space.insert(&row).unwrap();
@@ -145,10 +163,9 @@ impl Storage {
}
pub fn persist_hard_state(hs: &RaftHardState) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("term", hs.term)).unwrap();
space.replace(&("vote", hs.vote)).unwrap();
space.replace(&("commit", hs.commit)).unwrap();
Storage::persist_term(hs.term);
Storage::persist_vote(hs.vote);
Storage::persist_commit(hs.commit);
}
}
Loading