From d94bbef80337790f0c3a8edf446a21b83e9dd4a4 Mon Sep 17 00:00:00 2001
From: GeorgyKirichenko <kirichenkoga@gmail.com>
Date: Tue, 17 May 2016 23:05:35 +0300
Subject: [PATCH] Add on start and on stop handlers to eio. Issue #1452. V2

---
 src/coeio.c              |  8 ++++++++
 src/coeio.h              |  3 +++
 src/fiber.c              | 18 ++++++++----------
 src/fiber.h              |  6 ++++++
 src/main.cc              | 20 ++++++++++++++++++++
 third_party/libeio/eio.c | 12 ++++++++++++
 third_party/libeio/eio.h |  6 ++++++
 third_party/libeio/etp.c | 27 +++++++++++++++++++++++++++
 8 files changed, 90 insertions(+), 10 deletions(-)

diff --git a/src/coeio.c b/src/coeio.c
index c968de97d7..55e8cc9322 100644
--- a/src/coeio.c
+++ b/src/coeio.c
@@ -125,6 +125,14 @@ coeio_init(void)
 	ev_async_start(loop(), &coeio_manager.coeio_async);
 }
 
+void
+coeio_set_thread_cb(int (*on_start_cb)(void *),
+		    int (*on_stop_cb)(void *), void *data)
+{
+	eio_set_thread_on_start(on_start_cb, data);
+	eio_set_thread_on_stop(on_stop_cb, data);
+}
+
 static void
 coio_on_exec(eio_req *req)
 {
diff --git a/src/coeio.h b/src/coeio.h
index 610191c626..5ffe74c99c 100644
--- a/src/coeio.h
+++ b/src/coeio.h
@@ -50,6 +50,9 @@ extern "C" {
 
 void coeio_init(void);
 
+void coeio_set_thread_cb(int (*on_start_cb)(void *),
+			 int (*on_stop_cb)(void *), void *data);
+
 struct coio_task;
 
 typedef ssize_t (*coio_task_cb)(struct coio_task *task); /* like eio_req */
diff --git a/src/fiber.c b/src/fiber.c
index 76810d2b43..c3edec3b9a 100644
--- a/src/fiber.c
+++ b/src/fiber.c
@@ -633,10 +633,11 @@ fiber_destroy_all(struct cord *cord)
 		fiber_destroy(cord, f);
 }
 
-static void
-cord_init(const char *name)
+void
+cord_create(struct cord *cord, const char *name)
 {
-	struct cord *cord = cord();
+	cord() = cord;
+	slab_cache_set_thread(&cord()->slabc);
 
 	cord->id = pthread_self();
 	cord->on_exit = NULL;
@@ -668,7 +669,7 @@ cord_init(const char *name)
 	cord_set_name(name);
 }
 
-static void
+void
 cord_destroy(struct cord *cord)
 {
 	slab_cache_set_thread(&cord->slabc);
@@ -702,9 +703,7 @@ struct cord_thread_arg
 void *cord_thread_func(void *p)
 {
 	struct cord_thread_arg *ct_arg = (struct cord_thread_arg *) p;
-	cord() = ct_arg->cord;
-	slab_cache_set_thread(&cord()->slabc);
-	cord_init(ct_arg->name);
+	cord_create(ct_arg->cord, (ct_arg->name));
 	/** Can't possibly be the main thread */
 	assert(cord()->id != main_thread_id);
 	tt_pthread_mutex_lock(&ct_arg->start_mutex);
@@ -962,9 +961,8 @@ fiber_init(int (*invoke)(fiber_func f, va_list ap))
 {
 	fiber_invoke = invoke;
 	main_thread_id = pthread_self();
-	cord() = &main_cord;
-	cord()->loop = ev_default_loop(EVFLAG_AUTO | EVFLAG_ALLOCFD);
-	cord_init("main");
+	main_cord.loop = ev_default_loop(EVFLAG_AUTO | EVFLAG_ALLOCFD);
+	cord_create(&main_cord, "main");
 }
 
 void
diff --git a/src/fiber.h b/src/fiber.h
index 605f028b9c..efd41f0d2f 100644
--- a/src/fiber.h
+++ b/src/fiber.h
@@ -339,6 +339,12 @@ extern __thread struct cord *cord_ptr;
 #define fiber() cord()->fiber
 #define loop() (cord()->loop)
 
+void
+cord_create(struct cord *cord, const char *name);
+
+void
+cord_destroy(struct cord *cord);
+
 /**
  * Start a cord with the given thread function.
  * The return value of the function can be collected
diff --git a/src/main.cc b/src/main.cc
index a307617bb2..e755be1e0e 100644
--- a/src/main.cc
+++ b/src/main.cc
@@ -517,6 +517,25 @@ tarantool_free(void)
 #endif
 }
 
+static int 
+coeio_cord_start(void *data)
+{
+	(void) data;
+	struct cord *cord = (struct cord *)calloc(sizeof(struct cord), 1);
+	if (!cord)
+		return -1;
+	cord_create(cord, "coeio");
+	return 0;
+}
+
+static int
+coeio_cord_stop(void *data)
+{
+	(void) data;
+	cord_destroy(cord());
+	return 0;
+}
+
 int
 main(int argc, char **argv)
 {
@@ -626,6 +645,7 @@ main(int argc, char **argv)
 	/* Init iobuf library with default readahead */
 	iobuf_init();
 	coeio_init();
+	coeio_set_thread_cb(coeio_cord_start, coeio_cord_stop, NULL);
 	signal_init();
 	tarantool_lua_init(tarantool_bin, main_argc, main_argv);
 	box_lua_init(tarantool_L);
diff --git a/third_party/libeio/eio.c b/third_party/libeio/eio.c
index a5b216f727..7351d5ddae 100644
--- a/third_party/libeio/eio.c
+++ b/third_party/libeio/eio.c
@@ -512,6 +512,18 @@ eio_set_max_poll_reqs (unsigned int maxreqs)
   etp_set_max_poll_reqs (EIO_POOL_USER, maxreqs);
 }
 
+void
+eio_set_thread_on_start(int (*on_start_cb)(void *), void *data)
+{
+  etp_set_thread_on_start(EIO_POOL, on_start_cb, data);
+}
+
+void
+eio_set_thread_on_stop(int (*on_stop)(void *), void *data)
+{
+  etp_set_thread_on_stop(EIO_POOL, on_stop, data);
+}
+
 void ecb_cold
 eio_set_max_idle (unsigned int nthreads)
 {
diff --git a/third_party/libeio/eio.h b/third_party/libeio/eio.h
index 720b3d3624..645167fd08 100644
--- a/third_party/libeio/eio.h
+++ b/third_party/libeio/eio.h
@@ -309,6 +309,12 @@ void eio_set_max_poll_time (eio_tstamp nseconds);
 /* do not handle more then count requests in one call to eio_poll_cb */
 void eio_set_max_poll_reqs (unsigned int nreqs);
 
+/* on start callback for eio worker thread */
+void eio_set_thread_on_start(int (*on_start_cb)(void *), void *data);
+
+/* on stop callback for eio worker thread */
+void eio_set_thread_on_stop(int (*on_stop_cb)(void *), void *data);
+
 /* set minimum required number
  * maximum wanted number
  * or maximum idle number of threads */
diff --git a/third_party/libeio/etp.c b/third_party/libeio/etp.c
index 60b19904de..81921c85e0 100644
--- a/third_party/libeio/etp.c
+++ b/third_party/libeio/etp.c
@@ -138,6 +138,11 @@ struct etp_pool
    xmutex_t lock;
    xcond_t  reqwait;
    xcond_t  wrkwait;
+
+   int (*on_start_cb)(void *data);
+   void *on_start_data;
+   int (*on_stop_cb)(void *data);
+   void *on_stop_data;
 };
 
 struct etp_pool_user
@@ -324,6 +329,11 @@ X_THREAD_PROC (etp_proc)
 
   X_LOCK (pool->lock);
 
+  if (pool->on_start_cb)
+    if (pool->on_start_cb(pool->on_start_data))
+      goto quit;
+
+
   for (;;)
     {
       for (;;)
@@ -379,6 +389,8 @@ X_THREAD_PROC (etp_proc)
   pool->started--;
   X_COND_BROADCAST (pool->wrkwait);
   X_UNLOCK (pool->lock);
+  if (pool->on_stop_cb)
+    pool->on_stop_cb(pool->on_stop_data);
 
   return 0;
 }
@@ -541,6 +553,21 @@ etp_set_max_poll_reqs (etp_pool_user user, unsigned int maxreqs)
   user->max_poll_reqs = maxreqs;
 }
 
+ETP_API_DECL void ecb_cold
+etp_set_thread_on_start(etp_pool pool, int (*on_start_cb)(void *), void *data)
+{
+  pool->on_start_cb = on_start_cb;
+  pool->on_start_data = data;
+}
+
+ETP_API_DECL void ecb_cold
+etp_set_thread_on_stop(etp_pool pool, int (*on_stop_cb)(void *), void *data)
+{
+  pool->on_stop_cb = on_stop_cb;
+  pool->on_stop_data = data;
+}
+
+
 ETP_API_DECL void ecb_cold
 etp_set_max_idle (etp_pool pool, unsigned int threads)
 {
-- 
GitLab