diff --git a/changelogs/unreleased/gh-6253-iproto-id.md b/changelogs/unreleased/gh-6253-iproto-id.md new file mode 100644 index 0000000000000000000000000000000000000000..617e695697dccbcdbc646547d532f9cc5e6d24fa --- /dev/null +++ b/changelogs/unreleased/gh-6253-iproto-id.md @@ -0,0 +1,5 @@ +## feature/core + +* Extended the network protocol (IPROTO) with a new request type (`IPROTO_ID`) + that is supposed to be used for exchanging sets of supported features between + server and client (gh-6253). diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 0fb47fa7a5dc3d50eba0ff42a913ce9a19c64f4c..a6de36c66e663891a42abf8fbb92a1484ee61b59 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -82,7 +82,7 @@ include_directories(${CMAKE_BINARY_DIR}/src/box) add_library(box_error STATIC error.cc errcode.c mp_error.cc) target_link_libraries(box_error core stat mpstream vclock) -add_library(xrow STATIC xrow.c iproto_constants.c) +add_library(xrow STATIC xrow.c iproto_constants.c iproto_features.c) target_link_libraries(xrow server core small vclock misc box_error scramble ${MSGPUCK_LIBRARIES}) diff --git a/src/box/iproto.cc b/src/box/iproto.cc index e5664b816ff02462cab935e1c8b46d7df46c531f..0c171544f013cf332451516823cc68a77104213d 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -60,6 +60,7 @@ #include "schema.h" /* schema_version */ #include "replication.h" /* instance_uuid */ #include "iproto_constants.h" +#include "iproto_features.h" #include "rmean.h" #include "execute.h" #include "errinj.h" @@ -297,6 +298,8 @@ struct iproto_msg struct call_request call; /** Authentication request. */ struct auth_request auth; + /** Features request. */ + struct id_request id; /* SQL request, if this is the EXECUTE/PREPARE request. */ struct sql_request sql; /** In case of iproto parse error, saved diagnostics. */ @@ -1525,6 +1528,11 @@ iproto_msg_decode(struct iproto_msg *msg, const char **pos, const char *reqend, case IPROTO_PING: cmsg_init(&msg->base, iproto_thread->misc_route); break; + case IPROTO_ID: + if (xrow_decode_id(&msg->header, &msg->id) != 0) + goto error; + cmsg_init(&msg->base, iproto_thread->misc_route); + break; case IPROTO_JOIN: case IPROTO_FETCH_SNAPSHOT: case IPROTO_REGISTER: @@ -2081,6 +2089,10 @@ tx_process_misc(struct cmsg *m) iproto_reply_ok_xc(out, msg->header.sync, ::schema_version); break; + case IPROTO_ID: + iproto_reply_id_xc(out, msg->header.sync, + ::schema_version); + break; case IPROTO_VOTE_DEPRECATED: iproto_reply_vclock_xc(out, &replicaset.vclock, msg->header.sync, @@ -2728,6 +2740,8 @@ iproto_thread_init(struct iproto_thread *iproto_thread) void iproto_init(int threads_count) { + iproto_features_init(); + iproto_threads_count = 0; struct session_vtab iproto_session_vtab = { /* .push = */ iproto_session_push, diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index 83193a0e734f27fb79d0b3ef111bf3035e342373..453fc63a4d054e97ffa975f68de768547efac4b2 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -147,6 +147,8 @@ const unsigned char iproto_key_type[IPROTO_KEY_MAX] = /* 0x51 */ MP_ARRAY, /* IPROTO_ID_FILTER */ /* 0x52 */ MP_MAP, /* IPROTO_ERROR */ /* 0x53 */ MP_UINT, /* IPROTO_TERM */ + /* 0x54 */ MP_UINT, /* IPROTO_VERSION */ + /* 0x55 */ MP_ARRAY, /* IPROTO_FEATURES */ /* }}} */ }; @@ -278,6 +280,8 @@ const char *iproto_key_strs[IPROTO_KEY_MAX] = { "id filter", /* 0x51 */ "error", /* 0x52 */ "term", /* 0x53 */ + "version", /* 0x54 */ + "features", /* 0x55 */ }; const char *vy_page_info_key_strs[VY_PAGE_INFO_KEY_MAX] = { diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 9210075808b7920cafff55ffc444b499d9adc534..6786947e67ea6faf6ff8c7415f4b3977d9682ee7 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -139,6 +139,10 @@ enum iproto_key { * both iproto (e.g. REPLICA_ID) and raft (RAFT_TERM) keys. */ IPROTO_TERM = 0x53, + /** Protocol version. */ + IPROTO_VERSION = 0x54, + /** Protocol features. */ + IPROTO_FEATURES = 0x55, /* * Be careful to not extend iproto_key values over 0x7f. * iproto_keys are encoded in msgpack as positive fixnum, which ends at @@ -273,6 +277,8 @@ enum iproto_type { IPROTO_REGISTER = 70, IPROTO_JOIN_META = 71, IPROTO_JOIN_SNAPSHOT = 72, + /** Protocol features request. */ + IPROTO_ID = 73, /** Vinyl run info stored in .index file */ VY_INDEX_RUN_INFO = 100, diff --git a/src/box/iproto_features.c b/src/box/iproto_features.c new file mode 100644 index 0000000000000000000000000000000000000000..83205a441236a46c4a7994a4e32924f9d26c5cf7 --- /dev/null +++ b/src/box/iproto_features.c @@ -0,0 +1,65 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. + */ +#include "iproto_features.h" + +#include <stdint.h> + +#include "msgpuck.h" + +struct iproto_features IPROTO_CURRENT_FEATURES; + +uint32_t +mp_sizeof_iproto_features(const struct iproto_features *features) +{ + int count = 0; + uint32_t size = 0; + iproto_features_foreach(features, feature_id) { + size += mp_sizeof_uint(feature_id); + count++; + } + size += mp_sizeof_array(count); + return size; +} + +char * +mp_encode_iproto_features(char *data, const struct iproto_features *features) +{ + int count = 0; + iproto_features_foreach(features, feature_id) + count++; + data = mp_encode_array(data, count); + iproto_features_foreach(features, feature_id) + data = mp_encode_uint(data, feature_id); + return data; +} + +int +mp_decode_iproto_features(const char **data, struct iproto_features *features) +{ + if (mp_typeof(**data) != MP_ARRAY) + return -1; + uint32_t size = mp_decode_array(data); + for (uint32_t i = 0; i < size; i++) { + if (mp_typeof(**data) != MP_UINT) + return -1; + uint64_t feature_id = mp_decode_uint(data); + /* Ignore unknown features for forward compatibility. */ + if (feature_id >= iproto_feature_id_MAX) + continue; + iproto_features_set(features, feature_id); + } + return 0; +} + +void +iproto_features_init(void) +{ + iproto_features_create(&IPROTO_CURRENT_FEATURES); + iproto_features_set(&IPROTO_CURRENT_FEATURES, + IPROTO_FEATURE_STREAMS); + iproto_features_set(&IPROTO_CURRENT_FEATURES, + IPROTO_FEATURE_TRANSACTIONS); +} diff --git a/src/box/iproto_features.h b/src/box/iproto_features.h new file mode 100644 index 0000000000000000000000000000000000000000..8eb5b355d38bc0a31e87f5bbb7029d6b7198499f --- /dev/null +++ b/src/box/iproto_features.h @@ -0,0 +1,118 @@ +/* + * SPDX-License-Identifier: BSD-2-Clause + * + * Copyright 2010-2021, Tarantool AUTHORS, please see AUTHORS file. + */ +#pragma once + +#include <assert.h> +#include <stdint.h> +#include <string.h> + +#include "bit/bit.h" + +#if defined(__cplusplus) +extern "C" { +#endif + +/** + * IPROTO protocol feature ids returned by the IPROTO_ID command. + */ +enum iproto_feature_id { + /** + * Streams support: IPROTO_STREAM_ID header key. + */ + IPROTO_FEATURE_STREAMS = 0, + /** + * Transactions in the protocol: + * IPROTO_BEGIN, IPROTO_COMMIT, IPROTO_ROLLBACK commands. + */ + IPROTO_FEATURE_TRANSACTIONS = 1, + iproto_feature_id_MAX, +}; + +/** + * IPROTO protocol feature bit map. + */ +struct iproto_features { + char bits[BITMAP_SIZE(iproto_feature_id_MAX)]; +}; + +/** + * Current IPROTO protocol version returned by the IPROTO_ID command. + * It should be incremented every time a new feature is added or removed. + */ +enum { + IPROTO_CURRENT_VERSION = 1, +}; + +/** + * Current IPROTO protocol features returned by the IPROTO_ID command. + */ +extern struct iproto_features IPROTO_CURRENT_FEATURES; + +/** + * Initializes a IPROTO protocol feature bit map with all zeros. + */ +static inline void +iproto_features_create(struct iproto_features *features) +{ + memset(features, 0, sizeof(*features)); +} + +/** + * Sets a bit in a IPROTO protocol feature bit map. + */ +static inline void +iproto_features_set(struct iproto_features *features, int id) +{ + assert(id >= 0 && id < iproto_feature_id_MAX); + bit_set(features->bits, id); +} + +/** + * Returns true if a feature is set in a IPROTO protocol feature bit map. + */ +static inline bool +iproto_features_test(const struct iproto_features *features, int id) +{ + assert(id >= 0 && id < iproto_feature_id_MAX); + return bit_test(features->bits, id); +} + +/** + * Iterates over all feature ids set in a IPROTO protocol featreus bit map. + */ +#define iproto_features_foreach(features, id) \ + for (int id = 0; id < iproto_feature_id_MAX; id++) \ + if (iproto_features_test((features), id)) + +/** + * Returns the size of a IPROTO protocol feature bit map encoded in msgpack. + */ +uint32_t +mp_sizeof_iproto_features(const struct iproto_features *features); + +/** + * Encodes a IPROTO protocol feature bit map in msgpack. + * Returns a pointer to the byte following the end of the encoded data. + */ +char * +mp_encode_iproto_features(char *data, const struct iproto_features *features); + +/** + * Decodes a IPROTO protocol features bit map from msgpack. + * Advances the data pointer. Returns 0 on success, -1 on failure. + */ +int +mp_decode_iproto_features(const char **data, struct iproto_features *features); + +/** + * Initializes this module. + */ +void +iproto_features_init(void); + +#if defined(__cplusplus) +} /* extern "C" */ +#endif diff --git a/src/box/xrow.c b/src/box/xrow.c index 3cad2c1bec7f337fe33a712e57076fdb7345434e..9bdd5944ac45e15af67554a72018a7a6fb9182de 100644 --- a/src/box/xrow.c +++ b/src/box/xrow.c @@ -42,6 +42,7 @@ #include "mp_error.h" #include "scramble.h" #include "iproto_constants.h" +#include "iproto_features.h" #include "mpstream/mpstream.h" #include "errinj.h" @@ -423,6 +424,35 @@ iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version) return 0; } +int +iproto_reply_id(struct obuf *out, uint64_t sync, uint32_t schema_version) +{ + size_t size = IPROTO_HEADER_LEN; + size += mp_sizeof_map(2); + size += mp_sizeof_uint(IPROTO_VERSION); + size += mp_sizeof_uint(IPROTO_CURRENT_VERSION); + size += mp_sizeof_uint(IPROTO_FEATURES); + size += mp_sizeof_iproto_features(&IPROTO_CURRENT_FEATURES); + + char *buf = obuf_alloc(out, size); + if (buf == NULL) { + diag_set(OutOfMemory, size, "obuf_alloc", "buf"); + return -1; + } + + char *data = buf + IPROTO_HEADER_LEN; + data = mp_encode_map(data, 2); + data = mp_encode_uint(data, IPROTO_VERSION); + data = mp_encode_uint(data, IPROTO_CURRENT_VERSION); + data = mp_encode_uint(data, IPROTO_FEATURES); + data = mp_encode_iproto_features(data, &IPROTO_CURRENT_FEATURES); + assert(size == (size_t)(data - buf)); + + iproto_header_encode(buf, IPROTO_OK, sync, schema_version, + size - IPROTO_HEADER_LEN); + return 0; +} + int iproto_reply_vclock(struct obuf *out, const struct vclock *vclock, uint64_t sync, uint32_t schema_version) @@ -903,6 +933,53 @@ xrow_encode_dml(const struct request *request, struct region *region, return iovcnt; } +int +xrow_decode_id(const struct xrow_header *row, struct id_request *request) +{ + if (row->bodycnt == 0) { + diag_set(ClientError, ER_INVALID_MSGPACK, "request body"); + return -1; + } + + assert(row->bodycnt == 1); + const char *data = (const char *)row->body[0].iov_base; + const char *end = data + row->body[0].iov_len; + const char *p = data; + if (mp_check(&p, end) != 0 || mp_typeof(*data) != MP_MAP) + goto error; + + request->version = 0; + iproto_features_create(&request->features); + + p = data; + uint32_t map_size = mp_decode_map(&p); + for (uint32_t i = 0; i < map_size; i++) { + if (mp_typeof(*p) != MP_UINT) + goto error; + uint64_t key = mp_decode_uint(&p); + if (key >= IPROTO_KEY_MAX || + iproto_key_type[key] != mp_typeof(*p)) + goto error; + switch (key) { + case IPROTO_VERSION: + request->version = mp_decode_uint(&p); + break; + case IPROTO_FEATURES: + if (mp_decode_iproto_features( + &p, &request->features) != 0) + goto error; + break; + default: + /* Ignore unknown keys for forward compatibility. */ + mp_next(&p); + } + } + return 0; +error: + xrow_on_decode_err(data, end, ER_INVALID_MSGPACK, "request body"); + return -1; +} + void xrow_encode_synchro(struct xrow_header *row, char *body, const struct synchro_request *req) diff --git a/src/box/xrow.h b/src/box/xrow.h index d32dcbc0d7c4c86b3602791baf1f3b7d31b1de40..b7acedc93203394df8967ba28ed11cfc3f1d36b9 100644 --- a/src/box/xrow.h +++ b/src/box/xrow.h @@ -37,6 +37,7 @@ #include "uuid/tt_uuid.h" #include "diag.h" +#include "iproto_features.h" #include "vclock/vclock.h" #if defined(__cplusplus) @@ -227,6 +228,26 @@ int xrow_encode_dml(const struct request *request, struct region *region, struct iovec *iov); +/** + * IPROTO_ID request/response. + */ +struct id_request { + /** IPROTO protocol version. */ + uint64_t version; + /** IPROTO protocol features. */ + struct iproto_features features; +}; + +/** + * Decode IPROTO_ID request from a given MessagePack map. + * @param row request header. + * @param[out] request IPROTO_ID request to decode to. + * @retval 0 on success + * @retval -1 on error + */ +int +xrow_decode_id(const struct xrow_header *xrow, struct id_request *request); + /** * Synchronous replication request - confirmation or rollback of * pending synchronous transactions. @@ -647,6 +668,19 @@ iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, int iproto_reply_ok(struct obuf *out, uint64_t sync, uint32_t schema_version); +/** + * Encode iproto header with IPROTO_OK response code and protocol features + * in the body. + * @param out Encode to. + * @param sync Request sync. + * @param schema_version. + * + * @retval 0 Success. + * @retval -1 Memory error. + */ +int +iproto_reply_id(struct obuf *out, uint64_t sync, uint32_t schema_version); + /** * Encode iproto header with IPROTO_OK response code and vclock * in the body. @@ -1037,6 +1071,14 @@ iproto_reply_ok_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) diag_raise(); } +/** @copydoc iproto_reply_id. */ +static inline void +iproto_reply_id_xc(struct obuf *out, uint64_t sync, uint32_t schema_version) +{ + if (iproto_reply_id(out, sync, schema_version) != 0) + diag_raise(); +} + /** @copydoc iproto_reply_vclock. */ static inline void iproto_reply_vclock_xc(struct obuf *out, const struct vclock *vclock, diff --git a/test/box-py/iproto.result b/test/box-py/iproto.result index 82b5c7e556cff07521d3c6ef6d1d865489d2d8b0..e48c54f831126bc1371ca783443c51a62db3072f 100644 --- a/test/box-py/iproto.result +++ b/test/box-py/iproto.result @@ -198,3 +198,16 @@ box.schema.user.revoke('guest', 'read,write,execute', 'universe') space:drop() --- ... + +# +# gh-6253 IPROTO_ID +# + +# Invalid version +Invalid MsgPack - request body +# Invalid features +Invalid MsgPack - request body +# Empty request body +version=1, features=[0, 1] +# Unknown version and features +version=1, features=[0, 1] diff --git a/test/box-py/iproto.test.py b/test/box-py/iproto.test.py index 8bb39bf8f17c5c270a47d70c23aba41a22dbc9f3..f11bd8161c8c50bd272bf8c3aa9d1e521a26158d 100644 --- a/test/box-py/iproto.test.py +++ b/test/box-py/iproto.test.py @@ -11,6 +11,12 @@ from tarantool.request import Request, RequestInsert, RequestSelect, RequestUpda from tarantool.response import Response from lib.tarantool_connection import TarantoolConnection +# FIXME: Remove after the new constants are added to the Python connector. +if not 'REQUEST_TYPE_ID' in locals(): + REQUEST_TYPE_ID = 73 + IPROTO_VERSION = 0x54 + IPROTO_FEATURES = 0x55 + admin("box.schema.user.grant('guest', 'read,write,execute', 'universe')") print(""" @@ -446,3 +452,28 @@ admin("box.schema.user.revoke('guest', 'read,write,execute', 'universe')") admin("space:drop()") +print(""" +# +# gh-6253 IPROTO_ID +# +""") +c = Connection("localhost", server.iproto.port) +c.connect() +s = c._socket +header = { IPROTO_CODE: REQUEST_TYPE_ID } +print("# Invalid version") +resp = test_request(header, { IPROTO_VERSION: "abc" }) +print(str(resp["body"][IPROTO_ERROR].decode("utf-8"))) +print("# Invalid features") +resp = test_request(header, { IPROTO_FEATURES: ["abc"] }) +print(str(resp["body"][IPROTO_ERROR].decode("utf-8"))) +print("# Empty request body") +resp = test_request(header, {}) +print("version={}, features={}".format( + resp["body"][IPROTO_VERSION], resp["body"][IPROTO_FEATURES])) +print("# Unknown version and features") +resp = test_request(header, { IPROTO_VERSION: 99999999, + IPROTO_FEATURES: [99999999] }) +print("version={}, features={}".format( + resp["body"][IPROTO_VERSION], resp["body"][IPROTO_FEATURES])) +c.close()