From cc14fc85f0b39848c0ca2de147b9d8ebc681b97e Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja.osipov@gmail.com>
Date: Tue, 12 Jul 2011 21:31:27 +0400
Subject: [PATCH] feature-feeder-in-core: review fixes

Fix bugs in spawner child handling, performed at shutdown:
- shut down children more portably
- remove races with lost SIGCHLD
- do not self-terminate when doing kill(0, signal)
(sending a signal to the entire process group).

Clean up custom_proc_title handling and fiber names.
---
 core/admin.m       |   2 +-
 core/admin.rl      |   2 +-
 core/fiber.m       |   6 +-
 core/replication.m | 474 ++++++++++++++++++++-------------------------
 core/tarantool.m   |   2 +-
 include/fiber.h    |   2 +-
 mod/box/box.m      |  28 +--
 7 files changed, 231 insertions(+), 285 deletions(-)

diff --git a/core/admin.m b/core/admin.m
index 412dc7b886..62d74731f0 100644
--- a/core/admin.m
+++ b/core/admin.m
@@ -1477,7 +1477,7 @@ admin_handler(void *_data __attribute__((unused)))
 int
 admin_init(void)
 {
-	if (fiber_server(cfg.admin_port, admin_handler, NULL, NULL) == NULL) {
+	if (fiber_server("admin", cfg.admin_port, admin_handler, NULL, NULL) == NULL) {
 		say_syserror("can't bind to %d", cfg.admin_port);
 		return -1;
 	}
diff --git a/core/admin.rl b/core/admin.rl
index 81265ecf7a..992b179715 100644
--- a/core/admin.rl
+++ b/core/admin.rl
@@ -223,7 +223,7 @@ admin_handler(void *_data __attribute__((unused)))
 int
 admin_init(void)
 {
-	if (fiber_server(cfg.admin_port, admin_handler, NULL, NULL) == NULL) {
+	if (fiber_server("admin", cfg.admin_port, admin_handler, NULL, NULL) == NULL) {
 		say_syserror("can't bind to %d", cfg.admin_port);
 		return -1;
 	}
diff --git a/core/fiber.m b/core/fiber.m
index 8998f2cbfe..0ccd53a503 100644
--- a/core/fiber.m
+++ b/core/fiber.m
@@ -1069,7 +1069,7 @@ spawn_child(const char *name, int inbox_size, struct tbuf *(*handler) (void *, s
 		close_all_xcpt(2, socks[0], sayfd);
 		snprintf(child_name, sizeof(child_name), "%s/child", name);
 		fiber_set_name(&sched, child_name);
-		set_proc_title(name);
+		set_proc_title("%s%s", name, custom_proc_title);
 		say_crit("%s initialized", name);
 		blocking_loop(socks[0], handler, state);
 	}
@@ -1129,14 +1129,14 @@ tcp_server_handler(void *data)
 }
 
 struct fiber *
-fiber_server(int port, void (*handler) (void *data), void *data,
+fiber_server(const char *name, int port, void (*handler) (void *data), void *data,
 	     void (*on_bind) (void *data))
 {
 	char server_name[FIBER_NAME_MAXLEN];
 	struct fiber_server *server;
 	struct fiber *s;
 
-	snprintf(server_name, sizeof(server_name), "%i/acceptor", port);
+	snprintf(server_name, sizeof(server_name), "%i/%s", port, name);
 	s = fiber_create(server_name, -1, -1, tcp_server_handler, data);
 	s->data = server = palloc(eter_pool, sizeof(struct fiber_server));
 	assert(server != NULL);
diff --git a/core/replication.m b/core/replication.m
index 14d22c7e3a..12b3872f40 100644
--- a/core/replication.m
+++ b/core/replication.m
@@ -46,29 +46,33 @@
  * master, spawner and replication relay.
  *
  * The spawner is created at server start, and master communicates
- * with the spawner using a socketpair(2).
+ * with the spawner using a socketpair(2). Replication relays are
+ * created by the spawner and handle one client connection each.
  *
  * The master process binds to replication_port and accepts
- * incoming connections. We do it in the master to be able to
+ * incoming connections. This is done in the master to be able to
  * correctly handle RELOAD CONFIGURATION, which happens in the
  * master, and, in future, perform authentication of replication
- * clients. Sine the master uses fibers to serve all clients,
- * acceptor fiber is just one of the many fibers in use. Once
- * a client socket is accepted, it is sent to the spawner process,
- * through the master's end of the socket pair.
+ * clients. Since the master uses fibers to serve all clients,
+ * replication acceptor fiber is just one of many fibers in use.
+ * Once a client socket is accepted, it is sent to the spawner
+ * process, through the master's end of the socket pair.
  *
  * The spawner listens on the receiving end of the socket pair and
  * for every received socket creates a replication relay, which is
  * then responsible for sending write ahead logs to the replica.
+ *
+ * Upon shutdown, the master closes its end of the socket pair.
+ * The spawner then reads EOF from its end, terminates all
+ * children and exits.
  */
-
-static int replication_socks[2];
+static int master_to_spawner_sock;
 
 /** replication_port acceptor fiber */
 static void
-acceptor_handler(void *data);
+acceptor_handler(void *data __attribute__((unused)));
 
-/** Send file descriptor to replication relay spawner.
+/** Send a file descriptor to replication relay spawner.
  *
  * @param client_sock the file descriptor to be sent.
  */
@@ -77,14 +81,12 @@ acceptor_send_sock(int client_sock);
 
 /** Replication spawner process */
 static struct spawner {
-	/** communication socket. */
+	/** reading end of the socket pair with the master */
 	int sock;
-	/** waitpid need */
-	bool need_waitpid;
-	/** got signal */
-	bool is_done;
-	/** child process counts */
-	u32 child_count;
+	/** non-zero if got a terminating signal */
+	sig_atomic_t killed;
+	/** child process count */
+	sig_atomic_t child_count;
 } spawner;
 
 /** Initialize spawner process.
@@ -94,38 +96,21 @@ static struct spawner {
 static void
 spawner_init(int sock);
 
-/**
- * Spawner main loop.
- */
+/** Spawner main loop. */
 static void
 spawner_main_loop();
 
-/**
- * Shutdown spawner and all its children.
- */
+/** Shutdown spawner and all its children. */
 static void
 spawner_shutdown();
 
-/** Replication spawner process signal handler.
- *
- * @param signal is signal number.
- */
+/** Handle SIGINT, SIGTERM, SIGPIPE, SIGHUP. */
 static void
 spawner_signal_handler(int signal);
 
-/**
- * Process waitpid children.
- */
+/** Handle SIGCHLD: collect status of a terminated child.  */
 static void
-spawner_wait_children();
-
-/**
- * Receive replication client socket from main process.
- *
- * @return On success, a zero is returned. On error, -1 is returned.
- */
-static int
-spawner_recv_client_sock(int *client_sock);
+spawner_sigchld_handler(int signal __attribute__((unused)));
 
 /** Create a replication relay.
  *
@@ -134,23 +119,22 @@ spawner_recv_client_sock(int *client_sock);
 static int
 spawner_create_replication_relay(int client_sock);
 
-/** Replicator spawner shutdown: kill children. */
+/** Shut down all relays when shutting down the spawner. */
 static void
-spawner_shutdown_kill_children();
-
-/** * Replicator spawner shutdown: wait children.  */
-static int
-spawner_shutdown_wait_children();
+spawner_shutdown_children();
 
 /** Initialize replication relay process. */
 static void
 replication_relay_loop(int client_sock);
 
-/** * Receive data event to replication socket handler. */
+/** A libev callback invoked when a relay client socket is ready
+ * for read. This currently only happens when the client closes
+ * its socket, and we get an EOF.
+ */
 static void
 replication_relay_recv(struct ev_io *w, int revents);
 
-/** * Send a single row to the client. */
+/** Send a single row to the client. */
 static int
 replication_relay_send_row(struct recovery_state *r __attribute__((unused)), struct tbuf *t);
 
@@ -181,16 +165,14 @@ replication_prefork()
 		/* replication is not needed, do nothing */
 		return;
 	}
+	int sockpair[2];
 	/*
 	 * Create UNIX sockets to communicate between the main and
 	 * spawner processes.
          */
-	if (socketpair(PF_LOCAL, SOCK_STREAM, 0, replication_socks) != 0)
+	if (socketpair(PF_LOCAL, SOCK_STREAM, 0, sockpair) != 0)
 		panic_syserror("socketpair");
 
-	if (set_nonblock(replication_socks[0]) == -1)
-		panic("set nonblock fail");
-
 	/* create spawner */
 	pid_t pid = fork();
 	if (pid == -1)
@@ -198,11 +180,19 @@ replication_prefork()
 
 	if (pid != 0) {
 		/* parent process: tarantool */
-		close(replication_socks[1]);
+		close(sockpair[1]);
+		master_to_spawner_sock = sockpair[0];
+		if (set_nonblock(master_to_spawner_sock) == -1)
+			panic("set_nonblock");
 	} else {
 		/* child process: spawner */
-		close(replication_socks[0]);
-		spawner_init(replication_socks[1]);
+		close(sockpair[0]);
+		/*
+		 * Move to our own process group, so as not to receive
+		 * signals from the controlling tty.
+		 */
+		setpgid(0, 0);
+		spawner_init(sockpair[1]);
 	}
 }
 
@@ -215,7 +205,7 @@ void
 replication_init()
 {
 	if (cfg.replication_port == 0)
-		return;                         /* replication is not in use */
+		return;                        /* replication is not in use */
 
 	char fiber_name[FIBER_NAME_MAXLEN];
 
@@ -267,7 +257,7 @@ acceptor_handler(void *data __attribute__((unused)))
 }
 
 
-/** Send file descriptor to the spawner. */
+/** Send a file descriptor to the spawner. */
 static void
 acceptor_send_sock(int client_sock)
 {
@@ -294,13 +284,13 @@ acceptor_send_sock(int client_sock)
 	*((int *) CMSG_DATA(control_message)) = client_sock;
 
 	/* wait, when interprocess comm. socket is ready for write */
-	fiber_io_start(replication_socks[0], EV_WRITE);
+	fiber_io_start(master_to_spawner_sock, EV_WRITE);
 	fiber_io_yield();
 	/* send client socket to the spawner */
-	if (sendmsg(replication_socks[0], &msg, 0) < 0)
+	if (sendmsg(master_to_spawner_sock, &msg, 0) < 0)
 		say_syserror("sendmsg");
 
-	fiber_io_stop(replication_socks[0], EV_WRITE);
+	fiber_io_stop(master_to_spawner_sock, EV_WRITE);
 	/* close client socket in the main process */
 	close(client_sock);
 }
@@ -318,268 +308,230 @@ spawner_init(int sock)
 	char name[sizeof(fiber->name)];
 	struct sigaction sa;
 
-	snprintf(name, sizeof(name), "replication%s/spawner", custom_proc_title);
+	snprintf(name, sizeof(name), "spawner%s", custom_proc_title);
 	fiber_set_name(fiber, name);
 	set_proc_title(name);
 
 	/* init replicator process context */
-	memset(&spawner, 0, sizeof(spawner));
 	spawner.sock = sock;
-	spawner.need_waitpid = false;
-	spawner.is_done = false;
-	spawner.child_count = 0;
 
-	/* init signal */
+	/* init signals */
 	memset(&sa, 0, sizeof(sa));
-	sa.sa_handler = SIG_IGN;
+	sigemptyset(&sa.sa_mask);
 
-	/* ignore SIGPIPE */
-	if (sigaction(SIGPIPE, &sa, NULL) == -1) {
-		say_syserror("sigaction SIGPIPE");
-	}
-
-	/* set handler for signals: SIGHUP, SIGINT, SIGTERM and SIGCHLD */
+	/*
+	 * The spawner normally does not receive any signals,
+	 * except when sent by a system administrator.
+	 * When the master process terminates, it closes its end
+	 * of the socket pair and this signals to the spawner that
+	 * it's time to die as well. But before exiting, the
+	 * spawner must kill and collect all active replication
+	 * relays. This is why we need to change the default
+	 * signal action here.
+	 */
 	sa.sa_handler = spawner_signal_handler;
 
-	if ((sigaction(SIGHUP, &sa, NULL) == -1) ||
-	    (sigaction(SIGINT, &sa, NULL) == -1) ||
-	    (sigaction(SIGTERM, &sa, NULL) == -1) ||
-	    (sigaction(SIGCHLD, &sa, NULL) == -1)) {
+	if (sigaction(SIGHUP, &sa, NULL) == -1 ||
+	    sigaction(SIGINT, &sa, NULL) == -1 ||
+	    sigaction(SIGTERM, &sa, NULL) == -1 ||
+	    sigaction(SIGPIPE, &sa, NULL) == -1)
+		say_syserror("sigaction");
+
+	sa.sa_handler = spawner_sigchld_handler;
+
+	if (sigaction(SIGCHLD, &sa, NULL) == -1)
 		say_syserror("sigaction");
-	}
 
 	say_crit("initialized");
 	spawner_main_loop();
 }
 
-/** Replicator spawner process main loop. */
+
+
+static int
+spawner_unpack_cmsg(struct msghdr *msg)
+{
+	struct cmsghdr *control_message;
+	for (control_message = CMSG_FIRSTHDR(msg);
+	     control_message != NULL;
+	     control_message = CMSG_NXTHDR(msg, control_message))
+		if ((control_message->cmsg_level == SOL_SOCKET) &&
+		    (control_message->cmsg_type == SCM_RIGHTS))
+			return *((int *) CMSG_DATA(control_message));
+	assert(false);
+	return -1;
+}
+
+/** Replication spawner process main loop. */
 static void
 spawner_main_loop()
 {
-	while (!spawner.is_done) {
-		int client_sock;
+	struct msghdr msg;
+	struct iovec iov[1];
+	char control_buf[CMSG_SPACE(sizeof(int))];
+	int cmd_code = 0;
+	int client_sock;
 
-		if (spawner.need_waitpid) {
-			spawner_wait_children();
-		}
+	iov[0].iov_base = &cmd_code;
+	iov[0].iov_len = sizeof(cmd_code);
 
-		if (spawner_recv_client_sock(&client_sock) != 0) {
-			continue;
-		}
+	msg.msg_name = NULL;
+	msg.msg_namelen = 0;
+	msg.msg_iov = iov;
+	msg.msg_iovlen = 1;
+	msg.msg_control = control_buf;
+	msg.msg_controllen = sizeof(control_buf);
 
-		if (client_sock > 0) {
+	while (!spawner.killed) {
+		int msglen = recvmsg(spawner.sock, &msg, 0);
+		if (msglen > 0) {
+			client_sock = spawner_unpack_cmsg(&msg);
 			spawner_create_replication_relay(client_sock);
+		} else if (msglen == 0) { /* orderly master shutdown */
+			say_info("Exiting: master shutdown");
+			break;
+		} else { /* msglen == -1 */
+			if (errno != EINTR)
+				say_syserror("recvmsg");
+			/* continue, the error may be temporary */
 		}
 	}
-
 	spawner_shutdown();
 }
 
-/** Replicator spawner shutdown. */
+/** Replication spawner shutdown. */
 static void
 spawner_shutdown()
 {
-	say_info("shutdown");
-
-	/* kill all children */
-	spawner_shutdown_kill_children();
 	/* close socket */
 	close(spawner.sock);
 
+	/* kill all children */
+	spawner_shutdown_children();
+
 	exit(EXIT_SUCCESS);
 }
 
-/** Replicator's spawner process signal handler. */
+/** Replication spawner signal handler for terminating signals. */
 static void spawner_signal_handler(int signal)
 {
-	switch (signal) {
-	case SIGHUP:
-	case SIGINT:
-	case SIGTERM:
-		spawner.is_done = true;
-		break;
-	case SIGCHLD:
-		spawner.need_waitpid = true;
-		break;
-	}
+	say_info("Exiting: %s", strsignal(signal));
+	spawner.killed = signal;
 }
 
-/** Process waitpid children. */
+/** Wait for a terminated child. */
 static void
-spawner_wait_children()
+spawner_sigchld_handler(int signo __attribute__((unused)))
 {
-	while (spawner.child_count > 0) {
+	do {
 		int exit_status;
-		pid_t pid;
-
-		pid = waitpid(-1, &exit_status, WNOHANG);
-		if (pid < 0) {
-			say_syserror("waitpid");
-			break;
-		}
-
-		if (pid == 0) {
-			/* done, all finished process are processed */
-			break;
-		}
-
-		say_info("child finished: pid = %d, exit status = %d", pid, WEXITSTATUS(exit_status));
-		spawner.child_count--;
-	}
-
-	spawner.need_waitpid = false;
-}
-
-/** Receive replication client socket from main process. */
-static int
-spawner_recv_client_sock(int *client_sock)
-{
-	struct msghdr msg;
-	struct iovec iov[1];
-	char control_buf[CMSG_SPACE(sizeof(int))];
-	struct cmsghdr *control_message = NULL;
-	int cmd_code = 0;
-
-	iov[0].iov_base = &cmd_code;
-	iov[0].iov_len = sizeof(cmd_code);
-
-	msg.msg_name = NULL;
-	msg.msg_namelen = 0;
-	msg.msg_iov = iov;
-	msg.msg_iovlen = 1;
-	msg.msg_control = control_buf;
-	msg.msg_controllen = sizeof(control_buf);
-
-	if (recvmsg(spawner.sock, &msg, 0) < 0) {
-		if (errno == EINTR) {
-			*client_sock = 0;
-			return 0;
+		pid_t pid = waitpid(-1, &exit_status, WNOHANG);
+		switch (pid) {
+		case -1:
+			if (errno != ECHILD)
+				say_syserror("waitpid");
+		case 0: /* no more changes in children status */
+			return;
+		default:
+			spawner.child_count--;
+			say_info("child finished: pid = %d, exit status = %d",
+				 (int) pid, WEXITSTATUS(exit_status));
 		}
-		say_syserror("recvmsg");
-		return -1;
-	}
-
-	for (control_message = CMSG_FIRSTHDR(&msg);
-	     control_message != NULL;
-	     control_message = CMSG_NXTHDR(&msg, control_message)) {
-		if ((control_message->cmsg_level == SOL_SOCKET) &&
-		    (control_message->cmsg_type == SCM_RIGHTS)) {
-			*client_sock = *((int *) CMSG_DATA(control_message));
-		}
-	}
-
-	return 0;
+	} while (spawner.child_count > 0);
 }
 
-/** Create replicator's client handler process. */
+/** Create replication client handler process. */
 static int
 spawner_create_replication_relay(int client_sock)
 {
-	pid_t pid;
+	pid_t pid = fork();
 
-	pid = fork();
 	if (pid < 0) {
 		say_syserror("fork");
 		return -1;
 	}
 
 	if (pid == 0) {
+		close(spawner.sock);
 		replication_relay_loop(client_sock);
 	} else {
-		say_info("replicator client handler spawned: pid = %d", pid);
 		spawner.child_count++;
 		close(client_sock);
+		say_info("created a replication relay: pid = %d", (int) pid);
 	}
 
 	return 0;
 }
 
-/** Replicator spawner shutdown: kill children. */
+/** Replicator spawner shutdown: kill and wait for children. */
 static void
-spawner_shutdown_kill_children()
+spawner_shutdown_children()
 {
-	int result = 0;
-
-	/* check child process count */
-	if (spawner.child_count == 0) {
-		return;
-	}
-
-	/* send terminate signal to children */
-	say_info("send SIGTERM to %"PRIu32" children", spawner.child_count);
-	result = kill(0, SIGTERM);
-	if (result != 0) {
-		say_syserror("kill");
-		return;
-	}
+	int kill_signo = SIGTERM, signo;
+	sigset_t mask, orig_mask, alarm_mask;
 
-	/* wait when process is down */
-	result = spawner_shutdown_wait_children();
-	if (result != 0) {
-		return;
-	}
+retry:
+	sigemptyset(&mask);
+	sigaddset(&mask, SIGCHLD);
+	sigaddset(&mask, SIGALRM);
+	/*
+	 * We're going to kill the entire process group, which
+	 * we're part of. Handle the signal sent to ourselves.
+	 */
+	sigaddset(&mask, kill_signo);
 
-	/* check child process count */
-	if (spawner.child_count == 0) {
-		say_info("all children terminated");
+	if (spawner.child_count == 0)
 		return;
-	}
-	say_info("%"PRIu32" children still alive", spawner.child_count);
 
-	/* send terminate signal to children */
-	say_info("send SIGKILL to %"PRIu32" children", spawner.child_count);
-	result = kill(0, SIGKILL);
-	if (result != 0) {
-		say_syserror("kill");
+	/* Block SIGCHLD, SIGALRM and kill_signo to avoid races. */
+	if (sigprocmask(SIG_BLOCK, &mask, &orig_mask)) {
+		say_syserror("sigprocmask");
 		return;
 	}
 
-	/* wait when process is down */
-	result = spawner_shutdown_wait_children();
-	if (result != 0) {
-		return;
-	}
-	say_info("all children terminated");
-}
+	/* We'll wait for children no longer than 5 sec.  */
+	alarm(5);
 
-/** Replicator spawner shutdown: wait children. */
-static int
-spawner_shutdown_wait_children()
-{
-	const u32 wait_sec = 5;
-	struct timeval tv;
+	say_info("sending signal %d to %"PRIu32" children", kill_signo,
+		 (u32) spawner.child_count);
 
-	say_info("wait for children %"PRIu32" seconds", wait_sec);
+	kill(0, kill_signo);
 
-	tv.tv_sec = wait_sec;
-	tv.tv_usec = 0;
+	say_info("waiting for children for up to 5 seconds");
 
-	/* wait children process */
-	spawner_wait_children();
 	while (spawner.child_count > 0) {
-		int result;
-
-		/* wait EINTR or timeout */
-		result = select(0, NULL, NULL, NULL, &tv);
-		if (result < 0 && errno != EINTR) {
-			/* this is not signal */
-			say_syserror("select");
-			return - 1;
+		sigwait(&mask, &signo);
+		if (signo == SIGALRM) {         /* timed out */
+			break;
 		}
+		else if (signo != kill_signo) {
+			assert(signo == SIGCHLD);
+			spawner_sigchld_handler(signo);
+		}
+	}
 
-		/* wait children process */
-		spawner_wait_children();
+	/* Reset the alarm. */
+	alarm(0);
 
-		/* check timeout */
-		if (tv.tv_sec == 0 && tv.tv_usec == 0) {
-			/* timeout happen */
-			break;
-		}
+	/* Clear possibly pending SIGALRM. */
+	sigpending(&alarm_mask);
+	if (sigismember(&alarm_mask, SIGALRM)) {
+		sigemptyset(&alarm_mask);
+		sigaddset(&alarm_mask, SIGALRM);
+		sigwait(&alarm_mask, &signo);
 	}
 
-	return 0;
-}
+	/* Restore the old mask. */
+	if (sigprocmask(SIG_SETMASK, &orig_mask, NULL)) {
+		say_syserror("sigprocmask");
+		return;
+	}
 
+	if (kill_signo == SIGTERM) {
+		kill_signo = SIGKILL;
+		goto retry;
+	}
+}
 
 /** The main loop of replication client service process. */
 static void
@@ -597,37 +549,35 @@ replication_relay_loop(int client_sock)
 
 	/* set process title and fiber name */
 	memset(name, 0, sizeof(name));
-	snprintf(name, sizeof(name), "replication%s/relay", custom_proc_title);
+	snprintf(name, sizeof(name), "relay/%s", fiber_peer_name(fiber));
 	fiber_set_name(fiber, name);
-	set_proc_title("%s %s", name, fiber_peer_name(fiber));
+	set_proc_title("%s%s", name, custom_proc_title);
 
 	/* init signals */
 	memset(&sa, 0, sizeof(sa));
+	sigemptyset(&sa.sa_mask);
 
-	/* ignore SIGPIPE and SIGCHLD */
-	sa.sa_handler = SIG_IGN;
-	if ((sigaction(SIGPIPE, &sa, NULL) == -1) ||
-	    (sigaction(SIGCHLD, &sa, NULL) == -1)) {
-		say_syserror("sigaction");
-	}
-
-	/* return signals SIGHUP, SIGINT and SIGTERM to default value */
+	/* Reset SIGCHLD, SIGHUP, SIGINT and SIGTERM to their defaults. */
 	sa.sa_handler = SIG_DFL;
+	if (sigaction(SIGCHLD, &sa, NULL) == -1 ||
+	    sigaction(SIGHUP, &sa, NULL) == -1 ||
+	    sigaction(SIGINT, &sa, NULL) == -1 ||
+	    sigaction(SIGTERM, &sa, NULL) == -1)
+		say_syserror("sigaction");
 
-	if ((sigaction(SIGHUP, &sa, NULL) == -1) ||
-	    (sigaction(SIGINT, &sa, NULL) == -1) ||
-	    (sigaction(SIGTERM, &sa, NULL) == -1)) {
+	/* Ignore SIGPIPE, we already handle EPIPE. */
+	sa.sa_handler = SIG_IGN;
+	if (sigaction(SIGPIPE, &sa, NULL) == -1)
 		say_syserror("sigaction");
-	}
 
 	r = read(fiber->fd, &lsn, sizeof(lsn));
 	if (r != sizeof(lsn)) {
 		if (r < 0) {
 			panic_syserror("read");
 		}
-		panic("invalid lns request size: %zu", r);
+		panic("invalid LSN request size: %zu", r);
 	}
-	say_info("start recover from lsn:%"PRIi64, lsn);
+	say_info("starting recovery from lsn:%"PRIi64, lsn);
 
 	ver = tbuf_alloc(fiber->pool);
 	tbuf_append(ver, &default_version, sizeof(default_version));
@@ -652,7 +602,7 @@ replication_relay_loop(int client_sock)
 
 	ev_loop(0);
 
-	say_crit("leave loop");
+	say_crit("exiting the relay loop");
 	exit(EXIT_SUCCESS);
 }
 
@@ -664,21 +614,15 @@ replication_relay_recv(struct ev_io *w, int __attribute__((unused)) revents)
 	u8 data;
 
 	int result = recv(fd, &data, sizeof(data), 0);
-	if (result < 0) {
-		if (errno == ECONNRESET) {
-			goto shutdown_handler;
-		}
-		say_syserror("recv");
-		exit(EXIT_FAILURE);
-	} else if (result == 0) {
-		/* end-of-input */
-		goto shutdown_handler;
+
+	if (result == 0 || (result < 0 && errno == ECONNRESET)) {
+		say_info("the client has closed its replication socket, exiting");
+		exit(EXIT_SUCCESS);
 	}
+	if (result < 0)
+		say_syserror("recv");
 
 	exit(EXIT_FAILURE);
-shutdown_handler:
-	say_info("replication socket closed on opposite side, exit");
-	exit(EXIT_SUCCESS);
 }
 
 /** Send to row to client. */
@@ -703,7 +647,7 @@ replication_relay_send_row(struct recovery_state *r __attribute__((unused)), str
 	say_debug("send row: %" PRIu32 " bytes %s", t->len, tbuf_to_hex(t));
 	return 0;
 shutdown_handler:
-	say_info("replication socket closed on opposite side, exit");
+	say_info("the client has closed its replication socket, exiting");
 	exit(EXIT_SUCCESS);
 }
 
diff --git a/core/tarantool.m b/core/tarantool.m
index d6dc3a9776..414bc91200 100644
--- a/core/tarantool.m
+++ b/core/tarantool.m
@@ -557,9 +557,9 @@ main(int argc, char **argv)
 
 	signal_init();
 
-	replication_init();
 	mod_init();
 	admin_init();
+	replication_init();
 
 	prelease(fiber->pool);
 	say_crit("log level %i", cfg.log_level);
diff --git a/include/fiber.h b/include/fiber.h
index d44897396b..0a85b7e0d6 100644
--- a/include/fiber.h
+++ b/include/fiber.h
@@ -203,7 +203,7 @@ int set_nonblock(int sock);
 
 typedef void (*fiber_server_callback)(void *);
 
-struct fiber *fiber_server(int port,
+struct fiber *fiber_server(const char *name, int port,
 			   fiber_server_callback callback, void *,
 			   void (*on_bind) (void *));
 
diff --git a/mod/box/box.m b/mod/box/box.m
index edd4d1dab0..e9496ce8de 100644
--- a/mod/box/box.m
+++ b/mod/box/box.m
@@ -1164,9 +1164,10 @@ title(const char *fmt, ...)
 	va_end(ap);
 
 	int ports[] = { cfg.primary_port, cfg.secondary_port,
-			cfg.memcached_port, cfg.admin_port };
+			cfg.memcached_port, cfg.admin_port,
+			cfg.replication_port };
 	int *pptr = ports;
-	char *names[] = { "pri", "sec", "memc", "adm", NULL };
+	char *names[] = { "pri", "sec", "memc", "adm", "rpl", NULL };
 	char **nptr = names;
 
 	for (; *nptr; nptr++, pptr++)
@@ -1195,8 +1196,8 @@ box_enter_master_or_replica_mode(struct tarantool_cfg *conf)
 
 		memcached_start_expire();
 
-		snprintf(status, sizeof(status), "primary");
-		title("primary");
+		snprintf(status, sizeof(status), "primary%s", custom_proc_title);
+		title("primary%s", custom_proc_title);
 
 		say_info("I am primary");
 	}
@@ -1461,21 +1462,22 @@ mod_init(void)
 		title("hot_standby");
 	}
 
-	/* run memcached server */
-	if (cfg.memcached_port != 0)
-		fiber_server(cfg.memcached_port, memcached_handler, NULL, NULL);
+	/* run primary server */
+	if (cfg.primary_port != 0)
+		fiber_server("primary", cfg.primary_port,
+			     (fiber_server_callback) iproto_interact,
+			     &rw_callback, box_leave_local_standby_mode);
 
 	/* run secondary server */
 	if (cfg.secondary_port != 0)
-		fiber_server(cfg.secondary_port,
+		fiber_server("secondary", cfg.secondary_port,
 			     (fiber_server_callback) iproto_interact,
 			     &ro_callback, NULL);
 
-	/* run primary server */
-	if (cfg.primary_port != 0)
-		fiber_server(cfg.primary_port,
-			     (fiber_server_callback) iproto_interact,
-			     &rw_callback, box_leave_local_standby_mode);
+	/* run memcached server */
+	if (cfg.memcached_port != 0)
+		fiber_server("memcached", cfg.memcached_port,
+			     memcached_handler, NULL, NULL);
 }
 
 int
-- 
GitLab