From d65914e246543847192e1060fc88a18539a27528 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Fri, 12 Aug 2022 11:41:48 +0700
Subject: [PATCH] fix: calculation of the bucket_id by tuple

As a side effect, some refactoring was done to remove code
duplication in the cartridge, mock and bench engines.
---
 benches/engine.rs                       |  50 ++--------
 src/executor/engine.rs                  | 119 ++++++++++++++++++++++--
 src/executor/engine/cartridge/config.rs |   9 +-
 src/executor/engine/cartridge/router.rs |  57 +-----------
 src/executor/engine/mock.rs             |  27 +-----
 src/executor/tests/bucket_id.rs         |  13 +++
 src/ir/expression.rs                    |   2 +-
 src/ir/relation.rs                      |  10 +-
 src/ir/transformation/redistribution.rs |   2 +-
 test_app/test/integration/api_test.lua  |  11 ++-
 10 files changed, 164 insertions(+), 136 deletions(-)

diff --git a/benches/engine.rs b/benches/engine.rs
index 79025b1846..e7449391c3 100644
--- a/benches/engine.rs
+++ b/benches/engine.rs
@@ -7,7 +7,10 @@ use std::collections::HashMap;
 use sbroad::errors::QueryPlannerError;
 use sbroad::executor::bucket::Buckets;
 use sbroad::executor::engine::cartridge::hash::bucket_id_by_tuple;
-use sbroad::executor::engine::{Configuration, Coordinator, CoordinatorMetadata};
+use sbroad::executor::engine::{
+    sharding_keys_from_map, sharding_keys_from_tuple, Configuration, Coordinator,
+    CoordinatorMetadata,
+};
 use sbroad::executor::ir::ExecutionPlan;
 use sbroad::executor::lru::{Cache, LRUCache, DEFAULT_CAPACITY};
 use sbroad::executor::result::ProducerResult;
@@ -53,7 +56,7 @@ impl CoordinatorMetadata for RouterConfigurationMock {
     }
 
     fn get_sharding_key_by_space(&self, space: &str) -> Result<Vec<String>, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         table.get_sharding_column_names()
     }
 
@@ -61,12 +64,12 @@ impl CoordinatorMetadata for RouterConfigurationMock {
         &self,
         space: &str,
     ) -> Result<Vec<usize>, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         Ok(table.get_sharding_positions().to_vec())
     }
 
     fn get_fields_amount_by_space(&self, space: &str) -> Result<usize, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         Ok(table.columns.len())
     }
 }
@@ -421,15 +424,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         args: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        Ok(self
-            .cached_config()
-            .get_sharding_key_by_space(&space)
-            .unwrap()
-            .iter()
-            .fold(Vec::new(), |mut acc: Vec<&Value>, v| {
-                acc.push(args.get(v).unwrap());
-                acc
-            }))
+        sharding_keys_from_map(self.cached_config(), &space, args)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -437,34 +432,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        match self
-            .cached_config()
-            .get_fields_amount_by_space(space.as_str())
-        {
-            Ok(fields_amount) => {
-                if fields_amount != rec.len() + 1 {
-                    return Err(QueryPlannerError::CustomError(format!(
-                        "Expected tuple len {}, got {}",
-                        fields_amount - 1,
-                        rec.len()
-                    )));
-                }
-            }
-            Err(e) => return Err(e),
-        }
-
-        match self
-            .cached_config()
-            .get_sharding_positions_by_space(space.as_str())
-        {
-            Ok(vec) => {
-                let mut vec_values = Vec::new();
-                vec.into_iter()
-                    .for_each(|index| vec_values.push(&rec[index]));
-                Ok(vec_values)
-            }
-            Err(e) => Err(e),
-        }
+        sharding_keys_from_tuple(self.cached_config(), &space, rec)
     }
 
     fn determine_bucket_id(&self, s: &[&Value]) -> u64 {
diff --git a/src/executor/engine.rs b/src/executor/engine.rs
index b68c124e48..02298a4655 100644
--- a/src/executor/engine.rs
+++ b/src/executor/engine.rs
@@ -4,6 +4,7 @@
 
 use std::any::Any;
 use std::cell::RefCell;
+use std::cmp::Ordering;
 use std::collections::HashMap;
 
 use crate::errors::QueryPlannerError;
@@ -34,13 +35,7 @@ pub trait CoordinatorMetadata {
 
     #[must_use]
     fn to_name(s: &str) -> String {
-        if let (Some('"'), Some('"')) = (s.chars().next(), s.chars().last()) {
-            s.to_string()
-        } else if s.to_uppercase() == s {
-            s.to_lowercase()
-        } else {
-            format!("\"{}\"", s)
-        }
+        to_name(s)
     }
 
     /// Provides vector of the sharding key column names or an error
@@ -151,5 +146,115 @@ pub trait Coordinator: Configuration {
     fn determine_bucket_id(&self, s: &[&Value]) -> u64;
 }
 
+/// Extracts the sharding key values from a tuple given with or without the `bucket_id` column.
+///
+/// # Errors
+/// - The space was not found in the metadata, or its `bucket_id` column can't be located.
+/// - The tuple has an unexpected length, or a sharding key position is invalid for it.
+pub fn sharding_keys_from_tuple<'rec>(
+    conf: &impl CoordinatorMetadata,
+    space: &str,
+    tuple: &'rec [Value],
+) -> Result<Vec<&'rec Value>, QueryPlannerError> {
+    let sharding_positions = conf.get_sharding_positions_by_space(space)?;
+    let mut sharding_tuple = Vec::with_capacity(sharding_positions.len());
+    let table_col_amount = conf.get_fields_amount_by_space(space)?;
+    if table_col_amount == tuple.len() {
+        // The tuple contains a "bucket_id" column.
+        for position in &sharding_positions {
+            let value = tuple.get(*position).ok_or_else(|| {
+                QueryPlannerError::CustomError(format!(
+                    "Missing sharding key position {:?} in the tuple {:?}",
+                    position, tuple
+                ))
+            })?;
+            sharding_tuple.push(value);
+        }
+        Ok(sharding_tuple)
+    } else if table_col_amount == tuple.len() + 1 {
+        // The tuple doesn't contain the "bucket_id" column.
+        let table = conf.get_table_segment(space)?;
+        let bucket_position = table.get_bucket_id_position()?;
+
+        // If the "bucket_id" splits the sharding key, we need to shift the sharding
+        // key positions of the right part by one.
+        // For example, we have a table with columns a, bucket_id, b, and the sharding
+        // key is (a, b). Then the sharding key positions are (0, 2).
+        // If someone gives us a tuple (42, 666) we should treat it as (42, null, 666).
+        for position in &sharding_positions {
+            let corrected_pos = match position.cmp(&bucket_position) {
+                Ordering::Less => *position,
+                Ordering::Equal => {
+                    return Err(QueryPlannerError::CustomError(format!(
+                        r#"The tuple {:?} contains a "bucket_id" position {} in a sharding key {:?}"#,
+                        tuple, position, sharding_positions
+                    )))
+                }
+                Ordering::Greater => *position - 1,
+            };
+            let value = tuple.get(corrected_pos).ok_or_else(|| {
+                QueryPlannerError::CustomError(format!(
+                    "Missing sharding key position {:?} in the tuple {:?}",
+                    corrected_pos, tuple
+                ))
+            })?;
+            sharding_tuple.push(value);
+        }
+        Ok(sharding_tuple)
+    } else {
+        Err(QueryPlannerError::CustomError(format!(
+            "The tuple {:?} was expected to have {} filed(s), got {}.",
+            tuple,
+            table_col_amount.saturating_sub(1), // avoid usize underflow on 0 columns
+            tuple.len()
+        )))
+    }
+}
+
+/// Collects the sharding key values of a space from a map of column names to values.
+///
+/// # Errors
+/// - The sharding key of the space can't be retrieved from the metadata.
+/// - A sharding key column is missing in the map.
+pub fn sharding_keys_from_map<'rec, S: ::std::hash::BuildHasher>(
+    conf: &impl CoordinatorMetadata,
+    space: &str,
+    map: &'rec HashMap<String, Value, S>,
+) -> Result<Vec<&'rec Value>, QueryPlannerError> {
+    let key_columns = conf.get_sharding_key_by_space(space)?;
+    // Map keys may be given unquoted, so they are matched by their normalized names.
+    let quoted_map: HashMap<String, &str> = map
+        .keys()
+        .map(|name| (to_name(name), name.as_str()))
+        .collect();
+    let mut values = Vec::with_capacity(key_columns.len());
+    for quoted_column in &key_columns {
+        let column = quoted_map.get(quoted_column).ok_or_else(|| {
+            QueryPlannerError::CustomError(format!(
+                "Missing quoted sharding key column {:?} in the quoted map {:?}. Original map: {:?}",
+                quoted_column, quoted_map, map
+            ))
+        })?;
+        let value = map.get(*column).ok_or_else(|| {
+            QueryPlannerError::CustomError(format!(
+                "Missing sharding key column {:?} in the map {:?}",
+                column, map
+            ))
+        })?;
+        values.push(value);
+    }
+    Ok(values)
+}
+
+/// Normalizes a name: a quoted name is kept as is, a name equal to its own
+/// uppercase form is lowercased, and any other name is wrapped in double quotes.
+fn to_name(s: &str) -> String {
+    match s {
+        _ if s.starts_with('"') && s.ends_with('"') => s.to_string(),
+        _ if s.to_uppercase() == s => s.to_lowercase(),
+        _ => format!("\"{}\"", s),
+    }
+}
+
 #[cfg(test)]
 pub mod mock;
diff --git a/src/executor/engine/cartridge/config.rs b/src/executor/engine/cartridge/config.rs
index 4647aa4d6b..660487e196 100644
--- a/src/executor/engine/cartridge/config.rs
+++ b/src/executor/engine/cartridge/config.rs
@@ -200,7 +200,8 @@ impl CoordinatorMetadata for RouterConfiguration {
     /// Returns `QueryPlannerError` when table was not found.
     #[allow(dead_code)]
     fn get_table_segment(&self, table_name: &str) -> Result<Table, QueryPlannerError> {
-        match self.tables.get(table_name) {
+        let name = Self::to_name(table_name);
+        match self.tables.get(&name) {
             Some(v) => Ok(v.clone()),
             None => Err(QueryPlannerError::CustomError(format!(
                 "Space {} not found",
@@ -220,7 +221,7 @@ impl CoordinatorMetadata for RouterConfiguration {
 
     /// Get sharding key's column names by a space name
     fn get_sharding_key_by_space(&self, space: &str) -> Result<Vec<String>, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         table.get_sharding_column_names()
     }
 
@@ -228,12 +229,12 @@ impl CoordinatorMetadata for RouterConfiguration {
         &self,
         space: &str,
     ) -> Result<Vec<usize>, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         Ok(table.get_sharding_positions().to_vec())
     }
 
     fn get_fields_amount_by_space(&self, space: &str) -> Result<usize, QueryPlannerError> {
-        let table = self.get_table_segment(&Self::to_name(space))?;
+        let table = self.get_table_segment(space)?;
         Ok(table.columns.len())
     }
 }
diff --git a/src/executor/engine/cartridge/router.rs b/src/executor/engine/cartridge/router.rs
index f98ed7c65e..c1b093d5a2 100644
--- a/src/executor/engine/cartridge/router.rs
+++ b/src/executor/engine/cartridge/router.rs
@@ -16,7 +16,9 @@ use crate::executor::bucket::Buckets;
 use crate::executor::engine::cartridge::backend::sql::ir::PatternWithParams;
 use crate::executor::engine::cartridge::config::RouterConfiguration;
 use crate::executor::engine::cartridge::hash::bucket_id_by_tuple;
-use crate::executor::engine::{Configuration, Coordinator};
+use crate::executor::engine::{
+    sharding_keys_from_map, sharding_keys_from_tuple, Configuration, Coordinator,
+};
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::lru::{LRUCache, DEFAULT_CAPACITY};
 use crate::executor::result::ProducerResult;
@@ -247,31 +249,7 @@ impl Coordinator for RouterRuntime {
         space: String,
         map: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        let sharding_key = self
-            .cached_config()
-            .get_sharding_key_by_space(space.as_str())?;
-        let quoted_map = map
-            .iter()
-            .map(|(k, _)| (RouterConfiguration::to_name(k), k.as_str()))
-            .collect::<HashMap<String, &str>>();
-        let mut tuple = Vec::with_capacity(sharding_key.len());
-        for quoted_column in &sharding_key {
-            if let Some(column) = quoted_map.get(quoted_column) {
-                let value = map.get(*column).ok_or_else(|| {
-                    QueryPlannerError::CustomError(format!(
-                        "Missing sharding key column {:?} in the map {:?}",
-                        column, map
-                    ))
-                })?;
-                tuple.push(value);
-            } else {
-                return Err(QueryPlannerError::CustomError(format!(
-                    "Missing quoted sharding key column {:?} in the quoted map {:?}. Original map: {:?}",
-                    quoted_column, quoted_map, map
-                )));
-            }
-        }
-        Ok(tuple)
+        sharding_keys_from_map(&self.metadata, &space, map)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -279,32 +257,7 @@ impl Coordinator for RouterRuntime {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        let fields_amount = self
-            .cached_config()
-            .get_fields_amount_by_space(space.as_str())?;
-        if fields_amount != rec.len() + 1 {
-            return Err(QueryPlannerError::CustomError(format!(
-                "Expected tuple len {}, got {}",
-                fields_amount - 1,
-                rec.len()
-            )));
-        }
-
-        let sharding_positions = self
-            .cached_config()
-            .get_sharding_positions_by_space(space.as_str())?;
-
-        let mut tuple = Vec::with_capacity(sharding_positions.len());
-        for position in &sharding_positions {
-            let value = rec.get(*position).ok_or_else(|| {
-                QueryPlannerError::CustomError(format!(
-                    "Missing sharding key position {:?} in the tuple {:?}",
-                    position, rec
-                ))
-            })?;
-            tuple.push(value);
-        }
-        Ok(tuple)
+        sharding_keys_from_tuple(self.cached_config(), &space, rec)
     }
 
     /// Calculate bucket for a key.
diff --git a/src/executor/engine/mock.rs b/src/executor/engine/mock.rs
index af976195a5..8d373cf991 100644
--- a/src/executor/engine/mock.rs
+++ b/src/executor/engine/mock.rs
@@ -5,7 +5,9 @@ use std::collections::{HashMap, HashSet};
 use crate::collection;
 use crate::errors::QueryPlannerError;
 use crate::executor::bucket::Buckets;
-use crate::executor::engine::{Configuration, Coordinator};
+use crate::executor::engine::{
+    sharding_keys_from_map, sharding_keys_from_tuple, Configuration, Coordinator,
+};
 use crate::executor::ir::ExecutionPlan;
 use crate::executor::lru::{LRUCache, DEFAULT_CAPACITY};
 use crate::executor::result::ProducerResult;
@@ -299,15 +301,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         args: &'rec HashMap<String, Value>,
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        Ok(self
-            .cached_config()
-            .get_sharding_key_by_space(&space)
-            .unwrap()
-            .iter()
-            .fold(Vec::new(), |mut acc: Vec<&Value>, v| {
-                acc.push(args.get(v).unwrap());
-                acc
-            }))
+        sharding_keys_from_map(&self.metadata, &space, args)
     }
 
     fn extract_sharding_keys_from_tuple<'engine, 'rec>(
@@ -315,18 +309,7 @@ impl Coordinator for RouterRuntimeMock {
         space: String,
         rec: &'rec [Value],
     ) -> Result<Vec<&'rec Value>, QueryPlannerError> {
-        match self
-            .cached_config()
-            .get_sharding_positions_by_space(space.as_str())
-        {
-            Ok(vec) => {
-                let mut vec_values = Vec::new();
-                vec.into_iter()
-                    .for_each(|index| vec_values.push(&rec[index]));
-                Ok(vec_values)
-            }
-            Err(e) => Err(e),
-        }
+        sharding_keys_from_tuple(self.cached_config(), &space, rec)
     }
 
     fn determine_bucket_id(&self, s: &[&Value]) -> u64 {
diff --git a/src/executor/tests/bucket_id.rs b/src/executor/tests/bucket_id.rs
index 8d04341aa3..958ce30123 100644
--- a/src/executor/tests/bucket_id.rs
+++ b/src/executor/tests/bucket_id.rs
@@ -65,3 +65,16 @@ fn bucket2_test() {
     ]);
     assert_eq!(expected, result);
 }
+
+#[test]
+fn sharding_keys_from_tuple1() {
+    // A two-value tuple for "t1" must yield both values, in order, as the sharding key.
+    let router = RouterRuntimeMock::new();
+    let record = vec![Value::from("123"), Value::from(1_u64)];
+    let expected = [Value::from("123"), Value::from(1_u64)];
+    let keys = router
+        .extract_sharding_keys_from_tuple("t1".into(), &record)
+        .unwrap();
+    let expected_refs: Vec<&Value> = expected.iter().collect();
+    assert_eq!(keys, expected_refs);
+}
diff --git a/src/ir/expression.rs b/src/ir/expression.rs
index 8082197196..41a1e3b9a6 100644
--- a/src/ir/expression.rs
+++ b/src/ir/expression.rs
@@ -434,7 +434,7 @@ impl Plan {
                                     relation
                                 ))
                             })?;
-                            let sharding_column_pos = table.get_sharding_column_position()?;
+                            let sharding_column_pos = table.get_bucket_id_position()?;
                             // Take an advantage of the fact that the output aliases
                             // in the relation scan are in the same order as its columns.
                             list.iter()
diff --git a/src/ir/relation.rs b/src/ir/relation.rs
index ee1b6abe2b..5c418a73ce 100644
--- a/src/ir/relation.rs
+++ b/src/ir/relation.rs
@@ -249,11 +249,11 @@ impl Table {
         Ok(ts)
     }
 
-    /// Get position of the sharding column in the table.
+    /// Get position of the `bucket_id` system column in the table.
     ///
     /// # Errors
-    /// - Table doesn't have an exactly one sharding column.
-    pub fn get_sharding_column_position(&self) -> Result<usize, QueryPlannerError> {
+    /// - Table doesn't have an exactly one `bucket_id` column.
+    pub fn get_bucket_id_position(&self) -> Result<usize, QueryPlannerError> {
         let positions: Vec<usize> = self
             .columns
             .iter()
@@ -264,10 +264,10 @@ impl Table {
         match positions.len().cmp(&1) {
             Ordering::Equal => Ok(positions[0]),
             Ordering::Greater => Err(QueryPlannerError::CustomError(
-                "Table has more than one sharding column".into(),
+                "Table has more than one bucket_id column".into(),
             )),
             Ordering::Less => Err(QueryPlannerError::CustomError(
-                "Table has no sharding columns".into(),
+                "Table has no bucket_id columns".into(),
             )),
         }
     }
diff --git a/src/ir/transformation/redistribution.rs b/src/ir/transformation/redistribution.rs
index 18367a2f25..9c113e4f9f 100644
--- a/src/ir/transformation/redistribution.rs
+++ b/src/ir/transformation/redistribution.rs
@@ -853,7 +853,7 @@ impl Plan {
                 let rel = self.get_relation(relation).ok_or_else(|| {
                     QueryPlannerError::CustomError(format!("Relation {} not found", relation))
                 })?;
-                rel.get_sharding_column_position()?
+                rel.get_bucket_id_position()?
             } else {
                 return Err(QueryPlannerError::CustomError(
                     "Expected insert node".into(),
diff --git a/test_app/test/integration/api_test.lua b/test_app/test/integration/api_test.lua
index 029baf3ad0..a4dccd13d4 100644
--- a/test_app/test/integration/api_test.lua
+++ b/test_app/test/integration/api_test.lua
@@ -103,10 +103,15 @@ g.test_bucket_id_calculation = function()
 
     -- incorrect input
     r, err = api:call("calculate_bucket_id", { { 1 }, "testing_space" })
-    t.assert_str_contains(tostring(err), [[Expected tuple len 3, got 1]])
+    t.assert_str_contains(tostring(err), [[expected to have 3 filed(s), got 1]])
 
-    r, err = api:call("calculate_bucket_id", { { 1, "123", 1, 1 }, "testing_space" })
-    t.assert_str_contains(tostring(err), [[Expected tuple len 3, got 4]])
+    -- Test with a "bucket_id" field in the tuple.
+    r, err = api:call("calculate_bucket_id", { { 1, "123", 1, box.NULL }, "testing_space" })
+    t.assert_equals(err, nil)
+    t.assert_equals(r, 360)
+
+    r, err = api:call("calculate_bucket_id", { { 1, "123", 1, 1, 1 }, "testing_space" })
+    t.assert_str_contains(tostring(err), [[expected to have 3 filed(s), got 5]])
 
     r, err = api:call("calculate_bucket_id", { { id = 1 }, "testing_space" })
     t.assert_str_contains(tostring(err), [[Missing quoted sharding key column]])
-- 
GitLab