diff --git a/sbroad-core/src/executor/ir.rs b/sbroad-core/src/executor/ir.rs index e0c5f8b1e7d6ebb93cabd1e3dd8271a119d7764c..a13c40ec9983b706886de0526073436a0a9a12ac 100644 --- a/sbroad-core/src/executor/ir.rs +++ b/sbroad-core/src/executor/ir.rs @@ -379,21 +379,51 @@ impl ExecutionPlan { // We can't replace CTE subtree as it can be reused in other slices of the plan. // So, collect all CTE nodes and their subtree nodes (relational and expression) // as a set to avoid their removal. - let cte_id_iter = nodes.iter().map(|(_, id)| *id).filter(|id| { - matches!( - plan.get_node(*id), - Ok(Node::Relational(Relational::ScanCte { .. })) - ) - }); + let cte_scans = nodes + .iter() + .map(|(_, id)| *id) + .filter(|id| { + matches!( + plan.get_node(*id), + Ok(Node::Relational(Relational::ScanCte { .. })) + ) + }) + .collect::<Vec<_>>(); + + // Get the capacity of the CTE nodes. We expect that CTE subtree nodes are located + // in the beginning of the plan arena (while CTE scans can be located anywhere). + // So, we get the biggest child id of the CTE nodes and add 1 to get the capacity. + let mut all_cte_nodes_capacity = 0; + let mut cte_amount = 0; + for cte_id in &cte_scans { + let cte_node = plan.get_relation_node(*cte_id)?; + let Relational::ScanCte { child, .. } = cte_node else { + unreachable!("Expected CTE scan node."); + }; + let child_id = *child; + if all_cte_nodes_capacity < child_id { + all_cte_nodes_capacity = child_id; + } + cte_amount += 1; + } + all_cte_nodes_capacity += 1; + let single_cte_capacity = if cte_amount <= 1 { + all_cte_nodes_capacity + } else { + all_cte_nodes_capacity / cte_amount * 2 + }; + let mut cte_ids: AHashSet<usize> = AHashSet::new(); let mut is_reserved = false; - for cte_id in cte_id_iter { + for cte_id in cte_scans { if !is_reserved { is_reserved = true; - cte_ids.reserve(nodes.len()); + cte_ids.reserve(all_cte_nodes_capacity); } - let mut cte_subtree = - PostOrder::with_capacity(|node| plan.exec_plan_subtree_iter(node), nodes.len()); + let mut cte_subtree = PostOrder::with_capacity( + |node| plan.exec_plan_subtree_iter(node), + single_cte_capacity, + ); for (_, id) in cte_subtree.iter(cte_id) { cte_ids.insert(id); } diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs index 84616fe4073248e945d5331817e53d721b48a61b..a004ba480c003f5c3335246da055be86a3a2731c 100644 --- a/sbroad-core/src/frontend/sql.rs +++ b/sbroad-core/src/frontend/sql.rs @@ -2470,8 +2470,15 @@ impl AbstractSyntaxTree { map.add(id, rel_child_id_plan); if let Some(ast_alias_id) = node.children.get(1) { let alias_name = parse_normalized_identifier(self, *ast_alias_id)?; - let scan = plan.get_mut_relation_node(rel_child_id_plan)?; - scan.set_scan_name(Some(alias_name.to_smolstr()))?; + // CTE scans can have different aliases, so clone the CTE scan node, + // preserving its subtree. + if let Relational::ScanCte { child, .. } = rel_child_node { + let scan_id = plan.add_cte(*child, alias_name, vec![])?; + map.add(id, scan_id); + } else { + let scan = plan.get_mut_relation_node(rel_child_id_plan)?; + scan.set_scan_name(Some(alias_name.to_smolstr()))?; + } } } Rule::ScanTable => { diff --git a/sbroad-core/src/frontend/sql/ir/tests/cte.rs b/sbroad-core/src/frontend/sql/ir/tests/cte.rs index 6a85fc21ddb3a3a343953cdc1db28e5d3a2c1bda..d22bcb52b9c7e97a24902a5693bfd9e234297a94 100644 --- a/sbroad-core/src/frontend/sql/ir/tests/cte.rs +++ b/sbroad-core/src/frontend/sql/ir/tests/cte.rs @@ -99,6 +99,43 @@ vtable_max_rows = 5000 assert_eq!(expected_explain, plan.as_explain().unwrap()); } +#[test] +fn reuse_cte_values() { + let sql = r#" + WITH cte (b) AS (VALUES(1)) + SELECT t.c FROM (SELECT count(*) as c FROM cte c1 JOIN cte c2 ON true) t + JOIN cte ON true + "#; + let plan = sql_to_optimized_ir(sql, vec![]); + println!("{}", plan.as_explain().unwrap()); + + let expected_explain = String::from( + r#"projection ("T"."C"::integer -> "C") + join on true::boolean + scan "T" + projection (count((*::integer))::integer -> "C") + join on true::boolean + scan cte "C1"($0) + scan cte "C2"($0) + scan cte "CTE"($1) +subquery $0: +projection ("CTE"."COLUMN_1"::unsigned -> "B") + scan "CTE" + values + value row (data=ROW(1::unsigned)) +subquery $1: +projection ("CTE"."COLUMN_1"::unsigned -> "B") + scan "CTE" + values + value row (data=ROW(1::unsigned)) +execution options: +sql_vdbe_max_steps = 45000 +vtable_max_rows = 5000 +"#, + ); + assert_eq!(expected_explain, plan.as_explain().unwrap()); +} + #[test] fn join_cte() { let sql = r#" diff --git a/sbroad-core/src/ir/transformation/redistribution.rs b/sbroad-core/src/ir/transformation/redistribution.rs index 15ef4a9936f8e39700f78c4935049ea2f401bbb4..35caaee58cc8a983a55e0ede802b31dc2e4a1e3b 100644 --- a/sbroad-core/src/ir/transformation/redistribution.rs +++ b/sbroad-core/src/ir/transformation/redistribution.rs @@ -29,6 +29,8 @@ pub(crate) mod eq_cols; pub(crate) mod groupby; pub(crate) mod left_join; +const CTE_CAPACITY: usize = 8; + #[derive(Debug)] pub(crate) enum JoinChild { Inner, @@ -2147,6 +2149,9 @@ impl Plan { /// - failed to set distribution #[otm_child_span("plan.transformation.add_motions")] pub fn add_motions(&mut self) -> Result<(), SbroadError> { + type CteChildId = usize; + type MotionId = usize; + let mut cte_motions: AHashMap<CteChildId, MotionId> = AHashMap::with_capacity(CTE_CAPACITY); let top = self.get_top()?; let mut post_tree = PostOrder::with_capacity(|node| self.nodes.rel_iter(node), REL_CAPACITY); @@ -2288,9 +2293,20 @@ impl Plan { self.create_motion_nodes(strategy)?; self.set_distribution(output)?; } - Relational::ScanCte { output, .. } => { - let strategy = self.resolve_cte_conflicts(id)?; - self.create_motion_nodes(strategy)?; + Relational::ScanCte { output, child, .. } => { + // Possible, current CTE subtree has already been resolved and we + // can just copy the corresponding motion node. + if let Some(motion_id) = cte_motions.get(&child) { + self.set_relational_children(id, vec![*motion_id])?; + } else { + let strategy = self.resolve_cte_conflicts(id)?; + self.create_motion_nodes(strategy)?; + let new_child_id = self.get_relational_child(id, 0)?; + let new_child_node = self.get_relation_node(new_child_id)?; + if let Relational::Motion { .. } = new_child_node { + cte_motions.insert(child, new_child_id); + } + } // We don't materialize CTEs with global and single distribution. // So, for global child let's preserve global distribution for CTE. // Otherwise force a single distribution.