schema.rs 30.93 KiB
use std::borrow::Cow;
use std::collections::{BTreeMap, HashSet};
use std::time::{Duration, Instant};
use tarantool::space::{FieldType, SpaceCreateOptions, SpaceEngineType};
use tarantool::space::{Space, SystemSpace};
use tarantool::transaction::{transaction, TransactionError};
use tarantool::{
index::IteratorType,
index::Metadata as IndexMetadata,
index::{IndexId, Part},
schema::space::SpaceMetadata,
space::SpaceId,
tlua::{self, LuaRead},
tuple::Encode,
util::Value,
};
use serde::{Deserialize, Serialize};
use crate::cas::{self, compare_and_swap};
use crate::storage::SPACE_ID_INTERNAL_MAX;
use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName};
use crate::traft::error::Error;
use crate::traft::op::{Ddl, Op};
use crate::traft::{self, event, node, RaftIndex};
use crate::util::instant_saturating_add;
////////////////////////////////////////////////////////////////////////////////
// SpaceDef
////////////////////////////////////////////////////////////////////////////////
/// Space definition.
///
/// Describes a user-defined space.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct SpaceDef {
pub id: SpaceId,
pub name: String,
pub distribution: Distribution,
pub format: Vec<tarantool::space::Field>,
pub schema_version: u64,
pub operable: bool,
}
impl Encode for SpaceDef {}
impl SpaceDef {
// Don't forget to update this, if fields of `SpaceDef` change.
pub const FIELD_OPERABLE: usize = 5;
pub fn to_space_metadata(&self) -> traft::Result<SpaceMetadata> {
use tarantool::session::uid;
// FIXME: this is copy pasted from tarantool::schema::space::create_space
let format = self
.format
.iter()
.map(|f| {
IntoIterator::into_iter([
("name".into(), Value::Str(f.name.as_str().into())),
("type".into(), Value::Str(f.field_type.as_str().into())),
("is_nullable".into(), Value::Bool(f.is_nullable)),
])
.collect()
})
.collect();
let mut flags = BTreeMap::new();
if matches!(self.distribution, Distribution::Global) {
flags.insert("group_id".into(), 1.into());
}
let space_def = SpaceMetadata {
id: self.id,
// Do we want to be more explicit about user_id?
user_id: uid()? as _,
name: self.name.as_str().into(),
engine: SpaceEngineType::Memtx,
field_count: 0,
flags,
format,
};
Ok(space_def)
}
}
////////////////////////////////////////////////////////////////////////////////
// Distribution
////////////////////////////////////////////////////////////////////////////////
/// Defines how to distribute tuples in a space across replicasets.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, LuaRead)]
#[serde(rename_all = "snake_case")]
#[serde(tag = "kind")]
pub enum Distribution {
/// Tuples will be replicated to each instance.
Global,
/// Tuples will be implicitely sharded. E.g. sent to the corresponding bucket
/// which will be determined by a hash of the provided `sharding_key`.
ShardedImplicitly {
sharding_key: Vec<String>,
#[serde(default)]
sharding_fn: ShardingFn,
},
/// Tuples will be explicitely sharded. E.g. sent to the bucket
/// which id is provided by field that is specified here.
///
/// Default field name: "bucket_id"
ShardedByField {
#[serde(default = "default_bucket_id_field")]
field: String,
},
}
fn default_bucket_id_field() -> String {
"bucket_id".into()
}
::tarantool::define_str_enum! {
/// Custom sharding functions are not yet supported.
#[derive(Default)]
pub enum ShardingFn {
Crc32 = "crc32",
#[default]
Murmur3 = "murmur3",
Xxhash = "xxhash",
Md5 = "md5",
}
}
////////////////////////////////////////////////////////////////////////////////
// IndexDef
////////////////////////////////////////////////////////////////////////////////
/// Index definition.
///
/// Describes a user-defined index.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct IndexDef {
pub space_id: SpaceId,
pub id: IndexId,
pub name: String,
pub local: bool,
pub parts: Vec<Part>,
pub schema_version: u64,
pub operable: bool,
pub unique: bool,
}
impl Encode for IndexDef {}
impl IndexDef {
// Don't forget to update this, if fields of `IndexDef` change.
pub const FIELD_OPERABLE: usize = 6;
pub fn to_index_metadata(&self) -> IndexMetadata {
use tarantool::index::IndexType;
let mut opts = BTreeMap::new();
opts.insert(Cow::from("unique"), Value::Bool(self.unique));
let index_meta = IndexMetadata {
space_id: self.space_id,
index_id: self.id,
name: self.name.as_str().into(),
r#type: IndexType::Tree,
opts,
parts: self.parts.clone(),
};
index_meta
}
}
////////////////////////////////////////////////////////////////////////////////
// UserDef
////////////////////////////////////////////////////////////////////////////////
/// User definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct UserDef {
pub id: UserId,
pub name: String,
pub schema_version: u64,
pub auth: AuthDef,
}
pub type UserId = u32;
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct AuthDef {
pub method: AuthMethod,
/// Base64 encoded digest.
pub data: String,
}
::tarantool::define_str_enum! {
pub enum AuthMethod {
ChapSha1 = "chap-sha1",
}
}
impl Encode for UserDef {}
impl UserDef {
pub const FIELD_AUTH: usize = 3;
}
////////////////////////////////////////////////////////////////////////////////
// RoleDef
////////////////////////////////////////////////////////////////////////////////
/// Role definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct RoleDef {
pub id: UserId,
pub name: String,
pub schema_version: u64,
}
impl Encode for RoleDef {}
////////////////////////////////////////////////////////////////////////////////
// PrivilegeDef
////////////////////////////////////////////////////////////////////////////////
/// Privilege definition.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Hash)]
pub struct PrivilegeDef {
/// Id of the user or role to whom the privilege is granted.
///
/// In tarantool users and roles are stored in the same space, which means a
/// role and a user cannot have the same id or name.
pub grantee_id: UserId,
pub object_type: String,
pub object_name: String,
pub privilege: String,
pub schema_version: u64,
}
impl Encode for PrivilegeDef {}
////////////////////////////////////////////////////////////////////////////////
// ...
////////////////////////////////////////////////////////////////////////////////
// TODO: this should be a TryFrom in tarantool-module
pub fn try_space_field_type_to_index_field_type(
ft: tarantool::space::FieldType,
) -> Option<tarantool::index::FieldType> {
use tarantool::index::FieldType as IFT;
use tarantool::space::FieldType as SFT;
let res = match ft {
SFT::Any => None,
SFT::Unsigned => Some(IFT::Unsigned),
SFT::String => Some(IFT::String),
SFT::Number => Some(IFT::Number),
SFT::Double => Some(IFT::Double),
SFT::Integer => Some(IFT::Integer),
SFT::Boolean => Some(IFT::Boolean),
SFT::Varbinary => Some(IFT::Varbinary),
SFT::Scalar => Some(IFT::Scalar),
SFT::Decimal => Some(IFT::Decimal),
SFT::Uuid => Some(IFT::Uuid),
SFT::Datetime => Some(IFT::Datetime),
SFT::Interval => None,
SFT::Array => Some(IFT::Array),
SFT::Map => None,
};
res
}
#[derive(Debug, thiserror::Error)]
pub enum DdlError {
#[error("space creation failed: {0}")]
CreateSpace(#[from] CreateSpaceError),
#[error("ddl operation was aborted")]
Aborted,
#[error("there is no pending ddl operation")]
NoPendingDdl,
}
#[derive(Debug, thiserror::Error)]
pub enum CreateSpaceError {
#[error("space with id {0} already exists")]
IdExists(SpaceId),
#[error("space with name {0} already exists")]
NameExists(String),
#[error("several fields have the same name: {0}")]
DuplicateFieldName(String),
#[error("no field with name: {0}")]
FieldUndefined(String),
#[error("distribution is `sharded`, but neither `by_field` nor `sharding_key` is set")]
ShardingPolicyUndefined,
#[error("only one of sharding policy fields (`by_field`, `sharding_key`) should be set")]
ConflictingShardingPolicy,
}
impl From<CreateSpaceError> for Error {
fn from(err: CreateSpaceError) -> Self {
DdlError::CreateSpace(err).into()
}
}
// TODO: Add `LuaRead` to tarantool::space::Field and use it
#[derive(Clone, Debug, LuaRead)]
pub struct Field {
pub name: String, // TODO(gmoshkin): &str
pub r#type: FieldType,
pub is_nullable: bool,
}
impl From<Field> for tarantool::space::Field {
fn from(field: Field) -> Self {
tarantool::space::Field {
name: field.name,
field_type: field.r#type,
is_nullable: field.is_nullable,
}
}
}
::tarantool::define_str_enum! {
#[derive(Default)]
pub enum DistributionParam {
#[default]
Global = "global",
Sharded = "sharded",
}
}
#[derive(Clone, Debug, LuaRead)]
pub struct CreateSpaceParams {
id: Option<SpaceId>,
name: String,
format: Vec<Field>,
primary_key: Vec<String>,
distribution: DistributionParam,
by_field: Option<String>,
sharding_key: Option<Vec<String>>,
sharding_fn: Option<ShardingFn>,
/// Timeout in seconds.
///
/// Specifying the timeout identifies how long user is ready to wait for ddl to be applied.
/// But it does not provide guarantees that a ddl will be aborted if wait for commit timeouts.
pub timeout: f64,
}
impl CreateSpaceParams {
pub fn validate(self, storage: &Clusterwide) -> traft::Result<ValidCreateSpaceParams> {
// Check that there is no space with this name
if storage.spaces.by_name(&self.name)?.is_some() {
return Err(CreateSpaceError::NameExists(self.name).into());
}
// Check that there is no space with this id (if specified)
if let Some(id) = self.id {
if storage.spaces.get(id)?.is_some() {
return Err(CreateSpaceError::IdExists(id).into());
}
if id <= SPACE_ID_INTERNAL_MAX {
crate::tlog!(Warning, "requested space id {id} is in the range 0..={SPACE_ID_INTERNAL_MAX} reserved for future use by picodata, you may have a conflict in a future version");
}
}
// All field names are unique
let mut field_names = HashSet::new();
for field in &self.format {
if !field_names.insert(field.name.as_str()) {
return Err(CreateSpaceError::DuplicateFieldName(field.name.clone()).into());
}
}
// All primary key components exist in fields
for part in &self.primary_key {
if !field_names.contains(part.as_str()) {
return Err(CreateSpaceError::FieldUndefined(part.clone()).into());
}
}
// All sharding key components exist in fields
if self.distribution == DistributionParam::Sharded {
match (&self.by_field, &self.sharding_key) {
(Some(by_field), None) => {
if !field_names.contains(by_field.as_str()) {
return Err(CreateSpaceError::FieldUndefined(by_field.clone()).into());
}
if self.sharding_fn.is_some() {
crate::tlog!(
Warning,
"`sharding_fn` is specified but will be ignored, as sharding `by_field` is set"
);
}
}
(None, Some(sharding_key)) => {
let mut parts = HashSet::new();
for part in sharding_key {
if !field_names.contains(part.as_str()) {
return Err(CreateSpaceError::FieldUndefined(part.clone()).into());
}
// And all parts are unique
if !parts.insert(part.as_str()) {
return Err(CreateSpaceError::DuplicateFieldName(part.clone()).into());
}
}
}
(None, None) => return Err(CreateSpaceError::ShardingPolicyUndefined.into()),
(Some(_), Some(_)) => {
return Err(CreateSpaceError::ConflictingShardingPolicy.into())
}
}
} else {
if self.by_field.is_some() {
crate::tlog!(
Warning,
"`by_field` is specified but will be ignored, as `distribution` is `global`"
);
}
if self.sharding_key.is_some() {
crate::tlog!(
Warning,
"`sharding_key` is specified but will be ignored, as `distribution` is `global`"
);
}
if self.sharding_fn.is_some() {
crate::tlog!(
Warning,
"`sharding_fn` is specified but will be ignored, as `distribution` is `global`"
);
}
}
Ok(ValidCreateSpaceParams(self))
}
}
#[derive(Debug)]
pub struct ValidCreateSpaceParams(CreateSpaceParams);
impl ValidCreateSpaceParams {
/// Create space and then rollback.
///
/// Should be used for checking if a space with these params can be created.
pub fn test_create_space(&mut self, storage: &Clusterwide) -> traft::Result<()> {
let id = self.id(storage)?;
let params = &self.0;
let err = transaction(|| -> Result<(), Option<tarantool::error::Error>> {
::tarantool::schema::space::create_space(
¶ms.name,
&SpaceCreateOptions {
if_not_exists: false,
engine: SpaceEngineType::Memtx,
id: Some(id),
field_count: params.format.len() as u32,
user: None,
is_local: false,
is_temporary: false,
is_sync: false,
format: Some(
params
.format
.iter()
.cloned()
.map(tarantool::space::Field::from)
.collect(),
),
},
)
.map_err(Some)?;
// Rollback space creation
Err(None)
})
.unwrap_err();
match err {
// Space was successfully created and rolled back
TransactionError::RolledBack(None) => Ok(()),
// Space creation failed
TransactionError::RolledBack(Some(err)) => Err(err.into()),
// Error during commit or rollback
err => panic!("transaction mechanism should not fail: {err:?}"),
}
}
/// Memoizes id if it is automatically selected.
fn id(&mut self, storage: &Clusterwide) -> traft::Result<SpaceId> {
let _ = storage;
let sys_space = Space::from(SystemSpace::Space);
let id = if let Some(id) = self.0.id {
id
} else {
let mut id = SPACE_ID_INTERNAL_MAX;
let mut taken_ids = Vec::with_capacity(32);
// IteratorType::All iterates in an order of ascending space ids
for space in sys_space.select(IteratorType::All, &())? {
let space_id: SpaceId = space
.field(0)?
.expect("space metadata should contain a space_id");
taken_ids.push(space_id);
}
// Find the first accessible space id.
for space_id in taken_ids {
// We aren't forcing users to not use internal range, so we have
// to ignore those
if space_id <= SPACE_ID_INTERNAL_MAX {
continue;
}
if space_id != id + 1 {
break;
}
id += 1;
}
id + 1
};
self.0.id = Some(id);
Ok(id)
}
pub fn into_ddl(mut self, storage: &Clusterwide) -> traft::Result<Ddl> {
let id = self.id(storage)?;
let primary_key: Vec<_> = self.0.primary_key.into_iter().map(Part::field).collect();
let format: Vec<_> = self
.0
.format
.into_iter()
.map(tarantool::space::Field::from)
.collect();
let distribution = match self.0.distribution {
DistributionParam::Global => Distribution::Global,
DistributionParam::Sharded => {
if let Some(field) = self.0.by_field {
Distribution::ShardedByField { field }
} else {
Distribution::ShardedImplicitly {
sharding_key: self
.0
.sharding_key
.expect("should be checked during `validate`"),
sharding_fn: self.0.sharding_fn.unwrap_or_default(),
}
}
}
};
let res = Ddl::CreateSpace {
id,
name: self.0.name,
format,
primary_key,
distribution,
};
Ok(res)
}
}
/// Waits for a pending ddl to be either `Committed` or `Aborted` by the governor.
///
/// Returns an index of the corresponding `DdlCommit` entry.
///
/// If `timeout` is reached earlier returns an error.
pub fn wait_for_ddl_commit(
prepare_commit: RaftIndex,
timeout: Duration,
) -> traft::Result<RaftIndex> {
let raft_storage = &node::global()?.raft_storage;
let deadline = instant_saturating_add(Instant::now(), timeout);
let last_seen = prepare_commit;
loop {
let cur_applied = raft_storage.applied()?;
let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?;
for entry in new_entries {
if entry.entry_type != raft::prelude::EntryType::EntryNormal {
continue;
}
let index = entry.index;
let op = entry.into_op().unwrap_or(Op::Nop);
match op {
Op::DdlCommit => return Ok(index),
Op::DdlAbort => return Err(DdlError::Aborted.into()),
_ => (),
}
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
return Err(Error::Timeout);
}
}
}
/// Waits until there is no pending schema change.
///
/// If `timeout` is reached earlier returns an error.
fn wait_for_no_pending_schema_change(
storage: &Clusterwide,
timeout: Duration,
) -> traft::Result<()> {
let deadline = instant_saturating_add(Instant::now(), timeout);
loop {
if storage.properties.pending_schema_change()?.is_none() {
return Ok(());
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
event::wait_timeout(event::Event::EntryApplied, timeout)?;
} else {
return Err(Error::Timeout);
}
}
}
/// Prepares an `op` for execution on a cluster-wide schema
/// by proposing a corresponding Raft entry.
/// Retries entry proposal if leader changes until the entry is added to the log.
/// Waits for any pending schema change to finalize.
///
/// If `timeout` is reached earlier returns an error.
// TODO: Use deadline instead of timeout
pub fn prepare_schema_change(op: Op, timeout: Duration) -> traft::Result<RaftIndex> {
debug_assert!(op.is_schema_change());
loop {
let node = node::global()?;
let storage = &node.storage;
let raft_storage = &node.raft_storage;
let mut op = op.clone();
op.set_schema_version(storage.properties.next_schema_version()?);
wait_for_no_pending_schema_change(storage, timeout)?;
let index = node::global()?
.read_index(timeout)
.map_err(|e| Error::other(format!("read_index failed: {e}")))?;
let term = raft::Storage::term(raft_storage, index)?;
let predicate = cas::Predicate {
index,
term,
ranges: vec![
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::PendingSchemaChange,)),
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::PendingSchemaVersion,)),
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::GlobalSchemaVersion,)),
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::NextSchemaVersion,)),
],
};
let (index, term) = compare_and_swap(op, predicate, timeout)?;
node.wait_index(index, timeout)?;
if raft::Storage::term(raft_storage, index)? != term {
// leader switched - retry
continue;
}
return Ok(index);
}
}
/// Aborts a pending DDL operation and waits for abort to be committed localy.
/// If `timeout` is reached earlier returns an error.
///
/// Returns an index of the corresponding DdlAbort raft entry, or an error if
/// there is no pending DDL operation.
pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> {
let node = node::global()?;
loop {
if node.storage.properties.pending_schema_change()?.is_none() {
return Err(DdlError::NoPendingDdl.into());
}
let index = node.get_index();
let term = raft::Storage::term(&node.raft_storage, index)?;
let predicate = cas::Predicate {
index,
term,
ranges: vec![
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::PendingSchemaChange,)),
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::GlobalSchemaVersion,)),
cas::Range::new(ClusterwideSpaceId::Property)
.eq((PropertyName::NextSchemaVersion,)),
],
};
let (index, term) = compare_and_swap(Op::DdlAbort, predicate, timeout)?;
node.wait_index(index, timeout)?;
if raft::Storage::term(&node.raft_storage, index)? != term {
// leader switched - retry
continue;
}
return Ok(index);
}
}
mod tests {
use tarantool::space::FieldType;
use super::*;
#[::tarantool::test]
fn test_create_space() {
let storage = Clusterwide::new().unwrap();
ValidCreateSpaceParams(CreateSpaceParams {
id: None,
name: "friends_of_peppa".into(),
format: vec![
Field {
name: "id".into(),
r#type: FieldType::Number,
is_nullable: false,
},
Field {
name: "name".into(),
r#type: FieldType::String,
is_nullable: false,
},
],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
})
.test_create_space(&storage)
.unwrap();
assert!(tarantool::space::Space::find("friends_of_peppa").is_none());
let err = ValidCreateSpaceParams(CreateSpaceParams {
id: Some(0),
name: "friends_of_peppa".into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
})
.test_create_space(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"tarantool error: CreateSpace: Failed to create space 'friends_of_peppa': space id 0 is reserved"
);
}
#[::tarantool::test]
fn ddl() {
let storage = Clusterwide::new().unwrap();
let existing_space = "existing_space";
let existing_id = 0;
storage
.spaces
.insert(&SpaceDef {
id: existing_id,
name: existing_space.into(),
distribution: Distribution::Global,
format: vec![],
schema_version: 0,
operable: true,
})
.unwrap();
let new_space = "new_space";
let new_id = 1;
let field1 = Field {
name: "field1".into(),
r#type: FieldType::Any,
is_nullable: false,
};
let field2 = Field {
name: "field2".into(),
r#type: FieldType::Any,
is_nullable: false,
};
let err = CreateSpaceParams {
id: Some(existing_id),
name: new_space.into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: space with id 0 already exists"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: existing_space.into(),
format: vec![],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: space with name existing_space already exists"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone(), field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: several fields have the same name: field1"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Global,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: no field with name: field2"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field2.name.clone()]),
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: no field with name: field2"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: Some(vec![field1.name.clone(), field1.name.clone()]),
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: several fields have the same name: field1"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: no field with name: field2"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: None,
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: distribution is `sharded`, but neither `by_field` nor `sharding_key` is set"
);
let err = CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1.clone()],
primary_key: vec![],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name.clone()),
sharding_key: Some(vec![]),
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap_err();
assert_eq!(
err.to_string(),
"ddl failed: space creation failed: only one of sharding policy fields (`by_field`, `sharding_key`) should be set"
);
CreateSpaceParams {
id: Some(new_id),
name: new_space.into(),
format: vec![field1, field2.clone()],
primary_key: vec![field2.name.clone()],
distribution: DistributionParam::Sharded,
by_field: Some(field2.name),
sharding_key: None,
sharding_fn: None,
timeout: 0.0,
}
.validate(&storage)
.unwrap();
}
}