From 50d839315654e3dfa3b133017347eb9ac630b7a3 Mon Sep 17 00:00:00 2001
From: Nikolay Shirokovskiy <nshirokovskiy@tarantool.org>
Date: Mon, 10 Apr 2023 14:40:12 +0300
Subject: [PATCH] core: introduce sample sort algorithm

The algorithm runs the sort in multiple threads and does not use OpenMP.
It has better thread utilization right from the beginning but probably
a worse constant than parallel qsort. See details in code comments.

Besides, the sort is not performed in the calling thread but in spawned
worker threads. The calling thread yields while waiting for the worker
threads to finish. The exception is small data: in this case sorting is
executed in the calling thread, saving the time needed to spawn a
thread. This should speed up test execution. This is the existing
behaviour of qsort_arg, but the data size threshold is reduced from
128000 to 1024.

Part of #3389

NO_CHANGELOG=internal
NO_DOC=internal
---
 src/lib/core/CMakeLists.txt |   1 +
 src/lib/core/tt_sort.c      | 488 ++++++++++++++++++++++++++++++++++++
 src/lib/core/tt_sort.h      |  39 +++
 test/unit/CMakeLists.txt    |   5 +
 test/unit/qsort_arg.cc      |   4 +-
 test/unit/qsort_arg.result  |  17 --
 test/unit/tt_sort.cc        | 262 +++++++++++++++++++
 third_party/qsort_arg.c     |   3 +-
 third_party/qsort_arg.h     |   7 +
 9 files changed, 806 insertions(+), 20 deletions(-)
 create mode 100644 src/lib/core/tt_sort.c
 create mode 100644 src/lib/core/tt_sort.h
 delete mode 100644 test/unit/qsort_arg.result
 create mode 100644 test/unit/tt_sort.cc

diff --git a/src/lib/core/CMakeLists.txt b/src/lib/core/CMakeLists.txt
index ecd6e93dc6..e842fc721f 100644
--- a/src/lib/core/CMakeLists.txt
+++ b/src/lib/core/CMakeLists.txt
@@ -48,6 +48,7 @@ set(core_sources
     md5.c
     cryptohash.c
     crypt.c
+    tt_sort.c
 )
 
 if (ENABLE_BACKTRACE)
diff --git a/src/lib/core/tt_sort.c b/src/lib/core/tt_sort.c
new file mode 100644
index 0000000000..177daa0bfa
--- /dev/null
+++ b/src/lib/core/tt_sort.c
@@ -0,0 +1,488 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2023, Tarantool AUTHORS, please see AUTHORS file.
+ */
+#include "tt_sort.h"
+
+#include <assert.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include "clock.h"
+#include "diag.h"
+#include "fiber.h"
+#include "qsort_arg.h"
+#include "say.h"
+#include "trivia/util.h"
+
+/**
+ * If the number of elements to be sorted is less than NOSPAWN_SIZE_THESHOLD
+ * then sorting is done in the calling thread without yielding. This helps to
+ * start the application faster if there is not much data in the database
+ * (test cases in particular). Otherwise sorting is done in worker threads.
+ */
+#define NOSPAWN_SIZE_THESHOLD 1024
+
+/** Sample sort algorithm data shared between worker threads. */
+struct sort_data {
+	/** The data being sorted. */
+	void *data;
+	/** Number of elements in data. */
+	size_t elem_count;
+	/** Size of a single data element in bytes. */
+	size_t elem_size;
+	/** Function for comparing two elements. */
+	tt_sort_compare_f cmp;
+	/** Extra argument for `cmp` function. */
+	void *cmp_arg;
+	/**
+	 * Number of threads to run the sort on. It is equal to the number of
+	 * buckets the data is divided into.
+	 */
+	int thread_count;
+	/**
+	 * Array of elements that are used as boundaries between buckets.
+	 * Array size is `thread_count - 1`, size of element is `elem_size`.
+	 */
+	void *splitters;
+	/**
+	 * Map of element to bucket. `elem_bucket[index]` is the bucket index
+	 * for element `data[index]`.
+	 */
+	unsigned char *elem_bucket;
+	/**
+	 * Extra space which is used to partition data into buckets. The
+	 * size of the space is the same as `data` size and each element is
+	 * of `elem_size` just as in `data`.
+	 */
+	void *buffer;
+};
+
+/** Data for a single sample sort worker thread. */
+struct sort_worker {
+	/** A reference to data shared between threads. */
+	struct sort_data *sort;
+	/** The worker cord. */
+	struct cord cord;
+	/** Begin index (inclusive) of data part processed by this thread. */
+	size_t begin;
+	/** End index (exclusive) of data part processed by this thread. */
+	size_t end;
+	/** Whether the data part of this thread is presorted. */
+	bool presorted;
+	/**
+	 * Histogram of how many elements of this thread data part fall
+	 * into each bucket on partitioning. Array size is `thread_count`.
+	 */
+	size_t *bucket_hist;
+	/**
+	 * Byte offsets from the beginning of extra space at which this
+	 * thread writes elements for each bucket when partitioning. Array
+	 * size is `thread_count`.
+	 */
+	size_t *bucket_offs;
+	/** The thread bucket begin index in `buffer` on bucket sort phase. */
+	size_t bucket_begin;
+	/** Number of elements in this thread bucket on bucket sort phase. */
+	size_t bucket_size;
+};
+
+/**
+ * Find the bucket for an element using binary search among the splitters,
+ * which are sorted in ascending order.
+ *
+ * Return the index of the bucket, in range [0, `thread_count` - 1].
+ */
+static int
+find_bucket(struct sort_data *sort, void *elem)
+{
+	/*
+	 * Bucket count is `thread_count`, thus bucket boundaries (splitters)
+	 * count is `thread_count - 1` omitting leftmost and rightmost
+	 * boundaries. Let's place leftmost and rightmost boundaries at
+	 * imaginary indexes `-1` and `size of splitters` respectively.
+	 */
+	int b = -1;
+	int e = sort->thread_count - 1;
+
+	do {
+		int m = (b + e) / 2;
+		assert(m >= 0 && m < sort->thread_count - 1);
+		if (sort->cmp(elem, sort->splitters + m * sort->elem_size,
+			      sort->cmp_arg) < 0)
+			e = m;
+		else
+			b = m;
+	} while (e - b > 1);
+	return b + 1;
+}
+
+/**
+ * Calculate element to bucket map for the data part assigned to a thread.
+ * Additionally calculate bucket histogram - how many elements of the part
+ * are placed in each bucket.
+ */
+static int
+calc_elem_bucket(va_list ap)
+{
+	struct sort_worker *worker = va_arg(ap, typeof(worker));
+	struct sort_data *sort = worker->sort;
+
+	void *pos = sort->data + worker->begin * sort->elem_size;
+	for (size_t i = worker->begin; i < worker->end; i++) {
+		int b = find_bucket(sort, pos);
+		assert(b >= 0 && b < sort->thread_count);
+		sort->elem_bucket[i] = b;
+		worker->bucket_hist[b]++;
+		pos += sort->elem_size;
+	}
+
+	return 0;
+}
+
+/**
+ * Distribute data part assigned to a thread to buckets. Each bucket
+ * has a designated place for this thread.
+ */
+static int
+split_to_buckets(va_list ap)
+{
+	struct sort_worker *worker = va_arg(ap, typeof(worker));
+	struct sort_data *sort = worker->sort;
+
+	void *pos = sort->data + worker->begin * sort->elem_size;
+	for (size_t i = worker->begin; i < worker->end; i++) {
+		int b = sort->elem_bucket[i];
+		memcpy(sort->buffer + worker->bucket_offs[b], pos,
+		       sort->elem_size);
+		worker->bucket_offs[b] += sort->elem_size;
+		pos += sort->elem_size;
+	}
+
+	return 0;
+}
+
+/**
+ * Sort bucket assigned to a thread and copy sorted data back to the original
+ * array.
+ */
+static int
+sort_bucket(va_list ap)
+{
+	struct sort_worker *worker = va_arg(ap, typeof(worker));
+	struct sort_data *sort = worker->sort;
+
+	/* Sort this worker bucket. */
+	qsort_arg_st(sort->buffer + worker->bucket_begin * sort->elem_size,
+		     worker->bucket_size, sort->elem_size,
+		     sort->cmp, sort->cmp_arg);
+
+	/* Move sorted data back from temporary space. */
+	memcpy(sort->data + worker->bucket_begin * sort->elem_size,
+	       sort->buffer + worker->bucket_begin * sort->elem_size,
+	       worker->bucket_size * sort->elem_size);
+
+	return 0;
+}
+
+/**
+ * Run a function in several threads. Yields while waiting for the threads
+ * to finish. Panics if a thread can not be started or joined.
+ *
+ * Arguments:
+ *  func         - a function to run in threads
+ *  workers      - array of function arguments. An element of this array will
+ *                 be passed as an argument to the function `func` for each
+ *                 thread. So the array should have `thread_count` elements
+ *  thread_count - number of threads
+ */
+static void
+sort_run_mt(fiber_func func, struct sort_worker *workers, int thread_count)
+{
+	for (int i = 0; i < thread_count; i++) {
+		char name[FIBER_NAME_MAX];
+
+		snprintf(name, sizeof(name), "sort.worker.%d", i);
+		if (cord_costart(&workers[i].cord, name, func,
+				 &workers[i]) != 0) {
+			diag_log();
+			panic("cord_start failed");
+		}
+	}
+
+	for (int i = 0; i < thread_count; i++) {
+		if (cord_cojoin(&workers[i].cord) != 0) {
+			diag_log();
+			panic("cord_cojoin failed");
+		}
+	}
+}
+
+/**
+ * As we first split data to buckets and then sort each bucket in single
+ * thread the algorithm performance critically depends on even distribution of
+ * data among buckets. According to estimation given in [1] with oversample
+ * factor of 100*log2(elem_count) the probability that no bucket size deviates
+ * from even distribution more than 10% is larger than 1 - 1/elem_count.
+ *
+ * Sampling is periodic, not random, as that should work most of the time.
+ * NOTE(review): if elem_count < samples_num every sample is data[0] (step 0).
+ *
+ * [1] https://en.wikipedia.org/wiki/Samplesort
+ */
+static void
+find_splitters(struct sort_data *sort)
+{
+	int log2_n = 0;
+	size_t n = sort->elem_count;
+
+	/* Calculate floor(log2(elem_count)). */
+	while (n > 1) {
+		n >>= 1;
+		log2_n++;
+	}
+
+	/* Take samples with oversampling. */
+	int oversample = 100 * log2_n;
+	int samples_num = sort->thread_count * oversample - 1;
+	void *samples = xmalloc(samples_num * sort->elem_size);
+	size_t sample_step = sort->elem_count / samples_num;
+	for (int i = 0; i < samples_num; i++)
+		memcpy(samples + i * sort->elem_size,
+		       sort->data + i * sample_step * sort->elem_size,
+		       sort->elem_size);
+
+	qsort_arg_st(samples, samples_num, sort->elem_size, sort->cmp,
+		     sort->cmp_arg);
+
+	/* Take splitters from samples. */
+	for (int i = 0; i < sort->thread_count - 1; i++) {
+		size_t si = oversample - 1 + i * oversample;
+		memcpy(sort->splitters + i * sort->elem_size,
+		       samples + si * sort->elem_size, sort->elem_size);
+	}
+
+	free(samples);
+}
+
+/** Check whether data part assigned to a thread is presorted. */
+static int
+check_presorted(va_list ap)
+{
+	struct sort_worker *worker = va_arg(ap, typeof(worker));
+	struct sort_data *sort = worker->sort;
+	worker->presorted = true;
+
+	void *pos = sort->data + worker->begin * sort->elem_size;
+	void *limit = sort->data + (worker->end - 1) * sort->elem_size;
+	for (; pos < limit; pos += sort->elem_size) {
+		if (sort->cmp(pos, pos + sort->elem_size, sort->cmp_arg) > 0) {
+			worker->presorted = false;
+			break;
+		}
+	}
+
+	return 0;
+}
+
+/** Sort all the data. */
+static int
+sort_all(va_list ap)
+{
+	struct sort_data *sort = va_arg(ap, typeof(sort));
+
+	qsort_arg_st(sort->data, sort->elem_count, sort->elem_size, sort->cmp,
+		     sort->cmp_arg);
+
+	return 0;
+}
+
+/** Sort all the data in a new thread. Yields while waiting. */
+static void
+sort_single_thread(struct sort_data *sort)
+{
+	struct cord cord;
+	if (cord_costart(&cord, "sort.worker.0", sort_all, sort) != 0) {
+		diag_log();
+		panic("cord_start failed");
+	}
+	if (cord_cojoin(&cord) != 0) {
+		diag_log();
+		panic("cord_cojoin failed");
+	}
+}
+
+void
+tt_sort(void *data, size_t elem_count, size_t elem_size,
+	tt_sort_compare_f cmp, void *cmp_arg, int thread_count)
+{
+	struct sort_data sort;
+	double time_start, time_finish;
+
+	/*
+	 * The algorithm idea is to split the data into buckets, each element
+	 * in bucket `i` is greater than each element in bucket `i - 1`, and
+	 * then sort the buckets. As a result the original array gets sorted.
+	 *
+	 * The algorithm outline is:
+	 *
+	 * 1. Find buckets boundaries (splitters).
+	 * 2. For each element in data find what bucket it belongs to.
+	 * 3. Copy elements to their buckets.
+	 * 4. Sort buckets using sequential qsort_arg and then copy buckets
+	 *    back to the original array.
+	 *
+	 * Steps 2, 3 and 4 are run in parallel on `thread_count` threads. Step
+	 * 1 does not have high computation cost and is run in a single thread.
+	 *
+	 * Additionally we check if data is already sorted before applying main
+	 * algo.
+	 *
+	 * See also:
+	 * [1] https://en.wikipedia.org/wiki/Samplesort
+	 * [2] Super Scalar Sample Sort, Peter Sanders and Sebastian Winkel
+	 *
+	 * Although this implementation does not use superscalar approach to
+	 * map elements to buckets as in [2] and uses usual binary search.
+	 */
+
+	say_verbose("start sort, data size: %zu, elem size: %zu, threads: %d",
+		    elem_count, elem_size, thread_count);
+	if (elem_count < NOSPAWN_SIZE_THESHOLD) {
+		say_verbose("data size is less than threshold %d,"
+			    " sort in caller thread", NOSPAWN_SIZE_THESHOLD);
+		qsort_arg_st(data, elem_count, elem_size, cmp, cmp_arg);
+		return;
+	}
+
+	/*
+	 * Upper limit is because element to bucket map has unsigned char
+	 * storage per element.
+	 */
+	assert(thread_count > 0 && thread_count <= TT_SORT_THREADS_MAX);
+
+	sort.data = data;
+	sort.elem_count = elem_count;
+	sort.elem_size = elem_size;
+	sort.cmp = cmp;
+	sort.cmp_arg = cmp_arg;
+	sort.thread_count = thread_count;
+
+	if (thread_count == 1) {
+		say_verbose("sorting thread number is 1, fallback to qsort");
+		sort_single_thread(&sort);
+		return;
+	}
+
+	sort.elem_bucket = xmalloc(elem_count);
+	sort.buffer = xmalloc(elem_count * elem_size);
+	sort.splitters = xmalloc((thread_count - 1) * elem_size);
+
+	size_t part_size = elem_count / thread_count;
+	struct sort_worker *workers = xmalloc(thread_count * sizeof(*workers));
+	bool presorted = true;
+	/* Required for presorted check on part borders. */
+	assert(part_size > 0);
+	for (int i = 0; i < thread_count; i++) {
+		struct sort_worker *worker = &workers[i];
+
+		worker->sort = &sort;
+		worker->begin = i * part_size;
+		/*
+		 * Each thread takes equal share of data except for last
+		 * thread which takes extra `elem_count % thread_count` elements
+		 * if `elem_count` is not a multiple of `thread_count`.
+		 */
+		if (i == thread_count - 1)
+			worker->end = elem_count;
+		else
+			worker->end = worker->begin + part_size;
+
+		worker->bucket_hist = xcalloc(thread_count,
+					      sizeof(*worker->bucket_hist));
+		worker->bucket_offs = xmalloc(thread_count *
+					      sizeof(*worker->bucket_offs));
+
+		if (presorted && i < thread_count - 1 &&
+		    cmp(data + (worker->end - 1) * elem_size,
+			data + worker->end * elem_size, cmp_arg) > 0)
+			presorted = false;
+	}
+
+	if (presorted) {
+		sort_run_mt(check_presorted, workers, thread_count);
+		for (int i = 0; i < thread_count; i++) {
+			if (!workers[i].presorted) {
+				presorted = false;
+				break;
+			}
+		}
+		if (presorted) {
+			say_verbose("data is presorted");
+			goto cleanup;
+		}
+	}
+
+	/* Step 1. Find buckets boundaries (splitters). */
+	find_splitters(&sort);
+
+	/* Step 2. For each element in data find what bucket it belongs to. */
+	time_start = clock_monotonic();
+	sort_run_mt(calc_elem_bucket, workers, thread_count);
+	time_finish = clock_monotonic();
+	say_verbose("calculating elements buckets, time spent: %.3f sec",
+		    time_finish - time_start);
+
+	/* Step 3. Copy elements to their buckets. */
+	time_start = clock_monotonic();
+	size_t offset = 0;
+	for (int i = 0; i < thread_count; i++) {
+		for (int j = 0; j < thread_count; j++) {
+			workers[j].bucket_offs[i] = offset;
+			offset += workers[j].bucket_hist[i] * elem_size;
+		}
+	}
+	sort_run_mt(split_to_buckets, workers, thread_count);
+	time_finish = clock_monotonic();
+	say_verbose("splitting to buckets, time spent: %.3f sec",
+		    time_finish - time_start);
+
+	/*
+	 * Step 4. Sort buckets using sequential qsort_arg and then copy
+	 * buckets back to the original array.
+	 */
+	time_start = clock_monotonic();
+	size_t bucket_begin = 0;
+	for (int i = 0; i < thread_count; i++) {
+		struct sort_worker *worker = &workers[i];
+		size_t bucket_size = 0;
+
+		for (int j = 0; j < thread_count; j++)
+			bucket_size += workers[j].bucket_hist[i];
+
+		worker->bucket_begin = bucket_begin;
+		worker->bucket_size = bucket_size;
+		bucket_begin += bucket_size;
+
+		say_verbose("bucket %d, size %f", i,
+			    (double)worker->bucket_size / elem_count);
+	}
+	sort_run_mt(sort_bucket, workers, thread_count);
+	time_finish = clock_monotonic();
+	say_verbose("sorting buckets, time spent: %.3f sec",
+		    time_finish - time_start);
+
+cleanup:
+	for (int i = 0; i < thread_count; i++) {
+		struct sort_worker *worker = &workers[i];
+
+		free(worker->bucket_hist);
+		free(worker->bucket_offs);
+	}
+	free(workers);
+	free(sort.elem_bucket);
+	free(sort.buffer);
+	free(sort.splitters);
+}
diff --git a/src/lib/core/tt_sort.h b/src/lib/core/tt_sort.h
new file mode 100644
index 0000000000..2daab3afd2
--- /dev/null
+++ b/src/lib/core/tt_sort.h
@@ -0,0 +1,39 @@
+/*
+ * SPDX-License-Identifier: BSD-2-Clause
+ *
+ * Copyright 2010-2023, Tarantool AUTHORS, please see AUTHORS file.
+ */
+#pragma once
+
+#include <stddef.h>
+
+#if defined(__cplusplus)
+extern "C" {
+#endif /* defined(__cplusplus) */
+
+#define TT_SORT_THREADS_MAX 256
+
+typedef int
+(*tt_sort_compare_f)(const void *a, const void *b, void *arg);
+
+/**
+ * A variant of sample sort algorithm. The sort is executed in worker threads
+ * while the calling thread yields waiting for them to finish. Small data is
+ * sorted right in the calling thread without yielding.
+ *
+ * Arguments:
+ *  data         - data to be sorted
+ *  elem_count   - number of elements in data
+ *  elem_size    - size of a single data element
+ *  cmp          - comparison function with usual semantics (as in qsort(3)) and
+ *                 extra argument
+ *  cmp_arg      - extra argument to be passed to comparison function
+ *  thread_count - number of sort threads, from 1 to TT_SORT_THREADS_MAX
+ */
+void
+tt_sort(void *data, size_t elem_count, size_t elem_size,
+	tt_sort_compare_f cmp, void *cmp_arg, int thread_count);
+
+#if defined(__cplusplus)
+} /* extern "C" */
+#endif /* defined(__cplusplus) */
diff --git a/test/unit/CMakeLists.txt b/test/unit/CMakeLists.txt
index 5361c773ee..bf3babf974 100644
--- a/test/unit/CMakeLists.txt
+++ b/test/unit/CMakeLists.txt
@@ -557,6 +557,11 @@ create_unit_test(PREFIX qsort_arg
                  LIBRARIES misc unit
 )
 
+create_unit_test(PREFIX tt_sort
+                 SOURCES tt_sort.cc core_test_utils.c
+                 LIBRARIES unit core
+)
+
 create_unit_test(PREFIX iterator_position
                  SOURCES iterator_position.c box_test_utils.c
                  LIBRARIES unit box core
diff --git a/test/unit/qsort_arg.cc b/test/unit/qsort_arg.cc
index 423f22eb63..c1ae250ebc 100644
--- a/test/unit/qsort_arg.cc
+++ b/test/unit/qsort_arg.cc
@@ -6,9 +6,11 @@
 #include <random>
 #include <vector>
 
-#include "unit.h"
 #include "trivia/util.h"
 
+#define UNIT_TAP_COMPATIBLE 1
+#include "unit.h"
+
 int
 qsort_cmp(const void *a, const void *b, void *)
 {
diff --git a/test/unit/qsort_arg.result b/test/unit/qsort_arg.result
deleted file mode 100644
index fda97084d1..0000000000
--- a/test/unit/qsort_arg.result
+++ /dev/null
@@ -1,17 +0,0 @@
-1..2
-	*** main ***
-    1..3
-	*** test_qsort_st ***
-    ok 1 - Must be sorted
-    ok 2 - Must be sorted
-    ok 3 - Must be sorted
-	*** test_qsort_st: done ***
-ok 1 - subtests
-    1..3
-	*** test_qsort_mt ***
-    ok 1 - Must be sorted
-    ok 2 - Must be sorted
-    ok 3 - Must be sorted
-	*** test_qsort_mt: done ***
-ok 2 - subtests
-	*** main: done ***
diff --git a/test/unit/tt_sort.cc b/test/unit/tt_sort.cc
new file mode 100644
index 0000000000..e8d95a116d
--- /dev/null
+++ b/test/unit/tt_sort.cc
@@ -0,0 +1,262 @@
+#include "tt_sort.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <random>
+#include <vector>
+
+#include "fiber.h"
+#include "memory.h"
+#include "trivia/util.h"
+
+#define UNIT_TAP_COMPATIBLE 1
+#include "unit.h"
+
+static int test_result = 1;
+
+static int
+cmp_testing(const void *a, const void *b, void *arg)
+{
+	uint64_t i, j;
+	memcpy(&i, a, sizeof(i));
+	memcpy(&j, b, sizeof(j));
+	int ret = i < j ? -1 : i > j;
+	if (arg == nullptr)
+		return ret;
+	return -ret;
+}
+
+/**
+ * For low sizes sorting is done in calling thread without yielding and
+ * using qsort_arg_st.
+ */
+static void
+test_no_extra_threads(void)
+{
+	/* Sizes less than 7 are sorted using n^2 algorithm. */
+	const size_t sizes[] = {3, 5, 7, 8, 100, 207, 331};
+	plan(lengthof(sizes));
+	header();
+
+	auto gen = std::mt19937_64{}; /* No seed for reproducibility. */
+	std::vector<uint64_t> data;
+
+	for (size_t N : sizes) {
+		data.resize(N);
+		for (auto &v : data)
+			v = gen();
+
+		tt_sort(data.data(), N, sizeof(data[0]), cmp_testing,
+			nullptr, 4);
+
+		ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+	}
+
+	footer();
+	check_plan();
+}
+
+static void
+test_no_extra_threads_presorted(void)
+{
+	plan(3);
+	header();
+
+	std::vector<uint64_t> data;
+	int N = 100;
+	data.resize(N);
+
+	/* All elements are equal. */
+	for (auto &v : data)
+		v = 1;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/* Data is presorted. */
+	for (int i = 0; i < N; i++)
+		data[i] = i;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/* Data is presorted but in descending order. */
+	for (int i = 0; i < N; i++)
+		data[i] = N - i;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	footer();
+	check_plan();
+}
+
+/**
+ * For big sizes sorting is done in multiple threads using sample sort
+ * algorithm.
+ */
+static void
+test_multi_threaded(void)
+{
+	size_t sizes[] = {10000, 100000, 200000};
+	size_t threads[] = {1, 2, 3, 4, 7, 8};
+	plan(lengthof(sizes) * lengthof(threads));
+	header();
+
+	auto gen = std::mt19937_64{}; /* No seed for reproducibility. */
+	std::vector<uint64_t> data;
+
+	for (size_t N : sizes) {
+		data.resize(N);
+
+		for (size_t t : threads) {
+			for (auto &v : data)
+				v = gen();
+
+			tt_sort(data.data(), N, sizeof(data[0]), cmp_testing,
+				nullptr, t);
+
+			ok(std::is_sorted(data.begin(), data.end()),
+			   "Must be sorted");
+		}
+	}
+
+	footer();
+	check_plan();
+}
+
+static void
+test_presorted()
+{
+	plan(5);
+	header();
+
+	std::vector<uint64_t> data;
+	int N = 20000;
+	data.resize(N);
+
+	/* All elements are equal. */
+	for (auto &v : data)
+		v = 1;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/* Data is presorted. */
+	for (int i = 0; i < N; i++)
+		data[i] = i;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/* Data is presorted but in descending order. */
+	for (int i = 0; i < N; i++)
+		data[i] = N - i;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/*
+	 * Data is presorted in parts corresponding to threads but
+	 * not globally.
+	 */
+	for (int i = 0; i < N / 2; i++)
+		data[i] = i;
+	for (int i = 0; i < N / 2; i++)
+		data[N / 2 + i] = i;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 2);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	/*
+	 * Data is presorted on border of parts corresponding to threads
+	 * but not in parts itself.
+	 */
+	for (int i = 0; i < N; i++)
+		data[i] = i;
+	data[N / 4] = 0;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 2);
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	footer();
+	check_plan();
+}
+
+static void
+test_degenerated_bucket()
+{
+	plan(1);
+	header();
+
+	auto gen = std::mt19937_64{}; /* No seed for reproducibility. */
+	std::vector<uint64_t> data;
+	int N = 20000;
+	data.resize(N);
+
+	/*
+	 * All bucket splitters will be equal to 0, thus all elements are put
+	 * into the last bucket. First 3 buckets will have size 0.
+	 */
+	for (int i = 0; i < N; i++)
+		data[i] = i % 7 == 0 ? gen() : 0;
+
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, nullptr, 4);
+
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	footer();
+	check_plan();
+}
+
+/*
+ * Test extra argument is actually passed to compare callback.
+ */
+static void
+test_extra_argument()
+{
+	plan(1);
+	header();
+
+	auto gen = std::mt19937_64{}; /* No seed for reproducibility. */
+	std::vector<uint64_t> data;
+	int N = 10000;
+	data.resize(N);
+	for (auto &v : data)
+		v = gen();
+
+	int arg;
+	tt_sort(data.data(), N, sizeof(data[0]), cmp_testing, &arg, 3);
+	std::reverse(data.begin(), data.end());
+	ok(std::is_sorted(data.begin(), data.end()), "Must be sorted");
+
+	footer();
+	check_plan();
+}
+
+static int
+main_f(va_list ap)
+{
+	plan(6);
+	header();
+
+	test_no_extra_threads();
+	test_no_extra_threads_presorted();
+	test_multi_threaded();
+	test_presorted();
+	test_degenerated_bucket();
+	test_extra_argument();
+
+	footer();
+	test_result = check_plan();
+
+	return 0;
+}
+
+int
+main(void)
+{
+	memory_init();
+	fiber_init(fiber_cxx_invoke);
+
+	struct fiber *main = fiber_new_xc("main", main_f);
+	fiber_wakeup(main);
+	ev_run(loop(), 0);
+
+	fiber_free();
+	memory_free();
+	return test_result;
+}
diff --git a/third_party/qsort_arg.c b/third_party/qsort_arg.c
index faaa4859e5..921e8c31e3 100644
--- a/third_party/qsort_arg.c
+++ b/third_party/qsort_arg.c
@@ -118,7 +118,7 @@ med3(char *a, char *b, char *c, int (*cmp)(const void *a, const void *b, void *a
 /**
  * Single-thread version of qsort.
  */
-static void
+void
 qsort_arg_st(void *a, size_t n, size_t es, int (*cmp)(const void *a, const void *b, void *arg), void *arg)
 {
 	char	   *pa,
@@ -259,4 +259,3 @@ qsort_arg(void *a, size_t n, size_t es,
 	qsort_arg_st(a, n, es, cmp, arg);
 #endif
 }
-
diff --git a/third_party/qsort_arg.h b/third_party/qsort_arg.h
index a96e34be68..918d90ef13 100644
--- a/third_party/qsort_arg.h
+++ b/third_party/qsort_arg.h
@@ -8,6 +8,13 @@
 extern "C" {
 #endif /* defined(__cplusplus) */
 
+/**
+ * Single-thread version of qsort.
+ */
+void
+qsort_arg_st(void *a, size_t n, size_t es,
+	     int (*cmp)(const void *a, const void *b, void *arg), void *arg);
+
 /**
  * General version of qsort that calls single-threaded of multi-threaded
  * qsort depending on open MP availability and given array size.
-- 
GitLab