From 32da4d7cec893e9231af978c67abde6f37477581 Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Sat, 28 Oct 2017 17:11:41 +0300
Subject: [PATCH] vinyl: remove dependency of scheduler on environment

Instead of storing a pointer to vy_env in vy_scheduler, let's:

 - Add pointers to tx_manager::read_views and vy_env::run_env to
   vy_scheduler struct. They are needed to create a write iterator
   for a dump/compaction task.

 - Add a callback to struct vy_scheduler that is called upon dump
   completion to free memory. This allows us to eliminate accesses
   to vy_env::quota and vy_env::allocator from vy_scheduler code.

 - Move the assertion that ensures that the scheduler isn't started
   during local recovery from vy_scheduler_f() to the
   vy_env_quota_exceeded_cb() callback so that we don't need to access
   vy_env::status from the scheduler code. Note that after this change
   we have to set vy_env::status to VINYL_ONLINE before calling
   vy_quota_set_limit(), because the latter might schedule a dump.

 - Check if we have anything to dump from vy_begin_checkpoint() instead
   of vy_scheduler_begin_checkpoint().

This will allow us to isolate the scheduler code in a separate file.
---
 src/box/vinyl.c | 143 +++++++++++++++++++++++++++++-------------------
 1 file changed, 86 insertions(+), 57 deletions(-)

diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 0d53354b27..e33a10916b 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -385,9 +385,25 @@ vy_compact_heap_less(struct heap_node *a, struct heap_node *b)
 #undef HEAP_LESS
 #undef HEAP_NAME
 
+typedef void
+(*vy_scheduler_dump_complete_f)(int64_t dump_generation, double dump_duration,
+				void *arg);
+
 struct vy_scheduler {
+	/**
+	 * Function called by the scheduler upon dump round
+	 * completion. It is supposed to free memory released
+	 * by the dump.
+	 */
+	vy_scheduler_dump_complete_f dump_complete_cb;
+	/** Argument passed to dump_complete_cb(). */
+	void *cb_arg;
+	/** List of read views, see tx_manager::read_views. */
+	struct rlist *read_views;
+	/** Context needed for writing runs. */
+	struct vy_run_env *run_env;
+
 	pthread_mutex_t        mutex;
-	struct vy_env    *env;
 	heap_t dump_heap;
 	heap_t compact_heap;
 
@@ -460,8 +476,8 @@ struct vy_scheduler {
 	 *
 	 * If @dump_generation < @generation, the scheduler is dumping
 	 * in-memory trees created at @dump_generation. When all such
-	 * trees have been dumped, it bumps @dump_generation and frees
-	 * memory.
+	 * trees have been dumped, it bumps @dump_generation and calls
+	 * the dump_complete_cb(), which is supposed to free memory.
 	 *
 	 * If @dump_generation == @generation, dump have been completed
 	 * and the scheduler won't schedule a dump task until @generation
@@ -872,8 +888,6 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 		.abort = vy_task_dump_abort,
 	};
 
-	struct tx_manager *xm = scheduler->env->xm;
-
 	assert(!index->is_dropped);
 	assert(!index->is_dumping);
 	assert(index->pin_count == 0);
@@ -937,7 +951,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	bool is_last_level = (index->run_count == 0);
 	wi = vy_write_iterator_new(index->cmp_def, index->disk_format,
 				   index->upsert_format, index->id == 0,
-				   is_last_level, &xm->read_views);
+				   is_last_level, scheduler->read_views);
 	if (wi == NULL)
 		goto err_wi;
 	rlist_foreach_entry(mem, &index->sealed, in_sealed) {
@@ -1171,7 +1185,6 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 		.abort = vy_task_compact_abort,
 	};
 
-	struct tx_manager *xm = scheduler->env->xm;
 	struct heap_node *range_node;
 	struct vy_range *range;
 
@@ -1201,7 +1214,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	bool is_last_level = (range->compact_priority == range->slice_count);
 	wi = vy_write_iterator_new(index->cmp_def, index->disk_format,
 				   index->upsert_format, index->id == 0,
-				   is_last_level, &xm->read_views);
+				   is_last_level, scheduler->read_views);
 	if (wi == NULL)
 		goto err_wi;
 
@@ -1209,7 +1222,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index,
 	int n = range->compact_priority;
 	rlist_foreach_entry(slice, &range->slices, in_range) {
 		if (vy_write_iterator_new_slice(wi, slice,
-						&scheduler->env->run_env) != 0)
+				scheduler->run_env) != 0)
 			goto err_wi_sub;
 
 		task->max_output_count += slice->count.rows;
@@ -1286,7 +1299,9 @@ vy_scheduler_async_cb(ev_loop *loop, struct ev_async *watcher, int events)
 }
 
 static struct vy_scheduler *
-vy_scheduler_new(struct vy_env *env)
+vy_scheduler_new(int write_threads,
+		 vy_scheduler_dump_complete_f dump_complete_cb, void *cb_arg,
+		 struct vy_run_env *run_env, struct rlist *read_views)
 {
 	struct vy_scheduler *scheduler = calloc(1, sizeof(*scheduler));
 	if (scheduler == NULL) {
@@ -1294,10 +1309,14 @@ vy_scheduler_new(struct vy_env *env)
 			 "struct");
 		return NULL;
 	}
+	scheduler->dump_complete_cb = dump_complete_cb;
+	scheduler->cb_arg = cb_arg;
+	scheduler->read_views = read_views;
+	scheduler->run_env = run_env;
+	scheduler->worker_pool_size = write_threads;
 	tt_pthread_mutex_init(&scheduler->mutex, NULL);
 	diag_create(&scheduler->diag);
 	fiber_cond_create(&scheduler->dump_cond);
-	scheduler->env = env;
 	vy_compact_heap_create(&scheduler->compact_heap);
 	vy_dump_heap_create(&scheduler->dump_heap);
 	tt_pthread_cond_init(&scheduler->worker_cond, NULL);
@@ -1561,7 +1580,6 @@ static int
 vy_scheduler_f(va_list va)
 {
 	struct vy_scheduler *scheduler = va_arg(va, struct vy_scheduler *);
-	MAYBE_UNUSED struct vy_env *env = scheduler->env;
 
 	/*
 	 * Yield immediately, until the quota watermark is reached
@@ -1575,21 +1593,6 @@ vy_scheduler_f(va_list va)
 	if (scheduler->scheduler == NULL)
 		return 0; /* destroyed */
 
-	/*
-	 * The scheduler must be disabled during local recovery so as
-	 * not to distort data stored on disk. Not that we really need
-	 * it anyway, because the memory footprint is limited by the
-	 * memory limit from the previous run.
-	 *
-	 * On the contrary, remote recovery does require the scheduler
-	 * to be up and running, because the amount of data received
-	 * when bootstrapping from a remote master is only limited by
-	 * its disk size, which can exceed the size of available
-	 * memory by orders of magnitude.
-	 */
-	assert(env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
-	       env->status != VINYL_FINAL_RECOVERY_LOCAL);
-
 	vy_scheduler_start_workers(scheduler);
 
 	while (scheduler->scheduler != NULL) {
@@ -1734,7 +1737,6 @@ vy_scheduler_start_workers(struct vy_scheduler *scheduler)
 
 	/* Start worker threads */
 	scheduler->is_worker_pool_running = true;
-	scheduler->worker_pool_size = scheduler->env->write_threads;
 	/* One thread is reserved for dumps, see vy_schedule(). */
 	assert(scheduler->worker_pool_size >= 2);
 	scheduler->workers_available = scheduler->worker_pool_size;
@@ -1772,7 +1774,6 @@ vy_scheduler_stop_workers(struct vy_scheduler *scheduler)
 	ev_async_stop(scheduler->loop, &scheduler->scheduler_async);
 	free(scheduler->worker_pool);
 	scheduler->worker_pool = NULL;
-	scheduler->worker_pool_size = 0;
 
 	/* Abort all pending tasks. */
 	struct vy_task *task, *next;
@@ -1843,28 +1844,16 @@ vy_scheduler_complete_dump(struct vy_scheduler *scheduler)
 
 	/*
 	 * The oldest index data is newer than @dump_generation,
-	 * so the current dump round has been finished.
-	 * Free memory, release quota, and signal dump completion.
+	 * so the current dump round has been finished. Notify
+	 * about dump completion.
 	 */
-	struct lsregion *allocator = &scheduler->env->stmt_env.allocator;
-	struct vy_quota *quota = &scheduler->env->quota;
-	size_t mem_used_before = lsregion_used(allocator);
-	lsregion_gc(allocator, min_generation - 1);
-	size_t mem_used_after = lsregion_used(allocator);
-	assert(mem_used_after <= mem_used_before);
-	size_t mem_dumped = mem_used_before - mem_used_after;
-	vy_quota_release(quota, mem_dumped);
-
-	scheduler->dump_generation = min_generation;
-	fiber_cond_signal(&scheduler->dump_cond);
-
-	/* Account dump bandwidth. */
-	struct vy_stat *stat = scheduler->env->stat;
 	ev_tstamp now = ev_monotonic_now(loop());
 	ev_tstamp dump_duration = now - scheduler->dump_start;
-	if (dump_duration > 0)
-		histogram_collect(stat->dump_bw, mem_dumped / dump_duration);
 	scheduler->dump_start = now;
+	scheduler->dump_generation = min_generation;
+	scheduler->dump_complete_cb(min_generation - 1, dump_duration,
+				    scheduler->cb_arg);
+	fiber_cond_signal(&scheduler->dump_cond);
 }
 
 /**
@@ -1876,14 +1865,6 @@ vy_scheduler_begin_checkpoint(struct vy_scheduler *scheduler)
 {
 	assert(!scheduler->checkpoint_in_progress);
 
-	/*
-	 * The scheduler starts worker threads upon the first wakeup.
-	 * To avoid starting the threads for nothing, do not wake it
-	 * up if Vinyl is not used.
-	 */
-	if (lsregion_used(&scheduler->env->stmt_env.allocator) == 0)
-		return 0;
-
 	/*
 	 * If the scheduler is throttled due to errors, do not wait
 	 * until it wakes up as it may take quite a while. Instead
@@ -3837,6 +3818,22 @@ static void
 vy_env_quota_exceeded_cb(struct vy_quota *quota)
 {
 	struct vy_env *env = container_of(quota, struct vy_env, quota);
+
+	/*
+	 * The scheduler must be disabled during local recovery so as
+	 * not to distort data stored on disk. Not that we really need
+	 * it anyway, because the memory footprint is limited by the
+	 * memory limit from the previous run.
+	 *
+	 * On the contrary, remote recovery does require the scheduler
+	 * to be up and running, because the amount of data received
+	 * when bootstrapping from a remote master is only limited by
+	 * its disk size, which can exceed the size of available
+	 * memory by orders of magnitude.
+	 */
+	assert(env->status != VINYL_INITIAL_RECOVERY_LOCAL &&
+	       env->status != VINYL_FINAL_RECOVERY_LOCAL);
+
 	if (lsregion_used(&env->stmt_env.allocator) == 0) {
 		/*
 		 * The memory limit has been exceeded, but there's
@@ -3849,6 +3846,28 @@ vy_env_quota_exceeded_cb(struct vy_quota *quota)
 	vy_scheduler_trigger_dump(env->scheduler);
 }
 
+static void
+vy_env_dump_complete_cb(int64_t dump_generation, double dump_duration,
+			void *arg)
+{
+	struct vy_env *env = arg;
+
+	/* Free memory up to dump_generation and release the quota. */
+	struct lsregion *allocator = &env->stmt_env.allocator;
+	struct vy_quota *quota = &env->quota;
+	size_t mem_used_before = lsregion_used(allocator);
+	lsregion_gc(allocator, dump_generation);
+	size_t mem_used_after = lsregion_used(allocator);
+	assert(mem_used_after <= mem_used_before);
+	size_t mem_dumped = mem_used_before - mem_used_after;
+	vy_quota_release(quota, mem_dumped);
+
+	/* Account dump bandwidth: memory freed divided by dump duration. */
+	if (dump_duration > 0)
+		histogram_collect(env->stat->dump_bw,
+				  mem_dumped / dump_duration);
+}
+
 static struct vy_squash_queue *
 vy_squash_queue_new(void);
 static void
@@ -3884,7 +3903,9 @@ vy_env_new(const char *path, size_t memory, size_t cache, int read_threads,
 	e->stat = vy_stat_new();
 	if (e->stat == NULL)
 		goto error_stat;
-	e->scheduler = vy_scheduler_new(e);
+	e->scheduler = vy_scheduler_new(e->write_threads,
+					vy_env_dump_complete_cb, e,
+					&e->run_env, &e->xm->read_views);
 	if (e->scheduler == NULL)
 		goto error_sched;
 	e->squash_queue = vy_squash_queue_new();
@@ -3969,6 +3990,13 @@ int
 vy_begin_checkpoint(struct vy_env *env)
 {
 	assert(env->status == VINYL_ONLINE);
+	/*
+	 * The scheduler starts worker threads upon the first wakeup.
+	 * To avoid starting the threads for nothing, do not wake it
+	 * up if Vinyl is not used.
+	 */
+	if (lsregion_used(&env->stmt_env.allocator) == 0)
+		return 0;
 	if (vy_scheduler_begin_checkpoint(env->scheduler) != 0)
 		return -1;
 	return 0;
@@ -4071,9 +4099,11 @@ vy_end_recovery(struct vy_env *e)
 		vy_recovery_delete(e->recovery);
 		e->recovery = NULL;
 		e->recovery_vclock = NULL;
+		e->status = VINYL_ONLINE;
 		vy_quota_set_limit(&e->quota, e->memory);
 		break;
 	case VINYL_FINAL_RECOVERY_REMOTE:
+		e->status = VINYL_ONLINE;
 		break;
 	default:
 		unreachable();
@@ -4085,7 +4115,6 @@ vy_end_recovery(struct vy_env *e)
 	 */
 	if (e->index_env.index_count > 0)
 		vy_run_env_enable_coio(&e->run_env, e->read_threads);
-	e->status = VINYL_ONLINE;
 	return 0;
 }
 
-- 
GitLab