Skip to content
Snippets Groups Projects
Commit a96f83d8 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: used to persist applied index before commit index

This could result in a broken instance if it terminated in between the
two storage writes. Now we always persist the commit index before
applying the persisted entries and do so in a transaction with unstable
entries or snapshot if they are present.
parent 537cf250
No related branches found
No related tags found
No related merge requests found
......@@ -609,6 +609,7 @@ impl NodeImpl {
};
let mut apply_entry_result = EntryApplied;
let mut new_applied = None;
transaction(|| -> tarantool::Result<()> {
self.main_loop_status("handling committed entries");
......@@ -634,12 +635,17 @@ impl NodeImpl {
"index" => entry_index
);
}
#[rustfmt::skip]
self.applied.send(entry_index).expect("applied shouldn't ever be borrowed across yields");
new_applied = Some(entry_index);
Ok(())
})?;
if let Some(new_applied) = new_applied {
self.applied
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
match apply_entry_result {
SleepAndRetry => {
self.main_loop_status("blocked by raft entry");
......@@ -1506,99 +1512,140 @@ impl NodeImpl {
.expect("status shouldn't ever be borrowed across yields");
}
// Send out messages to the other nodes.
// These messages are only available on leader. Send them out ASAP.
self.handle_messages(ready.take_messages());
// Handle read states before applying snapshot which may fail.
self.handle_read_states(ready.read_states());
// This is a snapshot, we need to apply the snapshot at first.
// Raft snapshot has arrived, check if we need to apply it.
let snapshot = ready.snapshot();
let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
// Error was already logged
return;
};
if let Some(snapshot_data) = snapshot_data {
self.main_loop_status("applying snapshot");
// Persist stuff raft wants us to persist.
let hard_state = ready.hs();
let entries_to_persist = ready.entries();
if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() {
let mut new_term = None;
let mut new_applied = None;
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting hard state, entries and/or snapshot");
// Raft HardState changed, and we need to persist it.
if let Some(hard_state) = hard_state {
tlog!(Debug, "hard state: {hard_state:?}");
self.raft_storage.persist_hard_state(hard_state)?;
new_term = Some(hard_state.term);
}
// Persist uncommitted entries in the raft log.
if !entries_to_persist.is_empty() {
#[rustfmt::skip]
debug_assert!(snapshot.is_empty(), "can't have both the snapshot & log entries");
self.raft_storage.persist_entries(entries_to_persist)?;
}
if let Some(snapshot_data) = snapshot_data {
#[rustfmt::skip]
debug_assert!(entries_to_persist.is_empty(), "can't have both the snapshot & log entries");
// Persist snapshot metadata and compact the raft log if it wasn't empty.
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
new_applied = Some(meta.index);
// Persist the contents of the global tables from the snapshot data.
let is_master = !self.is_readonly();
self.storage
.apply_snapshot_data(&snapshot_data, is_master)?;
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will have to also implement sending a MsgSnapStatus,
// to reset out status explicitly to avoid leader ignoring us
// indefinitely after that point.
}
if let Err(e) = transaction(|| -> traft::Result<()> {
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
// handle_snapshot_metadata persists applied index, so we update the watch channel
#[rustfmt::skip]
self.applied.send(meta.index).expect("applied shouldn't ever be borrowed across yields");
let is_master = !self.is_readonly();
self.storage
.apply_snapshot_data(&snapshot_data, is_master)?;
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will have to also implement sending a MsgSnapStatus,
// to reset out status explicitly to avoid leader ignoring us
// indefinitely after that point.
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
}
// Apply committed entries.
let res = self.handle_committed_entries(ready.committed_entries(), wake_governor, expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
if let Err(e) = transaction(|| -> Result<(), &str> {
if !ready.entries().is_empty() || ready.hs().is_some() {
self.main_loop_status("persisting entries");
panic!("transaction failed: {e}");
}
// Persist uncommitted entries in the raft log.
self.raft_storage.persist_entries(ready.entries()).unwrap();
// Raft HardState changed, and we need to persist it.
if let Some(hs) = ready.hs() {
self.raft_storage.persist_hard_state(hs).unwrap();
if let Some(new_term) = new_term {
self.status
.send_modify(|s| s.term = hs.term)
.send_modify(|s| s.term = new_term)
.expect("status shouldn't ever be borrowed across yields");
}
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
if let Some(new_applied) = new_applied {
// handle_snapshot_metadata persists applied index, so we update the watch channel
self.applied
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
}
// Apply committed entries.
let committed_entries = ready.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, wake_governor, expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
}
}
// This bunch of messages is special. It must be sent only
// These messages are only available on followers. They must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft.
// Advance the Raft. Make it know, that the necessary entries have been persisted.
// If this is a leader, it may commit some of the newly persisted entries.
let mut light_rd = self.raw_node.advance(ready);
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
// Send new message ASAP. (Only on leader)
let messages = light_rd.take_messages();
if !messages.is_empty() {
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
self.handle_messages(messages);
}
// Update commit index.
// Update commit index. (Only on leader)
if let Some(commit) = light_rd.commit_index() {
self.raft_storage.persist_commit(commit).unwrap();
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting commit index");
tlog!(Debug, "commit index: {}", commit);
self.raft_storage.persist_commit(commit)?;
Ok(())
}) {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}");
}
}
// Apply committed entries.
let res =
self.handle_committed_entries(light_rd.committed_entries(), wake_governor, expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
// These are probably entries which we've just persisted.
let committed_entries = light_rd.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, wake_governor, expelled);
if let Err(e) = res {
panic!("transaction failed: {e}");
}
}
// Advance the apply index.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment