diff --git a/src/schema.rs b/src/schema.rs
index 106097b437184188c3eb63413b777978485f091b..ec4837e1e4740954bc1bd0ebbb80d98259c58d72 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -193,7 +193,7 @@ pub fn fields_to_format(
 pub enum Distribution {
     /// Tuples will be replicated to each instance.
     Global,
-    /// Tuples will be implicitely sharded. E.g. sent to the corresponding bucket
+    /// Tuples will be implicitly sharded. E.g. sent to the corresponding bucket
     /// which will be determined by a hash of the provided `sharding_key`.
     ShardedImplicitly {
         sharding_key: Vec<String>,
@@ -201,7 +201,7 @@ pub enum Distribution {
         sharding_fn: ShardingFn,
         tier: String,
     },
-    /// Tuples will be explicitely sharded. E.g. sent to the bucket
+    /// Tuples will be explicitly sharded. E.g. sent to the bucket
     /// which id is provided by field that is specified here.
     ///
     /// Default field name: "bucket_id"
diff --git a/src/storage.rs b/src/storage.rs
index 6356679e64f9d18ce40d6376304c7af0a1a56c98..50758b4da8d2f8d3f4d2b927eb51e8ae42740c80 100644
--- a/src/storage.rs
+++ b/src/storage.rs
@@ -797,7 +797,7 @@ impl Clusterwide {
             old_priv_versions.insert(def, schema_version);
         }

-        // There can be mutliple space dumps for a given space in SnapshotData,
+        // There can be multiple space dumps for a given space in SnapshotData,
         // because snapshots arrive in chunks. So when restoring space dumps we
         // shouldn't truncate spaces which previously already restored chunk of
         // space dump. The simplest way to solve this problem I found is to just
@@ -833,7 +833,7 @@ impl Clusterwide {
         }
         tlog!(
             Debug,
-            "restoring global schema defintions took {:?} [{} tuples]",
+            "restoring global schema definitions took {:?} [{} tuples]",
             t0.elapsed(),
             tuples_count
         );
@@ -2935,7 +2935,7 @@ pub fn ddl_drop_function_on_master(func_id: u32) -> traft::Result<Option<TntErro
 /// * `Err(e)` in case of retryable errors.
 ///
 // FIXME: this function returns 2 kinds of errors: retryable and non-retryable.
-// Currently this is impelemnted by returning one kind of errors as Err(e) and
+// Currently this is implemented by returning one kind of errors as Err(e) and
 // the other as Ok(Some(e)). This was the simplest solution at the time this
 // function was implemented, as it requires the least amount of boilerplate and
 // error forwarding code. But this signature is not intuitive, so maybe there's
@@ -2963,13 +2963,24 @@ pub fn ddl_create_space_on_master(
     })?;
     let tt_pk_def = pico_pk_def.to_index_metadata(&pico_space_def);

-    // For now we just assume that during space creation index with id 1
-    // exists if and only if it is a bucket_id index.
-    let mut tt_bucket_id_def = None;
-    let pico_bucket_id_def = storage.indexes.get(space_id, 1)?;
-    if let Some(def) = &pico_bucket_id_def {
-        tt_bucket_id_def = Some(def.to_index_metadata(&pico_space_def));
-    }
+    let bucket_id_def = match &pico_space_def.distribution {
+        Distribution::ShardedImplicitly { .. } => {
+            let index = IndexDef {
+                table_id: pico_space_def.id,
+                id: 1,
+                name: "bucket_id".into(),
+                ty: IndexType::Tree,
+                opts: vec![IndexOption::Unique(false)],
+                parts: vec![Part::field("bucket_id")
+                    .field_type(IndexFieldType::Unsigned)
+                    .is_nullable(false)],
+                operable: false,
+                schema_version: pico_space_def.schema_version,
+            };
+            Some(index)
+        }
+        _ => None,
+    };

     let res = (|| -> tarantool::Result<()> {
         if tt_pk_def.parts.is_empty() {
@@ -2984,8 +2995,8 @@ pub fn ddl_create_space_on_master(
         }
         sys_space.insert(&tt_space_def)?;
         sys_index.insert(&tt_pk_def)?;
-        if let Some(def) = tt_bucket_id_def {
-            sys_index.insert(&def)?;
+        if let Some(def) = bucket_id_def {
+            sys_index.insert(&def.to_index_metadata(&pico_space_def))?;
         }

         Ok(())
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 1ba12e2849e27d015a45df34d171bfe473da440e..0eddcbcb75f7897d5d54cd3a15206c38c3afcaea 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -61,8 +61,7 @@ use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
 use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
 use ::tarantool::fiber::r#async::{oneshot, watch};
 use ::tarantool::fiber::Mutex;
-use ::tarantool::index::FieldType as IFT;
-use ::tarantool::index::{IndexType, Part};
+use ::tarantool::index::IndexType;
 use ::tarantool::proc;
 use ::tarantool::space::FieldType as SFT;
 use ::tarantool::time::Instant;
@@ -1670,28 +1669,6 @@ impl NodeImpl {
                 // indexes, so somebody should do that at some point.
                 let bucket_id_index = last_pk_part_index + 1;
                 format.insert(bucket_id_index as _, ("bucket_id", SFT::Unsigned).into());
-
-                let bucket_id_def = IndexDef {
-                    table_id: id,
-                    id: 1,
-                    name: format!("{}_bucket_id", name),
-                    ty: IndexType::Tree,
-                    opts: vec![IndexOption::Unique(false)],
-                    parts: vec![Part::field("bucket_id")
-                        .field_type(IFT::Unsigned)
-                        .is_nullable(false)],
-                    operable: false,
-                    schema_version,
-                };
-                let res = self.storage.indexes.insert(&bucket_id_def);
-                if let Err(e) = res {
-                    // Ignore the error for now, let governor deal with it.
-                    tlog!(
-                        Warning,
-                        "failed creating index '{}': {e}",
-                        bucket_id_def.name
-                    );
-                }
             }
         }
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 8e70b3b1659900d0871e45d905804e82bef5e977..363a468071bbd0ba85234415296e6ca45985be81 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -280,23 +280,10 @@ def test_ddl_create_sharded_space(cluster: Cluster):

     ############################################################################
     # This time bucket id was also created
-    pico_bucket_id_def = [
-        space_id,
-        1,
-        "stuff_bucket_id",
-        "tree",
-        [dict(unique=False)],
-        [["bucket_id", "unsigned", None, False, None]],
-        True,
-        schema_version,
-    ]
-    assert i1.call("box.space._pico_index:get", [space_id, 1]) == pico_bucket_id_def
-    assert i2.call("box.space._pico_index:get", [space_id, 1]) == pico_bucket_id_def
-
     tt_bucket_id_def = [
         space_id,
         1,
-        "stuff_bucket_id",
+        "bucket_id",
         "tree",
         dict(unique=False),
         [[1, "unsigned", None, False, None]],
@@ -418,15 +405,9 @@ def test_ddl_create_table_abort(cluster: Cluster):
     )
     assert i1.call("box.space._space:get", space_id) is not None
-    assert get_index_names(i1, space_id) == [
-        f"{space_name}_pkey",
-        f"{space_name}_bucket_id",
-    ]
+    assert get_index_names(i1, space_id) == [f"{space_name}_pkey"]
     assert i2.call("box.space._space:get", space_id) is not None
-    assert get_index_names(i2, space_id) == [
-        f"{space_name}_pkey",
-        f"{space_name}_bucket_id",
-    ]
+    assert get_index_names(i2, space_id) == [f"{space_name}_pkey"]

     # Wake the instance so that governor finds out there's a conflict
     # and aborts the ddl op.
@@ -602,7 +583,7 @@ def test_ddl_create_table_from_snapshot_at_boot(cluster: Cluster):
     tt_bucket_id_def = [
         space_id,
         1,
-        "stuff_bucket_id",
+        "bucket_id",
         "tree",
         dict(unique=False),
         [[1, "unsigned", None, False, None]],
diff --git a/test/int/test_sharding.py b/test/int/test_sharding.py
index 0f07ec3cb471291656ede5c45fb7c8b3a8790457..c677869b50e1261d02c4c4a5bc613a8733f9de47 100644
--- a/test/int/test_sharding.py
+++ b/test/int/test_sharding.py
@@ -314,3 +314,55 @@ def test_gitlab_763_no_missing_buckets_after_proc_sharding_failure(cluster: Clus
     # All buckets are eventually available to the whole cluster
     for i in cluster.instances:
         Retriable(timeout=10, rps=4).call(check_available_buckets, i, 3000)
+
+
+def get_table_size(instance: Instance, table_name: str):
+    table_size = instance.eval(f"return box.space.{table_name}:count()")
+    return table_size
+
+
+def test_is_bucket_rebalancing_means_data_migration(cluster: Cluster):
+    i1 = cluster.add_instance()
+    cluster.wait_until_instance_has_this_many_active_buckets(i1, 3000)
+
+    ddl = i1.sql(
+        """
+        CREATE TABLE "sharded_table" ( "id" INTEGER NOT NULL, PRIMARY KEY ("id") )
+        DISTRIBUTED BY ("id")
+        """
+    )
+    assert ddl["row_count"] == 1
+
+    table_size = 100000
+    batch_size = 1000
+    for start in range(1, table_size, batch_size):
+        response = i1.sql(
+            "INSERT INTO sharded_table VALUES "
+            + (", ".join([f"({i})" for i in range(start, start + batch_size)]))
+        )
+        assert response["row_count"] == batch_size
+
+    assert get_table_size(i1, "sharded_table") == table_size
+
+    bucket_id_index_in_format = 1
+    format = i1.eval("return box.space.sharded_table:format()")
+    assert format[bucket_id_index_in_format]["name"] == "bucket_id"
+
+    data = i1.eval("return box.space.sharded_table:select()")
+    bucket_ids_of_table = set([tuple[bucket_id_index_in_format] for tuple in data])
+
+    # in picodata the amount of buckets is fixed and equal to 3000
+    all_bucket_ids = set([i + 1 for i in range(3000)])
+    assert len(all_bucket_ids - bucket_ids_of_table) == 0
+
+    for _ in range(9):
+        cluster.add_instance()
+
+    others = cluster.instances[1:]
+
+    # wait until vshard rebalancing is done
+    for instance in others:
+        cluster.wait_until_instance_has_this_many_active_buckets(instance, 300)
+
+    for instance in others:
+        assert get_table_size(instance, "sharded_table") > 0
diff --git a/test/int/test_sql.py b/test/int/test_sql.py
index 029a8e0c01502b3aa548bf1fb34c82eeaf0ad2b5..5d128f02bafdea6fbd06444d5f3fe12158273d4d 100644
--- a/test/int/test_sql.py
+++ b/test/int/test_sql.py
@@ -4546,29 +4546,30 @@ def test_unique_index_name_for_sharded_table(cluster: Cluster):
     for table_name, other_table_name in zip(table_names, reversed(table_names)):
         with pytest.raises(
             TarantoolError,
-            match=f"""index {table_name}_bucket_id already exists""",
+            match="index bucket_id already exists",
         ):
             # try to create existing index
             i1.sql(
-                f""" create index "{table_name}_bucket_id"
+                f""" create index "bucket_id"
                 on "{table_name}" (a) option (timeout = 3) """
             )

         with pytest.raises(
             TarantoolError,
-            match=f"""index {other_table_name}_bucket_id already exists""",
+            match="index bucket_id already exists",
         ):
             # try to create non existing index with existing name
             i1.sql(
-                f""" create index "{other_table_name}_bucket_id"
-                on "{table_name}" (a) option (timeout = 3) """
+                f""" create index "bucket_id"
+                on "{other_table_name}" (a) option (timeout = 3) """
             )

-    # ensure that index on field bucket_id of sharded table exists
-    data = i1.sql(
-        f""" select * from "_pico_index" where "name" = '{table_name}_bucket_id' """
+    # ensure that index on field bucket_id of sharded table exists in space _index
+    assert i1.eval(f"""return box.space.{table_name}.index.bucket_id""") is not None
+    assert (
+        i1.eval(f"""return box.space.{other_table_name}.index.bucket_id""")
+        is not None
     )
-    assert data != []


 def test_metadata(instance: Instance):