Skip to content
Snippets Groups Projects
Commit 8ae88a3f authored by Vladislav Shpilevoy's avatar Vladislav Shpilevoy
Browse files

swim: cache decoded payload in the Lua module

Users of Lua SWIM module likely will use Lua objects as a
payload. Lua objects are serialized into MessagePack
automatically, and deserialized back on other instances. But
deserialization of 1.2Kb payload on each member:payload()
invocation is quite heavy operation. This commit caches decoded
payloads to return them again until change.

A microbenchmark showed, that cached payload is returned ~100
times faster, than it is decoded each time. Even though a tested
payload was quite small and simple:

    s:set_payload({a = 100, b = 200})

Even this payload is returned 100 times faster, and does not
affect GC.

Part of #3234
parent 70e99323
No related branches found
No related tags found
No related merge requests found
......@@ -113,6 +113,7 @@ swim_member_payload
swim_member_ref
swim_member_unref
swim_member_is_dropped
swim_member_is_payload_up_to_date
# Module API
......
......@@ -2080,3 +2080,9 @@ swim_member_payload(const struct swim_member *member, int *size)
*size = member->payload_size;
return member->payload;
}
bool
swim_member_is_payload_up_to_date(const struct swim_member *member)
{
return member->is_payload_up_to_date;
}
......@@ -220,6 +220,18 @@ swim_member_incarnation(const struct swim_member *member);
const char *
swim_member_payload(const struct swim_member *member, int *size);
/**
* Check if member's payload is up to its incarnation. Sometimes
* it happens, that a member has changed payload, but other
* members learned only a new incarnation without the new payload.
* Then the payload is considered outdated, and is updated
* eventually later. The method is rather internal, and should not
* be used by any public API. It is exposed to implement decoded
* payload cache in the SWIM Lua module.
*/
bool
swim_member_is_payload_up_to_date(const struct swim_member *member);
/**
* Reference a member. The member memory will be valid until unref
* is called.
......
......@@ -96,6 +96,9 @@ ffi.cdef[[
bool
swim_member_is_dropped(const struct swim_member *member);
bool
swim_member_is_payload_up_to_date(const struct swim_member *member);
]]
-- Shortcut to avoid unnecessary lookups in 'ffi' table.
......@@ -341,17 +344,36 @@ end
-- This member method tries to interpret payload as MessagePack,
-- and if fails, returns the payload as a string.
--
-- This function caches its result. It means, that only first call
-- actually decodes cdata payload. All the next calls return
-- pointer to the same result, until payload is changed with a new
-- incarnation.
--
local function swim_member_payload(m)
local ptr = swim_check_member(m, 'member:payload()')
-- Two keys are needed. Incarnation is not enough, because a
-- new incarnation can be disseminated earlier than a new
-- payload. For example, via ACK messages.
local key1 = capi.swim_member_incarnation(ptr)
local key2 = capi.swim_member_is_payload_up_to_date(ptr)
if key1 == m.p_key1 and key2 == m.p_key2 then
return m.p
end
local cdata, size = swim_member_payload_raw(ptr)
local ok, result
if size == 0 then
return ''
end
local ok, res = pcall(msgpack.decode, cdata, size)
if not ok then
return ffi.string(cdata, size)
result = ''
else
ok, result = pcall(msgpack.decode, cdata, size)
if not ok then
result = ffi.string(cdata, size)
end
end
return res
-- Member bans new indexes. Only rawset() can be used.
rawset(m, 'p', result)
rawset(m, 'p_key1', key1)
rawset(m, 'p_key2', key2)
return result
end
--
......@@ -396,8 +418,9 @@ local swim_member_mt = {
}
--
-- Wrap a SWIM member into a table with proper metamethods. Also
-- it is going to be used to cache a decoded payload.
-- Wrap a SWIM member into a table with proper metamethods. The
-- table-wrapper stores not only a pointer, but also cached
-- decoded payload.
--
local function swim_member_wrap(ptr)
capi.swim_member_ref(ptr)
......
......@@ -836,6 +836,119 @@ iterate()
s:delete()
---
...
--
-- Payload caching.
--
s1 = swim.new({uuid = uuid(1), uri = uri(listen_port), heartbeat_rate = 0.01})
---
...
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
---
...
s1_self = s1:self()
---
...
s1:add_member({uuid = s2:self():uuid(), uri = s2:self():uri()})
---
- true
...
s2:add_member({uuid = s1_self:uuid(), uri = s1_self:uri()})
---
- true
...
s1:size()
---
- 2
...
s2:size()
---
- 2
...
s1_view = s2:member_by_uuid(s1_self:uuid())
---
...
s1:set_payload({a = 100})
---
- true
...
p = s1_self:payload()
---
...
s1_self:payload() == p
---
- true
...
while s1_view:payload() == '' do fiber.sleep(0.01) end
---
...
p = s1_view:payload()
---
...
s1_view:payload() == p
---
- true
...
-- Now a complex case. It is possible, that a new member's
-- incarnation is learned, but new payload is not. Payload cache
-- should correctly process that.
s1:cfg({heartbeat_rate = 1000})
---
- true
...
s2:cfg({heartbeat_rate = 1000})
---
- true
...
s1:set_payload({a = 200})
---
- true
...
-- Via probe() S2 learns new incarnation of S1, but without new
-- payload.
s2:probe_member(s1_self:uri())
---
- true
...
s1_view:payload()
---
- {'a': 100}
...
s1_view:incarnation()
---
- 2
...
s1:cfg({heartbeat_rate = 0.01})
---
- true
...
s2:cfg({heartbeat_rate = 0.01})
---
- true
...
while s1_view:payload().a ~= 200 do fiber.sleep(0.01) end
---
...
p = s1_view:payload()
---
...
s1_view:payload() == p
---
- true
...
p
---
- {'a': 200}
...
s1_view:incarnation()
---
- 2
...
s1:delete()
---
...
s2:delete()
---
...
test_run:cmd("clear filter")
---
- true
......
......@@ -268,4 +268,49 @@ s:add_member({uuid = uuid(3), uri = uri()})
iterate()
s:delete()
--
-- Payload caching.
--
s1 = swim.new({uuid = uuid(1), uri = uri(listen_port), heartbeat_rate = 0.01})
s2 = swim.new({uuid = uuid(2), uri = uri(), heartbeat_rate = 0.01})
s1_self = s1:self()
s1:add_member({uuid = s2:self():uuid(), uri = s2:self():uri()})
s2:add_member({uuid = s1_self:uuid(), uri = s1_self:uri()})
s1:size()
s2:size()
s1_view = s2:member_by_uuid(s1_self:uuid())
s1:set_payload({a = 100})
p = s1_self:payload()
s1_self:payload() == p
while s1_view:payload() == '' do fiber.sleep(0.01) end
p = s1_view:payload()
s1_view:payload() == p
-- Now a complex case. It is possible, that a new member's
-- incarnation is learned, but new payload is not. Payload cache
-- should correctly process that.
s1:cfg({heartbeat_rate = 1000})
s2:cfg({heartbeat_rate = 1000})
s1:set_payload({a = 200})
-- Via probe() S2 learns new incarnation of S1, but without new
-- payload.
s2:probe_member(s1_self:uri())
s1_view:payload()
s1_view:incarnation()
s1:cfg({heartbeat_rate = 0.01})
s2:cfg({heartbeat_rate = 0.01})
while s1_view:payload().a ~= 200 do fiber.sleep(0.01) end
p = s1_view:payload()
s1_view:payload() == p
p
s1_view:incarnation()
s1:delete()
s2:delete()
test_run:cmd("clear filter")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment