diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 4d807737e1f38af0059e741aabad87d7dd95cf06..4141a105768e7526dfe23dd11c80fdf61f22cc7f 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -1304,22 +1304,27 @@ struct vy_scheduler {
 	 */
 	int64_t generation;
 	/**
-	 * Target generation for checkpoint. The scheduler will force
-	 * dumping of all in-memory trees whose generation is less.
+	 * Generation of in-memory data currently being dumped.
+	 *
+	 * If @dump_generation < @generation, the scheduler is dumping
+	 * in-memory trees created at @dump_generation. When all such
+	 * trees have been dumped, it bumps @dump_generation and frees
+	 * memory.
+	 *
+	 * If @dump_generation == @generation, dump has been completed
+	 * and the scheduler won't schedule a dump task until @generation
+	 * is bumped, which may happen either on exceeding the memory
+	 * quota or on checkpoint.
+	 *
+	 * Throughout the code, the process of dumping all in-memory
+	 * trees at @dump_generation is called a 'dump round'.
 	 */
-	int64_t checkpoint_generation;
-	/** Time of dump start. */
+	int64_t dump_generation;
+	/** Number of dump tasks that are currently in progress. */
+	int dump_task_count;
+	/** Time when the current dump round started. */
 	ev_tstamp dump_start;
-	/**
-	 * List of all in-memory trees, scheduled for dump.
-	 * Older trees are closer to the tail of the list.
-	 */
-	struct rlist dump_fifo;
-	/**
-	 * Signaled on dump completion, i.e. as soon as all in-memory
-	 * trees whose generation is less than the current generation
-	 * have been dumped. Also signaled on any scheduler failure.
-	 */
+	/** Signaled on dump round completion. */
 	struct ipc_cond dump_cond;
 };
 
@@ -1334,11 +1339,9 @@ vy_scheduler_pin_index(struct vy_scheduler *, struct vy_index *);
 static void
 vy_scheduler_unpin_index(struct vy_scheduler *, struct vy_index *);
 static void
-vy_scheduler_add_mem(struct vy_scheduler *scheduler, struct vy_mem *mem);
+vy_scheduler_trigger_dump(struct vy_scheduler *);
 static void
-vy_scheduler_remove_mem(struct vy_scheduler *scheduler, struct vy_mem *mem);
-static bool
-vy_scheduler_needs_dump(struct vy_scheduler *scheduler);
+vy_scheduler_complete_dump(struct vy_scheduler *);
 
 static void
 vy_index_add_range(struct vy_index *index, struct vy_range *range)
@@ -1381,8 +1384,6 @@ vy_index_rotate_mem(struct vy_index *index)
 	rlist_add_entry(&index->sealed, index->mem, in_sealed);
 	index->mem = mem;
 	index->mem_list_version++;
-
-	vy_scheduler_add_mem(scheduler, mem);
 	return 0;
 }
 
@@ -2258,12 +2259,6 @@ struct vy_task {
 	struct vy_run *new_run;
 	/** Write iterator producing statements for the new run. */
 	struct vy_stmt_stream *wi;
-	/**
-	 * The current generation at the time of task start.
-	 * On success a dump task dumps all in-memory trees
-	 * whose generation is less.
-	 */
-	int64_t generation;
 	/**
 	 * First (newest) and last (oldest) slices to compact.
 	 *
@@ -2477,12 +2472,11 @@ vy_task_dump_complete(struct vy_task *task)
 	 * Delete dumped in-memory trees.
 	 */
 	rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) {
-		if (mem->generation >= task->generation)
+		if (mem->generation > scheduler->dump_generation)
 			continue;
 		rlist_del_entry(mem, in_sealed);
 		vy_stmt_counter_sub(&index->stat.memory.count, &mem->count);
 		vy_stmt_counter_add(&index->stat.disk.dump.in, &mem->count);
-		vy_scheduler_remove_mem(scheduler, mem);
 		vy_mem_delete(mem);
 	}
 	index->mem_list_version++;
@@ -2498,6 +2492,11 @@ vy_task_dump_complete(struct vy_task *task)
 	if (index->id != 0)
 		vy_scheduler_unpin_index(scheduler, index->pk);
 
+	assert(scheduler->dump_task_count > 0);
+	scheduler->dump_task_count--;
+
+	vy_scheduler_complete_dump(scheduler);
+
 	say_info("%s: dump completed", vy_index_name(index));
 	return 0;
 
@@ -2537,13 +2536,16 @@ vy_task_dump_abort(struct vy_task *task, bool in_shutdown)
 
 	if (index->id != 0)
 		vy_scheduler_unpin_index(scheduler, index->pk);
+
+	assert(scheduler->dump_task_count > 0);
+	scheduler->dump_task_count--;
 }
 
 /**
  * Create a task to dump an index.
  *
  * On success the task is supposed to dump all in-memory
- * trees older than @scheduler->generation.
+ * trees created at @scheduler->dump_generation.
  */
 static int
 vy_task_dump_new(struct vy_index *index, struct vy_task **p_task)
@@ -2556,12 +2558,11 @@ vy_task_dump_new(struct vy_index *index, struct vy_task **p_task)
 
 	struct tx_manager *xm = index->env->xm;
 	struct vy_scheduler *scheduler = index->env->scheduler;
-	int64_t generation = scheduler->generation;
 
 	assert(!index->is_dropped);
 	assert(!index->is_dumping);
 	assert(index->pin_count == 0);
-	assert(vy_index_generation(index) < generation);
+	assert(vy_index_generation(index) == scheduler->dump_generation);
 
 	struct errinj *inj = errinj(ERRINJ_VY_INDEX_DUMP, ERRINJ_INT);
 	if (inj != NULL && inj->iparam == (int)index->id) {
@@ -2570,7 +2571,7 @@
 	}
 
 	/* Rotate the active tree if it needs to be dumped. */
-	if (index->mem->generation < generation &&
+	if (index->mem->generation == scheduler->dump_generation &&
 	    vy_index_rotate_mem(index) != 0)
 		goto err;
 
@@ -2582,7 +2583,7 @@
 	size_t max_output_count = 0;
 	struct vy_mem *mem, *next_mem;
 	rlist_foreach_entry_safe(mem, &index->sealed, in_sealed, next_mem) {
-		if (mem->generation >= generation)
+		if (mem->generation > scheduler->dump_generation)
 			continue;
 		vy_mem_wait_pinned(mem);
 		if (mem->tree.size == 0) {
@@ -2593,7 +2594,6 @@ vy_task_dump_new(struct vy_index *index, struct vy_task **p_task)
 			vy_stmt_counter_sub(&index->stat.memory.count,
 					    &mem->count);
 			rlist_del_entry(mem, in_sealed);
-			vy_scheduler_remove_mem(scheduler, mem);
 			vy_mem_delete(mem);
 			continue;
 		}
@@ -2604,6 +2604,7 @@
 	if (max_output_count == 0) {
 		/* Nothing to do, pick another index. */
 		vy_scheduler_update_index(scheduler, index);
+		vy_scheduler_complete_dump(scheduler);
 		return 0;
 	}
 
@@ -2627,7 +2628,7 @@
 	if (wi == NULL)
 		goto err_wi;
 	rlist_foreach_entry(mem, &index->sealed, in_sealed) {
-		if (mem->generation >= generation)
+		if (mem->generation > scheduler->dump_generation)
 			continue;
 		if (vy_write_iterator_add_mem(wi, mem) != 0)
 			goto err_wi_sub;
 	}
@@ -2635,7 +2636,6 @@
 
 	task->new_run = new_run;
 	task->wi = wi;
-	task->generation = generation;
 	task->max_output_count = max_output_count;
 	task->bloom_fpr = index->opts.bloom_fpr;
 	task->page_size = index->opts.page_size;
@@ -2655,6 +2655,8 @@
 		vy_scheduler_pin_index(scheduler, index->pk);
 	}
 
+	scheduler->dump_task_count++;
+
 	say_info("%s: dump started", vy_index_name(index));
 	*p_task = task;
 	return 0;
@@ -2999,7 +3001,6 @@ vy_scheduler_new(struct vy_env *env)
 	}
 	tt_pthread_mutex_init(&scheduler->mutex, NULL);
 	diag_create(&scheduler->diag);
-	rlist_create(&scheduler->dump_fifo);
 	ipc_cond_create(&scheduler->dump_cond);
 	vclock_create(&scheduler->last_checkpoint);
 	scheduler->env = env;
@@ -3116,20 +3117,82 @@ vy_scheduler_peek_dump(struct vy_scheduler *scheduler, struct vy_task **ptask)
 {
 retry:
 	*ptask = NULL;
-	if (!vy_scheduler_needs_dump(scheduler))
-		return 0;
+	bool dump_in_progress = (scheduler->dump_generation <
+				 scheduler->generation);
+	/*
+	 * Look up the oldest index eligible for dump.
+	 */
 	struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap);
-	if (pn == NULL)
-		return 0; /* nothing to do */
+	if (pn == NULL) {
+		/*
+		 * There is no vinyl index and so no task to schedule.
+		 * If a dump round is in progress, complete it.
+		 */
+		if (dump_in_progress)
+			vy_scheduler_complete_dump(scheduler);
+		return 0;
+	}
 	struct vy_index *index = container_of(pn, struct vy_index, in_dump);
-	if (index->is_dumping || index->pin_count > 0 ||
-	    vy_index_generation(index) == scheduler->generation)
-		return 0; /* nothing to do */
-	if (vy_task_dump_new(index, ptask) != 0)
-		return -1;
-	if (*ptask == NULL)
-		goto retry; /* index dropped or all mems empty */
-	return 0; /* new task */
+	if (index->is_dumping || index->pin_count > 0) {
+		/* All eligible indexes are already being dumped. */
+		return 0;
+	}
+	if (dump_in_progress &&
+	    vy_index_generation(index) == scheduler->dump_generation) {
+		/*
+		 * Dump is in progress and there is an index that
+		 * contains data that must be dumped during the
+		 * current round. Try to create a task for it.
+		 */
+		if (vy_task_dump_new(index, ptask) != 0)
+			return -1;
+		if (*ptask != NULL)
+			return 0; /* new task */
+		/*
+		 * All in-memory trees eligible for dump were empty
+		 * and so were deleted without involving a worker
+		 * thread. Check another index.
+		 */
+		goto retry;
+	}
+	if (dump_in_progress) {
+		/*
+		 * Dump is in progress, but there is no index eligible
+		 * for dump in the heap. Wait until the current round
+		 * is complete.
+		 */
+		assert(scheduler->dump_task_count > 0);
+		return 0;
+	}
+	/*
+	 * Nothing is being dumped right now. Consider triggering
+	 * a dump if the memory quota is exceeded.
+	 */
+	if (scheduler->checkpoint_in_progress) {
+		/*
+		 * Do not trigger another dump until checkpoint
+		 * is complete so as to make sure no statements
+		 * inserted after WAL rotation are written to
+		 * the snapshot.
+		 */
+		return 0;
+	}
+	if (!vy_quota_is_exceeded(&scheduler->env->quota)) {
+		/*
+		 * Memory consumption is below the watermark,
+		 * nothing to do.
+		 */
+		return 0;
+	}
+	if (lsregion_used(&scheduler->env->allocator) == 0) {
+		/*
+		 * Quota must be exceeded by a pending transaction,
+		 * there's nothing we can do about that.
+		 */
+		return 0;
+	}
+	vy_scheduler_trigger_dump(scheduler);
+	goto retry;
 }
 
 /**
@@ -3460,138 +3523,79 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
 	}
 }
 
-/**
- * Return true if there are in-memory trees that need to
- * be dumped (are older than the current generation).
- */
-static bool
-vy_scheduler_dump_in_progress(struct vy_scheduler *scheduler)
-{
-	if (rlist_empty(&scheduler->dump_fifo))
-		return false;
-
-	struct vy_mem *mem = rlist_last_entry(&scheduler->dump_fifo,
-					      struct vy_mem, in_dump_fifo);
-	if (mem->generation == scheduler->generation)
-		return false;
-
-	assert(mem->generation < scheduler->generation);
-	return true;
-
-}
-
-/** Called to trigger memory dump. */
+/** Trigger dump of all currently existing in-memory trees. */
 static void
 vy_scheduler_trigger_dump(struct vy_scheduler *scheduler)
 {
-	/*
-	 * Increment the generation to trigger dump of
-	 * all in-memory trees.
-	 */
+	assert(scheduler->dump_generation <= scheduler->generation);
+	if (scheduler->generation == scheduler->dump_generation) {
+		/*
+		 * We are about to start a new dump round.
+		 * Remember the current time so that we can update
+		 * dump bandwidth when the dump round is complete
+		 * (see vy_scheduler_complete_dump()).
+		 */
		scheduler->dump_start = ev_now(loop());
+	}
 	scheduler->generation++;
-	scheduler->dump_start = ev_now(loop());
 }
 
-/** Called on memory dump completion. */
+/**
+ * Check whether the current dump round is complete.
+ * If it is, free memory and proceed to the next dump round.
+ */
 static void
 vy_scheduler_complete_dump(struct vy_scheduler *scheduler)
 {
+	assert(scheduler->dump_generation < scheduler->generation);
+
+	if (scheduler->dump_task_count > 0) {
+		/*
+		 * There are still dump tasks in progress, so
+		 * the dump round can't be over yet.
+		 */
+		return;
+	}
+
+	int64_t min_generation = scheduler->generation;
+	struct heap_node *pn = vy_dump_heap_top(&scheduler->dump_heap);
+	if (pn != NULL) {
+		struct vy_index *index;
+		index = container_of(pn, struct vy_index, in_dump);
+		min_generation = vy_index_generation(index);
+	}
+	if (min_generation == scheduler->dump_generation) {
+		/*
+		 * There are still indexes that must be dumped
+		 * during the current dump round.
+		 */
+		return;
+	}
+
 	/*
-	 * All old in-memory trees have been dumped.
-	 * Free memory, release quota, and signal
-	 * dump completion.
+	 * The oldest index data is newer than @dump_generation,
+	 * so the current dump round has finished.
+	 * Free memory, release quota, and signal dump completion.
 	 */
 	struct lsregion *allocator = &scheduler->env->allocator;
 	struct vy_quota *quota = &scheduler->env->quota;
 	size_t mem_used_before = lsregion_used(allocator);
-	lsregion_gc(allocator, scheduler->generation - 1);
+	lsregion_gc(allocator, min_generation - 1);
 	size_t mem_used_after = lsregion_used(allocator);
 	assert(mem_used_after <= mem_used_before);
 	size_t mem_dumped = mem_used_before - mem_used_after;
 	vy_quota_release(quota, mem_dumped);
+
+	scheduler->dump_generation = min_generation;
 	ipc_cond_signal(&scheduler->dump_cond);
 
 	/* Account dump bandwidth. */
 	struct vy_stat *stat = scheduler->env->stat;
-	ev_tstamp dump_duration = ev_now(loop()) - scheduler->dump_start;
+	ev_tstamp now = ev_now(loop());
+	ev_tstamp dump_duration = now - scheduler->dump_start;
 	if (dump_duration > 0)
 		histogram_collect(stat->dump_bw, mem_dumped / dump_duration);
-}
-
-/** Check if memory dump is required. */
-static bool
-vy_scheduler_needs_dump(struct vy_scheduler *scheduler)
-{
-	if (vy_scheduler_dump_in_progress(scheduler)) {
-		/*
-		 * There are old in-memory trees to be dumped.
-		 * Do not increase the generation until all of
-		 * them are dumped to guarantee dump consistency.
-		 */
-		return true;
-	}
-
-	if (scheduler->checkpoint_in_progress) {
-		/*
-		 * If checkpoint is in progress, force dumping
-		 * all in-memory data that need to be included
-		 * in the snapshot.
-		 */
-		if (scheduler->generation < scheduler->checkpoint_generation)
-			goto trigger_dump;
-		/*
-		 * Do not trigger another dump until checkpoint
-		 * is complete so as to make sure no statements
-		 * inserted after WAL rotation are written to
-		 * the snapshot.
-		 */
-		return false;
-	}
-
-	if (!vy_quota_is_exceeded(&scheduler->env->quota)) {
-		/*
-		 * Memory consumption is below the watermark,
-		 * nothing to do.
-		 */
-		return false;
-	}
-
-	if (lsregion_used(&scheduler->env->allocator) == 0) {
-		/*
-		 * Quota must be exceeded by a pending transaction,
-		 * there's nothing we can do about that.
-		 */
-		return false;
-	}
-
-trigger_dump:
-	vy_scheduler_trigger_dump(scheduler);
-	return true;
-}
-
-static void
-vy_scheduler_add_mem(struct vy_scheduler *scheduler, struct vy_mem *mem)
-{
-	assert(mem->generation <= scheduler->generation);
-	assert(rlist_empty(&mem->in_dump_fifo));
-	rlist_add_entry(&scheduler->dump_fifo, mem, in_dump_fifo);
-}
-
-static void
-vy_scheduler_remove_mem(struct vy_scheduler *scheduler, struct vy_mem *mem)
-{
-	assert(mem->generation <= scheduler->generation);
-	assert(!rlist_empty(&mem->in_dump_fifo));
-	rlist_del_entry(mem, in_dump_fifo);
-
-	if (mem->generation < scheduler->generation &&
-	    !vy_scheduler_dump_in_progress(scheduler)) {
-		/*
-		 * The last in-memory tree left from the previous
-		 * generation has just been deleted, complete dump.
-		 */
-		vy_scheduler_complete_dump(scheduler);
-	}
+	scheduler->dump_start = now;
 }
 
@@ -3619,8 +3623,8 @@ vy_begin_checkpoint(struct vy_env *env)
 		return -1;
 	}
 
+	vy_scheduler_trigger_dump(scheduler);
 	scheduler->checkpoint_in_progress = true;
-	scheduler->checkpoint_generation = scheduler->generation + 1;
 	ipc_cond_signal(&scheduler->scheduler_cond);
 	return 0;
 }
@@ -3636,11 +3640,10 @@
 	assert(scheduler->checkpoint_in_progress);
 
 	/*
-	 * Wait until all in-memory trees whose generation is
-	 * less than checkpoint_generation have been dumped.
+	 * Wait until all in-memory trees created before the
+	 * checkpoint started have been dumped.
 	 */
-	while (scheduler->generation < scheduler->checkpoint_generation ||
-	       vy_scheduler_dump_in_progress(scheduler)) {
+	while (scheduler->dump_generation < scheduler->generation) {
 		if (scheduler->is_throttled) {
 			/* A dump error occurred, abort checkpoint. */
 			assert(!diag_is_empty(&scheduler->diag));
@@ -4321,8 +4324,6 @@ vy_index_new(struct vy_env *e, struct index_def *user_index_def,
 	index->space_id = user_index_def->space_id;
 	index->id = user_index_def->iid;
 	index->opts = user_index_def->opts;
-
-	vy_scheduler_add_mem(scheduler, index->mem);
 	return index;
 
 fail_mem:
@@ -4504,8 +4505,6 @@ vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
 void
 vy_index_delete(struct vy_index *index)
 {
-	struct vy_scheduler *scheduler = index->env->scheduler;
-
 	assert(index->refs == 0);
 	assert(index->in_dump.pos == UINT32_MAX);
 	assert(index->in_compact.pos == UINT32_MAX);
@@ -4515,12 +4514,10 @@ vy_index_delete(struct vy_index *index)
 
 	/* Delete all in-memory trees. */
 	assert(index->mem != NULL);
-	vy_scheduler_remove_mem(scheduler, index->mem);
 	vy_mem_delete(index->mem);
 	while (!rlist_empty(&index->sealed)) {
 		struct vy_mem *mem = rlist_shift_entry(&index->sealed,
 						struct vy_mem, in_sealed);
-		vy_scheduler_remove_mem(scheduler, mem);
 		vy_mem_delete(mem);
 	}
 
diff --git a/src/box/vy_mem.c b/src/box/vy_mem.c
index d80a301c8449d252e6bb140ccc2f8f5290537318..3ad8c255c800371958cf369c7a45feaebbe1ab60 100644
--- a/src/box/vy_mem.c
+++ b/src/box/vy_mem.c
@@ -86,7 +86,6 @@ vy_mem_new(struct lsregion *allocator, int64_t generation,
 			       vy_mem_tree_extent_alloc,
 			       vy_mem_tree_extent_free, index);
 	rlist_create(&index->in_sealed);
-	rlist_create(&index->in_dump_fifo);
 	ipc_cond_create(&index->pin_cond);
 	return index;
 }
diff --git a/src/box/vy_mem.h b/src/box/vy_mem.h
index c305ad80040d4b1e0b2e583cee9c588f12ed56a7..3c193e48145b94ad8f9a022631c0a920e1c7f6e0 100644
--- a/src/box/vy_mem.h
+++ b/src/box/vy_mem.h
@@ -132,12 +132,6 @@ vy_mem_tree_cmp_key(const struct tuple *a, struct tree_mem_key *key,
 struct vy_mem {
 	/** Link in range->sealed list. */
 	struct rlist in_sealed;
-	/*
-	 * Link in scheduler->dump_fifo list. The mem is
-	 * added to the list when it has the first statement
-	 * allocated in it.
-	 */
-	struct rlist in_dump_fifo;
 	/** BPS tree */
 	struct vy_mem_tree tree;
 	/** Number of statements. */
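
Reviewer note: the sketch below is not part of the patch. It is a minimal, self-contained model of the dump-round state machine introduced above, handy for convincing yourself of the invariants. The names toy_scheduler, trigger_dump() and complete_dump() are hypothetical stand-ins for struct vy_scheduler, vy_scheduler_trigger_dump() and vy_scheduler_complete_dump(); the min_generation argument stands in for the generation of the oldest index in the scheduler's dump heap.

/* Standalone sketch; build with: cc -o toy toy.c */
#include <assert.h>
#include <stdint.h>
#include <stdio.h>

struct toy_scheduler {
	int64_t generation;      /* current generation */
	int64_t dump_generation; /* generation being dumped */
	int dump_task_count;     /* dump tasks in flight */
};

/* Models vy_scheduler_trigger_dump(): bumping @generation opens
 * (or extends) a dump round, since it makes
 * dump_generation < generation. */
static void
trigger_dump(struct toy_scheduler *s)
{
	assert(s->dump_generation <= s->generation);
	if (s->generation == s->dump_generation) {
		/* A new round starts here; the real code records
		 * dump_start for bandwidth accounting. */
	}
	s->generation++;
}

/* Models vy_scheduler_complete_dump(): a round is over once no
 * tasks are in flight and the oldest in-memory data (given here
 * by @min_generation) is newer than @dump_generation. */
static void
complete_dump(struct toy_scheduler *s, int64_t min_generation)
{
	assert(s->dump_generation < s->generation);
	if (s->dump_task_count > 0)
		return; /* dump tasks still running */
	if (min_generation == s->dump_generation)
		return; /* some index still holds old data */
	/* Real code: lsregion_gc(allocator, min_generation - 1),
	 * vy_quota_release(), ipc_cond_signal(&dump_cond). */
	s->dump_generation = min_generation;
}

int
main(void)
{
	struct toy_scheduler s = {0, 0, 0};
	trigger_dump(&s);                /* quota exceeded or checkpoint */
	assert(s.dump_generation < s.generation); /* round in progress */
	s.dump_task_count++;             /* vy_task_dump_new() */
	s.dump_task_count--;             /* vy_task_dump_complete() */
	complete_dump(&s, s.generation); /* all old trees are gone */
	assert(s.dump_generation == s.generation); /* round complete */
	printf("dump round complete at generation %lld\n",
	       (long long)s.generation);
	return 0;
}

The invariant to keep in mind is dump_generation <= generation: the two are equal exactly when no dump round is in progress, which is the condition vy_wait_checkpoint() now waits on.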