From a11aca169384c671b7774f11d3c247cf5c4d8d6d Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Tue, 11 Jul 2023 17:18:00 +0700
Subject: [PATCH] feat: implement CREATE TABLE SQL command

Adds support for creation of the sharded memtx spaces via pico.sql().
---
 CHANGELOG.md         |   3 +
 src/luamod.rs        |  13 ++++
 src/schema.rs        |  16 ++---
 src/sql.rs           | 147 +++++++++++++++++++++++++++++++++++++------
 test/int/test_sql.py |  78 ++++++++---------------
 5 files changed, 180 insertions(+), 77 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8284974041..9f3b446138 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -24,6 +24,9 @@ with the `YY.0M.MICRO` scheme.
 - _Clusterwide SQL_ now features Lua documentation. Refer to
   `pico.help('sql')` for more information.
 
+- _Clusterwide SQL_ now enables the creation of sharded tables.
+  To learn more, please consult `pico.help('sql')`.
+
 ### Lua API:
 
 - Update `pico.LUA_API_VERSION`: `1.0.0` -> `1.3.0`
diff --git a/src/luamod.rs b/src/luamod.rs
index 19192fd2fb..c448433400 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -433,6 +433,19 @@ pub(crate) fn setup(args: &args::Run) {
 
         Example:
 
+            picodata> -- Create a sharded table 'wonderland'.
+            picodata> pico.sql([[
+                create table "wonderland" (
+                    "property" text not null,
+                    "value" integer,
+                    primary key ("property")
+                ) using memtx distributed by ("property")
+                options (timeout = 3.0)
+            ]])
+            ---
+            - row_count: 1
+            ...
+
             picodata> -- Insert a row into the 'wonderland' table using parameters
             picodata> -- as an optional argument.
             picodata> pico.sql(
diff --git a/src/schema.rs b/src/schema.rs
index 6b675174fd..f588320717 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -332,14 +332,14 @@ impl From<Field> for tarantool::space::Field {
 
 #[derive(Clone, Debug, LuaRead)]
 pub struct CreateSpaceParams {
-    id: Option<SpaceId>,
-    name: String,
-    format: Vec<Field>,
-    primary_key: Vec<String>,
-    distribution: DistributionParam,
-    by_field: Option<String>,
-    sharding_key: Option<Vec<String>>,
-    sharding_fn: Option<ShardingFn>,
+    pub(crate) id: Option<SpaceId>,
+    pub(crate) name: String,
+    pub(crate) format: Vec<Field>,
+    pub(crate) primary_key: Vec<String>,
+    pub(crate) distribution: DistributionParam,
+    pub(crate) by_field: Option<String>,
+    pub(crate) sharding_key: Option<Vec<String>>,
+    pub(crate) sharding_fn: Option<ShardingFn>,
     /// Timeout in seconds.
     ///
     /// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
diff --git a/src/sql.rs b/src/sql.rs
index 6a345bc23a..29b5019c36 100644
--- a/src/sql.rs
+++ b/src/sql.rs
@@ -41,27 +41,138 @@ pub fn dispatch_query(encoded_params: EncodedPatternWithParams) -> traft::Result
         || {
             let runtime = RouterRuntime::new()?;
             let mut query = Query::new(&runtime, &params.pattern, params.params)?;
-            match query.dispatch() {
-                Ok(mut any_tuple) => {
-                    if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() {
-                        debug!(
-                            Option::from("dispatch"),
-                            &format!("Dispatch result: {tuple:?}"),
-                        );
-                        let empty_tuple = Tuple::new(&()).map_err(|e| {
-                            SbroadError::FailedTo(Action::Decode, None, format!("tuple {:?}", e))
+            if query.is_ddl()? {
+                let ir_plan = query.get_exec_plan().get_ir_plan();
+                let top_id = ir_plan.get_top()?;
+                let ir_plan_mut = query.get_mut_exec_plan().get_mut_ir_plan();
+                let ddl = ir_plan_mut.get_mut_ddl_node(top_id)?;
+                match ddl {
+                    Ddl::CreateShardedTable {
+                        ref mut name,
+                        ref mut format,
+                        ref mut primary_key,
+                        ref mut sharding_key,
+                        ref mut timeout,
+                    } => {
+                        let format = format
+                            .iter_mut()
+                            .map(|f| Field {
+                                name: std::mem::take(&mut f.name),
+                                r#type: FieldType::from(&f.data_type),
+                                is_nullable: f.is_nullable,
+                            })
+                            .collect();
+                        let duration: f64 = std::mem::replace(timeout, Decimal::zero())
+                            .to_string()
+                            .parse()
+                            .map_err(|e| {
+                                SbroadError::Invalid(
+                                    Entity::SpaceMetadata,
+                                    Some(format!("timeout parsing error {e:?}")),
+                                )
+                            })?;
+                        let params = CreateSpaceParams {
+                            id: None,
+                            name: std::mem::take(name),
+                            format,
+                            primary_key: std::mem::take(primary_key),
+                            distribution: DistributionParam::Sharded,
+                            by_field: None,
+                            sharding_key: Some(std::mem::take(sharding_key)),
+                            sharding_fn: Some(ShardingFn::Murmur3),
+                            timeout: duration,
+                        };
+                        let timeout = Duration::from_secs_f64(params.timeout);
+                        let storage = &node::global()
+                            .map_err(|e| {
+                                SbroadError::Invalid(
+                                    Entity::Runtime,
+                                    Some(format!("raft node error {e:?}")),
+                                )
+                            })?
+                            .storage;
+                        let mut params = params.validate(storage).map_err(|e| {
+                            SbroadError::Invalid(
+                                Entity::SpaceMetadata,
+                                Some(format!("space parameters validation error {e:?}")),
+                            )
                         })?;
-                        let tuple: Tuple = std::mem::replace(tuple, empty_tuple);
-                        Ok(tuple)
-                    } else {
-                        Err(SbroadError::FailedTo(
-                            Action::Decode,
-                            None,
-                            format!("tuple {any_tuple:?}"),
-                        ))
+                        params.test_create_space(storage).map_err(|e| {
+                            SbroadError::Invalid(
+                                Entity::SpaceMetadata,
+                                Some(format!("space parameters test error {e:?}")),
+                            )
+                        })?;
+                        let ddl = params.into_ddl(storage).map_err(|e| {
+                            SbroadError::FailedTo(
+                                Action::Create,
+                                Some(Entity::SpaceMetadata),
+                                format!("{e:?}"),
+                            )
+                        })?;
+                        let schema_version =
+                            storage.properties.next_schema_version().map_err(|e| {
+                                SbroadError::FailedTo(
+                                    Action::Get,
+                                    Some(Entity::Schema),
+                                    format!("{e:?}"),
+                                )
+                            })?;
+                        let op = Op::DdlPrepare {
+                            schema_version,
+                            ddl,
+                        };
+                        let index = schema::prepare_schema_change(op, timeout).map_err(|e| {
+                            SbroadError::FailedTo(
+                                Action::Prepare,
+                                Some(Entity::Schema),
+                                format!("{e:?}"),
+                            )
+                        })?;
+                        schema::wait_for_ddl_commit(index, timeout).map_err(|e| {
+                            SbroadError::FailedTo(
+                                Action::Create,
+                                Some(Entity::Space),
+                                format!("{e:?}"),
+                            )
+                        })?;
+                        let result = ConsumerResult { row_count: 1 };
+                        Tuple::new(&(result,)).map_err(|e| {
+                            SbroadError::FailedTo(
+                                Action::Decode,
+                                Some(Entity::Tuple),
+                                format!("{:?}", e),
+                            )
+                        })
+                    }
+                }
+            } else {
+                match query.dispatch() {
+                    Ok(mut any_tuple) => {
+                        if let Some(tuple) = any_tuple.downcast_mut::<Tuple>() {
+                            debug!(
+                                Option::from("dispatch"),
+                                &format!("Dispatch result: {tuple:?}"),
+                            );
+                            let empty_tuple = Tuple::new(&()).map_err(|e| {
+                                SbroadError::FailedTo(
+                                    Action::Decode,
+                                    None,
+                                    format!("tuple {:?}", e),
+                                )
+                            })?;
+                            let tuple: Tuple = std::mem::replace(tuple, empty_tuple);
+                            Ok(tuple)
+                        } else {
+                            Err(SbroadError::FailedTo(
+                                Action::Decode,
+                                None,
+                                format!("tuple {any_tuple:?}"),
+                            ))
+                        }
                     }
+                    Err(e) => Err(e),
                 }
-                Err(e) => Err(e),
             }
         },
     );
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index d4926caa9f..e8a9c6954a 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -59,20 +59,15 @@ def test_select(cluster: Cluster):
     cluster.deploy(instance_count=2)
     i1, i2 = cluster.instances
 
-    space_id = 739
-    index = i1.propose_create_space(
-        dict(
-            id=space_id,
-            name="T",
-            format=[
-                dict(name="A", type="integer", is_nullable=False),
-            ],
-            primary_key=[dict(field="A")],
-            # sharding function is implicitly murmur3
-            distribution=dict(kind="sharded_implicitly", sharding_key=["A"]),
-        )
+    ddl = i1.sql(
+        """
+        create table t (a int, primary key (a))
+        using memtx
+        distributed by (a)
+        option (timeout = 3)
+    """
     )
-    i2.raft_wait_index(index, 3)
+    assert ddl["row_count"] == 1
 
     data = i1.sql("""insert into t values(1);""")
     assert data["row_count"] == 1
@@ -95,20 +90,14 @@ def test_hash(cluster: Cluster):
     cluster.deploy(instance_count=1)
     i1 = cluster.instances[0]
 
-    space_id = 777
-    index = i1.propose_create_space(
-        dict(
-            id=space_id,
-            name="T",
-            format=[
-                dict(name="A", type="integer", is_nullable=True),
-            ],
-            primary_key=[dict(field="A")],
-            # sharding function is implicitly murmur3
-            distribution=dict(kind="sharded_implicitly", sharding_key=["A"]),
-        )
+    ddl = i1.sql(
+        """
+        create table t (a int, primary key (a))
+        using memtx
+        distributed by (a)
+    """
     )
-    i1.raft_wait_index(index, 3)
+    assert ddl["row_count"] == 1
 
     # Calculate tuple hash with Lua
     tup = (1,)
@@ -126,19 +115,13 @@ def test_hash(cluster: Cluster):
 def test_select_lowercase_name(cluster: Cluster):
     i1, *_ = cluster.deploy(instance_count=1)
 
-    space_id = 837
-    index = i1.propose_create_space(
-        dict(
-            id=space_id,
-            name="lowercase_name",
-            format=[
-                dict(name="id", type="integer", is_nullable=False),
-            ],
-            primary_key=[dict(field="id")],
-            distribution=dict(kind="sharded_implicitly", sharding_key=["id"]),
-        )
+    ddl = i1.sql(
+        """
+        create table "lowercase_name" ("id" int, primary key ("id"))
+        distributed by ("id")
+    """
     )
-    i1.raft_wait_index(index, 3)
+    assert ddl["row_count"] == 1
 
     assert i1.call("box.space.lowercase_name:select") == []
 
@@ -151,20 +134,13 @@ def test_select_lowercase_name(cluster: Cluster):
 def test_select_string_field(cluster: Cluster):
     i1, *_ = cluster.deploy(instance_count=1)
 
-    space_id = 826
-    index = i1.propose_create_space(
-        dict(
-            id=space_id,
-            name="STUFF",
-            format=[
-                dict(name="id", type="integer", is_nullable=False),
-                dict(name="str", type="string", is_nullable=False),
-            ],
-            primary_key=[dict(field="id")],
-            distribution=dict(kind="sharded_implicitly", sharding_key=["id"]),
-        )
+    ddl = i1.sql(
+        """
+        create table "STUFF" ("id" integer not null, "str" string null, primary key ("id"))
+        distributed by ("id")
+    """
     )
-    i1.raft_wait_index(index, 3)
+    assert ddl["row_count"] == 1
 
     data = i1.sql("""insert into STUFF values(1337, 'foo');""")
     assert data["row_count"] == 1
-- 
GitLab