diff --git a/picolib/traft/storage.rs b/picolib/traft/storage.rs index 98c610dbc9e4004fe4d4de135e9ed3f7c0abd9ba..60c1814e99220b145f3583c33d32a387a5d777ac 100644 --- a/picolib/traft/storage.rs +++ b/picolib/traft/storage.rs @@ -11,12 +11,17 @@ use raft::prelude::HardState as RaftHardState; use raft::prelude::RaftState; use raft::prelude::Snapshot as RaftSnapshot; use raft::Error as RaftError; +use serde::de::DeserializeOwned; +use serde::Serialize; mod entry_row; use entry_row::RaftEntryRow; pub struct Storage; +pub const SPACE_RAFT_STATE: &'static str = "raft_state"; +pub const SPACE_RAFT_LOG: &'static str = "raft_log"; + impl Storage { pub fn init_schema() { crate::tarantool::eval( @@ -75,43 +80,56 @@ impl Storage { ); } + fn persist_raft_state<T: Serialize>(key: &str, value: T) { + Space::find(SPACE_RAFT_STATE) + .unwrap() + .replace(&(key, value)) + .unwrap(); + } + + fn raft_state<T: DeserializeOwned>(key: &str) -> Option<T> { + Space::find(SPACE_RAFT_STATE)? + .get(&(key,)) + .unwrap()? + .field(1) + .unwrap() + } + pub fn term() -> Option<u64> { - let space: Space = Space::find("raft_state").unwrap(); - let row = space.get(&("term",)).unwrap(); - row.and_then(|row| row.field(1).unwrap()) + Storage::raft_state("term") } pub fn vote() -> Option<u64> { - let space: Space = Space::find("raft_state").unwrap(); - let row = space.get(&("vote",)).unwrap(); - row.and_then(|row| row.field(1).unwrap()) + Storage::raft_state("vote") } pub fn commit() -> Option<u64> { - let space: Space = Space::find("raft_state").unwrap(); - let row = space.get(&("commit",)).unwrap(); - row.and_then(|row| row.field(1).unwrap()) + Storage::raft_state("commit") } - pub fn persist_commit(commit: u64) { - let mut space: Space = Space::find("raft_state").unwrap(); - space.replace(&("commit", commit)).unwrap(); + pub fn applied() -> Option<u64> { + Storage::raft_state("applied") } - pub fn applied() -> Option<u64> { - let space: Space = Space::find("raft_state").unwrap(); - let row = space.get(&("applied",)).unwrap(); - row.and_then(|row| row.field(1).unwrap()) + pub fn persist_commit(commit: u64) { + Storage::persist_raft_state("commit", commit) } pub fn persist_applied(applied: u64) { - let mut space: Space = Space::find("raft_state").unwrap(); - space.replace(&("applied", applied)).unwrap(); + Storage::persist_raft_state("applied", applied) + } + + pub fn persist_term(term: u64) { + Storage::persist_raft_state("term", term) + } + + pub fn persist_vote(vote: u64) { + Storage::persist_raft_state("vote", vote) } pub fn entries(low: u64, high: u64) -> Vec<RaftEntry> { let mut ret: Vec<RaftEntry> = vec![]; - let space = Space::find("raft_log").unwrap(); + let space = Space::find(SPACE_RAFT_LOG).unwrap(); let iter = space .primary_key() .select(IteratorType::GE, &(low,)) @@ -129,7 +147,7 @@ impl Storage { } pub fn persist_entries(entries: &Vec<RaftEntry>) { - let mut space = Space::find("raft_log").unwrap(); + let mut space = Space::find(SPACE_RAFT_LOG).unwrap(); for entry in entries { let row: RaftEntryRow = entry.try_into().unwrap(); space.insert(&row).unwrap(); @@ -145,10 +163,9 @@ impl Storage { } pub fn persist_hard_state(hs: &RaftHardState) { - let mut space: Space = Space::find("raft_state").unwrap(); - space.replace(&("term", hs.term)).unwrap(); - space.replace(&("vote", hs.vote)).unwrap(); - space.replace(&("commit", hs.commit)).unwrap(); + Storage::persist_term(hs.term); + Storage::persist_vote(hs.vote); + Storage::persist_commit(hs.commit); } }