From a3280960f78b736c1d24984df625d96d29c5cd73 Mon Sep 17 00:00:00 2001
From: Erik Khamitov <e.khamitov@picodata.io>
Date: Thu, 23 Jan 2025 17:06:19 +0300
Subject: [PATCH] feat(sql): add support for substring

---
 CHANGELOG.md                                  |   4 +
 Cargo.lock                                    |   1 +
 Cargo.toml                                    |   1 +
 sbroad/sbroad-core/src/builtins.lua           |  16 +-
 sbroad/sbroad-core/src/executor/engine.rs     |   1 +
 sbroad/sbroad-core/src/frontend/sql.rs        | 410 +++++++++++++++++-
 .../sbroad-core/src/frontend/sql/query.pest   |  14 +-
 src/lib.rs                                    |  41 ++
 test/int/test_sql.py                          | 131 ++++++
 9 files changed, 614 insertions(+), 5 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index a086537f3b..2890453fd2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -115,6 +115,10 @@ to 2 and 3.
 - `DROP PLUGIN` now leaves the plugin's data in the database if `WITH DATA`
   wasn't specified. Previously we would return an error instead.
 
+### SQL
+
+- SQL support `SUBSTRING` function
+
 ## [24.6.1] - 2024-10-28
 
 ### Configuration
diff --git a/Cargo.lock b/Cargo.lock
index 6254892d90..4582c359f3 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2689,6 +2689,7 @@ dependencies = [
  "quote 1.0.37",
  "raft",
  "rand",
+ "regex",
  "rmp",
  "rmp-serde",
  "rmpv",
diff --git a/Cargo.toml b/Cargo.toml
index eabd471b64..11e47f4a7b 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -57,6 +57,7 @@ pico_proc_macro = { path = "pico_proc_macro" }
 picodata-plugin = { path = "picodata-plugin", features = ["internal_test"] }
 sbroad-core = { path = "sbroad/sbroad-core" }
 tarantool = { path = "tarantool/tarantool", features = ["picodata", "test", "stored_procs_slice"] }
+regex = "1.11"
 
 [dev-dependencies]
 pretty_assertions = "1.4"
diff --git a/sbroad/sbroad-core/src/builtins.lua b/sbroad/sbroad-core/src/builtins.lua
index d2411fb2ef..78fef7f8e5 100644
--- a/sbroad/sbroad-core/src/builtins.lua
+++ b/sbroad/sbroad-core/src/builtins.lua
@@ -1,7 +1,7 @@
 local dt = require('datetime')
 local helper = require('sbroad.helper')
 
--- Builtin sbroad funcs inplemented in LUA
+-- Builtin sbroad funcs implemented in LUA
 local builtins = {}
 
 builtins.TO_DATE = function (s, fmt)
@@ -67,8 +67,20 @@ local function init()
       is_deterministic = true,
       if_not_exists=true
   })
+
+  body = string.format("function(...) return %s.builtins.SUBSTRING(...) end",
+  module)
+  box.schema.func.create("substring", {
+      language = 'LUA',
+      returns = 'string',
+      body = body,
+      param_list = {'string', 'string'},
+      exports = {'SQL'},
+      is_deterministic = true,
+      if_not_exists=true
+  })
 end
 
 return {
   init = init,
-}
+}
\ No newline at end of file
diff --git a/sbroad/sbroad-core/src/executor/engine.rs b/sbroad/sbroad-core/src/executor/engine.rs
index e03ea1b1f4..e437e8f2ce 100644
--- a/sbroad/sbroad-core/src/executor/engine.rs
+++ b/sbroad/sbroad-core/src/executor/engine.rs
@@ -80,6 +80,7 @@ pub fn get_builtin_functions() -> &'static [Function] {
             vec![
                 Function::new_stable("to_date".into(), Type::Datetime, false),
                 Function::new_stable("to_char".into(), Type::String, false),
+                Function::new_stable("substring".into(), Type::String, false),
                 Function::new_stable("substr".into(), Type::String, true),
                 Function::new_stable("lower".into(), Type::String, true),
                 Function::new_stable("upper".into(), Type::String, true),
diff --git a/sbroad/sbroad-core/src/frontend/sql.rs b/sbroad/sbroad-core/src/frontend/sql.rs
index c8ead10309..883c3d6804 100644
--- a/sbroad/sbroad-core/src/frontend/sql.rs
+++ b/sbroad/sbroad-core/src/frontend/sql.rs
@@ -6,7 +6,7 @@
 use crate::ir::node::ddl::DdlOwned;
 use crate::ir::node::deallocate::Deallocate;
 use crate::ir::node::tcl::Tcl;
-use crate::ir::node::{Alias, LocalTimestamp, Reference, ReferenceAsteriskSource};
+use crate::ir::node::{Alias, LocalTimestamp, Node64, Reference, ReferenceAsteriskSource};
 use crate::ir::relation::Type;
 use ahash::{AHashMap, AHashSet};
 use core::panic;
@@ -2426,6 +2426,413 @@ fn cast_type_from_pair(type_pair: Pair<Rule>) -> Result<CastType, SbroadError> {
     Ok(type_cast)
 }
 
+fn similar_escape_internal(pat_text: &str, esc_text: &str) -> Result<String, SbroadError> {
+    let escape = match esc_text {
+        e if e.len() == 1 => e,
+        _ => {
+            return Err(SbroadError::ParsingError(
+                Entity::Expression,
+                "invalid escape string. Escape string must be empty or one character.".into(),
+            ))
+        }
+    };
+
+    let mut result = String::from("^(?:");
+    let mut after_escape = false;
+    let mut in_char_class = false;
+    let mut nquotes = 0;
+
+    let chars = pat_text.chars().peekable();
+
+    for c in chars {
+        if after_escape {
+            if c == '"' && !in_char_class {
+                // escape-double-quote?
+                if nquotes == 0 {
+                    // First quote: end first part, make it non-greedy
+                    result.push_str("){1,1}?");
+                    result.push('(');
+                } else if nquotes == 1 {
+                    // Second quote: end second part, make it greedy
+                    result.push_str("){1,1}(");
+                    result.push_str("?:");
+                } else {
+                    return Err(SbroadError::ParsingError(Entity::Expression, "SQL regular expression may not contain more than two escape-double-quote separators".into()));
+                }
+                nquotes += 1;
+            } else {
+                // Escape any character
+                result.push('\\');
+                result.push(c);
+            }
+            after_escape = false;
+        } else if c.to_string() == escape {
+            // SQL escape character; do not send to output
+            after_escape = true;
+        } else if in_char_class {
+            if c == '\\' {
+                result.push('\\');
+            }
+            result.push(c);
+            if c == ']' {
+                in_char_class = false;
+            }
+        } else {
+            match c {
+                '[' => {
+                    result.push(c);
+                    in_char_class = true;
+                }
+                '%' => {
+                    result.push_str(".*");
+                }
+                '_' => {
+                    result.push('.');
+                }
+                '(' => {
+                    result.push_str("(?:");
+                }
+                '\\' | '.' | '^' | '$' => {
+                    result.push('\\');
+                    result.push(c);
+                }
+                _ => {
+                    result.push(c);
+                }
+            }
+        }
+    }
+
+    result.push_str(")$");
+    Ok(result)
+}
+
+fn parse_substring<M: Metadata>(
+    pair: Pair<Rule>,
+    referred_relation_ids: &[NodeId],
+    worker: &mut ExpressionsWorker<M>,
+    plan: &mut Plan,
+) -> Result<ParseExpression, SbroadError> {
+    assert_eq!(pair.as_rule(), Rule::Substring);
+
+    let mut inner = pair.into_inner();
+    let variant = inner.next().ok_or_else(|| {
+        SbroadError::ParsingError(Entity::Expression, "no substring variant".into())
+    })?;
+
+    match variant.as_rule() {
+        Rule::SubstringFromFor | Rule::SubstringRegular => {
+            // Handle: substring(expr FROM expr FOR expr) and substring(expr, expr, expr)
+            let mut pieces = variant.into_inner();
+            let string_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            let from_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            let for_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            // Check if second number is negative
+            if is_negative_number(plan, &for_expr)? {
+                return Err(SbroadError::Invalid(
+                    Entity::Expression,
+                    Some("Length parameter in substring cannot be negative".into()),
+                ));
+            }
+            // Check if from_expr and for_expr are numeric literals
+            let (is_numeric, use_one) = match (&from_expr, &for_expr) {
+                (
+                    ParseExpression::PlanId { plan_id: from_id },
+                    ParseExpression::PlanId { plan_id: for_id },
+                ) => {
+                    let from_is_negative = is_negative_number(plan, &from_expr)?;
+                    let is_numeric = matches!(
+                        plan.get_node(*from_id)?,
+                        Node::Expression(Expression::Constant(Constant {
+                            value: Value::Integer(_) | Value::Unsigned(_)
+                        }))
+                    ) && matches!(
+                        plan.get_node(*for_id)?,
+                        Node::Expression(Expression::Constant(Constant {
+                            value: Value::Integer(_) | Value::Unsigned(_)
+                        }))
+                    );
+                    (is_numeric, from_is_negative)
+                }
+                _ => (false, false),
+            };
+            if is_numeric {
+                let args = if use_one {
+                    let one_literal = plan.add_const(Value::Unsigned(1));
+                    vec![
+                        string_expr,
+                        ParseExpression::PlanId {
+                            plan_id: one_literal,
+                        },
+                        for_expr,
+                    ]
+                } else {
+                    vec![string_expr, from_expr, for_expr]
+                };
+                Ok(ParseExpression::Function {
+                    name: "substr".to_string(),
+                    args,
+                    feature: None,
+                })
+            } else {
+                let (pattern_str, escape_str) =
+                    extract_pattern_and_escape(plan, &from_expr, &for_expr)?;
+                let regex_pattern = similar_escape_internal(&pattern_str, &escape_str)?;
+                // Update pattern with regex in plan
+                if let ParseExpression::PlanId { plan_id } = from_expr {
+                    let value = Value::String(regex_pattern);
+                    plan.nodes
+                        .replace(plan_id, Node64::Constant(Constant { value }))?;
+                }
+                let string_id = string_expr.populate_plan(plan, worker)?;
+                let pattern_id = from_expr.populate_plan(plan, worker)?;
+                Ok(ParseExpression::Function {
+                    name: "substring".to_string(),
+                    args: vec![
+                        ParseExpression::PlanId { plan_id: string_id },
+                        ParseExpression::PlanId {
+                            plan_id: pattern_id,
+                        },
+                    ],
+                    feature: None,
+                })
+            }
+        }
+        Rule::SubstringFor => {
+            // Handle: substring(expr FOR expr)
+            let mut pieces = variant.into_inner();
+            let string_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            let for_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+
+            // Check if length is negative
+            if is_negative_number(plan, &for_expr)? {
+                return Err(SbroadError::Invalid(
+                    Entity::Expression,
+                    Some("Length parameter in substring cannot be negative".into()),
+                ));
+            }
+
+            let string_id = string_expr.populate_plan(plan, worker)?;
+            let one_literal = plan.add_const(Value::Unsigned(1));
+            let for_id = for_expr.populate_plan(plan, worker)?;
+
+            Ok(ParseExpression::Function {
+                name: "substr".to_string(),
+                args: vec![
+                    ParseExpression::PlanId { plan_id: string_id },
+                    ParseExpression::PlanId {
+                        plan_id: one_literal,
+                    },
+                    ParseExpression::PlanId { plan_id: for_id },
+                ],
+                feature: None,
+            })
+        }
+        Rule::SubstringFrom => {
+            // Handle: substring(expr FROM expr) - both numeric and regexp variants
+            let mut pieces = variant.into_inner();
+            let string_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            let from_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            // Check if from_expr is a numeric literal
+            let (is_numeric, use_one) = if let ParseExpression::PlanId { plan_id } = &from_expr {
+                let is_negative = is_negative_number(plan, &from_expr)?;
+                let is_numeric = matches!(
+                    plan.get_node(*plan_id)?,
+                    Node::Expression(Expression::Constant(Constant {
+                        value: Value::Integer(_) | Value::Unsigned(_)
+                    }))
+                );
+                (is_numeric, is_negative)
+            } else {
+                (false, false)
+            };
+            let args = if use_one {
+                let one_literal = plan.add_const(Value::Unsigned(1));
+                vec![
+                    string_expr,
+                    ParseExpression::PlanId {
+                        plan_id: one_literal,
+                    },
+                ]
+            } else {
+                vec![string_expr, from_expr]
+            };
+            Ok(ParseExpression::Function {
+                name: if is_numeric {
+                    "substr".to_string()
+                } else {
+                    "substring".to_string()
+                },
+                args,
+                feature: None,
+            })
+        }
+        Rule::SubstringSimilar => {
+            // Handle: substring(expr SIMILAR expr ESCAPE expr)
+            let mut pieces = variant.into_inner();
+            let string_expr =
+                //parse_next_expression(&mut pieces, referred_relation_ids, worker, plan)?;
+                parse_expr_pratt(
+                    pieces.next().expect("Expected expression").into_inner(),
+                    referred_relation_ids,
+                    worker,
+                    plan,
+                )?;
+            let pattern_expr = parse_expr_pratt(
+                Pairs::single(pieces.next().expect("Expected expression")),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            //parse_next_expression(&mut pieces, referred_relation_ids, worker, plan)?;
+            let escape_expr = parse_expr_pratt(
+                pieces.next().expect("Expected expression").into_inner(),
+                referred_relation_ids,
+                worker,
+                plan,
+            )?;
+            //parse_next_expression(&mut pieces, referred_relation_ids, worker, plan)?;
+
+            let (pattern_str, escape_str) =
+                extract_pattern_and_escape(plan, &pattern_expr, &escape_expr)?;
+            let regex_pattern = similar_escape_internal(&pattern_str, &escape_str)?;
+
+            // Update pattern with regex in plan
+            if let ParseExpression::PlanId { plan_id } = pattern_expr {
+                let value = Value::String(regex_pattern);
+                plan.nodes
+                    .replace(plan_id, Node64::Constant(Constant { value }))?;
+            }
+
+            let string_id = string_expr.populate_plan(plan, worker)?;
+            let pattern_id = pattern_expr.populate_plan(plan, worker)?;
+
+            Ok(ParseExpression::Function {
+                name: "substring".to_string(),
+                args: vec![
+                    ParseExpression::PlanId { plan_id: string_id },
+                    ParseExpression::PlanId {
+                        plan_id: pattern_id,
+                    },
+                ],
+                feature: None,
+            })
+        }
+        // Rule::SubstringRegular => {
+        //     // Handle: substring(expr, expr, expr)
+        //     let mut pieces = variant.into_inner();
+
+        //     let exprs = vec![
+        //     parse_expr_pratt(
+        //         pieces.next().expect("Expected expression").into_inner(),
+        //         referred_relation_ids,
+        //         worker,
+        //         plan,
+        //     )?,
+        //     parse_expr_pratt(
+        //         pieces.next().expect("Expected expression").into_inner(),
+        //         referred_relation_ids,
+        //         worker,
+        //         plan,
+        //     )?
+        //     ,
+        //     parse_expr_pratt(
+        //         pieces.next().expect("Expected expression").into_inner(),
+        //         referred_relation_ids,
+        //         worker,
+        //         plan,
+        //     )?
+        //     ];
+
+        //     Ok(ParseExpression::Function {
+        //         name: "substr".to_string(),
+        //         args: exprs,
+        //         feature: None,
+        //     })
+        // }
+        _ => Err(SbroadError::ParsingError(
+            Entity::Expression,
+            "Unrecognized SubstringVariant".into(),
+        )),
+    }
+}
+
+// Helper functions
+
+fn is_negative_number(plan: &Plan, expr: &ParseExpression) -> Result<bool, SbroadError> {
+    if let ParseExpression::PlanId { plan_id } = expr {
+        if let Node::Expression(Expression::Constant(Constant { value })) =
+            plan.get_node(*plan_id)?
+        {
+            return Ok(matches!(value, Value::Integer(n) if *n < 0));
+        }
+    }
+    Ok(false)
+}
+
+fn extract_pattern_and_escape(
+    plan: &Plan,
+    pattern_expr: &ParseExpression,
+    escape_expr: &ParseExpression,
+) -> Result<(String, String), SbroadError> {
+    let mut pattern_str = String::new();
+    let mut escape_str = String::new();
+
+    if let ParseExpression::PlanId { plan_id } = pattern_expr {
+        if let Node::Expression(Expression::Constant(Constant {
+            value: Value::String(s),
+        })) = plan.get_node(*plan_id)?
+        {
+            pattern_str = s.to_string();
+        }
+    }
+    if let ParseExpression::PlanId { plan_id } = escape_expr {
+        if let Node::Expression(Expression::Constant(Constant {
+            value: Value::String(s),
+        })) = plan.get_node(*plan_id)?
+        {
+            escape_str = s.to_string();
+        }
+    }
+
+    Ok((pattern_str, escape_str))
+}
+
 /// Function responsible for parsing expressions using Pratt parser.
 ///
 /// Parameters:
@@ -2684,6 +3091,7 @@ where
                     ParseExpression::Exists { is_not: first_is_not, child: Box::new(child_parse_expr)}
                 }
                 Rule::Trim => parse_trim(primary, referred_relation_ids, worker, plan)?,
+                Rule::Substring => parse_substring(primary, referred_relation_ids, worker, plan)?,
                 Rule::CastOp => {
                     let mut inner_pairs = primary.into_inner();
                     let expr_pair = inner_pairs.next().expect("Cast has no expr child.");
diff --git a/sbroad/sbroad-core/src/frontend/sql/query.pest b/sbroad/sbroad-core/src/frontend/sql/query.pest
index e0de518afd..5d190e3089 100644
--- a/sbroad/sbroad-core/src/frontend/sql/query.pest
+++ b/sbroad/sbroad-core/src/frontend/sql/query.pest
@@ -309,7 +309,7 @@ Identifier = @{ DelimitedIdentifier | RegularIdentifier  }
                         | ^"having" | ^"inner" | ^"int2" | ^"int4" | ^"int8" | ^"integer" | ^"into" | ^"int" | ^"in" | ^"is"
                         | ^"join" | ^"left" | ^"limit" | ^"localtimestamp" | ^"not" | ^"null" | ^"number" | ^"numeric"
                         | ^"on" | ^"option" | ^"order" | ^"or" | ^"outer" | ^"primary"
-                        | ^"scalar" | ^"select" | ^"set" | ^"smallint" |^"string"
+                        | ^"scalar" | ^"select" | ^"set" | ^"similar" | ^"smallint" |^"string"
                         | ^"table" | ^"text" | ^"then" | ^"to" | ^"true"
                         | ^"union" | ^"unsigned" | ^"using" | ^"uuid"
                         | ^"values" | ^"varchar" | ^"when" | ^"where" | ^"with"
@@ -357,7 +357,7 @@ Expr = ${ ExprAtomValue ~ (ExprInfixOpo ~ ExprAtomValue)* }
         CastPostfix = { "::" ~ ColumnDefType }
         IsPostfix = ${ ^"is" ~ W ~ (NotFlag ~ W)? ~ (True | False | Unknown | Null) }
             Unknown = { ^"unknown" }
-        AtomicExpr = _{ Literal | Parameter | CastOp | Trim | CurrentDate | LocalTimestamp | IdentifierWithOptionalContinuation | ExpressionInParentheses | UnaryOperator | Case | SubQuery | Row }
+        AtomicExpr = _{ Literal | Parameter | CastOp | Trim | Substring | CurrentDate | LocalTimestamp | IdentifierWithOptionalContinuation | ExpressionInParentheses | UnaryOperator | Case | SubQuery | Row }
             Literal = { True | False | Null | Double | Decimal | Unsigned | Integer | SingleQuotedString }
                 True     = { ^"true" }
                 False    = { ^"false" }
@@ -390,6 +390,7 @@ Expr = ${ ExprAtomValue ~ (ExprInfixOpo ~ ExprAtomValue)* }
                     TrimKindBoth = { ^"both" }
                 TrimPattern = { Expr }
                 TrimTarget = { Expr }
+
             Case = ${
                 ^"case" ~ W ~
                 (Expr ~ W)? ~
@@ -455,3 +456,12 @@ WHITESPACE = _{ " " | "\t" | "\n" | "\r\n" }
 W  = _{ WHITESPACE+ }
 WO = _{ WHITESPACE* }
 EOF = { EOI | ";" }
+
+// Add the Substring rule after the existing AtomicExpr rule
+Substring = ${ ^"substring" ~ WO ~ "(" ~ WO ~ SubstringVariant ~ WO ~ ")" }
+    SubstringVariant = _{ SubstringFromFor | SubstringRegular | SubstringFor | SubstringFrom | SubstringSimilar }
+    SubstringFromFor = ${ Expr ~ W ~ ^"from" ~ W ~ Expr ~ W ~ ^"for" ~ W ~ Expr }
+    SubstringRegular = ${ Expr ~ (WO ~ "," ~ WO ~ Expr) ~ (WO ~ "," ~ WO ~ Expr) }
+    SubstringFor = ${ Expr ~ W ~ ^"for" ~ W ~ Expr }
+    SubstringFrom = ${ Expr ~ W ~ ^"from" ~ W ~ Expr }
+    SubstringSimilar = ${ Expr ~ W ~ ^"similar" ~ W ~ ExprAtomValue ~ W ~ ^"escape" ~ W ~ Expr }
\ No newline at end of file
diff --git a/src/lib.rs b/src/lib.rs
index f2f5582a74..503d65cb6a 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -12,6 +12,7 @@
 #![allow(clippy::unused_io_amount)]
 #![allow(clippy::bool_assert_comparison)]
 use config::apply_parameter;
+use regex::Regex;
 use serde::{Deserialize, Serialize};
 use storage::ToEntryIter;
 
@@ -173,6 +174,46 @@ fn init_sbroad() {
     "#,
     )
     .unwrap();
+    let _ = lua.exec_with(
+        r#"
+    if rawget(_G, 'pico') == nil then
+           error('pico module must be initialized before regexp_extract')
+       end
+       if pico.builtins == nil then
+           pico.builtins = {}
+       end
+    pico.builtins.SUBSTRING = ...
+    "#,
+        tlua::function2(|input: String, pattern: String| -> String {
+            match Regex::new(&pattern) {
+                Ok(re) => {
+                    if let Some(caps) = re.captures(&input) {
+                        // If there is a capturing group (i.e. len() > 1), return the first group
+                        if caps.len() > 1 {
+                            match caps.get(1) {
+                                Some(_) => {
+                                    if let Some(matched) = caps.get(1) {
+                                        return matched.as_str().to_string();
+                                    }
+                                }
+                                None => return String::new(),
+                            }
+                        }
+                        // Otherwise, return the full match
+                        if let Some(matched) = caps.get(0) {
+                            return matched.as_str().to_string();
+                        }
+                    }
+                    // No match found
+                    return String::new();
+                }
+                Err(e) => {
+                    tlog!(Error, "Invalid regex pattern: {}", e);
+                    return String::new();
+                }
+            }
+        }),
+    );
 }
 
 extern "C-unwind" {
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 3578600704..0569890ee5 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -1397,6 +1397,137 @@ def test_substr(instance: Instance):
     assert data[0] == [""]
 
 
+def test_substring(instance: Instance):
+    # substring expression examples
+    data = instance.sql("SELECT SUBSTRING('1234567890' FROM 3)")
+    assert data[0] == ["34567890"]
+
+    data = instance.sql("SELECT SUBSTRING('1234567890' FROM 4 FOR 3)")
+    assert data[0] == ["456"]
+
+    # overflow cases
+    data = instance.sql("SELECT SUBSTRING('string' FROM 2 FOR 2147483646)")
+    assert data[0] == ["tring"]
+
+    with pytest.raises(
+        TarantoolError, match="u64 parsing error number too large to fit in target"
+    ):
+        instance.sql("SELECT SUBSTRING('string' FROM 2 FOR 99999999999999999999)")
+
+    with pytest.raises(
+        TarantoolError, match="Length parameter in substring cannot be negative"
+    ):
+        instance.sql("SELECT SUBSTRING('string' FROM 2 FOR -2147483646)")
+
+    # regular expression substring (with SQL's regex syntax)
+    data = instance.sql(
+        """SELECT SUBSTRING('abcdefg' SIMILAR 'a#"(b_d)#"%' ESCAPE '#')"""
+    )
+    assert data[0] == ["bcd"]
+
+    # obsolete SQL99 syntax
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' FROM 'a#"(b_d)#"%' FOR '#')""")
+    assert data[0] == ["bcd"]
+
+    # null as parameter
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR NULL ESCAPE '#')""")
+    assert data[0] == [""]
+
+    with pytest.raises(TarantoolError, match="invalid escape string"):
+        instance.sql("SELECT SUBSTRING('abcdefg' SIMILAR '%' ESCAPE NULL)")
+
+    # the first and last parts should act non-greedy
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'a#"%#"g' ESCAPE '#')""")
+    assert data[0] == ["bcdef"]
+
+    data = instance.sql(
+        """SELECT SUBSTRING('abcdefg' SIMILAR 'a*#"%#"g*' ESCAPE '#')"""
+    )
+    assert data[0] == ["bcdefg"]
+
+    # vertical bar in any part affects only that part
+    data = instance.sql(
+        """SELECT SUBSTRING('abcdefg' SIMILAR 'a|b#"%#"g' ESCAPE '#')"""
+    )
+    assert data[0] == ["bcdef"]
+
+    data = instance.sql(
+        """SELECT SUBSTRING('abcdefg' SIMILAR 'a#"%#"x|g' ESCAPE '#')"""
+    )
+    assert data[0] == ["bcdef"]
+
+    data = instance.sql(
+        """SELECT SUBSTRING('abcdefg' SIMILAR 'a#"%|ab#"g' ESCAPE '#')"""
+    )
+    assert data[0] == ["bcdef"]
+
+    # postgres extension cases
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'a#"%g' ESCAPE '#')""")
+    assert data[0] == ["bcdefg"]
+
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'a%g' ESCAPE '#')""")
+    assert data[0] == ["abcdefg"]
+
+    # with two arguments as POSIX regex
+    data = instance.sql("SELECT SUBSTRING('abcdefg' FROM 'c.e')")
+    assert data[0] == ["cde"]
+
+    # with parenthesized subexpression
+    data = instance.sql("SELECT SUBSTRING('abcdefg' FROM 'b(.*)f')")
+    assert data[0] == ["cde"]
+
+    # check case where there's a match but no subexpression
+    data = instance.sql("SELECT SUBSTRING('foo' FROM 'foo(bar)?')")
+    assert data[0] == [""]
+
+    # invalid escape string (should error)
+    with pytest.raises(TarantoolError, match="invalid escape string"):
+        instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'abc' ESCAPE 'xy')""")
+
+    # character class handling
+    data = instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'a[bcd]%g' ESCAPE '#')""")
+    assert data[0] == ["abcdefg"]
+
+    data = instance.sql(r"SELECT SUBSTRING('abcdefg' SIMILAR 'a[b\]c]%g' ESCAPE '\')")
+    assert data[0] == ["abcdefg"]
+
+    # escaping special characters
+    data = instance.sql("""SELECT SUBSTRING('a.c' SIMILAR '%#.%' ESCAPE '#')""")
+    assert data[0] == ["a.c"]
+
+    data = instance.sql("""SELECT SUBSTRING('a^c' SIMILAR '%#^%' ESCAPE '#')""")
+    assert data[0] == ["a^c"]
+
+    data = instance.sql("""SELECT SUBSTRING('a$c' SIMILAR '%#$%' ESCAPE '#')""")
+    assert data[0] == ["a$c"]
+
+    # error on too many escape-double-quote separators
+    with pytest.raises(
+        TarantoolError,
+        match="may not contain more than two escape-double-quote separators",
+    ):
+        instance.sql("""SELECT SUBSTRING('abcdefg' SIMILAR 'a#"b#"c#"d' ESCAPE '#')""")
+
+    # backslash outside char class
+    data = instance.sql("SELECT SUBSTRING('abc\\def' SIMILAR '%\\%' ESCAPE '#')")
+    assert data[0] == ["abc\\def"]
+
+    # backslash inside char class
+    data = instance.sql("SELECT SUBSTRING('abc\\def' SIMILAR '%[\\\\]d%' ESCAPE '#')")
+    assert data[0] == ["abc\\def"]
+
+    # casted to string expressions as parameters
+    data = instance.sql("SELECT SUBSTRING(TRUE::STRING SIMILAR '%%' ESCAPE '#')")
+    assert data[0] == ["TRUE"]
+
+    data = instance.sql("SELECT SUBSTRING(1234::STRING for 2)")
+    assert data[0] == ["12"]
+
+    # casted to integer expressions as parameters
+    data = instance.sql("SELECT SUBSTRING('1234567890' FOR 5::integer)")
+    assert data[0] == ["12345"]
+
+
 def test_coalesce(instance: Instance):
     instance.sql("create table foo (id int primary key, bar unsigned null);")
     instance.sql("insert into foo values (1, null), (2, 1);")
-- 
GitLab