diff --git a/src/box/box.cc b/src/box/box.cc index c397d226556cd16d93abc6ee4c49742427f7f7f8..7fa92e0af027a3486fd0dee2c2de8cc227283e44 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -58,7 +58,7 @@ box_process_func box_process = process_ro; struct recovery_state *recovery; -static int stat_base; +int stat_base; int snapshot_pid = 0; /* snapshot processes pid */ static void @@ -78,7 +78,7 @@ process_rw(struct port *port, struct request *request) { try { stat_collect(stat_base, request->type, 1); - request->execute(request, port); + request_execute(request, port); port_eof(port); } catch (Exception *e) { txn_rollback_stmt(); @@ -249,7 +249,7 @@ box_leave_local_standby_mode(void *data __attribute__((unused))) */ space_end_recover(); - stat_cleanup(stat_base, IPROTO_TYPE_DML_MAX); + stat_cleanup(stat_base, IPROTO_TYPE_STAT_MAX); box_set_wal_mode(cfg_gets("wal_mode")); if (recovery_has_remote(recovery)) @@ -436,7 +436,7 @@ box_init() cfg_geti("panic_on_snap_error"), cfg_geti("panic_on_wal_error")); - stat_base = stat_register(iproto_type_strs, IPROTO_TYPE_DML_MAX); + stat_base = stat_register(iproto_type_strs, IPROTO_TYPE_STAT_MAX); if (recovery_has_data(recovery)) { /* Process existing snapshot */ diff --git a/src/box/box.h b/src/box/box.h index ca18d7043d8d6b4375751f376d9df1ba51fc5027..bba7df512f8ba9b84d6fdd22fdb1c336e49a9255 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -100,6 +100,9 @@ const char *box_status(void); void box_leave_local_standby_mode(void *data __attribute__((unused))); +void +box_process_auth(struct request *request); + void box_process_join(int fd, struct xrow_header *header); @@ -124,6 +127,8 @@ void box_set_too_long_threshold(double threshold); extern struct recovery_state *recovery; +extern int stat_base; + #if defined(__cplusplus) } #endif /* defined(__cplusplus) */ diff --git a/src/box/iproto.cc b/src/box/iproto.cc index c2b04236d9dff9a8d9d47821b2209efa34dffed9..57084d6853b4af512f808a4bbc606d63be5bca54 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -46,6 +46,9 @@ #include "xrow.h" #include "iproto_constants.h" #include "user_def.h" +#include "authentication.h" +#include "stat.h" +#include "lua/call.h" /* {{{ iproto_request - declaration */ @@ -84,10 +87,7 @@ static void iproto_process_disconnect(struct iproto_request *request); static void -iproto_process_dml(struct iproto_request *request); - -static void -iproto_process_admin(struct iproto_request *request); +iproto_process(struct iproto_request *request); struct IprotoRequestGuard { struct iproto_request *ireq; @@ -471,7 +471,7 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) if (reqend > in->end) break; struct iproto_request *ireq = - iproto_request_new(con, iproto_process_dml); + iproto_request_new(con, iproto_process); IprotoRequestGuard guard(ireq); xrow_header_decode(&ireq->header, &pos, reqend); @@ -482,7 +482,9 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) * as well as in->pos must not be advanced, to * stay in sync. */ - if (iproto_type_is_dml(ireq->header.type)) { + if (ireq->header.type >= IPROTO_SELECT && + ireq->header.type <= IPROTO_AUTH) { + /* Pre-parse request before putting it into the queue */ if (ireq->header.bodycnt == 0) { tnt_raise(ClientError, ER_INVALID_MSGPACK, "request type"); @@ -491,8 +493,6 @@ iproto_enqueue_batch(struct iproto_connection *con, struct ibuf *in) pos = (const char *) ireq->header.body[0].iov_base; request_decode(&ireq->request, pos, ireq->header.body[0].iov_len); - } else { - ireq->process = iproto_process_admin; } ireq->request.header = &ireq->header; iproto_queue_push(&request_queue, guard.release()); @@ -630,45 +630,10 @@ iproto_connection_on_output(ev_loop *loop, struct ev_io *watcher, /* {{{ iproto_process_* functions */ static void -iproto_process_dml(struct iproto_request *ireq) +iproto_process(struct iproto_request *ireq) { struct iobuf *iobuf = ireq->iobuf; - struct iproto_connection *con = ireq->connection; - - auto scope_guard = make_scoped_guard([=]{ - /* Discard request (see iproto_enqueue_batch()) */ - iobuf->in.pos += ireq->total_len; - - if (evio_is_active(&con->output)) { - if (! ev_is_active(&con->output)) - ev_feed_event(con->loop, - &con->output, - EV_WRITE); - } else if (iproto_connection_is_idle(con)) { - iproto_connection_delete(con); - } - }); - - if (unlikely(! evio_is_active(&con->output))) - return; - struct obuf *out = &iobuf->out; - - struct iproto_port port; - iproto_port_init(&port, out, ireq->header.sync); - try { - box_process((struct port *) &port, &ireq->request); - } catch (Exception *e) { - if (port.found) - obuf_rollback_to_svp(out, &port.svp); - iproto_reply_error(out, e, ireq->header.sync); - } -} - -static void -iproto_process_admin(struct iproto_request *ireq) -{ - struct iobuf *iobuf = ireq->iobuf; struct iproto_connection *con = ireq->connection; auto scope_guard = make_scoped_guard([=]{ @@ -688,11 +653,33 @@ iproto_process_admin(struct iproto_request *ireq) if (unlikely(! evio_is_active(&con->output))) return; + struct obuf_svp svp = obuf_create_svp(out); try { switch (ireq->header.type) { + case IPROTO_SELECT: + case IPROTO_INSERT: + case IPROTO_REPLACE: + case IPROTO_UPDATE: + case IPROTO_DELETE: + struct iproto_port port; + iproto_port_init(&port, out, ireq->header.sync); + box_process((struct port *) &port, &ireq->request); + break; + case IPROTO_CALL: + stat_collect(stat_base, ireq->request.type, 1); + box_lua_call(&ireq->request, &iobuf->out); + break; + case IPROTO_AUTH: + { + const char *user = ireq->request.key; + uint32_t len = mp_decode_strl(&user); + authenticate(user, len, ireq->request.tuple, + ireq->request.tuple_end); + iproto_reply_ok(&ireq->iobuf->out, ireq->header.sync); + break; + } case IPROTO_PING: - iproto_reply_ping(&ireq->iobuf->out, - ireq->header.sync); + iproto_reply_ok(&ireq->iobuf->out, ireq->header.sync); break; case IPROTO_JOIN: ev_io_stop(con->loop, &con->input); @@ -713,7 +700,7 @@ iproto_process_admin(struct iproto_request *ireq) (uint32_t) ireq->header.type); } } catch (Exception *e) { - say_error("admin command error: %s", e->errmsg()); + obuf_rollback_to_svp(&iobuf->out, &svp); iproto_reply_error(&iobuf->out, e, ireq->header.sync); } } diff --git a/src/box/iproto_constants.c b/src/box/iproto_constants.c index a0893501f7cf4bcb4e31acf23663fc7e05e1acb1..230454326d9df8f0866228558ca60f4fa1023d20 100644 --- a/src/box/iproto_constants.c +++ b/src/box/iproto_constants.c @@ -98,7 +98,7 @@ const char *iproto_type_strs[] = }; #define bit(c) (1ULL<<IPROTO_##c) -const uint64_t iproto_body_key_map[IPROTO_TYPE_DML_MAX] = { +const uint64_t iproto_body_key_map[IPROTO_AUTH + 1] = { 0, /* unused */ bit(SPACE_ID) | bit(LIMIT) | bit(KEY), /* SELECT */ bit(SPACE_ID) | bit(TUPLE), /* INSERT */ diff --git a/src/box/iproto_constants.h b/src/box/iproto_constants.h index e89ad4fd3301e3912b8d8753503b00ef1dd833f7..917dbe62d72b0f3bfb5862e210f98dd2c7592093 100644 --- a/src/box/iproto_constants.h +++ b/src/box/iproto_constants.h @@ -116,9 +116,10 @@ enum iproto_type { IPROTO_REPLACE = 3, IPROTO_UPDATE = 4, IPROTO_DELETE = 5, + IPROTO_TYPE_DML_MAX = IPROTO_DELETE + 1, IPROTO_CALL = 6, + IPROTO_TYPE_STAT_MAX = IPROTO_CALL + 1, IPROTO_AUTH = 7, - IPROTO_TYPE_DML_MAX = IPROTO_AUTH + 1, /* admin command codes */ IPROTO_PING = 64, IPROTO_JOIN = 65, diff --git a/src/box/iproto_port.cc b/src/box/iproto_port.cc index d2810a08b550f478bdda7ab3b0dcfa7cf2f12858..07655b2b9f403618b482e1bda2d2520b7bce67f0 100644 --- a/src/box/iproto_port.cc +++ b/src/box/iproto_port.cc @@ -69,7 +69,7 @@ iproto_encode_error(uint32_t error) } void -iproto_reply_ping(struct obuf *out, uint64_t sync) +iproto_reply_ok(struct obuf *out, uint64_t sync) { struct iproto_header_bin reply = iproto_header_bin; reply.v_len = mp_bswap_u32(sizeof(iproto_header_bin) - 5); @@ -121,20 +121,32 @@ iproto_port_eof(struct port *ptr) struct iproto_port *port = iproto_port(ptr); /* found == 0 means add_tuple wasn't called at all. */ if (port->found == 0) { - port->svp = obuf_book(port->buf, SVP_SIZE); - port->size += SVP_SIZE; + port->svp = iproto_prepare_select(port->buf); } - uint32_t len = port->size - 5; + iproto_reply_select(port->buf, &port->svp, port->sync, port->found); +} + +struct obuf_svp +iproto_prepare_select(struct obuf *buf) +{ + return obuf_book(buf, SVP_SIZE); +} + +void +iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t count) +{ + uint32_t len = obuf_size(buf) - svp->size - 5; struct iproto_header_bin header = iproto_header_bin; header.v_len = mp_bswap_u32(len); - header.v_sync = mp_bswap_u64(port->sync); + header.v_sync = mp_bswap_u64(sync); struct iproto_body_bin body = iproto_body_bin; - body.v_data_len = mp_bswap_u32(port->found); + body.v_data_len = mp_bswap_u32(count); - char *pos = (char *) obuf_svp_to_ptr(port->buf, &port->svp); + char *pos = (char *) obuf_svp_to_ptr(buf, svp); memcpy(pos, &header, sizeof(header)); memcpy(pos + sizeof(header), &body, sizeof(body)); } @@ -145,11 +157,9 @@ iproto_port_add_tuple(struct port *ptr, struct tuple *tuple) struct iproto_port *port = iproto_port(ptr); if (++port->found == 1) { /* Found the first tuple, add header. */ - port->svp = obuf_book(port->buf, SVP_SIZE); - port->size += SVP_SIZE; + port->svp = iproto_prepare_select(port->buf); } tuple_to_obuf(tuple, port->buf); - port->size += tuple->bsize; } struct port_vtab iproto_port_vtab = { diff --git a/src/box/iproto_port.h b/src/box/iproto_port.h index 9349f71c60c568605bc84ae191188d23ed0d8015..4233271ee2d2785943f10557d42a216688f8b9e3 100644 --- a/src/box/iproto_port.h +++ b/src/box/iproto_port.h @@ -64,7 +64,6 @@ struct iproto_port /** A pointer in the reply buffer where the reply starts. */ struct obuf_svp svp; /** Size of data written after reply starts */ - uint32_t size; }; extern struct port_vtab iproto_port_vtab; @@ -77,15 +76,21 @@ iproto_port_init(struct iproto_port *port, struct obuf *buf, port->buf = buf; port->sync = sync; port->found = 0; - port->size = 0; } /** Stack a reply to 'ping' packet. */ void -iproto_reply_ping(struct obuf *out, uint64_t sync); +iproto_reply_ok(struct obuf *out, uint64_t sync); /** Send an error packet back. */ void iproto_reply_error(struct obuf *out, const Exception *e, uint64_t sync); +struct obuf_svp +iproto_prepare_select(struct obuf *buf); + +void +iproto_reply_select(struct obuf *buf, struct obuf_svp *svp, uint64_t sync, + uint32_t count); + #endif /* TARANTOOL_IPROTO_PORT_H_INCLUDED */ diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index 196eefdd85047893f2b49ee11865640739a62ba2..7b1da1085f7cc7e45f495808bffaeeae062e2af8 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -53,6 +53,7 @@ #include "box/schema.h" #include "box/session.h" #include "box/iproto_constants.h" +#include "box/iproto_port.h" /* contents of box.lua, misc.lua, box.net.lua respectively */ extern char session_lua[], @@ -147,60 +148,6 @@ port_lua_table_create(struct lua_State *L) return (struct port *) port; } -static void -port_add_lua_ret(struct port *port, struct lua_State *L, int index) -{ - struct tuple *tuple = lua_totuple(L, index, index); - TupleGuard guard(tuple); - port_add_tuple(port, tuple); -} - -/** - * Add all elements from Lua stack to fiber iov. - * - * To allow clients to understand a complex return from - * a procedure, we are compatible with SELECT protocol, - * and return the number of return values first, and - * then each return value as a tuple. - * - * If a Lua stack contains at least one scalar, each - * value on the stack is converted to a tuple. A Lua - * is converted to a tuple with multiple fields. - * - * If the stack is a Lua table, each member of which is - * not scalar, each member of the table is converted to - * a tuple. This way very large lists of return values can - * be used, since Lua stack size is limited by 8000 elements, - * while Lua table size is pretty much unlimited. - */ -static void -port_add_lua_multret(struct port *port, struct lua_State *L) -{ - int nargs = lua_gettop(L); - /** Check if we deal with a table of tables. */ - if (nargs == 1 && lua_istable(L, 1)) { - /* - * The table is not empty and consists of tables - * or tuples. Treat each table element as a tuple, - * and push it. - */ - lua_pushnil(L); - int has_keys = lua_next(L, 1); - if (has_keys && (lua_istable(L, -1) || lua_istuple(L, -1))) { - do { - port_add_lua_ret(port, L, lua_gettop(L)); - lua_pop(L, 1); - } while (lua_next(L, 1)); - return; - } else if (has_keys) { - lua_pop(L, 1); - } - } - for (int i = 1; i <= nargs; ++i) { - port_add_lua_ret(port, L, i); - } -} - /* }}} */ /** @@ -547,11 +494,9 @@ SetuidGuard::~SetuidGuard() * Invoke a Lua stored procedure from the binary protocol * (implementation of 'CALL' command code). */ -void -box_lua_call(struct request *request, struct port *port) +static inline void +execute_call(lua_State *L, struct request *request, struct obuf *out) { - lua_State *L = lua_newthread(tarantool_L); - LuarefGuard coro_ref(tarantool_L); const char *name = request->key; uint32_t name_len = mp_decode_strl(&name); @@ -573,9 +518,76 @@ box_lua_call(struct request *request, struct port *port) for (uint32_t i = 0; i < arg_count; i++) { luamp_decode(L, luaL_msgpack_default, &args); } - lbox_call(L, arg_count + oc - 1, LUA_MULTRET); - /* Send results of the called procedure to the client. */ - port_add_lua_multret(port, L); + lua_call(L, arg_count + oc - 1, LUA_MULTRET); + + /** + * Add all elements from Lua stack to iproto. + * + * To allow clients to understand a complex return from + * a procedure, we are compatible with SELECT protocol, + * and return the number of return values first, and + * then each return value as a tuple. + * + * If a Lua stack contains at least one scalar, each + * value on the stack is converted to a tuple. A Lua + * is converted to a tuple with multiple fields. + * + * If the stack is a Lua table, each member of which is + * not scalar, each member of the table is converted to + * a tuple. This way very large lists of return values can + * be used, since Lua stack size is limited by 8000 elements, + * while Lua table size is pretty much unlimited. + */ + + uint32_t count = 0; + struct obuf_svp svp = iproto_prepare_select(out); + + /** Check if we deal with a table of tables. */ + int nrets = lua_gettop(L); + if (nrets == 1 && lua_istable(L, 1)) { + /* + * The table is not empty and consists of tables + * or tuples. Treat each table element as a tuple, + * and push it. + */ + lua_pushnil(L); + int has_keys = lua_next(L, 1); + if (has_keys && (lua_istable(L, -1) || lua_istuple(L, -1))) { + do { + int top = lua_gettop(L); + luamp_encodestack(L, out, top, top); + ++count; + lua_pop(L, 1); + } while (lua_next(L, 1)); + goto done; + } else if (has_keys) { + lua_pop(L, 1); + } + } + for (int i = 1; i <= nrets; ++i) { + luamp_encodestack(L, out, i, i); + ++count; + } + +done: + iproto_reply_select(out, &svp, request->header->sync, count); +} + +void +box_lua_call(struct request *request, struct obuf *out) +{ + lua_State *L = NULL; + try { + L = lua_newthread(tarantool_L); + LuarefGuard coro_ref(tarantool_L); + execute_call(L, request, out); + } catch (Exception *e) { + /* Let all well-behaved exceptions pass through. */ + throw; + } catch (...) { + /* Convert Lua error to a Tarantool exception. */ + tnt_raise(LuajitError, L != NULL ? L : tarantool_L); + } } static int diff --git a/src/box/lua/call.h b/src/box/lua/call.h index 44ea45bd4a9abb987d4bdf666748ac8479dac0d6..bbb1a41f5a10613d5554b66a216ad2de24285996 100644 --- a/src/box/lua/call.h +++ b/src/box/lua/call.h @@ -39,7 +39,7 @@ struct port; * (implementation of 'CALL' command code). */ void -box_lua_call(struct request *request, struct port *port); +box_lua_call(struct request *request, struct obuf *out); extern "C" { struct port_ffi diff --git a/src/box/lua/tuple.h b/src/box/lua/tuple.h index 05a830b168ad46897ccd6fc49a28bc52235d8db2..6943d64fee5488f2cde5436e56183b3a8102faff 100644 --- a/src/box/lua/tuple.h +++ b/src/box/lua/tuple.h @@ -31,7 +31,6 @@ struct lua_State; struct txn; struct tuple; -struct tbuf; /** * Push tuple on lua stack @@ -46,7 +45,7 @@ struct tuple* lua_totuple(struct lua_State *L, int first, int last); int -luamp_encodestack(struct lua_State *L, struct tbuf *b, +luamp_encodestack(struct lua_State *L, struct obuf *b, int first, int last); void diff --git a/src/box/request.cc b/src/box/request.cc index e3289d6d58cf7f49d22cf55c8b6723aa1fbb0be7..c8fc74aea53d2b9a15347875d0816bdcd12f1065 100644 --- a/src/box/request.cc +++ b/src/box/request.cc @@ -38,8 +38,6 @@ #include <errinj.h> #include <fiber.h> #include <scoped_guard.h> -#include <third_party/base64.h> -#include "authentication.h" #include "user_def.h" #include "iproto_constants.h" @@ -160,35 +158,35 @@ execute_select(struct request *request, struct port *port) } } +/** }}} */ + void -execute_auth(struct request *request, struct port * /* port */) +request_create(struct request *request, uint32_t type) { - const char *user = request->key; - uint32_t len = mp_decode_strl(&user); - authenticate(user, len, request->tuple, request->tuple_end); + + memset(request, 0, sizeof(*request)); + request->type = type; } -/** }}} */ +typedef void (*request_execute_f)(struct request *, struct port *); void -request_create(struct request *request, uint32_t type) +request_execute(struct request *request, struct port *port) { - if (!iproto_type_is_dml(type)) - tnt_raise(LoggedError, ER_UNKNOWN_REQUEST_TYPE, type); + assert(iproto_type_is_dml(request->type)); static const request_execute_f execute_map[] = { NULL, execute_select, execute_replace, execute_replace, - execute_update, execute_delete, box_lua_call, - execute_auth, + execute_update, execute_delete }; - memset(request, 0, sizeof(*request)); - request->type = type; - request->execute = execute_map[type]; + + request_execute_f fun = execute_map[request->type]; + assert(fun != NULL); + fun(request, port); } void request_decode(struct request *request, const char *data, uint32_t len) { - assert(request->execute != NULL); const char *end = data + len; uint64_t key_map = iproto_body_key_map[request->type]; diff --git a/src/box/request.h b/src/box/request.h index e257109f65fc3b07820e2d040831cc57b7c77b8c..eed53bcb205c2aeadf752689e3a56524be9d0fde 100644 --- a/src/box/request.h +++ b/src/box/request.h @@ -34,7 +34,6 @@ struct txn; struct port; -typedef void (*request_execute_f)(struct request *, struct port *); struct request { @@ -61,13 +60,14 @@ struct request const char *tuple_end; /** Base field offset for error messages, e.g. 0 for C and 1 for Lua. */ int field_base; - - request_execute_f execute; }; void request_create(struct request *request, uint32_t code); +void +request_execute(struct request *request, struct port *port); + void request_decode(struct request *request, const char *data, uint32_t len); diff --git a/test/box/misc.result b/test/box/misc.result index 4a6b2751e572cbe622e672e66348f3227db44573..fd4be8cb229ab86d67ac48a1041754790c504888 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -88,7 +88,6 @@ t; - SELECT - REPLACE - INSERT - - AUTH - CALL - UPDATE - total