Skip to content
Snippets Groups Projects
Commit 577ef3a7 authored by Konstantin Osipov
Browse files

Thread-based WAL writer: fix the bug with out of order confirm_lsn().

parent eacb063c
No related branches found
No related tags found
No related merge requests found
......@@ -67,10 +67,11 @@ struct recovery_state *recovery_state;
struct wal_writer
{
STAILQ_HEAD(wal_fifo, wal_write_request) input;
STAILQ_HEAD(wal_fifo, wal_write_request) input, output;
pthread_t thread;
pthread_mutex_t mutex;
pthread_cond_t cond;
ev_async async;
bool is_shutdown;
};
......@@ -120,7 +121,7 @@ confirm_lsn(struct recovery_state *r, i64 lsn)
return 0;
} else {
say_warn("lsn double confirmed:%" PRIi64, r->confirmed_lsn);
say_warn("lsn double confirmed:%" PRIi64, lsn);
}
return -1;
......@@ -1211,6 +1212,29 @@ wal_writer_init_once()
pthread_atfork(NULL, NULL, wal_writer_child);
}
/**
 * Async watcher callback (installed on writer->async): take over
 * everything currently queued in writer->output and call the fiber
 * of each request, in queue order.
 *
 * @param watcher  watcher whose ->data points to the struct wal_writer
 * @param event    unused
 */
static void
wal_writer_schedule(ev_watcher *watcher, int event __attribute__((unused)))
{
	struct wal_writer *writer = watcher->data;
	struct wal_fifo done;
	struct wal_write_request *cur;
	struct wal_write_request *next;

	/*
	 * Detach the whole output queue while holding the mutex and
	 * leave an empty queue behind; the requests are processed
	 * after the mutex is released.
	 */
	tt_pthread_mutex_lock(&writer->mutex);
	done = writer->output;
	STAILQ_INIT(&writer->output);
	tt_pthread_mutex_unlock(&writer->mutex);

	/*
	 * STAILQ_FOREACH is not an option here: fiber_call()
	 * destroys the list entry, so the successor has to be
	 * fetched before the fiber is called.
	 */
	for (cur = STAILQ_FIRST(&done); cur != NULL; cur = next) {
		struct fiber *waiter = cur->fiber;
		next = STAILQ_NEXT(cur, wal_fifo_entry);
		fiber_call(waiter);
	}
}
static void
wal_writer_init(struct wal_writer *writer)
{
......@@ -1238,6 +1262,10 @@ wal_writer_init(struct wal_writer *writer)
tt_pthread_condattr_destroy(&clock_monotonic);
STAILQ_INIT(&writer->input);
STAILQ_INIT(&writer->output);
ev_async_init(&writer->async, (void *) wal_writer_schedule);
writer->async.data = writer;
tt_pthread_once(&wal_writer_once, wal_writer_init_once);
}
......@@ -1265,11 +1293,15 @@ wal_writer_start(struct recovery_state *state)
{
assert(state->writer == NULL);
assert(wal_writer.is_shutdown == false);
assert(STAILQ_EMPTY(&wal_writer.input));
assert(STAILQ_EMPTY(&wal_writer.output));
/* I. Initialize the state. */
wal_writer_init(&wal_writer);
state->writer = &wal_writer;
ev_async_start(&wal_writer.async);
/* II. Start the thread. */
if (pthread_create(&wal_writer.thread, NULL, wal_writer_thread,
......@@ -1295,22 +1327,26 @@ wal_writer_stop(struct recovery_state *state)
tt_pthread_cond_signal(&writer->cond);
tt_pthread_mutex_unlock(&writer->mutex);
if (pthread_join(writer->thread, NULL) == 0) {
wal_writer_destroy(writer);
return 0;
}
say_syserror("WAL writer: thread join failed");
if (pthread_join(writer->thread, NULL) != 0)
goto error;
ev_async_stop(&writer->async);
wal_writer_destroy(writer);
return 0;
error:
/* We can't recover from this in any reasonable way. */
panic_syserror("WAL writer: thread join failed");
return -1;
}
struct wal_fifo
wal_writer_pop(struct wal_writer *writer, bool wait)
wal_writer_pop(struct wal_writer *writer, bool input_was_empty)
{
struct wal_fifo input;
do {
input = writer->input;
STAILQ_INIT(&writer->input);
if (STAILQ_EMPTY(&input) == false || wait == false)
if (STAILQ_EMPTY(&input) == false || input_was_empty == false)
break;
tt_pthread_cond_wait(&writer->cond, &writer->mutex);
} while (writer->is_shutdown == false);
......@@ -1413,35 +1449,32 @@ wal_writer_thread(void *worker_args)
{
struct recovery_state *r = worker_args;
struct wal_writer *writer = r->writer;
struct wal_fifo output = STAILQ_HEAD_INITIALIZER(output);
bool input_was_empty = true;
struct wal_write_request *req;
tt_pthread_mutex_lock(&writer->mutex);
while (writer->is_shutdown == false) {
struct wal_fifo input =
wal_writer_pop(writer, STAILQ_EMPTY(&output));
struct wal_fifo input = wal_writer_pop(writer, input_was_empty);
pthread_mutex_unlock(&writer->mutex);
/*
* Check the old list of fibers to wakeup *here*
* since we needed a membar for its out_lsn's to
* Wake up fibers waiting on the old list *here*
* since we need a membar for request out_lsn's to
* sync up.
*/
STAILQ_FOREACH(req, &output, wal_fifo_entry) {
/*
* @todo:
* Even though wal_write() is not
* a cancellation point, check the fiber
* wasn't cancelled and recycled.
* */
fiber_wakeup(req->fiber);
}
if (input_was_empty == false)
ev_async_send(&writer->async);
STAILQ_FOREACH(req, &input, wal_fifo_entry) {
(void) write_to_disk(r, req);
}
output = input;
input_was_empty = STAILQ_EMPTY(&input);
tt_pthread_mutex_lock(&writer->mutex);
STAILQ_CONCAT(&writer->output, &input);
}
tt_pthread_mutex_unlock(&writer->mutex);
if (input_was_empty == false)
ev_async_send(&writer->async);
write_to_disk(r, NULL);
return NULL;
}
......@@ -1478,6 +1511,8 @@ wal_write(struct recovery_state *r, u16 tag, u16 op, u64 cookie,
fiber_yield();
assert(req->out_lsn == 0 || (req->lsn == lsn && req->out_lsn == lsn));
return req->out_lsn == 0 ? -1 : 0;
}
......
......@@ -116,7 +116,7 @@ vsay(int level, const char *filename, int line, const char *error, const char *f
const char *peer_name = fiber_peer_name(fiber);
size_t p = 0, len = PIPE_BUF;
const char *f;
static char buf[PIPE_BUF];
static __thread char buf[PIPE_BUF];
if (booting) {
fprintf(stderr, "%s: ", binary_filename);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment