diff --git a/include/fiber.h b/include/fiber.h index 9a8572399c591a595beb203ec111597cb80c7f37..b8396a5f8e80f771879fe863b259803330b230d6 100644 --- a/include/fiber.h +++ b/include/fiber.h @@ -106,6 +106,7 @@ struct fiber { va_list f_data; uint32_t flags; struct fiber *waiter; + uint64_t cookie; }; extern __thread struct fiber *fiber; @@ -164,12 +165,13 @@ struct tbuf; void fiber_schedule(ev_watcher *watcher, int event __attribute__((unused))); /** - * Attach this fiber to a session identified by sid. + * Attach this fiber to a session identified by sid and to a cookie. */ static inline void -fiber_set_sid(struct fiber *f, uint32_t sid) +fiber_set_sid(struct fiber *f, uint32_t sid, uint64_t cookie) { f->sid = sid; + f->cookie = cookie; } typedef int (*fiber_stat_cb)(struct fiber *f, void *ctx); diff --git a/include/log_io.h b/include/log_io.h index ecb463137341769cc88093618c21c50514566199..a0bf7c12e70003b5c9c96e644b7dcff9fa2acd95 100644 --- a/include/log_io.h +++ b/include/log_io.h @@ -149,7 +149,7 @@ struct wal_row { } __attribute__((packed)); void -wal_row_fill(struct wal_row *row, int64_t lsn, +wal_row_fill(struct wal_row *row, int64_t lsn, uint64_t cookie, const char *metadata, size_t metadata_len, const char *data, size_t data_len); diff --git a/include/recovery.h b/include/recovery.h index 1e54892c0122370182989772eab90a48017fa216..5a6b33ec6e56a3e8b3c37529a91c9711a60236eb 100644 --- a/include/recovery.h +++ b/include/recovery.h @@ -119,7 +119,7 @@ void recover_snap(struct recovery_state *); void recover_existing_wals(struct recovery_state *); void recovery_follow_local(struct recovery_state *r, ev_tstamp wal_dir_rescan_delay); void recovery_finalize(struct recovery_state *r); -int wal_write(struct recovery_state *r, int64_t lsn, +int wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, uint16_t op, const char *data, uint32_t len); void recovery_setup_panic(struct recovery_state *r, bool on_snap_error, bool on_wal_error); diff --git a/include/session.h b/include/session.h index 5aaf652589bd24d6c0764d917dccf43f346dd34a..60719cbc17848cefa169c1857a3918c7c567019e 100644 --- a/include/session.h +++ b/include/session.h @@ -51,7 +51,7 @@ * trigger fails or runs out of resources. */ uint32_t -session_create(int fd); +session_create(int fd, uint64_t cookie); /** * Destroy a session. diff --git a/src/admin.cc b/src/admin.cc index a0a74501ef296bb983ffe4c8da6dc17764e1a028..e5786ae1340c8aa710788c5d0af7a2cd74e55695 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -78,6 +78,7 @@ static void admin_handler(va_list ap) { struct ev_io coio = va_arg(ap, struct ev_io); + struct sockaddr_in *addr = va_arg(ap, struct sockaddr_in *); struct iobuf *iobuf = va_arg(ap, struct iobuf *); lua_State *L = lua_newthread(tarantool_L); int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX); @@ -95,7 +96,7 @@ admin_handler(va_list ap) * a remote client: it's used in Lua * stored procedures. */ - session_create(coio.fd); + session_create(coio.fd, *(uint64_t *) addr); for (;;) { if (admin_dispatch(&coio, iobuf, L) < 0) diff --git a/src/admin.rl b/src/admin.rl new file mode 100644 index 0000000000000000000000000000000000000000..59e5e06d3cfc9b68d298323fe1504546561f31aa --- /dev/null +++ b/src/admin.rl @@ -0,0 +1,387 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <stdio.h> +#include <string.h> +#include <stdbool.h> +#include <unistd.h> +#include <stdlib.h> + +#include <fiber.h> +#include <palloc.h> +#include <salloc.h> +#include <say.h> +#include <stat.h> +#include <tarantool.h> +#include "lua/init.h" +#include <recovery.h> +#include <tbuf.h> +#include "tarantool/util.h" +#include <errinj.h> +#include "coio_buf.h" + +extern "C" { +#include <lua.h> +#include <lauxlib.h> +#include <lualib.h> +} + +#include "box/box.h" +#include "lua/init.h" +#include "session.h" +#include "scoped_guard.h" + +static const char *help = + "available commands:" CRLF + " - help" CRLF + " - exit" CRLF + " - show info" CRLF + " - show fiber" CRLF + " - show configuration" CRLF + " - show slab" CRLF + " - show palloc" CRLF + " - show stat" CRLF + " - show plugins" CRLF + " - save coredump" CRLF + " - save snapshot" CRLF + " - lua command" CRLF + " - reload configuration" CRLF + " - show injections (debug mode only)" CRLF + " - set injection <name> <state> (debug mode only)" CRLF; + +static const char *unknown_command = "unknown command. try typing help." CRLF; + +%%{ + machine admin; + write data; +}%% + +struct salloc_stat_admin_cb_ctx { + int64_t total_used; + struct tbuf *out; +}; + +static int +salloc_stat_admin_cb(const struct slab_cache_stats *cstat, void *cb_ctx) +{ + struct salloc_stat_admin_cb_ctx *ctx = (struct salloc_stat_admin_cb_ctx *) cb_ctx; + + tbuf_printf(ctx->out, + " - { item_size: %- 5i, slabs: %- 3i, items: %- 11" PRIi64 + ", bytes_used: %- 12" PRIi64 + ", bytes_free: %- 12" PRIi64 " }" CRLF, + (int)cstat->item_size, + (int)cstat->slabs, + cstat->items, + cstat->bytes_used, + cstat->bytes_free); + + ctx->total_used += cstat->bytes_used; + return 0; +} + +static void +show_slab(struct tbuf *out) +{ + struct salloc_stat_admin_cb_ctx cb_ctx; + struct slab_arena_stats astat; + + cb_ctx.total_used = 0; + cb_ctx.out = out; + + tbuf_printf(out, "slab statistics:\n classes:" CRLF); + + salloc_stat(salloc_stat_admin_cb, &astat, &cb_ctx); + + tbuf_printf(out, " items_used: %.2f%%" CRLF, + (double)cb_ctx.total_used / astat.size * 100); + tbuf_printf(out, " arena_used: %.2f%%" CRLF, + (double)astat.used / astat.size * 100); +} + +static void +end(struct tbuf *out) +{ + tbuf_printf(out, "..." CRLF); +} + +static void +start(struct tbuf *out) +{ + tbuf_printf(out, "---" CRLF); +} + +static void +ok(struct tbuf *out) +{ + start(out); + tbuf_printf(out, "ok" CRLF); + end(out); +} + +static void +fail(struct tbuf *out, struct tbuf *err) +{ + start(out); + tbuf_printf(out, "fail:%.*s" CRLF, err->size, (char *)err->data); + end(out); +} + +static void +tarantool_info(struct tbuf *out) +{ + tbuf_printf(out, "info:" CRLF); + tbuf_printf(out, " version: \"%s\"" CRLF, tarantool_version()); + tbuf_printf(out, " uptime: %i" CRLF, (int)tarantool_uptime()); + tbuf_printf(out, " pid: %i" CRLF, getpid()); + tbuf_printf(out, " logger_pid: %i" CRLF, logger_pid); + tbuf_printf(out, " snapshot_pid: %i" CRLF, snapshot_pid); + tbuf_printf(out, " lsn: %" PRIi64 CRLF, + recovery_state->confirmed_lsn); + tbuf_printf(out, " recovery_lag: %.3f" CRLF, + recovery_state->remote ? + recovery_state->remote->recovery_lag : 0); + tbuf_printf(out, " recovery_last_update: %.3f" CRLF, + recovery_state->remote ? + recovery_state->remote->recovery_last_update_tstamp :0); + box_info(out); + const char *path = cfg_filename_fullpath; + if (path == NULL) + path = cfg_filename; + tbuf_printf(out, " config: \"%s\"" CRLF, path); +} + +static int +show_stat_item(const char *name, int rps, int64_t total, void *ctx) +{ + struct tbuf *buf = (struct tbuf *) ctx; + int name_len = strlen(name); + tbuf_printf(buf, + " %s:%*s{ rps: %- 6i, total: %- 12" PRIi64 " }" CRLF, + name, 1 + stat_max_name_len - name_len, " ", rps, total); + return 0; +} + +void +show_stat(struct tbuf *buf) +{ + tbuf_printf(buf, "statistics:" CRLF); + stat_foreach(show_stat_item, buf); +} + +static int +admin_dispatch(struct ev_io *coio, struct iobuf *iobuf, lua_State *L) +{ + struct ibuf *in = &iobuf->in; + struct tbuf *out = tbuf_new(fiber->gc_pool); + struct tbuf *err = tbuf_new(fiber->gc_pool); + int cs; + char *p, *pe; + char *strstart, *strend; + bool state; + + while ((pe = (char *) memchr(in->pos, '\n', in->end - in->pos)) == NULL) { + if (coio_bread(coio, in, 1) <= 0) + return -1; + } + + pe++; + p = in->pos; + + %%{ + action show_plugins { + start(out); + show_plugins_stat(out); + end(out); + } + + action show_configuration { + start(out); + show_cfg(out); + end(out); + } + + action show_injections { + start(out); + errinj_info(out); + end(out); + } + + action help { + start(out); + tbuf_append(out, help, strlen(help)); + end(out); + } + + action lua { + strstart[strend-strstart]='\0'; + start(out); + tarantool_lua(L, out, strstart); + end(out); + } + + action reload_configuration { + if (reload_cfg(err)) + fail(out, err); + else + ok(out); + } + + action save_snapshot { + int ret = snapshot(); + + if (ret == 0) + ok(out); + else { + tbuf_printf(err, " can't save snapshot, errno %d (%s)", + ret, strerror(ret)); + + fail(out, err); + } + } + + action set_injection { + strstart[strend-strstart] = '\0'; + if (errinj_set_byname(strstart, state)) { + tbuf_printf(err, "can't find error injection '%s'", strstart); + fail(out, err); + } else { + ok(out); + } + } + + eol = "\n" | "\r\n"; + show = "sh"("o"("w")?)?; + info = "in"("f"("o")?)?; + check = "ch"("e"("c"("k")?)?)?; + configuration = "co"("n"("f"("i"("g"("u"("r"("a"("t"("i"("o"("n")?)?)?)?)?)?)?)?)?)?)?; + fiber = "fi"("b"("e"("r")?)?)?; + slab = "sl"("a"("b")?)?; + mod = "mo"("d")?; + palloc = "pa"("l"("l"("o"("c")?)?)?)?; + stat = "st"("a"("t")?)?; + plugins = "plugins"; + + help = "h"("e"("l"("p")?)?)?; + exit = "e"("x"("i"("t")?)?)? | "q"("u"("i"("t")?)?)?; + save = "sa"("v"("e")?)?; + coredump = "co"("r"("e"("d"("u"("m"("p")?)?)?)?)?)?; + snapshot = "sn"("a"("p"("s"("h"("o"("t")?)?)?)?)?)?; + string = [^\r\n]+ >{strstart = p;} %{strend = p;}; + reload = "re"("l"("o"("a"("d")?)?)?)?; + lua = "lu"("a")?; + + set = "se"("t")?; + injection = "in"("j"("e"("c"("t"("i"("o"("n")?)?)?)?)?)?)?; + injections = injection"s"; + namech = alnum | punct; + name = namech+ >{ strstart = p; } %{ strend = p; }; + state_on = "on" %{ state = true; }; + state_off = "of"("f")? %{ state = false; }; + state = state_on | state_off; + + commands = (help %help | + exit %{return -1;} | + lua " "+ string %lua | + show " "+ info %{start(out); tarantool_info(out); end(out);} | + show " "+ fiber %{start(out); fiber_info(out); end(out);} | + show " "+ configuration %show_configuration | + show " "+ slab %{start(out); show_slab(out); end(out);} | + show " "+ palloc %{start(out); palloc_stat(out); end(out);} | + show " "+ stat %{start(out); show_stat(out);end(out);} | + show " "+ injections %show_injections | + show " "+ plugins %show_plugins | + set " "+ injection " "+ name " "+ state %set_injection | + save " "+ coredump %{coredump(60); ok(out);} | + save " "+ snapshot %save_snapshot | + check " "+ slab %{slab_validate(); ok(out);} | + reload " "+ configuration %reload_configuration); + + main := commands eol; + write init; + write exec; + }%% + + in->pos = pe; + + if (p != pe) { + start(out); + tbuf_append(out, unknown_command, strlen(unknown_command)); + end(out); + } + + coio_write(coio, out->data, out->size); + return 0; +} + +static void +admin_handler(va_list ap) +{ + struct ev_io coio = va_arg(ap, struct ev_io); + struct sockaddr_in *addr = va_arg(ap, struct sockaddr_in *); + struct iobuf *iobuf = va_arg(ap, struct iobuf *); + lua_State *L = lua_newthread(tarantool_L); + int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX); + + auto scoped_guard = make_scoped_guard([&] { + luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref); + evio_close(&coio); + iobuf_delete(iobuf); + session_destroy(fiber->sid); + }); + + /* + * Admin and iproto connections must have a + * session object, representing the state of + * a remote client: it's used in Lua + * stored procedures. + */ + session_create(coio.fd, *(uint64_t *) addr); + for (;;) { + if (admin_dispatch(&coio, iobuf, L) < 0) + return; + iobuf_gc(iobuf); + fiber_gc(); + } +} + +void +admin_init(const char *bind_ipaddr, int admin_port) +{ + static struct coio_service admin; + coio_service_init(&admin, "admin", bind_ipaddr, + admin_port, admin_handler, NULL); + evio_service_start(&admin.evio_service); +} + +/* + * Local Variables: + * mode: c + * End: + * vim: syntax=objc + */ diff --git a/src/box/txn.cc b/src/box/txn.cc index 1dcfd99f3c2d66f81b16639bbe923afe86ed8820..414b57fae31f09ede9329ef824c0d680dd9ff63b 100644 --- a/src/box/txn.cc +++ b/src/box/txn.cc @@ -87,8 +87,8 @@ txn_commit(struct txn *txn) int64_t lsn = next_lsn(recovery_state); ev_tstamp start = ev_now(), stop; - int res = wal_write(recovery_state, lsn, txn->op, - txn->data, txn->len); + int res = wal_write(recovery_state, lsn, fiber->cookie, + txn->op, txn->data, txn->len); stop = ev_now(); if (stop - start > cfg.too_long_threshold) { diff --git a/src/coio.cc b/src/coio.cc index 677b726a1ead2d0d9aa86150b7b6181f71db3f57..3d143d9a2f0a0b43c449127463adb221b3486185 100644 --- a/src/coio.cc +++ b/src/coio.cc @@ -568,7 +568,7 @@ coio_service_on_accept(struct evio_service *evio_service, * Start the created fiber. It becomes the coio object owner * and will have to close it and free before termination. */ - fiber_call(f, coio, iobuf, service->handler_param); + fiber_call(f, coio, addr, iobuf, service->handler_param); } void diff --git a/src/iproto.cc b/src/iproto.cc index cc401a5c24ce66ec25b9cdd2cc326b32758855e8..4a6a273231df6c0bd4ff35713f77db237adb07b6 100644 --- a/src/iproto.cc +++ b/src/iproto.cc @@ -218,6 +218,9 @@ iproto_queue_init(struct iproto_queue *i_queue, static inline uint32_t iproto_session_id(struct iproto_session *session); +static inline uint64_t +iproto_session_cookie(struct iproto_session *session); + /** A handler to process all queued requests. */ static void iproto_queue_handler(va_list ap) @@ -227,7 +230,7 @@ iproto_queue_handler(va_list ap) restart: while (iproto_dequeue_request(i_queue, &request)) { - fiber_set_sid(fiber, iproto_session_id(request.session)); + fiber_set_sid(fiber, iproto_session_id(request.session), iproto_session_cookie(request.session)); request.process(&request); } iproto_cache_fiber(&request_queue); @@ -274,6 +277,7 @@ struct iproto_session struct ev_io output; /** Session id. */ uint32_t sid; + uint64_t cookie; }; SLIST_HEAD(, iproto_session) iproto_session_cache = @@ -302,6 +306,12 @@ iproto_session_id(struct iproto_session *session) return session->sid; } +static inline uint64_t +iproto_session_cookie(struct iproto_session *session) +{ + return session->cookie; +} + static void iproto_session_on_input(struct ev_io *watcher, int revents __attribute__((unused))); @@ -319,7 +329,8 @@ static void iproto_process_disconnect(struct iproto_request *request); static struct iproto_session * -iproto_session_create(const char *name, int fd, box_process_func *param) +iproto_session_create(const char *name, int fd, struct sockaddr_in *addr, + box_process_func *param) { struct iproto_session *session; if (SLIST_EMPTY(&iproto_session_cache)) { @@ -339,6 +350,7 @@ iproto_session_create(const char *name, int fd, box_process_func *param) session->parse_size = 0; session->write_pos = obuf_create_svp(&session->iobuf[0]->out); session->sid = 0; + session->cookie = *(uint64_t *) addr; return session; } @@ -694,7 +706,7 @@ iproto_process_connect(struct iproto_request *request) struct iobuf *iobuf = request->iobuf; int fd = session->input.fd; try { /* connect. */ - session->sid = session_create(fd); + session->sid = session_create(fd, session->cookie); } catch (const ClientError& e) { iproto_reply_error(&iobuf->out, request->header, e); try { @@ -723,7 +735,7 @@ iproto_process_connect(struct iproto_request *request) static void iproto_process_disconnect(struct iproto_request *request) { - fiber_set_sid(fiber, request->session->sid); + fiber_set_sid(fiber, request->session->sid, request->session->cookie); /* Runs the trigger, which may yield. */ iproto_session_destroy(request->session); } @@ -744,7 +756,7 @@ iproto_on_accept(struct evio_service *service, int fd, box_process_func *process_fun = (box_process_func*) service->on_accept_param; - session = iproto_session_create(name, fd, process_fun); + session = iproto_session_create(name, fd, addr, process_fun); iproto_enqueue_request(&request_queue, session, session->iobuf[0], &dummy_header, iproto_process_connect); diff --git a/src/log_io.cc b/src/log_io.cc index fda032c5fb1fb779af4037b4534133665ffd9afd..cae77c86d9889b807a4308a875a15c4f1f82fd43 100644 --- a/src/log_io.cc +++ b/src/log_io.cc @@ -53,13 +53,13 @@ row_header_sign(struct row_header *header) } void -wal_row_fill(struct wal_row *row, int64_t lsn, +wal_row_fill(struct wal_row *row, int64_t lsn, uint64_t cookie, const char *metadata, size_t metadata_len, const char *data, size_t data_len) { row->marker = row_marker; row->tag = WAL; /* unused. */ - row->cookie = 0; /* unused. */ + row->cookie = cookie; memcpy(row->data, metadata, metadata_len); memcpy(row->data + metadata_len, data, data_len); row_header_fill(&row->header, lsn, metadata_len + data_len + diff --git a/src/lua/fiber.cc b/src/lua/fiber.cc index ae5ab9552580078a18ca63de0cdb61321950cccb..baf5f94ec6fa85a101b37d6827511cc5c3e87058 100644 --- a/src/lua/fiber.cc +++ b/src/lua/fiber.cc @@ -382,7 +382,7 @@ lbox_fiber_detach(struct lua_State *L) /* Request a detach. */ lua_pushinteger(L, DETACH); /* A detached fiber has no associated session. */ - fiber_set_sid(fiber, 0); + fiber_set_sid(fiber, 0, 0); fiber_yield_to(caller); return 0; } @@ -503,7 +503,7 @@ lbox_fiber_create(struct lua_State *L) struct fiber *f = fiber_new("lua", box_lua_fiber_run); /* Preserve the session in a child fiber. */ - fiber_set_sid(f, fiber->sid); + fiber_set_sid(f, fiber->sid, fiber->cookie); /* Initially the fiber is cancellable */ f->flags |= FIBER_USER_MODE | FIBER_CANCELLABLE; diff --git a/src/recovery.cc b/src/recovery.cc index 96d48205e32a0f0b8249a116ad7b1eff993edf61..ad4bb787463afedd113984ca65ff5389188c7964 100644 --- a/src/recovery.cc +++ b/src/recovery.cc @@ -1142,7 +1142,7 @@ wal_writer_thread(void *worker_args) * to be written to disk and wait until this task is completed. */ int -wal_write(struct recovery_state *r, int64_t lsn, +wal_write(struct recovery_state *r, int64_t lsn, uint64_t cookie, uint16_t op, const char *row, uint32_t row_len) { say_debug("wal_write lsn=%" PRIi64, lsn); @@ -1159,7 +1159,7 @@ wal_write(struct recovery_state *r, int64_t lsn, req->fiber = fiber; req->res = -1; - wal_row_fill(&req->row, lsn, (const char *) &op, + wal_row_fill(&req->row, lsn, cookie, (const char *) &op, sizeof(op), row, row_len); (void) tt_pthread_mutex_lock(&writer->mutex); @@ -1206,7 +1206,8 @@ snapshot_write_row(struct log_io *l, struct fio_batch *batch, sizeof(struct wal_row) + data_len + metadata_len); - wal_row_fill(row, ++rows, metadata, metadata_len, data, data_len); + wal_row_fill(row, ++rows, snapshot_cookie, metadata, + metadata_len, data, data_len); row_header_sign(&row->header); fio_batch_add(batch, row, wal_row_size(row)); diff --git a/src/session.cc b/src/session.cc index aff039821dea273a450127304ea08e445d5cb250..e320563a521d48f338572d5f4c5609656ca9863d 100644 --- a/src/session.cc +++ b/src/session.cc @@ -32,6 +32,7 @@ #include "assoc.h" #include "trigger.h" #include "exception.h" +#include <sys/socket.h> uint32_t sid_max; @@ -41,7 +42,7 @@ RLIST_HEAD(session_on_connect); RLIST_HEAD(session_on_disconnect); uint32_t -session_create(int fd) +session_create(int fd, uint64_t cookie) { /* Return the next sid rolling over the reserved value of 0. */ while (++sid_max == 0) @@ -62,11 +63,11 @@ session_create(int fd) * Run the trigger *after* setting the current * fiber sid. */ - fiber_set_sid(fiber, sid); + fiber_set_sid(fiber, sid, cookie); try { trigger_run(&session_on_connect, NULL); } catch (const Exception& e) { - fiber_set_sid(fiber, 0); + fiber_set_sid(fiber, 0, 0); mh_i32ptr_remove(session_registry, &node, NULL); throw; }