Skip to content
Snippets Groups Projects
Commit ce6f33cf authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

Refactor raft storage

1. Change raft_state schema. Now it's more suitable for updating
   individual fields of the raft hard state.

   Close https://gitlab.com/picodata/picodata/picodata/-/issues/13

2. Persist commit and applied indices.

   Close https://gitlab.com/picodata/picodata/picodata/-/issues/14

3. Rearrange storage code. High cohesion, low coupling, as they
   teach us.
parent 8f63bab1
No related branches found
No related tags found
1 merge request!6Refactor raft
Pipeline #3381 passed
......@@ -104,7 +104,12 @@ fn main_run(stash: &Rc<RefCell<Stash>>) {
tarantool::set_cfg(&cfg);
traft::Storage::init_schema();
let mut node = traft::Node::new(&traft::Config::new(1)).unwrap();
let raft_cfg = traft::Config {
id: 1,
applied: traft::Storage::applied().unwrap_or_default(),
..Default::default()
};
let mut node = traft::Node::new(&raft_cfg).unwrap();
node.start();
stash.borrow_mut().raft_node = Some(node);
......
......@@ -101,13 +101,10 @@ fn on_ready(
// store.wl().apply_snapshot(snap).unwrap();
}
let mut _last_apply_index = 0;
let mut handle_committed_entries = |committed_entries: Vec<Entry>| {
let handle_committed_entries = |committed_entries: Vec<Entry>| {
for entry in committed_entries {
info!(logger, "--- committed_entry: {:?}", entry);
// Mostly, you need to save the last apply index to resume applying
// after restart. Here we just ignore this because we use a Memory storage.
_last_apply_index = entry.index;
Storage::persist_applied(entry.index);
if entry.data.is_empty() {
// Emtpy entry, when the peer becomes Leader it will send an empty entry.
......@@ -132,14 +129,14 @@ fn on_ready(
info!(logger, "--- uncommitted_entry: {:?}", entry);
}
Storage::persist_entries(entries).unwrap();
Storage::persist_entries(entries);
}
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
info!(logger, "--- hard_state: {:?}", hs);
Storage::persist_hard_state(&hs).unwrap();
Storage::persist_hard_state(&hs);
// store.wl().set_hardstate(hs);
}
......@@ -155,7 +152,7 @@ fn on_ready(
info!(logger, "--- {:?}", light_rd);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
Storage::persist_commit(commit).unwrap();
Storage::persist_commit(commit);
}
// Send out the messages.
handle_messages(light_rd.take_messages());
......
......@@ -37,15 +37,14 @@ impl Storage {
if_not_exists = true,
is_local = true,
format = {
{name = 'term', type = 'unsigned', is_nullable = false},
{name = 'vote', type = 'unsigned', is_nullable = false},
{name = 'commit', type = 'unsigned', is_nullable = false},
{name = 'key', type = 'string', is_nullable = false},
{name = 'value', type = 'any', is_nullable = false},
}
})
box.space.raft_state:create_index('pk', {
if_not_exists = true,
parts = {{'term'}},
parts = {{'key'}},
})
box.schema.space.create('raft_group', {
......@@ -71,39 +70,83 @@ impl Storage {
);
}
pub fn persist_entries(entries: &Vec<Entry>) -> Result<(), RaftError> {
pub fn term() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("term",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
}
pub fn vote() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("vote",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
}
pub fn commit() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("commit",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
}
pub fn persist_commit(commit: u64) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("commit", commit)).unwrap();
}
pub fn applied() -> Option<u64> {
let space: Space = Space::find("raft_state").unwrap();
let row = space.get(&("applied",)).unwrap();
row.and_then(|row| row.field(1).unwrap())
}
pub fn persist_applied(applied: u64) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("applied", applied)).unwrap();
}
pub fn entries(low: u64, high: u64,) -> Vec<Entry> {
let mut ret: Vec<Entry> = vec![];
let space = Space::find("raft_log").unwrap();
let iter = space.primary_key().select(IteratorType::GE, &(low,)).unwrap();
for tuple in iter {
let row: LogRow = tuple.into_struct().unwrap();
if row.raft_index >= high {
break;
}
ret.push(row.into());
}
ret
}
pub fn persist_entries(entries: &Vec<Entry>) {
let mut space = Space::find("raft_log").unwrap();
for entry in entries {
let row: LogRow = LogRow::from(entry);
space.insert(&row).unwrap();
}
Ok(())
}
pub fn persist_hard_state(hs: &HardState) -> Result<(), RaftError> {
let mut space: Space = Space::find("raft_state").unwrap();
let row: HardStateRow = HardStateRow::from(hs);
space.insert(&row).unwrap();
Ok(())
pub fn hard_state() -> HardState {
let mut ret = HardState::default();
Storage::term().map(|v| ret.term = v);
Storage::vote().map(|v| ret.vote = v);
Storage::commit().map(|v| ret.commit = v);
ret
}
pub fn persist_commit(commit: u64) -> Result<(), RaftError> {
// let mut space: Space = Space::find("raft_state").unwrap();
// space.primary_key().update()
println!("--- persist_commit(idx = {})", commit);
Ok(())
pub fn persist_hard_state(hs: &HardState) {
let mut space: Space = Space::find("raft_state").unwrap();
space.replace(&("term", hs.term)).unwrap();
space.replace(&("vote", hs.vote)).unwrap();
space.replace(&("commit", hs.commit)).unwrap();
}
}
impl raft::Storage for Storage {
fn initial_state(&self) -> Result<RaftState, RaftError> {
let space: Space = Space::find("raft_state").unwrap();
let tuple: Option<Tuple> = space.primary_key().max(&()).unwrap();
let row: Option<HardStateRow> = tuple
.and_then(|t| t.into_struct().unwrap());
let hs: HardState = row
.map(|row| row.into())
.unwrap_or_default();
let hs = Storage::hard_state();
// See also: https://github.com/etcd-io/etcd/blob/main/raft/raftpb/raft.pb.go
let cs: ConfState = ConfState {
......@@ -123,24 +166,7 @@ impl raft::Storage for Storage {
high: u64,
_max_size: impl Into<Option<u64>>,
) -> Result<Vec<Entry>, RaftError> {
let mut ret: Vec<Entry> = vec![];
let space = Space::find("raft_log").unwrap();
let iter = space.primary_key().select(IteratorType::GE, &(low,)).unwrap();
for tuple in iter {
let row: LogRow = tuple.into_struct().unwrap();
if row.raft_index >= high {
break;
}
ret.push(row.into());
}
// let max_size: Option<u64> = max_size.into();
// let ret = self.0.entries(low, high, max_size);
// let logger = slog::Logger::root(crate::tarantool::SlogDrain, o!());
// debug!(logger, "+++ entries(low={}, high={}, max_size={:?}) -> {:?}",
// low, high, max_size, ret
// );
Ok(ret)
Ok(Storage::entries(low, high))
}
fn term(&self, idx: u64) -> Result<u64, RaftError> {
......@@ -189,37 +215,6 @@ impl raft::Storage for Storage {
}
}
#[derive(Debug, Serialize, Deserialize)]
struct HardStateRow {
pub term: u64,
pub vote: u64,
pub commit: u64,
}
impl HardStateRow {
fn from(hs: &HardState) -> HardStateRow {
HardStateRow {
term: hs.term,
vote: hs.vote,
commit: hs.commit,
}
}
}
impl From<HardStateRow> for HardState {
fn from(hs: HardStateRow) -> HardState {
HardState {
term: hs.term,
vote: hs.vote,
commit: hs.commit,
..Default::default()
}
}
}
impl ::tarantool::tuple::AsTuple for HardStateRow {}
#[derive(Debug, Serialize, Deserialize)]
struct LogRow {
pub raft_index: u64,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment