Skip to content
Snippets Groups Projects
Commit b77ff1da authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov
Browse files

important: arrange transaction scope in NodeImpl

Raft-rs interface requires the application to process special structure,
the so-called "ready state". It also makes a demand on the processing
workflow. The fields must be processed in a certain sequence, see
<https://docs.rs/raft/0.6.0/raft/index.html#processing-the-ready-state>.

For example, it was a mistake to send `persisted_messages` inside a
transaction, because it's forbidded to do so before persisting the hard
state, but the actual write happens only in the end of transaction.

As for handling `soft_state` and `read_states`, they're useless in a
transaction as they don't persist anything.

The documentation is quite complicated, so this code might be revised
once again later.
parent 9450b27f
No related branches found
No related tags found
1 merge request!260important: arrange transaction scope in NodeImpl
Pipeline #12232 passed
......@@ -521,11 +521,11 @@ impl NodeImpl {
/// Is called during a transaction
fn handle_committed_entries(
&mut self,
entries: Vec<raft::Entry>,
entries: &[raft::Entry],
topology_changed: &mut bool,
expelled: &mut bool,
) {
for entry in &entries {
for entry in entries {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
......@@ -651,7 +651,7 @@ impl NodeImpl {
}
/// Is called during a transaction
fn handle_read_states(&mut self, read_states: Vec<raft::ReadState>) {
fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
for rs in read_states {
let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) {
Ok(Some(v)) => v,
......@@ -705,75 +705,65 @@ impl NodeImpl {
let mut ready: raft::Ready = self.raw_node.ready();
start_transaction(|| -> Result<(), TransactionError> {
if !ready.messages().is_empty() {
// Send out the messages come from the node.
let messages = ready.take_messages();
self.handle_messages(messages);
}
// Send out messages to the other nodes.
self.handle_messages(ready.take_messages());
if !ready.snapshot().is_empty() {
// This is a snapshot, we need to apply the snapshot at first.
unimplemented!();
}
// This is a snapshot, we need to apply the snapshot at first.
if !ready.snapshot().is_empty() {
unimplemented!();
}
let committed_entries = ready.take_committed_entries();
self.handle_committed_entries(committed_entries, topology_changed, expelled);
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
status.raft_state = format!("{:?}", ss.raft_state);
event::broadcast(Event::StatusChanged);
}
if !ready.entries().is_empty() {
let e = ready.entries();
// Append entries to the Raft log.
self.storage.persist_entries(e).unwrap();
}
self.handle_read_states(ready.read_states());
if let Some(hs) = ready.hs() {
// Raft HardState changed, and we need to persist it.
// let hs = hs.clone();
self.storage.persist_hard_state(hs).unwrap();
}
start_transaction(|| -> Result<(), TransactionError> {
// Apply committed entries.
self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled);
if let Some(ss) = ready.ss() {
let mut status = status.borrow_mut();
status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
status.raft_state = format!("{:?}", ss.raft_state);
event::broadcast(Event::StatusChanged);
}
// Persist uncommitted entries in the raft log.
self.storage.persist_entries(ready.entries()).unwrap();
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
let messages = ready.take_persisted_messages();
self.handle_messages(messages);
// Raft HardState changed, and we need to persist it.
if let Some(hs) = ready.hs() {
self.storage.persist_hard_state(hs).unwrap();
}
let read_states = ready.take_read_states();
self.handle_read_states(read_states);
Ok(())
})
.unwrap();
// This bunch of messages is special. It must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft.
let mut light_rd = self.raw_node.advance(ready);
// Update commit index.
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
start_transaction(|| -> Result<(), TransactionError> {
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.storage.persist_commit(commit).unwrap();
}
// Send out the messages.
let messages = light_rd.take_messages();
self.handle_messages(messages);
// Apply committed entries.
self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
self.handle_committed_entries(committed_entries, topology_changed, expelled);
// Advance the apply index.
self.raw_node.advance_apply();
Ok(())
})
.unwrap();
// Advance the apply index.
self.raw_node.advance_apply();
}
#[inline]
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment