From e5c4bd63adca0c06ee062307d40da5e6b729f680 Mon Sep 17 00:00:00 2001
From: Ilya Verbin <iverbin@tarantool.org>
Date: Wed, 3 Jul 2024 14:12:06 +0300
Subject: [PATCH] perf: add column insert test

The test creates an empty space with 1000 nullable columns storing uint64
values. Then it initializes a dataset that consists of 10 columns and
1 million rows (the row count and both column counts are configurable) and
inserts the dataset into the space.

By default the test uses serial C API but one may switch to the Arrow API
for batch insertion (the feature is exclusive to the Enterprise Edition).

It's also possible to specify the engine and wal_mode to use (the defaults
are memtx and write).

Needed for tarantool/tarantool-ee#712

NO_DOC=perf test
NO_TEST=perf test
NO_CHANGELOG=perf test
---
 .../expose-luaL_pushnull-and-luaL_isnull      |   3 +-
 perf/lua/CMakeLists.txt                       |   8 +
 perf/lua/column_insert.lua                    | 170 ++++++++++++
 perf/lua/column_insert_module.c               | 246 ++++++++++++++++++
 src/trivia/util.h                             |   2 +
 5 files changed, 428 insertions(+), 1 deletion(-)
 create mode 100644 perf/lua/column_insert.lua
 create mode 100644 perf/lua/column_insert_module.c

diff --git a/changelogs/unreleased/expose-luaL_pushnull-and-luaL_isnull b/changelogs/unreleased/expose-luaL_pushnull-and-luaL_isnull
index 189962addc..d5e24296cd 100644
--- a/changelogs/unreleased/expose-luaL_pushnull-and-luaL_isnull
+++ b/changelogs/unreleased/expose-luaL_pushnull-and-luaL_isnull
@@ -1,3 +1,4 @@
 ## feature/lua
 
-* Exposed `luaL_pushnull()` and `luaL_isnull()` functions via C module API.
+* Exposed the `luaL_pushnull()` and `luaL_isnull()` functions via the C
+  module API.
diff --git a/perf/lua/CMakeLists.txt b/perf/lua/CMakeLists.txt
index 1419b96e1a..b7da22e9dc 100644
--- a/perf/lua/CMakeLists.txt
+++ b/perf/lua/CMakeLists.txt
@@ -1,3 +1,5 @@
+enable_tnt_compile_flags()
+
 set(TARANTOOL_BIN $<TARGET_FILE:tarantool>)
 set(RUN_PERF_LUA_TESTS_LIST "")
 
@@ -61,6 +63,12 @@ create_perf_lua_test(NAME column_scan
                      DEPENDS column_scan_module
 )
 
+build_module(column_insert_module column_insert_module.c)
+target_link_libraries(column_insert_module msgpuck)
+create_perf_lua_test(NAME column_insert
+                     DEPENDS column_insert_module
+)
+
 add_custom_target(test-lua-perf
                   DEPENDS "${RUN_PERF_LUA_TESTS_LIST}"
                   COMMENT "Running Lua performance tests"
diff --git a/perf/lua/column_insert.lua b/perf/lua/column_insert.lua
new file mode 100644
index 0000000000..b9bab3504b
--- /dev/null
+++ b/perf/lua/column_insert.lua
@@ -0,0 +1,188 @@
+--
+-- The test measures run time of batch insertion into the space columns.
+--
+-- Output format (console):
+-- <test-case> <rows-per-second>
+--
+-- NOTE: The test requires a C module. Set the BUILDDIR environment variable to
+-- the tarantool build directory if using out-of-source build.
+--
+
+local clock = require('clock')
+local fiber = require('fiber')
+local fio = require('fio')
+local log = require('log')
+local tarantool = require('tarantool')
+local benchmark = require('benchmark')
+
+local USAGE = [[
+   engine <string, 'memtx'>          - space engine to use for the test
+   wal_mode <string, 'write'>        - write-ahead log mode to use for the test
+   column_count_total <number, 1000> - number of columns in the test space
+   column_count_batch <number, 10>   - number of columns in the record batch
+   row_count_total <number, 1000000> - number of inserted rows
+   row_count_batch <number, 1000>    - number of rows per record batch
+   use_arrow_api <boolean, false>    - use the Arrow API for batch insertion
+
+]]
+
+local params = benchmark.argparse(arg, {
+    {'engine', 'string'},
+    {'wal_mode', 'string'},
+    {'column_count_total', 'number'},
+    {'column_count_batch', 'number'},
+    {'row_count_total', 'number'},
+    {'row_count_batch', 'number'},
+    {'use_arrow_api', 'boolean'},
+}, USAGE)
+
+local DEFAULT_ENGINE = 'memtx'
+local DEFAULT_WAL_MODE = 'write'
+local DEFAULT_COLUMN_COUNT_TOTAL = 1000
+local DEFAULT_COLUMN_COUNT_BATCH = 10
+local DEFAULT_ROW_COUNT_TOTAL = 1000 * 1000
+local DEFAULT_ROW_COUNT_BATCH = 1000
+
+params.engine = params.engine or DEFAULT_ENGINE
+params.wal_mode = params.wal_mode or DEFAULT_WAL_MODE
+params.column_count_total = params.column_count_total or
+                            DEFAULT_COLUMN_COUNT_TOTAL
+params.column_count_batch = params.column_count_batch or
+                            DEFAULT_COLUMN_COUNT_BATCH
+params.row_count_total = params.row_count_total or DEFAULT_ROW_COUNT_TOTAL
+params.row_count_batch = params.row_count_batch or DEFAULT_ROW_COUNT_BATCH
+params.use_arrow_api = params.use_arrow_api or false
+
+-- The C module takes the row index modulo the row batch size, and the first
+-- batch column fills the non-nullable first field, so both must be positive.
+assert(params.column_count_batch > 0)
+assert(params.row_count_batch > 0)
+assert(params.column_count_batch <= params.column_count_total)
+assert(params.column_count_batch < 1000 * 1000)
+assert(params.row_count_batch <= params.row_count_total)
+assert(params.row_count_total % params.row_count_batch == 0)
+
+local bench = benchmark.new(params)
+
+-- Make the C module that resides in the build directory loadable.
+local BUILDDIR = fio.abspath(fio.pathjoin(os.getenv('BUILDDIR') or '.'))
+local MODULEPATH = fio.pathjoin(BUILDDIR, 'perf', 'lua',
+                                '?.' .. tarantool.build.mod_format)
+package.cpath = MODULEPATH .. ';' .. package.cpath
+
+local test_module_name = 'column_insert_module'
+local has_test_module, test_module = pcall(require, test_module_name)
+if not has_test_module then
+    -- On failure pcall() returns the error in place of the module. Print it
+    -- as well: the module may be present but fail to load.
+    local errmsg = ('Lua module "%s" is not found.\n%s\n'):format(
+        test_module_name, tostring(test_module))
+    io.stderr:write(errmsg)
+    os.exit(1)
+end
+
+-- Pick the implementation of each test function: '<name>_batch' if the Arrow
+-- API is requested, '<name>_serial' otherwise.
+local test_funcs = {}
+for _, func_name in ipairs({'insert'}) do
+    local full_func_name
+    if params.use_arrow_api then
+        full_func_name = func_name .. '_batch'
+    else
+        full_func_name = func_name .. '_serial'
+    end
+    local f = test_module[full_func_name]
+    if f == nil then
+        error('The specified test mode is not supported by this build')
+    end
+    test_funcs[func_name] = f
+end
+
+local test_dir = fio.tempdir()
+
+-- Recursively removes a file, a symlink, or a directory with its content.
+local function rmtree(s)
+    if (fio.path.is_file(s) or fio.path.is_link(s)) then
+        fio.unlink(s)
+        return
+    end
+    if fio.path.is_dir(s) then
+        -- Fall back to an empty list if the directory cannot be listed.
+        for _, i in ipairs(fio.listdir(s) or {}) do
+            rmtree(s..'/'..i)
+        end
+        fio.rmdir(s)
+    end
+end
+
+box.cfg({
+    log = 'tarantool.log',
+    work_dir = test_dir,
+    wal_mode = params.wal_mode,
+    memtx_memory = 4 * 1024 * 1024 * 1024,
+    checkpoint_count = 1,
+})
+
+-- Create the test space: column_count_total nullable uint64 fields, except for
+-- the first one, which is not nullable, plus the 'pk' index.
+box.once('init', function()
+    log.info('Creating the test space...')
+    local format = {}
+    for i = 1, params.column_count_total do
+        table.insert(format, {'field_' .. i, 'uint64', is_nullable = true})
+    end
+    format[1].is_nullable = false
+    local s = box.schema.space.create('test', {
+        engine = params.engine,
+        field_count = #format,
+        format = format,
+    })
+    s:create_index('pk')
+end)
+
+-- Logs both values and fails the test if they differ.
+local function check_result(result, expected)
+    log.info('expected %s, got %s', expected, result)
+    assert(result == expected)
+end
+
+local TESTS = {
+    {
+        name = 'insert',
+        func = function()
+            test_funcs.insert(box.space.test.id, params)
+            check_result(box.space.test:count(), params.row_count_total)
+        end,
+    },
+}
+
+-- Runs the test function once and reports the elapsed clock.time() and
+-- clock.proc() deltas along with the number of inserted rows.
+local function run_test(test)
+    local func = test.func
+    local real_time_start = clock.time()
+    local cpu_time_start = clock.proc()
+    func()
+    local delta_real = clock.time() - real_time_start
+    local delta_cpu = clock.proc() - cpu_time_start
+    bench:add_result(test.name, {
+        real_time = delta_real,
+        cpu_time = delta_cpu,
+        items = params.row_count_total,
+    })
+end
+
+-- Raise the fiber slice limit: presumably the test may run for a long time.
+fiber.set_max_slice(9000)
+test_module.init(params)
+
+for _, test in ipairs(TESTS) do
+    log.info('Running test %s...', test.name)
+    run_test(test)
+end
+
+bench:dump_results()
+
+test_module.fini()
+rmtree(test_dir)
+os.exit(0)
diff --git a/perf/lua/column_insert_module.c b/perf/lua/column_insert_module.c
new file mode 100644
index 0000000000..d5b5f768c2
--- /dev/null
+++ b/perf/lua/column_insert_module.c
@@ -0,0 +1,357 @@
+#include <lua.h>
+#include <lauxlib.h>
+#include <module.h>
+#include <msgpuck.h>
+#include <assert.h>
+#include <stddef.h>
+#include <stdint.h>
+#include <stdio.h>
+#include <stdlib.h>
+
+#include "trivia/config.h"
+#include "trivia/util.h"
+#include "arrow/abi.h"
+
+#ifdef ENABLE_MEMCS_ENGINE
+# define ENABLE_BATCH_INSERT 1
+#endif
+
+/** Alignment of the column data buffers, in bytes. */
+enum { DATASET_ALIGNMENT = 64 };
+
+/**
+ * The test data set, stored column-wise. It is generated by init(), read by
+ * the insert functions and freed by fini().
+ */
+static struct {
+	/** Number of values in each column. */
+	int64_t row_count;
+	/** Number of elements in the columns array. */
+	int64_t column_count;
+	struct {
+		/** Column name: "field_<N>", N is counted from 1. */
+		char *name;
+		/** Format string of the column in the Arrow schema. */
+		const char *type;
+		/** Column values, row_count of them. */
+		uint64_t *data;
+	} *columns;
+} dataset;
+
+/** Frees the data set and resets it to the initial (empty) state. */
+static void
+dataset_free(void)
+{
+	for (int64_t i = 0; i < dataset.column_count; i++) {
+		free(dataset.columns[i].name);
+		free(dataset.columns[i].data);
+	}
+	free(dataset.columns);
+	dataset.columns = NULL;
+	dataset.column_count = 0;
+	dataset.row_count = 0;
+}
+
+/**
+ * Lua: insert_serial(space_id, params).
+ *
+ * Inserts the data set into the space row by row using box_insert() and
+ * commits a transaction after every params.row_count_batch rows. The fields
+ * that follow the data set columns, up to params.column_count_total, are
+ * set to nil. Raises a Lua error on invalid parameters or a failed insert.
+ */
+static int
+insert_serial_lua_func(struct lua_State *L)
+{
+	uint32_t space_id = luaL_checkinteger(L, 1);
+	luaL_checktype(L, 2, LUA_TTABLE);
+	lua_getfield(L, 2, "row_count_batch");
+	int batch_row_count = luaL_checkinteger(L, -1);
+	lua_getfield(L, 2, "column_count_total");
+	int total_column_count = luaL_checkinteger(L, -1);
+	lua_pop(L, 2);
+	if (batch_row_count <= 0)
+		return luaL_error(L, "row_count_batch must be positive");
+	if (total_column_count < dataset.column_count)
+		return luaL_error(L, "column_count_total is too small");
+
+	/*
+	 * Check that the largest possible tuple fits in the buffer before
+	 * encoding anything: checking the size of an already encoded tuple
+	 * would detect the buffer overrun only after it has happened.
+	 */
+	static char tuple_data[1000 * 1000];
+	size_t uint_count = dataset.column_count;
+	size_t nil_count = total_column_count - uint_count;
+	size_t max_tuple_size = mp_sizeof_array(total_column_count) +
+				uint_count * mp_sizeof_uint(UINT64_MAX) +
+				nil_count * mp_sizeof_nil();
+	if (max_tuple_size > sizeof(tuple_data))
+		return luaL_error(L, "tuple does not fit in the buffer");
+
+	VERIFY(box_txn_begin() == 0);
+	for (int64_t i = 0; i < dataset.row_count; i++) {
+		char *data_end = tuple_data;
+		data_end = mp_encode_array(data_end, total_column_count);
+		int j;
+		for (j = 0; j < dataset.column_count; j++) {
+			uint64_t val = dataset.columns[j].data[i];
+			data_end = mp_encode_uint(data_end, val);
+		}
+		for (; j < total_column_count; j++)
+			data_end = mp_encode_nil(data_end);
+		if (box_insert(space_id, tuple_data, data_end, NULL) != 0)
+			return luaT_error(L);
+		/* Commit once row_count_batch rows have been inserted. */
+		if ((i + 1) % batch_row_count == 0) {
+			VERIFY(box_txn_commit() == 0);
+			VERIFY(box_txn_begin() == 0);
+		}
+	}
+	VERIFY(box_txn_commit() == 0);
+	return 0;
+}
+
+#if defined(ENABLE_BATCH_INSERT)
+/**
+ * Release callback of the schemas created by arrow_schema_init(): releases
+ * and frees the children, frees the children array and marks the schema
+ * as released.
+ */
+static void
+arrow_schema_destroy(struct ArrowSchema *schema)
+{
+	for (int i = 0; i < schema->n_children; i++) {
+		struct ArrowSchema *child = schema->children[i];
+		if (child != NULL) {
+			if (child->release != NULL)
+				child->release(child);
+			free(child);
+		}
+	}
+	free(schema->children);
+	schema->release = NULL;
+}
+
+/**
+ * Initializes a struct schema ("+s") that has one child per data set
+ * column. The names and format strings are borrowed from the data set.
+ */
+static void
+arrow_schema_init(struct ArrowSchema *schema)
+{
+	*schema = (struct ArrowSchema) {
+		.format = "+s",
+		.name = NULL,
+		.metadata = NULL,
+		.flags = 0,
+		.n_children = dataset.column_count,
+		.children = xmalloc(sizeof(struct ArrowSchema *) *
+				    dataset.column_count),
+		.dictionary = NULL,
+		.release = arrow_schema_destroy,
+		.private_data = NULL,
+	};
+	for (int i = 0; i < dataset.column_count; i++) {
+		schema->children[i] = xmalloc(sizeof(*schema->children[i]));
+		*schema->children[i] = (struct ArrowSchema) {
+			.format = dataset.columns[i].type,
+			.name = dataset.columns[i].name,
+			.metadata = NULL,
+			.flags = 0,
+			.n_children = 0,
+			.children = NULL,
+			.dictionary = NULL,
+			.release = arrow_schema_destroy,
+			.private_data = NULL,
+		};
+	}
+}
+
+/**
+ * Release callback of the arrays created by arrow_array_init(): releases
+ * and frees the children, frees the children and buffers arrays and marks
+ * the array as released. The memory the buffers point to is not freed.
+ */
+static void
+arrow_array_destroy(struct ArrowArray *array)
+{
+	for (int i = 0; i < array->n_children; i++) {
+		struct ArrowArray *child = array->children[i];
+		if (child != NULL) {
+			if (child->release != NULL)
+				child->release(child);
+			free(child);
+		}
+	}
+	free(array->children);
+	free(array->buffers);
+	array->release = NULL;
+}
+
+/**
+ * Initializes a struct array of row_count rows that has one child per
+ * data set column. All buffer pointers are zeroed: the caller is supposed
+ * to point buffer 1 of each child to the column values.
+ */
+static void
+arrow_array_init(struct ArrowArray *array, int row_count)
+{
+	*array = (struct ArrowArray) {
+		.length = row_count,
+		.null_count = 0,
+		.offset = 0,
+		.n_buffers = 1,
+		.n_children = dataset.column_count,
+		.buffers = xcalloc(1, sizeof(void *)),
+		.children = xmalloc(sizeof(struct ArrowArray *)
+				    * dataset.column_count),
+		.dictionary = NULL,
+		.release = arrow_array_destroy,
+		.private_data = NULL,
+	};
+	for (int i = 0; i < dataset.column_count; i++) {
+		array->children[i] = xmalloc(sizeof(*array->children[i]));
+		*array->children[i] = (struct ArrowArray) {
+			.length = row_count,
+			.null_count = 0,
+			.offset = 0,
+			.n_buffers = 2,
+			.n_children = 0,
+			.buffers = xcalloc(2, sizeof(void *)),
+			.children = NULL,
+			.dictionary = NULL,
+			.release = arrow_array_destroy,
+			.private_data = NULL,
+		};
+	}
+}
+
+/**
+ * Lua: insert_batch(space_id, params).
+ *
+ * Inserts the data set into the space using box_insert_arrow(), passing
+ * params.row_count_batch rows per call; the data set row count is expected
+ * to be a multiple of it. Raises a Lua error on an invalid batch size or
+ * a failed insert.
+ */
+static int
+insert_batch_lua_func(struct lua_State *L)
+{
+	uint32_t space_id = luaL_checkinteger(L, 1);
+	luaL_checktype(L, 2, LUA_TTABLE);
+	lua_getfield(L, 2, "row_count_batch");
+	int batch_row_count = luaL_checkinteger(L, -1);
+	lua_pop(L, 1);
+	if (batch_row_count <= 0)
+		return luaL_error(L, "row_count_batch must be positive");
+
+	struct ArrowSchema schema;
+	arrow_schema_init(&schema);
+	struct ArrowArray array;
+	arrow_array_init(&array, batch_row_count);
+
+	assert(dataset.row_count % batch_row_count == 0);
+	int rc = 0;
+	/* The row offset is 64-bit to not overflow on a large data set. */
+	for (int64_t row = 0; row + batch_row_count <= dataset.row_count;
+	     row += batch_row_count) {
+		for (int j = 0; j < dataset.column_count; j++) {
+			array.children[j]->buffers[1] =
+				&dataset.columns[j].data[row];
+		}
+		rc = box_insert_arrow(space_id, &array, &schema);
+		if (rc != 0)
+			break;
+	}
+	/*
+	 * Release the structures before raising the error, because
+	 * luaT_error() does not return.
+	 */
+	if (schema.release != NULL)
+		schema.release(&schema);
+	if (array.release != NULL)
+		array.release(&array);
+	if (rc != 0)
+		return luaT_error(L);
+	return 0;
+}
+#endif /* defined(ENABLE_BATCH_INSERT) */
+
+/**
+ * Lua: init(params).
+ *
+ * Generates the data set: params.column_count_batch columns, each having
+ * params.row_count_total values. The columns with an even index (counted
+ * from 0) ascend from 0, the others descend from the row count. A data set
+ * left from a previous call is freed first.
+ */
+static int
+init_lua_func(struct lua_State *L)
+{
+	say_info("Generating the test data set...");
+	luaL_checktype(L, 1, LUA_TTABLE);
+	lua_getfield(L, 1, "row_count_total");
+	int64_t row_count = luaL_checkinteger(L, -1);
+	lua_getfield(L, 1, "column_count_batch");
+	int64_t column_count = luaL_checkinteger(L, -1);
+	lua_pop(L, 2);
+	if (row_count <= 0 || column_count <= 0)
+		return luaL_error(L, "row and column counts must be positive");
+
+	dataset_free();
+	dataset.row_count = row_count;
+	dataset.column_count = column_count;
+	dataset.columns = xmalloc(column_count * sizeof(*dataset.columns));
+	/* aligned_alloc() requires the size to be a multiple of alignment. */
+	size_t data_size = row_count * sizeof(*dataset.columns->data);
+	data_size = (data_size + DATASET_ALIGNMENT - 1) / DATASET_ALIGNMENT *
+		    DATASET_ALIGNMENT;
+	for (int i = 0; i < column_count; i++) {
+		uint64_t *data = xaligned_alloc(data_size, DATASET_ALIGNMENT);
+		size_t name_size = 20;
+		char *name = xmalloc(name_size);
+		snprintf(name, name_size, "field_%d", i + 1);
+		dataset.columns[i].name = name;
+		dataset.columns[i].type = "L";
+		dataset.columns[i].data = data;
+		/* A 64-bit index: the row count may not fit in an int. */
+		for (int64_t j = 0; j < row_count; j++) {
+			if (i % 2 == 0)
+				data[j] = j;
+			else
+				data[j] = row_count - j;
+		}
+	}
+	return 0;
+}
+
+/** Lua: fini(). Frees the data set generated by init(). */
+static int
+fini_lua_func(struct lua_State *L)
+{
+	(void)L;
+	dataset_free();
+	return 0;
+}
+
+/**
+ * Module entry point: registers the functions in the table named
+ * "column_insert_module" and returns it. insert_batch is present only if
+ * the batch insertion is enabled in this build.
+ */
+LUA_API int
+luaopen_column_insert_module(struct lua_State *L)
+{
+	static const struct luaL_Reg lib[] = {
+		{"init", init_lua_func},
+		{"fini", fini_lua_func},
+		{"insert_serial", insert_serial_lua_func},
+#ifdef ENABLE_BATCH_INSERT
+		{"insert_batch", insert_batch_lua_func},
+#endif
+		{NULL, NULL},
+	};
+	luaL_register(L, "column_insert_module", lib);
+	return 1;
+}
diff --git a/src/trivia/util.h b/src/trivia/util.h
index 15fa9f16d2..195fd324eb 100644
--- a/src/trivia/util.h
+++ b/src/trivia/util.h
@@ -131,6 +131,8 @@ alloc_failure(const char *filename, int line, size_t size)
 #define xrealloc(ptr, size)	xalloc_impl((size), realloc, (ptr), (size))
 #define xstrdup(s)		xalloc_impl(strlen((s)) + 1, strdup, (s))
 #define xstrndup(s, n)		xalloc_impl((n) + 1, strndup, (s), (n))
+#define xaligned_alloc(size, align) \
+		xalloc_impl((size), aligned_alloc, (align), (size))
 #define xmempool_alloc(p)	xalloc_impl((p)->objsize, mempool_alloc, (p))
 #define xregion_alloc(p, size)	xalloc_impl((size), region_alloc, (p), (size))
 #define xregion_aligned_alloc(p, size, align) \
-- 
GitLab