diff --git a/src/lua/swim.lua b/src/lua/swim.lua index c828faceb05e94cbd85b45b663f2c4571e4fd0c6..9cc98352f961921ca489fac71197f5bef4a39767 100644 --- a/src/lua/swim.lua +++ b/src/lua/swim.lua @@ -1,5 +1,7 @@ local ffi = require('ffi') local uuid = require('uuid') +local buffer = require('buffer') +local msgpack = require('msgpack') ffi.cdef[[ struct swim; @@ -100,6 +102,14 @@ ffi.cdef[[ local capi = ffi.C local swim_t = ffi.typeof('struct swim *') +local swim_member_t = ffi.typeof('struct swim_member *') + +local swim_member_status_strs = { + [capi.MEMBER_ALIVE] = 'alive', + [capi.MEMBER_SUSPECTED] = 'suspected', + [capi.MEMBER_DEAD] = 'dead', + [capi.MEMBER_LEFT] = 'left' +} -- -- Check if @a value is something that can be passed as a @@ -193,6 +203,50 @@ local function swim_check_uuid(value, func_name) return value end +-- +-- Check if @a value is something that can be passed as const +-- char *, as binary data. It should be either string, or cdata. +-- @param value Value to check if can be converted to +-- const char *. +-- @param size Size of @a value in bytes. In case of cdata the +-- argument is mandatory. In case of string @a size is +-- optional and can be <= @a value length. +-- @param func_name Caller function name to include into an error +-- message. +-- @param param_name Parameter name to include into an error +-- message. Examples: 'payload', 'key'. +-- @return Value compatible with const char *, and size in bytes. +-- +local function swim_check_const_char(value, size, func_name, param_name) + if size ~= nil and type(size) ~= 'number' then + return error(func_name..': expected number '..param_name..' size') + end + if type(value) == 'cdata' then + if not size then + return error(func_name..': size is mandatory for cdata '.. + param_name) + end + value = ffi.cast('const char *', value) + elseif type(value) == 'string' then + if not size then + size = value:len() + elseif size > value:len() then + return error(func_name..': explicit '..param_name.. + ' size > string length') + end + elseif value == nil then + if size then + return error(func_name..': size can not be set without '.. + param_name) + end + size = 0 + else + return error(func_name..': '..param_name..' should be either string '.. + 'or cdata') + end + return value, size +end + -- -- Check if @a s is a SWIM instance. It should be a table with -- cdata struct swim in 'ptr' attribute. Throws on invalid type. @@ -212,6 +266,142 @@ local function swim_check_instance(s, func_name) return error(func_name..': first argument is not a SWIM instance') end +-- +-- The same for SWIM member. +-- +local function swim_check_member(m, func_name) + if type(m) == 'table' then + local ptr = m.ptr + if ffi.istype(swim_member_t, ptr) then + return ptr + end + end + return error(func_name..': first argument is not a SWIM member') +end + +-- +-- Member methods. Most of them are one-liners, not much to +-- comment. +-- + +local function swim_member_status(m) + local ptr = swim_check_member(m, 'member:status()') + return swim_member_status_strs[tonumber(capi.swim_member_status(ptr))] +end + +local function swim_member_uri(m) + local ptr = swim_check_member(m, 'member:uri()') + return ffi.string(capi.swim_member_uri(ptr)) +end + +local function swim_member_incarnation(m) + local ptr = swim_check_member(m, 'member:incarnation()') + return capi.swim_member_incarnation(ptr) +end + +local function swim_member_is_dropped(m) + local ptr = swim_check_member(m, 'member:is_dropped()') + return capi.swim_member_is_dropped(ptr) +end + +local function swim_member_payload_raw(ptr) + local int = buffer.reg1.ai + local cdata = capi.swim_member_payload(ptr, int) + return cdata, int[0] +end + +-- +-- Payload can be bigger than KB, and probably it is undesirable +-- to copy it into a Lua string or decode MessagePack into a +-- Lua object. This method is the cheapest way of taking payload. +-- +local function swim_member_payload_cdata(m) + local ptr = swim_check_member(m, 'member:payload_cdata()') + return swim_member_payload_raw(ptr) +end + +-- +-- Cdata requires to keep explicit size, besides not all user +-- methods can be able to work with cdata. This is why it may be +-- needed to take payload as a string - text or binary. +-- +local function swim_member_payload_str(m) + local ptr = swim_check_member(m, 'member:payload_str()') + return ffi.string(swim_member_payload_raw(ptr)) +end + +-- +-- Since this is a Lua module, a user is likely to use Lua objects +-- as a payload - tables, numbers, string etc. And it is natural +-- to expect that member:payload() should return the same object +-- which was passed into swim:set_payload() on another instance. +-- This member method tries to interpret payload as MessagePack, +-- and if fails, returns the payload as a string. +-- +local function swim_member_payload(m) + local ptr = swim_check_member(m, 'member:payload()') + local cdata, size = swim_member_payload_raw(ptr) + if size == 0 then + return '' + end + local ok, res = pcall(msgpack.decode, cdata, size) + if not ok then + return ffi.string(cdata, size) + end + return res +end + +-- +-- Cdata UUID. It is ok to return cdata, because struct tt_uuid +-- type has strong support by 'uuid' Lua module with nice +-- metatable, serialization, string conversions etc. +-- +local function swim_member_uuid(m) + return capi.swim_member_uuid(swim_check_member(m, 'member:uuid()')) +end + +local function swim_member_serialize(m) + local _, size = swim_member_payload_raw(m.ptr) + return { + status = swim_member_status(m), + uuid = swim_member_uuid(m), + uri = swim_member_uri(m), + incarnation = swim_member_incarnation(m), + -- There are many ways to interpret a payload, and it is + -- not a job of a serialization method. Only binary size + -- here is returned to allow a user to detect, whether a + -- payload exists. + payload_size = size, + } +end + +local swim_member_mt = { + __index = { + status = swim_member_status, + uuid = swim_member_uuid, + uri = swim_member_uri, + incarnation = swim_member_incarnation, + payload_cdata = swim_member_payload_cdata, + payload_str = swim_member_payload_str, + payload = swim_member_payload, + is_dropped = swim_member_is_dropped, + }, + __serialize = swim_member_serialize, + __newindex = function(m) + return error('swim_member is a read-only object') + end +} + +-- +-- Wrap a SWIM member into a table with proper metamethods. Also +-- it is going to be used to cache a decoded payload. +-- +local function swim_member_wrap(ptr) + capi.swim_member_ref(ptr) + ffi.gc(ptr, capi.swim_member_unref) + return setmetatable({ptr = ptr}, swim_member_mt) +end + -- -- When a SWIM instance is deleted or has quited, it can't be used -- anymore. This function replaces all methods of a deleted @@ -339,6 +529,62 @@ local function swim_broadcast(s, port) return true end +-- +-- Shortcut to get the self member in O(1) not making a lookup +-- into the member table. +-- +local function swim_self(s) + return swim_member_wrap(capi.swim_self(swim_check_instance(s, 'swim:self'))) +end + +-- +-- Find a member by UUID in the local member table. +-- +local function swim_member_by_uuid(s, uuid) + local func_name = 'swim:member_by_uuid' + local ptr = swim_check_instance(s, func_name) + uuid = swim_check_uuid(uuid, func_name) + local m = capi.swim_member_by_uuid(ptr, uuid) + if m == nil then + return nil + end + return swim_member_wrap(m) +end + +-- +-- Set raw payload without any preprocessing nor encoding. It can +-- be anything, not necessary MessagePack. +-- +local function swim_set_payload_raw(s, payload, payload_size) + local func_name = 'swim:set_payload_raw' + local ptr = swim_check_instance(s, func_name) + payload, payload_size = + swim_check_const_char(payload, payload_size, func_name, 'payload') + if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then + return nil, box.error.last() + end + return true +end + +-- +-- Set Lua object as a payload. It is encoded into MessagePack. +-- +local function swim_set_payload(s, payload) + local func_name = 'swim:set_payload' + local ptr = swim_check_instance(s, func_name) + local payload_size = 0 + if payload ~= nil then + local buf = buffer.IBUF_SHARED + buf:reset() + payload_size = msgpack.encode(payload, buf) + payload = buf.rpos + end + if capi.swim_set_payload(ptr, payload, payload_size) ~= 0 then + return nil, box.error.last() + end + return true +end + -- -- Normal metatable of a configured SWIM instance. -- @@ -352,6 +598,10 @@ local swim_mt = { add_member = swim_add_member, remove_member = swim_remove_member, broadcast = swim_broadcast, + self = swim_self, + member_by_uuid = swim_member_by_uuid, + set_payload_raw = swim_set_payload_raw, + set_payload = swim_set_payload, }, __serialize = swim_serialize } diff --git a/test/swim/swim.result b/test/swim/swim.result index eb234115556692fd1d8ca16c48b4bddffd502b85..5caab9f11e3687f74566a3a666b1391b6378413e 100644 --- a/test/swim/swim.result +++ b/test/swim/swim.result @@ -5,6 +5,16 @@ test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '") --- - true ... +test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'") +--- +- true +... +msgpack = require('msgpack') +--- +... +ffi = require('ffi') +--- +... -- -- gh-3234: SWIM gossip protocol. -- @@ -104,7 +114,7 @@ s:size() s.cfg --- - uuid: 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1:0 + uri: 127.0.0.1:<port> ... s.cfg.gc_mode = 'off' --- @@ -119,7 +129,7 @@ s.cfg --- - gc_mode: off uuid: 00000000-0000-1000-8000-000000000001 - uri: 127.0.0.1:0 + uri: 127.0.0.1:<port> ... s.cfg.gc_mode --- @@ -299,6 +309,18 @@ s1:cfg({uuid = uuid(3)}) --- - true ... +s1:self():uuid() +--- +- 00000000-0000-1000-8000-000000000003 +... +s1:member_by_uuid(uuid(1)) +--- +- uri: 127.0.0.1:<port> + status: left + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 0 +... -- Can't remove self. s1:remove_member(uuid(3)) --- @@ -366,6 +388,365 @@ s1:delete() s2:delete() --- ... +-- +-- Member API. +-- +s1 = swim.new({uuid = uuid(1), uri = uri()}) +--- +... +s = s1:self() +--- +... +s +--- +- uri: 127.0.0.1:<port> + status: alive + incarnation: 1 + uuid: 00000000-0000-1000-8000-000000000001 + payload_size: 0 +... +s:status() +--- +- alive +... +s:uuid() +--- +- 00000000-0000-1000-8000-000000000001 +... +s:uri() +--- +- 127.0.0.1:<port> +... +s:incarnation() +--- +- 1 +... +s:payload_cdata() +--- +- 'cdata<const char *>: NULL' +- 0 +... +s:payload_str() +--- +- +... +s:payload() +--- +- +... +s:is_dropped() +--- +- false +... +s.unknown_index +--- +- null +... +s.status() +--- +- error: 'builtin/swim.lua:<line>: member:status(): first argument is not a SWIM member' +... +s.uuid() +--- +- error: 'builtin/swim.lua:<line>: member:uuid(): first argument is not a SWIM member' +... +s.uri() +--- +- error: 'builtin/swim.lua:<line>: member:uri(): first argument is not a SWIM member' +... +s.incarnation() +--- +- error: 'builtin/swim.lua:<line>: member:incarnation(): first argument is not a SWIM + member' +... +s.payload_cdata() +--- +- error: 'builtin/swim.lua:<line>: member:payload_cdata(): first argument is not a SWIM + member' +... +s.payload_str() +--- +- error: 'builtin/swim.lua:<line>: member:payload_str(): first argument is not a SWIM + member' +... +s.payload() +--- +- error: 'builtin/swim.lua:<line>: member:payload(): first argument is not a SWIM member' +... +s.is_dropped() +--- +- error: 'builtin/swim.lua:<line>: member:is_dropped(): first argument is not a SWIM + member' +... +s1:member_by_uuid(uuid(1)) ~= nil +--- +- true +... +s1:member_by_uuid(50) +--- +- error: 'builtin/swim.lua:<line>: swim:member_by_uuid: expected string UUID' +... +s1:member_by_uuid(uuid(2)) +--- +- null +... +s1:quit() +--- +... +s:status() +--- +- left +... +s:is_dropped() +--- +- true +... +-- +-- Payload. +-- +s = swim.new({uuid = uuid(1), uri = uri()}) +--- +... +s.set_payload() +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload: first argument is not a SWIM instance' +... +s.set_payload_raw() +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: first argument is not a SWIM + instance' +... +self = s:self() +--- +... +s:set_payload() +--- +- true +... +self:payload() +--- +- +... +s:set_payload({a = 100}) +--- +- true +... +self:payload() +--- +- {'a': 100} +... +s:set_payload(100) +--- +- true +... +self:payload() +--- +- 100 +... +s:set_payload(false) +--- +- true +... +self:payload() +--- +- false +... +p = self:payload_str() +--- +... +p +--- +- !!binary wg== +... +(msgpack.decode(p)) +--- +- false +... +p, size = self:payload_cdata() +--- +... +type(p) +--- +- cdata +... +size +--- +- 1 +... +(msgpack.decode(p, size)) +--- +- false +... +s:set_payload(string.rep('a', 1500)) +--- +- null +- Payload should be <= 1200 and >= 0 +... +self:payload() +--- +- false +... +s:set_payload() +--- +- true +... +self:payload() +--- +- +... +s:set_payload(100) +--- +- true +... +self:payload() +--- +- 100 +... +s:set_payload_raw() +--- +- true +... +self:payload() +--- +- +... +-- Raw payload setting can be used when MessagePack is not needed, +-- or already encoded. +s:set_payload_raw(nil, '123') +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: expected number payload size' +... +s:set_payload_raw('123', -1) +--- +- null +- Payload should be <= 1200 and >= 0 +... +size = 10 +--- +... +cdata = ffi.new('int[?]', size) +--- +... +for i = 0, size - 1 do cdata[i] = i end +--- +... +bsize = ffi.sizeof('int') * size +--- +... +s:set_payload_raw(cdata) +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: size is mandatory for cdata + payload' +... +s:set_payload_raw('str', 4) +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: explicit payload size > string + length' +... +s:set_payload_raw(true) +--- +- error: 'builtin/swim.lua:<line>: swim:set_payload_raw: payload should be either string + or cdata' +... +s:set_payload_raw(cdata, bsize) +--- +- true +... +self:payload_str():len() == bsize +--- +- true +... +self_cdata, self_bsize = self:payload_cdata() +--- +... +self_bsize == bsize +--- +- true +... +self_cdata = ffi.cast('int *', self_cdata) +--- +... +for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end +--- +... +s:set_payload_raw('raw str') +--- +- true +... +self:payload_str() +--- +- raw str +... +s:set_payload_raw('raw str', 3) +--- +- true +... +self:payload_str() +--- +- raw +... +s:delete() +--- +... +self:is_dropped() +--- +- true +... +-- +-- Check payload dissemination. +-- +s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}) +--- +... +s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01}) +--- +... +s1:add_member({uuid = uuid(2), uri = listen_port}) +--- +- true +... +while s2:size() ~= 2 do fiber.sleep(0.01) end +--- +... +s1_view = s2:member_by_uuid(uuid(1)) +--- +... +s1_view:payload() +--- +- +... +s1_view:incarnation() +--- +- 1 +... +s1:set_payload('payload') +--- +- true +... +while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end +--- +... +s1_view:incarnation() +--- +- 2 +... +s1:set_payload('payload2') +--- +- true +... +while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end +--- +... +s1_view:incarnation() +--- +- 3 +... +s1:delete() +--- +... +s2:delete() +--- +... test_run:cmd("clear filter") --- - true diff --git a/test/swim/swim.test.lua b/test/swim/swim.test.lua index d3667376e4f99e66471240b69f6ba050ec9aab97..befd709167de2eac61daa015c6c70ac750e94364 100644 --- a/test/swim/swim.test.lua +++ b/test/swim/swim.test.lua @@ -1,5 +1,8 @@ test_run = require('test_run').new() test_run:cmd("push filter '\\.lua.*:[0-9]+: ' to '.lua:<line>: '") +test_run:cmd("push filter '127.0.0.1:[0-9]+$' to '127.0.0.1:<port>'") +msgpack = require('msgpack') +ffi = require('ffi') -- -- gh-3234: SWIM gossip protocol. -- @@ -101,6 +104,8 @@ s1:add_member({uri = listen_uri, uuid = uuid(2)}) s1:size() s1:cfg({uuid = uuid(3)}) +s1:self():uuid() +s1:member_by_uuid(uuid(1)) -- Can't remove self. s1:remove_member(uuid(3)) -- Not existing. @@ -126,4 +131,123 @@ s2:size() s1:delete() s2:delete() +-- +-- Member API. +-- +s1 = swim.new({uuid = uuid(1), uri = uri()}) +s = s1:self() +s +s:status() +s:uuid() +s:uri() +s:incarnation() +s:payload_cdata() +s:payload_str() +s:payload() +s:is_dropped() +s.unknown_index + +s.status() +s.uuid() +s.uri() +s.incarnation() +s.payload_cdata() +s.payload_str() +s.payload() +s.is_dropped() + +s1:member_by_uuid(uuid(1)) ~= nil +s1:member_by_uuid(50) +s1:member_by_uuid(uuid(2)) + +s1:quit() +s:status() +s:is_dropped() + +-- +-- Payload. +-- +s = swim.new({uuid = uuid(1), uri = uri()}) +s.set_payload() +s.set_payload_raw() + +self = s:self() +s:set_payload() +self:payload() +s:set_payload({a = 100}) +self:payload() +s:set_payload(100) +self:payload() +s:set_payload(false) +self:payload() + +p = self:payload_str() +p +(msgpack.decode(p)) + +p, size = self:payload_cdata() +type(p) +size +(msgpack.decode(p, size)) + +s:set_payload(string.rep('a', 1500)) +self:payload() +s:set_payload() +self:payload() + +s:set_payload(100) +self:payload() +s:set_payload_raw() +self:payload() + +-- Raw payload setting can be used when MessagePack is not needed, +-- or already encoded. +s:set_payload_raw(nil, '123') +s:set_payload_raw('123', -1) + +size = 10 +cdata = ffi.new('int[?]', size) +for i = 0, size - 1 do cdata[i] = i end +bsize = ffi.sizeof('int') * size +s:set_payload_raw(cdata) +s:set_payload_raw('str', 4) +s:set_payload_raw(true) + +s:set_payload_raw(cdata, bsize) +self:payload_str():len() == bsize +self_cdata, self_bsize = self:payload_cdata() +self_bsize == bsize +self_cdata = ffi.cast('int *', self_cdata) +for i = 0, size - 1 do assert(self_cdata[i] == cdata[i]) end + +s:set_payload_raw('raw str') +self:payload_str() +s:set_payload_raw('raw str', 3) +self:payload_str() + +s:delete() +self:is_dropped() + +-- +-- Check payload dissemination. +-- +s1 = swim.new({uuid = uuid(1), uri = uri(), heartbeat_rate = 0.01}) +s2 = swim.new({uuid = uuid(2), uri = listen_port, heartbeat_rate = 0.01}) +s1:add_member({uuid = uuid(2), uri = listen_port}) +while s2:size() ~= 2 do fiber.sleep(0.01) end +s1_view = s2:member_by_uuid(uuid(1)) +s1_view:payload() +s1_view:incarnation() + +s1:set_payload('payload') +while s1_view:payload() ~= 'payload' do fiber.sleep(0.01) end +s1_view:incarnation() + +s1:set_payload('payload2') +while s1_view:payload() ~= 'payload2' do fiber.sleep(0.01) end +s1_view:incarnation() + +s1:delete() +s2:delete() + test_run:cmd("clear filter")