diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 91a66393e41a55ba278750abc917f3cba8ee7e7a..a4788430ca1e69bfb43a904d43d5cce875db4d5f 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -25,6 +25,7 @@ lua_source(lua_sources lua/console.lua) lua_source(lua_sources lua/bsdsocket.lua) lua_source(lua_sources lua/errno.lua) lua_source(lua_sources lua/log.lua) +lua_source(lua_sources lua/box_net_box.lua) file(MAKE_DIRECTORY ${CMAKE_BINARY_DIR}/third_party/luafun) lua_source(lua_sources ../third_party/luafun/fun.lua) diff --git a/src/box/CMakeLists.txt b/src/box/CMakeLists.txt index 4916bcde7568318ab43f870733f33d41706d6fef..31067cc95a445324da0755bc3282a113b562664c 100644 --- a/src/box/CMakeLists.txt +++ b/src/box/CMakeLists.txt @@ -5,7 +5,6 @@ include_directories(${SOPHIA_INCLUDE_DIR}) set(lua_sources) lua_source(lua_sources lua/load_cfg.lua) lua_source(lua_sources lua/schema.lua) -lua_source(lua_sources lua/box_net.lua) lua_source(lua_sources lua/tuple.lua) set(bin_sources) bin_source(bin_sources bootstrap.snap bootstrap.h) diff --git a/src/box/lua/box_net.lua b/src/box/lua/box_net.lua deleted file mode 100644 index 55d7a1a35431ee6ec17d7d2ef210cd14518c7123..0000000000000000000000000000000000000000 --- a/src/box/lua/box_net.lua +++ /dev/null @@ -1,572 +0,0 @@ --- box_net.lua (internal file) - -(function() - -local msgpack = require('msgpack') -local fiber = require('fiber') -local socket = require('socket') -local internal = require('box.internal') - -local function keify(key) - if key == nil then - return {} - end - if type(key) == "table" then - return key - end - return {key} -end - - -local function sprintf(fmt, ...) return string.format(fmt, ...) end -local function printf(fmt, ...) return print(sprintf(fmt, ...)) end -local function errorf(fmt, ...) error(sprintf(fmt, ...)) end - - - - -local function rpc_call(r, slf, ...) - - local path = rawget(r, 'path') - local args = { slf, ... } - if type(slf) == 'table' then - if rawget(slf, 'path') ~= nil and rawget(slf, 'r') ~= nil then - path = slf.path .. ':' .. r.method - args = { ... } - end - end - - - return r.r:call(path, unpack(args)) -end - -local function rpc_index(r, name) - local o = { r = rawget(r, 'r') } - local path = rawget(r, 'path') - if path == nil then - path = name - else - if type(name) == 'string' then - path = sprintf("%s.%s", path, name) - elseif type(name) == 'number' then - path = sprintf("%s[%s]", path, name) - else - errorf("Wrong path subitem: %s", tostring(name)) - end - end - o.path = path - o.method = name - - setmetatable(o, { - __index = rpc_index, - __call = rpc_call, - }) - - rawset(r, name, o) - return o -end - - - -box.net = { - --- --- The idea of box.net.box implementation is that --- most calls are simply wrappers around 'process' --- function. The embedded 'process' function sends --- requests to the local server, the remote 'process' --- routes requests to a remote. --- - box = { - PING = 64, - SELECT = 1, - INSERT = 2, - REPLACE = 3, - UPDATE = 4, - DELETE = 5, - CALL = 6, - - TYPE = 0x00, - SYNC = 0x01, - SPACE_ID = 0x10, - INDEX_ID = 0x11, - LIMIT = 0x12, - OFFSET = 0x13, - ITERATOR = 0x14, - KEY = 0x20, - TUPLE = 0x21, - FUNCTION_NAME = 0x22, - DATA = 0x30, - ERROR = 0x31, - GREETING_SIZE = 128, - - delete = function(self, space, key) - local t = self:process(box.net.box.DELETE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.KEY] = keify(key) - })) - if t[1] ~= nil then - return t[1] - else - return - end - end, - - replace = function(self, space, tuple) - local t = self:process(box.net.box.REPLACE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.TUPLE] = tuple - })) - if t[1] ~= nil then - return t[1] - else - return - end - end, - - -- insert a tuple (produces an error if the tuple already exists) - insert = function(self, space, tuple) - local t = self:process(box.net.box.INSERT, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.TUPLE] = tuple - })) - if t[1] ~= nil then - return t[1] - else - return - end - end, - - -- update a tuple - update = function(self, space, key, ops) - local t = self:process(box.net.box.UPDATE, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.KEY] = keify(key), - [box.net.box.TUPLE] = ops - })) - if t[1] ~= nil then - return t[1] - else - return - end - end, - - get = function(self, space, key) - key = keify(key) - local result = self:process(box.net.box.SELECT, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.KEY] = key, - [box.net.box.ITERATOR] = box.index.EQ, - [box.net.box.OFFSET] = 0, - [box.net.box.LIMIT] = 2 - })) - if #result == 0 then - return - elseif #result == 1 then - return result[1] - else - box.raise(box.error.MORE_THAN_ONE_TUPLE, - "More than one tuple found without 'limit'") - end - end, - - select = function(self, space, key, opts) - local offset = 0 - local limit = 4294967295 - local iterator = box.index.EQ - - key = keify(key) - if #key == 0 then - iterator = box.index.ALL - end - - if opts ~= nil then - if opts.offset ~= nil then - offset = tonumber(opts.offset) - end - if type(opts.iterator) == "string" then - opts.iterator = box.index[opts.iterator] - end - if opts.iterator ~= nil then - iterator = tonumber(opts.iterator) - end - if opts.limit ~= nil then - limit = tonumber(opts.limit) - end - end - local result = self:process(box.net.box.SELECT, - msgpack.encode({ - [box.net.box.SPACE_ID] = space, - [box.net.box.KEY] = key, - [box.net.box.ITERATOR] = iterator, - [box.net.box.OFFSET] = offset, - [box.net.box.LIMIT] = limit - })) - return result - end, - - ping = function(self) - return self:process(box.net.box.PING, '') - end, - - call = function(self, name, ...) - assert(type(name) == 'string') - return self:process(box.net.box.CALL, - msgpack.encode({ - [box.net.box.FUNCTION_NAME] = name, - [box.net.box.TUPLE] = {...}})) - end, - - -- To make use of timeouts safe across multiple - -- concurrent fibers do not store timeouts as - -- part of conection state, but put it inside - -- a helper object. - - timeout = function(self, timeout) - - local wrapper = {} - - setmetatable(wrapper, { - __index = function(wrp, name, ...) - local func = self[name] - if func ~= nil then - return - function(wr, ...) - self.request_timeout = timeout - return func(self, ...) - end - end - - errorf('Can not find "box.net.box.%s" function', name) - end - }); - - return wrapper - end, - }, - - - -- local tarantool - self = { - process = function(self, ...) - return internal.process(...) - end, - - -- for compatibility with the networked version, - -- implement call - call = function(self, proc_name, ...) - local proc = { internal.call_loadproc(proc_name) } - if #proc == 2 then - return { proc[1](proc[2], ...) } - else - return { proc[1](...) } - end - end, - - ping = function(self) - return true - end, - - -- local tarantool doesn't provide timeouts - timeout = function(self, timeout) - return self - end, - - close = function(self) - return true - end, - - } -} - -box.net.box.put = box.net.box.replace; -- put is an alias for replace - --- box.net.self rpc works like remote.rpc -box.net.self.rpc = { r = box.net.self } -setmetatable(box.net.self.rpc, { __index = rpc_index }) - - --- --- Make sure box.net.box.select(conn, ...) works --- just as well as conn:select(...) --- -setmetatable(box.net.self, { __index = box.net.box }) - -box.net.box.new = function(host, port, reconnect_timeout) - if reconnect_timeout == nil then - reconnect_timeout = 0 - else - reconnect_timeout = reconnect_timeout - end - - local remote = { - host = host, - port = port, - reconnect_timeout = reconnect_timeout, - closed = false, - timedout = {}, - - title = function(self) - return sprintf('%s:%s', tostring(self.host), tostring(self.port)) - end, - - processing = { - last_sync = 0, - next_sync = function(self) - while true do - self.last_sync = self.last_sync + 1 - if self[ self.last_sync ] == nil then - return self.last_sync - end - - if self.last_sync > 0x7FFFFFFF then - self.last_sync = 0 - end - end - end, - - -- write channel - wch = fiber.channel(1), - - -- ready socket channel - rch = fiber.channel(1), - }, - - - - process = function(self, op, request) - local started = fiber.time() - local timeout = self.request_timeout - self.request_timeout = nil - - -- get an auto-incremented request id - local sync = self.processing:next_sync() - self.processing[sync] = fiber.channel(1) - local header = msgpack.encode{ - [box.net.box.TYPE] = op, [box.net.box.SYNC] = sync - } - request = msgpack.encode(header:len() + request:len()).. - header..request - - if timeout ~= nil then - timeout = timeout - if not self.processing.wch:put(request, timeout) then - self.processing[sync] = nil - return nil - end - - timeout = timeout - (fiber.time() - started) - else - self.processing.wch:put(request) - end - - local res - if timeout ~= nil then - res = self.processing[sync]:get(timeout) - else - res = self.processing[sync]:get() - end - self.processing[sync] = nil - - -- timeout - if res == nil then - self.timedout[ sync ] = true - if op == box.net.box.PING then - return false - else - return nil - end - end - - local function totuples(t) - res = {} - for k, v in pairs(t) do - table.insert(res, box.tuple.new(v)) - end - return res - end - - -- results { status, response } received - if res[1] then - if op == box.net.box.PING then - return true - else - local code = res[2] - local body = msgpack.decode(res[3]) - if code ~= 0 then - box.raise(code, body[box.net.box.ERROR]) - end - return totuples(body[box.net.box.DATA]) - end - else - if op == 65280 then - return false - end - errorf('%s: %s', self:title(), res[2]) - end - end, - - - try_connect = function(self) - if self.s ~= nil then - return true - end - - local sc = socket.tcp() - if sc == nil then - self:fatal("Can't create socket") - return false - end - - local s = { sc:connect( self.host, self.port ) } - if s[1] == nil then - self:fatal("Can't connect to %s:%s: %s", - self.host, self.port, s[4]) - return false - end - sc:recv(box.net.box.GREETING_SIZE) - - self.s = sc - - return true - end, - - read_response = function(self) - if self.s == nil then - return - end - local blen = self.s:recv(5) - if string.len(blen) < 5 then - self:fatal("The server has closed connection") - return - end - blen = msgpack.decode(blen) - - local body = '' - if blen > 0 then - res = { self.s:recv(blen) } - if res[4] ~= nil then - self:fatal("Error while reading socket: %s", res[4]) - return - end - body = res[1] - if string.len(body) ~= blen then - self:fatal("Unexpected eof while reading body") - return - end - end - return body - end, - - rfiber = function(self) - while not self.closed do - while not self.closed do - if self:try_connect(self.host, self.port) then - break - end - -- timeout between reconnect attempts - fiber.sleep(self.reconnect_timeout) - end - - -- wakeup write fiber - self.processing.rch:put(true, 0) - - while not self.closed do - local resp = self:read_response() - if resp == nil or string.len(resp) == 0 then - break - end - local header, offset = msgpack.decode(resp); - local code = header[box.net.box.TYPE] - local sync = header[box.net.box.SYNC] - if sync == nil then - break - end - - if self.processing[sync] ~= nil then - self.processing[sync]:put({true, code, resp:sub(offset)}, 0) - else - if self.timedout[ sync ] then - self.timedout[ sync ] = nil - printf("Timed out response from %s", self:title()) - else - printf("Unexpected response %s from %s", - sync, self:title()) - end - end - end - - end - self.irfiber = nil - end, - - - wfiber = function(self) - local request - while not self.closed do - while self.s == nil do - self.processing.rch:get(1) - end - if request == nil then - request = self.processing.wch:get(1) - end - if self.s ~= nil and request ~= nil then - local res = { self.s:send(request) } - if res[1] ~= string.len(request) then - self:fatal("Error while write socket: %s", res[4]) - end - request = nil - end - end - self.iwfiber = nil - end, - - fatal = function(self, message, ...) - message = sprintf(message, ...) - self.s = nil - for sync, ch in pairs(self.processing) do - if type(sync) == 'number' then - ch:put({ false, message }, 0) - end - end - self.timedout = {} - end, - - close = function(self) - if self.closed then - error("box.net.box: already closed") - end - self.closed = true - local message = 'box.net.box: connection was closed' - self.process = function() - error(message) - end - self:fatal(message) - - -- wake up write fiber - self.processing.rch:put(true, 0) - self.processing.wch:put(true, 0) - return true - end - } - - - setmetatable( remote, { __index = box.net.box } ) - - remote.irfiber = fiber.wrap(remote.rfiber, remote) - remote.iwfiber = fiber.wrap(remote.wfiber, remote) - - remote.rpc = { r = remote } - setmetatable(remote.rpc, { __index = rpc_index }) - - return remote - -end - -end)() --- vim: set et ts=4 sts diff --git a/src/box/lua/call.cc b/src/box/lua/call.cc index afea7defe309fd683a242ea7e6b0cbd54f400e41..40fc5355c5966da06a71cde5a7b5fa3707eb28cd 100644 --- a/src/box/lua/call.cc +++ b/src/box/lua/call.cc @@ -49,8 +49,8 @@ #include "box/schema.h" /* contents of box.lua, misc.lua, box.net.lua respectively */ -extern char load_cfg_lua[], schema_lua[], box_net_lua[]; -static const char *lua_sources[] = { schema_lua, box_net_lua, load_cfg_lua, NULL }; +extern char schema_lua[]; +static const char *lua_sources[] = { schema_lua, NULL }; /* * Functions, exported in box_lua.h should have prefix diff --git a/src/errcode.h b/src/errcode.h index 209660af906fba0651b95776ce1ed5b93427e4b2..c3bc75e4a2eb770ccd9194b4cdf48e4409bb00cf 100644 --- a/src/errcode.h +++ b/src/errcode.h @@ -126,6 +126,8 @@ enum { TNT_ERRMSG_MAX = 512 }; /* 74 */_(ER_INVALID_XLOG, 2, "Failed to read xlog: %lld") \ /* 75 */_(ER_INVALID_XLOG_NAME, 2, "Invalid xlog name: expected %lld got %lld") \ /* 76 */_(ER_INVALID_XLOG_ORDER, 2, "Invalid xlog order: %lld and %lld") \ + /* 77 */_(ER_NO_CONNECTION, 2, "Connection is not established") \ + /* 78 */_(ER_TIMEOUT, 2, "Timeout exceeded") \ /* * !IMPORTANT! Please follow instructions at start of the file diff --git a/src/ffisyms.cc b/src/ffisyms.cc index 20178da7a68522efa88f3cbd0123567831be06b1..2112a20b00182338f748b664a71d89eb52425f96 100644 --- a/src/ffisyms.cc +++ b/src/ffisyms.cc @@ -8,6 +8,7 @@ #include <lua/init.h> #include <tarantool.h> #include "lua/bsdsocket.h" +#include "base64.h" /* * A special hack to cc/ld to keep symbols in an optimized binary. @@ -41,5 +42,6 @@ void *ffi_symbols[] = { (void *) box_set_snap_io_rate_limit, (void *) box_set_too_long_threshold, (void *) bsdsocket_local_resolve, - (void *) bsdsocket_nonblock + (void *) bsdsocket_nonblock, + (void *) base64_decode }; diff --git a/src/lua/box_net_box.lua b/src/lua/box_net_box.lua new file mode 100644 index 0000000000000000000000000000000000000000..2f2a936772a3e79a5b5fdba7fceab743dde2d4b1 --- /dev/null +++ b/src/lua/box_net_box.lua @@ -0,0 +1,975 @@ +-- net_box.lua (internal file) + +local msgpack = require 'msgpack' +local fiber = require 'fiber' +local socket = require 'socket' +local log = require 'log' +local errno = require 'errno' +local ffi = require 'ffi' +local digest = require 'digest' +local yaml = require 'yaml' + +local CODE = 0 +local PING = 64 +local SELECT = 1 +local INSERT = 2 +local REPLACE = 3 +local UPDATE = 4 +local DELETE = 5 +local CALL = 6 +local AUTH = 7 +local TYPE = 0x00 +local SYNC = 0x01 +local SPACE_ID = 0x10 +local INDEX_ID = 0x11 +local LIMIT = 0x12 +local OFFSET = 0x13 +local ITERATOR = 0x14 +local KEY = 0x20 +local TUPLE = 0x21 +local FUNCTION_NAME = 0x22 +local USER = 0x23 +local DATA = 0x30 +local ERROR = 0x31 +local GREETING_SIZE = 128 + +local TIMEOUT_INFINITY = 500 * 365 * 86400 + + +ffi.cdef[[ + int base64_decode(const char *in_base64, int in_len, + char *out_bin, int out_len); +]] + + +local function request(header, body) + + header = msgpack.encode(header) + body = msgpack.encode(body) + + local len = msgpack.encode(string.len(header) + string.len(body)) + + + return len .. header .. body +end + + +local function strxor(s1, s2) + local res = '' + for i = 1, string.len(s1) do + if i > string.len(s2) then + break + end + + local b1 = string.byte(s1, i) + local b2 = string.byte(s2, i) + res = res .. string.char(bit.bxor(b1, b2)) + end + return res +end + +local function b64decode(str) + local so = ffi.new('char[?]', string.len(str) * 2); + local len = + ffi.C.base64_decode(str, string.len(str), so, string.len(str) * 2) + return ffi.string(so, len) +end + + +local function keyfy(v) + if type(v) == 'table' then + return v + end + if v == nil then + return {} + end + return { v } +end + +local function one_tuple(tbl) + if tbl == nil then + return + end + if #tbl > 0 then + return tbl[1] + end + return +end + +local proto = { + _sync = -1, + + -- sync + sync = function(self) + self._sync = self._sync + 1 + if self._sync >= 0x7FFFFFFF then + self._sync = 0 + end + return self._sync + end, + + + ping = function(sync) + return request( + { [SYNC] = sync, [TYPE] = PING }, + {} + ) + end, + + + -- lua call + call = function(sync, proc, args) + if args == nil then + args = {} + end + return request( + { [SYNC] = sync, [TYPE] = CALL }, + { [FUNCTION_NAME] = proc, [TUPLE] = args } + ) + end, + + -- insert + insert = function(sync, spaceno, tuple) + return request( + { [SYNC] = sync, [TYPE] = INSERT }, + { [SPACE_ID] = spaceno, [TUPLE] = tuple } + ) + end, + + -- replace + replace = function(sync, spaceno, tuple) + return request( + { [SYNC] = sync, [TYPE] = REPLACE }, + { [SPACE_ID] = spaceno, [TUPLE] = tuple } + ) + end, + + -- delete + delete = function(sync, spaceno, key) + return request( + { [SYNC] = sync, [TYPE] = DELETE }, + { [SPACE_ID] = spaceno, [KEY] = key } + ) + end, + + -- update + update = function(sync, spaceno, key, oplist) + return request( + { [SYNC] = sync, [TYPE] = UPDATE }, + { [KEY] = key, [TUPLE] = oplist, [SPACE_ID] = spaceno } + ) + end, + + -- select + select = function(sync, spaceno, indexno, key, opts) + + if opts == nil then + opts = {} + end + if spaceno == nil or type(spaceno) ~= 'number' then + box.raise(box.error.NO_SUCH_SPACE, + string.format("Space %s does not exist", tostring(spaceno))) + end + + if indexno == nil or type(indexno) ~= 'number' then + box.raise(box.error.NO_SUCH_INDEX, + string.format("No index #%s is defined in space %s", + tostring(spaceno), + tostring(indexno))) + end + + local body = { + [SPACE_ID] = spaceno, + [INDEX_ID] = indexno, + [KEY] = keyfy(key) + } + + if opts.limit ~= nil then + body[LIMIT] = tonumber(opts.limit) + else + body[LIMIT] = 0xFFFFFFFF + end + if opts.offset ~= nil then + body[OFFSET] = tonumber(opts.offset) + else + body[OFFSET] = 0 + end + + if opts.iterator ~= nil then + if type(opts.iterator) == 'string' then + local iterator = box.index[ opts.iterator ] + if iterator == nil then + box.raise(box.error.INVALID_MSGPACK, + "Wrong iterator " .. opts.iterator) + end + body[ITERATOR] = iterator + else + body[ITERATOR] = tonumber(opts.iterator) + end + end + + return request( { [SYNC] = sync, [TYPE] = SELECT }, body ) + end, + + auth = function(sync, user, password, handshake) + local saltb64 = string.sub(handshake, 65) + local salt = string.sub(b64decode(saltb64), 1, 20) + + local hpassword = digest.sha1(password) + local hhpassword = digest.sha1(hpassword) + local scramble = digest.sha1(salt .. hhpassword) + + local hash = strxor(hpassword, scramble) + return request( + { [SYNC] = sync, [TYPE] = AUTH }, + { [USER] = user, [TUPLE] = { 'chap-sha1', hash } } + ) + end, + + b64decode = b64decode, +} + + +local function space_metatable(self) + return { + __index = { + insert = function(space, tuple) + return self:_insert(space.id, tuple) + end, + + replace = function(space, tuple) + return self:_replace(space.id, tuple) + end, + + select = function(space, key, opts) + return self:_select(space.id, 0, key, opts) + end, + + delete = function(space, key) + return self:_delete(space.id, key) + end, + + update = function(space, key, oplist) + return self:_update(space.id, key, oplist) + end, + + get = function(space, key) + local res = self:_select(space.id, 0, key, + { limit = 2, iterator = 'EQ' }) + if #res == 0 then + return + end + if #res == 1 then + return res[1] + end + box.raise(box.error.MORE_THAN_ONE_TUPLE, + "More than one tuple found by get()") + end + } + } +end + +local function index_metatable(self) + return { + __index = { + select = function(idx, key, opts) + return self:_select(idx.space.id, idx.id, key, opts) + end, + + + + get = function(idx, key) + local res = self:_select(idx.space.id, idx.id, key, + { limit = 2, iterator = 'EQ' }) + if #res == 0 then + return + end + if #res == 1 then + return res[1] + end + box.raise(box.error.MORE_THAN_ONE_TUPLE, + "More than one tuple found by get()") + end, + + min = function(idx, key) + local res = self:_select(idx.space.id, idx.id, key, + { limit = 1, iterator = 'GE' }) + if #res > 0 then + return res[1] + end + end, + + max = function(idx, key) + local res = self:_select(idx.space.id, idx.id, key, + { limit = 1, iterator = 'LE' }) + if #res > 0 then + return res[1] + end + end, + + count = function(idx, key) + local proc = string.format('box.space.%s.index.%s:count', + idx.space.name, idx.name) + local res = self:call(proc, key) + if #res > 0 then + return res[1][1] + end + end + } + } +end + + +local remote = {} + +local remote_methods = { + proto = proto, + + new = function(cls, host, port, opts) + local self = {} + setmetatable(self, getmetatable(cls)) + + self.is_instance = true + self.host = host + self.port = port + self.opts = opts + if self.opts == nil then + self.opts = {} + end + + if self.host == nil then + self.host = 'localhost' + end + + self.is_run = true + self.state = 'init' + + self.ch = { sync = {}, fid = {} } + self.wait = { state = {} } + self.timeouts = {} + + + fiber.wrap(function() self:_connect_worker() end) + fiber.wrap(function() self:_read_worker() end) + fiber.wrap(function() self:_write_worker() end) + + if self.opts.wait_connected == nil or self.opts.wait_connected then + self:wait_connected() + end + + return self + end, + + + ping = function(self) + if not self:is_connected() then + return false + end + local sync = self.proto:sync() + local req = self.proto.ping(sync) + + local res = self:_request('ping', false) + + + if res == nil then + return false + end + + if res.hdr[CODE] == 0 then + return true + end + return false + end, + + call = function(self, proc, ...) + local res = self:_request('call', true, proc, {...}) + return res.body[DATA] + end, + + is_connected = function(self) + if self.state == 'active' then + return true + end + if self.state == 'activew' then + return true + end + return false + end, + + wait_connected = function(self, timeout) + return self:_wait_state({ 'active', 'activew', 'closed' }, timeout) + end, + + timeout = function(self, timeout) + if timeout == nil then + return self + end + if self.is_instance then + self.timeouts[ fiber.id() ] = timeout + return self + end + + return { + new = function(cls, host, port, opts) + if opts == nil then + opts = {} + end + + opts.wait_connected = false + + local cn = self:new(host, port, opts) + + if not cn:wait_connected(timeout) then + cn:close() + box.raise(box.error.TIMEOUT, 'Timeout exceeded') + end + return cn + end + } + end, + + close = function(self) + if self.state ~= 'closed' then + self:_switch_state('closed') + self:_error_waiters('Connection was closed') + if self.s ~= nil then + self.s:close() + self.s = nil + end + end + end, + + -- private methods + _fatal = function(self, efmt, ...) + if self.state == 'error' then + return + end + local emsg = efmt + if select('#', ...) > 0 then + emsg = string.format(efmt, ...) + end + + if self.s ~= nil then + self.s:close() + self.s = nil + end + + log.warn(emsg) + self.error = emsg + self.space = {} + self:_switch_state('error') + self:_error_waiters(emsg) + self.rbuf = '' + self.wbuf = '' + self.handshake = '' + end, + + _error_waiters = function(self, emsg) + local waiters = self.ch.sync + self.ch.sync = {} + for sync, channel in pairs(waiters) do + channel:put{ + hdr = { + [TYPE] = ERROR, + [CODE] = box.error.NO_CONNECTION, + [SYNC] = sync + }, + body = { + [ERROR] = emsg + } + } + end + end, + + _check_response = function(self) + while true do + if #self.rbuf < 5 then + break + end + + local len, off = msgpack.decode(self.rbuf) + + if len < #self.rbuf - off then + return + end + + local hdr, body + hdr, off = msgpack.decode(self.rbuf, off) + if off < #self.rbuf then + body, off = msgpack.decode(self.rbuf, off) + else + body = {} + end + self.rbuf = string.sub(self.rbuf, off + 1) + + local sync = hdr[SYNC] + + if self.ch.sync[sync] ~= nil then + self.ch.sync[sync]:put({ hdr = hdr, body = body }) + self.ch.sync[sync] = nil + else + log.warn("Unexpected response %s", sync) + end + end + end, + + _switch_state = function(self, state) + if self.state == state then + return + end + self.state = state + + local list = self.wait.state[ state ] + self.wait.state[ state ] = nil + + if list == nil then + return + end + + for _, fid in pairs(list) do + if self.ch.fid[fid] ~= nil then + self.ch.fid[fid]:put(true) + self.ch.fid[fid] = nil + end + end + end, + + _wait_state = function(self, states, timeout) + if timeout == nil then + timeout = TIMEOUT_INFINITY + end + while timeout > 0 do + local started = fiber.time() + for _, state in pairs(states) do + if self.state == state then + return true + end + end + + local fid = fiber.id() + local ch = fiber.channel() + for _, state in pairs(states) do + if self.wait.state[state] == nil then + self.wait.state[state] = {} + end + self.wait.state[state][fid] = fid + end + + self.ch.fid[fid] = ch + local res + res = ch:get(timeout) + self.ch.fid[fid] = nil + + local has_state = false + for _, state in pairs(states) do + if self.wait.state[state] ~= nil then + self.wait.state[state][fid] = nil + end + if self.state == state then + has_state = true + end + end + + if has_state or res == nil then + return res + end + + timeout = timeout - (fiber.time() - started) + end + end, + + _connect_worker = function(self) + fiber.name('net.box.connector') + while true do + self:_wait_state{ 'init', 'error', 'closed' } + if self.state == 'closed' then + return + end + + if self.state == 'error' then + if self.opts.reconnect_after == nil then + self:_switch_state('closed') + return + end + fiber.sleep(self.opts.reconnect_after) + end + + self:_switch_state('connecting') + + self.s = socket.tcp_connect(self.host, self.port) + if self.s == nil then + self:_fatal(errno.strerror(errno())) + else + + -- on_connect + self:_switch_state('handshake') + self.handshake = self.s:read(128) + if self.handshake == nil then + self:_fatal(errno.strerror(errno())) + elseif string.len(self.handshake) ~= 128 then + self:_fatal("Can't read handshake") + else + + self.wbuf = '' + self.rbuf = '' + + local s, e = pcall(function() + self:_auth() + end) + if not s then + self:_fatal(e) + end + + xpcall(function() self:_load_schema() end, + function(e) + log.info("Can't load schema: %s", tostring(e)) + end) + + if self.state ~= 'error' and self.state ~= 'closed' then + self:_switch_state('active') + end + end + end + end + end, + + _auth = function(self) + if self.opts.user == nil or self.opts.password == nil then + self:_switch_state 'authen' + return + end + + self:_switch_state 'auth' + + local auth_res = self:_request_internal('auth', + false, self.opts.user, self.opts.password, self.handshake) + + if auth_res.hdr[CODE] ~= 0 then + self:_fatal(auth_res.body[ERROR]) + return + end + + self:_switch_state 'authen' + end, + + -- states wakeup _read_worker + _r_states = { + 'active', 'activew', 'schema', 'schemaw', 'closed', 'auth', 'authw' + }, + -- states wakeup _write_worker + _rw_states = { 'activew', 'schemaw', 'closed', 'authw' }, + + _is_r_state = function(self) + for _, state in pairs(self._r_states) do + if state == self.state then + return true + end + end + return false + end, + + _is_rw_state = function(self) + for _, state in pairs(self._rw_states) do + if state == self.state then + return true + end + end + return false + end, + + + _load_schema = function(self) + if self.state ~= 'authen' then + self:_fatal 'Can not load schema from the state' + return + end + + self:_switch_state('schema') + + local spaces = self:_request_internal('select', + true, box.schema.SPACE_ID, 0, nil, { iterator = 'ALL' }).body[DATA] + local indexes = self:_request_internal('select', + true, box.schema.INDEX_ID, 0, nil, { iterator = 'ALL' }).body[DATA] + + local sl = {} + + + for _, space in pairs(spaces) do + local name = space[3] + local id = space[1] + local engine = space[4] + local field_count = space[5] + + local s = { + id = id, + name = name, + engine = engine, + field_count = field_count, + enabled = true, + index = {} + + } + if #space > 5 and string.match(space[6], 'temporary') then + s.temporary = true + else + s.temporary = false + end + + setmetatable(s, space_metatable(self)) + + sl[id] = s + sl[name] = s + + end + + for _, index in pairs(indexes) do + local idx = { + space = index[1], + id = index[2], + name = index[3], + type = string.upper(index[4]), + parts = {}, + } + + if index[5] == 0 then + idx.unique = false + else + idx.unique = true + end + + for k = 0, index[6] - 1 do + local pktype = index[7 + k * 2 + 1] + local pkfield = index[7 + k * 2] + + local pk = { + type = string.upper(pktype), + fieldno = pkfield + } + idx.parts[k] = pk + end + + if sl[idx.space] ~= nil then + sl[idx.space].index[idx.id] = idx + sl[idx.space].index[idx.name] = idx + idx.space = sl[idx.space] + setmetatable(idx, index_metatable(self)) + end + end + + self.space = sl + end, + + _read_worker = function(self) + fiber.name('net.box.read') + while self.state ~= 'closed' do + self:_wait_state(self._r_states) + if self.state == 'closed' then + break + end + + if self.s:readable(.5) then + if self.state == 'closed' then + break + end + + if self:_is_r_state() then + local data = self.s:sysread(4096) + + if data ~= nil then + self.rbuf = self.rbuf .. data + self:_check_response() + else + self:_fatal(errno.strerror(errno())) + end + end + end + end + end, + + _to_wstate = { active = 'activew', schema = 'schemaw', auth = 'authw' }, + _to_rstate = { activew = 'active', schemaw = 'schema', authw = 'auth' }, + + _write_worker = function(self) + fiber.name('net.box.write') + while self.state ~= 'closed' do + self:_wait_state(self._rw_states) + + if self.state == 'closed' then + break + end + + if string.len(self.wbuf) == 0 then + + local wstate = self._to_rstate[self.state] + if wstate ~= nil then + self:_switch_state(wstate) + end + + elseif self.s:writable(.5) then + + if self.state == 'closed' then + break + end + if self:_is_rw_state() then + if #self.wbuf > 0 then + local written = self.s:syswrite(self.wbuf) + if written ~= nil then + self.wbuf = string.sub(self.wbuf, + tonumber(1 + written)) + else + self:_fatal(errno.strerror(errno())) + end + end + end + + end + end + end, + + + _request = function(self, name, raise, ...) + + local fid = fiber.id() + if self.timeouts[fid] == nil then + self.timeouts[fid] = TIMEOUT_INFINITY + end + + local started = fiber.time() + + self:_wait_state({ 'active', 'activew', 'closed' }, self.timeouts[fid]) + + self.timeouts[fid] = self.timeouts[fid] - (fiber.time() - started) + + if self.state == 'closed' then + if raise then + box.raise(box.error.NO_CONNECTION, + "Connection was closed") + end + end + + if self.timeouts[fid] <= 0 then + self.timeouts[fid] = nil + if raise then + box.raise(box.error.TIMEOUT, 'Timeout exceeded') + else + return { + hdr = { [CODE] = box.error.TIMEOUT }, + body = { [ERROR] = 'Timeout exceeded' } + } + end + end + + return self:_request_internal(name, raise, ...) + end, + + _request_internal = function(self, name, raise, ...) + + local fid = fiber.id() + if self.timeouts[fid] == nil then + self.timeouts[fid] = TIMEOUT_INFINITY + end + + local sync = self.proto:sync() + local request = self.proto[name](sync, ...) + + self.wbuf = self.wbuf .. request + + local wstate = self._to_wstate[self.state] + if wstate ~= nil then + self:_switch_state(wstate) + end + + local ch = fiber.channel() + + self.ch.sync[sync] = ch + + local response = ch:get(self.timeouts[fid]) + self.ch.sync[sync] = nil + self.timeouts[fid] = nil + + + if response == nil then + if raise then + box.raise(box.error.TIMEOUT, 'Timeout exceeded') + else + return { + hdr = { [CODE] = box.error.TIMEOUT }, + body = { [ERROR] = 'Timeout exceeded' } + } + end + end + + if raise and response.hdr[CODE] ~= 0 then + box.raise(response.hdr[CODE], response.body[ERROR]) + end + + if response.body[DATA] ~= nil then + for i, v in pairs(response.body[DATA]) do + response.body[DATA][i] = box.tuple.new(response.body[DATA][i]) + end + end + + return response + end, + + -- private (low level) methods + _select = function(self, spaceno, indexno, key, opts) + local res = self:_request('select', true, spaceno, indexno, key, opts) + return res.body[DATA] + end, + + _insert = function(self, spaceno, tuple) + local res = self:_request('insert', true, spaceno, tuple) + return one_tuple(res.body[DATA]) + end, + + _replace = function(self, spaceno, tuple) + local res = self:_request('replace', true, spaceno, tuple) + return one_tuple(res.body[DATA]) + end, + + _delete = function(self, spaceno, key) + local res = self:_request('delete', true, spaceno, key) + return one_tuple(res.body[DATA]) + end, + + _update = function(self, spaceno, key, oplist) + local res = self:_request('update', true, spaceno, key, oplist) + return one_tuple(res.body[DATA]) + end +} + +setmetatable(remote, { __index = remote_methods }) + +remote.self = { + ping = function() return true end, + close = function() end, + timeout = function(self) return self end, + wait_connected = function(self) return true end, + call = function(_box, proc_name, ...) + local proc = { package.loaded['box.internal'] + .call_loadproc(proc_name) } + local result + if #proc == 2 then + result = { proc[1](proc[2], ...) } + else + result = { proc[1](...) } + end + + if #result == 1 and type(result[1]) == 'table' then + result = result[1] + end + + for i, v in pairs(result) do + result[i] = box.tuple.new(v) + end + return result + end +} + +setmetatable(remote.self, { __index = box }) + + +return remote + + diff --git a/src/lua/init.cc b/src/lua/init.cc index 44919557389c5d33e9bf744c354fb3c9925c2cf8..0e6cfb64f34f4e945f759ec9c2629253e552aa25 100644 --- a/src/lua/init.cc +++ b/src/lua/init.cc @@ -31,7 +31,7 @@ #include "tarantool.h" #include "box/box.h" #include "tbuf.h" -#if defined(__FreeBSD__) || defined(__APPLE__) +#if defined(__FreeBSD__) || defined(__APPLE__) #include "libgen.h" #endif @@ -69,14 +69,33 @@ extern "C" { struct lua_State *tarantool_L; /* contents of src/lua/ files */ -extern char uuid_lua[], session_lua[], msgpackffi_lua[], fun_lua[], - console_lua[], digest_lua[], init_lua[], - log_lua[]; -static const char *lua_sources[] = { init_lua, session_lua, NULL }; -static const char *lua_modules[] = { "msgpackffi", msgpackffi_lua, - "fun", fun_lua, "digest", digest_lua, +extern char uuid_lua[], + session_lua[], + msgpackffi_lua[], + fun_lua[], + digest_lua[], + init_lua[], + log_lua[], + console_lua[], + box_net_box_lua[]; + +static const char *lua_sources[] = { + init_lua, + session_lua, + NULL +}; + +static const char *lua_modules[] = { + "msgpackffi", msgpackffi_lua, + "fun", fun_lua, + "digest", digest_lua, "console", console_lua, - "uuid", uuid_lua, "log", log_lua, NULL }; + "uuid", uuid_lua, + "log", log_lua, + "net.box", box_net_box_lua, + NULL +}; + /* * {{{ box Lua library: common functions */ diff --git a/src/module/sql/sql.lua b/src/module/sql/sql.lua index 89266845d0829b01702cbc2749b436092ce3fcdc..65df14268ea29b3f08b1212422aca191df14dc1f 100644 --- a/src/module/sql/sql.lua +++ b/src/module/sql/sql.lua @@ -2,6 +2,10 @@ local fiber = require('fiber') +if box.net == nil then + box.net = {} +end + box.net.sql = { -- constructor -- box.net.sql.connect( diff --git a/test/box/access.result b/test/box/access.result index be51a84d20dee19e2c62c02602020ce2be51a26d..0f4d7614979ba1cce1dbac0b0ab7eef1c1396d86 100644 --- a/test/box/access.result +++ b/test/box/access.result @@ -156,7 +156,7 @@ box.schema.user.drop('ПетÑ_Иванов') --- ... -- gh-300: misleading error message if a function does not exist -c = box.net.box.new("localhost", box.cfg.primary_port) +c = (require 'net.box'):new("127.0.0.1", box.cfg.primary_port) --- ... c:call('nosuchfunction') @@ -179,7 +179,6 @@ c:call('nosuchfunction') ... c:close() --- -- true ... -- Dropping a space recursively drops all grants - it's possible to -- restore from a snapshot diff --git a/test/box/access.test.lua b/test/box/access.test.lua index 5242079abb03110c5b6930262d949de76f210ef3..7a1f5119b4c22a3f469193a4dd4b41467cc0c0ba 100644 --- a/test/box/access.test.lua +++ b/test/box/access.test.lua @@ -74,7 +74,8 @@ box.schema.user.create('ПетÑ_Иванов') box.schema.user.drop('ПетÑ_Иванов') -- gh-300: misleading error message if a function does not exist -c = box.net.box.new("localhost", box.cfg.primary_port) +c = (require 'net.box'):new("127.0.0.1", box.cfg.primary_port) + c:call('nosuchfunction') function nosuchfunction() end c:call('nosuchfunction') diff --git a/test/box/box.net.box.result b/test/box/box.net.box.result new file mode 100644 index 0000000000000000000000000000000000000000..f112b865d8d03267b5d1303b8264183cb2d395e2 --- /dev/null +++ b/test/box/box.net.box.result @@ -0,0 +1,422 @@ +remote = require 'net.box' +--- +... +fiber = require 'fiber' +--- +... +log = require 'log' +--- +... +box.schema.user.grant('guest', 'read,write,execute', 'universe') +--- +... +port = box.cfg.primary_port +--- +... +space = box.schema.create_space('net_box_test_space') +--- +... +space:create_index('primary', { type = 'tree' }) +--- +... +-- low level connection +log.info("create connection") +--- +... +cn = remote:new('127.0.0.1', port) +--- +... +cn:_wait_state({'active', 'error'}, 1) +--- +- true +... +log.info("state is %s", cn.state) +--- +... +cn:ping() +--- +- true +... +log.info("ping is done") +--- +... +cn:ping() +--- +- true +... +log.info("ping is done") +--- +... +cn:ping() +--- +- true +... +cn:call('unexists_procedure') +--- +- error: Procedure 'unexists_procedure' is not defined +... +function test_foo(a,b,c) return { {{ [a] = 1 }}, {{ [b] = 2 }}, c } end +--- +... +cn:call('test_foo', 'a', 'b', 'c') +--- +- - [{'a': 1}] + - [{'b': 2}] + - ['c'] +... +cn:_select(space.id, space.index.primary.id, 123) +--- +- [] +... +space:insert{123, 345} +--- +- [123, 345] +... +cn:_select(space.id, space.index.primary.id, 123) +--- +- - [123, 345] +... +cn:_select(space.id, space.index.primary.id, 123, { limit = 0 }) +--- +- [] +... +cn:_select(space.id, space.index.primary.id, 123, { limit = 1 }) +--- +- - [123, 345] +... +cn:_select(space.id, space.index.primary.id, 123, { limit = 1, offset = 1 }) +--- +- [] +... +cn.space[space.id] ~= nil +--- +- true +... +cn.space.net_box_test_space ~= nil +--- +- true +... +cn.space.net_box_test_space ~= nil +--- +- true +... +cn.space.net_box_test_space.index ~= nil +--- +- true +... +cn.space.net_box_test_space.index.primary ~= nil +--- +- true +... +cn.space.net_box_test_space.index[space.index.primary.id] ~= nil +--- +- true +... +cn.space.net_box_test_space.index.primary:select(123) +--- +- - [123, 345] +... +cn.space.net_box_test_space.index.primary:select(123, { limit = 0 }) +--- +- [] +... +cn.space.net_box_test_space.index.primary:select(nil, { limit = 1, }) +--- +- - [123, 345] +... +cn.space.net_box_test_space:insert{234, 1,2,3} +--- +- [234, 1, 2, 3] +... +cn.space.net_box_test_space:insert{234, 1,2,3} +--- +- error: Duplicate key exists in unique index 0 +... +cn.space.net_box_test_space:replace{354, 1,2,3} +--- +- [354, 1, 2, 3] +... +cn.space.net_box_test_space:replace{354, 1,2,4} +--- +- [354, 1, 2, 4] +... +cn.space.net_box_test_space:select{123} +--- +- - [123, 345] +... +space:select({123}, { iterator = 'GE' }) +--- +- - [123, 345] + - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn.space.net_box_test_space:select({123}, { iterator = 'GE' }) +--- +- - [123, 345] + - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn.space.net_box_test_space:select({123}, { iterator = 'GT' }) +--- +- - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn.space.net_box_test_space:select({123}, { iterator = 'GT', limit = 1 }) +--- +- - [234, 1, 2, 3] +... +cn.space.net_box_test_space:select({123}, { iterator = 'GT', limit = 1, offset = 1 }) +--- +- - [354, 1, 2, 4] +... +cn.space.net_box_test_space:select{123} +--- +- - [123, 345] +... +cn.space.net_box_test_space:update({123}, { { '+', 1, 2 } }) +--- +- [123, 347] +... +cn.space.net_box_test_space:select{123} +--- +- - [123, 347] +... +cn.space.net_box_test_space:update({123}, { { '=', 0, 2 } }) +--- +- [2, 347] +... +cn.space.net_box_test_space:select{2} +--- +- - [2, 347] +... +cn.space.net_box_test_space:select({234}, { iterator = 'LT' }) +--- +- - [2, 347] +... +cn.space.net_box_test_space:update({1}, { { '+', 1, 2 } }) +--- +... +cn.space.net_box_test_space:delete{1} +--- +... +cn.space.net_box_test_space:delete{2} +--- +- [2, 347] +... +cn.space.net_box_test_space:delete{2} +--- +... +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) +--- +- - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn.space.net_box_test_space.index.primary:min() +--- +- [234, 1, 2, 3] +... +cn.space.net_box_test_space.index.primary:min(354) +--- +- [354, 1, 2, 4] +... +cn.space.net_box_test_space.index.primary:max() +--- +- [354, 1, 2, 4] +... +cn.space.net_box_test_space.index.primary:max(234) +--- +- [234, 1, 2, 3] +... +cn.space.net_box_test_space.index.primary:count() +--- +- 2 +... +cn.space.net_box_test_space.index.primary:count(354) +--- +- 1 +... +cn.space.net_box_test_space:get(354) +--- +- [354, 1, 2, 4] +... +-- reconnects after errors +-- -- 1. no reconnect +cn:_fatal('Test fatal error') +--- +... +cn.state +--- +- closed +... +cn:ping() +--- +- false +... +cn:call('test_foo') +--- +- error: Connection was closed +... +-- -- 2 reconnect +cn = remote:new('127.0.0.1', port, { reconnect_after = .1 }) +--- +... +cn:_wait_state({'active'}, 1) +--- +- true +... +cn.space ~= nil +--- +- true +... +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) +--- +- - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn:_fatal 'Test error' +--- +... +cn:_wait_state({'active', 'activew'}, 2) +--- +- true +... +cn:ping() +--- +- true +... +cn.state +--- +- active +... +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) +--- +- - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +cn:_fatal 'Test error' +--- +... +cn:_select(space.id, 0, {}, { iterator = 'ALL' }) +--- +- - [234, 1, 2, 3] + - [354, 1, 2, 4] +... +-- -- error while waiting for response +type(fiber.wrap(function() fiber.sleep(.5) cn:_fatal('Test error') end)) +--- +- userdata +... +function pause() fiber.sleep(10) return true end +--- +... +cn:call('pause') +--- +- error: Test error +... +cn:call('test_foo', 'a', 'b', 'c') +--- +- - [{'a': 1}] + - [{'b': 2}] + - ['c'] +... +-- call +remote.self:call('test_foo', 'a', 'b', 'c') +--- +- - [{'a': 1}] + - [{'b': 2}] + - ['c'] +... +cn:call('test_foo', 'a', 'b', 'c') +--- +- - [{'a': 1}] + - [{'b': 2}] + - ['c'] +... +-- auth +cn.proto.b64decode('gJLocxbO32VmfO8x04xRVxKfgwzmNVM2t6a1ME8XsD0=') +--- +- !!binary gJLocxbO32VmfO8x04xRVxKfgwzmNVM2t6a1ME8XsD0= +... +cn.proto.b64decode('gJLoc!!!!!!!') +--- +- !!binary gJLo +... +cn = remote:new('127.0.0.1', port, { user = 'netbox', password = '123', wait_connected = true }) +--- +... +cn:is_connected() +--- +- false +... +cn.error +--- +- User 'netbox' is not found +... +cn.state +--- +- closed +... +box.schema.user.create('netbox', { password = 'test' }) +--- +... +box.schema.user.grant('netbox', 'read, write, execute', 'universe'); +--- +... +cn = remote:new('127.0.0.1', port, { user = 'netbox', password = 'test' }) +--- +... +cn.state +--- +- active +... +cn.error +--- +- null +... +cn:ping() +--- +- true +... +function ret_after(to) fiber.sleep(to) return {{to}} end +--- +... +-- timeouts +cn:timeout(1).space.net_box_test_space.index.primary:select{234} +--- +- - [234, 1, 2, 3] +... +cn:call('ret_after', .01) +--- +- - [0.01] +... +cn:timeout(1):call('ret_after', .01) +--- +- - [0.01] +... +cn:timeout(.01):call('ret_after', 1) +--- +- error: Timeout exceeded +... +cn = remote:timeout(0.0000000001):new('127.0.0.1', port, { user = 'netbox', password = '123' }) +--- +- error: Timeout exceeded +... +cn = remote:timeout(1):new('127.0.0.1', port, { user = 'netbox', password = '123' }) +--- +... +remote.self:ping() +--- +- true +... +remote.self.space.net_box_test_space:select{234} +--- +- - [234, 1, 2, 3] +... +remote.self:timeout(123).space.net_box_test_space:select{234} +--- +- - [234, 1, 2, 3] +... +-- cleanup database after tests +space:drop() +--- +... diff --git a/test/box/box.net.box.test.lua b/test/box/box.net.box.test.lua new file mode 100644 index 0000000000000000000000000000000000000000..953e964a5f95145af2d1d8a7085a1ee8d8ecce6e --- /dev/null +++ b/test/box/box.net.box.test.lua @@ -0,0 +1,158 @@ +remote = require 'net.box' +fiber = require 'fiber' +log = require 'log' + +box.schema.user.grant('guest', 'read,write,execute', 'universe') +port = box.cfg.primary_port +space = box.schema.create_space('net_box_test_space') +space:create_index('primary', { type = 'tree' }) + +-- low level connection +log.info("create connection") +cn = remote:new('127.0.0.1', port) +cn:_wait_state({'active', 'error'}, 1) +log.info("state is %s", cn.state) + +cn:ping() +log.info("ping is done") +cn:ping() +log.info("ping is done") + + +cn:ping() + + +cn:call('unexists_procedure') + +function test_foo(a,b,c) return { {{ [a] = 1 }}, {{ [b] = 2 }}, c } end + +cn:call('test_foo', 'a', 'b', 'c') + + +cn:_select(space.id, space.index.primary.id, 123) +space:insert{123, 345} +cn:_select(space.id, space.index.primary.id, 123) +cn:_select(space.id, space.index.primary.id, 123, { limit = 0 }) +cn:_select(space.id, space.index.primary.id, 123, { limit = 1 }) +cn:_select(space.id, space.index.primary.id, 123, { limit = 1, offset = 1 }) + +cn.space[space.id] ~= nil +cn.space.net_box_test_space ~= nil +cn.space.net_box_test_space ~= nil +cn.space.net_box_test_space.index ~= nil +cn.space.net_box_test_space.index.primary ~= nil +cn.space.net_box_test_space.index[space.index.primary.id] ~= nil + + +cn.space.net_box_test_space.index.primary:select(123) +cn.space.net_box_test_space.index.primary:select(123, { limit = 0 }) +cn.space.net_box_test_space.index.primary:select(nil, { limit = 1, }) +cn.space.net_box_test_space:insert{234, 1,2,3} +cn.space.net_box_test_space:insert{234, 1,2,3} + +cn.space.net_box_test_space:replace{354, 1,2,3} +cn.space.net_box_test_space:replace{354, 1,2,4} + +cn.space.net_box_test_space:select{123} +space:select({123}, { iterator = 'GE' }) +cn.space.net_box_test_space:select({123}, { iterator = 'GE' }) +cn.space.net_box_test_space:select({123}, { iterator = 'GT' }) +cn.space.net_box_test_space:select({123}, { iterator = 'GT', limit = 1 }) +cn.space.net_box_test_space:select({123}, { iterator = 'GT', limit = 1, offset = 1 }) + +cn.space.net_box_test_space:select{123} +cn.space.net_box_test_space:update({123}, { { '+', 1, 2 } }) +cn.space.net_box_test_space:select{123} + +cn.space.net_box_test_space:update({123}, { { '=', 0, 2 } }) +cn.space.net_box_test_space:select{2} +cn.space.net_box_test_space:select({234}, { iterator = 'LT' }) + +cn.space.net_box_test_space:update({1}, { { '+', 1, 2 } }) + +cn.space.net_box_test_space:delete{1} +cn.space.net_box_test_space:delete{2} +cn.space.net_box_test_space:delete{2} + +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) + +cn.space.net_box_test_space.index.primary:min() +cn.space.net_box_test_space.index.primary:min(354) +cn.space.net_box_test_space.index.primary:max() +cn.space.net_box_test_space.index.primary:max(234) +cn.space.net_box_test_space.index.primary:count() +cn.space.net_box_test_space.index.primary:count(354) + +cn.space.net_box_test_space:get(354) + +-- reconnects after errors + +-- -- 1. no reconnect +cn:_fatal('Test fatal error') +cn.state +cn:ping() +cn:call('test_foo') + +-- -- 2 reconnect +cn = remote:new('127.0.0.1', port, { reconnect_after = .1 }) +cn:_wait_state({'active'}, 1) +cn.space ~= nil + +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) +cn:_fatal 'Test error' +cn:_wait_state({'active', 'activew'}, 2) +cn:ping() +cn.state +cn.space.net_box_test_space:select({}, { iterator = 'ALL' }) + +cn:_fatal 'Test error' +cn:_select(space.id, 0, {}, { iterator = 'ALL' }) + +-- -- error while waiting for response +type(fiber.wrap(function() fiber.sleep(.5) cn:_fatal('Test error') end)) +function pause() fiber.sleep(10) return true end + +cn:call('pause') +cn:call('test_foo', 'a', 'b', 'c') + + +-- call +remote.self:call('test_foo', 'a', 'b', 'c') +cn:call('test_foo', 'a', 'b', 'c') + +-- auth +cn.proto.b64decode('gJLocxbO32VmfO8x04xRVxKfgwzmNVM2t6a1ME8XsD0=') +cn.proto.b64decode('gJLoc!!!!!!!') + + +cn = remote:new('127.0.0.1', port, { user = 'netbox', password = '123', wait_connected = true }) +cn:is_connected() +cn.error +cn.state + +box.schema.user.create('netbox', { password = 'test' }) +box.schema.user.grant('netbox', 'read, write, execute', 'universe'); + +cn = remote:new('127.0.0.1', port, { user = 'netbox', password = 'test' }) +cn.state +cn.error +cn:ping() + +function ret_after(to) fiber.sleep(to) return {{to}} end + +-- timeouts +cn:timeout(1).space.net_box_test_space.index.primary:select{234} +cn:call('ret_after', .01) +cn:timeout(1):call('ret_after', .01) +cn:timeout(.01):call('ret_after', 1) + +cn = remote:timeout(0.0000000001):new('127.0.0.1', port, { user = 'netbox', password = '123' }) +cn = remote:timeout(1):new('127.0.0.1', port, { user = 'netbox', password = '123' }) + +remote.self:ping() +remote.self.space.net_box_test_space:select{234} +remote.self:timeout(123).space.net_box_test_space:select{234} + +-- cleanup database after tests +space:drop() + diff --git a/test/box/misc.result b/test/box/misc.result index d9b038bd1261f4a68e504945d289aef2bd74a67c..35e4c2bbc1faaa587f162a0514d97f316a0c7865 100644 --- a/test/box/misc.result +++ b/test/box/misc.result @@ -16,18 +16,22 @@ t = {} for n in pairs(box) do table.insert(t, tostring(n)) end table.sort(t) ... t --- -- - cfg +- - call + - cfg + - close - error - index - info - - net + - ping - raise - schema - slab - snapshot - space - stat + - timeout - tuple + - wait_connected ... t = nil --- @@ -203,6 +207,8 @@ t; - 'box.error.INVALID_MSGPACK : 20' - 'box.error.KEY_PART_COUNT : 31' - 'box.error.ALTER_SPACE : 12' + - 'box.error.SPLICE : 25' + - 'box.error.NO_CONNECTION : 77' - 'box.error.TUPLE_FOUND : 3' - 'box.error.INVALID_XLOG_NAME : 75' - 'box.error.INVALID_XLOG : 74' @@ -229,7 +235,7 @@ t; - 'box.error.TUPLE_FORMAT_LIMIT : 16' - 'box.error.INVALID_UUID : 64' - 'box.error.DROP_SPACE : 11' - - 'box.error.SPLICE : 25' + - 'box.error.TIMEOUT : 78' - 'box.error.MORE_THAN_ONE_TUPLE : 41' - 'box.error.MEMORY_ISSUE : 2' - 'box.error.NO_SUCH_TRIGGER : 34' diff --git a/test/box/protocol.result b/test/box/protocol.result index 0042bade62bd2de6ce77d534fa32437eb1dac399..829f15f04265bc470eeae42314fa0cea6552a8fe 100644 --- a/test/box/protocol.result +++ b/test/box/protocol.result @@ -16,34 +16,33 @@ for i=1,5 do space:insert{i} end primary_port = string.gsub(box.cfg.primary_port, '^.*:', '') --- ... -conn = box.net.box.new('127.0.0.1', tonumber(primary_port)) +conn = (require 'net.box'):new('127.0.0.1', tonumber(primary_port)) --- ... -conn:select(space.id, 3, { iterator = 'GE' }) +conn.space[space.id]:select(3, { iterator = 'GE' }) --- - - [3] - [4] - [5] ... -conn:select(space.id, 3, { iterator = 'LE' }) +conn.space[space.id]:select(3, { iterator = 'LE' }) --- - - [3] - [2] - [1] ... -conn:select(space.id, 3, { iterator = 'GT' }) +conn.space[space.id]:select(3, { iterator = 'GT' }) --- - - [4] - [5] ... -conn:select(space.id, 3, { iterator = 'LT' }) +conn.space[space.id]:select(3, { iterator = 'LT' }) --- - - [2] - [1] ... conn:close() --- -- true ... space:drop() --- diff --git a/test/box/protocol.test.lua b/test/box/protocol.test.lua index fce127189d8fce3d5e5dd71e2f9749b76289915e..cf1247521035c0b9c1bd628f70ff6cfcf5c6329a 100644 --- a/test/box/protocol.test.lua +++ b/test/box/protocol.test.lua @@ -9,11 +9,11 @@ space:create_index('primary', { type = 'tree'}) for i=1,5 do space:insert{i} end primary_port = string.gsub(box.cfg.primary_port, '^.*:', '') -conn = box.net.box.new('127.0.0.1', tonumber(primary_port)) -conn:select(space.id, 3, { iterator = 'GE' }) -conn:select(space.id, 3, { iterator = 'LE' }) -conn:select(space.id, 3, { iterator = 'GT' }) -conn:select(space.id, 3, { iterator = 'LT' }) +conn = (require 'net.box'):new('127.0.0.1', tonumber(primary_port)) +conn.space[space.id]:select(3, { iterator = 'GE' }) +conn.space[space.id]:select(3, { iterator = 'LE' }) +conn.space[space.id]:select(3, { iterator = 'GT' }) +conn.space[space.id]:select(3, { iterator = 'LT' }) conn:close() space:drop() diff --git a/test/replication/hot_standby.result b/test/replication/hot_standby.result index c0ede3a300300e2eb90aebd66cb2611fed947370..fa9fe598473f4b7660b849e3951be45b65392773 100644 --- a/test/replication/hot_standby.result +++ b/test/replication/hot_standby.result @@ -58,7 +58,7 @@ end; --# set connection default -- set begin lsn on master, replica and hot_standby. --# set variable replica_port to 'replica.primary_port' -a = box.net.box.new('127.0.0.1', replica_port) +a = (require 'net.box'):new('127.0.0.1', replica_port) --- ... a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) @@ -67,7 +67,6 @@ a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) ... a:close() --- -- true ... space = box.schema.create_space('tweedledum') --- @@ -125,7 +124,7 @@ require('fiber').sleep(0.2) -- hot_standby.primary_port is garbage, since hot_standby.lua -- uses MASTER_PORT environment variable for its primary_port --# set variable hot_standby_port to 'hot_standby.master_port' -a = box.net.box.new('127.0.0.1', hot_standby_port) +a = (require 'net.box'):new('127.0.0.1', hot_standby_port) --- ... a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) @@ -134,7 +133,6 @@ a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) ... a:close() --- -- true ... --# set connection hot_standby _insert(11, 20) diff --git a/test/replication/hot_standby.test.lua b/test/replication/hot_standby.test.lua index 6894ca047c780187d8b1cbdfd88a0f931eb4661d..8847b10eeb3977f86fc2a8c41230479d51fbc7a7 100644 --- a/test/replication/hot_standby.test.lua +++ b/test/replication/hot_standby.test.lua @@ -54,7 +54,7 @@ end; -- set begin lsn on master, replica and hot_standby. --# set variable replica_port to 'replica.primary_port' -a = box.net.box.new('127.0.0.1', replica_port) +a = (require 'net.box'):new('127.0.0.1', replica_port) a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) a:close() @@ -74,7 +74,7 @@ require('fiber').sleep(0.2) -- hot_standby.primary_port is garbage, since hot_standby.lua -- uses MASTER_PORT environment variable for its primary_port --# set variable hot_standby_port to 'hot_standby.master_port' -a = box.net.box.new('127.0.0.1', hot_standby_port) +a = (require 'net.box'):new('127.0.0.1', hot_standby_port) a:call('_set_pri_lsn', box.info.node.id, box.info.node.lsn) a:close()