From 78bfc3cbc886dbc1cd9116ff38eca5268f6bed1d Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Tue, 18 Oct 2022 18:57:39 +0700
Subject: [PATCH] feat: implement sbroad core functions

Now sbroad has distributed SQL functions in the core crate. At the
momnet there is only a single implementation of the `bucket_id()`
function.
---
 sbroad-benches/src/engine.rs                  | 19 ++++++
 sbroad-cartridge/src/cartridge/config.rs      | 26 ++++++++
 sbroad-cartridge/src/init.lua                 | 27 +-------
 .../test_app/test/integration/api_test.lua    | 21 +++++++
 sbroad-core/src/backend/sql/ir.rs             |  3 +
 sbroad-core/src/backend/sql/tree.rs           | 10 +++
 sbroad-core/src/core.lua                      | 37 +++++++++++
 sbroad-core/src/executor/engine.rs            | 13 ++--
 sbroad-core/src/executor/engine/mock.rs       | 19 ++++++
 sbroad-core/src/executor/tests/bucket_id.rs   | 24 +++++++
 sbroad-core/src/frontend/sql.rs               | 41 +++++++++++-
 sbroad-core/src/frontend/sql/ast.rs           |  4 ++
 sbroad-core/src/frontend/sql/ast/tests.rs     |  2 +-
 sbroad-core/src/frontend/sql/query.pest       |  2 +-
 sbroad-core/src/ir.rs                         |  1 +
 sbroad-core/src/ir/explain.rs                 |  3 +
 sbroad-core/src/ir/expression.rs              | 13 ++++
 sbroad-core/src/ir/function.rs                | 63 +++++++++++++++++++
 sbroad-core/src/ir/tree.rs                    | 13 +++-
 19 files changed, 307 insertions(+), 34 deletions(-)
 create mode 100644 sbroad-core/src/ir/function.rs

diff --git a/sbroad-benches/src/engine.rs b/sbroad-benches/src/engine.rs
index 1e12195fd4..47e5ff30a8 100644
--- a/sbroad-benches/src/engine.rs
+++ b/sbroad-benches/src/engine.rs
@@ -15,6 +15,7 @@ use sbroad::executor::lru::{Cache, LRUCache, DEFAULT_CAPACITY};
 use sbroad::executor::result::ProducerResult;
 use sbroad::executor::vtable::VirtualTable;
 use sbroad::frontend::sql::ast::AbstractSyntaxTree;
+use sbroad::ir::function::Function;
 use sbroad::ir::relation::{Column, ColumnRole, Table, Type};
 use sbroad::ir::value::Value;
 use sbroad::ir::Plan;
@@ -22,6 +23,7 @@ use sbroad::ir::Plan;
 #[allow(clippy::module_name_repetitions)]
 #[derive(Debug, Clone)]
 pub struct RouterConfigurationMock {
+    functions: HashMap<String, Function>,
     tables: HashMap<String, Table>,
     bucket_count: usize,
     sharding_column: String,
@@ -46,6 +48,17 @@ impl CoordinatorMetadata for RouterConfigurationMock {
         }
     }
 
+    fn get_function(&self, fn_name: &str) -> Result<&Function, QueryPlannerError> {
+        let name = normalize_name_from_sql(fn_name);
+        match self.functions.get(&name) {
+            Some(v) => Ok(v),
+            None => Err(QueryPlannerError::CustomError(format!(
+                "Function {} not found",
+                name
+            ))),
+        }
+    }
+
     fn get_exec_waiting_timeout(&self) -> u64 {
         0
     }
@@ -87,6 +100,11 @@ impl RouterConfigurationMock {
     #[allow(clippy::too_many_lines)]
     #[must_use]
     pub fn new() -> Self {
+        let name_bucket_id = normalize_name_from_sql("bucket_id");
+        let fn_bucket_id = Function::new_stable(name_bucket_id.clone());
+        let mut functions = HashMap::new();
+        functions.insert(name_bucket_id, fn_bucket_id);
+
         let mut tables = HashMap::new();
 
         let columns = vec![
@@ -329,6 +347,7 @@ impl RouterConfigurationMock {
         );
 
         RouterConfigurationMock {
+            functions,
             tables,
             bucket_count: 10000,
             sharding_column: "\"bucket_id\"".into(),
diff --git a/sbroad-cartridge/src/cartridge/config.rs b/sbroad-cartridge/src/cartridge/config.rs
index cdfb078896..edca38c933 100644
--- a/sbroad-cartridge/src/cartridge/config.rs
+++ b/sbroad-cartridge/src/cartridge/config.rs
@@ -9,6 +9,7 @@ use sbroad::errors::QueryPlannerError;
 use sbroad::executor::engine::CoordinatorMetadata;
 use sbroad::executor::engine::{normalize_name_from_schema, normalize_name_from_sql};
 use sbroad::executor::lru::DEFAULT_CAPACITY;
+use sbroad::ir::function::Function;
 use sbroad::ir::relation::{Column, ColumnRole, Table, Type};
 use sbroad::{debug, warn};
 
@@ -35,6 +36,9 @@ pub struct RouterConfiguration {
 
     /// IR table segments from the cluster spaces
     tables: HashMap<String, Table>,
+
+    /// IR functions
+    functions: HashMap<String, Function>,
 }
 
 impl Default for RouterConfiguration {
@@ -53,6 +57,7 @@ impl RouterConfiguration {
             jaeger_agent_port: 6831,
             tables: HashMap::new(),
             sharding_column: String::new(),
+            functions: HashMap::new(),
         }
     }
 
@@ -61,6 +66,7 @@ impl RouterConfiguration {
     /// # Errors
     /// Returns `QueryPlannerError` when process was terminated.
     pub fn load_schema(&mut self, s: &str) -> Result<(), QueryPlannerError> {
+        self.init_core_functions();
         if let Ok(docs) = YamlLoader::load_from_str(s) {
             if let Some(schema) = docs.get(0) {
                 self.init_table_segments(schema)?;
@@ -75,6 +81,15 @@ impl RouterConfiguration {
         self.tables.is_empty()
     }
 
+    /// Populate metadata about core functions.
+    /// Core functions should be present in the cluster
+    /// (imported by the core.lua file).
+    fn init_core_functions(&mut self) {
+        let name_bucket_id = normalize_name_from_sql("bucket_id");
+        let fn_bucket_id = Function::new_stable(name_bucket_id.clone());
+        self.functions.insert(name_bucket_id, fn_bucket_id);
+    }
+
     /// Transform space information from schema to table segments
     ///
     /// # Errors
@@ -237,6 +252,17 @@ impl CoordinatorMetadata for RouterConfiguration {
         }
     }
 
+    fn get_function(&self, fn_name: &str) -> Result<&Function, QueryPlannerError> {
+        let name = normalize_name_from_sql(fn_name);
+        match self.functions.get(&name) {
+            Some(v) => Ok(v),
+            None => Err(QueryPlannerError::CustomError(format!(
+                "Function {} not found",
+                name
+            ))),
+        }
+    }
+
     /// Get response waiting timeout for executor
     fn get_exec_waiting_timeout(&self) -> u64 {
         self.waiting_timeout
diff --git a/sbroad-cartridge/src/init.lua b/sbroad-cartridge/src/init.lua
index 7fbf5020ff..a8dbfc5c66 100644
--- a/sbroad-cartridge/src/init.lua
+++ b/sbroad-cartridge/src/init.lua
@@ -1,32 +1,9 @@
-require('sbroad.core')
-
 local cartridge = require('cartridge')
 local checks = require('checks')
+local core = require('sbroad.core')
 
 local function init ()
-    box.schema.func.create(
-        'libsbroad.calculate_bucket_id',
-        { if_not_exists = true, language = 'C' }
-    )
-
-    box.schema.func.create("BUCKET_ID", {
-        language = "Lua",
-        body = [[
-            function(x)
-                return box.func["libsbroad.calculate_bucket_id"]:call({ x })
-            end
-        ]],
-        if_not_exists = true,
-        param_list = {"string"},
-        returns = "unsigned",
-        aggregate = "none",
-        exports = {"SQL"},
-    })
-
-    box.schema.func.create(
-        'libsbroad.init_statistics',
-        { if_not_exists = true, language = 'C' }
-    )
+    core.init()
 end
 
 local function calculate_bucket_id(values, space_name) -- luacheck: no unused args
diff --git a/sbroad-cartridge/test_app/test/integration/api_test.lua b/sbroad-cartridge/test_app/test/integration/api_test.lua
index 7f7aa161ad..02202326d0 100644
--- a/sbroad-cartridge/test_app/test/integration/api_test.lua
+++ b/sbroad-cartridge/test_app/test/integration/api_test.lua
@@ -346,6 +346,27 @@ g.test_bucket_id_in_join = function()
     })
 end
 
+g.test_bucket_id_function = function()
+    local api = cluster:server("api-1").net_box
+
+    local r, err = api:call(
+        "sbroad.execute",
+        {
+            [[SELECT bucket_id('hello') FROM "space_simple_shard_key" WHERE "id" = 10]],
+            {}
+        }
+    )
+    t.assert_equals(err, nil)
+    t.assert_equals(r, {
+        metadata = {
+            {name = "COLUMN_1", type = "unsigned"},
+        },
+        rows = {
+            { 13352 },
+        },
+    })
+end
+
 g.test_uppercase1 = function()
     local api = cluster:server("api-1").net_box
 
diff --git a/sbroad-core/src/backend/sql/ir.rs b/sbroad-core/src/backend/sql/ir.rs
index 7046963270..f4c6703a2f 100644
--- a/sbroad-core/src/backend/sql/ir.rs
+++ b/sbroad-core/src/backend/sql/ir.rs
@@ -232,6 +232,9 @@ impl ExecutionPlan {
                                         sql.push_str(alias);
                                     }
                                 }
+                                Expression::StableFunction { name, .. } => {
+                                    sql.push_str(name.as_str());
+                                }
                             },
                         }
                     }
diff --git a/sbroad-core/src/backend/sql/tree.rs b/sbroad-core/src/backend/sql/tree.rs
index ab0e46f47b..06bb45a10f 100644
--- a/sbroad-core/src/backend/sql/tree.rs
+++ b/sbroad-core/src/backend/sql/tree.rs
@@ -744,6 +744,16 @@ impl<'p> SyntaxPlan<'p> {
                     );
                     Ok(self.nodes.push_syntax_node(sn))
                 }
+                Expression::StableFunction { children, .. } => {
+                    let mut nodes: Vec<usize> =
+                        vec![self.nodes.push_syntax_node(SyntaxNode::new_open())];
+                    for child in children {
+                        nodes.push(self.nodes.get_syntax_node_id(*child)?);
+                    }
+                    nodes.push(self.nodes.push_syntax_node(SyntaxNode::new_close()));
+                    let sn = SyntaxNode::new_pointer(id, None, nodes);
+                    Ok(self.nodes.push_syntax_node(sn))
+                }
             },
         }
     }
diff --git a/sbroad-core/src/core.lua b/sbroad-core/src/core.lua
index 1f0f09c69c..6ae870aba0 100644
--- a/sbroad-core/src/core.lua
+++ b/sbroad-core/src/core.lua
@@ -2,3 +2,40 @@ _G.fiber_id = function ()
     local fiber = require('fiber')
     return fiber.id()
 end
+
+local function init_bucket_id()
+    box.schema.func.create(
+        'libsbroad.calculate_bucket_id',
+        { if_not_exists = true, language = 'C' }
+    )
+
+    box.schema.func.create('BUCKET_ID', {
+        language = 'Lua',
+        body = [[
+            function(x)
+                return box.func['libsbroad.calculate_bucket_id']:call({ x })
+            end
+        ]],
+        if_not_exists = true,
+        param_list = {'string'},
+        returns = 'unsigned',
+        aggregate = 'none',
+        exports = {'SQL'},
+    })
+end
+
+local function init_statistics()
+    box.schema.func.create(
+        'libsbroad.init_statistics',
+        { if_not_exists = true, language = 'C' }
+    )
+end
+
+local function init()
+    init_bucket_id()
+    init_statistics()
+end
+
+return {
+    init = init,
+}
diff --git a/sbroad-core/src/executor/engine.rs b/sbroad-core/src/executor/engine.rs
index d88ed9c51d..51ac6f8f2a 100644
--- a/sbroad-core/src/executor/engine.rs
+++ b/sbroad-core/src/executor/engine.rs
@@ -11,6 +11,8 @@ use crate::errors::QueryPlannerError;
 use crate::executor::bucket::Buckets;
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::vtable::VirtualTable;
+use crate::ir::function::Function;
+use crate::ir::relation::Table;
 use crate::ir::value::Value;
 
 /// A metadata storage trait of the cluster.
@@ -22,10 +24,13 @@ pub trait CoordinatorMetadata {
     ///
     /// # Errors
     /// - Failed to get table by name from the metadata.
-    fn get_table_segment(
-        &self,
-        table_name: &str,
-    ) -> Result<crate::ir::relation::Table, QueryPlannerError>;
+    fn get_table_segment(&self, table_name: &str) -> Result<Table, QueryPlannerError>;
+
+    /// Lookup for a function in the metadata cache.
+    ///
+    /// # Errors
+    /// - Failed to get function by name from the metadata.
+    fn get_function(&self, fn_name: &str) -> Result<&Function, QueryPlannerError>;
 
     fn get_exec_waiting_timeout(&self) -> u64;
 
diff --git a/sbroad-core/src/executor/engine/mock.rs b/sbroad-core/src/executor/engine/mock.rs
index 7e72e75959..1b9496617b 100644
--- a/sbroad-core/src/executor/engine/mock.rs
+++ b/sbroad-core/src/executor/engine/mock.rs
@@ -17,6 +17,7 @@ use crate::executor::result::ProducerResult;
 use crate::executor::vtable::VirtualTable;
 use crate::executor::{Cache, CoordinatorMetadata};
 use crate::frontend::sql::ast::AbstractSyntaxTree;
+use crate::ir::function::Function;
 use crate::ir::helpers::RepeatableState;
 use crate::ir::relation::{Column, ColumnRole, Table, Type};
 use crate::ir::value::Value;
@@ -25,6 +26,7 @@ use crate::ir::Plan;
 #[allow(clippy::module_name_repetitions)]
 #[derive(Debug, Clone)]
 pub struct RouterConfigurationMock {
+    functions: HashMap<String, Function>,
     tables: HashMap<String, Table>,
     bucket_count: usize,
     sharding_column: String,
@@ -42,6 +44,17 @@ impl CoordinatorMetadata for RouterConfigurationMock {
         }
     }
 
+    fn get_function(&self, fn_name: &str) -> Result<&Function, QueryPlannerError> {
+        let name = normalize_name_from_sql(fn_name);
+        match self.functions.get(&name) {
+            Some(v) => Ok(v),
+            None => Err(QueryPlannerError::CustomError(format!(
+                "Function {} not found",
+                name
+            ))),
+        }
+    }
+
     fn get_exec_waiting_timeout(&self) -> u64 {
         0
     }
@@ -83,6 +96,11 @@ impl RouterConfigurationMock {
     #[allow(clippy::too_many_lines)]
     #[must_use]
     pub fn new() -> Self {
+        let name_bucket_id = normalize_name_from_sql("bucket_id");
+        let fn_bucket_id = Function::new_stable(name_bucket_id.clone());
+        let mut functions = HashMap::new();
+        functions.insert(name_bucket_id, fn_bucket_id);
+
         let mut tables = HashMap::new();
 
         let columns = vec![
@@ -181,6 +199,7 @@ impl RouterConfigurationMock {
         );
 
         RouterConfigurationMock {
+            functions,
             tables,
             bucket_count: 10000,
             sharding_column: "\"bucket_id\"".into(),
diff --git a/sbroad-core/src/executor/tests/bucket_id.rs b/sbroad-core/src/executor/tests/bucket_id.rs
index 3b1124ad56..17abd75618 100644
--- a/sbroad-core/src/executor/tests/bucket_id.rs
+++ b/sbroad-core/src/executor/tests/bucket_id.rs
@@ -63,6 +63,30 @@ fn bucket2_test() {
     assert_eq!(expected, result);
 }
 
+#[test]
+fn bucket3_test() {
+    let sql = r#"SELECT *, bucket_id('111') FROM "t1""#;
+    let coordinator = RouterRuntimeMock::new();
+
+    let mut query = Query::new(&coordinator, sql, vec![]).unwrap();
+    let result = *query
+        .dispatch()
+        .unwrap()
+        .downcast::<ProducerResult>()
+        .unwrap();
+
+    let mut expected = ProducerResult::new();
+
+    expected.rows.push(vec![
+        Value::String("Execute query on all buckets".to_string()),
+        Value::String(String::from(PatternWithParams::new(
+            r#"SELECT "t1"."a", "t1"."b", "BUCKET_ID" (?) as "COLUMN_1" FROM "t1""#.to_string(),
+            vec![Value::from("111".to_string())],
+        ))),
+    ]);
+    assert_eq!(expected, result);
+}
+
 #[test]
 fn sharding_keys_from_tuple1() {
     let coordinator = RouterRuntimeMock::new();
diff --git a/sbroad-core/src/frontend/sql.rs b/sbroad-core/src/frontend/sql.rs
index 9b1f2848e7..1a55183f4d 100644
--- a/sbroad-core/src/frontend/sql.rs
+++ b/sbroad-core/src/frontend/sql.rs
@@ -537,6 +537,35 @@ impl Ast for AbstractSyntaxTree {
                     let plan_child_id = map.get(*ast_child_id)?;
                     map.add(*id, plan_child_id);
                 }
+                Type::Function => {
+                    if let Some((first, other)) = node.children.split_first() {
+                        let mut plan_arg_list = Vec::new();
+                        for ast_child_id in other {
+                            let plan_child_id = map.get(*ast_child_id)?;
+                            plan_arg_list.push(plan_child_id);
+                        }
+                        let function_name =
+                            self.nodes.get_node(*first)?.value.as_ref().ok_or_else(|| {
+                                QueryPlannerError::CustomError("Function name is not found.".into())
+                            })?;
+                        let func = metadata.get_function(function_name)?;
+                        if func.is_stable() {
+                            let plan_func_id = plan.add_stable_function(func, plan_arg_list)?;
+                            map.add(*id, plan_func_id);
+                        } else {
+                            // At the moment we don't support any non-stable functions.
+                            // Later this code block should handle other function behaviors.
+                            return Err(QueryPlannerError::CustomError(format!(
+                                "Function {} is not stable.",
+                                function_name
+                            )));
+                        }
+                    } else {
+                        return Err(QueryPlannerError::CustomError(
+                            "Function has no children.".into(),
+                        ));
+                    }
+                }
                 Type::InnerJoin => {
                     let ast_left_id = node.children.first().ok_or_else(|| {
                         QueryPlannerError::CustomError(
@@ -739,6 +768,7 @@ impl Ast for AbstractSyntaxTree {
                 }
                 Type::AliasName
                 | Type::ColumnName
+                | Type::FunctionName
                 | Type::ScanName
                 | Type::Select
                 | Type::SubQueryName
@@ -861,7 +891,10 @@ impl Plan {
                             }
                         }
                     }
-                    Expression::Row { ref list, .. } => {
+                    Expression::Row { ref list, .. }
+                    | Expression::StableFunction {
+                        children: ref list, ..
+                    } => {
                         for param_id in list {
                             if param_set.contains(param_id) {
                                 idx -= 1;
@@ -935,7 +968,11 @@ impl Plan {
                             }
                         }
                     }
-                    Expression::Row { ref mut list, .. } => {
+                    Expression::Row { ref mut list, .. }
+                    | Expression::StableFunction {
+                        children: ref mut list,
+                        ..
+                    } => {
                         for param_id in list {
                             if param_set.contains(param_id) {
                                 idx -= 1;
diff --git a/sbroad-core/src/frontend/sql/ast.rs b/sbroad-core/src/frontend/sql/ast.rs
index 573464d87d..83122b2e19 100644
--- a/sbroad-core/src/frontend/sql/ast.rs
+++ b/sbroad-core/src/frontend/sql/ast.rs
@@ -38,6 +38,8 @@ pub enum Type {
     Except,
     Explain,
     False,
+    Function,
+    FunctionName,
     Gt,
     GtEq,
     In,
@@ -94,6 +96,8 @@ impl Type {
             Rule::Except => Ok(Type::Except),
             Rule::Explain => Ok(Type::Explain),
             Rule::False => Ok(Type::False),
+            Rule::Function => Ok(Type::Function),
+            Rule::FunctionName => Ok(Type::FunctionName),
             Rule::Gt => Ok(Type::Gt),
             Rule::GtEq => Ok(Type::GtEq),
             Rule::In => Ok(Type::In),
diff --git a/sbroad-core/src/frontend/sql/ast/tests.rs b/sbroad-core/src/frontend/sql/ast/tests.rs
index 986040efc6..d4b648cff3 100644
--- a/sbroad-core/src/frontend/sql/ast/tests.rs
+++ b/sbroad-core/src/frontend/sql/ast/tests.rs
@@ -152,7 +152,7 @@ fn invalid_query() {
 1 | select a frAm t
   |        ^---
   |
-  = expected Alias, Asterisk, True, False, Null, Decimal, Double, Integer, Unsigned, Row, or Parameter"#,
+  = expected Alias, Asterisk, Function, True, False, Null, Decimal, Double, Integer, Unsigned, Row, or Parameter"#,
         ),
         format!("{}", ast),
     );
diff --git a/sbroad-core/src/frontend/sql/query.pest b/sbroad-core/src/frontend/sql/query.pest
index 68446f4ff6..44f41c8f6c 100644
--- a/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad-core/src/frontend/sql/query.pest
@@ -75,7 +75,7 @@ Expr = _{ Or | And | Unary | Between | Cmp | Primary | Parentheses }
 
 Function = { FunctionName ~ ("(" ~ FunctionArgs ~ ")") }
     FunctionName = @{ Name }
-    FunctionArgs = { (Expr ~ ("," ~ Expr)*)? }
+    FunctionArgs = _{ (Expr ~ ("," ~ Expr)*)? }
 
 
 NameString = @{ !(WHITESPACE* ~ Keyword ~ WHITESPACE) ~ ('А' .. 'Я' | 'а' .. 'я' | 'A' .. 'Z' | 'a'..'z' | "-" | "_" | ASCII_DIGIT)+ }
diff --git a/sbroad-core/src/ir.rs b/sbroad-core/src/ir.rs
index b77a02c444..80d8ff0648 100644
--- a/sbroad-core/src/ir.rs
+++ b/sbroad-core/src/ir.rs
@@ -15,6 +15,7 @@ use crate::ir::value::Value;
 
 pub mod distribution;
 pub mod expression;
+pub mod function;
 pub mod helpers;
 pub mod operator;
 pub mod relation;
diff --git a/sbroad-core/src/ir/explain.rs b/sbroad-core/src/ir/explain.rs
index 98efad25fb..63e1848d20 100644
--- a/sbroad-core/src/ir/explain.rs
+++ b/sbroad-core/src/ir/explain.rs
@@ -41,6 +41,7 @@ impl Col {
                     column.alias = Some(name.to_string());
                 }
                 Expression::Bool { .. }
+                | Expression::StableFunction { .. }
                 | Expression::Row { .. }
                 | Expression::Constant { .. }
                 | Expression::Unary { .. } => {
@@ -222,6 +223,7 @@ impl Row {
                     row.add_col(RowVal::Const(value.clone()));
                 }
                 Expression::Bool { .. }
+                | Expression::StableFunction { .. }
                 | Expression::Row { .. }
                 | Expression::Alias { .. }
                 | Expression::Unary { .. } => {
@@ -306,6 +308,7 @@ impl Selection {
                 Selection::Row(row)
             }
             Expression::Reference { .. }
+            | Expression::StableFunction { .. }
             | Expression::Constant { .. }
             | Expression::Alias { .. } => {
                 return Err(QueryPlannerError::CustomError(
diff --git a/sbroad-core/src/ir/expression.rs b/sbroad-core/src/ir/expression.rs
index 8580e523bf..8c36d26cd9 100644
--- a/sbroad-core/src/ir/expression.rs
+++ b/sbroad-core/src/ir/expression.rs
@@ -90,6 +90,19 @@ pub enum Expression {
         /// of the last "add Motion" transformation.
         distribution: Option<Distribution>,
     },
+    /// Stable function cannot modify the database and
+    /// is guaranteed to return the same results given
+    /// the same arguments for all rows within a single
+    /// statement.
+    ///
+    /// Example: `bucket_id("1")` (the number of buckets can be
+    /// changed only after restarting the cluster).
+    StableFunction {
+        /// Function name.
+        name: String,
+        /// Function arguments.
+        children: Vec<usize>,
+    },
     /// Unary expression returning boolean result.
     Unary {
         /// Unary operator.
diff --git a/sbroad-core/src/ir/function.rs b/sbroad-core/src/ir/function.rs
new file mode 100644
index 0000000000..d45bbea7ad
--- /dev/null
+++ b/sbroad-core/src/ir/function.rs
@@ -0,0 +1,63 @@
+use crate::errors::QueryPlannerError;
+use crate::ir::expression::Expression;
+use crate::ir::{Node, Plan};
+use serde::{Deserialize, Serialize};
+
+#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
+pub enum Behavior {
+    /// The function is a stable function, it does not have any side effects.
+    /// It cannot modify the database, and that within a single table scan it
+    /// will consistently return the same result for the same argument values,
+    /// but that its result could change across SQL statements.
+    /// This type of functions can be executed on any node.
+    Stable,
+}
+
+#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq)]
+pub struct Function {
+    pub name: String,
+    pub behavior: Behavior,
+}
+
+impl Function {
+    #[must_use]
+    pub fn new(name: String, behavior: Behavior) -> Self {
+        Self { name, behavior }
+    }
+
+    #[must_use]
+    pub fn new_stable(name: String) -> Self {
+        Self::new(name, Behavior::Stable)
+    }
+
+    #[must_use]
+    pub fn is_stable(&self) -> bool {
+        matches!(self.behavior, Behavior::Stable)
+    }
+}
+
+impl Plan {
+    /// Adds a stable function to the plan.
+    ///
+    /// # Errors
+    /// - Function is not stable.
+    /// - Function is not found in the plan.
+    pub fn add_stable_function(
+        &mut self,
+        function: &Function,
+        children: Vec<usize>,
+    ) -> Result<usize, QueryPlannerError> {
+        if !function.is_stable() {
+            return Err(QueryPlannerError::CustomError(format!(
+                "Function {} is not stable",
+                function.name
+            )));
+        }
+        let func_expr = Expression::StableFunction {
+            name: function.name.to_string(),
+            children,
+        };
+        let func_id = self.nodes.push(Node::Expression(func_expr));
+        Ok(func_id)
+    }
+}
diff --git a/sbroad-core/src/ir/tree.rs b/sbroad-core/src/ir/tree.rs
index 75778493b9..ac51f8f164 100644
--- a/sbroad-core/src/ir/tree.rs
+++ b/sbroad-core/src/ir/tree.rs
@@ -168,6 +168,16 @@ impl<'n> Iterator for ExpressionIterator<'n> {
 
                 None
             }
+            Some(Node::Expression(Expression::StableFunction { children, .. })) => {
+                let child_step = *self.child.borrow();
+                match children.get(child_step) {
+                    None => None,
+                    Some(child) => {
+                        *self.child.borrow_mut() += 1;
+                        Some(child)
+                    }
+                }
+            }
             Some(
                 Node::Expression(Expression::Constant { .. } | Expression::Reference { .. })
                 | Node::Relational(_)
@@ -293,7 +303,8 @@ impl<'p> Iterator for SubtreeIterator<'p> {
                         }
                         None
                     }
-                    Expression::Row { list, .. } => {
+                    Expression::Row { list, .. }
+                    | Expression::StableFunction { children: list, .. } => {
                         let child_step = *self.child.borrow();
                         return match list.get(child_step) {
                             None => None,
-- 
GitLab