diff --git a/include/coeio.h b/include/coeio.h new file mode 100644 index 0000000000000000000000000000000000000000..0eac1b09834e90b36f497a2af299760a03c373f0 --- /dev/null +++ b/include/coeio.h @@ -0,0 +1,68 @@ +#ifndef TARANTOOL_COEIO_H_INCLUDED +#define TARANTOOL_COEIO_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "config.h" + +#include <stdbool.h> +#include <stdint.h> +#include <stdarg.h> +#include <unistd.h> +#include <tarantool_ev.h> +#include <tarantool_eio.h> +#include <coro.h> +#include <util.h> +#include <rlist.h> + +/** + * Asynchronous IO Tasks (libeio wrapper) + * + * Yield the current fiber until a created task is complete. + */ + +/** + * A single task' context. + */ +struct coeio_req { + struct eio_req *req; + struct fiber *f; + bool complete; + bool wait; + void *f_data; + void *result; + struct rlist link; +}; + +void coeio_init(void); +void coeio_free(void); +struct coeio_req *coeio_custom(void (*f)(eio_req*), void *arg); +void *coeio_wait(struct coeio_req *r); + +#endif /* TARANTOOL_COEIO_H_INCLUDED */ diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index e8a5913b9dc841f2317ea7061c213070e396a56b..ffe13cd7bc05fec6db59c4bfe799ccba0e09d381 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -111,7 +111,7 @@ set (common_sources sio.m evio.m coio.m - asio.m + coeio.m iobuf.m coio_buf.m salloc.m diff --git a/src/coeio.m b/src/coeio.m new file mode 100644 index 0000000000000000000000000000000000000000..83bbed9bfffd7bc9c5503fbd50bbdae30af0e8ec --- /dev/null +++ b/src/coeio.m @@ -0,0 +1,218 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "coeio.h" +#include "fiber.h" +#include "exception.h" +#include <rlist.h> +#include <errno.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +/* + * Asynchronous IO Tasks (libeio wrapper). + * --- + * + * Libeio request processing is designed in edge-trigger + * manner, when libeio is ready to process some requests it + * calls coeio_poller callback. + * + * Due to libeio design, coeio_poller is called while locks + * are being held, so it's unable to call any libeio function + * inside this callback. + * + * coeio_poller triggers coeio_watcher to start the polling process. + * In case if none of the requests are complete by that time, it + * starts idle_watcher, which would periodically invoke eio_poll + * until any of requests are complete. + * + * See for details: + * http://pod.tst.eu/http://cvs.schmorp.de/libeio/eio.pod +*/ +struct coeio_manager { + ev_idle coeio_repeat_watcher; + ev_async coeio_watcher; + struct rlist active; +}; + +static struct coeio_manager coeio_manager; + +static void +coeio_schedule_repeat(struct ev_idle *w, + int events __attribute__((unused))) +{ + if (eio_poll() != -1) + ev_idle_stop(w); +} + +static void +coeio_schedule(struct ev_async *w __attribute__((unused)), + int events __attribute__((unused))) +{ + if (eio_poll() == -1) + ev_idle_start(&coeio_manager.coeio_repeat_watcher); +} + +static void coeio_poller(void) +{ + ev_async_send(&coeio_manager.coeio_watcher); +} + +/** + * Init coeio subsystem. + * + * Create idle and async watchers, init eio. + */ +void +coeio_init(void) +{ + memset(&coeio_manager, 0, sizeof(struct coeio_manager)); + + rlist_init(&coeio_manager.active); + + ev_idle_init(&coeio_manager.coeio_repeat_watcher, coeio_schedule_repeat); + ev_async_init(&coeio_manager.coeio_watcher, coeio_schedule); + ev_async_start(&coeio_manager.coeio_watcher); + + eio_init(coeio_poller, NULL); +} + +/** + * Cancel active tasks and free memory. + */ +void +coeio_free(void) +{ + struct coeio_req *r; + struct coeio_req *r_next; + + /* cancel active requests */ + r = rlist_first_entry(&coeio_manager.active, struct coeio_req, link); + while (1) { + if (r == rlist_last_entry(&coeio_manager.active, + struct coeio_req, link)) + break; + r_next = rlist_next_entry(r, link); + /* eio_cancel sets task as cancelled, this guarantees + * that coeio_on_complete would never be called for + * this request, thus we are allowed to free memory here. */ + eio_cancel(r->req); + free(r); + r = r_next; + } +} + +inline static struct coeio_req* +coeio_alloc(void) +{ + struct coeio_req *r = calloc(1, sizeof(struct coeio_req)); + if (r == NULL) { + tnt_raise(LoggedError, :ER_MEMORY_ISSUE, + sizeof(struct coeio_req), "coeio_alloc", + "coeio_req"); + } + rlist_init(&r->link); + return r; +} + +static int +coeio_on_complete(eio_req *req) +{ + struct coeio_req *r = req->data; + struct fiber *f = r->f; + r->complete = true; + rlist_del_entry(r, link); + if (r->wait) + fiber_wakeup(f); + return 0; +} + +/** + * Create new eio task with specified libeio function and + * argument. + * + * @throws ER_MEMORY_ISSUE + * + * @return coeio object pointer. + * + * @code + * static void request(eio_req *req) { + * (void)req->data; // "arg" + * + * req->result = "result"; + * } + * + * struct coeio_req *r = coeio_custom(request, "arg"); + * + */ +struct coeio_req* +coeio_custom(void (*f)(eio_req*), void *arg) +{ + struct coeio_req *r = coeio_alloc(); + r->f = fiber; + r->f_data = arg; + r->req = eio_custom(f, 0, coeio_on_complete, r); + if (r->req == NULL) { + tnt_raise(LoggedError, :ER_MEMORY_ISSUE, + sizeof(struct eio_req), "coeio_custom", + "eio_req"); + } + rlist_add_tail_entry(&coeio_manager.active, r, link); + return r; +} + +/** + * Yield and wait for a request completion. + * + * @throws FiberCancelException + * + * @return request result pointer. + * + * @code + * struct coeio_req *r = coeio_custom(callback, NULL); + * + * // wait for result and free request object + * void *result = coeio_wait(r); + * + * // continue with result + */ +void *coeio_wait(struct coeio_req *r) +{ + if (r->complete) { + void *result = r->result; + free(r); + return result; + } + r->wait = true; + fiber_yield(); + void *result = r->result; + free(r); + fiber_testcancel(); + return result; +} diff --git a/src/tarantool.m b/src/tarantool.m index c522215450d939fc9e8a5d719ff601e753eab9bd..8a9aeb55631a564b6a8682e5d972076166c82456 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -48,7 +48,7 @@ #include <admin.h> #include <replication.h> #include <fiber.h> -#include <asio.h> +#include <coeio.h> #include <iproto.h> #include <latch.h> #include <recovery.h> @@ -593,7 +593,7 @@ tarantool_free(void) destroy_tarantool_cfg(&cfg); fiber_free(); - asio_free(); + coeio_free(); palloc_free(); ev_default_destroy(); #ifdef ENABLE_GCOV @@ -610,7 +610,7 @@ initialize(double slab_alloc_arena, int slab_alloc_minimal, double slab_alloc_fa if (!salloc_init(slab_alloc_arena * (1 << 30), slab_alloc_minimal, slab_alloc_factor)) panic_syserror("can't initialize slab allocator"); fiber_init(); - asio_init(); + coeio_init(); } static void