diff --git a/README.md b/README.md index 6d72a6d2b97fa3482fcd1ff5badb5f877a327b32..eeede13bffbdee0447f40d8c8b4faa6a1d80f85b 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ To start the server, try: This will start Tarantool in interactive mode. To run Tarantool regression tests (test/test-run.py), -a few additional Python modules are ncessary: +a few additional Python modules are necessary: * daemon * pyyaml * msgpack-python diff --git a/src/box/box.cc b/src/box/box.cc index f98318e9427e12980ae063e2157ecc3e64e67e0b..d07b56a8467fd91aaf2418b996e0f309221aaa9b 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -602,6 +602,7 @@ box_free(void) tuple_free(); port_free(); engine_shutdown(); + rmean_delete(rmean_error); rmean_delete(rmean_box); } } @@ -626,6 +627,25 @@ engine_init() engine_register(sophia); } +/** + * @brief Reduce the current number of threads in the thread pool to the + * bare minimum. Doesn't prevent the pool from spawning new threads later + * if demand mounts. + */ +static void +thread_pool_trim() +{ + /* + * Trim OpenMP thread pool. + * Though we lack the direct control the workaround below works for + * GNU OpenMP library. The library stops surplus threads on entering + * a parallel region. Can't go below 2 threads due to the + * implementation quirk. + */ +#pragma omp parallel num_threads(2) + ; +} + static inline void box_init(void) { @@ -635,6 +655,7 @@ box_init(void) cfg_getd("slab_alloc_factor")); rmean_box = rmean_new(iproto_type_strs, IPROTO_TYPE_STAT_MAX); + rmean_error = rmean_new(rmean_error_strings, RMEAN_ERROR_LAST); engine_init(); @@ -696,6 +717,11 @@ box_init(void) engine_end_recovery(); + /* + * Recovery inflates the thread pool quite a bit (due to parallel sort). + */ + thread_pool_trim(); + rmean_cleanup(rmean_box); if (recovery_has_replica(recovery)) diff --git a/src/box/error.cc b/src/box/error.cc index 789ffc6f6287941fb7fbb7e0d06b9f9180d14e6f..d50c0f33d2614e7e21afd80c9cabc316b92f110b 100644 --- a/src/box/error.cc +++ b/src/box/error.cc @@ -32,6 +32,11 @@ #include <stdio.h> #include <fiber.h> +struct rmean *rmean_error = NULL; + +const char *rmean_error_strings[RMEAN_ERROR_LAST] = { + "ERROR" +}; static struct method clienterror_methods[] = { make_method(&type_ClientError, "code", &ClientError::errcode), @@ -49,6 +54,8 @@ ClientError::ClientError(const char *file, unsigned line, va_start(ap, errcode); vsnprintf(m_errmsg, sizeof(m_errmsg), tnt_errcode_desc(m_errcode), ap); + if (rmean_error) + rmean_collect(rmean_error, RMEAN_ERROR, 1); va_end(ap); } @@ -59,6 +66,8 @@ ClientError::ClientError(const char *file, unsigned line, const char *msg, m_errcode = errcode; strncpy(m_errmsg, msg, sizeof(m_errmsg) - 1); m_errmsg[sizeof(m_errmsg) - 1] = 0; + if (rmean_error) + rmean_collect(rmean_error, RMEAN_ERROR, 1); } void diff --git a/src/box/error.h b/src/box/error.h index 95e41ca0b3a624a6d94fd5ff263ae4f23fbc83f2..9aa96b4ffaf29a9884251059850216c03ee62000 100644 --- a/src/box/error.h +++ b/src/box/error.h @@ -32,6 +32,15 @@ */ #include "errcode.h" #include "exception.h" +#include "rmean.h" + +extern struct rmean *rmean_error; + +enum rmean_error_name { + RMEAN_ERROR, + RMEAN_ERROR_LAST +}; +extern const char *rmean_error_strings[RMEAN_ERROR_LAST]; extern const struct type type_ClientError; class ClientError: public Exception { diff --git a/src/box/index.cc b/src/box/index.cc index a07f41d0ea063bb03abf8d1616e455911190268c..b0519d774590ee3df1699d1e81f3f71d1f236820 100644 --- a/src/box/index.cc +++ b/src/box/index.cc @@ -34,6 +34,8 @@ #include "schema.h" #include "user_def.h" #include "space.h" +#include "iproto_constants.h" +#include "request.h" const char *iterator_type_strs[] = { /* [ITER_EQ] = */ "EQ", @@ -305,6 +307,9 @@ box_index_get(uint32_t space_id, uint32_t index_id, const char *key, uint32_t part_count = key ? mp_decode_array(&key) : 0; primary_key_validate(index->key_def, key, part_count); struct tuple *tuple = index->findByKey(key, part_count); + /* Count statistics */ + rmean_collect(rmean_box, IPROTO_SELECT, 1); + *result = tuple_bless_null(tuple); return 0; } catch (Exception *) { diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 79867f03fa0907be3d245c27268b1280bb3004d5..be8026c38526d8fd0632e376becffe64549ca244 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -55,6 +55,7 @@ /* {{{ iproto_msg - declaration */ + /** * A single msg from io thread. All requests * from all connections are queued into a single queue @@ -513,6 +514,9 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, iproto_connection_close(con); return; } + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_RECEIVED, nrd); + /* Update the read position and connection state. */ in->wpos += nrd; con->parse_size += nrd; @@ -564,6 +568,9 @@ iproto_flush(struct iobuf *iobuf, struct iproto_connection *con) iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1); ssize_t nwr = sio_writev(fd, iov, iovcnt); + + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_SENT, nwr); if (nwr > 0) { if (begin->used + nwr == end->used) { if (ibuf_used(&iobuf->in) == 0) { @@ -789,8 +796,11 @@ net_send_greeting(struct cmsg *m) if (msg->close_connection) { struct obuf *out = &msg->iobuf->out; try { - sio_writev(con->output.fd, out->iov, - obuf_iovcnt(out)); + int64_t nwr = sio_writev(con->output.fd, out->iov, + obuf_iovcnt(out)); + + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_SENT, nwr); } catch (Exception *e) { e->log(); } @@ -863,13 +873,27 @@ net_cord_f(va_list /* ap */) evio_service_init(loop(), &binary, "binary", iproto_on_accept, NULL); + + /* Init statistics counter */ + rmean_net = rmean_new(rmean_net_strings, RMEAN_NET_LAST); + + if (rmean_net == NULL) + tnt_raise(OutOfMemory, + sizeof(*rmean_net) + + RMEAN_NET_LAST * sizeof(stats), + "rmean", "struct rmean"); + + cbus_join(&net_tx_bus, &net_pipe); + /* * Nothing to do in the fiber so far, the service * will take care of creating events for incoming * connections. */ fiber_yield(); + + rmean_delete(rmean_net); } /** Initialize the iproto subsystem and start network io thread */ @@ -890,6 +914,7 @@ iproto_init() if (cord_costart(&net_cord, "iproto", net_cord_f, NULL)) panic("failed to initialize iproto thread"); + cbus_join(&net_tx_bus, &tx_pipe); } diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 9c9994c5b2360daa746c1bffb0f132feed4a2f58..3275f86352fd844d5bc5687a19cab3321915a7c5 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -289,7 +289,7 @@ end -- space format - the metadata about space fields function box.schema.space.format(id, format) - _space = box.space._space + local _space = box.space._space check_param(id, 'id', 'number') check_param(format, 'format', 'table') if format == nil then diff --git a/src/box/lua/stat.cc b/src/box/lua/stat.cc index 7aa56497429d432b4280cad414d1269ee5b04016..efdc1d3121de07c0fda45aa7239361c53ad7187b 100644 --- a/src/box/lua/stat.cc +++ b/src/box/lua/stat.cc @@ -34,6 +34,8 @@ #include <string.h> #include <rmean.h> #include <box/request.h> +#include <cbus.h> +#include <box/error.h> extern "C" { #include <lua.h> @@ -91,7 +93,10 @@ static int lbox_stat_index(struct lua_State *L) { luaL_checkstring(L, -1); - return rmean_foreach(rmean_box, seek_stat_item, L); + int res = rmean_foreach(rmean_box, seek_stat_item, L); + if (res) + return res; + return rmean_foreach(rmean_error, seek_stat_item, L); } static int @@ -99,6 +104,22 @@ lbox_stat_call(struct lua_State *L) { lua_newtable(L); rmean_foreach(rmean_box, set_stat_item, L); + rmean_foreach(rmean_error, set_stat_item, L); + return 1; +} + +static int +lbox_stat_net_index(struct lua_State *L) +{ + luaL_checkstring(L, -1); + return rmean_foreach(rmean_net, seek_stat_item, L); +} + +static int +lbox_stat_net_call(struct lua_State *L) +{ + lua_newtable(L); + rmean_foreach(rmean_net, set_stat_item, L); return 1; } @@ -108,7 +129,13 @@ static const struct luaL_reg lbox_stat_meta [] = { {NULL, NULL} }; -/** Initialize bos.stat package. */ +static const struct luaL_reg lbox_stat_net_meta [] = { + {"__index", lbox_stat_net_index}, + {"__call", lbox_stat_net_call}, + {NULL, NULL} +}; + +/** Initialize box.stat package. */ void box_lua_stat_init(struct lua_State *L) { @@ -121,7 +148,14 @@ box_lua_stat_init(struct lua_State *L) lua_newtable(L); luaL_register(L, NULL, lbox_stat_meta); lua_setmetatable(L, -2); - lua_pop(L, 1); /* stat module */ + + + luaL_register_module(L, "box.stat.net", statlib); + + lua_newtable(L); + luaL_register(L, NULL, lbox_stat_net_meta); + lua_setmetatable(L, -2); + lua_pop(L, 1); /* stat net module */ } diff --git a/src/cbus.cc b/src/cbus.cc index 131a7f9d57c4400423d5d934d71bb7daae0c5f49..fd4a3f002ddc8615e47a425aec3277f2b9b372fd 100644 --- a/src/cbus.cc +++ b/src/cbus.cc @@ -31,6 +31,14 @@ #include "cbus.h" #include "scoped_guard.h" +struct rmean *rmean_net = NULL; +const char *rmean_net_strings[RMEAN_NET_LAST] = { + "EVENTS", + "LOCKS", + "RECEIVED", + "SENT" +}; + static void cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, int /* events */); @@ -183,9 +191,14 @@ cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, STAILQ_CONCAT(&peer->output, &peer->pipe); cbus_unlock(pipe->bus); + pipe->n_input = 0; - if (pipe_was_empty) + if (pipe_was_empty) { + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_EVENTS, 1); + ev_async_send(pipe->consumer, &pipe->fetch_output); + } if (peer_output_was_empty && !STAILQ_EMPTY(&peer->output)) ev_feed_event(peer->consumer, &peer->fetch_output, EV_CUSTOM); } @@ -200,6 +213,8 @@ cpipe_peek_impl(struct cpipe *pipe) assert(peer->producer == loop()); bool peer_pipe_was_empty = false; + + cbus_lock(pipe->bus); STAILQ_CONCAT(&pipe->output, &pipe->pipe); if (! STAILQ_EMPTY(&peer->input)) { @@ -209,8 +224,12 @@ cpipe_peek_impl(struct cpipe *pipe) cbus_unlock(pipe->bus); peer->n_input = 0; - if (peer_pipe_was_empty) + if (peer_pipe_was_empty) { + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_EVENTS, 1); + ev_async_send(peer->consumer, &peer->fetch_output); + } return STAILQ_FIRST(&pipe->output); } diff --git a/src/cbus.h b/src/cbus.h index 4985a36e3e1b15cc9d28d6d339846b22e52c9464..3b717d369950e79ea007dd7211e3a98b420ab343 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -32,6 +32,7 @@ */ #include "fiber.h" #include "coio.h" +#include "rmean.h" /** cbus, cmsg - inter-cord bus and messaging */ @@ -39,6 +40,20 @@ struct cmsg; struct cpipe; typedef void (*cmsg_f)(struct cmsg *); +/** rmean_net - network statistics (iproto & cbus) */ +extern struct rmean *rmean_net; + +enum rmean_net_name { + RMEAN_NET_EVENTS, + RMEAN_NET_LOCKS, + RMEAN_NET_RECEIVED, + RMEAN_NET_SENT, + + RMEAN_NET_LAST +}; + +extern const char *rmean_net_strings[RMEAN_NET_LAST]; + /** * One hop in a message travel route. A message may need to be * delivered to many destinations before it can be dispensed with. @@ -364,6 +379,10 @@ cbus_join(struct cbus *bus, struct cpipe *pipe); static inline void cbus_lock(struct cbus *bus) { + /* Count statistics */ + if (rmean_net) + rmean_collect(rmean_net, RMEAN_NET_LOCKS, 1); + tt_pthread_mutex_lock(&bus->mutex); } diff --git a/src/rmean.cc b/src/rmean.cc index b16c20a78b5ac9a32377a94a9e97415a8cbe9532..0720bf01749f66c56cf92a1b00ce969d04a153bc 100644 --- a/src/rmean.cc +++ b/src/rmean.cc @@ -38,6 +38,8 @@ void rmean_collect(struct rmean *rmean, size_t name, int64_t value) { + assert(name < rmean->stats_n); + rmean->stats[name].value[0] += value; rmean->stats[name].total += value; } @@ -70,8 +72,6 @@ rmean_age(ev_loop * /* loop */, ev_timer *timer, int /* events */) { struct rmean *rmean = (struct rmean *) timer->data; - if (rmean->stats == NULL) - return; for (size_t i = 0; i < rmean->stats_n; i++) { if (rmean->stats[i].name == NULL) @@ -96,10 +96,11 @@ struct rmean * rmean_new(const char **name, size_t n) { struct rmean *rmean = (struct rmean *) realloc(NULL, - sizeof(rmean) + sizeof(stats) * (n + 1)); + sizeof(struct rmean) + + sizeof(struct stats) * n); if (rmean == NULL) return NULL; - memset(rmean, 0, sizeof(rmean) + sizeof(stats) * n); + memset(rmean, 0, sizeof(struct rmean) + sizeof(struct stats) * n); rmean->stats_n = n; rmean->timer.data = (void *)rmean; ev_timer_init(&rmean->timer, rmean_age, 0, 1.); @@ -116,10 +117,10 @@ rmean_new(const char **name, size_t n) void rmean_delete(struct rmean *rmean) { - if (rmean) { - ev_timer_stop(loop(), &rmean->timer); - free(rmean); - } + + ev_timer_stop(loop(), &rmean->timer); + free(rmean); + rmean = 0; } void diff --git a/src/rmean.h b/src/rmean.h index 5b980b620c06d03694ef658ca9a5b4de54203275..b6dd5848a933aa40a14abf8775a19ffcd094a037 100644 --- a/src/rmean.h +++ b/src/rmean.h @@ -39,6 +39,7 @@ #define PERF_SECS 5 + struct stats { const char *name; int64_t value[PERF_SECS + 1]; diff --git a/test/box/misc.result b/test/box/misc.result index 24a3d4705eb3004b0849050db3f0f2dbce7285fb..0a4bd84df56668356e4478b72fc3fdfbbecf6c08 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -146,6 +146,7 @@ t; - REPLACE - UPSERT - AUTH + - ERROR - UPDATE - total - rps diff --git a/test/box/stat.result b/test/box/stat.result index f497bce2950ac9ed7361cea9924209f0c141f798..4ea136fccd32e24696f54bc0557b7b12015f7c29 100644 --- a/test/box/stat.result +++ b/test/box/stat.result @@ -21,6 +21,10 @@ box.stat.SELECT.total --- - 0 ... +box.stat.ERROR.total +--- +- 0 +... space = box.schema.space.create('tweedledum') --- ... @@ -50,6 +54,15 @@ box.stat.REPLACE.total ... box.stat.SELECT.total --- +- 2 +... +-- check exceptions +space:get('Impossible value') +--- +- error: 'Supplied key type of part 0 does not match index part type: expected NUM' +... +box.stat.ERROR.total +--- - 1 ... --# stop server default @@ -75,6 +88,10 @@ box.stat.SELECT.total --- - 0 ... +box.stat.ERROR.total +--- +- 0 +... -- cleanup box.space.tweedledum:drop() --- diff --git a/test/box/stat.test.lua b/test/box/stat.test.lua index 5ff384a4d19608c9320d71fc863e4dd644691b95..7484c781cfd41becb98fb988e7ff48d8e4f8cc24 100644 --- a/test/box/stat.test.lua +++ b/test/box/stat.test.lua @@ -6,6 +6,7 @@ box.stat.DELETE.total box.stat.UPDATE.total box.stat.REPLACE.total box.stat.SELECT.total +box.stat.ERROR.total space = box.schema.space.create('tweedledum') index = space:create_index('primary', { type = 'hash' }) @@ -19,6 +20,10 @@ box.stat.UPDATE.total box.stat.REPLACE.total box.stat.SELECT.total +-- check exceptions +space:get('Impossible value') +box.stat.ERROR.total + --# stop server default --# start server default @@ -28,6 +33,7 @@ box.stat.DELETE.total box.stat.UPDATE.total box.stat.REPLACE.total box.stat.SELECT.total +box.stat.ERROR.total -- cleanup box.space.tweedledum:drop() diff --git a/test/box/stat_net.result b/test/box/stat_net.result new file mode 100644 index 0000000000000000000000000000000000000000..fa3ca518dc1e1bcc7e60df7182aead09184c664e --- /dev/null +++ b/test/box/stat_net.result @@ -0,0 +1,57 @@ +-- clear statistics +--# stop server default +--# start server default +box.stat.net.SENT -- zero +--- +- total: 0 + rps: 0 +... +box.stat.net.RECEIVED -- zero +--- +- total: 0 + rps: 0 +... +space = box.schema.space.create('tweedledum') +--- +... +box.schema.user.grant('guest','read,write,execute','universe') +--- +... +index = space:create_index('primary', { type = 'hash' }) +--- +... +remote = require 'net.box' +--- +... +LISTEN = require('uri').parse(box.cfg.listen) +--- +... +cn = remote:new(LISTEN.host, LISTEN.service) +--- +... +cn.space.tweedledum:select() --small request +--- +- [] +... +box.stat.net.SENT.total > 0 +--- +- true +... +box.stat.net.RECEIVED.total > 0 +--- +- true +... +box.stat.net.EVENTS.total > 0 +--- +- true +... +box.stat.net.LOCKS.total > 0 +--- +- true +... +space:drop() +--- +... +cn:close() +--- +... diff --git a/test/box/stat_net.test.lua b/test/box/stat_net.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..62eaf09ecf3425de172711d5671be0ef5680f751 --- /dev/null +++ b/test/box/stat_net.test.lua @@ -0,0 +1,24 @@ +-- clear statistics +--# stop server default +--# start server default + +box.stat.net.SENT -- zero +box.stat.net.RECEIVED -- zero + +space = box.schema.space.create('tweedledum') +box.schema.user.grant('guest','read,write,execute','universe') +index = space:create_index('primary', { type = 'hash' }) +remote = require 'net.box' + +LISTEN = require('uri').parse(box.cfg.listen) +cn = remote:new(LISTEN.host, LISTEN.service) + +cn.space.tweedledum:select() --small request + +box.stat.net.SENT.total > 0 +box.stat.net.RECEIVED.total > 0 +box.stat.net.EVENTS.total > 0 +box.stat.net.LOCKS.total > 0 + +space:drop() +cn:close() diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt index 384fc7674ffa4fd93e752fe4cbf54a9ecb3bce75..c86a062ad7d52fce7758d5b0d33995a342e1032e 100644 --- a/test/unit/CMakeLists.txt +++ b/test/unit/CMakeLists.txt @@ -60,7 +60,8 @@ target_link_libraries(light.test small) add_executable(vclock.test vclock.cc unit.c ${CMAKE_SOURCE_DIR}/src/box/vclock.c ${CMAKE_SOURCE_DIR}/src/box/errcode.c - ${CMAKE_SOURCE_DIR}/src/box/error.cc) + ${CMAKE_SOURCE_DIR}/src/box/error.cc + ${CMAKE_SOURCE_DIR}/src/rmean.cc) target_link_libraries(vclock.test core small) add_executable(quota.test quota.cc unit.c) target_link_libraries(quota.test pthread)