diff --git a/src/box/wal.cc b/src/box/wal.cc index 2ecb657aff31f4eaf9ad2ff54fce676f0df8ac7e..de7be5060f30ef77de36ae0c672726361109eb9d 100644 --- a/src/box/wal.cc +++ b/src/box/wal.cc @@ -42,17 +42,12 @@ const char *wal_mode_STRS[] = { "none", "write", "fsync", NULL }; * WAL writer - maintain a Write Ahead Log for every change * in the data state. */ - -/* Context of the WAL writer thread. */ -STAILQ_HEAD(wal_fifo, wal_request); - struct wal_writer { - struct wal_fifo input; - struct wal_fifo commit; struct cord cord; + struct cpipe tx_pipe; + struct cpipe wal_pipe; struct cbus tx_wal_bus; - ev_async write_event; int rows_per_wal; struct fio_batch *batch; bool is_shutdown; @@ -61,11 +56,23 @@ struct wal_writer struct vclock vclock; }; -static struct wal_writer wal_writer; +static void +wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */) +{ + struct cpipe *pipe = (struct cpipe *) watcher->data; + cbus_lock(pipe->bus); + bool input_was_empty = STAILQ_EMPTY(&pipe->pipe); + STAILQ_CONCAT(&pipe->pipe, &pipe->input); + cbus_unlock(pipe->bus); + + if (input_was_empty) + cbus_signal(pipe->bus); + pipe->n_input = 0; +} /** * A commit watcher callback is invoked whenever there - * are requests in wal_writer->commit. This callback is + * are requests in wal_writer->tx.pipe. This callback is * associated with an internal WAL writer watcher and is * invoked in the front-end main event loop. * @@ -79,33 +86,36 @@ static struct wal_writer wal_writer; * call in the writer thread loop). */ static void -wal_schedule_queue(struct wal_fifo *queue) +tx_schedule_queue(struct cmsg_fifo *queue) { /* * Can't use STAILQ_FOREACH since fiber_call() * destroys the list entry. */ - struct wal_request *req, *tmp; - STAILQ_FOREACH_SAFE(req, queue, wal_fifo_entry, tmp) - fiber_call(req->fiber); + struct cmsg *m, *tmp; + STAILQ_FOREACH_SAFE(m, queue, fifo, tmp) + fiber_call(((struct wal_request *) m)->fiber); } static void -wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */) +tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */) { struct wal_writer *writer = (struct wal_writer *) watcher->data; - struct wal_fifo commit = STAILQ_HEAD_INITIALIZER(commit); - struct wal_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); + struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit); + struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); + bool is_rollback; cbus_lock(&writer->tx_wal_bus); - STAILQ_CONCAT(&commit, &writer->commit); - if (writer->is_rollback) { - STAILQ_CONCAT(&rollback, &writer->input); - writer->is_rollback = false; - } + STAILQ_CONCAT(&commit, &writer->tx_pipe.pipe); + is_rollback = writer->is_rollback; + if (is_rollback) + STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe); + writer->is_rollback = false; cbus_unlock(&writer->tx_wal_bus); + if (is_rollback) + STAILQ_CONCAT(&rollback, &writer->wal_pipe.input); - wal_schedule_queue(&commit); + tx_schedule_queue(&commit); /* * Perform a cascading abort of all transactions which * depend on the transaction which failed to get written @@ -113,8 +123,8 @@ wal_schedule(ev_loop * /* loop */, ev_async *watcher, int /* event */) * in reverse order, performing a playback of the * in-memory database state. */ - STAILQ_REVERSE(&rollback, wal_request, wal_fifo_entry); - wal_schedule_queue(&rollback); + STAILQ_REVERSE(&rollback, cmsg, fifo); + tx_schedule_queue(&rollback); } /** @@ -128,12 +138,9 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock, { cbus_create(&writer->tx_wal_bus); - STAILQ_INIT(&writer->input); - STAILQ_INIT(&writer->commit); + cpipe_create(&writer->tx_pipe); + cpipe_set_fetch_cb(&writer->tx_pipe, tx_fetch_output, writer); - ev_async_init(&writer->write_event, wal_schedule); - writer->write_event.data = writer; - writer->txn_loop = loop(); writer->rows_per_wal = rows_per_wal; writer->batch = fio_batch_alloc(sysconf(_SC_IOV_MAX)); @@ -150,12 +157,13 @@ wal_writer_init(struct wal_writer *writer, struct vclock *vclock, static void wal_writer_destroy(struct wal_writer *writer) { + cpipe_destroy(&writer->tx_pipe); cbus_destroy(&writer->tx_wal_bus); free(writer->batch); } /** WAL writer thread routine. */ -static void *wal_writer_thread(void *worker_args); +static void wal_writer_f(va_list ap); /** * Initialize WAL writer, start the thread. @@ -175,23 +183,25 @@ wal_writer_start(struct recovery_state *r, int rows_per_wal) assert(r->writer == NULL); assert(r->current_wal == NULL); assert(rows_per_wal > 1); - assert(! wal_writer.is_shutdown); - assert(STAILQ_EMPTY(&wal_writer.input)); - assert(STAILQ_EMPTY(&wal_writer.commit)); - /* I. Initialize the state. */ - wal_writer_init(&wal_writer, &r->vclock, rows_per_wal); - r->writer = &wal_writer; + static struct wal_writer wal_writer; - ev_async_start(wal_writer.txn_loop, &wal_writer.write_event); + struct wal_writer *writer = &wal_writer; + r->writer = writer; + + /* I. Initialize the state. */ + wal_writer_init(writer, &r->vclock, rows_per_wal); /* II. Start the thread. */ - if (cord_start(&wal_writer.cord, "wal", wal_writer_thread, r)) { - wal_writer_destroy(&wal_writer); + if (cord_costart(&writer->cord, "wal", wal_writer_f, r)) { + wal_writer_destroy(writer); r->writer = NULL; return -1; } + cbus_join(&writer->tx_wal_bus, &writer->tx_pipe); + cpipe_set_flush_cb(&writer->wal_pipe, wal_flush_input, + &writer->wal_pipe); return 0; } @@ -205,37 +215,18 @@ wal_writer_stop(struct recovery_state *r) cbus_lock(&writer->tx_wal_bus); writer->is_shutdown= true; - cbus_signal(&writer->tx_wal_bus); cbus_unlock(&writer->tx_wal_bus); + cbus_signal(&writer->tx_wal_bus); if (cord_join(&writer->cord)) { /* We can't recover from this in any reasonable way. */ panic_syserror("WAL writer: thread join failed"); } - ev_async_stop(writer->txn_loop, &writer->write_event); wal_writer_destroy(writer); r->writer = NULL; } -/** - * Pop a bulk of requests to write to disk to process. - * Block on the condition only if we have no other work to - * do. Loop in case of a spurious wakeup. - */ -void -wal_writer_pop(struct wal_writer *writer, struct wal_fifo *input) -{ - while (! writer->is_shutdown) - { - if (! writer->is_rollback && ! STAILQ_EMPTY(&writer->input)) { - STAILQ_CONCAT(input, &writer->input); - break; - } - cbus_wait_signal(&writer->tx_wal_bus); - } -} - /** * If there is no current WAL, try to open it, and close the * previous WAL. We close the previous WAL only after opening @@ -310,7 +301,7 @@ wal_fill_batch(struct xlog *wal, struct fio_batch *batch, int rows_per_wal, iovcnt += xlog_encode_row(*row, iov); } fio_batch_add(batch, iovcnt); - req = STAILQ_NEXT(req, wal_fifo_entry); + req = (struct wal_request *) STAILQ_NEXT(req, fifo); } return req; } @@ -338,20 +329,41 @@ wal_write_batch(struct xlog *wal, struct fio_batch *batch, req->rows[req->n_rows - 1]->server_id, req->rows[req->n_rows - 1]->lsn); req->res = 0; - req = STAILQ_NEXT(req, wal_fifo_entry); + req = (struct wal_request *) STAILQ_NEXT(req, fifo); } return req; } +/** + * Pop a bulk of requests to write to disk to process. + * Block on the condition only if we have no other work to + * do. Loop in case of a spurious wakeup. + */ +void +wal_writer_pop(struct wal_writer *writer) +{ + while (! writer->is_shutdown) + { + if (! writer->is_rollback && + ! STAILQ_EMPTY(&writer->wal_pipe.pipe)) { + STAILQ_CONCAT(&writer->wal_pipe.output, + &writer->wal_pipe.pipe); + break; + } + cbus_wait_signal(&writer->tx_wal_bus); + } +} + + static void wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer, - struct wal_fifo *input, struct wal_fifo *commit, - struct wal_fifo *rollback) + struct cmsg_fifo *input, struct cmsg_fifo *commit, + struct cmsg_fifo *rollback) { struct xlog **wal = &r->current_wal; struct fio_batch *batch = writer->batch; - struct wal_request *req = STAILQ_FIRST(input); + struct wal_request *req = (struct wal_request *) STAILQ_FIRST(input); struct wal_request *write_end = req; while (req) { @@ -369,29 +381,33 @@ wal_write_to_disk(struct recovery_state *r, struct wal_writer *writer, req = write_end; } fiber_gc(); - STAILQ_SPLICE(input, write_end, wal_fifo_entry, rollback); + STAILQ_SPLICE(input, write_end, fifo, rollback); STAILQ_CONCAT(commit, input); } /** WAL writer thread main loop. */ -static void * -wal_writer_thread(void *worker_args) +static void +wal_writer_f(va_list ap) { - struct recovery_state *r = (struct recovery_state *) worker_args; + struct recovery_state *r = va_arg(ap, struct recovery_state *); struct wal_writer *writer = r->writer; - struct wal_fifo input = STAILQ_HEAD_INITIALIZER(input); - struct wal_fifo commit = STAILQ_HEAD_INITIALIZER(commit); - struct wal_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); + + cpipe_create(&writer->wal_pipe); + cbus_join(&writer->tx_wal_bus, &writer->wal_pipe); + + struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit); + struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); cbus_lock(&writer->tx_wal_bus); while (! writer->is_shutdown) { - wal_writer_pop(writer, &input); + wal_writer_pop(writer); cbus_unlock(&writer->tx_wal_bus); - wal_write_to_disk(r, writer, &input, &commit, &rollback); + wal_write_to_disk(r, writer, &writer->wal_pipe.output, + &commit, &rollback); cbus_lock(&writer->tx_wal_bus); - STAILQ_CONCAT(&writer->commit, &commit); + STAILQ_CONCAT(&writer->tx_pipe.pipe, &commit); if (! STAILQ_EMPTY(&rollback)) { /* * Begin rollback: create a rollback queue @@ -400,17 +416,18 @@ wal_writer_thread(void *worker_args) * input queue. */ writer->is_rollback = true; - STAILQ_CONCAT(&rollback, &writer->input); - STAILQ_CONCAT(&writer->input, &rollback); + STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe); + STAILQ_CONCAT(&writer->wal_pipe.pipe, &rollback); } - ev_async_send(writer->txn_loop, &writer->write_event); + ev_async_send(writer->tx_pipe.consumer, + &writer->tx_pipe.fetch_output); } cbus_unlock(&writer->tx_wal_bus); if (r->current_wal != NULL) { xlog_close(r->current_wal); r->current_wal = NULL; } - return NULL; + cpipe_destroy(&writer->wal_pipe); } /** @@ -430,16 +447,7 @@ wal_write(struct recovery_state *r, struct wal_request *req) req->fiber = fiber(); req->res = -1; - cbus_lock(&writer->tx_wal_bus); - - bool input_was_empty = STAILQ_EMPTY(&writer->input); - STAILQ_INSERT_TAIL(&writer->input, req, wal_fifo_entry); - - if (input_was_empty) - cbus_signal(&writer->tx_wal_bus); - - cbus_unlock(&writer->tx_wal_bus); - + cpipe_push(&writer->wal_pipe, req); /** * It's not safe to spuriously wakeup this fiber * since in that case it will ignore a possible diff --git a/src/box/wal.h b/src/box/wal.h index f2bea13e1bc2f294fe31866a57481afac81eae0e..42ac8f2c8ffde39375de2aec8ecd1366f3cfd160 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -40,8 +40,7 @@ enum wal_mode { WAL_NONE = 0, WAL_WRITE, WAL_FSYNC, WAL_MODE_MAX }; /** String constants for the supported modes. */ extern const char *wal_mode_STRS[]; -struct wal_request { - STAILQ_ENTRY(wal_request) wal_fifo_entry; +struct wal_request: public cmsg { /* Auxiliary. */ int64_t res; struct fiber *fiber; diff --git a/src/cbus.cc b/src/cbus.cc index bf8115d2818f0eb1993304a4f4fa34ede22de26a..0d07ddae34d127219a3a8d2e45dee819cd0a8311 100644 --- a/src/cbus.cc +++ b/src/cbus.cc @@ -66,6 +66,13 @@ cpipe_create(struct cpipe *pipe) ev_async_start(pipe->consumer, &pipe->fetch_output); } +void +cpipe_destroy(struct cpipe *pipe) +{ + assert(loop() == pipe->consumer); + ev_async_stop(pipe->consumer, &pipe->fetch_output); +} + static void cpipe_join(struct cpipe *pipe, struct cbus *bus, struct cpipe *peer) { diff --git a/src/cbus.h b/src/cbus.h index 98d6c3990cf6672c81599a5e6cf49c7c895fbbd9..02b87bc3a6c6fd17504b07da418a4a9527e6b284 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -148,6 +148,9 @@ struct cpipe { void cpipe_create(struct cpipe *pipe); +void +cpipe_destroy(struct cpipe *pipe); + /** * Reset the default fetch output callback with a custom one. */ @@ -164,6 +167,22 @@ cpipe_set_fetch_cb(struct cpipe *pipe, ev_async_cb fetch_output_cb, pipe->fetch_output.data = data; } +/** + * Reset the default fetch output callback with a custom one. + */ +static inline void +cpipe_set_flush_cb(struct cpipe *pipe, ev_async_cb flush_input_cb, + void *data) +{ + assert(loop() == pipe->producer); + /* + * According to libev documentation, you can set cb at + * virtually any time, modulo threads. + */ + ev_set_cb(&pipe->flush_input, flush_input_cb); + pipe->flush_input.data = data; +} + /** * Pop a single message from the staged output area. If * the output is empty, returns NULL. There may be messages