From ab7179b76ae5492ce1dc9432309f367943ba680c Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Tue, 3 Feb 2015 16:45:17 +0300 Subject: [PATCH] core: Implement fiber_join(). Add a test case. Rename fiber.test to fiber_stress.test Change cxception propagation so that exception is fiber-local, not thread local. Added exception propagation to fiber_join(). --- src/box/lua/error.cc | 5 +- src/box/recovery.cc | 4 +- src/coeio.cc | 11 +++-- src/coio.cc | 4 +- src/exception.cc | 60 ++++++----------------- src/exception.h | 26 ++++++++-- src/fiber.cc | 92 +++++++++++++++++++++++++++-------- src/fiber.h | 37 +++++++++++--- src/lua/init.cc | 5 +- test/unit/CMakeLists.txt | 5 +- test/unit/fiber.cc | 71 +++++++++++++++++++-------- test/unit/fiber.result | 7 +++ test/unit/fiber_stress.cc | 42 ++++++++++++++++ test/unit/fiber_stress.result | 0 14 files changed, 259 insertions(+), 110 deletions(-) create mode 100644 test/unit/fiber_stress.cc create mode 100644 test/unit/fiber_stress.result diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc index 6c817a6f38..e709a6b12b 100644 --- a/src/box/lua/error.cc +++ b/src/box/lua/error.cc @@ -53,9 +53,8 @@ lbox_raise(lua_State *L) int top = lua_gettop(L); if (top <= 1) { /* re-throw saved exceptions (if any) */ - if (cord()->exception == NULL) - return 0; - cord()->exception->raise(); + if (fiber()->exception) + fiber()->exception->raise(); return 0; } else if (top >= 2 && lua_type(L, 2) == LUA_TNUMBER) { code = lua_tointeger(L, 2); diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 071122b8dd..563b304f71 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -1137,9 +1137,9 @@ wal_write(struct recovery_state *r, struct xrow_header *row) * error from WAL writer and not roll back the * transaction. */ - bool cancellable = fiber_setcancellable(false); + bool cancellable = fiber_set_cancellable(false); fiber_yield(); /* Request was inserted. */ - fiber_setcancellable(cancellable); + fiber_set_cancellable(cancellable); return req->res; } diff --git a/src/coeio.cc b/src/coeio.cc index 6ea8282244..36efb2b319 100644 --- a/src/coeio.cc +++ b/src/coeio.cc @@ -207,7 +207,7 @@ coeio_custom(ssize_t (*func)(va_list ap), ev_tstamp timeout, ...) * its stack and crash the server when * this stack is accessed in the worker thread. */ - bool cancellable = fiber_setcancellable(false); + bool cancellable = fiber_set_cancellable(false); va_start(task.ap, timeout); struct eio_req *req = eio_custom(coeio_custom_cb, 0, coeio_on_complete, &task); @@ -223,7 +223,7 @@ coeio_custom(ssize_t (*func)(va_list ap), ev_tstamp timeout, ...) errno = task.errorno; } va_end(task.ap); - fiber_setcancellable(cancellable); + fiber_set_cancellable(cancellable); return task.result; } @@ -298,10 +298,11 @@ cord_cojoin(struct cord *cord) { assert(cord() != cord); /* Can't join self. */ int rc = coeio_custom(cord_cojoin_cb, TIMEOUT_INFINITY, cord); - if (rc == 0 && cord->exception) { - Exception::move(cord, cord()); + if (rc == 0 && cord->fiber->exception) { + Exception::move(&cord->fiber->exception, &fiber()->exception); cord_destroy(cord); - cord()->exception->raise(); /* re-throw exception from cord */ + /* re-throw exception in this fiber */ + fiber()->exception->raise(); } cord_destroy(cord); return rc; diff --git a/src/coio.cc b/src/coio.cc index 8e4f107fdc..3acd6a9183 100644 --- a/src/coio.cc +++ b/src/coio.cc @@ -703,9 +703,9 @@ coio_waitpid(pid_t pid) * in this case the server will leave a zombie process * behind. */ - bool allow_cancel = fiber_setcancellable(false); + bool allow_cancel = fiber_set_cancellable(false); fiber_yield(); - fiber_setcancellable(allow_cancel); + fiber_set_cancellable(allow_cancel); ev_child_stop(loop(), &cw); int status = cw.rstatus; fiber_testcancel(); diff --git a/src/exception.cc b/src/exception.cc index ca20809282..851942cd3d 100644 --- a/src/exception.cc +++ b/src/exception.cc @@ -35,64 +35,36 @@ #include <errno.h> #include <typeinfo> +/** out_of_memory::size is zero-initialized by the linker. */ static OutOfMemory out_of_memory(__FILE__, __LINE__, sizeof(OutOfMemory), "malloc", "exception"); void * Exception::operator new(size_t size) { - struct cord *cord = cord(); + struct fiber *fiber = fiber(); - if (cord->exception == &out_of_memory) { - assert(cord->exception_size == 0); - cord->exception = NULL; - } - if (cord->exception) { + if (fiber->exception && fiber->exception->size == 0) + fiber->exception = NULL; + + if (fiber->exception) { /* Explicitly call destructor for previous exception */ - cord->exception->~Exception(); - if (cord->exception_size >= size) { + fiber->exception->~Exception(); + if (fiber->exception->size >= size) { /* Reuse memory allocated for exception */ - return cord->exception; + return fiber->exception; } - free(cord->exception); + free(fiber->exception); } - cord->exception = (Exception *) malloc(size); - if (cord->exception) { - cord->exception_size = size; - return cord->exception; - } - cord->exception = &out_of_memory; - cord->exception_size = 0; - throw cord->exception; -} - -void -Exception::init(struct cord *cord) -{ - cord->exception = NULL; - cord->exception_size = 0; -} - -void -Exception::cleanup(struct cord *cord) -{ - if (cord->exception != NULL && cord->exception != &out_of_memory) { - cord->exception->~Exception(); - free(cord->exception); + fiber->exception = (Exception *) malloc(size); + if (fiber->exception) { + fiber->exception->size = size; + return fiber->exception; } - Exception::init(cord); -} - -void -Exception::move(struct cord *from, struct cord *to) -{ - Exception::cleanup(to); - to->exception = from->exception; - to->exception_size = from->exception_size; - Exception::init(from); + fiber->exception = &out_of_memory; + throw fiber->exception; } - void Exception::operator delete(void * /* ptr */) { diff --git a/src/exception.h b/src/exception.h index c2d68033fb..6071eb95bc 100644 --- a/src/exception.h +++ b/src/exception.h @@ -32,11 +32,12 @@ #include <stdarg.h> #include "say.h" -struct cord; - enum { TNT_ERRMSG_MAX = 512 }; class Exception: public Object { +protected: + /** Allocated size. */ + size_t size; public: void *operator new(size_t size); void operator delete(void*); @@ -55,11 +56,26 @@ class Exception: public Object { virtual void log() const; virtual ~Exception() {} - static void init(struct cord *cord); + static void init(Exception **what) + { + *what = NULL; + } /** Clear the last error saved in the current thread's TLS */ - static void cleanup(struct cord *cord); + static inline void cleanup(Exception **what) + { + if (*what != NULL && (*what)->size > 0) { + (*what)->~Exception(); + free(*what); + } + Exception::init(what); + } /** Move an exception from one thread to another. */ - static void move(struct cord *from, struct cord *to); + static void move(Exception **from, Exception **to) + { + Exception::cleanup(to); + *to = *from; + Exception::init(from); + } protected: Exception(const char *file, unsigned line); /* The copy constructor is needed for C++ throw */ diff --git a/src/fiber.cc b/src/fiber.cc index 97e6587f25..7f0e46cbb9 100644 --- a/src/fiber.cc +++ b/src/fiber.cc @@ -36,6 +36,7 @@ #include "assoc.h" #include "memory.h" #include "trigger.h" +#include <typeinfo> static struct cord main_cord; __thread struct cord *cord_ptr = NULL; @@ -52,6 +53,9 @@ update_last_stack_frame(struct fiber *fiber) } +static void +fiber_recycle(struct fiber *fiber); + void fiber_call(struct fiber *callee) { @@ -157,22 +161,22 @@ void fiber_testcancel(void) { /* - * Fiber can catch FiberCancelException using try..catch block in C or - * pcall()/xpcall() in Lua. However, FIBER_IS_CANCELLED flag is still set - * and the subject fiber will be killed by subsequent unprotected call - * of this function. + * Fiber can catch FiberCancelException using try..catch + * block in C or pcall()/xpcall() in Lua. However, + * FIBER_IS_CANCELLED flag is still set and the subject + * fiber will be killed by subsequent unprotected call of + * this function. */ if (fiber_is_cancelled()) tnt_raise(FiberCancelException); } - /** Change the current cancellation state of a fiber. This is not * a cancellation point. */ bool -fiber_setcancellable(bool yesno) +fiber_set_cancellable(bool yesno) { bool prev = fiber()->flags & FIBER_IS_CANCELLABLE; if (yesno == true) @@ -182,6 +186,39 @@ fiber_setcancellable(bool yesno) return prev; } +void +fiber_set_joinable(struct fiber *fiber, bool yesno) +{ + if (yesno == true) + fiber->flags |= FIBER_IS_JOINABLE; + else + fiber->flags &= ~FIBER_IS_JOINABLE; +} + +void +fiber_join(struct fiber *fiber) +{ + assert(fiber->flags & FIBER_IS_JOINABLE); + if (fiber->flags & FIBER_IS_DEAD) { + /* The fiber is already dead. */ + fiber_recycle(fiber); + } else { + rlist_add_tail_entry(&fiber->wake, fiber(), state); + fiber_yield(); + /* + * Let the fiber recycle. + * This can't be done here since there may be other + * waiters in fiber->wake list, which must run first. + */ + fiber_set_joinable(fiber, false); + } + Exception::move(&fiber->exception, &fiber()->exception); + if (fiber()->exception && + typeid(*fiber()->exception) != typeid(FiberCancelException)) { + fiber()->exception->raise(); + } + fiber_testcancel(); +} /** * @note: this is not a cancellation point (@sa fiber_testcancel()) * but it is considered good practice to call testcancel() @@ -361,9 +398,18 @@ fiber_loop(void *data __attribute__((unused))) } /** By convention, these triggers must not throw. */ if (! rlist_empty(&fiber->on_stop)) - trigger_run(&fiber->on_stop, NULL); + trigger_run(&fiber->on_stop, fiber); fiber_schedule_list(&fiber->wake); - fiber_recycle(fiber); + if (fiber->flags & FIBER_IS_JOINABLE) { + /* + * The fiber needs to be joined, + * and the joiner has not shown up yet, + * wait. + */ + fiber->flags |= FIBER_IS_DEAD; + } else { + fiber_recycle(fiber); + } fiber_yield(); /* give control back to scheduler */ } } @@ -436,22 +482,25 @@ fiber_new(const char *name, void (*f) (va_list)) /** * Free as much memory as possible taken by the fiber. * - * @todo release memory allocated for - * struct fiber and some of its members. + * Sic: cord()->sched needs manual destruction in + * cord_destroy(). */ void fiber_destroy(struct fiber *f) { - if (f == fiber()) /* do not destroy running fiber */ - return; - if (strcmp(fiber_name(f), "sched") == 0) + if (f == fiber()) { + /** End of the application. */ + assert(cord() == &main_cord); return; + } + assert(f != &cord()->sched); trigger_destroy(&f->on_yield); trigger_destroy(&f->on_stop); rlist_del(&f->state); region_destroy(&f->gc); tarantool_coro_destroy(&f->coro); + Exception::cleanup(&f->exception); } void @@ -478,13 +527,15 @@ cord_create(struct cord *cord, const char *name) rlist_create(&cord->dead); cord->fiber_registry = mh_i32ptr_new(); + /* sched fiber is not present in alive/ready/dead list. */ cord->sched.fid = 1; - cord->fiber = &cord->sched; + fiber_reset(&cord->sched); + Exception::init(&cord->sched.exception); region_create(&cord->sched.gc, &cord->slabc); fiber_set_name(&cord->sched, "sched"); + cord->fiber = &cord->sched; cord->max_fid = 100; - Exception::init(cord); ev_async_init(&cord->wakeup_event, fiber_schedule_wakeup); ev_async_start(cord->loop, &cord->wakeup_event); @@ -500,9 +551,10 @@ cord_destroy(struct cord *cord) fiber_destroy_all(cord); mh_i32ptr_delete(cord->fiber_registry); } + region_destroy(&cord->sched.gc); + Exception::cleanup(&cord->sched.exception); slab_cache_destroy(&cord->slabc); ev_loop_destroy(cord->loop); - Exception::cleanup(cord); } struct cord_thread_arg @@ -537,7 +589,7 @@ void *cord_thread_func(void *p) * Clear a possible leftover exception object * to not confuse the invoker of the thread. */ - Exception::cleanup(cord); + Exception::cleanup(&cord->fiber->exception); } catch (Exception *) { /* * The exception is now available to the caller @@ -573,16 +625,16 @@ cord_join(struct cord *cord) assert(cord() != cord); /* Can't join self. */ void *retval = NULL; int res = tt_pthread_join(cord->id, &retval); - if (res == 0 && cord->exception) { + if (res == 0 && cord->fiber->exception) { /* * cord_thread_func guarantees that * cord->exception is only set if the subject cord * has terminated with an uncaught exception, * transfer it to the caller. */ - Exception::move(cord, cord()); + Exception::move(&cord->fiber->exception, &fiber()->exception); cord_destroy(cord); - cord()->exception->raise(); + fiber()->exception->raise(); } cord_destroy(cord); return res; diff --git a/src/fiber.h b/src/fiber.h index 3cdd5c0322..ca2b476c82 100644 --- a/src/fiber.h +++ b/src/fiber.h @@ -55,12 +55,25 @@ enum { * e.g. to force it to check that it's been * cancelled. */ - FIBER_IS_CANCELLABLE = 1 << 0, + FIBER_IS_CANCELLABLE = 1 << 0, /** * Indicates that a fiber has been requested to end * prematurely. */ - FIBER_IS_CANCELLED = 1 << 1, + FIBER_IS_CANCELLED = 1 << 1, + /** + * The fiber will garbage collect automatically + * when fiber function ends. The alternative + * is that some other fiber will wait for + * the end of this fiber and garbage collect it + * with fiber_join(). + */ + FIBER_IS_JOINABLE = 1 << 2, + /** + * This flag is set when fiber function ends and before + * the fiber is recycled. + */ + FIBER_IS_DEAD = 1 << 4, FIBER_DEFAULT_FLAGS = FIBER_IS_CANCELLABLE }; @@ -139,6 +152,8 @@ struct fiber { va_list f_data; /** Fiber local storage */ void *fls[FIBER_KEY_MAX]; + /** Exception which caused this fiber's death. */ + class Exception *exception; }; enum { FIBER_CALL_STACK = 16 }; @@ -183,9 +198,6 @@ struct cord { /** A runtime slab cache for general use in this cord. */ struct slab_cache slabc; char name[FIBER_NAME_MAX]; - /** Last thrown exception */ - class Exception *exception; - size_t exception_size; }; extern __thread struct cord *cord_ptr; @@ -308,7 +320,20 @@ fiber_testcancel(void); * @return previous state. */ bool -fiber_setcancellable(bool yesno); +fiber_set_cancellable(bool yesno); + +/** + * Make a fiber joinable (false by default). + */ +void +fiber_set_joinable(struct fiber *fiber, bool yesno); + +/** + * Wait till the argument fiber ends execution. + * The fiber must not be detached (set fiber_set_joinable()). + */ +void +fiber_join(struct fiber *f); void fiber_sleep(ev_tstamp s); diff --git a/src/lua/init.cc b/src/lua/init.cc index 338b393eac..586b474bf0 100644 --- a/src/lua/init.cc +++ b/src/lua/init.cc @@ -383,8 +383,9 @@ char *history = NULL; extern "C" const char * tarantool_error_message(void) { - assert(cord()->exception != NULL); /* called only from error handler */ - return cord()->exception->errmsg(); + /* called only from error handler */ + assert(fiber()->exception != NULL); + return fiber()->exception->errmsg(); } /** diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 39ef9b6ded..72f7e9bed5 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -58,9 +58,12 @@ target_link_libraries(vclock.test core small) add_executable(quota.test quota.cc unit.c) target_link_libraries(quota.test pthread) -add_executable(fiber.test fiber.cc) +add_executable(fiber.test fiber.cc unit.c) target_link_libraries(fiber.test core) +add_executable(fiber_stress.test fiber_stress.cc) +target_link_libraries(fiber_stress.test core) + add_executable(coio.test coio.cc unit.c ${CMAKE_SOURCE_DIR}/src/sio.cc ${CMAKE_SOURCE_DIR}/src/evio.cc diff --git a/test/unit/fiber.cc b/test/unit/fiber.cc index 1672b3be54..b50e9c9bcb 100644 --- a/test/unit/fiber.cc +++ b/test/unit/fiber.cc @@ -1,31 +1,62 @@ #include "memory.h" #include "fiber.h" +#include "unit.h" -enum { - ITERATIONS = 5000, - FIBERS = 10 -}; +static void +noop_f(va_list ap) +{ + return; +} -void yield_f(va_list ap) +static void +cancel_f(va_list ap) { - for (int i = 0; i < ITERATIONS; i++) { - fiber_wakeup(fiber()); - fiber_yield(); + fiber_set_cancellable(true); + while (true) { + fiber_sleep(0.001); + fiber_testcancel(); } } -void benchmark_f(va_list ap) +static void +exception_f(va_list ap) { - struct fiber *fibers[FIBERS]; - for (int i = 0; i < FIBERS; i++) { - fibers[i] = fiber_new("yield-wielder", yield_f); - fiber_wakeup(fibers[i]); - } - /** Wait for fibers to die. */ - for (int i = 0; i < FIBERS; i++) { - while (fibers[i]->fid > 0) - fiber_sleep(0.001); + tnt_raise(OutOfMemory, 42, "allocator", "exception"); +} + +static void +fiber_join_test() +{ + header(); + + struct fiber *fiber= fiber_new("join", noop_f); + fiber_set_joinable(fiber, true); + fiber_wakeup(fiber); + fiber_join(fiber); + + fiber = fiber_new("cancel", cancel_f); + fiber_set_joinable(fiber, true); + fiber_wakeup(fiber); + fiber_sleep(0); + fiber_cancel(fiber); + fiber_join(fiber); + + fiber = fiber_new("exception", exception_f); + fiber_set_joinable(fiber, true); + fiber_wakeup(fiber); + try { + fiber_join(fiber); + fail("exception not raised", ""); + } catch (Exception *e) { + note("exception propagated"); } + footer(); +} + +static void +main_f(va_list ap) +{ + fiber_join_test(); ev_break(loop(), EVBREAK_ALL); } @@ -33,8 +64,8 @@ int main() { memory_init(); fiber_init(); - struct fiber *benchmark = fiber_new("benchmark", benchmark_f); - fiber_wakeup(benchmark); + struct fiber *main = fiber_new("main", main_f); + fiber_wakeup(main); ev_run(loop(), 0); fiber_free(); memory_free(); diff --git a/test/unit/fiber.result b/test/unit/fiber.result index e69de29bb2..c2f32b54e4 100644 --- a/test/unit/fiber.result +++ b/test/unit/fiber.result @@ -0,0 +1,7 @@ +(null): fiber `cancel' has been cancelled +(null): fiber `cancel': exiting +(null): SystemError Failed to allocate 42 bytes in allocator for exception: Cannot allocate memory + *** fiber_join_test *** +# exception propagated + *** fiber_join_test: done *** + \ No newline at end of file diff --git a/test/unit/fiber_stress.cc b/test/unit/fiber_stress.cc new file mode 100644 index 0000000000..1672b3be54 --- /dev/null +++ b/test/unit/fiber_stress.cc @@ -0,0 +1,42 @@ +#include "memory.h" +#include "fiber.h" + +enum { + ITERATIONS = 5000, + FIBERS = 10 +}; + +void yield_f(va_list ap) +{ + for (int i = 0; i < ITERATIONS; i++) { + fiber_wakeup(fiber()); + fiber_yield(); + } +} + +void benchmark_f(va_list ap) +{ + struct fiber *fibers[FIBERS]; + for (int i = 0; i < FIBERS; i++) { + fibers[i] = fiber_new("yield-wielder", yield_f); + fiber_wakeup(fibers[i]); + } + /** Wait for fibers to die. */ + for (int i = 0; i < FIBERS; i++) { + while (fibers[i]->fid > 0) + fiber_sleep(0.001); + } + ev_break(loop(), EVBREAK_ALL); +} + +int main() +{ + memory_init(); + fiber_init(); + struct fiber *benchmark = fiber_new("benchmark", benchmark_f); + fiber_wakeup(benchmark); + ev_run(loop(), 0); + fiber_free(); + memory_free(); + return 0; +} diff --git a/test/unit/fiber_stress.result b/test/unit/fiber_stress.result new file mode 100644 index 0000000000..e69de29bb2 -- GitLab