diff --git a/src/lib.rs b/src/lib.rs
index 432631f15921e8ebef2281c64930daee0a6759b6..bd108eac9c19008ebcb2d025bb7510252e39693e 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,5 @@
 #![allow(clippy::let_and_return)]
+#![allow(clippy::needless_return)]
 use serde::{Deserialize, Serialize};
 
 use ::raft::prelude as raft;
diff --git a/src/luamod.lua b/src/luamod.lua
index c817a8be47b54850f540808b8294dedd99ef285c..852c0ac812cae70138776a82abfeecbd3e265fd6 100644
--- a/src/luamod.lua
+++ b/src/luamod.lua
@@ -96,18 +96,7 @@ local function is_retriable_error(error)
         return false
     end
 
-    if error:find('operation request from different term')
-        or error:find('not a leader')
-        or error:find('log unavailable')
-    then
-        return true
-    end
-
-    if error:find('compare-and-swap') then
-        return error:find('Compacted') or error:find('ConflictFound')
-    end
-
-    return false
+    return pico._is_retriable_error_message(error)
 end
 
 -- Performs a reenterable schema change CaS request. On success returns an index
diff --git a/src/luamod.rs b/src/luamod.rs
index 51cd5f69ef07cbc027a5db3d147757f7d128ec83..08e656f13331ee6b6b59f5253aefbb9f7fa3d396 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -687,7 +687,6 @@ pub(crate) fn setup(args: &args::Run) {
                 term,
                 ranges: cas::schema_change_ranges().into(),
             };
-            // TODO: pass the timeout
             let res = compare_and_swap(op, predicate, timeout)?;
             Ok(res)
         },
@@ -1364,4 +1363,26 @@ pub(crate) fn setup(args: &args::Run) {
             })
         },
     );
+
+    luamod_set(
+        &l,
+        "_is_retriable_error_message",
+        indoc! {"
+        pico._is_retriable_error_message(msg)
+        ============================
+
+        Internal API, see src/luamod.rs for the details.
+
+        Params:
+
+            1. msg (string)
+
+        Returns:
+
+            (bool)
+        "},
+        tlua::Function::new(|msg: String| -> bool {
+            crate::traft::error::is_retriable_error_message(&msg)
+        }),
+    );
 }
diff --git a/src/schema.rs b/src/schema.rs
index ed718eac9b92b3761a4f19aa3ff54d3b74e0c260..03ceb27f5b6bee3584bbe1e5b9f94884e73892f3 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -349,7 +349,7 @@ pub struct CreateSpaceParams {
     ///
     /// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
    /// But it does not provide guarantees that a ddl will be aborted if wait for commit timeouts.
-    pub timeout: f64,
+    pub timeout: Option<f64>,
 }
 
 impl CreateSpaceParams {
@@ -725,7 +725,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .test_create_space()
         .unwrap();
@@ -740,7 +740,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .test_create_space()
         .unwrap_err();
@@ -774,7 +774,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -789,7 +789,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -804,7 +804,7 @@ mod tests {
             by_field: None,
             sharding_key: Some(vec![field2.name.clone()]),
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -819,7 +819,7 @@ mod tests {
             by_field: None,
             sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -834,7 +834,7 @@ mod tests {
             by_field: Some(field2.name.clone()),
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -849,7 +849,7 @@ mod tests {
             by_field: None,
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -867,7 +867,7 @@ mod tests {
             by_field: Some(field2.name.clone()),
             sharding_key: Some(vec![]),
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap_err();
@@ -885,7 +885,7 @@ mod tests {
             by_field: Some(field2.name),
             sharding_key: None,
             sharding_fn: None,
-            timeout: 0.0,
+            timeout: None,
         }
         .validate()
         .unwrap();
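
The `timeout` field above becomes optional because the reworked SQL path below no longer threads the timeout through `CreateSpaceParams`; instead the raw `f64` from `option (timeout = ...)` is converted once with `crate::util::duration_from_secs_f64_clamped`, a helper this patch references but does not show. A minimal sketch of what such a clamped conversion could look like (the name is taken from the import below, but the bounds and behavior here are assumptions, not the actual picodata implementation):

```rust
use std::time::Duration;

/// Hypothetical sketch: turn a user-supplied timeout in seconds into a
/// `Duration`. `Duration::from_secs_f64` panics on negative, NaN, infinite,
/// or overflowing inputs, so those are clamped instead of propagated.
fn duration_from_secs_f64_clamped(secs: f64) -> Duration {
    if secs.is_nan() || secs <= 0.0 {
        // Nonsensical timeouts collapse to "no time at all".
        Duration::ZERO
    } else if secs >= u64::MAX as f64 {
        // Cap absurdly large timeouts instead of panicking.
        Duration::MAX
    } else {
        Duration::from_secs_f64(secs)
    }
}
```
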
diff --git a/src/sql.rs b/src/sql.rs
index cc0fd11c10b67b3be865e809b79d10ff1770a09c..65c2a8d75b1017b07e48e3db23e12c702e1c4ad8 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -1,15 +1,17 @@
 //! Clusterwide SQL query execution.
 
-use crate::schema::{self, CreateSpaceParams, DistributionParam, Field, ShardingFn, SpaceDef};
+use crate::schema::{CreateSpaceParams, DistributionParam, Field, ShardingFn};
 use crate::sql::router::RouterRuntime;
 use crate::sql::storage::StorageRuntime;
 use crate::traft::error::Error;
 use crate::traft::op::{Ddl as OpDdl, Op};
 use crate::traft::{self, node};
+use crate::util::duration_from_secs_f64_clamped;
+use crate::{cas, unwrap_ok_or};
 
 use sbroad::backend::sql::ir::{EncodedPatternWithParams, PatternWithParams};
 use sbroad::debug;
-use sbroad::errors::{Action, Entity, SbroadError};
+use sbroad::errors::{Action, SbroadError};
 use sbroad::executor::engine::helpers::decode_msgpack;
 use sbroad::executor::protocol::{EncodedRequiredData, RequiredData};
 use sbroad::executor::result::ConsumerResult;
@@ -17,10 +19,9 @@ use sbroad::executor::Query;
 use sbroad::ir::ddl::Ddl;
 use sbroad::otm::query_span;
 
-use std::time::Duration;
-
 use ::tarantool::proc;
 use ::tarantool::space::FieldType;
+use ::tarantool::time::Instant;
 use ::tarantool::tuple::{RawBytes, Tuple};
 
 pub mod router;
@@ -51,65 +52,10 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
         let top_id = ir_plan.get_top().map_err(Error::from)?;
         let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();
         let ddl = ir_plan_mut.take_ddl_node(top_id).map_err(Error::from)?;
-        let timeout: f64 = ddl.timeout().map_err(Error::from)?;
-        let storage = &node::global()?.storage;
-        let ddl_op = match ddl {
-            Ddl::CreateShardedTable {
-                name,
-                mut format,
-                primary_key,
-                sharding_key,
-                ..
-            } => {
-                let format = format
-                    .iter_mut()
-                    .map(|f| Field {
-                        name: std::mem::take(&mut f.name),
-                        r#type: FieldType::from(&f.data_type),
-                        is_nullable: f.is_nullable,
-                    })
-                    .collect();
-                let mut params = CreateSpaceParams {
-                    id: None,
-                    name,
-                    format,
-                    primary_key,
-                    distribution: DistributionParam::Sharded,
-                    by_field: None,
-                    sharding_key: Some(sharding_key),
-                    sharding_fn: Some(ShardingFn::Murmur3),
-                    timeout,
-                };
-                params.validate()?;
-                if params.space_exists()? {
-                    let result = ConsumerResult { row_count: 0 };
-                    return Ok(Tuple::new(&[result])?);
-                }
-                params.choose_id_if_not_specified()?;
-                params.test_create_space()?;
-                params.into_ddl()?
-            }
-            Ddl::DropTable { ref name, .. } => {
-                let space_def: SpaceDef =
-                    storage.spaces.by_name(name)?.ok_or_else(|| {
-                        Error::from(SbroadError::FailedTo(
-                            Action::Find,
-                            Some(Entity::Table),
-                            format!("{name} doesn't exist in pico_space"),
-                        ))
-                    })?;
-                OpDdl::DropSpace { id: space_def.id }
-            }
-        };
-        let duration = Duration::from_secs_f64(timeout);
-        let schema_version = storage.properties.next_schema_version()?;
-        let op = Op::DdlPrepare {
-            schema_version,
-            ddl: ddl_op,
-        };
-        let index = schema::prepare_schema_change(op, duration)?;
-        schema::wait_for_ddl_commit(index, duration)?;
-        let result = ConsumerResult { row_count: 1 };
+        let timeout = duration_from_secs_f64_clamped(ddl.timeout()?);
+        let deadline = Instant::now().saturating_add(timeout);
+        let node = node::global()?;
+        let result = reenterable_ddl_request(node, ddl, deadline)?;
         Tuple::new(&(result,)).map_err(Error::from)
     } else {
         match query.dispatch() {
@@ -136,6 +82,127 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
     )
 }
 
+fn reenterable_ddl_request(
+    node: &node::Node,
+    ddl: Ddl,
+    deadline: Instant,
+) -> traft::Result<ConsumerResult> {
+    let storage = &node.storage;
+
+    // Check parameters
+    let params = match ddl {
+        Ddl::CreateShardedTable {
+            name,
+            format,
+            primary_key,
+            sharding_key,
+            ..
+        } => {
+            let format = format
+                .into_iter()
+                .map(|f| Field {
+                    name: f.name,
+                    r#type: FieldType::from(&f.data_type),
+                    is_nullable: f.is_nullable,
+                })
+                .collect();
+            let params = CreateSpaceParams {
+                id: None,
+                name,
+                format,
+                primary_key,
+                distribution: DistributionParam::Sharded,
+                by_field: None,
+                sharding_key: Some(sharding_key),
+                sharding_fn: Some(ShardingFn::Murmur3),
+                timeout: None,
+            };
+            params.validate()?;
+            Params::CreateSpace(params)
+        }
+        Ddl::DropTable { name, .. } => {
+            // Nothing to check
+            Params::DropSpace(name)
+        }
+    };
+
+    'retry: loop {
+        if Instant::now() > deadline {
+            return Err(Error::Timeout);
+        }
+
+        let index = node.read_index(deadline.duration_since(Instant::now()))?;
+
+        if storage.properties.pending_schema_change()?.is_some() {
+            node.wait_index(index + 1, deadline.duration_since(Instant::now()))?;
+            continue 'retry;
+        }
+
+        // Check for conflicts and make the op
+        let ddl = match &params {
+            Params::CreateSpace(params) => {
+                if params.space_exists()? {
+                    // Space already exists, no op needed
+                    return Ok(ConsumerResult { row_count: 0 });
+                }
+                // XXX: this is stupid, we pass raft op by value everywhere even
+                // though it's always just dropped right after serialization.
+                // This forces us to clone it quite often. The root problem is
+                // that we nest structs a lot and having references to structs
+                // in other structs (which is what we should be doing) is very
+                // painful in rust.
+                let mut params = params.clone();
+                params.choose_id_if_not_specified()?;
+                params.test_create_space()?;
+                params.into_ddl()?
+            }
+            Params::DropSpace(name) => {
+                let Some(space_def) = storage.spaces.by_name(name)? else {
+                    // Space doesn't exist yet, no op needed
+                    return Ok(ConsumerResult { row_count: 0 });
+                };
+                OpDdl::DropSpace { id: space_def.id }
+            }
+        };
+
+        let schema_version = storage.properties.next_schema_version()?;
+        let op = Op::DdlPrepare {
+            schema_version,
+            ddl,
+        };
+        let term = raft::Storage::term(&node.raft_storage, index)?;
+        let predicate = cas::Predicate {
+            index,
+            term,
+            ranges: cas::schema_change_ranges().into(),
+        };
+        let res = cas::compare_and_swap(op, predicate, deadline.duration_since(Instant::now()));
+        let (index, term) = unwrap_ok_or!(res,
+            Err(e) => {
+                if e.is_retriable() {
+                    continue 'retry;
+                } else {
+                    return Err(e);
+                }
+            }
+        );
+
+        node.wait_index(index, deadline.duration_since(Instant::now()))?;
+
+        if term != raft::Storage::term(&node.raft_storage, index)? {
+            // Leader has changed and the entry got rolled back, retry.
+            continue 'retry;
+        }
+
+        return Ok(ConsumerResult { row_count: 1 });
+    }
+
+    enum Params {
+        CreateSpace(CreateSpaceParams),
+        DropSpace(String),
+    }
+}
+
 /// Executes a query sub-plan on the local node.
 #[proc(packed_args)]
 pub fn execute(raw: &RawBytes) -> traft::Result<Tuple> {
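
`reenterable_ddl_request` above is a deadline-driven retry loop: every iteration re-reads the raft index, waits out any pending schema change, re-checks the precondition (space already exists / already dropped), proposes the operation through compare-and-swap, and starts over whenever the failure is retriable. A stripped-down sketch of that control flow, independent of the picodata types (all names here are illustrative, not part of the patch):

```rust
use std::time::{Duration, Instant};

/// Outcome of a single attempt at an idempotent, CaS-style request.
enum Attempt<T> {
    /// The request went through (or turned out to be a no-op).
    Done(T),
    /// A transient failure (leader change, CaS conflict, ...): re-run the
    /// whole check-then-propose cycle from scratch.
    Retry,
}

/// Keep re-running the full attempt until it succeeds, fails with a hard
/// error, or the caller-supplied deadline expires.
fn reenterable<T, E>(
    deadline: Instant,
    mut attempt: impl FnMut(Duration) -> Result<Attempt<T>, E>,
    on_timeout: impl Fn() -> E,
) -> Result<T, E> {
    loop {
        let now = Instant::now();
        if now > deadline {
            return Err(on_timeout());
        }
        // Every attempt only gets the time remaining until the deadline.
        match attempt(deadline.saturating_duration_since(now))? {
            Attempt::Done(value) => return Ok(value),
            Attempt::Retry => continue,
        }
    }
}
```

In the function above, CaS conflicts, leader changes, and term mismatches play the role of `Attempt::Retry`, and the time remaining until `deadline` is threaded into every blocking call (`read_index`, `wait_index`, `compare_and_swap`).
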
diff --git a/src/storage.rs b/src/storage.rs
index a530dde18d0ef9c31d5b83d657dd97da4e2c89e6..5a1848f15f3af377de86c6e2afe345f9a9b10b2a 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -2494,7 +2494,7 @@ pub mod acl {
     pub fn on_master_create_user(user_def: &UserDef) -> tarantool::Result<()> {
         let sys_user = Space::from(SystemSpace::User);
 
-        // This impelemtation was copied from box.schema.user.create excluding the
+        // This implementation was copied from box.schema.user.create excluding the
         // password hashing.
         let user_id = user_def.id;
         let euid = ::tarantool::session::euid()?;
@@ -2556,7 +2556,7 @@ pub mod acl {
     pub fn on_master_create_role(role_def: &RoleDef) -> tarantool::Result<()> {
         let sys_user = Space::from(SystemSpace::User);
 
-        // This impelemtation was copied from box.schema.role.create.
+        // This implementation was copied from box.schema.role.create.
         // Tarantool expects auth info to be a map `{}`, and currently the simplest
         // way to achieve this is to use a HashMap.
diff --git a/src/traft/error.rs b/src/traft/error.rs
index d8aa2814a9e6aedc1a46e76fa427911af943f80b..810d0bfb64b7ec1c4b70912726da0b9bf14168e6 100644
--- a/src/traft/error.rs
+++ b/src/traft/error.rs
@@ -81,6 +81,7 @@ pub enum Error {
 }
 
 impl Error {
+    #[inline(always)]
     pub fn other<E>(error: E) -> Self
     where
         E: Into<Box<dyn std::error::Error>>,
@@ -89,20 +90,43 @@ impl Error {
     }
 
     /// Temporary solution until proc_cas returns structured errors
+    #[inline(always)]
     pub fn is_cas_err(&self) -> bool {
         self.to_string().contains("compare-and-swap")
     }
 
     /// Temporary solution until proc_cas returns structured errors
+    #[inline(always)]
     pub fn is_term_mismatch_err(&self) -> bool {
         self.to_string()
             .contains("operation request from different term")
     }
 
     /// Temporary solution until proc_cas returns structured errors
+    #[inline(always)]
     pub fn is_not_leader_err(&self) -> bool {
         self.to_string().contains("not a leader")
     }
+
+    #[inline(always)]
+    pub fn is_retriable(&self) -> bool {
+        is_retriable_error_message(&self.to_string())
+    }
+}
+
+pub fn is_retriable_error_message(msg: &str) -> bool {
+    if msg.contains("not a leader")
+        || msg.contains("log unavailable")
+        || msg.contains("operation request from different term")
+    {
+        return true;
+    }
+
+    if msg.contains("compare-and-swap") {
+        return msg.contains("Compacted") || msg.contains("ConflictFound");
+    }
+
+    return false;
 }
 
 impl<E> From<timeout::Error<E>> for Error
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index dbdc2da2f7393eec90db3c0eb6fe46444e812855..62c6c21c5a98ddeee8e3f7451acbd05f991151fa 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -148,7 +148,7 @@ def test_select_string_field(cluster: Cluster):
     assert data["rows"] == [[1337, "foo"]]
 
 
-def test_drop_table(cluster: Cluster):
+def test_create_drop_table(cluster: Cluster):
     cluster.deploy(instance_count=2)
     i1, i2 = cluster.instances
 
@@ -162,6 +162,30 @@ def test_drop_table(cluster: Cluster):
     )
     assert ddl["row_count"] == 1
 
+    # Already exists -> ok.
+    ddl = i1.sql(
+        """
+        create table "t" ("a" integer not null, "b" int not null, primary key ("b", "a"))
+        using memtx
+        distributed by ("a", "b")
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 0
+
+    # FIXME: this should fail
+    # see https://git.picodata.io/picodata/picodata/picodata/-/issues/331
+    # Already exists with different format -> error.
+    ddl = i1.sql(
+        """
+        create table "t" ("key" string not null, "value" string not null, primary key ("key"))
+        using memtx
+        distributed by ("key")
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 0
+
     ddl = i2.sql(
         """
         drop table "t"
@@ -170,6 +194,15 @@ def test_drop_table(cluster: Cluster):
     )
     assert ddl["row_count"] == 1
 
+    # Already dropped -> ok.
+    ddl = i2.sql(
+        """
+        drop table "t"
+        option (timeout = 3)
+        """
+    )
+    assert ddl["row_count"] == 0
+
     ddl = i2.sql(
         """
         create table "t" ("a" integer not null, "b" int not null, primary key ("b", "a"))
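
For reference, a few spot checks of the classification implemented by `is_retriable_error_message` in src/traft/error.rs above. The message strings are illustrative examples built around the substrings the function looks for, not exact picodata error texts:

```rust
#[cfg(test)]
mod retriable_message_tests {
    use super::is_retriable_error_message;

    #[test]
    fn classification() {
        // Raft-level transient conditions are always worth retrying.
        assert!(is_retriable_error_message("not a leader"));
        assert!(is_retriable_error_message("log unavailable"));
        assert!(is_retriable_error_message(
            "operation request from different term 3"
        ));

        // Compare-and-swap failures are retriable only when the raft log was
        // compacted or a conflicting entry was found.
        assert!(is_retriable_error_message(
            "compare-and-swap request failed: Compacted"
        ));
        assert!(is_retriable_error_message(
            "compare-and-swap request failed: ConflictFound"
        ));
        assert!(!is_retriable_error_message(
            "compare-and-swap request failed: EntryTermMismatch"
        ));

        // Anything else is treated as a hard error.
        assert!(!is_retriable_error_message("space \"t\" not found"));
    }
}
```
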