Skip to content
Snippets Groups Projects
Commit ec20a0de authored by Igor Kuznetsov's avatar Igor Kuznetsov
Browse files

feat: add query execute function

Add sql executor function binding.
parent c33b60dc
No related branches found
No related tags found
1 merge request!1414sbroad import
......@@ -2,23 +2,27 @@ use std::ffi::CStr;
use std::os::raw::c_char;
use tarantool::ffi::lua::{
lua_State, lua_getglobal, lua_pushinteger, lua_setfield, lua_settop, lua_tointeger,
lua_tostring, LUA_GLOBALSINDEX,
lua_State, lua_getglobal, lua_pushinteger, lua_pushstring, lua_setfield, lua_settop,
lua_tointeger, lua_tostring, LUA_GLOBALSINDEX,
};
const LUA_FUNCS: &str = "local cartridge = require('cartridge');
local vshard = require('vshard')
local yaml = require('yaml')
function get_server_uri_by_bucket(bucket_id)
local replicaset = vshard.router.route(bucket_id)
function execute_sql(bucket_id, query)
local data, err = vshard.router.call(
bucket_id,
'read',
'box.execute',
{ query }
)
if replicaset['master'] ~= nil then
if replicaset.master['conn'] ~= nil then
return replicaset.master.conn.host .. ':' .. replicaset.master.conn.port
end
end
if err ~= nil then
error(err)
end
return ''
return yaml.encode(data)
end
function get_cluster_schema()
......@@ -55,13 +59,14 @@ pub fn init_cluster_functions(l: *mut lua_State) {
}
#[allow(dead_code)]
pub fn get_server_uri_by_bucket_id(l: *mut lua_State, bucket_id: isize) -> String {
pub fn execute_sql(l: *mut lua_State, bucket_id: isize, query: &str) -> String {
unsafe {
lua_getglobal(l, crate::c_ptr!("get_server_uri_by_bucket"));
lua_getglobal(l, crate::c_ptr!("execute_sql"));
lua_pushinteger(l, bucket_id);
lua_pushstring(l, query.as_ptr().cast::<i8>());
}
let res = unsafe { lua_pcall(l, 1, 1, 0) };
let res = unsafe { lua_pcall(l, 2, 1, 0) };
if res != 0 {
panic!("{} {:?}", res, unsafe {
CStr::from_ptr(lua_tostring(l, -1))
......
use crate::bucket::str_to_bucket_id;
use crate::cluster_lua::{get_cluster_schema, init_cluster_functions};
use crate::cluster_lua::{execute_sql, get_cluster_schema, init_cluster_functions};
use crate::errors::QueryPlannerError;
use crate::query::ParsedTree;
use crate::schema::Cluster;
......@@ -9,10 +9,11 @@ use std::cell::RefCell;
use std::fmt;
use std::os::raw::c_int;
use tarantool::error::TarantoolErrorCode;
use tarantool::ffi::lua::luaT_state;
use tarantool::ffi::lua::{luaT_state, lua_State};
use tarantool::tuple::{AsTuple, FunctionArgs, FunctionCtx, Tuple};
thread_local!(static CARTRIDGE_SCHEMA: RefCell<Cluster> = RefCell::new(Cluster::new()));
thread_local!(static LUA_STATE: RefCell<*mut lua_State> = RefCell::new( unsafe { luaT_state() }));
#[derive(Serialize, Deserialize)]
struct Args {
......@@ -30,8 +31,7 @@ pub extern "C" fn parse_sql(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let args: Tuple = args.into();
let args = args.into_struct::<Args>().unwrap();
let l = unsafe { luaT_state() };
init_cluster_functions(l);
let l = LUA_STATE.try_with(|s| s.clone().into_inner()).unwrap();
CARTRIDGE_SCHEMA.with(|s| {
let mut schema = s.clone().into_inner();
......@@ -88,6 +88,47 @@ pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c
0
}
#[derive(Debug, Serialize, Deserialize)]
struct ExecQueryArgs {
pub bucket_id: isize,
pub query: String,
}
impl AsTuple for ExecQueryArgs {}
#[no_mangle]
pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int {
let args: Tuple = args.into();
let args = args.into_struct::<ExecQueryArgs>().unwrap();
let l = LUA_STATE.try_with(|s| s.clone().into_inner()).unwrap();
let result = execute_sql(l, args.bucket_id, &args.query);
ctx.return_mp(&result).unwrap();
0
}
#[derive(Debug, Serialize, Deserialize)]
struct InitArgs {
pub login: String,
pub password: String,
}
impl AsTuple for InitArgs {}
/**
Function must be called in router and storage roles of cartridge application.
*/
#[no_mangle]
pub extern "C" fn init(_ctx: FunctionCtx, _: FunctionArgs) -> c_int {
LUA_STATE
.try_with(|s| {
let l = s.clone().into_inner();
init_cluster_functions(l);
})
.unwrap();
0
}
#[derive(Serialize, Debug, Eq, PartialEq)]
pub struct QueryResult {
pub bucket_id: u64,
......
local vshard = require('vshard')
local cartridge = require('cartridge')
local log = require('log')
local yaml = require('yaml')
_G.query = nil
_G.insert_record = nil
_G.sql_execute = nil
local function query(q)
local has_err, parser_res = pcall(
......@@ -58,15 +59,25 @@ local function insert_record(space_name, values)
return res
end
local function sql_execute(bucket_id, sql_q)
return yaml.decode(box.func["sbroad.execute_query"]:call({ bucket_id, sql_q }))
end
local function init(opts) -- luacheck: no unused args
-- if opts.is_master then
-- end
_G.query = query
_G.insert_record = insert_record
_G.sql_execute = sql_execute
box.schema.func.create('sbroad.parse_sql', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.invalidate_caching_schema', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.calculate_bucket_id', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.execute_query', { if_not_exists = true, language = 'C' })
box.schema.func.create('sbroad.init', { if_not_exists = true, language = 'C' })
box.func["sbroad.init"]:call({})
return true
end
......
......@@ -14,6 +14,11 @@ local function init(opts) -- luacheck: no unused args
-- if opts.is_master then
-- end
_G.insert_map = insert_map
box.schema.func.create('sbroad.init', { if_not_exists = true, language = 'C' })
box.func["sbroad.init"]:call({})
return true
end
......
......@@ -53,8 +53,6 @@ g.test_simple_query = function()
rows = {},
})
-- fiber.sleep(300)
r, err = api:call("query", { [[SELECT * FROM "testing_space" where "id" = 1 and "name" = '123']] })
t.assert_equals(err, nil)
t.assert_equals(r, {
......@@ -92,3 +90,22 @@ g.test_union_query = function()
},
})
end
g.test_sql_execute = function ()
-- fiber.sleep(300)
local api = cluster:server("api-1").net_box
local r, err = api:call("sql_execute", { 359, [[SELECT * FROM "testing_space" where "id" = 1 and "name" = '123']] })
t.assert_equals(err, nil)
t.assert_equals(r, {
metadata = {
{name = "id", type = "integer"},
{name = "name", type = "string"},
{name = "product_units", type = "integer"},
{name = "bucket_id", type = "unsigned"},
},
rows = {
{ 1, "123", 1, 359 }
},
})
end
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment