Skip to content
Snippets Groups Projects
executor.rs 14.17 KiB
//! Executor module.
//!
//! The executor is located on the coordinator node in the cluster.
//! It collects all the intermediate results of the plan execution
//! in memory and executes the IR plan tree in the bottom-up manner.
//! It goes like this:
//!
//! 1. The executor collects all the motion nodes from the bottom layer.
//!    In theory all the motions in the same layer can be executed in parallel
//!    (this feature is yet to come).
//! 2. For every motion the executor:
//!    - inspects the IR sub-tree and detects the buckets to execute the query for.
//!    - builds a valid SQL query from the IR sub-tree.
//!    - performs map-reduce for that SQL query (we send it to the shards deduced from the buckets).
//!    - builds a virtual table with query results that correspond to the original motion.
//! 3. Moves to the next motion layer in the IR tree.
//! 4. For every motion the executor then:
//!    - links the virtual table results of the motion from the previous layer we depend on.
//!    - inspects the IR sub-tree and detects the buckets to execute the query for.
//!    - builds a valid SQL query from the IR sub-tree.
//!    - performs map-reduce for that SQL query.
//!    - builds a virtual table with query results that correspond to the original motion.
//! 5. Repeats steps 3 and 4 until all the motion layers are processed.
//! 6. Executes the final IR top subtree and returns the final result to the user.

use std::any::Any;
use std::collections::HashMap;
use std::rc::Rc;

use self::engine::query_id;
use crate::errors::{Entity, SbroadError};
use crate::executor::bucket::Buckets;
use crate::executor::engine::{Router, TableVersionMap, Vshard};
use crate::executor::ir::ExecutionPlan;
use crate::executor::lru::Cache;
use crate::frontend::Ast;
use crate::ir::node::relational::Relational;
use crate::ir::node::{Motion, NodeId};
use crate::ir::transformation::redistribution::MotionPolicy;
use crate::ir::value::Value;
use crate::ir::{Options, Plan, Slices};
use crate::utils::MutexLike;
use smol_str::SmolStr;

pub mod bucket;
pub mod engine;
pub mod hash;
pub mod ir;
pub mod lru;
pub mod protocol;
pub mod result;
pub mod vtable;

impl Plan {
    /// Apply optimization rules to the plan.
    ///
    /// The passes below are invoked on `self` strictly in the listed order.
    /// The first pass that returns an error aborts the whole optimization:
    /// its error is propagated with `?` and the remaining passes are not run.
    ///
    /// NOTE(review): the order is presumably significant (e.g. `add_motions`
    /// looks like it is meant to run after the expression rewrites). Confirm
    /// before reordering, adding or removing passes.
    ///
    /// # Errors
    /// - Failed to optimize the plan.
    pub fn optimize(&mut self) -> Result<(), SbroadError> {
        self.cast_constants()?;
        self.replace_in_operator()?;
        self.push_down_not()?;
        self.split_columns()?;
        self.set_dnf()?;
        self.derive_equalities()?;
        self.merge_tuples()?;
        self.add_motions()?;
        Ok(())
    }
}

/// Query to execute.
///
/// Owns the execution plan of the query and borrows the coordinator runtime `C`.
/// The coordinator is used to materialize motions and to dispatch the plan.
#[derive(Debug)]
pub struct Query<'a, C>
where
    C: Router,
{
    /// Whether the query is an `EXPLAIN`: such a query is explained, not executed.
    is_explain: bool,
    /// Execution plan: the IR plan together with the virtual tables of the
    /// motions materialized so far.
    exec_plan: ExecutionPlan,
    /// Coordinator runtime the query is planned and dispatched with.
    coordinator: &'a C,
    /// Bucket map of view { plan output_id (Expression::Row) -> `Buckets` }.
    /// It maps a relational node's output id to the buckets that node's output
    /// is destined for.
    ///
    /// NOTE(review): created empty when a query is built from SQL; presumably
    /// filled during bucket discovery. Confirm.
    bucket_map: HashMap<NodeId, Buckets>,
}

impl<'a, C> Query<'a, C>
where
    C: Router,
{
    /// Assemble a query directly from its parts.
    ///
    /// No parsing, caching, parameter binding or optimization is performed:
    /// the arguments are stored exactly as given.
    pub fn from_parts(
        is_explain: bool,
        exec_plan: ExecutionPlan,
        coordinator: &'a C,
        bucket_map: HashMap<NodeId, Buckets>,
    ) -> Self {
        Self {
            is_explain,
            exec_plan,
            coordinator,
            bucket_map,
        }
    }

    /// Create a new query, optionally overriding the plan options.
    ///
    /// The plan is first looked up in the coordinator cache by the query id of `sql`.
    ///
    /// On a cache miss, the plan is built from `sql` using the coordinator metadata.
    /// When the coordinator provides table versions, the current version of every
    /// non-system table in the plan is recorded. The plan is then put into the cache
    /// unless it is a DDL, ACL or plugin statement.
    ///
    /// If `sql` produces an empty plan, an empty query is returned.
    ///
    /// Afterwards:
    /// - `options` (if any) replace the plan options;
    /// - for statement blocks, only the parameters are bound;
    /// - for everything that is not DDL, ACL, plugin, DEALLOCATE or TCL, the
    ///   parameters are bound, the options applied and the plan optimized.
    ///
    /// # Errors
    /// - Failed to access the plan cache.
    /// - Failed to parse SQL.
    /// - Failed to build AST.
    /// - Failed to build IR plan.
    /// - Failed to get a table version.
    /// - Failed to bind parameters or apply options.
    /// - Failed to apply optimizing transformations to IR plan.
    pub fn with_options(
        coordinator: &'a C,
        sql: &str,
        params: Vec<Value>,
        options: Option<Options>,
    ) -> Result<Self, SbroadError>
    where
        C::Cache: Cache<SmolStr, Plan>,
        C::ParseTree: Ast,
    {
        let key = query_id(sql);
        let mut cache = coordinator.cache().lock();

        let mut plan = Plan::new();
        if let Some(cached_plan) = cache.get(&key)? {
            plan = cached_plan.clone();
        }
        if plan.is_empty() {
            // Hold the metadata guard only while the plan is being built from SQL.
            let metadata = coordinator.metadata().lock();
            plan = C::ParseTree::transform_into_plan(sql, &*metadata)?;
            drop(metadata);

            // Empty query.
            if plan.is_empty() {
                return Ok(Query::empty(coordinator));
            }

            if coordinator.provides_versions() {
                // Record the current version of every non-system table in the plan.
                let mut table_version_map =
                    TableVersionMap::with_capacity(plan.relations.tables.len());
                for (tbl_name, tbl) in &plan.relations.tables {
                    if tbl.is_system() {
                        continue;
                    }
                    let version = coordinator.get_table_version(tbl_name.as_str())?;
                    table_version_map.insert(tbl_name.clone(), version);
                }
                plan.version_map = table_version_map;
            }
            if !plan.is_ddl()? && !plan.is_acl()? && !plan.is_plugin()? {
                cache.put(key, plan.clone())?;
            }
        }
        // From here on we only work with our own copy of the plan. Release the cache
        // so that binding and optimization do not run inside its critical section.
        drop(cache);

        if let Some(options) = options {
            plan.options = options;
        }

        if plan.is_block()? {
            plan.bind_params(params)?;
        } else if !plan.is_ddl()?
            && !plan.is_acl()?
            && !plan.is_plugin()?
            && !plan.is_deallocate()?
            && !plan.is_tcl()?
        {
            plan.bind_params(params)?;
            plan.apply_options()?;
            plan.optimize()?;
        }
        let query = Query {
            is_explain: plan.is_explain(),
            exec_plan: ExecutionPlan::from(plan),
            coordinator,
            bucket_map: HashMap::new(),
        };
        Ok(query)
    }

    /// Create a new query with the default plan options.
    ///
    /// # Errors
    /// - Failed to parse SQL.
    /// - Failed to build AST.
    /// - Failed to build IR plan.
    /// - Failed to apply optimizing transformations to IR plan.
    pub fn new(coordinator: &'a C, sql: &str, params: Vec<Value>) -> Result<Self, SbroadError>
    where
        C::Cache: Cache<SmolStr, Plan>,
        C::ParseTree: Ast,
    {
        Self::with_options(coordinator, sql, params, None)
    }

    /// Build a query around an empty plan (used for empty SQL input).
    fn empty(coordinator: &'a C) -> Self {
        Self {
            is_explain: false,
            exec_plan: Plan::empty().into(),
            coordinator,
            bucket_map: Default::default(),
        }
    }

    /// Get the execution plan of the query.
    #[must_use]
    pub fn get_exec_plan(&self) -> &ExecutionPlan {
        &self.exec_plan
    }

    /// Get the mutable reference to the execution plan of the query.
    #[must_use]
    pub fn get_mut_exec_plan(&mut self) -> &mut ExecutionPlan {
        &mut self.exec_plan
    }

    /// Get the coordinator runtime of the query.
    #[must_use]
    pub fn get_coordinator(&self) -> &C {
        self.coordinator
    }

    /// Materialize the motions listed in `slices` into virtual tables.
    ///
    /// Slices are processed in order, and the motions of a slice one by one.
    /// Motions that already have a virtual table are skipped. Otherwise:
    /// - a `Segment` motion whose child is a `VALUES` node is materialized right
    ///   here on the router, and its subtree is unlinked from the plan;
    /// - `LocalSegment` and `Local` motions are skipped: they are materialized on
    ///   the segments when the query is dispatched;
    /// - every other motion (including `Segment` over a non-`VALUES` child) is
    ///   materialized by the coordinator for the buckets discovered from its
    ///   subtree root.
    ///
    /// # Errors
    /// - Failed to get the vshard object for the plan tier.
    /// - Failed to get a motion node, its child or its subtree root.
    /// - Failed to discover buckets.
    /// - Failed to materialize a motion or to store its virtual table.
    pub fn materialize_subtree(&mut self, slices: Slices) -> Result<(), SbroadError> {
        let tier = self.exec_plan.get_ir_plan().tier.as_ref();
        // All tables are from one tier, so we can use the corresponding vshard object.
        let vshard = self.coordinator.get_vshard_object_by_tier(tier)?;

        for slice in slices.slices() {
            // TODO: make it work in parallel
            for motion_id in slice.positions() {
                if let Some(vtables_map) = self.exec_plan.get_vtables() {
                    if vtables_map.contains_key(motion_id) {
                        continue;
                    }
                }
                let motion = self.exec_plan.get_ir_plan().get_relation_node(*motion_id)?;
                if let Relational::Motion(Motion { policy, .. }) = motion {
                    match policy {
                        MotionPolicy::Segment(_) => {
                            // If child is values, then we can materialize it
                            // on the router.
                            let motion_child_id = self.exec_plan.get_motion_child(*motion_id)?;
                            let motion_child = self
                                .exec_plan
                                .get_ir_plan()
                                .get_relation_node(motion_child_id)?;

                            if matches!(motion_child, Relational::Values { .. }) {
                                let virtual_table = self
                                    .coordinator
                                    .materialize_values(&mut self.exec_plan, motion_child_id)?;
                                self.exec_plan.set_motion_vtable(
                                    motion_id,
                                    virtual_table,
                                    &vshard,
                                )?;
                                self.exec_plan.unlink_motion_subtree(*motion_id)?;
                                continue;
                            }
                        }
                        // Skip these and dispatch the query to the segments: the
                        // materialization is done there. Note that for LocalSegment
                        // motions we still operate on vtables via calls like
                        // `self.exec_plan.contains_vtable_for_motion(node_id)`
                        // to find out whether a virtual table was materialized for values.
                        MotionPolicy::LocalSegment(_) | MotionPolicy::Local => continue,
                        _ => {}
                    }
                }

                let top_id = self.exec_plan.get_motion_subtree_root(*motion_id)?;

                let buckets = self.bucket_discovery(top_id)?;
                let virtual_table = self.coordinator.materialize_motion(
                    &mut self.exec_plan,
                    motion_id,
                    &buckets,
                )?;
                self.exec_plan
                    .set_motion_vtable(motion_id, virtual_table, &vshard)?;
            }
        }

        Ok(())
    }

    /// Builds explain from current query
    ///
    /// # Errors
    /// - Failed to build explain
    pub fn produce_explain(&mut self) -> Result<Box<dyn Any>, SbroadError> {
        self.coordinator.explain_format(self.to_explain()?)
    }

    /// Dispatch a distributed query from coordinator to the segments.
    ///
    /// The steps are:
    /// - For an `EXPLAIN` query, the formatted explain is returned instead of
    ///   executing anything.
    /// - Otherwise the constants are restored and all motions are materialized.
    /// - If the plan top is itself a motion, its virtual table is taken out of
    ///   the plan and converted to the output directly.
    /// - Otherwise the top subtree is dispatched to the discovered buckets.
    ///
    /// # Errors
    /// - Failed to get a motion subtree.
    /// - Failed to discover buckets.
    /// - Failed to materialize motion result and build a virtual table.
    /// - Failed to get plan top.
    /// - The top motion has no virtual table, or its virtual table is still shared.
    pub fn dispatch(&mut self) -> Result<Box<dyn Any>, SbroadError> {
        if self.is_explain() {
            return self.produce_explain();
        }
        self.get_mut_exec_plan()
            .get_mut_ir_plan()
            .restore_constants()?;

        let slices = self.exec_plan.get_ir_plan().clone_slices();
        self.materialize_subtree(slices)?;
        let ir_plan = self.exec_plan.get_ir_plan();
        let top_id = ir_plan.get_top()?;
        if ir_plan.get_relation_node(top_id)?.is_motion() {
            let err =
                |s: &str| -> SbroadError { SbroadError::Invalid(Entity::Plan, Some(s.into())) };
            let motion_aliases = ir_plan.get_relational_aliases(top_id)?;
            let Some(vtables) = self.exec_plan.get_mut_vtables() else {
                return Err(err("no vtables in plan with motion top"));
            };
            let Some(mut vtable) = vtables.remove(&top_id) else {
                return Err(err(&format!("no motion on top_id: {top_id:?}")));
            };
            let Some(v) = Rc::get_mut(&mut vtable) else {
                return Err(err("there are other refs to vtable"));
            };
            return v.to_output(&motion_aliases);
        }
        let buckets = self.bucket_discovery(top_id)?;
        self.coordinator.dispatch(
            &mut self.exec_plan,
            top_id,
            &buckets,
            engine::DispatchReturnFormat::Tuple,
        )
    }

    /// Query explain
    ///
    /// # Errors
    /// - Failed to build explain
    pub fn to_explain(&mut self) -> Result<SmolStr, SbroadError> {
        self.as_explain()
    }

    /// Checks whether the query is an `EXPLAIN`, i.e. it must be explained
    /// instead of being executed.
    #[must_use]
    pub fn is_explain(&self) -> bool {
        self.is_explain
    }

    /// Checks that query is a statement block.
    ///
    /// # Errors
    /// - plan is invalid
    pub fn is_block(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_block()
    }

    /// Checks that query is DDL.
    ///
    /// # Errors
    /// - Plan is invalid.
    pub fn is_ddl(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_ddl()
    }

    /// Checks that query is ACL.
    ///
    /// # Errors
    /// - Plan is invalid
    pub fn is_acl(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_acl()
    }

    /// Checks that query is TCL.
    ///
    /// # Errors
    /// - Plan is invalid
    pub fn is_tcl(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_tcl()
    }

    /// Test helper: the motion id at position `pos_idx` of slice `slice_id`.
    /// Panics if either index is out of range.
    #[cfg(test)]
    pub fn get_motion_id(&self, slice_id: usize, pos_idx: usize) -> NodeId {
        *self
            .exec_plan
            .get_ir_plan()
            .clone_slices()
            .slice(slice_id)
            .unwrap()
            .position(pos_idx)
            .unwrap()
    }

    /// Get the buckets recorded in the bucket map for the relational output
    /// `output_id`, if any.
    #[must_use]
    pub fn get_buckets(&self, output_id: NodeId) -> Option<&Buckets> {
        self.bucket_map.get(&output_id)
    }

    /// Checks that query is for plugin.
    ///
    /// # Errors
    /// - Plan is invalid
    pub fn is_plugin(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_plugin()
    }

    /// Checks that query is Deallocate.
    ///
    /// # Errors
    /// - Plan is invalid
    pub fn is_deallocate(&self) -> Result<bool, SbroadError> {
        self.exec_plan.get_ir_plan().is_deallocate()
    }

    /// Checks that query is an empty query.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.exec_plan.get_ir_plan().is_empty()
    }
}

#[cfg(test)]
mod tests;