diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 08a9eec5eea69cf81453aa6ef93fd71db41fe3a2..bef3d928599d4b02f47441cda1c6e40f66c78c02 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -18,8 +18,9 @@ set_property(DIRECTORY PROPERTY ADDITIONAL_MAKE_CLEAN_FILES ${lua_sources}) add_library(box iproto.cc - iproto_constants.cc + iproto_constants.c iproto_port.cc + xrow.cc tuple.cc tuple_convert.cc tuple_update.cc diff --git a/src/box/iproto.cc b/src/box/iproto.cc index eefc3b7130d6fb12a5566029f8ea280194004f03..87761cbfbbe01d7ecd25f9f625fe7dcd3d61b34e 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -42,10 +42,12 @@ #include "scoped_guard.h" #include "memory.h" #include "msgpuck/msgpuck.h" -#include "box/replication.h" -#include "box/session.h" +#include "replication.h" +#include "session.h" #include "third_party/base64.h" #include "coio.h" +#include "xrow.h" +#include "iproto_constants.h" class IprotoConnectionShutdown: public Exception { diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c new file mode 100644 index 0000000000000000000000000000000000000000..a0893501f7cf4bcb4e31acf23663fc7e05e1acb1 --- /dev/null +++ b/src/box/iproto_constants.c @@ -0,0 +1,154 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "iproto_constants.h" + +const unsigned char iproto_key_type[IPROTO_KEY_MAX] = +{ + /* {{{ header */ + /* 0x00 */ MP_UINT, /* IPROTO_REQUEST_TYPE */ + /* 0x01 */ MP_UINT, /* IPROTO_SYNC */ + /* 0x02 */ MP_UINT, /* IPROTO_SERVER_ID */ + /* 0x03 */ MP_UINT, /* IPROTO_LSN */ + /* 0x04 */ MP_DOUBLE, /* IPROTO_TIMESTAMP */ + /* }}} */ + + /* {{{ unused */ + /* 0x05 */ MP_UINT, + /* 0x06 */ MP_UINT, + /* 0x07 */ MP_UINT, + /* 0x08 */ MP_UINT, + /* 0x09 */ MP_UINT, + /* 0x0a */ MP_UINT, + /* 0x0b */ MP_UINT, + /* 0x0c */ MP_UINT, + /* 0x0d */ MP_UINT, + /* 0x0e */ MP_UINT, + /* 0x0f */ MP_UINT, + /* }}} */ + + /* {{{ body -- integer keys */ + /* 0x10 */ MP_UINT, /* IPROTO_SPACE_ID */ + /* 0x11 */ MP_UINT, /* IPROTO_INDEX_ID */ + /* 0x12 */ MP_UINT, /* IPROTO_LIMIT */ + /* 0x13 */ MP_UINT, /* IPROTO_OFFSET */ + /* 0x14 */ MP_UINT, /* IPROTO_ITERATOR */ + /* }}} */ + + /* {{{ unused */ + /* 0x15 */ MP_UINT, + /* 0x16 */ MP_UINT, + /* 0x17 */ MP_UINT, + /* 0x18 */ MP_UINT, + /* 0x19 */ MP_UINT, + /* 0x1a */ MP_UINT, + /* 0x1b */ MP_UINT, + /* 0x1c */ MP_UINT, + /* 0x1d */ MP_UINT, + /* 0x1e */ MP_UINT, + /* 0x1f */ MP_UINT, + /* }}} */ + + /* {{{ body -- all keys */ + /* 0x20 */ MP_ARRAY, /* IPROTO_KEY */ + /* 0x21 */ MP_ARRAY, /* IPROTO_TUPLE */ + /* 0x22 */ MP_STR, /* IPROTO_FUNCTION_NAME */ + /* 0x23 */ MP_STR, /* IPROTO_USER_NAME */ + /* 0x24 */ MP_STR, /* IPROTO_SERVER_UUID */ + /* 0x25 */ MP_STR, /* IPROTO_CLUSTER_UUID */ + /* 0x26 */ MP_MAP, /* IPROTO_VCLOCK */ + /* }}} */ +}; + +const char *iproto_type_strs[] = +{ + NULL, + "SELECT", + "INSERT", + "REPLACE", + "UPDATE", + "DELETE", + "CALL", + "AUTH" +}; + +#define bit(c) (1ULL<<IPROTO_##c) +const uint64_t iproto_body_key_map[IPROTO_TYPE_DML_MAX] = { + 0, /* unused */ + bit(SPACE_ID) | bit(LIMIT) | bit(KEY), /* SELECT */ + bit(SPACE_ID) | bit(TUPLE), /* INSERT */ + bit(SPACE_ID) | bit(TUPLE), /* REPLACE */ + bit(SPACE_ID) | bit(KEY) | bit(TUPLE), /* UPDATE */ + bit(SPACE_ID) | bit(KEY), /* DELETE */ + bit(FUNCTION_NAME) | bit(TUPLE), /* CALL */ + bit(USER_NAME) | bit(TUPLE) /* AUTH */ +}; +#undef bit + +const char *iproto_key_strs[IPROTO_KEY_MAX] = { + "type", /* 0x00 */ + "sync", /* 0x01 */ + "server_id", /* 0x02 */ + "lsn", /* 0x03 */ + "timestamp", /* 0x04 */ + "", /* 0x05 */ + "", /* 0x06 */ + "", /* 0x07 */ + "", /* 0x08 */ + "", /* 0x09 */ + "", /* 0x0a */ + "", /* 0x0b */ + "", /* 0x0c */ + "", /* 0x0d */ + "", /* 0x0e */ + "", /* 0x0f */ + "space_id", /* 0x10 */ + "index_id", /* 0x11 */ + "limit", /* 0x12 */ + "offset", /* 0x13 */ + "iterator", /* 0x14 */ + "", /* 0x15 */ + "", /* 0x16 */ + "", /* 0x17 */ + "", /* 0x18 */ + "", /* 0x19 */ + "", /* 0x1a */ + "", /* 0x1b */ + "", /* 0x1c */ + "", /* 0x1d */ + "", /* 0x1e */ + "", /* 0x1f */ + "key", /* 0x20 */ + "tuple", /* 0x21 */ + "function name", /* 0x22 */ + "user name", /* 0x23 */ + "server UUID" /* 0x24 */ + "cluster UUID" /* 0x25 */ + "vector clock" /* 0x26 */ +}; + diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index 0f79ed989b7f6c642847f2ea0d87a42509cdef46..52f470d9ce2cfa09b1df7c14be1542307e1461b1 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -30,7 +30,6 @@ */ #include <stdbool.h> #include <stdint.h> -#include <sys/uio.h> /* struct iovec */ #include <msgpuck/msgpuck.h> #if defined(__cplusplus) @@ -41,8 +40,8 @@ enum { /** Maximal iproto package body length (2GiB) */ IPROTO_BODY_LEN_MAX = 2147483648UL, IPROTO_GREETING_SIZE = 128, - IPROTO_FIXHEADER_SIZE = 5, /* len + (padding) */ - XLOG_FIXHEADER_SIZE = 19 /* marker + len + prev crc32 + cur crc32 + (padding) */ + /** marker + len + prev crc32 + cur crc32 + (padding) */ + XLOG_FIXHEADER_SIZE = 19 }; @@ -161,132 +160,6 @@ iproto_type_is_error(uint32_t type) return (type & IPROTO_TYPE_ERROR) != 0; } -enum { - IPROTO_PACKET_HEAD_IOVMAX = 1, - IPROTO_PACKET_BODY_IOVMAX = 2, - IPROTO_PACKET_IOVMAX = IPROTO_PACKET_HEAD_IOVMAX + - IPROTO_PACKET_BODY_IOVMAX -}; - -enum { IPROTO_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 }; - - -struct iproto_header { - uint32_t type; - uint32_t server_id; - uint64_t sync; - uint64_t lsn; - double tm; - - int bodycnt; - struct iovec body[IPROTO_PACKET_BODY_IOVMAX]; -}; - -void -iproto_header_decode(struct iproto_header *header, - const char **pos, const char *end); -struct tt_uuid; - -void -iproto_decode_uuid(const char **pos, struct tt_uuid *out); - -char * -iproto_encode_uuid(char *pos, const struct tt_uuid *in); - -/** Return a 4-byte numeric error code, with status flags. */ -static inline uint32_t -iproto_encode_error(uint32_t error) -{ - return error | IPROTO_TYPE_ERROR; -} - -int -iproto_header_encode(const struct iproto_header *header, - struct iovec *out); - -int -iproto_row_encode(const struct iproto_header *row, struct iovec *out); - -/** - * \brief Decode ERROR and re-throw it as ClientError exception - * \param row - */ -void -iproto_decode_error(struct iproto_header *row); - -/** - * \brief Encode AUTH command - * \param[out] row - * \param greeting - IPROTO greeting - * \param login - user login - * \param password - user password - */ -void -iproto_encode_auth(struct iproto_header *row, const char *greeting, - const char *login, const char *password); - -/** - * \brief Encode SUBSCRIBE command - * \param row[out] - * \param cluster_uuid cluster uuid - * \param server_uuid server uuid - * \param vclock server vclock - */ -void -iproto_encode_subscribe(struct iproto_header *row, - const struct tt_uuid *cluster_uuid, - const struct tt_uuid *server_uuid, - const struct vclock *vclock); - -/** - * \brief Decode SUBSCRIBE command - * \param row - * \param[out] cluster_uuid - * \param[out] server_uuid - * \param[out] vclock - */ -void -iproto_decode_subscribe(struct iproto_header *row, struct tt_uuid *cluster_uuid, - struct tt_uuid *server_uuid, struct vclock *vclock); - -/** - * \brief Encode JOIN command - * \param[out] row - * \param server_uuid - */ -void -iproto_encode_join(struct iproto_header *row, const struct tt_uuid *server_uuid); - -/** - * \brief Decode JOIN command - * \param row - * \param[out] server_uuid - */ -static inline void -iproto_decode_join(struct iproto_header *row, struct tt_uuid *server_uuid) -{ - return iproto_decode_subscribe(row, NULL, server_uuid, NULL); -} - -/** - * \brief Encode end of stream command (a response to JOIN command) - * \param row[out] - * \param vclock - */ -void -iproto_encode_eos(struct iproto_header *row, const struct vclock *vclock); - -/** - * \brief Decode end of stream command (a response to JOIN command) - * \param row - * \param[out] vclock - */ -static inline void -iproto_decode_eos(struct iproto_header *row, struct vclock *vclock) -{ - return iproto_decode_subscribe(row, NULL, NULL, vclock); -} - #if defined(__cplusplus) } /* extern "C" */ #endif diff --git a/src/box/iproto_port.cc b/src/box/iproto_port.cc index df15328626616bdc0c921a5bb0bbdcb08226a6b0..b1f227dd1d36d1f2ce459f9ca582ee2f4a6c3174 100644 --- a/src/box/iproto_port.cc +++ b/src/box/iproto_port.cc @@ -27,7 +27,7 @@ * SUCH DAMAGE. */ #include "iproto_port.h" - +#include "iproto_constants.h" /* m_ - msgpack meta, k_ - key, v_ - value */ struct iproto_header_bin { @@ -61,6 +61,13 @@ static const struct iproto_body_bin iproto_error_bin = { 0x81, IPROTO_ERROR, 0xdb, 0 }; +/** Return a 4-byte numeric error code, with status flags. */ +static inline uint32_t +iproto_encode_error(uint32_t error) +{ + return error | IPROTO_TYPE_ERROR; +} + void iproto_reply_ping(struct obuf *out, uint64_t sync) { diff --git a/src/box/iproto_port.h b/src/box/iproto_port.h index 0c45798ceac4da8d2d8ffbd0cee05acf07737fee..0613854abf911c67eb00b04c6ccb513f3aaad344 100644 --- a/src/box/iproto_port.h +++ b/src/box/iproto_port.h @@ -28,13 +28,12 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include "box/box.h" -#include "box/request.h" -#include "box/port.h" -#include "box/tuple.h" +#include "box.h" +#include "request.h" +#include "port.h" +#include "tuple.h" #include "iobuf.h" #include "msgpuck/msgpuck.h" -#include "iproto_constants.h" /** * struct iproto_port users need to be careful to: diff --git a/src/box/log_io.cc b/src/box/log_io.cc index 69e6d1660d96a257229c9f05d4e026eaeb7aeef4..ff6f7abffb964e4e868ea31d560a0c16139ed8b2 100644 --- a/src/box/log_io.cc +++ b/src/box/log_io.cc @@ -36,11 +36,11 @@ #include "third_party/tarantool_eio.h" #include "fiob.h" #include "msgpuck/msgpuck.h" -#include "iproto_constants.h" #include "scoped_guard.h" #define MH_UNDEF 1 /* conflicts with mh_nodeids_t */ #include "recovery.h" /* for mh_cluster */ #include "vclock.h" +#include "iproto_constants.h" /* * marker is MsgPack fixext2 diff --git a/src/box/log_io.h b/src/box/log_io.h index 7cc63892b2249f71cc5384a2b098d0d2539acb31..cb56cb5684ece7a6b28742967606f24867cf806d 100644 --- a/src/box/log_io.h +++ b/src/box/log_io.h @@ -34,7 +34,7 @@ #include <sys/uio.h> #include "trivia/util.h" #include "third_party/tarantool_ev.h" -#include "iproto_constants.h" +#include "xrow.h" #include "tt_uuid.h" #include "vclock.h" diff --git a/src/box/recovery.cc b/src/box/recovery.cc index 55de154ccb0db67552366ca60ebbf202b7b59b29..b702e76a021f02a955619cd440c67964cd685e16 100644 --- a/src/box/recovery.cc +++ b/src/box/recovery.cc @@ -42,7 +42,7 @@ #include "replica.h" #include "fiber.h" #include "msgpuck/msgpuck.h" -#include "iproto_constants.h" +#include "xrow.h" #include "crc32.h" #include "scoped_guard.h" #include "box/cluster.h" diff --git a/src/box/replica.cc b/src/box/replica.cc index bb9a9975876aa862eaa310bf33807b0363297407..e06c3bae939a1373e544b419abf6687904dac9e9 100644 --- a/src/box/replica.cc +++ b/src/box/replica.cc @@ -38,10 +38,11 @@ #include "scoped_guard.h" #include "coio_buf.h" #include "recovery.h" -#include "iproto_constants.h" +#include "xrow.h" #include "msgpuck/msgpuck.h" #include "session.h" #include "box/cluster.h" +#include "iproto_constants.h" static const int RECONNECT_DELAY = 1.0; @@ -52,23 +53,22 @@ remote_read_row(struct ev_io *coio, struct iobuf *iobuf, struct ibuf *in = &iobuf->in; /* Read fixed header */ - if (ibuf_size(in) < IPROTO_FIXHEADER_SIZE) - coio_breadn(coio, in, IPROTO_FIXHEADER_SIZE - ibuf_size(in)); + if (ibuf_size(in) < 1) + coio_breadn(coio, in, 1); /* Read length */ if (mp_typeof(*in->pos) != MP_UINT) { tnt_raise(ClientError, ER_INVALID_MSGPACK, - "invalid fixed header"); + "packet length"); } + ssize_t to_read = mp_check_uint(in->pos, in->end); + if (to_read > 0) + coio_breadn(coio, in, to_read); uint32_t len = mp_decode_uint((const char **) &in->pos); - if (len > IPROTO_BODY_LEN_MAX) { - tnt_raise(ClientError, ER_INVALID_MSGPACK, - "received packet is too big"); - } /* Read header and body */ - ssize_t to_read = len - ibuf_size(in); + to_read = len - ibuf_size(in); if (to_read > 0) coio_breadn(coio, in, to_read); diff --git a/src/box/request.h b/src/box/request.h index 3e06650310a91ffa8876c0cd858cf81868d9d281..266004e5d6aed69d1299d66b4e8cbc9eeac0884a 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -29,13 +29,12 @@ * SUCH DAMAGE. */ #include <stdbool.h> -#include "iproto_constants.h" +#include "xrow.h" struct txn; struct port; typedef void (*request_execute_f)(struct request *, struct port *); -enum { REQUEST_IOVMAX = IPROTO_PACKET_BODY_IOVMAX }; struct request { diff --git a/src/box/txn.cc b/src/box/txn.cc index 6a92204bb936576533ebe9d197236f64bd70c0ff..0a9f6b6c01f9dea2d334fbd6c3b19d6e46b18919 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -36,6 +36,7 @@ #include "request.h" /* for request_name */ #include "session.h" #include "port.h" +#include "iproto_constants.h" double too_long_threshold; diff --git a/src/box/iproto_constants.cc b/src/box/xrow.cc similarity index 73% rename from src/box/iproto_constants.cc rename to src/box/xrow.cc index b9207d9eb92d475dc26f3b7e27f490404f809edb..221bbbe1f86fd5c41854bf5f2aa5ff8500c4ca95 100644 --- a/src/box/iproto_constants.cc +++ b/src/box/xrow.cc @@ -26,139 +26,15 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include "iproto_constants.h" +#include "xrow.h" #include "msgpuck/msgpuck.h" #include "exception.h" #include "fiber.h" -#include "crc32.h" #include "tt_uuid.h" -#include "box/vclock.h" +#include "vclock.h" #include "scramble.h" #include "third_party/base64.h" - -const unsigned char iproto_key_type[IPROTO_KEY_MAX] = -{ - /* {{{ header */ - /* 0x00 */ MP_UINT, /* IPROTO_REQUEST_TYPE */ - /* 0x01 */ MP_UINT, /* IPROTO_SYNC */ - /* 0x02 */ MP_UINT, /* IPROTO_SERVER_ID */ - /* 0x03 */ MP_UINT, /* IPROTO_LSN */ - /* 0x04 */ MP_DOUBLE, /* IPROTO_TIMESTAMP */ - /* }}} */ - - /* {{{ unused */ - /* 0x05 */ MP_UINT, - /* 0x06 */ MP_UINT, - /* 0x07 */ MP_UINT, - /* 0x08 */ MP_UINT, - /* 0x09 */ MP_UINT, - /* 0x0a */ MP_UINT, - /* 0x0b */ MP_UINT, - /* 0x0c */ MP_UINT, - /* 0x0d */ MP_UINT, - /* 0x0e */ MP_UINT, - /* 0x0f */ MP_UINT, - /* }}} */ - - /* {{{ body -- integer keys */ - /* 0x10 */ MP_UINT, /* IPROTO_SPACE_ID */ - /* 0x11 */ MP_UINT, /* IPROTO_INDEX_ID */ - /* 0x12 */ MP_UINT, /* IPROTO_LIMIT */ - /* 0x13 */ MP_UINT, /* IPROTO_OFFSET */ - /* 0x14 */ MP_UINT, /* IPROTO_ITERATOR */ - /* }}} */ - - /* {{{ unused */ - /* 0x15 */ MP_UINT, - /* 0x16 */ MP_UINT, - /* 0x17 */ MP_UINT, - /* 0x18 */ MP_UINT, - /* 0x19 */ MP_UINT, - /* 0x1a */ MP_UINT, - /* 0x1b */ MP_UINT, - /* 0x1c */ MP_UINT, - /* 0x1d */ MP_UINT, - /* 0x1e */ MP_UINT, - /* 0x1f */ MP_UINT, - /* }}} */ - - /* {{{ body -- all keys */ - /* 0x20 */ MP_ARRAY, /* IPROTO_KEY */ - /* 0x21 */ MP_ARRAY, /* IPROTO_TUPLE */ - /* 0x22 */ MP_STR, /* IPROTO_FUNCTION_NAME */ - /* 0x23 */ MP_STR, /* IPROTO_USER_NAME */ - /* 0x24 */ MP_STR, /* IPROTO_SERVER_UUID */ - /* 0x25 */ MP_STR, /* IPROTO_CLUSTER_UUID */ - /* 0x26 */ MP_MAP, /* IPROTO_VCLOCK */ - /* }}} */ -}; - -const char *iproto_type_strs[] = -{ - NULL, - "SELECT", - "INSERT", - "REPLACE", - "UPDATE", - "DELETE", - "CALL", - "AUTH" -}; - -#define bit(c) (1ULL<<IPROTO_##c) -const uint64_t iproto_body_key_map[IPROTO_TYPE_DML_MAX] = { - 0, /* unused */ - bit(SPACE_ID) | bit(LIMIT) | bit(KEY), /* SELECT */ - bit(SPACE_ID) | bit(TUPLE), /* INSERT */ - bit(SPACE_ID) | bit(TUPLE), /* REPLACE */ - bit(SPACE_ID) | bit(KEY) | bit(TUPLE), /* UPDATE */ - bit(SPACE_ID) | bit(KEY), /* DELETE */ - bit(FUNCTION_NAME) | bit(TUPLE), /* CALL */ - bit(USER_NAME) | bit(TUPLE) /* AUTH */ -}; -#undef bit - -const char *iproto_key_strs[IPROTO_KEY_MAX] = { - "type", /* 0x00 */ - "sync", /* 0x01 */ - "server_id", /* 0x02 */ - "lsn", /* 0x03 */ - "timestamp", /* 0x04 */ - "", /* 0x05 */ - "", /* 0x06 */ - "", /* 0x07 */ - "", /* 0x08 */ - "", /* 0x09 */ - "", /* 0x0a */ - "", /* 0x0b */ - "", /* 0x0c */ - "", /* 0x0d */ - "", /* 0x0e */ - "", /* 0x0f */ - "space_id", /* 0x10 */ - "index_id", /* 0x11 */ - "limit", /* 0x12 */ - "offset", /* 0x13 */ - "iterator", /* 0x14 */ - "", /* 0x15 */ - "", /* 0x16 */ - "", /* 0x17 */ - "", /* 0x18 */ - "", /* 0x19 */ - "", /* 0x1a */ - "", /* 0x1b */ - "", /* 0x1c */ - "", /* 0x1d */ - "", /* 0x1e */ - "", /* 0x1f */ - "key", /* 0x20 */ - "tuple", /* 0x21 */ - "function name", /* 0x22 */ - "user name", /* 0x23 */ - "server UUID" /* 0x24 */ - "cluster UUID" /* 0x25 */ - "vector clock" /* 0x26 */ -}; +#include "iproto_constants.h" void iproto_header_decode(struct iproto_header *header, const char **pos, @@ -289,22 +165,19 @@ int iproto_row_encode(const struct iproto_header *row, struct iovec *out) { + static const int iov0_len = mp_sizeof_uint(UINT32_MAX); int iovcnt = iproto_header_encode(row, out + 1) + 1; - char *fixheader = (char *) - region_alloc(&fiber()->gc, IPROTO_FIXHEADER_SIZE); + char *fixheader = (char *) region_alloc(&fiber()->gc, iov0_len); uint32_t len = 0; for (int i = 1; i < iovcnt; i++) len += out[i].iov_len; /* Encode length */ - assert(IPROTO_FIXHEADER_SIZE == mp_sizeof_uint(UINT32_MAX)); char *data = fixheader; *(data++) = 0xce; /* MP_UINT32 */ *(uint32_t *) data = mp_bswap_u32(len); - data += sizeof(uint32_t); - assert(data == fixheader + IPROTO_FIXHEADER_SIZE); out[0].iov_base = fixheader; - out[0].iov_len = IPROTO_FIXHEADER_SIZE; + out[0].iov_len = iov0_len; assert(iovcnt <= IPROTO_ROW_IOVMAX); return iovcnt; diff --git a/src/box/xrow.h b/src/box/xrow.h new file mode 100644 index 0000000000000000000000000000000000000000..858c2b7e61ffaf6db6a6b008c5bf1e98e2ec722a --- /dev/null +++ b/src/box/xrow.h @@ -0,0 +1,162 @@ +#ifndef TARANTOOL_XROW_H_INCLUDED +#define TARANTOOL_XROW_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <stdint.h> +#include <stddef.h> +#include <sys/uio.h> /* struct iovec */ + +#if defined(__cplusplus) +extern "C" { +#endif + +enum { + IPROTO_PACKET_HEAD_IOVMAX = 1, + IPROTO_PACKET_BODY_IOVMAX = 2, + IPROTO_PACKET_IOVMAX = IPROTO_PACKET_HEAD_IOVMAX + + IPROTO_PACKET_BODY_IOVMAX +}; + +enum { IPROTO_ROW_IOVMAX = IPROTO_PACKET_IOVMAX + 1 }; + + +struct iproto_header { + uint32_t type; + uint32_t server_id; + uint64_t sync; + uint64_t lsn; + double tm; + + int bodycnt; + struct iovec body[IPROTO_PACKET_BODY_IOVMAX]; +}; + +void +iproto_header_decode(struct iproto_header *header, + const char **pos, const char *end); +struct tt_uuid; + +void +iproto_decode_uuid(const char **pos, struct tt_uuid *out); + +char * +iproto_encode_uuid(char *pos, const struct tt_uuid *in); + +int +iproto_header_encode(const struct iproto_header *header, + struct iovec *out); + +int +iproto_row_encode(const struct iproto_header *row, struct iovec *out); + +/** + * \brief Decode ERROR and re-throw it as ClientError exception + * \param row + */ +void +iproto_decode_error(struct iproto_header *row); + +/** + * \brief Encode AUTH command + * \param[out] row + * \param greeting - IPROTO greeting + * \param login - user login + * \param password - user password + */ +void +iproto_encode_auth(struct iproto_header *row, const char *greeting, + const char *login, const char *password); + +/** + * \brief Encode SUBSCRIBE command + * \param row[out] + * \param cluster_uuid cluster uuid + * \param server_uuid server uuid + * \param vclock server vclock + */ +void +iproto_encode_subscribe(struct iproto_header *row, + const struct tt_uuid *cluster_uuid, + const struct tt_uuid *server_uuid, + const struct vclock *vclock); + +/** + * \brief Decode SUBSCRIBE command + * \param row + * \param[out] cluster_uuid + * \param[out] server_uuid + * \param[out] vclock + */ +void +iproto_decode_subscribe(struct iproto_header *row, struct tt_uuid *cluster_uuid, + struct tt_uuid *server_uuid, struct vclock *vclock); + +/** + * \brief Encode JOIN command + * \param[out] row + * \param server_uuid + */ +void +iproto_encode_join(struct iproto_header *row, const struct tt_uuid *server_uuid); + +/** + * \brief Decode JOIN command + * \param row + * \param[out] server_uuid + */ +static inline void +iproto_decode_join(struct iproto_header *row, struct tt_uuid *server_uuid) +{ + return iproto_decode_subscribe(row, NULL, server_uuid, NULL); +} + +/** + * \brief Encode end of stream command (a response to JOIN command) + * \param row[out] + * \param vclock + */ +void +iproto_encode_eos(struct iproto_header *row, const struct vclock *vclock); + +/** + * \brief Decode end of stream command (a response to JOIN command) + * \param row + * \param[out] vclock + */ +static inline void +iproto_decode_eos(struct iproto_header *row, struct vclock *vclock) +{ + return iproto_decode_subscribe(row, NULL, NULL, vclock); +} + +#if defined(__cplusplus) +} /* extern "C" */ +#endif + +#endif /* TARANTOOL_XROW_H_INCLUDED */