diff --git a/CHANGELOG.md b/CHANGELOG.md
index 10f008a1f9e2f4ca663beb72d45c5abf033a4157..5f52027f7c6b937e1455826a4609eefa0119e436 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -30,6 +30,15 @@ with the `YY.0M.MICRO` scheme.
 - _Clusterwide SQL_ introduces the capability to delete sharded tables.
   To obtain more details, please consult `pico.help('sql')`.
 
+- _Clusterwide SQL_ now supports a simple `on conflict` clause in `insert`
+  to specify the behaviour when a duplicate-key error arises:
+  replace the conflicting tuple (`do replace`), skip the offending
+  tuple (`do nothing`), or return the error to the user (`do fail`).
+
+- _Clusterwide SQL_ now supports two per-query execution limits: the maximum
+  number of rows in a virtual table (`vtable_max_rows`) and the maximum
+  number of VDBE opcodes for local query execution (`sql_vdbe_max_steps`).
+
 ### Lua API:
 
 - Update `pico.LUA_API_VERSION`: `1.0.0` -> `1.3.0`
diff --git a/sbroad b/sbroad
index c5a56009214f6d0c5a53a294b2f5abb605156b40..60c6caa2b2efbba31ec8c9f09d09ff94629a006b 160000
--- a/sbroad
+++ b/sbroad
@@ -1 +1 @@
-Subproject commit c5a56009214f6d0c5a53a294b2f5abb605156b40
+Subproject commit 60c6caa2b2efbba31ec8c9f09d09ff94629a006b
diff --git a/src/sql.rs b/src/sql.rs
index b3f8a8ae3a8dac3a94989ec39844b910a2bca454..7f5fba821ec47201b9e784db83352f1f5f7c5e3c 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -31,10 +31,10 @@ pub const DEFAULT_BUCKET_COUNT: u64 = 3000;
 /// Dispatches a query to the cluster.
 #[proc(packed_args)]
 pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result<Tuple> {
-    let mut params = PatternWithParams::from(encoded_params);
+    let mut params = PatternWithParams::try_from(encoded_params).map_err(Error::from)?;
     let id = params.clone_id();
     let ctx = params.extract_context();
-    let tracer = params.get_tracer();
+    let tracer = params.tracer;
 
     query_span::<Result<Tuple, Error>, _>(
         "\"api.router\"",
diff --git a/src/sql/init.lua b/src/sql/init.lua
index dd002736f90dee0bd9cd424f9dc735d2d679642b..2d7455205ffc2fba98ab6f1338a88e0358976a5a 100644
--- a/src/sql/init.lua
+++ b/src/sql/init.lua
@@ -5,7 +5,8 @@ local helper = require('sbroad.helper')
 local function trace(query, params, context, id)
     local has_err, parser_res = pcall(
         function()
-            return box.func[".dispatch_query"]:call({ query, params, context, id, true })
+            return box.func[".dispatch_query"]:call({
+                query, params, context, id, helper.constants.STAT_TRACER })
         end
     )
 
@@ -31,7 +32,9 @@ local function sql(...)
 
     local has_err, parser_res = pcall(
         function()
-            return box.func[".dispatch_query"]:call({ query, params, box.NULL, box.NULL, false })
+            return box.func[".dispatch_query"]:call({
+                query, params, box.NULL, box.NULL,
+                helper.constants.GLOBAL_TRACER })
         end
     )
 
diff --git a/src/sql/router.rs b/src/sql/router.rs
index 940d112bb0f551d3e31c1336709bae89aa00649c..77939513c10c038294d1c4774cf84b8e0a818118 100644
--- a/src/sql/router.rs
+++ b/src/sql/router.rs
@@ -37,7 +37,7 @@ use sbroad::executor::engine::helpers::{
 };
 use sbroad::executor::engine::Metadata;
 use sbroad::ir::function::Function;
-use sbroad::ir::relation::{Column, ColumnRole, Table, Type};
+use sbroad::ir::relation::{space_pk_columns, Column, ColumnRole, Table, Type};
 
 use std::borrow::Cow;
 
@@ -184,6 +184,7 @@ impl Vshard for RouterRuntime {
         optional: Binary,
         query_type: QueryType,
         conn_type: ConnectionType,
+        vtable_max_rows: u64,
     ) -> Result<Box<dyn Any>, SbroadError> {
         exec_ir_on_all_buckets(
             &*self.metadata()?,
@@ -191,6 +192,7 @@
             optional,
             query_type,
             conn_type,
+            vtable_max_rows,
         )
     }
 
@@ -222,6 +224,7 @@ impl Vshard for &RouterRuntime {
         optional: Binary,
         query_type: QueryType,
         conn_type: ConnectionType,
+        vtable_max_rows: u64,
     ) -> Result<Box<dyn Any>, SbroadError> {
         exec_ir_on_all_buckets(
             &*self.metadata()?,
@@ -229,6 +232,7 @@
             optional,
             query_type,
             conn_type,
+            vtable_max_rows,
         )
     }
 
@@ -377,7 +381,7 @@ impl Metadata for RouterMetadata {
                 format!("serde error: {e}"),
             )
         })?;
-        let keys: Vec<_> = match &space_def.distribution {
+        let shard_key_cols: Vec<_> = match &space_def.distribution {
             Distribution::Global => {
                 return Err(SbroadError::Invalid(
                     Entity::Distribution,
@@ -406,11 +410,17 @@
                 ));
             }
         };
-        let sharding_keys: &[&str] = &keys.iter().map(String::as_str).collect::<Vec<_>>();
+        let sharding_key_arg: &[&str] = &shard_key_cols
+            .iter()
+            .map(String::as_str)
+            .collect::<Vec<_>>();
+        let pk_cols = space_pk_columns(&name, &columns)?;
+        let pk_arg = &pk_cols.iter().map(String::as_str).collect::<Vec<_>>();
         Table::new_seg(
             &normalize_name_from_sql(table_name),
             columns,
-            sharding_keys,
+            sharding_key_arg,
+            pk_arg,
             engine.into(),
         )
     }
diff --git a/src/sql/storage.rs b/src/sql/storage.rs
index 184d37917c07e3869964cd089492c7022017fdf1..8e2204958558236913fe8a48dc464fa013fce48f 100644
--- a/src/sql/storage.rs
+++ b/src/sql/storage.rs
@@ -2,29 +2,23 @@
 //! Implements the `sbroad` crate infrastructure
 //! for execution of the dispatched query plan subtrees.
-use sbroad::debug;
 use sbroad::errors::{Action, Entity, SbroadError};
 use sbroad::executor::bucket::Buckets;
 use sbroad::executor::engine::helpers::storage::meta::StorageMetadata;
-use sbroad::executor::engine::helpers::storage::runtime::{
-    prepare, read_prepared, read_unprepared, unprepare, write_prepared, write_unprepared,
-};
+use sbroad::executor::engine::helpers::storage::runtime::unprepare;
 use sbroad::executor::engine::helpers::storage::PreparedStmt;
 use sbroad::executor::engine::helpers::vshard::get_random_bucket;
-use sbroad::executor::engine::helpers::{compile_encoded_optional, execute_dml};
+use sbroad::executor::engine::helpers::{self};
 use sbroad::executor::engine::{QueryCache, Vshard};
 use sbroad::executor::ir::{ConnectionType, ExecutionPlan, QueryType};
 use sbroad::executor::lru::{Cache, LRUCache, DEFAULT_CAPACITY};
 use sbroad::executor::protocol::{Binary, RequiredData};
 use sbroad::ir::value::Value;
-use sbroad::warn;
 
 use std::{any::Any, cell::RefCell, rc::Rc};
 
 use super::{router::calculate_bucket_id, DEFAULT_BUCKET_COUNT};
 
-use tarantool::tuple::Tuple;
-
 thread_local!(
     static STATEMENT_CACHE: Rc<RefCell<LRUCache<String, PreparedStmt>>> = Rc::new(
         RefCell::new(LRUCache::new(DEFAULT_CAPACITY, Some(Box::new(unprepare))).unwrap())
@@ -70,6 +64,7 @@
         _optional: Binary,
         _query_type: QueryType,
         _conn_type: ConnectionType,
+        _vtable_max_rows: u64,
     ) -> Result<Box<dyn Any>, SbroadError> {
         Err(SbroadError::Unsupported(
             Entity::Runtime,
@@ -126,168 +121,18 @@
         raw_optional: &mut Vec<u8>,
     ) -> Result<Box<dyn Any>, SbroadError> {
         match required.query_type {
-            QueryType::DML => self.execute_dml(required, raw_optional),
+            QueryType::DML => helpers::execute_dml(self, required, raw_optional),
             QueryType::DQL => {
                 if required.can_be_cached {
-                    self.execute_cacheable_dql(required, raw_optional)
-                } else {
-                    execute_non_cacheable_dql(required, raw_optional)
-                }
-            }
-        }
-    }
-
-    #[allow(unused_variables)]
-    fn execute_dml(
-        &self,
-        required: &mut RequiredData,
-        raw_optional: &mut Vec<u8>,
-    ) -> Result<Box<dyn Any>, SbroadError> {
-        if required.query_type != QueryType::DML {
-            return Err(SbroadError::Invalid(
-                Entity::Plan,
-                Some("Expected a DML plan.".to_string()),
-            ));
-        }
-
-        let result = execute_dml(self, raw_optional)?;
-        let tuple = Tuple::new(&(result,))
-            .map_err(|e| SbroadError::Invalid(Entity::Tuple, Some(format!("{e:?}"))))?;
-        Ok(Box::new(tuple) as Box<dyn Any>)
-    }
-
-    #[allow(unused_variables)]
-    fn execute_cacheable_dql(
-        &self,
-        required: &mut RequiredData,
-        raw_optional: &mut Vec<u8>,
-    ) -> Result<Box<dyn Any>, SbroadError> {
-        let plan_id = required.plan_id.clone();
-
-        if !required.can_be_cached {
-            return Err(SbroadError::Invalid(
-                Entity::Plan,
-                Some("Expected a plan that can be cached.".to_string()),
-            ));
-        }
-
-        // Look for the prepared statement in the cache.
-        if let Some(stmt) = self
-            .cache
-            .try_borrow_mut()
-            .map_err(|e| {
-                SbroadError::FailedTo(Action::Borrow, Some(Entity::Cache), format!("{e}"))
-            })?
-            .get(&plan_id)?
-        {
-            let stmt_id = stmt.id()?;
-            // The statement was found in the cache, so we can execute it.
-            debug!(
-                Option::from("execute plan"),
-                &format!("Execute prepared statement: {stmt:?}"),
-            );
-            let result = match required.query_type {
-                QueryType::DML => write_prepared(stmt_id, "", &required.parameters),
-                QueryType::DQL => read_prepared(stmt_id, "", &required.parameters),
-            };
-
-            // If prepared statement is invalid for some reason, fallback to the long pass
-            // and recompile the query.
-            if result.is_ok() {
-                return result;
-            }
-        }
-        debug!(
-            Option::from("execute plan"),
-            &format!("Failed to find a plan (id {plan_id}) in the cache."),
-        );
-
-        let (pattern_with_params, _tmp_spaces) = compile_encoded_optional(raw_optional)?;
-        let result = match prepare(&pattern_with_params.pattern) {
-            Ok(stmt) => {
-                let stmt_id = stmt.id()?;
-                debug!(
-                    Option::from("execute plan"),
-                    &format!(
-                        "Created prepared statement {} for the pattern {}",
-                        stmt_id,
-                        stmt.pattern()?
-                    ),
-                );
-                self.cache
-                    .try_borrow_mut()
-                    .map_err(|e| {
-                        SbroadError::FailedTo(
-                            Action::Put,
-                            None,
-                            format!("prepared statement {stmt:?} into the cache: {e:?}"),
-                        )
-                    })?
-                    .put(plan_id, stmt)?;
-                // The statement was found in the cache, so we can execute it.
-                debug!(
-                    Option::from("execute plan"),
-                    &format!("Execute prepared statement: {stmt_id}"),
-                );
-                if required.query_type == QueryType::DML {
-                    write_prepared(
-                        stmt_id,
-                        &pattern_with_params.pattern,
-                        &pattern_with_params.params,
-                    )
+                    helpers::execute_cacheable_dql_with_raw_optional(self, required, raw_optional)
                 } else {
-                    read_prepared(
-                        stmt_id,
-                        &pattern_with_params.pattern,
-                        &pattern_with_params.params,
+                    helpers::execute_non_cacheable_dql_with_raw_optional(
+                        raw_optional,
+                        required.options.vtable_max_rows,
+                        std::mem::take(&mut required.options.execute_options),
                     )
                 }
             }
-            Err(e) => {
-                // Possibly the statement is correct, but doesn't fit into
-                // Tarantool's prepared statements cache (`sql_cache_size`).
-                // So we try to execute it bypassing the cache.
-                warn!(
-                    Option::from("execute"),
-                    &format!(
-                        "Failed to prepare the statement: {}, error: {e}",
-                        pattern_with_params.pattern
-                    ),
-                );
-                if required.query_type == QueryType::DML {
-                    write_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
-                } else {
-                    read_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
-                }
-            }
-        };
-
-        result
-    }
-}
-
-fn execute_non_cacheable_dql(
-    required: &mut RequiredData,
-    raw_optional: &mut Vec<u8>,
-) -> Result<Box<dyn Any>, SbroadError> {
-    if required.can_be_cached || required.query_type != QueryType::DQL {
-        return Err(SbroadError::Invalid(
-            Entity::Plan,
-            Some("Expected a DQL plan that can not be cached.".to_string()),
-        ));
+        }
     }
-
-    let (pattern_with_params, _tmp_spaces) = compile_encoded_optional(raw_optional)?;
-    debug!(
-        Option::from("execute"),
-        &format!(
-            "Failed to execute the statement: {}",
-            pattern_with_params.pattern
-        ),
-    );
-    warn!(
-        Option::from("execute"),
-        &format!("SQL pattern: {}", pattern_with_params.pattern),
-    );
-    read_unprepared(&pattern_with_params.pattern, &pattern_with_params.params)
 }
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index c20a0b52212bc82cc7c7bd3ee5025bc0181f6555..dbdc2da2f7393eec90db3c0eb6fe46444e812855 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -185,3 +185,99 @@ def test_drop_table(cluster: Cluster):
         """
     )
     assert ddl["row_count"] == 1
+
+
+def test_insert_on_conflict(cluster: Cluster):
+    cluster.deploy(instance_count=2)
+    i1, _ = cluster.instances
+
+    ddl = i1.sql(
+        """
+        create table "t" ("a" integer not null, "b" int not null, primary key ("a"))
+        using memtx
+        distributed by ("b")
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 1
+
+    dml = i1.sql(
+        """
+        insert into "t" values (1, 1)
+        """
+    )
+    assert dml["row_count"] == 1
+
+    dml = i1.sql(
+        """
+        insert into "t" values (1, 1) on conflict do nothing
+        """
+    )
+    assert dml["row_count"] == 0
+
+    data = i1.sql(
+        """select * from "t"
+        """
+    )
+    assert data["rows"] == [[1, 1]]
+
+    dml = i1.sql(
+        """
+        insert into "t" values (1, 2) on conflict do replace
+        """
+    )
+    assert dml["row_count"] == 1
+
+    data = i1.sql(
+        """select * from "t"
+        """
+    )
+    assert data["rows"] == [[1, 2]]
+
+
+def test_sql_limits(cluster: Cluster):
+    cluster.deploy(instance_count=2)
+    i1, _ = cluster.instances
+
+    ddl = i1.sql(
+        """
+        create table "t" ("a" integer not null, "b" int not null, primary key ("a"))
+        using memtx
+        distributed by ("b")
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 1
+
+    dml = i1.sql(
+        """
+        insert into "t" values (1, 1), (2, 1)
+        """
+    )
+    assert dml["row_count"] == 2
+
+    with pytest.raises(
+        ReturnError, match="Reached a limit on max executed vdbe opcodes. Limit: 5"
+    ):
+        i1.sql(
+            """
+            select * from "t" where "a" = 1 option(sql_vdbe_max_steps=5)
+            """
+        )
+
+    dql = i1.sql(
+        """
+        select * from "t" where "a" = 1 option(sql_vdbe_max_steps=50)
+        """
+    )
+    assert dql["rows"] == [[1, 1]]
+
+    with pytest.raises(
+        ReturnError,
+        match=r"Exceeded maximum number of rows \(1\) in virtual table: 2",
+    ):
+        i1.sql(
+            """
+            select * from "t" option(vtable_max_rows=1, sql_vdbe_max_steps=50)
+            """
+        )
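
Reviewer note (not part of the patch): a minimal sketch of the three `on conflict` strategies against the test table "t" created above. The `do nothing` and `do replace` outcomes mirror the assertions in test_insert_on_conflict; `do fail` is not exercised by the new tests, so its outcome shown here is an assumption based on the CHANGELOG wording (the duplicate-key error is returned to the caller).

    insert into "t" values (1, 1)                          -- row_count = 1
    insert into "t" values (1, 1) on conflict do nothing   -- conflict skipped, row_count = 0
    insert into "t" values (1, 2) on conflict do replace   -- tuple replaced, row_count = 1
    insert into "t" values (1, 3) on conflict do fail      -- assumed: duplicate-key error returned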
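
Reviewer note (not part of the patch): a sketch of the two new per-query limits as exercised by test_sql_limits. The option names and error texts come directly from the test expectations; the defaults applied when an option is omitted are not visible in this diff.

    -- caps the VDBE opcodes executed for the local part of the query on each storage;
    -- with limit 5 the test expects: "Reached a limit on max executed vdbe opcodes. Limit: 5"
    select * from "t" where "a" = 1 option(sql_vdbe_max_steps=5)

    -- caps the rows materialized into a virtual table during dispatch;
    -- with limit 1 and two rows the test expects: "Exceeded maximum number of rows (1) in virtual table: 2"
    select * from "t" option(vtable_max_rows=1, sql_vdbe_max_steps=50)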