diff --git a/doc/user/data-model.xml b/doc/user/data-model.xml index 63560fd000b0387bcc8b19f3bee68feaacb36254..f6e6f590aeeedd056fa3ab69b7f163ade21c4266 100644 --- a/doc/user/data-model.xml +++ b/doc/user/data-model.xml @@ -10,10 +10,10 @@ length is varying: a tuple can contain any number of fields. A field can be either numeric — 32- or 64- bit unsigned integer, or binary - string — a sequence of octets. The first field of a - tuple is always assumed to be - the identifying (unique) key. The remaining fields make up a - value, associated with the key. + string — a sequence of octets. Fields included into the + first index are always assumed to be the identifying (unique) + key. The remaining fields make up a value, associated with the + key. Tuple sets are called <emphasis>spaces<alt>perhaps, not the best name</alt></emphasis>, and there can be up to 255 spaces defined per one Tarantool instance. diff --git a/doc/user/stored-procedures.xml b/doc/user/stored-procedures.xml index 22c57c94581d99c021712afe920b63e1e499bbc7..9f95b4d54731379862dd233ace2665859810f4e0 100644 --- a/doc/user/stored-procedures.xml +++ b/doc/user/stored-procedures.xml @@ -175,7 +175,7 @@ Call OK, 2 rows affected In the above example, the way the procedure receives its argument is identical in two protocols, when the argument is a string. A numeric field, however, when submitted via the - binary protocol, is seen by the procedure as + binary protocol, is seen by the procedure as a 4-byte blob, not as a Lua <quote>number</quote> type. </para> <para>In addition to conventional method invocation, @@ -432,7 +432,7 @@ localhost> lua box.select(5, 1, 'firstname', 'lastname') argument. A format specifier also works as a placeholder for the number of a field, which needs to be updated, or for an argument value. - For example: + For example: <simplelist> <member><code>+p=p</code> — add a value to one field and assign another, @@ -945,7 +945,7 @@ qkl ... localhost> lua t:unpack() --- - - + - - abc - cde - efg @@ -1564,6 +1564,80 @@ and <code>box.fiber.testcancel()</code> is checked whenever such an event occurs <!-- end of lib --> +<variablelist> + <title>Package <code>box.session</code></title> + <para>Learn session state, set on-connect and + on-disconnect triggers. + </para> + <para> +A session is an object associated with each client connection. +Through this module, it's possible to query session state, +as well as set a Lua chunk executed on connect or disconnect +event. + </para> + <varlistentry> + <term> + <emphasis role="lua">box.session.id() </emphasis> + </term> + <listitem><simpara>Return a unique monotonic identifier of + the current session. The identifier can be used to check + whether or not a session is alive. 0 means there is no + session (e.g. a procedure is running in a detached + fiber). + </simpara></listitem> + </varlistentry> + + <varlistentry> + <term> + <emphasis role="lua">box.session.fd(id) </emphasis> + </term> + <listitem><simpara>Return an integer file descriptor + associated with the connected client.</simpara></listitem> + </varlistentry> + + <varlistentry> + <term> + <emphasis role="lua">box.session.exists(id) </emphasis> + </term> + <listitem><simpara>Return true if a session is alive, + false otherwise.</simpara></listitem> + </varlistentry> + + <varlistentry> + <term> + <emphasis role="lua">box.session.on_connect(chunk) </emphasis> + </term> + <listitem><para> + Set a callback (trigger) invoked on each connected session. + The callback doesn't get any arguments, but is the first + thing invoked in the scope of the newly created session. + If the trigger fails by raising an error, the error + is sent to the client and the connection is shut down. + Returns the old value of the trigger. + </para> + <warning> + <para> + If a trigger always results in an error, it may become + impossible to connect to the server to reset it. + </para> + </warning> + </listitem> + </varlistentry> + + <varlistentry> + <term> + <emphasis role="lua">box.session.on_disconnect(chunk)</emphasis> + </term> + <listitem><simpara>Set a trigger invoked after a client has + disconnected. Returns the old value of the trigger. If + the trigger raises an error, the error is logged but otherwise + is ignored. + </simpara></listitem> + </varlistentry> +</variablelist> + +<!-- end of lib --> + <variablelist> <title>Package <code>box.ipc</code> — inter procedure communication</title> <simpara> diff --git a/include/box/box.h b/include/box/box.h index 6cd4096bc52dbc86bf8919e409b2d83867607223..9eba8504898ef971ea66a5bc2020e5489e168f1e 100644 --- a/include/box/box.h +++ b/include/box/box.h @@ -84,12 +84,6 @@ void box_snapshot(struct log_io *, struct fio_batch *batch); */ void box_info(struct tbuf *out); const char *box_status(void); -/** - * Issue a new session identifier - - * called by the networking layer - * when a new connection is established. - */ -uint32_t box_sid(); /** * Called to enter master or replica * mode (depending on the configuration) after @@ -97,5 +91,4 @@ uint32_t box_sid(); */ void box_leave_local_standby_mode(void *data __attribute__((unused))); - #endif /* INCLUDES_TARANTOOL_BOX_H */ diff --git a/include/lua/session.h b/include/lua/session.h new file mode 100644 index 0000000000000000000000000000000000000000..d6456d59871333118bce13168e855d0fff289e93 --- /dev/null +++ b/include/lua/session.h @@ -0,0 +1,35 @@ +#ifndef INCLUDES_TARANTOOL_LUA_SESSION_H +#define INCLUDES_TARANTOOL_LUA_SESSION_H +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +struct lua_State; + +void +tarantool_lua_session_init(struct lua_State *L); +#endif /* INCLUDES_TARANTOOL_LUA_SESSION_H */ diff --git a/include/mhash.h b/include/mhash.h index e0b932fbb2c7ec94b855472397cb6cc2337f2174..04af6f83e27a6cedbc4e9cd9cb0235050965e4ce 100644 --- a/include/mhash.h +++ b/include/mhash.h @@ -286,6 +286,14 @@ _mh(del)(struct _mh(t) *h, mh_int_t x) } #endif +static inline void +_mh(remove)(struct _mh(t) *h, mh_key_t key) +{ + mh_int_t k = _mh(get)(h, key); + if (k != mh_end(h)) + _mh(del)(h, k); +} + #ifdef MH_SOURCE diff --git a/include/session.h b/include/session.h new file mode 100644 index 0000000000000000000000000000000000000000..2eaf1d9c88141a429f0d9acf4ffb4f1fca69c2e0 --- /dev/null +++ b/include/session.h @@ -0,0 +1,106 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include <inttypes.h> +#include <stdbool.h> + +/** + * Abstraction of a single user session: + * for now, only provides accounting of established + * sessions and on-connect/on-disconnect event + * handling, in future: user credentials, protocol, etc. + * Session identifiers grow monotonically. + * 0 sid is reserved to mean 'no session'. + */ + +/** + * Create a session. + * Invokes a Lua trigger box.session.on_connect if it is + * defined. Issues a new session identifier. + * Must called by the networking layer + * when a new connection is established. + * + * @return handle for a created session + * @exception tnt_Exception or lua error if session + * trigger fails or runs out of resources. + */ +uint32_t +session_create(int fd); + +/** + * Destroy a session. + * Must be called by the networking layer on disconnect. + * Invokes a Lua trigger box.session.on_disconnect if it + * is defined. + * @param session session to destroy. may be NULL. + * + * @exception none + */ +void +session_destroy(uint32_t sid); + + +/** + * Return a file descriptor + * associated with a session, or -1 if the + * session doesn't exist. + */ +int +session_fd(uint32_t sid); + +/** + * Check whether a session exists or not. + */ +static inline bool +session_exists(uint32_t sid) +{ + return session_fd(sid) >= 0; +} + +/** + * Type of the callback which may be invoked + * on connect or disconnect event. + */ +typedef void (*session_trigger_f)(void *); + +struct session_trigger +{ + session_trigger_f trigger; + void *param; +}; + +/* The global on-connect trigger. */ +extern struct session_trigger session_on_connect; +/* The global on-disconnect trigger. */ +extern struct session_trigger session_on_disconnect; + +void +session_init(); + +void +session_free(); diff --git a/include/tarantool.h b/include/tarantool.h index 3812c783cd2de77a6b87f19324e05a00410cd4c2..6da91c9b2e49e213c40e2e1acc62fd65af5ff368 100644 --- a/include/tarantool.h +++ b/include/tarantool.h @@ -33,8 +33,6 @@ struct tarantool_cfg; struct tbuf; -struct log_io; -struct fio_batch; extern int snapshot_pid; extern struct tarantool_cfg cfg; diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index bc8a9e86c87a000eb08fbb29e2d8e10c0923ab7a..e55c8ed34d0ee87a17a479086a9e06050d3b090e 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -118,6 +118,7 @@ set (common_sources cpu_feature.m replica.m iproto.m + session.m object.m exception.m errcode.c @@ -132,6 +133,7 @@ set (common_sources lua/slab.m lua/uuid.m lua/lua_ipc.m + lua/session.m ) if (ENABLE_TRACE) diff --git a/src/admin.m b/src/admin.m index cac4900167da7aab5477e0bf897e682a899c6cc5..10f4e8ce4b9e96cfae7fbac486acf2b446dbae13 100644 --- a/src/admin.m +++ b/src/admin.m @@ -51,6 +51,7 @@ #include "lauxlib.h" #include "lualib.h" #include "box/box.h" +#include "session.h" static const char *help = "available commands:" CRLF @@ -72,7 +73,7 @@ static const char *help = static const char *unknown_command = "unknown command. try typing help." CRLF; -#line 76 "src/admin.m" +#line 77 "src/admin.m" static const int admin_start = 1; static const int admin_first_final = 135; static const int admin_error = 0; @@ -80,7 +81,7 @@ static const int admin_error = 0; static const int admin_en_main = 1; -#line 75 "src/admin.rl" +#line 76 "src/admin.rl" struct salloc_stat_admin_cb_ctx { @@ -217,12 +218,12 @@ admin_dispatch(struct ev_io *coio, struct iobuf *iobuf, lua_State *L) p = in->pos; -#line 221 "src/admin.m" +#line 222 "src/admin.m" { cs = admin_start; } -#line 226 "src/admin.m" +#line 227 "src/admin.m" { if ( p == pe ) goto _test_eof; @@ -285,15 +286,15 @@ case 6: } goto st0; tr13: -#line 308 "src/admin.rl" +#line 309 "src/admin.rl" {slab_validate(); ok(out);} goto st135; tr20: -#line 296 "src/admin.rl" +#line 297 "src/admin.rl" {return -1;} goto st135; tr25: -#line 223 "src/admin.rl" +#line 224 "src/admin.rl" { start(out); tbuf_append(out, help, strlen(help)); @@ -301,9 +302,9 @@ tr25: } goto st135; tr36: -#line 282 "src/admin.rl" +#line 283 "src/admin.rl" {strend = p;} -#line 229 "src/admin.rl" +#line 230 "src/admin.rl" { strstart[strend-strstart]='\0'; start(out); @@ -312,7 +313,7 @@ tr36: } goto st135; tr43: -#line 236 "src/admin.rl" +#line 237 "src/admin.rl" { if (reload_cfg(err)) fail(out, err); @@ -321,11 +322,11 @@ tr43: } goto st135; tr67: -#line 306 "src/admin.rl" +#line 307 "src/admin.rl" {coredump(60); ok(out);} goto st135; tr76: -#line 243 "src/admin.rl" +#line 244 "src/admin.rl" { int ret = snapshot(NULL, 0); @@ -340,9 +341,9 @@ tr76: } goto st135; tr98: -#line 292 "src/admin.rl" +#line 293 "src/admin.rl" { state = false; } -#line 256 "src/admin.rl" +#line 257 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -354,9 +355,9 @@ tr98: } goto st135; tr101: -#line 291 "src/admin.rl" +#line 292 "src/admin.rl" { state = true; } -#line 256 "src/admin.rl" +#line 257 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -368,7 +369,7 @@ tr101: } goto st135; tr117: -#line 211 "src/admin.rl" +#line 212 "src/admin.rl" { start(out); show_cfg(out); @@ -376,15 +377,15 @@ tr117: } goto st135; tr131: -#line 299 "src/admin.rl" +#line 300 "src/admin.rl" {start(out); fiber_info(out); end(out);} goto st135; tr137: -#line 298 "src/admin.rl" +#line 299 "src/admin.rl" {start(out); tarantool_info(out); end(out);} goto st135; tr146: -#line 217 "src/admin.rl" +#line 218 "src/admin.rl" { start(out); errinj_info(out); @@ -392,33 +393,33 @@ tr146: } goto st135; tr152: -#line 302 "src/admin.rl" +#line 303 "src/admin.rl" {start(out); palloc_stat(out); end(out);} goto st135; tr160: -#line 301 "src/admin.rl" +#line 302 "src/admin.rl" {start(out); show_slab(out); end(out);} goto st135; tr164: -#line 303 "src/admin.rl" +#line 304 "src/admin.rl" {start(out); show_stat(out);end(out);} goto st135; st135: if ( ++p == pe ) goto _test_eof135; case 135: -#line 411 "src/admin.m" +#line 412 "src/admin.m" goto st0; tr14: -#line 308 "src/admin.rl" +#line 309 "src/admin.rl" {slab_validate(); ok(out);} goto st7; tr21: -#line 296 "src/admin.rl" +#line 297 "src/admin.rl" {return -1;} goto st7; tr26: -#line 223 "src/admin.rl" +#line 224 "src/admin.rl" { start(out); tbuf_append(out, help, strlen(help)); @@ -426,9 +427,9 @@ tr26: } goto st7; tr37: -#line 282 "src/admin.rl" +#line 283 "src/admin.rl" {strend = p;} -#line 229 "src/admin.rl" +#line 230 "src/admin.rl" { strstart[strend-strstart]='\0'; start(out); @@ -437,7 +438,7 @@ tr37: } goto st7; tr44: -#line 236 "src/admin.rl" +#line 237 "src/admin.rl" { if (reload_cfg(err)) fail(out, err); @@ -446,11 +447,11 @@ tr44: } goto st7; tr68: -#line 306 "src/admin.rl" +#line 307 "src/admin.rl" {coredump(60); ok(out);} goto st7; tr77: -#line 243 "src/admin.rl" +#line 244 "src/admin.rl" { int ret = snapshot(NULL, 0); @@ -465,9 +466,9 @@ tr77: } goto st7; tr99: -#line 292 "src/admin.rl" +#line 293 "src/admin.rl" { state = false; } -#line 256 "src/admin.rl" +#line 257 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -479,9 +480,9 @@ tr99: } goto st7; tr102: -#line 291 "src/admin.rl" +#line 292 "src/admin.rl" { state = true; } -#line 256 "src/admin.rl" +#line 257 "src/admin.rl" { strstart[strend-strstart] = '\0'; if (errinj_set_byname(strstart, state)) { @@ -493,7 +494,7 @@ tr102: } goto st7; tr118: -#line 211 "src/admin.rl" +#line 212 "src/admin.rl" { start(out); show_cfg(out); @@ -501,15 +502,15 @@ tr118: } goto st7; tr132: -#line 299 "src/admin.rl" +#line 300 "src/admin.rl" {start(out); fiber_info(out); end(out);} goto st7; tr138: -#line 298 "src/admin.rl" +#line 299 "src/admin.rl" {start(out); tarantool_info(out); end(out);} goto st7; tr147: -#line 217 "src/admin.rl" +#line 218 "src/admin.rl" { start(out); errinj_info(out); @@ -517,22 +518,22 @@ tr147: } goto st7; tr153: -#line 302 "src/admin.rl" +#line 303 "src/admin.rl" {start(out); palloc_stat(out); end(out);} goto st7; tr161: -#line 301 "src/admin.rl" +#line 302 "src/admin.rl" {start(out); show_slab(out); end(out);} goto st7; tr165: -#line 303 "src/admin.rl" +#line 304 "src/admin.rl" {start(out); show_stat(out);end(out);} goto st7; st7: if ( ++p == pe ) goto _test_eof7; case 7: -#line 536 "src/admin.m" +#line 537 "src/admin.m" if ( (*p) == 10 ) goto st135; goto st0; @@ -685,28 +686,28 @@ case 23: } goto tr33; tr33: -#line 282 "src/admin.rl" +#line 283 "src/admin.rl" {strstart = p;} goto st24; st24: if ( ++p == pe ) goto _test_eof24; case 24: -#line 696 "src/admin.m" +#line 697 "src/admin.m" switch( (*p) ) { case 10: goto tr36; case 13: goto tr37; } goto st24; tr34: -#line 282 "src/admin.rl" +#line 283 "src/admin.rl" {strstart = p;} goto st25; st25: if ( ++p == pe ) goto _test_eof25; case 25: -#line 710 "src/admin.m" +#line 711 "src/admin.m" switch( (*p) ) { case 10: goto tr36; case 13: goto tr37; @@ -1156,28 +1157,28 @@ case 73: goto tr91; goto st0; tr91: -#line 290 "src/admin.rl" +#line 291 "src/admin.rl" { strstart = p; } goto st74; st74: if ( ++p == pe ) goto _test_eof74; case 74: -#line 1167 "src/admin.m" +#line 1168 "src/admin.m" if ( (*p) == 32 ) goto tr92; if ( 33 <= (*p) && (*p) <= 126 ) goto st74; goto st0; tr92: -#line 290 "src/admin.rl" +#line 291 "src/admin.rl" { strend = p; } goto st75; st75: if ( ++p == pe ) goto _test_eof75; case 75: -#line 1181 "src/admin.m" +#line 1182 "src/admin.m" switch( (*p) ) { case 32: goto st75; case 111: goto st76; @@ -1869,7 +1870,7 @@ case 134: _out: {} } -#line 314 "src/admin.rl" +#line 315 "src/admin.rl" in->pos = pe; @@ -1889,10 +1890,16 @@ admin_handler(va_list ap) { struct ev_io coio = va_arg(ap, struct ev_io); struct iobuf *iobuf = va_arg(ap, struct iobuf *); - fiber_set_sid(fiber, box_sid()); lua_State *L = lua_newthread(tarantool_L); int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX); @try { + /* + * Admin and iproto connections must have a + * session object, representing the state of + * a remote client: it's used in Lua + * stored procedures. + */ + session_create(coio.fd); for (;;) { if (admin_dispatch(&coio, iobuf, L) < 0) return; @@ -1903,6 +1910,7 @@ admin_handler(va_list ap) luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref); evio_close(&coio); iobuf_destroy(iobuf); + session_destroy(fiber->sid); } } diff --git a/src/admin.rl b/src/admin.rl index 357f7f977044e4eae722985d35e4914454010afd..3b1f3619094df0d8d680128836ca3879768f74ea 100644 --- a/src/admin.rl +++ b/src/admin.rl @@ -49,6 +49,7 @@ #include "lauxlib.h" #include "lualib.h" #include "box/box.h" +#include "session.h" static const char *help = "available commands:" CRLF @@ -330,10 +331,16 @@ admin_handler(va_list ap) { struct ev_io coio = va_arg(ap, struct ev_io); struct iobuf *iobuf = va_arg(ap, struct iobuf *); - fiber_set_sid(fiber, box_sid()); lua_State *L = lua_newthread(tarantool_L); int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX); @try { + /* + * Admin and iproto connections must have a + * session object, representing the state of + * a remote client: it's used in Lua + * stored procedures. + */ + session_create(coio.fd); for (;;) { if (admin_dispatch(&coio, iobuf, L) < 0) return; @@ -344,6 +351,7 @@ admin_handler(va_list ap) luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref); evio_close(&coio); iobuf_destroy(iobuf); + session_destroy(fiber->sid); } } diff --git a/src/box/box.m b/src/box/box.m index aad13d6062d9a4ae85426da100d518cde267b600..ab3e6c1e34988c028deacb6e96d767b17ebbf7f7 100644 --- a/src/box/box.m +++ b/src/box/box.m @@ -539,11 +539,3 @@ box_status(void) { return status; } - - -uint32_t -box_sid() -{ - static uint32_t sid = 1; - return sid++; -} diff --git a/src/fiber.m b/src/fiber.m index 8bd36bd7771d5b4d9bf430291c85fac30a66ba8c..02ba766e312c34ebc9f22ab95c53737979e1f156 100644 --- a/src/fiber.m +++ b/src/fiber.m @@ -305,8 +305,6 @@ fiber_find(int fid) if (k == mh_end(fiber_registry)) return NULL; - if (!mh_exist(fiber_registry, k)) - return NULL; return mh_value(fiber_registry, k); } diff --git a/src/iproto.m b/src/iproto.m index 2560c34812d1253e7f61e3cde14783af55fe5ead..d3fa59411c86380a65a71d3e38f4687a39447c85 100644 --- a/src/iproto.m +++ b/src/iproto.m @@ -44,6 +44,7 @@ #include "box/request.h" #include "iobuf.h" #include "evio.h" +#include "session.h" enum { /** Maximal iproto package body length (2GiB) */ @@ -61,6 +62,8 @@ struct iproto_header { uint32_t sync; } __attribute__((packed)); +static struct iproto_header dummy_header = { 0, 0, 0 }; + struct iproto_reply_header { struct iproto_header hdr; uint32_t ret_code; @@ -75,7 +78,7 @@ iproto(const void *pos) return (struct iproto_header *) pos; } -/* {{{ struct port_iproto */ +/* {{{ port_iproto */ /** * struct port_iproto users need to be careful to: @@ -148,8 +151,9 @@ static struct port_vtab port_iproto_vtab = { port_iproto_eof, }; -void -port_iproto_init(struct port_iproto *port, struct obuf *buf, struct iproto_header *req) +static void +port_iproto_init(struct port_iproto *port, struct obuf *buf, + struct iproto_header *req) { port->vtab = &port_iproto_vtab; port->buf = buf; @@ -160,10 +164,59 @@ port_iproto_init(struct port_iproto *port, struct obuf *buf, struct iproto_heade /* }}} */ -/* {{{ iproto_request_queue */ +/* {{{ iproto_queue */ + +struct iproto_request; + +/** + * Implementation of an input queue of the box request processor. + * + * Socket event handlers read data, determine request boundaries + * and enqueue requests. Once all input/output events are + * processed, an own event handler is invoked to deal with the + * requests in the queue: it's important that each request is + * processed in a fiber environment. + * + * @sa iproto_queue_schedule, iproto_handler, iproto_handshake + */ + +struct iproto_queue +{ + /** + * Ring buffer of fixed size, preallocated + * during initialization. + */ + struct iproto_request *queue; + /** + * Main function of the fiber invoked to handle + * all outstanding tasks in this queue. + */ + void (*handler)(va_list); + /** + * Cache of fibers which work on requests + * in this queue. + */ + struct rlist fiber_cache; + /** + * Used to trigger request processing when + * the queue becomes non-empty. + */ + struct ev_async watcher; + /* Ring buffer position. */ + int begin, end; + /* Ring buffer size. */ + int size; +}; + +enum { + IPROTO_REQUEST_QUEUE_SIZE = 2048, + IPROTO_CONNECT_QUEUE_SIZE = 256 +}; struct iproto_session; +typedef void (*iproto_request_f)(struct iproto_request *); + /** * A single request from the client. All requests * from all clients are queued into a single queue @@ -175,55 +228,129 @@ struct iproto_request struct iobuf *iobuf; /* Position of the request in the input buffer. */ struct iproto_header *header; + iproto_request_f process; }; -/** Request queue. */ -enum { IPROTO_REQUEST_QUEUE_SIZE = 2048 }; -struct iproto_request_queue -{ - int begin, end; - struct iproto_request queue[IPROTO_REQUEST_QUEUE_SIZE]; -} ir_queue; +/** + * A single global queue for all requests in all connections. All + * requests are processed concurrently. + * Is also used as a queue for just established connections and to + * execute disconnect triggers. A few notes about these triggers: + * - they need to be run in a fiber + * - unlike an ordinary request failure, on_connect trigger + * failure must lead to connection shutdown. + * - as long as on_connect trigger can be used for client + * authentication, it must be processed before any other request + * on this connection. + */ +static struct iproto_queue request_queue; -static struct ev_async iproto_postio; +static inline bool +iproto_queue_is_empty(struct iproto_queue *i_queue) +{ + return i_queue->begin == i_queue->end; +} static inline void -iproto_enqueue_request(struct iproto_session *session, struct iobuf *iobuf, - struct iproto_header *header) +iproto_enqueue_request(struct iproto_queue *i_queue, + struct iproto_session *session, + struct iobuf *iobuf, + struct iproto_header *header, + iproto_request_f process) { - /* The queue is full. Invoke iproto_handler to work it off. */ - if (ir_queue.end == IPROTO_REQUEST_QUEUE_SIZE) - ev_invoke(&iproto_postio, EV_CUSTOM); - assert(ir_queue.end < IPROTO_REQUEST_QUEUE_SIZE); - struct iproto_request *request = ir_queue.queue + ir_queue.end++; + /* If the queue is full, invoke the handler to work it off. */ + if (i_queue->end == i_queue->size) + ev_invoke(&i_queue->watcher, EV_CUSTOM); + assert(i_queue->end < i_queue->size); + bool was_empty = iproto_queue_is_empty(i_queue); + struct iproto_request *request = i_queue->queue + i_queue->end++; request->session = session; request->iobuf = iobuf; request->header = header; + request->process = process; + /* + * There were some queued requests, ensure they are + * handled. + */ + if (was_empty) + ev_feed_event(&request_queue.watcher, EV_CUSTOM); } -static inline struct iproto_header * -iproto_dequeue_request(struct iproto_session **session, struct iobuf **iobuf) +static inline bool +iproto_dequeue_request(struct iproto_queue *i_queue, + struct iproto_request *out) { - if (ir_queue.begin == ir_queue.end) - return NULL; - struct iproto_request *request = ir_queue.queue + ir_queue.begin++; - if (ir_queue.begin == ir_queue.end) - ir_queue.begin = ir_queue.end = 0; - *session = request->session; - *iobuf = request->iobuf; - return request->header; + if (i_queue->begin == i_queue->end) + return false; + struct iproto_request *request = i_queue->queue + i_queue->begin++; + if (i_queue->begin == i_queue->end) + i_queue->begin = i_queue->end = 0; + *out = *request; + return true; } -static inline bool -iproto_request_queue_is_empty() +/** Put the current fiber into a queue fiber cache. */ +static inline void +iproto_cache_fiber(struct iproto_queue *i_queue) +{ + fiber_gc(); + rlist_add_entry(&i_queue->fiber_cache, fiber, state); + fiber_yield(); +} + +/** Create fibers to handle all outstanding tasks. */ +static void +iproto_queue_schedule(struct ev_async *watcher, + int events __attribute__((unused))) +{ + struct iproto_queue *i_queue = watcher->data; + while (! iproto_queue_is_empty(i_queue)) { + + struct fiber *f = rlist_shift_entry(&i_queue->fiber_cache, + struct fiber, state); + if (f == NULL) + f = fiber_create("iproto", i_queue->handler); + fiber_call(f, i_queue); + } +} + +static inline void +iproto_queue_init(struct iproto_queue *i_queue, + int size, void (*handler)(va_list)) +{ + i_queue->size = size; + i_queue->begin = i_queue->end = 0; + i_queue->queue = palloc(eter_pool, size * + sizeof (struct iproto_request)); + /** + * Initialize an ev_async event which would start + * workers for all outstanding tasks. + */ + ev_async_init(&i_queue->watcher, iproto_queue_schedule); + i_queue->watcher.data = i_queue; + i_queue->handler = handler; + rlist_init(&i_queue->fiber_cache); +} + +/** A handler to process all queued requests. */ +static void +iproto_queue_handler(va_list ap) { - return ir_queue.begin == ir_queue.end; + struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *); + struct iproto_request request; +restart: + while (iproto_dequeue_request(i_queue, &request)) { + + request.process(&request); + } + iproto_cache_fiber(&request_queue); + goto restart; } /* }}} */ -/* {{{ struct iproto_session */ +/* {{{ iproto_session */ /** Context of a single client connection. */ struct iproto_session @@ -252,10 +379,14 @@ struct iproto_session ssize_t parse_size; /** Current write position in the output buffer */ struct obuf_svp write_pos; + /** + * Function of the request processor to handle + * a single request. + */ box_process_func *handler; struct ev_io input; struct ev_io output; - /** Mod session id. */ + /** Session id. */ uint32_t sid; }; @@ -282,6 +413,15 @@ static void iproto_session_on_output(struct ev_io *watcher, int revents __attribute__((unused))); +static void +iproto_process_request(struct iproto_request *request); + +static void +iproto_process_connect(struct iproto_request *request); + +static void +iproto_process_disconnect(struct iproto_request *request); + static struct iproto_session * iproto_session_create(const char *name, int fd, box_process_func *param) { @@ -300,14 +440,16 @@ iproto_session_create(const char *name, int fd, box_process_func *param) session->iobuf[1] = iobuf_create(name); session->parse_size = 0; session->write_pos = obuf_create_svp(&session->iobuf[0]->out); - session->sid = box_sid(); + session->sid = 0; return session; } +/** Recycle a session. Never throws. */ static inline void iproto_session_destroy(struct iproto_session *session) { assert(iproto_session_is_idle(session)); + session_destroy(session->sid); /* Never throws. No-op if sid is 0. */ iobuf_destroy(session->iobuf[0]); iobuf_destroy(session->iobuf[1]); SLIST_INSERT_HEAD(&iproto_session_cache, session, next_in_cache); @@ -332,11 +474,16 @@ iproto_session_shutdown(struct iproto_session *session) * as soon as all parsed data is processed. */ session->iobuf[0]->in.end -= session->parse_size; - iproto_session_gc(session); + /* + * If the session is not idle it will + * be destroyed only after all its requests + * are processed (and dropped). + */ + iproto_enqueue_request(&request_queue, session, + session->iobuf[0], &dummy_header, + iproto_process_disconnect); } -/* }}} */ - static inline void iproto_validate_header(struct iproto_header *header, int fd) { @@ -421,7 +568,7 @@ iproto_session_input_iobuf(struct iproto_session *session) } /** Enqueue all requests which were read up. */ -static inline int +static inline void iproto_enqueue_batch(struct iproto_session *session, struct ibuf *in, int fd) { int batch_size; @@ -438,11 +585,11 @@ iproto_enqueue_batch(struct iproto_session *session, struct ibuf *in, int fd) header->len)) break; - iproto_enqueue_request(session, session->iobuf[0], - header); + iproto_enqueue_request(&request_queue, session, + session->iobuf[0], header, + iproto_process_request); session->parse_size -= sizeof(*header) + header->len; } - return batch_size; } static void @@ -475,13 +622,7 @@ iproto_session_on_input(struct ev_io *watcher, in->end += nrd; session->parse_size += nrd; /* Enqueue all requests which are fully read up. */ - if (iproto_enqueue_batch(session, in, fd) > 0) { - /* - * There were some queued requests, ensure - * they are handled. - */ - ev_feed_event(&iproto_postio, EV_CUSTOM); - } + iproto_enqueue_batch(session, in, fd); } @catch (tnt_Exception *e) { [e log]; iproto_session_shutdown(session); @@ -531,7 +672,7 @@ iproto_session_on_output(struct ev_io *watcher, int revent __attribute__((unused))) { struct iproto_session *session = watcher->data; - int fd = session->input.fd; + int fd = session->output.fd; struct obuf_svp *svp = &session->write_pos; @try { @@ -552,9 +693,9 @@ iproto_session_on_output(struct ev_io *watcher, } } -/* {{{ iproto_reply fiber */ +/* }}} */ -struct rlist iproto_fiber_cache; +/* {{{ iproto_process_* functions */ /** Stack reply to 'ping' packet. */ static inline void @@ -602,41 +743,68 @@ iproto_reply(struct port_iproto *port, box_process_func callback, } } -/** Execute a single request and cache output in obuf. */ static void -iproto_handler(va_list arg __attribute__((unused))) +iproto_process_request(struct iproto_request *request) { - struct iproto_header *header; - struct iproto_session *session; - struct iobuf *iobuf; + struct iproto_session *session = request->session; + struct iproto_header *header = request->header; + struct iobuf *iobuf = request->iobuf; struct port_iproto port; -restart: - while ((header = iproto_dequeue_request(&session, &iobuf))) { - if (unlikely(! evio_is_connected(&session->output))) { - /* - * Drop a request of a disconnected - * session. - */ - iobuf->in.pos += sizeof(*header) + header->len; - iproto_session_gc(session); - continue; - } - @try { - fiber_set_sid(fiber, session->sid); - iproto_reply(&port, *session->handler, &iobuf->out, header); - } @finally { - iobuf->in.pos += sizeof(*header) + header->len; - } + @try { + if (unlikely(! evio_is_connected(&session->output))) + return; + + fiber_set_sid(fiber, session->sid); + iproto_reply(&port, *session->handler, + &iobuf->out, header); + + if (unlikely(! evio_is_connected(&session->output))) + return; if (! ev_is_active(&session->output)) ev_feed_event(&session->output, EV_WRITE); + } @finally { + iobuf->in.pos += sizeof(*header) + header->len; + iproto_session_gc(session); } - fiber_gc(); - rlist_add_entry(&iproto_fiber_cache, fiber, state); - fiber_yield(); - goto restart; } -/* }}} */ +/** + * Handshake a connection: invoke the on-connect trigger + * and possibly authenticate. Try to send the client an error + * upon a failure. + */ +static void +iproto_process_connect(struct iproto_request *request) +{ + struct iproto_session *session = request->session; + struct iobuf *iobuf = request->iobuf; + int fd = session->input.fd; + @try { /* connect. */ + session->sid = session_create(fd); + } @catch (ClientError *e) { + iproto_reply_error(&iobuf->out, request->header, e); + iproto_flush(iobuf, fd, &session->write_pos); + iproto_session_shutdown(session); + return; + } @catch (tnt_Exception *e) { + [e log]; + assert(session->sid == 0); + iproto_session_shutdown(session); + return; + } + /* Handshake OK, start reading input. */ + ev_feed_event(&session->input, EV_READ); +} + +static void +iproto_process_disconnect(struct iproto_request *request) +{ + /* Runs the trigger, which may yield. */ + iproto_session_gc(request->session); +} + +/** }}} */ + /** * Create a session context and start input. @@ -650,25 +818,9 @@ iproto_on_accept(struct evio_service *service, int fd, struct iproto_session *session; session = iproto_session_create(name, fd, service->on_accept_param); - - ev_feed_event(&session->input, EV_READ); -} - -/** - * Create fibers to handle all outstanding tasks. - */ -static void -iproto_schedule(struct ev_async *watcher __attribute__((unused)), - int events __attribute__((unused))) -{ - while (! iproto_request_queue_is_empty()) { - - struct fiber *f = rlist_shift_entry(&iproto_fiber_cache, - struct fiber, state); - if (f == NULL) - f = fiber_create("iproto", iproto_handler); - fiber_call(f); - } + iproto_enqueue_request(&request_queue, session, + session->iobuf[0], &dummy_header, + iproto_process_connect); } /** @@ -698,11 +850,7 @@ iproto_init(const char *bind_ipaddr, int primary_port, iproto_on_accept, &box_process_ro); evio_service_start(&secondary); } - /** - * Initialize an ev_async event which would start workers - * for all outstanding tasks. - */ - ev_async_init(&iproto_postio, iproto_schedule); - rlist_init(&iproto_fiber_cache); + iproto_queue_init(&request_queue, IPROTO_REQUEST_QUEUE_SIZE, + iproto_queue_handler); } diff --git a/src/lua/init.m b/src/lua/init.m index 50b56c3a69bf22efd20963cf033cb5ab80bd9be4..d8b619a429a4b24e56c09a231a29f5a6bf838032 100644 --- a/src/lua/init.m +++ b/src/lua/init.m @@ -48,6 +48,7 @@ #include "lua/slab.h" #include "lua/stat.h" #include "lua/uuid.h" +#include "lua/session.h" #include TARANTOOL_CONFIG @@ -568,14 +569,6 @@ lbox_fiber_id(struct lua_State *L) return 1; } -static int -lbox_fiber_sid(struct lua_State *L) -{ - struct fiber *f = lua_gettop(L) ? lbox_checkfiber(L, 1) : fiber; - lua_pushinteger(L, f->sid); - return 1; -} - static struct lua_State * box_lua_fiber_get_coro(struct lua_State *L, struct fiber *f) { @@ -798,6 +791,7 @@ lbox_fiber_create(struct lua_State *L) " reached"); struct fiber *f = fiber_create("lua", box_lua_fiber_run); + /* Preserve the session in a child fiber. */ fiber_set_sid(f, fiber->sid); /* Initially the fiber is cancellable */ f->flags |= FIBER_USER_MODE | FIBER_CANCELLABLE; @@ -1048,7 +1042,6 @@ lbox_fiber_testcancel(struct lua_State *L) static const struct luaL_reg lbox_fiber_meta [] = { {"id", lbox_fiber_id}, - {"sid", lbox_fiber_sid}, {"name", lbox_fiber_name}, {"__gc", lbox_fiber_gc}, {NULL, NULL} @@ -1058,7 +1051,6 @@ static const struct luaL_reg fiberlib[] = { {"sleep", lbox_fiber_sleep}, {"self", lbox_fiber_self}, {"id", lbox_fiber_id}, - {"sid", lbox_fiber_sid}, {"find", lbox_fiber_find}, {"cancel", lbox_fiber_cancel}, {"testcancel", lbox_fiber_testcancel}, @@ -1239,12 +1231,6 @@ tarantool_lua_register_type(struct lua_State *L, const char *type_name, lua_pop(L, 1); } -/** - * Remember the LuaJIT FFI extension reference index - * to protect it from being garbage collected. - */ -static int ffi_ref = 0; - struct lua_State * tarantool_lua_init() { @@ -1258,7 +1244,11 @@ tarantool_lua_init() if (lua_pcall(L, 1, 0, 0) != 0) panic("%s", lua_tostring(L, -1)); lua_getglobal(L, "ffi"); - ffi_ref = luaL_ref(L, LUA_REGISTRYINDEX); + /** + * Remember the LuaJIT FFI extension reference index + * to protect it from being garbage collected. + */ + (void) luaL_ref(L, LUA_REGISTRYINDEX); lua_pushnil(L); lua_setglobal(L, "ffi"); luaL_register(L, boxlib_name, boxlib); @@ -1275,6 +1265,7 @@ tarantool_lua_init() tarantool_lua_stat_init(L); tarantool_lua_ipc_init(L); tarantool_lua_uuid_init(L); + tarantool_lua_session_init(L); mod_lua_init(L); @@ -1286,7 +1277,6 @@ tarantool_lua_init() void tarantool_lua_close(struct lua_State *L) { - luaL_unref(L, LUA_REGISTRYINDEX, ffi_ref); /* collects garbage, invoking userdata gc */ lua_close(L); } diff --git a/src/lua/session.m b/src/lua/session.m new file mode 100644 index 0000000000000000000000000000000000000000..523d8d7c0715dbe835d4bc355c8ea1ed8c65179e --- /dev/null +++ b/src/lua/session.m @@ -0,0 +1,204 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "lua/session.h" +#include "lua/init.h" + +#include <lua.h> +#include <lauxlib.h> +#include <lualib.h> + +#include "fiber.h" +#include "session.h" +#include "sio.h" + +static const char *sessionlib_name = "box.session"; + +/** + * Return a unique monotonic session + * identifier. The identifier can be used + * to check whether or not a session is alive. + * 0 means there is no session (e.g. + * a procedure is running in a detached + * fiber). + */ +static int +lbox_session_id(struct lua_State *L) +{ + lua_pushnumber(L, fiber->sid); + return 1; +} + +/** + * Check whether or not a session exists. + */ +static int +lbox_session_exists(struct lua_State *L) +{ + if (lua_gettop(L) != 1) + luaL_error(L, "session.exists(sid): bad arguments"); + + uint32_t sid = luaL_checkint(L, -1); + lua_pushnumber(L, session_exists(sid)); + return 1; +} + +/** + * Pretty print peer name. + */ +static int +lbox_session_peer(struct lua_State *L) +{ + if (lua_gettop(L) > 1) + luaL_error(L, "session.peer(sid): bad arguments"); + + uint32_t sid = lua_gettop(L) == 1 ? + luaL_checkint(L, -1) : fiber->sid; + + int fd = session_fd(sid); + struct sockaddr_in addr; + sio_getpeername(fd, &addr); + + lua_pushstring(L, sio_strfaddr(&addr)); + return 1; +} + +struct lbox_session_trigger +{ + struct session_trigger *trigger; + int coro_ref; +}; + +static struct lbox_session_trigger on_connect = + { &session_on_connect, LUA_NOREF}; +static struct lbox_session_trigger on_disconnect = + { &session_on_disconnect, LUA_NOREF}; + +static void +lbox_session_run_trigger(void *param) +{ + struct lbox_session_trigger *trigger = param; + /* Copy the referenced callable object object stack. */ + lua_rawgeti(tarantool_L, LUA_REGISTRYINDEX, trigger->coro_ref); + + struct lua_State *coro_L = lua_tothread(tarantool_L, -1); + lua_pop(tarantool_L, 1); + /* + * If there was junk from previous invocation of the + * trigger, clear it, only leaving on the stack the chunk + * to be run. The stack may be polluted when previous + * invocation ended up with an error. + */ + lua_settop(coro_L, 1); + /* lua_call pops the function, make a copy */ + lua_pushvalue(coro_L, -1); + @try { + lua_call(coro_L, 0, 0); + } @catch (tnt_Exception *e) { + @throw; + } @catch (...) { + tnt_raise(ClientError, :ER_PROC_LUA, + lua_tostring(coro_L, -1)); + } +} + +static int +lbox_session_set_trigger(struct lua_State *L, + struct lbox_session_trigger *trigger) +{ + if (lua_gettop(L) != 1 || + (lua_type(L, -1) != LUA_TFUNCTION && + lua_type(L, -1) != LUA_TNIL)) { + luaL_error(L, "session.on_connect(chunk): bad arguments"); + } + + struct lua_State *coro_L; + + if (trigger->coro_ref != LUA_NOREF) { + lua_rawgeti(L, LUA_REGISTRYINDEX, trigger->coro_ref); + coro_L = lua_tothread(L, -1); + lua_pop(L, 1); /* pop the coro. */ + /* If there was junk from previous invocation, clear it. */ + lua_settop(coro_L, 1); + } else { + coro_L = lua_newthread(L); + /* Reference the new thread and pop it. */ + trigger->coro_ref = luaL_ref(L, LUA_REGISTRYINDEX); + lua_pushnil(coro_L); + } + /** Move the chunk to the coroutine in will be run in. */ + lua_xmove(L, coro_L, 1); + /* Return the old chunk. */ + lua_insert(coro_L, -2); + lua_xmove(coro_L, L, 1); + /* + * Set or clear the trigger. Return the old value of the + * trigger. + */ + if (lua_type(coro_L, -1) == LUA_TNIL) { + trigger->trigger->trigger = NULL; + trigger->trigger->param = NULL; + } else { + trigger->trigger->trigger = lbox_session_run_trigger; + trigger->trigger->param = trigger; + } + return 1; +} + +static int +lbox_session_on_connect(struct lua_State *L) +{ + return lbox_session_set_trigger(L, &on_connect); +} + +static int +lbox_session_on_disconnect(struct lua_State *L) +{ + return lbox_session_set_trigger(L, &on_disconnect); +} + +static const struct luaL_reg lbox_session_meta [] = { + {"id", lbox_session_id}, + {NULL, NULL} +}; + +static const struct luaL_reg sessionlib[] = { + {"id", lbox_session_id}, + {"exists", lbox_session_exists}, + {"peer", lbox_session_peer}, + {"on_connect", lbox_session_on_connect}, + {"on_disconnect", lbox_session_on_disconnect}, + {NULL, NULL} +}; + +void +tarantool_lua_session_init(struct lua_State *L) +{ + luaL_register(L, sessionlib_name, sessionlib); + lua_pop(L, 1); +} diff --git a/src/session.m b/src/session.m new file mode 100644 index 0000000000000000000000000000000000000000..3992a540169e928e7caf023e752e41580fc5a846 --- /dev/null +++ b/src/session.m @@ -0,0 +1,117 @@ +/* + * Redistribution and use in source and binary forms, with or + * without modification, are permitted provided that the following + * conditions are met: + * + * 1. Redistributions of source code must retain the above + * copyright notice, this list of conditions and the + * following disclaimer. + * + * 2. Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following + * disclaimer in the documentation and/or other materials + * provided with the distribution. + * + * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND + * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED + * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL + * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, + * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL + * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR + * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF + * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF + * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ +#include "session.h" +#include "fiber.h" + +#include "assoc.h" +#include "exception.h" + +uint32_t sid_max; + +static struct mh_i32ptr_t *session_registry; + +struct session_trigger session_on_connect; +struct session_trigger session_on_disconnect; + +uint32_t +session_create(int fd) +{ + /* Return the next sid rolling over the reserved value of 0. */ + while (++sid_max == 0) + ; + + uint32_t sid = sid_max; + + mh_int_t k = mh_i32ptr_put(session_registry, sid, + (void *) (intptr_t) fd, NULL); + + if (k == mh_end(session_registry)) { + tnt_raise(ClientError, :ER_MEMORY_ISSUE, + "session hash", "new session"); + } + /* + * Run the trigger *after* setting the current + * fiber sid. + */ + fiber_set_sid(fiber, sid); + if (session_on_connect.trigger) { + void *param = session_on_connect.param; + @try { + session_on_connect.trigger(param); + } @catch (tnt_Exception *e) { + fiber_set_sid(fiber, 0); + mh_i32ptr_remove(session_registry, sid); + @throw; + } + } + + return sid; +} + +void +session_destroy(uint32_t sid) +{ + if (sid == 0) /* no-op for a dead session. */ + return; + + if (session_on_disconnect.trigger) { + void *param = session_on_disconnect.param; + @try { + session_on_disconnect.trigger(param); + } @catch (tnt_Exception *e) { + [e log]; + } @catch (id e) { + /* catch all. */ + } + } + mh_i32ptr_remove(session_registry, sid); +} + +int +session_fd(uint32_t sid) +{ + mh_int_t k = mh_i32ptr_get(session_registry, sid); + return k == mh_end(session_registry) ? + -1 : (intptr_t) mh_value(session_registry, k); +} + +void +session_init() +{ + session_registry = mh_i32ptr_init(); + if (session_registry == NULL) + panic("out of memory"); +} + +void +session_free() +{ + if (session_registry) + mh_i32ptr_destroy(session_registry); +} diff --git a/src/tarantool.m b/src/tarantool.m index 30e8e5ec6ca46238e21569ae6ec3a180a4f32d40..2fd963c73982168f3565c2fc1fb12bb1b265c62a 100644 --- a/src/tarantool.m +++ b/src/tarantool.m @@ -64,6 +64,7 @@ #include "tarantool_pthread.h" #include "lua/init.h" #include "memcached.h" +#include "session.h" #include "box/box.h" @@ -593,6 +594,7 @@ tarantool_free(void) destroy_tarantool_cfg(&cfg); fiber_free(); + session_free(); palloc_free(); ev_default_destroy(); #ifdef ENABLE_GCOV @@ -876,6 +878,7 @@ main(int argc, char **argv) cfg.secondary_port); admin_init(cfg.bind_ipaddr, cfg.admin_port); replication_init(cfg.bind_ipaddr, cfg.replication_port); + session_init(); /* * Load user init script. The script should have access * to Tarantool Lua API (box.cfg, box.fiber, etc...) that diff --git a/test/box/fiber.result b/test/box/fiber.result index efe9afecf3ae7af825a5505d45c86e45c7901eb9..b44c7713d2bd616d5509798e322ecc3fa0aafefc 100644 --- a/test/box/fiber.result +++ b/test/box/fiber.result @@ -201,24 +201,3 @@ lua box.fiber.find(920) lua box.space[0]:truncate() --- ... -lua box.fiber.sid() > 0 ---- - - true -... -lua f = box.fiber.create(function() box.fiber.detach() failed = box.fiber.sid() ~= 0 end) ---- -... -lua box.fiber.resume(f) ---- -... -lua failed ---- - - false -... -lua f = box.fiber.create(function() if box.fiber.sid() == 0 then error('wrong sid') end end) ---- -... -lua box.fiber.resume(f) ---- - - true -... diff --git a/test/box/fiber.test b/test/box/fiber.test index ff6b2e9f797fa79ca783b061b8e33f1e4cf6ca10..09d8b3d90a8f6f3e36873cc183e5c92ab7065628 100644 --- a/test/box/fiber.test +++ b/test/box/fiber.test @@ -80,13 +80,3 @@ exec admin "lua box.fiber.find(900)" exec admin "lua box.fiber.find(910)" exec admin "lua box.fiber.find(920)" exec admin "lua box.space[0]:truncate()" - -# check fiber.sid() and fiber.id() -exec admin "lua box.fiber.sid() > 0" -exec admin "lua f = box.fiber.create(function() box.fiber.detach() failed = box.fiber.sid() ~= 0 end)" -exec admin "lua box.fiber.resume(f)" -exec admin "lua failed" -exec admin "lua f = box.fiber.create(function() if box.fiber.sid() == 0 then error('wrong sid') end end)" -exec admin "lua box.fiber.resume(f)" - - diff --git a/test/box/lua.result b/test/box/lua.result index e57c46d84d83a338f3194d038f052c2da151955d..429192fad9f45fa5a4a1f06108f5c440b317a94c 100644 --- a/test/box/lua.result +++ b/test/box/lua.result @@ -20,23 +20,24 @@ lua for n in pairs(box) do print(' - box.', n) end - box.replace - box.space - box.cfg + - box.on_reload_configuration - box.select_range - box.insert - - box.on_reload_configuration - box.counter - - box.info - box.auto_increment + - box.info + - box.session - box.uuid_hex - box.update - - box.slab - - box.process - box.dostring - - box.index + - box.process + - box.select_limit + - box.slab - box.select - box.flags - box.unpack + - box.index - box.stat - - box.select_limit - box.pack ... lua box.pack() diff --git a/test/box/session.result b/test/box/session.result new file mode 100644 index 0000000000000000000000000000000000000000..272fc77b977bc8549764ebb1759f77b25f2338b4 --- /dev/null +++ b/test/box/session.result @@ -0,0 +1,171 @@ +lua box.session.exists(box.session.id()) +--- + - 1 +... +lua box.session.exists() +--- +error: 'session.exists(sid): bad arguments' +... +lua box.session.exists(1, 2, 3) +--- +error: 'session.exists(sid): bad arguments' +... +lua box.session.exists(1234567890) +--- + - 0 +... +lua box.session.id() > 0 +--- + - true +... +lua f = box.fiber.create(function() box.fiber.detach() failed = box.session.id() ~= 0 end) +--- +... +lua box.fiber.resume(f) +--- +... +lua failed +--- + - false +... +lua f1 = box.fiber.create(function() if box.session.id() == 0 then failed = true end end) +--- +... +lua box.fiber.resume(f1) +--- + - true +... +lua failed +--- + - false +... +lua box.session.peer() == box.session.peer(box.session.id()) +--- + - true +... +lua box.session.on_connect(function() end) +--- + - nil +... +lua box.session.on_disconnect(function() end) +--- + - nil +... +lua type(box.session.on_connect(function() error('hear') end)) +--- + - function +... +lua type(box.session.on_disconnect(function() error('hear') end)) +--- + - function +... +lua box.session.on_connect() +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_disconnect() +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_connect(function() end, function() end) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_disconnect(function() end, function() end) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_connect(1, 2) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_disconnect(1, 2) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_connect(1) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua box.session.on_disconnect(1) +--- +error: 'session.on_connect(chunk): bad arguments' +... +lua type(box.session.on_connect(nil)) +--- + - function +... +lua type(box.session.on_disconnect(nil)) +--- + - function +... +lua type(box.session.on_connect(nil)) +--- + - nil +... +lua type(box.session.on_disconnect(nil)) +--- + - nil +... +lua function inc() active_connections = active_connections + 1 end +--- +... +lua function dec() active_connections = active_connections - 1 end +--- +... +lua box.session.on_connect(inc) +--- + - nil +... +lua box.session.on_disconnect(dec) +--- + - nil +... +lua active_connections = 0 +--- +... +lua active_connections +--- + - 1 +... +lua active_connections +--- + - 2 +... +lua type(box.session.on_connect(nil)) +--- + - function +... +lua type(box.session.on_disconnect(nil)) +--- + - function +... +lua box.session.on_connect(function() box.insert(0, box.session.id()) end) +--- + - nil +... +lua box.session.on_disconnect(function() box.delete(0, box.session.id()) end) +--- + - nil +... +lua box.unpack('i', box.select(0, 0, box.session.id())[0]) == box.session.id() +--- + - true +... +lua type(box.session.on_connect(function() nosuchfunction() end)) +--- + - function +... +disconnected +lua type(box.session.on_connect(nil)) +--- + - function +... +lua type(box.session.on_disconnect(nil)) +--- + - function +... +lua active_connections +--- + - 0 +... diff --git a/test/box/session.test b/test/box/session.test new file mode 100644 index 0000000000000000000000000000000000000000..e604dd5cac9ca1f80bd97e1b9210c43fb0e1156c --- /dev/null +++ b/test/box/session.test @@ -0,0 +1,84 @@ +# encoding: tarantool +from lib.admin_connection import AdminConnection +from lib.box_connection import BoxConnection + +exec admin "lua box.session.exists(box.session.id())" +exec admin "lua box.session.exists()" +exec admin "lua box.session.exists(1, 2, 3)" +exec admin "lua box.session.exists(1234567890)" + +# check session.id() +exec admin "lua box.session.id() > 0" +exec admin "lua f = box.fiber.create(function() box.fiber.detach() failed = box.session.id() ~= 0 end)" +exec admin "lua box.fiber.resume(f)" +exec admin "lua failed" +exec admin "lua f1 = box.fiber.create(function() if box.session.id() == 0 then failed = true end end)" +exec admin "lua box.fiber.resume(f1)" +exec admin "lua failed" +exec admin "lua box.session.peer() == box.session.peer(box.session.id())" + +# check on_connect/on_disconnect triggers +exec admin "lua box.session.on_connect(function() end)" +exec admin "lua box.session.on_disconnect(function() end)" + +# check it's possible to reset these triggers +# +exec admin "lua type(box.session.on_connect(function() error('hear') end))" +exec admin "lua type(box.session.on_disconnect(function() error('hear') end))" + +# check on_connect/on_disconnect argument count and type +exec admin "lua box.session.on_connect()" +exec admin "lua box.session.on_disconnect()" + +exec admin "lua box.session.on_connect(function() end, function() end)" +exec admin "lua box.session.on_disconnect(function() end, function() end)" + +exec admin "lua box.session.on_connect(1, 2)" +exec admin "lua box.session.on_disconnect(1, 2)" + +exec admin "lua box.session.on_connect(1)" +exec admin "lua box.session.on_disconnect(1)" + +# use of nil to clear the trigger +exec admin "lua type(box.session.on_connect(nil))" +exec admin "lua type(box.session.on_disconnect(nil))" +exec admin "lua type(box.session.on_connect(nil))" +exec admin "lua type(box.session.on_disconnect(nil))" + +# check how connect/disconnect triggers work +exec admin "lua function inc() active_connections = active_connections + 1 end" +exec admin "lua function dec() active_connections = active_connections - 1 end" +exec admin "lua box.session.on_connect(inc)" +exec admin "lua box.session.on_disconnect(dec)" +exec admin "lua active_connections = 0" +con1 = AdminConnection('localhost', server.admin_port) +exec con1 "lua active_connections" +con2 = AdminConnection('localhost', server.admin_port) +exec con2 "lua active_connections" +con1.disconnect() +con2.disconnect() +exec admin "lua type(box.session.on_connect(nil))" +exec admin "lua type(box.session.on_disconnect(nil))" + +# write audit trail of connect/disconnect into a space +exec admin "lua box.session.on_connect(function() box.insert(0, box.session.id()) end)" +exec admin "lua box.session.on_disconnect(function() box.delete(0, box.session.id()) end)" +exec con1 "lua box.unpack('i', box.select(0, 0, box.session.id())[0]) == box.session.id()" +con1.disconnect() + +# if on_connect() trigger raises an exception, the connection is dropped +exec admin "lua type(box.session.on_connect(function() nosuchfunction() end))" +con1 = BoxConnection('localhost', server.primary_port) +try: + con1.execute("select * from t0 where k0=0") + con1.execute("select * from t0 where k0=0") +except Exception as e: + print "disconnected" + +# cleanup + +exec admin "lua type(box.session.on_connect(nil))" +exec admin "lua type(box.session.on_disconnect(nil))" +exec admin "lua active_connections" + +# vim: syntax=python