diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 4c84039ff32201b492621066e137d2f79a4bb1e0..c18d59c1129d7b829e99aa04490f3b12cfac7e3e 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -176,23 +176,6 @@ vy_buf_advance(struct vy_buf *b, size_t size)
 	b->p += size;
 }
 
-static int
-vy_buf_add(struct vy_buf *b, void *buf, size_t size)
-{
-	int rc = vy_buf_ensure(b, size);
-	if (unlikely(rc == -1))
-		return -1;
-	memcpy(b->p, buf, size);
-	vy_buf_advance(b, size);
-	return 0;
-}
-
-static int
-vy_buf_in(struct vy_buf *b, void *v) {
-	assert(b->s != NULL);
-	return (char*)v >= b->s && (char*)v < b->p;
-}
-
 static void*
 vy_buf_at(struct vy_buf *b, int size, int i) {
 	return b->s + size * i;
@@ -248,46 +231,6 @@ struct vy_iter {
 #define vy_iter_get(i) (i)->vif->get(i)
 #define vy_iter_next(i) (i)->vif->next(i)
 
-struct vy_bufiter {
-	struct vy_buf *buf;
-	int vsize;
-	void *v;
-};
-
-static void
-vy_bufiter_open(struct vy_bufiter *bi, struct vy_buf *buf, int vsize)
-{
-	bi->buf = buf;
-	bi->vsize = vsize;
-	bi->v = bi->buf->s;
-	if (bi->v != NULL && ! vy_buf_in(bi->buf, bi->v))
-		bi->v = NULL;
-}
-
-static int
-vy_bufiter_has(struct vy_bufiter *bi)
-{
-	return bi->v != NULL;
-}
-
-static void*
-vy_bufiterref_get(struct vy_bufiter *bi)
-{
-	if (unlikely(bi->v == NULL))
-		return NULL;
-	return *(void**)bi->v;
-}
-
-static void
-vy_bufiter_next(struct vy_bufiter *bi)
-{
-	if (unlikely(bi->v == NULL))
-		return;
-	bi->v = (char*)bi->v + bi->vsize;
-	if (unlikely(! vy_buf_in(bi->buf, bi->v)))
-		bi->v = NULL;
-}
-
 struct vy_avg {
 	uint64_t count;
 	uint64_t total;
@@ -828,17 +771,18 @@ struct vy_range {
 	uint32_t   used; /* sum of mem->used */
 	struct vy_run  *run;
 	uint32_t   run_count;
+	struct vy_mem *mem;
 	uint32_t   mem_count;
+	/** Number of times the range was compacted. */
+	int        merge_count;
 	uint32_t   temperature;
 	uint64_t   temperature_reads;
-	struct vy_mem *mem;
 	/** The file where the run is stored or -1 if it's not dumped yet. */
 	int fd;
 	char path[PATH_MAX];
 	rb_node(struct vy_range) tree_node;
 	struct heap_node   nodecompact;
 	struct heap_node   nodedump;
-	struct rlist     split;
 	uint32_t range_version;
 };
 
@@ -1584,10 +1528,6 @@ vy_run_unload_page(struct vy_run *run, uint32_t pos)
 	pthread_mutex_unlock(&run->cache_lock);
 }
 
-static struct vy_range *vy_range_new(struct vy_index *index);
-static int vy_range_create(struct vy_range*, struct vy_index*);
-static int vy_range_delete(struct vy_range*);
-
 static int64_t
 vy_range_mem_min_lsn(struct vy_range *range)
 {
@@ -1651,6 +1591,8 @@ rb_gen_ext_key(, vy_range_tree_, vy_range_tree_t, struct vy_range, tree_node,
 		 vy_range_tree_cmp, struct vy_range_tree_key *,
 		 vy_range_tree_key_cmp);
 
+static int vy_range_delete(struct vy_range *);
+
 struct vy_range *
 vy_range_tree_free_cb(vy_range_tree_t *t, struct vy_range *range, void *arg)
 {
@@ -1772,12 +1714,10 @@ vy_range_iterator_next(struct vy_range_iterator *ii)
 	}
 }
 
-static int
-vy_index_add_range(struct vy_index *index, struct vy_range *range)
+static void
+vy_range_update_min_key(struct vy_range *range)
 {
-	/*
-	 * Find minimal key for the range
-	 */
+	struct vy_index *index = range->index;
 
 	/* Check disk runs */
 	const char *min_key = NULL;
@@ -1810,16 +1750,22 @@ vy_index_add_range(struct vy_index *index, struct vy_range *range)
 		range->min_key = vy_tuple_extract_key_raw(index, min_key);
 	}
 	if (range->min_key == NULL)
-		return -1;
+		panic("failed to allocate range->min_key");
+}
+
+static void
+vy_index_add_range(struct vy_index *index, struct vy_range *range)
+{
+	if (range->min_key == NULL)
+		vy_range_update_min_key(range);
 
 	/* Add to range tree */
 	vy_range_tree_insert(&index->tree, range);
 	index->range_index_version++;
 	index->range_count++;
-	return 0;
 }
 
-static int
+static void
 vy_index_remove_range(struct vy_index *index, struct vy_range *range)
 {
 	vy_range_tree_remove(&index->tree, range);
@@ -1829,7 +1775,6 @@ vy_index_remove_range(struct vy_index *index, struct vy_range *range)
 	assert (range->min_key != NULL);
 	vy_tuple_unref(range->min_key);
 	range->min_key = NULL;
-	return 0;
 }
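Worth spelling out: with these hunks vy_index_add_range() and
vy_index_remove_range() can no longer fail, and min_key follows an
explicit ownership protocol. A summary sketch using only this patch's
helpers (comment only, nothing here is new API):

```c
/*
 * min_key lifecycle under the new contract:
 *
 *   vy_index_add_range():    computes range->min_key lazily via
 *                            vy_range_update_min_key(), which panics
 *                            on allocation failure;
 *   vy_index_remove_range(): unrefs range->min_key and clears it.
 *
 * vy_range_compact_prepare() below relies on this: it takes its own
 * reference to min_key before removing the range, then hands that
 * reference to the first of the new ranges it creates.
 */
```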
 
 /* dump tuple to the run page buffers (tuple header and data) */
@@ -1910,12 +1855,17 @@ vy_write_iterator_delete(struct vy_write_iterator *wi);
 /**
  * Write tuples from the iterator to a new page in the run,
  * update page and run statistics.
+ *
+ * Returns -1 on error, 0 if there is more data for this run, or
+ * 1 if the run is complete (iterator exhausted or split key reached).
  */
 static int
-vy_run_write_page(int fd, struct vy_write_iterator *wi, uint32_t page_size,
-		  struct vy_run_index *run_index)
+vy_run_write_page(int fd, struct vy_write_iterator *wi,
+		  struct vy_tuple *split_key, struct key_def *key_def,
+		  uint32_t page_size, struct vy_run_index *run_index)
 {
 	struct vy_run_info *run_info = &run_index->info;
+	bool run_done = false;
 
 	struct vy_buf tuplesinfo, values, compressed;
 	vy_buf_create(&tuplesinfo);
@@ -1931,14 +1881,23 @@ vy_run_write_page(int fd, struct vy_write_iterator *wi, uint32_t page_size,
 	page->min_lsn = INT64_MAX;
 	page->offset = run_info->offset + run_info->size;
 
-	struct vy_tuple *tuple;
-	int rc = vy_write_iterator_get(wi, &tuple);
-	while (rc == 0 && (vy_buf_used(&values) < page_size)) {
-		rc = vy_run_dump_tuple(tuple, &tuplesinfo, &values, page);
-		if (rc)
+	while (true) {
+		struct vy_tuple *tuple;
+		if (vy_write_iterator_get(wi, &tuple) != 0) {
+			run_done = true; /* no more data */
+			break;
+		}
+		if (split_key != NULL && vy_tuple_compare(tuple->data,
+					split_key->data, key_def) >= 0) {
+			/* Split key reached, proceed to the next run. */
+			run_done = true;
+			break;
+		}
+		if (vy_buf_used(&values) >= page_size)
+			break;
+		if (vy_run_dump_tuple(tuple, &tuplesinfo, &values, page) != 0)
 			goto err;
 		vy_write_iterator_next(wi);
-		rc = vy_write_iterator_get(wi, &tuple);
 	}
 	page->unpacked_size = vy_buf_used(&tuplesinfo) + vy_buf_used(&values);
 	page->unpacked_size = ALIGN_POS(page->unpacked_size);
@@ -1989,7 +1948,7 @@ vy_run_write_page(int fd, struct vy_write_iterator *wi, uint32_t page_size,
 	vy_buf_destroy(&compressed);
 	vy_buf_destroy(&tuplesinfo);
 	vy_buf_destroy(&values);
-	return 0;
+	return run_done ? 1 : 0;
 err:
 	vy_buf_destroy(&compressed);
 	vy_buf_destroy(&tuplesinfo);
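The tri-state return value lets a caller drive page writing with a
single loop. A minimal sketch of the intended pattern, mirroring the
vy_run_write() hunk just below (declarations assumed from this patch,
error label elided):

```c
int rc;
do {
	/* -1: write or allocation failure;
	 *  0: page budget reached, the same run continues;
	 *  1: run complete (iterator exhausted or split key reached). */
	rc = vy_run_write_page(fd, wi, split_key, key_def,
			       page_size, run_index);
} while (rc == 0);
if (rc < 0)
	goto err;
```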
@@ -2003,8 +1962,8 @@ vy_run_write_page(int fd, struct vy_write_iterator *wi, uint32_t page_size,
  */
 static int
 vy_run_write(int fd, struct vy_write_iterator *wi,
-	     uint32_t page_size, uint64_t run_size,
-	     struct vy_run **result)
+	     struct vy_tuple *split_key, struct key_def *key_def,
+	     uint32_t page_size, struct vy_run **result)
 {
 	int rc = 0;
 	struct vy_run *run = vy_run_new();
@@ -2036,15 +1995,15 @@ vy_run_write(int fd, struct vy_write_iterator *wi,
 	header->size += header_size;
 
 	/*
-	 * Read from the iterator until it's exhausted or range
-	 * size limit is reached.
+	 * Read from the iterator until it's exhausted or
+	 * the split key is reached.
 	 */
 	do {
-		if (vy_run_write_page(fd, wi, page_size,
-				      run_index) == -1)
+		rc = vy_run_write_page(fd, wi, split_key, key_def,
+				       page_size, run_index);
+		if (rc < 0)
 			goto err;
-		rc = vy_write_iterator_get(wi, NULL);
-	} while (rc == 0 && header->total < run_size);
+	} while (rc == 0);
 
 	/* Write pages index */
 	header->pages_offset = header->offset +
@@ -2108,156 +2067,6 @@ vy_tmp_mem_iterator_open(struct vy_iter *virt_itr, struct vy_mem *mem,
 static int64_t
 vy_index_range_id_next(struct vy_index *index);
 
-static inline int
-vy_range_split(struct vy_index *index, struct vy_write_iterator *wi,
-	       uint64_t size_node, struct rlist *result);
-
-static int
-vy_range_redistribute(struct vy_index *index, struct vy_range *range,
-		struct rlist *result)
-{
-	(void)index;
-	struct vy_iter ii;
-	struct vy_tuple *key = vy_tuple_from_key(index, NULL, 0);
-	vy_tmp_mem_iterator_open(&ii, range->mem, VINYL_GE, key->data);
-	assert(!rlist_empty(result));
-	struct vy_range *prev = rlist_first_entry(result, struct vy_range,
-						  split);
-	while (1)
-	{
-		if (rlist_next(&prev->split) == result) {
-			/* no more ranges */
-			assert(prev != NULL);
-			while (ii.vif->has(&ii)) {
-				struct vy_tuple *v = ii.vif->get(&ii);
-				vy_mem_set(prev->mem, v);
-				ii.vif->next(&ii);
-			}
-			break;
-		}
-		struct vy_range *p = rlist_next_entry(prev, split);
-		while (ii.vif->has(&ii))
-		{
-			struct vy_tuple *v = ii.vif->get(&ii);
-			struct vy_page_info *page = vy_run_index_first_page(&p->run->index);
-			int rc = vy_tuple_compare(v->data,
-				vy_run_index_min_key(&p->run->index, page),
-				index->key_def);
-			if (unlikely(rc >= 0))
-				break;
-			vy_mem_set(prev->mem, v);
-			ii.vif->next(&ii);
-		}
-		if (unlikely(! ii.vif->has(&ii)))
-			break;
-		prev = p;
-	}
-	assert(ii.vif->get(&ii) == NULL);
-	vy_tuple_unref(key);
-	return 0;
-}
-
-static void
-vy_range_redistribute_set(struct vy_index *index, uint64_t now, struct vy_tuple *v)
-{
-	/* match range */
-	struct vy_range_iterator ii;
-	vy_range_iterator_open(&ii, index, VINYL_GE, v->data, v->size);
-	struct vy_range *range = vy_range_iterator_get(&ii);
-	assert(range != NULL);
-	/* update range */
-	int rc = vy_mem_set(range->mem, v);
-	assert(rc == 0); /* TODO: handle BPS tree errors properly */
-	(void) rc;
-	range->update_time = now;
-	range->used += vy_tuple_size(v);
-}
-
-static int
-vy_range_redistribute_index(struct vy_index *index, struct vy_range *range)
-{
-	struct vy_buf buf;
-	struct vy_tuple *key = vy_tuple_from_key(index, NULL, 0);
-	vy_buf_create(&buf);
-	struct vy_iter ii;
-	vy_tmp_mem_iterator_open(&ii, range->mem, VINYL_GE, key->data);
-	while (ii.vif->has(&ii)) {
-		struct vy_tuple *v = ii.vif->get(&ii);
-		int rc = vy_buf_add(&buf, v, sizeof(struct vy_tuple ***));
-		if (unlikely(rc == -1)) {
-			vy_buf_destroy(&buf);
-			vy_tuple_unref(key);
-			return -1;
-		}
-		ii.vif->next(&ii);
-	}
-	if (unlikely(vy_buf_used(&buf) == 0)) {
-		vy_buf_destroy(&buf);
-		vy_tuple_unref(key);
-		return 0;
-	}
-	uint64_t now = clock_monotonic64();
-	struct vy_bufiter i;
-	vy_bufiter_open(&i, &buf, sizeof(struct vy_tuple **));
-	while (vy_bufiter_has(&i)) {
-		struct vy_tuple **v = vy_bufiterref_get(&i);
-		vy_range_redistribute_set(index, now, *v);
-		vy_bufiter_next(&i);
-	}
-	vy_buf_destroy(&buf);
-	vy_tuple_unref(key);
-	return 0;
-}
-
-static int
-vy_range_splitfree(struct rlist *result)
-{
-	struct vy_range *range, *next;
-	rlist_foreach_entry_safe(range, result, split, next) {
-		rlist_del_entry(range, split);
-		vy_range_delete(range);
-	}
-	assert(rlist_empty(result));
-	return 0;
-}
-
-static inline int
-vy_range_split(struct vy_index *index, struct vy_write_iterator *wi,
-	       uint64_t  size_node, struct rlist *result)
-{
-	int rc;
-	struct vy_range *range = NULL;
-
-	while (!vy_write_iterator_get(wi, NULL)) {
-		/* create new range */
-		range = vy_range_new(index);
-		if (unlikely(range == NULL))
-			goto error;
-		rc = vy_range_create(range, index);
-		if (unlikely(rc == -1))
-			goto error;
-
-		struct vy_run *run;
-		if ((rc = vy_run_write(range->fd, wi,
-				       index->key_def->opts.page_size,
-				       size_node, &run)))
-			goto error;
-
-		range->run = run;
-		++range->run_count;
-
-		rlist_add_tail_entry(result, range, split);
-		if (unlikely(rc == -1))
-			goto error;
-	}
-	return 0;
-error:
-	if (range)
-		vy_range_delete(range);
-	vy_range_splitfree(result);
-	return -1;
-}
-
 static struct vy_range *
 vy_range_new(struct vy_index *index)
 {
@@ -2277,7 +2086,6 @@ vy_range_new(struct vy_index *index)
 	range->index = index;
 	range->nodedump.pos = UINT32_MAX;
 	range->nodecompact.pos = UINT32_MAX;
-	rlist_create(&range->split);
 	return range;
 }
 
@@ -2361,14 +2169,17 @@ vy_range_open(struct vy_index *index, struct vy_range *range, char *path)
 }
 
 static int
-vy_range_create(struct vy_range *range, struct vy_index *index)
+vy_range_create(struct vy_range *range, struct vy_index *index, int *p_fd)
 {
 	snprintf(range->path, PATH_MAX, "%s/.tmpXXXXXX", index->path);
 
 	ERROR_INJECT(ERRINJ_VY_RANGE_CREATE, {errno = EMFILE; goto error;});
 
-	if ((range->fd = mkstemp(range->path)) == -1)
+	int fd = mkstemp(range->path);
+	if (fd < 0)
 		goto error;
+
+	*p_fd = fd;
 	return 0;
 error:
 	vy_error("temp file '%s' create error: %s",
@@ -2441,6 +2252,204 @@ vy_range_complete(struct vy_range *range, struct vy_index *index)
 	return rc;
 }
 
+/*
+ * Return true and set split_key accordingly if the range needs to be
+ * split in two.
+ *
+ * - We should never split a range until it has been merged at least once
+ *   (actually, it should be a function of compact_wm/number of runs
+ *   used for the merge: with low compact_wm it's more than once, with
+ *   high compact_wm it's once).
+ * - We should use the last run size as the size of the range.
+ * - We should split around the last run middle key.
+ * - We should only split if the last run size is at least
+ *   4/3 * range_size.
+ */
+static bool
+vy_range_need_split(struct vy_range *range, const char **split_key)
+{
+	struct key_def *key_def = range->index->key_def;
+	struct vy_run *run = NULL;
+
+	/* The range hasn't been merged yet - too early to split it. */
+	if (range->merge_count < 1)
+		return false;
+
+	/* Find the oldest run. */
+	assert(range->run != NULL);
+	for (run = range->run; run->next; run = run->next) { }
+
+	/* The range is too small to be split. */
+	if (run->index.info.total < key_def->opts.range_size * 4 / 3)
+		return false;
+
+	/* Find the median key in the oldest run (approximately). */
+	struct vy_page_info *p = vy_run_index_get_page(&run->index,
+					run->index.info.count / 2);
+	*split_key = vy_run_index_min_key(&run->index, p);
+	return true;
+}
+
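For intuition, a worked example with hypothetical numbers: if
key_def->opts.range_size is 64 MiB, the range must have been compacted
at least once and its oldest run must reach 64 * 4/3 ≈ 85.3 MiB before
a split is proposed; the split key is then the min key of the run's
middle page, an approximate median:

```c
const char *split_key_raw;
if (vy_range_need_split(range, &split_key_raw)) {
	/* Here range->merge_count >= 1 and the oldest run's total
	 * size is >= range_size * 4 / 3 (~85.3 MiB for a 64 MiB
	 * range_size); split_key_raw points into the run index at
	 * the min key of page index.info.count / 2. */
}
```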
+struct vy_range_compact_part {
+	struct vy_range *range;
+	struct vy_run *run;
+	int fd;
+};
+
+static int
+vy_range_compact_prepare(struct vy_range *range,
+			 struct vy_range_compact_part *parts, int *p_n_parts)
+{
+	struct vy_index *index = range->index;
+	struct vy_tuple *min_key, *split_key = NULL;
+	const char *split_key_raw;
+	int n_parts = 1;
+	int i;
+
+	min_key = range->min_key;
+	vy_tuple_ref(min_key);
+	vy_index_remove_range(index, range);
+
+	if (vy_range_need_split(range, &split_key_raw) &&
+	    vy_tuple_compare(min_key->data, split_key_raw,
+			     index->key_def) != 0) {
+		split_key = vy_tuple_extract_key_raw(index, split_key_raw);
+		if (split_key == NULL)
+			goto fail;
+		n_parts = 2;
+	}
+
+	/* Allocate new ranges and initialize parts. */
+	for (i = 0; i < n_parts; i++) {
+		struct vy_range *r = vy_range_new(index);
+		if (r == NULL)
+			goto fail;
+		parts[i].range = r;
+		parts[i].run = NULL;
+		parts[i].fd = -1;
+	}
+
+	/* Set min keys for the new ranges. */
+	parts[0].range->min_key = min_key;
+	if (split_key != NULL)
+		parts[1].range->min_key = split_key;
+
+	for (i = 0; i < n_parts; i++) {
+		struct vy_range *r = parts[i].range;
+
+		/* Link mem and run to the original range. */
+		r->mem->next = range->mem;
+		r->mem_count = range->mem_count + 1;
+		r->run = range->run;
+		r->run_count = range->run_count;
+		r->fd = range->fd;
+
+		vy_index_add_range(index, r);
+	}
+
+	*p_n_parts = n_parts;
+	return 0;
+fail:
+	for (i = 0; i < n_parts; i++) {
+		struct vy_range *r = parts[i].range;
+		if (r != NULL)
+			vy_range_delete(r);
+	}
+	vy_tuple_unref(min_key);
+	if (split_key != NULL)
+		vy_tuple_unref(split_key);
+
+	vy_index_add_range(index, range);
+	return -1;
+}
+
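The key property of vy_range_compact_prepare() is that the new ranges
temporarily share the old range's data, so concurrent reads see a
complete picture while compaction runs. Schematically, right after
prepare succeeds (comment sketch, not literal code):

```c
/*
 *   parts[0].range->mem -> new empty mem -> range->mem -> ... old chain
 *   parts[1].range->mem -> new empty mem -> range->mem -> ... old chain
 *   parts[i].range->run == range->run  (old run list, shared)
 *   parts[i].range->fd  == range->fd   (old run file, shared)
 *
 * vy_range_compact_commit() severs these links and installs the newly
 * written runs; vy_range_compact_abort() re-attaches any new in-memory
 * data to the old range and puts it back into the tree.
 */
```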
+static void
+vy_range_compact_commit(struct vy_range *range, int n_parts,
+			struct vy_range_compact_part *parts)
+{
+	struct vy_index *index = range->index;
+	int i;
+
+	index->size -= vy_range_size(range);
+	for (i = 0; i < n_parts; i++) {
+		struct vy_range *r = parts[i].range;
+
+		/* Unlink new ranges from the old one. */
+		r->mem->next = NULL;
+		r->mem_count = 1;
+		r->run = parts[i].run;
+		r->run_count = r->run ? 1 : 0;
+		r->fd = parts[i].fd;
+
+		/*
+		 * If a new range is empty, delete it unless
+		 * it's the only one.
+		 */
+		if (r->run == NULL && r->used == 0 && index->range_count > 1) {
+			vy_index_remove_range(index, r);
+			vy_range_delete(r);
+			continue;
+		}
+
+		index->size += vy_range_size(r);
+
+		/* Account merge w/o split. */
+		if (n_parts == 1 && r->run != NULL)
+			r->merge_count = range->merge_count + 1;
+
+		/* Make the new range visible to the scheduler. */
+		vy_scheduler_add_range(index->env->scheduler, r);
+	}
+	vy_range_delete(range);
+}
+
+static void
+vy_range_compact_abort(struct vy_range *range, int n_parts,
+		       struct vy_range_compact_part *parts)
+{
+	struct vy_index *index = range->index;
+	int i;
+
+	/*
+	 * We failed to write runs for the new ranges. Attach their
+	 * in-memory indexes back to the original range and delete the ranges.
+	 */
+
+	for (i = 0; i < n_parts; i++) {
+		struct vy_range *r = parts[i].range;
+
+		vy_index_remove_range(index, r);
+
+		/* No point in linking an empty mem. */
+	if (r->used != 0) {
+			r->mem->next = range->mem;
+			range->mem = r->mem;
+			range->mem_count++;
+			range->used += r->used;
+			r->mem = NULL;
+		}
+
+		/* Prevent original range's data from being destroyed. */
+		if (r->mem != NULL)
+			r->mem->next = NULL;
+		r->run = NULL;
+		r->fd = -1;
+		vy_range_delete(r);
+
+		if (parts[i].run != NULL)
+			vy_run_delete(parts[i].run);
+		if (parts[i].fd >= 0)
+			close(parts[i].fd);
+	}
+
+	/*
+	 * Finally, insert the old range back to the tree and make it
+	 * visible to the scheduler.
+	 */
+	vy_index_add_range(index, range);
+	vy_scheduler_add_range(index->env->scheduler, range);
+}
+
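Taken together, prepare/commit/abort give compaction a transactional
shape. A condensed sketch of how the task code below drives them (the
thread attribution reflects the usual new/execute/complete task split;
error handling trimmed):

```c
struct vy_range_compact_part parts[2];
int n_parts;

/* vy_task_compact_new(): detach the range, create its successor(s). */
if (vy_range_compact_prepare(range, parts, &n_parts) != 0)
	return NULL;

/* vy_task_compact_execute(), in a worker: merge runs and in-memory
 * indexes, writing one new run per part via vy_run_write(). */

/* vy_task_compact_complete(): */
if (task->status != 0)
	vy_range_compact_abort(range, n_parts, parts);
else
	vy_range_compact_commit(range, n_parts, parts);
```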
 static void
 vy_profiler_begin(struct vy_profiler *p, struct vy_index *i)
 {
@@ -2840,7 +2849,8 @@ struct vy_task {
 		} dump;
 		struct {
 			struct vy_range *range;
-			struct rlist split_list;
+			int n_parts;
+			struct vy_range_compact_part parts[2];
 		} compact;
 	};
 	/**
@@ -2883,27 +2893,31 @@ vy_task_dump_execute(struct vy_task *task)
 {
 	struct vy_index *index = task->index;
 	struct vy_range *range = task->dump.range;
-	struct vy_mem *mem;
+	struct vy_write_iterator *wi;
 	int rc;
 
 	if (!range->run) {
 		/* An empty range, create a temp file for it. */
-		rc = vy_range_create(range, index);
-		if (rc)
+		rc = vy_range_create(range, index, &range->fd);
+		if (rc != 0)
 			return rc;
 	}
 
-	struct vy_write_iterator *wi;
 	wi = vy_write_iterator_new(1, index, task->vlsn);
+	if (wi == NULL)
+		return -1;
+
 	/* We dump all memory indexes but the newest one -
 	 * see comment in vy_task_dump_new() */
-	for (mem = range->mem->next; mem; mem = mem->next) {
+	for (struct vy_mem *mem = range->mem->next; mem; mem = mem->next) {
 		rc = vy_write_iterator_add_mem(wi, mem, 0, 0);
+		if (rc != 0)
+			goto out;
 	}
-	if (!rc)
-		rc = vy_run_write(range->fd, wi,
-				  index->key_def->opts.page_size, UINT64_MAX,
-				  &task->dump.new_run);
+	rc = vy_run_write(range->fd, wi, NULL, NULL,
+			  index->key_def->opts.page_size,
+			  &task->dump.new_run);
+out:
 	vy_write_iterator_delete(wi);
 	return rc;
 }
@@ -2978,7 +2992,7 @@ vy_task_dump_new(struct mempool *pool, struct vy_range *range)
 
 	struct vy_mem *mem = vy_mem_new(range->index->key_def);
 	if (!mem) {
-		free(task);
+		vy_task_delete(pool, task);
 		return NULL;
 	}
 
@@ -3000,31 +3014,55 @@ vy_task_compact_execute(struct vy_task *task)
 {
 	struct vy_index *index = task->index;
 	struct vy_range *range = task->compact.range;
+	struct vy_range_compact_part *parts = task->compact.parts;
+	int n_parts = task->compact.n_parts;
+	struct vy_write_iterator *wi;
+	int rc = 0;
+
 	assert(range->nodedump.pos == UINT32_MAX);
 	assert(range->nodecompact.pos == UINT32_MAX);
 
-	struct vy_run *run;
-	int rc;
-	struct vy_write_iterator *wi;
 	wi = vy_write_iterator_new(0, index, task->vlsn);
+	if (wi == NULL)
+		return -1;
 
-	/* compact on disk runs */
-	for (run = range->run; run; run = run->next) {
+	/* Compact on disk runs. */
+	for (struct vy_run *run = range->run; run; run = run->next) {
 		rc = vy_write_iterator_add_run(wi, run, range->fd, 0, 0);
-		if (rc)
-			goto end;
+		if (rc != 0)
+			goto out;
 	}
 
-	/* compact shadow memory indexes */
-	for (struct vy_mem *mem = range->mem->next; mem; mem = mem->next) {
+	/* Compact in-memory indexes. */
+	for (struct vy_mem *mem = range->mem; mem; mem = mem->next) {
 		rc = vy_write_iterator_add_mem(wi, mem, 0, 0);
-		if (rc)
-			goto end;
+		if (rc != 0)
+			goto out;
 	}
 
-	rc = vy_range_split(index, wi, index->key_def->opts.range_size,
-			    &task->compact.split_list);
-end:
+	assert(n_parts > 0);
+	for (int i = 0; i < n_parts; i++) {
+		struct vy_range_compact_part *p = &parts[i];
+		struct vy_tuple *split_key = i < n_parts - 1 ?
+			parts[i + 1].range->min_key : NULL;
+
+		if (vy_write_iterator_get(wi, NULL) != 0)
+			goto out; /* no more data */
+
+		rc = vy_range_create(p->range, index, &p->fd);
+		if (rc != 0)
+			goto out;
+
+		rc = vy_run_write(p->fd, wi, split_key, index->key_def,
+				  index->key_def->opts.page_size, &p->run);
+		if (rc != 0)
+			goto out;
+
+		rc = vy_range_complete(p->range, index);
+		if (rc != 0)
+			goto out;
+	}
+out:
 	vy_write_iterator_delete(wi);
 	return rc;
 }
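The split-key threading above is the crux: each part's run is bounded
by the next part's min_key, so one pass over the write iterator
partitions the merged stream into disjoint runs. With n_parts == 2 and
a hypothetical median key "m":

```c
/*
 *   i == 0: vy_run_write(..., split_key = parts[1].range->min_key, ...)
 *           writes tuples with key < "m"; vy_run_write_page() returns
 *           1 at the first tuple >= "m" and leaves it in the iterator;
 *   i == 1: vy_run_write(..., split_key = NULL, ...)
 *           writes the remaining tuples until the stream is exhausted.
 */
```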
@@ -3034,70 +3072,15 @@ vy_task_compact_complete(struct vy_task *task)
 {
 	struct vy_index *index = task->index;
 	struct vy_range *range = task->compact.range;
-	struct rlist *split_list = &task->compact.split_list;
-
-	struct vy_range *n;
-	int count = 0;
-	int rc;
+	struct vy_range_compact_part *parts = task->compact.parts;
+	int n_parts = task->compact.n_parts;
 
 	if (task->status != 0) {
-		/* roll back */
-		vy_range_splitfree(split_list);
-		vy_scheduler_add_range(index->env->scheduler, range);
+		vy_range_compact_abort(range, n_parts, parts);
 		return 0;
 	}
 
-	rlist_foreach_entry(n, split_list, split)
-		 count++;
-
-	/* Mask removal of a single range as a single range update */
-	if (count == 0 && index->range_count == 1) {
-		n = vy_range_new(index);
-		if (n == NULL)
-			return -1;
-		rlist_add_entry(split_list, n, split);
-		count++;
-	}
-
-	vy_index_remove_range(index, range);
-	index->size -= vy_range_size(range);
-
-	if (count == 0) {
-		/* delete */
-		vy_range_redistribute_index(index, range);
-	} else if (count == 1) {
-		/* self update */
-		struct vy_mem *mem = range->mem;
-		n = rlist_first_entry(split_list, struct vy_range, split);
-		n->mem->next = mem->next;
-		mem->next = NULL;
-		range->mem = n->mem;
-		n->mem = mem;
-	} else {
-		/* split */
-		rc = vy_range_redistribute(index, range, split_list);
-		if (rc) {
-			vy_range_splitfree(split_list);
-			return rc;
-		}
-	}
-
-	rlist_foreach_entry(n, split_list, split) {
-		n->used = n->mem->used;
-		n->temperature = range->temperature;
-		n->temperature_reads = range->temperature_reads;
-		index->size += vy_range_size(n);
-		vy_index_add_range(index, n);
-		/* Add to scheduler */
-		vy_scheduler_add_range(index->env->scheduler, n);
-	}
-
-	/* complete new nodes */
-	rlist_foreach_entry(n, split_list, split) {
-		rc = vy_range_complete(n, index);
-		if (rc)
-			return rc;
-	}
+	vy_range_compact_commit(range, n_parts, parts);
 
 	if (vy_index_dump_range_index(index)) {
 		/*
@@ -3107,7 +3090,7 @@ vy_task_compact_complete(struct vy_task *task)
 		return -1;
 	}
 
-	return vy_range_delete(range);
+	return 0;
 }
 
 static struct vy_task *
@@ -3117,13 +3100,17 @@ vy_task_compact_new(struct mempool *pool, struct vy_range *range)
 		.execute = vy_task_compact_execute,
 		.complete = vy_task_compact_complete,
 	};
-	struct vy_task *task = vy_task_new(pool, range->index, &compact_ops);
 
+	struct vy_task *task = vy_task_new(pool, range->index, &compact_ops);
 	if (!task)
 		return NULL;
 
+	if (vy_range_compact_prepare(range, task->compact.parts,
+				     &task->compact.n_parts) != 0) {
+		vy_task_delete(pool, task);
+		return NULL;
+	}
 	task->compact.range = range;
-	rlist_create(&task->compact.split_list);
 	return task;
 }
 
diff --git a/test/vinyl/suite.ini b/test/vinyl/suite.ini
index 5ec32efd11bfe131171decdb1eaa56abf515dfea..6c8ca6b0852a4f6c9131195994fb46c14993386a 100644
--- a/test/vinyl/suite.ini
+++ b/test/vinyl/suite.ini
@@ -2,7 +2,7 @@
 core = tarantool
 description = vinyl integration tests
 script = vinyl.lua
-disabled = truncate.test.lua
+disabled = truncate.test.lua split.test.lua
 valgrind_disabled =
 release_disabled = errinj.test.lua
 config = suite.cfg