diff --git a/src/box/vinyl.c b/src/box/vinyl.c index c75c36724edde727981dbbf71a004def7cb6c549..6055f6fe4f92fbe6bddc5431a36f3f9c7b63921d 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -7592,6 +7592,465 @@ vy_merge_iterator_restore(struct vy_merge_iterator *itr, /* {{{ Iterator over index */ +/** + * ID of an iterator source type. Can be used in bitmaps. + */ +enum iterator_src_type { + ITER_SRC_TXW = 1, + ITER_SRC_CACHE = 2, + ITER_SRC_MEM = 4, + ITER_SRC_RUN = 8, +}; + +/** + * History of a key in vinyl is a continuous sequence of statements of the + * same key in order of decreasing lsn. The history can be represented as a + * list, the structure below describes one node of the list. + */ +struct vy_stmt_history_node { + /* Type of source that the history statement came from */ + enum iterator_src_type src_type; + /* The history statement. Referenced for runs. */ + struct tuple *stmt; + /* Link in the history list */ + struct rlist link; +}; + +/** + * Point iterator is a special read iterator that is designed for + * retrieving one value from index by a full key (all parts are present). + * + * Iterator collects necessary history of the given key from different sources + * (txw, cache, mems, runs) that consists of some number of sequential upserts + * and possibly one terminal statement (replace or delete). The iterator + * sequentially scans txw, cache, mems and runs until a terminal statement is + * met. After reading the slices the iterator checks that the list of mems + * hasn't been changed and restarts if it is the case. + * After the history is collected the iterator calculates resultant statement + * and, if the result is the latest version of the key, adds it to cache. + */ +struct vy_point_iterator { + /* Search location and options */ + struct vy_index *index; + struct vy_tx *tx; + const struct vy_read_view **p_read_view; + const struct tuple *key; + + /** + * For compatibility reasons, the iterator references the + * resultant statement until own destruction. + */ + struct tuple *curr_stmt; +}; + +/** + * Create an iterator by full key. + */ +static void +vy_point_iterator_open(struct vy_point_iterator *itr, struct vy_index *index, + struct vy_tx *tx, const struct vy_read_view **rv, + const struct tuple *key) +{ + vy_index_ref(index); + itr->index = index; + itr->tx = tx; + itr->p_read_view = rv; + itr->key = key; + + itr->curr_stmt = NULL; +} + +/** + * Allocate (region) new history node. + * @return new node or NULL on memory error (diag is set). + */ +static struct vy_stmt_history_node * +vy_point_iterator_new_node() +{ + struct region *region = &fiber()->gc; + struct vy_stmt_history_node *node = region_alloc(region, sizeof(*node)); + if (node == NULL) + diag_set(OutOfMemory, sizeof(*node), "region", + "struct vy_stmt_history_node"); + return node; +} + +/** + * Unref statement if necessary, remove node from history if it's there. + */ +static void +vy_point_iterator_cleanup(struct rlist *history, size_t region_svp) +{ + struct vy_stmt_history_node *node; + rlist_foreach_entry(node, history, link) + if (node->src_type == ITER_SRC_RUN) + tuple_unref(node->stmt); + + region_truncate(&fiber()->gc, region_svp); +} + +/** + * Free resources and close the iterator. + */ +static void +vy_point_iterator_close(struct vy_point_iterator *itr) +{ + if (itr->curr_stmt != NULL) + tuple_unref(itr->curr_stmt); + vy_index_unref(itr->index); + TRASH(itr); +} + +/** + * Return true if the history of a key contains terminal node in the end, + * i.e. REPLACE of DELETE statement. + */ +static bool +vy_point_iterator_history_is_terminal(struct rlist *history) +{ + if (rlist_empty(history)) + return false; + struct vy_stmt_history_node *node = + rlist_last_entry(history, struct vy_stmt_history_node, link); + assert(vy_stmt_type(node->stmt) == IPROTO_REPLACE || + vy_stmt_type(node->stmt) == IPROTO_DELETE || + vy_stmt_type(node->stmt) == IPROTO_UPSERT); + return vy_stmt_type(node->stmt) != IPROTO_UPSERT; +} + +/** + * Scan TX write set for given key. + * Add one or no statement to the history list. + */ +static int +vy_point_iterator_scan_txw(struct vy_point_iterator *itr, struct rlist *history) +{ + struct vy_tx *tx = itr->tx; + if (tx == NULL) + return 0; + struct txv *txv = + write_set_search_key(&tx->write_set, itr->index, itr->key); + if (txv == NULL) + return 0; + struct vy_stmt_history_node *node = vy_point_iterator_new_node(); + if (node == NULL) + return -1; + node->src_type = ITER_SRC_TXW; + node->stmt = txv->stmt; + rlist_add_tail(history, &node->link); + return 0; +} + +/** + * Scan index cache for given key. + * Add one or no statement to the history list. + */ +static int +vy_point_iterator_scan_cache(struct vy_point_iterator *itr, + struct rlist *history) +{ + struct tuple *stmt = vy_cache_get(&itr->index->cache, itr->key); + + if (stmt == NULL || vy_stmt_lsn(stmt) > (*itr->p_read_view)->vlsn) + return 0; + + struct vy_stmt_history_node *node = vy_point_iterator_new_node(); + if (node == NULL) + return -1; + + node->src_type = ITER_SRC_CACHE; + node->stmt = stmt; + rlist_add_tail(history, &node->link); + return 0; +} + +/** + * Scan one particular mem. + * Add found statements to the history list up to terminal statement. + */ +static int +vy_point_iterator_scan_mem(struct vy_point_iterator *itr, struct vy_mem *mem, + struct rlist *history) +{ + struct tree_mem_key tree_key; + tree_key.stmt = itr->key; + tree_key.lsn = (*itr->p_read_view)->vlsn; + bool exact; + struct vy_mem_tree_iterator mem_itr = + vy_mem_tree_lower_bound(&mem->tree, &tree_key, &exact); + itr->index->env->stat->mem_stat.lookup_count++; + const struct tuple *stmt = NULL; + if (!vy_mem_tree_iterator_is_invalid(&mem_itr)) { + stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr); + if (vy_stmt_compare(stmt, itr->key, mem->key_def) != 0) + stmt = NULL; + } + + if (stmt == NULL) + return 0; + + while (true) { + struct vy_stmt_history_node *node = vy_point_iterator_new_node(); + if (node == NULL) + return -1; + + node->src_type = ITER_SRC_MEM; + node->stmt = (struct tuple *)stmt; + rlist_add_tail(history, &node->link); + if(vy_point_iterator_history_is_terminal(history)) + break; + + if (!vy_mem_tree_iterator_next(&mem->tree, &mem_itr)) + break; + itr->index->env->stat->mem_stat.step_count++; + + const struct tuple *prev_stmt = stmt; + stmt = *vy_mem_tree_iterator_get_elem(&mem->tree, &mem_itr); + if (vy_stmt_lsn(stmt) >= vy_stmt_lsn(prev_stmt)) + break; + if (vy_stmt_compare(stmt, itr->key, mem->key_def) != 0) + break; + } + return 0; + +} + +/** + * Scan all mems that belongs to the index. + * Add found statements to the history list up to terminal statement. + */ +static int +vy_point_iterator_scan_mems(struct vy_point_iterator *itr, + struct rlist *history) +{ + assert(itr->index->mem != NULL); + int rc = vy_point_iterator_scan_mem(itr, itr->index->mem, history); + struct vy_mem *mem; + rlist_foreach_entry(mem, &itr->index->sealed, in_sealed) { + if (rc != 0 || vy_point_iterator_history_is_terminal(history)) + return rc; + + rc = vy_point_iterator_scan_mem(itr, itr->index->mem, history); + } + return 0; +} + +/** + * Scan one particular slice. + * Add found statements to the history list up to terminal statement. + */ +static int +vy_point_iterator_scan_slice(struct vy_point_iterator *itr, + struct vy_slice *slice, + struct rlist *history) +{ + int rc = 0; + /* + * The format of the statement must be exactly the space + * format with the same identifier to fully match the + * format in vy_mem. + */ + struct tuple_format *format = + (itr->index->space_index_count == 1 ? + itr->index->space_format : itr->index->surrogate_format); + struct vy_env *env = itr->index->env; + struct vy_run_iterator run_itr; + const struct vy_read_view **p_read_view; + p_read_view = itr->p_read_view; + vy_run_iterator_open(&run_itr, true, &env->stat->run_stat, + &env->run_env, slice, + ITER_EQ, itr->key, p_read_view, + itr->index->key_def, itr->index->user_key_def, + format, itr->index->upsert_format, + itr->index->id == 0); + while (true) { + struct tuple *stmt; + rc = run_itr.base.iface->next_lsn(&run_itr.base, &stmt); + if (rc != 0) + break; + if (stmt == NULL) + break; + + struct vy_stmt_history_node *node = vy_point_iterator_new_node(); + if (node == NULL) { + rc = -1; + break; + } + + node->src_type = ITER_SRC_RUN; + node->stmt = stmt; + tuple_ref(stmt); + rlist_add_tail(history, &node->link); + if(vy_point_iterator_history_is_terminal(history)) + break; + } + run_itr.base.iface->cleanup(&run_itr.base); + run_itr.base.iface->close(&run_itr.base); + return rc; +} + +/** + * Find a range and scan all slices that belongs to the range. + * Add found statements to the history list up to terminal statement. + * All slices are pinned before first slice scan, so it's guaranteed + * that complete history from runs will be extracted. + */ +static int +vy_point_iterator_scan_slices(struct vy_point_iterator *itr, + struct rlist *history) +{ + struct vy_range *range = + vy_range_tree_find_by_key(itr->index->tree, ITER_EQ, + itr->key, itr->index->key_def); + assert(range != NULL); + int slice_count = range->slice_count; + struct vy_slice **slices = (struct vy_slice **) + region_alloc(&fiber()->gc, slice_count * sizeof(*slices)); + if (slices == NULL) { + diag_set(OutOfMemory, slice_count * sizeof(*slices), + "region", "slices array"); + return -1; + } + int i = 0; + struct vy_slice *slice; + rlist_foreach_entry(slice, &range->slices, in_range) { + vy_slice_pin(slice); + slices[i++] = slice; + } + assert(i == slice_count); + int rc = 0; + for (i = 0; i < slice_count; i++) { + if (rc == 0 && !vy_point_iterator_history_is_terminal(history)) + rc = vy_point_iterator_scan_slice(itr, slices[i], + history); + vy_slice_unpin(slices[i]); + } + return rc; +} + +/** + * Get a resultant statement from collected history. Add to cache if possible. + */ +static int +vy_point_iterator_apply_history(struct vy_point_iterator *itr, + struct rlist *history) +{ + assert(itr->curr_stmt == NULL); + if (rlist_empty(history)) + return 0; + int64_t vlsn = (*itr->p_read_view)->vlsn; + + struct vy_stmt_history_node *node = + rlist_last_entry(history, struct vy_stmt_history_node, link); + if (vy_point_iterator_history_is_terminal(history)) { + if (vy_stmt_type(node->stmt) == IPROTO_DELETE) { + /* Ignore terminal delete */ + } else if (node->src_type == ITER_SRC_MEM) { + itr->curr_stmt = vy_stmt_dup(node->stmt, + tuple_format(node->stmt)); + } else { + itr->curr_stmt = node->stmt; + tuple_ref(itr->curr_stmt); + } + node = rlist_prev_entry_safe(node, history, link); + } + while (node != NULL) { + assert(vy_stmt_type(node->stmt) == IPROTO_UPSERT); + if (vy_stmt_lsn(node->stmt) > vlsn) { + /* We were sent to read view, skip the statement */ + node = rlist_prev_entry_safe(node, history, link); + continue; + } + + struct tuple *stmt = + vy_apply_upsert(node->stmt, itr->curr_stmt, + itr->index->key_def, + itr->index->space_format, + itr->index->upsert_format, true); + rmean_collect(itr->index->env->stat->rmean, + VY_STAT_UPSERT_APPLIED, 1); + if (stmt == NULL) + return -1; + if (itr->curr_stmt) + tuple_unref(itr->curr_stmt); + itr->curr_stmt = stmt; + node = rlist_prev_entry_safe(node, history, link); + } + + /** + * Add a statement to the cache + */ + if ((**itr->p_read_view).vlsn == INT64_MAX) /* Do not store non-latest data */ + vy_cache_add(&itr->index->cache, itr->curr_stmt, NULL, + itr->key, ITER_EQ); + return 0; +} + +/* + * Get a resultant tuple from the iterator. Actually do not change + * iterator state thus second call will return the same statement + * (unlike all other iterators that would return NULL on the second call) + */ +static int +vy_point_iterator_get(struct vy_point_iterator *itr, struct tuple **result) +{ + *result = NULL; + size_t region_svp = region_used(&fiber()->gc); + int rc = 0; + + /* History list */ + struct rlist history; +restart: + rlist_create(&history); + + rc = vy_point_iterator_scan_txw(itr, &history); + if (rc != 0 || vy_point_iterator_history_is_terminal(&history)) + goto done; + + vy_point_iterator_scan_cache(itr, &history); + if (rc != 0 || vy_point_iterator_history_is_terminal(&history)) + goto done; + + rc = vy_point_iterator_scan_mems(itr, &history); + if (rc != 0 || vy_point_iterator_history_is_terminal(&history)) + goto done; + + /* + * From this moment we have to notify TX manager that we + * are about to read the key and if a new statement with the same + * key arrives we will be sent to read view. + */ + if (itr->tx != NULL) { + rc = vy_tx_track(itr->tx, itr->index, + (struct tuple *) itr->key, false); + } + /* Save version before yield */ + uint32_t mem_list_version = itr->index->mem_list_version; + + rc = vy_point_iterator_scan_slices(itr, &history); + if (rc != 0) + goto done; + + if (mem_list_version != itr->index->mem_list_version) { + /* + * Mem list was changed during yield. This could be rotation + * or a dump. In case of dump the memory referenced by + * statement history is gone and we need to reread new history. + * This in unnecessary in case of rotation but since we + * cannot distinguish these two cases we always restart. + */ + vy_point_iterator_cleanup(&history, region_svp); + goto restart; + } + +done: + if (rc == 0) + rc = vy_point_iterator_apply_history(itr, &history); + *result = itr->curr_stmt; + vy_point_iterator_cleanup(&history, region_svp); + return rc; +} + static void vy_read_iterator_add_tx(struct vy_read_iterator *itr) { @@ -7861,6 +8320,36 @@ vy_read_iterator_next(struct vy_read_iterator *itr, struct tuple **result) { ev_tstamp start_time = ev_now(loop()); + /* The key might be set to NULL during previous call, that means + * that there's no more data */ + if (itr->key == NULL) { + *result = NULL; + return 0; + } + bool one_value = false; + if (itr->iterator_type == ITER_EQ) { + if (itr->index->opts.is_unique) + one_value = tuple_field_count(itr->key) >= + itr->index->user_key_def->part_count; + else + one_value = tuple_field_count(itr->key) >= + itr->index->key_def->part_count; + } + /* Run a special iterator for a special case */ + if (one_value) { + struct vy_point_iterator one; + vy_point_iterator_open(&one, itr->index, itr->tx, + itr->read_view, itr->key); + int rc = vy_point_iterator_get(&one, result); + if (*result) { + tuple_ref(*result); + itr->curr_stmt = *result; + } + vy_point_iterator_close(&one); + itr->key = NULL; + return rc; + } + *result = NULL; if (!itr->search_started) diff --git a/src/box/vy_cache.c b/src/box/vy_cache.c index 7ce43a8efbdb2debd941ccec14a071fc1224aaca..850d286a197eac9172a74bc27f57caa0726f420b 100644 --- a/src/box/vy_cache.c +++ b/src/box/vy_cache.c @@ -357,6 +357,16 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt, VY_CACHE_RIGHT_LINKED) ^ flag; } +struct tuple * +vy_cache_get(struct vy_cache *cache, const struct tuple *key) +{ + struct vy_cache_entry **entry = + vy_cache_tree_find(&cache->cache_tree, key); + if (entry == NULL) + return NULL; + return (*entry)->stmt; +} + void vy_cache_on_write(struct vy_cache *cache, const struct tuple *stmt, struct tuple **deleted) diff --git a/src/box/vy_cache.h b/src/box/vy_cache.h index 2b2dab5150f649b8e55c7255a6019be7f2b1c47c..ed4453e5f430337ab20d11e25a5fbcc33fdea290 100644 --- a/src/box/vy_cache.h +++ b/src/box/vy_cache.h @@ -90,8 +90,8 @@ vy_cache_tree_key_cmp(struct vy_cache_entry *a, #define BPS_TREE_NAME vy_cache_tree #define BPS_TREE_BLOCK_SIZE 512 #define BPS_TREE_EXTENT_SIZE VY_CACHE_TREE_EXTENT_SIZE -#define BPS_TREE_COMPARE(a, b, index) vy_cache_tree_cmp(a, b, index) -#define BPS_TREE_COMPARE_KEY(a, b, index) vy_cache_tree_key_cmp(a, b, index) +#define BPS_TREE_COMPARE(a, b, key_def) vy_cache_tree_cmp(a, b, key_def) +#define BPS_TREE_COMPARE_KEY(a, b, key_def) vy_cache_tree_key_cmp(a, b, key_def) #define bps_tree_elem_t struct vy_cache_entry * #define bps_tree_key_t const struct tuple * #define bps_tree_arg_t struct key_def * @@ -190,6 +190,13 @@ vy_cache_add(struct vy_cache *cache, struct tuple *stmt, struct tuple *prev_stmt, const struct tuple *key, enum iterator_type order); +/** + * Find value in cache. + * @return A tuple equal to key or NULL if not found. + */ +struct tuple * +vy_cache_get(struct vy_cache *cache, const struct tuple *key); + /** * Invalidate possibly cached value due to its overwriting * @param cache - pointer to tuple cache.