Skip to content
Snippets Groups Projects
Commit 9515c371 authored by Dmitry Travyan's avatar Dmitry Travyan :bus:
Browse files

feat: calculate_bucket_id_by_dict

parent 5bcf8264
No related branches found
No related tags found
1 merge request!1414sbroad import
Showing
with 463 additions and 56 deletions
[target.aarch64-apple-darwin]
rustflags = [
"-C", "link-arg=-undefined", "-C", "link-arg=dynamic_lookup"
]
......@@ -11,7 +11,7 @@ edition = "2021"
[dependencies]
decimal = "2.1.0"
fasthash = "0.4.0"
hash32 = "0.2"
itertools = "0.10.3"
pest = "2.0"
pest_derive = "2.0"
......@@ -20,10 +20,10 @@ serde_yaml = "0.8"
tarantool = { git = "https://sbroad-cargo-token:t-nZyqJVVuhGQv17BX6v@gitlab.com/picodata/picodata/tarantool-module.git", rev="d8921ec6"}
traversal = "0.1.2"
yaml-rust = "0.4.1"
rmp-serde = "0.14"
[dev-dependencies]
pretty_assertions = "1.0.0"
rmp-serde = "0.14"
[lib]
crate-type = ["cdylib"]
......@@ -36,7 +36,7 @@ use std::collections::HashMap;
mod bucket;
pub mod engine;
pub(crate) mod ir;
mod result;
pub mod result;
mod vtable;
impl Plan {
......
......@@ -2,11 +2,14 @@
//!
//! Traits that define an execution engine interface.
use std::collections::HashMap;
use crate::errors::QueryPlannerError;
use crate::executor::bucket::Buckets;
use crate::executor::ir::ExecutionPlan;
use crate::executor::result::BoxExecuteFormat;
use crate::executor::vtable::VirtualTable;
use crate::ir::value::Value as IrValue;
pub mod cartridge;
......@@ -33,6 +36,13 @@ pub trait Metadata {
format!("\"{}\"", s)
}
}
/// Returns the sharding key column names of `space` as a `Vec<&str>`.
///
/// # Errors
/// - Metadata does not contain the space
/// - Metadata contains sharding keys in an incorrect format
fn get_sharding_key_by_space(&self, space: &str) -> Result<Vec<&str>, QueryPlannerError>;
}
/// An execution engine trait.
......@@ -78,6 +88,17 @@ pub trait Engine {
buckets: &Buckets,
) -> Result<BoxExecuteFormat, QueryPlannerError>;
/// Selects from `args` the values that belong to the sharding key of `space`
/// and returns them in sharding key order.
///
/// # Errors
/// - `args` does not contain all sharding keys
/// - internal metadata errors
fn extract_sharding_keys(
&self,
space: String,
args: HashMap<String, IrValue>,
) -> Result<Vec<IrValue>, QueryPlannerError>;
/// Determine shard for query execution by sharding key value
fn determine_bucket_id(&self, s: &str) -> u64;
}
......
//! Tarantool cartridge engine module.
use std::collections::HashMap;
use std::convert::TryInto;
use tarantool::log::{say, SayLevel};
......@@ -14,6 +15,7 @@ use crate::executor::ir::ExecutionPlan;
use crate::executor::result::BoxExecuteFormat;
use crate::executor::vtable::VirtualTable;
use crate::executor::Metadata;
use crate::ir::value::Value as IrValue;
mod backend;
pub mod cache;
......@@ -155,6 +157,31 @@ impl Engine for Runtime {
Ok(vtable)
}
/// Picks out of `rec` the values that belong to the sharding key columns of `space`.
///
/// The values come back in the order in which the metadata lists the sharding key
/// columns. A metadata lookup failure is propagated as is; the first sharding key
/// column that is absent from `rec` produces a `CustomError` naming that column.
fn extract_sharding_keys(
    &self,
    space: String,
    rec: HashMap<String, IrValue>,
) -> Result<Vec<IrValue>, QueryPlannerError> {
    self.metadata()
        .get_sharding_key_by_space(space.as_str())?
        .iter()
        .map(|&column| {
            // Collecting into `Result` stops at the first missing column.
            rec.get(column).cloned().ok_or_else(|| {
                QueryPlannerError::CustomError(format!(
                    "The dict of args missed key/value to calculate bucket_id. Column: {}",
                    column
                ))
            })
        })
        .collect()
}
/// Calculate bucket for a key.
fn determine_bucket_id(&self, s: &str) -> u64 {
str_to_bucket_id(s, self.bucket_count)
......
......@@ -60,27 +60,6 @@ impl ClusterAppConfig {
Err(QueryPlannerError::InvalidClusterSchema)
}
/// Get table sharding key.
///
/// # Panics
/// - Invalid schema.
#[allow(dead_code)]
#[must_use]
pub fn get_sharding_key_by_space(self, space: &str) -> Vec<String> {
let mut result = Vec::new();
let spaces = self.schema["spaces"].as_hash().unwrap();
for (space_name, params) in spaces.iter() {
let current_space_name = space_name.as_str().unwrap();
if current_space_name == space {
for k in params["sharding_key"].as_vec().unwrap() {
result.push(Self::to_name(k.as_str().unwrap()));
}
}
}
result
}
pub(in crate::executor::engine::cartridge) fn is_empty(&self) -> bool {
self.schema.is_null()
}
......@@ -169,6 +148,28 @@ impl Metadata for ClusterAppConfig {
fn get_exec_waiting_timeout(&self) -> u64 {
self.waiting_timeout
}
/// Looks up the sharding key columns of `space` in the loaded schema.
///
/// # Errors
/// - the schema has no `sharding_key` sequence for `space`
/// - an element of the sharding key is not a string
fn get_sharding_key_by_space(&self, space: &str) -> Result<Vec<&str>, QueryPlannerError> {
    let columns = self.schema["spaces"][space]["sharding_key"]
        .as_vec()
        .ok_or_else(|| {
            QueryPlannerError::CustomError(format!(
                "Failed to get space {} or sharding key",
                space
            ))
        })?;

    columns
        .iter()
        .map(|column| {
            column.as_str().ok_or_else(|| {
                QueryPlannerError::CustomError(format!(
                    "Schema {} contains incorrect sharding keys format",
                    space
                ))
            })
        })
        .collect()
}
}
#[cfg(test)]
......
......@@ -85,12 +85,10 @@ fn test_yaml_schema_parser() {
let mut s = ClusterAppConfig::new();
s.load_schema(test_schema).unwrap();
let mut expected_keys = Vec::new();
expected_keys.push("\"identification_number\"".to_string());
expected_keys.push("\"product_code\"".to_string());
let expected_keys = vec!["identification_number", "product_code"];
// FIXME: do we need "to_name()" here?
let actual_keys = s.get_sharding_key_by_space("hash_testing");
let actual_keys = s.get_sharding_key_by_space("hash_testing").unwrap();
assert_eq!(actual_keys, expected_keys)
}
......
//! Bucket hash module.
use fasthash::{murmur3::Hasher32, FastHasher};
use std::hash::Hasher;
use hash32::{Hasher, Murmur3Hasher};
/// Determine bucket value using `murmur3` hash function
pub(in crate::executor::engine) fn str_to_bucket_id(s: &str, bucket_count: usize) -> u64 {
let mut hash = Hasher32::new();
hash.write(s.as_bytes());
hash.finish() % bucket_count as u64 + 1
#[must_use]
/// A simple function to calculate the bucket id from a string slice.
/// `(MurMur3 hash at str) % bucket_count + 1`
pub fn str_to_bucket_id(s: &str, bucket_count: usize) -> u64 {
let mut hasher = Murmur3Hasher::default();
hasher.write(s.as_bytes());
u64::from(hasher.finish()) % bucket_count as u64 + 1
}
#[cfg(test)]
......
......@@ -4,12 +4,15 @@ use super::*;
fn test_bucket_id_by_str() {
assert_eq!(str_to_bucket_id("100тесты", 30000), 17339);
assert_eq!(
str_to_bucket_id("4TEST5501605647472000000100000000d92beee8-749f-4539-aa15-3d2941dbb0f1x32https://google.com", 30000),
13815
assert_eq!(str_to_bucket_id(
"4TEST5501605647472000000100000000d9\
2beee8-749f-4539-aa15-3d2941dbb0f1x32https://google.com", 30000),
13815
);
assert_eq!(360, str_to_bucket_id("1123", 30000),);
assert_eq!(str_to_bucket_id("1123", 30000), 360);
assert_eq!(str_to_bucket_id("daymonthyear", 30000), 3333);
}
#[test]
......
......@@ -9,10 +9,12 @@ use crate::executor::result::{BoxExecuteFormat, Value};
use crate::executor::vtable::VirtualTable;
use crate::executor::Metadata;
use crate::ir::relation::{Column, Table, Type};
use crate::ir::value::Value as IrValue;
#[allow(clippy::module_name_repetitions)]
#[derive(Debug, Clone)]
pub struct MetadataMock {
schema: HashMap<String, Vec<String>>,
tables: HashMap<String, Table>,
bucket_count: usize,
}
......@@ -29,6 +31,14 @@ impl Metadata for MetadataMock {
fn get_exec_waiting_timeout(&self) -> u64 {
0
}
fn get_sharding_key_by_space(&self, space: &str) -> Result<Vec<&str>, QueryPlannerError> {
Ok(self
.schema
.get(space)
.map(|v| v.iter().map(|s| s.as_str()).collect::<Vec<&str>>())
.unwrap())
}
}
impl MetadataMock {
......@@ -103,6 +113,15 @@ impl MetadataMock {
);
MetadataMock {
schema: [
("EMPLOYEES".into(), vec!["ID".into()]),
(
"hash_testing".into(),
vec!["identification_number".into(), "product_code".into()],
),
]
.into_iter()
.collect(),
tables,
bucket_count: 10000,
}
......@@ -180,6 +199,22 @@ impl Engine for EngineMock {
Ok(result)
}
fn extract_sharding_keys(
&self,
space: String,
args: HashMap<String, IrValue>,
) -> Result<Vec<IrValue>, QueryPlannerError> {
Ok(self
.metadata()
.get_sharding_key_by_space(&space)
.unwrap()
.iter()
.fold(Vec::new(), |mut acc: Vec<IrValue>, &v| {
acc.push(args.get(v).unwrap().clone());
acc
}))
}
fn determine_bucket_id(&self, s: &str) -> u64 {
str_to_bucket_id(s, self.metadata.bucket_count)
}
......
use std::fmt;
use decimal::d128;
use serde::de::Visitor;
use serde::ser::{Serialize, SerializeMap, Serializer};
use serde::Deserialize;
use tarantool::tlua::{self, LuaRead};
use crate::errors::QueryPlannerError;
......@@ -65,6 +67,76 @@ impl Serialize for Value {
}
}
struct ValueVistor;
impl<'de> Visitor<'de> for ValueVistor {
type Value = Value;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a tarantool value enum implementation")
}
fn visit_bool<E>(self, value: bool) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::Boolean(value))
}
fn visit_i64<E>(self, value: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::Integer(value))
}
fn visit_u64<E>(self, value: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::Unsigned(value))
}
fn visit_f64<E>(self, value: f64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::Number(value))
}
fn visit_str<E>(self, v: &str) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::String(v.to_string()))
}
fn visit_string<E>(self, value: String) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::String(value))
}
fn visit_unit<E>(self) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
Ok(Value::Null(tlua::Null))
}
}
/// Custom Implementation `de::Deserialize`, because if using standard `#derive[Deserialize]`
/// then each `Value` type record deserialize as Value
/// and it doesn't correct for result format
impl<'de> Deserialize<'de> for Value {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
deserializer.deserialize_any(ValueVistor)
}
}
impl Eq for Value {}
type BoxExecuteTuple = Vec<Value>;
......
......@@ -51,6 +51,29 @@ pub enum Value {
String(String),
}
impl From<crate::executor::result::Value> for Value {
    /// Converts an executor result value into an IR value.
    ///
    /// Booleans and strings map one-to-one, every numeric variant ends up in
    /// `Value::Number` as a `d128`, and the executor null becomes `Value::Null`.
    fn from(value: crate::executor::result::Value) -> Self {
        use crate::executor::result::Value as ExecValue;

        match value {
            ExecValue::Boolean(flag) => Self::Boolean(flag),
            ExecValue::Integer(int) => Self::Number(d128::from(int)),
            ExecValue::Unsigned(uint) => Self::Number(d128::from(uint)),
            ExecValue::Number(float) => {
                // Workaround: `d128` can hold floating point values, but the crate offers
                // no `From<f64>` (and, per the original author, is no longer developed).
                // The only available route is `FromStr`, so the float goes through its
                // decimal string form. The original note states that the parser always
                // yields a value despite its fallible signature, so this `expect` is not
                // expected to fire -- NOTE(review): confirm against the `decimal` crate.
                let literal = float.to_string();
                Self::Number(d128::from_str(&literal).expect("Invalid decimal float literal"))
            }
            ExecValue::String(text) => Self::String(text),
            ExecValue::Null(_) => Self::Null,
        }
    }
}
impl Eq for Value {}
impl fmt::Display for Value {
......
mod extargs;
use std::cell::RefCell;
use std::convert::TryInto;
use std::os::raw::c_int;
......@@ -7,9 +9,12 @@ use tarantool::error::TarantoolErrorCode;
use tarantool::log::{say, SayLevel};
use tarantool::tuple::{AsTuple, FunctionArgs, FunctionCtx, Tuple};
use crate::errors::QueryPlannerError;
use crate::executor::engine::{cartridge, Engine};
use crate::executor::Query;
use self::extargs::{BucketCalcArgs, BucketCalcArgsDict};
thread_local!(static QUERY_ENGINE: RefCell<cartridge::Runtime> = RefCell::new(cartridge::Runtime::new().unwrap()));
#[derive(Serialize, Deserialize)]
......@@ -32,25 +37,58 @@ pub extern "C" fn invalidate_caching_schema(ctx: FunctionCtx, _: FunctionArgs) -
0
}
#[derive(Serialize, Deserialize)]
struct BucketCalcArgs {
pub val: String,
}
impl AsTuple for BucketCalcArgs {}
#[no_mangle]
pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let args: Tuple = args.into();
let args = args.into_struct::<BucketCalcArgs>().unwrap();
QUERY_ENGINE.with(|e| {
let result = e.clone().into_inner().determine_bucket_id(&args.val);
let result = e.clone().into_inner().determine_bucket_id(&args.rec);
ctx.return_mp(&result).unwrap();
0
})
}
/// Stored procedure: calculates the bucket id for a record passed as
/// `(space name, column -> value map)`.
///
/// The sharding key values of the space are extracted from the map, converted to
/// strings, concatenated in sharding key order and hashed with `determine_bucket_id`.
/// On success the bucket id is returned through `ctx` and the function yields `0`;
/// on failure a `ProcC` error is set (planner errors are rendered with `{:?}`).
#[no_mangle]
pub extern "C" fn calculate_bucket_id_by_dict(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
    QUERY_ENGINE.with(|e| {
        let mut engine = e.clone().into_inner();
        // Update cartridge schema after cache invalidation by calling `apply_config()` in lua code.
        if engine.has_metadata() {
            match engine.load_metadata() {
                Ok(_) => *e.borrow_mut() = engine.clone(),
                Err(e) => {
                    return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e);
                }
            };
        }

        // Closure for more concise error propagation from calls nested in the bucket calculation
        let calculate = || -> Result<u64, QueryPlannerError> {
            // Deserialization error
            let bca = BucketCalcArgsDict::try_from(args)?;
            // Error in filtering bucket calculation arguments by sharding keys
            let sharding_key: String = engine
                .extract_sharding_keys(bca.space, bca.rec)?
                .into_iter()
                .map(|v| -> String { v.into() })
                .collect();
            // `engine` already mirrors the thread-local runtime (it was stored back above
            // whenever metadata was reloaded), so there is no need to clone it once more.
            Ok(engine.determine_bucket_id(sharding_key.as_str()))
        };

        match calculate() {
            Ok(bucket_id) => {
                ctx.return_mp(&bucket_id).unwrap();
                0
            }
            Err(e) => tarantool::set_error!(TarantoolErrorCode::ProcC, "{:?}", e),
        }
    })
}
#[derive(Debug, Serialize, Deserialize)]
struct ExecQueryArgs {
pub query: String,
......
use std::{collections::HashMap, convert::TryFrom, ops::Deref};
use serde::{de::Deserializer, Deserialize, Serialize};
use tarantool::tuple::{AsTuple, FunctionArgs, Tuple};
use crate::{errors::QueryPlannerError, executor::result::Value, ir::value::Value as IrValue};
/// Arguments of the `calculate_bucket_id` stored procedure, decoded from the call tuple.
#[derive(Serialize, Deserialize)]
pub struct BucketCalcArgs {
/// String that is passed to `determine_bucket_id` to compute the bucket id.
pub rec: String,
}
impl AsTuple for BucketCalcArgs {}
#[derive(Debug, Default, Serialize, PartialEq)]
/// Arguments of `calculate_bucket_id_by_dict`: a space name together with a
/// `key: value` map of the record values.
pub struct BucketCalcArgsDict {
/// Space name as `String`
pub space: String,
/// A `key: value` `HashMap` with `String` keys and `IrValue` values
pub rec: HashMap<String, IrValue>,
}
/// Gives read-only access to the record map directly through the wrapper.
impl Deref for BucketCalcArgsDict {
type Target = HashMap<String, IrValue>;
fn deref(&self) -> &Self::Target {
&self.rec
}
}
impl TryFrom<FunctionArgs> for BucketCalcArgsDict {
    type Error = QueryPlannerError;

    /// Decodes the stored procedure arguments into a `BucketCalcArgsDict`.
    ///
    /// A decoding failure is wrapped into `QueryPlannerError::CustomError` together
    /// with the debug representation of the underlying error.
    fn try_from(value: FunctionArgs) -> Result<Self, Self::Error> {
        let tuple = Tuple::from(value);
        match tuple.into_struct::<BucketCalcArgsDict>() {
            Ok(parsed) => Ok(parsed),
            Err(cause) => Err(QueryPlannerError::CustomError(format!(
                "Error then deserializing tuple into BucketCalcArgsDict! {:?}",
                cause
            ))),
        }
    }
}
impl<'de> Deserialize<'de> for BucketCalcArgsDict {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: Deserializer<'de>,
{
#[derive(Deserialize)]
#[serde(rename = "BucketCalcArgsDict")]
struct StructHelper(String, HashMap<String, Value>);
let struct_helper = StructHelper::deserialize(deserializer)?;
Ok(BucketCalcArgsDict {
space: struct_helper.0,
rec: struct_helper
.1
.into_iter()
.map(|(k, v)| (k, IrValue::from(v)))
.collect(),
})
}
}
impl AsTuple for BucketCalcArgsDict {}
#[cfg(test)]
mod tests;
use decimal::d128;
use crate::{
executor::engine::{mock::EngineMock, Engine},
ir::value::Value as IrValue,
parser::extargs::BucketCalcArgsDict,
};
/// Checks the bucket id computed from the sharding key values that the mock engine
/// extracts from a record, for a single-column and a two-column sharding key.
#[test]
fn bucket_calc_args() {
    let engine = EngineMock::new();

    // Extracts the sharding key values and concatenates their string forms.
    let sharding_key_string = |args: BucketCalcArgsDict| -> String {
        engine
            .extract_sharding_keys(args.space, args.rec)
            .unwrap()
            .into_iter()
            .map(|v| -> String { v.into() })
            .collect()
    };

    let employees = BucketCalcArgsDict {
        space: "EMPLOYEES".into(),
        rec: [(String::from("ID"), IrValue::Number(d128!(100.0)))]
            .into_iter()
            .collect(),
    };
    let employees_key = sharding_key_string(employees);
    assert_eq!(engine.determine_bucket_id(&employees_key), 2377);

    // Columns outside the sharding key must not influence the result.
    let hash_testing = BucketCalcArgsDict {
        space: "hash_testing".into(),
        rec: [
            (
                String::from("identification_number"),
                IrValue::Number(d128!(93312)),
            ),
            (
                String::from("product_code"),
                IrValue::String("fff100af".into()),
            ),
            (String::from("product_units"), IrValue::Number(d128!(10))),
            (String::from("sys_op"), IrValue::Number(d128!(981.945))),
        ]
        .into_iter()
        .collect(),
    };
    let hash_testing_key = sharding_key_string(hash_testing);
    assert_eq!(engine.determine_bucket_id(&hash_testing_key), 7704);
}
......@@ -2,9 +2,12 @@ local vshard = require('vshard')
local cartridge = require('cartridge')
_G.query = nil
_G.calculate_bucket_id = nil
_G.calculate_bucket_id_by_dict = nil
_G.insert_record = nil
_G.sql_execute = nil
local function query(q)
local has_err, parser_res = pcall(
function()
......@@ -19,7 +22,8 @@ local function query(q)
return parser_res
end
local function insert_record(space_name, values)
local function calculate_bucket_id(space_name, values)
checks('string', 'table')
local shard_key = cartridge.config_get_deepcopy().schema.spaces[space_name].sharding_key
local shard_val = ''
......@@ -27,7 +31,28 @@ local function insert_record(space_name, values)
shard_val = shard_val .. tostring(values[key])
end
values['bucket_id'] = box.func["sbroad.calculate_bucket_id"]:call({ shard_val })
local bucket_id = box.func["sbroad.calculate_bucket_id"]:call({ shard_val })
return bucket_id
end
-- Calculates the bucket id for `values` of space `space_name` by delegating to the
-- `sbroad.calculate_bucket_id_by_dict` C function.
-- Returns the bucket id on success, or `nil, err` when the call raises an error.
local function calculate_bucket_id_by_dict(space_name, values) -- luacheck: no unused args
    checks('string', 'table')

    local ok, res = pcall(
        function()
            return box.func["sbroad.calculate_bucket_id_by_dict"]:call({ space_name, values })
        end
    )

    -- On failure `res` holds the raised error instead of the bucket id.
    if not ok then
        return nil, res
    end

    return res
end
local function insert_record(space_name, values)
values['bucket_id'] = calculate_bucket_id(space_name, values)
local res = vshard.router.call(
values['bucket_id'],
"write",
......@@ -41,13 +66,27 @@ local function init(opts) -- luacheck: no unused args
-- if opts.is_master then
-- end
_G.query = query
_G.calculate_bucket_id = calculate_bucket_id
_G.calculate_bucket_id_by_dict = calculate_bucket_id_by_dict
_G.insert_record = insert_record
_G.sql_execute = sql_execute
box.schema.func.create('sbroad.parse_sql', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.invalidate_caching_schema', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.calculate_bucket_id', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.execute_query', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.parse_sql', {
if_not_exists = true, language = 'C'
})
box.schema.func.create('sbroad.invalidate_caching_schema', {
if_not_exists = true, language = 'C'
})
box.schema.func.create('sbroad.calculate_bucket_id', {
if_not_exists = true, language = 'C'
})
box.schema.func.create('sbroad.calculate_bucket_id_by_dict', {
if_not_exists = true, language = 'C'
})
box.schema.func.create('sbroad.execute_query', {
if_not_exists = true, language = 'C'
})
return true
end
......
......@@ -43,6 +43,21 @@ g.after_each(
end
)
-- Both bucket id functions must return the same bucket for the same record, and the
-- dict variant must report a sharding key column that is missing from the record.
g.test_bucket_id_calculation = function()
    local api = cluster:server("api-1").net_box
    local space = "testing_space"

    for _, func in ipairs({ "calculate_bucket_id", "calculate_bucket_id_by_dict" }) do
        local r, err = api:call(func, { space, { id = 1, name = "123", product_units = 1 } })
        t.assert_equals(err, nil)
        t.assert_equals(r, 360)
    end

    local _, err = api:call("calculate_bucket_id_by_dict", { space, { id = 1 }})
    t.assert_equals(err, "CustomError(\"The dict of args missed key/value to calculate bucket_id. Column: name\")")
end
g.test_incorrect_query = function()
local api = cluster:server("api-1").net_box
......@@ -332,4 +347,4 @@ g.test_empty_motion_result = function()
{1, "123", 1, 360}
},
})
end
\ No newline at end of file
end
......@@ -8,8 +8,8 @@ dependencies = {
'tarantool',
'lua >= 5.1',
'checks == 3.1.0-1',
'cartridge == 2.6.0-1',
'metrics == 0.8.0-1',
'cartridge == 2.7.3-1',
'metrics == 0.12.0-1',
'cartridge-cli-extensions == 1.1.1-1',
'luatest',
'luacov'
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment