diff --git a/src/box/vinyl.c b/src/box/vinyl.c index 3e711badbb3a549ef24fb8af524dec36426c8ac6..9db258ee5e89df7fb3b30f1f62dc4feb65cef479 100644 --- a/src/box/vinyl.c +++ b/src/box/vinyl.c @@ -5377,8 +5377,7 @@ space_name_by_id(uint32_t id); * @retval -1 - memory error */ static int -vy_upsert_try_to_squash(struct tuple_format *format, uint32_t part_count, - struct region *region, +vy_upsert_try_to_squash(struct tuple_format *format, struct region *region, const char *key_mp, const char *key_mp_end, const char *old_serie, const char *old_serie_end, const char *new_serie, const char *new_serie_end, @@ -5399,8 +5398,8 @@ vy_upsert_try_to_squash(struct tuple_format *format, uint32_t part_count, operations[0].iov_base = (void *)squashed; operations[0].iov_len = squashed_size; - *result_stmt = vy_stmt_new_upsert(key_mp, key_mp_end, format, - part_count, operations, 1); + *result_stmt = vy_stmt_new_upsert(format, key_mp, key_mp_end, + operations, 1); if (*result_stmt == NULL) return -1; return 0; @@ -5455,8 +5454,8 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt, /* * UPDATE case: return the updated old stmt. */ - result_stmt = vy_stmt_new_replace(result_mp, result_mp_end, - format, key_def->part_count); + result_stmt = vy_stmt_new_replace(format, result_mp, + result_mp_end); region_truncate(region, region_svp); if (result_stmt == NULL) return NULL; /* OOM */ @@ -5477,7 +5476,7 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt, * UPSERT + UPSERT case: combine operations */ assert(old_ops_end - old_ops > 0); - if (vy_upsert_try_to_squash(format, key_def->part_count, region, + if (vy_upsert_try_to_squash(format, region, result_mp, result_mp_end, old_ops, old_ops_end, new_ops, new_ops_end, @@ -5508,8 +5507,7 @@ vy_apply_upsert(const struct tuple *new_stmt, const struct tuple *old_stmt, operations[0].iov_base = (void *)ops_buf; operations[0].iov_len = header - ops_buf; - result_stmt = vy_stmt_new_upsert(result_mp, result_mp_end, - format, key_def->part_count, + result_stmt = vy_stmt_new_upsert(format, result_mp, result_mp_end, operations, 3); if (result_stmt == NULL) { region_truncate(region, region_svp); @@ -5747,8 +5745,7 @@ vy_insert_secondary(struct vy_tx *tx, struct vy_index *index, if (vy_check_dup_key(tx, index, check_key, part_count)) return -1; } - struct tuple *tuple = vy_stmt_new_replace(key, key_end, index->format, - def->part_count); + struct tuple *tuple = vy_stmt_new_replace(index->format, key, key_end); if (tuple == NULL) return -1; int rc = vy_tx_set(tx, index, tuple); @@ -5781,8 +5778,8 @@ vy_replace_one(struct vy_tx *tx, struct space *space, struct key_def *def = pk->key_def; assert(def->iid == 0); struct tuple *new_tuple = - vy_stmt_new_replace(request->tuple, request->tuple_end, - pk->format, def->part_count); + vy_stmt_new_replace(pk->format, request->tuple, + request->tuple_end); if (new_tuple == NULL) return -1; /** @@ -5865,8 +5862,8 @@ vy_replace_impl(struct vy_tx *tx, struct space *space, struct request *request, return -1; struct key_def *def = pk->key_def; assert(def->iid == 0); - new_stmt = vy_stmt_new_replace(request->tuple, request->tuple_end, - pk->format, def->part_count); + new_stmt = vy_stmt_new_replace(pk->format, request->tuple, + request->tuple_end); if (new_stmt == NULL) return -1; const char *key = tuple_extract_key(new_stmt, def, NULL); @@ -6223,8 +6220,7 @@ vy_update(struct vy_tx *tx, struct txn_stmt *stmt, struct space *space, if (tuple_validate_raw(space->format, new_tuple)) return -1; stmt->new_tuple = - vy_stmt_new_replace(new_tuple, new_tuple_end, pk->format, - pk->key_def->part_count); + vy_stmt_new_replace(pk->format, new_tuple, new_tuple_end); if (stmt->new_tuple == NULL) return -1; if (vy_check_update(pk, stmt->old_tuple, stmt->new_tuple)) @@ -6306,8 +6302,8 @@ vy_index_upsert(struct vy_tx *tx, struct vy_index *index, struct iovec operations[1]; operations[0].iov_base = (void *)expr; operations[0].iov_len = expr_end - expr; - vystmt = vy_stmt_new_upsert(tuple, tuple_end, index->format, - index->key_def->part_count, operations, 1); + vystmt = vy_stmt_new_upsert(index->format, tuple, tuple_end, + operations, 1); if (vystmt == NULL) return -1; assert(vy_stmt_type(vystmt) == IPROTO_UPSERT); @@ -6375,8 +6371,7 @@ vy_upsert(struct vy_tx *tx, struct txn_stmt *stmt, struct space *space, */ if (old_stmt == NULL) { stmt->new_tuple = - vy_stmt_new_replace(tuple, tuple_end, pk->format, - pk_def->part_count); + vy_stmt_new_replace(pk->format, tuple, tuple_end); if (stmt->new_tuple == NULL) return -1; return vy_insert_first_upsert(tx, space, stmt->new_tuple); @@ -6403,8 +6398,7 @@ vy_upsert(struct vy_tx *tx, struct txn_stmt *stmt, struct space *space, new_tuple_end = new_tuple + new_size; stmt->old_tuple = old_stmt; stmt->new_tuple = - vy_stmt_new_replace(new_tuple, new_tuple_end, pk->format, - pk->key_def->part_count); + vy_stmt_new_replace(pk->format, new_tuple, new_tuple_end); if (stmt->new_tuple == NULL) return -1; @@ -6464,8 +6458,8 @@ vy_insert(struct vy_tx *tx, struct txn_stmt *stmt, struct space *space, assert(def->iid == 0); /* First insert into the primary index. */ stmt->new_tuple = - vy_stmt_new_replace(request->tuple, request->tuple_end, - pk->format, def->part_count); + vy_stmt_new_replace(pk->format, request->tuple, + request->tuple_end); if (stmt->new_tuple == NULL) return -1; if (vy_insert_primary(tx, pk, stmt->new_tuple) != 0) @@ -7020,7 +7014,7 @@ vy_page_delete(struct vy_page *page) */ static struct tuple * vy_page_stmt(struct vy_page *page, uint32_t stmt_no, - struct tuple_format *format, const struct key_def *key_def) + struct tuple_format *format) { assert(stmt_no < page->count); const char *data = page->data + page->row_index[stmt_no]; @@ -7030,7 +7024,7 @@ vy_page_stmt(struct vy_page *page, uint32_t stmt_no, struct xrow_header xrow; if (xrow_header_decode(&xrow, &data, data_end) != 0) return NULL; - return vy_stmt_decode(&xrow, format, key_def->part_count); + return vy_stmt_decode(&xrow, format); } /** @@ -7364,8 +7358,7 @@ vy_run_iterator_read(struct vy_run_iterator *itr, int rc = vy_run_iterator_load_page(itr, pos.page_no, &page); if (rc != 0) return rc; - *stmt = vy_page_stmt(page, pos.pos_in_page, itr->index->format, - itr->index->key_def); + *stmt = vy_page_stmt(page, pos.pos_in_page, itr->index->format); if (*stmt == NULL) return -1; return 0; @@ -7425,8 +7418,7 @@ vy_run_iterator_search_in_page(struct vy_run_iterator *itr, struct vy_index *idx = itr->index; while (beg != end) { uint32_t mid = beg + (end - beg) / 2; - struct tuple *fnd_key = vy_page_stmt(page, mid, idx->format, - idx->key_def); + struct tuple *fnd_key = vy_page_stmt(page, mid, idx->format); if (fnd_key == NULL) return end; int cmp = vy_stmt_compare(fnd_key, key, idx->key_def); diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index d5eb2ad7b4777e2c7baba3c4b958390bc096c743..589c9fb4e67f20be904342721f77a14b52cfc50d 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -164,21 +164,15 @@ vy_stmt_new_delete(struct tuple_format *format, const char *key, * Operations can be saved in the space available by @param extra. * For details @sa struct vy_stmt comment. */ -struct tuple * -vy_stmt_new_with_ops(const char *tuple_begin, const char *tuple_end, - enum iproto_type type, struct tuple_format *format, - uint32_t part_count, - struct iovec *operations, uint32_t iovcnt) +static struct tuple * +vy_stmt_new_with_ops(struct tuple_format *format, const char *tuple_begin, + const char *tuple_end, struct iovec *operations, + uint32_t iovcnt, enum iproto_type type) { - (void) part_count; /* unused in release. */ -#ifndef NDEBUG - const char *tuple_end_must_be = tuple_begin; - mp_next(&tuple_end_must_be); - assert(tuple_end == tuple_end_must_be); -#endif + mp_tuple_assert(tuple_begin, tuple_end); uint32_t field_count = mp_decode_array(&tuple_begin); - assert(field_count >= part_count); + assert(field_count >= format->field_count); uint32_t extra_size = 0; for (uint32_t i = 0; i < iovcnt; ++i) { @@ -222,20 +216,20 @@ vy_stmt_new_with_ops(const char *tuple_begin, const char *tuple_end, } struct tuple * -vy_stmt_new_upsert(const char *tuple_begin, const char *tuple_end, - struct tuple_format *format, uint32_t part_count, - struct iovec *operations, uint32_t ops_cnt) +vy_stmt_new_upsert(struct tuple_format *format, const char *tuple_begin, + const char *tuple_end, struct iovec *operations, + uint32_t ops_cnt) { - return vy_stmt_new_with_ops(tuple_begin, tuple_end, IPROTO_UPSERT, - format, part_count, operations, ops_cnt); + return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, + operations, ops_cnt, IPROTO_UPSERT); } struct tuple * -vy_stmt_new_replace(const char *tuple_begin, const char *tuple_end, - struct tuple_format *format, uint32_t part_count) +vy_stmt_new_replace(struct tuple_format *format, const char *tuple_begin, + const char *tuple_end) { - return vy_stmt_new_with_ops(tuple_begin, tuple_end, IPROTO_REPLACE, - format, part_count, NULL, 0); + return vy_stmt_new_with_ops(format, tuple_begin, tuple_end, + NULL, 0, IPROTO_REPLACE); } struct tuple * @@ -333,8 +327,7 @@ vy_stmt_encode(const struct tuple *value, const struct key_def *key_def, } struct tuple * -vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format, - uint32_t part_count) +vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format) { struct request request; request_create(&request, xrow->type); @@ -348,20 +341,17 @@ vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format, case IPROTO_DELETE: /* extract key */ field_count = mp_decode_array(&request.key); - assert(field_count == part_count); stmt = vy_stmt_new_delete(format, request.key, field_count); break; case IPROTO_REPLACE: - stmt = vy_stmt_new_replace(request.tuple, - request.tuple_end, - format, part_count); + stmt = vy_stmt_new_replace(format, request.tuple, + request.tuple_end); break; case IPROTO_UPSERT: ops.iov_base = (char *)request.ops; ops.iov_len = request.ops_end - request.ops; - stmt = vy_stmt_new_upsert(request.tuple, - request.tuple_end, - format, part_count, &ops, 1); + stmt = vy_stmt_new_upsert(format, request.tuple, + request.tuple_end, &ops, 1); break; default: diag_set(ClientError, ER_VINYL, "unknown request type"); diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index d69709496b72d77dae220864d100a88d2d285e4f..07a401af56f6fe7c92cfb57f439a63049ace3936 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -302,18 +302,17 @@ vy_stmt_new_delete(struct tuple_format *format, const char *key, /** * Create the REPLACE statement from raw MessagePack data. + * @param format Format of a tuple for offsets generating. * @param tuple_begin MessagePack data that contain an array of fields WITH the * array header. * @param tuple_end End of the array that begins from @param tuple_begin. - * @param format Format of a tuple for offsets generating. - * @param part_count Part count from key definition. * * @retval NULL Memory allocation error. * @retval not NULL Success. */ struct tuple * -vy_stmt_new_replace(const char *tuple_begin, const char *tuple_end, - struct tuple_format *format, uint32_t part_count); +vy_stmt_new_replace(struct tuple_format *format, + const char *tuple_begin, const char *tuple_end); /** * Create the UPSERT statement from raw MessagePack data. @@ -329,8 +328,8 @@ vy_stmt_new_replace(const char *tuple_begin, const char *tuple_end, * @retval not NULL Success. */ struct tuple * -vy_stmt_new_upsert(const char *tuple_begin, const char *tuple_end, - struct tuple_format *format, uint32_t part_count, +vy_stmt_new_upsert(struct tuple_format *format, + const char *tuple_begin, const char *tuple_end, struct iovec *operations, uint32_t ops_cnt); /** @@ -432,8 +431,7 @@ vy_stmt_encode(const struct tuple *value, const struct key_def *key_def, * @retval NULL on error */ struct tuple * -vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format, - uint32_t part_count); +vy_stmt_decode(struct xrow_header *xrow, struct tuple_format *format); /** * Format a key into string.