From a1e005d80cb43b605b8777af1bbeca9e904c1a9b Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> Date: Fri, 2 Jun 2017 16:38:00 +0300 Subject: [PATCH] vinyl: write_iterator merges vlsns subsequences Currently the write iterator sees only the oldest vlsn. Because of this, the write iterator cannot skip, merge or squash newer statements, even if they are not used by any transaction. For example, consider one key and its lsns: LSNs: 10 20 30 40 50 Read Views: * * * Statements with lsn 40 can be merged with 50; 20 can be merged with 30. The old write iterator will merge nothing in that case, because the oldest vlsn is 10. Let's allow the write iterator to see all read views, instead of only the oldest one. Such a write iterator can merge multiple lsn subsequences of each key. For the example above it will merge in the following way: LSNs: 10 20 30 40 50 Read Views: * * * Merge and \__/\_________/\________/ return: merge, merge, merge and return each merged result. For the considered case the difference is 3 result statements instead of 5 by the old write iterator. 
Closes #1920 Closes #2502 Closes #1824 Closes #2578 --- src/box/vinyl.c | 8 +- src/box/vy_run.c | 27 +- src/box/vy_stmt.h | 26 ++ src/box/vy_write_iterator.c | 616 +++++++++++++++++++++++------ src/box/vy_write_iterator.h | 196 ++++----- test/unit/CMakeLists.txt | 3 + test/unit/vy_iterators_helper.c | 176 +++++++++ test/unit/vy_iterators_helper.h | 139 +++++++ test/unit/vy_write_iterator.c | 418 ++++++++++++++++++++ test/unit/vy_write_iterator.result | 39 ++ 10 files changed, 1406 insertions(+), 242 deletions(-) create mode 100644 test/unit/vy_iterators_helper.c create mode 100644 test/unit/vy_iterators_helper.h create mode 100644 test/unit/vy_write_iterator.c create mode 100644 test/unit/vy_write_iterator.result diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 75b6815a44..54e885467f 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -947,7 +947,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index, bool is_last_level = (index->run_count == 0); wi = vy_write_iterator_new(index->key_def, index->surrogate_format, index->upsert_format, index->id == 0, - is_last_level, tx_manager_vlsn(xm)); + is_last_level, &xm->read_views); if (wi == NULL) goto err_wi; rlist_foreach_entry(mem, &index->sealed, in_sealed) { @@ -1203,7 +1203,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index, bool is_last_level = (range->compact_priority == range->slice_count); wi = vy_write_iterator_new(index->key_def, index->surrogate_format, index->upsert_format, index->id == 0, - is_last_level, tx_manager_vlsn(xm)); + is_last_level, &xm->read_views); if (wi == NULL) goto err_wi; @@ -4057,9 +4057,11 @@ vy_send_range(struct vy_join_ctx *ctx) return 0; /* nothing to do */ int rc = -1; + struct rlist fake_read_views; + rlist_create(&fake_read_views); ctx->wi = vy_write_iterator_new(ctx->key_def, ctx->format, ctx->upsert_format, - true, true, INT64_MAX); + true, true, &fake_read_views); if (ctx->wi == NULL) goto out; diff --git a/src/box/vy_run.c 
b/src/box/vy_run.c index a1e33e7651..73b69b605c 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -2051,6 +2051,9 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, struct vy_page_info *page = NULL; const char *region_key; bool end_of_run = false; + /* Last written statement */ + struct tuple *last_stmt = *curr_stmt; + vy_stmt_ref_if_possible(last_stmt); /* row offsets accumulator */ struct ibuf row_index_buf; @@ -2086,8 +2089,6 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, vy_page_info_create(page, data_xlog->offset, *curr_stmt, key_def); xlog_tx_begin(data_xlog); - /* Last written statement */ - struct tuple *last_stmt = NULL; do { uint32_t *offset = (uint32_t *) ibuf_alloc(&row_index_buf, sizeof(uint32_t)); @@ -2098,13 +2099,6 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, } *offset = page->unpacked_size; - if (last_stmt != NULL && vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); - - last_stmt = *curr_stmt; - if (vy_stmt_is_refable(last_stmt)) - tuple_ref(last_stmt); - if (vy_run_dump_stmt(*curr_stmt, data_xlog, page, key_def, is_primary) != 0) goto error_rollback; @@ -2118,8 +2112,13 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, if (wi->iface->next(wi, curr_stmt)) goto error_rollback; - if (*curr_stmt == NULL) + if (*curr_stmt == NULL) { end_of_run = true; + } else { + vy_stmt_unref_if_possible(last_stmt); + last_stmt = *curr_stmt; + vy_stmt_ref_if_possible(last_stmt); + } } while (end_of_run == false && obuf_size(&data_xlog->obuf) < page_size); @@ -2142,9 +2141,8 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, if (run->info.max_key == NULL) goto error_rollback; } - - if (vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); + vy_stmt_unref_if_possible(last_stmt); + last_stmt = NULL; /* Save offset to row index */ page->row_index_offset = page->unpacked_size; @@ -2180,10 +2178,9 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, error_rollback: 
xlog_tx_rollback(data_xlog); - if (last_stmt != NULL && vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); error_row_index: ibuf_destroy(&row_index_buf); + vy_stmt_unref_if_possible(last_stmt); return -1; } diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index bc335e207f..e111411aa8 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -259,6 +259,32 @@ vy_stmt_is_refable(const struct tuple *stmt) return stmt->refs > 0; } +/** + * Ref tuple, if it exists (!= NULL) and can be referenced. + * @sa vy_stmt_is_refable. + * + * @param tuple Tuple to ref or NULL. + */ +static inline void +vy_stmt_ref_if_possible(struct tuple *stmt) +{ + if (stmt != NULL && vy_stmt_is_refable(stmt)) + tuple_ref(stmt); +} + +/** + * Unref tuple, if it exists (!= NULL) and can be unreferenced. + * @sa vy_stmt_is_refable. + * + * @param tuple Tuple to unref or NULL. + */ +static inline void +vy_stmt_unref_if_possible(struct tuple *stmt) +{ + if (stmt != NULL && vy_stmt_is_refable(stmt)) + tuple_unref(stmt); +} + /** * Specialized comparators are faster than general-purpose comparators. * For example, vy_stmt_compare - slowest comparator because it in worst case diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c index ceefd408b5..1b823e295f 100644 --- a/src/box/vy_write_iterator.c +++ b/src/box/vy_write_iterator.c @@ -33,6 +33,7 @@ #include "vy_run.h" #include "vy_upsert.h" #include "column_mask.h" +#include "fiber.h" #define HEAP_FORWARD_DECLARATION #include "salad/heap.h" @@ -54,11 +55,6 @@ struct vy_write_src { struct heap_node heap_node; /* Current tuple in the source (lowest and with maximal lsn) */ struct tuple *tuple; - /** - * Is the tuple (@sa tuple) refable or not. - * Tuples from mems are reafble, from runs - not - */ - bool is_tuple_refable; /** * There are special rules of comparison for virtual sources * that represent a delimiter between the current key and @@ -76,8 +72,90 @@ struct vy_write_src { }; /** - * Write iterator itself. 
+ * Sequence of verions of a key, sorted by LSN in ascending order. + * (history->tuple.lsn < history->next->tuple.lsn). + */ +struct vy_write_history { + /** Next version with greater LSN. */ + struct vy_write_history *next; + /** Key version. */ + struct tuple *tuple; +}; + +/** + * Create a new vy_write_history object, save a statement into it + * and link with a newer version. + * + * @param region Allocator for the object. + * @param tuple Key version. + * @param next Next version of the key. + * + * @return not NULL Created object. + * @return NULL Memory error. + */ +static inline struct vy_write_history * +vy_write_history_new(struct region *region, struct tuple *tuple, + struct vy_write_history *next) +{ + struct vy_write_history *h = + region_alloc_object(region, struct vy_write_history); + if (h == NULL) + return NULL; + h->tuple = tuple; + assert(next == NULL || (next->tuple != NULL && + vy_stmt_lsn(next->tuple) > vy_stmt_lsn(tuple))); + h->next = next; + if (vy_stmt_is_refable(tuple)) + tuple_ref(tuple); + return h; +} + +/** + * Clear entire sequence of versions of a key. Free resources of + * each version. + * @param history History to clear. */ +static inline void +vy_write_history_destroy(struct vy_write_history *history) +{ + do { + if (history->tuple != NULL && + vy_stmt_is_refable(history->tuple)) + tuple_unref(history->tuple); + history = history->next; + } while (history != NULL); +} + +/** Read view of a key. */ +struct vy_read_view_stmt { + /** Read view LSN. */ + int64_t vlsn; + /** Result key version, visible for the @vlsn. */ + struct tuple *tuple; + /** + * Sequence of key versions. It is merged at the end of + * the key building into @tuple. + */ + struct vy_write_history *history; +}; + +/** + * Free resources, unref tuples, including all tuples in the + * history. + * @param rv Read view to clear. 
+ */ +static inline void +vy_read_view_stmt_destroy(struct vy_read_view_stmt *rv) +{ + if (rv->tuple != NULL && vy_stmt_is_refable(rv->tuple)) + tuple_unref(rv->tuple); + rv->tuple = NULL; + if (rv->history != NULL) + vy_write_history_destroy(rv->history); + rv->history = NULL; +} + +/* @sa vy_write_iterator.h */ struct vy_write_iterator { /** Parent class, must be the first member */ struct vy_stmt_stream base; @@ -85,28 +163,46 @@ struct vy_write_iterator { struct rlist src_list; /* Heap with sources with the lowest source in head */ heap_t src_heap; - /** - * Tuple that was returned in the last vy_write_iterator_next call, - * or the tuple to be returned in vy_write_iterator_next execution. - */ - struct tuple *tuple; - /** - * Is the tuple (member) refable or not. - * Tuples from runs are reafable, from mems - not. - */ - bool is_tuple_refable; /** Index key definition used for storing statements on disk. */ const struct key_def *key_def; /** Format to allocate new REPLACE and DELETE tuples from vy_run */ struct tuple_format *format; /** Same as format, but for UPSERT tuples. */ struct tuple_format *upsert_format; - /* The minimal VLSN among all active transactions */ - int64_t oldest_vlsn; /* There are is no level older than the one we're writing to. */ bool is_last_level; /** Set if this iterator is for a primary index. */ bool is_primary; + + /** Length of the @read_views. */ + int rv_count; + /** Count of not empty read views. */ + int rv_used_count; + /** + * Current read view statement in @read_views. It is used + * to return key versions one by one from + * vy_write_iterator_next. + */ + int stmt_i; + /** + * Read views of the same key, sorted by lsn in + * descending order and started from the INT64_MAX. Each + * is referenced if needed. Example: + * stmt_count = 3 + * rv_count = 6 + * 0 1 2 3 4 5 + * [lsn=6, lsn=5, lsn=4, -, -, -] + * + * @Read_views can have gaps, if there are read views with + * the same key versions. 
Example: + * + * LSN: 20 - - - 10 9 8 + * Read views: * * * * * + * @read_views array: [lsn=20, -, -, -, lsn=10, -, -] + * \___________________/ + * Same versions of the key. + */ + struct vy_read_view_stmt read_views[0]; }; /** @@ -147,11 +243,10 @@ heap_less(heap_t *heap, struct heap_node *node1, struct heap_node *node2) * Allocate a source and put it to the list. * The underlying stream (src->stream) must be opened immediately. * @param stream - the write iterator. - * @param is_tuple_refable - true for runs and false for mems. * @return the source or NULL on memory error. */ static struct vy_write_src * -vy_write_iterator_new_src(struct vy_write_iterator *stream, bool is_tuple_refable) +vy_write_iterator_new_src(struct vy_write_iterator *stream) { struct vy_write_src *res = (struct vy_write_src *) malloc(sizeof(*res)); if (res == NULL) { @@ -159,7 +254,6 @@ vy_write_iterator_new_src(struct vy_write_iterator *stream, bool is_tuple_refabl "malloc", "write stream src"); return NULL; } - res->is_tuple_refable = is_tuple_refable; res->is_end_of_key = false; rlist_add(&stream->src_list, &res->in_src_list); return res; @@ -234,14 +328,33 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface; struct vy_stmt_stream * vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format, struct tuple_format *upsert_format, bool is_primary, - bool is_last_level, int64_t oldest_vlsn) + bool is_last_level, struct rlist *read_views) { + /* + * One is reserved for the INT64_MAX - maximal read view. 
+ */ + int count = 1; + struct rlist *unused; + rlist_foreach(unused, read_views) + ++count; + size_t size = sizeof(struct vy_write_iterator) + + count * sizeof(struct vy_read_view_stmt); struct vy_write_iterator *stream = - (struct vy_write_iterator *) malloc(sizeof(*stream)); + (struct vy_write_iterator *) calloc(1, size); if (stream == NULL) { - diag_set(OutOfMemory, sizeof(*stream), "malloc", "write stream"); + diag_set(OutOfMemory, size, "malloc", "write stream"); return NULL; } + stream->stmt_i = -1; + stream->rv_count = count; + stream->read_views[0].vlsn = INT64_MAX; + count--; + struct vy_read_view *rv; + /* Descending order. */ + rlist_foreach_entry(rv, read_views, in_read_views) + stream->read_views[count--].vlsn = rv->vlsn; + assert(count == 0); + stream->base.iface = &vy_slice_stream_iface; src_heap_create(&stream->src_heap); rlist_create(&stream->src_list); @@ -251,38 +364,10 @@ vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format stream->upsert_format = upsert_format; tuple_format_ref(stream->upsert_format, 1); stream->is_primary = is_primary; - stream->oldest_vlsn = oldest_vlsn; stream->is_last_level = is_last_level; - stream->tuple = NULL; - stream->is_tuple_refable = false; return &stream->base; } -/** - * Set stream->tuple as a tuple to be output as a result of .._next call. - * Ref the new tuple if necessary, unref older value if needed. - * @param stream - the write iterator. - * @param tuple - the tuple to be saved. - * @param is_tuple_refable - is the tuple must of must not be referenced. 
- */ -static void -vy_write_iterator_set_tuple(struct vy_write_iterator *stream, - struct tuple *tuple, bool is_tuple_refable) -{ - if (stream->tuple != NULL && tuple != NULL) - assert(tuple_compare(stream->tuple, tuple, stream->key_def) < 0 || - vy_stmt_lsn(stream->tuple) >= vy_stmt_lsn(tuple)); - - if (stream->tuple != NULL && stream->is_tuple_refable) - tuple_unref(stream->tuple); - - stream->tuple = tuple; - stream->is_tuple_refable = is_tuple_refable; - - if (stream->tuple != NULL && stream->is_tuple_refable) - tuple_ref(stream->tuple); -} - /** * Start the search. Must be called after *add* methods and * before *next* method. @@ -309,7 +394,8 @@ vy_write_iterator_stop(struct vy_stmt_stream *vstream) { assert(vstream->iface->stop == vy_write_iterator_stop); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - vy_write_iterator_set_tuple(stream, NULL, false); + for (int i = 0; i < stream->rv_count; ++i) + vy_read_view_stmt_destroy(&stream->read_views[i]); struct vy_write_src *src, *tmp; rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) vy_write_iterator_delete_src(stream, src); @@ -337,7 +423,7 @@ NODISCARD int vy_write_iterator_add_mem(struct vy_stmt_stream *vstream, struct vy_mem *mem) { struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - struct vy_write_src *src = vy_write_iterator_new_src(stream, false); + struct vy_write_src *src = vy_write_iterator_new_src(stream); if (src == NULL) return -1; vy_mem_stream_open(&src->mem_stream, mem); @@ -353,7 +439,7 @@ vy_write_iterator_add_slice(struct vy_stmt_stream *vstream, struct vy_slice *slice, struct vy_run_env *run_env) { struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - struct vy_write_src *src = vy_write_iterator_new_src(stream, true); + struct vy_write_src *src = vy_write_iterator_new_src(stream); if (src == NULL) return -1; vy_slice_stream_open(&src->slice_stream, slice, stream->key_def, @@ -384,13 +470,115 @@ 
vy_write_iterator_merge_step(struct vy_write_iterator *stream) } /** - * Squash in the single statement all rest statements of current key - * starting from the current statement. + * Try to get VLSN of the read view with the specified number in + * the vy_write_iterator.read_views array. + * If the requested read view is older than all existing ones, + * return 0, as the oldest possible VLSN. + * + * @param stream Write iterator. + * @param current_rv_i Index of the read view. + * + * @retval VLSN. + */ +static inline int64_t +vy_write_iterator_get_vlsn(struct vy_write_iterator *stream, int rv_i) +{ + if (rv_i >= stream->rv_count) + return 0; + return stream->read_views[rv_i].vlsn; +} + +/** + * Remember the current tuple of the @src as a part of the + * current read view. + * @param History objects allocator. + * @param stream Write iterator. + * @param src Source of the wanted tuple. + * @param current_rv_i Index of the current read view. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static inline int +vy_write_iterator_push_rv(struct region *region, + struct vy_write_iterator *stream, + struct vy_write_src *src, int current_rv_i) +{ + assert(current_rv_i < stream->rv_count); + struct vy_read_view_stmt *rv = &stream->read_views[current_rv_i]; + assert(rv->vlsn >= vy_stmt_lsn(src->tuple)); + struct vy_write_history *h = + vy_write_history_new(region, src->tuple, rv->history); + if (h == NULL) + return -1; + rv->history = h; + return 0; +} + +/** + * Return the next statement from the current key read view + * statements sequence. Unref the previous statement, if needed. + * We can't unref the statement right before returning it to the + * caller, because reference in the read_views array can be + * single reference of this statement, and unref could delete it + * before returning. + * + * @param stream Write iterator. + * @retval not NULL Next statement of the current key. + * @retval NULL End of the key (not the end of the sources). 
+ */ +static inline struct tuple * +vy_write_iterator_pop_read_view_stmt(struct vy_write_iterator *stream) +{ + struct vy_read_view_stmt *rv; + if (stream->stmt_i >= 0) { + /* Destroy the current before getting the next. */ + rv = &stream->read_views[stream->stmt_i]; + assert(rv->history == NULL); + vy_read_view_stmt_destroy(rv); + } + if (stream->rv_used_count == 0) + return NULL; + /* Find a next not empty history element. */ + do { + assert(stream->stmt_i + 1 < stream->rv_count); + stream->stmt_i++; + rv = &stream->read_views[stream->stmt_i]; + assert(rv->history == NULL); + } while (rv->tuple == NULL); + assert(stream->rv_used_count > 0); + stream->rv_used_count--; + return rv->tuple; +} + +/** + * Build the history of the current key. During the history + * building already some optimizations can be applied - + * @sa optimizations 1, 2 and 3 in vy_write_iterator.h. + * During building of a key history, some statements can be + * skipped, but no one can be merged. + * UPSERTs merge is executed on a special 'merge' phase. + * + * @param region History objects allocator. + * @param stream Write iterator. + * @param[out] count Count of statements, saved in a history. + * + * @retval 0 Success. + * @retval -1 Memory error. */ static NODISCARD int -vy_write_iterator_next_key(struct vy_write_iterator *stream) +vy_write_iterator_build_history(struct region *region, + struct vy_write_iterator *stream, int *count) { - assert(stream->tuple != NULL); + *count = 0; + assert(stream->stmt_i == -1); + struct heap_node *node = src_heap_top(&stream->src_heap); + if (node == NULL) + return 0; /* no more data */ + struct vy_write_src *src = + container_of(node, struct vy_write_src, heap_node); + /* Search must be started in the task. */ + assert(src->tuple != NULL); /* * A virtual source instance that represents the end on current key in * source heap. 
Due to a special branch in heap's comparator the @@ -401,56 +589,253 @@ vy_write_iterator_next_key(struct vy_write_iterator *stream) */ struct vy_write_src end_of_key_src; end_of_key_src.is_end_of_key = true; - end_of_key_src.tuple = stream->tuple; + end_of_key_src.tuple = src->tuple; int rc = src_heap_insert(&stream->src_heap, &end_of_key_src.heap_node); if (rc) { diag_set(OutOfMemory, sizeof(void *), "malloc", "write stream heap"); return rc; } + if (vy_stmt_is_refable(src->tuple)) + tuple_ref(src->tuple); + /* + * For each pair (merge_until_lsn, current_rv_lsn] build + * history of a corresponding read view. + * current_rv_i - index of the current read view. + */ + int current_rv_i = 0; + int64_t current_rv_lsn = vy_write_iterator_get_vlsn(stream, 0); + int64_t merge_until_lsn = vy_write_iterator_get_vlsn(stream, 1); + uint64_t key_mask = stream->key_def->column_mask; while (true) { - struct heap_node *node = src_heap_top(&stream->src_heap); - assert(node != NULL); - struct vy_write_src *src = - container_of(node, struct vy_write_src, heap_node); - assert(src->tuple != NULL); /* Is search started? */ - - if (vy_stmt_type(stream->tuple) == IPROTO_UPSERT && - (!src->is_end_of_key || stream->is_last_level)) { - const struct tuple *apply_to = - src->is_end_of_key ? NULL : src->tuple; - struct tuple *applied = - vy_apply_upsert(stream->tuple, apply_to, - stream->key_def, stream->format, - stream->upsert_format, false); - if (applied == NULL) { - rc = -1; + /* + * Skip statements, unvisible by the current read + * view and unused by the previous read view. + */ + if (vy_stmt_lsn(src->tuple) > current_rv_lsn) + goto next_step; + /* + * The iterator reached the border of two read + * views. + */ + while (vy_stmt_lsn(src->tuple) <= merge_until_lsn) { + /* + * Skip read views, which have the same + * versions of the key. + * The src->tuple must be between + * merge_until_lsn and current_rv_lsn. 
+ */ + current_rv_i++; + current_rv_lsn = merge_until_lsn; + int n = current_rv_i + 1; + merge_until_lsn = + vy_write_iterator_get_vlsn(stream, n); + } + + /* + * Optimization 1: skip last level delete. + * @sa vy_write_iterator for details about this + * and other optimizations. + */ + if (vy_stmt_type(src->tuple) == IPROTO_DELETE && + stream->is_last_level && merge_until_lsn == 0) { + current_rv_lsn = 0; + goto next_step; + } + + /* + * Optimization 2: skip after REPLACE/DELETE. + */ + if (vy_stmt_type(src->tuple) == IPROTO_REPLACE || + vy_stmt_type(src->tuple) == IPROTO_DELETE) { + uint64_t stmt_mask = vy_stmt_column_mask(src->tuple); + /* + * Optimization 3: skip statements, which + * do not update the secondary key. + */ + if (!stream->is_primary && + key_update_can_be_skipped(key_mask, stmt_mask)) + goto next_step; + + rc = vy_write_iterator_push_rv(region, stream, src, + current_rv_i); + if (rc != 0) break; - } - vy_write_iterator_set_tuple(stream, applied, true); - /* refresh tuple in virtual source */ - end_of_key_src.tuple = stream->tuple; + ++*count; + current_rv_i++; + current_rv_lsn = merge_until_lsn; + merge_until_lsn = + vy_write_iterator_get_vlsn(stream, + current_rv_i + 1); + goto next_step; } - if (src->is_end_of_key) + assert(vy_stmt_type(src->tuple) == IPROTO_UPSERT); + rc = vy_write_iterator_push_rv(region, stream, src, + current_rv_i); + if (rc != 0) break; - + ++*count; +next_step: rc = vy_write_iterator_merge_step(stream); if (rc != 0) break; + node = src_heap_top(&stream->src_heap); + assert(node != NULL); + src = container_of(node, struct vy_write_src, heap_node); + assert(src->tuple != NULL); + if (src->is_end_of_key) + break; } src_heap_delete(&stream->src_heap, &end_of_key_src.heap_node); + if (vy_stmt_is_refable(end_of_key_src.tuple)) + tuple_unref(end_of_key_src.tuple); return rc; } +/** + * Apply accumulated UPSERTs in the read view with a hint from + * a previous read view. 
After merge, the read view must contain + * single statement. + * + * @param stream Write iterator. + * @param previous_version Hint from a previous read view. + * @param rv Read view to merge. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static NODISCARD int +vy_read_view_merge(struct vy_write_iterator *stream, + struct tuple *previous_version, struct vy_read_view_stmt *rv) +{ + assert(rv != NULL); + assert(rv->tuple == NULL); + assert(rv->history != NULL); + struct vy_write_history *h = rv->history; + /* + * Two possible hints to remove the current UPSERT. + * 1. If the previous read view contains DELETE or + * REPLACE, then the current UPSERT can be applied to + * it, whether is_last_level true or not. + * 2. If the stream is working on the last level. Then we + * are sure the UPSERT to be oldest version of a key + * and it can be turned into REPLACE. + */ + if (vy_stmt_type(h->tuple) == IPROTO_UPSERT && + (stream->is_last_level || (previous_version != NULL && + vy_stmt_type(previous_version) != IPROTO_UPSERT))) { + assert(!stream->is_last_level || previous_version == NULL || + vy_stmt_type(previous_version) != IPROTO_UPSERT); + struct tuple *applied = + vy_apply_upsert(h->tuple, previous_version, + stream->key_def, stream->format, + stream->upsert_format, false); + if (applied == NULL) + return -1; + if (vy_stmt_is_refable(h->tuple)) + tuple_unref(h->tuple); + h->tuple = applied; + } + /* Squash rest of UPSERTs. */ + struct vy_write_history *result = h; + h = h->next; + while (h != NULL) { + assert(h->tuple != NULL && + vy_stmt_type(h->tuple) == IPROTO_UPSERT); + assert(result->tuple != NULL); + struct tuple *applied = + vy_apply_upsert(h->tuple, result->tuple, + stream->key_def, stream->format, + stream->upsert_format, false); + if (applied == NULL) + return -1; + if (vy_stmt_is_refable(result->tuple)) + tuple_unref(result->tuple); + result->tuple = applied; + /* + * Before: + * result -> h -> next + * + * Will be truncated by region. 
+ * After: / + * result -. h .-> next + * \_ _ _ _ / + */ + if (vy_stmt_is_refable(h->tuple)) + tuple_unref(h->tuple); + h = h->next; + result->next = h; + } + rv->tuple = result->tuple; + rv->history = NULL; + result->tuple = NULL; + assert(result->next == NULL); + return 0; +} + +/** + * Split the current key into the sequence of the read view + * statements. @sa struct vy_write_iterator comment for details + * about algorithm and optimizations. + * + * @param stream Write iterator. + * @param[out] count Length of the result key versions sequence. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static NODISCARD int +vy_write_iterator_build_read_views(struct vy_write_iterator *stream, int *count) +{ + *count = 0; + int raw_count; + struct region *region = &fiber()->gc; + size_t used = region_used(region); + stream->rv_used_count = 0; + if (vy_write_iterator_build_history(region, stream, &raw_count) != 0) + goto error; + if (raw_count == 0) { + /* A key is fully optimized. */ + region_truncate(region, used); + return 0; + } + /* Find a first not empty read view. */ + struct vy_read_view_stmt *rv = + &stream->read_views[stream->rv_count - 1]; + while (rv > &stream->read_views[0] && rv->history == NULL) + --rv; + /* + * At least one statement has been found, since raw_count + * here > 0. + */ + assert(rv >= &stream->read_views[0] && rv->history != NULL); + struct tuple *previous_version = NULL; + for (; rv >= &stream->read_views[0]; --rv) { + if (rv->history == NULL) + continue; + if (vy_read_view_merge(stream, previous_version, rv) != 0) + goto error; + stream->rv_used_count++; + ++*count; + previous_version = rv->tuple; + assert(rv->history == NULL); + } + region_truncate(region, used); + return 0; +error: + region_truncate(region, used); + return -1; +} + /** * Get the next statement to write. * The user of the write iterator simply expects a stream * of statements to write to the output. 
* The tuple *ret is guaranteed to be valid until next tuple is - * returned (thus last non-null tuple is valid after EOF). + * returned (thus last non-null tuple is valid after EOF). * * @return 0 on success or not 0 on error (diag is set). */ @@ -461,55 +846,34 @@ vy_write_iterator_next(struct vy_stmt_stream *vstream, assert(vstream->iface->next == vy_write_iterator_next); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; /* - * Nullify the result stmt. If the next stmt is not - * found, this would be a marker of the end of the stream. + * Try to get the next statement from the current key + * read view statements sequence. */ - *ret = NULL; + *ret = vy_write_iterator_pop_read_view_stmt(stream); + if (*ret != NULL) + return 0; - while (true) { - struct heap_node *node = src_heap_top(&stream->src_heap); - if (node == NULL) - return 0; /* no more data */ - struct vy_write_src *src = - container_of(node, struct vy_write_src, heap_node); - assert(src->tuple != NULL); /* Is search started? */ - vy_write_iterator_set_tuple(stream, src->tuple, - src->is_tuple_refable); - - int rc = vy_write_iterator_merge_step(stream); - if (rc != 0) - return -1; - - if (vy_stmt_lsn(stream->tuple) > stream->oldest_vlsn) - break; /* Save the current stmt as the result. */ - - if (vy_stmt_type(stream->tuple) == IPROTO_REPLACE || - vy_stmt_type(stream->tuple) == IPROTO_DELETE) { - /* - * If the tuple has extra size - it has - * column mask of an update operation. - * The tuples from secondary indexes - * which don't modify its keys can be - * skipped during dump, - * @sa vy_can_skip_update(). - */ - if (!stream->is_primary && - key_update_can_be_skipped(stream->key_def->column_mask, - vy_stmt_column_mask(stream->tuple))) - continue; - } + /* Build the next key sequence. 
*/ + stream->stmt_i = -1; + int count = 0; + while (true) { /* Squash upserts and/or go to the next key */ - rc = vy_write_iterator_next_key(stream); - if (rc != 0) + if (vy_write_iterator_build_read_views(stream, &count) != 0) return -1; - - if (vy_stmt_type(stream->tuple) == IPROTO_DELETE && - stream->is_last_level) - continue; /* Skip unnecessary DELETE */ - break; + /* + * Next_key() routine could skip the next key, for + * example, if it was truncated by last level + * DELETE or it consisted only from optimized + * updates. Then try get the next key. + */ + if (count != 0 || stream->src_heap.size == 0) + break; } - *ret = stream->tuple; + /* + * Again try to get the statement, after calling next_key. + */ + *ret = vy_write_iterator_pop_read_view_stmt(stream); return 0; } diff --git a/src/box/vy_write_iterator.h b/src/box/vy_write_iterator.h index 48a9013ad5..7d5d347fae 100644 --- a/src/box/vy_write_iterator.h +++ b/src/box/vy_write_iterator.h @@ -40,104 +40,104 @@ * or over a series of sorted runs on disk to create a new sorted * run (compaction). * - * Use merge iterator to order the output and filter out - * too old statements (older than the oldest active read view). - * - * Squash multiple UPSERT statements over the same key into one, - * if possible. - * * Background * ---------- - * Vinyl provides support for consistent read views. The oldest - * active read view is maintained in the transaction manager. - * To support it, when dumping or compacting statements on disk, - * older versions need to be preserved, and versions outside - * any active read view garbage collected. This task is handled - * by the write iterator. - * - * Filtering - * --------- - * Let's call each transaction consistent read view LSN vlsn. 
- * - * oldest_vlsn = MIN(vlsn) over all active transactions - * - * Thus to preserve relevant data for every key and purge old - * versions, the iterator works as follows: - * - * If statement lsn is greater than oldest vlsn, the - * statement is preserved. - * - * Otherwise, if statement type is REPLACE/DELETE, then - * it's returned, and the iterator can proceed to the - * next key: the readers do not need the history. - * - * Otherwise, the statement is UPSERT, and in order - * to restore the original tuple from UPSERT the reader - * does need the history: they need to look for an older - * statement to which the UPSERT can be applied to get - * a tuple. This older statement can be UPSERT as well, - * and so on. - * In other words, of statement type is UPSERT, the reader - * needs a range of statements from the youngest statement - * with lsn <= vlsn to the youngest non-UPSERT statement - * with lsn <= vlsn, borders included. - * - * All other versions of this key can be skipped, and hence - * garbage collected. - * - * Squashing and garbage collection - * -------------------------------- - * Filtering and garbage collection, performed by write iterator, - * must have no effect on read views of active transactions: - * they should read the same data as before. - * - * On the other hand, old version should be deleted as soon as possible; - * multiple UPSERTs could be merged together to take up less - * space, or substituted with REPLACE. - * - * Here's how it's done: - * - * - * 1) Every statement with lsn greater than oldest vlsn is preserved - * in the output, since there could be an active transaction - * that needs it. - * - * 2) For all statements with lsn <= oldest_vlsn, only a single - * resultant statement is returned. Here's how. - * - * 2.1) If the youngest statement with lsn <= oldest _vlsn is a - * REPLACE/DELETE, it becomes the resultant statement. - * - * 2.2) Otherwise, it as an UPSERT. 
Then we must iterate over - * all older LSNs for this key until we find a REPLACE/DELETE - * or exhaust all input streams for this key. - * - * If the older lsn is a yet another UPSERT, two upserts are - * squashed together into one. Otherwise we found an - * REPLACE/DELETE, so apply all preceding UPSERTs to it and - * get the resultant statement. - * - * There is an extra twist to this algorithm, used when performing - * compaction of the last LSM level (i.e. merging all existing - * runs into one). The last level does not need to store DELETEs. - * Thus we can: - * 1) Completely skip the resultant statement from output if it's - * a DELETE. - * | ... | | ... | - * | | | | ^ - * +- oldest vlsn -+ = +- oldest lsn -+ ^ lsn - * | | | | ^ - * | DELETE | +--------------+ - * | ... | - * 2) Replace an accumulated resultant UPSERT with an appropriate - * REPLACE. - * | ... | | ... | - * | UPSERT | | REPLACE | - * | | | | ^ - * +- oldest vlsn -+ = +- oldest lsn -+ ^ lsn - * | | | | ^ - * | DELETE | +--------------+ - * | ... | + * With no loss of generality, let's consider the write_iterator to + * have a single statements source (struct vy_write_src). Each key + * consists of an LSNs sequence. A range of control points also + * exists, named read views, each of which is characterized by + * its VLSN. By default, for the biggest VLSN INT64_MAX is used, + * and for the smallest one 0 is used: + * + * [0, vlsn1, vlsn2, vlsn3, ... INT64_MAX]. + * + * The purpose of the write_iterator is to split LSNs sequence of + * one key into subsequences, bordered with VLSNs, and then merge + * each subsequence into one statement. + * -------- + * ONE KEY: + * -------- + * 0 VLSN1 VLSN2 ... INT64_MAX + * | | | | + * | LSN1 ... LSN(i)|LSN(i+1) ... LSN(j)|LSN(j+1) ... 
LSN(N)| + * \_______________/\__________________/\___________________/ + * merge merge merge + * + * A range of optimizations is possible, which allow decreasing + * the count of source statements and count of merged subsequences. + * + * --------------------------------------------------------------- + * Optimization 1: skip DELETE from the last level of the oldest + * read view. + * --------------------------- + * ONE KEY, LAST LEVEL SOURCE: + * --------------------------- + * LSN1 LSN2 ... DELETE LSNi LSNi+1 ... LSN_N + * \___________________________/\___________________________/ + * skip merge + * + * Details: if the source is the oldest source of all, then it is + * not necessary to write any DELETE to the disk, including all + * LSNs of the same key, which are older than this DELETE. However + * such a skip is possible only if there are no read views to the + * same key, older than this DELETE, because read views can't be + * skipped. + * + * --------------------------------------------------------------- + * Optimization 2: on REPLACE/DELETE skip rest of statements, + * until the next read view. + * -------- + * ONE KEY: + * -------- + * ... LSN(k) ... LSN(i-1) LSN(i) REPLACE/DELETE + * Read + * views: * * + * _____________/\_____________________________/\__________/ + * merge skip return + * + * It is possible, since the REPLACE and DELETE discard all older + * key versions. + * + * --------------------------------------------------------------- + * Optimization 3: skip statements, which do not update the + * secondary key. + * + * Masks intersection: not 0 0 0 not 0 not 0 + * KEY: LSN1 DELETE REPLACE LSN5 ... REPLACE + * \____/\_______________/\__________________/ + * merge skip merge + * + * Details: if UPDATE is executed, it is transformed into + * DELETE + REPLACE or single REPLACE. But in the secondary index + * only keys are stored and if such UPDATE didn't change this key, + * then there is no necessity to write this UPDATE. 
Actually, + * existence of such UPDATEs is simply ignored. + * + * --------------------------------------------------------------- + * Optimization 4: use older REPLACE/DELETE as hints to apply + * newer UPSERTs. + * + * After building history of a key, UPSERT sequences can be + * accumulated in some read views. If the older read views have + * REPLACE/DELETE, they can be used to turn newer UPSERTs into + * REPLACEs. + * -------- + * ONE KEY: + * -------- + * LSN1 LSN2 LSN3 LSN4 + * REPLACE UPSERT UPSERT UPSERT + * Read + * views: * * * * + * ^ \________________________/ + * +- - - - - - - -< apply + * Result: + * LSN1 LSN2 LSN3 LSN4 + * REPLACE REPLACE REPLACE REPLACE + * Read + * views: * * * * + * + * See implementation details in + * vy_write_iterator_build_read_views. */ struct vy_write_iterator; @@ -156,13 +156,13 @@ struct vy_run_env; * @param upsert_format - same as format, but for UPSERT tuples. * @param is_primary - set if this iterator is for a primary index. * @param is_last_level - there is no older level than the one we're writing to. - * @param oldest_vlsn - the minimal VLSN among all active transactions. + * @param read_views - Opened read views. * @return the iterator or NULL on error (diag is set). */ struct vy_stmt_stream * vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format, struct tuple_format *upsert_format, bool is_primary, - bool is_last_level, int64_t oldest_vlsn); + bool is_last_level, struct rlist *read_views); /** * Add a mem as a source of iterator. 
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 3bfe94b9b8..d8b2b03705 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -135,3 +135,6 @@ target_link_libraries(vy_mem.test box unit) add_executable(column_mask.test column_mask.c) target_link_libraries(column_mask.test box unit) + +add_executable(vy_write_iterator.test vy_write_iterator.c vy_iterators_helper.c) +target_link_libraries(vy_write_iterator.test box unit) diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c new file mode 100644 index 0000000000..ef1e6d8e83 --- /dev/null +++ b/test/unit/vy_iterators_helper.c @@ -0,0 +1,176 @@ +#include "vy_iterators_helper.h" + +struct tuple * +vy_new_simple_stmt(struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask, + const struct vy_stmt_template *templ) +{ + /* Calculate binary size. */ + int i = 0; + size_t size = 0; + while (templ->fields[i] != vyend) { + fail_if(i > MAX_FIELDS_COUNT); + if (templ->fields[i] >= 0) + size += mp_sizeof_uint(templ->fields[i]); + else + size += mp_sizeof_int(templ->fields[i]); + ++i; + } + size += mp_sizeof_array(i); + fail_if(templ->optimize_update && templ->type == IPROTO_UPSERT); + if (templ->optimize_update) + format = format_with_colmask; + + /* Encode the statement. */ + char *buf = (char *) malloc(size); + fail_if(buf == NULL); + char *pos = mp_encode_array(buf, i); + i = 0; + struct tuple *ret = NULL; + while (templ->fields[i] != vyend) { + if (templ->fields[i] >= 0) + pos = mp_encode_uint(pos, templ->fields[i]); + else + pos = mp_encode_int(pos, templ->fields[i]); + ++i; + } + + /* + * Create the result statement, using one of the formats. 
+ */ + if (templ->type == IPROTO_REPLACE || templ->type == IPROTO_DELETE) { + ret = vy_stmt_new_replace(format, buf, pos); + fail_if(ret == NULL); + if (templ->type == IPROTO_REPLACE) + goto end; + + struct tuple *tmp = vy_stmt_new_surrogate_delete(format, ret); + fail_if(tmp == NULL); + tuple_unref(ret); + ret = tmp; + goto end; + } + if (templ->type == IPROTO_UPSERT) { + /* + * Create the upsert statement without operations. + * Validation of result of UPSERT operations + * applying is not a test for the iterators. + * For the iterators only UPSERT type is + * important. + */ + struct iovec operations[1]; + char tmp[16]; + char *ops = mp_encode_array(tmp, 1); + ops = mp_encode_array(ops, 0); + operations[0].iov_base = tmp; + operations[0].iov_len = ops - tmp; + fail_if(templ->optimize_update); + ret = vy_stmt_new_upsert(upsert_format, buf, pos, + operations, 1); + fail_if(ret == NULL); + goto end; + } + fail_if(true); +end: + free(buf); + vy_stmt_set_lsn(ret, templ->lsn); + if (templ->optimize_update) + vy_stmt_set_column_mask(ret, 0); + return ret; +} + +void +vy_mem_insert_template(struct vy_mem *mem, const struct vy_stmt_template *templ) +{ + struct tuple *stmt; + if (templ->type == IPROTO_UPSERT) { + stmt = vy_new_simple_stmt(mem->format, mem->upsert_format, + mem->format_with_colmask, templ); + } else { + stmt = vy_new_simple_stmt(mem->format, mem->upsert_format, + mem->format_with_colmask, templ); + } + struct tuple *region_stmt = vy_stmt_dup_lsregion(stmt, mem->allocator, + mem->generation); + assert(region_stmt != NULL); + tuple_unref(stmt); + if (templ->type == IPROTO_UPSERT) + vy_mem_insert_upsert(mem, region_stmt); + else + vy_mem_insert(mem, region_stmt); +} + +void +init_read_views_list(struct rlist *rlist, struct vy_read_view *rvs, + const int *vlsns, int count) +{ + rlist_create(rlist); + for (int i = 0; i < count; ++i) { + rvs[i].vlsn = vlsns[i]; + rlist_add_tail_entry(rlist, &rvs[i], in_read_views); + } +} + +struct vy_mem * 
+create_test_mem(struct lsregion *region, struct key_def *def) +{ + /* Create format */ + struct tuple_format *format = tuple_format_new(&vy_tuple_format_vtab, + &def, def->part_count, + 0); + fail_if(format == NULL); + + /* Create format with column mask */ + struct tuple_format *format_with_colmask = + vy_tuple_format_new_with_colmask(format); + assert(format_with_colmask != NULL); + + /* Create upsert format */ + struct tuple_format *format_upsert = + vy_tuple_format_new_upsert(format); + assert(format_upsert != NULL); + + /* Create mem */ + struct vy_mem *mem = vy_mem_new(region, 1, def, format, + format_with_colmask, format_upsert, 0); + fail_if(mem == NULL); + return mem; +} + +bool +vy_stmt_are_same(const struct tuple *actual, + const struct vy_stmt_template *expected, + struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask) +{ + if (vy_stmt_type(actual) != expected->type) + return false; + struct tuple *tmp = vy_new_simple_stmt(format, upsert_format, + format_with_colmask, expected); + fail_if(tmp == NULL); + uint32_t a_len, b_len; + const char *a, *b; + if (vy_stmt_type(actual) == IPROTO_UPSERT) { + a = vy_upsert_data_range(actual, &a_len); + } else { + a = tuple_data_range(actual, &a_len); + } + if (vy_stmt_type(tmp) == IPROTO_UPSERT) { + b = vy_upsert_data_range(tmp, &b_len); + } else { + b = tuple_data_range(tmp, &b_len); + } + if (a_len != b_len) { + tuple_unref(tmp); + return false; + } + if (vy_stmt_lsn(actual) != expected->lsn) { + tuple_unref(tmp); + return false; + } + bool rc = memcmp(a, b, a_len) == 0; + tuple_unref(tmp); + return rc; +} diff --git a/test/unit/vy_iterators_helper.h b/test/unit/vy_iterators_helper.h new file mode 100644 index 0000000000..bba909327c --- /dev/null +++ b/test/unit/vy_iterators_helper.h @@ -0,0 +1,139 @@ +#ifndef INCLUDES_TARANTOOL_TEST_VY_ITERATORS_HELPER_H +#define INCLUDES_TARANTOOL_TEST_VY_ITERATORS_HELPER_H +/* + * Copyright 2010-2017, Tarantool AUTHORS, 
please see AUTHORS file. + * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/uio.h> +#include "unit.h" +#include "vy_stmt.h" +#include "small/rlist.h" +#include "small/lsregion.h" +#include "vy_mem.h" +#include "vy_stmt_iterator.h" + +#define vyend 99999999 +#define MAX_FIELDS_COUNT 100 +#define STMT_TEMPLATE(lsn, type, ...) \ +{ { __VA_ARGS__, vyend }, IPROTO_##type, lsn, false } + +#define STMT_TEMPLATE_OPTIMIZED(lsn, type, ...) \ +{ { __VA_ARGS__, vyend }, IPROTO_##type, lsn, true } + +extern struct tuple_format_vtab vy_tuple_format_vtab; + +/** Template for creation a vinyl statement. */ +struct vy_stmt_template { + /** Array of statement fields, ended with 'vyend'. 
*/ + const int fields[MAX_FIELDS_COUNT]; + /** Statement type: REPLACE/UPSERT/DELETE. */ + enum iproto_type type; + /** Statement lsn. */ + int64_t lsn; + /* + * True, if statement must have column mask, that allows + * to skip it in the write_iterator. + */ + bool optimize_update; +}; + +/** + * Create a new vinyl statement using the specified template. + * + * @param format + * @param upsert_format Format for upsert statements. + * @param format_with_colmask Format for statements with a + * colmask. + * @param templ Statement template. + * + * @return Created statement. + */ +struct tuple * +vy_new_simple_stmt(struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask, + const struct vy_stmt_template *templ); + +/** + * Insert into the mem the statement, created by the specified + * template. + * + * @param vy_mem Mem to insert into. + * @param templ Statement template to insert. + */ +void +vy_mem_insert_template(struct vy_mem *mem, + const struct vy_stmt_template *templ); + +/** + * Create a list of read views using the specified vlsns. + * + * @param rlist[out] Result list of read views. + * @param rvs[out] Read views array. + * @param vlsns Array of read view lsns, sorted in ascending + * order. + * @param count Size of the @vlsns. + */ +void +init_read_views_list(struct rlist *rlist, struct vy_read_view *rvs, + const int *vlsns, int count); + +/** + * Create vy_mem with the specified key_def, using the @region as + * allocator. + * + * @param region Allocator for statements and bps. + * @param def Key definition. + * + * @return New vy_mem. + */ +struct vy_mem * +create_test_mem(struct lsregion *region, struct key_def *def); + +/** + * Check that the template specifies completely the same statement + * as @stmt. + * + * @param stmt Actual value. + * @param templ Expected value. + * @param format Template statement format. + * @param upsert_format Template upsert statement format. 
+ * @param format_with_colmask Template statement format with colmask. + * + * @retval stmt === template. + */ +bool +vy_stmt_are_same(const struct tuple *actual, + const struct vy_stmt_template *expected, + struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask); + +#endif diff --git a/test/unit/vy_write_iterator.c b/test/unit/vy_write_iterator.c new file mode 100644 index 0000000000..94c0087260 --- /dev/null +++ b/test/unit/vy_write_iterator.c @@ -0,0 +1,418 @@ +#include "memory.h" +#include "fiber.h" +#include "vy_write_iterator.h" +#include <small/slab_cache.h> +#include "vy_iterators_helper.h" + +/** + * Create the mem with the specified key_def and content, iterate + * over it with write_iterator and compare actual result + * statements with the expected ones. + * + * @param key_def Key definition for the mem. + * @param content Mem content statements. + * @param content_count Size of the @content. + * @param expected Expected results of the iteration. + * @param expected_count Size of the @expected. + * @param vlsns Read view lsns for the write iterator. + * @param vlsns_count Size of the @vlsns. + * @param is_primary True, if the new mem belongs to the primary + * index. + * @param is_last_level True, if the new mem is the last level. 
+ */ +void +compare_write_iterator_results(struct key_def *key_def, + const struct vy_stmt_template *content, + int content_count, + const struct vy_stmt_template *expected, + int expected_count, + const int *vlsns, int vlsns_count, + bool is_primary, bool is_last_level) +{ + /* Create lsregion */ + struct lsregion lsregion; + struct slab_cache *slab_cache = cord_slab_cache(); + lsregion_create(&lsregion, slab_cache->arena); + struct vy_mem *mem = create_test_mem(&lsregion, key_def); + for (int i = 0; i < content_count; ++i) + vy_mem_insert_template(mem, &content[i]); + struct rlist rv_list; + struct vy_read_view *rv_array = malloc(sizeof(*rv_array) * vlsns_count); + fail_if(rv_array == NULL); + init_read_views_list(&rv_list, rv_array, vlsns, vlsns_count); + + struct vy_stmt_stream *wi = + vy_write_iterator_new(key_def, mem->format, mem->upsert_format, + is_primary, is_last_level, &rv_list); + fail_if(wi == NULL); + fail_if(vy_write_iterator_add_mem(wi, mem) != 0); + + struct tuple *ret; + fail_if(wi->iface->start(wi) != 0); + int i = 0; + do { + fail_if(wi->iface->next(wi, &ret) != 0); + if (ret == NULL) + break; + fail_if(i >= expected_count); + ok(vy_stmt_are_same(ret, &expected[i], mem->format, + mem->upsert_format, + mem->format_with_colmask), + "stmt %d is correct", i); + ++i; + } while (ret != NULL); + ok(i == expected_count, "correct results count"); + + /* Clean up */ + wi->iface->close(wi); + vy_mem_delete(mem); + lsregion_destroy(&lsregion); + + free(rv_array); +} + +void +test_basic(void) +{ + header(); + plan(36); + + /* Create key_def */ + uint32_t fields[] = { 0 }; + uint32_t types[] = { FIELD_TYPE_UNSIGNED }; + struct key_def *key_def = box_key_def_new(fields, types, 1); + assert(key_def != NULL); + +/* + * STATEMENT: REPL REPL REPL DEL REPL REPL REPL REPL REPL REPL + * LSN: 5 6 7 8 9 10 11 12 13 14 + * READ VIEW: * * * + * \____________/\________/\_________________/\___________/ + * merge merge merge merge + */ +{ + const struct vy_stmt_template 
content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE(6, REPLACE, 1, 2), + STMT_TEMPLATE(7, REPLACE, 1, 3), + STMT_TEMPLATE(8, REPLACE, 1, 4), + STMT_TEMPLATE(9, REPLACE, 1, 5), + STMT_TEMPLATE(10, REPLACE, 1, 6), + STMT_TEMPLATE(11, REPLACE, 1, 7), + STMT_TEMPLATE(12, REPLACE, 1, 8), + STMT_TEMPLATE(13, REPLACE, 1, 9), + STMT_TEMPLATE(14, REPLACE, 1, 10), + }; + const struct vy_stmt_template expected[] = { + content[9], content[7], content[4], content[2] + }; + const int vlsns[] = {7, 9, 12}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: UPS UPS UPS UPS UPS UPS UPS UPS UPS UPS + * LSN: 5 6 7 8 9 10 11 12 13 14 + * READ VIEW: * * * + * \________/\_________________/\_____________/\_____/ + * squash squash squash squash + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, UPSERT, 1, 1), + STMT_TEMPLATE(6, UPSERT, 1, 2), + STMT_TEMPLATE(7, UPSERT, 1, 3), + STMT_TEMPLATE(8, UPSERT, 1, 4), + STMT_TEMPLATE(9, UPSERT, 1, 5), + STMT_TEMPLATE(10, UPSERT, 1, 6), + STMT_TEMPLATE(11, UPSERT, 1, 7), + STMT_TEMPLATE(12, UPSERT, 1, 8), + STMT_TEMPLATE(13, UPSERT, 1, 9), + STMT_TEMPLATE(14, UPSERT, 1, 10), + }; + const struct vy_stmt_template expected[] = { + content[9], + STMT_TEMPLATE(13, UPSERT, 1, 7), + STMT_TEMPLATE(10, UPSERT, 1, 3), + STMT_TEMPLATE(6, UPSERT, 1, 1), + }; + const int vlsns[] = {6, 10, 13}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL DEL UPS REPL + * LSN: 5 6 7 8 + 
* READ VIEW: * + * \_______________/\_______/ + * \_____\_/_____/ merge + * skip last level merge + * delete + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE(6, DELETE, 1), + STMT_TEMPLATE(7, UPSERT, 1, 2), + STMT_TEMPLATE(8, REPLACE, 1, 3), + }; + const struct vy_stmt_template expected[] = { + content[3], + STMT_TEMPLATE(7, REPLACE, 1, 2) + }; + const int vlsns[] = {7}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: REPL REPL + * LSN: 7 8 + * READ VIEW: * * + * No merge. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, REPLACE, 1, 1), + STMT_TEMPLATE(8, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[1], content[0] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * LINKED WITH: gh-1824, about pruning last DELETE. + * STATEMENT: DEL REPL + * LSN: 7 8 + * READ VIEW: * * + * + * is_last_level = true. + * No merge, skip DELETE from last level, although there the read + * view on the DELETE exists. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, DELETE, 1), + STMT_TEMPLATE(8, REPLACE, 1, 1), + }; + const struct vy_stmt_template expected[] = { content[1] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * LINKED WITH: gh-1824, about pruning last DELETE. + * STATEMENT: DEL REPL + * LSN: 7 8 + * READ VIEW: * * + * + * is_last_level = false; + * No merge, don't skip DELETE from last level. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, DELETE, 1), + STMT_TEMPLATE(8, REPLACE, 1, 1), + }; + const struct vy_stmt_template expected[] = { content[1], content[0] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL DEL REPL REPL + * LSN: 5 6 6 7 + * READ VIEW: * + * \_______________/\_______/ + * \_____/\______/ + * merge skip as + * optimized + * update + * DEL and REPL with lsn 6 can be skipped for read view 6 for + * secondary index, because they do not change secondary key. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(6, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(6, REPLACE, 1, 2), + STMT_TEMPLATE(7, REPLACE, 1, 3) + }; + const struct vy_stmt_template expected[] = { content[3], content[0] }; + const int vlsns[] = {6}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, true); +} +{ +/* + * STATEMENT: DEL REPL + * LSN: 6 6 + * \______/ + * skip both as optimized update + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE_OPTIMIZED(6, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(6, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = {}; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, false); +} +{ +/* + * STATEMENT: UPS UPS UPS REPL + * LSN: 6 7 8 9 + * READ VIEW: * + * \______/\________/ + * merge merge + * UPSERT before REPLACE must be squashed with only older + * statements. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, UPSERT, 1, 1), + STMT_TEMPLATE(7, UPSERT, 1, 2), + STMT_TEMPLATE(8, UPSERT, 1, 3), + STMT_TEMPLATE(9, REPLACE, 1, 4) + }; + const struct vy_stmt_template expected[] = { + content[3], STMT_TEMPLATE(7, UPSERT, 1, 1) + }; + const int vlsns[] = {7}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL REPL REPL REPL + * LSN: 6 7 20 21 + * READ VIEW: * *(10) * * *(22) *(23) + * \________/\______/\_____/\______/\____________/ + * merge nullify merge merge nullify + * + * Do not remember the read views with the same versions of the + * key. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE(7, REPLACE, 1, 2), + STMT_TEMPLATE(20, REPLACE, 1, 3), + STMT_TEMPLATE(21, REPLACE, 1, 4) + }; + const struct vy_stmt_template expected[] = { + content[3], content[2], content[1] + }; + const int vlsns[] = {7, 10, 20, 21, 22, 23}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: REPL DEL REPL + * LSN: 6 7 7 + * \___/\__________/ + * merge skip as optimized update + * + * last_level = false. + * Check if the key is not fully skipped in a case of optimized + * update as the newest version. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(7, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(7, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[0] }; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, false); +} +{ +/* + * STATEMENT: REPL DEL REPL + * LSN: 6 7 7 + * \_________/|\___/ + * skip last level | skip as optimized + * delete. | update. + * + * last_level = true. First apply 'last level DELETE' optimization + * and only then the 'optimized UPDATE'. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(7, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(7, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[2] }; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} + box_key_def_delete(key_def); + fiber_gc(); + footer(); + check_plan(); +} + +int +main(int argc, char *argv[]) +{ + memory_init(); + fiber_init(fiber_c_invoke); + tuple_init(); + + test_basic(); + + tuple_free(); + fiber_free(); + memory_free(); + return 0; +} diff --git a/test/unit/vy_write_iterator.result b/test/unit/vy_write_iterator.result new file mode 100644 index 0000000000..86029f9864 --- /dev/null +++ b/test/unit/vy_write_iterator.result @@ -0,0 +1,39 @@ + *** test_basic *** +1..36 +ok 1 - stmt 0 is correct +ok 2 - stmt 1 is correct +ok 3 - stmt 2 is correct +ok 4 - stmt 3 is correct 
+ok 5 - correct results count +ok 6 - stmt 0 is correct +ok 7 - stmt 1 is correct +ok 8 - stmt 2 is correct +ok 9 - stmt 3 is correct +ok 10 - correct results count +ok 11 - stmt 0 is correct +ok 12 - stmt 1 is correct +ok 13 - correct results count +ok 14 - stmt 0 is correct +ok 15 - stmt 1 is correct +ok 16 - correct results count +ok 17 - stmt 0 is correct +ok 18 - correct results count +ok 19 - stmt 0 is correct +ok 20 - stmt 1 is correct +ok 21 - correct results count +ok 22 - stmt 0 is correct +ok 23 - stmt 1 is correct +ok 24 - correct results count +ok 25 - correct results count +ok 26 - stmt 0 is correct +ok 27 - stmt 1 is correct +ok 28 - correct results count +ok 29 - stmt 0 is correct +ok 30 - stmt 1 is correct +ok 31 - stmt 2 is correct +ok 32 - correct results count +ok 33 - stmt 0 is correct +ok 34 - correct results count +ok 35 - stmt 0 is correct +ok 36 - correct results count + *** test_basic: done *** -- GitLab