From b28f89094d7f0e1032dc148ba8380d8cfe60c20d Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Mon, 6 Jan 2014 00:20:48 +0400 Subject: [PATCH] Change server reponse format to MessagePack. Update the protoco spec in doc/box-protocol.txt. Update box.net.box with a bunch of hacks. Update the python driver. Remove support for flags variable (BOX_RETURN_TUPLE). It was not tested anywhere! (-> kill) Implemeng Lua msgpack.next() to support streamed processing. Add tests. Fix a bug with messed up aliases of msgpack.dumps() pointing to encode() and loads() pointing to decode() (should be vice versa). --- doc/box-protocol.txt | 324 ++++++++++++++----------------------- src/box/box.cc | 6 +- src/box/lua/box_net.lua | 43 +++-- src/box/lua/call.cc | 5 +- src/box/port.cc | 3 +- src/box/port.h | 6 +- src/box/request.cc | 2 +- src/iproto.cc | 30 +++- src/iproto_port.cc | 114 +++++++++++-- src/iproto_port.h | 67 ++------ src/lua/msgpack.cc | 29 +++- test/box/iproto.test.py | 2 +- test/box/msgpack.result | 55 +++++++ test/box/msgpack.test.lua | 14 ++ test/box/net.box.result | 4 +- test/box/socket.result | 4 +- test/box/socket.test.py | 4 +- test/lib/box_connection.py | 2 +- test/lib/tarantool-python | 2 +- 19 files changed, 393 insertions(+), 323 deletions(-) diff --git a/doc/box-protocol.txt b/doc/box-protocol.txt index 1654bb86ff..dbce2007e4 100644 --- a/doc/box-protocol.txt +++ b/doc/box-protocol.txt @@ -1,4 +1,4 @@ -Tarantool IPROTO protocol. +; Tarantool IPROTO protocol. ; ; The latest version of this document can be found in ; tarantool source tree, doc/box-protocol.txt @@ -9,206 +9,169 @@ Tarantool IPROTO protocol. ; multiple requests via the same connection ; - response format that supports zero-copy writes ; -; The atoms of representation in the protocol include: +; For data structuring and encoding, the protocol uses msgpack +; data format, see http://msgpack.org ; -; int8 - a single 8-bit byte (i.e. an octet) -; -; int32 - a 32-bit integer in little-endian format (Intel x86) -; -; int64 - a 64-bit integer in little-endian format (Intel x86) -; -; msgpack - a valid msgpack sequence for everything else, see -; http://msgpack.org +; Since msgpack uses a variying representation for compound +; data structures, such as arrays and maps, the exact byte +; sequence mandated by msgpack format is omitted in this spec. +; Instead, to specify that some contents is part of a msgpack +; map {} (curly braces) will be used, of a msgpack array - [] +; (square brackets), to define a single key/value pair +; in a map - a semicolon. +; If a piece of data is sent as a msgpack int, "" will be used, +; integers will be denoted without quotes. +; For example, a reply to ping command in this notation looks +; like: +; { len: 0, code: 0, sync: uint } +; The binary layout of reply, according to msgpack speck is: +; 0x83, 0, 0, 1, 0, 3, 0xce, int8, uint8, uint8, uint8 + +; The structure of requests and responses is described below. ; -; All requests and responses utilize the same basic structure: +; All requests and responses utilize the same basic structure <packet> ::= <request> | <response> -<request> ::= <header><request_body> +<request> ::= <len><header><body> +<response> ::= <len><header><body> -; If <return_code> is non-zero, the rest of server -; response is an error message. +; <len> is the length of the packet, in message pack format. +; Implementor note: for simplicity of the implementation, +; the server never "compresses" the packet length, i.e. +; it is always passed as MessagePack 32-bit unsigned int, +; 0xce b4 b3 b2 b1 (5 bytes) ; - -<response> ::= <header><response_body> +<len> ::= message pack unsigned int ; -; <header> has a fixed structure of three 4-byte integers (12 bytes): +; Both <header> and <body> are message pack maps: + +<header> ::= { (<key> : <value>)+ } +<body> ::= { (<key> : <value>)+ } -<header> ::= <type><body_length><request_id> +; They only differ in the allowed set of keys and values, +; and the key defines the type of value that follows. +; If a key is missing, and expects an integer value, +; the missing value is always assumed to be 0. If the +; missing key assumes a string value, the string is assumed +; to be empty. -; <type> represents a request type, a single server command, -; such as PING, SELECT, UPDATE, DELETE, INSERT, etc. -; <type> is replicated intact in the response header. -; The currently supported types are: -; - 13 -- <insert> -; - 17 -- <select> -; - 19 -- <update> -; - 21 -- <delete> -; - 22 -- <call> -; - 65280 -- <ping> -; This list is sparse since a number of old commands -; were deprecated and removed. +<key> ::= <header_key> | <body_key> -<type> ::= <int32> +<header_key> ::= <code> | <sync> ; -; <body_length> tells the sender or receiver the length of data -; that follows the header. If there is no data, <body_length> is 0. -; However, <request_body> is always present. +; <code> is request code or response code ; -; The only exception is <ping>: its request body is empty, and -; so is response. In other words, <ping> request packet -; consists solely of a 12-byte header (65280, 0, 0) -; and gets the same 12-byte header in response. -<body_length> ::= <int32> +<code> ::= 0 ; value is <uint32> -; -; <request_id> is a unique request identifier set by the client, +; Value of <code> key in request can be: +; 0 -- <ping> +; 1 -- <select> +; 2 -- <insert> +; 3 -- <replace> +; 4 -- <update> +; 5 -- <delete> +; 6 -- <call> + +; Value of <code> in response is: +; 0 -- OK +; anything else (32-bit int) - Tarantool error code +; If response <code> is 0 (success), response body contains zero or +; more tuples, otherwise it carries an error message that corresponds +; to the return code. + +; <sync> is a unique request identifier, preserved in the response, ; The identifier is necessary to allow request multiplexing -- ; i.e. sending multiple requests through the same connection ; before fetching a response to any of them. ; The value of the identifier currently bears no meaning to the -; server. Similarly to request <type>, it's simply copied to the -; response header as-is. -; Consequently, <request_id> can be 0 or two requests +; server. Consequently, <sync> can be 0 or two requests ; can have an identical id. -<request_id> ::= <int32> +<sync> ::= 1 ; value is <uint32> -; <request_body> holds actual command data. -; Its format and interpretation are defined by the value of -; request <type>. +<body_key> ::= <request_key> | <response_key> -<request_body> ::= <select_request_body> | - <insert_request_body> | - <update_request_body> | - <delete_request_body> | - <call_request_body> +; Different request types allow different keys in the body: -; -; <response_body> carries command reply -; If <return_code> is 0 (success), it contains zero or more -; tuples, otherwise it carries an error message that corresponds -; the return code. -; - -<response_body> ::= <return_code><text> | - <0><count><fq_tuple>* +<request_key> ::= <select> | <replace> | <delete> | <update> | <call> -; <select_request_body> (required <header> <type> is 17): -; ; Specify which space to query, which index in the space ; to use, offset in the resulting tuple set (set to 0 for no offset), -; a limit (set to 4294967295 for no limit), and one or several -; keys to use in lookup. When more than one key is given, they -; specify a disjunctive search condition (key1 or key2 or ...). -; -; Note, that <tuple> is mandatory, and tuple <count> -; must be non-zero. -; +; a limit (set to 4294967295 for no limit), and a key to use in lookup. -<select_request_body> ::= <space_no><index_no> - <offset><limit><count><tuple>+ +; Find tuples matching the search pattern +<select> ::= <space> | <index> | <iterator> | <offset> | <limit> | <tuple> -; Space number is a non-negative integer, starting from 0. -; All spaces are defined in the server configuration file, -; and then referred to by numeric id. +; Insert a tuple into the space or replace an existing one. -<space_no> ::= <int32> +<replace> ::= <space> | <index> | <tuple> -; Tarantool supports HASH and TREE indexes. Indexes are -; enumerated similarly to spaces, starting from 0. -; Each space has at least index #0, which defines -; the primary key. +; Insert is similar to replace, but will return a duplicate key +; error if such tuple already exists. +<insert> ::= <space> | <index> | <tuple> -<index_no> ::= <int32> +; Delete a tuple +<delete_key> ::= <space> | <index> | <tuple> -; offset in the result set +; Update a tuple +<udpate> ::= <space> | <index> | <tuple> | <ops> -<offset> ::= <int32> +; Call a stored function +<call> ::= <name> | <tuple> -; limit for the result set +; As can be seen from the grammar some requests have common keys, +; whereas other keys can be present only in a body of a single +; request type. -<limit> ::= <int32> -; key count in the conjunctive set. Tarantool will -; consequently return tuples that match the first key, -; then the second, and so on. If a tuple matches -; more than one key, it's returned twice. <limit> -; thus limits the total number of returned tuples, -; which are not necessarily distinct. +; <space> Space to use in the request +; The find the numeric space id by space name, one +; must first query "_space" system space. +<space> ::= 2 ; value is <uint32> -<count> ::= <int32> +; <index> Index to use in the request +; Similarly to space, to find the numeric index id +; by index name, one must query the "_index" system space. -; -; A tuple that represents a search key simply lists all key -; fields, preceded with key cardinality (number of list -; elements). Each key in <select_request_body> can have a -; different cardinality. +<index> ::= 3 ; value is <uint32> -<tuple> ::= <message_pack_array> +; <tuple> defines the actual argument of the operation +; When present in <select> or <update> it defines a (maybe compound) +; search key, whereas in <replace> it defines the tuple which +; will be inserted into the database. +; In <call> it defines call arguments. +; When request body allows <tuple> as a key, it must always +; be present, since otherwise the request is meaningless. -; -; See http://msgpack.org/ for <message_pack_array> definition -; +<tuple> ::= 4 ; value is an arbitrary msgpack array -<select_response_body> ::= <count><fq_tuple>* +; Only in select, offset in the result set, expects <uint32> value -; -; Tuples returned by the server (we call them "fully qualified") -; are always preceded with calculated information: -; total size of the tuple and number of fields in it. -; This is how the tuple is stored on server side. -; While this information can be often derived from body length, -; it allows the recipient to simplify memory allocation and tuple -; decoding. Certain requests, such as -; <select>, can return more than one tuple. In that case -; fully qualified tuples are also used to identify tuple -; boundaries: in Tarantool, tuples have variable cardinality. -; +; Offset in the result set, <select> only -<fq_tuple> ::= <size><tuple> +<offset> ::= 5 ; value is <uint32> -; length of the variable part of the tuple (all tuple fields) -<size> ::= <int32> +; Limit in the result set, <select> only -; -; It is not possible to insert more than one tuple at a time. -; Thus <insert_request_body> (<header> <type> = 13) simply -; holds one tuple, and which space to put it into. -; +<limit> ::= 6 ; value is <uint32> -<insert_request_body> ::= <space_no><flags><tuple> +; Iterator type to use in search, <select> only -; Flag BOX_RETURN_TUPLE (0x01) indicates -; that it is required to return the inserted tuple back: -; Flag BOX_ADD (0x02) requests that no tuple with the same primary -; key exists in the space. I.e. it turns INSERT into INSERT, -; without this flag INSERT behaves like REPLACE: replaces -; an existing tuple if it is found. -; Flag BOX_REPLACE (0x04) requests that a tuple with the same -; primary key is present in the space. +<iterator> ::= 7 ; the only supported value for now is 0 -<flags> ::= <int32> - -; -; A tuple may already exist. In that case INSERT -; returns 0 for tuple count in response. If BOX_RETURN_TUPLE -; is set, the inserted tuple will be sent back: +; Name of the function to call, +<name> ::= 8 ; value is "string" -<insert_response_body> ::= <count> | <count><fq_tuple> +; Result set, array of tuples, present only if there is no error +; (code = ER_OK) -; <update> request, <type> = 19 is similar to <insert>: -; - <space_no>: same as in <select> or <insert> -; - <flags>, <tuple>: same as in <insert> -; Index number for tuple lookup does not need to be provided, -; since only primary key updates are allowed. -; - <count> specifies possibly zero operation count -; +<ops> ::= 9 ; expects <op_list> as value -<update_request_body> ::= <space_no><flags><tuple><count><operation>+ +<op_list> ::= [ (<operation>)+ ] ; ; Operations are optional and exist solely to allow @@ -265,62 +228,23 @@ Tarantool IPROTO protocol. <op_arg_splice> ::= <message_pack_array(3)> # <offset><length><string> -<offset> ::= <message_pack_uint> -<length> ::= <message_pack_uint> -<string> ::= <message_pack_str> - -; - -<update_response_body> ::= <insert_response_body> - -; -; <delete>, request <type> = 21 -; Similarly to updates, <delete> always uses the -; primary key. -; -<delete_request_body> ::= <space_no><flags><tuple> +; The server always returns a tuple or tuples, when found, on success, +; I.e. on success, response <body> contains <set> key. +; For select/update/delete, it's the tuple that matched +; the search criterion. For <replace>, it's the inserted tuple. +; For <call>, it's whatever the called function returns. -; -; <delete> returns the number of deleted tuples. -; Currently it's always either 0 or 1. -; If BOX_RETURN_TUPLE is specified, and there is an old -; tuple, (count > 0), DELETE returns the deleted tuple. -; +<response_key> = <data> | <error> -<delete_response_body> ::= <count> | <count><fq_tuple> +; Set of tuples in the response +<data> ::= 10 ; value is a message pack array of tuples -; -; CALL request body contains <flags>, <proc_name> -; the number of procedure args, and the arguments themselves, -; which are passed into the procedure as Lua strings. -; -<call_request_body> ::= <flags><proc_name><tuple> +; Error message, present in the response only if there is an error +<error> ::= 11 ; value is a message pack string -; -; <proc_name> is a valid Lua identifier, something like -; 'type', or 'box.process'. -; - -<proc_name> ::= <field> - -; -; In case of an error, <call_response_body> always contains -; <return_code>. If it's not 0, it is followed by an -; error message. Otherwise, the response, just like in case of -; SELECT, is a sequence of <fq_tuple>s. -<call_response_body> ::= <select_response_body> - -; -; The server response, in addition to response header and body, -; contains a return code. It's a 4-byte integer, which has -; a lower 1-byte completion status part, and a higher 3-byte -; error code part. -; - -<return_code> ::= <int32> - -; The completion status is complementary: +; The error <code> consists of the actual error code +; and request completion status, which is complementary: ; it can be deduced from the error code. There are only ; 3 completion status codes in use: ; 0 - success; The only possible error code with this status is @@ -377,6 +301,6 @@ Tarantool IPROTO protocol. ; ; Convenience macros which define hexadecimal constants for ; <int32> return codes (completion status + code) can be found -; in include/iproto.h. -; +; in include/errcode.h. + ; vim: syntax=bnf diff --git a/src/box/box.cc b/src/box/box.cc index 6a00a8572d..beae987f7f 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -70,11 +70,11 @@ struct box_snap_row { } __attribute__((packed)); void -port_send_tuple(struct port *port, struct txn *txn, uint32_t flags) +port_send_tuple(struct port *port, struct txn *txn) { struct tuple *tuple; if ((tuple = txn->new_tuple) || (tuple = txn->old_tuple)) - port_add_tuple(port, tuple, flags); + port_add_tuple(port, tuple); } static void @@ -86,7 +86,7 @@ process_rw(struct port *port, struct request *request) stat_collect(stat_base, request->type, 1); request->execute(request, txn, port); txn_commit(txn); - port_send_tuple(port, txn, request->flags); + port_send_tuple(port, txn); port_eof(port); txn_finish(txn); } catch (const Exception& e) { diff --git a/src/box/lua/box_net.lua b/src/box/lua/box_net.lua index ab7a606af9..831a257f62 100644 --- a/src/box/lua/box_net.lua +++ b/src/box/lua/box_net.lua @@ -243,7 +243,6 @@ box.net.box.new = function(host, port, reconnect_timeout) self.processing[sync] = box.ipc.channel(1) request = box.pack('iiia', op, string.len(request), sync, request) - if timeout ~= nil then timeout = tonumber(timeout) if not self.processing.wch:put(request, timeout) then @@ -264,7 +263,6 @@ box.net.box.new = function(host, port, reconnect_timeout) end self.processing[sync] = nil - -- timeout if res == nil then if op == box.net.PING then @@ -274,19 +272,25 @@ box.net.box.new = function(host, port, reconnect_timeout) end end + local function totuples(t) + res = {} + for k, v in pairs(t) do + table.insert(res, box.tuple.new(v)) + end + return res + end + -- results { status, response } received if res[1] then if op == box.net.PING then return true else - local rop, blen, sync, code, body = - box.unpack('iiiia', res[2]) + local code = res[2] + local body = msgpack.decode(res[3]) if code ~= 0 then - box.raise(code, body) + box.raise(code, body[11]) end - - -- box.unpack('R') unpacks response body for us (tuple) - return box.unpack('R', body) + return unpack(totuples(body[10])) end else error(res[2]) @@ -321,18 +325,8 @@ box.net.box.new = function(host, port, reconnect_timeout) if self.s == nil then return end - local res = { self.s:recv(12) } - if res[4] ~= nil then - self:fatal("Can't read socket: %s", res[3]) - return - end - local header = res[1] - if string.len(header) ~= 12 then - self:fatal("Unexpected eof while reading header") - return - end - - local op, blen, sync = box.unpack('iii', header) + local blen = self.s:recv(5) + blen = msgpack.decode(blen) local body = '' if blen > 0 then @@ -347,7 +341,7 @@ box.net.box.new = function(host, port, reconnect_timeout) return end end - return sync, header .. body + return body end, rfiber = function(self) @@ -364,13 +358,16 @@ box.net.box.new = function(host, port, reconnect_timeout) self.processing.rch:put(true, 0) while not self.closed do - local sync, resp = self:read_response() + local resp = self:read_response() + header, offset = msgpack.next(resp); + code = header[0] + sync = header[1] if sync == nil then break end if self.processing[sync] ~= nil then - self.processing[sync]:put({true, resp}, 0) + self.processing[sync]:put({true, code, resp:sub(offset)}, 0) else print("Unexpected response ", sync) end diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index 7da111459e..56be79d407 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -76,8 +76,7 @@ port_lua(struct port *port) { return (struct port_lua *) port; } */ static void -port_lua_add_tuple(struct port *port, struct tuple *tuple, - uint32_t flags __attribute__((unused))) +port_lua_add_tuple(struct port *port, struct tuple *tuple) { lua_State *L = port_lua(port)->L; try { @@ -107,7 +106,7 @@ port_add_lua_ret(struct port *port, struct lua_State *L, int index) { struct tuple *tuple = lua_totuple(L, index, index); TupleGuard guard(tuple); - port_add_tuple(port, tuple, BOX_RETURN_TUPLE); + port_add_tuple(port, tuple); } /** diff --git a/src/box/port.cc b/src/box/port.cc index 417e716005..9eed967970 100644 --- a/src/box/port.cc +++ b/src/box/port.cc @@ -35,8 +35,7 @@ null_port_eof(struct port *port __attribute__((unused))) static void null_port_add_tuple(struct port *port __attribute__((unused)), - struct tuple *tuple __attribute__((unused)), - uint32_t flags __attribute__((unused))) + struct tuple *tuple __attribute__((unused))) { } diff --git a/src/box/port.h b/src/box/port.h index f4e58d4533..776b35d06a 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -55,7 +55,7 @@ struct port; struct port_vtab { - void (*add_tuple)(struct port *port, struct tuple *tuple, uint32_t flags); + void (*add_tuple)(struct port *port, struct tuple *tuple); /** Must be called in the end of execution of a single request. */ void (*eof)(struct port *port); }; @@ -72,9 +72,9 @@ port_eof(struct port *port) } static inline void -port_add_tuple(struct port *port, struct tuple *tuple, uint32_t flags) +port_add_tuple(struct port *port, struct tuple *tuple) { - (port->vtab->add_tuple)(port, tuple, flags); + (port->vtab->add_tuple)(port, tuple); } /** Reused in port_lua */ diff --git a/src/box/request.cc b/src/box/request.cc index efad65ced2..c19c728396 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -167,7 +167,7 @@ execute_select(const struct request *request, struct txn *txn, continue; } - port_add_tuple(port, tuple, BOX_RETURN_TUPLE); + port_add_tuple(port, tuple); if (limit == ++found) break; diff --git a/src/iproto.cc b/src/iproto.cc index f9e7d51e4b..c10b0f5d45 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -43,8 +43,24 @@ #include "scoped_guard.h" #include "memory.h" +/* + * struct iproto_header and struct iproto_reply_header + * share common prefix {msg_code, len, sync} + */ + +struct iproto_header { + uint32_t msg_code; + uint32_t len; + uint32_t sync; +} __attribute__((packed)); + +static inline struct iproto_header * +iproto(const char *pos) +{ + return (struct iproto_header *) pos; +} + static struct iproto_header dummy_header = { 0, 0, 0 }; -const uint32_t msg_ping = 0; /* {{{ iproto_queue */ @@ -600,20 +616,20 @@ static inline void iproto_reply(struct iproto_port *port, box_process_func callback, struct obuf *out, struct iproto_header *header) { - if (header->msg_code == msg_ping) - return iproto_reply_ping(out, header); + if (header->msg_code == MSG_PING) + return iproto_reply_ping(out, header->sync); /* Make request body point to iproto data */ char *body = (char *) &header[1]; - iproto_port_init(port, out, header); + iproto_port_init(port, out, header->sync); try { struct request request; request_create(&request, header->msg_code, body, header->len); callback((struct port *) port, &request); } catch (const ClientError& e) { - if (port->reply.found) + if (port->found) obuf_rollback_to_svp(out, &port->svp); - iproto_reply_error(out, header, e); + iproto_reply_error(out, e, header->sync); } } @@ -657,7 +673,7 @@ iproto_process_connect(struct iproto_request *request) try { /* connect. */ con->session = session_create(fd, con->cookie); } catch (const ClientError& e) { - iproto_reply_error(&iobuf->out, request->header, e); + iproto_reply_error(&iobuf->out, e, request->header->sync); try { iproto_flush(iobuf, fd, &con->write_pos); } catch (const Exception& e) { diff --git a/src/iproto_port.cc b/src/iproto_port.cc index b6501ce08f..abdb6369a6 100644 --- a/src/iproto_port.cc +++ b/src/iproto_port.cc @@ -28,39 +28,123 @@ */ #include "iproto_port.h" +enum iproto_key { + IPROTO_CODE = 0, + IPROTO_SYNC, + IPROTO_SPACE, + IPROTO_INDEX, + IPROTO_TUPLE, + IPROTO_OFFSET, + IPROTO_LIMIT, + IPROTO_ITERATOR, + IPROTO_NAME, + IPROTO_OPS, + IPROTO_DATA, + IPROTO_ERROR, +}; + +/* m_ - msgpack meta, k_ - key, v_ - value */ +struct iproto_header_bin { + uint8_t m_len; /* MP_UINT32 */ + uint32_t v_len; /* length */ + uint8_t m_header; /* MP_MAP */ + uint8_t k_code; /* IPROTO_CODE */ + uint8_t m_code; /* MP_UINT32 */ + uint32_t v_code; /* response status */ + uint8_t k_sync; /* IPROTO_SYNC */ + uint8_t m_sync; /* MP_UIN32 */ + uint32_t v_sync; /* sync */ +} __attribute__((packed)); + +static const struct iproto_header_bin iproto_header_bin = { + 0xce, 0, 0x82, IPROTO_CODE, 0xce, 0, IPROTO_SYNC, 0xce, 0 +}; + +struct iproto_body_bin { + uint8_t m_body; /* MP_MAP */ + uint8_t k_data; /* IPROTO_DATA or IPROTO_ERROR */ + uint8_t m_data; /* MP_STR or MP_ARRAY */ + uint32_t v_data_len; /* string length of array size */ +} __attribute__((packed)); + +static const struct iproto_body_bin iproto_body_bin = { + 0x81, IPROTO_DATA, 0xdd, 0 +}; + +static const struct iproto_body_bin iproto_error_bin = { + 0x81, IPROTO_ERROR, 0xdb, 0 +}; + +void +iproto_reply_ping(struct obuf *out, uint32_t sync) +{ + struct iproto_header_bin reply = iproto_header_bin; + reply.v_len = mp_bswap_u32(sizeof(iproto_header_bin) - 5); + reply.v_sync = mp_bswap_u32(sync); + obuf_dup(out, &reply, sizeof(reply)); +} + +void +iproto_reply_error(struct obuf *out, const ClientError &e, + uint32_t sync) +{ + uint32_t msg_len = strlen(e.errmsg()); + + struct iproto_header_bin header = iproto_header_bin; + struct iproto_body_bin body = iproto_error_bin; + + uint32_t len = sizeof(header) - 5 + sizeof(body) + msg_len; + header.v_len = mp_bswap_u32(len); + header.v_code = mp_bswap_u32(tnt_errcode_val(e.errcode())); + header.v_sync = mp_bswap_u32(sync); + + body.v_data_len = mp_bswap_u32(msg_len); + + obuf_dup(out, &header, sizeof(header)); + obuf_dup(out, &body, sizeof(body)); + obuf_dup(out, e.errmsg(), msg_len); +} + static inline struct iproto_port * iproto_port(struct port *port) { return (struct iproto_port *) port; } +const uint32_t SVP_SIZE = sizeof(iproto_header_bin) + + sizeof(iproto_body_bin); + static inline void iproto_port_eof(struct port *ptr) { struct iproto_port *port = iproto_port(ptr); /* found == 0 means add_tuple wasn't called at all. */ - if (port->reply.found == 0) { - port->reply.hdr.len = sizeof(port->reply) - - sizeof(port->reply.hdr); - obuf_dup(port->buf, &port->reply, sizeof(port->reply)); - } else { - port->reply.hdr.len = obuf_size(port->buf) - port->svp.size - - sizeof(port->reply.hdr); - memcpy(obuf_svp_to_ptr(port->buf, &port->svp), - &port->reply, sizeof(port->reply)); - } + if (port->found == 0) + port->svp = obuf_book(port->buf, SVP_SIZE); + + uint32_t len = obuf_size(port->buf) - port->svp.size - 5; + + struct iproto_header_bin header = iproto_header_bin; + header.v_len = mp_bswap_u32(len); + header.v_sync = mp_bswap_u32(port->sync); + + struct iproto_body_bin body = iproto_body_bin; + body.v_data_len = mp_bswap_u32(port->found); + + char *pos = (char *) obuf_svp_to_ptr(port->buf, &port->svp); + memcpy(pos, &header, sizeof(header)); + memcpy(pos + sizeof(header), &body, sizeof(body)); } static inline void -iproto_port_add_tuple(struct port *ptr, struct tuple *tuple, uint32_t flags) +iproto_port_add_tuple(struct port *ptr, struct tuple *tuple) { struct iproto_port *port = iproto_port(ptr); - if (++port->reply.found == 1) { + if (++port->found == 1) { /* Found the first tuple, add header. */ - port->svp = obuf_book(port->buf, sizeof(port->reply)); + port->svp = obuf_book(port->buf, SVP_SIZE); } - if (flags & BOX_RETURN_TUPLE) - tuple_to_obuf(tuple, port->buf); + tuple_to_obuf(tuple, port->buf); } struct port_vtab iproto_port_vtab = { diff --git a/src/iproto_port.h b/src/iproto_port.h index 6c1df396d2..74b6ba25e4 100644 --- a/src/iproto_port.h +++ b/src/iproto_port.h @@ -33,43 +33,14 @@ #include "box/port.h" #include "box/tuple.h" #include "iobuf.h" +#include "msgpuck/msgpuck.h" enum { /** Maximal iproto package body length (2GiB) */ IPROTO_BODY_LEN_MAX = 2147483648UL }; -enum iproto_key { - IPROTO_LEN = 0, - IPROTO_CODE = 1, - IPROTO_SYNC = 2, - IPROTO_SPACE = 3, - IPROTO_INDEX = 4, - IPROTO_TUPLE = 5 -}; - -/* - * struct iproto_header and struct iproto_reply_header - * share common prefix {msg_code, len, sync} - */ - -struct iproto_header { - uint32_t msg_code; - uint32_t len; - uint32_t sync; -} __attribute__((packed)); - -struct iproto_reply_header { - struct iproto_header hdr; - uint32_t ret_code; - uint32_t found; -} __attribute__((packed)); - -static inline struct iproto_header * -iproto(const char *pos) -{ - return (struct iproto_header *) pos; -} +enum { MSG_PING = 0 }; /** * struct iproto_port users need to be careful to: @@ -95,7 +66,8 @@ struct iproto_port /** Output buffer. */ struct obuf *buf; /** Reply header. */ - struct iproto_reply_header reply; + uint32_t sync; + uint32_t found; /** A pointer in the reply buffer where the reply starts. */ struct obuf_svp svp; }; @@ -104,36 +76,21 @@ extern struct port_vtab iproto_port_vtab; static inline void iproto_port_init(struct iproto_port *port, struct obuf *buf, - struct iproto_header *req) + uint32_t sync) { port->vtab = &iproto_port_vtab; port->buf = buf; - port->reply.hdr = *req; - port->reply.found = 0; - port->reply.ret_code = 0; + port->sync = sync; + port->found = 0; } /** Stack a reply to 'ping' packet. */ -static inline void -iproto_reply_ping(struct obuf *out, struct iproto_header *req) -{ - struct iproto_header reply = *req; - reply.len = 0; - obuf_dup(out, &reply, sizeof(reply)); -} +void +iproto_reply_ping(struct obuf *out, uint32_t sync); /** Send an error packet back. */ -static inline void -iproto_reply_error(struct obuf *out, struct iproto_header *req, - const ClientError& e) -{ - struct iproto_header reply = *req; - int errmsg_len = strlen(e.errmsg()) + 1; - uint32_t ret_code = tnt_errcode_val(e.errcode()); - reply.len = sizeof(ret_code) + errmsg_len;; - obuf_dup(out, &reply, sizeof(reply)); - obuf_dup(out, &ret_code, sizeof(ret_code)); - obuf_dup(out, e.errmsg(), errmsg_len); -} +void +iproto_reply_error(struct obuf *out, const ClientError &e, + uint32_t sync); #endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */ diff --git a/src/lua/msgpack.cc b/src/lua/msgpack.cc index cbf414853c..fb9243cf60 100644 --- a/src/lua/msgpack.cc +++ b/src/lua/msgpack.cc @@ -425,14 +425,39 @@ lua_msgpack_decode(lua_State *L) return 1; } +static int +lua_msgpack_next(lua_State *L) +{ + int index = lua_gettop(L); + if (index != 2 && index != 1 && lua_type(L, 1) != LUA_TSTRING) + return luaL_error(L, "msgpack.next: a Lua string expected"); + + size_t data_len; + uint32_t offset = index > 1 ? lua_tointeger(L, 2) - 1 : 0; + const char *data = lua_tolstring(L, 1, &data_len); + if (offset >= data_len) + luaL_error(L, "msgpack.next: offset is out of bounds"); + const char *end = data + data_len; + + const char *b = data + offset; + if (!mp_check(&b, end)) + return luaL_error(L, "msgpack.decode: invalid MsgPack"); + + b = data + offset; + luamp_decode(L, &b); + lua_pushinteger(L, b - data + 1); + return 2; +} + LUALIB_API int luaopen_msgpack(lua_State *L) { const luaL_reg msgpacklib[] = { { "encode", lua_msgpack_encode }, - { "loads", lua_msgpack_encode }, + { "dumps", lua_msgpack_encode }, { "decode", lua_msgpack_decode }, - { "dumps", lua_msgpack_decode }, + { "loads", lua_msgpack_decode }, + { "next", lua_msgpack_next}, { NULL, NULL} }; diff --git a/test/box/iproto.test.py b/test/box/iproto.test.py index e65adde33f..66728fb15a 100644 --- a/test/box/iproto.test.py +++ b/test/box/iproto.test.py @@ -17,7 +17,7 @@ print """ # Test bug #899343 (server assertion failure on incorrect packet) """ print "# send the package with invalid length" -inval_request = struct.pack('<LLL', box.net.SELECT, 4294967290, 1) +inval_request = struct.pack('<LLL', 1, 4294967290, 1) print s.send(inval_request) print "# check that is server alive" sql("ping") diff --git a/test/box/msgpack.result b/test/box/msgpack.result index 843861360a..e46721ab3c 100644 --- a/test/box/msgpack.result +++ b/test/box/msgpack.result @@ -419,3 +419,58 @@ msgpack.decode(msgpack.encode(a)); - null ... --# setopt delimiter '' +-- Test aliases, loads and dumps +a = { 1, 2, 3 } +--- +... +msgpack.decode(msgpack.dumps(a)) +--- +- - 1 + - 2 + - 3 +... +msgpack.loads(msgpack.encode(a)) +--- +- - 1 + - 2 + - 3 +... +-- Test msgpack.next +dump = msgpack.dumps({1, 2, 3})..msgpack.dumps({4, 5, 6}) +--- +... +dump:len() +--- +- 8 +... +a, offset = msgpack.next(dump) +--- +... +a +--- +- - 1 + - 2 + - 3 +... +offset +--- +- 5 +... +a, offset = msgpack.next(dump, offset) +--- +... +a +--- +- - 4 + - 5 + - 6 +... +offset +--- +- 9 +... +a, offset = msgpack.next(dump, offset) +--- +- error: '[string "a, offset = msgpack.next(dump, offset) "]:1: msgpack.next: offset + is out of bounds' +... diff --git a/test/box/msgpack.test.lua b/test/box/msgpack.test.lua index 8ec9e1c4cb..423dc3369a 100644 --- a/test/box/msgpack.test.lua +++ b/test/box/msgpack.test.lua @@ -182,3 +182,17 @@ a; msgpack.decode(msgpack.encode(a)); --# setopt delimiter '' +-- Test aliases, loads and dumps +a = { 1, 2, 3 } +msgpack.decode(msgpack.dumps(a)) +msgpack.loads(msgpack.encode(a)) +-- Test msgpack.next +dump = msgpack.dumps({1, 2, 3})..msgpack.dumps({4, 5, 6}) +dump:len() +a, offset = msgpack.next(dump) +a +offset +a, offset = msgpack.next(dump, offset) +a +offset +a, offset = msgpack.next(dump, offset) diff --git a/test/box/net.box.result b/test/box/net.box.result index 6118e73c94..f964f85f12 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -222,11 +222,11 @@ remote:close() ... remote:close() --- -- error: '[string "-- box_net.lua (internal file)..."]:416: box.net.box: already closed' +- error: '[string "-- box_net.lua (internal file)..."]:413: box.net.box: already closed' ... remote:ping() --- -- error: '[string "-- box_net.lua (internal file)..."]:421: box.net.box: connection +- error: '[string "-- box_net.lua (internal file)..."]:418: box.net.box: connection was closed' ... space:drop() diff --git a/test/box/socket.result b/test/box/socket.result index 53c7dd6015..65c916abba 100644 --- a/test/box/socket.result +++ b/test/box/socket.result @@ -899,7 +899,7 @@ ping s:close() --- ... - replies = 0 function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(12) replies = replies + 1 end end) ) return s:send(box.pack('iii', box.net.PING, 0, 1)) end + replies = 0 function bug1160869() local s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(18) replies = replies + 1 end end) ) return s:send(box.pack('iii', box.net.PING, 0, 1)) end --- ... bug1160869() @@ -921,7 +921,7 @@ replies --- - 3 ... - s = nil syncno = 0 reps = 0 function iostart() if s ~= nil then return end s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(12) reps = reps + 1 end end)) end function iotest() iostart() syncno = syncno + 1 return s:send(box.pack('iii', box.net.PING, 0, syncno)) end + s = nil syncno = 0 reps = 0 function iostart() if s ~= nil then return end s = box.socket.tcp() s:connect('127.0.0.1', box.cfg.primary_port) box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do s:recv(18) reps = reps + 1 end end)) end function iotest() iostart() syncno = syncno + 1 return s:send(box.pack('iii', box.net.PING, 0, syncno)) end --- ... iotest() diff --git a/test/box/socket.test.py b/test/box/socket.test.py index 5dac5fc404..8e0852ae44 100644 --- a/test/box/socket.test.py +++ b/test/box/socket.test.py @@ -517,7 +517,7 @@ function bug1160869() box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do - s:recv(12) + s:recv(18) replies = replies + 1 end end) ) @@ -545,7 +545,7 @@ function iostart() box.fiber.resume( box.fiber.create(function() box.fiber.detach() while true do - s:recv(12) + s:recv(18) reps = reps + 1 end end)) diff --git a/test/lib/box_connection.py b/test/lib/box_connection.py index 07b1d79637..d58967fe4b 100644 --- a/test/lib/box_connection.py +++ b/test/lib/box_connection.py @@ -57,7 +57,7 @@ class BoxConnection(TarantoolConnection): self.py_con.schema = Schema(schemadict) def check_connection(self): - rc = self.py_con._recv(self.py_con._socket.fileno(), '', 0, socket.MSG_DONTWAIT) + rc = self.py_con._sys_recv(self.py_con._socket.fileno(), '', 0, socket.MSG_DONTWAIT) if ctypes.get_errno() == errno.EAGAIN: ctypes.set_errno(0) return True diff --git a/test/lib/tarantool-python b/test/lib/tarantool-python index c6ebbaa160..e400b6699a 160000 --- a/test/lib/tarantool-python +++ b/test/lib/tarantool-python @@ -1 +1 @@ -Subproject commit c6ebbaa160f1088cd5c11d85241cc4acf5cb2a82 +Subproject commit e400b6699a00044134ae93b8ff83afd7dc48542f -- GitLab