From 75af9e352cc45d0a6c74db163759a5d6fe6f652f Mon Sep 17 00:00:00 2001 From: Roman Tsisyk <roman@tsisyk.com> Date: Wed, 1 Jul 2015 20:14:17 +0300 Subject: [PATCH] Add bindings for struct ibuf and rewrote sockets to use it. + Rewrite msgpackffi to use ibuf to use ibuf for temporary allocations + Rewrite socket:sysread() to use ibuf for temporary allocations + Rewrite socket:read() and socket:write() to use ibuf + Add socket:sysread(char *, size) and socket:syswrite(const char *, size) --- src/CMakeLists.txt | 1 + src/box/lua/tuple.lua | 5 +- src/ffisyms.cc | 7 +- src/iobuf.cc | 9 ++ src/iobuf.h | 3 + src/lua/bsdsocket.lua | 248 ++++++++++++++++++------------------ src/lua/buffer.lua | 191 +++++++++++++++++++++++++++ src/lua/init.cc | 8 ++ src/lua/init.h | 3 + src/lua/msgpackffi.lua | 101 +++++---------- test/app/console.test.lua | 2 +- test/big/iterator.result | 2 +- test/big/lua.result | 2 +- test/box/bsdsocket.result | 37 +++--- test/box/bsdsocket.test.lua | 25 ++-- 15 files changed, 412 insertions(+), 232 deletions(-) create mode 100644 src/lua/buffer.lua diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 573e325ef5..77dc2bc975 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -15,6 +15,7 @@ include_directories(${READLINE_INCLUDE_DIR}) set(lua_sources) lua_source(lua_sources lua/init.lua) lua_source(lua_sources lua/fiber.lua) +lua_source(lua_sources lua/buffer.lua) lua_source(lua_sources lua/uuid.lua) lua_source(lua_sources lua/digest.lua) lua_source(lua_sources lua/msgpackffi.lua) diff --git a/src/box/lua/tuple.lua b/src/box/lua/tuple.lua index a61e466b47..7a87974f67 100644 --- a/src/box/lua/tuple.lua +++ b/src/box/lua/tuple.lua @@ -169,9 +169,8 @@ end -- Set encode hooks for msgpackffi local function tuple_to_msgpack(buf, tuple) - buf:reserve(tuple._bsize) - builtin.tuple_to_buf(tuple, buf.p) - buf.p = buf.p + tuple._bsize + local data = buf:alloc(tuple._bsize) + builtin.tuple_to_buf(tuple, data) end msgpackffi.on_encode(ffi.typeof('const struct tuple &'), tuple_to_msgpack) diff --git a/src/ffisyms.cc b/src/ffisyms.cc index ffa296180b..3125f532a1 100644 --- a/src/ffisyms.cc +++ b/src/ffisyms.cc @@ -15,6 +15,7 @@ #include "fiber.h" #include "base64.h" #include "random.h" +#include "iobuf.h" #include <lib/salad/guava.h> /* @@ -66,5 +67,9 @@ void *ffi_symbols[] = { (void *) random_bytes, (void *) fiber_time, (void *) fiber_time64, - (void *) sophia_schedule + (void *) sophia_schedule, + (void *) tarantool_lua_slab_cache, + (void *) ibuf_create, + (void *) ibuf_destroy, + (void *) ibuf_reserve_nothrow_slow }; diff --git a/src/iobuf.cc b/src/iobuf.cc index 290b130638..430d4cf432 100644 --- a/src/iobuf.cc +++ b/src/iobuf.cc @@ -68,6 +68,15 @@ ibuf_destroy(struct ibuf *ibuf) } } +/** Free memory allocated by this buffer */ +void +ibuf_gc(struct ibuf *ibuf) +{ + struct slab_cache *slabc = ibuf->slabc; + ibuf_destroy(ibuf); + ibuf_create(ibuf, slabc); +} + /** * Ensure the buffer has sufficient capacity * to store size bytes. diff --git a/src/iobuf.h b/src/iobuf.h index b2430e81e0..8759a48997 100644 --- a/src/iobuf.h +++ b/src/iobuf.h @@ -71,6 +71,9 @@ ibuf_create(struct ibuf *ibuf, struct slab_cache *slabc); void ibuf_destroy(struct ibuf *ibuf); +void +ibuf_gc(struct ibuf *ibuf); + /** How much data is read and is not parsed yet. */ static inline size_t ibuf_size(struct ibuf *ibuf) diff --git a/src/lua/bsdsocket.lua b/src/lua/bsdsocket.lua index 6a9caef551..31d836bbe0 100644 --- a/src/lua/bsdsocket.lua +++ b/src/lua/bsdsocket.lua @@ -1,7 +1,7 @@ -- bsdsocket.lua (internal file) local TIMEOUT_INFINITY = 500 * 365 * 86400 -local LIMIT_INFINITY = 4294967295 +local LIMIT_INFINITY = 2147483647 local READAHEAD = 16380 local ffi = require('ffi') @@ -10,6 +10,7 @@ local internal = require('socket') local fiber = require('fiber') local fio = require('fio') local log = require('log') +local buffer = require('buffer') ffi.cdef[[ struct socket { @@ -51,6 +52,9 @@ ffi.cdef[[ int p_proto; /* protocol number */ }; struct protoent *getprotobyname(const char *name); + + void *memmem(const void *haystack, size_t haystacklen, + const void *needle, size_t needlelen); ]] local function sprintf(fmt, ...) @@ -166,31 +170,63 @@ socket_methods.sysconnect = function(self, host, port) return false end -socket_methods.syswrite = function(self, octets) +local function syswrite(self, charptr, size) local fd = check_socket(self) self._errno = nil - local done = ffi.C.write(fd, octets, string.len(octets)) + local done = ffi.C.write(fd, charptr, size) if done < 0 then self._errno = boxerrno() return nil end + return tonumber(done) end -socket_methods.sysread = function(self, size) +socket_methods.syswrite = function(self, arg1, arg2) + -- TODO: ffi.istype('char *', arg1) doesn't work for ffi.new('char[256]') + if type(arg1) == 'cdata' and arg2 ~= nil then + return syswrite(self, arg1, arg2) + elseif type(arg1) == 'string' then + return syswrite(self, arg1, #arg1) + else + error('Usage: socket:syswrite(data) or socket:syswrite(const char *, size)') + end +end + +local function sysread(self, charptr, size) local fd = check_socket(self) - size = size or READAHEAD - self._errno = nil - local buf = ffi.new('char[?]', size) - local res = ffi.C.read(fd, buf, size) + self._errno = nil + local res = ffi.C.read(fd, charptr, size) if res < 0 then self._errno = boxerrno() return nil end - buf = ffi.string(buf, res) - return buf + return tonumber(res) +end + +socket_methods.sysread = function(self, arg1, arg2) + -- TODO: ffi.istype('char *', arg1) doesn't work for ffi.new('char[256]') + if type(arg1) == 'cdata' and arg2 ~= nil then + return sysread(self, arg1, arg2) + end + + local size = arg1 or READAHEAD + + local buf = buffer.IBUF_SHARED + buf:reset() + local p = buf:alloc(size) + + local res = sysread(self, p, size) + if res then + local str = ffi.string(p, res) + buf:recycle() + return str + else + buf:recycle() + return res + end end socket_methods.nonblock = function(self, nb) @@ -546,136 +582,95 @@ local errno_is_fatal = { [boxerrno.ENOTSOCK] = true; } -local function readchunk(self, limit, timeout) - if self.rbuf == nil then - self.rbuf = '' - self.rpos = 1 - self.rlen = 0 - end - - if self.rlen >= limit then - self._errno = nil - local data = string.sub(self.rbuf, self.rpos, self.rpos - 1 + limit) - self.rlen = self.rlen - limit - self.rpos = self.rpos + limit - return data - end - - while true do - local started = fiber.time() - - local to_read - if limit ~= LIMIT_INFINITY and limit > READAHEAD then - to_read = limit - self.rlen - end - local data = self:sysread(to_read) - if data ~= nil then - self.rbuf = string.sub(self.rbuf, self.rpos) .. data - self.rpos = 1 - self.rlen = string.len(self.rbuf) - if string.len(data) == 0 then -- eof - limit = self.rlen - end - if self.rlen >= limit then - data = string.sub(self.rbuf, self.rpos, self.rpos - 1 + limit) - self.rlen = self.rlen - limit - self.rpos = self.rpos + limit - return data - end - elseif not errno_is_transient[self:errno()] then - self._errno = boxerrno() - return nil - end - - if not self:readable(timeout) then - return nil - end - if timeout <= 0 then - break - end - timeout = timeout - ( fiber.time() - started ) +local function check_limit(self, limit) + if self.rbuf.size >= limit then + return limit end - self._errno = boxerrno.ETIMEDOUT return nil end -local function readline_check(self, eols, limit) +local function check_delimiter(self, limit, eols) if limit == 0 then - return '' + return 0 end - if self.rlen == 0 then + local rbuf = self.rbuf + if rbuf.size == 0 then return nil end local shortest for i, eol in ipairs(eols) do - local data = string.match(self.rbuf, "^(.-" .. eol .. ")", self.rpos) + local data = ffi.C.memmem(rbuf.rpos, rbuf.size, eol, #eol) if data ~= nil then - if string.len(data) > limit then - data = string.sub(data, 1, limit) - end - if shortest == nil or string.len(shortest) > string.len(data) then - shortest = data + local len = ffi.cast('char *', data) - rbuf.rpos + #eol + if shortest == nil or shortest > len then + shortest = len end end end - if shortest == nil and self.rlen >= limit then - shortest = string.sub(self.rbuf, self.rpos, self.rpos - 1 + limit) + if shortest ~= nil and shortest <= limit then + return shortest + elseif limit <= rbuf.size then + return limit end - if shortest ~= nil then - local len = string.len(shortest) - self.rpos = self.rpos + len - self.rlen = self.rlen - len - end - return shortest + return nil end -local function readline(self, limit, eol, timeout) - if self.rbuf == nil then - self.rbuf = '' - self.rpos = 1 - self.rlen = 0 +local function read(self, limit, timeout, check, ...) + assert(limit >= 0) + limit = math.min(limit, LIMIT_INFINITY) + local rbuf = self.rbuf + if rbuf == nil then + rbuf = buffer.ibuf() + self.rbuf = rbuf end - self._errno = nil - local data = readline_check(self, eol, limit) - if data ~= nil then + local len = check(self, limit, ...) + if len ~= nil then + self._errno = nil + local data = ffi.string(rbuf.rpos, len) + rbuf.rpos = rbuf.rpos + len return data end local started = fiber.time() while timeout > 0 do local started = fiber.time() - - if not self:readable(timeout) then - self._errno = boxerrno() - return nil - end - - timeout = timeout - ( fiber.time() - started ) - local to_read - if limit ~= LIMIT_INFINITY and limit > READAHEAD then - to_read = limit - self.rlen - end - local data = self:sysread(to_read) - if data ~= nil then - self.rbuf = string.sub(self.rbuf, self.rpos) .. data - self.rpos = 1 - self.rlen = string.len(self.rbuf) - if string.len(data) == 0 then -- eof - limit = self.rlen - end - data = readline_check(self, eol, limit) - if data ~= nil then + assert(rbuf.size < limit) + local to_read = math.min(limit - rbuf.size, READAHEAD) + local data = rbuf:reserve(to_read) + assert(rbuf.unused >= to_read) + local res = sysread(self, data, rbuf.unused) + if res == 0 then -- eof + self._errno = nil + local len = rbuf.size + local data = ffi.string(rbuf.rpos, len) + rbuf.rpos = rbuf.rpos + len + return data + elseif res ~= nil then + rbuf.wpos = rbuf.wpos + res + local len = check(self, limit, ...) + if len ~= nil then + self._errno = nil + local data = ffi.string(rbuf.rpos, len) + rbuf.rpos = rbuf.rpos + len return data end - elseif not errno_is_transient[self:errno()] then self._errno = boxerrno() return nil end + + if not self:readable(timeout) then + return nil + end + if timeout <= 0 then + break + end + timeout = timeout - ( fiber.time() - started ) end + self._errno = boxerrno.ETIMEDOUT return nil end @@ -683,18 +678,18 @@ socket_methods.read = function(self, opts, timeout) check_socket(self) timeout = timeout or TIMEOUT_INFINITY if type(opts) == 'number' then - return readchunk(self, opts, timeout) + return read(self, opts, timeout, check_limit) elseif type(opts) == 'string' then - return readline(self, LIMIT_INFINITY, { opts }, timeout) + return read(self, LIMIT_INFINITY, timeout, check_delimiter, { opts }) elseif type(opts) == 'table' then - local chunk = opts.chunk or opts.size or 4294967295 + local chunk = opts.chunk or opts.size or LIMIT_INFINITY local delimiter = opts.delimiter or opts.line if delimiter == nil then - return readchunk(self, chunk, timeout) + return read(self, chunk, timeout, check_limit) elseif type(delimiter) == 'string' then - return readline(self, chunk, { delimiter }, timeout) + return read(self, chunk, timeout, check_delimiter, { delimiter }) elseif type(delimiter) == 'table' then - return readline(self, chunk, delimiter, timeout) + return read(self, chunk, timeout, check_delimiter, delimiter) end end error('Usage: s:read(delimiter|chunk|{delimiter = x, chunk = x}, timeout)') @@ -706,23 +701,28 @@ socket_methods.write = function(self, octets, timeout) timeout = TIMEOUT_INFINITY end - local total_len = #octets + local s = ffi.cast('const char *', octets) + local p = s + local e = s + #octets + if p == e then + return 0 + end + local started = fiber.time() while true do - local written = self:syswrite(octets) - if written == nil then - if not errno_is_transient[self:errno()] then - return nil + local written = syswrite(self, p, e - p) + if written == 0 then + return p - s -- eof + elseif written ~= nil then + p = p + written + assert(p <= e) + if p == e then + return e - s end - written = 0 + elseif not errno_is_transient[self:errno()] then + return nil end - if written == string.len(octets) then - return total_len - end - if written > 0 then - octets = string.sub(octets, written + 1) - end timeout = timeout - (fiber.time() - started) if timeout <= 0 or not self:writable(timeout) then break diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua new file mode 100644 index 0000000000..3ad7a74558 --- /dev/null +++ b/src/lua/buffer.lua @@ -0,0 +1,191 @@ +-- buffer.lua (internal file) + +local ffi = require('ffi') + +ffi.cdef[[ +struct slab_cache; +struct slab_cache * +tarantool_lua_slab_cache(); + +struct ibuf +{ + struct slab_cache *slabc; + char *buf; + /** Start of input. */ + char *rpos; + /** End of useful input */ + char *wpos; + /** End of ibuf. */ + char *epos; +}; + +void +ibuf_create(struct ibuf *ibuf, struct slab_cache *slabc); + +void +ibuf_destroy(struct ibuf *ibuf); + +void +ibuf_gc(struct ibuf *ibuf); + +int +ibuf_reserve_nothrow_slow(struct ibuf *ibuf, size_t size); +]] + +local builtin = ffi.C +local ibuf_t = ffi.typeof('struct ibuf') + +local function errorf(method, s, ...) + error(string.format(s, ...)) +end + +local function checkibuf(buf, method) + if not ffi.istype(ibuf_t, buf) then + errorf('Attempt to call method without object, use ibuf:%s()', method) + end +end + +local function ibuf_capacity(buf) + checkibuf(buf, 'capacity') + return tonumber(buf.epos - buf.buf) +end + +local function ibuf_pos(buf) + checkibuf(buf, 'pos') + return tonumber(buf.rpos - buf.buf) +end + +local function ibuf_size(buf) + checkibuf(buf, 'size') + return tonumber(buf.wpos - buf.rpos) +end + +local function ibuf_unused(buf) + checkibuf(buf, 'unused') + return tonumber(buf.epos - buf.wpos) +end + +local function ibuf_recycle(buf) + checkibuf(buf, 'recycle') + builtin.ibuf_gc(buf) +end + +local function ibuf_reset(buf) + checkibuf(buf, 'reset') + buf.rpos = buf.buf + buf.wpos = buf.buf +end + +local function ibuf_reserve_slow(buf, size) + if builtin.ibuf_reserve_nothrow_slow(buf, size) ~= 0 then + errorf("Failed to allocate %d bytes in ibuf", size) + end +end + +local function ibuf_reserve(buf, size) + checkibuf(buf, 'reserve') + if buf.wpos + size <= buf.epos then + return buf.wpos + end + ibuf_reserve_slow(buf, size) + return buf.wpos +end + +local function ibuf_alloc(buf, size) + checkibuf(buf, 'alloc') + if buf.wpos + size > buf.epos then + ibuf_reserve_slow(buf, size) + end + local wpos = buf.wpos + buf.wpos = wpos + size + return wpos +end + +local function checksize(buf, size) + if ibuf.rpos + size > ibuf.wpos then + errorf("Attempt to read out of range bytes: needed=%d size=%d", + tonumber(size), ibuf_size(buf)) + end +end + +local function ibuf_checksize(buf, size) + checkibuf(buf, 'checksize') + checksize(buf, size) + return buf.rpos +end + +local function ibuf_read(buf, size) + checkibuf(buf, 'read') + checksize(buf, size) + local rpos = buf.rpos + buf.rpos = rpos + size + return rpos +end + +local ibuf_properties = { + size = ibuf_size; + capacity = ibuf_capacity; + pos = ibuf_pos; + unused = ibuf_unused; +} + +local function ibuf_serialize(buf) + local properties = { rpos = buf.rpos, wpos = buf.wpos } + for key, getter in pairs(ibuf_properties) do + properties[key] = getter(buf) + end + return { ibuf = properties } +end + +local ibuf_methods = { + recycle = ibuf_recycle; + reset = ibuf_reset; + + reserve = ibuf_reserve; + alloc = ibuf_alloc; + + checksize = ibuf_checksize; + read = ibuf_read; + __serialize = ibuf_serialize; +} + +local function ibuf_index(buf, key) + local property = ibuf_properties[key] + if property ~= nil then + return property(buf) + end + local method = ibuf_methods[key] + if method ~= nil then + return method + end + return nil +end + +local function ibuf_tostring(ibuf) + return '<ibuf>' +end +local ibuf_mt = { + __gc = ibuf_recycle; + __index = ibuf_index; + __tostring = ibuf_tostring; +}; + +ffi.metatype(ibuf_t, ibuf_mt); + +local function ibuf_new(arg, arg2) + local buf = ffi.new(ibuf_t) + local slabc = builtin.tarantool_lua_slab_cache() + builtin.ibuf_create(buf, slabc) + if arg == nil then + return buf + elseif type(arg) == 'number' then + ibuf_reserve(buf, arg) + return buf + end + errorf('Usage: ibuf([size])') +end + +return { + ibuf = ibuf_new; + IBUF_SHARED = ibuf_new(); +} diff --git a/src/lua/init.cc b/src/lua/init.cc index 8de13af008..fde8c08f45 100644 --- a/src/lua/init.cc +++ b/src/lua/init.cc @@ -75,6 +75,7 @@ extern char uuid_lua[], fun_lua[], digest_lua[], init_lua[], + buffer_lua[], fiber_lua[], log_lua[], uri_lua[], @@ -89,6 +90,7 @@ extern char uuid_lua[], static const char *lua_modules[] = { "tarantool", init_lua, "fiber", fiber_lua, + "buffer", buffer_lua, "msgpackffi", msgpackffi_lua, "fun", fun_lua, "digest", digest_lua, @@ -388,6 +390,12 @@ tarantool_lua_init(const char *tarantool_bin, int argc, char **argv) char *history = NULL; +struct slab_cache * +tarantool_lua_slab_cache() +{ + return &cord()->slabc; +} + extern "C" const char * tarantool_error_message(void) { diff --git a/src/lua/init.h b/src/lua/init.h index d779255195..af863c6dca 100644 --- a/src/lua/init.h +++ b/src/lua/init.h @@ -83,6 +83,9 @@ tarantool_lua(struct lua_State *L, extern char *history; +extern "C" struct slab_cache * +tarantool_lua_slab_cache(); + /** * Return last exception text */ diff --git a/src/lua/msgpackffi.lua b/src/lua/msgpackffi.lua index df937a49e8..7fd585ca63 100644 --- a/src/lua/msgpackffi.lua +++ b/src/lua/msgpackffi.lua @@ -1,6 +1,7 @@ -- msgpackffi.lua (internal file) local ffi = require('ffi') +local buffer = require('buffer') local builtin = ffi.C local msgpack = require('msgpack') -- .NULL, .array_mt, .map_mt, .cfg local MAXNESTING = 16 @@ -27,37 +28,6 @@ local bswap_u32 = bit.bswap local bswap_u64 = bit.bswap --]] --------------------------------------------------------------------------------- ---- Buffer --------------------------------------------------------------------------------- - -local DEFAULT_CAPACITY = 4096; - -local tmpbuf = {} -tmpbuf.s = ffi.new("char[?]", DEFAULT_CAPACITY) -tmpbuf.e = tmpbuf.s + DEFAULT_CAPACITY -tmpbuf.p = tmpbuf.s -tmpbuf.reserve = function(buf, needed) - if buf.p + needed <= buf.e then - return - end - - local size = buf.p - buf.s - local capacity = buf.e - buf.s - while capacity - size < needed do - capacity = 2 * capacity - end - - local s = ffi.new("char[?]", capacity) - ffi.copy(s, buf.s, size) - buf.s = s - buf.e = s + capacity - buf.p = s + size -end -tmpbuf.reset = function(buf) - buf.p = buf.s -end - -------------------------------------------------------------------------------- -- Encoder -------------------------------------------------------------------------------- @@ -76,54 +46,46 @@ local function on_encode(ctype_or_udataname, callback) end local function encode_fix(buf, code, num) - buf:reserve(1) - buf.p[0] = bit.bor(code, tonumber(num)) - -- buf.p[0] = bit.bor(code, num) -- LuaJIT 2.1 - buf.p = buf.p + 1 + local p = buf:alloc(1) + p[0] = bit.bor(code, tonumber(num)) end local function encode_u8(buf, code, num) - buf:reserve(2) - buf.p[0] = code - ffi.cast(uint16_ptr_t, buf.p + 1)[0] = num - buf.p = buf.p + 2 + local p = buf:alloc(2) + p[0] = code + ffi.cast(uint8_ptr_t, p + 1)[0] = num end local function encode_u16(buf, code, num) - buf:reserve(3) - buf.p[0] = code - ffi.cast(uint16_ptr_t, buf.p + 1)[0] = bswap_u16(num) - buf.p = buf.p + 3 + local p = buf:alloc(3) + p[0] = code + ffi.cast(uint16_ptr_t, p + 1)[0] = bswap_u16(num) end local function encode_u32(buf, code, num) - buf:reserve(5) - buf.p[0] = code - ffi.cast(uint32_ptr_t, buf.p + 1)[0] = bswap_u32(num) - buf.p = buf.p + 5 + local p = buf:alloc(5) + p[0] = code + ffi.cast(uint32_ptr_t, p + 1)[0] = bswap_u32(num) end local function encode_u64(buf, code, num) - buf:reserve(9) - buf.p[0] = code - ffi.cast(uint64_ptr_t, buf.p + 1)[0] = bswap_u64(ffi.cast('uint64_t', num)) - buf.p = buf.p + 9 + local p = buf:alloc(9) + p[0] = code + ffi.cast(uint64_ptr_t, p + 1)[0] = bswap_u64(ffi.cast('uint64_t', num)) end local function encode_float(buf, num) - buf:reserve(5) - buf.p[0] = 0xca; - ffi.cast(float_ptr_t, buf.p + 1)[0] = num - ffi.cast(uint32_ptr_t, buf.p + 1)[0] = bswap_u32(ffi.cast(uint32_ptr_t, buf.p + 1)[0]) - buf.p = buf.p + 5 + local p = buf:alloc(5) + p[0] = 0xca; + ffi.cast(float_ptr_t, p + 1)[0] = num + ffi.cast(uint32_ptr_t, p + 1)[0] = bswap_u32(ffi.cast(uint32_ptr_t, p + 1)[0]) end local function encode_double(buf, num) - buf:reserve(9) - buf.p[0] = 0xcb; - ffi.cast(double_ptr_t, buf.p + 1)[0] = num - ffi.cast(uint64_ptr_t, buf.p + 1)[0] = bswap_u64(ffi.cast(uint64_ptr_t, buf.p + 1)[0]) - buf.p = buf.p + 9 + local p = buf:alloc(9) + p[0] = 0xcb; + ffi.cast(double_ptr_t, p + 1)[0] = num + ffi.cast(uint64_ptr_t, p + 1)[0] = bswap_u64(ffi.cast(uint64_ptr_t, p + 1)[0]) end local function encode_int(buf, num) @@ -166,8 +128,8 @@ local function encode_str(buf, str) else encode_u32(buf, 0xdb, len) end - ffi.copy(buf.p, str, len) - buf.p = buf.p + len + local p = buf:alloc(len) + ffi.copy(p, str, len) end local function encode_array(buf, size) @@ -199,9 +161,8 @@ local function encode_bool_cdata(buf, val) end local function encode_nil(buf) - buf:reserve(1) - buf.p[0] = 0xc0 - buf.p = buf.p + 1 + local p = buf:alloc(1) + p[0] = 0xc0 end local function encode_r(buf, obj, level) @@ -263,9 +224,12 @@ local function encode_r(buf, obj, level) end local function encode(obj) + local tmpbuf = buffer.IBUF_SHARED tmpbuf:reset() encode_r(tmpbuf, obj, 0) - return ffi.string(tmpbuf.s, tmpbuf.p - tmpbuf.s) + local r = ffi.string(tmpbuf.rpos, tmpbuf.size) + tmpbuf:recycle() + return r end on_encode(ffi.typeof('uint8_t'), encode_int) @@ -503,6 +467,7 @@ end -------------------------------------------------------------------------------- local function encode_tuple(obj) + local tmpbuf = buffer.IBUF_SHARED tmpbuf:reset() if obj == nil then encode_fix(tmpbuf, 0x90, 0) -- empty array @@ -516,7 +481,7 @@ local function encode_tuple(obj) encode_fix(tmpbuf, 0x90, 1) -- array of one element encode_r(tmpbuf, obj, 1) end - return tmpbuf.s, tmpbuf.p + return tmpbuf.rpos, tmpbuf.wpos end -------------------------------------------------------------------------------- diff --git a/test/app/console.test.lua b/test/app/console.test.lua index 3ff518d0e1..ad99b7c119 100755 --- a/test/app/console.test.lua +++ b/test/app/console.test.lua @@ -16,7 +16,7 @@ os.remove(CONSOLE_SOCKET) os.remove(IPROTO_SOCKET) -- -local EOL = "\n%.%.%.\n" +local EOL = "\n...\n" test = tap.test("console") diff --git a/test/big/iterator.result b/test/big/iterator.result index cc76c3a7df..6d6010a0a9 100644 --- a/test/big/iterator.result +++ b/test/big/iterator.result @@ -891,7 +891,7 @@ space.index['primary']:pairs({}, {iterator = -666 }) -- Test cases for #123: box.index.count does not check arguments properly space.index['primary']:pairs(function() end, { iterator = box.index.EQ }) --- -- error: 'builtin/msgpackffi.lua:261: can not encode Lua type: ''function''' +- error: 'builtin/msgpackffi.lua:222: can not encode Lua type: ''function''' ... -- Check that iterators successfully invalidated when index deleted gen, param, state = space.index['i1']:pairs(nil, { iterator = box.index.GE }) diff --git a/test/big/lua.result b/test/big/lua.result index 31b231e743..6bf25aeb88 100644 --- a/test/big/lua.result +++ b/test/big/lua.result @@ -574,7 +574,7 @@ space.index['i1']:count() -- Test cases for #123: box.index.count does not check arguments properly space.index['i1']:count(function() end) --- -- error: 'builtin/msgpackffi.lua:261: can not encode Lua type: ''function''' +- error: 'builtin/msgpackffi.lua:222: can not encode Lua type: ''function''' ... space:drop() --- diff --git a/test/box/bsdsocket.result b/test/box/bsdsocket.result index 4e0e3fa6e5..7ff73d18ff 100644 --- a/test/box/bsdsocket.result +++ b/test/box/bsdsocket.result @@ -25,6 +25,9 @@ errno = require 'errno' fio = require 'fio' --- ... +ffi = require('ffi') +--- +... type(socket) --- - table @@ -70,7 +73,7 @@ s:close() s:close() --- -- error: 'builtin/socket.lua:83: attempt to use closed socket' +- error: 'builtin/socket.lua:87: attempt to use closed socket' ... LISTEN = require('uri').parse(box.cfg.listen) --- @@ -134,14 +137,15 @@ s:wait(.01) --- - RW ... -handshake = s:sysread(128) +handshake = ffi.new('char[128]') --- ... -string.len(handshake) +-- test sysread with char * +s:sysread(handshake, 128) --- - 128 ... -string.sub(handshake, 1, 9) +ffi.string(handshake, 9) --- - Tarantool ... @@ -151,7 +155,8 @@ ping = msgpack.encode({ [0] = 64, [1] = 0 }) .. msgpack.encode({}) ping = msgpack.encode(string.len(ping)) .. ping --- ... -s:syswrite(ping) +-- test syswrite with char * +s:syswrite(ffi.cast('const char *', ping), #ping) --- - 7 ... @@ -279,7 +284,7 @@ s:getsockopt('SOL_SOCKET', 'SO_DEBUG') ... s:setsockopt('SOL_SOCKET', 'SO_ACCEPTCONN', 1) --- -- error: 'builtin/socket.lua:357: Socket option SO_ACCEPTCONN is read only' +- error: 'builtin/socket.lua:393: Socket option SO_ACCEPTCONN is read only' ... s:getsockopt('SOL_SOCKET', 'SO_RCVBUF') > 32 --- @@ -482,14 +487,6 @@ sc:shutdown('W') --- - true ... -sa:read(100, 1) ---- -- ' world' -... -sa:read(100, 1) ---- -- -... sa:close() --- - true @@ -1028,7 +1025,7 @@ ch:get(1) ... s:error() --- -- error: 'builtin/socket.lua:83: attempt to use closed socket' +- error: 'builtin/socket.lua:87: attempt to use closed socket' ... -- random port port = 33123 @@ -1395,7 +1392,7 @@ end); client = socket.tcp_connect('unix/', path) --- ... -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) --- ... buf == "a 10\n" @@ -1405,7 +1402,7 @@ buf == "a 10\n" remaining = remaining - #buf --- ... -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) --- ... buf == "b 15\n" @@ -1415,7 +1412,7 @@ buf == "b 15\n" remaining = remaining - #buf --- ... -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) --- ... buf == "abc" @@ -1429,14 +1426,14 @@ remaining == 0 --- - true ... -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) --- ... buf == "" --- - true ... -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) --- ... buf == "" diff --git a/test/box/bsdsocket.test.lua b/test/box/bsdsocket.test.lua index e35748f619..4e25afbe27 100644 --- a/test/box/bsdsocket.test.lua +++ b/test/box/bsdsocket.test.lua @@ -7,6 +7,7 @@ msgpack = require 'msgpack' log = require 'log' errno = require 'errno' fio = require 'fio' +ffi = require('ffi') type(socket) socket('PF_INET', 'SOCK_STREAM', 'tcp121222'); @@ -45,14 +46,16 @@ s:writable(.00000000000001) s:writable(0) s:wait(.01) -handshake = s:sysread(128) -string.len(handshake) -string.sub(handshake, 1, 9) +handshake = ffi.new('char[128]') +-- test sysread with char * +s:sysread(handshake, 128) +ffi.string(handshake, 9) ping = msgpack.encode({ [0] = 64, [1] = 0 }) .. msgpack.encode({}) ping = msgpack.encode(string.len(ping)) .. ping -s:syswrite(ping) +-- test syswrite with char * +s:syswrite(ffi.cast('const char *', ping), #ping) s:readable(1) s:wait(.01) @@ -114,7 +117,6 @@ sc:sysconnect('127.0.0.1', 3457) or errno() == errno.EINPROGRESS sc:writable(10) sc:write('Hello, world') - sa, addr = s:accept() addr2 = sa:name() addr2.host == addr.host @@ -150,8 +152,6 @@ sa:read('ine', 0.1) sc:send('Hello, world') sa:read(',', 1) sc:shutdown('W') -sa:read(100, 1) -sa:read(100, 1) sa:close() sc:close() @@ -410,7 +410,6 @@ s:close() os.remove(path) - server, addr = socket.tcp_server('unix/', path, function(s) s:write('Hello, world') end) type(addr) server ~= nil @@ -475,19 +474,19 @@ server = socket.tcp_server('unix/', path, function(s) end); --# setopt delimiter '' client = socket.tcp_connect('unix/', path) -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) buf == "a 10\n" remaining = remaining - #buf -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) buf == "b 15\n" remaining = remaining - #buf -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) buf == "abc" remaining = remaining - #buf remaining == 0 -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) buf == "" -buf = client:read({ size = remaining, delimiter = "[\r\n]+"}) +buf = client:read({ size = remaining, delimiter = "\n"}) buf == "" client:close() server:close() -- GitLab