From 448b643eb9e0bd64d0facfbd857e96135a95401b Mon Sep 17 00:00:00 2001
From: Vladimir Davydov <vdavydov.dev@gmail.com>
Date: Sat, 1 Jul 2017 18:15:46 +0300
Subject: [PATCH] vinyl: move vy_range to its own source file

Needed for #1906
---
 src/box/CMakeLists.txt |   1 +
 src/box/vinyl.c        | 648 +----------------------------------------
 src/box/vy_range.c     | 549 ++++++++++++++++++++++++++++++++++
 src/box/vy_range.h     | 321 ++++++++++++++++++++
 4 files changed, 872 insertions(+), 647 deletions(-)
 create mode 100644 src/box/vy_range.c
 create mode 100644 src/box/vy_range.h

diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt
index 6f61c5b3bf..98ddc80000 100644
--- a/src/box/CMakeLists.txt
+++ b/src/box/CMakeLists.txt
@@ -54,6 +54,7 @@ add_library(box STATIC
     vy_stmt.c
     vy_mem.c
     vy_run.c
+    vy_range.c
     vy_write_iterator.c
     vy_cache.c
     vy_log.c
diff --git a/src/box/vinyl.c b/src/box/vinyl.c
index 0eb1e8d27b..3673091570 100644
--- a/src/box/vinyl.c
+++ b/src/box/vinyl.c
@@ -32,6 +32,7 @@
 
 #include "vy_mem.h"
 #include "vy_run.h"
+#include "vy_range.h"
 #include "vy_cache.h"
 #include "vy_log.h"
 #include "vy_upsert.h"
@@ -263,68 +264,6 @@ vy_stat_tx_write_rate(struct vy_stat *s)
 	return rmean_mean(s->rmean, VY_STAT_TX_WRITE);
 }
 
-struct vy_range {
-	/** Unique ID of this range. */
-	int64_t   id;
-	/**
-	 * Range lower bound. NULL if range is leftmost.
-	 * Both 'begin' and 'end' statements have SELECT type with the full
-	 * idexed key.
-	 */
-	struct tuple *begin;
-	/** Range upper bound. NULL if range is rightmost. */
-	struct tuple *end;
-	/** Key definition for comparing range boundaries. */
-	const struct key_def *key_def;
-	/** An estimate of the number of statements in this range. */
-	struct vy_disk_stmt_counter count;
-	/**
-	 * List of run slices in this range, linked by vy_slice->in_range.
-	 * The newer a slice, the closer it to the list head.
-	 */
-	struct rlist slices;
-	/** Number of entries in the ->slices list. */
-	int slice_count;
-	/**
-	 * The goal of compaction is to reduce read amplification.
-	 * All ranges for which the LSM tree has more runs per
-	 * level than run_count_per_level or run size larger than
-	 * one defined by run_size_ratio of this level are candidates
-	 * for compaction.
-	 * Unlike other LSM implementations, Vinyl can have many
-	 * sorted runs in a single level, and is able to compact
-	 * runs from any number of adjacent levels. Moreover,
-	 * higher levels are always taken in when compacting
-	 * a lower level - i.e. L1 is always included when
-	 * compacting L2, and both L1 and L2 are always included
-	 * when compacting L3.
-	 *
-	 * This variable contains the number of runs the next
-	 * compaction of this range will include.
-	 *
-	 * The lower the level is scheduled for compaction,
-	 * the bigger it tends to be because upper levels are
-	 * taken in.
-	 * @sa vy_range_update_compact_priority() to see
-	 * how we  decide how many runs to compact next time.
-	 */
-	int compact_priority;
-	/** Number of times the range was compacted. */
-	int n_compactions;
-	/** Link in vy_index->tree. */
-	rb_node(struct vy_range) tree_node;
-	/** Link in vy_index->range_heap. */
-	struct heap_node heap_node;
-	/**
-	 * Incremented whenever an in-memory index or on disk
-	 * run is added to or deleted from this range. Used to
-	 * invalidate iterators.
-	 */
-	uint32_t version;
-};
-
-typedef rb_tree(struct vy_range) vy_range_tree_t;
-
 /**
  * A single operation made by a transaction:
  * a single read or write in a vy_index.
@@ -779,13 +718,6 @@ struct vy_merge_iterator {
 	bool range_ended;
 };
 
-struct vy_range_iterator {
-	vy_range_tree_t *tree;
-	enum iterator_type iterator_type;
-	const struct tuple *key;
-	struct vy_range *curr_range;
-};
-
 /**
  * Complex read iterator over vinyl index and write_set of current tx
  * Iterates over ranges, creates merge iterator for every range and outputs
@@ -1168,69 +1100,6 @@ vy_index_unacct_range(struct vy_index *index, struct vy_range *range)
 	histogram_discard(index->run_hist, range->slice_count);
 }
 
-/** An snprint-style function to print a range's boundaries. */
-static int
-vy_range_snprint(char *buf, int size, const struct vy_range *range)
-{
-	int total = 0;
-	SNPRINT(total, snprintf, buf, size, "(");
-	if (range->begin != NULL)
-		SNPRINT(total, vy_key_snprint, buf, size,
-			tuple_data(range->begin));
-	else
-		SNPRINT(total, snprintf, buf, size, "-inf");
-	SNPRINT(total, snprintf, buf, size, "..");
-	if (range->end != NULL)
-		SNPRINT(total, vy_key_snprint, buf, size,
-			tuple_data(range->end));
-	else
-		SNPRINT(total, snprintf, buf, size, "inf");
-	SNPRINT(total, snprintf, buf, size, ")");
-	return total;
-}
-
-/**
- * Helper function returning a human readable representation
- * of a range's boundaries.
- */
-static const char *
-vy_range_str(struct vy_range *range)
-{
-	char *buf = tt_static_buf();
-	vy_range_snprint(buf, TT_STATIC_BUF_LEN, range);
-	return buf;
-}
-
-/** Add a run slice to the head of a range's list. */
-static void
-vy_range_add_slice(struct vy_range *range, struct vy_slice *slice)
-{
-	rlist_add_entry(&range->slices, slice, in_range);
-	range->slice_count++;
-	vy_disk_stmt_counter_add(&range->count, &slice->count);
-}
-
-/** Add a run slice to a range's list before @next_slice. */
-static void
-vy_range_add_slice_before(struct vy_range *range, struct vy_slice *slice,
-			  struct vy_slice *next_slice)
-{
-	rlist_add_tail(&next_slice->in_range, &slice->in_range);
-	range->slice_count++;
-	vy_disk_stmt_counter_add(&range->count, &slice->count);
-}
-
-/** Remove a run slice from a range's list. */
-static void
-vy_range_remove_slice(struct vy_range *range, struct vy_slice *slice)
-{
-	assert(range->slice_count > 0);
-	assert(!rlist_empty(&range->slices));
-	rlist_del_entry(slice, in_range);
-	range->slice_count--;
-	vy_disk_stmt_counter_sub(&range->count, &slice->count);
-}
-
 /**
  * Allocate a new run for an index and write the information
  * about it to the metadata log so that we could still find
@@ -1288,30 +1157,6 @@ vy_run_discard(struct vy_run *run)
 	}
 }
 
-/** Return true if a task was scheduled for a given range. */
-static bool
-vy_range_is_scheduled(struct vy_range *range)
-{
-	return range->heap_node.pos == UINT32_MAX;
-}
-
-#define HEAP_NAME vy_range_heap
-
-static bool
-vy_range_heap_less(struct heap_node *a, struct heap_node *b)
-{
-	struct vy_range *left = container_of(a, struct vy_range, heap_node);
-	struct vy_range *right = container_of(b, struct vy_range, heap_node);
-	return left->compact_priority > right->compact_priority;
-}
-
-#define HEAP_LESS(h, l, r) vy_range_heap_less(l, r)
-
-#include "salad/heap.h"
-
-#undef HEAP_LESS
-#undef HEAP_NAME
-
 /** Return max compact_priority among ranges of an index. */
 static int
 vy_index_compact_priority(struct vy_index *index)
@@ -1486,215 +1331,6 @@ vy_scheduler_remove_mem(struct vy_scheduler *scheduler, struct vy_mem *mem);
 static bool
 vy_scheduler_needs_dump(struct vy_scheduler *scheduler);
 
-static int
-vy_range_tree_cmp(struct vy_range *a, struct vy_range *b);
-
-static int
-vy_range_tree_key_cmp(const struct tuple *a, struct vy_range *b);
-
-rb_gen_ext_key(MAYBE_UNUSED static inline, vy_range_tree_, vy_range_tree_t,
-	       struct vy_range, tree_node, vy_range_tree_cmp,
-	       const struct tuple *, vy_range_tree_key_cmp);
-
-static int
-vy_range_tree_cmp(struct vy_range *range_a, struct vy_range *range_b)
-{
-	if (range_a == range_b)
-		return 0;
-
-	/* Any key > -inf. */
-	if (range_a->begin == NULL)
-		return -1;
-	if (range_b->begin == NULL)
-		return 1;
-
-	assert(range_a->key_def == range_b->key_def);
-	return vy_key_compare(range_a->begin, range_b->begin,
-			      range_a->key_def);
-}
-
-static int
-vy_range_tree_key_cmp(const struct tuple *stmt, struct vy_range *range)
-{
-	/* Any key > -inf. */
-	if (range->begin == NULL)
-		return 1;
-	return vy_stmt_compare_with_key(stmt, range->begin, range->key_def);
-}
-
-static void
-vy_range_iterator_open(struct vy_range_iterator *itr, vy_range_tree_t *tree,
-		       enum iterator_type iterator_type,
-		       const struct tuple *key)
-{
-	itr->tree = tree;
-	itr->iterator_type = iterator_type;
-	itr->key = key;
-	itr->curr_range = NULL;
-}
-
-/*
- * Find the first range in which a given key should be looked up.
- */
-static struct vy_range *
-vy_range_tree_find_by_key(vy_range_tree_t *tree,
-			  enum iterator_type iterator_type,
-			  const struct tuple *key)
-{
-	uint32_t key_field_count = tuple_field_count(key);
-	if (key_field_count == 0) {
-		switch (iterator_type) {
-		case ITER_LT:
-		case ITER_LE:
-			return vy_range_tree_last(tree);
-		case ITER_GT:
-		case ITER_GE:
-		case ITER_EQ:
-			return vy_range_tree_first(tree);
-		default:
-			unreachable();
-			return NULL;
-		}
-	}
-	/* route */
-	struct vy_range *range;
-	if (iterator_type == ITER_GE || iterator_type == ITER_GT ||
-	    iterator_type == ITER_EQ) {
-		/**
-		 * Case 1. part_count == 1, looking for [10]. ranges:
-		 * {1, 3, 5} {7, 8, 9} {10, 15 20} {22, 32, 42}
-		 *                      ^looking for this
-		 * Case 2. part_count == 1, looking for [10]. ranges:
-		 * {1, 2, 4} {5, 6, 7, 8} {50, 100, 200}
-		 *            ^looking for this
-		 * Case 3. part_count == 2, looking for [10]. ranges:
-		 * {[1, 2], [2, 3]} {[9, 1], [10, 1], [10 2], [11 3]} {[12,..}
-		 *                   ^looking for this
-		 * Case 4. part_count == 2, looking for [10]. ranges:
-		 * {[1, 2], [10, 1]} {[10, 2] [10 3] [11 3]} {[12, 1]..}
-		 *  ^looking for this
-		 * Case 5. part_count does not matter, looking for [10].
-		 * ranges:
-		 * {100, 200}, {300, 400}
-		 * ^looking for this
-		 */
-		/**
-		 * vy_range_tree_psearch finds least range with begin == key
-		 * or previous if equal was not found
-		 */
-		range = vy_range_tree_psearch(tree, key);
-		/* switch to previous for case (4) */
-		if (range != NULL && range->begin != NULL &&
-		    key_field_count < range->key_def->part_count &&
-		    vy_stmt_compare_with_key(key, range->begin,
-					     range->key_def) == 0)
-			range = vy_range_tree_prev(tree, range);
-		/* for case 5 or subcase of case 4 */
-		if (range == NULL)
-			range = vy_range_tree_first(tree);
-	} else {
-		assert(iterator_type == ITER_LT || iterator_type == ITER_LE);
-		/**
-		 * Case 1. part_count == 1, looking for [10]. ranges:
-		 * {1, 3, 5} {7, 8, 9} {10, 15 20} {22, 32, 42}
-		 *                      ^looking for this
-		 * Case 2. part_count == 1, looking for [10]. ranges:
-		 * {1, 2, 4} {5, 6, 7, 8} {50, 100, 200}
-		 *            ^looking for this
-		 * Case 3. part_count == 2, looking for [10]. ranges:
-		 * {[1, 2], [2, 3]} {[9, 1], [10, 1], [10 2], [11 3]} {[12,..}
-		 *                   ^looking for this
-		 * Case 4. part_count == 2, looking for [10]. ranges:
-		 * {[1, 2], [10, 1]} {[10, 2] [10 3] [11 3]} {[12, 1]..}
-		 *                    ^looking for this
-		 * Case 5. part_count does not matter, looking for [10].
-		 * ranges:
-		 * {1, 2}, {3, 4, ..}
-		 *          ^looking for this
-		 */
-		/**
-		 * vy_range_tree_nsearch finds most range with begin == key
-		 * or next if equal was not found
-		 */
-		range = vy_range_tree_nsearch(tree, key);
-		if (range != NULL) {
-			/* fix curr_range for cases 2 and 3 */
-			if (range->begin != NULL &&
-			    vy_stmt_compare_with_key(key, range->begin,
-						     range->key_def) != 0) {
-				struct vy_range *prev;
-				prev = vy_range_tree_prev(tree,
-							  range);
-				if (prev != NULL)
-					range = prev;
-			}
-		} else {
-			/* Case 5 */
-			range = vy_range_tree_last(tree);
-		}
-	}
-	/* Range tree must span all possible keys. */
-	assert(range != NULL);
-	return range;
-}
-
-/**
- * Iterate to the next range. The next range is returned in @result.
- */
-static void
-vy_range_iterator_next(struct vy_range_iterator *itr, struct vy_range **result)
-{
-	struct vy_range *curr = itr->curr_range;
-	struct vy_range *next;
-
-	if (curr == NULL) {
-		/* First iteration */
-		next = vy_range_tree_find_by_key(itr->tree, itr->iterator_type,
-						 itr->key);
-		goto out;
-	}
-	switch (itr->iterator_type) {
-	case ITER_LT:
-	case ITER_LE:
-		next = vy_range_tree_prev(itr->tree, curr);
-		break;
-	case ITER_GT:
-	case ITER_GE:
-		next = vy_range_tree_next(itr->tree, curr);
-		break;
-	case ITER_EQ:
-		if (curr->end != NULL &&
-		    vy_stmt_compare_with_key(itr->key, curr->end,
-					     curr->key_def) >= 0) {
-			/* A partial key can be found in more than one range. */
-			next = vy_range_tree_next(itr->tree, curr);
-		} else {
-			next = NULL;
-		}
-		break;
-	default:
-		unreachable();
-	}
-out:
-	*result = itr->curr_range = next;
-}
-
-/**
- * Position iterator @itr to the range that contains @last_stmt and
- * return the current range in @result. If @last_stmt is NULL, restart
- * the iterator.
- */
-static void
-vy_range_iterator_restore(struct vy_range_iterator *itr,
-			  const struct tuple *last_stmt,
-			  struct vy_range **result)
-{
-	struct vy_range *curr = vy_range_tree_find_by_key(itr->tree,
-				itr->iterator_type,
-				last_stmt != NULL ? last_stmt : itr->key);
-	*result = itr->curr_range = curr;
-}
-
 static void
 vy_index_add_range(struct vy_index *index, struct vy_range *range)
 {
@@ -1713,43 +1349,6 @@ vy_index_remove_range(struct vy_index *index, struct vy_range *range)
 	index->range_count--;
 }
 
-/**
- * Allocate and initialize a range (either a new one or for
- * restore from disk).
- *
- * @param id         Range id.
- * @param begin      Range begin (inclusive) or NULL for -inf.
- * @param end        Range end (exclusive) or NULL for +inf.
- * @param key_def    Key definition for comparing range boundaries.
- *
- * @retval not NULL  The new range.
- * @retval NULL      Out of memory.
- */
-static struct vy_range *
-vy_range_new(int64_t id, struct tuple *begin, struct tuple *end,
-	     const struct key_def *key_def)
-{
-	struct vy_range *range = (struct vy_range*) calloc(1, sizeof(*range));
-	if (range == NULL) {
-		diag_set(OutOfMemory, sizeof(struct vy_range), "malloc",
-			 "struct vy_range");
-		return NULL;
-	}
-	range->id = id;
-	if (begin != NULL) {
-		tuple_ref(begin);
-		range->begin = begin;
-	}
-	if (end != NULL) {
-		tuple_ref(end);
-		range->end = end;
-	}
-	range->key_def = key_def;
-	rlist_create(&range->slices);
-	range->heap_node.pos = UINT32_MAX;
-	return range;
-}
-
 /**
  * Allocate a new active in-memory index for an index while moving
  * the old one to the sealed list. Used by the dump task in order
@@ -1778,100 +1377,6 @@ vy_index_rotate_mem(struct vy_index *index)
 	return 0;
 }
 
-static void
-vy_range_delete(struct vy_range *range)
-{
-	if (range->begin != NULL)
-		tuple_unref(range->begin);
-	if (range->end != NULL)
-		tuple_unref(range->end);
-
-	/* Delete all run slices. */
-	while (!rlist_empty(&range->slices)) {
-		struct vy_slice *slice = rlist_shift_entry(&range->slices,
-						struct vy_slice, in_range);
-		vy_slice_delete(slice);
-	}
-
-	TRASH(range);
-	free(range);
-}
-
-
-/**
- * Return true and set split_key accordingly if the range needs to be
- * split in two.
- *
- * - We should never split a range until it was merged at least once
- *   (actually, it should be a function of run_count_per_level/number
- *   of runs used for the merge: with low run_count_per_level it's more
- *   than once, with high run_count_per_level it's once).
- * - We should use the last run size as the size of the range.
- * - We should split around the last run middle key.
- * - We should only split if the last run size is greater than
- *   4/3 * range_size.
- */
-static bool
-vy_range_needs_split(struct vy_range *range, const struct index_opts *opts,
-		     const char **p_split_key)
-{
-	struct vy_slice *slice;
-
-	/* The range hasn't been merged yet - too early to split it. */
-	if (range->n_compactions < 1)
-		return false;
-
-	/* Find the oldest run. */
-	assert(!rlist_empty(&range->slices));
-	slice = rlist_last_entry(&range->slices, struct vy_slice, in_range);
-
-	/* The range is too small to be split. */
-	if (slice->count.bytes_compressed < opts->range_size * 4 / 3)
-		return false;
-
-	/* Find the median key in the oldest run (approximately). */
-	struct vy_page_info *mid_page;
-	mid_page = vy_run_page_info(slice->run, slice->first_page_no +
-				    (slice->last_page_no -
-				     slice->first_page_no) / 2);
-
-	struct vy_page_info *first_page = vy_run_page_info(slice->run,
-						slice->first_page_no);
-
-	/* No point in splitting if a new range is going to be empty. */
-	if (key_compare(first_page->min_key, mid_page->min_key,
-			range->key_def) == 0)
-		return false;
-	/*
-	 * In extreme cases the median key can be < the beginning
-	 * of the slice, e.g.
-	 *
-	 * RUN:
-	 * ... |---- page N ----|-- page N + 1 --|-- page N + 2 --
-	 *     | min_key = [10] | min_key = [50] | min_key = [100]
-	 *
-	 * SLICE:
-	 * begin = [30], end = [70]
-	 * first_page_no = N, last_page_no = N + 1
-	 *
-	 * which makes mid_page_no = N and mid_page->min_key = [10].
-	 *
-	 * In such cases there's no point in splitting the range.
-	 */
-	if (slice->begin != NULL && key_compare(mid_page->min_key,
-			tuple_data(slice->begin), range->key_def) <= 0)
-		return false;
-	/*
-	 * The median key can't be >= the end of the slice as we
-	 * take the min key of a page for the median key.
-	 */
-	assert(slice->end == NULL || key_compare(mid_page->min_key,
-			tuple_data(slice->end), range->key_def) < 0);
-
-	*p_split_key = mid_page->min_key;
-	return true;
-}
-
 /**
  * Split a range if it has grown too big, return true if the range
  * was split. Splitting is done by making slices of the runs used
@@ -1991,157 +1496,6 @@ vy_index_split_range(struct vy_index *index, struct vy_range *range)
 	return false;
 }
 
-/**
- * To reduce write amplification caused by compaction, we follow
- * the LSM tree design. Runs in each range are divided into groups
- * called levels:
- *
- *   level 1: runs 1 .. L_1
- *   level 2: runs L_1 + 1 .. L_2
- *   ...
- *   level N: runs L_{N-1} .. L_N
- *
- * where L_N is the total number of runs, N is the total number of
- * levels, older runs have greater numbers. Runs at each subsequent
- * are run_size_ratio times larger than on the previous one. When
- * the number of runs at a level exceeds run_count_per_level, we
- * compact all its runs along with all runs from the upper levels
- * and in-memory indexes.  Including  previous levels into
- * compaction is relatively cheap, because of the level size
- * ratio.
- *
- * Given a range, this function computes the maximal level that needs
- * to be compacted and sets @compact_priority to the number of runs in
- * this level and all preceding levels.
- */
-static void
-vy_range_update_compact_priority(struct vy_range *range,
-				 const struct index_opts *opts)
-{
-	assert(opts->run_count_per_level > 0);
-	assert(opts->run_size_ratio > 1);
-
-	range->compact_priority = 0;
-
-	/* Total number of checked runs. */
-	uint32_t total_run_count = 0;
-	/* The total size of runs checked so far. */
-	uint64_t total_size = 0;
-	/* Estimated size of a compacted run, if compaction is scheduled. */
-	uint64_t est_new_run_size = 0;
-	/* The number of runs at the current level. */
-	uint32_t level_run_count = 0;
-	/*
-	 * The target (perfect) size of a run at the current level.
-	 * For the first level, it's the size of the newest run.
-	 * For lower levels it's computed as first level run size
-	 * times run_size_ratio.
-	 */
-	uint64_t target_run_size = 0;
-
-	struct vy_slice *slice;
-	rlist_foreach_entry(slice, &range->slices, in_range) {
-		uint64_t size = slice->count.bytes_compressed;
-		/*
-		 * The size of the first level is defined by
-		 * the size of the most recent run.
-		 */
-		if (target_run_size == 0)
-			target_run_size = size;
-		total_size += size;
-		level_run_count++;
-		total_run_count++;
-		while (size > target_run_size) {
-			/*
-			 * The run size exceeds the threshold
-			 * set for the current level. Move this
-			 * run down to a lower level. Switch the
-			 * current level and reset the level run
-			 * count.
-			 */
-			level_run_count = 1;
-			/*
-			 * If we have already scheduled
-			 * a compaction of an upper level, and
-			 * estimated compacted run will end up at
-			 * this level, include the new run into
-			 * this level right away to avoid
-			 * a cascading compaction.
-			 */
-			if (est_new_run_size > target_run_size)
-				level_run_count++;
-			/*
-			 * Calculate the target run size for this
-			 * level.
-			 */
-			target_run_size *= opts->run_size_ratio;
-			/*
-			 * Keep pushing the run down until
-			 * we find an appropriate level for it.
-			 */
-		}
-		if (level_run_count > opts->run_count_per_level) {
-			/*
-			 * The number of runs at the current level
-			 * exceeds the configured maximum. Arrange
-			 * for compaction. We compact all runs at
-			 * this level and upper levels.
-			 */
-			range->compact_priority = total_run_count;
-			est_new_run_size = total_size;
-		}
-	}
-}
-
-/**
- * Check if a range should be coalesced with one or more its neighbors.
- * If it should, return true and set @p_first and @p_last to the first
- * and last ranges to coalesce, otherwise return false.
- *
- * We coalesce ranges together when they become too small, less than
- * half the target range size to avoid split-coalesce oscillations.
- */
-static bool
-vy_range_needs_coalesce(struct vy_range *range, vy_range_tree_t *tree,
-			const struct index_opts *opts,
-			struct vy_range **p_first, struct vy_range **p_last)
-{
-	struct vy_range *it;
-
-	/* Size of the coalesced range. */
-	uint64_t total_size = range->count.bytes_compressed;
-	/* Coalesce ranges until total_size > max_size. */
-	uint64_t max_size = opts->range_size / 2;
-
-	/*
-	 * We can't coalesce a range that was scheduled for dump
-	 * or compaction, because it is about to be processed by
-	 * a worker thread.
-	 */
-	assert(!vy_range_is_scheduled(range));
-
-	*p_first = *p_last = range;
-	for (it = vy_range_tree_next(tree, range);
-	     it != NULL && !vy_range_is_scheduled(it);
-	     it = vy_range_tree_next(tree, it)) {
-		uint64_t size = it->count.bytes_compressed;
-		if (total_size + size > max_size)
-			break;
-		total_size += size;
-		*p_last = it;
-	}
-	for (it = vy_range_tree_prev(tree, range);
-	     it != NULL && !vy_range_is_scheduled(it);
-	     it = vy_range_tree_prev(tree, it)) {
-		uint64_t size = it->count.bytes_compressed;
-		if (total_size + size > max_size)
-			break;
-		total_size += size;
-		*p_first = it;
-	}
-	return *p_first != *p_last;
-}
-
 /**
  * Coalesce a range with one or more its neighbors if it is too small,
  * return true if the range was coalesced. We coalesce ranges by
diff --git a/src/box/vy_range.c b/src/box/vy_range.c
new file mode 100644
index 0000000000..3a7401d962
--- /dev/null
+++ b/src/box/vy_range.c
@@ -0,0 +1,549 @@
+/*
+ * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "vy_range.h"
+
+#include <assert.h>
+#include <stdbool.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#define RB_COMPACT 1
+#include <small/rb.h>
+#include <small/rlist.h>
+
+#include "diag.h"
+#include "iterator_type.h"
+#include "key_def.h"
+#include "trivia/util.h"
+#include "tuple.h"
+#include "tuple_compare.h"
+#include "vy_run.h"
+#include "vy_stat.h"
+#include "vy_stmt.h"
+
+int
+vy_range_tree_cmp(struct vy_range *range_a, struct vy_range *range_b)
+{
+	if (range_a == range_b)
+		return 0;
+
+	/* Any key > -inf. */
+	if (range_a->begin == NULL)
+		return -1;
+	if (range_b->begin == NULL)
+		return 1;
+
+	assert(range_a->key_def == range_b->key_def);
+	return vy_key_compare(range_a->begin, range_b->begin,
+			      range_a->key_def);
+}
+
+int
+vy_range_tree_key_cmp(const struct tuple *stmt, struct vy_range *range)
+{
+	/* Any key > -inf. */
+	if (range->begin == NULL)
+		return 1;
+	return vy_stmt_compare_with_key(stmt, range->begin, range->key_def);
+}
+
+struct vy_range *
+vy_range_tree_find_by_key(vy_range_tree_t *tree,
+			  enum iterator_type iterator_type,
+			  const struct tuple *key)
+{
+	uint32_t key_field_count = tuple_field_count(key);
+	if (key_field_count == 0) {
+		switch (iterator_type) {
+		case ITER_LT:
+		case ITER_LE:
+			return vy_range_tree_last(tree);
+		case ITER_GT:
+		case ITER_GE:
+		case ITER_EQ:
+			return vy_range_tree_first(tree);
+		default:
+			unreachable();
+			return NULL;
+		}
+	}
+	struct vy_range *range;
+	if (iterator_type == ITER_GE || iterator_type == ITER_GT ||
+	    iterator_type == ITER_EQ) {
+		/**
+		 * Case 1. part_count == 1, looking for [10]. ranges:
+		 * {1, 3, 5} {7, 8, 9} {10, 15 20} {22, 32, 42}
+		 *                      ^looking for this
+		 * Case 2. part_count == 1, looking for [10]. ranges:
+		 * {1, 2, 4} {5, 6, 7, 8} {50, 100, 200}
+		 *            ^looking for this
+		 * Case 3. part_count == 2, looking for [10]. ranges:
+		 * {[1, 2], [2, 3]} {[9, 1], [10, 1], [10 2], [11 3]} {[12,..}
+		 *                   ^looking for this
+		 * Case 4. part_count == 2, looking for [10]. ranges:
+		 * {[1, 2], [10, 1]} {[10, 2] [10 3] [11 3]} {[12, 1]..}
+		 *  ^looking for this
+		 * Case 5. part_count does not matter, looking for [10].
+		 * ranges:
+		 * {100, 200}, {300, 400}
+		 * ^looking for this
+		 */
+		/**
+		 * vy_range_tree_psearch finds least range with begin == key
+		 * or previous if equal was not found
+		 */
+		range = vy_range_tree_psearch(tree, key);
+		/* switch to previous for case (4) */
+		if (range != NULL && range->begin != NULL &&
+		    key_field_count < range->key_def->part_count &&
+		    vy_stmt_compare_with_key(key, range->begin,
+					     range->key_def) == 0)
+			range = vy_range_tree_prev(tree, range);
+		/* for case 5 or subcase of case 4 */
+		if (range == NULL)
+			range = vy_range_tree_first(tree);
+	} else {
+		assert(iterator_type == ITER_LT || iterator_type == ITER_LE);
+		/**
+		 * Case 1. part_count == 1, looking for [10]. ranges:
+		 * {1, 3, 5} {7, 8, 9} {10, 15 20} {22, 32, 42}
+		 *                      ^looking for this
+		 * Case 2. part_count == 1, looking for [10]. ranges:
+		 * {1, 2, 4} {5, 6, 7, 8} {50, 100, 200}
+		 *            ^looking for this
+		 * Case 3. part_count == 2, looking for [10]. ranges:
+		 * {[1, 2], [2, 3]} {[9, 1], [10, 1], [10 2], [11 3]} {[12,..}
+		 *                   ^looking for this
+		 * Case 4. part_count == 2, looking for [10]. ranges:
+		 * {[1, 2], [10, 1]} {[10, 2] [10 3] [11 3]} {[12, 1]..}
+		 *                    ^looking for this
+		 * Case 5. part_count does not matter, looking for [10].
+		 * ranges:
+		 * {1, 2}, {3, 4, ..}
+		 *          ^looking for this
+		 */
+		/**
+		 * vy_range_tree_nsearch finds most range with begin == key
+		 * or next if equal was not found
+		 */
+		range = vy_range_tree_nsearch(tree, key);
+		if (range != NULL) {
+			/* fix curr_range for cases 2 and 3 */
+			if (range->begin != NULL &&
+			    vy_stmt_compare_with_key(key, range->begin,
+						     range->key_def) != 0) {
+				struct vy_range *prev;
+				prev = vy_range_tree_prev(tree, range);
+				if (prev != NULL)
+					range = prev;
+			}
+		} else {
+			/* Case 5 */
+			range = vy_range_tree_last(tree);
+		}
+	}
+	return range;
+}
+
+struct vy_range *
+vy_range_new(int64_t id, struct tuple *begin, struct tuple *end,
+	     const struct key_def *key_def)
+{
+	struct vy_range *range = calloc(1, sizeof(*range));
+	if (range == NULL) {
+		diag_set(OutOfMemory, sizeof(*range),
+			 "malloc", "struct vy_range");
+		return NULL;
+	}
+	range->id = id;
+	if (begin != NULL) {
+		tuple_ref(begin);
+		range->begin = begin;
+	}
+	if (end != NULL) {
+		tuple_ref(end);
+		range->end = end;
+	}
+	range->key_def = key_def;
+	rlist_create(&range->slices);
+	range->heap_node.pos = UINT32_MAX;
+	return range;
+}
+
+void
+vy_range_delete(struct vy_range *range)
+{
+	if (range->begin != NULL)
+		tuple_unref(range->begin);
+	if (range->end != NULL)
+		tuple_unref(range->end);
+
+	struct vy_slice *slice, *next_slice;
+	rlist_foreach_entry_safe(slice, &range->slices, in_range, next_slice)
+		vy_slice_delete(slice);
+
+	TRASH(range);
+	free(range);
+}
+
+int
+vy_range_snprint(char *buf, int size, const struct vy_range *range)
+{
+	int total = 0;
+	SNPRINT(total, snprintf, buf, size, "(");
+	if (range->begin != NULL)
+		SNPRINT(total, vy_key_snprint, buf, size,
+			tuple_data(range->begin));
+	else
+		SNPRINT(total, snprintf, buf, size, "-inf");
+	SNPRINT(total, snprintf, buf, size, "..");
+	if (range->end != NULL)
+		SNPRINT(total, vy_key_snprint, buf, size,
+			tuple_data(range->end));
+	else
+		SNPRINT(total, snprintf, buf, size, "inf");
+	SNPRINT(total, snprintf, buf, size, ")");
+	return total;
+}
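+
+/*
+ * For example, vy_range_snprint() above would print a leftmost
+ * range ending at key [100] as "(-inf..[100])", assuming
+ * vy_key_snprint() renders the key as "[100]".
+ */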
+
+void
+vy_range_add_slice(struct vy_range *range, struct vy_slice *slice)
+{
+	rlist_add_entry(&range->slices, slice, in_range);
+	range->slice_count++;
+	vy_disk_stmt_counter_add(&range->count, &slice->count);
+}
+
+void
+vy_range_add_slice_before(struct vy_range *range, struct vy_slice *slice,
+			  struct vy_slice *next_slice)
+{
+	rlist_add_tail(&next_slice->in_range, &slice->in_range);
+	range->slice_count++;
+	vy_disk_stmt_counter_add(&range->count, &slice->count);
+}
+
+void
+vy_range_remove_slice(struct vy_range *range, struct vy_slice *slice)
+{
+	assert(range->slice_count > 0);
+	assert(!rlist_empty(&range->slices));
+	rlist_del_entry(slice, in_range);
+	range->slice_count--;
+	vy_disk_stmt_counter_sub(&range->count, &slice->count);
+}
+
+/**
+ * To reduce write amplification caused by compaction, we follow
+ * the LSM tree design. Runs in each range are divided into groups
+ * called levels:
+ *
+ *   level 1: runs 1 .. L_1
+ *   level 2: runs L_1 + 1 .. L_2
+ *   ...
+ *   level N: runs L_{N-1} + 1 .. L_N
+ *
+ * where L_N is the total number of runs, N is the total number of
+ * levels, older runs have greater numbers. Runs at each subsequent
+ * are run_size_ratio times larger than on the previous one. When
+ * the number of runs at a level exceeds run_count_per_level, we
+ * compact all its runs along with all runs from the upper levels
+ * and in-memory indexes.  Including  previous levels into
+ * compaction is relatively cheap, because of the level size
+ * ratio.
+ *
+ * Given a range, this function computes the maximal level that needs
+ * to be compacted and sets @compact_priority to the number of runs in
+ * this level and all preceding levels.
+ */
+void
+vy_range_update_compact_priority(struct vy_range *range,
+				 const struct index_opts *opts)
+{
+	assert(opts->run_count_per_level > 0);
+	assert(opts->run_size_ratio > 1);
+
+	range->compact_priority = 0;
+
+	/* Total number of checked runs. */
+	uint32_t total_run_count = 0;
+	/* The total size of runs checked so far. */
+	uint64_t total_size = 0;
+	/* Estimated size of a compacted run, if compaction is scheduled. */
+	uint64_t est_new_run_size = 0;
+	/* The number of runs at the current level. */
+	uint32_t level_run_count = 0;
+	/*
+	 * The target (perfect) size of a run at the current level.
+	 * For the first level, it's the size of the newest run.
+	 * For lower levels it's computed as first level run size
+	 * times run_size_ratio.
+	 */
+	uint64_t target_run_size = 0;
+
+	struct vy_slice *slice;
+	rlist_foreach_entry(slice, &range->slices, in_range) {
+		uint64_t size = slice->count.bytes_compressed;
+		/*
+		 * The size of the first level is defined by
+		 * the size of the most recent run.
+		 */
+		if (target_run_size == 0)
+			target_run_size = size;
+		total_size += size;
+		level_run_count++;
+		total_run_count++;
+		while (size > target_run_size) {
+			/*
+			 * The run size exceeds the threshold
+			 * set for the current level. Move this
+			 * run down to a lower level. Switch the
+			 * current level and reset the level run
+			 * count.
+			 */
+			level_run_count = 1;
+			/*
+			 * If we have already scheduled
+			 * a compaction of an upper level, and
+			 * estimated compacted run will end up at
+			 * this level, include the new run into
+			 * this level right away to avoid
+			 * a cascading compaction.
+			 */
+			if (est_new_run_size > target_run_size)
+				level_run_count++;
+			/*
+			 * Calculate the target run size for this
+			 * level.
+			 */
+			target_run_size *= opts->run_size_ratio;
+			/*
+			 * Keep pushing the run down until
+			 * we find an appropriate level for it.
+			 */
+		}
+		if (level_run_count > opts->run_count_per_level) {
+			/*
+			 * The number of runs at the current level
+			 * exceeds the configured maximum. Arrange
+			 * for compaction. We compact all runs at
+			 * this level and upper levels.
+			 */
+			range->compact_priority = total_run_count;
+			est_new_run_size = total_size;
+		}
+	}
+}
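+
+/*
+ * A worked example of the algorithm above, with purely illustrative
+ * numbers: run_size_ratio = 2, run_count_per_level = 2 and four
+ * slices of 10, 10, 10 and 40 bytes (bytes_compressed, newest
+ * first). The three 10-byte runs all stay at level 1
+ * (target_run_size = 10), so after the third one level_run_count
+ * reaches 3 > run_count_per_level and compact_priority = 3,
+ * est_new_run_size = 30. The 40-byte run is pushed down to
+ * target_run_size = 40; the estimated 30-byte output run is
+ * counted at the same level, giving 2 runs there, which does not
+ * exceed run_count_per_level, so no cascading compaction is
+ * scheduled. The next compaction of this range thus includes the
+ * three newest runs.
+ */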
+
+/**
+ * Return true and set split_key accordingly if the range needs to be
+ * split in two.
+ *
+ * - We should never split a range until it was merged at least once
+ *   (actually, it should be a function of run_count_per_level/number
+ *   of runs used for the merge: with low run_count_per_level it's more
+ *   than once, with high run_count_per_level it's once).
+ * - We should use the last run size as the size of the range.
+ * - We should split around the last run middle key.
+ * - We should only split if the last run size is greater than
+ *   4/3 * range_size.
+ */
+bool
+vy_range_needs_split(struct vy_range *range, const struct index_opts *opts,
+		     const char **p_split_key)
+{
+	struct vy_slice *slice;
+
+	/* The range hasn't been merged yet - too early to split it. */
+	if (range->n_compactions < 1)
+		return false;
+
+	/* Find the oldest run. */
+	assert(!rlist_empty(&range->slices));
+	slice = rlist_last_entry(&range->slices, struct vy_slice, in_range);
+
+	/* The range is too small to be split. */
+	if (slice->count.bytes_compressed < opts->range_size * 4 / 3)
+		return false;
+
+	/* Find the median key in the oldest run (approximately). */
+	struct vy_page_info *mid_page;
+	mid_page = vy_run_page_info(slice->run, slice->first_page_no +
+				    (slice->last_page_no -
+				     slice->first_page_no) / 2);
+
+	struct vy_page_info *first_page = vy_run_page_info(slice->run,
+						slice->first_page_no);
+
+	/* No point in splitting if a new range is going to be empty. */
+	if (key_compare(first_page->min_key, mid_page->min_key,
+			range->key_def) == 0)
+		return false;
+	/*
+	 * In extreme cases the median key can be < the beginning
+	 * of the slice, e.g.
+	 *
+	 * RUN:
+	 * ... |---- page N ----|-- page N + 1 --|-- page N + 2 --
+	 *     | min_key = [10] | min_key = [50] | min_key = [100]
+	 *
+	 * SLICE:
+	 * begin = [30], end = [70]
+	 * first_page_no = N, last_page_no = N + 1
+	 *
+	 * which makes mid_page_no = N and mid_page->min_key = [10].
+	 *
+	 * In such cases there's no point in splitting the range.
+	 */
+	if (slice->begin != NULL && key_compare(mid_page->min_key,
+			tuple_data(slice->begin), range->key_def) <= 0)
+		return false;
+	/*
+	 * The median key can't be >= the end of the slice as we
+	 * take the min key of a page for the median key.
+	 */
+	assert(slice->end == NULL || key_compare(mid_page->min_key,
+			tuple_data(slice->end), range->key_def) < 0);
+
+	*p_split_key = mid_page->min_key;
+	return true;
+}
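+
+/*
+ * A small illustration with made-up numbers: with range_size = 1024
+ * a range that has been compacted at least once becomes a split
+ * candidate when its oldest slice reaches 1365 bytes (4/3 of
+ * range_size, integer division). If that slice covers pages 0 .. 4,
+ * the median is page 2, so the suggested split key is page 2's
+ * min_key, unless it coincides with page 0's min_key or falls at
+ * or before the slice beginning.
+ */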
+
+/**
+ * Check if a range should be coalesced with one or more of its
+ * neighbors. If it should, return true and set @p_first and @p_last
+ * to the first and last ranges to coalesce, otherwise return false.
+ *
+ * We coalesce ranges when they become too small (less than half
+ * the target range size) to avoid split-coalesce oscillations.
+ */
+bool
+vy_range_needs_coalesce(struct vy_range *range, vy_range_tree_t *tree,
+			const struct index_opts *opts,
+			struct vy_range **p_first, struct vy_range **p_last)
+{
+	struct vy_range *it;
+
+	/* Size of the coalesced range. */
+	uint64_t total_size = range->count.bytes_compressed;
+	/* Coalesce ranges until total_size > max_size. */
+	uint64_t max_size = opts->range_size / 2;
+
+	/*
+	 * We can't coalesce a range that was scheduled for dump
+	 * or compaction, because it is about to be processed by
+	 * a worker thread.
+	 */
+	assert(!vy_range_is_scheduled(range));
+
+	*p_first = *p_last = range;
+	for (it = vy_range_tree_next(tree, range);
+	     it != NULL && !vy_range_is_scheduled(it);
+	     it = vy_range_tree_next(tree, it)) {
+		uint64_t size = it->count.bytes_compressed;
+		if (total_size + size > max_size)
+			break;
+		total_size += size;
+		*p_last = it;
+	}
+	for (it = vy_range_tree_prev(tree, range);
+	     it != NULL && !vy_range_is_scheduled(it);
+	     it = vy_range_tree_prev(tree, it)) {
+		uint64_t size = it->count.bytes_compressed;
+		if (total_size + size > max_size)
+			break;
+		total_size += size;
+		*p_first = it;
+	}
+	return *p_first != *p_last;
+}
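+
+/*
+ * For illustration (arbitrary numbers): with range_size = 1024 the
+ * threshold max_size is 512. Starting from a 100-byte range whose
+ * right neighbors hold 200 and 300 bytes and whose left neighbors
+ * hold 150 and 100 bytes, the loop first takes the 200-byte right
+ * neighbor (total 300) and stops before the 300-byte one, then
+ * takes the 150-byte left neighbor (total 450) and stops before
+ * the 100-byte one. The function returns true with *p_first set to
+ * the 150-byte range and *p_last to the 200-byte one.
+ */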
+
+void
+vy_range_iterator_open(struct vy_range_iterator *itr, vy_range_tree_t *tree,
+		       enum iterator_type iterator_type,
+		       const struct tuple *key)
+{
+	itr->tree = tree;
+	itr->iterator_type = iterator_type;
+	itr->key = key;
+	itr->curr_range = NULL;
+}
+
+void
+vy_range_iterator_next(struct vy_range_iterator *itr, struct vy_range **result)
+{
+	struct vy_range *curr = itr->curr_range;
+	struct vy_range *next;
+
+	if (curr == NULL) {
+		/* First iteration */
+		next = vy_range_tree_find_by_key(itr->tree, itr->iterator_type,
+						 itr->key);
+		goto out;
+	}
+	switch (itr->iterator_type) {
+	case ITER_LT:
+	case ITER_LE:
+		next = vy_range_tree_prev(itr->tree, curr);
+		break;
+	case ITER_GT:
+	case ITER_GE:
+		next = vy_range_tree_next(itr->tree, curr);
+		break;
+	case ITER_EQ:
+		if (curr->end != NULL &&
+		    vy_stmt_compare_with_key(itr->key, curr->end,
+					     curr->key_def) >= 0) {
+			/* A partial key can be found in more than one range. */
+			next = vy_range_tree_next(itr->tree, curr);
+		} else {
+			next = NULL;
+		}
+		break;
+	default:
+		unreachable();
+	}
+out:
+	*result = itr->curr_range = next;
+}
+
+void
+vy_range_iterator_restore(struct vy_range_iterator *itr,
+			  const struct tuple *last_stmt,
+			  struct vy_range **result)
+{
+	struct vy_range *curr = vy_range_tree_find_by_key(itr->tree,
+				itr->iterator_type,
+				last_stmt != NULL ? last_stmt : itr->key);
+	*result = itr->curr_range = curr;
+}
diff --git a/src/box/vy_range.h b/src/box/vy_range.h
new file mode 100644
index 0000000000..8bb6849a7f
--- /dev/null
+++ b/src/box/vy_range.h
@@ -0,0 +1,321 @@
+#ifndef INCLUDES_TARANTOOL_BOX_VY_RANGE_H
+#define INCLUDES_TARANTOOL_BOX_VY_RANGE_H
+/*
+ * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file.
+ *
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include <stdbool.h>
+#include <stdint.h>
+
+#define RB_COMPACT 1
+#include <small/rb.h>
+#include <small/rlist.h>
+
+#include "iterator_type.h"
+#define HEAP_FORWARD_DECLARATION
+#include "salad/heap.h"
+#include "trivia/util.h"
+#include "vy_stat.h"
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+struct index_opts;
+struct key_def;
+struct tuple;
+struct vy_slice;
+
+/**
+ * Range of keys in an index stored on disk.
+ */
+struct vy_range {
+	/** Unique ID of this range. */
+	int64_t id;
+	/**
+	 * Range lower bound. NULL if range is leftmost.
+	 * Both 'begin' and 'end' statements have SELECT type with
+	 * the full indexed key.
+	 */
+	struct tuple *begin;
+	/** Range upper bound. NULL if range is rightmost. */
+	struct tuple *end;
+	/** Key definition for comparing range boundaries. */
+	const struct key_def *key_def;
+	/** An estimate of the number of statements in this range. */
+	struct vy_disk_stmt_counter count;
+	/**
+	 * List of run slices in this range, linked by vy_slice->in_range.
+	 * The newer a slice, the closer it is to the list head.
+	 */
+	struct rlist slices;
+	/** Number of entries in the ->slices list. */
+	int slice_count;
+	/**
+	 * The goal of compaction is to reduce read amplification.
+	 * All ranges for which the LSM tree has more runs per
+	 * level than run_count_per_level, or a run size larger
+	 * than the one defined by run_size_ratio for that level,
+	 * are candidates for compaction.
+	 * Unlike other LSM implementations, Vinyl can have many
+	 * sorted runs in a single level, and is able to compact
+	 * runs from any number of adjacent levels. Moreover,
+	 * higher levels are always taken in when compacting
+	 * a lower level - i.e. L1 is always included when
+	 * compacting L2, and both L1 and L2 are always included
+	 * when compacting L3.
+	 *
+	 * This variable contains the number of runs the next
+	 * compaction of this range will include.
+	 *
+	 * The lower the level scheduled for compaction, the
+	 * greater this number tends to be, because upper levels
+	 * are taken in.
+	 * @sa vy_range_update_compact_priority() to see
+	 * how we decide how many runs to compact next time.
+	 */
+	int compact_priority;
+	/** Number of times the range was compacted. */
+	int n_compactions;
+	/** Link in vy_index->tree. */
+	rb_node(struct vy_range) tree_node;
+	/** Link in vy_index->range_heap. */
+	struct heap_node heap_node;
+	/**
+	 * Incremented whenever an in-memory index or on disk
+	 * run is added to or deleted from this range. Used to
+	 * invalidate iterators.
+	 */
+	uint32_t version;
+};
+
+/**
+ * Heap of all ranges of the same index, prioritized by
+ * vy_range->compact_priority.
+ */
+#define HEAP_NAME vy_range_heap
+static inline bool
+vy_range_heap_less(struct heap_node *a, struct heap_node *b)
+{
+	struct vy_range *r1 = container_of(a, struct vy_range, heap_node);
+	struct vy_range *r2 = container_of(b, struct vy_range, heap_node);
+	return r1->compact_priority > r2->compact_priority;
+}
+#define HEAP_LESS(h, l, r) vy_range_heap_less(l, r)
+#include "salad/heap.h"
+#undef HEAP_LESS
+#undef HEAP_NAME
+
+/** Return true if a task is scheduled for a given range. */
+static inline bool
+vy_range_is_scheduled(struct vy_range *range)
+{
+	return range->heap_node.pos == UINT32_MAX;
+}
+
+/**
+ * Search tree of all ranges of the same index, sorted by
+ * vy_range->begin. Ranges in a tree are supposed to span
+ * all possible keys without overlaps.
+ */
+int
+vy_range_tree_cmp(struct vy_range *range_a, struct vy_range *range_b);
+int
+vy_range_tree_key_cmp(const struct tuple *stmt, struct vy_range *range);
+
+typedef rb_tree(struct vy_range) vy_range_tree_t;
+rb_gen_ext_key(MAYBE_UNUSED static inline, vy_range_tree_, vy_range_tree_t,
+	       struct vy_range, tree_node, vy_range_tree_cmp,
+	       const struct tuple *, vy_range_tree_key_cmp);
+
+/**
+ * Find the first range in which a given key should be looked up.
+ *
+ * @param tree          Range tree to search.
+ * @param iterator_type Iterator type.
+ * @param key           Key to look up.
+ *
+ * @retval              The first range to look up the key in.
+ */
+struct vy_range *
+vy_range_tree_find_by_key(vy_range_tree_t *tree,
+			  enum iterator_type iterator_type,
+			  const struct tuple *key);
+
+/**
+ * Allocate and initialize a range (either a new one or for
+ * restore from disk).
+ *
+ * @param id        Range id.
+ * @param begin     Range begin (inclusive) or NULL for -inf.
+ * @param end       Range end (exclusive) or NULL for +inf.
+ * @param key_def   Key definition for comparing range boundaries.
+ *
+ * @retval not NULL The new range.
+ * @retval NULL     Out of memory.
+ */
+struct vy_range *
+vy_range_new(int64_t id, struct tuple *begin, struct tuple *end,
+	     const struct key_def *key_def);
+
+/**
+ * Free a range and all its slices.
+ *
+ * @param range     Range to free.
+ */
+void
+vy_range_delete(struct vy_range *range);
+
+/** An snprint-style function to print boundaries of a range. */
+int
+vy_range_snprint(char *buf, int size, const struct vy_range *range);
+
+static inline const char *
+vy_range_str(struct vy_range *range)
+{
+	char *buf = tt_static_buf();
+	vy_range_snprint(buf, TT_STATIC_BUF_LEN, range);
+	return buf;
+}
+
+/** Add a run slice to the head of a range's list. */
+void
+vy_range_add_slice(struct vy_range *range, struct vy_slice *slice);
+
+/** Add a run slice to a range's list before @next_slice. */
+void
+vy_range_add_slice_before(struct vy_range *range, struct vy_slice *slice,
+			  struct vy_slice *next_slice);
+
+/** Remove a run slice from a range's list. */
+void
+vy_range_remove_slice(struct vy_range *range, struct vy_slice *slice);
+
+/**
+ * Update compaction priority of a range.
+ *
+ * @param range     The range.
+ * @param opts      Index options.
+ */
+void
+vy_range_update_compact_priority(struct vy_range *range,
+				 const struct index_opts *opts);
+
+/**
+ * Check if a range needs to be split in two.
+ *
+ * @param range             The range.
+ * @param opts              Index options.
+ * @param[out] p_split_key  Key to split the range by.
+ *
+ * @retval true             If the range needs to be split.
+ */
+bool
+vy_range_needs_split(struct vy_range *range, const struct index_opts *opts,
+		     const char **p_split_key);
+
+/**
+ * Check if a range needs to be coalesced with adjacent
+ * ranges in a range tree.
+ *
+ * @param range         The range.
+ * @param tree          The range tree.
+ * @param opts          Index options.
+ * @param[out] p_first  The first range in the tree to coalesce.
+ * @param[out] p_last   The last range in the tree to coalesce.
+ *
+ * @retval true         If the range needs to be coalesced.
+ */
+bool
+vy_range_needs_coalesce(struct vy_range *range, vy_range_tree_t *tree,
+			const struct index_opts *opts,
+			struct vy_range **p_first, struct vy_range **p_last);
+
+/**
+ * Iterator over ranges in a tree.
+ */
+struct vy_range_iterator {
+	/** Range tree to iterate. */
+	vy_range_tree_t *tree;
+	/** Iterator type. */
+	enum iterator_type iterator_type;
+	/** Search key. */
+	const struct tuple *key;
+	/**
+	 * Current range or NULL if the iteration
+	 * has stopped or has not been started.
+	 */
+	struct vy_range *curr_range;
+};
+
+/**
+ * Initialize a range iterator.
+ *
+ * @param itr           The iterator.
+ * @param tree          Range tree to iterate.
+ * @param iterator_type Iterator type.
+ * @param key           Search key.
+ */
+void
+vy_range_iterator_open(struct vy_range_iterator *itr, vy_range_tree_t *tree,
+		       enum iterator_type iterator_type,
+		       const struct tuple *key);
+
+/**
+ * Iterate to the next range.
+ *
+ * @param itr           The iterator.
+ * @param[out] result   Next range.
+ */
+void
+vy_range_iterator_next(struct vy_range_iterator *itr,
+		       struct vy_range **result);
+
+/**
+ * Restore an iterator after a tree modification.
+ *
+ * @param itr           The iterator.
+ * @param last_stmt     The last iterated statement.
+ * @param[out] result   Next range.
+ *
+ * This function positions the iterator to the range that
+ * contains @last_stmt. If @last_stmt is NULL, it restarts
+ * the iterator.
+ */
+void
+vy_range_iterator_restore(struct vy_range_iterator *itr,
+			  const struct tuple *last_stmt,
+			  struct vy_range **result);
+
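+/*
+ * Typical usage sketch (illustrative, not a verbatim excerpt from
+ * vinyl.c): visit every range that may contain a given key.
+ *
+ *	struct vy_range_iterator itr;
+ *	struct vy_range *range;
+ *	vy_range_iterator_open(&itr, tree, ITER_GE, key);
+ *	vy_range_iterator_next(&itr, &range);
+ *	while (range != NULL) {
+ *		... look up the key in the range ...
+ *		vy_range_iterator_next(&itr, &range);
+ *	}
+ */
+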
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
+
+#endif /* INCLUDES_TARANTOOL_BOX_VY_RANGE_H */
-- 
GitLab