diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c index 14ac9f7546e305b89d5c98f81da9bc7e84500dad..40d10b2f3989a48845bd19df697c34e0841ce84d 100644 --- a/src/box/vy_read_iterator.c +++ b/src/box/vy_read_iterator.c @@ -52,8 +52,6 @@ struct vy_read_src { struct vy_cache_iterator cache_iterator; struct vy_stmt_iterator iterator; }; - /** Set if the source can change after yield. */ - bool is_mutable; /** Set if the iterator was started. */ bool is_started; /** See vy_read_iterator->front_id. */ @@ -92,24 +90,17 @@ vy_read_iterator_reserve(struct vy_read_iterator *itr, uint32_t capacity) * iteration start and must not be called after. * The resulting vy_stmt_iterator must be properly initialized before merge * iteration start. - * param is_mutable - Source can change during merge iteration */ static struct vy_read_src * -vy_read_iterator_add_src(struct vy_read_iterator *itr, bool is_mutable) +vy_read_iterator_add_src(struct vy_read_iterator *itr) { if (itr->src_count == itr->src_capacity) { if (vy_read_iterator_reserve(itr, itr->src_count + 1) != 0) return NULL; } - if (is_mutable) { - if (itr->mutable_start == itr->mutable_end) - itr->mutable_start = itr->src_count; - itr->mutable_end = itr->src_count + 1; - } itr->src[itr->src_count].front_id = 0; struct vy_read_src *src = &itr->src[itr->src_count++]; memset(src, 0, sizeof(*src)); - src->is_mutable = is_mutable; return src; } @@ -133,202 +124,389 @@ vy_read_iterator_check_version(struct vy_read_iterator *itr) } /** - * Iterate to the next key - * @retval 0 success or EOF (*ret == NULL) - * @retval -1 read error - * @retval -2 iterator is not valid anymore + * Compare two tuples from the read iterator perspective. + * + * Returns: + * -1 if statement @a precedes statement @b in the iterator output + * 0 if statements @a and @b are at the same position + * 1 if statement @a supersedes statement @b + * + * NULL denotes the statement following the last one. */ -static NODISCARD int -vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) +static inline int +vy_read_iterator_cmp_stmt(struct vy_read_iterator *itr, + const struct tuple *a, const struct tuple *b) { - *ret = NULL; - const struct key_def *def = itr->index->cmp_def; - if (itr->curr_stmt != NULL && (itr->iterator_type == ITER_EQ || - itr->iterator_type == ITER_REQ) && - tuple_field_count(itr->key) >= def->part_count) { + if (a == NULL && b != NULL) + return 1; + if (a != NULL && b == NULL) + return -1; + if (a == NULL && b == NULL) + return 0; + return iterator_direction(itr->iterator_type) * + vy_tuple_compare(a, b, itr->index->cmp_def); +} + +/** + * Return true if the statement matches search criteria + * and older sources don't need to be scanned. + */ +static bool +vy_read_iterator_is_exact_match(struct vy_read_iterator *itr, + struct tuple *stmt) +{ + struct tuple *key = itr->key; + enum iterator_type type = itr->iterator_type; + struct key_def *cmp_def = itr->index->cmp_def; + + /* + * If the index is unique and the search key is full, + * we can avoid disk accesses on the first iteration + * in case the key is found in memory. + */ + return itr->last_stmt == NULL && stmt != NULL && + (type == ITER_EQ || type == ITER_REQ || + type == ITER_GE || type == ITER_LE) && + tuple_field_count(key) >= cmp_def->part_count && + vy_stmt_compare(stmt, key, cmp_def) == 0; +} + +/** + * Check if the statement at which the given read source + * is positioned precedes the current candidate for the + * next key ('curr_stmt') and update the latter if so. + * The 'stop' flag is set if the next key is found and + * older sources don't need to be evaluated. + */ +static void +vy_read_iterator_evaluate_src(struct vy_read_iterator *itr, + struct vy_read_src *src, bool *stop) +{ + int cmp; + uint32_t src_id = src - itr->src; + + if (vy_read_iterator_is_exact_match(itr, src->stmt)) { /* - * There may be one statement at max satisfying - * EQ with a full key. + * If we got an exact match, we can skip a tuple + * comparison, because this source must be on top + * of the heap, otherwise 'curr_stmt' would be an + * exact match as well and so we would not have + * scanned this source at all. */ - return 0; + assert(vy_read_iterator_cmp_stmt(itr, src->stmt, + itr->curr_stmt) < 0); + cmp = -1; + *stop = true; + } else { + cmp = vy_read_iterator_cmp_stmt(itr, src->stmt, + itr->curr_stmt); } - if (vy_read_iterator_check_version(itr)) - return -2; - int dir = iterator_direction(itr->iterator_type); - uint32_t prev_front_id = itr->front_id; - itr->front_id++; - itr->curr_src = UINT32_MAX; - struct tuple *min_stmt = NULL; - int rc = 0; + if (cmp < 0) { + assert(src->stmt != NULL); + tuple_ref(src->stmt); + if (itr->curr_stmt != NULL) + tuple_unref(itr->curr_stmt); + itr->curr_stmt = src->stmt; + itr->curr_src = src_id; + itr->front_id++; + } + if (cmp <= 0) + src->front_id = itr->front_id; + if (*stop || src_id >= itr->skipped_src) + itr->skipped_src = src_id + 1; +} - bool was_yield_possible = false; - for (uint32_t i = 0; i < itr->src_count; i++) { - bool is_yield_possible = i >= itr->mutable_end; - was_yield_possible = was_yield_possible || is_yield_possible; +/* + * Each of the functions from the vy_read_iterator_scan_* family + * is used by vy_read_iterator_next_key() to: + * + * 1. Update the position of a read source, which implies: + * + * - Starting iteration over the source if it has not been done + * yet or restoring the iterator position in case the source + * has been modified since the last iteration. + * + * - Advancing the iterator position to the first statement + * following the one returned on the previous iteration. + * To avoid an extra tuple comparison, we maintain front_id + * for each source: all sources with front_id equal to the + * front_id of the read iterator were used on the previous + * iteration and hence need to be advanced. + * + * 2. Update the candidate for the next key ('curr_stmt') if the + * statement at which the source is positioned precedes it. + * The 'stop' flag is set if older sources do not need to be + * scanned (e.g. because a chain was found in the cache). + * See also vy_read_iterator_evaluate_src(). + */ - struct vy_read_src *src = &itr->src[i]; - bool stop = false; +static void +vy_read_iterator_scan_txw(struct vy_read_iterator *itr, bool *stop) +{ + int rc; + bool unused; + struct vy_read_src *src = &itr->src[itr->txw_src]; - if (!src->is_started) { - /* - * This is the first time the source is used. - * Start the iterator. - */ - src->is_started = true; + if (itr->tx == NULL) + return; + + assert(itr->txw_src < itr->skipped_src); + + if (!src->is_started) { + src->is_started = true; + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); + } else { + rc = src->iterator.iface->restore(&src->iterator, + itr->last_stmt, &src->stmt, &unused); + if (rc == 0 && src->front_id == itr->prev_front_id) rc = src->iterator.iface->next_key(&src->iterator, - &src->stmt, &stop); - } else { - /* - * The source might have changed since the last time - * it was used, so the iterator needs to be restored. - */ - rc = src->iterator.iface->restore(&src->iterator, - itr->curr_stmt, - &src->stmt, &stop); - if (rc == 0 && src->front_id == prev_front_id) { - /* - * The source was used on the previous iteration. - * Advance the iterator to the next key unless it - * was restored. - */ - assert(itr->curr_stmt != NULL); - assert(i < itr->skipped_start); - rc = src->iterator.iface->next_key(&src->iterator, - &src->stmt, &stop); - } - } + &src->stmt, &unused); + } + assert(rc >= 0); + (void)rc; + + vy_read_iterator_evaluate_src(itr, src, stop); +} + +static void +vy_read_iterator_scan_cache(struct vy_read_iterator *itr, bool *stop) +{ + int rc; + bool is_interval = false; + struct vy_read_src *src = &itr->src[itr->cache_src]; + + if (!src->is_started) { + src->is_started = true; + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &is_interval); + } else { + rc = src->iterator.iface->restore(&src->iterator, + itr->last_stmt, &src->stmt, &is_interval); + if (rc == 0 && src->front_id == itr->prev_front_id) + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &is_interval); + } + assert(rc >= 0); + (void)rc; + + while (itr->cache_src >= itr->skipped_src && src->stmt != NULL && + vy_read_iterator_cmp_stmt(itr, src->stmt, itr->last_stmt) <= 0) { + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &is_interval); + assert(rc == 0); + (void)rc; + } + + vy_read_iterator_evaluate_src(itr, src, stop); + + if (is_interval) { + itr->skipped_src = itr->cache_src + 1; + *stop = true; + } +} + +static NODISCARD int +vy_read_iterator_scan_mem(struct vy_read_iterator *itr, + uint32_t mem_src, bool *stop) +{ + int rc; + bool unused; + struct vy_read_src *src = &itr->src[mem_src]; + + assert(mem_src >= itr->mem_src && mem_src < itr->disk_src); + + if (!src->is_started) { + src->is_started = true; + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); + } else { + rc = src->iterator.iface->restore(&src->iterator, + itr->last_stmt, &src->stmt, &unused); + if (rc == 0 && src->front_id == itr->prev_front_id) + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); + } + if (rc < 0) + return -1; + + while (mem_src >= itr->skipped_src && src->stmt != NULL && + vy_read_iterator_cmp_stmt(itr, src->stmt, itr->last_stmt) <= 0) { + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); if (rc < 0) return -1; - if (vy_read_iterator_check_version(itr)) - return -2; - if (i >= itr->skipped_start && itr->curr_stmt != NULL) { - /* - * If the source was not used on the last iteration, - * it might have lagged behind the current merge key. - * Advance it until it is up-to-date. - */ - while (src->stmt != NULL && - dir * vy_tuple_compare(src->stmt, itr->curr_stmt, - def) <= 0) { - rc = src->iterator.iface->next_key(&src->iterator, - &src->stmt, - &stop); - if (vy_read_iterator_check_version(itr)) - return -2; - if (rc != 0) - return rc; - } - } - if (i >= itr->skipped_start) - itr->skipped_start++; - - if (stop && src->stmt == NULL && min_stmt == NULL) { - itr->front_id++; - itr->curr_src = i; - src->front_id = itr->front_id; - itr->skipped_start = i + 1; - break; - } - if (src->stmt == NULL) - continue; + } - if (itr->curr_stmt == NULL && (itr->iterator_type == ITER_EQ || - itr->iterator_type == ITER_REQ || - itr->iterator_type == ITER_GE || - itr->iterator_type == ITER_LE) && - tuple_field_count(itr->key) >= def->part_count && - vy_stmt_compare(src->stmt, itr->key, def) == 0) { - /** - * If the index is unique and the search key - * is full, we can avoid disk accesses on the - * first iteration in case the key is found - * in memory. - */ - stop = true; - } + vy_read_iterator_evaluate_src(itr, src, stop); + return 0; +} - int cmp = min_stmt == NULL ? -1 : - dir * vy_tuple_compare(src->stmt, min_stmt, def); - if (cmp < 0) { - itr->front_id++; - if (min_stmt) - tuple_unref(min_stmt); - min_stmt = src->stmt; - tuple_ref(min_stmt); - itr->curr_src = i; - } - if (cmp <= 0) - src->front_id = itr->front_id; +static NODISCARD int +vy_read_iterator_scan_disk(struct vy_read_iterator *itr, + uint32_t disk_src, bool *stop) +{ + int rc = 0; + bool unused; + struct vy_read_src *src = &itr->src[disk_src]; - if (stop) { - itr->skipped_start = i + 1; - break; - } + assert(disk_src >= itr->disk_src && disk_src < itr->src_count); + + if (!src->is_started || src->front_id == itr->prev_front_id) { + src->is_started = true; + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); } + if (rc < 0) + return -1; + if (vy_read_iterator_check_version(itr)) + return -2; - for (int i = MIN(itr->skipped_start, itr->mutable_end) - 1; - was_yield_possible && i >= (int) itr->mutable_start; i--) { - struct vy_read_src *src = &itr->src[i]; - bool stop; - rc = src->iterator.iface->restore(&src->iterator, - itr->curr_stmt, - &src->stmt, &stop); + while (disk_src >= itr->skipped_src && src->stmt != NULL && + vy_read_iterator_cmp_stmt(itr, src->stmt, itr->last_stmt) <= 0) { + rc = src->iterator.iface->next_key(&src->iterator, + &src->stmt, &unused); + if (rc < 0) + return -1; if (vy_read_iterator_check_version(itr)) return -2; - if (rc < 0) - return rc; - if (rc == 0) - continue; + } - int cmp = min_stmt == NULL ? -1 : - dir * vy_tuple_compare(src->stmt, min_stmt, def); - if (cmp > 0) { - /* - * The iterator could have been positioned at - * min_stmt before the restoration, which was - * removed from the source during the yield. - * Make sure, we won't advance it on the next - * iteration, possibly skipping a statement. - */ - src->front_id = 0; - continue; - } + vy_read_iterator_evaluate_src(itr, src, stop); + return 0; +} - if (cmp < 0 || vy_stmt_lsn(src->stmt) > vy_stmt_lsn(min_stmt)) { - if (min_stmt) - tuple_unref(min_stmt); - min_stmt = src->stmt; - tuple_ref(min_stmt); - } +/** + * Restore the position of the active in-memory tree iterator + * after a yield caused by a disk read and update 'curr_stmt' + * if necessary. + */ +static NODISCARD int +vy_read_iterator_restore_mem(struct vy_read_iterator *itr) +{ + int rc; + int cmp; + bool unused; + struct vy_read_src *src = &itr->src[itr->mem_src]; + + rc = src->iterator.iface->restore(&src->iterator, itr->last_stmt, + &src->stmt, &unused); + if (rc < 0) + return -1; /* memory allocation error */ + if (rc == 0) + return 0; /* nothing changed */ + + cmp = vy_read_iterator_cmp_stmt(itr, src->stmt, itr->curr_stmt); + if (cmp > 0) { + /* + * Memory trees are append-only so if the + * source is not on top of the heap after + * restoration, it was not before. + */ + assert(src->front_id < itr->front_id); + return 0; + } + if (cmp < 0 || vy_stmt_lsn(src->stmt) > vy_stmt_lsn(itr->curr_stmt)) { + /* + * The new statement precedes the current + * candidate for the next key or it is a + * newer version of the same key. + */ + tuple_ref(src->stmt); + if (itr->curr_stmt != NULL) + tuple_unref(itr->curr_stmt); + itr->curr_stmt = src->stmt; + itr->curr_src = itr->mem_src; + } else { + /* + * There must be a statement for the same + * key in the transaction write set. + * Make sure we don't read the old value + * from the cache while applying UPSERTs. + */ + assert(itr->curr_src == itr->txw_src); + itr->src[itr->cache_src].front_id = 0; + } + if (cmp < 0) + itr->front_id++; + src->front_id = itr->front_id; + return 0; +} - if (cmp < 0) { - itr->front_id++; - itr->curr_src = i; - } else - itr->curr_src = MIN(itr->curr_src, (uint32_t)i); +/** + * Iterate to the next key + * @retval 0 success or EOF (*ret == NULL) + * @retval -1 read error + * @retval -2 iterator is not valid anymore + */ +static NODISCARD int +vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) +{ + uint32_t i; + bool stop = false; - src->front_id = itr->front_id; + if (itr->last_stmt != NULL && (itr->iterator_type == ITER_EQ || + itr->iterator_type == ITER_REQ) && + tuple_field_count(itr->key) >= itr->index->cmp_def->part_count) { + /* + * There may be one statement at max satisfying + * EQ with a full key. + */ + *ret = NULL; + return 0; } - if (itr->curr_stmt != NULL && min_stmt != NULL) - assert(dir * vy_tuple_compare(min_stmt, itr->curr_stmt, def) > 0); + if (vy_read_iterator_check_version(itr)) + return -2; if (itr->curr_stmt != NULL) tuple_unref(itr->curr_stmt); - itr->curr_stmt = min_stmt; - *ret = itr->curr_stmt; + itr->curr_stmt = NULL; + itr->curr_src = UINT32_MAX; + itr->prev_front_id = itr->front_id; + /* + * Look up the next key in read sources starting + * from the one that stores newest data. + */ + vy_read_iterator_scan_txw(itr, &stop); + if (stop) + goto done; + vy_read_iterator_scan_cache(itr, &stop); + if (stop) + goto done; + + for (i = itr->mem_src; i < itr->disk_src; i++) { + if (vy_read_iterator_scan_mem(itr, i, &stop) != 0) + return -1; + if (stop) + goto done; + } + /* The following code may yield as it needs to access disk. */ + for (i = itr->disk_src; i < itr->src_count; i++) { + int rc = vy_read_iterator_scan_disk(itr, i, &stop); + if (rc != 0) + return rc; + if (stop) + break; + } + /* + * The transaction write set couldn't change during the yield + * as it is owned exclusively by the current fiber so the only + * source to check is the active in-memory tree. + */ + if (vy_read_iterator_restore_mem(itr) != 0) + return -1; +done: + if (itr->last_stmt != NULL && itr->curr_stmt != NULL) + assert(vy_read_iterator_cmp_stmt(itr, itr->curr_stmt, + itr->last_stmt) > 0); + *ret = itr->curr_stmt; return 0; } /** * Iterate to the next (elder) version of the same key - * - * Note, we don't need to restore individual sources in this - * function, because sources that may yield (i.e. runs) are - * immutable and iterated last (after txw, cache, and mems) - * as they contain the oldest data. - * * @retval 0 success or EOF (*ret == NULL) * @retval -1 read error * @retval -2 iterator is not valid anymore @@ -336,61 +514,75 @@ vy_read_iterator_next_key(struct vy_read_iterator *itr, struct tuple **ret) static NODISCARD int vy_read_iterator_next_lsn(struct vy_read_iterator *itr, struct tuple **ret) { - *ret = NULL; - if (itr->curr_src == UINT32_MAX) - return 0; + uint32_t i; + bool unused; + struct vy_read_src *src; + assert(itr->curr_stmt != NULL); - const struct key_def *def = itr->index->cmp_def; - struct vy_read_src *src = &itr->src[itr->curr_src]; - struct vy_stmt_iterator *sub_itr = &src->iterator; - int rc = sub_itr->iface->next_lsn(sub_itr, &src->stmt); - if (vy_read_iterator_check_version(itr)) - return -2; - if (rc != 0) - return rc; - if (src->stmt != NULL) { - tuple_unref(itr->curr_stmt); - itr->curr_stmt = src->stmt; - tuple_ref(itr->curr_stmt); - *ret = itr->curr_stmt; - return 0; + assert(itr->curr_src < itr->skipped_src); + + /* Cache stores only terminal statements. */ + assert(itr->curr_src != itr->cache_src); + + if (itr->curr_src == itr->txw_src) { + /* + * Write set does not store statement history. + * Look up the older statement in the cache and + * if it isn't there proceed to mems and runs. + */ + src = &itr->src[itr->cache_src]; + if (itr->cache_src >= itr->skipped_src) + vy_read_iterator_scan_cache(itr, &unused); + if (src->front_id == itr->front_id) + goto found; } - for (uint32_t i = itr->curr_src + 1; i < itr->src_count; i++) { - src = &itr->src[i]; - if (i >= itr->skipped_start) { - itr->skipped_start++; - bool stop = false; - int cmp = -1; - while (true) { - rc = src->iterator.iface->next_key(&src->iterator, - &src->stmt, - &stop); - if (vy_read_iterator_check_version(itr)) - return -2; - if (rc != 0) - return rc; - if (src->stmt == NULL) - break; - cmp = vy_tuple_compare(src->stmt, itr->curr_stmt, - def); - if (cmp >= 0) - break; - } - if (cmp == 0) - itr->src[i].front_id = itr->front_id; - } + /* Look up the older statement in in-memory trees. */ + for (i = MAX(itr->curr_src, itr->mem_src); i < itr->disk_src; i++) { + src = &itr->src[i]; + if (i >= itr->skipped_src && + vy_read_iterator_scan_mem(itr, i, &unused) != 0) + return -1; + if (src->front_id != itr->front_id) + continue; + if (i == itr->curr_src && + src->iterator.iface->next_lsn(&src->iterator, + &src->stmt) != 0) + return -1; + if (src->stmt != NULL) + goto found; + } - if (itr->src[i].front_id == itr->front_id) { - itr->curr_src = i; - tuple_unref(itr->curr_stmt); - itr->curr_stmt = itr->src[i].stmt; - tuple_ref(itr->curr_stmt); - *ret = itr->curr_stmt; - return 0; + /* Look up the older statement in on-disk runs. */ + for (i = MAX(itr->curr_src, itr->disk_src); i < itr->src_count; i++) { + src = &itr->src[i]; + if (i >= itr->skipped_src) { + int rc = vy_read_iterator_scan_disk(itr, i, &unused); + if (rc != 0) + return rc; } + if (src->front_id != itr->front_id) + continue; + if (i == itr->curr_src && + src->iterator.iface->next_lsn(&src->iterator, + &src->stmt) != 0) + return -1; + if (vy_read_iterator_check_version(itr)) + return -2; + if (src->stmt != NULL) + goto found; } - itr->curr_src = UINT32_MAX; + + /* Searched everywhere, found nothing. */ + *ret = NULL; + return 0; +found: + tuple_ref(src->stmt); + if (itr->curr_stmt != NULL) + tuple_unref(itr->curr_stmt); + itr->curr_stmt = src->stmt; + itr->curr_src = src - itr->src; + *ret = itr->curr_stmt; return 0; } @@ -440,7 +632,7 @@ vy_read_iterator_add_tx(struct vy_read_iterator *itr, { assert(itr->tx != NULL); struct vy_txw_iterator_stat *stat = &itr->index->stat.txw.iterator; - struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, true); + struct vy_read_src *sub_src = vy_read_iterator_add_src(itr); vy_txw_iterator_open(&sub_src->txw_iterator, stat, itr->tx, itr->index, iterator_type, key); } @@ -449,7 +641,7 @@ static void vy_read_iterator_add_cache(struct vy_read_iterator *itr, enum iterator_type iterator_type, struct tuple *key) { - struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, true); + struct vy_read_src *sub_src = vy_read_iterator_add_src(itr); vy_cache_iterator_open(&sub_src->cache_iterator, &itr->index->cache, iterator_type, key, itr->read_view); @@ -464,7 +656,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr, /* Add the active in-memory index. */ assert(index->mem != NULL); - sub_src = vy_read_iterator_add_src(itr, true); + sub_src = vy_read_iterator_add_src(itr); vy_mem_iterator_open(&sub_src->mem_iterator, &index->stat.memory.iterator, index->mem, iterator_type, key, @@ -472,7 +664,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr, /* Add sealed in-memory indexes. */ struct vy_mem *mem; rlist_foreach_entry(mem, &index->sealed, in_sealed) { - sub_src = vy_read_iterator_add_src(itr, false); + sub_src = vy_read_iterator_add_src(itr); vy_mem_iterator_open(&sub_src->mem_iterator, &index->stat.memory.iterator, mem, iterator_type, key, @@ -506,7 +698,7 @@ vy_read_iterator_add_disk(struct vy_read_iterator *itr, if (slice->run->info.min_lsn > index->dump_lsn) continue; assert(slice->run->info.max_lsn <= index->dump_lsn); - struct vy_read_src *sub_src = vy_read_iterator_add_src(itr, false); + struct vy_read_src *sub_src = vy_read_iterator_add_src(itr); vy_run_iterator_open(&sub_src->run_iterator, &index->stat.disk.iterator, itr->run_env, slice, @@ -532,9 +724,11 @@ vy_read_iterator_use_range(struct vy_read_iterator *itr) for (uint32_t i = 0; i < itr->src_count; i++) itr->src[i].iterator.iface->close(&itr->src[i].iterator); itr->src_count = 0; - itr->mutable_start = 0; - itr->mutable_end = 0; - itr->skipped_start = 0; + itr->cache_src = UINT32_MAX; + itr->txw_src = UINT32_MAX; + itr->mem_src = UINT32_MAX; + itr->disk_src = UINT32_MAX; + itr->skipped_src = UINT32_MAX; itr->curr_stmt = NULL; itr->curr_src = UINT32_MAX; itr->front_id = 1; @@ -559,12 +753,18 @@ vy_read_iterator_use_range(struct vy_read_iterator *itr) itr->need_check_eq = true; } - if (itr->tx != NULL) + if (itr->tx != NULL) { + itr->txw_src = itr->src_count; vy_read_iterator_add_tx(itr, iterator_type, key); + } + itr->cache_src = itr->src_count; vy_read_iterator_add_cache(itr, iterator_type, key); + + itr->mem_src = itr->src_count; vy_read_iterator_add_mem(itr, iterator_type, key); + itr->disk_src = itr->src_count; if (itr->curr_range != NULL) { itr->range_version = itr->curr_range->version; vy_read_iterator_add_disk(itr, iterator_type, key); @@ -825,17 +1025,14 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) vy_read_iterator_restore(itr); continue; } - if (vy_stmt_type(t) == IPROTO_REPLACE) { - if (itr->last_stmt != NULL) - tuple_unref(itr->last_stmt); - itr->last_stmt = t; + if (itr->last_stmt != NULL) + tuple_unref(itr->last_stmt); + itr->last_stmt = t; + if (vy_stmt_type(t) == IPROTO_REPLACE) break; - } else { - assert(vy_stmt_type(t) == IPROTO_DELETE); - if (vy_stmt_lsn(t) == INT64_MAX) /* t is from write set */ - skipped_txw_delete = true; - tuple_unref(t); - } + assert(vy_stmt_type(t) == IPROTO_DELETE); + if (vy_stmt_lsn(t) == INT64_MAX) /* t is from write set */ + skipped_txw_delete = true; } *result = itr->last_stmt; diff --git a/src/box/vy_read_iterator.h b/src/box/vy_read_iterator.h index 344e457c8b56ae9cbf238f00295585c8cb904809..d04610e89991967ee60fda0a724555d5fca75571 100644 --- a/src/box/vy_read_iterator.h +++ b/src/box/vy_read_iterator.h @@ -105,17 +105,25 @@ struct vy_read_iterator { uint32_t curr_src; /** Statement returned by the current merge source. */ struct tuple *curr_stmt; - /** Offset of the first mutable source. */ - uint32_t mutable_start; - /** Offset of the source following the last mutable source. */ - uint32_t mutable_end; + /** Offset of the transaction write set source. */ + uint32_t txw_src; + /** Offset of the cache source. */ + uint32_t cache_src; + /** Offset of the first memory source. */ + uint32_t mem_src; + /** Offset of the first disk source. */ + uint32_t disk_src; /** Offset of the first skipped source. */ - uint32_t skipped_start; + uint32_t skipped_src; /** * front_id of the current source and all sources * that are on the same key. */ uint32_t front_id; + /** + * front_id from the previous iteration. + */ + uint32_t prev_front_id; }; /**