Skip to content
Snippets Groups Projects
Commit 63d75e92 authored by Leonid Vasiliev's avatar Leonid Vasiliev Committed by Kirill Yukhin
Browse files

Add some cancellation guard

We need to set a thread cancellation guard, because
another thread may cancel the current thread at a
really bad time (messages flush, mutex lock)

Fixes: #4127
parent f4f886bd
No related branches found
No related tags found
No related merge requests found
...@@ -132,6 +132,13 @@ cbus_endpoint_poison_f(struct cmsg *msg) ...@@ -132,6 +132,13 @@ cbus_endpoint_poison_f(struct cmsg *msg)
void void
cpipe_destroy(struct cpipe *pipe) cpipe_destroy(struct cpipe *pipe)
{ {
/*
* The thread should not be canceled while mutex is locked.
* And everything else must be protected for consistency.
*/
int old_cancel_state;
tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
ev_async_stop(pipe->producer, &pipe->flush_input); ev_async_stop(pipe->producer, &pipe->flush_input);
static const struct cmsg_hop route[1] = { static const struct cmsg_hop route[1] = {
...@@ -165,6 +172,8 @@ cpipe_destroy(struct cpipe *pipe) ...@@ -165,6 +172,8 @@ cpipe_destroy(struct cpipe *pipe)
ev_async_send(endpoint->consumer, &endpoint->async); ev_async_send(endpoint->consumer, &endpoint->async);
tt_pthread_mutex_unlock(&endpoint->mutex); tt_pthread_mutex_unlock(&endpoint->mutex);
tt_pthread_setcancelstate(old_cancel_state, NULL);
TRASH(pipe); TRASH(pipe);
} }
...@@ -284,6 +293,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events) ...@@ -284,6 +293,17 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
/* Trigger task processing when the queue becomes non-empty. */ /* Trigger task processing when the queue becomes non-empty. */
bool output_was_empty; bool output_was_empty;
/*
* We need to set a thread cancellation guard, because
* another thread may cancel the current thread
* (write() is a cancellation point in ev_async_send)
* and the activation of the ev_async watcher
* through ev_async_send will fail.
*/
int old_cancel_state;
tt_pthread_setcancelstate(PTHREAD_CANCEL_DISABLE, &old_cancel_state);
tt_pthread_mutex_lock(&endpoint->mutex); tt_pthread_mutex_lock(&endpoint->mutex);
output_was_empty = stailq_empty(&endpoint->output); output_was_empty = stailq_empty(&endpoint->output);
/** Flush input */ /** Flush input */
...@@ -297,6 +317,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events) ...@@ -297,6 +317,8 @@ cpipe_flush_cb(ev_loop *loop, struct ev_async *watcher, int events)
ev_async_send(endpoint->consumer, &endpoint->async); ev_async_send(endpoint->consumer, &endpoint->async);
} }
tt_pthread_setcancelstate(old_cancel_state, NULL);
} }
void void
......
...@@ -300,6 +300,11 @@ ...@@ -300,6 +300,11 @@
#define tt_pthread_getspecific(key) pthread_getspecific(key) #define tt_pthread_getspecific(key) pthread_getspecific(key)
#define tt_pthread_setcancelstate(state, oldstate) \
({ int e__ = pthread_setcancelstate(state, oldstate);\
tt_pthread_error(e__); \
})
/** Set the current thread's name /** Set the current thread's name
*/ */
static inline void static inline void
......
...@@ -102,6 +102,11 @@ target_link_libraries(cbus_stress.test core stat) ...@@ -102,6 +102,11 @@ target_link_libraries(cbus_stress.test core stat)
add_executable(cbus.test cbus.c) add_executable(cbus.test cbus.c)
target_link_libraries(cbus.test core unit stat) target_link_libraries(cbus.test core unit stat)
if (${CMAKE_HOST_SYSTEM_NAME} MATCHES "Linux")
add_executable(cbus_hang.test cbus_hang.c)
target_link_libraries(cbus_hang.test core unit stat)
endif ()
add_executable(coio.test coio.cc) add_executable(coio.test coio.cc)
target_link_libraries(coio.test core eio bit uri unit) target_link_libraries(coio.test core eio bit uri unit)
......
#include "cbus.h"
#include "fiber.h"
#include "memory.h"
#include "unit.h"
struct cord hang_worker;
struct cord canceled_worker;
struct cbus_endpoint hang_endpoint;
struct cpipe pipe_from_cl_to_hang;
struct cpipe pipe_from_main_to_hang;
/*
* We want to cancel canceled thread in the moment of cpipe_flush_cb
* will be processing.
* A Linux specific dirty hack will be used for reproduce the bug.
* We need to synchronize the main thread and the canceled worker thread.
* So, do it using the endpoint's mutex internal field(__data.__lock).
* __lock == 0 - unlock
* __lock == 1 - lock
* __lock == 2 - possible waiters exists
* After pthred create - __lock change state from 1 to 2
*/
pthread_mutex_t endpoint_hack_mutex_1;
pthread_cond_t endpoint_hack_cond_1;
pthread_mutex_t endpoint_hack_mutex_2;
pthread_cond_t endpoint_hack_cond_2;
static
void join_fail(int signum) {
(void)signum;
printf("Can't join the hang worker\n");
exit(EXIT_FAILURE);
}
static void
do_nothing(struct cmsg *m)
{
(void) m;
}
static int
hang_worker_f(va_list ap)
{
(void) ap;
cbus_endpoint_create(&hang_endpoint, "hang_worker",
fiber_schedule_cb, fiber());
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
tt_pthread_cond_signal(&endpoint_hack_cond_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
cbus_loop(&hang_endpoint);
cbus_endpoint_destroy(&hang_endpoint, cbus_process);
return 0;
}
static void
hang_worker_start()
{
cord_costart(&hang_worker, "hang_worker", hang_worker_f, NULL);
}
static int
canceled_worker_f(va_list ap)
{
(void) ap;
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
tt_pthread_cond_signal(&endpoint_hack_cond_1);
/* Wait a start command from the main thread */
tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
tt_pthread_cond_wait(&endpoint_hack_cond_2, &endpoint_hack_mutex_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
cpipe_create(&pipe_from_cl_to_hang, "hang_worker");
cpipe_set_max_input(&pipe_from_cl_to_hang, 1);
static struct cmsg_hop nothing_route = { do_nothing, NULL };
static struct cmsg nothing_msg;
cmsg_init(&nothing_msg, &nothing_route);
/*
* We need to use the cpipe_push_input cause
* an ev_invoke must be called for a hang reproducing
*/
cpipe_push_input(&pipe_from_cl_to_hang, &nothing_msg);
cpipe_destroy(&pipe_from_cl_to_hang);
return 0;
}
static void
canceled_worker_start()
{
cord_costart(&canceled_worker, "canceled_worker",
canceled_worker_f, NULL);
}
static int
main_f(va_list ap)
{
(void) ap;
/* Start the endpoint's mutex hack */
/* Initialize the endpoint mutex */
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
hang_worker_start();
tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
/*
* Create (only create) the canceled worker before the endpoint mutex will be locked
* for the hack work correctly
*/
tt_pthread_mutex_lock(&endpoint_hack_mutex_1);
canceled_worker_start();
tt_pthread_cond_wait(&endpoint_hack_cond_1, &endpoint_hack_mutex_1);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_1);
tt_pthread_mutex_lock(&(hang_endpoint.mutex));
/* Start canceled worker */
tt_pthread_mutex_lock(&endpoint_hack_mutex_2);
tt_pthread_cond_signal(&endpoint_hack_cond_2);
tt_pthread_mutex_unlock(&endpoint_hack_mutex_2);
while(hang_endpoint.mutex.__data.__lock < 2) {
usleep(200);
}
tt_pthread_cancel(canceled_worker.id);
tt_pthread_mutex_unlock(&(hang_endpoint.mutex));
/* Hack end */
tt_pthread_join(canceled_worker.id, NULL);
unsigned join_timeout = 5;
signal(SIGALRM, join_fail); // For exit in a hang case
alarm(join_timeout);
cpipe_create(&pipe_from_main_to_hang, "hang_worker");
cbus_stop_loop(&pipe_from_main_to_hang);
cpipe_destroy(&pipe_from_main_to_hang);
cord_join(&hang_worker);
ok(true, "The hang worker has been joined");
alarm(0);
ev_break(loop(), EVBREAK_ALL);
return 0;
}
int
main()
{
header();
plan(1);
memory_init();
fiber_init(fiber_c_invoke);
cbus_init();
tt_pthread_cond_init(&endpoint_hack_cond_1, NULL);
tt_pthread_mutex_init(&endpoint_hack_mutex_1, NULL);
tt_pthread_cond_init(&endpoint_hack_cond_2, NULL);
tt_pthread_mutex_init(&endpoint_hack_mutex_2, NULL);
struct fiber *main_fiber = fiber_new("main", main_f);
assert(main_fiber != NULL);
fiber_wakeup(main_fiber);
ev_run(loop(), 0);
tt_pthread_cond_destroy(&endpoint_hack_cond_1);
tt_pthread_cond_destroy(&endpoint_hack_cond_2);
cbus_free();
fiber_free();
memory_free();
int rc = check_plan();
footer();
return rc;
}
*** main ***
1..1
ok 1 - The hang worker has been joined
*** main: done ***
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment