From 513b48257591069d6c68c1de9eeb2dfbcccae025 Mon Sep 17 00:00:00 2001
From: Georgy Moshkin <gmoshkin@picodata.io>
Date: Fri, 12 May 2023 19:12:11 +0300
Subject: [PATCH] feat: implement distribution sharded_implicitly / use it in
 test_select

---
 src/schema.rs              |  27 ++++++
 src/storage.rs             |  22 +++--
 src/traft/node.rs          | 121 ++++++++++++++++++++++----
 src/traft/rpc/ddl_apply.rs |  27 ++++--
 test/int/test_ddl.py       | 174 ++++++++++++++++++++++++++++++-------
 test/int/test_sql.py       |   7 +-
 6 files changed, 314 insertions(+), 64 deletions(-)

diff --git a/src/schema.rs b/src/schema.rs
index 72b9a0257b..67740805b6 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -150,6 +150,7 @@ pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
 
     match *ddl {
         Ddl::CreateSpace { id, .. } => {
+            sys_index.delete(&[id, 1])?;
             sys_index.delete(&[id, 0])?;
             sys_space.delete(&[id])?;
             set_pico_schema_version(version)?;
@@ -161,3 +162,29 @@ pub fn ddl_abort_on_master(ddl: &Ddl, version: u64) -> traft::Result<()> {
 
     Ok(())
 }
+
+// TODO: this should be a TryFrom in tarantool-module
+pub fn try_space_field_type_to_index_field_type(
+    ft: tarantool::space::FieldType,
+) -> Option<tarantool::index::FieldType> {
+    use tarantool::index::FieldType as IFT;
+    use tarantool::space::FieldType as SFT;
+    let res = match ft {
+        SFT::Any => None,
+        SFT::Unsigned => Some(IFT::Unsigned),
+        SFT::String => Some(IFT::String),
+        SFT::Number => Some(IFT::Number),
+        SFT::Double => Some(IFT::Double),
+        SFT::Integer => Some(IFT::Integer),
+        SFT::Boolean => Some(IFT::Boolean),
+        SFT::Varbinary => Some(IFT::Varbinary),
+        SFT::Scalar => Some(IFT::Scalar),
+        SFT::Decimal => Some(IFT::Decimal),
+        SFT::Uuid => Some(IFT::Uuid),
+        SFT::Datetime => Some(IFT::Datetime),
+        SFT::Interval => None,
+        SFT::Array => Some(IFT::Array),
+        SFT::Map => None,
+    };
+    res
+}
diff --git a/src/storage.rs b/src/storage.rs
index 983549fac4..2910769fe0 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -443,19 +443,31 @@ impl Clusterwide {
                 continue;
             }
 
-            let Some(index_def) = self.indexes.get(space_def.id, 0)? else {
+            let Some(pk_def) = self.indexes.get(space_def.id, 0)? else {
                 crate::warn_or_panic!("a space definition without a primary index arrived via snapshot: {space_def:?}");
                 continue;
             };
 
+            // For now we just assume that during space creation index with id 1
+            // exists if and only if it is a bucket_id index.
+            let bucket_id_def = self.indexes.get(space_def.id, 1)?;
+
             // XXX: this logic is duplicated in proc_apply_schema_change, but
             // the code is so small, it doesn't seem worth extracting it for now
             // now
-            let space_meta = space_def.to_space_metadata()?;
-            let index_meta = index_def.to_index_metadata();
+            let tt_space_def = space_def.to_space_metadata()?;
+            let tt_pk_def = pk_def.to_index_metadata();
+            let mut tt_bucket_id_def = None;
+            if let Some(def) = &bucket_id_def {
+                tt_bucket_id_def = Some(def.to_index_metadata());
+            }
+
+            sys_space.replace(&tt_space_def)?;
+            sys_index.replace(&tt_pk_def)?;
+            if let Some(def) = tt_bucket_id_def {
+                sys_index.replace(&def)?;
+            }
 
-            sys_space.replace(&space_meta)?;
-            sys_index.replace(&index_meta)?;
             if space_def.schema_version > new_pico_schema_version {
                 new_pico_schema_version = space_def.schema_version;
             }
diff --git a/src/traft/node.rs b/src/traft/node.rs
index fd24f22b48..01df7c6f2e 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -12,7 +12,7 @@ use crate::kvcell::KVCell;
 use crate::loop_start;
 use crate::r#loop::FlowControl;
 use crate::schema::ddl_abort_on_master;
-use crate::schema::{IndexDef, SpaceDef};
+use crate::schema::{Distribution, IndexDef, SpaceDef};
 use crate::storage::pico_schema_version;
 use crate::storage::ToEntryIter as _;
 use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName};
@@ -50,7 +50,10 @@ use ::tarantool::fiber::mutex::MutexGuard;
 use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::fiber::r#async::{oneshot, watch};
 use ::tarantool::fiber::Mutex;
+use ::tarantool::index::FieldType as IFT;
+use ::tarantool::index::Part;
 use ::tarantool::proc;
+use ::tarantool::space::FieldType as SFT;
 use ::tarantool::tlua;
 use ::tarantool::transaction::start_transaction;
 use protobuf::Message as _;
@@ -824,6 +827,13 @@ impl NodeImpl {
                             .indexes
                             .update_operable(id, 0, true)
                             .expect("storage error");
+                        // For now we just assume that during space creation index with id 1
+                        // exists if and only if it is a bucket_id index.
+                        let res = self.storage.indexes.update_operable(id, 1, true);
+                        // TODO: ideally we would first check whether this index
+                        // exists (or consult the space definition to see whether it
+                        // should exist), but for now we simply ignore a potential
+                        // "no such index" error
+                        let _ = res;
                     }
                     _ => {
                         todo!()
@@ -949,33 +959,114 @@ impl NodeImpl {
             Ddl::CreateSpace {
                 id,
                 name,
-                format,
-                primary_key,
+                mut format,
+                mut primary_key,
                 distribution,
             } => {
-                let space_def = SpaceDef {
-                    id,
-                    name,
-                    distribution,
-                    schema_version,
-                    format,
-                    operable: false,
-                };
-                self.storage.spaces.insert(&space_def)?;
+                use ::tarantool::util::NumOrStr::*;
+
+                let mut last_pk_part_index = 0;
+                for pk_part in &mut primary_key {
+                    let (index, field) = match &pk_part.field {
+                        Num(index) => {
+                            if *index as usize >= format.len() {
+                                // Ddl prepare operations should be verified before being proposed,
+                                // so this shouldn't ever happen. But ignoring this is safe anyway,
+                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
+                                tlog!(
+                                    Warning,
+                                    "invalid primary key part: field index {index} is out of bound"
+                                );
+                                continue;
+                            }
+                            (*index, &format[*index as usize])
+                        }
+                        Str(name) => {
+                            let field_index = format.iter().zip(0..).find(|(f, _)| f.name == *name);
+                            let Some((field, index)) = field_index else {
+                                // Ddl prepare operations should be verified before being proposed,
+                                // so this shouldn't ever happen. But ignoring this is safe anyway,
+                                // because proc_apply_schema_change will catch the error and ddl will be aborted.
+                                tlog!(Warning, "invalid primary key part: field '{name}' not found");
+                                continue;
+                            };
+                            // We store all index parts as field indexes.
+                            pk_part.field = Num(index);
+                            (index, field)
+                        }
+                    };
+                    let Some(field_type) =
+                        crate::schema::try_space_field_type_to_index_field_type(field.field_type) else
+                    {
+                        // Ddl prepare operations should be verified before being proposed,
+                        // so this shouldn't ever happen. But ignoring this is safe anyway,
+                        // because proc_apply_schema_change will catch the error and ddl will be aborted.
+                        tlog!(Warning, "invalid primary key part: field type {} cannot be part of an index", field.field_type);
+                        continue;
+                    };
+                    // We overwrite the type provided in the request: there's
+                    // no reason to rely on it, because we can determine the
+                    // correct type right here from the space format.
+                    pk_part.r#type = Some(field_type);
+                    pk_part.is_nullable = Some(field.is_nullable);
+                    last_pk_part_index = last_pk_part_index.max(index);
+                }
 
-                let index_def = IndexDef {
+                let primary_key_def = IndexDef {
                     id: 0,
                     name: "primary_key".into(),
                     space_id: id,
                     schema_version,
-                    // TODO: fill up parts with defaults/stuff we know
                     parts: primary_key,
                     operable: false,
                     // TODO: support other cases
                     unique: true,
                     local: true,
                 };
-                self.storage.indexes.insert(&index_def)?;
+                self.storage.indexes.insert(&primary_key_def)?;
+
+                match distribution {
+                    Distribution::Global => {
+                        // Nothing else is needed
+                    }
+                    Distribution::ShardedByField { .. } => {
+                        todo!()
+                    }
+                    Distribution::ShardedImplicitly { .. } => {
+                        // TODO: if primary key is not the first field or
+                        // there's some space between key parts, we want
+                        // bucket_id to go closer to the beginning of the tuple,
+                        // but this will require to update primary key part
+                        // indexes, so somebody should do that at some point.
+                        let bucket_id_index = last_pk_part_index + 1;
+                        format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());
+
+                        let bucket_id_def = IndexDef {
+                            id: 1,
+                            name: "bucket_id".into(),
+                            space_id: id,
+                            schema_version,
+                            parts: vec![Part::field(bucket_id_index)
+                                .field_type(IFT::Unsigned)
+                                .is_nullable(false)],
+                            operable: false,
+                            unique: false,
+                            // TODO: support other cases
+                            local: true,
+                        };
+                        self.storage.indexes.insert(&bucket_id_def)?;
+                    }
+                }
+
+                let space_def = SpaceDef {
+                    id,
+                    name,
+                    distribution,
+                    schema_version,
+                    format,
+                    operable: false,
+                };
+                self.storage.spaces.insert(&space_def)?;
             }
 
             Ddl::CreateIndex {
diff --git a/src/traft/rpc/ddl_apply.rs b/src/traft/rpc/ddl_apply.rs
index 6b1e5629e9..38abe4610c 100644
--- a/src/traft/rpc/ddl_apply.rs
+++ b/src/traft/rpc/ddl_apply.rs
@@ -80,26 +80,35 @@ pub fn apply_schema_change(storage: &Clusterwide, ddl: &Ddl, version: u64) -> Re
 
     match *ddl {
         Ddl::CreateSpace { id, .. } => {
-            let space_info = storage
+            let pico_space_def = storage
                 .spaces
                 .get(id)?
                 .ok_or_else(|| Error::other(format!("space with id #{id} not found")))?;
             // TODO: set defaults
-            // TODO: handle `distribution`
-            let space_meta = space_info.to_space_metadata()?;
+            let tt_space_def = pico_space_def.to_space_metadata()?;
 
-            let index_info = storage.indexes.get(id, 0)?.ok_or_else(|| {
+            let pico_pk_def = storage.indexes.get(id, 0)?.ok_or_else(|| {
                 Error::other(format!(
                     "primary index for space {} not found",
-                    space_info.name
+                    pico_space_def.name
                 ))
             })?;
-            // TODO: set index parts from space format
-            let index_meta = index_info.to_index_metadata();
+            let tt_pk_def = pico_pk_def.to_index_metadata();
+
+            // For now we just assume that during space creation index with id 1
+            // exists if and only if it is a bucket_id index.
+            let mut tt_bucket_id_def = None;
+            let pico_bucket_id_def = storage.indexes.get(id, 1)?;
+            if let Some(def) = &pico_bucket_id_def {
+                tt_bucket_id_def = Some(def.to_index_metadata());
+            }
 
             let res = (|| -> tarantool::Result<()> {
-                sys_space.insert(&space_meta)?;
-                sys_index.insert(&index_meta)?;
+                sys_space.insert(&tt_space_def)?;
+                sys_index.insert(&tt_pk_def)?;
+                if let Some(def) = tt_bucket_id_def {
+                    sys_index.insert(&def)?;
+                }
                 set_pico_schema_version(version)?;
 
                 Ok(())
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 940223bb29..52b62d64d8 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -45,7 +45,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
             id=666,
             name="stuff",
             format=[dict(name="id", type="unsigned", is_nullable=False)],
-            primary_key=[dict(field=1, type="unsigned")],
+            primary_key=[dict(field=0, type="unsigned")],
             distribution=dict(kind="global"),
         ),
     )
@@ -60,11 +60,11 @@ def test_ddl_create_space_bulky(cluster: Cluster):
     assert i2.call("box.space._pico_property:get", "current_schema_version")[1] == 2
 
     # Space was created and is operable
-    space_info = [666, "stuff", ["global"], [["id", "unsigned", False]], 2, True]
-    assert i1.call("box.space._pico_space:get", 666) == space_info
-    assert i2.call("box.space._pico_space:get", 666) == space_info
+    pico_space_def = [666, "stuff", ["global"], [["id", "unsigned", False]], 2, True]
+    assert i1.call("box.space._pico_space:get", 666) == pico_space_def
+    assert i2.call("box.space._pico_space:get", 666) == pico_space_def
 
-    space_meta = [
+    tt_space_def = [
         666,
         1,
         "stuff",
@@ -73,56 +73,168 @@ def test_ddl_create_space_bulky(cluster: Cluster):
         dict(),
         [dict(name="id", type="unsigned", is_nullable=False)],
     ]
-    assert i1.call("box.space._space:get", 666) == space_meta
-    assert i2.call("box.space._space:get", 666) == space_meta
+    assert i1.call("box.space._space:get", 666) == tt_space_def
+    assert i2.call("box.space._space:get", 666) == tt_space_def
 
     # Primary index was also created
     # TODO: maybe we want to replace these `None`s with the default values when
     # inserting the index definition into _pico_index?
-    index_info = [
+    pico_pk_def = [
         666,
         0,
         "primary_key",
         True,
-        [[1, "unsigned", None, None, None]],
+        [[0, "unsigned", None, False, None]],
         2,
         True,
         True,
     ]
-    assert i1.call("box.space._pico_index:get", [666, 0]) == index_info
-    assert i2.call("box.space._pico_index:get", [666, 0]) == index_info
+    assert i1.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
+    assert i2.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
 
-    index_meta = [
+    tt_pk_def = [
         666,
         0,
         "primary_key",
         "tree",
         dict(unique=True),
-        [[1, "unsigned", None, None, None]],
+        [[0, "unsigned", None, False, None]],
     ]
-    assert i1.call("box.space._index:get", [666, 0]) == index_meta
-    assert i2.call("box.space._index:get", [666, 0]) == index_meta
+    assert i1.call("box.space._index:get", [666, 0]) == tt_pk_def
+    assert i2.call("box.space._index:get", [666, 0]) == tt_pk_def
 
     # Add a new replicaset master
     i3 = cluster.add_instance(wait_online=True, replicaset_id="r2")
 
     # Its schema was updated automatically
     assert i3.call("box.space._pico_property:get", "current_schema_version")[1] == 2
-    assert i3.call("box.space._pico_space:get", 666) == space_info
-    assert i3.call("box.space._pico_index:get", [666, 0]) == index_info
+    assert i3.call("box.space._pico_space:get", 666) == pico_space_def
+    assert i3.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
     # TODO: this fails
-    assert i3.call("box.space._space:get", 666) == space_meta
-    assert i3.call("box.space._index:get", [666, 0]) == index_meta
+    assert i3.call("box.space._space:get", 666) == tt_space_def
+    assert i3.call("box.space._index:get", [666, 0]) == tt_pk_def
 
     # Add a follower to the new replicaset
     i4 = cluster.add_instance(wait_online=True, replicaset_id="r2")
 
     # Its schema was updated automatically as well
     assert i4.call("box.space._pico_property:get", "current_schema_version")[1] == 2
-    assert i4.call("box.space._pico_space:get", 666) == space_info
-    assert i4.call("box.space._pico_index:get", [666, 0]) == index_info
-    assert i4.call("box.space._space:get", 666) == space_meta
-    assert i4.call("box.space._index:get", [666, 0]) == index_meta
+    assert i4.call("box.space._pico_space:get", 666) == pico_space_def
+    assert i4.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
+    assert i4.call("box.space._space:get", 666) == tt_space_def
+    assert i4.call("box.space._index:get", [666, 0]) == tt_pk_def
+
+
+def test_ddl_create_sharded_space(cluster: Cluster):
+    i1, i2 = cluster.deploy(instance_count=2, init_replication_factor=2)
+
+    # Propose a space creation which will succeed
+    schema_version = i1.next_schema_version()
+    op = dict(
+        kind="ddl_prepare",
+        schema_version=schema_version,
+        ddl=dict(
+            kind="create_space",
+            id=666,
+            name="stuff",
+            format=[
+                dict(name="id", type="unsigned", is_nullable=False),
+                dict(name="foo", type="integer", is_nullable=False),
+                dict(name="bar", type="string", is_nullable=False),
+            ],
+            primary_key=[dict(field="id")],
+            distribution=dict(kind="sharded_implicitly", sharding_key=["foo", "bar"]),
+        ),
+    )
+    # TODO: rewrite the test using pico.cas, when it supports ddl
+    index = i1.call("pico.raft_propose", op)
+
+    i1.call(".proc_sync_raft", index, (3, 0))
+    i2.call(".proc_sync_raft", index, (3, 0))
+
+    # Space was created and is operable
+    pico_space_def = [
+        666,
+        "stuff",
+        ["sharded_implicitly", ["foo", "bar"], "murmur3"],
+        [
+            ["id", "unsigned", False],
+            ["bucket_id", "unsigned", False],
+            ["foo", "integer", False],
+            ["bar", "string", False],
+        ],
+        schema_version,
+        True,
+    ]
+    assert i1.call("box.space._pico_space:get", 666) == pico_space_def
+    assert i2.call("box.space._pico_space:get", 666) == pico_space_def
+
+    tt_space_def = [
+        666,
+        1,
+        "stuff",
+        "memtx",
+        0,
+        dict(),
+        [
+            dict(name="id", type="unsigned", is_nullable=False),
+            dict(name="bucket_id", type="unsigned", is_nullable=False),
+            dict(name="foo", type="integer", is_nullable=False),
+            dict(name="bar", type="string", is_nullable=False),
+        ],
+    ]
+    assert i1.call("box.space._space:get", 666) == tt_space_def
+    assert i2.call("box.space._space:get", 666) == tt_space_def
+
+    # Primary index was also created
+    pico_pk_def = [
+        666,
+        0,
+        "primary_key",
+        True,
+        [[0, "unsigned", None, False, None]],
+        schema_version,
+        True,
+        True,
+    ]
+    assert i1.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
+    assert i2.call("box.space._pico_index:get", [666, 0]) == pico_pk_def
+
+    tt_pk_def = [
+        666,
+        0,
+        "primary_key",
+        "tree",
+        dict(unique=True),
+        [[0, "unsigned", None, False, None]],
+    ]
+    assert i1.call("box.space._index:get", [666, 0]) == tt_pk_def
+    assert i2.call("box.space._index:get", [666, 0]) == tt_pk_def
+
+    # This time the bucket_id index was also created
+    pico_bucket_id_def = [
+        666,
+        1,
+        "bucket_id",
+        True,
+        [[1, "unsigned", None, False, None]],
+        schema_version,
+        True,
+        False,
+    ]
+    assert i1.call("box.space._pico_index:get", [666, 1]) == pico_bucket_id_def
+    assert i2.call("box.space._pico_index:get", [666, 1]) == pico_bucket_id_def
+
+    tt_bucket_id_def = [
+        666,
+        1,
+        "bucket_id",
+        "tree",
+        dict(unique=False),
+        [[1, "unsigned", None, False, None]],
+    ]
+    assert i1.call("box.space._index:get", [666, 1]) == tt_bucket_id_def
+    assert i2.call("box.space._index:get", [666, 1]) == tt_bucket_id_def
 
 
 def test_ddl_create_space_partial_failure(cluster: Cluster):
@@ -192,7 +304,7 @@ def test_ddl_from_snapshot(cluster: Cluster):
     i1.call(".proc_sync_raft", ret, (3, 0))
     i2.call(".proc_sync_raft", ret, (3, 0))
 
-    space_meta = [
+    tt_space_def = [
         666,
         1,
         "stuff",
@@ -201,10 +313,10 @@ def test_ddl_from_snapshot(cluster: Cluster):
         dict(),
         [dict(name="id", type="unsigned", is_nullable=False)],
     ]
-    assert i1.call("box.space._space:get", 666) == space_meta
-    assert i2.call("box.space._space:get", 666) == space_meta
+    assert i1.call("box.space._space:get", 666) == tt_space_def
+    assert i2.call("box.space._space:get", 666) == tt_space_def
 
-    index_meta = [
+    tt_pk_def = [
         666,
         0,
         "primary_key",
@@ -212,8 +324,8 @@ def test_ddl_from_snapshot(cluster: Cluster):
         dict(unique=True),
         [[1, "unsigned", None, None, None]],
     ]
-    assert i1.call("box.space._index:get", [666, 0]) == index_meta
-    assert i2.call("box.space._index:get", [666, 0]) == index_meta
+    assert i1.call("box.space._index:get", [666, 0]) == tt_pk_def
+    assert i2.call("box.space._index:get", [666, 0]) == tt_pk_def
 
     # Compact the log to trigger snapshot for the newcomer
     i1.raft_compact_log()
@@ -222,5 +334,5 @@ def test_ddl_from_snapshot(cluster: Cluster):
     i3 = cluster.add_instance(wait_online=True)
 
     # Check space was created from the snapshot data
-    assert i3.call("box.space._space:get", 666) == space_meta
-    assert i3.call("box.space._index:get", [666, 0]) == index_meta
+    assert i3.call("box.space._space:get", 666) == tt_space_def
+    assert i3.call("box.space._index:get", [666, 0]) == tt_pk_def
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 3a3d98f1ac..5768c6fed9 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -73,16 +73,15 @@ def test_select(cluster: Cluster):
             name="T",
             format=[
                 dict(name="A", type="integer", is_nullable=False),
-                # TODO: this should be done automatically by picodata
-                dict(name="bucket_id", type="unsigned", is_nullable=False),
             ],
-            primary_key=[dict(field=0, type="integer")],
+            primary_key=[dict(field="A")],
             # sharding function is implicitly murmur3
-            distribution=dict(kind="sharded_implicitly", sharding_key=["bucket_id"]),
+            distribution=dict(kind="sharded_implicitly", sharding_key=["A"]),
         ),
     )
     # TODO: rewrite the test using pico.cas, when it supports ddl
     index = i1.call("pico.raft_propose", op)
+    i1.call(".proc_sync_raft", index, [3, 0])
 
     data = i1.sql("""insert into t values(1);""")
     assert data["row_count"] == 1
-- 
GitLab