Skip to content
Snippets Groups Projects
Commit bc308b56 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

feat: implement the monotonically increasing next_schema_version pico property

parent 605accc2
No related branches found
No related tags found
1 merge request!516OpDdl create space
......@@ -622,10 +622,35 @@ pub trait TClusterwideSpaceIndex {
pub enum PropertyName {
ReplicationFactor = "replication_factor",
VshardBootstrapped = "vshard_bootstrapped",
// TODO: remove this
DesiredSchemaVersion = "desired_schema_version",
/// Pending ddl operation which is to be either committed or aborted.
///
/// Is only present during the time between the last ddl prepare
/// operation and the corresponding ddl commit or abort operation.
PendingSchemaChange = "pending_schema_change",
/// Schema version of the pending ddl prepare operation.
/// This will be the current schema version if the next entry is a ddl
/// commit operation.
///
/// Is only present during the time between the last ddl prepare
/// operation and the corresponding ddl commit or abort operation.
PendingSchemaVersion = "pending_schema_version",
/// Current schema version. Increases with every ddl commit operation.
///
/// This is equal to [`pico_schema_version`] during the time between
/// the last ddl commit or abort operation and the next ddl prepare
/// operation.
CurrentSchemaVersion = "current_schema_version",
/// Schema version which should be used for the next ddl prepare
/// operation. This increases with every ddl prepare operation in the
/// log no matter if it is committed or aborted.
/// This guards us from some painfull corner cases.
NextSchemaVersion = "next_schema_version",
}
}
......
......@@ -1001,6 +1001,9 @@ impl NodeImpl {
self.storage
.properties
.put(PropertyName::PendingSchemaVersion, &schema_version)?;
self.storage
.properties
.put(PropertyName::NextSchemaVersion, &(schema_version + 1))?;
Ok(())
}
......
......@@ -524,6 +524,13 @@ class Instance:
eprint(f"CaS:\n {predicate=}\n {dml=}")
return self.call(".proc_cas", self.cluster_id, predicate, dml)[0]["index"]
def next_schema_version(self) -> int:
t = self.call("box.space._picodata_property:get", "next_schema_version")
if t is None:
return 1
return t[1]
def assert_raft_status(self, state, leader_id=None):
status = self._raft_status()
......
......@@ -13,7 +13,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
# Propose a space creation which will fail
op = dict(
kind="ddl_prepare",
schema_version=1,
schema_version=i1.next_schema_version(),
ddl=dict(
kind="create_space",
id=666,
......@@ -40,11 +40,7 @@ def test_ddl_create_space_bulky(cluster: Cluster):
# Propose a space creation which will succeed
op = dict(
kind="ddl_prepare",
# This version number must not be equal to the version of the aborted
# change, otherwise all of the instances joined after this request was
# committed will be blocked during the attempt to apply the previous
# DdlAbort
schema_version=2,
schema_version=i1.next_schema_version(),
ddl=dict(
kind="create_space",
id=666,
......@@ -136,7 +132,7 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
# Propose a space creation which will fail
op = dict(
kind="ddl_prepare",
schema_version=1,
schema_version=i1.next_schema_version(),
ddl=dict(
kind="create_space",
id=666,
......@@ -176,7 +172,7 @@ def test_ddl_from_snapshot(cluster: Cluster):
# Propose a space creation which will succeed
op = dict(
kind="ddl_prepare",
schema_version=1,
schema_version=i1.next_schema_version(),
ddl=dict(
kind="create_space",
id=666,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment