From e8c3d4338d019a880e5b55588d81e131050c44e7 Mon Sep 17 00:00:00 2001
From: Roman Tsisyk <roman@tsisyk.com>
Date: Tue, 9 Jun 2015 18:21:52 +0300
Subject: [PATCH] Add Lua/C version of index:get()/min()/max()/count()/random()

A part of #863
---
 src/box/lua/error.cc     |  11 ++-
 src/box/lua/error.h      |   3 +
 src/box/lua/index.cc     | 151 +++++++++++++++++++++++++++----
 src/box/lua/schema.lua   |  46 +++++++---
 src/box/lua/space.cc     |  12 ++-
 src/box/lua/tuple.cc     |  19 ++--
 src/box/lua/tuple.h      |  13 ++-
 test/box/select.result   | 190 +++++++++++++++++++++++++--------------
 test/box/select.test.lua | 172 ++++++++++++++++++++---------------
 9 files changed, 436 insertions(+), 181 deletions(-)

diff --git a/src/box/lua/error.cc b/src/box/lua/error.cc
index 95f3001333..b758d55c2c 100644
--- a/src/box/lua/error.cc
+++ b/src/box/lua/error.cc
@@ -40,6 +40,15 @@ extern "C" {
 #include "lua/utils.h"
 #include "box/error.h"
 
+int
+lbox_error(lua_State *L)
+{
+	(void) L;
+	assert(fiber()->exception != NULL);
+	fiber()->exception->raise();
+	return 0;
+}
+
 static int
 lbox_error_raise(lua_State *L)
 {
@@ -54,7 +63,7 @@ lbox_error_raise(lua_State *L)
 	if (top <= 1) {
 		/* re-throw saved exceptions (if any) */
 		if (fiber()->exception)
-			fiber()->exception->raise();
+			lbox_error(L);
 		return 0;
 	} else if (top >= 2 && lua_type(L, 2) == LUA_TNUMBER) {
 		code = lua_tointeger(L, 2);
diff --git a/src/box/lua/error.h b/src/box/lua/error.h
index f0dc5895f9..9661f4e147 100644
--- a/src/box/lua/error.h
+++ b/src/box/lua/error.h
@@ -34,4 +34,7 @@ struct lua_State;
 void
 box_lua_error_init(struct lua_State *L);
 
+int
+lbox_error(struct lua_State *L);
+
 #endif /* INCLUDES_TARANTOOL_LUA_ERROR_H */
diff --git a/src/box/lua/index.cc b/src/box/lua/index.cc
index 7555537c55..bb4d155dbb 100644
--- a/src/box/lua/index.cc
+++ b/src/box/lua/index.cc
@@ -28,13 +28,16 @@
  */
 #include "box/lua/index.h"
 #include "lua/utils.h"
+#include "lua/msgpack.h"
 #include "box/index.h"
 #include "box/space.h"
 #include "box/schema.h"
 #include "box/user_def.h"
 #include "box/tuple.h"
+#include "box/lua/error.h"
 #include "box/lua/tuple.h"
 #include "fiber.h"
+#include "iobuf.h"
 
 /** {{{ box.index Lua library: access to spaces and indexes
  */
@@ -67,21 +70,65 @@ boxffi_index_len(uint32_t space_id, uint32_t index_id)
 	}
 }
 
+static inline int
+lbox_returntuple(lua_State *L, struct tuple *tuple)
+{
+	if (tuple == (struct tuple *) -1) {
+		return lbox_error(L);
+	} else if (tuple == NULL) {
+		lua_pushnil(L);
+		return 1;
+	} else {
+		lbox_pushtuple_noref(L, tuple);
+		return 1;
+	}
+}
+
+static inline struct tuple *
+boxffi_returntuple(struct tuple *tuple)
+{
+	if (tuple == NULL)
+		return NULL;
+	tuple_ref(tuple);
+	return tuple;
+}
+
+static inline char *
+lbox_tokey(lua_State *L, int idx)
+{
+	struct obuf key_buf;
+	obuf_create(&key_buf, &fiber()->gc, LUAMP_ALLOC_FACTOR);
+	luamp_encode_tuple(L, luaL_msgpack_default, &key_buf, idx);
+	return obuf_join(&key_buf);
+}
+
 struct tuple *
 boxffi_index_random(uint32_t space_id, uint32_t index_id, uint32_t rnd)
 {
 	try {
 		Index *index = check_index(space_id, index_id);
 		struct tuple *tuple = index->random(rnd);
-		if (tuple == NULL)
-			return NULL;
-		tuple_ref(tuple); /* must not throw in this case */
-		return tuple;
+		return boxffi_returntuple(tuple);
 	}  catch (Exception *) {
 		return (struct tuple *) -1; /* handled by box.error() in Lua */
 	}
 }
 
+static int
+lbox_index_random(lua_State *L)
+{
+	if (lua_gettop(L) != 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2) ||
+	    !lua_isnumber(L, 3))
+		return luaL_error(L, "Usage index.random(space_id, index_id, rnd)");
+
+	uint32_t space_id = lua_tointeger(L, 1);
+	uint32_t index_id = lua_tointeger(L, 2);
+	uint32_t rnd = lua_tointeger(L, 3);
+
+	struct tuple *tuple = boxffi_index_random(space_id, index_id, rnd);
+	return lbox_returntuple(L, tuple);
+}
+
 struct tuple *
 boxffi_index_get(uint32_t space_id, uint32_t index_id, const char *key)
 {
@@ -92,15 +139,27 @@ boxffi_index_get(uint32_t space_id, uint32_t index_id, const char *key)
 		uint32_t part_count = key ? mp_decode_array(&key) : 0;
 		primary_key_validate(index->key_def, key, part_count);
 		struct tuple *tuple = index->findByKey(key, part_count);
-		if (tuple == NULL)
-			return NULL;
-		tuple_ref(tuple); /* must not throw in this case */
-		return tuple;
+		return boxffi_returntuple(tuple);
 	}  catch (Exception *) {
 		return (struct tuple *) -1; /* handled by box.error() in Lua */
 	}
 }
 
+static int
+lbox_index_get(lua_State *L)
+{
+	if (lua_gettop(L) != 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2))
+		return luaL_error(L, "Usage index.get(space_id, index_id, key)");
+
+	RegionGuard region_guard(&fiber()->gc);
+	uint32_t space_id = lua_tointeger(L, 1);
+	uint32_t index_id = lua_tointeger(L, 2);
+	const char *key = lbox_tokey(L, 3);
+
+	struct tuple *tuple = boxffi_index_get(space_id, index_id, key);
+	return lbox_returntuple(L, tuple);
+}
+
 struct tuple *
 boxffi_index_min(uint32_t space_id, uint32_t index_id, const char *key)
 {
@@ -115,15 +174,27 @@ boxffi_index_min(uint32_t space_id, uint32_t index_id, const char *key)
 		uint32_t part_count = key ? mp_decode_array(&key) : 0;
 		key_validate(index->key_def, ITER_GE, key, part_count);
 		struct tuple *tuple = index->min(key, part_count);
-		if (tuple == NULL)
-			return NULL;
-		tuple_ref(tuple); /* must not throw in this case */
-		return tuple;
+		return boxffi_returntuple(tuple);
 	}  catch (Exception *) {
 		return (struct tuple *) -1; /* handled by box.error() in Lua */
 	}
 }
 
+static int
+lbox_index_min(lua_State *L)
+{
+	if (lua_gettop(L) != 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2))
+		return luaL_error(L, "usage index.min(space_id, index_id, key)");
+
+	RegionGuard region_guard(&fiber()->gc);
+	uint32_t space_id = lua_tointeger(L, 1);
+	uint32_t index_id = lua_tointeger(L, 2);
+	const char *key = lbox_tokey(L, 3);
+
+	struct tuple *tuple = boxffi_index_min(space_id, index_id, key);
+	return lbox_returntuple(L, tuple);
+}
+
 struct tuple *
 boxffi_index_max(uint32_t space_id, uint32_t index_id, const char *key)
 {
@@ -138,15 +209,27 @@ boxffi_index_max(uint32_t space_id, uint32_t index_id, const char *key)
 		uint32_t part_count = key ? mp_decode_array(&key) : 0;
 		key_validate(index->key_def, ITER_LE, key, part_count);
 		struct tuple *tuple = index->max(key, part_count);
-		if (tuple == NULL)
-			return NULL;
-		tuple_ref(tuple); /* must not throw in this case */
-		return tuple;
+		return boxffi_returntuple(tuple);
 	}  catch (Exception *) {
 		return (struct tuple *) -1; /* handled by box.error() in Lua */
 	}
 }
 
+static int
+lbox_index_max(lua_State *L)
+{
+	if (lua_gettop(L) != 3 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2))
+		return luaL_error(L, "usage index.max(space_id, index_id, key)");
+
+	RegionGuard region_guard(&fiber()->gc);
+	uint32_t space_id = lua_tointeger(L, 1);
+	uint32_t index_id = lua_tointeger(L, 2);
+	const char *key = lbox_tokey(L, 3);
+
+	struct tuple *tuple = boxffi_index_max(space_id, index_id, key);
+	return lbox_returntuple(L, tuple);
+}
+
 ssize_t
 boxffi_index_count(uint32_t space_id, uint32_t index_id, int type, const char *key)
 {
@@ -160,6 +243,28 @@ boxffi_index_count(uint32_t space_id, uint32_t index_id, int type, const char *k
 		return -1; /* handled by box.error() in Lua */
 	}
 }
+static int
+lbox_index_count(lua_State *L)
+{
+	if (lua_gettop(L) != 4 || !lua_isnumber(L, 1) || !lua_isnumber(L, 2) ||
+	    !lua_isnumber(L, 3)) {
+		return luaL_error(L, "usage index.count(space_id, index_id, "
+		       "iterator, key)");
+	}
+
+	RegionGuard region_guard(&fiber()->gc);
+	uint32_t space_id = lua_tointeger(L, 1);
+	uint32_t index_id = lua_tointeger(L, 2);
+	uint32_t iterator = lua_tointeger(L, 3);
+	const char *key = lbox_tokey(L, 4);
+
+	ssize_t count = boxffi_index_count(space_id, index_id,
+		iterator, key);
+	if (count == -1)
+		return lbox_error(L);
+	lua_pushinteger(L, count);
+	return 1;
+}
 
 static void
 box_index_init_iterator_types(struct lua_State *L, int idx)
@@ -197,7 +302,7 @@ boxffi_index_iterator(uint32_t space_id, uint32_t index_id, int type,
 	} catch (Exception *) {
 		if (it)
 			it->free(it);
-		/* will be hanled by box.error() in Lua */
+		/* will be handled by box.error() in Lua */
 		return NULL;
 	}
 }
@@ -241,4 +346,16 @@ box_lua_index_init(struct lua_State *L)
 	luaL_register_module(L, "box.index", indexlib);
 	box_index_init_iterator_types(L, -2);
 	lua_pop(L, 1);
+
+	static const struct luaL_reg boxlib_internal[] = {
+		{"random", lbox_index_random},
+		{"get",  lbox_index_get},
+		{"min", lbox_index_min},
+		{"max", lbox_index_max},
+		{"count", lbox_index_count},
+		{NULL, NULL}
+	};
+
+	luaL_register(L, "box.internal", boxlib_internal);
+	lua_pop(L, 1);
 }
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index f1e1c364e1..415e9928c3 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -604,7 +604,7 @@ function box.schema.space.bless(space)
         return error('Attempt to modify a read-only table') end
     index_mt.__index = index_mt
     -- min and max
-    index_mt.min = function(index, key)
+    index_mt.min_ffi = function(index, key)
         local pkey = msgpackffi.encode_tuple(key)
         local tuple = builtin.boxffi_index_min(index.space_id, index.id, pkey)
         if tuple == ffi.cast('void *', -1) then
@@ -615,7 +615,11 @@ function box.schema.space.bless(space)
             return
         end
     end
-    index_mt.max = function(index, key)
+    index_mt.min_luac = function(index, key)
+        key = keify(key)
+        return internal.min(index.space_id, index.id, key);
+    end
+    index_mt.max_ffi = function(index, key)
         local pkey = msgpackffi.encode_tuple(key)
         local tuple = builtin.boxffi_index_max(index.space_id, index.id, pkey)
         if tuple == ffi.cast('void *', -1) then
@@ -626,7 +630,11 @@ function box.schema.space.bless(space)
             return
         end
     end
-    index_mt.random = function(index, rnd)
+    index_mt.max_luac = function(index, key)
+        key = keify(key)
+        return internal.max(index.space_id, index.id, key);
+    end
+    index_mt.random_ffi = function(index, rnd)
         rnd = rnd or math.random()
         local tuple = builtin.boxffi_index_random(index.space_id, index.id, rnd)
         if tuple == ffi.cast('void *', -1) then
@@ -637,6 +645,10 @@ function box.schema.space.bless(space)
             return
         end
     end
+    index_mt.random_luac = function(index, rnd)
+        rnd = rnd or math.random()
+        return internal.random(index.space_id, index.id, rnd);
+    end
     -- iteration
     index_mt.pairs = function(index, key, opts)
         local pkey, pkey_end = msgpackffi.encode_tuple(key)
@@ -654,7 +666,7 @@ function box.schema.space.bless(space)
     index_mt.__pairs = index_mt.pairs -- Lua 5.2 compatibility
     index_mt.__ipairs = index_mt.pairs -- Lua 5.2 compatibility
     -- index subtree size
-    index_mt.count = function(index, key, opts)
+    index_mt.count_ffi = function(index, key, opts)
         local pkey, pkey_end = msgpackffi.encode_tuple(key)
         local itype = check_iterator_type(opts, pkey + 1 >= pkey_end);
         local count = builtin.boxffi_index_count(index.space_id, index.id,
@@ -664,6 +676,11 @@ function box.schema.space.bless(space)
         end
         return tonumber(count)
     end
+    index_mt.count_luac = function(index, key, opts)
+        key = keify(key)
+        local itype = check_iterator_type(opts, #key == 0);
+        return internal.count(index.space_id, index.id, itype, key);
+    end
 
     local function check_index(space, index_id)
         if space.index[index_id] == nil then
@@ -671,7 +688,7 @@ function box.schema.space.bless(space)
         end
     end
 
-    index_mt.get = function(index, key)
+    index_mt.get_ffi = function(index, key)
         local key, key_end = msgpackffi.encode_tuple(key)
         local tuple = builtin.boxffi_index_get(index.space_id, index.id, key)
         if tuple == ffi.cast('void *', -1) then
@@ -682,6 +699,10 @@ function box.schema.space.bless(space)
             return
         end
     end
+    index_mt.get_luac = function(index, key)
+        key = keify(key)
+        return internal.get(index.space_id, index.id, key)
+    end
 
     local function check_select_opts(opts, key_is_nil)
         local offset = 0
@@ -743,12 +764,15 @@ function box.schema.space.bless(space)
 
     -- true if reading operations may yield
     local read_yields = space.engine == 'sophia'
-    if read_yields then
-        -- use Lua/C implmenetation
-        index_mt.select = index_mt.select_luac
-    else
-        -- use FFI implementation
-        index_mt.select = index_mt.select_ffi
+    local read_ops = {'select', 'get', 'min', 'max', 'count', 'random'}
+    for _, op in ipairs(read_ops) do
+        if read_yields then
+            -- use Lua/C implmenetation
+            index_mt[op] = index_mt[op .. "_luac"]
+        else
+            -- use FFI implementation
+            index_mt[op] = index_mt[op .. "_ffi"]
+        end
     end
     --
     local space_mt = {}
diff --git a/src/box/lua/space.cc b/src/box/lua/space.cc
index e9c22b280b..6fa40dde68 100644
--- a/src/box/lua/space.cc
+++ b/src/box/lua/space.cc
@@ -54,8 +54,16 @@ lbox_space_on_replace_trigger(struct trigger *trigger, void *event)
 
 	lua_rawgeti(L, LUA_REGISTRYINDEX, (intptr_t) trigger->data);
 
-	lbox_pushtuple(L, stmt->old_tuple);
-	lbox_pushtuple(L, stmt->new_tuple);
+	if (stmt->old_tuple) {
+		lbox_pushtuple(L, stmt->old_tuple);
+	} else {
+		lua_pushnil(L);
+	}
+	if (stmt->new_tuple) {
+		lbox_pushtuple(L, stmt->new_tuple);
+	} else {
+		lua_pushnil(L);
+	}
 	/* @todo: maybe the space object has to be here */
 	lua_pushstring(L, stmt->space->def.name);
 
diff --git a/src/box/lua/tuple.cc b/src/box/lua/tuple.cc
index 5c021ab9a6..651bd68d83 100644
--- a/src/box/lua/tuple.cc
+++ b/src/box/lua/tuple.cc
@@ -301,19 +301,14 @@ lbox_tuple_transform(struct lua_State *L)
 }
 
 void
-lbox_pushtuple(struct lua_State *L, struct tuple *tuple)
+lbox_pushtuple_noref(struct lua_State *L, struct tuple *tuple)
 {
-	if (tuple) {
-		assert(CTID_CONST_STRUCT_TUPLE_REF != 0);
-		struct tuple **ptr = (struct tuple **) luaL_pushcdata(L,
-			CTID_CONST_STRUCT_TUPLE_REF, sizeof(struct tuple *));
-		*ptr = tuple;
-		lua_pushcfunction(L, lbox_tuple_gc);
-		luaL_setcdatagc(L, -2);
-		tuple_ref(tuple);
-	} else {
-		return lua_pushnil(L);
-	}
+	assert(CTID_CONST_STRUCT_TUPLE_REF != 0);
+	struct tuple **ptr = (struct tuple **) luaL_pushcdata(L,
+		CTID_CONST_STRUCT_TUPLE_REF, sizeof(struct tuple *));
+	*ptr = tuple;
+	lua_pushcfunction(L, lbox_tuple_gc);
+	luaL_setcdatagc(L, -2);
 }
 
 static const struct luaL_reg lbox_tuple_meta[] = {
diff --git a/src/box/lua/tuple.h b/src/box/lua/tuple.h
index 2e182272e1..3d40286e84 100644
--- a/src/box/lua/tuple.h
+++ b/src/box/lua/tuple.h
@@ -28,6 +28,9 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
+#include <box/tuple.h>
+
 struct lua_State;
 struct txn;
 struct tuple;
@@ -36,7 +39,15 @@ struct tuple;
  * Push tuple on lua stack
  */
 void
-lbox_pushtuple(struct lua_State *L, struct tuple *tuple);
+lbox_pushtuple_noref(struct lua_State *L, struct tuple *tuple);
+
+static inline void
+lbox_pushtuple(struct lua_State *L, struct tuple *tuple)
+{
+	assert(tuple != NULL);
+	lbox_pushtuple_noref(L, tuple);
+	tuple_ref(tuple);
+}
 
 struct tuple *lua_istuple(struct lua_State *L, int narg);
 
diff --git a/test/box/select.result b/test/box/select.result
index 5266bfcfee..e621bcfb0c 100644
--- a/test/box/select.result
+++ b/test/box/select.result
@@ -11,81 +11,104 @@ index2 = s:create_index('second', { type = 'tree', unique = true,  parts = {2, '
 ---
 ...
 for i = 1, 20 do s:insert({ i, 1, 2, 3 }) end
+---
+...
+--# setopt delimiter ';'
+local function test_op(op, idx, ...)
+    local t1 = idx[op .. '_ffi'](idx, ...)
+    local t2 = idx[op .. '_luac'](idx, ...)
+    if msgpack.encode(t1) ~= msgpack.encode(t2) then
+        return 'different result from '..op..'_ffi and '..op..'_luac', t1, t2
+    end
+    return t1
+end
+test = setmetatable({}, {
+    __index = function(_, op) return function(...) return test_op(op, ...) end end
+})
+---
+...
+--# setopt delimiter ''
+local function test_op(op, idx, ...)
+    local t1 = idx[op .. '_ffi'](idx, ...)
+    local t2 = idx[op .. '_luac'](idx, ...)
+    if msgpack.encode(t1) ~= msgpack.encode(t2) then
+        return 'different result from '..op..'_ffi and '..op..'_luac', t1, t2
+    end
+    return t1
+end
+test = setmetatable({}, {
+    __index = function(_, op) return function(...) return test_op(op, ...) end end
+})
+
 ---
 ...
 --------------------------------------------------------------------------------
 -- get tests
 --------------------------------------------------------------------------------
-s.index[0]:get()
+s.index[0].get == s.index[0].get_ffi or s.index[0].get == s.index[0].get_luac
+---
+- true
+...
+test.get(s.index[0])
 ---
 - error: Invalid key part count in an exact match (expected 1, got 0)
 ...
-s.index[0]:get({})
+test.get(s.index[0], {})
 ---
 - error: Invalid key part count in an exact match (expected 1, got 0)
 ...
-s.index[0]:get(nil)
+test.get(s.index[0], nil)
 ---
 - error: Invalid key part count in an exact match (expected 1, got 0)
 ...
-s.index[0]:get(1)
+test.get(s.index[0], 1)
 ---
 - [1, 1, 2, 3]
 ...
-s.index[0]:get({1})
+test.get(s.index[0], {1})
 ---
 - [1, 1, 2, 3]
 ...
-s.index[0]:get({1, 2})
+test.get(s.index[0], {1, 2})
 ---
 - error: Invalid key part count in an exact match (expected 1, got 2)
 ...
-s.index[0]:get(0)
+test.get(s.index[0], 0)
 ---
+- null
 ...
-s.index[0]:get({0})
+test.get(s.index[0], {0})
 ---
+- null
 ...
-s.index[0]:get("0")
+test.get(s.index[0], "0")
 ---
 - error: 'Supplied key type of part 0 does not match index part type: expected NUM'
 ...
-s.index[0]:get({"0"})
+test.get(s.index[0], {"0"})
 ---
 - error: 'Supplied key type of part 0 does not match index part type: expected NUM'
 ...
-s.index[1]:get(1)
+test.get(s.index[1], 1)
 ---
 - error: Invalid key part count in an exact match (expected 2, got 1)
 ...
-s.index[1]:get({1})
+test.get(s.index[1], {1})
 ---
 - error: Invalid key part count in an exact match (expected 2, got 1)
 ...
-s.index[1]:get({1, 2})
+test.get(s.index[1], {1, 2})
 ---
 - [2, 1, 2, 3]
 ...
 --------------------------------------------------------------------------------
 -- select tests
 --------------------------------------------------------------------------------
---# setopt delimiter ';'
-function test(idx, ...)
-    local t1 = idx:select_ffi(...)
-    local t2 = idx:select_luac(...)
-    if msgpack.encode(t1) ~= msgpack.encode(t2) then
-        return 'different result from select_ffi and select_luac', t1, t2
-    end
-    return t1
-end;
----
-...
---# setopt delimiter ''
 s.index[0].select == s.index[0].select_ffi or s.index[0].select == s.index[0].select_luac
 ---
 - true
 ...
-test(s.index[0])
+test.select(s.index[0])
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -108,7 +131,7 @@ test(s.index[0])
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], {})
+test.select(s.index[0], {})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -131,7 +154,7 @@ test(s.index[0], {})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], nil)
+test.select(s.index[0], nil)
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -154,7 +177,7 @@ test(s.index[0], nil)
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], {}, {iterator = 'ALL'})
+test.select(s.index[0], {}, {iterator = 'ALL'})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -177,7 +200,7 @@ test(s.index[0], {}, {iterator = 'ALL'})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = box.index.ALL })
+test.select(s.index[0], nil, {iterator = box.index.ALL })
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -200,7 +223,7 @@ test(s.index[0], nil, {iterator = box.index.ALL })
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], {}, {iterator = box.index.ALL, limit = 10})
+test.select(s.index[0], {}, {iterator = box.index.ALL, limit = 10})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -213,15 +236,15 @@ test(s.index[0], {}, {iterator = box.index.ALL, limit = 10})
   - [9, 1, 2, 3]
   - [10, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = box.index.ALL, limit = 0})
+test.select(s.index[0], nil, {iterator = box.index.ALL, limit = 0})
 ---
 - []
 ...
-test(s.index[0], {}, {iterator = 'ALL', limit = 1, offset = 15})
+test.select(s.index[0], {}, {iterator = 'ALL', limit = 1, offset = 15})
 ---
 - - [16, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = 'ALL', limit = 20, offset = 15})
+test.select(s.index[0], nil, {iterator = 'ALL', limit = 20, offset = 15})
 ---
 - - [16, 1, 2, 3]
   - [17, 1, 2, 3]
@@ -229,7 +252,7 @@ test(s.index[0], nil, {iterator = 'ALL', limit = 20, offset = 15})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = box.index.EQ})
+test.select(s.index[0], nil, {iterator = box.index.EQ})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -252,7 +275,7 @@ test(s.index[0], nil, {iterator = box.index.EQ})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], {}, {iterator = 'EQ'})
+test.select(s.index[0], {}, {iterator = 'EQ'})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -275,7 +298,7 @@ test(s.index[0], {}, {iterator = 'EQ'})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = 'REQ'})
+test.select(s.index[0], nil, {iterator = 'REQ'})
 ---
 - - [20, 1, 2, 3]
   - [19, 1, 2, 3]
@@ -298,7 +321,7 @@ test(s.index[0], nil, {iterator = 'REQ'})
   - [2, 1, 2, 3]
   - [1, 1, 2, 3]
 ...
-test(s.index[0], {}, {iterator = box.index.REQ})
+test.select(s.index[0], {}, {iterator = box.index.REQ})
 ---
 - - [20, 1, 2, 3]
   - [19, 1, 2, 3]
@@ -321,45 +344,45 @@ test(s.index[0], {}, {iterator = box.index.REQ})
   - [2, 1, 2, 3]
   - [1, 1, 2, 3]
 ...
-test(s.index[0], nil, {iterator = 'EQ', limit = 2, offset = 1})
+test.select(s.index[0], nil, {iterator = 'EQ', limit = 2, offset = 1})
 ---
 - - [2, 1, 2, 3]
   - [3, 1, 2, 3]
 ...
-test(s.index[0], {}, {iterator = box.index.REQ, limit = 2, offset = 1})
+test.select(s.index[0], {}, {iterator = box.index.REQ, limit = 2, offset = 1})
 ---
 - - [19, 1, 2, 3]
   - [18, 1, 2, 3]
 ...
-test(s.index[0], 1)
+test.select(s.index[0], 1)
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], {1})
+test.select(s.index[0], {1})
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], {1, 2})
+test.select(s.index[0], {1, 2})
 ---
 - error: Invalid key part count (expected [0..1], got 2)
 ...
-test(s.index[0], 0)
+test.select(s.index[0], 0)
 ---
 - []
 ...
-test(s.index[0], {0})
+test.select(s.index[0], {0})
 ---
 - []
 ...
-test(s.index[0], "0")
+test.select(s.index[0], "0")
 ---
 - error: 'Supplied key type of part 0 does not match index part type: expected NUM'
 ...
-test(s.index[0], {"0"})
+test.select(s.index[0], {"0"})
 ---
 - error: 'Supplied key type of part 0 does not match index part type: expected NUM'
 ...
-test(s.index[1], 1)
+test.select(s.index[1], 1)
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -382,7 +405,7 @@ test(s.index[1], 1)
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[1], {1})
+test.select(s.index[1], {1})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -405,12 +428,12 @@ test(s.index[1], {1})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[1], {1}, {limit = 2})
+test.select(s.index[1], {1}, {limit = 2})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
 ...
-test(s.index[1], 1, {iterator = 'EQ'})
+test.select(s.index[1], 1, {iterator = 'EQ'})
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -433,29 +456,29 @@ test(s.index[1], 1, {iterator = 'EQ'})
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[1], {1}, {iterator = box.index.EQ, offset = 16, limit = 2})
+test.select(s.index[1], {1}, {iterator = box.index.EQ, offset = 16, limit = 2})
 ---
 - - [17, 1, 2, 3]
   - [18, 1, 2, 3]
 ...
-test(s.index[1], {1}, {iterator = box.index.REQ, offset = 16, limit = 2 })
+test.select(s.index[1], {1}, {iterator = box.index.REQ, offset = 16, limit = 2 })
 ---
 - - [4, 1, 2, 3]
   - [3, 1, 2, 3]
 ...
-test(s.index[1], {1, 2}, {iterator = 'EQ'})
+test.select(s.index[1], {1, 2}, {iterator = 'EQ'})
 ---
 - - [2, 1, 2, 3]
 ...
-test(s.index[1], {1, 2}, {iterator = box.index.REQ})
+test.select(s.index[1], {1, 2}, {iterator = box.index.REQ})
 ---
 - - [2, 1, 2, 3]
 ...
-test(s.index[1], {1, 2})
+test.select(s.index[1], {1, 2})
 ---
 - - [2, 1, 2, 3]
 ...
-test(s.index[0], nil, { iterator = 'ALL', offset = 0, limit = 4294967295 })
+test.select(s.index[0], nil, { iterator = 'ALL', offset = 0, limit = 4294967295 })
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -478,7 +501,7 @@ test(s.index[0], nil, { iterator = 'ALL', offset = 0, limit = 4294967295 })
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], {}, { iterator = 'ALL', offset = 0, limit = 4294967295 })
+test.select(s.index[0], {}, { iterator = 'ALL', offset = 0, limit = 4294967295 })
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -501,19 +524,19 @@ test(s.index[0], {}, { iterator = 'ALL', offset = 0, limit = 4294967295 })
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], 1)
+test.select(s.index[0], 1)
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = box.index.EQ })
+test.select(s.index[0], 1, { iterator = box.index.EQ })
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = 'EQ' })
+test.select(s.index[0], 1, { iterator = 'EQ' })
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = 'GE' })
+test.select(s.index[0], 1, { iterator = 'GE' })
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
@@ -536,16 +559,16 @@ test(s.index[0], 1, { iterator = 'GE' })
   - [19, 1, 2, 3]
   - [20, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = 'GE', limit = 2 })
+test.select(s.index[0], 1, { iterator = 'GE', limit = 2 })
 ---
 - - [1, 1, 2, 3]
   - [2, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = 'LE', limit = 2 })
+test.select(s.index[0], 1, { iterator = 'LE', limit = 2 })
 ---
 - - [1, 1, 2, 3]
 ...
-test(s.index[0], 1, { iterator = 'GE', offset = 10, limit = 2 })
+test.select(s.index[0], 1, { iterator = 'GE', offset = 10, limit = 2 })
 ---
 - - [11, 1, 2, 3]
   - [12, 1, 2, 3]
@@ -554,6 +577,43 @@ s:select(2)
 ---
 - - [2, 1, 2, 3]
 ...
+--------------------------------------------------------------------------------
+-- min/max tests
+--------------------------------------------------------------------------------
+test.min(s.index[1])
+---
+- [1, 1, 2, 3]
+...
+test.max(s.index[1])
+---
+- [20, 1, 2, 3]
+...
+--------------------------------------------------------------------------------
+-- count tests
+--------------------------------------------------------------------------------
+test.count(s.index[1])
+---
+- 20
+...
+test.count(s.index[0], nil)
+---
+- 20
+...
+test.count(s.index[0], {})
+---
+- 20
+...
+test.count(s.index[0], 10, { iterator = 'GT'})
+---
+- 10
+...
+--------------------------------------------------------------------------------
+-- random tests
+--------------------------------------------------------------------------------
+test.random(s.index[0], 48)
+---
+- [9, 1, 2, 3]
+...
 s:drop()
 ---
 ...
diff --git a/test/box/select.test.lua b/test/box/select.test.lua
index cfde112f12..d36d866f59 100644
--- a/test/box/select.test.lua
+++ b/test/box/select.test.lua
@@ -5,92 +5,120 @@ index1 = s:create_index('primary', { type = 'tree' })
 index2 = s:create_index('second', { type = 'tree', unique = true,  parts = {2, 'num', 1, 'num'}})
 for i = 1, 20 do s:insert({ i, 1, 2, 3 }) end
 
+--# setopt delimiter ';'
+local function test_op(op, idx, ...)
+    local t1 = idx[op .. '_ffi'](idx, ...)
+    local t2 = idx[op .. '_luac'](idx, ...)
+    if msgpack.encode(t1) ~= msgpack.encode(t2) then
+        return 'different result from '..op..'_ffi and '..op..'_luac', t1, t2
+    end
+    return t1
+end
+test = setmetatable({}, {
+    __index = function(_, op) return function(...) return test_op(op, ...) end end
+})
+--# setopt delimiter ''
+
+
 --------------------------------------------------------------------------------
 -- get tests
 --------------------------------------------------------------------------------
 
-s.index[0]:get()
-s.index[0]:get({})
-s.index[0]:get(nil)
-s.index[0]:get(1)
-s.index[0]:get({1})
-s.index[0]:get({1, 2})
-s.index[0]:get(0)
-s.index[0]:get({0})
-s.index[0]:get("0")
-s.index[0]:get({"0"})
-
-s.index[1]:get(1)
-s.index[1]:get({1})
-s.index[1]:get({1, 2})
+s.index[0].get == s.index[0].get_ffi or s.index[0].get == s.index[0].get_luac
+
+test.get(s.index[0])
+test.get(s.index[0], {})
+test.get(s.index[0], nil)
+test.get(s.index[0], 1)
+test.get(s.index[0], {1})
+test.get(s.index[0], {1, 2})
+test.get(s.index[0], 0)
+test.get(s.index[0], {0})
+test.get(s.index[0], "0")
+test.get(s.index[0], {"0"})
+
+test.get(s.index[1], 1)
+test.get(s.index[1], {1})
+test.get(s.index[1], {1, 2})
 
 --------------------------------------------------------------------------------
 -- select tests
 --------------------------------------------------------------------------------
 
---# setopt delimiter ';'
-function test(idx, ...)
-    local t1 = idx:select_ffi(...)
-    local t2 = idx:select_luac(...)
-    if msgpack.encode(t1) ~= msgpack.encode(t2) then
-        return 'different result from select_ffi and select_luac', t1, t2
-    end
-    return t1
-end;
---# setopt delimiter ''
-
 s.index[0].select == s.index[0].select_ffi or s.index[0].select == s.index[0].select_luac
 
-test(s.index[0])
-test(s.index[0], {})
-test(s.index[0], nil)
-test(s.index[0], {}, {iterator = 'ALL'})
-
-test(s.index[0], nil, {iterator = box.index.ALL })
-test(s.index[0], {}, {iterator = box.index.ALL, limit = 10})
-test(s.index[0], nil, {iterator = box.index.ALL, limit = 0})
-test(s.index[0], {}, {iterator = 'ALL', limit = 1, offset = 15})
-test(s.index[0], nil, {iterator = 'ALL', limit = 20, offset = 15})
-
-test(s.index[0], nil, {iterator = box.index.EQ})
-test(s.index[0], {}, {iterator = 'EQ'})
-test(s.index[0], nil, {iterator = 'REQ'})
-test(s.index[0], {}, {iterator = box.index.REQ})
-
-test(s.index[0], nil, {iterator = 'EQ', limit = 2, offset = 1})
-test(s.index[0], {}, {iterator = box.index.REQ, limit = 2, offset = 1})
-
-test(s.index[0], 1)
-test(s.index[0], {1})
-test(s.index[0], {1, 2})
-test(s.index[0], 0)
-test(s.index[0], {0})
-test(s.index[0], "0")
-test(s.index[0], {"0"})
-
-test(s.index[1], 1)
-test(s.index[1], {1})
-test(s.index[1], {1}, {limit = 2})
-test(s.index[1], 1, {iterator = 'EQ'})
-test(s.index[1], {1}, {iterator = box.index.EQ, offset = 16, limit = 2})
-test(s.index[1], {1}, {iterator = box.index.REQ, offset = 16, limit = 2 })
-test(s.index[1], {1, 2}, {iterator = 'EQ'})
-test(s.index[1], {1, 2}, {iterator = box.index.REQ})
-test(s.index[1], {1, 2})
-
-test(s.index[0], nil, { iterator = 'ALL', offset = 0, limit = 4294967295 })
-test(s.index[0], {}, { iterator = 'ALL', offset = 0, limit = 4294967295 })
-
-test(s.index[0], 1)
-test(s.index[0], 1, { iterator = box.index.EQ })
-test(s.index[0], 1, { iterator = 'EQ' })
-test(s.index[0], 1, { iterator = 'GE' })
-test(s.index[0], 1, { iterator = 'GE', limit = 2 })
-test(s.index[0], 1, { iterator = 'LE', limit = 2 })
-test(s.index[0], 1, { iterator = 'GE', offset = 10, limit = 2 })
+test.select(s.index[0])
+test.select(s.index[0], {})
+test.select(s.index[0], nil)
+test.select(s.index[0], {}, {iterator = 'ALL'})
+
+test.select(s.index[0], nil, {iterator = box.index.ALL })
+test.select(s.index[0], {}, {iterator = box.index.ALL, limit = 10})
+test.select(s.index[0], nil, {iterator = box.index.ALL, limit = 0})
+test.select(s.index[0], {}, {iterator = 'ALL', limit = 1, offset = 15})
+test.select(s.index[0], nil, {iterator = 'ALL', limit = 20, offset = 15})
+
+test.select(s.index[0], nil, {iterator = box.index.EQ})
+test.select(s.index[0], {}, {iterator = 'EQ'})
+test.select(s.index[0], nil, {iterator = 'REQ'})
+test.select(s.index[0], {}, {iterator = box.index.REQ})
+
+test.select(s.index[0], nil, {iterator = 'EQ', limit = 2, offset = 1})
+test.select(s.index[0], {}, {iterator = box.index.REQ, limit = 2, offset = 1})
+
+test.select(s.index[0], 1)
+test.select(s.index[0], {1})
+test.select(s.index[0], {1, 2})
+test.select(s.index[0], 0)
+test.select(s.index[0], {0})
+test.select(s.index[0], "0")
+test.select(s.index[0], {"0"})
+
+test.select(s.index[1], 1)
+test.select(s.index[1], {1})
+test.select(s.index[1], {1}, {limit = 2})
+test.select(s.index[1], 1, {iterator = 'EQ'})
+test.select(s.index[1], {1}, {iterator = box.index.EQ, offset = 16, limit = 2})
+test.select(s.index[1], {1}, {iterator = box.index.REQ, offset = 16, limit = 2 })
+test.select(s.index[1], {1, 2}, {iterator = 'EQ'})
+test.select(s.index[1], {1, 2}, {iterator = box.index.REQ})
+test.select(s.index[1], {1, 2})
+
+test.select(s.index[0], nil, { iterator = 'ALL', offset = 0, limit = 4294967295 })
+test.select(s.index[0], {}, { iterator = 'ALL', offset = 0, limit = 4294967295 })
+
+test.select(s.index[0], 1)
+test.select(s.index[0], 1, { iterator = box.index.EQ })
+test.select(s.index[0], 1, { iterator = 'EQ' })
+test.select(s.index[0], 1, { iterator = 'GE' })
+test.select(s.index[0], 1, { iterator = 'GE', limit = 2 })
+test.select(s.index[0], 1, { iterator = 'LE', limit = 2 })
+test.select(s.index[0], 1, { iterator = 'GE', offset = 10, limit = 2 })
 
 s:select(2)
 
+--------------------------------------------------------------------------------
+-- min/max tests
+--------------------------------------------------------------------------------
+
+test.min(s.index[1])
+test.max(s.index[1])
+
+--------------------------------------------------------------------------------
+-- count tests
+--------------------------------------------------------------------------------
+
+test.count(s.index[1])
+test.count(s.index[0], nil)
+test.count(s.index[0], {})
+test.count(s.index[0], 10, { iterator = 'GT'})
+
+--------------------------------------------------------------------------------
+-- random tests
+--------------------------------------------------------------------------------
+
+test.random(s.index[0], 48)
+
 s:drop()
 
 s = box.schema.space.create('select', { temporary = true })
-- 
GitLab