Commit 4fed8854 authored by Vladimir Davydov's avatar Vladimir Davydov Committed by Roman Tsisyk

vinyl: fold vy_merge_iterator in vy_read_iterator

Initially, the merge sort implementation was supposed to be shared between
the read and write iterators, hence vy_merge_iterator. However, over time
this design decision proved inadequate: it turned out that the two
iterators can barely share a few lines, so different are the tasks they
solve. The write iterator has already been rewritten without
vy_merge_iterator, so we can now fold the latter into vy_read_iterator.
While we are at it, spruce up some comments.
parent 6fc49730
@@ -39,14 +39,12 @@
#include "vy_stat.h"
#include "vy_point_iterator.h"
/* {{{ Merge iterator */
/**
* Merge source, support structure for vy_merge_iterator
* Contains source iterator, additional properties and merge state
* Merge source, support structure for vy_read_iterator.
* Contains source iterator and merge state.
*/
struct vy_merge_src {
/** Source iterator */
struct vy_read_src {
/** Source iterator. */
union {
struct vy_run_iterator run_iterator;
struct vy_mem_iterator mem_iterator;
@@ -54,80 +52,27 @@ struct vy_merge_src {
struct vy_cache_iterator cache_iterator;
struct vy_stmt_iterator iterator;
};
/** Source can change during merge iteration */
/** Set if the source can change after yield. */
bool is_mutable;
/** Set if the iterator was started. */
bool is_started;
/**
* All sources with the same front_id as in struct
* vy_merge_iterator are on the same key of current output
* stmt (optimization)
*/
/** See vy_read_iterator->front_id. */
uint32_t front_id;
/** Statement the iterator is at. */
struct tuple *stmt;
};
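For illustration, here is a minimal sketch of how front_id drives the merge step. This is not code from the commit: the loop is condensed, and it assumes a vy_tuple_compare()-style comparator plus the dir = iterator_direction() convention used further down. Sources positioned on the smallest key in the iteration order are stamped with the new front_id, so that next_lsn() can later visit only the sources standing on that key.

static struct tuple *
read_iterator_merge_step(struct vy_read_src *src, uint32_t src_count,
			 uint32_t *front_id, const struct key_def *cmp_def,
			 int dir)
{
	struct tuple *best = NULL;
	uint32_t next_front = *front_id + 1;
	for (uint32_t i = 0; i < src_count; i++) {
		struct tuple *stmt = src[i].stmt;
		if (stmt == NULL)
			continue;
		int cmp = best == NULL ? -1 :
			  dir * vy_tuple_compare(stmt, best, cmp_def);
		if (cmp < 0) {
			/* A smaller key opens a new front. */
			best = stmt;
			next_front++;
			src[i].front_id = next_front;
		} else if (cmp == 0) {
			/* Same key as the current best: join its front. */
			src[i].front_id = next_front;
		}
	}
	*front_id = next_front;
	return best;
}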
/**
* Open the iterator.
*/
static void
vy_merge_iterator_open(struct vy_merge_iterator *itr,
enum iterator_type iterator_type,
struct tuple *key,
const struct key_def *cmp_def,
struct tuple_format *format,
struct tuple_format *upsert_format,
bool is_primary)
{
assert(key != NULL);
itr->cmp_def = cmp_def;
itr->format = format;
itr->upsert_format = upsert_format;
itr->is_primary = is_primary;
itr->range_tree_version = 0;
itr->p_mem_list_version = 0;
itr->range_version = 0;
itr->p_range_tree_version = NULL;
itr->p_mem_list_version = NULL;
itr->p_range_version = NULL;
itr->key = key;
itr->iterator_type = iterator_type;
itr->src = NULL;
itr->src_count = 0;
itr->src_capacity = 0;
itr->src = NULL;
itr->curr_src = UINT32_MAX;
itr->front_id = 1;
itr->mutable_start = 0;
itr->mutable_end = 0;
itr->skipped_start = 0;
itr->curr_stmt = NULL;
}
/**
* Close the iterator and free resources.
*/
static void
vy_merge_iterator_close(struct vy_merge_iterator *itr)
{
if (itr->curr_stmt != NULL)
tuple_unref(itr->curr_stmt);
for (size_t i = 0; i < itr->src_count; i++)
itr->src[i].iterator.iface->close(&itr->src[i].iterator);
free(itr->src);
}
/**
* Extend internal source array capacity to fit capacity sources.
* It is not necessary to call it, but doing so allows the internal memory
* allocation to be optimized.
*/
static NODISCARD int
vy_merge_iterator_reserve(struct vy_merge_iterator *itr, uint32_t capacity)
vy_read_iterator_reserve(struct vy_read_iterator *itr, uint32_t capacity)
{
if (itr->src_capacity >= capacity)
return 0;
struct vy_merge_src *new_src = calloc(capacity, sizeof(*new_src));
struct vy_read_src *new_src = calloc(capacity, sizeof(*new_src));
if (new_src == NULL) {
diag_set(OutOfMemory, capacity * sizeof(*new_src),
"calloc", "new_src");
@@ -143,19 +88,17 @@ vy_merge_iterator_reserve(struct vy_merge_iterator *itr, uint32_t capacity)
}
/**
* Add another source to merge iterator. Must be called before actual
* Add another source to read iterator. Must be called before actual
* iteration start and must not be called after.
* @sa necessary order of adding requirements in struct vy_merge_iterator
* comments.
* The resulting vy_stmt_iterator must be properly initialized before merge
* iteration start.
* param is_mutable - Source can change during merge iteration
*/
static struct vy_merge_src *
vy_merge_iterator_add(struct vy_merge_iterator *itr, bool is_mutable)
static struct vy_read_src *
vy_read_iterator_add_src(struct vy_read_iterator *itr, bool is_mutable)
{
if (itr->src_count == itr->src_capacity) {
if (vy_merge_iterator_reserve(itr, itr->src_count + 1) != 0)
if (vy_read_iterator_reserve(itr, itr->src_count + 1) != 0)
return NULL;
}
if (is_mutable) {
@@ -164,47 +107,27 @@ vy_merge_iterator_add(struct vy_merge_iterator *itr, bool is_mutable)
itr->mutable_end = itr->src_count + 1;
}
itr->src[itr->src_count].front_id = 0;
struct vy_merge_src *src = &itr->src[itr->src_count++];
struct vy_read_src *src = &itr->src[itr->src_count++];
memset(src, 0, sizeof(*src));
src->is_mutable = is_mutable;
return src;
}
/*
* Enable version checking.
*/
static void
vy_merge_iterator_set_version(struct vy_merge_iterator *itr,
const uint32_t *p_range_tree_version,
const uint32_t *p_mem_list_version,
const uint32_t *p_range_version)
{
itr->p_range_tree_version = p_range_tree_version;
if (itr->p_range_tree_version != NULL)
itr->range_tree_version = *p_range_tree_version;
itr->p_mem_list_version = p_mem_list_version;
if (itr->p_mem_list_version != NULL)
itr->mem_list_version = *p_mem_list_version;
itr->p_range_version = p_range_version;
if (itr->p_range_version != NULL)
itr->range_version = *p_range_version;
}
/*
* Try to restore position of merge iterator
/**
* Check if the read iterator needs to be restored.
*
* @retval 0 if position did not change (iterator started)
* @retval -2 iterator is no more valid
*/
static NODISCARD int
vy_merge_iterator_check_version(struct vy_merge_iterator *itr)
vy_read_iterator_check_version(struct vy_read_iterator *itr)
{
if (itr->p_range_tree_version != NULL &&
*itr->p_range_tree_version != itr->range_tree_version)
if (itr->index->mem_list_version != itr->mem_list_version)
return -2;
if (itr->p_mem_list_version != NULL &&
*itr->p_mem_list_version != itr->mem_list_version)
if (itr->index->range_tree_version != itr->range_tree_version)
return -2;
if (itr->p_range_version != NULL &&
*itr->p_range_version != itr->range_version)
if (itr->curr_range != NULL &&
itr->curr_range->version != itr->range_version)
return -2;
return 0;
}
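The -2 result is not fatal: vy_read_iterator_merge_next_key() further down wraps the call and retries after repositioning. Roughly, as a condensed sketch (presumably via vy_read_iterator_restore(); the range-switching details are omitted):

	while (true) {
		int rc = vy_read_iterator_next_key(itr, &stmt);
		if (rc != -2)
			break;	/* Success, EOF or a real error. */
		/*
		 * The mem list, the range tree or the current range
		 * changed while a source yielded: reposition all
		 * sources at the last returned statement and retry.
		 */
		vy_read_iterator_restore(itr);
	}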
@@ -216,10 +139,10 @@ vy_merge_iterator_check_version(struct vy_merge_iterator *itr)
* @retval -2 iterator is not valid anymore
*/
static NODISCARD int
vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret)
{
*ret = NULL;
const struct key_def *def = itr->cmp_def;
const struct key_def *def = itr->index->cmp_def;
if (itr->curr_stmt != NULL && itr->iterator_type == ITER_EQ &&
tuple_field_count(itr->key) >= def->part_count) {
/*
@@ -228,7 +151,7 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
*/
return 0;
}
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
int dir = iterator_direction(itr->iterator_type);
uint32_t prev_front_id = itr->front_id;
@@ -242,7 +165,7 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
bool is_yield_possible = i >= itr->mutable_end;
was_yield_possible = was_yield_possible || is_yield_possible;
struct vy_merge_src *src = &itr->src[i];
struct vy_read_src *src = &itr->src[i];
bool stop = false;
if (!src->is_started) {
@@ -275,7 +198,7 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
}
if (rc < 0)
return -1;
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
if (i >= itr->skipped_start && itr->curr_stmt != NULL) {
/*
@@ -289,7 +212,7 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
rc = src->iterator.iface->next_key(&src->iterator,
&src->stmt,
&stop);
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
if (rc != 0)
return rc;
@@ -343,12 +266,12 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
for (int i = MIN(itr->skipped_start, itr->mutable_end) - 1;
was_yield_possible && i >= (int) itr->mutable_start; i--) {
struct vy_merge_src *src = &itr->src[i];
struct vy_read_src *src = &itr->src[i];
bool stop;
rc = src->iterator.iface->restore(&src->iterator,
itr->curr_stmt,
&src->stmt, &stop);
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
if (rc < 0)
return rc;
@@ -400,17 +323,17 @@ vy_merge_iterator_next_key(struct vy_merge_iterator *itr, struct tuple **ret)
* @retval -2 iterator is not valid anymore
*/
static NODISCARD int
vy_merge_iterator_next_lsn(struct vy_merge_iterator *itr, struct tuple **ret)
vy_read_iterator_next_lsn(struct vy_read_iterator *itr, struct tuple **ret)
{
*ret = NULL;
if (itr->curr_src == UINT32_MAX)
return 0;
assert(itr->curr_stmt != NULL);
const struct key_def *def = itr->cmp_def;
struct vy_merge_src *src = &itr->src[itr->curr_src];
const struct key_def *def = itr->index->cmp_def;
struct vy_read_src *src = &itr->src[itr->curr_src];
struct vy_stmt_iterator *sub_itr = &src->iterator;
int rc = sub_itr->iface->next_lsn(sub_itr, &src->stmt);
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
if (rc != 0)
return rc;
@@ -432,7 +355,7 @@ vy_merge_iterator_next_lsn(struct vy_merge_iterator *itr, struct tuple **ret)
rc = src->iterator.iface->next_key(&src->iterator,
&src->stmt,
&stop);
if (vy_merge_iterator_check_version(itr))
if (vy_read_iterator_check_version(itr))
return -2;
if (rc != 0)
return rc;
@@ -461,35 +384,34 @@ vy_merge_iterator_next_lsn(struct vy_merge_iterator *itr, struct tuple **ret)
}
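To recap the two-level traversal with a made-up example: if the visible statements are {key=1, lsn=10}, {key=1, lsn=5} and {key=2, lsn=7}, then next_key() steps 1@10 -> 2@7 (the youngest statement of each key in iteration order), while next_lsn(), called while standing on key 1, steps 10 -> 5 (older statements of the same key).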
/**
* Squash in the single statement all rest statements of current key
* starting from the current statement.
* Squash in a single REPLACE all UPSERTs for the current key.
*
* @retval 0 success or EOF (*ret == NULL)
* @retval 0 success
* @retval -1 error
* @retval -2 invalid iterator
*/
static NODISCARD int
vy_merge_iterator_squash_upsert(struct vy_merge_iterator *itr,
struct tuple **ret, int64_t *upserts_applied)
vy_read_iterator_squash_upsert(struct vy_read_iterator *itr,
struct tuple **ret)
{
*ret = NULL;
struct vy_index *index = itr->index;
struct tuple *t = itr->curr_stmt;
/* Upserts enabled only in the primary index. */
assert(vy_stmt_type(t) != IPROTO_UPSERT || itr->is_primary);
assert(vy_stmt_type(t) != IPROTO_UPSERT || index->id == 0);
tuple_ref(t);
while (vy_stmt_type(t) == IPROTO_UPSERT) {
struct tuple *next;
int rc = vy_merge_iterator_next_lsn(itr, &next);
int rc = vy_read_iterator_next_lsn(itr, &next);
if (rc != 0) {
tuple_unref(t);
return rc;
}
struct tuple *applied;
assert(itr->is_primary);
applied = vy_apply_upsert(t, next, itr->cmp_def, itr->format,
itr->upsert_format, true);
++*upserts_applied;
struct tuple *applied = vy_apply_upsert(t, next,
index->cmp_def, index->mem_format,
index->upsert_format, true);
index->stat.upsert.applied++;
tuple_unref(t);
if (applied == NULL)
return -1;
@@ -501,16 +423,13 @@ vy_merge_iterator_squash_upsert(struct vy_merge_iterator *itr,
return 0;
}
/* }}} Merge iterator */
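For a concrete (hypothetical) picture of what the squashing loop does: suppose the statements for a key, newest first, are UPSERT{field += 2}, UPSERT{field += 1}, REPLACE{field = 10}. The first vy_apply_upsert() call merges the two UPSERTs into UPSERT{field += 3}; the second applies that to the REPLACE and the loop stops, handing a single REPLACE{field = 13} to the caller. index->stat.upsert.applied is bumped once per application, i.e. twice in this example.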
static void
vy_read_iterator_add_tx(struct vy_read_iterator *itr,
enum iterator_type iterator_type, struct tuple *key)
{
assert(itr->tx != NULL);
struct vy_txw_iterator_stat *stat = &itr->index->stat.txw.iterator;
struct vy_merge_src *sub_src =
vy_merge_iterator_add(&itr->merge_iterator, true);
struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, true);
vy_txw_iterator_open(&sub_src->txw_iterator, stat, itr->tx, itr->index,
iterator_type, key);
}
@@ -519,8 +438,7 @@ static void
vy_read_iterator_add_cache(struct vy_read_iterator *itr,
enum iterator_type iterator_type, struct tuple *key)
{
struct vy_merge_src *sub_src =
vy_merge_iterator_add(&itr->merge_iterator, true);
struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, true);
vy_cache_iterator_open(&sub_src->cache_iterator,
&itr->index->cache, iterator_type,
key, itr->read_view);
@@ -531,11 +449,11 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr,
enum iterator_type iterator_type, struct tuple *key)
{
struct vy_index *index = itr->index;
struct vy_merge_src *sub_src;
struct vy_read_src *sub_src;
/* Add the active in-memory index. */
assert(index->mem != NULL);
sub_src = vy_merge_iterator_add(&itr->merge_iterator, true);
sub_src = vy_read_iterator_add_src(itr, true);
vy_mem_iterator_open(&sub_src->mem_iterator,
&index->stat.memory.iterator,
index->mem, iterator_type, key,
@@ -543,7 +461,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr,
/* Add sealed in-memory indexes. */
struct vy_mem *mem;
rlist_foreach_entry(mem, &index->sealed, in_sealed) {
sub_src = vy_merge_iterator_add(&itr->merge_iterator, false);
sub_src = vy_read_iterator_add_src(itr, false);
vy_mem_iterator_open(&sub_src->mem_iterator,
&index->stat.memory.iterator,
mem, iterator_type, key,
@@ -569,7 +487,7 @@ vy_read_iterator_add_disk(struct vy_read_iterator *itr,
* a new run slice to a range and before removing
* dumped in-memory trees. We must not add both
* the slice and the trees in this case, because
* merge iterator can't deal with duplicates.
* the read iterator can't deal with duplicates.
* Since index->dump_lsn is bumped after deletion
* of dumped in-memory trees, we can filter out
* the run slice containing duplicates by LSN.
@@ -577,8 +495,7 @@
if (slice->run->info.min_lsn > index->dump_lsn)
continue;
assert(slice->run->info.max_lsn <= index->dump_lsn);
struct vy_merge_src *sub_src = vy_merge_iterator_add(
&itr->merge_iterator, false);
struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, false);
vy_run_iterator_open(&sub_src->run_iterator,
&index->stat.disk.iterator,
itr->run_env, slice,
@@ -590,7 +507,7 @@
}
/**
* Set up merge iterator for the current range.
* Set up the read iterator for the current range.
*/
static void
vy_read_iterator_use_range(struct vy_read_iterator *itr)
@@ -598,6 +515,24 @@ vy_read_iterator_use_range(struct vy_read_iterator *itr)
struct tuple *key = itr->key;
enum iterator_type iterator_type = itr->iterator_type;
/* Close all open sources and reset merge state. */
if (itr->curr_stmt != NULL)
tuple_unref(itr->curr_stmt);
for (uint32_t i = 0; i < itr->src_count; i++)
itr->src[i].iterator.iface->close(&itr->src[i].iterator);
itr->src_count = 0;
itr->mutable_start = 0;
itr->mutable_end = 0;
itr->skipped_start = 0;
itr->curr_stmt = NULL;
itr->curr_src = UINT32_MAX;
itr->front_id = 1;
/*
* Open all sources starting from the last statement
* returned to the user. Newer sources must be added
* first.
*/
if (itr->last_stmt != NULL) {
if (iterator_type == ITER_EQ)
itr->need_check_eq = true;
@@ -612,36 +547,26 @@ vy_read_iterator_use_range(struct vy_read_iterator *itr)
vy_read_iterator_add_cache(itr, iterator_type, key);
vy_read_iterator_add_mem(itr, iterator_type, key);
if (itr->curr_range != NULL)
if (itr->curr_range != NULL) {
itr->range_version = itr->curr_range->version;
vy_read_iterator_add_disk(itr, iterator_type, key);
/* Enable range and range index version checks */
vy_merge_iterator_set_version(&itr->merge_iterator,
&itr->index->range_tree_version,
&itr->index->mem_list_version,
itr->curr_range != NULL ?
&itr->curr_range->version : NULL);
}
}
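Putting the new vy_read_iterator_use_range() together, sources end up registered newest first, with mutable sources ahead of read-blocking ones, exactly as the header comment requires. Condensed sketch (the transaction write-set branch is elided from the hunk above, so the tx != NULL guard is an assumption):

	if (itr->tx != NULL)
		vy_read_iterator_add_tx(itr, iterator_type, key);	/* txw */
	vy_read_iterator_add_cache(itr, iterator_type, key);		/* cache */
	vy_read_iterator_add_mem(itr, iterator_type, key);		/* active + sealed mems */
	if (itr->curr_range != NULL)
		vy_read_iterator_add_disk(itr, iterator_type, key);	/* run slices */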
/**
* Open the iterator.
*/
void
vy_read_iterator_open(struct vy_read_iterator *itr, struct vy_run_env *run_env,
struct vy_index *index, struct vy_tx *tx,
enum iterator_type iterator_type, struct tuple *key,
const struct vy_read_view **rv)
{
memset(itr, 0, sizeof(*itr));
itr->run_env = run_env;
itr->index = index;
itr->tx = tx;
itr->iterator_type = iterator_type;
itr->key = key;
itr->read_view = rv;
itr->search_started = false;
itr->need_check_eq = false;
itr->last_stmt = NULL;
itr->curr_range = NULL;
if (tuple_field_count(key) == 0) {
/*
@@ -672,7 +597,7 @@ vy_read_iterator_open(struct vy_read_iterator *itr, struct vy_run_env *run_env,
}
/**
* Start lazy search
* Prepare the read iterator for the first iteration.
*/
static void
vy_read_iterator_start(struct vy_read_iterator *itr)
@@ -682,12 +607,10 @@ vy_read_iterator_start(struct vy_read_iterator *itr)
assert(itr->curr_range == NULL);
itr->search_started = true;
itr->mem_list_version = itr->index->mem_list_version;
itr->range_tree_version = itr->index->range_tree_version;
itr->curr_range = vy_range_tree_find_by_key(itr->index->tree,
itr->iterator_type, itr->key);
vy_merge_iterator_open(&itr->merge_iterator, itr->iterator_type,
itr->key, itr->index->cmp_def,
itr->index->mem_format,
itr->index->upsert_format, itr->index->id == 0);
vy_read_iterator_use_range(itr);
itr->index->stat.lookup++;
@@ -701,14 +624,10 @@ vy_read_iterator_start(struct vy_read_iterator *itr)
static void
vy_read_iterator_restore(struct vy_read_iterator *itr)
{
itr->mem_list_version = itr->index->mem_list_version;
itr->range_tree_version = itr->index->range_tree_version;
itr->curr_range = vy_range_tree_find_by_key(itr->index->tree,
itr->iterator_type, itr->last_stmt ?: itr->key);
/* Re-create merge iterator */
vy_merge_iterator_close(&itr->merge_iterator);
vy_merge_iterator_open(&itr->merge_iterator, itr->iterator_type,
itr->key, itr->index->cmp_def,
itr->index->mem_format,
itr->index->upsert_format, itr->index->id == 0);
vy_read_iterator_use_range(itr);
}
@@ -747,16 +666,12 @@ vy_read_iterator_next_range(struct vy_read_iterator *itr)
if (range == NULL)
return false;
vy_merge_iterator_close(&itr->merge_iterator);
vy_merge_iterator_open(&itr->merge_iterator, itr->iterator_type,
itr->key, index->cmp_def, index->mem_format,
index->upsert_format, index->id == 0);
vy_read_iterator_use_range(itr);
return true;
}
/**
* Conventional wrapper around vy_merge_iterator_next_key() to automatically
* Conventional wrapper around vy_read_iterator_next_key() to automatically
* re-create the merge iterator on vy_index/vy_range/vy_run changes.
*/
static NODISCARD int
@@ -768,7 +683,7 @@ vy_read_iterator_merge_next_key(struct vy_read_iterator *itr,
struct tuple *stmt;
while (true) {
int rc = vy_merge_iterator_next_key(&itr->merge_iterator, &stmt);
int rc = vy_read_iterator_next_key(itr, &stmt);
if (rc == -1)
return -1;
if (rc == -2) {
@@ -846,7 +761,6 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result)
bool skipped_txw_delete = false;
struct tuple *t = NULL;
struct vy_merge_iterator *mi = &itr->merge_iterator;
struct vy_index *index = itr->index;
int rc = 0;
while (true) {
@@ -861,8 +775,7 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result)
rc = 0; /* No more data. */
break;
}
rc = vy_merge_iterator_squash_upsert(mi, &t,
&index->stat.upsert.applied);
rc = vy_read_iterator_squash_upsert(itr, &t);
if (rc == -1)
goto clear;
if (rc == -2) {
@@ -972,9 +885,10 @@ vy_read_iterator_close(struct vy_read_iterator *itr)
{
if (itr->last_stmt != NULL)
tuple_unref(itr->last_stmt);
itr->last_stmt = NULL;
if (itr->search_started)
vy_merge_iterator_close(&itr->merge_iterator);
if (itr->curr_stmt != NULL)
tuple_unref(itr->curr_stmt);
for (uint32_t i = 0; i < itr->src_count; i++)
itr->src[i].iterator.iface->close(&itr->src[i].iterator);
free(itr->src);
TRASH(itr);
}
/* }}} Iterator over index */
@@ -35,119 +35,82 @@
#include <stdbool.h>
#include "iterator_type.h"
#include "vy_range.h"
#include "trivia/util.h"
#if defined(__cplusplus)
extern "C" {
#endif /* defined(__cplusplus) */
/**
* Merge iterator takes several iterators as sources and sorts
* output from them by the given order and LSN DESC. It has no filter,
* it just sorts output from its sources.
*
* All statements from all sources can be traversed via
* next_key()/next_lsn() like in a simple iterator (run, mem etc).
* next_key() switches to the youngest statement of
* the next key (according to the order), and next_lsn()
* switches to an older statement of the same key.
* Vinyl read iterator.
*
* There are several merge optimizations, which expect that:
*
* 1) All sources are sorted by age, i.e. the most fresh
* sources are added first.
* 2) Mutable sources are added before read-blocking sources.
*/
struct vy_merge_iterator {
/** Array of sources */
struct vy_merge_src *src;
/** Number of elements in the src array */
uint32_t src_count;
/** Number of elements allocated in the src array */
uint32_t src_capacity;
/** Current source offset that merge iterator is positioned on */
uint32_t curr_src;
/** Offset of the first source with is_mutable == true */
uint32_t mutable_start;
/** Next offset after the last source with is_mutable == true */
uint32_t mutable_end;
/** The offset starting with which the sources were skipped */
uint32_t skipped_start;
/**
* Index key definition for comparison (extended with
* primary key parts).
*/
const struct key_def *cmp_def;
/** Format to allocate REPLACE and DELETE tuples. */
struct tuple_format *format;
/** Format to allocate UPSERT tuples. */
struct tuple_format *upsert_format;
/** Set if this iterator is for a primary index. */
bool is_primary;
/* {{{ Range version checking */
/* pointer to index->range_tree_version */
const uint32_t *p_range_tree_version;
/* pointer to index->mem_list_version */
const uint32_t *p_mem_list_version;
/* copy of index->range_tree_version to track range tree changes */
uint32_t range_tree_version;
/* copy of index->mem_list_version to track mem list changes */
uint32_t mem_list_version;
/* pointer to range->version */
const uint32_t *p_range_version;
/* copy of curr_range->version to track mem/run lists changes */
uint32_t range_version;
/* Range version checking }}} */
const struct tuple *key;
/** Iteration type. */
enum iterator_type iterator_type;
/** Current stmt that merge iterator is positioned on */
struct tuple *curr_stmt;
/**
* All sources with this front_id are on the same key of
* current iteration (optimization)
*/
uint32_t front_id;
};
/**
* Complex read iterator over vinyl index and write_set of current tx
* Iterates over ranges, creates merge iterator for every range and outputs
* the result.
* Can also work without a transaction, just set tx = NULL in _open.
* Applies upserts and skips deletes, so only one REPLACE stmt for every key
* is output.
* Used for executing a SELECT request over a Vinyl index.
*/
struct vy_read_iterator {
/** Vinyl run environment. */
struct vy_run_env *run_env;
/* index to iterate over */
/** Index to iterate over. */
struct vy_index *index;
/* transaction to iterate over */
/** Active transaction or NULL. */
struct vy_tx *tx;
/* search options */
/** Iterator type. */
enum iterator_type iterator_type;
/** Search key. */
struct tuple *key;
/** Read view the iterator lives in. */
const struct vy_read_view **read_view;
/* current range */
struct vy_range *curr_range;
/* merge iterator over current range */
struct vy_merge_iterator merge_iterator;
/** Last statement returned by vy_read_iterator_next(). */
struct tuple *last_stmt;
/* is lazy search started */
bool search_started;
/**
* Set if the resulting statement needs to be
* checked to match the search key.
*/
bool need_check_eq;
/** Set on the first call to vy_read_iterator_next(). */
bool search_started;
/** Last statement returned by vy_read_iterator_next(). */
struct tuple *last_stmt;
/**
* Copy of index->range_tree_version.
* Used for detecting range tree changes.
*/
uint32_t range_tree_version;
/**
* Copy of index->mem_list_version.
* Used for detecting memory level changes.
*/
uint32_t mem_list_version;
/**
* Copy of curr_range->version.
* Used for detecting changes in the current range.
*/
uint32_t range_version;
/** Range the iterator is currently positioned at. */
struct vy_range *curr_range;
/**
* Array of merge sources. Sources are sorted by age.
* In particular, this means that all mutable sources
* come first while all sources that may yield (runs)
* go last.
*/
struct vy_read_src *src;
/** Number of elements in the src array. */
uint32_t src_count;
/** Maximal capacity of the src array. */
uint32_t src_capacity;
/** Index of the current merge source. */
uint32_t curr_src;
/** Statement returned by the current merge source. */
struct tuple *curr_stmt;
/** Offset of the first mutable source. */
uint32_t mutable_start;
/** Offset of the source following the last mutable source. */
uint32_t mutable_end;
/** Offset of the first skipped source. */
uint32_t skipped_start;
/**
* front_id of the current source and all sources
* that are on the same key.
*/
uint32_t front_id;
};
/**
@@ -159,7 +122,7 @@ struct vy_read_iterator {
* @param iterator_type Type of the iterator that determines order
* of the iteration.
* @param key Key for the iteration.
* @param vlsn Maximal visible LSN of transactions.
* @param rv Read view.
*/
void
vy_read_iterator_open(struct vy_read_iterator *itr, struct vy_run_env *run_env,
......
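For context, here is how a caller is expected to drive the resulting iterator. This is a hypothetical usage sketch based on the declarations in this header: error handling and the read-view/transaction setup are glossed over, and use_statement() is an invented consumer.

	struct vy_read_iterator itr;
	vy_read_iterator_open(&itr, run_env, index, tx, ITER_GE, key, rv);

	struct tuple *stmt;
	while (vy_read_iterator_next(&itr, &stmt) == 0 && stmt != NULL) {
		/*
		 * Each returned statement is the newest visible version
		 * of its key: UPSERTs are squashed, DELETEs are skipped.
		 */
		use_statement(stmt);
	}
	vy_read_iterator_close(&itr);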