From 1aba8acf43a206a0493eafce5f7dd91c6552ff02 Mon Sep 17 00:00:00 2001
From: GeorgyKirichenko <kirichenkoga@gmail.com>
Date: Thu, 2 Mar 2017 15:44:57 +0300
Subject: [PATCH] Do listen after recovery. Issue #1932

Split evio service start to separate functions bind and listen. Remove
rebind logic from evio_service and on_bind callback and implement in
iproto logic. Do listen after box recovery.
---
 src/box/box.cc     |  15 ++++-
 src/box/box.h      |   3 +-
 src/box/iproto.cc  | 116 +++++++++++++++++++++++++--------
 src/box/iproto.h   |   5 +-
 src/box/lua/cfg.cc |   3 +-
 src/coio.cc        |  16 +----
 src/evio.cc        | 155 +++++++++++++++++++--------------------------
 src/evio.h         |  33 ++++------
 8 files changed, 191 insertions(+), 155 deletions(-)

diff --git a/src/box/box.cc b/src/box/box.cc
index be0ced6865..36fd3e947c 100644
--- a/src/box/box.cc
+++ b/src/box/box.cc
@@ -375,11 +375,17 @@ box_set_replication_source(void)
 }
 
 void
-box_set_listen(void)
+box_bind(void)
 {
 	const char *uri = cfg_gets("listen");
 	box_check_uri(uri, "listen");
-	iproto_set_listen(uri);
+	iproto_bind(uri);
+}
+
+void
+box_listen(void)
+{
+	iproto_listen();
 }
 
 /**
@@ -1010,6 +1016,8 @@ box_init(void)
 	session_init();
 
 	cluster_init();
+	port_init();
+	iproto_init();
 
 	title("loading");
 
@@ -1070,7 +1078,7 @@ box_init(void)
 
 	port_init();
 	iproto_init();
-	box_set_listen();
+	box_bind();
 
 	int64_t rows_per_wal = box_check_rows_per_wal(cfg_geti64("rows_per_wal"));
 	enum wal_mode wal_mode = box_check_wal_mode(cfg_gets("wal_mode"));
@@ -1085,6 +1093,7 @@ box_init(void)
 
 	/* Enter read-write mode. */
 	cluster_wait_for_id();
+	box_listen();
 
 	title("running");
 	say_info("ready to accept requests");
diff --git a/src/box/box.h b/src/box/box.h
index 30eaf71376..93edfe9f14 100644
--- a/src/box/box.h
+++ b/src/box/box.h
@@ -106,7 +106,8 @@ box_process_subscribe(int fd, struct xrow_header *header);
 void
 box_check_config();
 
-void box_set_listen(void);
+void box_bind(void);
+void box_listen(void);
 void box_set_replication_source(void);
 void box_set_wal_mode(void);
 void box_set_log_level(void);
diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index e51e36807a..0b26f046f3 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -968,7 +968,7 @@ iproto_init()
  * we need to bounce a couple of messages to and
  * from this thread.
  */
-struct iproto_set_listen_msg: public cmsg
+struct iproto_bind_msg: public cmsg
 {
 	/**
 	 * If there was an error setting the listen port,
@@ -985,44 +985,42 @@ struct iproto_set_listen_msg: public cmsg
 	 * bind.
 	 */
 	struct cmsg_notify wakeup;
+	/**
+	 * Return code for bind:
+	 * 0 - Ok,
+	 * 1 - busy,
+	 * -1 - an error
+	 */
+	int rc;
 };
 
-/**
- * The bind has finished, notify the caller.
- */
 static void
-iproto_on_bind(void *arg)
+iproto_do_bind(struct cmsg *m)
 {
-	cpipe_push(&tx_pipe, (struct cmsg *) arg);
-}
-
-static void
-iproto_do_set_listen(struct cmsg *m)
-{
-	struct iproto_set_listen_msg *msg =
-		(struct iproto_set_listen_msg *) m;
+	struct iproto_bind_msg *msg = (struct iproto_bind_msg *) m;
 	try {
 		if (evio_service_is_active(&binary))
 			evio_service_stop(&binary);
-
+		msg->rc = 0;
 		if (msg->uri != NULL) {
-			binary.on_bind = iproto_on_bind;
-			binary.on_bind_param = &msg->wakeup;
-			evio_service_start(&binary, msg->uri);
-		} else {
-			iproto_on_bind(&msg->wakeup);
+			msg->rc = evio_service_bind(&binary, msg->uri);
+			{
+			}
+			/* The bind has finished, notify the caller. */
 		}
+		cpipe_push(&tx_pipe, &msg->wakeup.base);
 	} catch (Exception *e) {
+		msg->rc = -1;
 		diag_move(&fiber()->diag, &msg->diag);
-		iproto_on_bind(&msg->wakeup);
+		cpipe_push(&tx_pipe, &msg->wakeup.base);
 	}
 }
 
 static void
-iproto_set_listen_msg_init(struct iproto_set_listen_msg *msg,
+iproto_bind_msg_init(struct iproto_bind_msg *msg,
 			    const char *uri)
 {
-	static cmsg_hop route[] = { { iproto_do_set_listen, NULL }, };
+	static cmsg_hop route[] = { { iproto_do_bind, NULL }, };
 	cmsg_init(msg, route);
 	msg->uri = uri;
 	diag_create(&msg->diag);
@@ -1031,7 +1029,7 @@ iproto_set_listen_msg_init(struct iproto_set_listen_msg *msg,
 }
 
 void
-iproto_set_listen(const char *uri)
+iproto_bind(const char *uri)
 {
 	/**
 	 * This is a tricky orchestration for something
@@ -1042,8 +1040,75 @@ iproto_set_listen(const char *uri)
 	 * uri, and another one, which will alert tx
 	 * thread when bind() on the new port is done.
 	 */
-	static struct iproto_set_listen_msg msg;
-	iproto_set_listen_msg_init(&msg, uri);
+	const double BIND_RETRY_DELAY = 0.1;
+	static struct iproto_bind_msg msg;
+	iproto_bind_msg_init(&msg, uri);
+
+	bool first_bind = true;
+	while (true) {
+		cpipe_push(&net_pipe, &msg);
+		/** Wait for the end of bind. */
+		fiber_yield();
+		if (msg.rc != 1)
+			break;
+		if (first_bind)
+			say_warn("%s: %s is already in use, "
+				 "will retry binding",
+				 "binary", msg.uri);
+//		first_try = false;
+		fiber_sleep(BIND_RETRY_DELAY);
+	}
+	if (msg.rc != 0) {
+		assert(!diag_is_empty(&msg.diag));
+		diag_move(&msg.diag, &fiber()->diag);
+		diag_raise();
+	}
+}
+
+struct iproto_listen_msg: public cmsg
+{
+	/**
+	 * If there was an error while listen port,
+	 * this will contain the error when the message
+	 * returns to the caller.
+	 */
+	struct diag diag;
+	/**
+	 * The way to tell the caller about the end of
+	 * bind.
+	 */
+	struct cmsg_notify wakeup;
+};
+
+static void
+iproto_do_listen(struct cmsg *m)
+{
+	struct iproto_listen_msg *msg =
+		(struct iproto_listen_msg *) m;
+	try {
+		if (evio_service_is_active(&binary))
+			evio_service_listen(&binary);
+	} catch (Exception *e) {
+		diag_move(&fiber()->diag, &msg->diag);
+	}
+	cpipe_push(&tx_pipe, &msg->wakeup.base);
+}
+
+static void
+iproto_listen_msg_init(struct iproto_listen_msg *msg)
+{
+	static cmsg_hop route[] = { { iproto_do_listen, NULL }, };
+	cmsg_init(msg, route);
+	diag_create(&msg->diag);
+
+	cmsg_notify_init(&msg->wakeup);
+}
+
+void
+iproto_listen()
+{
+	static struct iproto_listen_msg msg;
+	iproto_listen_msg_init(&msg);
 
 	cpipe_push(&net_pipe, &msg);
 	/** Wait for the end of bind. */
@@ -1052,6 +1117,7 @@ iproto_set_listen(const char *uri)
 		diag_move(&msg.diag, &fiber()->diag);
 		diag_raise();
 	}
+
 }
 
 /* vim: set foldmethod=marker */
diff --git a/src/box/iproto.h b/src/box/iproto.h
index 6c62c141e9..6ae86bf028 100644
--- a/src/box/iproto.h
+++ b/src/box/iproto.h
@@ -34,6 +34,9 @@ void
 iproto_init();
 
 void
-iproto_set_listen(const char *uri);
+iproto_bind(const char *uri);
+
+void
+iproto_listen();
 
 #endif
diff --git a/src/box/lua/cfg.cc b/src/box/lua/cfg.cc
index 6c6fbcbf5a..f953dd178d 100644
--- a/src/box/lua/cfg.cc
+++ b/src/box/lua/cfg.cc
@@ -79,7 +79,8 @@ static int
 lbox_cfg_set_listen(struct lua_State *L)
 {
 	try {
-		box_set_listen();
+		box_bind();
+		box_listen();
 	} catch (Exception *) {
 		luaT_error(L);
 	}
diff --git a/src/coio.cc b/src/coio.cc
index 67ec93d7c5..dbba9eb0b4 100644
--- a/src/coio.cc
+++ b/src/coio.cc
@@ -640,23 +640,11 @@ coio_service_init(struct coio_service *service, const char *name,
 	service->handler_param = handler_param;
 }
 
-static void
-on_bind(void *arg)
-{
-	fiber_wakeup((struct fiber *) arg);
-}
-
 void
 coio_service_start(struct evio_service *service, const char *uri)
 {
-	assert(service->on_bind == NULL);
-	assert(service->on_bind_param == NULL);
-	service->on_bind = on_bind;
-	service->on_bind_param = fiber();
-	evio_service_start(service, uri);
-	fiber_yield();
-	service->on_bind_param = NULL;
-	service->on_bind = NULL;
+	evio_service_bind(service, uri);
+	evio_service_listen(service);
 }
 
 void
diff --git a/src/evio.cc b/src/evio.cc
index a407db4711..f1c5d2a82c 100644
--- a/src/evio.cc
+++ b/src/evio.cc
@@ -40,8 +40,6 @@
 
 #include <trivia/util.h>
 
-#define BIND_RETRY_DELAY 0.1
-
 static void
 evio_setsockopt_server(int fd, int family, int type);
 
@@ -194,6 +192,10 @@ evio_service_accept_cb(ev_loop * /* loop */, ev_io *watcher,
 	}
 }
 
+/*
+ * Check if the unix socket which we file to create exists and
+ * no one is listening on it. Unlink the file if it's the case.
+ */
 static bool
 evio_service_reuse_addr(struct evio_service *service)
 {
@@ -220,11 +222,10 @@ evio_service_reuse_addr(struct evio_service *service)
 	return false;
 }
 
-/** Try to bind and listen on the configured port.
+/**
+ * Try to bind on the configured port.
  *
  * Throws an exception if error.
- * Returns -1 if the address is already in use, and one
- * needs to retry binding.
  */
 static int
 evio_service_bind_addr(struct evio_service *service)
@@ -243,85 +244,37 @@ evio_service_bind_addr(struct evio_service *service)
 		assert(errno == EADDRINUSE);
 		if (!evio_service_reuse_addr(service) ||
 			sio_bind(fd, &service->addr, service->addr_len)) {
-			return -1;
+			return 1;
 		}
 	}
 
-	if (sio_listen(fd)) {
-		return -1;
-	}
-
 	say_info("%s: bound to %s", evio_service_name(service),
 		 sio_strfaddr(&service->addr, service->addr_len));
 
-	/* Invoke on_bind callback if it is set. */
-	if (service->on_bind)
-		service->on_bind(service->on_bind_param);
-
 	/* Register the socket in the event loop. */
 	ev_io_set(&service->ev, fd, EV_READ);
-	ev_io_start(service->loop, &service->ev);
+
 	fd_guard.is_active = false;
 	return 0;
 }
 
-static int
-evio_service_bind_and_listen(struct evio_service *service)
-{
-	if (strcmp(service->host, URI_HOST_UNIX) == 0) {
-		/* UNIX domain socket */
-		struct sockaddr_un *un = (struct sockaddr_un *) &service->addr;
-		service->addr_len = sizeof(*un);
-		snprintf(un->sun_path, sizeof(un->sun_path), "%s",
-			 service->serv);
-		un->sun_family = AF_UNIX;
-		return evio_service_bind_addr(service);
-	}
-
-	/* IP socket */
-	struct addrinfo hints, *res;
-	memset(&hints, 0, sizeof(hints));
-	hints.ai_family = AF_UNSPEC;
-	hints.ai_socktype = SOCK_STREAM;
-	hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
-
-	/* make no difference between empty string and NULL for host */
-	if (getaddrinfo(*service->host ? service->host : NULL, service->serv,
-			&hints, &res) != 0 || res == NULL)
-		tnt_raise(SocketError, -1, "can't resolve uri for bind");
-	auto addrinfo_guard = make_scoped_guard([=]{ freeaddrinfo(res); });
-
-	for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
-		memcpy(&service->addr, ai->ai_addr, ai->ai_addrlen);
-		service->addr_len = ai->ai_addrlen;
-		try {
-			return evio_service_bind_addr(service);
-		} catch (SocketError *e) {
-			say_error("%s: failed to bind on %s: %s",
-				  evio_service_name(service),
-				  sio_strfaddr(ai->ai_addr, ai->ai_addrlen),
-				  e->get_errmsg());
-			/* ignore */
-		}
-	}
-
-	tnt_raise(SocketError, -1, "%s: failed to bind",
-		  evio_service_name(service));
-}
-
-/** A callback invoked by libev when sleep timer expires.
+/**
+ * Listen on bounded port.
  *
- * Retry binding. On success, stop the timer. If the port
- * is still in use, pause again.
+ * @retval 0 for success
  */
-static void
-evio_service_timer_cb(ev_loop *loop, ev_timer *watcher, int /* revents */)
+void
+evio_service_listen(struct evio_service *service)
 {
-	struct evio_service *service = (struct evio_service *) watcher->data;
-	assert(! ev_is_active(&service->ev));
+	say_debug("%s: listening on %s...", evio_service_name(service),
+		  sio_strfaddr(&service->addr, service->addr_len));
 
-	if (evio_service_bind_and_listen(service) == 0)
-		ev_timer_stop(loop, watcher);
+	int fd = service->ev.fd;
+	if (sio_listen(fd)) {
+		/* raise for addr in use to */
+		tnt_raise(SocketError, fd, "listen");
+	}
+	ev_io_start(service->loop, &service->ev);
 }
 
 void
@@ -343,17 +296,15 @@ evio_service_init(ev_loop *loop,
 	 * are active or not in evio_service_stop().
 	 */
 	ev_init(&service->ev, evio_service_accept_cb);
-	ev_init(&service->timer, evio_service_timer_cb);
-	service->timer.data = service->ev.data = service;
+	ev_io_set(&service->ev, -1, 0);
+	service->ev.data = service;
 }
 
 /**
- * Try to bind and listen. If the port is in use,
- * say a warning, and start the timer which will retry
- * binding periodically.
+ * Try to bind.
  */
-void
-evio_service_start(struct evio_service *service, const char *uri)
+int
+evio_service_bind(struct evio_service *service, const char *uri)
 {
 	struct uri u;
 	if (uri_parse(&u, uri) || u.service == NULL)
@@ -368,20 +319,44 @@ evio_service_start(struct evio_service *service, const char *uri)
 
 	assert(! ev_is_active(&service->ev));
 
-	say_info("%s: started", evio_service_name(service));
+	if (strcmp(service->host, URI_HOST_UNIX) == 0) {
+		/* UNIX domain socket */
+		struct sockaddr_un *un = (struct sockaddr_un *) &service->addr;
+		service->addr_len = sizeof(*un);
+		snprintf(un->sun_path, sizeof(un->sun_path), "%s",
+			 service->serv);
+		un->sun_family = AF_UNIX;
+		return evio_service_bind_addr(service);
+	}
+
+	/* IP socket */
+	struct addrinfo hints, *res;
+	memset(&hints, 0, sizeof(hints));
+	hints.ai_family = AF_UNSPEC;
+	hints.ai_socktype = SOCK_STREAM;
+	hints.ai_flags = AI_PASSIVE|AI_ADDRCONFIG;
 
-	if (evio_service_bind_and_listen(service)) {
-		/* Try again after a delay. */
-		say_warn("%s: %s is already in use, will "
-			 "retry binding after %lf seconds.",
-			 evio_service_name(service),
-			 sio_strfaddr(&service->addr, service->addr_len),
-			 BIND_RETRY_DELAY);
+	/* make no difference between empty string and NULL for host */
+	if (getaddrinfo(*service->host ? service->host : NULL, service->serv,
+			&hints, &res) != 0 || res == NULL)
+		tnt_raise(SocketError, -1, "can't resolve uri for bind");
+	auto addrinfo_guard = make_scoped_guard([=]{ freeaddrinfo(res); });
 
-		ev_timer_set(&service->timer,
-			     BIND_RETRY_DELAY, BIND_RETRY_DELAY);
-		ev_timer_start(service->loop, &service->timer);
+	for (struct addrinfo *ai = res; ai != NULL; ai = ai->ai_next) {
+		memcpy(&service->addr, ai->ai_addr, ai->ai_addrlen);
+		service->addr_len = ai->ai_addrlen;
+		try {
+			return evio_service_bind_addr(service);
+		} catch (SocketError *e) {
+			say_error("%s: failed to bind on %s: %s",
+				  evio_service_name(service),
+				  sio_strfaddr(ai->ai_addr, ai->ai_addrlen),
+				  e->get_errmsg());
+			/* ignore */
+		}
 	}
+	tnt_raise(SocketError, -1, "%s: failed to bind",
+		  evio_service_name(service));
 }
 
 /** It's safe to stop a service which is not started yet. */
@@ -390,11 +365,13 @@ evio_service_stop(struct evio_service *service)
 {
 	say_info("%s: stopped", evio_service_name(service));
 
-	if (! ev_is_active(&service->ev)) {
-		ev_timer_stop(service->loop, &service->timer);
-	} else {
+	if (ev_is_active(&service->ev)) {
 		ev_io_stop(service->loop, &service->ev);
+	}
+
+	if (service->ev.fd >= 0) {
 		close(service->ev.fd);
+		ev_io_set(&service->ev, -1, 0);
 		if (service->addr.sa_family == AF_UNIX) {
 			unlink(((struct sockaddr_un *) &service->addr)->sun_path);
 		}
diff --git a/src/evio.h b/src/evio.h
index a4babc45ec..a276a1ec16 100644
--- a/src/evio.h
+++ b/src/evio.h
@@ -53,7 +53,8 @@
  * struct evio_service *service;
  * service = malloc(sizeof(struct evio_service));
  * evio_service_init(service, ..., on_accept_cb, ...);
- * evio_service_start(service);
+ * evio_service_bind(service);
+ * evio_service_listen(service);
  * ...
  * evio_service_stop(service);
  * free(service);
@@ -76,12 +77,6 @@ struct evio_service
 	};
 	socklen_t addr_len;
 
-	/** A callback invoked upon a successful bind, optional.
-	 * If on_bind callback throws an exception, it's
-	 * service start is aborted, and exception is re-thrown.
-	 */
-	void (*on_bind)(void *);
-	void *on_bind_param;
 	/**
 	 * A callback invoked on every accepted client socket.
 	 * It's OK to throw an exception in the callback:
@@ -92,8 +87,6 @@ struct evio_service
 			  struct sockaddr *, socklen_t);
 	void *on_accept_param;
 
-	/** libev timer object for the bind retry delay. */
-	struct ev_timer timer;
 	/** libev io object for the acceptor socket. */
 	struct ev_io ev;
 	ev_loop *loop;
@@ -107,19 +100,17 @@ evio_service_init(ev_loop *loop,
 				    int, struct sockaddr *, socklen_t),
 		  void *on_accept_param);
 
-/** Set an optional callback to be invoked upon a successful bind. */
-static inline void
-evio_service_on_bind(struct evio_service *service,
-		     void (*on_bind)(void *),
-		     void *on_bind_param)
-{
-	service->on_bind = on_bind;
-	service->on_bind_param = on_bind_param;
-}
+/** Bind service to specified uri */
+int
+evio_service_bind(struct evio_service *service, const char *uri);
 
-/** Bind to the port and begin listening. */
+/**
+ * Listen on bounded socket
+ *
+ * @retval 0 for success
+ */
 void
-evio_service_start(struct evio_service *service, const char *uri);
+evio_service_listen(struct evio_service *service);
 
 /** If started, stop event flow and close the acceptor socket. */
 void
@@ -134,7 +125,7 @@ evio_close(ev_loop *loop, struct ev_io *evio);
 static inline bool
 evio_service_is_active(struct evio_service *service)
 {
-	return ev_is_active(&service->ev) || ev_is_active(&service->timer);
+	return service->ev.fd >= 0;
 }
 
 static inline bool
-- 
GitLab