diff --git a/src/traft/node.rs b/src/traft/node.rs index 53446ac9ae5e9486159811348b61dfd676a389ce..af369367cf7f842de504bbf21735551bd698a9a3 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -609,6 +609,7 @@ impl NodeImpl { }; let mut apply_entry_result = EntryApplied; + let mut new_applied = None; transaction(|| -> tarantool::Result<()> { self.main_loop_status("handling committed entries"); @@ -634,12 +635,17 @@ impl NodeImpl { "index" => entry_index ); } - #[rustfmt::skip] - self.applied.send(entry_index).expect("applied shouldn't ever be borrowed across yields"); + new_applied = Some(entry_index); Ok(()) })?; + if let Some(new_applied) = new_applied { + self.applied + .send(new_applied) + .expect("applied shouldn't ever be borrowed across yields"); + } + match apply_entry_result { SleepAndRetry => { self.main_loop_status("blocked by raft entry"); @@ -1506,99 +1512,140 @@ impl NodeImpl { .expect("status shouldn't ever be borrowed across yields"); } - // Send out messages to the other nodes. + // These messages are only available on leader. Send them out ASAP. self.handle_messages(ready.take_messages()); // Handle read states before applying snapshot which may fail. self.handle_read_states(ready.read_states()); - // This is a snapshot, we need to apply the snapshot at first. + // Raft snapshot has arrived, check if we need to apply it. let snapshot = ready.snapshot(); let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else { // Error was already logged return; }; - if let Some(snapshot_data) = snapshot_data { - self.main_loop_status("applying snapshot"); + // Persist stuff raft wants us to persist. 
+ let hard_state = ready.hs(); + let entries_to_persist = ready.entries(); + if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() { + let mut new_term = None; + let mut new_applied = None; + + if let Err(e) = transaction(|| -> Result<(), Error> { + self.main_loop_status("persisting hard state, entries and/or snapshot"); + + // Raft HardState changed, and we need to persist it. + if let Some(hard_state) = hard_state { + tlog!(Debug, "hard state: {hard_state:?}"); + self.raft_storage.persist_hard_state(hard_state)?; + new_term = Some(hard_state.term); + } + + // Persist uncommitted entries in the raft log. + if !entries_to_persist.is_empty() { + #[rustfmt::skip] + debug_assert!(snapshot.is_empty(), "can't have both the snapshot & log entries"); + + self.raft_storage.persist_entries(entries_to_persist)?; + } + + if let Some(snapshot_data) = snapshot_data { + #[rustfmt::skip] + debug_assert!(entries_to_persist.is_empty(), "can't have both the snapshot & log entries"); + + // Persist snapshot metadata and compact the raft log if it wasn't empty. + let meta = snapshot.get_metadata(); + self.raft_storage.handle_snapshot_metadata(meta)?; + new_applied = Some(meta.index); + + // Persist the contents of the global tables from the snapshot data. + let is_master = !self.is_readonly(); + self.storage + .apply_snapshot_data(&snapshot_data, is_master)?; + + // TODO: As long as the snapshot was sent to us in response to + // a rejected MsgAppend (which is the only possible case + // currently), we will send a MsgAppendResponse back which will + // automatically reset our status from Snapshot to Replicate. + // But when we implement support for manual snapshot requests, + // we will have to also implement sending a MsgSnapStatus, + // to reset our status explicitly to avoid leader ignoring us + // indefinitely after that point. 
+ } - if let Err(e) = transaction(|| -> traft::Result<()> { - let meta = snapshot.get_metadata(); - self.raft_storage.handle_snapshot_metadata(meta)?; - // handle_snapshot_metadata persists applied index, so we update the watch channel - #[rustfmt::skip] - self.applied.send(meta.index).expect("applied shouldn't ever be borrowed across yields"); - - let is_master = !self.is_readonly(); - self.storage - .apply_snapshot_data(&snapshot_data, is_master)?; - - // TODO: As long as the snapshot was sent to us in response to - // a rejected MsgAppend (which is the only possible case - // currently), we will send a MsgAppendResponse back which will - // automatically reset our status from Snapshot to Replicate. - // But when we implement support for manual snapshot requests, - // we will have to also implement sending a MsgSnapStatus, - // to reset out status explicitly to avoid leader ignoring us - // indefinitely after that point. Ok(()) }) { tlog!(Warning, "dropping raft ready: {ready:#?}"); - panic!("transaction failed: {e}, {}", TarantoolError::last()); - } - } - - // Apply committed entries. - let res = self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled); - if let Err(e) = res { - tlog!(Warning, "dropping raft ready: {ready:#?}"); - panic!("transaction failed: {e}, {}", TarantoolError::last()); - } - - if let Err(e) = transaction(|| -> Result<(), &str> { - if !ready.entries().is_empty() || ready.hs().is_some() { - self.main_loop_status("persisting entries"); + panic!("transaction failed: {e}"); } - // Persist uncommitted entries in the raft log. - self.raft_storage.persist_entries(ready.entries()).unwrap(); - - // Raft HardState changed, and we need to persist it. 
- if let Some(hs) = ready.hs() { - self.raft_storage.persist_hard_state(hs).unwrap(); + if let Some(new_term) = new_term { self.status - .send_modify(|s| s.term = hs.term) + .send_modify(|s| s.term = new_term) .expect("status shouldn't ever be borrowed across yields"); } - Ok(()) - }) { - tlog!(Warning, "dropping raft ready: {ready:#?}"); - panic!("transaction failed: {e}"); + if let Some(new_applied) = new_applied { + // handle_snapshot_metadata persists applied index, so we update the watch channel + self.applied + .send(new_applied) + .expect("applied shouldn't ever be borrowed across yields"); + } + } + + // Apply committed entries. + let committed_entries = ready.committed_entries(); + if !committed_entries.is_empty() { + let res = self.handle_committed_entries(committed_entries, wake_governor, expelled); + if let Err(e) = res { + tlog!(Warning, "dropping raft ready: {ready:#?}"); + panic!("transaction failed: {e}"); + } } - // This bunch of messages is special. It must be sent only + // These messages are only available on followers. They must be sent only // AFTER the HardState, Entries and Snapshot are persisted // to the stable storage. self.handle_messages(ready.take_persisted_messages()); - // Advance the Raft. + // Advance the Raft. Let it know that the necessary entries have been persisted. + // If this is a leader, it may commit some of the newly persisted entries. let mut light_rd = self.raw_node.advance(ready); - // Send out messages to the other nodes. - self.handle_messages(light_rd.take_messages()); + // Send new messages ASAP. (Only on leader) + let messages = light_rd.take_messages(); + if !messages.is_empty() { + debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader); + + self.handle_messages(messages); + } - // Update commit index. + // Update commit index. 
(Only on leader) if let Some(commit) = light_rd.commit_index() { - self.raft_storage.persist_commit(commit).unwrap(); + debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader); + + if let Err(e) = transaction(|| -> Result<(), Error> { + self.main_loop_status("persisting commit index"); + tlog!(Debug, "commit index: {}", commit); + + self.raft_storage.persist_commit(commit)?; + + Ok(()) + }) { + tlog!(Warning, "dropping raft light ready: {light_rd:#?}"); + panic!("transaction failed: {e}"); + } } // Apply committed entries. - let res = - self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled); - if let Err(e) = res { - tlog!(Warning, "dropping raft light ready: {light_rd:#?}"); - panic!("transaction failed: {e}, {}", TarantoolError::last()); + // These are probably entries which we've just persisted. + let committed_entries = light_rd.committed_entries(); + if !committed_entries.is_empty() { + let res = self.handle_committed_entries(committed_entries, wake_governor, expelled); + if let Err(e) = res { + panic!("transaction failed: {e}"); + } } // Advance the apply index.