diff --git a/extra/exports b/extra/exports index ffd11145d62750d2a53dfad9ebfeff853416b8b1..a9add2cc1a499f737844791b1b730654eae9e76e 100644 --- a/extra/exports +++ b/extra/exports @@ -223,6 +223,7 @@ box_sequence_next box_sequence_current box_sequence_set box_sequence_reset +box_session_push box_index_iterator box_iterator_next box_iterator_free diff --git a/src/box/box.cc b/src/box/box.cc index 0c15ba5e95b6002a1e48658596e5077644337800..f2554f051381158c499f7855b1253cff4d3e65df 100644 --- a/src/box/box.cc +++ b/src/box/box.cc @@ -1455,6 +1455,20 @@ box_sequence_reset(uint32_t seq_id) return sequence_data_delete(seq_id); } +int +box_session_push(const char *data, const char *data_end) +{ + struct session *session = current_session(); + if (session == NULL) + return -1; + struct port_msgpack port; + struct port *base = (struct port *)&port; + port_msgpack_create(base, data, data_end - data); + int rc = session_push(session, session_sync(session), base); + port_msgpack_destroy(base); + return rc; +} + static inline void box_register_replica(uint32_t id, const struct tt_uuid *uuid) { diff --git a/src/box/box.h b/src/box/box.h index 044d929d456d028ec7499be19bbf7c820b5ac468..c94e500ab11992ab029177986ad1e6fc63c7cb61 100644 --- a/src/box/box.h +++ b/src/box/box.h @@ -457,6 +457,20 @@ box_sequence_set(uint32_t seq_id, int64_t value); API_EXPORT int box_sequence_reset(uint32_t seq_id); +/** + * Push MessagePack data into a session data channel - socket, + * console or whatever is behind the session. Note, that + * successful push does not guarantee delivery in case it was sent + * into the network. Just like with write()/send() system calls. + * + * \param data begin of MessagePack to push + * \param data_end end of MessagePack to push + * \retval -1 on error (check box_error_last()) + * \retval 0 on success + */ +API_EXPORT int +box_session_push(const char *data, const char *data_end); + /** \endcond public */ /** diff --git a/src/box/call.c b/src/box/call.c index a46a61c3c7d39aa02d89abacda3c65a8c30ad2fd..7c70d816939be8e27b6511604b514b0449b027a8 100644 --- a/src/box/call.c +++ b/src/box/call.c @@ -64,17 +64,55 @@ port_msgpack_get_msgpack(struct port *base, uint32_t *size) return port->data; } +static int +port_msgpack_dump_msgpack(struct port *base, struct obuf *out) +{ + struct port_msgpack *port = (struct port_msgpack *)base; + assert(port->vtab == &port_msgpack_vtab); + size_t size = port->data_sz; + if (obuf_dup(out, port->data, size) == size) + return 0; + diag_set(OutOfMemory, size, "obuf_dup", "port->data"); + return -1; +} + extern void port_msgpack_dump_lua(struct port *base, struct lua_State *L, bool is_flat); +extern const char * +port_msgpack_dump_plain(struct port *base, uint32_t *size); + +void +port_msgpack_destroy(struct port *base) +{ + struct port_msgpack *port = (struct port_msgpack *)base; + assert(port->vtab == &port_msgpack_vtab); + free(port->plain); +} + +int +port_msgpack_set_plain(struct port *base, const char *plain, uint32_t len) +{ + struct port_msgpack *port = (struct port_msgpack *)base; + assert(port->plain == NULL); + port->plain = (char *)malloc(len + 1); + if (port->plain == NULL) { + diag_set(OutOfMemory, len + 1, "malloc", "port->plain"); + return -1; + } + memcpy(port->plain, plain, len); + port->plain[len] = 0; + return 0; +} + static const struct port_vtab port_msgpack_vtab = { - .dump_msgpack = NULL, + .dump_msgpack = port_msgpack_dump_msgpack, .dump_msgpack_16 = NULL, .dump_lua = port_msgpack_dump_lua, - .dump_plain = NULL, + .dump_plain = port_msgpack_dump_plain, .get_msgpack = port_msgpack_get_msgpack, .get_vdbemem = NULL, - .destroy = NULL, + .destroy = port_msgpack_destroy, }; int diff --git a/src/box/lua/console.c b/src/box/lua/console.c index bd454c26954cc6f5f6eee8858eb108b30ec03a78..b941f50c6626f9bc17580163c6bfa10107f2f4d2 100644 --- a/src/box/lua/console.c +++ b/src/box/lua/console.c @@ -37,6 +37,7 @@ #include "lua/fiber.h" #include "fiber.h" #include "coio.h" +#include "lua/msgpack.h" #include "lua-yaml/lyaml.h" #include <lua.h> #include <lauxlib.h> @@ -390,19 +391,17 @@ console_set_output_format(enum output_format output_format) } /** - * Dump port lua data with respect to output format: + * Dump Lua data to text with respect to output format: * YAML document tagged with !push! global tag or Lua string. - * @param port Port lua. + * @param L Lua state. * @param[out] size Size of the result. * - * @retval not NULL Tagged YAML document. + * @retval not NULL Tagged YAML document or Lua text. * @retval NULL Error. */ -const char * -port_lua_dump_plain(struct port *port, uint32_t *size) +static const char * +console_dump_plain(struct lua_State *L, uint32_t *size) { - struct port_lua *port_lua = (struct port_lua *) port; - struct lua_State *L = port_lua->L; enum output_format fmt = console_get_output_format(); if (fmt == OUTPUT_FORMAT_YAML) { int rc = lua_yaml_encode(L, luaL_yaml_default, "!push!", @@ -435,6 +434,78 @@ port_lua_dump_plain(struct port *port, uint32_t *size) return result; } +/** Plain text converter for port Lua data. */ +const char * +port_lua_dump_plain(struct port *base, uint32_t *size) +{ + return console_dump_plain(((struct port_lua *)base)->L, size); +} + +/** + * A helper for port_msgpack_dump_plain() to execute it safely + * regarding Lua errors. + */ +static int +port_msgpack_dump_plain_via_lua(struct lua_State *L) +{ + void **ctx = (void **)lua_touserdata(L, 1); + struct port_msgpack *port = (struct port_msgpack *)ctx[0]; + uint32_t *size = (uint32_t *)ctx[1]; + const char *data = port->data; + /* + * Need to pop, because YAML decoder will consume all what + * can be found on the stack. + */ + lua_pop(L, 1); + /* + * MessagePack -> Lua object -> YAML/Lua text. The middle + * is not really needed here, but there is no + * MessagePack -> YAML encoder yet. Neither + * MessagePack -> Lua text. + */ + luamp_decode(L, luaL_msgpack_default, &data); + data = console_dump_plain(L, size); + if (data == NULL) { + assert(port->plain == NULL); + } else { + /* + * Result is ignored, because in case of an error + * port->plain will stay NULL. And it will be + * returned by port_msgpack_dump_plain() as is. + */ + port_msgpack_set_plain((struct port *)port, data, *size); + } + return 0; + } + +/** Plain text converter for raw MessagePack. */ +const char * +port_msgpack_dump_plain(struct port *base, uint32_t *size) +{ + struct lua_State *L = tarantool_L; + void *ctx[2] = {(void *)base, (void *)size}; + /* + * lua_cpcall() protects from errors thrown from Lua which + * may break a caller, not knowing about Lua and not + * expecting any exceptions. + */ + if (lua_cpcall(L, port_msgpack_dump_plain_via_lua, ctx) != 0) { + /* + * Error string is pushed in case it was a Lua + * error. + */ + assert(lua_isstring(L, -1)); + diag_set(ClientError, ER_PROC_LUA, lua_tostring(L, -1)); + lua_pop(L, 1); + return NULL; + } + /* + * If there was an error, port->plain stayed NULL with + * installed diag. + */ + return ((struct port_msgpack *)base)->plain; +} + /** * Push a tagged YAML document or a Lua string into a console * socket. diff --git a/src/box/port.h b/src/box/port.h index 9d3d02b3c1626f44321b7a8dfabf0c2f15d87700..fa6c1374d2d43566c95a17d02316ac074145dbf4 100644 --- a/src/box/port.h +++ b/src/box/port.h @@ -86,6 +86,11 @@ struct port_msgpack { const struct port_vtab *vtab; const char *data; uint32_t data_sz; + /** + * Buffer for dump_plain() function. It is created during + * dump on demand and is deleted together with the port. + */ + char *plain; }; static_assert(sizeof(struct port_msgpack) <= sizeof(struct port), @@ -95,6 +100,16 @@ static_assert(sizeof(struct port_msgpack) <= sizeof(struct port), void port_msgpack_create(struct port *port, const char *data, uint32_t data_sz); +/** Destroy a MessagePack port. */ +void +port_msgpack_destroy(struct port *base); + +/** + * Set plain text version of data in the given port. It is copied. + */ +int +port_msgpack_set_plain(struct port *base, const char *plain, uint32_t len); + /** Port for storing the result of a Lua CALL/EVAL. */ struct port_lua { const struct port_vtab *vtab; diff --git a/test/box/function1.c b/test/box/function1.c index 87062d6a8f251fb226c559d571efa7367fab3ab6..a28431e865e437915512bcafc3bd1a61a5209618 100644 --- a/test/box/function1.c +++ b/test/box/function1.c @@ -245,3 +245,10 @@ test_sleep(box_function_ctx_t *ctx, const char *args, const char *args_end) fiber_sleep(0); return 0; } + +int +test_push(box_function_ctx_t *ctx, const char *args, const char *args_end) +{ + (void)ctx; + return box_session_push(args, args_end); +} diff --git a/test/box/push.result b/test/box/push.result index aebcb7501a8a5b524f3cf1be02b21e60801cd883..7888e4d92ee6023e5666d6aafc7bd1632112b211 100644 --- a/test/box/push.result +++ b/test/box/push.result @@ -563,3 +563,124 @@ box.schema.func.drop('do_long_and_push') box.session.on_disconnect(nil, on_disconnect) --- ... +-- +-- gh-4734: C API for session push. +-- +build_path = os.getenv("BUILDDIR") +--- +... +old_cpath = package.cpath +--- +... +package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath +--- +... +box.schema.func.create('function1.test_push', {language = 'C'}) +--- +... +box.schema.user.grant('guest', 'super') +--- +... +c = netbox.connect(box.cfg.listen) +--- +... +messages = {} +--- +... +c:call('function1.test_push', \ + {1, 2, 3}, \ + {on_push = table.insert, \ + on_push_ctx = messages}) +--- +- [] +... +messages +--- +- - [1, 2, 3] +... +c:close() +--- +... +-- +-- C can push to the console. +-- +-- A string having 0 byte inside. Check that it is handled fine. +s = '\x41\x00\x43' +--- +... +console = require('console') +--- +... +fio = require('fio') +--- +... +socket = require('socket') +--- +... +sock_path = fio.pathjoin(fio.cwd(), 'console.sock') +--- +... +_ = fio.unlink(sock_path) +--- +... +server = console.listen(sock_path) +--- +... +client = socket.tcp_connect('unix/', sock_path) +--- +... +_ = client:read({chunk = 128}) +--- +... +_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n") +--- +... +client:read("\n...\n") +--- +- '%TAG !push! tag:tarantool.io/push,2018 + + --- [1, 2, 3, "A\0C"] + + ... + + ' +... +_ = client:read("\n...\n") +--- +... +-- Lua output format is supported too. +_ = client:write("\\set output lua\n") +--- +... +_ = client:read(";") +--- +... +_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n") +--- +... +client:read(";") +--- +- '-- Push + + {1, 2, 3, "A\0C"};' +... +_ = client:read(";") +--- +... +client:close() +--- +- true +... +server:close() +--- +- true +... +box.schema.user.revoke('guest', 'super') +--- +... +box.schema.func.drop('function1.test_push') +--- +... +package.cpath = old_cpath +--- +... diff --git a/test/box/push.test.lua b/test/box/push.test.lua index 7ae6f4a8638bcc2424e486779003ff82e91f8fe4..f4518466ebf3e26d577966591e93f93a92d41ba2 100644 --- a/test/box/push.test.lua +++ b/test/box/push.test.lua @@ -264,3 +264,53 @@ chan_push:put(true) chan_push:get() box.schema.func.drop('do_long_and_push') box.session.on_disconnect(nil, on_disconnect) + +-- +-- gh-4734: C API for session push. +-- +build_path = os.getenv("BUILDDIR") +old_cpath = package.cpath +package.cpath = build_path..'/test/box/?.so;'..build_path..'/test/box/?.dylib;'..old_cpath + +box.schema.func.create('function1.test_push', {language = 'C'}) +box.schema.user.grant('guest', 'super') +c = netbox.connect(box.cfg.listen) +messages = {} +c:call('function1.test_push', \ + {1, 2, 3}, \ + {on_push = table.insert, \ + on_push_ctx = messages}) +messages +c:close() + +-- +-- C can push to the console. +-- + +-- A string having 0 byte inside. Check that it is handled fine. +s = '\x41\x00\x43' + +console = require('console') +fio = require('fio') +socket = require('socket') +sock_path = fio.pathjoin(fio.cwd(), 'console.sock') +_ = fio.unlink(sock_path) +server = console.listen(sock_path) +client = socket.tcp_connect('unix/', sock_path) +_ = client:read({chunk = 128}) +_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n") +client:read("\n...\n") +_ = client:read("\n...\n") +-- Lua output format is supported too. +_ = client:write("\\set output lua\n") +_ = client:read(";") +_ = client:write("box.func['function1.test_push']:call({1, 2, 3, s})\n") +client:read(";") +_ = client:read(";") +client:close() +server:close() + +box.schema.user.revoke('guest', 'super') +box.schema.func.drop('function1.test_push') + +package.cpath = old_cpath