diff --git a/src/traft/node.rs b/src/traft/node.rs
index 2fd3256ca1a30a5bedec45a4289fa7680e128928..237a99ecfd921becd8a4733190002c1911c310df 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -521,11 +521,11 @@ impl NodeImpl {
     /// Is called during a transaction
     fn handle_committed_entries(
         &mut self,
-        entries: Vec<raft::Entry>,
+        entries: &[raft::Entry],
         topology_changed: &mut bool,
         expelled: &mut bool,
     ) {
-        for entry in &entries {
+        for entry in entries {
             let entry = match traft::Entry::try_from(entry) {
                 Ok(v) => v,
                 Err(e) => {
@@ -651,7 +651,7 @@ impl NodeImpl {
     }
 
     /// Is called during a transaction
-    fn handle_read_states(&mut self, read_states: Vec<raft::ReadState>) {
+    fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
         for rs in read_states {
             let ctx = match traft::EntryContextNormal::read_from_bytes(&rs.request_ctx) {
                 Ok(Some(v)) => v,
@@ -705,75 +705,65 @@ impl NodeImpl {
 
         let mut ready: raft::Ready = self.raw_node.ready();
 
-        start_transaction(|| -> Result<(), TransactionError> {
-            if !ready.messages().is_empty() {
-                // Send out the messages come from the node.
-                let messages = ready.take_messages();
-                self.handle_messages(messages);
-            }
+        // Send out messages to the other nodes.
+        self.handle_messages(ready.take_messages());
 
-            if !ready.snapshot().is_empty() {
-                // This is a snapshot, we need to apply the snapshot at first.
-                unimplemented!();
-            }
+        // This is a snapshot, we need to apply the snapshot at first.
+        if !ready.snapshot().is_empty() {
+            unimplemented!();
+        }
 
-            let committed_entries = ready.take_committed_entries();
-            self.handle_committed_entries(committed_entries, topology_changed, expelled);
+        if let Some(ss) = ready.ss() {
+            let mut status = status.borrow_mut();
+            status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
+            status.raft_state = format!("{:?}", ss.raft_state);
+            event::broadcast(Event::StatusChanged);
+        }
 
-            if !ready.entries().is_empty() {
-                let e = ready.entries();
-                // Append entries to the Raft log.
-                self.storage.persist_entries(e).unwrap();
-            }
+        self.handle_read_states(ready.read_states());
 
-            if let Some(hs) = ready.hs() {
-                // Raft HardState changed, and we need to persist it.
-                // let hs = hs.clone();
-                self.storage.persist_hard_state(hs).unwrap();
-            }
+        start_transaction(|| -> Result<(), TransactionError> {
+            // Apply committed entries.
+            self.handle_committed_entries(ready.committed_entries(), topology_changed, expelled);
 
-            if let Some(ss) = ready.ss() {
-                let mut status = status.borrow_mut();
-                status.leader_id = (ss.leader_id != INVALID_ID).then(|| ss.leader_id);
-                status.raft_state = format!("{:?}", ss.raft_state);
-                event::broadcast(Event::StatusChanged);
-            }
+            // Persist uncommitted entries in the raft log.
+            self.storage.persist_entries(ready.entries()).unwrap();
 
-            if !ready.persisted_messages().is_empty() {
-                // Send out the persisted messages come from the node.
-                let messages = ready.take_persisted_messages();
-                self.handle_messages(messages);
+            // Raft HardState changed, and we need to persist it.
+            if let Some(hs) = ready.hs() {
+                self.storage.persist_hard_state(hs).unwrap();
             }
 
-            let read_states = ready.take_read_states();
-            self.handle_read_states(read_states);
-
             Ok(())
         })
         .unwrap();
 
+        // This bunch of messages is special. It must be sent only
+        // AFTER the HardState, Entries and Snapshot are persisted
+        // to the stable storage.
+        self.handle_messages(ready.take_persisted_messages());
+
         // Advance the Raft.
         let mut light_rd = self.raw_node.advance(ready);
 
-        // Update commit index.
+        // Send out messages to the other nodes.
+        self.handle_messages(light_rd.take_messages());
+
         start_transaction(|| -> Result<(), TransactionError> {
+            // Update commit index.
             if let Some(commit) = light_rd.commit_index() {
                 self.storage.persist_commit(commit).unwrap();
             }
 
-            // Send out the messages.
-            let messages = light_rd.take_messages();
-            self.handle_messages(messages);
+            // Apply committed entries.
+            self.handle_committed_entries(light_rd.committed_entries(), topology_changed, expelled);
 
-            // Apply all committed entries.
-            let committed_entries = light_rd.take_committed_entries();
-            self.handle_committed_entries(committed_entries, topology_changed, expelled);
-
-            // Advance the apply index.
-            self.raw_node.advance_apply();
             Ok(())
         })
         .unwrap();
+
+        // Advance the apply index.
+        self.raw_node.advance_apply();
     }
 
     #[inline]