From b77ff1daa71d38576bd977e042cdd158b4f6b686 Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Fri, 23 Sep 2022 10:32:15 +0300
Subject: [PATCH] important: arrange transaction scope in NodeImpl

Raft-rs interface requires the application to process special structure,
the so-called "ready state". It also makes a demand on the processing
workflow. The fields must be processed in a certain sequence, see
<https://docs.rs/raft/0.6.0/raft/index.html#processing-the-ready-state>.

For example, it was a mistake to send `persisted_messages` inside a
transaction, because it's forbidded to do so before persisting the hard
state, but the actual write happens only in the end of transaction.

As for handling `soft_state` and `read_states`, they're useless in a
transaction as they don't persist anything.

The documentation is quite complicated, so this code might be revised
once again later.
---
 src/traft/node.rs | 86 +++++++++++++++++++++--------------------------
 1 file changed, 38 insertions(+), 48 deletions(-)

diff --git a/src/traft/node.rs b/src/traft/node.rs
index 2fd3256ca1..237a99ecfd 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -521,11 +521,11 @@ impl NodeImpl {
     /// Is called during a transaction
     fn handle_committed_entries(
         &mut self,
-        entries: Vec<raft::Entry>,
+        entries: &[raft::Entry],
         topology_changed: &mut bool,
         expelled: &mut bool,
     ) {
-        for entry in &entries {
+        for entry in entries {
             let entry = match traft::Entry::try_from(entry) {
                 Ok(v) => v,
                 Err(e) => {
@@ -651,7 +651,7 @@ impl NodeImpl {
     }
 
     /// Is called during a transaction
-    fn handle_read_states(&mut self, read_states: Vec<raft::ReadState>) {
+    fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
         for rs in read_states {
             let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) {
                 Ok(Some(v)) => v,
@@ -705,75 +705,65 @@ impl NodeImpl {
 
         let mut ready: raft::Ready = self.raw_node.ready();
 
-        start_transaction(|| -> Result<(), TransactionError> {
-            if !ready.messages().is_empty() {
-                // Send out the messages come from the node.
-                let messages = ready.take_messages();
-                self.handle_messages(messages);
-            }
+        // Send out messages to the other nodes.
+        self.handle_messages(ready.take_messages());
 
-            if !ready.snapshot().is_empty() {
-                // This is a snapshot, we need to apply the snapshot at first.
-                unimplemented!();
-            }
+        // This is a snapshot, we need to apply the snapshot at first.
+        if !ready.snapshot().is_empty() {
+            unimplemented!();
+        }
 
-            let committed_entries = ready.take_committed_entries();
-            self.handle_committed_entries(committed_entries, topology_changed, expelled);
+        if let Some(ss) = ready.ss() {
+            let mut status = status.borrow_mut();
+            status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
+            status.raft_state = format!("{:?}", ss.raft_state);
+            event::broadcast(Event::StatusChanged);
+        }
 
-            if !ready.entries().is_empty() {
-                let e = ready.entries();
-                // Append entries to the Raft log.
-                self.storage.persist_entries(e).unwrap();
-            }
+        self.handle_read_states(ready.read_states());
 
-            if let Some(hs) = ready.hs() {
-                // Raft HardState changed, and we need to persist it.
-                // let hs = hs.clone();
-                self.storage.persist_hard_state(hs).unwrap();
-            }
+        start_transaction(|| -> Result<(), TransactionError> {
+            // Apply committed entries.
+            self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled);
 
-            if let Some(ss) = ready.ss() {
-                let mut status = status.borrow_mut();
-                status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
-                status.raft_state = format!("{:?}", ss.raft_state);
-                event::broadcast(Event::StatusChanged);
-            }
+            // Persist uncommitted entries in the raft log.
+            self.storage.persist_entries(ready.entries()).unwrap();
 
-            if !ready.persisted_messages().is_empty() {
-                // Send out the persisted messages come from the node.
-                let messages = ready.take_persisted_messages();
-                self.handle_messages(messages);
+            // Raft HardState changed, and we need to persist it.
+            if let Some(hs) = ready.hs() {
+                self.storage.persist_hard_state(hs).unwrap();
             }
 
-            let read_states = ready.take_read_states();
-            self.handle_read_states(read_states);
-
             Ok(())
         })
         .unwrap();
 
+        // This bunch of messages is special. It must be sent only
+        // AFTER the HardState, Entries and Snapshot are persisted
+        // to the stable storage.
+        self.handle_messages(ready.take_persisted_messages());
+
         // Advance the Raft.
         let mut light_rd = self.raw_node.advance(ready);
 
-        // Update commit index.
+        // Send out messages to the other nodes.
+        self.handle_messages(light_rd.take_messages());
+
         start_transaction(|| -> Result<(), TransactionError> {
+            // Update commit index.
             if let Some(commit) = light_rd.commit_index() {
                 self.storage.persist_commit(commit).unwrap();
             }
 
-            // Send out the messages.
-            let messages = light_rd.take_messages();
-            self.handle_messages(messages);
+            // Apply committed entries.
+            self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled);
 
-            // Apply all committed entries.
-            let committed_entries = light_rd.take_committed_entries();
-            self.handle_committed_entries(committed_entries, topology_changed, expelled);
-
-            // Advance the apply index.
-            self.raw_node.advance_apply();
             Ok(())
         })
         .unwrap();
+
+        // Advance the apply index.
+        self.raw_node.advance_apply();
     }
 
     #[inline]
-- 
GitLab