//! This module encapsulates most of the application-specific logic.
//!
//! It is responsible for:
//! - handling proposals,
//! - handling configuration changes,
//! - processing raft `Ready`: persisting entries and communicating with other raft nodes.
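//!
//! A minimal sketch of the typical lifecycle (assumes the storage handles are
//! already constructed by the caller; not compiled as a doc test):
//!
//! ```ignore
//! let node = Node::new(storage, raft_storage)?; // starts the raft main loop
//! set_global(node);                             // make it available via `global()`
//! let node = global()?;
//! node.wait_status();                           // block until the raft status changes
//! ```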
use crate::governor;
use crate::has_grades;
use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::r#loop::FlowControl;
use crate::schema::ddl_abort_on_master;
use crate::schema::{Distribution, IndexDef, SpaceDef};
use crate::storage::pico_schema_version;
use crate::storage::ToEntryIter as _;
use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
use crate::stringify_cfunc;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::notify::{notification, Notifier, Notify};
use crate::traft::op::{Ddl, Dml, Op, OpResult, PersistInstance};
use crate::traft::rpc::{ddl_apply, join, lsn, update_instance};
use crate::traft::Address;
use crate::traft::ConnectionPool;
use crate::traft::ContextCoercion as _;
use crate::traft::LogicalClock;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::traft::Topology;
use crate::unwrap_some_or;
use crate::util::instant_saturating_add;
use crate::util::AnyWithTypeName;
use crate::warn_or_panic;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber;
use ::tarantool::fiber::mutex::MutexGuard;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::index::FieldType as IFT;
use ::tarantool::index::Part;
use ::tarantool::proc;
use ::tarantool::space::FieldType as SFT;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use protobuf::Message as _;
use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
type RawNode = raft::RawNode<RaftSpaceAccess>;
::tarantool::define_str_enum! {
pub enum RaftState {
Follower = "Follower",
Candidate = "Candidate",
Leader = "Leader",
PreCandidate = "PreCandidate",
}
}
impl RaftState {
pub fn is_leader(&self) -> bool {
matches!(self, Self::Leader)
}
}
impl From<RaftStateRole> for RaftState {
fn from(role: RaftStateRole) -> Self {
match role {
RaftStateRole::Follower => Self::Follower,
RaftStateRole::Candidate => Self::Candidate,
RaftStateRole::Leader => Self::Leader,
RaftStateRole::PreCandidate => Self::PreCandidate,
}
}
}
#[derive(Copy, Clone, Debug, tlua::Push, tlua::PushInto)]
pub struct Status {
/// `raft_id` of the current instance
pub id: RaftId,
/// `raft_id` of the leader instance
pub leader_id: Option<RaftId>,
/// Current term number
pub term: RaftTerm,
/// Current raft state
pub raft_state: RaftState,
}
impl Status {
pub fn check_term(&self, requested_term: RaftTerm) -> traft::Result<()> {
if requested_term != self.term {
return Err(Error::TermMismatch {
requested: requested_term,
current: self.term,
});
}
Ok(())
}
}
/// Key is a cluster-wide space name.
type StorageWatchers = HashMap<String, watch::Sender<()>>;
/// Key is a cluster-wide space name.
type StorageChanges = HashSet<String>;
/// The heart of the `traft` module: the `Node`.
pub struct Node {
/// RaftId of the Node.
//
// It appears twice in the Node: here and in `status.id`.
// This is a conscious decision.
// `self.raft_id()` is used in the Rust API, while
// `self.status()` is mostly useful in the Lua API.
pub(crate) raft_id: RaftId,
node_impl: Rc<Mutex<NodeImpl>>,
pub(crate) storage: Clusterwide,
pub(crate) raft_storage: RaftSpaceAccess,
pub(crate) main_loop: MainLoop,
pub(crate) governor_loop: governor::Loop,
status: watch::Receiver<Status>,
watchers: Rc<Mutex<StorageWatchers>>,
}
impl std::fmt::Debug for Node {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Node")
.field("raft_id", &self.raft_id)
.finish_non_exhaustive()
}
}
impl Node {
/// Initialize the raft node.
/// **This function yields**
pub fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> {
let node_impl = NodeImpl::new(storage.clone(), raft_storage.clone())?;
let raft_id = node_impl.raft_id();
let status = node_impl.status.subscribe();
let node_impl = Rc::new(Mutex::new(node_impl));
let watchers = Rc::new(Mutex::new(HashMap::new()));
let node = Node {
raft_id,
main_loop: MainLoop::start(node_impl.clone(), watchers.clone()), // yields
governor_loop: governor::Loop::start(
status.clone(),
storage.clone(),
raft_storage.clone(),
),
node_impl,
storage,
raft_storage,
status,
watchers,
};
// Wait for the node to enter the main loop
node.tick_and_yield(0);
Ok(node)
}
pub fn raft_id(&self) -> RaftId {
self.raft_id
}
pub fn status(&self) -> Status {
self.status.get()
}
pub(crate) fn node_impl(&self) -> MutexGuard<NodeImpl> {
self.node_impl.lock()
}
/// Wait for the status to be changed.
/// **This function yields**
pub fn wait_status(&self) {
fiber::block_on(self.status.clone().changed()).unwrap();
}
/// Requests a raft read index (`ReadIndex`) and waits for the response.
///
/// **This function yields**
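///
/// A minimal usage sketch (assumes a raft leader is known; not compiled):
///
/// ```ignore
/// // Ask the leader for a read index and wait until it is confirmed.
/// let read_index = node.wait_for_read_state(Duration::from_secs(3))?;
/// ```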
pub fn wait_for_read_state(&self, timeout: Duration) -> traft::Result<RaftIndex> {
let notify = self.raw_operation(|node_impl| node_impl.read_state_async())?;
fiber::block_on(notify.recv_timeout::<RaftIndex>(timeout))
}
/// Propose an operation and wait for its result.
/// **This function yields**
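///
/// A minimal usage sketch (assumes `op` implements [`OpResult`] and
/// `Into<Op>`; not compiled):
///
/// ```ignore
/// // Propose `op` to the raft log and block until it is committed and applied.
/// let result = node.propose_and_wait(op, Duration::from_secs(3))?;
/// ```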
pub fn propose_and_wait<T: OpResult + Into<Op>>(
&self,
op: T,
timeout: Duration,
) -> traft::Result<T::Result> {
let notify = self.raw_operation(|node_impl| node_impl.propose_async(op))?;
fiber::block_on(notify.recv_timeout::<T::Result>(timeout))
}
/// Become a candidate and wait for a main loop round so that there's a
/// chance we become the leader.
/// **This function yields**
pub fn campaign_and_yield(&self) -> traft::Result<()> {
self.raw_operation(|node_impl| node_impl.campaign())?;
// Even though we don't expect a response, we still should let the
// main_loop do an iteration. Without rescheduling, the Ready state
// wouldn't be processed, the Status wouldn't be updated, and some
// assertions may fail (e.g. in `postjoin()` in `main.rs`).
fiber::reschedule();
Ok(())
}
/// **This function yields**
pub fn step_and_yield(&self, msg: raft::Message) {
self.raw_operation(|node_impl| node_impl.step(msg))
.map_err(|e| tlog!(Error, "{e}"))
.ok();
// even though we don't expect a response, we still should let the
// main_loop do an iteration
fiber::reschedule();
}
/// **This function yields**
pub fn tick_and_yield(&self, n_times: u32) {
self.raw_operation(|node_impl| node_impl.tick(n_times));
// even though we don't expect a response, we still should let the
// main_loop do an iteration
fiber::reschedule();
}
/// **This function yields**
pub fn timeout_now(&self) {
let raft_id = self.raft_id();
self.step_and_yield(raft::Message {
to: raft_id,
from: raft_id,
msg_type: raft::MessageType::MsgTimeoutNow,
..Default::default()
})
}
/// Processes the [`join::Request`] request and appends necessary
/// entries to the raft log (if successful).
///
/// Returns the resulting [`Instance`] when the entry is committed.
///
/// Returns an error if the callee node isn't a raft leader.
///
/// **This function yields**
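///
/// A minimal usage sketch (typically invoked from the `join` RPC handler on
/// the leader; not compiled):
///
/// ```ignore
/// let (instance, replication_addresses) = node.handle_join_request_and_wait(req)?;
/// ```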
pub fn handle_join_request_and_wait(
&self,
req: join::Request,
) -> traft::Result<(Box<Instance>, HashSet<Address>)> {
let (notify_addr, notify_instance, replication_addresses) =
self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?;
fiber::block_on(async {
let (addr, instance) = futures::join!(notify_addr.recv_any(), notify_instance.recv());
addr?;
instance.map(|i| (Box::new(i), replication_addresses))
})
}
/// Processes the [`update_instance::Request`] request and appends
/// [`Op::PersistInstance`] entry to the raft log (if successful).
///
/// Returns `Ok(())` when the entry is committed.
///
/// Returns an error if the callee node isn't a raft leader.
///
/// **This function yields**
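///
/// A minimal usage sketch (typically invoked from the `update_instance` RPC
/// handler on the leader; not compiled):
///
/// ```ignore
/// node.handle_update_instance_request_and_wait(req)?;
/// ```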
pub fn handle_update_instance_request_and_wait(
&self,
req: update_instance::Request,
) -> traft::Result<()> {
let notify =
self.raw_operation(|node_impl| node_impl.process_update_instance_request_async(req))?;
fiber::block_on(notify.recv_any())?;
Ok(())
}
/// Only the conf_change_loop on a leader is eligible to call this function.
///
/// **This function yields**
pub(crate) fn propose_conf_change_and_wait(
&self,
term: RaftTerm,
conf_change: raft::ConfChangeV2,
) -> traft::Result<()> {
let notify =
self.raw_operation(|node_impl| node_impl.propose_conf_change_async(term, conf_change))?;
fiber::block_on(notify).unwrap()?;
Ok(())
}
/// Attempt to transfer leadership to a given node and yield.
///
/// **This function yields**
pub fn transfer_leadership_and_yield(&self, new_leader_id: RaftId) {
self.raw_operation(|node_impl| node_impl.raw_node.transfer_leader(new_leader_id));
fiber::reschedule();
}
/// This function **may yield** while acquiring the `self.node_impl` mutex
/// if it is currently locked.
#[inline]
#[track_caller]
fn raw_operation<R>(&self, f: impl FnOnce(&mut NodeImpl) -> R) -> R {
let mut node_impl = self.node_impl.lock();
let res = f(&mut node_impl);
drop(node_impl);
self.main_loop.wakeup();
res
}
#[inline]
pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> {
self.raft_storage.all_traft_entries()
}
/// Returns a watch which will be notified whenever the clusterwide space
/// with the given name is modified.
///
/// You can also pass a [`ClusterwideSpace`] directly, in which case its
/// name will be used as the key.
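///
/// A minimal usage sketch (assumes the global node is initialized; not compiled):
///
/// ```ignore
/// let node = global()?;
/// let mut watcher = node.storage_watcher(ClusterwideSpace::Instance);
/// // Wakes up the next time the instances space is modified.
/// fiber::block_on(watcher.changed()).ok();
/// ```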
#[inline(always)]
pub fn storage_watcher(&self, space: impl Into<String>) -> watch::Receiver<()> {
use std::collections::hash_map::Entry;
let mut watchers = self.watchers.lock();
match watchers.entry(space.into()) {
Entry::Vacant(entry) => {
let (tx, rx) = watch::channel(());
entry.insert(tx);
rx
}
Entry::Occupied(entry) => entry.get().subscribe(),
}
}
}
pub(crate) struct NodeImpl {
pub raw_node: RawNode,
pub notifications: HashMap<LogicalClock, Notifier>,
topology_cache: KVCell<RaftTerm, Topology>,
joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
pool: ConnectionPool,
lc: LogicalClock,
status: watch::Sender<Status>,
}
impl NodeImpl {
fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> {
let box_err = |e| StorageError::Other(Box::new(e));
let raft_id: RaftId = raft_storage.raft_id().map_err(box_err)?.unwrap();
let applied: RaftIndex = raft_storage.applied().map_err(box_err)?.unwrap_or(0);
let lc = {
let gen = raft_storage.gen().unwrap().unwrap_or(0) + 1;
raft_storage.persist_gen(gen).unwrap();
LogicalClock::new(raft_id, gen)
};
let pool = ConnectionPool::builder(storage.clone())
.handler_name(stringify_cfunc!(proc_raft_interact))
.call_timeout(MainLoop::TICK.saturating_mul(4))
.build();
let cfg = raft::Config {
id: raft_id,
applied,
pre_vote: true,
..Default::default()
};
let raw_node = RawNode::new(&cfg, raft_storage.clone(), &tlog::root())?;
let (status, _) = watch::channel(Status {
id: raft_id,
leader_id: None,
term: traft::INIT_RAFT_TERM,
raft_state: RaftState::Follower,
});
Ok(Self {
raw_node,
notifications: Default::default(),
topology_cache: KVCell::new(),
joint_state_latch: KVCell::new(),
storage,
raft_storage,
pool,
lc,
status,
})
}
fn raft_id(&self) -> RaftId {
self.raw_node.raft.id
}
/// Provides mutable access to the Topology struct, which reflects the
/// uncommitted state of the cluster. Ensures the node is a leader.
/// In case it's not, an error is returned.
///
/// It's important to access topology through this function so that
/// new changes are consistent with uncommitted ones.
fn topology_mut(&mut self) -> Result<&mut Topology, Error> {
if self.raw_node.raft.state != RaftStateRole::Leader {
self.topology_cache.take(); // invalidate the cache
return Err(Error::NotALeader);
}
let current_term = self.raw_node.raft.term;
let topology: Topology = unwrap_some_or! {
self.topology_cache.take_or_drop(¤t_term),
{
let mut instances = vec![];
for instance @ Instance { raft_id, .. } in self.storage.instances.iter()? {
instances.push((instance, self.storage.peer_addresses.try_get(raft_id)?))
}
let replication_factor = self
.storage
.properties
.get(PropertyName::ReplicationFactor)?
.ok_or_else(|| Error::other("missing replication_factor value in storage"))?;
Topology::new(instances).with_replication_factor(replication_factor)
}
};
Ok(self.topology_cache.insert(current_term, topology))
}
pub fn read_state_async(&mut self) -> Result<Notify, RaftError> {
// In some states `raft-rs` ignores the ReadIndex request.
// Check for them beforehand instead of waiting for the timeout.
//
// See for details:
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2058>
// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2323>
let leader_doesnt_exist = self.raw_node.raft.leader_id == INVALID_ID;
let term_just_started = // ...
self.raw_node.raft.state == RaftStateRole::Leader
&& !self.raw_node.raft.commit_to_current_term();
if leader_doesnt_exist || term_just_started {
return Err(RaftError::ProposalDropped);
}
let (lc, notify) = self.schedule_notification();
// read_index puts this context into an Entry, so we have to compose a
// full EntryContext, even though a single LogicalClock would be enough.
let ctx = traft::EntryContextNormal::new(lc, Op::Nop);
self.raw_node.read_index(ctx.to_bytes());
Ok(notify)
}
/// **Doesn't yield**
#[inline]
// TODO: rename and document
pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
where
T: Into<Op>,
{
let (lc, notify) = self.schedule_notification();
let ctx = traft::EntryContextNormal::new(lc, op.into());
self.raw_node.propose(ctx.to_bytes(), vec![])?;
Ok(notify)
}
/// Proposes a raft entry to be appended to the log and returns the raft
/// index at which it is expected to be committed, unless it gets rejected.
///
/// **Doesn't yield**
pub fn propose(&mut self, op: Op) -> Result<RaftIndex, RaftError> {
self.lc.inc();
let ctx = traft::EntryContextNormal::new(self.lc, op);
self.raw_node.propose(ctx.to_bytes(), vec![])?;
let index = self.raw_node.raft.raft_log.last_index();
Ok(index)
}
pub fn campaign(&mut self) -> Result<(), RaftError> {
self.raw_node.campaign()
}
pub fn step(&mut self, msg: raft::Message) -> Result<(), RaftError> {
if msg.to != self.raft_id() {
return Ok(());
}
// TODO check it's not a MsgPropose with op::PersistInstance.
// TODO check it's not a MsgPropose with ConfChange.
self.raw_node.step(msg)
}
pub fn tick(&mut self, n_times: u32) {
for _ in 0..n_times {
self.raw_node.tick();
}
}
/// Processes the [`join::Request`] request and appends necessary entries
/// to the raft log (if successful).
///
/// Returns an error if the callee node isn't a Raft leader.
///
/// **This function doesn't yield**
pub fn process_join_request_async(
&mut self,
req: join::Request,
) -> traft::Result<(Notify, Notify, HashSet<Address>)> {
let topology = self.topology_mut()?;
let (instance, address, replication_addresses) = topology
.join(
req.instance_id,
req.replicaset_id,
req.advertise_address,
req.failure_domain,
)
.map_err(RaftError::ConfChangeError)?;
let peer_address = traft::PeerAddress {
raft_id: instance.raft_id,
address,
};
let op_addr = Dml::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail");
let op_instance = PersistInstance::new(instance);
// Important! Calling `raw_node.propose()` may result in
// `ProposalDropped` error, but the topology has already been
// modified. The correct handling of this case should be the
// following.
//
// The `topology_cache` should be preserved. It won't be fully
// consistent anymore, but that's bearable. (TODO: examine how
// the particular requests are handled). At least it doesn't
// differ much from the case of the entry being overridden due
// to a re-election.
//
// On the other hand, dropping the topology_cache may be much
// more harmful. Loss of the uncommitted entries could result in
// assigning the same `raft_id` to two different nodes.
Ok((
self.propose_async(op_addr)?,
self.propose_async(op_instance)?,
replication_addresses,
))
}
/// Processes the [`update_instance::Request`] request and appends [`Op::PersistInstance`]
/// entry to the raft log (if successful).
///
/// Returns an error if the callee node isn't a Raft leader.
///
/// **This function doesn't yield**
pub fn process_update_instance_request_async(
&mut self,
req: update_instance::Request,
) -> traft::Result<Notify> {
let topology = self.topology_mut()?;
let instance = topology
.update_instance(req)
.map_err(RaftError::ConfChangeError)?;
// Important! Calling `raw_node.propose()` may result in
// `ProposalDropped` error, but the topology has already been
// modified. The correct handling of this case should be the
// following.
//
// The `topology_cache` should be preserved. It won't be fully
// consistent anymore, but that's bearable. (TODO: examine how
// the particular requests are handled). At least it doesn't
// differ much from the case of the entry being overridden due
// to a re-election.
//
// On the other hand, dropping the topology_cache may be much
// more harmful. Loss of the uncommitted entries could result in
// assigning the same `raft_id` to two different nodes.
Ok(self.propose_async(PersistInstance::new(instance))?)
}
fn propose_conf_change_async(
&mut self,
term: RaftTerm,
conf_change: raft::ConfChangeV2,
) -> Result<oneshot::Receiver<Result<(), RaftError>>, RaftError> {
// In some states proposing a ConfChange is impossible.
// Check if there's a reason to reject it.
//
// Checking leadership is only needed for correct latch management.
// It doesn't affect raft correctness. Checking that the instance is
// a leader makes sure the proposed `ConfChange` is appended to the
// raft log immediately instead of being sent as a `MsgPropose` over
// the network.
if self.raw_node.raft.state != RaftStateRole::Leader {
return Err(RaftError::ConfChangeError("not a leader".into()));
}
if term != self.raw_node.raft.term {
return Err(RaftError::ConfChangeError("raft term mismatch".into()));
}
// Without this check the node would silently ignore the conf change.
// See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026
if self.raw_node.raft.has_pending_conf() {
return Err(RaftError::ConfChangeError(
"already has pending confchange".into(),
));
}
let prev_index = self.raw_node.raft.raft_log.last_index();
self.raw_node.propose_conf_change(vec![], conf_change)?;
// Ensure the ConfChange was actually appended to the log.
// Otherwise it's a problem: the current instance isn't actually a
// leader (which should be impossible in theory, but we're not sure
// in practice) and has sent the message over the raft network. That
// may lead to an inconsistency.
let last_index = self.raw_node.raft.raft_log.last_index();
assert_eq!(last_index, prev_index + 1);
if !self.joint_state_latch.is_empty() {
warn_or_panic!("joint state latch is locked");
}
let (tx, rx) = oneshot::channel();
self.joint_state_latch.insert(last_index, tx);
event::broadcast(Event::JointStateEnter);
Ok(rx)
}
/// Applies committed raft entries, starting a transaction for each one.
///
/// If applying an entry requires a `wait_lsn` (i.e. syncing with the
/// replication master), the loop blocks and then retries the same entry.
fn handle_committed_entries(
&mut self,
entries: &[raft::Entry],
wake_governor: &mut bool,
expelled: &mut bool,
storage_changes: &mut StorageChanges,
) -> traft::Result<()> {
let mut entries = entries.iter().peekable();
while let Some(&entry) = entries.peek() {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
tlog!(Error, "abnormal entry: {e}"; "entry" => ?entry);
// Skip the broken entry, otherwise `peek()` would return it forever.
let _ = entries.next();
continue;
}
};
let mut wait_lsn = false;
start_transaction(|| -> tarantool::Result<()> {
let entry_index = entry.index;
match entry.entry_type {
raft::EntryType::EntryNormal => {
wait_lsn = self.handle_committed_normal_entry(
entry,
wake_governor,
expelled,
storage_changes,
);
if wait_lsn {
return Ok(());
}
}
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
self.handle_committed_conf_change(entry)
}
}
let res = self.raft_storage.persist_applied(entry_index);
event::broadcast(Event::EntryApplied);
if let Err(e) = res {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => entry_index
);
}
Ok(())
})?;
if wait_lsn {
// TODO: this shouldn't ever happen for a raft leader,
// but what if it does?
// TODO: what if we get elected leader while blocked in wait_lsn?
if let Err(e) = self.wait_lsn() {
let timeout = MainLoop::TICK;
tlog!(
Warning,
"failed syncing with replication master: {e}, retrying in {:?}...",
timeout
);
fiber::sleep(timeout);
}
continue;
}
// Actually advance the iterator.
let _ = entries.next();
}
Ok(())
}
/// Is called during a transaction
///
/// Returns `true` if wait_lsn is needed in `advance`.
fn handle_committed_normal_entry(
&mut self,
entry: traft::Entry,
wake_governor: &mut bool,
expelled: &mut bool,
storage_changes: &mut StorageChanges,
) -> bool {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let lc = entry.lc();
let index = entry.index;
let op = entry.into_op().unwrap_or(Op::Nop);
tlog!(Debug, "applying entry: {op}"; "index" => index);
match &op {
Op::PersistInstance(PersistInstance(instance)) => {
*wake_governor = true;
storage_changes.insert(ClusterwideSpace::Instance.into());
if has_grades!(instance, Expelled -> *) && instance.raft_id == self.raft_id() {
// cannot exit during a transaction
*expelled = true;
}
}
Op::Dml(op) => {
let space = op.space();
if space == &*ClusterwideSpace::Property || space == &*ClusterwideSpace::Replicaset
{
*wake_governor = true;
}
storage_changes.insert(space.into());
}
Op::DdlPrepare { .. } => {
*wake_governor = true;
}
_ => {}
}
let storage_properties = &self.storage.properties;
// apply the operation
let mut result = Box::new(()) as Box<dyn AnyWithTypeName>;
match op {
Op::Nop => {}
Op::PersistInstance(op) => {
let instance = op.0;
self.storage.instances.put(&instance).unwrap();
result = instance as _;
}
Op::Dml(op) => {
let res = match &op {
Dml::Insert { space, tuple } => self.storage.insert(space, tuple).map(Some),
Dml::Replace { space, tuple } => self.storage.replace(space, tuple).map(Some),
Dml::Update { space, key, ops } => self.storage.update(space, key, ops),
Dml::Delete { space, key } => self.storage.delete(space, key),
};
result = Box::new(res) as _;
}
Op::DdlPrepare {
ddl,
schema_version,
} => {
self.apply_op_ddl_prepare(ddl, schema_version)
.expect("storage error");
}
Op::DdlCommit => {
let current_version_in_tarantool = pico_schema_version().expect("storage error");
let pending_version = storage_properties
.pending_schema_version()
.expect("storage error")
.expect("granted we don't mess up log compaction, this should not be None");
// This instance is catching up to the cluster and must sync
// with the replication master on its own.
if current_version_in_tarantool < pending_version {
let is_master = self.is_replicaset_master().expect("storage_error");
if !is_master {
return true; // wait_lsn
}
}
// Update pico metadata.
let ddl = storage_properties
.pending_schema_change()
.expect("storage error")
.expect("granted we don't mess up log compaction, this should not be None");
match ddl {
Ddl::CreateSpace { id, .. } => {
self.storage
.spaces
.update_operable(id, true)
.expect("storage error");
self.storage
.indexes
.update_operable(id, 0, true)
.expect("storage error");
// For now we just assume that during space creation an index with
// id 1 exists if and only if it is a bucket_id index.
let res = self.storage.indexes.update_operable(id, 1, true);
// TODO: maybe we should first check if this index exists, or
// consult the space definition to decide whether this is needed,
// but for now we just ignore the "no such index" error.
let _ = res;
}
_ => {
todo!()
}
}
// Update tarantool metadata.
// This instance is catching up to the cluster and is a
// replication master, so it must apply the schema change on its
// own.
// FIXME: copy-pasted from above
if current_version_in_tarantool < pending_version {
let is_master = self.is_replicaset_master().expect("storage_error");
if is_master {
let resp =
ddl_apply::apply_schema_change(&self.storage, &ddl, pending_version)
.expect("storage error");
match resp {
ddl_apply::Response::Abort { reason } => {
// There's no risk to brick the cluster at this point because
// the governor would have made sure the ddl applies on all
// active (at the time) instances. This instance is just catching
// up to the cluster and this failure will be at startup time.
tlog!(Critical, "failed applying already committed ddl operation: {reason}";
"ddl" => ?ddl,
);
// TODO: add support to mitigate these failures
panic!("abort of a committed operation");
}
ddl_apply::Response::Ok => {}
}
}
}
storage_properties
.delete(PropertyName::PendingSchemaChange)
.expect("storage error");
storage_properties
.delete(PropertyName::PendingSchemaVersion)
.expect("storage error");
storage_properties
.put(PropertyName::CurrentSchemaVersion, &pending_version)
.expect("storage error");
}
Op::DdlAbort => {
let current_version_in_tarantool = pico_schema_version().expect("storage error");
let pending_version: u64 = storage_properties
.pending_schema_version()
.expect("storage error")
.expect("granted we don't mess up log compaction, this should not be None");
// This condition means that schema versions must always increase,
// even after a DdlAbort.
if current_version_in_tarantool == pending_version {
let is_master = self.is_replicaset_master().expect("storage_error");
if !is_master {
// wait_lsn
return true;
}
}
// Update pico metadata.
let ddl = storage_properties
.pending_schema_change()
.expect("storage error")
.expect("granted we don't mess up log compaction, this should not be None");
match ddl {
Ddl::CreateSpace { id, .. } => {
self.storage.indexes.delete(id, 0).expect("storage error");
self.storage.spaces.delete(id).expect("storage error");
}
_ => {
todo!()
}
}
// Update tarantool metadata.
// FIXME: copy-pasted from above
if current_version_in_tarantool == pending_version {
let is_master = self.is_replicaset_master().expect("storage_error");
if is_master {
let current_version = storage_properties
.current_schema_version()
.expect("storage error");
ddl_abort_on_master(&ddl, current_version).expect("storage error");
}
}
storage_properties
.delete(PropertyName::PendingSchemaChange)
.expect("storage error");
storage_properties
.delete(PropertyName::PendingSchemaVersion)
.expect("storage error");
}
}
if let Some(lc) = &lc {
if let Some(notify) = self.notifications.remove(lc) {
notify.notify_ok_any(result);
}
}
if let Some(notify) = self.joint_state_latch.take_or_keep(&index) {
// It was expected to be a ConfChange entry, but it's a
// normal one. Raft must have overridden it, or there was
// a re-election.
let e = RaftError::ConfChangeError("rolled back".into());
let _ = notify.send(Err(e));
event::broadcast(Event::JointStateDrop);
}
false
}
fn apply_op_ddl_prepare(&self, ddl: Ddl, schema_version: u64) -> traft::Result<()> {
debug_assert!(unsafe { tarantool::ffi::tarantool::box_txn() });
match ddl.clone() {
Ddl::CreateSpace {
id,
name,
mut format,
mut primary_key,
distribution,
} => {
use ::tarantool::util::NumOrStr::*;
let mut last_pk_part_index = 0;
for pk_part in &mut primary_key {
let (index, field) = match &pk_part.field {
Num(index) => {
if *index as usize >= format.len() {
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(
Warning,
"invalid primary key part: field index {index} is out of bound"
);
continue;
}
(*index, &format[*index as usize])
}
Str(name) => {
let field_index = format.iter().zip(0..).find(|(f, _)| f.name == *name);
let Some((field, index)) = field_index else {
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(Warning, "invalid primary key part: field '{name}' not found");
continue;
};
// We store all index parts as field indexes.
pk_part.field = Num(index);
(index, field)
}
};
let Some(field_type) =
crate::schema::try_space_field_type_to_index_field_type(field.field_type) else
{
// Ddl prepare operations should be verified before being proposed,
// so this shouldn't ever happen. But ignoring this is safe anyway,
// because proc_apply_schema_change will catch the error and ddl will be aborted.
tlog!(Warning, "invalid primary key part: field type {} cannot be part of an index", field.field_type);
continue;
};
// We overwrite the one provided in the request because there's no
// reason for it to be there; we know the type right here.
pk_part.r#type = Some(field_type);
pk_part.is_nullable = Some(field.is_nullable);
last_pk_part_index = last_pk_part_index.max(index);
}
let primary_key_def = IndexDef {
id: 0,
name: "primary_key".into(),
space_id: id,
schema_version,
parts: primary_key,
operable: false,
// TODO: support other cases
unique: true,
local: true,
};
let res = self.storage.indexes.insert(&primary_key_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(
Warning,
"failed creating index '{}': {e}",
primary_key_def.name
);
}
match distribution {
Distribution::Global => {
// Nothing else is needed
}
Distribution::ShardedByField { .. } => {
todo!()
}
Distribution::ShardedImplicitly { .. } => {
// TODO: if the primary key is not the first field, or if
// there's some space between key parts, we want bucket_id to
// go closer to the beginning of the tuple, but this will
// require updating primary key part indexes, so somebody
// should do that at some point.
let bucket_id_index = last_pk_part_index + 1;
format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());
let bucket_id_def = IndexDef {
id: 1,
name: "bucket_id".into(),
space_id: id,
schema_version,
parts: vec![Part::field(bucket_id_index)
.field_type(IFT::Unsigned)
.is_nullable(false)],
operable: false,
unique: false,
// TODO: support other cases
local: true,
};
let res = self.storage.indexes.insert(&bucket_id_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(
Warning,
"failed creating index '{}': {e}",
bucket_id_def.name
);
}
}
}
let space_def = SpaceDef {
id,
name,
distribution,
schema_version,
format,
operable: false,
};
let res = self.storage.spaces.insert(&space_def);
if let Err(e) = res {
// Ignore the error for now, let governor deal with it.
tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
}
}
Ddl::CreateIndex {
space_id,
index_id,
by_fields,
} => {
let _ = (space_id, index_id, by_fields);
todo!();
}
Ddl::DropSpace { id } => {
let _ = id;
todo!();
}
Ddl::DropIndex { index_id, space_id } => {
let _ = (index_id, space_id);
todo!();
}
}
self.storage
.properties
.put(PropertyName::PendingSchemaChange, &ddl)?;
self.storage
.properties
.put(PropertyName::PendingSchemaVersion, &schema_version)?;
self.storage
.properties
.put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;
Ok(())
}
/// Is called during a transaction
fn handle_committed_conf_change(&mut self, entry: traft::Entry) {
let mut latch_unlock = || {
if let Some(notify) = self.joint_state_latch.take() {
let _ = notify.send(Ok(()));
event::broadcast(Event::JointStateLeave);
}
};
// Beware: a tiny difference in type names (`V2` or not `V2`)
// makes a significant difference in `entry.data` binary layout and
// in joint state transitions.
// `ConfChangeTransition::Auto` implies that `ConfChangeV2` may be
// applied in an instant without entering the joint state.
let (is_joint, conf_state) = match entry.entry_type {
raft::EntryType::EntryConfChange => {
let mut cc = raft::ConfChange::default();
cc.merge_from_bytes(&entry.data).unwrap();
latch_unlock();
(false, self.raw_node.apply_conf_change(&cc).unwrap())
}
raft::EntryType::EntryConfChangeV2 => {
let mut cc = raft::ConfChangeV2::default();
cc.merge_from_bytes(&entry.data).unwrap();
// Unlock the latch when either of the conditions is met:
// - conf_change will leave the joint state;
// - or it will be applied without even entering one.
let leave_joint = cc.leave_joint() || cc.enter_joint().is_none();
if leave_joint {
latch_unlock();
}
// ConfChangeTransition::Auto implies that at this
// moment raft-rs will implicitly propose another empty
// conf change that represents leaving the joint state.
(!leave_joint, self.raw_node.apply_conf_change(&cc).unwrap())
}
_ => unreachable!(),
};
let raft_id = &self.raft_id();
let voters_old = self.raft_storage.voters().unwrap().unwrap_or_default();
if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) {
if is_joint {
event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok();
} else {
event::broadcast(Event::Demoted);
}
}
self.raft_storage.persist_conf_state(&conf_state).unwrap();
}
/// Is called during a transaction
fn handle_read_states(&mut self, read_states: &[raft::ReadState]) {
for rs in read_states {
if rs.request_ctx.is_empty() {
continue;
}
let ctx = crate::unwrap_ok_or!(
traft::EntryContextNormal::from_bytes(&rs.request_ctx),
Err(e) => {
tlog!(Error, "abnormal read_state: {e}"; "read_state" => ?rs);
continue;
}
);
if let Some(notify) = self.notifications.remove(&ctx.lc) {
notify.notify_ok(rs.index);
}
}
}
/// Is called during a transaction
fn handle_messages(&mut self, messages: Vec<raft::Message>) {
for msg in messages {
if let Err(e) = self.pool.send(msg) {
tlog!(Error, "{e}");
}
}
}
/// Processes a so-called "ready state" of the [`raft::RawNode`].
///
/// This includes:
/// - Sending messages to other instances (raft nodes);
/// - Applying committed entries;
/// - Persisting uncommitted entries;
/// - Persisting hard state (term, vote, commit);
/// - Notifying pending fibers;
///
/// See also:
///
/// - <https://github.com/tikv/raft-rs/blob/v0.6.0/src/raw_node.rs#L85>
/// - or better <https://github.com/etcd-io/etcd/blob/v3.5.5/raft/node.go#L49>
///
/// This function yields.
fn advance(
&mut self,
wake_governor: &mut bool,
expelled: &mut bool,
storage_changes: &mut StorageChanges,
) {
// Get the `Ready` with `RawNode::ready` interface.
if !self.raw_node.has_ready() {
return;
}
let mut ready: raft::Ready = self.raw_node.ready();
// Send out messages to the other nodes.
self.handle_messages(ready.take_messages());
// If there is a snapshot, it has to be applied first.
let snapshot = ready.snapshot();
if !snapshot.is_empty() {
if let Err(e) = start_transaction(|| -> traft::Result<()> {
let meta = snapshot.get_metadata();
self.raft_storage.handle_snapshot_metadata(meta)?;
// FIXME: apply_snapshot_data calls truncate on clusterwide
// spaces and even though they're all local spaces doing
// truncate on them is not allowed on read_only instances.
// Related issue in tarantool:
// https://github.com/tarantool/tarantool/issues/5616
let is_readonly: bool = crate::tarantool::eval("return box.info.ro")?;
if is_readonly {
crate::tarantool::eval("box.cfg { read_only = false }")?;
}
let res = self
.storage
.apply_snapshot_data(snapshot.get_data(), !is_readonly);
if is_readonly {
crate::tarantool::exec("box.cfg { read_only = true }")?;
}
#[allow(clippy::let_unit_value)]
let _ = res?;
// TODO: As long as the snapshot was sent to us in response to
// a rejected MsgAppend (which is the only possible case
// currently), we will send a MsgAppendResponse back which will
// automatically reset our status from Snapshot to Replicate.
// But when we implement support for manual snapshot requests,
// we will also have to implement sending a MsgSnapStatus
// to reset our status explicitly and avoid the leader ignoring us
// indefinitely after that point.
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
}
if let Some(ss) = ready.ss() {
if let Err(e) = self.status.send_modify(|s| {
s.leader_id = (ss.leader_id != INVALID_ID).then_some(ss.leader_id);
s.raft_state = ss.raft_state.into();
}) {
tlog!(Warning, "failed updating node status: {e}";
"leader_id" => ss.leader_id,
"raft_state" => ?ss.raft_state,
)
}
}
self.handle_read_states(ready.read_states());
// Apply committed entries.
let res = self.handle_committed_entries(
ready.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Persist uncommitted entries in the raft log.
self.raft_storage.persist_entries(ready.entries()).unwrap();
// Raft HardState changed, and we need to persist it.
if let Some(hs) = ready.hs() {
self.raft_storage.persist_hard_state(hs).unwrap();
if let Err(e) = self.status.send_modify(|s| s.term = hs.term) {
tlog!(Warning, "failed updating current term: {e}"; "term" => hs.term)
}
}
Ok(())
}) {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
// This bunch of messages is special. It must be sent only
// AFTER the HardState, Entries and Snapshot are persisted
// to the stable storage.
self.handle_messages(ready.take_persisted_messages());
// Advance the Raft.
let mut light_rd = self.raw_node.advance(ready);
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.raft_storage.persist_commit(commit).unwrap();
}
// Apply committed entries.
let res = self.handle_committed_entries(
light_rd.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
if let Err(e) = res {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
// Advance the apply index.
self.raw_node.advance_apply();
}
fn wait_lsn(&mut self) -> traft::Result<()> {
assert!(self.raw_node.raft.state != RaftStateRole::Leader);
let my_id = self.raw_node.raft.id;
let my_instance_info = self.storage.instances.get(&my_id)?;
let replicaset_id = my_instance_info.replicaset_id;
let replicaset = self.storage.replicasets.get(&replicaset_id)?;
let replicaset = replicaset.ok_or_else(|| {
Error::other(format!("replicaset info for id {replicaset_id} not found"))
})?;
if replicaset.master_id == my_instance_info.instance_id {
return Err(Error::other("wait_lsn called on replicaset master"));
}
let master = self.storage.instances.get(&replicaset.master_id)?;
let master_uuid = master.instance_uuid;
let resp = fiber::block_on(self.pool.call(&master.raft_id, &lsn::Request {})?)?;
let target_lsn = resp.lsn;
let mut current_lsn = None;
let replication: HashMap<u64, ReplicationInfo> =
crate::tarantool::eval("return box.info.replication")?;
for r in replication.values() {
if r.uuid != master_uuid {
continue;
}
current_lsn = Some(r.lsn);
break;
}
let current_lsn = unwrap_some_or!(current_lsn, {
return Err(Error::other(format!(
"replication info is unavailable for instance with uuid \"{master_uuid}\""
)));
});
if current_lsn < target_lsn {
tlog!(Info, "blocking raft loop until replication progresses";
"target_lsn" => target_lsn,
"current_lsn" => current_lsn,
);
fiber::sleep(MainLoop::TICK * 4);
}
return Ok(());
#[derive(tlua::LuaRead)]
struct ReplicationInfo {
lsn: u64,
uuid: String,
}
}
fn is_replicaset_master(&self) -> traft::Result<bool> {
let my_raft_id = self.raw_node.raft.id;
let my_instance_info = self.storage.instances.get(&my_raft_id)?;
let replicaset_id = my_instance_info.replicaset_id;
let replicaset = self.storage.replicasets.get(&replicaset_id)?;
let res = if let Some(replicaset) = replicaset {
my_instance_info.instance_id == replicaset.master_id
} else {
// The replicaset wasn't initialized yet, fall back to a lua eval.
let is_ro: bool = crate::tarantool::eval("return box.info.ro")?;
!is_ro
};
Ok(res)
}
#[inline]
fn cleanup_notifications(&mut self) {
self.notifications.retain(|_, notify| !notify.is_closed());
}
/// Generates a pair of a logical clock and a notification channel.
/// The logical clock is a unique identifier suitable for tagging
/// entries in the raft log. The notification is broadcast when the
/// corresponding entry is committed.
#[inline]
fn schedule_notification(&mut self) -> (LogicalClock, Notify) {
let (tx, rx) = notification();
let lc = {
self.lc.inc();
self.lc
};
self.notifications.insert(lc, tx);
(lc, rx)
}
}
pub(crate) struct MainLoop {
_loop: Option<fiber::UnitJoinHandle<'static>>,
loop_waker: watch::Sender<()>,
stop_flag: Rc<Cell<bool>>,
}
struct MainLoopArgs {
node_impl: Rc<Mutex<NodeImpl>>,
}
struct MainLoopState {
next_tick: Instant,
loop_waker: watch::Receiver<()>,
stop_flag: Rc<Cell<bool>>,
watchers: Rc<Mutex<StorageWatchers>>,
}
impl MainLoop {
pub const TICK: Duration = Duration::from_millis(100);
fn start(node_impl: Rc<Mutex<NodeImpl>>, watchers: Rc<Mutex<StorageWatchers>>) -> Self {
let (loop_waker_tx, loop_waker_rx) = watch::channel(());
let stop_flag: Rc<Cell<bool>> = Default::default();
let args = MainLoopArgs { node_impl };
let initial_state = MainLoopState {
next_tick: Instant::now(),
loop_waker: loop_waker_rx,
stop_flag: stop_flag.clone(),
watchers,
};
Self {
// implicit yield
_loop: loop_start!("raft_main_loop", Self::iter_fn, args, initial_state),
loop_waker: loop_waker_tx,
stop_flag,
}
}
pub fn wakeup(&self) {
let _ = self.loop_waker.send(());
}
async fn iter_fn(args: &MainLoopArgs, state: &mut MainLoopState) -> FlowControl {
let _ = state.loop_waker.changed().timeout(Self::TICK).await;
if state.stop_flag.take() {
return FlowControl::Break;
}
let mut node_impl = args.node_impl.lock(); // yields
if state.stop_flag.take() {
return FlowControl::Break;
}
node_impl.cleanup_notifications();
let now = Instant::now();
if now > state.next_tick {
state.next_tick = instant_saturating_add(now, Self::TICK);
node_impl.raw_node.tick();
}
let mut wake_governor = false;
let mut expelled = false;
let mut storage_changes = StorageChanges::new();
node_impl.advance(&mut wake_governor, &mut expelled, &mut storage_changes); // yields
drop(node_impl);
if state.stop_flag.take() {
return FlowControl::Break;
}
{
// node_impl lock must be dropped before this to avoid deadlocking
let mut watchers = state.watchers.lock();
for index in storage_changes {
if let Some(tx) = watchers.get(&index) {
let res = tx.send(());
if res.is_err() {
watchers.remove(&index);
}
}
}
}
if expelled {
crate::tarantool::exit(0);
}
if wake_governor {
if let Err(e) = async { global()?.governor_loop.wakeup() }.await {
tlog!(Warning, "failed waking up governor: {e}");
}
}
FlowControl::Continue
}
}
impl Drop for MainLoop {
fn drop(&mut self) {
self.stop_flag.set(true);
let _ = self.loop_waker.send(());
self._loop.take().unwrap().join(); // yields
}
}
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
assert!(
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::new(node));
}
}
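/// Returns a reference to the global raft [`Node`], if it has already been
/// initialized via [`set_global`].
///
/// A minimal usage sketch (not compiled):
///
/// ```ignore
/// let node = global()?;
/// let status = node.status();
/// ```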
pub fn global() -> traft::Result<&'static Node> {
// An uninitialized raft node is a regular case. This may happen
// while the instance is executing the `start_discover()` function:
// it has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
#[proc(packed_args)]
fn proc_raft_interact(pbs: Vec<traft::MessagePb>) -> traft::Result<()> {
let node = global()?;
for pb in pbs {
node.step_and_yield(raft::Message::try_from(pb).map_err(Error::other)?);
}
Ok(())
}