From 96df090fe71a7152ddee105eca454efc0c096c28 Mon Sep 17 00:00:00 2001
From: Aleksandr Lyapunov <alyapunov@tarantool.org>
Date: Fri, 17 May 2024 17:24:14 +0300
Subject: [PATCH] box: introduce next and previous prefix iterators

Implement 'np' (next prefix) and 'pp' (previous prefix) iterators.
They work only in memtx tree and in a nutshell searches for
strings with greater ('np') or less ('pp') prefix of size as in
given key, comparing with given key.

Closes #9994

@TarantoolBot document
Title: 'np' and 'pp' (next/previous prefix) iterators

Now there are two more iterators available: 'np' (next prefix)
and 'pp' (previous prefix). They work only in memtx tree. Also,
if the last part of key is not a string, they degrade to 'gt'
and 'lt' iterators.

These iterators introduce special comparison of the last part of
key (if it is a string). In terms of lua, if s is the search part,
and t is the corresponding tuple part, 'np' iterator searches for
the first tuple with string.sub(t, 1, #s) > s, while 'pp' searches
for the last tuple with string.sub(t, 1, #s) < s.

Comparison of all other parts of the key remains normal.

As usual, these iterators are available both in select and pairs,
in index and space methods.

Similar to all other tree iterators, they change only initial
search of selection. Once the first tuple found, the rest are
selected sequentially in direct (for 'np') or reverse (for 'pp')
order of the index.

For example:
```
tarantool> s:select{}
---
- - ['a']
  - ['aa']
  - ['ab']
  - ['b']
  - ['ba']
  - ['bb']
  - ['c']
  - ['ca']
  - ['cb']
...

tarantool> s:select({'b'}, {iterator = 'np'})
---
- - ['c']
  - ['ca']
  - ['cb']
...

tarantool> s:select({'b'}, {iterator = 'pp'})
---
- - ['ab']
  - ['aa']
  - ['a']
...
```
---
 .../gh-9994-next-prefix-iterator.md           |   4 +
 src/box/iterator_type.c                       |   2 +
 src/box/iterator_type.h                       |  28 +-
 src/box/memtx_tree.cc                         |  87 +++-
 .../gh_9994_next_prefix_iterator_test.lua     | 405 ++++++++++++++++++
 5 files changed, 514 insertions(+), 12 deletions(-)
 create mode 100644 changelogs/unreleased/gh-9994-next-prefix-iterator.md
 create mode 100644 test/box-luatest/gh_9994_next_prefix_iterator_test.lua

diff --git a/changelogs/unreleased/gh-9994-next-prefix-iterator.md b/changelogs/unreleased/gh-9994-next-prefix-iterator.md
new file mode 100644
index 0000000000..e341bd13ee
--- /dev/null
+++ b/changelogs/unreleased/gh-9994-next-prefix-iterator.md
@@ -0,0 +1,4 @@
+## feature/core
+
+* Implemented new iterators for the memtx tree index: 'np' (next prefix)
+  and 'pp' (previous prefix) (gh-9994).
diff --git a/src/box/iterator_type.c b/src/box/iterator_type.c
index 5d6b55f21c..0979a16916 100644
--- a/src/box/iterator_type.c
+++ b/src/box/iterator_type.c
@@ -44,6 +44,8 @@ const char *iterator_type_strs[] = {
 	/* [ITER_BITS_ALL_NOT_SET] = */ "BITS_ALL_NOT_SET",
 	/* [ITER_OVERLAPS] = */ "OVERLAPS",
 	/* [ITER_NEIGHBOR] = */ "NEIGHBOR",
+	/* [ITER_NP] = */ "NP",
+	/* [ITER_PP] = */ "PP",
 };
 
 static_assert(sizeof(iterator_type_strs) / sizeof(const char *) ==
diff --git a/src/box/iterator_type.h b/src/box/iterator_type.h
index c57e61407d..7aa9f5c15f 100644
--- a/src/box/iterator_type.h
+++ b/src/box/iterator_type.h
@@ -72,7 +72,9 @@ enum iterator_type {
 	ITER_BITS_ANY_SET     =  8, /* at least one x's bit is set         */
 	ITER_BITS_ALL_NOT_SET =  9, /* all bits are not set                */
 	ITER_OVERLAPS         = 10, /* key overlaps x                      */
-	ITER_NEIGHBOR         = 11, /* tuples in distance ascending order from specified point */
+	ITER_NEIGHBOR         = 11, /* tuples as they move away from x point */
+	ITER_NP               = 12, /* next prefix, ASC order              */
+	ITER_PP               = 13, /* previous prefix, DESC order         */
 	iterator_type_MAX
 };
 
@@ -81,21 +83,25 @@ enum iterator_type {
 extern const char *iterator_type_strs[];
 
 /**
- * Determine a direction of the given iterator type.
- * That is -1 for REQ, LT and LE and +1 for all others.
+ * Determine whether direction of given iterator type is reverse,
+ * That is true for REQ, LT and LE etc and false for all others.
  */
-static inline int
-iterator_direction(enum iterator_type type)
+static inline bool
+iterator_type_is_reverse(enum iterator_type type)
 {
-	const unsigned reverse =
-		(1u << ITER_REQ) | (1u << ITER_LT) | (1u << ITER_LE);
-	return (reverse & (1u << type)) ? -1 : 1;
+	const unsigned reverse = (1u << ITER_REQ) | (1u << ITER_LT) |
+				 (1u << ITER_LE) | (1u << ITER_PP);
+	return reverse & (1u << type);
 }
 
-static inline bool
-iterator_type_is_reverse(enum iterator_type type)
+/**
+ * Determine a direction of given iterator type.
+ * That is -1 for REQ, LT and LE etc and +1 for all others.
+ */
+static inline int
+iterator_direction(enum iterator_type type)
 {
-	return type == ITER_REQ || type == ITER_LT || type == ITER_LE;
+	return iterator_type_is_reverse(type) ? -1 : 1;
 }
 
 #if defined(__cplusplus)
diff --git a/src/box/memtx_tree.cc b/src/box/memtx_tree.cc
index 1e6716d759..94962439be 100644
--- a/src/box/memtx_tree.cc
+++ b/src/box/memtx_tree.cc
@@ -646,10 +646,12 @@ tree_iterator_set_next_method(struct tree_iterator<USE_HINT> *it)
 		break;
 	case ITER_LT:
 	case ITER_LE:
+	case ITER_PP:
 		it->base.next_internal = tree_iterator_prev<USE_HINT>;
 		break;
 	case ITER_GE:
 	case ITER_GT:
+	case ITER_NP:
 		it->base.next_internal = tree_iterator_next<USE_HINT>;
 		break;
 	default:
@@ -659,10 +661,77 @@ tree_iterator_set_next_method(struct tree_iterator<USE_HINT> *it)
 	it->base.next = memtx_iterator_next;
 }
 
+/**
+ * Having iterator @a type as ITER_NP or ITER_PP, transform initial search key
+ * @a start_data and the @a type so that normal initial search in iterator would
+ * find exactly what needed for next prefix or previous prefix iterator.
+ * The resulting type is one of ITER_GT/ITER_LT/ITER_GE/ITER_LE.
+ * In the most common case a new search key is allocated on @a region, so
+ * region cleanup is needed after the key is no more needed.
+ * @retval true if @a start_data and @a type are ready for search.
+ * @retval false if the iteration must be stopped without an error.
+ */
+template <bool USE_HINT>
+static bool
+prepare_start_prefix_iterator(struct memtx_tree_key_data<USE_HINT> *start_data,
+			      enum iterator_type *type, struct key_def *cmp_def,
+			      struct region *region)
+{
+	assert(*type == ITER_NP || *type == ITER_PP);
+	assert(start_data->part_count > 0);
+	*type = (*type == ITER_NP) ? ITER_GT : ITER_LT;
+
+	/* PP with ASC and NP with DESC works exactly as LT and GT. */
+	bool part_order = cmp_def->parts[start_data->part_count - 1].sort_order;
+	if ((*type == ITER_LT) == (part_order == SORT_ORDER_ASC))
+		return true;
+
+	/* Find the last part of given key. */
+	const char *c = start_data->key;
+	for (uint32_t i = 1; i < start_data->part_count; i++)
+		mp_next(&c);
+	/* If the last part is not a string the iterator degrades to GT/LT. */
+	if (mp_typeof(*c) != MP_STR)
+		return true;
+
+	uint32_t str_size = mp_decode_strl(&c);
+	/* Any string logically starts with empty string; iteration is over. */
+	if (str_size == 0)
+		return false;
+	size_t prefix_size = c - start_data->key;
+	size_t total_size = prefix_size + str_size;
+
+	unsigned char *p = (unsigned char *)xregion_alloc(region, total_size);
+	memcpy(p, start_data->key, total_size);
+
+	/* Increase the key to the least greater value. */
+	unsigned char *str = p + prefix_size;
+	for (uint32_t i = str_size - 1; ; i--) {
+		if (str[i] != UCHAR_MAX) {
+			str[i]++;
+			break;
+		} else if (i == 0) {
+			/* If prefix consists of CHAR_MAX, there's no next. */
+			return false;
+		}
+		str[i] = 0;
+	}
+
+	/* With increased key we can continue the GE/LE search. */
+	*type = (*type == ITER_GT) ? ITER_GE : ITER_LE;
+	start_data->key = (char *)p;
+	if (USE_HINT)
+		start_data->set_hint(key_hint(start_data->key,
+					      start_data->part_count, cmp_def));
+	return true;
+}
+
 template <bool USE_HINT>
 static int
 tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 {
+	struct region *region = &fiber()->gc;
+	RegionGuard region_guard(region);
 	*ret = NULL;
 	struct space *space;
 	struct index *index_base;
@@ -677,6 +746,12 @@ tree_iterator_start(struct iterator *iterator, struct tuple **ret)
 	struct memtx_tree_key_data<USE_HINT> start_data =
 		it->after_data.key != NULL ? it->after_data : it->key_data;
 	enum iterator_type type = it->type;
+	if ((type == ITER_NP || type == ITER_PP) &&
+	    it->after_data.key == NULL) {
+		if (!prepare_start_prefix_iterator(&start_data, &type,
+						   cmp_def, region))
+			return 0;
+	}
 	/*
 	 * Since iteration with equality iterators returns first found tuple,
 	 * we need a special flag for EQ and REQ if we want to start iteration
@@ -1577,11 +1652,21 @@ memtx_tree_index_create_iterator(struct index *base, enum iterator_type type,
 	struct key_def *cmp_def = memtx_tree_cmp_def(&index->tree);
 
 	assert(part_count == 0 || key != NULL);
-	if (type > ITER_GT) {
+	assert(type >= 0 && type < iterator_type_MAX);
+	static_assert(iterator_type_MAX < 32, "Too big for bit logic");
+	const uint32_t supported_mask = ((1u << (ITER_GT + 1)) - 1) |
+		(1u << ITER_NP) | (1u << ITER_PP);
+	if (((1u << type) & supported_mask) == 0) {
 		diag_set(UnsupportedIndexFeature, base->def,
 			 "requested iterator type");
 		return NULL;
 	}
+	if ((type == ITER_NP || type == ITER_PP) && part_count > 0 &&
+	    cmp_def->parts[part_count - 1].coll != NULL) {
+		diag_set(UnsupportedIndexFeature, base->def,
+			 "requested iterator type along with collation");
+		return NULL;
+	}
 	if (part_count == 0) {
 		/*
 		 * If no key is specified, downgrade equality
diff --git a/test/box-luatest/gh_9994_next_prefix_iterator_test.lua b/test/box-luatest/gh_9994_next_prefix_iterator_test.lua
new file mode 100644
index 0000000000..8dd1499695
--- /dev/null
+++ b/test/box-luatest/gh_9994_next_prefix_iterator_test.lua
@@ -0,0 +1,405 @@
+local server = require('luatest.server')
+local t = require('luatest')
+
+local g = t.group()
+
+g.before_all(function(cg)
+    cg.server = server:new()
+    cg.server:start()
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+g.after_each(function(cg)
+    cg.server:exec(function()
+        if box.space.test then
+            box.space.test:drop()
+        end
+        if box.func.func then
+            box.func.func:drop()
+        end
+    end)
+end)
+
+-- Check some cases when NP and PP iterators are unsupported..
+g.test_next_prefix_unsupported = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {type = 'hash', parts = {{1, 'string'}}})
+        local m = "Index 'pk' (HASH) of space 'test' (memtx) " ..
+                  "does not support requested iterator type"
+        t.assert_error_msg_content_equals(m, s.select, s, '', {iterator = 'np'})
+        t.assert_error_msg_content_equals(m, s.select, s, '', {iterator = 'pp'})
+
+        s:create_index('sk', {parts = {{1, 'str', collation = 'unicode_ci'}}})
+        local i = s.index.sk
+        m = "Index 'sk' (TREE) of space 'test' (memtx) " ..
+            "does not support requested iterator type along with collation"
+        t.assert_error_msg_content_equals(m, i.select, i, '', {iterator = 'np'})
+        t.assert_error_msg_content_equals(m, i.select, i, '', {iterator = 'pp'})
+
+        s:drop()
+        s = box.schema.space.create('test', {engine = 'vinyl'})
+        s:create_index('pk', {parts = {{1, 'string'}}})
+        m = "Index 'pk' (TREE) of space 'test' (vinyl) " ..
+            "does not support requested iterator type"
+        t.assert_error_msg_content_equals(m, s.select, s, '', {iterator = 'np'})
+        t.assert_error_msg_content_equals(m, s.select, s, '', {iterator = 'pp'})
+        s:drop()
+    end)
+end
+
+-- Simple test of next prefix and previous prefix iterators.
+g.test_next_prefix_simple = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'string'}}})
+        s:replace{'a'}
+        s:replace{'aa'}
+        s:replace{'ab'}
+        s:replace{'b'}
+        s:replace{'ba'}
+        s:replace{'bb'}
+        s:replace{'c'}
+        s:replace{'ca'}
+        s:replace{'cb'}
+        local all = s:select({}, {fullscan = true})
+        local rall = s:select({}, {fullscan = true, iterator = 'le'})
+
+        t.assert_equals(s:select({}, {iterator = 'np', fullscan = true}), all)
+        t.assert_equals(s:select('', {iterator = 'np'}), {})
+        t.assert_equals(s:select('a', {iterator = 'np'}),
+                        {{'b'}, {'ba'}, {'bb'}, {'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(s:select('b', {iterator = 'np'}),
+                        {{'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(s:select('c', {iterator = 'np'}), {})
+        t.assert_equals(s:select('b', {iterator = 'np', limit = 1}), {{'c'}})
+
+        t.assert_equals(s:select({}, {iterator = 'pp', fullscan = true}), rall)
+        t.assert_equals(s:select('', {iterator = 'pp'}), {})
+        t.assert_equals(s:select('a', {iterator = 'pp'}), {})
+        t.assert_equals(s:select('b', {iterator = 'pp'}),
+                        {{'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(s:select('c', {iterator = 'pp'}),
+                        {{'bb'}, {'ba'}, {'b'}, {'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(s:select('b', {iterator = 'pp', limit = 1}), {{'ab'}})
+
+        local function get_pairs(key, opts)
+            local res = {}
+            for _, t in s:pairs(key, opts) do
+                table.insert(res, t)
+            end
+            return res
+        end
+
+        t.assert_equals(get_pairs({}, {iterator = 'np', fullscan = true}), all)
+        t.assert_equals(get_pairs('', {iterator = 'np'}), {})
+        t.assert_equals(get_pairs('a', {iterator = 'np'}),
+                        {{'b'}, {'ba'}, {'bb'}, {'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(get_pairs('b', {iterator = 'np'}),
+                        {{'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(get_pairs('c', {iterator = 'np'}), {})
+
+        t.assert_equals(get_pairs({}, {iterator = 'pp', fullscan = true}), rall)
+        t.assert_equals(get_pairs('', {iterator = 'pp'}), {})
+        t.assert_equals(get_pairs('a', {iterator = 'pp'}), {})
+        t.assert_equals(get_pairs('b', {iterator = 'pp'}),
+                        {{'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(get_pairs('c', {iterator = 'pp'}),
+                        {{'bb'}, {'ba'}, {'b'}, {'ab'}, {'aa'}, {'a'}})
+    end)
+end
+
+-- Simple test of next prefix and previous prefix iterators with desc order.
+g.test_next_prefix_simple_reverse = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'string', sort_order = 'desc'}}})
+        s:replace{'a'}
+        s:replace{'aa'}
+        s:replace{'ab'}
+        s:replace{'b'}
+        s:replace{'ba'}
+        s:replace{'bb'}
+        s:replace{'c'}
+        s:replace{'ca'}
+        s:replace{'cb'}
+        local all = s:select({}, {fullscan = true})
+        local rall = s:select({}, {fullscan = true, iterator = 'le'})
+
+        t.assert_equals(s:select({}, {iterator = 'np', fullscan = true}), all)
+        t.assert_equals(s:select('', {iterator = 'np'}), {})
+        t.assert_equals(s:select('a', {iterator = 'np'}), {})
+        t.assert_equals(s:select('b', {iterator = 'np'}),
+                        {{'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(s:select('c', {iterator = 'np'}),
+                        {{'bb'}, {'ba'}, {'b'}, {'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(s:select('b', {iterator = 'np', limit = 1}), {{'ab'}})
+
+        t.assert_equals(s:select({}, {iterator = 'pp', fullscan = true}), rall)
+        t.assert_equals(s:select('', {iterator = 'pp'}), {})
+        t.assert_equals(s:select('a', {iterator = 'pp'}),
+                        {{'b'}, {'ba'}, {'bb'}, {'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(s:select('b', {iterator = 'pp'}),
+                        {{'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(s:select('c', {iterator = 'pp'}), {})
+        t.assert_equals(s:select('b', {iterator = 'pp', limit = 1}), {{'c'}})
+
+        local function get_pairs(key, opts)
+            local res = {}
+            for _, t in s:pairs(key, opts) do
+                table.insert(res, t)
+            end
+            return res
+        end
+
+        t.assert_equals(get_pairs({}, {iterator = 'np', fullscan = true}), all)
+        t.assert_equals(get_pairs('', {iterator = 'np'}), {})
+        t.assert_equals(get_pairs('a', {iterator = 'np'}), {})
+        t.assert_equals(get_pairs('b', {iterator = 'np'}),
+                        {{'ab'}, {'aa'}, {'a'}})
+        t.assert_equals(get_pairs('c', {iterator = 'np'}),
+                        {{'bb'}, {'ba'}, {'b'}, {'ab'}, {'aa'}, {'a'}})
+
+        t.assert_equals(get_pairs({}, {iterator = 'pp', fullscan = true}), rall)
+        t.assert_equals(get_pairs('', {iterator = 'pp'}), {})
+        t.assert_equals(get_pairs('a', {iterator = 'pp'}),
+                        {{'b'}, {'ba'}, {'bb'}, {'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(get_pairs('b', {iterator = 'pp'}),
+                        {{'c'}, {'ca'}, {'cb'}})
+        t.assert_equals(get_pairs('c', {iterator = 'pp'}), {})
+    end)
+end
+
+-- Next prefix in scalar index.
+g.test_next_prefix_scalar = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'scalar'}}})
+        s:replace{1}
+        s:replace{2}
+        s:replace{3}
+        s:replace{'a'}
+        s:replace{'ab'}
+        s:replace{'b'}
+        s:replace{'ba'}
+
+        t.assert_equals(s:select('a', {iterator = 'np'}), {{'b'}, {'ba'}})
+        t.assert_equals(s:select('b', {iterator = 'pp'}),
+                        {{'ab'}, {'a'}, {3}, {2}, {1}})
+        t.assert_equals(s:select(2, {iterator = 'np'}),
+                        {{3}, {'a'}, {'ab'}, {'b'}, {'ba'}})
+        t.assert_equals(s:select(2, {iterator = 'pp'}), {{1}})
+    end)
+end
+
+-- Next prefix in functional index.
+g.test_next_prefix_func = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk')
+        local body = [[
+            function(tuple)
+                local res = {}
+                for _, s in pairs(string.split(tuple[2], ' ')) do
+                    table.insert(res, {s})
+                end
+                return res
+            end
+        ]]
+        box.schema.func.create('func',
+                               {body = body,
+                                is_deterministic = true,
+                                is_sandboxed = true,
+                                is_multikey = true})
+        local i = s:create_index('sk', {parts = {{1, 'string'}}, func = 'func'})
+        s:replace{1, 'a aa ab'}
+        s:replace{2, 'b ba bb'}
+        s:replace{3, 'c ca cb'}
+        t.assert_equals(i:select({}, {iterator = 'np', fullscan = true}),
+                        i:select({}, {iterator = 'ge', fullscan = true}))
+        t.assert_equals(i:select('', {iterator = 'np'}), {})
+        t.assert_equals(i:select('b', {iterator = 'np'}),
+                        {{3, 'c ca cb'}, {3, 'c ca cb'}, {3, 'c ca cb'}})
+        t.assert_equals(i:select('c', {iterator = 'np'}), {})
+
+        t.assert_equals(i:select({}, {iterator = 'pp', fullscan = true}),
+                        i:select({}, {iterator = 'le', fullscan = true}))
+        t.assert_equals(i:select('', {iterator = 'pp'}), {})
+        t.assert_equals(i:select('b', {iterator = 'pp'}),
+                        {{1, 'a aa ab'}, {1, 'a aa ab'}, {1, 'a aa ab'}})
+        t.assert_equals(i:select('a', {iterator = 'pp'}), {})
+    end)
+end
+
+-- Next prefix in json index.
+g.test_next_prefix_json = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'string', path = 'data'}}})
+        s:replace{{data = 'a'}}
+        s:replace{{data = 'ab'}}
+        s:replace{{data = 'b'}}
+        s:replace{{data = 'ba'}}
+
+        t.assert_equals(s:select('a', {iterator = 'np'}),
+                        {{{data = 'b'}}, {{data = 'ba'}}})
+        t.assert_equals(s:select('a', {iterator = 'pp'}), {})
+        t.assert_equals(s:select('b', {iterator = 'np'}), {})
+        t.assert_equals(s:select('b', {iterator = 'pp'}),
+                        {{{data = 'ab'}}, {{data = 'a'}}})
+    end)
+end
+
+-- Strange but valid cases.
+g.test_next_prefix_strange = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'unsigned'}, {2, 'string'},
+                                       {3, 'unsigned'}}})
+        local sk = s:create_index('sk', {parts = {{1, 'unsigned'}}})
+        s:replace{1, '', 1}
+        s:replace{2, 'a', 2}
+        s:replace{3, 'b', 3}
+
+        -- Non-string parts works as gt/lt.
+        t.assert_equals(s:select({2}, {iterator = 'np'}), {{3, 'b', 3}})
+        t.assert_equals(s:select({2}, {iterator = 'pp'}), {{1, '', 1}})
+        t.assert_equals(s:select({2, 'a', 2}, {iterator = 'np'}), {{3, 'b', 3}})
+        t.assert_equals(s:select({2, 'a', 2}, {iterator = 'pp'}), {{1, '', 1}})
+        t.assert_equals(sk:select({2}, {iterator = 'np'}), {{3, 'b', 3}})
+        t.assert_equals(sk:select({2}, {iterator = 'pp'}), {{1, '', 1}})
+
+        sk:drop()
+        s:replace{1, 'a', 1}
+        s:replace{1, 'aa', 1}
+        s:replace{1, 'ab', 1}
+        s:replace{1, 'b', 1}
+        s:replace{1, 'ba', 1}
+        s:replace{1, 'bb', 1}
+
+        -- Previous prefix takes all tuples with string.sub('', 1, 3) < 'a'
+        t.assert_equals(s:select({1, 'aaa'}, {iterator = 'pp'}),
+                        {{1, 'aa', 1}, {1, 'a', 1}, {1, '', 1}})
+        s:drop()
+
+        s = box.schema.space.create('test')
+        s:create_index('pk', {parts = {{1, 'string'}}})
+        s:replace{'\x00'}
+        s:replace{'\x00\x00'}
+        s:replace{'\x00\x00\x01'}
+        s:replace{'\x00\x00\x02'}
+        s:replace{'\x7f'}
+        s:replace{'\x7f\x7f'}
+        s:replace{'\x7f\x7f\x01'}
+        s:replace{'\x7f\x7f\x02'}
+        s:replace{'\x80'}
+        s:replace{'\x80\x80'}
+        s:replace{'\x80\x80\x01'}
+        s:replace{'\x80\x80\x02'}
+        s:replace{'\xff'}
+        s:replace{'\xff\xff'}
+        s:replace{'\xff\xff\x01'}
+        s:replace{'\xff\xff\x02'}
+
+        local opts = {iterator = 'np', limit = 1}
+        t.assert_equals(s:select('\x00', opts), {{'\x7f'}})
+        t.assert_equals(s:select('\x00\x00', opts), {{'\x7f'}})
+        t.assert_equals(s:select('\x7f', opts), {{'\x80'}})
+        t.assert_equals(s:select('\x7f\x7f', opts), {{'\x80'}})
+        t.assert_equals(s:select('\x80', opts), {{'\xff'}})
+        t.assert_equals(s:select('\x80\x80', opts), {{'\xff'}})
+        t.assert_equals(s:select('\xff', opts), {})
+        t.assert_equals(s:select('\xff\xff', opts), {})
+
+        local opts = {iterator = 'pp', limit = 1}
+        t.assert_equals(s:select('\x00', opts), {})
+        t.assert_equals(s:select('\x00\x00', opts), {{'\x00'}})
+        t.assert_equals(s:select('\x7f', opts), {{'\x00\x00\x02'}})
+        t.assert_equals(s:select('\x7f\x7f', opts), {{'\x7f'}})
+        t.assert_equals(s:select('\x80', opts), {{'\x7f\x7f\x02'}})
+        t.assert_equals(s:select('\x80\x80', opts), {{'\x80'}})
+        t.assert_equals(s:select('\xff', opts), {{'\x80\x80\x02'}})
+        t.assert_equals(s:select('\xff\xff', opts), {{'\xff'}})
+    end)
+end
+
+-- Practical case. List all files and directories in given directory.
+g.test_directory_list = function(cg)
+    cg.server:exec(function()
+        local s = box.schema.space.create('test')
+        s:format{{'division', 'unsigned'},
+                 {'path', 'string'},
+                 {'version', 'unsigned'}}
+        s:create_index('pk', {parts = {{'division'}, {'path'}, {'version'}}})
+        s:replace{0, '/home/', 0}
+        s:replace{1, '/home/another_user/file.txt', 0}
+        s:replace{1, '/home/who_is_that/file.txt', 0}
+        s:replace{1, '/home/user/file.txt', 0}
+        s:replace{1, '/home/user/file.txt', 1}
+        s:replace{1, '/home/user/data', 0}
+        s:replace{1, '/home/user/data1', 0}
+        s:replace{1, '/home/user/data2', 0}
+        s:replace{1, '/home/user/folder/', 0}
+        s:replace{1, '/home/user/folder/subfolder/data.txt', 0}
+        s:replace{1, '/home/user/folder1/subfolder/data.txt', 0}
+        s:replace{1, '/home/user/bin/tarantool', 1}
+        s:replace{1, '/home/user/bin/tarantool_ctl', 1}
+        s:replace{1, '/home/user/work/tarantool/src/main.cc', 0}
+        s:replace{1, '/home/user/work/tarantool/src/main.cc', 1}
+        s:replace{1, '/home/user/work/tarantool/test/prefix_test.lua', 0}
+        s:replace{1, '/home/user/work/tarantool/README.md', 0}
+        s:replace{1, '/home/user/work/small/README.md', 0}
+        s:replace{1, '/home/user/work/folder/', 0}
+        s:replace{1, '/home/user/work/tarantool/folder/', 0}
+        s:replace{2, '/home/user/secret.txt', 0}
+
+        local function list(division, path)
+            local res = {}
+            if not string.endswith(path, '/') then
+                path = path .. '/'
+            end
+            local function is_good(t)
+                return t and t.division == division and
+                       string.startswith(t.path, path)
+            end
+            local function get_name(subpath)
+                local name = string.sub(subpath, #path + 1)
+                local pos = string.find(name, '/')
+                if pos then
+                    name = string.sub(name, 1, pos)
+                end
+                return name
+            end
+
+            local opts = {iterator = 'gt', limit = 1}
+            local t = s:select({division, path}, opts)[1]
+            if not is_good(t) then
+                return res
+            end
+            local name = get_name(t.path)
+            table.insert(res, name)
+            while true do
+                opts.iterator = string.endswith(name, '/') and 'np' or 'gt'
+                t = s:select({division, path .. name}, opts)[1]
+                if not is_good(t) then
+                    return res
+                end
+                name = get_name(t.path)
+                table.insert(res, name)
+            end
+        end
+
+        t.assert_equals(list(1, '/home/user'),
+                        {'bin/', 'data', 'data1', 'data2',
+                         'file.txt', 'folder/', 'folder1/', 'work/'})
+        t.assert_equals(list(1, '/home/user/bin/'),
+                        {'tarantool', 'tarantool_ctl'})
+        t.assert_equals(list(1, '/home/user/work/'),
+                        {'folder/', 'small/', 'tarantool/'})
+        t.assert_equals(list(1, '/home/user/work/tarantool'),
+                        {'README.md', 'folder/', 'src/', 'test/'})
+    end)
+end
-- 
GitLab