diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 8e35269954f49d4e5d8c01ed07a6df6c7a8ba0c0..397b449e125a42092d1d1bd4a5dbc442b8b19993 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -2600,6 +2600,11 @@ vy_env_dump_complete_cb(struct vy_scheduler *scheduler,
 
 static struct vy_squash_queue *
 vy_squash_queue_new(void);
+
+/** Stop the squash queue worker fiber. */
+static void
+vy_squash_queue_shutdown(struct vy_squash_queue *q);
+
 static void
 vy_squash_queue_delete(struct vy_squash_queue *q);
 static void
@@ -2726,6 +2731,29 @@ vinyl_engine_free(struct engine *engine)
 	vy_env_delete(env);
 }
 
+/**
+ * Vinyl shutdown implementation. Shutdown stops all internal fibers/threads.
+ * It may yield.
+ */
+static void
+vy_env_shutdown(struct vy_env *e)
+{
+	vy_scheduler_shutdown(&e->scheduler);
+	vy_squash_queue_shutdown(e->squash_queue);
+	vy_log_shutdown();
+}
+
+/**
+ * Vinyl shutdown. Shutdown stops all internal fibers/threads.
+ * It may yield.
+ */
+static void
+vinyl_engine_shutdown(struct engine *engine)
+{
+	struct vy_env *env = vy_env(engine);
+	vy_env_shutdown(env);
+}
+
 void
 vinyl_engine_set_cache(struct engine *engine, size_t quota)
 {
@@ -3471,13 +3499,18 @@ vy_squash_queue_new(void)
 }
 
 static void
-vy_squash_queue_delete(struct vy_squash_queue *sq)
+vy_squash_queue_shutdown(struct vy_squash_queue *sq)
 {
 	if (sq->fiber != NULL) {
+		fiber_cancel(sq->fiber);
+		fiber_join(sq->fiber);
 		sq->fiber = NULL;
-		/* Sic: fiber_cancel() can't be used here */
-		fiber_cond_signal(&sq->cond);
 	}
+}
+
+static void
+vy_squash_queue_delete(struct vy_squash_queue *sq)
+{
 	struct vy_squash *squash, *next;
 	stailq_foreach_entry_safe(squash, next, &sq->queue, next)
 		vy_squash_delete(&sq->pool, squash);
@@ -3488,7 +3521,7 @@ static int
 vy_squash_queue_f(va_list va)
 {
 	struct vy_squash_queue *sq = va_arg(va, struct vy_squash_queue *);
-	while (sq->fiber != NULL) {
+	while (!fiber_is_cancelled()) {
 		fiber_check_gc();
 		if (stailq_empty(&sq->queue)) {
 			fiber_cond_wait(&sq->cond);
@@ -3522,6 +3555,7 @@ vy_squash_schedule(struct vy_lsm *lsm, struct vy_entry entry, void *arg)
 					     vy_squash_queue_f);
 		if (sq->fiber == NULL)
 			goto fail;
+		fiber_set_joinable(sq->fiber, true);
 		fiber_start(sq->fiber, sq);
 	}
 
@@ -4586,7 +4620,7 @@ static TRIGGER(on_replace_vinyl_deferred_delete, vy_deferred_delete_on_replace);
 
 static const struct engine_vtab vinyl_engine_vtab = {
 	/* .free = */ vinyl_engine_free,
-	/* .shutdown = */ generic_engine_shutdown,
+	/* .shutdown = */ vinyl_engine_shutdown,
 	/* .create_space = */ vinyl_engine_create_space,
 	/* .create_read_view = */ generic_engine_create_read_view,
 	/* .prepare_join = */ vinyl_engine_prepare_join,
diff --git a/src/box/vy_log.c b/src/box/vy_log.c
index d0d88ecaeeb765ed27e895bf7739f4dbd9a408b5..40270bf78949b625a3af6775563fc70a5fbbe5b3 100644
--- a/src/box/vy_log.c
+++ b/src/box/vy_log.c
@@ -771,6 +771,7 @@ vy_log_init(const char *dir)
 				     vy_log_flusher_f);
 	if (vy_log.flusher == NULL)
 		panic("failed to allocate vylog flusher fiber");
+	fiber_set_joinable(vy_log.flusher, true);
 	fiber_wakeup(vy_log.flusher);
 }
 
@@ -807,7 +808,6 @@ vy_log_tx_flush(struct vy_log_tx *tx)
 		diag_set(ClientError, ER_INJECTION, "vinyl log flush");
 		return -1;
 	});
-	ERROR_INJECT_YIELD(ERRINJ_VY_LOG_FLUSH_DELAY);
 
 	int tx_size = 0;
 	struct vy_log_record *record;
@@ -922,6 +922,14 @@ vy_log_flusher_f(va_list va)
 	return 0;
 }
 
+void
+vy_log_shutdown(void)
+{
+	fiber_cancel(vy_log.flusher);
+	fiber_join(vy_log.flusher);
+	vy_log.flusher = NULL;
+}
+
 void
 vy_log_free(void)
 {
@@ -931,6 +939,7 @@ vy_log_free(void)
 	stailq_create(&vy_log.pending_tx);
 	mempool_destroy(&vy_log.tx_pool);
 	xdir_destroy(&vy_log.dir);
+	latch_destroy(&vy_log.latch);
 	diag_destroy(&vy_log.tx_diag);
 }
 
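
Both background fibers touched above (the upsert squash queue fiber in vinyl.c and the vylog flusher in vy_log.c) move to the same lifecycle: the fiber is made joinable right after creation, its loop exits on cancellation, and shutdown cancels and joins it. A minimal sketch of that pattern, using the public fiber API; the names worker, worker_f, worker_start and worker_shutdown are illustrative and not part of the patch:

#include "fiber.h"
#include "say.h"

static struct fiber *worker;

static int
worker_f(va_list ap)
{
	(void)ap;
	while (!fiber_is_cancelled()) {
		/* Do one unit of background work, then yield. */
		fiber_sleep(0.1);
	}
	/* Returning terminates the fiber; fiber_join() collects the result. */
	return 0;
}

static void
worker_start(void)
{
	worker = fiber_new("worker", worker_f);
	if (worker == NULL)
		panic("failed to allocate worker fiber");
	/* A joinable fiber is not recycled until somebody joins it. */
	fiber_set_joinable(worker, true);
	fiber_start(worker);
}

static void
worker_shutdown(void)
{
	fiber_cancel(worker);
	fiber_join(worker);
	worker = NULL;
}

This is why the old "Sic: fiber_cancel() can't be used here" dance with fiber_cond_signal() goes away: once the loop keys off fiber_is_cancelled() and the fiber is joinable, cancel-and-join is sufficient.
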
diff --git a/src/box/vy_log.h b/src/box/vy_log.h
index b4a0bdc91ea0e02249a8e3265341058c26ec25ab..c98cb37f736fc04f215aaac85a69f3b38ba6491a 100644
--- a/src/box/vy_log.h
+++ b/src/box/vy_log.h
@@ -442,6 +442,10 @@ struct vy_slice_recovery_info {
 void
 vy_log_init(const char *dir);
 
+/** Shutdown metadata log. Shutdown stops the subsystem fibers. It may yield. */
+void
+vy_log_shutdown(void);
+
 /**
  * Destroy the metadata log.
  */
diff --git a/src/box/vy_run.c b/src/box/vy_run.c
index 658795e83663496562992aa235cfd23ff2ab8a7f..255034593d069fda795534c9af3816dae47b2a2e 100644
--- a/src/box/vy_run.c
+++ b/src/box/vy_run.c
@@ -166,7 +166,13 @@ vy_run_env_stop_readers(struct vy_run_env *env)
 {
 	for (int i = 0; i < env->reader_pool_size; i++) {
 		struct vy_run_reader *reader = &env->reader_pool[i];
-		cord_cancel_and_join(&reader->cord);
+		cbus_stop_loop(&reader->reader_pipe);
+		cpipe_destroy(&reader->reader_pipe);
+	}
+	for (int i = 0; i < env->reader_pool_size; i++) {
+		struct vy_run_reader *reader = &env->reader_pool[i];
+		if (cord_join(&reader->cord) != 0)
+			panic_syserror("failed to join vinyl reader thread");
 	}
 	free(env->reader_pool);
 }
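
The reader threads above (and the dump/compaction workers in vy_scheduler.c below) serve a cbus endpoint, so they are no longer stopped with cord_cancel_and_join(); instead the producer side asks each consumer loop to stop and then joins the thread. Schematically, for one such thread (pipe and cord stand in for the per-thread members used in the patch):

#include "cbus.h"
#include "fiber.h"
#include "say.h"

/*
 * Stop a thread that runs cbus_loop() on the consumer end of `pipe`
 * and wait for it to exit.
 */
static void
thread_shutdown(struct cpipe *pipe, struct cord *cord)
{
	/* Make the consumer loop in the thread leave cbus_loop(). */
	cbus_stop_loop(pipe);
	/* Detach the producer end; no more messages can be sent. */
	cpipe_destroy(pipe);
	/* Wait for the thread to finish. */
	if (cord_join(cord) != 0)
		panic_syserror("failed to join worker thread");
}

vy_run_env_stop_readers() splits this into two passes (stop every loop first, then join every cord) so the reader threads can wind down in parallel, while vy_worker_pool_shutdown() uses cord_cojoin(), which yields the calling fiber instead of blocking the tx thread while waiting.
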
diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c
index 677bf5749135465d27328d12f30d214cc98586bc..6a88270b42f582b35c564a4b8d4f644a2b9d2d2b 100644
--- a/src/box/vy_scheduler.c
+++ b/src/box/vy_scheduler.c
@@ -350,16 +350,19 @@ vy_worker_pool_start(struct vy_worker_pool *pool)
 	}
 }
 
+/** Stop and join the worker threads. */
 static void
-vy_worker_pool_stop(struct vy_worker_pool *pool)
+vy_worker_pool_shutdown(struct vy_worker_pool *pool)
 {
-	assert(pool->workers != NULL);
+	if (pool->workers == NULL)
+		return;
 	for (int i = 0; i < pool->size; i++) {
 		struct vy_worker *worker = &pool->workers[i];
-		cord_cancel_and_join(&worker->cord);
+		cbus_stop_loop(&worker->worker_pipe);
+		cpipe_destroy(&worker->worker_pipe);
+		if (cord_cojoin(&worker->cord) != 0)
+			panic_syserror("failed to join vinyl worker thread");
 	}
-	free(pool->workers);
-	pool->workers = NULL;
 }
 
 static void
@@ -371,13 +374,6 @@ vy_worker_pool_create(struct vy_worker_pool *pool, const char *name, int size)
 	stailq_create(&pool->idle_workers);
 }
 
-static void
-vy_worker_pool_destroy(struct vy_worker_pool *pool)
-{
-	if (pool->workers != NULL)
-		vy_worker_pool_stop(pool);
-}
-
 /**
  * Get an idle worker from a pool.
  */
@@ -429,6 +425,7 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
 						      vy_scheduler_f);
 	if (scheduler->scheduler_fiber == NULL)
 		panic("failed to allocate vinyl scheduler fiber");
+	fiber_set_joinable(scheduler->scheduler_fiber, true);
 
 	fiber_cond_create(&scheduler->scheduler_cond);
 
@@ -468,22 +465,55 @@ vy_scheduler_start(struct vy_scheduler *scheduler)
 	fiber_start(scheduler->scheduler_fiber, scheduler);
 }
 
+/** Complete and delete processed tasks until the queue is empty. */
+static void
+vy_scheduler_complete_tasks(struct vy_scheduler *scheduler, int *tasks_done,
+			    int *tasks_failed);
+
 void
-vy_scheduler_destroy(struct vy_scheduler *scheduler)
+vy_scheduler_shutdown(struct vy_scheduler *scheduler)
 {
-	/* Stop scheduler fiber. */
+	/*
+	 * The order is significant: stop the scheduler fiber before
+	 * shutting down the pools, because the scheduler sends tasks
+	 * over the worker pipes, which are destroyed on pool shutdown.
+	 */
+	fiber_cancel(scheduler->scheduler_fiber);
+	fiber_join(scheduler->scheduler_fiber);
 	scheduler->scheduler_fiber = NULL;
-	/* Sic: fiber_cancel() can't be used here. */
-	fiber_cond_signal(&scheduler->dump_cond);
-	fiber_cond_signal(&scheduler->scheduler_cond);
+	vy_worker_pool_shutdown(&scheduler->dump_pool);
+	vy_worker_pool_shutdown(&scheduler->compaction_pool);
+	/*
+	 * Complete and free tasks in flight. They are cancelled on
+	 * worker pool shutdown.
+	 */
+	while (true) {
+		int tasks_done, tasks_failed;
+		vy_scheduler_complete_tasks(scheduler, &tasks_done,
+					    &tasks_failed);
+		if (scheduler->stat.tasks_inprogress == 0)
+			break;
+		fiber_cond_wait(&scheduler->scheduler_cond);
+	}
+}
 
-	vy_worker_pool_destroy(&scheduler->dump_pool);
-	vy_worker_pool_destroy(&scheduler->compaction_pool);
+void
+vy_worker_pool_destroy(struct vy_worker_pool *pool)
+{
+	free(pool->workers);
+	TRASH(pool);
+}
+
+void
+vy_scheduler_destroy(struct vy_scheduler *scheduler)
+{
 	diag_destroy(&scheduler->diag);
 	fiber_cond_destroy(&scheduler->dump_cond);
 	fiber_cond_destroy(&scheduler->scheduler_cond);
 	vy_dump_heap_destroy(&scheduler->dump_heap);
 	vy_compaction_heap_destroy(&scheduler->compaction_heap);
+	vy_worker_pool_destroy(&scheduler->dump_pool);
+	vy_worker_pool_destroy(&scheduler->compaction_pool);
 	TRASH(scheduler);
 }
 
@@ -598,8 +628,13 @@ vy_scheduler_dump(struct vy_scheduler *scheduler)
 	 * We must not start dump if checkpoint is in progress
 	 * so first wait for checkpoint to complete.
 	 */
-	while (scheduler->checkpoint_in_progress)
+	while (scheduler->checkpoint_in_progress) {
 		fiber_cond_wait(&scheduler->dump_cond);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			return -1;
+		}
+	}
 
 	/* Trigger dump. */
 	if (!vy_scheduler_dump_in_progress(scheduler))
@@ -616,6 +651,10 @@ vy_scheduler_dump(struct vy_scheduler *scheduler)
 			return -1;
 		}
 		fiber_cond_wait(&scheduler->dump_cond);
+		if (fiber_is_cancelled()) {
+			diag_set(FiberIsCancelled);
+			return -1;
+		}
 	}
 	return 0;
 }
@@ -1996,47 +2035,50 @@ vy_task_complete(struct vy_task *task)
 	return -1;
 }
 
+static void
+vy_scheduler_complete_tasks(struct vy_scheduler *scheduler, int *tasks_done,
+			    int *tasks_failed)
+{
+	*tasks_done = 0;
+	*tasks_failed = 0;
+	/*
+	 * A task completion callback may yield, so loop until the queue
+	 * of processed tasks is completely drained. This way callers can
+	 * wait for new tasks in the queue after this function returns.
+	 */
+	while (!stailq_empty(&scheduler->processed_tasks)) {
+		struct stailq tasks = STAILQ_INITIALIZER(tasks);
+		struct vy_task *task, *next;
+		stailq_concat(&tasks, &scheduler->processed_tasks);
+		stailq_foreach_entry_safe(task, next, &tasks, in_processed) {
+			if (vy_task_complete(task) == 0)
+				(*tasks_done)++;
+			else
+				(*tasks_failed)++;
+			vy_worker_pool_put(task->worker);
+			vy_task_delete(task);
+		}
+	}
+}
+
 static int
 vy_scheduler_f(va_list va)
 {
 	struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *);
 
-	while (scheduler->scheduler_fiber != NULL) {
-		struct stailq processed_tasks;
-		struct vy_task *task, *next;
-		int tasks_failed = 0, tasks_done = 0;
+	while (!fiber_is_cancelled()) {
+		struct vy_task *task;
+		int tasks_done, tasks_failed;
 
 		fiber_check_gc();
-		/* Get the list of processed tasks. */
-		stailq_create(&processed_tasks);
-		stailq_concat(&processed_tasks, &scheduler->processed_tasks);
-
-		/* Complete and delete all processed tasks. */
-		stailq_foreach_entry_safe(task, next, &processed_tasks,
-					  in_processed) {
-			if (vy_task_complete(task) != 0)
-				tasks_failed++;
-			else
-				tasks_done++;
-			vy_worker_pool_put(task->worker);
-			vy_task_delete(task);
-		}
+		vy_scheduler_complete_tasks(scheduler, &tasks_done,
+					    &tasks_failed);
 		/*
 		 * Reset the timeout if we managed to successfully
 		 * complete at least one task.
 		 */
-		if (tasks_done > 0) {
+		if (tasks_done > 0)
 			scheduler->timeout = 0;
-			/*
-			 * Task completion callback may yield, which
-			 * opens a time window for a worker to submit
-			 * a processed task and wake up the scheduler
-			 * (via scheduler_async). Hence we should go
-			 * and recheck the processed_tasks in order not
-			 * to lose a wakeup event and hang for good.
-			 */
-			continue;
-		}
 		/* Throttle for a while if a task failed. */
 		if (tasks_failed > 0)
 			goto error;
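
The waits in vy_scheduler_dump() above become cancellable by re-checking the cancellation flag after every wakeup and turning it into an ordinary error, so a fiber blocked on a dump unwinds during shutdown instead of hanging. The shape of such a wait, reduced to its core (ready and cond are placeholders; this is a sketch, not code from the patch):

#include <stdbool.h>

#include "diag.h"
#include "fiber.h"
#include "fiber_cond.h"

/*
 * Wait until *ready becomes true or the calling fiber is cancelled.
 * Returns 0 on success and -1 with FiberIsCancelled set in the
 * diagnostics area if the wait was interrupted by fiber_cancel().
 */
static int
wait_until_ready(const bool *ready, struct fiber_cond *cond)
{
	while (!*ready) {
		fiber_cond_wait(cond);
		if (fiber_is_cancelled()) {
			diag_set(FiberIsCancelled);
			return -1;
		}
	}
	return 0;
}

vy_scheduler_dump() applies exactly this to both the wait for a checkpoint to finish and the wait for the dump generation to advance.
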
diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h
index fd99e837ff412d57f443adbb543f1da8d2fba085..ce7458799c9e1d4bbaf99e921e0f74f1653ea37b 100644
--- a/src/box/vy_scheduler.h
+++ b/src/box/vy_scheduler.h
@@ -187,6 +187,13 @@ vy_scheduler_create(struct vy_scheduler *scheduler, int write_threads,
 void
 vy_scheduler_start(struct vy_scheduler *scheduler);
 
+/**
+ * Shutdown scheduler. Shutdown stops all internal fibers/threads.
+ * It may yield.
+ */
+void
+vy_scheduler_shutdown(struct vy_scheduler *scheduler);
+
 /**
  * Destroy a scheduler instance.
  */
diff --git a/src/lib/core/errinj.h b/src/lib/core/errinj.h
index f510564b443a32707d3ec51723025853723ef692..b1dae67837c42f6ec6a01674963b4dd21eb7ab3d 100644
--- a/src/lib/core/errinj.h
+++ b/src/lib/core/errinj.h
@@ -154,7 +154,6 @@ struct errinj {
 	_(ERRINJ_VY_INDEX_FILE_RENAME, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_LOG_FILE_RENAME, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_LOG_FLUSH, ERRINJ_BOOL, {.bparam = false}) \
-	_(ERRINJ_VY_LOG_FLUSH_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_POINT_ITER_WAIT, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_QUOTA_DELAY, ERRINJ_BOOL, {.bparam = false}) \
 	_(ERRINJ_VY_READ_PAGE, ERRINJ_BOOL, {.bparam = false}) \
diff --git a/test/box/errinj.result b/test/box/errinj.result
index af5dcc6cea753e8e8b48c8b47788542e71b527ac..bedc467f6fb889c1e495086f0bb1f515c4eacc6e 100644
--- a/test/box/errinj.result
+++ b/test/box/errinj.result
@@ -127,7 +127,6 @@ evals
   - ERRINJ_VY_INDEX_FILE_RENAME: false
   - ERRINJ_VY_LOG_FILE_RENAME: false
   - ERRINJ_VY_LOG_FLUSH: false
-  - ERRINJ_VY_LOG_FLUSH_DELAY: false
   - ERRINJ_VY_POINT_ITER_WAIT: false
   - ERRINJ_VY_QUOTA_DELAY: false
   - ERRINJ_VY_READ_PAGE: false
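
ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT, which the vinyl tests below switch to, is a delay-type injection: the test sets a number of seconds and the run writer sleeps that long per statement, which keeps a dump "in progress" long enough to exercise shutdown. As a rough, assumed sketch of how such a double-valued injection is consumed on the C side (the real call site in vy_run.c may differ in details):

#include <unistd.h>

#include "errinj.h"

static void
maybe_delay_write_stmt(void)
{
	/* errinj() evaluates to NULL when error injection is compiled out. */
	struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT,
				    ERRINJ_DOUBLE);
	if (inj != NULL && inj->dparam > 0)
		usleep(inj->dparam * 1000000);
}
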
diff --git a/test/vinyl-luatest/shutdown_test.lua b/test/vinyl-luatest/shutdown_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..860b42c3c8c3df3f1baaf38d8f50ebc5a14c9748
--- /dev/null
+++ b/test/vinyl-luatest/shutdown_test.lua
@@ -0,0 +1,88 @@
+local server = require('luatest.server')
+local fiber = require('fiber')
+local t = require('luatest')
+
+local g = t.group()
+
+g.before_each(function(cg)
+    cg.server = server:new()
+    cg.server:start()
+end)
+
+g.after_each(function(cg)
+    if cg.server ~= nil then
+        cg.server:drop()
+    end
+end)
+
+local function test_no_hang_on_shutdown(server)
+    local channel = fiber.channel()
+    fiber.create(function()
+        server:stop()
+        channel:put('finished')
+    end)
+    t.assert(channel:get(60) ~= nil)
+end
+
+-- Test that we interrupt waiting for a vinyl dump triggered by index creation.
+-- Case 1. No snapshot is in progress.
+g.test_shutdown_vinyl_dump = function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        box.schema.create_space('test', {engine = 'vinyl'})
+        box.space.test:create_index('pk')
+        fiber.set_slice(100)
+        box.begin()
+        for i=1,10000 do
+            box.space.test:insert{i, i}
+        end
+        box.commit()
+        box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.01)
+        fiber.new(function()
+            box.space.test:create_index('sk', {parts = {2}})
+        end)
+    end)
+    -- There are other yields before the dump during vinyl index creation.
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log('dump started'))
+    end)
+    test_no_hang_on_shutdown(cg.server)
+end
+
+-- Test that we interrupt waiting for a vinyl dump triggered by index creation.
+-- Case 2. Snapshot is in progress.
+g.test_shutdown_vinyl_dump_during_snapshot = function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        box.schema.create_space('test', {engine = 'vinyl'})
+        box.space.test:create_index('pk')
+        fiber.set_slice(100)
+        box.begin()
+        for i=1,10000 do
+            box.space.test:insert{i, i}
+        end
+        box.commit()
+        box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.01)
+        fiber.new(function()
+            box.snapshot()
+        end)
+    end)
+    t.helpers.retrying({}, function()
+        t.assert(cg.server:grep_log('vinyl checkpoint started'))
+    end)
+    cg.server:exec(function()
+        local fiber = require('fiber')
+        fiber.new(function()
+            box.space.test:create_index('sk', {parts = {2}})
+        end)
+    end)
+    -- Sleep to get past the index build and reach the sleep inside the dump
+    -- that waits for the checkpoint to finish. If we fail to get past the
+    -- index build code, the test still passes, because the index build loop
+    -- is cancellable on its own. Yet this test is not aimed at checking the
+    -- cancellability of the index build loop.
+    fiber.sleep(3)
+    test_no_hang_on_shutdown(cg.server)
+end
diff --git a/test/vinyl/errinj.result b/test/vinyl/errinj.result
index e30c34ab11c1dc774183890e3b0cc9f2cecbf9d5..6e759a2874248c5614915b5eecc8569c55989c35 100644
--- a/test/vinyl/errinj.result
+++ b/test/vinyl/errinj.result
@@ -518,7 +518,7 @@ _ = s:create_index('i1', {parts = {1, 'unsigned'}})
 _ = s:create_index('i2', {parts = {2, 'unsigned'}})
 ---
 ...
-box.error.injection.set('ERRINJ_VY_DUMP_DELAY', true)
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.1)
 ---
 - ok
 ...
@@ -878,9 +878,8 @@ s:drop()
 ---
 ...
 --
--- gh-3412 - assertion failure at exit in case:
--- * there is a fiber waiting for quota
--- * there is a pending vylog write
+-- gh-3412 - assertion failure at exit in case there is a fiber waiting for
+-- quota
 --
 test_run:cmd("create server low_quota with script='vinyl/low_quota.lua'")
 ---
@@ -916,23 +915,6 @@ _ = fiber.create(function() for i = 1, 11 do box.space.test:replace{i, pad} end
 repeat fiber.sleep(0.001) until box.cfg.vinyl_memory - box.stat.vinyl().memory.level0 < pad:len()
 ---
 ...
-test_run:cmd("restart server low_quota with args='1048576'")
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH_DELAY', true)
----
-- ok
-...
-fiber = require('fiber')
----
-...
-pad = string.rep('x', 100 * 1024)
----
-...
-_ = fiber.create(function() for i = 1, 11 do box.space.test:replace{i, pad} end end)
----
-...
-repeat fiber.sleep(0.001) until box.cfg.vinyl_memory - box.stat.vinyl().memory.level0 < pad:len()
----
-...
 test_run:cmd('switch default')
 ---
 - true
diff --git a/test/vinyl/errinj.test.lua b/test/vinyl/errinj.test.lua
index 0a7beac682738df50e71ea6d3f057ca132e3d9f5..bbce29e617d90717294995802302dd8bd5794602 100644
--- a/test/vinyl/errinj.test.lua
+++ b/test/vinyl/errinj.test.lua
@@ -187,7 +187,7 @@ box.cfg{vinyl_timeout = 0.001}
 s = box.schema.space.create('test', {engine = 'vinyl'})
 _ = s:create_index('i1', {parts = {1, 'unsigned'}})
 _ = s:create_index('i2', {parts = {2, 'unsigned'}})
-box.error.injection.set('ERRINJ_VY_DUMP_DELAY', true)
+box.error.injection.set('ERRINJ_VY_RUN_WRITE_STMT_TIMEOUT', 0.1)
 for i = 1, 1000 do s:replace{i, i} end
 _ = fiber.create(function() box.snapshot() end)
 fiber.sleep(0.01)
@@ -312,9 +311,8 @@ ret
 s:drop()
 
 --
--- gh-3412 - assertion failure at exit in case:
--- * there is a fiber waiting for quota
--- * there is a pending vylog write
+-- gh-3412 - assertion failure at exit in case there is a fiber waiting for
+-- quota
 --
 test_run:cmd("create server low_quota with script='vinyl/low_quota.lua'")
 test_run:cmd("start server low_quota with args='1048576'")
@@ -326,12 +325,6 @@ fiber = require('fiber')
 pad = string.rep('x', 100 * 1024)
 _ = fiber.create(function() for i = 1, 11 do box.space.test:replace{i, pad} end end)
 repeat fiber.sleep(0.001) until box.cfg.vinyl_memory - box.stat.vinyl().memory.level0 < pad:len()
-test_run:cmd("restart server low_quota with args='1048576'")
-box.error.injection.set('ERRINJ_VY_LOG_FLUSH_DELAY', true)
-fiber = require('fiber')
-pad = string.rep('x', 100 * 1024)
-_ = fiber.create(function() for i = 1, 11 do box.space.test:replace{i, pad} end end)
-repeat fiber.sleep(0.001) until box.cfg.vinyl_memory - box.stat.vinyl().memory.level0 < pad:len()
 test_run:cmd('switch default')
 test_run:cmd("stop server low_quota")
 test_run:cmd("cleanup server low_quota")
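
Taken together, the shutdown path introduced here is strictly ordered: producers are stopped before the channels they produce into are destroyed, and work already in flight is drained before anything is freed. A condensed restatement of that order, with the reasoning as comments (the wrapper function is a sketch; the callees are the ones added by the patch):

/* Schematic view of vy_env_shutdown(), wired into engine_vtab.shutdown. */
static void
vy_env_shutdown_sketch(struct vy_env *e)
{
	/*
	 * 1. Scheduler: cancel and join the scheduler fiber first, since
	 *    it is the producer for the worker pipes, then stop the dump
	 *    and compaction threads, then complete the tasks they had
	 *    already submitted (vy_scheduler_shutdown()).
	 */
	vy_scheduler_shutdown(&e->scheduler);
	/* 2. Upsert squash queue: cancel and join its fiber. */
	vy_squash_queue_shutdown(e->squash_queue);
	/* 3. Metadata log: cancel and join the vylog flusher fiber. */
	vy_log_shutdown();
}

Resource release stays in the existing delete/destroy paths, which after this patch only free memory and destroy primitives.
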