Newer
Older
2005
2006
2007
2008
2009
2010
2011
2012
2013
2014
2015
2016
2017
2018
2019
2020
2021
2022
2023
2024
2025
2026
2027
2028
2029
2030
2031
2032
2033
2034
2035
2036
2037
2038
2039
2040
2041
2042
2043
2044
2045
2046
2047
2048
2049
2050
2051
2052
2053
2054
2055
2056
2057
2058
2059
2060
2061
2062
2063
2064
2065
2066
2067
2068
2069
2070
2071
2072
2073
2074
2075
2076
2077
2078
2079
2080
2081
2082
2083
2084
2085
2086
2087
2088
/// Decides whether the given raft snapshot has to be applied and, if so,
/// gets its data ready for that.
///
/// For a non-empty snapshot the following is done:
/// - The payload is decoded into [`SnapshotData`];
/// - The snapshot's schema version is checked against the local and the
///   global ones, a snapshot older than the local schema is skipped;
/// - A read-only instance sleeps in `MainLoop::TICK * 4` intervals until its
///   local schema version becomes equal to the snapshot's one;
/// - If the decoded data has a next chunk position, the remaining chunks are
///   fetched via `Self::fetch_chunkwise_snapshot`.
///
/// Returns `Ok(None)` when the snapshot is empty or stale and
/// `Ok(Some(data))` when there is snapshot data to apply.
///
/// # Errors
///
/// Returns an error if the snapshot payload fails to decode or if fetching
/// the remaining chunks fails.
fn prepare_for_snapshot(
    &self,
    snapshot: &raft::Snapshot,
) -> traft::Result<Option<SnapshotData>> {
    if snapshot.is_empty() {
        return Ok(None);
    }

    // Declared mutable right away: the chunk fetching below extends it in place.
    let mut snapshot_data = crate::unwrap_ok_or!(
        SnapshotData::decode(snapshot.get_data()),
        Err(e) => {
            tlog!(Warning, "skipping snapshot, which failed to deserialize: {e}");
            return Err(e.into());
        }
    );

    let v_snapshot = snapshot_data.schema_version;
    let properties = &self.storage.properties;
    loop {
        let v_local = local_schema_version().expect("storage souldn't fail");
        let v_global = properties
            .global_schema_version()
            .expect("storage shouldn't fail");

        #[rustfmt::skip]
        debug_assert!(v_global <= v_local, "global schema version is only ever increased after local");
        #[rustfmt::skip]
        debug_assert!(v_global <= v_snapshot, "global schema version updates are distributed via raft");

        if v_snapshot < v_local {
            tlog!(
                Warning,
                "skipping stale snapshot: local schema version: {}, snapshot schema version: {}",
                v_local,
                v_snapshot,
            );
            return Ok(None);
        }

        // A writable instance (replicaset leader) applies the schema changes
        // directly, so it never waits. A read-only one (replicaset follower)
        // may proceed to the global space dumps only once its schema has
        // caught up with the snapshot.
        if !self.is_readonly() || v_local == v_snapshot {
            break;
        }

        tlog!(Debug, "v_local: {v_local}, v_snapshot: {v_snapshot}");
        self.main_loop_status("awaiting replication");
        // Give tarantool replication some time to deliver the schema changes
        // from the leader before checking again.
        fiber::sleep(MainLoop::TICK * 4);
    }

    // The first chunk is the only one, nothing else to fetch.
    if snapshot_data.next_chunk_position.is_none() {
        return Ok(Some(snapshot_data));
    }

    self.main_loop_status("receiving snapshot");
    let meta = snapshot.get_metadata();
    let entry_id = RaftEntryId {
        index: meta.index,
        term: meta.term,
    };
    let fetched = self.fetch_chunkwise_snapshot(&mut snapshot_data, entry_id);
    if let Err(e) = fetched {
        // The error itself has been logged already.
        tlog!(Warning, "dropping snapshot data");
        return Err(e);
    }

    Ok(Some(snapshot_data))
}
/// Records `status` as the current main loop status in `self.status`.
///
/// Does nothing if the stored `main_loop_status` already equals `status`.
/// Otherwise the new value is logged at debug level and then stored via
/// `send_modify`.
///
/// # Panics
///
/// Panics if `send_modify` returns an error.
#[inline(always)]
fn main_loop_status(&self, status: &'static str) {
// Skip the log line and the update when nothing changes.
if self.status.get().main_loop_status == status {
return;
}
tlog!(Debug, "main_loop_status = '{status}'");
self.status
.send_modify(|s| s.main_loop_status = status)
.expect("status shouldn't ever be borrowed across yields");
/// Processes a so-called "ready state" of the [`raft::RawNode`].
///
/// This includes:
/// - Sending messages to other instances (raft nodes);
/// - Handling raft snapshot:
/// - Verifying & waiting until snapshot data can be applied
/// (see [`Self::prepare_for_snapshot`] for more details);
/// - Persisting snapshot metadata;
/// - Compacting the raft log;
/// - Restoring local storage contents from the snapshot;
/// - Persisting uncommitted entries;
/// - Persisting hard state (term, vote, commit);
/// - Applying committed entries;
/// - Notifying pending fibers;
/// - Waking up the governor loop, so that it can handle any global state
/// changes;
///
/// Returns an error if the instance was expelled from the cluster.
///
/// See also:
///
/// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
/// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
///
/// This function yields.
fn advance(&mut self) -> traft::Result<()> {
// Handle any unreachable nodes from previous iteration.
let unreachables = self
.instance_reachability
.borrow_mut()
.take_unreachables_to_report();
for raft_id in unreachables {
self.raw_node.report_unreachable(raft_id);
// TODO: remove infos when instances are expelled.
let Some(pr) = self.raw_node.raft.mut_prs().get_mut(raft_id) else {
continue;
};
// NOTE: Raft-rs will not check if the node should be paused until
// a new raft entry is appended to the log. This means that once an
// instance goes silent it will still be bombarded with heartbeats
// until someone proposes an operation. This is a workaround for
// that particular case.
// The instance's state would've only changed if it was not in the
// Snapshot state, so we have to check for that.
if pr.state == ::raft::ProgressState::Probe {
pr.pause();
}
}
// Get the `Ready` with `RawNode::ready` interface.
if !self.raw_node.has_ready() {
return Ok(());
let mut expelled = false;
let mut ready: raft::Ready = self.raw_node.ready();
// Apply soft state changes before anything else, so that this info is
// available for other fibers as soon as main loop yields.
if let Some(ss) = ready.ss() {
self.status
.send_modify(|s| {
s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
s.raft_state = ss.raft_state.into();
})
.expect("status shouldn't ever be borrowed across yields");
}
// These messages are only available on leader. Send them out ASAP.
self.handle_messages(ready.take_messages());
// Handle read states before applying snapshot which may fail.
self.handle_read_states(ready.read_states());
// Raft snapshot has arrived, check if we need to apply it.
let snapshot = ready.snapshot();
let Ok(snapshot_data) = self.prepare_for_snapshot(snapshot) else {
// Error was already logged
return Ok(());
2184
2185
2186
2187
2188
2189
2190
2191
2192
2193
2194
2195
2196
2197
2198
2199
2200
2201
2202
2203
2204
2205
2206
2207
2208
2209
2210
2211
2212
2213
// Persist stuff raft wants us to persist.
let hard_state = ready.hs();
let entries_to_persist = ready.entries();
if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() {
let mut new_term = None;
let mut new_applied = None;
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting hard state, entries and/or snapshot");
// Raft HardState changed, and we need to persist it.
if let Some(hard_state) = hard_state {
tlog!(Debug, "hard state: {hard_state:?}");
self.raft_storage.persist_hard_state(hard_state)?;
new_term = Some(hard_state.term);
}
// Persist uncommitted entries in the raft log.
if !entries_to_persist.is_empty() {
#[rustfmt::skip]
debug_assert!(snapshot.is_empty(), "can't have both the snapshot & log entries");
self.raft_storage.persist_entries(entries_to_persist)?;
}
if let Some(snapshot_data) = snapshot_data {
#[rustfmt::skip]
debug_assert!(entries_to_persist.is_empty(), "can't have both the snapshot & log entries");
let meta = snapshot.get_metadata();
let applied_index = self.raft_storage.applied()?;
// Persist snapshot metadata and compact raft log if it wasn't empty.
self.raft_storage.handle_snapshot_metadata(meta)?;
// Skip snapshot with the same index, as an optimization,
// because the contents of global tables are already up to date.
tlog!(
Warning,
"skipping snapshot with the same index: {applied_index}"
)
} else {
// Persist the contents of the global tables from the snapshot data.
let is_master = !self.is_readonly();
self.storage
.apply_snapshot_data(&snapshot_data, is_master)?;
new_applied = Some(meta.index);
}
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will have to also implement sending a MsgSnapStatus,
// to reset our status explicitly to avoid leader ignoring us
// indefinitely after that point.
}
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
if let Some(new_term) = new_term {
.send_modify(|s| s.term = new_term)
.expect("status shouldn't ever be borrowed across yields");
if let Some(new_applied) = new_applied {
// handle_snapshot_metadata persists applied index, so we update the watch channel
self.applied
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
if hard_state.is_some() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
}
if !entries_to_persist.is_empty() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_ENTRIES");
}
#[cfg(feature = "error_injection")]
if crate::error_injection::is_enabled("BLOCK_WHEN_PERSISTING_DDL_COMMIT") {
for entry in entries_to_persist {
let row = traft::Entry::try_from(entry).unwrap();
let op = row.into_op();
if let Some(Op::DdlCommit) = op {
crate::error_injection!(block "BLOCK_WHEN_PERSISTING_DDL_COMMIT");
}
}
}
// Apply committed entries.
let committed_entries = ready.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, &mut expelled);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
// These messages are only available on followers. They must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft. Make it know, that the necessary entries have been persisted.
// If this is a leader, it may commit some of the newly persisted entries.
let mut light_rd = self.raw_node.advance(ready);
// Send new message ASAP. (Only on leader)
let messages = light_rd.take_messages();
if !messages.is_empty() {
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
self.handle_messages(messages);
}
// Update commit index. (Only on leader)
if let Some(commit) = light_rd.commit_index() {
debug_assert!(self.raw_node.raft.state == RaftStateRole::Leader);
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting commit index");
tlog!(Debug, "commit index: {}", commit);
self.raft_storage.persist_commit(commit)?;
Ok(())
}) {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}");
}
crate::error_injection!(block "BLOCK_AFTER_RAFT_PERSISTS_COMMIT_INDEX");
// Apply committed entries.
// These are probably entries which we've just persisted.
let committed_entries = light_rd.committed_entries();
if !committed_entries.is_empty() {
let res = self.handle_committed_entries(committed_entries, &mut expelled);
if let Err(e) = res {
panic!("transaction failed: {e}");
}
crate::error_injection!(exit "EXIT_AFTER_RAFT_HANDLES_COMMITTED_ENTRIES");
// Advance the apply index.
self.raw_node.advance_apply();
self.main_loop_status("idle");
if expelled {
return Err(Error::Expelled);
}
Ok(())
/// Check if this is a read only replica. This function is called when we
/// need to determine if this instance should be changing the schema
/// definition or if it should instead synchronize with a master.
///
/// Note: it would be a little more reliable to check if the replica is
/// chosen to be a master by checking master_id in _pico_replicaset, but
/// currently we cannot do that, because tarantool replication is being
/// done asynchronously with raft log replication. Basically instance needs
/// to know it's a replicaset master before it can access the replicaset
/// info.
fn is_readonly(&self) -> bool {
let is_ro: bool = crate::tarantool::eval("return box.info.ro")
.expect("checking read-onlyness should never fail");
is_ro
/// Generates a pair of logical clock and a notification channel.
/// Logical clock is a unique identifier suitable for tagging
/// entries in raft log. Notification is broadcasted when the
/// corresponding entry is committed.
#[inline]
fn schedule_read_state_waker(&mut self) -> (LogicalClock, oneshot::Receiver<RaftIndex>) {
let (tx, rx) = oneshot::channel();
let lc = {
self.lc.inc();
self.lc
};
self.read_state_wakers.insert(lc, tx);
/// Return value of [`NodeImpl::handle_committed_normal_entry`]. Tells the
/// caller what should be done next as the result of an attempt to apply
/// a given entry.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ApplyEntryResult {
/// The entry failed to apply for some reason, and the attempt must be
/// retried later.
SleepAndRetry,
/// The entry was applied successfully, proceed to the next entry.
EntryApplied,
}
/// Handle of the raft main loop created by [`MainLoop::start`].
pub(crate) struct MainLoop {
/// Value returned by `loop_start!` for the `"raft_main_loop"` loop.
_loop: Option<fiber::JoinHandle<'static, ()>>,
/// Sending half of the channel over which [`MainLoop::wakeup`] notifies
/// the loop.
loop_waker: watch::Sender<()>,
/// Flag shared with [`MainLoopState`], set to `true` when the `MainLoop`
/// is dropped.
stop_flag: Rc<Cell<bool>>,
}
/// State handed over to the main loop iteration function by
/// [`MainLoop::start`].
struct MainLoopState {
/// Receiving half of the wakeup channel. `MainLoop::iter_fn` waits for
/// a change on it for at most `MainLoop::TICK`.
loop_waker: watch::Receiver<()>,
/// Clone of the stop flag held by [`MainLoop`].
stop_flag: Rc<Cell<bool>>,
}
impl MainLoop {
pub const TICK: Duration = Duration::from_millis(100);
/// Starts the raft main loop and returns the handle controlling it.
///
/// A wakeup channel and a stop flag are created and shared between the
/// returned handle and the [`MainLoopState`], which is passed together with
/// [`Self::iter_fn`] to `loop_start!` under the name `"raft_main_loop"`.
fn start(node_impl: Rc<Mutex<NodeImpl>>) -> Self {
    let (waker_tx, waker_rx) = watch::channel(());
    let stop_flag = Rc::<Cell<bool>>::default();

    let state = MainLoopState {
        node_impl,
        next_tick: Instant::now_fiber(),
        loop_waker: waker_rx,
        stop_flag: Rc::clone(&stop_flag),
    };
    let handle = loop_start!("raft_main_loop", Self::iter_fn, state);

    Self {
        _loop: handle,
        loop_waker: waker_tx,
        stop_flag,
    }
}
/// Notifies the main loop by sending a value over `loop_waker`.
///
/// The result of the send is discarded.
pub fn wakeup(&self) {
let _ = self.loop_waker.send(());
async fn iter_fn(state: &mut MainLoopState) -> ControlFlow<()> {
let _ = state.loop_waker.changed().timeout(Self::TICK).await;
return ControlFlow::Break(());
// FIXME: potential deadlock - can't use sync mutex in async fn
let mut node_impl = state.node_impl.lock(); // yields
return ControlFlow::Break(());
node_impl
.read_state_wakers
.retain(|_, waker| !waker.is_closed());
let now = Instant::now_fiber();
state.next_tick = now.saturating_add(Self::TICK);
let res = node_impl.advance(); // yields
drop(node_impl);
return ControlFlow::Break(());
match res {
Err(e @ Error::Expelled) => {
tlog!(Info, "{e}, shutting down");
crate::tarantool::exit(0);
}
Err(e) => {
tlog!(Error, "error during raft main loop iteration: {e}");
Ok(()) => {}
ControlFlow::Continue(())
}
}
impl Drop for MainLoop {
fn drop(&mut self) {
self.stop_flag.set(true);
let _ = self.loop_waker.send(());
pub fn global() -> traft::Result<&'static Node> {
// Uninitialized raft node is a regular case. This case may take
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
let msg = raft::Message::try_from(pb).map_err(Error::other)?;
node.instance_reachability
.borrow_mut()
.report_result(msg.from, true);
node.step_and_yield(msg);
2505
2506
2507
2508
2509
2510
2511
2512
2513
2514
2515
2516
2517
2518
2519
2520
2521
2522
2523
2524
2525
2526
2527
2528
2529
2530
2531
2532
2533
2534
/// Internal API. Causes this instance to artificially timeout on waiting
/// for a heartbeat from raft leader. The instance then will start a new
/// election and transition to a 'PreCandidate' state.
///
/// This function yields. It returns when the raft node changes its state.
///
/// Later the instance will likely become a leader, unless there are some
/// impediments, e.g. the loss of quorum or split-vote.
///
/// Example log:
/// ```ignore
/// received MsgTimeoutNow from 3 and starts an election
/// to get leadership., from: 3, term: 4, raft_id: 3
///
/// starting a new election, term: 4, raft_id: 3
///
/// became candidate at term 5, term: 5, raft_id: 3
///
/// broadcasting vote request, to: [4, 1], log_index: 54,
/// log_term: 4, term: 5, type: MsgRequestVote, raft_id: 3
///
/// received votes response, term: 5, type: MsgRequestVoteResponse,
/// approvals: 2, rejections: 0, from: 4, vote: true, raft_id: 3
///
/// became leader at term 5, term: 5, raft_id: 3
/// ```
#[proc(public = false)]
fn proc_raft_promote() -> traft::Result<()> {
let node = global()?;
node.campaign_and_yield()?;