From e66d152ffb80dcc17812ecca3ba811a031104b7c Mon Sep 17 00:00:00 2001
From: Dmitry Rodionov <d.rodionov@picodata.io>
Date: Tue, 1 Oct 2024 17:05:59 +0300
Subject: [PATCH] refactor: avoid blocking in migration file parsing

Previously we blocked on scope termination before reading from the
channel, so blocking still occurred.

This patch introduces a new `blocking` wrapper function that executes
the provided closure on a separate thread and yields until the result
is ready.
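
A minimal usage sketch (illustrative only; it assumes the cbus endpoint
was already initialized via `init_cbus_endpoint()` and the dummy
computation stands in for real work such as file hashing):

    use crate::plugin::migration::blocking;

    // The closure runs on a separate OS thread; the calling fiber parks
    // on a cbus oneshot channel until the result is sent back, so other
    // fibers keep running in the meantime.
    let sum: u64 = blocking(|| (0..1_000_000u64).sum())
        .expect("spawning the worker thread should not fail");

If the closure panics, the panic is re-raised on the calling fiber via
`std::panic::resume_unwind`.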
---
 src/cbus.rs             |   1 +
 src/plugin/migration.rs | 198 +++++++++++++++++++++++-----------------
 2 files changed, 117 insertions(+), 82 deletions(-)

diff --git a/src/cbus.rs b/src/cbus.rs
index 0cb7c66f38..4876f80d7a 100644
--- a/src/cbus.rs
+++ b/src/cbus.rs
@@ -22,3 +22,4 @@ pub fn init_cbus_endpoint() {
         .start_non_joinable()
         .expect("starting a fiber shouldn't fail");
 }
+
diff --git a/src/plugin/migration.rs b/src/plugin/migration.rs
index afd27e5bc0..75a969d069 100644
--- a/src/plugin/migration.rs
+++ b/src/plugin/migration.rs
@@ -11,9 +11,9 @@ use crate::util::Lexer;
 use crate::util::QuoteEscapingStyle;
 use crate::{sql, tlog, traft};
 use std::fs::File;
-use std::io;
 use std::io::{ErrorKind, Read};
 use std::time::Duration;
+use std::{io, panic};
 use tarantool::cbus;
 use tarantool::fiber;
 use tarantool::time::Instant;
@@ -24,7 +24,7 @@ pub enum Error {
     File(String, io::Error),
 
     #[error("Failed spawning a migrations parsing thread: {0}")]
-    ThreadDead(String),
+    Blocking(#[from] BlockingError),
 
     #[error("Invalid migration file format: {0}")]
     InvalidMigrationFormat(String),
@@ -88,13 +88,48 @@ impl std::fmt::Display for DisplayTruncated<'_> {
     }
 }
 
+#[derive(thiserror::Error, Debug)]
+pub enum BlockingError {
+    #[error("failed to spawn a thread: {0}")]
+    Io(#[from] io::Error),
+
+    #[error("BUG: thread returned without sending result to the channel")]
+    Bug,
+}
+
+/// Executes the provided closure on a separate thread without blocking the current fiber.
+/// If the provided closure panics, the panic is forwarded to the caller.
+pub fn blocking<F, R>(f: F) -> Result<R, BlockingError>
+where
+    F: FnOnce() -> R + Send + 'static,
+    R: Send + 'static,
+{
+    let (sender, receiver) = cbus::oneshot::channel(ENDPOINT_NAME);
+
+    let jh = std::thread::Builder::new()
+        .name("blocking".to_owned())
+        .spawn(|| {
+            sender.send(f());
+        })?;
+
+    if let Ok(r) = receiver.receive() {
+        return Ok(r);
+    }
+
+    // Sender was dropped, which means the provided function panicked.
+    // Attempt to get the proper error by joining the thread.
+    // In theory this can block for an insignificant amount of time.
+    match jh.join() {
+        Ok(_) => Err(BlockingError::Bug),
+        Err(e) => panic::resume_unwind(e),
+    }
+}
+
 /// Sends a task to a separate thread to calculate the checksum of the migrations file
 /// and blocks the current fiber until the result is ready.
 pub fn calculate_migration_hash_async(migration: &MigrationInfo) -> Result<md5::Digest, Error> {
     let shortname = &migration.filename_from_manifest;
-    let fullpath = &migration.full_filepath;
-
-    let (sender, receiver) = cbus::oneshot::channel(ENDPOINT_NAME);
+    let fullpath = migration.full_filepath.clone();
 
     fn calculate_migration_hash_from_file(filename: &str) -> Result<md5::Digest, io::Error> {
         const BUF_SIZE: usize = 4096;
@@ -119,34 +154,25 @@ pub fn calculate_migration_hash_async(migration: &MigrationInfo) -> Result<md5::
     tlog!(Info, "hashing migrations file '{shortname}'");
     let t0 = Instant::now_accurate();
 
-    std::thread::scope(|s| -> std::io::Result<_> {
-        std::thread::Builder::new()
-            .name("migrations_parser".into())
-            .spawn_scoped(s, move || {
-                tlog!(Debug, "hashing a migrations file '{fullpath}'");
-                let res = calculate_migration_hash_from_file(fullpath)
-                    .map_err(|e| Error::File(fullpath.to_string(), e));
-                if let Err(e) = &res {
-                    tlog!(Debug, "failed hashing migrations file '{fullpath}': {e}");
-                }
-
-                sender.send(res)
-            })?;
-        Ok(())
-    })
-    .map_err(|e| Error::ThreadDead(e.to_string()))?;
-
-    let res = receiver.receive().map_err(|e| {
+    let res = blocking(move || {
+        tlog!(Debug, "hashing a migrations file '{fullpath}'");
+        let res = calculate_migration_hash_from_file(&fullpath)
+            .map_err(|e| Error::File(fullpath.to_string(), e));
+        if let Err(e) = &res {
+            tlog!(Debug, "failed hashing migrations file '{fullpath}': {e}");
+        }
+        res
+    })?
+    .inspect_err(|e| {
         #[rustfmt::skip]
         tlog!(Error, "failed receiving migrations hash from file '{shortname}': {e}");
-        Error::ThreadDead(e.to_string())
-    });
+    })?;
 
     let elapsed = t0.elapsed();
     #[rustfmt::skip]
     tlog!(Info, "done hashing migrations file '{shortname}', elapsed time: {elapsed:?}");
 
-    res?
+    Ok(res)
 }
 
 /// Stores info about a migration file which is being processed.
@@ -200,53 +226,32 @@ impl MigrationInfo {
 
 /// Sends a task to a separate thread to parse the migrations file and blocks
 /// the current fiber until the result is ready.
-fn read_migration_queries_from_file_async(migration: &mut MigrationInfo) -> Result<(), Error> {
-    let (sender, receiver) = cbus::oneshot::channel(ENDPOINT_NAME);
-
-    let shortname = &migration.filename_from_manifest;
-    let fullpath = &migration.full_filepath;
-    let mut migration_for_other_thread = migration.clone();
-
-    tlog!(Info, "parsing migrations file '{shortname}'");
+fn read_migration_queries_from_file_async(
+    mut migration: MigrationInfo,
+) -> Result<MigrationInfo, Error> {
+    tlog!(Info, "parsing migrations file '{}'", migration.shortname());
     let t0 = Instant::now_accurate();
 
-    std::thread::scope(|s| -> std::io::Result<_> {
-        std::thread::Builder::new()
-            .name("migrations_parser".into())
-            .spawn_scoped(s, move || {
-                let fullpath = &migration_for_other_thread.full_filepath;
-                tlog!(Debug, "parsing a migrations file '{fullpath}'");
-                let res = read_migration_queries_from_file(&mut migration_for_other_thread);
-                if let Err(e) = &res {
-                    let fullpath = &migration_for_other_thread.full_filepath;
-                    tlog!(Debug, "failed parsing migrations file '{fullpath}': {e}");
-                }
-
-                sender.send(res.map(|()| migration_for_other_thread))
-            })?;
-        Ok(())
-    })
-    .map_err(|e| Error::ThreadDead(e.to_string()))?;
+    let migration = blocking(|| {
+        let fullpath = &migration.full_filepath;
+        tlog!(Debug, "parsing a migrations file '{fullpath}'");
+        let res = read_migration_queries_from_file(&mut migration);
+        if let Err(e) = &res {
+            let fullpath = &migration.full_filepath;
+            tlog!(Debug, "failed parsing migrations file '{fullpath}': {e}");
+        }
 
-    // FIXME: add receive_timeout/receive_deadline
-    let res = receiver.receive().map_err(|e| {
-        #[rustfmt::skip]
-        tlog!(Error, "failed receiving migrations parsed from file '{fullpath}': {e}");
-        Error::ThreadDead(e.to_string())
-    });
+        res.map(|()| migration)
+    })??;
 
     let elapsed = t0.elapsed();
-    #[rustfmt::skip]
-    tlog!(Info, "done parsing migrations file '{shortname}', elapsed time: {elapsed:?}");
-
-    let migration_from_other_thread = res??;
-    #[rustfmt::skip]
-    debug_assert_eq!(migration.full_filepath, migration_from_other_thread.full_filepath);
-    #[rustfmt::skip]
-    debug_assert_eq!(migration.filename_from_manifest, migration_from_other_thread.filename_from_manifest);
+    tlog!(
+        Info,
+        "done parsing migrations file '{}', elapsed time: {elapsed:?}",
+        migration.shortname()
+    );
 
-    *migration = migration_from_other_thread;
-    Ok(())
+    Ok(migration)
 }
 
 /// Reads and parses migrations file desribed by `migration`.
@@ -513,8 +518,8 @@ pub fn apply_up_migrations(
     let handle_err = |to_revert: &[MigrationInfo]| {
         let deadline = fiber::clock().saturating_add(rollback_timeout);
         let it = to_revert.iter().rev();
-        for queries in it {
-            down_single_file_with_commit(&plugin_ident.name, queries, &SBroadApplier, deadline);
+        for migration in it {
+            down_single_file_with_commit(&plugin_ident.name, migration, &SBroadApplier, deadline);
         }
     };
 
@@ -522,15 +527,12 @@ pub fn apply_up_migrations(
     let mut seen_queries = Vec::with_capacity(migrations_count);
 
     let node = node::global().expect("node must be already initialized");
-    for (num, mut migration) in migration_files.into_iter().enumerate() {
+    for (num, migration) in migration_files.into_iter().enumerate() {
         #[rustfmt::skip]
         tlog!(Info, "applying `UP` migrations, progress: {num}/{migrations_count}");
 
-        let res = read_migration_queries_from_file_async(&mut migration);
-        if let Err(e) = res {
-            handle_err(&seen_queries);
-            return Err(e.into());
-        }
+        let migration = read_migration_queries_from_file_async(migration)
+            .inspect_err(|_| handle_err(&seen_queries))?;
         seen_queries.push(migration);
         let migration = seen_queries.last().expect("just inserted");
 
@@ -601,14 +603,20 @@ pub fn apply_down_migrations(
         #[rustfmt::skip]
         tlog!(Info, "applying `DOWN` migrations, progress: {num}/{}", migrations.len());
 
-        let mut migration = MigrationInfo::new_unparsed(plugin_identity, filename.clone());
-        let res = read_migration_queries_from_file_async(&mut migration);
-        if let Err(e) = res {
-            tlog!(Error, "Rollback DOWN migration error: {e}");
-            continue;
+        let migration = MigrationInfo::new_unparsed(plugin_identity, filename.clone());
+        match read_migration_queries_from_file_async(migration) {
+            Ok(migration) => {
+                down_single_file_with_commit(
+                    &plugin_identity.name,
+                    &migration,
+                    &SBroadApplier,
+                    deadline,
+                );
+            }
+            Err(e) => {
+                tlog!(Error, "Rollback DOWN migration error: {e}");
+            }
         }
-
-        down_single_file_with_commit(&plugin_identity.name, &migration, &SBroadApplier, deadline);
     }
     #[rustfmt::skip]
     tlog!(Info, "applying `DOWN` migrations, progress: {0}/{0}", migrations.len());
@@ -852,3 +860,29 @@ sql_command_3;
         );
     }
 }
+
+mod tests_internal {
+    use std::panic;
+
+    use crate::cbus::init_cbus_endpoint;
+    use crate::plugin::migration::blocking;
+
+    #[tarantool::test]
+    fn test_blocking_ok() {
+        init_cbus_endpoint();
+
+        let res = blocking(|| 42).unwrap();
+        assert_eq!(res, 42)
+    }
+
+    #[tarantool::test]
+    fn test_blocking_panics() {
+        init_cbus_endpoint();
+
+        let res = panic::catch_unwind(|| blocking(|| panic!("uh oh")).unwrap());
+        assert_eq!(
+            res.unwrap_err().downcast_ref::<&'static str>().unwrap(),
+            &"uh oh"
+        );
+    }
+}
-- 
GitLab