diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 20464e854c4ec5097d1f66ddaa55fc113e3d45a8..ba763724ab3fb05d34109ff264b7ad520fe29ec8 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -44,7 +44,7 @@ add_library(box alter.cc schema.cc session.cc - port.c + port.cc request.cc txn.cc box.cc diff --git a/src/box/box.cc b/src/box/box.cc index 647a899ecbeb4be9a5e71ce3d446a22139f0f383..55e273a4ddd7551a1d4e3a63904d6b2812930bee 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -378,6 +378,7 @@ box_free(void) user_cache_free(); schema_free(); tuple_free(); + port_free(); recovery_exit(recovery); recovery = NULL; engine_shutdown(); @@ -464,6 +465,7 @@ box_init(void) cfg_getd("wal_dir_rescan_delay")); title("hot_standby", NULL); + port_init(); iproto_init(); box_set_listen(cfg_gets("listen")); diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index b10d1246cf04f9f93720e327bfd77fa495da8686..fb3e155fc0d2012a99097e4201a99f6d57a92c4d 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -221,53 +221,8 @@ lbox_request_create(struct request *request, } } -static void -port_ffi_add_tuple(struct port *port, struct tuple *tuple) -{ - struct port_ffi *port_ffi = (struct port_ffi *) port; - if (port_ffi->size >= port_ffi->capacity) { - uint32_t capacity = (port_ffi->capacity > 0) ? - 2 * port_ffi->capacity : 1024; - struct tuple **ret = (struct tuple **) - realloc(port_ffi->ret, sizeof(*ret) * capacity); - assert(ret != NULL); - port_ffi->ret = ret; - port_ffi->capacity = capacity; - } - tuple_ref(tuple); - port_ffi->ret[port_ffi->size++] = tuple; -} - -struct port_vtab port_ffi_vtab = { - port_ffi_add_tuple, - null_port_eof, -}; - -void -port_ffi_create(struct port_ffi *port) -{ - memset(port, 0, sizeof(*port)); - port->vtab = &port_ffi_vtab; -} - -static inline void -port_ffi_clear(struct port_ffi *port) -{ - for (uint32_t i = 0; i < port->size; i++) { - tuple_unref(port->ret[i]); - port->ret[i] = NULL; - } - port->size = 0; -} - -void -port_ffi_destroy(struct port_ffi *port) -{ - free(port->ret); -} - int -boxffi_select(struct port_ffi *port, uint32_t space_id, uint32_t index_id, +boxffi_select(struct port *port, uint32_t space_id, uint32_t index_id, int iterator, uint32_t offset, uint32_t limit, const char *key, const char *key_end) { @@ -281,21 +236,10 @@ boxffi_select(struct port_ffi *port, uint32_t space_id, uint32_t index_id, request.key = key; request.key_end = key_end; - /* - * A single instance of port_ffi object is used - * for all selects, reset it. - */ - port->size = 0; try { - box_process(&request, (struct port *) port); + box_process(&request, port); return 0; } catch (Exception *e) { - /* - * The tuples will be not blessed and garbage - * collected, unreference them here, to avoid - * a leak. - */ - port_ffi_clear(port); /* will be hanled by box.error() in Lua */ return -1; } diff --git a/src/box/lua/call.h b/src/box/lua/call.h index f578e6a54c0cb5051161689033c2c5c8f67921f2..35b53cce5660ced6926ea76efdaac6ec6637aa6e 100644 --- a/src/box/lua/call.h +++ b/src/box/lua/call.h @@ -45,22 +45,9 @@ void box_lua_eval(struct request *request, struct obuf *out); extern "C" { -struct port_ffi -{ - struct port_vtab *vtab; - uint32_t size; - uint32_t capacity; - struct tuple **ret; -}; - -void -port_ffi_create(struct port_ffi *port); - -void -port_ffi_destroy(struct port_ffi *port); int -boxffi_select(struct port_ffi *port, uint32_t space_id, uint32_t index_id, +boxffi_select(struct port *port, uint32_t space_id, uint32_t index_id, int iterator, uint32_t offset, uint32_t limit, const char *key, const char *key_end); diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index a02d49643c65b9eea2f4c041f90a0a76589e5ee1..0e4fe51352a1c8c2fb6e4b8a0efae218d4e04c55 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -42,22 +42,35 @@ ffi.cdef[[ struct tuple * boxffi_iterator_next(struct iterator *itr); - struct port; - struct port_ffi + struct port { struct port_vtab *vtab; - uint32_t size; - uint32_t capacity; - struct tuple **ret; + }; + + struct port_buf_entry { + struct port_buf_entry *next; + struct tuple *tuple; + }; + + struct port_buf { + struct port base; + size_t size; + struct port_buf_entry *first; + struct port_buf_entry *last; + struct port_buf_entry first_entry; }; void - port_ffi_create(struct port_ffi *port); + port_buf_create(struct port_buf *port_buf); + + void + port_buf_destroy(struct port_buf *port_buf); + void - port_ffi_destroy(struct port_ffi *port); + port_buf_transfer(struct port_buf *port_buf); int - boxffi_select(struct port_ffi *port, uint32_t space_id, uint32_t index_id, + boxffi_select(struct port_buf *port, uint32_t space_id, uint32_t index_id, int iterator, uint32_t offset, uint32_t limit, const char *key, const char *key_end); void password_prepare(const char *password, int len, @@ -552,9 +565,8 @@ local iterator_cdata_gc = function(iterator) end -- global struct port instance to use by select()/get() -local port = ffi.new('struct port_ffi') -builtin.port_ffi_create(port) -ffi.gc(port, builtin.port_ffi_destroy) +local port_buf = ffi.new('struct port_buf') +local port_buf_entry_t = ffi.typeof('struct port_buf_entry') -- Helper function for nicer error messages -- in some cases when space object is misused @@ -740,16 +752,23 @@ function box.schema.space.bless(space) local key, key_end = msgpackffi.encode_tuple(key) local iterator, offset, limit = check_select_opts(opts, key + 1 >= key_end) - if builtin.boxffi_select(port, index.space_id, + builtin.port_buf_create(port_buf) + if builtin.boxffi_select(port_buf, index.space_id, index.id, iterator, offset, limit, key, key_end) ~=0 then + builtin.port_buf_destroy(port_buf); return box.error() end local ret = {} - for i=0,port.size - 1,1 do + local entry = port_buf.first + local i = 1 + while entry ~= nil do -- tuple.bless must never fail - ret[i + 1] = box.tuple.bless(port.ret[i]) + ret[i] = box.tuple.bless(entry.tuple) + entry = entry.next + i = i + 1 end + builtin.port_buf_transfer(port_buf); return ret end diff --git a/src/box/port.c b/src/box/port.c deleted file mode 100644 index 9eed9679704592c59c09bff202ae5567e2b2fcca..0000000000000000000000000000000000000000 --- a/src/box/port.c +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Redistribution and use in source and binary forms, with or - * without modification, are permitted provided that the following - * conditions are met: - * - * 1. Redistributions of source code must retain the above - * copyright notice, this list of conditions and the - * following disclaimer. - * - * 2. Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following - * disclaimer in the documentation and/or other materials - * provided with the distribution. - * - * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND - * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED - * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL - * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, - * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL - * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF - * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR - * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF - * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF - * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF - * SUCH DAMAGE. - */ -#include "port.h" - -void -null_port_eof(struct port *port __attribute__((unused))) -{ -} - -static void -null_port_add_tuple(struct port *port __attribute__((unused)), - struct tuple *tuple __attribute__((unused))) -{ -} - -static struct port_vtab null_port_vtab = { - null_port_add_tuple, - null_port_eof, -}; - -struct port null_port = { - /* .vtab = */ &null_port_vtab, -}; - diff --git a/src/box/port.cc b/src/box/port.cc new file mode 100644 index 0000000000000000000000000000000000000000..5598e59b1c879e78f6d0622f144106c39d043b95 --- /dev/null +++ b/src/box/port.cc @@ -0,0 +1,138 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "port.h" +#include "tuple.h" +#include <lib/small/slab_cache.h> +#include <lib/small/mempool.h> +#include <fiber.h> + +void +null_port_eof(struct port *port __attribute__((unused))) +{ +} + +static void +null_port_add_tuple(struct port *port __attribute__((unused)), + struct tuple *tuple __attribute__((unused))) +{ +} + +static struct port_vtab null_port_vtab = { + null_port_add_tuple, + null_port_eof, +}; + +struct port null_port = { + /* .vtab = */ &null_port_vtab, +}; + +static struct mempool port_buf_entry_pool; + +static void +port_buf_add_tuple(struct port *port, struct tuple *tuple) +{ + struct port_buf *port_buf = (struct port_buf *) port; + struct port_buf_entry *e; + if (port_buf->size == 0) { + tuple_ref(tuple); /* throws */ + e = &port_buf->first_entry; + port_buf->first = port_buf->last = e; + } else { + e = (struct port_buf_entry *) + mempool_alloc(&port_buf_entry_pool); /* throws */ + try { + tuple_ref(tuple); /* throws */ + } catch (Exception *) { + mempool_free(&port_buf_entry_pool, e); + throw; + } + port_buf->last->next = e; + port_buf->last = e; + } + e->tuple = tuple; + e->next = NULL; + ++port_buf->size; +} + +static struct port_vtab port_buf_vtab = { + port_buf_add_tuple, + null_port_eof, +}; + +void +port_buf_create(struct port_buf *port_buf) +{ + port_buf->base.vtab = &port_buf_vtab; + port_buf->size = 0; + port_buf->first = NULL; + port_buf->last = NULL; +} + +void +port_buf_destroy(struct port_buf *port_buf) +{ + struct port_buf_entry *e = port_buf->first; + if (e == NULL) + return; + tuple_unref(e->tuple); + e = e->next; + while (e != NULL) { + struct port_buf_entry *cur = e; + e = e->next; + tuple_unref(cur->tuple); + mempool_free(&port_buf_entry_pool, cur); + } +} + +void +port_buf_transfer(struct port_buf *port_buf) +{ + struct port_buf_entry *e = port_buf->first; + if (e == NULL) + return; + e = e->next; + while (e != NULL) { + struct port_buf_entry *cur = e; + e = e->next; + mempool_free(&port_buf_entry_pool, cur); + } +} + +void +port_init(void) +{ + mempool_create(&port_buf_entry_pool, &cord()->slabc, + sizeof(struct port_buf_entry)); +} + +void +port_free(void) +{ + mempool_destroy(&port_buf_entry_pool); +} diff --git a/src/box/port.h b/src/box/port.h index e839114876127ea8ea9796e112d7e741c884d4ca..9bfa361c47a75e1bd98cdfa8724b438d1ea94055 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -29,6 +29,7 @@ * SUCH DAMAGE. */ #include "trivia/util.h" +#include "lib/salad/rlist.h" #if defined(__cplusplus) extern "C" { @@ -91,6 +92,40 @@ extern struct port null_port; void null_port_eof(struct port *port __attribute__((unused))); +struct port_buf_entry { + struct port_buf_entry *next; + struct tuple *tuple; +}; + +struct port_buf { + struct port base; + size_t size; + struct port_buf_entry *first; + struct port_buf_entry *last; + struct port_buf_entry first_entry; +}; + +void +port_buf_create(struct port_buf *port_buf); + +/** + * Unref all tuples and free allocated memory + */ +void +port_buf_destroy(struct port_buf *port_buf); + +/** + * Like port_buf_destroy() but doesn't do tuple_unref() + */ +void +port_buf_transfer(struct port_buf *port_buf); + +void +port_init(void); + +void +port_free(void); + #if defined(__cplusplus) } /* extern "C" */ #endif /* defined(__cplusplus) */ diff --git a/src/ffisyms.cc b/src/ffisyms.cc index f2d3fbe3162236af028effbc9d1dccd2e36b9a05..e2edf632796fe4fa2b31233c38473535927b7897 100644 --- a/src/ffisyms.cc +++ b/src/ffisyms.cc @@ -7,6 +7,7 @@ #include <box/lua/tuple.h> #include <box/lua/call.h> #include <box/sophia_engine.h> +#include <box/port.h> #include <lua/init.h> #include "main.h" #include "lua/bsdsocket.h" @@ -43,8 +44,6 @@ void *ffi_symbols[] = { (void *) boxffi_index_iterator, (void *) boxffi_tuple_update, (void *) boxffi_iterator_next, - (void *) port_ffi_create, - (void *) port_ffi_destroy, (void *) boxffi_select, (void *) password_prepare, (void *) tarantool_error_message, @@ -71,4 +70,7 @@ void *ffi_symbols[] = { (void *) ibuf_create, (void *) ibuf_destroy, (void *) ibuf_reserve_nothrow_slow, + (void *) port_buf_create, + (void *) port_buf_destroy, + (void *) port_buf_transfer };