Skip to content
Snippets Groups Projects
Commit 3c03a5eb authored by Igor Kuznetsov's avatar Igor Kuznetsov
Browse files

feat: migrate getting vshard count of buckets from lua to rust and added...

feat: migrate getting vshard count of buckets from lua to rust and added tarantool logging to proxy functions
parent df21abe1
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -52,3 +52,14 @@ pub fn exec_query(bucket_id: u64, query: &str) -> Result<BoxExecuteResult, LuaEr
Ok(res)
}
/// Function get summary count of bucket from vshard
pub fn bucket_count() -> Result<u64, LuaError> {
let lua = unsafe { Lua::from_existing_state(luaT_state(), false) };
let bucket_count_fn: LuaFunction<_> =
lua.eval("return require('vshard').router.bucket_count")?;
let result = bucket_count_fn.call()?;
Ok(result)
}
......@@ -6,11 +6,12 @@ use std::os::raw::c_int;
use serde::{Deserialize, Serialize};
use sqlparser::ast::Select;
use tarantool::error::TarantoolErrorCode;
use tarantool::log::{say, SayLevel};
use tarantool::tuple::{AsTuple, FunctionArgs, FunctionCtx, Tuple};
use crate::bucket::str_to_bucket_id;
use crate::errors::QueryPlannerError;
use crate::lua_bridge::{exec_query, get_cluster_schema};
use crate::lua_bridge::{bucket_count, exec_query, get_cluster_schema};
use crate::query::ParsedTree;
use crate::schema::Cluster;
......@@ -19,7 +20,6 @@ thread_local!(static CARTRIDGE_SCHEMA: RefCell<Cluster> = RefCell::new(Cluster::
#[derive(Serialize, Deserialize)]
struct Args {
pub query: String,
pub bucket_count: u64,
}
#[derive(Debug, Serialize)]
......@@ -39,7 +39,14 @@ pub extern "C" fn parse_sql(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let text_schema = match get_cluster_schema() {
Ok(s) => s,
Err(e) => {
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string())
say(
SayLevel::Error,
"parser.rs",
40,
Option::from("get cluster schema error"),
&format!("{:?}", e),
);
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
};
schema = Cluster::from(text_schema);
......@@ -47,13 +54,37 @@ pub extern "C" fn parse_sql(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
*s.borrow_mut() = schema.clone();
}
let q = ParsedTree::new(args.query.as_str(), schema, args.bucket_count).unwrap();
let bucket_count = match bucket_count() {
Ok(c) => c,
Err(e) => {
say(
SayLevel::Error,
"parser.rs",
58,
Option::from("bucket id error"),
&format!("{:?}", e),
);
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
};
let q = ParsedTree::new(args.query.as_str(), schema, bucket_count).unwrap();
let result = match q.transform() {
Ok(p) => {
ctx.return_mp(&p).unwrap();
0
}
Err(e) => tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string()),
Err(e) => {
say(
SayLevel::Error,
"parser.rs",
73,
Option::from("transform error"),
&format!("{:?}", e),
);
tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string())
}
};
result
......@@ -77,7 +108,6 @@ pub extern "C" fn invalidate_caching_schema(ctx: FunctionCtx, _: FunctionArgs) -
#[derive(Serialize, Deserialize)]
struct BucketCalcArgs {
pub val: String,
pub bucket_count: u64,
}
impl AsTuple for BucketCalcArgs {}
......@@ -86,8 +116,20 @@ impl AsTuple for BucketCalcArgs {}
pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let args: Tuple = args.into();
let args = args.into_struct::<BucketCalcArgs>().unwrap();
let result = str_to_bucket_id(&args.val, args.bucket_count.try_into().unwrap());
let bucket_count = match bucket_count() {
Ok(c) => c,
Err(e) => {
say(
SayLevel::Error,
"parser.rs",
121,
Option::from("bucket id error"),
&format!("{:?}", e),
);
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
};
let result = str_to_bucket_id(&args.val, bucket_count.try_into().unwrap());
ctx.return_mp(&result).unwrap();
0
}
......@@ -110,7 +152,16 @@ pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
ctx.return_mp(&p).unwrap();
0
}
Err(e) => tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string()),
Err(e) => {
say(
SayLevel::Error,
"parser.rs",
152,
Option::from("query execution error"),
&format!("{:?}", e),
);
return tarantool::set_error!(TarantoolErrorCode::ProcC, "{}", e.to_string());
}
}
}
......
......@@ -8,7 +8,7 @@ _G.sql_execute = nil
local function query(q)
local has_err, parser_res = pcall(
function()
return box.func["sbroad.parse_sql"]:call({ q, vshard.router.bucket_count() })
return box.func["sbroad.parse_sql"]:call({ q })
end
)
......@@ -40,7 +40,7 @@ local function insert_record(space_name, values)
shard_val = shard_val .. tostring(values[key])
end
values['bucket_id'] = box.func["sbroad.calculate_bucket_id"]:call({ shard_val, vshard.router.bucket_count() })
values['bucket_id'] = box.func["sbroad.calculate_bucket_id"]:call({ shard_val })
local res = vshard.router.call(
values['bucket_id'],
"write",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment