Skip to content
Snippets Groups Projects
Commit 0f686829 authored by Vladislav Shpilevoy's avatar Vladislav Shpilevoy Committed by Konstantin Osipov
Browse files

netbox: introduce fiber-async API

Now any netbox call blocks a caller-fiber until a result is read
from a socket, or time is out. To use it asynchronously it is
necessary to create a fiber per request. Sometimes it is
unwanted - for example if RPS is very high (for example, about
100k), and latency is about 1 second. Or when it is neccessary
to send multiple requests in parallel and then collect responses
(map-reduce).

The patch introduces a new option for all netbox requests:
is_async. With this option any called netbox method returns
immediately (but still yields for a moment) a 'future' object.

By a future object a user can check if the request is finalized,
get a result or error, wait for a timeout, discard a response.

Example of is_async usage:
future = conn:call(func, {params}, {..., is_async = true})
-- Do some work ...
if not future.is_ready() then
    result, err = future:wait_result(timeout)
end
-- Or:
result, error = future:result()

A future:result() and :wait_result() returns either an error or
a response in the same format, as the sync versions of the called
methods.

Part of #3107
parent 1766db6b
No related branches found
No related tags found
No related merge requests found
......@@ -215,12 +215,14 @@ local function create_transport(host, port, user, password, callback,
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
-- requests: requests currently 'in flight', keyed by a request id;
-- value refs are weak hence if a client dies unexpectedly,
-- GC cleans the mess. Client submits a request and waits on state_cond.
-- If the reponse arrives within the timeout, the worker wakes
-- client fiber explicitly. Otherwize, wait on state_cond completes and
-- the client reports E_TIMEOUT.
-- Async requests currently 'in flight', keyed by a request
-- id. Value refs are weak hence if a client dies
-- unexpectedly, GC cleans the mess.
-- Async request can not be timed out completely. Instead a
-- user must decide when he does not want to wait for
-- response anymore.
-- Sync requests are implemented as async call + immediate
-- wait for a result.
local requests = setmetatable({}, { __mode = 'v' })
local next_request_id = 1
......@@ -228,6 +230,81 @@ local function create_transport(host, port, user, password, callback,
local send_buf = buffer.ibuf(buffer.READAHEAD)
local recv_buf = buffer.ibuf(buffer.READAHEAD)
--
-- Async request metamethods.
--
local request_index = {}
--
-- When an async request is finalized (with ok or error - no
-- matter), its 'id' field is nullified by a response
-- dispatcher.
--
function request_index:is_ready()
return self.id == nil or worker_fiber == nil
end
--
-- When a request is finished, a result can be got from a
-- future object anytime.
-- @retval result, nil Success, the response is returned.
-- @retval nil, error Error occured.
--
function request_index:result()
if self.errno then
return nil, box.error.new({code = self.errno,
reason = self.response})
elseif not self.id then
return self.response
elseif not worker_fiber then
return nil, box.error.new(E_NO_CONNECTION)
else
return nil, box.error.new(box.error.PROC_LUA,
'Response is not ready')
end
end
--
-- Wait for a response or error max timeout seconds.
-- @param timeout Max seconds to wait.
-- @retval result, nil Success, the response is returned.
-- @retval nil, error Error occured.
--
function request_index:wait_result(timeout)
if timeout then
if type(timeout) ~= 'number' or timeout < 0 then
error('Usage: future:wait_result(timeout)')
end
else
timeout = TIMEOUT_INFINITY
end
if not self:is_ready() then
-- When a response is ready before timeout, the
-- waiting client is waked up prematurely.
while timeout > 0 and not self:is_ready() do
local ts = fiber.clock()
self.cond:wait(timeout)
timeout = timeout - (fiber.clock() - ts)
end
if not self:is_ready() then
return nil, box.error.new(E_TIMEOUT)
end
end
return self:result()
end
--
-- Make a connection forget about the response. When it will
-- be received, it will be ignored. It reduces size of
-- requests table speeding up other requests.
--
function request_index:discard()
if self.id then
requests[self.id] = nil
self.id = nil
self.errno = box.error.PROC_LUA
self.response = 'Response is discarded'
end
end
local request_mt = { __index = request_index }
-- STATE SWITCHING --
local function set_state(new_state, new_errno, new_error)
state = new_state
......@@ -237,8 +314,10 @@ local function create_transport(host, port, user, password, callback,
state_cond:broadcast()
if state == 'error' or state == 'error_reconnect' then
for _, request in pairs(requests) do
request.id = nil
request.errno = new_errno
request.response = new_error
request.cond:broadcast()
end
requests = {}
end
......@@ -316,12 +395,16 @@ local function create_transport(host, port, user, password, callback,
end
end
-- REQUEST/RESPONSE --
local function perform_request(timeout, buffer, method, schema_version, ...)
--
-- Send a request and do not wait for response.
-- @retval nil, error Error occured.
-- @retval not nil Future object.
--
local function perform_async_request(buffer, method, schema_version, ...)
if state ~= 'active' then
return last_errno or E_NO_CONNECTION, last_error
return nil, box.error.new({code = last_errno or E_NO_CONNECTION,
reason = last_error})
end
local deadline = fiber_clock() + (timeout or TIMEOUT_INFINITY)
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
if send_buf:size() == 0 then
......@@ -330,26 +413,31 @@ local function create_transport(host, port, user, password, callback,
local id = next_request_id
method_encoder[method](send_buf, id, schema_version, ...)
next_request_id = next_id(id)
local request = table_new(0, 6) -- reserve space for 6 keys
request.client = fiber_self()
-- Request has maximum 7 members:
-- method, schema_version, buffer, id, cond, errno,
-- response.
local request = setmetatable(table_new(0, 7), request_mt)
request.method = method
request.schema_version = schema_version
request.buffer = buffer
request.id = id
request.cond = fiber.cond()
requests[id] = request
repeat
local timeout = max(0, deadline - fiber_clock())
if not state_cond:wait(timeout) then
requests[id] = nil
return E_TIMEOUT, 'Timeout exceeded'
end
until requests[id] == nil -- i.e. completed (beware spurious wakeups)
return request.errno, request.response
return request
end
local function wakeup_client(client)
if client:status() ~= 'dead' then
client:wakeup()
--
-- Send a request and wait for response.
-- @retval nil, error Error occured.
-- @retval not nil Response object.
--
local function perform_request(timeout, buffer, method, schema_version, ...)
local request, err =
perform_async_request(buffer, method, schema_version, ...)
if not request then
return nil, err
end
return request:wait_result(timeout)
end
local function dispatch_response_iproto(hdr, body_rpos, body_end)
......@@ -359,6 +447,7 @@ local function create_transport(host, port, user, password, callback,
return
end
requests[id] = nil
request.id = nil
local status = hdr[IPROTO_STATUS_KEY]
local body, body_end_check
......@@ -368,7 +457,7 @@ local function create_transport(host, port, user, password, callback,
assert(body_end == body_end_check, "invalid xrow length")
request.errno = band(status, IPROTO_ERRNO_MASK)
request.response = body[IPROTO_ERROR_KEY]
wakeup_client(request.client)
request.cond:broadcast()
return
end
......@@ -379,7 +468,7 @@ local function create_transport(host, port, user, password, callback,
local wpos = buffer:alloc(body_len)
ffi.copy(wpos, body_rpos, body_len)
request.response = tonumber(body_len)
wakeup_client(request.client)
request.cond:broadcast()
return
end
......@@ -390,7 +479,7 @@ local function create_transport(host, port, user, password, callback,
request.response, request.errno =
method_decoder[request.method](body[IPROTO_DATA_KEY])
end
wakeup_client(request.client)
request.cond:broadcast()
end
local function new_request_id()
......@@ -498,9 +587,10 @@ local function create_transport(host, port, user, password, callback,
if request == nil then -- nobody is waiting for the response
return
end
request.id = nil
requests[rid] = nil
request.response = response
wakeup_client(request.client)
request.cond:broadcast()
return console_sm(next_id(rid))
end
end
......@@ -608,7 +698,8 @@ local function create_transport(host, port, user, password, callback,
stop = stop,
start = start,
wait_state = wait_state,
perform_request = perform_request
perform_request = perform_request,
perform_async_request = perform_async_request,
}
end
......@@ -865,8 +956,12 @@ function remote_methods:wait_connected(timeout)
end
function remote_methods:_request(method, opts, ...)
local this_fiber = fiber_self()
local transport = self._transport
local buffer = opts and opts.buffer
if opts and opts.is_async then
return transport.perform_async_request(buffer, method, 0, ...)
end
local this_fiber = fiber_self()
local perform_request = transport.perform_request
local wait_state = transport.wait_state
local deadline = nil
......@@ -878,7 +973,6 @@ function remote_methods:_request(method, opts, ...)
-- @deprecated since 1.7.4
deadline = self._deadlines[this_fiber]
end
local buffer = opts and opts.buffer
local err, res
repeat
local timeout = deadline and max(0, deadline - fiber_clock())
......@@ -886,15 +980,17 @@ function remote_methods:_request(method, opts, ...)
wait_state('active', timeout)
timeout = deadline and max(0, deadline - fiber_clock())
end
err, res = perform_request(timeout, buffer, method,
res, err = perform_request(timeout, buffer, method,
self.schema_version, ...)
if not err then
if err then
if err.code == E_WRONG_SCHEMA_VERSION then
err = nil
end
else
return res
elseif err == E_WRONG_SCHEMA_VERSION then
err = nil
end
until err
box.error({code = err, reason = res})
box.error(err)
end
function remote_methods:ping(opts)
......@@ -907,9 +1003,9 @@ function remote_methods:ping(opts)
timeout = deadline and max(0, deadline - fiber_clock())
or (opts and opts.timeout)
end
local err = self._transport.perform_request(timeout, nil, 'ping',
self.schema_version)
return not err or err == E_WRONG_SCHEMA_VERSION
local _, err = self._transport.perform_request(timeout, nil, 'ping',
self.schema_version)
return not err or err.code == E_WRONG_SCHEMA_VERSION
end
function remote_methods:reload_schema()
......@@ -929,7 +1025,7 @@ function remote_methods:call(func_name, args, opts)
check_call_args(args)
args = args or {}
local res = self:_request('call_17', opts, tostring(func_name), args)
if type(res) ~= 'table' then
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
......@@ -946,7 +1042,7 @@ function remote_methods:eval(code, args, opts)
check_eval_args(args)
args = args or {}
local res = self:_request('eval', opts, code, args)
if type(res) ~= 'table' then
if type(res) ~= 'table' or opts and opts.is_async then
return res
end
return unpack(res)
......@@ -1088,13 +1184,13 @@ function console_methods:eval(line, timeout)
end
if self.protocol == 'Binary' then
local loader = 'return require("console").eval(...)'
err, res = pr(timeout, nil, 'eval', nil, loader, {line})
res, err = pr(timeout, nil, 'eval', nil, loader, {line})
else
assert(self.protocol == 'Lua console')
err, res = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
res, err = pr(timeout, nil, 'inject', nil, line..'$EOF$\n')
end
if err then
box.error({code = err, reason = res})
box.error(err)
end
return res[1] or res
end
......
......@@ -2463,9 +2463,6 @@ box.internal.collation.drop('test')
space:drop()
---
...
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
---
...
c.state
---
- closed
......@@ -2473,3 +2470,527 @@ c.state
c = nil
---
...
--
-- gh-3107: fiber-async netbox.
--
cond = nil
---
...
function long_function(...) cond = fiber.cond() cond:wait() return ... end
---
...
function finalize_long() while not cond do fiber.sleep(0.01) end cond:signal() cond = nil end
---
...
s = box.schema.create_space('test')
---
...
pk = s:create_index('pk')
---
...
s:replace{1}
---
- [1]
...
s:replace{2}
---
- [2]
...
s:replace{3}
---
- [3]
...
s:replace{4}
---
- [4]
...
c = net:connect(box.cfg.listen)
---
...
--
-- Check long connections, multiple wait_result().
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
future:result()
---
- null
- Response is not ready
...
future:is_ready()
---
- false
...
future:wait_result(0.01) -- Must fail on timeout.
---
- null
- Timeout exceeded
...
finalize_long()
---
...
ret = future:wait_result(100)
---
...
future:is_ready()
---
- true
...
-- Any timeout is ok - response is received already.
future:wait_result(0)
---
- [1, 2, 3]
...
future:wait_result(0.01)
---
- [1, 2, 3]
...
ret
---
- [1, 2, 3]
...
_, err = pcall(future.wait_result, future, true)
---
...
err:find('Usage') ~= nil
---
- true
...
_, err = pcall(future.wait_result, future, '100')
---
...
err:find('Usage') ~= nil
---
- true
...
--
-- Check infinity timeout.
--
ret = nil
---
...
_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
---
...
finalize_long()
---
...
while not ret do fiber.sleep(0.01) end
---
...
ret
---
- [1, 2, 3]
...
future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
---
...
future:result()
---
- null
- Response is not ready
...
future:wait_result(0.01) -- Must fail on timeout.
---
- null
- Timeout exceeded
...
finalize_long()
---
...
future:wait_result(100)
---
- [1, 2, 3]
...
--
-- Ensure the request is garbage collected both if is not used and
-- if is.
--
gc_test = setmetatable({}, {__mode = 'v'})
---
...
gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
gc_test.future ~= nil
---
- true
...
collectgarbage()
---
- 0
...
gc_test
---
- []
...
finalize_long()
---
...
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
collectgarbage()
---
- 0
...
future ~= nil
---
- true
...
finalize_long()
---
...
future:wait_result(1000)
---
- [1, 2, 3]
...
collectgarbage()
---
- 0
...
future ~= nil
---
- true
...
gc_test.future = future
---
...
future = nil
---
...
collectgarbage()
---
- 0
...
gc_test
---
- []
...
--
-- Ensure a request can be finalized from non-caller fibers.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
ret = {}
---
...
count = 0
---
...
for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
---
...
future:wait_result(0.01) -- Must fail on timeout.
---
- null
- Timeout exceeded
...
finalize_long()
---
...
while count ~= 10 do fiber.sleep(0.1) end
---
...
ret
---
- - &0 [1, 2, 3]
- *0
- *0
- *0
- *0
- *0
- *0
- *0
- *0
- *0
...
--
-- Test space methods.
--
future = c.space.test:select({1}, {is_async = true})
---
...
ret = future:wait_result(100)
---
...
ret
---
- - [1]
...
type(ret[1])
---
- cdata
...
future = c.space.test:insert({5}, {is_async = true})
---
...
future:wait_result(100)
---
- [5]
...
s:get{5}
---
- [5]
...
future = c.space.test:replace({6}, {is_async = true})
---
...
future:wait_result(100)
---
- [6]
...
s:get{6}
---
- [6]
...
future = c.space.test:delete({6}, {is_async = true})
---
...
future:wait_result(100)
---
- [6]
...
s:get{6}
---
...
future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
---
...
future:wait_result(100)
---
- [5, 5]
...
s:get{5}
---
- [5, 5]
...
future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
---
...
future:wait_result(100)
---
- null
...
s:get{5}
---
- [5, 6]
...
future = c.space.test:get({5}, {is_async = true})
---
...
future:wait_result(100)
---
- [5, 6]
...
--
-- Test index methods.
--
future = c.space.test.index.pk:select({1}, {is_async = true})
---
...
future:wait_result(100)
---
- - [1]
...
future = c.space.test.index.pk:get({2}, {is_async = true})
---
...
future:wait_result(100)
---
- [2]
...
future = c.space.test.index.pk:min({}, {is_async = true})
---
...
future:wait_result(100)
---
- [1]
...
future = c.space.test.index.pk:max({}, {is_async = true})
---
...
future:wait_result(100)
---
- [5, 6]
...
future = c.space.test.index.pk:count({3}, {is_async = true})
---
...
future:wait_result(100)
---
- 1
...
future = c.space.test.index.pk:delete({3}, {is_async = true})
---
...
future:wait_result(100)
---
- [3]
...
s:get{3}
---
...
future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
---
...
future:wait_result(100)
---
- [4, 6]
...
s:get{4}
---
- [4, 6]
...
--
-- Test async errors.
--
future = c.space.test:insert({1}, {is_async = true})
---
...
future:wait_result()
---
- null
- Duplicate key exists in unique index 'pk' in space 'test'
...
future:result()
---
- null
- Duplicate key exists in unique index 'pk' in space 'test'
...
--
-- Test discard.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
future:discard()
---
...
finalize_long()
---
...
future:result()
---
- null
- Response is discarded
...
future:wait_result(100)
---
- null
- Response is discarded
...
--
-- Test closed connection.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
finalize_long()
---
...
future:wait_result(100)
---
- [1, 2, 3]
...
future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
c:close()
---
...
future2:wait_result(100)
---
- null
- Connection is not established
...
future2:result()
---
- null
- Connection is not established
...
future2:discard()
---
...
-- Already successful result must be available.
future:wait_result(100)
---
- [1, 2, 3]
...
future:result()
---
- [1, 2, 3]
...
future:is_ready()
---
- true
...
--
-- Test reconnect.
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
---
...
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
---
...
while not c:is_connected() do fiber.sleep(0.01) end
---
...
future:wait_result(100)
---
- null
- Peer closed
...
future:result()
---
- null
- Peer closed
...
future = c:call('long_function', {1, 2, 3}, {is_async = true})
---
...
finalize_long()
---
...
future:wait_result(100)
---
- [1, 2, 3]
...
--
-- Test raw response getting.
--
ibuf = require('buffer').ibuf()
---
...
future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
---
...
finalize_long()
---
...
future:wait_result(100)
---
- 10
...
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
---
...
result
---
- {48: [1, 2, 3]}
...
c:close()
---
...
s:drop()
---
...
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
---
...
......@@ -1004,8 +1004,193 @@ c.space.test.index.sk.parts
c:close()
box.internal.collation.drop('test')
space:drop()
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
c.state
c = nil
--
-- gh-3107: fiber-async netbox.
--
cond = nil
function long_function(...) cond = fiber.cond() cond:wait() return ... end
function finalize_long() while not cond do fiber.sleep(0.01) end cond:signal() cond = nil end
s = box.schema.create_space('test')
pk = s:create_index('pk')
s:replace{1}
s:replace{2}
s:replace{3}
s:replace{4}
c = net:connect(box.cfg.listen)
--
-- Check long connections, multiple wait_result().
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
future:result()
future:is_ready()
future:wait_result(0.01) -- Must fail on timeout.
finalize_long()
ret = future:wait_result(100)
future:is_ready()
-- Any timeout is ok - response is received already.
future:wait_result(0)
future:wait_result(0.01)
ret
_, err = pcall(future.wait_result, future, true)
err:find('Usage') ~= nil
_, err = pcall(future.wait_result, future, '100')
err:find('Usage') ~= nil
--
-- Check infinity timeout.
--
ret = nil
_ = fiber.create(function() ret = c:call('long_function', {1, 2, 3}, {is_async = true}):wait_result() end)
finalize_long()
while not ret do fiber.sleep(0.01) end
ret
future = c:eval('return long_function(...)', {1, 2, 3}, {is_async = true})
future:result()
future:wait_result(0.01) -- Must fail on timeout.
finalize_long()
future:wait_result(100)
--
-- Ensure the request is garbage collected both if is not used and
-- if is.
--
gc_test = setmetatable({}, {__mode = 'v'})
gc_test.future = c:call('long_function', {1, 2, 3}, {is_async = true})
gc_test.future ~= nil
collectgarbage()
gc_test
finalize_long()
future = c:call('long_function', {1, 2, 3}, {is_async = true})
collectgarbage()
future ~= nil
finalize_long()
future:wait_result(1000)
collectgarbage()
future ~= nil
gc_test.future = future
future = nil
collectgarbage()
gc_test
--
-- Ensure a request can be finalized from non-caller fibers.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
ret = {}
count = 0
for i = 1, 10 do fiber.create(function() ret[i] = future:wait_result(1000) count = count + 1 end) end
future:wait_result(0.01) -- Must fail on timeout.
finalize_long()
while count ~= 10 do fiber.sleep(0.1) end
ret
--
-- Test space methods.
--
future = c.space.test:select({1}, {is_async = true})
ret = future:wait_result(100)
ret
type(ret[1])
future = c.space.test:insert({5}, {is_async = true})
future:wait_result(100)
s:get{5}
future = c.space.test:replace({6}, {is_async = true})
future:wait_result(100)
s:get{6}
future = c.space.test:delete({6}, {is_async = true})
future:wait_result(100)
s:get{6}
future = c.space.test:update({5}, {{'=', 2, 5}}, {is_async = true})
future:wait_result(100)
s:get{5}
future = c.space.test:upsert({5}, {{'=', 2, 6}}, {is_async = true})
future:wait_result(100)
s:get{5}
future = c.space.test:get({5}, {is_async = true})
future:wait_result(100)
--
-- Test index methods.
--
future = c.space.test.index.pk:select({1}, {is_async = true})
future:wait_result(100)
future = c.space.test.index.pk:get({2}, {is_async = true})
future:wait_result(100)
future = c.space.test.index.pk:min({}, {is_async = true})
future:wait_result(100)
future = c.space.test.index.pk:max({}, {is_async = true})
future:wait_result(100)
future = c.space.test.index.pk:count({3}, {is_async = true})
future:wait_result(100)
future = c.space.test.index.pk:delete({3}, {is_async = true})
future:wait_result(100)
s:get{3}
future = c.space.test.index.pk:update({4}, {{'=', 2, 6}}, {is_async = true})
future:wait_result(100)
s:get{4}
--
-- Test async errors.
--
future = c.space.test:insert({1}, {is_async = true})
future:wait_result()
future:result()
--
-- Test discard.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
future:discard()
finalize_long()
future:result()
future:wait_result(100)
--
-- Test closed connection.
--
future = c:call('long_function', {1, 2, 3}, {is_async = true})
finalize_long()
future:wait_result(100)
future2 = c:call('long_function', {1, 2, 3}, {is_async = true})
c:close()
future2:wait_result(100)
future2:result()
future2:discard()
-- Already successful result must be available.
future:wait_result(100)
future:result()
future:is_ready()
--
-- Test reconnect.
--
c = net:connect(box.cfg.listen, {reconnect_after = 0.01})
future = c:call('long_function', {1, 2, 3}, {is_async = true})
_ = c._transport.perform_request(nil, nil, 'inject', nil, '\x80')
while not c:is_connected() do fiber.sleep(0.01) end
future:wait_result(100)
future:result()
future = c:call('long_function', {1, 2, 3}, {is_async = true})
finalize_long()
future:wait_result(100)
--
-- Test raw response getting.
--
ibuf = require('buffer').ibuf()
future = c:call('long_function', {1, 2, 3}, {is_async = true, buffer = ibuf})
finalize_long()
future:wait_result(100)
result, ibuf.rpos = msgpack.decode_unchecked(ibuf.rpos)
result
c:close()
s:drop()
box.schema.user.revoke('guest', 'read,write,execute', 'universe')
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment