Skip to content
Snippets Groups Projects
Commit bdc027a4 authored by Igor Munkin's avatar Igor Munkin Committed by Igor Munkin
Browse files

lua: rewrite crc32 digest via Lua C API

As a result of recording <crc32:update> method or <digest.crc32>
function wrong semantics is compiled (strictly saying, the resulting
trace produces the different result from the one yielded by
interpreter). The easiest solution is disabling JIT for particular
functions, however, such approach drops the overall platform
performance. Hence, the mentioned functions are rewritten line by line
via Lua C API to avoid JIT misbehaviour.

NO_DOC=no visible changes
NO_CHANGELOG=no visible changes

(cherry picked from commit 6b913198)
parent 045ca62f
No related branches found
No related tags found
No related merge requests found
......@@ -37,6 +37,7 @@
#include <lua.h>
#include <lauxlib.h>
#include "utils.h"
#include "crc32.h"
#define PBKDF2_MAX_DIGEST_SIZE 128
......@@ -90,6 +91,82 @@ lua_pbkdf2(lua_State *L)
return 1;
}
/* CRC32 internal {{{ */
int
crc32_methods_update(lua_State *L)
{
size_t strl;
/* Get <string>. */
const char *str = lua_tolstring(L, 2, &strl);
if (str == NULL)
luaL_error(L, "Usage crc32:update(string)");
/* Get <self.value>. */
lua_getfield(L, 1, "value");
uint32_t crc32_begin = lua_tointeger(L, -1);
uint32_t crc32_result = crc32_calc(crc32_begin, str,
(unsigned int)strl);
/* Push the result. */
lua_pushinteger(L, crc32_result);
/* Set <self.value>. */
lua_setfield(L, 1, "value");
return 0;
}
int
crc32___call(lua_State *L)
{
size_t strl;
/* Get <string>. */
const char *str = lua_tolstring(L, 2, &strl);
if (str == NULL)
luaL_error(L, "Usage digest.crc32(string)");
/* Get <CRC32> upvalue. */
lua_pushvalue(L, lua_upvalueindex(1));
/* Get <CRC32.crc_begin>. */
lua_getfield(L, -1, "crc_begin");
uint32_t crc32_begin = lua_tointeger(L, -1);
uint32_t crc32_result = crc32_calc(crc32_begin, str,
(unsigned int)strl);
/* Push the result. */
lua_pushinteger(L, crc32_result);
return 1;
}
int
crc32_internal_init(lua_State *L)
{
/* Create the table with internal methods */
lua_createtable(L, 0, 2);
/* Copy <CRC32> on the top of the stack. */
lua_pushvalue(L, 1);
/* Create function <__call>. */
lua_pushcclosure(L, crc32___call, 1);
/* Store it to the table with internal methods. */
lua_setfield(L, -2, "__call");
/* Create function <update>. */
lua_pushcfunction(L, crc32_methods_update);
/* Store it to the table with internal methods. */
lua_setfield(L, -2, "update");
/* Copy the table with internal methods on the top of the stack. */
lua_pushvalue(L, -1);
/* Get the <crc32.internal> field from <package.loaded>. */
lua_getfield(L, LUA_REGISTRYINDEX, "_LOADED");
/* Set the table with internal methods in <package.loaded>. */
lua_setfield(L, -2, "crc32.internal");
/* Return the table with internal methods */
return 1;
}
LUA_API int
luaopen_crc32_internal(lua_State *L)
{
lua_pushcfunction(L, crc32_internal_init);
return 1;
}
/* }}} */
void
tarantool_lua_digest_init(struct lua_State *L)
{
......@@ -97,6 +174,10 @@ tarantool_lua_digest_init(struct lua_State *L)
{"pbkdf2", lua_pbkdf2},
{NULL, NULL}
};
lua_getfield(L, LUA_REGISTRYINDEX, "_PRELOAD");
lua_pushcfunction(L, luaopen_crc32_internal);
lua_setfield(L, -2, "crc32.internal");
lua_pop(L, 1);
luaL_register_module(L, "digest", lua_digest_methods);
lua_pop(L, 1);
};
......@@ -103,13 +103,6 @@ setmetatable(PMurHash, {
local CRC32
local CRC32_methods = {
update = function(self, str)
if type(str) ~= 'string' then
error("Usage crc32:update(string)")
end
self.value = ffi.C.crc32_calc(self.value, str, string.len(str))
end,
result = function(self)
return self.value
end,
......@@ -135,13 +128,12 @@ CRC32 = {
end
}
local __crc32 = require('crc32.internal')(CRC32)
CRC32_methods.update = __crc32.update
setmetatable(CRC32, {
__call = function(self, str)
if type(str) ~= 'string' then
error("Usage digest.crc32(string)")
end
return ffi.C.crc32_calc(CRC32.crc_begin, str, string.len(str))
end
__call = __crc32.__call
})
local pbkdf2 = function(pass, salt, iters, digest_len)
......
local digest = require('digest')
local log = require('log')
local net_box = require('net.box')
local server = require('test.luatest_helpers.server')
local t = require('luatest')
local g = t.group()
local TRIES = 3
-- {{{ from crud
local crud = {}
local sharding = {}
local utils = {}
local schema = {}
local const = {}
const.RELOAD_RETRIES_NUM = 1
const.RELOAD_SCHEMA_TIMEOUT = 3 -- 3 seconds
const.FETCH_SHARDING_KEY_TIMEOUT = 3 -- 3 seconds
function utils.extract_key(tuple, key_parts)
local key = {}
for i, part in ipairs(key_parts) do
key[i] = tuple[part.fieldno]
end
return key
end
jit.off(utils.extract_key)
local function reload_schema(_)
return true
end
function schema.wrap_func_reload(func, ...)
local i = 0
local res, err, need_reload
while true do
res, err, need_reload = func(...)
if err == nil or not need_reload then
break
end
local replicasets = nil
local ok, reload_schema_err = reload_schema(replicasets)
if not ok then
log.warn("Failed to reload schema: %s", reload_schema_err)
break
end
i = i + 1
if i > const.RELOAD_RETRIES_NUM then
break
end
end
return res, err
end
function sharding.key_get_bucket_id(key, specified_bucket_id)
if specified_bucket_id ~= nil then
return specified_bucket_id
end
return rawget(_G, 'bucket_id_strcrc32_2')({}, key)
end
function sharding.tuple_get_bucket_id(tuple, _, specified_bucket_id)
if specified_bucket_id ~= nil then
return specified_bucket_id
end
local sharding_index_parts = {{fieldno = 1}}
local sharding_index_parts_new = {}
for i, x in ipairs(sharding_index_parts) do
sharding_index_parts_new[i] = {fieldno = x.fieldno}
end
sharding_index_parts = sharding_index_parts_new
local key = utils.extract_key(tuple, sharding_index_parts)
return sharding.key_get_bucket_id(key)
end
function sharding.tuple_set_and_return_bucket_id(tuple, space, specified_bucket_id)
local err
local bucket_id_fieldno = 4
if specified_bucket_id ~= nil then
if tuple[bucket_id_fieldno] == nil then
tuple[bucket_id_fieldno] = specified_bucket_id
else
if tuple[bucket_id_fieldno] ~= specified_bucket_id then
local err_t = "Tuple and opts.bucket_id contain different " ..
"bucket_id values: %s and %s"
return nil, {err = err_t:format(tuple[bucket_id_fieldno],
specified_bucket_id)}
end
end
end
local bucket_id = tuple[bucket_id_fieldno]
if bucket_id == nil then
bucket_id, err = sharding.tuple_get_bucket_id(tuple, space)
if err ~= nil then
return nil, err
end
tuple[bucket_id_fieldno] = bucket_id
end
return bucket_id
end
jit.off(sharding.tuple_set_and_return_bucket_id)
local function call_insert_on_router(_, tuple, opts)
opts = opts or {}
sharding.tuple_set_and_return_bucket_id(tuple, nil, opts.bucket_id)
rawget(_G, 'r')()
end
local function call_delete_on_router(_, key, opts)
opts = opts or {}
sharding.key_get_bucket_id(key, opts.bucket_id)
end
function crud.insert(space_name, tuple, opts)
return schema.wrap_func_reload(call_insert_on_router, space_name, tuple, opts)
end
jit.off(crud.insert)
function crud.delete(space_name, key, opts)
return schema.wrap_func_reload(call_delete_on_router, space_name, key, opts)
end
jit.off(crud.delete)
-- }}} from crud
-- {{{ from vshard
local function strcrc32(shard_key)
if type(shard_key) ~= 'table' then
return digest.crc32(tostring(shard_key))
else
local crc32 = digest.crc32.new()
for _, v in ipairs(shard_key) do
crc32:update(tostring(v))
end
return crc32:result()
end
end
local function bucket_id_strcrc32(_, key)
local total_bucket_count = 30000
return strcrc32(key) % total_bucket_count + 1
end
_G.bucket_id_strcrc32 = bucket_id_strcrc32
jit.off(bucket_id_strcrc32)
local function bucket_id_strcrc32_2(router, key)
local total_bucket_count = router.total_bucket_count or 30000
return strcrc32(key) % total_bucket_count + 1
end
_G.bucket_id_strcrc32_2 = bucket_id_strcrc32_2
jit.off(bucket_id_strcrc32_2)
-- }}} from vshard
g.before_all = function()
g.server = server:new({alias = 'master'})
g.server:start()
end
local conn
local function r()
pcall(function()
if conn == nil then
conn = net_box.connect(g.server.net_box_uri)
end
conn:reload_schema()
end)
end
_G.r = r
local function test_bucket(iterations)
require('jit').off()
require('jit').flush()
require('jit').on()
for _ = 1, iterations do
crud.insert('transfersScenarioContext', {'d', {a = 1}, 1689123123123})
crud.delete('transfersScenarioContext', {'d'})
end
return bucket_id_strcrc32(nil, {'db095e64-9972-400b-a65b-d44047fcb812', nil})
end
g.test_recording = function()
for _ = 1, TRIES do
t.assert_equals(test_bucket(100), 29526)
end
end
......@@ -249,7 +249,7 @@ digest.md5_hex()
...
digest.crc32()
---
- error: 'builtin/digest.lua:<line>"]: Usage digest.crc32(string)'
- error: Usage digest.crc32(string)
...
digest.crc32_update(4294967295, '')
---
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment