diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 75b6815a44ebb418380cd24d040cd0be7dfbf90d..54e885467fd94a8b1868971dfeaee480c59fe933 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -947,7 +947,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index, bool is_last_level = (index->run_count == 0); wi = vy_write_iterator_new(index->key_def, index->surrogate_format, index->upsert_format, index->id == 0, - is_last_level, tx_manager_vlsn(xm)); + is_last_level, &xm->read_views); if (wi == NULL) goto err_wi; rlist_foreach_entry(mem, &index->sealed, in_sealed) { @@ -1203,7 +1203,7 @@ vy_task_compact_new(struct vy_scheduler *scheduler, struct vy_index *index, bool is_last_level = (range->compact_priority == range->slice_count); wi = vy_write_iterator_new(index->key_def, index->surrogate_format, index->upsert_format, index->id == 0, - is_last_level, tx_manager_vlsn(xm)); + is_last_level, &xm->read_views); if (wi == NULL) goto err_wi; @@ -4057,9 +4057,11 @@ vy_send_range(struct vy_join_ctx *ctx) return 0; /* nothing to do */ int rc = -1; + struct rlist fake_read_views; + rlist_create(&fake_read_views); ctx->wi = vy_write_iterator_new(ctx->key_def, ctx->format, ctx->upsert_format, - true, true, INT64_MAX); + true, true, &fake_read_views); if (ctx->wi == NULL) goto out; diff --git a/src/box/vy_run.c b/src/box/vy_run.c index a1e33e765192e46f0d8a3f35ad79d27aee54015b..73b69b605c15e3f257143130bf4d1d44706438b0 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -2051,6 +2051,9 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, struct vy_page_info *page = NULL; const char *region_key; bool end_of_run = false; + /* Last written statement */ + struct tuple *last_stmt = *curr_stmt; + vy_stmt_ref_if_possible(last_stmt); /* row offsets accumulator */ struct ibuf row_index_buf; @@ -2086,8 +2089,6 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, vy_page_info_create(page, data_xlog->offset, *curr_stmt, key_def); xlog_tx_begin(data_xlog); - /* Last written statement */ - struct tuple *last_stmt = NULL; do { uint32_t *offset = (uint32_t *) ibuf_alloc(&row_index_buf, sizeof(uint32_t)); @@ -2098,13 +2099,6 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, } *offset = page->unpacked_size; - if (last_stmt != NULL && vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); - - last_stmt = *curr_stmt; - if (vy_stmt_is_refable(last_stmt)) - tuple_ref(last_stmt); - if (vy_run_dump_stmt(*curr_stmt, data_xlog, page, key_def, is_primary) != 0) goto error_rollback; @@ -2118,8 +2112,13 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, if (wi->iface->next(wi, curr_stmt)) goto error_rollback; - if (*curr_stmt == NULL) + if (*curr_stmt == NULL) { end_of_run = true; + } else { + vy_stmt_unref_if_possible(last_stmt); + last_stmt = *curr_stmt; + vy_stmt_ref_if_possible(last_stmt); + } } while (end_of_run == false && obuf_size(&data_xlog->obuf) < page_size); @@ -2142,9 +2141,8 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, if (run->info.max_key == NULL) goto error_rollback; } - - if (vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); + vy_stmt_unref_if_possible(last_stmt); + last_stmt = NULL; /* Save offset to row index */ page->row_index_offset = page->unpacked_size; @@ -2180,10 +2178,9 @@ vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, error_rollback: xlog_tx_rollback(data_xlog); - if (last_stmt != NULL && vy_stmt_is_refable(last_stmt)) - tuple_unref(last_stmt); error_row_index: ibuf_destroy(&row_index_buf); 
+ vy_stmt_unref_if_possible(last_stmt); return -1; } diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index bc335e207fb55a45230397a42f0f260b8be7e73c..e111411aa8ea7c2d18d38f9149cfd82d693fded3 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -259,6 +259,32 @@ vy_stmt_is_refable(const struct tuple *stmt) return stmt->refs > 0; } +/** + * Ref tuple, if it exists (!= NULL) and can be referenced. + * @sa vy_stmt_is_refable. + * + * @param tuple Tuple to ref or NULL. + */ +static inline void +vy_stmt_ref_if_possible(struct tuple *stmt) +{ + if (stmt != NULL && vy_stmt_is_refable(stmt)) + tuple_ref(stmt); +} + +/** + * Unref tuple, if it exists (!= NULL) and can be unreferenced. + * @sa vy_stmt_is_refable. + * + * @param tuple Tuple to unref or NULL. + */ +static inline void +vy_stmt_unref_if_possible(struct tuple *stmt) +{ + if (stmt != NULL && vy_stmt_is_refable(stmt)) + tuple_unref(stmt); +} + /** * Specialized comparators are faster than general-purpose comparators. * For example, vy_stmt_compare - slowest comparator because it in worst case diff --git a/src/box/vy_write_iterator.c b/src/box/vy_write_iterator.c index ceefd408b5f956dfd17ebb7908a404d68630122d..1b823e295f3965f19c43af8e0684291753192c3b 100644 --- a/src/box/vy_write_iterator.c +++ b/src/box/vy_write_iterator.c @@ -33,6 +33,7 @@ #include "vy_run.h" #include "vy_upsert.h" #include "column_mask.h" +#include "fiber.h" #define HEAP_FORWARD_DECLARATION #include "salad/heap.h" @@ -54,11 +55,6 @@ struct vy_write_src { struct heap_node heap_node; /* Current tuple in the source (lowest and with maximal lsn) */ struct tuple *tuple; - /** - * Is the tuple (@sa tuple) refable or not. - * Tuples from mems are reafble, from runs - not - */ - bool is_tuple_refable; /** * There are special rules of comparison for virtual sources * that represent a delimiter between the current key and @@ -76,8 +72,90 @@ struct vy_write_src { }; /** - * Write iterator itself. + * Sequence of verions of a key, sorted by LSN in ascending order. + * (history->tuple.lsn < history->next->tuple.lsn). + */ +struct vy_write_history { + /** Next version with greater LSN. */ + struct vy_write_history *next; + /** Key version. */ + struct tuple *tuple; +}; + +/** + * Create a new vy_write_history object, save a statement into it + * and link with a newer version. + * + * @param region Allocator for the object. + * @param tuple Key version. + * @param next Next version of the key. + * + * @return not NULL Created object. + * @return NULL Memory error. + */ +static inline struct vy_write_history * +vy_write_history_new(struct region *region, struct tuple *tuple, + struct vy_write_history *next) +{ + struct vy_write_history *h = + region_alloc_object(region, struct vy_write_history); + if (h == NULL) + return NULL; + h->tuple = tuple; + assert(next == NULL || (next->tuple != NULL && + vy_stmt_lsn(next->tuple) > vy_stmt_lsn(tuple))); + h->next = next; + if (vy_stmt_is_refable(tuple)) + tuple_ref(tuple); + return h; +} + +/** + * Clear entire sequence of versions of a key. Free resources of + * each version. + * @param history History to clear. */ +static inline void +vy_write_history_destroy(struct vy_write_history *history) +{ + do { + if (history->tuple != NULL && + vy_stmt_is_refable(history->tuple)) + tuple_unref(history->tuple); + history = history->next; + } while (history != NULL); +} + +/** Read view of a key. */ +struct vy_read_view_stmt { + /** Read view LSN. */ + int64_t vlsn; + /** Result key version, visible for the @vlsn. 
*/ + struct tuple *tuple; + /** + * Sequence of key versions. It is merged at the end of + * the key building into @tuple. + */ + struct vy_write_history *history; +}; + +/** + * Free resources, unref tuples, including all tuples in the + * history. + * @param rv Read view to clear. + */ +static inline void +vy_read_view_stmt_destroy(struct vy_read_view_stmt *rv) +{ + if (rv->tuple != NULL && vy_stmt_is_refable(rv->tuple)) + tuple_unref(rv->tuple); + rv->tuple = NULL; + if (rv->history != NULL) + vy_write_history_destroy(rv->history); + rv->history = NULL; +} + +/* @sa vy_write_iterator.h */ struct vy_write_iterator { /** Parent class, must be the first member */ struct vy_stmt_stream base; @@ -85,28 +163,46 @@ struct vy_write_iterator { struct rlist src_list; /* Heap with sources with the lowest source in head */ heap_t src_heap; - /** - * Tuple that was returned in the last vy_write_iterator_next call, - * or the tuple to be returned in vy_write_iterator_next execution. - */ - struct tuple *tuple; - /** - * Is the tuple (member) refable or not. - * Tuples from runs are reafable, from mems - not. - */ - bool is_tuple_refable; /** Index key definition used for storing statements on disk. */ const struct key_def *key_def; /** Format to allocate new REPLACE and DELETE tuples from vy_run */ struct tuple_format *format; /** Same as format, but for UPSERT tuples. */ struct tuple_format *upsert_format; - /* The minimal VLSN among all active transactions */ - int64_t oldest_vlsn; /* There are is no level older than the one we're writing to. */ bool is_last_level; /** Set if this iterator is for a primary index. */ bool is_primary; + + /** Length of the @read_views. */ + int rv_count; + /** Count of not empty read views. */ + int rv_used_count; + /** + * Current read view statement in @read_views. It is used + * to return key versions one by one from + * vy_write_iterator_next. + */ + int stmt_i; + /** + * Read views of the same key, sorted by lsn in + * descending order and started from the INT64_MAX. Each + * is referenced if needed. Example: + * stmt_count = 3 + * rv_count = 6 + * 0 1 2 3 4 5 + * [lsn=6, lsn=5, lsn=4, -, -, -] + * + * @Read_views can have gaps, if there are read views with + * the same key versions. Example: + * + * LSN: 20 - - - 10 9 8 + * Read views: * * * * * + * @read_views array: [lsn=20, -, -, -, lsn=10, -, -] + * \___________________/ + * Same versions of the key. + */ + struct vy_read_view_stmt read_views[0]; }; /** @@ -147,11 +243,10 @@ heap_less(heap_t *heap, struct heap_node *node1, struct heap_node *node2) * Allocate a source and put it to the list. * The underlying stream (src->stream) must be opened immediately. * @param stream - the write iterator. - * @param is_tuple_refable - true for runs and false for mems. * @return the source or NULL on memory error. 
*/ static struct vy_write_src * -vy_write_iterator_new_src(struct vy_write_iterator *stream, bool is_tuple_refable) +vy_write_iterator_new_src(struct vy_write_iterator *stream) { struct vy_write_src *res = (struct vy_write_src *) malloc(sizeof(*res)); if (res == NULL) { @@ -159,7 +254,6 @@ vy_write_iterator_new_src(struct vy_write_iterator *stream, bool is_tuple_refabl "malloc", "write stream src"); return NULL; } - res->is_tuple_refable = is_tuple_refable; res->is_end_of_key = false; rlist_add(&stream->src_list, &res->in_src_list); return res; @@ -234,14 +328,33 @@ static const struct vy_stmt_stream_iface vy_slice_stream_iface; struct vy_stmt_stream * vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format, struct tuple_format *upsert_format, bool is_primary, - bool is_last_level, int64_t oldest_vlsn) + bool is_last_level, struct rlist *read_views) { + /* + * One is reserved for the INT64_MAX - maximal read view. + */ + int count = 1; + struct rlist *unused; + rlist_foreach(unused, read_views) + ++count; + size_t size = sizeof(struct vy_write_iterator) + + count * sizeof(struct vy_read_view_stmt); struct vy_write_iterator *stream = - (struct vy_write_iterator *) malloc(sizeof(*stream)); + (struct vy_write_iterator *) calloc(1, size); if (stream == NULL) { - diag_set(OutOfMemory, sizeof(*stream), "malloc", "write stream"); + diag_set(OutOfMemory, size, "malloc", "write stream"); return NULL; } + stream->stmt_i = -1; + stream->rv_count = count; + stream->read_views[0].vlsn = INT64_MAX; + count--; + struct vy_read_view *rv; + /* Descending order. */ + rlist_foreach_entry(rv, read_views, in_read_views) + stream->read_views[count--].vlsn = rv->vlsn; + assert(count == 0); + stream->base.iface = &vy_slice_stream_iface; src_heap_create(&stream->src_heap); rlist_create(&stream->src_list); @@ -251,38 +364,10 @@ vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format stream->upsert_format = upsert_format; tuple_format_ref(stream->upsert_format, 1); stream->is_primary = is_primary; - stream->oldest_vlsn = oldest_vlsn; stream->is_last_level = is_last_level; - stream->tuple = NULL; - stream->is_tuple_refable = false; return &stream->base; } -/** - * Set stream->tuple as a tuple to be output as a result of .._next call. - * Ref the new tuple if necessary, unref older value if needed. - * @param stream - the write iterator. - * @param tuple - the tuple to be saved. - * @param is_tuple_refable - is the tuple must of must not be referenced. - */ -static void -vy_write_iterator_set_tuple(struct vy_write_iterator *stream, - struct tuple *tuple, bool is_tuple_refable) -{ - if (stream->tuple != NULL && tuple != NULL) - assert(tuple_compare(stream->tuple, tuple, stream->key_def) < 0 || - vy_stmt_lsn(stream->tuple) >= vy_stmt_lsn(tuple)); - - if (stream->tuple != NULL && stream->is_tuple_refable) - tuple_unref(stream->tuple); - - stream->tuple = tuple; - stream->is_tuple_refable = is_tuple_refable; - - if (stream->tuple != NULL && stream->is_tuple_refable) - tuple_ref(stream->tuple); -} - /** * Start the search. Must be called after *add* methods and * before *next* method. 
@@ -309,7 +394,8 @@ vy_write_iterator_stop(struct vy_stmt_stream *vstream) { assert(vstream->iface->stop == vy_write_iterator_stop); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - vy_write_iterator_set_tuple(stream, NULL, false); + for (int i = 0; i < stream->rv_count; ++i) + vy_read_view_stmt_destroy(&stream->read_views[i]); struct vy_write_src *src, *tmp; rlist_foreach_entry_safe(src, &stream->src_list, in_src_list, tmp) vy_write_iterator_delete_src(stream, src); @@ -337,7 +423,7 @@ NODISCARD int vy_write_iterator_add_mem(struct vy_stmt_stream *vstream, struct vy_mem *mem) { struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - struct vy_write_src *src = vy_write_iterator_new_src(stream, false); + struct vy_write_src *src = vy_write_iterator_new_src(stream); if (src == NULL) return -1; vy_mem_stream_open(&src->mem_stream, mem); @@ -353,7 +439,7 @@ vy_write_iterator_add_slice(struct vy_stmt_stream *vstream, struct vy_slice *slice, struct vy_run_env *run_env) { struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; - struct vy_write_src *src = vy_write_iterator_new_src(stream, true); + struct vy_write_src *src = vy_write_iterator_new_src(stream); if (src == NULL) return -1; vy_slice_stream_open(&src->slice_stream, slice, stream->key_def, @@ -384,13 +470,115 @@ vy_write_iterator_merge_step(struct vy_write_iterator *stream) } /** - * Squash in the single statement all rest statements of current key - * starting from the current statement. + * Try to get VLSN of the read view with the specified number in + * the vy_write_iterator.read_views array. + * If the requested read view is older than all existing ones, + * return 0, as the oldest possible VLSN. + * + * @param stream Write iterator. + * @param current_rv_i Index of the read view. + * + * @retval VLSN. + */ +static inline int64_t +vy_write_iterator_get_vlsn(struct vy_write_iterator *stream, int rv_i) +{ + if (rv_i >= stream->rv_count) + return 0; + return stream->read_views[rv_i].vlsn; +} + +/** + * Remember the current tuple of the @src as a part of the + * current read view. + * @param History objects allocator. + * @param stream Write iterator. + * @param src Source of the wanted tuple. + * @param current_rv_i Index of the current read view. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static inline int +vy_write_iterator_push_rv(struct region *region, + struct vy_write_iterator *stream, + struct vy_write_src *src, int current_rv_i) +{ + assert(current_rv_i < stream->rv_count); + struct vy_read_view_stmt *rv = &stream->read_views[current_rv_i]; + assert(rv->vlsn >= vy_stmt_lsn(src->tuple)); + struct vy_write_history *h = + vy_write_history_new(region, src->tuple, rv->history); + if (h == NULL) + return -1; + rv->history = h; + return 0; +} + +/** + * Return the next statement from the current key read view + * statements sequence. Unref the previous statement, if needed. + * We can't unref the statement right before returning it to the + * caller, because reference in the read_views array can be + * single reference of this statement, and unref could delete it + * before returning. + * + * @param stream Write iterator. + * @retval not NULL Next statement of the current key. + * @retval NULL End of the key (not the end of the sources). + */ +static inline struct tuple * +vy_write_iterator_pop_read_view_stmt(struct vy_write_iterator *stream) +{ + struct vy_read_view_stmt *rv; + if (stream->stmt_i >= 0) { + /* Destroy the current before getting the next. 
*/ + rv = &stream->read_views[stream->stmt_i]; + assert(rv->history == NULL); + vy_read_view_stmt_destroy(rv); + } + if (stream->rv_used_count == 0) + return NULL; + /* Find a next not empty history element. */ + do { + assert(stream->stmt_i + 1 < stream->rv_count); + stream->stmt_i++; + rv = &stream->read_views[stream->stmt_i]; + assert(rv->history == NULL); + } while (rv->tuple == NULL); + assert(stream->rv_used_count > 0); + stream->rv_used_count--; + return rv->tuple; +} + +/** + * Build the history of the current key. During the history + * building already some optimizations can be applied - + * @sa optimizations 1, 2 and 3 in vy_write_iterator.h. + * During building of a key history, some statements can be + * skipped, but no one can be merged. + * UPSERTs merge is executed on a special 'merge' phase. + * + * @param region History objects allocator. + * @param stream Write iterator. + * @param[out] count Count of statements, saved in a history. + * + * @retval 0 Success. + * @retval -1 Memory error. */ static NODISCARD int -vy_write_iterator_next_key(struct vy_write_iterator *stream) +vy_write_iterator_build_history(struct region *region, + struct vy_write_iterator *stream, int *count) { - assert(stream->tuple != NULL); + *count = 0; + assert(stream->stmt_i == -1); + struct heap_node *node = src_heap_top(&stream->src_heap); + if (node == NULL) + return 0; /* no more data */ + struct vy_write_src *src = + container_of(node, struct vy_write_src, heap_node); + /* Search must be started in the task. */ + assert(src->tuple != NULL); /* * A virtual source instance that represents the end on current key in * source heap. Due to a special branch in heap's comparator the @@ -401,56 +589,253 @@ vy_write_iterator_next_key(struct vy_write_iterator *stream) */ struct vy_write_src end_of_key_src; end_of_key_src.is_end_of_key = true; - end_of_key_src.tuple = stream->tuple; + end_of_key_src.tuple = src->tuple; int rc = src_heap_insert(&stream->src_heap, &end_of_key_src.heap_node); if (rc) { diag_set(OutOfMemory, sizeof(void *), "malloc", "write stream heap"); return rc; } + if (vy_stmt_is_refable(src->tuple)) + tuple_ref(src->tuple); + /* + * For each pair (merge_until_lsn, current_rv_lsn] build + * history of a corresponding read view. + * current_rv_i - index of the current read view. + */ + int current_rv_i = 0; + int64_t current_rv_lsn = vy_write_iterator_get_vlsn(stream, 0); + int64_t merge_until_lsn = vy_write_iterator_get_vlsn(stream, 1); + uint64_t key_mask = stream->key_def->column_mask; while (true) { - struct heap_node *node = src_heap_top(&stream->src_heap); - assert(node != NULL); - struct vy_write_src *src = - container_of(node, struct vy_write_src, heap_node); - assert(src->tuple != NULL); /* Is search started? */ - - if (vy_stmt_type(stream->tuple) == IPROTO_UPSERT && - (!src->is_end_of_key || stream->is_last_level)) { - const struct tuple *apply_to = - src->is_end_of_key ? NULL : src->tuple; - struct tuple *applied = - vy_apply_upsert(stream->tuple, apply_to, - stream->key_def, stream->format, - stream->upsert_format, false); - if (applied == NULL) { - rc = -1; + /* + * Skip statements, unvisible by the current read + * view and unused by the previous read view. + */ + if (vy_stmt_lsn(src->tuple) > current_rv_lsn) + goto next_step; + /* + * The iterator reached the border of two read + * views. + */ + while (vy_stmt_lsn(src->tuple) <= merge_until_lsn) { + /* + * Skip read views, which have the same + * versions of the key. 
+ * The src->tuple must be between + * merge_until_lsn and current_rv_lsn. + */ + current_rv_i++; + current_rv_lsn = merge_until_lsn; + int n = current_rv_i + 1; + merge_until_lsn = + vy_write_iterator_get_vlsn(stream, n); + } + + /* + * Optimization 1: skip last level delete. + * @sa vy_write_iterator for details about this + * and other optimizations. + */ + if (vy_stmt_type(src->tuple) == IPROTO_DELETE && + stream->is_last_level && merge_until_lsn == 0) { + current_rv_lsn = 0; + goto next_step; + } + + /* + * Optimization 2: skip after REPLACE/DELETE. + */ + if (vy_stmt_type(src->tuple) == IPROTO_REPLACE || + vy_stmt_type(src->tuple) == IPROTO_DELETE) { + uint64_t stmt_mask = vy_stmt_column_mask(src->tuple); + /* + * Optimization 3: skip statements, which + * do not update the secondary key. + */ + if (!stream->is_primary && + key_update_can_be_skipped(key_mask, stmt_mask)) + goto next_step; + + rc = vy_write_iterator_push_rv(region, stream, src, + current_rv_i); + if (rc != 0) break; - } - vy_write_iterator_set_tuple(stream, applied, true); - /* refresh tuple in virtual source */ - end_of_key_src.tuple = stream->tuple; + ++*count; + current_rv_i++; + current_rv_lsn = merge_until_lsn; + merge_until_lsn = + vy_write_iterator_get_vlsn(stream, + current_rv_i + 1); + goto next_step; } - if (src->is_end_of_key) + assert(vy_stmt_type(src->tuple) == IPROTO_UPSERT); + rc = vy_write_iterator_push_rv(region, stream, src, + current_rv_i); + if (rc != 0) break; - + ++*count; +next_step: rc = vy_write_iterator_merge_step(stream); if (rc != 0) break; + node = src_heap_top(&stream->src_heap); + assert(node != NULL); + src = container_of(node, struct vy_write_src, heap_node); + assert(src->tuple != NULL); + if (src->is_end_of_key) + break; } src_heap_delete(&stream->src_heap, &end_of_key_src.heap_node); + if (vy_stmt_is_refable(end_of_key_src.tuple)) + tuple_unref(end_of_key_src.tuple); return rc; } +/** + * Apply accumulated UPSERTs in the read view with a hint from + * a previous read view. After merge, the read view must contain + * single statement. + * + * @param stream Write iterator. + * @param previous_version Hint from a previous read view. + * @param rv Read view to merge. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static NODISCARD int +vy_read_view_merge(struct vy_write_iterator *stream, + struct tuple *previous_version, struct vy_read_view_stmt *rv) +{ + assert(rv != NULL); + assert(rv->tuple == NULL); + assert(rv->history != NULL); + struct vy_write_history *h = rv->history; + /* + * Two possible hints to remove the current UPSERT. + * 1. If the previous read view contains DELETE or + * REPLACE, then the current UPSERT can be applied to + * it, whether is_last_level true or not. + * 2. If the stream is working on the last level. Then we + * are sure the UPSERT to be oldest version of a key + * and it can be turned into REPLACE. + */ + if (vy_stmt_type(h->tuple) == IPROTO_UPSERT && + (stream->is_last_level || (previous_version != NULL && + vy_stmt_type(previous_version) != IPROTO_UPSERT))) { + assert(!stream->is_last_level || previous_version == NULL || + vy_stmt_type(previous_version) != IPROTO_UPSERT); + struct tuple *applied = + vy_apply_upsert(h->tuple, previous_version, + stream->key_def, stream->format, + stream->upsert_format, false); + if (applied == NULL) + return -1; + if (vy_stmt_is_refable(h->tuple)) + tuple_unref(h->tuple); + h->tuple = applied; + } + /* Squash rest of UPSERTs. 
*/ + struct vy_write_history *result = h; + h = h->next; + while (h != NULL) { + assert(h->tuple != NULL && + vy_stmt_type(h->tuple) == IPROTO_UPSERT); + assert(result->tuple != NULL); + struct tuple *applied = + vy_apply_upsert(h->tuple, result->tuple, + stream->key_def, stream->format, + stream->upsert_format, false); + if (applied == NULL) + return -1; + if (vy_stmt_is_refable(result->tuple)) + tuple_unref(result->tuple); + result->tuple = applied; + /* + * Before: + * result -> h -> next + * + * Will be truncated by region. + * After: / + * result -. h .-> next + * \_ _ _ _ / + */ + if (vy_stmt_is_refable(h->tuple)) + tuple_unref(h->tuple); + h = h->next; + result->next = h; + } + rv->tuple = result->tuple; + rv->history = NULL; + result->tuple = NULL; + assert(result->next == NULL); + return 0; +} + +/** + * Split the current key into the sequence of the read view + * statements. @sa struct vy_write_iterator comment for details + * about algorithm and optimizations. + * + * @param stream Write iterator. + * @param[out] count Length of the result key versions sequence. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +static NODISCARD int +vy_write_iterator_build_read_views(struct vy_write_iterator *stream, int *count) +{ + *count = 0; + int raw_count; + struct region *region = &fiber()->gc; + size_t used = region_used(region); + stream->rv_used_count = 0; + if (vy_write_iterator_build_history(region, stream, &raw_count) != 0) + goto error; + if (raw_count == 0) { + /* A key is fully optimized. */ + region_truncate(region, used); + return 0; + } + /* Find a first not empty read view. */ + struct vy_read_view_stmt *rv = + &stream->read_views[stream->rv_count - 1]; + while (rv > &stream->read_views[0] && rv->history == NULL) + --rv; + /* + * At least one statement has been found, since raw_count + * here > 0. + */ + assert(rv >= &stream->read_views[0] && rv->history != NULL); + struct tuple *previous_version = NULL; + for (; rv >= &stream->read_views[0]; --rv) { + if (rv->history == NULL) + continue; + if (vy_read_view_merge(stream, previous_version, rv) != 0) + goto error; + stream->rv_used_count++; + ++*count; + previous_version = rv->tuple; + assert(rv->history == NULL); + } + region_truncate(region, used); + return 0; +error: + region_truncate(region, used); + return -1; +} + /** * Get the next statement to write. * The user of the write iterator simply expects a stream * of statements to write to the output. * The tuple *ret is guaranteed to be valid until next tuple is - * returned (thus last non-null tuple is valid after EOF). + * returned (thus last non-null tuple is valid after EOF). * * @return 0 on success or not 0 on error (diag is set). */ @@ -461,55 +846,34 @@ vy_write_iterator_next(struct vy_stmt_stream *vstream, assert(vstream->iface->next == vy_write_iterator_next); struct vy_write_iterator *stream = (struct vy_write_iterator *)vstream; /* - * Nullify the result stmt. If the next stmt is not - * found, this would be a marker of the end of the stream. + * Try to get the next statement from the current key + * read view statements sequence. */ - *ret = NULL; + *ret = vy_write_iterator_pop_read_view_stmt(stream); + if (*ret != NULL) + return 0; - while (true) { - struct heap_node *node = src_heap_top(&stream->src_heap); - if (node == NULL) - return 0; /* no more data */ - struct vy_write_src *src = - container_of(node, struct vy_write_src, heap_node); - assert(src->tuple != NULL); /* Is search started? 
*/ - vy_write_iterator_set_tuple(stream, src->tuple, - src->is_tuple_refable); - - int rc = vy_write_iterator_merge_step(stream); - if (rc != 0) - return -1; - - if (vy_stmt_lsn(stream->tuple) > stream->oldest_vlsn) - break; /* Save the current stmt as the result. */ - - if (vy_stmt_type(stream->tuple) == IPROTO_REPLACE || - vy_stmt_type(stream->tuple) == IPROTO_DELETE) { - /* - * If the tuple has extra size - it has - * column mask of an update operation. - * The tuples from secondary indexes - * which don't modify its keys can be - * skipped during dump, - * @sa vy_can_skip_update(). - */ - if (!stream->is_primary && - key_update_can_be_skipped(stream->key_def->column_mask, - vy_stmt_column_mask(stream->tuple))) - continue; - } + /* Build the next key sequence. */ + stream->stmt_i = -1; + int count = 0; + while (true) { /* Squash upserts and/or go to the next key */ - rc = vy_write_iterator_next_key(stream); - if (rc != 0) + if (vy_write_iterator_build_read_views(stream, &count) != 0) return -1; - - if (vy_stmt_type(stream->tuple) == IPROTO_DELETE && - stream->is_last_level) - continue; /* Skip unnecessary DELETE */ - break; + /* + * Next_key() routine could skip the next key, for + * example, if it was truncated by last level + * DELETE or it consisted only from optimized + * updates. Then try get the next key. + */ + if (count != 0 || stream->src_heap.size == 0) + break; } - *ret = stream->tuple; + /* + * Again try to get the statement, after calling next_key. + */ + *ret = vy_write_iterator_pop_read_view_stmt(stream); return 0; } diff --git a/src/box/vy_write_iterator.h b/src/box/vy_write_iterator.h index 48a9013ad565b40652c8c821d00fcc696f5922e9..7d5d347fae50de58320b9869f0a9eb8659be7269 100644 --- a/src/box/vy_write_iterator.h +++ b/src/box/vy_write_iterator.h @@ -40,104 +40,104 @@ * or over a series of sorted runs on disk to create a new sorted * run (compaction). * - * Use merge iterator to order the output and filter out - * too old statements (older than the oldest active read view). - * - * Squash multiple UPSERT statements over the same key into one, - * if possible. - * * Background * ---------- - * Vinyl provides support for consistent read views. The oldest - * active read view is maintained in the transaction manager. - * To support it, when dumping or compacting statements on disk, - * older versions need to be preserved, and versions outside - * any active read view garbage collected. This task is handled - * by the write iterator. - * - * Filtering - * --------- - * Let's call each transaction consistent read view LSN vlsn. - * - * oldest_vlsn = MIN(vlsn) over all active transactions - * - * Thus to preserve relevant data for every key and purge old - * versions, the iterator works as follows: - * - * If statement lsn is greater than oldest vlsn, the - * statement is preserved. - * - * Otherwise, if statement type is REPLACE/DELETE, then - * it's returned, and the iterator can proceed to the - * next key: the readers do not need the history. - * - * Otherwise, the statement is UPSERT, and in order - * to restore the original tuple from UPSERT the reader - * does need the history: they need to look for an older - * statement to which the UPSERT can be applied to get - * a tuple. This older statement can be UPSERT as well, - * and so on. - * In other words, of statement type is UPSERT, the reader - * needs a range of statements from the youngest statement - * with lsn <= vlsn to the youngest non-UPSERT statement - * with lsn <= vlsn, borders included. 
- * - * All other versions of this key can be skipped, and hence - * garbage collected. - * - * Squashing and garbage collection - * -------------------------------- - * Filtering and garbage collection, performed by write iterator, - * must have no effect on read views of active transactions: - * they should read the same data as before. - * - * On the other hand, old version should be deleted as soon as possible; - * multiple UPSERTs could be merged together to take up less - * space, or substituted with REPLACE. - * - * Here's how it's done: - * - * - * 1) Every statement with lsn greater than oldest vlsn is preserved - * in the output, since there could be an active transaction - * that needs it. - * - * 2) For all statements with lsn <= oldest_vlsn, only a single - * resultant statement is returned. Here's how. - * - * 2.1) If the youngest statement with lsn <= oldest _vlsn is a - * REPLACE/DELETE, it becomes the resultant statement. - * - * 2.2) Otherwise, it as an UPSERT. Then we must iterate over - * all older LSNs for this key until we find a REPLACE/DELETE - * or exhaust all input streams for this key. - * - * If the older lsn is a yet another UPSERT, two upserts are - * squashed together into one. Otherwise we found an - * REPLACE/DELETE, so apply all preceding UPSERTs to it and - * get the resultant statement. - * - * There is an extra twist to this algorithm, used when performing - * compaction of the last LSM level (i.e. merging all existing - * runs into one). The last level does not need to store DELETEs. - * Thus we can: - * 1) Completely skip the resultant statement from output if it's - * a DELETE. - * | ... | | ... | - * | | | | ^ - * +- oldest vlsn -+ = +- oldest lsn -+ ^ lsn - * | | | | ^ - * | DELETE | +--------------+ - * | ... | - * 2) Replace an accumulated resultant UPSERT with an appropriate - * REPLACE. - * | ... | | ... | - * | UPSERT | | REPLACE | - * | | | | ^ - * +- oldest vlsn -+ = +- oldest lsn -+ ^ lsn - * | | | | ^ - * | DELETE | +--------------+ - * | ... | + * With no loss of generality, lets consider the write_iterator to + * have a single statements source (struct vy_write_src). Each key + * consists of an LSNs sequence. A range of control points also + * exists, named read views, each of which is characterized by + * their VLSN. By default, for the biggest VLSN INT64_MAX is used, + * and for the smallest one 0 is used: + * + * [0, vlsn1, vlsn2, vlsn3, ... INT64_MAX]. + * + * The purpose of the write_iterator is to split LSNs sequence of + * one key into subsequences, bordered with VLSNs, and then merge + * each subsequence into a one statement. + * -------- + * ONE KEY: + * -------- + * 0 VLSN1 VLSN2 ... INT64_MAX + * | | | | + * | LSN1 ... LSN(i)|LSN(i+1) ... LSN(j)|LSN(j+1) ... LSN(N)| + * \_______________/\__________________/\___________________/ + * merge merge merge + * + * A range of optimizations is possible, which allow decrease + * count of source statements and count of merged subsequences. + * + * --------------------------------------------------------------- + * Optimization 1: skip DELETE from the last level of the oldest + * read view. + * --------------------------- + * ONE KEY, LAST LEVEL SOURCE: + * --------------------------- + * LSN1 LSN2 ... DELETE LSNi LSNi+1 ... 
LSN_N + * \___________________________/\___________________________/ + * skip merge + * + * Details: if the source is the oldest source of all, then it is + * not necessary to write any DELETE to the disk, including all + * LSNs of the same key, which are older than this DELETE. However + * such a skip is possible only if there is no read views to the + * same key, older than this DELETE, because read views can't be + * skipped. + * + * --------------------------------------------------------------- + * Optimization 2: on REPLACE/DELETE skip rest of statements, + * until the next read view. + * -------- + * ONE KEY: + * -------- + * ... LSN(k) ... LSN(i-1) LSN(i) REPLACE/DELETE + * Read + * views: * * + * _____________/\_____________________________/\__________/ + * merge skip return + * + * Is is possible, since the REPLACE and DELETE discard all older + * key versions. + * + * --------------------------------------------------------------- + * Optimization 3: skip statements, which do not update the + * secondary key. + * + * Masks intersection: not 0 0 0 not 0 not 0 + * KEY: LSN1 DELETE REPLACE LSN5 ... REPLACE + * \____/\_______________/\__________________/ + * merge skip merge + * + * Details: if UPDATE is executed, it is transformed into + * DELETE + REPLACE or single REPLACE. But in the secondary index + * only keys are stored and if such UPDATE didn't change this key, + * then there is no necessity to write this UPDATE. Actually, + * existance of such UPDATEs is simply ignored. + * + * --------------------------------------------------------------- + * Optimization 4: use older REPLACE/DELETE as a hints to apply + * newer UPSERTs. + * + * After building history of a key, UPSERT sequences can be + * accumulated in some read views. If the older read views have + * REPLACE/DELETE, they can be used to turn newer UPSERTs into + * REPLACEs. + * -------- + * ONE KEY: + * -------- + * LSN1 LSN2 LSN3 LSN4 + * REPLACE UPSERT UPSERT UPSERT + * Read + * views: * * * * + * ^ \________________________/ + * +- - - - - - - -< apply + * Result: + * LSN1 LSN2 LSN3 LSN4 + * REPLACE REPLACE REPLACE REPLACE + * Read + * views: * * * * + * + * See implementation details in + * vy_write_iterator_build_read_views. */ struct vy_write_iterator; @@ -156,13 +156,13 @@ struct vy_run_env; * @param upsert_format - same as format, but for UPSERT tuples. * @param is_primary - set if this iterator is for a primary index. * @param is_last_level - there is no older level than the one we're writing to. - * @param oldest_vlsn - the minimal VLSN among all active transactions. + * @param read_views - Opened read views. * @return the iterator or NULL on error (diag is set). */ struct vy_stmt_stream * vy_write_iterator_new(const struct key_def *key_def, struct tuple_format *format, struct tuple_format *upsert_format, bool is_primary, - bool is_last_level, int64_t oldest_vlsn); + bool is_last_level, struct rlist *read_views); /** * Add a mem as a source of iterator. 
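
For reference, here is a condensed caller-side sketch of the reworked
interface. It is not part of the patch; it only mirrors what the new
unit test below and vy_send_range() already do, and it assumes that
key_def, format, upsert_format and mem come from the index being
dumped (error cleanup is omitted for brevity):

	/*
	 * Read views are passed as an rlist of vy_read_view
	 * objects, vlsns added in ascending order.
	 */
	struct vy_read_view rvs[2];
	struct rlist read_views;
	rlist_create(&read_views);
	rvs[0].vlsn = 7;
	rvs[1].vlsn = 12;
	rlist_add_tail_entry(&read_views, &rvs[0], in_read_views);
	rlist_add_tail_entry(&read_views, &rvs[1], in_read_views);

	struct vy_stmt_stream *wi =
		vy_write_iterator_new(key_def, format, upsert_format,
				      /*is_primary=*/true,
				      /*is_last_level=*/false, &read_views);
	if (wi == NULL || vy_write_iterator_add_mem(wi, mem) != 0 ||
	    wi->iface->start(wi) != 0)
		return -1;
	struct tuple *stmt;
	do {
		if (wi->iface->next(wi, &stmt) != 0)
			return -1;
		/*
		 * NULL marks the end of the stream; each non-NULL
		 * statement is the merged version of the current
		 * key visible from one of the read views.
		 */
	} while (stmt != NULL);
	wi->iface->close(wi);
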
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 3bfe94b9b83d90995bce4d3ea259bb73684f83a4..d8b2b03705c7bcc54b6e621d9224a8df72501df8 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -135,3 +135,6 @@ target_link_libraries(vy_mem.test box unit) add_executable(column_mask.test column_mask.c) target_link_libraries(column_mask.test box unit) + +add_executable(vy_write_iterator.test vy_write_iterator.c vy_iterators_helper.c) +target_link_libraries(vy_write_iterator.test box unit) diff --git a/test/unit/vy_iterators_helper.c b/test/unit/vy_iterators_helper.c new file mode 100644 index 0000000000000000000000000000000000000000..ef1e6d8e83bd60402985110fd2d14ceae1c0d487 --- /dev/null +++ b/test/unit/vy_iterators_helper.c @@ -0,0 +1,176 @@ +#include "vy_iterators_helper.h" + +struct tuple * +vy_new_simple_stmt(struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask, + const struct vy_stmt_template *templ) +{ + /* Calculate binary size. */ + int i = 0; + size_t size = 0; + while (templ->fields[i] != vyend) { + fail_if(i > MAX_FIELDS_COUNT); + if (templ->fields[i] >= 0) + size += mp_sizeof_uint(templ->fields[i]); + else + size += mp_sizeof_int(templ->fields[i]); + ++i; + } + size += mp_sizeof_array(i); + fail_if(templ->optimize_update && templ->type == IPROTO_UPSERT); + if (templ->optimize_update) + format = format_with_colmask; + + /* Encode the statement. */ + char *buf = (char *) malloc(size); + fail_if(buf == NULL); + char *pos = mp_encode_array(buf, i); + i = 0; + struct tuple *ret = NULL; + while (templ->fields[i] != vyend) { + if (templ->fields[i] >= 0) + pos = mp_encode_uint(pos, templ->fields[i]); + else + pos = mp_encode_int(pos, templ->fields[i]); + ++i; + } + + /* + * Create the result statement, using one of the formats. + */ + if (templ->type == IPROTO_REPLACE || templ->type == IPROTO_DELETE) { + ret = vy_stmt_new_replace(format, buf, pos); + fail_if(ret == NULL); + if (templ->type == IPROTO_REPLACE) + goto end; + + struct tuple *tmp = vy_stmt_new_surrogate_delete(format, ret); + fail_if(tmp == NULL); + tuple_unref(ret); + ret = tmp; + goto end; + } + if (templ->type == IPROTO_UPSERT) { + /* + * Create the upsert statement without operations. + * Validation of result of UPSERT operations + * applying is not a test for the iterators. + * For the iterators only UPSERT type is + * important. 
+ */ + struct iovec operations[1]; + char tmp[16]; + char *ops = mp_encode_array(tmp, 1); + ops = mp_encode_array(ops, 0); + operations[0].iov_base = tmp; + operations[0].iov_len = ops - tmp; + fail_if(templ->optimize_update); + ret = vy_stmt_new_upsert(upsert_format, buf, pos, + operations, 1); + fail_if(ret == NULL); + goto end; + } + fail_if(true); +end: + free(buf); + vy_stmt_set_lsn(ret, templ->lsn); + if (templ->optimize_update) + vy_stmt_set_column_mask(ret, 0); + return ret; +} + +void +vy_mem_insert_template(struct vy_mem *mem, const struct vy_stmt_template *templ) +{ + struct tuple *stmt; + if (templ->type == IPROTO_UPSERT) { + stmt = vy_new_simple_stmt(mem->format, mem->upsert_format, + mem->format_with_colmask, templ); + } else { + stmt = vy_new_simple_stmt(mem->format, mem->upsert_format, + mem->format_with_colmask, templ); + } + struct tuple *region_stmt = vy_stmt_dup_lsregion(stmt, mem->allocator, + mem->generation); + assert(region_stmt != NULL); + tuple_unref(stmt); + if (templ->type == IPROTO_UPSERT) + vy_mem_insert_upsert(mem, region_stmt); + else + vy_mem_insert(mem, region_stmt); +} + +void +init_read_views_list(struct rlist *rlist, struct vy_read_view *rvs, + const int *vlsns, int count) +{ + rlist_create(rlist); + for (int i = 0; i < count; ++i) { + rvs[i].vlsn = vlsns[i]; + rlist_add_tail_entry(rlist, &rvs[i], in_read_views); + } +} + +struct vy_mem * +create_test_mem(struct lsregion *region, struct key_def *def) +{ + /* Create format */ + struct tuple_format *format = tuple_format_new(&vy_tuple_format_vtab, + &def, def->part_count, + 0); + fail_if(format == NULL); + + /* Create format with column mask */ + struct tuple_format *format_with_colmask = + vy_tuple_format_new_with_colmask(format); + assert(format_with_colmask != NULL); + + /* Create upsert format */ + struct tuple_format *format_upsert = + vy_tuple_format_new_upsert(format); + assert(format_upsert != NULL); + + /* Create mem */ + struct vy_mem *mem = vy_mem_new(region, 1, def, format, + format_with_colmask, format_upsert, 0); + fail_if(mem == NULL); + return mem; +} + +bool +vy_stmt_are_same(const struct tuple *actual, + const struct vy_stmt_template *expected, + struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask) +{ + if (vy_stmt_type(actual) != expected->type) + return false; + struct tuple *tmp = vy_new_simple_stmt(format, upsert_format, + format_with_colmask, expected); + fail_if(tmp == NULL); + uint32_t a_len, b_len; + const char *a, *b; + if (vy_stmt_type(actual) == IPROTO_UPSERT) { + a = vy_upsert_data_range(actual, &a_len); + } else { + a = tuple_data_range(actual, &a_len); + } + if (vy_stmt_type(tmp) == IPROTO_UPSERT) { + b = vy_upsert_data_range(tmp, &b_len); + } else { + b = tuple_data_range(tmp, &b_len); + } + if (a_len != b_len) { + tuple_unref(tmp); + return false; + } + if (vy_stmt_lsn(actual) != expected->lsn) { + tuple_unref(tmp); + return false; + } + bool rc = memcmp(a, b, a_len) == 0; + tuple_unref(tmp); + return rc; +} diff --git a/test/unit/vy_iterators_helper.h b/test/unit/vy_iterators_helper.h new file mode 100644 index 0000000000000000000000000000000000000000..bba909327cacceefd81f6b2b84959a89827aa8ab --- /dev/null +++ b/test/unit/vy_iterators_helper.h @@ -0,0 +1,139 @@ +#ifndef INCLUDES_TARANTOOL_TEST_VY_ITERATORS_HELPER_H +#define INCLUDES_TARANTOOL_TEST_VY_ITERATORS_HELPER_H +/* + * Copyright 2010-2017, Tarantool AUTHORS, please see AUTHORS file. 
+ * + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY AUTHORS ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * AUTHORS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include <sys/uio.h> +#include "unit.h" +#include "vy_stmt.h" +#include "small/rlist.h" +#include "small/lsregion.h" +#include "vy_mem.h" +#include "vy_stmt_iterator.h" + +#define vyend 99999999 +#define MAX_FIELDS_COUNT 100 +#define STMT_TEMPLATE(lsn, type, ...) \ +{ { __VA_ARGS__, vyend }, IPROTO_##type, lsn, false } + +#define STMT_TEMPLATE_OPTIMIZED(lsn, type, ...) \ +{ { __VA_ARGS__, vyend }, IPROTO_##type, lsn, true } + +extern struct tuple_format_vtab vy_tuple_format_vtab; + +/** Template for creation a vinyl statement. */ +struct vy_stmt_template { + /** Array of statement fields, ended with 'vyend'. */ + const int fields[MAX_FIELDS_COUNT]; + /** Statement type: REPLACE/UPSERT/DELETE. */ + enum iproto_type type; + /** Statement lsn. */ + int64_t lsn; + /* + * True, if statement must have column mask, that allows + * to skip it in the write_iterator. + */ + bool optimize_update; +}; + +/** + * Create a new vinyl statement using the specified template. + * + * @param format + * @param upsert_format Format for upsert statements. + * @param format_with_colmask Format for statements with a + * colmask. + * @param templ Statement template. + * + * @return Created statement. + */ +struct tuple * +vy_new_simple_stmt(struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask, + const struct vy_stmt_template *templ); + +/** + * Insert into the mem the statement, created by the specified + * template. + * + * @param vy_mem Mem to insert into. + * @param templ Statement template to insert. + */ +void +vy_mem_insert_template(struct vy_mem *mem, + const struct vy_stmt_template *templ); + +/** + * Create a list of read views using the specified vlsns. + * + * @param rlist[out] Result list of read views. + * @param rvs[out] Read views array. + * @param vlsns Array of read view lsns, sorted in ascending + * order. + * @param count Size of the @vlsns. + */ +void +init_read_views_list(struct rlist *rlist, struct vy_read_view *rvs, + const int *vlsns, int count); + +/** + * Create vy_mem with the specified key_def, using the @region as + * allocator. + * + * @param region Allocator for statements and bps. + * @param def Key definition. + * + * @return New vy_mem. 
+ */ +struct vy_mem * +create_test_mem(struct lsregion *region, struct key_def *def); + +/** + * Check that the template specifies completely the same statement + * as @stmt. + * + * @param stmt Actual value. + * @param templ Expected value. + * @param format Template statement format. + * @param upsert_format Template upsert statement format. + * @param format_with_colmask Template statement format with colmask. + * + * @retval stmt === template. + */ +bool +vy_stmt_are_same(const struct tuple *actual, + const struct vy_stmt_template *expected, + struct tuple_format *format, + struct tuple_format *upsert_format, + struct tuple_format *format_with_colmask); + +#endif diff --git a/test/unit/vy_write_iterator.c b/test/unit/vy_write_iterator.c new file mode 100644 index 0000000000000000000000000000000000000000..94c00872603a7de9139d020df9606fc97b40be89 --- /dev/null +++ b/test/unit/vy_write_iterator.c @@ -0,0 +1,418 @@ +#include "memory.h" +#include "fiber.h" +#include "vy_write_iterator.h" +#include <small/slab_cache.h> +#include "vy_iterators_helper.h" + +/** + * Create the mem with the specified key_def and content, iterate + * over it with write_iterator and compare actual result + * statements with the expected ones. + * + * @param key_def Key definition for the mem. + * @param content Mem content statements. + * @param content_count Size of the @content. + * @param expected Expected results of the iteration. + * @param expected_count Size of the @expected. + * @param vlsns Read view lsns for the write iterator. + * @param vlsns_count Size of the @vlsns. + * @param is_primary True, if the new mem belongs to the primary + * index. + * @param is_last_level True, if the new mem is the last level. + */ +void +compare_write_iterator_results(struct key_def *key_def, + const struct vy_stmt_template *content, + int content_count, + const struct vy_stmt_template *expected, + int expected_count, + const int *vlsns, int vlsns_count, + bool is_primary, bool is_last_level) +{ + /* Create lsregion */ + struct lsregion lsregion; + struct slab_cache *slab_cache = cord_slab_cache(); + lsregion_create(&lsregion, slab_cache->arena); + struct vy_mem *mem = create_test_mem(&lsregion, key_def); + for (int i = 0; i < content_count; ++i) + vy_mem_insert_template(mem, &content[i]); + struct rlist rv_list; + struct vy_read_view *rv_array = malloc(sizeof(*rv_array) * vlsns_count); + fail_if(rv_array == NULL); + init_read_views_list(&rv_list, rv_array, vlsns, vlsns_count); + + struct vy_stmt_stream *wi = + vy_write_iterator_new(key_def, mem->format, mem->upsert_format, + is_primary, is_last_level, &rv_list); + fail_if(wi == NULL); + fail_if(vy_write_iterator_add_mem(wi, mem) != 0); + + struct tuple *ret; + fail_if(wi->iface->start(wi) != 0); + int i = 0; + do { + fail_if(wi->iface->next(wi, &ret) != 0); + if (ret == NULL) + break; + fail_if(i >= expected_count); + ok(vy_stmt_are_same(ret, &expected[i], mem->format, + mem->upsert_format, + mem->format_with_colmask), + "stmt %d is correct", i); + ++i; + } while (ret != NULL); + ok(i == expected_count, "correct results count"); + + /* Clean up */ + wi->iface->close(wi); + vy_mem_delete(mem); + lsregion_destroy(&lsregion); + + free(rv_array); +} + +void +test_basic(void) +{ + header(); + plan(36); + + /* Create key_def */ + uint32_t fields[] = { 0 }; + uint32_t types[] = { FIELD_TYPE_UNSIGNED }; + struct key_def *key_def = box_key_def_new(fields, types, 1); + assert(key_def != NULL); + +/* + * STATEMENT: REPL REPL REPL DEL REPL REPL REPL REPL REPL REPL + * LSN: 5 6 7 8 
9 10 11 12 13 14 + * READ VIEW: * * * + * \____________/\________/\_________________/\___________/ + * merge merge merge merge + */ +{ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE(6, REPLACE, 1, 2), + STMT_TEMPLATE(7, REPLACE, 1, 3), + STMT_TEMPLATE(8, REPLACE, 1, 4), + STMT_TEMPLATE(9, REPLACE, 1, 5), + STMT_TEMPLATE(10, REPLACE, 1, 6), + STMT_TEMPLATE(11, REPLACE, 1, 7), + STMT_TEMPLATE(12, REPLACE, 1, 8), + STMT_TEMPLATE(13, REPLACE, 1, 9), + STMT_TEMPLATE(14, REPLACE, 1, 10), + }; + const struct vy_stmt_template expected[] = { + content[9], content[7], content[4], content[2] + }; + const int vlsns[] = {7, 9, 12}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: UPS UPS UPS UPS UPS UPS UPS UPS UPS UPS + * LSN: 5 6 7 8 9 10 11 12 13 14 + * READ VIEW: * * * + * \________/\_________________/\_____________/\_____/ + * squash squash squash squash + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, UPSERT, 1, 1), + STMT_TEMPLATE(6, UPSERT, 1, 2), + STMT_TEMPLATE(7, UPSERT, 1, 3), + STMT_TEMPLATE(8, UPSERT, 1, 4), + STMT_TEMPLATE(9, UPSERT, 1, 5), + STMT_TEMPLATE(10, UPSERT, 1, 6), + STMT_TEMPLATE(11, UPSERT, 1, 7), + STMT_TEMPLATE(12, UPSERT, 1, 8), + STMT_TEMPLATE(13, UPSERT, 1, 9), + STMT_TEMPLATE(14, UPSERT, 1, 10), + }; + const struct vy_stmt_template expected[] = { + content[9], + STMT_TEMPLATE(13, UPSERT, 1, 7), + STMT_TEMPLATE(10, UPSERT, 1, 3), + STMT_TEMPLATE(6, UPSERT, 1, 1), + }; + const int vlsns[] = {6, 10, 13}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL DEL UPS REPL + * LSN: 5 6 7 8 + * READ VIEW: * + * \_______________/\_______/ + * \_____\_/_____/ merge + * skip last level merge + * delete + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE(6, DELETE, 1), + STMT_TEMPLATE(7, UPSERT, 1, 2), + STMT_TEMPLATE(8, REPLACE, 1, 3), + }; + const struct vy_stmt_template expected[] = { + content[3], + STMT_TEMPLATE(7, REPLACE, 1, 2) + }; + const int vlsns[] = {7}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: REPL REPL + * LSN: 7 8 + * READ VIEW: * * + * No merge. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, REPLACE, 1, 1), + STMT_TEMPLATE(8, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[1], content[0] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * LINKED WITH: gh-1824, about pruning last DELETE. + * STATEMENT: DEL REPL + * LSN: 7 8 + * READ VIEW: * * + * + * is_last_level = true. + * No merge, skip DELETE from last level, although there the read + * view on the DELETE exists. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, DELETE, 1), + STMT_TEMPLATE(8, REPLACE, 1, 1), + }; + const struct vy_stmt_template expected[] = { content[1] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * LINKED WITH: gh-1824, about pruning last DELETE. + * STATEMENT: DEL REPL + * LSN: 7 8 + * READ VIEW: * * + * + * is_last_level = false; + * No merge, don't skip DELETE from last level. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(7, DELETE, 1), + STMT_TEMPLATE(8, REPLACE, 1, 1), + }; + const struct vy_stmt_template expected[] = { content[1], content[0] }; + const int vlsns[] = {7, 8}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL DEL REPL REPL + * LSN: 5 6 6 7 + * READ VIEW: * + * \_______________/\_______/ + * \_____/\______/ + * merge skip as + * optimized + * update + * DEL and REPL with lsn 6 can be skipped for read view 6 for + * secondary index, because they do not change secondary key. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(5, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(6, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(6, REPLACE, 1, 2), + STMT_TEMPLATE(7, REPLACE, 1, 3) + }; + const struct vy_stmt_template expected[] = { content[3], content[0] }; + const int vlsns[] = {6}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, true); +} +{ +/* + * STATEMENT: DEL REPL + * LSN: 6 6 + * \______/ + * skip both as optimized update + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE_OPTIMIZED(6, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(6, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = {}; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, false); +} +{ +/* + * STATEMENT: UPS UPS UPS REPL + * LSN: 6 7 8 9 + * READ VIEW: * + * \______/\________/ + * merge merge + * UPSERT before REPLACE must be squashed with only older + * statements. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, UPSERT, 1, 1), + STMT_TEMPLATE(7, UPSERT, 1, 2), + STMT_TEMPLATE(8, UPSERT, 1, 3), + STMT_TEMPLATE(9, REPLACE, 1, 4) + }; + const struct vy_stmt_template expected[] = { + content[3], STMT_TEMPLATE(7, UPSERT, 1, 1) + }; + const int vlsns[] = {7}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} +{ +/* + * STATEMENT: REPL REPL REPL REPL + * LSN: 6 7 20 21 + * READ VIEW: * *(10) * * *(22) *(23) + * \________/\______/\_____/\______/\____________/ + * merge nullify merge merge nullify + * + * Do not remember the read views with the same versions of the + * key. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE(7, REPLACE, 1, 2), + STMT_TEMPLATE(20, REPLACE, 1, 3), + STMT_TEMPLATE(21, REPLACE, 1, 4) + }; + const struct vy_stmt_template expected[] = { + content[3], content[2], content[1] + }; + const int vlsns[] = {7, 10, 20, 21, 22, 23}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, true); +} +{ +/* + * STATEMENT: REPL DEL REPL + * LSN: 6 7 7 + * \___/\__________/ + * merge skip as optimized update + * + * last_level = false. + * Check if the key is not fully skipped in a case of optimized + * update as the newest version. 
+ */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(7, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(7, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[0] }; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, false, false); +} +{ +/* + * STATEMENT: REPL DEL REPL + * LSN: 6 7 7 + * \_________/|\___/ + * skip last level | skip as optimized + * delete. | update. + * + * last_level = true. First apply 'last level DELETE' optimization + * and only then the 'optimized UPDATE'. + */ + const struct vy_stmt_template content[] = { + STMT_TEMPLATE(6, REPLACE, 1, 1), + STMT_TEMPLATE_OPTIMIZED(7, DELETE, 1), + STMT_TEMPLATE_OPTIMIZED(7, REPLACE, 1, 2), + }; + const struct vy_stmt_template expected[] = { content[2] }; + const int vlsns[] = {}; + int content_count = sizeof(content) / sizeof(content[0]); + int expected_count = sizeof(expected) / sizeof(expected[0]); + int vlsns_count = sizeof(vlsns) / sizeof(vlsns[0]); + compare_write_iterator_results(key_def, content, content_count, + expected, expected_count, + vlsns, vlsns_count, true, false); +} + box_key_def_delete(key_def); + fiber_gc(); + footer(); + check_plan(); +} + +int +main(int argc, char *argv[]) +{ + memory_init(); + fiber_init(fiber_c_invoke); + tuple_init(); + + test_basic(); + + tuple_free(); + fiber_free(); + memory_free(); + return 0; +} diff --git a/test/unit/vy_write_iterator.result b/test/unit/vy_write_iterator.result new file mode 100644 index 0000000000000000000000000000000000000000..86029f9864fc6e4cc3fcb1f570c8c54a4bd4edfc --- /dev/null +++ b/test/unit/vy_write_iterator.result @@ -0,0 +1,39 @@ + *** test_basic *** +1..36 +ok 1 - stmt 0 is correct +ok 2 - stmt 1 is correct +ok 3 - stmt 2 is correct +ok 4 - stmt 3 is correct +ok 5 - correct results count +ok 6 - stmt 0 is correct +ok 7 - stmt 1 is correct +ok 8 - stmt 2 is correct +ok 9 - stmt 3 is correct +ok 10 - correct results count +ok 11 - stmt 0 is correct +ok 12 - stmt 1 is correct +ok 13 - correct results count +ok 14 - stmt 0 is correct +ok 15 - stmt 1 is correct +ok 16 - correct results count +ok 17 - stmt 0 is correct +ok 18 - correct results count +ok 19 - stmt 0 is correct +ok 20 - stmt 1 is correct +ok 21 - correct results count +ok 22 - stmt 0 is correct +ok 23 - stmt 1 is correct +ok 24 - correct results count +ok 25 - correct results count +ok 26 - stmt 0 is correct +ok 27 - stmt 1 is correct +ok 28 - correct results count +ok 29 - stmt 0 is correct +ok 30 - stmt 1 is correct +ok 31 - stmt 2 is correct +ok 32 - correct results count +ok 33 - stmt 0 is correct +ok 34 - correct results count +ok 35 - stmt 0 is correct +ok 36 - correct results count + *** test_basic: done ***