Skip to content
Snippets Groups Projects
Commit ef1bd18c authored by Konstantin Osipov's avatar Konstantin Osipov
Browse files

net.box: add comments, update style

Rename connection option 'legacy_call' to 'call_16'.
If you live long enough you know that even shiniest and brightiest
call's time may come.
parent 3b81e959
No related branches found
No related tags found
No related merge requests found
......@@ -450,10 +450,19 @@ netbox_decode_greeting(lua_State *L)
return 1;
}
/*
/**
* communicate(fd, send_buf, recv_buf, limit_or_boundary, timeout)
* -> errno, error
* -> nil, limit/boundary_pos
*
* The need for this function arises from not wanting to
* have more than one watcher for a single fd, and thus issue
* redundant epoll_ctl(EPOLLCTL_ADD) for it when doing both
* reading and writing.
*
* Instead, this function takes an fd, input and output buffer,
* and does sending and receiving on it in a single event loop
* interaction.
*/
static int
netbox_communicate(lua_State *L)
......
......@@ -66,7 +66,7 @@ local method_codec = {
local limit = tonumber(opts and opts.limit) or 0xFFFFFFFF
local offset = tonumber(opts and opts.offset) or 0
local key_is_nil = (key == nil or
(type(key) == 'table' and #key == 0))
encode_select(buf, id, schema_id, spaceno, indexno,
......@@ -115,7 +115,14 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
-- (any_state, but [error]) -> [closed]
--
--
-- Events / queries delivered via a callback:
-- State change events can be delivered to the transport user via
-- the optional 'callback' argument:
--
-- The callback functions needs to have the following signature:
--
-- callback(event_name, ...)
--
-- The following events are delivered, with arguments:
--
-- 'state_changed', state, errno, error
-- 'handshake', greeting -> nil (accept) / errno, error (reject)
......@@ -123,7 +130,8 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
-- 'did_fetch_schema', schema_id, spaces, indices
-- 'will_reconnect', errno, error -> true (approve) / false (reject)
--
-- Suggestion: sleep a few secs before approving reconnect.
-- Suggestion for callback writers: sleep a few secs before approving
-- reconnect.
--
local function create_transport(host, port, user, password, callback)
-- check / normalize credentials
......@@ -138,7 +146,7 @@ local function create_transport(host, port, user, password, callback)
local last_errno
local last_error
local state_cond = fiber.cond() -- signaled when the state changes
-- requests: requests currently 'in flight', keyed by a request id;
-- value refs are weak hence if a client dies unexpectedly,
-- GC cleans the mess. Client submits a request and waits on state_cond.
......@@ -146,7 +154,7 @@ local function create_transport(host, port, user, password, callback)
-- client fiber explicitly. Otherwize, wait on state_cond completes and
-- the client reports E_TIMEOUT.
local requests = setmetatable({}, { __mode = 'v' })
local requests_next_id = 1
local next_request_id = 1
local worker_fiber
local connection
......@@ -161,7 +169,7 @@ local function create_transport(host, port, user, password, callback)
callback('state_changed', new_state, new_errno, new_error)
state_cond:broadcast()
if state ~= 'active' then
-- cancel all request but the ones bearing the particular
-- cancel all requests but the ones bearing the particular
-- schema id; if schema id was omitted or we aren't fetching
-- schema, cancel everything
if not schema_id or state ~= 'fetch_schema' then
......@@ -202,7 +210,10 @@ local function create_transport(host, port, user, password, callback)
if not (ok or is_final_state[state]) then
set_state('error', E_UNKNOWN, err)
end
if connection then connection:close(); connection = nil end
if connection then
connection:close()
connection = nil
end
send_buf:recycle()
recv_buf:recycle()
worker_fiber = nil
......@@ -214,7 +225,10 @@ local function create_transport(host, port, user, password, callback)
if not is_final_state[state] then
set_state('closed', E_NO_CONNECTION, 'Connection closed')
end
if worker_fiber then worker_fiber:cancel(); worker_fiber = nil end
if worker_fiber then
worker_fiber:cancel()
worker_fiber = nil
end
end
-- REQUEST/RESPONSE --
......@@ -225,10 +239,12 @@ local function create_transport(host, port, user, password, callback)
local deadline = fiber_time() + (timeout or TIMEOUT_INFINITY)
-- alert worker to notify it of the queued outgoing data;
-- if the buffer wasn't empty, assume the worker was already alerted
if send_buf:size() == 0 then worker_fiber:wakeup() end
local id = requests_next_id
if send_buf:size() == 0 then
worker_fiber:wakeup()
end
local id = next_request_id
method_codec[method](send_buf, id, schema_id, ...)
requests_next_id = next_id(id)
next_request_id = next_id(id)
local request = table_new(0, 5) -- reserve space for 5 keys
request.client = fiber_self()
request.method = method
......@@ -266,7 +282,8 @@ local function create_transport(host, port, user, password, callback)
end
local function new_request_id()
local id = requests_next_id; requests_next_id = next_id(id)
local id = next_request_id;
next_request_id = next_id(id)
return id
end
......@@ -319,16 +336,20 @@ local function create_transport(host, port, user, password, callback)
end
local size = IPROTO_GREETING_SIZE
local err, msg = send_and_recv(size, 0.3)
if err then return error_sm(err, msg) end
if err then
return error_sm(err, msg)
end
local g = decode_greeting(ffi.string(recv_buf.rpos, size))
recv_buf.rpos = recv_buf.rpos + size
if not g then
return error_sm(E_NO_CONNECTION, 'Can\'t decode handshake')
end
err, msg = callback('handshake', g)
if err then return error_sm(err, msg) end
if err then
return error_sm(err, msg)
end
if g.protocol == 'Lua console' then
local rid = requests_next_id
local rid = next_request_id
set_state('active')
return console_sm(rid)
elseif g.protocol == 'Binary' then
......@@ -377,7 +398,9 @@ local function create_transport(host, port, user, password, callback)
local select1_id = new_request_id()
local select2_id = new_request_id()
local response = {}
-- fetch everything from space _vspace, 2 = ITER_ALL
encode_select(send_buf, select1_id, nil, VSPACE_ID, 0, 2, 0, 0xFFFFFFFF, nil)
-- fetch everything from space _vindex, 2 = ITER_ALL
encode_select(send_buf, select2_id, nil, VINDEX_ID, 0, 2, 0, 0xFFFFFFFF, nil)
schema_id = nil -- any schema_id will do provided that
-- it is consistent across responses
......@@ -555,7 +578,7 @@ local function connect(...)
if opts.wait_connected ~= false then
remote._transport.wait_state('active', tonumber(opts.wait_connected))
end
if opts.legacy_call then remote.call = remote.call_16 end
if opts.call_16 then remote.call = remote.call_16 end
return remote
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment