Skip to content
Snippets Groups Projects
Commit 440ac8e0 authored by Emir Vildanov's avatar Emir Vildanov
Browse files

feat: add new Statistics trait for sbroad engines

parent cb8f67b6
No related branches found
No related tags found
1 merge request!1414sbroad import
use std::any::Any; use std::any::Any;
use std::cell::{Ref, RefCell}; use std::cell::{Ref, RefCell};
use std::collections::HashMap; use std::collections::HashMap;
use std::rc::Rc;
use sbroad::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}; use sbroad::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
use sbroad::cbo::{TableColumnPair, TableStats};
use sbroad::errors::{Action, Entity, SbroadError}; use sbroad::errors::{Action, Entity, SbroadError};
use sbroad::executor::bucket::Buckets; use sbroad::executor::bucket::Buckets;
use sbroad::executor::engine::{ use sbroad::executor::engine::{
normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration, normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration,
Coordinator, CoordinatorMetadata, Coordinator, CoordinatorMetadata, InitialColumnStats, Statistics,
}; };
use sbroad::executor::hash::bucket_id_by_tuple; use sbroad::executor::hash::bucket_id_by_tuple;
use sbroad::executor::ir::ExecutionPlan; use sbroad::executor::ir::ExecutionPlan;
...@@ -31,13 +33,6 @@ pub struct RouterConfigurationMock { ...@@ -31,13 +33,6 @@ pub struct RouterConfigurationMock {
} }
impl CoordinatorMetadata for RouterConfigurationMock { impl CoordinatorMetadata for RouterConfigurationMock {
/// Get Table by its name that contains:
/// * list of the columns,
/// * distribution key of the output tuples (column positions),
/// * table name.
///
/// # Errors
/// - Failed to get table by name from the metadata.
fn get_table_segment(&self, table_name: &str) -> Result<Table, SbroadError> { fn get_table_segment(&self, table_name: &str) -> Result<Table, SbroadError> {
let name = normalize_name_from_sql(table_name); let name = normalize_name_from_sql(table_name);
match self.tables.get(&name) { match self.tables.get(&name) {
...@@ -406,9 +401,34 @@ impl Configuration for RouterRuntimeMock { ...@@ -406,9 +401,34 @@ impl Configuration for RouterRuntimeMock {
} }
} }
impl Default for RouterRuntimeMock {
fn default() -> Self {
Self::new()
}
}
impl RouterRuntimeMock {
    /// Build a mock router runtime with default metadata, no virtual tables
    /// and a fresh IR cache of `DEFAULT_CAPACITY`.
    #[allow(dead_code)]
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn new() -> Self {
        let ir_cache: LRUCache<String, Plan> =
            LRUCache::new(DEFAULT_CAPACITY, None).unwrap();
        Self {
            metadata: RefCell::new(RouterConfigurationMock::new()),
            virtual_tables: HashMap::new(),
            ir_cache: RefCell::new(ir_cache),
        }
    }

    /// Register a virtual table under the given node id.
    #[allow(dead_code)]
    pub fn add_virtual_table(&mut self, id: usize, table: VirtualTable) {
        self.virtual_tables.insert(id, table);
    }
}
impl Coordinator for RouterRuntimeMock { impl Coordinator for RouterRuntimeMock {
type ParseTree = AbstractSyntaxTree;
type Cache = LRUCache<String, Plan>; type Cache = LRUCache<String, Plan>;
type ParseTree = AbstractSyntaxTree;
fn clear_ir_cache(&self) -> Result<(), SbroadError> { fn clear_ir_cache(&self) -> Result<(), SbroadError> {
*self.ir_cache.borrow_mut() = Self::Cache::new(DEFAULT_CAPACITY, None)?; *self.ir_cache.borrow_mut() = Self::Cache::new(DEFAULT_CAPACITY, None)?;
...@@ -475,27 +495,39 @@ impl Coordinator for RouterRuntimeMock { ...@@ -475,27 +495,39 @@ impl Coordinator for RouterRuntimeMock {
} }
} }
impl Default for RouterRuntimeMock { impl Statistics for RouterRuntimeMock {
fn default() -> Self { #[allow(unused_variables)]
Self::new() fn get_table_stats(&self, table_name: String) -> Result<Rc<TableStats>, SbroadError> {
// Will be added later.
todo!()
} }
}
impl RouterRuntimeMock { #[allow(unused_variables)]
#[allow(dead_code)] fn get_initial_column_stats(
#[allow(clippy::missing_panics_doc)] &self,
#[must_use] table_column_pair: TableColumnPair,
pub fn new() -> Self { ) -> Result<Rc<InitialColumnStats>, SbroadError> {
let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None).unwrap(); // Will be added later.
RouterRuntimeMock { todo!()
metadata: RefCell::new(RouterConfigurationMock::new()),
virtual_tables: HashMap::new(),
ir_cache: RefCell::new(cache),
}
} }
#[allow(dead_code)] #[allow(unused_variables)]
pub fn add_virtual_table(&mut self, id: usize, table: VirtualTable) { fn update_table_stats_cache(
self.virtual_tables.insert(id, table); &mut self,
table_name: String,
table_stats: TableStats,
) -> Result<(), SbroadError> {
// Will be added later.
todo!()
}
#[allow(unused_variables)]
fn update_column_initial_stats_cache(
&self,
table_column_pair: TableColumnPair,
initial_column_stats: InitialColumnStats,
) -> Result<(), SbroadError> {
// Will be added later.
todo!()
} }
} }
...@@ -19,11 +19,12 @@ use crate::cartridge::config::RouterConfiguration; ...@@ -19,11 +19,12 @@ use crate::cartridge::config::RouterConfiguration;
use crate::cartridge::update_tracing; use crate::cartridge::update_tracing;
use sbroad::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}; use sbroad::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
use sbroad::cbo::{TableColumnPair, TableStats};
use sbroad::errors::{Action, Entity, SbroadError}; use sbroad::errors::{Action, Entity, SbroadError};
use sbroad::executor::bucket::Buckets; use sbroad::executor::bucket::Buckets;
use sbroad::executor::engine::{ use sbroad::executor::engine::{
normalize_name_from_schema, sharding_keys_from_map, sharding_keys_from_tuple, Configuration, normalize_name_from_schema, sharding_keys_from_map, sharding_keys_from_tuple, Configuration,
Coordinator, CoordinatorMetadata, Coordinator, CoordinatorMetadata, InitialColumnStats, Statistics,
}; };
use sbroad::executor::hash::bucket_id_by_tuple; use sbroad::executor::hash::bucket_id_by_tuple;
use sbroad::executor::ir::{ConnectionType, ExecutionPlan, QueryType}; use sbroad::executor::ir::{ConnectionType, ExecutionPlan, QueryType};
...@@ -267,8 +268,8 @@ impl RouterRuntime { ...@@ -267,8 +268,8 @@ impl RouterRuntime {
} }
impl Coordinator for RouterRuntime { impl Coordinator for RouterRuntime {
type ParseTree = AbstractSyntaxTree;
type Cache = LRUCache<String, Plan>; type Cache = LRUCache<String, Plan>;
type ParseTree = AbstractSyntaxTree;
fn clear_ir_cache(&self) -> Result<(), SbroadError> { fn clear_ir_cache(&self) -> Result<(), SbroadError> {
*self.ir_cache.try_borrow_mut().map_err(|e| { *self.ir_cache.try_borrow_mut().map_err(|e| {
...@@ -397,6 +398,43 @@ impl Coordinator for RouterRuntime { ...@@ -397,6 +398,43 @@ impl Coordinator for RouterRuntime {
} }
} }
// Statistics support for the cartridge router runtime is not implemented yet:
// every method is a stub that panics via `todo!()` if called.
impl Statistics for RouterRuntime {
    #[allow(unused_variables)]
    fn get_table_stats(&self, table_name: String) -> Result<Rc<TableStats>, SbroadError> {
        // Will be added later.
        todo!()
    }

    #[allow(unused_variables)]
    fn get_initial_column_stats(
        &self,
        table_column_pair: TableColumnPair,
    ) -> Result<Rc<InitialColumnStats>, SbroadError> {
        // Will be added later.
        todo!()
    }

    #[allow(unused_variables)]
    fn update_table_stats_cache(
        &mut self,
        table_name: String,
        table_stats: TableStats,
    ) -> Result<(), SbroadError> {
        // Will be added later.
        todo!()
    }

    #[allow(unused_variables)]
    fn update_column_initial_stats_cache(
        &self,
        table_column_pair: TableColumnPair,
        initial_column_stats: InitialColumnStats,
    ) -> Result<(), SbroadError> {
        // Will be added later.
        todo!()
    }
}
impl RouterRuntime { impl RouterRuntime {
/// Create new Tarantool cartridge runtime. /// Create new Tarantool cartridge runtime.
/// ///
......
//! Cost Based Optimizer. //! Cost Based Optimizer.
//! //!
//! Module used to optimize IR tree using statistics and plan cost calculation algorithms. //! Module used to optimize IR tree using statistics and plan cost calculation algorithms.
//!
//! As soon as the biggest part of the logic is taken from
//! `PostgreSQL` implementation, you may see `PostgreSQL lines` comments
//! in some places with indication of function names and corresponding lines of code.
//! `PostgreSQL` version: `REL_15_2`.
use crate::cbo::histogram::Histogram; use crate::cbo::histogram::Histogram;
use crate::errors::{Entity, SbroadError}; use crate::errors::{Entity, SbroadError};
...@@ -9,7 +14,7 @@ use std::collections::HashMap; ...@@ -9,7 +14,7 @@ use std::collections::HashMap;
/// Struct representing statistics for the whole table. /// Struct representing statistics for the whole table.
#[derive(Debug, Clone, PartialEq)] #[derive(Debug, Clone, PartialEq)]
pub(crate) struct TableStats { pub struct TableStats {
/// Table name. /// Table name.
table_name: String, table_name: String,
/// Number of rows in the table. /// Number of rows in the table.
...@@ -26,6 +31,25 @@ pub(crate) struct TableStats { ...@@ -26,6 +31,25 @@ pub(crate) struct TableStats {
remove_counter: u32, remove_counter: u32,
} }
impl TableStats {
#[must_use]
pub fn new(
table_name: String,
rows_number: u64,
insert_counter: u32,
update_counter: u32,
remove_counter: u32,
) -> Self {
Self {
table_name,
rows_number,
insert_counter,
update_counter,
remove_counter,
}
}
}
/// Struct representing statistics for column. /// Struct representing statistics for column.
/// ///
/// May represent transformed statistics, appeared during application /// May represent transformed statistics, appeared during application
...@@ -44,13 +68,13 @@ pub(crate) struct ColumnStats<'col_stats> { ...@@ -44,13 +68,13 @@ pub(crate) struct ColumnStats<'col_stats> {
/// Number of elements in the column. /// Number of elements in the column.
/// ///
/// Note, that the field is filled only ofter `TableStats` for the column table is retrieved. /// Note, that the field is filled only ofter `TableStats` for the column table is retrieved.
elements_count: usize, rows_number: usize,
/// Min value in the column. /// Min value in the column.
min_value: &'col_stats Value, min_value: &'col_stats Value,
/// Max value in the column. /// Max value in the column.
max_value: &'col_stats Value, max_value: &'col_stats Value,
/// Average size of column row in bytes. /// Average size of column row in bytes.
avg_value_size: u64, avg_size: u64,
/// Compressed histogram (equi-height histogram with mcv array). /// Compressed histogram (equi-height histogram with mcv array).
/// ///
/// May have no values inside (`elements_count` field equal to 0) /// May have no values inside (`elements_count` field equal to 0)
...@@ -58,6 +82,37 @@ pub(crate) struct ColumnStats<'col_stats> { ...@@ -58,6 +82,37 @@ pub(crate) struct ColumnStats<'col_stats> {
histogram: &'col_stats Histogram<'col_stats>, histogram: &'col_stats Histogram<'col_stats>,
} }
#[allow(dead_code)]
impl<'column_stats> ColumnStats<'column_stats> {
    /// Create new `ColumnStats`.
    ///
    /// Parameter names follow the struct fields (`rows_number`, `avg_size`)
    /// after the fields were renamed from `elements_count`/`avg_value_size`;
    /// the rename is backward-compatible because Rust arguments are positional.
    ///
    /// * `rows_number` - number of rows in the column.
    /// * `min_value` / `max_value` - min and max values in the column.
    /// * `avg_size` - average size of column row in bytes.
    /// * `histogram` - compressed equi-height histogram with mcv array.
    #[must_use]
    pub fn new(
        rows_number: usize,
        min_value: &'column_stats Value,
        max_value: &'column_stats Value,
        avg_size: u64,
        histogram: &'column_stats Histogram,
    ) -> Self {
        Self {
            rows_number,
            min_value,
            max_value,
            avg_size,
            histogram,
        }
    }
}
/// Alias for pair of table name and column id (position) in the table.
/// Used as the key for column-statistics lookups.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct TableColumnPair(String, usize);

#[allow(dead_code)]
impl TableColumnPair {
    /// Create a new pair from a table name and a column index.
    pub(crate) fn new(table_name: String, column_id: usize) -> Self {
        Self(table_name, column_id)
    }
}
/// Structure for global optimizations /// Structure for global optimizations
/// that contains whole statistics information /// that contains whole statistics information
/// which may be useful for optimization. /// which may be useful for optimization.
...@@ -69,7 +124,7 @@ pub(crate) struct CostBasedOptimizer<'cbo> { ...@@ -69,7 +124,7 @@ pub(crate) struct CostBasedOptimizer<'cbo> {
/// that originates from `Scan` nodes during traversal of IR relational operators tree. /// that originates from `Scan` nodes during traversal of IR relational operators tree.
/// Used in `calculate_cost` function in the `Scan` node in order to retrieve stats for /// Used in `calculate_cost` function in the `Scan` node in order to retrieve stats for
/// requested columns. /// requested columns.
initial_column_stats: HashMap<(String, String), ColumnStats<'cbo>>, initial_column_stats: HashMap<TableColumnPair, ColumnStats<'cbo>>,
/// Vector of `Histogram` structures. /// Vector of `Histogram` structures.
/// Initially it's filled with histograms gathered from storages. /// Initially it's filled with histograms gathered from storages.
/// It's updated with new histograms during the statistics transformation process: /// It's updated with new histograms during the statistics transformation process:
...@@ -95,19 +150,19 @@ impl<'cbo> CostBasedOptimizer<'cbo> { ...@@ -95,19 +150,19 @@ impl<'cbo> CostBasedOptimizer<'cbo> {
/// Get `initial_column_stats` map. /// Get `initial_column_stats` map.
#[cfg(test)] #[cfg(test)]
fn get_initial_column_stats(&self) -> &HashMap<(String, String), ColumnStats> { fn get_initial_column_stats(&self) -> &HashMap<TableColumnPair, ColumnStats> {
&self.initial_column_stats &self.initial_column_stats
} }
/// Get value from `initial_column_stats` map by `key` /// Get value from `initial_column_stats` map by `key`
fn get_from_initial_column_stats(&self, key: &(String, String)) -> Option<&ColumnStats> { fn get_from_initial_column_stats(&self, key: &TableColumnPair) -> Option<&ColumnStats> {
self.initial_column_stats.get(key) self.initial_column_stats.get(key)
} }
/// Add new initial column stats to the `initial_column_stats` map. /// Add new initial column stats to the `initial_column_stats` map.
fn update_initial_column_stats( fn update_initial_column_stats(
&'cbo mut self, &'cbo mut self,
key: (String, String), key: TableColumnPair,
stats: ColumnStats<'cbo>, stats: ColumnStats<'cbo>,
) -> Option<ColumnStats> { ) -> Option<ColumnStats> {
self.initial_column_stats.insert(key, stats) self.initial_column_stats.insert(key, stats)
...@@ -141,4 +196,4 @@ impl<'cbo> CostBasedOptimizer<'cbo> { ...@@ -141,4 +196,4 @@ impl<'cbo> CostBasedOptimizer<'cbo> {
} }
} }
mod histogram; pub mod histogram;
...@@ -4,34 +4,66 @@ ...@@ -4,34 +4,66 @@
//! CBO algorithms. //! CBO algorithms.
use crate::errors::{Entity, SbroadError}; use crate::errors::{Entity, SbroadError};
use crate::ir::value::{value_to_decimal_or_error, Value}; use crate::ir::value::{value_to_decimal_or_error, TrivalentOrdering, Value};
use itertools::enumerate; use itertools::enumerate;
use std::fmt::Debug;
use std::str::FromStr; use std::str::FromStr;
/// Helper structure that represents pair of most common value in the column and its frequency. /// Helper structure that represents pair of most common value in the column and its frequency.
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
struct MostCommonValueWithFrequency { pub struct MostCommonValueWithFrequency {
value: Value, value: Value,
frequency: f64, frequency: f64,
} }
impl MostCommonValueWithFrequency { impl MostCommonValueWithFrequency {
#[allow(dead_code)] #[allow(dead_code)]
fn new(value: Value, frequency: f64) -> Self { pub(crate) fn new(value: Value, frequency: f64) -> Self {
MostCommonValueWithFrequency { value, frequency } MostCommonValueWithFrequency { value, frequency }
} }
} }
/// Representation of histogram bucket. /// Representation of histogram bucket.
#[derive(Clone, Debug, PartialEq)] /// Fields:
struct Bucket<'bucket> { /// `from` -- left border value of the bucket.
/// From (left border) value of the bucket (not inclusive, except for the first bucket) /// `to` -- right border value of the bucket (always inclusive).
pub from: &'bucket Value, /// `frequency` -- number of elements stored in the bucket.
/// To (right order) value of the bucket (inclusive) #[derive(PartialEq, Debug, Clone)]
pub to: &'bucket Value, #[allow(dead_code)]
/// Bucket frequency. enum Bucket<'bucket> {
/// Represents the number of elements stored in the bucket. /// Representation of the first histogram bucket with inclusive `from` edge.
pub frequency: usize, First {
from: &'bucket Value,
to: &'bucket Value,
frequency: usize,
},
/// Representation of a non-first histogram bucket with non-inclusive `from` edge.
NonFirst {
from: &'bucket Value,
to: &'bucket Value,
frequency: usize,
},
}
/// Checks whether given value falls into the bucket.
///
/// Returns `None` in case inner call to `partial_cmp` resulted to None.
#[allow(dead_code)]
fn value_falls_into_bucket(bucket: &Bucket, value: &Value) -> Option<bool> {
let (from, to, is_first) = match bucket {
Bucket::First { from, to, .. } => (from, to, true),
Bucket::NonFirst { from, to, .. } => (from, to, false),
};
let from_partial_cmp = value.partial_cmp(from)?;
let to_partial_cmp = value.partial_cmp(to)?;
if (TrivalentOrdering::Greater == from_partial_cmp
|| (is_first && TrivalentOrdering::Equal == from_partial_cmp))
&& (TrivalentOrdering::Less == to_partial_cmp || TrivalentOrdering::Equal == to_partial_cmp)
{
Some(true)
} else {
Some(false)
}
} }
/// Representation of equi-height histogram. /// Representation of equi-height histogram.
...@@ -39,12 +71,9 @@ struct Bucket<'bucket> { ...@@ -39,12 +71,9 @@ struct Bucket<'bucket> {
/// It's assumed that if the histogram is present, then all /// It's assumed that if the histogram is present, then all
/// its fields are filled. /// its fields are filled.
/// ///
/// As soon as the biggest part of the logic is taken from /// **Note**: We don't keep the number of rows stored in the corresponding column during the process
/// `PostgreSQL` implementation, you may see `PostgreSQL lines` comments /// of histogram creation in order to support cases of table size change. We always take the
/// in some places. It means you can find /// information about `rows_number` from `ColumnStats` of corresponding column.
/// implementation of `PostgreSQL` logic by searching the provided text.
///
/// `PostgreSQL` version: `REL_15_2`
#[derive(Debug, PartialEq, Clone)] #[derive(Debug, PartialEq, Clone)]
pub struct Histogram<'histogram> { pub struct Histogram<'histogram> {
// Most common values and their frequencies. // Most common values and their frequencies.
...@@ -59,20 +88,15 @@ pub struct Histogram<'histogram> { ...@@ -59,20 +88,15 @@ pub struct Histogram<'histogram> {
/// * ... /// * ...
/// * i = n -> (b_(n-2); b_(n-1)] /// * i = n -> (b_(n-2); b_(n-1)]
buckets: Vec<Bucket<'histogram>>, buckets: Vec<Bucket<'histogram>>,
/// Fraction of NULL values among all column values. /// Fraction of NULL values among all column rows.
/// Always positive value from 0 to 1.
null_fraction: f64, null_fraction: f64,
/// Number of distinct values for the whole histogram. /// Number of distinct values divided by the number of rows.
/// /// Always positive value from 0 to 1.
/// **Note**: It is easy during the histogram calculation
/// phase to calculate ndv as soon as the elements have to be sorted
/// in order to construct bucket_bounds Vec.
ndv: usize,
/// Number of elements added into histogram.
/// ///
/// **Note**: the number of values added into histogram don't /// **Note**: in order to calculate `number_of_distinct_values` (absolute value) we must
/// have to be equal to the number of rows in the table as soon as /// use formula `rows_number * (1 - null_fraction) * distinct_values_fraction`
/// some rows might have been added after the histogram was created. distinct_values_fraction: f64,
elements_count: usize,
} }
/// Helper structure that represents `String` char sequence. /// Helper structure that represents `String` char sequence.
......
...@@ -109,8 +109,8 @@ where ...@@ -109,8 +109,8 @@ where
plan = cached_plan.clone(); plan = cached_plan.clone();
} }
if plan.is_empty() { if plan.is_empty() {
let ast = C::ParseTree::new(sql)?;
let metadata = &*coordinator.cached_config()?; let metadata = &*coordinator.cached_config()?;
let ast = C::ParseTree::new(sql)?;
plan = ast.resolve_metadata(metadata)?; plan = ast.resolve_metadata(metadata)?;
cache.put(key, plan.clone())?; cache.put(key, plan.clone())?;
} }
......
...@@ -2,10 +2,13 @@ ...@@ -2,10 +2,13 @@
//! //!
//! Traits that define an execution engine interface. //! Traits that define an execution engine interface.
use crate::cbo::histogram::MostCommonValueWithFrequency;
use crate::cbo::{TableColumnPair, TableStats};
use std::any::Any; use std::any::Any;
use std::cell::{Ref, RefCell}; use std::cell::{Ref, RefCell};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::collections::HashMap; use std::collections::HashMap;
use std::rc::Rc;
use crate::errors::{Entity, SbroadError}; use crate::errors::{Entity, SbroadError};
use crate::executor::bucket::Buckets; use crate::executor::bucket::Buckets;
...@@ -161,6 +164,88 @@ pub trait Coordinator: Configuration { ...@@ -161,6 +164,88 @@ pub trait Coordinator: Configuration {
fn determine_bucket_id(&self, s: &[&Value]) -> u64; fn determine_bucket_id(&self, s: &[&Value]) -> u64;
} }
/// Enum that represents initial bucket gathered from storages.
/// It copies `Bucket` enum, where all fields are represented by value and not by reference.
#[allow(dead_code)]
#[derive(Debug, Clone)]
enum InitialBucket {
    /// Representation of the first histogram bucket with inclusive `from` edge.
    First {
        from: Value,
        to: Value,
        frequency: usize,
    },
    /// Representation of a non-first histogram bucket with non-inclusive `from` edge.
    NonFirst {
        from: Value,
        to: Value,
        frequency: usize,
    },
}
/// Struct that represents initial histogram gathered from storages.
/// It copies `Histogram` structure, where all fields are represented by value and not by reference.
#[allow(dead_code)]
#[derive(Debug, Clone)]
struct InitialHistogram {
    // Most common values and their frequencies.
    most_common: Vec<MostCommonValueWithFrequency>,
    // Equi-height buckets; the first one has an inclusive left edge.
    buckets: Vec<InitialBucket>,
    // Fraction of NULL values among all column rows (from 0 to 1).
    null_fraction: f64,
    // Number of distinct values divided by the number of rows (from 0 to 1).
    distinct_values_fraction: f64,
}
/// Struct that represents initial statistics gathered from storages.
/// It copies `ColumnStats` structure, where all fields are represented by value and not by reference.
///
/// **Note**: `rows_number` field is missing, because during `ColumnStats`
/// structure initialization this information will be passed from `TableStats`.
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub struct InitialColumnStats {
    // Min value in the column.
    min_value: Value,
    // Max value in the column.
    max_value: Value,
    // Average size of column row in bytes.
    avg_size: u64,
    // Initial histogram gathered from storages.
    histogram: InitialHistogram,
}
/// A `CostBased` statistics trait.
pub trait Statistics {
    /// Get `TableStats` for table by its name from storages.
    ///
    /// # Errors
    /// - Table statistics can not be gathered neither from the cache nor from the storages.
    fn get_table_stats(&self, table_name: String) -> Result<Rc<TableStats>, SbroadError>;

    /// Get `InitialColumnStats` for column by its table name and column position
    /// (see `TableColumnPair`) from storages.
    ///
    /// # Errors
    /// - Initial column statistics can not be gathered neither from the cache nor from the storages.
    fn get_initial_column_stats(
        &self,
        table_column_pair: TableColumnPair,
    ) -> Result<Rc<InitialColumnStats>, SbroadError>;

    /// Update `TableStats` cache with given table statistics.
    ///
    /// # Errors
    /// - Table statistics couldn't be mutably borrowed.
    fn update_table_stats_cache(
        &mut self,
        table_name: String,
        table_stats: TableStats,
    ) -> Result<(), SbroadError>;

    /// Update `InitialColumnStats` cache with given initial column statistics.
    ///
    /// # Errors
    /// - Initial column statistics couldn't be mutably borrowed.
    fn update_column_initial_stats_cache(
        &self,
        table_column_pair: TableColumnPair,
        initial_column_stats: InitialColumnStats,
    ) -> Result<(), SbroadError>;
}
/// A common function for all engines to calculate the sharding key value from a tuple. /// A common function for all engines to calculate the sharding key value from a tuple.
/// ///
/// # Errors /// # Errors
......
use std::any::Any; use std::any::Any;
use std::cell::{Ref, RefCell}; use std::cell::{Ref, RefCell};
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::rc::Rc;
use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan}; use crate::backend::sql::tree::{OrderedSyntaxNodes, SyntaxPlan};
use crate::cbo::histogram::MostCommonValueWithFrequency;
use crate::cbo::{TableColumnPair, TableStats};
use crate::collection; use crate::collection;
use crate::errors::{Action, Entity, SbroadError}; use crate::errors::{Action, Entity, SbroadError};
use crate::executor::bucket::Buckets; use crate::executor::bucket::Buckets;
use crate::executor::engine::{ use crate::executor::engine::{
normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration, normalize_name_from_sql, sharding_keys_from_map, sharding_keys_from_tuple, Configuration,
Coordinator, Coordinator, InitialBucket, InitialColumnStats, InitialHistogram, Statistics,
}; };
use crate::executor::hash::bucket_id_by_tuple; use crate::executor::hash::bucket_id_by_tuple;
use crate::executor::ir::ExecutionPlan; use crate::executor::ir::ExecutionPlan;
...@@ -251,6 +254,8 @@ pub struct RouterRuntimeMock { ...@@ -251,6 +254,8 @@ pub struct RouterRuntimeMock {
metadata: RefCell<RouterConfigurationMock>, metadata: RefCell<RouterConfigurationMock>,
virtual_tables: RefCell<HashMap<usize, VirtualTable>>, virtual_tables: RefCell<HashMap<usize, VirtualTable>>,
ir_cache: RefCell<LRUCache<String, Plan>>, ir_cache: RefCell<LRUCache<String, Plan>>,
table_statistics_cache: RefCell<HashMap<String, Rc<TableStats>>>,
initial_column_statistics_cache: RefCell<HashMap<TableColumnPair, Rc<InitialColumnStats>>>,
} }
impl std::fmt::Debug for RouterRuntimeMock { impl std::fmt::Debug for RouterRuntimeMock {
...@@ -258,6 +263,8 @@ impl std::fmt::Debug for RouterRuntimeMock { ...@@ -258,6 +263,8 @@ impl std::fmt::Debug for RouterRuntimeMock {
f.debug_tuple("") f.debug_tuple("")
.field(&self.metadata) .field(&self.metadata)
.field(&self.virtual_tables) .field(&self.virtual_tables)
.field(&self.table_statistics_cache)
.field(&self.initial_column_statistics_cache)
.finish() .finish()
} }
} }
...@@ -320,6 +327,277 @@ impl Configuration for RouterRuntimeMock { ...@@ -320,6 +327,277 @@ impl Configuration for RouterRuntimeMock {
} }
} }
impl Default for RouterRuntimeMock {
fn default() -> Self {
Self::new()
}
}
impl RouterRuntimeMock {
    /// Build a mock runtime pre-populated with table and column statistics
    /// fixtures for `hash_testing_hist`, `hash_testing` and `test_space`.
    #[allow(dead_code)]
    #[allow(clippy::missing_panics_doc)]
    #[must_use]
    pub fn new() -> Self {
        let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None).unwrap();

        // Per-table statistics keyed by the quoted (normalized) table name.
        let mut table_statistics_cache = HashMap::new();
        table_statistics_cache.insert(
            "\"hash_testing_hist\"".to_string(),
            Rc::new(TableStats::new(
                "hash_testing_hist".to_string(),
                100,
                2100,
                1000,
                2000,
            )),
        );
        table_statistics_cache.insert(
            "\"hash_testing\"".to_string(),
            Rc::new(TableStats::new(
                "hash_testing".to_string(),
                1000,
                1200,
                300,
                200,
            )),
        );
        table_statistics_cache.insert(
            "\"test_space\"".to_string(),
            Rc::new(TableStats::new(
                "test_space".to_string(),
                2500,
                3000,
                1000,
                500,
            )),
        );

        // Note that `rows_number` field in inserted column statistics must be equal to the
        // `rows_number` field in the corresponding table.
        let mut column_statistics_cache = HashMap::new();

        // Column statistics with empty histogram.
        //
        // * rows_number: 100
        // * min_value: 1
        // * max_value: 50
        // * avg_size: 4
        // * histogram:
        //   - null_fraction: 0.0
        //   - most_common: []
        //   - ndv (absolute value): 0
        //   - buckets_count: 0
        //   - buckets_frequency: 0
        //   - buckets_boundaries: []
        column_statistics_cache.insert(
            TableColumnPair::new("\"hash_testing_hist\"".to_string(), 0),
            Rc::new(InitialColumnStats {
                min_value: Value::Integer(1),
                max_value: Value::Integer(50),
                avg_size: 4,
                histogram: InitialHistogram {
                    most_common: vec![],
                    buckets: vec![],
                    null_fraction: 0.0,
                    distinct_values_fraction: 0.0,
                },
            }),
        );

        // Casual column statistics.
        //
        // Values `min_value` and `max_value` of `ColumnStats` structure are in fact
        // displaying MIN and MAX values of `Histogram` structure (that is seen from its
        // `most_common` and `buckets_boundaries` fields).
        // An example of statistics, where general column statistics and histogram statistics conform.
        //
        // * rows_number: 1000
        // * min_value: 0
        // * max_value: 15
        // * avg_size: 8
        // * histogram:
        //   - null_fraction: 0.1 (100)
        //   - most_common:
        //     [0 -> 100,
        //      1 -> 100,
        //      2 -> 50,
        //      3 -> 50,
        //      4 -> 100]
        //   - ndv (absolute value): 15 (only 10 in buckets)
        //   - buckets_count: 5
        //   - buckets_frequency: 100 (as only 500 elements are stored in buckets)
        //   - buckets_boundaries: [5, 7, 9, 11, 13, 15]
        column_statistics_cache.insert(
            TableColumnPair::new("\"hash_testing\"".to_string(), 0),
            Rc::new(InitialColumnStats {
                min_value: Value::Integer(0),
                max_value: Value::Integer(15),
                avg_size: 8,
                histogram: InitialHistogram {
                    most_common: vec![
                        MostCommonValueWithFrequency::new(Value::Integer(0), 100.0),
                        MostCommonValueWithFrequency::new(Value::Integer(1), 100.0),
                        MostCommonValueWithFrequency::new(Value::Integer(2), 50.0),
                        MostCommonValueWithFrequency::new(Value::Integer(3), 50.0),
                        MostCommonValueWithFrequency::new(Value::Integer(4), 100.0),
                    ],
                    buckets: vec![
                        InitialBucket::First {
                            from: Value::Integer(5),
                            to: Value::Integer(7),
                            frequency: 100,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(7),
                            to: Value::Integer(9),
                            frequency: 100,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(9),
                            to: Value::Integer(11),
                            frequency: 100,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(11),
                            to: Value::Integer(13),
                            frequency: 100,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(13),
                            to: Value::Integer(15),
                            frequency: 100,
                        },
                    ],
                    null_fraction: 0.1,
                    // 15 / (1000 * (1 - 0.1)) ~ 15
                    distinct_values_fraction: 0.01666,
                },
            }),
        );

        // Column statistics with unique values.
        // Note that it's also a column statistics with no `most_common` values.
        //
        // * rows_number: 1000
        // * min_value: 1
        // * max_value: 90
        // * avg_size: 4
        // * histogram:
        //   - null_fraction: 0.1 (100)
        //   - most_common: []
        //   - ndv (absolute value): 900
        //   - buckets_count: 3
        //   - buckets_frequency: 300 (as all 900 left elements are stored in buckets)
        //   - buckets_boundaries: [1, 40, 65, 90]
        //
        // NOTE(review): the comment above says `max_value: 90`, but the code below
        // inserts `Value::Integer(900)` while the last bucket boundary is 90 —
        // one of the two looks like a typo; confirm the intended fixture value.
        column_statistics_cache.insert(
            TableColumnPair::new("\"hash_testing\"".to_string(), 1),
            Rc::new(InitialColumnStats {
                min_value: Value::Integer(1),
                max_value: Value::Integer(900),
                avg_size: 4,
                histogram: InitialHistogram {
                    most_common: vec![],
                    buckets: vec![
                        InitialBucket::First {
                            from: Value::Integer(1),
                            to: Value::Integer(40),
                            frequency: 300,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(40),
                            to: Value::Integer(65),
                            frequency: 300,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(65),
                            to: Value::Integer(90),
                            frequency: 300,
                        },
                    ],
                    null_fraction: 0.1,
                    // 900 / (1000 * (1 - 0.1)) = 1
                    distinct_values_fraction: 1.0,
                },
            }),
        );

        // Casual column statistics, but for different column in different table.
        //
        // Values `min_value` and `max_value` of `ColumnStats` structure differ
        // from MIN and MAX values that we can get from `Histogram` structure (that is seen from its
        // `most_common` and `buckets_boundaries` fields).
        // An example of statistics, where general column statistics and histogram statistics
        // do NOT conform. That means histogram was gathered before updates to the corresponding table were made.
        //
        // * rows_number: 2500
        // * min_value: 1
        // * max_value: 2000
        // * avg_size: 8
        // * histogram:
        //   - null_fraction: 0.2 (500)
        //   - most_common:
        //     [3 -> 250,
        //      4 -> 50,
        //      5 -> 50,
        //      6 -> 150]
        //   - ndv: 104 (only 100 in buckets)
        //   - buckets_count: 4
        //   - buckets_frequency: 375 (as only 1500 elements are stored in buckets)
        //   - buckets_boundaries: [0, 78, 200, 780, 1800]
        column_statistics_cache.insert(
            TableColumnPair::new("\"test_space\"".to_string(), 0),
            Rc::new(InitialColumnStats {
                min_value: Value::Integer(1),
                max_value: Value::Integer(2000),
                avg_size: 8,
                histogram: InitialHistogram {
                    most_common: vec![
                        MostCommonValueWithFrequency::new(Value::Integer(3), 150.0),
                        MostCommonValueWithFrequency::new(Value::Integer(4), 50.0),
                        MostCommonValueWithFrequency::new(Value::Integer(5), 50.0),
                        MostCommonValueWithFrequency::new(Value::Integer(6), 150.0),
                    ],
                    buckets: vec![
                        InitialBucket::First {
                            from: Value::Integer(0),
                            to: Value::Integer(78),
                            frequency: 375,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(78),
                            to: Value::Integer(200),
                            frequency: 375,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(200),
                            to: Value::Integer(780),
                            frequency: 375,
                        },
                        InitialBucket::NonFirst {
                            from: Value::Integer(780),
                            to: Value::Integer(1800),
                            frequency: 375,
                        },
                    ],
                    null_fraction: 0.2,
                    // 104 / (2500 * (1 - 0.2)) = 0.052
                    distinct_values_fraction: 0.052,
                },
            }),
        );

        RouterRuntimeMock {
            metadata: RefCell::new(RouterConfigurationMock::new()),
            virtual_tables: RefCell::new(HashMap::new()),
            ir_cache: RefCell::new(cache),
            table_statistics_cache: RefCell::new(table_statistics_cache),
            initial_column_statistics_cache: RefCell::new(column_statistics_cache),
        }
    }

    /// Register a virtual table under the given node id.
    #[allow(dead_code)]
    pub fn add_virtual_table(&self, id: usize, table: VirtualTable) {
        self.virtual_tables.borrow_mut().insert(id, table);
    }
}
impl Coordinator for RouterRuntimeMock { impl Coordinator for RouterRuntimeMock {
type ParseTree = AbstractSyntaxTree; type ParseTree = AbstractSyntaxTree;
type Cache = LRUCache<String, Plan>; type Cache = LRUCache<String, Plan>;
...@@ -407,28 +685,88 @@ impl Coordinator for RouterRuntimeMock { ...@@ -407,28 +685,88 @@ impl Coordinator for RouterRuntimeMock {
} }
} }
impl Default for RouterRuntimeMock { impl Statistics for RouterRuntimeMock {
fn default() -> Self { fn get_table_stats(&self, table_name: String) -> Result<Rc<TableStats>, SbroadError> {
Self::new() if let Ok(borrow_res) = self.table_statistics_cache.try_borrow() {
if let Some(value) = borrow_res.get(table_name.as_str()) {
Ok(value.clone())
} else {
Err(SbroadError::NotFound(
Entity::Statistics,
String::from("Mocked statistics for table {table_name} wasn't found"),
))
}
} else {
Err(SbroadError::Invalid(
Entity::Statistics,
Some(String::from("Couldn't borrow table statistics")),
))
}
} }
}
impl RouterRuntimeMock { fn get_initial_column_stats(
#[allow(dead_code)] &self,
#[allow(clippy::missing_panics_doc)] table_column_pair: TableColumnPair,
#[must_use] ) -> Result<Rc<InitialColumnStats>, SbroadError> {
pub fn new() -> Self { if let Ok(borrow_res) = self.initial_column_statistics_cache.try_borrow() {
let cache: LRUCache<String, Plan> = LRUCache::new(DEFAULT_CAPACITY, None).unwrap(); if let Some(value) = borrow_res.get(&table_column_pair) {
RouterRuntimeMock { Ok(value.clone())
metadata: RefCell::new(RouterConfigurationMock::new()), } else {
virtual_tables: RefCell::new(HashMap::new()), Err(SbroadError::NotFound(
ir_cache: RefCell::new(cache), Entity::Statistics,
String::from(
"Mocked statistics for table/column pair {table_column_paid} wasn't found",
),
))
}
} else {
Err(SbroadError::Invalid(
Entity::Statistics,
Some(String::from("Couldn't borrow initial column statistics")),
))
} }
} }
#[allow(dead_code)] fn update_table_stats_cache(
pub fn add_virtual_table(&self, id: usize, table: VirtualTable) { &mut self,
self.virtual_tables.borrow_mut().insert(id, table); table_name: String,
table_stats: TableStats,
) -> Result<(), SbroadError> {
if let Ok(mut borrow_res) = self.table_statistics_cache.try_borrow_mut() {
let value = borrow_res.get_mut(table_name.as_str());
if let Some(value) = value {
*value = Rc::new(table_stats)
} else {
borrow_res.insert(table_name, Rc::new(table_stats));
}
Ok(())
} else {
Err(SbroadError::Invalid(
Entity::Statistics,
Some(String::from("Couldn't borrow table statistics")),
))
}
}
fn update_column_initial_stats_cache(
&self,
table_column_pair: TableColumnPair,
initial_column_stats: InitialColumnStats,
) -> Result<(), SbroadError> {
if let Ok(mut borrow_res) = self.initial_column_statistics_cache.try_borrow_mut() {
let value = borrow_res.get_mut(&table_column_pair);
if let Some(value) = value {
*value = Rc::new(initial_column_stats)
} else {
borrow_res.insert(table_column_pair, Rc::new(initial_column_stats));
}
Ok(())
} else {
Err(SbroadError::Invalid(
Entity::Statistics,
Some(String::from("Couldn't borrow initial column statistics")),
))
}
} }
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment