diff --git a/src/ipc.rs b/src/ipc.rs index afd66ae99ed68ca72c6a915ac15072079e1c8596..57edec95dd08f2072e1b54c781cc468ba3f2da90 100644 --- a/src/ipc.rs +++ b/src/ipc.rs @@ -5,7 +5,7 @@ use serde::ser::Serialize; use std::marker::PhantomData; #[derive(Debug)] -pub struct Fd(libc::c_int); +pub struct Fd(pub libc::c_int); #[derive(Debug)] pub struct Sender<T> { diff --git a/src/main.rs b/src/main.rs index 2e16705eb74cb4ae526bb89d74c832be1b271394..5bda914ce8f046d7b7de6373042d0ead3f75a20c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -148,11 +148,7 @@ fn rm_tarantool_files(data_dir: &str) { .unwrap_or(false) }) .for_each(|f| { - tlog!( - Warning, - "[supervisor] removing file: {}", - (&f).to_string_lossy() - ); + println!("[supervisor] removing file: {}", (&f).to_string_lossy()); std::fs::remove_file(f).unwrap(); }); } @@ -224,8 +220,6 @@ fn main_run(args: args::Run) -> ! { } } - let mut entrypoint = Entrypoint::StartDiscover {}; - // Tarantool running in a fork (or, to be more percise, the // libreadline) modifies termios settings to intercept echoed text. // @@ -236,30 +230,56 @@ fn main_run(args: args::Run) -> ! { // let tcattr = tcgetattr(0).ok(); + let parent = unistd::getpid(); + let mut entrypoint = Entrypoint::StartDiscover {}; loop { - tlog!(Info, "[supervisor] running {:?}", entrypoint); + println!("[supervisor:{parent}] running {entrypoint:?}"); - let pipe = ipc::channel::<IpcMessage>(); - let (rx, tx) = pipe.expect("pipe creation failed"); + let (from_child, to_parent) = + ipc::channel::<IpcMessage>().expect("ipc channel creation failed"); + let (from_parent, to_child) = ipc::pipe().expect("ipc pipe creation failed"); let pid = unsafe { fork() }; match pid.expect("fork failed") { ForkResult::Child => { - drop(rx); + drop(from_child); + drop(to_child); let rc = tarantool_main!( args.tt_args().unwrap(), - callback_data: (entrypoint, args, tx), - callback_data_type: (Entrypoint, args::Run, ipc::Sender<IpcMessage>), + callback_data: (entrypoint, args, to_parent, from_parent), + callback_data_type: (Entrypoint, args::Run, ipc::Sender<IpcMessage>, ipc::Fd), callback_body: { - entrypoint.exec(args, tx) + // We don't want a child to live without a supervisor. + // + // Usually, supervisor waits for child forever and retransmits + // termination signals. But if the parent is killed with a SIGKILL + // there's no way to pass anything. + // + // This fiber serves as a fuse - it tries to read from a pipe + // (that supervisor never writes to), and if the writing end is + // closed, it means the supervisor has terminated. + let fuse = fiber::Builder::new() + .name("supervisor_fuse") + .func(move || { + use ::tarantool::ffi::tarantool::CoIOFlags; + use ::tarantool::coio::coio_wait; + coio_wait(from_parent.0, CoIOFlags::READ, f64::INFINITY).ok(); + tlog!(Warning, "Supervisor terminated, exiting"); + std::process::exit(0); + }); + std::mem::forget(fuse.start()); + + entrypoint.exec(args, to_parent) } ); std::process::exit(rc); } ForkResult::Parent { child } => { - drop(tx); - let msg = rx.recv().ok(); + drop(from_parent); + drop(to_parent); + + let msg = from_child.recv().ok(); let mut rc: i32 = 0; unsafe { @@ -275,14 +295,15 @@ fn main_run(args: args::Run) -> ! { tcsetattr(0, TCSADRAIN, tcattr).unwrap(); } - tlog!(Info, "[supervisor] tarantool process finished: {:?}", - WaitStatus::from_raw(child, rc); + println!( + "[supervisor:{parent}] subprocess finished: {:?}", + WaitStatus::from_raw(child, rc) ); if let Some(msg) = msg { entrypoint = msg.next_entrypoint; if msg.drop_db { - tlog!(Info, "[supervisor] tarantool requested rebootstrap"); + println!("[supervisor:{parent}] subprocess requested rebootstrap"); rm_tarantool_files(&args.data_dir); } } else { diff --git a/test/supervisor_test.lua b/test/supervisor_test.lua new file mode 100644 index 0000000000000000000000000000000000000000..67afe53aab7de2890dd1cee1e2a80f6ea6e81714 --- /dev/null +++ b/test/supervisor_test.lua @@ -0,0 +1,57 @@ +local t = require('luatest') +local h = require('test.helper') +local g = t.group() + +local ffi = require('ffi') +local fio = require('fio') +local log = require('log') +local popen = require('popen') + +local function pgrep_children(pid, result) + pid = pid or require('tarantool').pid() + result = result or {} + + local ps = t.assert(popen.shell('exec pgrep -P' .. pid, 'r')) + for _, child in ipairs(ps:read():strip():split()) do + table.insert(result, child) + pgrep_children(child, result) + end + return result +end + +g.before_test('test_sigkill', function() + g.data_dir = fio.tempdir() + + g.node = h.Picodata:new({ + name = 'single', + data_dir = g.data_dir, + listen = '127.0.0.1:13301', + peer = {'127.0.0.1:13301'}, + }) + g.node:start() + g.node:wait_started() + + g.children = pgrep_children() +end) +g.test_sigkill = function() + t.assert_equals(#g.children, 2, "something wrong with pgrep") + + g.node.process:kill(9) + log.warn("supervisor killed with SIGKILL") + + h.retrying({}, function() + for i, pid in ipairs(g.children) do + t.assert_not( + t.Process.is_pid_alive(pid), + string.format("child #%d (pid %s) didn't die", i, pid) + ) + end + end) +end +g.after_test('test_sigkill', function() + for _, pid in ipairs(g.children) do + t.Process.kill_pid(pid, 9, {quiet = true}) + end + fio.rmtree(g.data_dir) +end) +