Skip to content
Snippets Groups Projects
Commit 6f46d29e authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

fix: check for timeout between migration queries

parent 1772928d
No related branches found
No related tags found
1 merge request!1023feat: rework migration file parsing
......@@ -8,6 +8,7 @@ use crate::{error_injection, sql, tlog, traft};
use sbroad::backend::sql::ir::PatternWithParams;
use std::io;
use std::io::ErrorKind;
use tarantool::fiber;
use tarantool::space::UpdateOps;
use tarantool::time::Instant;
......@@ -166,23 +167,34 @@ fn split_sql_queries(sql: &str) -> Vec<String> {
/// Apply sql from migration file onto cluster.
trait SqlApplier {
fn apply(&self, sql: &str) -> traft::Result<()>;
fn apply(&self, sql: &str, deadline: Option<Instant>) -> traft::Result<()>;
}
/// By default, sql applied with SBroad.
struct SBroadApplier;
impl SqlApplier for SBroadApplier {
fn apply(&self, sql: &str) -> traft::Result<()> {
fn apply(&self, sql: &str, deadline: Option<Instant>) -> traft::Result<()> {
// Should sbroad accept a timeout parameter?
if let Some(deadline) = deadline {
if fiber::clock() > deadline {
return Err(traft::error::Error::Timeout);
}
}
let mut params = PatternWithParams::new(sql.to_string(), vec![]);
params.tracer = Some("stat".to_string());
sql::dispatch_sql_query(params.into()).map(|_| ())
}
}
fn up_single_file(queries: &MigrationQueries, applier: &impl SqlApplier) -> Result<(), Error> {
fn up_single_file(
queries: &MigrationQueries,
applier: &impl SqlApplier,
deadline: Instant,
) -> Result<(), Error> {
for sql in &queries.up {
if let Err(e) = applier.apply(sql) {
if let Err(e) = applier.apply(sql, Some(deadline)) {
return Err(Error::Up(sql.clone(), e.to_string()));
}
}
......@@ -192,7 +204,7 @@ fn up_single_file(queries: &MigrationQueries, applier: &impl SqlApplier) -> Resu
fn down_single_file(queries: &MigrationQueries, applier: &impl SqlApplier) {
for sql in &queries.down {
if let Err(e) = applier.apply(sql) {
if let Err(e) = applier.apply(sql, None) {
tlog!(Error, "Error while apply DOWN command `{sql}`: {e}");
}
}
......@@ -266,7 +278,7 @@ pub fn up(
return Err(Error::Up("".to_string(), "".to_string()).into());
}
if let Err(e) = up_single_file(queries, &SBroadApplier) {
if let Err(e) = up_single_file(queries, &SBroadApplier, deadline) {
handle_err(&seen_queries);
return Err(e.into());
}
......@@ -425,7 +437,7 @@ command_2
}
impl SqlApplier for BufApplier {
fn apply(&self, sql: &str) -> crate::traft::Result<()> {
fn apply(&self, sql: &str, _deadline: Option<Instant>) -> crate::traft::Result<()> {
if let Some(p) = self.poison_query {
if p == sql {
return Err(crate::traft::error::Error::Other("test error".into()));
......@@ -438,6 +450,8 @@ command_2
#[test]
fn test_migration_up() {
let no_deadline = Instant::now().saturating_add(tarantool::clock::INFINITY);
let source = r#"
-- pico.UP
sql_command_1;
......@@ -449,7 +463,7 @@ sql_command_3;
buf: RefCell::new(vec![]),
poison_query: None,
};
up_single_file(&queries, &applier).unwrap();
up_single_file(&queries, &applier, no_deadline).unwrap();
#[rustfmt::skip]
assert_eq!(
......@@ -469,7 +483,7 @@ sql_command_3;
buf: RefCell::new(vec![]),
poison_query: Some("sql_command_2;"),
};
up_single_file(&queries, &applier).unwrap_err();
up_single_file(&queries, &applier, no_deadline).unwrap_err();
#[rustfmt::skip]
assert_eq!(
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment