Skip to content
Snippets Groups Projects
Commit 9c873a8d authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

refactor: extract prepare_for_snapshot function

parent 2ff0085a
No related branches found
No related tags found
No related merge requests found
......@@ -49,7 +49,6 @@ use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::TarantoolError;
use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
......@@ -1356,6 +1355,90 @@ impl NodeImpl {
Ok(())
}
/// Prepare for applying the raft snapshot if it's not empty.
///
/// This includes:
/// - Verifying snapshot version against global & local ones;
/// - Waiting until tarantool replication proceeds if this is a read-only replica;
/// - Fetching the rest of the snashot chunks if the first one is not the only one;
fn prepare_for_snapshot(
&self,
snapshot: &raft::Snapshot,
) -> traft::Result<Option<SnapshotData>> {
if snapshot.is_empty() {
return Ok(None);
}
let snapshot_data = crate::unwrap_ok_or!(
SnapshotData::decode(snapshot.get_data()),
Err(e) => {
tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
return Err(e.into());
}
);
let v_snapshot = snapshot_data.schema_version;
loop {
let v_local = local_schema_version().expect("storage souldn't fail");
let v_global = self
.storage
.properties
.global_schema_version()
.expect("storage shouldn't fail");
#[rustfmt::skip]
debug_assert!(v_global <= v_local, "global schema version is only ever increased after local");
#[rustfmt::skip]
debug_assert!(v_global <= v_snapshot, "global schema version updates are distributed via raft");
if v_local > v_snapshot {
tlog!(
Warning,
"skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
v_local,
snapshot_data.schema_version,
);
return Ok(None);
}
if !self.is_readonly() {
// Replicaset leader applies the schema changes directly.
break;
}
if v_local == v_snapshot {
// Replicaset follower has synced schema with the leader,
// now global space dumps should be handled.
break;
}
tlog!(Debug, "v_local: {v_local}, v_snapshot: {v_snapshot}");
self.main_loop_status("awaiting replication");
// Replicaset follower needs to sync with leader via tarantool
// replication.
let timeout = MainLoop::TICK * 4;
fiber::sleep(timeout);
}
let mut snapshot_data = snapshot_data;
if snapshot_data.next_chunk_position.is_some() {
self.main_loop_status("receiving snapshot");
let entry_id = RaftEntryId {
index: snapshot.get_metadata().index,
term: snapshot.get_metadata().term,
};
if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
// Error has been logged.
tlog!(Warning, "dropping snapshot data");
return Err(e);
}
}
Ok(Some(snapshot_data))
}
#[inline(always)]
fn main_loop_status(&self, status: &'static str) {
tlog!(Debug, "main_loop_status = '{status}'");
......@@ -1430,83 +1513,12 @@ impl NodeImpl {
// This is a snapshot, we need to apply the snapshot at first.
let snapshot = ready.snapshot();
let snapshot_data = (|| -> Option<SnapshotData> {
if snapshot.is_empty() {
return None;
}
let snapshot_data = crate::unwrap_ok_or!(
SnapshotData::decode(snapshot.get_data()),
Err(e) => {
tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
return None;
}
);
let v_snapshot = snapshot_data.schema_version;
loop {
let v_local = local_schema_version().expect("storage souldn't fail");
let v_global = self
.storage
.properties
.global_schema_version()
.expect("storage shouldn't fail");
assert!(
v_global <= v_local,
"global schema version is only ever increased after local"
);
assert!(
v_global <= v_snapshot,
"global schema version updates are distributed via raft"
);
if v_local > v_snapshot {
tlog!(
Warning,
"skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
v_local,
snapshot_data.schema_version,
);
return None;
}
if !self.is_readonly() {
// Replicaset leader applies the schema changes directly.
return Some(snapshot_data);
}
if v_local == v_snapshot {
// Replicaset follower has synced schema with the leader,
// now global space dumps should be handled.
return Some(snapshot_data);
}
tlog!(Debug, "v_local: {v_local}, v_snapshot: {v_snapshot}");
self.main_loop_status("awaiting replication");
// Replicaset follower needs to sync with leader via tarantool
// replication.
let timeout = MainLoop::TICK * 4;
fiber::sleep(timeout);
}
})();
if let Some(mut snapshot_data) = snapshot_data {
if snapshot_data.next_chunk_position.is_some() {
self.main_loop_status("receiving snapshot");
let entry_id = RaftEntryId {
index: snapshot.get_metadata().index,
term: snapshot.get_metadata().term,
};
if let Err(e) = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id) {
// Error has been logged.
_ = e;
tlog!(Warning, "dropping snapshot data");
return;
}
}
let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
// Error was already logged
return;
};
if let Some(snapshot_data) = snapshot_data {
self.main_loop_status("applying snapshot");
if let Err(e) = transaction(|| -> traft::Result<()> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment