Skip to content
Snippets Groups Projects
Commit eacbc2a2 authored by Kurdakov Alexander's avatar Kurdakov Alexander
Browse files

feat: implement changed sbroad traits

Changed sbroad vshard related traits and it's capabilities
about working with several vshard groups
parent ec92e7e9
No related branches found
No related tags found
1 merge request!1138feat: support of multiple vshard groups
Subproject commit 5ccdddaa1ce604febb93113c5521adf6f95c9169
Subproject commit 742bcdc2555feea200bb53c29a8411e790d15132
......@@ -76,6 +76,15 @@ function pico.update_plugin_config(plugin_name, plugin_version, service_name, ne
return pico._update_plugin_config(plugin_name, plugin_version, service_name, raw_new_config, opts)
end
pico.router = {}
local function get_router_for_tier(tier_name)
return pico.router[tier_name]
end
pico.get_router_for_tier = get_router_for_tier
function pico._replicaset_priority_list(replicaset_uuid)
local replicaset = vshard.router.internal.static_router.replicasets[replicaset_uuid]
if replicaset == nil then
......
......@@ -18,6 +18,7 @@ use sbroad::ir::Plan;
use sbroad::utils::MutexLike;
use smol_str::{format_smolstr, SmolStr, ToSmolStr};
use tarantool::fiber::Mutex;
use tarantool::session::with_su;
use std::any::Any;
......@@ -26,7 +27,7 @@ use std::rc::Rc;
use crate::sql::DEFAULT_BUCKET_COUNT;
use crate::schema::{Distribution, ShardingFn};
use crate::schema::{Distribution, ShardingFn, ADMIN_ID};
use crate::storage::{Clusterwide, ClusterwideTable};
use sbroad::executor::engine::helpers::normalize_name_from_sql;
......@@ -46,6 +47,66 @@ thread_local! {
Mutex::new(PicoRouterCache::new(DEFAULT_CAPACITY).unwrap()));
}
fn get_tier_info(tier_name: &SmolStr) -> Result<Tier, SbroadError> {
let node = node::global().map_err(|e| {
SbroadError::FailedTo(Action::Get, None, format_smolstr!("raft node: {}", e))
})?;
let tier = with_su(ADMIN_ID, || {
node.storage
.tiers
.by_name(tier_name.as_str())
.map_err(|e| {
SbroadError::FailedTo(
Action::Get,
None,
format_smolstr!("tier object by tier name: {e}"),
)
})?
.ok_or(SbroadError::NotFound(
Entity::Metadata,
format_smolstr!("tier with tier_name `{tier_name}` not found"),
))
})??;
Ok(Tier {
bucket_count: DEFAULT_BUCKET_COUNT,
name: tier.name,
})
}
fn get_current_tier_name() -> Result<String, SbroadError> {
let node = node::global().map_err(|e| {
SbroadError::FailedTo(Action::Get, None, format_smolstr!("raft node: {}", e))
})?;
let tier_name = with_su(ADMIN_ID, || {
node.raft_storage
.tier()
.map_err(|e| {
SbroadError::FailedTo(Action::Get, None, format_smolstr!("tier name: {e}"))
})?
.ok_or(SbroadError::FailedTo(
Action::Get,
None,
format_smolstr!("tier name should be persisted at instance bootstrap"),
))
})??;
Ok(tier_name)
}
#[derive(Default)]
pub struct Tier {
bucket_count: u64,
name: String,
}
impl Tier {
fn name(&self) -> Option<SmolStr> {
Some(SmolStr::from(&self.name))
}
}
pub const DEFAULT_BUCKET_COLUMN: &str = "bucket_id";
/// Get the schema version for the given space.
......@@ -78,7 +139,6 @@ pub fn get_table_version(space_name: &str) -> Result<u64, SbroadError> {
#[allow(clippy::module_name_repetitions)]
pub struct RouterRuntime {
metadata: Mutex<RouterMetadata>,
bucket_count: u64,
ir_cache: Rc<Mutex<PicoRouterCache>>,
}
......@@ -89,10 +149,8 @@ impl RouterRuntime {
/// - If the cache cannot be initialized.
pub fn new() -> Result<Self, SbroadError> {
let metadata = RouterMetadata::default();
let bucket_count = DEFAULT_BUCKET_COUNT;
let runtime = PLAN_CACHE.with(|cache| RouterRuntime {
metadata: Mutex::new(metadata),
bucket_count,
ir_cache: cache.clone(),
});
Ok(runtime)
......@@ -202,6 +260,7 @@ impl QueryCache for RouterRuntime {
impl Router for RouterRuntime {
type ParseTree = AbstractSyntaxTree;
type MetadataProvider = RouterMetadata;
type VshardImplementor = Tier;
fn metadata(&self) -> &impl MutexLike<Self::MetadataProvider> {
&self.metadata
......@@ -245,6 +304,19 @@ impl Router for RouterRuntime {
) -> Result<Vec<&'rec Value>, SbroadError> {
sharding_key_from_tuple(&*self.metadata().lock(), &space, args)
}
fn get_current_tier_name(&self) -> Result<Option<SmolStr>, SbroadError> {
Ok(Some(SmolStr::from(get_current_tier_name()?)))
}
fn get_vshard_object_by_tier(
&self,
tier_name: Option<&SmolStr>,
) -> Result<Self::VshardImplementor, SbroadError> {
let current_instance_tier_name = SmolStr::from(get_current_tier_name()?);
let tier_name = tier_name.unwrap_or(&current_instance_tier_name);
get_tier_info(tier_name)
}
}
pub(crate) fn calculate_bucket_id(tuple: &[&Value], bucket_count: u64) -> Result<u64, SbroadError> {
......@@ -280,14 +352,22 @@ pub(crate) fn calculate_bucket_id(tuple: &[&Value], bucket_count: u64) -> Result
Ok(u64::from(key.hash(&tnt_tuple)) % bucket_count + 1)
}
impl Vshard for RouterRuntime {
impl Vshard for Tier {
fn exec_ir_on_buckets(
&self,
sub_plan: ExecutionPlan,
buckets: &Buckets,
return_format: DispatchReturnFormat,
) -> Result<Box<dyn Any>, SbroadError> {
impl_exec_ir_on_buckets(self, sub_plan, buckets, return_format)
let tier_name = self.name();
impl_exec_ir_on_buckets(
self,
sub_plan,
buckets,
return_format,
DEFAULT_QUERY_TIMEOUT,
tier_name.as_ref(),
)
}
fn bucket_count(&self) -> u64 {
......@@ -312,7 +392,7 @@ impl Vshard for RouterRuntime {
}
}
impl Vshard for &RouterRuntime {
impl Vshard for &Tier {
fn bucket_count(&self) -> u64 {
self.bucket_count
}
......@@ -331,7 +411,15 @@ impl Vshard for &RouterRuntime {
buckets: &Buckets,
return_format: DispatchReturnFormat,
) -> Result<Box<dyn Any>, SbroadError> {
impl_exec_ir_on_buckets(*self, sub_plan, buckets, return_format)
let tier_name = self.name();
impl_exec_ir_on_buckets(
*self,
sub_plan,
buckets,
return_format,
DEFAULT_QUERY_TIMEOUT,
tier_name.as_ref(),
)
}
fn exec_ir_on_any_node(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment