Skip to content
Snippets Groups Projects
Commit caf78484 authored by Sergey V's avatar Sergey V Committed by Yaroslav Dynnikov
Browse files

refactor: traft Storage

parent 404f7851
No related branches found
No related tags found
1 merge request!17refactor: traft Storage
Pipeline #3421 passed
...@@ -11,12 +11,17 @@ use raft::prelude::HardState as RaftHardState; ...@@ -11,12 +11,17 @@ use raft::prelude::HardState as RaftHardState;
use raft::prelude::RaftState; use raft::prelude::RaftState;
use raft::prelude::Snapshot as RaftSnapshot; use raft::prelude::Snapshot as RaftSnapshot;
use raft::Error as RaftError; use raft::Error as RaftError;
use serde::de::DeserializeOwned;
use serde::Serialize;
mod entry_row; mod entry_row;
use entry_row::RaftEntryRow; use entry_row::RaftEntryRow;
pub struct Storage; pub struct Storage;
pub const SPACE_RAFT_STATE: &'static str = "raft_state";
pub const SPACE_RAFT_LOG: &'static str = "raft_log";
impl Storage { impl Storage {
pub fn init_schema() { pub fn init_schema() {
crate::tarantool::eval( crate::tarantool::eval(
...@@ -75,43 +80,56 @@ impl Storage { ...@@ -75,43 +80,56 @@ impl Storage {
); );
} }
fn persist_raft_state<T: Serialize>(key: &str, value: T) {
Space::find(SPACE_RAFT_STATE)
.unwrap()
.replace(&(key, value))
.unwrap();
}
fn raft_state<T: DeserializeOwned>(key: &str) -> Option<T> {
Space::find(SPACE_RAFT_STATE)?
.get(&(key,))
.unwrap()?
.field(1)
.unwrap()
}
pub fn term() -> Option<u64> { pub fn term() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap(); Storage::raft_state("term")
let row = space.get(&("term",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
} }
pub fn vote() -> Option<u64> { pub fn vote() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap(); Storage::raft_state("vote")
let row = space.get(&("vote",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
} }
pub fn commit() -> Option<u64> { pub fn commit() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap(); Storage::raft_state("commit")
let row = space.get(&("commit",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
} }
pub fn persist_commit(commit: u64) { pub fn applied() -> Option<u64> {
let mut space: Space = Space::find("raft_state").unwrap(); Storage::raft_state("applied")
space.replace(&("commit", commit)).unwrap();
} }
pub fn applied() -> Option<u64> { pub fn persist_commit(commit: u64) {
let space: Space = Space::find("raft_state").unwrap(); Storage::persist_raft_state("commit", commit)
let row = space.get(&("applied",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
} }
pub fn persist_applied(applied: u64) { pub fn persist_applied(applied: u64) {
let mut space: Space = Space::find("raft_state").unwrap(); Storage::persist_raft_state("applied", applied)
space.replace(&("applied", applied)).unwrap(); }
pub fn persist_term(term: u64) {
Storage::persist_raft_state("term", term)
}
pub fn persist_vote(vote: u64) {
Storage::persist_raft_state("vote", vote)
} }
pub fn entries(low: u64, high: u64) -> Vec<RaftEntry> { pub fn entries(low: u64, high: u64) -> Vec<RaftEntry> {
let mut ret: Vec<RaftEntry> = vec![]; let mut ret: Vec<RaftEntry> = vec![];
let space = Space::find("raft_log").unwrap(); let space = Space::find(SPACE_RAFT_LOG).unwrap();
let iter = space let iter = space
.primary_key() .primary_key()
.select(IteratorType::GE, &(low,)) .select(IteratorType::GE, &(low,))
...@@ -129,7 +147,7 @@ impl Storage { ...@@ -129,7 +147,7 @@ impl Storage {
} }
pub fn persist_entries(entries: &Vec<RaftEntry>) { pub fn persist_entries(entries: &Vec<RaftEntry>) {
let mut space = Space::find("raft_log").unwrap(); let mut space = Space::find(SPACE_RAFT_LOG).unwrap();
for entry in entries { for entry in entries {
let row: RaftEntryRow = entry.try_into().unwrap(); let row: RaftEntryRow = entry.try_into().unwrap();
space.insert(&row).unwrap(); space.insert(&row).unwrap();
...@@ -145,10 +163,9 @@ impl Storage { ...@@ -145,10 +163,9 @@ impl Storage {
} }
pub fn persist_hard_state(hs: &RaftHardState) { pub fn persist_hard_state(hs: &RaftHardState) {
let mut space: Space = Space::find("raft_state").unwrap(); Storage::persist_term(hs.term);
space.replace(&("term", hs.term)).unwrap(); Storage::persist_vote(hs.vote);
space.replace(&("vote", hs.vote)).unwrap(); Storage::persist_commit(hs.commit);
space.replace(&("commit", hs.commit)).unwrap();
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment