diff --git a/src/box/iproto.cc b/src/box/iproto.cc index f5796b96f6893ce70280bdf8c8613f6c02714b29..50812467a48dfe556de756a5f72e74cb8b952c6e 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -55,6 +55,14 @@ /* {{{ iproto_msg - declaration */ +const char *rmean_net_names[RMEAN_NET_LAST] = { + "EVENTS", + "LOCKS", + "RECEIVED", + "SENT" +}; + + /** * A single msg from io thread. All requests * from all connections are queued into a single queue @@ -513,6 +521,9 @@ iproto_connection_on_input(ev_loop *loop, struct ev_io *watcher, iproto_connection_close(con); return; } + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_RECEIVED, nrd); + /* Update the read position and connection state. */ in->wpos += nrd; con->parse_size += nrd; @@ -564,6 +575,9 @@ iproto_flush(struct iobuf *iobuf, struct iproto_connection *con) iov[iovcnt-1].iov_len = end->iov_len - begin->iov_len * (iovcnt == 1); ssize_t nwr = sio_writev(fd, iov, iovcnt); + + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_SENT, nwr); if (nwr > 0) { if (begin->used + nwr == end->used) { if (ibuf_used(&iobuf->in) == 0) { @@ -772,8 +786,11 @@ net_send_greeting(struct cmsg *m) if (msg->close_connection) { struct obuf *out = &msg->iobuf->out; try { - sio_writev(con->output.fd, out->iov, - obuf_iovcnt(out)); + int64_t nwr = sio_writev(con->output.fd, out->iov, + obuf_iovcnt(out)); + + /* Count statistics */ + rmean_collect(rmean_net, RMEAN_NET_SENT, nwr); } catch (Exception *e) { e->log(); } @@ -846,13 +863,28 @@ net_cord_f(va_list /* ap */) evio_service_init(loop(), &binary, "binary", iproto_on_accept, NULL); + + /* Init statistics counter */ + rmean_net = rmean_new(rmean_net_names, RMEAN_NET_LAST); + rmean_cbus_is_count = true; + + if (rmean_net == NULL) + tnt_raise(OutOfMemory, + sizeof(*rmean_net) + + RMEAN_NET_LAST * sizeof(stats), + "rmean", "struct rmean"); + + cbus_join(&net_tx_bus, &net_pipe); + /* * Nothing to do in the fiber so far, the service * will take care of creating events for incoming * connections. */ fiber_yield(); + + rmean_delete(rmean_net); } /** Initialize the iproto subsystem and start network io thread */ @@ -873,6 +905,7 @@ iproto_init() if (cord_costart(&net_cord, "iproto", net_cord_f, NULL)) panic("failed to initialize iproto thread"); + cbus_join(&net_tx_bus, &tx_pipe); } diff --git a/src/box/lua/stat.cc b/src/box/lua/stat.cc index 7aa56497429d432b4280cad414d1269ee5b04016..add58243f19ad5768f2f9cb321d16a0c06e90bf2 100644 --- a/src/box/lua/stat.cc +++ b/src/box/lua/stat.cc @@ -34,6 +34,7 @@ #include <string.h> #include <rmean.h> #include <box/request.h> +#include <cbus.h> extern "C" { #include <lua.h> @@ -102,13 +103,34 @@ lbox_stat_call(struct lua_State *L) return 1; } +static int +lbox_stat_net_index(struct lua_State *L) +{ + luaL_checkstring(L, -1); + return rmean_foreach(rmean_net, seek_stat_item, L); +} + +static int +lbox_stat_net_call(struct lua_State *L) +{ + lua_newtable(L); + rmean_foreach(rmean_net, set_stat_item, L); + return 1; +} + static const struct luaL_reg lbox_stat_meta [] = { {"__index", lbox_stat_index}, {"__call", lbox_stat_call}, {NULL, NULL} }; -/** Initialize bos.stat package. */ +static const struct luaL_reg lbox_stat_net_meta [] = { + {"__index", lbox_stat_net_index}, + {"__call", lbox_stat_net_call}, + {NULL, NULL} +}; + +/** Initialize box.stat package. */ void box_lua_stat_init(struct lua_State *L) { @@ -121,7 +143,14 @@ box_lua_stat_init(struct lua_State *L) lua_newtable(L); luaL_register(L, NULL, lbox_stat_meta); lua_setmetatable(L, -2); - lua_pop(L, 1); /* stat module */ + + + luaL_register_module(L, "box.stat.net", statlib); + + lua_newtable(L); + luaL_register(L, NULL, lbox_stat_net_meta); + lua_setmetatable(L, -2); + lua_pop(L, 1); /* stat net module */ } diff --git a/src/cbus.cc b/src/cbus.cc index 131a7f9d57c4400423d5d934d71bb7daae0c5f49..b6267905d42167662357b2546b0fd3e973164434 100644 --- a/src/cbus.cc +++ b/src/cbus.cc @@ -31,6 +31,10 @@ #include "cbus.h" #include "scoped_guard.h" +struct rmean *rmean_net; + +bool rmean_cbus_is_count = 0; + static void cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, int /* events */); @@ -183,9 +187,15 @@ cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher, STAILQ_CONCAT(&peer->output, &peer->pipe); cbus_unlock(pipe->bus); + pipe->n_input = 0; - if (pipe_was_empty) + if (pipe_was_empty) { + /* Count statistics */ + if (rmean_cbus_is_count) + rmean_collect(rmean_net, RMEAN_NET_EVENTS, 1); + ev_async_send(pipe->consumer, &pipe->fetch_output); + } if (peer_output_was_empty && !STAILQ_EMPTY(&peer->output)) ev_feed_event(peer->consumer, &peer->fetch_output, EV_CUSTOM); } @@ -200,6 +210,8 @@ cpipe_peek_impl(struct cpipe *pipe) assert(peer->producer == loop()); bool peer_pipe_was_empty = false; + + cbus_lock(pipe->bus); STAILQ_CONCAT(&pipe->output, &pipe->pipe); if (! STAILQ_EMPTY(&peer->input)) { @@ -209,8 +221,13 @@ cpipe_peek_impl(struct cpipe *pipe) cbus_unlock(pipe->bus); peer->n_input = 0; - if (peer_pipe_was_empty) + if (peer_pipe_was_empty) { + /* Count statistics */ + if (rmean_cbus_is_count) + rmean_collect(rmean_net, RMEAN_NET_EVENTS, 1); + ev_async_send(peer->consumer, &peer->fetch_output); + } return STAILQ_FIRST(&pipe->output); } diff --git a/src/cbus.h b/src/cbus.h index 4985a36e3e1b15cc9d28d6d339846b22e52c9464..650d54dd6dd900133cfd6ac6fc786178395c821b 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -32,6 +32,7 @@ */ #include "fiber.h" #include "coio.h" +#include "rmean.h" /** cbus, cmsg - inter-cord bus and messaging */ @@ -39,6 +40,19 @@ struct cmsg; struct cpipe; typedef void (*cmsg_f)(struct cmsg *); +/** rmean_net - network statistics (iproto & cbus) */ +extern struct rmean *rmean_net; +extern bool rmean_cbus_is_count; + +enum rmean_net_name { + RMEAN_NET_EVENTS, + RMEAN_NET_LOCKS, + RMEAN_NET_RECEIVED, + RMEAN_NET_SENT, + + RMEAN_NET_LAST +}; + /** * One hop in a message travel route. A message may need to be * delivered to many destinations before it can be dispensed with. @@ -364,6 +378,10 @@ cbus_join(struct cbus *bus, struct cpipe *pipe); static inline void cbus_lock(struct cbus *bus) { + /* Count statistics */ + if (rmean_cbus_is_count) + rmean_collect(rmean_net, RMEAN_NET_LOCKS, 1); + tt_pthread_mutex_lock(&bus->mutex); }