diff --git a/Cargo.lock b/Cargo.lock index 23ca38d290b218c36ef227df97300dab0096888f..95bc856a51dfd47c4aa0ee3dffab89da13a31f7c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2769,6 +2769,7 @@ dependencies = [ "rmpv", "serde", "serde_json", + "smol_str", "tarantool 6.0.0", "tarolog 0.2.1", "thiserror", diff --git a/exports_picodata b/exports_picodata index 6ab7d8aba7147aa7ea3f4070dc876d2f11eb9907..b9d905e46364411abdad35d770eaa848e1fcf49f 100644 --- a/exports_picodata +++ b/exports_picodata @@ -1,3 +1,6 @@ +pico_ffi_background_cancel_jobs_by_tag +pico_ffi_background_register_job_cancellation_token +pico_ffi_background_set_jobs_shutdown_timeout pico_ffi_cas pico_ffi_instance_info pico_ffi_raft_info diff --git a/picodata-plugin/Cargo.toml b/picodata-plugin/Cargo.toml index 651f8ff74ea3a48dd8dbb325919ca1aa37fa115f..73b4628067780263d8ce9a13ebc704a578113579 100644 --- a/picodata-plugin/Cargo.toml +++ b/picodata-plugin/Cargo.toml @@ -17,6 +17,7 @@ rmp = "0.8" rmp-serde = "1.1" rmpv = { version = "1.0", features = ["with-serde"] } serde = "1.0" +smol_str = "0.2" tarantool = { path = "../tarantool/tarantool", version = "6.0", features = ["picodata", "test", "tokio_components"] } tarolog = "0.2.1" thiserror = "1.0" diff --git a/picodata-plugin/src/background.rs b/picodata-plugin/src/background.rs index ae7670d7d8f64f591c5d17f9eb7d72a9f3a52af7..0ce0382f310420cfc6796a53c235b9d72b7ea7b7 100644 --- a/picodata-plugin/src/background.rs +++ b/picodata-plugin/src/background.rs @@ -1,18 +1,159 @@ //! Background container for long live jobs. //! Background container guarantees job liveness across plugin life cycle. +use crate::internal::ffi; +#[allow(unused_imports)] +use crate::plugin::interface::PicoContext; use crate::plugin::interface::ServiceId; -use crate::system::tarantool::fiber; -use std::collections::HashMap; -use std::mem; -use std::rc::Rc; -use std::string::ToString; -use std::sync::OnceLock; +use crate::util::tarantool_error_to_box_error; +use crate::util::DisplayErrorLocation; +use std::cell::Cell; use std::time::Duration; +use tarantool::error::BoxError; +use tarantool::error::TarantoolErrorCode; +use tarantool::fiber; +use tarantool::fiber::FiberId; use tarantool::fiber::{Channel, RecvError}; -use tarantool::time::Instant; use tarantool::util::IntoClones; +/// Same as [`PicoContext::register_job`]. +#[track_caller] +pub fn register_job<F>(service_id: &ServiceId, job: F) -> Result<(), BoxError> +where + F: FnOnce(CancellationToken) + 'static, +{ + let loc = std::panic::Location::caller(); + let tag = format!("{}:{}", loc.file(), loc.line()); + + register_tagged_job(service_id, job, &tag) +} + +/// Same as [`PicoContext::register_tagged_job`]. 
+#[allow(deprecated)] +pub fn register_tagged_job<F>(service_id: &ServiceId, job: F, tag: &str) -> Result<(), BoxError> +where + F: FnOnce(CancellationToken) + 'static, +{ + let (token, handle) = CancellationToken::new(); + let finish_channel = handle.finish_channel.clone(); + + let fiber_id = fiber::Builder::new() + .name(tag) + .func(move || { + job(token); + // send shutdown signal to the waiter side + _ = finish_channel.send(()); + }) + .start_non_joinable() + .map_err(tarantool_error_to_box_error)?; + + let plugin = &service_id.plugin; + let service = &service_id.service; + let version = &service_id.version; + + let token = FfiBackgroundJobCancellationToken::new( + fiber_id, + handle.cancel_channel, + handle.finish_channel, + ); + register_background_job_cancellation_token(plugin, service, version, tag, token)?; + + Ok(()) +} + +fn register_background_job_cancellation_token( + plugin: &str, + service: &str, + version: &str, + job_tag: &str, + token: FfiBackgroundJobCancellationToken, +) -> Result<(), BoxError> { + // SAFETY: safe as long as picodata version is compatible + let rc = unsafe { + ffi::pico_ffi_background_register_job_cancellation_token( + plugin.into(), + service.into(), + version.into(), + job_tag.into(), + token, + ) + }; + + if rc != 0 { + return Err(BoxError::last()); + } + + Ok(()) +} + +//////////////////////////////////////////////////////////////////////////////// +// cancel_jobs_by_tag +//////////////////////////////////////////////////////////////////////////////// + +/// Outcome of the request to cancel a set of background jobs. +#[derive(Clone, Copy, Default, Debug, PartialEq, Eq, Hash)] +#[repr(C)] +pub struct JobCancellationResult { + /// Attempted to cancel this many background jobs. + pub n_total: u32, + /// This many jobs didn't finish in time. + pub n_timeouts: u32, +} + +impl JobCancellationResult { + #[inline(always)] + pub fn new(n_total: u32, n_timeouts: u32) -> Self { + Self { + n_total, + n_timeouts, + } + } +} + +/// Same as [`PicoContext::cancel_tagged_jobs`]. +#[inline(always)] +pub fn cancel_jobs_by_tag( + service_id: &ServiceId, + job_tag: &str, + timeout: Duration, +) -> Result<JobCancellationResult, BoxError> { + cancel_background_jobs_by_tag_inner( + &service_id.plugin, + &service_id.service, + &service_id.version, + job_tag, + timeout, + ) +} + +/// Same as [`PicoContext::cancel_tagged_jobs`]. +pub fn cancel_background_jobs_by_tag_inner( + plugin: &str, + service: &str, + version: &str, + job_tag: &str, + timeout: Duration, +) -> Result<JobCancellationResult, BoxError> { + let mut result = JobCancellationResult::default(); + // SAFETY: safe as long as picodata version is compatible + let rc = unsafe { + ffi::pico_ffi_background_cancel_jobs_by_tag( + plugin.into(), + service.into(), + version.into(), + job_tag.into(), + timeout.as_secs_f64(), + &mut result, + ) + }; + + if rc != 0 { + return Err(BoxError::last()); + } + + Ok(result) +} + #[derive(thiserror::Error, Debug)] pub enum Error { #[error("Some of jobs are not fully completed, expected: {0}, completed: {1}")] @@ -21,6 +162,10 @@ pub enum Error { CancellationTimeout, } +//////////////////////////////////////////////////////////////////////////////// +// CancellationToken +//////////////////////////////////////////////////////////////////////////////// + /// A token which can be used to signal a cancellation request to the job. #[non_exhaustive] #[derive(Debug)] @@ -32,6 +177,7 @@ impl CancellationToken { /// Create a cancellation token and cancellation token handle pair. 
/// User should use cancellation token for graceful shutdown their job. /// Cancellation token handle used by `picodata` for sending cancel signal to a user job. + #[allow(deprecated)] pub fn new() -> (CancellationToken, CancellationTokenHandle) { let (cancel_tx, cancel_rx) = Channel::new(1).into_clones(); ( @@ -63,11 +209,13 @@ impl CancellationToken { } #[derive(Debug)] +#[deprecated = "don't use this"] pub struct CancellationTokenHandle { cancel_channel: Channel<()>, finish_channel: Channel<()>, } +#[allow(deprecated)] impl CancellationTokenHandle { /// Cancel related job and return a backpressure channel. /// Caller should wait a message in the backpressure channel @@ -82,62 +230,22 @@ impl CancellationTokenHandle { } } -type Tag = String; -type TaggedJobs = HashMap<Tag, Vec<CancellationTokenHandle>>; -type UnTaggedJobs = Vec<CancellationTokenHandle>; - -fn cancel_handles(handles: Vec<CancellationTokenHandle>, deadline: Instant) -> usize { - let mut shutdown_channels = Vec::with_capacity(handles.len()); - - for handle in handles { - let shutdown_channel = handle.cancel(); - shutdown_channels.push(shutdown_channel); - } - - let mut completed_counter = 0; - for sd_chan in shutdown_channels { - // recalculate timeout at every iteration - let timeout = deadline.duration_since(Instant::now_fiber()); - - if sd_chan.recv_timeout(timeout).is_ok() { - completed_counter += 1; - } - } - - completed_counter -} +//////////////////////////////////////////////////////////////////////////////// +// Java-style API +//////////////////////////////////////////////////////////////////////////////// /// [`ServiceWorkerManager`] allows plugin services /// to create long-live jobs and manage their life cycle. #[derive(Clone, Debug)] #[non_exhaustive] pub struct ServiceWorkerManager { - jobs: Rc<fiber::Mutex<(TaggedJobs, UnTaggedJobs)>>, - shutdown_timeout: Rc<fiber::Mutex<Duration>>, + service_id: ServiceId, } impl ServiceWorkerManager { - fn register_job_inner<F>(&self, job: F, maybe_tag: Option<&str>) -> tarantool::Result<()> - where - F: FnOnce(CancellationToken) + 'static, - { - let (token, handle) = CancellationToken::new(); - let finish_chan = handle.finish_channel.clone(); - let mut jobs = self.jobs.lock(); - if let Some(tag) = maybe_tag { - jobs.0.entry(tag.to_string()).or_default().push(handle); - } else { - jobs.1.push(handle); - } - - fiber::Builder::new() - .func(move || { - job(token); - // send shutdown signal to the waiter side - _ = finish_chan.send(()); - }) - .start_non_joinable()?; - Ok(()) + #[inline(always)] + pub(crate) fn new(service_id: ServiceId) -> Self { + Self { service_id } } /// Add a new job to the execution. 
@@ -154,11 +262,10 @@ impl ServiceWorkerManager { /// /// ```no_run /// use std::time::Duration; - /// use picodata_plugin::background::{CancellationToken, InternalGlobalWorkerManager, ServiceWorkerManager}; - /// use picodata_plugin::plugin::interface::ServiceId; + /// use picodata_plugin::background::CancellationToken; /// - /// # let worker_manager = InternalGlobalWorkerManager::instance() - /// # .get_or_init_manager(ServiceId::new("any_plugin", "any_service", "0.1.0")); + /// # use picodata_plugin::background::ServiceWorkerManager; + /// # fn test(worker_manager: ServiceWorkerManager) { /// /// // this job will print "hello" every second, /// // and print "bye" after being canceled @@ -169,12 +276,18 @@ impl ServiceWorkerManager { /// println!("job cancelled, bye!") /// } /// worker_manager.register_job(hello_printer).unwrap(); + /// + /// # } /// ``` + #[track_caller] + #[inline(always)] pub fn register_job<F>(&self, job: F) -> tarantool::Result<()> where F: FnOnce(CancellationToken) + 'static, { - self.register_job_inner(job, None) + register_job(&self.service_id, job)?; + + Ok(()) } /// Same as [`ServiceWorkerManager::register_job`] but caller may provide a special tag. @@ -184,11 +297,14 @@ impl ServiceWorkerManager { /// /// * `job`: callback that will be executed in separated fiber /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs + #[inline(always)] pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> tarantool::Result<()> where F: FnOnce(CancellationToken) + 'static, { - self.register_job_inner(job, Some(tag)) + register_tagged_job(&self.service_id, job, tag)?; + + Ok(()) } /// Cancel all jobs related to the given tag. @@ -201,16 +317,19 @@ impl ServiceWorkerManager { /// * `tag`: determine what jobs should be cancelled /// * `timeout`: shutdown timeout pub fn cancel_tagged(&self, tag: &str, timeout: Duration) -> Result<(), Error> { - let deadline = fiber::clock().saturating_add(timeout); - - let mut jobs = self.jobs.lock(); - let handles = jobs.0.remove(tag).unwrap_or_default(); - - let job_count = handles.len(); + let res = cancel_jobs_by_tag(&self.service_id, tag, timeout); + let res = match res { + Ok(res) => res, + Err(e) => { + let loc = DisplayErrorLocation(&e); + tarantool::say_error!("unexpected error: {loc}{e}"); + return Ok(()); + } + }; - let completed_counter = cancel_handles(handles, deadline); - if job_count != completed_counter { - return Err(Error::PartialCompleted(job_count, completed_counter)); + if res.n_timeouts != 0 { + let n_completed = res.n_total - res.n_timeouts; + return Err(Error::PartialCompleted(res.n_total as _, n_completed as _)); } Ok(()) @@ -221,188 +340,221 @@ impl ServiceWorkerManager { /// jobs gracefully end. /// /// By default, 5-second timeout are used. + #[deprecated = "use `PicoContext::set_jobs_shutdown_timeout` instead"] pub fn set_shutdown_timeout(&self, timeout: Duration) { - *self.shutdown_timeout.lock() = timeout; + let plugin = &self.service_id.plugin; + let service = &self.service_id.service; + let version = &self.service_id.version; + set_jobs_shutdown_timeout(plugin, service, version, timeout) } } -/// This component is using by `picodata` for manage all worker managers. 
+//////////////////////////////////////////////////////////////////////////////// +// set_background_jobs_shutdown_timeout +//////////////////////////////////////////////////////////////////////////////// + +/// In case when jobs were canceled by `picodata` use this function to determine +/// a shutdown timeout - time duration that `picodata` uses to ensure that all +/// jobs gracefully end. +/// +/// By default, 5-second timeout are used. /// -/// *For internal usage, don't use it in your code*. -#[derive(Default)] -pub struct InternalGlobalWorkerManager { - managers: fiber::Mutex<HashMap<ServiceId, ServiceWorkerManager>>, +/// Consider using [`PicoContext::set_jobs_shutdown_timeout`] instead +pub fn set_jobs_shutdown_timeout(plugin: &str, service: &str, version: &str, timeout: Duration) { + // SAFETY: safe as long as picodata version is compatible + let rc = unsafe { + ffi::pico_ffi_background_set_jobs_shutdown_timeout( + plugin.into(), + service.into(), + version.into(), + timeout.as_secs_f64(), + ) + }; + debug_assert!( + rc == 0, + "return code is only for future compatibility at the moment" + ); } -// SAFETY: `GlobalWorkerManager` must be used only in the tx thread -unsafe impl Send for InternalGlobalWorkerManager {} -unsafe impl Sync for InternalGlobalWorkerManager {} - -static IGWM: OnceLock<InternalGlobalWorkerManager> = OnceLock::new(); - -impl InternalGlobalWorkerManager { - fn get_or_insert_worker_manager(&self, service_id: ServiceId) -> ServiceWorkerManager { - let mut managers = self.managers.lock(); - match managers.get(&service_id) { - None => { - let mgr = ServiceWorkerManager { - jobs: Rc::new(Default::default()), - shutdown_timeout: Rc::new(fiber::Mutex::new(Duration::from_secs(5))), - }; - managers.insert(service_id, mgr.clone()); - mgr - } - Some(mgr) => mgr.clone(), - } - } +//////////////////////////////////////////////////////////////////////////////// +// CancellationCallbackState +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug)] +pub struct CancellationCallbackState { + cancel_channel: Channel<()>, + finish_channel: Channel<()>, + status: Cell<CancellationCallbackStatus>, +} - fn remove_plugin_worker_manager( - &self, - service_id: &ServiceId, - timeout: Duration, - ) -> Result<(), Error> { - let deadline = fiber::clock().saturating_add(timeout); +#[derive(Copy, Clone, Debug, Default, PartialEq, Eq, Hash)] +pub enum CancellationCallbackStatus { + #[default] + Initial = 0, + JobCancelled = 1, + JobFinished = 2, +} - let wm = self.managers.lock().remove(service_id); +impl CancellationCallbackState { + fn new(cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self { + Self { + cancel_channel, + finish_channel, + status: Cell::new(CancellationCallbackStatus::Initial), + } + } - // drain all jobs manually cause user may have shared references to worker manager - if let Some(wm) = wm { - let mut jobs = wm.jobs.lock(); - let (tagged_jobs, untagged_jobs) = mem::take(&mut *jobs); + fn cancellation_callback(&self, action: u64, timeout: Duration) -> Result<(), BoxError> { + use CancellationCallbackStatus::*; + let next_status = action; - let mut job_counter = 0; - let mut completed_job_counter = 0; + if next_status == JobCancelled as u64 { + debug_assert_eq!(self.status.get(), Initial); + _ = self.cancel_channel.send(()); - for (_, handles) in tagged_jobs { - job_counter += handles.len(); - completed_job_counter += cancel_handles(handles, deadline); - } + self.status.set(JobCancelled); + } else if next_status == 
JobFinished as u64 { + debug_assert_eq!(self.status.get(), JobCancelled); + self.finish_channel.recv_timeout(timeout).map_err(|e| { + BoxError::new(TarantoolErrorCode::Timeout, i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string(e)) + })?; - job_counter += untagged_jobs.len(); - completed_job_counter += cancel_handles(untagged_jobs, deadline); - if job_counter != completed_job_counter { - return Err(Error::PartialCompleted(job_counter, completed_job_counter)); - } + self.status.set(JobFinished); + } else { + return Err(BoxError::new( + TarantoolErrorCode::IllegalParams, + format!("unexpected action: {action}"), + )); } + Ok(()) } +} - /// Create a new worker manager for given `id` or return existed. - pub fn get_or_init_manager(&self, id: ServiceId) -> ServiceWorkerManager { - self.get_or_insert_worker_manager(id) +#[inline] +fn i_wish_this_was_simpler_and_im_sad_that_i_have_created_this_problem_for_my_self_recv_error_to_string( + e: fiber::channel::RecvError, +) -> &'static str { + match e { + fiber::channel::RecvError::Timeout => "timeout", + fiber::channel::RecvError::Disconnected => "disconnected", } +} - /// Return preferred shutdown timeout for worker manager jobs. - pub fn get_shutdown_timeout(&self, service_id: &ServiceId) -> Option<Duration> { - self.managers - .lock() - .get(service_id) - .map(|mgr| *mgr.shutdown_timeout.lock()) - } +//////////////////////////////////////////////////////////////////////////////// +// ffi wrappers +//////////////////////////////////////////////////////////////////////////////// - /// Remove worker manager by `id` with all jobs. - /// This function return after all related jobs will be gracefully shutdown or - /// after `timeout` duration (with [`Error::PartialCompleted`] error. - pub fn unregister_service(&self, id: &ServiceId, timeout: Duration) -> Result<(), Error> { - self.remove_plugin_worker_manager(id, timeout) - } +/// **For internal use**. +#[repr(C)] +pub struct FfiBackgroundJobCancellationToken { + /// This is just a way to do arbitrarily complex things across ABI boundary + /// without introducing a large amount of FFI-safe wrappers. This will + /// always be [`CancellationCallbackState::cancellation_callback`]. + callback: extern "C-unwind" fn(data: *const Self, action: u64, timeout: f64) -> i32, + drop: extern "C-unwind" fn(*mut Self), - /// Return reference to global internal worker manager. - pub fn instance() -> &'static Self { - let igwm_ref = IGWM.get_or_init(InternalGlobalWorkerManager::default); - igwm_ref - } + /// The pointer to the closure object. + closure_pointer: *mut (), + + /// This is the background job fiber which will be cancelled by this token. 
+ pub fiber_id: FiberId, } -#[cfg(feature = "internal_test")] -mod tests { - use super::*; - use std::sync::atomic::{AtomicU64, Ordering}; - use std::time::Duration; - use tarantool::fiber; - - static _1_MS: Duration = Duration::from_millis(1); - static _10_MS: Duration = Duration::from_millis(10); - static _100_MS: Duration = Duration::from_millis(100); - static _200_MS: Duration = Duration::from_millis(200); - - fn make_job( - counter: &'static AtomicU64, - iteration_duration: Duration, - ) -> impl Fn(CancellationToken) { - move |cancel_token: CancellationToken| { - while cancel_token.wait_timeout(_1_MS).is_err() { - counter.fetch_add(1, Ordering::SeqCst); - fiber::sleep(iteration_duration); - } - counter.store(0, Ordering::SeqCst); - } +impl Drop for FfiBackgroundJobCancellationToken { + #[inline(always)] + fn drop(&mut self) { + (self.drop)(self) } +} - #[::tarantool::test] - fn test_work_manager_works() { - static COUNTER: AtomicU64 = AtomicU64::new(0); - - let gwm = InternalGlobalWorkerManager::instance(); - let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0"); - let wm = gwm.get_or_init_manager(svc_id.clone()); - - wm.register_job(make_job(&COUNTER, _1_MS)).unwrap(); - wm.register_job(make_job(&COUNTER, _1_MS)).unwrap(); +impl FfiBackgroundJobCancellationToken { + fn new(fiber_id: FiberId, cancel_channel: Channel<()>, finish_channel: Channel<()>) -> Self { + let callback_state = CancellationCallbackState::new(cancel_channel, finish_channel); + let callback = move |action, timeout| { + let res = callback_state.cancellation_callback(action, timeout); + if let Err(e) = res { + e.set_last(); + return -1; + } - fiber::sleep(_10_MS); - assert!(COUNTER.load(Ordering::SeqCst) > 0); + 0 + }; - gwm.unregister_service(&svc_id, _100_MS).unwrap(); - assert_eq!(COUNTER.load(Ordering::SeqCst), 0); + Self::new_inner(fiber_id, callback) } - #[::tarantool::test] - fn test_work_manager_tagged_jobs_works() { - static COUNTER_1: AtomicU64 = AtomicU64::new(0); - static COUNTER_2: AtomicU64 = AtomicU64::new(0); + /// This function is needed, because we need this `F` type parameter so that + /// we can specialize the `callback` and `drop` with it inside this function. + /// If rust supported something like `type F = type_of(callback);` we + /// wouldn't need this additional function and would just write this code in + /// the [`Self::new`] above. + // FIXME: just define an explicit extern "C" fn for cancellation_callback? + fn new_inner<F>(fiber_id: FiberId, f: F) -> Self + where + F: FnMut(u64, Duration) -> i32, + { + let closure = Box::new(f); + let closure_pointer: *mut F = Box::into_raw(closure); - let gwm = InternalGlobalWorkerManager::instance(); - let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0"); - let wm = gwm.get_or_init_manager(svc_id.clone()); + Self { + callback: Self::trampoline::<F>, + drop: Self::drop_handler::<F>, + closure_pointer: closure_pointer.cast(), - wm.register_tagged_job(make_job(&COUNTER_1, _1_MS), "j1") - .unwrap(); - wm.register_tagged_job(make_job(&COUNTER_1, _1_MS), "j1") - .unwrap(); - wm.register_tagged_job(make_job(&COUNTER_2, _1_MS), "j2") - .unwrap(); + fiber_id, + } + } - fiber::sleep(_10_MS); - assert!(COUNTER_1.load(Ordering::SeqCst) > 0); - assert!(COUNTER_2.load(Ordering::SeqCst) > 0); + /// An ABI-safe wrapper which calls the rust closure stored in `handler`. + /// + /// The result of the closure is copied onto the fiber's region allocation + /// and the pointer to that allocation is written into `output`. 
+ extern "C-unwind" fn trampoline<F>(data: *const Self, action: u64, timeout: f64) -> i32 + where + F: FnMut(u64, Duration) -> i32, + { + // This is safe. To verify see `register_rpc_handler` above. + let closure_pointer: *mut F = unsafe { (*data).closure_pointer.cast::<F>() }; + let closure = unsafe { &mut *closure_pointer }; - wm.cancel_tagged("j1", _10_MS).unwrap(); + closure(action, Duration::from_secs_f64(timeout)) + } - assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0); - assert_ne!(COUNTER_2.load(Ordering::SeqCst), 0); + extern "C-unwind" fn drop_handler<F>(handler: *mut Self) { + unsafe { + let closure_pointer: *mut F = (*handler).closure_pointer.cast::<F>(); + let closure = Box::from_raw(closure_pointer); + drop(closure); - gwm.unregister_service(&svc_id, _10_MS).unwrap(); + if cfg!(debug_assertions) { + // Overwrite the pointer with garbage so that we fail loudly is case of a bug + (*handler).closure_pointer = 0xcccccccccccccccc_u64 as _; + } + } + } - assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0); - assert_eq!(COUNTER_2.load(Ordering::SeqCst), 0); + /// The error is returned via [`BoxError::set_last`]. + #[inline(always)] + pub fn cancel_job(&self) { + let rc = (self.callback)(self, CancellationCallbackStatus::JobCancelled as _, 0.0); + debug_assert!(rc == 0); } - #[::tarantool::test] - fn test_work_manager_graceful_shutdown() { - static COUNTER: AtomicU64 = AtomicU64::new(0); - - let gwm = InternalGlobalWorkerManager::instance(); - let svc_id = ServiceId::new("plugin_x", "svc_x", "0.1.0"); - let wm = gwm.get_or_init_manager(svc_id.clone()); - // this job cannot stop at 10ms interval - wm.register_job(make_job(&COUNTER, _100_MS)).unwrap(); - // but this job can - wm.register_job(make_job(&COUNTER, _1_MS)).unwrap(); - fiber::sleep(_10_MS); - let result = gwm.unregister_service(&svc_id, _10_MS); - assert!( - matches!(result, Err(Error::PartialCompleted(all, completed)) if all == 2 && completed == 1) + /// The error is returned via [`BoxError::set_last`]. + #[inline(always)] + pub fn wait_job_finished(&self, timeout: Duration) -> Result<(), ()> { + let rc = (self.callback)( + self, + CancellationCallbackStatus::JobFinished as _, + timeout.as_secs_f64(), ); + if rc == -1 { + // Actual error is passed through tarantool. Can't return BoxError + // here, because tarantool-module version may be different in picodata. 
+ return Err(()); + } + + Ok(()) } } diff --git a/picodata-plugin/src/internal/ffi.rs b/picodata-plugin/src/internal/ffi.rs index d2e3ff290b44cb1249a6b5f6c3d85240c32d79e4..6fd129892aa48ef0753abe077dd21772e23d76af 100644 --- a/picodata-plugin/src/internal/ffi.rs +++ b/picodata-plugin/src/internal/ffi.rs @@ -1,9 +1,12 @@ +use crate::background::FfiBackgroundJobCancellationToken; +use crate::background::JobCancellationResult; use crate::internal::types; use crate::metrics::FfiMetricsHandler; use crate::sql::types::SqlValue; use crate::transport::rpc::client::FfiSafeRpcRequestArguments; use crate::transport::rpc::server::FfiRpcHandler; use crate::util::FfiSafeBytes; +use crate::util::FfiSafeStr; use abi_stable::derive_macro_reexports::{ROption, RResult}; use abi_stable::std_types::{RDuration, RVec}; use abi_stable::RTuple; @@ -52,4 +55,28 @@ extern "C" { ) -> i32; pub fn pico_ffi_register_metrics_handler(handler: FfiMetricsHandler) -> i32; + + pub fn pico_ffi_background_register_job_cancellation_token( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + job_tag: FfiSafeStr, + token: FfiBackgroundJobCancellationToken, + ) -> i32; + + pub fn pico_ffi_background_cancel_jobs_by_tag( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + job_tag: FfiSafeStr, + timeout: f64, + result: *mut JobCancellationResult, + ) -> i32; + + pub fn pico_ffi_background_set_jobs_shutdown_timeout( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + timeout: f64, + ) -> i32; } diff --git a/picodata-plugin/src/plugin/interface.rs b/picodata-plugin/src/plugin/interface.rs index 208b0bdbefeff82de3a6b857d4829d9d61cccbfd..55d5cca9587e1fa17c9297793dd0654294bc9c27 100644 --- a/picodata-plugin/src/plugin/interface.rs +++ b/picodata-plugin/src/plugin/interface.rs @@ -1,13 +1,17 @@ -use crate::background::{InternalGlobalWorkerManager, ServiceWorkerManager}; -use crate::error_code::ErrorCode::PluginError; +use crate::background; +use crate::background::ServiceWorkerManager; +use crate::error_code::ErrorCode; use crate::util::FfiSafeStr; pub use abi_stable; use abi_stable::pmr::{RErr, RResult, RSlice}; use abi_stable::std_types::{RBox, RHashMap, ROk, RString, RVec}; use abi_stable::{sabi_trait, RTuple, StableAbi}; use serde::de::DeserializeOwned; +use smol_str::SmolStr; use std::error::Error; use std::fmt::Display; +use std::time::Duration; +use tarantool::error::TarantoolErrorCode; use tarantool::error::{BoxError, IntoBoxError}; /// Context of current instance. Produced by picodata. @@ -15,7 +19,6 @@ use tarantool::error::{BoxError, IntoBoxError}; #[derive(StableAbi, Debug)] pub struct PicoContext { is_master: bool, - global_wm: *const (), pub plugin_name: FfiSafeStr, pub service_name: FfiSafeStr, pub plugin_version: FfiSafeStr, @@ -24,12 +27,8 @@ pub struct PicoContext { impl PicoContext { #[inline] pub fn new(is_master: bool) -> PicoContext { - let gwm = InternalGlobalWorkerManager::instance() as *const InternalGlobalWorkerManager - as *const (); - Self { is_master, - global_wm: gwm, plugin_name: "<unset>".into(), service_name: "<unset>".into(), plugin_version: "<unset>".into(), @@ -42,7 +41,6 @@ impl PicoContext { pub unsafe fn clone(&self) -> Self { Self { is_master: self.is_master, - global_wm: self.global_wm, plugin_name: self.plugin_name.clone(), service_name: self.service_name.clone(), plugin_version: self.plugin_version.clone(), @@ -56,23 +54,119 @@ impl PicoContext { } /// Return [`ServiceWorkerManager`] for current service. 
+ #[deprecated = "use `register_job`, `register_tagged_job` or `cancel_background_jobs_by_tag` directly instead"] pub fn worker_manager(&self) -> ServiceWorkerManager { - let global_manager: &'static InternalGlobalWorkerManager = - // SAFETY: `picodata` guaranty that this reference live enough - unsafe { &*(self.global_wm as *const InternalGlobalWorkerManager) }; + ServiceWorkerManager::new(self.make_service_id()) + } + + // TODO: + // pub fn register_job(&self) -> ServiceWorkerManager { + // pub fn register_tagged_job(&self) -> ServiceWorkerManager { + // pub fn cancel_job_by_tag(&self) -> ServiceWorkerManager { + + #[inline(always)] + pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> { + crate::metrics::register_metrics_handler(self, callback) + } + + /// Add a new job to the execution. + /// Job work life cycle will be tied to the service life cycle; + /// this means that job will be canceled just before service is stopped. + /// + /// # Arguments + /// + /// * `job`: callback that will be executed in separated fiber. + /// Note that it is your responsibility to organize job graceful shutdown, see a + /// [`background::CancellationToken`] for details. + /// + /// # Examples + /// + /// ```no_run + /// use std::time::Duration; + /// use picodata_plugin::background::CancellationToken; + /// + /// # use picodata_plugin::plugin::interface::PicoContext; + /// # fn on_start(context: PicoContext) { + /// + /// // this job will print "hello" every second, + /// // and print "bye" after being canceled + /// fn hello_printer(cancel: CancellationToken) { + /// while cancel.wait_timeout(Duration::from_secs(1)).is_err() { + /// println!("hello!"); + /// } + /// println!("job cancelled, bye!") + /// } + /// context.register_job(hello_printer).unwrap(); + /// + /// # } + /// ``` + #[inline(always)] + pub fn register_job<F>(&self, job: F) -> Result<(), BoxError> + where + F: FnOnce(background::CancellationToken) + 'static, + { + background::register_job(&self.make_service_id(), job) + } - // TODO: can we eliminate allocation here? - let service_id = ServiceId::new( + /// Same as [`Self::register_job`] but caller may provide a special tag. + /// This tag may be used for manual job cancellation using [`Self::cancel_tagged_jobs`]. + /// + /// # Arguments + /// + /// * `job`: callback that will be executed in separated fiber + /// * `tag`: tag, that will be related to a job, single tag may be related to the multiple jobs + #[inline(always)] + pub fn register_tagged_job<F>(&self, job: F, tag: &str) -> Result<(), BoxError> + where + F: FnOnce(background::CancellationToken) + 'static, + { + background::register_tagged_job(&self.make_service_id(), job, tag) + } + + /// Cancel all jobs related to the given `tag`. + /// This function return after all related jobs will be gracefully shutdown or + /// after `timeout` duration. + /// + /// Returns error with code [`TarantoolErrorCode::Timeout`] in case some + /// jobs didn't finish within `timeout`. + /// + /// May also theoretically return error with code [`ErrorCode::NoSuchService`] + /// in case the service doesn't exist anymore (highly unlikely). + /// + /// See also [`Self::register_tagged_job`]. 
+ #[inline(always)] + pub fn cancel_tagged_jobs(&self, tag: &str, timeout: Duration) -> Result<(), BoxError> { + let res = background::cancel_jobs_by_tag(&self.make_service_id(), tag, timeout)?; + if res.n_timeouts != 0 { + #[rustfmt::skip] + return Err(BoxError::new(TarantoolErrorCode::Timeout, format!("some background jobs didn't finish in time (expected: {}, timed out: {})", res.n_total, res.n_timeouts))); + } + + Ok(()) + } + + /// In case when jobs were canceled by `picodata` use this function for determine + /// a shutdown timeout - time duration that `picodata` uses to ensure that all + /// jobs gracefully end. + /// + /// By default, 5-second timeout are used. + #[inline(always)] + pub fn set_jobs_shutdown_timeout(&self, timeout: Duration) { + crate::background::set_jobs_shutdown_timeout( self.plugin_name(), self.service_name(), self.plugin_version(), - ); - global_manager.get_or_init_manager(service_id) + timeout, + ) } #[inline(always)] - pub fn register_metrics_callback(&self, callback: impl Fn() -> String) -> Result<(), BoxError> { - crate::metrics::register_metrics_handler(self, callback) + pub fn make_service_id(&self) -> ServiceId { + ServiceId::new( + self.plugin_name(), + self.service_name(), + self.plugin_version(), + ) } #[inline] @@ -97,9 +191,9 @@ impl PicoContext { /// Unique service identifier. #[derive(Debug, PartialEq, Eq, Hash, Clone)] pub struct ServiceId { - pub plugin: String, - pub service: String, - pub version: String, + pub plugin: SmolStr, + pub service: SmolStr, + pub version: SmolStr, } impl std::fmt::Display for ServiceId { @@ -112,9 +206,9 @@ impl std::fmt::Display for ServiceId { impl ServiceId { #[inline(always)] pub fn new( - plugin: impl Into<String>, - service: impl Into<String>, - version: impl Into<String>, + plugin: impl Into<SmolStr>, + service: impl Into<SmolStr>, + version: impl Into<SmolStr>, ) -> Self { Self { plugin: plugin.into(), @@ -298,7 +392,7 @@ impl<C: DeserializeOwned> ServiceProxy<C> { /// UAF can happen if user error points into memory allocated by dynamic lib and lives /// longer than dynamic lib memory (that was unmapped by system). fn error_into_tt_error<T>(source: impl Display) -> RResult<T, ()> { - let tt_error = BoxError::new(PluginError, source.to_string()); + let tt_error = BoxError::new(ErrorCode::PluginError, source.to_string()); tt_error.set_last_error(); RErr(()) } diff --git a/picodata-plugin/src/util.rs b/picodata-plugin/src/util.rs index dc7d3fb792002fe5e538689b031f5973b401a66d..1b60c4dc61b8a2a7dafd1496c44333a38a8b2e07 100644 --- a/picodata-plugin/src/util.rs +++ b/picodata-plugin/src/util.rs @@ -1,3 +1,4 @@ +use crate::error_code::ErrorCode; use abi_stable::StableAbi; use std::ptr::NonNull; use tarantool::error::BoxError; @@ -381,6 +382,14 @@ fn as_non_null_ptr<T>(data: &[T]) -> NonNull<T> { unsafe { NonNull::new_unchecked(pointer as *mut _) } } +// TODO: this should be in tarantool module +pub fn tarantool_error_to_box_error(e: tarantool::error::Error) -> BoxError { + match e { + tarantool::error::Error::Tarantool(e) => e, + other => BoxError::new(ErrorCode::Other, other.to_string()), + } +} + //////////////////////////////////////////////////////////////////////////////// // test //////////////////////////////////////////////////////////////////////////////// diff --git a/src/lib.rs b/src/lib.rs index f44b8ebe78e23865413d299ce7a89a08e64a0dfd..a5815e3166399029dabfeeee85c5f08759a2552d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -293,7 +293,6 @@ fn start_webui() { /// (discovery, rpc, public proc api). 
fn init_handlers() { plugin::rpc::server::init_handlers(); - plugin::metrics::init_handlers(); rpc::init_static_proc_set(); diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index 4229b9f1bd0ce92db122bdc2f51a5c36d3171c5e..4f2ceca2bf1262448086cbf891f16e0adbe77502 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -1,5 +1,4 @@ use crate::has_states; -use crate::plugin::PluginEvent; use crate::tarantool; use crate::tlog; use crate::traft::error::Error; @@ -31,12 +30,7 @@ pub async fn callback() { // 1. Wake up the sentinel so it starts trying to set target state Offline. node.sentinel_loop.on_shut_down(); - if let Err(e) = node - .plugin_manager - .handle_event_sync(PluginEvent::InstanceShutdown) - { - tlog!(Error, "plugin `on_stop` error: {e}"); - }; + node.plugin_manager.handle_instance_shutdown(); fiber::reschedule(); diff --git a/src/plugin/background.rs b/src/plugin/background.rs new file mode 100644 index 0000000000000000000000000000000000000000..ce651ec2c4a5ded2999702bfcb444c501cd413a4 --- /dev/null +++ b/src/plugin/background.rs @@ -0,0 +1,135 @@ +use crate::plugin::ServiceState; +use crate::tlog; +use crate::traft::node; +use picodata_plugin::background::FfiBackgroundJobCancellationToken; +use picodata_plugin::background::JobCancellationResult; +use picodata_plugin::error_code::ErrorCode; +use picodata_plugin::plugin::interface::ServiceId; +use smol_str::SmolStr; +use std::time::Duration; +use tarantool::error::BoxError; +use tarantool::fiber; +use tarantool::time::Instant; + +//////////////////////////////////////////////////////////////////////////////// +// register_background_job_cancellation_token +//////////////////////////////////////////////////////////////////////////////// + +pub fn register_background_job_cancellation_token( + service: ServiceId, + job_tag: SmolStr, + token: FfiBackgroundJobCancellationToken, +) -> Result<(), BoxError> { + let node = node::global().expect("initialized before plugins"); + let manager = &node.plugin_manager; + + if manager.get_service_state(&service).is_none() { + crate::warn_or_panic!("plugin callback called for non-existent service {service}"); + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::NoSuchService, format!("service `{service}` not found"))); + } + + let mut guard = manager.background_job_cancellation_tokens.lock(); + let all_jobs = guard.entry(service).or_default(); + all_jobs.push((job_tag, token)); + + Ok(()) +} + +//////////////////////////////////////////////////////////////////////////////// +// cancel_background_jobs_by_tag +//////////////////////////////////////////////////////////////////////////////// + +pub fn cancel_background_jobs_by_tag( + service_id: ServiceId, + job_tag: SmolStr, + timeout: Duration, +) -> Result<JobCancellationResult, BoxError> { + let node = node::global().expect("initialized before plugins"); + let manager = &node.plugin_manager; + + let Some(service) = manager.get_service_state(&service_id) else { + crate::warn_or_panic!("plugin callback called for non-existent service {service_id}"); + #[rustfmt::skip] + return Err(BoxError::new(ErrorCode::NoSuchService, format!("service `{service_id}` not found"))); + }; + + let mut guard = manager.background_job_cancellation_tokens.lock(); + let Some(all_jobs) = guard.get_mut(&service.id) else { + return Ok(JobCancellationResult::new(0, 0)); + }; + + let target_tag = job_tag; + let mut jobs_to_cancel = vec![]; + let mut cursor = 0; + while cursor < all_jobs.len() { + let (job_tag, _) = &all_jobs[cursor]; + if &target_tag == job_tag { + let 
token = all_jobs.swap_remove(cursor); + jobs_to_cancel.push(token); + continue; + } + + cursor += 1; + } + + // Release the lock. + drop(guard); + + cancel_jobs(&service, &jobs_to_cancel); + + let deadline = fiber::clock().saturating_add(timeout); + let n_timeouts = wait_jobs_finished(&service, &jobs_to_cancel, deadline); + + let n_total = jobs_to_cancel.len(); + Ok(JobCancellationResult::new(n_total as _, n_timeouts)) +} + +pub fn cancel_jobs(service: &ServiceState, jobs: &[(SmolStr, FfiBackgroundJobCancellationToken)]) { + let service_id = &service.id; + for (job_tag, token) in jobs { + #[rustfmt::skip] + tlog!(Debug, "cancelling service {service_id} background job `{job_tag}`"); + token.cancel_job(); + } +} + +/// Returns the number of jobs which is didn't finish in time. +pub fn wait_jobs_finished( + service: &ServiceState, + jobs: &[(SmolStr, FfiBackgroundJobCancellationToken)], + deadline: Instant, +) -> u32 { + let service_id = &service.id; + + let mut n_timeouts = 0; + for (job_tag, token) in jobs { + let timeout = deadline.duration_since(fiber::clock()); + let res = token.wait_job_finished(timeout); + if res.is_err() { + let e = BoxError::last(); + #[rustfmt::skip] + tlog!(Warning, "service {service_id} job `{job_tag}` didn't finish in time: {e}"); + + n_timeouts += 1; + } + } + + n_timeouts +} + +//////////////////////////////////////////////////////////////////////////////// +// set_shutdown_jobs_shutdown_timeout +//////////////////////////////////////////////////////////////////////////////// + +pub fn set_jobs_shutdown_timeout(service: ServiceId, timeout: Duration) { + let node = node::global().expect("initialized before plugins"); + let manager = &node.plugin_manager; + + let Some(service) = manager.get_service_state(&service) else { + crate::warn_or_panic!("plugin callback called for non-existent service {service}"); + return; + }; + + service.background_job_shutdown_timeout.set(Some(timeout)); +} diff --git a/src/plugin/ffi.rs b/src/plugin/ffi.rs index 75ca13f892454d45b73c7e2edbefbb130ef9a538..d77741d1a132572965248efa84011e52873d1f17 100644 --- a/src/plugin/ffi.rs +++ b/src/plugin/ffi.rs @@ -1,3 +1,5 @@ +#[allow(unused_imports)] +use crate::error_code::ErrorCode; use crate::info::{InstanceInfo, RaftInfo, VersionInfo}; use crate::instance::StateVariant; use crate::plugin::{rpc, PluginIdentifier}; @@ -8,15 +10,20 @@ use crate::{cas, sql, traft}; use abi_stable::pmr::{RErr, RNone, ROk, ROption, RResult, RSome}; use abi_stable::std_types::{RDuration, RVec, Tuple2}; use abi_stable::{sabi_extern_fn, RTuple}; +use picodata_plugin::background::FfiBackgroundJobCancellationToken; +use picodata_plugin::background::JobCancellationResult; use picodata_plugin::internal::types; use picodata_plugin::internal::types::{DmlInner, OpInner}; use picodata_plugin::metrics::FfiMetricsHandler; +use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::sql::types::{SqlValue, SqlValueInner}; use picodata_plugin::transport::rpc::client::FfiSafeRpcRequestArguments; use picodata_plugin::transport::rpc::server::FfiRpcHandler; use picodata_plugin::util::FfiSafeBytes; +use picodata_plugin::util::FfiSafeStr; use sbroad::ir::value::double::Double; use sbroad::ir::value::{LuaValue, Tuple, Value}; +use std::time::Duration; use std::{mem, slice}; use tarantool::datetime::Datetime; use tarantool::error::IntoBoxError; @@ -395,3 +402,96 @@ pub extern "C" fn pico_ffi_register_metrics_handler(handler: FfiMetricsHandler) 0 } + +/// Returns error with code [`ErrorCode::NoSuchService`] if there's no 
service with provided id. +/// +/// Otherwise writes into `result` info about how many jobs (if any) didn't +/// finish in the provided `timeout`. +#[no_mangle] +pub extern "C" fn pico_ffi_background_cancel_jobs_by_tag( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + job_tag: FfiSafeStr, + timeout: f64, + result: *mut JobCancellationResult, +) -> i32 { + // SAFETY: data outlives this function call + let plugin = unsafe { plugin.as_str() }; + let service = unsafe { service.as_str() }; + let version = unsafe { version.as_str() }; + let service_id = ServiceId::new(plugin, service, version); + + let job_tag = unsafe { job_tag.as_str() }; + + let res = crate::plugin::background::cancel_background_jobs_by_tag( + service_id, + job_tag.into(), + Duration::from_secs_f64(timeout), + ); + + let res = match res { + Ok(v) => v, + Err(e) => { + e.set_last(); + return -1; + } + }; + + // SAFETY: the caller is responsible for this to be safe + unsafe { std::ptr::write(result, res) }; + + 0 +} + +#[no_mangle] +pub extern "C" fn pico_ffi_background_register_job_cancellation_token( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + job_tag: FfiSafeStr, + token: FfiBackgroundJobCancellationToken, +) -> i32 { + // SAFETY: data outlives this function call + let plugin = unsafe { plugin.as_str() }; + let service = unsafe { service.as_str() }; + let version = unsafe { version.as_str() }; + let service_id = ServiceId::new(plugin, service, version); + + let job_tag = unsafe { job_tag.as_str() }; + + let res = crate::plugin::background::register_background_job_cancellation_token( + service_id, + job_tag.into(), + token, + ); + + if let Err(e) = res { + e.set_last(); + return -1; + } + + 0 +} + +/// Set a user-specified timeout value when shutting down all background jobs +/// in case of service shutdown. 
+#[no_mangle] +pub extern "C" fn pico_ffi_background_set_jobs_shutdown_timeout( + plugin: FfiSafeStr, + service: FfiSafeStr, + version: FfiSafeStr, + timeout: f64, +) -> i32 { + // SAFETY: data outlives this function call + let plugin = unsafe { plugin.as_str() }; + let service = unsafe { service.as_str() }; + let version = unsafe { version.as_str() }; + let service_id = ServiceId::new(plugin, service, version); + crate::plugin::background::set_jobs_shutdown_timeout( + service_id, + Duration::from_secs_f64(timeout), + ); + + 0 +} diff --git a/src/plugin/manager.rs b/src/plugin/manager.rs index 6495ec20d5ebadbeafdf468bcc5198e209960467..9ac92d4a9de8f38c4e029d4955b6aa0430009c84 100644 --- a/src/plugin/manager.rs +++ b/src/plugin/manager.rs @@ -1,12 +1,13 @@ use crate::config::PicodataConfig; use crate::info::PICODATA_VERSION; +use crate::plugin::background; use crate::plugin::rpc; use crate::plugin::LibraryWrapper; use crate::plugin::PluginError::{PluginNotFound, ServiceCollision}; use crate::plugin::ServiceState; use crate::plugin::{ remove_routes, replace_routes, topology, Manifest, PluginAsyncEvent, PluginCallbackError, - PluginError, PluginEvent, PluginIdentifier, Result, + PluginError, PluginIdentifier, Result, }; use crate::schema::{PluginDef, ServiceDef, ServiceRouteItem, ServiceRouteKey}; use crate::storage::Clusterwide; @@ -18,12 +19,14 @@ use crate::version::Version; use crate::{tlog, traft, warn_or_panic}; use abi_stable::derive_macro_reexports::{RErr, RResult, RSlice}; use abi_stable::std_types::RStr; -use picodata_plugin::background::{Error, InternalGlobalWorkerManager}; +use picodata_plugin::background::FfiBackgroundJobCancellationToken; use picodata_plugin::error_code::ErrorCode::PluginError as PluginErrorCode; +use picodata_plugin::metrics::FfiMetricsHandler; use picodata_plugin::plugin::interface::FnServiceRegistrar; use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::plugin::interface::{PicoContext, ServiceRegistry}; use picodata_plugin::util::DisplayErrorLocation; +use smol_str::SmolStr; use std::collections::HashMap; use std::fs; use std::fs::ReadDir; @@ -74,6 +77,28 @@ struct PluginState { version: String, } +impl PluginState { + fn new(version: String) -> Self { + Self { + version, + services: vec![], + } + } + + fn remove_service(&mut self, service: &str) -> Option<Rc<ServiceState>> { + let index = self + .services + .iter() + .position(|svc| svc.id.service == service)?; + let service = self.services.swap_remove(index); + Some(service) + } +} + +type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; +type BackgroundJobCancellationTokenMap = + HashMap<ServiceId, Vec<(SmolStr, FfiBackgroundJobCancellationToken)>>; + pub struct PluginManager { /// List of pairs (plugin name -> plugin state). /// @@ -86,8 +111,16 @@ pub struct PluginManager { /// Queue of async events. events_queue: Option<fiber::channel::Channel<PluginAsyncEvent>>, + + /// Plugin-defined metrics callbacks. + pub(crate) metrics_handlers: fiber::Mutex<MetricsHandlerMap>, + + pub(crate) background_job_cancellation_tokens: fiber::Mutex<BackgroundJobCancellationTokenMap>, + /// Fiber for handle async events, those handlers need it for avoided yield's. - _loop: Option<fiber::JoinHandle<'static, ()>>, + /// Id is only stored for debugging. 
+ #[allow(dead_code)] + async_event_fiber_id: fiber::FiberId, } impl PluginManager { @@ -100,18 +133,20 @@ impl PluginManager { let options = WorkerOptions::default(); let pool = ConnectionPool::new(storage, options); - let r#loop = Loop::new(plugins.clone()); - let defer_events_fiber = fiber::Builder::new() + let async_event_fiber_id = fiber::Builder::new() .name("plugin_manager_loop") - .func(move || r#loop.run(tx)) - .defer() - .expect("Plugin manager fiber should not fail"); + .func(move || plugin_manager_async_event_loop(tx)) + .defer_non_joinable() + .expect("Plugin manager fiber should not fail") + .expect("fiber id is supported"); Self { plugins, pool, events_queue: Some(rx), - _loop: defer_events_fiber.into(), + metrics_handlers: Default::default(), + background_job_cancellation_tokens: Default::default(), + async_event_fiber_id, } } @@ -288,10 +323,9 @@ impl PluginManager { }, ); - self.handle_event_sync(PluginEvent::PluginLoad { - ident, - service_defs: &service_defs, - }) + self.handle_plugin_load(ident, &service_defs)?; + + Ok(()) } /// Check the possibility of loading plugin into instance. @@ -307,7 +341,7 @@ impl PluginManager { } /// Load and start all enabled plugins and services that must be loaded. - fn handle_instance_online(&self) -> Result<()> { + pub(crate) fn handle_instance_online(&self) -> Result<()> { let node = node::global().expect("node must be already initialized"); let instance_name = node @@ -352,23 +386,41 @@ impl PluginManager { Ok(()) } + fn handle_async_event(&self, event: PluginAsyncEvent) -> traft::Result<()> { + match event { + PluginAsyncEvent::ServiceConfigurationUpdated { + ident, + service, + old_raw, + new_raw, + } => { + let res = self.handle_config_updated(&ident, &service, &old_raw, &new_raw); + if let Err(e) = res { + tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}"); + } + } + PluginAsyncEvent::PluginDisabled { name } => { + let res = self.handle_plugin_disabled(&name); + if let Err(e) = res { + tlog!(Error, "plugin {name} remove error: {e}"); + } + } + } + + Ok(()) + } + /// Call `on_stop` callback at services and remove plugin from managed. - fn handle_plugin_disabled( - plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>, - plugin_name: &str, - ) -> traft::Result<()> { + fn handle_plugin_disabled(&self, plugin_name: &str) -> traft::Result<()> { let node = node::global().expect("node must be already initialized"); let ctx = context_from_node(node); - let plugin = { - let mut lock = plugins.lock(); - lock.remove(plugin_name) - }; + let plugin = self.plugins.lock().remove(plugin_name); if let Some(plugin_state) = plugin { // stop all background jobs and remove metrics first - stop_background_jobs(&plugin_state.services); - remove_metrics(&plugin_state.services); + self.stop_background_jobs(&plugin_state.services); + self.remove_metrics_handlers(&plugin_state.services); for service in plugin_state.services.iter() { stop_service(service, &ctx); @@ -379,7 +431,7 @@ impl PluginManager { } /// Stop all plugin services. 
- fn handle_instance_shutdown(&self) { + pub(crate) fn handle_instance_shutdown(&self) { let node = node::global().expect("node must be already initialized"); let ctx = context_from_node(node); @@ -387,8 +439,8 @@ impl PluginManager { let services_to_stop = plugins.values().flat_map(|state| &state.services); // stop all background jobs and remove metrics first - stop_background_jobs(services_to_stop.clone()); - remove_metrics(services_to_stop.clone()); + self.stop_background_jobs(services_to_stop.clone()); + self.remove_metrics_handlers(services_to_stop.clone()); for service in services_to_stop { stop_service(service, &ctx); @@ -397,7 +449,7 @@ impl PluginManager { /// Call `on_config_change` at services. Poison services if error at callbacks happens. fn handle_config_updated( - plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>, + &self, plugin_identity: &PluginIdentifier, service: &str, old_cfg_raw: &[u8], @@ -408,7 +460,7 @@ impl PluginManager { let storage = &node.storage; let maybe_service = { - let lock = plugins.lock(); + let lock = self.plugins.lock(); lock.get(&plugin_identity.name) .and_then(|plugin_state| { plugin_state @@ -479,7 +531,7 @@ impl PluginManager { } /// Call `on_leader_change` at services. Poison services if error at callbacks happens. - fn handle_rs_leader_change(&self) -> traft::Result<()> { + pub(crate) fn handle_rs_leader_change(&self) -> traft::Result<()> { let node = node::global()?; let mut ctx = context_from_node(node); let storage = &node.storage; @@ -581,6 +633,9 @@ impl PluginManager { )); }; + let service = Rc::new(new_service); + let id = &service.id; + // call `on_start` callback let mut ctx = context_from_node(node); let cfg = node @@ -588,13 +643,21 @@ impl PluginManager { .plugin_config .get_by_entity_as_mp(plugin_ident, &service_defs[0].name)?; let cfg_raw = rmp_serde::encode::to_vec_named(&cfg).expect("out of memory"); - context_set_service_info(&mut ctx, &new_service); - let id = &new_service.id; + // add the service to the storage, because the `on_start` may attempt to + // indirectly reference it through there + let mut plugins = self.plugins.lock(); + let plugin = plugins + .entry(plugin_def.name) + .or_insert_with(|| PluginState::new(plugin_ident.version.to_string())); + plugin.services.push(service.clone()); + #[rustfmt::skip] tlog!(Debug, "calling {id}.on_start"); - let mut guard = new_service.volatile_state.lock(); + context_set_service_info(&mut ctx, &service); + + let mut guard = service.volatile_state.lock(); let res = guard.inner.on_start(&ctx, RSlice::from(cfg_raw.as_slice())); // Release the lock drop(guard); @@ -605,21 +668,16 @@ impl PluginManager { #[rustfmt::skip] tlog!(Error, "plugin callback {id}.on_start error: {loc}{error}"); + // Remove the service which we just added, because it failed to enable + plugin.remove_service(&id.service); + return Err(PluginError::Callback(PluginCallbackError::OnStart(error))); } - // append new service to a plugin - let mut plugins = self.plugins.lock(); - let entry = plugins.entry(plugin_def.name).or_insert(PluginState { - services: vec![], - version: plugin_ident.version.to_string(), - }); - entry.services.push(Rc::new(new_service)); - Ok(()) } - fn handle_service_disabled(&self, plugin_ident: &PluginIdentifier, service: &str) { + pub(crate) fn handle_service_disabled(&self, plugin_ident: &PluginIdentifier, service: &str) { let node = node::global().expect("node must be already initialized"); let ctx = context_from_node(node); @@ -632,19 +690,14 @@ impl PluginManager { return; } - let 
Some(svc_idx) = state - .services - .iter() - .position(|svc| svc.id.service == service) - else { + let Some(service_to_del) = state.remove_service(service) else { return; }; - let service_to_del = state.services.swap_remove(svc_idx); drop(plugins); // stop all background jobs and remove metrics first - stop_background_jobs(&[service_to_del.clone()]); - remove_metrics(&[service_to_del.clone()]); + self.stop_background_jobs(&[service_to_del.clone()]); + self.remove_metrics_handlers(&[service_to_del.clone()]); // call `on_stop` callback and drop service stop_service(&service_to_del, &ctx); @@ -704,29 +757,8 @@ impl PluginManager { Ok(()) } - fn handle_plugin_load_error(&self, plugin: &str) -> Result<()> { - let node = node::global().expect("must be initialized"); - let ctx = context_from_node(node); - - let maybe_plugin_state = { - let mut lock = self.plugins.lock(); - lock.remove(plugin) - }; - - if let Some(plugin_state) = maybe_plugin_state { - // stop all background jobs and remove metrics first - stop_background_jobs(&plugin_state.services); - remove_metrics(&plugin_state.services); - - for service in plugin_state.services.iter() { - stop_service(&service, &ctx); - } - } - Ok(()) - } - /// Call user defined service configuration validation. - fn handle_before_service_reconfigured( + pub(crate) fn handle_before_service_reconfigured( &self, plugin_ident: &PluginIdentifier, service_name: &str, @@ -806,76 +838,82 @@ impl PluginManager { )) } - /// Handle picodata event by plugin system. - /// Any event may be handled by any count of plugins that are interested in this event. - /// Return error if any of service callbacks return error. - /// - /// # Arguments - /// - /// * `event`: upcoming event - pub fn handle_event_sync(&self, event: PluginEvent) -> Result<()> { - match event { - PluginEvent::InstanceOnline => { - self.handle_instance_online()?; - } - PluginEvent::InstanceShutdown => { - self.handle_instance_shutdown(); - } - PluginEvent::PluginLoad { - ident, - service_defs, - } => { - self.handle_plugin_load(ident, service_defs)?; - } - PluginEvent::PluginLoadError { name } => { - self.handle_plugin_load_error(name)?; - } - PluginEvent::BeforeServiceConfigurationUpdated { - ident, - service, - new_raw, - } => { - self.handle_before_service_reconfigured(ident, service, new_raw)?; - } - PluginEvent::InstanceDemote | PluginEvent::InstancePromote => { - if let Err(e) = self.handle_rs_leader_change() { - tlog!(Error, "on_leader_change error: {e}"); - } - } - PluginEvent::ServiceEnabled { ident, service } => { - self.handle_service_enabled(ident, service)?; - } - PluginEvent::ServiceDisabled { ident, service } => { - self.handle_service_disabled(ident, service); - } - } - - Ok(()) - } - /// Queue event for deferred execution. /// May be called from transactions because never yields. 
/// /// # Arguments /// /// * `event`: queued event - pub fn handle_event_async(&self, event: PluginAsyncEvent) -> Result<()> { + pub fn add_async_event_to_queue(&self, event: PluginAsyncEvent) -> Result<()> { self.events_queue .as_ref() .expect("infallible") .try_send(event) .map_err(|_| PluginError::AsyncEventQueueFull) } + + fn remove_metrics_handlers<'a>( + &self, + services: impl IntoIterator<Item = &'a Rc<ServiceState>>, + ) { + let mut handlers = self.metrics_handlers.lock(); + for service in services { + let id = &service.id; + let handler = handlers.remove(id); + if handler.is_some() { + tlog!(Info, "unregistered metrics handler for `{id}`"); + } + } + } + + fn stop_background_jobs<'a>(&self, services: impl IntoIterator<Item = &'a Rc<ServiceState>>) { + let mut guard = self.background_job_cancellation_tokens.lock(); + + let mut service_jobs = vec![]; + for service in services { + let Some(jobs) = guard.remove(&service.id) else { + continue; + }; + + service_jobs.push((service, jobs)); + } + + // Release the lock. + drop(guard); + + for (service, jobs) in &service_jobs { + background::cancel_jobs(service, jobs); + } + + const DEFAULT_BACKGROUND_JOB_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); + + let start = fiber::clock(); + for (service, jobs) in &service_jobs { + let timeout = service.background_job_shutdown_timeout.get(); + let timeout = timeout.unwrap_or(DEFAULT_BACKGROUND_JOB_SHUTDOWN_TIMEOUT); + let deadline = start.saturating_add(timeout); + + background::wait_jobs_finished(service, jobs, deadline); + } + } + + pub fn get_service_state(&self, id: &ServiceId) -> Option<Rc<ServiceState>> { + let plugins = self.plugins.lock(); + let plugin = plugins.get(id.plugin())?; + for service in &plugin.services { + if &service.id == id { + return Some(service.clone()); + } + } + + None + } } impl Drop for PluginManager { fn drop(&mut self) { let event_queue = self.events_queue.take(); event_queue.unwrap().close(); - - if let Some(r#loop) = self._loop.take() { - r#loop.join(); - } } } @@ -910,87 +948,12 @@ fn stop_service(service: &ServiceState, context: &PicoContext) { } /// Plugin manager inner loop, using for handle async events (must be run in a separate fiber). -struct Loop { - /// List of pairs (plugin name -> plugin state) - /// - /// There are two mutex here to avoid the situation when one long service callback - /// will block other services. 
- plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>, -} - -impl Loop { - pub fn new(plugins: Rc<fiber::Mutex<HashMap<String, PluginState>>>) -> Self { - Self { plugins } - } - - fn handle_event(&self, event: PluginAsyncEvent) -> traft::Result<()> { - match event { - PluginAsyncEvent::ServiceConfigurationUpdated { - ident, - service, - old_raw, - new_raw, - } => { - let plugins = self.plugins.clone(); - if let Err(e) = PluginManager::handle_config_updated( - plugins, &ident, &service, &old_raw, &new_raw, - ) { - tlog!(Error, "plugin {ident} service {service}, apply new plugin configuration error: {e}"); - } - } - PluginAsyncEvent::PluginDisabled { name } => { - let plugins = self.plugins.clone(); - if let Err(e) = PluginManager::handle_plugin_disabled(plugins, &name) { - tlog!(Error, "plugin {name} remove error: {e}"); - } - } +fn plugin_manager_async_event_loop(event_chan: fiber::channel::Channel<PluginAsyncEvent>) { + while let Some(event) = event_chan.recv() { + let node = + node::global().expect("initialized by the time plugin async events start happening"); + if let Err(e) = node.plugin_manager.handle_async_event(event) { + tlog!(Error, "plugin async event handler error: {e}"); } - - Ok(()) - } - - fn run(&self, event_chan: fiber::channel::Channel<PluginAsyncEvent>) { - while let Some(event) = event_chan.recv() { - if let Err(e) = self.handle_event(event) { - tlog!(Error, "plugin async event handler error: {e}"); - } - } - } -} - -fn stop_background_jobs<'a>(services: impl IntoIterator<Item = &'a Rc<ServiceState>>) { - const DEFAULT_SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(5); - - let mut service_to_unregister = vec![]; - let mut max_shutdown_timeout = DEFAULT_SHUTDOWN_TIMEOUT; - for service in services { - if let Some(timeout) = - InternalGlobalWorkerManager::instance().get_shutdown_timeout(&service.id) - { - if max_shutdown_timeout < timeout { - max_shutdown_timeout = timeout; - } - } - service_to_unregister.push(&service.id); - } - - let mut fibers = vec![]; - for svc_id in service_to_unregister { - fibers.push(fiber::start(move || { - if let Err(Error::PartialCompleted(expected, completed)) = - InternalGlobalWorkerManager::instance().unregister_service(&svc_id, max_shutdown_timeout) - { - tlog!(Warning, "Not all jobs for service {svc_id} was completed on time, expected: {expected}, completed: {completed}"); - } - })); - } - for fiber in fibers { - fiber.join(); - } -} - -fn remove_metrics<'a>(plugins: impl IntoIterator<Item = &'a Rc<ServiceState>>) { - for service in plugins { - crate::plugin::metrics::unregister_metrics_handler(&service.id) } } diff --git a/src/plugin/metrics.rs b/src/plugin/metrics.rs index d9df9cfb95a673d40ec3c3c89f7a10c19fbd4095..819507238f7a89e1a6c3dd6cb22fc3e693731a4a 100644 --- a/src/plugin/metrics.rs +++ b/src/plugin/metrics.rs @@ -1,32 +1,11 @@ use crate::tlog; +use crate::traft::node; use picodata_plugin::metrics::FfiMetricsHandler; -use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::util::RegionGuard; use std::collections::hash_map::Entry; -use std::collections::HashMap; use std::rc::Rc; use tarantool::error::BoxError; use tarantool::error::TarantoolErrorCode; -use tarantool::fiber::mutex::MutexGuard; -use tarantool::fiber::Mutex; - -static mut HANDLERS: Option<Mutex<MetricsHandlerMap>> = None; - -type MetricsHandlerMap = HashMap<ServiceId, Rc<FfiMetricsHandler>>; - -pub(crate) fn init_handlers() { - unsafe { - HANDLERS = Some(Mutex::new(HashMap::new())); - } -} - -#[inline] -fn handlers() -> MutexGuard<'static, 
MetricsHandlerMap> { - // SAFETY: global variable access: safe in tx thread. - let handlers = unsafe { HANDLERS.as_ref() }; - let handlers = handlers.expect("should be initialized at startup"); - handlers.lock() -} pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxError> { let identifier = &handler.identifier; @@ -50,7 +29,8 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr return Err(BoxError::new(TarantoolErrorCode::IllegalParams, "RPC route service version cannot be empty")); } - let mut handlers = handlers(); + let node = node::global()?; + let mut handlers = node.plugin_manager.metrics_handlers.lock(); let service_id = identifier.service_id(); let entry = handlers.entry(service_id); @@ -76,20 +56,16 @@ pub fn register_metrics_handler(handler: FfiMetricsHandler) -> Result<(), BoxErr Ok(()) } -pub fn unregister_metrics_handler(service_id: &ServiceId) { - // SAFETY: global variable access: safe in tx thread. - let handler = handlers().remove(service_id); - if handler.is_some() { - tlog!(Info, "unregistered metrics handler for `{service_id}`"); - } -} - pub fn get_plugin_metrics() -> String { let _guard = RegionGuard::new(); let mut res = String::new(); - let handlers = handlers(); + let Ok(node) = node::global() else { + return "".into(); + }; + + let handlers = node.plugin_manager.metrics_handlers.lock(); // Note: must first copy all references to a local vec, because the callbacks may yield. let handler_copies: Vec<_> = handlers.values().cloned().collect(); // Release the lock. diff --git a/src/plugin/mod.rs b/src/plugin/mod.rs index fc0b3d9fd8ee3c2bdc563ad793c1640de79c756f..978b9618d66437e24b25f4f2378361810694cef1 100644 --- a/src/plugin/mod.rs +++ b/src/plugin/mod.rs @@ -1,3 +1,4 @@ +pub mod background; mod ffi; pub mod lock; pub mod manager; @@ -30,6 +31,7 @@ pub use picodata_plugin::plugin::interface::ServiceId; use picodata_plugin::plugin::interface::{ServiceBox, ValidatorBox}; use rmpv::Value; use serde::{Deserialize, Serialize}; +use std::cell::Cell; use std::collections::HashMap; use std::fmt::{Display, Formatter}; use std::fs::File; @@ -142,6 +144,8 @@ type Result<T, E = PluginError> = std::result::Result<T, E>; pub struct ServiceState { pub id: ServiceId, + background_job_shutdown_timeout: Cell<Option<Duration>>, + /// The dynamic state of the service. It is guarded by a fiber mutex, /// because we don't want to enter the plugin callbacks from concurrent fibers. volatile_state: fiber::Mutex<ServiceStateVolatile>, @@ -157,6 +161,7 @@ impl ServiceState { ) -> Self { Self { id, + background_job_shutdown_timeout: Cell::new(None), volatile_state: fiber::Mutex::new(ServiceStateVolatile { inner, config_validator, @@ -359,43 +364,6 @@ impl Manifest { } } -/// Events that may be fired at picodata -/// and which plugins should respond to. -#[derive(Clone, PartialEq, Debug)] -pub enum PluginEvent<'a> { - /// Picodata instance goes online. - InstanceOnline, - /// Picodata instance shutdown (shutdown trigger is called). - InstanceShutdown, - /// New plugin load at instance. - PluginLoad { - ident: &'a PluginIdentifier, - service_defs: &'a [ServiceDef], - }, - /// Error occurred while the plugin loaded. - PluginLoadError { name: &'a str }, - /// Request for update service configuration received. - BeforeServiceConfigurationUpdated { - ident: &'a PluginIdentifier, - service: &'a str, - new_raw: &'a [u8], - }, - /// Instance demote. - InstanceDemote, - /// Instance promote as a replicaset leader. 
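Editor's note on the metrics.rs changes above: the handler registry moves off a `static mut` onto the plugin manager, but the yield-safety detail is kept, namely that get_plugin_metrics clones the handler Rc's out of the map and releases the fiber mutex before invoking anything. A small sketch of that discipline, assuming a hypothetical Handler type in place of FfiMetricsHandler and a plain String key in place of ServiceId:

use std::collections::HashMap;
use std::rc::Rc;
use tarantool::fiber::Mutex;

/// Hypothetical stand-in for FfiMetricsHandler; invoking it may yield.
struct Handler;

impl Handler {
    fn call(&self) -> String {
        String::new()
    }
}

fn collect_metrics(registry: &Mutex<HashMap<String, Rc<Handler>>>) -> String {
    let guard = registry.lock();
    // Copy the Rc handles into a local vec while holding the lock...
    let handlers: Vec<Rc<Handler>> = guard.values().cloned().collect();
    // ...and release it before calling into plugin code: the callbacks may
    // yield, and holding a fiber mutex across a yield would stall every
    // other fiber that needs to register or remove a handler.
    drop(guard);

    let mut out = String::new();
    for handler in &handlers {
        out.push_str(&handler.call());
        out.push('\n');
    }
    out
}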
- InstancePromote, - /// Plugin service enabled at instance. - ServiceEnabled { - ident: &'a PluginIdentifier, - service: &'a str, - }, - /// Plugin service disabled at instance. - ServiceDisabled { - ident: &'a PluginIdentifier, - service: &'a str, - }, -} - /// Events that may be fired at picodata /// and which plugins should respond to *asynchronously*. /// @@ -1227,12 +1195,10 @@ pub fn change_config_atom( let new_cfg_raw = rmp_serde::to_vec_named(&Value::Map(current_cfg)).expect("out of memory"); - let event = PluginEvent::BeforeServiceConfigurationUpdated { - ident, - service, - new_raw: new_cfg_raw.as_slice(), - }; - node.plugin_manager.handle_event_sync(event)?; + + node.plugin_manager + .handle_before_service_reconfigured(ident, service, &new_cfg_raw)?; + service_config_part.push((service.to_string(), kv)); } diff --git a/src/rpc/disable_service.rs b/src/rpc/disable_service.rs index 3c2c25ab414b5a45108d17eb3523bafe81d346b3..4eebe347ac080e0e128fd96e828ac280e6a521f0 100644 --- a/src/rpc/disable_service.rs +++ b/src/rpc/disable_service.rs @@ -1,4 +1,3 @@ -use crate::plugin::PluginEvent; use crate::plugin::PluginOp; use crate::traft::error::Error; use crate::traft::node; @@ -29,10 +28,7 @@ crate::define_rpc_request! { }; // reaction at `ServiceDisabled` is idempotent, so no errors occurred - _ = node.plugin_manager.handle_event_sync(PluginEvent::ServiceDisabled { - ident: plugin, - service, - }); + node.plugin_manager.handle_service_disabled(plugin, service); Ok(Response {}) } diff --git a/src/rpc/enable_all_plugins.rs b/src/rpc/enable_all_plugins.rs index 1490dcea82af178b4b4db354ea6b95ed8ebc5a03..c6936045de73beef78db9d3ce42cff7d5039c799 100644 --- a/src/rpc/enable_all_plugins.rs +++ b/src/rpc/enable_all_plugins.rs @@ -1,4 +1,3 @@ -use crate::plugin::PluginEvent; use crate::tlog; use crate::traft::node; use crate::traft::{RaftIndex, RaftTerm}; @@ -20,7 +19,7 @@ crate::define_rpc_request! { node.wait_index(req.applied, req.timeout)?; node.status().check_term(req.term)?; - let result = node.plugin_manager.handle_event_sync(PluginEvent::InstanceOnline); + let result = node.plugin_manager.handle_instance_online(); if let Err(e) = result { tlog!(Error, "failed initializing plugin system: {e}"); return Err(e.into()); diff --git a/src/rpc/enable_service.rs b/src/rpc/enable_service.rs index 80ea617bc23e65fab1ad236e1f79eed4de7ab881..f77d219f1dfc97deee23559e817366ebcfa32619 100644 --- a/src/rpc/enable_service.rs +++ b/src/rpc/enable_service.rs @@ -1,4 +1,3 @@ -use crate::plugin::PluginEvent; use crate::plugin::PluginOp; use crate::traft::error::Error; use crate::traft::node; @@ -30,10 +29,7 @@ crate::define_rpc_request! 
{ return Err(Error::other("found unexpected plugin operation expected AlterServiceTiers, found {plugin_op:?}")); }; - node.plugin_manager.handle_event_sync(PluginEvent::ServiceEnabled { - ident: plugin, - service, - })?; + node.plugin_manager.handle_service_enabled(plugin, service)?; Ok(Response {}) } diff --git a/src/rpc/replication.rs b/src/rpc/replication.rs index d51c6c223aa36c0529a5a2083c8e344cd1a41ff3..f2be37bd09ed65e00f2f14560224ffc31d5ab6de 100644 --- a/src/rpc/replication.rs +++ b/src/rpc/replication.rs @@ -37,7 +37,6 @@ use crate::governor; #[allow(unused_imports)] use crate::governor::plan; use crate::pico_service::pico_service_password; -use crate::plugin::PluginEvent; #[allow(unused_imports)] use crate::rpc; use crate::schema::PICO_SERVICE_USER_NAME; @@ -155,6 +154,8 @@ fn is_read_only() -> Result<bool> { /// /// Returns errors in the following cases: See implementation. fn promote_to_master() -> Result<()> { + let node = node::global()?; + // XXX: Currently we just change the box.cfg.read_only option of the // instance but at some point we will implement support for // tarantool synchronous transactions then this operation will probably @@ -175,9 +176,10 @@ fn promote_to_master() -> Result<()> { if was_read_only { // errors ignored because it must be already handled by plugin manager itself - _ = node::global()? - .plugin_manager - .handle_event_sync(PluginEvent::InstancePromote); + let res = node.plugin_manager.handle_rs_leader_change(); + if let Err(e) = res { + tlog!(Error, "on_leader_change error: {e}"); + } } Ok(()) @@ -202,7 +204,10 @@ crate::define_rpc_request! { if !was_read_only { // errors ignored because it must be already handled by plugin manager itself - _ = node::global()?.plugin_manager.handle_event_sync(PluginEvent::InstanceDemote); + let res = node.plugin_manager.handle_rs_leader_change(); + if let Err(e) = res { + tlog!(Error, "on_leader_change error: {e}"); + } } let vclock = Vclock::current(); diff --git a/src/traft/node.rs b/src/traft/node.rs index d1d29f43b64de4de20485b9fa42dfa2468297c48..1d2c7fc56c5a6b541c2db387100b104231dfecf2 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -1331,7 +1331,7 @@ impl NodeImpl { if let Err(e) = self .plugin_manager - .handle_event_async(PluginAsyncEvent::PluginDisabled { name: ident.name }) + .add_async_event_to_queue(PluginAsyncEvent::PluginDisabled { name: ident.name }) { tlog!(Warning, "async plugin event: {e}"); } @@ -1431,7 +1431,7 @@ impl NodeImpl { let old_cfg_raw = rmp_serde::encode::to_vec_named(&old_cfg).expect("out of memory"); - if let Err(e) = self.plugin_manager.handle_event_async( + if let Err(e) = self.plugin_manager.add_async_event_to_queue( PluginAsyncEvent::ServiceConfigurationUpdated { ident: ident.clone(), service: svc.name, diff --git a/test/int/test_plugin.py b/test/int/test_plugin.py index 83e2feb8c75a9f5c0b1c2d154eb7032ffbed9b60..e1a62e4556a6ade6e93b4950d23caeef4cb841fc 100644 --- a/test/int/test_plugin.py +++ b/test/int/test_plugin.py @@ -2093,11 +2093,11 @@ def test_set_topology_with_error_on_start(cluster: Cluster): # -------------------------- RPC SDK tests ------------------------------------- -def make_context(override: dict[Any, Any] = {}) -> dict[Any, Any]: +def make_context(override: dict[Any, Any] = {}, service=SERVICE_W_RPC) -> dict[Any, Any]: context = { REQUEST_ID: uuid.uuid4(), PLUGIN_NAME: _PLUGIN_W_SDK, - SERVICE_NAME: SERVICE_W_RPC, + SERVICE_NAME: service, PLUGIN_VERSION: _PLUGIN_VERSION_1, "timeout": 5.0, } @@ -2762,28 +2762,36 @@ def test_sdk_metrics(instance: 
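Editor's note on the replication.rs changes above: with handle_event_sync gone, promote_to_master and the demotion RPC call handle_rs_leader_change directly, and they do so only when the read_only flag actually flips, logging the error instead of propagating it. A sketch of that edge-triggered shape; apply_read_only is an illustrative wrapper, not a function in this patch:

use crate::plugin::manager::PluginManager;
use crate::tlog;

fn apply_read_only(manager: &PluginManager, was_read_only: bool, now_read_only: bool) {
    // box.cfg { read_only = now_read_only } would be applied here.

    if was_read_only != now_read_only {
        // The replicaset role actually changed: let services run their
        // on_leader_change callback; errors are logged, not returned.
        if let Err(e) = manager.handle_rs_leader_change() {
            tlog!(Error, "on_leader_change error: {e}");
        }
    }
}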
Instance): def test_sdk_background(cluster: Cluster): [i1] = cluster.deploy(instance_count=1) + plugin = _PLUGIN_W_SDK + [service] = _PLUGIN_W_SDK_SERVICES + install_and_enable_plugin( i1, - _PLUGIN_W_SDK, - _PLUGIN_W_SDK_SERVICES, + plugin, + [service], migrate=True, default_config={"test_type": "background"}, ) + # Run internal tests + context = make_context(service=service) + i1.call(".proc_rpc_dispatch", "/test_cancel_tagged_basic", b"", context) + i1.call(".proc_rpc_dispatch", "/test_cancel_tagged_timeout", b"", context) + # assert that job is working Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_running", i1) # assert that job ends after plugin disabled - i1.call("pico.disable_plugin", _PLUGIN_W_SDK, _PLUGIN_VERSION_1) + i1.call("pico.disable_plugin", plugin, "0.1.0") Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_stopped", i1) # run again - i1.call("pico.enable_plugin", _PLUGIN_W_SDK, _PLUGIN_VERSION_1) + i1.call("pico.enable_plugin", plugin, "0.1.0") Retriable(timeout=5, rps=2).call(PluginReflection.assert_persisted_data_exists, "background_job_running", i1) # now shutdown 1 and check that job ended - i1.sql(f"ALTER PLUGIN {_PLUGIN_W_SDK} {_PLUGIN_VERSION_1} SET {_PLUGIN_W_SDK_SERVICES[0]}.test_type = 'no_test'") + i1.sql(f"ALTER PLUGIN {plugin} 0.1.0 SET {service}.test_type = 'no_test'") i1.restart() i1.wait_online() diff --git a/test/testplug/src/lib.rs b/test/testplug/src/lib.rs index 581d50794ae8a96febe3b30ffa44c79eb16c126b..975ebadbce72930f0f61912450cdfce100de6242 100644 --- a/test/testplug/src/lib.rs +++ b/test/testplug/src/lib.rs @@ -422,8 +422,32 @@ impl Service for Service3 { save_persisted_data("background_job_stopped"); } - let wm = ctx.worker_manager(); - wm.register_job(my_job).unwrap(); + ctx.register_job(my_job).unwrap(); + + ctx.set_jobs_shutdown_timeout(Duration::from_secs(3)); + + ctx.cancel_tagged_jobs("no-such-tag", Duration::from_secs(10)) + .unwrap(); + + let service = ctx.make_service_id(); + rpc::RouteBuilder::from(ctx) + .path("/test_cancel_tagged_basic") + .register(move |_, _| { + background_tests::test_cancel_tagged_basic(&service); + + Ok(rpc::Response::empty()) + }) + .unwrap(); + + let service = ctx.make_service_id(); + rpc::RouteBuilder::from(ctx) + .path("/test_cancel_tagged_timeout") + .register(move |_, _| { + background_tests::test_cancel_tagged_timeout(&service); + + Ok(rpc::Response::empty()) + }) + .unwrap(); } "no_test" => {} "metrics" => { @@ -461,6 +485,71 @@ impl Service for Service3 { } } +mod background_tests { + use picodata_plugin::background; + use picodata_plugin::plugin::interface::ServiceId; + use picodata_plugin::system::tarantool::fiber; + use std::sync::atomic::{AtomicU64, Ordering}; + use std::time::Duration; + + static _1_MS: Duration = Duration::from_millis(1); + static _10_MS: Duration = Duration::from_millis(10); + static _100_MS: Duration = Duration::from_millis(100); + static _200_MS: Duration = Duration::from_millis(200); + + fn make_job( + counter: &'static AtomicU64, + iteration_duration: Duration, + ) -> impl Fn(background::CancellationToken) { + move |cancel_token: background::CancellationToken| { + while cancel_token.wait_timeout(_1_MS).is_err() { + counter.fetch_add(1, Ordering::SeqCst); + fiber::sleep(iteration_duration); + } + counter.store(0, Ordering::SeqCst); + } + } + + pub fn test_cancel_tagged_basic(service: &ServiceId) { + static COUNTER_1: AtomicU64 = AtomicU64::new(0); + static COUNTER_2: AtomicU64 = 
AtomicU64::new(0);
+
+        background::register_tagged_job(service, make_job(&COUNTER_1, _1_MS), "j1").unwrap();
+        background::register_tagged_job(service, make_job(&COUNTER_1, _1_MS), "j1").unwrap();
+        background::register_tagged_job(service, make_job(&COUNTER_2, _1_MS), "j2").unwrap();
+
+        fiber::sleep(_10_MS);
+        assert!(COUNTER_1.load(Ordering::SeqCst) > 0);
+        assert!(COUNTER_2.load(Ordering::SeqCst) > 0);
+
+        background::cancel_jobs_by_tag(service, "j1", _10_MS).unwrap();
+
+        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
+        assert_ne!(COUNTER_2.load(Ordering::SeqCst), 0);
+
+        background::cancel_jobs_by_tag(service, "j2", _10_MS).unwrap();
+
+        assert_eq!(COUNTER_1.load(Ordering::SeqCst), 0);
+        assert_eq!(COUNTER_2.load(Ordering::SeqCst), 0);
+    }
+
+    pub fn test_cancel_tagged_timeout(service: &ServiceId) {
+        static COUNTER: AtomicU64 = AtomicU64::new(0);
+
+        let job_tag = "my tag";
+        // this job cannot stop at 10ms interval
+        background::register_tagged_job(service, make_job(&COUNTER, _100_MS), job_tag).unwrap();
+        // but this job can
+        background::register_tagged_job(service, make_job(&COUNTER, _1_MS), job_tag).unwrap();
+
+        fiber::sleep(_10_MS);
+
+        let result = background::cancel_jobs_by_tag(service, job_tag, _10_MS).unwrap();
+        assert_eq!(result.n_total, 2);
+        assert_eq!(result.n_timeouts, 1);
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 /// ServiceWithRpcTests
 ////////////////////////////////////////////////////////////////////////////////
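Editor's note: for reference, this is roughly how the path exercised by the tests above looks from a plugin's point of view. A hedged sketch only: start_poller, stop_poller and the "poller" tag are made-up names, error handling is reduced to expect/eprintln, and `service` would normally come from PicoContext::make_service_id().

use std::time::Duration;

use picodata_plugin::background::{self, CancellationToken};
use picodata_plugin::plugin::interface::ServiceId;

fn start_poller(service: &ServiceId) {
    background::register_tagged_job(
        service,
        |token: CancellationToken| {
            // As in the tests above: keep working while wait_timeout returns
            // Err (timed out), exit once it returns Ok (cancellation signalled).
            while token.wait_timeout(Duration::from_millis(100)).is_err() {
                // ... one unit of periodic work ...
            }
            // cancellation requested: clean up and let the fiber exit
        },
        "poller",
    )
    .expect("failed to register background job");
}

fn stop_poller(service: &ServiceId) {
    let res = background::cancel_jobs_by_tag(service, "poller", Duration::from_secs(5))
        .expect("cancellation request failed");
    if res.n_timeouts != 0 {
        // Some fibers ignored the token for the whole 5 s window.
        eprintln!("{} of {} jobs didn't stop in time", res.n_timeouts, res.n_total);
    }
}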