From 54ae85bae16fc296c0582d74279497aa9e0c3e7d Mon Sep 17 00:00:00 2001
From: Dmitry Simonenko <pmwkaa@gmail.com>
Date: Thu, 6 Dec 2012 18:33:01 +0400
Subject: [PATCH] coeio: asio refactoring and review fixes.

---
 include/coeio.h    |  68 ++++++++++++++
 src/CMakeLists.txt |   2 +-
 src/coeio.m        | 218 +++++++++++++++++++++++++++++++++++++++++++++
 src/tarantool.m    |   6 +-
 4 files changed, 290 insertions(+), 4 deletions(-)
 create mode 100644 include/coeio.h
 create mode 100644 src/coeio.m

diff --git a/include/coeio.h b/include/coeio.h
new file mode 100644
index 0000000000..0eac1b0983
--- /dev/null
+++ b/include/coeio.h
@@ -0,0 +1,68 @@
+#ifndef TARANTOOL_COEIO_H_INCLUDED
+#define TARANTOOL_COEIO_H_INCLUDED
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "config.h"
+
+#include <stdbool.h>
+#include <stdint.h>
+#include <stdarg.h>
+#include <unistd.h>
+#include <tarantool_ev.h>
+#include <tarantool_eio.h>
+#include <coro.h>
+#include <util.h>
+#include <rlist.h>
+
+/**
+ * Asynchronous IO Tasks (libeio wrapper)
+ *
+ * Yield the current fiber until a created task is complete.
+ */
+
+/**
+ * A single task' context.
+ */
+struct coeio_req {
+	struct eio_req *req;
+	struct fiber *f;
+	bool complete;
+	bool wait;
+	void *f_data;
+	void *result;
+	struct rlist link;
+};
+
+void coeio_init(void);
+void coeio_free(void);
+struct coeio_req *coeio_custom(void (*f)(eio_req*), void *arg);
+void *coeio_wait(struct coeio_req *r);
+
+#endif /* TARANTOOL_COEIO_H_INCLUDED */
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index e8a5913b9d..ffe13cd7bc 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -111,7 +111,7 @@ set (common_sources
      sio.m
      evio.m
      coio.m
-     asio.m
+     coeio.m
      iobuf.m
      coio_buf.m
      salloc.m
diff --git a/src/coeio.m b/src/coeio.m
new file mode 100644
index 0000000000..83bbed9bff
--- /dev/null
+++ b/src/coeio.m
@@ -0,0 +1,218 @@
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "coeio.h"
+#include "fiber.h"
+#include "exception.h"
+#include <rlist.h>
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+/*
+ * Asynchronous IO Tasks (libeio wrapper).
+ * ---
+ *
+ * Libeio request processing is designed in edge-trigger
+ * manner, when libeio is ready to process some requests it
+ * calls coeio_poller callback.
+ *
+ * Due to libeio design, coeio_poller is called while locks
+ * are being held, so it's unable to call any libeio function
+ * inside this callback.
+ *
+ * coeio_poller triggers coeio_watcher to start the polling process.
+ * In case if none of the requests are complete by that time, it
+ * starts idle_watcher, which would periodically invoke eio_poll
+ * until any of requests are complete.
+ *
+ * See for details:
+ * http://pod.tst.eu/http://cvs.schmorp.de/libeio/eio.pod
+*/
+struct coeio_manager {
+	ev_idle coeio_repeat_watcher;
+	ev_async coeio_watcher;
+	struct rlist active;
+};
+
+static struct coeio_manager coeio_manager;
+
+static void
+coeio_schedule_repeat(struct ev_idle *w,
+		     int events __attribute__((unused)))
+{
+	if (eio_poll() != -1)
+		ev_idle_stop(w);
+}
+
+static void
+coeio_schedule(struct ev_async *w __attribute__((unused)),
+	      int events __attribute__((unused)))
+{
+	if (eio_poll() == -1)
+		ev_idle_start(&coeio_manager.coeio_repeat_watcher);
+}
+
+static void coeio_poller(void)
+{
+	ev_async_send(&coeio_manager.coeio_watcher);
+}
+
+/**
+ * Init coeio subsystem.
+ *
+ * Create idle and async watchers, init eio.
+ */
+void
+coeio_init(void)
+{
+	memset(&coeio_manager, 0, sizeof(struct coeio_manager));
+
+	rlist_init(&coeio_manager.active);
+
+	ev_idle_init(&coeio_manager.coeio_repeat_watcher, coeio_schedule_repeat);
+	ev_async_init(&coeio_manager.coeio_watcher, coeio_schedule);
+	ev_async_start(&coeio_manager.coeio_watcher);
+
+	eio_init(coeio_poller, NULL);
+}
+
+/**
+ * Cancel active tasks and free memory.
+ */
+void
+coeio_free(void)
+{
+	struct coeio_req *r;
+	struct coeio_req *r_next;
+
+	/* cancel active requests */
+	r = rlist_first_entry(&coeio_manager.active, struct coeio_req, link);
+	while (1) {
+		if (r == rlist_last_entry(&coeio_manager.active,
+					  struct coeio_req, link))
+			break;
+		r_next = rlist_next_entry(r, link);
+		/* eio_cancel sets task as cancelled, this guarantees
+		 * that coeio_on_complete would never be called for
+		 * this request, thus we are allowed to free memory here. */
+		eio_cancel(r->req);
+		free(r);
+		r = r_next;
+	}
+}
+
+inline static struct coeio_req*
+coeio_alloc(void)
+{
+	struct coeio_req *r = calloc(1, sizeof(struct coeio_req));
+	if (r == NULL) {
+		tnt_raise(LoggedError, :ER_MEMORY_ISSUE,
+			  sizeof(struct coeio_req), "coeio_alloc",
+			  "coeio_req");
+	}
+	rlist_init(&r->link);
+	return r;
+}
+
+static int
+coeio_on_complete(eio_req *req)
+{
+	struct coeio_req *r = req->data;
+	struct fiber *f = r->f;
+	r->complete = true;
+	rlist_del_entry(r, link);
+	if (r->wait)
+		fiber_wakeup(f);
+	return 0;
+}
+
+/**
+ * Create new eio task with specified libeio function and
+ * argument.
+ *
+ * @throws ER_MEMORY_ISSUE
+ *
+ * @return coeio object pointer.
+ *
+ * @code
+ *	static void request(eio_req *req) {
+ *		(void)req->data; // "arg"
+ *
+ *		req->result = "result";
+ *	}
+ *
+ *      struct coeio_req *r = coeio_custom(request, "arg");
+ *
+ */
+struct coeio_req*
+coeio_custom(void (*f)(eio_req*), void *arg)
+{
+	struct coeio_req *r = coeio_alloc();
+	r->f = fiber;
+	r->f_data = arg;
+	r->req = eio_custom(f, 0, coeio_on_complete, r);
+	if (r->req == NULL) {
+		tnt_raise(LoggedError, :ER_MEMORY_ISSUE,
+			  sizeof(struct eio_req), "coeio_custom",
+			  "eio_req");
+	}
+	rlist_add_tail_entry(&coeio_manager.active, r, link);
+	return r;
+}
+
+/**
+ * Yield and wait for a request completion.
+ *
+ * @throws FiberCancelException
+ *
+ * @return request result pointer.
+ *
+ * @code
+ *      struct coeio_req *r = coeio_custom(callback, NULL);
+ *
+ *      // wait for result and free request object
+ *      void *result = coeio_wait(r);
+ *
+ *      // continue with result
+ */
+void *coeio_wait(struct coeio_req *r)
+{
+	if (r->complete) {
+		void *result = r->result;
+		free(r);
+		return result;
+	}
+	r->wait = true;
+	fiber_yield();
+	void *result = r->result;
+	free(r);
+	fiber_testcancel();
+	return result;
+}
diff --git a/src/tarantool.m b/src/tarantool.m
index c522215450..8a9aeb5563 100644
--- a/src/tarantool.m
+++ b/src/tarantool.m
@@ -48,7 +48,7 @@
 #include <admin.h>
 #include <replication.h>
 #include <fiber.h>
-#include <asio.h>
+#include <coeio.h>
 #include <iproto.h>
 #include <latch.h>
 #include <recovery.h>
@@ -593,7 +593,7 @@ tarantool_free(void)
 	destroy_tarantool_cfg(&cfg);
 
 	fiber_free();
-	asio_free();
+	coeio_free();
 	palloc_free();
 	ev_default_destroy();
 #ifdef ENABLE_GCOV
@@ -610,7 +610,7 @@ initialize(double slab_alloc_arena, int slab_alloc_minimal, double slab_alloc_fa
 	if (!salloc_init(slab_alloc_arena * (1 << 30), slab_alloc_minimal, slab_alloc_factor))
 		panic_syserror("can't initialize slab allocator");
 	fiber_init();
-	asio_init();
+	coeio_init();
 }
 
 static void
-- 
GitLab