Skip to content
Snippets Groups Projects
Commit 07f2df5a authored by mechanik20051988's avatar mechanik20051988 Committed by Nikita Pettik
Browse files

iproto: fix multiple socket closures and not deleting unix socket path

All iproto threads listening same socket, and if user change listen
address, this socket is closed in each iproto thread. This patch fix
this error, now socket is closed only in main thread, and in other threads
we are only stop listening, without socket closing. Also this patch fix
error, related to the fact, that tarantool did not delete the unix socket
path, when it's finishing work.
parent 840a65f0
No related branches found
No related tags found
No related merge requests found
## core/bugfix
* Fixed error, related to the fact, that if user changed listen address,
all iproto threads closed same socket multiple times.
Fixed error, related to the fact, that tarantool not deleting the unix
socket path, when it's finishing work.
\ No newline at end of file
......@@ -155,6 +155,14 @@ struct iproto_thread {
static struct iproto_thread *iproto_threads;
static int iproto_threads_count;
/**
* This binary contains all bind socket properties, like
* address the iproto listens for. Is kept in TX to be
* shown in box.info. It should be global, because it contains
* properties, and should be accessible from differnent functions
* in tx thread.
*/
static struct evio_service tx_binary;
/**
* In Greek mythology, Kharon is the ferryman who carries souls
......@@ -198,22 +206,14 @@ unsigned iproto_readahead = 16320;
/* The maximal number of iproto messages in fly. */
static int iproto_msg_max = IPROTO_MSG_MAX_MIN;
/**
* Address the iproto listens for, stored in TX
* thread. Is kept in TX to be shown in box.info.
*/
static struct sockaddr_storage iproto_bound_address_storage;
/** 0 means that no address is listened. */
static socklen_t iproto_bound_address_len;
const char *
iproto_bound_address(char *buf)
{
if (iproto_bound_address_len == 0)
if (tx_binary.addr_len == 0)
return NULL;
sio_addr_snprintf(buf, SERVICE_NAME_MAXLEN,
(struct sockaddr *) &iproto_bound_address_storage,
iproto_bound_address_len);
(struct sockaddr *)&tx_binary.addrstorage,
tx_binary.addr_len);
return buf;
}
......@@ -2064,9 +2064,7 @@ net_cord_f(va_list ap)
* will take care of creating events for incoming
* connections.
*/
if (evio_service_is_active(&iproto_thread->binary))
evio_service_stop(&iproto_thread->binary);
evio_service_detach(&iproto_thread->binary);
return 0;
}
......@@ -2259,7 +2257,11 @@ iproto_init(int threads_count)
/* .fd = */ iproto_session_fd,
/* .sync = */ iproto_session_sync,
};
/*
* We use this tx_binary only for bind, not for listen, so
* we don't need any accept functions.
*/
evio_service_init(loop(), &tx_binary, "tx_binary", NULL, NULL);
iproto_threads = (struct iproto_thread *)
calloc(threads_count, sizeof(struct iproto_thread));
......@@ -2408,8 +2410,7 @@ iproto_do_cfg_f(struct cbus_call_msg *m)
diag_raise();
break;
case IPROTO_CFG_STOP:
if (evio_service_is_active(&iproto_thread->binary))
evio_service_stop(&iproto_thread->binary);
evio_service_detach(&iproto_thread->binary);
break;
case IPROTO_CFG_STAT:
iproto_fill_stat(iproto_thread, cfg_msg);
......@@ -2460,27 +2461,23 @@ iproto_send_listen_msg(struct evio_service *binary)
int
iproto_listen(const char *uri)
{
struct evio_service binary;
memset(&binary, 0, sizeof(binary));
if (iproto_send_stop_msg() != 0)
return -1;
if (uri != NULL) {
/*
* Please note, we bind socket in main thread, and then
* listen this socket in all iproto threads! With this
* implementation, we rely on the Linux kernel to distribute
* incoming connections across iproto threads.
*/
if (evio_service_bind(&binary, uri) != 0)
return -1;
if (iproto_send_listen_msg(&binary) != 0)
return -1;
evio_service_stop(&tx_binary);
if (uri == NULL) {
tx_binary.addr_len = 0;
return 0;
}
iproto_bound_address_storage = binary.addrstorage;
iproto_bound_address_len = binary.addr_len;
/*
* Please note, we bind socket in main thread, and then
* listen this socket in all iproto threads! With this
* implementation, we rely on the Linux kernel to distribute
* incoming connections across iproto threads.
*/
if (evio_service_bind(&tx_binary, uri) != 0)
return -1;
if (iproto_send_listen_msg(&tx_binary) != 0)
return -1;
return 0;
}
......@@ -2584,13 +2581,17 @@ iproto_free(void)
* failing to bind in case it tries to bind before socket
* is closed by OS.
*/
if (evio_service_is_active(&iproto_threads[i].binary))
close(iproto_threads[i].binary.ev.fd);
evio_service_detach(&iproto_threads[i].binary);
rmean_delete(iproto_threads[i].rmean);
slab_cache_destroy(&iproto_threads[i].net_slabc);
}
free(iproto_threads);
/*
* Here we close socket and unlink unix socket path.
* in case it's unix socket.
*/
evio_service_stop(&tx_binary);
}
int
......
......@@ -407,16 +407,29 @@ evio_service_stop(struct evio_service *service)
{
say_info("%s: stopped", evio_service_name(service));
int service_fd = service->ev.fd;
evio_service_detach(service);
if (service_fd < 0)
return;
if (close(service_fd) < 0)
say_error("Failed to close socket: %s", strerror(errno));
if (service->addr.sa_family != AF_UNIX)
return;
if (unlink(((struct sockaddr_un *)&service->addr)->sun_path) < 0) {
say_error("Failed to unlink unix "
"socket path: %s", strerror(errno));
}
}
void
evio_service_detach(struct evio_service *service)
{
if (ev_is_active(&service->ev)) {
ev_io_stop(service->loop, &service->ev);
service->addr_len = 0;
}
if (service->ev.fd >= 0) {
close(service->ev.fd);
ev_io_set(&service->ev, -1, 0);
if (service->addr.sa_family == AF_UNIX) {
unlink(((struct sockaddr_un *) &service->addr)->sun_path);
}
}
ev_io_set(&service->ev, -1, 0);
}
......@@ -117,6 +117,10 @@ evio_service_bind(struct evio_service *service, const char *uri);
int
evio_service_listen(struct evio_service *service);
/** If started, stop event flow, without closing the acceptor socket. */
void
evio_service_detach(struct evio_service *service);
/** If started, stop event flow and close the acceptor socket. */
void
evio_service_stop(struct evio_service *service);
......
#!/usr/bin/env tarantool
require('console').listen(os.getenv('ADMIN'))
box.cfg({
listen = os.getenv('LISTEN'),
iproto_threads = tonumber(arg[1]),
})
-- test-run result file version 2
env = require('test_run')
| ---
| ...
fio = require('fio')
| ---
| ...
test_run = env.new()
| ---
| ...
test_run:cmd("create server test with script='box/box-cfg-unix-socket-del.lua'")
| ---
| - true
| ...
-- Check that unix socket path deleted after tarantool is finished
test_run:cmd("setopt delimiter ';'")
| ---
| - true
| ...
for i = 1, 2 do
local thread_count = i
test_run:cmd(string.format("start server test with args=\"%s\"", thread_count))
server_addr = test_run:eval('test', 'return box.cfg.listen')[1]
test_run:cmd("stop server test")
assert(fio.path.exists(server_addr) == false)
end
test_run:cmd("setopt delimiter ''");
| ---
| ...
-- Check, that all sockets are closed correctly,
-- when the listening address is changed.
test_run:cmd("start server test with args=2")
| ---
| - true
| ...
server_addr_before = test_run:eval('test', 'return box.cfg.listen')[1]
| ---
| ...
test_run:eval('test', string.format("box.cfg{ listen = \'%s\' }", server_addr_before .. "X"))
| ---
| - []
| ...
server_addr_after = test_run:eval('test', 'return box.cfg.listen')[1]
| ---
| ...
assert(server_addr_after == server_addr_before .. "X")
| ---
| - true
| ...
assert(test_run:grep_log("test", "Bad file descriptor") == nil)
| ---
| - true
| ...
assert(test_run:grep_log("test", "No such file or directory") == nil)
| ---
| - true
| ...
test_run:eval('test', string.format("box.cfg { listen = \'%s\' }", server_addr_before))
| ---
| - []
| ...
server_addr_result = test_run:eval('test', 'return box.cfg.listen')[1]
| ---
| ...
assert(server_addr_result == server_addr_before)
| ---
| - true
| ...
test_run:cmd("stop server test")
| ---
| - true
| ...
assert(not fio.path.exists(server_addr_before))
| ---
| - true
| ...
assert(not fio.path.exists(server_addr_after))
| ---
| - true
| ...
test_run:cmd("cleanup server test")
| ---
| - true
| ...
test_run:cmd("delete server test")
| ---
| - true
| ...
env = require('test_run')
fio = require('fio')
test_run = env.new()
test_run:cmd("create server test with script='box/box-cfg-unix-socket-del.lua'")
-- Check that unix socket path deleted after tarantool is finished
test_run:cmd("setopt delimiter ';'")
for i = 1, 2 do
local thread_count = i
test_run:cmd(string.format("start server test with args=\"%s\"", thread_count))
server_addr = test_run:eval('test', 'return box.cfg.listen')[1]
test_run:cmd("stop server test")
assert(fio.path.exists(server_addr) == false)
end
test_run:cmd("setopt delimiter ''");
-- Check, that all sockets are closed correctly,
-- when the listening address is changed.
test_run:cmd("start server test with args=2")
server_addr_before = test_run:eval('test', 'return box.cfg.listen')[1]
test_run:eval('test', string.format("box.cfg{ listen = \'%s\' }", server_addr_before .. "X"))
server_addr_after = test_run:eval('test', 'return box.cfg.listen')[1]
assert(server_addr_after == server_addr_before .. "X")
assert(test_run:grep_log("test", "Bad file descriptor") == nil)
assert(test_run:grep_log("test", "No such file or directory") == nil)
test_run:eval('test', string.format("box.cfg { listen = \'%s\' }", server_addr_before))
server_addr_result = test_run:eval('test', 'return box.cfg.listen')[1]
assert(server_addr_result == server_addr_before)
test_run:cmd("stop server test")
assert(not fio.path.exists(server_addr_before))
assert(not fio.path.exists(server_addr_after))
test_run:cmd("cleanup server test")
test_run:cmd("delete server test")
......@@ -70,9 +70,6 @@ test_run:cmd('cleanup server connecter')
- true
...
-- Check the connection tries to reconnect at least two times.
-- 'Cannot assign requested address' is the crutch for running the
-- tests in a docker. This error emits instead of
-- 'Connection refused' inside a docker.
old_log_level = box.cfg.log_level
---
...
......@@ -86,9 +83,8 @@ test_run:cmd("setopt delimiter ';'")
---
- true
...
while test_run:grep_log('default', 'Network is unreachable', 1000) == nil and
test_run:grep_log('default', 'Connection refused', 1000) == nil and
test_run:grep_log('default', 'Cannot assign requested address', 1000) == nil do
while test_run:grep_log('default', 'unix/', 1000) == nil and
test_run:grep_log('default', 'No such file or directory', 1000) == nil do
fiber.sleep(0.1)
end;
---
......@@ -96,9 +92,8 @@ end;
log.info(string.rep('a', 1000));
---
...
while test_run:grep_log('default', 'Network is unreachable', 1000) == nil and
test_run:grep_log('default', 'Connection refused', 1000) == nil and
test_run:grep_log('default', 'Cannot assign requested address', 1000) == nil do
while test_run:grep_log('default', 'unix/', 1000) == nil and
test_run:grep_log('default', 'No such file or directory', 1000) == nil do
fiber.sleep(0.1)
end;
---
......
......@@ -26,22 +26,17 @@ weak.c:ping()
test_run:cmd('stop server connecter')
test_run:cmd('cleanup server connecter')
-- Check the connection tries to reconnect at least two times.
-- 'Cannot assign requested address' is the crutch for running the
-- tests in a docker. This error emits instead of
-- 'Connection refused' inside a docker.
old_log_level = box.cfg.log_level
box.cfg{log_level = 6}
log.info(string.rep('a', 1000))
test_run:cmd("setopt delimiter ';'")
while test_run:grep_log('default', 'Network is unreachable', 1000) == nil and
test_run:grep_log('default', 'Connection refused', 1000) == nil and
test_run:grep_log('default', 'Cannot assign requested address', 1000) == nil do
while test_run:grep_log('default', 'unix/', 1000) == nil and
test_run:grep_log('default', 'No such file or directory', 1000) == nil do
fiber.sleep(0.1)
end;
log.info(string.rep('a', 1000));
while test_run:grep_log('default', 'Network is unreachable', 1000) == nil and
test_run:grep_log('default', 'Connection refused', 1000) == nil and
test_run:grep_log('default', 'Cannot assign requested address', 1000) == nil do
while test_run:grep_log('default', 'unix/', 1000) == nil and
test_run:grep_log('default', 'No such file or directory', 1000) == nil do
fiber.sleep(0.1)
end;
test_run:cmd("setopt delimiter ''");
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment