diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c index 89bdfffa6f93a54e746ccb12d422db1e97b38ede..91a77b2ae3afacad90e045f9516853083b3dcb27 100644 --- a/src/box/lua/merger.c +++ b/src/box/lua/merger.c @@ -1115,7 +1115,7 @@ encode_result_buffer(struct lua_State *L, struct merge_source *source, while (result_len < limit && (rc = merge_source_next(source, NULL, &tuple)) == 0 && tuple != NULL) { - uint32_t bsize = tuple->bsize; + uint32_t bsize = tuple_bsize(tuple); ibuf_reserve(output_buffer, bsize); memcpy(output_buffer->wpos, tuple_data(tuple), bsize); output_buffer->wpos += bsize; diff --git a/src/box/memtx_engine.c b/src/box/memtx_engine.c index 16344575c3a0e48f3fb737860b6fc7f37dbd616e..de7a468b57969baf928265056acdbf42b65a9da0 100644 --- a/src/box/memtx_engine.c +++ b/src/box/memtx_engine.c @@ -1268,14 +1268,11 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end) * tuple is not the first field of the memtx_tuple. */ uint32_t data_offset = sizeof(struct tuple) + field_map_size; - if (data_offset > INT16_MAX) { - /** tuple->data_offset is 15 bits */ - diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG, - data_offset); + if (tuple_check_data_offset(data_offset) != 0) goto end; - } size_t tuple_len = end - data; + assert(tuple_len <= UINT32_MAX); /* bsize is UINT32_MAX */ size_t total = sizeof(struct memtx_tuple) + field_map_size + tuple_len; ERROR_INJECT(ERRINJ_TUPLE_ALLOC, { @@ -1300,15 +1297,11 @@ memtx_tuple_new(struct tuple_format *format, const char *data, const char *end) goto end; } tuple = &memtx_tuple->base; - tuple_ref_init(tuple, 0); + tuple_create(tuple, 0, tuple_format_id(format), + data_offset, tuple_len); memtx_tuple->version = memtx->snapshot_version; - assert(tuple_len <= UINT32_MAX); /* bsize is UINT32_MAX */ - tuple->bsize = tuple_len; - tuple->format_id = tuple_format_id(format); tuple_format_ref(format); - tuple->data_offset = data_offset; - tuple->is_dirty = false; - char *raw = (char *) tuple + tuple->data_offset; + char *raw = (char *) tuple + data_offset; field_map_build(&builder, raw - field_map_size); memcpy(raw, data, tuple_len); say_debug("%s(%zu) = %p", __func__, tuple_len, memtx_tuple); diff --git a/src/box/memtx_tx.c b/src/box/memtx_tx.c index 5ab8eec29a7e964792cbb027525cc010b9823a39..0e3aaae93b31c87794324e6d936c554e2b33edf7 100644 --- a/src/box/memtx_tx.c +++ b/src/box/memtx_tx.c @@ -1489,14 +1489,14 @@ memtx_tx_history_commit_stmt(struct txn_stmt *stmt) size_t res = 0; if (stmt->add_story != NULL) { assert(stmt->add_story->add_stmt == stmt); - res += stmt->add_story->tuple->bsize; + res += tuple_bsize(stmt->add_story->tuple); stmt->add_story->add_stmt = NULL; stmt->add_story = NULL; } if (stmt->del_story != NULL) { assert(stmt->del_story->del_stmt == stmt); assert(stmt->next_in_del_list == NULL); - res -= stmt->del_story->tuple->bsize; + res -= tuple_bsize(stmt->del_story->tuple); stmt->del_story->del_stmt = NULL; stmt->del_story = NULL; } diff --git a/src/box/sql.c b/src/box/sql.c index b87d236d1236c5cabf7108351e1a39b9b8ff3e02..c805a1e5c6c39206fa85a52468ea458f12090126 100644 --- a/src/box/sql.c +++ b/src/box/sql.c @@ -1248,7 +1248,7 @@ vdbe_field_ref_prepare_tuple(struct vdbe_field_ref *field_ref, struct tuple *tuple) { vdbe_field_ref_create(field_ref, tuple, tuple_data(tuple), - tuple->bsize); + tuple_bsize(tuple)); } ssize_t diff --git a/src/box/tuple.c b/src/box/tuple.c index bc4703cf773366fe25964d223c5e092a5b0fd2d4..d907ecf27e9da5e5415331a1e001b6584160dff1 100644 --- a/src/box/tuple.c +++ b/src/box/tuple.c @@ -115,12 +115,8 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end goto end; uint32_t field_map_size = field_map_build_size(&builder); uint32_t data_offset = sizeof(struct tuple) + field_map_size; - if (data_offset > INT16_MAX) { - /** tuple->data_offset is 15 bits */ - diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG, - data_offset); + if (tuple_check_data_offset(data_offset) != 0) goto end; - } size_t data_len = end - data; size_t total = sizeof(struct tuple) + field_map_size + data_len; @@ -131,12 +127,9 @@ runtime_tuple_new(struct tuple_format *format, const char *data, const char *end goto end; } - tuple_ref_init(tuple, 0); - tuple->bsize = data_len; - tuple->format_id = tuple_format_id(format); + tuple_create(tuple, 0, tuple_format_id(format), + data_offset, data_len); tuple_format_ref(format); - tuple->data_offset = data_offset; - tuple->is_dirty = false; char *raw = (char *) tuple + data_offset; field_map_build(&builder, raw - field_map_size); memcpy(raw, data, data_len); @@ -609,7 +602,7 @@ size_t box_tuple_bsize(box_tuple_t *tuple) { assert(tuple != NULL); - return tuple->bsize; + return tuple_bsize(tuple); } ssize_t diff --git a/src/box/tuple.h b/src/box/tuple.h index c55b88decbccc4dc0d3f4f806b0c7ebfb2d318b8..ae6751bbfdfac570ecd192c695e55d05b568ff23 100644 --- a/src/box/tuple.h +++ b/src/box/tuple.h @@ -315,8 +315,8 @@ struct PACKED tuple * counter is uploaded to external storage that is acquired back * when the counter falls back to zero. * Is always nonzero in normal reference counted tuples. - * Must not be accessed directly, use @sa tuple_ref_init, - * tuple_ref, tuple_unref instead. + * Must not be accessed directly, use @sa tuple_create, + * tuple_ref_init, tuple_ref, tuple_unref instead. */ uint8_t local_refs; /** @@ -334,12 +334,13 @@ struct PACKED tuple /** Format identifier. */ uint16_t format_id; /** - * Length of the MessagePack data in raw part of the - * tuple. + * Length of the MessagePack data in raw part of the tuple. + * Must be set in tuple_create and accessed by tuple_bsize etc. */ uint32_t bsize; /** * Offset to the MessagePack from the begin of the tuple. + * Must be set in tuple_init_default and accessed by tuple_data_offset. */ uint16_t data_offset; /** @@ -356,9 +357,55 @@ enum { TUPLE_LOCAL_REF_MAX = UINT8_MAX, }; +/** + * Check that data_offset is valid and can be stored in tuple. + * @param data_offset - see member description in struct tuple. + * + * @return 0 on success, -1 otherwise (diag is set). + */ +static inline int +tuple_check_data_offset(uint32_t data_offset) +{ + if (data_offset > INT16_MAX) { + /** tuple->data_offset is 15 bits */ + diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG, + data_offset); + return -1; + } + return 0; +} + +/** + * Initialize a tuple. Must be called right after allocation of a new tuple. + * Should only be called for newly created uninitialized tuples. + * If the tuple copied from another tuple and only initialization of reference + * count is needed, it's better to call tuple_ref_init. + * tuple_check_data_offset should be called before to ensure args are correct. + * + * @param tuple - Tuple to initialize. + * @param refs - initial reference count to set. + * @param format_id - see member description in struct tuple. + * @param data_offset - see member description in struct tuple. + * @param bsize - see member description in struct tuple. + * @return 0 on success, -1 on error (diag is set). + */ +static inline void +tuple_create(struct tuple *tuple, uint8_t refs, uint16_t format_id, + uint16_t data_offset, uint32_t bsize) +{ + assert(data_offset <= INT16_MAX); + tuple->local_refs = refs; + tuple->has_uploaded_refs = false; + tuple->is_dirty = false; + tuple->format_id = format_id; + tuple->data_offset = data_offset; + tuple->bsize = bsize; +} + /** * Initialize tuple reference counter to a given value. * Should only be called for newly created uninitialized tuples. + * For initialization of brand new tuples it's better to use tuple_create. * * @param tuple Tuple to reference * @param refs initial reference count to set. @@ -393,12 +440,26 @@ tuple_is_unreferenced(struct tuple *tuple) return tuple->local_refs == 0; } +/** Offset to the MessagePack from the beginning of the tuple. */ +static inline uint16_t +tuple_data_offset(struct tuple *tuple) +{ + return tuple->data_offset; +} + +/** Size of MessagePack data of the tuple. */ +static inline size_t +tuple_bsize(struct tuple *tuple) +{ + return tuple->bsize; +} + /** Size of the tuple including size of struct tuple. */ static inline size_t tuple_size(struct tuple *tuple) { /* data_offset includes sizeof(struct tuple). */ - return tuple->data_offset + tuple->bsize; + return tuple_data_offset(tuple) + tuple_bsize(tuple); } /** @@ -409,7 +470,7 @@ tuple_size(struct tuple *tuple) static inline const char * tuple_data(struct tuple *tuple) { - return (const char *) tuple + tuple->data_offset; + return (const char *) tuple + tuple_data_offset(tuple); } /** @@ -430,8 +491,8 @@ tuple_data_or_null(struct tuple *tuple) static inline const char * tuple_data_range(struct tuple *tuple, uint32_t *p_size) { - *p_size = tuple->bsize; - return (const char *) tuple + tuple->data_offset; + *p_size = tuple_bsize(tuple); + return (const char *) tuple + tuple_data_offset(tuple); } /** @@ -586,7 +647,7 @@ tuple_validate(struct tuple_format *format, struct tuple *tuple) static inline const uint32_t * tuple_field_map(struct tuple *tuple) { - return (const uint32_t *) ((const char *) tuple + tuple->data_offset); + return (const uint32_t *) tuple_data(tuple); } /** diff --git a/src/box/tuple_compare.cc b/src/box/tuple_compare.cc index 98938fb39aa63c45cd5c363adeac83844cebe0dd..43cd29ce93b792d170135be634a3fe8033b7f59e 100644 --- a/src/box/tuple_compare.cc +++ b/src/box/tuple_compare.cc @@ -874,7 +874,7 @@ tuple_compare_with_key_sequential(struct tuple *tuple, hint_t tuple_hint, * Key's and tuple's first field_count fields are * equal, and their bsize too. */ - key += tuple->bsize - mp_sizeof_array(field_count); + key += tuple_bsize(tuple) - mp_sizeof_array(field_count); for (uint32_t i = field_count; i < part_count; ++i, mp_next(&key)) { if (mp_typeof(*key) != MP_NIL) diff --git a/src/box/tuple_extract_key.cc b/src/box/tuple_extract_key.cc index c1ad3929e703db742f58731b239fe75dca43ae41..795dc65591f17166f181365049f2fb2099daa570 100644 --- a/src/box/tuple_extract_key.cc +++ b/src/box/tuple_extract_key.cc @@ -95,7 +95,7 @@ tuple_extract_key_sequential(struct tuple *tuple, struct key_def *key_def, assert(!has_optional_parts || key_def->is_nullable); assert(has_optional_parts == key_def->has_optional_parts); const char *data = tuple_data(tuple); - const char *data_end = data + tuple->bsize; + const char *data_end = data + tuple_bsize(tuple); return tuple_extract_key_sequential_raw<has_optional_parts>(data, data_end, key_def, @@ -127,7 +127,7 @@ tuple_extract_key_slowpath(struct tuple *tuple, struct key_def *key_def, uint32_t bsize = mp_sizeof_array(part_count); struct tuple_format *format = tuple_format(tuple); const uint32_t *field_map = tuple_field_map(tuple); - const char *tuple_end = data + tuple->bsize; + const char *tuple_end = data + tuple_bsize(tuple); /* Calculate the key size. */ for (uint32_t i = 0; i < part_count; ++i) { diff --git a/src/box/vy_stmt.c b/src/box/vy_stmt.c index d121a1e61ddbe2a7fc6250bf89808d5100e7791e..610e8a801577d03dac4cafc7a8ff718e6d202860 100644 --- a/src/box/vy_stmt.c +++ b/src/box/vy_stmt.c @@ -160,12 +160,8 @@ vy_stmt_alloc(struct tuple_format *format, uint32_t data_offset, uint32_t bsize) { assert(data_offset >= sizeof(struct vy_stmt) + format->field_map_size); - if (data_offset > INT16_MAX) { - /** tuple->data_offset is 15 bits */ - diag_set(ClientError, ER_TUPLE_METADATA_IS_TOO_BIG, - data_offset); + if (tuple_check_data_offset(data_offset) != 0) return NULL; - } struct vy_stmt_env *env = format->engine; uint32_t total_size = data_offset + bsize; @@ -192,13 +188,10 @@ vy_stmt_alloc(struct tuple_format *format, uint32_t data_offset, uint32_t bsize) } say_debug("vy_stmt_alloc(format = %d data_offset = %u, bsize = %u) = %p", format->id, data_offset, bsize, tuple); - tuple_ref_init(tuple, 1); - tuple->format_id = tuple_format_id(format); + tuple_create(tuple, 1, tuple_format_id(format), + data_offset, bsize); if (cord_is_main()) tuple_format_ref(format); - tuple->bsize = bsize; - tuple->data_offset = data_offset; - tuple->is_dirty = false; vy_stmt_set_lsn(tuple, 0); vy_stmt_set_type(tuple, 0); vy_stmt_set_flags(tuple, 0); @@ -215,11 +208,12 @@ vy_stmt_dup(struct tuple *stmt) * the original tuple. */ struct tuple *res = vy_stmt_alloc(tuple_format(stmt), - stmt->data_offset, stmt->bsize); + tuple_data_offset(stmt), + tuple_bsize(stmt)); if (res == NULL) return NULL; assert(tuple_size(res) == tuple_size(stmt)); - assert(res->data_offset == stmt->data_offset); + assert(tuple_data_offset(res) == tuple_data_offset(stmt)); memcpy(res, stmt, tuple_size(stmt)); tuple_ref_init(res, 1); return res; @@ -408,17 +402,18 @@ vy_stmt_replace_from_upsert(struct tuple *upsert) /* Get statement size without UPSERT operations */ uint32_t bsize; vy_upsert_data_range(upsert, &bsize); - assert(bsize <= upsert->bsize); + assert(bsize <= tuple_bsize(upsert)); /* Copy statement data excluding UPSERT operations */ struct tuple_format *format = tuple_format(upsert); - struct tuple *replace = vy_stmt_alloc(format, upsert->data_offset, bsize); + uint16_t data_offset = tuple_data_offset(upsert); + struct tuple *replace = vy_stmt_alloc(format, data_offset, bsize); if (replace == NULL) return NULL; /* Copy both data and field_map. */ char *dst = (char *)replace + sizeof(struct vy_stmt); char *src = (char *)upsert + sizeof(struct vy_stmt); - memcpy(dst, src, upsert->data_offset + bsize - sizeof(struct vy_stmt)); + memcpy(dst, src, data_offset + bsize - sizeof(struct vy_stmt)); vy_stmt_set_type(replace, IPROTO_REPLACE); vy_stmt_set_lsn(replace, vy_stmt_lsn(upsert)); return replace; diff --git a/src/box/vy_stmt.h b/src/box/vy_stmt.h index 8e61b83fbfc29247ba12b1211ed13555dcfd3143..7f3f91676dfd699b0d735bdd28ad7a660ba0c443 100644 --- a/src/box/vy_stmt.h +++ b/src/box/vy_stmt.h @@ -591,7 +591,7 @@ vy_stmt_upsert_ops(struct tuple *tuple, uint32_t *mp_size) assert(vy_stmt_type(tuple) == IPROTO_UPSERT); const char *mp = tuple_data(tuple); mp_next(&mp); - *mp_size = tuple_data(tuple) + tuple->bsize - mp; + *mp_size = tuple_data(tuple) + tuple_bsize(tuple) - mp; return mp; }