diff --git a/CHANGELOG.md b/CHANGELOG.md index bbe5c9419aef401ece68cc3703ded056364f4e7d..e96957d2035764d2731c0fa627e0890cdf0d9ca7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,8 @@ with the `YY.0M.MICRO` scheme. - Allow specifying `picodata connect [user@][host][:port]` format. It overrides the `--user` option. +- Allow creating sharded vinyl spaces via `pico.create_space`. + - _Clusterwide SQL_ now uses an internal module called `key_def` to determine tuple buckets. In case the spaces were sharded using a different hash function, executing SQL queries on these spaces would @@ -42,8 +44,10 @@ with the `YY.0M.MICRO` scheme. ### Lua API: -- Update `pico.LUA_API_VERSION`: `1.0.0` -> `2.1.0` +- Update `pico.LUA_API_VERSION`: `1.0.0` -> `2.2.0` - New semantics of `pico.create_space()`. It's idempotent now. +- `pico.create_space()` has new optional parameter: `engine`. + Note: global spaces can only have memtx engine. - Add `pico.drop_space()` - Add `pico.create_user()`, `pico.drop_user()` - Add `pico.create_role()`, `pico.drop_role()` diff --git a/sbroad b/sbroad index 60c6caa2b2efbba31ec8c9f09d09ff94629a006b..8c8c0c616865e94a144466097dc19cd3927da430 160000 --- a/sbroad +++ b/sbroad @@ -1 +1 @@ -Subproject commit 60c6caa2b2efbba31ec8c9f09d09ff94629a006b +Subproject commit 8c8c0c616865e94a144466097dc19cd3927da430 diff --git a/src/cas.rs b/src/cas.rs index 73722cae0a34dd0931cb3460b6218016e77718ca..3dba57eeb4e6fcb8fac0f8dcfbd1ad32515eea81 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -699,6 +699,7 @@ fn modifies_operable(op: &Op, space: SpaceId, storage: &Clusterwide) -> bool { /// Predicate tests based on the CaS Design Document. mod tests { + use tarantool::space::SpaceEngineType; use tarantool::tuple::ToTupleBuffer; use crate::schema::{Distribution, SpaceDef}; @@ -733,6 +734,7 @@ mod tests { format: vec![], primary_key: vec![], distribution: Distribution::Global, + engine: SpaceEngineType::Memtx, }); let drop_space = builder.with_op(Ddl::DropSpace { id: space_id }); let create_index = builder.with_op(Ddl::CreateIndex { @@ -769,6 +771,7 @@ mod tests { distribution: Distribution::Global, format: vec![], schema_version: 1, + engine: SpaceEngineType::Memtx, }) .unwrap(); assert!(t(&drop_space, Range::new(props).eq(&pending_schema_change)).is_err()); diff --git a/src/lib.rs b/src/lib.rs index 4c3fc67016a61633193d1ccac96fe1fc0e388ee9..7970fee6d3ad452aed799d1bbeece1f05e0024a8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -307,6 +307,7 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { read_only: false, wal_dir: args.data_dir.clone(), memtx_dir: args.data_dir.clone(), + vinyl_dir: args.data_dir.clone(), log_level: args.log_level() as u8, ..Default::default() }; @@ -377,6 +378,7 @@ fn start_boot(args: &args::Run) { replicaset_uuid: Some(instance.replicaset_uuid.clone()), wal_dir: args.data_dir.clone(), memtx_dir: args.data_dir.clone(), + vinyl_dir: args.data_dir.clone(), log_level: args.log_level() as u8, ..Default::default() }; @@ -545,6 +547,7 @@ fn start_join(args: &args::Run, instance_address: String) { replication: resp.box_replication.clone(), wal_dir: args.data_dir.clone(), memtx_dir: args.data_dir.clone(), + vinyl_dir: args.data_dir.clone(), log_level: args.log_level() as u8, ..Default::default() }; diff --git a/src/luamod.lua b/src/luamod.lua index 4be9f726c81609bf7b9fea3b3a630da5c008828a..feb8349096edbec615f62d5d72678a6c5e02b964 100644 --- a/src/luamod.lua +++ b/src/luamod.lua @@ -962,6 +962,7 @@ Params: - by_field (optional string), usually 'bucket_id' - sharding_key (optional table {string,...}) with field names - sharding_fn (optional string), only default 'murmur3' is supported for now + - engine (optional string), one of 'memtx' | 'vinyl', default: 'memtx' - timeout (optional number), in seconds, default: infinity Returns: @@ -972,7 +973,7 @@ Returns: Example: - -- Creates a global space 'friends_of_peppa' with two fields: + -- Creates a global memtx space 'friends_of_peppa' with two fields: -- id (unsigned) and name (string). pico.create_space({ name = 'friends_of_peppa', @@ -1016,7 +1017,7 @@ Example: -- Sharded spaces are updated via vshard api, see [1] vshard.router.callrw(bucket_id, 'box.space.wonderland:insert', {{'unicorns', 12}}) - + See also: [1]: https://www.tarantool.io/en/doc/latest/reference/reference_rock/vshard/vshard_router/ @@ -1032,6 +1033,7 @@ function pico.create_space(opts) by_field = 'string', sharding_key = 'table', sharding_fn = 'string', + engine = 'string', timeout = 'number', }) mandatory_param(opts, 'opts') diff --git a/src/luamod.rs b/src/luamod.rs index fae56fc01fc57f01a46ffdfaa3158797013fb936..1831ad9c5f95cc3948dc1d90df7035f3275bc768 100644 --- a/src/luamod.rs +++ b/src/luamod.rs @@ -88,10 +88,10 @@ pub(crate) fn setup(args: &args::Run) { picodata> pico.LUA_API_VERSION --- - - 2.1.0 + - 2.2.0 ... "}, - "2.1.0", + "2.2.0", ); luamod_set( diff --git a/src/schema.rs b/src/schema.rs index 03ceb27f5b6bee3584bbe1e5b9f94884e73892f3..5f30c5a0ef21707bb9dbc0c483a0fe78758ef39b 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -41,6 +41,7 @@ pub struct SpaceDef { pub format: Vec<tarantool::space::Field>, pub schema_version: u64, pub operable: bool, + pub engine: SpaceEngineType, } impl Encode for SpaceDef {} @@ -76,7 +77,7 @@ impl SpaceDef { // Do we want to be more explicit about user_id? user_id: uid()? as _, name: self.name.as_str().into(), - engine: SpaceEngineType::Memtx, + engine: self.engine, field_count: 0, flags, format, @@ -300,6 +301,8 @@ pub enum CreateSpaceError { ShardingPolicyUndefined, #[error("only one of sharding policy fields (`by_field`, `sharding_key`) should be set")] ConflictingShardingPolicy, + #[error("global spaces only support memtx engine")] + IncompatibleGlobalSpaceEngine, } impl From<CreateSpaceError> for Error { @@ -345,6 +348,7 @@ pub struct CreateSpaceParams { pub(crate) by_field: Option<String>, pub(crate) sharding_key: Option<Vec<String>>, pub(crate) sharding_fn: Option<ShardingFn>, + pub(crate) engine: Option<SpaceEngineType>, /// Timeout in seconds. /// /// Specifying the timeout identifies how long user is ready to wait for ddl to be applied. @@ -409,6 +413,12 @@ impl CreateSpaceParams { return Err(CreateSpaceError::FieldUndefined(part.clone()).into()); } } + // Global spaces must have memtx engine + if self.distribution == DistributionParam::Global + && self.engine.is_some_and(|e| e != SpaceEngineType::Memtx) + { + return Err(CreateSpaceError::IncompatibleGlobalSpaceEngine.into()); + } // All sharding key components exist in fields if self.distribution == DistributionParam::Sharded { match (&self.by_field, &self.sharding_key) { @@ -473,7 +483,7 @@ impl CreateSpaceParams { &self.name, &SpaceCreateOptions { if_not_exists: false, - engine: SpaceEngineType::Memtx, + engine: self.engine.unwrap_or_default(), id: Some(id), field_count: self.format.len() as u32, user: None, @@ -569,6 +579,7 @@ impl CreateSpaceParams { format, primary_key, distribution, + engine: self.engine.unwrap_or_default(), }; Ok(res) } @@ -725,6 +736,23 @@ mod tests { by_field: None, sharding_key: None, sharding_fn: None, + engine: None, + timeout: None, + } + .test_create_space() + .unwrap(); + assert!(tarantool::space::Space::find("friends_of_peppa").is_none()); + + CreateSpaceParams { + id: Some(1337), + name: "friends_of_peppa".into(), + format: vec![], + primary_key: vec![], + distribution: DistributionParam::Sharded, + by_field: None, + sharding_key: None, + sharding_fn: None, + engine: Some(SpaceEngineType::Vinyl), timeout: None, } .test_create_space() @@ -740,6 +768,7 @@ mod tests { by_field: None, sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .test_create_space() @@ -774,6 +803,7 @@ mod tests { by_field: None, sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -789,6 +819,7 @@ mod tests { by_field: None, sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -804,6 +835,7 @@ mod tests { by_field: None, sharding_key: Some(vec![field2.name.clone()]), sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -819,6 +851,7 @@ mod tests { by_field: None, sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]), sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -834,6 +867,7 @@ mod tests { by_field: Some(field2.name.clone()), sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -849,6 +883,7 @@ mod tests { by_field: None, sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -867,6 +902,7 @@ mod tests { by_field: Some(field2.name.clone()), sharding_key: Some(vec![]), sharding_fn: None, + engine: None, timeout: None, } .validate() @@ -879,15 +915,32 @@ mod tests { CreateSpaceParams { id: Some(new_id), name: new_space.into(), - format: vec![field1, field2.clone()], + format: vec![field1.clone(), field2.clone()], primary_key: vec![field2.name.clone()], distribution: DistributionParam::Sharded, - by_field: Some(field2.name), + by_field: Some(field2.name.clone()), sharding_key: None, sharding_fn: None, + engine: None, timeout: None, } .validate() .unwrap(); + + let err = CreateSpaceParams { + id: Some(new_id), + name: new_space.into(), + format: vec![field1, field2.clone()], + primary_key: vec![field2.name], + distribution: DistributionParam::Global, + by_field: None, + sharding_key: None, + sharding_fn: None, + engine: Some(SpaceEngineType::Vinyl), + timeout: None, + } + .validate() + .unwrap_err(); + assert_eq!(err.to_string(), "global spaces only support memtx engine"); } } diff --git a/src/sql.rs b/src/sql.rs index 39d639026cebe60959342ede055f6189bf36a5b3..90afa8730dd5f855629b4fef8f283a6ee9772fc1 100644 --- a/src/sql.rs +++ b/src/sql.rs @@ -96,6 +96,7 @@ fn reenterable_ddl_request( format, primary_key, sharding_key, + engine_type, .. } => { let format = format @@ -115,6 +116,7 @@ fn reenterable_ddl_request( by_field: None, sharding_key: Some(sharding_key), sharding_fn: Some(ShardingFn::Murmur3), + engine: Some(engine_type), timeout: None, }; params.validate()?; diff --git a/src/tarantool.rs b/src/tarantool.rs index f581c3308add32fd04910341d567fb928e889436..0993c92f548c028252b3e647422fd50651397d59 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -88,6 +88,7 @@ pub struct Cfg { pub wal_dir: String, pub memtx_dir: String, + pub vinyl_dir: String, pub memtx_memory: u64, @@ -116,6 +117,7 @@ impl Default for Cfg { wal_dir: ".".into(), memtx_dir: ".".into(), + vinyl_dir: ".".into(), memtx_memory: 32 * 1024 * 1024, diff --git a/src/traft/node.rs b/src/traft/node.rs index c8a379862221829d199e7fc10f8bec1dafc736e6..9a953beb90b3858d9516dbbd3b01a3d6268f4002 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -952,6 +952,7 @@ impl NodeImpl { mut format, mut primary_key, distribution, + engine, } => { use ::tarantool::util::NumOrStr::*; @@ -1071,6 +1072,7 @@ impl NodeImpl { schema_version, format, operable: false, + engine, }; let res = self.storage.spaces.insert(&space_def); if let Err(e) = res { diff --git a/src/traft/op.rs b/src/traft/op.rs index f8672cb0bfd705f92661912170855f591509a398..5193515be85a7c5fe0a53a67fd708f70bf62ec1e 100644 --- a/src/traft/op.rs +++ b/src/traft/op.rs @@ -6,6 +6,7 @@ use ::tarantool::space::{Field, SpaceId}; use ::tarantool::tlua; use ::tarantool::tuple::{ToTupleBuffer, Tuple, TupleBuffer}; use serde::{Deserialize, Serialize}; +use tarantool::space::SpaceEngineType; //////////////////////////////////////////////////////////////////////////////// // OpResult @@ -429,6 +430,7 @@ pub enum Ddl { format: Vec<Field>, primary_key: Vec<Part>, distribution: Distribution, + engine: SpaceEngineType, }, DropSpace { id: SpaceId, diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py index 801d3652133ae972f0b91c4b659ad21797dfaca0..33e1a9ee939435713f6571946fbaad1d833fdbd1 100644 --- a/test/int/test_ddl.py +++ b/test/int/test_ddl.py @@ -87,6 +87,7 @@ def test_ddl_lua_api(cluster: Cluster): [["id", "unsigned", False]], 2, True, + "memtx", ] assert i1.call("box.space._pico_space:get", space_id) == pico_space_def assert i2.call("box.space._pico_space:get", space_id) == pico_space_def @@ -108,10 +109,46 @@ def test_ddl_lua_api(cluster: Cluster): [["id", "unsigned", False]], 3, True, + "memtx", ] assert i1.call("box.space._pico_space:get", space_id) == pico_space_def assert i2.call("box.space._pico_space:get", space_id) == pico_space_def + # test vinyl space can be created + space_id = 1029 + pico_space_def = [ + space_id, + "stuffy", + ["sharded_implicitly", ["foo"], "murmur3"], + [ + ["id", "unsigned", False], + # Automatically generated by picodata + ["bucket_id", "unsigned", False], + ["foo", "integer", False], + ], + 4, + True, + "vinyl", + ] + cluster.create_space( + dict( + id=space_id, + name="stuffy", + format=[ + dict(name="id", type="unsigned", is_nullable=False), + dict(name="foo", type="integer", is_nullable=False), + ], + primary_key=["id"], + distribution="sharded", + sharding_key=["foo"], + sharding_fn="murmur3", + engine="vinyl", + ), + ) + assert i1.call("box.space._pico_space:get", space_id) == pico_space_def + assert i1.eval("return box.space.stuffy.engine") == "vinyl" + assert i2.eval("return box.space.stuffy.engine") == "vinyl" + # # pico.drop_space # @@ -178,6 +215,7 @@ def test_ddl_create_space_bulky(cluster: Cluster): format=[dict(name="id", type="unsigned", is_nullable=False)], primary_key=[dict(field="(this will cause an error)")], distribution=dict(kind="global"), + engine="memtx", ), ) @@ -241,6 +279,7 @@ def test_ddl_create_space_bulky(cluster: Cluster): [["id", "unsigned", False]], 2, True, + "memtx", ] assert i1.call("box.space._pico_space:get", space_id) == pico_space_def assert i2.call("box.space._pico_space:get", space_id) == pico_space_def @@ -355,6 +394,7 @@ def test_ddl_create_sharded_space(cluster: Cluster): ], schema_version, True, + "memtx", ] assert i1.call("box.space._pico_space:get", space_id) == pico_space_def assert i2.call("box.space._pico_space:get", space_id) == pico_space_def @@ -447,6 +487,7 @@ def test_ddl_create_space_unfinished_from_snapshot(cluster: Cluster): distribution=dict( kind="sharded_implicitly", sharding_key=["id"], sharding_fn="murmur3" ), + engine="memtx", ), wait_index=False, ) @@ -509,6 +550,7 @@ def test_ddl_create_space_abort(cluster: Cluster): sharding_key=["id"], sharding_fn="murmur3", ), + engine="memtx", ), wait_index=False, ) @@ -567,6 +609,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster): format=[dict(name="id", type="unsigned", is_nullable=False)], primary_key=[dict(field="id")], distribution=dict(kind="global"), + engine="memtx", ) index = i1.propose_create_space(space_def) @@ -754,6 +797,7 @@ def test_ddl_create_space_from_snapshot_at_catchup(cluster: Cluster): format=[dict(name="id", type="unsigned", is_nullable=False)], primary_key=["id"], distribution="global", + engine="memtx", ), ) i1.raft_wait_index(index) diff --git a/test/int/test_sql.py b/test/int/test_sql.py index 62c6c21c5a98ddeee8e3f7451acbd05f991151fa..a6917acd323f05880eda6d48ed8a13394bb6218e 100644 --- a/test/int/test_sql.py +++ b/test/int/test_sql.py @@ -219,6 +219,25 @@ def test_create_drop_table(cluster: Cluster): ) assert ddl["row_count"] == 1 + # Check vinyl space + ddl = i1.sql( + """ + create table "t" ("key" string not null, "value" string not null, primary key ("key")) + using vinyl + distributed by ("key") + option (timeout = 3) + """ + ) + assert ddl["row_count"] == 1 + + ddl = i2.sql( + """ + drop table "t" + option (timeout = 3) + """ + ) + assert ddl["row_count"] == 1 + def test_insert_on_conflict(cluster: Cluster): cluster.deploy(instance_count=2)