From db6d2c844025d26cfbd7e7b89e1510eec2235a99 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Wed, 9 Jan 2013 17:53:51 +0400
Subject: [PATCH] Implement box.session, on_connect and on_disconnect triggers.

Since connect/disconnect events in iproto run outside
fiber context, queue execution of on_connect/on_disconnect
triggers through the request queue. Generalize the request
queue for that purpose.

Implement a framework for on_connect/on_disconnect triggers
and Lua bindings for it.

Add tests and docs.
---
 doc/user/data-model.xml        |   8 +-
 doc/user/stored-procedures.xml |  80 +++++++-
 include/box/box.h              |   7 -
 include/lua/session.h          |  35 ++++
 include/mhash.h                |   8 +
 include/session.h              | 106 ++++++++++
 include/tarantool.h            |   2 -
 src/CMakeLists.txt             |   2 +
 src/admin.m                    | 116 ++++++-----
 src/admin.rl                   |  10 +-
 src/box/box.m                  |   8 -
 src/fiber.m                    |   2 -
 src/iproto.m                   | 356 +++++++++++++++++++++++----------
 src/lua/init.m                 |  26 +--
 src/lua/session.m              | 204 +++++++++++++++++++
 src/session.m                  | 117 +++++++++++
 src/tarantool.m                |   3 +
 test/box/fiber.result          |  21 --
 test/box/fiber.test            |  10 -
 test/box/lua.result            |  13 +-
 test/box/session.result        | 171 ++++++++++++++++
 test/box/session.test          |  84 ++++++++
 22 files changed, 1149 insertions(+), 240 deletions(-)
 create mode 100644 include/lua/session.h
 create mode 100644 include/session.h
 create mode 100644 src/lua/session.m
 create mode 100644 src/session.m
 create mode 100644 test/box/session.result
 create mode 100644 test/box/session.test

diff --git a/doc/user/data-model.xml b/doc/user/data-model.xml
index 63560fd000..f6e6f590ae 100644
--- a/doc/user/data-model.xml
+++ b/doc/user/data-model.xml
@@ -10,10 +10,10 @@
   length is varying: a tuple can contain any number
   of fields. A field can be either numeric &mdash;
   32- or 64- bit unsigned integer, or binary
-  string &mdash; a sequence of octets.  The first field of a
-  tuple is always assumed to be
-  the identifying (unique) key. The remaining fields make up a 
-  value, associated with the key.
+  string &mdash; a sequence of octets.  Fields included into the
+  first index are always assumed to be the identifying (unique)
+  key. The remaining fields make up a value, associated with the
+  key.
   Tuple sets are called <emphasis>spaces<alt>perhaps, not the best name</alt></emphasis>, and there can be up to 255 spaces defined per
   one Tarantool instance.
 
diff --git a/doc/user/stored-procedures.xml b/doc/user/stored-procedures.xml
index 22c57c9458..9f95b4d547 100644
--- a/doc/user/stored-procedures.xml
+++ b/doc/user/stored-procedures.xml
@@ -175,7 +175,7 @@ Call OK, 2 rows affected
     In the above example, the way the procedure receives its
     argument is identical in two protocols, when the argument is a
     string. A numeric field, however, when submitted via the
-    binary protocol, is seen by the procedure as 
+    binary protocol, is seen by the procedure as
     a 4-byte blob, not as a Lua <quote>number</quote> type.
     </para>
     <para>In addition to conventional method invocation,
@@ -432,7 +432,7 @@ localhost> lua box.select(5, 1, 'firstname', 'lastname')
                 argument. A format specifier also works as a
                 placeholder for the number of a field, which needs
                 to be updated, or for an argument value.
-                For example: 
+                For example:
                 <simplelist>
                     <member><code>+p=p</code> &mdash; add a value
                     to one field and assign another,
@@ -945,7 +945,7 @@ qkl
 ...
 localhost> lua t:unpack()
 ---
- - 
+ -
  - abc
  - cde
  - efg
@@ -1564,6 +1564,80 @@ and <code>box.fiber.testcancel()</code> is checked whenever such an event occurs
 
 <!--   end of lib -->
 
+<variablelist>
+    <title>Package <code>box.session</code></title>
+    <para>Learn session state, set on-connect and
+    on-disconnect triggers.
+    </para>
+    <para>
+A session is an object associated with each client connection.
+Through this module, it's possible to query session state,
+as well as set a Lua chunk executed on connect or disconnect
+event.
+    </para>
+    <varlistentry>
+        <term>
+            <emphasis role="lua">box.session.id() </emphasis>
+        </term>
+        <listitem><simpara>Return a unique monotonic identifier of
+        the current session. The identifier can be used to check
+        whether or not a session is alive. 0 means there is no
+        session (e.g. a procedure is running in a detached
+        fiber).
+        </simpara></listitem>
+    </varlistentry>
+
+    <varlistentry>
+        <term>
+            <emphasis role="lua">box.session.fd(id) </emphasis>
+        </term>
+        <listitem><simpara>Return an integer file descriptor
+        associated with the connected client.</simpara></listitem>
+    </varlistentry>
+
+    <varlistentry>
+        <term>
+            <emphasis role="lua">box.session.exists(id) </emphasis>
+        </term>
+        <listitem><simpara>Return true if a session is alive,
+        false otherwise.</simpara></listitem>
+    </varlistentry>
+
+    <varlistentry>
+        <term>
+            <emphasis role="lua">box.session.on_connect(chunk) </emphasis>
+        </term>
+        <listitem><para>
+		Set a callback (trigger) invoked on each connected session.
+        The callback doesn't get any arguments, but is the first
+        thing invoked in the scope of the newly created session.
+        If the trigger fails by raising an error, the error
+        is sent to the client and the connection is shut down.
+        Returns the old value of the trigger.
+        </para>
+        <warning>
+        <para>
+        If a trigger always results in an error, it may become
+        impossible to connect to the server to reset it.
+        </para>
+        </warning>
+        </listitem>
+    </varlistentry>
+
+    <varlistentry>
+        <term>
+            <emphasis role="lua">box.session.on_disconnect(chunk)</emphasis>
+        </term>
+        <listitem><simpara>Set a trigger invoked after a client has
+        disconnected. Returns the old value of the trigger. If
+        the trigger raises an error, the error is logged but otherwise
+        is ignored.
+        </simpara></listitem>
+    </varlistentry>
+</variablelist>
+
+<!--   end of lib -->
+
 <variablelist>
     <title>Package <code>box.ipc</code> &mdash; inter procedure communication</title>
     <simpara>
diff --git a/include/box/box.h b/include/box/box.h
index 6cd4096bc5..9eba850489 100644
--- a/include/box/box.h
+++ b/include/box/box.h
@@ -84,12 +84,6 @@ void box_snapshot(struct log_io *, struct fio_batch *batch);
  */
 void box_info(struct tbuf *out);
 const char *box_status(void);
-/**
- * Issue a new session identifier -
- * called by the networking layer
- * when a new connection is established.
- */
-uint32_t box_sid();
 /**
  * Called to enter master or replica
  * mode (depending on the configuration) after
@@ -97,5 +91,4 @@ uint32_t box_sid();
  */
 void
 box_leave_local_standby_mode(void *data __attribute__((unused)));
-
 #endif /* INCLUDES_TARANTOOL_BOX_H */
diff --git a/include/lua/session.h b/include/lua/session.h
new file mode 100644
index 0000000000..d6456d5987
--- /dev/null
+++ b/include/lua/session.h
@@ -0,0 +1,35 @@
+#ifndef INCLUDES_TARANTOOL_LUA_SESSION_H
+#define INCLUDES_TARANTOOL_LUA_SESSION_H
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+struct lua_State;
+
+void
+tarantool_lua_session_init(struct lua_State *L);
+#endif /* INCLUDES_TARANTOOL_LUA_SESSION_H */
diff --git a/include/mhash.h b/include/mhash.h
index e0b932fbb2..04af6f83e2 100644
--- a/include/mhash.h
+++ b/include/mhash.h
@@ -286,6 +286,14 @@ _mh(del)(struct _mh(t) *h, mh_int_t x)
 }
 #endif
 
+static inline void
+_mh(remove)(struct _mh(t) *h, mh_key_t key)
+{
+	mh_int_t k = _mh(get)(h, key);
+	if (k != mh_end(h))
+		_mh(del)(h, k);
+}
+
 
 #ifdef MH_SOURCE
 
diff --git a/include/session.h b/include/session.h
new file mode 100644
index 0000000000..2eaf1d9c88
--- /dev/null
+++ b/include/session.h
@@ -0,0 +1,106 @@
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include <inttypes.h>
+#include <stdbool.h>
+
+/**
+ * Abstraction of a single user session:
+ * for now, only provides accounting of established
+ * sessions and on-connect/on-disconnect event
+ * handling, in future: user credentials, protocol, etc.
+ * Session identifiers grow monotonically.
+ * 0 sid is reserved to mean 'no session'.
+ */
+
+/**
+ * Create a session.
+ * Invokes a Lua trigger box.session.on_connect if it is
+ * defined. Issues a new session identifier.
+ * Must called by the networking layer
+ * when a new connection is established.
+ *
+ * @return handle for a created session
+ * @exception tnt_Exception or lua error if session
+ * trigger fails or runs out of resources.
+ */
+uint32_t
+session_create(int fd);
+
+/**
+ * Destroy a session.
+ * Must be called by the networking layer on disconnect.
+ * Invokes a Lua trigger box.session.on_disconnect if it
+ * is defined.
+ * @param session   session to destroy. may be NULL.
+ *
+ * @exception none
+ */
+void
+session_destroy(uint32_t sid);
+
+
+/**
+ * Return a file descriptor
+ * associated with a session, or -1 if the
+ * session doesn't exist.
+ */
+int
+session_fd(uint32_t sid);
+
+/**
+ * Check whether a session exists or not.
+ */
+static inline bool
+session_exists(uint32_t sid)
+{
+	return session_fd(sid) >= 0;
+}
+
+/**
+ * Type of the callback which may be invoked
+ * on connect or disconnect event.
+ */
+typedef void (*session_trigger_f)(void *);
+
+struct session_trigger
+{
+	session_trigger_f trigger;
+	void *param;
+};
+
+/* The global on-connect trigger. */
+extern struct session_trigger session_on_connect;
+/* The global on-disconnect trigger. */
+extern struct session_trigger session_on_disconnect;
+
+void
+session_init();
+
+void
+session_free();
diff --git a/include/tarantool.h b/include/tarantool.h
index 3812c783cd..6da91c9b2e 100644
--- a/include/tarantool.h
+++ b/include/tarantool.h
@@ -33,8 +33,6 @@
 
 struct tarantool_cfg;
 struct tbuf;
-struct log_io;
-struct fio_batch;
 
 extern int snapshot_pid;
 extern struct tarantool_cfg cfg;
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index bc8a9e86c8..e55c8ed34d 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -118,6 +118,7 @@ set (common_sources
      cpu_feature.m
      replica.m
      iproto.m
+     session.m
      object.m
      exception.m
      errcode.c
@@ -132,6 +133,7 @@ set (common_sources
      lua/slab.m
      lua/uuid.m
      lua/lua_ipc.m
+     lua/session.m
 )
 
 if (ENABLE_TRACE)
diff --git a/src/admin.m b/src/admin.m
index cac4900167..10f4e8ce4b 100644
--- a/src/admin.m
+++ b/src/admin.m
@@ -51,6 +51,7 @@
 #include "lauxlib.h"
 #include "lualib.h"
 #include "box/box.h"
+#include "session.h"
 
 static const char *help =
 	"available commands:" CRLF
@@ -72,7 +73,7 @@ static const char *help =
 static const char *unknown_command = "unknown command. try typing help." CRLF;
 
 
-#line 76 "src/admin.m"
+#line 77 "src/admin.m"
 static const int admin_start = 1;
 static const int admin_first_final = 135;
 static const int admin_error = 0;
@@ -80,7 +81,7 @@ static const int admin_error = 0;
 static const int admin_en_main = 1;
 
 
-#line 75 "src/admin.rl"
+#line 76 "src/admin.rl"
 
 
 struct salloc_stat_admin_cb_ctx {
@@ -217,12 +218,12 @@ admin_dispatch(struct ev_io *coio, struct iobuf *iobuf, lua_State *L)
 	p = in->pos;
 
 	
-#line 221 "src/admin.m"
+#line 222 "src/admin.m"
 	{
 	cs = admin_start;
 	}
 
-#line 226 "src/admin.m"
+#line 227 "src/admin.m"
 	{
 	if ( p == pe )
 		goto _test_eof;
@@ -285,15 +286,15 @@ case 6:
 	}
 	goto st0;
 tr13:
-#line 308 "src/admin.rl"
+#line 309 "src/admin.rl"
 	{slab_validate(); ok(out);}
 	goto st135;
 tr20:
-#line 296 "src/admin.rl"
+#line 297 "src/admin.rl"
 	{return -1;}
 	goto st135;
 tr25:
-#line 223 "src/admin.rl"
+#line 224 "src/admin.rl"
 	{
 			start(out);
 			tbuf_append(out, help, strlen(help));
@@ -301,9 +302,9 @@ tr25:
 		}
 	goto st135;
 tr36:
-#line 282 "src/admin.rl"
+#line 283 "src/admin.rl"
 	{strend = p;}
-#line 229 "src/admin.rl"
+#line 230 "src/admin.rl"
 	{
 			strstart[strend-strstart]='\0';
 			start(out);
@@ -312,7 +313,7 @@ tr36:
 		}
 	goto st135;
 tr43:
-#line 236 "src/admin.rl"
+#line 237 "src/admin.rl"
 	{
 			if (reload_cfg(err))
 				fail(out, err);
@@ -321,11 +322,11 @@ tr43:
 		}
 	goto st135;
 tr67:
-#line 306 "src/admin.rl"
+#line 307 "src/admin.rl"
 	{coredump(60); ok(out);}
 	goto st135;
 tr76:
-#line 243 "src/admin.rl"
+#line 244 "src/admin.rl"
 	{
 			int ret = snapshot(NULL, 0);
 
@@ -340,9 +341,9 @@ tr76:
 		}
 	goto st135;
 tr98:
-#line 292 "src/admin.rl"
+#line 293 "src/admin.rl"
 	{ state = false; }
-#line 256 "src/admin.rl"
+#line 257 "src/admin.rl"
 	{
 			strstart[strend-strstart] = '\0';
 			if (errinj_set_byname(strstart, state)) {
@@ -354,9 +355,9 @@ tr98:
 		}
 	goto st135;
 tr101:
-#line 291 "src/admin.rl"
+#line 292 "src/admin.rl"
 	{ state = true; }
-#line 256 "src/admin.rl"
+#line 257 "src/admin.rl"
 	{
 			strstart[strend-strstart] = '\0';
 			if (errinj_set_byname(strstart, state)) {
@@ -368,7 +369,7 @@ tr101:
 		}
 	goto st135;
 tr117:
-#line 211 "src/admin.rl"
+#line 212 "src/admin.rl"
 	{
 			start(out);
 			show_cfg(out);
@@ -376,15 +377,15 @@ tr117:
 		}
 	goto st135;
 tr131:
-#line 299 "src/admin.rl"
+#line 300 "src/admin.rl"
 	{start(out); fiber_info(out); end(out);}
 	goto st135;
 tr137:
-#line 298 "src/admin.rl"
+#line 299 "src/admin.rl"
 	{start(out); tarantool_info(out); end(out);}
 	goto st135;
 tr146:
-#line 217 "src/admin.rl"
+#line 218 "src/admin.rl"
 	{
 			start(out);
 			errinj_info(out);
@@ -392,33 +393,33 @@ tr146:
 		}
 	goto st135;
 tr152:
-#line 302 "src/admin.rl"
+#line 303 "src/admin.rl"
 	{start(out); palloc_stat(out); end(out);}
 	goto st135;
 tr160:
-#line 301 "src/admin.rl"
+#line 302 "src/admin.rl"
 	{start(out); show_slab(out); end(out);}
 	goto st135;
 tr164:
-#line 303 "src/admin.rl"
+#line 304 "src/admin.rl"
 	{start(out); show_stat(out);end(out);}
 	goto st135;
 st135:
 	if ( ++p == pe )
 		goto _test_eof135;
 case 135:
-#line 411 "src/admin.m"
+#line 412 "src/admin.m"
 	goto st0;
 tr14:
-#line 308 "src/admin.rl"
+#line 309 "src/admin.rl"
 	{slab_validate(); ok(out);}
 	goto st7;
 tr21:
-#line 296 "src/admin.rl"
+#line 297 "src/admin.rl"
 	{return -1;}
 	goto st7;
 tr26:
-#line 223 "src/admin.rl"
+#line 224 "src/admin.rl"
 	{
 			start(out);
 			tbuf_append(out, help, strlen(help));
@@ -426,9 +427,9 @@ tr26:
 		}
 	goto st7;
 tr37:
-#line 282 "src/admin.rl"
+#line 283 "src/admin.rl"
 	{strend = p;}
-#line 229 "src/admin.rl"
+#line 230 "src/admin.rl"
 	{
 			strstart[strend-strstart]='\0';
 			start(out);
@@ -437,7 +438,7 @@ tr37:
 		}
 	goto st7;
 tr44:
-#line 236 "src/admin.rl"
+#line 237 "src/admin.rl"
 	{
 			if (reload_cfg(err))
 				fail(out, err);
@@ -446,11 +447,11 @@ tr44:
 		}
 	goto st7;
 tr68:
-#line 306 "src/admin.rl"
+#line 307 "src/admin.rl"
 	{coredump(60); ok(out);}
 	goto st7;
 tr77:
-#line 243 "src/admin.rl"
+#line 244 "src/admin.rl"
 	{
 			int ret = snapshot(NULL, 0);
 
@@ -465,9 +466,9 @@ tr77:
 		}
 	goto st7;
 tr99:
-#line 292 "src/admin.rl"
+#line 293 "src/admin.rl"
 	{ state = false; }
-#line 256 "src/admin.rl"
+#line 257 "src/admin.rl"
 	{
 			strstart[strend-strstart] = '\0';
 			if (errinj_set_byname(strstart, state)) {
@@ -479,9 +480,9 @@ tr99:
 		}
 	goto st7;
 tr102:
-#line 291 "src/admin.rl"
+#line 292 "src/admin.rl"
 	{ state = true; }
-#line 256 "src/admin.rl"
+#line 257 "src/admin.rl"
 	{
 			strstart[strend-strstart] = '\0';
 			if (errinj_set_byname(strstart, state)) {
@@ -493,7 +494,7 @@ tr102:
 		}
 	goto st7;
 tr118:
-#line 211 "src/admin.rl"
+#line 212 "src/admin.rl"
 	{
 			start(out);
 			show_cfg(out);
@@ -501,15 +502,15 @@ tr118:
 		}
 	goto st7;
 tr132:
-#line 299 "src/admin.rl"
+#line 300 "src/admin.rl"
 	{start(out); fiber_info(out); end(out);}
 	goto st7;
 tr138:
-#line 298 "src/admin.rl"
+#line 299 "src/admin.rl"
 	{start(out); tarantool_info(out); end(out);}
 	goto st7;
 tr147:
-#line 217 "src/admin.rl"
+#line 218 "src/admin.rl"
 	{
 			start(out);
 			errinj_info(out);
@@ -517,22 +518,22 @@ tr147:
 		}
 	goto st7;
 tr153:
-#line 302 "src/admin.rl"
+#line 303 "src/admin.rl"
 	{start(out); palloc_stat(out); end(out);}
 	goto st7;
 tr161:
-#line 301 "src/admin.rl"
+#line 302 "src/admin.rl"
 	{start(out); show_slab(out); end(out);}
 	goto st7;
 tr165:
-#line 303 "src/admin.rl"
+#line 304 "src/admin.rl"
 	{start(out); show_stat(out);end(out);}
 	goto st7;
 st7:
 	if ( ++p == pe )
 		goto _test_eof7;
 case 7:
-#line 536 "src/admin.m"
+#line 537 "src/admin.m"
 	if ( (*p) == 10 )
 		goto st135;
 	goto st0;
@@ -685,28 +686,28 @@ case 23:
 	}
 	goto tr33;
 tr33:
-#line 282 "src/admin.rl"
+#line 283 "src/admin.rl"
 	{strstart = p;}
 	goto st24;
 st24:
 	if ( ++p == pe )
 		goto _test_eof24;
 case 24:
-#line 696 "src/admin.m"
+#line 697 "src/admin.m"
 	switch( (*p) ) {
 		case 10: goto tr36;
 		case 13: goto tr37;
 	}
 	goto st24;
 tr34:
-#line 282 "src/admin.rl"
+#line 283 "src/admin.rl"
 	{strstart = p;}
 	goto st25;
 st25:
 	if ( ++p == pe )
 		goto _test_eof25;
 case 25:
-#line 710 "src/admin.m"
+#line 711 "src/admin.m"
 	switch( (*p) ) {
 		case 10: goto tr36;
 		case 13: goto tr37;
@@ -1156,28 +1157,28 @@ case 73:
 		goto tr91;
 	goto st0;
 tr91:
-#line 290 "src/admin.rl"
+#line 291 "src/admin.rl"
 	{ strstart = p; }
 	goto st74;
 st74:
 	if ( ++p == pe )
 		goto _test_eof74;
 case 74:
-#line 1167 "src/admin.m"
+#line 1168 "src/admin.m"
 	if ( (*p) == 32 )
 		goto tr92;
 	if ( 33 <= (*p) && (*p) <= 126 )
 		goto st74;
 	goto st0;
 tr92:
-#line 290 "src/admin.rl"
+#line 291 "src/admin.rl"
 	{ strend = p; }
 	goto st75;
 st75:
 	if ( ++p == pe )
 		goto _test_eof75;
 case 75:
-#line 1181 "src/admin.m"
+#line 1182 "src/admin.m"
 	switch( (*p) ) {
 		case 32: goto st75;
 		case 111: goto st76;
@@ -1869,7 +1870,7 @@ case 134:
 	_out: {}
 	}
 
-#line 314 "src/admin.rl"
+#line 315 "src/admin.rl"
 
 
 	in->pos = pe;
@@ -1889,10 +1890,16 @@ admin_handler(va_list ap)
 {
 	struct ev_io coio = va_arg(ap, struct ev_io);
 	struct iobuf *iobuf = va_arg(ap, struct iobuf *);
-	fiber_set_sid(fiber, box_sid());
 	lua_State *L = lua_newthread(tarantool_L);
 	int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
 	@try {
+		/*
+		 * Admin and iproto connections must have a
+		 * session object, representing the state of
+		 * a remote client: it's used in Lua
+		 * stored procedures.
+		 */
+		session_create(coio.fd);
 		for (;;) {
 			if (admin_dispatch(&coio, iobuf, L) < 0)
 				return;
@@ -1903,6 +1910,7 @@ admin_handler(va_list ap)
 		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
 		evio_close(&coio);
 		iobuf_destroy(iobuf);
+		session_destroy(fiber->sid);
 	}
 }
 
diff --git a/src/admin.rl b/src/admin.rl
index 357f7f9770..3b1f361909 100644
--- a/src/admin.rl
+++ b/src/admin.rl
@@ -49,6 +49,7 @@
 #include "lauxlib.h"
 #include "lualib.h"
 #include "box/box.h"
+#include "session.h"
 
 static const char *help =
 	"available commands:" CRLF
@@ -330,10 +331,16 @@ admin_handler(va_list ap)
 {
 	struct ev_io coio = va_arg(ap, struct ev_io);
 	struct iobuf *iobuf = va_arg(ap, struct iobuf *);
-	fiber_set_sid(fiber, box_sid());
 	lua_State *L = lua_newthread(tarantool_L);
 	int coro_ref = luaL_ref(tarantool_L, LUA_REGISTRYINDEX);
 	@try {
+		/*
+		 * Admin and iproto connections must have a
+		 * session object, representing the state of
+		 * a remote client: it's used in Lua
+		 * stored procedures.
+		 */
+		session_create(coio.fd);
 		for (;;) {
 			if (admin_dispatch(&coio, iobuf, L) < 0)
 				return;
@@ -344,6 +351,7 @@ admin_handler(va_list ap)
 		luaL_unref(tarantool_L, LUA_REGISTRYINDEX, coro_ref);
 		evio_close(&coio);
 		iobuf_destroy(iobuf);
+		session_destroy(fiber->sid);
 	}
 }
 
diff --git a/src/box/box.m b/src/box/box.m
index aad13d6062..ab3e6c1e34 100644
--- a/src/box/box.m
+++ b/src/box/box.m
@@ -539,11 +539,3 @@ box_status(void)
 {
     return status;
 }
-
-
-uint32_t
-box_sid()
-{
-	static uint32_t sid = 1;
-	return sid++;
-}
diff --git a/src/fiber.m b/src/fiber.m
index 8bd36bd777..02ba766e31 100644
--- a/src/fiber.m
+++ b/src/fiber.m
@@ -305,8 +305,6 @@ fiber_find(int fid)
 
 	if (k == mh_end(fiber_registry))
 		return NULL;
-	if (!mh_exist(fiber_registry, k))
-		return NULL;
 	return mh_value(fiber_registry, k);
 }
 
diff --git a/src/iproto.m b/src/iproto.m
index 2560c34812..d3fa59411c 100644
--- a/src/iproto.m
+++ b/src/iproto.m
@@ -44,6 +44,7 @@
 #include "box/request.h"
 #include "iobuf.h"
 #include "evio.h"
+#include "session.h"
 
 enum {
 	/** Maximal iproto package body length (2GiB) */
@@ -61,6 +62,8 @@ struct iproto_header {
 	uint32_t sync;
 } __attribute__((packed));
 
+static struct iproto_header dummy_header = { 0, 0, 0 };
+
 struct iproto_reply_header {
 	struct iproto_header hdr;
 	uint32_t ret_code;
@@ -75,7 +78,7 @@ iproto(const void *pos)
 	return (struct iproto_header *) pos;
 }
 
-/* {{{ struct port_iproto */
+/* {{{ port_iproto */
 
 /**
  * struct port_iproto users need to be careful to:
@@ -148,8 +151,9 @@ static struct port_vtab port_iproto_vtab = {
 	port_iproto_eof,
 };
 
-void
-port_iproto_init(struct port_iproto *port, struct obuf *buf, struct iproto_header *req)
+static void
+port_iproto_init(struct port_iproto *port, struct obuf *buf,
+		 struct iproto_header *req)
 {
 	port->vtab = &port_iproto_vtab;
 	port->buf = buf;
@@ -160,10 +164,59 @@ port_iproto_init(struct port_iproto *port, struct obuf *buf, struct iproto_heade
 
 /* }}} */
 
-/* {{{ iproto_request_queue */
+/* {{{ iproto_queue */
+
+struct iproto_request;
+
+/**
+ * Implementation of an input queue of the box request processor.
+ *
+ * Socket event handlers read data, determine request boundaries
+ * and enqueue requests. Once all input/output events are
+ * processed, an own event handler is invoked to deal with the
+ * requests in the queue: it's important that each request is
+ * processed in a fiber environment.
+ *
+ * @sa iproto_queue_schedule, iproto_handler, iproto_handshake
+ */
+
+struct iproto_queue
+{
+	/**
+	 * Ring buffer of fixed size, preallocated
+	 * during initialization.
+	 */
+	struct iproto_request *queue;
+	/**
+	 * Main function of the fiber invoked to handle
+	 * all outstanding tasks in this queue.
+	 */
+	void (*handler)(va_list);
+	/**
+	 * Cache of fibers which work on requests
+	 * in this queue.
+         */
+	struct rlist fiber_cache;
+	/**
+	 * Used to trigger request processing when
+	 * the queue becomes non-empty.
+	 */
+	struct ev_async watcher;
+	/* Ring buffer position. */
+	int begin, end;
+	/* Ring buffer size. */
+	int size;
+};
+
+enum {
+	IPROTO_REQUEST_QUEUE_SIZE = 2048,
+	IPROTO_CONNECT_QUEUE_SIZE = 256
+};
 
 struct iproto_session;
 
+typedef void (*iproto_request_f)(struct iproto_request *);
+
 /**
  * A single request from the client. All requests
  * from all clients are queued into a single queue
@@ -175,55 +228,129 @@ struct iproto_request
 	struct iobuf *iobuf;
 	/* Position of the request in the input buffer. */
 	struct iproto_header *header;
+	iproto_request_f process;
 };
 
-/** Request queue. */
-enum { IPROTO_REQUEST_QUEUE_SIZE = 2048 };
-struct iproto_request_queue
-{
-	int begin, end;
-	struct iproto_request queue[IPROTO_REQUEST_QUEUE_SIZE];
-} ir_queue;
+/**
+ * A single global queue for all requests in all connections. All
+ * requests are processed concurrently.
+ * Is also used as a queue for just established connections and to
+ * execute disconnect triggers. A few notes about these triggers:
+ * - they need to be run in a fiber
+ * - unlike an ordinary request failure, on_connect trigger
+ *   failure must lead to connection shutdown.
+ * - as long as on_connect trigger can be used for client
+ *   authentication, it must be processed before any other request
+ *   on this connection.
+ */
+static struct iproto_queue request_queue;
 
-static struct ev_async iproto_postio;
+static inline bool
+iproto_queue_is_empty(struct iproto_queue *i_queue)
+{
+	return i_queue->begin == i_queue->end;
+}
 
 static inline void
-iproto_enqueue_request(struct iproto_session *session, struct iobuf *iobuf,
-		       struct iproto_header *header)
+iproto_enqueue_request(struct iproto_queue *i_queue,
+		       struct iproto_session *session,
+		       struct iobuf *iobuf,
+		       struct iproto_header *header,
+		       iproto_request_f process)
 {
-	/* The queue is full. Invoke iproto_handler to work it off. */
-	if (ir_queue.end == IPROTO_REQUEST_QUEUE_SIZE)
-		ev_invoke(&iproto_postio, EV_CUSTOM);
-	assert(ir_queue.end < IPROTO_REQUEST_QUEUE_SIZE);
-	struct iproto_request *request = ir_queue.queue + ir_queue.end++;
+	/* If the queue is full, invoke the handler to work it off. */
+	if (i_queue->end == i_queue->size)
+		ev_invoke(&i_queue->watcher, EV_CUSTOM);
+	assert(i_queue->end < i_queue->size);
+	bool was_empty = iproto_queue_is_empty(i_queue);
+	struct iproto_request *request = i_queue->queue + i_queue->end++;
 
 	request->session = session;
 	request->iobuf = iobuf;
 	request->header = header;
+	request->process = process;
+	/*
+	 * There were some queued requests, ensure they are
+	 * handled.
+	 */
+	if (was_empty)
+		ev_feed_event(&request_queue.watcher, EV_CUSTOM);
 }
 
-static inline struct iproto_header *
-iproto_dequeue_request(struct iproto_session **session, struct iobuf **iobuf)
+static inline bool
+iproto_dequeue_request(struct iproto_queue *i_queue,
+		       struct iproto_request *out)
 {
-	if (ir_queue.begin == ir_queue.end)
-		return NULL;
-	struct iproto_request *request = ir_queue.queue + ir_queue.begin++;
-	if (ir_queue.begin == ir_queue.end)
-		ir_queue.begin = ir_queue.end = 0;
-	*session = request->session;
-	*iobuf = request->iobuf;
-	return request->header;
+	if (i_queue->begin == i_queue->end)
+		return false;
+	struct iproto_request *request = i_queue->queue + i_queue->begin++;
+	if (i_queue->begin == i_queue->end)
+		i_queue->begin = i_queue->end = 0;
+	*out = *request;
+	return true;
 }
 
-static inline bool
-iproto_request_queue_is_empty()
+/** Put the current fiber into a queue fiber cache. */
+static inline void
+iproto_cache_fiber(struct iproto_queue *i_queue)
+{
+	fiber_gc();
+	rlist_add_entry(&i_queue->fiber_cache, fiber, state);
+	fiber_yield();
+}
+
+/** Create fibers to handle all outstanding tasks. */
+static void
+iproto_queue_schedule(struct ev_async *watcher,
+		      int events __attribute__((unused)))
+{
+	struct iproto_queue *i_queue = watcher->data;
+	while (! iproto_queue_is_empty(i_queue)) {
+
+		struct fiber *f = rlist_shift_entry(&i_queue->fiber_cache,
+						    struct fiber, state);
+		if (f == NULL)
+			f = fiber_create("iproto", i_queue->handler);
+		fiber_call(f, i_queue);
+	}
+}
+
+static inline void
+iproto_queue_init(struct iproto_queue *i_queue,
+		  int size, void (*handler)(va_list))
+{
+	i_queue->size = size;
+	i_queue->begin = i_queue->end = 0;
+	i_queue->queue = palloc(eter_pool, size *
+				sizeof (struct iproto_request));
+	/**
+	 * Initialize an ev_async event which would start
+	 * workers for all outstanding tasks.
+	 */
+	ev_async_init(&i_queue->watcher, iproto_queue_schedule);
+	i_queue->watcher.data = i_queue;
+	i_queue->handler = handler;
+	rlist_init(&i_queue->fiber_cache);
+}
+
+/** A handler to process all queued requests. */
+static void
+iproto_queue_handler(va_list ap)
 {
-	return ir_queue.begin == ir_queue.end;
+	struct iproto_queue *i_queue = va_arg(ap, struct iproto_queue *);
+	struct iproto_request request;
+restart:
+	while (iproto_dequeue_request(i_queue, &request)) {
+
+		request.process(&request);
+	}
+	iproto_cache_fiber(&request_queue);
+	goto restart;
 }
 
 /* }}} */
 
-/* {{{ struct iproto_session */
+/* {{{ iproto_session */
 
 /** Context of a single client connection. */
 struct iproto_session
@@ -252,10 +379,14 @@ struct iproto_session
 	ssize_t parse_size;
 	/** Current write position in the output buffer */
 	struct obuf_svp write_pos;
+	/**
+	 * Function of the request processor to handle
+	 * a single request.
+	 */
 	box_process_func *handler;
 	struct ev_io input;
 	struct ev_io output;
-	/** Mod session id. */
+	/** Session id. */
 	uint32_t sid;
 };
 
@@ -282,6 +413,15 @@ static void
 iproto_session_on_output(struct ev_io *watcher,
 			 int revents __attribute__((unused)));
 
+static void
+iproto_process_request(struct iproto_request *request);
+
+static void
+iproto_process_connect(struct iproto_request *request);
+
+static void
+iproto_process_disconnect(struct iproto_request *request);
+
 static struct iproto_session *
 iproto_session_create(const char *name, int fd, box_process_func *param)
 {
@@ -300,14 +440,16 @@ iproto_session_create(const char *name, int fd, box_process_func *param)
 	session->iobuf[1] = iobuf_create(name);
 	session->parse_size = 0;
 	session->write_pos = obuf_create_svp(&session->iobuf[0]->out);
-	session->sid = box_sid();
+	session->sid = 0;
 	return session;
 }
 
+/** Recycle a session. Never throws. */
 static inline void
 iproto_session_destroy(struct iproto_session *session)
 {
 	assert(iproto_session_is_idle(session));
+	session_destroy(session->sid); /* Never throws. No-op if sid is 0. */
 	iobuf_destroy(session->iobuf[0]);
 	iobuf_destroy(session->iobuf[1]);
 	SLIST_INSERT_HEAD(&iproto_session_cache, session, next_in_cache);
@@ -332,11 +474,16 @@ iproto_session_shutdown(struct iproto_session *session)
 	 * as soon as all parsed data is processed.
 	 */
 	session->iobuf[0]->in.end -= session->parse_size;
-	iproto_session_gc(session);
+	/*
+	 * If the session is not idle it will
+	 * be destroyed only after all its requests
+	 * are processed (and dropped).
+         */
+	iproto_enqueue_request(&request_queue, session,
+			       session->iobuf[0], &dummy_header,
+			       iproto_process_disconnect);
 }
 
-/* }}} */
-
 static inline void
 iproto_validate_header(struct iproto_header *header, int fd)
 {
@@ -421,7 +568,7 @@ iproto_session_input_iobuf(struct iproto_session *session)
 }
 
 /** Enqueue all requests which were read up. */
-static inline int
+static inline void
 iproto_enqueue_batch(struct iproto_session *session, struct ibuf *in, int fd)
 {
 	int batch_size;
@@ -438,11 +585,11 @@ iproto_enqueue_batch(struct iproto_session *session, struct ibuf *in, int fd)
 					   header->len))
 			break;
 
-		iproto_enqueue_request(session, session->iobuf[0],
-				       header);
+		iproto_enqueue_request(&request_queue, session,
+				       session->iobuf[0], header,
+				       iproto_process_request);
 		session->parse_size -= sizeof(*header) + header->len;
 	}
-	return batch_size;
 }
 
 static void
@@ -475,13 +622,7 @@ iproto_session_on_input(struct ev_io *watcher,
 		in->end += nrd;
 		session->parse_size += nrd;
 		/* Enqueue all requests which are fully read up. */
-		if (iproto_enqueue_batch(session, in, fd) > 0) {
-			/*
-			 * There were some queued requests, ensure
-			 * they are handled.
-			 */
-			ev_feed_event(&iproto_postio, EV_CUSTOM);
-		}
+		iproto_enqueue_batch(session, in, fd);
 	} @catch (tnt_Exception *e) {
 		[e log];
 		iproto_session_shutdown(session);
@@ -531,7 +672,7 @@ iproto_session_on_output(struct ev_io *watcher,
 			 int revent __attribute__((unused)))
 {
 	struct iproto_session *session = watcher->data;
-	int fd = session->input.fd;
+	int fd = session->output.fd;
 	struct obuf_svp *svp = &session->write_pos;
 
 	@try {
@@ -552,9 +693,9 @@ iproto_session_on_output(struct ev_io *watcher,
 	}
 }
 
-/* {{{ iproto_reply fiber */
+/* }}} */
 
-struct rlist iproto_fiber_cache;
+/* {{{ iproto_process_* functions */
 
 /** Stack reply to 'ping' packet. */
 static inline void
@@ -602,41 +743,68 @@ iproto_reply(struct port_iproto *port, box_process_func callback,
 	}
 }
 
-/** Execute a single request and cache output in obuf. */
 static void
-iproto_handler(va_list arg __attribute__((unused)))
+iproto_process_request(struct iproto_request *request)
 {
-	struct iproto_header *header;
-	struct iproto_session *session;
-	struct iobuf *iobuf;
+	struct iproto_session *session = request->session;
+	struct iproto_header *header = request->header;
+	struct iobuf *iobuf = request->iobuf;
 	struct port_iproto port;
-restart:
-	while ((header = iproto_dequeue_request(&session, &iobuf))) {
-		if (unlikely(! evio_is_connected(&session->output))) {
-			/*
-			 * Drop a request of a disconnected
-			 * session.
-			 */
-			iobuf->in.pos += sizeof(*header) + header->len;
-			iproto_session_gc(session);
-			continue;
-		}
-		@try {
-			fiber_set_sid(fiber, session->sid);
-			iproto_reply(&port, *session->handler, &iobuf->out, header);
-		} @finally {
-			iobuf->in.pos += sizeof(*header) + header->len;
-		}
+	@try {
+		if (unlikely(! evio_is_connected(&session->output)))
+			return;
+
+		fiber_set_sid(fiber, session->sid);
+		iproto_reply(&port, *session->handler,
+			     &iobuf->out, header);
+
+		if (unlikely(! evio_is_connected(&session->output)))
+			return;
 		if (! ev_is_active(&session->output))
 			ev_feed_event(&session->output, EV_WRITE);
+	} @finally {
+		iobuf->in.pos += sizeof(*header) + header->len;
+		iproto_session_gc(session);
 	}
-	fiber_gc();
-	rlist_add_entry(&iproto_fiber_cache, fiber, state);
-	fiber_yield();
-	goto restart;
 }
 
-/* }}} */
+/**
+ * Handshake a connection: invoke the on-connect trigger
+ * and possibly authenticate. Try to send the client an error
+ * upon a failure.
+ */
+static void
+iproto_process_connect(struct iproto_request *request)
+{
+	struct iproto_session *session = request->session;
+	struct iobuf *iobuf = request->iobuf;
+	int fd = session->input.fd;
+	@try {              /* connect. */
+		session->sid = session_create(fd);
+	} @catch (ClientError *e) {
+		iproto_reply_error(&iobuf->out, request->header, e);
+		iproto_flush(iobuf, fd, &session->write_pos);
+		iproto_session_shutdown(session);
+		return;
+	} @catch (tnt_Exception *e) {
+		[e log];
+		assert(session->sid == 0);
+		iproto_session_shutdown(session);
+		return;
+	}
+	/* Handshake OK, start reading input. */
+	ev_feed_event(&session->input, EV_READ);
+}
+
+static void
+iproto_process_disconnect(struct iproto_request *request)
+{
+	/* Runs the trigger, which may yield. */
+	iproto_session_gc(request->session);
+}
+
+/** }}} */
+
 
 /**
  * Create a session context and start input.
@@ -650,25 +818,9 @@ iproto_on_accept(struct evio_service *service, int fd,
 
 	struct iproto_session *session;
 	session = iproto_session_create(name, fd, service->on_accept_param);
-
-	ev_feed_event(&session->input, EV_READ);
-}
-
-/**
- * Create fibers to handle all outstanding tasks.
- */
-static void
-iproto_schedule(struct ev_async *watcher __attribute__((unused)),
-	      int events __attribute__((unused)))
-{
-	while (! iproto_request_queue_is_empty()) {
-
-		struct fiber *f = rlist_shift_entry(&iproto_fiber_cache,
-						    struct fiber, state);
-		if (f == NULL)
-			f = fiber_create("iproto", iproto_handler);
-		fiber_call(f);
-	}
+	iproto_enqueue_request(&request_queue, session,
+			       session->iobuf[0], &dummy_header,
+			       iproto_process_connect);
 }
 
 /**
@@ -698,11 +850,7 @@ iproto_init(const char *bind_ipaddr, int primary_port,
 				  iproto_on_accept, &box_process_ro);
 		evio_service_start(&secondary);
 	}
-	/**
-	 * Initialize an ev_async event which would start workers
-	 * for all outstanding tasks.
-	 */
-	ev_async_init(&iproto_postio, iproto_schedule);
-	rlist_init(&iproto_fiber_cache);
+	iproto_queue_init(&request_queue, IPROTO_REQUEST_QUEUE_SIZE,
+			  iproto_queue_handler);
 }
 
diff --git a/src/lua/init.m b/src/lua/init.m
index 50b56c3a69..d8b619a429 100644
--- a/src/lua/init.m
+++ b/src/lua/init.m
@@ -48,6 +48,7 @@
 #include "lua/slab.h"
 #include "lua/stat.h"
 #include "lua/uuid.h"
+#include "lua/session.h"
 
 #include TARANTOOL_CONFIG
 
@@ -568,14 +569,6 @@ lbox_fiber_id(struct lua_State *L)
 	return 1;
 }
 
-static int
-lbox_fiber_sid(struct lua_State *L)
-{
-	struct fiber *f = lua_gettop(L) ? lbox_checkfiber(L, 1) : fiber;
-	lua_pushinteger(L, f->sid);
-	return 1;
-}
-
 static struct lua_State *
 box_lua_fiber_get_coro(struct lua_State *L, struct fiber *f)
 {
@@ -798,6 +791,7 @@ lbox_fiber_create(struct lua_State *L)
 			   " reached");
 
 	struct fiber *f = fiber_create("lua", box_lua_fiber_run);
+	/* Preserve the session in a child fiber. */
 	fiber_set_sid(f, fiber->sid);
 	/* Initially the fiber is cancellable */
 	f->flags |= FIBER_USER_MODE | FIBER_CANCELLABLE;
@@ -1048,7 +1042,6 @@ lbox_fiber_testcancel(struct lua_State *L)
 
 static const struct luaL_reg lbox_fiber_meta [] = {
 	{"id", lbox_fiber_id},
-	{"sid", lbox_fiber_sid},
 	{"name", lbox_fiber_name},
 	{"__gc", lbox_fiber_gc},
 	{NULL, NULL}
@@ -1058,7 +1051,6 @@ static const struct luaL_reg fiberlib[] = {
 	{"sleep", lbox_fiber_sleep},
 	{"self", lbox_fiber_self},
 	{"id", lbox_fiber_id},
-	{"sid", lbox_fiber_sid},
 	{"find", lbox_fiber_find},
 	{"cancel", lbox_fiber_cancel},
 	{"testcancel", lbox_fiber_testcancel},
@@ -1239,12 +1231,6 @@ tarantool_lua_register_type(struct lua_State *L, const char *type_name,
 	lua_pop(L, 1);
 }
 
-/**
- * Remember the LuaJIT FFI extension reference index
- * to protect it from being garbage collected.
- */
-static int ffi_ref = 0;
-
 struct lua_State *
 tarantool_lua_init()
 {
@@ -1258,7 +1244,11 @@ tarantool_lua_init()
 	if (lua_pcall(L, 1, 0, 0) != 0)
 		panic("%s", lua_tostring(L, -1));
 	lua_getglobal(L, "ffi");
-	ffi_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+	/**
+	 * Remember the LuaJIT FFI extension reference index
+	 * to protect it from being garbage collected.
+	 */
+	(void) luaL_ref(L, LUA_REGISTRYINDEX);
 	lua_pushnil(L);
 	lua_setglobal(L, "ffi");
 	luaL_register(L, boxlib_name, boxlib);
@@ -1275,6 +1265,7 @@ tarantool_lua_init()
 	tarantool_lua_stat_init(L);
 	tarantool_lua_ipc_init(L);
 	tarantool_lua_uuid_init(L);
+	tarantool_lua_session_init(L);
 
 	mod_lua_init(L);
 
@@ -1286,7 +1277,6 @@ tarantool_lua_init()
 void
 tarantool_lua_close(struct lua_State *L)
 {
-	luaL_unref(L, LUA_REGISTRYINDEX, ffi_ref);
 	/* collects garbage, invoking userdata gc */
 	lua_close(L);
 }
diff --git a/src/lua/session.m b/src/lua/session.m
new file mode 100644
index 0000000000..523d8d7c07
--- /dev/null
+++ b/src/lua/session.m
@@ -0,0 +1,204 @@
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "lua/session.h"
+#include "lua/init.h"
+
+#include <lua.h>
+#include <lauxlib.h>
+#include <lualib.h>
+
+#include "fiber.h"
+#include "session.h"
+#include "sio.h"
+
+static const char *sessionlib_name = "box.session";
+
+/**
+ * Return a unique monotonic session
+ * identifier. The identifier can be used
+ * to check whether or not a session is alive.
+ * 0 means there is no session (e.g.
+ * a procedure is running in a detached
+ * fiber).
+ */
+static int
+lbox_session_id(struct lua_State *L)
+{
+	lua_pushnumber(L, fiber->sid);
+	return 1;
+}
+
+/**
+ * Check whether or not a session exists.
+ */
+static int
+lbox_session_exists(struct lua_State *L)
+{
+	if (lua_gettop(L) != 1)
+		luaL_error(L, "session.exists(sid): bad arguments");
+
+	uint32_t sid = luaL_checkint(L, -1);
+	lua_pushnumber(L, session_exists(sid));
+	return 1;
+}
+
+/**
+ * Pretty print peer name.
+ */
+static int
+lbox_session_peer(struct lua_State *L)
+{
+	if (lua_gettop(L) > 1)
+		luaL_error(L, "session.peer(sid): bad arguments");
+
+	uint32_t sid = lua_gettop(L) == 1 ?
+		luaL_checkint(L, -1) : fiber->sid;
+
+	int fd = session_fd(sid);
+	struct sockaddr_in addr;
+	sio_getpeername(fd, &addr);
+
+	lua_pushstring(L, sio_strfaddr(&addr));
+	return 1;
+}
+
+struct lbox_session_trigger
+{
+	struct session_trigger *trigger;
+	int coro_ref;
+};
+
+static struct lbox_session_trigger on_connect =
+	{ &session_on_connect, LUA_NOREF};
+static struct lbox_session_trigger on_disconnect =
+	{ &session_on_disconnect, LUA_NOREF};
+
+static void
+lbox_session_run_trigger(void *param)
+{
+	struct lbox_session_trigger *trigger = param;
+	/* Copy the referenced callable object object stack. */
+	lua_rawgeti(tarantool_L, LUA_REGISTRYINDEX, trigger->coro_ref);
+
+	struct lua_State *coro_L = lua_tothread(tarantool_L, -1);
+	lua_pop(tarantool_L, 1);
+	/*
+	 * If there was junk from previous invocation of the
+	 * trigger, clear it, only leaving on the stack the chunk
+	 * to be run. The stack may be polluted when previous
+	 * invocation ended up with an error.
+	 */
+	lua_settop(coro_L, 1);
+	/* lua_call pops the function, make a copy */
+	lua_pushvalue(coro_L, -1);
+	@try {
+		lua_call(coro_L, 0, 0);
+	} @catch (tnt_Exception *e) {
+		@throw;
+	} @catch (...) {
+		tnt_raise(ClientError, :ER_PROC_LUA,
+			  lua_tostring(coro_L, -1));
+	}
+}
+
+static int
+lbox_session_set_trigger(struct lua_State *L,
+			 struct lbox_session_trigger *trigger)
+{
+	if (lua_gettop(L) != 1 ||
+	    (lua_type(L, -1) != LUA_TFUNCTION &&
+	     lua_type(L, -1) != LUA_TNIL)) {
+		luaL_error(L, "session.on_connect(chunk): bad arguments");
+	}
+
+	struct lua_State *coro_L;
+
+	if (trigger->coro_ref != LUA_NOREF) {
+		lua_rawgeti(L, LUA_REGISTRYINDEX, trigger->coro_ref);
+		coro_L = lua_tothread(L, -1);
+		lua_pop(L, 1); /* pop the coro. */
+		/* If there was junk from previous invocation, clear it. */
+		lua_settop(coro_L, 1);
+	} else {
+		coro_L = lua_newthread(L);
+		/* Reference the new thread and pop it. */
+		trigger->coro_ref = luaL_ref(L, LUA_REGISTRYINDEX);
+		lua_pushnil(coro_L);
+	}
+	/** Move the chunk to the coroutine in will be run in. */
+	lua_xmove(L, coro_L, 1);
+	/* Return the old chunk. */
+	lua_insert(coro_L, -2);
+	lua_xmove(coro_L, L, 1);
+	/*
+	 * Set or clear the trigger. Return the old value of the
+	 * trigger.
+	 */
+	if (lua_type(coro_L, -1) == LUA_TNIL) {
+		trigger->trigger->trigger = NULL;
+		trigger->trigger->param = NULL;
+	} else {
+		trigger->trigger->trigger = lbox_session_run_trigger;
+		trigger->trigger->param = trigger;
+	}
+	return 1;
+}
+
+static int
+lbox_session_on_connect(struct lua_State *L)
+{
+	return lbox_session_set_trigger(L, &on_connect);
+}
+
+static int
+lbox_session_on_disconnect(struct lua_State *L)
+{
+	return lbox_session_set_trigger(L, &on_disconnect);
+}
+
+static const struct luaL_reg lbox_session_meta [] = {
+	{"id", lbox_session_id},
+	{NULL, NULL}
+};
+
+static const struct luaL_reg sessionlib[] = {
+	{"id", lbox_session_id},
+	{"exists", lbox_session_exists},
+	{"peer", lbox_session_peer},
+	{"on_connect", lbox_session_on_connect},
+	{"on_disconnect", lbox_session_on_disconnect},
+	{NULL, NULL}
+};
+
+void
+tarantool_lua_session_init(struct lua_State *L)
+{
+	luaL_register(L, sessionlib_name, sessionlib);
+	lua_pop(L, 1);
+}
diff --git a/src/session.m b/src/session.m
new file mode 100644
index 0000000000..3992a54016
--- /dev/null
+++ b/src/session.m
@@ -0,0 +1,117 @@
+/*
+ * Redistribution and use in source and binary forms, with or
+ * without modification, are permitted provided that the following
+ * conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above
+ *    copyright notice, this list of conditions and the
+ *    following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above
+ *    copyright notice, this list of conditions and the following
+ *    disclaimer in the documentation and/or other materials
+ *    provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY <COPYRIGHT HOLDER> ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
+ * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
+ * <COPYRIGHT HOLDER> OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
+ * INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
+ * BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
+ * LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
+ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+#include "session.h"
+#include "fiber.h"
+
+#include "assoc.h"
+#include "exception.h"
+
+uint32_t sid_max;
+
+static struct mh_i32ptr_t *session_registry;
+
+struct session_trigger session_on_connect;
+struct session_trigger session_on_disconnect;
+
+uint32_t
+session_create(int fd)
+{
+	/* Return the next sid rolling over the reserved value of 0. */
+	while (++sid_max == 0)
+		;
+
+	uint32_t sid = sid_max;
+
+	mh_int_t k = mh_i32ptr_put(session_registry, sid,
+				   (void *) (intptr_t) fd, NULL);
+
+	if (k == mh_end(session_registry)) {
+		tnt_raise(ClientError, :ER_MEMORY_ISSUE,
+			  "session hash", "new session");
+	}
+	/*
+	 * Run the trigger *after* setting the current
+	 * fiber sid.
+	 */
+	fiber_set_sid(fiber, sid);
+	if (session_on_connect.trigger) {
+		void *param = session_on_connect.param;
+		@try {
+			session_on_connect.trigger(param);
+		} @catch (tnt_Exception *e) {
+			fiber_set_sid(fiber, 0);
+			mh_i32ptr_remove(session_registry, sid);
+			@throw;
+		}
+	}
+
+	return sid;
+}
+
+void
+session_destroy(uint32_t sid)
+{
+	if (sid == 0) /* no-op for a dead session. */
+		return;
+
+	if (session_on_disconnect.trigger) {
+		void *param = session_on_disconnect.param;
+		@try {
+			session_on_disconnect.trigger(param);
+		} @catch (tnt_Exception *e) {
+			[e log];
+		} @catch (id e) {
+			/* catch all. */
+		}
+	}
+	mh_i32ptr_remove(session_registry, sid);
+}
+
+int
+session_fd(uint32_t sid)
+{
+	mh_int_t k = mh_i32ptr_get(session_registry, sid);
+	return k == mh_end(session_registry) ?
+		-1 : (intptr_t) mh_value(session_registry, k);
+}
+
+void
+session_init()
+{
+	session_registry = mh_i32ptr_init();
+	if (session_registry == NULL)
+		panic("out of memory");
+}
+
+void
+session_free()
+{
+	if (session_registry)
+		mh_i32ptr_destroy(session_registry);
+}
diff --git a/src/tarantool.m b/src/tarantool.m
index 30e8e5ec6c..2fd963c739 100644
--- a/src/tarantool.m
+++ b/src/tarantool.m
@@ -64,6 +64,7 @@
 #include "tarantool_pthread.h"
 #include "lua/init.h"
 #include "memcached.h"
+#include "session.h"
 #include "box/box.h"
 
 
@@ -593,6 +594,7 @@ tarantool_free(void)
 	destroy_tarantool_cfg(&cfg);
 
 	fiber_free();
+	session_free();
 	palloc_free();
 	ev_default_destroy();
 #ifdef ENABLE_GCOV
@@ -876,6 +878,7 @@ main(int argc, char **argv)
 			    cfg.secondary_port);
 		admin_init(cfg.bind_ipaddr, cfg.admin_port);
 		replication_init(cfg.bind_ipaddr, cfg.replication_port);
+		session_init();
 		/*
 		 * Load user init script.  The script should have access
 		 * to Tarantool Lua API (box.cfg, box.fiber, etc...) that
diff --git a/test/box/fiber.result b/test/box/fiber.result
index efe9afecf3..b44c7713d2 100644
--- a/test/box/fiber.result
+++ b/test/box/fiber.result
@@ -201,24 +201,3 @@ lua box.fiber.find(920)
 lua box.space[0]:truncate()
 ---
 ...
-lua box.fiber.sid() > 0
----
- - true
-...
-lua f = box.fiber.create(function() box.fiber.detach() failed = box.fiber.sid() ~= 0 end)
----
-...
-lua box.fiber.resume(f)
----
-...
-lua failed
----
- - false
-...
-lua f = box.fiber.create(function() if box.fiber.sid() == 0 then error('wrong sid') end end)
----
-...
-lua box.fiber.resume(f)
----
- - true
-...
diff --git a/test/box/fiber.test b/test/box/fiber.test
index ff6b2e9f79..09d8b3d90a 100644
--- a/test/box/fiber.test
+++ b/test/box/fiber.test
@@ -80,13 +80,3 @@ exec admin "lua box.fiber.find(900)"
 exec admin "lua box.fiber.find(910)"
 exec admin "lua box.fiber.find(920)"
 exec admin "lua box.space[0]:truncate()"
-
-# check fiber.sid() and fiber.id()
-exec admin "lua box.fiber.sid() > 0"
-exec admin "lua f = box.fiber.create(function() box.fiber.detach() failed = box.fiber.sid() ~= 0 end)"
-exec admin "lua box.fiber.resume(f)"
-exec admin "lua failed"
-exec admin "lua f = box.fiber.create(function() if box.fiber.sid() == 0 then error('wrong sid') end end)"
-exec admin "lua box.fiber.resume(f)"
-
-
diff --git a/test/box/lua.result b/test/box/lua.result
index e57c46d84d..429192fad9 100644
--- a/test/box/lua.result
+++ b/test/box/lua.result
@@ -20,23 +20,24 @@ lua for n in pairs(box) do print('  - box.', n) end
   - box.replace
   - box.space
   - box.cfg
+  - box.on_reload_configuration
   - box.select_range
   - box.insert
-  - box.on_reload_configuration
   - box.counter
-  - box.info
   - box.auto_increment
+  - box.info
+  - box.session
   - box.uuid_hex
   - box.update
-  - box.slab
-  - box.process
   - box.dostring
-  - box.index
+  - box.process
+  - box.select_limit
+  - box.slab
   - box.select
   - box.flags
   - box.unpack
+  - box.index
   - box.stat
-  - box.select_limit
   - box.pack
 ...
 lua box.pack()
diff --git a/test/box/session.result b/test/box/session.result
new file mode 100644
index 0000000000..272fc77b97
--- /dev/null
+++ b/test/box/session.result
@@ -0,0 +1,171 @@
+lua box.session.exists(box.session.id())
+---
+ - 1
+...
+lua box.session.exists()
+---
+error: 'session.exists(sid): bad arguments'
+...
+lua box.session.exists(1, 2, 3)
+---
+error: 'session.exists(sid): bad arguments'
+...
+lua box.session.exists(1234567890)
+---
+ - 0
+...
+lua box.session.id() > 0
+---
+ - true
+...
+lua f = box.fiber.create(function() box.fiber.detach() failed = box.session.id() ~= 0 end)
+---
+...
+lua box.fiber.resume(f)
+---
+...
+lua failed
+---
+ - false
+...
+lua f1 = box.fiber.create(function() if box.session.id() == 0 then failed = true end end)
+---
+...
+lua box.fiber.resume(f1)
+---
+ - true
+...
+lua failed
+---
+ - false
+...
+lua box.session.peer() == box.session.peer(box.session.id())
+---
+ - true
+...
+lua box.session.on_connect(function() end)
+---
+ - nil
+...
+lua box.session.on_disconnect(function() end)
+---
+ - nil
+...
+lua type(box.session.on_connect(function() error('hear') end))
+---
+ - function
+...
+lua type(box.session.on_disconnect(function() error('hear') end))
+---
+ - function
+...
+lua box.session.on_connect()
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_disconnect()
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_connect(function() end, function() end)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_disconnect(function() end, function() end)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_connect(1, 2)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_disconnect(1, 2)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_connect(1)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua box.session.on_disconnect(1)
+---
+error: 'session.on_connect(chunk): bad arguments'
+...
+lua type(box.session.on_connect(nil))
+---
+ - function
+...
+lua type(box.session.on_disconnect(nil))
+---
+ - function
+...
+lua type(box.session.on_connect(nil))
+---
+ - nil
+...
+lua type(box.session.on_disconnect(nil))
+---
+ - nil
+...
+lua function inc() active_connections = active_connections + 1 end
+---
+...
+lua function dec() active_connections = active_connections - 1 end
+---
+...
+lua box.session.on_connect(inc)
+---
+ - nil
+...
+lua box.session.on_disconnect(dec)
+---
+ - nil
+...
+lua active_connections = 0
+---
+...
+lua active_connections
+---
+ - 1
+...
+lua active_connections
+---
+ - 2
+...
+lua type(box.session.on_connect(nil))
+---
+ - function
+...
+lua type(box.session.on_disconnect(nil))
+---
+ - function
+...
+lua box.session.on_connect(function() box.insert(0, box.session.id()) end)
+---
+ - nil
+...
+lua box.session.on_disconnect(function() box.delete(0, box.session.id()) end)
+---
+ - nil
+...
+lua box.unpack('i', box.select(0, 0, box.session.id())[0]) == box.session.id()
+---
+ - true
+...
+lua type(box.session.on_connect(function() nosuchfunction() end))
+---
+ - function
+...
+disconnected
+lua type(box.session.on_connect(nil))
+---
+ - function
+...
+lua type(box.session.on_disconnect(nil))
+---
+ - function
+...
+lua active_connections
+---
+ - 0
+...
diff --git a/test/box/session.test b/test/box/session.test
new file mode 100644
index 0000000000..e604dd5cac
--- /dev/null
+++ b/test/box/session.test
@@ -0,0 +1,84 @@
+# encoding: tarantool
+from lib.admin_connection import AdminConnection
+from lib.box_connection import BoxConnection
+
+exec admin "lua box.session.exists(box.session.id())"
+exec admin "lua box.session.exists()"
+exec admin "lua box.session.exists(1, 2, 3)"
+exec admin "lua box.session.exists(1234567890)"
+
+# check session.id()
+exec admin "lua box.session.id() > 0"
+exec admin "lua f = box.fiber.create(function() box.fiber.detach() failed = box.session.id() ~= 0 end)"
+exec admin "lua box.fiber.resume(f)"
+exec admin "lua failed"
+exec admin "lua f1 = box.fiber.create(function() if box.session.id() == 0 then failed = true end end)"
+exec admin "lua box.fiber.resume(f1)"
+exec admin "lua failed"
+exec admin "lua box.session.peer() == box.session.peer(box.session.id())"
+
+# check on_connect/on_disconnect triggers
+exec admin "lua box.session.on_connect(function() end)"
+exec admin "lua box.session.on_disconnect(function() end)"
+
+# check it's possible to reset these triggers
+#
+exec admin "lua type(box.session.on_connect(function() error('hear') end))"
+exec admin "lua type(box.session.on_disconnect(function() error('hear') end))"
+
+# check on_connect/on_disconnect argument count and type
+exec admin "lua box.session.on_connect()"
+exec admin "lua box.session.on_disconnect()"
+
+exec admin "lua box.session.on_connect(function() end, function() end)"
+exec admin "lua box.session.on_disconnect(function() end, function() end)"
+
+exec admin "lua box.session.on_connect(1, 2)"
+exec admin "lua box.session.on_disconnect(1, 2)"
+
+exec admin "lua box.session.on_connect(1)"
+exec admin "lua box.session.on_disconnect(1)"
+
+# use of nil to clear the trigger
+exec admin "lua type(box.session.on_connect(nil))"
+exec admin "lua type(box.session.on_disconnect(nil))"
+exec admin "lua type(box.session.on_connect(nil))"
+exec admin "lua type(box.session.on_disconnect(nil))"
+
+# check how connect/disconnect triggers work
+exec admin "lua function inc() active_connections = active_connections + 1 end"
+exec admin "lua function dec() active_connections = active_connections - 1 end"
+exec admin "lua box.session.on_connect(inc)"
+exec admin "lua box.session.on_disconnect(dec)"
+exec admin "lua active_connections = 0"
+con1 = AdminConnection('localhost', server.admin_port)
+exec con1 "lua active_connections"
+con2 = AdminConnection('localhost', server.admin_port)
+exec con2 "lua active_connections"
+con1.disconnect()
+con2.disconnect()
+exec admin "lua type(box.session.on_connect(nil))"
+exec admin "lua type(box.session.on_disconnect(nil))"
+
+# write audit trail of connect/disconnect into a space
+exec admin "lua box.session.on_connect(function() box.insert(0, box.session.id()) end)"
+exec admin "lua box.session.on_disconnect(function() box.delete(0, box.session.id()) end)"
+exec con1 "lua box.unpack('i', box.select(0, 0, box.session.id())[0]) == box.session.id()"
+con1.disconnect()
+
+# if on_connect() trigger raises an exception, the connection is dropped
+exec admin "lua type(box.session.on_connect(function() nosuchfunction() end))"
+con1 = BoxConnection('localhost', server.primary_port)
+try:
+    con1.execute("select * from t0 where k0=0")
+    con1.execute("select * from t0 where k0=0")
+except Exception as e:
+    print "disconnected"
+
+# cleanup
+
+exec admin "lua type(box.session.on_connect(nil))"
+exec admin "lua type(box.session.on_disconnect(nil))"
+exec admin "lua active_connections"
+
+# vim: syntax=python
-- 
GitLab