diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index c11da5e3d90457f5269f5a866b0cc8c3508a81b6..a1f3d1dd9c7f0470ba5895ed3eda36f3b5218e52 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -83,6 +83,7 @@ add_library(box STATIC vy_log.c vy_upsert.c vy_read_set.c + vy_scheduler.c space.c space_def.c sequence.c diff --git a/src/box/vinyl.c b/src/box/vinyl.c index e33a10916bb2a628d6bb8b6d10ef78c9f1edcec0..e21e0be6d7027a3c450407baf19a99de08a0ade8 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -41,6 +41,7 @@ #include "vy_write_iterator.h" #include "vy_read_iterator.h" #include "vy_quota.h" +#include "vy_scheduler.h" #include "vy_stat.h" #include <small/lsregion.h> @@ -61,9 +62,6 @@ #include "trigger.h" #include "checkpoint.h" -#define HEAP_FORWARD_DECLARATION -#include "salad/heap.h" - /** * Yield after iterating over this many objects (e.g. ranges). * Yield more often in debug mode. @@ -74,8 +72,6 @@ enum { VY_YIELD_LOOPS = 128 }; enum { VY_YIELD_LOOPS = 2 }; #endif -struct vy_scheduler; -struct vy_task; struct vy_stat; struct vy_squash_queue; @@ -267,250 +263,6 @@ struct vy_cursor { struct vy_read_iterator iterator; }; -/** - * Allocate a new run for an index and write the information - * about it to the metadata log so that we could still find - * and delete it in case a write error occured. This function - * is called from dump/compaction task constructor. - */ -static struct vy_run * -vy_run_prepare(struct vy_index *index) -{ - struct vy_run *run = vy_run_new(vy_log_next_id()); - if (run == NULL) - return NULL; - vy_log_tx_begin(); - vy_log_prepare_run(index->commit_lsn, run->id); - if (vy_log_tx_commit() < 0) { - vy_run_unref(run); - return NULL; - } - return run; -} - -/** - * Free an incomplete run and write a record to the metadata - * log indicating that the run is not needed any more. - * This function is called on dump/compaction task abort. - */ -static void -vy_run_discard(struct vy_run *run) -{ - int64_t run_id = run->id; - - vy_run_unref(run); - - ERROR_INJECT(ERRINJ_VY_RUN_DISCARD, - {say_error("error injection: run %lld not discarded", - (long long)run_id); return;}); - - vy_log_tx_begin(); - /* - * The run hasn't been used and can be deleted right away - * so set gc_lsn to minimal possible (0). - */ - vy_log_drop_run(run_id, 0); - if (vy_log_tx_commit() < 0) { - /* - * Failure to log deletion of an incomplete - * run means that we won't retry to delete - * its files on log rotation. We will do that - * after restart though, so just warn and - * carry on. - */ - struct error *e = diag_last_error(diag_get()); - say_warn("failed to log run %lld deletion: %s", - (long long)run_id, e->errmsg); - } -} - -#define HEAP_NAME vy_dump_heap - -static bool -vy_dump_heap_less(struct heap_node *a, struct heap_node *b) -{ - struct vy_index *i1 = container_of(a, struct vy_index, in_dump); - struct vy_index *i2 = container_of(b, struct vy_index, in_dump); - - /* - * Indexes that are currently being dumped or can't be scheduled - * for dump right now are moved off the top of the heap. - */ - if (i1->is_dumping != i2->is_dumping) - return i1->is_dumping < i2->is_dumping; - if (i1->pin_count != i2->pin_count) - return i1->pin_count < i2->pin_count; - - /* Older indexes are dumped first. */ - int64_t i1_generation = vy_index_generation(i1); - int64_t i2_generation = vy_index_generation(i2); - if (i1_generation != i2_generation) - return i1_generation < i2_generation; - /* - * If a space has more than one index, appending a statement - * to it requires reading the primary index to get the old - * tuple and delete it from secondary indexes. This means that - * on local recovery from WAL, the primary index must not be - * ahead of secondary indexes of the same space, i.e. it must - * be dumped last. - */ - return i1->id > i2->id; -} - -#define HEAP_LESS(h, l, r) vy_dump_heap_less(l, r) - -#include "salad/heap.h" - -#undef HEAP_LESS -#undef HEAP_NAME - -#define HEAP_NAME vy_compact_heap - -static bool -vy_compact_heap_less(struct heap_node *a, struct heap_node *b) -{ - struct vy_index *i1 = container_of(a, struct vy_index, in_compact); - struct vy_index *i2 = container_of(b, struct vy_index, in_compact); - /* - * Prefer indexes whose read amplification will be reduced - * most as a result of compaction. - */ - return vy_index_compact_priority(i1) > vy_index_compact_priority(i2); -} - -#define HEAP_LESS(h, l, r) vy_compact_heap_less(l, r) - -#include "salad/heap.h" - -#undef HEAP_LESS -#undef HEAP_NAME - -typedef void -(*vy_scheduler_dump_complete_f)(int64_t dump_generation, double dump_duration, - void *arg); - -struct vy_scheduler { - /** - * Function called by the scheduler upon dump round - * completion. It is supposed to free memory released - * by the dump. - */ - vy_scheduler_dump_complete_f dump_complete_cb; - /** Argument passed to dump_complete_cb(). */ - void *cb_arg; - /** List of read views, see tx_manager::read_views. */ - struct rlist *read_views; - /** Context needed for writing runs. */ - struct vy_run_env *run_env; - - pthread_mutex_t mutex; - heap_t dump_heap; - heap_t compact_heap; - - struct cord *worker_pool; - struct fiber *scheduler; - struct ev_loop *loop; - /** Total number of worker threads. */ - int worker_pool_size; - /** Number worker threads that are currently idle. */ - int workers_available; - bool is_worker_pool_running; - - /** - * There is a pending task for workers in the pool, - * or we want to shutdown workers. - */ - pthread_cond_t worker_cond; - /** - * There is no pending tasks for workers, so scheduler - * needs to create one, or we want to shutdown the - * scheduler. Scheduler is a fiber in TX, so ev_async + fiber_channel - * are used here instead of pthread_cond_t. - */ - struct ev_async scheduler_async; - struct fiber_cond scheduler_cond; - /** - * A queue with all vy_task objects created by the - * scheduler and not yet taken by a worker. - */ - struct stailq input_queue; - /** - * A queue of processed vy_tasks objects. - */ - struct stailq output_queue; - /** - * A memory pool for vy_tasks. - */ - struct mempool task_pool; - - /** Last error seen by the scheduler. */ - struct diag diag; - /** - * Schedule timeout. Grows exponentially with each successive - * failure. Reset on successful task completion. - */ - ev_tstamp timeout; - /** Set if the scheduler is throttled due to errors. */ - bool is_throttled; - /** Set if checkpoint is in progress. */ - bool checkpoint_in_progress; - /** - * In order to guarantee checkpoint consistency, we must not - * dump in-memory trees created after checkpoint was started - * so we set this flag instead, which will make the scheduler - * schedule a dump as soon as checkpoint is complete. - */ - bool dump_pending; - /** - * Current generation of in-memory data. - * - * New in-memory trees inherit the current generation, while - * the scheduler dumps all in-memory trees whose generation - * is less. The generation is increased either on checkpoint - * or on exceeding the memory quota to force dumping all old - * in-memory trees. - */ - int64_t generation; - /** - * Generation of in-memory data currently being dumped. - * - * If @dump_generation < @generation, the scheduler is dumping - * in-memory trees created at @dump_generation. When all such - * trees have been dumped, it bumps @dump_generation and calls - * the dump_complete_cb(), which is supposed to free memory. - * - * If @dump_generation == @generation, dump have been completed - * and the scheduler won't schedule a dump task until @generation - * is bumped, which may happen either on exceeding the memory - * quota or on checkpoint. - * - * Throughout the code, a process of dumping all in-memory trees - * at @dump_generation is called 'dump round'. - */ - int64_t dump_generation; - /** Number of dump tasks that are currently in progress. */ - int dump_task_count; - /** Time when the current dump round started. */ - ev_tstamp dump_start; - /** Signaled on dump round completion. */ - struct fiber_cond dump_cond; -}; - -static void -vy_scheduler_add_index(struct vy_scheduler *, struct vy_index *); -static void -vy_scheduler_update_index(struct vy_scheduler *, struct vy_index *); -static void -vy_scheduler_remove_index(struct vy_scheduler *, struct vy_index *); -static void -vy_scheduler_pin_index(struct vy_scheduler *, struct vy_index *); -static void -vy_scheduler_unpin_index(struct vy_scheduler *, struct vy_index *); -static void -vy_scheduler_trigger_dump(struct vy_scheduler *); -static void -vy_scheduler_complete_dump(struct vy_scheduler *); - /** * A quick intro into Vinyl cosmology and file format * -------------------------------------------------- @@ -544,1409 +296,6 @@ vy_scheduler_complete_dump(struct vy_scheduler *); * in vinyl.meta file. */ -/* {{{ Scheduler Task */ - -struct vy_task_ops { - /** - * This function is called from a worker. It is supposed to do work - * which is too heavy for the tx thread (like IO or compression). - * Returns 0 on success. On failure returns -1 and sets diag. - */ - int (*execute)(struct vy_task *task); - /** - * This function is called by the scheduler upon task completion. - * It may be used to finish the task from the tx thread context. - * - * Returns 0 on success. On failure returns -1 and sets diag. - */ - int (*complete)(struct vy_scheduler *scheduler, struct vy_task *task); - /** - * This function is called by the scheduler if either ->execute - * or ->complete failed. It may be used to undo changes done to - * the index when preparing the task. - * - * If @in_shutdown is set, the callback is invoked from the - * engine destructor. - */ - void (*abort)(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown); -}; - -struct vy_task { - const struct vy_task_ops *ops; - /** Return code of ->execute. */ - int status; - /** If ->execute fails, the error is stored here. */ - struct diag diag; - /** Index this task is for. */ - struct vy_index *index; - /** Range to compact. */ - struct vy_range *range; - /** Run written by this task. */ - struct vy_run *new_run; - /** Write iterator producing statements for the new run. */ - struct vy_stmt_stream *wi; - /** - * First (newest) and last (oldest) slices to compact. - * - * While a compaction task is in progress, a new slice - * can be added to a range by concurrent dump, so we - * need to remember the slices we are compacting. - */ - struct vy_slice *first_slice, *last_slice; - /** - * A link in the list of all pending tasks, generated by - * task scheduler. - */ - struct stailq_entry link; - /** For run-writing tasks: maximum possible number of tuple to write */ - size_t max_output_count; - /** - * Save the snapshot of some index_opts attributes since - * they can be modified in the index->opts by - * an index:alter() call. - */ - double bloom_fpr; - int64_t page_size; -}; - -/** - * Allocate a new task to be executed by a worker thread. - * When preparing an asynchronous task, this function must - * be called before yielding the current fiber in order to - * pin the index the task is for so that a concurrent fiber - * does not free it from under us. - */ -static struct vy_task * -vy_task_new(struct mempool *pool, struct vy_index *index, - const struct vy_task_ops *ops) -{ - struct vy_task *task = mempool_alloc(pool); - if (task == NULL) { - diag_set(OutOfMemory, sizeof(*task), "scheduler", "task"); - return NULL; - } - memset(task, 0, sizeof(*task)); - task->ops = ops; - task->index = index; - vy_index_ref(index); - diag_create(&task->diag); - return task; -} - -/** Free a task allocated with vy_task_new(). */ -static void -vy_task_delete(struct mempool *pool, struct vy_task *task) -{ - vy_index_unref(task->index); - diag_destroy(&task->diag); - TRASH(task); - mempool_free(pool, task); -} - -static int -vy_task_dump_execute(struct vy_task *task) -{ - struct vy_index *index = task->index; - - return vy_run_write(task->new_run, index->env->path, - index->space_id, index->id, task->wi, - task->page_size, index->cmp_def, - index->key_def, task->max_output_count, - task->bloom_fpr); -} - -static int -vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task) -{ - struct vy_index *index = task->index; - struct vy_run *new_run = task->new_run; - int64_t dump_lsn = new_run->dump_lsn; - struct tuple_format *key_format = index->env->key_format; - struct vy_mem *mem, *next_mem; - struct vy_slice **new_slices, *slice; - struct vy_range *range, *begin_range, *end_range; - struct tuple *min_key, *max_key; - int i, loops = 0; - - assert(index->is_dumping); - - if (vy_run_is_empty(new_run)) { - /* - * In case the run is empty, we can discard the run - * and delete dumped in-memory trees right away w/o - * inserting slices into ranges. However, we need - * to log index dump anyway. - */ - vy_log_tx_begin(); - vy_log_dump_index(index->commit_lsn, dump_lsn); - if (vy_log_tx_commit() < 0) - goto fail; - vy_run_discard(new_run); - goto delete_mems; - } - - assert(new_run->info.min_lsn > index->dump_lsn); - assert(new_run->info.max_lsn <= dump_lsn); - - /* - * Figure out which ranges intersect the new run. - * @begin_range is the first range intersecting the run. - * @end_range is the range following the last range - * intersecting the run or NULL if the run itersects all - * ranges. - */ - min_key = vy_key_from_msgpack(key_format, new_run->info.min_key); - if (min_key == NULL) - goto fail; - max_key = vy_key_from_msgpack(key_format, new_run->info.max_key); - if (max_key == NULL) { - tuple_unref(min_key); - goto fail; - } - begin_range = vy_range_tree_psearch(index->tree, min_key); - end_range = vy_range_tree_nsearch(index->tree, max_key); - tuple_unref(min_key); - tuple_unref(max_key); - - /* - * For each intersected range allocate a slice of the new run. - */ - new_slices = calloc(index->range_count, sizeof(*new_slices)); - if (new_slices == NULL) { - diag_set(OutOfMemory, index->range_count * sizeof(*new_slices), - "malloc", "struct vy_slice *"); - goto fail; - } - for (range = begin_range, i = 0; range != end_range; - range = vy_range_tree_next(index->tree, range), i++) { - slice = vy_slice_new(vy_log_next_id(), new_run, - range->begin, range->end, index->cmp_def); - if (slice == NULL) - goto fail_free_slices; - - assert(i < index->range_count); - new_slices[i] = slice; - /* - * It's OK to yield here for the range tree can only - * be changed from the scheduler fiber. - */ - if (++loops % VY_YIELD_LOOPS == 0) - fiber_sleep(0); - } - - /* - * Log change in metadata. - */ - vy_log_tx_begin(); - vy_log_create_run(index->commit_lsn, new_run->id, dump_lsn); - for (range = begin_range, i = 0; range != end_range; - range = vy_range_tree_next(index->tree, range), i++) { - assert(i < index->range_count); - slice = new_slices[i]; - vy_log_insert_slice(range->id, new_run->id, slice->id, - tuple_data_or_null(slice->begin), - tuple_data_or_null(slice->end)); - - if (++loops % VY_YIELD_LOOPS == 0) - fiber_sleep(0); /* see comment above */ - } - vy_log_dump_index(index->commit_lsn, dump_lsn); - if (vy_log_tx_commit() < 0) - goto fail_free_slices; - - /* - * Account the new run. - */ - vy_index_add_run(index, new_run); - vy_stmt_counter_add_disk(&index->stat.disk.dump.out, &new_run->count); - - /* Drop the reference held by the task. */ - vy_run_unref(new_run); - - /* - * Add new slices to ranges. - */ - for (range = begin_range, i = 0; range != end_range; - range = vy_range_tree_next(index->tree, range), i++) { - assert(i < index->range_count); - slice = new_slices[i]; - vy_index_unacct_range(index, range); - vy_range_add_slice(range, slice); - vy_index_acct_range(index, range); - vy_range_update_compact_priority(range, &index->opts); - if (!vy_range_is_scheduled(range)) - vy_range_heap_update(&index->range_heap, - &range->heap_node); - range->version++; - /* - * If we yield here, a concurrent fiber will see - * a range with a run slice containing statements - * present in the in-memory trees of the index. - * This is OK, because read iterator won't use the - * new run slice until index->dump_lsn is bumped, - * which is only done after in-memory trees are - * removed (see vy_read_iterator_add_disk()). - */ - if (++loops % VY_YIELD_LOOPS == 0) - fiber_sleep(0); - } - free(new_slices); - -delete_mems: - /* - * Delete dumped in-memory trees. - */ - rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) { - if (mem->generation > scheduler->dump_generation) - continue; - vy_stmt_counter_add(&index->stat.disk.dump.in, &mem->count); - vy_index_delete_mem(index, mem); - } - index->dump_lsn = dump_lsn; - index->stat.disk.dump.count++; - - /* The iterator has been cleaned up in a worker thread. */ - task->wi->iface->close(task->wi); - - index->is_dumping = false; - vy_scheduler_update_index(scheduler, index); - - if (index->id != 0) - vy_scheduler_unpin_index(scheduler, index->pk); - - assert(scheduler->dump_task_count > 0); - scheduler->dump_task_count--; - - vy_scheduler_complete_dump(scheduler); - - say_info("%s: dump completed", vy_index_name(index)); - return 0; - -fail_free_slices: - for (i = 0; i < index->range_count; i++) { - slice = new_slices[i]; - if (slice != NULL) - vy_slice_delete(slice); - if (++loops % VY_YIELD_LOOPS == 0) - fiber_sleep(0); - } - free(new_slices); -fail: - return -1; -} - -static void -vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown) -{ - struct vy_index *index = task->index; - - assert(index->is_dumping); - - /* The iterator has been cleaned up in a worker thread. */ - task->wi->iface->close(task->wi); - - /* - * It's no use alerting the user if the server is - * shutting down or the index was dropped. - */ - if (!in_shutdown && !index->is_dropped) { - say_error("%s: dump failed: %s", vy_index_name(index), - diag_last_error(&task->diag)->errmsg); - } - - /* The metadata log is unavailable on shutdown. */ - if (!in_shutdown) - vy_run_discard(task->new_run); - else - vy_run_unref(task->new_run); - - index->is_dumping = false; - vy_scheduler_update_index(scheduler, index); - - if (index->id != 0) - vy_scheduler_unpin_index(scheduler, index->pk); - - assert(scheduler->dump_task_count > 0); - scheduler->dump_task_count--; -} - -/** - * Create a task to dump an index. - * - * On success the task is supposed to dump all in-memory - * trees created at @scheduler->dump_generation. - */ -static int -vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index, - struct vy_task **p_task) -{ - static struct vy_task_ops dump_ops = { - .execute = vy_task_dump_execute, - .complete = vy_task_dump_complete, - .abort = vy_task_dump_abort, - }; - - assert(!index->is_dropped); - assert(!index->is_dumping); - assert(index->pin_count == 0); - assert(vy_index_generation(index) == scheduler->dump_generation); - assert(scheduler->dump_generation < scheduler->generation); - - struct errinj *inj = errinj(ERRINJ_VY_INDEX_DUMP, ERRINJ_INT); - if (inj != NULL && inj->iparam == (int)index->id) { - diag_set(ClientError, ER_INJECTION, "vinyl index dump"); - goto err; - } - - /* Rotate the active tree if it needs to be dumped. */ - if (index->mem->generation == scheduler->dump_generation && - vy_index_rotate_mem(index) != 0) - goto err; - - /* - * Wait until all active writes to in-memory trees - * eligible for dump are over. - */ - int64_t dump_lsn = -1; - size_t max_output_count = 0; - struct vy_mem *mem, *next_mem; - rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) { - if (mem->generation > scheduler->dump_generation) - continue; - vy_mem_wait_pinned(mem); - if (mem->tree.size == 0) { - /* - * The tree is empty so we can delete it - * right away, without involving a worker. - */ - vy_index_delete_mem(index, mem); - continue; - } - dump_lsn = MAX(dump_lsn, mem->max_lsn); - max_output_count += mem->tree.size; - } - - if (max_output_count == 0) { - /* Nothing to do, pick another index. */ - vy_scheduler_update_index(scheduler, index); - vy_scheduler_complete_dump(scheduler); - return 0; - } - - struct vy_task *task = vy_task_new(&scheduler->task_pool, - index, &dump_ops); - if (task == NULL) - goto err; - - struct vy_run *new_run = vy_run_prepare(index); - if (new_run == NULL) - goto err_run; - - assert(dump_lsn >= 0); - new_run->dump_lsn = dump_lsn; - - struct vy_stmt_stream *wi; - bool is_last_level = (index->run_count == 0); - wi = vy_write_iterator_new(index->cmp_def, index->disk_format, - index->upsert_format, index->id == 0, - is_last_level, scheduler->read_views); - if (wi == NULL) - goto err_wi; - rlist_foreach_entry(mem, &index->sealed, in_sealed) { - if (mem->generation > scheduler->dump_generation) - continue; - if (vy_write_iterator_new_mem(wi, mem) != 0) - goto err_wi_sub; - } - - task->new_run = new_run; - task->wi = wi; - task->max_output_count = max_output_count; - task->bloom_fpr = index->opts.bloom_fpr; - task->page_size = index->opts.page_size; - - index->is_dumping = true; - vy_scheduler_update_index(scheduler, index); - - if (index->id != 0) { - /* - * The primary index must be dumped after all - * secondary indexes of the same space - see - * vy_dump_heap_less(). To make sure it isn't - * picked by the scheduler while all secondary - * indexes are being dumped, temporarily remove - * it from the dump heap. - */ - vy_scheduler_pin_index(scheduler, index->pk); - } - - scheduler->dump_task_count++; - - say_info("%s: dump started", vy_index_name(index)); - *p_task = task; - return 0; - -err_wi_sub: - task->wi->iface->close(wi); -err_wi: - vy_run_discard(new_run); -err_run: - vy_task_delete(&scheduler->task_pool, task); -err: - say_error("%s: could not start dump: %s", vy_index_name(index), - diag_last_error(diag_get())->errmsg); - return -1; -} - -static int -vy_task_compact_execute(struct vy_task *task) -{ - struct vy_index *index = task->index; - - return vy_run_write(task->new_run, index->env->path, - index->space_id, index->id, task->wi, - task->page_size, index->cmp_def, - index->key_def, task->max_output_count, - task->bloom_fpr); -} - -static int -vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task) -{ - struct vy_index *index = task->index; - struct vy_range *range = task->range; - struct vy_run *new_run = task->new_run; - struct vy_slice *first_slice = task->first_slice; - struct vy_slice *last_slice = task->last_slice; - struct vy_slice *slice, *next_slice, *new_slice = NULL; - struct vy_run *run; - - /* - * Allocate a slice of the new run. - * - * If the run is empty, we don't need to allocate a new slice - * and insert it into the range, but we still need to delete - * compacted runs. - */ - if (!vy_run_is_empty(new_run)) { - new_slice = vy_slice_new(vy_log_next_id(), new_run, NULL, NULL, - index->cmp_def); - if (new_slice == NULL) - return -1; - } - - /* - * Build the list of runs that became unused - * as a result of compaction. - */ - RLIST_HEAD(unused_runs); - for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { - slice->run->compacted_slice_count++; - if (slice == last_slice) - break; - } - for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { - run = slice->run; - if (run->compacted_slice_count == run->refs) - rlist_add_entry(&unused_runs, run, in_unused); - slice->run->compacted_slice_count = 0; - if (slice == last_slice) - break; - } - - /* - * Log change in metadata. - */ - vy_log_tx_begin(); - for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { - vy_log_delete_slice(slice->id); - if (slice == last_slice) - break; - } - int64_t gc_lsn = checkpoint_last(NULL); - rlist_foreach_entry(run, &unused_runs, in_unused) - vy_log_drop_run(run->id, gc_lsn); - if (new_slice != NULL) { - vy_log_create_run(index->commit_lsn, new_run->id, - new_run->dump_lsn); - vy_log_insert_slice(range->id, new_run->id, new_slice->id, - tuple_data_or_null(new_slice->begin), - tuple_data_or_null(new_slice->end)); - } - if (vy_log_tx_commit() < 0) { - if (new_slice != NULL) - vy_slice_delete(new_slice); - return -1; - } - - /* - * Account the new run if it is not empty, - * otherwise discard it. - */ - if (new_slice != NULL) { - vy_index_add_run(index, new_run); - vy_stmt_counter_add_disk(&index->stat.disk.compact.out, - &new_run->count); - /* Drop the reference held by the task. */ - vy_run_unref(new_run); - } else - vy_run_discard(new_run); - - /* - * Replace compacted slices with the resulting slice. - * - * Note, since a slice might have been added to the range - * by a concurrent dump while compaction was in progress, - * we must insert the new slice at the same position where - * the compacted slices were. - */ - RLIST_HEAD(compacted_slices); - vy_index_unacct_range(index, range); - if (new_slice != NULL) - vy_range_add_slice_before(range, new_slice, first_slice); - for (slice = first_slice; ; slice = next_slice) { - next_slice = rlist_next_entry(slice, in_range); - vy_range_remove_slice(range, slice); - rlist_add_entry(&compacted_slices, slice, in_range); - vy_stmt_counter_add_disk(&index->stat.disk.compact.in, - &slice->count); - if (slice == last_slice) - break; - } - range->n_compactions++; - range->version++; - vy_index_acct_range(index, range); - vy_range_update_compact_priority(range, &index->opts); - index->stat.disk.compact.count++; - - /* - * Unaccount unused runs and delete compacted slices. - */ - rlist_foreach_entry(run, &unused_runs, in_unused) - vy_index_remove_run(index, run); - rlist_foreach_entry_safe(slice, &compacted_slices, - in_range, next_slice) { - vy_slice_wait_pinned(slice); - vy_slice_delete(slice); - } - - /* The iterator has been cleaned up in worker. */ - task->wi->iface->close(task->wi); - - assert(range->heap_node.pos == UINT32_MAX); - vy_range_heap_insert(&index->range_heap, &range->heap_node); - vy_scheduler_update_index(scheduler, index); - - say_info("%s: completed compacting range %s", - vy_index_name(index), vy_range_str(range)); - return 0; -} - -static void -vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task, - bool in_shutdown) -{ - struct vy_index *index = task->index; - struct vy_range *range = task->range; - - /* The iterator has been cleaned up in worker. */ - task->wi->iface->close(task->wi); - - /* - * It's no use alerting the user if the server is - * shutting down or the index was dropped. - */ - if (!in_shutdown && !index->is_dropped) { - say_error("%s: failed to compact range %s: %s", - vy_index_name(index), vy_range_str(range), - diag_last_error(&task->diag)->errmsg); - } - - /* The metadata log is unavailable on shutdown. */ - if (!in_shutdown) - vy_run_discard(task->new_run); - else - vy_run_unref(task->new_run); - - assert(range->heap_node.pos == UINT32_MAX); - vy_range_heap_insert(&index->range_heap, &range->heap_node); - vy_scheduler_update_index(scheduler, index); -} - -static int -vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index, - struct vy_task **p_task) -{ - static struct vy_task_ops compact_ops = { - .execute = vy_task_compact_execute, - .complete = vy_task_compact_complete, - .abort = vy_task_compact_abort, - }; - - struct heap_node *range_node; - struct vy_range *range; - - assert(!index->is_dropped); - - range_node = vy_range_heap_top(&index->range_heap); - assert(range_node != NULL); - range = container_of(range_node, struct vy_range, heap_node); - assert(range->compact_priority > 1); - - if (vy_index_split_range(index, range) || - vy_index_coalesce_range(index, range)) { - vy_scheduler_update_index(scheduler, index); - return 0; - } - - struct vy_task *task = vy_task_new(&scheduler->task_pool, - index, &compact_ops); - if (task == NULL) - goto err_task; - - struct vy_run *new_run = vy_run_prepare(index); - if (new_run == NULL) - goto err_run; - - struct vy_stmt_stream *wi; - bool is_last_level = (range->compact_priority == range->slice_count); - wi = vy_write_iterator_new(index->cmp_def, index->disk_format, - index->upsert_format, index->id == 0, - is_last_level, scheduler->read_views); - if (wi == NULL) - goto err_wi; - - struct vy_slice *slice; - int n = range->compact_priority; - rlist_foreach_entry(slice, &range->slices, in_range) { - if (vy_write_iterator_new_slice(wi, slice, - scheduler->run_env) != 0) - goto err_wi_sub; - - task->max_output_count += slice->count.rows; - new_run->dump_lsn = MAX(new_run->dump_lsn, - slice->run->dump_lsn); - - /* Remember the slices we are compacting. */ - if (task->first_slice == NULL) - task->first_slice = slice; - task->last_slice = slice; - - if (--n == 0) - break; - } - assert(n == 0); - assert(new_run->dump_lsn >= 0); - - task->range = range; - task->new_run = new_run; - task->wi = wi; - task->bloom_fpr = index->opts.bloom_fpr; - task->page_size = index->opts.page_size; - - /* - * Remove the range we are going to compact from the heap - * so that it doesn't get selected again. - */ - vy_range_heap_delete(&index->range_heap, range_node); - range_node->pos = UINT32_MAX; - vy_scheduler_update_index(scheduler, index); - - say_info("%s: started compacting range %s, runs %d/%d", - vy_index_name(index), vy_range_str(range), - range->compact_priority, range->slice_count); - *p_task = task; - return 0; - -err_wi_sub: - task->wi->iface->close(wi); -err_wi: - vy_run_discard(new_run); -err_run: - vy_task_delete(&scheduler->task_pool, task); -err_task: - say_error("%s: could not start compacting range %s: %s", - vy_index_name(index), vy_range_str(range), - diag_last_error(diag_get())->errmsg); - return -1; -} - -/* Scheduler Task }}} */ - -/* {{{ Scheduler */ - -/* Min and max values for vy_scheduler->timeout. */ -#define VY_SCHEDULER_TIMEOUT_MIN 1 -#define VY_SCHEDULER_TIMEOUT_MAX 60 - -static void -vy_scheduler_start_workers(struct vy_scheduler *scheduler); -static void -vy_scheduler_stop_workers(struct vy_scheduler *scheduler); -static int -vy_scheduler_f(va_list va); - -static void -vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events) -{ - (void) loop; - (void) events; - struct vy_scheduler *scheduler = - container_of(watcher, struct vy_scheduler, scheduler_async); - fiber_cond_signal(&scheduler->scheduler_cond); -} - -static struct vy_scheduler * -vy_scheduler_new(int write_threads, - vy_scheduler_dump_complete_f dump_complete_cb, void *cb_arg, - struct vy_run_env *run_env, struct rlist *read_views) -{ - struct vy_scheduler *scheduler = calloc(1, sizeof(*scheduler)); - if (scheduler == NULL) { - diag_set(OutOfMemory, sizeof(*scheduler), "scheduler", - "struct"); - return NULL; - } - scheduler->dump_complete_cb = dump_complete_cb; - scheduler->cb_arg = cb_arg; - scheduler->read_views = read_views; - scheduler->run_env = run_env; - scheduler->worker_pool_size = write_threads; - tt_pthread_mutex_init(&scheduler->mutex, NULL); - diag_create(&scheduler->diag); - fiber_cond_create(&scheduler->dump_cond); - vy_compact_heap_create(&scheduler->compact_heap); - vy_dump_heap_create(&scheduler->dump_heap); - tt_pthread_cond_init(&scheduler->worker_cond, NULL); - scheduler->loop = loop(); - ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb); - fiber_cond_create(&scheduler->scheduler_cond); - mempool_create(&scheduler->task_pool, cord_slab_cache(), - sizeof(struct vy_task)); - /* Start scheduler fiber. */ - scheduler->scheduler = fiber_new("vinyl.scheduler", vy_scheduler_f); - if (scheduler->scheduler == NULL) - panic("failed to start vinyl scheduler fiber"); - fiber_start(scheduler->scheduler, scheduler); - return scheduler; -} - -static void -vy_scheduler_delete(struct vy_scheduler *scheduler) -{ - /* Stop scheduler fiber. */ - scheduler->scheduler = NULL; - /* Sic: fiber_cancel() can't be used here. */ - fiber_cond_signal(&scheduler->scheduler_cond); - - if (scheduler->is_worker_pool_running) - vy_scheduler_stop_workers(scheduler); - - mempool_destroy(&scheduler->task_pool); - diag_destroy(&scheduler->diag); - vy_compact_heap_destroy(&scheduler->compact_heap); - vy_dump_heap_destroy(&scheduler->dump_heap); - tt_pthread_cond_destroy(&scheduler->worker_cond); - TRASH(&scheduler->scheduler_async); - fiber_cond_destroy(&scheduler->scheduler_cond); - tt_pthread_mutex_destroy(&scheduler->mutex); - free(scheduler); -} - -static void -vy_scheduler_add_index(struct vy_scheduler *scheduler, - struct vy_index *index) -{ - assert(index->in_dump.pos == UINT32_MAX); - assert(index->in_compact.pos == UINT32_MAX); - vy_dump_heap_insert(&scheduler->dump_heap, &index->in_dump); - vy_compact_heap_insert(&scheduler->compact_heap, &index->in_compact); -} - -static void -vy_scheduler_update_index(struct vy_scheduler *scheduler, - struct vy_index *index) -{ - if (index->is_dropped) { - /* Dropped indexes are exempted from scheduling. */ - assert(index->in_dump.pos == UINT32_MAX); - assert(index->in_compact.pos == UINT32_MAX); - return; - } - assert(index->in_dump.pos != UINT32_MAX); - assert(index->in_compact.pos != UINT32_MAX); - vy_dump_heap_update(&scheduler->dump_heap, &index->in_dump); - vy_compact_heap_update(&scheduler->compact_heap, &index->in_compact); -} - -static void -vy_scheduler_remove_index(struct vy_scheduler *scheduler, - struct vy_index *index) -{ - assert(index->in_dump.pos != UINT32_MAX); - assert(index->in_compact.pos != UINT32_MAX); - vy_dump_heap_delete(&scheduler->dump_heap, &index->in_dump); - vy_compact_heap_delete(&scheduler->compact_heap, &index->in_compact); - index->in_dump.pos = UINT32_MAX; - index->in_compact.pos = UINT32_MAX; -} - -static void -vy_scheduler_pin_index(struct vy_scheduler *scheduler, struct vy_index *index) -{ - assert(!index->is_dumping); - if (index->pin_count++ == 0) - vy_scheduler_update_index(scheduler, index); -} - -static void -vy_scheduler_unpin_index(struct vy_scheduler *scheduler, struct vy_index *index) -{ - assert(!index->is_dumping); - assert(index->pin_count > 0); - if (--index->pin_count == 0) - vy_scheduler_update_index(scheduler, index); -} - -/** - * Create a task for dumping an index. The new task is returned - * in @ptask. If there's no index that needs to be dumped @ptask - * is set to NULL. - * - * We only dump an index if it needs to be snapshotted or the quota - * on memory usage is exceeded. In either case, the oldest index - * is selected, because dumping it will free the maximal amount of - * memory due to log structured design of the memory allocator. - * - * Returns 0 on success, -1 on failure. - */ -static int -vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask) -{ -retry: - *ptask = NULL; - assert(scheduler->dump_generation <= scheduler->generation); - if (scheduler->dump_generation == scheduler->generation) { - /* - * All memory trees of past generations have - * been dumped, nothing to do. - */ - return 0; - } - /* - * Look up the oldest index eligible for dump. - */ - struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap); - if (pn == NULL) { - /* - * There is no vinyl index and so no task to schedule. - * Complete the current dump round. - */ - vy_scheduler_complete_dump(scheduler); - return 0; - } - struct vy_index *index = container_of(pn, struct vy_index, in_dump); - if (!index->is_dumping && index->pin_count == 0 && - vy_index_generation(index) == scheduler->dump_generation) { - /* - * Dump is in progress and there is an index that - * contains data that must be dumped at the current - * round. Try to create a task for it. - */ - if (vy_task_dump_new(scheduler, index, ptask) != 0) - return -1; - if (*ptask != NULL) - return 0; /* new task */ - /* - * All in-memory trees eligible for dump were empty - * and so were deleted without involving a worker - * thread. Check another index. - */ - goto retry; - } - /* - * Dump is in progress, but all eligible indexes are - * already being dumped. Wait until the current round - * is complete. - */ - assert(scheduler->dump_task_count > 0); - return 0; -} - -/** - * Create a task for compacting a range. The new task is returned - * in @ptask. If there's no range that needs to be compacted @ptask - * is set to NULL. - * - * We compact ranges that have more runs in a level than specified - * by run_count_per_level configuration option. Among those runs we - * give preference to those ranges whose compaction will reduce - * read amplification most. - * - * Returns 0 on success, -1 on failure. - */ -static int -vy_scheduler_peek_compact(struct vy_scheduler *scheduler, - struct vy_task **ptask) -{ -retry: - *ptask = NULL; - struct heap_node *pn = vy_compact_heap_top(&scheduler->compact_heap); - if (pn == NULL) - return 0; /* nothing to do */ - struct vy_index *index = container_of(pn, struct vy_index, in_compact); - if (vy_index_compact_priority(index) <= 1) - return 0; /* nothing to do */ - if (vy_task_compact_new(scheduler, index, ptask) != 0) - return -1; - if (*ptask == NULL) - goto retry; /* index dropped or range split/coalesced */ - return 0; /* new task */ -} - -static int -vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask) -{ - *ptask = NULL; - - if (vy_scheduler_peek_dump(scheduler, ptask) != 0) - goto fail; - if (*ptask != NULL) - return 0; - - if (scheduler->workers_available <= 1) { - /* - * If all worker threads are busy doing compaction - * when we run out of quota, ongoing transactions will - * hang until one of the threads has finished, which - * may take quite a while. To avoid unpredictably long - * stalls, always keep one worker thread reserved for - * dumps. - */ - return 0; - } - - if (vy_scheduler_peek_compact(scheduler, ptask) != 0) - goto fail; - if (*ptask != NULL) - return 0; - - /* no task to run */ - return 0; -fail: - assert(!diag_is_empty(diag_get())); - diag_move(diag_get(), &scheduler->diag); - return -1; - -} - -static int -vy_scheduler_complete_task(struct vy_scheduler *scheduler, - struct vy_task *task) -{ - if (task->index->is_dropped) { - if (task->ops->abort) - task->ops->abort(scheduler, task, false); - return 0; - } - - struct diag *diag = &task->diag; - if (task->status != 0) { - assert(!diag_is_empty(diag)); - goto fail; /* ->execute fialed */ - } - ERROR_INJECT(ERRINJ_VY_TASK_COMPLETE, { - diag_set(ClientError, ER_INJECTION, - "vinyl task completion"); - diag_move(diag_get(), diag); - goto fail; }); - if (task->ops->complete && - task->ops->complete(scheduler, task) != 0) { - assert(!diag_is_empty(diag_get())); - diag_move(diag_get(), diag); - goto fail; - } - return 0; -fail: - if (task->ops->abort) - task->ops->abort(scheduler, task, false); - diag_move(diag, &scheduler->diag); - return -1; -} - -static int -vy_scheduler_f(va_list va) -{ - struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *); - - /* - * Yield immediately, until the quota watermark is reached - * for the first time or a checkpoint is made. - * Then start the worker threads: we know they will be - * needed. If quota watermark is never reached, workers - * are not started and the scheduler is idle until - * shutdown or checkpoint. - */ - fiber_cond_wait(&scheduler->scheduler_cond); - if (scheduler->scheduler == NULL) - return 0; /* destroyed */ - - vy_scheduler_start_workers(scheduler); - - while (scheduler->scheduler != NULL) { - struct stailq output_queue; - struct vy_task *task, *next; - int tasks_failed = 0, tasks_done = 0; - bool was_empty; - - /* Get the list of processed tasks. */ - stailq_create(&output_queue); - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_concat(&output_queue, &scheduler->output_queue); - tt_pthread_mutex_unlock(&scheduler->mutex); - - /* Complete and delete all processed tasks. */ - stailq_foreach_entry_safe(task, next, &output_queue, link) { - if (vy_scheduler_complete_task(scheduler, task) != 0) - tasks_failed++; - else - tasks_done++; - vy_task_delete(&scheduler->task_pool, task); - scheduler->workers_available++; - assert(scheduler->workers_available <= - scheduler->worker_pool_size); - } - /* - * Reset the timeout if we managed to successfully - * complete at least one task. - */ - if (tasks_done > 0) { - scheduler->timeout = 0; - /* - * Task completion callback may yield, which - * opens a time window for a worker to submit - * a processed task and wake up the scheduler - * (via scheduler_async). Hence we should go - * and recheck the output_queue in order not - * to lose a wakeup event and hang for good. - */ - continue; - } - /* Throttle for a while if a task failed. */ - if (tasks_failed > 0) - goto error; - /* All worker threads are busy. */ - if (scheduler->workers_available == 0) - goto wait; - /* Get a task to schedule. */ - if (vy_schedule(scheduler, &task) != 0) - goto error; - /* Nothing to do. */ - if (task == NULL) - goto wait; - - /* Queue the task and notify workers if necessary. */ - tt_pthread_mutex_lock(&scheduler->mutex); - was_empty = stailq_empty(&scheduler->input_queue); - stailq_add_tail_entry(&scheduler->input_queue, task, link); - if (was_empty) - tt_pthread_cond_signal(&scheduler->worker_cond); - tt_pthread_mutex_unlock(&scheduler->mutex); - - scheduler->workers_available--; - fiber_reschedule(); - continue; -error: - /* Abort pending checkpoint. */ - fiber_cond_signal(&scheduler->dump_cond); - /* - * A task can fail either due to lack of memory or IO - * error. In either case it is pointless to schedule - * another task right away, because it is likely to fail - * too. So we throttle the scheduler for a while after - * each failure. - */ - scheduler->timeout *= 2; - if (scheduler->timeout < VY_SCHEDULER_TIMEOUT_MIN) - scheduler->timeout = VY_SCHEDULER_TIMEOUT_MIN; - if (scheduler->timeout > VY_SCHEDULER_TIMEOUT_MAX) - scheduler->timeout = VY_SCHEDULER_TIMEOUT_MAX; - struct errinj *inj; - inj = errinj(ERRINJ_VY_SCHED_TIMEOUT, ERRINJ_DOUBLE); - if (inj != NULL && inj->dparam != 0) - scheduler->timeout = inj->dparam; - say_warn("throttling scheduler for %.0f second(s)", - scheduler->timeout); - scheduler->is_throttled = true; - fiber_sleep(scheduler->timeout); - scheduler->is_throttled = false; - continue; -wait: - /* Wait for changes */ - fiber_cond_wait(&scheduler->scheduler_cond); - } - - return 0; -} - -static int -vy_worker_f(va_list va) -{ - struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *); - coio_enable(); - struct vy_task *task = NULL; - - tt_pthread_mutex_lock(&scheduler->mutex); - while (scheduler->is_worker_pool_running) { - /* Wait for a task */ - if (stailq_empty(&scheduler->input_queue)) { - /* Wake scheduler up if there are no more tasks */ - ev_async_send(scheduler->loop, - &scheduler->scheduler_async); - tt_pthread_cond_wait(&scheduler->worker_cond, - &scheduler->mutex); - continue; - } - task = stailq_shift_entry(&scheduler->input_queue, - struct vy_task, link); - tt_pthread_mutex_unlock(&scheduler->mutex); - assert(task != NULL); - - /* Execute task */ - task->status = task->ops->execute(task); - if (task->status != 0) { - struct diag *diag = diag_get(); - assert(!diag_is_empty(diag)); - diag_move(diag, &task->diag); - } - - /* Return processed task to scheduler */ - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_add_tail_entry(&scheduler->output_queue, task, link); - } - tt_pthread_mutex_unlock(&scheduler->mutex); - return 0; -} - -static void -vy_scheduler_start_workers(struct vy_scheduler *scheduler) -{ - assert(!scheduler->is_worker_pool_running); - - /* Start worker threads */ - scheduler->is_worker_pool_running = true; - /* One thread is reserved for dumps, see vy_schedule(). */ - assert(scheduler->worker_pool_size >= 2); - scheduler->workers_available = scheduler->worker_pool_size; - stailq_create(&scheduler->input_queue); - stailq_create(&scheduler->output_queue); - scheduler->worker_pool = (struct cord *) - calloc(scheduler->worker_pool_size, sizeof(struct cord)); - if (scheduler->worker_pool == NULL) - panic("failed to allocate vinyl worker pool"); - ev_async_start(scheduler->loop, &scheduler->scheduler_async); - for (int i = 0; i < scheduler->worker_pool_size; i++) { - cord_costart(&scheduler->worker_pool[i], "vinyl.worker", - vy_worker_f, scheduler); - } -} - -static void -vy_scheduler_stop_workers(struct vy_scheduler *scheduler) -{ - struct stailq task_queue; - stailq_create(&task_queue); - - assert(scheduler->is_worker_pool_running); - scheduler->is_worker_pool_running = false; - - /* Clear the input queue and wake up worker threads. */ - tt_pthread_mutex_lock(&scheduler->mutex); - stailq_concat(&task_queue, &scheduler->input_queue); - pthread_cond_broadcast(&scheduler->worker_cond); - tt_pthread_mutex_unlock(&scheduler->mutex); - - /* Wait for worker threads to exit. */ - for (int i = 0; i < scheduler->worker_pool_size; i++) - cord_join(&scheduler->worker_pool[i]); - ev_async_stop(scheduler->loop, &scheduler->scheduler_async); - free(scheduler->worker_pool); - scheduler->worker_pool = NULL; - - /* Abort all pending tasks. */ - struct vy_task *task, *next; - stailq_concat(&task_queue, &scheduler->output_queue); - stailq_foreach_entry_safe(task, next, &task_queue, link) { - if (task->ops->abort != NULL) - task->ops->abort(scheduler, task, true); - vy_task_delete(&scheduler->task_pool, task); - } -} - -/** Trigger dump of all currently existing in-memory trees. */ -static void -vy_scheduler_trigger_dump(struct vy_scheduler *scheduler) -{ - assert(scheduler->dump_generation <= scheduler->generation); - if (scheduler->dump_generation < scheduler->generation) { - /* Dump is already in progress, nothing to do. */ - return; - } - if (scheduler->checkpoint_in_progress) { - /* - * Do not trigger another dump until checkpoint - * is complete so as to make sure no statements - * inserted after WAL rotation are written to - * the snapshot. - */ - scheduler->dump_pending = true; - return; - } - scheduler->dump_start = ev_monotonic_now(loop()); - scheduler->generation++; - scheduler->dump_pending = false; - fiber_cond_signal(&scheduler->scheduler_cond); -} - -/** - * Check whether the current dump round is complete. - * If it is, free memory and proceed to the next dump round. - */ -static void -vy_scheduler_complete_dump(struct vy_scheduler *scheduler) -{ - assert(scheduler->dump_generation < scheduler->generation); - - if (scheduler->dump_task_count > 0) { - /* - * There are still dump tasks in progress, - * the dump round can't be over yet. - */ - return; - } - - int64_t min_generation = scheduler->generation; - struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap); - if (pn != NULL) { - struct vy_index *index; - index = container_of(pn, struct vy_index, in_dump); - min_generation = vy_index_generation(index); - } - if (min_generation == scheduler->dump_generation) { - /* - * There are still indexes that must be dumped - * during the current dump round. - */ - return; - } - - /* - * The oldest index data is newer than @dump_generation, - * so the current dump round has been finished. Notify - * about dump completion. - */ - ev_tstamp now = ev_monotonic_now(loop()); - ev_tstamp dump_duration = now - scheduler->dump_start; - scheduler->dump_start = now; - scheduler->dump_generation = min_generation; - scheduler->dump_complete_cb(min_generation - 1, dump_duration, - scheduler->cb_arg); - fiber_cond_signal(&scheduler->dump_cond); -} - -/** - * Schedule checkpoint. Please call vy_scheduler_wait_checkpoint() - * after that. - */ -static int -vy_scheduler_begin_checkpoint(struct vy_scheduler *scheduler) -{ - assert(!scheduler->checkpoint_in_progress); - - /* - * If the scheduler is throttled due to errors, do not wait - * until it wakes up as it may take quite a while. Instead - * fail checkpoint immediately with the last error seen by - * the scheduler. - */ - if (scheduler->is_throttled) { - assert(!diag_is_empty(&scheduler->diag)); - diag_add_error(diag_get(), diag_last_error(&scheduler->diag)); - say_error("Can't checkpoint, scheduler is throttled with: %s", - diag_last_error(diag_get())->errmsg); - return -1; - } - - assert(scheduler->dump_generation <= scheduler->generation); - if (scheduler->generation == scheduler->dump_generation) { - /* - * We are about to start a new dump round. - * Remember the current time so that we can update - * dump bandwidth when the dump round is complete - * (see vy_scheduler_complete_dump()). - */ - scheduler->dump_start = ev_monotonic_now(loop()); - } - scheduler->generation++; - scheduler->checkpoint_in_progress = true; - fiber_cond_signal(&scheduler->scheduler_cond); - return 0; -} - -/** - * Wait for checkpoint. Please call vy_scheduler_end_checkpoint() - * after that. - */ -static int -vy_scheduler_wait_checkpoint(struct vy_scheduler *scheduler) -{ - if (!scheduler->checkpoint_in_progress) - return 0; - - /* - * Wait until all in-memory trees created before - * checkpoint started have been dumped. - */ - while (scheduler->dump_generation < scheduler->generation) { - if (scheduler->is_throttled) { - /* A dump error occurred, abort checkpoint. */ - assert(!diag_is_empty(&scheduler->diag)); - diag_add_error(diag_get(), - diag_last_error(&scheduler->diag)); - say_error("vinyl checkpoint error: %s", - diag_last_error(diag_get())->errmsg); - return -1; - } - fiber_cond_wait(&scheduler->dump_cond); - } - say_info("vinyl checkpoint done"); - return 0; -} - -/** - * End checkpoint. Called on both checkpoint commit and abort. - */ -static void -vy_scheduler_end_checkpoint(struct vy_scheduler *scheduler) -{ - if (!scheduler->checkpoint_in_progress) - return; - - scheduler->checkpoint_in_progress = false; - if (scheduler->dump_pending) { - /* - * Dump was triggered while checkpoint was - * in progress and hence it was postponed. - * Schedule it now. - */ - vy_scheduler_trigger_dump(scheduler); - } -} - -/* Scheduler }}} */ - /** {{{ Introspection */ static void diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c new file mode 100644 index 0000000000000000000000000000000000000000..7ba0c6f96d900e62b8880d65e07cdf972c89c1c6 --- /dev/null +++ b/src/box/vy_scheduler.c @@ -0,0 +1,1584 @@ +/* + * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "vy_scheduler.h" + +#include <assert.h> +#include <stdarg.h> +#include <stdbool.h> +#include <stddef.h> +#include <stdint.h> +#include <stdlib.h> +#include <small/mempool.h> +#include <small/rlist.h> +#include <tarantool_ev.h> + +#include "checkpoint.h" +#include "coio_task.h" +#include "diag.h" +#include "errcode.h" +#include "errinj.h" +#include "fiber.h" +#include "fiber_cond.h" +#include "salad/stailq.h" +#include "say.h" +#include "vy_index.h" +#include "vy_log.h" +#include "vy_mem.h" +#include "vy_range.h" +#include "vy_run.h" +#include "vy_write_iterator.h" +#include "trivia/util.h" +#include "tt_pthread.h" + +/** + * Yield after iterating over this many objects (e.g. ranges). + * Yield more often in debug mode. + */ +#if defined(NDEBUG) +enum { VY_YIELD_LOOPS = 128 }; +#else +enum { VY_YIELD_LOOPS = 2 }; +#endif + +/* Min and max values for vy_scheduler::timeout. */ +#define VY_SCHEDULER_TIMEOUT_MIN 1 +#define VY_SCHEDULER_TIMEOUT_MAX 60 + +static int vy_worker_f(va_list); +static int vy_scheduler_f(va_list); + +struct vy_task; + +struct vy_task_ops { + /** + * This function is called from a worker. It is supposed to do work + * which is too heavy for the tx thread (like IO or compression). + * Returns 0 on success. On failure returns -1 and sets diag. + */ + int (*execute)(struct vy_task *task); + /** + * This function is called by the scheduler upon task completion. + * It may be used to finish the task from the tx thread context. + * + * Returns 0 on success. On failure returns -1 and sets diag. + */ + int (*complete)(struct vy_scheduler *scheduler, struct vy_task *task); + /** + * This function is called by the scheduler if either ->execute + * or ->complete failed. It may be used to undo changes done to + * the index when preparing the task. + * + * If @in_shutdown is set, the callback is invoked from the + * engine destructor. + */ + void (*abort)(struct vy_scheduler *scheduler, struct vy_task *task, + bool in_shutdown); +}; + +struct vy_task { + const struct vy_task_ops *ops; + /** Return code of ->execute. */ + int status; + /** If ->execute fails, the error is stored here. */ + struct diag diag; + /** Index this task is for. */ + struct vy_index *index; + /** Range to compact. */ + struct vy_range *range; + /** Run written by this task. */ + struct vy_run *new_run; + /** Write iterator producing statements for the new run. */ + struct vy_stmt_stream *wi; + /** + * First (newest) and last (oldest) slices to compact. + * + * While a compaction task is in progress, a new slice + * can be added to a range by concurrent dump, so we + * need to remember the slices we are compacting. + */ + struct vy_slice *first_slice, *last_slice; + /** + * Link in the list of pending or processed tasks. + * See vy_scheduler::input_queue, output_queue. + */ + struct stailq_entry link; + /** + * An estimate of the maximal number of statements that + * can be written by the task. Used to create a bloom + * filter of the perfect size. + */ + size_t max_output_count; + /** + * Index options may be modified while a task is in + * progress so we save them here to safely access them + * from another thread. + */ + double bloom_fpr; + int64_t page_size; +}; + +/** + * Allocate a new task to be executed by a worker thread. + * When preparing an asynchronous task, this function must + * be called before yielding the current fiber in order to + * pin the index the task is for so that a concurrent fiber + * does not free it from under us. + */ +static struct vy_task * +vy_task_new(struct mempool *pool, struct vy_index *index, + const struct vy_task_ops *ops) +{ + struct vy_task *task = mempool_alloc(pool); + if (task == NULL) { + diag_set(OutOfMemory, sizeof(*task), + "mempool", "struct vy_task"); + return NULL; + } + memset(task, 0, sizeof(*task)); + task->ops = ops; + task->index = index; + vy_index_ref(index); + diag_create(&task->diag); + return task; +} + +/** Free a task allocated with vy_task_new(). */ +static void +vy_task_delete(struct mempool *pool, struct vy_task *task) +{ + vy_index_unref(task->index); + diag_destroy(&task->diag); + TRASH(task); + mempool_free(pool, task); +} + +static bool +vy_dump_heap_less(struct heap_node *a, struct heap_node *b) +{ + struct vy_index *i1 = container_of(a, struct vy_index, in_dump); + struct vy_index *i2 = container_of(b, struct vy_index, in_dump); + + /* + * Indexes that are currently being dumped or can't be scheduled + * for dump right now are moved off the top of the heap. + */ + if (i1->is_dumping != i2->is_dumping) + return i1->is_dumping < i2->is_dumping; + if (i1->pin_count != i2->pin_count) + return i1->pin_count < i2->pin_count; + + /* Older indexes are dumped first. */ + int64_t i1_generation = vy_index_generation(i1); + int64_t i2_generation = vy_index_generation(i2); + if (i1_generation != i2_generation) + return i1_generation < i2_generation; + /* + * If a space has more than one index, appending a statement + * to it requires reading the primary index to get the old + * tuple and delete it from secondary indexes. This means that + * on local recovery from WAL, the primary index must not be + * ahead of secondary indexes of the same space, i.e. it must + * be dumped last. + */ + return i1->id > i2->id; +} + +#define HEAP_NAME vy_dump_heap +#define HEAP_LESS(h, l, r) vy_dump_heap_less(l, r) + +#include "salad/heap.h" + +#undef HEAP_LESS +#undef HEAP_NAME + +static bool +vy_compact_heap_less(struct heap_node *a, struct heap_node *b) +{ + struct vy_index *i1 = container_of(a, struct vy_index, in_compact); + struct vy_index *i2 = container_of(b, struct vy_index, in_compact); + /* + * Prefer indexes whose read amplification will be reduced + * most as a result of compaction. + */ + return vy_index_compact_priority(i1) > vy_index_compact_priority(i2); +} + +#define HEAP_NAME vy_compact_heap +#define HEAP_LESS(h, l, r) vy_compact_heap_less(l, r) + +#include "salad/heap.h" + +#undef HEAP_LESS +#undef HEAP_NAME + +static void +vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events) +{ + (void)loop; + (void)events; + struct vy_scheduler *scheduler = container_of(watcher, + struct vy_scheduler, scheduler_async); + fiber_cond_signal(&scheduler->scheduler_cond); +} + +static void +vy_scheduler_start_workers(struct vy_scheduler *scheduler) +{ + assert(!scheduler->is_worker_pool_running); + /* One thread is reserved for dumps, see vy_schedule(). */ + assert(scheduler->worker_pool_size >= 2); + + scheduler->is_worker_pool_running = true; + scheduler->workers_available = scheduler->worker_pool_size; + scheduler->worker_pool = calloc(scheduler->worker_pool_size, + sizeof(struct cord)); + if (scheduler->worker_pool == NULL) + panic("failed to allocate vinyl worker pool"); + + ev_async_start(scheduler->scheduler_loop, &scheduler->scheduler_async); + for (int i = 0; i < scheduler->worker_pool_size; i++) { + if (cord_costart(&scheduler->worker_pool[i], "vinyl.worker", + vy_worker_f, scheduler) != 0) + panic("failed to start vinyl worker thread"); + } +} + +static void +vy_scheduler_stop_workers(struct vy_scheduler *scheduler) +{ + struct stailq task_queue; + stailq_create(&task_queue); + + assert(scheduler->is_worker_pool_running); + scheduler->is_worker_pool_running = false; + + /* Clear the input queue and wake up worker threads. */ + tt_pthread_mutex_lock(&scheduler->mutex); + stailq_concat(&task_queue, &scheduler->input_queue); + pthread_cond_broadcast(&scheduler->worker_cond); + tt_pthread_mutex_unlock(&scheduler->mutex); + + /* Wait for worker threads to exit. */ + for (int i = 0; i < scheduler->worker_pool_size; i++) + cord_join(&scheduler->worker_pool[i]); + ev_async_stop(scheduler->scheduler_loop, &scheduler->scheduler_async); + + free(scheduler->worker_pool); + scheduler->worker_pool = NULL; + + /* Abort all pending tasks. */ + struct vy_task *task, *next; + stailq_concat(&task_queue, &scheduler->output_queue); + stailq_foreach_entry_safe(task, next, &task_queue, link) { + if (task->ops->abort != NULL) + task->ops->abort(scheduler, task, true); + vy_task_delete(&scheduler->task_pool, task); + } +} + +struct vy_scheduler * +vy_scheduler_new(int write_threads, + vy_scheduler_dump_complete_f dump_complete_cb, void *cb_arg, + struct vy_run_env *run_env, struct rlist *read_views) +{ + struct vy_scheduler *scheduler = calloc(1, sizeof(*scheduler)); + if (scheduler == NULL) { + diag_set(OutOfMemory, sizeof(*scheduler), + "malloc", "struct vy_scheduler"); + return NULL; + } + + scheduler->dump_complete_cb = dump_complete_cb; + scheduler->cb_arg = cb_arg; + scheduler->read_views = read_views; + scheduler->run_env = run_env; + + scheduler->scheduler_fiber = fiber_new("vinyl.scheduler", + vy_scheduler_f); + if (scheduler->scheduler_fiber == NULL) + panic("failed to allocate vinyl scheduler fiber"); + + scheduler->scheduler_loop = loop(); + fiber_cond_create(&scheduler->scheduler_cond); + ev_async_init(&scheduler->scheduler_async, vy_scheduler_async_cb); + + scheduler->worker_pool_size = write_threads; + mempool_create(&scheduler->task_pool, cord_slab_cache(), + sizeof(struct vy_task)); + stailq_create(&scheduler->input_queue); + stailq_create(&scheduler->output_queue); + + tt_pthread_cond_init(&scheduler->worker_cond, NULL); + tt_pthread_mutex_init(&scheduler->mutex, NULL); + + vy_dump_heap_create(&scheduler->dump_heap); + vy_compact_heap_create(&scheduler->compact_heap); + + diag_create(&scheduler->diag); + fiber_cond_create(&scheduler->dump_cond); + + fiber_start(scheduler->scheduler_fiber, scheduler); + return scheduler; +} + +void +vy_scheduler_delete(struct vy_scheduler *scheduler) +{ + /* Stop scheduler fiber. */ + scheduler->scheduler_fiber = NULL; + /* Sic: fiber_cancel() can't be used here. */ + fiber_cond_signal(&scheduler->scheduler_cond); + + if (scheduler->is_worker_pool_running) + vy_scheduler_stop_workers(scheduler); + + tt_pthread_cond_destroy(&scheduler->worker_cond); + tt_pthread_mutex_destroy(&scheduler->mutex); + + diag_destroy(&scheduler->diag); + mempool_destroy(&scheduler->task_pool); + fiber_cond_destroy(&scheduler->dump_cond); + fiber_cond_destroy(&scheduler->scheduler_cond); + vy_dump_heap_destroy(&scheduler->dump_heap); + vy_compact_heap_destroy(&scheduler->compact_heap); + + TRASH(scheduler); + free(scheduler); +} + +void +vy_scheduler_add_index(struct vy_scheduler *scheduler, + struct vy_index *index) +{ + assert(index->in_dump.pos == UINT32_MAX); + assert(index->in_compact.pos == UINT32_MAX); + vy_dump_heap_insert(&scheduler->dump_heap, &index->in_dump); + vy_compact_heap_insert(&scheduler->compact_heap, &index->in_compact); +} + +void +vy_scheduler_remove_index(struct vy_scheduler *scheduler, + struct vy_index *index) +{ + assert(index->in_dump.pos != UINT32_MAX); + assert(index->in_compact.pos != UINT32_MAX); + vy_dump_heap_delete(&scheduler->dump_heap, &index->in_dump); + vy_compact_heap_delete(&scheduler->compact_heap, &index->in_compact); + index->in_dump.pos = UINT32_MAX; + index->in_compact.pos = UINT32_MAX; +} + +static void +vy_scheduler_update_index(struct vy_scheduler *scheduler, + struct vy_index *index) +{ + if (index->is_dropped) { + /* Dropped indexes are exempted from scheduling. */ + assert(index->in_dump.pos == UINT32_MAX); + assert(index->in_compact.pos == UINT32_MAX); + return; + } + assert(index->in_dump.pos != UINT32_MAX); + assert(index->in_compact.pos != UINT32_MAX); + vy_dump_heap_update(&scheduler->dump_heap, &index->in_dump); + vy_compact_heap_update(&scheduler->compact_heap, &index->in_compact); +} + +static void +vy_scheduler_pin_index(struct vy_scheduler *scheduler, struct vy_index *index) +{ + assert(!index->is_dumping); + if (index->pin_count++ == 0) + vy_scheduler_update_index(scheduler, index); +} + +static void +vy_scheduler_unpin_index(struct vy_scheduler *scheduler, struct vy_index *index) +{ + assert(!index->is_dumping); + assert(index->pin_count > 0); + if (--index->pin_count == 0) + vy_scheduler_update_index(scheduler, index); +} + +void +vy_scheduler_trigger_dump(struct vy_scheduler *scheduler) +{ + assert(scheduler->dump_generation <= scheduler->generation); + if (scheduler->dump_generation < scheduler->generation) { + /* Dump is already in progress, nothing to do. */ + return; + } + if (scheduler->checkpoint_in_progress) { + /* + * Do not trigger another dump until checkpoint + * is complete so as to make sure no statements + * inserted after WAL rotation are written to + * the snapshot. + */ + scheduler->dump_pending = true; + return; + } + scheduler->dump_start = ev_monotonic_now(loop()); + scheduler->generation++; + scheduler->dump_pending = false; + fiber_cond_signal(&scheduler->scheduler_cond); +} + +/** + * Check whether the current dump round is complete. + * If it is, free memory and proceed to the next dump round. + */ +static void +vy_scheduler_complete_dump(struct vy_scheduler *scheduler) +{ + assert(scheduler->dump_generation < scheduler->generation); + + if (scheduler->dump_task_count > 0) { + /* + * There are still dump tasks in progress, + * the dump round can't be over yet. + */ + return; + } + + int64_t min_generation = scheduler->generation; + struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap); + if (pn != NULL) { + struct vy_index *index; + index = container_of(pn, struct vy_index, in_dump); + min_generation = vy_index_generation(index); + } + if (min_generation == scheduler->dump_generation) { + /* + * There are still indexes that must be dumped + * during the current dump round. + */ + return; + } + + /* + * The oldest index data is newer than @dump_generation, + * so the current dump round has been finished. Notify + * about dump completion. + */ + double now = ev_monotonic_now(loop()); + double dump_duration = now - scheduler->dump_start; + scheduler->dump_start = now; + scheduler->dump_generation = min_generation; + scheduler->dump_complete_cb(min_generation - 1, dump_duration, + scheduler->cb_arg); + fiber_cond_signal(&scheduler->dump_cond); +} + +int +vy_scheduler_begin_checkpoint(struct vy_scheduler *scheduler) +{ + assert(!scheduler->checkpoint_in_progress); + + /* + * If the scheduler is throttled due to errors, do not wait + * until it wakes up as it may take quite a while. Instead + * fail checkpoint immediately with the last error seen by + * the scheduler. + */ + if (scheduler->is_throttled) { + assert(!diag_is_empty(&scheduler->diag)); + diag_add_error(diag_get(), diag_last_error(&scheduler->diag)); + say_error("Can't checkpoint, scheduler is throttled with: %s", + diag_last_error(diag_get())->errmsg); + return -1; + } + + assert(scheduler->dump_generation <= scheduler->generation); + if (scheduler->generation == scheduler->dump_generation) { + /* + * We are about to start a new dump round. + * Remember the current time so that we can update + * dump bandwidth when the dump round is complete + * (see vy_scheduler_complete_dump()). + */ + scheduler->dump_start = ev_monotonic_now(loop()); + } + scheduler->generation++; + scheduler->checkpoint_in_progress = true; + fiber_cond_signal(&scheduler->scheduler_cond); + return 0; +} + +int +vy_scheduler_wait_checkpoint(struct vy_scheduler *scheduler) +{ + if (!scheduler->checkpoint_in_progress) + return 0; + + /* + * Wait until all in-memory trees created before + * checkpoint started have been dumped. + */ + while (scheduler->dump_generation < scheduler->generation) { + if (scheduler->is_throttled) { + /* A dump error occurred, abort checkpoint. */ + assert(!diag_is_empty(&scheduler->diag)); + diag_add_error(diag_get(), + diag_last_error(&scheduler->diag)); + say_error("vinyl checkpoint error: %s", + diag_last_error(diag_get())->errmsg); + return -1; + } + fiber_cond_wait(&scheduler->dump_cond); + } + say_info("vinyl checkpoint done"); + return 0; +} + +void +vy_scheduler_end_checkpoint(struct vy_scheduler *scheduler) +{ + if (!scheduler->checkpoint_in_progress) + return; + + scheduler->checkpoint_in_progress = false; + if (scheduler->dump_pending) { + /* + * Dump was triggered while checkpoint was + * in progress and hence it was postponed. + * Schedule it now. + */ + vy_scheduler_trigger_dump(scheduler); + } +} + +/** + * Allocate a new run for an index and write the information + * about it to the metadata log so that we could still find + * and delete it in case a write error occured. This function + * is called from dump/compaction task constructor. + */ +static struct vy_run * +vy_run_prepare(struct vy_index *index) +{ + struct vy_run *run = vy_run_new(vy_log_next_id()); + if (run == NULL) + return NULL; + vy_log_tx_begin(); + vy_log_prepare_run(index->commit_lsn, run->id); + if (vy_log_tx_commit() < 0) { + vy_run_unref(run); + return NULL; + } + return run; +} + +/** + * Free an incomplete run and write a record to the metadata + * log indicating that the run is not needed any more. + * This function is called on dump/compaction task abort. + */ +static void +vy_run_discard(struct vy_run *run) +{ + int64_t run_id = run->id; + + vy_run_unref(run); + + ERROR_INJECT(ERRINJ_VY_RUN_DISCARD, + {say_error("error injection: run %lld not discarded", + (long long)run_id); return;}); + + vy_log_tx_begin(); + /* + * The run hasn't been used and can be deleted right away + * so set gc_lsn to minimal possible (0). + */ + vy_log_drop_run(run_id, 0); + if (vy_log_tx_commit() < 0) { + /* + * Failure to log deletion of an incomplete + * run means that we won't retry to delete + * its files on log rotation. We will do that + * after restart though, so just warn and + * carry on. + */ + struct error *e = diag_last_error(diag_get()); + say_warn("failed to log run %lld deletion: %s", + (long long)run_id, e->errmsg); + } +} + +static int +vy_task_dump_execute(struct vy_task *task) +{ + struct vy_index *index = task->index; + + return vy_run_write(task->new_run, index->env->path, + index->space_id, index->id, task->wi, + task->page_size, index->cmp_def, + index->key_def, task->max_output_count, + task->bloom_fpr); +} + +static int +vy_task_dump_complete(struct vy_scheduler *scheduler, struct vy_task *task) +{ + struct vy_index *index = task->index; + struct vy_run *new_run = task->new_run; + int64_t dump_lsn = new_run->dump_lsn; + struct tuple_format *key_format = index->env->key_format; + struct vy_mem *mem, *next_mem; + struct vy_slice **new_slices, *slice; + struct vy_range *range, *begin_range, *end_range; + struct tuple *min_key, *max_key; + int i, loops = 0; + + assert(index->is_dumping); + + if (vy_run_is_empty(new_run)) { + /* + * In case the run is empty, we can discard the run + * and delete dumped in-memory trees right away w/o + * inserting slices into ranges. However, we need + * to log index dump anyway. + */ + vy_log_tx_begin(); + vy_log_dump_index(index->commit_lsn, dump_lsn); + if (vy_log_tx_commit() < 0) + goto fail; + vy_run_discard(new_run); + goto delete_mems; + } + + assert(new_run->info.min_lsn > index->dump_lsn); + assert(new_run->info.max_lsn <= dump_lsn); + + /* + * Figure out which ranges intersect the new run. + * @begin_range is the first range intersecting the run. + * @end_range is the range following the last range + * intersecting the run or NULL if the run itersects all + * ranges. + */ + min_key = vy_key_from_msgpack(key_format, new_run->info.min_key); + if (min_key == NULL) + goto fail; + max_key = vy_key_from_msgpack(key_format, new_run->info.max_key); + if (max_key == NULL) { + tuple_unref(min_key); + goto fail; + } + begin_range = vy_range_tree_psearch(index->tree, min_key); + end_range = vy_range_tree_nsearch(index->tree, max_key); + tuple_unref(min_key); + tuple_unref(max_key); + + /* + * For each intersected range allocate a slice of the new run. + */ + new_slices = calloc(index->range_count, sizeof(*new_slices)); + if (new_slices == NULL) { + diag_set(OutOfMemory, index->range_count * sizeof(*new_slices), + "malloc", "struct vy_slice *"); + goto fail; + } + for (range = begin_range, i = 0; range != end_range; + range = vy_range_tree_next(index->tree, range), i++) { + slice = vy_slice_new(vy_log_next_id(), new_run, + range->begin, range->end, index->cmp_def); + if (slice == NULL) + goto fail_free_slices; + + assert(i < index->range_count); + new_slices[i] = slice; + /* + * It's OK to yield here for the range tree can only + * be changed from the scheduler fiber. + */ + if (++loops % VY_YIELD_LOOPS == 0) + fiber_sleep(0); + } + + /* + * Log change in metadata. + */ + vy_log_tx_begin(); + vy_log_create_run(index->commit_lsn, new_run->id, dump_lsn); + for (range = begin_range, i = 0; range != end_range; + range = vy_range_tree_next(index->tree, range), i++) { + assert(i < index->range_count); + slice = new_slices[i]; + vy_log_insert_slice(range->id, new_run->id, slice->id, + tuple_data_or_null(slice->begin), + tuple_data_or_null(slice->end)); + + if (++loops % VY_YIELD_LOOPS == 0) + fiber_sleep(0); /* see comment above */ + } + vy_log_dump_index(index->commit_lsn, dump_lsn); + if (vy_log_tx_commit() < 0) + goto fail_free_slices; + + /* + * Account the new run. + */ + vy_index_add_run(index, new_run); + vy_stmt_counter_add_disk(&index->stat.disk.dump.out, &new_run->count); + + /* Drop the reference held by the task. */ + vy_run_unref(new_run); + + /* + * Add new slices to ranges. + */ + for (range = begin_range, i = 0; range != end_range; + range = vy_range_tree_next(index->tree, range), i++) { + assert(i < index->range_count); + slice = new_slices[i]; + vy_index_unacct_range(index, range); + vy_range_add_slice(range, slice); + vy_index_acct_range(index, range); + vy_range_update_compact_priority(range, &index->opts); + if (!vy_range_is_scheduled(range)) + vy_range_heap_update(&index->range_heap, + &range->heap_node); + range->version++; + /* + * If we yield here, a concurrent fiber will see + * a range with a run slice containing statements + * present in the in-memory trees of the index. + * This is OK, because read iterator won't use the + * new run slice until index->dump_lsn is bumped, + * which is only done after in-memory trees are + * removed (see vy_read_iterator_add_disk()). + */ + if (++loops % VY_YIELD_LOOPS == 0) + fiber_sleep(0); + } + free(new_slices); + +delete_mems: + /* + * Delete dumped in-memory trees. + */ + rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) { + if (mem->generation > scheduler->dump_generation) + continue; + vy_stmt_counter_add(&index->stat.disk.dump.in, &mem->count); + vy_index_delete_mem(index, mem); + } + index->dump_lsn = dump_lsn; + index->stat.disk.dump.count++; + + /* The iterator has been cleaned up in a worker thread. */ + task->wi->iface->close(task->wi); + + index->is_dumping = false; + vy_scheduler_update_index(scheduler, index); + + if (index->id != 0) + vy_scheduler_unpin_index(scheduler, index->pk); + + assert(scheduler->dump_task_count > 0); + scheduler->dump_task_count--; + + vy_scheduler_complete_dump(scheduler); + + say_info("%s: dump completed", vy_index_name(index)); + return 0; + +fail_free_slices: + for (i = 0; i < index->range_count; i++) { + slice = new_slices[i]; + if (slice != NULL) + vy_slice_delete(slice); + if (++loops % VY_YIELD_LOOPS == 0) + fiber_sleep(0); + } + free(new_slices); +fail: + return -1; +} + +static void +vy_task_dump_abort(struct vy_scheduler *scheduler, struct vy_task *task, + bool in_shutdown) +{ + struct vy_index *index = task->index; + + assert(index->is_dumping); + + /* The iterator has been cleaned up in a worker thread. */ + task->wi->iface->close(task->wi); + + /* + * It's no use alerting the user if the server is + * shutting down or the index was dropped. + */ + if (!in_shutdown && !index->is_dropped) { + say_error("%s: dump failed: %s", vy_index_name(index), + diag_last_error(&task->diag)->errmsg); + } + + /* The metadata log is unavailable on shutdown. */ + if (!in_shutdown) + vy_run_discard(task->new_run); + else + vy_run_unref(task->new_run); + + index->is_dumping = false; + vy_scheduler_update_index(scheduler, index); + + if (index->id != 0) + vy_scheduler_unpin_index(scheduler, index->pk); + + assert(scheduler->dump_task_count > 0); + scheduler->dump_task_count--; +} + +/** + * Create a task to dump an index. + * + * On success the task is supposed to dump all in-memory + * trees created at @scheduler->dump_generation. + */ +static int +vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index, + struct vy_task **p_task) +{ + static struct vy_task_ops dump_ops = { + .execute = vy_task_dump_execute, + .complete = vy_task_dump_complete, + .abort = vy_task_dump_abort, + }; + + assert(!index->is_dropped); + assert(!index->is_dumping); + assert(index->pin_count == 0); + assert(vy_index_generation(index) == scheduler->dump_generation); + assert(scheduler->dump_generation < scheduler->generation); + + struct errinj *inj = errinj(ERRINJ_VY_INDEX_DUMP, ERRINJ_INT); + if (inj != NULL && inj->iparam == (int)index->id) { + diag_set(ClientError, ER_INJECTION, "vinyl index dump"); + goto err; + } + + /* Rotate the active tree if it needs to be dumped. */ + if (index->mem->generation == scheduler->dump_generation && + vy_index_rotate_mem(index) != 0) + goto err; + + /* + * Wait until all active writes to in-memory trees + * eligible for dump are over. + */ + int64_t dump_lsn = -1; + size_t max_output_count = 0; + struct vy_mem *mem, *next_mem; + rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) { + if (mem->generation > scheduler->dump_generation) + continue; + vy_mem_wait_pinned(mem); + if (mem->tree.size == 0) { + /* + * The tree is empty so we can delete it + * right away, without involving a worker. + */ + vy_index_delete_mem(index, mem); + continue; + } + dump_lsn = MAX(dump_lsn, mem->max_lsn); + max_output_count += mem->tree.size; + } + + if (max_output_count == 0) { + /* Nothing to do, pick another index. */ + vy_scheduler_update_index(scheduler, index); + vy_scheduler_complete_dump(scheduler); + return 0; + } + + struct vy_task *task = vy_task_new(&scheduler->task_pool, + index, &dump_ops); + if (task == NULL) + goto err; + + struct vy_run *new_run = vy_run_prepare(index); + if (new_run == NULL) + goto err_run; + + assert(dump_lsn >= 0); + new_run->dump_lsn = dump_lsn; + + struct vy_stmt_stream *wi; + bool is_last_level = (index->run_count == 0); + wi = vy_write_iterator_new(index->cmp_def, index->disk_format, + index->upsert_format, index->id == 0, + is_last_level, scheduler->read_views); + if (wi == NULL) + goto err_wi; + rlist_foreach_entry(mem, &index->sealed, in_sealed) { + if (mem->generation > scheduler->dump_generation) + continue; + if (vy_write_iterator_new_mem(wi, mem) != 0) + goto err_wi_sub; + } + + task->new_run = new_run; + task->wi = wi; + task->max_output_count = max_output_count; + task->bloom_fpr = index->opts.bloom_fpr; + task->page_size = index->opts.page_size; + + index->is_dumping = true; + vy_scheduler_update_index(scheduler, index); + + if (index->id != 0) { + /* + * The primary index must be dumped after all + * secondary indexes of the same space - see + * vy_dump_heap_less(). To make sure it isn't + * picked by the scheduler while all secondary + * indexes are being dumped, temporarily remove + * it from the dump heap. + */ + vy_scheduler_pin_index(scheduler, index->pk); + } + + scheduler->dump_task_count++; + + say_info("%s: dump started", vy_index_name(index)); + *p_task = task; + return 0; + +err_wi_sub: + task->wi->iface->close(wi); +err_wi: + vy_run_discard(new_run); +err_run: + vy_task_delete(&scheduler->task_pool, task); +err: + say_error("%s: could not start dump: %s", vy_index_name(index), + diag_last_error(diag_get())->errmsg); + return -1; +} + +static int +vy_task_compact_execute(struct vy_task *task) +{ + struct vy_index *index = task->index; + + return vy_run_write(task->new_run, index->env->path, + index->space_id, index->id, task->wi, + task->page_size, index->cmp_def, + index->key_def, task->max_output_count, + task->bloom_fpr); +} + +static int +vy_task_compact_complete(struct vy_scheduler *scheduler, struct vy_task *task) +{ + struct vy_index *index = task->index; + struct vy_range *range = task->range; + struct vy_run *new_run = task->new_run; + struct vy_slice *first_slice = task->first_slice; + struct vy_slice *last_slice = task->last_slice; + struct vy_slice *slice, *next_slice, *new_slice = NULL; + struct vy_run *run; + + /* + * Allocate a slice of the new run. + * + * If the run is empty, we don't need to allocate a new slice + * and insert it into the range, but we still need to delete + * compacted runs. + */ + if (!vy_run_is_empty(new_run)) { + new_slice = vy_slice_new(vy_log_next_id(), new_run, NULL, NULL, + index->cmp_def); + if (new_slice == NULL) + return -1; + } + + /* + * Build the list of runs that became unused + * as a result of compaction. + */ + RLIST_HEAD(unused_runs); + for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { + slice->run->compacted_slice_count++; + if (slice == last_slice) + break; + } + for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { + run = slice->run; + if (run->compacted_slice_count == run->refs) + rlist_add_entry(&unused_runs, run, in_unused); + slice->run->compacted_slice_count = 0; + if (slice == last_slice) + break; + } + + /* + * Log change in metadata. + */ + vy_log_tx_begin(); + for (slice = first_slice; ; slice = rlist_next_entry(slice, in_range)) { + vy_log_delete_slice(slice->id); + if (slice == last_slice) + break; + } + int64_t gc_lsn = checkpoint_last(NULL); + rlist_foreach_entry(run, &unused_runs, in_unused) + vy_log_drop_run(run->id, gc_lsn); + if (new_slice != NULL) { + vy_log_create_run(index->commit_lsn, new_run->id, + new_run->dump_lsn); + vy_log_insert_slice(range->id, new_run->id, new_slice->id, + tuple_data_or_null(new_slice->begin), + tuple_data_or_null(new_slice->end)); + } + if (vy_log_tx_commit() < 0) { + if (new_slice != NULL) + vy_slice_delete(new_slice); + return -1; + } + + /* + * Account the new run if it is not empty, + * otherwise discard it. + */ + if (new_slice != NULL) { + vy_index_add_run(index, new_run); + vy_stmt_counter_add_disk(&index->stat.disk.compact.out, + &new_run->count); + /* Drop the reference held by the task. */ + vy_run_unref(new_run); + } else + vy_run_discard(new_run); + + /* + * Replace compacted slices with the resulting slice. + * + * Note, since a slice might have been added to the range + * by a concurrent dump while compaction was in progress, + * we must insert the new slice at the same position where + * the compacted slices were. + */ + RLIST_HEAD(compacted_slices); + vy_index_unacct_range(index, range); + if (new_slice != NULL) + vy_range_add_slice_before(range, new_slice, first_slice); + for (slice = first_slice; ; slice = next_slice) { + next_slice = rlist_next_entry(slice, in_range); + vy_range_remove_slice(range, slice); + rlist_add_entry(&compacted_slices, slice, in_range); + vy_stmt_counter_add_disk(&index->stat.disk.compact.in, + &slice->count); + if (slice == last_slice) + break; + } + range->n_compactions++; + range->version++; + vy_index_acct_range(index, range); + vy_range_update_compact_priority(range, &index->opts); + index->stat.disk.compact.count++; + + /* + * Unaccount unused runs and delete compacted slices. + */ + rlist_foreach_entry(run, &unused_runs, in_unused) + vy_index_remove_run(index, run); + rlist_foreach_entry_safe(slice, &compacted_slices, + in_range, next_slice) { + vy_slice_wait_pinned(slice); + vy_slice_delete(slice); + } + + /* The iterator has been cleaned up in worker. */ + task->wi->iface->close(task->wi); + + assert(range->heap_node.pos == UINT32_MAX); + vy_range_heap_insert(&index->range_heap, &range->heap_node); + vy_scheduler_update_index(scheduler, index); + + say_info("%s: completed compacting range %s", + vy_index_name(index), vy_range_str(range)); + return 0; +} + +static void +vy_task_compact_abort(struct vy_scheduler *scheduler, struct vy_task *task, + bool in_shutdown) +{ + struct vy_index *index = task->index; + struct vy_range *range = task->range; + + /* The iterator has been cleaned up in worker. */ + task->wi->iface->close(task->wi); + + /* + * It's no use alerting the user if the server is + * shutting down or the index was dropped. + */ + if (!in_shutdown && !index->is_dropped) { + say_error("%s: failed to compact range %s: %s", + vy_index_name(index), vy_range_str(range), + diag_last_error(&task->diag)->errmsg); + } + + /* The metadata log is unavailable on shutdown. */ + if (!in_shutdown) + vy_run_discard(task->new_run); + else + vy_run_unref(task->new_run); + + assert(range->heap_node.pos == UINT32_MAX); + vy_range_heap_insert(&index->range_heap, &range->heap_node); + vy_scheduler_update_index(scheduler, index); +} + +static int +vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index, + struct vy_task **p_task) +{ + static struct vy_task_ops compact_ops = { + .execute = vy_task_compact_execute, + .complete = vy_task_compact_complete, + .abort = vy_task_compact_abort, + }; + + struct heap_node *range_node; + struct vy_range *range; + + assert(!index->is_dropped); + + range_node = vy_range_heap_top(&index->range_heap); + assert(range_node != NULL); + range = container_of(range_node, struct vy_range, heap_node); + assert(range->compact_priority > 1); + + if (vy_index_split_range(index, range) || + vy_index_coalesce_range(index, range)) { + vy_scheduler_update_index(scheduler, index); + return 0; + } + + struct vy_task *task = vy_task_new(&scheduler->task_pool, + index, &compact_ops); + if (task == NULL) + goto err_task; + + struct vy_run *new_run = vy_run_prepare(index); + if (new_run == NULL) + goto err_run; + + struct vy_stmt_stream *wi; + bool is_last_level = (range->compact_priority == range->slice_count); + wi = vy_write_iterator_new(index->cmp_def, index->disk_format, + index->upsert_format, index->id == 0, + is_last_level, scheduler->read_views); + if (wi == NULL) + goto err_wi; + + struct vy_slice *slice; + int n = range->compact_priority; + rlist_foreach_entry(slice, &range->slices, in_range) { + if (vy_write_iterator_new_slice(wi, slice, + scheduler->run_env) != 0) + goto err_wi_sub; + + task->max_output_count += slice->count.rows; + new_run->dump_lsn = MAX(new_run->dump_lsn, + slice->run->dump_lsn); + + /* Remember the slices we are compacting. */ + if (task->first_slice == NULL) + task->first_slice = slice; + task->last_slice = slice; + + if (--n == 0) + break; + } + assert(n == 0); + assert(new_run->dump_lsn >= 0); + + task->range = range; + task->new_run = new_run; + task->wi = wi; + task->bloom_fpr = index->opts.bloom_fpr; + task->page_size = index->opts.page_size; + + /* + * Remove the range we are going to compact from the heap + * so that it doesn't get selected again. + */ + vy_range_heap_delete(&index->range_heap, range_node); + range_node->pos = UINT32_MAX; + vy_scheduler_update_index(scheduler, index); + + say_info("%s: started compacting range %s, runs %d/%d", + vy_index_name(index), vy_range_str(range), + range->compact_priority, range->slice_count); + *p_task = task; + return 0; + +err_wi_sub: + task->wi->iface->close(wi); +err_wi: + vy_run_discard(new_run); +err_run: + vy_task_delete(&scheduler->task_pool, task); +err_task: + say_error("%s: could not start compacting range %s: %s", + vy_index_name(index), vy_range_str(range), + diag_last_error(diag_get())->errmsg); + return -1; +} + +/** + * Create a task for dumping an index. The new task is returned + * in @ptask. If there's no index that needs to be dumped @ptask + * is set to NULL. + * + * We only dump an index if it needs to be snapshotted or the quota + * on memory usage is exceeded. In either case, the oldest index + * is selected, because dumping it will free the maximal amount of + * memory due to log structured design of the memory allocator. + * + * Returns 0 on success, -1 on failure. + */ +static int +vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask) +{ +retry: + *ptask = NULL; + assert(scheduler->dump_generation <= scheduler->generation); + if (scheduler->dump_generation == scheduler->generation) { + /* + * All memory trees of past generations have + * been dumped, nothing to do. + */ + return 0; + } + /* + * Look up the oldest index eligible for dump. + */ + struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap); + if (pn == NULL) { + /* + * There is no vinyl index and so no task to schedule. + * Complete the current dump round. + */ + vy_scheduler_complete_dump(scheduler); + return 0; + } + struct vy_index *index = container_of(pn, struct vy_index, in_dump); + if (!index->is_dumping && index->pin_count == 0 && + vy_index_generation(index) == scheduler->dump_generation) { + /* + * Dump is in progress and there is an index that + * contains data that must be dumped at the current + * round. Try to create a task for it. + */ + if (vy_task_dump_new(scheduler, index, ptask) != 0) + return -1; + if (*ptask != NULL) + return 0; /* new task */ + /* + * All in-memory trees eligible for dump were empty + * and so were deleted without involving a worker + * thread. Check another index. + */ + goto retry; + } + /* + * Dump is in progress, but all eligible indexes are + * already being dumped. Wait until the current round + * is complete. + */ + assert(scheduler->dump_task_count > 0); + return 0; +} + +/** + * Create a task for compacting a range. The new task is returned + * in @ptask. If there's no range that needs to be compacted @ptask + * is set to NULL. + * + * We compact ranges that have more runs in a level than specified + * by run_count_per_level configuration option. Among those runs we + * give preference to those ranges whose compaction will reduce + * read amplification most. + * + * Returns 0 on success, -1 on failure. + */ +static int +vy_scheduler_peek_compact(struct vy_scheduler *scheduler, + struct vy_task **ptask) +{ +retry: + *ptask = NULL; + struct heap_node *pn = vy_compact_heap_top(&scheduler->compact_heap); + if (pn == NULL) + return 0; /* nothing to do */ + struct vy_index *index = container_of(pn, struct vy_index, in_compact); + if (vy_index_compact_priority(index) <= 1) + return 0; /* nothing to do */ + if (vy_task_compact_new(scheduler, index, ptask) != 0) + return -1; + if (*ptask == NULL) + goto retry; /* index dropped or range split/coalesced */ + return 0; /* new task */ +} + +static int +vy_schedule(struct vy_scheduler *scheduler, struct vy_task **ptask) +{ + *ptask = NULL; + + if (vy_scheduler_peek_dump(scheduler, ptask) != 0) + goto fail; + if (*ptask != NULL) + return 0; + + if (scheduler->workers_available <= 1) { + /* + * If all worker threads are busy doing compaction + * when we run out of quota, ongoing transactions will + * hang until one of the threads has finished, which + * may take quite a while. To avoid unpredictably long + * stalls, always keep one worker thread reserved for + * dumps. + */ + return 0; + } + + if (vy_scheduler_peek_compact(scheduler, ptask) != 0) + goto fail; + if (*ptask != NULL) + return 0; + + /* no task to run */ + return 0; +fail: + assert(!diag_is_empty(diag_get())); + diag_move(diag_get(), &scheduler->diag); + return -1; + +} + +static int +vy_scheduler_complete_task(struct vy_scheduler *scheduler, + struct vy_task *task) +{ + if (task->index->is_dropped) { + if (task->ops->abort) + task->ops->abort(scheduler, task, false); + return 0; + } + + struct diag *diag = &task->diag; + if (task->status != 0) { + assert(!diag_is_empty(diag)); + goto fail; /* ->execute fialed */ + } + ERROR_INJECT(ERRINJ_VY_TASK_COMPLETE, { + diag_set(ClientError, ER_INJECTION, + "vinyl task completion"); + diag_move(diag_get(), diag); + goto fail; }); + if (task->ops->complete && + task->ops->complete(scheduler, task) != 0) { + assert(!diag_is_empty(diag_get())); + diag_move(diag_get(), diag); + goto fail; + } + return 0; +fail: + if (task->ops->abort) + task->ops->abort(scheduler, task, false); + diag_move(diag, &scheduler->diag); + return -1; +} + +static int +vy_scheduler_f(va_list va) +{ + struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *); + + /* + * Yield immediately, until the quota watermark is reached + * for the first time or a checkpoint is made. + * Then start the worker threads: we know they will be + * needed. If quota watermark is never reached, workers + * are not started and the scheduler is idle until + * shutdown or checkpoint. + */ + fiber_cond_wait(&scheduler->scheduler_cond); + if (scheduler->scheduler_fiber == NULL) + return 0; /* destroyed */ + + vy_scheduler_start_workers(scheduler); + + while (scheduler->scheduler_fiber != NULL) { + struct stailq output_queue; + struct vy_task *task, *next; + int tasks_failed = 0, tasks_done = 0; + bool was_empty; + + /* Get the list of processed tasks. */ + stailq_create(&output_queue); + tt_pthread_mutex_lock(&scheduler->mutex); + stailq_concat(&output_queue, &scheduler->output_queue); + tt_pthread_mutex_unlock(&scheduler->mutex); + + /* Complete and delete all processed tasks. */ + stailq_foreach_entry_safe(task, next, &output_queue, link) { + if (vy_scheduler_complete_task(scheduler, task) != 0) + tasks_failed++; + else + tasks_done++; + vy_task_delete(&scheduler->task_pool, task); + scheduler->workers_available++; + assert(scheduler->workers_available <= + scheduler->worker_pool_size); + } + /* + * Reset the timeout if we managed to successfully + * complete at least one task. + */ + if (tasks_done > 0) { + scheduler->timeout = 0; + /* + * Task completion callback may yield, which + * opens a time window for a worker to submit + * a processed task and wake up the scheduler + * (via scheduler_async). Hence we should go + * and recheck the output_queue in order not + * to lose a wakeup event and hang for good. + */ + continue; + } + /* Throttle for a while if a task failed. */ + if (tasks_failed > 0) + goto error; + /* All worker threads are busy. */ + if (scheduler->workers_available == 0) + goto wait; + /* Get a task to schedule. */ + if (vy_schedule(scheduler, &task) != 0) + goto error; + /* Nothing to do. */ + if (task == NULL) + goto wait; + + /* Queue the task and notify workers if necessary. */ + tt_pthread_mutex_lock(&scheduler->mutex); + was_empty = stailq_empty(&scheduler->input_queue); + stailq_add_tail_entry(&scheduler->input_queue, task, link); + if (was_empty) + tt_pthread_cond_signal(&scheduler->worker_cond); + tt_pthread_mutex_unlock(&scheduler->mutex); + + scheduler->workers_available--; + fiber_reschedule(); + continue; +error: + /* Abort pending checkpoint. */ + fiber_cond_signal(&scheduler->dump_cond); + /* + * A task can fail either due to lack of memory or IO + * error. In either case it is pointless to schedule + * another task right away, because it is likely to fail + * too. So we throttle the scheduler for a while after + * each failure. + */ + scheduler->timeout *= 2; + if (scheduler->timeout < VY_SCHEDULER_TIMEOUT_MIN) + scheduler->timeout = VY_SCHEDULER_TIMEOUT_MIN; + if (scheduler->timeout > VY_SCHEDULER_TIMEOUT_MAX) + scheduler->timeout = VY_SCHEDULER_TIMEOUT_MAX; + struct errinj *inj; + inj = errinj(ERRINJ_VY_SCHED_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam != 0) + scheduler->timeout = inj->dparam; + say_warn("throttling scheduler for %.0f second(s)", + scheduler->timeout); + scheduler->is_throttled = true; + fiber_sleep(scheduler->timeout); + scheduler->is_throttled = false; + continue; +wait: + /* Wait for changes */ + fiber_cond_wait(&scheduler->scheduler_cond); + } + + return 0; +} + +static int +vy_worker_f(va_list va) +{ + struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *); + coio_enable(); + struct vy_task *task = NULL; + + tt_pthread_mutex_lock(&scheduler->mutex); + while (scheduler->is_worker_pool_running) { + /* Wait for a task */ + if (stailq_empty(&scheduler->input_queue)) { + /* Wake scheduler up if there are no more tasks */ + ev_async_send(scheduler->scheduler_loop, + &scheduler->scheduler_async); + tt_pthread_cond_wait(&scheduler->worker_cond, + &scheduler->mutex); + continue; + } + task = stailq_shift_entry(&scheduler->input_queue, + struct vy_task, link); + tt_pthread_mutex_unlock(&scheduler->mutex); + assert(task != NULL); + + /* Execute task */ + task->status = task->ops->execute(task); + if (task->status != 0) { + struct diag *diag = diag_get(); + assert(!diag_is_empty(diag)); + diag_move(diag, &task->diag); + } + + /* Return processed task to scheduler */ + tt_pthread_mutex_lock(&scheduler->mutex); + stailq_add_tail_entry(&scheduler->output_queue, task, link); + } + tt_pthread_mutex_unlock(&scheduler->mutex); + return 0; +} diff --git a/src/box/vy_scheduler.h b/src/box/vy_scheduler.h new file mode 100644 index 0000000000000000000000000000000000000000..20e07e17a0973e78ca893f6987e2a1515aa9d704 --- /dev/null +++ b/src/box/vy_scheduler.h @@ -0,0 +1,229 @@ +#ifndef INCLUDES_TARANTOOL_BOX_VY_SCHEDULER_H +#define INCLUDES_TARANTOOL_BOX_VY_SCHEDULER_H +/* + * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <stdbool.h> +#include <stdint.h> +#include <small/mempool.h> +#include <small/rlist.h> +#include <tarantool_ev.h> + +#include "diag.h" +#include "fiber_cond.h" +#define HEAP_FORWARD_DECLARATION +#include "salad/heap.h" +#include "salad/stailq.h" +#include "tt_pthread.h" + +#if defined(__cplusplus) +extern "C" { +#endif /* defined(__cplusplus) */ + +struct cord; +struct fiber; +struct vy_index; +struct vy_run_env; + +typedef void +(*vy_scheduler_dump_complete_f)(int64_t dump_generation, + double dump_duration, void *arg); + +struct vy_scheduler { + /** Scheduler fiber. */ + struct fiber *scheduler_fiber; + /** Scheduler event loop. */ + struct ev_loop *scheduler_loop; + /** Used to wake up the scheduler fiber from TX. */ + struct fiber_cond scheduler_cond; + /** Used to wake up the scheduler from a worker thread. */ + struct ev_async scheduler_async; + /** + * Array of worker threads used for performing + * dump/compaction tasks. + */ + struct cord *worker_pool; + /** Set if the worker threads are running. */ + bool is_worker_pool_running; + /** Total number of worker threads. */ + int worker_pool_size; + /** Number worker threads that are currently idle. */ + int workers_available; + /** Memory pool used for allocating vy_task objects. */ + struct mempool task_pool; + /** Queue of pending tasks, linked by vy_task::link. */ + struct stailq input_queue; + /** Queue of processed tasks, linked by vy_task::link. */ + struct stailq output_queue; + /** + * Signaled to wake up a worker when there is + * a pending task in the input queue. Also used + * to stop worker threads on shutdown. + */ + pthread_cond_t worker_cond; + /** + * Mutex protecting input and output queues and + * the condition variable used to wake up worker + * threads. + */ + pthread_mutex_t mutex; + /** + * Heap of indexes, ordered by dump priority, + * linked by vy_index::in_dump. + */ + heap_t dump_heap; + /** + * Heap of indexes, ordered by compaction priority, + * linked by vy_index::in_compact. + */ + heap_t compact_heap; + /** Last error seen by the scheduler. */ + struct diag diag; + /** + * Scheduler timeout. Grows exponentially with each + * successive failure. Reset on successful task completion. + */ + double timeout; + /** Set if the scheduler is throttled due to errors. */ + bool is_throttled; + /** Set if checkpoint is in progress. */ + bool checkpoint_in_progress; + /** + * In order to guarantee checkpoint consistency, we must not + * dump in-memory trees created after checkpoint was started + * so we set this flag instead, which will make the scheduler + * schedule a dump as soon as checkpoint is complete. + */ + bool dump_pending; + /** + * Current generation of in-memory data. + * + * New in-memory trees inherit the current generation, while + * the scheduler dumps all in-memory trees whose generation + * is less. The generation is increased either on checkpoint + * or on exceeding the memory quota to force dumping all old + * in-memory trees. + */ + int64_t generation; + /** + * Generation of in-memory data currently being dumped. + * + * If @dump_generation < @generation, the scheduler is dumping + * in-memory trees created at @dump_generation. When all such + * trees have been dumped, it bumps @dump_generation and frees + * memory. + * + * If @dump_generation == @generation, dump have been completed + * and the scheduler won't schedule a dump task until @generation + * is bumped, which may happen either on exceeding the memory + * quota or on checkpoint. + * + * Throughout the code, a process of dumping all in-memory trees + * at @dump_generation is called 'dump round'. + */ + int64_t dump_generation; + /** Number of dump tasks that are currently in progress. */ + int dump_task_count; + /** Time when the current dump round started. */ + double dump_start; + /** Signaled on dump round completion. */ + struct fiber_cond dump_cond; + /** + * Function called by the scheduler upon dump round + * completion. It is supposed to free memory released + * by the dump. + */ + vy_scheduler_dump_complete_f dump_complete_cb; + /** Argument passed to dump_complete_cb(). */ + void *cb_arg; + /** List of read views, see tx_manager::read_views. */ + struct rlist *read_views; + /** Context needed for writing runs. */ + struct vy_run_env *run_env; +}; + +/** + * Create a scheduler instance. + */ +struct vy_scheduler * +vy_scheduler_new(int write_threads, + vy_scheduler_dump_complete_f dump_complete_cb, void *cb_arg, + struct vy_run_env *run_env, struct rlist *read_views); + +/** + * Destroy a scheduler instance. + */ +void +vy_scheduler_delete(struct vy_scheduler *scheduler); + +/** + * Add an index to scheduler dump/compaction queues. + */ +void +vy_scheduler_add_index(struct vy_scheduler *, struct vy_index *); + +/** + * Remove an index from scheduler dump/compaction queues. + */ +void +vy_scheduler_remove_index(struct vy_scheduler *, struct vy_index *); + +/** + * Trigger dump of all currently existing in-memory trees. + */ +void +vy_scheduler_trigger_dump(struct vy_scheduler *scheduler); + +/** + * Schedule a checkpoint. Please call vy_scheduler_wait_checkpoint() + * after that. + */ +int +vy_scheduler_begin_checkpoint(struct vy_scheduler *); + +/** + * Wait for checkpoint. Please call vy_scheduler_end_checkpoint() + * after that. + */ +int +vy_scheduler_wait_checkpoint(struct vy_scheduler *); + +/** + * End checkpoint. Called on both checkpoint commit and abort. + */ +void +vy_scheduler_end_checkpoint(struct vy_scheduler *); + +#if defined(__cplusplus) +} /* extern "C" */ +#endif /* defined(__cplusplus) */ + +#endif /* INCLUDES_TARANTOOL_BOX_VY_SCHEDULER_H */