From 2b7efc1e9f4b055f8384f8802de709636b77efe3 Mon Sep 17 00:00:00 2001 From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org> Date: Sat, 10 Feb 2018 02:00:39 +0300 Subject: [PATCH] vinyl: introduce vy_run_writer vy_run_writer incapsulates logic of writing statements to a run. It provides API to write statements one by one. It is needed so that we can abort a run writing task before waiting for it to finish writing the file. Edited by @locker: - add region_truncate() wherever necessary - do not reallocate row index ibuf for each page - remove vy_run_write(), use vy_run_writer directly instead Needed for #3166 --- src/box/vy_run.c | 511 +++++++++++++++++------------------- src/box/vy_run.h | 91 ++++++- src/box/vy_scheduler.c | 60 ++++- test/unit/vy_point_lookup.c | 47 +++- 4 files changed, 412 insertions(+), 297 deletions(-) diff --git a/src/box/vy_run.c b/src/box/vy_run.c index 601808415e..c677dc0986 100644 --- a/src/box/vy_run.c +++ b/src/box/vy_run.c @@ -1890,9 +1890,6 @@ vy_run_dump_stmt(const struct tuple *value, struct xlog *data_xlog, struct vy_page_info *info, const struct key_def *key_def, bool is_primary) { - struct region *region = &fiber()->gc; - size_t used = region_used(region); - struct xrow_header xrow; int rc = (is_primary ? vy_stmt_encode_primary(value, key_def, 0, &xrow) : @@ -1904,8 +1901,6 @@ vy_run_dump_stmt(const struct tuple *value, struct xlog *data_xlog, if ((row_size = xlog_write_row(data_xlog, &xrow)) < 0) return -1; - region_truncate(region, used); - info->unpacked_size += row_size; info->row_count++; return 0; @@ -1967,247 +1962,6 @@ vy_run_alloc_page_info(struct vy_run *run, uint32_t *page_info_capacity) return 0; } -/** - * Write statements from the iterator to a new page in the run, - * update page and run statistics. - * - * @retval 1 all is ok, the iterator is finished - * @retval 0 all is ok, the iterator isn't finished - * @retval -1 error occurred - */ -static int -vy_run_write_page(struct vy_run *run, struct xlog *data_xlog, - struct vy_stmt_stream *wi, struct tuple **curr_stmt, - uint64_t page_size, struct bloom_spectrum *bs, - const struct key_def *cmp_def, - const struct key_def *key_def, bool is_primary, - uint32_t *page_info_capacity) -{ - assert(curr_stmt != NULL); - assert(*curr_stmt != NULL); - struct vy_page_info *page = NULL; - const char *region_key; - bool end_of_run = false; - /* Last written statement */ - struct tuple *last_stmt = *curr_stmt; - vy_stmt_ref_if_possible(last_stmt); - - /* row offsets accumulator */ - struct ibuf row_index_buf; - ibuf_create(&row_index_buf, &cord()->slabc, sizeof(uint32_t) * 4096); - - if (run->info.page_count >= *page_info_capacity && - vy_run_alloc_page_info(run, page_info_capacity) != 0) - goto error_row_index; - assert(*page_info_capacity >= run->info.page_count); - - /* See comment to run_info->max_key allocation below. */ - region_key = tuple_extract_key(*curr_stmt, cmp_def, NULL); - if (region_key == NULL) - goto error_row_index; - - if (run->info.page_count == 0) { - assert(run->info.min_key == NULL); - run->info.min_key = vy_key_dup(region_key); - if (run->info.min_key == NULL) - goto error_row_index; - } - - page = run->page_info + run->info.page_count; - if (vy_page_info_create(page, data_xlog->offset, region_key) != 0) - goto error_row_index; - xlog_tx_begin(data_xlog); - - do { - uint32_t *offset = (uint32_t *) ibuf_alloc(&row_index_buf, - sizeof(uint32_t)); - if (offset == NULL) { - diag_set(OutOfMemory, sizeof(uint32_t), - "ibuf", "row index"); - goto error_rollback; - } - *offset = page->unpacked_size; - - if (vy_run_dump_stmt(*curr_stmt, data_xlog, page, - cmp_def, is_primary) != 0) - goto error_rollback; - - if (bs != NULL) - bloom_spectrum_add(bs, tuple_hash(*curr_stmt, key_def)); - - int64_t lsn = vy_stmt_lsn(*curr_stmt); - run->info.min_lsn = MIN(run->info.min_lsn, lsn); - run->info.max_lsn = MAX(run->info.max_lsn, lsn); - - if (wi->iface->next(wi, curr_stmt)) - goto error_rollback; - - if (*curr_stmt == NULL) { - end_of_run = true; - } else { - vy_stmt_unref_if_possible(last_stmt); - last_stmt = *curr_stmt; - vy_stmt_ref_if_possible(last_stmt); - } - } while (end_of_run == false && - obuf_size(&data_xlog->obuf) < page_size); - - /* We don't write empty pages. */ - assert(last_stmt != NULL); - - if (end_of_run) { - /* - * Tuple_extract_key allocates the key on a - * region, but the max_key must be allocated on - * the heap, because the max_key can live longer - * than a fiber. To reach this, we must copy the - * key into malloced memory. - */ - region_key = tuple_extract_key(last_stmt, cmp_def, NULL); - if (region_key == NULL) - goto error_rollback; - assert(run->info.max_key == NULL); - run->info.max_key = vy_key_dup(region_key); - if (run->info.max_key == NULL) - goto error_rollback; - } - vy_stmt_unref_if_possible(last_stmt); - last_stmt = NULL; - - /* Save offset to row index */ - page->row_index_offset = page->unpacked_size; - - /* Write row index */ - struct xrow_header xrow; - const uint32_t *row_index = (const uint32_t *) row_index_buf.rpos; - assert(ibuf_used(&row_index_buf) == sizeof(uint32_t) * page->row_count); - if (vy_row_index_encode(row_index, page->row_count, &xrow) < 0) - goto error_rollback; - - ssize_t written = xlog_write_row(data_xlog, &xrow); - if (written < 0) - goto error_rollback; - - page->unpacked_size += written; - - written = xlog_tx_commit(data_xlog); - if (written == 0) - written = xlog_flush(data_xlog); - if (written < 0) - goto error_row_index; - - page->size = written; - - assert(page->row_count > 0); - - run->info.page_count++; - vy_run_acct_page(run, page); - - ibuf_destroy(&row_index_buf); - return !end_of_run ? 0: 1; - -error_rollback: - xlog_tx_rollback(data_xlog); -error_row_index: - ibuf_destroy(&row_index_buf); - if (last_stmt != NULL) - vy_stmt_unref_if_possible(last_stmt); - return -1; -} - -/** - * Write statements from the iterator to a new run file. - * - * @retval 0 success - * @retval -1 error occurred - */ -static int -vy_run_write_data(struct vy_run *run, const char *dirpath, - uint32_t space_id, uint32_t iid, - struct vy_stmt_stream *wi, uint64_t page_size, - const struct key_def *cmp_def, - const struct key_def *key_def, - size_t max_output_count, double bloom_fpr) -{ - struct tuple *stmt; - - /* Start iteration. */ - if (wi->iface->start(wi) != 0) - goto err; - if (wi->iface->next(wi, &stmt) != 0) - goto err; - - /* Do not create empty run files. */ - if (stmt == NULL) - goto done; - - struct bloom_spectrum bs; - bool has_bloom = bloom_fpr < 1; - if (has_bloom && bloom_spectrum_create(&bs, max_output_count, - bloom_fpr, runtime.quota) != 0) { - diag_set(OutOfMemory, 0, - "bloom_spectrum_create", "bloom_spectrum"); - goto err; - } - - char path[PATH_MAX]; - vy_run_snprint_path(path, sizeof(path), dirpath, - space_id, iid, run->id, VY_FILE_RUN); - - say_info("writing `%s'", path); - - struct xlog data_xlog; - struct xlog_meta meta = { - .filetype = XLOG_META_TYPE_RUN, - .instance_uuid = INSTANCE_UUID, - }; - if (xlog_create(&data_xlog, path, 0, &meta) < 0) - goto err_free_bloom; - - run->info.min_lsn = INT64_MAX; - run->info.max_lsn = -1; - - assert(run->page_info == NULL); - uint32_t page_info_capacity = 0; - int rc; - do { - rc = vy_run_write_page(run, &data_xlog, wi, &stmt, page_size, - has_bloom ? &bs : NULL, cmp_def, key_def, - iid == 0, &page_info_capacity); - if (rc < 0) - goto err_close_xlog; - fiber_gc(); - } while (rc == 0); - - /* Sync data and link the file to the final name. */ - if (xlog_sync(&data_xlog) < 0 || - xlog_rename(&data_xlog) < 0) - goto err_close_xlog; - - run->fd = data_xlog.fd; - xlog_close(&data_xlog, true); - fiber_gc(); - - if (has_bloom) { - bloom_spectrum_choose(&bs, &run->info.bloom); - run->info.has_bloom = true; - bloom_spectrum_destroy(&bs, runtime.quota); - } -done: - wi->iface->stop(wi); - return 0; - -err_close_xlog: - xlog_close(&data_xlog, false); - fiber_gc(); -err_free_bloom: - if (has_bloom) - bloom_spectrum_destroy(&bs, runtime.quota); -err: - wi->iface->stop(wi); - return -1; -} - /** {{{ vy_page_info */ /** @@ -2446,40 +2200,261 @@ vy_run_write_index(struct vy_run *run, const char *dirpath, return -1; } -/* - * Create a run file, write statements returned by a write - * iterator to it, and create an index file. - */ int -vy_run_write(struct vy_run *run, const char *dirpath, - uint32_t space_id, uint32_t iid, - struct vy_stmt_stream *wi, uint64_t page_size, - const struct key_def *cmp_def, - const struct key_def *key_def, - size_t max_output_count, double bloom_fpr) +vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run, + const char *dirpath, uint32_t space_id, uint32_t iid, + const struct key_def *cmp_def, const struct key_def *key_def, + uint64_t page_size, double bloom_fpr, size_t max_output_count) +{ + memset(writer, 0, sizeof(*writer)); + writer->run = run; + writer->dirpath = dirpath; + writer->space_id = space_id; + writer->iid = iid; + writer->cmp_def = cmp_def; + writer->key_def = key_def; + writer->page_size = page_size; + writer->has_bloom = (max_output_count > 0 && bloom_fpr < 1); + if (writer->has_bloom && + bloom_spectrum_create(&writer->bloom, max_output_count, + bloom_fpr, runtime.quota) != 0) { + diag_set(OutOfMemory, 0, + "bloom_spectrum_create", "bloom_spectrum"); + return -1; + } + xlog_clear(&writer->data_xlog); + ibuf_create(&writer->row_index_buf, &cord()->slabc, + 4096 * sizeof(uint32_t)); + run->info.min_lsn = INT64_MAX; + run->info.max_lsn = -1; + assert(run->page_info == NULL); + return 0; +} + +/** + * Create an xlog to write run. + * @param writer Run writer. + * @retval -1 Memory or IO error. + * @retval 0 Success. + */ +static int +vy_run_writer_create_xlog(struct vy_run_writer *writer) { - ERROR_INJECT(ERRINJ_VY_RUN_WRITE, - {diag_set(ClientError, ER_INJECTION, - "vinyl dump"); return -1;}); + assert(!xlog_is_open(&writer->data_xlog)); + char path[PATH_MAX]; + vy_run_snprint_path(path, sizeof(path), writer->dirpath, + writer->space_id, writer->iid, writer->run->id, + VY_FILE_RUN); + say_info("writing `%s'", path); + const struct xlog_meta meta = { + .filetype = XLOG_META_TYPE_RUN, + .instance_uuid = INSTANCE_UUID, + }; + return xlog_create(&writer->data_xlog, path, 0, &meta); +} - struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_TIMEOUT, ERRINJ_DOUBLE); - if (inj != NULL && inj->dparam > 0) - usleep(inj->dparam * 1000000); +/** + * Start a new page with a min_key stored in @a first_stmt. + * @param writer Run writer. + * @param first_stmt First statement of a page. + * + * @retval -1 Memory error. + * @retval 0 Success. + */ +static int +vy_run_writer_start_page(struct vy_run_writer *writer, + const struct tuple *first_stmt) +{ + struct vy_run *run = writer->run; + if (run->info.page_count >= writer->page_info_capacity && + vy_run_alloc_page_info(run, &writer->page_info_capacity) != 0) + return -1; + const char *key = tuple_extract_key(first_stmt, writer->cmp_def, NULL); + if (key == NULL) + return -1; + if (run->info.page_count == 0) { + assert(run->info.min_key == NULL); + run->info.min_key = vy_key_dup(key); + if (run->info.min_key == NULL) + return -1; + } + struct vy_page_info *page = run->page_info + run->info.page_count; + if (vy_page_info_create(page, writer->data_xlog.offset, key) != 0) + return -1; + xlog_tx_begin(&writer->data_xlog); + return 0; +} - if (vy_run_write_data(run, dirpath, space_id, iid, - wi, page_size, cmp_def, key_def, - max_output_count, bloom_fpr) != 0) +/** + * Write @a stmt into a current page. + * @param writer Run writer. + * @param stmt Statement to write. + * + * @retval -1 Memory or IO error. + * @retval 0 Success. + */ +static int +vy_run_writer_write_to_page(struct vy_run_writer *writer, struct tuple *stmt) +{ + if (writer->last_stmt != NULL) + vy_stmt_unref_if_possible(writer->last_stmt); + writer->last_stmt = stmt; + vy_stmt_ref_if_possible(stmt); + struct vy_run *run = writer->run; + struct vy_page_info *page = run->page_info + run->info.page_count; + uint32_t *offset = (uint32_t *)ibuf_alloc(&writer->row_index_buf, + sizeof(uint32_t)); + if (offset == NULL) { + diag_set(OutOfMemory, sizeof(uint32_t), "ibuf", "row index"); + return -1; + } + *offset = page->unpacked_size; + if (vy_run_dump_stmt(stmt, &writer->data_xlog, page, + writer->cmp_def, writer->iid == 0) != 0) return -1; + if (writer->has_bloom) { + bloom_spectrum_add(&writer->bloom, + tuple_hash(stmt, writer->key_def)); + } + int64_t lsn = vy_stmt_lsn(stmt); + run->info.min_lsn = MIN(run->info.min_lsn, lsn); + run->info.max_lsn = MAX(run->info.max_lsn, lsn); + return 0; +} - if (vy_run_is_empty(run)) - return 0; +/** + * Finish a current page. + * @param writer Run writer. + * @retval -1 Memory or IO error. + * @retval 0 Success. + */ +static int +vy_run_writer_end_page(struct vy_run_writer *writer) +{ + struct vy_run *run = writer->run; + struct vy_page_info *page = run->page_info + run->info.page_count; + + assert(page->row_count > 0); + assert(ibuf_used(&writer->row_index_buf) == + sizeof(uint32_t) * page->row_count); - if (vy_run_write_index(run, dirpath, space_id, iid) != 0) + struct xrow_header xrow; + uint32_t *row_index = (uint32_t *)writer->row_index_buf.rpos; + if (vy_row_index_encode(row_index, page->row_count, &xrow) < 0) + return -1; + ssize_t written = xlog_write_row(&writer->data_xlog, &xrow); + if (written < 0) return -1; + page->row_index_offset = page->unpacked_size; + page->unpacked_size += written; + written = xlog_tx_commit(&writer->data_xlog); + if (written == 0) + written = xlog_flush(&writer->data_xlog); + if (written < 0) + return -1; + page->size = written; + run->info.page_count++; + vy_run_acct_page(run, page); + ibuf_reset(&writer->row_index_buf); return 0; } +int +vy_run_writer_append_stmt(struct vy_run_writer *writer, struct tuple *stmt) +{ + int rc = -1; + size_t region_svp = region_used(&fiber()->gc); + if (!xlog_is_open(&writer->data_xlog) && + vy_run_writer_create_xlog(writer) != 0) + goto out; + if (ibuf_used(&writer->row_index_buf) == 0 && + vy_run_writer_start_page(writer, stmt) != 0) + goto out; + if (vy_run_writer_write_to_page(writer, stmt) != 0) + goto out; + if (obuf_size(&writer->data_xlog.obuf) >= writer->page_size && + vy_run_writer_end_page(writer) != 0) + goto out; + rc = 0; +out: + region_truncate(&fiber()->gc, region_svp); + return rc; +} + +/** + * Destroy a run writer. + * @param writer Writer to destroy. + * @param reuse_fd True in a case of success run write. And else + * false. + */ +static void +vy_run_writer_destroy(struct vy_run_writer *writer, bool reuse_fd) +{ + if (writer->last_stmt != NULL) + vy_stmt_unref_if_possible(writer->last_stmt); + if (xlog_is_open(&writer->data_xlog)) + xlog_close(&writer->data_xlog, reuse_fd); + if (writer->has_bloom) + bloom_spectrum_destroy(&writer->bloom, runtime.quota); + ibuf_destroy(&writer->row_index_buf); +} + +int +vy_run_writer_commit(struct vy_run_writer *writer) +{ + int rc = -1; + size_t region_svp = region_used(&fiber()->gc); + + if (ibuf_used(&writer->row_index_buf) != 0 && + vy_run_writer_end_page(writer) != 0) + goto out; + + struct vy_run *run = writer->run; + if (vy_run_is_empty(run)) { + vy_run_writer_destroy(writer, false); + rc = 0; + goto out; + } + + assert(writer->last_stmt != NULL); + const char *key = tuple_extract_key(writer->last_stmt, + writer->cmp_def, NULL); + if (key == NULL) + goto out; + + assert(run->info.max_key == NULL); + run->info.max_key = vy_key_dup(key); + if (run->info.max_key == NULL) + goto out; + + /* Sync data and link the file to the final name. */ + if (xlog_sync(&writer->data_xlog) < 0 || + xlog_rename(&writer->data_xlog) < 0) + goto out; + + if (writer->has_bloom) { + bloom_spectrum_choose(&writer->bloom, &run->info.bloom); + run->info.has_bloom = true; + } + if (vy_run_write_index(run, writer->dirpath, + writer->space_id, writer->iid) != 0) + goto out; + + run->fd = writer->data_xlog.fd; + vy_run_writer_destroy(writer, true); + rc = 0; +out: + region_truncate(&fiber()->gc, region_svp); + return rc; +} + +void +vy_run_writer_abort(struct vy_run_writer *writer) +{ + vy_run_writer_destroy(writer, false); +} + int vy_run_rebuild_index(struct vy_run *run, const char *dir, uint32_t space_id, uint32_t iid, diff --git a/src/box/vy_run.h b/src/box/vy_run.h index de54937c26..60a29d7343 100644 --- a/src/box/vy_run.h +++ b/src/box/vy_run.h @@ -41,6 +41,7 @@ #include "vy_read_view.h" #include "vy_stat.h" #include "index_def.h" +#include "xlog.h" #include "small/mempool.h" #include "salad/bloom.h" @@ -418,14 +419,6 @@ int vy_run_remove_files(const char *dir, uint32_t space_id, uint32_t iid, int64_t run_id); -int -vy_run_write(struct vy_run *run, const char *dirpath, - uint32_t space_id, uint32_t iid, - struct vy_stmt_stream *wi, uint64_t page_size, - const struct key_def *cmp_def, - const struct key_def *key_def, - size_t max_output_count, double bloom_fpr); - /** * Allocate a new run slice. * This function increments @run->refs. @@ -575,6 +568,88 @@ vy_slice_stream_open(struct vy_slice_stream *stream, struct vy_slice *slice, const struct key_def *cmp_def, struct tuple_format *format, struct tuple_format *upsert_format, bool is_primary); +/** + * Run_writer fills a created run with statements one by one, + * splitting them into pages. + */ +struct vy_run_writer { + /** Run to fill. */ + struct vy_run *run; + /** Path to directory with run files. */ + const char *dirpath; + /** Identifier of a space owning the run. */ + uint32_t space_id; + /** Identifier of an index owning the run. */ + uint32_t iid; + /** + * Key definition to extract from tuple and store as page + * min key, run min/max keys, and secondary index + * statements. + */ + const struct key_def *cmp_def; + /** Key definition to calculate bloom. */ + const struct key_def *key_def; + /** + * Minimal page size. When a page becames bigger, it is + * dumped. + */ + uint64_t page_size; + /** + * Current page info capacity. Can grow with page number. + */ + uint32_t page_info_capacity; + /** Xlog to write data. */ + struct xlog data_xlog; + /** Set iff bloom filter is available. */ + bool has_bloom; + /** Bloom filter. */ + struct bloom_spectrum bloom; + /** Buffer of a current page row offsets. */ + struct ibuf row_index_buf; + /** + * Remember a last written statement to use it as a source + * of max key of a finished run. + */ + struct tuple *last_stmt; +}; + +/** Create a run writer to fill a run with statements. */ +int +vy_run_writer_create(struct vy_run_writer *writer, struct vy_run *run, + const char *dirpath, uint32_t space_id, uint32_t iid, + const struct key_def *cmp_def, const struct key_def *key_def, + uint64_t page_size, double bloom_fpr, size_t max_output_count); + +/** + * Write a specified statement into a run. + * @param writer Writer to write a statement. + * @param stmt Statement to write. + * + * @retval -1 Memory error. + * @retval 0 Success. + */ +int +vy_run_writer_append_stmt(struct vy_run_writer *writer, struct tuple *stmt); + +/** + * Finalize run writing by writing run index into file. The writer + * is deleted after call. + * @param writer Run writer. + * @retval -1 Memory or IO error. + * @retval 0 Success. + */ +int +vy_run_writer_commit(struct vy_run_writer *writer); + +/** + * Abort run writing. Can not delete a run and run's file here, + * becase it must be done from tx thread. The writer is deleted + * after call. + * @param Run writer. + */ +void +vy_run_writer_abort(struct vy_run_writer *writer); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/box/vy_scheduler.c b/src/box/vy_scheduler.c index df8eb87fab..e4cf56019d 100644 --- a/src/box/vy_scheduler.c +++ b/src/box/vy_scheduler.c @@ -622,15 +622,55 @@ vy_run_discard(struct vy_run *run) } static int -vy_task_dump_execute(struct vy_task *task) +vy_task_write_run(struct vy_task *task) { struct vy_index *index = task->index; + struct vy_stmt_stream *wi = task->wi; + + ERROR_INJECT(ERRINJ_VY_RUN_WRITE, + {diag_set(ClientError, ER_INJECTION, + "vinyl dump"); return -1;}); + + struct errinj *inj = errinj(ERRINJ_VY_RUN_WRITE_TIMEOUT, ERRINJ_DOUBLE); + if (inj != NULL && inj->dparam > 0) + usleep(inj->dparam * 1000000); + + struct vy_run_writer writer; + if (vy_run_writer_create(&writer, task->new_run, index->env->path, + index->space_id, index->id, + index->cmp_def, index->key_def, + task->page_size, task->bloom_fpr, + task->max_output_count) != 0) + goto fail; + + if (wi->iface->start(wi) != 0) + goto fail_abort_writer; + int rc; + struct tuple *stmt = NULL; + while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) { + rc = vy_run_writer_append_stmt(&writer, stmt); + if (rc != 0) + break; + } + wi->iface->stop(wi); - return vy_run_write(task->new_run, index->env->path, - index->space_id, index->id, task->wi, - task->page_size, index->cmp_def, - index->key_def, task->max_output_count, - task->bloom_fpr); + if (rc == 0) + rc = vy_run_writer_commit(&writer); + if (rc != 0) + goto fail_abort_writer; + + return 0; + +fail_abort_writer: + vy_run_writer_abort(&writer); +fail: + return -1; +} + +static int +vy_task_dump_execute(struct vy_task *task) +{ + return vy_task_write_run(task); } static int @@ -993,13 +1033,7 @@ vy_task_dump_new(struct vy_scheduler *scheduler, struct vy_index *index, static int vy_task_compact_execute(struct vy_task *task) { - struct vy_index *index = task->index; - - return vy_run_write(task->new_run, index->env->path, - index->space_id, index->id, task->wi, - task->page_size, index->cmp_def, - index->key_def, task->max_output_count, - task->bloom_fpr); + return vy_task_write_run(task); } static int diff --git a/test/unit/vy_point_lookup.c b/test/unit/vy_point_lookup.c index d360b3b488..0cc8dc223f 100644 --- a/test/unit/vy_point_lookup.c +++ b/test/unit/vy_point_lookup.c @@ -13,13 +13,48 @@ uint32_t schema_version; +static int +write_run(struct vy_run *run, const char *dir_name, + struct vy_index *index, struct vy_stmt_stream *wi) +{ + struct vy_run_writer writer; + if (vy_run_writer_create(&writer, run, dir_name, + index->space_id, index->id, + index->cmp_def, index->key_def, + 4096, 0.1, 100500) != 0) + goto fail; + + if (wi->iface->start(wi) != 0) + goto fail_abort_writer; + int rc; + struct tuple *stmt = NULL; + while ((rc = wi->iface->next(wi, &stmt)) == 0 && stmt != NULL) { + rc = vy_run_writer_append_stmt(&writer, stmt); + if (rc != 0) + break; + } + wi->iface->stop(wi); + + if (rc == 0) + rc = vy_run_writer_commit(&writer); + if (rc != 0) + goto fail_abort_writer; + + return 0; + +fail_abort_writer: + vy_run_writer_abort(&writer); +fail: + return -1; +} + static void test_basic() { header(); plan(15); - /** Suppress info messages from vy_run_write(). */ + /** Suppress info messages from vy_run_writer. */ say_set_log_level(S_WARN); const size_t QUOTA = 100 * 1024 * 1024; @@ -72,7 +107,7 @@ test_basic() isnt(dir_name, NULL, "temp dir name is not NULL") char path[PATH_MAX]; strcpy(path, dir_name); - strcat(path, "/0"); + strcat(path, "/512"); rc = mkdir(path, 0777); is(rc, 0, "temp dir create (2)"); strcat(path, "/0"); @@ -163,9 +198,7 @@ test_basic() struct vy_run *run = vy_run_new(&run_env, 1); isnt(run, NULL, "vy_run_new"); - rc = vy_run_write(run, dir_name, 0, pk->id, - write_stream, 4096, pk->cmp_def, pk->key_def, - 100500, 0.1); + rc = write_run(run, dir_name, pk, write_stream); is(rc, 0, "vy_run_write"); write_stream->iface->close(write_stream); @@ -200,9 +233,7 @@ test_basic() run = vy_run_new(&run_env, 2); isnt(run, NULL, "vy_run_new"); - rc = vy_run_write(run, dir_name, 0, pk->id, - write_stream, 4096, pk->cmp_def, pk->key_def, - 100500, 0.1); + rc = write_run(run, dir_name, pk, write_stream); is(rc, 0, "vy_run_write"); write_stream->iface->close(write_stream); -- GitLab