diff --git a/Cargo.lock b/Cargo.lock
index 23ca38d290b218c36ef227df97300dab0096888f..95bc856a51dfd47c4aa0ee3dffab89da13a31f7c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2769,6 +2769,7 @@ dependencies = [
  "rmpv",
  "serde",
  "serde_json",
+ "smol_str",
  "tarantool 6.0.0",
  "tarolog 0.2.1",
  "thiserror",
diff --git a/exports_picodata b/exports_picodata
index 6ab7d8aba7147aa7ea3f4070dc876d2f11eb9907..b9d905e46364411abdad35d770eaa848e1fcf49f 100644
--- a/exports_picodata
+++ b/exports_picodata
@@ -1,3 +1,6 @@
+pico_ffi_background_cancel_jobs_by_tag
+pico_ffi_background_register_job_cancellation_token
+pico_ffi_background_set_jobs_shutdown_timeout
 pico_ffi_cas
 pico_ffi_instance_info
 pico_ffi_raft_info
diff --git a/picodata-plugin/Cargo.toml b/picodata-plugin/Cargo.toml
index 651f8ff74ea3a48dd8dbb325919ca1aa37fa115f..73b4628067780263d8ce9a13ebc704a578113579 100644
--- a/picodata-plugin/Cargo.toml
+++ b/picodata-plugin/Cargo.toml
@@ -17,6 +17,7 @@ rmp = "0.8"
 rmp-serde = "1.1"
 rmpv = { version = "1.0", features = ["with-serde"] }
 serde = "1.0"
+smol_str = "0.2"
 tarantool = { path = "../tarantool/tarantool", version = "6.0", features = ["picodata", "test", "tokio_components"] }
 tarolog = "0.2.1"
 thiserror = "1.0"
diff --git a/picodata-plugin/src/background.rs b/picodata-plugin/src/background.rs
index ae7670d7d8f64f591c5d17f9eb7d72a9f3a52af7..0ce0382f310420cfc6796a53c235b9d72b7ea7b7 100644
--- a/picodata-plugin/src/background.rs
+++ b/picodata-plugin/src/background.rs
@@ -1,18 +1,159 @@
 //! Background container for long-lived jobs.
 //! The background container guarantees job liveness across the plugin life cycle.
 
+use crate::internal::ffi;
+#[allow(unused_imports)]
+use crate::plugin::interface::PicoContext;
 use crate::plugin::interface::ServiceId;
-use crate::system::tarantool::fiber;
-use std::collections::HashMap;
-use std::mem;
-use std::rc::Rc;
-use std::string::ToString;
-use std::sync::OnceLock;
+use crate::util::tarantool_error_to_box_error;
+use crate::util::DisplayErrorLocation;
+use std::cell::Cell;
 use std::time::Duration;
+use tarantool::error::BoxError;
+use tarantool::error::TarantoolErrorCode;
+use tarantool::fiber;
+use tarantool::fiber::FiberId;
 use tarantool::fiber::{Channel, RecvError};
-use tarantool::time::Instant;
 use tarantool::util::IntoClones;
 
+/// Same as [`PicoContext::register_job`].
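+///
+/// # Examples
+///
+/// A minimal usage sketch (for illustration only; assumes the `ServiceId` is
+/// obtained from the service's `PicoContext`):
+///
+/// ```no_run
+/// use std::time::Duration;
+/// use picodata_plugin::background::{register_job, CancellationToken};
+/// use picodata_plugin::plugin::interface::ServiceId;
+///
+/// # fn example(service_id: &ServiceId) {
+/// register_job(service_id, |cancel: CancellationToken| {
+///     // loop until picodata requests cancellation
+///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
+///         println!("working...");
+///     }
+/// })
+/// .unwrap();
+/// # }
+/// ```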
+#[track_caller]
+pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError>
+where
+    F: FnOnce(CancellationToken) + 'static,
+{
+    let loc = std::panic::Location::caller();
+    let tag = format!("{}:{}", loc.file(), loc.line());
+
+    register_tagged_job(service_id, job, &tag)
+}
+
+/// Same as [`PicoContext::register_tagged_job`].
+#[allow(deprecated)]
+pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError>
+where
+    F: FnOnce(CancellationToken) + 'static,
+{
+    let (token, handle) = CancellationToken::new();
+    let finish_channel = handle.finish_channel.clone();
+
+    let fiber_id = fiber::Builder::new()
+        .name(tag)
+        .func(move || {
+            job(token);
+            // send shutdown signal to the waiter side
+            _ = finish_channel.send(());
+        })
+        .start_non_joinable()
+        .map_err(tarantool_error_to_box_error)?;
+
+    let plugin = &service_id.plugin;
+    let service = &service_id.service;
+    let version = &service_id.version;
+
+    let token = FfiBackgroundJobCancellationToken::new(
+        fiber_id,
+        handle.cancel_channel,
+        handle.finish_channel,
+    );
+    register_background_job_cancellation_token(plugin, service, version, tag, token)?;
+
+    Ok(())
+}
+
+fn register_background_job_cancellation_token(
+    plugin: &str,
+    service: &str,
+    version: &str,
+    job_tag: &str,
+    token: FfiBackgroundJobCancellationToken,
+) -> Result<(), BoxError> {
+    // SAFETY: safe as long as picodata version is compatible
+    let rc = unsafe {
+        ffi::pico_ffi_background_register_job_cancellation_token(
+            plugin.into(),
+            service.into(),
+            version.into(),
+            job_tag.into(),
+            token,
+        )
+    };
+
+    if rc != 0 {
+        return Err(BoxError::last());
+    }
+
+    Ok(())
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// cancel_jobs_by_tag
+////////////////////////////////////////////////////////////////////////////////
+
+/// Outcome of the request to cancel a set of background jobs.
+#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)]
+#[repr(C)]
+pub struct JobCancellationResult {
+    /// Attempted to cancel this many background jobs.
+    pub n_total: u32,
+    /// This many jobs didn't finish in time.
+    pub n_timeouts: u32,
+}
+
+impl JobCancellationResult {
+    #[inline(always)]
+    pub fn new(n_total: u32, n_timeouts: u32) -> Self {
+        Self {
+            n_total,
+            n_timeouts,
+        }
+    }
+}
+
+/// Same as [`PicoContext::cancel_tagged_jobs`].
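+///
+/// # Examples
+///
+/// A minimal sketch of interpreting the result (for illustration only; the
+/// `"metrics"` tag is hypothetical):
+///
+/// ```no_run
+/// use std::time::Duration;
+/// use picodata_plugin::background::cancel_jobs_by_tag;
+/// use picodata_plugin::plugin::interface::ServiceId;
+///
+/// # fn example(service_id: &ServiceId) {
+/// let res = cancel_jobs_by_tag(service_id, "metrics", Duration::from_secs(5)).unwrap();
+/// if res.n_timeouts != 0 {
+///     eprintln!("{} of {} jobs didn't finish in time", res.n_timeouts, res.n_total);
+/// }
+/// # }
+/// ```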
+#[inline(always)]
+pub fn cancel_jobs_by_tag(
+    service_id: &ServiceId,
+    job_tag: &str,
+    timeout: Duration,
+) -> Result<JobCancellationResult, BoxError> {
+    cancel_background_jobs_by_tag_inner(
+        &service_id.plugin,
+        &service_id.service,
+        &service_id.version,
+        job_tag,
+        timeout,
+    )
+}
+
+/// Same as [`PicoContext::cancel_tagged_jobs`].
+pub fn cancel_background_jobs_by_tag_inner(
+    plugin: &str,
+    service: &str,
+    version: &str,
+    job_tag: &str,
+    timeout: Duration,
+) -> Result<JobCancellationResult, BoxError> {
+    let mut result = JobCancellationResult::default();
+    // SAFETY: safe as long as picodata version is compatible
+    let rc = unsafe {
+        ffi::pico_ffi_background_cancel_jobs_by_tag(
+            plugin.into(),
+            service.into(),
+            version.into(),
+            job_tag.into(),
+            timeout.as_secs_f64(),
+            &mut result,
+        )
+    };
+
+    if rc != 0 {
+        return Err(BoxError::last());
+    }
+
+    Ok(result)
+}
+
 #[derive(thiserror::Error, Debug)]
 pub enum Error {
     #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")]
@@ -21,6 +162,10 @@ pub enum Error {
     CancellationTimeout,
 }
 
+////////////////////////////////////////////////////////////////////////////////
+// CancellationToken
+////////////////////////////////////////////////////////////////////////////////
+
 /// A token which can be used to signal a cancellation request to the job.
 #[non_exhaustive]
 #[derive(Debug)]
@@ -32,6 +177,7 @@ impl CancellationToken {
     /// Create a cancellation token and cancellation token handle pair.
     /// The user should use the cancellation token to gracefully shut down their job.
     /// The cancellation token handle is used by `picodata` to send a cancel signal to the user's job.
+    #[allow(deprecated)]
     pub fn new() -> (CancellationToken, CancellationTokenHandle) {
         let (cancel_tx, cancel_rx) = Channel::new(1).into_clones();
         (
@@ -63,11 +209,13 @@ impl CancellationToken {
 }
 
 #[derive(Debug)]
+#[deprecated = "don't use this"]
 pub struct CancellationTokenHandle {
     cancel_channel: Channel<()>,
     finish_channel: Channel<()>,
 }
 
+#[allow(deprecated)]
 impl CancellationTokenHandle {
     /// Cancel the related job and return a backpressure channel.
     /// The caller should wait for a message on the backpressure channel
@@ -82,62 +230,22 @@ impl CancellationTokenHandle {
     }
 }
 
-type Tag = String;
-type TaggedJobs = HashMap<Tag, Vec<CancellationTokenHandle>>;
-type UnTaggedJobs = Vec<CancellationTokenHandle>;
-
-fn cancel_handles(handles: Vec<CancellationTokenHandle>, deadline: Instant) -> usize {
-    let mut shutdown_channels = Vec::with_capacity(handles.len());
-
-    for handle in handles {
-        let shutdown_channel = handle.cancel();
-        shutdown_channels.push(shutdown_channel);
-    }
-
-    let mut completed_counter = 0;
-    for sd_chan in shutdown_channels {
-        // recalculate timeout at every iteration
-        let timeout = deadline.duration_since(Instant::now_fiber());
-
-        if sd_chan.recv_timeout(timeout).is_ok() {
-            completed_counter += 1;
-        }
-    }
-
-    completed_counter
-}
+////////////////////////////////////////////////////////////////////////////////
+// Java-style API
+////////////////////////////////////////////////////////////////////////////////
 
 /// [`ServiceWorkerManager`] allows plugin services
 /// to create long-lived jobs and manage their life cycle.
 #[derive(Clone, Debug)]
 #[non_exhaustive]
 pub struct ServiceWorkerManager {
-    jobs: Rc<fiber::Mutex<(TaggedJobs, UnTaggedJobs)>>,
-    shutdown_timeout: Rc<fiber::Mutex<Duration>>,
+    service_id: ServiceId,
 }
 
 impl ServiceWorkerManager {
-    fn register_job_inner<F>(&self, job: F, maybe_tag: Option<&str>) -> tarantool::Result<()>
-    where
-        F: FnOnce(CancellationToken) + 'static,
-    {
-        let (token, handle) = CancellationToken::new();
-        let finish_chan = handle.finish_channel.clone();
-        let mut jobs = self.jobs.lock();
-        if let Some(tag) = maybe_tag {
-            jobs.0.entry(tag.to_string()).or_default().push(handle);
-        } else {
-            jobs.1.push(handle);
-        }
-
-        fiber::Builder::new()
-            .func(move || {
-                job(token);
-                // send shutdown signal to the waiter side
-                _ = finish_chan.send(());
-            })
-            .start_non_joinable()?;
-        Ok(())
+    #[inline(always)]
+    pub(crate) fn new(service_id: ServiceId) -> Self {
+        Self { service_id }
     }
 
     /// Add a new job to the execution.
@@ -154,11 +262,10 @@ impl ServiceWorkerManager {
     ///
     /// ```no_run
     /// use std::time::Duration;
-    /// use picodata_plugin::background::{CancellationToken, InternalGlobalWorkerManager, ServiceWorkerManager};
-    /// use picodata_plugin::plugin::interface::ServiceId;
+    /// use picodata_plugin::background::CancellationToken;
     ///
-    /// # let worker_manager = InternalGlobalWorkerManager::instance()
-    /// #    .get_or_init_manager(ServiceId::new("any_plugin", "any_service", "0.1.0"));
+    /// # use picodata_plugin::background::ServiceWorkerManager;
+    /// # fn test(worker_manager: ServiceWorkerManager) {
     ///
     /// // this job will print "hello" every second,
     /// // and print "bye" after being canceled
@@ -169,12 +276,18 @@ impl ServiceWorkerManager {
     ///     println!("job cancelled, bye!")
     /// }
     /// worker_manager.register_job(hello_printer).unwrap();
+    ///
+    /// # }
     /// ```
+    #[track_caller]
+    #[inline(always)]
     pub fn register_job<F>(&self, job: F) -> tarantool::Result<()>
     where
         F: FnOnce(CancellationToken) + 'static,
     {
-        self.register_job_inner(job, None)
+        register_job(&self.service_id, job)?;
+
+        Ok(())
     }
 
     /// Same as [`ServiceWorkerManager::register_job`] but caller may provide a special tag.
@@ -184,11 +297,14 @@ impl ServiceWorkerManager {
     ///
     /// * `job`: callback that will be executed in separated fiber
     /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs
+    #[inline(always)]
     pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()>
     where
         F: FnOnce(CancellationToken) + 'static,
     {
-        self.register_job_inner(job, Some(tag))
+        register_tagged_job(&self.service_id, job, tag)?;
+
+        Ok(())
     }
 
     /// Cancel all jobs related to the given tag.
@@ -201,16 +317,19 @@ impl ServiceWorkerManager {
     /// * `tag`: determine what jobs should be cancelled
     /// * `timeout`: shutdown timeout
     pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> {
-        let deadline = fiber::clock().saturating_add(timeout);
-
-        let mut jobs = self.jobs.lock();
-        let handles = jobs.0.remove(tag).unwrap_or_default();
-
-        let job_count = handles.len();
+        let res = cancel_jobs_by_tag(&self.service_id, tag, timeout);
+        let res = match res {
+            Ok(res) => res,
+            Err(e) => {
+                let loc = DisplayErrorLocation(&e);
+                tarantool::say_error!("unexpected error: {loc}{e}");
+                return Ok(());
+            }
+        };
 
-        let completed_counter = cancel_handles(handles, deadline);
-        if job_count != completed_counter {
-            return Err(Error::PartialCompleted(job_count, completed_counter));
+        if res.n_timeouts != 0 {
+            let n_completed = res.n_total - res.n_timeouts;
+            return Err(Error::PartialCompleted(res.n_total as _, n_completed as _));
         }
 
         Ok(())
@@ -221,188 +340,221 @@ impl ServiceWorkerManager {
     /// jobs gracefully end.
     ///
     /// By default, a 5-second timeout is used.
+    #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"]
     pub fn set_shutdown_timeout(&self, timeout: Duration) {
-        *self.shutdown_timeout.lock() = timeout;
+        let plugin = &self.service_id.plugin;
+        let service = &self.service_id.service;
+        let version = &self.service_id.version;
+        set_jobs_shutdown_timeout(plugin, service, version, timeout)
     }
 }
 
-/// This component is using by `picodata` for manage all worker managers.
+////////////////////////////////////////////////////////////////////////////////
+// set_background_jobs_shutdown_timeout
+////////////////////////////////////////////////////////////////////////////////
+
+/// When jobs are canceled by `picodata` (e.g. on service shutdown), this
+/// function sets the shutdown timeout - the time duration that `picodata`
+/// waits to ensure that all jobs gracefully end.
+///
+/// By default, a 5-second timeout is used.
 ///
-/// *For internal usage, don't use it in your code*.
-#[derive(Default)]
-pub struct InternalGlobalWorkerManager {
-    managers: fiber::Mutex<HashMap<ServiceId, ServiceWorkerManager>>,
+/// Consider using [`PicoContext::set_jobs_shutdown_timeout`] instead
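+///
+/// # Examples
+///
+/// A minimal usage sketch (for illustration only; the plugin, service and
+/// version identifiers are hypothetical):
+///
+/// ```no_run
+/// use std::time::Duration;
+/// use picodata_plugin::background::set_jobs_shutdown_timeout;
+///
+/// set_jobs_shutdown_timeout("my_plugin", "my_service", "0.1.0", Duration::from_secs(10));
+/// ```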
+pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) {
+    // SAFETY: safe as long as picodata version is compatible
+    let rc = unsafe {
+        ffi::pico_ffi_background_set_jobs_shutdown_timeout(
+            plugin.into(),
+            service.into(),
+            version.into(),
+            timeout.as_secs_f64(),
+        )
+    };
+    debug_assert!(
+        rc == 0,
+        "return code is only for future compatibility at the moment"
+    );
 }
 
-// SAFETY: `GlobalWorkerManager` must be used only in the tx thread
-unsafe impl Send for InternalGlobalWorkerManager {}
-unsafe impl Sync for InternalGlobalWorkerManager {}
-
-static IGWM: OnceLock<InternalGlobalWorkerManager> = OnceLock::new();
-
-impl InternalGlobalWorkerManager {
-    fn get_or_insert_worker_manager(&self, service_id: ServiceId) -> ServiceWorkerManager {
-        let mut managers = self.managers.lock();
-        match managers.get(&service_id) {
-            None => {
-                let mgr = ServiceWorkerManager {
-                    jobs: Rc::new(Default::default()),
-                    shutdown_timeout: Rc::new(fiber::Mutex::new(Duration::from_secs(5))),
-                };
-                managers.insert(service_id, mgr.clone());
-                mgr
-            }
-            Some(mgr) => mgr.clone(),
-        }
-    }
+////////////////////////////////////////////////////////////////////////////////
+// CancellationCallbackState
+////////////////////////////////////////////////////////////////////////////////
+
+#[derive(Debug)]
+pub struct CancellationCallbackState {
+    cancel_channel: Channel<()>,
+    finish_channel: Channel<()>,
+    status: Cell<CancellationCallbackStatus>,
+}
 
-    fn remove_plugin_worker_manager(
-        &self,
-        service_id: &ServiceId,
-        timeout: Duration,
-    ) -> Result<(), Error> {
-        let deadline = fiber::clock().saturating_add(timeout);
+#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)]
+pub enum CancellationCallbackStatus {
+    #[default]
+    Initial = 0,
+    JobCancelled = 1,
+    JobFinished = 2,
+}
 
-        let wm = self.managers.lock().remove(service_id);
+impl CancellationCallbackState {
+    fn new(cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
+        Self {
+            cancel_channel,
+            finish_channel,
+            status: Cell::new(CancellationCallbackStatus::Initial),
+        }
+    }
 
-        // drain all jobs manually cause user may have shared references to worker manager
-        if let Some(wm) = wm {
-            let mut jobs = wm.jobs.lock();
-            let (tagged_jobs, untagged_jobs) = mem::take(&mut *jobs);
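+    /// Drives the two-step cancellation protocol: the first call (with
+    /// `action == JobCancelled`) sends the cancel signal to the job's fiber,
+    /// the second call (with `action == JobFinished`) waits up to `timeout`
+    /// for the job to report completion via the finish channel.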
+    fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> {
+        use CancellationCallbackStatus::*;
+        let next_status = action;
 
-            let mut job_counter = 0;
-            let mut completed_job_counter = 0;
+        if next_status == JobCancelled as u64 {
+            debug_assert_eq!(self.status.get(), Initial);
+            _ = self.cancel_channel.send(());
 
-            for (_, handles) in tagged_jobs {
-                job_counter += handles.len();
-                completed_job_counter += cancel_handles(handles, deadline);
-            }
+            self.status.set(JobCancelled);
+        } else if next_status == JobFinished as u64 {
+            debug_assert_eq!(self.status.get(), JobCancelled);
+            self.finish_channel.recv_timeout(timeout).map_err(|e| {
+                BoxError::new(TarantoolErrorCode::Timeout, recv_error_to_static_str(e))
+            })?;
 
-            job_counter += untagged_jobs.len();
-            completed_job_counter += cancel_handles(untagged_jobs, deadline);
-            if job_counter != completed_job_counter {
-                return Err(Error::PartialCompleted(job_counter, completed_job_counter));
-            }
+            self.status.set(JobFinished);
+        } else {
+            return Err(BoxError::new(
+                TarantoolErrorCode::IllegalParams,
+                format!("unexpected action: {action}"),
+            ));
         }
+
         Ok(())
     }
+}
 
-    /// Create a new worker manager for given `id` or return existed.
-    pub fn get_or_init_manager(&self, id: ServiceId) -> ServiceWorkerManager {
-        self.get_or_insert_worker_manager(id)
+#[inline]
+fn recv_error_to_static_str(e: fiber::channel::RecvError) -> &'static str {
+    match e {
+        fiber::channel::RecvError::Timeout => "timeout",
+        fiber::channel::RecvError::Disconnected => "disconnected",
     }
+}
 
-    /// Return preferred shutdown timeout for worker manager jobs.
-    pub fn get_shutdown_timeout(&self, service_id: &ServiceId) -> Option<Duration> {
-        self.managers
-            .lock()
-            .get(service_id)
-            .map(|mgr| *mgr.shutdown_timeout.lock())
-    }
+////////////////////////////////////////////////////////////////////////////////
+// ffi wrappers
+////////////////////////////////////////////////////////////////////////////////
 
-    /// Remove worker manager by `id` with all jobs.
-    /// This function return after all related jobs will be gracefully shutdown or
-    /// after `timeout` duration (with [`Error::PartialCompleted`] error.
-    pub fn unregister_service(&self, id: &ServiceId, timeout: Duration) -> Result<(), Error> {
-        self.remove_plugin_worker_manager(id, timeout)
-    }
+/// **For internal use**.
+#[repr(C)]
+pub struct FfiBackgroundJobCancellationToken {
+    /// This is just a way to do arbitrarily complex things across the ABI boundary
+    /// without introducing a large amount of FFI-safe wrappers. This will
+    /// always be [`CancellationCallbackState::cancellation_callback`].
+    callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32,
+    drop: extern "C-unwind" fn(*mut Self),
 
-    /// Return reference to global internal worker manager.
-    pub fn instance() -> &'static Self {
-        let igwm_ref = IGWM.get_or_init(InternalGlobalWorkerManager::default);
-        igwm_ref
-    }
+    /// The pointer to the closure object.
+    closure_pointer: *mut (),
+
+    /// This is the background job fiber which will be cancelled by this token.
+    pub fiber_id: FiberId,
 }
 
-#[cfg(feature = "internal_test")]
-mod tests {
-    use super::*;
-    use std::sync::atomic::{AtomicU64, Ordering};
-    use std::time::Duration;
-    use tarantool::fiber;
-
-    static _1_MS: Duration = Duration::from_millis(1);
-    static _10_MS: Duration = Duration::from_millis(10);
-    static _100_MS: Duration = Duration::from_millis(100);
-    static _200_MS: Duration = Duration::from_millis(200);
-
-    fn make_job(
-        counter: &'static AtomicU64,
-        iteration_duration: Duration,
-    ) -> impl Fn(CancellationToken) {
-        move |cancel_token: CancellationToken| {
-            while cancel_token.wait_timeout(_1_MS).is_err() {
-                counter.fetch_add(1, Ordering::SeqCst);
-                fiber::sleep(iteration_duration);
-            }
-            counter.store(0, Ordering::SeqCst);
-        }
+impl Drop for FfiBackgroundJobCancellationToken {
+    #[inline(always)]
+    fn drop(&mut self) {
+        (self.drop)(self)
     }
+}
 
-    #[::tarantool::test]
-    fn test_work_manager_works() {
-        static COUNTER: AtomicU64 = AtomicU64::new(0);
-
-        let gwm = InternalGlobalWorkerManager::instance();
-        let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0");
-        let wm = gwm.get_or_init_manager(svc_id.clone());
-
-        wm.register_job(make_job(&COUNTER, _1_MS)).unwrap();
-        wm.register_job(make_job(&COUNTER, _1_MS)).unwrap();
+impl FfiBackgroundJobCancellationToken {
+    fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self {
+        let callback_state = CancellationCallbackState::new(cancel_channel, finish_channel);
+        let callback = move |action, timeout| {
+            let res = callback_state.cancellation_callback(action, timeout);
+            if let Err(e) = res {
+                e.set_last();
+                return -1;
+            }
 
-        fiber::sleep(_10_MS);
-        assert!(COUNTER.load(Ordering::SeqCst) > 0);
+            0
+        };
 
-        gwm.unregister_service(&svc_id, _100_MS).unwrap();
-        assert_eq!(COUNTER.load(Ordering::SeqCst), 0);
+        Self::new_inner(fiber_id, callback)
     }
 
-    #[::tarantool::test]
-    fn test_work_manager_tagged_jobs_works() {
-        static COUNTER_1: AtomicU64 = AtomicU64::new(0);
-        static COUNTER_2: AtomicU64 = AtomicU64::new(0);
+    /// This function is needed because we need the `F` type parameter so that
+    /// we can specialize `callback` and `drop` with it inside this function.
+    /// If Rust supported something like `type F = type_of(callback);` we
+    /// wouldn't need this additional function and could just write this code in
+    /// [`Self::new`] above.
+    // FIXME: just define an explicit extern "C" fn for cancellation_callback?
+    fn new_inner<F>(fiber_id: FiberId, f: F) -> Self
+    where
+        F: FnMut(u64, Duration) -> i32,
+    {
+        let closure = Box::new(f);
+        let closure_pointer: *mut F = Box::into_raw(closure);
 
-        let gwm = InternalGlobalWorkerManager::instance();
-        let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0");
-        let wm = gwm.get_or_init_manager(svc_id.clone());
+        Self {
+            callback: Self::trampoline::<F>,
+            drop: Self::drop_handler::<F>,
+            closure_pointer: closure_pointer.cast(),
 
-        wm.register_tagged_job(make_job(&COUNTER_1, _1_MS), "j1")
-            .unwrap();
-        wm.register_tagged_job(make_job(&COUNTER_1, _1_MS), "j1")
-            .unwrap();
-        wm.register_tagged_job(make_job(&COUNTER_2, _1_MS), "j2")
-            .unwrap();
+            fiber_id,
+        }
+    }
 
-        fiber::sleep(_10_MS);
-        assert!(COUNTER_1.load(Ordering::SeqCst) > 0);
-        assert!(COUNTER_2.load(Ordering::SeqCst) > 0);
+    /// An ABI-safe wrapper which calls the Rust closure stored in `data`.
+    ///
+    /// The closure's return code is passed through as-is; in practice the
+    /// closure reports errors via [`BoxError::set_last`] and returns `-1`.
+    extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32
+    where
+        F: FnMut(u64, Duration) -> i32,
+    {
+        // SAFETY: `closure_pointer` was produced by `Box::into_raw::<F>` in
+        // `Self::new_inner` and is only freed in `Self::drop_handler`.
+        let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() };
+        let closure = unsafe { &mut *closure_pointer };
 
-        wm.cancel_tagged("j1", _10_MS).unwrap();
+        closure(action, Duration::from_secs_f64(timeout))
+    }
 
-        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
-        assert_ne!(COUNTER_2.load(Ordering::SeqCst), 0);
+    extern "C-unwind" fn drop_handler<F>(handler: *mut Self) {
+        unsafe {
+            let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>();
+            let closure = Box::from_raw(closure_pointer);
+            drop(closure);
 
-        gwm.unregister_service(&svc_id, _10_MS).unwrap();
+            if cfg!(debug_assertions) {
+                // Overwrite the pointer with garbage so that we fail loudly in case of a bug
+                (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _;
+            }
+        }
+    }
 
-        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
-        assert_eq!(COUNTER_2.load(Ordering::SeqCst), 0);
+    /// The error is returned via [`BoxError::set_last`].
+    #[inline(always)]
+    pub fn cancel_job(&self) {
+        let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0);
+        debug_assert!(rc == 0);
     }
 
-    #[::tarantool::test]
-    fn test_work_manager_graceful_shutdown() {
-        static COUNTER: AtomicU64 = AtomicU64::new(0);
-
-        let gwm = InternalGlobalWorkerManager::instance();
-        let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0");
-        let wm = gwm.get_or_init_manager(svc_id.clone());
-        // this job cannot stop at 10ms interval
-        wm.register_job(make_job(&COUNTER, _100_MS)).unwrap();
-        // but this job can
-        wm.register_job(make_job(&COUNTER, _1_MS)).unwrap();
-        fiber::sleep(_10_MS);
-        let result = gwm.unregister_service(&svc_id, _10_MS);
-        assert!(
-            matches!(result, Err(Error::PartialCompleted(all, completed)) if all == 2 && completed == 1)
+    /// The error is returned via [`BoxError::set_last`].
+    #[inline(always)]
+    pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> {
+        let rc = (self.callback)(
+            self,
+            CancellationCallbackStatus::JobFinished as _,
+            timeout.as_secs_f64(),
         );
+        if rc == -1 {
+            // The actual error is passed through tarantool's diagnostics area.
+            // We can't return a BoxError here, because picodata may be built
+            // against a different version of tarantool-module.
+            return Err(());
+        }
+
+        Ok(())
     }
 }
diff --git a/picodata-plugin/src/internal/ffi.rs b/picodata-plugin/src/internal/ffi.rs
index d2e3ff290b44cb1249a6b5f6c3d85240c32d79e4..6fd129892aa48ef0753abe077dd21772e23d76af 100644
--- a/picodata-plugin/src/internal/ffi.rs
+++ b/picodata-plugin/src/internal/ffi.rs
@@ -1,9 +1,12 @@
+use crate::background::FfiBackgroundJobCancellationToken;
+use crate::background::JobCancellationResult;
 use crate::internal::types;
 use crate::metrics::FfiMetricsHandler;
 use crate::sql::types::SqlValue;
 use crate::transport::rpc::client::FfiSafeRpcRequestArguments;
 use crate::transport::rpc::server::FfiRpcHandler;
 use crate::util::FfiSafeBytes;
+use crate::util::FfiSafeStr;
 use abi_stable::derive_macro_reexports::{ROption, RResult};
 use abi_stable::std_types::{RDuration, RVec};
 use abi_stable::RTuple;
@@ -52,4 +55,28 @@ extern "C" {
     ) -> i32;
 
     pub fn pico_ffi_register_metrics_handler(handler: FfiMetricsHandler) -> i32;
+
+    pub fn pico_ffi_background_register_job_cancellation_token(
+        plugin: FfiSafeStr,
+        service: FfiSafeStr,
+        version: FfiSafeStr,
+        job_tag: FfiSafeStr,
+        token: FfiBackgroundJobCancellationToken,
+    ) -> i32;
+
+    pub fn pico_ffi_background_cancel_jobs_by_tag(
+        plugin: FfiSafeStr,
+        service: FfiSafeStr,
+        version: FfiSafeStr,
+        job_tag: FfiSafeStr,
+        timeout: f64,
+        result: *mut JobCancellationResult,
+    ) -> i32;
+
+    pub fn pico_ffi_background_set_jobs_shutdown_timeout(
+        plugin: FfiSafeStr,
+        service: FfiSafeStr,
+        version: FfiSafeStr,
+        timeout: f64,
+    ) -> i32;
 }
diff --git a/picodata-plugin/src/plugin/interface.rs b/picodata-plugin/src/plugin/interface.rs
index 208b0bdbefeff82de3a6b857d4829d9d61cccbfd..55d5cca9587e1fa17c9297793dd0654294bc9c27 100644
--- a/picodata-plugin/src/plugin/interface.rs
+++ b/picodata-plugin/src/plugin/interface.rs
@@ -1,13 +1,17 @@
-use crate::background::{InternalGlobalWorkerManager, ServiceWorkerManager};
-use crate::error_code::ErrorCode::PluginError;
+use crate::background;
+use crate::background::ServiceWorkerManager;
+use crate::error_code::ErrorCode;
 use crate::util::FfiSafeStr;
 pub use abi_stable;
 use abi_stable::pmr::{RErr, RResult, RSlice};
 use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec};
 use abi_stable::{sabi_trait, RTuple, StableAbi};
 use serde::de::DeserializeOwned;
+use smol_str::SmolStr;
 use std::error::Error;
 use std::fmt::Display;
+use std::time::Duration;
+use tarantool::error::TarantoolErrorCode;
 use tarantool::error::{BoxError, IntoBoxError};
 
 /// Context of current instance. Produced by picodata.
@@ -15,7 +19,6 @@ use tarantool::error::{BoxError, IntoBoxError};
 #[derive(StableAbi, Debug)]
 pub struct PicoContext {
     is_master: bool,
-    global_wm: *const (),
     pub plugin_name: FfiSafeStr,
     pub service_name: FfiSafeStr,
     pub plugin_version: FfiSafeStr,
@@ -24,12 +27,8 @@ pub struct PicoContext {
 impl PicoContext {
     #[inline]
     pub fn new(is_master: bool) -> PicoContext {
-        let gwm = InternalGlobalWorkerManager::instance() as *const InternalGlobalWorkerManager
-            as *const ();
-
         Self {
             is_master,
-            global_wm: gwm,
             plugin_name: "<unset>".into(),
             service_name: "<unset>".into(),
             plugin_version: "<unset>".into(),
@@ -42,7 +41,6 @@ impl PicoContext {
     pub unsafe fn clone(&self) -> Self {
         Self {
             is_master: self.is_master,
-            global_wm: self.global_wm,
             plugin_name: self.plugin_name.clone(),
             service_name: self.service_name.clone(),
             plugin_version: self.plugin_version.clone(),
@@ -56,23 +54,119 @@ impl PicoContext {
     }
 
     /// Return [`ServiceWorkerManager`] for current service.
+    #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"]
     pub fn worker_manager(&self) -> ServiceWorkerManager {
-        let global_manager: &'static InternalGlobalWorkerManager =
-            // SAFETY: `picodata` guaranty that this reference live enough
-            unsafe { &*(self.global_wm as *const InternalGlobalWorkerManager) };
+        ServiceWorkerManager::new(self.make_service_id())
+    }
+
+    #[inline(always)]
+    pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
+        crate::metrics::register_metrics_handler(self, callback)
+    }
+
+    /// Add a new job for execution.
+    /// The job's life cycle is tied to the service life cycle;
+    /// this means that the job is canceled just before the service is stopped.
+    ///
+    /// # Arguments
+    ///
+    /// * `job`: callback that will be executed in a separate fiber.
+    /// Note that it is your responsibility to organize the job's graceful shutdown,
+    /// see [`background::CancellationToken`] for details.
+    ///
+    /// # Examples
+    ///
+    /// ```no_run
+    /// use std::time::Duration;
+    /// use picodata_plugin::background::CancellationToken;
+    ///
+    /// # use picodata_plugin::plugin::interface::PicoContext;
+    /// # fn on_start(context: PicoContext) {
+    ///
+    /// // this job will print "hello" every second,
+    /// // and print "bye" after being canceled
+    /// fn hello_printer(cancel: CancellationToken) {
+    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
+    ///         println!("hello!");
+    ///     }
+    ///     println!("job cancelled, bye!")
+    /// }
+    /// context.register_job(hello_printer).unwrap();
+    ///
+    /// # }
+    /// ```
+    #[inline(always)]
+    pub fn register_job<F>(&self, job: F) -> Result<(), BoxError>
+    where
+        F: FnOnce(background::CancellationToken) + 'static,
+    {
+        background::register_job(&self.make_service_id(), job)
+    }
 
-        // TODO: can we eliminate allocation here?
-        let service_id = ServiceId::new(
+    /// Same as [`Self::register_job`], but the caller may provide a custom tag.
+    /// This tag may be used for manual job cancellation via [`Self::cancel_tagged_jobs`].
+    ///
+    /// # Arguments
+    ///
+    /// * `job`: callback that will be executed in a separate fiber
+    /// * `tag`: tag associated with the job; a single tag may be associated with multiple jobs
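+    ///
+    /// # Examples
+    ///
+    /// A minimal usage sketch (for illustration only; the `"poller"` tag is arbitrary):
+    ///
+    /// ```no_run
+    /// use std::time::Duration;
+    /// use picodata_plugin::background::CancellationToken;
+    /// # use picodata_plugin::plugin::interface::PicoContext;
+    /// # fn on_start(context: PicoContext) {
+    /// let poller = |cancel: CancellationToken| {
+    ///     while cancel.wait_timeout(Duration::from_secs(1)).is_err() {
+    ///         println!("polling...");
+    ///     }
+    /// };
+    /// context.register_tagged_job(poller, "poller").unwrap();
+    /// # }
+    /// ```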
+    #[inline(always)]
+    pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError>
+    where
+        F: FnOnce(background::CancellationToken) + 'static,
+    {
+        background::register_tagged_job(&self.make_service_id(), job, tag)
+    }
+
+    /// Cancel all jobs registered with the given `tag`.
+    /// This function returns after all related jobs have gracefully shut down or
+    /// after the `timeout` duration has elapsed.
+    ///
+    /// Returns an error with code [`TarantoolErrorCode::Timeout`] in case some
+    /// jobs didn't finish within `timeout`.
+    ///
+    /// May also theoretically return an error with code [`ErrorCode::NoSuchService`]
+    /// in case the service doesn't exist anymore (highly unlikely).
+    ///
+    /// See also [`Self::register_tagged_job`].
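+    ///
+    /// # Examples
+    ///
+    /// A minimal usage sketch (for illustration only; assumes jobs were
+    /// registered with the hypothetical `"poller"` tag):
+    ///
+    /// ```no_run
+    /// use std::time::Duration;
+    /// # use picodata_plugin::plugin::interface::PicoContext;
+    /// # fn on_stop(context: PicoContext) {
+    /// // Cancel all "poller" jobs and wait up to 3 seconds for them to finish.
+    /// context.cancel_tagged_jobs("poller", Duration::from_secs(3)).unwrap();
+    /// # }
+    /// ```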
+    #[inline(always)]
+    pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> {
+        let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?;
+        if res.n_timeouts != 0 {
+            #[rustfmt::skip]
+            return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts)));
+        }
+
+        Ok(())
+    }
+
+    /// When jobs are canceled by `picodata` (e.g. on service shutdown), this
+    /// function sets the shutdown timeout - the time duration that `picodata`
+    /// waits to ensure that all jobs gracefully end.
+    ///
+    /// By default, a 5-second timeout is used.
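+    ///
+    /// # Examples
+    ///
+    /// A minimal usage sketch (for illustration only):
+    ///
+    /// ```no_run
+    /// use std::time::Duration;
+    /// # use picodata_plugin::plugin::interface::PicoContext;
+    /// # fn on_start(context: PicoContext) {
+    /// // Give background jobs up to 10 seconds to finish when the service stops.
+    /// context.set_jobs_shutdown_timeout(Duration::from_secs(10));
+    /// # }
+    /// ```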
+    #[inline(always)]
+    pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) {
+        crate::background::set_jobs_shutdown_timeout(
             self.plugin_name(),
             self.service_name(),
             self.plugin_version(),
-        );
-        global_manager.get_or_init_manager(service_id)
+            timeout,
+        )
     }
 
     #[inline(always)]
-    pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> {
-        crate::metrics::register_metrics_handler(self, callback)
+    pub fn make_service_id(&self) -> ServiceId {
+        ServiceId::new(
+            self.plugin_name(),
+            self.service_name(),
+            self.plugin_version(),
+        )
     }
 
     #[inline]
@@ -97,9 +191,9 @@ impl PicoContext {
 /// Unique service identifier.
 #[derive(Debug, PartialEq, Eq, Hash, Clone)]
 pub struct ServiceId {
-    pub plugin: String,
-    pub service: String,
-    pub version: String,
+    pub plugin: SmolStr,
+    pub service: SmolStr,
+    pub version: SmolStr,
 }
 
 impl std::fmt::Display for ServiceId {
@@ -112,9 +206,9 @@ impl std::fmt::Display for ServiceId {
 impl ServiceId {
     #[inline(always)]
     pub fn new(
-        plugin: impl Into<String>,
-        service: impl Into<String>,
-        version: impl Into<String>,
+        plugin: impl Into<SmolStr>,
+        service: impl Into<SmolStr>,
+        version: impl Into<SmolStr>,
     ) -> Self {
         Self {
             plugin: plugin.into(),
@@ -298,7 +392,7 @@ impl<C: DeserializeOwned> ServiceProxy<C> {
 /// UAF can happen if user error points into memory allocated by dynamic lib and lives
 /// longer than dynamic lib memory (that was unmapped by system).
 fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> {
-    let tt_error = BoxError::new(PluginError, source.to_string());
+    let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string());
     tt_error.set_last_error();
     RErr(())
 }
diff --git a/picodata-plugin/src/util.rs b/picodata-plugin/src/util.rs
index dc7d3fb792002fe5e538689b031f5973b401a66d..1b60c4dc61b8a2a7dafd1496c44333a38a8b2e07 100644
--- a/picodata-plugin/src/util.rs
+++ b/picodata-plugin/src/util.rs
@@ -1,3 +1,4 @@
+use crate::error_code::ErrorCode;
 use abi_stable::StableAbi;
 use std::ptr::NonNull;
 use tarantool::error::BoxError;
@@ -381,6 +382,14 @@ fn as_non_null_ptr<T>(data: &[T]) -> NonNull<T> {
     unsafe { NonNull::new_unchecked(pointer as *mut _) }
 }
 
+// TODO: this should be in tarantool module
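+/// Convert a generic `tarantool::error::Error` into a [`BoxError`]: if it already
+/// wraps a box error it is returned as is, otherwise the error message is wrapped
+/// into a new [`BoxError`] with code [`ErrorCode::Other`].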
+pub fn tarantool_error_to_box_error(e: tarantool::error::Error) -> BoxError {
+    match e {
+        tarantool::error::Error::Tarantool(e) => e,
+        other => BoxError::new(ErrorCode::Other, other.to_string()),
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 // test
 ////////////////////////////////////////////////////////////////////////////////
diff --git a/src/lib.rs b/src/lib.rs
index f44b8ebe78e23865413d299ce7a89a08e64a0dfd..a5815e3166399029dabfeeee85c5f08759a2552d 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -293,7 +293,6 @@ fn start_webui() {
 /// (discovery, rpc, public proc api).
 fn init_handlers() {
     plugin::rpc::server::init_handlers();
-    plugin::metrics::init_handlers();
 
     rpc::init_static_proc_set();
 
diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs
index 4229b9f1bd0ce92db122bdc2f51a5c36d3171c5e..4f2ceca2bf1262448086cbf891f16e0adbe77502 100644
--- a/src/on_shutdown.rs
+++ b/src/on_shutdown.rs
@@ -1,5 +1,4 @@
 use crate::has_states;
-use crate::plugin::PluginEvent;
 use crate::tarantool;
 use crate::tlog;
 use crate::traft::error::Error;
@@ -31,12 +30,7 @@ pub async fn callback() {
     // 1. Wake up the sentinel so it starts trying to set target state Offline.
     node.sentinel_loop.on_shut_down();
 
-    if let Err(e) = node
-        .plugin_manager
-        .handle_event_sync(PluginEvent::InstanceShutdown)
-    {
-        tlog!(Error, "plugin `on_stop` error: {e}");
-    };
+    node.plugin_manager.handle_instance_shutdown();
 
     fiber::reschedule();
 
diff --git a/src/plugin/background.rs b/src/plugin/background.rs
new file mode 100644
index 0000000000000000000000000000000000000000..ce651ec2c4a5ded2999702bfcb444c501cd413a4
--- /dev/null
+++ b/src/plugin/background.rs
@@ -0,0 +1,135 @@
+use crate::plugin::ServiceState;
+use crate::tlog;
+use crate::traft::node;
+use picodata_plugin::background::FfiBackgroundJobCancellationToken;
+use picodata_plugin::background::JobCancellationResult;
+use picodata_plugin::error_code::ErrorCode;
+use picodata_plugin::plugin::interface::ServiceId;
+use smol_str::SmolStr;
+use std::time::Duration;
+use tarantool::error::BoxError;
+use tarantool::fiber;
+use tarantool::time::Instant;
+
+////////////////////////////////////////////////////////////////////////////////
+// register_background_job_cancellation_token
+////////////////////////////////////////////////////////////////////////////////
+
+pub fn register_background_job_cancellation_token(
+    service: ServiceId,
+    job_tag: SmolStr,
+    token: FfiBackgroundJobCancellationToken,
+) -> Result<(), BoxError> {
+    let node = node::global().expect("initialized before plugins");
+    let manager = &node.plugin_manager;
+
+    if manager.get_service_state(&service).is_none() {
+        crate::warn_or_panic!("plugin callback called for non-existent service {service}");
+        #[rustfmt::skip]
+        return Err(BoxError::new(ErrorCode::NoSuchService, format!("service `{service}` not found")));
+    }
+
+    let mut guard = manager.background_job_cancellation_tokens.lock();
+    let all_jobs = guard.entry(service).or_default();
+    all_jobs.push((job_tag, token));
+
+    Ok(())
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// cancel_background_jobs_by_tag
+////////////////////////////////////////////////////////////////////////////////
+
+pub fn cancel_background_jobs_by_tag(
+    service_id: ServiceId,
+    job_tag: SmolStr,
+    timeout: Duration,
+) -> Result<JobCancellationResult, BoxError> {
+    let node = node::global().expect("initialized before plugins");
+    let manager = &node.plugin_manager;
+
+    let Some(service) = manager.get_service_state(&service_id) else {
+        crate::warn_or_panic!("plugin callback called for non-existent service {service_id}");
+        #[rustfmt::skip]
+        return Err(BoxError::new(ErrorCode::NoSuchService, format!("service `{service_id}` not found")));
+    };
+
+    let mut guard = manager.background_job_cancellation_tokens.lock();
+    let Some(all_jobs) = guard.get_mut(&service.id) else {
+        return Ok(JobCancellationResult::new(0, 0));
+    };
+
+    let target_tag = job_tag;
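+    // Collect every token registered under the target tag, removing it from the
+    // service's list. `swap_remove` avoids shifting the remaining elements, so
+    // their order is not preserved (which is fine here).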
+    let mut jobs_to_cancel = vec![];
+    let mut cursor = 0;
+    while cursor < all_jobs.len() {
+        let (job_tag, _) = &all_jobs[cursor];
+        if &target_tag == job_tag {
+            let token = all_jobs.swap_remove(cursor);
+            jobs_to_cancel.push(token);
+            continue;
+        }
+
+        cursor += 1;
+    }
+
+    // Release the lock.
+    drop(guard);
+
+    cancel_jobs(&service, &jobs_to_cancel);
+
+    let deadline = fiber::clock().saturating_add(timeout);
+    let n_timeouts = wait_jobs_finished(&service, &jobs_to_cancel, deadline);
+
+    let n_total = jobs_to_cancel.len();
+    Ok(JobCancellationResult::new(n_total as _, n_timeouts))
+}
+
+pub fn cancel_jobs(service: &ServiceState, jobs: &[(SmolStr, FfiBackgroundJobCancellationToken)]) {
+    let service_id = &service.id;
+    for (job_tag, token) in jobs {
+        #[rustfmt::skip]
+        tlog!(Debug, "cancelling service {service_id} background job `{job_tag}`");
+        token.cancel_job();
+    }
+}
+
+/// Returns the number of jobs which didn't finish in time.
+pub fn wait_jobs_finished(
+    service: &ServiceState,
+    jobs: &[(SmolStr, FfiBackgroundJobCancellationToken)],
+    deadline: Instant,
+) -> u32 {
+    let service_id = &service.id;
+
+    let mut n_timeouts = 0;
+    for (job_tag, token) in jobs {
+        let timeout = deadline.duration_since(fiber::clock());
+        let res = token.wait_job_finished(timeout);
+        if res.is_err() {
+            let e = BoxError::last();
+            #[rustfmt::skip]
+            tlog!(Warning, "service {service_id} job `{job_tag}` didn't finish in time: {e}");
+
+            n_timeouts += 1;
+        }
+    }
+
+    n_timeouts
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// set_jobs_shutdown_timeout
+////////////////////////////////////////////////////////////////////////////////
+
+pub fn set_jobs_shutdown_timeout(service: ServiceId, timeout: Duration) {
+    let node = node::global().expect("initialized before plugins");
+    let manager = &node.plugin_manager;
+
+    let Some(service) = manager.get_service_state(&service) else {
+        crate::warn_or_panic!("plugin callback called for non-existent service {service}");
+        return;
+    };
+
+    service.background_job_shutdown_timeout.set(Some(timeout));
+}
diff --git a/src/plugin/ffi.rs b/src/plugin/ffi.rs
index 75ca13f892454d45b73c7e2edbefbb130ef9a538..d77741d1a132572965248efa84011e52873d1f17 100644
--- a/src/plugin/ffi.rs
+++ b/src/plugin/ffi.rs
@@ -1,3 +1,5 @@
+#[allow(unused_imports)]
+use crate::error_code::ErrorCode;
 use crate::info::{InstanceInfo, RaftInfo, VersionInfo};
 use crate::instance::StateVariant;
 use crate::plugin::{rpc, PluginIdentifier};
@@ -8,15 +10,20 @@ use crate::{cas, sql, traft};
 use abi_stable::pmr::{RErr, RNone, ROk, ROption, RResult, RSome};
 use abi_stable::std_types::{RDuration, RVec, Tuple2};
 use abi_stable::{sabi_extern_fn, RTuple};
+use picodata_plugin::background::FfiBackgroundJobCancellationToken;
+use picodata_plugin::background::JobCancellationResult;
 use picodata_plugin::internal::types;
 use picodata_plugin::internal::types::{DmlInner, OpInner};
 use picodata_plugin::metrics::FfiMetricsHandler;
+use picodata_plugin::plugin::interface::ServiceId;
 use picodata_plugin::sql::types::{SqlValue, SqlValueInner};
 use picodata_plugin::transport::rpc::client::FfiSafeRpcRequestArguments;
 use picodata_plugin::transport::rpc::server::FfiRpcHandler;
 use picodata_plugin::util::FfiSafeBytes;
+use picodata_plugin::util::FfiSafeStr;
 use sbroad::ir::value::double::Double;
 use sbroad::ir::value::{LuaValue, Tuple, Value};
+use std::time::Duration;
 use std::{mem, slice};
 use tarantool::datetime::Datetime;
 use tarantool::error::IntoBoxError;
@@ -395,3 +402,96 @@ pub extern "C" fn pico_ffi_register_metrics_handler(handler: FfiMetricsHandler)
 
     0
 }
+
+/// Returns an error with code [`ErrorCode::NoSuchService`] if there's no service with the provided id.
+///
+/// Otherwise writes into `result` information about how many jobs (if any)
+/// didn't finish within the provided `timeout`.
+#[no_mangle]
+pub extern "C" fn pico_ffi_background_cancel_jobs_by_tag(
+    plugin: FfiSafeStr,
+    service: FfiSafeStr,
+    version: FfiSafeStr,
+    job_tag: FfiSafeStr,
+    timeout: f64,
+    result: *mut JobCancellationResult,
+) -> i32 {
+    // SAFETY: data outlives this function call
+    let plugin = unsafe { plugin.as_str() };
+    let service = unsafe { service.as_str() };
+    let version = unsafe { version.as_str() };
+    let service_id = ServiceId::new(plugin, service, version);
+
+    let job_tag = unsafe { job_tag.as_str() };
+
+    let res = crate::plugin::background::cancel_background_jobs_by_tag(
+        service_id,
+        job_tag.into(),
+        Duration::from_secs_f64(timeout),
+    );
+
+    let res = match res {
+        Ok(v) => v,
+        Err(e) => {
+            e.set_last();
+            return -1;
+        }
+    };
+
+    // SAFETY: the caller is responsible for this to be safe
+    unsafe { std::ptr::write(result, res) };
+
+    0
+}
+
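+/// Registers `token` so that picodata can later cancel the corresponding
+/// background job, either explicitly by tag or when the service is stopped.
+///
+/// Returns an error with code [`ErrorCode::NoSuchService`] if there's no
+/// service with the provided id.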
+#[no_mangle]
+pub extern "C" fn pico_ffi_background_register_job_cancellation_token(
+    plugin: FfiSafeStr,
+    service: FfiSafeStr,
+    version: FfiSafeStr,
+    job_tag: FfiSafeStr,
+    token: FfiBackgroundJobCancellationToken,
+) -> i32 {
+    // SAFETY: data outlives this function call
+    let plugin = unsafe { plugin.as_str() };
+    let service = unsafe { service.as_str() };
+    let version = unsafe { version.as_str() };
+    let service_id = ServiceId::new(plugin, service, version);
+
+    let job_tag = unsafe { job_tag.as_str() };
+
+    let res = crate::plugin::background::register_background_job_cancellation_token(
+        service_id,
+        job_tag.into(),
+        token,
+    );
+
+    if let Err(e) = res {
+        e.set_last();
+        return -1;
+    }
+
+    0
+}
+
+/// Set the user-specified timeout that picodata uses when shutting down all
+/// background jobs of a service, e.g. on service shutdown.
+#[no_mangle]
+pub extern "C" fn pico_ffi_background_set_jobs_shutdown_timeout(
+    plugin: FfiSafeStr,
+    service: FfiSafeStr,
+    version: FfiSafeStr,
+    timeout: f64,
+) -> i32 {
+    // SAFETY: data outlives this function call
+    let plugin = unsafe { plugin.as_str() };
+    let service = unsafe { service.as_str() };
+    let version = unsafe { version.as_str() };
+    let service_id = ServiceId::new(plugin, service, version);
+    crate::plugin::background::set_jobs_shutdown_timeout(
+        service_id,
+        Duration::from_secs_f64(timeout),
+    );
+
+    0
+}
diff --git a/src/plugin/manager.rs b/src/plugin/manager.rs
index 6495ec20d5ebadbeafdf468bcc5198e209960467..9ac92d4a9de8f38c4e029d4955b6aa0430009c84 100644
--- a/src/plugin/manager.rs
+++ b/src/plugin/manager.rs
@@ -1,12 +1,13 @@
 use crate::config::PicodataConfig;
 use crate::info::PICODATA_VERSION;
+use crate::plugin::background;
 use crate::plugin::rpc;
 use crate::plugin::LibraryWrapper;
 use crate::plugin::PluginError::{PluginNotFound, ServiceCollision};
 use crate::plugin::ServiceState;
 use crate::plugin::{
     remove_routes, replace_routes, topology, Manifest, PluginAsyncEvent, PluginCallbackError,
-    PluginError, PluginEvent, PluginIdentifier, Result,
+    PluginError, PluginIdentifier, Result,
 };
 use crate::schema::{PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey};
 use crate::storage::Clusterwide;
@@ -18,12 +19,14 @@ use crate::version::Version;
 use crate::{tlog, traft, warn_or_panic};
 use abi_stable::derive_macro_reexports::{RErr, RResult, RSlice};
 use abi_stable::std_types::RStr;
-use picodata_plugin::background::{Error, InternalGlobalWorkerManager};
+use picodata_plugin::background::FfiBackgroundJobCancellationToken;
 use picodata_plugin::error_code::ErrorCode::PluginError as PluginErrorCode;
+use picodata_plugin::metrics::FfiMetricsHandler;
 use picodata_plugin::plugin::interface::FnServiceRegistrar;
 use picodata_plugin::plugin::interface::ServiceId;
 use picodata_plugin::plugin::interface::{PicoContext, ServiceRegistry};
 use picodata_plugin::util::DisplayErrorLocation;
+use smol_str::SmolStr;
 use std::collections::HashMap;
 use std::fs;
 use std::fs::ReadDir;
@@ -74,6 +77,28 @@ struct PluginState {
     version: String,
 }
 
+impl PluginState {
+    fn new(version: String) -> Self {
+        Self {
+            version,
+            services: vec![],
+        }
+    }
+
+    fn remove_service(&mut self, service: &str) -> Option<Rc<ServiceState>> {
+        let index = self
+            .services
+            .iter()
+            .position(|svc| svc.id.service == service)?;
+        let service = self.services.swap_remove(index);
+        Some(service)
+    }
+}
+
+type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>;
+type BackgroundJobCancellationTokenMap =
+    HashMap<ServiceId, Vec<(SmolStr, FfiBackgroundJobCancellationToken)>>;
+
 pub struct PluginManager {
     /// List of pairs (plugin name -> plugin state).
     ///
@@ -86,8 +111,16 @@ pub struct PluginManager {
 
     /// Queue of async events.
     events_queue: Option<fiber::channel::Channel<PluginAsyncEvent>>,
+
+    /// Plugin-defined metrics callbacks.
+    pub(crate) metrics_handlers: fiber::Mutex<MetricsHandlerMap>,
+
+    pub(crate) background_job_cancellation_tokens: fiber::Mutex<BackgroundJobCancellationTokenMap>,
+
     /// Fiber for handling async events; the handlers need it to avoid yields.
-    _loop: Option<fiber::JoinHandle<'static, ()>>,
+    /// Id is only stored for debugging.
+    #[allow(dead_code)]
+    async_event_fiber_id: fiber::FiberId,
 }
 
 impl PluginManager {
@@ -100,18 +133,20 @@ impl PluginManager {
         let options = WorkerOptions::default();
         let pool = ConnectionPool::new(storage, options);
 
-        let r#loop = Loop::new(plugins.clone());
-        let defer_events_fiber = fiber::Builder::new()
+        let async_event_fiber_id = fiber::Builder::new()
             .name("plugin_manager_loop")
-            .func(move || r#loop.run(tx))
-            .defer()
-            .expect("Plugin manager fiber should not fail");
+            .func(move || plugin_manager_async_event_loop(tx))
+            .defer_non_joinable()
+            .expect("Plugin manager fiber should not fail")
+            .expect("fiber id is supported");
 
         Self {
             plugins,
             pool,
             events_queue: Some(rx),
-            _loop: defer_events_fiber.into(),
+            metrics_handlers: Default::default(),
+            background_job_cancellation_tokens: Default::default(),
+            async_event_fiber_id,
         }
     }
 
@@ -288,10 +323,9 @@ impl PluginManager {
             },
         );
 
-        self.handle_event_sync(PluginEvent::PluginLoad {
-            ident,
-            service_defs: &service_defs,
-        })
+        self.handle_plugin_load(ident, &service_defs)?;
+
+        Ok(())
     }
 
     /// Check the possibility of loading plugin into instance.
@@ -307,7 +341,7 @@ impl PluginManager {
     }
 
     /// Load and start all enabled plugins and services that must be loaded.
-    fn handle_instance_online(&self) -> Result<()> {
+    pub(crate) fn handle_instance_online(&self) -> Result<()> {
         let node = node::global().expect("node must be already initialized");
 
         let instance_name = node
@@ -352,23 +386,41 @@ impl PluginManager {
         Ok(())
     }
 
+    fn handle_async_event(&self, event: PluginAsyncEvent) -> traft::Result<()> {
+        match event {
+            PluginAsyncEvent::ServiceConfigurationUpdated {
+                ident,
+                service,
+                old_raw,
+                new_raw,
+            } => {
+                let res = self.handle_config_updated(&ident, &service, &old_raw, &new_raw);
+                if let Err(e) = res {
+                    tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}");
+                }
+            }
+            PluginAsyncEvent::PluginDisabled { name } => {
+                let res = self.handle_plugin_disabled(&name);
+                if let Err(e) = res {
+                    tlog!(Error, "plugin {name} remove error: {e}");
+                }
+            }
+        }
+
+        Ok(())
+    }
+
     /// Call `on_stop` callback at services and remove plugin from managed.
-    fn handle_plugin_disabled(
-        plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
-        plugin_name: &str,
-    ) -> traft::Result<()> {
+    fn handle_plugin_disabled(&self, plugin_name: &str) -> traft::Result<()> {
         let node = node::global().expect("node must be already initialized");
         let ctx = context_from_node(node);
 
-        let plugin = {
-            let mut lock = plugins.lock();
-            lock.remove(plugin_name)
-        };
+        let plugin = self.plugins.lock().remove(plugin_name);
 
         if let Some(plugin_state) = plugin {
             // stop all background jobs and remove metrics first
-            stop_background_jobs(&plugin_state.services);
-            remove_metrics(&plugin_state.services);
+            self.stop_background_jobs(&plugin_state.services);
+            self.remove_metrics_handlers(&plugin_state.services);
 
             for service in plugin_state.services.iter() {
                 stop_service(service, &ctx);
@@ -379,7 +431,7 @@ impl PluginManager {
     }
 
     /// Stop all plugin services.
-    fn handle_instance_shutdown(&self) {
+    pub(crate) fn handle_instance_shutdown(&self) {
         let node = node::global().expect("node must be already initialized");
         let ctx = context_from_node(node);
 
@@ -387,8 +439,8 @@ impl PluginManager {
         let services_to_stop = plugins.values().flat_map(|state| &state.services);
 
         // stop all background jobs and remove metrics first
-        stop_background_jobs(services_to_stop.clone());
-        remove_metrics(services_to_stop.clone());
+        self.stop_background_jobs(services_to_stop.clone());
+        self.remove_metrics_handlers(services_to_stop.clone());
 
         for service in services_to_stop {
             stop_service(service, &ctx);
@@ -397,7 +449,7 @@ impl PluginManager {
 
     /// Call `on_config_change` at services. Poison services if error at callbacks happens.
     fn handle_config_updated(
-        plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
+        &self,
         plugin_identity: &PluginIdentifier,
         service: &str,
         old_cfg_raw: &[u8],
@@ -408,7 +460,7 @@ impl PluginManager {
         let storage = &node.storage;
 
         let maybe_service = {
-            let lock = plugins.lock();
+            let lock = self.plugins.lock();
             lock.get(&plugin_identity.name)
                 .and_then(|plugin_state| {
                     plugin_state
@@ -479,7 +531,7 @@ impl PluginManager {
     }
 
     /// Call `on_leader_change` on services. Poison the services if an error occurs in a callback.
-    fn handle_rs_leader_change(&self) -> traft::Result<()> {
+    pub(crate) fn handle_rs_leader_change(&self) -> traft::Result<()> {
         let node = node::global()?;
         let mut ctx = context_from_node(node);
         let storage = &node.storage;
@@ -581,6 +633,9 @@ impl PluginManager {
             ));
         };
 
+        let service = Rc::new(new_service);
+        let id = &service.id;
+
         // call `on_start` callback
         let mut ctx = context_from_node(node);
         let cfg = node
@@ -588,13 +643,21 @@ impl PluginManager {
             .plugin_config
             .get_by_entity_as_mp(plugin_ident, &service_defs[0].name)?;
         let cfg_raw = rmp_serde::encode::to_vec_named(&cfg).expect("out of memory");
-        context_set_service_info(&mut ctx, &new_service);
 
-        let id = &new_service.id;
+        // add the service to the plugins map before calling `on_start`, because
+        // the callback may attempt to reference it indirectly through there
+        let mut plugins = self.plugins.lock();
+        let plugin = plugins
+            .entry(plugin_def.name)
+            .or_insert_with(|| PluginState::new(plugin_ident.version.to_string()));
+        plugin.services.push(service.clone());
+
         #[rustfmt::skip]
         tlog!(Debug, "calling {id}.on_start");
 
-        let mut guard = new_service.volatile_state.lock();
+        context_set_service_info(&mut ctx, &service);
+
+        let mut guard = service.volatile_state.lock();
         let res = guard.inner.on_start(&ctx, RSlice::from(cfg_raw.as_slice()));
         // Release the lock
         drop(guard);
@@ -605,21 +668,16 @@ impl PluginManager {
             #[rustfmt::skip]
             tlog!(Error, "plugin callback {id}.on_start error: {loc}{error}");
 
+            // Remove the service we just added, because its `on_start` callback failed
+            plugin.remove_service(&id.service);
+
             return Err(PluginError::Callback(PluginCallbackError::OnStart(error)));
         }
 
-        // append new service to a plugin
-        let mut plugins = self.plugins.lock();
-        let entry = plugins.entry(plugin_def.name).or_insert(PluginState {
-            services: vec![],
-            version: plugin_ident.version.to_string(),
-        });
-        entry.services.push(Rc::new(new_service));
-
         Ok(())
     }
 
-    fn handle_service_disabled(&self, plugin_ident: &PluginIdentifier, service: &str) {
+    pub(crate) fn handle_service_disabled(&self, plugin_ident: &PluginIdentifier, service: &str) {
         let node = node::global().expect("node must be already initialized");
         let ctx = context_from_node(node);
 
@@ -632,19 +690,14 @@ impl PluginManager {
             return;
         }
 
-        let Some(svc_idx) = state
-            .services
-            .iter()
-            .position(|svc| svc.id.service == service)
-        else {
+        let Some(service_to_del) = state.remove_service(service) else {
             return;
         };
-        let service_to_del = state.services.swap_remove(svc_idx);
         drop(plugins);
 
         // stop all background jobs and remove metrics first
-        stop_background_jobs(&[service_to_del.clone()]);
-        remove_metrics(&[service_to_del.clone()]);
+        self.stop_background_jobs(&[service_to_del.clone()]);
+        self.remove_metrics_handlers(&[service_to_del.clone()]);
 
         // call `on_stop` callback and drop service
         stop_service(&service_to_del, &ctx);
@@ -704,29 +757,8 @@ impl PluginManager {
         Ok(())
     }
 
-    fn handle_plugin_load_error(&self, plugin: &str) -> Result<()> {
-        let node = node::global().expect("must be initialized");
-        let ctx = context_from_node(node);
-
-        let maybe_plugin_state = {
-            let mut lock = self.plugins.lock();
-            lock.remove(plugin)
-        };
-
-        if let Some(plugin_state) = maybe_plugin_state {
-            // stop all background jobs and remove metrics first
-            stop_background_jobs(&plugin_state.services);
-            remove_metrics(&plugin_state.services);
-
-            for service in plugin_state.services.iter() {
-                stop_service(&service, &ctx);
-            }
-        }
-        Ok(())
-    }
-
     /// Run the user-defined validation of the service configuration.
-    fn handle_before_service_reconfigured(
+    pub(crate) fn handle_before_service_reconfigured(
         &self,
         plugin_ident: &PluginIdentifier,
         service_name: &str,
@@ -806,76 +838,82 @@ impl PluginManager {
         ))
     }
 
-    /// Handle picodata event by plugin system.
-    /// Any event may be handled by any count of plugins that are interested in this event.
-    /// Return error if any of service callbacks return error.
-    ///
-    /// # Arguments
-    ///
-    /// * `event`: upcoming event
-    pub fn handle_event_sync(&self, event: PluginEvent) -> Result<()> {
-        match event {
-            PluginEvent::InstanceOnline => {
-                self.handle_instance_online()?;
-            }
-            PluginEvent::InstanceShutdown => {
-                self.handle_instance_shutdown();
-            }
-            PluginEvent::PluginLoad {
-                ident,
-                service_defs,
-            } => {
-                self.handle_plugin_load(ident, service_defs)?;
-            }
-            PluginEvent::PluginLoadError { name } => {
-                self.handle_plugin_load_error(name)?;
-            }
-            PluginEvent::BeforeServiceConfigurationUpdated {
-                ident,
-                service,
-                new_raw,
-            } => {
-                self.handle_before_service_reconfigured(ident, service, new_raw)?;
-            }
-            PluginEvent::InstanceDemote | PluginEvent::InstancePromote => {
-                if let Err(e) = self.handle_rs_leader_change() {
-                    tlog!(Error, "on_leader_change error: {e}");
-                }
-            }
-            PluginEvent::ServiceEnabled { ident, service } => {
-                self.handle_service_enabled(ident, service)?;
-            }
-            PluginEvent::ServiceDisabled { ident, service } => {
-                self.handle_service_disabled(ident, service);
-            }
-        }
-
-        Ok(())
-    }
-
     /// Queue an event for deferred execution.
     /// May be called from within transactions because it never yields.
     ///
     /// # Arguments
     ///
     /// * `event`: queued event
-    pub fn handle_event_async(&self, event: PluginAsyncEvent) -> Result<()> {
+    pub fn add_async_event_to_queue(&self, event: PluginAsyncEvent) -> Result<()> {
         self.events_queue
             .as_ref()
             .expect("infallible")
             .try_send(event)
             .map_err(|_| PluginError::AsyncEventQueueFull)
     }
+
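+    /// Unregister the metrics handlers of the given services, if any were registered.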
+    fn remove_metrics_handlers<'a>(
+        &self,
+        services: impl IntoIterator<Item = &'a Rc<ServiceState>>,
+    ) {
+        let mut handlers = self.metrics_handlers.lock();
+        for service in services {
+            let id = &service.id;
+            let handler = handlers.remove(id);
+            if handler.is_some() {
+                tlog!(Info, "unregistered metrics handler for `{id}`");
+            }
+        }
+    }
+
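+    /// Cancel all background jobs of the given services, then wait for them to
+    /// finish, each service within its own shutdown timeout (5 seconds unless
+    /// the service configured its own timeout).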
+    fn stop_background_jobs<'a>(&self, services: impl IntoIterator<Item = &'a Rc<ServiceState>>) {
+        let mut guard = self.background_job_cancellation_tokens.lock();
+
+        let mut service_jobs = vec![];
+        for service in services {
+            let Some(jobs) = guard.remove(&service.id) else {
+                continue;
+            };
+
+            service_jobs.push((service, jobs));
+        }
+
+        // Release the lock.
+        drop(guard);
+
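+        // First signal cancellation to every job of every affected service.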
+        for (service, jobs) in &service_jobs {
+            background::cancel_jobs(service, jobs);
+        }
+
+        const DEFAULT_BACKGROUND_JOB_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
+
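+        // Then wait for the jobs to finish, each service within its own deadline.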
+        let start = fiber::clock();
+        for (service, jobs) in &service_jobs {
+            let timeout = service.background_job_shutdown_timeout.get();
+            let timeout = timeout.unwrap_or(DEFAULT_BACKGROUND_JOB_SHUTDOWN_TIMEOUT);
+            let deadline = start.saturating_add(timeout);
+
+            background::wait_jobs_finished(service, jobs, deadline);
+        }
+    }
+
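+    /// Get the state of the given service, if it is currently loaded on this instance.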
+    pub fn get_service_state(&self, id: &ServiceId) -> Option<Rc<ServiceState>> {
+        let plugins = self.plugins.lock();
+        let plugin = plugins.get(id.plugin())?;
+        for service in &plugin.services {
+            if &service.id == id {
+                return Some(service.clone());
+            }
+        }
+
+        None
+    }
 }
 
 impl Drop for PluginManager {
     fn drop(&mut self) {
         let event_queue = self.events_queue.take();
         event_queue.unwrap().close();
-
-        if let Some(r#loop) = self._loop.take() {
-            r#loop.join();
-        }
     }
 }
 
@@ -910,87 +948,12 @@ fn stop_service(service: &ServiceState, context: &PicoContext) {
 }
 
 /// Plugin manager inner loop, used for handling async events (must be run in a separate fiber).
-struct Loop {
-    /// List of pairs (plugin name -> plugin state)
-    ///
-    /// There are two mutex here to avoid the situation when one long service callback
-    /// will block other services.
-    plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>,
-}
-
-impl Loop {
-    pub fn new(plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>) -> Self {
-        Self { plugins }
-    }
-
-    fn handle_event(&self, event: PluginAsyncEvent) -> traft::Result<()> {
-        match event {
-            PluginAsyncEvent::ServiceConfigurationUpdated {
-                ident,
-                service,
-                old_raw,
-                new_raw,
-            } => {
-                let plugins = self.plugins.clone();
-                if let Err(e) = PluginManager::handle_config_updated(
-                    plugins, &ident, &service, &old_raw, &new_raw,
-                ) {
-                    tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}");
-                }
-            }
-            PluginAsyncEvent::PluginDisabled { name } => {
-                let plugins = self.plugins.clone();
-                if let Err(e) = PluginManager::handle_plugin_disabled(plugins, &name) {
-                    tlog!(Error, "plugin {name} remove error: {e}");
-                }
-            }
+fn plugin_manager_async_event_loop(event_chan: fiber::channel::Channel<PluginAsyncEvent>) {
+    while let Some(event) = event_chan.recv() {
+        let node =
+            node::global().expect("initialized by the time plugin async events start happening");
+        if let Err(e) = node.plugin_manager.handle_async_event(event) {
+            tlog!(Error, "plugin async event handler error: {e}");
         }
-
-        Ok(())
-    }
-
-    fn run(&self, event_chan: fiber::channel::Channel<PluginAsyncEvent>) {
-        while let Some(event) = event_chan.recv() {
-            if let Err(e) = self.handle_event(event) {
-                tlog!(Error, "plugin async event handler error: {e}");
-            }
-        }
-    }
-}
-
-fn stop_background_jobs<'a>(services: impl IntoIterator<Item = &'a Rc<ServiceState>>) {
-    const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5);
-
-    let mut service_to_unregister = vec![];
-    let mut max_shutdown_timeout = DEFAULT_SHUTDOWN_TIMEOUT;
-    for service in services {
-        if let Some(timeout) =
-            InternalGlobalWorkerManager::instance().get_shutdown_timeout(&service.id)
-        {
-            if max_shutdown_timeout < timeout {
-                max_shutdown_timeout = timeout;
-            }
-        }
-        service_to_unregister.push(&service.id);
-    }
-
-    let mut fibers = vec![];
-    for svc_id in service_to_unregister {
-        fibers.push(fiber::start(move || {
-            if let Err(Error::PartialCompleted(expected, completed)) =
-                InternalGlobalWorkerManager::instance().unregister_service(&svc_id, max_shutdown_timeout)
-            {
-                tlog!(Warning, "Not all jobs for service {svc_id} was completed on time, expected: {expected}, completed: {completed}");
-            }
-        }));
-    }
-    for fiber in fibers {
-        fiber.join();
-    }
-}
-
-fn remove_metrics<'a>(plugins: impl IntoIterator<Item = &'a Rc<ServiceState>>) {
-    for service in plugins {
-        crate::plugin::metrics::unregister_metrics_handler(&service.id)
     }
 }
diff --git a/src/plugin/metrics.rs b/src/plugin/metrics.rs
index d9df9cfb95a673d40ec3c3c89f7a10c19fbd4095..819507238f7a89e1a6c3dd6cb22fc3e693731a4a 100644
--- a/src/plugin/metrics.rs
+++ b/src/plugin/metrics.rs
@@ -1,32 +1,11 @@
 use crate::tlog;
+use crate::traft::node;
 use picodata_plugin::metrics::FfiMetricsHandler;
-use picodata_plugin::plugin::interface::ServiceId;
 use picodata_plugin::util::RegionGuard;
 use std::collections::hash_map::Entry;
-use std::collections::HashMap;
 use std::rc::Rc;
 use tarantool::error::BoxError;
 use tarantool::error::TarantoolErrorCode;
-use tarantool::fiber::mutex::MutexGuard;
-use tarantool::fiber::Mutex;
-
-static mut HANDLERS: Option<Mutex<MetricsHandlerMap>> = None;
-
-type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>;
-
-pub(crate) fn init_handlers() {
-    unsafe {
-        HANDLERS = Some(Mutex::new(HashMap::new()));
-    }
-}
-
-#[inline]
-fn handlers() -> MutexGuard<'static, MetricsHandlerMap> {
-    // SAFETY: global variable access: safe in tx thread.
-    let handlers = unsafe { HANDLERS.as_ref() };
-    let handlers = handlers.expect("should be initialized at startup");
-    handlers.lock()
-}
 
 pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxError> {
     let identifier = &handler.identifier;
@@ -50,7 +29,8 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr
         return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty"));
     }
 
-    let mut handlers = handlers();
+    let node = node::global()?;
+    let mut handlers = node.plugin_manager.metrics_handlers.lock();
     let service_id = identifier.service_id();
     let entry = handlers.entry(service_id);
 
@@ -76,20 +56,16 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr
     Ok(())
 }
 
-pub fn unregister_metrics_handler(service_id: &ServiceId) {
-    // SAFETY: global variable access: safe in tx thread.
-    let handler = handlers().remove(service_id);
-    if handler.is_some() {
-        tlog!(Info, "unregistered metrics handler for `{service_id}`");
-    }
-}
-
 pub fn get_plugin_metrics() -> String {
     let _guard = RegionGuard::new();
 
     let mut res = String::new();
 
-    let handlers = handlers();
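+    // If the node is not initialized yet, no metrics handlers could have been
+    // registered, so there are no plugin metrics to report.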
+    let Ok(node) = node::global() else {
+        return "".into();
+    };
+
+    let handlers = node.plugin_manager.metrics_handlers.lock();
     // Note: must first copy all references to a local vec, because the callbacks may yield.
     let handler_copies: Vec<_> = handlers.values().cloned().collect();
     // Release the lock.
diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs
index fc0b3d9fd8ee3c2bdc563ad793c1640de79c756f..978b9618d66437e24b25f4f2378361810694cef1 100644
--- a/src/plugin/mod.rs
+++ b/src/plugin/mod.rs
@@ -1,3 +1,4 @@
+pub mod background;
 mod ffi;
 pub mod lock;
 pub mod manager;
@@ -30,6 +31,7 @@ pub use picodata_plugin::plugin::interface::ServiceId;
 use picodata_plugin::plugin::interface::{ServiceBox, ValidatorBox};
 use rmpv::Value;
 use serde::{Deserialize, Serialize};
+use std::cell::Cell;
 use std::collections::HashMap;
 use std::fmt::{Display, Formatter};
 use std::fs::File;
@@ -142,6 +144,8 @@ type Result<T, E = PluginError> = std::result::Result<T, E>;
 pub struct ServiceState {
     pub id: ServiceId,
 
+    background_job_shutdown_timeout: Cell<Option<Duration>>,
+
     /// The dynamic state of the service. It is guarded by a fiber mutex,
     /// because we don't want to enter the plugin callbacks from concurrent fibers.
     volatile_state: fiber::Mutex<ServiceStateVolatile>,
@@ -157,6 +161,7 @@ impl ServiceState {
     ) -> Self {
         Self {
             id,
+            background_job_shutdown_timeout: Cell::new(None),
             volatile_state: fiber::Mutex::new(ServiceStateVolatile {
                 inner,
                 config_validator,
@@ -359,43 +364,6 @@ impl Manifest {
     }
 }
 
-/// Events that may be fired at picodata
-/// and which plugins should respond to.
-#[derive(Clone, PartialEq, Debug)]
-pub enum PluginEvent<'a> {
-    /// Picodata instance goes online.
-    InstanceOnline,
-    /// Picodata instance shutdown (shutdown trigger is called).
-    InstanceShutdown,
-    /// New plugin load at instance.
-    PluginLoad {
-        ident: &'a PluginIdentifier,
-        service_defs: &'a [ServiceDef],
-    },
-    /// Error occurred while the plugin loaded.
-    PluginLoadError { name: &'a str },
-    /// Request for update service configuration received.
-    BeforeServiceConfigurationUpdated {
-        ident: &'a PluginIdentifier,
-        service: &'a str,
-        new_raw: &'a [u8],
-    },
-    /// Instance demote.
-    InstanceDemote,
-    /// Instance promote as a replicaset leader.
-    InstancePromote,
-    /// Plugin service enabled at instance.
-    ServiceEnabled {
-        ident: &'a PluginIdentifier,
-        service: &'a str,
-    },
-    /// Plugin service disabled at instance.
-    ServiceDisabled {
-        ident: &'a PluginIdentifier,
-        service: &'a str,
-    },
-}
-
 /// Events that may be fired at picodata
 /// and which plugins should respond to *asynchronously*.
 ///
@@ -1227,12 +1195,10 @@ pub fn change_config_atom(
 
             let new_cfg_raw =
                 rmp_serde::to_vec_named(&Value::Map(current_cfg)).expect("out of memory");
-            let event = PluginEvent::BeforeServiceConfigurationUpdated {
-                ident,
-                service,
-                new_raw: new_cfg_raw.as_slice(),
-            };
-            node.plugin_manager.handle_event_sync(event)?;
+
+            node.plugin_manager
+                .handle_before_service_reconfigured(ident, service, &new_cfg_raw)?;
+
             service_config_part.push((service.to_string(), kv));
         }
 
diff --git a/src/rpc/disable_service.rs b/src/rpc/disable_service.rs
index 3c2c25ab414b5a45108d17eb3523bafe81d346b3..4eebe347ac080e0e128fd96e828ac280e6a521f0 100644
--- a/src/rpc/disable_service.rs
+++ b/src/rpc/disable_service.rs
@@ -1,4 +1,3 @@
-use crate::plugin::PluginEvent;
 use crate::plugin::PluginOp;
 use crate::traft::error::Error;
 use crate::traft::node;
@@ -29,10 +28,7 @@ crate::define_rpc_request! {
         };
 
         // the reaction to `ServiceDisabled` is idempotent, so no errors can occur here
-        _ = node.plugin_manager.handle_event_sync(PluginEvent::ServiceDisabled {
-            ident: plugin,
-            service,
-        });
+        node.plugin_manager.handle_service_disabled(plugin, service);
 
         Ok(Response {})
     }
diff --git a/src/rpc/enable_all_plugins.rs b/src/rpc/enable_all_plugins.rs
index 1490dcea82af178b4b4db354ea6b95ed8ebc5a03..c6936045de73beef78db9d3ce42cff7d5039c799 100644
--- a/src/rpc/enable_all_plugins.rs
+++ b/src/rpc/enable_all_plugins.rs
@@ -1,4 +1,3 @@
-use crate::plugin::PluginEvent;
 use crate::tlog;
 use crate::traft::node;
 use crate::traft::{RaftIndex, RaftTerm};
@@ -20,7 +19,7 @@ crate::define_rpc_request! {
         node.wait_index(req.applied, req.timeout)?;
         node.status().check_term(req.term)?;
 
-        let result = node.plugin_manager.handle_event_sync(PluginEvent::InstanceOnline);
+        let result = node.plugin_manager.handle_instance_online();
         if let Err(e) = result {
             tlog!(Error, "failed initializing plugin system: {e}");
             return Err(e.into());
diff --git a/src/rpc/enable_service.rs b/src/rpc/enable_service.rs
index 80ea617bc23e65fab1ad236e1f79eed4de7ab881..f77d219f1dfc97deee23559e817366ebcfa32619 100644
--- a/src/rpc/enable_service.rs
+++ b/src/rpc/enable_service.rs
@@ -1,4 +1,3 @@
-use crate::plugin::PluginEvent;
 use crate::plugin::PluginOp;
 use crate::traft::error::Error;
 use crate::traft::node;
@@ -30,10 +29,7 @@ crate::define_rpc_request! {
             return Err(Error::other("found unexpected plugin operation expected AlterServiceTiers, found {plugin_op:?}"));
         };
 
-        node.plugin_manager.handle_event_sync(PluginEvent::ServiceEnabled {
-            ident: plugin,
-            service,
-        })?;
+        node.plugin_manager.handle_service_enabled(plugin, service)?;
 
         Ok(Response {})
     }
diff --git a/src/rpc/replication.rs b/src/rpc/replication.rs
index d51c6c223aa36c0529a5a2083c8e344cd1a41ff3..f2be37bd09ed65e00f2f14560224ffc31d5ab6de 100644
--- a/src/rpc/replication.rs
+++ b/src/rpc/replication.rs
@@ -37,7 +37,6 @@ use crate::governor;
 #[allow(unused_imports)]
 use crate::governor::plan;
 use crate::pico_service::pico_service_password;
-use crate::plugin::PluginEvent;
 #[allow(unused_imports)]
 use crate::rpc;
 use crate::schema::PICO_SERVICE_USER_NAME;
@@ -155,6 +154,8 @@ fn is_read_only() -> Result<bool> {
 ///
 /// Returns errors in the following cases: See implementation.
 fn promote_to_master() -> Result<()> {
+    let node = node::global()?;
+
     // XXX: Currently we just change the box.cfg.read_only option of the
     // instance but at some point we will implement support for
     // tarantool synchronous transactions then this operation will probably
@@ -175,9 +176,10 @@ fn promote_to_master() -> Result<()> {
 
     if was_read_only {
         // errors are ignored because they are already handled by the plugin manager itself
-        _ = node::global()?
-            .plugin_manager
-            .handle_event_sync(PluginEvent::InstancePromote);
+        let res = node.plugin_manager.handle_rs_leader_change();
+        if let Err(e) = res {
+            tlog!(Error, "on_leader_change error: {e}");
+        }
     }
 
     Ok(())
@@ -202,7 +204,10 @@ crate::define_rpc_request! {
 
         if !was_read_only {
             // errors are ignored because they are already handled by the plugin manager itself
-            _ = node::global()?.plugin_manager.handle_event_sync(PluginEvent::InstanceDemote);
+            let res = node.plugin_manager.handle_rs_leader_change();
+            if let Err(e) = res {
+                tlog!(Error, "on_leader_change error: {e}");
+            }
         }
 
         let vclock = Vclock::current();
diff --git a/src/traft/node.rs b/src/traft/node.rs
index d1d29f43b64de4de20485b9fa42dfa2468297c48..1d2c7fc56c5a6b541c2db387100b104231dfecf2 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -1331,7 +1331,7 @@ impl NodeImpl {
 
                 if let Err(e) = self
                     .plugin_manager
-                    .handle_event_async(PluginAsyncEvent::PluginDisabled { name: ident.name })
+                    .add_async_event_to_queue(PluginAsyncEvent::PluginDisabled { name: ident.name })
                 {
                     tlog!(Warning, "async plugin event: {e}");
                 }
@@ -1431,7 +1431,7 @@ impl NodeImpl {
                         let old_cfg_raw =
                             rmp_serde::encode::to_vec_named(&old_cfg).expect("out of memory");
 
-                        if let Err(e) = self.plugin_manager.handle_event_async(
+                        if let Err(e) = self.plugin_manager.add_async_event_to_queue(
                             PluginAsyncEvent::ServiceConfigurationUpdated {
                                 ident: ident.clone(),
                                 service: svc.name,
diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py
index 83e2feb8c75a9f5c0b1c2d154eb7032ffbed9b60..e1a62e4556a6ade6e93b4950d23caeef4cb841fc 100644
--- a/test/int/test_plugin.py
+++ b/test/int/test_plugin.py
@@ -2093,11 +2093,11 @@ def test_set_topology_with_error_on_start(cluster: Cluster):
 # -------------------------- RPC SDK tests -------------------------------------
 
 
-def make_context(override: dict[Any, Any] = {}) -> dict[Any, Any]:
+def make_context(override: dict[Any, Any] = {}, service=SERVICE_W_RPC) -> dict[Any, Any]:
     context = {
         REQUEST_ID: uuid.uuid4(),
         PLUGIN_NAME: _PLUGIN_W_SDK,
-        SERVICE_NAME: SERVICE_W_RPC,
+        SERVICE_NAME: service,
         PLUGIN_VERSION: _PLUGIN_VERSION_1,
         "timeout": 5.0,
     }
@@ -2762,28 +2762,36 @@ def test_sdk_metrics(instance: Instance):
 def test_sdk_background(cluster: Cluster):
     [i1] = cluster.deploy(instance_count=1)
 
+    plugin = _PLUGIN_W_SDK
+    [service] = _PLUGIN_W_SDK_SERVICES
+
     install_and_enable_plugin(
         i1,
-        _PLUGIN_W_SDK,
-        _PLUGIN_W_SDK_SERVICES,
+        plugin,
+        [service],
         migrate=True,
         default_config={"test_type": "background"},
     )
 
+    # Run internal tests
+    context = make_context(service=service)
+    i1.call(".proc_rpc_dispatch", "/test_cancel_tagged_basic", b"", context)
+    i1.call(".proc_rpc_dispatch", "/test_cancel_tagged_timeout", b"", context)
+
     # assert that job is working
     Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_running", i1)
 
     # assert that job ends after plugin disabled
-    i1.call("pico.disable_plugin", _PLUGIN_W_SDK, _PLUGIN_VERSION_1)
+    i1.call("pico.disable_plugin", plugin, "0.1.0")
 
     Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_stopped", i1)
 
     # run again
-    i1.call("pico.enable_plugin", _PLUGIN_W_SDK, _PLUGIN_VERSION_1)
+    i1.call("pico.enable_plugin", plugin, "0.1.0")
     Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_running", i1)
 
     # now shutdown 1 and check that job ended
-    i1.sql(f"ALTER PLUGIN {_PLUGIN_W_SDK} {_PLUGIN_VERSION_1} SET    {_PLUGIN_W_SDK_SERVICES[0]}.test_type = 'no_test'")
+    i1.sql(f"ALTER PLUGIN {plugin} 0.1.0 SET {service}.test_type = 'no_test'")
 
     i1.restart()
     i1.wait_online()
diff --git a/test/testplug/src/lib.rs b/test/testplug/src/lib.rs
index 581d50794ae8a96febe3b30ffa44c79eb16c126b..975ebadbce72930f0f61912450cdfce100de6242 100644
--- a/test/testplug/src/lib.rs
+++ b/test/testplug/src/lib.rs
@@ -422,8 +422,32 @@ impl Service for Service3 {
                     save_persisted_data("background_job_stopped");
                 }
 
-                let wm = ctx.worker_manager();
-                wm.register_job(my_job).unwrap();
+                ctx.register_job(my_job).unwrap();
+
+                ctx.set_jobs_shutdown_timeout(Duration::from_secs(3));
+
+                ctx.cancel_tagged_jobs("no-such-tag", Duration::from_secs(10))
+                    .unwrap();
+
+                let service = ctx.make_service_id();
+                rpc::RouteBuilder::from(ctx)
+                    .path("/test_cancel_tagged_basic")
+                    .register(move |_, _| {
+                        background_tests::test_cancel_tagged_basic(&service);
+
+                        Ok(rpc::Response::empty())
+                    })
+                    .unwrap();
+
+                let service = ctx.make_service_id();
+                rpc::RouteBuilder::from(ctx)
+                    .path("/test_cancel_tagged_timeout")
+                    .register(move |_, _| {
+                        background_tests::test_cancel_tagged_timeout(&service);
+
+                        Ok(rpc::Response::empty())
+                    })
+                    .unwrap();
             }
             "no_test" => {}
             "metrics" => {
@@ -461,6 +485,71 @@ impl Service for Service3 {
     }
 }
 
+mod background_tests {
+    use picodata_plugin::background;
+    use picodata_plugin::plugin::interface::ServiceId;
+    use picodata_plugin::system::tarantool::fiber;
+    use std::sync::atomic::{AtomicU64, Ordering};
+    use std::time::Duration;
+
+    static _1_MS: Duration = Duration::from_millis(1);
+    static _10_MS: Duration = Duration::from_millis(10);
+    static _100_MS: Duration = Duration::from_millis(100);
+    static _200_MS: Duration = Duration::from_millis(200);
+
+    fn make_job(
+        counter: &'static AtomicU64,
+        iteration_duration: Duration,
+    ) -> impl Fn(background::CancellationToken) {
+        move |cancel_token: background::CancellationToken| {
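+            // Loop until the token is cancelled: `wait_timeout` keeps returning
+            // `Err` while the token has not been cancelled within the timeout.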
+            while cancel_token.wait_timeout(_1_MS).is_err() {
+                counter.fetch_add(1, Ordering::SeqCst);
+                fiber::sleep(iteration_duration);
+            }
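+            // Signal that cancellation was observed by resetting the counter.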
+            counter.store(0, Ordering::SeqCst);
+        }
+    }
+
+    pub fn test_cancel_tagged_basic(service: &ServiceId) {
+        static COUNTER_1: AtomicU64 = AtomicU64::new(0);
+        static COUNTER_2: AtomicU64 = AtomicU64::new(0);
+
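+        // Two jobs share the tag "j1" and a third one uses "j2", so cancelling
+        // by tag must only stop the matching jobs.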
+        background::register_tagged_job(service, make_job(&COUNTER_1, _1_MS), "j1").unwrap();
+        background::register_tagged_job(service, make_job(&COUNTER_1, _1_MS), "j1").unwrap();
+        background::register_tagged_job(service, make_job(&COUNTER_2, _1_MS), "j2").unwrap();
+
+        fiber::sleep(_10_MS);
+        assert!(COUNTER_1.load(Ordering::SeqCst) > 0);
+        assert!(COUNTER_2.load(Ordering::SeqCst) > 0);
+
+        background::cancel_jobs_by_tag(service, "j1", _10_MS).unwrap();
+
+        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
+        assert_ne!(COUNTER_2.load(Ordering::SeqCst), 0);
+
+        background::cancel_jobs_by_tag(service, "j2", _10_MS).unwrap();
+
+        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
+        assert_eq!(COUNTER_2.load(Ordering::SeqCst), 0);
+    }
+
+    pub fn test_cancel_tagged_timeout(service: &ServiceId) {
+        static COUNTER: AtomicU64 = AtomicU64::new(0);
+
+        let job_tag = "my tag";
+        // this job cannot stop within the 10 ms timeout (each iteration sleeps for 100 ms)
+        background::register_tagged_job(service, make_job(&COUNTER, _100_MS), job_tag).unwrap();
+        // but this job can
+        background::register_tagged_job(service, make_job(&COUNTER, _1_MS), job_tag).unwrap();
+
+        fiber::sleep(_10_MS);
+
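+        // Both jobs get cancelled, but only the fast one manages to finish
+        // within the 10 ms grace period.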
+        let result = background::cancel_jobs_by_tag(service, job_tag, _10_MS).unwrap();
+        assert_eq!(result.n_total, 2);
+        assert_eq!(result.n_timeouts, 1);
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// ServiceWithRpcTests
 ////////////////////////////////////////////////////////////////////////////////