From 5b5d25f38595557ae97f0f19c1e9a5e6a026f0ce Mon Sep 17 00:00:00 2001
From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com>
Date: Thu, 21 Apr 2022 12:35:51 +0300
Subject: [PATCH] Handle supervisor termination in child

We don't want a child process to live without the supervisor.

Usually, supervisor waits for child forever and retransmits termination
signals. But if the parent is killed with a SIGKILL there's no way to
pass anything.

This patch supplies a child process with a `supervisor_fuse` fiber. It
tries to read from a pipe (that supervisor never writes to), and if the
writing end is closed, it means the supervisor has terminated. In this
case, child process terminates too.

Part of https://git.picodata.io/picodata/picodata/picodata/-/issues/56
---
 src/ipc.rs               |  2 +-
 src/main.rs              | 59 +++++++++++++++++++++++++++-------------
 test/supervisor_test.lua | 57 ++++++++++++++++++++++++++++++++++++++
 3 files changed, 98 insertions(+), 20 deletions(-)
 create mode 100644 test/supervisor_test.lua

diff --git a/src/ipc.rs b/src/ipc.rs
index afd66ae99e..57edec95dd 100644
--- a/src/ipc.rs
+++ b/src/ipc.rs
@@ -5,7 +5,7 @@ use serde::ser::Serialize;
 use std::marker::PhantomData;
 
 #[derive(Debug)]
-pub struct Fd(libc::c_int);
+pub struct Fd(pub libc::c_int);
 
 #[derive(Debug)]
 pub struct Sender<T> {
diff --git a/src/main.rs b/src/main.rs
index 2e16705eb7..5bda914ce8 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -148,11 +148,7 @@ fn rm_tarantool_files(data_dir: &str) {
                 .unwrap_or(false)
         })
         .for_each(|f| {
-            tlog!(
-                Warning,
-                "[supervisor] removing file: {}",
-                (&f).to_string_lossy()
-            );
+            println!("[supervisor] removing file: {}", (&f).to_string_lossy());
             std::fs::remove_file(f).unwrap();
         });
 }
@@ -224,8 +220,6 @@ fn main_run(args: args::Run) -> ! {
         }
     }
 
-    let mut entrypoint = Entrypoint::StartDiscover {};
-
     // Tarantool running in a fork (or, to be more percise, the
     // libreadline) modifies termios settings to intercept echoed text.
     //
@@ -236,30 +230,56 @@ fn main_run(args: args::Run) -> ! {
     //
     let tcattr = tcgetattr(0).ok();
 
+    let parent = unistd::getpid();
+    let mut entrypoint = Entrypoint::StartDiscover {};
     loop {
-        tlog!(Info, "[supervisor] running {:?}", entrypoint);
+        println!("[supervisor:{parent}] running {entrypoint:?}");
 
-        let pipe = ipc::channel::<IpcMessage>();
-        let (rx, tx) = pipe.expect("pipe creation failed");
+        let (from_child, to_parent) =
+            ipc::channel::<IpcMessage>().expect("ipc channel creation failed");
+        let (from_parent, to_child) = ipc::pipe().expect("ipc pipe creation failed");
 
         let pid = unsafe { fork() };
         match pid.expect("fork failed") {
             ForkResult::Child => {
-                drop(rx);
+                drop(from_child);
+                drop(to_child);
 
                 let rc = tarantool_main!(
                     args.tt_args().unwrap(),
-                    callback_data: (entrypoint, args, tx),
-                    callback_data_type: (Entrypoint, args::Run, ipc::Sender<IpcMessage>),
+                    callback_data: (entrypoint, args, to_parent, from_parent),
+                    callback_data_type: (Entrypoint, args::Run, ipc::Sender<IpcMessage>, ipc::Fd),
                     callback_body: {
-                        entrypoint.exec(args, tx)
+                        // We don't want a child to live without a supervisor.
+                        //
+                        // Usually, supervisor waits for child forever and retransmits
+                        // termination signals. But if the parent is killed with a SIGKILL
+                        // there's no way to pass anything.
+                        //
+                        // This fiber serves as a fuse - it tries to read from a pipe
+                        // (that supervisor never writes to), and if the writing end is
+                        // closed, it means the supervisor has terminated.
+                        let fuse = fiber::Builder::new()
+                            .name("supervisor_fuse")
+                            .func(move || {
+                                use ::tarantool::ffi::tarantool::CoIOFlags;
+                                use ::tarantool::coio::coio_wait;
+                                coio_wait(from_parent.0, CoIOFlags::READ, f64::INFINITY).ok();
+                                tlog!(Warning, "Supervisor terminated, exiting");
+                                std::process::exit(0);
+                        });
+                        std::mem::forget(fuse.start());
+
+                        entrypoint.exec(args, to_parent)
                     }
                 );
                 std::process::exit(rc);
             }
             ForkResult::Parent { child } => {
-                drop(tx);
-                let msg = rx.recv().ok();
+                drop(from_parent);
+                drop(to_parent);
+
+                let msg = from_child.recv().ok();
 
                 let mut rc: i32 = 0;
                 unsafe {
@@ -275,14 +295,15 @@ fn main_run(args: args::Run) -> ! {
                     tcsetattr(0, TCSADRAIN, tcattr).unwrap();
                 }
 
-                tlog!(Info, "[supervisor] tarantool process finished: {:?}",
-                    WaitStatus::from_raw(child, rc);
+                println!(
+                    "[supervisor:{parent}] subprocess finished: {:?}",
+                    WaitStatus::from_raw(child, rc)
                 );
 
                 if let Some(msg) = msg {
                     entrypoint = msg.next_entrypoint;
                     if msg.drop_db {
-                        tlog!(Info, "[supervisor] tarantool requested rebootstrap");
+                        println!("[supervisor:{parent}] subprocess requested rebootstrap");
                         rm_tarantool_files(&args.data_dir);
                     }
                 } else {
diff --git a/test/supervisor_test.lua b/test/supervisor_test.lua
new file mode 100644
index 0000000000..67afe53aab
--- /dev/null
+++ b/test/supervisor_test.lua
@@ -0,0 +1,57 @@
+local t = require('luatest')
+local h = require('test.helper')
+local g = t.group()
+
+local ffi = require('ffi')
+local fio = require('fio')
+local log = require('log')
+local popen = require('popen')
+
+local function pgrep_children(pid, result)
+    pid = pid or require('tarantool').pid()
+    result = result or {}
+
+    local ps = t.assert(popen.shell('exec pgrep -P' .. pid, 'r'))
+    for _, child in ipairs(ps:read():strip():split()) do
+        table.insert(result, child)
+        pgrep_children(child, result)
+    end
+    return result
+end
+
+g.before_test('test_sigkill', function()
+    g.data_dir = fio.tempdir()
+
+    g.node = h.Picodata:new({
+            name = 'single',
+            data_dir = g.data_dir,
+            listen = '127.0.0.1:13301',
+            peer = {'127.0.0.1:13301'},
+    })
+    g.node:start()
+    g.node:wait_started()
+
+    g.children = pgrep_children()
+end)
+g.test_sigkill = function()
+    t.assert_equals(#g.children, 2, "something wrong with pgrep")
+
+    g.node.process:kill(9)
+    log.warn("supervisor killed with SIGKILL")
+
+    h.retrying({}, function()
+        for i, pid in ipairs(g.children) do
+            t.assert_not(
+                t.Process.is_pid_alive(pid),
+                string.format("child #%d (pid %s) didn't die", i, pid)
+            )
+        end
+    end)
+end
+g.after_test('test_sigkill', function()
+    for _, pid in ipairs(g.children) do
+        t.Process.kill_pid(pid, 9, {quiet = true})
+    end
+    fio.rmtree(g.data_dir)
+end)
+
-- 
GitLab