Skip to content
Snippets Groups Projects
Commit 0d3b2a3b authored by Igor Kuznetsov's avatar Igor Kuznetsov
Browse files

refactoring: transform bucket query to async model

Was made determining uniq set of replicaset by set of buckets that avoid send the same queries to the same replicas. Bucket exec query function was fixed to async mode.
parent 185d20ef
No related branches found
No related tags found
1 merge request!1414sbroad import
use std::collections::HashSet;
use std::convert::TryInto;
use tarantool::log::{say, SayLevel};
......@@ -75,10 +76,7 @@ impl Engine for Runtime {
result.extend(cluster_exec_query(&sql)?)?;
}
Buckets::Filtered(list) => {
for bucket in list {
let temp_result = bucket_exec_query(*bucket, &sql)?;
result.extend(temp_result)?;
}
result.extend(bucket_exec_query(list, &sql)?)?;
}
}
......@@ -172,35 +170,67 @@ impl Runtime {
}
/// Send the query to a single bucket and merge results (map-reduce).
fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, QueryPlannerError> {
say(
SayLevel::Debug,
file!(),
line!().try_into().unwrap_or(0),
None,
&format!("Execute a query {:?} on bucket {:?}", query, bucket),
);
fn bucket_exec_query(
buckets: &HashSet<u64>,
query: &str,
) -> Result<BoxExecuteFormat, QueryPlannerError> {
let lua = tarantool::lua_state();
match lua.exec(
r#"
local vshard = require('vshard')
local yaml = require('yaml')
local log = require('log')
---get_uniq_replicaset_for_buckets - gets unique set of replicaset by bucket list
---@param buckets table - list of buckets.
function get_uniq_replicaset_for_buckets(buckets)
local res = {}
local uniq_replicas = {}
for _, bucket_id in pairs(buckets) do
local current_replicaset = vshard.router.route(bucket_id)
uniq_replicas[current_replicaset.uuid] = current_replicaset
end
function execute_sql(bucket_id, query)
local res, err = vshard.router.call(
bucket_id,
'read',
'box.execute',
{ query }
)
if err ~= nil then
error(err)
for _, r in pairs(uniq_replicas) do
table.insert(res, r)
end
return res
end
function execute_sql(buckets, query)
local replicas = get_uniq_replicaset_for_buckets(buckets)
local result = nil
local futures = {}
for _, replica in pairs(replicas) do
local future, err = replica:callro("box.execute", { query }, {is_async = true})
if err ~= nil then
error(err)
end
table.insert(futures, future)
log.debug("Execute a query " .. query .. " on replica")
end
for _, future in ipairs(futures) do
future:wait_result(360)
local res = future:result()
if res[1] == nil then
error(res[2])
end
if result == nil then
result = res[1]
else
for _, item in pairs(res[1].rows) do
table.insert(result.rows, item)
end
end
end
return result
end
"#,
) {
Ok(_) => {}
......@@ -223,7 +253,8 @@ fn bucket_exec_query(bucket: u64, query: &str) -> Result<BoxExecuteFormat, Query
QueryPlannerError::LuaError("Lua function `execute_sql` not found".into())
})?;
let res: BoxExecuteFormat = match exec_sql.call_with_args((bucket, query)) {
let lua_buckets: Vec<u64> = buckets.iter().copied().collect();
let res: BoxExecuteFormat = match exec_sql.call_with_args((lua_buckets, query)) {
Ok(v) => v,
Err(e) => {
say(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment