Compare revisions
Target project: core/picodata
Commits on Source (7)
......@@ -2667,6 +2667,7 @@ dependencies = [
"chrono",
"clap 3.2.25",
"comfy-table",
"diff",
"either",
"file_shred",
"futures",
......
......@@ -14,6 +14,7 @@ bytes = "1.8"
chrono = "0.4"
clap = { version = "3.0", features = ["derive", "env"] }
comfy-table = "7.1"
diff = "0.1"
either = "1.13"
file_shred = "1.1"
futures = "0.3"
......
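The new `diff = "0.1"` dependency backs the `DebugDiff` helper added further down in this diff. As a minimal sketch of the crate's line-level API (the same three variants the helper matches on; this snippet is illustrative and not part of the change):

fn main() {
    // diff::lines returns one diff::Result per line of the two inputs.
    for change in diff::lines("a\nb\n", "a\nc\n") {
        match change {
            diff::Result::Left(l) => println!("-{l}"),    // only in the left (old) text
            diff::Result::Both(b, _) => println!(" {b}"), // present in both texts
            diff::Result::Right(r) => println!("+{r}"),   // only in the right (new) text
        }
    }
}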
......@@ -107,6 +107,8 @@ generate:
.PHONY: lint
lint:
cargo version
cargo fmt --check
cargo check $(LOCKED) $(MAKE_JOBSERVER_ARGS)
......
......@@ -23,7 +23,10 @@ pub trait MutexLike<T> {
}
impl<T> MutexLike<T> for TMutex<T> {
type Guard<'a> = TMutexGuard<'a, T> where T: 'a;
type Guard<'a>
= TMutexGuard<'a, T>
where
T: 'a;
fn lock(&self) -> Self::Guard<'_> {
self.lock()
......@@ -31,8 +34,10 @@ impl<T> MutexLike<T> for TMutex<T> {
}
impl<T> MutexLike<T> for RefCell<T> {
type Guard<'a> = RefMut<'a, T>
where T: 'a;
type Guard<'a>
= RefMut<'a, T>
where
T: 'a;
fn lock(&self) -> Self::Guard<'_> {
self.borrow_mut()
......
......@@ -682,6 +682,14 @@ Using configuration file '{args_path}'.");
1048576
}
#[inline]
pub fn total_bucket_count() -> u64 {
// This value is not configurable at the moment, but this may change
// in the future. At that point this function will probably also want to
// accept a `&self` parameter, but for now it's not necessary.
3000
}
pub fn log_config_params(&self) {
for path in &leaf_field_paths::<PicodataConfig>() {
let value = self
......
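For reference, this accessor is picked up further down in this diff by `vshard::get_replicaset_uuid_by_bucket_id`, which now rejects out-of-range bucket ids. A minimal sketch of that check, extracted here for clarity (the `check_bucket_id` helper name is hypothetical and not part of the change):

// Hypothetical helper; mirrors the validation added in vshard.rs below.
fn check_bucket_id(bucket_id: u64) -> Result<(), Error> {
    let max_bucket_id = PicodataConfig::total_bucket_count();
    if bucket_id < 1 || bucket_id > max_bucket_id {
        #[rustfmt::skip]
        return Err(Error::other(format!("invalid bucket id: must be within 1..{max_bucket_id}, got {bucket_id}")));
    }
    Ok(())
}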
......@@ -77,6 +77,7 @@ pub mod tarantool;
pub mod tier;
pub mod tlog;
pub mod to_rmpv_named;
pub mod topology_cache;
pub mod traft;
pub mod util;
pub mod vshard;
......
......@@ -2,11 +2,11 @@ use crate::error_code::ErrorCode;
use crate::has_states;
use crate::instance::InstanceName;
use crate::plugin::{rpc, PluginIdentifier};
use crate::replicaset::Replicaset;
use crate::replicaset::ReplicasetState;
use crate::schema::ServiceRouteItem;
use crate::schema::ServiceRouteKey;
use crate::tlog;
use crate::topology_cache::ServiceRouteCheck;
use crate::topology_cache::TopologyCache;
use crate::topology_cache::TopologyCacheRef;
use crate::traft::error::Error;
use crate::traft::network::ConnectionPool;
use crate::traft::node::Node;
......@@ -17,10 +17,8 @@ use picodata_plugin::util::copy_to_region;
use std::time::Duration;
use tarantool::error::BoxError;
use tarantool::error::Error as TntError;
use tarantool::error::IntoBoxError;
use tarantool::error::TarantoolErrorCode;
use tarantool::fiber;
use tarantool::tuple::Tuple;
use tarantool::tuple::TupleBuffer;
use tarantool::tuple::{RawByteBuf, RawBytes};
use tarantool::uuid::Uuid;
......@@ -56,16 +54,13 @@ pub(crate) fn send_rpc_request(
) -> Result<&'static [u8], Error> {
let node = crate::traft::node::global()?;
let pool = &node.plugin_manager.pool;
let topology = &node.topology_cache;
let timeout = Duration::from_secs_f64(timeout);
let instance_name = resolve_rpc_target(plugin_identity, service, target, node)?;
let my_instance_name = node
.raft_storage
.instance_name()?
.expect("should be persisted at this point");
let instance_name = resolve_rpc_target(plugin_identity, service, target, node, topology)?;
let my_instance_name = topology.my_instance_name();
if path.starts_with('.') {
return call_builtin_stored_proc(pool, path, input, &instance_name, timeout);
}
......@@ -189,259 +184,185 @@ pub(crate) fn encode_request_arguments(
Ok(())
}
fn get_replicaset_uuid_by_bucket_id(
tier_name: &str,
bucket_id: u64,
node: &Node,
) -> Result<String, Error> {
let res = vshard::get_replicaset_uuid_by_bucket_id(tier_name, bucket_id);
if res.is_err() && node.storage.tiers.by_name(tier_name)?.is_none() {
return Err(Error::NoSuchTier(tier_name.into()));
}
res
}
fn get_instance_name_of_master_of_replicaset(
tier_name: &str,
bucket_id: u64,
node: &Node,
) -> Result<InstanceName, Error> {
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(tier_name, bucket_id, node)?;
let tuple = node.storage.replicasets.by_uuid_raw(&replicaset_uuid)?;
let Some(master_name) = tuple
.field(Replicaset::FIELD_TARGET_MASTER_NAME)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
};
Ok(master_name)
}
fn resolve_rpc_target(
ident: &PluginIdentifier,
plugin: &PluginIdentifier,
service: &str,
target: &FfiSafeRpcTargetSpecifier,
node: &Node,
topology: &TopologyCache,
) -> Result<InstanceName, Error> {
use FfiSafeRpcTargetSpecifier as Target;
let mut instance_name = None;
let mut replicaset_tuple = None;
let mut to_master_chosen = false;
let mut tier_and_bucket_id = None;
let mut by_replicaset_name = None;
match target {
Target::InstanceName(iid) => {
Target::InstanceName(name) => {
//
// Request to a specific instance, single candidate
//
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
instance_name = Some(InstanceName::from(unsafe { iid.as_str() }));
let instance_name = unsafe { name.as_str() };
// A single instance was chosen
check_route_to_instance(topology, plugin, service, instance_name)?;
return Ok(instance_name.into());
}
&Target::Replicaset {
replicaset_name,
to_master: true,
to_master,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let replicaset_name = unsafe { replicaset_name.as_str() };
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
let Some(master_name) = tuple
.field(Replicaset::FIELD_TARGET_MASTER_NAME)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'target_master_name' field in _pico_replicaset tuple").into());
};
instance_name = Some(master_name);
replicaset_tuple = Some(tuple);
let name = unsafe { replicaset_name.as_str() };
to_master_chosen = to_master;
by_replicaset_name = Some(name);
}
&Target::BucketId {
bucket_id,
to_master: true,
to_master,
} => {
let current_instance_tier = node
.raft_storage
.tier()?
.expect("storage for instance should exists");
let master_name =
get_instance_name_of_master_of_replicaset(&current_instance_tier, bucket_id, node)?;
instance_name = Some(master_name);
let tier = topology.my_tier_name();
to_master_chosen = to_master;
tier_and_bucket_id = Some((tier, bucket_id));
}
&Target::TierAndBucketId {
to_master: true,
to_master,
tier,
bucket_id,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = unsafe { tier.as_str() };
let master_name = get_instance_name_of_master_of_replicaset(tier, bucket_id, node)?;
instance_name = Some(master_name);
to_master_chosen = to_master;
tier_and_bucket_id = Some((tier, bucket_id));
}
// These cases are handled below
#[rustfmt::skip]
Target::BucketId { to_master: false, .. }
| Target::TierAndBucketId { to_master: false, .. }
| Target::Replicaset { to_master: false, .. }
| Target::Any => {}
Target::Any => {}
}
if let Some(instance_name) = instance_name {
// A single instance was chosen
check_route_to_instance(node, ident, service, &instance_name)?;
return Ok(instance_name);
} else {
// Need to pick an instance from a group, fallthrough
let mut by_bucket_id = None;
let mut tier_and_replicaset_uuid = None;
let replicaset_uuid_owned;
if let Some((tier, bucket_id)) = tier_and_bucket_id {
// This call must be done here because this function may yield,
// while the volatile reference to the TopologyCache taken below can't be held across yields.
replicaset_uuid_owned = vshard::get_replicaset_uuid_by_bucket_id(tier, bucket_id)?;
let uuid = &*replicaset_uuid_owned;
by_bucket_id = Some((tier, bucket_id, uuid));
tier_and_replicaset_uuid = Some((tier, uuid));
}
let my_instance_name = node
.raft_storage
.instance_name()?
.expect("should be persisted at this point");
let topology_ref = topology.get();
let mut all_instances_with_service = node
.storage
.service_route_table
.get_available_instances(ident, service)?;
//
// Request to replicaset master, single candidate
//
if to_master_chosen {
let mut target_master_name = None;
filter_instances_by_state(node, &mut all_instances_with_service)?;
if let Some(replicaset_name) = by_replicaset_name {
let replicaset = topology_ref.replicaset_by_name(replicaset_name)?;
target_master_name = Some(&replicaset.target_master_name);
}
#[rustfmt::skip]
if all_instances_with_service.is_empty() {
if node.storage.services.get(ident, service)?.is_none() {
return Err(BoxError::new(ErrorCode::NoSuchService, "service '{plugin}.{service}' not found").into());
} else {
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not started on any instance")).into());
if let Some((_, _, replicaset_uuid)) = by_bucket_id {
let replicaset = topology_ref.replicaset_by_uuid(replicaset_uuid)?;
target_master_name = Some(&replicaset.target_master_name);
}
};
let mut tier_and_replicaset_uuid = None;
let instance_name = target_master_name.expect("set in one of ifs above");
match target {
#[rustfmt::skip]
Target::InstanceName { .. }
| Target::Replicaset { to_master: true, .. }
| Target::TierAndBucketId { to_master: true, .. }
| Target::BucketId { to_master: true, .. } => unreachable!("handled above"),
// A single instance was chosen
check_route_to_instance(topology, plugin, service, instance_name)?;
return Ok(instance_name.clone());
}
&Target::Replicaset {
replicaset_name,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let replicaset_name = unsafe { replicaset_name.as_str() };
let tuple = node.storage.replicasets.get_raw(replicaset_name)?;
let Some(found_replicaset_uuid) = tuple
.field(Replicaset::FIELD_REPLICASET_UUID)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'replicaset_uuid' field in _pico_replicaset tuple").into());
};
let Some(found_tier) = tuple
.field::<'_, String>(Replicaset::FIELD_TIER)
.map_err(IntoBoxError::into_box_error)?
else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "couldn't find 'tier' field in _pico_replicaset tuple").into());
};
tier_and_replicaset_uuid = Some((found_tier, found_replicaset_uuid));
replicaset_tuple = Some(tuple);
}
if let Some(replicaset_name) = by_replicaset_name {
let replicaset = topology_ref.replicaset_by_name(replicaset_name)?;
tier_and_replicaset_uuid = Some((&replicaset.tier, &replicaset.uuid));
}
&Target::BucketId {
bucket_id,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = node
.raft_storage
.tier()?
.expect("storage for instance should exists");
// Get list of all possible targets
let mut all_instances_with_service: Vec<_> = topology_ref
.instances_running_service(&plugin.name, &plugin.version, service)
.collect();
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?;
tier_and_replicaset_uuid = Some((tier, replicaset_uuid));
}
// Remove non-online instances
filter_instances_by_state(&topology_ref, &mut all_instances_with_service)?;
&Target::TierAndBucketId {
tier,
bucket_id,
to_master: false,
} => {
// SAFETY: it's required that argument pointers are valid for the lifetime of this function's call
let tier = unsafe { tier.as_str().to_string() };
let replicaset_uuid = get_replicaset_uuid_by_bucket_id(&tier, bucket_id, node)?;
tier_and_replicaset_uuid = Some((tier, replicaset_uuid));
#[rustfmt::skip]
if all_instances_with_service.is_empty() {
// TODO: put service definitions into TopologyCache as well
if node.storage.services.get(plugin, service)?.is_none() {
return Err(BoxError::new(ErrorCode::NoSuchService, format!("service '{plugin}.{service}' not found")).into());
} else {
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not started on any instance")).into());
}
};
&Target::Any => {}
}
let my_instance_name = topology.my_instance_name();
//
// Request to any replica in replicaset, multiple candidates
//
if let Some((tier, replicaset_uuid)) = tier_and_replicaset_uuid {
// Need to pick a replica from given replicaset
let replicas = match vshard::get_replicaset_priority_list(&tier, &replicaset_uuid) {
Ok(replicas) => replicas,
Err(err) => {
if node.storage.tiers.by_name(&tier)?.is_none() {
return Err(Error::NoSuchTier(tier));
}
return Err(err);
}
};
let replicas = vshard::get_replicaset_priority_list(tier, replicaset_uuid)?;
let mut skipped_self = false;
// XXX: this shouldn't be a problem if replicasets aren't too big,
// but if they are we might want to construct a HashSet from candidates
for instance_name in replicas {
if my_instance_name == instance_name {
if my_instance_name == &*instance_name {
// Prefer someone else instead of self
skipped_self = true;
continue;
}
if all_instances_with_service.contains(&instance_name) {
if all_instances_with_service.contains(&&*instance_name) {
return Ok(instance_name);
}
}
// In case there are no other suitable candidates, fall back to calling self
if skipped_self && all_instances_with_service.contains(&my_instance_name) {
return Ok(my_instance_name);
return Ok(my_instance_name.into());
}
check_replicaset_is_not_expelled(node, &replicaset_uuid, replicaset_tuple)?;
let replicaset = topology_ref.replicaset_by_uuid(replicaset_uuid)?;
if replicaset.state == ReplicasetState::Expelled {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {replicaset_uuid} was expelled")).into());
}
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {ident}.{service}")).into());
} else {
// Need to pick any instance with the given plugin.service
let mut candidates = all_instances_with_service;
// TODO: find a better strategy than just the random one
let random_index = rand::random::<usize>() % candidates.len();
let mut instance_name = std::mem::take(&mut candidates[random_index]);
if instance_name == my_instance_name && candidates.len() > 1 {
// Prefer someone else instead of self
let index = (random_index + 1) % candidates.len();
instance_name = std::mem::take(&mut candidates[index]);
}
return Ok(instance_name);
return Err(BoxError::new(ErrorCode::ServiceNotAvailable, format!("no {replicaset_uuid} replicas are available for service {plugin}.{service}")).into());
}
//
// Request to any instance with given service, multiple candidates
//
let mut candidates = all_instances_with_service;
// TODO: find a better strategy than just the random one
let random_index = rand::random::<usize>() % candidates.len();
let mut instance_name = std::mem::take(&mut candidates[random_index]);
if instance_name == my_instance_name && candidates.len() > 1 {
// Prefer someone else instead of self
let index = (random_index + 1) % candidates.len();
instance_name = std::mem::take(&mut candidates[index]);
}
return Ok(instance_name.into());
}
fn filter_instances_by_state(
node: &Node,
instance_names: &mut Vec<InstanceName>,
topology_ref: &TopologyCacheRef,
instance_names: &mut Vec<&str>,
) -> Result<(), Error> {
let mut index = 0;
while index < instance_names.len() {
let name = &instance_names[index];
let instance = node.storage.instances.get(name)?;
let instance = topology_ref.instance_by_name(name)?;
if has_states!(instance, Expelled -> *) || !instance.may_respond() {
instance_names.swap_remove(index);
} else {
......@@ -453,63 +374,35 @@ fn filter_instances_by_state(
}
fn check_route_to_instance(
node: &Node,
ident: &PluginIdentifier,
topology: &TopologyCache,
plugin: &PluginIdentifier,
service: &str,
instance_name: &InstanceName,
instance_name: &str,
) -> Result<(), Error> {
let instance = node.storage.instances.get(instance_name)?;
let topology_ref = topology.get();
let instance = topology_ref.instance_by_name(instance_name)?;
if has_states!(instance, Expelled -> *) {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::InstanceExpelled, format!("instance named '{instance_name}' was expelled")).into());
}
if !instance.may_respond() {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::InstanceUnavaliable, format!("instance with instance_name \"{instance_name}\" can't respond due to its state")).into());
}
let res = node.storage.service_route_table.get_raw(&ServiceRouteKey {
instance_name,
plugin_name: &ident.name,
plugin_version: &ident.version,
service_name: service,
})?;
#[rustfmt::skip]
let Some(tuple) = res else {
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{ident}.{service}' is not running on {instance_name}")).into());
};
let res = tuple
.field(ServiceRouteItem::FIELD_POISON)
.map_err(IntoBoxError::into_box_error)?;
let Some(is_poisoned) = res else {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::StorageCorrupted, "invalid contents has _pico_service_route").into());
};
if is_poisoned {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{ident}.{service}' is poisoned on {instance_name}")).into());
}
Ok(())
}
fn check_replicaset_is_not_expelled(
node: &Node,
uuid: &str,
maybe_tuple: Option<Tuple>,
) -> Result<(), Error> {
let tuple;
if let Some(t) = maybe_tuple {
tuple = t;
} else {
tuple = node.storage.replicasets.by_uuid_raw(uuid)?;
}
let check =
topology_ref.check_service_route(&plugin.name, &plugin.version, service, instance_name);
let state = tuple.field(Replicaset::FIELD_STATE)?;
let state: ReplicasetState = state.expect("replicaset should always have a state column");
if state == ReplicasetState::Expelled {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ReplicasetExpelled, format!("replicaset with id {uuid} was expelled")).into());
match check {
ServiceRouteCheck::Ok => {}
ServiceRouteCheck::RoutePoisoned => {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServicePoisoned, format!("service '{plugin}.{service}' is poisoned on {instance_name}")).into());
}
ServiceRouteCheck::ServiceNotEnabled => {
#[rustfmt::skip]
return Err(BoxError::new(ErrorCode::ServiceNotStarted, format!("service '{plugin}.{service}' is not running on {instance_name}")).into());
}
}
Ok(())
......
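The rewritten `check_route_to_instance` above now delegates the route lookup to the new `topology_cache` module, whose source is not expanded in this compare view. From the match above, `ServiceRouteCheck` is assumed to be a plain three-variant enum, roughly:

// Assumed shape, reconstructed from the call site above; the real definition
// lives in src/topology_cache.rs and may differ in detail.
pub enum ServiceRouteCheck {
    /// The service is enabled on the instance and the route is healthy.
    Ok,
    /// A route exists but was marked as poisoned.
    RoutePoisoned,
    /// The service is not enabled (not running) on the instance.
    ServiceNotEnabled,
}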
......@@ -665,9 +665,12 @@ pub struct ServiceRouteKey<'a> {
/// Instance name.
pub instance_name: &'a InstanceName,
}
impl<'a> Encode for ServiceRouteKey<'a> {}
////////////////////////////////////////////////////////////////////////////////
// PluginMigrationRecord
////////////////////////////////////////////////////////////////////////////////
/// Single record in _pico_plugin_migration system table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PluginMigrationRecord {
......@@ -708,6 +711,10 @@ impl PluginMigrationRecord {
}
}
////////////////////////////////////////////////////////////////////////////////
// PluginConfigRecord
////////////////////////////////////////////////////////////////////////////////
/// Single record in _pico_plugin_config system table.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct PluginConfigRecord {
......
......@@ -2768,6 +2768,15 @@ impl ServiceRouteTable {
}
}
impl ToEntryIter<MP_SERDE> for ServiceRouteTable {
type Entry = ServiceRouteItem;
#[inline(always)]
fn index_iter(&self) -> tarantool::Result<IndexIterator> {
self.space.select(IteratorType::All, &())
}
}
////////////////////////////////////////////////////////////////////////////////
// PluginMigration
////////////////////////////////////////////////////////////////////////////////
......
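The new `ToEntryIter<MP_SERDE>` impl makes `_pico_service_route` records iterable like the other system tables. A hedged usage sketch, assuming the trait's blanket `iter()` method (the method name is inferred from the `ToEntryIter as _` import elsewhere in this diff and may differ):

// Sketch only: dump all service route records. `iter()` is assumed to be the
// blanket method ToEntryIter provides on top of index_iter(), and
// ServiceRouteItem is assumed to derive Debug.
fn dump_service_routes(storage: &Clusterwide) -> tarantool::Result<()> {
    for route in storage.service_route_table.iter()? {
        // Each entry decodes into a ServiceRouteItem (the Entry type declared above).
        println!("{route:?}");
    }
    Ok(())
}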
This diff is collapsed.
......@@ -6,6 +6,7 @@
//! - processing raft `Ready` - persisting entries, communicating with other raft nodes.
use crate::access_control::user_by_id;
use crate::access_control::UserMetadata;
use crate::config::PicodataConfig;
use crate::governor;
use crate::has_states;
......@@ -35,6 +36,7 @@ use crate::storage::{local_schema_version, set_local_schema_version};
use crate::storage::{Clusterwide, ClusterwideTable, PropertyName};
use crate::system_parameter_name;
use crate::tlog;
use crate::topology_cache::TopologyCache;
use crate::traft;
use crate::traft::error::Error;
use crate::traft::network::WorkerOptions;
......@@ -55,7 +57,6 @@ use ::raft::prelude as raft;
use ::raft::storage::Storage as _;
use ::raft::Error as RaftError;
use ::raft::StateRole as RaftStateRole;
use ::raft::StorageError;
use ::raft::INVALID_ID;
use ::tarantool::error::BoxError;
use ::tarantool::error::TarantoolErrorCode;
......@@ -152,6 +153,7 @@ pub struct Node {
node_impl: Rc<Mutex<NodeImpl>>,
pub(crate) storage: Clusterwide,
pub(crate) topology_cache: Rc<TopologyCache>,
pub(crate) raft_storage: RaftSpaceAccess,
pub(crate) main_loop: MainLoop,
pub(crate) governor_loop: governor::Loop,
......@@ -210,6 +212,7 @@ impl Node {
raft_storage.clone(),
plugin_manager.clone(),
)?;
let topology_cache = node_impl.topology_cache.clone();
let raft_id = node_impl.raft_id();
let status = node_impl.status.subscribe();
......@@ -239,6 +242,7 @@ impl Node {
),
pool,
node_impl,
topology_cache,
storage,
raft_storage,
status,
......@@ -446,6 +450,7 @@ pub(crate) struct NodeImpl {
pub read_state_wakers: HashMap<LogicalClock, oneshot::Sender<RaftIndex>>,
joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>,
storage: Clusterwide,
topology_cache: Rc<TopologyCache>,
raft_storage: RaftSpaceAccess,
pool: Rc<ConnectionPool>,
lc: LogicalClock,
......@@ -461,15 +466,12 @@ impl NodeImpl {
storage: Clusterwide,
raft_storage: RaftSpaceAccess,
plugin_manager: Rc<PluginManager>,
) -> Result<Self, RaftError> {
let box_err = |e| StorageError::Other(Box::new(e));
) -> traft::Result<Self> {
let raft_id: RaftId = raft_storage
.raft_id()
.map_err(box_err)?
.raft_id()?
.expect("raft_id should be set by the time the node is being initialized");
let applied: RaftIndex = raft_storage.applied().map_err(box_err)?;
let term: RaftTerm = raft_storage.term().map_err(box_err)?;
let applied: RaftIndex = raft_storage.applied()?;
let term: RaftTerm = raft_storage.term()?;
let lc = {
let gen = raft_storage.gen().unwrap() + 1;
raft_storage.persist_gen(gen).unwrap();
......@@ -497,11 +499,14 @@ impl NodeImpl {
});
let (applied, _) = watch::channel(applied);
let topology_cache = Rc::new(TopologyCache::load(&storage, raft_id)?);
Ok(Self {
raw_node,
read_state_wakers: Default::default(),
joint_state_latch: KVCell::new(),
storage,
topology_cache,
raft_storage,
instance_reachability: pool.instance_reachability.clone(),
pool,
......@@ -788,7 +793,13 @@ impl NodeImpl {
//
// TODO: merge this into `do_dml` once `box_tuple_extract_key` is fixed.
let old = match space {
Ok(s @ (ClusterwideTable::Property | ClusterwideTable::Instance)) => {
Ok(
s @ (ClusterwideTable::Property
| ClusterwideTable::Instance
| ClusterwideTable::Replicaset
| ClusterwideTable::Tier
| ClusterwideTable::ServiceRouteTable),
) => {
let s = space_by_id(s.id()).expect("system space must exist");
match &op {
// There may be no previous version for inserts.
......@@ -814,109 +825,81 @@ impl NodeImpl {
let new = self.storage.do_dml(op)?;
Ok((old, new))
});
let initiator = op.initiator();
let initiator_def = user_by_id(initiator).expect("user must exist");
match &res {
let (old, new) = match res {
Ok(v) => v,
Err(e) => {
tlog!(Error, "clusterwide dml failed: {e}");
return false;
}
// Handle insert, replace, update in _pico_instance
Ok((old, Some(new))) if space == Ok(ClusterwideTable::Instance) => {
// Dml::Delete mandates that new tuple is None.
assert!(!matches!(op, Dml::Delete { .. }));
let old: Option<Instance> =
old.as_ref().map(|x| x.decode().expect("must be Instance"));
// FIXME: we do this prematurely, because the
// transaction may still be rolled back for some reason.
let new: Instance = new
.decode()
.expect("tuple already passed format verification");
// Check if we're handling a "new node joined" event:
// * Either there's no tuple for this node in the storage;
// * Or its raft id has changed, meaning it's no longer the same node.
// WARN: this condition will not pass on the joining instance
// as it preemptively puts itself into `_pico_instance` table.
// Locally it's logged in src/lib.rs.
if old.as_ref().map(|x| x.raft_id) != Some(new.raft_id) {
let instance_name = &new.name;
crate::audit!(
message: "a new instance `{instance_name}` joined the cluster",
title: "join_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
crate::audit!(
message: "local database created on `{instance_name}`",
title: "create_local_db",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
}
};
if old.as_ref().map(|x| x.current_state) != Some(new.current_state) {
let instance_name = &new.name;
let state = &new.current_state;
crate::audit!(
message: "current state of instance `{instance_name}` changed to {state}",
title: "change_current_state",
severity: Medium,
instance_name: %instance_name,
raft_id: %new.raft_id,
new_state: %state,
initiator: &initiator_def.name,
);
}
let Ok(space) = space else {
// Not a builtin system table, nothing left to do here
return true;
};
if old.as_ref().map(|x| x.target_state) != Some(new.target_state) {
let instance_name = &new.name;
let state = &new.target_state;
crate::audit!(
message: "target state of instance `{instance_name}` changed to {state}",
title: "change_target_state",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
new_state: %state,
initiator: &initiator_def.name,
);
}
// FIXME: all of this should be done only after the transaction is committed
// See <https://git.picodata.io/core/picodata/-/issues/1149>
match space {
ClusterwideTable::Instance => {
let old = old
.as_ref()
.map(|x| x.decode().expect("schema upgrade not supported yet"));
if has_states!(new, Expelled -> *) {
let instance_name = &new.name;
crate::audit!(
message: "instance `{instance_name}` was expelled from the cluster",
title: "expel_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
crate::audit!(
message: "local database dropped on `{instance_name}`",
title: "drop_local_db",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
let new = new
.as_ref()
.map(|x| x.decode().expect("format was already verified"));
// Handle insert, replace, update in _pico_instance
if let Some(new) = &new {
// Dml::Delete mandates that new tuple is None.
assert!(!matches!(op, Dml::Delete { .. }));
let initiator = op.initiator();
let initiator_def = user_by_id(initiator).expect("user must exist");
if new.raft_id == self.raft_id() {
do_audit_logging_for_instance_update(old.as_ref(), new, &initiator_def);
if has_states!(new, Expelled -> *) && new.raft_id == self.raft_id() {
// cannot exit during a transaction
*expelled = true;
}
}
self.topology_cache.update_instance(old, new);
}
Ok(_) => {}
ClusterwideTable::Replicaset => {
let old = old
.as_ref()
.map(|x| x.decode().expect("schema upgrade not supported yet"));
let new = new
.as_ref()
.map(|x| x.decode().expect("format was already verified"));
self.topology_cache.update_replicaset(old, new);
}
ClusterwideTable::Tier => {
let old = old
.as_ref()
.map(|x| x.decode().expect("schema upgrade not supported yet"));
let new = new
.as_ref()
.map(|x| x.decode().expect("format was already verified"));
self.topology_cache.update_tier(old, new);
}
ClusterwideTable::ServiceRouteTable => {
let old = old
.as_ref()
.map(|x| x.decode().expect("schema upgrade not supported yet"));
let new = new
.as_ref()
.map(|x| x.decode().expect("format was already verified"));
self.topology_cache.update_service_route(old, new);
}
_ => {}
}
true
......@@ -2193,6 +2176,7 @@ impl NodeImpl {
if hard_state.is_some() || !entries_to_persist.is_empty() || snapshot_data.is_some() {
let mut new_term = None;
let mut new_applied = None;
let mut received_snapshot = false;
if let Err(e) = transaction(|| -> Result<(), Error> {
self.main_loop_status("persisting hard state, entries and/or snapshot");
......@@ -2234,6 +2218,7 @@ impl NodeImpl {
self.storage
.apply_snapshot_data(&snapshot_data, is_master)?;
new_applied = Some(meta.index);
received_snapshot = true;
}
// TODO: As long as the snapshot was sent to us in response to
......@@ -2265,6 +2250,19 @@ impl NodeImpl {
.expect("applied shouldn't ever be borrowed across yields");
}
if received_snapshot {
// Need to reload the whole topology cache. We could be more
// clever about it and only update the records which changed,
// but at the moment this would require doing a full scan and
// comparing each record, which is no better than a full reload.
// A better solution would be to store a raft index in each
// tuple of each global table; then we would only need to check
// if the index changed, but we don't have that yet.
self.topology_cache
.full_reload(&self.storage)
.expect("schema upgrade not supported yet");
}
if hard_state.is_some() {
crate::error_injection!(exit "EXIT_AFTER_RAFT_PERSISTS_HARD_STATE");
}
......@@ -2636,3 +2634,83 @@ fn proc_raft_promote() -> traft::Result<()> {
node.campaign_and_yield()?;
Ok(())
}
fn do_audit_logging_for_instance_update(
old: Option<&Instance>,
new: &Instance,
initiator_def: &UserMetadata,
) {
// Check if we're handling a "new node joined" event:
// * Either there's no tuple for this node in the storage;
// * Or its raft id has changed, meaning it's no longer the same node.
// WARN: this condition will not pass on the joining instance
// as it preemptively puts itself into `_pico_instance` table.
// Locally it's logged in src/lib.rs.
if old.map(|x| x.raft_id) != Some(new.raft_id) {
let instance_name = &new.name;
crate::audit!(
message: "a new instance `{instance_name}` joined the cluster",
title: "join_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
crate::audit!(
message: "local database created on `{instance_name}`",
title: "create_local_db",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
}
if old.map(|x| x.current_state) != Some(new.current_state) {
let instance_name = &new.name;
let state = &new.current_state;
crate::audit!(
message: "current state of instance `{instance_name}` changed to {state}",
title: "change_current_state",
severity: Medium,
instance_name: %instance_name,
raft_id: %new.raft_id,
new_state: %state,
initiator: &initiator_def.name,
);
}
if old.map(|x| x.target_state) != Some(new.target_state) {
let instance_name = &new.name;
let state = &new.target_state;
crate::audit!(
message: "target state of instance `{instance_name}` changed to {state}",
title: "change_target_state",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
new_state: %state,
initiator: &initiator_def.name,
);
}
if has_states!(new, Expelled -> *) {
let instance_name = &new.name;
crate::audit!(
message: "instance `{instance_name}` was expelled from the cluster",
title: "expel_instance",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
crate::audit!(
message: "local database dropped on `{instance_name}`",
title: "drop_local_db",
severity: Low,
instance_name: %instance_name,
raft_id: %new.raft_id,
initiator: &initiator_def.name,
);
}
}
......@@ -585,6 +585,33 @@ impl<T> std::ops::DerefMut for NoYieldsRefMut<'_, T> {
}
}
////////////////////////////////////////////////////////////////////////////////
// DebugDiff
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct DebugDiff<'a, T>(pub &'a T, pub &'a T);
impl<T> std::fmt::Display for DebugDiff<'_, T>
where
T: std::fmt::Debug,
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let old_text = format!("{:#?}", self.0);
let new_text = format!("{:#?}", self.1);
let diff = diff::lines(&old_text, &new_text);
for a in diff {
match a {
diff::Result::Left(l) => writeln!(f, "\x1b[31m-{l}\x1b[0m")?,
diff::Result::Both(a, _) => writeln!(f, " {a}")?,
diff::Result::Right(r) => writeln!(f, "\x1b[32m+{r}\x1b[0m")?,
}
}
Ok(())
}
}
////////////////////////////////////////////////////////////////////////////////
// ScopeGuard
////////////////////////////////////////////////////////////////////////////////
......
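A short usage example for the new `DebugDiff` helper (hypothetical call site; any two references to the same `Debug` type work, assuming `DebugDiff` is in scope):

fn main() {
    let before = vec![1, 2, 3];
    let after = vec![1, 2, 4];
    // Lines present only in `before` are printed in red with a leading '-',
    // lines present only in `after` in green with a leading '+',
    // and unchanged lines are printed as-is.
    println!("{}", DebugDiff(&before, &after));
}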
use crate::config::PicodataConfig;
use crate::instance::Instance;
use crate::instance::InstanceName;
use crate::pico_service::pico_service_password;
......@@ -9,36 +10,66 @@ use crate::storage::Clusterwide;
use crate::storage::ToEntryIter as _;
use crate::storage::TABLE_ID_BUCKET;
use crate::traft::error::Error;
use crate::traft::node;
use crate::traft::RaftId;
use crate::traft::Result;
use std::collections::HashMap;
use tarantool::space::SpaceId;
use tarantool::tlua;
/// This function **never yields**
pub fn get_replicaset_priority_list(
tier: &str,
replicaset_uuid: &str,
) -> Result<Vec<InstanceName>, Error> {
#[cfg(debug_assertions)]
let _guard = tarantool::fiber::safety::NoYieldsGuard::new();
let lua = tarantool::lua_state();
let pico: tlua::LuaTable<_> = lua
.get("pico")
.ok_or_else(|| Error::other("pico lua module disappeared"))?;
let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_priority_list")?;
func.call_with_args((tier, replicaset_uuid))
.map_err(|e| tlua::LuaError::from(e).into())
let res = func.call_with_args((tier, replicaset_uuid));
if res.is_err() {
// Check if the tier exists and return the corresponding error in that case
node::global()
.expect("initialized by this point")
.topology_cache
.get()
.tier_by_name(tier)?;
}
res.map_err(|e| tlua::LuaError::from(e).into())
}
/// Returns the replicaset uuid and an array of replicas in descending priority
/// order.
///
/// This function **may yield** if vshard needs to update its bucket mapping.
pub fn get_replicaset_uuid_by_bucket_id(tier: &str, bucket_id: u64) -> Result<String, Error> {
let max_bucket_id = PicodataConfig::total_bucket_count();
if bucket_id < 1 || bucket_id > max_bucket_id {
#[rustfmt::skip]
return Err(Error::other(format!("invalid bucket id: must be within 1..{max_bucket_id}, got {bucket_id}")));
}
let lua = tarantool::lua_state();
let pico: tlua::LuaTable<_> = lua
.get("pico")
.ok_or_else(|| Error::other("pico lua module disappeared"))?;
let func: tlua::LuaFunction<_> = pico.try_get("_replicaset_uuid_by_bucket_id")?;
func.call_with_args((tier, bucket_id))
.map_err(|e| tlua::LuaError::from(e).into())
let res = func.call_with_args((tier, bucket_id));
if res.is_err() {
// Check if the tier exists and return the corresponding error in that case
node::global()
.expect("initialized by this point")
.topology_cache
.get()
.tier_by_name(tier)?;
}
res.map_err(|e| tlua::LuaError::from(e).into())
}
#[rustfmt::skip]
......@@ -51,6 +82,9 @@ pub struct VshardConfig {
/// Id of system table `_bucket`.
space_bucket_id: SpaceId,
/// Total number of virtual buckets on each tier.
bucket_count: u64,
/// This field is not stored in the global storage; instead,
/// it is set right before the config is passed into vshard.*.cfg,
/// otherwise vshard will override it with an incorrect value.
......@@ -153,6 +187,7 @@ impl VshardConfig {
sharding,
discovery_mode: DiscoveryMode::On,
space_bucket_id: TABLE_ID_BUCKET,
bucket_count: PicodataConfig::total_bucket_count(),
}
}
......
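Taken together, the two Lua-bridge helpers above are what the RPC target resolution earlier in this diff relies on. A simplified sketch of that flow, with error handling trimmed and the wrapper function name invented for illustration:

// Hypothetical wrapper: pick the highest-priority replica for a bucket.
fn pick_replica_for_bucket(tier: &str, bucket_id: u64) -> Result<InstanceName, Error> {
    // May yield: vshard might need to refresh its bucket mapping first.
    let replicaset_uuid = get_replicaset_uuid_by_bucket_id(tier, bucket_id)?;
    // Never yields: plain lookup of the priority-ordered replica list.
    let replicas = get_replicaset_priority_list(tier, &replicaset_uuid)?;
    // The real resolver additionally skips `self` and instances where the
    // target service is not running; see resolve_rpc_target above.
    replicas
        .into_iter()
        .next()
        .ok_or_else(|| Error::other("replicaset has no replicas"))
}

In resolve_rpc_target this is combined with the TopologyCache checks so that expelled instances and poisoned routes are never chosen.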
......@@ -634,6 +634,7 @@ cluster:
}
space_bucket_id = storage_instance.eval("return box.space._bucket.id")
total_bucket_count = 3000
storage_vshard_config_explicit = storage_instance.call(
".proc_get_vshard_config", "storage"
......@@ -642,6 +643,7 @@ cluster:
discovery_mode="on",
sharding=storage_sharding,
space_bucket_id=space_bucket_id,
bucket_count=total_bucket_count,
)
storage_vshard_config_implicit = storage_instance.call(
......@@ -656,6 +658,7 @@ cluster:
discovery_mode="on",
sharding=router_sharding,
space_bucket_id=space_bucket_id,
bucket_count=total_bucket_count,
)
router_vshard_config_implicit = router_instance_1.call(
......
......@@ -2441,7 +2441,7 @@ cluster:
# Check requesting RPC to unknown bucket id
with pytest.raises(
TarantoolError,
match="Bucket 9999 cannot be found.",
match="invalid bucket id: must be within 1..3000, got 9999",
):
context = make_context()
input = dict(
......@@ -2459,7 +2459,7 @@ cluster:
context = make_context()
input = dict(
path="/ping",
tier_and_bucket_id=("undefined", 9999),
tier_and_bucket_id=("undefined", 1500),
input=msgpack.dumps([]),
)
i1.call(".proc_rpc_dispatch", "/proxy", msgpack.dumps(input), context)
......