executor.rs 14.17 KiB
//! Executor module.
//!
//! The executor is located on the coordinator node in the cluster.
//! It collects all the intermediate results of the plan execution
//! in memory and executes the IR plan tree in a bottom-up manner.
//! It goes like this:
//!
//! 1. The executor collects all the motion nodes from the bottom layer.
//! In theory all the motions in the same layer can be executed in parallel
//! (this feature is yet to come).
//! 2. For every motion the executor:
//! - inspects the IR sub-tree and detects the buckets to execute the query for.
//! - builds a valid SQL query from the IR sub-tree.
//! - performs map-reduce for that SQL query (we send it to the shards deduced from the buckets).
//! - builds a virtual table with query results that correspond to the original motion.
//! 3. Moves to the next motion layer in the IR tree.
//! 4. For every motion the executor then:
//! - links the virtual table results of the motion from the previous layer we depend on.
//! - inspects the IR sub-tree and detects the buckets to execute the query.
//! - builds a valid SQL query from the IR sub-tree.
//! - performs map-reduce for that SQL query.
//! - builds a virtual table with query results that correspond to the original motion.
//! 5. Repeats steps 3 and 4 until we are done with motion layers.
//! 6. Executes the final IR top subtree and returns the final result to the user.
use std::any::Any;
use std::collections::HashMap;
use std::rc::Rc;
use self::engine::query_id;
use crate::errors::{Entity, SbroadError};
use crate::executor::bucket::Buckets;
use crate::executor::engine::{Router, TableVersionMap, Vshard};
use crate::executor::ir::ExecutionPlan;
use crate::executor::lru::Cache;
use crate::frontend::Ast;
use crate::ir::node::relational::Relational;
use crate::ir::node::{Motion, NodeId};
use crate::ir::transformation::redistribution::MotionPolicy;
use crate::ir::value::Value;
use crate::ir::{Options, Plan, Slices};
use crate::utils::MutexLike;
use smol_str::SmolStr;
pub mod bucket;
pub mod engine;
pub mod hash;
pub mod ir;
pub mod lru;
pub mod protocol;
pub mod result;
pub mod vtable;
impl Plan {
/// Apply optimization rules to the plan.
///
/// The transformation passes are run one after another in the order they
/// are written below; the first failing pass aborts the optimization and
/// its error is returned to the caller.
///
/// # Errors
/// - Failed to optimize the plan.
pub fn optimize(&mut self) -> Result<(), SbroadError> {
// NOTE(review): the passes presumably depend on each other's output, so
// the order is assumed to be significant - confirm before reordering.
self.cast_constants()?;
self.replace_in_operator()?;
self.push_down_not()?;
self.split_columns()?;
self.set_dnf()?;
self.derive_equalities()?;
self.merge_tuples()?;
// Motions are added last, after all the rewrites above have succeeded.
self.add_motions()?;
Ok(())
}
}
/// A query to be executed through the coordinator runtime.
#[derive(Debug)]
pub struct Query<'a, C: Router> {
    /// Explain flag: when set, the query is explained instead of being executed.
    is_explain: bool,
    /// Execution plan of the query.
    exec_plan: ExecutionPlan,
    /// Coordinator runtime the query is bound to.
    coordinator: &'a C,
    /// Bucket map of view { plan output_id (Expression::Row) -> `Buckets` }.
    /// It's supposed to denote relational nodes' output buckets destination.
    bucket_map: HashMap<NodeId, Buckets>,
}
impl<'a, C> Query<'a, C>
where
C: Router,
{
pub fn from_parts(
is_explain: bool,
exec_plan: ExecutionPlan,
coordinator: &'a C,
bucket_map: HashMap<NodeId, Buckets>,
) -> Self {
Self {
is_explain,
exec_plan,
coordinator,
bucket_map,
}
}
/// Create a new query.
///
/// # Errors
/// - Failed to parse SQL.
/// - Failed to build AST.
/// - Failed to build IR plan.
/// - Failed to apply optimizing transformations to IR plan.
pub fn with_options(
coordinator: &'a C,
sql: &str,
params: Vec<Value>,
options: Option<Options>,
) -> Result<Self, SbroadError>
where
C::Cache: Cache<SmolStr, Plan>,
C::ParseTree: Ast,
{
let key = query_id(sql);
let mut cache = coordinator.cache().lock();
let mut plan = Plan::new();
if let Some(cached_plan) = cache.get(&key)? {
plan = cached_plan.clone();
}
if plan.is_empty() {
let metadata = coordinator.metadata().lock();
plan = C::ParseTree::transform_into_plan(sql, &*metadata)?;
// Empty query.
if plan.is_empty() {
return Ok(Query::empty(coordinator));
}
if coordinator.provides_versions() {
let mut table_version_map =
TableVersionMap::with_capacity(plan.relations.tables.len());
for (tbl_name, tbl) in &plan.relations.tables {
if tbl.is_system() {
continue;
}
let normalized = tbl_name;
let version = coordinator.get_table_version(normalized.as_str())?;
table_version_map.insert(normalized.clone(), version);
}
plan.version_map = table_version_map;
}
if !plan.is_ddl()? && !plan.is_acl()? && !plan.is_plugin()? {
cache.put(key, plan.clone())?;
}
}
if let Some(options) = options {
plan.options = options;
}
if plan.is_block()? {
plan.bind_params(params)?;
} else if !plan.is_ddl()?
&& !plan.is_acl()?
&& !plan.is_plugin()?
&& !plan.is_deallocate()?
&& !plan.is_tcl()?
{
plan.bind_params(params)?;
plan.apply_options()?;
plan.optimize()?;
}
let query = Query {
is_explain: plan.is_explain(),
exec_plan: ExecutionPlan::from(plan),
coordinator,
bucket_map: HashMap::new(),
};
Ok(query)
}
/// Create a new query.
///
/// # Errors
/// - Failed to parse SQL.
/// - Failed to build AST.
/// - Failed to build IR plan.
/// - Failed to apply optimizing transformations to IR plan.
pub fn new(coordinator: &'a C, sql: &str, params: Vec<Value>) -> Result<Self, SbroadError>
where
C::Cache: Cache<SmolStr, Plan>,
C::ParseTree: Ast,
{
Self::with_options(coordinator, sql, params, None)
}
fn empty(coordinator: &'a C) -> Self {
Self {
is_explain: false,
exec_plan: Plan::empty().into(),
coordinator,
bucket_map: Default::default(),
}
}
/// Get a shared reference to the execution plan of the query.
#[must_use]
pub fn get_exec_plan(&self) -> &ExecutionPlan {
&self.exec_plan
}
/// Get a mutable reference to the execution plan of the query.
#[must_use]
pub fn get_mut_exec_plan(&mut self) -> &mut ExecutionPlan {
&mut self.exec_plan
}
/// Get the coordinator runtime the query was created with.
#[must_use]
pub fn get_coordinator(&self) -> &C {
self.coordinator
}
/// Materialize the motions of the given slices into virtual tables.
///
/// Slices are processed in order and the motions of a slice one by one.
/// A motion that already has a virtual table is skipped. For the others:
/// - a `Segment` motion with a `Values` child is materialized on the
///   router and its subtree is unlinked from the plan;
/// - `LocalSegment` and `Local` motions are skipped here (they are
///   dispatched to the segments);
/// - any other motion (including a `Segment` motion whose child is not
///   `Values`) gets its buckets discovered from the motion subtree root
///   and is materialized through the coordinator.
///
/// # Errors
/// - Failed to get the vshard object for the plan tier.
/// - Failed to get a motion node, its child or its subtree root.
/// - Failed to discover buckets.
/// - Failed to materialize a motion or to store its virtual table.
pub fn materialize_subtree(&mut self, slices: Slices) -> Result<(), SbroadError> {
let tier = self.exec_plan.get_ir_plan().tier.as_ref();
// all tables from one tier, so we can use corresponding vshard object
let vshard = self.coordinator.get_vshard_object_by_tier(tier)?;
for slice in slices.slices() {
// TODO: make it work in parallel
for motion_id in slice.positions() {
// The motion already has a virtual table: nothing to do.
if let Some(vtables_map) = self.exec_plan.get_vtables() {
if vtables_map.contains_key(motion_id) {
continue;
}
}
let motion = self.exec_plan.get_ir_plan().get_relation_node(*motion_id)?;
if let Relational::Motion(Motion { policy, .. }) = motion {
match policy {
MotionPolicy::Segment(_) => {
// If child is values, then we can materialize it
// on the router.
let motion_child_id =
self.get_exec_plan().get_motion_child(*motion_id)?;
let motion_child = self
.get_exec_plan()
.get_ir_plan()
.get_relation_node(motion_child_id)?;
if matches!(motion_child, Relational::Values { .. }) {
let virtual_table = self
.coordinator
.materialize_values(&mut self.exec_plan, motion_child_id)?;
self.exec_plan.set_motion_vtable(
motion_id,
virtual_table,
&vshard,
)?;
self.get_mut_exec_plan().unlink_motion_subtree(*motion_id)?;
continue;
}
// Otherwise fall through to the generic materialization below.
}
// Skip it and dispatch the query to the segments
// (materialization would be done on the segments). Note that we
// will operate with vtables for LocalSegment motions via calls like
// `self.exec_plan.contains_vtable_for_motion(node_id)`
// in order to define whether virtual table was materialized for values.
MotionPolicy::LocalSegment(_) => {
continue;
}
// Local policy should be skipped and dispatched to the segments:
// materialization would be done there.
MotionPolicy::Local => continue,
_ => {}
}
}
// Generic path: discover the buckets of the motion subtree and
// materialize the motion through the coordinator.
let top_id = self.exec_plan.get_motion_subtree_root(*motion_id)?;
let buckets = self.bucket_discovery(top_id)?;
let virtual_table = self.coordinator.materialize_motion(
&mut self.exec_plan,
motion_id,
&buckets,
)?;
self.exec_plan
.set_motion_vtable(motion_id, virtual_table, &vshard)?;
}
}
Ok(())
}
/// Build the explain of the current query and format it with the coordinator.
///
/// # Errors
/// - Failed to build explain
pub fn produce_explain(&mut self) -> Result<Box<dyn Any>, SbroadError> {
    let explain = self.to_explain()?;
    self.coordinator.explain_format(explain)
}
/// Dispatch a distributed query from coordinator to the segments.
///
/// # Errors
/// - Failed to get a motion subtree.
/// - Failed to discover buckets.
/// - Failed to materialize motion result and build a virtual table.
/// - Failed to get plan top.
pub fn dispatch(&mut self) -> Result<Box<dyn Any>, SbroadError> {
if self.is_explain() {
return self.produce_explain();
}
self.get_mut_exec_plan()
.get_mut_ir_plan()
.restore_constants()?;
let slices = self.exec_plan.get_ir_plan().clone_slices();
self.materialize_subtree(slices)?;
let ir_plan = self.exec_plan.get_ir_plan();
let top_id = ir_plan.get_top()?;
if ir_plan.get_relation_node(top_id)?.is_motion() {
let err =
|s: &str| -> SbroadError { SbroadError::Invalid(Entity::Plan, Some(s.into())) };
let motion_aliases = ir_plan.get_relational_aliases(top_id)?;
let Some(vtables) = self.exec_plan.get_mut_vtables() else {
return Err(err("no vtables in plan with motion top"));
};
let Some(mut vtable) = vtables.remove(&top_id) else {
return Err(err(&format!("no motion on top_id: {top_id:?}")));
};
let Some(v) = Rc::get_mut(&mut vtable) else {
return Err(err("there are other refs to vtable"));
};
return v.to_output(&motion_aliases);
}
let buckets = self.bucket_discovery(top_id)?;
self.coordinator.dispatch(
&mut self.exec_plan,
top_id,
&buckets,
engine::DispatchReturnFormat::Tuple,
)
}
/// Build the explain of the query as a string.
///
/// Delegates to `as_explain`.
///
/// # Errors
/// - Failed to build explain
pub fn to_explain(&mut self) -> Result<SmolStr, SbroadError> {
self.as_explain()
}
/// Checks whether the query is an explain one, i.e. it has to be
/// explained rather than executed.
pub fn is_explain(&self) -> bool {
self.is_explain
}
/// Checks whether the IR plan of the query is a statement block.
///
/// # Errors
/// - Plan is invalid.
pub fn is_block(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_block()
}
/// Checks whether the IR plan of the query is DDL.
///
/// # Errors
/// - Plan is invalid.
pub fn is_ddl(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_ddl()
}
/// Checks whether the IR plan of the query is ACL.
///
/// # Errors
/// - Plan is invalid.
pub fn is_acl(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_acl()
}
/// Checks whether the IR plan of the query is TCL.
///
/// # Errors
/// - Plan is invalid.
pub fn is_tcl(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_tcl()
}
/// Get the node id stored at position `pos_idx` of the slice `slice_id`
/// (test-only helper).
///
/// # Panics
/// - There is no slice `slice_id` or no position `pos_idx` in that slice.
#[cfg(test)]
pub fn get_motion_id(&self, slice_id: usize, pos_idx: usize) -> NodeId {
    let slices = self.exec_plan.get_ir_plan().clone_slices();
    let slice = slices.slice(slice_id).unwrap();
    let motion_id = slice.position(pos_idx).unwrap();
    *motion_id
}
/// Get the buckets stored in the bucket map for the given output node.
///
/// Returns `None` when the map has no entry for `output_id`.
#[must_use]
pub fn get_buckets(&self, output_id: NodeId) -> Option<&Buckets> {
self.bucket_map.get(&output_id)
}
/// Checks whether the IR plan of the query is a plugin statement.
///
/// # Errors
/// - Plan is invalid.
pub fn is_plugin(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_plugin()
}
/// Checks whether the IR plan of the query is Deallocate.
///
/// # Errors
/// - Plan is invalid.
pub fn is_deallocate(&self) -> Result<bool, SbroadError> {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_deallocate()
}
/// Checks whether the IR plan of the query is empty.
pub fn is_empty(&self) -> bool {
    let ir_plan = self.get_exec_plan().get_ir_plan();
    ir_plan.is_empty()
}
}
#[cfg(test)]
mod tests;