From 47c3aa20ba7c2c4c4bc95a4b4ea2a0a61343945a Mon Sep 17 00:00:00 2001 From: Vladimir Davydov <vdavydov.dev@gmail.com> Date: Sat, 1 Jul 2017 14:11:55 +0300 Subject: [PATCH] vinyl: store indexes in scheduler->compact_heap The compact_heap, used by the scheduler to schedule range compaction, contains all ranges except those that are currently being compacted. Since the appropriate vy_index object is required to schedule a range compaction, we have to store a pointer to the index a range belongs to in vy_range->index. This makes it impossible to move vy_range struct and its implementation to a separate source file. To address this, let's rework the scheduler as follows: - Make compact_heap store indexes, not ranges. An index is prioritized by the greatest compact_priority among its ranges. - Add a heap of ranges to each index, prioritized by compact_priority. A range is removed from the heap while it's being compacted. - Do not remove indexes from dump_heap or compact_heap when a task is scheduled (otherwise we could only schedule one compaction per index). Instead just update the index position in the heaps. Needed for #1906 --- src/box/vinyl.c | 268 +++++++++++++++++++++++++++--------------------- 1 file changed, 153 insertions(+), 115 deletions(-) diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 24250c1bb8..b8ad659823 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -313,8 +313,8 @@ struct vy_range { int n_compactions; /** Link in vy_index->tree. */ rb_node(struct vy_range) tree_node; - /** Link in vy_scheduler->compact_heap. */ - struct heap_node in_compact; + /** Link in vy_index->range_heap. */ + struct heap_node heap_node; /** * Incremented whenever an in-memory index or on disk * run is added to or deleted from this range. Used to @@ -509,11 +509,17 @@ struct vy_index { struct vy_index *pk; /** Link in vy_scheduler->dump_heap. */ struct heap_node in_dump; + /** Link in vy_scheduler->compact_heap. */ + struct heap_node in_compact; /** * If pin_count > 0 the index can't be scheduled for dump. * Used to make sure that the primary index is dumped last. */ int pin_count; + /** The index is currently being dumped. */ + bool is_dumping; + /** Heap of ranges, prioritized by compact_priority. */ + heap_t range_heap; /** * The number of times the index was truncated. * @@ -1286,7 +1292,35 @@ vy_run_discard(struct vy_run *run) static bool vy_range_is_scheduled(struct vy_range *range) { - return range->in_compact.pos == UINT32_MAX; + return range->heap_node.pos == UINT32_MAX; +} + +#define HEAP_NAME vy_range_heap + +static bool +vy_range_heap_less(struct heap_node *a, struct heap_node *b) +{ + struct vy_range *left = container_of(a, struct vy_range, heap_node); + struct vy_range *right = container_of(b, struct vy_range, heap_node); + return left->compact_priority > right->compact_priority; +} + +#define HEAP_LESS(h, l, r) vy_range_heap_less(l, r) + +#include "salad/heap.h" + +#undef HEAP_LESS +#undef HEAP_NAME + +/** Return max compact_priority among ranges of an index. 
*/ +static int +vy_index_compact_priority(struct vy_index *index) +{ + struct heap_node *n = vy_range_heap_top(&index->range_heap); + if (n == NULL) + return 0; + struct vy_range *range = container_of(n, struct vy_range, heap_node); + return range->compact_priority; } #define HEAP_NAME vy_dump_heap @@ -1294,12 +1328,21 @@ vy_range_is_scheduled(struct vy_range *range) static bool vy_dump_heap_less(struct heap_node *a, struct heap_node *b) { - struct vy_index *left = container_of(a, struct vy_index, in_dump); - struct vy_index *right = container_of(b, struct vy_index, in_dump); + struct vy_index *i1 = container_of(a, struct vy_index, in_dump); + struct vy_index *i2 = container_of(b, struct vy_index, in_dump); + + /* + * Indexes that are currently being dumped or can't be scheduled + * for dump right now are moved off the top of the heap. + */ + if (i1->is_dumping != i2->is_dumping) + return i1->is_dumping < i2->is_dumping; + if (i1->pin_count != i2->pin_count) + return i1->pin_count < i2->pin_count; /* Older indexes are dumped first. */ - if (left->generation != right->generation) - return left->generation < right->generation; + if (i1->generation != i2->generation) + return i1->generation < i2->generation; /* * If a space has more than one index, appending a statement * to it requires reading the primary index to get the old @@ -1308,7 +1351,7 @@ vy_dump_heap_less(struct heap_node *a, struct heap_node *b) * ahead of secondary indexes of the same space, i.e. it must * be dumped last. */ - return left->id > right->id; + return i1->id > i2->id; } #define HEAP_LESS(h, l, r) vy_dump_heap_less(l, r) @@ -1323,19 +1366,22 @@ vy_dump_heap_less(struct heap_node *a, struct heap_node *b) static bool vy_compact_heap_less(struct heap_node *a, struct heap_node *b) { - struct vy_range *left = container_of(a, struct vy_range, in_compact); - struct vy_range *right = container_of(b, struct vy_range, in_compact); + struct vy_index *i1 = container_of(a, struct vy_index, in_compact); + struct vy_index *i2 = container_of(b, struct vy_index, in_compact); /* - * Prefer ranges whose read amplification will be reduced + * Prefer indexes whose read amplification will be reduced * most as a result of compaction. 
*/ - return left->compact_priority > right->compact_priority; + return vy_index_compact_priority(i1) > vy_index_compact_priority(i2); } #define HEAP_LESS(h, l, r) vy_compact_heap_less(l, r) #include "salad/heap.h" +#undef HEAP_LESS +#undef HEAP_NAME + struct vy_scheduler { pthread_mutex_t mutex; struct vy_env *env; @@ -1434,12 +1480,6 @@ vy_scheduler_pin_index(struct vy_scheduler *, struct vy_index *); static void vy_scheduler_unpin_index(struct vy_scheduler *, struct vy_index *); static void -vy_scheduler_add_range(struct vy_scheduler *, struct vy_range *); -static void -vy_scheduler_update_range(struct vy_scheduler *, struct vy_range *); -static void -vy_scheduler_remove_range(struct vy_scheduler *, struct vy_range *); -static void vy_scheduler_add_mem(struct vy_scheduler *scheduler, struct vy_mem *mem); static void vy_scheduler_remove_mem(struct vy_scheduler *scheduler, struct vy_mem *mem); @@ -1667,6 +1707,8 @@ vy_range_iterator_restore(struct vy_range_iterator *itr, static void vy_index_add_range(struct vy_index *index, struct vy_range *range) { + assert(range->heap_node.pos == UINT32_MAX); + vy_range_heap_insert(&index->range_heap, &range->heap_node); vy_range_tree_insert(index->tree, range); index->range_count++; } @@ -1674,6 +1716,8 @@ vy_index_add_range(struct vy_index *index, struct vy_range *range) static void vy_index_remove_range(struct vy_index *index, struct vy_range *range) { + assert(range->heap_node.pos != UINT32_MAX); + vy_range_heap_delete(&index->range_heap, &range->heap_node); vy_range_tree_remove(index->tree, range); index->range_count--; } @@ -1707,7 +1751,7 @@ vy_range_new(struct vy_index *index, int64_t id, } rlist_create(&range->slices); range->index = index; - range->in_compact.pos = UINT32_MAX; + range->heap_node.pos = UINT32_MAX; return range; } @@ -1742,9 +1786,6 @@ vy_index_rotate_mem(struct vy_index *index) static void vy_range_delete(struct vy_range *range) { - /* The range has been deleted from the scheduler queues. */ - assert(range->in_compact.pos == UINT32_MAX); - if (range->begin != NULL) tuple_unref(range->begin); if (range->end != NULL) @@ -1922,7 +1963,6 @@ vy_range_maybe_split(struct vy_range *range) /* * Replace the old range in the index. 
*/ - vy_scheduler_remove_range(scheduler, range); vy_index_unacct_range(index, range); vy_index_remove_range(index, range); @@ -1930,10 +1970,11 @@ vy_range_maybe_split(struct vy_range *range) part = parts[i]; vy_index_add_range(index, part); vy_index_acct_range(index, part); - vy_scheduler_add_range(scheduler, part); } index->range_tree_version++; + vy_scheduler_update_index(scheduler, index); + say_info("%s: split range %s by key %s", vy_index_name(index), vy_range_str(range), vy_key_str(split_key_raw)); @@ -2164,7 +2205,6 @@ vy_range_maybe_coalesce(struct vy_range *range) it = first; while (it != end) { struct vy_range *next = vy_range_tree_next(index->tree, it); - vy_scheduler_remove_range(scheduler, it); vy_index_unacct_range(index, it); vy_index_remove_range(index, it); rlist_splice(&result->slices, &it->slices); @@ -2183,7 +2223,8 @@ vy_range_maybe_coalesce(struct vy_range *range) vy_index_acct_range(index, result); vy_index_add_range(index, result); index->range_tree_version++; - vy_scheduler_add_range(scheduler, result); + + vy_scheduler_update_index(scheduler, index); say_info("%s: coalesced ranges %s", vy_index_name(index), vy_range_str(result)); @@ -2491,7 +2532,6 @@ vy_index_recover(struct vy_index *index) return -1; } vy_index_acct_range(index, range); - vy_scheduler_add_range(env->scheduler, range); } if (prev == NULL) { diag_set(ClientError, ER_INVALID_VYLOG_FILE, @@ -2506,7 +2546,8 @@ vy_index_recover(struct vy_index *index) (long long)prev->id)); return -1; } - vy_scheduler_add_index(env->scheduler, index); + if (!index->is_dropped) + vy_scheduler_add_index(env->scheduler, index); return 0; } @@ -2926,8 +2967,6 @@ static int vy_task_dump_execute(struct vy_task *task) { struct vy_index *index = task->index; - /* The index has been deleted from the scheduler queues. */ - assert(index->in_dump.pos == UINT32_MAX); return vy_run_write(task->new_run, index->env->conf->path, index->space_id, index->id, task->wi, @@ -2950,6 +2989,8 @@ vy_task_dump_complete(struct vy_task *task) struct tuple *min_key, *max_key; int i, loops = 0; + assert(index->is_dumping); + if (vy_run_is_empty(new_run)) { /* * In case the run is empty, we can discard the run @@ -3055,7 +3096,8 @@ vy_task_dump_complete(struct vy_task *task) vy_index_acct_range(index, range); vy_range_update_compact_priority(range); if (!vy_range_is_scheduled(range)) - vy_scheduler_update_range(scheduler, range); + vy_range_heap_update(&index->range_heap, + &range->heap_node); range->version++; /* * If we yield here, a concurrent fiber will see @@ -3092,7 +3134,9 @@ vy_task_dump_complete(struct vy_task *task) /* The iterator has been cleaned up in a worker thread. */ task->wi->iface->close(task->wi); - vy_scheduler_add_index(scheduler, index); + index->is_dumping = false; + vy_scheduler_update_index(scheduler, index); + if (index->id != 0) vy_scheduler_unpin_index(scheduler, index->pk); @@ -3118,6 +3162,8 @@ vy_task_dump_abort(struct vy_task *task, bool in_shutdown) struct vy_index *index = task->index; struct vy_scheduler *scheduler = index->env->scheduler; + assert(index->is_dumping); + /* The iterator has been cleaned up in a worker thread. 
*/ task->wi->iface->close(task->wi); @@ -3128,7 +3174,9 @@ vy_task_dump_abort(struct vy_task *task, bool in_shutdown) } else vy_run_unref(task->new_run); - vy_scheduler_add_index(scheduler, index); + index->is_dumping = false; + vy_scheduler_update_index(scheduler, index); + if (index->id != 0) vy_scheduler_unpin_index(scheduler, index->pk); } @@ -3152,14 +3200,11 @@ vy_task_dump_new(struct vy_index *index, struct vy_task **p_task) struct vy_scheduler *scheduler = index->env->scheduler; int64_t generation = scheduler->generation; + assert(!index->is_dropped); + assert(!index->is_dumping); assert(index->pin_count == 0); assert(index->generation < generation); - if (index->is_dropped) { - vy_scheduler_remove_index(scheduler, index); - return 0; - } - struct errinj *inj = errinj(ERRINJ_VY_INDEX_DUMP, ERRINJ_INT); if (inj != NULL && inj->iparam == (int)index->id) { diag_set(ClientError, ER_INJECTION, "vinyl index dump"); @@ -3238,7 +3283,9 @@ vy_task_dump_new(struct vy_index *index, struct vy_task **p_task) task->bloom_fpr = index->opts.bloom_fpr; task->page_size = index->opts.page_size; - vy_scheduler_remove_index(scheduler, index); + index->is_dumping = true; + vy_scheduler_update_index(scheduler, index); + if (index->id != 0) { /* * The primary index must be dumped after all @@ -3271,8 +3318,6 @@ static int vy_task_compact_execute(struct vy_task *task) { struct vy_index *index = task->index; - /* The range has been deleted from the scheduler queues. */ - assert(task->range->in_compact.pos == UINT32_MAX); return vy_run_write(task->new_run, index->env->conf->path, index->space_id, index->id, task->wi, @@ -3405,7 +3450,9 @@ vy_task_compact_complete(struct vy_task *task) /* The iterator has been cleaned up in worker. */ task->wi->iface->close(task->wi); - vy_scheduler_add_range(scheduler, range); + assert(range->heap_node.pos == UINT32_MAX); + vy_range_heap_insert(&index->range_heap, &range->heap_node); + vy_scheduler_update_index(scheduler, index); say_info("%s: completed compacting range %s", vy_index_name(index), vy_range_str(range)); @@ -3430,28 +3477,31 @@ vy_task_compact_abort(struct vy_task *task, bool in_shutdown) } else vy_run_unref(task->new_run); - vy_scheduler_add_range(scheduler, range); + assert(range->heap_node.pos == UINT32_MAX); + vy_range_heap_insert(&index->range_heap, &range->heap_node); + vy_scheduler_update_index(scheduler, index); } static int -vy_task_compact_new(struct vy_range *range, struct vy_task **p_task) +vy_task_compact_new(struct vy_index *index, struct vy_task **p_task) { - assert(range->compact_priority > 1); - static struct vy_task_ops compact_ops = { .execute = vy_task_compact_execute, .complete = vy_task_compact_complete, .abort = vy_task_compact_abort, }; - struct vy_index *index = range->index; struct tx_manager *xm = index->env->xm; struct vy_scheduler *scheduler = index->env->scheduler; + struct heap_node *range_node; + struct vy_range *range; - if (index->is_dropped) { - vy_scheduler_remove_range(scheduler, range); - return 0; - } + assert(!index->is_dropped); + + range_node = vy_range_heap_top(&index->range_heap); + assert(range_node != NULL); + range = container_of(range_node, struct vy_range, heap_node); + assert(range->compact_priority > 1); if (vy_range_maybe_split(range)) return 0; @@ -3504,7 +3554,13 @@ vy_task_compact_new(struct vy_range *range, struct vy_task **p_task) task->bloom_fpr = index->opts.bloom_fpr; task->page_size = index->opts.page_size; - vy_scheduler_remove_range(scheduler, range); + /* + * Remove the range we are going to 
compact from the heap + * so that it doesn't get selected again. + */ + vy_range_heap_delete(&index->range_heap, range_node); + range_node->pos = UINT32_MAX; + vy_scheduler_update_index(scheduler, index); say_info("%s: started compacting range %s, runs %d/%d", vy_index_name(index), vy_range_str(range), @@ -3634,65 +3690,56 @@ static void vy_scheduler_add_index(struct vy_scheduler *scheduler, struct vy_index *index) { + assert(!index->is_dropped); + assert(index->in_dump.pos == UINT32_MAX); + assert(index->in_compact.pos == UINT32_MAX); vy_dump_heap_insert(&scheduler->dump_heap, &index->in_dump); - assert(index->in_dump.pos != UINT32_MAX); + vy_compact_heap_insert(&scheduler->compact_heap, &index->in_compact); } static void vy_scheduler_update_index(struct vy_scheduler *scheduler, struct vy_index *index) { - vy_dump_heap_update(&scheduler->dump_heap, &index->in_dump); + if (index->is_dropped) { + /* Dropped indexes are exempted from scheduling. */ + assert(index->in_dump.pos == UINT32_MAX); + assert(index->in_compact.pos == UINT32_MAX); + return; + } assert(index->in_dump.pos != UINT32_MAX); + assert(index->in_compact.pos != UINT32_MAX); + vy_dump_heap_update(&scheduler->dump_heap, &index->in_dump); + vy_compact_heap_update(&scheduler->compact_heap, &index->in_compact); } static void vy_scheduler_remove_index(struct vy_scheduler *scheduler, struct vy_index *index) { + assert(index->in_dump.pos != UINT32_MAX); + assert(index->in_compact.pos != UINT32_MAX); vy_dump_heap_delete(&scheduler->dump_heap, &index->in_dump); + vy_compact_heap_delete(&scheduler->compact_heap, &index->in_compact); index->in_dump.pos = UINT32_MAX; + index->in_compact.pos = UINT32_MAX; } static void vy_scheduler_pin_index(struct vy_scheduler *scheduler, struct vy_index *index) { - if (index->pin_count == 0) - vy_scheduler_remove_index(scheduler, index); - index->pin_count++; + assert(!index->is_dumping); + if (index->pin_count++ == 0) + vy_scheduler_update_index(scheduler, index); } static void vy_scheduler_unpin_index(struct vy_scheduler *scheduler, struct vy_index *index) { + assert(!index->is_dumping); assert(index->pin_count > 0); - index->pin_count--; - if (index->pin_count == 0) - vy_scheduler_add_index(scheduler, index); -} - -static void -vy_scheduler_add_range(struct vy_scheduler *scheduler, - struct vy_range *range) -{ - vy_compact_heap_insert(&scheduler->compact_heap, &range->in_compact); - assert(range->in_compact.pos != UINT32_MAX); -} - -static void -vy_scheduler_update_range(struct vy_scheduler *scheduler, - struct vy_range *range) -{ - vy_compact_heap_update(&scheduler->compact_heap, &range->in_compact); - assert(range->in_compact.pos != UINT32_MAX); -} - -static void -vy_scheduler_remove_range(struct vy_scheduler *scheduler, - struct vy_range *range) -{ - vy_compact_heap_delete(&scheduler->compact_heap, &range->in_compact); - range->in_compact.pos = UINT32_MAX; + if (--index->pin_count == 0) + vy_scheduler_update_index(scheduler, index); } /** @@ -3718,7 +3765,8 @@ vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask) if (pn == NULL) return 0; /* nothing to do */ struct vy_index *index = container_of(pn, struct vy_index, in_dump); - if (index->generation == scheduler->generation) + if (index->is_dumping || index->pin_count > 0 || + index->generation == scheduler->generation) return 0; /* nothing to do */ if (vy_task_dump_new(index, ptask) != 0) return -1; @@ -3748,10 +3796,10 @@ vy_scheduler_peek_compact(struct vy_scheduler *scheduler, struct heap_node *pn = 
vy_compact_heap_top(&scheduler->compact_heap); if (pn == NULL) return 0; /* nothing to do */ - struct vy_range *range = container_of(pn, struct vy_range, in_compact); - if (range->compact_priority <= 1) + struct vy_index *index = container_of(pn, struct vy_index, in_compact); + if (vy_index_compact_priority(index) <= 1) return 0; /* nothing to do */ - if (vy_task_compact_new(range, ptask) != 0) + if (vy_task_compact_new(index, ptask) != 0) return -1; if (*ptask == NULL) goto retry; /* index dropped or range split/coalesced */ @@ -4605,7 +4653,6 @@ vy_index_commit_create(struct vy_index *index) * After we committed the index in the log, we can schedule * a task for it. */ - vy_scheduler_add_range(env->scheduler, range); vy_scheduler_add_index(env->scheduler, index); } @@ -4652,6 +4699,7 @@ vy_index_commit_drop(struct vy_index *index) return; index->is_dropped = true; + vy_scheduler_remove_index(env->scheduler, index); vy_log_tx_begin(); vy_log_index_prune(index); @@ -4679,6 +4727,7 @@ vy_index_swap(struct vy_index *old_index, struct vy_index *new_index) SWAP(old_index->stat, new_index->stat); SWAP(old_index->run_hist, new_index->run_hist); SWAP(old_index->tree, new_index->tree); + SWAP(old_index->range_heap, new_index->range_heap); rlist_swap(&old_index->runs, &new_index->runs); } @@ -4728,7 +4777,10 @@ vy_prepare_truncate_space(struct space *old_space, struct space *new_space) vy_index_swap(old_index, new_index); new_index->is_dropped = old_index->is_dropped; new_index->truncate_count = old_index->truncate_count; - vy_scheduler_add_index(env->scheduler, new_index); + if (!new_index->is_dropped) { + vy_scheduler_remove_index(env->scheduler, old_index); + vy_scheduler_add_index(env->scheduler, new_index); + } continue; } @@ -4759,12 +4811,14 @@ vy_commit_truncate_space(struct space *old_space, struct space *new_space) return; /* - * Mark old indexes as dropped. After this point no task can - * be scheduled or completed for any of them (only aborted). + * Mark old indexes as dropped and remove them from the scheduler. + * After this point no task can be scheduled or completed for any + * of them (only aborted). 
 	 */
 	for (uint32_t i = 0; i < index_count; i++) {
 		struct vy_index *index = vy_index(old_space->index[i]);
 		index->is_dropped = true;
+		vy_scheduler_remove_index(env->scheduler, index);
 	}
 
 	/*
@@ -4800,9 +4854,6 @@ vy_commit_truncate_space(struct space *old_space, struct space *new_space)
 	 */
 	for (uint32_t i = 0; i < index_count; i++) {
 		struct vy_index *index = vy_index(new_space->index[i]);
-		struct vy_range *range = vy_range_tree_first(index->tree);
-
-		vy_scheduler_add_range(env->scheduler, range);
 		vy_scheduler_add_index(env->scheduler, index);
 	}
 }
@@ -4900,6 +4951,7 @@ vy_index_new(struct vy_env *e, struct index_def *user_index_def,
 	vy_cache_create(&index->cache, &e->cache_env, key_def);
 	rlist_create(&index->sealed);
 	vy_range_tree_new(index->tree);
+	vy_range_heap_create(&index->range_heap);
 	rlist_create(&index->runs);
 	read_set_new(&index->read_set);
 	index->pk = pk;
@@ -4909,6 +4961,7 @@ vy_index_new(struct vy_env *e, struct index_def *user_index_def,
 	tuple_format_ref(index->space_format, 1);
 	index->space_index_count = space->index_count;
 	index->in_dump.pos = UINT32_MAX;
+	index->in_compact.pos = UINT32_MAX;
 	index->space_id = user_index_def->space_id;
 	index->id = user_index_def->iid;
 	index->opts = user_index_def->opts;
@@ -5084,16 +5137,7 @@ static struct vy_range *
 vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
 {
 	(void)t;
-	struct vy_scheduler *scheduler = arg;
-
-	if (range->in_compact.pos != UINT32_MAX) {
-		/*
-		 * The range could have already been removed
-		 * by vy_schedule().
-		 */
-		vy_scheduler_remove_range(scheduler, range);
-	}
-
+	(void)arg;
 	struct vy_slice *slice;
 	rlist_foreach_entry(slice, &range->slices, in_range)
 		vy_slice_wait_pinned(slice);
@@ -5107,18 +5151,12 @@ vy_index_delete(struct vy_index *index)
 	struct vy_scheduler *scheduler = index->env->scheduler;
 
 	assert(index->refs == 0);
+	assert(index->in_dump.pos == UINT32_MAX);
+	assert(index->in_compact.pos == UINT32_MAX);
 
 	if (index->pk != NULL)
 		vy_index_unref(index->pk);
 
-	if (index->in_dump.pos != UINT32_MAX) {
-		/*
-		 * The index could have already been removed
-		 * by vy_schedule().
-		 */
-		vy_scheduler_remove_index(scheduler, index);
-	}
-
 	/* Delete all in-memory trees. */
 	assert(index->mem != NULL);
 	vy_scheduler_remove_mem(scheduler, index->mem);
@@ -5131,8 +5169,8 @@ vy_index_delete(struct vy_index *index)
 	}
 	read_set_iter(&index->read_set, NULL, read_set_delete_cb, NULL);
 
-	vy_range_tree_iter(index->tree, NULL,
-			   vy_range_tree_free_cb, scheduler);
+	vy_range_tree_iter(index->tree, NULL, vy_range_tree_free_cb, NULL);
+	vy_range_heap_destroy(&index->range_heap);
 	tuple_format_ref(index->surrogate_format, -1);
 	tuple_format_ref(index->space_format_with_colmask, -1);
 	tuple_format_ref(index->upsert_format, -1);
-- 
GitLab
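
For reviewers: below is a minimal, self-contained sketch of the two-level priority scheme this patch introduces. It is an illustration only, not vinyl code: the toy_* names are hypothetical, and plain arrays with linear scans stand in for the intrusive heaps generated from salad/heap.h. It demonstrates the two invariants the patch relies on: an index's compaction priority is the greatest compact_priority among the ranges still in its heap (cf. vy_index_compact_priority()), and a range leaves the heap for the duration of its compaction so the scheduler can pick another range of the same index (cf. vy_task_compact_new()).

/*
 * Toy model of the reworked scheduler. All names are hypothetical;
 * linear scans over plain arrays stand in for the intrusive heaps.
 */
#include <assert.h>
#include <stdbool.h>
#include <stdio.h>

struct toy_range {
	int compact_priority;	/* how much compaction would reduce read amp */
	bool in_heap;		/* cleared while the range is being compacted */
};

struct toy_index {
	const char *name;
	struct toy_range *ranges;
	int range_count;
};

/* Analog of vy_index_compact_priority(): max over ranges still in the heap. */
static int
toy_index_compact_priority(const struct toy_index *index)
{
	int max = 0;
	for (int i = 0; i < index->range_count; i++) {
		const struct toy_range *r = &index->ranges[i];
		if (r->in_heap && r->compact_priority > max)
			max = r->compact_priority;
	}
	return max;
}

/*
 * Analog of vy_scheduler_peek_compact(): pick the index whose hottest
 * range needs compaction most; priority <= 1 means nothing to do.
 */
static struct toy_index *
toy_peek_compact(struct toy_index *indexes, int index_count)
{
	struct toy_index *best = NULL;
	for (int i = 0; i < index_count; i++) {
		if (best == NULL ||
		    toy_index_compact_priority(&indexes[i]) >
		    toy_index_compact_priority(best))
			best = &indexes[i];
	}
	if (best != NULL && toy_index_compact_priority(best) <= 1)
		best = NULL;
	return best;
}

int
main(void)
{
	struct toy_range r1[] = { { 5, true }, { 2, true } };
	struct toy_range r2[] = { { 3, true } };
	struct toy_index indexes[] = {
		{ "primary", r1, 2 },
		{ "secondary", r2, 1 },
	};

	/* "primary" wins: its hottest range has priority 5. */
	assert(toy_peek_compact(indexes, 2) == &indexes[0]);

	/*
	 * Start compacting that range: it leaves the range heap
	 * (cf. vy_task_compact_new()), but the index itself stays
	 * in compact_heap with only its position updated, so its
	 * other ranges remain schedulable. Now "secondary" (3)
	 * outranks "primary" (2).
	 */
	r1[0].in_heap = false;
	struct toy_index *victim = toy_peek_compact(indexes, 2);
	assert(victim == &indexes[1]);
	printf("next index to compact: %s\n", victim->name);
	return 0;
}

Because the index is never removed from compact_heap while a task runs, several ranges of one index can be compacted concurrently, which is the point of the last item in the commit message.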
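
The heap comparators in the patch (vy_range_heap_less(), vy_dump_heap_less(), vy_compact_heap_less()) all recover the containing object from an embedded struct heap_node via container_of. For anyone unfamiliar with the idiom, here is a standalone illustration; the container_of definition below is the classic offsetof-based one, assumed for this example rather than taken from Tarantool's headers.

/*
 * Standalone illustration of the intrusive-node idiom used by the
 * heap comparators in this patch.
 */
#include <stdbool.h>
#include <stddef.h>
#include <stdio.h>

#define container_of(ptr, type, member) \
	((type *)((char *)(ptr) - offsetof(type, member)))

struct heap_node {
	unsigned pos;	/* UINT32_MAX when not linked into a heap */
};

struct range {
	int compact_priority;
	struct heap_node heap_node;	/* embedded link, no extra allocation */
};

/* The comparator sees only embedded nodes and climbs back to the owner. */
static bool
range_heap_less(struct heap_node *a, struct heap_node *b)
{
	struct range *left = container_of(a, struct range, heap_node);
	struct range *right = container_of(b, struct range, heap_node);
	/* Greater compact_priority sorts toward the top, as in the patch. */
	return left->compact_priority > right->compact_priority;
}

int
main(void)
{
	struct range r1 = { .compact_priority = 5 };
	struct range r2 = { .compact_priority = 3 };
	/* Prints 1: r1 would sit above r2 in the range heap. */
	printf("%d\n", (int)range_heap_less(&r1.heap_node, &r2.heap_node));
	return 0;
}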