Newer
Older
//! This module encapsulates most of the application-specific logic.
//!
//! It's responsible for
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
use crate::access_control::user_by_id;
use crate::config::PicodataConfig;

Georgy Moshkin
committed
use crate::governor;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::plugin::migration;
use crate::reachability::instance_reachability_manager;
use crate::reachability::InstanceReachabilityManagerRef;
use crate::rpc::snapshot::proc_raft_snapshot_next_chunk;
use crate::schema::RoutineDef;
use crate::schema::RoutineKind;
use crate::schema::SchemaObjectType;
use crate::schema::{Distribution, IndexDef, IndexOption, TableDef};
use crate::sentinel;
use crate::storage::cached_key_def;
use crate::storage::schema::acl;
use crate::storage::schema::ddl_abort_on_master;
use crate::storage::schema::ddl_meta_drop_routine;
use crate::storage::schema::ddl_meta_drop_space;
use crate::storage::schema::ddl_meta_space_update_operable;
use crate::storage::snapshot::SnapshotData;
use crate::storage::space_by_id;
use crate::storage::{local_schema_version, set_local_schema_version};
use crate::storage::{Clusterwide, ClusterwideTable, PropertyName};
use crate::traft::error::Error;
use crate::traft::network::WorkerOptions;
use crate::traft::op::PluginRaftOp;
use crate::traft::op::{Acl, Ddl, Dml, Op};
use crate::traft::RaftEntryId;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::unwrap_ok_or;
use crate::unwrap_some_or;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::BoxError;
use ::tarantool::error::TarantoolErrorCode;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::index::IndexType;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::time::Instant;
use ::tarantool::tuple::{Decode, Tuple};
use crate::plugin::manager::PluginManager;
use crate::plugin::PluginAsyncEvent;
use std::collections::HashMap;
use std::ops::ControlFlow;
use std::rc::Rc;
use std::time::Duration;
use ApplyEntryResult::*;
type RawNode = raft::RawNode<RaftSpaceAccess>;
// Raft role of a node, declared through `define_str_enum!` so that every
// variant is paired with a string literal.
// The set of variants mirrors `RaftStateRole` one-to-one (see the `From`
// conversion implemented for this type).
::tarantool::define_str_enum! {
pub enum RaftState {
Follower = "Follower",
Candidate = "Candidate",
Leader = "Leader",
PreCandidate = "PreCandidate",
}
}
impl RaftState {
    /// Tells whether this state is [`RaftState::Leader`].
    ///
    /// Every other variant yields `false`.
    pub fn is_leader(&self) -> bool {
        match self {
            Self::Leader => true,
            Self::Follower | Self::Candidate | Self::PreCandidate => false,
        }
    }
}
impl From<RaftStateRole> for RaftState {
fn from(role: RaftStateRole) -> Self {
match role {
RaftStateRole::Follower => Self::Follower,
RaftStateRole::Candidate => Self::Candidate,
RaftStateRole::Leader => Self::Leader,
RaftStateRole::PreCandidate => Self::PreCandidate,
}
}
}
#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
/// `raft_id` of the current instance
/// `raft_id` of the leader instance
pub leader_id: Option<RaftId>,
/// Current term number
pub term: RaftTerm,
/// Current raft state
pub raft_state: RaftState,
/// Current state of the main loop.
///
/// Set this before yielding from [`NodeImpl::advance`].
pub main_loop_status: &'static str,
pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
if requested_term != self.term {
return Err(Error::TermMismatch {
requested: requested_term,
current: self.term,
});
}
Ok(())
}
}
/// The heart of `traft` module - the Node.
/// RaftId of the Node.
//
// It appears twice in the Node: here and in `status.id`.
// This is a conscious decision.
// `self.raft_id()` is used in Rust API, and
// `self.status()` is mostly useful in Lua API.

Georgy Moshkin
committed
pub(crate) raft_id: RaftId,
pub(crate) storage: Clusterwide,
pub(crate) raft_storage: RaftSpaceAccess,
pub(crate) main_loop: MainLoop,
pub(crate) governor_loop: governor::Loop,
pub(crate) sentinel_loop: sentinel::Loop,
#[allow(unused)]
pub(crate) pool: Rc<ConnectionPool>,
status: watch::Receiver<Status>,
applied: watch::Receiver<RaftIndex>,
/// Should be locked during join and update instance request
/// to avoid costly cas conflicts during concurrent requests.
pub instances_update: Mutex<()>,
/// Manage plugins loaded or must be loaded at this node.
pub plugin_manager: Rc<PluginManager>,
pub(crate) instance_reachability: InstanceReachabilityManagerRef,
impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    // Only `raft_id` is printed; all the other fields are elided and the
    // output is marked as non-exhaustive.
    let mut repr = f.debug_struct("Node");
    repr.field("raft_id", &self.raft_id);
    repr.finish_non_exhaustive()
}
static mut RAFT_NODE: Option<Box<Node>> = None;
/// Initialize the global raft node singleton. Returns a reference to it.
///
/// Returns an error in case of storage failure.
/// **This function yields**
pub fn init(
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
) -> Result<&'static Self, Error> {
if unsafe { RAFT_NODE.is_some() } {
return Err(Error::other("raft node is already initialized"));
}

Georgy Moshkin
committed
let opts = WorkerOptions {
raft_msg_handler: proc_name!(proc_raft_interact),

Georgy Moshkin
committed
call_timeout: MainLoop::TICK.saturating_mul(4),
..Default::default()
};
let mut pool = ConnectionPool::new(storage.clone(), opts);
let instance_reachability = instance_reachability_manager(storage.clone());
pool.instance_reachability = instance_reachability.clone();
let pool = Rc::new(pool);
let plugin_manager = Rc::new(PluginManager::new(storage.clone()));

Georgy Moshkin
committed
let node_impl = NodeImpl::new(
pool.clone(),
storage.clone(),
raft_storage.clone(),
plugin_manager.clone(),
)?;
let raft_id = node_impl.raft_id();
let status = node_impl.status.subscribe();
let applied = node_impl.applied.subscribe();
let node_impl = Rc::new(Mutex::new(node_impl));
// Raft main loop accesses the global node reference,
// so it must be initialized before the main loop starts.
let guard = crate::util::NoYieldsGuard::new();
main_loop: MainLoop::start(node_impl.clone()),
governor_loop: governor::Loop::start(
pool.clone(),
status.clone(),
storage.clone(),
raft_storage.clone(),
),
sentinel_loop: sentinel::Loop::start(

Georgy Moshkin
committed
status.clone(),
storage.clone(),
raft_storage.clone(),
instance_reachability.clone(),

Georgy Moshkin
committed
),
storage,
raft_storage,
instance_reachability,
unsafe { RAFT_NODE = Some(Box::new(node)) };
let node = global().expect("just initialized it");
drop(guard);
node.tick_and_yield(0);
#[inline(always)]
pub fn raft_id(&self) -> RaftId {
#[inline(always)]
#[inline(always)]
/// Locks the `node_impl` mutex and returns the guard that gives access to
/// the underlying [`NodeImpl`].
///
/// NOTE(review): this takes the same lock as `raw_operation`, which is
/// documented as possibly yielding when the mutex is already held —
/// presumably the same applies here; confirm.
pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {
self.node_impl.lock()
}
/// Wait for the status to be changed.
/// **This function yields**
#[inline(always)]
fiber::block_on(self.status.clone().changed()).unwrap();
/// Returns the current applied [`RaftIndex`].
///
/// The value is read from the `applied` watch receiver held by this node.
#[inline(always)]
pub fn get_index(&self) -> RaftIndex {
self.applied.get()
}
/// Performs the quorum read operation.
///
/// If works the following way:
///
/// 1. The instance forwards a request (`MsgReadIndex`) to a raft
/// leader. In case there's no leader at the moment, the function
/// returns `Err(ProposalDropped)`.
/// 2. Raft leader tracks its `commit_index` and broadcasts a
/// heartbeat to followers to make certain that it's still a
/// leader.
/// 3. As soon as the heartbeat is acknowlenged by the quorum, the
/// function returns that index.
/// 4. The instance awaits when the index is applied. If timeout
/// expires beforehand, the function returns `Err(Timeout)`.
///
/// Returns current applied [`RaftIndex`].
///
/// **This function yields**
pub fn read_index(&self, timeout: Duration) -> traft::Result<RaftIndex> {
let deadline = fiber::clock().saturating_add(timeout);
let rx = self.raw_operation(|node_impl| node_impl.read_index_async())?;
let index: RaftIndex = fiber::block_on(rx.timeout(timeout)).map_err(|_| Error::Timeout)?;
self.wait_index(index, deadline.duration_since(fiber::clock()))
}
/// Waits for [`RaftIndex`] to be applied to the storage locally.
///
/// Returns current applied [`RaftIndex`]. It can be equal to or
/// greater than the target one. If timeout expires beforehand, the
/// function returns `Err(Timeout)`.
///
/// **This function yields**
// TODO: this should also take a term and return an error if the term
// changes, because it means that leader has changed and the entry got
// rolled back.
pub fn wait_index(&self, target: RaftIndex, timeout: Duration) -> traft::Result<RaftIndex> {
tlog!(Debug, "waiting for applied index {target}");
let mut applied = self.applied.clone();
let deadline = fiber::clock().saturating_add(timeout);
fiber::block_on(async {
loop {
let current = self.get_index();
if current >= target {
tlog!(
Debug,
"done waiting for applied index {target}, current: {current}"
);
return Ok(current);
}
let timeout = deadline.duration_since(fiber::clock());
let res = applied.changed().timeout(timeout).await;
if let Err(TimeoutError::Expired) = res {
tlog!(
Debug,
"failed waiting for applied index {target}: timeout, current: {current}"
);
return Err(Error::Timeout);
}
/// Proposes a `Op::Nop` operation to raft log.
/// Returns the index of the resulting entry.
///
/// If called on a non leader, returns an error.
///
/// **This function yields**
pub fn propose_nop(&self) -> traft::Result<RaftIndex> {
let entry_id = self.raw_operation(|node_impl| node_impl.propose_async(Op::Nop))?;
Ok(entry_id.index)
/// Become a candidate and wait for a main loop round so that there's a
/// chance we become the leader.
/// **This function yields**
pub fn campaign_and_yield(&self) -> traft::Result<()> {
self.raw_operation(|node_impl| node_impl.campaign())?;
// Even though we don't expect a response, we still should let the
// main_loop do an iteration. Without rescheduling, the Ready state
// wouldn't be processed, the Status wouldn't be updated, and some
// assertions may fail (e.g. in `postjoin()` in `main.rs`).
fiber::reschedule();
Ok(())
/// **This function yields**
pub fn step_and_yield(&self, msg: raft::Message) {
self.raw_operation(|node_impl| node_impl.step(msg))
.map_err(|e| tlog!(Error, "{e}"))
.ok();
// even though we don't expect a response, we still should let the
// main_loop do an iteration
fiber::reschedule();
/// **This function yields**
pub fn tick_and_yield(&self, n_times: u32) {
self.raw_operation(|node_impl| node_impl.tick(n_times));
// even though we don't expect a response, we still should let the
// main_loop do an iteration
fiber::reschedule();
/// Only the conf_change_loop on a leader is eligible to call this function.
///
/// **This function yields**

Georgy Moshkin
committed
pub(crate) fn propose_conf_change_and_wait(
&self,
term: RaftTerm,
conf_change: raft::ConfChangeV2,
) -> traft::Result<()> {
let notify =
self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
fiber::block_on(notify).unwrap()?;
Ok(())
/// Asks raft to hand leadership over to `new_leader_id`, then yields.
///
/// The request is only issued here; the function reschedules the fiber
/// afterwards and does not wait for the transfer to complete.
///
/// **This function yields**
pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
    self.raw_operation(|node_impl| {
        node_impl.raw_node.transfer_leader(new_leader_id);
    });

    // Give other fibers a chance to run before returning.
    fiber::reschedule();
}
/// This function **may yield** if `self.node_impl` mutex is acquired.
#[inline]
fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
let mut node_impl = self.node_impl.lock();
res
/// Returns the entries produced by `raft_storage.all_traft_entries()`.
///
/// Any error from the storage call is passed through unchanged.
#[inline]
pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
self.raft_storage.all_traft_entries()
}
let is_ro: bool = crate::tarantool::eval("return box.info.ro")
.expect("checking read-onlyness should never fail");
is_ro
pub raw_node: RawNode,
pub read_state_wakers: HashMap<LogicalClock, oneshot::Sender<RaftIndex>>,
joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,

Georgy Moshkin
committed
pool: Rc<ConnectionPool>,
status: watch::Sender<Status>,
applied: watch::Sender<RaftIndex>,
instance_reachability: InstanceReachabilityManagerRef,

Georgy Moshkin
committed
pool: Rc<ConnectionPool>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
) -> Result<Self, RaftError> {
let box_err = |e| StorageError::Other(Box::new(e));
let raft_id: RaftId = raft_storage
.raft_id()
.map_err(box_err)?
.expect("raft_id should be set by the time the node is being initialized");
let applied: RaftIndex = raft_storage.applied().map_err(box_err)?;

Georgy Moshkin
committed
let term: RaftTerm = raft_storage.term().map_err(box_err)?;
let gen = raft_storage.gen().unwrap() + 1;
raft_storage.persist_gen(gen).unwrap();
LogicalClock::new(raft_id, gen)
};
let cfg = raft::Config {
id: raft_id,
applied,
pre_vote: true,
// XXX: this value is pretty random, we should really do some
// testing to determine the best value for it.
max_size_per_msg: 64,
..Default::default()
};
let raw_node = RawNode::new(&cfg, raft_storage.clone(), tlog::root())?;
let (status, _) = watch::channel(Status {
id: raft_id,
leader_id: None,

Georgy Moshkin
committed
term,
raft_state: RaftState::Follower,
let (applied, _) = watch::channel(applied);
Ok(Self {
raw_node,
read_state_wakers: Default::default(),
joint_state_latch: KVCell::new(),
raft_storage,
instance_reachability: pool.instance_reachability.clone(),
})
}
/// Returns the raft id of this node as stored in the underlying raw raft
/// node (`raw_node.raft.id`).
fn raft_id(&self) -> RaftId {
self.raw_node.raft.id
}
pub fn read_index_async(&mut self) -> Result<oneshot::Receiver<RaftIndex>, RaftError> {
// In some states `raft-rs` ignores the ReadIndex request.
// Check it preliminary, don't wait for the timeout.
//
// See for details:
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
let term_just_started = // ...
self.raw_node.raft.state == RaftStateRole::Leader
if leader_doesnt_exist || term_just_started {
return Err(RaftError::ProposalDropped);
}
let (lc, rx) = self.schedule_read_state_waker();
// read_index puts this context into an Entry,
// so we've got to compose full EntryContext,
// despite single LogicalClock would be enough
let ctx = traft::ReadStateContext { lc };
self.raw_node.read_index(ctx.to_raft_ctx());
Ok(rx)
///
/// Returns id of the proposed entry, which can be used to await its
/// application.
///
/// Returns an error if current instance is not a raft leader, because
/// followers should propose raft log entries via [`proc_cas`].
///
/// NOTE: the proposed entry may still be dropped, and the entry at that
/// index may be some other one. It's the caller's responsibility to verify
/// which entry got committed.
///
/// [`proc_cas`]: crate::cas::proc_cas
pub fn propose_async<T>(&mut self, op: T) -> Result<RaftEntryId, Error>
if self.raw_node.raft.state != RaftStateRole::Leader {
return Err(Error::NotALeader);
}
let index_before = self.raw_node.raft.raft_log.last_index();
let term = self.raw_node.raft.term;
let context = traft::EntryContext::Op(op.into());
// Check resulting raft log entry does not exceed the maximum tuple size limit.
let entry = context.to_raft_entry();
let tuple_size = traft::Entry::tuple_size(index_before + 1, term, &[], &entry.context);
if tuple_size > PicodataConfig::max_tuple_size() {
return Err(BoxError::new(
TarantoolErrorCode::MemtxMaxTupleSize,
format!("tuple size {tuple_size} exceeds the allowed limit"),
)
.into());
}
// Copy-pasted from `raft::raw_node::RawNode::propose`
let mut m = raft::Message::default();
m.set_msg_type(raft::MessageType::MsgPropose);
m.from = self.raw_node.raft.id;
m.set_entries(vec![entry].into());
self.raw_node.raft.step(m)?;
let index = self.raw_node.raft.raft_log.last_index();
debug_assert!(index == index_before + 1);
let entry_id = RaftEntryId { term, index };
Ok(entry_id)
}
/// Delegates to `campaign` of the underlying raw raft node and returns
/// its result unchanged.
pub fn campaign(&mut self) -> Result<(), RaftError> {
self.raw_node.campaign()
}
/// Feeds an incoming raft message into the underlying raw raft node.
///
/// A message whose `to` field differs from this node's raft id is
/// ignored and `Ok(())` is returned.
pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
    if msg.to == self.raft_id() {
        // TODO check it's not a MsgPropose with op::Dml for updating _pico_instance.
        // TODO check it's not a MsgPropose with ConfChange.
        self.raw_node.step(msg)
    } else {
        Ok(())
    }
}
/// Calls `tick` on the underlying raw raft node `n_times` in a row.
///
/// Does nothing when `n_times` is zero.
pub fn tick(&mut self, n_times: u32) {
    (0..n_times).for_each(|_| {
        self.raw_node.tick();
    });
}
/// Proposes a configuration change and returns a receiver paired with the
/// sender stored in `joint_state_latch`.
///
/// The sender is stored in the latch under the index of the newly
/// appended log entry.
///
/// # Errors
///
/// Returns `RaftError::ConfChangeError` if this node is not a leader,
/// if `term` differs from the current raft term, or if there already is
/// a pending conf change. Errors from `propose_conf_change` are
/// propagated as is.
///
/// # Panics
///
/// Panics if the last log index did not grow by exactly one after the
/// proposal.
fn propose_conf_change_async(
&mut self,
term: RaftTerm,
conf_change: raft::ConfChangeV2,
) -> Result<oneshot::Receiver<Result<(), RaftError>>, RaftError> {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
// Checking leadership is only needed for the
// correct latch management. It doesn't affect
// raft correctness. Checking the instance is a
// leader makes sure the proposed `ConfChange`
// is appended to the raft log immediately
// instead of sending `MsgPropose` over the
// network.
if self.raw_node.raft.state != RaftStateRole::Leader {
return Err(RaftError::ConfChangeError("not a leader".into()));
}
if term != self.raw_node.raft.term {
return Err(RaftError::ConfChangeError("raft term mismatch".into()));
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if self.raw_node.raft.has_pending_conf() {
return Err(RaftError::ConfChangeError(
"already has pending confchange".into(),
));
}
// Remember the log position so that the append can be verified below.
let prev_index = self.raw_node.raft.raft_log.last_index();
self.raw_node.propose_conf_change(vec![], conf_change)?;
// Ensure the ConfChange was actually appended to the log.
// Otherwise it's a problem: current instance isn't actually a
// leader (which is impossible in theory, but we're not sure in
// practice) and sent the message to the raft network. It may
// lead to an inconsistency.
let last_index = self.raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
// An occupied latch is unexpected at this point and is reported via
// `warn_or_panic!`.
if !self.joint_state_latch.is_empty() {
warn_or_panic!("joint state latch is locked");
}
// Park the sender in the latch under the index of the new entry and
// hand the receiving end to the caller.
let (tx, rx) = oneshot::channel();
self.joint_state_latch.insert(last_index, tx);
Ok(rx)
}
fn handle_committed_entries(
&mut self,
entries: &[raft::Entry],
expelled: &mut bool,
) -> traft::Result<()> {
let mut entries = entries.iter().peekable();
while let Some(&entry) = entries.peek() {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
continue;
}
};
let mut apply_entry_result = EntryApplied;
let mut new_applied = None;
self.main_loop_status("handling committed entries");
let entry_index = entry.index;
match entry.entry_type {
raft::EntryType::EntryNormal => {
apply_entry_result = self.handle_committed_normal_entry(entry, expelled);
if apply_entry_result != EntryApplied {
return Ok(());
}
}
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
self.handle_committed_conf_change(entry)
}
let res = self.raft_storage.persist_applied(entry_index);
if let Err(e) = res {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => entry_index
);
}
new_applied = Some(entry_index);
Ok(())
})?;
if let Some(new_applied) = new_applied {
self.applied
.send(new_applied)
.expect("applied shouldn't ever be borrowed across yields");
}
match apply_entry_result {
SleepAndRetry => {
self.main_loop_status("blocked by raft entry");
let timeout = MainLoop::TICK * 4;
fiber::sleep(timeout);
continue;
}
EntryApplied => {
// Actually advance the iterator.
let _ = entries.next();
Ok(())
fn wake_governor_if_needed(&self, op: &Op) {
let wake_governor = match &op {
Op::Dml(op) => dml_is_governor_wakeup_worthy(op),
Op::BatchDml { ops } => ops.iter().any(dml_is_governor_wakeup_worthy),
Op::DdlPrepare { .. } => true,
_ => false,
};
#[inline(always)]
fn dml_is_governor_wakeup_worthy(op: &Dml) -> bool {
matches!(
op.space().try_into(),
Ok(ClusterwideTable::Property
| ClusterwideTable::Replicaset
| ClusterwideTable::Instance
| ClusterwideTable::Tier)
// NOTE: this may be premature, because the dml may fail to apply and/or
// the transaction may be rolled back, but we ignore this for the sake
// of simplicity, as nothing bad can happen if governor makes another
// idle iteration.
if wake_governor {
let res = global()
.expect("node must be initialized by this point")
.governor_loop
.wakeup();
if let Err(e) = res {
tlog!(Warning, "failed waking up governor: {e}");
}
}
}
/// Actions needed when applying a DML entry.
///
/// Returns `true` if entry was applied successfully.
fn handle_dml_entry(&self, op: &Dml, expelled: &mut bool) -> bool {
let space = op.space().try_into();
// In order to implement the audit log events, we have to compare
// tuples from certain system spaces before and after a DML operation.
//
// In theory, we could move this code to `do_dml`, but in practice we
// cannot do that just yet, because we aren't allowed to universally call
// `extract_key` for arbitrary tuple/space pairs due to insufficient validity
// checks on its side -- it might just crash for user input.
//
// Remember, we're not the only ones using CaS; users may call it for their
// own spaces, thus emitting unrestricted (and unsafe) DML records.
//
// TODO: merge this into `do_dml` once `box_tuple_extract_key` is fixed.
let old = match space {
Ok(s @ (ClusterwideTable::Property | ClusterwideTable::Instance)) => {
let s = space_by_id(s.id()).expect("system space must exist");
match &op {
// There may be no previous version for inserts.
Dml::Insert { .. } => Ok(None),
Dml::Update { key, .. } => s.get(key),
Dml::Delete { key, .. } => s.get(key),
Dml::Replace { tuple, .. } => {
let tuple = Tuple::from(tuple);
let key_def =
cached_key_def(s.id(), 0).expect("index for space must be found");
let key = key_def
.extract_key(&tuple)
.expect("cas should validate operation before committing a log entry");
s.get(&key)
}
}
}
_ => Ok(None),
};
// Perform DML and combine both tuple versions into a pair.
let res = old.and_then(|old| {
Ok((old, new))
});
let initiator = op.initiator();
let initiator_def = user_by_id(initiator).expect("user must exist");
match &res {
Err(e) => {
tlog!(Error, "clusterwide dml failed: {e}");
}
// Handle insert, replace, update in _pico_instance
Ok((old, Some(new))) if space == Ok(ClusterwideTable::Instance) => {
// Dml::Delete mandates that new tuple is None.
assert!(!matches!(op, Dml::Delete { .. }));
let old: Option<Instance> =
old.as_ref().map(|x| x.decode().expect("must be Instance"));
// FIXME: we do this prematurely, because the
// transaction may still be rolled back for some reason.
let new: Instance = new
.decode()
.expect("tuple already passed format verification");
// Check if we're handling a "new node joined" event:
// * Either there's no tuple for this node in the storage;
// * Or its raft id has changed, meaning it's no longer the same node.
Egor Ivkov
committed
// WARN: this condition will not pass on the joining instance
// as it preemptively puts itself into `_pico_instance` table.
// Locally it's logged in src/lib.rs.
if old.as_ref().map(|x| x.raft_id) != Some(new.raft_id) {
let instance_name = &new.name;
message: "a new instance `{instance_name}` joined the cluster",
title: "join_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
Egor Ivkov
committed
crate::audit!(
message: "local database created on `{instance_name}`",
Egor Ivkov
committed
title: "create_local_db",
severity: Low,
instance_name: %instance_name,
Egor Ivkov
committed
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
if old.as_ref().map(|x| x.current_state) != Some(new.current_state) {
let instance_name = &new.name;
let state = &new.current_state;
message: "current state of instance `{instance_name}` changed to {state}",
instance_name: %instance_name,
initiator: &initiator_def.name,
);
}
if old.as_ref().map(|x| x.target_state) != Some(new.target_state) {
let instance_name = &new.name;
message: "target state of instance `{instance_name}` changed to {state}",
instance_name: %instance_name,
initiator: &initiator_def.name,
);
}
if has_states!(new, Expelled -> *) {
let instance_name = &new.name;
message: "instance `{instance_name}` was expelled from the cluster",
title: "expel_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
Egor Ivkov
committed
);
crate::audit!(
message: "local database dropped on `{instance_name}`",
Egor Ivkov
committed
title: "drop_local_db",
severity: Low,
instance_name: %instance_name,
Egor Ivkov
committed
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
if new.raft_id == self.raft_id() {
// cannot exit during a transaction
*expelled = true;
}
}
}
Ok(_) => {}
}
/// Is called during a transaction
fn handle_committed_normal_entry(
&mut self,
entry: traft::Entry,
expelled: &mut bool,
) -> ApplyEntryResult {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let index = entry.index;
let op = entry.into_op().unwrap_or(Op::Nop);
tlog!(Debug, "applying entry: {op}"; "index" => index);
self.wake_governor_if_needed(&op);
let storage_properties = &self.storage.properties;
// apply the operation
match op {
Op::Nop => {}
Op::BatchDml { ref ops } => {
for op in ops {
let ok = self.handle_dml_entry(op, expelled);
if !ok {
return SleepAndRetry;
}
let ok = self.handle_dml_entry(&op, expelled);
if !ok {
return SleepAndRetry;
}
}
Op::DdlPrepare {
ddl,
schema_version,
} => {
self.apply_op_ddl_prepare(ddl, schema_version)
.expect("storage should not fail");
}
Op::DdlCommit => {
let v_local = local_schema_version().expect("storage should not fail");
let v_pending = storage_properties
.pending_schema_version()
.expect("granted we don't mess up log compaction, this should not be None");
let ddl = storage_properties
.pending_schema_change()
.expect("granted we don't mess up log compaction, this should not be None");
// This instance is catching up to the cluster.
if v_local < v_pending {
// Master applies schema change at this point.

Maksim Kaitmazian
committed
// Note: Unlike RPC handler `proc_apply_schema_change`, there is no need
// for a schema change lock. When instance is catching up to the cluster,
// RPCs will be blocked waiting for the applied index from the request to
// be applied on master *, so no concurrent changes can happen.
//
// * https://git.picodata.io/picodata/picodata/picodata/-/blob/ccba5cf1956e41b31eac8cdfacd0e4344033dda1/src/rpc/ddl_apply.rs#L32
let res = rpc::ddl_apply::apply_schema_change(
&self.storage,
&ddl,
);
match res {
Err(rpc::ddl_apply::Error::Other(err)) => {
panic!("storage should not fail, but failed with: {err}")
}
Err(rpc::ddl_apply::Error::Aborted(reason)) => {
tlog!(Warning, "failed applying committed ddl operation: {reason}";
"ddl" => ?ddl,
);