From 92d742635b0d6d51ceec1b54b77177f64b8273a1 Mon Sep 17 00:00:00 2001
From: Kurdakov Alexander <kusancho12@gmail.com>
Date: Fri, 31 May 2024 03:24:46 +0300
Subject: [PATCH]  feat: tier support in sql

---
 doc/sql/query.ebnf                      |  2 +-
 sbroad-core/src/frontend/sql.rs         | 84 ++++++++++++++++++++-----
 sbroad-core/src/frontend/sql/query.pest |  3 +-
 sbroad-core/src/ir/ddl.rs               |  6 ++
 sbroad-core/src/ir/relation.rs          | 23 +++++++
 5 files changed, 99 insertions(+), 19 deletions(-)

diff --git a/doc/sql/query.ebnf b/doc/sql/query.ebnf
index 5b0276c5ee..012801f3a8 100644
--- a/doc/sql/query.ebnf
+++ b/doc/sql/query.ebnf
@@ -147,7 +147,7 @@ create_table   ::= 'CREATE' 'TABLE' table
                        'PRIMARY KEY' '(' column (',' column)* ')'
                    ')'
                    ('USING' ('MEMTX' | 'VINYL'))?
-                   ('DISTRIBUTED' (('BY' '(' column (',' column)*  ')') | 'GLOBALLY'))
+                   ('DISTRIBUTED' (('BY' '(' column (',' column)*  ')' ('ON' 'TIER' '(' tier ')')?) | 'GLOBALLY'))
 create_user    ::= 'CREATE' 'USER' user 'WITH'? 'PASSWORD' "'" password "'"
                    ('USING' ('CHAP-SHA1' | 'LDAP' | 'MD5'))?
 alter_user     ::= 'ALTER' 'USER' user
diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs
index 07fb5353b2..9a145f08f2 100644
--- a/sbroad-core/src/frontend/sql.rs
+++ b/sbroad-core/src/frontend/sql.rs
@@ -5,6 +5,7 @@
 
 use ahash::AHashMap;
 use core::panic;
+use itertools::Itertools;
 use pest::iterators::{Pair, Pairs};
 use pest::pratt_parser::PrattParser;
 use pest::Parser;
@@ -36,7 +37,7 @@ use crate::ir::operator::{
     Arithmetic, Bool, ConflictStrategy, JoinKind, OrderByElement, OrderByEntity, OrderByType,
     Relational, Unary,
 };
-use crate::ir::relation::{Column, ColumnRole, Type as RelationType};
+use crate::ir::relation::{Column, ColumnRole, TableKind, Type as RelationType};
 use crate::ir::tree::traversal::{PostOrder, EXPR_CAPACITY};
 use crate::ir::value::Value;
 use crate::ir::{Node, NodeId, OptionKind, OptionParamValue, OptionSpec, Plan};
@@ -504,6 +505,7 @@ fn parse_create_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<Ddl,
     let mut shard_key: Vec<SmolStr> = Vec::new();
     let mut engine_type: SpaceEngineType = SpaceEngineType::default();
     let mut timeout = get_default_timeout();
+    let mut tier = None;
     for child_id in &node.children {
         let child_node = ast.nodes.get_node(*child_id)?;
         match child_node.rule {
@@ -683,33 +685,63 @@ fn parse_create_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<Ddl,
                     match distribution_type_node.rule {
                         Rule::Global => {}
                         Rule::Sharding => {
-                            let shard_node = ast.nodes.get_node(*distribution_type_id)?;
-                            for shard_col_id in &shard_node.children {
-                                let shard_col_name = parse_identifier(ast, *shard_col_id)?;
+                            let sharding_node = ast.nodes.get_node(*distribution_type_id)?;
+                            for sharding_node_child in &sharding_node.children {
+                                let shard_child = ast.nodes.get_node(*sharding_node_child)?;
+                                match shard_child.rule {
+                                    Rule::Tier => {
+                                        let Some(tier_node_id) = shard_child.children.first()
+                                        else {
+                                            return Err(SbroadError::Invalid(
+                                                Entity::Node,
+                                                Some(format_smolstr!(
+                                                    "AST table tier node {:?} contains unexpected children",
+                                                    distribution_type_node,
+                                                )),
+                                            ));
+                                        };
+
+                                        let tier_name = parse_identifier(ast, *tier_node_id)?;
+                                        tier = Some(tier_name);
+                                    }
+                                    Rule::Identifier => {
+                                        let shard_col_name =
+                                            parse_identifier(ast, *sharding_node_child)?;
 
-                                let column_found =
-                                    columns.iter().find(|c| c.name == shard_col_name);
-                                if column_found.is_none() {
-                                    return Err(SbroadError::Invalid(
-                                        Entity::Column,
-                                        Some(format_smolstr!(
+                                        let column_found =
+                                            columns.iter().find(|c| c.name == shard_col_name);
+                                        if column_found.is_none() {
+                                            return Err(SbroadError::Invalid(
+                                                Entity::Column,
+                                                Some(format_smolstr!(
                                             "Sharding key column {shard_col_name} not found."
                                         )),
-                                    ));
-                                }
+                                            ));
+                                        }
 
-                                if let Some(column) = column_found {
-                                    if !column.data_type.is_scalar() {
-                                        return Err(SbroadError::Invalid(
+                                        if let Some(column) = column_found {
+                                            if !column.data_type.is_scalar() {
+                                                return Err(SbroadError::Invalid(
                                             Entity::Column,
                                             Some(format_smolstr!(
                                                 "Sharding key column {shard_col_name} is not of scalar type."
                                             )),
                                         ));
+                                            }
+                                        }
+
+                                        shard_key.push(shard_col_name);
+                                    }
+                                    _ => {
+                                        return Err(SbroadError::Invalid(
+                                            Entity::Node,
+                                            Some(format_smolstr!(
+                                                "AST table sharding node {:?} contains unexpected children",
+                                                distribution_type_node,
+                                            )),
+                                        ));
                                     }
                                 }
-
-                                shard_key.push(shard_col_name);
                             }
                         }
                         _ => {
@@ -760,6 +792,7 @@ fn parse_create_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<Ddl,
             sharding_key: None,
             engine_type,
             timeout,
+            tier,
         }
     } else {
         Ddl::CreateTable {
@@ -769,6 +802,7 @@ fn parse_create_table(ast: &AbstractSyntaxTree, node: &ParseNode) -> Result<Ddl,
             sharding_key: Some(shard_key),
             engine_type,
             timeout,
+            tier,
         }
     };
     Ok(create_sharded_table)
@@ -3470,6 +3504,22 @@ impl AbstractSyntaxTree {
                 SbroadError::Invalid(Entity::AST, Some("no top in AST".into()))
             })?)?;
         plan.set_top(plan_top_id)?;
+
+        // check that all tables from query from one tier. Ignore global tables.
+        if !plan
+            .relations
+            .tables
+            .iter()
+            .filter(|(_, table)| matches!(table.kind, TableKind::ShardedSpace { .. }))
+            .map(|(_, table)| table.tier.as_ref())
+            .all_equal()
+        {
+            return Err(SbroadError::Invalid(
+                Entity::Query,
+                Some("Query cannot use tables from different tiers".into()),
+            ));
+        }
+
         let replaces = plan.replace_sq_with_references()?;
         plan.fix_betweens(&worker.betweens, &replaces)?;
         Ok(plan)
diff --git a/sbroad-core/src/frontend/sql/query.pest b/sbroad-core/src/frontend/sql/query.pest
index 3340855810..15e304c39c 100644
--- a/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad-core/src/frontend/sql/query.pest
@@ -74,7 +74,8 @@ DDL = _{ CreateTable | DropTable | CreateIndex | DropIndex | CreateProc | DropPr
             Vinyl = { ^"vinyl" }
         Distribution = { ^"distributed" ~ (Global | Sharding) }
         Global = { ^"globally" }
-        Sharding = { ^"by" ~ "(" ~ Identifier ~ ("," ~ Identifier)* ~ ")"}
+        Sharding = { ^"by" ~ "(" ~ Identifier ~ ("," ~ Identifier)* ~ ")" ~ Tier? }
+        Tier = { ^"on" ~ ^"tier" ~ "(" ~ Identifier ~ ")" }
     DropTable = { ^"drop" ~ ^"table" ~ Table ~ TimeoutOption? }
 
     CreateProc = {
diff --git a/sbroad-core/src/ir/ddl.rs b/sbroad-core/src/ir/ddl.rs
index 05fdda993a..5eb1635e0b 100644
--- a/sbroad-core/src/ir/ddl.rs
+++ b/sbroad-core/src/ir/ddl.rs
@@ -49,6 +49,12 @@ pub enum Ddl {
         /// Vinyl is supported only for sharded tables.
         engine_type: SpaceEngineType,
         timeout: Decimal,
+        /// Shows which tier the sharded table belongs to.
+        /// Field has value, only if it was specified in [ON TIER] part of CREATE TABLE statement.
+        /// Field is None, if:
+        /// 1) Global table.
+        /// 2) Sharded table without [ON TIER] part. In this case picodata will use default tier.
+        tier: Option<SmolStr>,
     },
     DropTable {
         name: SmolStr,
diff --git a/sbroad-core/src/ir/relation.rs b/sbroad-core/src/ir/relation.rs
index 54b9d25e18..8ca0574d01 100644
--- a/sbroad-core/src/ir/relation.rs
+++ b/sbroad-core/src/ir/relation.rs
@@ -565,6 +565,7 @@ pub struct Table {
     /// Unique table name.
     pub name: SmolStr,
     pub kind: TableKind,
+    pub tier: Option<SmolStr>,
 }
 
 impl Table {
@@ -573,6 +574,25 @@ impl Table {
         &self.name
     }
 
+    /// Constructor for sharded table in specified tier.
+    ///
+    /// # Errors
+    /// - column names are duplicated;
+    /// - primary key is not found among the columns;
+    /// - sharding key is not found among the columns;
+    pub fn new_sharded_in_tier(
+        name: &str,
+        columns: Vec<Column>,
+        sharding_key: &[&str],
+        primary_key: &[&str],
+        engine: SpaceEngine,
+        tier: Option<SmolStr>,
+    ) -> Result<Self, SbroadError> {
+        let mut table = Self::new_sharded(name, columns, sharding_key, primary_key, engine)?;
+        table.tier = tier;
+        Ok(table)
+    }
+
     /// Sharded table constructor.
     ///
     /// # Errors
@@ -594,6 +614,7 @@ impl Table {
             columns,
             primary_key,
             kind,
+            tier: None,
         })
     }
 
@@ -614,6 +635,7 @@ impl Table {
             columns,
             primary_key,
             kind,
+            tier: None,
         })
     }
 
@@ -634,6 +656,7 @@ impl Table {
             columns,
             primary_key,
             kind,
+            tier: None,
         })
     }
 
-- 
GitLab