From 4b69dfcdcc8f37474960fa54775ea4df2d5a8efa Mon Sep 17 00:00:00 2001
From: Vladislav Shpilevoy <v.shpilevoy@tarantool.org>
Date: Wed, 21 Feb 2018 18:53:16 +0300
Subject: [PATCH] netbox: fix leak of connection with set reconnect_after
 option

If a connection has reconnect_after > 0, then it is never deleted
until it is explicitly closed or reconnect_after is reset. It is
because worker fiber of such connection holds all references
during yield.

Fix it by do not waiting for a next reconnection inside a state
machine - protocol_sm() function must not be infinite in a case
of error.

Closes #3164
---
 src/box/lua/net_box.lua   | 43 ++++++++++++++------
 test/box/net.box.result   | 82 +++++++++++++++++++++++++++++++++++++--
 test/box/net.box.test.lua | 32 ++++++++++++++-
 3 files changed, 140 insertions(+), 17 deletions(-)

diff --git a/src/box/lua/net_box.lua b/src/box/lua/net_box.lua
index 396dc39ebd..0c8b18793b 100644
--- a/src/box/lua/net_box.lua
+++ b/src/box/lua/net_box.lua
@@ -112,10 +112,11 @@ local function next_id(id) return band(id + 1, 0x7FFFFFFF) end
 -- The following events are delivered, with arguments:
 --
 --  'state_changed', state, errno, error
---  'handshake', greeting           -> nil (accept) / errno, error (reject)
---  'will_fetch_schema'             -> true (approve) / false (skip fetch)
+--  'handshake', greeting -> nil (accept) / errno, error (reject)
+--  'will_fetch_schema'   -> true (approve) / false (skip fetch)
 --  'did_fetch_schema', schema_version, spaces, indices
---  'will_reconnect', errno, error  -> true (approve) / false (reject)
+--  'reconnect_timeout'   -> get reconnect timeout if set and > 0,
+--                           else nil is returned.
 --
 -- Suggestion for callback writers: sleep a few secs before approving
 -- reconnect.
@@ -193,6 +194,7 @@ local function create_transport(host, port, user, password, callback)
         fiber.create(function()
             worker_fiber = fiber_self()
             fiber.name(string.format('%s:%s (net.box)', host, port), {truncate=true})
+    ::reconnect::
             local ok, err = pcall(protocol_sm)
             if not (ok or is_final_state[state]) then
                 set_state('error', E_UNKNOWN, err)
@@ -201,6 +203,14 @@ local function create_transport(host, port, user, password, callback)
                 connection:close()
                 connection = nil
             end
+            local timeout = callback('reconnect_timeout')
+            if timeout and state == 'error_reconnect' then
+                fiber.sleep(timeout)
+                timeout = callback('reconnect_timeout')
+                if timeout and state == 'error_reconnect' then
+                    goto reconnect
+                end
+            end
             send_buf:recycle()
             recv_buf:recycle()
             worker_fiber = nil
@@ -349,6 +359,14 @@ local function create_transport(host, port, user, password, callback)
     -- a state machine in Lua.
     local console_sm, iproto_auth_sm, iproto_schema_sm, iproto_sm, error_sm
 
+    --
+    -- Protocol_sm is a core function of netbox. It calls all
+    -- other ..._sm() functions, and explicitly or implicitly
+    -- holds Lua referece on a connection object. It means, that
+    -- until it works, the connection can not be garbage
+    -- collected. See gh-3164, where because of reconnect sleeps
+    -- in this function, a connection could not be deleted.
+    --
     protocol_sm = function ()
         local tm_begin, tm = fiber.clock(), callback('fetch_connect_timeout')
         connection = socket.tcp_connect(host, port, tm)
@@ -496,12 +514,12 @@ local function create_transport(host, port, user, password, callback)
         if connection then connection:close(); connection = nil end
         send_buf:recycle()
         recv_buf:recycle()
-        set_state('error_reconnect', err, msg)
-        if callback('will_reconnect', err, msg) then
-            set_state('connecting')
-            return protocol_sm()
-        else
-            set_state('error', err, msg)
+        if state ~= 'closed' then
+            if callback('reconnect_timeout') then
+                set_state('error_reconnect', err, msg)
+            else
+                set_state('error', err, msg)
+            end
         end
     end
 
@@ -629,9 +647,10 @@ local function connect(...)
             return opts.connect_timeout or 10
         elseif what == 'did_fetch_schema' then
             remote:_install_schema(...)
-        elseif what == 'will_reconnect' then
-            if type(opts.reconnect_after) == 'number' then
-                fiber.sleep(opts.reconnect_after); return true
+        elseif what == 'reconnect_timeout' then
+            if type(opts.reconnect_after) == 'number' and
+               opts.reconnect_after > 0 then
+                return opts.reconnect_after
             end
         end
     end
diff --git a/test/box/net.box.result b/test/box/net.box.result
index 5f4e163c03..da236012d3 100644
--- a/test/box/net.box.result
+++ b/test/box/net.box.result
@@ -1962,10 +1962,6 @@ test_run:cmd('stop server connecter')
 ---
 - true
 ...
-test_run:cmd('cleanup server connecter')
----
-- true
-...
 --
 -- gh-2401 update pseudo objects not replace them
 --
@@ -2192,6 +2188,84 @@ box.schema.user.revoke('guest', 'write', 'space', '_space')
 box.schema.user.revoke('guest', 'write', 'space', '_schema')
 ---
 ...
+c:close()
+---
+...
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+---
+- true
+...
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+---
+- true
+...
+weak = setmetatable({}, {__mode = 'v'})
+---
+...
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.1})
+---
+...
+weak.c = strong
+---
+...
+weak.c:ping()
+---
+- true
+...
+test_run:cmd('stop server connecter')
+---
+- true
+...
+test_run:cmd('cleanup server connecter')
+---
+- true
+...
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+log.info(string.rep('a', 1000))
+---
+...
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+strong.state
+---
+- error_reconnect
+...
+strong == weak.c
+---
+- true
+...
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+---
+...
+collectgarbage('collect')
+---
+- 0
+...
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
+---
+- null
+...
 box.schema.user.revoke('guest', 'execute', 'universe')
 ---
 ...
diff --git a/test/box/net.box.test.lua b/test/box/net.box.test.lua
index b23a8830c1..9de440efd1 100644
--- a/test/box/net.box.test.lua
+++ b/test/box/net.box.test.lua
@@ -789,7 +789,6 @@ disconnected_cnt
 conn:close()
 disconnected_cnt
 test_run:cmd('stop server connecter')
-test_run:cmd('cleanup server connecter')
 
 --
 -- gh-2401 update pseudo objects not replace them
@@ -892,6 +891,37 @@ box.space.test2:drop()
 box.space.test3:drop()
 box.schema.user.revoke('guest', 'write', 'space', '_space')
 box.schema.user.revoke('guest', 'write', 'space', '_schema')
+c:close()
+
+--
+-- gh-3164: netbox connection is not closed and garbage collected
+-- ever, if reconnect_after is set.
+--
+test_run:cmd('start server connecter')
+test_run:cmd("set variable connect_to to 'connecter.listen'")
+weak = setmetatable({}, {__mode = 'v'})
+-- Create strong and weak reference. Weak is valid until strong
+-- is valid too.
+strong = net.connect(connect_to, {reconnect_after = 0.1})
+weak.c = strong
+weak.c:ping()
+test_run:cmd('stop server connecter')
+test_run:cmd('cleanup server connecter')
+-- Check the connection tries to reconnect at least two times.
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+log.info(string.rep('a', 1000))
+while test_run:grep_log('default', 'Connection refused', 1000) == nil do fiber.sleep(0.1) end
+collectgarbage('collect')
+strong.state
+strong == weak.c
+-- Remove single strong reference. Now connection must be garbage
+-- collected.
+strong = nil
+collectgarbage('collect')
+-- Now weak.c is null, because it was weak reference, and the
+-- connection is deleted by 'collect'.
+weak.c
 
 box.schema.user.revoke('guest', 'execute', 'universe')
 c:close()
-- 
GitLab