Skip to content
Snippets Groups Projects
Commit 0e86fbde authored by Nikolay Shirokovskiy's avatar Nikolay Shirokovskiy Committed by Vladimir Davydov
Browse files

misc: cleanup usage of pthread_cancel

At last we can drop usage of pthread_cancel and associated functions.
And remove related leak suppressions.

Let's keep memory protection disabling under ASAN. Otherwise
leak sanitizer may misbehave on Tarantool panic as below.

```
  #   Tracer caught signal 11: addr=0x705236d1e000 pc=0x57b7605b10d0 sp=0x705232a00ca0\
  #   ==1022907==LeakSanitizer has encountered a fatal error.\
  #   ==1022907==HINT: For debugging, try setting environment variable LSAN_OPTIONS=verbosity=1:log_threads=1\
  #   ==1022907==HINT: LeakSanitizer does not work under ptrace (strace, gdb, etc)",
```

Let's also add missing pipe/endpoint destroy in wal while at it.

Close #8423

NO_CHANGELOG=internal
NO_DOC=internal
parent a83d5e3c
No related branches found
No related tags found
No related merge requests found
......@@ -4,5 +4,3 @@
# File format:
#fun:*
#src:*
# TODO(gh-8423) expected to be removed when the issue is solved
fun:ev_async_stop
......@@ -61,12 +61,6 @@ leak:libc.so*
# source: src/lib/salad/mhash.h
leak:mh_i32ptr_new
# test: replication/gh-3637-misc-error-on-replica-auth-fail.test.lua
# source: src/lib/core/coio_task.c
leak:coio_on_call
# source: src/lib/salad/mhash.h
leak:mh_i64ptr_new
# test: sql-tap/gh2250-trigger-chain-limit.test.lua
# source: src/lib/core/exception.cc
leak:Exception::operator new
......@@ -75,10 +69,6 @@ leak:Exception::operator new
# source: src/lib/core/fiber.h
leak:fiber_cxx_invoke
# test: vinyl/recover.test.lua
# source: src/lib/core/fiber.c
leak:cord_costart_thread_func
# TODO(gh-9213): remove when the issue is fixed
leak:luaT_push_key_def
......
......@@ -597,6 +597,7 @@ wal_free(void)
struct wal_writer *writer = &wal_writer_singleton;
cbus_stop_loop(&writer->wal_pipe);
cpipe_destroy(&writer->wal_pipe);
if (cord_join(&writer->cord)) {
/* We can't recover from this in any reasonable way. */
......@@ -1306,6 +1307,7 @@ wal_writer_f(va_list ap)
wal_xlog_close(&vy_log_writer.xlog);
cpipe_destroy(&writer->tx_prio_pipe);
cbus_endpoint_destroy(&endpoint, cbus_process);
return 0;
}
......
......@@ -132,13 +132,6 @@ cbus_endpoint_poison_f(struct cmsg *msg)
void
cpipe_destroy(struct cpipe *pipe)
{
/*
* The thread should not be canceled while mutex is locked.
* And everything else must be protected for consistency.
*/
int old_cancel_state;
tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
ev_async_stop(pipe->producer, &pipe->flush_input);
static const struct cmsg_hop route[1] = {
......@@ -171,9 +164,6 @@ cpipe_destroy(struct cpipe *pipe)
*/
ev_async_send(endpoint->consumer, &endpoint->async);
tt_pthread_mutex_unlock(&endpoint->mutex);
tt_pthread_setcancelstate(old_cancel_state, NULL);
TRASH(pipe);
}
......@@ -291,17 +281,6 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
/* Trigger task processing when the queue becomes non-empty. */
bool output_was_empty;
/*
* We need to set a thread cancellation guard, because
* another thread may cancel the current thread
* (write() is a cancellation point in ev_async_send)
* and the activation of the ev_async watcher
* through ev_async_send will fail.
*/
int old_cancel_state;
tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
tt_pthread_mutex_lock(&endpoint->mutex);
output_was_empty = stailq_empty(&endpoint->output);
/** Flush input */
......@@ -315,8 +294,6 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
ev_async_send(endpoint->consumer, &endpoint->async);
}
tt_pthread_setcancelstate(old_cancel_state, NULL);
}
void
......
......@@ -209,9 +209,8 @@ fiber_mprotect(void *addr, size_t len, int prot)
goto error;
}
/*
* TODO(gh-8423) Disable mprotect temporarily. Leak sanitizer does not work
* well if memory is protected. We fail to remove protection due to the use of
* `cord_cancel_and_join` to cancel cords.
* If we panic then fiber stacks remain protected which cause leak sanitizer
* failures. Disable memory protection under ASAN.
*/
#ifndef ENABLE_ASAN
if (mprotect(addr, len, prot) != 0)
......@@ -2241,29 +2240,6 @@ cord_is_main(void)
return cord() == &main_cord;
}
void
cord_cancel_and_join(struct cord *cord)
{
assert(cord->id != 0);
tt_pthread_cancel(cord->id);
if (tt_pthread_join(cord->id, NULL) != 0)
panic("failed to join a canceled thread");
int old_cord_count = pm_atomic_fetch_sub(&cord_count, 1);
assert(old_cord_count > 0);
(void)old_cord_count;
/*
* Can't destroy the cord safely. The cancellation could even happen
* before the cord was properly initialized in its own thread. It might
* be fixed if cord would be initialized before its thread is started.
*
* Also obviously even if the creation would be fine, the destruction
* can't free everything. The cord could have some resources allocated
* on the heap with pointers not stored anywhere in struct cord - they
* can't be possibly located.
*/
memset(cord, 0, sizeof(*cord));
}
static NOINLINE int
check_stack_direction(void *prev_stack_frame)
{
......
......@@ -969,15 +969,6 @@ cord_is_main(void);
void
cord_collect_garbage(struct cord *cord);
/**
* Pthread-cancel the thread and join it in a blocking way, without yielding.
* That way is the only possible one if the event loop is already destroyed.
* Should only be used as an emergency, because all the cord resources simply
* leak.
*/
void
cord_cancel_and_join(struct cord *cord);
/**
* Return slab_cache suitable to use with tarantool/small library
*/
......
......@@ -287,12 +287,6 @@
tt_pthread_error(e__); \
})
#define tt_pthread_cancel(thread) \
({ int e__ = pthread_cancel(thread); \
assert(e__ == 0 || e__ == ESRCH); \
e__; \
})
#define tt_pthread_key_create(key, dtor) \
({ int e__ = pthread_key_create(key, dtor); \
tt_pthread_error(e__); \
......@@ -310,11 +304,6 @@
#define tt_pthread_getspecific(key) pthread_getspecific(key)
#define tt_pthread_setcancelstate(state, oldstate) \
({ int e__ = pthread_setcancelstate(state, oldstate);\
tt_pthread_error(e__); \
})
/** Set the current thread's name
*/
static inline void
......
......@@ -42,16 +42,12 @@ g2.before_each(function(cg)
end, {cg.params.cfg_type})
end)
local function drop_server(cg)
-- TODO(gh-8423): Remove this workaround.
-- If log level is 'debug' during server:drop(), the test may hang on macOS
-- (see gh-8420).
cg.server:update_box_cfg({log_level = 'info',
log_modules = {tarantool = 'info'}})
g1.after_each(function(cg)
cg.server:drop()
end
g1.after_each(drop_server)
g2.after_each(drop_server)
end)
g2.after_each(function(cg)
cg.server:drop()
end)
-- Test log.new{...}
g1.test_log_new = function(cg)
......
......@@ -261,15 +261,6 @@ create_unit_test(PREFIX cbus_call
LIBRARIES core unit stat
)
include(CheckSymbolExists)
check_symbol_exists(__GLIBC__ features.h GLIBC_USED)
if (GLIBC_USED)
create_unit_test(PREFIX cbus_hang
SOURCES cbus_hang.c core_test_utils.c
LIBRARIES core unit stat
)
endif ()
create_unit_test(PREFIX coio
SOURCES coio.cc core_test_utils.c
LIBRARIES core eio bit uri unit
......
#include "cbus.h"
#include "fiber.h"
#include "memory.h"
#include "unit.h"
struct cord hang_worker;
struct cord canceled_worker;
struct cbus_endpoint hang_endpoint;
struct cpipe pipe_from_cl_to_hang;
struct cpipe pipe_from_main_to_hang;
/*
* We want to cancel canceled thread in the moment of cpipe_flush_cb
* will be processing.
* A Linux specific dirty hack will be used for reproduce the bug.
* We need to synchronize the main thread and the canceled worker thread.
* So, do it using the endpoint's mutex internal field(__data.__lock).
* __lock == 0 - unlock
* __lock == 1 - lock
* __lock == 2 - possible waiters exists
* After pthred create - __lock change state from 1 to 2
*/
pthread_mutex_t endpoint_hack_mutex_1;
pthread_cond_t endpoint_hack_cond_1;
pthread_mutex_t endpoint_hack_mutex_2;
pthread_cond_t endpoint_hack_cond_2;
static
void join_fail(int signum) {
(void)signum;
printf("Can't join the hang worker\n");
exit(EXIT_FAILURE);
}
static void
do_nothing(struct cmsg *m)
{
(void) m;
}
static int
hang_worker_f(va_list ap)
{
(void) ap;
cbus_endpoint_create(&hang_endpoint, "hang_worker",
fiber_schedule_cb, fiber());
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
tt_pthread_cond_signal(&endpoint_hack_cond_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
cbus_loop(&hang_endpoint);
cbus_endpoint_destroy(&hang_endpoint, cbus_process);
return 0;
}
static void
hang_worker_start()
{
cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
}
static int
canceled_worker_f(va_list ap)
{
(void) ap;
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
tt_pthread_cond_signal(&endpoint_hack_cond_1);
/* Wait a start command from the main thread */
tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
tt_pthread_cond_wait(&endpoint_hack_cond_2, &endpoint_hack_mutex_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
static struct cmsg_hop nothing_route = { do_nothing, NULL };
static struct cmsg nothing_msg;
cmsg_init(&nothing_msg, &nothing_route);
/*
* We need to use the cpipe_push_input cause
* an ev_invoke must be called for a hang reproducing
*/
cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
cpipe_destroy(&pipe_from_cl_to_hang);
return 0;
}
static void
canceled_worker_start()
{
cord_costart(&canceled_worker, "canceled_worker",
canceled_worker_f, NULL);
}
static int
main_f(va_list ap)
{
(void) ap;
/* Start the endpoint's mutex hack */
/* Initialize the endpoint mutex */
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
hang_worker_start();
tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
/*
* Create (only create) the canceled worker before the endpoint mutex will be locked
* for the hack work correctly
*/
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
canceled_worker_start();
tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
tt_pthread_mutex_lock(&(hang_endpoint.mutex));
/* Start canceled worker */
tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
tt_pthread_cond_signal(&endpoint_hack_cond_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
while(hang_endpoint.mutex.__data.__lock < 2) {
usleep(200);
}
tt_pthread_cancel(canceled_worker.id);
tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
/* Hack end */
cord_join(&canceled_worker);
unsigned join_timeout = 5;
signal(SIGALRM, join_fail); // For exit in a hang case
alarm(join_timeout);
cpipe_create(&pipe_from_main_to_hang, "hang_worker");
cbus_stop_loop(&pipe_from_main_to_hang);
cpipe_destroy(&pipe_from_main_to_hang);
cord_join(&hang_worker);
ok(true, "The hang worker has been joined");
alarm(0);
ev_break(loop(), EVBREAK_ALL);
return 0;
}
int
main()
{
header();
plan(1);
memory_init();
fiber_init(fiber_c_invoke);
cbus_init();
tt_pthread_cond_init(&endpoint_hack_cond_1, NULL);
tt_pthread_mutex_init(&endpoint_hack_mutex_1, NULL);
tt_pthread_cond_init(&endpoint_hack_cond_2, NULL);
tt_pthread_mutex_init(&endpoint_hack_mutex_2, NULL);
struct fiber *main_fiber = fiber_new("main", main_f);
assert(main_fiber != NULL);
fiber_wakeup(main_fiber);
ev_run(loop(), 0);
tt_pthread_cond_destroy(&endpoint_hack_cond_1);
tt_pthread_cond_destroy(&endpoint_hack_cond_2);
cbus_free();
fiber_free();
memory_free();
int rc = check_plan();
footer();
return rc;
}
*** main ***
1..1
ok 1 - The hang worker has been joined
*** main: done ***
......@@ -72,15 +72,6 @@ cancel_dead_f(va_list ap)
return 0;
}
static int
usleep_f(va_list ap)
{
(void)ap;
while (true)
usleep(1000);
return 0;
}
static void NOINLINE
stack_expand(unsigned long *ret, unsigned long nr_calls)
{
......@@ -444,29 +435,6 @@ cord_cojoin_cancel_test(void)
footer();
}
static void
cord_cancel_and_join_test(void)
{
header();
struct cord tcord;
/* Join an exited but not yet joined thread. */
memset(&tcord, 0, sizeof(tcord));
fail_if(cord_costart(&tcord, "test", noop_f, NULL) != 0);
/* Give the thread some time to exit. */
fiber_sleep(0.01);
cord_cancel_and_join(&tcord);
/* Cancel and join a hanging thread. */
memset(&tcord, 0, sizeof(tcord));
fail_if(cord_costart(&tcord, "test", usleep_f, NULL) != 0);
/* Give the thread some time to start. */
fiber_sleep(0.01);
cord_cancel_and_join(&tcord);
footer();
}
static void
fiber_test_defaults()
{
......@@ -765,7 +733,6 @@ main_f(va_list ap)
fiber_wait_on_deadline_test();
cord_cojoin_test();
cord_cojoin_cancel_test();
cord_cancel_and_join_test();
fiber_test_defaults();
fiber_test_leak_modes();
fiber_test_client_fiber_count();
......
......@@ -32,8 +32,6 @@ OutOfMemory: Failed to allocate 42 bytes in allocator for exception
*** cord_cojoin_test: done ***
*** cord_cojoin_cancel_test ***
*** cord_cojoin_cancel_test: done ***
*** cord_cancel_and_join_test ***
*** cord_cancel_and_join_test: done ***
*** fiber_test_defaults ***
*** fiber_test_defaults: done ***
*** fiber_test_leak ***
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment