From e2348f8d7d95fccdb24484af0b74c6c319a77501 Mon Sep 17 00:00:00 2001 From: "Dmitry E. Oboukhov" <unera@debian.org> Date: Tue, 24 Jun 2014 18:43:28 +0400 Subject: [PATCH] socket:close wakeup all waiters. #360 --- src/lua/bsdsocket.lua | 61 +++++++++++++++++++++++++-------------- src/lua/init.cc | 9 ++++++ test/box/bsdsocket.result | 41 +++++++++++++++++++++++++- test/box/bsdsocket.test | 14 +++++++++ 4 files changed, 103 insertions(+), 22 deletions(-) diff --git a/src/lua/bsdsocket.lua b/src/lua/bsdsocket.lua index e811602687..0ebd504997 100644 --- a/src/lua/bsdsocket.lua +++ b/src/lua/bsdsocket.lua @@ -140,7 +140,7 @@ socket_methods.sysread = function(self, len) self._errno = nil local buf = ffi.new('char[?]', len) local res = ffi.C.read(self.fh, buf, len) - + if res < 0 then self._errno = box.errno() return nil @@ -175,13 +175,27 @@ socket_methods.nonblock = function(self, nb) end end +local function wait_safely(self, what, timeout) + local f = box.fiber.self() + local fid = f.id() + + if self.waiters == nil then + self.waiters = {} + end + + self.waiters[fid] = f + local res = box.socket.internal.iowait(self.fh, what, timeout) + self.waiters[fid] = nil + return res +end + socket_methods.readable = function(self, timeout) self._errno = nil if timeout == nil then timeout = TIMEOUT_INFINITY end - local wres = box.socket.internal.iowait(self.fh, 0, timeout) + local wres = wait_safely(self, 0, timeout) if wres == 0 then self._errno = box.errno.ETIMEDOUT @@ -196,7 +210,7 @@ socket_methods.wait = function(self, timeout) timeout = TIMEOUT_INFINITY end - local wres = box.socket.internal.iowait(self.fh, 2, timeout) + local wres = wait_safely(self, 2, timeout) if wres == 0 then self._errno = box.errno.ETIMEDOUT @@ -219,7 +233,7 @@ socket_methods.writable = function(self, timeout) timeout = TIMEOUT_INFINITY end - local wres = box.socket.internal.iowait(self.fh, 1, timeout) + local wres = wait_safely(self, 1, timeout) if wres == 0 then self._errno = box.errno.ETIMEDOUT @@ -261,13 +275,19 @@ socket_methods.bind = function(self, host, port) end socket_methods.close = function(self) + if self.waiters ~= nil then + for fid, fiber in pairs(self.waiters) do + fiber:wakeup() + self.waiters[fid] = nil + end + end + self._errno = nil if ffi.C.close(self.fh) < 0 then self._errno = box.errno() return false - else - return true end + return true end socket_methods.shutdown = function(self, how) @@ -353,7 +373,7 @@ socket_methods.setsockopt = function(self, level, name, value) end return true end - + if name == 'SO_LINGER' then error("Use s:linger(active[, timeout])") end @@ -473,7 +493,7 @@ socket_methods.accept = function(self) self._errno = box.errno() return nil end - + -- Make socket to be non-blocked by default -- ignore result ffi.C.bsdsocket_nonblock(fh, 1) @@ -491,7 +511,7 @@ socket_methods.read = function(self, size, timeout) if self.rbuf == nil then self.rbuf = '' end - + if size == nil then size = 4294967295 end @@ -505,12 +525,11 @@ socket_methods.read = function(self, size, timeout) while timeout > 0 do local started = box.time() - + if not self:readable(timeout) then - self._errno = box.errno() return nil end - + timeout = timeout - ( box.time() - started ) local data = self:sysread(4096) @@ -573,7 +592,7 @@ socket_methods.readline = function(self, limit, eol, timeout) eol = { eol } end end - + if timeout == nil then timeout = TIMEOUT_INFINITY end @@ -596,12 +615,12 @@ socket_methods.readline = function(self, limit, eol, timeout) local started = box.time() while timeout > 0 do local started = box.time() - + if not self:readable(timeout) then self._errno = box.errno() return nil end - + timeout = timeout - ( box.time() - started ) local data = self:sysread(4096) @@ -620,7 +639,7 @@ socket_methods.readline = function(self, limit, eol, timeout) self.rbuf = string.sub(self.rbuf, string.len(data) + 1) return data end - + -- limit if string.len(self.rbuf) >= limit then data = string.sub(self.rbuf, 1, limit) @@ -650,7 +669,7 @@ socket_methods.write = function(self, octets, timeout) return false end end - + if written == string.len(octets) then return true end @@ -828,7 +847,7 @@ local function getaddrinfo(host, port, timeout, opts) end end - + local r = box.socket.internal.getaddrinfo(host, port, timeout, ga_opts) if r == nil then self._errno = box.errno() @@ -888,7 +907,7 @@ local function tcp_connect(host, port, timeout) for i, remote in pairs(dns) do - + timeout = timeout - (box.time() - started) if timeout <= 0 then box.errno(box.errno.ETIMEDOUT) @@ -919,10 +938,10 @@ local function tcp_connect(host, port, timeout) s:close() box.errno(save_errno) end - + if timeout <= 0 then box.errno(box.errno.ETIMEDOUT) - end + end return nil end diff --git a/src/lua/init.cc b/src/lua/init.cc index 9db51b84b7..c6a5400c4f 100644 --- a/src/lua/init.cc +++ b/src/lua/init.cc @@ -625,6 +625,14 @@ lbox_fiber_create(struct lua_State *L) return 1; } +static int +lbox_fiber_wakeup(struct lua_State *L) +{ + struct fiber *f = lbox_checkfiber(L, 1); + fiber_wakeup(f); + return 0; +} + static int lbox_fiber_resume(struct lua_State *L) { @@ -928,6 +936,7 @@ static const struct luaL_reg lbox_fiber_meta [] = { {"name", lbox_fiber_name}, {"cancel", lbox_fiber_cancel}, {"resume", lbox_fiber_resume}, + {"wakeup", lbox_fiber_wakeup}, {"__gc", lbox_fiber_gc}, {NULL, NULL} }; diff --git a/test/box/bsdsocket.result b/test/box/bsdsocket.result index 0417e31db8..6b76980062 100644 --- a/test/box/bsdsocket.result +++ b/test/box/bsdsocket.result @@ -219,7 +219,7 @@ lua s:getsockopt('SOL_SOCKET', 'SO_DEBUG') ... lua s:setsockopt('SOL_SOCKET', 'SO_ACCEPTCONN', 1) --- -error: '[string "-- bsdsocket.lua (internal file)..."]:307: Socket option SO_ACCEPTCONN is read only' +error: '[string "-- bsdsocket.lua (internal file)..."]:327: Socket option SO_ACCEPTCONN is read only' ... lua s:getsockopt('SOL_SOCKET', 'SO_RCVBUF') > 32 --- @@ -728,3 +728,42 @@ lua box.socket.tcp_connect('mail.ru', 80, 0.00000000001) --- - nil ... +lua s = box.socket.tcp_connect('127.0.0.1', box.cfg.primary_port) +--- +... +lua sa = { fh = 512 } setmetatable(sa, getmetatable(s)) +--- +... +lua tostring(sa) +--- + - fd 512 +... +lua sa:readable(0) +--- + - true +... +lua sa:writable(0) +--- + - true +... +lua ch = box.ipc.channel() +--- +... +lua f = box.fiber.wrap(function() s:read(12) ch:put(true) end) +--- +... +lua box.fiber.sleep(.1) +--- +... +lua s:close() +--- + - true +... +lua ch:get(1) +--- + - true +... +lua s:error() +--- + - Connection timed out +... diff --git a/test/box/bsdsocket.test b/test/box/bsdsocket.test index 5d9b94f2bd..f051b8ed75 100644 --- a/test/box/bsdsocket.test +++ b/test/box/bsdsocket.test @@ -227,3 +227,17 @@ exec admin "lua string.match(header, '200 [Oo][Kk]') ~= nil" exec admin "lua s:close()" exec admin "lua box.socket.tcp_connect('mail.ru', 80, 0.00000000001)" + + +exec admin "lua s = box.socket.tcp_connect('127.0.0.1', box.cfg.primary_port)" +exec admin "lua sa = { fh = 512 } setmetatable(sa, getmetatable(s))" +exec admin "lua tostring(sa)" +exec admin "lua sa:readable(0)" +exec admin "lua sa:writable(0)" + +exec admin "lua ch = box.ipc.channel()" +exec admin "lua f = box.fiber.wrap(function() s:read(12) ch:put(true) end)" +exec admin "lua box.fiber.sleep(.1)" +exec admin "lua s:close()" +exec admin "lua ch:get(1)" +exec admin "lua s:error()" -- GitLab