From 0e29f2ab185121fd40187eebd8b3614074191356 Mon Sep 17 00:00:00 2001 From: Sulverus <sulverus@gmail.com> Date: Wed, 5 Aug 2015 21:05:30 +0300 Subject: [PATCH] gh-892: update net.box to use buffer.lua - fix net.box to satisfy strict.lua requirements - after review fixes - fix buffer:checksize() in buffer.lua --- src/lua/bsdsocket.lua | 5 +-- src/lua/buffer.lua | 8 ++-- src/lua/msgpack.cc | 10 +++++ src/lua/net_box.lua | 91 +++++++++++++++++++-------------------- test/box/bsdsocket.result | 6 +-- test/box/net.box.result | 6 +-- test/box/net.box.test.lua | 4 +- third_party/luafun | 2 +- 8 files changed, 71 insertions(+), 61 deletions(-) diff --git a/src/lua/bsdsocket.lua b/src/lua/bsdsocket.lua index 31d836bbe0..6d7c5edaec 100644 --- a/src/lua/bsdsocket.lua +++ b/src/lua/bsdsocket.lua @@ -2,7 +2,6 @@ local TIMEOUT_INFINITY = 500 * 365 * 86400 local LIMIT_INFINITY = 2147483647 -local READAHEAD = 16380 local ffi = require('ffi') local boxerrno = require('errno') @@ -212,7 +211,7 @@ socket_methods.sysread = function(self, arg1, arg2) return sysread(self, arg1, arg2) end - local size = arg1 or READAHEAD + local size = arg1 or buffer.READAHEAD local buf = buffer.IBUF_SHARED buf:reset() @@ -638,7 +637,7 @@ local function read(self, limit, timeout, check, ...) local started = fiber.time() assert(rbuf.size < limit) - local to_read = math.min(limit - rbuf.size, READAHEAD) + local to_read = math.min(limit - rbuf.size, buffer.READAHEAD) local data = rbuf:reserve(to_read) assert(rbuf.unused >= to_read) local res = sysread(self, data, rbuf.unused) diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua index ba336ac665..6823b0a98a 100644 --- a/src/lua/buffer.lua +++ b/src/lua/buffer.lua @@ -1,6 +1,7 @@ -- buffer.lua (internal file) local ffi = require('ffi') +local READAHEAD = 16320 ffi.cdef[[ struct slab_cache; @@ -36,7 +37,7 @@ ibuf_reserve_nothrow_slow(struct ibuf *ibuf, size_t size); local builtin = ffi.C local ibuf_t = ffi.typeof('struct ibuf') -local function errorf(method, s, ...) +local function errorf(s, ...) error(string.format(s, ...)) end @@ -106,7 +107,7 @@ local function ibuf_alloc(buf, size) end local function checksize(buf, size) - if ibuf.rpos + size > ibuf.wpos then + if buf.rpos + size > buf.wpos then errorf("Attempt to read out of range bytes: needed=%d size=%d", tonumber(size), ibuf_used(buf)) end @@ -179,7 +180,7 @@ ffi.metatype(ibuf_t, ibuf_mt); local function ibuf_new(arg, arg2) local buf = ffi.new(ibuf_t) local slabc = builtin.tarantool_lua_slab_cache() - builtin.ibuf_create(buf, slabc, 16320) + builtin.ibuf_create(buf, slabc, READAHEAD) if arg == nil then return buf elseif type(arg) == 'number' then @@ -192,4 +193,5 @@ end return { ibuf = ibuf_new; IBUF_SHARED = ibuf_new(); + READAHEAD = READAHEAD; } diff --git a/src/lua/msgpack.cc b/src/lua/msgpack.cc index a31e674cdb..a32b62f248 100644 --- a/src/lua/msgpack.cc +++ b/src/lua/msgpack.cc @@ -419,12 +419,22 @@ lua_msgpack_decode(lua_State *L) return 2; } +static int +lua_ibuf_msgpack_decode(lua_State *L) +{ + const char **start = (const char **)lua_topointer(L, 1); + struct luaL_serializer *cfg = luaL_checkserializer(L); + luamp_decode(L, cfg, start); + return 2; +} + static int lua_msgpack_new(lua_State *L); const luaL_reg msgpacklib[] = { { "encode", lua_msgpack_encode }, { "decode", lua_msgpack_decode }, + { "ibuf_decode", lua_ibuf_msgpack_decode }, { "new", lua_msgpack_new }, { NULL, NULL} }; diff --git a/src/lua/net_box.lua b/src/lua/net_box.lua index 1c6e99ec2d..a06f84073b 100644 --- a/src/lua/net_box.lua +++ b/src/lua/net_box.lua @@ -9,6 +9,7 @@ local ffi = require 'ffi' local digest = require 'digest' local yaml = require 'yaml' local urilib = require 'uri' +local buffer = require 'buffer' -- packet codes local OK = 0 @@ -423,7 +424,6 @@ local remote_methods = { self.is_run = true self.state = 'init' self.wbuf = {} - self.rbuf = '' self.rpos = 1 self.rlen = 0 @@ -575,7 +575,6 @@ local remote_methods = { self.space = {} self:_switch_state('error') self:_error_waiters(emsg) - self.rbuf = '' self.rpos = 1 self.rlen = 0 self.wbuf = {} @@ -599,54 +598,47 @@ local remote_methods = { end, _check_console_response = function(self) - while true do - local docend = "\n...\n" - if self.rlen < #docend or - string.sub(self.rbuf, #self.rbuf + 1 - #docend) ~= docend then - break - end - local resp = string.sub(self.rbuf, self.rpos) - local len = #resp - self.rpos = self.rpos + len - self.rlen = self.rlen - len + local docend = "\n...\n" + local resp = ffi.string(self.rbuf.rpos, self.rbuf.size) - local hdr = { [SYNC] = CONSOLE_FAKESYNC, [TYPE] = 0 } - local body = { [DATA] = resp } + if #resp < #docend or + string.sub(resp, #resp + 1 - #docend) ~= docend then + return -1 + end - if self.ch.sync[CONSOLE_FAKESYNC] ~= nil then - self.ch.sync[CONSOLE_FAKESYNC]:put({hdr = hdr, body = body }) - self.ch.sync[CONSOLE_FAKESYNC] = nil - else - log.warn("Unexpected console response: %s", resp) - end + local hdr = { [SYNC] = CONSOLE_FAKESYNC, [TYPE] = 0 } + local body = { [DATA] = resp } + + if self.ch.sync[CONSOLE_FAKESYNC] ~= nil then + self.ch.sync[CONSOLE_FAKESYNC]:put({hdr = hdr, body = body }) + self.ch.sync[CONSOLE_FAKESYNC] = nil + else + log.warn("Unexpected console response: %s", resp) end + self.rbuf:read(#resp) + return 0 end, _check_binary_response = function(self) while true do - if self.rlen < 5 then - break + if self.rbuf.size < 5 then + return 0 end - local len, off = msgpack.decode(self.rbuf, self.rpos) - -- wait for correct package length - if off + len > #self.rbuf + 1 then - break + local rpos, len = msgpack.ibuf_decode(self.rbuf.rpos) + if rpos + len > self.rbuf.wpos then + return len - (self.rbuf.wpos - rpos) end - local hdr, body - hdr, off = msgpack.decode(self.rbuf, off) - if off <= #self.rbuf then - body, off = msgpack.decode(self.rbuf, off) - -- disable YAML flow output (useful for admin console) - setmetatable(body, mapping_mt) - else - body = {} - end + local rpos, hdr = msgpack.ibuf_decode(rpos) + local body = {} - self.rpos = off - self.rlen = #self.rbuf + 1 - self.rpos + if rpos < self.rbuf.wpos then + rpos, body= msgpack.ibuf_decode(rpos) + end + setmetatable(body, mapping_mt) + self.rbuf.rpos = rpos local sync = hdr[SYNC] if self.ch.sync[sync] ~= nil then @@ -655,6 +647,10 @@ local remote_methods = { else log.warn("Unexpected response %s", tostring(sync)) end + + if self.rbuf.size == 0 then + return 0 + end end end, @@ -888,19 +884,22 @@ local remote_methods = { _read_worker = function(self) fiber.name('net.box.read') + self.rbuf = buffer.ibuf(buffer.READAHEAD) while self:_wait_state(self._r_states) ~= 'closed' do if self.s:readable() then - local data = self.s:sysread() - - if data ~= nil then - if #data == 0 then + local len = self.s:sysread(self.rbuf.wpos, self.rbuf.unused) + if len ~= nil then + if len == 0 then self:_fatal('Remote host closed connection') else - self.rbuf = string.sub(self.rbuf, self.rpos) .. - data - self.rpos = 1 - self.rlen = #self.rbuf - self:_check_response() + self.rbuf.wpos = self.rbuf.wpos + len + + local advance = self:_check_response() + if advance < 0 then + advance = buffer.READAHEAD + end + + self.rbuf:reserve(advance) end elseif errno_is_transient[errno()] ~= true then self:_fatal(errno.strerror(errno())) diff --git a/test/box/bsdsocket.result b/test/box/bsdsocket.result index 714d91681a..9491d7b2a8 100644 --- a/test/box/bsdsocket.result +++ b/test/box/bsdsocket.result @@ -73,7 +73,7 @@ s:close() s:close() --- -- error: 'builtin/socket.lua:87: attempt to use closed socket' +- error: 'builtin/socket.lua:86: attempt to use closed socket' ... LISTEN = require('uri').parse(box.cfg.listen) --- @@ -284,7 +284,7 @@ s:getsockopt('SOL_SOCKET', 'SO_DEBUG') ... s:setsockopt('SOL_SOCKET', 'SO_ACCEPTCONN', 1) --- -- error: 'builtin/socket.lua:393: Socket option SO_ACCEPTCONN is read only' +- error: 'builtin/socket.lua:392: Socket option SO_ACCEPTCONN is read only' ... s:getsockopt('SOL_SOCKET', 'SO_RCVBUF') > 32 --- @@ -1025,7 +1025,7 @@ ch:get(1) ... s:error() --- -- error: 'builtin/socket.lua:87: attempt to use closed socket' +- error: 'builtin/socket.lua:86: attempt to use closed socket' ... -- random port port = 33123 diff --git a/test/box/net.box.result b/test/box/net.box.result index 19c00e451c..843a02d8f3 100644 --- a/test/box/net.box.result +++ b/test/box/net.box.result @@ -288,7 +288,7 @@ cn.space.net_box_test_space:insert{234, 1,2,3} ... cn.space.net_box_test_space.insert{234, 1,2,3} --- -- error: 'builtin/net.box.lua:236: Use space:method(...) instead space.method(...)' +- error: 'builtin/net.box.lua:237: Use space:method(...) instead space.method(...)' ... cn.space.net_box_test_space:replace{354, 1,2,3} --- @@ -861,8 +861,8 @@ file_log:seek(0, 'SEEK_END') ~= 0 --# setopt delimiter ';' _ = fiber.create( function() - conn = require('net.box').new(box.cfg.listen) - conn.call('no_such_function', {}) + local conn = require('net.box').new(box.cfg.listen) + conn:call('no_such_function', {}) end ); --- diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua index ee1c37283d..c5ed905767 100644 --- a/test/box/net.box.test.lua +++ b/test/box/net.box.test.lua @@ -345,8 +345,8 @@ file_log:seek(0, 'SEEK_END') ~= 0 _ = fiber.create( function() - conn = require('net.box').new(box.cfg.listen) - conn.call('no_such_function', {}) + local conn = require('net.box').new(box.cfg.listen) + conn:call('no_such_function', {}) end ); while true do diff --git a/third_party/luafun b/third_party/luafun index 726200c739..3d44c0841d 160000 --- a/third_party/luafun +++ b/third_party/luafun @@ -1 +1 @@ -Subproject commit 726200c73943f8efdade21c79a722a82566ec45f +Subproject commit 3d44c0841dbc93b645546bb13868550089bfa076 -- GitLab