From b42c8ee0af1615dc270aad736a011937a65e4c64 Mon Sep 17 00:00:00 2001 From: Konstantin Osipov <kostja@tarantool.org> Date: Mon, 1 Jul 2013 13:04:37 +0400 Subject: [PATCH] Refactoring: prepare for addition of tuple formats. Extract the remaining places which access tuple->data directly into tuple_to_port.cc file, which will have full access to all tuple formats and all destinations to which a tuple may need to be converted. In this file it'll be possible to perform a conversion efficiently, at the same time, it will be the only place of cross-dependency between all tuple formats and all conversion destinations. --- src/CMakeLists.txt | 1 + src/box/CMakeLists.txt | 19 +++++- src/box/box.cc | 2 +- src/box/box_lua.cc | 12 ++-- src/box/port.cc | 14 ++--- src/box/port.h | 4 +- src/box/tuple.h | 10 +++ src/box/tuple_convert.cc | 47 ++++++++++++++ src/iproto.cc | 133 ++------------------------------------- src/iproto_port.cc | 69 ++++++++++++++++++++ src/iproto_port.h | 107 +++++++++++++++++++++++++++++++ src/memcached.cc | 4 +- 12 files changed, 272 insertions(+), 150 deletions(-) create mode 100644 src/box/tuple_convert.cc create mode 100644 src/iproto_port.cc create mode 100644 src/iproto_port.h diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 78145267b5..8781917490 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -103,6 +103,7 @@ set (common_sources cpu_feature.c replica.cc iproto.cc + iproto_port.cc session.cc object.cc exception.cc diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 4e251781ae..9929d5be5d 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -15,8 +15,21 @@ add_custom_target(box_generate_lua_sources DEPENDS ${lua_sources}) set_property(DIRECTORY PROPERTY ADDITIONAL_MAKE_CLEAN_FILES ${lua_sources}) -tarantool_module("box" tuple.cc index.cc hash_index.cc tree_index.cc space.cc - port.cc request.cc tuple_update.cc txn.cc box.cc ${lua_sources} box_lua.cc box_lua_space.cc - bitset_index.cc) +tarantool_module("box" + tuple.cc + tuple_convert.cc + tuple_update.cc + index.cc + hash_index.cc + tree_index.cc + bitset_index.cc + space.cc + port.cc + request.cc + txn.cc + box.cc + ${lua_sources} + box_lua.cc + box_lua_space.cc) target_link_libraries(tarantool_box bitset) diff --git a/src/box/box.cc b/src/box/box.cc index 7bbc8f16fa..5bd58b0819 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -176,7 +176,7 @@ recover_row(void *param __attribute__((unused)), struct tbuf *t) recover_snap_row(data); } else if (tag == XLOG) { u16 op = pick_u16(&data, end); - process_rw(&port_null, op, data, end - data); + process_rw(&null_port, op, data, end - data); } else { say_error("unknown row tag: %i", (int)tag); return -1; diff --git a/src/box/box_lua.cc b/src/box/box_lua.cc index 0df009d113..e1202c02cb 100644 --- a/src/box/box_lua.cc +++ b/src/box/box_lua.cc @@ -792,7 +792,7 @@ lbox_create_iterator(struct lua_State *L) /* Tuple. */ struct tuple *tuple = lua_checktuple(L, 2); key_part_count = tuple->field_count; - luaL_addlstring(&b, tuple->data, tuple->bsize); + tuple_to_luabuf(tuple, &b); } else { /* Single or multi- part key. */ key_part_count = argc - 2; @@ -905,7 +905,7 @@ lbox_index_count(struct lua_State *L) if (argc == 1 && lua_type(L, 2) == LUA_TUSERDATA) { /* Searching by tuple. */ struct tuple *tuple = lua_checktuple(L, 2); - luaL_addlstring(&b, tuple->data, tuple->bsize); + tuple_to_luabuf(tuple, &b); key_part_count = tuple->field_count; } else { /* Single or multi- part key. */ @@ -994,7 +994,7 @@ port_lua_add_tuple(struct port *port, struct tuple *tuple, struct port_vtab port_lua_vtab = { port_lua_add_tuple, - port_null_eof, + null_port_eof, }; static struct port * @@ -1398,10 +1398,10 @@ luaL_packvalue(struct lua_State *L, luaL_Buffer *b, int index) switch (lua_type(L, index)) { case LUA_TUSERDATA: { - struct tuple *tu = lua_istuple(L, index); - if (tu == NULL) + struct tuple *tuple = lua_istuple(L, index); + if (tuple == NULL) luaL_error(L, "box.pack: unsupported type"); - luaL_addlstring(b, (char*)tu->data, tu->bsize); + tuple_to_luabuf(tuple, b); return; } case LUA_TTABLE: diff --git a/src/box/port.cc b/src/box/port.cc index 98a8c96542..81fa096b20 100644 --- a/src/box/port.cc +++ b/src/box/port.cc @@ -29,23 +29,23 @@ #include "port.h" void -port_null_eof(struct port *port __attribute__((unused))) +null_port_eof(struct port *port __attribute__((unused))) { } static void -port_null_add_tuple(struct port *port __attribute__((unused)), +null_port_add_tuple(struct port *port __attribute__((unused)), struct tuple *tuple __attribute__((unused)), u32 flags __attribute__((unused))) { } -static struct port_vtab port_null_vtab = { - port_null_add_tuple, - port_null_eof, +static struct port_vtab null_port_vtab = { + null_port_add_tuple, + null_port_eof, }; -struct port port_null = { - /* .vtab = */ &port_null_vtab, +struct port null_port = { + /* .vtab = */ &null_port_vtab, }; diff --git a/src/box/port.h b/src/box/port.h index cd3a816695..dc4c3a811e 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -79,12 +79,12 @@ port_add_tuple(struct port *port, struct tuple *tuple, u32 flags) /** Reused in port_lua */ void -port_null_eof(struct port *port __attribute__((unused))); +null_port_eof(struct port *port __attribute__((unused))); /** * This one does not have state currently, thus a single * instance is sufficient. */ -extern struct port port_null; +extern struct port null_port; #endif /* INCLUDES_TARANTOOL_BOX_PORT_H */ diff --git a/src/box/tuple.h b/src/box/tuple.h index 865eacfd9a..81945b0a03 100644 --- a/src/box/tuple.h +++ b/src/box/tuple.h @@ -230,5 +230,15 @@ int tuple_compare_with_key(const struct tuple *tuple_a, const char *key, uint32_t part_count, const struct key_def *key_def); +/** These functions are implemented in tuple_convert.cc. */ + +/* Store tuple in the output buffer in iproto format. */ +void +tuple_to_obuf(struct tuple *tuple, struct obuf *buf); + +/* Store tuple fields in the Lua buffer, BER-length-encoded. */ +void +tuple_to_luabuf(struct tuple *tuple, struct luaL_Buffer *b); + #endif /* TARANTOOL_BOX_TUPLE_H_INCLUDED */ diff --git a/src/box/tuple_convert.cc b/src/box/tuple_convert.cc new file mode 100644 index 0000000000..fa790ec6a6 --- /dev/null +++ b/src/box/tuple_convert.cc @@ -0,0 +1,47 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "tuple.h" +#include "iobuf.h" +extern "C" { +#include <lua.h> +#include <lauxlib.h> +#include <lualib.h> +} /* extern "C" */ + +void +tuple_to_obuf(struct tuple *tuple, struct obuf *buf) +{ + obuf_dup(buf, &tuple->bsize, tuple_len(tuple)); +} + +void +tuple_to_luabuf(struct tuple *tuple, struct luaL_Buffer *b) +{ + luaL_addlstring(b, (char*)tuple->data, tuple->bsize); +} diff --git a/src/iproto.cc b/src/iproto.cc index d49410ccf5..0824305df4 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -32,143 +32,19 @@ #include <stdarg.h> #include <stdio.h> +#include "iproto_port.h" #include "tarantool.h" #include "exception.h" #include "errcode.h" #include "fiber.h" #include "say.h" -#include "box/box.h" -#include "box/port.h" -#include "box/tuple.h" -#include "box/request.h" -#include "iobuf.h" #include "evio.h" #include "session.h" #include "scoped_guard.h" -enum { - /** Maximal iproto package body length (2GiB) */ - IPROTO_BODY_LEN_MAX = 2147483648UL -}; - -/* - * struct iproto_header and struct iproto_reply_header - * share common prefix {msg_code, len, sync} - */ - -struct iproto_header { - uint32_t msg_code; - uint32_t len; - uint32_t sync; -} __attribute__((packed)); - static struct iproto_header dummy_header = { 0, 0, 0 }; - -struct iproto_reply_header { - struct iproto_header hdr; - uint32_t ret_code; - uint32_t found; -} __attribute__((packed)); - const uint32_t msg_ping = 0xff00; -static inline struct iproto_header * -iproto(const char *pos) -{ - return (struct iproto_header *) pos; -} - -/* {{{ port_iproto */ - -/** - * struct port_iproto users need to be careful to: - * - not unwind output of other fibers when - * rolling back to a savepoint (provided that - * multiple fibers work on the same session), - * - not increment write position before there is a complete - * response, i.e. a response which will not be rolled back - * and which has a complete header. - * - never increment write position without having - * a complete response. Otherwise a situation can occur - * when many requests started processing, but completed - * in a different order, and thus incomplete output is - * sent to the client. - * - * To ensure this, port_iproto must be used only in - * atomic manner, i.e. once first port_add_tuple() is done, - * there can be no yields until port_eof(). - */ -struct port_iproto -{ - struct port_vtab *vtab; - /** Output buffer. */ - struct obuf *buf; - /** Reply header. */ - struct iproto_reply_header reply; - /** A pointer in the reply buffer where the reply starts. */ - struct obuf_svp svp; -}; - -static inline struct port_iproto * -port_iproto(struct port *port) -{ - return (struct port_iproto *) port; -} - -static void -port_iproto_eof(struct port *ptr) -{ - struct port_iproto *port = port_iproto(ptr); - /* found == 0 means add_tuple wasn't called at all. */ - if (port->reply.found == 0) { - port->reply.hdr.len = sizeof(port->reply) - - sizeof(port->reply.hdr); - obuf_dup(port->buf, &port->reply, sizeof(port->reply)); - } else { - port->reply.hdr.len = obuf_size(port->buf) - port->svp.size - - sizeof(port->reply.hdr); - memcpy(obuf_svp_to_ptr(port->buf, &port->svp), - &port->reply, sizeof(port->reply)); - } -} - -static inline void -tuple_to_iproto(struct port_iproto *port, struct tuple *tuple) -{ - obuf_dup(port->buf, &tuple->bsize, tuple_len(tuple)); -} - -static void -port_iproto_add_tuple(struct port *ptr, struct tuple *tuple, u32 flags) -{ - struct port_iproto *port = port_iproto(ptr); - if (++port->reply.found == 1) { - /* Found the first tuple, add header. */ - port->svp = obuf_book(port->buf, sizeof(port->reply)); - } - if (flags & BOX_RETURN_TUPLE) { - tuple_to_iproto(port, tuple); - } -} - -static struct port_vtab port_iproto_vtab = { - port_iproto_add_tuple, - port_iproto_eof, -}; - -static void -port_iproto_init(struct port_iproto *port, struct obuf *buf, - struct iproto_header *req) -{ - port->vtab = &port_iproto_vtab; - port->buf = buf; - port->reply.hdr = *req; - port->reply.found = 0; - port->reply.ret_code = 0; -} - -/* }}} */ - /* {{{ iproto_queue */ struct iproto_request; @@ -758,7 +634,7 @@ iproto_reply_error(struct obuf *out, struct iproto_header *req, /** Stack a reply to a single request to the fiber's io vector. */ static inline void -iproto_reply(struct port_iproto *port, box_process_func callback, +iproto_reply(struct iproto_port *port, box_process_func callback, struct obuf *out, struct iproto_header *header) { if (header->msg_code == msg_ping) @@ -766,7 +642,7 @@ iproto_reply(struct port_iproto *port, box_process_func callback, /* Make request body point to iproto data */ char *body = (char *) &header[1]; - port_iproto_init(port, out, header); + iproto_port_init(port, out, header); try { callback((struct port *) port, header->msg_code, body, header->len); @@ -783,7 +659,7 @@ iproto_process_request(struct iproto_request *request) struct iproto_session *session = request->session; struct iproto_header *header = request->header; struct iobuf *iobuf = request->iobuf; - struct port_iproto port; + struct iproto_port port; auto scope_guard = make_scoped_guard([=]{ iobuf->in.pos += sizeof(*header) + header->len; @@ -851,7 +727,6 @@ iproto_process_disconnect(struct iproto_request *request) /** }}} */ - /** * Create a session context and start input. */ diff --git a/src/iproto_port.cc b/src/iproto_port.cc new file mode 100644 index 0000000000..0363adbf6e --- /dev/null +++ b/src/iproto_port.cc @@ -0,0 +1,69 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "iproto_port.h" + +static inline struct iproto_port * +iproto_port(struct port *port) +{ + return (struct iproto_port *) port; +} + +static inline void +iproto_port_eof(struct port *ptr) +{ + struct iproto_port *port = iproto_port(ptr); + /* found == 0 means add_tuple wasn't called at all. */ + if (port->reply.found == 0) { + port->reply.hdr.len = sizeof(port->reply) - + sizeof(port->reply.hdr); + obuf_dup(port->buf, &port->reply, sizeof(port->reply)); + } else { + port->reply.hdr.len = obuf_size(port->buf) - port->svp.size - + sizeof(port->reply.hdr); + memcpy(obuf_svp_to_ptr(port->buf, &port->svp), + &port->reply, sizeof(port->reply)); + } +} + +static inline void +iproto_port_add_tuple(struct port *ptr, struct tuple *tuple, u32 flags) +{ + struct iproto_port *port = iproto_port(ptr); + if (++port->reply.found == 1) { + /* Found the first tuple, add header. */ + port->svp = obuf_book(port->buf, sizeof(port->reply)); + } + if (flags & BOX_RETURN_TUPLE) + tuple_to_obuf(tuple, port->buf); +} + +struct port_vtab iproto_port_vtab = { + iproto_port_add_tuple, + iproto_port_eof, +}; diff --git a/src/iproto_port.h b/src/iproto_port.h new file mode 100644 index 0000000000..47d1b52495 --- /dev/null +++ b/src/iproto_port.h @@ -0,0 +1,107 @@ +#ifndef TARANTOOL_IPROTO_PORT_H_INCLUDED +#define TARANTOOL_IPROTO_PORT_H_INCLUDED +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "box/box.h" +#include "box/request.h" +#include "box/port.h" +#include "box/tuple.h" +#include "iobuf.h" + +enum { + /** Maximal iproto package body length (2GiB) */ + IPROTO_BODY_LEN_MAX = 2147483648UL +}; + +/* + * struct iproto_header and struct iproto_reply_header + * share common prefix {msg_code, len, sync} + */ + +struct iproto_header { + uint32_t msg_code; + uint32_t len; + uint32_t sync; +} __attribute__((packed)); + +struct iproto_reply_header { + struct iproto_header hdr; + uint32_t ret_code; + uint32_t found; +} __attribute__((packed)); + +static inline struct iproto_header * +iproto(const char *pos) +{ + return (struct iproto_header *) pos; +} + +/** + * struct iproto_port users need to be careful to: + * - not unwind output of other fibers when + * rolling back to a savepoint (provided that + * multiple fibers work on the same session), + * - not increment write position before there is a complete + * response, i.e. a response which will not be rolled back + * and which has a complete header. + * - never increment write position without having + * a complete response. Otherwise a situation can occur + * when many requests started processing, but completed + * in a different order, and thus incomplete output is + * sent to the client. + * + * To ensure this, iproto_port must be used only in + * atomic manner, i.e. once first port_add_tuple() is done, + * there can be no yields until port_eof(). + */ +struct iproto_port +{ + struct port_vtab *vtab; + /** Output buffer. */ + struct obuf *buf; + /** Reply header. */ + struct iproto_reply_header reply; + /** A pointer in the reply buffer where the reply starts. */ + struct obuf_svp svp; +}; + +extern struct port_vtab iproto_port_vtab; + +static inline void +iproto_port_init(struct iproto_port *port, struct obuf *buf, + struct iproto_header *req) +{ + port->vtab = &iproto_port_vtab; + port->buf = buf; + port->reply.hdr = *req; + port->reply.found = 0; + port->reply.ret_code = 0; +} + +#endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */ diff --git a/src/memcached.cc b/src/memcached.cc index ea9931862a..aaf3cd4b8c 100644 --- a/src/memcached.cc +++ b/src/memcached.cc @@ -153,7 +153,7 @@ memcached_store(const char *key, u32 exptime, u32 flags, u32 bytes, * Use a box dispatch wrapper which handles correctly * read-only/read-write modes. */ - box_process(&port_null, REPLACE, req->data, req->size); + box_process(&null_port, REPLACE, req->data, req->size); } static void @@ -168,7 +168,7 @@ memcached_delete(const char *key) tbuf_append(req, &key_len, sizeof(key_len)); tbuf_append_field(req, key); - box_process(&port_null, DELETE, req->data, req->size); + box_process(&null_port, DELETE, req->data, req->size); } static struct tuple * -- GitLab