Skip to content
Snippets Groups Projects
Commit e5c4bd63 authored by Ilya Verbin's avatar Ilya Verbin Committed by Vladimir Davydov
Browse files

perf: add column insert test

The test creates an empty space with 1000 nullable columns storing uint64
values. Then it initializes a datasets that consists of 10 columns and
1 million rows (row count and both column counts are configurable), then
it inserts the dataset into the space.

By default the test uses serial C API but one may switch to the Arrow API
for batch insertion (the feature is exclusive to the Enterprise Edition).

It's also possible to specify the engine and wal_mode to use (default are
memtx, write).

Needed for tarantool/tarantool-ee#712

NO_DOC=perf test
NO_TEST=perf test
NO_CHANGELOG=perf test
parent 8cd677da
No related branches found
No related tags found
No related merge requests found
## feature/lua
* Exposed `luaL_pushnull()` and `luaL_isnull()` functions via C module API.
* Exposed the `luaL_pushnull()` and `luaL_isnull()` functions via the C
module API.
enable_tnt_compile_flags()
set(TARANTOOL_BIN $<TARGET_FILE:tarantool>)
set(RUN_PERF_LUA_TESTS_LIST "")
......@@ -61,6 +63,12 @@ create_perf_lua_test(NAME column_scan
DEPENDS column_scan_module
)
build_module(column_insert_module column_insert_module.c)
target_link_libraries(column_insert_module msgpuck)
create_perf_lua_test(NAME column_insert
DEPENDS column_insert_module
)
add_custom_target(test-lua-perf
DEPENDS "${RUN_PERF_LUA_TESTS_LIST}"
COMMENT "Running Lua performance tests"
......
--
-- The test measures run time of batch insertion into the space columns.
--
-- Output format (console):
-- <test-case> <rows-per-second>
--
-- NOTE: The test requires a C module. Set the BUILDDIR environment variable to
-- the tarantool build directory if using out-of-source build.
--
local clock = require('clock')
local fiber = require('fiber')
local fio = require('fio')
local log = require('log')
local tarantool = require('tarantool')
local benchmark = require('benchmark')
local USAGE = [[
engine <string, 'memtx'> - space engine to use for the test
wal_mode <string, 'write'> - write-ahead log mode to use for the test
column_count_total <number, 1000> - number of columns in the test space
column_count_batch <number, 10> - number of columns in the record batch
row_count_total <number, 1000000> - number of inserted rows
row_count_batch <number, 1000> - number of rows per record batch
use_arrow_api <boolean, false> - use the Arrow API for batch insertion
]]
local params = benchmark.argparse(arg, {
{'engine', 'string'},
{'wal_mode', 'string'},
{'column_count_total', 'number'},
{'column_count_batch', 'number'},
{'row_count_total', 'number'},
{'row_count_batch', 'number'},
{'use_arrow_api', 'boolean'},
}, USAGE)
local DEFAULT_ENGINE = 'memtx'
local DEFAULT_WAL_MODE = 'write'
local DEFAULT_COLUMN_COUNT_TOTAL = 1000
local DEFAULT_COLUMN_COUNT_BATCH = 10
local DEFAULT_ROW_COUNT_TOTAL = 1000 * 1000
local DEFAULT_ROW_COUNT_BATCH = 1000
params.engine = params.engine or DEFAULT_ENGINE
params.wal_mode = params.wal_mode or DEFAULT_WAL_MODE
params.column_count_total = params.column_count_total or
DEFAULT_COLUMN_COUNT_TOTAL
params.column_count_batch = params.column_count_batch or
DEFAULT_COLUMN_COUNT_BATCH
params.row_count_total = params.row_count_total or DEFAULT_ROW_COUNT_TOTAL
params.row_count_batch = params.row_count_batch or DEFAULT_ROW_COUNT_BATCH
params.use_arrow_api = params.use_arrow_api or false
assert(params.column_count_batch <= params.column_count_total)
assert(params.column_count_batch < 1000 * 1000)
assert(params.row_count_batch <= params.row_count_total)
assert(params.row_count_total % params.row_count_batch == 0)
local bench = benchmark.new(params)
local BUILDDIR = fio.abspath(fio.pathjoin(os.getenv('BUILDDIR') or '.'))
local MODULEPATH = fio.pathjoin(BUILDDIR, 'perf', 'lua',
'?.' .. tarantool.build.mod_format)
package.cpath = MODULEPATH .. ';' .. package.cpath
local test_module_name = 'column_insert_module'
local has_test_module, test_module = pcall(require, test_module_name)
if not has_test_module then
local errmsg = ('Lua module "%s" is not found.\n'):format(test_module_name)
io.stderr:write(errmsg)
os.exit(1)
end
local test_funcs = {}
for _, func_name in ipairs({'insert'}) do
local full_func_name
if params.use_arrow_api then
full_func_name = func_name .. '_batch'
else
full_func_name = func_name .. '_serial'
end
local f = test_module[full_func_name]
if f == nil then
error('The specified test mode is not supported by this build')
end
test_funcs[func_name] = f
end
local test_dir = fio.tempdir()
local function rmtree(s)
if (fio.path.is_file(s) or fio.path.is_link(s)) then
fio.unlink(s)
return
end
if fio.path.is_dir(s) then
for _,i in pairs(fio.listdir(s)) do
rmtree(s..'/'..i)
end
fio.rmdir(s)
end
end
box.cfg({
log = 'tarantool.log',
work_dir = test_dir,
wal_mode = params.wal_mode,
memtx_memory = 4 * 1024 * 1024 * 1024,
checkpoint_count = 1,
})
box.once('init', function()
log.info('Creating the test space...')
local format = {}
for i = 1, params.column_count_total do
table.insert(format, {'field_' .. i, 'uint64', is_nullable = true})
end
format[1].is_nullable = false
local s = box.schema.space.create('test', {
engine = params.engine,
field_count = #format,
format = format,
})
s:create_index('pk')
end)
local function check_result(result, expected)
log.info('expected %s, got %s', expected, result)
assert(result == expected)
end
local TESTS = {
{
name = 'insert',
func = function()
test_funcs.insert(box.space.test.id, params)
check_result(box.space.test:count(), params.row_count_total)
end,
},
}
local function run_test(test)
local func = test.func
local real_time_start = clock.time()
local cpu_time_start = clock.proc()
func()
local delta_real = clock.time() - real_time_start
local delta_cpu = clock.proc() - cpu_time_start
bench:add_result(test.name, {
real_time = delta_real,
cpu_time = delta_cpu,
items = params.row_count_total,
})
end
fiber.set_max_slice(9000)
test_module.init(params)
for _, test in ipairs(TESTS) do
log.info('Running test %s...', test.name)
run_test(test)
end
bench:dump_results()
test_module.fini()
rmtree(test_dir)
os.exit(0)
#include <lua.h>
#include <lauxlib.h>
#include <module.h>
#include <msgpuck.h>
#include <stddef.h>
#include <stdint.h>
#include "trivia/config.h"
#include "trivia/util.h"
#include "arrow/abi.h"
#ifdef ENABLE_MEMCS_ENGINE
# define ENABLE_BATCH_INSERT 1
#endif
static struct {
int64_t row_count;
int64_t column_count;
struct {
char *name;
const char *type;
uint64_t *data;
} *columns;
} dataset;
static int
insert_serial_lua_func(struct lua_State *L)
{
uint32_t space_id = luaL_checkinteger(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);
lua_getfield(L, 2, "row_count_batch");
int batch_row_count = luaL_checkinteger(L, -1);
lua_getfield(L, 2, "column_count_total");
int total_column_count = luaL_checkinteger(L, -1);
lua_pop(L, 2);
static char tuple_data[1000 * 1000];
VERIFY(box_txn_begin() == 0);
for (int64_t i = 0; i < dataset.row_count; i++) {
char *data_end = tuple_data;
data_end = mp_encode_array(data_end, total_column_count);
int j;
for (j = 0; j < dataset.column_count; j++) {
uint64_t val = dataset.columns[j].data[i];
data_end = mp_encode_uint(data_end, val);
}
for (; j < total_column_count; j++)
data_end = mp_encode_nil(data_end);
size_t tuple_size = data_end - tuple_data;
if (tuple_size > sizeof(tuple_data))
abort();
if (box_insert(space_id, tuple_data, data_end, NULL) != 0)
return luaT_error(L);
if (i % batch_row_count == 0) {
VERIFY(box_txn_commit() == 0);
VERIFY(box_txn_begin() == 0);
}
}
VERIFY(box_txn_commit() == 0);
return 0;
}
#if defined(ENABLE_BATCH_INSERT)
static void
arrow_schema_destroy(struct ArrowSchema *schema)
{
for (int i = 0; i < schema->n_children; i++) {
struct ArrowSchema *child = schema->children[i];
if (child->release != NULL)
child->release(child);
free(child);
}
free(schema->children);
schema->release = NULL;
}
static void
arrow_schema_init(struct ArrowSchema *schema)
{
*schema = (struct ArrowSchema) {
.format = "+s",
.name = NULL,
.metadata = NULL,
.flags = 0,
.n_children = dataset.column_count,
.children = xmalloc(sizeof(struct ArrowSchema *) *
dataset.column_count),
.dictionary = NULL,
.release = arrow_schema_destroy,
.private_data = NULL,
};
for (int i = 0; i < dataset.column_count; i++) {
schema->children[i] = xmalloc(sizeof(*schema->children[i]));
*schema->children[i] = (struct ArrowSchema) {
.format = dataset.columns[i].type,
.name = dataset.columns[i].name,
.metadata = NULL,
.flags = 0,
.n_children = 0,
.children = NULL,
.dictionary = NULL,
.release = arrow_schema_destroy,
.private_data = NULL,
};
};
}
static void
arrow_array_destroy(struct ArrowArray *array)
{
for (int i = 0; i < array->n_children; i++) {
struct ArrowArray *child = array->children[i];
if (child != NULL) {
if (child->release != NULL)
child->release(child);
free(child);
}
}
free(array->children);
free(array->buffers);
array->release = NULL;
}
static void
arrow_array_init(struct ArrowArray *array, int row_count)
{
*array = (struct ArrowArray) {
.length = row_count,
.null_count = 0,
.offset = 0,
.n_buffers = 1,
.n_children = dataset.column_count,
.buffers = xcalloc(1, sizeof(void *)),
.children = xmalloc(sizeof(struct ArrowArray *)
* dataset.column_count),
.dictionary = NULL,
.release = arrow_array_destroy,
.private_data = NULL,
};
for (int i = 0; i < dataset.column_count; i++) {
array->children[i] = xmalloc(sizeof(*array->children[i]));
*array->children[i] = (struct ArrowArray) {
.length = row_count,
.null_count = 0,
.offset = 0,
.n_buffers = 2,
.n_children = 0,
.buffers = xcalloc(2, sizeof(void *)),
.children = NULL,
.dictionary = NULL,
.release = arrow_array_destroy,
.private_data = NULL,
};
};
}
static int
insert_batch_lua_func(struct lua_State *L)
{
uint32_t space_id = luaL_checkinteger(L, 1);
luaL_checktype(L, 2, LUA_TTABLE);
lua_getfield(L, 2, "row_count_batch");
int batch_row_count = luaL_checkinteger(L, -1);
lua_pop(L, 1);
struct ArrowSchema schema;
arrow_schema_init(&schema);
struct ArrowArray array;
arrow_array_init(&array, batch_row_count);
assert(dataset.row_count % batch_row_count == 0);
for (int i = 0; i < dataset.row_count / batch_row_count; i++) {
for (int j = 0; j < dataset.column_count; j++) {
array.children[j]->buffers[1] =
&dataset.columns[j].data[i * batch_row_count];
}
if (box_insert_arrow(space_id, &array, &schema) != 0)
return luaT_error(L);
}
schema.release(&schema);
array.release(&array);
return 0;
}
#endif /* defined(ENABLE_BATCH_INSERT) */
static int
init_lua_func(struct lua_State *L)
{
say_info("Generating the test data set...");
luaL_checktype(L, 1, LUA_TTABLE);
lua_getfield(L, 1, "row_count_total");
dataset.row_count = luaL_checkinteger(L, -1);
lua_getfield(L, 1, "column_count_batch");
dataset.column_count = luaL_checkinteger(L, -1);
lua_pop(L, 2);
dataset.columns = xmalloc(dataset.column_count *
sizeof(*dataset.columns));
size_t data_size = dataset.row_count * sizeof(*dataset.columns->data);
for (int i = 0; i < dataset.column_count; i++) {
uint64_t *data = xaligned_alloc(data_size, 64);
size_t name_size = 20;
char *name = xmalloc(name_size);
snprintf(name, name_size, "field_%d", i + 1);
dataset.columns[i].name = name;
dataset.columns[i].type = "L";
dataset.columns[i].data = data;
for (int j = 0; j < dataset.row_count; j++) {
if (i % 2 == 0)
data[j] = j;
else
data[j] = dataset.row_count - j;
}
}
return 0;
}
static int
fini_lua_func(struct lua_State *L)
{
(void)L;
for (int i = 0; i < dataset.column_count; i++) {
free(dataset.columns[i].name);
free(dataset.columns[i].data);
}
free(dataset.columns);
dataset.columns = NULL;
dataset.column_count = 0;
dataset.row_count = 0;
return 0;
}
LUA_API int
luaopen_column_insert_module(struct lua_State *L)
{
static const struct luaL_Reg lib[] = {
{"init", init_lua_func},
{"fini", fini_lua_func},
{"insert_serial", insert_serial_lua_func},
#ifdef ENABLE_BATCH_INSERT
{"insert_batch", insert_batch_lua_func},
#endif
{NULL, NULL},
};
luaL_register(L, "column_insert_module", lib);
return 1;
}
......@@ -131,6 +131,8 @@ alloc_failure(const char *filename, int line, size_t size)
#define xrealloc(ptr, size) xalloc_impl((size), realloc, (ptr), (size))
#define xstrdup(s) xalloc_impl(strlen((s)) + 1, strdup, (s))
#define xstrndup(s, n) xalloc_impl((n) + 1, strndup, (s), (n))
#define xaligned_alloc(size, align) \
xalloc_impl((size), aligned_alloc, (align), (size))
#define xmempool_alloc(p) xalloc_impl((p)->objsize, mempool_alloc, (p))
#define xregion_alloc(p, size) xalloc_impl((size), region_alloc, (p), (size))
#define xregion_aligned_alloc(p, size, align) \
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment