From 70381298503a837a650855f9123fa581e3c43ea9 Mon Sep 17 00:00:00 2001
From: Sergey V <sv@picodata.io>
Date: Mon, 30 May 2022 20:00:40 +0300
Subject: [PATCH] refactor: add prefixes to the instance output

---
 test/int/conftest.py | 34 ++++++++++++++++++++++++++++++----
 1 file changed, 30 insertions(+), 4 deletions(-)

diff --git a/test/int/conftest.py b/test/int/conftest.py
index 890202a2f5..1e6ebd8742 100644
--- a/test/int/conftest.py
+++ b/test/int/conftest.py
@@ -1,6 +1,8 @@
+import io
 import os
 import re
 import sys
+import threading
 import funcy  # type: ignore
 import pytest
 import signal
@@ -20,7 +22,7 @@ from tarantool.error import (  # type: ignore
 # From raft.rs:
 # A constant represents invalid id of raft.
 # pub const INVALID_ID: u64 = 0;
-INVALID_ID = 0
+INVALID_RAFT_ID = 0
 RE_XDIST_WORKER_ID = re.compile(r"^gw(\d+)$")
 
 
@@ -121,6 +123,9 @@ class RaftStatus:
     is_ready: bool
 
 
+OUT_LOCK = threading.Lock()
+
+
 @dataclass
 class Instance:
     binary_path: str
@@ -133,7 +138,7 @@ class Instance:
 
     env: dict[str, str] = field(default_factory=dict)
     process: subprocess.Popen | None = None
-    raft_id: int = INVALID_ID
+    raft_id: int = INVALID_RAFT_ID
 
     @property
     def listen(self):
@@ -214,6 +219,14 @@ class Instance:
         finally:
             self.kill()
 
+    def _process_output(self, src, out):
+        prefix = f"{self.instance_id} | "
+        for line in io.TextIOWrapper(src, line_buffering=True):
+            with OUT_LOCK:
+                out.write(prefix)
+                out.write(line)
+                out.flush()
+
     def start(self):
         if self.process:
             # Be idempotent
@@ -224,8 +237,22 @@ class Instance:
             env=self.env or None,
             stdin=subprocess.DEVNULL,
             start_new_session=True,
+            stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE,
         )
 
+        for src, out in [
+            (self.process.stdout, sys.stdout),
+            (self.process.stderr, sys.stderr),
+        ]:
+            threading.Thread(
+                target=self._process_output,
+                args=(src, out),
+                daemon=True,
+            ).start()
+
+        eprint(f"{self} started")
+
         eprint(f"{self} starting...")
 
         # Assert a new process group is created
@@ -396,8 +423,7 @@ def cluster(binary_path, tmpdir, worker_id) -> Generator[Cluster, None, None]:
         max_port=max_port,
     )
     yield cluster
-    cluster.terminate()
-    cluster.remove_data()
+    cluster.kill()
 
 
 @pytest.fixture
-- 
GitLab