diff --git a/src/cluster_lua.rs b/src/cluster_lua.rs index 4ac46370bf2e618a29bbb7777ef0e803ab9c1305..015333fd5a8fb861d7962ce3cac956c93d8cea58 100644 --- a/src/cluster_lua.rs +++ b/src/cluster_lua.rs @@ -2,23 +2,27 @@ use std::ffi::CStr; use std::os::raw::c_char; use tarantool::ffi::lua::{ - lua_State, lua_getglobal, lua_pushinteger, lua_setfield, lua_settop, lua_tointeger, - lua_tostring, LUA_GLOBALSINDEX, + lua_State, lua_getglobal, lua_pushinteger, lua_pushstring, lua_setfield, lua_settop, + lua_tointeger, lua_tostring, LUA_GLOBALSINDEX, }; const LUA_FUNCS: &str = "local cartridge = require('cartridge'); local vshard = require('vshard') +local yaml = require('yaml') -function get_server_uri_by_bucket(bucket_id) - local replicaset = vshard.router.route(bucket_id) +function execute_sql(bucket_id, query) + local data, err = vshard.router.call( + bucket_id, + 'read', + 'box.execute', + { query } + ) - if replicaset['master'] ~= nil then - if replicaset.master['conn'] ~= nil then - return replicaset.master.conn.host .. ':' .. replicaset.master.conn.port - end - end + if err ~= nil then + error(err) + end - return '' + return yaml.encode(data) end function get_cluster_schema() @@ -55,13 +59,14 @@ pub fn init_cluster_functions(l: *mut lua_State) { } #[allow(dead_code)] -pub fn get_server_uri_by_bucket_id(l: *mut lua_State, bucket_id: isize) -> String { +pub fn execute_sql(l: *mut lua_State, bucket_id: isize, query: &str) -> String { unsafe { - lua_getglobal(l, crate::c_ptr!("get_server_uri_by_bucket")); + lua_getglobal(l, crate::c_ptr!("execute_sql")); lua_pushinteger(l, bucket_id); + lua_pushstring(l, query.as_ptr().cast::<i8>()); } - let res = unsafe { lua_pcall(l, 1, 1, 0) }; + let res = unsafe { lua_pcall(l, 2, 1, 0) }; if res != 0 { panic!("{} {:?}", res, unsafe { CStr::from_ptr(lua_tostring(l, -1)) diff --git a/src/parser.rs b/src/parser.rs index 6c9b0ab0fbba9852fd6e1825a59e53d5f6fa4248..ab0d6ef2278aac2fa3ae744aeceaa6b60185021c 100644 --- a/src/parser.rs +++ b/src/parser.rs @@ -1,5 +1,5 @@ use crate::bucket::str_to_bucket_id; -use crate::cluster_lua::{get_cluster_schema, init_cluster_functions}; +use crate::cluster_lua::{execute_sql, get_cluster_schema, init_cluster_functions}; use crate::errors::QueryPlannerError; use crate::query::ParsedTree; use crate::schema::Cluster; @@ -9,10 +9,11 @@ use std::cell::RefCell; use std::fmt; use std::os::raw::c_int; use tarantool::error::TarantoolErrorCode; -use tarantool::ffi::lua::luaT_state; +use tarantool::ffi::lua::{luaT_state, lua_State}; use tarantool::tuple::{AsTuple, FunctionArgs, FunctionCtx, Tuple}; thread_local!(static CARTRIDGE_SCHEMA: RefCell<Cluster> = RefCell::new(Cluster::new())); +thread_local!(static LUA_STATE: RefCell<*mut lua_State> = RefCell::new( unsafe { luaT_state() })); #[derive(Serialize, Deserialize)] struct Args { @@ -30,8 +31,7 @@ pub extern "C" fn parse_sql(ctx: FunctionCtx, args: FunctionArgs) -> c_int { let args: Tuple = args.into(); let args = args.into_struct::<Args>().unwrap(); - let l = unsafe { luaT_state() }; - init_cluster_functions(l); + let l = LUA_STATE.try_with(|s| s.clone().into_inner()).unwrap(); CARTRIDGE_SCHEMA.with(|s| { let mut schema = s.clone().into_inner(); @@ -88,6 +88,47 @@ pub extern "C" fn calculate_bucket_id(ctx: FunctionCtx, args: FunctionArgs) -> c 0 } +#[derive(Debug, Serialize, Deserialize)] +struct ExecQueryArgs { + pub bucket_id: isize, + pub query: String, +} + +impl AsTuple for ExecQueryArgs {} + +#[no_mangle] +pub extern "C" fn execute_query(ctx: FunctionCtx, args: FunctionArgs) -> c_int { + let args: Tuple = args.into(); + let args = args.into_struct::<ExecQueryArgs>().unwrap(); + + let l = LUA_STATE.try_with(|s| s.clone().into_inner()).unwrap(); + let result = execute_sql(l, args.bucket_id, &args.query); + ctx.return_mp(&result).unwrap(); + 0 +} + +#[derive(Debug, Serialize, Deserialize)] +struct InitArgs { + pub login: String, + pub password: String, +} + +impl AsTuple for InitArgs {} + +/** +Function must be called in router and storage roles of cartridge application. +*/ +#[no_mangle] +pub extern "C" fn init(_ctx: FunctionCtx, _: FunctionArgs) -> c_int { + LUA_STATE + .try_with(|s| { + let l = s.clone().into_inner(); + init_cluster_functions(l); + }) + .unwrap(); + 0 +} + #[derive(Serialize, Debug, Eq, PartialEq)] pub struct QueryResult { pub bucket_id: u64, diff --git a/test_app/app/roles/api.lua b/test_app/app/roles/api.lua index ae2cc904f936dae4fbd3a870fc8874255bc3a3c2..1fbefe92ac7c110eef4e17c6d644a86a82be723b 100644 --- a/test_app/app/roles/api.lua +++ b/test_app/app/roles/api.lua @@ -1,9 +1,10 @@ local vshard = require('vshard') local cartridge = require('cartridge') -local log = require('log') +local yaml = require('yaml') _G.query = nil _G.insert_record = nil +_G.sql_execute = nil local function query(q) local has_err, parser_res = pcall( @@ -58,15 +59,25 @@ local function insert_record(space_name, values) return res end +local function sql_execute(bucket_id, sql_q) + return yaml.decode(box.func["sbroad.execute_query"]:call({ bucket_id, sql_q })) +end + local function init(opts) -- luacheck: no unused args -- if opts.is_master then -- end _G.query = query _G.insert_record = insert_record + _G.sql_execute = sql_execute box.schema.func.create('sbroad.parse_sql', { if_not_exists = true, language = 'C' }) box.schema.func.create('sbroad.invalidate_caching_schema', { if_not_exists = true, language = 'C' }) box.schema.func.create('sbroad.calculate_bucket_id', { if_not_exists = true, language = 'C' }) + box.schema.func.create('sbroad.execute_query', { if_not_exists = true, language = 'C' }) + box.schema.func.create('sbroad.init', { if_not_exists = true, language = 'C' }) + + box.func["sbroad.init"]:call({}) + return true end diff --git a/test_app/app/roles/storage.lua b/test_app/app/roles/storage.lua index 31263a0bf25eed680ec2a2c82f448d790f5e57bd..53b5b8fa02c616e4e5e0f35b857848198e0a5d24 100644 --- a/test_app/app/roles/storage.lua +++ b/test_app/app/roles/storage.lua @@ -14,6 +14,11 @@ local function init(opts) -- luacheck: no unused args -- if opts.is_master then -- end _G.insert_map = insert_map + + box.schema.func.create('sbroad.init', { if_not_exists = true, language = 'C' }) + + box.func["sbroad.init"]:call({}) + return true end diff --git a/test_app/test/integration/api_test.lua b/test_app/test/integration/api_test.lua index 5f221d0ad81fc56ce8e7c86b1306775bdd6124a1..ce2c178fa1b898844b4d522c58fe56aa594d8caa 100644 --- a/test_app/test/integration/api_test.lua +++ b/test_app/test/integration/api_test.lua @@ -53,8 +53,6 @@ g.test_simple_query = function() rows = {}, }) - -- fiber.sleep(300) - r, err = api:call("query", { [[SELECT * FROM "testing_space" where "id" = 1 and "name" = '123']] }) t.assert_equals(err, nil) t.assert_equals(r, { @@ -92,3 +90,22 @@ g.test_union_query = function() }, }) end + +g.test_sql_execute = function () + -- fiber.sleep(300) + + local api = cluster:server("api-1").net_box + local r, err = api:call("sql_execute", { 359, [[SELECT * FROM "testing_space" where "id" = 1 and "name" = '123']] }) + t.assert_equals(err, nil) + t.assert_equals(r, { + metadata = { + {name = "id", type = "integer"}, + {name = "name", type = "string"}, + {name = "product_units", type = "integer"}, + {name = "bucket_id", type = "unsigned"}, + }, + rows = { + { 1, "123", 1, 359 } + }, + }) +end