Skip to content
Snippets Groups Projects
Verified Commit 128afabf authored by Denis Smirnov's avatar Denis Smirnov
Browse files

feat: implement DROP TABLE command

Now it is possible to drop sharded tables with the `pico.sql()` Lua
function.
parent a11aca16
No related branches found
No related tags found
1 merge request!604Support SQL CREATE TABLE command
......@@ -27,6 +27,9 @@ with the `YY.0M.MICRO` scheme.
- _Clusterwide SQL_ now enables the creation of sharded tables.
To learn more, please consult `pico.help('sql')`.
- _Clusterwide SQL_ introduces the capability to delete sharded tables.
To obtain more details, please consult `pico.help('sql')`.
### Lua API:
- Update `pico.LUA_API_VERSION`: `1.0.0` -> `1.3.0`
......
Subproject commit a132740ba664da4977ee705b0e820b3a985bc789
Subproject commit 08b0c3f64fd14a2a24973dae3e66462340bc8262
......@@ -440,7 +440,16 @@ pub(crate) fn setup(args: &args::Run) {
"value" integer,
primary key ("property")
) using memtx distributed by ("property")
options (timeout = 3.0)
option (timeout = 3.0)
]])
---
- row_count: 1
...
picodata> -- Drop 'wonderland' table.
picodata> pico.sql([[
drop table "wonderland"
option (timeout = 3.0)
]])
---
- row_count: 1
......
//! Clusterwide SQL query execution.
use crate::schema::{self, CreateSpaceParams, DistributionParam, Field, ShardingFn};
use crate::schema::{self, CreateSpaceParams, DistributionParam, Field, ShardingFn, SpaceDef};
use crate::sql::runtime::router::RouterRuntime;
use crate::sql::runtime::storage::StorageRuntime;
use crate::traft::{self, node, op::Op};
use crate::traft::op::{Ddl as OpDdl, Op};
use crate::traft::{self, node};
use sbroad::backend::sql::ir::{EncodedPatternWithParams, PatternWithParams};
use sbroad::debug;
......@@ -17,7 +18,6 @@ use sbroad::otm::query_span;
use std::time::Duration;
use ::tarantool::decimal::Decimal;
use ::tarantool::proc;
use ::tarantool::space::FieldType;
use ::tarantool::tuple::{RawBytes, Tuple};
......@@ -46,13 +46,22 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
let top_id = ir_plan.get_top()?;
let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();
let ddl = ir_plan_mut.get_mut_ddl_node(top_id)?;
match ddl {
let timeout: f64 = ddl.timeout()?;
let storage = &node::global()
.map_err(|e| {
SbroadError::Invalid(
Entity::Runtime,
Some(format!("raft node error {e:?}")),
)
})?
.storage;
let ddl_op = match ddl {
Ddl::CreateShardedTable {
ref mut name,
ref mut format,
ref mut primary_key,
ref mut sharding_key,
ref mut timeout,
..
} => {
let format = format
.iter_mut()
......@@ -62,15 +71,6 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
is_nullable: f.is_nullable,
})
.collect();
let duration: f64 = std::mem::replace(timeout, Decimal::zero())
.to_string()
.parse()
.map_err(|e| {
SbroadError::Invalid(
Entity::SpaceMetadata,
Some(format!("timeout parsing error {e:?}")),
)
})?;
let params = CreateSpaceParams {
id: None,
name: std::mem::take(name),
......@@ -80,9 +80,8 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
by_field: None,
sharding_key: Some(std::mem::take(sharding_key)),
sharding_fn: Some(ShardingFn::Murmur3),
timeout: duration,
timeout,
};
let timeout = Duration::from_secs_f64(params.timeout);
let storage = &node::global()
.map_err(|e| {
SbroadError::Invalid(
......@@ -93,59 +92,63 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
.storage;
let mut params = params.validate(storage).map_err(|e| {
SbroadError::Invalid(
Entity::SpaceMetadata,
Entity::Table,
Some(format!("space parameters validation error {e:?}")),
)
})?;
params.test_create_space(storage).map_err(|e| {
SbroadError::Invalid(
Entity::SpaceMetadata,
Entity::Table,
Some(format!("space parameters test error {e:?}")),
)
})?;
let ddl = params.into_ddl(storage).map_err(|e| {
params.into_ddl(storage).map_err(|e| {
SbroadError::FailedTo(
Action::Create,
Some(Entity::SpaceMetadata),
Some(Entity::Table),
format!("{e:?}"),
)
})?;
let schema_version =
storage.properties.next_schema_version().map_err(|e| {
})?
}
Ddl::DropTable { ref name, .. } => {
let space_def: SpaceDef = storage
.spaces
.by_name(name)
.map_err(|e| {
SbroadError::FailedTo(
Action::Get,
Some(Entity::Schema),
Action::Find,
Some(Entity::Table),
format!("{e:?}"),
)
})?
.ok_or_else(|| {
SbroadError::FailedTo(
Action::Find,
Some(Entity::Table),
format!("{name} doesn't exist in pico_space"),
)
})?;
let op = Op::DdlPrepare {
schema_version,
ddl,
};
let index = schema::prepare_schema_change(op, timeout).map_err(|e| {
SbroadError::FailedTo(
Action::Prepare,
Some(Entity::Schema),
format!("{e:?}"),
)
})?;
schema::wait_for_ddl_commit(index, timeout).map_err(|e| {
SbroadError::FailedTo(
Action::Create,
Some(Entity::Space),
format!("{e:?}"),
)
})?;
let result = ConsumerResult { row_count: 1 };
Tuple::new(&(result,)).map_err(|e| {
SbroadError::FailedTo(
Action::Decode,
Some(Entity::Tuple),
format!("{:?}", e),
)
})
OpDdl::DropSpace { id: space_def.id }
}
}
};
let duration = Duration::from_secs_f64(timeout);
let schema_version = storage.properties.next_schema_version().map_err(|e| {
SbroadError::FailedTo(Action::Get, Some(Entity::Schema), format!("{e:?}"))
})?;
let op = Op::DdlPrepare {
schema_version,
ddl: ddl_op,
};
let index = schema::prepare_schema_change(op, duration).map_err(|e| {
SbroadError::FailedTo(Action::Prepare, Some(Entity::Schema), format!("{e:?}"))
})?;
schema::wait_for_ddl_commit(index, duration).map_err(|e| {
SbroadError::FailedTo(Action::Create, Some(Entity::Space), format!("{e:?}"))
})?;
let result = ConsumerResult { row_count: 1 };
Tuple::new(&(result,)).map_err(|e| {
SbroadError::FailedTo(Action::Decode, Some(Entity::Tuple), format!("{:?}", e))
})
} else {
match query.dispatch() {
Ok(mut any_tuple) => {
......
......@@ -146,3 +146,42 @@ def test_select_string_field(cluster: Cluster):
assert data["row_count"] == 1
data = i1.sql("""select * from STUFF """)
assert data["rows"] == [[1337, "foo"]]
def test_drop_table(cluster: Cluster):
    """DROP TABLE round-trip on a two-instance cluster.

    Verifies that a sharded table created on one instance can be dropped
    from another, that the freed name is immediately reusable, and that
    DROP works both with and without an explicit timeout option.
    """
    cluster.deploy(instance_count=2)
    first, second = cluster.instances

    # Create the table on the first instance with an explicit timeout.
    result = first.sql(
        """
        create table "t" ("a" integer not null, "b" int not null, primary key ("b", "a"))
        using memtx
        distributed by ("a", "b")
        option (timeout = 3)
        """
    )
    assert result["row_count"] == 1

    # Drop it from the *other* instance: DDL must be clusterwide,
    # not local to the instance that issued CREATE.
    result = second.sql(
        """
        drop table "t"
        option (timeout = 3)
        """
    )
    assert result["row_count"] == 1

    # The name is free again — recreate it (this time from the second
    # instance, and without an explicit timeout).
    result = second.sql(
        """
        create table "t" ("a" integer not null, "b" int not null, primary key ("b", "a"))
        using memtx
        distributed by ("a", "b")
        """
    )
    assert result["row_count"] == 1

    # Drop without the option clause exercises the default timeout path.
    result = first.sql(
        """
        drop table "t"
        """
    )
    assert result["row_count"] == 1
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment