Skip to content
Snippets Groups Projects

OpDdl create space

Merged Georgy Moshkin requested to merge feat/ddl-apply-commit into master
Compare and
17 files
+ 1596
110
Compare changes
  • Side-by-side
  • Inline
Files
17
+ 73
0
@@ -2,11 +2,13 @@ use std::collections::HashMap;
use std::time::Duration;
use ::tarantool::fiber;
use ::tarantool::fiber::r#async::timeout::Error as TimeoutError;
use ::tarantool::fiber::r#async::timeout::IntoTimeout as _;
use ::tarantool::fiber::r#async::watch;
use crate::event::{self, Event};
use crate::instance::Instance;
use crate::op::Op;
use crate::r#loop::FlowControl::{self, Continue};
use crate::storage::Clusterwide;
use crate::storage::ToEntryIter as _;
@@ -16,6 +18,7 @@ use crate::traft::network::ConnectionPool;
use crate::traft::node::global;
use crate::traft::node::Status;
use crate::traft::raft_storage::RaftSpaceAccess;
use crate::traft::rpc::ddl_apply;
use crate::traft::rpc::sync;
use crate::traft::Result;
use crate::unwrap_ok_or;
@@ -66,6 +69,8 @@ impl Loop {
let vshard_bootstrapped = storage.properties.vshard_bootstrapped().unwrap();
let replication_factor = storage.properties.replication_factor().unwrap();
let desired_schema_version = storage.properties.desired_schema_version().unwrap();
let pending_schema_change = storage.properties.pending_schema_change().unwrap();
let has_pending_schema_change = pending_schema_change.is_some();
let plan = action_plan(
term,
@@ -80,6 +85,7 @@ impl Loop {
vshard_bootstrapped,
replication_factor,
desired_schema_version,
has_pending_schema_change,
);
let plan = unwrap_ok_or!(plan,
Err(e) => {
@@ -427,6 +433,73 @@ impl Loop {
}
}
Plan::ApplySchemaChange(ApplySchemaChange { targets, rpc }) => {
let mut next_op = Op::Nop;
governor_step! {
"applying pending schema change"
async {
let mut fs = vec![];
for instance_id in targets {
tlog!(Info, "calling proc_apply_schema_change"; "instance_id" => %instance_id);
let resp = pool.call(instance_id, &rpc)?;
fs.push(async move {
match resp.await {
Ok(ddl_apply::Response::Ok) => {
tlog!(Info, "applied schema change on instance";
"instance_id" => %instance_id,
);
Ok(())
}
Ok(ddl_apply::Response::Abort { reason }) => {
tlog!(Error, "failed to apply schema change on instance: {reason}";
"instance_id" => %instance_id,
);
Err(OnError::Abort)
}
Err(e) => {
tlog!(Warning, "failed calling proc_apply_schema_change: {e}";
"instance_id" => %instance_id
);
Err(OnError::Retry(e))
}
}
});
}
// TODO: don't hard code timeout
let res = try_join_all(fs).timeout(Duration::from_secs(3)).await;
if let Err(TimeoutError::Failed(OnError::Abort)) = res {
next_op = Op::DdlAbort;
return Ok(());
}
res?;
next_op = Op::DdlCommit;
enum OnError { Retry(Error), Abort }
impl From<OnError> for Error {
fn from(e: OnError) -> Error {
match e {
OnError::Retry(e) => e,
OnError::Abort => Error::other("schema change was aborted"),
}
}
}
}
}
let op_name = next_op.to_string();
governor_step! {
"finalizing schema change" [
"op" => &op_name,
]
async {
assert!(matches!(next_op, Op::DdlAbort | Op::DdlCommit));
node.propose_and_wait(next_op, Duration::from_secs(3))?;
}
}
}
Plan::ApplyMigration(ApplyMigration { target, rpc, op }) => {
let migration_id = rpc.migration_id;
governor_step! {
Loading