Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • core/picodata
1 result
Show changes
Commits on Source (7)
Showing with 910 additions and 799 deletions
......@@ -16,6 +16,7 @@ pub trait AppError: Error {}
impl<T> AppError for T where T: Error {}
#[allow(dead_code)]
#[allow(clippy::upper_case_acronyms)]
enum DDL {
CreateSpace {
name: &'static str,
......
......@@ -7,8 +7,9 @@ use tarantool::log::SayLevel;
use tarantool::tlua;
use thiserror::Error;
use crate::failure_domain::FailureDomain;
use crate::instance::InstanceId;
use crate::replicaset::ReplicasetId;
use crate::traft::{FailureDomain, InstanceId};
use crate::util::Uppercase;
#[derive(Debug, Parser)]
......
use crate::stringify_debug;
use crate::util::Uppercase;
use std::collections::HashMap;
////////////////////////////////////////////////////////////////////////////////
/// Failure domains of a given instance.
#[derive(Default, PartialEq, Eq, Clone, serde::Deserialize, serde::Serialize)]
pub struct FailureDomain {
#[serde(flatten)]
data: HashMap<Uppercase, Uppercase>,
}
impl FailureDomain {
pub fn contains_name(&self, name: &Uppercase) -> bool {
self.data.contains_key(name)
}
pub fn names(&self) -> std::collections::hash_map::Keys<Uppercase, Uppercase> {
self.data.keys()
}
/// Empty `FailureDomain` doesn't intersect with any other `FailureDomain`
/// even with another empty one.
pub fn intersects(&self, other: &Self) -> bool {
for (name, value) in &self.data {
match other.data.get(name) {
Some(other_value) if value == other_value => {
return true;
}
_ => {}
}
}
false
}
}
impl std::fmt::Display for FailureDomain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
f.write_str("{")?;
let mut iter = self.data.iter();
if let Some((k, v)) = iter.next() {
write!(f, "{k}: {v}")?;
for (k, v) in iter {
write!(f, ", {k}: {v}")?;
}
}
f.write_str("}")?;
Ok(())
}
}
impl std::fmt::Debug for FailureDomain {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let mut ds = f.debug_struct(stringify_debug!(FailureDomain));
for (name, value) in &self.data {
ds.field(name, &**value);
}
ds.finish()
}
}
impl<I, K, V> From<I> for FailureDomain
where
I: IntoIterator<Item = (K, V)>,
Uppercase: From<K>,
Uppercase: From<V>,
{
fn from(data: I) -> Self {
Self {
data: data
.into_iter()
.map(|(k, v)| (Uppercase::from(k), Uppercase::from(v)))
.collect(),
}
}
}
impl<'a> IntoIterator for &'a FailureDomain {
type IntoIter = <&'a HashMap<Uppercase, Uppercase> as IntoIterator>::IntoIter;
type Item = <&'a HashMap<Uppercase, Uppercase> as IntoIterator>::Item;
fn into_iter(self) -> Self::IntoIter {
self.data.iter()
}
}
......@@ -4,10 +4,10 @@ use ::raft::prelude::ConfChangeType::*;
use std::collections::BTreeMap;
use std::collections::BTreeSet;
use crate::traft::CurrentGradeVariant;
use crate::traft::Instance;
use crate::has_grades;
use crate::instance::grade::TargetGradeVariant;
use crate::instance::Instance;
use crate::traft::RaftId;
use crate::traft::TargetGradeVariant;
struct RaftConf<'a> {
all: BTreeMap<RaftId, &'a Instance>,
......@@ -59,10 +59,7 @@ pub(crate) fn raft_conf_change(
};
let mut changes: Vec<raft::ConfChangeSingle> = vec![];
let not_expelled = |instance: &&Instance| !instance.is_expelled();
let target_online = |instance: &&Instance| instance.target_grade == TargetGradeVariant::Online;
let current_online =
|instance: &&Instance| instance.current_grade == CurrentGradeVariant::Online;
let not_expelled = |instance: &&Instance| has_grades!(instance, * -> not Expelled);
let cluster_size = instances.iter().filter(not_expelled).count();
let voters_needed = match cluster_size {
......@@ -92,7 +89,7 @@ pub(crate) fn raft_conf_change(
// A voter goes offline. Replace it with
// another online instance if possible.
let Some(replacement) = instances.iter().find(|instance| {
instance.has_grades(CurrentGradeVariant::Online, TargetGradeVariant::Online)
has_grades!(instance, Online -> Online)
&& !raft_conf.voters.contains(&instance.raft_id)
}) else { continue };
......@@ -138,8 +135,7 @@ pub(crate) fn raft_conf_change(
// Promote more voters
for instance in instances
.iter()
.filter(target_online)
.filter(current_online)
.filter(|instance| has_grades!(instance, Online -> Online))
{
if raft_conf.voters.len() >= voters_needed {
break;
......@@ -176,13 +172,12 @@ pub(crate) fn raft_conf_change(
#[cfg(test)]
mod tests {
use super::*;
use ::raft::prelude as raft;
use crate::traft::CurrentGradeVariant;
use crate::traft::Grade;
use crate::traft::Instance;
use crate::instance::grade::{CurrentGradeVariant, Grade};
use crate::traft::RaftId;
use crate::traft::TargetGradeVariant;
macro_rules! p {
(
......
......@@ -6,6 +6,7 @@ use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::watch;
use crate::event::{self, Event};
use crate::instance::Instance;
use crate::r#loop::FlowControl::{self, Continue};
use crate::storage::Clusterwide;
use crate::storage::ToEntryIter as _;
......@@ -16,7 +17,6 @@ use crate::traft::node::global;
use crate::traft::node::Status;
use crate::traft::raft_storage::RaftSpaceAccess;
use crate::traft::rpc::sync;
use crate::traft::Instance;
use crate::traft::Result;
use crate::unwrap_ok_or;
......
use crate::has_grades;
use crate::instance::grade::CurrentGrade;
use crate::instance::{Instance, InstanceId};
use crate::replicaset::weight;
use crate::replicaset::{Replicaset, ReplicasetId};
use crate::storage::{ClusterwideSpace, PropertyName};
use crate::tlog;
use crate::traft::op::Dml;
use crate::traft::rpc;
use crate::traft::rpc::{replication, sharding, sync, update_instance};
use crate::traft::OpDML;
use crate::traft::Result;
use crate::traft::{CurrentGrade, CurrentGradeVariant, TargetGradeVariant};
use crate::traft::{Instance, InstanceId};
use crate::traft::{RaftId, RaftIndex, RaftTerm};
use ::tarantool::space::UpdateOps;
use std::collections::HashMap;
......@@ -43,16 +44,10 @@ pub(super) fn action_plan<'i>(
// downgrading
let to_downgrade = instances
.iter()
.filter(|instance| instance.current_grade != CurrentGradeVariant::Offline)
// TODO: process them all, not just the first one
.find(|instance| {
let (target, current) = (
instance.target_grade.variant,
instance.current_grade.variant,
);
matches!(target, TargetGradeVariant::Offline)
|| !matches!(current, CurrentGradeVariant::Expelled)
&& matches!(target, TargetGradeVariant::Expelled)
has_grades!(instance, not Offline -> Offline)
|| has_grades!(instance, not Expelled -> Expelled)
});
if let Some(Instance {
raft_id,
......@@ -93,7 +88,7 @@ pub(super) fn action_plan<'i>(
};
let mut ops = UpdateOps::new();
ops.assign("master_id", &to.instance_id)?;
let op = OpDML::update(ClusterwideSpace::Replicaset, &[&to.replicaset_id], ops)?;
let op = Dml::update(ClusterwideSpace::Replicaset, &[&to.replicaset_id], ops)?;
return Ok(TransferMastership { to, rpc, op }.into());
} else {
tlog!(Warning, "replicaset master is going offline and no substitution is found";
......@@ -108,8 +103,8 @@ pub(super) fn action_plan<'i>(
// and update instance's CurrentGrade afterwards
let targets = maybe_responding(instances)
.filter(|instance| {
instance.current_grade == CurrentGradeVariant::ShardingInitialized
|| instance.current_grade == CurrentGradeVariant::Online
has_grades!(instance, ShardingInitialized -> *)
|| has_grades!(instance, Online -> *)
})
.map(|instance| &instance.instance_id)
.collect();
......@@ -125,10 +120,9 @@ pub(super) fn action_plan<'i>(
////////////////////////////////////////////////////////////////////////////
// raft sync
let to_sync = instances.iter().find(|instance| {
instance.has_grades(CurrentGradeVariant::Offline, TargetGradeVariant::Online)
|| instance.is_reincarnated()
});
let to_sync = instances
.iter()
.find(|instance| has_grades!(instance, Offline -> Online) || instance.is_reincarnated());
if let Some(Instance {
instance_id,
target_grade,
......@@ -149,9 +143,7 @@ pub(super) fn action_plan<'i>(
// create new replicaset
let to_create_replicaset = instances
.iter()
.filter(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
})
.filter(|instance| has_grades!(instance, RaftSynced -> Online))
.find(|instance| replicasets.get(&instance.replicaset_id).is_none());
if let Some(Instance {
instance_id: master_id,
......@@ -165,7 +157,7 @@ pub(super) fn action_plan<'i>(
commit,
timeout: Loop::SYNC_TIMEOUT,
};
let op = OpDML::insert(
let op = Dml::insert(
ClusterwideSpace::Replicaset,
&Replicaset {
replicaset_id: replicaset_id.clone(),
......@@ -189,9 +181,7 @@ pub(super) fn action_plan<'i>(
.iter()
// TODO: find all such instances in a given replicaset,
// not just the first one
.find(|instance| {
instance.has_grades(CurrentGradeVariant::RaftSynced, TargetGradeVariant::Online)
});
.find(|instance| has_grades!(instance, RaftSynced -> Online));
if let Some(Instance {
instance_id,
replicaset_id,
......@@ -218,7 +208,7 @@ pub(super) fn action_plan<'i>(
// init sharding
let to_shard = instances
.iter()
.filter(|i| i.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online))
.filter(|i| has_grades!(i, Replicated -> Online))
.find(|i| {
vshard_bootstrapped
|| replicasets
......@@ -257,7 +247,7 @@ pub(super) fn action_plan<'i>(
commit,
timeout: Loop::SYNC_TIMEOUT,
};
let op = OpDML::replace(
let op = Dml::replace(
ClusterwideSpace::Property,
&(PropertyName::VshardBootstrapped, true),
)?;
......@@ -279,7 +269,7 @@ pub(super) fn action_plan<'i>(
weight::State::UpToDate
};
uops.assign(weight::State::PATH, state)?;
let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
let op = Dml::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
return Ok(ProposeWeightChanges { op }.into());
}
......@@ -287,7 +277,7 @@ pub(super) fn action_plan<'i>(
// skip sharding
let to_online = instances
.iter()
.find(|i| i.has_grades(CurrentGradeVariant::Replicated, TargetGradeVariant::Online));
.find(|i| has_grades!(i, Replicated -> Online));
if let Some(Instance {
instance_id,
target_grade,
......@@ -322,7 +312,7 @@ pub(super) fn action_plan<'i>(
for replicaset_id in to_update_weights {
let mut uops = UpdateOps::new();
uops.assign(weight::State::PATH, weight::State::UpToDate)?;
let op = OpDML::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
let op = Dml::update(ClusterwideSpace::Replicaset, &[replicaset_id], uops)?;
ops.push(op);
}
return Ok(UpdateWeights { targets, rpc, ops }.into());
......@@ -330,12 +320,9 @@ pub(super) fn action_plan<'i>(
////////////////////////////////////////////////////////////////////////////
// to online
let to_online = instances.iter().find(|instance| {
instance.has_grades(
CurrentGradeVariant::ShardingInitialized,
TargetGradeVariant::Online,
)
});
let to_online = instances
.iter()
.find(|instance| has_grades!(instance, ShardingInitialized -> Online));
if let Some(Instance {
instance_id,
target_grade,
......@@ -360,7 +347,7 @@ pub(super) fn action_plan<'i>(
};
let mut ops = UpdateOps::new();
ops.assign("current_schema_version", migration_id)?;
let op = OpDML::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?;
let op = Dml::update(ClusterwideSpace::Replicaset, &[&target.replicaset_id], ops)?;
return Ok(ApplyMigration { target, rpc, op }.into());
}
......@@ -413,7 +400,7 @@ pub mod stage {
pub struct TransferMastership<'i> {
pub to: &'i Instance,
pub rpc: replication::promote::Request,
pub op: OpDML,
pub op: Dml,
}
pub struct ReconfigureShardingAndDowngrade<'i> {
......@@ -432,7 +419,7 @@ pub mod stage {
pub master_id: &'i InstanceId,
pub replicaset_id: &'i ReplicasetId,
pub rpc: replication::promote::Request,
pub op: OpDML,
pub op: Dml,
}
pub struct Replication<'i> {
......@@ -454,17 +441,17 @@ pub mod stage {
pub struct ShardingBoot<'i> {
pub target: &'i InstanceId,
pub rpc: sharding::bootstrap::Request,
pub op: OpDML,
pub op: Dml,
}
pub struct ProposeWeightChanges {
pub op: OpDML,
pub op: Dml,
}
pub struct UpdateWeights<'i> {
pub targets: Vec<&'i InstanceId>,
pub rpc: sharding::Request,
pub ops: Vec<OpDML>,
pub ops: Vec<Dml>,
}
pub struct ToOnline {
......@@ -474,7 +461,7 @@ pub mod stage {
pub struct ApplyMigration<'i> {
pub target: &'i Replicaset,
pub rpc: rpc::migration::apply::Request,
pub op: OpDML,
pub op: Dml,
}
}
}
......
use super::failure_domain::FailureDomain;
use super::replicaset::ReplicasetId;
use crate::has_grades;
use crate::traft::RaftId;
use crate::util::Transition;
use ::serde::{Deserialize, Serialize};
use ::tarantool::tlua;
use ::tarantool::tuple::Encode;
use grade::{CurrentGrade, TargetGrade};
pub mod grade;
crate::define_string_newtype! {
/// Unique id of a cluster instance.
///
/// This is a new-type style wrapper around String,
/// to distinguish it from other strings.
pub struct InstanceId(pub String);
}
////////////////////////////////////////////////////////////////////////////////
/// Serializable struct representing a member of the raft group.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct Instance {
/// Instances are identified by name.
pub instance_id: InstanceId,
pub instance_uuid: String,
/// Used for identifying raft nodes.
/// Must be unique in the raft group.
pub raft_id: RaftId,
/// Name of a replicaset the instance belongs to.
pub replicaset_id: ReplicasetId,
pub replicaset_uuid: String,
/// The cluster's mind about actual state of this instance's activity.
pub current_grade: CurrentGrade,
/// The desired state of this instance
pub target_grade: TargetGrade,
/// Instance failure domains. Instances with overlapping failure domains
/// must not be in the same replicaset.
// TODO: raft_group space is kinda bloated, maybe we should store some data
// in different spaces/not deserialize the whole tuple every time?
pub failure_domain: FailureDomain,
}
impl Encode for Instance {}
impl Instance {
/// Instance has a grade that implies it may cooperate.
/// Currently this means that target_grade is neither Offline nor Expelled.
#[inline]
pub fn may_respond(&self) -> bool {
has_grades!(self, * -> not Offline) && has_grades!(self, * -> not Expelled)
}
#[inline]
pub fn is_reincarnated(&self) -> bool {
self.current_grade.incarnation < self.target_grade.incarnation
}
/// Only used for testing.
pub(crate) fn default() -> Self {
Self {
instance_id: Default::default(),
instance_uuid: Default::default(),
raft_id: Default::default(),
replicaset_id: Default::default(),
replicaset_uuid: Default::default(),
current_grade: Default::default(),
target_grade: Default::default(),
failure_domain: Default::default(),
}
}
}
impl std::fmt::Display for Instance {
#[rustfmt::skip]
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f,
"({}, {}, {}, {}, {})",
self.instance_id,
self.raft_id,
self.replicaset_id,
Transition { from: self.current_grade, to: self.target_grade },
&self.failure_domain,
)
}
}
use ::serde::{Deserialize, Serialize};
use ::tarantool::tlua;
::tarantool::define_str_enum! {
/// Activity state of an instance.
#[derive(Default)]
pub enum CurrentGradeVariant {
/// Instance has gracefully shut down or has not been started yet.
#[default]
Offline = "Offline",
/// Instance has synced by commit index.
RaftSynced = "RaftSynced",
/// Instance has configured replication.
Replicated = "Replicated",
/// Instance has configured sharding.
ShardingInitialized = "ShardingInitialized",
/// Instance is active and is handling requests.
Online = "Online",
/// Instance has permanently removed from cluster.
Expelled = "Expelled",
}
}
::tarantool::define_str_enum! {
#[derive(Default)]
pub enum TargetGradeVariant {
/// Instance should be configured up
Online = "Online",
/// Instance should be gracefully shut down
#[default]
Offline = "Offline",
/// Instance should be removed from cluster
Expelled = "Expelled",
}
}
////////////////////////////////////////////////////////////////////////////////
macro_rules! impl_constructors {
(
$(
#[variant = $variant:expr]
$(#[$meta:meta])*
$vis:vis fn $constructor:ident(incarnation: u64) -> Self;
)+
) => {
$(
$(#[$meta])*
$vis fn $constructor(incarnation: u64) -> Self {
Self { variant: $variant, incarnation }
}
)+
};
}
/// A grade (current or target) associated with an incarnation (a monotonically
/// increasing number).
#[rustfmt::skip]
#[derive(Copy, Clone, Debug, Default, Serialize, Deserialize, PartialEq, Eq)]
#[derive(tlua::LuaRead, tlua::Push, tlua::PushInto)]
pub struct Grade<V> {
pub variant: V,
pub incarnation: u64,
}
pub type TargetGrade = Grade<TargetGradeVariant>;
impl TargetGrade {
impl_constructors! {
#[variant = TargetGradeVariant::Offline]
pub fn offline(incarnation: u64) -> Self;
#[variant = TargetGradeVariant::Online]
pub fn online(incarnation: u64) -> Self;
#[variant = TargetGradeVariant::Expelled]
pub fn expelled(incarnation: u64) -> Self;
}
}
pub type CurrentGrade = Grade<CurrentGradeVariant>;
impl CurrentGrade {
impl_constructors! {
#[variant = CurrentGradeVariant::Offline]
pub fn offline(incarnation: u64) -> Self;
#[variant = CurrentGradeVariant::RaftSynced]
pub fn raft_synced(incarnation: u64) -> Self;
#[variant = CurrentGradeVariant::Replicated]
pub fn replicated(incarnation: u64) -> Self;
#[variant = CurrentGradeVariant::ShardingInitialized]
pub fn sharding_initialized(incarnation: u64) -> Self;
#[variant = CurrentGradeVariant::Online]
pub fn online(incarnation: u64) -> Self;
#[variant = CurrentGradeVariant::Expelled]
pub fn expelled(incarnation: u64) -> Self;
}
}
impl<G: PartialEq> PartialEq<G> for Grade<G> {
fn eq(&self, other: &G) -> bool {
&self.variant == other
}
}
impl<G: std::fmt::Display> std::fmt::Display for Grade<G> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let Self {
variant,
incarnation,
} = self;
write!(f, "{variant}({incarnation})")
}
}
impl PartialEq<TargetGrade> for CurrentGrade {
fn eq(&self, other: &TargetGrade) -> bool {
self.incarnation == other.incarnation && self.variant.as_str() == other.variant.as_str()
}
}
impl From<TargetGrade> for CurrentGrade {
fn from(target_grade: TargetGrade) -> Self {
let TargetGrade {
variant,
incarnation,
} = target_grade;
let variant = match variant {
TargetGradeVariant::Online => CurrentGradeVariant::Online,
TargetGradeVariant::Offline => CurrentGradeVariant::Offline,
TargetGradeVariant::Expelled => CurrentGradeVariant::Expelled,
};
Self {
variant,
incarnation,
}
}
}
/// Check if instance's current and target grades match the specified pattern.
/// # Examples:
/// ```rust
/// // Check if current_grade == `Offline`, target_grade == `Online`
/// has_grades!(instance, Offline -> Online);
///
/// // Check if current grade == `Online`, target grade can be anything
/// has_grades!(instance, Online -> *);
///
/// // Check if target grade != `Expelled`, current grade can be anything
/// has_grades!(instance, * -> not Expelled);
///
/// // This is always `true`
/// has_grades!(instance, * -> *);
///
/// // Other combinations can also work
/// ```
#[macro_export]
macro_rules! has_grades {
// Entry rule
($instance:expr, $($tail:tt)+) => {
has_grades!(@impl $instance; current[] target[] $($tail)+)
};
// Parsing current
(@impl $i:expr; current[] target[] not $($tail:tt)+) => {
has_grades!(@impl $i; current[ ! ] target[] $($tail)+)
};
(@impl $i:expr; current[] target[] * -> $($tail:tt)+) => {
has_grades!(@impl $i; current[ true ] target[] $($tail)+)
};
(@impl $i:expr; current[ $($not:tt)? ] target[] $current:ident -> $($tail:tt)+) => {
has_grades!(@impl $i;
current[
$($not)?
matches!($i.current_grade.variant, $crate::instance::grade::CurrentGradeVariant::$current)
]
target[]
$($tail)+
)
};
// Parsing target
(@impl $i:expr; current[ $($c:tt)* ] target[] not $($tail:tt)+) => {
has_grades!(@impl $i; current[ $($c)* ] target[ ! ] $($tail)+)
};
(@impl $i:expr; current[ $($c:tt)* ] target[] *) => {
has_grades!(@impl $i; current[ $($c)* ] target[ true ])
};
(@impl $i:expr; current[ $($c:tt)* ] target[ $($not:tt)? ] $target:ident) => {
has_grades!(@impl $i;
current[ $($c)* ]
target[
$($not)?
matches!($i.target_grade.variant, $crate::instance::grade::TargetGradeVariant::$target)
]
)
};
// Terminating rule
(@impl $i:expr; current[ $($c:tt)+ ] target[ $($t:tt)+ ]) => {
$($c)+ && $($t)+
};
}
////////////////////////////////////////////////////////////////////////////////
/// tests
#[cfg(test)]
mod tests {
use super::super::Instance;
use super::{CurrentGradeVariant, TargetGradeVariant};
use crate::has_grades;
#[test]
fn has_grades() {
let mut i = Instance::default();
i.current_grade.variant = CurrentGradeVariant::Online;
i.target_grade.variant = TargetGradeVariant::Offline;
assert!(has_grades!(i, * -> *));
assert!(has_grades!(i, * -> Offline));
assert!(has_grades!(i, * -> not Online));
assert!(!has_grades!(i, * -> Online));
assert!(!has_grades!(i, * -> not Offline));
assert!(has_grades!(i, Online -> *));
assert!(has_grades!(i, Online -> Offline));
assert!(has_grades!(i, Online -> not Online));
assert!(!has_grades!(i, Online -> Online));
assert!(!has_grades!(i, Online -> not Offline));
assert!(has_grades!(i, not Offline -> *));
assert!(has_grades!(i, not Offline -> Offline));
assert!(has_grades!(i, not Offline -> not Online));
assert!(!has_grades!(i, not Offline -> Online));
assert!(!has_grades!(i, not Offline -> not Offline));
assert!(!has_grades!(i, Offline -> *));
assert!(!has_grades!(i, Offline -> Offline));
assert!(!has_grades!(i, Offline -> not Online));
assert!(!has_grades!(i, Offline -> Online));
assert!(!has_grades!(i, Offline -> not Offline));
assert!(!has_grades!(i, not Online -> *));
assert!(!has_grades!(i, not Online -> Offline));
assert!(!has_grades!(i, not Online -> not Online));
assert!(!has_grades!(i, not Online -> Online));
assert!(!has_grades!(i, not Online -> not Offline));
}
}
#![allow(clippy::needless_borrow)]
use nix::sys::signal;
use nix::sys::termios::{tcgetattr, tcsetattr, SetArg::TCSADRAIN};
use nix::sys::wait::{waitpid, WaitStatus};
......@@ -21,16 +23,21 @@ use traft::RaftSpaceAccess;
use clap::StructOpt as _;
use protobuf::Message as _;
use crate::instance::grade::TargetGradeVariant;
use crate::instance::InstanceId;
use crate::tlog::set_log_level;
use crate::traft::event::Event;
use crate::traft::{event, node, InstanceId, Migration, OpDML};
use crate::traft::{LogicalClock, RaftIndex, TargetGradeVariant};
use crate::traft::op::{self, Op};
use crate::traft::{event, node, Migration};
use crate::traft::{LogicalClock, RaftIndex};
use traft::error::Error;
mod app;
mod args;
mod discovery;
mod failure_domain;
mod governor;
mod instance;
mod ipc;
mod kvcell;
mod r#loop;
......@@ -129,14 +136,13 @@ fn picolib_setup(args: &args::Run) {
luamod.set(
"raft_propose_nop",
tlua::function0(|| {
traft::node::global()?.propose_and_wait(traft::Op::Nop, Duration::from_secs(1))
traft::node::global()?.propose_and_wait(Op::Nop, Duration::from_secs(1))
}),
);
luamod.set(
"raft_propose_info",
tlua::function1(|x: String| -> traft::Result<()> {
traft::node::global()?
.propose_and_wait(traft::Op::Info { msg: x }, Duration::from_secs(1))
traft::node::global()?.propose_and_wait(Op::Info { msg: x }, Duration::from_secs(1))
}),
);
luamod.set(
......@@ -180,10 +186,7 @@ fn picolib_setup(args: &args::Run) {
|x: String, opts: Option<ProposeEvalOpts>| -> traft::Result<()> {
let timeout = opts.and_then(|opts| opts.timeout).unwrap_or(10.0);
traft::node::global()?
.propose_and_wait(
traft::OpEvalLua { code: x },
Duration::from_secs_f64(timeout),
)
.propose_and_wait(op::EvalLua { code: x }, Duration::from_secs_f64(timeout))
.and_then(|res| res.map_err(Into::into))
},
),
......@@ -191,8 +194,7 @@ fn picolib_setup(args: &args::Run) {
luamod.set(
"raft_return_one",
tlua::function1(|timeout: f64| -> traft::Result<u8> {
traft::node::global()?
.propose_and_wait(traft::OpReturnOne, Duration::from_secs_f64(timeout))
traft::node::global()?.propose_and_wait(op::ReturnOne, Duration::from_secs_f64(timeout))
}),
);
// TODO: remove this
......@@ -378,7 +380,7 @@ fn picolib_setup(args: &args::Run) {
"add_migration",
tlua::function2(|id: u64, body: String| -> traft::Result<()> {
let migration = Migration { id, body };
let op = OpDML::insert(ClusterwideSpace::Migration, &migration)?;
let op = op::Dml::insert(ClusterwideSpace::Migration, &migration)?;
node::global()?.propose_and_wait(op, Duration::MAX)??;
Ok(())
}),
......@@ -387,7 +389,7 @@ fn picolib_setup(args: &args::Run) {
luamod.set(
"push_schema_version",
tlua::function1(|id: u64| -> traft::Result<()> {
let op = OpDML::replace(
let op = op::Dml::replace(
ClusterwideSpace::Property,
&(PropertyName::DesiredSchemaVersion, id),
)?;
......@@ -412,7 +414,7 @@ fn picolib_setup(args: &args::Run) {
return Ok(Some(current_version));
}
let op = OpDML::replace(
let op = op::Dml::replace(
ClusterwideSpace::Property,
&(PropertyName::DesiredSchemaVersion, target_version),
)?;
......@@ -834,16 +836,16 @@ fn start_boot(args: &args::Run) {
};
init_entries_push_op(
traft::OpDML::insert(
op::Dml::insert(
ClusterwideSpace::Address,
&traft::PeerAddress { raft_id, address },
)
.expect("cannot fail")
.into(),
);
init_entries_push_op(traft::OpPersistInstance::new(instance).into());
init_entries_push_op(traft::op::PersistInstance::new(instance).into());
init_entries_push_op(
OpDML::insert(
op::Dml::insert(
ClusterwideSpace::Property,
&(
PropertyName::ReplicationFactor,
......@@ -854,7 +856,7 @@ fn start_boot(args: &args::Run) {
.into(),
);
init_entries_push_op(
OpDML::insert(
op::Dml::insert(
ClusterwideSpace::Property,
&(PropertyName::DesiredSchemaVersion, 0),
)
......
......@@ -2,14 +2,14 @@ use std::time::{Duration, Instant};
use ::tarantool::fiber;
use crate::has_grades;
use crate::instance::grade::TargetGradeVariant;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::node;
use crate::traft::rpc;
use crate::traft::rpc::update_instance;
use crate::traft::CurrentGradeVariant;
use crate::traft::TargetGradeVariant;
use crate::unwrap_ok_or;
pub async fn callback() {
......@@ -35,7 +35,7 @@ pub async fn callback() {
}
);
if me.current_grade == CurrentGradeVariant::Offline {
if has_grades!(me, Offline -> *) {
tlog!(Info, "graceful shutdown succeeded");
// Dirty hack. Wait a little bit more before actually
......@@ -56,7 +56,7 @@ pub async fn callback() {
let voters_alive = voters
.iter()
.filter_map(|raft_id| node.storage.instances.get(raft_id).ok())
.filter(|instance| instance.current_grade == CurrentGradeVariant::Online)
.filter(|instance| has_grades!(instance, Online -> *))
.count();
if voters_alive < quorum {
......
use crate::traft::InstanceId;
use super::instance::InstanceId;
use ::tarantool::tlua;
use ::tarantool::tuple::Encode;
......
......@@ -2,6 +2,8 @@ use ::tarantool::index::{Index, IndexIterator, IteratorType};
use ::tarantool::space::{FieldType, Space};
use ::tarantool::tuple::{DecodeOwned, ToTupleBuffer, Tuple};
use crate::failure_domain as fd;
use crate::instance::{self, grade, Instance};
use crate::replicaset::{Replicaset, ReplicasetId};
use crate::traft;
use crate::traft::error::Error;
......@@ -357,7 +359,7 @@ impl Instances {
}
#[inline]
pub fn put(&self, instance: &traft::Instance) -> tarantool::Result<()> {
pub fn put(&self, instance: &Instance) -> tarantool::Result<()> {
self.space.replace(instance)?;
Ok(())
}
......@@ -372,7 +374,7 @@ impl Instances {
/// Find a instance by `raft_id` and return a single field specified by `F`
/// (see `InstanceFieldDef` & `instance_field` module).
#[inline(always)]
pub fn get(&self, id: &impl InstanceId) -> Result<traft::Instance> {
pub fn get(&self, id: &impl InstanceId) -> Result<Instance> {
let res = id
.find_in(self)?
.decode()
......@@ -404,7 +406,7 @@ impl Instances {
}
#[inline]
pub fn all_instances(&self) -> tarantool::Result<Vec<traft::Instance>> {
pub fn all_instances(&self) -> tarantool::Result<Vec<Instance>> {
self.space
.select(IteratorType::All, &())?
.map(|tuple| tuple.decode())
......@@ -414,7 +416,7 @@ impl Instances {
pub fn replicaset_instances(
&self,
replicaset_id: &str,
) -> tarantool::Result<EntryIter<traft::Instance>> {
) -> tarantool::Result<EntryIter<Instance>> {
let iter = self
.index_replicaset_id
.select(IteratorType::Eq, &[replicaset_id])?;
......@@ -436,7 +438,7 @@ impl Instances {
}
impl ToEntryIter for Instances {
type Entry = traft::Instance;
type Entry = Instance;
#[inline(always)]
fn index_iter(&self) -> Result<IndexIterator> {
......@@ -469,7 +471,7 @@ macro_rules! define_instance_fields {
/// and it's tarantool type is
#[doc = concat!("`", stringify!($tt_ty), "`")]
///
/// [`Instance`]: crate::traft::Instance
/// [`Instance`]: crate::instance::Instance
pub struct $field;
impl InstanceFieldDef for $field {
......@@ -515,14 +517,14 @@ macro_rules! define_instance_fields {
}
define_instance_fields! {
InstanceId : traft::InstanceId = ("instance_id", FieldType::String)
InstanceId : instance::InstanceId = ("instance_id", FieldType::String)
InstanceUuid : String = ("instance_uuid", FieldType::String)
RaftId : traft::RaftId = ("raft_id", FieldType::Unsigned)
ReplicasetId : String = ("replicaset_id", FieldType::String)
ReplicasetUuid : String = ("replicaset_uuid", FieldType::String)
CurrentGrade : traft::CurrentGrade = ("current_grade", FieldType::Array)
TargetGrade : traft::TargetGrade = ("target_grade", FieldType::Array)
FailureDomain : traft::FailureDomain = ("failure_domain", FieldType::Map)
CurrentGrade : grade::CurrentGrade = ("current_grade", FieldType::Array)
TargetGrade : grade::TargetGrade = ("target_grade", FieldType::Array)
FailureDomain : fd::FailureDomain = ("failure_domain", FieldType::Map)
}
impl tarantool::tuple::TupleIndex for InstanceField {
......@@ -595,7 +597,7 @@ impl InstanceId for RaftId {
}
}
impl InstanceId for traft::InstanceId {
impl InstanceId for instance::InstanceId {
#[inline(always)]
fn find_in(&self, instances: &Instances) -> Result<Tuple> {
instances
......@@ -761,24 +763,26 @@ macro_rules! assert_err {
inventory::submit!(crate::InnerTest {
name: "test_storage_instances",
body: || {
use traft::{CurrentGradeVariant as CurrentGrade, TargetGradeVariant as TargetGrade, InstanceId};
use crate::instance::grade::{CurrentGradeVariant as CGV, TargetGradeVariant as TGV};
use crate::instance::InstanceId;
use crate::failure_domain::FailureDomain;
let storage_instances = Instances::new().unwrap();
let space_instances = storage_instances.space.clone();
let storage_peer_addresses = PeerAddresses::new().unwrap();
let space_peer_addresses = storage_peer_addresses.space.clone();
let faildom = crate::traft::FailureDomain::from([("a", "b")]);
let faildom = FailureDomain::from([("a", "b")]);
for instance in vec![
// r1
("i1", "i1-uuid", 1u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,),
("i2", "i2-uuid", 2u64, "r1", "r1-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,),
("i1", "i1-uuid", 1u64, "r1", "r1-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,),
("i2", "i2-uuid", 2u64, "r1", "r1-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,),
// r2
("i3", "i3-uuid", 3u64, "r2", "r2-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,),
("i4", "i4-uuid", 4u64, "r2", "r2-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,),
("i3", "i3-uuid", 3u64, "r2", "r2-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,),
("i4", "i4-uuid", 4u64, "r2", "r2-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,),
// r3
("i5", "i5-uuid", 5u64, "r3", "r3-uuid", (CurrentGrade::Online, 0), (TargetGrade::Online, 0), &faildom,),
("i5", "i5-uuid", 5u64, "r3", "r3-uuid", (CGV::Online, 0), (TGV::Online, 0), &faildom,),
] {
space_instances.put(&instance).unwrap();
let (_, _, raft_id, ..) = instance;
......@@ -792,10 +796,10 @@ inventory::submit!(crate::InnerTest {
);
assert_err!(
storage_instances.put(&traft::Instance {
storage_instances.put(&Instance {
raft_id: 1,
instance_id: "i99".into(),
..traft::Instance::default()
..Instance::default()
}),
format!(
concat!(
......@@ -808,10 +812,10 @@ inventory::submit!(crate::InnerTest {
" and new tuple",
r#" - ["i99", "", 1, "", "", ["{goff}", 0], ["{tgoff}", 0], {{}}]"#,
),
gon = CurrentGrade::Online,
goff = CurrentGrade::Offline,
tgon = TargetGrade::Online,
tgoff = TargetGrade::Offline,
gon = CGV::Online,
goff = CGV::Offline,
tgon = TGV::Online,
tgoff = TGV::Offline,
)
);
......
use crate::traft::InstanceId;
use crate::instance::InstanceId;
use crate::traft::{RaftId, RaftTerm};
use ::tarantool::fiber::r#async::timeout::Expired;
use ::tarantool::tlua::LuaError;
......
This diff is collapsed.
......@@ -17,14 +17,15 @@ use std::pin::Pin;
use std::rc::Rc;
use std::time::{Duration, Instant};
use crate::instance::InstanceId;
use crate::mailbox::Mailbox;
use crate::storage::{instance_field, Clusterwide, Instances, PeerAddresses};
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::rpc::Request;
use crate::traft::RaftId;
use crate::traft::Result;
use crate::traft::{InstanceId, RaftId};
use crate::unwrap_ok_or;
use crate::util::Either::{self, Left, Right};
......
......@@ -5,58 +5,54 @@
//! - handling configuration changes,
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::proc;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
use crate::governor;
use crate::has_grades;
use crate::instance::Instance;
use crate::kvcell::KVCell;
use crate::loop_start;
use crate::r#loop::FlowControl;
use crate::storage::ToEntryIter as _;
use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName};
use crate::stringify_cfunc;
use crate::traft::ContextCoercion as _;
use crate::traft::Instance;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftTerm;
use crate::traft::{OpDML, OpPersistInstance};
use crate::unwrap_some_or;
use crate::warn_or_panic;
use protobuf::Message as _;
use crate::tlog;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::notify::{notification, Notifier, Notify};
use crate::traft::op::{Dml, Op, OpResult, PersistInstance};
use crate::traft::rpc::{join, update_instance};
use crate::traft::Address;
use crate::traft::ConnectionPool;
use crate::traft::CurrentGradeVariant;
use crate::traft::ContextCoercion as _;
use crate::traft::LogicalClock;
use crate::traft::Op;
use crate::traft::OpResult;
use crate::traft::RaftId;
use crate::traft::RaftIndex;
use crate::traft::RaftSpaceAccess;
use crate::traft::RaftTerm;
use crate::traft::Topology;
use crate::unwrap_some_or;
use crate::warn_or_panic;
use ::raft::prelude as raft;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::{TarantoolError, TransactionError};
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::{oneshot, watch};
use ::tarantool::fiber::Mutex;
use ::tarantool::proc;
use ::tarantool::tlua;
use ::tarantool::transaction::start_transaction;
use protobuf::Message as _;
use std::cell::Cell;
use std::collections::{HashMap, HashSet};
use std::convert::TryFrom;
use std::rc::Rc;
use std::time::Duration;
use std::time::Instant;
type RawNode = raft::RawNode<RaftSpaceAccess>;
......@@ -188,7 +184,7 @@ impl Node {
/// Propose an operation and wait for it's result.
/// **This function yields**
pub fn propose_and_wait<T: OpResult + Into<traft::Op>>(
pub fn propose_and_wait<T: OpResult + Into<Op>>(
&self,
op: T,
timeout: Duration,
......@@ -443,7 +439,7 @@ impl NodeImpl {
#[inline]
pub fn propose_async<T>(&mut self, op: T) -> Result<Notify, RaftError>
where
T: Into<traft::Op>,
T: Into<Op>,
{
let (lc, notify) = self.schedule_notification();
let ctx = traft::EntryContextNormal::new(lc, op.into());
......@@ -494,8 +490,8 @@ impl NodeImpl {
raft_id: instance.raft_id,
address,
};
let op_addr = OpDML::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail");
let op_instance = OpPersistInstance::new(instance);
let op_addr = Dml::replace(ClusterwideSpace::Address, &peer_address).expect("can't fail");
let op_instance = PersistInstance::new(instance);
// Important! Calling `raw_node.propose()` may result in
// `ProposalDropped` error, but the topology has already been
// modified. The correct handling of this case should be the
......@@ -546,7 +542,7 @@ impl NodeImpl {
// harmful. Loss of the uncommitted entries could result in
// assigning the same `raft_id` to a two different nodes.
//
Ok(self.propose_async(OpPersistInstance::new(instance))?)
Ok(self.propose_async(PersistInstance::new(instance))?)
}
fn propose_conf_change_async(
......@@ -651,19 +647,17 @@ impl NodeImpl {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let lc = entry.lc();
let index = entry.index;
let op = entry.into_op().unwrap_or(traft::Op::Nop);
let op = entry.into_op().unwrap_or(Op::Nop);
match &op {
traft::Op::PersistInstance(OpPersistInstance(instance)) => {
Op::PersistInstance(PersistInstance(instance)) => {
*wake_governor = true;
if instance.current_grade == CurrentGradeVariant::Expelled
&& instance.raft_id == self.raft_id()
{
if has_grades!(instance, Expelled -> *) && instance.raft_id == self.raft_id() {
// cannot exit during a transaction
*expelled = true;
}
}
traft::Op::Dml(op)
Op::Dml(op)
if matches!(
op.space(),
ClusterwideSpace::Property | ClusterwideSpace::Replicaset
......
use crate::instance::Instance;
use crate::storage;
use crate::storage::ClusterwideSpace;
use crate::util::AnyWithTypeName;
use ::tarantool::tlua::LuaError;
use ::tarantool::tuple::{ToTupleBuffer, Tuple, TupleBuffer};
use serde::{Deserialize, Serialize};
////////////////////////////////////////////////////////////////////////////////
// OpResult
////////////////////////////////////////////////////////////////////////////////
pub trait OpResult {
type Result: 'static;
// FIXME: this signature makes it look like result of any operation depends
// only on what is contained within the operation which is almost never true
// And it makes it hard to do anything useful inside this function.
fn result(self) -> Self::Result;
}
////////////////////////////////////////////////////////////////////////////////
/// The operation on the raft state machine.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Op {
/// No operation.
Nop,
/// Print the message in tarantool log.
Info { msg: String },
/// Evaluate the code on every instance in cluster.
EvalLua(EvalLua),
///
ReturnOne(ReturnOne),
/// Update the given instance's entry in [`storage::Instances`].
PersistInstance(PersistInstance),
/// Cluster-wide data modification operation.
/// Should be used to manipulate the cluster-wide configuration.
Dml(Dml),
}
impl std::fmt::Display for Op {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
return match self {
Self::Nop => f.write_str("Nop"),
Self::Info { msg } => write!(f, "Info({msg:?})"),
Self::EvalLua(EvalLua { code }) => write!(f, "EvalLua({code:?})"),
Self::ReturnOne(_) => write!(f, "ReturnOne"),
Self::PersistInstance(PersistInstance(instance)) => {
write!(f, "PersistInstance{}", instance)
}
Self::Dml(Dml::Insert { space, tuple }) => {
write!(f, "Insert({space}, {})", DisplayAsJson(tuple))
}
Self::Dml(Dml::Replace { space, tuple }) => {
write!(f, "Replace({space}, {})", DisplayAsJson(tuple))
}
Self::Dml(Dml::Update { space, key, ops }) => {
let key = DisplayAsJson(key);
let ops = DisplayAsJson(&**ops);
write!(f, "Update({space}, {key}, {ops})")
}
Self::Dml(Dml::Delete { space, key }) => {
write!(f, "Delete({space}, {})", DisplayAsJson(key))
}
};
struct DisplayAsJson<T>(pub T);
impl std::fmt::Display for DisplayAsJson<&TupleBuffer> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
if let Some(data) = rmp_serde::from_slice::<serde_json::Value>(self.0.as_ref())
.ok()
.and_then(|v| serde_json::to_string(&v).ok())
{
return write!(f, "{data}");
}
write!(f, "{:?}", self.0)
}
}
impl std::fmt::Display for DisplayAsJson<&[TupleBuffer]> {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
write!(f, "[")?;
if let Some(elem) = self.0.first() {
write!(f, "{}", DisplayAsJson(elem))?;
}
for elem in self.0.iter().skip(1) {
write!(f, ", {}", DisplayAsJson(elem))?;
}
write!(f, "]")
}
}
}
}
impl Op {
pub fn on_commit(self, instances: &storage::Instances) -> Box<dyn AnyWithTypeName> {
match self {
Self::Nop => Box::new(()),
Self::Info { msg } => {
crate::tlog!(Info, "{msg}");
Box::new(())
}
Self::EvalLua(op) => Box::new(op.result()),
Self::ReturnOne(op) => Box::new(op.result()),
Self::PersistInstance(op) => {
let instance = op.result();
instances.put(&instance).unwrap();
instance
}
Self::Dml(op) => Box::new(op.result()),
}
}
}
impl OpResult for Op {
type Result = ();
fn result(self) -> Self::Result {}
}
////////////////////////////////////////////////////////////////////////////////
// ReturnOne
////////////////////////////////////////////////////////////////////////////////
impl From<ReturnOne> for Op {
fn from(op: ReturnOne) -> Op {
Op::ReturnOne(op)
}
}
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct ReturnOne;
impl OpResult for ReturnOne {
type Result = u8;
fn result(self) -> Self::Result {
1
}
}
////////////////////////////////////////////////////////////////////////////////
// EvalLua
////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct EvalLua {
pub code: String,
}
impl OpResult for EvalLua {
type Result = Result<(), LuaError>;
fn result(self) -> Self::Result {
crate::tarantool::exec(&self.code)
}
}
impl From<EvalLua> for Op {
fn from(op: EvalLua) -> Op {
Op::EvalLua(op)
}
}
////////////////////////////////////////////////////////////////////////////////
// PersistInstance
////////////////////////////////////////////////////////////////////////////////
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct PersistInstance(pub Box<Instance>);
impl PersistInstance {
pub fn new(instance: Instance) -> Self {
Self(Box::new(instance))
}
}
impl OpResult for PersistInstance {
type Result = Box<Instance>;
fn result(self) -> Self::Result {
self.0
}
}
impl From<PersistInstance> for Op {
#[inline]
fn from(op: PersistInstance) -> Op {
Op::PersistInstance(op)
}
}
////////////////////////////////////////////////////////////////////////////////
// Dml
////////////////////////////////////////////////////////////////////////////////
/// Cluster-wide data modification operation.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub enum Dml {
Insert {
space: ClusterwideSpace,
#[serde(with = "serde_bytes")]
tuple: TupleBuffer,
},
Replace {
space: ClusterwideSpace,
#[serde(with = "serde_bytes")]
tuple: TupleBuffer,
},
Update {
space: ClusterwideSpace,
#[serde(with = "serde_bytes")]
key: TupleBuffer,
#[serde(with = "vec_of_raw_byte_buf")]
ops: Vec<TupleBuffer>,
},
Delete {
space: ClusterwideSpace,
#[serde(with = "serde_bytes")]
key: TupleBuffer,
},
}
impl OpResult for Dml {
type Result = tarantool::Result<Option<Tuple>>;
fn result(self) -> Self::Result {
match self {
Self::Insert { space, tuple } => space.insert(&tuple).map(Some),
Self::Replace { space, tuple } => space.replace(&tuple).map(Some),
Self::Update { space, key, ops } => space.update(&key, &ops),
Self::Delete { space, key } => space.delete(&key),
}
}
}
impl From<Dml> for Op {
fn from(op: Dml) -> Op {
Op::Dml(op)
}
}
impl Dml {
/// Serializes `tuple` and returns an [`Dml::Insert`] in case of success.
pub fn insert(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Insert {
space,
tuple: tuple.to_tuple_buffer()?,
};
Ok(res)
}
/// Serializes `tuple` and returns an [`Dml::Replace`] in case of success.
pub fn replace(space: ClusterwideSpace, tuple: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Replace {
space,
tuple: tuple.to_tuple_buffer()?,
};
Ok(res)
}
/// Serializes `key` and returns an [`Dml::Update`] in case of success.
pub fn update(
space: ClusterwideSpace,
key: &impl ToTupleBuffer,
ops: impl Into<Vec<TupleBuffer>>,
) -> tarantool::Result<Self> {
let res = Self::Update {
space,
key: key.to_tuple_buffer()?,
ops: ops.into(),
};
Ok(res)
}
/// Serializes `key` and returns an [`Dml::Delete`] in case of success.
pub fn delete(space: ClusterwideSpace, key: &impl ToTupleBuffer) -> tarantool::Result<Self> {
let res = Self::Delete {
space,
key: key.to_tuple_buffer()?,
};
Ok(res)
}
#[rustfmt::skip]
pub fn space(&self) -> &ClusterwideSpace {
match &self {
Self::Insert { space, .. } => space,
Self::Replace { space, .. } => space,
Self::Update { space, .. } => space,
Self::Delete { space, .. } => space,
}
}
}
mod vec_of_raw_byte_buf {
use super::TupleBuffer;
use serde::de::Error as _;
use serde::ser::SerializeSeq;
use serde::{self, Deserialize, Deserializer, Serializer};
use serde_bytes::{ByteBuf, Bytes};
use std::convert::TryFrom;
pub fn serialize<S>(v: &[TupleBuffer], ser: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut seq = ser.serialize_seq(Some(v.len()))?;
for buf in v {
seq.serialize_element(Bytes::new(buf.as_ref()))?;
}
seq.end()
}
pub fn deserialize<'de, D>(de: D) -> Result<Vec<TupleBuffer>, D::Error>
where
D: Deserializer<'de>,
{
let tmp = Vec::<ByteBuf>::deserialize(de)?;
// FIXME(gmoshkin): redundant copy happens here,
// because ByteBuf and TupleBuffer are essentially the same struct,
// but there's no easy foolproof way
// to convert a Vec<ByteBuf> to Vec<TupleBuffer>
// because of borrow and drop checkers
let res: tarantool::Result<_> = tmp
.into_iter()
.map(|bb| TupleBuffer::try_from(bb.into_vec()))
.collect();
res.map_err(D::Error::custom)
}
}
use crate::traft;
use crate::instance::grade::TargetGradeVariant;
use crate::instance::InstanceId;
use crate::traft::Result;
use crate::traft::{error::Error, node, rpc::update_instance, InstanceId};
use crate::traft::{error::Error, node, rpc::update_instance};
crate::define_rpc_request! {
fn proc_expel_on_leader(req: Request) -> Result<Response> {
......@@ -23,7 +24,7 @@ crate::define_rpc_request! {
}
let req = update_instance::Request::new(req.instance_id, req.cluster_id)
.with_target_grade(traft::TargetGradeVariant::Expelled);
.with_target_grade(TargetGradeVariant::Expelled);
node.handle_update_instance_request_and_wait(req)?;
Ok(Response {})
......
use crate::failure_domain::FailureDomain;
use crate::instance::{Instance, InstanceId};
use crate::replicaset::ReplicasetId;
use crate::storage::ToEntryIter as _;
use crate::traft::{
error::Error, node, Address, FailureDomain, Instance, InstanceId, PeerAddress, Result,
};
use crate::traft::{error::Error, node, Address, PeerAddress, Result};
#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)]
pub struct OkResponse {
......
use crate::failure_domain::FailureDomain;
use crate::instance::grade::{CurrentGrade, TargetGradeVariant};
use crate::instance::InstanceId;
use crate::tlog;
use crate::traft::FailureDomain;
use crate::traft::Result;
use crate::traft::{error::Error, node, InstanceId};
use crate::traft::{CurrentGrade, TargetGradeVariant};
use crate::traft::{error::Error, node};
crate::define_rpc_request! {
fn proc_update_instance(req: Request) -> Result<Response> {
......