diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 0d53354b275bc1afaebf55a3e9db867db71537ff..e33a10916bb2a628d6bb8b6d10ef78c9f1edcec0 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -385,9 +385,25 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_LESS
 #undef HEAP_NAME
 
+typedef void
+(*vy_scheduler_dump_complete_f)(int64_t dump_generation, double dump_duration,
+				void *arg);
+
 struct vy_scheduler {
+	/**
+	 * Function called by the scheduler upon dump round
+	 * completion. It is supposed to free memory released
+	 * by the dump.
+	 */
+	vy_scheduler_dump_complete_f dump_complete_cb;
+	/** Argument passed to dump_complete_cb(). */
+	void *cb_arg;
+	/** List of read views, see tx_manager::read_views. */
+	struct rlist *read_views;
+	/** Context needed for writing runs. */
+	struct vy_run_env *run_env;
 	pthread_mutex_t mutex;
-	struct vy_env *env;
 	heap_t dump_heap;
 	heap_t compact_heap;
@@ -460,8 +476,8 @@ struct vy_scheduler {
	 *
	 * If @dump_generation < @generation, the scheduler is dumping
	 * in-memory trees created at @dump_generation. When all such
-	 * trees have been dumped, it bumps @dump_generation and frees
-	 * memory.
+	 * trees have been dumped, it bumps @dump_generation and calls
+	 * the dump_complete_cb(), which is supposed to free memory.
	 *
	 * If @dump_generation == @generation, dump have been completed
	 * and the scheduler won't schedule a dump task until @generation
@@ -872,8 +888,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 		.abort = vy_task_dump_abort,
 	};
 
-	struct tx_manager *xm = scheduler->env->xm;
-
 	assert(!index->is_dropped);
 	assert(!index->is_dumping);
 	assert(index->pin_count == 0);
@@ -937,7 +951,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	bool is_last_level = (index->run_count == 0);
 	wi = vy_write_iterator_new(index->cmp_def, index->disk_format,
 				   index->upsert_format, index->id == 0,
-				   is_last_level, &xm->read_views);
+				   is_last_level, scheduler->read_views);
 	if (wi == NULL)
 		goto err_wi;
 	rlist_foreach_entry(mem, &index->sealed, in_sealed) {
@@ -1171,7 +1185,6 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 		.abort = vy_task_compact_abort,
 	};
 
-	struct tx_manager *xm = scheduler->env->xm;
 	struct heap_node *range_node;
 	struct vy_range *range;
 
@@ -1201,7 +1214,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	bool is_last_level = (range->compact_priority == range->slice_count);
 	wi = vy_write_iterator_new(index->cmp_def, index->disk_format,
 				   index->upsert_format, index->id == 0,
-				   is_last_level, &xm->read_views);
+				   is_last_level, scheduler->read_views);
 	if (wi == NULL)
 		goto err_wi;
 
@@ -1209,7 +1222,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 
 	int n = range->compact_priority;
 	rlist_foreach_entry(slice, &range->slices, in_range) {
 		if (vy_write_iterator_new_slice(wi, slice,
-						&scheduler->env->run_env) != 0)
+						scheduler->run_env) != 0)
 			goto err_wi_sub;
 		task->max_output_count += slice->count.rows;
@@ -1286,7 +1299,9 @@ vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
 }
 
 static struct vy_scheduler *
-vy_scheduler_new(struct vy_env *env)
+vy_scheduler_new(int write_threads,
+		 vy_scheduler_dump_complete_f dump_complete_cb, void *cb_arg,
+		 struct vy_run_env *run_env, struct rlist *read_views)
 {
 	struct vy_scheduler *scheduler = calloc(1, sizeof(*scheduler));
 	if (scheduler == NULL) {
@@ -1294,10 +1309,14 @@ vy_scheduler_new(struct vy_env *env)
 			 "struct");
 		return NULL;
 	}
+	scheduler->dump_complete_cb = dump_complete_cb;
+	scheduler->cb_arg = cb_arg;
+	scheduler->read_views = read_views;
+	scheduler->run_env = run_env;
+	scheduler->worker_pool_size = write_threads;
 	tt_pthread_mutex_init(&scheduler->mutex, NULL);
 	diag_create(&scheduler->diag);
 	fiber_cond_create(&scheduler->dump_cond);
-	scheduler->env = env;
 	vy_compact_heap_create(&scheduler->compact_heap);
 	vy_dump_heap_create(&scheduler->dump_heap);
 	tt_pthread_cond_init(&scheduler->worker_cond, NULL);
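The constructor change above is the heart of the patch: instead of reaching into struct vy_env, the scheduler now receives everything it needs as explicit parameters, with dump completion handled through an injected callback. A minimal, self-contained sketch of this callback-plus-opaque-argument idiom (all names below are hypothetical, not the actual vinyl API):

```c
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

/* Same shape as vy_scheduler_dump_complete_f, hypothetical names. */
typedef void
(*dump_complete_f)(int64_t generation, double duration, void *arg);

struct scheduler {
	dump_complete_f dump_complete_cb; /* injected by the owner */
	void *cb_arg;                     /* passed back verbatim */
};

/*
 * The constructor takes the callback and an opaque argument, so
 * the scheduler never needs to know the owner's type.
 */
static struct scheduler *
scheduler_new(dump_complete_f cb, void *cb_arg)
{
	struct scheduler *s = calloc(1, sizeof(*s));
	if (s == NULL)
		return NULL;
	s->dump_complete_cb = cb;
	s->cb_arg = cb_arg;
	return s;
}

/* Owner-side state and callback implementation. */
struct owner {
	size_t mem_used;
};

static void
owner_dump_complete(int64_t generation, double duration, void *arg)
{
	struct owner *o = arg;
	printf("generation %lld dumped in %.2fs, freeing %zu bytes\n",
	       (long long)generation, duration, o->mem_used);
	o->mem_used = 0;
}

int
main(void)
{
	struct owner o = { .mem_used = 1024 };
	struct scheduler *s = scheduler_new(owner_dump_complete, &o);
	if (s == NULL)
		return 1;
	/* The scheduler fires the callback without any owner headers. */
	s->dump_complete_cb(41, 0.5, s->cb_arg);
	free(s);
	return 0;
}
```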
@@ -1561,7 +1580,6 @@ static int
 vy_scheduler_f(va_list va)
 {
 	struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *);
-	MAYBE_UNUSED struct vy_env *env = scheduler->env;
 
 	/*
 	 * Yield immediately, until the quota watermark is reached
@@ -1575,21 +1593,6 @@ vy_scheduler_f(va_list va)
 	if (scheduler->scheduler == NULL)
 		return 0; /* destroyed */
 
-	/*
-	 * The scheduler must be disabled during local recovery so as
-	 * not to distort data stored on disk. Not that we really need
-	 * it anyway, because the memory footprint is limited by the
-	 * memory limit from the previous run.
-	 *
-	 * On the contrary, remote recovery does require the scheduler
-	 * to be up and running, because the amount of data received
-	 * when bootstrapping from a remote master is only limited by
-	 * its disk size, which can exceed the size of available
-	 * memory by orders of magnitude.
-	 */
-	assert(env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
-	       env->status != VINYL_FINAL_RECOVERY_LOCAL);
-
 	vy_scheduler_start_workers(scheduler);
 
 	while (scheduler->scheduler != NULL) {
@@ -1734,7 +1737,6 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 
 	/* Start worker threads */
 	scheduler->is_worker_pool_running = true;
-	scheduler->worker_pool_size = scheduler->env->write_threads;
 	/* One thread is reserved for dumps, see vy_schedule(). */
 	assert(scheduler->worker_pool_size >= 2);
 	scheduler->workers_available = scheduler->worker_pool_size;
@@ -1772,7 +1774,6 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
 	ev_async_stop(scheduler->loop, &scheduler->scheduler_async);
 	free(scheduler->worker_pool);
 	scheduler->worker_pool = NULL;
-	scheduler->worker_pool_size = 0;
 
 	/* Abort all pending tasks. */
 	struct vy_task *task, *next;
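The next hunk rewires vy_scheduler_complete_dump(): rather than freeing lsregion memory itself, the scheduler reports min_generation - 1 through the callback and leaves reclamation to the owner. To illustrate the contract the callback must satisfy, free everything allocated at or before the reported generation, here is a toy generational allocator in the spirit of lsregion (hypothetical, not the real lsregion API):

```c
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>

/* Every block remembers the generation it was allocated in. */
struct gen_block {
	struct gen_block *next;
	int64_t generation;
	size_t size;
};

struct gen_allocator {
	struct gen_block *head; /* singly linked, newest first */
	size_t used;            /* what lsregion_used() would report */
};

static void *
gen_alloc(struct gen_allocator *a, size_t size, int64_t generation)
{
	struct gen_block *b = malloc(sizeof(*b) + size);
	if (b == NULL)
		return NULL;
	b->generation = generation;
	b->size = size;
	b->next = a->head;
	a->head = b;
	a->used += size;
	return b + 1;
}

/*
 * Free everything allocated at or before @generation: the same
 * contract vy_env_dump_complete_cb() fulfills via lsregion_gc().
 * The scheduler passes min_generation - 1, so trees still being
 * dumped (at min_generation) are left intact.
 */
static void
gen_gc(struct gen_allocator *a, int64_t generation)
{
	struct gen_block **pb = &a->head;
	while (*pb != NULL) {
		struct gen_block *b = *pb;
		if (b->generation <= generation) {
			*pb = b->next;
			a->used -= b->size;
			free(b);
		} else {
			pb = &b->next;
		}
	}
}

int
main(void)
{
	struct gen_allocator a = { NULL, 0 };
	gen_alloc(&a, 100, 1);
	gen_alloc(&a, 200, 2);
	gen_gc(&a, 1);          /* drops the generation-1 block */
	assert(a.used == 200);
	gen_gc(&a, 2);
	assert(a.used == 0);
	return 0;
}
```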
@@ -1843,28 +1844,16 @@ vy_scheduler_complete_dump(struct vy_scheduler *scheduler)
 
 	/*
 	 * The oldest index data is newer than @dump_generation,
-	 * so the current dump round has been finished.
-	 * Free memory, release quota, and signal dump completion.
+	 * so the current dump round has been finished. Notify
+	 * about dump completion.
 	 */
-	struct lsregion *allocator = &scheduler->env->stmt_env.allocator;
-	struct vy_quota *quota = &scheduler->env->quota;
-	size_t mem_used_before = lsregion_used(allocator);
-	lsregion_gc(allocator, min_generation - 1);
-	size_t mem_used_after = lsregion_used(allocator);
-	assert(mem_used_after <= mem_used_before);
-	size_t mem_dumped = mem_used_before - mem_used_after;
-	vy_quota_release(quota, mem_dumped);
-
-	scheduler->dump_generation = min_generation;
-	fiber_cond_signal(&scheduler->dump_cond);
-
-	/* Account dump bandwidth. */
-	struct vy_stat *stat = scheduler->env->stat;
 	ev_tstamp now = ev_monotonic_now(loop());
 	ev_tstamp dump_duration = now - scheduler->dump_start;
-	if (dump_duration > 0)
-		histogram_collect(stat->dump_bw, mem_dumped / dump_duration);
 	scheduler->dump_start = now;
+	scheduler->dump_generation = min_generation;
+	scheduler->dump_complete_cb(min_generation - 1, dump_duration,
+				    scheduler->cb_arg);
+	fiber_cond_signal(&scheduler->dump_cond);
 }
 
 /**
@@ -1876,14 +1865,6 @@ vy_scheduler_begin_checkpoint(struct vy_scheduler *scheduler)
 {
 	assert(!scheduler->checkpoint_in_progress);
 
-	/*
-	 * The scheduler starts worker threads upon the first wakeup.
-	 * To avoid starting the threads for nothing, do not wake it
-	 * up if Vinyl is not used.
-	 */
-	if (lsregion_used(&scheduler->env->stmt_env.allocator) == 0)
-		return 0;
-
 	/*
 	 * If the scheduler is throttled due to errors, do not wait
 	 * until it wakes up as it may take quite a while. Instead
@@ -3837,6 +3818,22 @@ static void
 vy_env_quota_exceeded_cb(struct vy_quota *quota)
 {
 	struct vy_env *env = container_of(quota, struct vy_env, quota);
+
+	/*
+	 * The scheduler must be disabled during local recovery so as
+	 * not to distort data stored on disk. Not that we really need
+	 * it anyway, because the memory footprint is limited by the
+	 * memory limit from the previous run.
+	 *
+	 * On the contrary, remote recovery does require the scheduler
+	 * to be up and running, because the amount of data received
+	 * when bootstrapping from a remote master is only limited by
+	 * its disk size, which can exceed the size of available
+	 * memory by orders of magnitude.
+	 */
+	assert(env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
+	       env->status != VINYL_FINAL_RECOVERY_LOCAL);
+
 	if (lsregion_used(&env->stmt_env.allocator) == 0) {
 		/*
 		 * The memory limit has been exceeded, but there's
@@ -3849,6 +3846,28 @@ vy_env_quota_exceeded_cb(struct vy_quota *quota)
 	vy_scheduler_trigger_dump(env->scheduler);
 }
 
+static void
+vy_env_dump_complete_cb(int64_t dump_generation, double dump_duration,
+			void *arg)
+{
+	struct vy_env *env = arg;
+
+	/* Free memory and release quota. */
+	struct lsregion *allocator = &env->stmt_env.allocator;
+	struct vy_quota *quota = &env->quota;
+	size_t mem_used_before = lsregion_used(allocator);
+	lsregion_gc(allocator, dump_generation);
+	size_t mem_used_after = lsregion_used(allocator);
+	assert(mem_used_after <= mem_used_before);
+	size_t mem_dumped = mem_used_before - mem_used_after;
+	vy_quota_release(quota, mem_dumped);
+
+	/* Account dump bandwidth. */
+	if (dump_duration > 0)
+		histogram_collect(env->stat->dump_bw,
+				  mem_dumped / dump_duration);
+}
+
 static struct vy_squash_queue *
 vy_squash_queue_new(void);
 static void
@@ -3884,7 +3903,9 @@ vy_env_new(const char *path, size_t memory, size_t cache, int read_threads,
 	e->stat = vy_stat_new();
 	if (e->stat == NULL)
 		goto error_stat;
-	e->scheduler = vy_scheduler_new(e);
+	e->scheduler = vy_scheduler_new(e->write_threads,
+					vy_env_dump_complete_cb, e,
+					&e->run_env, &e->xm->read_views);
 	if (e->scheduler == NULL)
 		goto error_sched;
 	e->squash_queue = vy_squash_queue_new();
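vy_env_dump_complete_cb() above derives the dumped byte count from the allocator footprint before and after garbage collection, then feeds the ratio to the dump-bandwidth histogram. The arithmetic is worth spelling out; a stripped-down sketch with plain variables in place of the lsregion, quota, and histogram calls (names hypothetical):

```c
#include <stddef.h>
#include <stdio.h>

/*
 * Sketch of the accounting in vy_env_dump_complete_cb(): the
 * number of bytes freed by the dump, divided by how long the
 * dump round took, estimates dump bandwidth in bytes per second.
 */
static double
dump_bandwidth(size_t mem_used_before, size_t mem_used_after,
	       double dump_duration)
{
	size_t mem_dumped = mem_used_before - mem_used_after;
	/* vy_quota_release(quota, mem_dumped) would go here. */
	if (dump_duration <= 0)
		return 0; /* guard against a zero-length round */
	return mem_dumped / dump_duration;
}

int
main(void)
{
	/* 96 MiB freed in 1.5 s gives 64 MiB/s of dump bandwidth. */
	double bw = dump_bandwidth(100u << 20, 4u << 20, 1.5);
	printf("dump bandwidth: %.0f MiB/s\n", bw / (1 << 20));
	return 0;
}
```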
@@ -3969,6 +3990,13 @@ int
 vy_begin_checkpoint(struct vy_env *env)
 {
 	assert(env->status == VINYL_ONLINE);
+	/*
+	 * The scheduler starts worker threads upon the first wakeup.
+	 * To avoid starting the threads for nothing, do not wake it
+	 * up if Vinyl is not used.
+	 */
+	if (lsregion_used(&env->stmt_env.allocator) == 0)
+		return 0;
 	if (vy_scheduler_begin_checkpoint(env->scheduler) != 0)
 		return -1;
 	return 0;
 }
@@ -4071,9 +4099,11 @@ vy_end_recovery(struct vy_env *e)
 		vy_recovery_delete(e->recovery);
 		e->recovery = NULL;
 		e->recovery_vclock = NULL;
+		e->status = VINYL_ONLINE;
 		vy_quota_set_limit(&e->quota, e->memory);
 		break;
 	case VINYL_FINAL_RECOVERY_REMOTE:
+		e->status = VINYL_ONLINE;
 		break;
 	default:
 		unreachable();
@@ -4085,7 +4115,6 @@ vy_end_recovery(struct vy_env *e)
 	 */
 	if (e->index_env.index_count > 0)
 		vy_run_env_enable_coio(&e->run_env, e->read_threads);
-	e->status = VINYL_ONLINE;
 	return 0;
 }
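Taken together, the patch keeps the dump-round bookkeeping inside the scheduler while the owner reacts through the callback. The control flow of vy_scheduler_complete_dump() after the change can be condensed to the following sketch (hypothetical names, no fibers, heaps, or timing):

```c
#include <stdint.h>
#include <stdio.h>

typedef void
(*dump_complete_f)(int64_t dump_generation, double dump_duration, void *arg);

struct sched {
	int64_t generation;      /* newest generation in memory */
	int64_t dump_generation; /* generation being dumped */
	dump_complete_f cb;
	void *cb_arg;
};

/*
 * Called when a dump task finishes; @min_generation is the oldest
 * generation still present in memory. Mirrors the control flow of
 * vy_scheduler_complete_dump() after the patch: bump the dump
 * generation first, then let the owner reclaim memory up to and
 * including the previous one.
 */
static void
complete_dump(struct sched *s, int64_t min_generation)
{
	if (min_generation <= s->dump_generation)
		return; /* some trees of this round are still in memory */
	s->dump_generation = min_generation;
	s->cb(min_generation - 1, /*dump_duration=*/0.0, s->cb_arg);
}

static void
on_complete(int64_t gen, double duration, void *arg)
{
	(void)duration;
	(void)arg;
	printf("dump round done, gc up to generation %lld\n",
	       (long long)gen);
}

int
main(void)
{
	struct sched s = { .generation = 2, .dump_generation = 1,
			   .cb = on_complete, .cb_arg = NULL };
	complete_dump(&s, 1); /* oldest tree still at 1: round not over */
	complete_dump(&s, 2); /* all generation-1 trees dumped: gc <= 1 */
	return 0;
}
```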